@ -192,7 +192,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
}
}
// encode all requested volumes...
// encode all requested volumes...
if err = doEcEncode ( commandEnv , writer , volumeIdToCollection , volumeIds , * maxParallelization ) ; err != nil {
if err = doEcEncode ( commandEnv , writer , volumeIdToCollection , volumeIds , * maxParallelization , topologyInfo ) ; err != nil {
return fmt . Errorf ( "ec encode for volumes %v: %w" , volumeIds , err )
return fmt . Errorf ( "ec encode for volumes %v: %w" , volumeIds , err )
}
}
// ...re-balance ec shards...
// ...re-balance ec shards...
@ -222,7 +222,7 @@ func volumeLocations(commandEnv *CommandEnv, volumeIds []needle.VolumeId) (map[n
return res , nil
return res , nil
}
}
func doEcEncode ( commandEnv * CommandEnv , writer io . Writer , volumeIdToCollection map [ needle . VolumeId ] string , volumeIds [ ] needle . VolumeId , maxParallelization int ) error {
func doEcEncode ( commandEnv * CommandEnv , writer io . Writer , volumeIdToCollection map [ needle . VolumeId ] string , volumeIds [ ] needle . VolumeId , maxParallelization int , topologyInfo * master_pb . TopologyInfo ) error {
if ! commandEnv . isLocked ( ) {
if ! commandEnv . isLocked ( ) {
return fmt . Errorf ( "lock is lost" )
return fmt . Errorf ( "lock is lost" )
}
}
@ -231,6 +231,17 @@ func doEcEncode(commandEnv *CommandEnv, writer io.Writer, volumeIdToCollection m
return fmt . Errorf ( "failed to get volume locations for EC encoding: %w" , err )
return fmt . Errorf ( "failed to get volume locations for EC encoding: %w" , err )
}
}
// build a map of (volumeId, serverAddress) -> freeVolumeCount
freeVolumeCountMap := make ( map [ string ] int ) // key: volumeId-serverAddress
eachDataNode ( topologyInfo , func ( dc DataCenterId , rack RackId , dn * master_pb . DataNodeInfo ) {
for _ , diskInfo := range dn . DiskInfos {
for _ , v := range diskInfo . VolumeInfos {
key := fmt . Sprintf ( "%d-%s" , v . Id , dn . Id )
freeVolumeCountMap [ key ] = int ( diskInfo . FreeVolumeCount )
}
}
} )
// mark volumes as readonly
// mark volumes as readonly
ewg := NewErrorWaitGroup ( maxParallelization )
ewg := NewErrorWaitGroup ( maxParallelization )
for _ , vid := range volumeIds {
for _ , vid := range volumeIds {
@ -254,8 +265,22 @@ func doEcEncode(commandEnv *CommandEnv, writer io.Writer, volumeIdToCollection m
for _ , vid := range volumeIds {
for _ , vid := range volumeIds {
locs := locations [ vid ]
locs := locations [ vid ]
collection := volumeIdToCollection [ vid ]
collection := volumeIdToCollection [ vid ]
// Filter locations to only include those on healthy disks (FreeVolumeCount >= 2)
var filteredLocs [ ] wdclient . Location
for _ , l := range locs {
key := fmt . Sprintf ( "%d-%s" , vid , l . Url )
if freeCount , found := freeVolumeCountMap [ key ] ; found && freeCount >= 2 {
filteredLocs = append ( filteredLocs , l )
}
}
if len ( filteredLocs ) == 0 {
return fmt . Errorf ( "no healthy replicas (FreeVolumeCount >= 2) found for volume %d to use as source for EC encoding" , vid )
}
// Sync missing entries between replicas, then select the best one
// Sync missing entries between replicas, then select the best one
bestLoc , selectErr := syncAndSelectBestReplica ( commandEnv . option . GrpcDialOption , vid , collection , locs , "" , writer )
bestLoc , selectErr := syncAndSelectBestReplica ( commandEnv . option . GrpcDialOption , vid , collection , filteredLocs , "" , writer )
if selectErr != nil {
if selectErr != nil {
return fmt . Errorf ( "failed to sync and select replica for volume %d: %v" , vid , selectErr )
return fmt . Errorf ( "failed to sync and select replica for volume %d: %v" , vid , selectErr )
}
}
@ -444,36 +469,23 @@ func selectVolumeIdsFromTopology(topologyInfo *master_pb.TopologyInfo, volumeSiz
}
}
// check free disk space
// check free disk space
if good , found := vidMap [ v . Id ] ; found {
if good {
if diskInfo . FreeVolumeCount < 2 {
glog . V ( 0 ) . Infof ( "skip %s %d on %s, no free disk" , v . Collection , v . Id , dn . Id )
if verbose {
fmt . Printf ( "skip volume %d on %s: insufficient free disk space (free volumes: %d, required: 2)\n" ,
v . Id , dn . Id , diskInfo . FreeVolumeCount )
}
vidMap [ v . Id ] = false
noFreeDisk ++
}
if diskInfo . FreeVolumeCount < 2 {
glog . V ( 0 ) . Infof ( "replica %s %d on %s has no free disk" , v . Collection , v . Id , dn . Id )
if verbose {
fmt . Printf ( "skip replica of volume %d on %s: insufficient free disk space (free volumes: %d, required: 2)\n" ,
v . Id , dn . Id , diskInfo . FreeVolumeCount )
}
}
} else {
if diskInfo . FreeVolumeCount < 2 {
glog . V ( 0 ) . Infof ( "skip %s %d on %s, no free disk" , v . Collection , v . Id , dn . Id )
if verbose {
fmt . Printf ( "skip volume %d on %s: insufficient free disk space (free volumes: %d, required: 2)\n" ,
v . Id , dn . Id , diskInfo . FreeVolumeCount )
}
if _ , found := vidMap [ v . Id ] ; ! found {
vidMap [ v . Id ] = false
vidMap [ v . Id ] = false
noFreeDisk ++
} else {
if verbose {
fmt . Printf ( "selected volume %d on %s: size %.1f MB (%.1f%% full), last modified %d seconds ago, free volumes: %d\n" ,
v . Id , dn . Id , float64 ( v . Size ) / ( 1024 * 1024 ) ,
float64 ( v . Size ) * 100 / ( float64 ( volumeSizeLimitMb ) * 1024 * 1024 ) ,
nowUnixSeconds - v . ModifiedAtSecond , diskInfo . FreeVolumeCount )
}
vidMap [ v . Id ] = true
}
}
} else {
if verbose {
fmt . Printf ( "selected volume %d on %s: size %.1f MB (%.1f%% full), last modified %d seconds ago, free volumes: %d\n" ,
v . Id , dn . Id , float64 ( v . Size ) / ( 1024 * 1024 ) ,
float64 ( v . Size ) * 100 / ( float64 ( volumeSizeLimitMb ) * 1024 * 1024 ) ,
nowUnixSeconds - v . ModifiedAtSecond , diskInfo . FreeVolumeCount )
}
vidMap [ v . Id ] = true
}
}
}
}
}
}
@ -482,6 +494,8 @@ func selectVolumeIdsFromTopology(topologyInfo *master_pb.TopologyInfo, volumeSiz
for vid , good := range vidMap {
for vid , good := range vidMap {
if good {
if good {
vids = append ( vids , needle . VolumeId ( vid ) )
vids = append ( vids , needle . VolumeId ( vid ) )
} else {
noFreeDisk ++
}
}
}
}