diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 1f6f0e005..a6f714a0d 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -809,24 +809,48 @@ func countShardsByRack(vid needle.VolumeId, locations []*EcNode, diskType types. }) } -// shardsByTypePerRack counts data shards (< dataShards) and parity shards (>= dataShards) per rack -func shardsByTypePerRack(vid needle.VolumeId, locations []*EcNode, diskType types.DiskType, dataShards int) (dataPerRack, parityPerRack map[string][]erasure_coding.ShardId) { - dataPerRack = make(map[string][]erasure_coding.ShardId) - parityPerRack = make(map[string][]erasure_coding.ShardId) +// shardsByType is a generic helper that counts data and parity shards per group +func shardsByType(vid needle.VolumeId, locations []*EcNode, diskType types.DiskType, dataShards int, keyExtractor func(*EcNode) string) (dataPerGroup, parityPerGroup map[string][]erasure_coding.ShardId) { + dataPerGroup = make(map[string][]erasure_coding.ShardId) + parityPerGroup = make(map[string][]erasure_coding.ShardId) for _, ecNode := range locations { si := findEcVolumeShardsInfo(ecNode, vid, diskType) - rackId := string(ecNode.rack) + groupKey := keyExtractor(ecNode) for _, shardId := range si.Ids() { if int(shardId) < dataShards { - dataPerRack[rackId] = append(dataPerRack[rackId], shardId) + dataPerGroup[groupKey] = append(dataPerGroup[groupKey], shardId) } else { - parityPerRack[rackId] = append(parityPerRack[rackId], shardId) + parityPerGroup[groupKey] = append(parityPerGroup[groupKey], shardId) } } } return } +// shardsByTypePerRack counts data shards (< dataShards) and parity shards (>= dataShards) per rack +func shardsByTypePerRack(vid needle.VolumeId, locations []*EcNode, diskType types.DiskType, dataShards int) (dataPerRack, parityPerRack map[string][]erasure_coding.ShardId) { + return shardsByType(vid, locations, diskType, dataShards, func(ecNode *EcNode) string { + return string(ecNode.rack) + }) +} + +// shardsByTypePerNode counts data shards and parity shards per node +func shardsByTypePerNode(vid needle.VolumeId, locations []*EcNode, diskType types.DiskType, dataShards int) (dataPerNode, parityPerNode map[string][]erasure_coding.ShardId) { + return shardsByType(vid, locations, diskType, dataShards, func(ecNode *EcNode) string { + return ecNode.info.Id + }) +} + +func countShardsByNode(vid needle.VolumeId, locations []*EcNode, diskType types.DiskType) map[string]int { + return groupByCount(locations, func(ecNode *EcNode) (id string, count int) { + id = ecNode.info.Id + if si := findEcVolumeShardsInfo(ecNode, vid, diskType); si != nil { + count = si.Count() + } + return + }) +} + func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needle.VolumeId, locations []*EcNode) error { racks := ecb.racks() numRacks := len(racks) @@ -852,20 +876,30 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl rackToShardCount := countShardsByRack(vid, locations, ecb.diskType) // First pass: Balance data shards across racks - if err := ecb.balanceShardTypeAcrossRacks(collection, vid, racks, rackEcNodesWithVid, dataPerRack, rackToShardCount, maxDataPerRack, "data"); err != nil { + if err := ecb.balanceShardTypeAcrossRacks(collection, vid, racks, rackEcNodesWithVid, dataPerRack, rackToShardCount, maxDataPerRack, "data", nil); err != nil { return err } // Refresh locations after data shard moves and get parity distribution locations = ecb.collectVolumeIdToEcNodes(collection)[vid] - 
_, parityPerRack := shardsByTypePerRack(vid, locations, ecb.diskType, dataShardCount) + dataPerRack, parityPerRack := shardsByTypePerRack(vid, locations, ecb.diskType, dataShardCount) rackEcNodesWithVid = groupBy(locations, func(ecNode *EcNode) string { return string(ecNode.rack) }) rackToShardCount = countShardsByRack(vid, locations, ecb.diskType) - // Second pass: Balance parity shards across racks - if err := ecb.balanceShardTypeAcrossRacks(collection, vid, racks, rackEcNodesWithVid, parityPerRack, rackToShardCount, maxParityPerRack, "parity"); err != nil { + // Identify racks containing data shards to avoid for parity placement. + // We call this "antiAffinityRacks" because we want parity shards to have anti-affinity + // with racks that hold data shards, to ensure better fault tolerance. + antiAffinityRacks := make(map[string]bool) + for rackId, shards := range dataPerRack { + if len(shards) > 0 { + antiAffinityRacks[rackId] = true + } + } + + // Second pass: Balance parity shards across racks, ignoring racks with data shards if possible + if err := ecb.balanceShardTypeAcrossRacks(collection, vid, racks, rackEcNodesWithVid, parityPerRack, rackToShardCount, maxParityPerRack, "parity", antiAffinityRacks); err != nil { return err } @@ -882,6 +916,7 @@ func (ecb *ecBalancer) balanceShardTypeAcrossRacks( rackToShardCount map[string]int, maxPerRack int, shardType string, + antiAffinityRacks map[string]bool, ) error { // Find racks with too many shards of this type shardsToMove := make(map[erasure_coding.ShardId]*EcNode) @@ -908,7 +943,7 @@ func (ecb *ecBalancer) balanceShardTypeAcrossRacks( // Move shards to racks that have fewer than maxPerRack of this type for shardId, ecNode := range shardsToMove { // Find destination rack with room for this shard type - destRackId, err := ecb.pickRackForShardType(racks, shardsPerRack, maxPerRack, rackToShardCount) + destRackId, err := ecb.pickRackForShardType(racks, shardsPerRack, maxPerRack, rackToShardCount, antiAffinityRacks) if err != nil { fmt.Printf("ec %s shard %d.%d at %s can not find a destination rack:\n%s\n", shardType, vid, shardId, ecNode.info.Id, err.Error()) continue @@ -942,41 +977,145 @@ func (ecb *ecBalancer) balanceShardTypeAcrossRacks( return nil } -// pickRackForShardType selects a rack that has room for more shards of a specific type -func (ecb *ecBalancer) pickRackForShardType( - rackToEcNodes map[RackId]*EcRack, - shardsPerRack map[string][]erasure_coding.ShardId, - maxPerRack int, - rackToShardCount map[string]int, -) (RackId, error) { - var candidates []RackId - minShards := maxPerRack + 1 +// twoPassSelector implements two-pass selection with anti-affinity +// Pass 1: Select from candidates NOT in antiAffinity set +// Pass 2: Fallback to any valid candidate if Pass 1 yields no results +type twoPassSelector[T any] struct { + candidates []T + shardsPerTarget map[string][]erasure_coding.ShardId + maxPerTarget int + targetToShardCount map[string]int + antiAffinity map[string]bool + + // Functions to extract info from candidate + getKey func(T) string + hasFreeSlots func(T) bool + checkLimit func(T) bool // replica placement or other limits +} - for rackId, rack := range rackToEcNodes { - if rack.freeEcSlot <= 0 { +func (s *twoPassSelector[T]) selectCandidate() (T, error) { + var selected []T + minShards := s.maxPerTarget + 1 + + // Pass 1: Try candidates NOT in anti-affinity set + for _, candidate := range s.candidates { + if !s.hasFreeSlots(candidate) { continue } - currentCount := len(shardsPerRack[string(rackId)]) - if 
currentCount >= maxPerRack { + key := s.getKey(candidate) + currentCount := len(s.shardsPerTarget[key]) + if currentCount >= s.maxPerTarget { continue } - // For EC shards, replica placement constraint only applies when DiffRackCount > 0. - if ecb.replicaPlacement != nil && ecb.replicaPlacement.DiffRackCount > 0 && rackToShardCount[string(rackId)] >= ecb.replicaPlacement.DiffRackCount { + if !s.checkLimit(candidate) { continue } + + // Skip anti-affinity targets in pass 1 + if s.antiAffinity != nil && s.antiAffinity[key] { + continue + } + if currentCount < minShards { - candidates = nil + selected = nil minShards = currentCount } if currentCount == minShards { - candidates = append(candidates, rackId) + selected = append(selected, candidate) } } - if len(candidates) == 0 { + // Pass 2: Fallback if no candidates found + if len(selected) == 0 { + minShards = s.maxPerTarget + 1 + for _, candidate := range s.candidates { + if !s.hasFreeSlots(candidate) { + continue + } + key := s.getKey(candidate) + currentCount := len(s.shardsPerTarget[key]) + if currentCount >= s.maxPerTarget { + continue + } + if !s.checkLimit(candidate) { + continue + } + + if currentCount < minShards { + selected = nil + minShards = currentCount + } + if currentCount == minShards { + selected = append(selected, candidate) + } + } + } + + if len(selected) == 0 { + var zero T + return zero, errors.New("no valid candidate available") + } + return selected[rand.IntN(len(selected))], nil +} + +// pickRackForShardType selects a rack that has room for more shards of a specific type +func (ecb *ecBalancer) pickRackForShardType( + rackToEcNodes map[RackId]*EcRack, + shardsPerRack map[string][]erasure_coding.ShardId, + maxPerRack int, + rackToShardCount map[string]int, + antiAffinityRacks map[string]bool, +) (RackId, error) { + // Convert map to slice for iteration + var rackCandidates []struct { + id RackId + rack *EcRack + } + for id, rack := range rackToEcNodes { + rackCandidates = append(rackCandidates, struct { + id RackId + rack *EcRack + }{id, rack}) + } + + selector := &twoPassSelector[struct { + id RackId + rack *EcRack + }]{ + candidates: rackCandidates, + shardsPerTarget: shardsPerRack, + maxPerTarget: maxPerRack, + targetToShardCount: rackToShardCount, + antiAffinity: antiAffinityRacks, + getKey: func(c struct { + id RackId + rack *EcRack + }) string { + return string(c.id) + }, + hasFreeSlots: func(c struct { + id RackId + rack *EcRack + }) bool { + return c.rack.freeEcSlot > 0 + }, + checkLimit: func(c struct { + id RackId + rack *EcRack + }) bool { + // For EC shards, replica placement constraint only applies when DiffRackCount > 0. 
+ if ecb.replicaPlacement != nil && ecb.replicaPlacement.DiffRackCount > 0 { + return rackToShardCount[string(c.id)] < ecb.replicaPlacement.DiffRackCount + } + return true + }, + } + + selected, err := selector.selectCandidate() + if err != nil { return "", errors.New("no rack available for shard type balancing") } - return candidates[rand.IntN(len(candidates))], nil + return selected.id, nil } func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int) (RackId, error) { @@ -1032,51 +1171,74 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error { // see the volume's shards are in how many racks, and how many in each rack rackToShardCount := countShardsByRack(vid, locations, ecb.diskType) - rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string { - return string(ecNode.rack) - }) - - for rackId, _ := range rackToShardCount { + for rackId := range rackToShardCount { var possibleDestinationEcNodes []*EcNode for _, n := range racks[RackId(rackId)].ecNodes { if _, found := n.info.DiskInfos[string(ecb.diskType)]; found { possibleDestinationEcNodes = append(possibleDestinationEcNodes, n) } } - sourceEcNodes := rackEcNodesWithVid[rackId] - averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes)) ewg.Add(func() error { - return ecb.doBalanceEcShardsWithinOneRack(averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes) + return ecb.doBalanceEcShardsWithinOneRack(collection, vid, possibleDestinationEcNodes) }) } } return ewg.Wait() } -func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error { - for _, ecNode := range existingLocations { +func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(collection string, vid needle.VolumeId, possibleDestinationEcNodes []*EcNode) error { + // Use configured EC scheme + dataShardCount := ecb.getDataShardCount() - si := findEcVolumeShardsInfo(ecNode, vid, ecb.diskType) - overLimitCount := si.Count() - averageShardsPerEcNode + // Get current distribution of data shards per node + dataPerNode, parityPerNode := shardsByTypePerNode(vid, possibleDestinationEcNodes, ecb.diskType, dataShardCount) - for _, shardId := range si.Ids() { + // Calculate max shards per node for each type + numNodes := len(possibleDestinationEcNodes) + if numNodes == 0 { + return nil + } - if overLimitCount <= 0 { - break - } + // Calculate totals based on actual shards present in the rack (subset of all shards) + totalData := 0 + for _, shards := range dataPerNode { + totalData += len(shards) + } + totalParity := 0 + for _, shards := range parityPerNode { + totalParity += len(shards) + } - fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId) + maxDataPerNode := ceilDivide(totalData, numNodes) + maxParityPerNode := ceilDivide(totalParity, numNodes) - err := ecb.pickOneEcNodeAndMoveOneShard(ecNode, collection, vid, shardId, possibleDestinationEcNodes) - if err != nil { - return err - } + // Track total shard count per node + nodeToShardCount := countShardsByNode(vid, possibleDestinationEcNodes, ecb.diskType) + + // First pass: Balance data shards across nodes + if err := ecb.balanceShardTypeAcrossNodes(collection, vid, possibleDestinationEcNodes, dataPerNode, nodeToShardCount, maxDataPerNode, "data", nil); err != nil { + return err + } + + // Refresh locations 
after data shard moves + // We need to re-scan because moving shards changes node states + dataPerNode, parityPerNode = shardsByTypePerNode(vid, possibleDestinationEcNodes, ecb.diskType, dataShardCount) + nodeToShardCount = countShardsByNode(vid, possibleDestinationEcNodes, ecb.diskType) - overLimitCount-- + // Identify nodes containing data shards to avoid for parity placement + antiAffinityNodes := make(map[string]bool) + for nodeId, shards := range dataPerNode { + if len(shards) > 0 { + antiAffinityNodes[nodeId] = true } } + // Second pass: Balance parity shards across nodes, avoiding nodes with data shards if possible + if err := ecb.balanceShardTypeAcrossNodes(collection, vid, possibleDestinationEcNodes, parityPerNode, nodeToShardCount, maxParityPerNode, "parity", antiAffinityNodes); err != nil { + return err + } + return nil } @@ -1144,7 +1306,9 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error { for _, shardId := range si.Ids() { vid := needle.VolumeId(shards.Id) // For balancing, strictly require matching disk type - destDiskId := pickBestDiskOnNode(emptyNode, vid, ecb.diskType, true) + // For balancing, strictly require matching disk type and apply anti-affinity + dataShardCount := ecb.getDataShardCount() + destDiskId := pickBestDiskOnNode(emptyNode, vid, ecb.diskType, true, shardId, dataShardCount) if destDiskId > 0 { fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId) @@ -1265,8 +1429,11 @@ func diskDistributionScore(ecNode *EcNode, vid needle.VolumeId) int { // pickBestDiskOnNode selects the best disk on a node for placing a new EC shard // It prefers disks of the specified type with fewer shards and more free slots +// When shardId is provided and dataShardCount > 0, it applies anti-affinity: +// - For data shards (shardId < dataShardCount): prefer disks without parity shards +// - For parity shards (shardId >= dataShardCount): prefer disks without data shards // If strictDiskType is false, it will fall back to other disk types if no matching disk is found -func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType, strictDiskType bool) uint32 { +func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType, strictDiskType bool, shardId erasure_coding.ShardId, dataShardCount int) uint32 { if len(ecNode.disks) == 0 { return 0 // No disk info available, let the server decide } @@ -1276,21 +1443,47 @@ func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId, diskType types.Disk var fallbackDiskId uint32 fallbackScore := -1 + // Determine if we're placing a data or parity shard + isDataShard := dataShardCount > 0 && int(shardId) < dataShardCount + for diskId, disk := range ecNode.disks { if disk.freeEcSlots <= 0 { continue } - // Check if this volume already has shards on this disk + // Check existing shards on this disk for this volume existingShards := 0 + hasDataShards := false + hasParityShards := false if si, ok := disk.ecShards[vid]; ok { existingShards = si.Count() + // Check what type of shards are on this disk + if dataShardCount > 0 { + for _, existingShardId := range si.Ids() { + if int(existingShardId) < dataShardCount { + hasDataShards = true + } else { + hasParityShards = true + } + } + } } // Score: prefer disks with fewer total shards and fewer shards of this volume // Lower score is better score := disk.ecShardCount*10 + existingShards*100 + // Apply anti-affinity penalty if applicable + if dataShardCount > 0 { + if 
isDataShard && hasParityShards { + // Penalize placing data shard on disk with parity shards + score += 1000 + } else if !isDataShard && hasDataShards { + // Penalize placing parity shard on disk with data shards + score += 1000 + } + } + if disk.diskType == string(diskType) { // Matching disk type - this is preferred if bestScore == -1 || score < bestScore { @@ -1314,19 +1507,20 @@ func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId, diskType types.Disk } // pickEcNodeAndDiskToBalanceShardsInto picks both a destination node and specific disk -func (ecb *ecBalancer) pickEcNodeAndDiskToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode) (*EcNode, uint32, error) { +func (ecb *ecBalancer) pickEcNodeAndDiskToBalanceShardsInto(vid needle.VolumeId, shardId erasure_coding.ShardId, existingLocation *EcNode, possibleDestinations []*EcNode) (*EcNode, uint32, error) { node, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinations) if err != nil { return nil, 0, err } - // For balancing, strictly require matching disk type - diskId := pickBestDiskOnNode(node, vid, ecb.diskType, true) + // For balancing, strictly require matching disk type and apply anti-affinity + dataShardCount := ecb.getDataShardCount() + diskId := pickBestDiskOnNode(node, vid, ecb.diskType, true, shardId, dataShardCount) return node, diskId, nil } func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode) error { - destNode, destDiskId, err := ecb.pickEcNodeAndDiskToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes) + destNode, destDiskId, err := ecb.pickEcNodeAndDiskToBalanceShardsInto(vid, shardId, existingLocation, possibleDestinationEcNodes) if err != nil { fmt.Printf("WARNING: Could not find suitable target node for %d.%d:\n%s", vid, shardId, err.Error()) return nil @@ -1441,3 +1635,114 @@ func compileCollectionPattern(pattern string) (*regexp.Regexp, error) { } return regexp.Compile(pattern) } + +// balanceShardTypeAcrossNodes spreads shards of a specific type (data or parity) evenly across nodes +func (ecb *ecBalancer) balanceShardTypeAcrossNodes( + collection string, + vid needle.VolumeId, + possibleDestinationEcNodes []*EcNode, + shardsPerNode map[string][]erasure_coding.ShardId, + nodeToShardCount map[string]int, + maxPerNode int, + shardType string, + antiAffinityNodes map[string]bool, +) error { + // Map ID to EcNode for lookup + nodeMap := make(map[string]*EcNode) + for _, n := range possibleDestinationEcNodes { + nodeMap[n.info.Id] = n + } + + // Find nodes with too many shards of this type + shardsToMove := make(map[erasure_coding.ShardId]*EcNode) + for nodeId, shards := range shardsPerNode { + if len(shards) <= maxPerNode { + continue + } + // Pick excess shards to move + excess := len(shards) - maxPerNode + ecNode := nodeMap[nodeId] + if ecNode == nil { + continue + } + + for i := 0; i < excess && i < len(shards); i++ { + shardId := shards[i] + // Verify node has this shard + si := findEcVolumeShardsInfo(ecNode, vid, ecb.diskType) + if si.Has(shardId) { + shardsToMove[shardId] = ecNode + } + } + } + + // Move shards to nodes that have fewer than maxPerNode of this type + for shardId, ecNode := range shardsToMove { + // Find destination node with room for this shard type + destNode, err := ecb.pickNodeForShardType(possibleDestinationEcNodes, shardsPerNode, maxPerNode, nodeToShardCount, 
antiAffinityNodes) + if err != nil { + fmt.Printf("ec %s shard %d.%d at %s can not find a destination node:\n%s\n", shardType, vid, shardId, ecNode.info.Id, err.Error()) + continue + } + + err = ecb.pickOneEcNodeAndMoveOneShard(ecNode, collection, vid, shardId, []*EcNode{destNode}) + if err != nil { + return err + } + + // Update tracking + destNodeId := destNode.info.Id + shardsPerNode[destNodeId] = append(shardsPerNode[destNodeId], shardId) + + // Remove from source node + srcNodeId := ecNode.info.Id + for i, s := range shardsPerNode[srcNodeId] { + if s == shardId { + shardsPerNode[srcNodeId] = append(shardsPerNode[srcNodeId][:i], shardsPerNode[srcNodeId][i+1:]...) + break + } + } + nodeToShardCount[destNodeId] += 1 + nodeToShardCount[srcNodeId] -= 1 + destNode.freeEcSlot -= 1 + ecNode.freeEcSlot += 1 + } + + return nil +} + +// pickNodeForShardType selects a node that has room for more shards of a specific type +func (ecb *ecBalancer) pickNodeForShardType( + nodes []*EcNode, + shardsPerNode map[string][]erasure_coding.ShardId, + maxPerNode int, + nodeToShardCount map[string]int, + antiAffinityNodes map[string]bool, +) (*EcNode, error) { + selector := &twoPassSelector[*EcNode]{ + candidates: nodes, + shardsPerTarget: shardsPerNode, + maxPerTarget: maxPerNode, + targetToShardCount: nodeToShardCount, + antiAffinity: antiAffinityNodes, + getKey: func(n *EcNode) string { + return n.info.Id + }, + hasFreeSlots: func(n *EcNode) bool { + return n.freeEcSlot > 0 + }, + checkLimit: func(n *EcNode) bool { + // For EC shards, replica placement constraint only applies when SameRackCount > 0. + if ecb.replicaPlacement != nil && ecb.replicaPlacement.SameRackCount > 0 { + return nodeToShardCount[n.info.Id] < ecb.replicaPlacement.SameRackCount+1 + } + return true + }, + } + + selected, err := selector.selectCandidate() + if err != nil { + return nil, errors.New("no node available for shard type balancing") + } + return selected, nil +} diff --git a/weed/shell/command_ec_common_avoid_test.go b/weed/shell/command_ec_common_avoid_test.go new file mode 100644 index 000000000..b420eec5b --- /dev/null +++ b/weed/shell/command_ec_common_avoid_test.go @@ -0,0 +1,377 @@ +package shell + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +func TestPickRackForShardType_AntiAffinityRacks(t *testing.T) { + // Setup topology with 3 racks, each with 1 node, enough free slots + topo := &master_pb.TopologyInfo{ + Id: "test_topo", + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + buildRackWithEcShards("rack0", "node0:8080", 100, nil), + buildRackWithEcShards("rack1", "node1:8080", 100, nil), + buildRackWithEcShards("rack2", "node2:8080", 100, nil), + }, + }, + }, + } + + ecNodes, _ := collectEcVolumeServersByDc(topo, "", types.HardDriveType) + ecb := &ecBalancer{ + ecNodes: ecNodes, + diskType: types.HardDriveType, + } + + racks := ecb.racks() + rackToShardCount := make(map[string]int) + shardsPerRack := make(map[string][]erasure_coding.ShardId) + maxPerRack := 2 + + // Case 1: Avoid rack0 + antiAffinityRacks := map[string]bool{"rack0": true} + + // Try multiple times to ensure randomness doesn't accidentally pass + for i := 0; i < 20; i++ { + picked, err := ecb.pickRackForShardType(racks, shardsPerRack, 
maxPerRack, rackToShardCount, antiAffinityRacks) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if picked == "rack0" { + t.Errorf("picked avoided rack rack0") + } + } + + // Case 2: Fallback - avoid all racks + avoidAll := map[string]bool{"rack0": true, "rack1": true, "rack2": true} + picked, err := ecb.pickRackForShardType(racks, shardsPerRack, maxPerRack, rackToShardCount, avoidAll) + if err != nil { + t.Fatalf("fallback failed: %v", err) + } + if picked == "" { + t.Errorf("expected some rack to be picked in fallback") + } +} + +func TestPickRackForShardType_EdgeCases(t *testing.T) { + t.Run("NoFreeSlots", func(t *testing.T) { + topo := &master_pb.TopologyInfo{ + Id: "test_topo", + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + buildRackWithEcShards("rack0", "node0:8080", 0, nil), // maxVolumes=0 + buildRackWithEcShards("rack1", "node1:8080", 0, nil), + }, + }, + }, + } + + ecNodes, _ := collectEcVolumeServersByDc(topo, "", types.HardDriveType) + ecb := &ecBalancer{ + ecNodes: ecNodes, + diskType: types.HardDriveType, + } + + racks := ecb.racks() + _, err := ecb.pickRackForShardType(racks, make(map[string][]erasure_coding.ShardId), 2, make(map[string]int), nil) + if err == nil { + t.Error("expected error when no free slots, got nil") + } + }) + + t.Run("AllRacksAtMaxCapacity", func(t *testing.T) { + topo := &master_pb.TopologyInfo{ + Id: "test_topo", + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + buildRackWithEcShards("rack0", "node0:8080", 100, nil), + buildRackWithEcShards("rack1", "node1:8080", 100, nil), + }, + }, + }, + } + + ecNodes, _ := collectEcVolumeServersByDc(topo, "", types.HardDriveType) + ecb := &ecBalancer{ + ecNodes: ecNodes, + diskType: types.HardDriveType, + } + + racks := ecb.racks() + shardsPerRack := map[string][]erasure_coding.ShardId{ + "rack0": {0, 1}, // 2 shards + "rack1": {2, 3}, // 2 shards + } + maxPerRack := 2 + + _, err := ecb.pickRackForShardType(racks, shardsPerRack, maxPerRack, make(map[string]int), nil) + if err == nil { + t.Error("expected error when all racks at max capacity, got nil") + } + }) + + t.Run("ReplicaPlacementLimit", func(t *testing.T) { + topo := &master_pb.TopologyInfo{ + Id: "test_topo", + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + buildRackWithEcShards("rack0", "node0:8080", 100, nil), + buildRackWithEcShards("rack1", "node1:8080", 100, nil), + }, + }, + }, + } + + ecNodes, _ := collectEcVolumeServersByDc(topo, "", types.HardDriveType) + rp, _ := super_block.NewReplicaPlacementFromString("012") // DiffRackCount = 1 + ecb := &ecBalancer{ + ecNodes: ecNodes, + diskType: types.HardDriveType, + replicaPlacement: rp, + } + + racks := ecb.racks() + rackToShardCount := map[string]int{ + "rack0": 1, // At limit + "rack1": 0, + } + + picked, err := ecb.pickRackForShardType(racks, make(map[string][]erasure_coding.ShardId), 5, rackToShardCount, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if picked != "rack1" { + t.Errorf("expected rack1 (not at limit), got %v", picked) + } + }) + + t.Run("PreferFewerShards", func(t *testing.T) { + topo := &master_pb.TopologyInfo{ + Id: "test_topo", + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + buildRackWithEcShards("rack0", "node0:8080", 100, nil), + buildRackWithEcShards("rack1", "node1:8080", 100, nil), + buildRackWithEcShards("rack2", 
"node2:8080", 100, nil), + }, + }, + }, + } + + ecNodes, _ := collectEcVolumeServersByDc(topo, "", types.HardDriveType) + ecb := &ecBalancer{ + ecNodes: ecNodes, + diskType: types.HardDriveType, + } + + racks := ecb.racks() + shardsPerRack := map[string][]erasure_coding.ShardId{ + "rack0": {0, 1}, // 2 shards + "rack1": {2}, // 1 shard + "rack2": {}, // 0 shards + } + + // Should pick rack2 (fewest shards) + picked, err := ecb.pickRackForShardType(racks, shardsPerRack, 5, make(map[string]int), nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if picked != "rack2" { + t.Errorf("expected rack2 (fewest shards), got %v", picked) + } + }) +} + +func TestPickNodeForShardType_AntiAffinityNodes(t *testing.T) { + // Setup topology with 1 rack, 3 nodes + topo := &master_pb.TopologyInfo{ + Id: "test_topo", + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + { + Id: "rack0", + DataNodeInfos: []*master_pb.DataNodeInfo{ + buildDataNode("node0:8080", 100), + buildDataNode("node1:8080", 100), + buildDataNode("node2:8080", 100), + }, + }, + }, + }, + }, + } + + ecNodes, _ := collectEcVolumeServersByDc(topo, "", types.HardDriveType) + ecb := &ecBalancer{ + ecNodes: ecNodes, + diskType: types.HardDriveType, + } + + nodeToShardCount := make(map[string]int) + shardsPerNode := make(map[string][]erasure_coding.ShardId) + maxPerNode := 2 + + // Case 1: Avoid node0 + antiAffinityNodes := map[string]bool{"node0:8080": true} + + for i := 0; i < 20; i++ { + picked, err := ecb.pickNodeForShardType(ecNodes, shardsPerNode, maxPerNode, nodeToShardCount, antiAffinityNodes) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if picked.info.Id == "node0:8080" { + t.Errorf("picked avoided node node0") + } + } +} + +func TestPickNodeForShardType_EdgeCases(t *testing.T) { + t.Run("NoFreeSlots", func(t *testing.T) { + topo := &master_pb.TopologyInfo{ + Id: "test_topo", + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + { + Id: "rack0", + DataNodeInfos: []*master_pb.DataNodeInfo{ + buildDataNode("node0:8080", 0), // No capacity + }, + }, + }, + }, + }, + } + + ecNodes, _ := collectEcVolumeServersByDc(topo, "", types.HardDriveType) + ecb := &ecBalancer{ + ecNodes: ecNodes, + diskType: types.HardDriveType, + } + + _, err := ecb.pickNodeForShardType(ecNodes, make(map[string][]erasure_coding.ShardId), 2, make(map[string]int), nil) + if err == nil { + t.Error("expected error when no free slots, got nil") + } + }) + + t.Run("ReplicaPlacementSameRackLimit", func(t *testing.T) { + topo := &master_pb.TopologyInfo{ + Id: "test_topo", + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + { + Id: "rack0", + DataNodeInfos: []*master_pb.DataNodeInfo{ + buildDataNode("node0:8080", 100), + buildDataNode("node1:8080", 100), + }, + }, + }, + }, + }, + } + + ecNodes, _ := collectEcVolumeServersByDc(topo, "", types.HardDriveType) + rp, _ := super_block.NewReplicaPlacementFromString("021") // SameRackCount = 1 + ecb := &ecBalancer{ + ecNodes: ecNodes, + diskType: types.HardDriveType, + replicaPlacement: rp, + } + + nodeToShardCount := map[string]int{ + "node0:8080": 3, // Exceeds SameRackCount + 1 + "node1:8080": 0, + } + + picked, err := ecb.pickNodeForShardType(ecNodes, make(map[string][]erasure_coding.ShardId), 5, nodeToShardCount, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if picked.info.Id != "node1:8080" { + t.Errorf("expected node1 (not 
at limit), got %v", picked.info.Id) + } + }) +} + +func TestShardsByType(t *testing.T) { + vid := needle.VolumeId(123) + + // Create mock nodes with shards + nodes := []*EcNode{ + { + info: &master_pb.DataNodeInfo{ + Id: "node1", + DiskInfos: map[string]*master_pb.DiskInfo{ + string(types.HardDriveType): { + EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{ + { + Id: uint32(vid), + EcIndexBits: uint32((1 << 0) | (1 << 1) | (1 << 10) | (1 << 11)), // data: 0,1 parity: 10,11 + }, + }, + }, + }, + }, + rack: "rack1", + }, + } + + t.Run("Standard10Plus4", func(t *testing.T) { + dataPerRack, parityPerRack := shardsByTypePerRack(vid, nodes, types.HardDriveType, 10) + + if len(dataPerRack["rack1"]) != 2 { + t.Errorf("expected 2 data shards, got %d", len(dataPerRack["rack1"])) + } + if len(parityPerRack["rack1"]) != 2 { + t.Errorf("expected 2 parity shards, got %d", len(parityPerRack["rack1"])) + } + }) + + t.Run("NodeGrouping", func(t *testing.T) { + dataPerNode, parityPerNode := shardsByTypePerNode(vid, nodes, types.HardDriveType, 10) + + if len(dataPerNode["node1"]) != 2 { + t.Errorf("expected 2 data shards on node1, got %d", len(dataPerNode["node1"])) + } + if len(parityPerNode["node1"]) != 2 { + t.Errorf("expected 2 parity shards on node1, got %d", len(parityPerNode["node1"])) + } + }) +} + +func buildDataNode(nodeId string, maxVolumes int64) *master_pb.DataNodeInfo { + return &master_pb.DataNodeInfo{ + Id: nodeId, + DiskInfos: map[string]*master_pb.DiskInfo{ + string(types.HardDriveType): { + Type: string(types.HardDriveType), + MaxVolumeCount: maxVolumes, + VolumeCount: 0, + }, + }, + } +} diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 04e982f93..cd636e9e8 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -227,7 +227,8 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv } vid := needle.VolumeId(ecShardInfo.Id) // For evacuation, prefer same disk type but allow fallback to other types - destDiskId := pickBestDiskOnNode(emptyNode, vid, diskType, false) + // No anti-affinity needed for evacuation (dataShardCount=0) + destDiskId := pickBestDiskOnNode(emptyNode, vid, diskType, false, shardId, 0) if destDiskId > 0 { fmt.Fprintf(writer, "moving ec volume %s%d.%d %s => %s (disk %d)\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id, destDiskId) } else {
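
Note on the new grouping helpers: the patch classifies a shard as data when its shard id is below the configured data-shard count and as parity otherwise, and the new generic shardsByType helper groups shards by a caller-supplied key (rack id for shardsByTypePerRack, node id for shardsByTypePerNode). The standalone sketch below illustrates only that grouping idea; the ShardId type and groupShardsByType function here are simplified stand-ins, not the patch's actual API.

package main

import "fmt"

// ShardId mirrors the erasure_coding.ShardId idea: ids below the data-shard
// count are data shards, the rest are parity shards (e.g. ids 0-9 are data
// and 10-13 are parity in the default 10+4 scheme).
// This type and the function below are illustrative, not the patch's API.
type ShardId uint8

// groupShardsByType is a simplified stand-in for the patch's shardsByType
// helper: it splits shard ids into data and parity buckets keyed by an
// arbitrary group (rack id or node id), chosen by the caller.
func groupShardsByType(shards map[string][]ShardId, dataShards int) (data, parity map[string][]ShardId) {
	data = make(map[string][]ShardId)
	parity = make(map[string][]ShardId)
	for group, ids := range shards {
		for _, id := range ids {
			if int(id) < dataShards {
				data[group] = append(data[group], id)
			} else {
				parity[group] = append(parity[group], id)
			}
		}
	}
	return
}

func main() {
	// Shards of one volume as seen per rack; rack ids are made up.
	shards := map[string][]ShardId{
		"rack0": {0, 1, 10},
		"rack1": {2, 11},
	}
	data, parity := groupShardsByType(shards, 10)
	fmt.Println(data)   // map[rack0:[0 1] rack1:[2]]
	fmt.Println(parity) // map[rack0:[10] rack1:[11]]
}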
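
Note on the two-pass selection: the new twoPassSelector picks a destination in two passes. Pass 1 considers only targets that have free slots, sit below the per-target cap, pass the configured limit check, and are outside the anti-affinity set; pass 2 drops the anti-affinity filter as a fallback so balancing can still make progress when every eligible target holds the other shard type. Below is a minimal, self-contained sketch of that selection order with illustrative names; it deliberately omits the free-slot and replica-placement checks and is not the patch's implementation.

package main

import (
	"fmt"
	"math/rand/v2"
)

// pickTarget sketches the two-pass selection order: pass 1 honors the
// anti-affinity set, pass 2 falls back to any target below the cap.
// Among eligible targets it prefers those with the fewest shards and
// breaks ties randomly, mirroring the selector in the patch.
func pickTarget(shardCount map[string]int, maxPerTarget int, antiAffinity map[string]bool) (string, bool) {
	pass := func(skipAntiAffinity bool) []string {
		var best []string
		minShards := maxPerTarget + 1
		for target, count := range shardCount {
			if count >= maxPerTarget {
				continue // target already holds its share of this shard type
			}
			if skipAntiAffinity && antiAffinity[target] {
				continue // pass 1: avoid targets holding the other shard type
			}
			if count < minShards {
				best, minShards = nil, count
			}
			if count == minShards {
				best = append(best, target)
			}
		}
		return best
	}

	candidates := pass(true) // pass 1: honor anti-affinity
	if len(candidates) == 0 {
		candidates = pass(false) // pass 2: fall back to any valid target
	}
	if len(candidates) == 0 {
		return "", false
	}
	return candidates[rand.IntN(len(candidates))], true
}

func main() {
	// rack1 and rack2 hold data shards (anti-affinity for parity), so a
	// parity shard lands on rack0 even though it already has one shard.
	shardCount := map[string]int{"rack0": 1, "rack1": 0, "rack2": 0}
	antiAffinity := map[string]bool{"rack1": true, "rack2": true}
	fmt.Println(pickTarget(shardCount, 2, antiAffinity)) // rack0 true
}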