package worker

import (
	"crypto/rand"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"

	// Import task packages to trigger their auto-registration
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)

// Worker represents a maintenance worker instance
type Worker struct {
	id              string
	config          *types.WorkerConfig
	registry        *tasks.TaskRegistry
	currentTasks    map[string]*types.Task
	adminClient     AdminClient
	running         bool
	stopChan        chan struct{}
	mutex           sync.RWMutex
	startTime       time.Time
	tasksCompleted  int
	tasksFailed     int
	heartbeatTicker *time.Ticker
	requestTicker   *time.Ticker
}

// AdminClient defines the interface for communicating with the admin server
type AdminClient interface {
	Connect() error
	Disconnect() error
	RegisterWorker(worker *types.Worker) error
	SendHeartbeat(workerID string, status *types.WorkerStatus) error
	RequestTask(workerID string, capabilities []types.TaskType) (*types.Task, error)
	CompleteTask(taskID string, success bool, errorMsg string) error
	UpdateTaskProgress(taskID string, progress float64) error
	IsConnected() bool
}

// GenerateOrLoadWorkerID generates a unique worker ID, or loads an existing
// one from the working directory
func GenerateOrLoadWorkerID(workingDir string) (string, error) {
	const workerIDFile = "worker.id"

	var idFilePath string
	if workingDir != "" {
		idFilePath = filepath.Join(workingDir, workerIDFile)
	} else {
		// Use the current working directory if none is specified
		wd, err := os.Getwd()
		if err != nil {
			return "", fmt.Errorf("failed to get working directory: %w", err)
		}
		idFilePath = filepath.Join(wd, workerIDFile)
	}

	// Try to read an existing worker ID
	if data, err := os.ReadFile(idFilePath); err == nil {
		workerID := strings.TrimSpace(string(data))
		if workerID != "" {
			glog.Infof("Loaded existing worker ID from %s: %s", idFilePath, workerID)
			return workerID, nil
		}
	}

	// Generate a new unique worker ID with host information
	hostname, _ := os.Hostname()
	if hostname == "" {
		hostname = "unknown"
	}

	// Get a local IP address for better host identification
	var hostIP string
	if addrs, err := net.InterfaceAddrs(); err == nil {
		for _, addr := range addrs {
			if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
				if ipnet.IP.To4() != nil {
					hostIP = ipnet.IP.String()
					break
				}
			}
		}
	}
	if hostIP == "" {
		hostIP = "noip"
	}

	// Create a host identifier combining hostname and IP
	hostID := fmt.Sprintf("%s@%s", hostname, hostIP)

	// Add a random component for uniqueness
	randomBytes := make([]byte, 4)
	var workerID string
	if _, err := rand.Read(randomBytes); err != nil {
		// Fall back to a timestamp if crypto/rand fails
		workerID = fmt.Sprintf("worker-%s-%d", hostID, time.Now().Unix())
		glog.Infof("Generated fallback worker ID: %s", workerID)
	} else {
		// Use random bytes plus a timestamp for uniqueness
		randomHex := fmt.Sprintf("%x", randomBytes)
		timestamp := time.Now().Unix()
		workerID = fmt.Sprintf("worker-%s-%s-%d", hostID, randomHex, timestamp)
		glog.Infof("Generated new worker ID: %s", workerID)
	}

	// Save the worker ID to file so restarts reuse it
	if err := os.WriteFile(idFilePath, []byte(workerID), 0644); err != nil {
		glog.Warningf("Failed to save worker ID to %s: %v", idFilePath, err)
	} else {
		glog.Infof("Saved worker ID to %s", idFilePath)
	}

	return workerID, nil
}
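
// The generated IDs therefore take one of two shapes, depending on whether
// crypto/rand succeeded:
//
//	worker-<hostname>@<ip>-<8 hex chars>-<unix timestamp>
//	worker-<hostname>@<ip>-<unix timestamp>   (fallback)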
// NewWorker creates a new worker instance
func NewWorker(config *types.WorkerConfig) (*Worker, error) {
	if config == nil {
		config = types.DefaultWorkerConfig()
	}

	// Generate or load a persistent worker ID
	workerID, err := GenerateOrLoadWorkerID(config.BaseWorkingDir)
	if err != nil {
		return nil, fmt.Errorf("failed to generate or load worker ID: %w", err)
	}

	// Use the global registry that already has all tasks registered
	registry := tasks.GetGlobalRegistry()

	worker := &Worker{
		id:           workerID,
		config:       config,
		registry:     registry,
		currentTasks: make(map[string]*types.Task),
		stopChan:     make(chan struct{}),
		startTime:    time.Now(),
	}

	glog.V(1).Infof("Worker created with %d registered task types", len(registry.GetSupportedTypes()))

	return worker, nil
}

// ID returns the worker ID
func (w *Worker) ID() string {
	return w.id
}

// Start starts the worker
func (w *Worker) Start() error {
	w.mutex.Lock()
	defer w.mutex.Unlock()

	if w.running {
		return fmt.Errorf("worker is already running")
	}
	if w.adminClient == nil {
		return fmt.Errorf("admin client is not set")
	}

	w.running = true
	w.startTime = time.Now()

	// Prepare worker info for registration
	workerInfo := &types.Worker{
		ID:            w.id,
		Capabilities:  w.config.Capabilities,
		MaxConcurrent: w.config.MaxConcurrent,
		Status:        "active",
		CurrentLoad:   0,
		LastHeartbeat: time.Now(),
	}

	// Register worker info with the client first; the client stores it for
	// use during connection, so an error here is expected if not connected yet
	if err := w.adminClient.RegisterWorker(workerInfo); err != nil {
		glog.V(1).Infof("Worker registration deferred until connected: %v", err)
	}

	// Attempt the initial connection, but don't fail if it doesn't succeed
	// immediately; the reconnection loop will keep retrying
	glog.Infof("Worker %s starting, attempting to connect to admin server...", w.id)
	if err := w.adminClient.Connect(); err != nil {
		glog.Warningf("Initial connection to admin server failed, will keep retrying: %v", err)
	}

	// Start the worker loops regardless of the initial connection status;
	// they handle connection failures gracefully
	go w.heartbeatLoop()
	go w.taskRequestLoop()
	go w.connectionMonitorLoop()

	glog.Infof("Worker %s started (connection attempts will continue in background)", w.id)
	return nil
}

// Stop stops the worker
func (w *Worker) Stop() error {
	w.mutex.Lock()
	if !w.running {
		w.mutex.Unlock()
		return nil
	}

	w.running = false
	close(w.stopChan)

	// Stop tickers
	if w.heartbeatTicker != nil {
		w.heartbeatTicker.Stop()
	}
	if w.requestTicker != nil {
		w.requestTicker.Stop()
	}

	// Release the lock while waiting: executeTask needs it to remove
	// finished tasks from currentTasks
	w.mutex.Unlock()

	// Wait for current tasks to complete or time out
	timeout := time.NewTimer(30 * time.Second)
	defer timeout.Stop()

waitLoop:
	for {
		w.mutex.RLock()
		remaining := len(w.currentTasks)
		w.mutex.RUnlock()
		if remaining == 0 {
			break
		}
		select {
		case <-timeout.C:
			glog.Warningf("Worker %s stopping with %d tasks still running", w.id, remaining)
			break waitLoop
		case <-time.After(time.Second):
			// Check again
		}
	}

	// Disconnect from the admin server
	if w.adminClient != nil {
		if err := w.adminClient.Disconnect(); err != nil {
			glog.Errorf("Error disconnecting from admin server: %v", err)
		}
	}

	glog.Infof("Worker %s stopped", w.id)
	return nil
}

// RegisterTask registers a task factory
func (w *Worker) RegisterTask(taskType types.TaskType, factory types.TaskFactory) {
	w.registry.Register(taskType, factory)
}

// GetCapabilities returns the worker capabilities
func (w *Worker) GetCapabilities() []types.TaskType {
	return w.config.Capabilities
}
// GetStatus returns the current worker status
func (w *Worker) GetStatus() types.WorkerStatus {
	w.mutex.RLock()
	defer w.mutex.RUnlock()

	var currentTasks []types.Task
	for _, task := range w.currentTasks {
		currentTasks = append(currentTasks, *task)
	}

	status := "active"
	if len(w.currentTasks) >= w.config.MaxConcurrent {
		status = "busy"
	}

	return types.WorkerStatus{
		WorkerID:       w.id,
		Status:         status,
		Capabilities:   w.config.Capabilities,
		MaxConcurrent:  w.config.MaxConcurrent,
		CurrentLoad:    len(w.currentTasks),
		LastHeartbeat:  time.Now(),
		CurrentTasks:   currentTasks,
		Uptime:         time.Since(w.startTime),
		TasksCompleted: w.tasksCompleted,
		TasksFailed:    w.tasksFailed,
	}
}

// HandleTask handles a task execution
func (w *Worker) HandleTask(task *types.Task) error {
	w.mutex.Lock()
	if len(w.currentTasks) >= w.config.MaxConcurrent {
		w.mutex.Unlock()
		return fmt.Errorf("worker is at capacity")
	}
	w.currentTasks[task.ID] = task
	w.mutex.Unlock()

	// Execute the task in a goroutine
	go w.executeTask(task)
	return nil
}

// SetCapabilities sets the worker capabilities
func (w *Worker) SetCapabilities(capabilities []types.TaskType) {
	w.config.Capabilities = capabilities
}

// SetMaxConcurrent sets the maximum number of concurrent tasks
func (w *Worker) SetMaxConcurrent(max int) {
	w.config.MaxConcurrent = max
}

// SetHeartbeatInterval sets the heartbeat interval
func (w *Worker) SetHeartbeatInterval(interval time.Duration) {
	w.config.HeartbeatInterval = interval
}

// SetTaskRequestInterval sets the task request interval
func (w *Worker) SetTaskRequestInterval(interval time.Duration) {
	w.config.TaskRequestInterval = interval
}

// SetAdminClient sets the admin client
func (w *Worker) SetAdminClient(client AdminClient) {
	w.adminClient = client
}

// executeTask executes a task
func (w *Worker) executeTask(task *types.Task) {
	defer func() {
		w.mutex.Lock()
		delete(w.currentTasks, task.ID)
		w.mutex.Unlock()
	}()

	glog.Infof("Worker %s executing task %s: %s", w.id, task.ID, task.Type)

	// Determine the task-specific working directory
	var taskWorkingDir string
	if w.config.BaseWorkingDir != "" {
		taskWorkingDir = filepath.Join(w.config.BaseWorkingDir, string(task.Type))
	}

	// Create the task instance
	taskParams := types.TaskParams{
		VolumeID:       task.VolumeID,
		Server:         task.Server,
		Collection:     task.Collection,
		WorkingDir:     taskWorkingDir,
		Parameters:     task.Parameters,
		GrpcDialOption: w.config.GrpcDialOption,
	}

	taskInstance, err := w.registry.CreateTask(task.Type, taskParams)
	if err != nil {
		w.completeTask(task.ID, false, fmt.Sprintf("failed to create task: %v", err))
		return
	}

	// Initialize task logging
	if err := taskInstance.InitializeTaskLogging(taskWorkingDir, task.ID); err != nil {
		glog.Warningf("Failed to initialize task logging for %s: %v", task.ID, err)
	} else {
		// Ensure logging is closed when the task completes
		defer taskInstance.CloseTaskLogging()

		taskInstance.LogInfo("Worker %s starting execution of task %s (type: %s)", w.id, task.ID, task.Type)
		if task.VolumeID != 0 {
			taskInstance.LogInfo("Task parameters: VolumeID=%d, Server=%s, Collection=%s", task.VolumeID, task.Server, task.Collection)
		}
	}

	// Execute the task
	err = taskInstance.Execute(taskParams)

	// Report completion and update the counters under the mutex, since
	// other goroutines read them via GetStatus and GetPerformanceMetrics
	if err != nil {
		taskInstance.LogError("Task execution failed: %v", err)
		w.completeTask(task.ID, false, err.Error())
		w.mutex.Lock()
		w.tasksFailed++
		w.mutex.Unlock()
		glog.Errorf("Worker %s failed to execute task %s: %v", w.id, task.ID, err)
	} else {
		taskInstance.LogInfo("Task completed successfully")
		w.completeTask(task.ID, true, "")
		w.mutex.Lock()
		w.tasksCompleted++
		w.mutex.Unlock()
		glog.Infof("Worker %s completed task %s successfully", w.id, task.ID)
	}
}
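
// Note: executeTask derives a per-task-type scratch directory,
// <BaseWorkingDir>/<taskType>, and hands it to the task through
// types.TaskParams. Creating the directory (if needed) is left to the
// task implementation, which is not visible in this file.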
// completeTask reports task completion to the admin server
func (w *Worker) completeTask(taskID string, success bool, errorMsg string) {
	if w.adminClient != nil {
		if err := w.adminClient.CompleteTask(taskID, success, errorMsg); err != nil {
			glog.Errorf("Failed to report task completion: %v", err)
		}
	}
}

// heartbeatLoop sends periodic heartbeats to the admin server
func (w *Worker) heartbeatLoop() {
	w.heartbeatTicker = time.NewTicker(w.config.HeartbeatInterval)
	defer w.heartbeatTicker.Stop()

	for {
		select {
		case <-w.stopChan:
			return
		case <-w.heartbeatTicker.C:
			w.sendHeartbeat()
		}
	}
}

// taskRequestLoop periodically requests new tasks from the admin server
func (w *Worker) taskRequestLoop() {
	w.requestTicker = time.NewTicker(w.config.TaskRequestInterval)
	defer w.requestTicker.Stop()

	for {
		select {
		case <-w.stopChan:
			return
		case <-w.requestTicker.C:
			w.requestTasks()
		}
	}
}

// sendHeartbeat sends a heartbeat to the admin server
func (w *Worker) sendHeartbeat() {
	if w.adminClient == nil {
		return
	}

	// Read the current load under the lock
	w.mutex.RLock()
	currentLoad := len(w.currentTasks)
	w.mutex.RUnlock()

	if err := w.adminClient.SendHeartbeat(w.id, &types.WorkerStatus{
		WorkerID:      w.id,
		Status:        "active",
		Capabilities:  w.config.Capabilities,
		MaxConcurrent: w.config.MaxConcurrent,
		CurrentLoad:   currentLoad,
		LastHeartbeat: time.Now(),
	}); err != nil {
		glog.Warningf("Failed to send heartbeat: %v", err)
	}
}

// requestTasks requests new tasks from the admin server
func (w *Worker) requestTasks() {
	w.mutex.RLock()
	currentLoad := len(w.currentTasks)
	w.mutex.RUnlock()

	if currentLoad >= w.config.MaxConcurrent {
		return // Already at capacity
	}

	if w.adminClient != nil {
		task, err := w.adminClient.RequestTask(w.id, w.config.Capabilities)
		if err != nil {
			glog.V(2).Infof("Failed to request task: %v", err)
			return
		}
		if task != nil {
			if err := w.HandleTask(task); err != nil {
				glog.Errorf("Failed to handle task: %v", err)
			}
		}
	}
}

// GetTaskRegistry returns the task registry
func (w *Worker) GetTaskRegistry() *tasks.TaskRegistry {
	return w.registry
}

// GetCurrentTasks returns a copy of the current task map
func (w *Worker) GetCurrentTasks() map[string]*types.Task {
	w.mutex.RLock()
	defer w.mutex.RUnlock()

	tasksCopy := make(map[string]*types.Task)
	for id, task := range w.currentTasks {
		tasksCopy[id] = task
	}
	return tasksCopy
}

// registerWorker registers the worker with the admin server
func (w *Worker) registerWorker() {
	workerInfo := &types.Worker{
		ID:            w.id,
		Capabilities:  w.config.Capabilities,
		MaxConcurrent: w.config.MaxConcurrent,
		Status:        "active",
		CurrentLoad:   0,
		LastHeartbeat: time.Now(),
	}

	if err := w.adminClient.RegisterWorker(workerInfo); err != nil {
		glog.Warningf("Failed to register worker (will retry on next heartbeat): %v", err)
	} else {
		glog.Infof("Worker %s registered successfully with admin server", w.id)
	}
}

// connectionMonitorLoop monitors the connection status
func (w *Worker) connectionMonitorLoop() {
	ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds
	defer ticker.Stop()

	for {
		select {
		case <-w.stopChan:
			return
		case <-ticker.C:
			// Just monitor the connection status; registration is handled
			// automatically by the client's reconnection logic
			if w.adminClient != nil && w.adminClient.IsConnected() {
				glog.V(2).Infof("Worker %s connection status: connected", w.id)
			} else if w.adminClient != nil {
				glog.V(1).Infof("Worker %s connection status: disconnected, reconnection in progress", w.id)
			}
		}
	}
}

// GetConfig returns the worker configuration
func (w *Worker) GetConfig() *types.WorkerConfig {
	return w.config
}
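
// The success rate below is reported as a percentage of all finished tasks:
//
//	successRate = tasksCompleted / (tasksCompleted + tasksFailed) * 100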
// GetPerformanceMetrics returns performance metrics
func (w *Worker) GetPerformanceMetrics() *types.WorkerPerformance {
	w.mutex.RLock()
	defer w.mutex.RUnlock()

	uptime := time.Since(w.startTime)
	var successRate float64
	totalTasks := w.tasksCompleted + w.tasksFailed
	if totalTasks > 0 {
		successRate = float64(w.tasksCompleted) / float64(totalTasks) * 100
	}

	return &types.WorkerPerformance{
		TasksCompleted:  w.tasksCompleted,
		TasksFailed:     w.tasksFailed,
		AverageTaskTime: 0, // Would need to track this
		Uptime:          uptime,
		SuccessRate:     successRate,
	}
}
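
// A minimal wiring sketch (illustrative only; assumes an AdminClient
// implementation, e.g. a gRPC client, is constructed elsewhere):
//
//	worker, err := NewWorker(types.DefaultWorkerConfig())
//	if err != nil {
//		log.Fatal(err)
//	}
//	worker.SetAdminClient(client) // client implements AdminClient
//	if err := worker.Start(); err != nil {
//		log.Fatal(err)
//	}
//	defer worker.Stop()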