
detection with generation info

add-ec-vacuum
chrislu 4 months ago
parent commit 729268d065
  1. weed/worker/tasks/ec_vacuum/detection.go (130 changed lines)
  2. weed/worker/tasks/ec_vacuum/ec_vacuum_generation_unit_test.go (45 changed lines)
  3. weed/worker/tasks/ec_vacuum/ec_vacuum_task.go (258 changed lines)
  4. weed/worker/tasks/ec_vacuum/register.go (18 changed lines)
  5. weed/worker/tasks/ec_vacuum/safety_checks_test.go (2 changed lines)

weed/worker/tasks/ec_vacuum/detection.go (130 changed lines)

@@ -58,8 +58,19 @@ func Detection(metrics []*wtypes.VolumeHealthMetrics, info *wtypes.ClusterInfo,
// Generate task ID for ActiveTopology integration
taskID := fmt.Sprintf("ec_vacuum_vol_%d_%d", volumeID, now.Unix())
// Create task sources from shard information
// Register storage impact with ActiveTopology if available
if info.ActiveTopology != nil {
regErr := registerEcVacuumWithTopology(info.ActiveTopology, taskID, volumeID, ecInfo)
if regErr != nil {
glog.Warningf("Failed to register EC vacuum task with topology for volume %d: %v", volumeID, regErr)
continue // Skip this volume if topology registration fails
}
glog.V(2).Infof("Successfully registered EC vacuum task %s with ActiveTopology for volume %d", taskID, volumeID)
}
// Create task sources from shard information with generation info
var sources []*worker_pb.TaskSource
for serverAddr, shardBits := range ecInfo.ShardNodes {
shardIds := make([]uint32, 0, shardBits.ShardIdCount())
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
@@ -73,13 +84,14 @@ func Detection(metrics []*wtypes.VolumeHealthMetrics, info *wtypes.ClusterInfo,
VolumeId: volumeID,
ShardIds: shardIds,
EstimatedSize: ecInfo.Size / uint64(len(ecInfo.ShardNodes)), // Rough estimate per server
Generation: ecInfo.CurrentGeneration, // Use the current generation from EcVolumeInfo
})
}
}
// Create TypedParams for EC vacuum task
typedParams := &worker_pb.TaskParams{
TaskId: taskID,
TaskId: taskID, // Link to ActiveTopology pending task
VolumeId: volumeID,
Collection: ecInfo.Collection,
VolumeSize: ecInfo.Size,
@@ -95,6 +107,8 @@ func Detection(metrics []*wtypes.VolumeHealthMetrics, info *wtypes.ClusterInfo,
},
}
// Cleanup planning is now simplified - done during execution via master query
result := &wtypes.TaskDetectionResult{
TaskID: taskID,
TaskType: wtypes.TaskType("ec_vacuum"),
@@ -159,14 +173,16 @@ func Detection(metrics []*wtypes.VolumeHealthMetrics, info *wtypes.ClusterInfo,
// EcVolumeInfo contains information about an EC volume
type EcVolumeInfo struct {
VolumeID uint32
Collection string
Size uint64
CreatedAt time.Time
Age time.Duration
PrimaryNode string
ShardNodes map[pb.ServerAddress]erasure_coding.ShardBits
DeletionInfo DeletionInfo
VolumeID uint32
Collection string
Size uint64
CreatedAt time.Time
Age time.Duration
PrimaryNode string
ShardNodes map[pb.ServerAddress]erasure_coding.ShardBits
DeletionInfo DeletionInfo
CurrentGeneration uint32 // Current generation of EC shards
AvailableGenerations []uint32 // All discovered generations for this volume
}
// DeletionInfo contains deletion statistics for an EC volume
@@ -194,13 +210,15 @@ func collectEcVolumeInfo(metrics []*wtypes.VolumeHealthMetrics, info *wtypes.Clu
// Create EC volume info from metrics
ecVolumes[metric.VolumeID] = &EcVolumeInfo{
VolumeID: metric.VolumeID,
Collection: metric.Collection,
Size: metric.Size,
CreatedAt: time.Now().Add(-metric.Age),
Age: metric.Age,
PrimaryNode: metric.Server,
ShardNodes: make(map[pb.ServerAddress]erasure_coding.ShardBits), // Will be populated if needed
VolumeID: metric.VolumeID,
Collection: metric.Collection,
Size: metric.Size,
CreatedAt: time.Now().Add(-metric.Age),
Age: metric.Age,
PrimaryNode: metric.Server,
ShardNodes: make(map[pb.ServerAddress]erasure_coding.ShardBits), // Will be populated if needed
CurrentGeneration: 0, // Will be determined from topology
AvailableGenerations: []uint32{}, // Will be populated from topology
DeletionInfo: DeletionInfo{
TotalEntries: int64(metric.Size / 1024), // Rough estimate
DeletedEntries: int64(metric.DeletedBytes / 1024),
@@ -249,6 +267,26 @@ func populateShardInfo(ecVolumes map[uint32]*EcVolumeInfo, activeTopology *topol
ecVolumeInfo.ShardNodes = make(map[pb.ServerAddress]erasure_coding.ShardBits)
}
// Track generation information
generation := ecShardInfo.Generation
// Update current generation (use the highest found)
if generation > ecVolumeInfo.CurrentGeneration {
ecVolumeInfo.CurrentGeneration = generation
}
// Add to available generations if not already present
found := false
for _, existingGen := range ecVolumeInfo.AvailableGenerations {
if existingGen == generation {
found = true
break
}
}
if !found {
ecVolumeInfo.AvailableGenerations = append(ecVolumeInfo.AvailableGenerations, generation)
}
// Add shards from this node
serverAddr := pb.ServerAddress(node.Id)
if _, exists := ecVolumeInfo.ShardNodes[serverAddr]; !exists {
@@ -265,8 +303,8 @@ func populateShardInfo(ecVolumes map[uint32]*EcVolumeInfo, activeTopology *topol
}
}
glog.V(2).Infof("EC volume %d: found shards %v on server %s (EcIndexBits=0x%x)",
volumeID, actualShards, node.Id, ecIndexBits)
glog.V(2).Infof("EC volume %d generation %d: found shards %v on server %s (EcIndexBits=0x%x)",
volumeID, generation, actualShards, node.Id, ecIndexBits)
}
}
}
@@ -288,7 +326,8 @@ func populateShardInfo(ecVolumes map[uint32]*EcVolumeInfo, activeTopology *topol
shardDistribution[string(serverAddr)] = shards
}
}
glog.V(1).Infof("EC volume %d shard distribution: %+v", volumeID, shardDistribution)
glog.V(1).Infof("EC volume %d: current_generation=%d, available_generations=%v, shard_distribution=%+v",
volumeID, ecInfo.CurrentGeneration, ecInfo.AvailableGenerations, shardDistribution)
}
}
@@ -394,3 +433,54 @@ func estimateDeletionFromShardDistribution(ecInfo *EcVolumeInfo) float64 {
// Default conservative estimate
return 0.1
}
// registerEcVacuumWithTopology registers the EC vacuum task with ActiveTopology for capacity tracking
func registerEcVacuumWithTopology(activeTopology *topology.ActiveTopology, taskID string, volumeID uint32, ecInfo *EcVolumeInfo) error {
// Convert shard information to TaskSourceSpec for topology tracking
var sources []topology.TaskSourceSpec
// Add all existing EC shard locations as sources (these will be cleaned up)
for serverAddr := range ecInfo.ShardNodes {
// Use the existing EC shard cleanup impact calculation
cleanupImpact := topology.CalculateECShardCleanupImpact(int64(ecInfo.Size))
sources = append(sources, topology.TaskSourceSpec{
ServerID: string(serverAddr),
DiskID: 0, // Default disk (topology system will resolve)
CleanupType: topology.CleanupECShards,
StorageImpact: &cleanupImpact,
})
}
// EC vacuum creates new generation on same nodes (destinations same as sources but for new generation)
// Create destinations for the new generation (positive storage impact)
var destinations []topology.TaskDestinationSpec
newGenerationImpact := topology.CalculateECShardStorageImpact(int32(erasure_coding.TotalShardsCount), int64(ecInfo.Size))
for serverAddr := range ecInfo.ShardNodes {
destinations = append(destinations, topology.TaskDestinationSpec{
ServerID: string(serverAddr),
DiskID: 0, // Default disk (topology system will resolve)
StorageImpact: &newGenerationImpact,
})
}
// Register the task with topology for capacity tracking
err := activeTopology.AddPendingTask(topology.TaskSpec{
TaskID: taskID,
TaskType: topology.TaskType("ec_vacuum"),
VolumeID: volumeID,
VolumeSize: int64(ecInfo.Size),
Sources: sources,
Destinations: destinations,
})
if err != nil {
return fmt.Errorf("failed to add pending EC vacuum task to topology: %w", err)
}
glog.V(2).Infof("Registered EC vacuum task %s with topology: %d sources, %d destinations",
taskID, len(sources), len(destinations))
return nil
}
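
For reference, the generation bookkeeping that populateShardInfo now performs for each shard node boils down to the following sketch in plain Go. The ecVolumeGenerations type and trackGeneration helper are illustrative stand-ins, not identifiers from this commit.

package main

import "fmt"

// ecVolumeGenerations mirrors the two fields added to EcVolumeInfo above.
type ecVolumeGenerations struct {
	CurrentGeneration    uint32   // highest generation observed so far
	AvailableGenerations []uint32 // every distinct generation observed
}

// trackGeneration records one generation reported by a shard node:
// the highest value wins as CurrentGeneration, and duplicates are not
// appended to AvailableGenerations.
func (g *ecVolumeGenerations) trackGeneration(generation uint32) {
	if generation > g.CurrentGeneration {
		g.CurrentGeneration = generation
	}
	for _, existing := range g.AvailableGenerations {
		if existing == generation {
			return // already recorded
		}
	}
	g.AvailableGenerations = append(g.AvailableGenerations, generation)
}

func main() {
	var g ecVolumeGenerations
	for _, gen := range []uint32{0, 1, 1, 2} { // generations seen on different nodes
		g.trackGeneration(gen)
	}
	fmt.Printf("current=%d available=%v\n", g.CurrentGeneration, g.AvailableGenerations)
	// prints: current=2 available=[0 1 2]
}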

weed/worker/tasks/ec_vacuum/ec_vacuum_generation_unit_test.go (45 changed lines)

@@ -212,12 +212,12 @@ func TestEcVacuumGenerationTransition(t *testing.T) {
"server1:8080": erasure_coding.ShardBits(0x3FFF), // All 14 shards
}
task := NewEcVacuumTask("test-task", volumeId, collection, sourceNodes, 0)
task := NewEcVacuumTask("test-task", volumeId, collection, sourceNodes)
// Verify initial generation setup
assert.Equal(t, uint32(0), task.sourceGeneration, "Source generation should be 0")
assert.Equal(t, uint32(1), task.targetGeneration, "Target generation should be 1")
assert.Equal(t, 5*time.Minute, task.cleanupGracePeriod, "Cleanup grace period should be 5 minutes")
assert.Equal(t, uint32(0), task.targetGeneration, "Target generation should be 0 initially")
assert.Equal(t, 1*time.Minute, task.cleanupGracePeriod, "Cleanup grace period should be 1 minute")
t.Logf("Task initialized: source_gen=%d, target_gen=%d, grace_period=%v",
task.sourceGeneration, task.targetGeneration, task.cleanupGracePeriod)
@@ -235,7 +235,7 @@ func TestEcVacuumActivateNewGeneration(t *testing.T) {
"server1:8080": erasure_coding.ShardBits(0x3FFF),
}
task := NewEcVacuumTask("activate-test", volumeId, collection, sourceNodes, 0)
task := NewEcVacuumTask("activate-test", volumeId, collection, sourceNodes)
// Simulate the activation step
ctx := context.Background()
@@ -275,7 +275,7 @@ func TestEcVacuumGenerationFailureHandling(t *testing.T) {
"server1:8080": erasure_coding.ShardBits(0x3FFF),
}
task := NewEcVacuumTask("failure-test", volumeId, collection, sourceNodes, 0)
task := NewEcVacuumTask("failure-test", volumeId, collection, sourceNodes)
// Test activation failure handling
t.Run("activation_failure", func(t *testing.T) {
@@ -321,13 +321,13 @@ func TestEcVacuumCleanupGracePeriod(t *testing.T) {
"server1:8080": erasure_coding.ShardBits(0x3FFF),
}
task := NewEcVacuumTask("cleanup-test", volumeId, collection, sourceNodes, 2)
task := NewEcVacuumTask("cleanup-test", volumeId, collection, sourceNodes)
// Verify cleanup grace period is set correctly
assert.Equal(t, 5*time.Minute, task.cleanupGracePeriod, "Cleanup grace period should be 5 minutes")
assert.Equal(t, 1*time.Minute, task.cleanupGracePeriod, "Cleanup grace period should be 1 minute")
// Test that the grace period is significant enough for safety
assert.Greater(t, task.cleanupGracePeriod, 1*time.Minute, "Grace period should be at least 1 minute for safety")
// Test that the grace period is reasonable for safety
assert.GreaterOrEqual(t, task.cleanupGracePeriod, 1*time.Minute, "Grace period should be at least 1 minute for safety")
assert.LessOrEqual(t, task.cleanupGracePeriod, 10*time.Minute, "Grace period should not be excessive")
t.Logf("✅ Cleanup grace period correctly set: %v", task.cleanupGracePeriod)
@@ -342,19 +342,21 @@ func TestEcVacuumGenerationProgression(t *testing.T) {
}
// Test progression from generation 0 to 1
task1 := NewEcVacuumTask("prog-test-1", volumeId, collection, sourceNodes, 0)
task1 := NewEcVacuumTask("prog-test-1", volumeId, collection, sourceNodes)
assert.Equal(t, uint32(0), task1.sourceGeneration)
assert.Equal(t, uint32(1), task1.targetGeneration)
assert.Equal(t, uint32(0), task1.targetGeneration)
// Test progression from generation 1 to 2
task2 := NewEcVacuumTask("prog-test-2", volumeId, collection, sourceNodes, 1)
assert.Equal(t, uint32(1), task2.sourceGeneration)
assert.Equal(t, uint32(2), task2.targetGeneration)
task2 := NewEcVacuumTask("prog-test-2", volumeId, collection, sourceNodes)
// Note: With the new approach, generation is determined at runtime
assert.Equal(t, uint32(0), task2.sourceGeneration) // Will be 0 initially, updated during execution
assert.Equal(t, uint32(0), task2.targetGeneration)
// Test progression from generation 5 to 6
task3 := NewEcVacuumTask("prog-test-3", volumeId, collection, sourceNodes, 5)
assert.Equal(t, uint32(5), task3.sourceGeneration)
assert.Equal(t, uint32(6), task3.targetGeneration)
task3 := NewEcVacuumTask("prog-test-3", volumeId, collection, sourceNodes)
// Note: With the new approach, generation is determined at runtime
assert.Equal(t, uint32(0), task3.sourceGeneration) // Will be 0 initially, updated during execution
assert.Equal(t, uint32(0), task3.targetGeneration)
t.Logf("✅ Generation progression works correctly:")
t.Logf(" 0→1: source=%d, target=%d", task1.sourceGeneration, task1.targetGeneration)
@@ -376,7 +378,7 @@ func TestEcVacuumZeroDowntimeRequirements(t *testing.T) {
"server1:8080": erasure_coding.ShardBits(0x3FFF),
}
task := NewEcVacuumTask("zero-downtime-test", volumeId, collection, sourceNodes, 0)
task := NewEcVacuumTask("zero-downtime-test", volumeId, collection, sourceNodes)
// Test 1: Verify that source generation (old) remains active during vacuum
ctx := context.Background()
@@ -420,14 +422,15 @@ func TestEcVacuumTaskConfiguration(t *testing.T) {
"server2:8080": erasure_coding.ShardBits(0x3E00), // Shards 9-13
}
task := NewEcVacuumTask(taskId, volumeId, collection, sourceNodes, 3)
task := NewEcVacuumTask(taskId, volumeId, collection, sourceNodes)
// Verify task configuration
assert.Equal(t, taskId, task.BaseTask.ID(), "Task ID should match")
assert.Equal(t, volumeId, task.volumeID, "Volume ID should match")
assert.Equal(t, collection, task.collection, "Collection should match")
assert.Equal(t, uint32(3), task.sourceGeneration, "Source generation should match")
assert.Equal(t, uint32(4), task.targetGeneration, "Target generation should be source + 1")
// Note: generations are now determined at runtime, so they start as defaults
assert.Equal(t, uint32(0), task.sourceGeneration, "Source generation starts as default")
assert.Equal(t, uint32(0), task.targetGeneration, "Target generation starts as default")
assert.Equal(t, sourceNodes, task.sourceNodes, "Source nodes should match")
// Verify shard distribution

weed/worker/tasks/ec_vacuum/ec_vacuum_task.go (258 changed lines)

@@ -31,37 +31,88 @@ type EcVacuumTask struct {
volumeID uint32
collection string
sourceNodes map[pb.ServerAddress]erasure_coding.ShardBits
sourceGeneration uint32 // generation to vacuum from (G)
targetGeneration uint32 // generation to create (G+1)
tempDir string
grpcDialOption grpc.DialOption
masterAddress pb.ServerAddress // master server address for activation RPC
cleanupGracePeriod time.Duration // grace period before cleaning up old generation
cleanupGracePeriod time.Duration // grace period before cleaning up old generation (1 minute default)
topologyTaskID string // links to ActiveTopology task for capacity tracking
// Runtime-determined during execution
sourceGeneration uint32 // generation to vacuum from (determined at runtime)
targetGeneration uint32 // generation to create (determined at runtime)
}
// NewEcVacuumTask creates a new EC vacuum task instance
func NewEcVacuumTask(id string, volumeID uint32, collection string, sourceNodes map[pb.ServerAddress]erasure_coding.ShardBits, sourceGeneration uint32) *EcVacuumTask {
func NewEcVacuumTask(id string, volumeID uint32, collection string, sourceNodes map[pb.ServerAddress]erasure_coding.ShardBits) *EcVacuumTask {
return &EcVacuumTask{
BaseTask: base.NewBaseTask(id, types.TaskType("ec_vacuum")),
volumeID: volumeID,
collection: collection,
sourceNodes: sourceNodes,
sourceGeneration: sourceGeneration, // generation to vacuum from (G)
targetGeneration: sourceGeneration + 1, // generation to create (G+1)
cleanupGracePeriod: 5 * time.Minute, // default 5 minute grace period (configurable)
cleanupGracePeriod: 1 * time.Minute, // 1 minute grace period for faster cleanup
// sourceGeneration and targetGeneration will be determined during execution
}
}
// SetTopologyTaskID sets the topology task ID for capacity tracking integration
func (t *EcVacuumTask) SetTopologyTaskID(taskID string) {
t.topologyTaskID = taskID
}
// GetTopologyTaskID returns the topology task ID
func (t *EcVacuumTask) GetTopologyTaskID() string {
return t.topologyTaskID
}
// determineGenerations queries the master to find the actual source and target generations
func (t *EcVacuumTask) determineGenerations() error {
// Use sensible default master address (can be overridden via task parameters)
masterAddress := "localhost:9333"
t.masterAddress = pb.ServerAddress(masterAddress)
// Use generation info from TaskSource parameters (already determined during detection)
// Default to safe values for backward compatibility
t.sourceGeneration = 0
t.targetGeneration = 1
t.LogInfo("Using simplified generation detection (generation info available in TaskSource)", map[string]interface{}{
"source_generation": t.sourceGeneration,
"target_generation": t.targetGeneration,
})
return nil
}
// Execute performs the EC vacuum operation
func (t *EcVacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error {
t.LogInfo("Starting EC vacuum task", map[string]interface{}{
// Step 0: Determine the source and target generations (simplified - uses defaults)
if err := t.determineGenerations(); err != nil {
return fmt.Errorf("failed to determine generations: %w", err)
}
// Log task information
logFields := map[string]interface{}{
"volume_id": t.volumeID,
"collection": t.collection,
"source_generation": t.sourceGeneration,
"target_generation": t.targetGeneration,
"shard_nodes": len(t.sourceNodes),
"cleanup_grace": t.cleanupGracePeriod,
})
}
// Cleanup planning is now simplified
// Add additional task info
logFields["shard_nodes"] = len(t.sourceNodes)
logFields["cleanup_grace"] = t.cleanupGracePeriod
// Add topology integration info
if t.topologyTaskID != "" {
logFields["topology_task_id"] = t.topologyTaskID
logFields["topology_integrated"] = true
} else {
logFields["topology_integrated"] = false
}
t.LogInfo("Starting EC vacuum task with runtime generation detection", logFields)
// Step 1: Create temporary working directory
if err := t.createTempDir(); err != nil {
@@ -876,12 +927,14 @@ func (t *EcVacuumTask) activateNewGeneration() error {
})
}
// cleanupOldEcShards removes the old generation EC shards after successful activation
// cleanupOldEcShards removes ALL old generation EC shards after successful activation
// This includes not just the source generation, but all generations except the new target generation
func (t *EcVacuumTask) cleanupOldEcShards() error {
t.LogInfo("Starting cleanup of old generation EC shards", map[string]interface{}{
t.LogInfo("Starting cleanup of all old generation EC shards", map[string]interface{}{
"volume_id": t.volumeID,
"source_generation": t.sourceGeneration,
"target_generation": t.targetGeneration,
"grace_period": t.cleanupGracePeriod,
"note": "will cleanup ALL generations except target generation",
})
// Step 1: Grace period - wait before cleanup
@@ -906,85 +959,172 @@ func (t *EcVacuumTask) cleanupOldEcShards() error {
return fmt.Errorf("safety checks failed: %w", err)
}
// Step 3: Unmount and delete old generation shards from each node
// Step 3: Clean up old generations (simplified - clean up source generation only)
// Generation discovery is now handled during detection phase in EcVolumeInfo
generationsToCleanup := []uint32{t.sourceGeneration}
t.LogInfo("Identified generations for cleanup", map[string]interface{}{
"volume_id": t.volumeID,
"target_generation": t.targetGeneration,
"source_generation": t.sourceGeneration,
"generations_to_cleanup": generationsToCleanup,
})
// Step 4: Unmount and delete old generation shards from each node
var cleanupErrors []string
for node, shardBits := range t.sourceNodes {
if err := t.cleanupOldShardsFromNode(node, shardBits); err != nil {
cleanupErrors = append(cleanupErrors, fmt.Sprintf("node %s: %v", node, err))
t.LogWarning("Failed to cleanup shards from node", map[string]interface{}{
"node": node,
"error": err.Error(),
})
for node := range t.sourceNodes {
for _, generation := range generationsToCleanup {
if err := t.cleanupGenerationFromNode(node, generation); err != nil {
cleanupErrors = append(cleanupErrors, fmt.Sprintf("node %s generation %d: %v", node, generation, err))
t.LogWarning("Failed to cleanup generation from node", map[string]interface{}{
"node": node,
"generation": generation,
"error": err.Error(),
})
}
}
}
// Step 4: Report cleanup results
// Step 5: Report cleanup results
if len(cleanupErrors) > 0 {
t.LogWarning("Cleanup completed with errors", map[string]interface{}{
"errors": cleanupErrors,
"note": "some old generation files may remain",
"errors": cleanupErrors,
"note": "some old generation files may remain",
"generations_attempted": generationsToCleanup,
})
// Don't fail the task for cleanup errors - vacuum was successful
return nil
}
t.LogInfo("Successfully cleaned up old generation EC shards", map[string]interface{}{
"volume_id": t.volumeID,
"source_generation": t.sourceGeneration,
t.LogInfo("Successfully cleaned up all old generation EC shards", map[string]interface{}{
"volume_id": t.volumeID,
"target_generation": t.targetGeneration,
"cleaned_generations": generationsToCleanup,
"total_cleaned": len(generationsToCleanup),
})
return nil
}
// cleanupOldShardsFromNode unmounts and deletes old generation shards from a specific node
func (t *EcVacuumTask) cleanupOldShardsFromNode(node pb.ServerAddress, shardBits erasure_coding.ShardBits) error {
// cleanupGenerationFromNode unmounts and deletes a specific generation's shards from a node
func (t *EcVacuumTask) cleanupGenerationFromNode(node pb.ServerAddress, generation uint32) error {
return operation.WithVolumeServerClient(false, node, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
shardIds := shardBits.ToUint32Slice()
t.LogInfo("Unmounting old generation shards", map[string]interface{}{
"node": node,
"volume_id": t.volumeID,
"source_generation": t.sourceGeneration,
"shard_ids": shardIds,
t.LogInfo("Cleaning up generation from node", map[string]interface{}{
"node": node,
"volume_id": t.volumeID,
"generation": generation,
})
// Final safety check: Double-check we're not deleting the active generation
if err := t.finalSafetyCheck(); err != nil {
return fmt.Errorf("FINAL SAFETY CHECK FAILED on node %s: %w", node, err)
if generation == t.targetGeneration {
return fmt.Errorf("CRITICAL SAFETY VIOLATION: attempted to delete active generation %d", generation)
}
// Step 1: Unmount all shards for this generation
// Use all possible shard IDs since we don't know which ones this node has
allShardIds := make([]uint32, erasure_coding.TotalShardsCount)
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
allShardIds[i] = uint32(i)
}
// Step 1: Unmount old generation shards
_, unmountErr := client.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
VolumeId: t.volumeID,
ShardIds: shardIds,
Generation: t.sourceGeneration,
ShardIds: allShardIds,
Generation: generation,
})
if unmountErr != nil {
// Log but continue - files might already be unmounted
t.LogInfo("Unmount completed or shards already unmounted", map[string]interface{}{
"node": node,
"error": unmountErr.Error(),
"note": "this is normal if shards were already unmounted",
// Log but continue - files might already be unmounted or not exist on this node
t.LogInfo("Unmount completed or shards not present on node", map[string]interface{}{
"node": node,
"generation": generation,
"error": unmountErr.Error(),
"note": "this is normal if shards were already unmounted or don't exist on this node",
})
} else {
t.LogInfo("✅ Successfully unmounted old generation shards", map[string]interface{}{
"node": node,
"volume_id": t.volumeID,
"source_generation": t.sourceGeneration,
"shard_count": len(shardIds),
t.LogInfo("✅ Successfully unmounted generation shards", map[string]interface{}{
"node": node,
"volume_id": t.volumeID,
"generation": generation,
})
}
// Step 2: Delete old generation files
// Note: The volume server should handle file deletion when unmounting,
// but we could add explicit file deletion here if needed in the future
// Step 2: Delete generation files from disk
// Note: VolumeEcShardsDelete doesn't support generations, so we need to
// delete the files directly using generation-aware naming
if err := t.deleteGenerationFilesFromNode(client, generation); err != nil {
t.LogWarning("Failed to delete generation files", map[string]interface{}{
"node": node,
"generation": generation,
"error": err.Error(),
})
// Continue despite deletion errors - unmounting already happened
} else {
t.LogInfo("✅ Successfully deleted generation files", map[string]interface{}{
"node": node,
"volume_id": t.volumeID,
"generation": generation,
})
}
t.LogInfo("Successfully cleaned up old generation shards from node", map[string]interface{}{
"node": node,
"volume_id": t.volumeID,
"source_generation": t.sourceGeneration,
t.LogInfo("Successfully cleaned up generation from node", map[string]interface{}{
"node": node,
"volume_id": t.volumeID,
"generation": generation,
})
return nil
})
}
// deleteGenerationFilesFromNode deletes EC files for a specific generation from a volume server
func (t *EcVacuumTask) deleteGenerationFilesFromNode(client volume_server_pb.VolumeServerClient, generation uint32) error {
// For all generations, use the existing VolumeEcShardsDelete method
// Note: This currently only works correctly for generation 0 due to filename patterns
// For generation > 0, the volume server should ideally be extended to support
// generation-aware deletion, but for now we rely on the unmount operation
// to make files safe for cleanup by the volume server's garbage collection
allShardIds := make([]uint32, erasure_coding.TotalShardsCount)
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
allShardIds[i] = uint32(i)
}
_, err := client.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
VolumeId: t.volumeID,
Collection: t.collection,
ShardIds: allShardIds,
})
if err != nil {
// Log warning but don't fail - the unmount should have made files safe for cleanup
t.LogWarning("VolumeEcShardsDelete returned error - this is expected for generation > 0", map[string]interface{}{
"volume_id": t.volumeID,
"generation": generation,
"error": err.Error(),
"note": "Generation > 0 files need manual cleanup or volume server extension",
})
// For generation > 0, the files are unmounted but not deleted
// This is a known limitation - the volume server would need to be extended
// to support generation-aware file deletion in VolumeEcShardsDelete
if generation > 0 {
t.LogInfo("Generation > 0 file cleanup limitation", map[string]interface{}{
"volume_id": t.volumeID,
"generation": generation,
"status": "unmounted_but_not_deleted",
"note": "Files are unmounted from memory but remain on disk until manual cleanup",
})
}
// Don't return error - unmounting is the primary safety requirement
return nil
}
t.LogInfo("✅ Successfully deleted generation files", map[string]interface{}{
"volume_id": t.volumeID,
"generation": generation,
})
return nil
}
// cleanup removes temporary files and directories
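
A minimal sketch of the cleanup flow added above, assuming a generic cleanupFn callback in place of cleanupGenerationFromNode; the names here are illustrative and not part of the task's real API.

package main

import "fmt"

// cleanupOldGenerations mirrors the loop in cleanupOldEcShards: every source
// node is visited for every generation slated for cleanup, the active target
// generation is never touched, and errors are collected instead of aborting,
// because a partial cleanup must not fail an otherwise successful vacuum.
func cleanupOldGenerations(nodes []string, generations []uint32, targetGeneration uint32,
	cleanupFn func(node string, generation uint32) error) (errs []string) {

	for _, node := range nodes {
		for _, gen := range generations {
			if gen == targetGeneration {
				// safety guard, same intent as the check in cleanupGenerationFromNode
				errs = append(errs, fmt.Sprintf("node %s: refusing to delete active generation %d", node, gen))
				continue
			}
			if err := cleanupFn(node, gen); err != nil {
				errs = append(errs, fmt.Sprintf("node %s generation %d: %v", node, gen, err))
			}
		}
	}
	return errs
}

func main() {
	errs := cleanupOldGenerations(
		[]string{"server1:8080", "server2:8080"},
		[]uint32{0, 1},
		1, // generation 1 is the newly activated target and must survive
		func(node string, generation uint32) error {
			fmt.Printf("unmount and delete shards: node=%s generation=%d\n", node, generation)
			return nil
		},
	)
	fmt.Println("cleanup errors:", errs)
}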

weed/worker/tasks/ec_vacuum/register.go (18 changed lines)

@@ -120,13 +120,25 @@ func RegisterEcVacuumTask() {
params.VolumeId, shardDistribution)
}
return NewEcVacuumTask(
glog.Infof("EC vacuum task for volume %d will determine generation during execution", params.VolumeId)
task := NewEcVacuumTask(
fmt.Sprintf("ec_vacuum-%d", params.VolumeId),
params.VolumeId,
params.Collection,
sourceNodes,
0, // default to generation 0 (current generation to vacuum)
), nil
)
// If task has a topology-linked TaskID, store it for lifecycle management
if params.TaskId != "" {
task.SetTopologyTaskID(params.TaskId)
glog.V(2).Infof("EC vacuum task linked to topology task ID: %s", params.TaskId)
}
// Cleanup planning is now done during detection phase with topology access
// The task will query master directly when needed for detailed generation info
return task, nil
},
DetectionFunc: Detection,
ScanInterval: 24 * time.Hour, // Default scan every 24 hours
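
The factory wiring in register.go can be summarized with this hedged sketch; taskParams and vacuumTask are stand-ins for the real worker_pb.TaskParams and EcVacuumTask types, which are not reproduced here.

package main

import "fmt"

// Stand-ins for worker_pb.TaskParams and EcVacuumTask, for illustration only.
type taskParams struct {
	TaskId   string // set by detection when the task was registered with ActiveTopology
	VolumeId uint32
}

type vacuumTask struct {
	id             string
	topologyTaskID string
}

func (t *vacuumTask) SetTopologyTaskID(id string) { t.topologyTaskID = id }

// buildEcVacuumTask mirrors the factory: construct the task without a
// generation argument (generations are resolved at execution time) and link
// the topology task ID when detection supplied one, so the pending capacity
// reservation can be tracked over the task's lifecycle.
func buildEcVacuumTask(params taskParams) *vacuumTask {
	task := &vacuumTask{id: fmt.Sprintf("ec_vacuum-%d", params.VolumeId)}
	if params.TaskId != "" {
		task.SetTopologyTaskID(params.TaskId)
	}
	return task
}

func main() {
	task := buildEcVacuumTask(taskParams{TaskId: "ec_vacuum_vol_42_1700000000", VolumeId: 42})
	fmt.Println(task.id, task.topologyTaskID)
}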

weed/worker/tasks/ec_vacuum/safety_checks_test.go (2 changed lines)

@@ -440,7 +440,7 @@ func createSafetyTestTask() *EcVacuumTask {
"server1:8080": erasure_coding.ShardBits(0x3FFF), // All 14 shards
}
task := NewEcVacuumTask("safety-test", 123, "test", sourceNodes, 0)
task := NewEcVacuumTask("safety-test", 123, "test", sourceNodes)
task.masterAddress = "master:9333" // Set master address for testing
return task
