@@ -26,12 +26,25 @@ type DataCenterId string
 type EcNodeId string
 type RackId string
 
+// EcDisk represents a single disk on a volume server
+type EcDisk struct {
+	diskId       uint32
+	diskType     string
+	freeEcSlots  int
+	ecShardCount int // total EC shards on this disk
+	// Map of volumeId -> shardBits for the shards on this disk
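+	// (erasure_coding.ShardBits is a bitmask: bit i set means shard i is present)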
+	ecShards map[needle.VolumeId]erasure_coding.ShardBits
+}
+
 type EcNode struct {
 	info       *master_pb.DataNodeInfo
 	dc         DataCenterId
 	rack       RackId
 	freeEcSlot int
+	// disks maps diskId -> EcDisk for disk-level balancing
+	disks map[uint32]*EcDisk
 }
 
 type CandidateEcNode struct {
 	ecNode     *EcNode
 	shardCount int
@@ -229,7 +242,7 @@ func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.VolumeId
 	return collections
 }
 
-func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
+// destDiskId == 0 lets the destination server choose the target disk.
+func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool) (err error) {
 	if !commandEnv.isLocked() {
 		return fmt.Errorf("lock is lost")
@@ -242,7 +255,7 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
 	existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)
 
 	// ask destination node to copy shard and the ecx file from source node, and mount it
-	copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress)
+	copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress, destDiskId)
 	if err != nil {
 		return err
 	}
@@ -259,7 +272,11 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
 		return err
 	}
 
-	fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
+	if destDiskId > 0 {
+		fmt.Printf("moved ec shard %d.%d %s => %s (disk %d)\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id, destDiskId)
+	} else {
+		fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
+	}
 }
@@ -272,7 +289,7 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
 func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
 	targetServer *EcNode, shardIdsToCopy []uint32,
-	volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) {
+	volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress, destDiskId uint32) (copiedShardIds []uint32, err error) {
 
 	fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
@@ -289,6 +306,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
 			CopyEcjFile:    true,
 			CopyVifFile:    true,
 			SourceDataNode: string(existingLocation),
+			DiskId:         destDiskId,
 		})
 		if copyErr != nil {
 			return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
@@ -410,12 +428,59 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
 		}
 		freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
-		ecNodes = append(ecNodes, &EcNode{
+		ecNode := &EcNode{
 			info:       dn,
 			dc:         dc,
 			rack:       rack,
 			freeEcSlot: int(freeEcSlots),
-		})
+			disks:      make(map[uint32]*EcDisk),
+		}
+
+		// Build disk-level information from the EC shard info
+		for diskType, diskInfo := range dn.DiskInfos {
+			if diskInfo == nil {
+				continue
+			}
+
+			// Group EC shards by disk_id
+			diskShards := make(map[uint32]map[needle.VolumeId]erasure_coding.ShardBits)
+			for _, ecShardInfo := range diskInfo.EcShardInfos {
+				diskId := ecShardInfo.DiskId
+				if diskShards[diskId] == nil {
+					diskShards[diskId] = make(map[needle.VolumeId]erasure_coding.ShardBits)
+				}
+				vid := needle.VolumeId(ecShardInfo.Id)
+				diskShards[diskId][vid] = erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
+			}
+
+			// If there are no EC shards, still create a disk entry based on DiskInfo.DiskId
+			if len(diskShards) == 0 && diskInfo.DiskId > 0 {
+				diskShards[diskInfo.DiskId] = make(map[needle.VolumeId]erasure_coding.ShardBits)
+			}
+
+			// Create an EcDisk for each disk_id found
+			for diskId, shards := range diskShards {
+				totalShardCount := 0
+				for _, shardBits := range shards {
+					totalShardCount += shardBits.ShardIdCount()
+				}
+
+				// Estimate free slots per disk (simplified: split the node's
+				// free slots evenly across its disks)
+				diskCount := len(diskShards)
+				if diskCount == 0 {
+					diskCount = 1
+				}
+				freePerDisk := int(freeEcSlots) / diskCount
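+				// e.g., a node reporting 20 free EC slots across 4 disks is
+				// treated as having 5 free slots on each disk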
+				ecNode.disks[diskId] = &EcDisk{
+					diskId:       diskId,
+					diskType:     diskType,
+					freeEcSlots:  freePerDisk,
+					ecShardCount: totalShardCount,
+					ecShards:     shards,
+				}
+			}
+		}
+
+		ecNodes = append(ecNodes, ecNode)
 		totalFreeEcSlots += freeEcSlots
 	})
 	return
@@ -884,10 +949,16 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
 			for _, shards := range fullDiskInfo.EcShardInfos {
 				if _, found := emptyNodeIds[shards.Id]; !found {
 					for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
+						vid := needle.VolumeId(shards.Id)
+						destDiskId := pickBestDiskOnNode(emptyNode, vid)
-						fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
+						if destDiskId > 0 {
+							fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId)
+						} else {
+							fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
+						}
-						err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, ecb.applyBalancing)
+						err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing)
 						if err != nil {
 							return err
 						}
@@ -957,18 +1028,98 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation
 	if len(targets) == 0 {
 		return nil, errors.New(details)
 	}
+
+	// When multiple nodes tie on shard count, prefer the node with the better
+	// disk-level distribution (i.e., whose disks hold fewer shards of this volume).
+	if len(targets) > 1 {
+		slices.SortFunc(targets, func(a, b *EcNode) int {
+			aScore := diskDistributionScore(a, vid)
+			bScore := diskDistributionScore(b, vid)
+			return aScore - bScore // lower score is better
+		})
+		return targets[0], nil
+	}
 	return targets[rand.IntN(len(targets))], nil
 }
+
+// diskDistributionScore scores how crowded the node's disks already are for this
+// volume; a lower score means more room for a balanced distribution.
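+// For example, if disk A holds 2 shards of vid and 6 shards in total, while disk B
+// holds none of vid and 3 in total, the node scores 2*10 + 6 + 3 = 29.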
+func diskDistributionScore(ecNode *EcNode, vid needle.VolumeId) int {
+	if len(ecNode.disks) == 0 {
+		return 0
+	}
+	// Sum the shards this volume already has on each disk; a lower total
+	// means more room for new shards.
+	score := 0
+	for _, disk := range ecNode.disks {
+		if shardBits, ok := disk.ecShards[vid]; ok {
+			score += shardBits.ShardIdCount() * 10 // weight shards of this volume heavily
+		}
+		score += disk.ecShardCount // also consider the total shards on the disk
+	}
+	return score
+}
+
+// pickBestDiskOnNode selects the best disk on a node for placing a new EC shard,
+// preferring disks with fewer shards and more free slots.
+func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId) uint32 {
+	if len(ecNode.disks) == 0 {
+		return 0 // no disk info available; let the server decide
+	}
+
+	var bestDiskId uint32
+	bestScore := -1
+	for diskId, disk := range ecNode.disks {
+		if disk.freeEcSlots <= 0 {
+			continue
+		}
+		// Check whether this volume already has shards on this disk
+		existingShards := 0
+		if shardBits, ok := disk.ecShards[vid]; ok {
+			existingShards = shardBits.ShardIdCount()
+		}
+		// Prefer disks with fewer total shards and fewer shards of this
+		// volume; lower score is better.
+		score := disk.ecShardCount*10 + existingShards*100
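+		// e.g., an empty disk scores 0, while a disk holding 4 shards in
+		// total, one of them for this volume, scores 4*10 + 1*100 = 140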
+		if bestScore == -1 || score < bestScore {
+			bestScore = score
+			bestDiskId = diskId
+		}
+	}
+	return bestDiskId
+}
+
+// pickEcNodeAndDiskToBalanceShardsInto picks both a destination node and a specific disk on it.
+func (ecb *ecBalancer) pickEcNodeAndDiskToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode) (*EcNode, uint32, error) {
+	node, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinations)
+	if err != nil {
+		return nil, 0, err
+	}
+	diskId := pickBestDiskOnNode(node, vid)
+	return node, diskId, nil
+}
 
 func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode) error {
-	destNode, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes)
+	destNode, destDiskId, err := ecb.pickEcNodeAndDiskToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes)
 	if err != nil {
-		fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, err.Error())
+		fmt.Printf("WARNING: Could not find suitable target node for %d.%d:\n%s", vid, shardId, err.Error())
 		return nil
 	}
-	fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
-	return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, ecb.applyBalancing)
+	if destDiskId > 0 {
+		fmt.Printf("%s moves ec shard %d.%d to %s (disk %d)\n", existingLocation.info.Id, vid, shardId, destNode.info.Id, destDiskId)
+	} else {
+		fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
+	}
+	return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing)
 }
 
 func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {