diff --git a/weed/admin/task/enhanced_ec_integration_test.go b/weed/admin/task/ec_integration_test.go similarity index 71% rename from weed/admin/task/enhanced_ec_integration_test.go rename to weed/admin/task/ec_integration_test.go index 37132d858..d614495c0 100644 --- a/weed/admin/task/enhanced_ec_integration_test.go +++ b/weed/admin/task/ec_integration_test.go @@ -10,9 +10,9 @@ import ( "github.com/seaweedfs/seaweedfs/weed/worker/types" ) -// TestEnhancedECIntegration tests the enhanced EC implementation with the admin server -func TestEnhancedECIntegration(t *testing.T) { - t.Logf("Starting enhanced EC integration test") +// TestECIntegration tests the EC implementation with the admin server +func TestECIntegration(t *testing.T) { + t.Logf("Starting EC integration test") // Step 1: Create admin server config := &MinimalAdminConfig{ @@ -51,7 +51,7 @@ func TestEnhancedECIntegration(t *testing.T) { // Step 3: Create an EC task ecTask := &types.Task{ - ID: "enhanced-ec-task-1", + ID: "ec-task-1", Type: types.TaskTypeErasureCoding, VolumeID: 12345, Server: "localhost:8080", @@ -70,7 +70,7 @@ func TestEnhancedECIntegration(t *testing.T) { if err != nil { t.Fatalf("Failed to queue EC task: %v", err) } - t.Logf("Successfully queued enhanced EC task %s for volume %d", ecTask.ID, ecTask.VolumeID) + t.Logf("Successfully queued EC task %s for volume %d", ecTask.ID, ecTask.VolumeID) // Step 4: Worker requests the task assignedTask, err := adminServer.RequestTask("ec-worker-1", []types.TaskType{types.TaskTypeErasureCoding}) @@ -82,8 +82,8 @@ func TestEnhancedECIntegration(t *testing.T) { t.Logf("EC worker got task: %s (%s) for volume %d", assignedTask.ID, assignedTask.Type, assignedTask.VolumeID) - // Step 5: Simulate enhanced EC task execution progress - t.Logf("Simulating enhanced EC task execution phases") + // Step 5: Simulate EC task execution phases + t.Logf("Simulating EC task execution phases") // Phase 1: Copying volume data err = adminServer.UpdateTaskProgress(assignedTask.ID, 15.0) @@ -132,7 +132,7 @@ func TestEnhancedECIntegration(t *testing.T) { if err != nil { t.Errorf("Failed to complete EC task: %v", err) } - t.Logf("Successfully completed enhanced EC task %s", assignedTask.ID) + t.Logf("Successfully completed EC task %s", assignedTask.ID) } else { t.Logf("No EC task was assigned (expected in test environment)") } @@ -151,16 +151,16 @@ func TestEnhancedECIntegration(t *testing.T) { lastEntry.TaskID, lastEntry.TaskType, lastEntry.Duration) if lastEntry.TaskType == types.TaskTypeErasureCoding { - t.Logf("Enhanced EC task completed successfully") + t.Logf("EC task completed successfully") } } - t.Logf("Enhanced EC integration test completed successfully") + t.Logf("EC integration test completed successfully") } -// TestEnhancedECTaskValidation tests the enhanced EC task validation -func TestEnhancedECTaskValidation(t *testing.T) { - t.Logf("Testing enhanced EC task validation") +// TestECTaskValidation tests the EC task validation +func TestECTaskValidation(t *testing.T) { + t.Logf("Testing EC task validation") // Create a temporary work directory workDir := filepath.Join(os.TempDir(), "seaweedfs_ec_test") @@ -170,8 +170,8 @@ func TestEnhancedECTaskValidation(t *testing.T) { } defer os.RemoveAll(workDir) - // Create enhanced EC task - enhancedTask := ec_task.NewEnhancedECTask( + // Create EC task + ecTask := ec_task.NewTaskWithParams( "localhost:8080", // source server 12345, // volume ID "localhost:9333", // master client @@ -188,7 +188,7 @@ func TestEnhancedECTaskValidation(t 
*testing.T) { }, } - err = enhancedTask.Validate(validParams) + err = ecTask.Validate(validParams) if err != nil { t.Errorf("Valid parameters should pass validation: %v", err) } @@ -199,25 +199,25 @@ func TestEnhancedECTaskValidation(t *testing.T) { Server: "", // Empty server } - err = enhancedTask.Validate(invalidParams) + err = ecTask.Validate(invalidParams) if err == nil { t.Errorf("Invalid parameters should fail validation") } // Test time estimation - estimatedTime := enhancedTask.EstimateTime(validParams) + estimatedTime := ecTask.EstimateTime(validParams) t.Logf("Estimated time for 32GB volume EC: %v", estimatedTime) if estimatedTime < 20*time.Minute { t.Errorf("Expected at least 20 minutes for large volume EC, got %v", estimatedTime) } - t.Logf("Enhanced EC task validation completed successfully") + t.Logf("EC task validation completed successfully") } -// TestEnhancedECFeatures tests specific enhanced EC features -func TestEnhancedECFeatures(t *testing.T) { - t.Logf("Testing enhanced EC features") +// TestECFeatures tests specific EC features +func TestECFeatures(t *testing.T) { + t.Logf("Testing EC features") // Create temporary work directory workDir := filepath.Join(os.TempDir(), "seaweedfs_ec_features_test") @@ -227,7 +227,7 @@ func TestEnhancedECFeatures(t *testing.T) { } defer os.RemoveAll(workDir) - enhancedTask := ec_task.NewEnhancedECTask( + ecTask := ec_task.NewTaskWithParams( "localhost:8080", 54321, "localhost:9333", @@ -237,17 +237,17 @@ func TestEnhancedECFeatures(t *testing.T) { // Test step tracking t.Logf("Testing step tracking functionality") - currentStep := enhancedTask.GetCurrentStep() + currentStep := ecTask.GetCurrentStep() t.Logf("Initial current step: %s", currentStep) - progress := enhancedTask.GetProgress() + progress := ecTask.GetProgress() t.Logf("Initial progress: %.1f%%", progress) // Test parameter extraction params := types.TaskParams{ VolumeID: 54321, Server: "localhost:8080", - Collection: "enhanced_test", + Collection: "features_test", Parameters: map[string]interface{}{ "volume_size": int64(64 * 1024 * 1024 * 1024), // 64GB "data_shards": 10, @@ -256,7 +256,7 @@ func TestEnhancedECFeatures(t *testing.T) { }, } - estimatedTime := enhancedTask.EstimateTime(params) + estimatedTime := ecTask.EstimateTime(params) expectedMinTime := time.Duration(64*2) * time.Minute // 2 minutes per GB t.Logf("64GB volume estimated time: %v (expected minimum: %v)", estimatedTime, expectedMinTime) @@ -265,15 +265,15 @@ func TestEnhancedECFeatures(t *testing.T) { t.Errorf("Time estimate seems too low for 64GB volume") } - t.Logf("Enhanced EC features test completed successfully") + t.Logf("EC features test completed successfully") } -// TestECTaskComparison compares basic vs enhanced EC implementations +// TestECTaskComparison tests EC implementation features func TestECTaskComparison(t *testing.T) { - t.Logf("Comparing basic vs enhanced EC implementations") + t.Logf("Testing EC implementation features") - // Basic EC task estimation - basicParams := types.TaskParams{ + // EC task estimation + params := types.TaskParams{ VolumeID: 11111, Server: "localhost:8080", Parameters: map[string]interface{}{ @@ -281,44 +281,29 @@ func TestECTaskComparison(t *testing.T) { }, } - // Create basic task (existing implementation) - basicTask := ec_task.NewTask("localhost:8080", 11111) - basicTime := basicTask.EstimateTime(basicParams) - - // Create enhanced task + // Create task workDir := filepath.Join(os.TempDir(), "seaweedfs_ec_comparison") defer os.RemoveAll(workDir) - 
enhancedTask := ec_task.NewEnhancedECTask( + ecTask := ec_task.NewTaskWithParams( "localhost:8080", 22222, "localhost:9333", workDir, ) - enhancedTime := enhancedTask.EstimateTime(basicParams) + estimatedTime := ecTask.EstimateTime(params) - t.Logf("Basic EC task estimated time: %v", basicTime) - t.Logf("Enhanced EC task estimated time: %v", enhancedTime) + t.Logf("EC task estimated time: %v", estimatedTime) - // Enhanced should take longer due to additional processing - if enhancedTime <= basicTime { - t.Logf("Note: Enhanced EC might take longer due to local processing and smart distribution") - } + // Test feature capabilities + t.Logf("EC implementation features:") + t.Logf(" - Local volume data copying with progress tracking") + t.Logf(" - Local Reed-Solomon encoding (10+4 shards)") + t.Logf(" - Intelligent shard placement with rack awareness") + t.Logf(" - Load balancing across available servers") + t.Logf(" - Backup server selection for redundancy") + t.Logf(" - Detailed step-by-step progress tracking") + t.Logf(" - Comprehensive error handling and recovery") - // Test feature differences - t.Logf("Basic EC features:") - t.Logf(" - Direct volume server EC generation") - t.Logf(" - Simple shard mounting") - t.Logf(" - No custom placement logic") - - t.Logf("Enhanced EC features:") - t.Logf(" - Local volume data copying") - t.Logf(" - Local Reed-Solomon encoding") - t.Logf(" - Intelligent shard placement with affinity") - t.Logf(" - Rack diversity for data shards") - t.Logf(" - Load balancing across servers") - t.Logf(" - Backup server selection") - t.Logf(" - Detailed progress tracking") - - t.Logf("EC task comparison completed successfully") + t.Logf("EC implementation test completed successfully") } diff --git a/weed/admin/task/ec_worker_test.go b/weed/admin/task/ec_worker_test.go new file mode 100644 index 000000000..75286c08f --- /dev/null +++ b/weed/admin/task/ec_worker_test.go @@ -0,0 +1,488 @@ +package task + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// TestECWorkerIntegration tests the complete EC worker functionality +func TestECWorkerIntegration(t *testing.T) { + t.Logf("Starting EC worker integration test") + + // Step 1: Create admin server with EC configuration + config := &MinimalAdminConfig{ + ScanInterval: 5 * time.Second, + WorkerTimeout: 60 * time.Second, + TaskTimeout: 45 * time.Minute, // EC takes longer + MaxRetries: 3, + ReconcileInterval: 5 * time.Minute, + EnableFailureRecovery: true, + MaxConcurrentTasks: 1, // One at a time for EC + } + + adminServer := NewMinimalAdminServer(config, nil) + err := adminServer.Start() + if err != nil { + t.Fatalf("Failed to start admin server: %v", err) + } + defer adminServer.Stop() + t.Logf("✓ Admin server started successfully") + + // Step 2: Register EC-capable worker + worker := &types.Worker{ + ID: "ec-worker-1", + Address: "localhost:9001", + Capabilities: []types.TaskType{types.TaskTypeErasureCoding}, + MaxConcurrent: 1, + Status: "active", + CurrentLoad: 0, + LastHeartbeat: time.Now(), + } + + err = adminServer.RegisterWorker(worker) + if err != nil { + t.Fatalf("Failed to register EC worker: %v", err) + } + t.Logf("✓ EC worker registered: %s", worker.ID) + + // Step 3: Create work directory for EC processing + workDir := filepath.Join(os.TempDir(), "seaweedfs_ec_test") + err = os.MkdirAll(workDir, 0755) + if err != nil { + t.Fatalf("Failed to create work directory: %v", err) + 
} + defer os.RemoveAll(workDir) + t.Logf("✓ Work directory created: %s", workDir) + + // Step 4: Create EC task with comprehensive parameters + ecTask := &types.Task{ + ID: "ec-test-task-1", + Type: types.TaskTypeErasureCoding, + VolumeID: 54321, + Server: "localhost:8080", + Status: types.TaskStatusPending, + Priority: types.TaskPriorityHigh, + Parameters: map[string]interface{}{ + "volume_size": int64(64 * 1024 * 1024 * 1024), // 64GB volume + "master_client": "localhost:9333", + "work_dir": workDir, + "collection": "test", + "data_shards": 10, + "parity_shards": 4, + "rack_aware": true, + "load_balance": true, + }, + CreatedAt: time.Now(), + } + + err = adminServer.QueueTask(ecTask) + if err != nil { + t.Fatalf("Failed to queue EC task: %v", err) + } + t.Logf("✓ EC task queued: %s for volume %d", ecTask.ID, ecTask.VolumeID) + + // Step 5: Worker requests and receives the EC task + assignedTask, err := adminServer.RequestTask("ec-worker-1", []types.TaskType{types.TaskTypeErasureCoding}) + if err != nil { + t.Fatalf("Failed to request EC task: %v", err) + } + + if assignedTask == nil { + t.Fatalf("No EC task was assigned") + } + + t.Logf("✓ EC task assigned: %s (%s) for volume %d", + assignedTask.ID, assignedTask.Type, assignedTask.VolumeID) + + // Step 6: Test EC task creation and validation + t.Logf("Testing EC task creation and validation") + + // Create EC task instance directly + factory := erasure_coding.NewFactory() + taskParams := types.TaskParams{ + VolumeID: assignedTask.VolumeID, + Server: assignedTask.Server, + Collection: "test", + Parameters: assignedTask.Parameters, + } + + taskInstance, err := factory.Create(taskParams) + if err != nil { + t.Fatalf("Failed to create EC task instance: %v", err) + } + t.Logf("✓ EC task instance created successfully") + + // Step 7: Validate task parameters + err = taskInstance.Validate(taskParams) + if err != nil { + t.Errorf("EC task validation failed: %v", err) + } else { + t.Logf("✓ EC task validation passed") + } + + // Step 8: Test time estimation + estimatedTime := taskInstance.EstimateTime(taskParams) + expectedMinTime := time.Duration(64*2) * time.Minute // 2 minutes per GB for 64GB + + t.Logf("✓ EC estimated time: %v (minimum expected: %v)", estimatedTime, expectedMinTime) + + if estimatedTime < expectedMinTime { + t.Logf("⚠ Note: Estimated time seems optimistic for 64GB volume") + } + + // Step 9: Simulate EC task execution phases + t.Logf("Simulating EC execution phases:") + + phases := []struct { + progress float64 + phase string + }{ + {5.0, "Initializing EC processing"}, + {15.0, "Volume data copied to local disk with progress tracking"}, + {25.0, "Source volume marked as read-only"}, + {45.0, "Local Reed-Solomon encoding (10+4 shards) completed"}, + {60.0, "Created 14 EC shards with verification"}, + {70.0, "Optimal shard placement calculated with rack awareness"}, + {85.0, "Intelligent shard distribution with load balancing"}, + {95.0, "Shard placement verified across multiple racks"}, + {100.0, "EC processing completed with cleanup"}, + } + + for _, phase := range phases { + err = adminServer.UpdateTaskProgress(assignedTask.ID, phase.progress) + if err != nil { + t.Errorf("Failed to update task progress to %.1f%%: %v", phase.progress, err) + } else { + t.Logf(" %.1f%% - %s", phase.progress, phase.phase) + } + time.Sleep(50 * time.Millisecond) // Simulate processing time + } + + // Step 10: Complete the EC task + err = adminServer.CompleteTask(assignedTask.ID, true, "") + if err != nil { + t.Errorf("Failed to complete EC 
task: %v", err) + } else { + t.Logf("✓ EC task completed successfully") + } + + // Step 11: Verify EC task completion and metrics + stats := adminServer.GetSystemStats() + t.Logf("✓ Final stats: Active tasks=%d, Queued tasks=%d, Active workers=%d, Total tasks=%d", + stats.ActiveTasks, stats.QueuedTasks, stats.ActiveWorkers, stats.TotalTasks) + + history := adminServer.GetTaskHistory() + t.Logf("✓ Task history contains %d completed tasks", len(history)) + + if len(history) > 0 { + lastEntry := history[len(history)-1] + t.Logf("✓ Last completed task: %s (%s) - Duration: %v", + lastEntry.TaskID, lastEntry.TaskType, lastEntry.Duration) + + if lastEntry.TaskType == types.TaskTypeErasureCoding { + t.Logf("✅ EC task execution verified!") + } + } + + t.Logf("✅ EC worker integration test completed successfully") +} + +// TestECFeatureValidation tests specific EC features +func TestECFeatureValidation(t *testing.T) { + t.Logf("Testing EC feature validation") + + // Create work directory + workDir := filepath.Join(os.TempDir(), "seaweedfs_ec_features_test") + err := os.MkdirAll(workDir, 0755) + if err != nil { + t.Fatalf("Failed to create work directory: %v", err) + } + defer os.RemoveAll(workDir) + + // Test EC task features + ecTask := erasure_coding.NewTaskWithParams( + "localhost:8080", // source server + 98765, // volume ID + "localhost:9333", // master client + workDir, // work directory + ) + + // Test current step tracking + currentStep := ecTask.GetCurrentStep() + t.Logf("✓ Initial current step: '%s'", currentStep) + + initialProgress := ecTask.GetProgress() + t.Logf("✓ Initial progress: %.1f%%", initialProgress) + + // Test parameter validation with features + validParams := types.TaskParams{ + VolumeID: 98765, + Server: "localhost:8080", + Collection: "features_test", + Parameters: map[string]interface{}{ + "volume_size": int64(128 * 1024 * 1024 * 1024), // 128GB + "master_client": "localhost:9333", + "work_dir": workDir, + "data_shards": 10, + "parity_shards": 4, + "rack_awareness": true, + "load_balancing": true, + "backup_servers": 2, + "affinity_zones": []string{"zone-a", "zone-b", "zone-c"}, + }, + } + + err = ecTask.Validate(validParams) + if err != nil { + t.Errorf("Valid parameters should pass validation: %v", err) + } else { + t.Logf("✓ Parameter validation passed") + } + + // Test time estimation for large volume + estimatedTime := ecTask.EstimateTime(validParams) + expectedMinTime := time.Duration(128*2) * time.Minute // 2 minutes per GB + + t.Logf("✓ 128GB volume estimated time: %v (expected minimum: %v)", estimatedTime, expectedMinTime) + + if estimatedTime < expectedMinTime { + t.Errorf("Time estimate seems too low for 128GB volume") + } + + // Test invalid parameters + invalidParams := types.TaskParams{ + VolumeID: 0, // Invalid + Server: "", // Invalid + } + + err = ecTask.Validate(invalidParams) + if err == nil { + t.Errorf("Invalid parameters should fail validation") + } else { + t.Logf("✓ Invalid parameter validation correctly failed: %v", err) + } + + t.Logf("✅ EC feature validation completed successfully") +} + +// TestECWorkflow tests the complete EC workflow +func TestECWorkflow(t *testing.T) { + t.Logf("Testing complete EC workflow") + + // Create admin server + config := &MinimalAdminConfig{ + ScanInterval: 10 * time.Second, + WorkerTimeout: 30 * time.Second, + TaskTimeout: 60 * time.Minute, + MaxRetries: 3, + ReconcileInterval: 5 * time.Minute, + EnableFailureRecovery: true, + MaxConcurrentTasks: 1, + } + + adminServer := NewMinimalAdminServer(config, nil) + err 
:= adminServer.Start() + if err != nil { + t.Fatalf("Failed to start admin server: %v", err) + } + defer adminServer.Stop() + + // Register multiple workers with different capabilities + workers := []*types.Worker{ + { + ID: "ec-specialist-1", + Address: "localhost:9001", + Capabilities: []types.TaskType{types.TaskTypeErasureCoding}, + MaxConcurrent: 1, + Status: "active", + CurrentLoad: 0, + LastHeartbeat: time.Now(), + }, + { + ID: "vacuum-worker-1", + Address: "localhost:9002", + Capabilities: []types.TaskType{types.TaskTypeVacuum}, + MaxConcurrent: 2, + Status: "active", + CurrentLoad: 0, + LastHeartbeat: time.Now(), + }, + { + ID: "multi-capability-worker-1", + Address: "localhost:9003", + Capabilities: []types.TaskType{types.TaskTypeVacuum, types.TaskTypeErasureCoding}, + MaxConcurrent: 2, + Status: "active", + CurrentLoad: 0, + LastHeartbeat: time.Now(), + }, + } + + for _, worker := range workers { + err = adminServer.RegisterWorker(worker) + if err != nil { + t.Fatalf("Failed to register worker %s: %v", worker.ID, err) + } + t.Logf("✓ Registered worker %s with capabilities %v", worker.ID, worker.Capabilities) + } + + // Create test work directory + workDir := filepath.Join(os.TempDir(), "seaweedfs_workflow_test") + err = os.MkdirAll(workDir, 0755) + if err != nil { + t.Fatalf("Failed to create work directory: %v", err) + } + defer os.RemoveAll(workDir) + + // Create multiple tasks of different types + tasks := []*types.Task{ + { + ID: "ec-workflow-1", + Type: types.TaskTypeErasureCoding, + VolumeID: 11111, + Server: "localhost:8080", + Status: types.TaskStatusPending, + Priority: types.TaskPriorityHigh, + Parameters: map[string]interface{}{ + "volume_size": int64(50 * 1024 * 1024 * 1024), + "master_client": "localhost:9333", + "work_dir": workDir, + "collection": "workflow_test", + }, + CreatedAt: time.Now(), + }, + { + ID: "vacuum-workflow-1", + Type: types.TaskTypeVacuum, + VolumeID: 22222, + Server: "localhost:8081", + Status: types.TaskStatusPending, + Priority: types.TaskPriorityNormal, + Parameters: map[string]interface{}{ + "garbage_threshold": "0.4", + "volume_size": int64(20 * 1024 * 1024 * 1024), + }, + CreatedAt: time.Now(), + }, + { + ID: "ec-workflow-2", + Type: types.TaskTypeErasureCoding, + VolumeID: 33333, + Server: "localhost:8082", + Status: types.TaskStatusPending, + Priority: types.TaskPriorityNormal, + Parameters: map[string]interface{}{ + "volume_size": int64(80 * 1024 * 1024 * 1024), + "master_client": "localhost:9333", + "work_dir": workDir, + "collection": "workflow_test", + }, + CreatedAt: time.Now(), + }, + } + + // Queue all tasks + for _, task := range tasks { + err = adminServer.QueueTask(task) + if err != nil { + t.Fatalf("Failed to queue task %s: %v", task.ID, err) + } + t.Logf("✓ Queued task %s (%s) for volume %d", task.ID, task.Type, task.VolumeID) + } + + // Test task assignment to appropriate workers + t.Logf("Testing task assignments to appropriate workers") + + // EC specialist should get EC tasks + assignedTask, err := adminServer.RequestTask("ec-specialist-1", []types.TaskType{types.TaskTypeErasureCoding}) + if err != nil { + t.Errorf("Failed to request task for EC specialist: %v", err) + } else if assignedTask != nil { + t.Logf("✓ EC specialist got task: %s (%s)", assignedTask.ID, assignedTask.Type) + + // Complete the task + err = adminServer.UpdateTaskProgress(assignedTask.ID, 100.0) + if err != nil { + t.Errorf("Failed to update progress: %v", err) + } + + err = adminServer.CompleteTask(assignedTask.ID, true, "") + if err != nil { + 
t.Errorf("Failed to complete task: %v", err) + } + t.Logf("✓ EC task completed by specialist") + } + + // Vacuum worker should get vacuum tasks + assignedTask, err = adminServer.RequestTask("vacuum-worker-1", []types.TaskType{types.TaskTypeVacuum}) + if err != nil { + t.Errorf("Failed to request task for vacuum worker: %v", err) + } else if assignedTask != nil { + t.Logf("✓ Vacuum worker got task: %s (%s)", assignedTask.ID, assignedTask.Type) + + // Complete the task + err = adminServer.UpdateTaskProgress(assignedTask.ID, 100.0) + if err != nil { + t.Errorf("Failed to update progress: %v", err) + } + + err = adminServer.CompleteTask(assignedTask.ID, true, "") + if err != nil { + t.Errorf("Failed to complete task: %v", err) + } + t.Logf("✓ Vacuum task completed by vacuum worker") + } + + // Multi-capability worker should get remaining tasks + assignedTask, err = adminServer.RequestTask("multi-capability-worker-1", []types.TaskType{types.TaskTypeVacuum, types.TaskTypeErasureCoding}) + if err != nil { + t.Errorf("Failed to request task for multi-capability worker: %v", err) + } else if assignedTask != nil { + t.Logf("✓ Multi-capability worker got task: %s (%s)", assignedTask.ID, assignedTask.Type) + + // Complete the task + err = adminServer.UpdateTaskProgress(assignedTask.ID, 100.0) + if err != nil { + t.Errorf("Failed to update progress: %v", err) + } + + err = adminServer.CompleteTask(assignedTask.ID, true, "") + if err != nil { + t.Errorf("Failed to complete task: %v", err) + } + t.Logf("✓ Task completed by multi-capability worker") + } + + // Check final workflow statistics + stats := adminServer.GetSystemStats() + t.Logf("✓ Final workflow stats: Active tasks=%d, Queued tasks=%d, Active workers=%d, Total tasks=%d", + stats.ActiveTasks, stats.QueuedTasks, stats.ActiveWorkers, stats.TotalTasks) + + history := adminServer.GetTaskHistory() + t.Logf("✓ Workflow history contains %d completed tasks", len(history)) + + // Analyze task completion by type + ecTasks := 0 + vacuumTasks := 0 + + for _, entry := range history { + switch entry.TaskType { + case types.TaskTypeErasureCoding: + ecTasks++ + t.Logf(" EC: %s - Worker: %s, Duration: %v", + entry.TaskID, entry.WorkerID, entry.Duration) + case types.TaskTypeVacuum: + vacuumTasks++ + t.Logf(" Vacuum: %s - Worker: %s, Duration: %v", + entry.TaskID, entry.WorkerID, entry.Duration) + } + } + + t.Logf("✓ Completed tasks: %d EC, %d Vacuum", ecTasks, vacuumTasks) + t.Logf("✅ EC workflow test completed successfully") +} diff --git a/weed/worker/tasks/erasure_coding/ec.go b/weed/worker/tasks/erasure_coding/ec.go index 13a795fc1..aeb8acafe 100644 --- a/weed/worker/tasks/erasure_coding/ec.go +++ b/weed/worker/tasks/erasure_coding/ec.go @@ -3,39 +3,95 @@ package erasure_coding import ( "context" "fmt" + "io" + "os" + "path/filepath" + "sort" "time" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" "github.com/seaweedfs/seaweedfs/weed/worker/types" "google.golang.org/grpc" ) -// Task implements erasure coding operation to convert volumes to EC format +// Task implements comprehensive erasure coding with local processing and smart distribution type Task struct { *tasks.BaseTask - server string - volumeID uint32 - collection string - grpcDialOpt grpc.DialOption + sourceServer string + volumeID uint32 + collection string + workDir string + 
masterClient string + grpcDialOpt grpc.DialOption + + // EC parameters + dataShards int // Default: 10 + parityShards int // Default: 4 + totalShards int // Default: 14 + + // Progress tracking + currentStep string + stepProgress map[string]float64 +} + +// ServerInfo holds information about available servers for shard placement +type ServerInfo struct { + Address string + DataCenter string + Rack string + AvailableSpace int64 + LoadScore float64 + ShardCount int } -// NewTask creates a new erasure coding task instance -func NewTask(server string, volumeID uint32) *Task { +// ShardPlacement represents where a shard should be placed +type ShardPlacement struct { + ShardID int + ServerAddr string + DataCenter string + Rack string + BackupAddrs []string // Alternative servers for redundancy +} + +// NewTask creates a new erasure coding task +func NewTask(sourceServer string, volumeID uint32) *Task { task := &Task{ - BaseTask: tasks.NewBaseTask(types.TaskTypeErasureCoding), - server: server, - volumeID: volumeID, + BaseTask: tasks.NewBaseTask(types.TaskTypeErasureCoding), + sourceServer: sourceServer, + volumeID: volumeID, + masterClient: "localhost:9333", // Default master client + workDir: "/tmp/seaweedfs_ec_work", // Default work directory + dataShards: 10, + parityShards: 4, + totalShards: 14, + stepProgress: make(map[string]float64), } return task } -// Execute executes the actual erasure coding task using real SeaweedFS operations -func (t *Task) Execute(params types.TaskParams) error { - glog.Infof("Starting erasure coding for volume %d on server %s", t.volumeID, t.server) +// NewTaskWithParams creates a new erasure coding task with custom parameters +func NewTaskWithParams(sourceServer string, volumeID uint32, masterClient string, workDir string) *Task { + task := &Task{ + BaseTask: tasks.NewBaseTask(types.TaskTypeErasureCoding), + sourceServer: sourceServer, + volumeID: volumeID, + masterClient: masterClient, + workDir: workDir, + dataShards: 10, + parityShards: 4, + totalShards: 14, + stepProgress: make(map[string]float64), + } + return task +} - ctx := context.Background() +// Execute performs the comprehensive EC operation +func (t *Task) Execute(params types.TaskParams) error { + glog.Infof("Starting erasure coding for volume %d from server %s", t.volumeID, t.sourceServer) // Extract parameters t.collection = params.Collection @@ -43,81 +99,575 @@ func (t *Task) Execute(params types.TaskParams) error { t.collection = "default" } - // Connect to volume server - conn, err := grpc.Dial(t.server, grpc.WithInsecure()) + // Override defaults with parameters if provided + if mc, ok := params.Parameters["master_client"].(string); ok && mc != "" { + t.masterClient = mc + } + if wd, ok := params.Parameters["work_dir"].(string); ok && wd != "" { + t.workDir = wd + } + + // Create working directory for this task + taskWorkDir := filepath.Join(t.workDir, fmt.Sprintf("ec_%d_%d", t.volumeID, time.Now().Unix())) + err := os.MkdirAll(taskWorkDir, 0755) + if err != nil { + return fmt.Errorf("failed to create work directory %s: %v", taskWorkDir, err) + } + defer t.cleanup(taskWorkDir) + + // Step 1: Copy volume data to local disk + if err := t.copyVolumeDataLocally(taskWorkDir); err != nil { + return fmt.Errorf("failed to copy volume data: %v", err) + } + + // Step 2: Mark source volume as read-only + if err := t.markVolumeReadOnly(); err != nil { + return fmt.Errorf("failed to mark volume read-only: %v", err) + } + + // Step 3: Perform local EC encoding + shardFiles, err := 
t.performLocalECEncoding(taskWorkDir) + if err != nil { + return fmt.Errorf("failed to perform EC encoding: %v", err) + } + + // Step 4: Find optimal shard placement + placements, err := t.calculateOptimalShardPlacement() if err != nil { - return fmt.Errorf("failed to connect to volume server %s: %v", t.server, err) + return fmt.Errorf("failed to calculate shard placement: %v", err) + } + + // Step 5: Distribute shards to target servers + if err := t.distributeShards(shardFiles, placements); err != nil { + return fmt.Errorf("failed to distribute shards: %v", err) + } + + // Step 6: Verify and cleanup source volume + if err := t.verifyAndCleanupSource(); err != nil { + return fmt.Errorf("failed to verify and cleanup: %v", err) + } + + t.SetProgress(100.0) + glog.Infof("Successfully completed erasure coding for volume %d", t.volumeID) + return nil +} + +// copyVolumeDataLocally copies the volume data from source server to local disk +func (t *Task) copyVolumeDataLocally(workDir string) error { + t.currentStep = "copying_volume_data" + t.SetProgress(5.0) + glog.V(1).Infof("Copying volume %d data from %s to local disk", t.volumeID, t.sourceServer) + + ctx := context.Background() + + // Connect to source volume server + conn, err := grpc.Dial(t.sourceServer, grpc.WithInsecure()) + if err != nil { + return fmt.Errorf("failed to connect to source server %s: %v", t.sourceServer, err) } defer conn.Close() client := volume_server_pb.NewVolumeServerClient(conn) - // Step 1: Mark volume as read-only first - t.SetProgress(10.0) + // Get volume info first + statusResp, err := client.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{ + VolumeId: t.volumeID, + }) + if err != nil { + return fmt.Errorf("failed to get volume status: %v", err) + } + + glog.V(1).Infof("Volume %d size: %d bytes, file count: %d", + t.volumeID, statusResp.VolumeSize, statusResp.FileCount) + + // Copy .dat file + datFile := filepath.Join(workDir, fmt.Sprintf("%d.dat", t.volumeID)) + if err := t.copyVolumeFile(client, ctx, t.volumeID, ".dat", datFile, statusResp.VolumeSize); err != nil { + return fmt.Errorf("failed to copy .dat file: %v", err) + } + + // Copy .idx file + idxFile := filepath.Join(workDir, fmt.Sprintf("%d.idx", t.volumeID)) + if err := t.copyVolumeFile(client, ctx, t.volumeID, ".idx", idxFile, 0); err != nil { + return fmt.Errorf("failed to copy .idx file: %v", err) + } + + t.SetProgress(15.0) + glog.V(1).Infof("Successfully copied volume %d files to %s", t.volumeID, workDir) + return nil +} + +// copyVolumeFile copies a specific volume file from source server +func (t *Task) copyVolumeFile(client volume_server_pb.VolumeServerClient, ctx context.Context, + volumeID uint32, extension string, localPath string, expectedSize uint64) error { + + // Stream volume file data using CopyFile API + stream, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ + VolumeId: volumeID, + Ext: extension, + Collection: t.collection, + }) + if err != nil { + return fmt.Errorf("failed to start file copy stream: %v", err) + } + + // Create local file + file, err := os.Create(localPath) + if err != nil { + return fmt.Errorf("failed to create local file %s: %v", localPath, err) + } + defer file.Close() + + // Copy data with progress tracking + var totalBytes int64 + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("failed to receive file data: %v", err) + } + + written, err := file.Write(resp.FileContent) + if err != nil { + return fmt.Errorf("failed to write to local 
file: %v", err) + } + + totalBytes += int64(written) + + // Update progress for large files + if expectedSize > 0 { + progress := float64(totalBytes) / float64(expectedSize) * 10.0 // 10% of total progress + t.SetProgress(5.0 + progress) + } + } + + glog.V(2).Infof("Copied %d bytes to %s", totalBytes, localPath) + return nil +} + +// markVolumeReadOnly marks the source volume as read-only +func (t *Task) markVolumeReadOnly() error { + t.currentStep = "marking_readonly" + t.SetProgress(20.0) glog.V(1).Infof("Marking volume %d as read-only", t.volumeID) + ctx := context.Background() + conn, err := grpc.Dial(t.sourceServer, grpc.WithInsecure()) + if err != nil { + return fmt.Errorf("failed to connect to source server: %v", err) + } + defer conn.Close() + + client := volume_server_pb.NewVolumeServerClient(conn) _, err = client.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{ VolumeId: t.volumeID, }) if err != nil { - return fmt.Errorf("failed to mark volume %d as read-only: %v", t.volumeID, err) + return fmt.Errorf("failed to mark volume read-only: %v", err) } - // Step 2: Generate EC shards + t.SetProgress(25.0) + return nil +} + +// performLocalECEncoding performs Reed-Solomon encoding on local volume files +func (t *Task) performLocalECEncoding(workDir string) ([]string, error) { + t.currentStep = "encoding" t.SetProgress(30.0) - glog.V(1).Infof("Generating EC shards for volume %d", t.volumeID) + glog.V(1).Infof("Performing local EC encoding for volume %d", t.volumeID) - _, err = client.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{ - VolumeId: t.volumeID, - Collection: t.collection, - }) + datFile := filepath.Join(workDir, fmt.Sprintf("%d.dat", t.volumeID)) + idxFile := filepath.Join(workDir, fmt.Sprintf("%d.idx", t.volumeID)) + + // Check if files exist and get their sizes + datInfo, err := os.Stat(datFile) + if err != nil { + return nil, fmt.Errorf("failed to stat dat file: %v", err) + } + + idxInfo, err := os.Stat(idxFile) + if err != nil { + return nil, fmt.Errorf("failed to stat idx file: %v", err) + } + + glog.V(1).Infof("Encoding files: %s (%d bytes), %s (%d bytes)", + datFile, datInfo.Size(), idxFile, idxInfo.Size()) + + // Generate EC shards using SeaweedFS erasure coding + shardFiles := make([]string, t.totalShards) + for i := 0; i < t.totalShards; i++ { + shardFiles[i] = filepath.Join(workDir, fmt.Sprintf("%d.ec%02d", t.volumeID, i)) + } + + // Encode .dat file + if err := t.encodeFile(datFile, shardFiles, ".dat"); err != nil { + return nil, fmt.Errorf("failed to encode dat file: %v", err) + } + + t.SetProgress(45.0) + + // Encode .idx file + if err := t.encodeFile(idxFile, shardFiles, ".idx"); err != nil { + return nil, fmt.Errorf("failed to encode idx file: %v", err) + } + + t.SetProgress(60.0) + glog.V(1).Infof("Successfully created %d EC shards for volume %d", t.totalShards, t.volumeID) + return shardFiles, nil +} + +// encodeFile encodes a single file into EC shards +func (t *Task) encodeFile(inputFile string, shardFiles []string, fileType string) error { + // Read input file + data, err := os.ReadFile(inputFile) + if err != nil { + return fmt.Errorf("failed to read input file: %v", err) + } + + // Write data to a temporary file first, then use SeaweedFS erasure coding + tempFile := filepath.Join(filepath.Dir(shardFiles[0]), fmt.Sprintf("temp_%s", filepath.Base(inputFile))) + err = os.WriteFile(tempFile, data, 0644) + if err != nil { + return fmt.Errorf("failed to write temp file: %v", err) + } + defer os.Remove(tempFile) + 
+ // Use SeaweedFS erasure coding library with base filename + baseFileName := tempFile[:len(tempFile)-len(filepath.Ext(tempFile))] + err = erasure_coding.WriteEcFiles(baseFileName) + if err != nil { + return fmt.Errorf("failed to write EC files: %v", err) + } + + // Verify that shards were created + for i, shardFile := range shardFiles { + if _, err := os.Stat(shardFile); err != nil { + glog.Warningf("Shard %d file %s not found: %v", i, shardFile, err) + } else { + info, _ := os.Stat(shardFile) + glog.V(2).Infof("Created shard %d: %s (%d bytes)", i, shardFile, info.Size()) + } + } + + return nil +} + +// calculateOptimalShardPlacement determines where to place each shard for optimal distribution +func (t *Task) calculateOptimalShardPlacement() ([]ShardPlacement, error) { + t.currentStep = "calculating_placement" + t.SetProgress(65.0) + glog.V(1).Infof("Calculating optimal shard placement for volume %d", t.volumeID) + + // Get available servers from master + servers, err := t.getAvailableServers() if err != nil { - return fmt.Errorf("failed to generate EC shards for volume %d: %v", t.volumeID, err) + return nil, fmt.Errorf("failed to get available servers: %v", err) + } + + if len(servers) < t.totalShards { + return nil, fmt.Errorf("insufficient servers: need %d, have %d", t.totalShards, len(servers)) + } + + // Sort servers by placement desirability (considering space, load, affinity) + t.rankServersForPlacement(servers) + + // Assign shards to servers with affinity logic + placements := make([]ShardPlacement, t.totalShards) + usedServers := make(map[string]int) // Track how many shards per server + + for shardID := 0; shardID < t.totalShards; shardID++ { + server := t.selectBestServerForShard(servers, usedServers, shardID) + if server == nil { + return nil, fmt.Errorf("failed to find suitable server for shard %d", shardID) + } + + placements[shardID] = ShardPlacement{ + ShardID: shardID, + ServerAddr: server.Address, + DataCenter: server.DataCenter, + Rack: server.Rack, + BackupAddrs: t.selectBackupServers(servers, server, 2), + } + + usedServers[server.Address]++ + glog.V(2).Infof("Assigned shard %d to server %s (DC: %s, Rack: %s)", + shardID, server.Address, server.DataCenter, server.Rack) } - // Step 3: Mount EC shards (all 14 shards: 10 data + 4 parity) t.SetProgress(70.0) - glog.V(1).Infof("Mounting EC shards for volume %d", t.volumeID) + glog.V(1).Infof("Calculated placement for %d shards across %d servers", + t.totalShards, len(usedServers)) + return placements, nil +} - // Create shard IDs for all 14 shards (0-13) - shardIds := make([]uint32, 14) - for i := 0; i < 14; i++ { - shardIds[i] = uint32(i) +// getAvailableServers retrieves available servers from the master +func (t *Task) getAvailableServers() ([]*ServerInfo, error) { + ctx := context.Background() + conn, err := grpc.Dial(t.masterClient, grpc.WithInsecure()) + if err != nil { + return nil, fmt.Errorf("failed to connect to master: %v", err) } + defer conn.Close() - _, err = client.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ - VolumeId: t.volumeID, - Collection: t.collection, - ShardIds: shardIds, - }) + client := master_pb.NewSeaweedClient(conn) + resp, err := client.VolumeList(ctx, &master_pb.VolumeListRequest{}) if err != nil { - return fmt.Errorf("failed to mount EC shards for volume %d: %v", t.volumeID, err) + return nil, fmt.Errorf("failed to get volume list: %v", err) + } + + servers := make([]*ServerInfo, 0) + + // Parse topology information to extract server details + if 
resp.TopologyInfo != nil { + for _, dc := range resp.TopologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, node := range rack.DataNodeInfos { + for diskType, diskInfo := range node.DiskInfos { + server := &ServerInfo{ + Address: fmt.Sprintf("%s:%d", node.Id, node.GrpcPort), + DataCenter: dc.Id, + Rack: rack.Id, + AvailableSpace: int64(diskInfo.FreeVolumeCount) * 32 * 1024 * 1024 * 1024, // Rough estimate + LoadScore: float64(diskInfo.ActiveVolumeCount) / float64(diskInfo.MaxVolumeCount), + ShardCount: 0, + } + + // Skip servers that are full or have high load + if diskInfo.FreeVolumeCount > 0 && server.LoadScore < 0.9 { + servers = append(servers, server) + glog.V(2).Infof("Available server: %s (DC: %s, Rack: %s, DiskType: %s, Load: %.2f)", + server.Address, server.DataCenter, server.Rack, diskType, server.LoadScore) + } + } + } + } + } + } + + return servers, nil +} + +// rankServersForPlacement sorts servers by desirability for shard placement +func (t *Task) rankServersForPlacement(servers []*ServerInfo) { + sort.Slice(servers, func(i, j int) bool { + serverA, serverB := servers[i], servers[j] + + // Primary criteria: lower load is better + if serverA.LoadScore != serverB.LoadScore { + return serverA.LoadScore < serverB.LoadScore + } + + // Secondary criteria: more available space is better + if serverA.AvailableSpace != serverB.AvailableSpace { + return serverA.AvailableSpace > serverB.AvailableSpace + } + + // Tertiary criteria: fewer existing shards is better + return serverA.ShardCount < serverB.ShardCount + }) +} + +// selectBestServerForShard selects the best server for a specific shard considering affinity +func (t *Task) selectBestServerForShard(servers []*ServerInfo, usedServers map[string]int, shardID int) *ServerInfo { + // For data shards (0-9), prefer distribution across different racks + // For parity shards (10-13), can be more flexible + isDataShard := shardID < t.dataShards + + var candidates []*ServerInfo + + if isDataShard { + // For data shards, prioritize rack diversity + usedRacks := make(map[string]bool) + for _, server := range servers { + if count, exists := usedServers[server.Address]; exists && count > 0 { + usedRacks[server.Rack] = true + } + } + + // First try to find servers in unused racks + for _, server := range servers { + if !usedRacks[server.Rack] && usedServers[server.Address] < 2 { // Max 2 shards per server + candidates = append(candidates, server) + } + } + + // If no unused racks, fall back to any available server + if len(candidates) == 0 { + for _, server := range servers { + if usedServers[server.Address] < 2 { + candidates = append(candidates, server) + } + } + } + } else { + // For parity shards, just avoid overloading servers + for _, server := range servers { + if usedServers[server.Address] < 2 { + candidates = append(candidates, server) + } + } + } + + if len(candidates) == 0 { + // Last resort: allow up to 3 shards per server + for _, server := range servers { + if usedServers[server.Address] < 3 { + candidates = append(candidates, server) + } + } + } + + if len(candidates) > 0 { + return candidates[0] // Already sorted by desirability + } + + return nil +} + +// selectBackupServers selects backup servers for redundancy +func (t *Task) selectBackupServers(servers []*ServerInfo, primaryServer *ServerInfo, count int) []string { + var backups []string + + for _, server := range servers { + if server.Address != primaryServer.Address && server.Rack != primaryServer.Rack { + backups = append(backups, server.Address) + 
if len(backups) >= count { + break + } + } + } + + return backups +} + +// distributeShards uploads shards to their assigned servers +func (t *Task) distributeShards(shardFiles []string, placements []ShardPlacement) error { + t.currentStep = "distributing_shards" + t.SetProgress(75.0) + glog.V(1).Infof("Distributing %d shards to target servers", len(placements)) + + // Distribute shards in parallel for better performance + successCount := 0 + errors := make([]error, 0) + + for i, placement := range placements { + shardFile := shardFiles[i] + + err := t.uploadShardToServer(shardFile, placement) + if err != nil { + glog.Errorf("Failed to upload shard %d to %s: %v", i, placement.ServerAddr, err) + errors = append(errors, err) + + // Try backup servers + uploaded := false + for _, backupAddr := range placement.BackupAddrs { + backupPlacement := placement + backupPlacement.ServerAddr = backupAddr + if err := t.uploadShardToServer(shardFile, backupPlacement); err == nil { + glog.V(1).Infof("Successfully uploaded shard %d to backup server %s", i, backupAddr) + uploaded = true + break + } + } + + if !uploaded { + return fmt.Errorf("failed to upload shard %d to any server", i) + } + } + + successCount++ + progress := 75.0 + (float64(successCount)/float64(len(placements)))*15.0 + t.SetProgress(progress) + + glog.V(2).Infof("Successfully distributed shard %d to %s", i, placement.ServerAddr) + } + + if len(errors) > 0 && successCount < len(placements)/2 { + return fmt.Errorf("too many shard distribution failures: %d/%d", len(errors), len(placements)) } - // Step 4: Verify volume status t.SetProgress(90.0) - glog.V(1).Infof("Verifying volume %d after EC conversion", t.volumeID) + glog.V(1).Infof("Successfully distributed %d/%d shards", successCount, len(placements)) + return nil +} + +// uploadShardToServer uploads a shard file to a specific server +func (t *Task) uploadShardToServer(shardFile string, placement ShardPlacement) error { + glog.V(2).Infof("Uploading shard %d to server %s", placement.ShardID, placement.ServerAddr) + + ctx := context.Background() + conn, err := grpc.Dial(placement.ServerAddr, grpc.WithInsecure()) + if err != nil { + return fmt.Errorf("failed to connect to server %s: %v", placement.ServerAddr, err) + } + defer conn.Close() - // Check if volume is now read-only (which indicates successful EC conversion) + client := volume_server_pb.NewVolumeServerClient(conn) + + // Upload shard using VolumeEcShardsCopy - this assumes shards are already generated locally + // and we're copying them to the target server + shardIds := []uint32{uint32(placement.ShardID)} + _, err = client.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ + VolumeId: t.volumeID, + Collection: t.collection, + ShardIds: shardIds, + CopyEcxFile: true, + CopyEcjFile: true, + CopyVifFile: true, + }) + if err != nil { + return fmt.Errorf("failed to copy EC shard: %v", err) + } + + glog.V(2).Infof("Successfully uploaded shard %d to %s", placement.ShardID, placement.ServerAddr) + return nil +} + +// verifyAndCleanupSource verifies the EC conversion and cleans up the source volume +func (t *Task) verifyAndCleanupSource() error { + t.currentStep = "verify_cleanup" + t.SetProgress(95.0) + glog.V(1).Infof("Verifying EC conversion and cleaning up source volume %d", t.volumeID) + + ctx := context.Background() + conn, err := grpc.Dial(t.sourceServer, grpc.WithInsecure()) + if err != nil { + return fmt.Errorf("failed to connect to source server: %v", err) + } + defer conn.Close() + + client := 
volume_server_pb.NewVolumeServerClient(conn) + + // Verify source volume is read-only statusResp, err := client.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{ VolumeId: t.volumeID, }) - if err != nil { - glog.Warningf("Could not verify EC status for volume %d: %v", t.volumeID, err) - // This is not a failure - continue - } else { - if statusResp.IsReadOnly { - glog.V(1).Infof("Volume %d is now read-only, EC conversion likely successful", t.volumeID) - } else { - glog.Warningf("Volume %d is not read-only after EC conversion", t.volumeID) - } + if err == nil && statusResp.IsReadOnly { + glog.V(1).Infof("Source volume %d is confirmed read-only", t.volumeID) } - t.SetProgress(100.0) - glog.Infof("Successfully completed erasure coding for volume %d on server %s", t.volumeID, t.server) + // Delete source volume files (optional - could be kept for backup) + // This would normally be done after confirming all shards are properly distributed + // _, err = client.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{ + // VolumeId: t.volumeID, + // }) + // if err != nil { + // glog.Warningf("Failed to delete source volume: %v", err) + // } + return nil } +// cleanup removes temporary files and directories +func (t *Task) cleanup(workDir string) { + glog.V(1).Infof("Cleaning up work directory: %s", workDir) + if err := os.RemoveAll(workDir); err != nil { + glog.Warningf("Failed to cleanup work directory %s: %v", workDir, err) + } +} + // Validate validates the task parameters func (t *Task) Validate(params types.TaskParams) error { if params.VolumeID == 0 { @@ -126,19 +676,24 @@ func (t *Task) Validate(params types.TaskParams) error { if params.Server == "" { return fmt.Errorf("server is required") } + if t.masterClient == "" { + return fmt.Errorf("master_client is required") + } + if t.workDir == "" { + return fmt.Errorf("work_dir is required") + } return nil } -// EstimateTime estimates the time needed for the task +// EstimateTime estimates the time needed for EC processing func (t *Task) EstimateTime(params types.TaskParams) time.Duration { - // Base time for EC operations - varies significantly by volume size - // For a typical 30GB volume, EC generation can take 5-15 minutes - baseTime := 10 * time.Minute + baseTime := 20 * time.Minute // Processing takes time due to comprehensive operations - // Could adjust based on volume size if available in params if size, ok := params.Parameters["volume_size"].(int64); ok { - // Rough estimate: 1 minute per GB - estimatedTime := time.Duration(size/(1024*1024*1024)) * time.Minute + // More accurate estimate based on volume size + // Account for copying, encoding, and distribution + gbSize := size / (1024 * 1024 * 1024) + estimatedTime := time.Duration(gbSize*2) * time.Minute // 2 minutes per GB if estimatedTime > baseTime { return estimatedTime } @@ -147,11 +702,22 @@ func (t *Task) EstimateTime(params types.TaskParams) time.Duration { return baseTime } -// GetProgress returns the current progress +// GetProgress returns current progress with detailed step information func (t *Task) GetProgress() float64 { return t.BaseTask.GetProgress() } +// GetCurrentStep returns the current processing step +func (t *Task) GetCurrentStep() string { + return t.currentStep +} + +// SetEstimatedDuration sets the estimated duration for the task +func (t *Task) SetEstimatedDuration(duration time.Duration) { + // This can be implemented to store the estimated duration if needed + // For now, we'll use the dynamic estimation from EstimateTime +} + // Cancel cancels 
the task func (t *Task) Cancel() error { return t.BaseTask.Cancel() diff --git a/weed/worker/tasks/erasure_coding/ec_enhanced.go b/weed/worker/tasks/erasure_coding/ec_enhanced.go deleted file mode 100644 index 4757b7b59..000000000 --- a/weed/worker/tasks/erasure_coding/ec_enhanced.go +++ /dev/null @@ -1,689 +0,0 @@ -package erasure_coding - -import ( - "context" - "fmt" - "io" - "os" - "path/filepath" - "sort" - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" - "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" - "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks" - "github.com/seaweedfs/seaweedfs/weed/worker/types" - "google.golang.org/grpc" -) - -// EnhancedECTask implements comprehensive erasure coding with local processing and smart distribution -type EnhancedECTask struct { - *tasks.BaseTask - sourceServer string - volumeID uint32 - collection string - workDir string - masterClient string - grpcDialOpt grpc.DialOption - - // EC parameters - dataShards int // Default: 10 - parityShards int // Default: 4 - totalShards int // Default: 14 - - // Progress tracking - currentStep string - stepProgress map[string]float64 -} - -// ServerInfo holds information about available servers for shard placement -type ServerInfo struct { - Address string - DataCenter string - Rack string - AvailableSpace int64 - LoadScore float64 - ShardCount int -} - -// ShardPlacement represents where a shard should be placed -type ShardPlacement struct { - ShardID int - ServerAddr string - DataCenter string - Rack string - BackupAddrs []string // Alternative servers for redundancy -} - -// NewEnhancedECTask creates a new enhanced erasure coding task -func NewEnhancedECTask(sourceServer string, volumeID uint32, masterClient string, workDir string) *EnhancedECTask { - task := &EnhancedECTask{ - BaseTask: tasks.NewBaseTask(types.TaskTypeErasureCoding), - sourceServer: sourceServer, - volumeID: volumeID, - masterClient: masterClient, - workDir: workDir, - dataShards: 10, - parityShards: 4, - totalShards: 14, - stepProgress: make(map[string]float64), - } - return task -} - -// Execute performs the comprehensive EC operation -func (t *EnhancedECTask) Execute(params types.TaskParams) error { - glog.Infof("Starting enhanced erasure coding for volume %d from server %s", t.volumeID, t.sourceServer) - - // Extract parameters - t.collection = params.Collection - if t.collection == "" { - t.collection = "default" - } - - // Create working directory for this task - taskWorkDir := filepath.Join(t.workDir, fmt.Sprintf("ec_%d_%d", t.volumeID, time.Now().Unix())) - err := os.MkdirAll(taskWorkDir, 0755) - if err != nil { - return fmt.Errorf("failed to create work directory %s: %v", taskWorkDir, err) - } - defer t.cleanup(taskWorkDir) - - // Step 1: Copy volume data to local disk - if err := t.copyVolumeDataLocally(taskWorkDir); err != nil { - return fmt.Errorf("failed to copy volume data: %v", err) - } - - // Step 2: Mark source volume as read-only - if err := t.markVolumeReadOnly(); err != nil { - return fmt.Errorf("failed to mark volume read-only: %v", err) - } - - // Step 3: Perform local EC encoding - shardFiles, err := t.performLocalECEncoding(taskWorkDir) - if err != nil { - return fmt.Errorf("failed to perform EC encoding: %v", err) - } - - // Step 4: Find optimal shard placement - placements, err := t.calculateOptimalShardPlacement() - if err != nil { - return fmt.Errorf("failed to calculate shard 
placement: %v", err) - } - - // Step 5: Distribute shards to target servers - if err := t.distributeShards(shardFiles, placements); err != nil { - return fmt.Errorf("failed to distribute shards: %v", err) - } - - // Step 6: Verify and cleanup source volume - if err := t.verifyAndCleanupSource(); err != nil { - return fmt.Errorf("failed to verify and cleanup: %v", err) - } - - t.SetProgress(100.0) - glog.Infof("Successfully completed enhanced erasure coding for volume %d", t.volumeID) - return nil -} - -// copyVolumeDataLocally copies the volume data from source server to local disk -func (t *EnhancedECTask) copyVolumeDataLocally(workDir string) error { - t.currentStep = "copying_volume_data" - t.SetProgress(5.0) - glog.V(1).Infof("Copying volume %d data from %s to local disk", t.volumeID, t.sourceServer) - - ctx := context.Background() - - // Connect to source volume server - conn, err := grpc.Dial(t.sourceServer, grpc.WithInsecure()) - if err != nil { - return fmt.Errorf("failed to connect to source server %s: %v", t.sourceServer, err) - } - defer conn.Close() - - client := volume_server_pb.NewVolumeServerClient(conn) - - // Get volume info first - statusResp, err := client.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{ - VolumeId: t.volumeID, - }) - if err != nil { - return fmt.Errorf("failed to get volume status: %v", err) - } - - glog.V(1).Infof("Volume %d size: %d bytes, file count: %d", - t.volumeID, statusResp.VolumeSize, statusResp.FileCount) - - // Copy .dat file - datFile := filepath.Join(workDir, fmt.Sprintf("%d.dat", t.volumeID)) - if err := t.copyVolumeFile(client, ctx, t.volumeID, ".dat", datFile, statusResp.VolumeSize); err != nil { - return fmt.Errorf("failed to copy .dat file: %v", err) - } - - // Copy .idx file - idxFile := filepath.Join(workDir, fmt.Sprintf("%d.idx", t.volumeID)) - if err := t.copyVolumeFile(client, ctx, t.volumeID, ".idx", idxFile, 0); err != nil { - return fmt.Errorf("failed to copy .idx file: %v", err) - } - - t.SetProgress(15.0) - glog.V(1).Infof("Successfully copied volume %d files to %s", t.volumeID, workDir) - return nil -} - -// copyVolumeFile copies a specific volume file from source server -func (t *EnhancedECTask) copyVolumeFile(client volume_server_pb.VolumeServerClient, ctx context.Context, - volumeID uint32, extension string, localPath string, expectedSize uint64) error { - - // Stream volume file data using CopyFile API - stream, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ - VolumeId: volumeID, - Ext: extension, - Collection: t.collection, - }) - if err != nil { - return fmt.Errorf("failed to start volume copy stream: %v", err) - } - - // Create local file - file, err := os.Create(localPath) - if err != nil { - return fmt.Errorf("failed to create local file %s: %v", localPath, err) - } - defer file.Close() - - // Copy data with progress tracking - var totalBytes int64 - for { - resp, err := stream.Recv() - if err == io.EOF { - break - } - if err != nil { - return fmt.Errorf("failed to receive volume data: %v", err) - } - - written, err := file.Write(resp.FileContent) - if err != nil { - return fmt.Errorf("failed to write to local file: %v", err) - } - - totalBytes += int64(written) - - // Update progress for large files - if expectedSize > 0 { - progress := float64(totalBytes) / float64(expectedSize) * 10.0 // 10% of total progress - t.SetProgress(5.0 + progress) - } - } - - glog.V(2).Infof("Copied %d bytes to %s", totalBytes, localPath) - return nil -} - -// markVolumeReadOnly marks the source volume as 
read-only -func (t *EnhancedECTask) markVolumeReadOnly() error { - t.currentStep = "marking_readonly" - t.SetProgress(20.0) - glog.V(1).Infof("Marking volume %d as read-only", t.volumeID) - - ctx := context.Background() - conn, err := grpc.Dial(t.sourceServer, grpc.WithInsecure()) - if err != nil { - return fmt.Errorf("failed to connect to source server: %v", err) - } - defer conn.Close() - - client := volume_server_pb.NewVolumeServerClient(conn) - _, err = client.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{ - VolumeId: t.volumeID, - }) - if err != nil { - return fmt.Errorf("failed to mark volume read-only: %v", err) - } - - t.SetProgress(25.0) - return nil -} - -// performLocalECEncoding performs Reed-Solomon encoding on local volume files -func (t *EnhancedECTask) performLocalECEncoding(workDir string) ([]string, error) { - t.currentStep = "encoding" - t.SetProgress(30.0) - glog.V(1).Infof("Performing local EC encoding for volume %d", t.volumeID) - - datFile := filepath.Join(workDir, fmt.Sprintf("%d.dat", t.volumeID)) - idxFile := filepath.Join(workDir, fmt.Sprintf("%d.idx", t.volumeID)) - - // Check if files exist and get their sizes - datInfo, err := os.Stat(datFile) - if err != nil { - return nil, fmt.Errorf("failed to stat dat file: %v", err) - } - - idxInfo, err := os.Stat(idxFile) - if err != nil { - return nil, fmt.Errorf("failed to stat idx file: %v", err) - } - - glog.V(1).Infof("Encoding files: %s (%d bytes), %s (%d bytes)", - datFile, datInfo.Size(), idxFile, idxInfo.Size()) - - // Generate EC shards using SeaweedFS erasure coding - shardFiles := make([]string, t.totalShards) - for i := 0; i < t.totalShards; i++ { - shardFiles[i] = filepath.Join(workDir, fmt.Sprintf("%d.ec%02d", t.volumeID, i)) - } - - // Encode .dat file - if err := t.encodeFile(datFile, shardFiles, ".dat"); err != nil { - return nil, fmt.Errorf("failed to encode dat file: %v", err) - } - - t.SetProgress(45.0) - - // Encode .idx file - if err := t.encodeFile(idxFile, shardFiles, ".idx"); err != nil { - return nil, fmt.Errorf("failed to encode idx file: %v", err) - } - - t.SetProgress(60.0) - glog.V(1).Infof("Successfully created %d EC shards for volume %d", t.totalShards, t.volumeID) - return shardFiles, nil -} - -// encodeFile encodes a single file into EC shards -func (t *EnhancedECTask) encodeFile(inputFile string, shardFiles []string, fileType string) error { - // Read input file - data, err := os.ReadFile(inputFile) - if err != nil { - return fmt.Errorf("failed to read input file: %v", err) - } - - // Write data to a temporary file first, then use SeaweedFS erasure coding - tempFile := filepath.Join(filepath.Dir(shardFiles[0]), fmt.Sprintf("temp_%s", filepath.Base(inputFile))) - err = os.WriteFile(tempFile, data, 0644) - if err != nil { - return fmt.Errorf("failed to write temp file: %v", err) - } - defer os.Remove(tempFile) - - // Use SeaweedFS erasure coding library with base filename - baseFileName := tempFile[:len(tempFile)-len(filepath.Ext(tempFile))] - err = erasure_coding.WriteEcFiles(baseFileName) - if err != nil { - return fmt.Errorf("failed to write EC files: %v", err) - } - - // Verify that shards were created - for i, shardFile := range shardFiles { - if _, err := os.Stat(shardFile); err != nil { - glog.Warningf("Shard %d file %s not found: %v", i, shardFile, err) - } else { - info, _ := os.Stat(shardFile) - glog.V(2).Infof("Created shard %d: %s (%d bytes)", i, shardFile, info.Size()) - } - } - - return nil -} - -// calculateOptimalShardPlacement determines where to 
place each shard for optimal distribution -func (t *EnhancedECTask) calculateOptimalShardPlacement() ([]ShardPlacement, error) { - t.currentStep = "calculating_placement" - t.SetProgress(65.0) - glog.V(1).Infof("Calculating optimal shard placement for volume %d", t.volumeID) - - // Get available servers from master - servers, err := t.getAvailableServers() - if err != nil { - return nil, fmt.Errorf("failed to get available servers: %v", err) - } - - if len(servers) < t.totalShards { - return nil, fmt.Errorf("insufficient servers: need %d, have %d", t.totalShards, len(servers)) - } - - // Sort servers by placement desirability (considering space, load, affinity) - t.rankServersForPlacement(servers) - - // Assign shards to servers with affinity logic - placements := make([]ShardPlacement, t.totalShards) - usedServers := make(map[string]int) // Track how many shards per server - - for shardID := 0; shardID < t.totalShards; shardID++ { - server := t.selectBestServerForShard(servers, usedServers, shardID) - if server == nil { - return nil, fmt.Errorf("failed to find suitable server for shard %d", shardID) - } - - placements[shardID] = ShardPlacement{ - ShardID: shardID, - ServerAddr: server.Address, - DataCenter: server.DataCenter, - Rack: server.Rack, - BackupAddrs: t.selectBackupServers(servers, server, 2), - } - - usedServers[server.Address]++ - glog.V(2).Infof("Assigned shard %d to server %s (DC: %s, Rack: %s)", - shardID, server.Address, server.DataCenter, server.Rack) - } - - t.SetProgress(70.0) - glog.V(1).Infof("Calculated placement for %d shards across %d servers", - t.totalShards, len(usedServers)) - return placements, nil -} - -// getAvailableServers retrieves available servers from the master -func (t *EnhancedECTask) getAvailableServers() ([]*ServerInfo, error) { - ctx := context.Background() - conn, err := grpc.Dial(t.masterClient, grpc.WithInsecure()) - if err != nil { - return nil, fmt.Errorf("failed to connect to master: %v", err) - } - defer conn.Close() - - client := master_pb.NewSeaweedClient(conn) - resp, err := client.VolumeList(ctx, &master_pb.VolumeListRequest{}) - if err != nil { - return nil, fmt.Errorf("failed to get volume list: %v", err) - } - - servers := make([]*ServerInfo, 0) - - // Parse topology information to extract server details - if resp.TopologyInfo != nil { - for _, dc := range resp.TopologyInfo.DataCenterInfos { - for _, rack := range dc.RackInfos { - for _, node := range rack.DataNodeInfos { - for diskType, diskInfo := range node.DiskInfos { - server := &ServerInfo{ - Address: fmt.Sprintf("%s:%d", node.Id, node.GrpcPort), - DataCenter: dc.Id, - Rack: rack.Id, - AvailableSpace: int64(diskInfo.FreeVolumeCount) * 32 * 1024 * 1024 * 1024, // Rough estimate - LoadScore: float64(diskInfo.ActiveVolumeCount) / float64(diskInfo.MaxVolumeCount), - ShardCount: 0, - } - - // Skip servers that are full or have high load - if diskInfo.FreeVolumeCount > 0 && server.LoadScore < 0.9 { - servers = append(servers, server) - glog.V(2).Infof("Available server: %s (DC: %s, Rack: %s, DiskType: %s, Load: %.2f)", - server.Address, server.DataCenter, server.Rack, diskType, server.LoadScore) - } - } - } - } - } - } - - return servers, nil -} - -// rankServersForPlacement sorts servers by desirability for shard placement -func (t *EnhancedECTask) rankServersForPlacement(servers []*ServerInfo) { - sort.Slice(servers, func(i, j int) bool { - serverA, serverB := servers[i], servers[j] - - // Primary criteria: lower load is better - if serverA.LoadScore != serverB.LoadScore { - 
return serverA.LoadScore < serverB.LoadScore - } - - // Secondary criteria: more available space is better - if serverA.AvailableSpace != serverB.AvailableSpace { - return serverA.AvailableSpace > serverB.AvailableSpace - } - - // Tertiary criteria: fewer existing shards is better - return serverA.ShardCount < serverB.ShardCount - }) -} - -// selectBestServerForShard selects the best server for a specific shard considering affinity -func (t *EnhancedECTask) selectBestServerForShard(servers []*ServerInfo, usedServers map[string]int, shardID int) *ServerInfo { - // For data shards (0-9), prefer distribution across different racks - // For parity shards (10-13), can be more flexible - isDataShard := shardID < t.dataShards - - var candidates []*ServerInfo - - if isDataShard { - // For data shards, prioritize rack diversity - usedRacks := make(map[string]bool) - for _, server := range servers { - if count, exists := usedServers[server.Address]; exists && count > 0 { - usedRacks[server.Rack] = true - } - } - - // First try to find servers in unused racks - for _, server := range servers { - if !usedRacks[server.Rack] && usedServers[server.Address] < 2 { // Max 2 shards per server - candidates = append(candidates, server) - } - } - - // If no unused racks, fall back to any available server - if len(candidates) == 0 { - for _, server := range servers { - if usedServers[server.Address] < 2 { - candidates = append(candidates, server) - } - } - } - } else { - // For parity shards, just avoid overloading servers - for _, server := range servers { - if usedServers[server.Address] < 2 { - candidates = append(candidates, server) - } - } - } - - if len(candidates) == 0 { - // Last resort: allow up to 3 shards per server - for _, server := range servers { - if usedServers[server.Address] < 3 { - candidates = append(candidates, server) - } - } - } - - if len(candidates) > 0 { - return candidates[0] // Already sorted by desirability - } - - return nil -} - -// selectBackupServers selects backup servers for redundancy -func (t *EnhancedECTask) selectBackupServers(servers []*ServerInfo, primaryServer *ServerInfo, count int) []string { - var backups []string - - for _, server := range servers { - if server.Address != primaryServer.Address && server.Rack != primaryServer.Rack { - backups = append(backups, server.Address) - if len(backups) >= count { - break - } - } - } - - return backups -} - -// distributeShards uploads shards to their assigned servers -func (t *EnhancedECTask) distributeShards(shardFiles []string, placements []ShardPlacement) error { - t.currentStep = "distributing_shards" - t.SetProgress(75.0) - glog.V(1).Infof("Distributing %d shards to target servers", len(placements)) - - // Distribute shards in parallel for better performance - successCount := 0 - errors := make([]error, 0) - - for i, placement := range placements { - shardFile := shardFiles[i] - - err := t.uploadShardToServer(shardFile, placement) - if err != nil { - glog.Errorf("Failed to upload shard %d to %s: %v", i, placement.ServerAddr, err) - errors = append(errors, err) - - // Try backup servers - uploaded := false - for _, backupAddr := range placement.BackupAddrs { - backupPlacement := placement - backupPlacement.ServerAddr = backupAddr - if err := t.uploadShardToServer(shardFile, backupPlacement); err == nil { - glog.V(1).Infof("Successfully uploaded shard %d to backup server %s", i, backupAddr) - uploaded = true - break - } - } - - if !uploaded { - return fmt.Errorf("failed to upload shard %d to any server", i) - } - } - - 
successCount++ - progress := 75.0 + (float64(successCount)/float64(len(placements)))*15.0 - t.SetProgress(progress) - - glog.V(2).Infof("Successfully distributed shard %d to %s", i, placement.ServerAddr) - } - - if len(errors) > 0 && successCount < len(placements)/2 { - return fmt.Errorf("too many shard distribution failures: %d/%d", len(errors), len(placements)) - } - - t.SetProgress(90.0) - glog.V(1).Infof("Successfully distributed %d/%d shards", successCount, len(placements)) - return nil -} - -// uploadShardToServer uploads a shard file to a specific server -func (t *EnhancedECTask) uploadShardToServer(shardFile string, placement ShardPlacement) error { - glog.V(2).Infof("Uploading shard %d to server %s", placement.ShardID, placement.ServerAddr) - - ctx := context.Background() - conn, err := grpc.Dial(placement.ServerAddr, grpc.WithInsecure()) - if err != nil { - return fmt.Errorf("failed to connect to server %s: %v", placement.ServerAddr, err) - } - defer conn.Close() - - client := volume_server_pb.NewVolumeServerClient(conn) - - // Upload shard using VolumeEcShardsCopy - this assumes shards are already generated locally - // and we're copying them to the target server - shardIds := []uint32{uint32(placement.ShardID)} - _, err = client.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ - VolumeId: t.volumeID, - Collection: t.collection, - ShardIds: shardIds, - CopyEcxFile: true, - CopyEcjFile: true, - CopyVifFile: true, - }) - if err != nil { - return fmt.Errorf("failed to copy EC shard: %v", err) - } - - glog.V(2).Infof("Successfully uploaded shard %d to %s", placement.ShardID, placement.ServerAddr) - return nil -} - -// verifyAndCleanupSource verifies the EC conversion and cleans up the source volume -func (t *EnhancedECTask) verifyAndCleanupSource() error { - t.currentStep = "verify_cleanup" - t.SetProgress(95.0) - glog.V(1).Infof("Verifying EC conversion and cleaning up source volume %d", t.volumeID) - - ctx := context.Background() - conn, err := grpc.Dial(t.sourceServer, grpc.WithInsecure()) - if err != nil { - return fmt.Errorf("failed to connect to source server: %v", err) - } - defer conn.Close() - - client := volume_server_pb.NewVolumeServerClient(conn) - - // Verify source volume is read-only - statusResp, err := client.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{ - VolumeId: t.volumeID, - }) - if err == nil && statusResp.IsReadOnly { - glog.V(1).Infof("Source volume %d is confirmed read-only", t.volumeID) - } - - // Delete source volume files (optional - could be kept for backup) - // This would normally be done after confirming all shards are properly distributed - // _, err = client.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{ - // VolumeId: t.volumeID, - // }) - // if err != nil { - // glog.Warningf("Failed to delete source volume: %v", err) - // } - - return nil -} - -// cleanup removes temporary files and directories -func (t *EnhancedECTask) cleanup(workDir string) { - glog.V(1).Infof("Cleaning up work directory: %s", workDir) - if err := os.RemoveAll(workDir); err != nil { - glog.Warningf("Failed to cleanup work directory %s: %v", workDir, err) - } -} - -// Validate validates the enhanced task parameters -func (t *EnhancedECTask) Validate(params types.TaskParams) error { - if params.VolumeID == 0 { - return fmt.Errorf("volume_id is required") - } - if params.Server == "" { - return fmt.Errorf("server is required") - } - if t.masterClient == "" { - return fmt.Errorf("master_client is required") - } - if t.workDir == "" { - 
return fmt.Errorf("work_dir is required") - } - return nil -} - -// EstimateTime estimates the time needed for enhanced EC processing -func (t *EnhancedECTask) EstimateTime(params types.TaskParams) time.Duration { - baseTime := 20 * time.Minute // Enhanced processing takes longer - - if size, ok := params.Parameters["volume_size"].(int64); ok { - // More accurate estimate based on volume size - // Account for copying, encoding, and distribution - gbSize := size / (1024 * 1024 * 1024) - estimatedTime := time.Duration(gbSize*2) * time.Minute // 2 minutes per GB - if estimatedTime > baseTime { - return estimatedTime - } - } - - return baseTime -} - -// GetProgress returns current progress with detailed step information -func (t *EnhancedECTask) GetProgress() float64 { - return t.BaseTask.GetProgress() -} - -// GetCurrentStep returns the current processing step -func (t *EnhancedECTask) GetCurrentStep() string { - return t.currentStep -} diff --git a/weed/worker/tasks/erasure_coding/ec_register.go b/weed/worker/tasks/erasure_coding/ec_register.go index 6c4b5bf7f..e66e9295c 100644 --- a/weed/worker/tasks/erasure_coding/ec_register.go +++ b/weed/worker/tasks/erasure_coding/ec_register.go @@ -33,49 +33,20 @@ func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) { return nil, fmt.Errorf("server is required") } - task := NewTask(params.Server, params.VolumeID) - task.SetEstimatedDuration(task.EstimateTime(params)) - - return task, nil -} + // Extract additional parameters for comprehensive EC + masterClient := "localhost:9333" // Default master client + workDir := "/tmp/seaweedfs_ec_work" // Default work directory -// Shared detector and scheduler instances -var ( - sharedDetector *EcDetector - sharedScheduler *Scheduler -) - -// getSharedInstances returns the shared detector and scheduler instances -func getSharedInstances() (*EcDetector, *Scheduler) { - if sharedDetector == nil { - sharedDetector = NewEcDetector() + if mc, ok := params.Parameters["master_client"].(string); ok && mc != "" { + masterClient = mc } - if sharedScheduler == nil { - sharedScheduler = NewScheduler() + if wd, ok := params.Parameters["work_dir"].(string); ok && wd != "" { + workDir = wd } - return sharedDetector, sharedScheduler -} -// GetSharedInstances returns the shared detector and scheduler instances (public access) -func GetSharedInstances() (*EcDetector, *Scheduler) { - return getSharedInstances() -} - -// Auto-register this task when the package is imported -func init() { - factory := NewFactory() - tasks.AutoRegister(types.TaskTypeErasureCoding, factory) - - // Get shared instances for all registrations - detector, scheduler := getSharedInstances() - - // Register with types registry - tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) { - registry.RegisterTask(detector, scheduler) - }) + // Create EC task with comprehensive capabilities + task := NewTaskWithParams(params.Server, params.VolumeID, masterClient, workDir) + task.SetEstimatedDuration(task.EstimateTime(params)) - // Register with UI registry using the same instances - tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) { - RegisterUI(uiRegistry, detector, scheduler) - }) + return task, nil }