@@ -598,25 +598,16 @@ func (t *EcVacuumTask) distributeNewEcShards() error {
 
 	targetBaseFileName := filepath.Join(t.tempDir, fmt.Sprintf("%s_%d_g%d", t.collection, t.volumeID, t.targetGeneration))
 
-	// Step 1: Find best server for shared index files (.vif, .ecj, .ecx)
-	var indexServer pb.ServerAddress
-	for serverAddr := range t.sourceNodes {
-		// Use the first server as index server
-		// Future enhancement: Query volume server capabilities to find servers with dedicated index folders
-		// This could be done via a new gRPC call or by checking server configuration
-		indexServer = serverAddr
-		break
-	}
-
-	// Step 2: Distribute index files (.vif, .ecj, .ecx) to index server only (shared files)
-	if indexServer != "" {
-		err := t.distributeIndexFiles(indexServer, targetBaseFileName)
+	// Step 1: Distribute index files (.vif, .ecj, .ecx) to all volume servers that will have shards
+	// Each volume server needs its own copy of index files for mounting
+	for targetNode := range t.sourceNodes {
+		err := t.distributeIndexFiles(targetNode, targetBaseFileName)
 		if err != nil {
-			return fmt.Errorf("failed to distribute index files to %s: %w", indexServer, err)
+			return fmt.Errorf("failed to distribute index files to %s: %w", targetNode, err)
 		}
 	}
 
-	// Step 3: Distribute shard files (.ec00-.ec13) to appropriate volume servers
+	// Step 2: Distribute shard files (.ec00-.ec13) to appropriate volume servers
 	for targetNode, originalShardBits := range t.sourceNodes {
 		if originalShardBits.ShardIdCount() == 0 {
 			continue
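Note: the rewritten Step 1 copies .vif, .ecj, and .ecx to every node in
t.sourceNodes, because a volume server can only mount EC shards when the
index files are present locally; a single shared index server is no longer
sufficient once each node mounts independently. A minimal sketch of what a
per-server distributeIndexFiles could look like — the uploadFile helper is
hypothetical and not part of this diff:

    // Hypothetical sketch, not the actual implementation: push the three
    // shared index files for the new generation to one volume server.
    // uploadFile is an assumed helper that streams a local file to the
    // given server address.
    func (t *EcVacuumTask) distributeIndexFiles(server pb.ServerAddress, baseFileName string) error {
    	for _, ext := range []string{".vif", ".ecj", ".ecx"} {
    		localPath := baseFileName + ext
    		if _, err := os.Stat(localPath); err != nil {
    			return fmt.Errorf("missing index file %s: %w", localPath, err)
    		}
    		if err := t.uploadFile(server, localPath); err != nil {
    			return fmt.Errorf("failed to upload %s to %s: %w", localPath, server, err)
    		}
    	}
    	return nil
    }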
@@ -634,7 +625,7 @@ func (t *EcVacuumTask) distributeNewEcShards() error {
 			return fmt.Errorf("failed to distribute shards to %s: %w", targetNode, err)
 		}
 
-		// Step 4: Mount the new shards on the target volume server
+		// Step 3: Mount the new shards on the target volume server
 		err = operation.WithVolumeServerClient(false, targetNode, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
 			_, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
 				VolumeId: t.volumeID,
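Note: the hunk above cuts off inside the mount request after VolumeId. For
context, a sketch of the full Step 3 mount call per target node — field
names beyond VolumeId are assumptions (Generation in particular is presumed
from this branch's generation-aware shard layout), and the ShardIds
conversion from ShardBits is illustrative:

    // Sketch under assumptions, not the exact code from this PR.
    shardIds := make([]uint32, 0, originalShardBits.ShardIdCount())
    for _, id := range originalShardBits.ShardIds() {
    	shardIds = append(shardIds, uint32(id))
    }
    err = operation.WithVolumeServerClient(false, targetNode, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
    	_, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
    		VolumeId:   t.volumeID,
    		Collection: t.collection,       // assumed field
    		ShardIds:   shardIds,           // shard ids this node held before the vacuum
    		Generation: t.targetGeneration, // assumed generation-aware field
    	})
    	return mountErr
    })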
@@ -656,7 +647,6 @@ func (t *EcVacuumTask) distributeNewEcShards() error {
 	t.LogInfo("Successfully distributed all new EC shards", map[string]interface{}{
 		"volume_id":         t.volumeID,
 		"target_generation": t.targetGeneration,
-		"index_server":      indexServer,
 		"shard_servers":     len(t.sourceNodes),
 	})
 