diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 5c3218111..34f6116bb 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -138,7 +138,6 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, if underReplicatedVolumeIdsCount > 0 { // find the most underpopulated data nodes fixedVolumeReplicas, err = c.fixUnderReplicatedVolumes(commandEnv, writer, *applyChanges, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep, *maxParallelization) - if err != nil { return err } @@ -282,7 +281,6 @@ func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, wr return nil } - func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep, maxParallelization int) (fixedVolumes map[string]int, err error) { fixedVolumes = map[string]int{} if len(underReplicatedVolumeIds) > volumesPerStep && volumesPerStep > 0 { @@ -291,6 +289,7 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm var ( wg sync.WaitGroup + mu = &sync.Mutex{} semaphore = make(chan struct{}, maxParallelization) ) @@ -303,7 +302,7 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm defer func() { <-semaphore }() // Release semaphore for attempt := 0; attempt <= retryCount; attempt++ { - if err := c.fixOneUnderReplicatedVolume(commandEnv, writer, applyChanges, volumeReplicas, vid, allLocations); err == nil { + if err := c.fixOneUnderReplicatedVolume(commandEnv, writer, applyChanges, volumeReplicas, vid, allLocations, mu); err == nil { if applyChanges { fixedVolumes[strconv.FormatUint(uint64(vid), 10)] = len(volumeReplicas[vid]) } @@ -318,77 +317,77 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm return fixedVolumes, nil } -func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error { +func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location, mu *sync.Mutex) error { replicas := volumeReplicas[vid] replica := pickOneReplicaToCopyFrom(replicas) replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) - foundNewLocation := false - hasSkippedCollection := false - keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType)) - fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType)) - for _, dst := range allLocations { - // check whether data nodes satisfy the constraints - if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { - // check collection name pattern - if *c.collectionPattern != "" { - matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection) - if err != nil { - return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err) - } - if !matched { - hasSkippedCollection = true + // check collection name pattern + if *c.collectionPattern != "" { + matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection) + if err != nil { + return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err) + } + if !matched { + return nil + } + } + mu.Lock() + dst := findTargetNode(allLocations, replica, replicaPlacement, replicas) + mu.Unlock() + if dst == nil { + fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas)) + return nil + } + + fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id) + if !applyChanges { + return nil + } + + err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ + VolumeId: replica.info.Id, + SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)), + }) + if replicateErr != nil { + return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr) + } + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { break + } else { + return recvErr } } - - // ask the volume server to replicate the volume - foundNewLocation = true - fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id) - - if !applyChanges { - // adjust volume count - addVolumeCount(dst.dataNode.DiskInfos[replica.info.DiskType], 1) - break + if resp.ProcessedBytes > 0 { + fmt.Fprintf(writer, "volume %d processed %d bytes\n", replica.info.Id, resp.ProcessedBytes) } + } - err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ - VolumeId: replica.info.Id, - SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)), - }) - if replicateErr != nil { - return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr) - } - for { - resp, recvErr := stream.Recv() - if recvErr != nil { - if recvErr == io.EOF { - break - } else { - return recvErr - } - } - if resp.ProcessedBytes > 0 { - fmt.Fprintf(writer, "volume %d processed %d bytes\n", replica.info.Id, resp.ProcessedBytes) - } - } + return nil + }) - return nil - }) + if err != nil { + // rollback + mu.Lock() + addVolumeCount(dst.dataNode.DiskInfos[replica.info.DiskType], -1) + mu.Lock() + } - if err != nil { - return err - } + return err +} - // adjust volume count +func findTargetNode(allLocations []location, replica *VolumeReplica, replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica) *location { + keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType)) + fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType)) + for _, dst := range allLocations { + if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { addVolumeCount(dst.dataNode.DiskInfos[replica.info.DiskType], 1) - break + return &dst } } - - if !foundNewLocation && !hasSkippedCollection { - fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas)) - } return nil }