package topology

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)

// GetEffectiveAvailableCapacity returns the effective available capacity for a disk
// This considers BOTH pending and assigned tasks for capacity reservation.
//
// Formula: BaseAvailable - (VolumeSlots + ShardSlots/ShardsPerVolumeSlot) from all tasks
//
// The calculation includes:
// - Pending tasks: Reserve capacity immediately when added
// - Assigned tasks: Continue to reserve capacity during execution
// - Recently completed tasks are NOT counted against capacity
func (at *ActiveTopology) GetEffectiveAvailableCapacity(nodeID string, diskID uint32) int64 {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return 0
	}

	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return 0
	}

	// Use the same logic as getEffectiveAvailableCapacityUnsafe but with locking
	capacity := at.getEffectiveAvailableCapacityUnsafe(disk)
	return int64(capacity.VolumeSlots)
}
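// Worked example for the formula above (hypothetical numbers; the actual
// ShardsPerVolumeSlot constant may differ): with MaxVolumeCount=100,
// VolumeCount=60, and outstanding tasks reserving 3 volume slots plus
// 20 shard slots at an assumed ShardsPerVolumeSlot of 10, the effective
// available capacity is 100 - 60 - (3 + 20/10) = 35 volume slots.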

// GetEffectiveAvailableCapacityDetailed returns detailed available capacity as StorageSlotChange
// This provides granular information about available volume slots and shard slots
func (at *ActiveTopology) GetEffectiveAvailableCapacityDetailed(nodeID string, diskID uint32) StorageSlotChange {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return StorageSlotChange{}
	}

	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	return at.getEffectiveAvailableCapacityUnsafe(disk)
}

// GetEffectiveCapacityImpact returns the StorageSlotChange impact for a disk
// This shows the net impact from all pending and assigned tasks
func (at *ActiveTopology) GetEffectiveCapacityImpact(nodeID string, diskID uint32) StorageSlotChange {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return StorageSlotChange{}
	}

	return at.getEffectiveCapacityUnsafe(disk)
}

// GetDisksWithEffectiveCapacity returns disks with sufficient effective capacity
// This method considers BOTH pending and assigned tasks for capacity reservation using StorageSlotChange.
//
// Parameters:
// - taskType: type of task to check compatibility for
// - excludeNodeID: node to exclude from results
// - minCapacity: minimum effective capacity required (in volume slots)
//
// Returns: DiskInfo objects where VolumeCount reflects capacity reserved by all tasks
func (at *ActiveTopology) GetDisksWithEffectiveCapacity(taskType TaskType, excludeNodeID string, minCapacity int64) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	var available []*DiskInfo

	for _, disk := range at.disks {
		if disk.NodeID == excludeNodeID {
			continue // Skip excluded node
		}

		if at.isDiskAvailable(disk, taskType) {
			effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)

			// Only include disks that meet minimum capacity requirement
			if int64(effectiveCapacity.VolumeSlots) >= minCapacity {
				// Create a new DiskInfo with current capacity information
				diskCopy := DiskInfo{
					NodeID:     disk.DiskInfo.NodeID,
					DiskID:     disk.DiskInfo.DiskID,
					DiskType:   disk.DiskInfo.DiskType,
					DataCenter: disk.DiskInfo.DataCenter,
					Rack:       disk.DiskInfo.Rack,
					LoadCount:  len(disk.pendingTasks) + len(disk.assignedTasks), // Count all tasks
				}

				// Create a new protobuf DiskInfo to avoid modifying the original
				diskInfoCopy := &master_pb.DiskInfo{
					DiskId:            disk.DiskInfo.DiskInfo.DiskId,
					MaxVolumeCount:    disk.DiskInfo.DiskInfo.MaxVolumeCount,
					VolumeCount:       disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(effectiveCapacity.VolumeSlots),
					VolumeInfos:       disk.DiskInfo.DiskInfo.VolumeInfos,
					EcShardInfos:      disk.DiskInfo.DiskInfo.EcShardInfos,
					RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
					ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
					FreeVolumeCount:   disk.DiskInfo.DiskInfo.FreeVolumeCount,
				}
				diskCopy.DiskInfo = diskInfoCopy

				available = append(available, &diskCopy)
			}
		}
	}

	return available
}
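// Hypothetical usage sketch (the node name, task type variable, and 5-slot
// threshold are placeholders, not part of the production scheduler): a caller
// that needs a destination for a new volume might filter candidates like this:
//
//	candidates := at.GetDisksWithEffectiveCapacity(someTaskType, "10.0.0.1:8080", 5)
//	for _, d := range candidates {
//		// d.DiskInfo.VolumeCount already includes capacity reserved by pending
//		// and assigned tasks, so MaxVolumeCount - VolumeCount is the headroom
//		// at planning time.
//	}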

// GetDisksForPlanning returns disks considering both active and pending tasks for planning decisions
// This helps avoid over-scheduling tasks to the same disk
func (at *ActiveTopology) GetDisksForPlanning(taskType TaskType, excludeNodeID string, minCapacity int64) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	var available []*DiskInfo

	for _, disk := range at.disks {
		if disk.NodeID == excludeNodeID {
			continue // Skip excluded node
		}

		// Consider both pending and active tasks for scheduling decisions
		if at.isDiskAvailableForPlanning(disk, taskType) {
			// Check if disk can accommodate new task considering pending tasks
			planningCapacity := at.getPlanningCapacityUnsafe(disk)

			if int64(planningCapacity.VolumeSlots) >= minCapacity {
				// Create a new DiskInfo with planning information
				diskCopy := DiskInfo{
					NodeID:     disk.DiskInfo.NodeID,
					DiskID:     disk.DiskInfo.DiskID,
					DiskType:   disk.DiskInfo.DiskType,
					DataCenter: disk.DiskInfo.DataCenter,
					Rack:       disk.DiskInfo.Rack,
					LoadCount:  len(disk.pendingTasks) + len(disk.assignedTasks),
				}

				// Create a new protobuf DiskInfo to avoid modifying the original
				diskInfoCopy := &master_pb.DiskInfo{
					DiskId:            disk.DiskInfo.DiskInfo.DiskId,
					MaxVolumeCount:    disk.DiskInfo.DiskInfo.MaxVolumeCount,
					VolumeCount:       disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(planningCapacity.VolumeSlots),
					VolumeInfos:       disk.DiskInfo.DiskInfo.VolumeInfos,
					EcShardInfos:      disk.DiskInfo.DiskInfo.EcShardInfos,
					RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
					ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
					FreeVolumeCount:   disk.DiskInfo.DiskInfo.FreeVolumeCount,
				}
				diskCopy.DiskInfo = diskInfoCopy

				available = append(available, &diskCopy)
			}
		}
	}

	return available
}

// CanAccommodateTask checks if a disk can accommodate a new task considering all constraints
func (at *ActiveTopology) CanAccommodateTask(nodeID string, diskID uint32, taskType TaskType, volumesNeeded int64) bool {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return false
	}

	// Check basic availability
	if !at.isDiskAvailable(disk, taskType) {
		return false
	}

	// Check effective capacity
	effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)
	return int64(effectiveCapacity.VolumeSlots) >= volumesNeeded
}
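// Hypothetical usage sketch (node name, disk ID, and slot count are
// illustrative assumptions): before assigning a task that will place one new
// volume on a disk, a scheduler could guard the assignment with:
//
//	if at.CanAccommodateTask("10.0.0.2:8080", 1, someTaskType, 1) {
//		// safe to assign: basic availability and effective capacity both check out
//	}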

// getPlanningCapacityUnsafe considers both pending and active tasks for planning
func (at *ActiveTopology) getPlanningCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	baseAvailableVolumes := disk.DiskInfo.DiskInfo.MaxVolumeCount - disk.DiskInfo.DiskInfo.VolumeCount

	// Use the centralized helper function to calculate task storage impact
	totalImpact := at.calculateTaskStorageImpact(disk)

	// Calculate available capacity considering impact (a positive net impact, i.e. reserved slots, reduces availability)
	availableVolumeSlots := baseAvailableVolumes - totalImpact.ToVolumeSlots()
	if availableVolumeSlots < 0 {
		availableVolumeSlots = 0
	}

	// Return detailed capacity information
	return StorageSlotChange{
		VolumeSlots: int32(availableVolumeSlots),
		ShardSlots:  -totalImpact.ShardSlots, // Available shard capacity (negative impact becomes positive availability)
	}
}
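// Worked example (hypothetical numbers): with MaxVolumeCount=50, VolumeCount=45,
// and outstanding tasks whose combined impact converts to 2 volume slots with a
// ShardSlots impact of -10, the result is VolumeSlots: 50-45-2 = 3 and
// ShardSlots: -(-10) = 10, which the code above reports as available shard
// capacity.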

// isDiskAvailableForPlanning checks if disk can accept new tasks considering pending load
func (at *ActiveTopology) isDiskAvailableForPlanning(disk *activeDisk, taskType TaskType) bool {
	// Check total load including pending tasks
	totalLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
	if totalLoad >= MaxTotalTaskLoadPerDisk {
		return false
	}

	// Check for conflicting task types in active tasks only
	for _, task := range disk.assignedTasks {
		if at.areTaskTypesConflicting(task.TaskType, taskType) {
			return false
		}
	}

	return true
}
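// Illustration (the limit value below is an assumption; see the
// MaxTotalTaskLoadPerDisk constant for the real one): if the limit were 3, a
// disk with 2 pending and 1 assigned task is rejected outright, while a disk
// with a single assigned task is rejected only when that task's type conflicts
// with the requested taskType.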

// calculateTaskStorageImpact is a helper function that calculates the total storage impact
// from all tasks (pending and assigned) on a given disk. This eliminates code duplication
// between multiple capacity calculation functions.
func (at *ActiveTopology) calculateTaskStorageImpact(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	totalImpact := StorageSlotChange{}

	// Process both pending and assigned tasks with identical logic
	taskLists := [][]*taskState{disk.pendingTasks, disk.assignedTasks}

	for _, taskList := range taskLists {
		for _, task := range taskList {
			// Calculate impact for all source locations
			for _, source := range task.Sources {
				if source.SourceServer == disk.NodeID && source.SourceDisk == disk.DiskID {
					totalImpact.AddInPlace(source.StorageChange)
				}
			}

			// Calculate impact for all destination locations
			for _, dest := range task.Destinations {
				if dest.TargetServer == disk.NodeID && dest.TargetDisk == disk.DiskID {
					totalImpact.AddInPlace(dest.StorageChange)
				}
			}
		}
	}

	return totalImpact
}
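// Worked example (hypothetical task): for a volume move whose destination is
// this disk, only the destination StorageChange (say {VolumeSlots: 1}) is
// accumulated here, because the source entries point at a different node/disk
// and fail the NodeID/DiskID match; a disk that is both a source and a
// destination of the same task has both changes summed.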

// getEffectiveCapacityUnsafe returns effective capacity impact without locking (for internal use)
// Returns StorageSlotChange representing the net impact from all tasks
func (at *ActiveTopology) getEffectiveCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	return at.calculateTaskStorageImpact(disk)
}

// getEffectiveAvailableCapacityUnsafe returns detailed available capacity as StorageSlotChange
func (at *ActiveTopology) getEffectiveAvailableCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	baseAvailable := disk.DiskInfo.DiskInfo.MaxVolumeCount - disk.DiskInfo.DiskInfo.VolumeCount
	netImpact := at.getEffectiveCapacityUnsafe(disk)

	// Calculate available volume slots (a positive net impact, i.e. reserved slots, reduces availability)
	availableVolumeSlots := baseAvailable - netImpact.ToVolumeSlots()
	if availableVolumeSlots < 0 {
		availableVolumeSlots = 0
	}

	// Return detailed capacity information
	return StorageSlotChange{
		VolumeSlots: int32(availableVolumeSlots),
		ShardSlots:  -netImpact.ShardSlots, // Available shard capacity (negative impact becomes positive availability)
	}
}