You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1024 lines
32 KiB
1024 lines
32 KiB
package maintenance
|
|
|
|
import (
|
|
"crypto/rand"
|
|
"fmt"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
)
|
|
|
|
// NewMaintenanceQueue creates a new maintenance queue
|
|
func NewMaintenanceQueue(policy *MaintenancePolicy) *MaintenanceQueue {
|
|
queue := &MaintenanceQueue{
|
|
tasks: make(map[string]*MaintenanceTask),
|
|
workers: make(map[string]*MaintenanceWorker),
|
|
pendingTasks: make([]*MaintenanceTask, 0),
|
|
policy: policy,
|
|
persistenceChan: make(chan *MaintenanceTask, 1000), // Buffer for async persistence
|
|
}
|
|
|
|
// Start persistence worker goroutine
|
|
go queue.persistenceWorker()
|
|
|
|
return queue
|
|
}
|
|
|
|
// SetIntegration sets the integration reference
|
|
func (mq *MaintenanceQueue) SetIntegration(integration *MaintenanceIntegration) {
|
|
mq.integration = integration
|
|
glog.V(1).Infof("Maintenance queue configured with integration")
|
|
}
|
|
|
|
// SetPersistence sets the task persistence interface
|
|
func (mq *MaintenanceQueue) SetPersistence(persistence TaskPersistence) {
|
|
mq.persistence = persistence
|
|
glog.V(1).Infof("Maintenance queue configured with task persistence")
|
|
}
|
|
|
|
// LoadTasksFromPersistence loads tasks from persistent storage on startup
|
|
func (mq *MaintenanceQueue) LoadTasksFromPersistence() error {
|
|
if mq.persistence == nil {
|
|
glog.V(1).Infof("No task persistence configured, skipping task loading")
|
|
return nil
|
|
}
|
|
|
|
glog.Infof("Loading tasks from persistence...")
|
|
|
|
// Load tasks without holding lock to prevent deadlock
|
|
tasks, err := mq.persistence.LoadAllTaskStates()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to load task states: %w", err)
|
|
}
|
|
|
|
// Only acquire lock for the in-memory operations
|
|
mq.mutex.Lock()
|
|
defer mq.mutex.Unlock()
|
|
|
|
glog.Infof("DEBUG LoadTasksFromPersistence: Found %d tasks in persistence", len(tasks))
|
|
|
|
// Reset task maps
|
|
mq.tasks = make(map[string]*MaintenanceTask)
|
|
mq.pendingTasks = make([]*MaintenanceTask, 0)
|
|
|
|
// Load tasks by status
|
|
for _, task := range tasks {
|
|
glog.Infof("DEBUG LoadTasksFromPersistence: Loading task %s (type: %s, status: %s, scheduled: %v)", task.ID, task.Type, task.Status, task.ScheduledAt)
|
|
mq.tasks[task.ID] = task
|
|
|
|
switch task.Status {
|
|
case TaskStatusPending:
|
|
glog.Infof("DEBUG LoadTasksFromPersistence: Adding task %s to pending queue", task.ID)
|
|
mq.pendingTasks = append(mq.pendingTasks, task)
|
|
case TaskStatusAssigned, TaskStatusInProgress:
|
|
// For assigned/in-progress tasks, we need to check if the worker is still available
|
|
// If not, we should fail them and make them eligible for retry
|
|
if task.WorkerID != "" {
|
|
if _, exists := mq.workers[task.WorkerID]; !exists {
|
|
glog.Warningf("Task %s was assigned to unavailable worker %s, marking as failed", task.ID, task.WorkerID)
|
|
task.Status = TaskStatusFailed
|
|
task.Error = "Worker unavailable after restart"
|
|
completedTime := time.Now()
|
|
task.CompletedAt = &completedTime
|
|
|
|
// Check if it should be retried
|
|
if task.RetryCount < task.MaxRetries {
|
|
task.RetryCount++
|
|
task.Status = TaskStatusPending
|
|
task.WorkerID = ""
|
|
task.StartedAt = nil
|
|
task.CompletedAt = nil
|
|
task.Error = ""
|
|
task.ScheduledAt = time.Now().Add(1 * time.Minute) // Retry after restart delay
|
|
glog.Infof("DEBUG LoadTasksFromPersistence: Retrying task %s, adding to pending queue", task.ID)
|
|
mq.pendingTasks = append(mq.pendingTasks, task)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Sort pending tasks by priority and schedule time
|
|
sort.Slice(mq.pendingTasks, func(i, j int) bool {
|
|
if mq.pendingTasks[i].Priority != mq.pendingTasks[j].Priority {
|
|
return mq.pendingTasks[i].Priority > mq.pendingTasks[j].Priority
|
|
}
|
|
return mq.pendingTasks[i].ScheduledAt.Before(mq.pendingTasks[j].ScheduledAt)
|
|
})
|
|
|
|
glog.Infof("Loaded %d tasks from persistence (%d pending)", len(tasks), len(mq.pendingTasks))
|
|
return nil
|
|
}
|
|
|
|
// persistenceWorker handles async persistence operations
|
|
func (mq *MaintenanceQueue) persistenceWorker() {
|
|
for task := range mq.persistenceChan {
|
|
if mq.persistence != nil {
|
|
if err := mq.persistence.SaveTaskState(task); err != nil {
|
|
glog.Errorf("Failed to save task state for %s: %v", task.ID, err)
|
|
}
|
|
}
|
|
}
|
|
glog.V(1).Infof("Persistence worker shut down")
|
|
}
|
|
|
|
// Close gracefully shuts down the maintenance queue
|
|
func (mq *MaintenanceQueue) Close() {
|
|
if mq.persistenceChan != nil {
|
|
close(mq.persistenceChan)
|
|
glog.V(1).Infof("Maintenance queue persistence channel closed")
|
|
}
|
|
}
|
|
|
|
// saveTaskState saves a task to persistent storage asynchronously
|
|
func (mq *MaintenanceQueue) saveTaskState(task *MaintenanceTask) {
|
|
if mq.persistence != nil && mq.persistenceChan != nil {
|
|
// Create a copy to avoid race conditions
|
|
taskCopy := *task
|
|
select {
|
|
case mq.persistenceChan <- &taskCopy:
|
|
// Successfully queued for async persistence
|
|
default:
|
|
glog.Warningf("Persistence channel full, task state may be lost: %s", task.ID)
|
|
}
|
|
}
|
|
}
|
|
|
|
// cleanupCompletedTasks removes old completed tasks beyond the retention limit
|
|
func (mq *MaintenanceQueue) cleanupCompletedTasks() {
|
|
if mq.persistence != nil {
|
|
if err := mq.persistence.CleanupCompletedTasks(); err != nil {
|
|
glog.Errorf("Failed to cleanup completed tasks: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// AddTask adds a new maintenance task to the queue with deduplication
|
|
func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) {
|
|
mq.mutex.Lock()
|
|
defer mq.mutex.Unlock()
|
|
|
|
// Check for duplicate tasks (same type + volume + not completed)
|
|
if mq.hasDuplicateTask(task) {
|
|
glog.V(1).Infof("Task skipped (duplicate): %s for volume %d on %s (already queued or running)",
|
|
task.Type, task.VolumeID, task.Server)
|
|
return
|
|
}
|
|
|
|
task.ID = generateTaskID()
|
|
task.Status = TaskStatusPending
|
|
task.CreatedAt = time.Now()
|
|
task.MaxRetries = 3 // Default retry count
|
|
|
|
// Initialize assignment history and set creation context
|
|
task.AssignmentHistory = make([]*TaskAssignmentRecord, 0)
|
|
if task.CreatedBy == "" {
|
|
task.CreatedBy = "maintenance-system"
|
|
}
|
|
if task.CreationContext == "" {
|
|
task.CreationContext = "Automatic task creation based on system monitoring"
|
|
}
|
|
if task.Tags == nil {
|
|
task.Tags = make(map[string]string)
|
|
}
|
|
|
|
mq.tasks[task.ID] = task
|
|
mq.pendingTasks = append(mq.pendingTasks, task)
|
|
|
|
// Sort pending tasks by priority and schedule time
|
|
sort.Slice(mq.pendingTasks, func(i, j int) bool {
|
|
if mq.pendingTasks[i].Priority != mq.pendingTasks[j].Priority {
|
|
return mq.pendingTasks[i].Priority > mq.pendingTasks[j].Priority
|
|
}
|
|
return mq.pendingTasks[i].ScheduledAt.Before(mq.pendingTasks[j].ScheduledAt)
|
|
})
|
|
|
|
// Save task state to persistence
|
|
mq.saveTaskState(task)
|
|
|
|
scheduleInfo := ""
|
|
if !task.ScheduledAt.IsZero() && time.Until(task.ScheduledAt) > time.Minute {
|
|
scheduleInfo = fmt.Sprintf(", scheduled for %v", task.ScheduledAt.Format("15:04:05"))
|
|
}
|
|
|
|
glog.Infof("Task queued: %s (%s) volume %d on %s, priority %d%s, reason: %s",
|
|
task.ID, task.Type, task.VolumeID, task.Server, task.Priority, scheduleInfo, task.Reason)
|
|
}
|
|
|
|
// hasDuplicateTask checks if a similar task already exists (same type, volume, and not completed)
|
|
func (mq *MaintenanceQueue) hasDuplicateTask(newTask *MaintenanceTask) bool {
|
|
for _, existingTask := range mq.tasks {
|
|
if existingTask.Type == newTask.Type &&
|
|
existingTask.VolumeID == newTask.VolumeID &&
|
|
existingTask.Server == newTask.Server &&
|
|
(existingTask.Status == TaskStatusPending ||
|
|
existingTask.Status == TaskStatusAssigned ||
|
|
existingTask.Status == TaskStatusInProgress) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// AddTasksFromResults converts detection results to tasks and adds them to the queue
|
|
func (mq *MaintenanceQueue) AddTasksFromResults(results []*TaskDetectionResult) {
|
|
for _, result := range results {
|
|
// Validate that task has proper typed parameters
|
|
if result.TypedParams == nil {
|
|
glog.Warningf("Rejecting invalid task: %s for volume %d on %s - no typed parameters (insufficient destinations or planning failed)",
|
|
result.TaskType, result.VolumeID, result.Server)
|
|
continue
|
|
}
|
|
|
|
task := &MaintenanceTask{
|
|
Type: result.TaskType,
|
|
Priority: result.Priority,
|
|
VolumeID: result.VolumeID,
|
|
Server: result.Server,
|
|
Collection: result.Collection,
|
|
// Copy typed protobuf parameters
|
|
TypedParams: result.TypedParams,
|
|
Reason: result.Reason,
|
|
ScheduledAt: result.ScheduleAt,
|
|
}
|
|
mq.AddTask(task)
|
|
}
|
|
}
|
|
|
|
// GetNextTask returns the next available task for a worker
|
|
func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []MaintenanceTaskType) *MaintenanceTask {
|
|
// Use read lock for initial checks and search
|
|
mq.mutex.RLock()
|
|
|
|
worker, exists := mq.workers[workerID]
|
|
if !exists {
|
|
mq.mutex.RUnlock()
|
|
glog.V(2).Infof("Task assignment failed for worker %s: worker not registered", workerID)
|
|
return nil
|
|
}
|
|
|
|
// Check if worker has capacity
|
|
if worker.CurrentLoad >= worker.MaxConcurrent {
|
|
mq.mutex.RUnlock()
|
|
glog.V(2).Infof("Task assignment failed for worker %s: at capacity (%d/%d)", workerID, worker.CurrentLoad, worker.MaxConcurrent)
|
|
return nil
|
|
}
|
|
|
|
now := time.Now()
|
|
var selectedTask *MaintenanceTask
|
|
var selectedIndex int = -1
|
|
|
|
// Find the next suitable task (using read lock)
|
|
for i, task := range mq.pendingTasks {
|
|
// Check if it's time to execute the task
|
|
if task.ScheduledAt.After(now) {
|
|
glog.V(3).Infof("Task %s skipped for worker %s: scheduled for future (%v)", task.ID, workerID, task.ScheduledAt)
|
|
continue
|
|
}
|
|
|
|
// Check if worker can handle this task type
|
|
if !mq.workerCanHandle(task.Type, capabilities) {
|
|
glog.V(3).Infof("Task %s (%s) skipped for worker %s: capability mismatch (worker has: %v)", task.ID, task.Type, workerID, capabilities)
|
|
continue
|
|
}
|
|
|
|
// Check if this task type needs a cooldown period
|
|
if !mq.canScheduleTaskNow(task) {
|
|
// Add detailed diagnostic information
|
|
runningCount := mq.GetRunningTaskCount(task.Type)
|
|
maxConcurrent := mq.getMaxConcurrentForTaskType(task.Type)
|
|
glog.V(2).Infof("Task %s (%s) skipped for worker %s: scheduling constraints not met (running: %d, max: %d)",
|
|
task.ID, task.Type, workerID, runningCount, maxConcurrent)
|
|
continue
|
|
}
|
|
|
|
// Found a suitable task
|
|
selectedTask = task
|
|
selectedIndex = i
|
|
break
|
|
}
|
|
|
|
// Release read lock
|
|
mq.mutex.RUnlock()
|
|
|
|
// If no task found, return nil
|
|
if selectedTask == nil {
|
|
glog.V(3).Infof("No suitable tasks available for worker %s (checked %d pending tasks)", workerID, len(mq.pendingTasks))
|
|
return nil
|
|
}
|
|
|
|
// Now acquire write lock to actually assign the task
|
|
mq.mutex.Lock()
|
|
defer mq.mutex.Unlock()
|
|
|
|
// Re-check that the task is still available (it might have been assigned to another worker)
|
|
if selectedIndex >= len(mq.pendingTasks) || mq.pendingTasks[selectedIndex].ID != selectedTask.ID {
|
|
glog.V(2).Infof("Task %s no longer available for worker %s: assigned to another worker", selectedTask.ID, workerID)
|
|
return nil
|
|
}
|
|
|
|
// Record assignment history
|
|
workerAddress := ""
|
|
if worker, exists := mq.workers[workerID]; exists {
|
|
workerAddress = worker.Address
|
|
}
|
|
|
|
// Create assignment record
|
|
assignmentRecord := &TaskAssignmentRecord{
|
|
WorkerID: workerID,
|
|
WorkerAddress: workerAddress,
|
|
AssignedAt: now,
|
|
Reason: "Task assigned to available worker",
|
|
}
|
|
|
|
// Initialize assignment history if nil
|
|
if selectedTask.AssignmentHistory == nil {
|
|
selectedTask.AssignmentHistory = make([]*TaskAssignmentRecord, 0)
|
|
}
|
|
selectedTask.AssignmentHistory = append(selectedTask.AssignmentHistory, assignmentRecord)
|
|
|
|
// Assign the task
|
|
selectedTask.Status = TaskStatusAssigned
|
|
selectedTask.WorkerID = workerID
|
|
selectedTask.StartedAt = &now
|
|
|
|
// Remove from pending tasks
|
|
mq.pendingTasks = append(mq.pendingTasks[:selectedIndex], mq.pendingTasks[selectedIndex+1:]...)
|
|
|
|
// Update worker load
|
|
if worker, exists := mq.workers[workerID]; exists {
|
|
worker.CurrentLoad++
|
|
}
|
|
|
|
// Track pending operation
|
|
mq.trackPendingOperation(selectedTask)
|
|
|
|
// Save task state after assignment
|
|
mq.saveTaskState(selectedTask)
|
|
|
|
glog.Infof("Task assigned: %s (%s) → worker %s (volume %d, server %s)",
|
|
selectedTask.ID, selectedTask.Type, workerID, selectedTask.VolumeID, selectedTask.Server)
|
|
|
|
return selectedTask
|
|
}
|
|
|
|
// CompleteTask marks a task as completed
|
|
func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
|
|
mq.mutex.Lock()
|
|
defer mq.mutex.Unlock()
|
|
|
|
task, exists := mq.tasks[taskID]
|
|
if !exists {
|
|
glog.Warningf("Attempted to complete non-existent task: %s", taskID)
|
|
return
|
|
}
|
|
|
|
completedTime := time.Now()
|
|
task.CompletedAt = &completedTime
|
|
|
|
// Calculate task duration
|
|
var duration time.Duration
|
|
if task.StartedAt != nil {
|
|
duration = completedTime.Sub(*task.StartedAt)
|
|
}
|
|
|
|
if error != "" {
|
|
task.Status = TaskStatusFailed
|
|
task.Error = error
|
|
|
|
// Check if task should be retried
|
|
if task.RetryCount < task.MaxRetries {
|
|
// Record unassignment due to failure/retry
|
|
if task.WorkerID != "" && len(task.AssignmentHistory) > 0 {
|
|
lastAssignment := task.AssignmentHistory[len(task.AssignmentHistory)-1]
|
|
if lastAssignment.UnassignedAt == nil {
|
|
unassignedTime := completedTime
|
|
lastAssignment.UnassignedAt = &unassignedTime
|
|
lastAssignment.Reason = fmt.Sprintf("Task failed, scheduling retry (attempt %d/%d): %s",
|
|
task.RetryCount+1, task.MaxRetries, error)
|
|
}
|
|
}
|
|
|
|
task.RetryCount++
|
|
task.Status = TaskStatusPending
|
|
task.WorkerID = ""
|
|
task.StartedAt = nil
|
|
task.CompletedAt = nil
|
|
task.Error = ""
|
|
task.ScheduledAt = time.Now().Add(15 * time.Minute) // Retry delay
|
|
|
|
mq.pendingTasks = append(mq.pendingTasks, task)
|
|
// Save task state after retry setup
|
|
mq.saveTaskState(task)
|
|
glog.Warningf("Task failed, scheduling retry: %s (%s) attempt %d/%d, worker %s, duration %v, error: %s",
|
|
taskID, task.Type, task.RetryCount, task.MaxRetries, task.WorkerID, duration, error)
|
|
} else {
|
|
// Record unassignment due to permanent failure
|
|
if task.WorkerID != "" && len(task.AssignmentHistory) > 0 {
|
|
lastAssignment := task.AssignmentHistory[len(task.AssignmentHistory)-1]
|
|
if lastAssignment.UnassignedAt == nil {
|
|
unassignedTime := completedTime
|
|
lastAssignment.UnassignedAt = &unassignedTime
|
|
lastAssignment.Reason = fmt.Sprintf("Task failed permanently after %d retries: %s", task.MaxRetries, error)
|
|
}
|
|
}
|
|
|
|
// Save task state after permanent failure
|
|
mq.saveTaskState(task)
|
|
glog.Errorf("Task failed permanently: %s (%s) worker %s, duration %v, after %d retries: %s",
|
|
taskID, task.Type, task.WorkerID, duration, task.MaxRetries, error)
|
|
}
|
|
} else {
|
|
task.Status = TaskStatusCompleted
|
|
task.Progress = 100
|
|
// Save task state after successful completion
|
|
mq.saveTaskState(task)
|
|
glog.Infof("Task completed: %s (%s) worker %s, duration %v, volume %d",
|
|
taskID, task.Type, task.WorkerID, duration, task.VolumeID)
|
|
}
|
|
|
|
// Update worker
|
|
if task.WorkerID != "" {
|
|
if worker, exists := mq.workers[task.WorkerID]; exists {
|
|
worker.CurrentTask = nil
|
|
worker.CurrentLoad--
|
|
if worker.CurrentLoad == 0 {
|
|
worker.Status = "active"
|
|
}
|
|
}
|
|
}
|
|
|
|
// Remove pending operation (unless it's being retried)
|
|
if task.Status != TaskStatusPending {
|
|
mq.removePendingOperation(taskID)
|
|
}
|
|
|
|
// Periodically cleanup old completed tasks (every 10th completion)
|
|
if task.Status == TaskStatusCompleted {
|
|
// Simple counter-based trigger for cleanup
|
|
if len(mq.tasks)%10 == 0 {
|
|
go mq.cleanupCompletedTasks()
|
|
}
|
|
}
|
|
}
|
|
|
|
// UpdateTaskProgress updates the progress of a running task
|
|
func (mq *MaintenanceQueue) UpdateTaskProgress(taskID string, progress float64) {
|
|
mq.mutex.RLock()
|
|
defer mq.mutex.RUnlock()
|
|
|
|
if task, exists := mq.tasks[taskID]; exists {
|
|
oldProgress := task.Progress
|
|
task.Progress = progress
|
|
task.Status = TaskStatusInProgress
|
|
|
|
// Update pending operation status
|
|
mq.updatePendingOperationStatus(taskID, "in_progress")
|
|
|
|
// Log progress at significant milestones or changes
|
|
if progress == 0 {
|
|
glog.V(1).Infof("Task started: %s (%s) worker %s, volume %d",
|
|
taskID, task.Type, task.WorkerID, task.VolumeID)
|
|
} else if progress >= 100 {
|
|
glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete",
|
|
taskID, task.Type, task.WorkerID, progress)
|
|
} else if progress-oldProgress >= 25 { // Log every 25% increment
|
|
glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete",
|
|
taskID, task.Type, task.WorkerID, progress)
|
|
}
|
|
|
|
// Save task state after progress update
|
|
if progress == 0 || progress >= 100 || progress-oldProgress >= 10 {
|
|
mq.saveTaskState(task)
|
|
}
|
|
} else {
|
|
glog.V(2).Infof("Progress update for unknown task: %s (%.1f%%)", taskID, progress)
|
|
}
|
|
}
|
|
|
|
// RegisterWorker registers a new worker
|
|
func (mq *MaintenanceQueue) RegisterWorker(worker *MaintenanceWorker) {
|
|
mq.mutex.Lock()
|
|
defer mq.mutex.Unlock()
|
|
|
|
isNewWorker := true
|
|
if existingWorker, exists := mq.workers[worker.ID]; exists {
|
|
isNewWorker = false
|
|
glog.Infof("Worker reconnected: %s at %s (capabilities: %v, max concurrent: %d)",
|
|
worker.ID, worker.Address, worker.Capabilities, worker.MaxConcurrent)
|
|
|
|
// Preserve current load when reconnecting
|
|
worker.CurrentLoad = existingWorker.CurrentLoad
|
|
} else {
|
|
glog.Infof("Worker registered: %s at %s (capabilities: %v, max concurrent: %d)",
|
|
worker.ID, worker.Address, worker.Capabilities, worker.MaxConcurrent)
|
|
}
|
|
|
|
worker.LastHeartbeat = time.Now()
|
|
worker.Status = "active"
|
|
if isNewWorker {
|
|
worker.CurrentLoad = 0
|
|
}
|
|
mq.workers[worker.ID] = worker
|
|
}
|
|
|
|
// UpdateWorkerHeartbeat updates worker heartbeat
|
|
func (mq *MaintenanceQueue) UpdateWorkerHeartbeat(workerID string) {
|
|
mq.mutex.Lock()
|
|
defer mq.mutex.Unlock()
|
|
|
|
if worker, exists := mq.workers[workerID]; exists {
|
|
lastSeen := worker.LastHeartbeat
|
|
worker.LastHeartbeat = time.Now()
|
|
|
|
// Log if worker was offline for a while
|
|
if time.Since(lastSeen) > 2*time.Minute {
|
|
glog.Infof("Worker %s heartbeat resumed after %v", workerID, time.Since(lastSeen))
|
|
}
|
|
} else {
|
|
glog.V(2).Infof("Heartbeat from unknown worker: %s", workerID)
|
|
}
|
|
}
|
|
|
|
// GetRunningTaskCount returns the number of running tasks of a specific type
|
|
func (mq *MaintenanceQueue) GetRunningTaskCount(taskType MaintenanceTaskType) int {
|
|
mq.mutex.RLock()
|
|
defer mq.mutex.RUnlock()
|
|
|
|
count := 0
|
|
for _, task := range mq.tasks {
|
|
if task.Type == taskType && (task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress) {
|
|
count++
|
|
}
|
|
}
|
|
return count
|
|
}
|
|
|
|
// WasTaskRecentlyCompleted checks if a similar task was recently completed
|
|
func (mq *MaintenanceQueue) WasTaskRecentlyCompleted(taskType MaintenanceTaskType, volumeID uint32, server string, now time.Time) bool {
|
|
mq.mutex.RLock()
|
|
defer mq.mutex.RUnlock()
|
|
|
|
// Get the repeat prevention interval for this task type
|
|
interval := mq.getRepeatPreventionInterval(taskType)
|
|
cutoff := now.Add(-interval)
|
|
|
|
for _, task := range mq.tasks {
|
|
if task.Type == taskType &&
|
|
task.VolumeID == volumeID &&
|
|
task.Server == server &&
|
|
task.Status == TaskStatusCompleted &&
|
|
task.CompletedAt != nil &&
|
|
task.CompletedAt.After(cutoff) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// getRepeatPreventionInterval returns the interval for preventing task repetition
|
|
func (mq *MaintenanceQueue) getRepeatPreventionInterval(taskType MaintenanceTaskType) time.Duration {
|
|
// First try to get default from task scheduler
|
|
if mq.integration != nil {
|
|
if scheduler := mq.integration.GetTaskScheduler(taskType); scheduler != nil {
|
|
defaultInterval := scheduler.GetDefaultRepeatInterval()
|
|
if defaultInterval > 0 {
|
|
glog.V(3).Infof("Using task scheduler default repeat interval for %s: %v", taskType, defaultInterval)
|
|
return defaultInterval
|
|
}
|
|
}
|
|
}
|
|
|
|
// Fallback to policy configuration if no scheduler available or scheduler doesn't provide default
|
|
if mq.policy != nil {
|
|
repeatIntervalHours := GetRepeatInterval(mq.policy, taskType)
|
|
if repeatIntervalHours > 0 {
|
|
interval := time.Duration(repeatIntervalHours) * time.Hour
|
|
glog.V(3).Infof("Using policy configuration repeat interval for %s: %v", taskType, interval)
|
|
return interval
|
|
}
|
|
}
|
|
|
|
// Ultimate fallback - but avoid hardcoded values where possible
|
|
glog.V(2).Infof("No scheduler or policy configuration found for task type %s, using minimal default: 1h", taskType)
|
|
return time.Hour // Minimal safe default
|
|
}
|
|
|
|
// GetTasks returns tasks with optional filtering
|
|
func (mq *MaintenanceQueue) GetTasks(status MaintenanceTaskStatus, taskType MaintenanceTaskType, limit int) []*MaintenanceTask {
|
|
// Create a copy of task slice while holding the lock for minimal time
|
|
mq.mutex.RLock()
|
|
tasksCopy := make([]*MaintenanceTask, 0, len(mq.tasks))
|
|
for _, task := range mq.tasks {
|
|
if status != "" && task.Status != status {
|
|
continue
|
|
}
|
|
if taskType != "" && task.Type != taskType {
|
|
continue
|
|
}
|
|
// Create a shallow copy to avoid data races
|
|
taskCopy := *task
|
|
tasksCopy = append(tasksCopy, &taskCopy)
|
|
if limit > 0 && len(tasksCopy) >= limit {
|
|
break
|
|
}
|
|
}
|
|
mq.mutex.RUnlock()
|
|
|
|
// Sort after releasing the lock to prevent deadlocks
|
|
sort.Slice(tasksCopy, func(i, j int) bool {
|
|
return tasksCopy[i].CreatedAt.After(tasksCopy[j].CreatedAt)
|
|
})
|
|
|
|
return tasksCopy
|
|
}
|
|
|
|
// GetWorkers returns all registered workers
|
|
func (mq *MaintenanceQueue) GetWorkers() []*MaintenanceWorker {
|
|
mq.mutex.RLock()
|
|
workers := make([]*MaintenanceWorker, 0, len(mq.workers))
|
|
for _, worker := range mq.workers {
|
|
// Create a shallow copy to avoid data races
|
|
workerCopy := *worker
|
|
workers = append(workers, &workerCopy)
|
|
}
|
|
mq.mutex.RUnlock()
|
|
|
|
return workers
|
|
}
|
|
|
|
// generateTaskID generates a unique ID for tasks
|
|
func generateTaskID() string {
|
|
const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
|
|
b := make([]byte, 8)
|
|
randBytes := make([]byte, 8)
|
|
|
|
// Generate random bytes
|
|
if _, err := rand.Read(randBytes); err != nil {
|
|
// Fallback to timestamp-based ID if crypto/rand fails
|
|
timestamp := time.Now().UnixNano()
|
|
return fmt.Sprintf("task-%d", timestamp)
|
|
}
|
|
|
|
// Convert random bytes to charset
|
|
for i := range b {
|
|
b[i] = charset[int(randBytes[i])%len(charset)]
|
|
}
|
|
|
|
// Add timestamp suffix to ensure uniqueness
|
|
timestamp := time.Now().Unix() % 10000 // last 4 digits of timestamp
|
|
return fmt.Sprintf("%s-%04d", string(b), timestamp)
|
|
}
|
|
|
|
// RetryTask manually retries a failed or pending task
|
|
func (mq *MaintenanceQueue) RetryTask(taskID string) error {
|
|
mq.mutex.Lock()
|
|
defer mq.mutex.Unlock()
|
|
|
|
task, exists := mq.tasks[taskID]
|
|
if !exists {
|
|
return fmt.Errorf("task %s not found", taskID)
|
|
}
|
|
|
|
// Only allow retry for failed or pending tasks
|
|
if task.Status != TaskStatusFailed && task.Status != TaskStatusPending {
|
|
return fmt.Errorf("task %s cannot be retried (status: %s)", taskID, task.Status)
|
|
}
|
|
|
|
// Reset task for retry
|
|
now := time.Now()
|
|
task.Status = TaskStatusPending
|
|
task.WorkerID = ""
|
|
task.StartedAt = nil
|
|
task.CompletedAt = nil
|
|
task.Error = ""
|
|
task.ScheduledAt = now // Schedule immediately
|
|
task.Progress = 0
|
|
|
|
// Add to assignment history if it was previously assigned
|
|
if len(task.AssignmentHistory) > 0 {
|
|
lastAssignment := task.AssignmentHistory[len(task.AssignmentHistory)-1]
|
|
if lastAssignment.UnassignedAt == nil {
|
|
unassignedTime := now
|
|
lastAssignment.UnassignedAt = &unassignedTime
|
|
lastAssignment.Reason = "Manual retry requested"
|
|
}
|
|
}
|
|
|
|
// Remove from current pending list if already there to avoid duplicates
|
|
for i, pendingTask := range mq.pendingTasks {
|
|
if pendingTask.ID == taskID {
|
|
mq.pendingTasks = append(mq.pendingTasks[:i], mq.pendingTasks[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
|
|
// Add back to pending queue
|
|
mq.pendingTasks = append(mq.pendingTasks, task)
|
|
|
|
// Save task state
|
|
mq.saveTaskState(task)
|
|
|
|
glog.Infof("Task manually retried: %s (%s) for volume %d", taskID, task.Type, task.VolumeID)
|
|
return nil
|
|
}
|
|
|
|
func (mq *MaintenanceQueue) CleanupOldTasks(retention time.Duration) int {
|
|
mq.mutex.Lock()
|
|
defer mq.mutex.Unlock()
|
|
|
|
cutoff := time.Now().Add(-retention)
|
|
removed := 0
|
|
|
|
for id, task := range mq.tasks {
|
|
if (task.Status == TaskStatusCompleted || task.Status == TaskStatusFailed) &&
|
|
task.CompletedAt != nil &&
|
|
task.CompletedAt.Before(cutoff) {
|
|
delete(mq.tasks, id)
|
|
removed++
|
|
}
|
|
}
|
|
|
|
glog.V(2).Infof("Cleaned up %d old maintenance tasks", removed)
|
|
return removed
|
|
}
|
|
|
|
// RemoveStaleWorkers removes workers that haven't sent heartbeat recently
|
|
func (mq *MaintenanceQueue) RemoveStaleWorkers(timeout time.Duration) int {
|
|
mq.mutex.Lock()
|
|
defer mq.mutex.Unlock()
|
|
|
|
cutoff := time.Now().Add(-timeout)
|
|
removed := 0
|
|
|
|
for id, worker := range mq.workers {
|
|
if worker.LastHeartbeat.Before(cutoff) {
|
|
// Mark any assigned tasks as failed and record unassignment
|
|
for _, task := range mq.tasks {
|
|
if task.WorkerID == id && (task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress) {
|
|
// Record unassignment due to worker becoming unavailable
|
|
if len(task.AssignmentHistory) > 0 {
|
|
lastAssignment := task.AssignmentHistory[len(task.AssignmentHistory)-1]
|
|
if lastAssignment.UnassignedAt == nil {
|
|
unassignedTime := time.Now()
|
|
lastAssignment.UnassignedAt = &unassignedTime
|
|
lastAssignment.Reason = "Worker became unavailable (stale heartbeat)"
|
|
}
|
|
}
|
|
|
|
task.Status = TaskStatusFailed
|
|
task.Error = "Worker became unavailable"
|
|
completedTime := time.Now()
|
|
task.CompletedAt = &completedTime
|
|
}
|
|
}
|
|
|
|
delete(mq.workers, id)
|
|
removed++
|
|
glog.Warningf("Removed stale maintenance worker %s", id)
|
|
}
|
|
}
|
|
|
|
return removed
|
|
}
|
|
|
|
// GetStats returns maintenance statistics
|
|
func (mq *MaintenanceQueue) GetStats() *MaintenanceStats {
|
|
mq.mutex.RLock()
|
|
defer mq.mutex.RUnlock()
|
|
|
|
stats := &MaintenanceStats{
|
|
TotalTasks: len(mq.tasks),
|
|
TasksByStatus: make(map[MaintenanceTaskStatus]int),
|
|
TasksByType: make(map[MaintenanceTaskType]int),
|
|
ActiveWorkers: 0,
|
|
}
|
|
|
|
today := time.Now().Truncate(24 * time.Hour)
|
|
var totalDuration time.Duration
|
|
var completedTasks int
|
|
|
|
for _, task := range mq.tasks {
|
|
stats.TasksByStatus[task.Status]++
|
|
stats.TasksByType[task.Type]++
|
|
|
|
if task.CompletedAt != nil && task.CompletedAt.After(today) {
|
|
if task.Status == TaskStatusCompleted {
|
|
stats.CompletedToday++
|
|
} else if task.Status == TaskStatusFailed {
|
|
stats.FailedToday++
|
|
}
|
|
|
|
if task.StartedAt != nil {
|
|
duration := task.CompletedAt.Sub(*task.StartedAt)
|
|
totalDuration += duration
|
|
completedTasks++
|
|
}
|
|
}
|
|
}
|
|
|
|
for _, worker := range mq.workers {
|
|
if worker.Status == "active" || worker.Status == "busy" {
|
|
stats.ActiveWorkers++
|
|
}
|
|
}
|
|
|
|
if completedTasks > 0 {
|
|
stats.AverageTaskTime = totalDuration / time.Duration(completedTasks)
|
|
}
|
|
|
|
return stats
|
|
}
|
|
|
|
// workerCanHandle checks if a worker can handle a specific task type
|
|
func (mq *MaintenanceQueue) workerCanHandle(taskType MaintenanceTaskType, capabilities []MaintenanceTaskType) bool {
|
|
for _, capability := range capabilities {
|
|
if capability == taskType {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// canScheduleTaskNow determines if a task can be scheduled using task schedulers or fallback logic
|
|
func (mq *MaintenanceQueue) canScheduleTaskNow(task *MaintenanceTask) bool {
|
|
glog.V(2).Infof("Checking if task %s (type: %s) can be scheduled", task.ID, task.Type)
|
|
|
|
// TEMPORARY FIX: Skip integration task scheduler which is being overly restrictive
|
|
// Use fallback logic directly for now
|
|
glog.V(2).Infof("Using fallback logic for task scheduling")
|
|
canExecute := mq.canExecuteTaskType(task.Type)
|
|
glog.V(2).Infof("Fallback decision for task %s: %v", task.ID, canExecute)
|
|
return canExecute
|
|
|
|
// NOTE: Original integration code disabled temporarily
|
|
// Try task scheduling logic first
|
|
/*
|
|
if mq.integration != nil {
|
|
glog.Infof("DEBUG canScheduleTaskNow: Using integration task scheduler")
|
|
// Get all running tasks and available workers
|
|
runningTasks := mq.getRunningTasks()
|
|
availableWorkers := mq.getAvailableWorkers()
|
|
|
|
glog.Infof("DEBUG canScheduleTaskNow: Running tasks: %d, Available workers: %d", len(runningTasks), len(availableWorkers))
|
|
|
|
canSchedule := mq.integration.CanScheduleWithTaskSchedulers(task, runningTasks, availableWorkers)
|
|
glog.Infof("DEBUG canScheduleTaskNow: Task scheduler decision for task %s (%s): %v", task.ID, task.Type, canSchedule)
|
|
return canSchedule
|
|
}
|
|
*/
|
|
}
|
|
|
|
// canExecuteTaskType checks if we can execute more tasks of this type (concurrency limits) - fallback logic
|
|
func (mq *MaintenanceQueue) canExecuteTaskType(taskType MaintenanceTaskType) bool {
|
|
runningCount := mq.GetRunningTaskCount(taskType)
|
|
maxConcurrent := mq.getMaxConcurrentForTaskType(taskType)
|
|
|
|
canExecute := runningCount < maxConcurrent
|
|
glog.V(3).Infof("canExecuteTaskType for %s: running=%d, max=%d, canExecute=%v", taskType, runningCount, maxConcurrent, canExecute)
|
|
|
|
return canExecute
|
|
}
|
|
|
|
// getMaxConcurrentForTaskType returns the maximum concurrent tasks allowed for a task type
|
|
func (mq *MaintenanceQueue) getMaxConcurrentForTaskType(taskType MaintenanceTaskType) int {
|
|
// First try to get default from task scheduler
|
|
if mq.integration != nil {
|
|
if scheduler := mq.integration.GetTaskScheduler(taskType); scheduler != nil {
|
|
maxConcurrent := scheduler.GetMaxConcurrent()
|
|
if maxConcurrent > 0 {
|
|
glog.V(3).Infof("Using task scheduler max concurrent for %s: %d", taskType, maxConcurrent)
|
|
return maxConcurrent
|
|
}
|
|
}
|
|
}
|
|
|
|
// Fallback to policy configuration if no scheduler available or scheduler doesn't provide default
|
|
if mq.policy != nil {
|
|
maxConcurrent := GetMaxConcurrent(mq.policy, taskType)
|
|
if maxConcurrent > 0 {
|
|
glog.V(3).Infof("Using policy configuration max concurrent for %s: %d", taskType, maxConcurrent)
|
|
return maxConcurrent
|
|
}
|
|
}
|
|
|
|
// Ultimate fallback - minimal safe default
|
|
glog.V(2).Infof("No scheduler or policy configuration found for task type %s, using minimal default: 1", taskType)
|
|
return 1
|
|
}
|
|
|
|
// getRunningTasks returns all currently running tasks
|
|
func (mq *MaintenanceQueue) getRunningTasks() []*MaintenanceTask {
|
|
var runningTasks []*MaintenanceTask
|
|
for _, task := range mq.tasks {
|
|
if task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress {
|
|
runningTasks = append(runningTasks, task)
|
|
}
|
|
}
|
|
return runningTasks
|
|
}
|
|
|
|
// getAvailableWorkers returns all workers that can take more work
|
|
func (mq *MaintenanceQueue) getAvailableWorkers() []*MaintenanceWorker {
|
|
var availableWorkers []*MaintenanceWorker
|
|
for _, worker := range mq.workers {
|
|
if worker.Status == "active" && worker.CurrentLoad < worker.MaxConcurrent {
|
|
availableWorkers = append(availableWorkers, worker)
|
|
}
|
|
}
|
|
return availableWorkers
|
|
}
|
|
|
|
// trackPendingOperation adds a task to the pending operations tracker
|
|
func (mq *MaintenanceQueue) trackPendingOperation(task *MaintenanceTask) {
|
|
if mq.integration == nil {
|
|
return
|
|
}
|
|
|
|
pendingOps := mq.integration.GetPendingOperations()
|
|
if pendingOps == nil {
|
|
return
|
|
}
|
|
|
|
// Skip tracking for tasks without proper typed parameters
|
|
if task.TypedParams == nil {
|
|
glog.V(2).Infof("Skipping pending operation tracking for task %s - no typed parameters", task.ID)
|
|
return
|
|
}
|
|
|
|
// Map maintenance task type to pending operation type
|
|
var opType PendingOperationType
|
|
switch task.Type {
|
|
case MaintenanceTaskType("balance"):
|
|
opType = OpTypeVolumeBalance
|
|
case MaintenanceTaskType("erasure_coding"):
|
|
opType = OpTypeErasureCoding
|
|
case MaintenanceTaskType("vacuum"):
|
|
opType = OpTypeVacuum
|
|
case MaintenanceTaskType("replication"):
|
|
opType = OpTypeReplication
|
|
default:
|
|
opType = OpTypeVolumeMove
|
|
}
|
|
|
|
// Determine destination node and estimated size from unified targets
|
|
destNode := ""
|
|
estimatedSize := uint64(1024 * 1024 * 1024) // Default 1GB estimate
|
|
|
|
// Use unified targets array - the only source of truth
|
|
if len(task.TypedParams.Targets) > 0 {
|
|
destNode = task.TypedParams.Targets[0].Node
|
|
if task.TypedParams.Targets[0].EstimatedSize > 0 {
|
|
estimatedSize = task.TypedParams.Targets[0].EstimatedSize
|
|
}
|
|
}
|
|
|
|
// Determine source node from unified sources
|
|
sourceNode := ""
|
|
if len(task.TypedParams.Sources) > 0 {
|
|
sourceNode = task.TypedParams.Sources[0].Node
|
|
}
|
|
|
|
operation := &PendingOperation{
|
|
VolumeID: task.VolumeID,
|
|
OperationType: opType,
|
|
SourceNode: sourceNode,
|
|
DestNode: destNode,
|
|
TaskID: task.ID,
|
|
StartTime: time.Now(),
|
|
EstimatedSize: estimatedSize,
|
|
Collection: task.Collection,
|
|
Status: "assigned",
|
|
}
|
|
|
|
pendingOps.AddOperation(operation)
|
|
}
|
|
|
|
// removePendingOperation removes a task from the pending operations tracker
|
|
func (mq *MaintenanceQueue) removePendingOperation(taskID string) {
|
|
if mq.integration == nil {
|
|
return
|
|
}
|
|
|
|
pendingOps := mq.integration.GetPendingOperations()
|
|
if pendingOps == nil {
|
|
return
|
|
}
|
|
|
|
pendingOps.RemoveOperation(taskID)
|
|
}
|
|
|
|
// updatePendingOperationStatus updates the status of a pending operation
|
|
func (mq *MaintenanceQueue) updatePendingOperationStatus(taskID string, status string) {
|
|
if mq.integration == nil {
|
|
return
|
|
}
|
|
|
|
pendingOps := mq.integration.GetPendingOperations()
|
|
if pendingOps == nil {
|
|
return
|
|
}
|
|
|
|
pendingOps.UpdateOperationStatus(taskID, status)
|
|
}
|