
Address review comments from Gemini and CodeRabbit

Fix HIGH issues (both illustrated in the sketch after this message):
- Fix empty disk discovery: Now discovers all disks from VolumeInfos,
  not just from EC shards. This ensures disks without EC shards are
  still considered for placement.
- Fix EC shard count calculation in detection.go: Now correctly filters
  by DiskId and sums actual shard counts using ShardBits.ShardIdCount()
  instead of just counting EcShardInfo entries.

Fix MEDIUM issues:
- Add disk ID to evacuation log messages for consistency with other logging
- Remove unused serverToDisks variable in placement.go
- Fix comment that incorrectly said 'ascending' when sorting is 'descending'
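
The two HIGH fixes are illustrated by the following minimal, standalone sketch. The types below (DiskInfo, VolumeInfo, EcShardInfo) are simplified stand-ins for the topology protobuf messages, and bits.OnesCount32 stands in for erasure_coding.ShardBits.ShardIdCount(), which is assumed to count the set bits of EcIndexBits; this is an illustration of the approach in the diff below, not the SeaweedFS code itself.

package main

import (
	"fmt"
	"math/bits"
)

// Simplified stand-ins for the topology protobuf types (hypothetical).
type VolumeInfo struct{ DiskId uint32 }

type EcShardInfo struct {
	DiskId      uint32
	EcIndexBits uint32
}

type DiskInfo struct {
	VolumeInfos  []*VolumeInfo
	EcShardInfos []*EcShardInfo
}

// allDiskIds discovers every disk ID on a node, including disks that hold
// only regular volumes and no EC shards (the empty-disk discovery fix).
func allDiskIds(diskInfos map[string]*DiskInfo) map[uint32]string {
	ids := make(map[uint32]string) // diskId -> diskType
	for diskType, di := range diskInfos {
		if di == nil {
			continue
		}
		for _, vi := range di.VolumeInfos {
			ids[vi.DiskId] = diskType
		}
		for _, es := range di.EcShardInfos {
			ids[es.DiskId] = diskType
		}
	}
	return ids
}

// ecShardCountOnDisk counts EC shards on one disk by filtering on DiskId and
// summing the bits set in EcIndexBits, instead of counting EcShardInfo
// entries (the shard-count fix).
func ecShardCountOnDisk(shardInfos []*EcShardInfo, diskId uint32) int {
	count := 0
	for _, si := range shardInfos {
		if si.DiskId == diskId {
			count += bits.OnesCount32(si.EcIndexBits)
		}
	}
	return count
}

func main() {
	disks := map[string]*DiskInfo{
		"hdd": {
			VolumeInfos:  []*VolumeInfo{{DiskId: 0}, {DiskId: 1}}, // disk 1 has volumes but no EC shards
			EcShardInfos: []*EcShardInfo{{DiskId: 0, EcIndexBits: 0b10110}},
		},
	}
	fmt.Println(allDiskIds(disks))                                // map[0:hdd 1:hdd]: the empty disk 1 is still discovered
	fmt.Println(ecShardCountOnDisk(disks["hdd"].EcShardInfos, 0)) // 3 shards, not "1 entry"
	fmt.Println(ecShardCountOnDisk(disks["hdd"].EcShardInfos, 1)) // 0
}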
pull/7597/head
Chris Lu, 4 days ago
commit d5d6cbcaa3
  1. weed/shell/command_ec_common.go (41)
  2. weed/shell/command_volume_server_evacuate.go (4)
  3. weed/storage/erasure_coding/placement/placement.go (4)
  4. weed/worker/tasks/erasure_coding/detection.go (9)

weed/shell/command_ec_common.go (41)

@@ -436,13 +436,29 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
disks: make(map[uint32]*EcDisk),
}
// Build disk-level information from EC shard info
// Build disk-level information from volumes and EC shards
// First, discover all unique disk IDs from VolumeInfos (includes empty disks)
allDiskIds := make(map[uint32]string) // diskId -> diskType
for diskType, diskInfo := range dn.DiskInfos {
if diskInfo == nil {
continue
}
// Get all disk IDs from volumes
for _, vi := range diskInfo.VolumeInfos {
allDiskIds[vi.DiskId] = diskType
}
// Also get disk IDs from EC shards
for _, ecShardInfo := range diskInfo.EcShardInfos {
allDiskIds[ecShardInfo.DiskId] = diskType
}
}
// Group EC shards by disk_id
diskShards := make(map[uint32]map[needle.VolumeId]erasure_coding.ShardBits)
for _, diskInfo := range dn.DiskInfos {
if diskInfo == nil {
continue
}
for _, ecShardInfo := range diskInfo.EcShardInfos {
diskId := ecShardInfo.DiskId
if diskShards[diskId] == nil {
@@ -451,24 +467,24 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
vid := needle.VolumeId(ecShardInfo.Id)
diskShards[diskId][vid] = erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
}
}
// If no EC shards, still create disk entry based on DiskInfo.DiskId
if len(diskShards) == 0 && diskInfo.DiskId > 0 {
diskShards[diskInfo.DiskId] = make(map[needle.VolumeId]erasure_coding.ShardBits)
// Create EcDisk for each discovered disk
diskCount := len(allDiskIds)
if diskCount == 0 {
diskCount = 1
}
freePerDisk := int(freeEcSlots) / diskCount
// Create EcDisk for each disk_id found
for diskId, shards := range diskShards {
for diskId, diskType := range allDiskIds {
shards := diskShards[diskId]
if shards == nil {
shards = make(map[needle.VolumeId]erasure_coding.ShardBits)
}
totalShardCount := 0
for _, shardBits := range shards {
totalShardCount += shardBits.ShardIdCount()
}
// Estimate free slots per disk (simplified: divide evenly if multiple disks)
diskCount := len(diskShards)
if diskCount == 0 {
diskCount = 1
}
freePerDisk := int(freeEcSlots) / diskCount
ecNode.disks[diskId] = &EcDisk{
diskId: diskId,
@@ -478,7 +494,6 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
ecShards: shards,
}
}
}
ecNodes = append(ecNodes, ecNode)
totalFreeEcSlots += freeEcSlots

weed/shell/command_volume_server_evacuate.go (4)

@@ -199,7 +199,11 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv
}
vid := needle.VolumeId(ecShardInfo.Id)
destDiskId := pickBestDiskOnNode(emptyNode, vid)
if destDiskId > 0 {
fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s (disk %d)\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id, destDiskId)
} else {
fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
}
err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange)
if err != nil {
return

weed/storage/erasure_coding/placement/placement.go (4)

@@ -114,8 +114,6 @@ func SelectDestinations(disks []*DiskCandidate, config PlacementRequest) (*Place
// Build indexes for efficient lookup
rackToDisks := groupDisksByRack(suitable)
serverToDisks := groupDisksByServer(suitable)
_ = serverToDisks // Used for reference
result := &PlacementResult{
SelectedDisks: make([]*DiskCandidate, 0, config.ShardsNeeded),
@@ -130,7 +128,7 @@ func SelectDestinations(disks []*DiskCandidate, config PlacementRequest) (*Place
// Pass 1: Select one disk from each rack (maximize rack diversity)
if config.PreferDifferentRacks {
// Sort racks by number of available servers (ascending) to prioritize underutilized racks
// Sort racks by number of available servers (descending) to prioritize racks with more options
sortedRacks := sortRacksByServerCount(rackToDisks)
for _, rackKey := range sortedRacks {
if len(result.SelectedDisks) >= config.ShardsNeeded {

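For reference, a minimal hedged sketch of the descending sort described by the corrected comment above: rack keys ordered by how many distinct servers each rack offers, most options first. The types and the helper below are illustrative assumptions, not the repo's actual sortRacksByServerCount implementation.

package main

import (
	"fmt"
	"sort"
)

// Illustrative stand-in for a placement disk candidate (hypothetical).
type DiskCandidate struct {
	Rack   string
	Server string
}

// sortRacksByServerCountDesc orders rack keys by the number of distinct
// servers they offer, descending, so racks with more placement options
// are considered first.
func sortRacksByServerCountDesc(rackToDisks map[string][]*DiskCandidate) []string {
	serverCount := make(map[string]int)
	for rack, disks := range rackToDisks {
		servers := make(map[string]struct{})
		for _, d := range disks {
			servers[d.Server] = struct{}{}
		}
		serverCount[rack] = len(servers)
	}
	racks := make([]string, 0, len(rackToDisks))
	for rack := range rackToDisks {
		racks = append(racks, rack)
	}
	sort.Slice(racks, func(i, j int) bool {
		if serverCount[racks[i]] != serverCount[racks[j]] {
			return serverCount[racks[i]] > serverCount[racks[j]] // descending
		}
		return racks[i] < racks[j] // deterministic tie-break by name
	})
	return racks
}

func main() {
	racks := map[string][]*DiskCandidate{
		"rack-a": {{Rack: "rack-a", Server: "s1"}},
		"rack-b": {{Rack: "rack-b", Server: "s2"}, {Rack: "rack-b", Server: "s3"}},
	}
	fmt.Println(sortRacksByServerCountDesc(racks)) // [rack-b rack-a]
}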
weed/worker/tasks/erasure_coding/detection.go (9)

@@ -477,10 +477,15 @@ func diskInfosToCandidates(disks []*topology.DiskInfo) []*placement.DiskCandidat
freeSlots = 0
}
// Calculate EC shard count from EcShardInfos slice
// Calculate EC shard count for this specific disk
// EcShardInfos contains all shards, so we need to filter by DiskId and sum actual shard counts
ecShardCount := 0
if disk.DiskInfo.EcShardInfos != nil {
ecShardCount = len(disk.DiskInfo.EcShardInfos)
for _, shardInfo := range disk.DiskInfo.EcShardInfos {
if shardInfo.DiskId == disk.DiskID {
ecShardCount += erasure_coding.ShardBits(shardInfo.EcIndexBits).ShardIdCount()
}
}
}
candidates = append(candidates, &placement.DiskCandidate{
