|
|
|
@ -2,6 +2,7 @@ package erasure_coding |
|
|
|
|
|
|
|
import ( |
|
|
|
"context" |
|
|
|
"errors" |
|
|
|
"fmt" |
|
|
|
"io" |
|
|
|
"os" |
|
|
|
@ -112,13 +113,31 @@ func DistributeEcShards(volumeID uint32, collection string, targets []*worker_pb |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
shardAssignment[target.Node] = assignedShards |
|
|
|
existing := shardAssignment[target.Node] |
|
|
|
if len(existing) == 0 { |
|
|
|
shardAssignment[target.Node] = assignedShards |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
seen := make(map[string]struct{}, len(existing)) |
|
|
|
for _, shard := range existing { |
|
|
|
seen[shard] = struct{}{} |
|
|
|
} |
|
|
|
for _, shard := range assignedShards { |
|
|
|
if _, ok := seen[shard]; ok { |
|
|
|
continue |
|
|
|
} |
|
|
|
seen[shard] = struct{}{} |
|
|
|
existing = append(existing, shard) |
|
|
|
} |
|
|
|
shardAssignment[target.Node] = existing |
|
|
|
} |
|
|
|
|
|
|
|
if len(shardAssignment) == 0 { |
|
|
|
return nil, fmt.Errorf("no shard assignments found from planning phase") |
|
|
|
} |
|
|
|
|
|
|
|
var mountErrors []error |
|
|
|
for destNode, assignedShards := range shardAssignment { |
|
|
|
withFields(log, map[string]interface{}{ |
|
|
|
"destination": destNode, |
|
|
|
@ -215,6 +234,7 @@ func MountEcShards(volumeID uint32, collection string, shardAssignment map[strin |
|
|
|
}) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
mountErrors = append(mountErrors, fmt.Errorf("mount %s shards %v: %w", destNode, shardIds, err)) |
|
|
|
withFields(log, map[string]interface{}{ |
|
|
|
"destination": destNode, |
|
|
|
"shard_ids": shardIds, |
|
|
|
@ -230,6 +250,9 @@ func MountEcShards(volumeID uint32, collection string, shardAssignment map[strin |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if len(mountErrors) > 0 { |
|
|
|
return errors.Join(mountErrors...) |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
|