You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							439 lines
						
					
					
						
							11 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							439 lines
						
					
					
						
							11 KiB
						
					
					
				
								package tasks
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"context"
							 | 
						|
									"fmt"
							 | 
						|
									"sync"
							 | 
						|
									"time"
							 | 
						|
								
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/glog"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/worker/types"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								// BaseTask provides common functionality for all tasks
							 | 
						|
								type BaseTask struct {
							 | 
						|
									taskType          types.TaskType
							 | 
						|
									taskID            string
							 | 
						|
									progress          float64
							 | 
						|
									cancelled         bool
							 | 
						|
									mutex             sync.RWMutex
							 | 
						|
									startTime         time.Time
							 | 
						|
									estimatedDuration time.Duration
							 | 
						|
									logger            TaskLogger
							 | 
						|
									loggerConfig      TaskLoggerConfig
							 | 
						|
									progressCallback  func(float64, string) // Callback function for progress updates
							 | 
						|
									currentStage      string                // Current stage description
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// NewBaseTask creates a new base task
							 | 
						|
								func NewBaseTask(taskType types.TaskType) *BaseTask {
							 | 
						|
									return &BaseTask{
							 | 
						|
										taskType:     taskType,
							 | 
						|
										progress:     0.0,
							 | 
						|
										cancelled:    false,
							 | 
						|
										loggerConfig: DefaultTaskLoggerConfig(),
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// NewBaseTaskWithLogger creates a new base task with custom logger configuration
							 | 
						|
								func NewBaseTaskWithLogger(taskType types.TaskType, loggerConfig TaskLoggerConfig) *BaseTask {
							 | 
						|
									return &BaseTask{
							 | 
						|
										taskType:     taskType,
							 | 
						|
										progress:     0.0,
							 | 
						|
										cancelled:    false,
							 | 
						|
										loggerConfig: loggerConfig,
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// InitializeLogger initializes the task logger with task details
							 | 
						|
								func (t *BaseTask) InitializeLogger(taskID string, workerID string, params types.TaskParams) error {
							 | 
						|
									return t.InitializeTaskLogger(taskID, workerID, params)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// InitializeTaskLogger initializes the task logger with task details (LoggerProvider interface)
							 | 
						|
								func (t *BaseTask) InitializeTaskLogger(taskID string, workerID string, params types.TaskParams) error {
							 | 
						|
									t.mutex.Lock()
							 | 
						|
									defer t.mutex.Unlock()
							 | 
						|
								
							 | 
						|
									t.taskID = taskID
							 | 
						|
								
							 | 
						|
									logger, err := NewTaskLogger(taskID, t.taskType, workerID, params, t.loggerConfig)
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("failed to initialize task logger: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									t.logger = logger
							 | 
						|
									t.logger.Info("BaseTask initialized for task %s (type: %s)", taskID, t.taskType)
							 | 
						|
								
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// Type returns the task type
							 | 
						|
								func (t *BaseTask) Type() types.TaskType {
							 | 
						|
									return t.taskType
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// GetProgress returns the current progress (0.0 to 100.0)
							 | 
						|
								func (t *BaseTask) GetProgress() float64 {
							 | 
						|
									t.mutex.RLock()
							 | 
						|
									defer t.mutex.RUnlock()
							 | 
						|
									return t.progress
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// SetProgress sets the current progress and logs it
							 | 
						|
								func (t *BaseTask) SetProgress(progress float64) {
							 | 
						|
									t.mutex.Lock()
							 | 
						|
									if progress < 0 {
							 | 
						|
										progress = 0
							 | 
						|
									}
							 | 
						|
									if progress > 100 {
							 | 
						|
										progress = 100
							 | 
						|
									}
							 | 
						|
									oldProgress := t.progress
							 | 
						|
									callback := t.progressCallback
							 | 
						|
									stage := t.currentStage
							 | 
						|
									t.progress = progress
							 | 
						|
									t.mutex.Unlock()
							 | 
						|
								
							 | 
						|
									// Log progress change
							 | 
						|
									if t.logger != nil && progress != oldProgress {
							 | 
						|
										message := stage
							 | 
						|
										if message == "" {
							 | 
						|
											message = fmt.Sprintf("Progress updated from %.1f%% to %.1f%%", oldProgress, progress)
							 | 
						|
										}
							 | 
						|
										t.logger.LogProgress(progress, message)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Call progress callback if set
							 | 
						|
									if callback != nil && progress != oldProgress {
							 | 
						|
										callback(progress, stage)
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// SetProgressWithStage sets the current progress with a stage description
							 | 
						|
								func (t *BaseTask) SetProgressWithStage(progress float64, stage string) {
							 | 
						|
									t.mutex.Lock()
							 | 
						|
									if progress < 0 {
							 | 
						|
										progress = 0
							 | 
						|
									}
							 | 
						|
									if progress > 100 {
							 | 
						|
										progress = 100
							 | 
						|
									}
							 | 
						|
									callback := t.progressCallback
							 | 
						|
									t.progress = progress
							 | 
						|
									t.currentStage = stage
							 | 
						|
									t.mutex.Unlock()
							 | 
						|
								
							 | 
						|
									// Log progress change
							 | 
						|
									if t.logger != nil {
							 | 
						|
										t.logger.LogProgress(progress, stage)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Call progress callback if set
							 | 
						|
									if callback != nil {
							 | 
						|
										callback(progress, stage)
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// SetCurrentStage sets the current stage description
							 | 
						|
								func (t *BaseTask) SetCurrentStage(stage string) {
							 | 
						|
									t.mutex.Lock()
							 | 
						|
									defer t.mutex.Unlock()
							 | 
						|
									t.currentStage = stage
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// GetCurrentStage returns the current stage description
							 | 
						|
								func (t *BaseTask) GetCurrentStage() string {
							 | 
						|
									t.mutex.RLock()
							 | 
						|
									defer t.mutex.RUnlock()
							 | 
						|
									return t.currentStage
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// Cancel cancels the task
							 | 
						|
								func (t *BaseTask) Cancel() error {
							 | 
						|
									t.mutex.Lock()
							 | 
						|
									defer t.mutex.Unlock()
							 | 
						|
								
							 | 
						|
									if t.cancelled {
							 | 
						|
										return nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									t.cancelled = true
							 | 
						|
								
							 | 
						|
									if t.logger != nil {
							 | 
						|
										t.logger.LogStatus("cancelled", "Task cancelled by request")
							 | 
						|
										t.logger.Warning("Task %s was cancelled", t.taskID)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// IsCancelled returns whether the task is cancelled
							 | 
						|
								func (t *BaseTask) IsCancelled() bool {
							 | 
						|
									t.mutex.RLock()
							 | 
						|
									defer t.mutex.RUnlock()
							 | 
						|
									return t.cancelled
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// SetStartTime sets the task start time
							 | 
						|
								func (t *BaseTask) SetStartTime(startTime time.Time) {
							 | 
						|
									t.mutex.Lock()
							 | 
						|
									defer t.mutex.Unlock()
							 | 
						|
									t.startTime = startTime
							 | 
						|
								
							 | 
						|
									if t.logger != nil {
							 | 
						|
										t.logger.LogStatus("running", fmt.Sprintf("Task started at %s", startTime.Format(time.RFC3339)))
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// GetStartTime returns the task start time
							 | 
						|
								func (t *BaseTask) GetStartTime() time.Time {
							 | 
						|
									t.mutex.RLock()
							 | 
						|
									defer t.mutex.RUnlock()
							 | 
						|
									return t.startTime
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// SetEstimatedDuration sets the estimated duration
							 | 
						|
								func (t *BaseTask) SetEstimatedDuration(duration time.Duration) {
							 | 
						|
									t.mutex.Lock()
							 | 
						|
									defer t.mutex.Unlock()
							 | 
						|
									t.estimatedDuration = duration
							 | 
						|
								
							 | 
						|
									if t.logger != nil {
							 | 
						|
										t.logger.LogWithFields("INFO", "Estimated duration set", map[string]interface{}{
							 | 
						|
											"estimated_duration": duration.String(),
							 | 
						|
											"estimated_seconds":  duration.Seconds(),
							 | 
						|
										})
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// GetEstimatedDuration returns the estimated duration
							 | 
						|
								func (t *BaseTask) GetEstimatedDuration() time.Duration {
							 | 
						|
									t.mutex.RLock()
							 | 
						|
									defer t.mutex.RUnlock()
							 | 
						|
									return t.estimatedDuration
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// SetProgressCallback sets the progress callback function
							 | 
						|
								func (t *BaseTask) SetProgressCallback(callback func(float64, string)) {
							 | 
						|
									t.mutex.Lock()
							 | 
						|
									defer t.mutex.Unlock()
							 | 
						|
									t.progressCallback = callback
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// SetLoggerConfig sets the logger configuration for this task
							 | 
						|
								func (t *BaseTask) SetLoggerConfig(config TaskLoggerConfig) {
							 | 
						|
									t.mutex.Lock()
							 | 
						|
									defer t.mutex.Unlock()
							 | 
						|
									t.loggerConfig = config
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// GetLogger returns the task logger
							 | 
						|
								func (t *BaseTask) GetLogger() TaskLogger {
							 | 
						|
									t.mutex.RLock()
							 | 
						|
									defer t.mutex.RUnlock()
							 | 
						|
									return t.logger
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// GetTaskLogger returns the task logger (LoggerProvider interface)
							 | 
						|
								func (t *BaseTask) GetTaskLogger() TaskLogger {
							 | 
						|
									t.mutex.RLock()
							 | 
						|
									defer t.mutex.RUnlock()
							 | 
						|
									return t.logger
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// LogInfo logs an info message
							 | 
						|
								func (t *BaseTask) LogInfo(message string, args ...interface{}) {
							 | 
						|
									if t.logger != nil {
							 | 
						|
										t.logger.Info(message, args...)
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// LogWarning logs a warning message
							 | 
						|
								func (t *BaseTask) LogWarning(message string, args ...interface{}) {
							 | 
						|
									if t.logger != nil {
							 | 
						|
										t.logger.Warning(message, args...)
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// LogError logs an error message
							 | 
						|
								func (t *BaseTask) LogError(message string, args ...interface{}) {
							 | 
						|
									if t.logger != nil {
							 | 
						|
										t.logger.Error(message, args...)
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// LogDebug logs a debug message
							 | 
						|
								func (t *BaseTask) LogDebug(message string, args ...interface{}) {
							 | 
						|
									if t.logger != nil {
							 | 
						|
										t.logger.Debug(message, args...)
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// LogWithFields logs a message with structured fields
							 | 
						|
								func (t *BaseTask) LogWithFields(level string, message string, fields map[string]interface{}) {
							 | 
						|
									if t.logger != nil {
							 | 
						|
										t.logger.LogWithFields(level, message, fields)
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// FinishTask finalizes the task and closes the logger
							 | 
						|
								func (t *BaseTask) FinishTask(success bool, errorMsg string) error {
							 | 
						|
									if t.logger != nil {
							 | 
						|
										if success {
							 | 
						|
											t.logger.LogStatus("completed", "Task completed successfully")
							 | 
						|
											t.logger.Info("Task %s finished successfully", t.taskID)
							 | 
						|
										} else {
							 | 
						|
											t.logger.LogStatus("failed", fmt.Sprintf("Task failed: %s", errorMsg))
							 | 
						|
											t.logger.Error("Task %s failed: %s", t.taskID, errorMsg)
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										// Close logger
							 | 
						|
										if err := t.logger.Close(); err != nil {
							 | 
						|
											glog.Errorf("Failed to close task logger: %v", err)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// ExecuteTask is a wrapper that handles common task execution logic with logging
							 | 
						|
								func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, executor func(context.Context, types.TaskParams) error) error {
							 | 
						|
									// Initialize logger if not already done
							 | 
						|
									if t.logger == nil {
							 | 
						|
										// Generate a temporary task ID if none provided
							 | 
						|
										if t.taskID == "" {
							 | 
						|
											t.taskID = fmt.Sprintf("task_%d", time.Now().UnixNano())
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										workerID := "unknown"
							 | 
						|
										if err := t.InitializeLogger(t.taskID, workerID, params); err != nil {
							 | 
						|
											glog.Warningf("Failed to initialize task logger: %v", err)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									t.SetStartTime(time.Now())
							 | 
						|
									t.SetProgress(0)
							 | 
						|
								
							 | 
						|
									if t.logger != nil {
							 | 
						|
										t.logger.LogWithFields("INFO", "Task execution started", map[string]interface{}{
							 | 
						|
											"volume_id":  params.VolumeID,
							 | 
						|
											"server":     getServerFromSources(params.TypedParams.Sources),
							 | 
						|
											"collection": params.Collection,
							 | 
						|
										})
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Create a context that can be cancelled
							 | 
						|
									ctx, cancel := context.WithCancel(ctx)
							 | 
						|
									defer cancel()
							 | 
						|
								
							 | 
						|
									// Monitor for cancellation
							 | 
						|
									go func() {
							 | 
						|
										for !t.IsCancelled() {
							 | 
						|
											select {
							 | 
						|
											case <-ctx.Done():
							 | 
						|
												return
							 | 
						|
											case <-time.After(time.Second):
							 | 
						|
												// Check cancellation every second
							 | 
						|
											}
							 | 
						|
										}
							 | 
						|
										t.LogWarning("Task cancellation detected, cancelling context")
							 | 
						|
										cancel()
							 | 
						|
									}()
							 | 
						|
								
							 | 
						|
									// Execute the actual task
							 | 
						|
									t.LogInfo("Starting task executor")
							 | 
						|
									err := executor(ctx, params)
							 | 
						|
								
							 | 
						|
									if err != nil {
							 | 
						|
										t.LogError("Task executor failed: %v", err)
							 | 
						|
										t.FinishTask(false, err.Error())
							 | 
						|
										return err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									if t.IsCancelled() {
							 | 
						|
										t.LogWarning("Task was cancelled during execution")
							 | 
						|
										t.FinishTask(false, "cancelled")
							 | 
						|
										return context.Canceled
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									t.SetProgress(100)
							 | 
						|
									t.LogInfo("Task executor completed successfully")
							 | 
						|
									t.FinishTask(true, "")
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// UnsupportedTaskTypeError represents an error for unsupported task types
							 | 
						|
								type UnsupportedTaskTypeError struct {
							 | 
						|
									TaskType types.TaskType
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (e *UnsupportedTaskTypeError) Error() string {
							 | 
						|
									return "unsupported task type: " + string(e.TaskType)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// BaseTaskFactory provides common functionality for task factories
							 | 
						|
								type BaseTaskFactory struct {
							 | 
						|
									taskType     types.TaskType
							 | 
						|
									capabilities []string
							 | 
						|
									description  string
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// NewBaseTaskFactory creates a new base task factory
							 | 
						|
								func NewBaseTaskFactory(taskType types.TaskType, capabilities []string, description string) *BaseTaskFactory {
							 | 
						|
									return &BaseTaskFactory{
							 | 
						|
										taskType:     taskType,
							 | 
						|
										capabilities: capabilities,
							 | 
						|
										description:  description,
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// Capabilities returns the capabilities required for this task type
							 | 
						|
								func (f *BaseTaskFactory) Capabilities() []string {
							 | 
						|
									return f.capabilities
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// Description returns the description of this task type
							 | 
						|
								func (f *BaseTaskFactory) Description() string {
							 | 
						|
									return f.description
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// ValidateParams validates task parameters
							 | 
						|
								func ValidateParams(params types.TaskParams, requiredFields ...string) error {
							 | 
						|
									for _, field := range requiredFields {
							 | 
						|
										switch field {
							 | 
						|
										case "volume_id":
							 | 
						|
											if params.VolumeID == 0 {
							 | 
						|
												return &ValidationError{Field: field, Message: "volume_id is required"}
							 | 
						|
											}
							 | 
						|
										case "server":
							 | 
						|
											if len(params.TypedParams.Sources) == 0 {
							 | 
						|
												return &ValidationError{Field: field, Message: "server is required"}
							 | 
						|
											}
							 | 
						|
										case "collection":
							 | 
						|
											if params.Collection == "" {
							 | 
						|
												return &ValidationError{Field: field, Message: "collection is required"}
							 | 
						|
											}
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// ValidationError represents a parameter validation error
							 | 
						|
								type ValidationError struct {
							 | 
						|
									Field   string
							 | 
						|
									Message string
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (e *ValidationError) Error() string {
							 | 
						|
									return e.Field + ": " + e.Message
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// getServerFromSources extracts the server address from unified sources
							 | 
						|
								func getServerFromSources(sources []*worker_pb.TaskSource) string {
							 | 
						|
									if len(sources) > 0 {
							 | 
						|
										return sources[0].Node
							 | 
						|
									}
							 | 
						|
									return ""
							 | 
						|
								}
							 |