From 9f9ef1340c6441c10c15e2642b5074d34fe40332 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 26 Dec 2021 00:15:03 -0800 Subject: [PATCH] use streaming mode for long poll grpc calls streaming mode would create separate grpc connections for each call. this is to ensure the long poll connections are properly closed. --- .../diff_volume_servers.go | 4 +- .../load_test_meta_tail.go | 2 +- .../stream_read_volume/stream_read_volume.go | 11 ++- weed/command/filer_cat.go | 2 +- weed/command/filer_copy.go | 14 ++-- weed/command/filer_meta_backup.go | 4 +- weed/command/filer_remote_gateway.go | 6 +- weed/command/filer_remote_sync.go | 4 +- weed/command/filer_remote_sync_dir.go | 2 +- weed/command/filer_sync.go | 4 +- weed/command/iam.go | 2 +- weed/command/master_follower.go | 2 +- weed/command/mount_std.go | 2 +- weed/command/msg_broker.go | 2 +- weed/command/s3.go | 2 +- weed/command/upload.go | 2 +- weed/command/webdav.go | 2 +- weed/filer/filer.go | 4 +- weed/filer/filer_conf.go | 2 +- weed/filer/filer_delete_entry.go | 2 +- weed/filer/meta_aggregator.go | 6 +- weed/filer/read_remote.go | 2 +- weed/filer/reader_at.go | 2 +- weed/filer/remote_mapping.go | 10 +-- weed/filer/remote_storage.go | 2 +- weed/filesys/dir.go | 6 +- weed/filesys/dir_link.go | 4 +- weed/filesys/dir_rename.go | 2 +- weed/filesys/file.go | 4 +- weed/filesys/filehandle.go | 2 +- weed/filesys/wfs.go | 2 +- weed/filesys/wfs_filer_client.go | 4 +- weed/filesys/wfs_write.go | 2 +- weed/iamapi/iamapi_server.go | 42 +++++------ weed/messaging/broker/broker_append.go | 8 +- .../broker/broker_grpc_server_discovery.go | 6 +- weed/messaging/broker/broker_server.go | 10 +-- weed/operation/assign_file_id.go | 4 +- weed/operation/delete_content.go | 2 +- weed/operation/grpc_client.go | 8 +- weed/operation/lookup.go | 2 +- weed/operation/sync_volume.go | 2 +- weed/operation/tail_volume.go | 2 +- weed/pb/filer_pb/filer_client.go | 18 ++--- weed/pb/filer_pb_tail.go | 4 +- weed/pb/grpc_client_server.go | 73 +++++++++++-------- weed/remote_storage/track_sync_offset.go | 4 +- weed/replication/replicator.go | 2 +- .../replication/sink/filersink/fetch_write.go | 6 +- weed/replication/sink/filersink/filer_sink.go | 6 +- weed/replication/source/filer_source.go | 6 +- weed/s3api/auth_credentials.go | 2 +- weed/s3api/filer_util.go | 2 +- weed/s3api/filer_util_tags.go | 6 +- weed/s3api/s3api_bucket_handlers.go | 4 +- weed/s3api/s3api_handlers.go | 4 +- weed/s3api/s3api_object_handlers.go | 2 +- weed/s3api/s3api_objects_list_handlers.go | 2 +- weed/server/filer_grpc_server.go | 6 +- weed/server/filer_grpc_server_remote.go | 2 +- weed/server/filer_server.go | 2 +- weed/server/master_grpc_server_collection.go | 4 +- weed/server/master_server_handlers_admin.go | 2 +- weed/server/volume_grpc_client_to_master.go | 2 +- weed/server/volume_grpc_copy.go | 4 +- weed/server/volume_grpc_erasure_coding.go | 2 +- weed/server/webdav_server.go | 14 ++-- weed/shell/command_cluster_ps.go | 2 +- weed/shell/command_collection_delete.go | 2 +- weed/shell/command_collection_list.go | 2 +- weed/shell/command_ec_common.go | 8 +- weed/shell/command_ec_decode.go | 10 +-- weed/shell/command_ec_encode.go | 2 +- weed/shell/command_ec_rebuild.go | 4 +- weed/shell/command_fs_cat.go | 2 +- weed/shell/command_fs_configure.go | 2 +- weed/shell/command_fs_meta_cat.go | 2 +- weed/shell/command_fs_meta_load.go | 2 +- weed/shell/command_fs_mkdir.go | 2 +- weed/shell/command_fs_mv.go | 2 +- weed/shell/command_fs_rm.go | 2 +- weed/shell/command_remote_configure.go | 4 +- weed/shell/command_remote_meta_sync.go | 2 +- weed/shell/command_remote_mount.go | 2 +- weed/shell/command_remote_uncache.go | 2 +- weed/shell/command_remote_unmount.go | 2 +- weed/shell/command_s3_bucket_create.go | 2 +- weed/shell/command_s3_bucket_delete.go | 2 +- weed/shell/command_s3_bucket_list.go | 2 +- weed/shell/command_s3_configure.go | 4 +- weed/shell/command_volume_check_disk.go | 6 +- .../command_volume_configure_replication.go | 2 +- weed/shell/command_volume_fix_replication.go | 2 +- weed/shell/command_volume_fsck.go | 2 +- weed/shell/command_volume_mount.go | 2 +- weed/shell/command_volume_move.go | 12 +-- weed/shell/command_volume_server_leave.go | 2 +- weed/shell/command_volume_tier_download.go | 2 +- weed/shell/command_volume_tier_upload.go | 2 +- weed/shell/command_volume_unmount.go | 2 +- weed/shell/command_volume_vacuum.go | 2 +- weed/shell/commands.go | 4 +- weed/shell/shell_liner.go | 4 +- weed/storage/store_ec.go | 4 +- weed/storage/store_ec_delete.go | 2 +- weed/storage/volume_backup.go | 2 +- weed/topology/allocate_volume.go | 2 +- weed/topology/topology_vacuum.go | 8 +- .../exclusive_locks/exclusive_locker.go | 6 +- weed/wdclient/masterclient.go | 8 +- 110 files changed, 268 insertions(+), 262 deletions(-) diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go index e8361a6cf..0188d18d4 100644 --- a/unmaintained/diff_volume_servers/diff_volume_servers.go +++ b/unmaintained/diff_volume_servers/diff_volume_servers.go @@ -122,7 +122,7 @@ type needleState struct { func getVolumeFiles(v uint32, addr pb.ServerAddress) (map[types.NeedleId]needleState, int64, error) { var idxFile *bytes.Reader - err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() copyFileClient, err := vs.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ @@ -180,7 +180,7 @@ func getVolumeFiles(v uint32, addr pb.ServerAddress) (map[types.NeedleId]needleS func getNeedleFileId(v uint32, nid types.NeedleId, addr pb.ServerAddress) (string, error) { var id string - err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { resp, err := vs.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{ VolumeId: v, NeedleId: uint64(nid), diff --git a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go index 8ccad1e49..faaea9e8d 100644 --- a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go +++ b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go @@ -51,7 +51,7 @@ func main() { } func startGenerateMetadata() { - pb.WithFilerClient(pb.ServerAddress(*tailFiler), grpc.WithInsecure(), func(client filer_pb.SeaweedFilerClient) error { + pb.WithFilerClient(false, pb.ServerAddress(*tailFiler), grpc.WithInsecure(), func(client filer_pb.SeaweedFilerClient) error { for i := 0; i < *n; i++ { name := fmt.Sprintf("file%d", i) diff --git a/unmaintained/stream_read_volume/stream_read_volume.go b/unmaintained/stream_read_volume/stream_read_volume.go index e120b9920..bbe5abedb 100644 --- a/unmaintained/stream_read_volume/stream_read_volume.go +++ b/unmaintained/stream_read_volume/stream_read_volume.go @@ -15,9 +15,9 @@ import ( ) var ( - volumeServer = flag.String("volumeServer", "localhost:8080", "a volume server") - volumeId = flag.Int("volumeId", -1, "a volume id to stream read") - grpcDialOption grpc.DialOption + volumeServer = flag.String("volumeServer", "localhost:8080", "a volume server") + volumeId = flag.Int("volumeId", -1, "a volume id to stream read") + grpcDialOption grpc.DialOption ) func main() { @@ -33,11 +33,11 @@ func main() { return nil } - err := operation.WithVolumeServerClient(pb.ServerAddress(*volumeServer), grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, pb.ServerAddress(*volumeServer), grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() copyFileClient, err := vs.ReadAllNeedles(ctx, &volume_server_pb.ReadAllNeedlesRequest{ - VolumeIds: []uint32{vid}, + VolumeIds: []uint32{vid}, }) if err != nil { return err @@ -61,4 +61,3 @@ func main() { } } - diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go index 71c3a48d6..7f613f72b 100644 --- a/weed/command/filer_cat.go +++ b/weed/command/filer_cat.go @@ -97,7 +97,7 @@ func runFilerCat(cmd *Command, args []string) bool { writer = f } - pb.WithFilerClient(filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pb.WithFilerClient(false, filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 8a8701828..88ae55e84 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -172,7 +172,7 @@ func runCopy(cmd *Command, args []string) bool { } func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress pb.ServerAddress) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) { - err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) @@ -302,7 +302,7 @@ func (worker *FileCopyWorker) checkExistingFileFirst(task FileCopyTask, f *os.Fi return } - err = pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: task.destinationUrlPath, @@ -344,7 +344,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err err = util.Retry("upload", func() error { // assign a volume - assignErr := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + assignErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -404,7 +404,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err } - if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -461,7 +461,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, var assignResult *filer_pb.AssignVolumeResponse var assignError error err := util.Retry("assignVolume", func() error { - return pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, Replication: *worker.options.replication, @@ -540,7 +540,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, return fmt.Errorf("create manifest: %v", manifestErr) } - if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -598,7 +598,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off var fileId, host string var auth security.EncodedJwt - if flushErr := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if flushErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { ctx := context.Background() diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index 0b8fa76c6..d52ed3349 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -222,9 +222,9 @@ func (metaBackup *FilerMetaBackupOptions) setOffset(lastWriteTime time.Time) err var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{}) -func (metaBackup *FilerMetaBackupOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (metaBackup *FilerMetaBackupOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(streamingMode, pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go index 9426f3841..fa0239558 100644 --- a/weed/command/filer_remote_gateway.go +++ b/weed/command/filer_remote_gateway.go @@ -32,8 +32,8 @@ type RemoteGatewayOptions struct { var _ = filer_pb.FilerClient(&RemoteGatewayOptions{}) -func (option *RemoteGatewayOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { +func (option *RemoteGatewayOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { + return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) } @@ -87,7 +87,7 @@ func runFilerRemoteGateway(cmd *Command, args []string) bool { remoteGatewayOptions.bucketsDir = "/buckets" // check buckets again - remoteGatewayOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error { + remoteGatewayOptions.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error { resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return err diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index bceeb097e..681ea35e9 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -22,8 +22,8 @@ type RemoteSyncOptions struct { var _ = filer_pb.FilerClient(&RemoteSyncOptions{}) -func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { +func (option *RemoteSyncOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { + return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) } diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go index 30782942e..947f526bb 100644 --- a/weed/command/filer_remote_sync_dir.go +++ b/weed/command/filer_remote_sync_dir.go @@ -227,7 +227,7 @@ func shouldSendToRemote(entry *filer_pb.Entry) bool { func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error { remoteEntry.LastLocalSyncTsNs = time.Now().UnixNano() entry.RemoteEntry = remoteEntry - return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ Directory: dir, Entry: entry, diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 20755dbe5..230b24a52 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -183,7 +183,7 @@ const ( func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) { - readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(signaturePrefix + "____") util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) @@ -209,7 +209,7 @@ func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature } func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error { - return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(signaturePrefix + "____") util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) diff --git a/weed/command/iam.go b/weed/command/iam.go index ebe9657f2..8fb14be06 100644 --- a/weed/command/iam.go +++ b/weed/command/iam.go @@ -48,7 +48,7 @@ func (iamopt *IamOptions) startIamServer() bool { util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") for { - err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go index 95f1c80b8..6d7aa2848 100644 --- a/weed/command/master_follower.go +++ b/weed/command/master_follower.go @@ -87,7 +87,7 @@ func startMasterFollower(masterOptions MasterOptions) { var err error grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.master") for i := 0; i < 10; i++ { - err = pb.WithOneOfGrpcMasterClients(masters, grpcDialOption, func(client master_pb.SeaweedClient) error { + err = pb.WithOneOfGrpcMasterClients(false, masters, grpcDialOption, func(client master_pb.SeaweedClient) error { resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master grpc address %v configuration: %v", masters, err) diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 2603260a2..ce9a998f6 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -78,7 +78,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { var cipher bool var err error for i := 0; i < 10; i++ { - err = pb.WithOneOfGrpcFilerClients(filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer grpc address %v configuration: %v", filerAddresses, err) diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go index 61517ab39..35d59ea20 100644 --- a/weed/command/msg_broker.go +++ b/weed/command/msg_broker.go @@ -68,7 +68,7 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { cipher := false for { - err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) diff --git a/weed/command/s3.go b/weed/command/s3.go index d7cd7818d..ee726fcec 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -153,7 +153,7 @@ func (s3opt *S3Options) startS3Server() bool { var metricsIntervalSec int for { - err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) diff --git a/weed/command/upload.go b/weed/command/upload.go index f46e70cb1..f2b0b7fe4 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -130,7 +130,7 @@ func runUpload(cmd *Command, args []string) bool { } func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress pb.ServerAddress) (replication string, err error) { - err = pb.WithMasterClient(masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error { + err = pb.WithMasterClient(false, masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error { resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", masterAddress, err) diff --git a/weed/command/webdav.go b/weed/command/webdav.go index bf4609d63..319302175 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -85,7 +85,7 @@ func (wo *WebDavOption) startWebDav() bool { var cipher bool // connect to filer for { - err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 9eabbc337..7ca198b38 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -79,9 +79,9 @@ func (f *Filer) AggregateFromPeers(self pb.ServerAddress) { } -func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate){ +func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate) { - if grpcErr := pb.WithMasterClient(f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error { + if grpcErr := pb.WithMasterClient(false, f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error { resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ ClientType: cluster.FilerType, }) diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go index 1229f0139..45c368b9b 100644 --- a/weed/filer/filer_conf.go +++ b/weed/filer/filer_conf.go @@ -32,7 +32,7 @@ type FilerConf struct { func ReadFilerConf(filerGrpcAddress pb.ServerAddress, grpcDialOption grpc.DialOption, masterClient *wdclient.MasterClient) (*FilerConf, error) { var buf bytes.Buffer - if err := pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if masterClient != nil { return ReadEntry(masterClient, client, DirectoryEtcSeaweedFS, FilerConfName, &buf) } else { diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go index cc33811aa..bda69b15f 100644 --- a/weed/filer/filer_delete_entry.go +++ b/weed/filer/filer_delete_entry.go @@ -139,7 +139,7 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou func (f *Filer) doDeleteCollection(collectionName string) (err error) { - return f.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + return f.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ Name: collectionName, }) diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index bb2c947e5..282668146 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -72,7 +72,7 @@ func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) { delete(ma.peerStatues, address) } } -func (ma *MetaAggregator) isActive(address pb.ServerAddress)(isActive bool) { +func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) { ma.peerStatuesLock.Lock() defer ma.peerStatuesLock.Unlock() _, isActive = ma.peerStatues[address] @@ -152,7 +152,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, p for { glog.V(4).Infof("subscribing remote %s meta change: %v", peer, time.Unix(0, lastTsNs)) - err := pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithFilerClient(false, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ @@ -194,7 +194,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, p } func (ma *MetaAggregator) readFilerStoreSignature(peer pb.ServerAddress) (sig int32, err error) { - err = pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithFilerClient(false, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return err diff --git a/weed/filer/read_remote.go b/weed/filer/read_remote.go index a3fb48ae0..6372dac72 100644 --- a/weed/filer/read_remote.go +++ b/weed/filer/read_remote.go @@ -26,7 +26,7 @@ func MapRemoteStorageLocationPathToFullPath(localMountedDir util.FullPath, remot } func CacheRemoteObjectToLocalCluster(filerClient filer_pb.FilerClient, remoteConf *remote_pb.RemoteConf, remoteLocation *remote_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) error { - return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { _, err := client.CacheRemoteObjectToLocalCluster(context.Background(), &filer_pb.CacheRemoteObjectToLocalClusterRequest{ Directory: string(parent), Name: entry.Name, diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 68594cb03..b73526761 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -44,7 +44,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp if !found { util.Retry("lookup volume "+vid, func() error { - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ VolumeIds: []string{vid}, }) diff --git a/weed/filer/remote_mapping.go b/weed/filer/remote_mapping.go index c95c4e5bd..b0534e2ca 100644 --- a/weed/filer/remote_mapping.go +++ b/weed/filer/remote_mapping.go @@ -11,7 +11,7 @@ import ( func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress) (mappings *remote_pb.RemoteStorageMapping, readErr error) { var oldContent []byte - if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if readErr = pb.WithFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) return readErr }); readErr != nil { @@ -30,7 +30,7 @@ func InsertMountMapping(filerClient filer_pb.FilerClient, dir string, remoteStor // read current mapping var oldContent, newContent []byte - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { oldContent, err = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) return err }) @@ -47,7 +47,7 @@ func InsertMountMapping(filerClient filer_pb.FilerClient, dir string, remoteStor } // save back - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return SaveInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent) }) if err != nil { @@ -61,7 +61,7 @@ func DeleteMountMapping(filerClient filer_pb.FilerClient, dir string) (err error // read current mapping var oldContent, newContent []byte - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { oldContent, err = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) return err }) @@ -78,7 +78,7 @@ func DeleteMountMapping(filerClient filer_pb.FilerClient, dir string) (err error } // save back - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return SaveInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent) }) if err != nil { diff --git a/weed/filer/remote_storage.go b/weed/filer/remote_storage.go index 9d682b698..5362ba738 100644 --- a/weed/filer/remote_storage.go +++ b/weed/filer/remote_storage.go @@ -133,7 +133,7 @@ func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *remote_pb.Remo func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, storageName string) (conf *remote_pb.RemoteConf, readErr error) { var oldContent []byte - if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if readErr = pb.WithFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX) return readErr }); readErr != nil { diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index cedcf2d76..9d935e53c 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -201,7 +201,7 @@ func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, ex } glog.V(1).Infof("create %s/%s", dirFullPath, name) - err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir.wfs.mapPbIdFromLocalToFiler(request.Entry) defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) @@ -242,7 +242,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err dirFullPath := dir.FullPath() - err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir.wfs.mapPbIdFromLocalToFiler(newEntry) defer dir.wfs.mapPbIdFromFilerToLocal(newEntry) @@ -566,7 +566,7 @@ func (dir *Dir) saveEntry(entry *filer_pb.Entry) error { parentDir, name := util.FullPath(dir.FullPath()).DirAndName() - return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir.wfs.mapPbIdFromLocalToFiler(entry) defer dir.wfs.mapPbIdFromFilerToLocal(entry) diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index acdcd2de4..68f5a79e2 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -68,7 +68,7 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f } // apply changes to the filer, and also apply to local metaCache - err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir.wfs.mapPbIdFromLocalToFiler(request.Entry) defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) @@ -121,7 +121,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, Signatures: []int32{dir.wfs.signature}, } - err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir.wfs.mapPbIdFromLocalToFiler(request.Entry) defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index 1ee6922d8..d8ea3e459 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -22,7 +22,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath) // update remote filer - err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(true, func(client filer_pb.SeaweedFilerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/weed/filesys/file.go b/weed/filesys/file.go index 767841f9d..bbc5c8e21 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -331,7 +331,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { } func (file *File) saveEntry(entry *filer_pb.Entry) error { - return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return file.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { file.wfs.mapPbIdFromLocalToFiler(entry) defer file.wfs.mapPbIdFromFilerToLocal(entry) @@ -362,7 +362,7 @@ func (file *File) getEntry() *filer_pb.Entry { } func (file *File) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) { - err := file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := file.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CacheRemoteObjectToLocalClusterRequest{ Directory: file.dir.FullPath(), diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 2aa4de6da..607b901ff 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -277,7 +277,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { return nil } - err := fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := fh.f.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { entry := fh.f.getEntry() if entry == nil { diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index aa4f9dacd..127c160c4 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -197,7 +197,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. if wfs.stats.lastChecked < time.Now().Unix()-20 { - err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.StatisticsRequest{ Collection: wfs.option.Collection, diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go index cce6aa1a1..4feef867e 100644 --- a/weed/filesys/wfs_filer_client.go +++ b/weed/filesys/wfs_filer_client.go @@ -11,7 +11,7 @@ import ( var _ = filer_pb.FilerClient(&WFS{}) -func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) { +func (wfs *WFS) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) { return util.Retry("filer grpc", func() error { @@ -20,7 +20,7 @@ func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err for x := 0; x < n; x++ { filerGrpcAddress := wfs.option.FilerAddresses[i].ToGrpcAddress() - err = pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, filerGrpcAddress, wfs.option.GrpcDialOption) diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go index 61a463e88..17489547c 100644 --- a/weed/filesys/wfs_write.go +++ b/weed/filesys/wfs_write.go @@ -19,7 +19,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun var fileId, host string var auth security.EncodedJwt - if err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return util.Retry("assignVolume", func() error { request := &filer_pb.AssignVolumeRequest{ Count: 1, diff --git a/weed/iamapi/iamapi_server.go b/weed/iamapi/iamapi_server.go index ad1a8c4a2..fc0e6b700 100644 --- a/weed/iamapi/iamapi_server.go +++ b/weed/iamapi/iamapi_server.go @@ -76,7 +76,7 @@ func (iama *IamApiServer) registerRouter(router *mux.Router) { func (iam IamS3ApiConfigure) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) { var buf bytes.Buffer - err = pb.WithGrpcFilerClient(iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if err = filer.ReadEntry(iam.masterClient, client, filer.IamConfigDirecotry, filer.IamIdentityFile, &buf); err != nil { return err } @@ -98,24 +98,20 @@ func (iam IamS3ApiConfigure) PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfigurat if err := filer.ProtoToText(&buf, s3cfg); err != nil { return fmt.Errorf("ProtoToText: %s", err) } - return pb.WithGrpcFilerClient( - iam.option.Filer, - iam.option.GrpcDialOption, - func(client filer_pb.SeaweedFilerClient) error { - err = util.Retry("saveIamIdentity", func() error { - return filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile, buf.Bytes()) - }) - if err != nil { - return err - } - return nil - }, - ) + return pb.WithGrpcFilerClient(false, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = util.Retry("saveIamIdentity", func() error { + return filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile, buf.Bytes()) + }) + if err != nil { + return err + } + return nil + }) } func (iam IamS3ApiConfigure) GetPolicies(policies *Policies) (err error) { var buf bytes.Buffer - err = pb.WithGrpcFilerClient(iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if err = filer.ReadEntry(iam.masterClient, client, filer.IamConfigDirecotry, filer.IamPoliciesFile, &buf); err != nil { return err } @@ -139,14 +135,10 @@ func (iam IamS3ApiConfigure) PutPolicies(policies *Policies) (err error) { if b, err = json.Marshal(policies); err != nil { return err } - return pb.WithGrpcFilerClient( - iam.option.Filer, - iam.option.GrpcDialOption, - func(client filer_pb.SeaweedFilerClient) error { - if err := filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamPoliciesFile, b); err != nil { - return err - } - return nil - }, - ) + return pb.WithGrpcFilerClient(false, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamPoliciesFile, b); err != nil { + return err + } + return nil + }) } diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go index 9958a0752..9a31a8ac0 100644 --- a/weed/messaging/broker/broker_append.go +++ b/weed/messaging/broker/broker_append.go @@ -24,7 +24,7 @@ func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messag dir, name := util.FullPath(targetFile).DirAndName() // append the chunk - if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AppendToEntryRequest{ Directory: dir, @@ -51,7 +51,7 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf var assignResult = &operation.AssignResult{} // assign a volume location - if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { assignErr := util.Retry("assignVolume", func() error { request := &filer_pb.AssignVolumeRequest{ @@ -108,10 +108,10 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf var _ = filer_pb.FilerClient(&MessageBroker{}) -func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) { +func (broker *MessageBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) { for _, filer := range broker.option.Filers { - if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil { + if err = pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn); err != nil { if err == io.EOF { return } diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go index 66821d404..5cd8edd33 100644 --- a/weed/messaging/broker/broker_grpc_server_discovery.go +++ b/weed/messaging/broker/broker_grpc_server_discovery.go @@ -34,7 +34,7 @@ func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition) for _, filer := range broker.option.Filers { - err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.LocateBroker(context.Background(), &filer_pb.LocateBrokerRequest{ Resource: targetTopicPartition, }) @@ -68,7 +68,7 @@ func (broker *MessageBroker) checkFilers() { found := false for !found { for _, filer := range broker.option.Filers { - err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return err @@ -93,7 +93,7 @@ func (broker *MessageBroker) checkFilers() { found = false for !found { for _, master := range masters { - err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error { + err := broker.withMasterClient(false, master, func(client master_pb.SeaweedClient) error { resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ ClientType: cluster.FilerType, }) diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go index 193c1c689..acf2d6d34 100644 --- a/weed/messaging/broker/broker_server.go +++ b/weed/messaging/broker/broker_server.go @@ -49,7 +49,7 @@ func (broker *MessageBroker) keepConnectedToOneFiler() { for { for _, filer := range broker.option.Filers { - broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() stream, err := client.KeepConnected(ctx) @@ -101,15 +101,15 @@ func (broker *MessageBroker) keepConnectedToOneFiler() { } -func (broker *MessageBroker) withFilerClient(filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error { +func (broker *MessageBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(filer, broker.grpcDialOption, fn) + return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn) } -func (broker *MessageBroker) withMasterClient(master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error { +func (broker *MessageBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error { - return pb.WithMasterClient(master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error { + return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error { return fn(client) }) diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index f0f7581f3..b716300e2 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -48,7 +48,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest continue } - lastError = WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + lastError = WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.AssignRequest{ Count: request.Count, @@ -105,7 +105,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest func LookupJwt(master pb.ServerAddress, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) { - WithMasterServerClient(master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + WithMasterServerClient(false, master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { resp, grpcErr := masterClient.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{ VolumeOrFileIds: []string{fileId}, diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index d762f51e1..996c0b29e 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -123,7 +123,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str // DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc func DeleteFilesAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) { - err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { req := &volume_server_pb.BatchDeleteRequest{ FileIds: fileIds, diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index 743682203..9b68d2286 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -8,18 +8,18 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" ) -func WithVolumeServerClient(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error { +func WithVolumeServerClient(streamingMode bool, volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := volume_server_pb.NewVolumeServerClient(grpcConnection) return fn(client) }, volumeServer.ToGrpcAddress(), grpcDialOption) } -func WithMasterServerClient(masterServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error { +func WithMasterServerClient(streamingMode bool, masterServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) }, masterServer.ToGrpcAddress(), grpcDialOption) diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go index c222dff92..1eb5dd320 100644 --- a/weed/operation/lookup.go +++ b/weed/operation/lookup.go @@ -79,7 +79,7 @@ func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids //only query unknown_vids - err := WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + err := WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.LookupVolumeRequest{ VolumeOrFileIds: unknown_vids, diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go index fdd22ac85..de71a198d 100644 --- a/weed/operation/sync_volume.go +++ b/weed/operation/sync_volume.go @@ -9,7 +9,7 @@ import ( func GetVolumeSyncStatus(server pb.ServerAddress, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) { - WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { resp, err = client.VolumeSyncStatus(context.Background(), &volume_server_pb.VolumeSyncStatusRequest{ VolumeId: vid, diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go index bedeeb3b5..d3449873b 100644 --- a/weed/operation/tail_volume.go +++ b/weed/operation/tail_volume.go @@ -28,7 +28,7 @@ func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle } func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error { - return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return WithVolumeServerClient(true, volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go index 719039d88..266bdae51 100644 --- a/weed/pb/filer_pb/filer_client.go +++ b/weed/pb/filer_pb/filer_client.go @@ -20,7 +20,7 @@ var ( ) type FilerClient interface { - WithFilerClient(fn func(SeaweedFilerClient) error) error + WithFilerClient(streamingMode bool, fn func(SeaweedFilerClient) error) error AdjustedUrl(location *Location) string } @@ -28,7 +28,7 @@ func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry dir, name := fullFilePath.DirAndName() - err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + err = filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error { request := &LookupDirectoryEntryRequest{ Directory: dir, @@ -83,13 +83,13 @@ func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefi } func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) { - return filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error { return doSeaweedList(client, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit) }) } func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) { - return filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error { return doSeaweedList(client, fullDirPath, prefix, fn, startFrom, inclusive, limit) }) } @@ -157,7 +157,7 @@ func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix func Exists(filerClient FilerClient, parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) { - err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + err = filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error { request := &LookupDirectoryEntryRequest{ Directory: parentDirectoryPath, @@ -185,7 +185,7 @@ func Exists(filerClient FilerClient, parentDirectoryPath string, entryName strin func Touch(filerClient FilerClient, parentDirectoryPath string, entryName string, entry *Entry) (err error) { - return filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error { request := &UpdateEntryRequest{ Directory: parentDirectoryPath, @@ -204,7 +204,7 @@ func Touch(filerClient FilerClient, parentDirectoryPath string, entryName string } func Mkdir(filerClient FilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error { - return filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error { return DoMkdir(client, parentDirectoryPath, dirName, fn) }) } @@ -241,7 +241,7 @@ func DoMkdir(client SeaweedFilerClient, parentDirectoryPath string, dirName stri } func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk, fn func(entry *Entry)) error { - return filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error { entry := &Entry{ Name: fileName, @@ -276,7 +276,7 @@ func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string } func Remove(filerClient FilerClient, parentDirectoryPath, name string, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster bool, signatures []int32) error { - return filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error { return DoRemove(client, parentDirectoryPath, name, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster, signatures) }) } diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go index 9909e035b..1db9883a4 100644 --- a/weed/pb/filer_pb_tail.go +++ b/weed/pb/filer_pb_tail.go @@ -16,7 +16,7 @@ func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, selfSignature int32, processEventFn ProcessMetadataFunc, fatalOnError bool) error { - err := WithFilerClient(filerAddress, grpcDialOption, makeFunc(clientName, + err := WithFilerClient(true, filerAddress, grpcDialOption, makeFunc(clientName, pathPrefix, additionalPathPrefixes, &lastTsNs, selfSignature, processEventFn, fatalOnError)) if err != nil { return fmt.Errorf("subscribing filer meta change: %v", err) @@ -28,7 +28,7 @@ func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, clientName string, pathPrefix string, lastTsNs *int64, selfSignature int32, processEventFn ProcessMetadataFunc, fatalOnError bool) error { - err := filerClient.WithFilerClient(makeFunc(clientName, + err := filerClient.WithFilerClient(true, makeFunc(clientName, pathPrefix, nil, lastTsNs, selfSignature, processEventFn, fatalOnError)) if err != nil { return fmt.Errorf("subscribing filer meta change: %v", err) diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index 08fb7fac7..532c66932 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -97,7 +97,8 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedG return existingConnection, nil } - grpcConnection, err := GrpcDial(context.Background(), address, opts...) + ctx := context.Background() + grpcConnection, err := GrpcDial(ctx, address, opts...) if err != nil { return nil, fmt.Errorf("fail to dial %s: %v", address, err) } @@ -112,28 +113,42 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedG return vgc, nil } -func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error { +// WithGrpcClient In streamingMode, always use a fresh connection. Otherwise, try to reuse an existing connection. +func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error { - vgc, err := getOrCreateConnection(address, opts...) - if err != nil { - return fmt.Errorf("getOrCreateConnection %s: %v", address, err) - } - executionErr := fn(vgc.ClientConn) - if executionErr != nil { - if strings.Contains(executionErr.Error(), "transport") || - strings.Contains(executionErr.Error(), "connection closed") { - grpcClientsLock.Lock() - if t, ok := grpcClients[address]; ok { - if t.version == vgc.version { - vgc.Close() - delete(grpcClients, address) + if !streamingMode { + vgc, err := getOrCreateConnection(address, opts...) + if err != nil { + return fmt.Errorf("getOrCreateConnection %s: %v", address, err) + } + executionErr := fn(vgc.ClientConn) + if executionErr != nil { + if strings.Contains(executionErr.Error(), "transport") || + strings.Contains(executionErr.Error(), "connection closed") { + grpcClientsLock.Lock() + if t, ok := grpcClients[address]; ok { + if t.version == vgc.version { + vgc.Close() + delete(grpcClients, address) + } } + grpcClientsLock.Unlock() } - grpcClientsLock.Unlock() } + return executionErr + } else { + grpcConnection, err := GrpcDial(context.Background(), address, opts...) + if err != nil { + return fmt.Errorf("fail to dial %s: %v", address, err) + } + defer grpcConnection.Close() + executionErr := fn(grpcConnection) + if executionErr != nil { + return executionErr + } + return nil } - return executionErr } func ParseServerAddress(server string, deltaPort int) (newServerAddress string, err error) { @@ -184,18 +199,18 @@ func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) { return util.JoinHostPort(host, port) } -func WithMasterClient(master ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error { - return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { +func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error { + return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) }, master.ToGrpcAddress(), grpcDialOption) } -func WithOneOfGrpcMasterClients(masterGrpcAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) { +func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) { for _, masterGrpcAddress := range masterGrpcAddresses { - err = WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) }, masterGrpcAddress.ToGrpcAddress(), grpcDialOption) @@ -207,34 +222,34 @@ func WithOneOfGrpcMasterClients(masterGrpcAddresses []ServerAddress, grpcDialOpt return err } -func WithBrokerGrpcClient(brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client messaging_pb.SeaweedMessagingClient) error) error { +func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client messaging_pb.SeaweedMessagingClient) error) error { - return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := messaging_pb.NewSeaweedMessagingClient(grpcConnection) return fn(client) }, brokerGrpcAddress, grpcDialOption) } -func WithFilerClient(filer ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { +func WithFilerClient(streamingMode bool, filer ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { - return WithGrpcFilerClient(filer, grpcDialOption, fn) + return WithGrpcFilerClient(streamingMode, filer, grpcDialOption, fn) } -func WithGrpcFilerClient(filerGrpcAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { +func WithGrpcFilerClient(streamingMode bool, filerGrpcAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { - return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, filerGrpcAddress.ToGrpcAddress(), grpcDialOption) } -func WithOneOfGrpcFilerClients(filerAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) { +func WithOneOfGrpcFilerClients(streamingMode bool, filerAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) { for _, filerAddress := range filerAddresses { - err = WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, filerAddress.ToGrpcAddress(), grpcDialOption) diff --git a/weed/remote_storage/track_sync_offset.go b/weed/remote_storage/track_sync_offset.go index 439491d12..25ac0d340 100644 --- a/weed/remote_storage/track_sync_offset.go +++ b/weed/remote_storage/track_sync_offset.go @@ -17,7 +17,7 @@ func GetSyncOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, dir s dirHash := uint32(util.HashStringToLong(dir)) - readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(SyncKeyPrefix + "____") util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], dirHash) @@ -46,7 +46,7 @@ func SetSyncOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, dir s dirHash := uint32(util.HashStringToLong(dir)) - return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(SyncKeyPrefix + "____") util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], dirHash) diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index 3e100497f..eaab2c13e 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -83,7 +83,7 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p } func ReadFilerSignature(grpcDialOption grpc.DialOption, filer pb.ServerAddress) (filerSignature int32, readErr error) { - if readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}); err != nil { return fmt.Errorf("GetFilerConfiguration %s: %v", filer, err) } else { diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 4c536b71c..825d2af95 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -70,7 +70,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string) var host string var auth security.EncodedJwt - if err := fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return util.Retry("assignVolume", func() error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -131,9 +131,9 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string) var _ = filer_pb.FilerClient(&FilerSink{}) -func (fs *FilerSink) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (fs *FilerSink) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, fs.grpcAddress, fs.grpcDialOption) diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index d42ca63a8..c48ab2368 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -100,7 +100,7 @@ func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bo func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error { - return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir, name := util.FullPath(key).DirAndName() @@ -156,7 +156,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent // read existing entry var existingEntry *filer_pb.Entry - err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: dir, @@ -211,7 +211,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent } // save updated meta data - return true, fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return true, fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ Directory: newParentPath, diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 60c33463f..4108f3821 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -56,7 +56,7 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) vid := volumeId(part) - err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ VolumeIds: []string{vid}, @@ -118,9 +118,9 @@ func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Hea var _ = filer_pb.FilerClient(&FilerSource{}) -func (fs *FilerSource) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (fs *FilerSource) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, fs.grpcAddress, fs.grpcDialOption) diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index 0d46ad7ca..87d478136 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -84,7 +84,7 @@ func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManag func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3ApiServerOption) (err error) { var content []byte - err = pb.WithFilerClient(option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithFilerClient(false, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { content, err = filer.ReadInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile) return err }) diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go index 888003e45..d227c609e 100644 --- a/weed/s3api/filer_util.go +++ b/weed/s3api/filer_util.go @@ -41,7 +41,7 @@ func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, incl func (s3a *S3ApiServer) rm(parentDirectoryPath, entryName string, isDeleteData, isRecursive bool) error { - return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive) if err != nil { diff --git a/weed/s3api/filer_util_tags.go b/weed/s3api/filer_util_tags.go index 75d3b37d0..e45230165 100644 --- a/weed/s3api/filer_util_tags.go +++ b/weed/s3api/filer_util_tags.go @@ -13,7 +13,7 @@ const ( func (s3a *S3ApiServer) getTags(parentDirectoryPath string, entryName string) (tags map[string]string, err error) { - err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{ Directory: parentDirectoryPath, @@ -35,7 +35,7 @@ func (s3a *S3ApiServer) getTags(parentDirectoryPath string, entryName string) (t func (s3a *S3ApiServer) setTags(parentDirectoryPath string, entryName string, tags map[string]string) (err error) { - return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{ Directory: parentDirectoryPath, @@ -71,7 +71,7 @@ func (s3a *S3ApiServer) setTags(parentDirectoryPath string, entryName string, ta func (s3a *S3ApiServer) rmTags(parentDirectoryPath string, entryName string) (err error) { - return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{ Directory: parentDirectoryPath, diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index 247e33104..c3e50ec44 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -83,7 +83,7 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request) // avoid duplicated buckets errCode := s3err.ErrNone - if err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { if resp, err := client.CollectionList(context.Background(), &filer_pb.CollectionListRequest{ IncludeEcVolumes: true, IncludeNormalVolumes: true, @@ -146,7 +146,7 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque return } - err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { // delete collection deleteCollectionRequest := &filer_pb.DeleteCollectionRequest{ diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go index e42fb6c44..4ace4bb21 100644 --- a/weed/s3api/s3api_handlers.go +++ b/weed/s3api/s3api_handlers.go @@ -13,9 +13,9 @@ import ( var _ = filer_pb.FilerClient(&S3ApiServer{}) -func (s3a *S3ApiServer) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, s3a.option.Filer.ToGrpcAddress(), s3a.option.GrpcDialOption) diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 2ac9c8102..d78332395 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -233,7 +233,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h if s3err.Logger != nil { auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone) } - s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { // delete file entries for _, object := range deleteObjects.Objects { diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go index 4decb5eac..a3b858dcb 100644 --- a/weed/s3api/s3api_objects_list_handlers.go +++ b/weed/s3api/s3api_objects_list_handlers.go @@ -146,7 +146,7 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m var nextMarker string // check filer - err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { _, isTruncated, nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, maxKeys, marker, delimiter, func(dir string, entry *filer_pb.Entry) { if entry.IsDirectory { diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index a0385f487..8e6cd8451 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -327,7 +327,7 @@ func (fs *FilerServer) CollectionList(ctx context.Context, req *filer_pb.Collect glog.V(4).Infof("CollectionList %v", req) resp = &filer_pb.CollectionListResponse{} - err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { masterResp, err := client.CollectionList(context.Background(), &master_pb.CollectionListRequest{ IncludeNormalVolumes: req.IncludeNormalVolumes, IncludeEcVolumes: req.IncludeEcVolumes, @@ -348,7 +348,7 @@ func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.Delet glog.V(4).Infof("DeleteCollection %v", req) - err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ Name: req.GetCollection(), }) @@ -362,7 +362,7 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR var output *master_pb.StatisticsResponse - err = fs.filer.MasterClient.WithClient(func(masterClient master_pb.SeaweedClient) error { + err = fs.filer.MasterClient.WithClient(false, func(masterClient master_pb.SeaweedClient) error { grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{ Replication: req.Replication, Collection: req.Collection, diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go index 7f31d8cc1..3be986023 100644 --- a/weed/server/filer_grpc_server_remote.go +++ b/weed/server/filer_grpc_server_remote.go @@ -123,7 +123,7 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req // tell filer to tell volume server to download into needles assignedServerAddress := pb.NewServerAddressWithGrpcPort(assignResult.Url, assignResult.GrpcPort) - err = operation.WithVolumeServerClient(assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{ VolumeId: uint32(fileId.VolumeId), NeedleId: uint64(fileId.Key), diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index c9343a9bf..1a5f80369 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -165,7 +165,7 @@ func (fs *FilerServer) checkWithMaster() { isConnected := false for !isConnected { for _, master := range fs.option.Masters { - readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", master, err) diff --git a/weed/server/master_grpc_server_collection.go b/weed/server/master_grpc_server_collection.go index 55f3faf8c..654da6b3c 100644 --- a/weed/server/master_grpc_server_collection.go +++ b/weed/server/master_grpc_server_collection.go @@ -58,7 +58,7 @@ func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error { } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collectionName, }) @@ -78,7 +78,7 @@ func (ms *MasterServer) doDeleteEcCollection(collectionName string) error { listOfEcServers := ms.Topo.ListEcServersByCollection(collectionName) for _, server := range listOfEcServers { - err := operation.WithVolumeServerClient(server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collectionName, }) diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 41a2b570b..72d4e20d7 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -27,7 +27,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R return } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collection.Name, }) diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 2659307fc..f3f99ee7b 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -26,7 +26,7 @@ func (vs *VolumeServer) GetMaster() pb.ServerAddress { func (vs *VolumeServer) checkWithMaster() (err error) { for { for _, master := range vs.SeedMasterNodes { - err = operation.WithMasterServerClient(master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + err = operation.WithMasterServerClient(false, master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", master, err) diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 9d9f756ce..52181a771 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -45,7 +45,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre // confirm size and timestamp var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse var dataBaseFileName, indexBaseFileName, idxFileName, datFileName string - err := operation.WithVolumeServerClient(pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { var err error volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(), &volume_server_pb.ReadVolumeFileStatusRequest{ @@ -226,7 +226,7 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s if receiveErr == io.EOF { break } - if resp!=nil && resp.ModifiedTsNs != 0 { + if resp != nil && resp.ModifiedTsNs != 0 { modifiedTsNs = resp.ModifiedTsNs } if receiveErr != nil { diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 861d352d7..79611f499 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -126,7 +126,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId)) indexBaseFileName := storage.VolumeFileName(location.IdxDirectory, req.Collection, int(req.VolumeId)) - err := operation.WithVolumeServerClient(pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // copy ec data slices for _, shardId := range req.ShardIds { diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 6c1de3154..018daed8b 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -120,9 +120,9 @@ func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { var _ = filer_pb.FilerClient(&WebDavFileSystem{}) -func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, fs.option.Filer.ToGrpcAddress(), fs.option.GrpcDialOption) @@ -162,7 +162,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm return os.ErrExist } - return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir, name := util.FullPath(fullDirPath).DirAndName() request := &filer_pb.CreateEntryRequest{ Directory: dir, @@ -212,7 +212,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f } dir, name := util.FullPath(fullFilePath).DirAndName() - err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ Directory: dir, Entry: &filer_pb.Entry{ @@ -315,7 +315,7 @@ func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) oldDir, oldBaseName := util.FullPath(oldName).DirAndName() newDir, newBaseName := util.FullPath(newName).DirAndName() - return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AtomicRenameEntryRequest{ OldDirectory: oldDir, @@ -375,7 +375,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64 var fileId, host string var auth security.EncodedJwt - if flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { ctx := context.Background() @@ -477,7 +477,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { f.entry.Chunks = manifestedChunks } - flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { f.entry.Attributes.Mtime = time.Now().Unix() f.entry.Attributes.Collection = f.collection f.entry.Attributes.Replication = f.replication diff --git a/weed/shell/command_cluster_ps.go b/weed/shell/command_cluster_ps.go index 5ed1677c8..2d391de1d 100644 --- a/weed/shell/command_cluster_ps.go +++ b/weed/shell/command_cluster_ps.go @@ -36,7 +36,7 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W return nil } - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ ClientType: cluster.FilerType, }) diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go index 8942c15da..55c8ddf19 100644 --- a/weed/shell/command_collection_delete.go +++ b/weed/shell/command_collection_delete.go @@ -53,7 +53,7 @@ func (c *commandCollectionDelete) Do(args []string, commandEnv *CommandEnv, writ return nil } - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ Name: *collectionName, }) diff --git a/weed/shell/command_collection_list.go b/weed/shell/command_collection_list.go index ba502a6b9..b2aa6d2f4 100644 --- a/weed/shell/command_collection_list.go +++ b/weed/shell/command_collection_list.go @@ -62,7 +62,7 @@ func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer func ListCollectionNames(commandEnv *CommandEnv, includeNormalVolumes, includeEcVolumes bool) (collections []string, err error) { var resp *master_pb.CollectionListResponse - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err = client.CollectionList(context.Background(), &master_pb.CollectionListRequest{ IncludeNormalVolumes: includeNormalVolumes, IncludeEcVolumes: includeEcVolumes, diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 51c8c32cd..765c0ad89 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -61,7 +61,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) targetAddress := pb.NewServerAddressFromDataNode(targetServer.info) - err = operation.WithVolumeServerClient(targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { if targetAddress != existingLocation { @@ -241,7 +241,7 @@ func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection strin fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation) - return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{ VolumeId: uint32(volumeId), Collection: collection, @@ -256,7 +256,7 @@ func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, s fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation) - return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{ VolumeId: uint32(volumeId), ShardIds: toBeUnmountedhardIds, @@ -269,7 +269,7 @@ func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId n fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation) - return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index b2ca605c7..288fa4945 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -116,7 +116,7 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error { // mount volume - if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + if err := operation.WithVolumeServerClient(false, targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{ VolumeId: uint32(vid), }) @@ -149,7 +149,7 @@ func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, c fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer) - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{ VolumeId: uint32(vid), Collection: collection, @@ -187,7 +187,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddr continue } - err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation) @@ -223,7 +223,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddr func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation, err error) { var resp *master_pb.LookupVolumeResponse - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err = client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{VolumeOrFileIds: volumeIds}) return err }) @@ -236,7 +236,7 @@ func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocati func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) { var resp *master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) return err }) diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 6add14749..fcdee264e 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -126,7 +126,7 @@ func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer) - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index 409ec4329..f5d1166d2 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -172,7 +172,7 @@ func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection st func generateMissingShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress) (rebuiltShardIds []uint32, err error) { - err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{ VolumeId: uint32(volumeId), Collection: collection, @@ -213,7 +213,7 @@ func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection var copyErr error if applyBalancing { - copyErr = operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(rebuilder.info), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + copyErr = operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(rebuilder.info), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go index a5731240d..17e9c6550 100644 --- a/weed/shell/command_fs_cat.go +++ b/weed/shell/command_fs_cat.go @@ -41,7 +41,7 @@ func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Write dir, name := util.FullPath(path).DirAndName() - return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go index ece805db3..73bb8e5c6 100644 --- a/weed/shell/command_fs_configure.go +++ b/weed/shell/command_fs_configure.go @@ -117,7 +117,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io if *apply { - if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return filer.SaveInsideFiler(client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf2.Bytes()) }); err != nil && err != filer_pb.ErrNotFound { return err diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go index e0525defa..a7de6d3ef 100644 --- a/weed/shell/command_fs_meta_cat.go +++ b/weed/shell/command_fs_meta_cat.go @@ -40,7 +40,7 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W dir, name := util.FullPath(path).DirAndName() - return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go index 46dc07e9a..7fe4cf809 100644 --- a/weed/shell/command_fs_meta_load.go +++ b/weed/shell/command_fs_meta_load.go @@ -48,7 +48,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io. var dirCount, fileCount uint64 - err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { sizeBuf := make([]byte, 4) diff --git a/weed/shell/command_fs_mkdir.go b/weed/shell/command_fs_mkdir.go index 11b663eec..71a35cb68 100644 --- a/weed/shell/command_fs_mkdir.go +++ b/weed/shell/command_fs_mkdir.go @@ -36,7 +36,7 @@ func (c *commandFsMkdir) Do(args []string, commandEnv *CommandEnv, writer io.Wri dir, name := util.FullPath(path).DirAndName() - err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { _, createErr := client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{ Directory: dir, diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go index 2448c8f61..612cdc84e 100644 --- a/weed/shell/command_fs_mv.go +++ b/weed/shell/command_fs_mv.go @@ -54,7 +54,7 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer destinationDir, destinationName := util.FullPath(destinationPath).DirAndName() - return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { // collect destination entry info destinationRequest := &filer_pb.LookupDirectoryEntryRequest{ diff --git a/weed/shell/command_fs_rm.go b/weed/shell/command_fs_rm.go index b383366ca..2ce3275bb 100644 --- a/weed/shell/command_fs_rm.go +++ b/weed/shell/command_fs_rm.go @@ -56,7 +56,7 @@ func (c *commandFsRm) Do(args []string, commandEnv *CommandEnv, writer io.Writer return fmt.Errorf("need to have arguments") } - commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { for _, entry := range entiries { targetPath, err := commandEnv.parseUrl(entry) if err != nil { diff --git a/weed/shell/command_remote_configure.go b/weed/shell/command_remote_configure.go index 3fa905237..c892c3443 100644 --- a/weed/shell/command_remote_configure.go +++ b/weed/shell/command_remote_configure.go @@ -184,7 +184,7 @@ func (c *commandRemoteConfigure) listExistingRemoteStorages(commandEnv *CommandE func (c *commandRemoteConfigure) deleteRemoteStorage(commandEnv *CommandEnv, writer io.Writer, storageName string) error { - return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.DeleteEntryRequest{ Directory: filer.DirectoryEtcRemote, @@ -214,7 +214,7 @@ func (c *commandRemoteConfigure) saveRemoteStorage(commandEnv *CommandEnv, write return err } - if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return filer.SaveInsideFiler(client, filer.DirectoryEtcRemote, conf.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, data) }); err != nil && err != filer_pb.ErrNotFound { return err diff --git a/weed/shell/command_remote_meta_sync.go b/weed/shell/command_remote_meta_sync.go index 277c4c2be..e09a66761 100644 --- a/weed/shell/command_remote_meta_sync.go +++ b/weed/shell/command_remote_meta_sync.go @@ -117,7 +117,7 @@ func pullMetadata(commandEnv *CommandEnv, writer io.Writer, localMountedDir util remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToCache) - err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { ctx := context.Background() err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error { localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir) diff --git a/weed/shell/command_remote_mount.go b/weed/shell/command_remote_mount.go index 2b57db707..019bc93c9 100644 --- a/weed/shell/command_remote_mount.go +++ b/weed/shell/command_remote_mount.go @@ -117,7 +117,7 @@ func jsonPrintln(writer io.Writer, message proto.Message) error { func syncMetadata(commandEnv *CommandEnv, writer io.Writer, dir string, nonEmpty bool, remoteConf *remote_pb.RemoteConf, remote *remote_pb.RemoteStorageLocation) error { // find existing directory, and ensure the directory is empty - err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { parent, name := util.FullPath(dir).DirAndName() _, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ Directory: parent, diff --git a/weed/shell/command_remote_uncache.go b/weed/shell/command_remote_uncache.go index 53d043ebf..a3433621e 100644 --- a/weed/shell/command_remote_uncache.go +++ b/weed/shell/command_remote_uncache.go @@ -105,7 +105,7 @@ func (c *commandRemoteUncache) uncacheContentData(commandEnv *CommandEnv, writer fmt.Fprintf(writer, "Uncache %+v ... ", dir.Child(entry.Name)) - err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { _, updateErr := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ Directory: string(dir), Entry: entry, diff --git a/weed/shell/command_remote_unmount.go b/weed/shell/command_remote_unmount.go index c947a19e6..49b07f8f1 100644 --- a/weed/shell/command_remote_unmount.go +++ b/weed/shell/command_remote_unmount.go @@ -83,7 +83,7 @@ func (c *commandRemoteUnmount) Do(args []string, commandEnv *CommandEnv, writer func (c *commandRemoteUnmount) purgeMountedData(commandEnv *CommandEnv, dir string) error { // find existing directory, and ensure the directory is empty - err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { parent, name := util.FullPath(dir).DirAndName() lookupResp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ Directory: parent, diff --git a/weed/shell/command_s3_bucket_create.go b/weed/shell/command_s3_bucket_create.go index a512ffc4a..35ecae4ee 100644 --- a/weed/shell/command_s3_bucket_create.go +++ b/weed/shell/command_s3_bucket_create.go @@ -45,7 +45,7 @@ func (c *commandS3BucketCreate) Do(args []string, commandEnv *CommandEnv, writer return fmt.Errorf("empty bucket name") } - err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { diff --git a/weed/shell/command_s3_bucket_delete.go b/weed/shell/command_s3_bucket_delete.go index 26953c249..5f8585b0b 100644 --- a/weed/shell/command_s3_bucket_delete.go +++ b/weed/shell/command_s3_bucket_delete.go @@ -52,7 +52,7 @@ func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer } // delete the collection directly first - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ Name: *bucketName, }) diff --git a/weed/shell/command_s3_bucket_list.go b/weed/shell/command_s3_bucket_list.go index 0c4e8d18f..a344a79a4 100644 --- a/weed/shell/command_s3_bucket_list.go +++ b/weed/shell/command_s3_bucket_list.go @@ -65,7 +65,7 @@ func (c *commandS3BucketList) Do(args []string, commandEnv *CommandEnv, writer i } func readFilerBucketsPath(filerClient filer_pb.FilerClient) (filerBucketsPath string, err error) { - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { diff --git a/weed/shell/command_s3_configure.go b/weed/shell/command_s3_configure.go index 5eab2ebd0..cefb1deeb 100644 --- a/weed/shell/command_s3_configure.go +++ b/weed/shell/command_s3_configure.go @@ -48,7 +48,7 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io } var buf bytes.Buffer - if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return filer.ReadEntry(commandEnv.MasterClient, client, filer.IamConfigDirecotry, filer.IamIdentityFile, &buf) }); err != nil && err != filer_pb.ErrNotFound { return err @@ -171,7 +171,7 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io if *apply { - if err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile, buf.Bytes()) }); err != nil { return err diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 643cccac3..cf3bc604a 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -182,7 +182,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) { - err = operation.WithVolumeServerClient(sourceVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, sourceVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{ VolumeId: volumeId, NeedleId: uint64(needleValue.Key), @@ -200,7 +200,7 @@ func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.Serv func (c *commandVolumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error { - return operation.WithVolumeServerClient(targetVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, targetVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{ VolumeId: volumeId, NeedleId: uint64(needleValue.Key), @@ -229,7 +229,7 @@ func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collect func (c *commandVolumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer, verbose bool, writer io.Writer) error { - return operation.WithVolumeServerClient(volumeServer, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(true, volumeServer, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { ext := ".idx" diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go index 27cba618b..5c4c10146 100644 --- a/weed/shell/command_volume_configure_replication.go +++ b/weed/shell/command_volume_configure_replication.go @@ -86,7 +86,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman } for _, dst := range allLocations { - err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{ VolumeId: uint32(vid), Replication: replicaPlacement.String(), diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 2885ba11f..43bf4d0f8 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -265,7 +265,7 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co break } - err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ VolumeId: replica.info.Id, SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)), diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index a7a981339..7a7cdee56 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -248,7 +248,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server) } - return operation.WithVolumeServerClient(vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { ext := ".idx" if vinfo.isEcVolume { diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go index 575051ffe..7b03b8dfa 100644 --- a/weed/shell/command_volume_mount.go +++ b/weed/shell/command_volume_mount.go @@ -55,7 +55,7 @@ func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io } func mountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{ VolumeId: uint32(volumeId), }) diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go index 796f74264..d7cc8b8ce 100644 --- a/weed/shell/command_volume_move.go +++ b/weed/shell/command_volume_move.go @@ -112,7 +112,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl return } - clientErr := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + clientErr := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, writableErr := volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{ VolumeId: uint32(volumeId), }) @@ -123,7 +123,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl } }() - err = operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, statusErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{ VolumeId: uint32(volumeId), }) @@ -140,7 +140,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl return } - err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ VolumeId: uint32(volumeId), SourceDataNode: string(sourceVolumeServer), @@ -173,7 +173,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) { - return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, replicateErr := volumeServerClient.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{ VolumeId: uint32(volumeId), SinceNs: lastAppendAtNs, @@ -186,7 +186,7 @@ func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, source } func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{ VolumeId: uint32(volumeId), }) @@ -195,7 +195,7 @@ func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sour } func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable bool) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { if writable { _, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{ VolumeId: uint32(volumeId), diff --git a/weed/shell/command_volume_server_leave.go b/weed/shell/command_volume_server_leave.go index 4daa589be..029ef2201 100644 --- a/weed/shell/command_volume_server_leave.go +++ b/weed/shell/command_volume_server_leave.go @@ -56,7 +56,7 @@ func (c *commandVolumeServerLeave) Do(args []string, commandEnv *CommandEnv, wri } func volumeServerLeave(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddress, writer io.Writer) (err error) { - return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, leaveErr := volumeServerClient.VolumeServerLeave(context.Background(), &volume_server_pb.VolumeServerLeaveRequest{}) if leaveErr != nil { fmt.Fprintf(writer, "ask volume server %s to leave: %v\n", volumeServer, leaveErr) diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go index 57d3bf347..a2330ab8a 100644 --- a/weed/shell/command_volume_tier_download.go +++ b/weed/shell/command_volume_tier_download.go @@ -124,7 +124,7 @@ func doVolumeTierDownload(commandEnv *CommandEnv, writer io.Writer, collection s func downloadDatFromRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer pb.ServerAddress) error { - err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go index a22fe92a1..0df0790e6 100644 --- a/weed/shell/command_volume_tier_upload.go +++ b/weed/shell/command_volume_tier_upload.go @@ -118,7 +118,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress, dest string, keepLocalDatFile bool) error { - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatToRemoteRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go index d5cb9f07c..85bec44f7 100644 --- a/weed/shell/command_volume_unmount.go +++ b/weed/shell/command_volume_unmount.go @@ -55,7 +55,7 @@ func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer } func unmountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{ VolumeId: uint32(volumeId), }) diff --git a/weed/shell/command_volume_vacuum.go b/weed/shell/command_volume_vacuum.go index 2e09a8c1b..a09bf5d56 100644 --- a/weed/shell/command_volume_vacuum.go +++ b/weed/shell/command_volume_vacuum.go @@ -39,7 +39,7 @@ func (c *commandVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writ return } - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err = client.VacuumVolume(context.Background(), &master_pb.VacuumVolumeRequest{ GarbageThreshold: float32(*garbageThreshold), }) diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 02c0af59e..985f6423b 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -97,9 +97,9 @@ func (ce *CommandEnv) checkDirectory(path string) error { var _ = filer_pb.FilerClient(&CommandEnv{}) -func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (ce *CommandEnv) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithGrpcFilerClient(ce.option.FilerAddress, ce.option.GrpcDialOption, fn) + return pb.WithGrpcFilerClient(streamingMode, ce.option.FilerAddress, ce.option.GrpcDialOption, fn) } diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go index caf8da859..0aa65f049 100644 --- a/weed/shell/shell_liner.go +++ b/weed/shell/shell_liner.go @@ -53,7 +53,7 @@ func RunShell(options ShellOptions) { if commandEnv.option.FilerAddress == "" { var filers []pb.ServerAddress - commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ ClientType: cluster.FilerType, }) @@ -75,7 +75,7 @@ func RunShell(options ShellOptions) { } if commandEnv.option.FilerAddress != "" { - commandEnv.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error { + commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error { resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return err diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 0e33a8e32..70e1593a0 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -239,7 +239,7 @@ func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume) glog.V(3).Infof("lookup and cache ec volume %d locations", ecVolume.VolumeId) - err = operation.WithMasterServerClient(s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + err = operation.WithMasterServerClient(false, s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.LookupEcVolumeRequest{ VolumeId: uint32(ecVolume.VolumeId), } @@ -287,7 +287,7 @@ func (s *Store) readRemoteEcShardInterval(sourceDataNodes []pb.ServerAddress, ne func (s *Store) doReadRemoteEcShardInterval(sourceDataNode pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { - err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // copy data slice shardReadClient, err := client.VolumeEcShardRead(context.Background(), &volume_server_pb.VolumeEcShardReadRequest{ diff --git a/weed/storage/store_ec_delete.go b/weed/storage/store_ec_delete.go index d40165ff5..398771a83 100644 --- a/weed/storage/store_ec_delete.go +++ b/weed/storage/store_ec_delete.go @@ -88,7 +88,7 @@ func (s *Store) doDeleteNeedleFromRemoteEcShardServers(shardId erasure_coding.Sh func (s *Store) doDeleteNeedleFromRemoteEcShard(sourceDataNode pb.ServerAddress, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error { - return operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // copy data slice _, err := client.VolumeEcBlobDelete(context.Background(), &volume_server_pb.VolumeEcBlobDeleteRequest{ diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go index a8ec50cad..67406aff6 100644 --- a/weed/storage/volume_backup.go +++ b/weed/storage/volume_backup.go @@ -73,7 +73,7 @@ func (v *Volume) IncrementalBackup(volumeServer pb.ServerAddress, grpcDialOption writeOffset := int64(startFromOffset) - err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { stream, err := client.VolumeIncrementalCopy(context.Background(), &volume_server_pb.VolumeIncrementalCopyRequest{ VolumeId: uint32(v.Id), diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go index 83043c23f..e1ae30c2d 100644 --- a/weed/topology/allocate_volume.go +++ b/weed/topology/allocate_volume.go @@ -15,7 +15,7 @@ type AllocateVolumeResult struct { func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.VolumeId, option *VolumeGrowOption) error { - return operation.WithVolumeServerClient(dn.ServerAddress(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{ VolumeId: uint32(vid), diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index 40e3f70e7..74d70bcdb 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -22,7 +22,7 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne errCount := int32(0) for index, dn := range locationlist.list { go func(index int, url pb.ServerAddress, vid needle.VolumeId) { - err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ VolumeId: uint32(vid), }) @@ -70,7 +70,7 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl * for index, dn := range locationlist.list { go func(index int, url pb.ServerAddress, vid needle.VolumeId) { glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url) - err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { stream, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{ VolumeId: uint32(vid), Preallocate: preallocate, @@ -121,7 +121,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V isReadOnly := false for _, dn := range locationlist.list { glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url()) - err := operation.WithVolumeServerClient(dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{ VolumeId: uint32(vid), }) @@ -147,7 +147,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) { for _, dn := range locationlist.list { glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url()) - err := operation.WithVolumeServerClient(dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{ VolumeId: uint32(vid), }) diff --git a/weed/wdclient/exclusive_locks/exclusive_locker.go b/weed/wdclient/exclusive_locks/exclusive_locker.go index 725fa307d..1767ee4a4 100644 --- a/weed/wdclient/exclusive_locks/exclusive_locker.go +++ b/weed/wdclient/exclusive_locks/exclusive_locker.go @@ -54,7 +54,7 @@ func (l *ExclusiveLocker) RequestLock(clientName string) { // retry to get the lease for { - if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { + if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err := client.LeaseAdminToken(ctx, &master_pb.LeaseAdminTokenRequest{ PreviousToken: atomic.LoadInt64(&l.token), PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), @@ -82,7 +82,7 @@ func (l *ExclusiveLocker) RequestLock(clientName string) { defer cancel2() for l.isLocking { - if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { + if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{ PreviousToken: atomic.LoadInt64(&l.token), PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), @@ -114,7 +114,7 @@ func (l *ExclusiveLocker) ReleaseLock() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { + l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error { client.ReleaseAdminToken(ctx, &master_pb.ReleaseAdminTokenRequest{ PreviousToken: atomic.LoadInt64(&l.token), PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 727d9cd34..672b3ac49 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -59,7 +59,7 @@ func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddres if master == myMasterAddress { continue } - if grpcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond) defer cancel() resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{}) @@ -96,7 +96,7 @@ func (mc *MasterClient) tryAllMasters() { func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) { glog.V(1).Infof("%s masterClient Connecting to master %v", mc.clientType, master) - gprcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -172,12 +172,12 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL return } -func (mc *MasterClient) WithClient(fn func(client master_pb.SeaweedClient) error) error { +func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error { return util.Retry("master grpc", func() error { for mc.currentMaster == "" { time.Sleep(3 * time.Second) } - return pb.WithMasterClient(mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + return pb.WithMasterClient(streamingMode, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { return fn(client) }) })