package topology import ( "fmt" "time" "github.com/seaweedfs/seaweedfs/weed/glog" ) // AssignTask moves a task from pending to assigned and reserves capacity func (at *ActiveTopology) AssignTask(taskID string) error { at.mutex.Lock() defer at.mutex.Unlock() task, exists := at.pendingTasks[taskID] if !exists { return fmt.Errorf("pending task %s not found", taskID) } // Check if all destination disks have sufficient capacity to reserve for _, dest := range task.Destinations { targetKey := fmt.Sprintf("%s:%d", dest.TargetServer, dest.TargetDisk) if targetDisk, exists := at.disks[targetKey]; exists { availableCapacity := at.getEffectiveAvailableCapacityUnsafe(targetDisk) // Check if we have enough total capacity using the improved unified comparison if !availableCapacity.CanAccommodate(dest.StorageChange) { return fmt.Errorf("insufficient capacity on target disk %s:%d. Available: %+v, Required: %+v", dest.TargetServer, dest.TargetDisk, availableCapacity, dest.StorageChange) } } else if dest.TargetServer != "" { // Fail fast if destination disk is not found in topology return fmt.Errorf("destination disk %s not found in topology", targetKey) } } // Move task to assigned and reserve capacity delete(at.pendingTasks, taskID) task.Status = TaskStatusInProgress at.assignedTasks[taskID] = task at.reassignTaskStates() // Log capacity reservation information for all sources and destinations totalSourceImpact := StorageSlotChange{} totalDestImpact := StorageSlotChange{} for _, source := range task.Sources { totalSourceImpact.AddInPlace(source.StorageChange) } for _, dest := range task.Destinations { totalDestImpact.AddInPlace(dest.StorageChange) } glog.V(2).Infof("Task %s assigned and capacity reserved: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)", taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots, len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots) return nil } // CompleteTask moves a task from assigned to recent and releases reserved capacity // NOTE: This only releases the reserved capacity. The actual topology update (VolumeCount changes) // should be handled by the master when it receives the task completion notification. func (at *ActiveTopology) CompleteTask(taskID string) error { at.mutex.Lock() defer at.mutex.Unlock() task, exists := at.assignedTasks[taskID] if !exists { return fmt.Errorf("assigned task %s not found", taskID) } // Release reserved capacity by moving task to completed state delete(at.assignedTasks, taskID) task.Status = TaskStatusCompleted task.CompletedAt = time.Now() at.recentTasks[taskID] = task at.reassignTaskStates() // Log capacity release information for all sources and destinations totalSourceImpact := StorageSlotChange{} totalDestImpact := StorageSlotChange{} for _, source := range task.Sources { totalSourceImpact.AddInPlace(source.StorageChange) } for _, dest := range task.Destinations { totalDestImpact.AddInPlace(dest.StorageChange) } glog.V(2).Infof("Task %s completed and capacity released: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)", taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots, len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots) // Clean up old recent tasks at.cleanupRecentTasks() return nil } // ApplyActualStorageChange updates the topology to reflect actual storage changes after task completion // This should be called when the master updates the topology with new VolumeCount information func (at *ActiveTopology) ApplyActualStorageChange(nodeID string, diskID uint32, volumeCountChange int64) { at.mutex.Lock() defer at.mutex.Unlock() diskKey := fmt.Sprintf("%s:%d", nodeID, diskID) if disk, exists := at.disks[diskKey]; exists && disk.DiskInfo != nil && disk.DiskInfo.DiskInfo != nil { oldCount := disk.DiskInfo.DiskInfo.VolumeCount disk.DiskInfo.DiskInfo.VolumeCount += volumeCountChange glog.V(2).Infof("Applied actual storage change on disk %s: volume_count %d -> %d (change: %+d)", diskKey, oldCount, disk.DiskInfo.DiskInfo.VolumeCount, volumeCountChange) } } // AddPendingTask is the unified function that handles both simple and complex task creation func (at *ActiveTopology) AddPendingTask(spec TaskSpec) error { // Validation if len(spec.Sources) == 0 { return fmt.Errorf("at least one source is required") } if len(spec.Destinations) == 0 { return fmt.Errorf("at least one destination is required") } at.mutex.Lock() defer at.mutex.Unlock() // Build sources array sources := make([]TaskSource, len(spec.Sources)) for i, sourceSpec := range spec.Sources { var storageImpact StorageSlotChange var estimatedSize int64 if sourceSpec.StorageImpact != nil { // Use manually specified impact storageImpact = *sourceSpec.StorageImpact } else { // Auto-calculate based on task type and cleanup type storageImpact = at.calculateSourceStorageImpact(spec.TaskType, sourceSpec.CleanupType, spec.VolumeSize) } if sourceSpec.EstimatedSize != nil { estimatedSize = *sourceSpec.EstimatedSize } else { estimatedSize = spec.VolumeSize // Default to volume size } sources[i] = TaskSource{ SourceServer: sourceSpec.ServerID, SourceDisk: sourceSpec.DiskID, StorageChange: storageImpact, EstimatedSize: estimatedSize, } } // Build destinations array destinations := make([]TaskDestination, len(spec.Destinations)) for i, destSpec := range spec.Destinations { var storageImpact StorageSlotChange var estimatedSize int64 if destSpec.StorageImpact != nil { // Use manually specified impact storageImpact = *destSpec.StorageImpact } else { // Auto-calculate based on task type _, storageImpact = CalculateTaskStorageImpact(spec.TaskType, spec.VolumeSize) } if destSpec.EstimatedSize != nil { estimatedSize = *destSpec.EstimatedSize } else { estimatedSize = spec.VolumeSize // Default to volume size } destinations[i] = TaskDestination{ TargetServer: destSpec.ServerID, TargetDisk: destSpec.DiskID, StorageChange: storageImpact, EstimatedSize: estimatedSize, } } // Create the task task := &taskState{ VolumeID: spec.VolumeID, TaskType: spec.TaskType, Status: TaskStatusPending, StartedAt: time.Now(), EstimatedSize: spec.VolumeSize, Sources: sources, Destinations: destinations, } at.pendingTasks[spec.TaskID] = task at.assignTaskToDisk(task) glog.V(2).Infof("Added pending %s task %s: volume %d, %d sources, %d destinations", spec.TaskType, spec.TaskID, spec.VolumeID, len(sources), len(destinations)) return nil } // calculateSourceStorageImpact calculates storage impact for sources based on task type and cleanup type func (at *ActiveTopology) calculateSourceStorageImpact(taskType TaskType, cleanupType SourceCleanupType, volumeSize int64) StorageSlotChange { switch taskType { case TaskTypeErasureCoding: switch cleanupType { case CleanupVolumeReplica: impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize) return impact case CleanupECShards: return CalculateECShardCleanupImpact(volumeSize) default: impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize) return impact } default: impact, _ := CalculateTaskStorageImpact(taskType, volumeSize) return impact } } // SourceCleanupType indicates what type of data needs to be cleaned up from a source type SourceCleanupType int const ( CleanupVolumeReplica SourceCleanupType = iota // Clean up volume replica (frees volume slots) CleanupECShards // Clean up existing EC shards (frees shard slots) ) // TaskSourceSpec represents a source specification for task creation type TaskSourceSpec struct { ServerID string DiskID uint32 CleanupType SourceCleanupType // For EC: volume replica vs existing shards StorageImpact *StorageSlotChange // Optional: manual override EstimatedSize *int64 // Optional: manual override } // TaskDestinationSpec represents a destination specification for task creation type TaskDestinationSpec struct { ServerID string DiskID uint32 StorageImpact *StorageSlotChange // Optional: manual override EstimatedSize *int64 // Optional: manual override } // TaskSpec represents a complete task specification type TaskSpec struct { TaskID string TaskType TaskType VolumeID uint32 VolumeSize int64 // Used for auto-calculation when manual impacts not provided Sources []TaskSourceSpec // Can be single or multiple Destinations []TaskDestinationSpec // Can be single or multiple } // TaskSourceLocation represents a source location for task creation (DEPRECATED: use TaskSourceSpec) type TaskSourceLocation struct { ServerID string DiskID uint32 CleanupType SourceCleanupType // What type of cleanup is needed }