You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
153 lines
4.5 KiB
153 lines
4.5 KiB
package topology
|
|
|
|
import (
|
|
"fmt"
|
|
"time"
|
|
)
|
|
|
|
// reassignTaskStates assigns tasks to the appropriate disks
|
|
func (at *ActiveTopology) reassignTaskStates() {
|
|
// Clear existing task assignments
|
|
for _, disk := range at.disks {
|
|
disk.pendingTasks = nil
|
|
disk.assignedTasks = nil
|
|
disk.recentTasks = nil
|
|
}
|
|
|
|
// Reassign pending tasks
|
|
for _, task := range at.pendingTasks {
|
|
at.assignTaskToDisk(task)
|
|
}
|
|
|
|
// Reassign assigned tasks
|
|
for _, task := range at.assignedTasks {
|
|
at.assignTaskToDisk(task)
|
|
}
|
|
|
|
// Reassign recent tasks
|
|
for _, task := range at.recentTasks {
|
|
at.assignTaskToDisk(task)
|
|
}
|
|
}
|
|
|
|
// assignTaskToDisk assigns a task to the appropriate disk(s)
|
|
func (at *ActiveTopology) assignTaskToDisk(task *taskState) {
|
|
addedDisks := make(map[string]bool)
|
|
|
|
// Local helper function to assign task to a disk and avoid code duplication
|
|
assign := func(server string, diskID uint32) {
|
|
key := fmt.Sprintf("%s:%d", server, diskID)
|
|
if server == "" || addedDisks[key] {
|
|
return
|
|
}
|
|
if disk, exists := at.disks[key]; exists {
|
|
switch task.Status {
|
|
case TaskStatusPending:
|
|
disk.pendingTasks = append(disk.pendingTasks, task)
|
|
case TaskStatusInProgress:
|
|
disk.assignedTasks = append(disk.assignedTasks, task)
|
|
case TaskStatusCompleted:
|
|
disk.recentTasks = append(disk.recentTasks, task)
|
|
}
|
|
addedDisks[key] = true
|
|
}
|
|
}
|
|
|
|
// Assign to all source disks
|
|
for _, source := range task.Sources {
|
|
assign(source.SourceServer, source.SourceDisk)
|
|
}
|
|
|
|
// Assign to all destination disks (duplicates automatically avoided by helper)
|
|
for _, dest := range task.Destinations {
|
|
assign(dest.TargetServer, dest.TargetDisk)
|
|
}
|
|
}
|
|
|
|
// isDiskAvailable checks if a disk can accept new tasks (general availability)
|
|
func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) bool {
|
|
// Check if disk has too many pending and active tasks
|
|
activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
|
|
if activeLoad >= MaxConcurrentTasksPerDisk {
|
|
return false
|
|
}
|
|
|
|
// For general availability, only check disk capacity
|
|
// Volume-specific conflicts are checked in isDiskAvailableForVolume
|
|
return true
|
|
}
|
|
|
|
// isDiskAvailableForVolume checks if a disk can accept a new task for a specific volume
|
|
func (at *ActiveTopology) isDiskAvailableForVolume(disk *activeDisk, taskType TaskType, volumeID uint32) bool {
|
|
// Check basic availability first
|
|
if !at.isDiskAvailable(disk, taskType) {
|
|
return false
|
|
}
|
|
|
|
// Check for volume-specific conflicts in ALL task states:
|
|
// 1. Pending tasks (queued but not yet started)
|
|
for _, task := range disk.pendingTasks {
|
|
if at.areTasksConflicting(task, taskType, volumeID) {
|
|
return false
|
|
}
|
|
}
|
|
|
|
// 2. Assigned/Active tasks (currently running)
|
|
for _, task := range disk.assignedTasks {
|
|
if at.areTasksConflicting(task, taskType, volumeID) {
|
|
return false
|
|
}
|
|
}
|
|
|
|
// 3. Recent tasks (just completed - avoid immediate re-scheduling on same volume)
|
|
for _, task := range disk.recentTasks {
|
|
if at.areTasksConflicting(task, taskType, volumeID) {
|
|
return false
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// areTasksConflicting checks if a new task conflicts with an existing task
|
|
func (at *ActiveTopology) areTasksConflicting(existingTask *taskState, newTaskType TaskType, newVolumeID uint32) bool {
|
|
// PRIMARY RULE: Tasks on the same volume always conflict (prevents race conditions)
|
|
if existingTask.VolumeID == newVolumeID {
|
|
return true
|
|
}
|
|
|
|
// SECONDARY RULE: Some task types may have global conflicts (rare cases)
|
|
return at.areTaskTypesGloballyConflicting(existingTask.TaskType, newTaskType)
|
|
}
|
|
|
|
// areTaskTypesGloballyConflicting checks for rare global task type conflicts
|
|
// These should be minimal - most conflicts should be volume-specific
|
|
func (at *ActiveTopology) areTaskTypesGloballyConflicting(existing, new TaskType) bool {
|
|
// Define very limited global conflicts (cross-volume conflicts)
|
|
// Most conflicts should be volume-based, not global
|
|
globalConflictMap := map[TaskType][]TaskType{
|
|
// Example: Some hypothetical global resource conflicts could go here
|
|
// Currently empty - volume-based conflicts are sufficient
|
|
}
|
|
|
|
if conflicts, exists := globalConflictMap[existing]; exists {
|
|
for _, conflictType := range conflicts {
|
|
if conflictType == new {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// cleanupRecentTasks removes old recent tasks
|
|
func (at *ActiveTopology) cleanupRecentTasks() {
|
|
cutoff := time.Now().Add(-time.Duration(at.recentTaskWindowSeconds) * time.Second)
|
|
|
|
for taskID, task := range at.recentTasks {
|
|
if task.CompletedAt.Before(cutoff) {
|
|
delete(at.recentTasks, taskID)
|
|
}
|
|
}
|
|
}
|