diff --git a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go index fe4442393..efcf3e987 100644 --- a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go +++ b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go @@ -9,6 +9,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" @@ -27,6 +28,7 @@ type EcVacuumTask struct { targetGeneration uint32 // generation to create (G+1) tempDir string grpcDialOption grpc.DialOption + masterAddress pb.ServerAddress // master server address for activation RPC } // NewEcVacuumTask creates a new EC vacuum task instance @@ -78,7 +80,12 @@ func (t *EcVacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams return fmt.Errorf("failed to distribute new EC shards: %w", err) } - // Step 6: Clean up old EC shards + // Step 6: Activate new generation (atomic switch from G to G+1) + if err := t.activateNewGeneration(); err != nil { + return fmt.Errorf("failed to activate new generation: %w", err) + } + + // Step 7: Clean up old EC shards if err := t.cleanupOldEcShards(); err != nil { t.LogWarning("Failed to clean up old EC shards", map[string]interface{}{ "error": err.Error(), @@ -291,6 +298,42 @@ func (t *EcVacuumTask) distributeNewEcShards(sourceNode pb.ServerAddress) error return nil } +// activateNewGeneration atomically switches the master to use the new generation +func (t *EcVacuumTask) activateNewGeneration() error { + t.LogInfo("Activating new generation", map[string]interface{}{ + "volume_id": t.volumeID, + "source_generation": t.sourceGeneration, + "target_generation": t.targetGeneration, + "master_address": t.masterAddress, + }) + + if t.masterAddress == "" { + t.LogWarning("Master address not set - skipping automatic generation activation", map[string]interface{}{ + "volume_id": t.volumeID, + "target_generation": t.targetGeneration, + "note": "Generation activation must be done manually via master API", + }) + return nil + } + + return operation.WithMasterServerClient(false, t.masterAddress, t.grpcDialOption, func(client master_pb.SeaweedClient) error { + _, err := client.ActivateEcGeneration(context.Background(), &master_pb.ActivateEcGenerationRequest{ + VolumeId: t.volumeID, + Collection: t.collection, + Generation: t.targetGeneration, + }) + if err != nil { + return fmt.Errorf("failed to activate generation %d for volume %d: %w", t.targetGeneration, t.volumeID, err) + } + + t.LogInfo("Successfully activated new generation", map[string]interface{}{ + "volume_id": t.volumeID, + "active_generation": t.targetGeneration, + }) + return nil + }) +} + // cleanupOldEcShards removes the original volume after successful vacuum func (t *EcVacuumTask) cleanupOldEcShards() error { t.LogInfo("Cleaning up original volume", map[string]interface{}{ @@ -353,3 +396,8 @@ func (t *EcVacuumTask) GetCollection() string { func (t *EcVacuumTask) SetGrpcDialOption(option grpc.DialOption) { t.grpcDialOption = option } + +// SetMasterAddress sets the master server address for generation activation +func (t *EcVacuumTask) SetMasterAddress(address pb.ServerAddress) { + t.masterAddress = address +}