@ -138,7 +138,6 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
if underReplicatedVolumeIdsCount > 0 {
// find the most underpopulated data nodes
fixedVolumeReplicas , err = c . fixUnderReplicatedVolumes ( commandEnv , writer , * applyChanges , underReplicatedVolumeIds , volumeReplicas , allLocations , * retryCount , * volumesPerStep , * maxParallelization )
if err != nil {
return err
}
@ -282,7 +281,6 @@ func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, wr
return nil
}
func ( c * commandVolumeFixReplication ) fixUnderReplicatedVolumes ( commandEnv * CommandEnv , writer io . Writer , applyChanges bool , underReplicatedVolumeIds [ ] uint32 , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , allLocations [ ] location , retryCount int , volumesPerStep , maxParallelization int ) ( fixedVolumes map [ string ] int , err error ) {
fixedVolumes = map [ string ] int { }
if len ( underReplicatedVolumeIds ) > volumesPerStep && volumesPerStep > 0 {
@ -291,6 +289,7 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
var (
wg sync . WaitGroup
mu = & sync . Mutex { }
semaphore = make ( chan struct { } , maxParallelization )
)
@ -303,7 +302,7 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
defer func ( ) { <- semaphore } ( ) // Release semaphore
for attempt := 0 ; attempt <= retryCount ; attempt ++ {
if err := c . fixOneUnderReplicatedVolume ( commandEnv , writer , applyChanges , volumeReplicas , vid , allLocations ) ; err == nil {
if err := c . fixOneUnderReplicatedVolume ( commandEnv , writer , applyChanges , volumeReplicas , vid , allLocations , mu ) ; err == nil {
if applyChanges {
fixedVolumes [ strconv . FormatUint ( uint64 ( vid ) , 10 ) ] = len ( volumeReplicas [ vid ] )
}
@ -318,17 +317,10 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
return fixedVolumes , nil
}
func ( c * commandVolumeFixReplication ) fixOneUnderReplicatedVolume ( commandEnv * CommandEnv , writer io . Writer , applyChanges bool , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , vid uint32 , allLocations [ ] location ) error {
func ( c * commandVolumeFixReplication ) fixOneUnderReplicatedVolume ( commandEnv * CommandEnv , writer io . Writer , applyChanges bool , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , vid uint32 , allLocations [ ] location , mu * sync . Mutex ) error {
replicas := volumeReplicas [ vid ]
replica := pickOneReplicaToCopyFrom ( replicas )
replicaPlacement , _ := super_block . NewReplicaPlacementFromByte ( byte ( replica . info . ReplicaPlacement ) )
foundNewLocation := false
hasSkippedCollection := false
keepDataNodesSorted ( allLocations , types . ToDiskType ( replica . info . DiskType ) )
fn := capacityByFreeVolumeCount ( types . ToDiskType ( replica . info . DiskType ) )
for _ , dst := range allLocations {
// check whether data nodes satisfy the constraints
if fn ( dst . dataNode ) > 0 && satisfyReplicaPlacement ( replicaPlacement , replicas , dst ) {
// check collection name pattern
if * c . collectionPattern != "" {
matched , err := filepath . Match ( * c . collectionPattern , replica . info . Collection )
@ -336,19 +328,20 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
return fmt . Errorf ( "match pattern %s with collection %s: %v" , * c . collectionPattern , replica . info . Collection , err )
}
if ! matched {
hasSkippedCollection = true
break
return nil
}
}
mu . Lock ( )
dst := findTargetNode ( allLocations , replica , replicaPlacement , replicas )
mu . Unlock ( )
if dst == nil {
fmt . Fprintf ( writer , "failed to place volume %d replica as %s, existing:%+v\n" , replica . info . Id , replicaPlacement , len ( replicas ) )
return nil
}
// ask the volume server to replicate the volume
foundNewLocation = true
fmt . Fprintf ( writer , "replicating volume %d %s from %s to dataNode %s ...\n" , replica . info . Id , replicaPlacement , replica . location . dataNode . Id , dst . dataNode . Id )
if ! applyChanges {
// adjust volume count
addVolumeCount ( dst . dataNode . DiskInfos [ replica . info . DiskType ] , 1 )
break
return nil
}
err := operation . WithVolumeServerClient ( false , pb . NewServerAddressFromDataNode ( dst . dataNode ) , commandEnv . option . GrpcDialOption , func ( volumeServerClient volume_server_pb . VolumeServerClient ) error {
@ -377,18 +370,24 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
} )
if err != nil {
// rollback
mu . Lock ( )
addVolumeCount ( dst . dataNode . DiskInfos [ replica . info . DiskType ] , - 1 )
mu . Lock ( )
}
return err
}
// adjust volume count
func findTargetNode ( allLocations [ ] location , replica * VolumeReplica , replicaPlacement * super_block . ReplicaPlacement , replicas [ ] * VolumeReplica ) * location {
keepDataNodesSorted ( allLocations , types . ToDiskType ( replica . info . DiskType ) )
fn := capacityByFreeVolumeCount ( types . ToDiskType ( replica . info . DiskType ) )
for _ , dst := range allLocations {
if fn ( dst . dataNode ) > 0 && satisfyReplicaPlacement ( replicaPlacement , replicas , dst ) {
addVolumeCount ( dst . dataNode . DiskInfos [ replica . info . DiskType ] , 1 )
break
return & dst
}
}
if ! foundNewLocation && ! hasSkippedCollection {
fmt . Fprintf ( writer , "failed to place volume %d replica as %s, existing:%+v\n" , replica . info . Id , replicaPlacement , len ( replicas ) )
}
return nil
}