diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index de428d38f..3103dc207 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -83,8 +83,7 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume disk.DeleteVolumeById(vid) deletedVolumes = append(deletedVolumes, v) - deltaDiskUsages := newDiskUsages() - deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType)) + deltaDiskUsage := &DiskUsageCounts{} deltaDiskUsage.volumeCount = -1 if v.IsRemote() { deltaDiskUsage.remoteVolumeCount = -1 @@ -92,7 +91,7 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume if !v.ReadOnly { deltaDiskUsage.activeVolumeCount = -1 } - disk.UpAdjustDiskUsageDelta(deltaDiskUsages) + disk.UpAdjustDiskUsageDelta(types.ToDiskType(v.DiskType), deltaDiskUsage) } } for _, v := range actualVolumes { @@ -120,8 +119,7 @@ func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.Volu } disk.DeleteVolumeById(v.Id) - deltaDiskUsages := newDiskUsages() - deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType)) + deltaDiskUsage := &DiskUsageCounts{} deltaDiskUsage.volumeCount = -1 if v.IsRemote() { deltaDiskUsage.remoteVolumeCount = -1 @@ -129,7 +127,7 @@ func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.Volu if !v.ReadOnly { deltaDiskUsage.activeVolumeCount = -1 } - disk.UpAdjustDiskUsageDelta(deltaDiskUsages) + disk.UpAdjustDiskUsageDelta(types.ToDiskType(v.DiskType), deltaDiskUsage) } for _, v := range newVolumes { dn.doAddOrUpdateVolume(v) @@ -143,7 +141,6 @@ func (dn *DataNode) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) { // the volume server may have set the max to zero continue } - deltaDiskUsages := newDiskUsages() dt := types.ToDiskType(diskType) currentDiskUsage := dn.diskUsages.getOrCreateDisk(dt) currentDiskUsageMaxVolumeCount := atomic.LoadInt64(¤tDiskUsage.maxVolumeCount) @@ -151,9 +148,9 @@ func (dn *DataNode) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) { continue } disk := dn.getOrCreateDisk(dt.String()) - deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt) - deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsageMaxVolumeCount - disk.UpAdjustDiskUsageDelta(deltaDiskUsages) + disk.UpAdjustDiskUsageDelta(dt, &DiskUsageCounts{ + maxVolumeCount: int64(maxVolumeCount) - currentDiskUsageMaxVolumeCount, + }) } } diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go index 100b44f59..839499982 100644 --- a/weed/topology/data_node_ec.go +++ b/weed/topology/data_node_ec.go @@ -26,12 +26,10 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) existingEcShards := dn.GetEcShards() // find out the newShards and deletedShards - var newShardCount, deletedShardCount int for _, ecShards := range existingEcShards { + var newShardCount, deletedShardCount int disk := dn.getOrCreateDisk(ecShards.DiskType) - deltaDiskUsages := newDiskUsages() - deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType)) vid := ecShards.VolumeId if actualEcShards, ok := actualEcShardMap[vid]; !ok { @@ -52,8 +50,11 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) } } - deltaDiskUsage.ecShardCount = int64(newShardCount - deletedShardCount) - disk.UpAdjustDiskUsageDelta(deltaDiskUsages) + if (newShardCount - deletedShardCount) != 0 { + disk.UpAdjustDiskUsageDelta(types.ToDiskType(ecShards.DiskType), &DiskUsageCounts{ + ecShardCount: int64(newShardCount - deletedShardCount), + }) + } } @@ -65,10 +66,9 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) newShards = append(newShards, ecShards) disk := dn.getOrCreateDisk(ecShards.DiskType) - deltaDiskUsages := newDiskUsages() - deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType)) - deltaDiskUsage.ecShardCount = int64(ecShards.ShardIdCount()) - disk.UpAdjustDiskUsageDelta(deltaDiskUsages) + disk.UpAdjustDiskUsageDelta(types.ToDiskType(ecShards.DiskType), &DiskUsageCounts{ + ecShardCount: int64(ecShards.ShardIdCount()), + }) } if len(newShards) > 0 || len(deletedShards) > 0 { diff --git a/weed/topology/disk.go b/weed/topology/disk.go index 4597bfc29..6d789e34b 100644 --- a/weed/topology/disk.go +++ b/weed/topology/disk.go @@ -152,8 +152,7 @@ func (d *Disk) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool) { } func (d *Disk) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool) { - deltaDiskUsages := newDiskUsages() - deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType)) + deltaDiskUsage := &DiskUsageCounts{} if oldV, ok := d.volumes[v.Id]; !ok { d.volumes[v.Id] = v deltaDiskUsage.volumeCount = 1 @@ -164,7 +163,7 @@ func (d *Disk) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool) deltaDiskUsage.activeVolumeCount = 1 } d.UpAdjustMaxVolumeId(v.Id) - d.UpAdjustDiskUsageDelta(deltaDiskUsages) + d.UpAdjustDiskUsageDelta(types.ToDiskType(v.DiskType), deltaDiskUsage) isNew = true } else { if oldV.IsRemote() != v.IsRemote() { @@ -174,7 +173,7 @@ func (d *Disk) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool) if oldV.IsRemote() { deltaDiskUsage.remoteVolumeCount = -1 } - d.UpAdjustDiskUsageDelta(deltaDiskUsages) + d.UpAdjustDiskUsageDelta(types.ToDiskType(v.DiskType), deltaDiskUsage) } isChanged = d.volumes[v.Id].ReadOnly != v.ReadOnly d.volumes[v.Id] = v diff --git a/weed/topology/disk_ec.go b/weed/topology/disk_ec.go index 4f950025f..1fea29272 100644 --- a/weed/topology/disk_ec.go +++ b/weed/topology/disk_ec.go @@ -29,10 +29,12 @@ func (d *Disk) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) { delta = existing.ShardBits.ShardIdCount() - oldCount } - deltaDiskUsages := newDiskUsages() - deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id()))) - deltaDiskUsage.ecShardCount = int64(delta) - d.UpAdjustDiskUsageDelta(deltaDiskUsages) + if delta == 0 { + return + } + d.UpAdjustDiskUsageDelta(types.ToDiskType(string(d.Id())), &DiskUsageCounts{ + ecShardCount: int64(delta), + }) } @@ -45,10 +47,11 @@ func (d *Disk) DeleteEcShard(s *erasure_coding.EcVolumeInfo) { existing.ShardBits = existing.ShardBits.Minus(s.ShardBits) delta := existing.ShardBits.ShardIdCount() - oldCount - deltaDiskUsages := newDiskUsages() - deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id()))) - deltaDiskUsage.ecShardCount = int64(delta) - d.UpAdjustDiskUsageDelta(deltaDiskUsages) + if delta != 0 { + d.UpAdjustDiskUsageDelta(types.ToDiskType(string(d.Id())), &DiskUsageCounts{ + ecShardCount: int64(delta), + }) + } if existing.ShardBits.ShardIdCount() == 0 { delete(d.ecShards, s.VolumeId) diff --git a/weed/topology/node.go b/weed/topology/node.go index d33bbce2b..aa178b561 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -21,7 +21,7 @@ type Node interface { String() string AvailableSpaceFor(option *VolumeGrowOption) int64 ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error) - UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages) + UpAdjustDiskUsageDelta(diskType types.DiskType, diskUsage *DiskUsageCounts) UpAdjustMaxVolumeId(vid needle.VolumeId) GetDiskUsages() *DiskUsages @@ -214,13 +214,11 @@ func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assigned return nil, errors.New("No free volume slot found!") } -func (n *NodeImpl) UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages) { //can be negative - for diskType, diskUsage := range deltaDiskUsages.usages { - existingDisk := n.getOrCreateDisk(diskType) - existingDisk.addDiskUsageCounts(diskUsage) - } +func (n *NodeImpl) UpAdjustDiskUsageDelta(diskType types.DiskType, diskUsage *DiskUsageCounts) { //can be negative + existingDisk := n.getOrCreateDisk(diskType) + existingDisk.addDiskUsageCounts(diskUsage) if n.parent != nil { - n.parent.UpAdjustDiskUsageDelta(deltaDiskUsages) + n.parent.UpAdjustDiskUsageDelta(diskType, diskUsage) } } func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative @@ -244,7 +242,9 @@ func (n *NodeImpl) LinkChildNode(node Node) { func (n *NodeImpl) doLinkChildNode(node Node) { if n.children[node.Id()] == nil { n.children[node.Id()] = node - n.UpAdjustDiskUsageDelta(node.GetDiskUsages()) + for dt, du := range node.GetDiskUsages().usages { + n.UpAdjustDiskUsageDelta(dt, du) + } n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) node.SetParent(n) glog.V(0).Infoln(n, "adds child", node.Id()) @@ -258,7 +258,9 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { if node != nil { node.SetParent(nil) delete(n.children, node.Id()) - n.UpAdjustDiskUsageDelta(node.GetDiskUsages().negative()) + for dt, du := range node.GetDiskUsages().negative().usages { + n.UpAdjustDiskUsageDelta(dt, du) + } glog.V(0).Infoln(n, "removes", node.Id()) } } diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go index ff1c642a6..109f29ee0 100644 --- a/weed/topology/topology_event_handling.go +++ b/weed/topology/topology_event_handling.go @@ -65,10 +65,9 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { if !volumeInfo.ReadOnly { disk := dn.getOrCreateDisk(volumeInfo.DiskType) - deltaDiskUsages := newDiskUsages() - deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(volumeInfo.DiskType)) - deltaDiskUsage.activeVolumeCount = -1 - disk.UpAdjustDiskUsageDelta(deltaDiskUsages) + disk.UpAdjustDiskUsageDelta(types.ToDiskType(volumeInfo.DiskType), &DiskUsageCounts{ + activeVolumeCount: -1, + }) } } @@ -96,7 +95,9 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) { } negativeUsages := dn.GetDiskUsages().negative() - dn.UpAdjustDiskUsageDelta(negativeUsages) + for dt, du := range negativeUsages.usages { + dn.UpAdjustDiskUsageDelta(dt, du) + } dn.DeltaUpdateVolumes([]storage.VolumeInfo{}, dn.GetVolumes()) dn.DeltaUpdateEcShards([]*erasure_coding.EcVolumeInfo{}, dn.GetEcShards()) if dn.Parent() != nil { diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go index 1624ec32a..49dd0957c 100644 --- a/weed/topology/volume_growth_test.go +++ b/weed/topology/volume_growth_test.go @@ -122,10 +122,9 @@ func setup(topologyLayout string) *Topology { } disk := server.getOrCreateDisk("") - deltaDiskUsages := newDiskUsages() - deltaDiskUsage := deltaDiskUsages.getOrCreateDisk("") - deltaDiskUsage.maxVolumeCount = int64(serverMap["limit"].(float64)) - disk.UpAdjustDiskUsageDelta(deltaDiskUsages) + disk.UpAdjustDiskUsageDelta("", &DiskUsageCounts{ + maxVolumeCount: int64(serverMap["limit"].(float64)), + }) } }