You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							252 lines
						
					
					
						
							5.8 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							252 lines
						
					
					
						
							5.8 KiB
						
					
					
				| package tasks | |
| 
 | |
| import ( | |
| 	"context" | |
| 	"sync" | |
| 	"time" | |
| 
 | |
| 	"github.com/seaweedfs/seaweedfs/weed/worker/types" | |
| ) | |
| 
 | |
| // BaseTask provides common functionality for all tasks | |
| type BaseTask struct { | |
| 	taskType          types.TaskType | |
| 	progress          float64 | |
| 	cancelled         bool | |
| 	mutex             sync.RWMutex | |
| 	startTime         time.Time | |
| 	estimatedDuration time.Duration | |
| } | |
| 
 | |
| // NewBaseTask creates a new base task | |
| func NewBaseTask(taskType types.TaskType) *BaseTask { | |
| 	return &BaseTask{ | |
| 		taskType:  taskType, | |
| 		progress:  0.0, | |
| 		cancelled: false, | |
| 	} | |
| } | |
| 
 | |
| // Type returns the task type | |
| func (t *BaseTask) Type() types.TaskType { | |
| 	return t.taskType | |
| } | |
| 
 | |
| // GetProgress returns the current progress (0.0 to 100.0) | |
| func (t *BaseTask) GetProgress() float64 { | |
| 	t.mutex.RLock() | |
| 	defer t.mutex.RUnlock() | |
| 	return t.progress | |
| } | |
| 
 | |
| // SetProgress sets the current progress | |
| func (t *BaseTask) SetProgress(progress float64) { | |
| 	t.mutex.Lock() | |
| 	defer t.mutex.Unlock() | |
| 	if progress < 0 { | |
| 		progress = 0 | |
| 	} | |
| 	if progress > 100 { | |
| 		progress = 100 | |
| 	} | |
| 	t.progress = progress | |
| } | |
| 
 | |
| // Cancel cancels the task | |
| func (t *BaseTask) Cancel() error { | |
| 	t.mutex.Lock() | |
| 	defer t.mutex.Unlock() | |
| 	t.cancelled = true | |
| 	return nil | |
| } | |
| 
 | |
| // IsCancelled returns whether the task is cancelled | |
| func (t *BaseTask) IsCancelled() bool { | |
| 	t.mutex.RLock() | |
| 	defer t.mutex.RUnlock() | |
| 	return t.cancelled | |
| } | |
| 
 | |
| // SetStartTime sets the task start time | |
| func (t *BaseTask) SetStartTime(startTime time.Time) { | |
| 	t.mutex.Lock() | |
| 	defer t.mutex.Unlock() | |
| 	t.startTime = startTime | |
| } | |
| 
 | |
| // GetStartTime returns the task start time | |
| func (t *BaseTask) GetStartTime() time.Time { | |
| 	t.mutex.RLock() | |
| 	defer t.mutex.RUnlock() | |
| 	return t.startTime | |
| } | |
| 
 | |
| // SetEstimatedDuration sets the estimated duration | |
| func (t *BaseTask) SetEstimatedDuration(duration time.Duration) { | |
| 	t.mutex.Lock() | |
| 	defer t.mutex.Unlock() | |
| 	t.estimatedDuration = duration | |
| } | |
| 
 | |
| // GetEstimatedDuration returns the estimated duration | |
| func (t *BaseTask) GetEstimatedDuration() time.Duration { | |
| 	t.mutex.RLock() | |
| 	defer t.mutex.RUnlock() | |
| 	return t.estimatedDuration | |
| } | |
| 
 | |
| // ExecuteTask is a wrapper that handles common task execution logic | |
| func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, executor func(context.Context, types.TaskParams) error) error { | |
| 	t.SetStartTime(time.Now()) | |
| 	t.SetProgress(0) | |
| 
 | |
| 	// Create a context that can be cancelled | |
| 	ctx, cancel := context.WithCancel(ctx) | |
| 	defer cancel() | |
| 
 | |
| 	// Monitor for cancellation | |
| 	go func() { | |
| 		for !t.IsCancelled() { | |
| 			select { | |
| 			case <-ctx.Done(): | |
| 				return | |
| 			case <-time.After(time.Second): | |
| 				// Check cancellation every second | |
| 			} | |
| 		} | |
| 		cancel() | |
| 	}() | |
| 
 | |
| 	// Execute the actual task | |
| 	err := executor(ctx, params) | |
| 
 | |
| 	if err != nil { | |
| 		return err | |
| 	} | |
| 
 | |
| 	if t.IsCancelled() { | |
| 		return context.Canceled | |
| 	} | |
| 
 | |
| 	t.SetProgress(100) | |
| 	return nil | |
| } | |
| 
 | |
