@ -64,7 +64,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
volFixReplicationCommand := flag . NewFlagSet ( c . Name ( ) , flag . ContinueOnError )
volFixReplicationCommand := flag . NewFlagSet ( c . Name ( ) , flag . ContinueOnError )
c . collectionPattern = volFixReplicationCommand . String ( "collectionPattern" , "" , "match with wildcard characters '*' and '?'" )
c . collectionPattern = volFixReplicationCommand . String ( "collectionPattern" , "" , "match with wildcard characters '*' and '?'" )
skipChange := volFixReplicationCommand . Bool ( "n " , false , "skip the changes " )
applyChanges := volFixReplicationCommand . Bool ( "force " , false , "apply the fix " )
doDelete := volFixReplicationCommand . Bool ( "doDelete" , true , "Also delete over-replicated volumes besides fixing under-replication" )
doDelete := volFixReplicationCommand . Bool ( "doDelete" , true , "Also delete over-replicated volumes besides fixing under-replication" )
doCheck := volFixReplicationCommand . Bool ( "doCheck" , true , "Also check synchronization before deleting" )
doCheck := volFixReplicationCommand . Bool ( "doCheck" , true , "Also check synchronization before deleting" )
retryCount := volFixReplicationCommand . Int ( "retry" , 5 , "how many times to retry" )
retryCount := volFixReplicationCommand . Int ( "retry" , 5 , "how many times to retry" )
@ -73,11 +73,11 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
if err = volFixReplicationCommand . Parse ( args ) ; err != nil {
if err = volFixReplicationCommand . Parse ( args ) ; err != nil {
return nil
return nil
}
}
infoAboutSimulationMode ( writer , * applyChanges , "-force" )
commandEnv . noLock = * skipChange
takeAction := ! * skipChange
commandEnv . noLock = ! * applyChanges
if err = commandEnv . confirmIsLocked ( args ) ; takeAction && err != nil {
if err = commandEnv . confirmIsLocked ( args ) ; * applyChanges && err != nil {
return
return
}
}
@ -121,13 +121,13 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
}
}
if len ( overReplicatedVolumeIds ) > 0 && * doDelete {
if len ( overReplicatedVolumeIds ) > 0 && * doDelete {
if err := c . deleteOneVolume ( commandEnv , writer , takeAction , * doCheck , overReplicatedVolumeIds , volumeReplicas , allLocations , pickOneReplicaToDelete ) ; err != nil {
if err := c . deleteOneVolume ( commandEnv , writer , * applyChanges , * doCheck , overReplicatedVolumeIds , volumeReplicas , allLocations , pickOneReplicaToDelete ) ; err != nil {
return err
return err
}
}
}
}
if len ( misplacedVolumeIds ) > 0 && * doDelete {
if len ( misplacedVolumeIds ) > 0 && * doDelete {
if err := c . deleteOneVolume ( commandEnv , writer , takeAction , * doCheck , misplacedVolumeIds , volumeReplicas , allLocations , pickOneMisplacedVolume ) ; err != nil {
if err := c . deleteOneVolume ( commandEnv , writer , * applyChanges , * doCheck , misplacedVolumeIds , volumeReplicas , allLocations , pickOneMisplacedVolume ) ; err != nil {
return err
return err
}
}
}
}
@ -135,13 +135,13 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
underReplicatedVolumeIdsCount = len ( underReplicatedVolumeIds )
underReplicatedVolumeIdsCount = len ( underReplicatedVolumeIds )
if underReplicatedVolumeIdsCount > 0 {
if underReplicatedVolumeIdsCount > 0 {
// find the most underpopulated data nodes
// find the most underpopulated data nodes
fixedVolumeReplicas , err = c . fixUnderReplicatedVolumes ( commandEnv , writer , takeAction , underReplicatedVolumeIds , volumeReplicas , allLocations , * retryCount , * volumesPerStep )
fixedVolumeReplicas , err = c . fixUnderReplicatedVolumes ( commandEnv , writer , * applyChanges , underReplicatedVolumeIds , volumeReplicas , allLocations , * retryCount , * volumesPerStep )
if err != nil {
if err != nil {
return err
return err
}
}
}
}
if * skipChange {
if ! * applyChanges {
break
break
}
}
@ -219,7 +219,7 @@ func checkOneVolume(a *VolumeReplica, b *VolumeReplica, writer io.Writer, grpcDi
return
return
}
}
func ( c * commandVolumeFixReplication ) deleteOneVolume ( commandEnv * CommandEnv , writer io . Writer , takeAction bool , doCheck bool , overReplicatedVolumeIds [ ] uint32 , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , allLocations [ ] location , selectOneVolumeFn SelectOneVolumeFunc ) error {
func ( c * commandVolumeFixReplication ) deleteOneVolume ( commandEnv * CommandEnv , writer io . Writer , applyChanges bool , doCheck bool , overReplicatedVolumeIds [ ] uint32 , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , allLocations [ ] location , selectOneVolumeFn SelectOneVolumeFunc ) error {
for _ , vid := range overReplicatedVolumeIds {
for _ , vid := range overReplicatedVolumeIds {
replicas := volumeReplicas [ vid ]
replicas := volumeReplicas [ vid ]
replicaPlacement , _ := super_block . NewReplicaPlacementFromByte ( byte ( replicas [ 0 ] . info . ReplicaPlacement ) )
replicaPlacement , _ := super_block . NewReplicaPlacementFromByte ( byte ( replicas [ 0 ] . info . ReplicaPlacement ) )
@ -250,7 +250,7 @@ func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, wr
fmt . Fprintf ( writer , "deleting volume %d from %s ...\n" , replica . info . Id , replica . location . dataNode . Id )
fmt . Fprintf ( writer , "deleting volume %d from %s ...\n" , replica . info . Id , replica . location . dataNode . Id )
if ! takeAction {
if ! applyChanges {
break
break
}
}
@ -279,15 +279,15 @@ func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, wr
return nil
return nil
}
}
func ( c * commandVolumeFixReplication ) fixUnderReplicatedVolumes ( commandEnv * CommandEnv , writer io . Writer , takeAction bool , underReplicatedVolumeIds [ ] uint32 , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , allLocations [ ] location , retryCount int , volumesPerStep int ) ( fixedVolumes map [ string ] int , err error ) {
func ( c * commandVolumeFixReplication ) fixUnderReplicatedVolumes ( commandEnv * CommandEnv , writer io . Writer , applyChanges bool , underReplicatedVolumeIds [ ] uint32 , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , allLocations [ ] location , retryCount int , volumesPerStep int ) ( fixedVolumes map [ string ] int , err error ) {
fixedVolumes = map [ string ] int { }
fixedVolumes = map [ string ] int { }
if len ( underReplicatedVolumeIds ) > volumesPerStep && volumesPerStep > 0 {
if len ( underReplicatedVolumeIds ) > volumesPerStep && volumesPerStep > 0 {
underReplicatedVolumeIds = underReplicatedVolumeIds [ 0 : volumesPerStep ]
underReplicatedVolumeIds = underReplicatedVolumeIds [ 0 : volumesPerStep ]
}
}
for _ , vid := range underReplicatedVolumeIds {
for _ , vid := range underReplicatedVolumeIds {
for i := 0 ; i < retryCount + 1 ; i ++ {
for i := 0 ; i < retryCount + 1 ; i ++ {
if err = c . fixOneUnderReplicatedVolume ( commandEnv , writer , takeAction , volumeReplicas , vid , allLocations ) ; err == nil {
if takeAction {
if err = c . fixOneUnderReplicatedVolume ( commandEnv , writer , applyChanges , volumeReplicas , vid , allLocations ) ; err == nil {
if applyChanges {
fixedVolumes [ strconv . FormatUint ( uint64 ( vid ) , 10 ) ] = len ( volumeReplicas [ vid ] )
fixedVolumes [ strconv . FormatUint ( uint64 ( vid ) , 10 ) ] = len ( volumeReplicas [ vid ] )
}
}
break
break
@ -299,7 +299,7 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
return fixedVolumes , nil
return fixedVolumes , nil
}
}
func ( c * commandVolumeFixReplication ) fixOneUnderReplicatedVolume ( commandEnv * CommandEnv , writer io . Writer , takeAction bool , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , vid uint32 , allLocations [ ] location ) error {
func ( c * commandVolumeFixReplication ) fixOneUnderReplicatedVolume ( commandEnv * CommandEnv , writer io . Writer , applyChanges bool , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , vid uint32 , allLocations [ ] location ) error {
replicas := volumeReplicas [ vid ]
replicas := volumeReplicas [ vid ]
replica := pickOneReplicaToCopyFrom ( replicas )
replica := pickOneReplicaToCopyFrom ( replicas )
replicaPlacement , _ := super_block . NewReplicaPlacementFromByte ( byte ( replica . info . ReplicaPlacement ) )
replicaPlacement , _ := super_block . NewReplicaPlacementFromByte ( byte ( replica . info . ReplicaPlacement ) )
@ -326,7 +326,7 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
foundNewLocation = true
foundNewLocation = true
fmt . Fprintf ( writer , "replicating volume %d %s from %s to dataNode %s ...\n" , replica . info . Id , replicaPlacement , replica . location . dataNode . Id , dst . dataNode . Id )
fmt . Fprintf ( writer , "replicating volume %d %s from %s to dataNode %s ...\n" , replica . info . Id , replicaPlacement , replica . location . dataNode . Id , dst . dataNode . Id )
if ! takeAction {
if ! applyChanges {
// adjust volume count
// adjust volume count
addVolumeCount ( dst . dataNode . DiskInfos [ replica . info . DiskType ] , 1 )
addVolumeCount ( dst . dataNode . DiskInfos [ replica . info . DiskType ] , 1 )
break
break