From 90b134f83088c116faaac69fde1b4f3dd2dca161 Mon Sep 17 00:00:00 2001 From: chrislusf Date: Tue, 2 Dec 2025 12:51:01 -0800 Subject: [PATCH] ec: update helper functions to use configurable diskType Update the following functions to accept/use diskType parameter: - findEcVolumeShards() - addEcVolumeShards() - deleteEcVolumeShards() - moveMountedShardToEcNode() - countShardsByRack() - pickNEcShardsToMoveFrom() All ecBalancer methods now use ecb.diskType instead of hardcoded types.HardDriveType. Non-ecBalancer callers (like volumeServer.evacuate and ec.rebuild) use types.HardDriveType as the default. Update all test files to pass diskType where needed. --- weed/shell/command_ec_common.go | 56 ++++++++++---------- weed/shell/command_ec_common_test.go | 3 +- weed/shell/command_ec_rebuild.go | 2 +- weed/shell/command_ec_test.go | 8 ++- weed/shell/command_volume_server_evacuate.go | 2 +- 5 files changed, 39 insertions(+), 32 deletions(-) diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 06284edf8..aa9b49cb1 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -242,7 +242,7 @@ func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.Vol return collections } -func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool) (err error) { +func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool, diskType types.DiskType) (err error) { if !commandEnv.isLocked() { return fmt.Errorf("lock is lost") @@ -280,8 +280,8 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, } - destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds) - existingLocation.deleteEcVolumeShards(vid, copiedShardIds) + destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds, diskType) + existingLocation.deleteEcVolumeShards(vid, copiedShardIds, diskType) return nil @@ -551,9 +551,9 @@ func ceilDivide(a, b int) int { return (a / b) + r } -func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits { +func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType) erasure_coding.ShardBits { - if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found { + if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found { for _, shardInfo := range diskInfo.EcShardInfos { if needle.VolumeId(shardInfo.Id) == vid { return erasure_coding.ShardBits(shardInfo.EcIndexBits) @@ -564,10 +564,10 @@ func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.Shar return 0 } -func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode { +func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32, diskType types.DiskType) *EcNode { foundVolume := false - diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)] + diskInfo, found := ecNode.info.DiskInfos[string(diskType)] if found { for _, shardInfo := range diskInfo.EcShardInfos { if needle.VolumeId(shardInfo.Id) == vid { @@ -584,9 +584,9 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, } } else { diskInfo = &master_pb.DiskInfo{ - Type: string(types.HardDriveType), + Type: string(diskType), } - ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo + ecNode.info.DiskInfos[string(diskType)] = diskInfo } if !foundVolume { @@ -598,7 +598,7 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, Id: uint32(vid), Collection: collection, EcIndexBits: uint32(newShardBits), - DiskType: string(types.HardDriveType), + DiskType: string(diskType), }) ecNode.freeEcSlot -= len(shardIds) } @@ -606,9 +606,9 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, return ecNode } -func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode { +func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32, diskType types.DiskType) *EcNode { - if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found { + if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found { for _, shardInfo := range diskInfo.EcShardInfos { if needle.VolumeId(shardInfo.Id) == vid { oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits) @@ -706,7 +706,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum // Use MaxShardCount (32) to support custom EC ratios shardToLocations := make([][]*EcNode, erasure_coding.MaxShardCount) for _, ecNode := range locations { - shardBits := findEcVolumeShards(ecNode, vid) + shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType) for _, shardId := range shardBits.ShardIds() { shardToLocations[shardId] = append(shardToLocations[shardId], ecNode) } @@ -729,7 +729,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum if err := sourceServerDeleteEcShards(ecb.commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil { return err } - ecNode.deleteEcVolumeShards(vid, duplicatedShardIds) + ecNode.deleteEcVolumeShards(vid, duplicatedShardIds, ecb.diskType) } } return nil @@ -749,9 +749,9 @@ func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error { return ewg.Wait() } -func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int { +func countShardsByRack(vid needle.VolumeId, locations []*EcNode, diskType types.DiskType) map[string]int { return groupByCount(locations, func(ecNode *EcNode) (id string, count int) { - shardBits := findEcVolumeShards(ecNode, vid) + shardBits := findEcVolumeShards(ecNode, vid, diskType) return string(ecNode.rack), shardBits.ShardIdCount() }) } @@ -760,7 +760,7 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl racks := ecb.racks() // see the volume's shards are in how many racks, and how many in each rack - rackToShardCount := countShardsByRack(vid, locations) + rackToShardCount := countShardsByRack(vid, locations, ecb.diskType) // Calculate actual total shards for this volume (not hardcoded default) var totalShardsForVolume int @@ -780,7 +780,7 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl continue } possibleEcNodes := rackEcNodesWithVid[rackId] - for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) { + for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack, ecb.diskType) { ecShardsToMove[shardId] = ecNode } } @@ -857,7 +857,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error { for vid, locations := range vidLocations { // see the volume's shards are in how many racks, and how many in each rack - rackToShardCount := countShardsByRack(vid, locations) + rackToShardCount := countShardsByRack(vid, locations, ecb.diskType) rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string { return string(ecNode.rack) }) @@ -866,7 +866,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error { var possibleDestinationEcNodes []*EcNode for _, n := range racks[RackId(rackId)].ecNodes { - if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found { + if _, found := n.info.DiskInfos[string(ecb.diskType)]; found { possibleDestinationEcNodes = append(possibleDestinationEcNodes, n) } } @@ -883,7 +883,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error { func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error { for _, ecNode := range existingLocations { - shardBits := findEcVolumeShards(ecNode, vid) + shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType) overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode for _, shardId := range shardBits.ShardIds() { @@ -974,7 +974,7 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error { fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id) } - err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing) + err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing, ecb.diskType) if err != nil { return err } @@ -1004,7 +1004,7 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi nodeShards := map[*EcNode]int{} for _, node := range possibleDestinations { - nodeShards[node] = findEcVolumeShards(node, vid).ShardIdCount() + nodeShards[node] = findEcVolumeShards(node, vid, ecb.diskType).ShardIdCount() } targets := []*EcNode{} @@ -1135,14 +1135,14 @@ func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, co } else { fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id) } - return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing) + return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing, ecb.diskType) } -func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode { +func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int, diskType types.DiskType) map[erasure_coding.ShardId]*EcNode { picked := make(map[erasure_coding.ShardId]*EcNode) var candidateEcNodes []*CandidateEcNode for _, ecNode := range ecNodes { - shardBits := findEcVolumeShards(ecNode, vid) + shardBits := findEcVolumeShards(ecNode, vid, diskType) if shardBits.ShardIdCount() > 0 { candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{ ecNode: ecNode, @@ -1156,13 +1156,13 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[ for i := 0; i < n; i++ { selectedEcNodeIndex := -1 for i, candidateEcNode := range candidateEcNodes { - shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid) + shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid, diskType) if shardBits > 0 { selectedEcNodeIndex = i for _, shardId := range shardBits.ShardIds() { candidateEcNode.shardCount-- picked[shardId] = candidateEcNode.ecNode - candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)}) + candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)}, diskType) break } break diff --git a/weed/shell/command_ec_common_test.go b/weed/shell/command_ec_common_test.go index eac3d7860..c6e40ce79 100644 --- a/weed/shell/command_ec_common_test.go +++ b/weed/shell/command_ec_common_test.go @@ -155,10 +155,11 @@ func TestPickRackToBalanceShardsInto(t *testing.T) { ecb := &ecBalancer{ ecNodes: ecNodes, replicaPlacement: rp, + diskType: types.HardDriveType, } racks := ecb.racks() - rackToShardCount := countShardsByRack(vid, ecNodes) + rackToShardCount := countShardsByRack(vid, ecNodes, types.HardDriveType) got, gotErr := ecb.pickRackToBalanceShardsInto(racks, rackToShardCount) if err := errorCheck(gotErr, tc.wantErr); err != nil { diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index 698705853..97c689248 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -295,7 +295,7 @@ func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.Vo // ensure ECNode updates are atomic erb.ecNodesMu.Lock() defer erb.ecNodesMu.Unlock() - rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds) + rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds, types.HardDriveType) return nil } diff --git a/weed/shell/command_ec_test.go b/weed/shell/command_ec_test.go index fa6697435..7d7b59f8f 100644 --- a/weed/shell/command_ec_test.go +++ b/weed/shell/command_ec_test.go @@ -5,6 +5,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/types" ) func TestCommandEcBalanceSmall(t *testing.T) { @@ -14,6 +15,7 @@ func TestCommandEcBalanceSmall(t *testing.T) { newEcNode("dc1", "rack2", "dn2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}), }, applyBalancing: false, + diskType: types.HardDriveType, } ecb.balanceEcVolumes("c1") @@ -30,6 +32,7 @@ func TestCommandEcBalanceNothingToMove(t *testing.T) { addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}), }, applyBalancing: false, + diskType: types.HardDriveType, } ecb.balanceEcVolumes("c1") @@ -48,6 +51,7 @@ func TestCommandEcBalanceAddNewServers(t *testing.T) { newEcNode("dc1", "rack1", "dn4", 100), }, applyBalancing: false, + diskType: types.HardDriveType, } ecb.balanceEcVolumes("c1") @@ -66,6 +70,7 @@ func TestCommandEcBalanceAddNewRacks(t *testing.T) { newEcNode("dc1", "rack2", "dn4", 100), }, applyBalancing: false, + diskType: types.HardDriveType, } ecb.balanceEcVolumes("c1") @@ -109,6 +114,7 @@ func TestCommandEcBalanceVolumeEvenButRackUneven(t *testing.T) { newEcNode("dc1", "rack1", "dn3", 100), }, applyBalancing: false, + diskType: types.HardDriveType, } ecb.balanceEcVolumes("c1") @@ -128,5 +134,5 @@ func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNod } func (ecNode *EcNode) addEcVolumeAndShardsForTest(vid uint32, collection string, shardIds []uint32) *EcNode { - return ecNode.addEcVolumeShards(needle.VolumeId(vid), collection, shardIds) + return ecNode.addEcVolumeShards(needle.VolumeId(vid), collection, shardIds, types.HardDriveType) } diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index a61593231..a074f4ff2 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -204,7 +204,7 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv } else { fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id) } - err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange) + err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange, types.HardDriveType) if err != nil { return } else {