diff --git a/weed/admin/maintenance/maintenance_queue.go b/weed/admin/maintenance/maintenance_queue.go index 0a2322160..32a3abbb4 100644 --- a/weed/admin/maintenance/maintenance_queue.go +++ b/weed/admin/maintenance/maintenance_queue.go @@ -131,6 +131,14 @@ func (mq *MaintenanceQueue) cleanupCompletedTasks() { func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) { mq.mutex.Lock() + // Enforce one queued/active task per volume (across all task types). + if mq.hasQueuedOrActiveTaskForVolume(task.VolumeID) { + mq.mutex.Unlock() + glog.V(1).Infof("Task skipped (volume busy): %s for volume %d on %s (already queued or running)", + task.Type, task.VolumeID, task.Server) + return + } + // Check for duplicate tasks (same type + volume + not completed) if mq.hasDuplicateTask(task) { mq.mutex.Unlock() @@ -188,6 +196,25 @@ func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) { taskSnapshot.ID, taskSnapshot.Type, taskSnapshot.VolumeID, taskSnapshot.Server, taskSnapshot.Priority, scheduleInfo, taskSnapshot.Reason) } +// hasQueuedOrActiveTaskForVolume checks if any pending/assigned/in-progress task already exists for this volume. +// Caller must hold mq.mutex. +func (mq *MaintenanceQueue) hasQueuedOrActiveTaskForVolume(volumeID uint32) bool { + if volumeID == 0 { + return false + } + for _, existingTask := range mq.tasks { + if existingTask.VolumeID != volumeID { + continue + } + if existingTask.Status == TaskStatusPending || + existingTask.Status == TaskStatusAssigned || + existingTask.Status == TaskStatusInProgress { + return true + } + } + return false +} + // hasDuplicateTask checks if a similar task already exists (same type, volume, and not completed) func (mq *MaintenanceQueue) hasDuplicateTask(newTask *MaintenanceTask) bool { for _, existingTask := range mq.tasks { @@ -260,6 +287,13 @@ func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []Maintena continue } + // Avoid scheduling concurrent operations on the same volume + if activeTaskID, activeTaskType, hasActive := mq.activeTaskForVolume(task.VolumeID, task.ID); hasActive { + glog.V(2).Infof("Task %s (%s) skipped for worker %s: volume %d is busy with task %s (%s)", + task.ID, task.Type, workerID, task.VolumeID, activeTaskID, activeTaskType) + continue + } + // Check if worker can handle this task type if !mq.workerCanHandle(task.Type, capabilities) { glog.V(3).Infof("Task %s (%s) skipped for worker %s: capability mismatch (worker has: %v)", task.ID, task.Type, workerID, capabilities) @@ -304,6 +338,14 @@ func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []Maintena return nil } + // Re-check volume conflict after acquiring write lock + if activeTaskID, activeTaskType, hasActive := mq.activeTaskForVolume(selectedTask.VolumeID, selectedTaskID); hasActive { + mq.mutex.Unlock() + glog.V(2).Infof("Task %s no longer available for worker %s: volume %d is busy with task %s (%s)", + selectedTaskID, workerID, selectedTask.VolumeID, activeTaskID, activeTaskType) + return nil + } + // Record assignment history workerAddress := "" if worker, exists := mq.workers[workerID]; exists { @@ -877,6 +919,28 @@ func (mq *MaintenanceQueue) workerCanHandle(taskType MaintenanceTaskType, capabi return false } +// activeTaskForVolume returns the active task ID/type for a volume, if any. +// Caller must hold mq.mutex (read or write). +func (mq *MaintenanceQueue) activeTaskForVolume(volumeID uint32, excludeTaskID string) (string, MaintenanceTaskType, bool) { + if volumeID == 0 { + return "", "", false + } + + for _, task := range mq.tasks { + if task.ID == excludeTaskID { + continue + } + if task.VolumeID != volumeID { + continue + } + if task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress { + return task.ID, task.Type, true + } + } + + return "", "", false +} + // canScheduleTaskNow determines if a task can be scheduled using task schedulers or fallback logic func (mq *MaintenanceQueue) canScheduleTaskNow(task *MaintenanceTask) bool { glog.V(2).Infof("Checking if task %s (type: %s) can be scheduled", task.ID, task.Type) diff --git a/weed/admin/maintenance/maintenance_queue_test.go b/weed/admin/maintenance/maintenance_queue_test.go index 27010de96..959addf3f 100644 --- a/weed/admin/maintenance/maintenance_queue_test.go +++ b/weed/admin/maintenance/maintenance_queue_test.go @@ -969,3 +969,154 @@ func TestMaintenanceQueue_AssignTaskRollback(t *testing.T) { t.Errorf("Task %s should still be in pendingTasks slice", taskID) } } + +func TestGetNextTask_SkipsVolumeConflictsAcrossTypes(t *testing.T) { + policy := &MaintenancePolicy{ + TaskPolicies: map[string]*worker_pb.TaskPolicy{ + "balance": {MaxConcurrent: 2}, + "erasure_coding": {MaxConcurrent: 2}, + "vacuum": {MaxConcurrent: 2}, + }, + } + + mq := NewMaintenanceQueue(policy) + + now := time.Now() + mq.AddTask(&MaintenanceTask{ + ID: "t1", + Type: MaintenanceTaskType("balance"), + Priority: PriorityHigh, + VolumeID: 100, + Server: "server1", + ScheduledAt: now.Add(-3 * time.Second), + }) + t2 := &MaintenanceTask{ + ID: "t2", + Type: MaintenanceTaskType("erasure_coding"), + Priority: PriorityNormal, + VolumeID: 100, + Server: "server1", + Status: TaskStatusPending, + ScheduledAt: now.Add(-2 * time.Second), + } + mq.mutex.Lock() + mq.tasks[t2.ID] = t2 + mq.pendingTasks = append(mq.pendingTasks, t2) + mq.mutex.Unlock() + mq.AddTask(&MaintenanceTask{ + ID: "t3", + Type: MaintenanceTaskType("vacuum"), + Priority: PriorityNormal, + VolumeID: 200, + Server: "server1", + ScheduledAt: now.Add(-1 * time.Second), + }) + + mq.workers["worker1"] = &MaintenanceWorker{ + ID: "worker1", + Status: "active", + Capabilities: []MaintenanceTaskType{"balance", "erasure_coding", "vacuum"}, + MaxConcurrent: 2, + LastHeartbeat: time.Now(), + } + mq.workers["worker2"] = &MaintenanceWorker{ + ID: "worker2", + Status: "active", + Capabilities: []MaintenanceTaskType{"balance", "erasure_coding", "vacuum"}, + MaxConcurrent: 2, + LastHeartbeat: time.Now(), + } + + task1 := mq.GetNextTask("worker1", mq.workers["worker1"].Capabilities) + if task1 == nil || task1.ID != "t1" { + t.Fatalf("Expected first assignment to be t1, got %+v", task1) + } + + task2 := mq.GetNextTask("worker2", mq.workers["worker2"].Capabilities) + if task2 == nil { + t.Fatalf("Expected a second task to be assigned, got nil") + } + if task2.ID != "t3" { + t.Fatalf("Expected second assignment to skip volume 100 and pick t3, got %s", task2.ID) + } + + if mq.tasks["t2"].Status != TaskStatusPending { + t.Fatalf("Expected t2 to remain pending due to volume conflict, got %s", mq.tasks["t2"].Status) + } +} + +func TestAddTask_OnePendingTaskPerVolume(t *testing.T) { + mq := NewMaintenanceQueue(&MaintenancePolicy{ + TaskPolicies: map[string]*worker_pb.TaskPolicy{ + "balance": {MaxConcurrent: 1}, + "erasure_coding": {MaxConcurrent: 1}, + }, + }) + + mq.AddTask(&MaintenanceTask{ + ID: "t1", + Type: MaintenanceTaskType("balance"), + VolumeID: 100, + Server: "server1", + }) + mq.AddTask(&MaintenanceTask{ + ID: "t2", + Type: MaintenanceTaskType("erasure_coding"), + VolumeID: 100, + Server: "server1", + }) + + mq.mutex.RLock() + defer mq.mutex.RUnlock() + + if len(mq.tasks) != 1 { + t.Fatalf("Expected 1 task in queue, got %d", len(mq.tasks)) + } + if len(mq.pendingTasks) != 1 { + t.Fatalf("Expected 1 pending task, got %d", len(mq.pendingTasks)) + } + if _, exists := mq.tasks["t1"]; !exists { + t.Fatalf("Expected task t1 to be queued") + } + if _, exists := mq.tasks["t2"]; exists { + t.Fatalf("Did not expect task t2 to be queued due to pending volume") + } +} + +func TestAddTask_RejectsWhenVolumeHasRunningTask(t *testing.T) { + mq := NewMaintenanceQueue(&MaintenancePolicy{ + TaskPolicies: map[string]*worker_pb.TaskPolicy{ + "balance": {MaxConcurrent: 1}, + "erasure_coding": {MaxConcurrent: 1}, + }, + }) + + mq.AddTask(&MaintenanceTask{ + ID: "t1", + Type: MaintenanceTaskType("balance"), + VolumeID: 100, + Server: "server1", + }) + + // Simulate assignment to make it active + mq.mutex.Lock() + mq.tasks["t1"].Status = TaskStatusInProgress + mq.mutex.Unlock() + + mq.AddTask(&MaintenanceTask{ + ID: "t2", + Type: MaintenanceTaskType("erasure_coding"), + VolumeID: 100, + Server: "server1", + }) + + mq.mutex.RLock() + defer mq.mutex.RUnlock() + + if len(mq.tasks) != 1 { + t.Fatalf("Expected 1 task in queue, got %d", len(mq.tasks)) + } + if _, exists := mq.tasks["t2"]; exists { + t.Fatalf("Did not expect task t2 to be queued due to active volume task") + } +}