You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
353 lines
11 KiB
353 lines
11 KiB
package maintenance
|
|
|
|
import (
|
|
"testing"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
|
|
)
|
|
|
|
// Test suite for canScheduleTaskNow() function and related scheduling logic
|
|
//
|
|
// This test suite ensures that:
|
|
// 1. The fallback scheduling logic works correctly when no integration is present
|
|
// 2. Task concurrency limits are properly enforced per task type
|
|
// 3. Different task types don't interfere with each other's concurrency limits
|
|
// 4. Custom policies with higher concurrency limits work correctly
|
|
// 5. Edge cases (nil tasks, empty task types) are handled gracefully
|
|
// 6. Helper functions (GetRunningTaskCount, canExecuteTaskType, etc.) work correctly
|
|
//
|
|
// Background: The canScheduleTaskNow() function is critical for task assignment.
|
|
// It was previously failing due to an overly restrictive integration scheduler,
|
|
// so we implemented a temporary fix that bypasses the integration and uses
|
|
// fallback logic based on simple concurrency limits per task type.
|
|
|
|
func TestCanScheduleTaskNow_FallbackLogic(t *testing.T) {
|
|
// Test the current implementation which uses fallback logic
|
|
mq := &MaintenanceQueue{
|
|
tasks: make(map[string]*MaintenanceTask),
|
|
pendingTasks: []*MaintenanceTask{},
|
|
workers: make(map[string]*MaintenanceWorker),
|
|
policy: nil, // No policy for default behavior
|
|
integration: nil, // No integration to force fallback
|
|
}
|
|
|
|
task := &MaintenanceTask{
|
|
ID: "test-task-1",
|
|
Type: MaintenanceTaskType("erasure_coding"),
|
|
Status: TaskStatusPending,
|
|
}
|
|
|
|
// Should return true with fallback logic (no running tasks, default max concurrent = 1)
|
|
result := mq.canScheduleTaskNow(task)
|
|
if !result {
|
|
t.Errorf("Expected canScheduleTaskNow to return true with fallback logic, got false")
|
|
}
|
|
}
|
|
|
|
func TestCanScheduleTaskNow_FallbackWithRunningTasks(t *testing.T) {
|
|
// Test fallback logic when there are already running tasks
|
|
mq := &MaintenanceQueue{
|
|
tasks: map[string]*MaintenanceTask{
|
|
"running-task": {
|
|
ID: "running-task",
|
|
Type: MaintenanceTaskType("erasure_coding"),
|
|
Status: TaskStatusInProgress,
|
|
},
|
|
},
|
|
pendingTasks: []*MaintenanceTask{},
|
|
workers: make(map[string]*MaintenanceWorker),
|
|
policy: nil,
|
|
integration: nil,
|
|
}
|
|
|
|
task := &MaintenanceTask{
|
|
ID: "test-task-2",
|
|
Type: MaintenanceTaskType("erasure_coding"),
|
|
Status: TaskStatusPending,
|
|
}
|
|
|
|
// Should return false because max concurrent is 1 and we have 1 running task
|
|
result := mq.canScheduleTaskNow(task)
|
|
if result {
|
|
t.Errorf("Expected canScheduleTaskNow to return false when at capacity, got true")
|
|
}
|
|
}
|
|
|
|
func TestCanScheduleTaskNow_DifferentTaskTypes(t *testing.T) {
|
|
// Test that different task types don't interfere with each other
|
|
mq := &MaintenanceQueue{
|
|
tasks: map[string]*MaintenanceTask{
|
|
"running-ec-task": {
|
|
ID: "running-ec-task",
|
|
Type: MaintenanceTaskType("erasure_coding"),
|
|
Status: TaskStatusInProgress,
|
|
},
|
|
},
|
|
pendingTasks: []*MaintenanceTask{},
|
|
workers: make(map[string]*MaintenanceWorker),
|
|
policy: nil,
|
|
integration: nil,
|
|
}
|
|
|
|
// Test vacuum task when EC task is running
|
|
vacuumTask := &MaintenanceTask{
|
|
ID: "vacuum-task",
|
|
Type: MaintenanceTaskType("vacuum"),
|
|
Status: TaskStatusPending,
|
|
}
|
|
|
|
// Should return true because vacuum and erasure_coding are different task types
|
|
result := mq.canScheduleTaskNow(vacuumTask)
|
|
if !result {
|
|
t.Errorf("Expected canScheduleTaskNow to return true for different task type, got false")
|
|
}
|
|
|
|
// Test another EC task when one is already running
|
|
ecTask := &MaintenanceTask{
|
|
ID: "ec-task",
|
|
Type: MaintenanceTaskType("erasure_coding"),
|
|
Status: TaskStatusPending,
|
|
}
|
|
|
|
// Should return false because max concurrent for EC is 1 and we have 1 running
|
|
result = mq.canScheduleTaskNow(ecTask)
|
|
if result {
|
|
t.Errorf("Expected canScheduleTaskNow to return false for same task type at capacity, got true")
|
|
}
|
|
}
|
|
|
|
func TestCanScheduleTaskNow_WithIntegration(t *testing.T) {
|
|
// Test with a real MaintenanceIntegration (will use fallback logic in current implementation)
|
|
policy := &MaintenancePolicy{
|
|
TaskPolicies: make(map[string]*worker_pb.TaskPolicy),
|
|
GlobalMaxConcurrent: 10,
|
|
DefaultRepeatIntervalSeconds: 24 * 60 * 60, // 24 hours in seconds
|
|
DefaultCheckIntervalSeconds: 60 * 60, // 1 hour in seconds
|
|
}
|
|
mq := NewMaintenanceQueue(policy)
|
|
|
|
// Create a basic integration (this would normally be more complex)
|
|
integration := NewMaintenanceIntegration(mq, policy)
|
|
mq.SetIntegration(integration)
|
|
|
|
task := &MaintenanceTask{
|
|
ID: "test-task-3",
|
|
Type: MaintenanceTaskType("erasure_coding"),
|
|
Status: TaskStatusPending,
|
|
}
|
|
|
|
// With our current implementation (fallback logic), this should return true
|
|
result := mq.canScheduleTaskNow(task)
|
|
if !result {
|
|
t.Errorf("Expected canScheduleTaskNow to return true with fallback logic, got false")
|
|
}
|
|
}
|
|
|
|
func TestGetRunningTaskCount(t *testing.T) {
|
|
// Test the helper function used by fallback logic
|
|
mq := &MaintenanceQueue{
|
|
tasks: map[string]*MaintenanceTask{
|
|
"task1": {
|
|
ID: "task1",
|
|
Type: MaintenanceTaskType("erasure_coding"),
|
|
Status: TaskStatusInProgress,
|
|
},
|
|
"task2": {
|
|
ID: "task2",
|
|
Type: MaintenanceTaskType("erasure_coding"),
|
|
Status: TaskStatusAssigned,
|
|
},
|
|
"task3": {
|
|
ID: "task3",
|
|
Type: MaintenanceTaskType("vacuum"),
|
|
Status: TaskStatusInProgress,
|
|
},
|
|
"task4": {
|
|
ID: "task4",
|
|
Type: MaintenanceTaskType("erasure_coding"),
|
|
Status: TaskStatusCompleted,
|
|
},
|
|
},
|
|
pendingTasks: []*MaintenanceTask{},
|
|
workers: make(map[string]*MaintenanceWorker),
|
|
}
|
|
|
|
// Should count 2 running EC tasks (in_progress + assigned)
|
|
ecCount := mq.GetRunningTaskCount(MaintenanceTaskType("erasure_coding"))
|
|
if ecCount != 2 {
|
|
t.Errorf("Expected 2 running EC tasks, got %d", ecCount)
|
|
}
|
|
|
|
// Should count 1 running vacuum task
|
|
vacuumCount := mq.GetRunningTaskCount(MaintenanceTaskType("vacuum"))
|
|
if vacuumCount != 1 {
|
|
t.Errorf("Expected 1 running vacuum task, got %d", vacuumCount)
|
|
}
|
|
|
|
// Should count 0 running balance tasks
|
|
balanceCount := mq.GetRunningTaskCount(MaintenanceTaskType("balance"))
|
|
if balanceCount != 0 {
|
|
t.Errorf("Expected 0 running balance tasks, got %d", balanceCount)
|
|
}
|
|
}
|
|
|
|
func TestCanExecuteTaskType(t *testing.T) {
|
|
// Test the fallback logic helper function
|
|
mq := &MaintenanceQueue{
|
|
tasks: map[string]*MaintenanceTask{
|
|
"running-task": {
|
|
ID: "running-task",
|
|
Type: MaintenanceTaskType("erasure_coding"),
|
|
Status: TaskStatusInProgress,
|
|
},
|
|
},
|
|
pendingTasks: []*MaintenanceTask{},
|
|
workers: make(map[string]*MaintenanceWorker),
|
|
policy: nil, // Will use default max concurrent = 1
|
|
integration: nil,
|
|
}
|
|
|
|
// Should return false for EC (1 running, max = 1)
|
|
result := mq.canExecuteTaskType(MaintenanceTaskType("erasure_coding"))
|
|
if result {
|
|
t.Errorf("Expected canExecuteTaskType to return false for EC at capacity, got true")
|
|
}
|
|
|
|
// Should return true for vacuum (0 running, max = 1)
|
|
result = mq.canExecuteTaskType(MaintenanceTaskType("vacuum"))
|
|
if !result {
|
|
t.Errorf("Expected canExecuteTaskType to return true for vacuum, got false")
|
|
}
|
|
}
|
|
|
|
func TestGetMaxConcurrentForTaskType_DefaultBehavior(t *testing.T) {
|
|
// Test the default behavior when no policy or integration is set
|
|
mq := &MaintenanceQueue{
|
|
tasks: make(map[string]*MaintenanceTask),
|
|
pendingTasks: []*MaintenanceTask{},
|
|
workers: make(map[string]*MaintenanceWorker),
|
|
policy: nil,
|
|
integration: nil,
|
|
}
|
|
|
|
// Should return default value of 1
|
|
maxConcurrent := mq.getMaxConcurrentForTaskType(MaintenanceTaskType("erasure_coding"))
|
|
if maxConcurrent != 1 {
|
|
t.Errorf("Expected default max concurrent to be 1, got %d", maxConcurrent)
|
|
}
|
|
|
|
maxConcurrent = mq.getMaxConcurrentForTaskType(MaintenanceTaskType("vacuum"))
|
|
if maxConcurrent != 1 {
|
|
t.Errorf("Expected default max concurrent to be 1, got %d", maxConcurrent)
|
|
}
|
|
}
|
|
|
|
// Test edge cases and error conditions
|
|
func TestCanScheduleTaskNow_NilTask(t *testing.T) {
|
|
mq := &MaintenanceQueue{
|
|
tasks: make(map[string]*MaintenanceTask),
|
|
pendingTasks: []*MaintenanceTask{},
|
|
workers: make(map[string]*MaintenanceWorker),
|
|
policy: nil,
|
|
integration: nil,
|
|
}
|
|
|
|
// This should panic with a nil task, so we expect and catch the panic
|
|
defer func() {
|
|
if r := recover(); r == nil {
|
|
t.Errorf("Expected canScheduleTaskNow to panic with nil task, but it didn't")
|
|
}
|
|
}()
|
|
|
|
// This should panic
|
|
mq.canScheduleTaskNow(nil)
|
|
}
|
|
|
|
func TestCanScheduleTaskNow_EmptyTaskType(t *testing.T) {
|
|
mq := &MaintenanceQueue{
|
|
tasks: make(map[string]*MaintenanceTask),
|
|
pendingTasks: []*MaintenanceTask{},
|
|
workers: make(map[string]*MaintenanceWorker),
|
|
policy: nil,
|
|
integration: nil,
|
|
}
|
|
|
|
task := &MaintenanceTask{
|
|
ID: "empty-type-task",
|
|
Type: MaintenanceTaskType(""), // Empty task type
|
|
Status: TaskStatusPending,
|
|
}
|
|
|
|
// Should handle empty task type gracefully
|
|
result := mq.canScheduleTaskNow(task)
|
|
if !result {
|
|
t.Errorf("Expected canScheduleTaskNow to handle empty task type, got false")
|
|
}
|
|
}
|
|
|
|
func TestCanScheduleTaskNow_WithPolicy(t *testing.T) {
|
|
// Test with a policy that allows higher concurrency
|
|
policy := &MaintenancePolicy{
|
|
TaskPolicies: map[string]*worker_pb.TaskPolicy{
|
|
string(MaintenanceTaskType("erasure_coding")): {
|
|
Enabled: true,
|
|
MaxConcurrent: 3,
|
|
RepeatIntervalSeconds: 60 * 60, // 1 hour
|
|
CheckIntervalSeconds: 60 * 60, // 1 hour
|
|
},
|
|
string(MaintenanceTaskType("vacuum")): {
|
|
Enabled: true,
|
|
MaxConcurrent: 2,
|
|
RepeatIntervalSeconds: 60 * 60, // 1 hour
|
|
CheckIntervalSeconds: 60 * 60, // 1 hour
|
|
},
|
|
},
|
|
GlobalMaxConcurrent: 10,
|
|
DefaultRepeatIntervalSeconds: 24 * 60 * 60, // 24 hours in seconds
|
|
DefaultCheckIntervalSeconds: 60 * 60, // 1 hour in seconds
|
|
}
|
|
|
|
mq := &MaintenanceQueue{
|
|
tasks: map[string]*MaintenanceTask{
|
|
"running-task-1": {
|
|
ID: "running-task-1",
|
|
Type: MaintenanceTaskType("erasure_coding"),
|
|
Status: TaskStatusInProgress,
|
|
},
|
|
"running-task-2": {
|
|
ID: "running-task-2",
|
|
Type: MaintenanceTaskType("erasure_coding"),
|
|
Status: TaskStatusAssigned,
|
|
},
|
|
},
|
|
pendingTasks: []*MaintenanceTask{},
|
|
workers: make(map[string]*MaintenanceWorker),
|
|
policy: policy,
|
|
integration: nil,
|
|
}
|
|
|
|
task := &MaintenanceTask{
|
|
ID: "test-task-policy",
|
|
Type: MaintenanceTaskType("erasure_coding"),
|
|
Status: TaskStatusPending,
|
|
}
|
|
|
|
// Should return true because we have 2 running EC tasks but max is 3
|
|
result := mq.canScheduleTaskNow(task)
|
|
if !result {
|
|
t.Errorf("Expected canScheduleTaskNow to return true with policy allowing 3 concurrent, got false")
|
|
}
|
|
|
|
// Add one more running task to reach the limit
|
|
mq.tasks["running-task-3"] = &MaintenanceTask{
|
|
ID: "running-task-3",
|
|
Type: MaintenanceTaskType("erasure_coding"),
|
|
Status: TaskStatusInProgress,
|
|
}
|
|
|
|
// Should return false because we now have 3 running EC tasks (at limit)
|
|
result = mq.canScheduleTaskNow(task)
|
|
if result {
|
|
t.Errorf("Expected canScheduleTaskNow to return false when at policy limit, got true")
|
|
}
|
|
}
|