You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
357 lines
8.8 KiB
357 lines
8.8 KiB
package tasks
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
|
)
|
|
|
|
// BaseTask provides common functionality for all tasks
|
|
type BaseTask struct {
|
|
taskType types.TaskType
|
|
progress float64
|
|
cancelled bool
|
|
mutex sync.RWMutex
|
|
startTime time.Time
|
|
estimatedDuration time.Duration
|
|
|
|
// Logging functionality
|
|
logFile *os.File
|
|
logger *log.Logger
|
|
logFilePath string
|
|
}
|
|
|
|
// NewBaseTask creates a new base task
|
|
func NewBaseTask(taskType types.TaskType) *BaseTask {
|
|
return &BaseTask{
|
|
taskType: taskType,
|
|
progress: 0.0,
|
|
cancelled: false,
|
|
}
|
|
}
|
|
|
|
// Type returns the task type
|
|
func (t *BaseTask) Type() types.TaskType {
|
|
return t.taskType
|
|
}
|
|
|
|
// GetProgress returns the current progress (0.0 to 100.0)
|
|
func (t *BaseTask) GetProgress() float64 {
|
|
t.mutex.RLock()
|
|
defer t.mutex.RUnlock()
|
|
return t.progress
|
|
}
|
|
|
|
// SetProgress sets the current progress
|
|
func (t *BaseTask) SetProgress(progress float64) {
|
|
t.mutex.Lock()
|
|
defer t.mutex.Unlock()
|
|
if progress < 0 {
|
|
progress = 0
|
|
}
|
|
if progress > 100 {
|
|
progress = 100
|
|
}
|
|
t.progress = progress
|
|
}
|
|
|
|
// Cancel cancels the task
|
|
func (t *BaseTask) Cancel() error {
|
|
t.mutex.Lock()
|
|
defer t.mutex.Unlock()
|
|
t.cancelled = true
|
|
return nil
|
|
}
|
|
|
|
// IsCancelled returns whether the task is cancelled
|
|
func (t *BaseTask) IsCancelled() bool {
|
|
t.mutex.RLock()
|
|
defer t.mutex.RUnlock()
|
|
return t.cancelled
|
|
}
|
|
|
|
// SetStartTime sets the task start time
|
|
func (t *BaseTask) SetStartTime(startTime time.Time) {
|
|
t.mutex.Lock()
|
|
defer t.mutex.Unlock()
|
|
t.startTime = startTime
|
|
}
|
|
|
|
// GetStartTime returns the task start time
|
|
func (t *BaseTask) GetStartTime() time.Time {
|
|
t.mutex.RLock()
|
|
defer t.mutex.RUnlock()
|
|
return t.startTime
|
|
}
|
|
|
|
// SetEstimatedDuration sets the estimated duration
|
|
func (t *BaseTask) SetEstimatedDuration(duration time.Duration) {
|
|
t.mutex.Lock()
|
|
defer t.mutex.Unlock()
|
|
t.estimatedDuration = duration
|
|
}
|
|
|
|
// GetEstimatedDuration returns the estimated duration
|
|
func (t *BaseTask) GetEstimatedDuration() time.Duration {
|
|
t.mutex.RLock()
|
|
defer t.mutex.RUnlock()
|
|
return t.estimatedDuration
|
|
}
|
|
|
|
// InitializeTaskLogging sets up task-specific logging - can be called by worker or tasks
|
|
func (t *BaseTask) InitializeTaskLogging(workingDir, taskID string) error {
|
|
return t.initializeLogging(workingDir, taskID)
|
|
}
|
|
|
|
// CloseTaskLogging properly closes task logging - can be called by worker or tasks
|
|
func (t *BaseTask) CloseTaskLogging() {
|
|
t.closeLogging()
|
|
}
|
|
|
|
// ExecuteTask is a wrapper that handles common task execution logic
|
|
func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, executor func(context.Context, types.TaskParams) error) error {
|
|
t.SetStartTime(time.Now())
|
|
t.SetProgress(0)
|
|
|
|
// Create a context that can be cancelled
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
// Monitor for cancellation
|
|
go func() {
|
|
for !t.IsCancelled() {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(time.Second):
|
|
// Check cancellation every second
|
|
}
|
|
}
|
|
cancel()
|
|
}()
|
|
|
|
// Execute the actual task
|
|
err := executor(ctx, params)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if t.IsCancelled() {
|
|
return context.Canceled
|
|
}
|
|
|
|
t.SetProgress(100)
|
|
return nil
|
|
}
|
|
|
|
// initializeLogging sets up task-specific logging to a file in the working directory
|
|
func (t *BaseTask) initializeLogging(workingDir, taskID string) error {
|
|
if workingDir == "" {
|
|
// If no working directory specified, skip file logging
|
|
return nil
|
|
}
|
|
|
|
// Ensure working directory exists
|
|
if err := os.MkdirAll(workingDir, 0755); err != nil {
|
|
return fmt.Errorf("failed to create working directory %s: %v", workingDir, err)
|
|
}
|
|
|
|
// Create task-specific log file
|
|
timestamp := time.Now().Format("20060102_150405")
|
|
logFileName := fmt.Sprintf("%s_%s_%s.log", t.taskType, taskID, timestamp)
|
|
t.logFilePath = filepath.Join(workingDir, logFileName)
|
|
|
|
logFile, err := os.OpenFile(t.logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create log file %s: %v", t.logFilePath, err)
|
|
}
|
|
|
|
t.logFile = logFile
|
|
t.logger = log.New(logFile, "", log.LstdFlags|log.Lmicroseconds)
|
|
|
|
// Log task initialization
|
|
t.LogInfo("Task %s initialized for %s", taskID, t.taskType)
|
|
|
|
return nil
|
|
}
|
|
|
|
// closeLogging properly closes the log file
|
|
func (t *BaseTask) closeLogging() {
|
|
if t.logFile != nil {
|
|
t.LogInfo("Task completed, closing log file")
|
|
t.logFile.Close()
|
|
t.logFile = nil
|
|
t.logger = nil
|
|
}
|
|
}
|
|
|
|
// LogInfo writes an info-level log message to both glog and task log file
|
|
func (t *BaseTask) LogInfo(format string, args ...interface{}) {
|
|
message := fmt.Sprintf(format, args...)
|
|
|
|
// Always log to task file if available
|
|
if t.logger != nil {
|
|
t.logger.Printf("[INFO] %s", message)
|
|
}
|
|
}
|
|
|
|
// LogError writes an error-level log message to both glog and task log file
|
|
func (t *BaseTask) LogError(format string, args ...interface{}) {
|
|
message := fmt.Sprintf(format, args...)
|
|
|
|
// Always log to task file if available
|
|
if t.logger != nil {
|
|
t.logger.Printf("[ERROR] %s", message)
|
|
}
|
|
}
|
|
|
|
// LogDebug writes a debug-level log message to task log file
|
|
func (t *BaseTask) LogDebug(format string, args ...interface{}) {
|
|
message := fmt.Sprintf(format, args...)
|
|
|
|
// Always log to task file if available
|
|
if t.logger != nil {
|
|
t.logger.Printf("[DEBUG] %s", message)
|
|
}
|
|
}
|
|
|
|
// LogWarning writes a warning-level log message to both glog and task log file
|
|
func (t *BaseTask) LogWarning(format string, args ...interface{}) {
|
|
message := fmt.Sprintf(format, args...)
|
|
|
|
// Always log to task file if available
|
|
if t.logger != nil {
|
|
t.logger.Printf("[WARNING] %s", message)
|
|
}
|
|
}
|
|
|
|
// GetLogFilePath returns the path to the task's log file
|
|
func (t *BaseTask) GetLogFilePath() string {
|
|
return t.logFilePath
|
|
}
|
|
|
|
// TaskRegistry manages task factories
|
|
type TaskRegistry struct {
|
|
factories map[types.TaskType]types.TaskFactory
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
// NewTaskRegistry creates a new task registry
|
|
func NewTaskRegistry() *TaskRegistry {
|
|
return &TaskRegistry{
|
|
factories: make(map[types.TaskType]types.TaskFactory),
|
|
}
|
|
}
|
|
|
|
// Register registers a task factory
|
|
func (r *TaskRegistry) Register(taskType types.TaskType, factory types.TaskFactory) {
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
r.factories[taskType] = factory
|
|
}
|
|
|
|
// CreateTask creates a task instance
|
|
func (r *TaskRegistry) CreateTask(taskType types.TaskType, params types.TaskParams) (types.TaskInterface, error) {
|
|
r.mutex.RLock()
|
|
factory, exists := r.factories[taskType]
|
|
r.mutex.RUnlock()
|
|
|
|
if !exists {
|
|
return nil, &UnsupportedTaskTypeError{TaskType: taskType}
|
|
}
|
|
|
|
return factory.Create(params)
|
|
}
|
|
|
|
// GetSupportedTypes returns all supported task types
|
|
func (r *TaskRegistry) GetSupportedTypes() []types.TaskType {
|
|
r.mutex.RLock()
|
|
defer r.mutex.RUnlock()
|
|
|
|
types := make([]types.TaskType, 0, len(r.factories))
|
|
for taskType := range r.factories {
|
|
types = append(types, taskType)
|
|
}
|
|
return types
|
|
}
|
|
|
|
// GetFactory returns the factory for a task type
|
|
func (r *TaskRegistry) GetFactory(taskType types.TaskType) (types.TaskFactory, bool) {
|
|
r.mutex.RLock()
|
|
defer r.mutex.RUnlock()
|
|
factory, exists := r.factories[taskType]
|
|
return factory, exists
|
|
}
|
|
|
|
// UnsupportedTaskTypeError represents an error for unsupported task types
|
|
type UnsupportedTaskTypeError struct {
|
|
TaskType types.TaskType
|
|
}
|
|
|
|
func (e *UnsupportedTaskTypeError) Error() string {
|
|
return "unsupported task type: " + string(e.TaskType)
|
|
}
|
|
|
|
// BaseTaskFactory provides common functionality for task factories
|
|
type BaseTaskFactory struct {
|
|
taskType types.TaskType
|
|
capabilities []string
|
|
description string
|
|
}
|
|
|
|
// NewBaseTaskFactory creates a new base task factory
|
|
func NewBaseTaskFactory(taskType types.TaskType, capabilities []string, description string) *BaseTaskFactory {
|
|
return &BaseTaskFactory{
|
|
taskType: taskType,
|
|
capabilities: capabilities,
|
|
description: description,
|
|
}
|
|
}
|
|
|
|
// Capabilities returns the capabilities required for this task type
|
|
func (f *BaseTaskFactory) Capabilities() []string {
|
|
return f.capabilities
|
|
}
|
|
|
|
// Description returns the description of this task type
|
|
func (f *BaseTaskFactory) Description() string {
|
|
return f.description
|
|
}
|
|
|
|
// ValidateParams validates task parameters
|
|
func ValidateParams(params types.TaskParams, requiredFields ...string) error {
|
|
for _, field := range requiredFields {
|
|
switch field {
|
|
case "volume_id":
|
|
if params.VolumeID == 0 {
|
|
return &ValidationError{Field: field, Message: "volume_id is required"}
|
|
}
|
|
case "server":
|
|
if params.Server == "" {
|
|
return &ValidationError{Field: field, Message: "server is required"}
|
|
}
|
|
case "collection":
|
|
if params.Collection == "" {
|
|
return &ValidationError{Field: field, Message: "collection is required"}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ValidationError represents a parameter validation error
|
|
type ValidationError struct {
|
|
Field string
|
|
Message string
|
|
}
|
|
|
|
func (e *ValidationError) Error() string {
|
|
return e.Field + ": " + e.Message
|
|
}
|