package maintenance

import (
	"testing"

	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
)

// Test suite for canScheduleTaskNow() and its related scheduling helpers.
//
// This suite verifies that:
//  1. The fallback scheduling logic is used when no integration is present.
//  2. Task concurrency limits are enforced per task type.
//  3. Different task types do not count against each other's limits.
//  4. A policy with a higher per-type MaxConcurrent raises the limit.
//  5. Edge cases are pinned down explicitly: an empty task type is
//     schedulable, while a nil task is currently expected to make
//     canScheduleTaskNow panic (it is NOT handled gracefully), so callers
//     must not pass nil.
//  6. The helpers GetRunningTaskCount, canExecuteTaskType and
//     getMaxConcurrentForTaskType return the expected values.
//
// Background: canScheduleTaskNow() is on the task-assignment path. It was
// previously failing due to an overly restrictive integration scheduler, so a
// temporary fix bypasses the integration and uses fallback logic based on
// simple per-task-type concurrency limits. Tests that assert fallback
// behavior (notably TestCanScheduleTaskNow_WithIntegration) may need to be
// revisited if the integration path is re-enabled.

// TestCanScheduleTaskNow_FallbackLogic checks that an empty queue with no
// policy and no integration accepts a single pending task.
func TestCanScheduleTaskNow_FallbackLogic(t *testing.T) {
	// Test the current implementation which uses fallback logic
	mq := &MaintenanceQueue{
		tasks:        make(map[string]*MaintenanceTask),
		pendingTasks: []*MaintenanceTask{},
		workers:      make(map[string]*MaintenanceWorker),
		policy:       nil, // No policy for default behavior
		integration:  nil, // No integration to force fallback
	}

	task := &MaintenanceTask{
		ID:     "test-task-1",
		Type:   MaintenanceTaskType("erasure_coding"),
		Status: TaskStatusPending,
	}

	// Should return true with fallback logic (no running tasks, default max concurrent = 1)
	result := mq.canScheduleTaskNow(task)
	if !result {
		t.Errorf("Expected canScheduleTaskNow to return true with fallback logic, got false")
	}
}

// TestCanScheduleTaskNow_FallbackWithRunningTasks checks that the default
// limit of one concurrent task per type blocks a second task of that type.
func TestCanScheduleTaskNow_FallbackWithRunningTasks(t *testing.T) {
	// Test fallback logic when there are already running tasks
	mq := &MaintenanceQueue{
		tasks: map[string]*MaintenanceTask{
			"running-task": {
				ID:     "running-task",
				Type:   MaintenanceTaskType("erasure_coding"),
				Status: TaskStatusInProgress,
			},
		},
		pendingTasks: []*MaintenanceTask{},
		workers:      make(map[string]*MaintenanceWorker),
		policy:       nil,
		integration:  nil,
	}

	task := &MaintenanceTask{
		ID:     "test-task-2",
		Type:   MaintenanceTaskType("erasure_coding"),
		Status: TaskStatusPending,
	}

	// Should return false because max concurrent is 1 and we have 1 running task
	result := mq.canScheduleTaskNow(task)
	if result {
		t.Errorf("Expected canScheduleTaskNow to return false when at capacity, got true")
	}
}

// TestCanScheduleTaskNow_DifferentTaskTypes checks that a running task of one
// type does not consume the concurrency budget of another type.
func TestCanScheduleTaskNow_DifferentTaskTypes(t *testing.T) {
	// Test that different task types don't interfere with each other
	mq := &MaintenanceQueue{
		tasks: map[string]*MaintenanceTask{
			"running-ec-task": {
				ID:     "running-ec-task",
				Type:   MaintenanceTaskType("erasure_coding"),
				Status: TaskStatusInProgress,
			},
		},
		pendingTasks: []*MaintenanceTask{},
		workers:      make(map[string]*MaintenanceWorker),
		policy:       nil,
		integration:  nil,
	}

	// Test vacuum task when EC task is running
	vacuumTask := &MaintenanceTask{
		ID:     "vacuum-task",
		Type:   MaintenanceTaskType("vacuum"),
		Status: TaskStatusPending,
	}

	// Should return true because vacuum and erasure_coding are different task types
	result := mq.canScheduleTaskNow(vacuumTask)
	if !result {
		t.Errorf("Expected canScheduleTaskNow to return true for different task type, got false")
	}

	// Test another EC task when one is already running
	ecTask := &MaintenanceTask{
		ID:     "ec-task",
		Type:   MaintenanceTaskType("erasure_coding"),
		Status: TaskStatusPending,
	}

	// Should return false because max concurrent for EC is 1 and we have 1 running
	result = mq.canScheduleTaskNow(ecTask)
	if result {
		t.Errorf("Expected canScheduleTaskNow to return false for same task type at capacity, got true")
	}
}

