From b4c7d42a065d7421ed2d52fc4d42efe16b227a70 Mon Sep 17 00:00:00 2001 From: Anton Date: Tue, 24 Feb 2026 23:41:41 +0200 Subject: [PATCH] fix(admin): release mutex before disk I/O in maintenance queue; remove per-request LoadAllTaskStates (#8433) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(admin): release mutex before disk I/O in maintenance queue saveTaskState performs synchronous BoltDB writes. Calling it while holding mq.mutex.Lock() in AddTask, GetNextTask, and CompleteTask blocks all readers (GetTasks via RLock) for the full disk write duration on every task state change. During a maintenance scan AddTasksFromResults calls AddTask for every volume — potentially hundreds of times — meaning the write lock is held almost continuously. The HTTP handler for /maintenance calls GetTasks which blocks on RLock, exceeding the 30s timeout and returning 408 to the browser. Fix: update in-memory state (mq.tasks, mq.pendingTasks) under the lock as before, then unlock before calling saveTaskState. In-memory state is the authoritative source; persistence is crash-recovery only and does not require lock protection during the write. * fix(admin): add mutex to ConfigPersistence to synchronize tasks/ filesystem ops saveTaskState is now called outside mq.mutex, meaning SaveTaskState, LoadAllTaskStates, DeleteTaskState, and CleanupCompletedTasks can be invoked concurrently from multiple goroutines. ConfigPersistence had no internal synchronization, creating races on the tasks/ directory: - concurrent os.WriteFile + os.ReadFile on the same .pb file could yield a partial read and unmarshal error - LoadAllTaskStates (ReadDir + per-file ReadFile) could see a directory entry for a file being written or deleted concurrently - CleanupCompletedTasks (LoadAllTaskStates + DeleteTaskState) could race with SaveTaskState on the same file Fix: add tasksMu sync.Mutex to ConfigPersistence, acquired at the top of SaveTaskState, LoadTaskState, LoadAllTaskStates, DeleteTaskState, and CleanupCompletedTasks. Extract private Locked helpers so that CleanupCompletedTasks (which holds tasksMu) can call them internally without deadlocking. --------- Co-authored-by: Anton Ustyugov --- weed/admin/dash/config_persistence.go | 39 +++- weed/admin/maintenance/maintenance_queue.go | 206 ++++++++++++++------ 2 files changed, 184 insertions(+), 61 deletions(-) diff --git a/weed/admin/dash/config_persistence.go b/weed/admin/dash/config_persistence.go index 061abc7c9..103640c86 100644 --- a/weed/admin/dash/config_persistence.go +++ b/weed/admin/dash/config_persistence.go @@ -7,6 +7,7 @@ import ( "path/filepath" "sort" "strings" + "sync" "time" "github.com/seaweedfs/seaweedfs/weed/admin/maintenance" @@ -87,6 +88,11 @@ func isValidTaskID(taskID string) bool { // ConfigPersistence handles saving and loading configuration files type ConfigPersistence struct { dataDir string + // tasksMu serializes all filesystem operations on the tasks/ directory. + // SaveTaskState, LoadTaskState, LoadAllTaskStates, DeleteTaskState, and + // CleanupCompletedTasks are called from multiple goroutines concurrently + // after saveTaskState was moved outside mq.mutex in the maintenance queue. + tasksMu sync.Mutex } // NewConfigPersistence creates a new configuration persistence manager @@ -937,6 +943,8 @@ func (cp *ConfigPersistence) ListTaskDetails() ([]string, error) { // CleanupCompletedTasks removes old completed tasks beyond the retention limit func (cp *ConfigPersistence) CleanupCompletedTasks() error { + cp.tasksMu.Lock() + defer cp.tasksMu.Unlock() if cp.dataDir == "" { return fmt.Errorf("no data directory specified, cannot cleanup completed tasks") } @@ -946,8 +954,8 @@ func (cp *ConfigPersistence) CleanupCompletedTasks() error { return nil // No tasks directory, nothing to cleanup } - // Load all tasks and find completed/failed ones - allTasks, err := cp.LoadAllTaskStates() + // Use unlocked helpers to avoid deadlock (tasksMu is already held) + allTasks, err := cp.loadAllTaskStatesLocked() if err != nil { return fmt.Errorf("failed to load tasks for cleanup: %w", err) } @@ -998,7 +1006,7 @@ func (cp *ConfigPersistence) CleanupCompletedTasks() error { if len(completedTasks) > MaxCompletedTasks { tasksToDelete := completedTasks[MaxCompletedTasks:] for _, task := range tasksToDelete { - if err := cp.DeleteTaskState(task.ID); err != nil { + if err := cp.deleteTaskStateLocked(task.ID); err != nil { glog.Warningf("Failed to delete old completed task %s: %v", task.ID, err) } else { glog.V(2).Infof("Cleaned up old completed task %s (completed: %v)", task.ID, task.CompletedAt) @@ -1012,6 +1020,8 @@ func (cp *ConfigPersistence) CleanupCompletedTasks() error { // SaveTaskState saves a task state to protobuf file func (cp *ConfigPersistence) SaveTaskState(task *maintenance.MaintenanceTask) error { + cp.tasksMu.Lock() + defer cp.tasksMu.Unlock() if cp.dataDir == "" { return fmt.Errorf("no data directory specified, cannot save task state") } @@ -1051,6 +1061,13 @@ func (cp *ConfigPersistence) SaveTaskState(task *maintenance.MaintenanceTask) er // LoadTaskState loads a task state from protobuf file func (cp *ConfigPersistence) LoadTaskState(taskID string) (*maintenance.MaintenanceTask, error) { + cp.tasksMu.Lock() + defer cp.tasksMu.Unlock() + return cp.loadTaskStateLocked(taskID) +} + +// loadTaskStateLocked loads a single task state. Must be called with tasksMu held. +func (cp *ConfigPersistence) loadTaskStateLocked(taskID string) (*maintenance.MaintenanceTask, error) { if cp.dataDir == "" { return nil, fmt.Errorf("no data directory specified, cannot load task state") } @@ -1084,6 +1101,13 @@ func (cp *ConfigPersistence) LoadTaskState(taskID string) (*maintenance.Maintena // LoadAllTaskStates loads all task states from disk func (cp *ConfigPersistence) LoadAllTaskStates() ([]*maintenance.MaintenanceTask, error) { + cp.tasksMu.Lock() + defer cp.tasksMu.Unlock() + return cp.loadAllTaskStatesLocked() +} + +// loadAllTaskStatesLocked loads all task states from disk. Must be called with tasksMu held. +func (cp *ConfigPersistence) loadAllTaskStatesLocked() ([]*maintenance.MaintenanceTask, error) { if cp.dataDir == "" { return []*maintenance.MaintenanceTask{}, nil } @@ -1102,7 +1126,7 @@ func (cp *ConfigPersistence) LoadAllTaskStates() ([]*maintenance.MaintenanceTask for _, entry := range entries { if !entry.IsDir() && filepath.Ext(entry.Name()) == ".pb" { taskID := entry.Name()[:len(entry.Name())-3] // Remove .pb extension - task, err := cp.LoadTaskState(taskID) + task, err := cp.loadTaskStateLocked(taskID) if err != nil { glog.Warningf("Failed to load task state for %s: %v", taskID, err) continue @@ -1117,6 +1141,13 @@ func (cp *ConfigPersistence) LoadAllTaskStates() ([]*maintenance.MaintenanceTask // DeleteTaskState removes a task state file from disk func (cp *ConfigPersistence) DeleteTaskState(taskID string) error { + cp.tasksMu.Lock() + defer cp.tasksMu.Unlock() + return cp.deleteTaskStateLocked(taskID) +} + +// deleteTaskStateLocked removes a task state file. Must be called with tasksMu held. +func (cp *ConfigPersistence) deleteTaskStateLocked(taskID string) error { if cp.dataDir == "" { return fmt.Errorf("no data directory specified, cannot delete task state") } diff --git a/weed/admin/maintenance/maintenance_queue.go b/weed/admin/maintenance/maintenance_queue.go index e4b354723..0a2322160 100644 --- a/weed/admin/maintenance/maintenance_queue.go +++ b/weed/admin/maintenance/maintenance_queue.go @@ -130,10 +130,10 @@ func (mq *MaintenanceQueue) cleanupCompletedTasks() { // AddTask adds a new maintenance task to the queue with deduplication func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) { mq.mutex.Lock() - defer mq.mutex.Unlock() // Check for duplicate tasks (same type + volume + not completed) if mq.hasDuplicateTask(task) { + mq.mutex.Unlock() glog.V(1).Infof("Task skipped (duplicate): %s for volume %d on %s (already queued or running)", task.Type, task.VolumeID, task.Server) return @@ -169,16 +169,23 @@ func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) { return mq.pendingTasks[i].ScheduledAt.Before(mq.pendingTasks[j].ScheduledAt) }) - // Save task state to persistence - mq.saveTaskState(task) - scheduleInfo := "" if !task.ScheduledAt.IsZero() && time.Until(task.ScheduledAt) > time.Minute { scheduleInfo = fmt.Sprintf(", scheduled for %v", task.ScheduledAt.Format("15:04:05")) } + // Snapshot task state while lock is still held to avoid data race; + // also capture log fields from the snapshot so the live task pointer + // is not accessed after mq.mutex is released. + taskSnapshot := snapshotTask(task) + mq.mutex.Unlock() + + // Save task state to persistence outside the lock to avoid blocking + // RegisterWorker and HTTP handlers (GetTasks) during disk I/O + mq.saveTaskState(taskSnapshot) + glog.Infof("Task queued: %s (%s) volume %d on %s, priority %d%s, reason: %s", - task.ID, task.Type, task.VolumeID, task.Server, task.Priority, scheduleInfo, task.Reason) + taskSnapshot.ID, taskSnapshot.Type, taskSnapshot.VolumeID, taskSnapshot.Server, taskSnapshot.Priority, scheduleInfo, taskSnapshot.Reason) } // hasDuplicateTask checks if a similar task already exists (same type, volume, and not completed) @@ -286,11 +293,14 @@ func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []Maintena // Now acquire write lock to actually assign the task mq.mutex.Lock() - defer mq.mutex.Unlock() + + // Capture ID before the re-check so it is available for logging after unlock. + selectedTaskID := selectedTask.ID // Re-check that the task is still available (it might have been assigned to another worker) - if selectedIndex >= len(mq.pendingTasks) || mq.pendingTasks[selectedIndex].ID != selectedTask.ID { - glog.V(2).Infof("Task %s no longer available for worker %s: assigned to another worker", selectedTask.ID, workerID) + if selectedIndex >= len(mq.pendingTasks) || mq.pendingTasks[selectedIndex].ID != selectedTaskID { + mq.mutex.Unlock() + glog.V(2).Infof("Task %s no longer available for worker %s: assigned to another worker", selectedTaskID, workerID) return nil } @@ -331,6 +341,7 @@ func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []Maintena if len(selectedTask.AssignmentHistory) > 0 { selectedTask.AssignmentHistory = selectedTask.AssignmentHistory[:len(selectedTask.AssignmentHistory)-1] } + mq.mutex.Unlock() // Return nil so the task is not removed from pendingTasks and not returned to the worker return nil } @@ -348,11 +359,15 @@ func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []Maintena // Track pending operation mq.trackPendingOperation(selectedTask) - // Save task state after assignment - mq.saveTaskState(selectedTask) + // Snapshot task state while lock is still held to avoid data race + selectedSnapshot := snapshotTask(selectedTask) + mq.mutex.Unlock() + + // Save task state to persistence outside the lock + mq.saveTaskState(selectedSnapshot) glog.Infof("Task assigned: %s (%s) → worker %s (volume %d, server %s)", - selectedTask.ID, selectedTask.Type, workerID, selectedTask.VolumeID, selectedTask.Server) + selectedSnapshot.ID, selectedSnapshot.Type, workerID, selectedSnapshot.VolumeID, selectedSnapshot.Server) return selectedTask } @@ -360,10 +375,10 @@ func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []Maintena // CompleteTask marks a task as completed func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) { mq.mutex.Lock() - defer mq.mutex.Unlock() task, exists := mq.tasks[taskID] if !exists { + mq.mutex.Unlock() glog.Warningf("Attempted to complete non-existent task: %s", taskID) return } @@ -388,6 +403,12 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) { duration = completedTime.Sub(*task.StartedAt) } + // Capture workerID before it may be cleared during retry + originalWorkerID := task.WorkerID + + var taskToSave *MaintenanceTask + var logFn func() + if error != "" { task.Status = TaskStatusFailed task.Error = error @@ -420,10 +441,12 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) { mq.integration.SyncTask(task) } - // Save task state after retry setup - mq.saveTaskState(task) - glog.Warningf("Task failed, scheduling retry: %s (%s) attempt %d/%d, worker %s, duration %v, error: %s", - taskID, task.Type, task.RetryCount, task.MaxRetries, task.WorkerID, duration, error) + taskToSave = task + retryCount, maxRetries := task.RetryCount, task.MaxRetries + logFn = func() { + glog.Warningf("Task failed, scheduling retry: %s (%s) attempt %d/%d, worker %s, duration %v, error: %s", + taskID, task.Type, retryCount, maxRetries, originalWorkerID, duration, error) + } } else { // Record unassignment due to permanent failure if task.WorkerID != "" && len(task.AssignmentHistory) > 0 { @@ -435,23 +458,27 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) { } } - // Save task state after permanent failure - mq.saveTaskState(task) - glog.Errorf("Task failed permanently: %s (%s) worker %s, duration %v, after %d retries: %s", - taskID, task.Type, task.WorkerID, duration, task.MaxRetries, error) + taskToSave = task + maxRetries := task.MaxRetries + logFn = func() { + glog.Errorf("Task failed permanently: %s (%s) worker %s, duration %v, after %d retries: %s", + taskID, task.Type, originalWorkerID, duration, maxRetries, error) + } } } else { task.Status = TaskStatusCompleted task.Progress = 100 - // Save task state after successful completion - mq.saveTaskState(task) - glog.Infof("Task completed: %s (%s) worker %s, duration %v, volume %d", - taskID, task.Type, task.WorkerID, duration, task.VolumeID) + taskToSave = task + volumeID := task.VolumeID + logFn = func() { + glog.Infof("Task completed: %s (%s) worker %s, duration %v, volume %d", + taskID, task.Type, originalWorkerID, duration, volumeID) + } } - // Update worker - if task.WorkerID != "" { - if worker, exists := mq.workers[task.WorkerID]; exists { + // Update worker load and capture state before releasing lock + if originalWorkerID != "" { + if worker, exists := mq.workers[originalWorkerID]; exists { worker.CurrentTask = nil worker.CurrentLoad-- if worker.CurrentLoad == 0 { @@ -459,16 +486,32 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) { } } } + taskStatus := task.Status + taskCount := len(mq.tasks) + // Snapshot task state while lock is still held to avoid data race + var taskToSaveSnapshot *MaintenanceTask + if taskToSave != nil { + taskToSaveSnapshot = snapshotTask(taskToSave) + } + mq.mutex.Unlock() + + // Save task state to persistence outside the lock + if taskToSaveSnapshot != nil { + mq.saveTaskState(taskToSaveSnapshot) + } + + if logFn != nil { + logFn() + } // Remove pending operation (unless it's being retried) - if task.Status != TaskStatusPending { + if taskStatus != TaskStatusPending { mq.removePendingOperation(taskID) } - // Periodically cleanup old completed tasks (every 10th completion) - if task.Status == TaskStatusCompleted { - // Simple counter-based trigger for cleanup - if len(mq.tasks)%10 == 0 { + // Periodically cleanup old completed tasks (when total task count is a multiple of 10) + if taskStatus == TaskStatusCompleted { + if taskCount%10 == 0 { go mq.cleanupCompletedTasks() } } @@ -476,35 +519,46 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) { // UpdateTaskProgress updates the progress of a running task func (mq *MaintenanceQueue) UpdateTaskProgress(taskID string, progress float64) { - mq.mutex.RLock() - defer mq.mutex.RUnlock() + mq.mutex.Lock() - if task, exists := mq.tasks[taskID]; exists { - oldProgress := task.Progress - task.Progress = progress - task.Status = TaskStatusInProgress + task, exists := mq.tasks[taskID] + if !exists { + mq.mutex.Unlock() + glog.V(2).Infof("Progress update for unknown task: %s (%.1f%%)", taskID, progress) + return + } - // Update pending operation status - mq.updatePendingOperationStatus(taskID, "in_progress") + oldProgress := task.Progress + task.Progress = progress + task.Status = TaskStatusInProgress - // Log progress at significant milestones or changes - if progress == 0 { - glog.V(1).Infof("Task started: %s (%s) worker %s, volume %d", - taskID, task.Type, task.WorkerID, task.VolumeID) - } else if progress >= 100 { - glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete", - taskID, task.Type, task.WorkerID, progress) - } else if progress-oldProgress >= 25 { // Log every 25% increment - glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete", - taskID, task.Type, task.WorkerID, progress) - } + // Update pending operation status while lock is held + mq.updatePendingOperationStatus(taskID, "in_progress") - // Save task state after progress update - if progress == 0 || progress >= 100 || progress-oldProgress >= 10 { - mq.saveTaskState(task) - } - } else { - glog.V(2).Infof("Progress update for unknown task: %s (%.1f%%)", taskID, progress) + // Determine whether to persist and capture log fields before unlocking + shouldSave := progress == 0 || progress >= 100 || progress-oldProgress >= 10 + var taskSnapshot *MaintenanceTask + if shouldSave { + taskSnapshot = snapshotTask(task) + } + taskType, workerID, volumeID := task.Type, task.WorkerID, task.VolumeID + mq.mutex.Unlock() + + // Log progress at significant milestones or changes + if progress == 0 { + glog.V(1).Infof("Task started: %s (%s) worker %s, volume %d", + taskID, taskType, workerID, volumeID) + } else if progress >= 100 { + glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete", + taskID, taskType, workerID, progress) + } else if progress-oldProgress >= 25 { // Log every 25% increment + glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete", + taskID, taskType, workerID, progress) + } + + // Save task state outside the lock to avoid blocking readers + if taskSnapshot != nil { + mq.saveTaskState(taskSnapshot) } } @@ -1004,3 +1058,41 @@ func (mq *MaintenanceQueue) updatePendingOperationStatus(taskID string, status s pendingOps.UpdateOperationStatus(taskID, status) } + +// snapshotTask returns a shallow copy of t with slice and map fields deep-copied +// so that the snapshot can be safely passed to saveTaskState after mq.mutex is +// released without racing against concurrent mutations of the live task struct. +// Must be called with mq.mutex held. +func snapshotTask(t *MaintenanceTask) *MaintenanceTask { + cp := *t // copy all scalar / pointer-sized fields + + // Deep-copy AssignmentHistory: the slice header and each record pointer. + // Records themselves are never mutated after being appended, so copying + // the pointers is sufficient. + if t.AssignmentHistory != nil { + cp.AssignmentHistory = make([]*TaskAssignmentRecord, len(t.AssignmentHistory)) + copy(cp.AssignmentHistory, t.AssignmentHistory) + } + + // Deep-copy Tags map to avoid concurrent map read/write. + if t.Tags != nil { + cp.Tags = make(map[string]string, len(t.Tags)) + for k, v := range t.Tags { + cp.Tags[k] = v + } + } + + // Copy optional time pointers so a concurrent nil-assignment (e.g. retry + // path clearing StartedAt) does not race with maintenanceTaskToProtobuf + // reading the pointed-to value. + if t.StartedAt != nil { + ts := *t.StartedAt + cp.StartedAt = &ts + } + if t.CompletedAt != nil { + tc := *t.CompletedAt + cp.CompletedAt = &tc + } + + return &cp +}