|
|
@ -168,15 +168,19 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL |
|
|
|
} |
|
|
|
|
|
|
|
// check if it is the leader to determine whether to reset the vidMap
|
|
|
|
if resp.VolumeLocation != nil && resp.VolumeLocation.Leader != "" && string(master) != resp.VolumeLocation.Leader { |
|
|
|
glog.V(0).Infof("master %v redirected to leader %v", master, resp.VolumeLocation.Leader) |
|
|
|
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader) |
|
|
|
stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc() |
|
|
|
return nil |
|
|
|
if resp.VolumeLocation != nil { |
|
|
|
if resp.VolumeLocation.Leader != "" && string(master) != resp.VolumeLocation.Leader { |
|
|
|
glog.V(0).Infof("master %v redirected to leader %v", master, resp.VolumeLocation.Leader) |
|
|
|
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader) |
|
|
|
stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc() |
|
|
|
return nil |
|
|
|
} |
|
|
|
mc.vidMap = newVidMap("") |
|
|
|
mc.updateVidMap(resp) |
|
|
|
} else { |
|
|
|
mc.vidMap = newVidMap("") |
|
|
|
} |
|
|
|
|
|
|
|
mc.currentMaster = master |
|
|
|
mc.vidMap = newVidMap("") |
|
|
|
|
|
|
|
for { |
|
|
|
resp, err := stream.Recv() |
|
|
@ -195,29 +199,7 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// process new volume location
|
|
|
|
loc := Location{ |
|
|
|
Url: resp.VolumeLocation.Url, |
|
|
|
PublicUrl: resp.VolumeLocation.PublicUrl, |
|
|
|
DataCenter: resp.VolumeLocation.DataCenter, |
|
|
|
GrpcPort: int(resp.VolumeLocation.GrpcPort), |
|
|
|
} |
|
|
|
for _, newVid := range resp.VolumeLocation.NewVids { |
|
|
|
glog.V(1).Infof("%s.%s: %s masterClient adds volume %d", mc.FilerGroup, mc.clientType, loc.Url, newVid) |
|
|
|
mc.addLocation(newVid, loc) |
|
|
|
} |
|
|
|
for _, deletedVid := range resp.VolumeLocation.DeletedVids { |
|
|
|
glog.V(1).Infof("%s.%s: %s masterClient removes volume %d", mc.FilerGroup, mc.clientType, loc.Url, deletedVid) |
|
|
|
mc.deleteLocation(deletedVid, loc) |
|
|
|
} |
|
|
|
for _, newEcVid := range resp.VolumeLocation.NewEcVids { |
|
|
|
glog.V(1).Infof("%s.%s: %s masterClient adds ec volume %d", mc.FilerGroup, mc.clientType, loc.Url, newEcVid) |
|
|
|
mc.addEcLocation(newEcVid, loc) |
|
|
|
} |
|
|
|
for _, deletedEcVid := range resp.VolumeLocation.DeletedEcVids { |
|
|
|
glog.V(1).Infof("%s.%s: %s masterClient removes ec volume %d", mc.FilerGroup, mc.clientType, loc.Url, deletedEcVid) |
|
|
|
mc.deleteEcLocation(deletedEcVid, loc) |
|
|
|
} |
|
|
|
mc.updateVidMap(resp) |
|
|
|
} |
|
|
|
|
|
|
|
if resp.ClusterNodeUpdate != nil { |
|
|
@ -245,6 +227,32 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) { |
|
|
|
// process new volume location
|
|
|
|
loc := Location{ |
|
|
|
Url: resp.VolumeLocation.Url, |
|
|
|
PublicUrl: resp.VolumeLocation.PublicUrl, |
|
|
|
DataCenter: resp.VolumeLocation.DataCenter, |
|
|
|
GrpcPort: int(resp.VolumeLocation.GrpcPort), |
|
|
|
} |
|
|
|
for _, newVid := range resp.VolumeLocation.NewVids { |
|
|
|
glog.V(1).Infof("%s.%s: %s masterClient adds volume %d", mc.FilerGroup, mc.clientType, loc.Url, newVid) |
|
|
|
mc.addLocation(newVid, loc) |
|
|
|
} |
|
|
|
for _, deletedVid := range resp.VolumeLocation.DeletedVids { |
|
|
|
glog.V(1).Infof("%s.%s: %s masterClient removes volume %d", mc.FilerGroup, mc.clientType, loc.Url, deletedVid) |
|
|
|
mc.deleteLocation(deletedVid, loc) |
|
|
|
} |
|
|
|
for _, newEcVid := range resp.VolumeLocation.NewEcVids { |
|
|
|
glog.V(1).Infof("%s.%s: %s masterClient adds ec volume %d", mc.FilerGroup, mc.clientType, loc.Url, newEcVid) |
|
|
|
mc.addEcLocation(newEcVid, loc) |
|
|
|
} |
|
|
|
for _, deletedEcVid := range resp.VolumeLocation.DeletedEcVids { |
|
|
|
glog.V(1).Infof("%s.%s: %s masterClient removes ec volume %d", mc.FilerGroup, mc.clientType, loc.Url, deletedEcVid) |
|
|
|
mc.deleteEcLocation(deletedEcVid, loc) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error { |
|
|
|
return util.Retry("master grpc", func() error { |
|
|
|
for mc.currentMaster == "" { |
|
|
|