Browse Source

Added context for the MasterClient's methods to avoid endless loops (#5628)

* Added context for the MasterClient's methods to avoid endless loops

* Returned WithClient function. Added WithClientCustomGetMaster function

* Hid unused ctx arguments

* Using a common context for the KeepConnectedToMaster and WaitUntilConnected functions

* Changed the context termination check in the tryConnectToMaster function

* Added a child context to the tryConnectToMaster function

* Added a common context for KeepConnectedToMaster and WaitUntilConnected functions in benchmark
pull/4508/merge
vadimartynov 6 months ago
committed by GitHub
parent
commit
8aae82dd71
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 3
      unmaintained/repeated_vacuum/repeated_vacuum.go
  2. 3
      unmaintained/volume_tailer/volume_tailer.go
  3. 3
      weed/command/backup.go
  4. 8
      weed/command/benchmark.go
  5. 3
      weed/command/download.go
  6. 2
      weed/command/filer_copy.go
  7. 3
      weed/command/master.go
  8. 2
      weed/command/master_follower.go
  9. 4
      weed/command/upload.go
  10. 12
      weed/filer/filer.go
  11. 5
      weed/mq/broker/broker_server.go
  12. 6
      weed/operation/assign_file_id.go
  13. 7
      weed/operation/assign_file_id_test.go
  14. 3
      weed/operation/chunked_file.go
  15. 2
      weed/operation/lookup.go
  16. 3
      weed/operation/submit.go
  17. 4
      weed/server/filer_server.go
  18. 4
      weed/server/master_server.go
  19. 4
      weed/server/master_server_handlers_admin.go
  20. 6
      weed/server/volume_grpc_admin.go
  21. 2
      weed/server/volume_grpc_client_to_master.go
  22. 6
      weed/server/volume_grpc_copy.go
  23. 3
      weed/server/volume_server_handlers_read.go
  24. 2
      weed/shell/command_cluster_check.go
  25. 5
      weed/shell/shell_liner.go
  26. 67
      weed/wdclient/masterclient.go

3
unmaintained/repeated_vacuum/repeated_vacuum.go

