@ -549,7 +549,7 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl
}
}
for shardId , ecNode := range ecShardsToMove {
for shardId , ecNode := range ecShardsToMove {
rackId , err := ecb . pickRackToBalanceShardsInto ( racks , rackToShardCount , averageShardsPerEcRack )
rackId , err := ecb . pickRackToBalanceShardsInto ( racks , rackToShardCount )
if err != nil {
if err != nil {
fmt . Printf ( "ec shard %d.%d at %s can not find a destination rack:\n%s\n" , vid , shardId , ecNode . info . Id , err . Error ( ) )
fmt . Printf ( "ec shard %d.%d at %s can not find a destination rack:\n%s\n" , vid , shardId , ecNode . info . Id , err . Error ( ) )
continue
continue
@ -559,7 +559,7 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl
for _ , n := range racks [ rackId ] . ecNodes {
for _ , n := range racks [ rackId ] . ecNodes {
possibleDestinationEcNodes = append ( possibleDestinationEcNodes , n )
possibleDestinationEcNodes = append ( possibleDestinationEcNodes , n )
}
}
err = ecb . pickOneEcNodeAndMoveOneShard ( averageShardsPerEcRack , ecNode , collection , vid , shardId , possibleDestinationEcNodes )
err = ecb . pickOneEcNodeAndMoveOneShard ( ecNode , collection , vid , shardId , possibleDestinationEcNodes )
if err != nil {
if err != nil {
return err
return err
}
}
@ -572,7 +572,7 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl
return nil
return nil
}
}
func ( ecb * ecBalancer ) pickRackToBalanceShardsInto ( rackToEcNodes map [ RackId ] * EcRack , rackToShardCount map [ string ] int , averageShardsPerEcRack int ) ( RackId , error ) {
func ( ecb * ecBalancer ) pickRackToBalanceShardsInto ( rackToEcNodes map [ RackId ] * EcRack , rackToShardCount map [ string ] int ) ( RackId , error ) {
targets := [ ] RackId { }
targets := [ ] RackId { }
targetShards := - 1
targetShards := - 1
for _ , shards := range rackToShardCount {
for _ , shards := range rackToShardCount {
@ -593,10 +593,7 @@ func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcR
details += fmt . Sprintf ( " Skipped %s because shards %d >= replica placement limit for other racks (%d)\n" , rackId , shards , ecb . replicaPlacement . DiffRackCount )
details += fmt . Sprintf ( " Skipped %s because shards %d >= replica placement limit for other racks (%d)\n" , rackId , shards , ecb . replicaPlacement . DiffRackCount )
continue
continue
}
}
if shards >= averageShardsPerEcRack {
details += fmt . Sprintf ( " Skipped %s because shards %d >= averageShards (%d)\n" , rackId , shards , averageShardsPerEcRack )
continue
}
if shards < targetShards {
if shards < targetShards {
// Favor racks with fewer shards, to ensure a uniform distribution.
// Favor racks with fewer shards, to ensure a uniform distribution.
targets = nil
targets = nil
@ -659,7 +656,7 @@ func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int
fmt . Printf ( "%s has %d overlimit, moving ec shard %d.%d\n" , ecNode . info . Id , overLimitCount , vid , shardId )
fmt . Printf ( "%s has %d overlimit, moving ec shard %d.%d\n" , ecNode . info . Id , overLimitCount , vid , shardId )
err := ecb . pickOneEcNodeAndMoveOneShard ( averageShardsPerEcNode , ecNode , collection , vid , shardId , possibleDestinationEcNodes )
err := ecb . pickOneEcNodeAndMoveOneShard ( ecNode , collection , vid , shardId , possibleDestinationEcNodes )
if err != nil {
if err != nil {
return err
return err
}
}
@ -752,7 +749,7 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
return nil
return nil
}
}
func ( ecb * ecBalancer ) pickEcNodeToBalanceShardsInto ( vid needle . VolumeId , existingLocation * EcNode , possibleDestinations [ ] * EcNode , averageShardsPerEcNode int ) ( * EcNode , error ) {
func ( ecb * ecBalancer ) pickEcNodeToBalanceShardsInto ( vid needle . VolumeId , existingLocation * EcNode , possibleDestinations [ ] * EcNode ) ( * EcNode , error ) {
if existingLocation == nil {
if existingLocation == nil {
return nil , fmt . Errorf ( "INTERNAL: missing source nodes" )
return nil , fmt . Errorf ( "INTERNAL: missing source nodes" )
}
}
@ -788,11 +785,6 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi
details += fmt . Sprintf ( " Skipped %s because shards %d >= replica placement limit for the rack (%d)\n" , node . info . Id , shards , ecb . replicaPlacement . SameRackCount )
details += fmt . Sprintf ( " Skipped %s because shards %d >= replica placement limit for the rack (%d)\n" , node . info . Id , shards , ecb . replicaPlacement . SameRackCount )
continue
continue
}
}
if shards >= averageShardsPerEcNode {
details += fmt . Sprintf ( " Skipped %s because shards %d >= averageShards (%d)\n" ,
node . info . Id , shards , averageShardsPerEcNode )
continue
}
if shards < targetShards {
if shards < targetShards {
// Favor nodes with fewer shards, to ensure a uniform distribution.
// Favor nodes with fewer shards, to ensure a uniform distribution.
@ -810,9 +802,8 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi
return targets [ rand . IntN ( len ( targets ) ) ] , nil
return targets [ rand . IntN ( len ( targets ) ) ] , nil
}
}
// TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
func ( ecb * ecBalancer ) pickOneEcNodeAndMoveOneShard ( averageShardsPerEcNode int , existingLocation * EcNode , collection string , vid needle . VolumeId , shardId erasure_coding . ShardId , possibleDestinationEcNodes [ ] * EcNode ) error {
destNode , err := ecb . pickEcNodeToBalanceShardsInto ( vid , existingLocation , possibleDestinationEcNodes , averageShardsPerEcNode )
func ( ecb * ecBalancer ) pickOneEcNodeAndMoveOneShard ( existingLocation * EcNode , collection string , vid needle . VolumeId , shardId erasure_coding . ShardId , possibleDestinationEcNodes [ ] * EcNode ) error {
destNode , err := ecb . pickEcNodeToBalanceShardsInto ( vid , existingLocation , possibleDestinationEcNodes )
if err != nil {
if err != nil {
fmt . Printf ( "WARNING: Could not find suitable taget node for %d.%d:\n%s" , vid , shardId , err . Error ( ) )
fmt . Printf ( "WARNING: Could not find suitable taget node for %d.%d:\n%s" , vid , shardId , err . Error ( ) )
return nil
return nil
@ -879,26 +870,6 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo
return vidLocations
return vidLocations
}
}
// volumeIdToReplicaPlacement resolves the replica placement for vid by
// scanning the disks of every EC node held by the balancer.
//
// When a regular volume with a matching ID is found, the placement is decoded
// from that volume's ReplicaPlacement byte. When an EC shard with a matching
// ID is found, the balancer's own replicaPlacement is returned. If no disk
// holds the volume, an error is returned.
//
// TODO: Unused, delete me.
func (ecb *ecBalancer) volumeIdToReplicaPlacement(vid needle.VolumeId) (*super_block.ReplicaPlacement, error) {
	for _, node := range ecb.ecNodes {
		for _, disk := range node.info.DiskInfos {
			// Within one disk, regular volumes are inspected before EC shards.
			for _, vol := range disk.VolumeInfos {
				if needle.VolumeId(vol.Id) != vid {
					continue
				}
				placement, err := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement))
				return placement, err
			}
			for _, shard := range disk.EcShardInfos {
				if needle.VolumeId(shard.Id) != vid {
					continue
				}
				return ecb.replicaPlacement, nil
			}
		}
	}
	return nil, fmt.Errorf("failed to resolve replica placement for volume ID %d", vid)
}
func EcBalance ( commandEnv * CommandEnv , collections [ ] string , dc string , ecReplicaPlacement * super_block . ReplicaPlacement , applyBalancing bool ) ( err error ) {
func EcBalance ( commandEnv * CommandEnv , collections [ ] string , dc string , ecReplicaPlacement * super_block . ReplicaPlacement , applyBalancing bool ) ( err error ) {
if len ( collections ) == 0 {
if len ( collections ) == 0 {
return fmt . Errorf ( "no collections to balance" )
return fmt . Errorf ( "no collections to balance" )