diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 043c70366..81d214fcc 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -50,7 +50,7 @@ func (c *commandEcBalance) Help() string { averageShardsPerEcRack = totalShardNumber / numRacks // totalShardNumber is 14 for now, later could varies for each dc ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack for each ecShardsToMove { - destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, averageShardsPerEcRack, ecShardReplicaPlacement) + destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, ecShardReplicaPlacement) destVolumeServers = volume servers on the destRack pickOneEcNodeAndMoveOneShard(destVolumeServers) } @@ -69,7 +69,7 @@ func (c *commandEcBalance) Help() string { volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardsPerEcRack ecShardsToMove = select overflown ec shards from volumeServersOverAverage for each ecShardsToMove { - destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, averageShardCount, ecShardReplicaPlacement) + destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, ecShardReplicaPlacement) pickOneEcNodeAndMoveOneShard(destVolumeServers) } } diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index e91656931..625674bfd 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -549,7 +549,7 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl } for shardId, ecNode := range ecShardsToMove { - rackId, err := ecb.pickRackToBalanceShardsInto(racks, rackToShardCount, averageShardsPerEcRack) + rackId, err := ecb.pickRackToBalanceShardsInto(racks, rackToShardCount) if err != nil { fmt.Printf("ec shard %d.%d at %s can not find a destination rack:\n%s\n", vid, shardId, ecNode.info.Id, err.Error()) continue @@ -559,7 +559,7 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl for _, n := range racks[rackId].ecNodes { possibleDestinationEcNodes = append(possibleDestinationEcNodes, n) } - err = ecb.pickOneEcNodeAndMoveOneShard(averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes) + err = ecb.pickOneEcNodeAndMoveOneShard(ecNode, collection, vid, shardId, possibleDestinationEcNodes) if err != nil { return err } @@ -572,7 +572,7 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl return nil } -func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, averageShardsPerEcRack int) (RackId, error) { +func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int) (RackId, error) { targets := []RackId{} targetShards := -1 for _, shards := range rackToShardCount { @@ -593,10 +593,7 @@ func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcR details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for other racks (%d)\n", rackId, shards, ecb.replicaPlacement.DiffRackCount) continue } - if shards >= averageShardsPerEcRack { - details += fmt.Sprintf(" Skipped %s because shards %d >= averageShards (%d)\n", rackId, shards, averageShardsPerEcRack) - continue - } + if shards < targetShards { // Favor racks with less shards, to ensure an uniform distribution. targets = nil @@ -659,7 +656,7 @@ func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId) - err := ecb.pickOneEcNodeAndMoveOneShard(averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes) + err := ecb.pickOneEcNodeAndMoveOneShard(ecNode, collection, vid, shardId, possibleDestinationEcNodes) if err != nil { return err } @@ -752,7 +749,7 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error { return nil } -func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode, averageShardsPerEcNode int) (*EcNode, error) { +func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode) (*EcNode, error) { if existingLocation == nil { return nil, fmt.Errorf("INTERNAL: missing source nodes") } @@ -788,11 +785,6 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for the rack (%d)\n", node.info.Id, shards, ecb.replicaPlacement.SameRackCount) continue } - if shards >= averageShardsPerEcNode { - details += fmt.Sprintf(" Skipped %s because shards %d >= averageShards (%d)\n", - node.info.Id, shards, averageShardsPerEcNode) - continue - } if shards < targetShards { // Favor nodes with less shards, to ensure an uniform distribution. @@ -810,9 +802,8 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi return targets[rand.IntN(len(targets))], nil } -// TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards. -func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode) error { - destNode, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes, averageShardsPerEcNode) +func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode) error { + destNode, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes) if err != nil { fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, err.Error()) return nil @@ -879,26 +870,6 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo return vidLocations } -// TODO: Unused, delete me. -func (ecb *ecBalancer) volumeIdToReplicaPlacement(vid needle.VolumeId) (*super_block.ReplicaPlacement, error) { - for _, ecNode := range ecb.ecNodes { - for _, diskInfo := range ecNode.info.DiskInfos { - for _, volumeInfo := range diskInfo.VolumeInfos { - if needle.VolumeId(volumeInfo.Id) == vid { - return super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) - } - } - for _, ecShardInfo := range diskInfo.EcShardInfos { - if needle.VolumeId(ecShardInfo.Id) == vid { - return ecb.replicaPlacement, nil - } - } - } - } - - return nil, fmt.Errorf("failed to resolve replica placement for volume ID %d", vid) -} - func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, applyBalancing bool) (err error) { if len(collections) == 0 { return fmt.Errorf("no collections to balance") diff --git a/weed/shell/command_ec_common_test.go b/weed/shell/command_ec_common_test.go index b5ea2efa8..963706894 100644 --- a/weed/shell/command_ec_common_test.go +++ b/weed/shell/command_ec_common_test.go @@ -88,60 +88,6 @@ func TestEcDistribution(t *testing.T) { } } -func TestVolumeIdToReplicaPlacement(t *testing.T) { - ecReplicaPlacement, _ := super_block.NewReplicaPlacementFromString("123") - - testCases := []struct { - topology *master_pb.TopologyInfo - vid string - want string - wantErr string - }{ - {topology1, "", "", "failed to resolve replica placement"}, - {topology1, "0", "", "failed to resolve replica placement"}, - {topology1, "1", "100", ""}, - {topology1, "296", "100", ""}, - {topology2, "", "", "failed to resolve replica placement"}, - {topology2, "19012", "", "failed to resolve replica placement"}, - {topology2, "6271", "002", ""}, - {topology2, "17932", "002", ""}, - {topologyEc, "", "", "failed to resolve replica placement"}, - {topologyEc, "0", "", "failed to resolve replica placement"}, - {topologyEc, "6225", "002", ""}, - {topologyEc, "6241", "002", ""}, - {topologyEc, "9577", "123", ""}, // EC volume - {topologyEc, "12737", "123", ""}, // EC volume - } - - for _, tc := range testCases { - vid, _ := needle.NewVolumeId(tc.vid) - ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "") - - ecb := ecBalancer{ - ecNodes: ecNodes, - replicaPlacement: ecReplicaPlacement, - } - - got, gotErr := ecb.volumeIdToReplicaPlacement(vid) - - if err := errorCheck(gotErr, tc.wantErr); err != nil { - t.Errorf("volume %q: %s", tc.vid, err.Error()) - continue - } - - if got == nil { - if tc.want != "" { - t.Errorf("expected replica placement %q for volume %q, got nil", tc.want, tc.vid) - } - continue - } - want, _ := super_block.NewReplicaPlacementFromString(tc.want) - if !got.Equals(want) { - t.Errorf("got replica placement %q for volune %q, want %q", got.String(), tc.vid, want.String()) - } - } -} - func TestPickRackToBalanceShardsInto(t *testing.T) { testCases := []struct { topology *master_pb.TopologyInfo @@ -177,9 +123,8 @@ func TestPickRackToBalanceShardsInto(t *testing.T) { racks := ecb.racks() rackToShardCount := countShardsByRack(vid, ecNodes) - averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks)) - got, gotErr := ecb.pickRackToBalanceShardsInto(racks, rackToShardCount, averageShardsPerEcRack) + got, gotErr := ecb.pickRackToBalanceShardsInto(racks, rackToShardCount) if err := errorCheck(gotErr, tc.wantErr); err != nil { t.Errorf("volume %q: %s", tc.vid, err.Error()) continue @@ -259,8 +204,7 @@ func TestPickEcNodeToBalanceShardsInto(t *testing.T) { } } - averageShardsPerEcNode := 5 - got, gotErr := ecb.pickEcNodeToBalanceShardsInto(vid, ecNode, allEcNodes, averageShardsPerEcNode) + got, gotErr := ecb.pickEcNodeToBalanceShardsInto(vid, ecNode, allEcNodes) if err := errorCheck(gotErr, tc.wantErr); err != nil { t.Errorf("node %q, volume %q: %s", tc.nodeId, tc.vid, err.Error()) continue