diff --git a/weed/admin/maintenance/maintenance_integration.go b/weed/admin/maintenance/maintenance_integration.go
index a569ea09f..be5875a72 100644
--- a/weed/admin/maintenance/maintenance_integration.go
+++ b/weed/admin/maintenance/maintenance_integration.go
@@ -5,6 +5,7 @@
 import (
 	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
 	"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
 	"github.com/seaweedfs/seaweedfs/weed/worker/types"
@@ -229,6 +230,12 @@ func (s *MaintenanceIntegration) ScanWithTaskDetectors(volumeMetrics []*types.Vo
 			continue
 		}
 
+		// Cancel stale pending tasks for this type before re-detection
+		maintenanceType := s.taskTypeMap[taskType]
+		if cancelled := s.maintenanceQueue.CancelPendingTasksByType(maintenanceType); cancelled > 0 {
+			glog.Infof("Cancelled %d stale pending %s tasks before re-detection", cancelled, taskType)
+		}
+
 		glog.V(2).Infof("Running detection for task type: %s", taskType)
 
 		results, err := detector.ScanForTasks(filteredMetrics, clusterInfo)
@@ -528,10 +535,15 @@ func (s *MaintenanceIntegration) SyncTask(task *MaintenanceTask) {
 	// Volume size is not currently used for Balance/Vacuum impact and is not stored in MaintenanceTask
 	sourceImpact, targetImpact := topology.CalculateTaskStorageImpact(topology.TaskType(string(taskType)), 0)
 
-	// Use unified sources and targets from TaskParams
+	// Use unified sources and targets from TaskParams.
+	// Task protos store ServerAddresses (with gRPC port, e.g., "host:port.grpcPort")
+	// but the topology indexes disks by NodeId (e.g., "host:port").
+	// Strip the gRPC port suffix via ToHttpAddress() to match the topology key.
 	for _, src := range task.TypedParams.Sources {
+		resolvedSrc := pb.ServerAddress(src.Node).ToHttpAddress()
+		glog.V(2).Infof("SyncTask %s: source proto Node=%q resolved to %q, diskId=%d", task.ID, src.Node, resolvedSrc, src.DiskId)
 		sources = append(sources, topology.TaskSource{
-			SourceServer:  src.Node,
+			SourceServer:  resolvedSrc,
 			SourceDisk:    src.DiskId,
 			StorageChange: sourceImpact,
 		})
@@ -539,8 +551,10 @@ func (s *MaintenanceIntegration) SyncTask(task *MaintenanceTask) {
 		estimatedSize += int64(src.EstimatedSize)
 	}
 	for _, target := range task.TypedParams.Targets {
+		resolvedTarget := pb.ServerAddress(target.Node).ToHttpAddress()
+		glog.V(2).Infof("SyncTask %s: target proto Node=%q resolved to %q, diskId=%d", task.ID, target.Node, resolvedTarget, target.DiskId)
 		destinations = append(destinations, topology.TaskDestination{
-			TargetServer:  target.Node,
+			TargetServer:  resolvedTarget,
 			TargetDisk:    target.DiskId,
 			StorageChange: targetImpact,
 		})
diff --git a/weed/admin/maintenance/maintenance_queue.go b/weed/admin/maintenance/maintenance_queue.go
index 32a3abbb4..28dbc1c5c 100644
--- a/weed/admin/maintenance/maintenance_queue.go
+++ b/weed/admin/maintenance/maintenance_queue.go
@@ -4,6 +4,7 @@ import (
 	"crypto/rand"
 	"fmt"
 	"sort"
+	"strings"
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -230,6 +231,46 @@ func (mq *MaintenanceQueue) hasDuplicateTask(newTask *MaintenanceTask) bool {
 	return false
 }
 
+// CancelPendingTasksByType cancels all pending tasks of a given type.
+// This is called before each detection cycle to ensure stale proposals
+// from previous cycles are cleaned up before creating new ones.
+func (mq *MaintenanceQueue) CancelPendingTasksByType(taskType MaintenanceTaskType) int {
+	mq.mutex.Lock()
+
+	var remaining []*MaintenanceTask
+	var cancelledSnapshots []*MaintenanceTask
+	cancelled := 0
+	for _, task := range mq.pendingTasks {
+		if task.Type == taskType {
+			task.Status = TaskStatusCancelled
+			now := time.Now()
+			task.CompletedAt = &now
+			cancelled++
+			cancelledSnapshots = append(cancelledSnapshots, snapshotTask(task))
+			glog.V(1).Infof("Cancelled stale pending task %s (%s) for volume %d before re-detection",
+				task.ID, task.Type, task.VolumeID)
+
+			// Release capacity in ActiveTopology and remove pending operation
+			if mq.integration != nil {
+				if at := mq.integration.GetActiveTopology(); at != nil {
+					_ = at.CompleteTask(task.ID)
+				}
+			}
+			mq.removePendingOperation(task.ID)
+		} else {
+			remaining = append(remaining, task)
+		}
+	}
+	mq.pendingTasks = remaining
+	mq.mutex.Unlock()
+
+	// Persist cancelled state outside the lock to avoid blocking
+	for _, snapshot := range cancelledSnapshots {
+		mq.saveTaskState(snapshot)
+	}
+	return cancelled
+}
+
 // AddTasksFromResults converts detection results to tasks and adds them to the queue
 func (mq *MaintenanceQueue) AddTasksFromResults(results []*TaskDetectionResult) {
 	for _, result := range results {
@@ -455,8 +496,8 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
 		task.Status = TaskStatusFailed
 		task.Error = error
 
-		// Check if task should be retried
-		if task.RetryCount < task.MaxRetries {
+		// Check if task should be retried (skip retry for permanent errors)
+		if task.RetryCount < task.MaxRetries && !isNonRetriableError(error) {
 			// Record unassignment due to failure/retry
 			if task.WorkerID != "" && len(task.AssignmentHistory) > 0 {
 				lastAssignment := task.AssignmentHistory[len(task.AssignmentHistory)-1]
@@ -559,6 +600,12 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
 	}
 }
 
+// isNonRetriableError returns true for errors that will never succeed on retry,
+// such as when the volume doesn't exist on the source server.
+func isNonRetriableError(errMsg string) bool {
+	return strings.Contains(errMsg, "not found")
+}
+
 // UpdateTaskProgress updates the progress of a running task
 func (mq *MaintenanceQueue) UpdateTaskProgress(taskID string, progress float64) {
 	mq.mutex.Lock()
diff --git a/weed/admin/topology/task_management.go b/weed/admin/topology/task_management.go
index e304df977..3710b06ab 100644
--- a/weed/admin/topology/task_management.go
+++ b/weed/admin/topology/task_management.go
@@ -17,20 +17,30 @@ func (at *ActiveTopology) AssignTask(taskID string) error {
 		return fmt.Errorf("pending task %s not found", taskID)
 	}
 
-	// Check if all destination disks have sufficient capacity to reserve
-	for _, dest := range task.Destinations {
-		targetKey := fmt.Sprintf("%s:%d", dest.TargetServer, dest.TargetDisk)
-		if targetDisk, exists := at.disks[targetKey]; exists {
-			availableCapacity := at.getEffectiveAvailableCapacityUnsafe(targetDisk)
-
-			// Check if we have enough total capacity using the improved unified comparison
-			if !availableCapacity.CanAccommodate(dest.StorageChange) {
-				return fmt.Errorf("insufficient capacity on target disk %s:%d. Available: %+v, Required: %+v",
-					dest.TargetServer, dest.TargetDisk, availableCapacity, dest.StorageChange)
+	// Skip capacity check if topology hasn't been populated yet
+	if len(at.disks) == 0 {
+		glog.Warningf("AssignTask %s: topology has no disks yet, skipping capacity check", taskID)
+	} else {
+		// Check if all destination disks have sufficient capacity to reserve
+		for _, dest := range task.Destinations {
+			targetKey := fmt.Sprintf("%s:%d", dest.TargetServer, dest.TargetDisk)
+			if targetDisk, exists := at.disks[targetKey]; exists {
+				availableCapacity := at.getEffectiveAvailableCapacityUnsafe(targetDisk)
+
+				// Check if we have enough total capacity using the improved unified comparison
+				if !availableCapacity.CanAccommodate(dest.StorageChange) {
+					return fmt.Errorf("insufficient capacity on target disk %s:%d. Available: %+v, Required: %+v",
+						dest.TargetServer, dest.TargetDisk, availableCapacity, dest.StorageChange)
+				}
+			} else if dest.TargetServer != "" {
+				// Fail fast if destination disk is not found in topology
+				var existingKeys []string
+				for k := range at.disks {
+					existingKeys = append(existingKeys, k)
+				}
+				glog.Warningf("destination disk %s not found in topology. Existing disk keys: %v", targetKey, existingKeys)
+				return fmt.Errorf("destination disk %s not found in topology", targetKey)
 			}
-		} else if dest.TargetServer != "" {
-			// Fail fast if destination disk is not found in topology
-			return fmt.Errorf("destination disk %s not found in topology", targetKey)
 		}
 	}
 
diff --git a/weed/admin/topology/topology_management.go b/weed/admin/topology/topology_management.go
index 25837e09b..1343deab8 100644
--- a/weed/admin/topology/topology_management.go
+++ b/weed/admin/topology/topology_management.go
@@ -87,6 +87,8 @@ func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) e
 			}
 
 			diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId)
+			glog.V(2).Infof("UpdateTopology: adding disk key=%q nodeId=%q diskId=%d diskType=%q address=%q grpcPort=%d volumes=%d maxVolumes=%d",
+				diskKey, nodeInfo.Id, diskInfo.DiskId, diskType, nodeInfo.Address, nodeInfo.GrpcPort, diskInfo.VolumeCount, diskInfo.MaxVolumeCount)
 			node.disks[diskInfo.DiskId] = disk
 			at.disks[diskKey] = disk
 		}