diff --git a/unmaintained/repeated_vacuum/repeated_vacuum.go b/unmaintained/repeated_vacuum/repeated_vacuum.go index 231694094..893f4a68a 100644 --- a/unmaintained/repeated_vacuum/repeated_vacuum.go +++ b/unmaintained/repeated_vacuum/repeated_vacuum.go @@ -7,6 +7,7 @@ import ( "log" "math/rand" "time" + "context" "google.golang.org/grpc" @@ -53,7 +54,7 @@ func main() { } func genFile(grpcDialOption grpc.DialOption, i int) (*operation.AssignResult, string) { - assignResult, err := operation.Assign(func() pb.ServerAddress { return pb.ServerAddress(*master) }, grpcDialOption, &operation.VolumeAssignRequest{ + assignResult, err := operation.Assign(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*master) }, grpcDialOption, &operation.VolumeAssignRequest{ Count: 1, Replication: *replication, }) diff --git a/unmaintained/volume_tailer/volume_tailer.go b/unmaintained/volume_tailer/volume_tailer.go index 8a352dbb8..50ef1c74f 100644 --- a/unmaintained/volume_tailer/volume_tailer.go +++ b/unmaintained/volume_tailer/volume_tailer.go @@ -5,6 +5,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" "log" "time" + "context" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/security" @@ -38,7 +39,7 @@ func main() { sinceTimeNs = time.Now().Add(-*rewindDuration).UnixNano() } - err := operation.TailVolume(func() pb.ServerAddress { return pb.ServerAddress(*master) }, grpcDialOption, vid, uint64(sinceTimeNs), *timeoutSeconds, func(n *needle.Needle) (err error) { + err := operation.TailVolume(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*master) }, grpcDialOption, vid, uint64(sinceTimeNs), *timeoutSeconds, func(n *needle.Needle) (err error) { if n.Size == 0 { println("-", n.String()) return nil diff --git a/weed/command/backup.go b/weed/command/backup.go index bbb6c6724..a8be4838e 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -1,6 +1,7 @@ package command import ( + "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -74,7 +75,7 @@ func runBackup(cmd *Command, args []string) bool { vid := needle.VolumeId(*s.volumeId) // find volume location, replication, ttl info - lookup, err := operation.LookupVolumeId(func() pb.ServerAddress { return pb.ServerAddress(*s.master) }, grpcDialOption, vid.String()) + lookup, err := operation.LookupVolumeId(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*s.master) }, grpcDialOption, vid.String()) if err != nil { fmt.Printf("Error looking up volume %d: %v\n", vid, err) return true diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 7f9a23cf8..2a0db47c2 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -2,6 +2,7 @@ package command import ( "bufio" + "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" "io" @@ -128,8 +129,9 @@ func runBenchmark(cmd *Command, args []string) bool { } b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", "", *pb.ServerAddresses(*b.masters).ToServiceDiscovery()) - go b.masterClient.KeepConnectedToMaster() - b.masterClient.WaitUntilConnected() + ctx := context.Background() + go b.masterClient.KeepConnectedToMaster(ctx) + b.masterClient.WaitUntilConnected(ctx) if *b.write { benchWrite() @@ -210,7 +212,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { } var jwtAuthorization security.EncodedJwt if isSecure { - jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(), b.grpcDialOption, df.fp.Fid) + jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(context.Background()), b.grpcDialOption, df.fp.Fid) } if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil { s.completed++ diff --git a/weed/command/download.go b/weed/command/download.go index de33643fc..060be9f14 100644 --- a/weed/command/download.go +++ b/weed/command/download.go @@ -1,6 +1,7 @@ package command import ( + "context" "fmt" "io" "net/http" @@ -50,7 +51,7 @@ func runDownload(cmd *Command, args []string) bool { grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") for _, fid := range args { - if e := downloadToFile(func() pb.ServerAddress { return pb.ServerAddress(*d.server) }, grpcDialOption, fid, util.ResolvePath(*d.dir)); e != nil { + if e := downloadToFile(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*d.server) }, grpcDialOption, fid, util.ResolvePath(*d.dir)); e != nil { fmt.Println("Download Error: ", fid, e) } } diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 83c5b167e..df5e002c5 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -472,7 +472,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, for _, chunk := range chunks { fileIds = append(fileIds, chunk.FileId) } - operation.DeleteFiles(func() pb.ServerAddress { + operation.DeleteFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(copy.masters[0]) }, false, worker.options.grpcDialOption, fileIds) return uploadError diff --git a/weed/command/master.go b/weed/command/master.go index 6a32b8abe..f80d8faeb 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -1,6 +1,7 @@ package command import ( + "context" "fmt" "net/http" "os" @@ -218,7 +219,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { }() } - go ms.MasterClient.KeepConnectedToMaster() + go ms.MasterClient.KeepConnectedToMaster(context.Background()) // start http server var ( diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go index 64583c602..7217aff0b 100644 --- a/weed/command/master_follower.go +++ b/weed/command/master_follower.go @@ -140,7 +140,7 @@ func startMasterFollower(masterOptions MasterOptions) { } go grpcS.Serve(grpcL) - go ms.MasterClient.KeepConnectedToMaster() + go ms.MasterClient.KeepConnectedToMaster(context.Background()) // start http server httpS := &http.Server{Handler: r} diff --git a/weed/command/upload.go b/weed/command/upload.go index 1f03f7b5a..3e6b8f9a2 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -97,7 +97,7 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { return e } - results, e := operation.SubmitFiles(func() pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) + results, e := operation.SubmitFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) if e != nil { @@ -119,7 +119,7 @@ func runUpload(cmd *Command, args []string) bool { fmt.Println(e.Error()) return false } - results, err := operation.SubmitFiles(func() pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) + results, err := operation.SubmitFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) if err != nil { fmt.Println(err.Error()) return false diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 9be8d5259..016bfc8fa 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -143,8 +143,8 @@ func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*maste } -func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate) { - return cluster.ListExistingPeerUpdates(f.GetMaster(), f.GrpcDialOption, f.MasterClient.FilerGroup, cluster.FilerType) +func (f *Filer) ListExistingPeerUpdates(ctx context.Context) (existingNodes []*master_pb.ClusterNodeUpdate) { + return cluster.ListExistingPeerUpdates(f.GetMaster(ctx), f.GrpcDialOption, f.MasterClient.FilerGroup, cluster.FilerType) } func (f *Filer) SetStore(store FilerStore) (isFresh bool) { @@ -177,12 +177,12 @@ func (f *Filer) GetStore() (store FilerStore) { return f.Store } -func (fs *Filer) GetMaster() pb.ServerAddress { - return fs.MasterClient.GetMaster() +func (fs *Filer) GetMaster(ctx context.Context) pb.ServerAddress { + return fs.MasterClient.GetMaster(ctx) } -func (fs *Filer) KeepMasterClientConnected() { - fs.MasterClient.KeepConnectedToMaster() +func (fs *Filer) KeepMasterClientConnected(ctx context.Context) { + fs.MasterClient.KeepConnectedToMaster(ctx) } func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) { diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 9c321744b..9e62fe996 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -1,6 +1,7 @@ package broker import ( + "context" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator" @@ -68,9 +69,9 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial pub_broker_balancer.OnAddBroker = mqBroker.Coordinator.OnSubAddBroker pub_broker_balancer.OnRemoveBroker = mqBroker.Coordinator.OnSubRemoveBroker - go mqBroker.MasterClient.KeepConnectedToMaster() + go mqBroker.MasterClient.KeepConnectedToMaster(context.Background()) - existingNodes := cluster.ListExistingPeerUpdates(mqBroker.MasterClient.GetMaster(), grpcDialOption, option.FilerGroup, cluster.FilerType) + existingNodes := cluster.ListExistingPeerUpdates(mqBroker.MasterClient.GetMaster(context.Background()), grpcDialOption, option.FilerGroup, cluster.FilerType) for _, newNode := range existingNodes { mqBroker.OnBrokerUpdate(newNode, time.Now()) } diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index 1b7a0146d..cc8e87b21 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -47,9 +47,9 @@ func NewAssignProxy(masterFn GetMasterFn, grpcDialOption grpc.DialOption, concur ap = &AssignProxy{ pool: make(chan *singleThreadAssignProxy, concurrency), } - ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn().ToGrpcAddress(), true, grpcDialOption) + ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn(context.Background()).ToGrpcAddress(), true, grpcDialOption) if err != nil { - return nil, fmt.Errorf("fail to dial %s: %v", masterFn().ToGrpcAddress(), err) + return nil, fmt.Errorf("fail to dial %s: %v", masterFn(context.Background()).ToGrpcAddress(), err) } for i := 0; i < concurrency; i++ { ap.pool <- &singleThreadAssignProxy{} @@ -153,7 +153,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest continue } - lastError = WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + lastError = WithMasterServerClient(false, masterFn(context.Background()), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.AssignRequest{ Count: request.Count, Replication: request.Replication, diff --git a/weed/operation/assign_file_id_test.go b/weed/operation/assign_file_id_test.go index f6362dceb..ac0f4eee6 100644 --- a/weed/operation/assign_file_id_test.go +++ b/weed/operation/assign_file_id_test.go @@ -1,6 +1,7 @@ package operation import ( + "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" "google.golang.org/grpc" @@ -11,7 +12,7 @@ import ( func BenchmarkWithConcurrency(b *testing.B) { concurrencyLevels := []int{1, 10, 100, 1000} - ap, _ := NewAssignProxy(func() pb.ServerAddress { + ap, _ := NewAssignProxy(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress("localhost:9333") }, grpc.WithInsecure(), 16) @@ -47,7 +48,7 @@ func BenchmarkWithConcurrency(b *testing.B) { } func BenchmarkStreamAssign(b *testing.B) { - ap, _ := NewAssignProxy(func() pb.ServerAddress { + ap, _ := NewAssignProxy(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress("localhost:9333") }, grpc.WithInsecure(), 16) for i := 0; i < b.N; i++ { @@ -59,7 +60,7 @@ func BenchmarkStreamAssign(b *testing.B) { func BenchmarkUnaryAssign(b *testing.B) { for i := 0; i < b.N; i++ { - Assign(func() pb.ServerAddress { + Assign(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress("localhost:9333") }, grpc.WithInsecure(), &VolumeAssignRequest{ Count: 1, diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go index eacf64112..c451420fe 100644 --- a/weed/operation/chunked_file.go +++ b/weed/operation/chunked_file.go @@ -1,6 +1,7 @@ package operation import ( + "context" "encoding/json" "errors" "fmt" @@ -173,7 +174,7 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) { for ; chunkIndex < len(cf.chunkList); chunkIndex++ { ci := cf.chunkList[chunkIndex] // if we need read date from local volume server first? - fileUrl, jwt, lookupError := LookupFileId(func() pb.ServerAddress { + fileUrl, jwt, lookupError := LookupFileId(func(_ context.Context) pb.ServerAddress { return cf.master }, cf.grpcDialOption, ci.Fid) if lookupError != nil { diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go index fc4609a2d..6c89c17b1 100644 --- a/weed/operation/lookup.go +++ b/weed/operation/lookup.go @@ -80,7 +80,7 @@ func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids //only query unknown_vids - err := WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + err := WithMasterServerClient(false, masterFn(context.Background()), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.LookupVolumeRequest{ VolumeOrFileIds: unknown_vids, diff --git a/weed/operation/submit.go b/weed/operation/submit.go index 3eb38c31e..57bd81b14 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -1,6 +1,7 @@ package operation import ( + "context" "github.com/seaweedfs/seaweedfs/weed/pb" "io" "mime" @@ -40,7 +41,7 @@ type SubmitResult struct { Error string `json:"error,omitempty"` } -type GetMasterFn func() pb.ServerAddress +type GetMasterFn func(ctx context.Context) pb.ServerAddress func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) { results := make([]SubmitResult, len(files)) diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 795cd3ccc..0b7254c0d 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -160,7 +160,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) fs.checkWithMaster() go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec) - go fs.filer.KeepMasterClientConnected() + go fs.filer.KeepMasterClientConnected(context.Background()) if !util.LoadConfiguration("filer", false) { v.SetDefault("leveldb2.enabled", true) @@ -196,7 +196,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) readonlyMux.HandleFunc("/", fs.readonlyFilerHandler) } - existingNodes := fs.filer.ListExistingPeerUpdates() + existingNodes := fs.filer.ListExistingPeerUpdates(context.Background()) startFromTime := time.Now().Add(-filer.LogFlushInterval) if option.JoinExistingFiler { startFromTime = time.Time{} diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 580cdfed2..3499a2e13 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -292,12 +292,12 @@ func (ms *MasterServer) startAdminScripts() { reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`) - go commandEnv.MasterClient.KeepConnectedToMaster() + go commandEnv.MasterClient.KeepConnectedToMaster(context.Background()) go func() { for { time.Sleep(time.Duration(sleepMinutes) * time.Minute) - if ms.Topo.IsLeader() && ms.MasterClient.GetMaster() != "" { + if ms.Topo.IsLeader() && ms.MasterClient.GetMaster(context.Background()) != "" { shellOptions.FilerAddress = ms.GetOneFiler(cluster.FilerGroupName(*shellOptions.FilerGroup)) if shellOptions.FilerAddress == "" { continue diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 0d7131340..f40b819af 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -124,13 +124,13 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { if ms.Topo.IsLeader() { - submitForClientHandler(w, r, func() pb.ServerAddress { return ms.option.Master }, ms.grpcDialOption) + submitForClientHandler(w, r, func(ctx context.Context) pb.ServerAddress { return ms.option.Master }, ms.grpcDialOption) } else { masterUrl, err := ms.Topo.Leader() if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) } else { - submitForClientHandler(w, r, func() pb.ServerAddress { return masterUrl }, ms.grpcDialOption) + submitForClientHandler(w, r, func(ctx context.Context) pb.ServerAddress { return masterUrl }, ms.grpcDialOption) } } } diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index abd39b582..00a285406 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -181,7 +181,7 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv } func (vs *VolumeServer) notifyMasterVolumeReadonly(v *storage.Volume, isReadOnly bool) error { - if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error { + if grpcErr := pb.WithMasterClient(false, vs.GetMaster(context.Background()), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error { _, err := client.VolumeMarkReadonly(context.Background(), &master_pb.VolumeMarkReadonlyRequest{ Ip: vs.store.Ip, Port: uint32(vs.store.Port), @@ -197,8 +197,8 @@ func (vs *VolumeServer) notifyMasterVolumeReadonly(v *storage.Volume, isReadOnly } return nil }); grpcErr != nil { - glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr) - return fmt.Errorf("grpc VolumeMarkReadonly with master %s: %v", vs.GetMaster(), grpcErr) + glog.V(0).Infof("connect to %s: %v", vs.GetMaster(context.Background()), grpcErr) + return fmt.Errorf("grpc VolumeMarkReadonly with master %s: %v", vs.GetMaster(context.Background()), grpcErr) } return nil } diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index d2aa61a17..81bb87613 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -21,7 +21,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) -func (vs *VolumeServer) GetMaster() pb.ServerAddress { +func (vs *VolumeServer) GetMaster(ctx context.Context) pb.ServerAddress { return vs.currentMaster } diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 51b61b225..6548b7c56 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -84,17 +84,17 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre }() var preallocateSize int64 - if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error { + if grpcErr := pb.WithMasterClient(false, vs.GetMaster(context.Background()), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error { resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { - return fmt.Errorf("get master %s configuration: %v", vs.GetMaster(), err) + return fmt.Errorf("get master %s configuration: %v", vs.GetMaster(context.Background()), err) } if resp.VolumePreallocate { preallocateSize = int64(resp.VolumeSizeLimitMB) * (1 << 20) } return nil }); grpcErr != nil { - glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr) + glog.V(0).Infof("connect to %s: %v", vs.GetMaster(context.Background()), grpcErr) } if preallocateSize > 0 && !hasRemoteDatFile { diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 08e536811..cc364513b 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -2,6 +2,7 @@ package weed_server import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -291,7 +292,7 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, w.Header().Set("X-File-Store", "chunked") - chunkedFileReader := operation.NewChunkedFileReader(chunkManifest.Chunks, vs.GetMaster(), vs.grpcDialOption) + chunkedFileReader := operation.NewChunkedFileReader(chunkManifest.Chunks, vs.GetMaster(context.Background()), vs.grpcDialOption) defer chunkedFileReader.Close() rs := conditionallyCropImages(chunkedFileReader, ext, r) diff --git a/weed/shell/command_cluster_check.go b/weed/shell/command_cluster_check.go index 03acca5b2..5c9866d29 100644 --- a/weed/shell/command_cluster_check.go +++ b/weed/shell/command_cluster_check.go @@ -103,7 +103,7 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i // collect all masters var masters []pb.ServerAddress - masters = append(masters, commandEnv.MasterClient.GetMasters()...) + masters = append(masters, commandEnv.MasterClient.GetMasters(context.Background())...) // check from master to volume servers for _, master := range masters { diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go index 20add302a..407dce006 100644 --- a/weed/shell/shell_liner.go +++ b/weed/shell/shell_liner.go @@ -46,8 +46,9 @@ func RunShell(options ShellOptions) { commandEnv := NewCommandEnv(&options) - go commandEnv.MasterClient.KeepConnectedToMaster() - commandEnv.MasterClient.WaitUntilConnected() + ctx := context.Background() + go commandEnv.MasterClient.KeepConnectedToMaster(ctx) + commandEnv.MasterClient.WaitUntilConnected(ctx) if commandEnv.option.FilerAddress == "" { var filers []pb.ServerAddress diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 98442c1af..da46a440b 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -61,7 +61,7 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri if err == nil && len(fullUrls) > 0 { return } - err = pb.WithMasterClient(false, mc.GetMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { + err = pb.WithMasterClient(false, mc.GetMaster(context.Background()), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{ VolumeOrFileIds: []string{fileId}, }) @@ -103,31 +103,43 @@ func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) { mc.currentMasterLock.Unlock() } -func (mc *MasterClient) GetMaster() pb.ServerAddress { - mc.WaitUntilConnected() +func (mc *MasterClient) GetMaster(ctx context.Context) pb.ServerAddress { + mc.WaitUntilConnected(ctx) return mc.getCurrentMaster() } -func (mc *MasterClient) GetMasters() []pb.ServerAddress { - mc.WaitUntilConnected() +func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress { + mc.WaitUntilConnected(ctx) return mc.masters.GetInstances() } -func (mc *MasterClient) WaitUntilConnected() { +func (mc *MasterClient) WaitUntilConnected(ctx context.Context) { for { - if mc.getCurrentMaster() != "" { + select { + case <-ctx.Done(): + glog.V(0).Infof("Connection wait stopped: %v", ctx.Err()) return + default: + if mc.getCurrentMaster() != "" { + return + } + time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond) + print(".") } - time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond) - print(".") } } -func (mc *MasterClient) KeepConnectedToMaster() { +func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) { glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters) for { - mc.tryAllMasters() - time.Sleep(time.Second) + select { + case <-ctx.Done(): + glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err()) + return + default: + mc.tryAllMasters(ctx) + time.Sleep(time.Second) + } } } @@ -157,23 +169,29 @@ func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddres return } -func (mc *MasterClient) tryAllMasters() { +func (mc *MasterClient) tryAllMasters(ctx context.Context) { var nextHintedLeader pb.ServerAddress mc.masters.RefreshBySrvIfAvailable() for _, master := range mc.masters.GetInstances() { - nextHintedLeader = mc.tryConnectToMaster(master) + nextHintedLeader = mc.tryConnectToMaster(ctx, master) for nextHintedLeader != "" { - nextHintedLeader = mc.tryConnectToMaster(nextHintedLeader) + select { + case <-ctx.Done(): + glog.V(0).Infof("Connection attempt to all masters stopped: %v", ctx.Err()) + return + default: + nextHintedLeader = mc.tryConnectToMaster(ctx, nextHintedLeader) + } } mc.setCurrentMaster("") } } -func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) { +func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) { glog.V(1).Infof("%s.%s masterClient Connecting to master %v", mc.FilerGroup, mc.clientType, master) stats.MasterClientConnectCounter.WithLabelValues("total").Inc() gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) defer cancel() stream, err := client.KeepConnected(ctx) @@ -229,8 +247,8 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL if resp.VolumeLocation != nil { // maybe the leader is changed - if resp.VolumeLocation.Leader != "" && string(mc.GetMaster()) != resp.VolumeLocation.Leader { - glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(), resp.VolumeLocation.Leader) + if resp.VolumeLocation.Leader != "" && string(mc.GetMaster(ctx)) != resp.VolumeLocation.Leader { + glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(ctx), resp.VolumeLocation.Leader) nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader) stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToLeader).Inc() return nil @@ -254,6 +272,10 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL } mc.OnPeerUpdateLock.RUnlock() } + if err := ctx.Err(); err != nil { + glog.V(0).Infof("Connection attempt to master stopped: %v", err) + return err + } } }) if gprcErr != nil { @@ -298,8 +320,13 @@ func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) { } func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error { + getMasterF := func() pb.ServerAddress { return mc.GetMaster(context.Background()) } + return mc.WithClientCustomGetMaster(getMasterF, streamingMode, fn) +} + +func (mc *MasterClient) WithClientCustomGetMaster(getMasterF func() pb.ServerAddress, streamingMode bool, fn func(client master_pb.SeaweedClient) error) error { return util.Retry("master grpc", func() error { - return pb.WithMasterClient(streamingMode, mc.GetMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { + return pb.WithMasterClient(streamingMode, getMasterF(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { return fn(client) }) })