Browse Source

optimize memory usage for large number of volumes

1. unwrap the map to avoid extra map object creation
2. fix ec shard counting in UpdateEcShards
pull/6118/head
chrislu 2 months ago
parent
commit
35fd1e1c9a
  1. 17
      weed/topology/data_node.go
  2. 18
      weed/topology/data_node_ec.go
  3. 7
      weed/topology/disk.go
  4. 19
      weed/topology/disk_ec.go
  5. 20
      weed/topology/node.go
  6. 11
      weed/topology/topology_event_handling.go
  7. 7
      weed/topology/volume_growth_test.go

17
weed/topology/data_node.go

@ -83,8 +83,7 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
disk.DeleteVolumeById(vid)
deletedVolumes = append(deletedVolumes, v)
deltaDiskUsages := newDiskUsages()
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
deltaDiskUsage := &DiskUsageCounts{}
deltaDiskUsage.volumeCount = -1
if v.IsRemote() {
deltaDiskUsage.remoteVolumeCount = -1
@ -92,7 +91,7 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
if !v.ReadOnly {
deltaDiskUsage.activeVolumeCount = -1
}
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
disk.UpAdjustDiskUsageDelta(types.ToDiskType(v.DiskType), deltaDiskUsage)
}
}
for _, v := range actualVolumes {
@ -120,8 +119,7 @@ func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.Volu
}
disk.DeleteVolumeById(v.Id)
deltaDiskUsages := newDiskUsages()
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
deltaDiskUsage := &DiskUsageCounts{}
deltaDiskUsage.volumeCount = -1
if v.IsRemote() {
deltaDiskUsage.remoteVolumeCount = -1
@ -129,7 +127,7 @@ func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.Volu
if !v.ReadOnly {
deltaDiskUsage.activeVolumeCount = -1
}
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
disk.UpAdjustDiskUsageDelta(types.ToDiskType(v.DiskType), deltaDiskUsage)
}
for _, v := range newVolumes {
dn.doAddOrUpdateVolume(v)
@ -143,7 +141,6 @@ func (dn *DataNode) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) {
// the volume server may have set the max to zero
continue
}
deltaDiskUsages := newDiskUsages()
dt := types.ToDiskType(diskType)
currentDiskUsage := dn.diskUsages.getOrCreateDisk(dt)
currentDiskUsageMaxVolumeCount := atomic.LoadInt64(&currentDiskUsage.maxVolumeCount)
@ -151,9 +148,9 @@ func (dn *DataNode) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) {
continue
}
disk := dn.getOrCreateDisk(dt.String())
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt)
deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsageMaxVolumeCount
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
disk.UpAdjustDiskUsageDelta(dt, &DiskUsageCounts{
maxVolumeCount: int64(maxVolumeCount) - currentDiskUsageMaxVolumeCount,
})
}
}

18
weed/topology/data_node_ec.go

@ -26,12 +26,10 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
existingEcShards := dn.GetEcShards()
// find out the newShards and deletedShards
var newShardCount, deletedShardCount int
for _, ecShards := range existingEcShards {
var newShardCount, deletedShardCount int
disk := dn.getOrCreateDisk(ecShards.DiskType)
deltaDiskUsages := newDiskUsages()
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))
vid := ecShards.VolumeId
if actualEcShards, ok := actualEcShardMap[vid]; !ok {
@ -52,8 +50,11 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
}
}
deltaDiskUsage.ecShardCount = int64(newShardCount - deletedShardCount)
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
if (newShardCount - deletedShardCount) != 0 {
disk.UpAdjustDiskUsageDelta(types.ToDiskType(ecShards.DiskType), &DiskUsageCounts{
ecShardCount: int64(newShardCount - deletedShardCount),
})
}
}
@ -65,10 +66,9 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
newShards = append(newShards, ecShards)
disk := dn.getOrCreateDisk(ecShards.DiskType)
deltaDiskUsages := newDiskUsages()
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))
deltaDiskUsage.ecShardCount = int64(ecShards.ShardIdCount())
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
disk.UpAdjustDiskUsageDelta(types.ToDiskType(ecShards.DiskType), &DiskUsageCounts{
ecShardCount: int64(ecShards.ShardIdCount()),
})
}
if len(newShards) > 0 || len(deletedShards) > 0 {

7
weed/topology/disk.go

@ -152,8 +152,7 @@ func (d *Disk) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool) {
}
func (d *Disk) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool) {
deltaDiskUsages := newDiskUsages()
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
deltaDiskUsage := &DiskUsageCounts{}
if oldV, ok := d.volumes[v.Id]; !ok {
d.volumes[v.Id] = v
deltaDiskUsage.volumeCount = 1
@ -164,7 +163,7 @@ func (d *Disk) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool)
deltaDiskUsage.activeVolumeCount = 1
}
d.UpAdjustMaxVolumeId(v.Id)
d.UpAdjustDiskUsageDelta(deltaDiskUsages)
d.UpAdjustDiskUsageDelta(types.ToDiskType(v.DiskType), deltaDiskUsage)
isNew = true
} else {
if oldV.IsRemote() != v.IsRemote() {
@ -174,7 +173,7 @@ func (d *Disk) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool)
if oldV.IsRemote() {
deltaDiskUsage.remoteVolumeCount = -1
}
d.UpAdjustDiskUsageDelta(deltaDiskUsages)
d.UpAdjustDiskUsageDelta(types.ToDiskType(v.DiskType), deltaDiskUsage)
}
isChanged = d.volumes[v.Id].ReadOnly != v.ReadOnly
d.volumes[v.Id] = v