// TestCanScheduleTaskNow_WithIntegration builds a queue via the public
// constructors with an integration attached and checks that a pending task is
// still schedulable. This asserts the current fallback behavior and may need
// updating if the integration scheduler is consulted again.
func TestCanScheduleTaskNow_WithIntegration(t *testing.T) {
	// Test with a real MaintenanceIntegration (will use fallback logic in current implementation)
	policy := &MaintenancePolicy{
		TaskPolicies:                 make(map[string]*worker_pb.TaskPolicy),
		GlobalMaxConcurrent:          10,
		DefaultRepeatIntervalSeconds: 24 * 60 * 60, // 24 hours in seconds
		DefaultCheckIntervalSeconds:  60 * 60,      // 1 hour in seconds
	}
	mq := NewMaintenanceQueue(policy)

	// Create a basic integration (this would normally be more complex)
	integration := NewMaintenanceIntegration(mq, policy)
	mq.SetIntegration(integration)

	task := &MaintenanceTask{
		ID:     "test-task-3",
		Type:   MaintenanceTaskType("erasure_coding"),
		Status: TaskStatusPending,
	}

	// With our current implementation (fallback logic), this should return true
	result := mq.canScheduleTaskNow(task)
	if !result {
		t.Errorf("Expected canScheduleTaskNow to return true with fallback logic, got false")
	}
}

// TestGetRunningTaskCount checks that only assigned and in-progress tasks of
// the requested type are counted; completed tasks and other types are not.
func TestGetRunningTaskCount(t *testing.T) {
	// Test the helper function used by fallback logic
	mq := &MaintenanceQueue{
		tasks: map[string]*MaintenanceTask{
			"task1": {
				ID:     "task1",
				Type:   MaintenanceTaskType("erasure_coding"),
				Status: TaskStatusInProgress,
			},
			"task2": {
				ID:     "task2",
				Type:   MaintenanceTaskType("erasure_coding"),
				Status: TaskStatusAssigned,
			},
			"task3": {
				ID:     "task3",
				Type:   MaintenanceTaskType("vacuum"),
				Status: TaskStatusInProgress,
			},
			"task4": {
				ID:     "task4",
				Type:   MaintenanceTaskType("erasure_coding"),
				Status: TaskStatusCompleted,
			},
		},
		pendingTasks: []*MaintenanceTask{},
		workers:      make(map[string]*MaintenanceWorker),
	}

	// Should count 2 running EC tasks (in_progress + assigned)
	ecCount := mq.GetRunningTaskCount(MaintenanceTaskType("erasure_coding"))
	if ecCount != 2 {
		t.Errorf("Expected 2 running EC tasks, got %d", ecCount)
	}

	// Should count 1 running vacuum task
	vacuumCount := mq.GetRunningTaskCount(MaintenanceTaskType("vacuum"))
	if vacuumCount != 1 {
		t.Errorf("Expected 1 running vacuum task, got %d", vacuumCount)
	}

	// Should count 0 running balance tasks
	balanceCount := mq.GetRunningTaskCount(MaintenanceTaskType("balance"))
	if balanceCount != 0 {
		t.Errorf("Expected 0 running balance tasks, got %d", balanceCount)
	}
}

// TestCanExecuteTaskType checks the per-type capacity helper: a type at its
// default limit is rejected, an idle type is accepted.
func TestCanExecuteTaskType(t *testing.T) {
	// Test the fallback logic helper function
	mq := &MaintenanceQueue{
		tasks: map[string]*MaintenanceTask{
			"running-task": {
				ID:     "running-task",
				Type:   MaintenanceTaskType("erasure_coding"),
				Status: TaskStatusInProgress,
			},
		},
		pendingTasks: []*MaintenanceTask{},
		workers:      make(map[string]*MaintenanceWorker),
		policy:       nil, // Will use default max concurrent = 1
		integration:  nil,
	}

	// Should return false for EC (1 running, max = 1)
	result := mq.canExecuteTaskType(MaintenanceTaskType("erasure_coding"))
	if result {
		t.Errorf("Expected canExecuteTaskType to return false for EC at capacity, got true")
	}

	// Should return true for vacuum (0 running, max = 1)
	result = mq.canExecuteTaskType(MaintenanceTaskType("vacuum"))
	if !result {
		t.Errorf("Expected canExecuteTaskType to return true for vacuum, got false")
	}
}

// TestGetMaxConcurrentForTaskType_DefaultBehavior checks that, with neither a
// policy nor an integration, every task type gets a limit of 1.
func TestGetMaxConcurrentForTaskType_DefaultBehavior(t *testing.T) {
	// Test the default behavior when no policy or integration is set
	mq := &MaintenanceQueue{
		tasks:        make(map[string]*MaintenanceTask),
		pendingTasks: []*MaintenanceTask{},
		workers:      make(map[string]*MaintenanceWorker),
		policy:       nil,
		integration:  nil,
	}

	// Should return default value of 1
	maxConcurrent := mq.getMaxConcurrentForTaskType(MaintenanceTaskType("erasure_coding"))
	if maxConcurrent != 1 {
		t.Errorf("Expected default max concurrent to be 1, got %d", maxConcurrent)
	}

	maxConcurrent = mq.getMaxConcurrentForTaskType(MaintenanceTaskType("vacuum"))
	if maxConcurrent != 1 {
		t.Errorf("Expected default max concurrent to be 1, got %d", maxConcurrent)
	}
}

