package topology

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
)

// AssignTask moves a task from pending to assigned and reserves capacity
func (at *ActiveTopology) AssignTask(taskID string) error {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	task, exists := at.pendingTasks[taskID]
	if !exists {
		return fmt.Errorf("pending task %s not found", taskID)
	}

	// Check if all destination disks have sufficient capacity to reserve
	for _, dest := range task.Destinations {
		targetKey := fmt.Sprintf("%s:%d", dest.TargetServer, dest.TargetDisk)
		if targetDisk, exists := at.disks[targetKey]; exists {
			availableCapacity := at.getEffectiveAvailableCapacityUnsafe(targetDisk)

			// Check if we have enough total capacity using the unified comparison
			if !availableCapacity.CanAccommodate(dest.StorageChange) {
				return fmt.Errorf("insufficient capacity on target disk %s:%d. Available: %+v, Required: %+v",
					dest.TargetServer, dest.TargetDisk, availableCapacity, dest.StorageChange)
			}
		} else if dest.TargetServer != "" {
			// Fail fast if the destination disk is not found in the topology
			return fmt.Errorf("destination disk %s not found in topology", targetKey)
		}
	}

	// Move the task to assigned and reserve capacity
	delete(at.pendingTasks, taskID)
	task.Status = TaskStatusInProgress
	at.assignedTasks[taskID] = task
	at.reassignTaskStates()

	// Log capacity reservation information for all sources and destinations
	totalSourceImpact := StorageSlotChange{}
	totalDestImpact := StorageSlotChange{}
	for _, source := range task.Sources {
		totalSourceImpact.AddInPlace(source.StorageChange)
	}
	for _, dest := range task.Destinations {
		totalDestImpact.AddInPlace(dest.StorageChange)
	}

	glog.V(2).Infof("Task %s assigned and capacity reserved: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)",
		taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots,
		len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots)

	return nil
}
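
// A minimal caller-side sketch of AssignTask. The task ID is hypothetical;
// in practice it is the ID previously registered via AddPendingTask. On
// failure nothing is reserved and the task stays pending.
//
//	if err := at.AssignTask("balance-vol-42"); err != nil {
//		glog.Warningf("task not assigned, capacity not reserved: %v", err)
//	}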

// CompleteTask moves a task from assigned to recent and releases reserved capacity
// NOTE: This only releases the reserved capacity. The actual topology update (VolumeCount changes)
// should be handled by the master when it receives the task completion notification.
func (at *ActiveTopology) CompleteTask(taskID string) error {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	task, exists := at.assignedTasks[taskID]
	if !exists {
		return fmt.Errorf("assigned task %s not found", taskID)
	}

	// Release reserved capacity by moving the task to the completed state
	delete(at.assignedTasks, taskID)
	task.Status = TaskStatusCompleted
	task.CompletedAt = time.Now()
	at.recentTasks[taskID] = task
	at.reassignTaskStates()

	// Log capacity release information for all sources and destinations
	totalSourceImpact := StorageSlotChange{}
	totalDestImpact := StorageSlotChange{}
	for _, source := range task.Sources {
		totalSourceImpact.AddInPlace(source.StorageChange)
	}
	for _, dest := range task.Destinations {
		totalDestImpact.AddInPlace(dest.StorageChange)
	}

	glog.V(2).Infof("Task %s completed and capacity released: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)",
		taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots,
		len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots)

	// Clean up old recent tasks
	at.cleanupRecentTasks()

	return nil
}
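
// A minimal sketch of completing a task once a worker reports success. An
// unknown task ID means no reservation is currently held (or it was already
// released). The ID is hypothetical.
//
//	if err := at.CompleteTask("balance-vol-42"); err != nil {
//		glog.Warningf("capacity release skipped: %v", err)
//	}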

// ApplyActualStorageChange updates the topology to reflect actual storage changes after task completion
// This should be called when the master updates the topology with new VolumeCount information
func (at *ActiveTopology) ApplyActualStorageChange(nodeID string, diskID uint32, volumeCountChange int64) {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	if disk, exists := at.disks[diskKey]; exists && disk.DiskInfo != nil && disk.DiskInfo.DiskInfo != nil {
		oldCount := disk.DiskInfo.DiskInfo.VolumeCount
		disk.DiskInfo.DiskInfo.VolumeCount += volumeCountChange

		glog.V(2).Infof("Applied actual storage change on disk %s: volume_count %d -> %d (change: %+d)",
			diskKey, oldCount, disk.DiskInfo.DiskInfo.VolumeCount, volumeCountChange)
	}
}
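
// As the NOTE on CompleteTask says, completion only releases the reservation;
// the master is expected to follow up with the real VolumeCount delta. A
// hypothetical sketch (server addresses, disk IDs, and deltas are illustrative):
//
//	// One volume moved onto disk 0 of the destination server...
//	at.ApplyActualStorageChange("10.0.0.5:8080", 0, +1)
//	// ...and off disk 1 of the source server.
//	at.ApplyActualStorageChange("10.0.0.4:8080", 1, -1)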

// AddPendingTask is the unified function that handles both simple and complex task creation
func (at *ActiveTopology) AddPendingTask(spec TaskSpec) error {
	// Validation
	if len(spec.Sources) == 0 {
		return fmt.Errorf("at least one source is required")
	}
	if len(spec.Destinations) == 0 {
		return fmt.Errorf("at least one destination is required")
	}

	at.mutex.Lock()
	defer at.mutex.Unlock()

	// Build sources array
	sources := make([]TaskSource, len(spec.Sources))
	for i, sourceSpec := range spec.Sources {
		var storageImpact StorageSlotChange
		var estimatedSize int64

		if sourceSpec.StorageImpact != nil {
			// Use manually specified impact
			storageImpact = *sourceSpec.StorageImpact
		} else {
			// Auto-calculate based on task type and cleanup type
			storageImpact = at.calculateSourceStorageImpact(spec.TaskType, sourceSpec.CleanupType, spec.VolumeSize)
		}

		if sourceSpec.EstimatedSize != nil {
			estimatedSize = *sourceSpec.EstimatedSize
		} else {
			estimatedSize = spec.VolumeSize // Default to volume size
		}

		sources[i] = TaskSource{
			SourceServer:  sourceSpec.ServerID,
			SourceDisk:    sourceSpec.DiskID,
			StorageChange: storageImpact,
			EstimatedSize: estimatedSize,
		}
	}

	// Build destinations array
	destinations := make([]TaskDestination, len(spec.Destinations))
	for i, destSpec := range spec.Destinations {
		var storageImpact StorageSlotChange
		var estimatedSize int64

		if destSpec.StorageImpact != nil {
			// Use manually specified impact
			storageImpact = *destSpec.StorageImpact
		} else {
			// Auto-calculate based on task type
			_, storageImpact = CalculateTaskStorageImpact(spec.TaskType, spec.VolumeSize)
		}

		if destSpec.EstimatedSize != nil {
			estimatedSize = *destSpec.EstimatedSize
		} else {
			estimatedSize = spec.VolumeSize // Default to volume size
		}

		destinations[i] = TaskDestination{
			TargetServer:  destSpec.ServerID,
			TargetDisk:    destSpec.DiskID,
			StorageChange: storageImpact,
			EstimatedSize: estimatedSize,
		}
	}

	// Create the task
	task := &taskState{
		VolumeID:      spec.VolumeID,
		TaskType:      spec.TaskType,
		Status:        TaskStatusPending,
		StartedAt:     time.Now(),
		EstimatedSize: spec.VolumeSize,
		Sources:       sources,
		Destinations:  destinations,
	}

	at.pendingTasks[spec.TaskID] = task
	at.assignTaskToDisk(task)

	glog.V(2).Infof("Added pending %s task %s: volume %d, %d sources, %d destinations",
		spec.TaskType, spec.TaskID, spec.VolumeID, len(sources), len(destinations))

	return nil
}
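
// A hedged sketch of building a TaskSpec and registering it. Server addresses,
// IDs, and the volume size are hypothetical; StorageImpact and EstimatedSize
// are left nil so the auto-calculation branches above are taken.
//
//	err := at.AddPendingTask(TaskSpec{
//		TaskID:     "ec-vol-42",
//		TaskType:   TaskTypeErasureCoding,
//		VolumeID:   42,
//		VolumeSize: 30 * 1024 * 1024 * 1024,
//		Sources: []TaskSourceSpec{
//			{ServerID: "10.0.0.4:8080", DiskID: 0, CleanupType: CleanupVolumeReplica},
//		},
//		Destinations: []TaskDestinationSpec{
//			{ServerID: "10.0.0.5:8080", DiskID: 1},
//		},
//	})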

// calculateSourceStorageImpact calculates storage impact for sources based on task type and cleanup type
func (at *ActiveTopology) calculateSourceStorageImpact(taskType TaskType, cleanupType SourceCleanupType, volumeSize int64) StorageSlotChange {
	switch taskType {
	case TaskTypeErasureCoding:
		switch cleanupType {
		case CleanupVolumeReplica:
			impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
			return impact
		case CleanupECShards:
			return CalculateECShardCleanupImpact(volumeSize)
		default:
			impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
			return impact
		}
	default:
		impact, _ := CalculateTaskStorageImpact(taskType, volumeSize)
		return impact
	}
}
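
// For example (volSize is a placeholder), an erasure-coding source that still
// holds the volume replica frees volume slots, while one that only holds stale
// EC shards frees shard slots, per the SourceCleanupType semantics below:
//
//	replicaImpact := at.calculateSourceStorageImpact(TaskTypeErasureCoding, CleanupVolumeReplica, volSize)
//	shardImpact := at.calculateSourceStorageImpact(TaskTypeErasureCoding, CleanupECShards, volSize)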

// SourceCleanupType indicates what type of data needs to be cleaned up from a source
type SourceCleanupType int

const (
	CleanupVolumeReplica SourceCleanupType = iota // Clean up volume replica (frees volume slots)
	CleanupECShards                               // Clean up existing EC shards (frees shard slots)
)

// TaskSourceSpec represents a source specification for task creation
type TaskSourceSpec struct {
	ServerID      string
	DiskID        uint32
	CleanupType   SourceCleanupType  // For EC: volume replica vs existing shards
	StorageImpact *StorageSlotChange // Optional: manual override
	EstimatedSize *int64             // Optional: manual override
}

// TaskDestinationSpec represents a destination specification for task creation
type TaskDestinationSpec struct {
	ServerID      string
	DiskID        uint32
	StorageImpact *StorageSlotChange // Optional: manual override
	EstimatedSize *int64             // Optional: manual override
}

// TaskSpec represents a complete task specification
type TaskSpec struct {
	TaskID       string
	TaskType     TaskType
	VolumeID     uint32
	VolumeSize   int64                 // Used for auto-calculation when manual impacts are not provided
	Sources      []TaskSourceSpec      // Can be single or multiple
	Destinations []TaskDestinationSpec // Can be single or multiple
}

// TaskSourceLocation represents a source location for task creation (DEPRECATED: use TaskSourceSpec)
type TaskSourceLocation struct {
	ServerID    string
	DiskID      uint32
	CleanupType SourceCleanupType // What type of cleanup is needed
}