Browse Source

refactor

pull/2353/head
Chris Lu 3 years ago
parent
commit
e862b2529a
  1. 6
      weed/shell/command_ec_decode.go
  2. 10
      weed/shell/command_volume_fix_replication.go

6
weed/shell/command_ec_decode.go

@ -208,16 +208,16 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddr
} }
func LookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (err error, volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation) {
func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation, err error) {
var resp *master_pb.LookupVolumeResponse var resp *master_pb.LookupVolumeResponse
err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
resp, err = client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{VolumeOrFileIds: volumeIds}) resp, err = client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{VolumeOrFileIds: volumeIds})
return err return err
}) })
if err != nil { if err != nil {
return err, nil
return nil, err
} }
return nil, resp.VolumeIdLocations
return resp.VolumeIdLocations, nil
} }
func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) { func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {

10
weed/shell/command_volume_fix_replication.go

@ -110,7 +110,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds) underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds)
if underReplicatedVolumeIdsCount > 0 { if underReplicatedVolumeIdsCount > 0 {
// find the most under populated data nodes // find the most under populated data nodes
err, fixedVolumeReplicas = c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep)
fixedVolumeReplicas, err = c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep)
if err != nil { if err != nil {
return err return err
} }
@ -126,7 +126,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
for k, _ := range fixedVolumeReplicas { for k, _ := range fixedVolumeReplicas {
fixedVolumes = append(fixedVolumes, k) fixedVolumes = append(fixedVolumes, k)
} }
err, volumeIdLocations := LookupVolumeIds(commandEnv, fixedVolumes)
volumeIdLocations, err := lookupVolumeIds(commandEnv, fixedVolumes)
if err != nil { if err != nil {
return err return err
} }
@ -137,7 +137,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
for fixedVolumeReplicas[volumeId] >= volumeIdLocationCount { for fixedVolumeReplicas[volumeId] >= volumeIdLocationCount {
fmt.Fprintf(writer, "the number of locations for volume %s has not increased yet, let's wait\n", volumeId) fmt.Fprintf(writer, "the number of locations for volume %s has not increased yet, let's wait\n", volumeId)
time.Sleep(time.Duration(i+1) * time.Second * 7) time.Sleep(time.Duration(i+1) * time.Second * 7)
err, volumeLocIds := LookupVolumeIds(commandEnv, []string{volumeId})
volumeLocIds, err := lookupVolumeIds(commandEnv, []string{volumeId})
if err != nil { if err != nil {
return err return err
} }
@ -203,7 +203,7 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma
return nil return nil
} }
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (err error, fixedVolumes map[string]int) {
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (fixedVolumes map[string]int, err error) {
fixedVolumes = map[string]int{} fixedVolumes = map[string]int{}
if len(underReplicatedVolumeIds) > volumesPerStep && volumesPerStep > 0 { if len(underReplicatedVolumeIds) > volumesPerStep && volumesPerStep > 0 {
underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumesPerStep] underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumesPerStep]
@ -218,7 +218,7 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
} }
} }
} }
return nil, fixedVolumes
return fixedVolumes, nil
} }
func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error { func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error {

Loading…
Cancel
Save