19
weed/topology/disk_ec.go

@ -29,10 +29,12 @@ func (d *Disk) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) {
delta = existing.ShardBits.ShardIdCount() - oldCount
}
deltaDiskUsages := newDiskUsages()
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
deltaDiskUsage.ecShardCount = int64(delta)
d.UpAdjustDiskUsageDelta(deltaDiskUsages)
if delta == 0 {
return
}
d.UpAdjustDiskUsageDelta(types.ToDiskType(string(d.Id())), &DiskUsageCounts{
ecShardCount: int64(delta),
})
}
@ -45,10 +47,11 @@ func (d *Disk) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
existing.ShardBits = existing.ShardBits.Minus(s.ShardBits)
delta := existing.ShardBits.ShardIdCount() - oldCount
deltaDiskUsages := newDiskUsages()
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
deltaDiskUsage.ecShardCount = int64(delta)
d.UpAdjustDiskUsageDelta(deltaDiskUsages)
if delta != 0 {
d.UpAdjustDiskUsageDelta(types.ToDiskType(string(d.Id())), &DiskUsageCounts{
ecShardCount: int64(delta),
})
}
if existing.ShardBits.ShardIdCount() == 0 {
delete(d.ecShards, s.VolumeId)

20
weed/topology/node.go

@ -21,7 +21,7 @@ type Node interface {
String() string
AvailableSpaceFor(option *VolumeGrowOption) int64
ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error)
UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages)
UpAdjustDiskUsageDelta(diskType types.DiskType, diskUsage *DiskUsageCounts)
UpAdjustMaxVolumeId(vid needle.VolumeId)
GetDiskUsages() *DiskUsages
@ -214,13 +214,11 @@ func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assigned
return nil, errors.New("No free volume slot found!")
}
func (n *NodeImpl) UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages) { //can be negative
for diskType, diskUsage := range deltaDiskUsages.usages {
existingDisk := n.getOrCreateDisk(diskType)
existingDisk.addDiskUsageCounts(diskUsage)
}
func (n *NodeImpl) UpAdjustDiskUsageDelta(diskType types.DiskType, diskUsage *DiskUsageCounts) { //can be negative
existingDisk := n.getOrCreateDisk(diskType)
existingDisk.addDiskUsageCounts(diskUsage)
if n.parent != nil {
n.parent.UpAdjustDiskUsageDelta(deltaDiskUsages)
n.parent.UpAdjustDiskUsageDelta(diskType, diskUsage)
}
}
func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative
@ -244,7 +242,9 @@ func (n *NodeImpl) LinkChildNode(node Node) {
func (n *NodeImpl) doLinkChildNode(node Node) {
if n.children[node.Id()] == nil {
n.children[node.Id()] = node
n.UpAdjustDiskUsageDelta(node.GetDiskUsages())
for dt, du := range node.GetDiskUsages().usages {
n.UpAdjustDiskUsageDelta(dt, du)
}
n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
node.SetParent(n)
glog.V(0).Infoln(n, "adds child", node.Id())
@ -258,7 +258,9 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
if node != nil {
node.SetParent(nil)
delete(n.children, node.Id())
n.UpAdjustDiskUsageDelta(node.GetDiskUsages().negative())
for dt, du := range node.GetDiskUsages().negative().usages {
n.UpAdjustDiskUsageDelta(dt, du)
}
glog.V(0).Infoln(n, "removes", node.Id())
}
}

11
weed/topology/topology_event_handling.go

@ -65,10 +65,9 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
if !volumeInfo.ReadOnly {
disk := dn.getOrCreateDisk(volumeInfo.DiskType)
deltaDiskUsages := newDiskUsages()
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(volumeInfo.DiskType))
deltaDiskUsage.activeVolumeCount = -1
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
disk.UpAdjustDiskUsageDelta(types.ToDiskType(volumeInfo.DiskType), &DiskUsageCounts{
activeVolumeCount: -1,
})
}
}
@ -96,7 +95,9 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
}
negativeUsages := dn.GetDiskUsages().negative()
dn.UpAdjustDiskUsageDelta(negativeUsages)
for dt, du := range negativeUsages.usages {
dn.UpAdjustDiskUsageDelta(dt, du)
}
dn.DeltaUpdateVolumes([]storage.VolumeInfo{}, dn.GetVolumes())
dn.DeltaUpdateEcShards([]*erasure_coding.EcVolumeInfo{}, dn.GetEcShards())
if dn.Parent() != nil {

7
weed/topology/volume_growth_test.go

@ -122,10 +122,9 @@ func setup(topologyLayout string) *Topology {
}
disk := server.getOrCreateDisk("")
deltaDiskUsages := newDiskUsages()
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk("")
deltaDiskUsage.maxVolumeCount = int64(serverMap["limit"].(float64))
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
disk.UpAdjustDiskUsageDelta("", &DiskUsageCounts{
maxVolumeCount: int64(serverMap["limit"].(float64)),
})
}
}

Loading…
Cancel
Save