Browse Source

fix: master lose some volumes

pull/2885/head
shibinbin 3 years ago
parent
commit
c20e1edd99
  1. 3
      weed/server/master_grpc_server.go
  2. 11
      weed/topology/topology.go
  3. 4
      weed/topology/topology_event_handling.go

3
weed/server/master_grpc_server.go

@ -113,6 +113,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
} }
if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes { if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes {
dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
ms.Topo.DataNodeRegistration(dcName, rackName, dn)
// process heartbeat.Volumes // process heartbeat.Volumes
stats.MasterReceivedHeartbeatCounter.WithLabelValues("Volumes").Inc() stats.MasterReceivedHeartbeatCounter.WithLabelValues("Volumes").Inc()
newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn) newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn)

11
weed/topology/topology.go

@ -283,3 +283,14 @@ func (t *Topology) IncrementalSyncDataNodeRegistration(newVolumes, deletedVolume
return return
} }
func (t *Topology) DataNodeRegistration(dcName, rackName string ,dn *DataNode){
if dn.Parent() != nil{
return
}
// registration to topo
dc := t.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
rack.LinkChildNode(dn)
glog.Infof("[%s] reLink To topo ", dn.Id())
}

4
weed/topology/topology_event_handling.go

@ -1,6 +1,7 @@
package topology package topology
import ( import (
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/storage/types"
"google.golang.org/grpc" "google.golang.org/grpc"
"math/rand" "math/rand"
@ -84,7 +85,8 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
negativeUsages := dn.GetDiskUsages().negative() negativeUsages := dn.GetDiskUsages().negative()
dn.UpAdjustDiskUsageDelta(negativeUsages) dn.UpAdjustDiskUsageDelta(negativeUsages)
dn.DeltaUpdateVolumes([]storage.VolumeInfo{}, dn.GetVolumes())
dn.DeltaUpdateEcShards([]*erasure_coding.EcVolumeInfo{}, dn.GetEcShards())
if dn.Parent() != nil { if dn.Parent() != nil {
dn.Parent().UnlinkChildNode(dn.Id()) dn.Parent().UnlinkChildNode(dn.Id())
} }

Loading…
Cancel
Save