package maintenance

import (
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
)

// MaintenanceManager coordinates the maintenance system
type MaintenanceManager struct {
	config      *MaintenanceConfig
	scanner     *MaintenanceScanner
	queue       *MaintenanceQueue
	adminClient AdminClient
	running     bool
	stopChan    chan struct{}
	// Error handling and backoff
	errorCount    int
	lastError     error
	lastErrorTime time.Time
	backoffDelay  time.Duration
	mutex         sync.RWMutex
}

// NewMaintenanceManager creates a new maintenance manager
func NewMaintenanceManager(adminClient AdminClient, config *MaintenanceConfig) *MaintenanceManager {
	if config == nil {
		config = DefaultMaintenanceConfig()
	}

	queue := NewMaintenanceQueue(config.Policy)
	scanner := NewMaintenanceScanner(adminClient, config.Policy, queue)

	return &MaintenanceManager{
		config:       config,
		scanner:      scanner,
		queue:        queue,
		adminClient:  adminClient,
		stopChan:     make(chan struct{}),
		backoffDelay: time.Second, // Start with 1 second backoff
	}
}

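// Illustrative lifecycle (a sketch only, assuming the caller supplies an
// AdminClient implementation; the exact wiring is outside this file):
//
//	mgr := NewMaintenanceManager(adminClient, DefaultMaintenanceConfig())
//	if err := mgr.Start(); err != nil {
//		// handle invalid configuration
//	}
//	defer mgr.Stop()
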
// Start begins the maintenance manager
func (mm *MaintenanceManager) Start() error {
	if !mm.config.Enabled {
		glog.V(1).Infof("Maintenance system is disabled")
		return nil
	}

	// Validate configuration durations to prevent ticker panics
	if err := mm.validateConfig(); err != nil {
		return fmt.Errorf("invalid maintenance configuration: %v", err)
	}

	mm.running = true

	// Start background processes
	go mm.scanLoop()
	go mm.cleanupLoop()

	glog.Infof("Maintenance manager started with scan interval %ds", mm.config.ScanIntervalSeconds)
	return nil
}

// validateConfig validates the maintenance configuration durations
func (mm *MaintenanceManager) validateConfig() error {
	if mm.config.ScanIntervalSeconds <= 0 {
		glog.Warningf("Invalid scan interval %ds, using default 30m", mm.config.ScanIntervalSeconds)
		mm.config.ScanIntervalSeconds = 30 * 60 // 30 minutes in seconds
	}

	if mm.config.CleanupIntervalSeconds <= 0 {
		glog.Warningf("Invalid cleanup interval %ds, using default 24h", mm.config.CleanupIntervalSeconds)
		mm.config.CleanupIntervalSeconds = 24 * 60 * 60 // 24 hours in seconds
	}

	if mm.config.WorkerTimeoutSeconds <= 0 {
		glog.Warningf("Invalid worker timeout %ds, using default 5m", mm.config.WorkerTimeoutSeconds)
		mm.config.WorkerTimeoutSeconds = 5 * 60 // 5 minutes in seconds
	}

	if mm.config.TaskTimeoutSeconds <= 0 {
		glog.Warningf("Invalid task timeout %ds, using default 2h", mm.config.TaskTimeoutSeconds)
		mm.config.TaskTimeoutSeconds = 2 * 60 * 60 // 2 hours in seconds
	}

	if mm.config.RetryDelaySeconds <= 0 {
		glog.Warningf("Invalid retry delay %ds, using default 15m", mm.config.RetryDelaySeconds)
		mm.config.RetryDelaySeconds = 15 * 60 // 15 minutes in seconds
	}

	if mm.config.TaskRetentionSeconds <= 0 {
		glog.Warningf("Invalid task retention %ds, using default 168h", mm.config.TaskRetentionSeconds)
		mm.config.TaskRetentionSeconds = 7 * 24 * 60 * 60 // 7 days in seconds
	}

	return nil
}

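// Defaults applied by validateConfig above when a configured value is missing
// or non-positive: scan interval 30m, cleanup interval 24h, worker timeout 5m,
// task timeout 2h, retry delay 15m, task retention 7d.
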
// IsRunning returns whether the maintenance manager is currently running
func (mm *MaintenanceManager) IsRunning() bool {
	return mm.running
}

// Stop terminates the maintenance manager
func (mm *MaintenanceManager) Stop() {
	mm.running = false
	close(mm.stopChan)
	glog.Infof("Maintenance manager stopped")
}

// scanLoop periodically scans for maintenance tasks with adaptive timing:
// while scans fail it re-ticks at the current backoff delay, and once a scan
// succeeds it restores the configured scan interval.
func (mm *MaintenanceManager) scanLoop() {
	scanInterval := time.Duration(mm.config.ScanIntervalSeconds) * time.Second
	ticker := time.NewTicker(scanInterval)
	// Defer via a closure so the Stop applies to the current ticker even after
	// it has been replaced below.
	defer func() { ticker.Stop() }()

	// Track the interval the ticker is currently using so it can be switched
	// back to the configured interval once errors clear.
	tickerInterval := scanInterval

	for mm.running {
		select {
		case <-mm.stopChan:
			return
		case <-ticker.C:
			glog.V(1).Infof("Performing maintenance scan (current interval %v)", tickerInterval)
			mm.performScan()

			// Adjust ticker interval based on error state
			mm.mutex.RLock()
			currentInterval := scanInterval
			if mm.errorCount > 0 {
				// Use the backoff delay while there are errors, but don't make
				// it longer than the configured interval * 10
				currentInterval = mm.backoffDelay
				maxInterval := scanInterval * 10
				if currentInterval > maxInterval {
					currentInterval = maxInterval
				}
			}
			mm.mutex.RUnlock()

			// Reset the ticker when the desired interval changed, including
			// restoring the configured interval after recovery
			if currentInterval != tickerInterval {
				ticker.Stop()
				ticker = time.NewTicker(currentInterval)
				tickerInterval = currentInterval
			}
		}
	}
}

// cleanupLoop periodically cleans up old tasks and stale workers
func (mm *MaintenanceManager) cleanupLoop() {
	cleanupInterval := time.Duration(mm.config.CleanupIntervalSeconds) * time.Second
	ticker := time.NewTicker(cleanupInterval)
	defer ticker.Stop()

	for mm.running {
		select {
		case <-mm.stopChan:
			return
		case <-ticker.C:
			mm.performCleanup()
		}
	}
}

// performScan executes a maintenance scan with error handling and backoff
func (mm *MaintenanceManager) performScan() {
	mm.mutex.Lock()
	defer mm.mutex.Unlock()

	glog.V(2).Infof("Starting maintenance scan")

	results, err := mm.scanner.ScanForMaintenanceTasks()
	if err != nil {
		mm.handleScanError(err)
		return
	}

	// Scan succeeded, reset error tracking
	mm.resetErrorTracking()

	if len(results) > 0 {
		mm.queue.AddTasksFromResults(results)
		glog.V(1).Infof("Maintenance scan completed: added %d tasks", len(results))
	} else {
		glog.V(2).Infof("Maintenance scan completed: no tasks needed")
	}
}

// handleScanError handles scan errors with exponential backoff and reduced logging
func (mm *MaintenanceManager) handleScanError(err error) {
	now := time.Now()
	mm.errorCount++
	mm.lastError = err
	mm.lastErrorTime = now

	// Use exponential backoff: double the delay on each consecutive failure
	if mm.errorCount > 1 {
		mm.backoffDelay = mm.backoffDelay * 2
		if mm.backoffDelay > 5*time.Minute {
			mm.backoffDelay = 5 * time.Minute // Cap at 5 minutes
		}
	}

	// Reduce log frequency based on the error count
	shouldLog := false
	if mm.errorCount <= 3 {
		// Log the first 3 errors immediately
		shouldLog = true
	} else if mm.errorCount <= 10 && mm.errorCount%3 == 0 {
		// Log every 3rd error for errors 4-10
		shouldLog = true
	} else if mm.errorCount%10 == 0 {
		// Log every 10th error after that
		shouldLog = true
	}

	if shouldLog {
		// Check if it's a connection error to provide better messaging
		if isConnectionError(err) {
			if mm.errorCount == 1 {
				glog.Errorf("Maintenance scan failed: %v (will retry with backoff)", err)
			} else {
				glog.Errorf("Maintenance scan still failing after %d attempts: %v (backoff: %v)",
					mm.errorCount, err, mm.backoffDelay)
			}
		} else {
			glog.Errorf("Maintenance scan failed: %v", err)
		}
	} else {
		// Use debug level for suppressed errors
		glog.V(3).Infof("Maintenance scan failed (error #%d, suppressed): %v", mm.errorCount, err)
	}
}

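// With the scheme in handleScanError above, consecutive failures are retried
// after roughly 1s, 2s, 4s, 8s, ... up to the 5-minute cap, and the scan loop
// never waits longer than 10x the configured scan interval.
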
// resetErrorTracking resets error tracking when scan succeeds
func (mm *MaintenanceManager) resetErrorTracking() {
	if mm.errorCount > 0 {
		glog.V(1).Infof("Maintenance scan recovered after %d failed attempts", mm.errorCount)
		mm.errorCount = 0
		mm.lastError = nil
		mm.backoffDelay = time.Second // Reset to initial delay
	}
}

// isConnectionError checks if the error is a connection-related error
func isConnectionError(err error) bool {
	if err == nil {
		return false
	}
	errStr := err.Error()
	return strings.Contains(errStr, "connection refused") ||
		strings.Contains(errStr, "connection error") ||
		strings.Contains(errStr, "dial tcp") ||
		strings.Contains(errStr, "connection timeout") ||
		strings.Contains(errStr, "no route to host") ||
		strings.Contains(errStr, "network unreachable")
}

// performCleanup cleans up old tasks and stale workers
func (mm *MaintenanceManager) performCleanup() {
	glog.V(2).Infof("Starting maintenance cleanup")

	taskRetention := time.Duration(mm.config.TaskRetentionSeconds) * time.Second
	workerTimeout := time.Duration(mm.config.WorkerTimeoutSeconds) * time.Second

	removedTasks := mm.queue.CleanupOldTasks(taskRetention)
	removedWorkers := mm.queue.RemoveStaleWorkers(workerTimeout)

	if removedTasks > 0 || removedWorkers > 0 {
		glog.V(1).Infof("Cleanup completed: removed %d old tasks and %d stale workers", removedTasks, removedWorkers)
	}
}

// GetQueue returns the maintenance queue
func (mm *MaintenanceManager) GetQueue() *MaintenanceQueue {
	return mm.queue
}

// GetConfig returns the maintenance configuration
func (mm *MaintenanceManager) GetConfig() *MaintenanceConfig {
	return mm.config
}

// GetStats returns maintenance statistics
func (mm *MaintenanceManager) GetStats() *MaintenanceStats {
	stats := mm.queue.GetStats()

	mm.mutex.RLock()
	defer mm.mutex.RUnlock()

	stats.LastScanTime = time.Now() // Would need to track this properly

	// Calculate next scan time based on current error state
	scanInterval := time.Duration(mm.config.ScanIntervalSeconds) * time.Second
	nextScanInterval := scanInterval
	if mm.errorCount > 0 {
		nextScanInterval = mm.backoffDelay
		maxInterval := scanInterval * 10
		if nextScanInterval > maxInterval {
			nextScanInterval = maxInterval
		}
	}
	stats.NextScanTime = time.Now().Add(nextScanInterval)

	return stats
}

// GetErrorState returns the current error state for monitoring
func (mm *MaintenanceManager) GetErrorState() (errorCount int, lastError error, backoffDelay time.Duration) {
	mm.mutex.RLock()
	defer mm.mutex.RUnlock()
	return mm.errorCount, mm.lastError, mm.backoffDelay
}

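// A monitoring caller might surface scan health via GetErrorState roughly like
// this (illustrative sketch; mgr is the caller's manager instance):
//
//	count, lastErr, delay := mgr.GetErrorState()
//	if count > 0 {
//		glog.Warningf("maintenance scans failing: %d errors, last: %v, backoff: %v", count, lastErr, delay)
//	}
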
// GetTasks returns tasks with filtering
func (mm *MaintenanceManager) GetTasks(status MaintenanceTaskStatus, taskType MaintenanceTaskType, limit int) []*MaintenanceTask {
	return mm.queue.GetTasks(status, taskType, limit)
}

// GetWorkers returns all registered workers
func (mm *MaintenanceManager) GetWorkers() []*MaintenanceWorker {
	return mm.queue.GetWorkers()
}

// TriggerScan manually triggers a maintenance scan
func (mm *MaintenanceManager) TriggerScan() error {
	if !mm.running {
		return fmt.Errorf("maintenance manager is not running")
	}

	go mm.performScan()
	return nil
}

// UpdateConfig updates the maintenance configuration
func (mm *MaintenanceManager) UpdateConfig(config *MaintenanceConfig) error {
	if config == nil {
		return fmt.Errorf("config cannot be nil")
	}

	mm.config = config
	mm.queue.policy = config.Policy
	mm.scanner.policy = config.Policy

	glog.V(1).Infof("Maintenance configuration updated")
	return nil
}

// CancelTask cancels a pending task
func (mm *MaintenanceManager) CancelTask(taskID string) error {
	mm.queue.mutex.Lock()
	defer mm.queue.mutex.Unlock()

	task, exists := mm.queue.tasks[taskID]
	if !exists {
		return fmt.Errorf("task %s not found", taskID)
	}

	if task.Status == TaskStatusPending {
		task.Status = TaskStatusCancelled
		completedAt := time.Now()
		task.CompletedAt = &completedAt

		// Remove from pending tasks
		for i, pendingTask := range mm.queue.pendingTasks {
			if pendingTask.ID == taskID {
				mm.queue.pendingTasks = append(mm.queue.pendingTasks[:i], mm.queue.pendingTasks[i+1:]...)
				break
			}
		}

		glog.V(2).Infof("Cancelled task %s", taskID)
		return nil
	}

	return fmt.Errorf("task %s cannot be cancelled (status: %s)", taskID, task.Status)
}

// RegisterWorker registers a new worker
func (mm *MaintenanceManager) RegisterWorker(worker *MaintenanceWorker) {
	mm.queue.RegisterWorker(worker)
}

// GetNextTask returns the next task for a worker
func (mm *MaintenanceManager) GetNextTask(workerID string, capabilities []MaintenanceTaskType) *MaintenanceTask {
	return mm.queue.GetNextTask(workerID, capabilities)
}

// CompleteTask marks a task as completed
func (mm *MaintenanceManager) CompleteTask(taskID string, errorMsg string) {
	mm.queue.CompleteTask(taskID, errorMsg)
}

// UpdateTaskProgress updates task progress
func (mm *MaintenanceManager) UpdateTaskProgress(taskID string, progress float64) {
	mm.queue.UpdateTaskProgress(taskID, progress)
}

// UpdateWorkerHeartbeat updates worker heartbeat
func (mm *MaintenanceManager) UpdateWorkerHeartbeat(workerID string) {
	mm.queue.UpdateWorkerHeartbeat(workerID)
}