
clean up logs

worker-execute-ec-tasks
chrislu, 4 months ago
parent commit a3d727034a
  1. weed/admin/dash/worker_grpc_server.go (10 lines changed)
  2. weed/admin/handlers/maintenance_handlers.go (44 lines changed)
  3. weed/admin/maintenance/maintenance_integration.go (21 lines changed)
  4. weed/admin/maintenance/maintenance_manager.go (44 lines changed)
  5. weed/admin/maintenance/maintenance_queue.go (146 lines changed)
  6. weed/worker/client.go (31 lines changed)

weed/admin/dash/worker_grpc_server.go (10 lines changed)

@@ -319,14 +319,20 @@ func (s *WorkerGrpcServer) handleHeartbeat(conn *WorkerConnection, heartbeat *wo
// handleTaskRequest processes task requests from workers
func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *worker_pb.TaskRequest) {
glog.Infof("DEBUG handleTaskRequest: Worker %s requesting tasks with capabilities %v", conn.workerID, conn.capabilities)
if s.adminServer.maintenanceManager == nil {
glog.Infof("DEBUG handleTaskRequest: maintenance manager is nil")
return
}
// Get next task from maintenance manager
task := s.adminServer.maintenanceManager.GetNextTask(conn.workerID, conn.capabilities)
glog.Infof("DEBUG handleTaskRequest: GetNextTask returned task: %v", task != nil)
if task != nil {
glog.Infof("DEBUG handleTaskRequest: Assigning task %s (type: %s) to worker %s", task.ID, task.Type, conn.workerID)
// Send task assignment
assignment := &worker_pb.AdminMessage{
Timestamp: time.Now().Unix(),
@@ -348,10 +354,12 @@ func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *wo
select {
case conn.outgoing <- assignment:
glog.V(2).Infof("Assigned task %s to worker %s", task.ID, conn.workerID)
glog.Infof("DEBUG handleTaskRequest: Successfully assigned task %s to worker %s", task.ID, conn.workerID)
case <-time.After(time.Second):
glog.Warningf("Failed to send task assignment to worker %s", conn.workerID)
}
} else {
glog.Infof("DEBUG handleTaskRequest: No tasks available for worker %s", conn.workerID)
}
}
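
Aside: the task assignment above is pushed onto the worker connection's outgoing channel via select with a one-second timeout, so a stalled connection cannot block the gRPC handler. A minimal runnable sketch of that send-with-timeout pattern (names here are illustrative, not the SeaweedFS types):

package main

import (
	"fmt"
	"time"
)

// trySend attempts to hand a message to a consumer goroutine, giving up
// after the deadline instead of blocking the caller indefinitely.
func trySend(out chan<- string, msg string, timeout time.Duration) bool {
	select {
	case out <- msg:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	out := make(chan string, 1) // buffered, standing in for a per-connection outgoing queue
	if trySend(out, "task-assignment", time.Second) {
		fmt.Println("assigned:", <-out)
	} else {
		fmt.Println("send timed out")
	}
}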

weed/admin/handlers/maintenance_handlers.go (44 lines changed)

@@ -300,32 +300,20 @@ func (h *MaintenanceHandlers) UpdateMaintenanceConfig(c *gin.Context) {
// Helper methods that delegate to AdminServer
func (h *MaintenanceHandlers) getMaintenanceQueueData() (*maintenance.MaintenanceQueueData, error) {
glog.Infof("DEBUG getMaintenanceQueueData: starting data assembly")
tasks, err := h.getMaintenanceTasks()
if err != nil {
glog.Infof("DEBUG getMaintenanceQueueData: error getting tasks: %v", err)
return nil, err
}
glog.Infof("DEBUG getMaintenanceQueueData: got %d tasks", len(tasks))
workers, err := h.getMaintenanceWorkers()
if err != nil {
glog.Infof("DEBUG getMaintenanceQueueData: error getting workers: %v", err)
return nil, err
}
glog.Infof("DEBUG getMaintenanceQueueData: got %d workers", len(workers))
stats, err := h.getMaintenanceQueueStats()
if err != nil {
glog.Infof("DEBUG getMaintenanceQueueData: error getting stats: %v", err)
return nil, err
}
if stats != nil {
glog.Infof("DEBUG getMaintenanceQueueData: got stats {pending: %d, running: %d}", stats.PendingTasks, stats.RunningTasks)
} else {
glog.Infof("DEBUG getMaintenanceQueueData: stats is nil")
}
data := &maintenance.MaintenanceQueueData{
Tasks: tasks,
@@ -334,7 +322,6 @@ func (h *MaintenanceHandlers) getMaintenanceQueueData() (*maintenance.Maintenanc
LastUpdated: time.Now(),
}
glog.Infof("DEBUG getMaintenanceQueueData: assembled data with %d tasks, %d workers", len(data.Tasks), len(data.Workers))
return data, nil
}
@@ -346,37 +333,34 @@ func (h *MaintenanceHandlers) getMaintenanceQueueStats() (*maintenance.QueueStat
func (h *MaintenanceHandlers) getMaintenanceTasks() ([]*maintenance.MaintenanceTask, error) {
// Call the maintenance manager directly to get all tasks
if h.adminServer == nil {
glog.Infof("DEBUG getMaintenanceTasks: adminServer is nil")
return []*maintenance.MaintenanceTask{}, nil
}
if h.adminServer.GetMaintenanceManager() == nil {
glog.Infof("DEBUG getMaintenanceTasks: maintenance manager is nil")
manager := h.adminServer.GetMaintenanceManager()
if manager == nil {
return []*maintenance.MaintenanceTask{}, nil
}
// Get ALL tasks using empty parameters - this should match what the API returns
allTasks := h.adminServer.GetMaintenanceManager().GetTasks("", "", 0)
glog.Infof("DEBUG getMaintenanceTasks: retrieved %d tasks from maintenance manager", len(allTasks))
for i, task := range allTasks {
if task != nil {
glog.Infof("DEBUG getMaintenanceTasks: task[%d] = {id: %s, type: %s, status: %s, volume: %d}",
i, task.ID, task.Type, task.Status, task.VolumeID)
} else {
glog.Infof("DEBUG getMaintenanceTasks: task[%d] is nil", i)
}
}
allTasks := manager.GetTasks("", "", 0)
return allTasks, nil
}
func (h *MaintenanceHandlers) getMaintenanceWorkers() ([]*maintenance.MaintenanceWorker, error) {
// This would integrate with the maintenance system to get real workers
// For now, return mock data
// Get workers from the admin server's maintenance manager
if h.adminServer == nil {
return []*maintenance.MaintenanceWorker{}, nil
}
if h.adminServer.GetMaintenanceManager() == nil {
return []*maintenance.MaintenanceWorker{}, nil
}
// Get workers from the maintenance manager
workers := h.adminServer.GetMaintenanceManager().GetWorkers()
return workers, nil
}
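
The handler helpers above follow a simple nil-guard delegation pattern: if the admin server or its maintenance manager is not wired up yet, return an empty slice rather than nil or an error. A small illustrative sketch with stand-in types (not the actual SeaweedFS types):

package main

import "fmt"

type Task struct{ ID string }

type Manager struct{ tasks []*Task }

func (m *Manager) GetTasks() []*Task { return m.tasks }

// tasksOrEmpty guards the delegation chain so callers always receive a
// usable (possibly empty) slice, even before the manager is wired up.
func tasksOrEmpty(m *Manager) []*Task {
	if m == nil {
		return []*Task{}
	}
	return m.GetTasks()
}

func main() {
	fmt.Println(len(tasksOrEmpty(nil)))                                 // 0, no panic
	fmt.Println(len(tasksOrEmpty(&Manager{tasks: []*Task{{ID: "t"}}}))) // 1
}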
func (h *MaintenanceHandlers) getMaintenanceConfig() (*maintenance.MaintenanceConfigData, error) {
// Delegate to AdminServer's real persistence method
return h.adminServer.GetMaintenanceConfigData()

weed/admin/maintenance/maintenance_integration.go (21 lines changed)

@@ -259,31 +259,44 @@ func (s *MaintenanceIntegration) convertToExistingFormat(result *types.TaskDetec
// CanScheduleWithTaskSchedulers determines if a task can be scheduled using task schedulers with dynamic type conversion
func (s *MaintenanceIntegration) CanScheduleWithTaskSchedulers(task *MaintenanceTask, runningTasks []*MaintenanceTask, availableWorkers []*MaintenanceWorker) bool {
glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Checking task %s (type: %s)", task.ID, task.Type)
// Convert existing types to task types using mapping
taskType, exists := s.revTaskTypeMap[task.Type]
if !exists {
glog.V(2).Infof("Unknown task type %s for scheduling, falling back to existing logic", task.Type)
glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Unknown task type %s for scheduling, falling back to existing logic", task.Type)
return false // Fallback to existing logic for unknown types
}
glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Mapped task type %s to %s", task.Type, taskType)
// Convert task objects
taskObject := s.convertTaskToTaskSystem(task)
if taskObject == nil {
glog.V(2).Infof("Failed to convert task %s for scheduling", task.ID)
glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Failed to convert task %s for scheduling", task.ID)
return false
}
glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Successfully converted task %s", task.ID)
runningTaskObjects := s.convertTasksToTaskSystem(runningTasks)
workerObjects := s.convertWorkersToTaskSystem(availableWorkers)
glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Converted %d running tasks and %d workers", len(runningTaskObjects), len(workerObjects))
// Get the appropriate scheduler
scheduler := s.taskRegistry.GetScheduler(taskType)
if scheduler == nil {
glog.V(2).Infof("No scheduler found for task type %s", taskType)
glog.Infof("DEBUG CanScheduleWithTaskSchedulers: No scheduler found for task type %s", taskType)
return false
}
return scheduler.CanScheduleNow(taskObject, runningTaskObjects, workerObjects)
glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Found scheduler for task type %s", taskType)
canSchedule := scheduler.CanScheduleNow(taskObject, runningTaskObjects, workerObjects)
glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Scheduler decision for task %s: %v", task.ID, canSchedule)
return canSchedule
}
// convertTaskToTaskSystem converts existing task to task system format using dynamic mapping
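
CanScheduleWithTaskSchedulers maps the legacy task type into the task-system type, looks up a scheduler in the registry, and falls back to the existing logic when either step fails. A compact sketch of that lookup-with-fallback shape (hypothetical types and names, not the real registry API):

package main

import "fmt"

type scheduler interface{ canScheduleNow() bool }

type alwaysYes struct{}

func (alwaysYes) canScheduleNow() bool { return true }

var registry = map[string]scheduler{"erasure_coding": alwaysYes{}}

// canSchedule returns (decision, ok); ok=false means "fall back to legacy logic".
func canSchedule(taskType string) (bool, bool) {
	s, found := registry[taskType]
	if !found {
		return false, false // unknown type: let the caller fall back
	}
	return s.canScheduleNow(), true
}

func main() {
	fmt.Println(canSchedule("erasure_coding")) // true true
	fmt.Println(canSchedule("unknown"))        // false false
}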

weed/admin/maintenance/maintenance_manager.go (44 lines changed)

@@ -23,6 +23,7 @@ type MaintenanceManager struct {
lastErrorTime time.Time
backoffDelay time.Duration
mutex sync.RWMutex
scanInProgress bool
}
// NewMaintenanceManager creates a new maintenance manager
@@ -170,25 +171,48 @@ func (mm *MaintenanceManager) cleanupLoop() {
// performScan executes a maintenance scan with error handling and backoff
func (mm *MaintenanceManager) performScan() {
defer func() {
// Always reset scan in progress flag when done
mm.mutex.Lock()
defer mm.mutex.Unlock()
mm.scanInProgress = false
mm.mutex.Unlock()
}()
glog.V(2).Infof("Starting maintenance scan")
glog.Infof("Starting maintenance scan...")
results, err := mm.scanner.ScanForMaintenanceTasks()
if err != nil {
mm.mutex.Lock()
mm.handleScanError(err)
mm.mutex.Unlock()
glog.Warningf("Maintenance scan failed: %v", err)
return
}
// Scan succeeded, reset error tracking
// Scan succeeded, reset error tracking and add tasks
mm.mutex.Lock()
mm.resetErrorTracking()
if len(results) > 0 {
// Count tasks by type for logging
taskCounts := make(map[MaintenanceTaskType]int)
for _, result := range results {
taskCounts[result.TaskType]++
}
// Release the mutex before calling AddTasksFromResults to avoid holding
// the manager mutex while trying to acquire the queue mutex
mm.mutex.Unlock()
mm.queue.AddTasksFromResults(results)
glog.V(1).Infof("Maintenance scan completed: added %d tasks", len(results))
// Log detailed scan results
glog.Infof("Maintenance scan completed: found %d tasks", len(results))
for taskType, count := range taskCounts {
glog.Infof(" - %s: %d tasks", taskType, count)
}
} else {
glog.V(2).Infof("Maintenance scan completed: no tasks needed")
mm.mutex.Unlock()
glog.Infof("Maintenance scan completed: no maintenance tasks needed")
}
}
@@ -334,6 +358,16 @@ func (mm *MaintenanceManager) TriggerScan() error {
return fmt.Errorf("maintenance manager is not running")
}
// Prevent multiple concurrent scans
mm.mutex.Lock()
if mm.scanInProgress {
mm.mutex.Unlock()
glog.V(1).Infof("Scan already in progress, ignoring trigger request")
return nil
}
mm.scanInProgress = true
mm.mutex.Unlock()
go mm.performScan()
return nil
}
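
The scanInProgress flag introduced here is a mutex-protected single-flight guard: a trigger is dropped when a scan is already running, and the flag is always cleared in a deferred block; the scan path also releases the manager mutex before calling into the queue, so the manager lock is never held while acquiring the queue lock. A condensed, runnable sketch of the guard (hypothetical names):

package main

import (
	"fmt"
	"sync"
	"time"
)

type scanner struct {
	mu             sync.Mutex
	scanInProgress bool
}

// trigger starts scan in a goroutine unless one is already running.
func (s *scanner) trigger(scan func()) bool {
	s.mu.Lock()
	if s.scanInProgress {
		s.mu.Unlock()
		return false // already running; ignore this trigger
	}
	s.scanInProgress = true
	s.mu.Unlock()

	go func() {
		defer func() {
			// Always clear the flag, even if scan panics.
			s.mu.Lock()
			s.scanInProgress = false
			s.mu.Unlock()
		}()
		scan()
	}()
	return true
}

func main() {
	s := &scanner{}
	fmt.Println(s.trigger(func() { time.Sleep(100 * time.Millisecond) })) // true
	fmt.Println(s.trigger(func() {}))                                     // false: first scan still running
	time.Sleep(200 * time.Millisecond)
}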

weed/admin/maintenance/maintenance_queue.go (146 lines changed)

@@ -33,7 +33,8 @@ func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) {
// Check for duplicate tasks (same type + volume + not completed)
if mq.hasDuplicateTask(task) {
glog.V(2).Infof("Skipping duplicate task: %s for volume %d (already exists)", task.Type, task.VolumeID)
glog.V(1).Infof("Task skipped (duplicate): %s for volume %d on %s (already queued or running)",
task.Type, task.VolumeID, task.Server)
return
}
@@ -53,7 +54,13 @@ func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) {
return mq.pendingTasks[i].ScheduledAt.Before(mq.pendingTasks[j].ScheduledAt)
})
glog.V(2).Infof("Added maintenance task %s: %s for volume %d", task.ID, task.Type, task.VolumeID)
scheduleInfo := ""
if !task.ScheduledAt.IsZero() && time.Until(task.ScheduledAt) > time.Minute {
scheduleInfo = fmt.Sprintf(", scheduled for %v", task.ScheduledAt.Format("15:04:05"))
}
glog.Infof("Task queued: %s (%s) volume %d on %s, priority %s%s, reason: %s",
task.ID, task.Type, task.VolumeID, task.Server, task.Priority, scheduleInfo, task.Reason)
}
// hasDuplicateTask checks if a similar task already exists (same type, volume, and not completed)
@@ -90,72 +97,89 @@ func (mq *MaintenanceQueue) AddTasksFromResults(results []*TaskDetectionResult)
// GetNextTask returns the next available task for a worker
func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []MaintenanceTaskType) *MaintenanceTask {
mq.mutex.Lock()
defer mq.mutex.Unlock()
glog.Infof("DEBUG GetNextTask: Worker %s requesting task with capabilities %v", workerID, capabilities)
glog.Infof("DEBUG GetNextTask: Total pending tasks: %d, Total workers: %d", len(mq.pendingTasks), len(mq.workers))
// Use read lock for initial checks and search
mq.mutex.RLock()
worker, exists := mq.workers[workerID]
if !exists {
glog.Infof("DEBUG GetNextTask: Worker %s not found in workers map", workerID)
mq.mutex.RUnlock()
glog.V(2).Infof("Task assignment failed for worker %s: worker not registered", workerID)
return nil
}
glog.Infof("DEBUG GetNextTask: Worker %s found, CurrentLoad: %d, MaxConcurrent: %d", workerID, worker.CurrentLoad, worker.MaxConcurrent)
// Check if worker has capacity
if worker.CurrentLoad >= worker.MaxConcurrent {
glog.Infof("DEBUG GetNextTask: Worker %s at capacity", workerID)
mq.mutex.RUnlock()
glog.V(2).Infof("Task assignment failed for worker %s: at capacity (%d/%d)", workerID, worker.CurrentLoad, worker.MaxConcurrent)
return nil
}
now := time.Now()
var selectedTask *MaintenanceTask
var selectedIndex int = -1
// Find the next suitable task
// Find the next suitable task (using read lock)
for i, task := range mq.pendingTasks {
glog.Infof("DEBUG GetNextTask: Evaluating task[%d] %s (type: %s, volume: %d)", i, task.ID, task.Type, task.VolumeID)
// Check if it's time to execute the task
if task.ScheduledAt.After(now) {
glog.Infof("DEBUG GetNextTask: Task %s scheduled for future (%v)", task.ID, task.ScheduledAt)
glog.V(3).Infof("Task %s skipped for worker %s: scheduled for future (%v)", task.ID, workerID, task.ScheduledAt)
continue
}
// Check if worker can handle this task type
if !mq.workerCanHandle(task.Type, capabilities) {
glog.Infof("DEBUG GetNextTask: Worker %s cannot handle task type %s (capabilities: %v)", workerID, task.Type, capabilities)
glog.V(3).Infof("Task %s (%s) skipped for worker %s: capability mismatch (worker has: %v)", task.ID, task.Type, workerID, capabilities)
continue
}
// Check scheduling logic - use simplified system if available, otherwise fallback
// Check if this task type needs a cooldown period
if !mq.canScheduleTaskNow(task) {
glog.Infof("DEBUG GetNextTask: Task %s cannot be scheduled now", task.ID)
glog.V(3).Infof("Task %s (%s) skipped for worker %s: scheduling constraints not met", task.ID, task.Type, workerID)
continue
}
glog.Infof("DEBUG GetNextTask: Assigning task %s to worker %s", task.ID, workerID)
// Found a suitable task
selectedTask = task
selectedIndex = i
break
}
// Release read lock
mq.mutex.RUnlock()
// If no task found, return nil
if selectedTask == nil {
glog.V(2).Infof("No suitable tasks available for worker %s (checked %d pending tasks)", workerID, len(mq.pendingTasks))
return nil
}
// Now acquire write lock to actually assign the task
mq.mutex.Lock()
defer mq.mutex.Unlock()
// Re-check that the task is still available (it might have been assigned to another worker)
if selectedIndex >= len(mq.pendingTasks) || mq.pendingTasks[selectedIndex].ID != selectedTask.ID {
glog.V(2).Infof("Task %s no longer available for worker %s: assigned to another worker", selectedTask.ID, workerID)
return nil
}
// Assign task to worker
task.Status = TaskStatusAssigned
task.WorkerID = workerID
startTime := now
task.StartedAt = &startTime
// Assign the task
selectedTask.Status = TaskStatusAssigned
selectedTask.WorkerID = workerID
selectedTask.StartedAt = &now
// Remove from pending tasks
mq.pendingTasks = append(mq.pendingTasks[:i], mq.pendingTasks[i+1:]...)
mq.pendingTasks = append(mq.pendingTasks[:selectedIndex], mq.pendingTasks[selectedIndex+1:]...)
// Update worker
worker.CurrentTask = task
// Update worker load
if worker, exists := mq.workers[workerID]; exists {
worker.CurrentLoad++
worker.Status = "busy"
glog.Infof("DEBUG GetNextTask: Successfully assigned task %s to worker %s", task.ID, workerID)
return task
}
glog.Infof("DEBUG GetNextTask: No suitable tasks found for worker %s", workerID)
return nil
glog.Infof("Task assigned: %s (%s) → worker %s (volume %d, server %s)",
selectedTask.ID, selectedTask.Type, workerID, selectedTask.VolumeID, selectedTask.Server)
return selectedTask
}
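
GetNextTask now searches for a candidate under the read lock and only takes the write lock to perform the assignment, re-checking that the selected task is still in place because another worker may have claimed it between the two lock acquisitions. A self-contained sketch of that read-then-write pattern with simplified types (illustrative names only):

package main

import (
	"fmt"
	"sync"
)

type queue struct {
	mu      sync.RWMutex
	pending []string
}

// takeFirst removes and returns the first pending item, searching under the
// read lock and re-validating under the write lock before mutating.
func (q *queue) takeFirst() (string, bool) {
	q.mu.RLock()
	if len(q.pending) == 0 {
		q.mu.RUnlock()
		return "", false
	}
	candidate := q.pending[0]
	q.mu.RUnlock()

	q.mu.Lock()
	defer q.mu.Unlock()
	// Re-check: another goroutine may have taken the item in between.
	if len(q.pending) == 0 || q.pending[0] != candidate {
		return "", false
	}
	q.pending = q.pending[1:]
	return candidate, true
}

func main() {
	q := &queue{pending: []string{"task-a", "task-b"}}
	fmt.Println(q.takeFirst()) // task-a true
	fmt.Println(q.takeFirst()) // task-b true
	fmt.Println(q.takeFirst()) // "" false
}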
// CompleteTask marks a task as completed
@@ -165,12 +189,19 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
task, exists := mq.tasks[taskID]
if !exists {
glog.Warningf("Attempted to complete non-existent task: %s", taskID)
return
}
completedTime := time.Now()
task.CompletedAt = &completedTime
// Calculate task duration
var duration time.Duration
if task.StartedAt != nil {
duration = completedTime.Sub(*task.StartedAt)
}
if error != "" {
task.Status = TaskStatusFailed
task.Error = error
@@ -186,14 +217,17 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
task.ScheduledAt = time.Now().Add(15 * time.Minute) // Retry delay
mq.pendingTasks = append(mq.pendingTasks, task)
glog.V(2).Infof("Retrying task %s (attempt %d/%d)", taskID, task.RetryCount, task.MaxRetries)
glog.Warningf("Task failed, scheduling retry: %s (%s) attempt %d/%d, worker %s, duration %v, error: %s",
taskID, task.Type, task.RetryCount, task.MaxRetries, task.WorkerID, duration, error)
} else {
glog.Errorf("Task %s failed permanently after %d retries: %s", taskID, task.MaxRetries, error)
glog.Errorf("Task failed permanently: %s (%s) worker %s, duration %v, after %d retries: %s",
taskID, task.Type, task.WorkerID, duration, task.MaxRetries, error)
}
} else {
task.Status = TaskStatusCompleted
task.Progress = 100
glog.V(2).Infof("Task %s completed successfully", taskID)
glog.Infof("Task completed: %s (%s) worker %s, duration %v, volume %d",
taskID, task.Type, task.WorkerID, duration, task.VolumeID)
}
// Update worker
@@ -214,8 +248,23 @@ func (mq *MaintenanceQueue) UpdateTaskProgress(taskID string, progress float64)
defer mq.mutex.RUnlock()
if task, exists := mq.tasks[taskID]; exists {
oldProgress := task.Progress
task.Progress = progress
task.Status = TaskStatusInProgress
// Log progress at significant milestones or changes
if progress == 0 {
glog.V(1).Infof("Task started: %s (%s) worker %s, volume %d",
taskID, task.Type, task.WorkerID, task.VolumeID)
} else if progress >= 100 {
glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete",
taskID, task.Type, task.WorkerID, progress)
} else if progress-oldProgress >= 25 { // Log every 25% increment
glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete",
taskID, task.Type, task.WorkerID, progress)
}
} else {
glog.V(2).Infof("Progress update for unknown task: %s (%.1f%%)", taskID, progress)
}
}
@@ -224,12 +273,25 @@ func (mq *MaintenanceQueue) RegisterWorker(worker *MaintenanceWorker) {
mq.mutex.Lock()
defer mq.mutex.Unlock()
isNewWorker := true
if existingWorker, exists := mq.workers[worker.ID]; exists {
isNewWorker = false
glog.Infof("Worker reconnected: %s at %s (capabilities: %v, max concurrent: %d)",
worker.ID, worker.Address, worker.Capabilities, worker.MaxConcurrent)
// Preserve current load when reconnecting
worker.CurrentLoad = existingWorker.CurrentLoad
} else {
glog.Infof("Worker registered: %s at %s (capabilities: %v, max concurrent: %d)",
worker.ID, worker.Address, worker.Capabilities, worker.MaxConcurrent)
}
worker.LastHeartbeat = time.Now()
worker.Status = "active"
if isNewWorker {
worker.CurrentLoad = 0
}
mq.workers[worker.ID] = worker
glog.V(1).Infof("Registered maintenance worker %s at %s", worker.ID, worker.Address)
}
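
RegisterWorker now distinguishes a reconnecting worker from a genuinely new one, so a worker's CurrentLoad is preserved across reconnects instead of being reset to zero while tasks are still in flight. A minimal sketch of that idea with stand-in types (hypothetical names):

package main

import "fmt"

type worker struct {
	id   string
	load int
}

type registry struct {
	workers map[string]*worker
}

// register keeps the previous load for a reconnecting worker and zeroes it
// only for a genuinely new one.
func (r *registry) register(w *worker) {
	if existing, ok := r.workers[w.id]; ok {
		w.load = existing.load // reconnect: preserve in-flight load
	} else {
		w.load = 0 // new worker
	}
	r.workers[w.id] = w
}

func main() {
	r := &registry{workers: map[string]*worker{}}
	r.register(&worker{id: "w1"})
	r.workers["w1"].load = 2 // two tasks in flight
	r.register(&worker{id: "w1"})
	fmt.Println(r.workers["w1"].load) // 2, not reset
}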
// UpdateWorkerHeartbeat updates worker heartbeat
@@ -238,7 +300,15 @@ func (mq *MaintenanceQueue) UpdateWorkerHeartbeat(workerID string) {
defer mq.mutex.Unlock()
if worker, exists := mq.workers[workerID]; exists {
lastSeen := worker.LastHeartbeat
worker.LastHeartbeat = time.Now()
// Log if worker was offline for a while
if time.Since(lastSeen) > 2*time.Minute {
glog.Infof("Worker %s heartbeat resumed after %v", workerID, time.Since(lastSeen))
}
} else {
glog.V(2).Infof("Heartbeat from unknown worker: %s", workerID)
}
}

weed/worker/client.go (31 lines changed)

@@ -115,9 +115,16 @@ func (c *GrpcAdminClient) attemptConnection() error {
c.stream = stream
c.connected = true
// Start stream handlers
go c.handleOutgoing()
go c.handleIncoming()
// Start stream handlers with synchronization
outgoingReady := make(chan struct{})
incomingReady := make(chan struct{})
go c.handleOutgoingWithReady(outgoingReady)
go c.handleIncomingWithReady(incomingReady)
// Wait for both handlers to be ready
<-outgoingReady
<-incomingReady
glog.Infof("Connected to admin server at %s", c.adminAddress)
return nil
@@ -329,6 +336,15 @@ func (c *GrpcAdminClient) handleOutgoing() {
}
}
// handleOutgoingWithReady processes outgoing messages and signals when ready
func (c *GrpcAdminClient) handleOutgoingWithReady(ready chan struct{}) {
// Signal that this handler is ready to process messages
close(ready)
// Now process messages normally
c.handleOutgoing()
}
// handleIncoming processes incoming messages from admin
func (c *GrpcAdminClient) handleIncoming() {
for {
@@ -363,6 +379,15 @@ func (c *GrpcAdminClient) handleIncoming() {
}
}
// handleIncomingWithReady processes incoming messages and signals when ready
func (c *GrpcAdminClient) handleIncomingWithReady(ready chan struct{}) {
// Signal that this handler is ready to process messages
close(ready)
// Now process messages normally
c.handleIncoming()
}
// RegisterWorker registers the worker with the admin server
func (c *GrpcAdminClient) RegisterWorker(worker *types.Worker) error {
if !c.connected {

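
The client change makes attemptConnection wait for both stream handler goroutines to signal readiness before returning. The handshake is a pair of channels, each closed by its handler goroutine once it has started; a small runnable sketch of the same shape (illustrative names, not the GrpcAdminClient API):

package main

import "fmt"

func startHandler(ready chan<- struct{}, loop func()) {
	go func() {
		close(ready) // signal that this handler goroutine is running
		loop()
	}()
}

func main() {
	outgoingReady := make(chan struct{})
	incomingReady := make(chan struct{})

	startHandler(outgoingReady, func() { /* send loop would run here */ })
	startHandler(incomingReady, func() { /* receive loop would run here */ })

	// Block until both handlers have started.
	<-outgoingReady
	<-incomingReady
	fmt.Println("both stream handlers are running")
}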