diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go
index 376f3edc7..3f135ee1b 100644
--- a/weed/admin/dash/admin_server.go
+++ b/weed/admin/dash/admin_server.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"fmt"
 	"net/http"
+	"strconv"
 	"time"
 
 	"github.com/gin-gonic/gin"
@@ -878,6 +879,46 @@ func (as *AdminServer) GetMaintenanceTask(c *gin.Context) {
 	c.JSON(http.StatusOK, task)
 }
 
+// GetMaintenanceTaskDetailAPI returns detailed task information via API
+func (as *AdminServer) GetMaintenanceTaskDetailAPI(c *gin.Context) {
+	taskID := c.Param("id")
+	taskDetail, err := as.GetMaintenanceTaskDetail(taskID)
+	if err != nil {
+		c.JSON(http.StatusNotFound, gin.H{"error": "Task detail not found", "details": err.Error()})
+		return
+	}
+
+	c.JSON(http.StatusOK, taskDetail)
+}
+
+// ShowMaintenanceTaskDetail renders the task detail page
+func (as *AdminServer) ShowMaintenanceTaskDetail(c *gin.Context) {
+	username := c.GetString("username")
+	if username == "" {
+		username = "admin" // Default fallback
+	}
+
+	taskID := c.Param("id")
+	taskDetail, err := as.GetMaintenanceTaskDetail(taskID)
+	if err != nil {
+		c.HTML(http.StatusNotFound, "error.html", gin.H{
+			"error":   "Task not found",
+			"details": err.Error(),
+		})
+		return
+	}
+
+	// Prepare data for template
+	data := gin.H{
+		"username":   username,
+		"task":       taskDetail.Task,
+		"taskDetail": taskDetail,
+		"title":      fmt.Sprintf("Task Detail - %s", taskID),
+	}
+
+	c.HTML(http.StatusOK, "task_detail.html", data)
+}
+
 // CancelMaintenanceTask cancels a pending maintenance task
 func (as *AdminServer) CancelMaintenanceTask(c *gin.Context) {
 	taskID := c.Param("id")
@@ -1041,27 +1082,65 @@ func (as *AdminServer) getMaintenanceQueueStats() (*maintenance.QueueStats, erro
 // getMaintenanceTasks returns all maintenance tasks
 func (as *AdminServer) getMaintenanceTasks() ([]*maintenance.MaintenanceTask, error) {
 	if as.maintenanceManager == nil {
-		return []*MaintenanceTask{}, nil
+		return []*maintenance.MaintenanceTask{}, nil
+	}
+
+	// Collect all tasks from memory across all statuses
+	allTasks := []*maintenance.MaintenanceTask{}
+	statuses := []maintenance.MaintenanceTaskStatus{
+		maintenance.TaskStatusPending,
+		maintenance.TaskStatusAssigned,
+		maintenance.TaskStatusInProgress,
+		maintenance.TaskStatusCompleted,
+		maintenance.TaskStatusFailed,
+		maintenance.TaskStatusCancelled,
+	}
+
+	for _, status := range statuses {
+		tasks := as.maintenanceManager.GetTasks(status, "", 0)
+		allTasks = append(allTasks, tasks...)
+ } + + // Also load any persisted tasks that might not be in memory + if as.configPersistence != nil { + persistedTasks, err := as.configPersistence.LoadAllTaskStates() + if err == nil { + // Add any persisted tasks not already in memory + for _, persistedTask := range persistedTasks { + found := false + for _, memoryTask := range allTasks { + if memoryTask.ID == persistedTask.ID { + found = true + break + } + } + if !found { + allTasks = append(allTasks, persistedTask) + } + } + } } - return as.maintenanceManager.GetTasks(maintenance.TaskStatusPending, "", 0), nil + + return allTasks, nil } // getMaintenanceTask returns a specific maintenance task -func (as *AdminServer) getMaintenanceTask(taskID string) (*MaintenanceTask, error) { +func (as *AdminServer) getMaintenanceTask(taskID string) (*maintenance.MaintenanceTask, error) { if as.maintenanceManager == nil { return nil, fmt.Errorf("maintenance manager not initialized") } // Search for the task across all statuses since we don't know which status it has - statuses := []MaintenanceTaskStatus{ - TaskStatusPending, - TaskStatusAssigned, - TaskStatusInProgress, - TaskStatusCompleted, - TaskStatusFailed, - TaskStatusCancelled, + statuses := []maintenance.MaintenanceTaskStatus{ + maintenance.TaskStatusPending, + maintenance.TaskStatusAssigned, + maintenance.TaskStatusInProgress, + maintenance.TaskStatusCompleted, + maintenance.TaskStatusFailed, + maintenance.TaskStatusCancelled, } + // First, search for the task in memory across all statuses for _, status := range statuses { tasks := as.maintenanceManager.GetTasks(status, "", 0) // Get all tasks with this status for _, task := range tasks { @@ -1071,9 +1150,133 @@ func (as *AdminServer) getMaintenanceTask(taskID string) (*MaintenanceTask, erro } } + // If not found in memory, try to load from persistent storage + if as.configPersistence != nil { + task, err := as.configPersistence.LoadTaskState(taskID) + if err == nil { + glog.V(2).Infof("Loaded task %s from persistent storage", taskID) + return task, nil + } + glog.V(2).Infof("Task %s not found in persistent storage: %v", taskID, err) + } + return nil, fmt.Errorf("task %s not found", taskID) } +// GetMaintenanceTaskDetail returns comprehensive task details including logs and assignment history +func (as *AdminServer) GetMaintenanceTaskDetail(taskID string) (*maintenance.TaskDetailData, error) { + // Get basic task information + task, err := as.getMaintenanceTask(taskID) + if err != nil { + return nil, err + } + + // Create task detail structure from the loaded task + taskDetail := &maintenance.TaskDetailData{ + Task: task, + AssignmentHistory: task.AssignmentHistory, // Use assignment history from persisted task + ExecutionLogs: []*maintenance.TaskExecutionLog{}, + RelatedTasks: []*maintenance.MaintenanceTask{}, + LastUpdated: time.Now(), + } + + if taskDetail.AssignmentHistory == nil { + taskDetail.AssignmentHistory = []*maintenance.TaskAssignmentRecord{} + } + + // Get worker information if task is assigned + if task.WorkerID != "" { + workers := as.maintenanceManager.GetWorkers() + for _, worker := range workers { + if worker.ID == task.WorkerID { + taskDetail.WorkerInfo = worker + break + } + } + } + + // Get execution logs from worker if task is active/completed and worker is connected + if task.Status == maintenance.TaskStatusInProgress || task.Status == maintenance.TaskStatusCompleted { + if as.workerGrpcServer != nil && task.WorkerID != "" { + workerLogs, err := as.workerGrpcServer.RequestTaskLogs(task.WorkerID, taskID, 100, "") + 
if err == nil && len(workerLogs) > 0 { + // Convert worker logs to maintenance logs + for _, workerLog := range workerLogs { + maintenanceLog := &maintenance.TaskExecutionLog{ + Timestamp: time.Unix(workerLog.Timestamp, 0), + Level: workerLog.Level, + Message: workerLog.Message, + Source: "worker", + TaskID: taskID, + WorkerID: task.WorkerID, + } + // carry structured fields if present + if len(workerLog.Fields) > 0 { + maintenanceLog.Fields = make(map[string]string, len(workerLog.Fields)) + for k, v := range workerLog.Fields { + maintenanceLog.Fields[k] = v + } + } + // carry optional progress/status + if workerLog.Progress != 0 { + p := float64(workerLog.Progress) + maintenanceLog.Progress = &p + } + if workerLog.Status != "" { + maintenanceLog.Status = workerLog.Status + } + taskDetail.ExecutionLogs = append(taskDetail.ExecutionLogs, maintenanceLog) + } + } else if err != nil { + // Add a diagnostic log entry when worker logs cannot be retrieved + diagnosticLog := &maintenance.TaskExecutionLog{ + Timestamp: time.Now(), + Level: "WARNING", + Message: fmt.Sprintf("Failed to retrieve worker logs: %v", err), + Source: "admin", + TaskID: taskID, + WorkerID: task.WorkerID, + } + taskDetail.ExecutionLogs = append(taskDetail.ExecutionLogs, diagnosticLog) + glog.V(1).Infof("Failed to get worker logs for task %s from worker %s: %v", taskID, task.WorkerID, err) + } + } else { + // Add diagnostic information when worker is not available + reason := "worker gRPC server not available" + if task.WorkerID == "" { + reason = "no worker assigned to task" + } + diagnosticLog := &maintenance.TaskExecutionLog{ + Timestamp: time.Now(), + Level: "INFO", + Message: fmt.Sprintf("Worker logs not available: %s", reason), + Source: "admin", + TaskID: taskID, + WorkerID: task.WorkerID, + } + taskDetail.ExecutionLogs = append(taskDetail.ExecutionLogs, diagnosticLog) + } + } + + // Get related tasks (other tasks on same volume/server) + if task.VolumeID != 0 || task.Server != "" { + allTasks := as.maintenanceManager.GetTasks("", "", 50) // Get recent tasks + for _, relatedTask := range allTasks { + if relatedTask.ID != taskID && + (relatedTask.VolumeID == task.VolumeID || relatedTask.Server == task.Server) { + taskDetail.RelatedTasks = append(taskDetail.RelatedTasks, relatedTask) + } + } + } + + // Save updated task detail to disk + if err := as.configPersistence.SaveTaskDetail(taskID, taskDetail); err != nil { + glog.V(1).Infof("Failed to save task detail for %s: %v", taskID, err) + } + + return taskDetail, nil +} + // getMaintenanceWorkers returns all maintenance workers func (as *AdminServer) getMaintenanceWorkers() ([]*maintenance.MaintenanceWorker, error) { if as.maintenanceManager == nil { @@ -1157,6 +1360,34 @@ func (as *AdminServer) getMaintenanceWorkerDetails(workerID string) (*WorkerDeta }, nil } +// GetWorkerLogs fetches logs from a specific worker for a task +func (as *AdminServer) GetWorkerLogs(c *gin.Context) { + workerID := c.Param("id") + taskID := c.Query("taskId") + maxEntriesStr := c.DefaultQuery("maxEntries", "100") + logLevel := c.DefaultQuery("logLevel", "") + + maxEntries := int32(100) + if maxEntriesStr != "" { + if parsed, err := strconv.ParseInt(maxEntriesStr, 10, 32); err == nil { + maxEntries = int32(parsed) + } + } + + if as.workerGrpcServer == nil { + c.JSON(http.StatusServiceUnavailable, gin.H{"error": "Worker gRPC server not available"}) + return + } + + logs, err := as.workerGrpcServer.RequestTaskLogs(workerID, taskID, maxEntries, logLevel) + if err != nil { + 
c.JSON(http.StatusBadGateway, gin.H{"error": fmt.Sprintf("Failed to get logs from worker: %v", err)}) + return + } + + c.JSON(http.StatusOK, gin.H{"worker_id": workerID, "task_id": taskID, "logs": logs, "count": len(logs)}) +} + // getMaintenanceStats returns maintenance statistics func (as *AdminServer) getMaintenanceStats() (*MaintenanceStats, error) { if as.maintenanceManager == nil { @@ -1376,6 +1607,20 @@ func (s *AdminServer) GetWorkerGrpcServer() *WorkerGrpcServer { // InitMaintenanceManager initializes the maintenance manager func (s *AdminServer) InitMaintenanceManager(config *maintenance.MaintenanceConfig) { s.maintenanceManager = maintenance.NewMaintenanceManager(s, config) + + // Set up task persistence if config persistence is available + if s.configPersistence != nil { + queue := s.maintenanceManager.GetQueue() + if queue != nil { + queue.SetPersistence(s.configPersistence) + + // Load tasks from persistence on startup + if err := queue.LoadTasksFromPersistence(); err != nil { + glog.Errorf("Failed to load tasks from persistence: %v", err) + } + } + } + glog.V(1).Infof("Maintenance manager initialized (enabled: %v)", config.Enabled) } diff --git a/weed/admin/dash/config_persistence.go b/weed/admin/dash/config_persistence.go index b6b3074ab..1fe1a9b42 100644 --- a/weed/admin/dash/config_persistence.go +++ b/weed/admin/dash/config_persistence.go @@ -1,11 +1,15 @@ package dash import ( + "encoding/json" "fmt" "os" "path/filepath" + "sort" + "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/admin/maintenance" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" @@ -33,6 +37,12 @@ const ( BalanceTaskConfigJSONFile = "task_balance.json" ReplicationTaskConfigJSONFile = "task_replication.json" + // Task persistence subdirectories and settings + TasksSubdir = "tasks" + TaskDetailsSubdir = "task_details" + TaskLogsSubdir = "task_logs" + MaxCompletedTasks = 10 // Only keep last 10 completed tasks + ConfigDirPermissions = 0755 ConfigFilePermissions = 0644 ) @@ -45,6 +55,35 @@ type ( ReplicationTaskConfig = worker_pb.ReplicationTaskConfig ) +// isValidTaskID validates that a task ID is safe for use in file paths +// This prevents path traversal attacks by ensuring the task ID doesn't contain +// path separators or parent directory references +func isValidTaskID(taskID string) bool { + if taskID == "" { + return false + } + + // Reject task IDs with leading or trailing whitespace + if strings.TrimSpace(taskID) != taskID { + return false + } + + // Check for path traversal patterns + if strings.Contains(taskID, "/") || + strings.Contains(taskID, "\\") || + strings.Contains(taskID, "..") || + strings.Contains(taskID, ":") { + return false + } + + // Additional safety: ensure it's not just dots or empty after trim + if taskID == "." || taskID == ".." 
{ + return false + } + + return true +} + // ConfigPersistence handles saving and loading configuration files type ConfigPersistence struct { dataDir string @@ -688,3 +727,509 @@ func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy { glog.V(1).Infof("Built maintenance policy from separate task configs - %d task policies loaded", len(policy.TaskPolicies)) return policy } + +// SaveTaskDetail saves detailed task information to disk +func (cp *ConfigPersistence) SaveTaskDetail(taskID string, detail *maintenance.TaskDetailData) error { + if cp.dataDir == "" { + return fmt.Errorf("no data directory specified, cannot save task detail") + } + + // Validate task ID to prevent path traversal + if !isValidTaskID(taskID) { + return fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID) + } + + taskDetailDir := filepath.Join(cp.dataDir, TaskDetailsSubdir) + if err := os.MkdirAll(taskDetailDir, ConfigDirPermissions); err != nil { + return fmt.Errorf("failed to create task details directory: %w", err) + } + + // Save task detail as JSON for easy reading and debugging + taskDetailPath := filepath.Join(taskDetailDir, fmt.Sprintf("%s.json", taskID)) + jsonData, err := json.MarshalIndent(detail, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal task detail to JSON: %w", err) + } + + if err := os.WriteFile(taskDetailPath, jsonData, ConfigFilePermissions); err != nil { + return fmt.Errorf("failed to write task detail file: %w", err) + } + + glog.V(2).Infof("Saved task detail for task %s to %s", taskID, taskDetailPath) + return nil +} + +// LoadTaskDetail loads detailed task information from disk +func (cp *ConfigPersistence) LoadTaskDetail(taskID string) (*maintenance.TaskDetailData, error) { + if cp.dataDir == "" { + return nil, fmt.Errorf("no data directory specified, cannot load task detail") + } + + // Validate task ID to prevent path traversal + if !isValidTaskID(taskID) { + return nil, fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID) + } + + taskDetailPath := filepath.Join(cp.dataDir, TaskDetailsSubdir, fmt.Sprintf("%s.json", taskID)) + if _, err := os.Stat(taskDetailPath); os.IsNotExist(err) { + return nil, fmt.Errorf("task detail file not found: %s", taskID) + } + + jsonData, err := os.ReadFile(taskDetailPath) + if err != nil { + return nil, fmt.Errorf("failed to read task detail file: %w", err) + } + + var detail maintenance.TaskDetailData + if err := json.Unmarshal(jsonData, &detail); err != nil { + return nil, fmt.Errorf("failed to unmarshal task detail JSON: %w", err) + } + + glog.V(2).Infof("Loaded task detail for task %s from %s", taskID, taskDetailPath) + return &detail, nil +} + +// SaveTaskExecutionLogs saves execution logs for a task +func (cp *ConfigPersistence) SaveTaskExecutionLogs(taskID string, logs []*maintenance.TaskExecutionLog) error { + if cp.dataDir == "" { + return fmt.Errorf("no data directory specified, cannot save task logs") + } + + // Validate task ID to prevent path traversal + if !isValidTaskID(taskID) { + return fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID) + } + + taskLogsDir := filepath.Join(cp.dataDir, TaskLogsSubdir) + if err := os.MkdirAll(taskLogsDir, ConfigDirPermissions); err != nil { + return fmt.Errorf("failed to create task logs directory: %w", err) + } + + // Save logs as JSON for easy reading + taskLogsPath := filepath.Join(taskLogsDir, fmt.Sprintf("%s.json", taskID)) + logsData := struct { + TaskID string `json:"task_id"` + Logs 
[]*maintenance.TaskExecutionLog `json:"logs"` + }{ + TaskID: taskID, + Logs: logs, + } + jsonData, err := json.MarshalIndent(logsData, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal task logs to JSON: %w", err) + } + + if err := os.WriteFile(taskLogsPath, jsonData, ConfigFilePermissions); err != nil { + return fmt.Errorf("failed to write task logs file: %w", err) + } + + glog.V(2).Infof("Saved %d execution logs for task %s to %s", len(logs), taskID, taskLogsPath) + return nil +} + +// LoadTaskExecutionLogs loads execution logs for a task +func (cp *ConfigPersistence) LoadTaskExecutionLogs(taskID string) ([]*maintenance.TaskExecutionLog, error) { + if cp.dataDir == "" { + return nil, fmt.Errorf("no data directory specified, cannot load task logs") + } + + // Validate task ID to prevent path traversal + if !isValidTaskID(taskID) { + return nil, fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID) + } + + taskLogsPath := filepath.Join(cp.dataDir, TaskLogsSubdir, fmt.Sprintf("%s.json", taskID)) + if _, err := os.Stat(taskLogsPath); os.IsNotExist(err) { + // Return empty slice if logs don't exist yet + return []*maintenance.TaskExecutionLog{}, nil + } + + jsonData, err := os.ReadFile(taskLogsPath) + if err != nil { + return nil, fmt.Errorf("failed to read task logs file: %w", err) + } + + var logsData struct { + TaskID string `json:"task_id"` + Logs []*maintenance.TaskExecutionLog `json:"logs"` + } + if err := json.Unmarshal(jsonData, &logsData); err != nil { + return nil, fmt.Errorf("failed to unmarshal task logs JSON: %w", err) + } + + glog.V(2).Infof("Loaded %d execution logs for task %s from %s", len(logsData.Logs), taskID, taskLogsPath) + return logsData.Logs, nil +} + +// DeleteTaskDetail removes task detail and logs from disk +func (cp *ConfigPersistence) DeleteTaskDetail(taskID string) error { + if cp.dataDir == "" { + return fmt.Errorf("no data directory specified, cannot delete task detail") + } + + // Validate task ID to prevent path traversal + if !isValidTaskID(taskID) { + return fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID) + } + + // Delete task detail file + taskDetailPath := filepath.Join(cp.dataDir, TaskDetailsSubdir, fmt.Sprintf("%s.json", taskID)) + if err := os.Remove(taskDetailPath); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to delete task detail file: %w", err) + } + + // Delete task logs file + taskLogsPath := filepath.Join(cp.dataDir, TaskLogsSubdir, fmt.Sprintf("%s.json", taskID)) + if err := os.Remove(taskLogsPath); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to delete task logs file: %w", err) + } + + glog.V(2).Infof("Deleted task detail and logs for task %s", taskID) + return nil +} + +// ListTaskDetails returns a list of all task IDs that have stored details +func (cp *ConfigPersistence) ListTaskDetails() ([]string, error) { + if cp.dataDir == "" { + return nil, fmt.Errorf("no data directory specified, cannot list task details") + } + + taskDetailDir := filepath.Join(cp.dataDir, TaskDetailsSubdir) + if _, err := os.Stat(taskDetailDir); os.IsNotExist(err) { + return []string{}, nil + } + + entries, err := os.ReadDir(taskDetailDir) + if err != nil { + return nil, fmt.Errorf("failed to read task details directory: %w", err) + } + + var taskIDs []string + for _, entry := range entries { + if !entry.IsDir() && filepath.Ext(entry.Name()) == ".json" { + taskID := entry.Name()[:len(entry.Name())-5] // Remove .json extension + taskIDs = append(taskIDs, taskID) + } 
+ } + + return taskIDs, nil +} + +// CleanupCompletedTasks removes old completed tasks beyond the retention limit +func (cp *ConfigPersistence) CleanupCompletedTasks() error { + if cp.dataDir == "" { + return fmt.Errorf("no data directory specified, cannot cleanup completed tasks") + } + + tasksDir := filepath.Join(cp.dataDir, TasksSubdir) + if _, err := os.Stat(tasksDir); os.IsNotExist(err) { + return nil // No tasks directory, nothing to cleanup + } + + // Load all tasks and find completed/failed ones + allTasks, err := cp.LoadAllTaskStates() + if err != nil { + return fmt.Errorf("failed to load tasks for cleanup: %w", err) + } + + // Filter completed and failed tasks, sort by completion time + var completedTasks []*maintenance.MaintenanceTask + for _, task := range allTasks { + if (task.Status == maintenance.TaskStatusCompleted || task.Status == maintenance.TaskStatusFailed) && task.CompletedAt != nil { + completedTasks = append(completedTasks, task) + } + } + + // Sort by completion time (most recent first) + sort.Slice(completedTasks, func(i, j int) bool { + return completedTasks[i].CompletedAt.After(*completedTasks[j].CompletedAt) + }) + + // Keep only the most recent MaxCompletedTasks, delete the rest + if len(completedTasks) > MaxCompletedTasks { + tasksToDelete := completedTasks[MaxCompletedTasks:] + for _, task := range tasksToDelete { + if err := cp.DeleteTaskState(task.ID); err != nil { + glog.Warningf("Failed to delete old completed task %s: %v", task.ID, err) + } else { + glog.V(2).Infof("Cleaned up old completed task %s (completed: %v)", task.ID, task.CompletedAt) + } + } + glog.V(1).Infof("Cleaned up %d old completed tasks (keeping %d most recent)", len(tasksToDelete), MaxCompletedTasks) + } + + return nil +} + +// SaveTaskState saves a task state to protobuf file +func (cp *ConfigPersistence) SaveTaskState(task *maintenance.MaintenanceTask) error { + if cp.dataDir == "" { + return fmt.Errorf("no data directory specified, cannot save task state") + } + + // Validate task ID to prevent path traversal + if !isValidTaskID(task.ID) { + return fmt.Errorf("invalid task ID: %q contains illegal path characters", task.ID) + } + + tasksDir := filepath.Join(cp.dataDir, TasksSubdir) + if err := os.MkdirAll(tasksDir, ConfigDirPermissions); err != nil { + return fmt.Errorf("failed to create tasks directory: %w", err) + } + + taskFilePath := filepath.Join(tasksDir, fmt.Sprintf("%s.pb", task.ID)) + + // Convert task to protobuf + pbTask := cp.maintenanceTaskToProtobuf(task) + taskStateFile := &worker_pb.TaskStateFile{ + Task: pbTask, + LastUpdated: time.Now().Unix(), + AdminVersion: "unknown", // TODO: add version info + } + + pbData, err := proto.Marshal(taskStateFile) + if err != nil { + return fmt.Errorf("failed to marshal task state protobuf: %w", err) + } + + if err := os.WriteFile(taskFilePath, pbData, ConfigFilePermissions); err != nil { + return fmt.Errorf("failed to write task state file: %w", err) + } + + glog.V(2).Infof("Saved task state for task %s to %s", task.ID, taskFilePath) + return nil +} + +// LoadTaskState loads a task state from protobuf file +func (cp *ConfigPersistence) LoadTaskState(taskID string) (*maintenance.MaintenanceTask, error) { + if cp.dataDir == "" { + return nil, fmt.Errorf("no data directory specified, cannot load task state") + } + + // Validate task ID to prevent path traversal + if !isValidTaskID(taskID) { + return nil, fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID) + } + + taskFilePath := filepath.Join(cp.dataDir, 
TasksSubdir, fmt.Sprintf("%s.pb", taskID)) + if _, err := os.Stat(taskFilePath); os.IsNotExist(err) { + return nil, fmt.Errorf("task state file not found: %s", taskID) + } + + pbData, err := os.ReadFile(taskFilePath) + if err != nil { + return nil, fmt.Errorf("failed to read task state file: %w", err) + } + + var taskStateFile worker_pb.TaskStateFile + if err := proto.Unmarshal(pbData, &taskStateFile); err != nil { + return nil, fmt.Errorf("failed to unmarshal task state protobuf: %w", err) + } + + // Convert protobuf to maintenance task + task := cp.protobufToMaintenanceTask(taskStateFile.Task) + + glog.V(2).Infof("Loaded task state for task %s from %s", taskID, taskFilePath) + return task, nil +} + +// LoadAllTaskStates loads all task states from disk +func (cp *ConfigPersistence) LoadAllTaskStates() ([]*maintenance.MaintenanceTask, error) { + if cp.dataDir == "" { + return []*maintenance.MaintenanceTask{}, nil + } + + tasksDir := filepath.Join(cp.dataDir, TasksSubdir) + if _, err := os.Stat(tasksDir); os.IsNotExist(err) { + return []*maintenance.MaintenanceTask{}, nil + } + + entries, err := os.ReadDir(tasksDir) + if err != nil { + return nil, fmt.Errorf("failed to read tasks directory: %w", err) + } + + var tasks []*maintenance.MaintenanceTask + for _, entry := range entries { + if !entry.IsDir() && filepath.Ext(entry.Name()) == ".pb" { + taskID := entry.Name()[:len(entry.Name())-3] // Remove .pb extension + task, err := cp.LoadTaskState(taskID) + if err != nil { + glog.Warningf("Failed to load task state for %s: %v", taskID, err) + continue + } + tasks = append(tasks, task) + } + } + + glog.V(1).Infof("Loaded %d task states from disk", len(tasks)) + return tasks, nil +} + +// DeleteTaskState removes a task state file from disk +func (cp *ConfigPersistence) DeleteTaskState(taskID string) error { + if cp.dataDir == "" { + return fmt.Errorf("no data directory specified, cannot delete task state") + } + + // Validate task ID to prevent path traversal + if !isValidTaskID(taskID) { + return fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID) + } + + taskFilePath := filepath.Join(cp.dataDir, TasksSubdir, fmt.Sprintf("%s.pb", taskID)) + if err := os.Remove(taskFilePath); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to delete task state file: %w", err) + } + + glog.V(2).Infof("Deleted task state for task %s", taskID) + return nil +} + +// maintenanceTaskToProtobuf converts a MaintenanceTask to protobuf format +func (cp *ConfigPersistence) maintenanceTaskToProtobuf(task *maintenance.MaintenanceTask) *worker_pb.MaintenanceTaskData { + pbTask := &worker_pb.MaintenanceTaskData{ + Id: task.ID, + Type: string(task.Type), + Priority: cp.priorityToString(task.Priority), + Status: string(task.Status), + VolumeId: task.VolumeID, + Server: task.Server, + Collection: task.Collection, + Reason: task.Reason, + CreatedAt: task.CreatedAt.Unix(), + ScheduledAt: task.ScheduledAt.Unix(), + WorkerId: task.WorkerID, + Error: task.Error, + Progress: task.Progress, + RetryCount: int32(task.RetryCount), + MaxRetries: int32(task.MaxRetries), + CreatedBy: task.CreatedBy, + CreationContext: task.CreationContext, + DetailedReason: task.DetailedReason, + Tags: task.Tags, + } + + // Handle optional timestamps + if task.StartedAt != nil { + pbTask.StartedAt = task.StartedAt.Unix() + } + if task.CompletedAt != nil { + pbTask.CompletedAt = task.CompletedAt.Unix() + } + + // Convert assignment history + if task.AssignmentHistory != nil { + for _, record := range task.AssignmentHistory 
{ + pbRecord := &worker_pb.TaskAssignmentRecord{ + WorkerId: record.WorkerID, + WorkerAddress: record.WorkerAddress, + AssignedAt: record.AssignedAt.Unix(), + Reason: record.Reason, + } + if record.UnassignedAt != nil { + pbRecord.UnassignedAt = record.UnassignedAt.Unix() + } + pbTask.AssignmentHistory = append(pbTask.AssignmentHistory, pbRecord) + } + } + + // Convert typed parameters if available + if task.TypedParams != nil { + pbTask.TypedParams = task.TypedParams + } + + return pbTask +} + +// protobufToMaintenanceTask converts protobuf format to MaintenanceTask +func (cp *ConfigPersistence) protobufToMaintenanceTask(pbTask *worker_pb.MaintenanceTaskData) *maintenance.MaintenanceTask { + task := &maintenance.MaintenanceTask{ + ID: pbTask.Id, + Type: maintenance.MaintenanceTaskType(pbTask.Type), + Priority: cp.stringToPriority(pbTask.Priority), + Status: maintenance.MaintenanceTaskStatus(pbTask.Status), + VolumeID: pbTask.VolumeId, + Server: pbTask.Server, + Collection: pbTask.Collection, + Reason: pbTask.Reason, + CreatedAt: time.Unix(pbTask.CreatedAt, 0), + ScheduledAt: time.Unix(pbTask.ScheduledAt, 0), + WorkerID: pbTask.WorkerId, + Error: pbTask.Error, + Progress: pbTask.Progress, + RetryCount: int(pbTask.RetryCount), + MaxRetries: int(pbTask.MaxRetries), + CreatedBy: pbTask.CreatedBy, + CreationContext: pbTask.CreationContext, + DetailedReason: pbTask.DetailedReason, + Tags: pbTask.Tags, + } + + // Handle optional timestamps + if pbTask.StartedAt > 0 { + startTime := time.Unix(pbTask.StartedAt, 0) + task.StartedAt = &startTime + } + if pbTask.CompletedAt > 0 { + completedTime := time.Unix(pbTask.CompletedAt, 0) + task.CompletedAt = &completedTime + } + + // Convert assignment history + if pbTask.AssignmentHistory != nil { + task.AssignmentHistory = make([]*maintenance.TaskAssignmentRecord, 0, len(pbTask.AssignmentHistory)) + for _, pbRecord := range pbTask.AssignmentHistory { + record := &maintenance.TaskAssignmentRecord{ + WorkerID: pbRecord.WorkerId, + WorkerAddress: pbRecord.WorkerAddress, + AssignedAt: time.Unix(pbRecord.AssignedAt, 0), + Reason: pbRecord.Reason, + } + if pbRecord.UnassignedAt > 0 { + unassignedTime := time.Unix(pbRecord.UnassignedAt, 0) + record.UnassignedAt = &unassignedTime + } + task.AssignmentHistory = append(task.AssignmentHistory, record) + } + } + + // Convert typed parameters if available + if pbTask.TypedParams != nil { + task.TypedParams = pbTask.TypedParams + } + + return task +} + +// priorityToString converts MaintenanceTaskPriority to string for protobuf storage +func (cp *ConfigPersistence) priorityToString(priority maintenance.MaintenanceTaskPriority) string { + switch priority { + case maintenance.PriorityLow: + return "low" + case maintenance.PriorityNormal: + return "normal" + case maintenance.PriorityHigh: + return "high" + case maintenance.PriorityCritical: + return "critical" + default: + return "normal" + } +} + +// stringToPriority converts string from protobuf to MaintenanceTaskPriority +func (cp *ConfigPersistence) stringToPriority(priorityStr string) maintenance.MaintenanceTaskPriority { + switch priorityStr { + case "low": + return maintenance.PriorityLow + case "normal": + return maintenance.PriorityNormal + case "high": + return maintenance.PriorityHigh + case "critical": + return maintenance.PriorityCritical + default: + return maintenance.PriorityNormal + } +} diff --git a/weed/admin/dash/ec_shard_management.go b/weed/admin/dash/ec_shard_management.go index 272890cf0..34574ecdb 100644 --- 
a/weed/admin/dash/ec_shard_management.go +++ b/weed/admin/dash/ec_shard_management.go @@ -13,6 +13,17 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" ) +// matchesCollection checks if a volume/EC volume collection matches the filter collection. +// Handles the special case where empty collection ("") represents the "default" collection. +func matchesCollection(volumeCollection, filterCollection string) bool { + // Both empty means default collection matches default filter + if volumeCollection == "" && filterCollection == "" { + return true + } + // Direct string match for named collections + return volumeCollection == filterCollection +} + // GetClusterEcShards retrieves cluster EC shards data with pagination, sorting, and filtering func (s *AdminServer) GetClusterEcShards(page int, pageSize int, sortBy string, sortOrder string, collection string) (*ClusterEcShardsData, error) { // Set defaults @@ -403,7 +414,7 @@ func (s *AdminServer) GetClusterEcVolumes(page int, pageSize int, sortBy string, var ecVolumes []EcVolumeWithShards for _, volume := range volumeData { // Filter by collection if specified - if collection == "" || volume.Collection == collection { + if collection == "" || matchesCollection(volume.Collection, collection) { ecVolumes = append(ecVolumes, *volume) } } diff --git a/weed/admin/dash/volume_management.go b/weed/admin/dash/volume_management.go index 5dabe2674..38b1257a4 100644 --- a/weed/admin/dash/volume_management.go +++ b/weed/admin/dash/volume_management.go @@ -83,13 +83,7 @@ func (s *AdminServer) GetClusterVolumes(page int, pageSize int, sortBy string, s var filteredEcTotalSize int64 for _, volume := range volumes { - // Handle "default" collection filtering for empty collections - volumeCollection := volume.Collection - if volumeCollection == "" { - volumeCollection = "default" - } - - if volumeCollection == collection { + if matchesCollection(volume.Collection, collection) { filteredVolumes = append(filteredVolumes, volume) filteredTotalSize += int64(volume.Size) } @@ -103,13 +97,7 @@ func (s *AdminServer) GetClusterVolumes(page int, pageSize int, sortBy string, s for _, node := range rack.DataNodeInfos { for _, diskInfo := range node.DiskInfos { for _, ecShardInfo := range diskInfo.EcShardInfos { - // Handle "default" collection filtering for empty collections - ecCollection := ecShardInfo.Collection - if ecCollection == "" { - ecCollection = "default" - } - - if ecCollection == collection { + if matchesCollection(ecShardInfo.Collection, collection) { // Add all shard sizes for this EC volume for _, shardSize := range ecShardInfo.ShardSizes { filteredEcTotalSize += shardSize @@ -500,7 +488,7 @@ func (s *AdminServer) GetClusterVolumeServers() (*ClusterVolumeServersData, erro ecInfo.EcIndexBits |= ecShardInfo.EcIndexBits // Collect shard sizes from this disk - shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits) + shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits) shardBits.EachSetIndex(func(shardId erasure_coding.ShardId) { if size, found := erasure_coding.GetShardSize(ecShardInfo, shardId); found { allShardSizes[shardId] = size diff --git a/weed/admin/dash/worker_grpc_server.go b/weed/admin/dash/worker_grpc_server.go index 3b4312235..78ba6d7de 100644 --- a/weed/admin/dash/worker_grpc_server.go +++ b/weed/admin/dash/worker_grpc_server.go @@ -26,6 +26,10 @@ type WorkerGrpcServer struct { connections map[string]*WorkerConnection connMutex sync.RWMutex + // Log request correlation + pendingLogRequests 
map[string]*LogRequestContext + logRequestsMutex sync.RWMutex + // gRPC server grpcServer *grpc.Server listener net.Listener @@ -33,6 +37,14 @@ type WorkerGrpcServer struct { stopChan chan struct{} } +// LogRequestContext tracks pending log requests +type LogRequestContext struct { + TaskID string + WorkerID string + ResponseCh chan *worker_pb.TaskLogResponse + Timeout time.Time +} + // WorkerConnection represents an active worker connection type WorkerConnection struct { workerID string @@ -49,9 +61,10 @@ type WorkerConnection struct { // NewWorkerGrpcServer creates a new gRPC server for worker connections func NewWorkerGrpcServer(adminServer *AdminServer) *WorkerGrpcServer { return &WorkerGrpcServer{ - adminServer: adminServer, - connections: make(map[string]*WorkerConnection), - stopChan: make(chan struct{}), + adminServer: adminServer, + connections: make(map[string]*WorkerConnection), + pendingLogRequests: make(map[string]*LogRequestContext), + stopChan: make(chan struct{}), } } @@ -264,6 +277,9 @@ func (s *WorkerGrpcServer) handleWorkerMessage(conn *WorkerConnection, msg *work case *worker_pb.WorkerMessage_TaskComplete: s.handleTaskCompletion(conn, m.TaskComplete) + case *worker_pb.WorkerMessage_TaskLogResponse: + s.handleTaskLogResponse(conn, m.TaskLogResponse) + case *worker_pb.WorkerMessage_Shutdown: glog.Infof("Worker %s shutting down: %s", workerID, m.Shutdown.Reason) s.unregisterWorker(workerID) @@ -341,8 +357,13 @@ func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *wo // Create basic params if none exist taskParams = &worker_pb.TaskParams{ VolumeId: task.VolumeID, - Server: task.Server, Collection: task.Collection, + Sources: []*worker_pb.TaskSource{ + { + Node: task.Server, + VolumeId: task.VolumeID, + }, + }, } } @@ -396,6 +417,35 @@ func (s *WorkerGrpcServer) handleTaskCompletion(conn *WorkerConnection, completi } } +// handleTaskLogResponse processes task log responses from workers +func (s *WorkerGrpcServer) handleTaskLogResponse(conn *WorkerConnection, response *worker_pb.TaskLogResponse) { + requestKey := fmt.Sprintf("%s:%s", response.WorkerId, response.TaskId) + + s.logRequestsMutex.RLock() + requestContext, exists := s.pendingLogRequests[requestKey] + s.logRequestsMutex.RUnlock() + + if !exists { + glog.Warningf("Received unexpected log response for task %s from worker %s", response.TaskId, response.WorkerId) + return + } + + glog.V(1).Infof("Received log response for task %s from worker %s: %d entries", response.TaskId, response.WorkerId, len(response.LogEntries)) + + // Send response to waiting channel + select { + case requestContext.ResponseCh <- response: + // Response delivered successfully + case <-time.After(time.Second): + glog.Warningf("Failed to deliver log response for task %s from worker %s: timeout", response.TaskId, response.WorkerId) + } + + // Clean up the pending request + s.logRequestsMutex.Lock() + delete(s.pendingLogRequests, requestKey) + s.logRequestsMutex.Unlock() +} + // unregisterWorker removes a worker connection func (s *WorkerGrpcServer) unregisterWorker(workerID string) { s.connMutex.Lock() @@ -453,6 +503,112 @@ func (s *WorkerGrpcServer) GetConnectedWorkers() []string { return workers } +// RequestTaskLogs requests execution logs from a worker for a specific task +func (s *WorkerGrpcServer) RequestTaskLogs(workerID, taskID string, maxEntries int32, logLevel string) ([]*worker_pb.TaskLogEntry, error) { + s.connMutex.RLock() + conn, exists := s.connections[workerID] + s.connMutex.RUnlock() + + if !exists { + 
return nil, fmt.Errorf("worker %s is not connected", workerID) + } + + // Create response channel for this request + responseCh := make(chan *worker_pb.TaskLogResponse, 1) + requestKey := fmt.Sprintf("%s:%s", workerID, taskID) + + // Register pending request + requestContext := &LogRequestContext{ + TaskID: taskID, + WorkerID: workerID, + ResponseCh: responseCh, + Timeout: time.Now().Add(10 * time.Second), + } + + s.logRequestsMutex.Lock() + s.pendingLogRequests[requestKey] = requestContext + s.logRequestsMutex.Unlock() + + // Create log request message + logRequest := &worker_pb.AdminMessage{ + AdminId: "admin-server", + Timestamp: time.Now().Unix(), + Message: &worker_pb.AdminMessage_TaskLogRequest{ + TaskLogRequest: &worker_pb.TaskLogRequest{ + TaskId: taskID, + WorkerId: workerID, + IncludeMetadata: true, + MaxEntries: maxEntries, + LogLevel: logLevel, + }, + }, + } + + // Send the request through the worker's outgoing channel + select { + case conn.outgoing <- logRequest: + glog.V(1).Infof("Log request sent to worker %s for task %s", workerID, taskID) + case <-time.After(5 * time.Second): + // Clean up pending request on timeout + s.logRequestsMutex.Lock() + delete(s.pendingLogRequests, requestKey) + s.logRequestsMutex.Unlock() + return nil, fmt.Errorf("timeout sending log request to worker %s", workerID) + } + + // Wait for response + select { + case response := <-responseCh: + if !response.Success { + return nil, fmt.Errorf("worker log request failed: %s", response.ErrorMessage) + } + glog.V(1).Infof("Received %d log entries for task %s from worker %s", len(response.LogEntries), taskID, workerID) + return response.LogEntries, nil + case <-time.After(10 * time.Second): + // Clean up pending request on timeout + s.logRequestsMutex.Lock() + delete(s.pendingLogRequests, requestKey) + s.logRequestsMutex.Unlock() + return nil, fmt.Errorf("timeout waiting for log response from worker %s", workerID) + } +} + +// RequestTaskLogsFromAllWorkers requests logs for a task from all connected workers +func (s *WorkerGrpcServer) RequestTaskLogsFromAllWorkers(taskID string, maxEntries int32, logLevel string) (map[string][]*worker_pb.TaskLogEntry, error) { + s.connMutex.RLock() + workerIDs := make([]string, 0, len(s.connections)) + for workerID := range s.connections { + workerIDs = append(workerIDs, workerID) + } + s.connMutex.RUnlock() + + results := make(map[string][]*worker_pb.TaskLogEntry) + + for _, workerID := range workerIDs { + logs, err := s.RequestTaskLogs(workerID, taskID, maxEntries, logLevel) + if err != nil { + glog.V(1).Infof("Failed to get logs from worker %s for task %s: %v", workerID, taskID, err) + // Store empty result with error information for debugging + results[workerID+"_error"] = []*worker_pb.TaskLogEntry{ + { + Timestamp: time.Now().Unix(), + Level: "ERROR", + Message: fmt.Sprintf("Failed to retrieve logs from worker %s: %v", workerID, err), + Fields: map[string]string{"source": "admin"}, + }, + } + continue + } + if len(logs) > 0 { + results[workerID] = logs + } else { + glog.V(2).Infof("No logs found for task %s on worker %s", taskID, workerID) + } + } + + return results, nil +} + // convertTaskParameters converts task parameters to protobuf format func convertTaskParameters(params map[string]interface{}) map[string]string { result := make(map[string]string) diff --git a/weed/admin/handlers/admin_handlers.go b/weed/admin/handlers/admin_handlers.go index d28dc9e53..215e2a4e5 100644 --- a/weed/admin/handlers/admin_handlers.go +++ b/weed/admin/handlers/admin_handlers.go @@ 
-94,6 +94,7 @@ func (h *AdminHandlers) SetupRoutes(r *gin.Engine, authRequired bool, username, protected.POST("/maintenance/config", h.maintenanceHandlers.UpdateMaintenanceConfig) protected.GET("/maintenance/config/:taskType", h.maintenanceHandlers.ShowTaskConfig) protected.POST("/maintenance/config/:taskType", h.maintenanceHandlers.UpdateTaskConfig) + protected.GET("/maintenance/tasks/:id", h.maintenanceHandlers.ShowTaskDetail) // API routes for AJAX calls api := r.Group("/api") @@ -164,9 +165,11 @@ func (h *AdminHandlers) SetupRoutes(r *gin.Engine, authRequired bool, username, maintenanceApi.POST("/scan", h.adminServer.TriggerMaintenanceScan) maintenanceApi.GET("/tasks", h.adminServer.GetMaintenanceTasks) maintenanceApi.GET("/tasks/:id", h.adminServer.GetMaintenanceTask) + maintenanceApi.GET("/tasks/:id/detail", h.adminServer.GetMaintenanceTaskDetailAPI) maintenanceApi.POST("/tasks/:id/cancel", h.adminServer.CancelMaintenanceTask) maintenanceApi.GET("/workers", h.adminServer.GetMaintenanceWorkersAPI) maintenanceApi.GET("/workers/:id", h.adminServer.GetMaintenanceWorker) + maintenanceApi.GET("/workers/:id/logs", h.adminServer.GetWorkerLogs) maintenanceApi.GET("/stats", h.adminServer.GetMaintenanceStats) maintenanceApi.GET("/config", h.adminServer.GetMaintenanceConfigAPI) maintenanceApi.PUT("/config", h.adminServer.UpdateMaintenanceConfigAPI) @@ -218,6 +221,7 @@ func (h *AdminHandlers) SetupRoutes(r *gin.Engine, authRequired bool, username, r.POST("/maintenance/config", h.maintenanceHandlers.UpdateMaintenanceConfig) r.GET("/maintenance/config/:taskType", h.maintenanceHandlers.ShowTaskConfig) r.POST("/maintenance/config/:taskType", h.maintenanceHandlers.UpdateTaskConfig) + r.GET("/maintenance/tasks/:id", h.maintenanceHandlers.ShowTaskDetail) // API routes for AJAX calls api := r.Group("/api") @@ -287,9 +291,11 @@ func (h *AdminHandlers) SetupRoutes(r *gin.Engine, authRequired bool, username, maintenanceApi.POST("/scan", h.adminServer.TriggerMaintenanceScan) maintenanceApi.GET("/tasks", h.adminServer.GetMaintenanceTasks) maintenanceApi.GET("/tasks/:id", h.adminServer.GetMaintenanceTask) + maintenanceApi.GET("/tasks/:id/detail", h.adminServer.GetMaintenanceTaskDetailAPI) maintenanceApi.POST("/tasks/:id/cancel", h.adminServer.CancelMaintenanceTask) maintenanceApi.GET("/workers", h.adminServer.GetMaintenanceWorkersAPI) maintenanceApi.GET("/workers/:id", h.adminServer.GetMaintenanceWorker) + maintenanceApi.GET("/workers/:id/logs", h.adminServer.GetWorkerLogs) maintenanceApi.GET("/stats", h.adminServer.GetMaintenanceStats) maintenanceApi.GET("/config", h.adminServer.GetMaintenanceConfigAPI) maintenanceApi.PUT("/config", h.adminServer.UpdateMaintenanceConfigAPI) diff --git a/weed/admin/handlers/cluster_handlers.go b/weed/admin/handlers/cluster_handlers.go index 38eebee8b..ee6417954 100644 --- a/weed/admin/handlers/cluster_handlers.go +++ b/weed/admin/handlers/cluster_handlers.go @@ -169,6 +169,12 @@ func (h *ClusterHandlers) ShowCollectionDetails(c *gin.Context) { return } + // Map "default" collection to empty string for backend filtering + actualCollectionName := collectionName + if collectionName == "default" { + actualCollectionName = "" + } + // Parse query parameters page, _ := strconv.Atoi(c.DefaultQuery("page", "1")) pageSize, _ := strconv.Atoi(c.DefaultQuery("page_size", "25")) @@ -176,7 +182,7 @@ func (h *ClusterHandlers) ShowCollectionDetails(c *gin.Context) { sortOrder := c.DefaultQuery("sort_order", "asc") // Get collection details data (volumes and EC volumes) - 
collectionDetailsData, err := h.adminServer.GetCollectionDetails(collectionName, page, pageSize, sortBy, sortOrder) + collectionDetailsData, err := h.adminServer.GetCollectionDetails(actualCollectionName, page, pageSize, sortBy, sortOrder) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get collection details: " + err.Error()}) return diff --git a/weed/admin/handlers/maintenance_handlers.go b/weed/admin/handlers/maintenance_handlers.go index 1e2337272..e92a50c9d 100644 --- a/weed/admin/handlers/maintenance_handlers.go +++ b/weed/admin/handlers/maintenance_handlers.go @@ -1,6 +1,7 @@ package handlers import ( + "context" "fmt" "net/http" "reflect" @@ -34,35 +35,82 @@ func NewMaintenanceHandlers(adminServer *dash.AdminServer) *MaintenanceHandlers } } -// ShowMaintenanceQueue displays the maintenance queue page -func (h *MaintenanceHandlers) ShowMaintenanceQueue(c *gin.Context) { - data, err := h.getMaintenanceQueueData() +// ShowTaskDetail displays the task detail page +func (h *MaintenanceHandlers) ShowTaskDetail(c *gin.Context) { + taskID := c.Param("id") + glog.Infof("DEBUG ShowTaskDetail: Starting for task ID: %s", taskID) + + taskDetail, err := h.adminServer.GetMaintenanceTaskDetail(taskID) if err != nil { - glog.Infof("DEBUG ShowMaintenanceQueue: error getting data: %v", err) - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + glog.Errorf("DEBUG ShowTaskDetail: error getting task detail for %s: %v", taskID, err) + c.String(http.StatusNotFound, "Task not found: %s (Error: %v)", taskID, err) return } - glog.Infof("DEBUG ShowMaintenanceQueue: got data with %d tasks", len(data.Tasks)) - if data.Stats != nil { - glog.Infof("DEBUG ShowMaintenanceQueue: stats = {pending: %d, running: %d, completed: %d}", - data.Stats.PendingTasks, data.Stats.RunningTasks, data.Stats.CompletedToday) - } else { - glog.Infof("DEBUG ShowMaintenanceQueue: stats is nil") - } + glog.Infof("DEBUG ShowTaskDetail: got task detail for %s, task type: %s, status: %s", taskID, taskDetail.Task.Type, taskDetail.Task.Status) - // Render HTML template c.Header("Content-Type", "text/html") - maintenanceComponent := app.MaintenanceQueue(data) - layoutComponent := layout.Layout(c, maintenanceComponent) + taskDetailComponent := app.TaskDetail(taskDetail) + layoutComponent := layout.Layout(c, taskDetailComponent) err = layoutComponent.Render(c.Request.Context(), c.Writer) if err != nil { - glog.Infof("DEBUG ShowMaintenanceQueue: render error: %v", err) - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to render template: " + err.Error()}) + glog.Errorf("DEBUG ShowTaskDetail: render error: %v", err) + c.String(http.StatusInternalServerError, "Failed to render template: %v", err) return } - glog.Infof("DEBUG ShowMaintenanceQueue: template rendered successfully") + glog.Infof("DEBUG ShowTaskDetail: template rendered successfully for task %s", taskID) +} + +// ShowMaintenanceQueue displays the maintenance queue page +func (h *MaintenanceHandlers) ShowMaintenanceQueue(c *gin.Context) { + // Add timeout to prevent hanging + ctx, cancel := context.WithTimeout(c.Request.Context(), 30*time.Second) + defer cancel() + + // Use a channel to handle timeout for data retrieval + type result struct { + data *maintenance.MaintenanceQueueData + err error + } + resultChan := make(chan result, 1) + + go func() { + data, err := h.getMaintenanceQueueData() + resultChan <- result{data: data, err: err} + }() + + select { + case res := <-resultChan: + if res.err != nil { + 
glog.V(1).Infof("ShowMaintenanceQueue: error getting data: %v", res.err) + c.JSON(http.StatusInternalServerError, gin.H{"error": res.err.Error()}) + return + } + + glog.V(2).Infof("ShowMaintenanceQueue: got data with %d tasks", len(res.data.Tasks)) + + // Render HTML template + c.Header("Content-Type", "text/html") + maintenanceComponent := app.MaintenanceQueue(res.data) + layoutComponent := layout.Layout(c, maintenanceComponent) + err := layoutComponent.Render(ctx, c.Writer) + if err != nil { + glog.V(1).Infof("ShowMaintenanceQueue: render error: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to render template: " + err.Error()}) + return + } + + glog.V(3).Infof("ShowMaintenanceQueue: template rendered successfully") + + case <-ctx.Done(): + glog.Warningf("ShowMaintenanceQueue: timeout waiting for data") + c.JSON(http.StatusRequestTimeout, gin.H{ + "error": "Request timeout - maintenance data retrieval took too long. This may indicate a system issue.", + "suggestion": "Try refreshing the page or contact system administrator if the problem persists.", + }) + return + } } // ShowMaintenanceWorkers displays the maintenance workers page @@ -479,7 +527,7 @@ func (h *MaintenanceHandlers) getMaintenanceQueueStats() (*maintenance.QueueStat } func (h *MaintenanceHandlers) getMaintenanceTasks() ([]*maintenance.MaintenanceTask, error) { - // Call the maintenance manager directly to get all tasks + // Call the maintenance manager directly to get recent tasks (limit for performance) if h.adminServer == nil { return []*maintenance.MaintenanceTask{}, nil } @@ -489,8 +537,9 @@ func (h *MaintenanceHandlers) getMaintenanceTasks() ([]*maintenance.MaintenanceT return []*maintenance.MaintenanceTask{}, nil } - // Get ALL tasks using empty parameters - this should match what the API returns - allTasks := manager.GetTasks("", "", 0) + // Get recent tasks only (last 100) to prevent slow page loads + // Users can view more tasks via pagination if needed + allTasks := manager.GetTasks("", "", 100) return allTasks, nil } diff --git a/weed/admin/maintenance/maintenance_queue.go b/weed/admin/maintenance/maintenance_queue.go index ca402bd4d..d39c96a30 100644 --- a/weed/admin/maintenance/maintenance_queue.go +++ b/weed/admin/maintenance/maintenance_queue.go @@ -7,7 +7,6 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" ) // NewMaintenanceQueue creates a new maintenance queue @@ -27,6 +26,102 @@ func (mq *MaintenanceQueue) SetIntegration(integration *MaintenanceIntegration) glog.V(1).Infof("Maintenance queue configured with integration") } +// SetPersistence sets the task persistence interface +func (mq *MaintenanceQueue) SetPersistence(persistence TaskPersistence) { + mq.persistence = persistence + glog.V(1).Infof("Maintenance queue configured with task persistence") +} + +// LoadTasksFromPersistence loads tasks from persistent storage on startup +func (mq *MaintenanceQueue) LoadTasksFromPersistence() error { + if mq.persistence == nil { + glog.V(1).Infof("No task persistence configured, skipping task loading") + return nil + } + + mq.mutex.Lock() + defer mq.mutex.Unlock() + + glog.Infof("Loading tasks from persistence...") + + tasks, err := mq.persistence.LoadAllTaskStates() + if err != nil { + return fmt.Errorf("failed to load task states: %w", err) + } + + glog.Infof("DEBUG LoadTasksFromPersistence: Found %d tasks in persistence", len(tasks)) + + // Reset task maps + mq.tasks = make(map[string]*MaintenanceTask) + 
mq.pendingTasks = make([]*MaintenanceTask, 0) + + // Load tasks by status + for _, task := range tasks { + glog.Infof("DEBUG LoadTasksFromPersistence: Loading task %s (type: %s, status: %s, scheduled: %v)", task.ID, task.Type, task.Status, task.ScheduledAt) + mq.tasks[task.ID] = task + + switch task.Status { + case TaskStatusPending: + glog.Infof("DEBUG LoadTasksFromPersistence: Adding task %s to pending queue", task.ID) + mq.pendingTasks = append(mq.pendingTasks, task) + case TaskStatusAssigned, TaskStatusInProgress: + // For assigned/in-progress tasks, we need to check if the worker is still available + // If not, we should fail them and make them eligible for retry + if task.WorkerID != "" { + if _, exists := mq.workers[task.WorkerID]; !exists { + glog.Warningf("Task %s was assigned to unavailable worker %s, marking as failed", task.ID, task.WorkerID) + task.Status = TaskStatusFailed + task.Error = "Worker unavailable after restart" + completedTime := time.Now() + task.CompletedAt = &completedTime + + // Check if it should be retried + if task.RetryCount < task.MaxRetries { + task.RetryCount++ + task.Status = TaskStatusPending + task.WorkerID = "" + task.StartedAt = nil + task.CompletedAt = nil + task.Error = "" + task.ScheduledAt = time.Now().Add(1 * time.Minute) // Retry after restart delay + glog.Infof("DEBUG LoadTasksFromPersistence: Retrying task %s, adding to pending queue", task.ID) + mq.pendingTasks = append(mq.pendingTasks, task) + } + } + } + } + } + + // Sort pending tasks by priority and schedule time + sort.Slice(mq.pendingTasks, func(i, j int) bool { + if mq.pendingTasks[i].Priority != mq.pendingTasks[j].Priority { + return mq.pendingTasks[i].Priority > mq.pendingTasks[j].Priority + } + return mq.pendingTasks[i].ScheduledAt.Before(mq.pendingTasks[j].ScheduledAt) + }) + + glog.Infof("Loaded %d tasks from persistence (%d pending)", len(tasks), len(mq.pendingTasks)) + return nil +} + +// saveTaskState saves a task to persistent storage +func (mq *MaintenanceQueue) saveTaskState(task *MaintenanceTask) { + if mq.persistence != nil { + if err := mq.persistence.SaveTaskState(task); err != nil { + glog.Errorf("Failed to save task state for %s: %v", task.ID, err) + } + } +} + +// cleanupCompletedTasks removes old completed tasks beyond the retention limit +func (mq *MaintenanceQueue) cleanupCompletedTasks() { + if mq.persistence != nil { + if err := mq.persistence.CleanupCompletedTasks(); err != nil { + glog.Errorf("Failed to cleanup completed tasks: %v", err) + } + } +} + // AddTask adds a new maintenance task to the queue with deduplication func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) { mq.mutex.Lock() @@ -44,6 +139,18 @@ func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) { task.CreatedAt = time.Now() task.MaxRetries = 3 // Default retry count + // Initialize assignment history and set creation context + task.AssignmentHistory = make([]*TaskAssignmentRecord, 0) + if task.CreatedBy == "" { + task.CreatedBy = "maintenance-system" + } + if task.CreationContext == "" { + task.CreationContext = "Automatic task creation based on system monitoring" + } + if task.Tags == nil { + task.Tags = make(map[string]string) + } + mq.tasks[task.ID] = task mq.pendingTasks = append(mq.pendingTasks, task) @@ -55,6 +162,9 @@ func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) { return mq.pendingTasks[i].ScheduledAt.Before(mq.pendingTasks[j].ScheduledAt) }) + // Save task state to persistence + mq.saveTaskState(task) + scheduleInfo := "" if !task.ScheduledAt.IsZero() && 
time.Until(task.ScheduledAt) > time.Minute { scheduleInfo = fmt.Sprintf(", scheduled for %v", task.ScheduledAt.Format("15:04:05")) @@ -143,7 +253,11 @@ func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []Maintena // Check if this task type needs a cooldown period if !mq.canScheduleTaskNow(task) { - glog.V(3).Infof("Task %s (%s) skipped for worker %s: scheduling constraints not met", task.ID, task.Type, workerID) + // Add detailed diagnostic information + runningCount := mq.GetRunningTaskCount(task.Type) + maxConcurrent := mq.getMaxConcurrentForTaskType(task.Type) + glog.V(2).Infof("Task %s (%s) skipped for worker %s: scheduling constraints not met (running: %d, max: %d)", + task.ID, task.Type, workerID, runningCount, maxConcurrent) continue } @@ -172,6 +286,26 @@ func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []Maintena return nil } + // Record assignment history + workerAddress := "" + if worker, exists := mq.workers[workerID]; exists { + workerAddress = worker.Address + } + + // Create assignment record + assignmentRecord := &TaskAssignmentRecord{ + WorkerID: workerID, + WorkerAddress: workerAddress, + AssignedAt: now, + Reason: "Task assigned to available worker", + } + + // Initialize assignment history if nil + if selectedTask.AssignmentHistory == nil { + selectedTask.AssignmentHistory = make([]*TaskAssignmentRecord, 0) + } + selectedTask.AssignmentHistory = append(selectedTask.AssignmentHistory, assignmentRecord) + // Assign the task selectedTask.Status = TaskStatusAssigned selectedTask.WorkerID = workerID @@ -188,6 +322,9 @@ func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []Maintena // Track pending operation mq.trackPendingOperation(selectedTask) + // Save task state after assignment + mq.saveTaskState(selectedTask) + glog.Infof("Task assigned: %s (%s) → worker %s (volume %d, server %s)", selectedTask.ID, selectedTask.Type, workerID, selectedTask.VolumeID, selectedTask.Server) @@ -220,6 +357,17 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) { // Check if task should be retried if task.RetryCount < task.MaxRetries { + // Record unassignment due to failure/retry + if task.WorkerID != "" && len(task.AssignmentHistory) > 0 { + lastAssignment := task.AssignmentHistory[len(task.AssignmentHistory)-1] + if lastAssignment.UnassignedAt == nil { + unassignedTime := completedTime + lastAssignment.UnassignedAt = &unassignedTime + lastAssignment.Reason = fmt.Sprintf("Task failed, scheduling retry (attempt %d/%d): %s", + task.RetryCount+1, task.MaxRetries, error) + } + } + task.RetryCount++ task.Status = TaskStatusPending task.WorkerID = "" @@ -229,15 +377,31 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) { task.ScheduledAt = time.Now().Add(15 * time.Minute) // Retry delay mq.pendingTasks = append(mq.pendingTasks, task) + // Save task state after retry setup + mq.saveTaskState(task) glog.Warningf("Task failed, scheduling retry: %s (%s) attempt %d/%d, worker %s, duration %v, error: %s", taskID, task.Type, task.RetryCount, task.MaxRetries, task.WorkerID, duration, error) } else { + // Record unassignment due to permanent failure + if task.WorkerID != "" && len(task.AssignmentHistory) > 0 { + lastAssignment := task.AssignmentHistory[len(task.AssignmentHistory)-1] + if lastAssignment.UnassignedAt == nil { + unassignedTime := completedTime + lastAssignment.UnassignedAt = &unassignedTime + lastAssignment.Reason = fmt.Sprintf("Task failed permanently after %d retries: %s", 
+						task.MaxRetries, error)
+				}
+			}
+
+			// Save task state after permanent failure
+			mq.saveTaskState(task)
 			glog.Errorf("Task failed permanently: %s (%s) worker %s, duration %v, after %d retries: %s",
 				taskID, task.Type, task.WorkerID, duration, task.MaxRetries, error)
 		}
 	} else {
 		task.Status = TaskStatusCompleted
 		task.Progress = 100
+		// Save task state after successful completion
+		mq.saveTaskState(task)
 		glog.Infof("Task completed: %s (%s) worker %s, duration %v, volume %d",
 			taskID, task.Type, task.WorkerID, duration, task.VolumeID)
 	}
@@ -257,6 +421,14 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
 	if task.Status != TaskStatusPending {
 		mq.removePendingOperation(taskID)
 	}
+
+	// Periodically cleanup old completed tasks (every 10th completion)
+	if task.Status == TaskStatusCompleted {
+		// Simple counter-based trigger for cleanup
+		if len(mq.tasks)%10 == 0 {
+			go mq.cleanupCompletedTasks()
+		}
+	}
 }

 // UpdateTaskProgress updates the progress of a running task
@@ -283,6 +455,11 @@ func (mq *MaintenanceQueue) UpdateTaskProgress(taskID string, progress float64)
 		glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete",
 			taskID, task.Type, task.WorkerID, progress)
 		}
+
+		// Save task state after progress update
+		if progress == 0 || progress >= 100 || progress-oldProgress >= 10 {
+			mq.saveTaskState(task)
+		}
 	} else {
 		glog.V(2).Infof("Progress update for unknown task: %s (%.1f%%)", taskID, progress)
 	}
@@ -489,9 +666,19 @@ func (mq *MaintenanceQueue) RemoveStaleWorkers(timeout time.Duration) int {
 	for id, worker := range mq.workers {
 		if worker.LastHeartbeat.Before(cutoff) {
-			// Mark any assigned tasks as failed
+			// Mark any assigned tasks as failed and record unassignment
 			for _, task := range mq.tasks {
 				if task.WorkerID == id && (task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress) {
+					// Record unassignment due to worker becoming unavailable
+					if len(task.AssignmentHistory) > 0 {
+						lastAssignment := task.AssignmentHistory[len(task.AssignmentHistory)-1]
+						if lastAssignment.UnassignedAt == nil {
+							unassignedTime := time.Now()
+							lastAssignment.UnassignedAt = &unassignedTime
+							lastAssignment.Reason = "Worker became unavailable (stale heartbeat)"
+						}
+					}
+
 					task.Status = TaskStatusFailed
 					task.Error = "Worker became unavailable"
 					completedTime := time.Now()
@@ -600,7 +787,10 @@ func (mq *MaintenanceQueue) canExecuteTaskType(taskType MaintenanceTaskType) boo
 	runningCount := mq.GetRunningTaskCount(taskType)
 	maxConcurrent := mq.getMaxConcurrentForTaskType(taskType)
-	return runningCount < maxConcurrent
+	canExecute := runningCount < maxConcurrent
+	glog.V(3).Infof("canExecuteTaskType for %s: running=%d, max=%d, canExecute=%v", taskType, runningCount, maxConcurrent, canExecute)
+
+	return canExecute
 }

 // getMaxConcurrentForTaskType returns the maximum concurrent tasks allowed for a task type
@@ -684,40 +874,28 @@ func (mq *MaintenanceQueue) trackPendingOperation(task *MaintenanceTask) {
 		opType = OpTypeVolumeMove
 	}
-	// Determine destination node and estimated size from typed parameters
+	// Determine destination node and estimated size from unified targets
 	destNode := ""
 	estimatedSize := uint64(1024 * 1024 * 1024) // Default 1GB estimate
-	switch params := task.TypedParams.TaskParams.(type) {
-	case *worker_pb.TaskParams_ErasureCodingParams:
-		if params.ErasureCodingParams != nil {
-			if len(params.ErasureCodingParams.Destinations) > 0 {
-				destNode = params.ErasureCodingParams.Destinations[0].Node
-			}
-			if params.ErasureCodingParams.EstimatedShardSize > 0 {
-				estimatedSize = params.ErasureCodingParams.EstimatedShardSize
-			}
-		}
-	case *worker_pb.TaskParams_BalanceParams:
-		if params.BalanceParams != nil {
-			destNode = params.BalanceParams.DestNode
-			if params.BalanceParams.EstimatedSize > 0 {
-				estimatedSize = params.BalanceParams.EstimatedSize
-			}
-		}
-	case *worker_pb.TaskParams_ReplicationParams:
-		if params.ReplicationParams != nil {
-			destNode = params.ReplicationParams.DestNode
-			if params.ReplicationParams.EstimatedSize > 0 {
-				estimatedSize = params.ReplicationParams.EstimatedSize
-			}
+	// Use unified targets array - the only source of truth
+	if len(task.TypedParams.Targets) > 0 {
+		destNode = task.TypedParams.Targets[0].Node
+		if task.TypedParams.Targets[0].EstimatedSize > 0 {
+			estimatedSize = task.TypedParams.Targets[0].EstimatedSize
 		}
 	}
+	// Determine source node from unified sources
+	sourceNode := ""
+	if len(task.TypedParams.Sources) > 0 {
+		sourceNode = task.TypedParams.Sources[0].Node
+	}
+
 	operation := &PendingOperation{
 		VolumeID:      task.VolumeID,
 		OperationType: opType,
-		SourceNode:    task.Server,
+		SourceNode:    sourceNode,
 		DestNode:      destNode,
 		TaskID:        task.ID,
 		StartTime:     time.Now(),
diff --git a/weed/admin/maintenance/maintenance_scanner.go b/weed/admin/maintenance/maintenance_scanner.go
index 3f8a528eb..6f3b46be2 100644
--- a/weed/admin/maintenance/maintenance_scanner.go
+++ b/weed/admin/maintenance/maintenance_scanner.go
@@ -117,6 +117,8 @@ func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics,
 				Server:     node.Id,
 				DiskType:   diskType,       // Track which disk this volume is on
 				DiskId:     volInfo.DiskId, // Use disk ID from volume info
+				DataCenter: dc.Id,          // Data center from current loop
+				Rack:       rack.Id,        // Rack from current loop
 				Collection: volInfo.Collection,
 				Size:       volInfo.Size,
 				DeletedBytes: volInfo.DeletedByteCount,
@@ -207,6 +209,8 @@ func (ms *MaintenanceScanner) convertToTaskMetrics(metrics []*VolumeHealthMetric
 			Server:       metric.Server,
 			DiskType:     metric.DiskType,
 			DiskId:       metric.DiskId,
+			DataCenter:   metric.DataCenter,
+			Rack:         metric.Rack,
 			Collection:   metric.Collection,
 			Size:         metric.Size,
 			DeletedBytes: metric.DeletedBytes,
diff --git a/weed/admin/maintenance/maintenance_types.go b/weed/admin/maintenance/maintenance_types.go
index e863b26e6..fe5d5fa55 100644
--- a/weed/admin/maintenance/maintenance_types.go
+++ b/weed/admin/maintenance/maintenance_types.go
@@ -108,6 +108,57 @@ type MaintenanceTask struct {
 	Progress   float64 `json:"progress"` // 0-100
 	RetryCount int     `json:"retry_count"`
 	MaxRetries int     `json:"max_retries"`
+
+	// Enhanced fields for detailed task tracking
+	CreatedBy         string                  `json:"created_by,omitempty"`         // Who/what created this task
+	CreationContext   string                  `json:"creation_context,omitempty"`   // Additional context about creation
+	AssignmentHistory []*TaskAssignmentRecord `json:"assignment_history,omitempty"` // History of worker assignments
+	DetailedReason    string                  `json:"detailed_reason,omitempty"`    // More detailed explanation than Reason
+	Tags              map[string]string       `json:"tags,omitempty"`               // Additional metadata tags
+}
+
+// TaskAssignmentRecord tracks when a task was assigned to a worker
+type TaskAssignmentRecord struct {
+	WorkerID      string     `json:"worker_id"`
+	WorkerAddress string     `json:"worker_address"`
+	AssignedAt    time.Time  `json:"assigned_at"`
+	UnassignedAt  *time.Time `json:"unassigned_at,omitempty"`
+	Reason        string     `json:"reason"` // Why was it assigned/unassigned
+}
+
+// TaskExecutionLog represents a log entry from task execution
+type TaskExecutionLog struct {
+	Timestamp time.Time `json:"timestamp"`
+	Level     string    `json:"level"` // "info", "warn", "error", "debug"
+	Message   string    `json:"message"`
+	Source    string    `json:"source"` // Which component logged this
+	TaskID    string    `json:"task_id"`
+	WorkerID  string    `json:"worker_id"`
+	// Optional structured fields carried from worker logs
+	Fields map[string]string `json:"fields,omitempty"`
+	// Optional progress/status carried from worker logs
+	Progress *float64 `json:"progress,omitempty"`
+	Status   string   `json:"status,omitempty"`
+}
+
+// TaskDetailData represents comprehensive information about a task for the detail view
+type TaskDetailData struct {
+	Task              *MaintenanceTask        `json:"task"`
+	AssignmentHistory []*TaskAssignmentRecord `json:"assignment_history"`
+	ExecutionLogs     []*TaskExecutionLog     `json:"execution_logs"`
+	RelatedTasks      []*MaintenanceTask      `json:"related_tasks,omitempty"`    // Other tasks on same volume/server
+	WorkerInfo        *MaintenanceWorker      `json:"worker_info,omitempty"`      // Current or last assigned worker
+	CreationMetrics   *TaskCreationMetrics    `json:"creation_metrics,omitempty"` // Metrics that led to task creation
+	LastUpdated       time.Time               `json:"last_updated"`
+}
+
+// TaskCreationMetrics holds metrics that led to the task being created
+type TaskCreationMetrics struct {
+	TriggerMetric  string                 `json:"trigger_metric"` // What metric triggered this task
+	MetricValue    float64                `json:"metric_value"`   // Value of the trigger metric
+	Threshold      float64                `json:"threshold"`      // Threshold that was exceeded
+	VolumeMetrics  *VolumeHealthMetrics   `json:"volume_metrics,omitempty"`
+	AdditionalData map[string]interface{} `json:"additional_data,omitempty"`
 }

 // MaintenanceConfig holds configuration for the maintenance system
@@ -122,6 +173,15 @@ type MaintenancePolicy = worker_pb.MaintenancePolicy
 // DEPRECATED: Use worker_pb.TaskPolicy instead
 type TaskPolicy = worker_pb.TaskPolicy

+// TaskPersistence interface for task state persistence
+type TaskPersistence interface {
+	SaveTaskState(task *MaintenanceTask) error
+	LoadTaskState(taskID string) (*MaintenanceTask, error)
+	LoadAllTaskStates() ([]*MaintenanceTask, error)
+	DeleteTaskState(taskID string) error
+	CleanupCompletedTasks() error
+}
+
 // Default configuration values
 func DefaultMaintenanceConfig() *MaintenanceConfig {
 	return DefaultMaintenanceConfigProto()
@@ -273,6 +333,7 @@ type MaintenanceQueue struct {
 	mutex       sync.RWMutex
 	policy      *MaintenancePolicy
 	integration *MaintenanceIntegration
+	persistence TaskPersistence // Interface for task persistence
 }

 // MaintenanceScanner analyzes the cluster and generates maintenance tasks
@@ -301,8 +362,10 @@ type TaskDetectionResult struct {
 type VolumeHealthMetrics struct {
 	VolumeID uint32 `json:"volume_id"`
 	Server   string `json:"server"`
-	DiskType string `json:"disk_type"` // Disk type (e.g., "hdd", "ssd") or disk path (e.g., "/data1")
-	DiskId   uint32 `json:"disk_id"`   // ID of the disk in Store.Locations array
+	DiskType   string `json:"disk_type"`   // Disk type (e.g., "hdd", "ssd") or disk path (e.g., "/data1")
+	DiskId     uint32 `json:"disk_id"`     // ID of the disk in Store.Locations array
+	DataCenter string `json:"data_center"` // Data center of the server
+	Rack       string `json:"rack"`        // Rack of the server
 	Collection   string `json:"collection"`
 	Size         uint64 `json:"size"`
 	DeletedBytes uint64 `json:"deleted_bytes"`
diff --git a/weed/admin/topology/structs.go b/weed/admin/topology/structs.go
index f2d29eb5f..103ee5abe 100644
--- a/weed/admin/topology/structs.go
+++ b/weed/admin/topology/structs.go
@@ -96,13 +96,12 @@ type ActiveTopology struct {

 // DestinationPlan represents a planned destination for a volume/shard operation
 type DestinationPlan struct {
-	TargetNode     string   `json:"target_node"`
-	TargetDisk     uint32   `json:"target_disk"`
-	TargetRack     string   `json:"target_rack"`
-	TargetDC       string   `json:"target_dc"`
-	ExpectedSize   uint64   `json:"expected_size"`
-	PlacementScore float64  `json:"placement_score"`
-	Conflicts      []string `json:"conflicts"`
+	TargetNode     string  `json:"target_node"`
+	TargetDisk     uint32  `json:"target_disk"`
+	TargetRack     string  `json:"target_rack"`
+	TargetDC       string  `json:"target_dc"`
+	ExpectedSize   uint64  `json:"expected_size"`
+	PlacementScore float64 `json:"placement_score"`
 }

 // MultiDestinationPlan represents multiple planned destinations for operations like EC
@@ -115,6 +114,8 @@ type MultiDestinationPlan struct {

 // VolumeReplica represents a replica location with server and disk information
 type VolumeReplica struct {
-	ServerID string `json:"server_id"`
-	DiskID   uint32 `json:"disk_id"`
+	ServerID   string `json:"server_id"`
+	DiskID     uint32 `json:"disk_id"`
+	DataCenter string `json:"data_center"`
+	Rack       string `json:"rack"`
 }
diff --git a/weed/admin/topology/task_management.go b/weed/admin/topology/task_management.go
index b240adcd8..ada60248b 100644
--- a/weed/admin/topology/task_management.go
+++ b/weed/admin/topology/task_management.go
@@ -233,6 +233,8 @@ const (
 type TaskSourceSpec struct {
 	ServerID      string
 	DiskID        uint32
+	DataCenter    string            // Data center of the source server
+	Rack          string            // Rack of the source server
 	CleanupType   SourceCleanupType  // For EC: volume replica vs existing shards
 	StorageImpact *StorageSlotChange // Optional: manual override
 	EstimatedSize *int64             // Optional: manual override
@@ -255,10 +257,3 @@ type TaskSpec struct {
 	Sources      []TaskSourceSpec      // Can be single or multiple
 	Destinations []TaskDestinationSpec // Can be single or multiple
 }
-
-// TaskSourceLocation represents a source location for task creation (DEPRECATED: use TaskSourceSpec)
-type TaskSourceLocation struct {
-	ServerID    string
-	DiskID      uint32
-	CleanupType SourceCleanupType // What type of cleanup is needed
-}
diff --git a/weed/admin/topology/topology_management.go b/weed/admin/topology/topology_management.go
index e12839484..65b7dfe7e 100644
--- a/weed/admin/topology/topology_management.go
+++ b/weed/admin/topology/topology_management.go
@@ -188,8 +188,10 @@ func (at *ActiveTopology) GetVolumeLocations(volumeID uint32, collection string)
 			// Verify collection matches (since index doesn't include collection)
 			if at.volumeMatchesCollection(disk, volumeID, collection) {
 				replicas = append(replicas, VolumeReplica{
-					ServerID: disk.NodeID,
-					DiskID:   disk.DiskID,
+					ServerID:   disk.NodeID,
+					DiskID:     disk.DiskID,
+					DataCenter: disk.DataCenter,
+					Rack:       disk.Rack,
 				})
 			}
 		}
@@ -214,8 +216,10 @@ func (at *ActiveTopology) GetECShardLocations(volumeID uint32, collection string
 			// Verify collection matches (since index doesn't include collection)
 			if at.ecShardMatchesCollection(disk, volumeID, collection) {
 				ecShards = append(ecShards, VolumeReplica{
-					ServerID: disk.NodeID,
-					DiskID:   disk.DiskID,
+					ServerID:   disk.NodeID,
+					DiskID:     disk.DiskID,
+					DataCenter: disk.DataCenter,
+					Rack:       disk.Rack,
 				})
 			}
 		}
diff --git a/weed/admin/view/app/admin.templ b/weed/admin/view/app/admin.templ
index 534c798bd..568db59d7 100644
--- a/weed/admin/view/app/admin.templ
+++ b/weed/admin/view/app/admin.templ
@@ -12,7 +12,7 @@ templ Admin(data dash.AdminData) {
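Note (reviewer sketch, not part of this patch): the TaskPersistence interface added in maintenance_types.go decouples the queue from its storage backend. The Go sketch below shows one way a file-backed implementation could satisfy it; the type name fileTaskPersistence, the one-JSON-file-per-task layout under baseDir, and the retention-based cleanup are illustrative assumptions only.

// Illustrative, assumed implementation of TaskPersistence (not in the patch).
// Each task is stored as <baseDir>/<taskID>.json; cleanup removes completed or
// failed tasks older than the retention window.
package maintenance

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"time"
)

type fileTaskPersistence struct {
	baseDir   string        // directory holding one <taskID>.json per task (assumed layout)
	retention time.Duration // how long completed/failed tasks are kept
}

func (p *fileTaskPersistence) taskPath(taskID string) string {
	return filepath.Join(p.baseDir, taskID+".json")
}

func (p *fileTaskPersistence) SaveTaskState(task *MaintenanceTask) error {
	data, err := json.MarshalIndent(task, "", "  ")
	if err != nil {
		return fmt.Errorf("marshal task %s: %w", task.ID, err)
	}
	return os.WriteFile(p.taskPath(task.ID), data, 0o644)
}

func (p *fileTaskPersistence) LoadTaskState(taskID string) (*MaintenanceTask, error) {
	data, err := os.ReadFile(p.taskPath(taskID))
	if err != nil {
		return nil, err
	}
	var task MaintenanceTask
	if err := json.Unmarshal(data, &task); err != nil {
		return nil, err
	}
	return &task, nil
}

func (p *fileTaskPersistence) LoadAllTaskStates() ([]*MaintenanceTask, error) {
	entries, err := os.ReadDir(p.baseDir)
	if err != nil {
		return nil, err
	}
	tasks := make([]*MaintenanceTask, 0, len(entries))
	for _, entry := range entries {
		if entry.IsDir() || filepath.Ext(entry.Name()) != ".json" {
			continue
		}
		taskID := entry.Name()[:len(entry.Name())-len(".json")]
		task, err := p.LoadTaskState(taskID)
		if err != nil {
			continue // skip unreadable entries rather than failing the whole load
		}
		tasks = append(tasks, task)
	}
	return tasks, nil
}

func (p *fileTaskPersistence) DeleteTaskState(taskID string) error {
	return os.Remove(p.taskPath(taskID))
}

func (p *fileTaskPersistence) CleanupCompletedTasks() error {
	tasks, err := p.LoadAllTaskStates()
	if err != nil {
		return err
	}
	cutoff := time.Now().Add(-p.retention)
	for _, task := range tasks {
		done := task.Status == TaskStatusCompleted || task.Status == TaskStatusFailed
		if done && task.CompletedAt != nil && task.CompletedAt.Before(cutoff) {
			_ = p.DeleteTaskState(task.ID) // best-effort removal of expired task files
		}
	}
	return nil
}

Under this assumption, wiring is a one-liner when the queue is constructed (e.g. mq.persistence = &fileTaskPersistence{baseDir: dir, retention: 7 * 24 * time.Hour}), which is all that LoadTasksFromPersistence, saveTaskState, and cleanupCompletedTasks in maintenance_queue.go require.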