@ -7,6 +7,7 @@ import (
"log"
"math/rand"
"time"
"context"
"google.golang.org/grpc"
@ -53,7 +54,7 @@ func main() {
}
func genFile(grpcDialOption grpc.DialOption, i int) (*operation.AssignResult, string) {
assignResult, err := operation.Assign(func() pb.ServerAddress { return pb.ServerAddress(*master) }, grpcDialOption, &operation.VolumeAssignRequest{
assignResult, err := operation.Assign(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*master) }, grpcDialOption, &operation.VolumeAssignRequest{
Count: 1,
Replication: *replication,
})

3
unmaintained/volume_tailer/volume_tailer.go

@ -5,6 +5,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"log"
"time"
"context"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/security"
@ -38,7 +39,7 @@ func main() {
sinceTimeNs = time.Now().Add(-*rewindDuration).UnixNano()
}
err := operation.TailVolume(func() pb.ServerAddress { return pb.ServerAddress(*master) }, grpcDialOption, vid, uint64(sinceTimeNs), *timeoutSeconds, func(n *needle.Needle) (err error) {
err := operation.TailVolume(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*master) }, grpcDialOption, vid, uint64(sinceTimeNs), *timeoutSeconds, func(n *needle.Needle) (err error) {
if n.Size == 0 {
println("-", n.String())
return nil

3
weed/command/backup.go

@ -1,6 +1,7 @@
package command
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
@ -74,7 +75,7 @@ func runBackup(cmd *Command, args []string) bool {
vid := needle.VolumeId(*s.volumeId)
// find volume location, replication, ttl info
lookup, err := operation.LookupVolumeId(func() pb.ServerAddress { return pb.ServerAddress(*s.master) }, grpcDialOption, vid.String())
lookup, err := operation.LookupVolumeId(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*s.master) }, grpcDialOption, vid.String())
if err != nil {
fmt.Printf("Error looking up volume %d: %v\n", vid, err)
return true

8
weed/command/benchmark.go

@ -2,6 +2,7 @@ package command
import (
"bufio"
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"io"
@ -128,8 +129,9 @@ func runBenchmark(cmd *Command, args []string) bool {
}
b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", "", *pb.ServerAddresses(*b.masters).ToServiceDiscovery())
go b.masterClient.KeepConnectedToMaster()
b.masterClient.WaitUntilConnected()
ctx := context.Background()
go b.masterClient.KeepConnectedToMaster(ctx)
b.masterClient.WaitUntilConnected(ctx)
if *b.write {
benchWrite()
@ -210,7 +212,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
}
var jwtAuthorization security.EncodedJwt
if isSecure {
jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(), b.grpcDialOption, df.fp.Fid)
jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(context.Background()), b.grpcDialOption, df.fp.Fid)
}
if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil {
s.completed++

3
weed/command/download.go

@ -1,6 +1,7 @@
package command
import (
"context"
"fmt"
"io"
"net/http"
@ -50,7 +51,7 @@ func runDownload(cmd *Command, args []string) bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
for _, fid := range args {
if e := downloadToFile(func() pb.ServerAddress { return pb.ServerAddress(*d.server) }, grpcDialOption, fid, util.ResolvePath(*d.dir)); e != nil {
if e := downloadToFile(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*d.server) }, grpcDialOption, fid, util.ResolvePath(*d.dir)); e != nil {
fmt.Println("Download Error: ", fid, e)
}
}

2
weed/command/filer_copy.go

@ -472,7 +472,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
for _, chunk := range chunks {
fileIds = append(fileIds, chunk.FileId)
}
operation.DeleteFiles(func() pb.ServerAddress {
operation.DeleteFiles(func(_ context.Context) pb.ServerAddress {
return pb.ServerAddress(copy.masters[0])
}, false, worker.options.grpcDialOption, fileIds)
return uploadError

3
weed/command/master.go

@ -1,6 +1,7 @@
package command
import (
"context"
"fmt"
"net/http"
"os"
@ -218,7 +219,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
}()
}
go ms.MasterClient.KeepConnectedToMaster()
go ms.MasterClient.KeepConnectedToMaster(context.Background())
// start http server
var (

2
weed/command/master_follower.go

@ -140,7 +140,7 @@ func startMasterFollower(masterOptions MasterOptions) {
}
go grpcS.Serve(grpcL)
go ms.MasterClient.KeepConnectedToMaster()
go ms.MasterClient.KeepConnectedToMaster(context.Background())
// start http server
httpS := &http.Server{Handler: r}

4
weed/command/upload.go

@ -97,7 +97,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
return e
}
results, e := operation.SubmitFiles(func() pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
results, e := operation.SubmitFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
if e != nil {
@ -119,7 +119,7 @@ func runUpload(cmd *Command, args []string) bool {
fmt.Println(e.Error())
return false
}
results, err := operation.SubmitFiles(func() pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
results, err := operation.SubmitFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
if err != nil {
fmt.Println(err.Error())
return false

12
weed/filer/filer.go

@ -143,8 +143,8 @@ func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*maste
}
func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate) {
return cluster.ListExistingPeerUpdates(f.GetMaster(), f.GrpcDialOption, f.MasterClient.FilerGroup, cluster.FilerType)
func (f *Filer) ListExistingPeerUpdates(ctx context.Context) (existingNodes []*master_pb.ClusterNodeUpdate) {
return cluster.ListExistingPeerUpdates(f.GetMaster(ctx), f.GrpcDialOption, f.MasterClient.FilerGroup, cluster.FilerType)
}
func (f *Filer) SetStore(store FilerStore) (isFresh bool) {
@ -177,12 +177,12 @@ func (f *Filer) GetStore() (store FilerStore) {
return f.Store
}
func (fs *Filer) GetMaster() pb.ServerAddress {
return fs.MasterClient.GetMaster()
func (fs *Filer) GetMaster(ctx context.Context) pb.ServerAddress {
return fs.MasterClient.GetMaster(ctx)
}
func (fs *Filer) KeepMasterClientConnected() {
fs.MasterClient.KeepConnectedToMaster()
func (fs *Filer) KeepMasterClientConnected(ctx context.Context) {
fs.MasterClient.KeepConnectedToMaster(ctx)
}
func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) {

5
weed/mq/broker/broker_server.go

@ -1,6 +1,7 @@
package broker
import (
"context"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
@ -68,9 +69,9 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
pub_broker_balancer.OnAddBroker = mqBroker.Coordinator.OnSubAddBroker
pub_broker_balancer.OnRemoveBroker = mqBroker.Coordinator.OnSubRemoveBroker
go mqBroker.MasterClient.KeepConnectedToMaster()
go mqBroker.MasterClient.KeepConnectedToMaster(context.Background())
existingNodes := cluster.ListExistingPeerUpdates(mqBroker.MasterClient.GetMaster(), grpcDialOption, option.FilerGroup, cluster.FilerType)
existingNodes := cluster.ListExistingPeerUpdates(mqBroker.MasterClient.GetMaster(context.Background()), grpcDialOption, option.FilerGroup, cluster.FilerType)
for _, newNode := range existingNodes {
mqBroker.OnBrokerUpdate(newNode, time.Now())
}

6
weed/operation/assign_file_id.go

@ -47,9 +47,9 @@ func NewAssignProxy(masterFn GetMasterFn, grpcDialOption grpc.DialOption, concur
ap = &AssignProxy{
pool: make(chan *singleThreadAssignProxy, concurrency),
}
ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn().ToGrpcAddress(), true, grpcDialOption)
ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn(context.Background()).ToGrpcAddress(), true, grpcDialOption)
if err != nil {
return nil, fmt.Errorf("fail to dial %s: %v", masterFn().ToGrpcAddress(), err)
return nil, fmt.Errorf("fail to dial %s: %v", masterFn(context.Background()).ToGrpcAddress(), err)
}
for i := 0; i < concurrency; i++ {
ap.pool <- &singleThreadAssignProxy{}
@ -153,7 +153,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
continue
}
lastError = WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
lastError = WithMasterServerClient(false, masterFn(context.Background()), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.AssignRequest{
Count: request.Count,
Replication: request.Replication,

7
weed/operation/assign_file_id_test.go

@ -1,6 +1,7 @@
package operation
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"google.golang.org/grpc"
@ -11,7 +12,7 @@ import (
func BenchmarkWithConcurrency(b *testing.B) {
concurrencyLevels := []int{1, 10, 100, 1000}
ap, _ := NewAssignProxy(func() pb.ServerAddress {
ap, _ := NewAssignProxy(func(_ context.Context) pb.ServerAddress {
return pb.ServerAddress("localhost:9333")
}, grpc.WithInsecure(), 16)
@ -47,7 +48,7 @@ func BenchmarkWithConcurrency(b *testing.B) {
}
func BenchmarkStreamAssign(b *testing.B) {
ap, _ := NewAssignProxy(func() pb.ServerAddress {
ap, _ := NewAssignProxy(func(_ context.Context) pb.ServerAddress {
return pb.ServerAddress("localhost:9333")
}, grpc.WithInsecure(), 16)
for i := 0; i < b.N; i++ {
@ -59,7 +60,7 @@ func BenchmarkStreamAssign(b *testing.B) {
func BenchmarkUnaryAssign(b *testing.B) {
for i := 0; i < b.N; i++ {
Assign(func() pb.ServerAddress {
Assign(func(_ context.Context) pb.ServerAddress {
return pb.ServerAddress("localhost:9333")
}, grpc.WithInsecure(), &VolumeAssignRequest{
Count: 1,

3
weed/operation/chunked_file.go

@ -1,6 +1,7 @@
package operation
import (
"context"
"encoding/json"
"errors"
"fmt"
@ -173,7 +174,7 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
for ; chunkIndex < len(cf.chunkList); chunkIndex++ {
ci := cf.chunkList[chunkIndex]
// if we need read date from local volume server first?
fileUrl, jwt, lookupError := LookupFileId(func() pb.ServerAddress {
fileUrl, jwt, lookupError := LookupFileId(func(_ context.Context) pb.ServerAddress {
return cf.master
}, cf.grpcDialOption, ci.Fid)
if lookupError != nil {

2
weed/operation/lookup.go

@ -80,7 +80,7 @@ func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids
//only query unknown_vids
err := WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
err := WithMasterServerClient(false, masterFn(context.Background()), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.LookupVolumeRequest{
VolumeOrFileIds: unknown_vids,

3
weed/operation/submit.go

@ -1,6 +1,7 @@
package operation
import (
"context"
"github.com/seaweedfs/seaweedfs/weed/pb"
"io"
"mime"
@ -40,7 +41,7 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
type GetMasterFn func() pb.ServerAddress
type GetMasterFn func(ctx context.Context) pb.ServerAddress
func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))

4
weed/server/filer_server.go

@ -160,7 +160,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
fs.checkWithMaster()
go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
go fs.filer.KeepMasterClientConnected()
go fs.filer.KeepMasterClientConnected(context.Background())
if !util.LoadConfiguration("filer", false) {
v.SetDefault("leveldb2.enabled", true)
@ -196,7 +196,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
existingNodes := fs.filer.ListExistingPeerUpdates()
existingNodes := fs.filer.ListExistingPeerUpdates(context.Background())
startFromTime := time.Now().Add(-filer.LogFlushInterval)
if option.JoinExistingFiler {
startFromTime = time.Time{}

4
weed/server/master_server.go

@ -292,12 +292,12 @@ func (ms *MasterServer) startAdminScripts() {
reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`)
go commandEnv.MasterClient.KeepConnectedToMaster()
go commandEnv.MasterClient.KeepConnectedToMaster(context.Background())
go func() {
for {
time.Sleep(time.Duration(sleepMinutes) * time.Minute)
if ms.Topo.IsLeader() && ms.MasterClient.GetMaster() != "" {
if ms.Topo.IsLeader() && ms.MasterClient.GetMaster(context.Background()) != "" {
shellOptions.FilerAddress = ms.GetOneFiler(cluster.FilerGroupName(*shellOptions.FilerGroup))
if shellOptions.FilerAddress == "" {
continue

4
weed/server/master_server_handlers_admin.go

@ -124,13 +124,13 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
submitForClientHandler(w, r, func() pb.ServerAddress { return ms.option.Master }, ms.grpcDialOption)
submitForClientHandler(w, r, func(ctx context.Context) pb.ServerAddress { return ms.option.Master }, ms.grpcDialOption)
} else {
masterUrl, err := ms.Topo.Leader()
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else {
submitForClientHandler(w, r, func() pb.ServerAddress { return masterUrl }, ms.grpcDialOption)
submitForClientHandler(w, r, func(ctx context.Context) pb.ServerAddress { return masterUrl }, ms.grpcDialOption)
}
}
}

6
weed/server/volume_grpc_admin.go

@ -181,7 +181,7 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv
}
func (vs *VolumeServer) notifyMasterVolumeReadonly(v *storage.Volume, isReadOnly bool) error {
if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
if grpcErr := pb.WithMasterClient(false, vs.GetMaster(context.Background()), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
_, err := client.VolumeMarkReadonly(context.Background(), &master_pb.VolumeMarkReadonlyRequest{
Ip: vs.store.Ip,
Port: uint32(vs.store.Port),
@ -197,8 +197,8 @@ func (vs *VolumeServer) notifyMasterVolumeReadonly(v *storage.Volume, isReadOnly
}
return nil
}); grpcErr != nil {
glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr)
return fmt.Errorf("grpc VolumeMarkReadonly with master %s: %v", vs.GetMaster(), grpcErr)
glog.V(0).Infof("connect to %s: %v", vs.GetMaster(context.Background()), grpcErr)
return fmt.Errorf("grpc VolumeMarkReadonly with master %s: %v", vs.GetMaster(context.Background()), grpcErr)
}
return nil
}

2
weed/server/volume_grpc_client_to_master.go

@ -21,7 +21,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
func (vs *VolumeServer) GetMaster() pb.ServerAddress {
func (vs *VolumeServer) GetMaster(ctx context.Context) pb.ServerAddress {
return vs.currentMaster
}

6
weed/server/volume_grpc_copy.go

@ -84,17 +84,17 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
}()
var preallocateSize int64
if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
if grpcErr := pb.WithMasterClient(false, vs.GetMaster(context.Background()), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return fmt.Errorf("get master %s configuration: %v", vs.GetMaster(), err)
return fmt.Errorf("get master %s configuration: %v", vs.GetMaster(context.Background()), err)
}
if resp.VolumePreallocate {
preallocateSize = int64(resp.VolumeSizeLimitMB) * (1 << 20)
}
return nil
}); grpcErr != nil {
glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr)
glog.V(0).Infof("connect to %s: %v", vs.GetMaster(context.Background()), grpcErr)
}
if preallocateSize > 0 && !hasRemoteDatFile {

3
weed/server/volume_server_handlers_read.go

@ -2,6 +2,7 @@ package weed_server
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@ -291,7 +292,7 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string,
w.Header().Set("X-File-Store", "chunked")
chunkedFileReader := operation.NewChunkedFileReader(chunkManifest.Chunks, vs.GetMaster(), vs.grpcDialOption)
chunkedFileReader := operation.NewChunkedFileReader(chunkManifest.Chunks, vs.GetMaster(context.Background()), vs.grpcDialOption)
defer chunkedFileReader.Close()
rs := conditionallyCropImages(chunkedFileReader, ext, r)

2
weed/shell/command_cluster_check.go

@ -103,7 +103,7 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
// collect all masters
var masters []pb.ServerAddress
masters = append(masters, commandEnv.MasterClient.GetMasters()...)
masters = append(masters, commandEnv.MasterClient.GetMasters(context.Background())...)
// check from master to volume servers
for _, master := range masters {

5
weed/shell/shell_liner.go

@ -46,8 +46,9 @@ func RunShell(options ShellOptions) {
commandEnv := NewCommandEnv(&options)
go commandEnv.MasterClient.KeepConnectedToMaster()
commandEnv.MasterClient.WaitUntilConnected()
ctx := context.Background()
go commandEnv.MasterClient.KeepConnectedToMaster(ctx)
commandEnv.MasterClient.WaitUntilConnected(ctx)
if commandEnv.option.FilerAddress == "" {
var filers []pb.ServerAddress

67
weed/wdclient/masterclient.go

@ -61,7 +61,7 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri
if err == nil && len(fullUrls) > 0 {
return
}
err = pb.WithMasterClient(false, mc.GetMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
err = pb.WithMasterClient(false, mc.GetMaster(context.Background()), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
VolumeOrFileIds: []string{fileId},
})
@ -103,31 +103,43 @@ func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) {
mc.currentMasterLock.Unlock()
}
func (mc *MasterClient) GetMaster() pb.ServerAddress {
mc.WaitUntilConnected()
func (mc *MasterClient) GetMaster(ctx context.Context) pb.ServerAddress {
mc.WaitUntilConnected(ctx)
return mc.getCurrentMaster()
}
func (mc *MasterClient) GetMasters() []pb.ServerAddress {
mc.WaitUntilConnected()
func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress {
mc.WaitUntilConnected(ctx)
return mc.masters.GetInstances()
}
func (mc *MasterClient) WaitUntilConnected() {
func (mc *MasterClient) WaitUntilConnected(ctx context.Context) {
for {
if mc.getCurrentMaster() != "" {
select {
case <-ctx.Done():
glog.V(0).Infof("Connection wait stopped: %v", ctx.Err())
return
default:
if mc.getCurrentMaster() != "" {
return
}
time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
print(".")
}
time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
print(".")
}
}
func (mc *MasterClient) KeepConnectedToMaster() {
func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) {
glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters)
for {
mc.tryAllMasters()
time.Sleep(time.Second)
select {
case <-ctx.Done():
glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err())
return
default:
mc.tryAllMasters(ctx)
time.Sleep(time.Second)
}
}
}
@ -157,23 +169,29 @@ func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddres
return
}
func (mc *MasterClient) tryAllMasters() {
func (mc *MasterClient) tryAllMasters(ctx context.Context) {
var nextHintedLeader pb.ServerAddress
mc.masters.RefreshBySrvIfAvailable()
for _, master := range mc.masters.GetInstances() {
nextHintedLeader = mc.tryConnectToMaster(master)
nextHintedLeader = mc.tryConnectToMaster(ctx, master)
for nextHintedLeader != "" {
nextHintedLeader = mc.tryConnectToMaster(nextHintedLeader)
select {
case <-ctx.Done():
glog.V(0).Infof("Connection attempt to all masters stopped: %v", ctx.Err())
return
default:
nextHintedLeader = mc.tryConnectToMaster(ctx, nextHintedLeader)
}
}
mc.setCurrentMaster("")
}
}
func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) {
func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) {
glog.V(1).Infof("%s.%s masterClient Connecting to master %v", mc.FilerGroup, mc.clientType, master)
stats.MasterClientConnectCounter.WithLabelValues("total").Inc()
gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := client.KeepConnected(ctx)
@ -229,8 +247,8 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
if resp.VolumeLocation != nil {
// maybe the leader is changed
if resp.VolumeLocation.Leader != "" && string(mc.GetMaster()) != resp.VolumeLocation.Leader {
glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(), resp.VolumeLocation.Leader)
if resp.VolumeLocation.Leader != "" && string(mc.GetMaster(ctx)) != resp.VolumeLocation.Leader {
glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(ctx), resp.VolumeLocation.Leader)
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToLeader).Inc()
return nil
@ -254,6 +272,10 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
}
mc.OnPeerUpdateLock.RUnlock()
}
if err := ctx.Err(); err != nil {
glog.V(0).Infof("Connection attempt to master stopped: %v", err)
return err
}
}
})
if gprcErr != nil {
@ -298,8 +320,13 @@ func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) {
}
func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error {
getMasterF := func() pb.ServerAddress { return mc.GetMaster(context.Background()) }
return mc.WithClientCustomGetMaster(getMasterF, streamingMode, fn)
}
func (mc *MasterClient) WithClientCustomGetMaster(getMasterF func() pb.ServerAddress, streamingMode bool, fn func(client master_pb.SeaweedClient) error) error {
return util.Retry("master grpc", func() error {
return pb.WithMasterClient(streamingMode, mc.GetMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
return pb.WithMasterClient(streamingMode, getMasterF(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
return fn(client)
})
})

Loading…
Cancel
Save