diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index 18cea0504..3483156cb 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -208,6 +208,18 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddr } +func LookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (err error, volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation) { + var resp *master_pb.LookupVolumeResponse + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{VolumeOrFileIds: volumeIds}) + return err + }) + if err != nil { + return err, nil + } + return nil, resp.VolumeIdLocations +} + func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) { var resp *master_pb.VolumeListResponse diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 21b3ead6b..f03cd550e 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -10,6 +10,8 @@ import ( "io" "path/filepath" "sort" + "strconv" + "time" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -70,6 +72,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, underReplicatedVolumeIdsCount := 1 for underReplicatedVolumeIdsCount > 0 { + fixedVolumeReplicas := map[string]int{} + // collect topology information topologyInfo, _, err := collectTopologyInfo(commandEnv) if err != nil { @@ -106,7 +110,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds) if underReplicatedVolumeIdsCount > 0 { // find the most under populated data nodes - if err := c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep); err != nil { + err, fixedVolumeReplicas = c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep) + if err != nil { return err } } @@ -114,6 +119,36 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, if *skipChange { break } + + // check that the topology has been updated + if len(fixedVolumeReplicas) > 0 { + fixedVolumes := make([]string, 0, len(fixedVolumeReplicas)) + for k, _ := range fixedVolumeReplicas { + fixedVolumes = append(fixedVolumes, k) + } + err, volumeIdLocations := LookupVolumeIds(commandEnv, fixedVolumes) + if err != nil { + return err + } + for _, volumeIdLocation := range volumeIdLocations { + volumeId := volumeIdLocation.VolumeOrFileId + volumeIdLocationCount := len(volumeIdLocation.Locations) + i := 0 + for fixedVolumeReplicas[volumeId] >= volumeIdLocationCount { + fmt.Fprintf(writer, "the number of locations for volume %s has not increased yet, let's wait\n", volumeId) + time.Sleep(time.Duration(i+1) * time.Second * 7) + err, volumeLocIds := LookupVolumeIds(commandEnv, []string{volumeId}) + if err != nil { + return err + } + volumeIdLocationCount = len(volumeLocIds[0].Locations) + if *retryCount > i { + return fmt.Errorf("replicas volume %s mismatch in topology", volumeId) + } + i += 1 + } + } + } } return nil } @@ -168,18 +203,22 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma return nil } -func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (err error) { +func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (err error, fixedVolumes map[string]int) { + fixedVolumes = map[string]int{} if len(underReplicatedVolumeIds) > volumesPerStep && volumesPerStep > 0 { underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumesPerStep] } for _, vid := range underReplicatedVolumeIds { for i := 0; i < retryCount+1; i++ { if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil { + if takeAction { + fixedVolumes[strconv.FormatUint(uint64(vid), 10)] = len(volumeReplicas[vid]) + } break } } } - return + return nil, fixedVolumes } func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error {