Browse Source

refactoring

pull/1255/head
Chris Lu 5 years ago
parent
commit
b97768c51c
  1. 14
      weed/replication/sink/filersink/fetch_write.go
  2. 19
      weed/replication/sink/filersink/filer_sink.go

14
weed/replication/sink/filersink/fetch_write.go

@ -3,7 +3,6 @@ package filersink
import (
"context"
"fmt"
"strings"
"sync"
"google.golang.org/grpc"
@ -69,7 +68,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string)
var host string
var auth security.EncodedJwt
if err := fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
if err := fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
@ -114,7 +113,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string)
return
}
func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
func (fs *FilerSink) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
@ -122,11 +121,6 @@ func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error)
}, fs.grpcAddress, fs.grpcDialOption)
}
func volumeId(fileId string) string {
lastCommaIndex := strings.LastIndex(fileId, ",")
if lastCommaIndex > 0 {
return fileId[:lastCommaIndex]
}
return fileId
func (fs *FilerSink) AdjustedUrl(hostAndPort string) string {
return hostAndPort
}

19
weed/replication/sink/filersink/filer_sink.go

@ -65,30 +65,21 @@ func (fs *FilerSink) initialize(grpcAddress string, dir string,
}
func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error {
return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
dir, name := util.FullPath(key).DirAndName()
request := &filer_pb.DeleteEntryRequest{
Directory: dir,
Name: name,
IsDeleteData: deleteIncludeChunks,
}
glog.V(1).Infof("delete entry: %v", request)
_, err := client.DeleteEntry(context.Background(), request)
glog.V(1).Infof("delete entry: %v", key)
err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, false, false)
if err != nil {
glog.V(0).Infof("delete entry %s: %v", key, err)
return fmt.Errorf("delete entry %s: %v", key, err)
}
return nil
})
}
func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error {
return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
dir, name := util.FullPath(key).DirAndName()
@ -140,7 +131,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
// read existing entry
var existingEntry *filer_pb.Entry
err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: dir,
@ -192,7 +183,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
}
// save updated meta data
return true, fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
return true, fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.UpdateEntryRequest{
Directory: newParentPath,

Loading…
Cancel
Save