|
|
@ -29,6 +29,7 @@ func init() { |
|
|
|
|
|
|
|
type commandVolumeFixReplication struct { |
|
|
|
collectionPattern *string |
|
|
|
// TODO: move parameter flags here so we don't shuffle them around via function calls.
|
|
|
|
} |
|
|
|
|
|
|
|
func (c *commandVolumeFixReplication) Name() string { |
|
|
@ -67,6 +68,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, |
|
|
|
applyChanges := volFixReplicationCommand.Bool("force", false, "apply the fix") |
|
|
|
doDelete := volFixReplicationCommand.Bool("doDelete", true, "Also delete over-replicated volumes besides fixing under-replication") |
|
|
|
doCheck := volFixReplicationCommand.Bool("doCheck", true, "Also check synchronization before deleting") |
|
|
|
maxParallelization := volFixReplicationCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible") |
|
|
|
retryCount := volFixReplicationCommand.Int("retry", 5, "how many times to retry") |
|
|
|
volumesPerStep := volFixReplicationCommand.Int("volumesPerStep", 0, "how many volumes to fix in one cycle") |
|
|
|
|
|
|
@ -81,6 +83,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
ewg := NewErrorWaitGroup(*maxParallelization) |
|
|
|
underReplicatedVolumeIdsCount := 1 |
|
|
|
for underReplicatedVolumeIdsCount > 0 { |
|
|
|
fixedVolumeReplicas := map[string]int{} |
|
|
@ -107,6 +110,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, |
|
|
|
switch { |
|
|
|
case replicaPlacement.GetCopyCount() > len(replicas) || !satisfyReplicaCurrentLocation(replicaPlacement, replicas): |
|
|
|
underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid) |
|
|
|
fmt.Fprintf(writer, "volume %d replication %s, but under replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas)) |
|
|
|
case isMisplaced(replicas, replicaPlacement): |
|
|
|
misplacedVolumeIds = append(misplacedVolumeIds, vid) |
|
|
|
fmt.Fprintf(writer, "volume %d replication %s is not well placed %s\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id) |
|
|
@ -115,30 +119,28 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, |
|
|
|
fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas)) |
|
|
|
} |
|
|
|
} |
|
|
|
underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds) |
|
|
|
|
|
|
|
if !commandEnv.isLocked() { |
|
|
|
return fmt.Errorf("lock is lost") |
|
|
|
} |
|
|
|
|
|
|
|
if len(overReplicatedVolumeIds) > 0 && *doDelete { |
|
|
|
if err := c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, overReplicatedVolumeIds, volumeReplicas, allLocations, pickOneReplicaToDelete); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if len(misplacedVolumeIds) > 0 && *doDelete { |
|
|
|
if err := c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, misplacedVolumeIds, volumeReplicas, allLocations, pickOneMisplacedVolume); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds) |
|
|
|
if underReplicatedVolumeIdsCount > 0 { |
|
|
|
ewg.Reset() |
|
|
|
ewg.Add(func() error { |
|
|
|
// find the most underpopulated data nodes
|
|
|
|
fixedVolumeReplicas, err = c.fixUnderReplicatedVolumes(commandEnv, writer, *applyChanges, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep) |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
return err |
|
|
|
}) |
|
|
|
if *doDelete { |
|
|
|
ewg.Add(func() error { |
|
|
|
return c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, overReplicatedVolumeIds, volumeReplicas, allLocations, pickOneReplicaToDelete) |
|
|
|
}) |
|
|
|
ewg.Add(func() error { |
|
|
|
return c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, misplacedVolumeIds, volumeReplicas, allLocations, pickOneMisplacedVolume) |
|
|
|
}) |
|
|
|
} |
|
|
|
if err := ewg.Wait(); err != nil { |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
if !*applyChanges { |
|
|
@ -219,8 +221,13 @@ func checkOneVolume(a *VolumeReplica, b *VolumeReplica, writer io.Writer, grpcDi |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, doCheck bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, selectOneVolumeFn SelectOneVolumeFunc) error { |
|
|
|
for _, vid := range overReplicatedVolumeIds { |
|
|
|
func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, doCheck bool, volumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, selectOneVolumeFn SelectOneVolumeFunc) error { |
|
|
|
if len(volumeIds) == 0 { |
|
|
|
// nothing to do
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
for _, vid := range volumeIds { |
|
|
|
replicas := volumeReplicas[vid] |
|
|
|
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement)) |
|
|
|
|
|
|
@ -279,12 +286,17 @@ func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, wr |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (fixedVolumes map[string]int, err error) { |
|
|
|
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, volumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (fixedVolumes map[string]int, err error) { |
|
|
|
fixedVolumes = map[string]int{} |
|
|
|
if len(underReplicatedVolumeIds) > volumesPerStep && volumesPerStep > 0 { |
|
|
|
underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumesPerStep] |
|
|
|
|
|
|
|
if len(volumeIds) == 0 { |
|
|
|
return fixedVolumes, nil |
|
|
|
} |
|
|
|
|
|
|
|
if len(volumeIds) > volumesPerStep && volumesPerStep > 0 { |
|
|
|
volumeIds = volumeIds[0:volumesPerStep] |
|
|
|
} |
|
|
|
for _, vid := range underReplicatedVolumeIds { |
|
|
|
for _, vid := range volumeIds { |
|
|
|
for i := 0; i < retryCount+1; i++ { |
|
|
|
if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, applyChanges, volumeReplicas, vid, allLocations); err == nil { |
|
|
|
if applyChanges { |
|
|
|