@ -168,15 +168,19 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
}
}
// check if it is the leader to determine whether to reset the vidMap
// check if it is the leader to determine whether to reset the vidMap
if resp . VolumeLocation != nil && resp . VolumeLocation . Leader != "" && string ( master ) != resp . VolumeLocation . Leader {
if resp . VolumeLocation != nil {
if resp . VolumeLocation . Leader != "" && string ( master ) != resp . VolumeLocation . Leader {
glog . V ( 0 ) . Infof ( "master %v redirected to leader %v" , master , resp . VolumeLocation . Leader )
glog . V ( 0 ) . Infof ( "master %v redirected to leader %v" , master , resp . VolumeLocation . Leader )
nextHintedLeader = pb . ServerAddress ( resp . VolumeLocation . Leader )
nextHintedLeader = pb . ServerAddress ( resp . VolumeLocation . Leader )
stats . MasterClientConnectCounter . WithLabelValues ( stats . RedirectedToleader ) . Inc ( )
stats . MasterClientConnectCounter . WithLabelValues ( stats . RedirectedToleader ) . Inc ( )
return nil
return nil
}
}
mc . currentMaster = master
mc . vidMap = newVidMap ( "" )
mc . vidMap = newVidMap ( "" )
mc . updateVidMap ( resp )
} else {
mc . vidMap = newVidMap ( "" )
}
mc . currentMaster = master
for {
for {
resp , err := stream . Recv ( )
resp , err := stream . Recv ( )
@ -195,29 +199,7 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
return nil
return nil
}
}
// process new volume location
loc := Location {
Url : resp . VolumeLocation . Url ,
PublicUrl : resp . VolumeLocation . PublicUrl ,
DataCenter : resp . VolumeLocation . DataCenter ,
GrpcPort : int ( resp . VolumeLocation . GrpcPort ) ,
}
for _ , newVid := range resp . VolumeLocation . NewVids {
glog . V ( 1 ) . Infof ( "%s.%s: %s masterClient adds volume %d" , mc . FilerGroup , mc . clientType , loc . Url , newVid )
mc . addLocation ( newVid , loc )
}
for _ , deletedVid := range resp . VolumeLocation . DeletedVids {
glog . V ( 1 ) . Infof ( "%s.%s: %s masterClient removes volume %d" , mc . FilerGroup , mc . clientType , loc . Url , deletedVid )
mc . deleteLocation ( deletedVid , loc )
}
for _ , newEcVid := range resp . VolumeLocation . NewEcVids {
glog . V ( 1 ) . Infof ( "%s.%s: %s masterClient adds ec volume %d" , mc . FilerGroup , mc . clientType , loc . Url , newEcVid )
mc . addEcLocation ( newEcVid , loc )
}
for _ , deletedEcVid := range resp . VolumeLocation . DeletedEcVids {
glog . V ( 1 ) . Infof ( "%s.%s: %s masterClient removes ec volume %d" , mc . FilerGroup , mc . clientType , loc . Url , deletedEcVid )
mc . deleteEcLocation ( deletedEcVid , loc )
}
mc . updateVidMap ( resp )
}
}
if resp . ClusterNodeUpdate != nil {
if resp . ClusterNodeUpdate != nil {
@ -245,6 +227,32 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
return
return
}
}
func ( mc * MasterClient ) updateVidMap ( resp * master_pb . KeepConnectedResponse ) {
// process new volume location
loc := Location {
Url : resp . VolumeLocation . Url ,
PublicUrl : resp . VolumeLocation . PublicUrl ,
DataCenter : resp . VolumeLocation . DataCenter ,
GrpcPort : int ( resp . VolumeLocation . GrpcPort ) ,
}
for _ , newVid := range resp . VolumeLocation . NewVids {
glog . V ( 1 ) . Infof ( "%s.%s: %s masterClient adds volume %d" , mc . FilerGroup , mc . clientType , loc . Url , newVid )
mc . addLocation ( newVid , loc )
}
for _ , deletedVid := range resp . VolumeLocation . DeletedVids {
glog . V ( 1 ) . Infof ( "%s.%s: %s masterClient removes volume %d" , mc . FilerGroup , mc . clientType , loc . Url , deletedVid )
mc . deleteLocation ( deletedVid , loc )
}
for _ , newEcVid := range resp . VolumeLocation . NewEcVids {
glog . V ( 1 ) . Infof ( "%s.%s: %s masterClient adds ec volume %d" , mc . FilerGroup , mc . clientType , loc . Url , newEcVid )
mc . addEcLocation ( newEcVid , loc )
}
for _ , deletedEcVid := range resp . VolumeLocation . DeletedEcVids {
glog . V ( 1 ) . Infof ( "%s.%s: %s masterClient removes ec volume %d" , mc . FilerGroup , mc . clientType , loc . Url , deletedEcVid )
mc . deleteEcLocation ( deletedEcVid , loc )
}
}
func ( mc * MasterClient ) WithClient ( streamingMode bool , fn func ( client master_pb . SeaweedClient ) error ) error {
func ( mc * MasterClient ) WithClient ( streamingMode bool , fn func ( client master_pb . SeaweedClient ) error ) error {
return util . Retry ( "master grpc" , func ( ) error {
return util . Retry ( "master grpc" , func ( ) error {
for mc . currentMaster == "" {
for mc . currentMaster == "" {