diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 906e2e0dc..7510a5277 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -39,6 +39,8 @@ type EcRack struct { freeEcSlot int } +// TODO: We're shuffling way too many parameters between internal functions. Encapsulate in a ecBalancer{} struct. + var ( // Overridable functions for testing. getDefaultReplicaPlacement = _getDefaultReplicaPlacement @@ -434,7 +436,7 @@ func collectRacks(allEcNodes []*EcNode) map[RackId]*EcRack { return racks } -func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error { +func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*EcNode, racks map[RackId]*EcRack, rp *super_block.ReplicaPlacement, applyBalancing bool) error { fmt.Printf("balanceEcVolumes %s\n", collection) @@ -442,7 +444,7 @@ func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*E return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err) } - if err := balanceEcShardsAcrossRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil { + if err := balanceEcShardsAcrossRacks(commandEnv, allEcNodes, racks, collection, rp, applyBalancing); err != nil { return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err) } @@ -499,12 +501,12 @@ func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle return nil } -func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error { +func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, rp *super_block.ReplicaPlacement, applyBalancing bool) error { // collect vid => []ecNode, since previous steps can change the locations vidLocations := collectVolumeIdToEcNodes(allEcNodes, collection) // spread the ec shards evenly for vid, locations := range vidLocations { - if err := doBalanceEcShardsAcrossRacks(commandEnv, collection, vid, locations, racks, applyBalancing); err != nil { + if err := doBalanceEcShardsAcrossRacks(commandEnv, collection, vid, locations, racks, rp, applyBalancing); err != nil { return err } } @@ -519,7 +521,7 @@ func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int } // TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards. -func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error { +func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, rp *super_block.ReplicaPlacement, applyBalancing bool) error { // calculate average number of shards an ec rack should have for one volume averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks)) @@ -543,7 +545,7 @@ func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid for shardId, ecNode := range ecShardsToMove { // TODO: consider volume replica info when balancing racks - rackId, err := pickRackToBalanceShardsInto(racks, rackToShardCount, nil, averageShardsPerEcRack) + rackId, err := pickRackToBalanceShardsInto(racks, rackToShardCount, rp, averageShardsPerEcRack) if err != nil { fmt.Printf("ec shard %d.%d at %s can not find a destination rack:\n%s\n", vid, shardId, ecNode.info.Id, err.Error()) continue @@ -912,7 +914,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic racks := collectRacks(allEcNodes) for _, c := range collections { - if err = balanceEcVolumes(commandEnv, c, allEcNodes, racks, applyBalancing); err != nil { + if err = balanceEcVolumes(commandEnv, c, allEcNodes, racks, ecReplicaPlacement, applyBalancing); err != nil { return err } } diff --git a/weed/shell/command_ec_common_test.go b/weed/shell/command_ec_common_test.go index 76609c89d..29d7c2d4b 100644 --- a/weed/shell/command_ec_common_test.go +++ b/weed/shell/command_ec_common_test.go @@ -139,35 +139,40 @@ func TestVolumeIdToReplicaPlacement(t *testing.T) { func TestPickRackToBalanceShardsInto(t *testing.T) { testCases := []struct { - topology *master_pb.TopologyInfo - vid string - wantOneOf []string + topology *master_pb.TopologyInfo + vid string + replicaPlacement string + wantOneOf []string + wantErr string }{ // Non-EC volumes. We don't care about these, but the function should return all racks as a safeguard. - {topologyEc, "", []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}}, - {topologyEc, "6225", []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}}, - {topologyEc, "6226", []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}}, - {topologyEc, "6241", []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}}, - {topologyEc, "6242", []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}}, + {topologyEc, "", "123", []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}, ""}, + {topologyEc, "6225", "123", []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}, ""}, + {topologyEc, "6226", "123", []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}, ""}, + {topologyEc, "6241", "123", []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}, ""}, + {topologyEc, "6242", "123", []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}, ""}, // EC volumes. - {topologyEc, "9577", []string{"rack1", "rack2", "rack3"}}, - {topologyEc, "10457", []string{"rack1"}}, - {topologyEc, "12737", []string{"rack2"}}, - {topologyEc, "14322", []string{"rack3"}}, + {topologyEc, "9577", "", nil, "shards 1 >= replica placement limit for other racks (0)"}, + {topologyEc, "9577", "111", nil, "shards 1 >= replica placement limit for other racks (1)"}, + {topologyEc, "9577", "222", []string{"rack1", "rack2", "rack3"}, ""}, + {topologyEc, "10457", "222", []string{"rack1"}, ""}, + {topologyEc, "12737", "222", []string{"rack2"}, ""}, + {topologyEc, "14322", "222", []string{"rack3"}, ""}, } for _, tc := range testCases { vid, _ := needle.NewVolumeId(tc.vid) ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "") racks := collectRacks(ecNodes) + rp, _ := super_block.NewReplicaPlacementFromString(tc.replicaPlacement) locations := ecNodes rackToShardCount := countShardsByRack(vid, locations) averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks)) - got, gotErr := pickRackToBalanceShardsInto(racks, rackToShardCount, nil, averageShardsPerEcRack) - if gotErr != nil { - t.Errorf("volume %q: %s", tc.vid, gotErr.Error()) + got, gotErr := pickRackToBalanceShardsInto(racks, rackToShardCount, rp, averageShardsPerEcRack) + if err := errorCheck(gotErr, tc.wantErr); err != nil { + t.Errorf("volume %q: %s", tc.vid, err.Error()) continue } diff --git a/weed/shell/command_ec_test.go b/weed/shell/command_ec_test.go index ef9461ef0..50e06ba6c 100644 --- a/weed/shell/command_ec_test.go +++ b/weed/shell/command_ec_test.go @@ -28,7 +28,7 @@ func TestCommandEcBalanceSmall(t *testing.T) { } racks := collectRacks(allEcNodes) - balanceEcVolumes(nil, "c1", allEcNodes, racks, false) + balanceEcVolumes(nil, "c1", allEcNodes, racks, nil, false) } func TestCommandEcBalanceNothingToMove(t *testing.T) { @@ -43,7 +43,7 @@ func TestCommandEcBalanceNothingToMove(t *testing.T) { } racks := collectRacks(allEcNodes) - balanceEcVolumes(nil, "c1", allEcNodes, racks, false) + balanceEcVolumes(nil, "c1", allEcNodes, racks, nil, false) } func TestCommandEcBalanceAddNewServers(t *testing.T) { @@ -60,7 +60,7 @@ func TestCommandEcBalanceAddNewServers(t *testing.T) { } racks := collectRacks(allEcNodes) - balanceEcVolumes(nil, "c1", allEcNodes, racks, false) + balanceEcVolumes(nil, "c1", allEcNodes, racks, nil, false) } func TestCommandEcBalanceAddNewRacks(t *testing.T) { @@ -77,7 +77,7 @@ func TestCommandEcBalanceAddNewRacks(t *testing.T) { } racks := collectRacks(allEcNodes) - balanceEcVolumes(nil, "c1", allEcNodes, racks, false) + balanceEcVolumes(nil, "c1", allEcNodes, racks, nil, false) } func TestCommandEcBalanceVolumeEvenButRackUneven(t *testing.T) { @@ -119,7 +119,7 @@ func TestCommandEcBalanceVolumeEvenButRackUneven(t *testing.T) { } racks := collectRacks(allEcNodes) - balanceEcVolumes(nil, "c1", allEcNodes, racks, false) + balanceEcVolumes(nil, "c1", allEcNodes, racks, nil, false) balanceEcRacks(nil, racks, false) }