From 200a8627010b3824bbf3ecdc39bc42fb510ae634 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9F=B3=E6=98=8C=E6=9E=97?= Date: Thu, 30 Jun 2022 13:41:56 +0800 Subject: [PATCH] fixed `volume xx not found` caused by missing VolumeLocation events When the requested master node is the leader, `VolumeLocation` or `ClusterNodeUpdate` may be returned here. If it is `VolumeLocation`, the update will be performed while resetting the vidMap, otherwise the event will be lost --- weed/wdclient/masterclient.go | 68 +++++++++++++++++++---------------- 1 file changed, 38 insertions(+), 30 deletions(-) diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 35f1c4cf8..3e76dc0c5 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -168,15 +168,19 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL } // check if it is the leader to determine whether to reset the vidMap - if resp.VolumeLocation != nil && resp.VolumeLocation.Leader != "" && string(master) != resp.VolumeLocation.Leader { - glog.V(0).Infof("master %v redirected to leader %v", master, resp.VolumeLocation.Leader) - nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader) - stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc() - return nil + if resp.VolumeLocation != nil { + if resp.VolumeLocation.Leader != "" && string(master) != resp.VolumeLocation.Leader { + glog.V(0).Infof("master %v redirected to leader %v", master, resp.VolumeLocation.Leader) + nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader) + stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc() + return nil + } + mc.vidMap = newVidMap("") + mc.updateVidMap(resp) + } else { + mc.vidMap = newVidMap("") } - mc.currentMaster = master - mc.vidMap = newVidMap("") for { resp, err := stream.Recv() @@ -195,29 +199,7 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL return nil } - // process new volume location - loc := Location{ - Url: resp.VolumeLocation.Url, - PublicUrl: resp.VolumeLocation.PublicUrl, - DataCenter: resp.VolumeLocation.DataCenter, - GrpcPort: int(resp.VolumeLocation.GrpcPort), - } - for _, newVid := range resp.VolumeLocation.NewVids { - glog.V(1).Infof("%s.%s: %s masterClient adds volume %d", mc.FilerGroup, mc.clientType, loc.Url, newVid) - mc.addLocation(newVid, loc) - } - for _, deletedVid := range resp.VolumeLocation.DeletedVids { - glog.V(1).Infof("%s.%s: %s masterClient removes volume %d", mc.FilerGroup, mc.clientType, loc.Url, deletedVid) - mc.deleteLocation(deletedVid, loc) - } - for _, newEcVid := range resp.VolumeLocation.NewEcVids { - glog.V(1).Infof("%s.%s: %s masterClient adds ec volume %d", mc.FilerGroup, mc.clientType, loc.Url, newEcVid) - mc.addEcLocation(newEcVid, loc) - } - for _, deletedEcVid := range resp.VolumeLocation.DeletedEcVids { - glog.V(1).Infof("%s.%s: %s masterClient removes ec volume %d", mc.FilerGroup, mc.clientType, loc.Url, deletedEcVid) - mc.deleteEcLocation(deletedEcVid, loc) - } + mc.updateVidMap(resp) } if resp.ClusterNodeUpdate != nil { @@ -245,6 +227,32 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL return } +func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) { + // process new volume location + loc := Location{ + Url: resp.VolumeLocation.Url, + PublicUrl: resp.VolumeLocation.PublicUrl, + DataCenter: resp.VolumeLocation.DataCenter, + GrpcPort: int(resp.VolumeLocation.GrpcPort), + } + for _, newVid := range resp.VolumeLocation.NewVids { + glog.V(1).Infof("%s.%s: %s masterClient adds volume %d", mc.FilerGroup, mc.clientType, loc.Url, newVid) + mc.addLocation(newVid, loc) + } + for _, deletedVid := range resp.VolumeLocation.DeletedVids { + glog.V(1).Infof("%s.%s: %s masterClient removes volume %d", mc.FilerGroup, mc.clientType, loc.Url, deletedVid) + mc.deleteLocation(deletedVid, loc) + } + for _, newEcVid := range resp.VolumeLocation.NewEcVids { + glog.V(1).Infof("%s.%s: %s masterClient adds ec volume %d", mc.FilerGroup, mc.clientType, loc.Url, newEcVid) + mc.addEcLocation(newEcVid, loc) + } + for _, deletedEcVid := range resp.VolumeLocation.DeletedEcVids { + glog.V(1).Infof("%s.%s: %s masterClient removes ec volume %d", mc.FilerGroup, mc.clientType, loc.Url, deletedEcVid) + mc.deleteEcLocation(deletedEcVid, loc) + } +} + func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error { return util.Retry("master grpc", func() error { for mc.currentMaster == "" {