diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go index 4de864980..6107f3d48 100644 --- a/unmaintained/diff_volume_servers/diff_volume_servers.go +++ b/unmaintained/diff_volume_servers/diff_volume_servers.go @@ -124,7 +124,9 @@ type needleState struct { func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int64, error) { var idxFile *bytes.Reader err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { - copyFileClient, err := vs.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + copyFileClient, err := vs.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ VolumeId: v, Ext: ".idx", CompactionRevision: math.MaxUint32, diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 58905cb99..c1e73aaf9 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -117,7 +117,9 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string for { err := pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - stream, err := client.SubscribeLocalMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{ + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ ClientName: "filer:" + self, PathPrefix: "/", SinceNs: lastTsNs, diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index 0e417e0ab..b9e9e300b 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -29,6 +29,8 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector // update remote filer err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() request := &filer_pb.AtomicRenameEntryRequest{ OldDirectory: dir.FullPath(), @@ -37,7 +39,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector NewName: req.NewName, } - _, err := client.AtomicRenameEntry(context.Background(), request) + _, err := client.AtomicRenameEntry(ctx, request) if err != nil { return fuse.EIO } diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go index 0c04d2841..06162471c 100644 --- a/weed/messaging/broker/broker_server.go +++ b/weed/messaging/broker/broker_server.go @@ -48,7 +48,9 @@ func (broker *MessageBroker) keepConnectedToOneFiler() { for { for _, filer := range broker.option.Filers { broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { - stream, err := client.KeepConnected(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stream, err := client.KeepConnected(ctx) if err != nil { glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err) return err diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go index 3cd66b5da..a15c21ae4 100644 --- a/weed/operation/tail_volume.go +++ b/weed/operation/tail_volume.go @@ -28,8 +28,10 @@ func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.Volume func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error { return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - stream, err := client.VolumeTailSender(context.Background(), &volume_server_pb.VolumeTailSenderRequest{ + stream, err := client.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{ VolumeId: uint32(vid), SinceNs: sinceNs, IdleTimeoutSeconds: uint32(idleTimeoutSeconds), diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go index f2db152c5..4eacfa2e5 100644 --- a/weed/pb/filer_pb/filer_client.go +++ b/weed/pb/filer_pb/filer_client.go @@ -85,11 +85,11 @@ func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, f glog.V(4).Infof("read directory: %v", request) ctx, cancel := context.WithCancel(context.Background()) + defer cancel() stream, err := client.ListEntries(ctx, request) if err != nil { return fmt.Errorf("list %s: %v", fullDirPath, err) } - defer cancel() var prevEntry *Entry for { diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go index b6779dfb7..30d566f94 100644 --- a/weed/s3api/s3api_objects_list_handlers.go +++ b/weed/s3api/s3api_objects_list_handlers.go @@ -212,7 +212,9 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d InclusiveStartFrom: false, } - stream, listErr := client.ListEntries(context.Background(), request) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stream, listErr := client.ListEntries(ctx, request) if listErr != nil { err = fmt.Errorf("list entires %+v: %v", request, listErr) return diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 3a1b95f26..ac94ff9d4 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -58,14 +58,17 @@ func (vs *VolumeServer) heartbeat() { func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) { - grpcConection, err := pb.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + grpcConection, err := pb.GrpcDial(ctx, masterGrpcAddress, grpcDialOption) if err != nil { return "", fmt.Errorf("fail to dial %s : %v", masterNode, err) } defer grpcConection.Close() client := master_pb.NewSeaweedClient(grpcConection) - stream, err := client.SendHeartbeat(context.Background()) + stream, err := client.SendHeartbeat(ctx) if err != nil { glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err) return "", err diff --git a/weed/wdclient/exclusive_locks/exclusive_locker.go b/weed/wdclient/exclusive_locks/exclusive_locker.go index 1ecfe6ce2..801de14ce 100644 --- a/weed/wdclient/exclusive_locks/exclusive_locker.go +++ b/weed/wdclient/exclusive_locks/exclusive_locker.go @@ -46,10 +46,13 @@ func (l *ExclusiveLocker) RequestLock() { return } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // retry to get the lease for { if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { - resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{ + resp, err := client.LeaseAdminToken(ctx, &master_pb.LeaseAdminTokenRequest{ PreviousToken: atomic.LoadInt64(&l.token), PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), LockName: AdminLockName, @@ -73,7 +76,7 @@ func (l *ExclusiveLocker) RequestLock() { go func() { for l.isLocking { if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { - resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{ + resp, err := client.LeaseAdminToken(ctx, &master_pb.LeaseAdminTokenRequest{ PreviousToken: atomic.LoadInt64(&l.token), PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), LockName: AdminLockName, @@ -98,8 +101,12 @@ func (l *ExclusiveLocker) RequestLock() { func (l *ExclusiveLocker) ReleaseLock() { l.isLocking = false + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { - client.ReleaseAdminToken(context.Background(), &master_pb.ReleaseAdminTokenRequest{ + client.ReleaseAdminToken(ctx, &master_pb.ReleaseAdminTokenRequest{ PreviousToken: atomic.LoadInt64(&l.token), PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), LockName: AdminLockName, diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 4c066d535..3d23d8f13 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -70,7 +70,10 @@ func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader stri glog.V(1).Infof("%s masterClient Connecting to master %v", mc.clientType, master) gprcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { - stream, err := client.KeepConnected(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stream, err := client.KeepConnected(ctx) if err != nil { glog.V(0).Infof("%s masterClient failed to keep connected to %s: %v", mc.clientType, master, err) return err