From 751cfac7d747e47e12c63ee29fb686c262985026 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Sun, 10 Aug 2025 18:02:42 -0700
Subject: [PATCH] Implement volume-aware task conflict checking
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

MAJOR IMPROVEMENT: Tasks now conflict by volume ID, not globally by task type

Changes:
- PRIMARY RULE: Tasks on the same volume ID always conflict (prevents race conditions)
- SECONDARY RULE: Minimal global task type conflicts (currently none)
- Add isDiskAvailableForVolume() for volume-specific availability checking
- Add GetAvailableDisksForVolume() and GetDisksWithEffectiveCapacityForVolume()
- Remove overly restrictive global task type conflicts
- Update planning functions to focus on capacity, not conflicts

Benefits:
✅ Multiple vacuum tasks can run on different volumes simultaneously
✅ Balance and erasure coding can run on different volumes
✅ Still prevents dangerous concurrent operations on the same volume
✅ Much more efficient resource utilization
✅ Maintains data integrity and prevents race conditions

This addresses user feedback that task conflicts should be volume-specific
rather than global task type restrictions.
---
 weed/admin/topology/capacity.go            | 59 +++++++++++++++++++---
 weed/admin/topology/internal.go            | 59 +++++++++++++---------
 weed/admin/topology/topology_management.go | 24 +++++++++
 3 files changed, 111 insertions(+), 31 deletions(-)

diff --git a/weed/admin/topology/capacity.go b/weed/admin/topology/capacity.go
index a595ed369..502b6e25c 100644
--- a/weed/admin/topology/capacity.go
+++ b/weed/admin/topology/capacity.go
@@ -227,13 +227,9 @@ func (at *ActiveTopology) isDiskAvailableForPlanning(disk *activeDisk, taskType
         return false
     }
 
-    // Check for conflicting task types in active tasks only
-    for _, task := range disk.assignedTasks {
-        if at.areTaskTypesConflicting(task.TaskType, taskType) {
-            return false
-        }
-    }
-
+    // For planning purposes, we only check capacity constraints
+    // Volume-specific conflicts will be checked when the actual task is scheduled
+    // with knowledge of the specific volume ID
     return true
 }
 
@@ -298,3 +294,52 @@ func (at *ActiveTopology) getEffectiveAvailableCapacityUnsafe(disk *activeDisk)
         ShardSlots: -netImpact.ShardSlots, // Available shard capacity (negative impact becomes positive availability)
     }
 }
+
+// GetDisksWithEffectiveCapacityForVolume returns disks with effective capacity for a specific volume
+// Uses volume-aware conflict checking to prevent race conditions on the same volume
+func (at *ActiveTopology) GetDisksWithEffectiveCapacityForVolume(taskType TaskType, volumeID uint32, excludeNodeID string, minCapacity int64) []*DiskInfo {
+    at.mutex.RLock()
+    defer at.mutex.RUnlock()
+
+    var available []*DiskInfo
+
+    for _, disk := range at.disks {
+        if disk.NodeID == excludeNodeID {
+            continue // Skip excluded node
+        }
+
+        if at.isDiskAvailableForVolume(disk, taskType, volumeID) {
+            effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)
+
+            // Only include disks that meet the minimum capacity requirement
+            if int64(effectiveCapacity.VolumeSlots) >= minCapacity {
+                // Create a new DiskInfo with current capacity information
+                diskCopy := DiskInfo{
+                    NodeID:     disk.DiskInfo.NodeID,
+                    DiskID:     disk.DiskInfo.DiskID,
+                    DiskType:   disk.DiskInfo.DiskType,
+                    DataCenter: disk.DiskInfo.DataCenter,
+                    Rack:       disk.DiskInfo.Rack,
+                    LoadCount:  len(disk.pendingTasks) + len(disk.assignedTasks), // Count all tasks
+                }
+
+                // Create a new protobuf DiskInfo to avoid modifying the original
+                diskInfoCopy := &master_pb.DiskInfo{
+                    DiskId:            disk.DiskInfo.DiskInfo.DiskId,
+                    MaxVolumeCount:    disk.DiskInfo.DiskInfo.MaxVolumeCount,
+                    VolumeCount:       disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(effectiveCapacity.VolumeSlots),
+                    VolumeInfos:       disk.DiskInfo.DiskInfo.VolumeInfos,
+                    EcShardInfos:      disk.DiskInfo.DiskInfo.EcShardInfos,
+                    RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
+                    ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
+                    FreeVolumeCount:   disk.DiskInfo.DiskInfo.FreeVolumeCount,
+                }
+                diskCopy.DiskInfo = diskInfoCopy
+
+                available = append(available, &diskCopy)
+            }
+        }
+    }
+
+    return available
+}
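
The fresh protobuf allocation above matters because DiskInfo carries a pointer to the protobuf message, so a plain struct copy would still alias the topology's original record. A minimal standalone sketch of the hazard, where pbDiskInfo is a stand-in for master_pb.DiskInfo reduced to two fields:

package main

import "fmt"

// pbDiskInfo is a stand-in for master_pb.DiskInfo, reduced to two fields.
type pbDiskInfo struct {
    MaxVolumeCount int64
    VolumeCount    int64
}

// DiskInfo mirrors the wrapper's shape: value fields plus a pointer
// to the protobuf message.
type DiskInfo struct {
    NodeID   string
    DiskInfo *pbDiskInfo
}

func main() {
    orig := DiskInfo{NodeID: "node1", DiskInfo: &pbDiskInfo{MaxVolumeCount: 100, VolumeCount: 60}}

    // A plain struct copy still shares the protobuf pointer, so writing
    // through the copy mutates the topology's original record.
    shallow := orig
    shallow.DiskInfo.VolumeCount = 99
    fmt.Println(orig.DiskInfo.VolumeCount) // 99 - original mutated

    // The patch therefore allocates a fresh protobuf before adjusting
    // VolumeCount to the effective (capacity-adjusted) value.
    orig.DiskInfo.VolumeCount = 60 // reset for the demo
    deep := orig
    deep.DiskInfo = &pbDiskInfo{
        MaxVolumeCount: orig.DiskInfo.MaxVolumeCount,
        VolumeCount:    orig.DiskInfo.MaxVolumeCount - 40, // max minus available slots
    }
    fmt.Println(orig.DiskInfo.VolumeCount) // 60 - original untouched
}
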
diff --git a/weed/admin/topology/internal.go b/weed/admin/topology/internal.go
index cb2aad7ce..b0ddef9a9 100644
--- a/weed/admin/topology/internal.go
+++ b/weed/admin/topology/internal.go
@@ -64,7 +64,7 @@ func (at *ActiveTopology) assignTaskToDisk(task *taskState) {
     }
 }
 
-// isDiskAvailable checks if a disk can accept new tasks
+// isDiskAvailable checks if a disk can accept new tasks (general availability)
 func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) bool {
     // Check if disk has too many pending and active tasks
     activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
@@ -72,9 +72,21 @@ func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) b
         return false
     }
 
-    // Check for conflicting task types
+    // For general availability, only check disk capacity
+    // Volume-specific conflicts are checked in isDiskAvailableForVolume
+    return true
+}
+
+// isDiskAvailableForVolume checks if a disk can accept a new task for a specific volume
+func (at *ActiveTopology) isDiskAvailableForVolume(disk *activeDisk, taskType TaskType, volumeID uint32) bool {
+    // Check basic availability first
+    if !at.isDiskAvailable(disk, taskType) {
+        return false
+    }
+
+    // Check for volume-specific conflicts
     for _, task := range disk.assignedTasks {
-        if at.areTaskTypesConflicting(task.TaskType, taskType) {
+        if at.areTasksConflicting(task, taskType, volumeID) {
             return false
         }
     }
@@ -82,29 +94,28 @@ func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) b
     return true
 }
 
-// areTaskTypesConflicting checks if two task types conflict
-func (at *ActiveTopology) areTaskTypesConflicting(existing, new TaskType) bool {
-    // Define conflicting task types to prevent dangerous concurrent operations
-    // These conflicts prevent race conditions and data integrity issues
-    conflictMap := map[TaskType][]TaskType{
-        // Vacuum conflicts with balance and erasure coding (and ec_vacuum)
-        TaskType("vacuum"): {TaskType("balance"), TaskType("erasure_coding"), TaskType("ec_vacuum")},
-
-        // Balance conflicts with vacuum and erasure coding operations
-        TaskType("balance"): {TaskType("vacuum"), TaskType("erasure_coding"), TaskType("ec_vacuum")},
-
-        // Erasure coding conflicts with vacuum and balance operations
-        TaskType("erasure_coding"): {TaskType("vacuum"), TaskType("balance"), TaskType("ec_vacuum")},
-
-        // EC vacuum conflicts with all other maintenance operations on same volume
-        TaskType("ec_vacuum"): {TaskType("vacuum"), TaskType("balance"), TaskType("erasure_coding")},
-
-        // Replication generally should not conflict with read-only operations
-        // but should conflict with destructive operations
-        TaskType("replication"): {TaskType("vacuum"), TaskType("balance")},
+// areTasksConflicting checks if a new task conflicts with an existing task
+func (at *ActiveTopology) areTasksConflicting(existingTask *taskState, newTaskType TaskType, newVolumeID uint32) bool {
+    // PRIMARY RULE: Tasks on the same volume always conflict (prevents race conditions)
+    if existingTask.VolumeID == newVolumeID {
+        return true
+    }
+
+    // SECONDARY RULE: Some task types may have global conflicts (rare cases)
+    return at.areTaskTypesGloballyConflicting(existingTask.TaskType, newTaskType)
+}
+
+// areTaskTypesGloballyConflicting checks for rare global task type conflicts
+// These should be minimal - most conflicts should be volume-specific
+func (at *ActiveTopology) areTaskTypesGloballyConflicting(existing, new TaskType) bool {
+    // Define very limited global conflicts (cross-volume conflicts)
+    // Most conflicts should be volume-based, not global
+    globalConflictMap := map[TaskType][]TaskType{
+        // Example: Some hypothetical global resource conflicts could go here
+        // Currently empty - volume-based conflicts are sufficient
     }
 
-    if conflicts, exists := conflictMap[existing]; exists {
+    if conflicts, exists := globalConflictMap[existing]; exists {
         for _, conflictType := range conflicts {
             if conflictType == new {
                 return true
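
A minimal runnable sketch of the two rules above, with taskState reduced to the two fields the check reads; the empty global conflict map matches the patch:

package main

import "fmt"

// TaskType mirrors the patch's string-based task type.
type TaskType string

// taskState is reduced to the fields the conflict check reads.
type taskState struct {
    TaskType TaskType
    VolumeID uint32
}

// areTasksConflicting reproduces the patch's two rules: same volume
// always conflicts; cross-volume work conflicts only for globally
// conflicting task types (currently none).
func areTasksConflicting(existing taskState, newType TaskType, newVolumeID uint32) bool {
    // PRIMARY RULE: tasks on the same volume ID always conflict.
    if existing.VolumeID == newVolumeID {
        return true
    }
    // SECONDARY RULE: global task type conflicts - empty in the patch.
    globalConflictMap := map[TaskType][]TaskType{}
    for _, t := range globalConflictMap[existing.TaskType] {
        if t == newType {
            return true
        }
    }
    return false
}

func main() {
    running := taskState{TaskType: "vacuum", VolumeID: 42}

    // Different volumes: balance may proceed while vacuum runs.
    fmt.Println(areTasksConflicting(running, "balance", 43)) // false

    // Same volume: any second task is rejected, regardless of type.
    fmt.Println(areTasksConflicting(running, "balance", 42)) // true
}
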
diff --git a/weed/admin/topology/topology_management.go b/weed/admin/topology/topology_management.go
index 65b7dfe7e..fcf1b7043 100644
--- a/weed/admin/topology/topology_management.go
+++ b/weed/admin/topology/topology_management.go
@@ -89,6 +89,30 @@ func (at *ActiveTopology) GetAvailableDisks(taskType TaskType, excludeNodeID str
     return available
 }
 
+// GetAvailableDisksForVolume returns disks that can accept a task for a specific volume
+// This method uses volume-aware conflict checking to prevent race conditions
+func (at *ActiveTopology) GetAvailableDisksForVolume(taskType TaskType, volumeID uint32, excludeNodeID string) []*DiskInfo {
+    at.mutex.RLock()
+    defer at.mutex.RUnlock()
+
+    var available []*DiskInfo
+
+    for _, disk := range at.disks {
+        if disk.NodeID == excludeNodeID {
+            continue // Skip excluded node
+        }
+
+        if at.isDiskAvailableForVolume(disk, taskType, volumeID) {
+            // Create a copy with current load count
+            diskCopy := *disk.DiskInfo
+            diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
+            available = append(available, &diskCopy)
+        }
+    }
+
+    return available
+}
+
 // HasRecentTaskForVolume checks if a volume had a recent task (to avoid immediate re-detection)
 func (at *ActiveTopology) HasRecentTaskForVolume(volumeID uint32, taskType TaskType) bool {
     at.mutex.RLock()
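
A hypothetical caller showing the intended scheduling-time sequence. scheduleVolumeTask and its package are illustrative, not part of this patch; only GetAvailableDisksForVolume, TaskType, and DiskInfo.NodeID come from the patch, and the import path assumes the standard SeaweedFS module layout:

package maintenance

import (
    "fmt"

    "github.com/seaweedfs/seaweedfs/weed/admin/topology"
)

// scheduleVolumeTask sketches how a scheduler might use the new
// volume-aware API. Because of the PRIMARY RULE, every disk returned
// here has no pending or assigned task touching volumeID, so picking
// any of them cannot race another operation on the same volume.
func scheduleVolumeTask(at *topology.ActiveTopology, taskType topology.TaskType, volumeID uint32, sourceNode string) error {
    disks := at.GetAvailableDisksForVolume(taskType, volumeID, sourceNode)
    if len(disks) == 0 {
        return fmt.Errorf("no disk can accept %s for volume %d", taskType, volumeID)
    }
    fmt.Printf("scheduling %s for volume %d on node %s\n", taskType, volumeID, disks[0].NodeID)
    return nil
}
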