diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 76a582b31..9ac082e81 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -56,6 +56,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'") skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes") retryCount := volFixReplicationCommand.Int("retry", 0, "how many times to retry") + volumespPerStep := volFixReplicationCommand.Int("volumes_per_step", 0, "how many volumes to fix in one cycle") + if err = volFixReplicationCommand.Parse(args); err != nil { return nil } @@ -66,44 +68,54 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, takeAction := !*skipChange - // collect topology information - topologyInfo, _, err := collectTopologyInfo(commandEnv) - if err != nil { - return err - } - - // find all volumes that needs replication - // collect all data nodes - volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo) + underReplicatedVolumeIdsCount := 1 + for underReplicatedVolumeIdsCount > 0 { + // collect topology information + topologyInfo, _, err := collectTopologyInfo(commandEnv) + if err != nil { + return err + } - if len(allLocations) == 0 { - return fmt.Errorf("no data nodes at all") - } + // find all volumes that needs replication + // collect all data nodes + volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo) - // find all under replicated volumes - var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32 - for vid, replicas := range volumeReplicas { - replica := replicas[0] - replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) - if replicaPlacement.GetCopyCount() > len(replicas) { - underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid) - } else if replicaPlacement.GetCopyCount() < len(replicas) { - overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid) - fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas)) + if len(allLocations) == 0 { + return fmt.Errorf("no data nodes at all") } - } - if len(overReplicatedVolumeIds) > 0 { - return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations) - } + // find all under replicated volumes + var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32 + for vid, replicas := range volumeReplicas { + replica := replicas[0] + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) + if replicaPlacement.GetCopyCount() > len(replicas) { + underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid) + } else if replicaPlacement.GetCopyCount() < len(replicas) { + overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid) + fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas)) + } + } - if len(underReplicatedVolumeIds) == 0 { - return nil - } + if len(overReplicatedVolumeIds) > 0 { + if err := c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations); err != nil { + return err + } + } - // find the most under populated data nodes - return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount) + underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds) + if underReplicatedVolumeIdsCount > 0 { + // find the most under populated data nodes + if err := c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumespPerStep); err != nil { + return err + } + } + if *skipChange { + break + } + } + return nil } func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) { @@ -156,8 +168,10 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma return nil } -func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int) (err error) { - +func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumespPerStep int) (err error) { + if len(underReplicatedVolumeIds) > volumespPerStep && volumespPerStep > 0 { + underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumespPerStep] + } for _, vid := range underReplicatedVolumeIds { for i := 0; i < retryCount+1; i++ { if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil {