From c19f88eef1ea4ca57fca8ecdea81df9e25a4bf4d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 4 Mar 2026 19:20:28 -0800 Subject: [PATCH] fix: resolve ServerAddress to NodeId in maintenance task sync (#8508) * fix: maintenance task topology lookup, retry, and stale task cleanup 1. Strip gRPC port from ServerAddress in SyncTask using ToHttpAddress() so task targets match topology disk keys (NodeId format). 2. Skip capacity check when topology has no disks yet (startup race where tasks are loaded from persistence before first topology update). 3. Don't retry permanent errors like "volume not found" - these will never succeed on retry. 4. Cancel all pending tasks for each task type before re-detection, ensuring stale proposals from previous cycles are cleaned up. This prevents stale tasks from blocking new detection and from repeatedly failing. Co-Authored-By: Claude Opus 4.6 * logs Co-Authored-By: Copilot <223556219+Copilot@users.noreply.github.com> * less lock scope Co-Authored-By: Copilot <223556219+Copilot@users.noreply.github.com> --------- Co-authored-by: Claude Opus 4.6 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../maintenance/maintenance_integration.go | 20 ++++++-- weed/admin/maintenance/maintenance_queue.go | 51 ++++++++++++++++++- weed/admin/topology/task_management.go | 36 ++++++++----- weed/admin/topology/topology_management.go | 2 + 4 files changed, 91 insertions(+), 18 deletions(-) diff --git a/weed/admin/maintenance/maintenance_integration.go b/weed/admin/maintenance/maintenance_integration.go index a569ea09f..be5875a72 100644 --- a/weed/admin/maintenance/maintenance_integration.go +++ b/weed/admin/maintenance/maintenance_integration.go @@ -5,6 +5,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/admin/topology" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" "github.com/seaweedfs/seaweedfs/weed/worker/types" @@ -229,6 +230,12 @@ func (s *MaintenanceIntegration) ScanWithTaskDetectors(volumeMetrics []*types.Vo continue } + // Cancel stale pending tasks for this type before re-detection + maintenanceType := s.taskTypeMap[taskType] + if cancelled := s.maintenanceQueue.CancelPendingTasksByType(maintenanceType); cancelled > 0 { + glog.Infof("Cancelled %d stale pending %s tasks before re-detection", cancelled, taskType) + } + glog.V(2).Infof("Running detection for task type: %s", taskType) results, err := detector.ScanForTasks(filteredMetrics, clusterInfo) @@ -528,10 +535,15 @@ func (s *MaintenanceIntegration) SyncTask(task *MaintenanceTask) { // Volume size is not currently used for Balance/Vacuum impact and is not stored in MaintenanceTask sourceImpact, targetImpact := topology.CalculateTaskStorageImpact(topology.TaskType(string(taskType)), 0) - // Use unified sources and targets from TaskParams + // Use unified sources and targets from TaskParams. + // Task protos store ServerAddresses (with gRPC port, e.g., "host:port.grpcPort") + // but the topology indexes disks by NodeId (e.g., "host:port"). + // Strip the gRPC port suffix via ToHttpAddress() to match the topology key. for _, src := range task.TypedParams.Sources { + resolvedSrc := pb.ServerAddress(src.Node).ToHttpAddress() + glog.V(2).Infof("SyncTask %s: source proto Node=%q resolved to %q, diskId=%d", task.ID, src.Node, resolvedSrc, src.DiskId) sources = append(sources, topology.TaskSource{ - SourceServer: src.Node, + SourceServer: resolvedSrc, SourceDisk: src.DiskId, StorageChange: sourceImpact, }) @@ -539,8 +551,10 @@ func (s *MaintenanceIntegration) SyncTask(task *MaintenanceTask) { estimatedSize += int64(src.EstimatedSize) } for _, target := range task.TypedParams.Targets { + resolvedTarget := pb.ServerAddress(target.Node).ToHttpAddress() + glog.V(2).Infof("SyncTask %s: target proto Node=%q resolved to %q, diskId=%d", task.ID, target.Node, resolvedTarget, target.DiskId) destinations = append(destinations, topology.TaskDestination{ - TargetServer: target.Node, + TargetServer: resolvedTarget, TargetDisk: target.DiskId, StorageChange: targetImpact, }) diff --git a/weed/admin/maintenance/maintenance_queue.go b/weed/admin/maintenance/maintenance_queue.go index 32a3abbb4..28dbc1c5c 100644 --- a/weed/admin/maintenance/maintenance_queue.go +++ b/weed/admin/maintenance/maintenance_queue.go @@ -4,6 +4,7 @@ import ( "crypto/rand" "fmt" "sort" + "strings" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -230,6 +231,46 @@ func (mq *MaintenanceQueue) hasDuplicateTask(newTask *MaintenanceTask) bool { return false } +// CancelPendingTasksByType cancels all pending tasks of a given type. +// This is called before each detection cycle to ensure stale proposals +// from previous cycles are cleaned up before creating new ones. +func (mq *MaintenanceQueue) CancelPendingTasksByType(taskType MaintenanceTaskType) int { + mq.mutex.Lock() + + var remaining []*MaintenanceTask + var cancelledSnapshots []*MaintenanceTask + cancelled := 0 + for _, task := range mq.pendingTasks { + if task.Type == taskType { + task.Status = TaskStatusCancelled + now := time.Now() + task.CompletedAt = &now + cancelled++ + cancelledSnapshots = append(cancelledSnapshots, snapshotTask(task)) + glog.V(1).Infof("Cancelled stale pending task %s (%s) for volume %d before re-detection", + task.ID, task.Type, task.VolumeID) + + // Release capacity in ActiveTopology and remove pending operation + if mq.integration != nil { + if at := mq.integration.GetActiveTopology(); at != nil { + _ = at.CompleteTask(task.ID) + } + } + mq.removePendingOperation(task.ID) + } else { + remaining = append(remaining, task) + } + } + mq.pendingTasks = remaining + mq.mutex.Unlock() + + // Persist cancelled state outside the lock to avoid blocking + for _, snapshot := range cancelledSnapshots { + mq.saveTaskState(snapshot) + } + return cancelled +} + // AddTasksFromResults converts detection results to tasks and adds them to the queue func (mq *MaintenanceQueue) AddTasksFromResults(results []*TaskDetectionResult) { for _, result := range results { @@ -455,8 +496,8 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) { task.Status = TaskStatusFailed task.Error = error - // Check if task should be retried - if task.RetryCount < task.MaxRetries { + // Check if task should be retried (skip retry for permanent errors) + if task.RetryCount < task.MaxRetries && !isNonRetriableError(error) { // Record unassignment due to failure/retry if task.WorkerID != "" && len(task.AssignmentHistory) > 0 { lastAssignment := task.AssignmentHistory[len(task.AssignmentHistory)-1] @@ -559,6 +600,12 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) { } } +// isNonRetriableError returns true for errors that will never succeed on retry, +// such as when the volume doesn't exist on the source server. +func isNonRetriableError(errMsg string) bool { + return strings.Contains(errMsg, "not found") +} + // UpdateTaskProgress updates the progress of a running task func (mq *MaintenanceQueue) UpdateTaskProgress(taskID string, progress float64) { mq.mutex.Lock() diff --git a/weed/admin/topology/task_management.go b/weed/admin/topology/task_management.go index e304df977..3710b06ab 100644 --- a/weed/admin/topology/task_management.go +++ b/weed/admin/topology/task_management.go @@ -17,20 +17,30 @@ func (at *ActiveTopology) AssignTask(taskID string) error { return fmt.Errorf("pending task %s not found", taskID) } - // Check if all destination disks have sufficient capacity to reserve - for _, dest := range task.Destinations { - targetKey := fmt.Sprintf("%s:%d", dest.TargetServer, dest.TargetDisk) - if targetDisk, exists := at.disks[targetKey]; exists { - availableCapacity := at.getEffectiveAvailableCapacityUnsafe(targetDisk) - - // Check if we have enough total capacity using the improved unified comparison - if !availableCapacity.CanAccommodate(dest.StorageChange) { - return fmt.Errorf("insufficient capacity on target disk %s:%d. Available: %+v, Required: %+v", - dest.TargetServer, dest.TargetDisk, availableCapacity, dest.StorageChange) + // Skip capacity check if topology hasn't been populated yet + if len(at.disks) == 0 { + glog.Warningf("AssignTask %s: topology has no disks yet, skipping capacity check", taskID) + } else { + // Check if all destination disks have sufficient capacity to reserve + for _, dest := range task.Destinations { + targetKey := fmt.Sprintf("%s:%d", dest.TargetServer, dest.TargetDisk) + if targetDisk, exists := at.disks[targetKey]; exists { + availableCapacity := at.getEffectiveAvailableCapacityUnsafe(targetDisk) + + // Check if we have enough total capacity using the improved unified comparison + if !availableCapacity.CanAccommodate(dest.StorageChange) { + return fmt.Errorf("insufficient capacity on target disk %s:%d. Available: %+v, Required: %+v", + dest.TargetServer, dest.TargetDisk, availableCapacity, dest.StorageChange) + } + } else if dest.TargetServer != "" { + // Fail fast if destination disk is not found in topology + var existingKeys []string + for k := range at.disks { + existingKeys = append(existingKeys, k) + } + glog.Warningf("destination disk %s not found in topology. Existing disk keys: %v", targetKey, existingKeys) + return fmt.Errorf("destination disk %s not found in topology", targetKey) } - } else if dest.TargetServer != "" { - // Fail fast if destination disk is not found in topology - return fmt.Errorf("destination disk %s not found in topology", targetKey) } } diff --git a/weed/admin/topology/topology_management.go b/weed/admin/topology/topology_management.go index 25837e09b..1343deab8 100644 --- a/weed/admin/topology/topology_management.go +++ b/weed/admin/topology/topology_management.go @@ -87,6 +87,8 @@ func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) e } diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId) + glog.V(2).Infof("UpdateTopology: adding disk key=%q nodeId=%q diskId=%d diskType=%q address=%q grpcPort=%d volumes=%d maxVolumes=%d", + diskKey, nodeInfo.Id, diskInfo.DiskId, diskType, nodeInfo.Address, nodeInfo.GrpcPort, diskInfo.VolumeCount, diskInfo.MaxVolumeCount) node.disks[diskInfo.DiskId] = disk at.disks[diskKey] = disk }