
Unify the re-balancing logic for `ec.encode` with `ec.balance`. (#6339)

Among other things, this lets the recent topology-aware re-balancing
changes also take effect at EC encoding time.
pull/6341/head
Lisandro Pin 2 weeks ago
committed by GitHub
parent commit 8c82c037b9
Changed files (6):
  1. weed/shell/command_ec_balance.go (68 changed lines)
  2. weed/shell/command_ec_common.go (126 changed lines)
  3. weed/shell/command_ec_common_test.go (1 changed line)
  4. weed/shell/command_ec_decode.go (20 changed lines)
  5. weed/shell/command_ec_encode.go (67 changed lines)
  6. weed/shell/command_ec_rebuild.go (4 changed lines)
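With this change, ec.encode takes the same placement and balancing knobs as ec.balance (see the new -shardReplicaPlacement and -rebalance flags in weed/shell/command_ec_encode.go below). A typical invocation from weed shell might then look like the following; the collection name is a placeholder:

    ec.encode -collection=my_collection -quietFor=1h -rebalance

EcBalance() now runs after encoding regardless; -rebalance only controls whether the planned shard moves are actually applied, which appears to mirror what -force does for ec.balance.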

weed/shell/command_ec_balance.go (68 changed lines)

@@ -24,73 +24,7 @@ func (c *commandEcBalance) Help() string {
ec.balance [-c EACH_COLLECTION|<collection_name>] [-force] [-dataCenter <data_center>] [-shardReplicaPlacement <replica_placement>]
Algorithm:
func EcBalance() {
for each collection:
balanceEcVolumes(collectionName)
for each rack:
balanceEcRack(rack)
}
func balanceEcVolumes(collectionName){
for each volume:
doDeduplicateEcShards(volumeId)
tracks rack~shardCount mapping
for each volume:
doBalanceEcShardsAcrossRacks(volumeId)
for each volume:
doBalanceEcShardsWithinRacks(volumeId)
}
// spread ec shards into more racks
func doBalanceEcShardsAcrossRacks(volumeId){
tracks rack~volumeIdShardCount mapping
averageShardsPerEcRack = totalShardNumber / numRacks // totalShardNumber is 14 for now, later could varies for each dc
ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
for each ecShardsToMove {
destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, ecShardReplicaPlacement)
destVolumeServers = volume servers on the destRack
pickOneEcNodeAndMoveOneShard(destVolumeServers)
}
}
func doBalanceEcShardsWithinRacks(volumeId){
racks = collect all racks that the volume id is on
for rack, shards := range racks
doBalanceEcShardsWithinOneRack(volumeId, shards, rack)
}
// move ec shards
func doBalanceEcShardsWithinOneRack(volumeId, shards, rackId){
tracks volumeServer~volumeIdShardCount mapping
averageShardCount = len(shards) / numVolumeServers
volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardsPerEcRack
ecShardsToMove = select overflown ec shards from volumeServersOverAverage
for each ecShardsToMove {
destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, ecShardReplicaPlacement)
pickOneEcNodeAndMoveOneShard(destVolumeServers)
}
}
// move ec shards while keeping shard distribution for the same volume unchanged or more even
func balanceEcRack(rack){
averageShardCount = total shards / numVolumeServers
for hasMovedOneEcShard {
sort all volume servers ordered by the number of local ec shards
pick the volume server A with the lowest number of ec shards x
pick the volume server B with the highest number of ec shards y
if y > averageShardCount and x +1 <= averageShardCount {
if B has a ec shard with volume id v that A does not have {
move one ec shard v from B to A
hasMovedOneEcShard = true
}
}
}
}
`
` + ecBalanceAlgorithmDescription
}
func (c *commandEcBalance) HasTag(CommandTag) bool {

weed/shell/command_ec_common.go (126 changed lines)

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/rand/v2"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/operation"
@@ -40,6 +41,72 @@ type EcRack struct {
}
var (
ecBalanceAlgorithmDescription = `
func EcBalance() {
for each collection:
balanceEcVolumes(collectionName)
for each rack:
balanceEcRack(rack)
}
func balanceEcVolumes(collectionName){
for each volume:
doDeduplicateEcShards(volumeId)
tracks rack~shardCount mapping
for each volume:
doBalanceEcShardsAcrossRacks(volumeId)
for each volume:
doBalanceEcShardsWithinRacks(volumeId)
}
// spread ec shards into more racks
func doBalanceEcShardsAcrossRacks(volumeId){
tracks rack~volumeIdShardCount mapping
averageShardsPerEcRack = totalShardNumber / numRacks // totalShardNumber is 14 for now, later could varies for each dc
ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
for each ecShardsToMove {
destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, ecShardReplicaPlacement)
destVolumeServers = volume servers on the destRack
pickOneEcNodeAndMoveOneShard(destVolumeServers)
}
}
func doBalanceEcShardsWithinRacks(volumeId){
racks = collect all racks that the volume id is on
for rack, shards := range racks
doBalanceEcShardsWithinOneRack(volumeId, shards, rack)
}
// move ec shards
func doBalanceEcShardsWithinOneRack(volumeId, shards, rackId){
tracks volumeServer~volumeIdShardCount mapping
averageShardCount = len(shards) / numVolumeServers
volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardsPerEcRack
ecShardsToMove = select overflown ec shards from volumeServersOverAverage
for each ecShardsToMove {
destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, ecShardReplicaPlacement)
pickOneEcNodeAndMoveOneShard(destVolumeServers)
}
}
// move ec shards while keeping shard distribution for the same volume unchanged or more even
func balanceEcRack(rack){
averageShardCount = total shards / numVolumeServers
for hasMovedOneEcShard {
sort all volume servers ordered by the number of local ec shards
pick the volume server A with the lowest number of ec shards x
pick the volume server B with the highest number of ec shards y
if y > averageShardCount and x +1 <= averageShardCount {
if B has a ec shard with volume id v that A does not have {
move one ec shard v from B to A
hasMovedOneEcShard = true
}
}
}
}
`
// Overridable functions for testing.
getDefaultReplicaPlacement = _getDefaultReplicaPlacement
)
@@ -58,6 +125,7 @@ func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPl
return super_block.NewReplicaPlacementFromString(resp.DefaultReplication)
}
func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super_block.ReplicaPlacement, error) {
if replicaStr != "" {
rp, err := super_block.NewReplicaPlacementFromString(replicaStr)
@@ -75,6 +143,45 @@ func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super
return rp, err
}
func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
if delayBeforeCollecting > 0 {
time.Sleep(delayBeforeCollecting)
}
var resp *master_pb.VolumeListResponse
err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
return
}
return resp.TopologyInfo, resp.VolumeSizeLimitMb, nil
}
func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
// list all possible locations
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
if err != nil {
return
}
// find out all volume servers with one slot left.
ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
sortEcNodesByFreeslotsDescending(ecNodes)
return
}
func collectEcNodes(commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
return collectEcNodesForDC(commandEnv, "")
}
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
if !commandEnv.isLocked() {
@@ -243,22 +350,6 @@ func (ecNode *EcNode) localShardIdCount(vid uint32) int {
return 0
}
func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
// list all possible locations
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
if err != nil {
return
}
// find out all volume servers with one slot left.
ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
sortEcNodesByFreeslotsDescending(ecNodes)
return
}
func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
if selectedDataCenter != "" && selectedDataCenter != string(dc) {
@@ -523,7 +614,6 @@ func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int
})
}
// TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needle.VolumeId, locations []*EcNode) error {
racks := ecb.racks()
@@ -876,7 +966,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic
}
// collect all ec nodes
allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, dc)
allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc)
if err != nil {
return err
}
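The across-rack step in the ecBalanceAlgorithmDescription added above boils down to an average-then-drain rule: compute a per-rack target and move whatever exceeds it. Below is a minimal, self-contained sketch of just that selection step, using the pseudocode's integer-division average; the function is illustrative only and does not exist in the codebase (the real logic lives in ecBalancer.doBalanceEcShardsAcrossRacks).

    package main

    import "fmt"

    // overflowPerRack mirrors the "averageShardsPerEcRack" rule from the algorithm
    // description: a rack holding more than totalShards/numRacks shards of a volume
    // is over target, and the excess is what gets picked for a move to another rack.
    func overflowPerRack(shardsPerRack map[string]int, totalShards int) map[string]int {
    	numRacks := len(shardsPerRack)
    	if numRacks == 0 {
    		return nil
    	}
    	average := totalShards / numRacks // integer division, as in the pseudocode
    	excess := make(map[string]int)
    	for rack, count := range shardsPerRack {
    		if count > average {
    			excess[rack] = count - average
    		}
    	}
    	return excess
    }

    func main() {
    	// All 14 shards (10 data + 4 parity) of a volume currently sit on two racks.
    	fmt.Println(overflowPerRack(map[string]int{"rack_a": 10, "rack_b": 4}, 14))
    	// Prints map[rack_a:3]: three shards should leave rack_a. Destinations are
    	// then chosen via pickOneRack / pickOneEcNodeAndMoveOneShard as described above.
    }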

weed/shell/command_ec_common_test.go (1 changed line)

@@ -32,6 +32,7 @@ func errorCheck(got error, want string) error {
}
return nil
}
func TestParseReplicaPlacementArg(t *testing.T) {
getDefaultReplicaPlacementOrig := getDefaultReplicaPlacement
getDefaultReplicaPlacement = func(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {

weed/shell/command_ec_decode.go (20 changed lines)

@@ -5,7 +5,6 @@ import (
"flag"
"fmt"
"io"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/storage/types"
@@ -241,25 +240,6 @@ func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocati
return resp.VolumeIdLocations, nil
}
func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
if delayBeforeCollecting > 0 {
time.Sleep(delayBeforeCollecting)
}
var resp *master_pb.VolumeListResponse
err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
return
}
return resp.TopologyInfo, resp.VolumeSizeLimitMb, nil
}
func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) {
vidMap := make(map[uint32]bool)

weed/shell/command_ec_encode.go (67 changed lines)

@@ -42,7 +42,7 @@ func (c *commandEcEncode) Help() string {
This command will:
1. freeze one volume
2. apply erasure coding to the volume
3. move the encoded shards to multiple volume servers
3. (optionally) re-balance encoded shards across multiple volume servers
The erasure coding is 10.4. So ideally you have more than 14 volume servers, and you can afford
to lose 4 volume servers.
@@ -53,7 +53,8 @@ func (c *commandEcEncode) Help() string {
If you only have less than 4 volume servers, with erasure coding, at least you can afford to
have 4 corrupted shard files.
`
Re-balancing algorithm:
` + ecBalanceAlgorithmDescription
}
func (c *commandEcEncode) HasTag(CommandTag) bool {
@@ -67,15 +68,22 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
collection := encodeCommand.String("collection", "", "the collection name")
fullPercentage := encodeCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size")
quietPeriod := encodeCommand.Duration("quietFor", time.Hour, "select volumes without no writes for this period")
parallelCopy := encodeCommand.Bool("parallelCopy", true, "copy shards in parallel")
// TODO: Add concurrency support to EcBalance and reenable this switch?
//parallelCopy := encodeCommand.Bool("parallelCopy", true, "copy shards in parallel")
forceChanges := encodeCommand.Bool("force", false, "force the encoding even if the cluster has less than recommended 4 nodes")
shardReplicaPlacement := encodeCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty")
applyBalancing := encodeCommand.Bool("rebalance", false, "re-balance EC shards after creation")
if err = encodeCommand.Parse(args); err != nil {
return nil
}
if err = commandEnv.confirmIsLocked(args); err != nil {
return
}
rp, err := parseReplicaPlacementArg(commandEnv, *shardReplicaPlacement)
if err != nil {
return err
}
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
@@ -94,29 +102,44 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
}
}
vid := needle.VolumeId(*volumeId)
var volumeIds []needle.VolumeId
if vid := needle.VolumeId(*volumeId); vid != 0 {
// volumeId is provided
if vid != 0 {
return doEcEncode(commandEnv, *collection, vid, *parallelCopy)
volumeIds = append(volumeIds, vid)
} else {
// apply to all volumes in the collection
volumeIds, err = collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod)
if err != nil {
return err
}
}
// apply to all volumes in the collection
volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod)
var collections []string
if *collection != "" {
collections = []string{*collection}
} else {
// TODO: should we limit this to collections associated with the provided volume ID?
collections, err = ListCollectionNames(commandEnv, false, true)
if err != nil {
return err
}
fmt.Printf("ec encode volumes: %v\n", volumeIds)
}
// encode all requested volumes...
for _, vid := range volumeIds {
if err = doEcEncode(commandEnv, *collection, vid, *parallelCopy); err != nil {
return err
if err = doEcEncode(commandEnv, *collection, vid); err != nil {
return fmt.Errorf("ec encode for volume %d: %v", vid, err)
}
}
// ...then re-balance ec shards.
if err := EcBalance(commandEnv, collections, "", rp, *applyBalancing); err != nil {
return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", collections, err)
}
return nil
}
func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, parallelCopy bool) (err error) {
func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) error {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
}
@@ -130,23 +153,15 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId,
// fmt.Printf("found ec %d shards on %v\n", vid, locations)
// mark the volume as readonly
err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false, false)
if err != nil {
if err := markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false, false); err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
} }
// generate ec shards
err = generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, locations[0].ServerAddress())
if err != nil {
if err := generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, locations[0].ServerAddress()); err != nil {
return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err) return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
} }
// balance the ec shards to current cluster
err = spreadEcShards(commandEnv, vid, collection, locations, parallelCopy)
if err != nil {
return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err)
}
return nil
}
@@ -166,9 +181,10 @@ func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId,
}
// TODO: delete this (now unused) shard spread logic.
func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location, parallelCopy bool) (err error) {
allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, "")
allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv)
if err != nil {
return err
}
@@ -296,7 +312,6 @@ func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) {
}
func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
// collect topology information
topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv, 0)
if err != nil {
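Taken together, the command_ec_encode.go changes above drop the command's own shard spreading (spreadEcShards survives only behind a deletion TODO): all selected volumes are encoded first, then a single EcBalance() pass, shared with ec.balance, places the shards. A condensed sketch of that flow, with flag parsing, volume selection and locking elided; the helper name is invented for illustration and is not part of the codebase.

    package shell

    import (
    	"fmt"

    	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
    	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
    )

    // encodeThenBalance restates the new ec.encode control flow from the diff above.
    func encodeThenBalance(commandEnv *CommandEnv, collection string, collections []string,
    	volumeIds []needle.VolumeId, rp *super_block.ReplicaPlacement, applyBalancing bool) error {
    	// Erasure-code every selected volume where it currently lives...
    	for _, vid := range volumeIds {
    		if err := doEcEncode(commandEnv, collection, vid); err != nil {
    			return fmt.Errorf("ec encode for volume %d: %v", vid, err)
    		}
    	}
    	// ...then let the shared EcBalance() (the same code path ec.balance uses)
    	// spread the new shards across racks and volume servers.
    	return EcBalance(commandEnv, collections, "", rp, applyBalancing)
    }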

weed/shell/command_ec_rebuild.go (4 changed lines)

@@ -4,10 +4,10 @@ import (
"context"
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"io" "io"
"github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle"
@@ -74,7 +74,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
}
// collect all ec nodes
allEcNodes, _, err := collectEcNodes(commandEnv, "")
allEcNodes, _, err := collectEcNodes(commandEnv)
if err != nil {
return err
}
