diff --git a/weed/admin/maintenance/maintenance_integration.go b/weed/admin/maintenance/maintenance_integration.go
index a6cef590c..a569ea09f 100644
--- a/weed/admin/maintenance/maintenance_integration.go
+++ b/weed/admin/maintenance/maintenance_integration.go
@@ -305,6 +305,7 @@ func (s *MaintenanceIntegration) convertToExistingFormat(result *types.TaskDetec
 	}
 
 	return &TaskDetectionResult{
+		TaskID:   result.TaskID,
 		TaskType: existingType,
 		VolumeID: result.VolumeID,
 		Server:   result.Server,
@@ -523,19 +524,25 @@ func (s *MaintenanceIntegration) SyncTask(task *MaintenanceTask) {
 	var estimatedSize int64
 
 	if task.TypedParams != nil {
+		// Calculate storage impact for this task type
+		// Volume size is not currently used for Balance/Vacuum impact and is not stored in MaintenanceTask
+		sourceImpact, targetImpact := topology.CalculateTaskStorageImpact(topology.TaskType(string(taskType)), 0)
+
 		// Use unified sources and targets from TaskParams
 		for _, src := range task.TypedParams.Sources {
 			sources = append(sources, topology.TaskSource{
-				SourceServer: src.Node,
-				SourceDisk:   src.DiskId,
+				SourceServer:  src.Node,
+				SourceDisk:    src.DiskId,
+				StorageChange: sourceImpact,
 			})
 			// Sum estimated size from all sources
 			estimatedSize += int64(src.EstimatedSize)
 		}
 
		for _, target := range task.TypedParams.Targets {
 			destinations = append(destinations, topology.TaskDestination{
-				TargetServer: target.Node,
-				TargetDisk:   target.DiskId,
+				TargetServer:  target.Node,
+				TargetDisk:    target.DiskId,
+				StorageChange: targetImpact,
 			})
 		}
diff --git a/weed/admin/maintenance/maintenance_manager.go b/weed/admin/maintenance/maintenance_manager.go
index 9e0964188..1ca8346f6 100644
--- a/weed/admin/maintenance/maintenance_manager.go
+++ b/weed/admin/maintenance/maintenance_manager.go
@@ -603,6 +603,13 @@ func (mm *MaintenanceManager) CancelTask(taskID string) error {
 		}
 	}
 
+	// Notify ActiveTopology to release capacity
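+	// (CompleteTask releases the reservation whether the task is still pending or already assigned)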
+	if mm.scanner != nil && mm.scanner.integration != nil {
+		if at := mm.scanner.integration.GetActiveTopology(); at != nil {
+			_ = at.CompleteTask(taskID)
+		}
+	}
+
 	glog.V(2).Infof("Cancelled task %s", taskID)
 	return nil
 }
diff --git a/weed/admin/maintenance/maintenance_queue.go b/weed/admin/maintenance/maintenance_queue.go
index e0c388eb7..e4b354723 100644
--- a/weed/admin/maintenance/maintenance_queue.go
+++ b/weed/admin/maintenance/maintenance_queue.go
@@ -90,6 +90,11 @@ func (mq *MaintenanceQueue) LoadTasksFromPersistence() error {
 				}
 			}
 		}
+
+		// Sync task with ActiveTopology for capacity tracking
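+		// (without this, capacity reserved for tasks persisted before a restart would be lost)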
+		if mq.integration != nil {
+			mq.integration.SyncTask(task)
+		}
 	}
 
 	// Sort pending tasks by priority and schedule time
@@ -134,7 +139,9 @@ func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) {
 		return
 	}
 
-	task.ID = generateTaskID()
+	if task.ID == "" {
+		task.ID = generateTaskID()
+	}
 	task.Status = TaskStatusPending
 	task.CreatedAt = time.Now()
 	task.MaxRetries = 3 // Default retry count
@@ -200,6 +207,7 @@ func (mq *MaintenanceQueue) AddTasksFromResults(results []*TaskDetectionResult)
 		}
 
 		task := &MaintenanceTask{
+			ID:       result.TaskID,
 			Type:     result.TaskType,
 			Priority: result.Priority,
 			VolumeID: result.VolumeID,
@@ -311,6 +319,24 @@ func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []Maintena
 	selectedTask.WorkerID = workerID
 	selectedTask.StartedAt = &now
 
+	// Notify ActiveTopology to reserve capacity (move from pending to assigned)
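+	// If the reservation fails, the in-memory assignment below is rolled back so the task stays pending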
+	if mq.integration != nil {
+		if at := mq.integration.GetActiveTopology(); at != nil {
+			if err := at.AssignTask(selectedTask.ID); err != nil {
+				glog.Warningf("Failed to update ActiveTopology for task assignment %s: %v. Rolling back assignment.", selectedTask.ID, err)
+				// Roll back the assignment in MaintenanceQueue
+				selectedTask.Status = TaskStatusPending
+				selectedTask.WorkerID = ""
+				selectedTask.StartedAt = nil
+				if len(selectedTask.AssignmentHistory) > 0 {
+					selectedTask.AssignmentHistory = selectedTask.AssignmentHistory[:len(selectedTask.AssignmentHistory)-1]
+				}
+				// Return nil so the task is neither removed from pendingTasks nor handed to the worker
+				return nil
+			}
+		}
+	}
+
 	// Remove from pending tasks
 	mq.pendingTasks = append(mq.pendingTasks[:selectedIndex], mq.pendingTasks[selectedIndex+1:]...)
@@ -342,6 +368,17 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
 		return
 	}
 
+	// Notify ActiveTopology to release capacity (move from assigned to recent).
+	// We do this for both success and failure so the reserved capacity is always freed.
+	if mq.integration != nil {
+		if at := mq.integration.GetActiveTopology(); at != nil {
+			if task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress {
+				// Ignore the error: the task might not be in ActiveTopology (e.g. after a restart)
+				_ = at.CompleteTask(taskID)
+			}
+		}
+	}
+
 	completedTime := time.Now()
 	task.CompletedAt = &completedTime
@@ -377,6 +414,12 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
 		task.ScheduledAt = time.Now().Add(15 * time.Minute) // Retry delay
 		mq.pendingTasks = append(mq.pendingTasks, task)
+
+		// Resync with ActiveTopology (re-add as pending)
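+		// SyncTask re-registers the task as pending so its capacity stays reserved across the retry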
+		if mq.integration != nil {
+			mq.integration.SyncTask(task)
+		}
+
 		// Save task state after retry setup
 		mq.saveTaskState(task)
 		glog.Warningf("Task failed, scheduling retry: %s (%s) attempt %d/%d, worker %s, duration %v, error: %s",
@@ -703,6 +746,13 @@ func (mq *MaintenanceQueue) RemoveStaleWorkers(timeout time.Duration) int {
 				task.Error = "Worker became unavailable"
 				completedTime := time.Now()
 				task.CompletedAt = &completedTime
+
+				// Notify ActiveTopology to release capacity
+				if mq.integration != nil {
+					if at := mq.integration.GetActiveTopology(); at != nil {
+						_ = at.CompleteTask(task.ID)
+					}
+				}
 			}
 		}
diff --git a/weed/admin/maintenance/maintenance_queue_test.go b/weed/admin/maintenance/maintenance_queue_test.go
index 2c38471a0..27010de96 100644
--- a/weed/admin/maintenance/maintenance_queue_test.go
+++ b/weed/admin/maintenance/maintenance_queue_test.go
@@ -2,7 +2,10 @@ package maintenance
 
 import (
 	"testing"
+	"time"
 
+	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
+	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
 )
 
@@ -351,3 +354,618 @@ func TestCanScheduleTaskNow_WithPolicy(t *testing.T) {
 		t.Errorf("Expected canScheduleTaskNow to return false when at policy limit, got true")
 	}
 }
+
+func TestMaintenanceQueue_TaskIDPreservation(t *testing.T) {
+	// Setup policy
+	policy := &MaintenancePolicy{
+		TaskPolicies:        make(map[string]*worker_pb.TaskPolicy),
+		GlobalMaxConcurrent: 10,
+	}
+
+	// Setup queue; the integration is left out here to avoid complex setup
+	mq := NewMaintenanceQueue(policy)
+	// integration := NewMaintenanceIntegration(mq, policy)
+	// mq.SetIntegration(integration)
+
+	// 1. Verify ID preservation in AddTasksFromResults
+	originalID := "ec_task_123"
+	results := []*TaskDetectionResult{
+		{
+			TaskID:      originalID,
+			TaskType:    MaintenanceTaskType("erasure_coding"),
+			VolumeID:    100,
+			Server:      "server1",
+			Priority:    PriorityNormal,
+			TypedParams: &worker_pb.TaskParams{},
+		},
+	}
+
+	mq.AddTasksFromResults(results)
+
+	// Verify the task exists with the correct ID
+	queuedTask, exists := mq.tasks[originalID]
+	if !exists {
+		t.Errorf("Task with original ID %s not found in queue", originalID)
+	} else if queuedTask.ID != originalID {
+		t.Errorf("Task ID mismatch: expected %s, got %s", originalID, queuedTask.ID)
+	}
+
+	// 2. Verify AddTask preserves a caller-supplied ID
+	manualTask := &MaintenanceTask{
+		ID:     "manual_id_456",
+		Type:   MaintenanceTaskType("vacuum"),
+		Status: TaskStatusPending,
+	}
+	mq.AddTask(manualTask)
+
+	if manualTask.ID != "manual_id_456" {
+		t.Errorf("AddTask overwrote ID: expected manual_id_456, got %s", manualTask.ID)
+	}
+}
+
+func TestMaintenanceQueue_ActiveTopologySync(t *testing.T) {
+	// Setup policy
+	policy := &MaintenancePolicy{
+		TaskPolicies: map[string]*worker_pb.TaskPolicy{
+			"balance": {MaxConcurrent: 1},
+		},
+		GlobalMaxConcurrent: 10,
+	}
+
+	// Setup queue and integration
+	mq := NewMaintenanceQueue(policy)
+	integration := NewMaintenanceIntegration(mq, policy)
+	mq.SetIntegration(integration)
+
+	// Verifies ActiveTopology synchronization (assign and complete).
+	// 1. Set up the topology
+	at := integration.GetActiveTopology()
+	if at == nil {
+		t.Fatalf("ActiveTopology not found in integration")
+	}
+
+	topologyInfo := &master_pb.TopologyInfo{
+		DataCenterInfos: []*master_pb.DataCenterInfo{
+			{
+				Id: "dc1",
+				RackInfos: []*master_pb.RackInfo{
+					{
+						Id: "rack1",
+						DataNodeInfos: []*master_pb.DataNodeInfo{
+							{
+								Id: "server1",
+								DiskInfos: map[string]*master_pb.DiskInfo{
+									"hdd": {
+										DiskId:         1,
+										VolumeCount:    1,
+										MaxVolumeCount: 10,
+										VolumeInfos: []*master_pb.VolumeInformationMessage{
+											{Id: 100, Collection: "col1"},
+										},
+									},
+									"hdd2": {
+										DiskId:         2,
+										VolumeCount:    0,
+										MaxVolumeCount: 10,
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	at.UpdateTopology(topologyInfo)
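+
+	// server1 disk 1 holds volume 100 (the balance source); disk 2 is empty (the target)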
+
+	// 2. Add a pending task to ActiveTopology
+	taskID := "sync_test_123"
+	err := at.AddPendingTask(topology.TaskSpec{
+		TaskID:     taskID,
+		TaskType:   topology.TaskTypeBalance,
+		VolumeID:   100,
+		VolumeSize: 1024 * 1024,
+		Sources: []topology.TaskSourceSpec{
+			{ServerID: "server1", DiskID: 1},
+		},
+		Destinations: []topology.TaskDestinationSpec{
+			{ServerID: "server1", DiskID: 2},
+		},
+	})
+	if err != nil {
+		t.Fatalf("Failed to add pending task to ActiveTopology: %v", err)
+	}
+
+	// Add the same task to MaintenanceQueue
+	mq.AddTask(&MaintenanceTask{
+		ID:         taskID,
+		Type:       MaintenanceTaskType("balance"),
+		VolumeID:   100,
+		Server:     "server1",
+		Collection: "col1",
+		TypedParams: &worker_pb.TaskParams{
+			TaskId: taskID,
+			Targets: []*worker_pb.TaskTarget{
+				{Node: "server1", DiskId: 2},
+			},
+		},
+	})
+
+	// Check the initial available capacity on the destination disk (server1:2).
+	// server1:2 has MaxVolumeCount=10 and VolumeCount=0; capacity should be 9
+	// because AddPendingTask already reserved one slot.
+	capacityBefore := at.GetEffectiveAvailableCapacity("server1", 2)
+	if capacityBefore != 9 {
+		t.Errorf("Expected capacity 9 after AddPendingTask, got %d", capacityBefore)
+	}
+
+	// 3. Verify AssignTask (via GetNextTask)
+	mq.workers["worker1"] = &MaintenanceWorker{
+		ID:            "worker1",
+		Status:        "active",
+		Capabilities:  []MaintenanceTaskType{"balance"},
+		MaxConcurrent: 10,
+	}
+
+	taskFound := mq.GetNextTask("worker1", []MaintenanceTaskType{"balance"})
+	if taskFound == nil || taskFound.ID != taskID {
+		t.Fatalf("Expected to get task %s, got %+v", taskID, taskFound)
+	}
+
+	// Capacity should still be 9 on the destination disk (server1:2)
+	capacityAfterAssign := at.GetEffectiveAvailableCapacity("server1", 2)
+	if capacityAfterAssign != 9 {
+		t.Errorf("Capacity should still be 9 after assignment, got %d", capacityAfterAssign)
+	}
+
+	// 4. Verify CompleteTask
+	mq.CompleteTask(taskID, "")
+
+	// Capacity should be released back to 10
+	capacityAfterComplete := at.GetEffectiveAvailableCapacity("server1", 2)
+	if capacityAfterComplete != 10 {
+		t.Errorf("Capacity should have returned to 10 after completion, got %d", capacityAfterComplete)
+	}
+}
+
+func TestMaintenanceQueue_StaleWorkerCapacityRelease(t *testing.T) {
+	// Setup
+	policy := &MaintenancePolicy{
+		TaskPolicies: map[string]*worker_pb.TaskPolicy{
+			"balance": {MaxConcurrent: 1},
+		},
+	}
+	mq := NewMaintenanceQueue(policy)
+	integration := NewMaintenanceIntegration(mq, policy)
+	mq.SetIntegration(integration)
+	at := integration.GetActiveTopology()
+
+	topologyInfo := &master_pb.TopologyInfo{
+		DataCenterInfos: []*master_pb.DataCenterInfo{
+			{
+				Id: "dc1",
+				RackInfos: []*master_pb.RackInfo{
+					{
+						Id: "rack1",
+						DataNodeInfos: []*master_pb.DataNodeInfo{
+							{
+								Id: "server1",
+								DiskInfos: map[string]*master_pb.DiskInfo{
+									"hdd1": {DiskId: 1, VolumeCount: 1, MaxVolumeCount: 10},
+									"hdd2": {DiskId: 2, VolumeCount: 0, MaxVolumeCount: 10},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	at.UpdateTopology(topologyInfo)
+
+	taskID := "stale_test_123"
+	at.AddPendingTask(topology.TaskSpec{
+		TaskID:       taskID,
+		TaskType:     topology.TaskTypeBalance,
+		VolumeID:     100,
+		VolumeSize:   1024,
+		Sources:      []topology.TaskSourceSpec{{ServerID: "server1", DiskID: 1}},
+		Destinations: []topology.TaskDestinationSpec{{ServerID: "server1", DiskID: 2}},
+	})
+
+	mq.AddTask(&MaintenanceTask{
+		ID:       taskID,
+		Type:     "balance",
+		VolumeID: 100,
+		Server:   "server1",
+		TypedParams: &worker_pb.TaskParams{
+			TaskId:  taskID,
+			Targets: []*worker_pb.TaskTarget{{Node: "server1", DiskId: 2}},
+		},
+	})
+
+	mq.workers["worker1"] = &MaintenanceWorker{
+		ID:            "worker1",
+		Status:        "active",
+		Capabilities:  []MaintenanceTaskType{"balance"},
+		MaxConcurrent: 1,
+		LastHeartbeat: time.Now(),
+	}
+
+	// Assign the task
+	mq.GetNextTask("worker1", []MaintenanceTaskType{"balance"})
+
+	// Verify capacity reserved (9 left)
+	if at.GetEffectiveAvailableCapacity("server1", 2) != 9 {
+		t.Errorf("Expected capacity 9, got %d", at.GetEffectiveAvailableCapacity("server1", 2))
+	}
+
+	// Make the worker stale
+	mq.workers["worker1"].LastHeartbeat = time.Now().Add(-1 * time.Hour)
+
+	// Remove stale workers
+	mq.RemoveStaleWorkers(10 * time.Minute)
+
+	// Verify capacity released (back to 10)
+	if at.GetEffectiveAvailableCapacity("server1", 2) != 10 {
+		t.Errorf("Expected capacity 10 after removing stale worker, got %d", at.GetEffectiveAvailableCapacity("server1", 2))
+	}
+}
+
+func TestMaintenanceManager_CancelTaskCapacityRelease(t *testing.T) {
+	// Setup manager
+	config := DefaultMaintenanceConfig()
+	mm := NewMaintenanceManager(nil, config)
+	integration := mm.scanner.integration
+	mq := mm.queue
+	at := integration.GetActiveTopology()
+
+	topologyInfo := &master_pb.TopologyInfo{
+		DataCenterInfos: []*master_pb.DataCenterInfo{
+			{
+				Id: "dc1",
+				RackInfos: []*master_pb.RackInfo{
+					{
+						Id: "rack1",
+						DataNodeInfos: []*master_pb.DataNodeInfo{
+							{
+								Id: "server1",
+								DiskInfos: map[string]*master_pb.DiskInfo{
+									"hdd1": {DiskId: 1, VolumeCount: 1, MaxVolumeCount: 10},
+									"hdd2": {DiskId: 2, VolumeCount: 0, MaxVolumeCount: 10},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	at.UpdateTopology(topologyInfo)
+
+	taskID := "cancel_test_123"
+	// Note: AddPendingTask reserves capacity
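+	// The task is never assigned to a worker here, so cancellation must release a pending reservation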
+	at.AddPendingTask(topology.TaskSpec{
+		TaskID:       taskID,
+		TaskType:     topology.TaskTypeBalance,
+		VolumeID:     100,
+		VolumeSize:   1024,
+		Sources:      []topology.TaskSourceSpec{{ServerID: "server1", DiskID: 1}},
+		Destinations: []topology.TaskDestinationSpec{{ServerID: "server1", DiskID: 2}},
+	})
+
+	mq.AddTask(&MaintenanceTask{
+		ID:       taskID,
+		Type:     "balance",
+		VolumeID: 100,
+		Server:   "server1",
+		TypedParams: &worker_pb.TaskParams{
+			TaskId:  taskID,
+			Targets: []*worker_pb.TaskTarget{{Node: "server1", DiskId: 2}},
+		},
+	})
+
+	// Verify capacity reserved (9 left)
+	if at.GetEffectiveAvailableCapacity("server1", 2) != 9 {
+		t.Errorf("Expected capacity 9, got %d", at.GetEffectiveAvailableCapacity("server1", 2))
+	}
+
+	// Cancel the task
+	err := mm.CancelTask(taskID)
+	if err != nil {
+		t.Fatalf("Failed to cancel task: %v", err)
+	}
+
+	// Verify capacity released (back to 10)
+	if at.GetEffectiveAvailableCapacity("server1", 2) != 10 {
+		t.Errorf("Expected capacity 10 after cancelling task, got %d", at.GetEffectiveAvailableCapacity("server1", 2))
+	}
+}
+
+type MockPersistence struct {
+	tasks []*MaintenanceTask
+}
+
+func (m *MockPersistence) SaveTaskState(task *MaintenanceTask) error { return nil }
+func (m *MockPersistence) LoadTaskState(taskID string) (*MaintenanceTask, error) { return nil, nil }
+func (m *MockPersistence) LoadAllTaskStates() ([]*MaintenanceTask, error) { return m.tasks, nil }
+func (m *MockPersistence) DeleteTaskState(taskID string) error { return nil }
+func (m *MockPersistence) CleanupCompletedTasks() error { return nil }
+func (m *MockPersistence) SaveTaskPolicy(taskType string, policy *TaskPolicy) error { return nil }
+
+func TestMaintenanceQueue_LoadTasksCapacitySync(t *testing.T) {
+	// Setup
+	policy := &MaintenancePolicy{
+		TaskPolicies: map[string]*worker_pb.TaskPolicy{
+			"balance": {MaxConcurrent: 1},
+		},
+	}
+	mq := NewMaintenanceQueue(policy)
+	integration := NewMaintenanceIntegration(mq, policy)
+	mq.SetIntegration(integration)
+	at := integration.GetActiveTopology()
+
+	topologyInfo := &master_pb.TopologyInfo{
+		DataCenterInfos: []*master_pb.DataCenterInfo{
+			{
+				Id: "dc1",
+				RackInfos: []*master_pb.RackInfo{
+					{
+						Id: "rack1",
+						DataNodeInfos: []*master_pb.DataNodeInfo{
+							{
+								Id: "server1",
+								DiskInfos: map[string]*master_pb.DiskInfo{
+									"hdd1": {DiskId: 1, VolumeCount: 1, MaxVolumeCount: 10},
+									"hdd2": {DiskId: 2, VolumeCount: 0, MaxVolumeCount: 10},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	at.UpdateTopology(topologyInfo)
+
+	// Set up mock persistence with a pending task
+	taskID := "load_test_123"
+	mockTask := &MaintenanceTask{
+		ID:     taskID,
+		Type:   "balance",
+		Status: TaskStatusPending,
+		TypedParams: &worker_pb.TaskParams{
+			TaskId:  taskID,
+			Sources: []*worker_pb.TaskSource{{Node: "server1", DiskId: 1}},
+			Targets: []*worker_pb.TaskTarget{{Node: "server1", DiskId: 2}},
+		},
+	}
+	mq.SetPersistence(&MockPersistence{tasks: []*MaintenanceTask{mockTask}})
+
+	// Load tasks
+	err := mq.LoadTasksFromPersistence()
+	if err != nil {
+		t.Fatalf("Failed to load tasks: %v", err)
+	}
+
+	// Verify capacity is reserved in ActiveTopology after loading (9 left)
+	if at.GetEffectiveAvailableCapacity("server1", 2) != 9 {
+		t.Errorf("Expected capacity 9 after loading tasks, got %d", at.GetEffectiveAvailableCapacity("server1", 2))
+	}
+}
+
+func TestMaintenanceQueue_RetryCapacitySync(t *testing.T) {
+	// Setup
+	policy := &MaintenancePolicy{
+		TaskPolicies: map[string]*worker_pb.TaskPolicy{
+			"balance": {MaxConcurrent: 1},
+		},
+	}
+	mq := NewMaintenanceQueue(policy)
+	integration := NewMaintenanceIntegration(mq, policy)
+	mq.SetIntegration(integration)
+	at := integration.GetActiveTopology()
+
+	topologyInfo := &master_pb.TopologyInfo{
+		DataCenterInfos: []*master_pb.DataCenterInfo{
+			{
+				Id: "dc1",
+				RackInfos: []*master_pb.RackInfo{
+					{
+						Id: "rack1",
+						DataNodeInfos: []*master_pb.DataNodeInfo{
+							{
+								Id: "server1",
+								DiskInfos: map[string]*master_pb.DiskInfo{
+									"hdd1": {DiskId: 1, VolumeCount: 1, MaxVolumeCount: 10},
+									"hdd2": {DiskId: 2, VolumeCount: 0, MaxVolumeCount: 10},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	at.UpdateTopology(topologyInfo)
+
+	taskID := "retry_test_123"
+	// 1. Add the task
+	at.AddPendingTask(topology.TaskSpec{
+		TaskID:       taskID,
+		TaskType:     topology.TaskTypeBalance,
+		VolumeID:     100,
+		VolumeSize:   1024,
+		Sources:      []topology.TaskSourceSpec{{ServerID: "server1", DiskID: 1}},
+		Destinations: []topology.TaskDestinationSpec{{ServerID: "server1", DiskID: 2}},
+	})
+
+	mq.AddTask(&MaintenanceTask{
+		ID:         taskID,
+		Type:       "balance",
+		VolumeID:   100,
+		Server:     "server1",
+		MaxRetries: 3,
+		TypedParams: &worker_pb.TaskParams{
+			TaskId:  taskID,
+			Sources: []*worker_pb.TaskSource{{Node: "server1", DiskId: 1}},
+			Targets: []*worker_pb.TaskTarget{{Node: "server1", DiskId: 2}},
+		},
+	})
+
+	mq.workers["worker1"] = &MaintenanceWorker{
+		ID:            "worker1",
+		Status:        "active",
+		Capabilities:  []MaintenanceTaskType{"balance"},
+		MaxConcurrent: 1,
+		LastHeartbeat: time.Now(),
+	}
+
+	// 2. Assign the task
+	mq.GetNextTask("worker1", []MaintenanceTaskType{"balance"})
+
+	// Verify capacity reserved (9 left)
+	if at.GetEffectiveAvailableCapacity("server1", 2) != 9 {
+		t.Errorf("Initial assignment: Expected capacity 9, got %d", at.GetEffectiveAvailableCapacity("server1", 2))
+	}
+
+	// 3. Complete with an error (triggering a retry)
+	mq.CompleteTask(taskID, "simulated failure")
+
+	// 4. Verify state after failure
+	task := mq.tasks[taskID]
+	if task.Status != TaskStatusPending {
+		t.Errorf("Expected status pending for retry, got %v", task.Status)
+	}
+	if task.RetryCount != 1 {
+		t.Errorf("Expected retry count 1, got %d", task.RetryCount)
+	}
+
+	// 5. Verify capacity in ActiveTopology.
+	// CompleteTask first releases the slot (back to 10) and SyncTask then re-reserves it
+	// because the task is pending again, so it should still be 9.
+	if at.GetEffectiveAvailableCapacity("server1", 2) != 9 {
+		t.Errorf("After retry sync: Expected capacity 9, got %d", at.GetEffectiveAvailableCapacity("server1", 2))
+	}
+}
+
+func TestMaintenanceQueue_AssignTaskRollback(t *testing.T) {
+	// Setup policy
+	policy := &MaintenancePolicy{
+		TaskPolicies: map[string]*worker_pb.TaskPolicy{
+			"balance": {MaxConcurrent: 1},
+		},
+		GlobalMaxConcurrent: 10,
+	}
+
+	// Setup queue and integration
+	mq := NewMaintenanceQueue(policy)
+	integration := NewMaintenanceIntegration(mq, policy)
+	mq.SetIntegration(integration)
+
+	// Set up the topology
+	at := integration.GetActiveTopology()
+	topologyInfo := &master_pb.TopologyInfo{
+		DataCenterInfos: []*master_pb.DataCenterInfo{
+			{
+				Id: "dc1",
+				RackInfos: []*master_pb.RackInfo{
+					{
+						Id: "rack1",
+						DataNodeInfos: []*master_pb.DataNodeInfo{
+							{
+								Id: "server1",
+								DiskInfos: map[string]*master_pb.DiskInfo{
+									"hdd": {
+										DiskId:         1,
+										VolumeCount:    1,
+										MaxVolumeCount: 1, // only one slot
+										VolumeInfos: []*master_pb.VolumeInformationMessage{
+											{Id: 100, Collection: "col1"},
+										},
+									},
+									"hdd2": {
+										DiskId:         2,
+										VolumeCount:    0,
+										MaxVolumeCount: 0, // no capacity for the target
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	at.UpdateTopology(topologyInfo)
+
+	taskID := "rollback_test_123"
+
+	// 1. Add the task to MaintenanceQueue ONLY.
+	// It is not in ActiveTopology, so AssignTask will fail with "pending task not found".
+	mq.AddTask(&MaintenanceTask{
+		ID:         taskID,
+		Type:       MaintenanceTaskType("balance"),
+		VolumeID:   100,
+		Server:     "server1",
+		Collection: "col1",
+		TypedParams: &worker_pb.TaskParams{
+			TaskId: taskID,
+			Targets: []*worker_pb.TaskTarget{
+				{Node: "server1", DiskId: 2},
+			},
+		},
+	})
+
+	// 2. Set up a worker
+	mq.workers["worker1"] = &MaintenanceWorker{
+		ID:            "worker1",
+		Status:        "active",
+		Capabilities:  []MaintenanceTaskType{"balance"},
+		MaxConcurrent: 10,
+	}
+
+	// 3. Try to get the next task
+	taskFound := mq.GetNextTask("worker1", []MaintenanceTaskType{"balance"})
+
+	// 4. Verify GetNextTask returned nil due to the ActiveTopology.AssignTask failure
+	if taskFound != nil {
+		t.Errorf("Expected GetNextTask to return nil, got task %s", taskFound.ID)
+	}
+
+	// 5. Verify the task in MaintenanceQueue is rolled back to pending
+	mq.mutex.RLock()
+	task, exists := mq.tasks[taskID]
+	mq.mutex.RUnlock()
+
+	if !exists {
+		t.Fatalf("Task %s should still exist in MaintenanceQueue", taskID)
+	}
+	if task.Status != TaskStatusPending {
+		t.Errorf("Expected task status %v, got %v", TaskStatusPending, task.Status)
+	}
+	if task.WorkerID != "" {
+		t.Errorf("Expected task WorkerID to be empty, got %s", task.WorkerID)
+	}
+	if len(task.AssignmentHistory) != 0 {
+		t.Errorf("Expected assignment history to be empty, got %d records", len(task.AssignmentHistory))
+	}
+
+	// 6. Verify the task is still in the pendingTasks slice
+	mq.mutex.RLock()
+	foundInPending := false
+	for _, pt := range mq.pendingTasks {
+		if pt.ID == taskID {
+			foundInPending = true
+			break
+		}
+	}
+	mq.mutex.RUnlock()
+
+	if !foundInPending {
+		t.Errorf("Task %s should still be in pendingTasks slice", taskID)
+	}
+}
diff --git a/weed/admin/maintenance/maintenance_types.go b/weed/admin/maintenance/maintenance_types.go
index 9970e12f1..31c797e50 100644
--- a/weed/admin/maintenance/maintenance_types.go
+++ b/weed/admin/maintenance/maintenance_types.go
@@ -351,6 +351,7 @@ type MaintenanceScanner struct {
 
 // TaskDetectionResult represents the result of scanning for maintenance needs
 type TaskDetectionResult struct {
+	TaskID   string              `json:"task_id"`
 	TaskType MaintenanceTaskType `json:"task_type"`
 	VolumeID uint32              `json:"volume_id,omitempty"`
 	Server   string              `json:"server,omitempty"`
diff --git a/weed/admin/topology/task_management.go b/weed/admin/topology/task_management.go
index f56694a37..e304df977 100644
--- a/weed/admin/topology/task_management.go
+++ b/weed/admin/topology/task_management.go
@@ -66,11 +66,17 @@ func (at *ActiveTopology) CompleteTask(taskID string) error {
 
 	task, exists := at.assignedTasks[taskID]
 	if !exists {
-		return fmt.Errorf("assigned task %s not found", taskID)
+		// If not in assigned tasks, check pending tasks
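+		// (pending tasks also hold reserved capacity via AddPendingTask, so release it here as well)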
+		if task, exists = at.pendingTasks[taskID]; exists {
+			delete(at.pendingTasks, taskID)
+		} else {
+			return fmt.Errorf("task %s not found in assigned or pending tasks", taskID)
+		}
+	} else {
+		delete(at.assignedTasks, taskID)
 	}
 
 	// Release reserved capacity by moving task to completed state
-	delete(at.assignedTasks, taskID)
 	task.Status = TaskStatusCompleted
 	task.CompletedAt = time.Now()
 	at.recentTasks[taskID] = task