diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index d330e9ec6..08bee0f73 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -27,10 +27,10 @@ type MasterClient struct { grpcDialOption grpc.DialOption vidMap - vidMapCacheSize int - + vidMapCacheSize int OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) OnPeerUpdateLock sync.RWMutex + accessLock sync.RWMutex } func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters map[string]pb.ServerAddress) *MasterClient { @@ -61,7 +61,7 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri if err == nil && len(fullUrls) > 0 { return } - err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { + err = pb.WithMasterClient(false, mc.getCurrentMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{ VolumeOrFileIds: []string{fileId}, }) @@ -91,9 +91,21 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri return } +func (mc *MasterClient) getCurrentMaster() pb.ServerAddress { + mc.accessLock.RLock() + defer mc.accessLock.RUnlock() + return mc.currentMaster +} + +func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) { + mc.accessLock.Lock() + mc.currentMaster = master + mc.accessLock.Unlock() +} + func (mc *MasterClient) GetMaster() pb.ServerAddress { mc.WaitUntilConnected() - return mc.currentMaster + return mc.getCurrentMaster() } func (mc *MasterClient) GetMasters() map[string]pb.ServerAddress { @@ -103,7 +115,7 @@ func (mc *MasterClient) GetMasters() map[string]pb.ServerAddress { func (mc *MasterClient) WaitUntilConnected() { for { - if mc.currentMaster != "" { + if mc.getCurrentMaster() != "" { return } time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond) @@ -151,8 +163,7 @@ func (mc *MasterClient) tryAllMasters() { for nextHintedLeader != "" { nextHintedLeader = mc.tryConnectToMaster(nextHintedLeader) } - - mc.currentMaster = "" + mc.setCurrentMaster("") } } @@ -204,7 +215,7 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL } else { mc.resetVidMap() } - mc.currentMaster = master + mc.setCurrentMaster(master) for { resp, err := stream.Recv() @@ -216,8 +227,8 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL if resp.VolumeLocation != nil { // maybe the leader is changed - if resp.VolumeLocation.Leader != "" && string(mc.currentMaster) != resp.VolumeLocation.Leader { - glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.currentMaster, resp.VolumeLocation.Leader) + if resp.VolumeLocation.Leader != "" && string(mc.getCurrentMaster()) != resp.VolumeLocation.Leader { + glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.getCurrentMaster(), resp.VolumeLocation.Leader) nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader) stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToLeader).Inc() return nil @@ -279,10 +290,10 @@ func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) { func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error { return util.Retry("master grpc", func() error { - for mc.currentMaster == "" { + for mc.getCurrentMaster() == "" { time.Sleep(3 * time.Second) } - return pb.WithMasterClient(streamingMode, mc.currentMaster, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { + return pb.WithMasterClient(streamingMode, mc.getCurrentMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { return fn(client) }) })