|
|
@ -203,6 +203,14 @@ func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServer |
|
|
|
server.addEcVolumeShards(volumeId, collection, copiedShardIds) |
|
|
|
} |
|
|
|
} |
|
|
|
cleanupFunc := func(server *EcNode, allocatedEcShardIds []uint32) { |
|
|
|
if err := unmountEcShards(grpcDialOption, volumeId, pb.NewServerAddressFromDataNode(server.info), allocatedEcShardIds); err != nil { |
|
|
|
fmt.Printf("unmount aborted shards %d.%v on %s: %v\n", volumeId, allocatedEcShardIds, server.info.Id, err) |
|
|
|
} |
|
|
|
if err := sourceServerDeleteEcShards(grpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(server.info), allocatedEcShardIds); err != nil { |
|
|
|
fmt.Printf("remove aborted shards %d.%v on %s: %v\n", volumeId, allocatedEcShardIds, server.info.Id, err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// maybe parallelize
|
|
|
|
for i, server := range targetServers { |
|
|
@ -221,6 +229,12 @@ func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServer |
|
|
|
close(shardIdChan) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
for i, server := range targetServers { |
|
|
|
if len(allocatedEcIds[i]) <= 0 { |
|
|
|
continue |
|
|
|
} |
|
|
|
cleanupFunc(server, allocatedEcIds[i]) |
|
|
|
} |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
|
|
|
|