// Test edge cases and error conditions

// TestCanScheduleTaskNow_NilTask pins the current contract that a nil task is
// NOT handled gracefully: canScheduleTaskNow is expected to panic. If a nil
// guard is ever added, this test must be updated deliberately.
func TestCanScheduleTaskNow_NilTask(t *testing.T) {
	mq := &MaintenanceQueue{
		tasks:        make(map[string]*MaintenanceTask),
		pendingTasks: []*MaintenanceTask{},
		workers:      make(map[string]*MaintenanceWorker),
		policy:       nil,
		integration:  nil,
	}

	// recover() only works inside a deferred function in the same goroutine,
	// so the panic check must be registered before the call.
	defer func() {
		if r := recover(); r == nil {
			t.Errorf("Expected canScheduleTaskNow to panic with nil task, but it didn't")
		}
	}()

	// This should panic
	mq.canScheduleTaskNow(nil)
}

// TestCanScheduleTaskNow_EmptyTaskType checks that a task with an empty type
// string is still schedulable on an empty queue.
func TestCanScheduleTaskNow_EmptyTaskType(t *testing.T) {
	mq := &MaintenanceQueue{
		tasks:        make(map[string]*MaintenanceTask),
		pendingTasks: []*MaintenanceTask{},
		workers:      make(map[string]*MaintenanceWorker),
		policy:       nil,
		integration:  nil,
	}

	task := &MaintenanceTask{
		ID:     "empty-type-task",
		Type:   MaintenanceTaskType(""), // Empty task type
		Status: TaskStatusPending,
	}

	// Should handle empty task type gracefully
	result := mq.canScheduleTaskNow(task)
	if !result {
		t.Errorf("Expected canScheduleTaskNow to handle empty task type, got false")
	}
}

// TestCanScheduleTaskNow_WithPolicy checks that a policy's per-type
// MaxConcurrent overrides the default: scheduling is allowed below the limit
// and refused once the running count reaches it.
func TestCanScheduleTaskNow_WithPolicy(t *testing.T) {
	// Test with a policy that allows higher concurrency
	policy := &MaintenancePolicy{
		TaskPolicies: map[string]*worker_pb.TaskPolicy{
			string(MaintenanceTaskType("erasure_coding")): {
				Enabled:               true,
				MaxConcurrent:         3,
				RepeatIntervalSeconds: 60 * 60, // 1 hour
				CheckIntervalSeconds:  60 * 60, // 1 hour
			},
			string(MaintenanceTaskType("vacuum")): {
				Enabled:               true,
				MaxConcurrent:         2,
				RepeatIntervalSeconds: 60 * 60, // 1 hour
				CheckIntervalSeconds:  60 * 60, // 1 hour
			},
		},
		GlobalMaxConcurrent:          10,
		DefaultRepeatIntervalSeconds: 24 * 60 * 60, // 24 hours in seconds
		DefaultCheckIntervalSeconds:  60 * 60,      // 1 hour in seconds
	}

	mq := &MaintenanceQueue{
		tasks: map[string]*MaintenanceTask{
			"running-task-1": {
				ID:     "running-task-1",
				Type:   MaintenanceTaskType("erasure_coding"),
				Status: TaskStatusInProgress,
			},
			"running-task-2": {
				ID:     "running-task-2",
				Type:   MaintenanceTaskType("erasure_coding"),
				Status: TaskStatusAssigned,
			},
		},
		pendingTasks: []*MaintenanceTask{},
		workers:      make(map[string]*MaintenanceWorker),
		policy:       policy,
		integration:  nil,
	}

	task := &MaintenanceTask{
		ID:     "test-task-policy",
		Type:   MaintenanceTaskType("erasure_coding"),
		Status: TaskStatusPending,
	}

	// Should return true because we have 2 running EC tasks but max is 3
	result := mq.canScheduleTaskNow(task)
	if !result {
		t.Errorf("Expected canScheduleTaskNow to return true with policy allowing 3 concurrent, got false")
	}

	// Add one more running task to reach the limit
	mq.tasks["running-task-3"] = &MaintenanceTask{
		ID:     "running-task-3",
		Type:   MaintenanceTaskType("erasure_coding"),
		Status: TaskStatusInProgress,
	}

	// Should return false because we now have 3 running EC tasks (at limit)
	result = mq.canScheduleTaskNow(task)
	if result {
		t.Errorf("Expected canScheduleTaskNow to return false when at policy limit, got true")
	}
}