diff --git a/weed/worker/tasks/balance/balance.go b/weed/worker/tasks/balance/balance.go index ea867d950..657c32df4 100644 --- a/weed/worker/tasks/balance/balance.go +++ b/weed/worker/tasks/balance/balance.go @@ -30,6 +30,7 @@ func NewTask(server string, volumeID uint32, collection string) *Task { // Execute executes the balance task func (t *Task) Execute(params types.TaskParams) error { + t.LogInfo("Starting balance task for volume %d on server %s (collection: %s)", t.volumeID, t.server, t.collection) glog.Infof("Starting balance task for volume %d on server %s (collection: %s)", t.volumeID, t.server, t.collection) // Simulate balance operation with progress updates @@ -45,18 +46,23 @@ func (t *Task) Execute(params types.TaskParams) error { {"Verifying balance", 1 * time.Second, 100}, } - for _, step := range steps { + for i, step := range steps { if t.IsCancelled() { + t.LogWarning("Balance task cancelled at step %d: %s", i+1, step.name) return fmt.Errorf("balance task cancelled") } + t.LogInfo("Starting step %d/%d: %s", i+1, len(steps), step.name) glog.V(1).Infof("Balance task step: %s", step.name) t.SetProgress(step.progress) // Simulate work time.Sleep(step.duration) + + t.LogDebug("Completed step %d/%d: %s (progress: %.1f%%)", i+1, len(steps), step.name, step.progress) } + t.LogInfo("Balance task completed successfully for volume %d on server %s", t.volumeID, t.server) glog.Infof("Balance task completed for volume %d on server %s", t.volumeID, t.server) return nil } diff --git a/weed/worker/tasks/task.go b/weed/worker/tasks/task.go index 482233f60..ceeb5f413 100644 --- a/weed/worker/tasks/task.go +++ b/weed/worker/tasks/task.go @@ -2,6 +2,10 @@ package tasks import ( "context" + "fmt" + "log" + "os" + "path/filepath" "sync" "time" @@ -16,6 +20,11 @@ type BaseTask struct { mutex sync.RWMutex startTime time.Time estimatedDuration time.Duration + + // Logging functionality + logFile *os.File + logger *log.Logger + logFilePath string } // NewBaseTask creates a new base task @@ -95,6 +104,16 @@ func (t *BaseTask) GetEstimatedDuration() time.Duration { return t.estimatedDuration } +// InitializeTaskLogging sets up task-specific logging - can be called by worker or tasks +func (t *BaseTask) InitializeTaskLogging(workingDir, taskID string) error { + return t.initializeLogging(workingDir, taskID) +} + +// CloseTaskLogging properly closes task logging - can be called by worker or tasks +func (t *BaseTask) CloseTaskLogging() { + t.closeLogging() +} + // ExecuteTask is a wrapper that handles common task execution logic func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, executor func(context.Context, types.TaskParams) error) error { t.SetStartTime(time.Now()) @@ -132,6 +151,92 @@ func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, exe return nil } +// initializeLogging sets up task-specific logging to a file in the working directory +func (t *BaseTask) initializeLogging(workingDir, taskID string) error { + if workingDir == "" { + // If no working directory specified, skip file logging + return nil + } + + // Ensure working directory exists + if err := os.MkdirAll(workingDir, 0755); err != nil { + return fmt.Errorf("failed to create working directory %s: %v", workingDir, err) + } + + // Create task-specific log file + timestamp := time.Now().Format("20060102_150405") + logFileName := fmt.Sprintf("%s_%s_%s.log", t.taskType, taskID, timestamp) + t.logFilePath = filepath.Join(workingDir, logFileName) + + logFile, err := os.OpenFile(t.logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return fmt.Errorf("failed to create log file %s: %v", t.logFilePath, err) + } + + t.logFile = logFile + t.logger = log.New(logFile, "", log.LstdFlags|log.Lmicroseconds) + + // Log task initialization + t.LogInfo("Task %s initialized for %s", taskID, t.taskType) + + return nil +} + +// closeLogging properly closes the log file +func (t *BaseTask) closeLogging() { + if t.logFile != nil { + t.LogInfo("Task completed, closing log file") + t.logFile.Close() + t.logFile = nil + t.logger = nil + } +} + +// LogInfo writes an info-level log message to both glog and task log file +func (t *BaseTask) LogInfo(format string, args ...interface{}) { + message := fmt.Sprintf(format, args...) + + // Always log to task file if available + if t.logger != nil { + t.logger.Printf("[INFO] %s", message) + } +} + +// LogError writes an error-level log message to both glog and task log file +func (t *BaseTask) LogError(format string, args ...interface{}) { + message := fmt.Sprintf(format, args...) + + // Always log to task file if available + if t.logger != nil { + t.logger.Printf("[ERROR] %s", message) + } +} + +// LogDebug writes a debug-level log message to task log file +func (t *BaseTask) LogDebug(format string, args ...interface{}) { + message := fmt.Sprintf(format, args...) + + // Always log to task file if available + if t.logger != nil { + t.logger.Printf("[DEBUG] %s", message) + } +} + +// LogWarning writes a warning-level log message to both glog and task log file +func (t *BaseTask) LogWarning(format string, args ...interface{}) { + message := fmt.Sprintf(format, args...) + + // Always log to task file if available + if t.logger != nil { + t.logger.Printf("[WARNING] %s", message) + } +} + +// GetLogFilePath returns the path to the task's log file +func (t *BaseTask) GetLogFilePath() string { + return t.logFilePath +} + // TaskRegistry manages task factories type TaskRegistry struct { factories map[types.TaskType]types.TaskFactory diff --git a/weed/worker/types/worker_types.go b/weed/worker/types/worker_types.go index b9b13e6c9..e197fca3b 100644 --- a/weed/worker/types/worker_types.go +++ b/weed/worker/types/worker_types.go @@ -108,4 +108,13 @@ type TaskInterface interface { EstimateTime(params TaskParams) time.Duration GetProgress() float64 Cancel() error + + // Logging methods for task-specific log files + InitializeTaskLogging(workingDir, taskID string) error + CloseTaskLogging() + LogInfo(format string, args ...interface{}) + LogError(format string, args ...interface{}) + LogDebug(format string, args ...interface{}) + LogWarning(format string, args ...interface{}) + GetLogFilePath() string } diff --git a/weed/worker/worker.go b/weed/worker/worker.go index 8d869b2bd..3ac08a0fa 100644 --- a/weed/worker/worker.go +++ b/weed/worker/worker.go @@ -366,15 +366,29 @@ func (w *Worker) executeTask(task *types.Task) { return } + // Initialize task logging + if err := taskInstance.InitializeTaskLogging(taskWorkingDir, task.ID); err != nil { + glog.Warningf("Failed to initialize task logging for %s: %v", task.ID, err) + } else { + // Ensure logging is closed when task completes + defer taskInstance.CloseTaskLogging() + taskInstance.LogInfo("Worker %s starting execution of task %s (type: %s)", w.id, task.ID, task.Type) + if task.VolumeID != 0 { + taskInstance.LogInfo("Task parameters: VolumeID=%d, Server=%s, Collection=%s", task.VolumeID, task.Server, task.Collection) + } + } + // Execute task err = taskInstance.Execute(taskParams) // Report completion if err != nil { + taskInstance.LogError("Task execution failed: %v", err) w.completeTask(task.ID, false, err.Error()) w.tasksFailed++ glog.Errorf("Worker %s failed to execute task %s: %v", w.id, task.ID, err) } else { + taskInstance.LogInfo("Task completed successfully") w.completeTask(task.ID, true, "") w.tasksCompleted++ glog.Infof("Worker %s completed task %s successfully", w.id, task.ID)