From 256b1c9c28bdbc5a76b05a9dbaa6124d83352ec6 Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 13 Aug 2025 09:55:48 -0700 Subject: [PATCH] async persistence --- weed/admin/maintenance/maintenance_queue.go | 56 ++++++++++++++++----- weed/admin/maintenance/maintenance_types.go | 15 +++--- 2 files changed, 52 insertions(+), 19 deletions(-) diff --git a/weed/admin/maintenance/maintenance_queue.go b/weed/admin/maintenance/maintenance_queue.go index b939d763c..825b5eaea 100644 --- a/weed/admin/maintenance/maintenance_queue.go +++ b/weed/admin/maintenance/maintenance_queue.go @@ -12,11 +12,16 @@ import ( // NewMaintenanceQueue creates a new maintenance queue func NewMaintenanceQueue(policy *MaintenancePolicy) *MaintenanceQueue { queue := &MaintenanceQueue{ - tasks: make(map[string]*MaintenanceTask), - workers: make(map[string]*MaintenanceWorker), - pendingTasks: make([]*MaintenanceTask, 0), - policy: policy, + tasks: make(map[string]*MaintenanceTask), + workers: make(map[string]*MaintenanceWorker), + pendingTasks: make([]*MaintenanceTask, 0), + policy: policy, + persistenceChan: make(chan *MaintenanceTask, 1000), // Buffer for async persistence } + + // Start persistence worker goroutine + go queue.persistenceWorker() + return queue } @@ -39,16 +44,18 @@ func (mq *MaintenanceQueue) LoadTasksFromPersistence() error { return nil } - mq.mutex.Lock() - defer mq.mutex.Unlock() - glog.Infof("Loading tasks from persistence...") + // Load tasks without holding lock to prevent deadlock tasks, err := mq.persistence.LoadAllTaskStates() if err != nil { return fmt.Errorf("failed to load task states: %w", err) } + // Only acquire lock for the in-memory operations + mq.mutex.Lock() + defer mq.mutex.Unlock() + glog.Infof("DEBUG LoadTasksFromPersistence: Found %d tasks in persistence", len(tasks)) // Reset task maps @@ -104,11 +111,36 @@ func (mq *MaintenanceQueue) LoadTasksFromPersistence() error { return nil } -// saveTaskState saves a task to persistent storage +// persistenceWorker handles async persistence operations +func (mq *MaintenanceQueue) persistenceWorker() { + for task := range mq.persistenceChan { + if mq.persistence != nil { + if err := mq.persistence.SaveTaskState(task); err != nil { + glog.Errorf("Failed to save task state for %s: %v", task.ID, err) + } + } + } + glog.V(1).Infof("Persistence worker shut down") +} + +// Close gracefully shuts down the maintenance queue +func (mq *MaintenanceQueue) Close() { + if mq.persistenceChan != nil { + close(mq.persistenceChan) + glog.V(1).Infof("Maintenance queue persistence channel closed") + } +} + +// saveTaskState saves a task to persistent storage asynchronously func (mq *MaintenanceQueue) saveTaskState(task *MaintenanceTask) { - if mq.persistence != nil { - if err := mq.persistence.SaveTaskState(task); err != nil { - glog.Errorf("Failed to save task state for %s: %v", task.ID, err) + if mq.persistence != nil && mq.persistenceChan != nil { + // Create a copy to avoid race conditions + taskCopy := *task + select { + case mq.persistenceChan <- &taskCopy: + // Successfully queued for async persistence + default: + glog.Warningf("Persistence channel full, task state may be lost: %s", task.ID) } } } @@ -684,7 +716,7 @@ func (mq *MaintenanceQueue) RetryTask(taskID string) error { // Add back to pending queue mq.pendingTasks = append(mq.pendingTasks, task) - + // Save task state mq.saveTaskState(task) diff --git a/weed/admin/maintenance/maintenance_types.go b/weed/admin/maintenance/maintenance_types.go index d36e47939..510b55e63 100644 --- a/weed/admin/maintenance/maintenance_types.go +++ b/weed/admin/maintenance/maintenance_types.go @@ -249,13 +249,14 @@ type MaintenanceWorker struct { // MaintenanceQueue manages the task queue and worker coordination type MaintenanceQueue struct { - tasks map[string]*MaintenanceTask - workers map[string]*MaintenanceWorker - pendingTasks []*MaintenanceTask - mutex sync.RWMutex - policy *MaintenancePolicy - integration *MaintenanceIntegration - persistence TaskPersistence // Interface for task persistence + tasks map[string]*MaintenanceTask + workers map[string]*MaintenanceWorker + pendingTasks []*MaintenanceTask + mutex sync.RWMutex + policy *MaintenancePolicy + integration *MaintenanceIntegration + persistence TaskPersistence // Interface for task persistence + persistenceChan chan *MaintenanceTask // Channel for async persistence } // MaintenanceScanner analyzes the cluster and generates maintenance tasks