From 559a1fd0f4565bca3e2f4e6f0d90d188c7b3377a Mon Sep 17 00:00:00 2001
From: Lisandro Pin
Date: Wed, 27 Nov 2024 20:51:57 +0100
Subject: [PATCH] Improve EC shards rebalancing logic across nodes (#6297)

* Improve EC shards rebalancing logic across nodes.

- Favor target nodes with less preexisting shards, to ensure a fair distribution.
- Randomize selection when multiple possible target nodes are available.
- Add logic to account for replication settings when selecting target nodes (currently disabled).

* Fix minor test typo.

* Clarify internal error messages for `pickEcNodeToBalanceShardsInto()`.
---
 weed/shell/command_ec_common.go      |  80 ++++++++++++++-----
 weed/shell/command_ec_common_test.go | 112 ++++++++++++++++++++++++---
 2 files changed, 159 insertions(+), 33 deletions(-)

diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 7328fffe7..f70bc0131 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -2,6 +2,7 @@ package shell

 import (
     "context"
+    "errors"
     "fmt"
     "math/rand/v2"

@@ -481,6 +482,7 @@ func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int
     })
 }

+// TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
 func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
     // calculate average number of shards an ec rack should have for one volume
     averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))
@@ -527,6 +529,7 @@ func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid
     return nil
 }

+// TODO: Return an error with details upon failure to resolve a destination rack.
 func pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, replicaPlacement *super_block.ReplicaPlacement, averageShardsPerEcRack int) RackId {
     targets := []RackId{}
     targetShards := -1
@@ -575,10 +578,7 @@ func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, ra
     for vid, locations := range vidLocations {

         // see the volume's shards are in how many racks, and how many in each rack
-        rackToShardCount := groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
-            shardBits := findEcVolumeShards(ecNode, vid)
-            return string(ecNode.rack), shardBits.ShardIdCount()
-        })
+        rackToShardCount := countShardsByRack(vid, locations)
         rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
             return string(ecNode.rack)
         })
@@ -711,37 +711,75 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool
     return nil
 }

-func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+func pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode, replicaPlacement *super_block.ReplicaPlacement, averageShardsPerEcNode int) (*EcNode, error) {
+    if existingLocation == nil {
+        return nil, fmt.Errorf("INTERNAL: missing source nodes")
+    }
+    if len(possibleDestinations) == 0 {
+        return nil, fmt.Errorf("INTERNAL: missing destination nodes")
+    }

-    sortEcNodesByFreeslotsDescending(possibleDestinationEcNodes)
-    skipReason := ""
-    for _, destEcNode := range possibleDestinationEcNodes {
+    nodeShards := map[*EcNode]int{}
+    for _, node := range possibleDestinations {
+        nodeShards[node] = findEcVolumeShards(node, vid).ShardIdCount()
+    }

-        if destEcNode.info.Id == existingLocation.info.Id {
-            continue
+    targets := []*EcNode{}
+    targetShards := -1
+    for _, shards := range nodeShards {
+        if shards > targetShards {
+            targetShards = shards
         }
+    }

-        if destEcNode.freeEcSlot <= 0 {
-            skipReason += fmt.Sprintf("  Skipping %s because it has no free slots\n", destEcNode.info.Id)
+    details := ""
+    for _, node := range possibleDestinations {
+        if node.info.Id == existingLocation.info.Id {
             continue
         }
-        if findEcVolumeShards(destEcNode, vid).ShardIdCount() >= averageShardsPerEcNode {
-            skipReason += fmt.Sprintf("  Skipping %s because it %d >= avernageShards (%d)\n",
-                destEcNode.info.Id, findEcVolumeShards(destEcNode, vid).ShardIdCount(), averageShardsPerEcNode)
+        if node.freeEcSlot <= 0 {
+            details += fmt.Sprintf("  Skipped %s because it has no free slots\n", node.info.Id)
             continue
         }

-        fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destEcNode.info.Id)
+        shards := nodeShards[node]
+        if replicaPlacement != nil && shards >= replicaPlacement.SameRackCount {
+            details += fmt.Sprintf("  Skipped %s because shards %d >= replica placement limit for the rack (%d)\n", node.info.Id, shards, replicaPlacement.SameRackCount)
+            continue
+        }
+        if shards >= averageShardsPerEcNode {
+            details += fmt.Sprintf("  Skipped %s because shards %d >= averageShards (%d)\n",
+                node.info.Id, shards, averageShardsPerEcNode)
+            continue
+        }

-        err := moveMountedShardToEcNode(commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing)
-        if err != nil {
-            return err
+        if shards < targetShards {
+            // Favor nodes with fewer shards, to ensure a uniform distribution.
+            targets = nil
+            targetShards = shards
         }
+        if shards == targetShards {
+            targets = append(targets, node)
+        }
+    }
+    if len(targets) == 0 {
+        return nil, errors.New(details)
+    }
+    return targets[rand.IntN(len(targets))], nil
+}
+
+// TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
+func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+    // TODO: consider volume replica info when balancing nodes
+    destNode, err := pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes, nil, averageShardsPerEcNode)
+    if err != nil {
+        fmt.Printf("WARNING: Could not find suitable target node for %d.%d:\n%s", vid, shardId, err.Error())
         return nil
     }
-    fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, skipReason)
-    return nil
+
+    fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
+    return moveMountedShardToEcNode(commandEnv, existingLocation, collection, vid, shardId, destNode, applyBalancing)
 }

 func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
diff --git a/weed/shell/command_ec_common_test.go b/weed/shell/command_ec_common_test.go
index bdce47bf8..5b4983413 100644
--- a/weed/shell/command_ec_common_test.go
+++ b/weed/shell/command_ec_common_test.go
@@ -2,6 +2,7 @@ package shell

 import (
     "fmt"
+    "strings"
     "testing"

     "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@@ -16,6 +17,22 @@ var (
     topologyEc = parseOutput(topoDataEc)
 )

+func errorCheck(got error, want string) error {
+    if got == nil && want == "" {
+        return nil
+    }
+    if got != nil && want == "" {
+        return fmt.Errorf("expected no error, got %q", got.Error())
+    }
+    if got == nil && want != "" {
+        return fmt.Errorf("got no error, expected %q", want)
+    }
+    if !strings.Contains(got.Error(), want) {
+        return fmt.Errorf("expected error %q, got %q", want, got.Error())
+    }
+    return nil
+}
+
 func TestEcDistribution(t *testing.T) {

     // find out all volume servers with one slot left.
@@ -59,20 +76,10 @@ func TestVolumeIdToReplicaPlacement(t *testing.T) {
         ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "")
         got, gotErr := volumeIdToReplicaPlacement(vid, ecNodes)

-        if tc.wantErr == "" && gotErr != nil {
-            t.Errorf("expected no error for volume %q, got %q", tc.vid, gotErr.Error())
+        if err := errorCheck(gotErr, tc.wantErr); err != nil {
+            t.Errorf("volume %q: %s", tc.vid, err.Error())
             continue
         }
-        if tc.wantErr != "" {
-            if gotErr == nil {
-                t.Errorf("got no error for volume %q, expected %q", tc.vid, tc.wantErr)
-                continue
-            }
-            if gotErr.Error() != tc.wantErr {
-                t.Errorf("expected error %q for volume %q, got %q", tc.wantErr, tc.vid, gotErr.Error())
-                continue
-            }
-        }

         if got == nil {
             if tc.want != "" {
@@ -131,3 +138,84 @@ func TestPickRackToBalanceShardsInto(t *testing.T) {
         }
     }
 }
+func TestPickEcNodeToBalanceShardsInto(t *testing.T) {
+    testCases := []struct {
+        topology  *master_pb.TopologyInfo
+        nodeId    string
+        vid       string
+        wantOneOf []string
+        wantErr   string
+    }{
+        {topologyEc, "", "", nil, "INTERNAL: missing source nodes"},
+        {topologyEc, "idontexist", "12737", nil, "INTERNAL: missing source nodes"},
+        // Non-EC nodes. We don't care about these, but the function should return all available target nodes as a safeguard.
+        {
+            topologyEc, "172.19.0.10:8702", "6225", []string{
+                "172.19.0.13:8701", "172.19.0.14:8711", "172.19.0.16:8704", "172.19.0.17:8703",
+                "172.19.0.19:8700", "172.19.0.20:8706", "172.19.0.21:8710", "172.19.0.3:8708",
+                "172.19.0.4:8707", "172.19.0.5:8705", "172.19.0.6:8713", "172.19.0.8:8709",
+                "172.19.0.9:8712"},
+            "",
+        },
+        {
+            topologyEc, "172.19.0.8:8709", "6226", []string{
+                "172.19.0.10:8702", "172.19.0.13:8701", "172.19.0.14:8711", "172.19.0.16:8704",
+                "172.19.0.17:8703", "172.19.0.19:8700", "172.19.0.20:8706", "172.19.0.21:8710",
+                "172.19.0.3:8708", "172.19.0.4:8707", "172.19.0.5:8705", "172.19.0.6:8713",
+                "172.19.0.9:8712"},
+            "",
+        },
+        // EC volumes.
+        {topologyEc, "172.19.0.10:8702", "14322", []string{
+            "172.19.0.14:8711", "172.19.0.5:8705", "172.19.0.6:8713"},
+            ""},
+        {topologyEc, "172.19.0.13:8701", "10457", []string{
+            "172.19.0.10:8702", "172.19.0.6:8713"},
+            ""},
+        {topologyEc, "172.19.0.17:8703", "12737", []string{
+            "172.19.0.13:8701"},
+            ""},
+        {topologyEc, "172.19.0.20:8706", "14322", []string{
+            "172.19.0.14:8711", "172.19.0.5:8705", "172.19.0.6:8713"},
+            ""},
+    }
+
+    for _, tc := range testCases {
+        vid, _ := needle.NewVolumeId(tc.vid)
+        allEcNodes, _ := collectEcVolumeServersByDc(tc.topology, "")
+
+        // Resolve target node by name
+        var ecNode *EcNode
+        for _, n := range allEcNodes {
+            if n.info.Id == tc.nodeId {
+                ecNode = n
+                break
+            }
+        }
+
+        averageShardsPerEcNode := 5
+        got, gotErr := pickEcNodeToBalanceShardsInto(vid, ecNode, allEcNodes, nil, averageShardsPerEcNode)
+        if err := errorCheck(gotErr, tc.wantErr); err != nil {
+            t.Errorf("node %q, volume %q: %s", tc.nodeId, tc.vid, err.Error())
+            continue
+        }
+
+        if got == nil {
+            if len(tc.wantOneOf) == 0 {
+                continue
+            }
+            t.Errorf("node %q, volume %q: got no node, want %q", tc.nodeId, tc.vid, tc.wantOneOf)
+            continue
+        }
+        found := false
+        for _, want := range tc.wantOneOf {
+            if got := got.info.Id; got == want {
+                found = true
+                break
+            }
+        }
+        if !(found) {
+            t.Errorf("expected one of %v for volume %q, got %q", tc.wantOneOf, tc.vid, got.info.Id)
+        }
+    }
+}
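The core of the new `pickEcNodeToBalanceShardsInto()` is a tie-aware minimum selection: every eligible destination tied for the lowest shard count stays in `targets`, and one of them is then drawn with `rand.IntN()`, so repeated balancing runs spread shards evenly instead of always favoring the first candidate. The standalone Go sketch below illustrates just that selection pattern in isolation; the `node` type, the `pickLeastLoaded()` helper and the sample shard counts are illustrative only, and the sketch leaves out the real function's free-slot, average-shards and replica-placement filters.

```go
package main

import (
	"fmt"
	"math/rand/v2"
)

// node is a stand-in for *EcNode: an id plus the number of shards it already
// holds for the volume being balanced.
type node struct {
	id     string
	shards int
}

// pickLeastLoaded returns a random node among those holding the fewest shards.
// Candidates are reset whenever a smaller shard count is seen and appended on
// ties, then one of the remaining ties is drawn at random.
func pickLeastLoaded(candidates []node) (node, error) {
	if len(candidates) == 0 {
		return node{}, fmt.Errorf("no candidate nodes")
	}

	var targets []node
	targetShards := -1
	for _, n := range candidates {
		if targetShards == -1 || n.shards < targetShards {
			// Favor nodes with fewer shards, to ensure a uniform distribution.
			targets = nil
			targetShards = n.shards
		}
		if n.shards == targetShards {
			targets = append(targets, n)
		}
	}
	return targets[rand.IntN(len(targets))], nil
}

func main() {
	candidates := []node{
		{id: "172.19.0.3:8708", shards: 2},
		{id: "172.19.0.4:8707", shards: 1},
		{id: "172.19.0.5:8705", shards: 1},
	}
	// Prints one of the two nodes holding a single shard, chosen at random.
	picked, _ := pickLeastLoaded(candidates)
	fmt.Printf("move next shard to %s\n", picked.id)
}
```

Randomizing among ties is what keeps repeated balancing runs from piling shards onto whichever node happens to sort first; the patched function applies the same idea after filtering out ineligible destinations.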