|
@ -144,23 +144,18 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *commandEnv, rebuilder * |
|
|
fmt.Fprintf(writer, "%s delete copied ec shards %s %d.%v\n", rebuilder.info.Id, collection, volumeId, copiedShardIds) |
|
|
fmt.Fprintf(writer, "%s delete copied ec shards %s %d.%v\n", rebuilder.info.Id, collection, volumeId, copiedShardIds) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// ask the rebuilder to delete the copied shards
|
|
|
|
|
|
err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
fmt.Fprintf(writer, "%s delete generated ec shards %s %d.%v\n", rebuilder.info.Id, collection, volumeId, generatedShardIds) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
}() |
|
|
}() |
|
|
|
|
|
|
|
|
if !applyChanges { |
|
|
if !applyChanges { |
|
|
return nil |
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// generate ec shards, and maybe ecx file, and mount them
|
|
|
|
|
|
|
|
|
// generate ec shards, and maybe ecx file
|
|
|
generatedShardIds, err = generateMissingShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id) |
|
|
generatedShardIds, err = generateMissingShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return err |
|
|
return err |
|
|
} |
|
|
} |
|
|
|
|
|
rebuilder.freeEcSlot -= len(generatedShardIds) |
|
|
|
|
|
|
|
|
// mount the generated shards
|
|
|
// mount the generated shards
|
|
|
err = mountEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds) |
|
|
err = mountEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds) |
|
@ -205,14 +200,14 @@ func prepareDataToRecover(ctx context.Context, commandEnv *commandEnv, rebuilder |
|
|
continue |
|
|
continue |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if localShardBits.HasShardId(erasure_coding.ShardId(shardId)){ |
|
|
|
|
|
|
|
|
if localShardBits.HasShardId(erasure_coding.ShardId(shardId)) { |
|
|
localShardIds = append(localShardIds, uint32(shardId)) |
|
|
localShardIds = append(localShardIds, uint32(shardId)) |
|
|
fmt.Fprintf(writer, "use existing shard %d.%d\n", volumeId, shardId) |
|
|
fmt.Fprintf(writer, "use existing shard %d.%d\n", volumeId, shardId) |
|
|
continue |
|
|
continue |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
var copyErr error |
|
|
var copyErr error |
|
|
if applyBalancing{ |
|
|
|
|
|
|
|
|
if applyBalancing { |
|
|
copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { |
|
|
copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { |
|
|
_, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ |
|
|
_, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ |
|
|
VolumeId: uint32(volumeId), |
|
|
VolumeId: uint32(volumeId), |
|
|