diff --git a/weed/admin/dash/worker_grpc_server.go b/weed/admin/dash/worker_grpc_server.go index 36f97261a..7496bb568 100644 --- a/weed/admin/dash/worker_grpc_server.go +++ b/weed/admin/dash/worker_grpc_server.go @@ -319,14 +319,20 @@ func (s *WorkerGrpcServer) handleHeartbeat(conn *WorkerConnection, heartbeat *wo // handleTaskRequest processes task requests from workers func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *worker_pb.TaskRequest) { + glog.Infof("DEBUG handleTaskRequest: Worker %s requesting tasks with capabilities %v", conn.workerID, conn.capabilities) + if s.adminServer.maintenanceManager == nil { + glog.Infof("DEBUG handleTaskRequest: maintenance manager is nil") return } // Get next task from maintenance manager task := s.adminServer.maintenanceManager.GetNextTask(conn.workerID, conn.capabilities) + glog.Infof("DEBUG handleTaskRequest: GetNextTask returned task: %v", task != nil) if task != nil { + glog.Infof("DEBUG handleTaskRequest: Assigning task %s (type: %s) to worker %s", task.ID, task.Type, conn.workerID) + // Send task assignment assignment := &worker_pb.AdminMessage{ Timestamp: time.Now().Unix(), @@ -348,10 +354,12 @@ func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *wo select { case conn.outgoing <- assignment: - glog.V(2).Infof("Assigned task %s to worker %s", task.ID, conn.workerID) + glog.Infof("DEBUG handleTaskRequest: Successfully assigned task %s to worker %s", task.ID, conn.workerID) case <-time.After(time.Second): glog.Warningf("Failed to send task assignment to worker %s", conn.workerID) } + } else { + glog.Infof("DEBUG handleTaskRequest: No tasks available for worker %s", conn.workerID) } } diff --git a/weed/admin/handlers/maintenance_handlers.go b/weed/admin/handlers/maintenance_handlers.go index cf090d5ad..7c5feacd6 100644 --- a/weed/admin/handlers/maintenance_handlers.go +++ b/weed/admin/handlers/maintenance_handlers.go @@ -300,32 +300,20 @@ func (h *MaintenanceHandlers) UpdateMaintenanceConfig(c *gin.Context) { // Helper methods that delegate to AdminServer func (h *MaintenanceHandlers) getMaintenanceQueueData() (*maintenance.MaintenanceQueueData, error) { - glog.Infof("DEBUG getMaintenanceQueueData: starting data assembly") - tasks, err := h.getMaintenanceTasks() if err != nil { - glog.Infof("DEBUG getMaintenanceQueueData: error getting tasks: %v", err) return nil, err } - glog.Infof("DEBUG getMaintenanceQueueData: got %d tasks", len(tasks)) workers, err := h.getMaintenanceWorkers() if err != nil { - glog.Infof("DEBUG getMaintenanceQueueData: error getting workers: %v", err) return nil, err } - glog.Infof("DEBUG getMaintenanceQueueData: got %d workers", len(workers)) stats, err := h.getMaintenanceQueueStats() if err != nil { - glog.Infof("DEBUG getMaintenanceQueueData: error getting stats: %v", err) return nil, err } - if stats != nil { - glog.Infof("DEBUG getMaintenanceQueueData: got stats {pending: %d, running: %d}", stats.PendingTasks, stats.RunningTasks) - } else { - glog.Infof("DEBUG getMaintenanceQueueData: stats is nil") - } data := &maintenance.MaintenanceQueueData{ Tasks: tasks, @@ -334,7 +322,6 @@ func (h *MaintenanceHandlers) getMaintenanceQueueData() (*maintenance.Maintenanc LastUpdated: time.Now(), } - glog.Infof("DEBUG getMaintenanceQueueData: assembled data with %d tasks, %d workers", len(data.Tasks), len(data.Workers)) return data, nil } @@ -346,35 +333,32 @@ func (h *MaintenanceHandlers) getMaintenanceQueueStats() (*maintenance.QueueStat func (h *MaintenanceHandlers) getMaintenanceTasks() ([]*maintenance.MaintenanceTask, error) { // Call the maintenance manager directly to get all tasks if h.adminServer == nil { - glog.Infof("DEBUG getMaintenanceTasks: adminServer is nil") return []*maintenance.MaintenanceTask{}, nil } - if h.adminServer.GetMaintenanceManager() == nil { - glog.Infof("DEBUG getMaintenanceTasks: maintenance manager is nil") + manager := h.adminServer.GetMaintenanceManager() + if manager == nil { return []*maintenance.MaintenanceTask{}, nil } // Get ALL tasks using empty parameters - this should match what the API returns - allTasks := h.adminServer.GetMaintenanceManager().GetTasks("", "", 0) - glog.Infof("DEBUG getMaintenanceTasks: retrieved %d tasks from maintenance manager", len(allTasks)) - - for i, task := range allTasks { - if task != nil { - glog.Infof("DEBUG getMaintenanceTasks: task[%d] = {id: %s, type: %s, status: %s, volume: %d}", - i, task.ID, task.Type, task.Status, task.VolumeID) - } else { - glog.Infof("DEBUG getMaintenanceTasks: task[%d] is nil", i) - } - } - + allTasks := manager.GetTasks("", "", 0) return allTasks, nil } func (h *MaintenanceHandlers) getMaintenanceWorkers() ([]*maintenance.MaintenanceWorker, error) { - // This would integrate with the maintenance system to get real workers - // For now, return mock data - return []*maintenance.MaintenanceWorker{}, nil + // Get workers from the admin server's maintenance manager + if h.adminServer == nil { + return []*maintenance.MaintenanceWorker{}, nil + } + + if h.adminServer.GetMaintenanceManager() == nil { + return []*maintenance.MaintenanceWorker{}, nil + } + + // Get workers from the maintenance manager + workers := h.adminServer.GetMaintenanceManager().GetWorkers() + return workers, nil } func (h *MaintenanceHandlers) getMaintenanceConfig() (*maintenance.MaintenanceConfigData, error) { diff --git a/weed/admin/maintenance/maintenance_integration.go b/weed/admin/maintenance/maintenance_integration.go index 9a965d38a..0ba4d9e55 100644 --- a/weed/admin/maintenance/maintenance_integration.go +++ b/weed/admin/maintenance/maintenance_integration.go @@ -259,31 +259,44 @@ func (s *MaintenanceIntegration) convertToExistingFormat(result *types.TaskDetec // CanScheduleWithTaskSchedulers determines if a task can be scheduled using task schedulers with dynamic type conversion func (s *MaintenanceIntegration) CanScheduleWithTaskSchedulers(task *MaintenanceTask, runningTasks []*MaintenanceTask, availableWorkers []*MaintenanceWorker) bool { + glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Checking task %s (type: %s)", task.ID, task.Type) + // Convert existing types to task types using mapping taskType, exists := s.revTaskTypeMap[task.Type] if !exists { - glog.V(2).Infof("Unknown task type %s for scheduling, falling back to existing logic", task.Type) + glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Unknown task type %s for scheduling, falling back to existing logic", task.Type) return false // Fallback to existing logic for unknown types } + glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Mapped task type %s to %s", task.Type, taskType) + // Convert task objects taskObject := s.convertTaskToTaskSystem(task) if taskObject == nil { - glog.V(2).Infof("Failed to convert task %s for scheduling", task.ID) + glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Failed to convert task %s for scheduling", task.ID) return false } + glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Successfully converted task %s", task.ID) + runningTaskObjects := s.convertTasksToTaskSystem(runningTasks) workerObjects := s.convertWorkersToTaskSystem(availableWorkers) + glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Converted %d running tasks and %d workers", len(runningTaskObjects), len(workerObjects)) + // Get the appropriate scheduler scheduler := s.taskRegistry.GetScheduler(taskType) if scheduler == nil { - glog.V(2).Infof("No scheduler found for task type %s", taskType) + glog.Infof("DEBUG CanScheduleWithTaskSchedulers: No scheduler found for task type %s", taskType) return false } - return scheduler.CanScheduleNow(taskObject, runningTaskObjects, workerObjects) + glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Found scheduler for task type %s", taskType) + + canSchedule := scheduler.CanScheduleNow(taskObject, runningTaskObjects, workerObjects) + glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Scheduler decision for task %s: %v", task.ID, canSchedule) + + return canSchedule } // convertTaskToTaskSystem converts existing task to task system format using dynamic mapping diff --git a/weed/admin/maintenance/maintenance_manager.go b/weed/admin/maintenance/maintenance_manager.go index 5d87d817e..049fb3529 100644 --- a/weed/admin/maintenance/maintenance_manager.go +++ b/weed/admin/maintenance/maintenance_manager.go @@ -18,11 +18,12 @@ type MaintenanceManager struct { running bool stopChan chan struct{} // Error handling and backoff - errorCount int - lastError error - lastErrorTime time.Time - backoffDelay time.Duration - mutex sync.RWMutex + errorCount int + lastError error + lastErrorTime time.Time + backoffDelay time.Duration + mutex sync.RWMutex + scanInProgress bool } // NewMaintenanceManager creates a new maintenance manager @@ -170,25 +171,48 @@ func (mm *MaintenanceManager) cleanupLoop() { // performScan executes a maintenance scan with error handling and backoff func (mm *MaintenanceManager) performScan() { - mm.mutex.Lock() - defer mm.mutex.Unlock() + defer func() { + // Always reset scan in progress flag when done + mm.mutex.Lock() + mm.scanInProgress = false + mm.mutex.Unlock() + }() - glog.V(2).Infof("Starting maintenance scan") + glog.Infof("Starting maintenance scan...") results, err := mm.scanner.ScanForMaintenanceTasks() if err != nil { + mm.mutex.Lock() mm.handleScanError(err) + mm.mutex.Unlock() + glog.Warningf("Maintenance scan failed: %v", err) return } - // Scan succeeded, reset error tracking + // Scan succeeded, reset error tracking and add tasks + mm.mutex.Lock() mm.resetErrorTracking() if len(results) > 0 { + // Count tasks by type for logging + taskCounts := make(map[MaintenanceTaskType]int) + for _, result := range results { + taskCounts[result.TaskType]++ + } + + // Release the mutex before calling AddTasksFromResults to avoid holding + // the manager mutex while trying to acquire the queue mutex + mm.mutex.Unlock() mm.queue.AddTasksFromResults(results) - glog.V(1).Infof("Maintenance scan completed: added %d tasks", len(results)) + + // Log detailed scan results + glog.Infof("Maintenance scan completed: found %d tasks", len(results)) + for taskType, count := range taskCounts { + glog.Infof(" - %s: %d tasks", taskType, count) + } } else { - glog.V(2).Infof("Maintenance scan completed: no tasks needed") + mm.mutex.Unlock() + glog.Infof("Maintenance scan completed: no maintenance tasks needed") } } @@ -334,6 +358,16 @@ func (mm *MaintenanceManager) TriggerScan() error { return fmt.Errorf("maintenance manager is not running") } + // Prevent multiple concurrent scans + mm.mutex.Lock() + if mm.scanInProgress { + mm.mutex.Unlock() + glog.V(1).Infof("Scan already in progress, ignoring trigger request") + return nil + } + mm.scanInProgress = true + mm.mutex.Unlock() + go mm.performScan() return nil } diff --git a/weed/admin/maintenance/maintenance_queue.go b/weed/admin/maintenance/maintenance_queue.go index 0e8ac8b24..8528707bb 100644 --- a/weed/admin/maintenance/maintenance_queue.go +++ b/weed/admin/maintenance/maintenance_queue.go @@ -33,7 +33,8 @@ func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) { // Check for duplicate tasks (same type + volume + not completed) if mq.hasDuplicateTask(task) { - glog.V(2).Infof("Skipping duplicate task: %s for volume %d (already exists)", task.Type, task.VolumeID) + glog.V(1).Infof("Task skipped (duplicate): %s for volume %d on %s (already queued or running)", + task.Type, task.VolumeID, task.Server) return } @@ -53,7 +54,13 @@ func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) { return mq.pendingTasks[i].ScheduledAt.Before(mq.pendingTasks[j].ScheduledAt) }) - glog.V(2).Infof("Added maintenance task %s: %s for volume %d", task.ID, task.Type, task.VolumeID) + scheduleInfo := "" + if !task.ScheduledAt.IsZero() && time.Until(task.ScheduledAt) > time.Minute { + scheduleInfo = fmt.Sprintf(", scheduled for %v", task.ScheduledAt.Format("15:04:05")) + } + + glog.Infof("Task queued: %s (%s) volume %d on %s, priority %s%s, reason: %s", + task.ID, task.Type, task.VolumeID, task.Server, task.Priority, scheduleInfo, task.Reason) } // hasDuplicateTask checks if a similar task already exists (same type, volume, and not completed) @@ -90,72 +97,89 @@ func (mq *MaintenanceQueue) AddTasksFromResults(results []*TaskDetectionResult) // GetNextTask returns the next available task for a worker func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []MaintenanceTaskType) *MaintenanceTask { - mq.mutex.Lock() - defer mq.mutex.Unlock() - - glog.Infof("DEBUG GetNextTask: Worker %s requesting task with capabilities %v", workerID, capabilities) - glog.Infof("DEBUG GetNextTask: Total pending tasks: %d, Total workers: %d", len(mq.pendingTasks), len(mq.workers)) + // Use read lock for initial checks and search + mq.mutex.RLock() worker, exists := mq.workers[workerID] if !exists { - glog.Infof("DEBUG GetNextTask: Worker %s not found in workers map", workerID) + mq.mutex.RUnlock() + glog.V(2).Infof("Task assignment failed for worker %s: worker not registered", workerID) return nil } - glog.Infof("DEBUG GetNextTask: Worker %s found, CurrentLoad: %d, MaxConcurrent: %d", workerID, worker.CurrentLoad, worker.MaxConcurrent) - // Check if worker has capacity if worker.CurrentLoad >= worker.MaxConcurrent { - glog.Infof("DEBUG GetNextTask: Worker %s at capacity", workerID) + mq.mutex.RUnlock() + glog.V(2).Infof("Task assignment failed for worker %s: at capacity (%d/%d)", workerID, worker.CurrentLoad, worker.MaxConcurrent) return nil } now := time.Now() + var selectedTask *MaintenanceTask + var selectedIndex int = -1 - // Find the next suitable task + // Find the next suitable task (using read lock) for i, task := range mq.pendingTasks { - glog.Infof("DEBUG GetNextTask: Evaluating task[%d] %s (type: %s, volume: %d)", i, task.ID, task.Type, task.VolumeID) - // Check if it's time to execute the task if task.ScheduledAt.After(now) { - glog.Infof("DEBUG GetNextTask: Task %s scheduled for future (%v)", task.ID, task.ScheduledAt) + glog.V(3).Infof("Task %s skipped for worker %s: scheduled for future (%v)", task.ID, workerID, task.ScheduledAt) continue } // Check if worker can handle this task type if !mq.workerCanHandle(task.Type, capabilities) { - glog.Infof("DEBUG GetNextTask: Worker %s cannot handle task type %s (capabilities: %v)", workerID, task.Type, capabilities) + glog.V(3).Infof("Task %s (%s) skipped for worker %s: capability mismatch (worker has: %v)", task.ID, task.Type, workerID, capabilities) continue } - // Check scheduling logic - use simplified system if available, otherwise fallback + // Check if this task type needs a cooldown period if !mq.canScheduleTaskNow(task) { - glog.Infof("DEBUG GetNextTask: Task %s cannot be scheduled now", task.ID) + glog.V(3).Infof("Task %s (%s) skipped for worker %s: scheduling constraints not met", task.ID, task.Type, workerID) continue } - glog.Infof("DEBUG GetNextTask: Assigning task %s to worker %s", task.ID, workerID) + // Found a suitable task + selectedTask = task + selectedIndex = i + break + } + + // Release read lock + mq.mutex.RUnlock() - // Assign task to worker - task.Status = TaskStatusAssigned - task.WorkerID = workerID - startTime := now - task.StartedAt = &startTime + // If no task found, return nil + if selectedTask == nil { + glog.V(2).Infof("No suitable tasks available for worker %s (checked %d pending tasks)", workerID, len(mq.pendingTasks)) + return nil + } - // Remove from pending tasks - mq.pendingTasks = append(mq.pendingTasks[:i], mq.pendingTasks[i+1:]...) + // Now acquire write lock to actually assign the task + mq.mutex.Lock() + defer mq.mutex.Unlock() - // Update worker - worker.CurrentTask = task - worker.CurrentLoad++ - worker.Status = "busy" + // Re-check that the task is still available (it might have been assigned to another worker) + if selectedIndex >= len(mq.pendingTasks) || mq.pendingTasks[selectedIndex].ID != selectedTask.ID { + glog.V(2).Infof("Task %s no longer available for worker %s: assigned to another worker", selectedTask.ID, workerID) + return nil + } + + // Assign the task + selectedTask.Status = TaskStatusAssigned + selectedTask.WorkerID = workerID + selectedTask.StartedAt = &now - glog.Infof("DEBUG GetNextTask: Successfully assigned task %s to worker %s", task.ID, workerID) - return task + // Remove from pending tasks + mq.pendingTasks = append(mq.pendingTasks[:selectedIndex], mq.pendingTasks[selectedIndex+1:]...) + + // Update worker load + if worker, exists := mq.workers[workerID]; exists { + worker.CurrentLoad++ } - glog.Infof("DEBUG GetNextTask: No suitable tasks found for worker %s", workerID) - return nil + glog.Infof("Task assigned: %s (%s) → worker %s (volume %d, server %s)", + selectedTask.ID, selectedTask.Type, workerID, selectedTask.VolumeID, selectedTask.Server) + + return selectedTask } // CompleteTask marks a task as completed @@ -165,12 +189,19 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) { task, exists := mq.tasks[taskID] if !exists { + glog.Warningf("Attempted to complete non-existent task: %s", taskID) return } completedTime := time.Now() task.CompletedAt = &completedTime + // Calculate task duration + var duration time.Duration + if task.StartedAt != nil { + duration = completedTime.Sub(*task.StartedAt) + } + if error != "" { task.Status = TaskStatusFailed task.Error = error @@ -186,14 +217,17 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) { task.ScheduledAt = time.Now().Add(15 * time.Minute) // Retry delay mq.pendingTasks = append(mq.pendingTasks, task) - glog.V(2).Infof("Retrying task %s (attempt %d/%d)", taskID, task.RetryCount, task.MaxRetries) + glog.Warningf("Task failed, scheduling retry: %s (%s) attempt %d/%d, worker %s, duration %v, error: %s", + taskID, task.Type, task.RetryCount, task.MaxRetries, task.WorkerID, duration, error) } else { - glog.Errorf("Task %s failed permanently after %d retries: %s", taskID, task.MaxRetries, error) + glog.Errorf("Task failed permanently: %s (%s) worker %s, duration %v, after %d retries: %s", + taskID, task.Type, task.WorkerID, duration, task.MaxRetries, error) } } else { task.Status = TaskStatusCompleted task.Progress = 100 - glog.V(2).Infof("Task %s completed successfully", taskID) + glog.Infof("Task completed: %s (%s) worker %s, duration %v, volume %d", + taskID, task.Type, task.WorkerID, duration, task.VolumeID) } // Update worker @@ -214,8 +248,23 @@ func (mq *MaintenanceQueue) UpdateTaskProgress(taskID string, progress float64) defer mq.mutex.RUnlock() if task, exists := mq.tasks[taskID]; exists { + oldProgress := task.Progress task.Progress = progress task.Status = TaskStatusInProgress + + // Log progress at significant milestones or changes + if progress == 0 { + glog.V(1).Infof("Task started: %s (%s) worker %s, volume %d", + taskID, task.Type, task.WorkerID, task.VolumeID) + } else if progress >= 100 { + glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete", + taskID, task.Type, task.WorkerID, progress) + } else if progress-oldProgress >= 25 { // Log every 25% increment + glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete", + taskID, task.Type, task.WorkerID, progress) + } + } else { + glog.V(2).Infof("Progress update for unknown task: %s (%.1f%%)", taskID, progress) } } @@ -224,12 +273,25 @@ func (mq *MaintenanceQueue) RegisterWorker(worker *MaintenanceWorker) { mq.mutex.Lock() defer mq.mutex.Unlock() + isNewWorker := true + if existingWorker, exists := mq.workers[worker.ID]; exists { + isNewWorker = false + glog.Infof("Worker reconnected: %s at %s (capabilities: %v, max concurrent: %d)", + worker.ID, worker.Address, worker.Capabilities, worker.MaxConcurrent) + + // Preserve current load when reconnecting + worker.CurrentLoad = existingWorker.CurrentLoad + } else { + glog.Infof("Worker registered: %s at %s (capabilities: %v, max concurrent: %d)", + worker.ID, worker.Address, worker.Capabilities, worker.MaxConcurrent) + } + worker.LastHeartbeat = time.Now() worker.Status = "active" - worker.CurrentLoad = 0 + if isNewWorker { + worker.CurrentLoad = 0 + } mq.workers[worker.ID] = worker - - glog.V(1).Infof("Registered maintenance worker %s at %s", worker.ID, worker.Address) } // UpdateWorkerHeartbeat updates worker heartbeat @@ -238,7 +300,15 @@ func (mq *MaintenanceQueue) UpdateWorkerHeartbeat(workerID string) { defer mq.mutex.Unlock() if worker, exists := mq.workers[workerID]; exists { + lastSeen := worker.LastHeartbeat worker.LastHeartbeat = time.Now() + + // Log if worker was offline for a while + if time.Since(lastSeen) > 2*time.Minute { + glog.Infof("Worker %s heartbeat resumed after %v", workerID, time.Since(lastSeen)) + } + } else { + glog.V(2).Infof("Heartbeat from unknown worker: %s", workerID) } } diff --git a/weed/worker/client.go b/weed/worker/client.go index a54661d77..7359b44bd 100644 --- a/weed/worker/client.go +++ b/weed/worker/client.go @@ -115,9 +115,16 @@ func (c *GrpcAdminClient) attemptConnection() error { c.stream = stream c.connected = true - // Start stream handlers - go c.handleOutgoing() - go c.handleIncoming() + // Start stream handlers with synchronization + outgoingReady := make(chan struct{}) + incomingReady := make(chan struct{}) + + go c.handleOutgoingWithReady(outgoingReady) + go c.handleIncomingWithReady(incomingReady) + + // Wait for both handlers to be ready + <-outgoingReady + <-incomingReady glog.Infof("Connected to admin server at %s", c.adminAddress) return nil @@ -329,6 +336,15 @@ func (c *GrpcAdminClient) handleOutgoing() { } } +// handleOutgoingWithReady processes outgoing messages and signals when ready +func (c *GrpcAdminClient) handleOutgoingWithReady(ready chan struct{}) { + // Signal that this handler is ready to process messages + close(ready) + + // Now process messages normally + c.handleOutgoing() +} + // handleIncoming processes incoming messages from admin func (c *GrpcAdminClient) handleIncoming() { for { @@ -363,6 +379,15 @@ func (c *GrpcAdminClient) handleIncoming() { } } +// handleIncomingWithReady processes incoming messages and signals when ready +func (c *GrpcAdminClient) handleIncomingWithReady(ready chan struct{}) { + // Signal that this handler is ready to process messages + close(ready) + + // Now process messages normally + c.handleIncoming() +} + // RegisterWorker registers the worker with the admin server func (c *GrpcAdminClient) RegisterWorker(worker *types.Worker) error { if !c.connected {