diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 74f3a72bb..07b091073 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -3,7 +3,6 @@ package filersink import ( "context" "fmt" - "strings" "sync" "google.golang.org/grpc" @@ -69,7 +68,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string) var host string var auth security.EncodedJwt - if err := fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -114,7 +113,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string) return } -func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (fs *FilerSink) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) @@ -122,11 +121,6 @@ func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) }, fs.grpcAddress, fs.grpcDialOption) } - -func volumeId(fileId string) string { - lastCommaIndex := strings.LastIndex(fileId, ",") - if lastCommaIndex > 0 { - return fileId[:lastCommaIndex] - } - return fileId +func (fs *FilerSink) AdjustedUrl(hostAndPort string) string { + return hostAndPort } diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index ffce853b8..5f055f9d1 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -65,30 +65,21 @@ func (fs *FilerSink) initialize(grpcAddress string, dir string, } func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { - return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - dir, name := util.FullPath(key).DirAndName() - - request := &filer_pb.DeleteEntryRequest{ - Directory: dir, - Name: name, - IsDeleteData: deleteIncludeChunks, - } - - glog.V(1).Infof("delete entry: %v", request) - _, err := client.DeleteEntry(context.Background(), request) - if err != nil { - glog.V(0).Infof("delete entry %s: %v", key, err) - return fmt.Errorf("delete entry %s: %v", key, err) - } + dir, name := util.FullPath(key).DirAndName() - return nil - }) + glog.V(1).Infof("delete entry: %v", key) + err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, false, false) + if err != nil { + glog.V(0).Infof("delete entry %s: %v", key, err) + return fmt.Errorf("delete entry %s: %v", key, err) + } + return nil } func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { - return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { dir, name := util.FullPath(key).DirAndName() @@ -140,7 +131,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent // read existing entry var existingEntry *filer_pb.Entry - err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: dir, @@ -192,7 +183,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent } // save updated meta data - return true, fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return true, fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ Directory: newParentPath,