Browse Source

avoid race conditions access to MasterClient.currentMaster (#3538)

https://github.com/seaweedfs/seaweedfs/issues/3510
pull/3551/merge
Konstantin Lebedev 2 years ago
committed by GitHub
parent
commit
e16dda88e4
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 33
      weed/wdclient/masterclient.go

33
weed/wdclient/masterclient.go

@ -28,9 +28,9 @@ type MasterClient struct {
vidMap vidMap
vidMapCacheSize int vidMapCacheSize int
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
OnPeerUpdateLock sync.RWMutex OnPeerUpdateLock sync.RWMutex
accessLock sync.RWMutex
} }
func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters map[string]pb.ServerAddress) *MasterClient { func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters map[string]pb.ServerAddress) *MasterClient {
@ -61,7 +61,7 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri
if err == nil && len(fullUrls) > 0 { if err == nil && len(fullUrls) > 0 {
return return
} }
err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
err = pb.WithMasterClient(false, mc.getCurrentMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{ resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
VolumeOrFileIds: []string{fileId}, VolumeOrFileIds: []string{fileId},
}) })
@ -91,9 +91,21 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri
return return
} }
func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {
mc.accessLock.RLock()
defer mc.accessLock.RUnlock()
return mc.currentMaster
}
func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) {
mc.accessLock.Lock()
mc.currentMaster = master
mc.accessLock.Unlock()
}
func (mc *MasterClient) GetMaster() pb.ServerAddress { func (mc *MasterClient) GetMaster() pb.ServerAddress {
mc.WaitUntilConnected() mc.WaitUntilConnected()
return mc.currentMaster
return mc.getCurrentMaster()
} }
func (mc *MasterClient) GetMasters() map[string]pb.ServerAddress { func (mc *MasterClient) GetMasters() map[string]pb.ServerAddress {
@ -103,7 +115,7 @@ func (mc *MasterClient) GetMasters() map[string]pb.ServerAddress {
func (mc *MasterClient) WaitUntilConnected() { func (mc *MasterClient) WaitUntilConnected() {
for { for {
if mc.currentMaster != "" {
if mc.getCurrentMaster() != "" {
return return
} }
time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond) time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
@ -151,8 +163,7 @@ func (mc *MasterClient) tryAllMasters() {
for nextHintedLeader != "" { for nextHintedLeader != "" {
nextHintedLeader = mc.tryConnectToMaster(nextHintedLeader) nextHintedLeader = mc.tryConnectToMaster(nextHintedLeader)
} }
mc.currentMaster = ""
mc.setCurrentMaster("")
} }
} }
@ -204,7 +215,7 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
} else { } else {
mc.resetVidMap() mc.resetVidMap()
} }
mc.currentMaster = master
mc.setCurrentMaster(master)
for { for {
resp, err := stream.Recv() resp, err := stream.Recv()
@ -216,8 +227,8 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
if resp.VolumeLocation != nil { if resp.VolumeLocation != nil {
// maybe the leader is changed // maybe the leader is changed
if resp.VolumeLocation.Leader != "" && string(mc.currentMaster) != resp.VolumeLocation.Leader {
glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.currentMaster, resp.VolumeLocation.Leader)
if resp.VolumeLocation.Leader != "" && string(mc.getCurrentMaster()) != resp.VolumeLocation.Leader {
glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.getCurrentMaster(), resp.VolumeLocation.Leader)
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader) nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToLeader).Inc() stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToLeader).Inc()
return nil return nil
@ -279,10 +290,10 @@ func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) {
func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error { func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error {
return util.Retry("master grpc", func() error { return util.Retry("master grpc", func() error {
for mc.currentMaster == "" {
for mc.getCurrentMaster() == "" {
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
} }
return pb.WithMasterClient(streamingMode, mc.currentMaster, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
return pb.WithMasterClient(streamingMode, mc.getCurrentMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
return fn(client) return fn(client)
}) })
}) })

Loading…
Cancel
Save