|
|
@ -61,7 +61,7 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri |
|
|
|
if err == nil && len(fullUrls) > 0 { |
|
|
|
return |
|
|
|
} |
|
|
|
err = pb.WithMasterClient(false, mc.GetMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { |
|
|
|
err = pb.WithMasterClient(false, mc.GetMaster(context.Background()), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { |
|
|
|
resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{ |
|
|
|
VolumeOrFileIds: []string{fileId}, |
|
|
|
}) |
|
|
@ -103,31 +103,43 @@ func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) { |
|
|
|
mc.currentMasterLock.Unlock() |
|
|
|
} |
|
|
|
|
|
|
|
func (mc *MasterClient) GetMaster() pb.ServerAddress { |
|
|
|
mc.WaitUntilConnected() |
|
|
|
func (mc *MasterClient) GetMaster(ctx context.Context) pb.ServerAddress { |
|
|
|
mc.WaitUntilConnected(ctx) |
|
|
|
return mc.getCurrentMaster() |
|
|
|
} |
|
|
|
|
|
|
|
func (mc *MasterClient) GetMasters() []pb.ServerAddress { |
|
|
|
mc.WaitUntilConnected() |
|
|
|
func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress { |
|
|
|
mc.WaitUntilConnected(ctx) |
|
|
|
return mc.masters.GetInstances() |
|
|
|
} |
|
|
|
|
|
|
|
func (mc *MasterClient) WaitUntilConnected() { |
|
|
|
func (mc *MasterClient) WaitUntilConnected(ctx context.Context) { |
|
|
|
for { |
|
|
|
if mc.getCurrentMaster() != "" { |
|
|
|
select { |
|
|
|
case <-ctx.Done(): |
|
|
|
glog.V(0).Infof("Connection wait stopped: %v", ctx.Err()) |
|
|
|
return |
|
|
|
default: |
|
|
|
if mc.getCurrentMaster() != "" { |
|
|
|
return |
|
|
|
} |
|
|
|
time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond) |
|
|
|
print(".") |
|
|
|
} |
|
|
|
time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond) |
|
|
|
print(".") |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (mc *MasterClient) KeepConnectedToMaster() { |
|
|
|
func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) { |
|
|
|
glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters) |
|
|
|
for { |
|
|
|
mc.tryAllMasters() |
|
|
|
time.Sleep(time.Second) |
|
|
|
select { |
|
|
|
case <-ctx.Done(): |
|
|
|
glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err()) |
|
|
|
return |
|
|
|
default: |
|
|
|
mc.tryAllMasters(ctx) |
|
|
|
time.Sleep(time.Second) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -157,23 +169,29 @@ func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddres |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
func (mc *MasterClient) tryAllMasters() { |
|
|
|
func (mc *MasterClient) tryAllMasters(ctx context.Context) { |
|
|
|
var nextHintedLeader pb.ServerAddress |
|
|
|
mc.masters.RefreshBySrvIfAvailable() |
|
|
|
for _, master := range mc.masters.GetInstances() { |
|
|
|
nextHintedLeader = mc.tryConnectToMaster(master) |
|
|
|
nextHintedLeader = mc.tryConnectToMaster(ctx, master) |
|
|
|
for nextHintedLeader != "" { |
|
|
|
nextHintedLeader = mc.tryConnectToMaster(nextHintedLeader) |
|
|
|
select { |
|
|
|
case <-ctx.Done(): |
|
|
|
glog.V(0).Infof("Connection attempt to all masters stopped: %v", ctx.Err()) |
|
|
|
return |
|
|
|
default: |
|
|
|
nextHintedLeader = mc.tryConnectToMaster(ctx, nextHintedLeader) |
|
|
|
} |
|
|
|
} |
|
|
|
mc.setCurrentMaster("") |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) { |
|
|
|
func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) { |
|
|
|
glog.V(1).Infof("%s.%s masterClient Connecting to master %v", mc.FilerGroup, mc.clientType, master) |
|
|
|
stats.MasterClientConnectCounter.WithLabelValues("total").Inc() |
|
|
|
gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { |
|
|
|
ctx, cancel := context.WithCancel(context.Background()) |
|
|
|
ctx, cancel := context.WithCancel(ctx) |
|
|
|
defer cancel() |
|
|
|
|
|
|
|
stream, err := client.KeepConnected(ctx) |
|
|
@ -229,8 +247,8 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL |
|
|
|
|
|
|
|
if resp.VolumeLocation != nil { |
|
|
|
// maybe the leader is changed
|
|
|
|
if resp.VolumeLocation.Leader != "" && string(mc.GetMaster()) != resp.VolumeLocation.Leader { |
|
|
|
glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(), resp.VolumeLocation.Leader) |
|
|
|
if resp.VolumeLocation.Leader != "" && string(mc.GetMaster(ctx)) != resp.VolumeLocation.Leader { |
|
|
|
glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(ctx), resp.VolumeLocation.Leader) |
|
|
|
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader) |
|
|
|
stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToLeader).Inc() |
|
|
|
return nil |
|
|
@ -254,6 +272,10 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL |
|
|
|
} |
|
|
|
mc.OnPeerUpdateLock.RUnlock() |
|
|
|
} |
|
|
|
if err := ctx.Err(); err != nil { |
|
|
|
glog.V(0).Infof("Connection attempt to master stopped: %v", err) |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
}) |
|
|
|
if gprcErr != nil { |
|
|
@ -298,8 +320,13 @@ func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) { |
|
|
|
} |
|
|
|
|
|
|
|
func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error { |
|
|
|
getMasterF := func() pb.ServerAddress { return mc.GetMaster(context.Background()) } |
|
|
|
return mc.WithClientCustomGetMaster(getMasterF, streamingMode, fn) |
|
|
|
} |
|
|
|
|
|
|
|
func (mc *MasterClient) WithClientCustomGetMaster(getMasterF func() pb.ServerAddress, streamingMode bool, fn func(client master_pb.SeaweedClient) error) error { |
|
|
|
return util.Retry("master grpc", func() error { |
|
|
|
return pb.WithMasterClient(streamingMode, mc.GetMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { |
|
|
|
return pb.WithMasterClient(streamingMode, getMasterF(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { |
|
|
|
return fn(client) |
|
|
|
}) |
|
|
|
}) |
|
|
|