| // TaskRegistry manages task factories | |
| type TaskRegistry struct { | |
| 	factories map[types.TaskType]types.TaskFactory | |
| 	mutex     sync.RWMutex | |
| } | |
| 
 | |
| // NewTaskRegistry creates a new task registry | |
| func NewTaskRegistry() *TaskRegistry { | |
| 	return &TaskRegistry{ | |
| 		factories: make(map[types.TaskType]types.TaskFactory), | |
| 	} | |
| } | |
| 
 | |
| // Register registers a task factory | |
| func (r *TaskRegistry) Register(taskType types.TaskType, factory types.TaskFactory) { | |
| 	r.mutex.Lock() | |
| 	defer r.mutex.Unlock() | |
| 	r.factories[taskType] = factory | |
| } | |
| 
 | |
| // CreateTask creates a task instance | |
| func (r *TaskRegistry) CreateTask(taskType types.TaskType, params types.TaskParams) (types.TaskInterface, error) { | |
| 	r.mutex.RLock() | |
| 	factory, exists := r.factories[taskType] | |
| 	r.mutex.RUnlock() | |
| 
 | |
| 	if !exists { | |
| 		return nil, &UnsupportedTaskTypeError{TaskType: taskType} | |
| 	} | |
| 
 | |
| 	return factory.Create(params) | |
| } | |
| 
 | |
| // GetSupportedTypes returns all supported task types | |
| func (r *TaskRegistry) GetSupportedTypes() []types.TaskType { | |
| 	r.mutex.RLock() | |
| 	defer r.mutex.RUnlock() | |
| 
 | |
| 	types := make([]types.TaskType, 0, len(r.factories)) | |
| 	for taskType := range r.factories { | |
| 		types = append(types, taskType) | |
| 	} | |
| 	return types | |
| } | |
| 
 | |
| // GetFactory returns the factory for a task type | |
| func (r *TaskRegistry) GetFactory(taskType types.TaskType) (types.TaskFactory, bool) { | |
| 	r.mutex.RLock() | |
| 	defer r.mutex.RUnlock() | |
| 	factory, exists := r.factories[taskType] | |
| 	return factory, exists | |
| } | |
| 
 | |
| // UnsupportedTaskTypeError represents an error for unsupported task types | |
| type UnsupportedTaskTypeError struct { | |
| 	TaskType types.TaskType | |
| } | |
| 
 | |
| func (e *UnsupportedTaskTypeError) Error() string { | |
| 	return "unsupported task type: " + string(e.TaskType) | |
| } | |
| 
 | |
| // BaseTaskFactory provides common functionality for task factories | |
| type BaseTaskFactory struct { | |
| 	taskType     types.TaskType | |
| 	capabilities []string | |
| 	description  string | |
| } | |
| 
 | |
| // NewBaseTaskFactory creates a new base task factory | |
| func NewBaseTaskFactory(taskType types.TaskType, capabilities []string, description string) *BaseTaskFactory { | |
| 	return &BaseTaskFactory{ | |
| 		taskType:     taskType, | |
| 		capabilities: capabilities, | |
| 		description:  description, | |
| 	} | |
| } | |
| 
 | |
| // Capabilities returns the capabilities required for this task type | |
| func (f *BaseTaskFactory) Capabilities() []string { | |
| 	return f.capabilities | |
| } | |
| 
 | |
| // Description returns the description of this task type | |
| func (f *BaseTaskFactory) Description() string { | |
| 	return f.description | |
| } | |
| 
 | |
| // ValidateParams validates task parameters | |
| func ValidateParams(params types.TaskParams, requiredFields ...string) error { | |
| 	for _, field := range requiredFields { | |
| 		switch field { | |
| 		case "volume_id": | |
| 			if params.VolumeID == 0 { | |
| 				return &ValidationError{Field: field, Message: "volume_id is required"} | |
| 			} | |
| 		case "server": | |
| 			if params.Server == "" { | |
| 				return &ValidationError{Field: field, Message: "server is required"} | |
| 			} | |
| 		case "collection": | |
| 			if params.Collection == "" { | |
| 				return &ValidationError{Field: field, Message: "collection is required"} | |
| 			} | |
| 		} | |
| 	} | |
| 	return nil | |
| } | |
| 
 | |
// ValidationError describes a task parameter that failed validation.
type ValidationError struct {
	// Field is the name of the offending parameter.
	Field string
	// Message explains what is wrong with the parameter.
	Message string
}

// Error implements the error interface, formatting the error as
// "<Field>: <Message>".
func (e *ValidationError) Error() string {
	const separator = ": "
	return e.Field + separator + e.Message
}
 |