Browse Source

async persistence

add-ec-vacuum
chrislu 4 months ago
parent
commit
256b1c9c28
  1. 42
      weed/admin/maintenance/maintenance_queue.go
  2. 1
      weed/admin/maintenance/maintenance_types.go

42
weed/admin/maintenance/maintenance_queue.go

@ -16,7 +16,12 @@ func NewMaintenanceQueue(policy *MaintenancePolicy) *MaintenanceQueue {
workers: make(map[string]*MaintenanceWorker), workers: make(map[string]*MaintenanceWorker),
pendingTasks: make([]*MaintenanceTask, 0), pendingTasks: make([]*MaintenanceTask, 0),
policy: policy, policy: policy,
persistenceChan: make(chan *MaintenanceTask, 1000), // Buffer for async persistence
} }
// Start persistence worker goroutine
go queue.persistenceWorker()
return queue return queue
} }
@ -39,16 +44,18 @@ func (mq *MaintenanceQueue) LoadTasksFromPersistence() error {
return nil return nil
} }
mq.mutex.Lock()
defer mq.mutex.Unlock()
glog.Infof("Loading tasks from persistence...") glog.Infof("Loading tasks from persistence...")
// Load tasks without holding lock to prevent deadlock
tasks, err := mq.persistence.LoadAllTaskStates() tasks, err := mq.persistence.LoadAllTaskStates()
if err != nil { if err != nil {
return fmt.Errorf("failed to load task states: %w", err) return fmt.Errorf("failed to load task states: %w", err)
} }
// Only acquire lock for the in-memory operations
mq.mutex.Lock()
defer mq.mutex.Unlock()
glog.Infof("DEBUG LoadTasksFromPersistence: Found %d tasks in persistence", len(tasks)) glog.Infof("DEBUG LoadTasksFromPersistence: Found %d tasks in persistence", len(tasks))
// Reset task maps // Reset task maps
@ -104,13 +111,38 @@ func (mq *MaintenanceQueue) LoadTasksFromPersistence() error {
return nil return nil
} }
// saveTaskState saves a task to persistent storage
func (mq *MaintenanceQueue) saveTaskState(task *MaintenanceTask) {
// persistenceWorker handles async persistence operations
func (mq *MaintenanceQueue) persistenceWorker() {
for task := range mq.persistenceChan {
if mq.persistence != nil { if mq.persistence != nil {
if err := mq.persistence.SaveTaskState(task); err != nil { if err := mq.persistence.SaveTaskState(task); err != nil {
glog.Errorf("Failed to save task state for %s: %v", task.ID, err) glog.Errorf("Failed to save task state for %s: %v", task.ID, err)
} }
} }
}
glog.V(1).Infof("Persistence worker shut down")
}
// Close gracefully shuts down the maintenance queue
func (mq *MaintenanceQueue) Close() {
if mq.persistenceChan != nil {
close(mq.persistenceChan)
glog.V(1).Infof("Maintenance queue persistence channel closed")
}
}
// saveTaskState saves a task to persistent storage asynchronously
func (mq *MaintenanceQueue) saveTaskState(task *MaintenanceTask) {
if mq.persistence != nil && mq.persistenceChan != nil {
// Create a copy to avoid race conditions
taskCopy := *task
select {
case mq.persistenceChan <- &taskCopy:
// Successfully queued for async persistence
default:
glog.Warningf("Persistence channel full, task state may be lost: %s", task.ID)
}
}
} }
// cleanupCompletedTasks removes old completed tasks beyond the retention limit // cleanupCompletedTasks removes old completed tasks beyond the retention limit

1
weed/admin/maintenance/maintenance_types.go

@ -256,6 +256,7 @@ type MaintenanceQueue struct {
policy *MaintenancePolicy policy *MaintenancePolicy
integration *MaintenanceIntegration integration *MaintenanceIntegration
persistence TaskPersistence // Interface for task persistence persistence TaskPersistence // Interface for task persistence
persistenceChan chan *MaintenanceTask // Channel for async persistence
} }
// MaintenanceScanner analyzes the cluster and generates maintenance tasks // MaintenanceScanner analyzes the cluster and generates maintenance tasks

Loading…
Cancel
Save