diff --git a/weed/admin/dash/config_persistence.go b/weed/admin/dash/config_persistence.go
index 061abc7c9..103640c86 100644
--- a/weed/admin/dash/config_persistence.go
+++ b/weed/admin/dash/config_persistence.go
@@ -7,6 +7,7 @@ import (
 	"path/filepath"
 	"sort"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/admin/maintenance"
@@ -87,6 +88,11 @@ func isValidTaskID(taskID string) bool {
 // ConfigPersistence handles saving and loading configuration files
 type ConfigPersistence struct {
 	dataDir string
+	// tasksMu serializes all filesystem operations on the tasks/ directory.
+	// SaveTaskState, LoadTaskState, LoadAllTaskStates, DeleteTaskState, and
+	// CleanupCompletedTasks are called from multiple goroutines concurrently
+	// after saveTaskState was moved outside mq.mutex in the maintenance queue.
+	tasksMu sync.Mutex
 }
 
 // NewConfigPersistence creates a new configuration persistence manager
@@ -937,6 +943,8 @@ func (cp *ConfigPersistence) ListTaskDetails() ([]string, error) {
 
 // CleanupCompletedTasks removes old completed tasks beyond the retention limit
 func (cp *ConfigPersistence) CleanupCompletedTasks() error {
+	cp.tasksMu.Lock()
+	defer cp.tasksMu.Unlock()
 	if cp.dataDir == "" {
 		return fmt.Errorf("no data directory specified, cannot cleanup completed tasks")
 	}
@@ -946,8 +954,8 @@ func (cp *ConfigPersistence) CleanupCompletedTasks() error {
 		return nil // No tasks directory, nothing to cleanup
 	}
 
-	// Load all tasks and find completed/failed ones
-	allTasks, err := cp.LoadAllTaskStates()
+	// Use unlocked helpers to avoid deadlock (tasksMu is already held)
+	allTasks, err := cp.loadAllTaskStatesLocked()
 	if err != nil {
 		return fmt.Errorf("failed to load tasks for cleanup: %w", err)
 	}
@@ -998,7 +1006,7 @@ func (cp *ConfigPersistence) CleanupCompletedTasks() error {
 	if len(completedTasks) > MaxCompletedTasks {
 		tasksToDelete := completedTasks[MaxCompletedTasks:]
 		for _, task := range tasksToDelete {
-			if err := cp.DeleteTaskState(task.ID); err != nil {
+			if err := cp.deleteTaskStateLocked(task.ID); err != nil {
 				glog.Warningf("Failed to delete old completed task %s: %v", task.ID, err)
 			} else {
 				glog.V(2).Infof("Cleaned up old completed task %s (completed: %v)", task.ID, task.CompletedAt)
@@ -1012,6 +1020,8 @@ func (cp *ConfigPersistence) CleanupCompletedTasks() error {
 
 // SaveTaskState saves a task state to protobuf file
 func (cp *ConfigPersistence) SaveTaskState(task *maintenance.MaintenanceTask) error {
+	cp.tasksMu.Lock()
+	defer cp.tasksMu.Unlock()
 	if cp.dataDir == "" {
 		return fmt.Errorf("no data directory specified, cannot save task state")
 	}
@@ -1051,6 +1061,13 @@ func (cp *ConfigPersistence) SaveTaskState(task *maintenance.MaintenanceTask) er
 
 // LoadTaskState loads a task state from protobuf file
 func (cp *ConfigPersistence) LoadTaskState(taskID string) (*maintenance.MaintenanceTask, error) {
+	cp.tasksMu.Lock()
+	defer cp.tasksMu.Unlock()
+	return cp.loadTaskStateLocked(taskID)
+}
+
+// loadTaskStateLocked loads a single task state. Must be called with tasksMu held.
+func (cp *ConfigPersistence) loadTaskStateLocked(taskID string) (*maintenance.MaintenanceTask, error) {
 	if cp.dataDir == "" {
 		return nil, fmt.Errorf("no data directory specified, cannot load task state")
 	}
@@ -1084,6 +1101,13 @@ func (cp *ConfigPersistence) LoadTaskState(taskID string) (*maintenance.Maintena
 
 // LoadAllTaskStates loads all task states from disk
 func (cp *ConfigPersistence) LoadAllTaskStates() ([]*maintenance.MaintenanceTask, error) {
+	cp.tasksMu.Lock()
+	defer cp.tasksMu.Unlock()
+	return cp.loadAllTaskStatesLocked()
+}
+
+// loadAllTaskStatesLocked loads all task states from disk. Must be called with tasksMu held.
+func (cp *ConfigPersistence) loadAllTaskStatesLocked() ([]*maintenance.MaintenanceTask, error) {
 	if cp.dataDir == "" {
 		return []*maintenance.MaintenanceTask{}, nil
 	}
@@ -1102,7 +1126,7 @@ func (cp *ConfigPersistence) LoadAllTaskStates() ([]*maintenance.MaintenanceTask
 	for _, entry := range entries {
 		if !entry.IsDir() && filepath.Ext(entry.Name()) == ".pb" {
 			taskID := entry.Name()[:len(entry.Name())-3] // Remove .pb extension
-			task, err := cp.LoadTaskState(taskID)
+			task, err := cp.loadTaskStateLocked(taskID)
 			if err != nil {
 				glog.Warningf("Failed to load task state for %s: %v", taskID, err)
 				continue
@@ -1117,6 +1141,13 @@ func (cp *ConfigPersistence) LoadAllTaskStates() ([]*maintenance.MaintenanceTask
 
 // DeleteTaskState removes a task state file from disk
 func (cp *ConfigPersistence) DeleteTaskState(taskID string) error {
+	cp.tasksMu.Lock()
+	defer cp.tasksMu.Unlock()
+	return cp.deleteTaskStateLocked(taskID)
+}
+
+// deleteTaskStateLocked removes a task state file. Must be called with tasksMu held.
+func (cp *ConfigPersistence) deleteTaskStateLocked(taskID string) error {
 	if cp.dataDir == "" {
 		return fmt.Errorf("no data directory specified, cannot delete task state")
 	}
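A note on the shape of these changes: each exported method (SaveTaskState, LoadTaskState, LoadAllTaskStates, DeleteTaskState) takes tasksMu and immediately delegates to an unexported *Locked twin, so internal callers that already hold the mutex, such as CleanupCompletedTasks calling loadAllTaskStatesLocked and deleteTaskStateLocked, can reuse the logic without self-deadlock; Go's sync.Mutex is not reentrant, so having CleanupCompletedTasks call the exported LoadAllTaskStates would block forever. A minimal sketch of this lock-then-delegate pattern, using a hypothetical Registry type rather than the SeaweedFS code:

// Sketch of the lock-then-delegate pattern; Registry and its fields are
// hypothetical stand-ins, not SeaweedFS types.
package main

import (
	"fmt"
	"sync"
)

type Registry struct {
	mu    sync.Mutex // not reentrant: locking twice in one goroutine deadlocks
	items map[string]string
}

// Get is the exported entry point: take the lock, then delegate.
func (r *Registry) Get(id string) (string, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.getLocked(id)
}

// getLocked holds the real logic. Must be called with mu held.
func (r *Registry) getLocked(id string) (string, bool) {
	v, ok := r.items[id]
	return v, ok
}

// GetAll takes the lock once and reuses getLocked per entry; calling the
// exported Get here instead would deadlock on the second Lock.
func (r *Registry) GetAll(ids []string) []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make([]string, 0, len(ids))
	for _, id := range ids {
		if v, ok := r.getLocked(id); ok {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	r := &Registry{items: map[string]string{"a": "1", "b": "2"}}
	fmt.Println(r.GetAll([]string{"a", "b"}))
}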
diff --git a/weed/admin/maintenance/maintenance_queue.go b/weed/admin/maintenance/maintenance_queue.go
index e4b354723..0a2322160 100644
--- a/weed/admin/maintenance/maintenance_queue.go
+++ b/weed/admin/maintenance/maintenance_queue.go
@@ -130,10 +130,10 @@ func (mq *MaintenanceQueue) cleanupCompletedTasks() {
 // AddTask adds a new maintenance task to the queue with deduplication
 func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) {
 	mq.mutex.Lock()
-	defer mq.mutex.Unlock()
 
 	// Check for duplicate tasks (same type + volume + not completed)
 	if mq.hasDuplicateTask(task) {
+		mq.mutex.Unlock()
 		glog.V(1).Infof("Task skipped (duplicate): %s for volume %d on %s (already queued or running)",
 			task.Type, task.VolumeID, task.Server)
 		return
@@ -169,16 +169,23 @@ func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) {
 		return mq.pendingTasks[i].ScheduledAt.Before(mq.pendingTasks[j].ScheduledAt)
 	})
 
-	// Save task state to persistence
-	mq.saveTaskState(task)
-
 	scheduleInfo := ""
 	if !task.ScheduledAt.IsZero() && time.Until(task.ScheduledAt) > time.Minute {
 		scheduleInfo = fmt.Sprintf(", scheduled for %v", task.ScheduledAt.Format("15:04:05"))
 	}
 
+	// Snapshot task state while lock is still held to avoid data race;
+	// also capture log fields from the snapshot so the live task pointer
+	// is not accessed after mq.mutex is released.
+	taskSnapshot := snapshotTask(task)
+	mq.mutex.Unlock()
+
+	// Save task state to persistence outside the lock to avoid blocking
+	// RegisterWorker and HTTP handlers (GetTasks) during disk I/O
+	mq.saveTaskState(taskSnapshot)
+
 	glog.Infof("Task queued: %s (%s) volume %d on %s, priority %d%s, reason: %s",
-		task.ID, task.Type, task.VolumeID, task.Server, task.Priority, scheduleInfo, task.Reason)
+		taskSnapshot.ID, taskSnapshot.Type, taskSnapshot.VolumeID, taskSnapshot.Server, taskSnapshot.Priority, scheduleInfo, taskSnapshot.Reason)
 }
 
 // hasDuplicateTask checks if a similar task already exists (same type, volume, and not completed)
@@ -286,11 +293,14 @@ func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []Maintena
 
 	// Now acquire write lock to actually assign the task
 	mq.mutex.Lock()
-	defer mq.mutex.Unlock()
+
+	// Capture ID before the re-check so it is available for logging after unlock.
+	selectedTaskID := selectedTask.ID
 
 	// Re-check that the task is still available (it might have been assigned to another worker)
-	if selectedIndex >= len(mq.pendingTasks) || mq.pendingTasks[selectedIndex].ID != selectedTask.ID {
-		glog.V(2).Infof("Task %s no longer available for worker %s: assigned to another worker", selectedTask.ID, workerID)
+	if selectedIndex >= len(mq.pendingTasks) || mq.pendingTasks[selectedIndex].ID != selectedTaskID {
+		mq.mutex.Unlock()
+		glog.V(2).Infof("Task %s no longer available for worker %s: assigned to another worker", selectedTaskID, workerID)
 		return nil
 	}
@@ -331,6 +341,7 @@ func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []Maintena
 		if len(selectedTask.AssignmentHistory) > 0 {
 			selectedTask.AssignmentHistory = selectedTask.AssignmentHistory[:len(selectedTask.AssignmentHistory)-1]
 		}
+		mq.mutex.Unlock()
 		// Return nil so the task is not removed from pendingTasks and not returned to the worker
 		return nil
 	}
@@ -348,11 +359,15 @@ func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []Maintena
 	// Track pending operation
 	mq.trackPendingOperation(selectedTask)
 
-	// Save task state after assignment
-	mq.saveTaskState(selectedTask)
+	// Snapshot task state while lock is still held to avoid data race
+	selectedSnapshot := snapshotTask(selectedTask)
+	mq.mutex.Unlock()
+
+	// Save task state to persistence outside the lock
+	mq.saveTaskState(selectedSnapshot)
 
 	glog.Infof("Task assigned: %s (%s) → worker %s (volume %d, server %s)",
-		selectedTask.ID, selectedTask.Type, workerID, selectedTask.VolumeID, selectedTask.Server)
+		selectedSnapshot.ID, selectedSnapshot.Type, workerID, selectedSnapshot.VolumeID, selectedSnapshot.Server)
 
 	return selectedTask
 }
@@ -360,10 +375,10 @@ func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []Maintena
 // CompleteTask marks a task as completed
 func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
 	mq.mutex.Lock()
-	defer mq.mutex.Unlock()
 
 	task, exists := mq.tasks[taskID]
 	if !exists {
+		mq.mutex.Unlock()
 		glog.Warningf("Attempted to complete non-existent task: %s", taskID)
 		return
 	}
@@ -388,6 +403,12 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
 		duration = completedTime.Sub(*task.StartedAt)
 	}
 
+	// Capture workerID before it may be cleared during retry
+	originalWorkerID := task.WorkerID
+
+	var taskToSave *MaintenanceTask
+	var logFn func()
+
 	if error != "" {
 		task.Status = TaskStatusFailed
 		task.Error = error
@@ -420,10 +441,12 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
 				mq.integration.SyncTask(task)
 			}
 
-			// Save task state after retry setup
-			mq.saveTaskState(task)
-			glog.Warningf("Task failed, scheduling retry: %s (%s) attempt %d/%d, worker %s, duration %v, error: %s",
-				taskID, task.Type, task.RetryCount, task.MaxRetries, task.WorkerID, duration, error)
+			taskToSave = task
+			retryCount, maxRetries := task.RetryCount, task.MaxRetries
+			logFn = func() {
+				glog.Warningf("Task failed, scheduling retry: %s (%s) attempt %d/%d, worker %s, duration %v, error: %s",
+					taskID, task.Type, retryCount, maxRetries, originalWorkerID, duration, error)
+			}
 		} else {
 			// Record unassignment due to permanent failure
 			if task.WorkerID != "" && len(task.AssignmentHistory) > 0 {
@@ -435,23 +458,27 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
 				}
 			}
 
-			// Save task state after permanent failure
-			mq.saveTaskState(task)
-			glog.Errorf("Task failed permanently: %s (%s) worker %s, duration %v, after %d retries: %s",
-				taskID, task.Type, task.WorkerID, duration, task.MaxRetries, error)
+			taskToSave = task
+			maxRetries := task.MaxRetries
+			logFn = func() {
+				glog.Errorf("Task failed permanently: %s (%s) worker %s, duration %v, after %d retries: %s",
+					taskID, task.Type, originalWorkerID, duration, maxRetries, error)
+			}
 		}
 	} else {
 		task.Status = TaskStatusCompleted
 		task.Progress = 100
 
-		// Save task state after successful completion
-		mq.saveTaskState(task)
-		glog.Infof("Task completed: %s (%s) worker %s, duration %v, volume %d",
-			taskID, task.Type, task.WorkerID, duration, task.VolumeID)
+		taskToSave = task
+		volumeID := task.VolumeID
+		logFn = func() {
+			glog.Infof("Task completed: %s (%s) worker %s, duration %v, volume %d",
+				taskID, task.Type, originalWorkerID, duration, volumeID)
+		}
 	}
 
-	// Update worker
-	if task.WorkerID != "" {
-		if worker, exists := mq.workers[task.WorkerID]; exists {
+	// Update worker load and capture state before releasing lock
+	if originalWorkerID != "" {
+		if worker, exists := mq.workers[originalWorkerID]; exists {
 			worker.CurrentTask = nil
 			worker.CurrentLoad--
 			if worker.CurrentLoad == 0 {
@@ -459,16 +486,32 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
 			}
 		}
 	}
+	taskStatus := task.Status
+	taskCount := len(mq.tasks)
+	// Snapshot task state while lock is still held to avoid data race
+	var taskToSaveSnapshot *MaintenanceTask
+	if taskToSave != nil {
+		taskToSaveSnapshot = snapshotTask(taskToSave)
+	}
+	mq.mutex.Unlock()
+
+	// Save task state to persistence outside the lock
+	if taskToSaveSnapshot != nil {
+		mq.saveTaskState(taskToSaveSnapshot)
+	}
+
+	if logFn != nil {
+		logFn()
+	}
 
 	// Remove pending operation (unless it's being retried)
-	if task.Status != TaskStatusPending {
+	if taskStatus != TaskStatusPending {
 		mq.removePendingOperation(taskID)
 	}
 
-	// Periodically cleanup old completed tasks (every 10th completion)
-	if task.Status == TaskStatusCompleted {
-		// Simple counter-based trigger for cleanup
-		if len(mq.tasks)%10 == 0 {
+	// Periodically cleanup old completed tasks (when total task count is a multiple of 10)
+	if taskStatus == TaskStatusCompleted {
+		if taskCount%10 == 0 {
			go mq.cleanupCompletedTasks()
 		}
 	}
@@ -476,35 +519,46 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
 
 // UpdateTaskProgress updates the progress of a running task
 func (mq *MaintenanceQueue) UpdateTaskProgress(taskID string, progress float64) {
-	mq.mutex.RLock()
-	defer mq.mutex.RUnlock()
+	mq.mutex.Lock()
 
-	if task, exists := mq.tasks[taskID]; exists {
-		oldProgress := task.Progress
-		task.Progress = progress
-		task.Status = TaskStatusInProgress
+	task, exists := mq.tasks[taskID]
+	if !exists {
+		mq.mutex.Unlock()
+		glog.V(2).Infof("Progress update for unknown task: %s (%.1f%%)", taskID, progress)
+		return
+	}
 
-		// Update pending operation status
-		mq.updatePendingOperationStatus(taskID, "in_progress")
+	oldProgress := task.Progress
+	task.Progress = progress
+	task.Status = TaskStatusInProgress
 
-		// Log progress at significant milestones or changes
-		if progress == 0 {
-			glog.V(1).Infof("Task started: %s (%s) worker %s, volume %d",
-				taskID, task.Type, task.WorkerID, task.VolumeID)
-		} else if progress >= 100 {
-			glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete",
-				taskID, task.Type, task.WorkerID, progress)
-		} else if progress-oldProgress >= 25 { // Log every 25% increment
-			glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete",
-				taskID, task.Type, task.WorkerID, progress)
-		}
+	// Update pending operation status while lock is held
+	mq.updatePendingOperationStatus(taskID, "in_progress")
 
-		// Save task state after progress update
-		if progress == 0 || progress >= 100 || progress-oldProgress >= 10 {
-			mq.saveTaskState(task)
-		}
-	} else {
-		glog.V(2).Infof("Progress update for unknown task: %s (%.1f%%)", taskID, progress)
+	// Determine whether to persist and capture log fields before unlocking
+	shouldSave := progress == 0 || progress >= 100 || progress-oldProgress >= 10
+	var taskSnapshot *MaintenanceTask
+	if shouldSave {
+		taskSnapshot = snapshotTask(task)
+	}
+	taskType, workerID, volumeID := task.Type, task.WorkerID, task.VolumeID
+	mq.mutex.Unlock()
+
+	// Log progress at significant milestones or changes
+	if progress == 0 {
+		glog.V(1).Infof("Task started: %s (%s) worker %s, volume %d",
+			taskID, taskType, workerID, volumeID)
+	} else if progress >= 100 {
+		glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete",
+			taskID, taskType, workerID, progress)
+	} else if progress-oldProgress >= 25 { // Log every 25% increment
+		glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete",
+			taskID, taskType, workerID, progress)
+	}
+
+	// Save task state outside the lock to avoid blocking readers
+	if taskSnapshot != nil {
+		mq.saveTaskState(taskSnapshot)
 	}
 }
@@ -1004,3 +1058,41 @@ func (mq *MaintenanceQueue) updatePendingOperationStatus(taskID string, status s
 
 	pendingOps.UpdateOperationStatus(taskID, status)
 }
+
+// snapshotTask returns a shallow copy of t with slice and map fields deep-copied
+// so that the snapshot can be safely passed to saveTaskState after mq.mutex is
+// released without racing against concurrent mutations of the live task struct.
+// Must be called with mq.mutex held.
+func snapshotTask(t *MaintenanceTask) *MaintenanceTask {
+	cp := *t // copy all scalar / pointer-sized fields
+
+	// Deep-copy AssignmentHistory: the slice header and each record pointer.
+	// Records themselves are never mutated after being appended, so copying
+	// the pointers is sufficient.
+	if t.AssignmentHistory != nil {
+		cp.AssignmentHistory = make([]*TaskAssignmentRecord, len(t.AssignmentHistory))
+		copy(cp.AssignmentHistory, t.AssignmentHistory)
+	}
+
+	// Deep-copy Tags map to avoid concurrent map read/write.
+	if t.Tags != nil {
+		cp.Tags = make(map[string]string, len(t.Tags))
+		for k, v := range t.Tags {
+			cp.Tags[k] = v
+		}
+	}
+
+	// Copy optional time pointers so a concurrent nil-assignment (e.g. retry
+	// path clearing StartedAt) does not race with maintenanceTaskToProtobuf
+	// reading the pointed-to value.
+	if t.StartedAt != nil {
+		ts := *t.StartedAt
+		cp.StartedAt = &ts
+	}
+	if t.CompletedAt != nil {
+		tc := *t.CompletedAt
+		cp.CompletedAt = &tc
+	}
+
+	return &cp
+}
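Taken together, the queue-side changes follow one pattern: mutate and snapshot the task while mq.mutex is held, release the lock, then perform the slow protobuf-and-disk write on the private copy. Below is a minimal, self-contained sketch of that snapshot-then-unlock shape; Task, Queue, snapshot, and persist are hypothetical stand-ins for MaintenanceTask, MaintenanceQueue, snapshotTask, and saveTaskState, not the actual SeaweedFS types:

// Sketch of snapshot-then-unlock persistence, under the stated assumptions.
package main

import (
	"fmt"
	"sync"
	"time"
)

type Task struct {
	ID        string
	Progress  float64
	Tags      map[string]string
	StartedAt *time.Time
}

type Queue struct {
	mu    sync.Mutex
	tasks map[string]*Task
}

// snapshot copies t while q.mu is held, deep-copying the fields that other
// goroutines may mutate (map entries, pointed-to time values).
func snapshot(t *Task) *Task {
	cp := *t
	if t.Tags != nil {
		cp.Tags = make(map[string]string, len(t.Tags))
		for k, v := range t.Tags {
			cp.Tags[k] = v
		}
	}
	if t.StartedAt != nil {
		ts := *t.StartedAt
		cp.StartedAt = &ts
	}
	return &cp
}

// UpdateProgress mutates under the lock, snapshots, unlocks, then runs the
// slow persistence step on the private copy so other callers are never
// blocked on disk I/O.
func (q *Queue) UpdateProgress(id string, p float64) {
	q.mu.Lock()
	t, ok := q.tasks[id]
	if !ok {
		q.mu.Unlock()
		return
	}
	t.Progress = p
	snap := snapshot(t)
	q.mu.Unlock()

	persist(snap) // outside the lock: snap is not shared with anyone
}

func persist(t *Task) {
	time.Sleep(10 * time.Millisecond) // stand-in for protobuf + disk write
	fmt.Printf("persisted %s at %.0f%%\n", t.ID, t.Progress)
}

func main() {
	q := &Queue{tasks: map[string]*Task{"t1": {ID: "t1", Tags: map[string]string{}}}}
	var wg sync.WaitGroup
	for i := 1; i <= 4; i++ {
		wg.Add(1)
		go func(p float64) { defer wg.Done(); q.UpdateProgress("t1", p) }(float64(i) * 25)
	}
	wg.Wait()
}

If UpdateProgress instead passed the live *Task to persist after unlocking, go test -race would flag persist's reads as racing with the next caller's writes; copying the map and time pointer under the lock is what makes releasing it safe, which is exactly the role snapshotTask plays in the diff above.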