package topology

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)

// GetEffectiveAvailableCapacity returns the effective available capacity for a disk, in volume slots.
// This considers BOTH pending and assigned tasks for capacity reservation.
//
// Formula: BaseAvailable - (VolumeSlots + ShardSlots/ShardsPerVolumeSlot) from all tasks
//
// The calculation includes:
// - Pending tasks: Reserve capacity immediately when added
// - Assigned tasks: Continue to reserve capacity during execution
// - Recently completed tasks are NOT counted against capacity
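//
// Illustrative example (hypothetical numbers, not taken from a real cluster):
// a disk with MaxVolumeCount=100 and VolumeCount=90 has a base availability of
// 10 volume slots; if pending and assigned tasks together reserve 3 volume
// slots, this function reports an effective available capacity of 7.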
func (at *ActiveTopology) GetEffectiveAvailableCapacity(nodeID string, diskID uint32) int64 {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return 0
	}
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return 0
	}

	// Use the same logic as getEffectiveAvailableCapacityUnsafe but with locking
	capacity := at.getEffectiveAvailableCapacityUnsafe(disk)
	return int64(capacity.VolumeSlots)
}

// GetEffectiveAvailableCapacityDetailed returns detailed available capacity as StorageSlotChange
// This provides granular information about available volume slots and shard slots
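//
// Example (illustrative sketch; the node ID and disk ID are placeholder values):
//
//	cap := at.GetEffectiveAvailableCapacityDetailed("10.0.0.1:8080", 0)
//	fmt.Printf("available volume slots: %d, shard slots: %d\n", cap.VolumeSlots, cap.ShardSlots)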
func (at *ActiveTopology) GetEffectiveAvailableCapacityDetailed(nodeID string, diskID uint32) StorageSlotChange {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return StorageSlotChange{}
	}
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	return at.getEffectiveAvailableCapacityUnsafe(disk)
}

// GetEffectiveCapacityImpact returns the StorageSlotChange impact for a disk
// This shows the net impact from all pending and assigned tasks
func (at *ActiveTopology) GetEffectiveCapacityImpact(nodeID string, diskID uint32) StorageSlotChange {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return StorageSlotChange{}
	}

	return at.getEffectiveCapacityUnsafe(disk)
}

// GetDisksWithEffectiveCapacity returns disks with sufficient effective capacity
// This method considers BOTH pending and assigned tasks for capacity reservation using StorageSlotChange.
//
// Parameters:
// - taskType: type of task to check compatibility for
// - excludeNodeID: node to exclude from results
// - minCapacity: minimum effective capacity required (in volume slots)
//
// Returns: DiskInfo objects where VolumeCount reflects capacity reserved by all tasks
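//
// Example (illustrative sketch; the excluded node ID is a placeholder and the
// TaskTypeBalance constant is assumed to be defined elsewhere in this package):
//
//	disks := at.GetDisksWithEffectiveCapacity(TaskTypeBalance, "10.0.0.1:8080", 1)
//	for _, d := range disks {
//		// each d has at least 1 effective volume slot free and is not on the excluded node
//		fmt.Printf("candidate disk %s:%d\n", d.NodeID, d.DiskID)
//	}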
func (at *ActiveTopology) GetDisksWithEffectiveCapacity(taskType TaskType, excludeNodeID string, minCapacity int64) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	var available []*DiskInfo

	for _, disk := range at.disks {
		if disk.NodeID == excludeNodeID {
			continue // Skip excluded node
		}

		if at.isDiskAvailable(disk, taskType) {
			effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)

			// Only include disks that meet minimum capacity requirement
			if int64(effectiveCapacity.VolumeSlots) >= minCapacity {
				// Create a new DiskInfo with current capacity information
				diskCopy := DiskInfo{
					NodeID:     disk.DiskInfo.NodeID,
					DiskID:     disk.DiskInfo.DiskID,
					DiskType:   disk.DiskInfo.DiskType,
					DataCenter: disk.DiskInfo.DataCenter,
					Rack:       disk.DiskInfo.Rack,
					LoadCount:  len(disk.pendingTasks) + len(disk.assignedTasks), // Count all tasks
				}

				// Create a new protobuf DiskInfo to avoid modifying the original
				diskInfoCopy := &master_pb.DiskInfo{
					DiskId:            disk.DiskInfo.DiskInfo.DiskId,
					MaxVolumeCount:    disk.DiskInfo.DiskInfo.MaxVolumeCount,
					VolumeCount:       disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(effectiveCapacity.VolumeSlots),
					VolumeInfos:       disk.DiskInfo.DiskInfo.VolumeInfos,
					EcShardInfos:      disk.DiskInfo.DiskInfo.EcShardInfos,
					RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
					ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
					FreeVolumeCount:   disk.DiskInfo.DiskInfo.FreeVolumeCount,
				}
				diskCopy.DiskInfo = diskInfoCopy

				available = append(available, &diskCopy)
			}
		}
	}

	return available
}

// GetDisksForPlanning returns disks considering both active and pending tasks for planning decisions
// This helps avoid over-scheduling tasks to the same disk
func (at *ActiveTopology) GetDisksForPlanning(taskType TaskType, excludeNodeID string, minCapacity int64) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	var available []*DiskInfo

	for _, disk := range at.disks {
		if disk.NodeID == excludeNodeID {
			continue // Skip excluded node
		}

		// Consider both pending and active tasks for scheduling decisions
		if at.isDiskAvailableForPlanning(disk, taskType) {
			// Check if disk can accommodate new task considering pending tasks
			planningCapacity := at.getPlanningCapacityUnsafe(disk)

			if int64(planningCapacity.VolumeSlots) >= minCapacity {
				// Create a new DiskInfo with planning information
				diskCopy := DiskInfo{
					NodeID:     disk.DiskInfo.NodeID,
					DiskID:     disk.DiskInfo.DiskID,
					DiskType:   disk.DiskInfo.DiskType,
					DataCenter: disk.DiskInfo.DataCenter,
					Rack:       disk.DiskInfo.Rack,
					LoadCount:  len(disk.pendingTasks) + len(disk.assignedTasks),
				}

				// Create a new protobuf DiskInfo to avoid modifying the original
				diskInfoCopy := &master_pb.DiskInfo{
					DiskId:            disk.DiskInfo.DiskInfo.DiskId,
					MaxVolumeCount:    disk.DiskInfo.DiskInfo.MaxVolumeCount,
					VolumeCount:       disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(planningCapacity.VolumeSlots),
					VolumeInfos:       disk.DiskInfo.DiskInfo.VolumeInfos,
					EcShardInfos:      disk.DiskInfo.DiskInfo.EcShardInfos,
					RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
					ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
					FreeVolumeCount:   disk.DiskInfo.DiskInfo.FreeVolumeCount,
				}
				diskCopy.DiskInfo = diskInfoCopy

				available = append(available, &diskCopy)
			}
		}
	}

	return available
}

// CanAccommodateTask checks if a disk can accommodate a new task considering all constraints
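//
// Example (illustrative sketch; the node ID, disk ID, and volume count are
// placeholders, and TaskTypeErasureCoding is assumed to be defined elsewhere
// in this package):
//
//	if at.CanAccommodateTask("10.0.0.1:8080", 0, TaskTypeErasureCoding, 1) {
//		// safe to assign the task to this disk
//	}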
func (at *ActiveTopology) CanAccommodateTask(nodeID string, diskID uint32, taskType TaskType, volumesNeeded int64) bool {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return false
	}

	// Check basic availability
	if !at.isDiskAvailable(disk, taskType) {
		return false
	}

	// Check effective capacity
	effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)
	return int64(effectiveCapacity.VolumeSlots) >= volumesNeeded
}

// getPlanningCapacityUnsafe considers both pending and active tasks for planning
func (at *ActiveTopology) getPlanningCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	baseAvailableVolumes := disk.DiskInfo.DiskInfo.MaxVolumeCount - disk.DiskInfo.DiskInfo.VolumeCount

	// Use the centralized helper function to calculate task storage impact
	totalImpact := at.calculateTaskStorageImpact(disk)

	// Subtract the net task impact: a positive impact (slots consumed by tasks) reduces availability
	availableVolumeSlots := baseAvailableVolumes - totalImpact.ToVolumeSlots()
	if availableVolumeSlots < 0 {
		availableVolumeSlots = 0
	}

	// Return detailed capacity information
	return StorageSlotChange{
		VolumeSlots: int32(availableVolumeSlots),
		ShardSlots:  -totalImpact.ShardSlots, // Available shard capacity (a negative impact, i.e. freed shards, becomes positive availability)
	}
}

// isDiskAvailableForPlanning checks if disk can accept new tasks considering pending load
func (at *ActiveTopology) isDiskAvailableForPlanning(disk *activeDisk, taskType TaskType) bool {
	// Check total load including pending tasks
	totalLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
	if totalLoad >= MaxTotalTaskLoadPerDisk {
		return false
	}

	// Check for conflicting task types in active tasks only
	for _, task := range disk.assignedTasks {
		if at.areTaskTypesConflicting(task.TaskType, taskType) {
			return false
		}
	}

	return true
}

// calculateTaskStorageImpact is a helper function that calculates the total storage impact
// from all tasks (pending and assigned) on a given disk. This eliminates code duplication
// between multiple capacity calculation functions.
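//
// Illustrative example (hypothetical tasks, assuming the sign convention
// implied by the callers below: positive StorageSlotChange values consume
// capacity on a disk and negative values free it): if an assigned task will
// place one volume on this disk (+1 volume slot) and a pending task will move
// one volume off of it (-1 volume slot), the net impact returned here is zero.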
func (at *ActiveTopology) calculateTaskStorageImpact(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	totalImpact := StorageSlotChange{}

	// Process both pending and assigned tasks with identical logic
	taskLists := [][]*taskState{disk.pendingTasks, disk.assignedTasks}
	for _, taskList := range taskLists {
		for _, task := range taskList {
			// Calculate impact for all source locations
			for _, source := range task.Sources {
				if source.SourceServer == disk.NodeID && source.SourceDisk == disk.DiskID {
					totalImpact.AddInPlace(source.StorageChange)
				}
			}

			// Calculate impact for all destination locations
			for _, dest := range task.Destinations {
				if dest.TargetServer == disk.NodeID && dest.TargetDisk == disk.DiskID {
					totalImpact.AddInPlace(dest.StorageChange)
				}
			}
		}
	}

	return totalImpact
}

// getEffectiveCapacityUnsafe returns effective capacity impact without locking (for internal use)
// Returns StorageSlotChange representing the net impact from all tasks
func (at *ActiveTopology) getEffectiveCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	return at.calculateTaskStorageImpact(disk)
}

// getEffectiveAvailableCapacityUnsafe returns detailed available capacity as StorageSlotChange
func (at *ActiveTopology) getEffectiveAvailableCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	baseAvailable := disk.DiskInfo.DiskInfo.MaxVolumeCount - disk.DiskInfo.DiskInfo.VolumeCount
	netImpact := at.getEffectiveCapacityUnsafe(disk)

	// Subtract the net task impact: a positive impact (slots consumed by tasks) reduces availability
	availableVolumeSlots := baseAvailable - netImpact.ToVolumeSlots()
	if availableVolumeSlots < 0 {
		availableVolumeSlots = 0
	}

	// Return detailed capacity information
	return StorageSlotChange{
		VolumeSlots: int32(availableVolumeSlots),
		ShardSlots:  -netImpact.ShardSlots, // Available shard capacity (a negative impact, i.e. freed shards, becomes positive availability)
	}
}