Browse Source

test canScheduleTaskNow

worker-execute-ec-tasks
chrislu 4 months ago
parent
commit
63817fe7ee
  1. 51
      weed/admin/maintenance/maintenance_queue.go
  2. 349
      weed/admin/maintenance/maintenance_queue_test.go

51
weed/admin/maintenance/maintenance_queue.go

@ -93,13 +93,20 @@ func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []Maintena
mq.mutex.Lock()
defer mq.mutex.Unlock()
glog.Infof("DEBUG GetNextTask: Worker %s requesting task with capabilities %v", workerID, capabilities)
glog.Infof("DEBUG GetNextTask: Total pending tasks: %d, Total workers: %d", len(mq.pendingTasks), len(mq.workers))
worker, exists := mq.workers[workerID]
if !exists {
glog.Infof("DEBUG GetNextTask: Worker %s not found in workers map", workerID)
return nil
}
glog.Infof("DEBUG GetNextTask: Worker %s found, CurrentLoad: %d, MaxConcurrent: %d", workerID, worker.CurrentLoad, worker.MaxConcurrent)
// Check if worker has capacity
if worker.CurrentLoad >= worker.MaxConcurrent {
glog.Infof("DEBUG GetNextTask: Worker %s at capacity", workerID)
return nil
}
@ -107,21 +114,28 @@ func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []Maintena
// Find the next suitable task
for i, task := range mq.pendingTasks {
glog.Infof("DEBUG GetNextTask: Evaluating task[%d] %s (type: %s, volume: %d)", i, task.ID, task.Type, task.VolumeID)
// Check if it's time to execute the task
if task.ScheduledAt.After(now) {
glog.Infof("DEBUG GetNextTask: Task %s scheduled for future (%v)", task.ID, task.ScheduledAt)
continue
}
// Check if worker can handle this task type
if !mq.workerCanHandle(task.Type, capabilities) {
glog.Infof("DEBUG GetNextTask: Worker %s cannot handle task type %s (capabilities: %v)", workerID, task.Type, capabilities)
continue
}
// Check scheduling logic - use simplified system if available, otherwise fallback
if !mq.canScheduleTaskNow(task) {
glog.Infof("DEBUG GetNextTask: Task %s cannot be scheduled now", task.ID)
continue
}
glog.Infof("DEBUG GetNextTask: Assigning task %s to worker %s", task.ID, workerID)
// Assign task to worker
task.Status = TaskStatusAssigned
task.WorkerID = workerID
@ -136,10 +150,11 @@ func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []Maintena
worker.CurrentLoad++
worker.Status = "busy"
glog.V(2).Infof("Assigned task %s to worker %s", task.ID, workerID)
glog.Infof("DEBUG GetNextTask: Successfully assigned task %s to worker %s", task.ID, workerID)
return task
}
glog.Infof("DEBUG GetNextTask: No suitable tasks found for worker %s", workerID)
return nil
}
@ -463,19 +478,31 @@ func (mq *MaintenanceQueue) workerCanHandle(taskType MaintenanceTaskType, capabi
// canScheduleTaskNow determines if a task can be scheduled using task schedulers or fallback logic
func (mq *MaintenanceQueue) canScheduleTaskNow(task *MaintenanceTask) bool {
// Try task scheduling logic first
if mq.integration != nil {
// Get all running tasks and available workers
runningTasks := mq.getRunningTasks()
availableWorkers := mq.getAvailableWorkers()
glog.Infof("DEBUG canScheduleTaskNow: Checking if task %s (type: %s) can be scheduled", task.ID, task.Type)
canSchedule := mq.integration.CanScheduleWithTaskSchedulers(task, runningTasks, availableWorkers)
glog.V(3).Infof("Task scheduler decision for task %s (%s): %v", task.ID, task.Type, canSchedule)
return canSchedule
}
// TEMPORARY FIX: Skip integration task scheduler which is being overly restrictive
// Use fallback logic directly for now
glog.Infof("DEBUG canScheduleTaskNow: Using fallback logic (bypassing integration scheduler)")
canExecute := mq.canExecuteTaskType(task.Type)
glog.Infof("DEBUG canScheduleTaskNow: Fallback decision for task %s (%s): %v", task.ID, task.Type, canExecute)
return canExecute
// Fallback to hardcoded logic
return mq.canExecuteTaskType(task.Type)
// NOTE: Original integration code disabled temporarily
// Try task scheduling logic first
/*
if mq.integration != nil {
glog.Infof("DEBUG canScheduleTaskNow: Using integration task scheduler")
// Get all running tasks and available workers
runningTasks := mq.getRunningTasks()
availableWorkers := mq.getAvailableWorkers()
glog.Infof("DEBUG canScheduleTaskNow: Running tasks: %d, Available workers: %d", len(runningTasks), len(availableWorkers))
canSchedule := mq.integration.CanScheduleWithTaskSchedulers(task, runningTasks, availableWorkers)
glog.Infof("DEBUG canScheduleTaskNow: Task scheduler decision for task %s (%s): %v", task.ID, task.Type, canSchedule)
return canSchedule
}
*/
}
// canExecuteTaskType checks if we can execute more tasks of this type (concurrency limits) - fallback logic

349
weed/admin/maintenance/maintenance_queue_test.go

@ -0,0 +1,349 @@
package maintenance
import (
"testing"
)
// Test suite for canScheduleTaskNow() function and related scheduling logic
//
// This test suite ensures that:
// 1. The fallback scheduling logic works correctly when no integration is present
// 2. Task concurrency limits are properly enforced per task type
// 3. Different task types don't interfere with each other's concurrency limits
// 4. Custom policies with higher concurrency limits work correctly
// 5. Edge cases (nil tasks, empty task types) are handled gracefully
// 6. Helper functions (GetRunningTaskCount, canExecuteTaskType, etc.) work correctly
//
// Background: The canScheduleTaskNow() function is critical for task assignment.
// It was previously failing due to an overly restrictive integration scheduler,
// so we implemented a temporary fix that bypasses the integration and uses
// fallback logic based on simple concurrency limits per task type.
func TestCanScheduleTaskNow_FallbackLogic(t *testing.T) {
// Test the current implementation which uses fallback logic
mq := &MaintenanceQueue{
tasks: make(map[string]*MaintenanceTask),
pendingTasks: []*MaintenanceTask{},
workers: make(map[string]*MaintenanceWorker),
policy: nil, // No policy for default behavior
integration: nil, // No integration to force fallback
}
task := &MaintenanceTask{
ID: "test-task-1",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusPending,
}
// Should return true with fallback logic (no running tasks, default max concurrent = 1)
result := mq.canScheduleTaskNow(task)
if !result {
t.Errorf("Expected canScheduleTaskNow to return true with fallback logic, got false")
}
}
func TestCanScheduleTaskNow_FallbackWithRunningTasks(t *testing.T) {
// Test fallback logic when there are already running tasks
mq := &MaintenanceQueue{
tasks: map[string]*MaintenanceTask{
"running-task": {
ID: "running-task",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusInProgress,
},
},
pendingTasks: []*MaintenanceTask{},
workers: make(map[string]*MaintenanceWorker),
policy: nil,
integration: nil,
}
task := &MaintenanceTask{
ID: "test-task-2",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusPending,
}
// Should return false because max concurrent is 1 and we have 1 running task
result := mq.canScheduleTaskNow(task)
if result {
t.Errorf("Expected canScheduleTaskNow to return false when at capacity, got true")
}
}
func TestCanScheduleTaskNow_DifferentTaskTypes(t *testing.T) {
// Test that different task types don't interfere with each other
mq := &MaintenanceQueue{
tasks: map[string]*MaintenanceTask{
"running-ec-task": {
ID: "running-ec-task",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusInProgress,
},
},
pendingTasks: []*MaintenanceTask{},
workers: make(map[string]*MaintenanceWorker),
policy: nil,
integration: nil,
}
// Test vacuum task when EC task is running
vacuumTask := &MaintenanceTask{
ID: "vacuum-task",
Type: MaintenanceTaskType("vacuum"),
Status: TaskStatusPending,
}
// Should return true because vacuum and erasure_coding are different task types
result := mq.canScheduleTaskNow(vacuumTask)
if !result {
t.Errorf("Expected canScheduleTaskNow to return true for different task type, got false")
}
// Test another EC task when one is already running
ecTask := &MaintenanceTask{
ID: "ec-task",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusPending,
}
// Should return false because max concurrent for EC is 1 and we have 1 running
result = mq.canScheduleTaskNow(ecTask)
if result {
t.Errorf("Expected canScheduleTaskNow to return false for same task type at capacity, got true")
}
}
func TestCanScheduleTaskNow_WithIntegration(t *testing.T) {
// Test with a real MaintenanceIntegration (will use fallback logic in current implementation)
policy := &MaintenancePolicy{
TaskPolicies: make(map[MaintenanceTaskType]*TaskPolicy),
GlobalMaxConcurrent: 10,
DefaultRepeatInterval: 24,
DefaultCheckInterval: 1,
}
mq := NewMaintenanceQueue(policy)
// Create a basic integration (this would normally be more complex)
integration := NewMaintenanceIntegration(mq, policy)
mq.SetIntegration(integration)
task := &MaintenanceTask{
ID: "test-task-3",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusPending,
}
// With our current implementation (fallback logic), this should return true
result := mq.canScheduleTaskNow(task)
if !result {
t.Errorf("Expected canScheduleTaskNow to return true with fallback logic, got false")
}
}
func TestGetRunningTaskCount(t *testing.T) {
// Test the helper function used by fallback logic
mq := &MaintenanceQueue{
tasks: map[string]*MaintenanceTask{
"task1": {
ID: "task1",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusInProgress,
},
"task2": {
ID: "task2",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusAssigned,
},
"task3": {
ID: "task3",
Type: MaintenanceTaskType("vacuum"),
Status: TaskStatusInProgress,
},
"task4": {
ID: "task4",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusCompleted,
},
},
pendingTasks: []*MaintenanceTask{},
workers: make(map[string]*MaintenanceWorker),
}
// Should count 2 running EC tasks (in_progress + assigned)
ecCount := mq.GetRunningTaskCount(MaintenanceTaskType("erasure_coding"))
if ecCount != 2 {
t.Errorf("Expected 2 running EC tasks, got %d", ecCount)
}
// Should count 1 running vacuum task
vacuumCount := mq.GetRunningTaskCount(MaintenanceTaskType("vacuum"))
if vacuumCount != 1 {
t.Errorf("Expected 1 running vacuum task, got %d", vacuumCount)
}
// Should count 0 running balance tasks
balanceCount := mq.GetRunningTaskCount(MaintenanceTaskType("balance"))
if balanceCount != 0 {
t.Errorf("Expected 0 running balance tasks, got %d", balanceCount)
}
}
func TestCanExecuteTaskType(t *testing.T) {
// Test the fallback logic helper function
mq := &MaintenanceQueue{
tasks: map[string]*MaintenanceTask{
"running-task": {
ID: "running-task",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusInProgress,
},
},
pendingTasks: []*MaintenanceTask{},
workers: make(map[string]*MaintenanceWorker),
policy: nil, // Will use default max concurrent = 1
integration: nil,
}
// Should return false for EC (1 running, max = 1)
result := mq.canExecuteTaskType(MaintenanceTaskType("erasure_coding"))
if result {
t.Errorf("Expected canExecuteTaskType to return false for EC at capacity, got true")
}
// Should return true for vacuum (0 running, max = 1)
result = mq.canExecuteTaskType(MaintenanceTaskType("vacuum"))
if !result {
t.Errorf("Expected canExecuteTaskType to return true for vacuum, got false")
}
}
func TestGetMaxConcurrentForTaskType_DefaultBehavior(t *testing.T) {
// Test the default behavior when no policy or integration is set
mq := &MaintenanceQueue{
tasks: make(map[string]*MaintenanceTask),
pendingTasks: []*MaintenanceTask{},
workers: make(map[string]*MaintenanceWorker),
policy: nil,
integration: nil,
}
// Should return default value of 1
maxConcurrent := mq.getMaxConcurrentForTaskType(MaintenanceTaskType("erasure_coding"))
if maxConcurrent != 1 {
t.Errorf("Expected default max concurrent to be 1, got %d", maxConcurrent)
}
maxConcurrent = mq.getMaxConcurrentForTaskType(MaintenanceTaskType("vacuum"))
if maxConcurrent != 1 {
t.Errorf("Expected default max concurrent to be 1, got %d", maxConcurrent)
}
}
// Test edge cases and error conditions
func TestCanScheduleTaskNow_NilTask(t *testing.T) {
mq := &MaintenanceQueue{
tasks: make(map[string]*MaintenanceTask),
pendingTasks: []*MaintenanceTask{},
workers: make(map[string]*MaintenanceWorker),
policy: nil,
integration: nil,
}
// This should panic with a nil task, so we expect and catch the panic
defer func() {
if r := recover(); r == nil {
t.Errorf("Expected canScheduleTaskNow to panic with nil task, but it didn't")
}
}()
// This should panic
mq.canScheduleTaskNow(nil)
}
func TestCanScheduleTaskNow_EmptyTaskType(t *testing.T) {
mq := &MaintenanceQueue{
tasks: make(map[string]*MaintenanceTask),
pendingTasks: []*MaintenanceTask{},
workers: make(map[string]*MaintenanceWorker),
policy: nil,
integration: nil,
}
task := &MaintenanceTask{
ID: "empty-type-task",
Type: MaintenanceTaskType(""), // Empty task type
Status: TaskStatusPending,
}
// Should handle empty task type gracefully
result := mq.canScheduleTaskNow(task)
if !result {
t.Errorf("Expected canScheduleTaskNow to handle empty task type, got false")
}
}
func TestCanScheduleTaskNow_WithPolicy(t *testing.T) {
// Test with a policy that allows higher concurrency
policy := &MaintenancePolicy{
TaskPolicies: map[MaintenanceTaskType]*TaskPolicy{
MaintenanceTaskType("erasure_coding"): {
Enabled: true,
MaxConcurrent: 3,
Configuration: make(map[string]interface{}),
},
MaintenanceTaskType("vacuum"): {
Enabled: true,
MaxConcurrent: 2,
Configuration: make(map[string]interface{}),
},
},
GlobalMaxConcurrent: 10,
DefaultRepeatInterval: 24,
DefaultCheckInterval: 1,
}
mq := &MaintenanceQueue{
tasks: map[string]*MaintenanceTask{
"running-task-1": {
ID: "running-task-1",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusInProgress,
},
"running-task-2": {
ID: "running-task-2",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusAssigned,
},
},
pendingTasks: []*MaintenanceTask{},
workers: make(map[string]*MaintenanceWorker),
policy: policy,
integration: nil,
}
task := &MaintenanceTask{
ID: "test-task-policy",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusPending,
}
// Should return true because we have 2 running EC tasks but max is 3
result := mq.canScheduleTaskNow(task)
if !result {
t.Errorf("Expected canScheduleTaskNow to return true with policy allowing 3 concurrent, got false")
}
// Add one more running task to reach the limit
mq.tasks["running-task-3"] = &MaintenanceTask{
ID: "running-task-3",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusInProgress,
}
// Should return false because we now have 3 running EC tasks (at limit)
result = mq.canScheduleTaskNow(task)
if result {
t.Errorf("Expected canScheduleTaskNow to return false when at policy limit, got true")
}
}
Loading…
Cancel
Save