From 96496d5286ca172bf3237d856b2273a0b85e6819 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 23 Jun 2022 00:41:33 -0700 Subject: [PATCH] master: broadcast new volume locations to clients to avoid possible race condition fix https://github.com/chrislusf/seaweedfs/issues/3220 --- weed/server/master_grpc_server_volume.go | 7 ++++- weed/topology/volume_growth.go | 39 +++++++++++++++--------- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index bc92dd332..0382c2dae 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -52,8 +52,13 @@ func (ms *MasterServer) ProcessGrowRequest() { go func() { glog.V(1).Infoln("starting automatic volume grow") start := time.Now() - _, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count) + newVidLocations, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count) glog.V(1).Infoln("finished automatic volume grow, cost ", time.Now().Sub(start)) + if err == nil { + for _, newVidLocation := range newVidLocations { + ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: newVidLocation}) + } + } vl.DoneGrowRequest() if req.ErrCh != nil { diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index 7886c3998..238ca99f4 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -3,6 +3,7 @@ package topology import ( "encoding/json" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "math/rand" "sync" @@ -77,42 +78,50 @@ func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) { return } -func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology, targetCount int) (count int, err error) { +func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo 
*Topology, targetCount int) (result []*master_pb.VolumeLocation, err error) { if targetCount == 0 { targetCount = vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()) } - count, err = vg.GrowByCountAndType(grpcDialOption, targetCount, option, topo) - if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 { - return count, nil + result, err = vg.GrowByCountAndType(grpcDialOption, targetCount, option, topo) + if len(result) > 0 && len(result)%option.ReplicaPlacement.GetCopyCount() == 0 { + return result, nil } - return count, err + return result, err } -func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) { +func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount int, option *VolumeGrowOption, topo *Topology) (result []*master_pb.VolumeLocation, err error) { vg.accessLock.Lock() defer vg.accessLock.Unlock() for i := 0; i < targetCount; i++ { - if c, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil { - counter += c + if res, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil { + result = append(result, res...) } else { - glog.V(0).Infof("create %d volume, created %d: %v", targetCount, counter, e) - return counter, e + glog.V(0).Infof("create %d volume, created %d: %v", targetCount, len(result), e) + return result, e } } return } -func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (int, error) { +func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (result []*master_pb.VolumeLocation, err error) { servers, e := vg.findEmptySlotsForOneVolume(topo, option) if e != nil { - return 0, e + return nil, e } vid, raftErr := topo.NextVolumeId() if raftErr != nil { - return 0, raftErr + return nil, raftErr } - err := vg.grow(grpcDialOption, topo, vid, option, servers...) 
- return len(servers), err + if err = vg.grow(grpcDialOption, topo, vid, option, servers...); err == nil { + for _, server := range servers { + result = append(result, &master_pb.VolumeLocation{ + Url: server.Url(), + PublicUrl: server.PublicUrl, + NewVids: []uint32{uint32(vid)}, + }) + } + } + return } // 1. find the main data node