@ -3,12 +3,13 @@ package shell
import (
import (
"flag"
"flag"
"fmt"
"fmt"
"io"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"golang.org/x/exp/slices"
"golang.org/x/exp/slices"
"io"
)
)
func init ( ) {
func init ( ) {
@ -184,7 +185,7 @@ func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*E
func deleteDuplicatedEcShards ( commandEnv * CommandEnv , allEcNodes [ ] * EcNode , collection string , applyBalancing bool ) error {
func deleteDuplicatedEcShards ( commandEnv * CommandEnv , allEcNodes [ ] * EcNode , collection string , applyBalancing bool ) error {
// vid => []ecNode
// vid => []ecNode
vidLocations := collectVolumeIdToEcNodes ( allEcNodes )
vidLocations := collectVolumeIdToEcNodes ( allEcNodes , collection )
// deduplicate ec shards
// deduplicate ec shards
for vid , locations := range vidLocations {
for vid , locations := range vidLocations {
if err := doDeduplicateEcShards ( commandEnv , collection , vid , locations , applyBalancing ) ; err != nil {
if err := doDeduplicateEcShards ( commandEnv , collection , vid , locations , applyBalancing ) ; err != nil {
@ -230,7 +231,7 @@ func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle
func balanceEcShardsAcrossRacks ( commandEnv * CommandEnv , allEcNodes [ ] * EcNode , racks map [ RackId ] * EcRack , collection string , applyBalancing bool ) error {
func balanceEcShardsAcrossRacks ( commandEnv * CommandEnv , allEcNodes [ ] * EcNode , racks map [ RackId ] * EcRack , collection string , applyBalancing bool ) error {
// collect vid => []ecNode, since previous steps can change the locations
// collect vid => []ecNode, since previous steps can change the locations
vidLocations := collectVolumeIdToEcNodes ( allEcNodes )
vidLocations := collectVolumeIdToEcNodes ( allEcNodes , collection )
// spread the ec shards evenly
// spread the ec shards evenly
for vid , locations := range vidLocations {
for vid , locations := range vidLocations {
if err := doBalanceEcShardsAcrossRacks ( commandEnv , collection , vid , locations , racks , applyBalancing ) ; err != nil {
if err := doBalanceEcShardsAcrossRacks ( commandEnv , collection , vid , locations , racks , applyBalancing ) ; err != nil {
@ -309,7 +310,7 @@ func pickOneRack(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]i
func balanceEcShardsWithinRacks ( commandEnv * CommandEnv , allEcNodes [ ] * EcNode , racks map [ RackId ] * EcRack , collection string , applyBalancing bool ) error {
func balanceEcShardsWithinRacks ( commandEnv * CommandEnv , allEcNodes [ ] * EcNode , racks map [ RackId ] * EcRack , collection string , applyBalancing bool ) error {
// collect vid => []ecNode, since previous steps can change the locations
// collect vid => []ecNode, since previous steps can change the locations
vidLocations := collectVolumeIdToEcNodes ( allEcNodes )
vidLocations := collectVolumeIdToEcNodes ( allEcNodes , collection )
// spread the ec shards evenly
// spread the ec shards evenly
for vid , locations := range vidLocations {
for vid , locations := range vidLocations {
@ -520,7 +521,7 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[
return picked
return picked
}
}
func collectVolumeIdToEcNodes ( allEcNodes [ ] * EcNode ) map [ needle . VolumeId ] [ ] * EcNode {
func collectVolumeIdToEcNodes ( allEcNodes [ ] * EcNode , collection string ) map [ needle . VolumeId ] [ ] * EcNode {
vidLocations := make ( map [ needle . VolumeId ] [ ] * EcNode )
vidLocations := make ( map [ needle . VolumeId ] [ ] * EcNode )
for _ , ecNode := range allEcNodes {
for _ , ecNode := range allEcNodes {
diskInfo , found := ecNode . info . DiskInfos [ string ( types . HardDriveType ) ]
diskInfo , found := ecNode . info . DiskInfos [ string ( types . HardDriveType ) ]
@ -528,7 +529,10 @@ func collectVolumeIdToEcNodes(allEcNodes []*EcNode) map[needle.VolumeId][]*EcNod
continue
continue
}
}
for _ , shardInfo := range diskInfo . EcShardInfos {
for _ , shardInfo := range diskInfo . EcShardInfos {
vidLocations [ needle . VolumeId ( shardInfo . Id ) ] = append ( vidLocations [ needle . VolumeId ( shardInfo . Id ) ] , ecNode )
// ignore if not in current collection
if shardInfo . Collection == collection {
vidLocations [ needle . VolumeId ( shardInfo . Id ) ] = append ( vidLocations [ needle . VolumeId ( shardInfo . Id ) ] , ecNode )
}
}
}
}
}
return vidLocations
return vidLocations