From 0ecb466eda3bda7d44ebabb22c4842c9f78589c6 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Sun, 3 Aug 2025 01:35:38 -0700
Subject: [PATCH] Admin: refactoring active topology (#7073)

* refactoring

* add EC shard size

* address comments

* passing task id

There seems to be a disconnect between the pending tasks created in ActiveTopology and the TaskDetectionResult returned by this function. A taskID is generated locally and used to create pending tasks via AddPendingECShardTask, but this taskID is not stored in the TaskDetectionResult or passed along in any way. This makes it impossible for the worker that eventually executes the task to know which pending task in ActiveTopology it corresponds to. Without the correct taskID, the worker cannot call AssignTask or CompleteTask on the master, breaking the entire task lifecycle and capacity management feature. A potential solution is to add a TaskID field to TaskDetectionResult and worker_pb.TaskParams, ensuring the ID is propagated from detection to execution.

* one source, multiple destinations

* task supports multiple sources and destinations

* EC needs to clean up previous shards

* use erasure coding constants

* getPlanningCapacityUnsafe and getEffectiveAvailableCapacityUnsafe should return StorageSlotChange for calculation

* use CanAccommodate to calculate

* remove dead code

* address comments

* fix Mutex Copying in Protobuf Structs

* use constants

* fix estimatedSize

The calculation for estimatedSize only considers source.EstimatedSize and dest.StorageChange, but omits dest.EstimatedSize. The TaskDestination struct has an EstimatedSize field, which seems to be ignored here. This could lead to an incorrect estimation of the total size of data involved in tasks on a disk. The loop should probably also include estimatedSize += dest.EstimatedSize (see the sketch below).
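A minimal sketch of the corrected accumulation. The loop shape and variable names are illustrative only; the EstimatedSize fields on the source/destination entries come from the description above, everything else is an assumption rather than the actual implementation:

    // Accumulate estimated data size for every task touching this disk,
    // counting both the source side and the destination side.
    var estimatedSize int64
    for _, task := range tasks {
        for _, source := range task.Sources {
            if source.SourceServer == disk.NodeID && source.SourceDisk == disk.DiskID {
                estimatedSize += source.EstimatedSize
            }
        }
        for _, dest := range task.Destinations {
            if dest.TargetServer == disk.NodeID && dest.TargetDisk == disk.DiskID {
                estimatedSize += dest.EstimatedSize // previously omitted
            }
        }
    }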
* at.assignTaskToDisk(task)

* refactoring

* Update weed/admin/topology/internal.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* fail fast

* fix compilation

* Update weed/worker/tasks/erasure_coding/detection.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* indexes for volume and shard locations

* dedup with ToVolumeSlots

* return an additional boolean to indicate success, or an error

* Update abstract_sql_store.go

* fix

* Update weed/worker/tasks/erasure_coding/detection.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/admin/topology/task_management.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* faster findVolumeDisk

* Update weed/worker/tasks/erasure_coding/detection.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update weed/admin/topology/storage_slot_test.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* refactor

* simplify

* remove unused GetDiskStorageImpact function

* refactor

* add comments

* Update weed/admin/topology/storage_impact.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/admin/topology/storage_slot_test.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update storage_impact.go

* AddPendingTask

The unified AddPendingTask function is now the single entry point for all task creation, consolidating the previously separate functions while preserving full functionality and improving code organization.

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 weed/admin/topology/active_topology.go        |  425 +------
 weed/admin/topology/active_topology_test.go   |  154 ++-
 weed/admin/topology/capacity.go               |  300 +++++
 weed/admin/topology/internal.go               |  114 ++
 weed/admin/topology/storage_impact.go         |   50 +
 weed/admin/topology/storage_slot_test.go      | 1004 +++++++++++++++++
 weed/admin/topology/structs.go                |  120 ++
 weed/admin/topology/task_management.go        |  264 +++++
 weed/admin/topology/topology_management.go    |  253 +++++
 weed/admin/topology/types.go                  |   97 ++
 weed/filer/abstract_sql/abstract_sql_store.go |    5 +-
 weed/pb/worker.proto                          |    1 +
 weed/pb/worker_pb/worker.pb.go                |   13 +-
 weed/worker/tasks/balance/detection.go        |   34 +
 weed/worker/tasks/base/volume_utils.go        |   36 +
 weed/worker/tasks/erasure_coding/detection.go |  169 ++-
 weed/worker/tasks/vacuum/detection.go         |    6 +
 weed/worker/types/task_types.go               |    1 +
 18 files changed, 2579 insertions(+), 467 deletions(-)
 create mode 100644 weed/admin/topology/capacity.go
 create mode 100644 weed/admin/topology/internal.go
 create mode 100644 weed/admin/topology/storage_impact.go
 create mode 100644 weed/admin/topology/storage_slot_test.go
 create mode 100644 weed/admin/topology/structs.go
 create mode 100644 weed/admin/topology/task_management.go
 create mode 100644 weed/admin/topology/topology_management.go
 create mode 100644 weed/admin/topology/types.go
 create mode 100644 weed/worker/tasks/base/volume_utils.go

diff --git a/weed/admin/topology/active_topology.go b/weed/admin/topology/active_topology.go
index bfa03a72f..e4ef5c5d0 100644
--- a/weed/admin/topology/active_topology.go
+++
b/weed/admin/topology/active_topology.go @@ -1,98 +1,5 @@ package topology -import ( - "fmt" - "sync" - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" -) - -// TaskType represents different types of maintenance operations -type TaskType string - -// TaskStatus represents the current status of a task -type TaskStatus string - -// Common task type constants -const ( - TaskTypeVacuum TaskType = "vacuum" - TaskTypeBalance TaskType = "balance" - TaskTypeErasureCoding TaskType = "erasure_coding" - TaskTypeReplication TaskType = "replication" -) - -// Common task status constants -const ( - TaskStatusPending TaskStatus = "pending" - TaskStatusInProgress TaskStatus = "in_progress" - TaskStatusCompleted TaskStatus = "completed" -) - -// taskState represents the current state of tasks affecting the topology (internal) -type taskState struct { - VolumeID uint32 `json:"volume_id"` - TaskType TaskType `json:"task_type"` - SourceServer string `json:"source_server"` - SourceDisk uint32 `json:"source_disk"` - TargetServer string `json:"target_server,omitempty"` - TargetDisk uint32 `json:"target_disk,omitempty"` - Status TaskStatus `json:"status"` - StartedAt time.Time `json:"started_at"` - CompletedAt time.Time `json:"completed_at,omitempty"` -} - -// DiskInfo represents a disk with its current state and ongoing tasks (public for external access) -type DiskInfo struct { - NodeID string `json:"node_id"` - DiskID uint32 `json:"disk_id"` - DiskType string `json:"disk_type"` - DataCenter string `json:"data_center"` - Rack string `json:"rack"` - DiskInfo *master_pb.DiskInfo `json:"disk_info"` - LoadCount int `json:"load_count"` // Number of active tasks -} - -// activeDisk represents internal disk state (private) -type activeDisk struct { - *DiskInfo - pendingTasks []*taskState - assignedTasks []*taskState - recentTasks []*taskState // Completed in last N seconds -} - -// activeNode represents a node with its disks (private) -type activeNode struct { - nodeID string - dataCenter string - rack string - nodeInfo *master_pb.DataNodeInfo - disks map[uint32]*activeDisk // DiskID -> activeDisk -} - -// ActiveTopology provides a real-time view of cluster state with task awareness -type ActiveTopology struct { - // Core topology from master - topologyInfo *master_pb.TopologyInfo - lastUpdated time.Time - - // Structured topology for easy access (private) - nodes map[string]*activeNode // NodeID -> activeNode - disks map[string]*activeDisk // "NodeID:DiskID" -> activeDisk - - // Task states affecting the topology (private) - pendingTasks map[string]*taskState - assignedTasks map[string]*taskState - recentTasks map[string]*taskState - - // Configuration - recentTaskWindowSeconds int - - // Synchronization - mutex sync.RWMutex -} - // NewActiveTopology creates a new ActiveTopology instance func NewActiveTopology(recentTaskWindowSeconds int) *ActiveTopology { if recentTaskWindowSeconds <= 0 { @@ -102,339 +9,11 @@ func NewActiveTopology(recentTaskWindowSeconds int) *ActiveTopology { return &ActiveTopology{ nodes: make(map[string]*activeNode), disks: make(map[string]*activeDisk), + volumeIndex: make(map[uint32][]string), + ecShardIndex: make(map[uint32][]string), pendingTasks: make(map[string]*taskState), assignedTasks: make(map[string]*taskState), recentTasks: make(map[string]*taskState), recentTaskWindowSeconds: recentTaskWindowSeconds, } } - -// UpdateTopology updates the topology information from master -func (at *ActiveTopology) UpdateTopology(topologyInfo 
*master_pb.TopologyInfo) error { - at.mutex.Lock() - defer at.mutex.Unlock() - - at.topologyInfo = topologyInfo - at.lastUpdated = time.Now() - - // Rebuild structured topology - at.nodes = make(map[string]*activeNode) - at.disks = make(map[string]*activeDisk) - - for _, dc := range topologyInfo.DataCenterInfos { - for _, rack := range dc.RackInfos { - for _, nodeInfo := range rack.DataNodeInfos { - node := &activeNode{ - nodeID: nodeInfo.Id, - dataCenter: dc.Id, - rack: rack.Id, - nodeInfo: nodeInfo, - disks: make(map[uint32]*activeDisk), - } - - // Add disks for this node - for diskType, diskInfo := range nodeInfo.DiskInfos { - disk := &activeDisk{ - DiskInfo: &DiskInfo{ - NodeID: nodeInfo.Id, - DiskID: diskInfo.DiskId, - DiskType: diskType, - DataCenter: dc.Id, - Rack: rack.Id, - DiskInfo: diskInfo, - }, - } - - diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId) - node.disks[diskInfo.DiskId] = disk - at.disks[diskKey] = disk - } - - at.nodes[nodeInfo.Id] = node - } - } - } - - // Reassign task states to updated topology - at.reassignTaskStates() - - glog.V(1).Infof("ActiveTopology updated: %d nodes, %d disks", len(at.nodes), len(at.disks)) - return nil -} - -// AddPendingTask adds a pending task to the topology -func (at *ActiveTopology) AddPendingTask(taskID string, taskType TaskType, volumeID uint32, - sourceServer string, sourceDisk uint32, targetServer string, targetDisk uint32) { - at.mutex.Lock() - defer at.mutex.Unlock() - - task := &taskState{ - VolumeID: volumeID, - TaskType: taskType, - SourceServer: sourceServer, - SourceDisk: sourceDisk, - TargetServer: targetServer, - TargetDisk: targetDisk, - Status: TaskStatusPending, - StartedAt: time.Now(), - } - - at.pendingTasks[taskID] = task - at.assignTaskToDisk(task) -} - -// AssignTask moves a task from pending to assigned -func (at *ActiveTopology) AssignTask(taskID string) error { - at.mutex.Lock() - defer at.mutex.Unlock() - - task, exists := at.pendingTasks[taskID] - if !exists { - return fmt.Errorf("pending task %s not found", taskID) - } - - delete(at.pendingTasks, taskID) - task.Status = TaskStatusInProgress - at.assignedTasks[taskID] = task - at.reassignTaskStates() - - return nil -} - -// CompleteTask moves a task from assigned to recent -func (at *ActiveTopology) CompleteTask(taskID string) error { - at.mutex.Lock() - defer at.mutex.Unlock() - - task, exists := at.assignedTasks[taskID] - if !exists { - return fmt.Errorf("assigned task %s not found", taskID) - } - - delete(at.assignedTasks, taskID) - task.Status = TaskStatusCompleted - task.CompletedAt = time.Now() - at.recentTasks[taskID] = task - at.reassignTaskStates() - - // Clean up old recent tasks - at.cleanupRecentTasks() - - return nil -} - -// GetAvailableDisks returns disks that can accept new tasks of the given type -func (at *ActiveTopology) GetAvailableDisks(taskType TaskType, excludeNodeID string) []*DiskInfo { - at.mutex.RLock() - defer at.mutex.RUnlock() - - var available []*DiskInfo - - for _, disk := range at.disks { - if disk.NodeID == excludeNodeID { - continue // Skip excluded node - } - - if at.isDiskAvailable(disk, taskType) { - // Create a copy with current load count - diskCopy := *disk.DiskInfo - diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks) - available = append(available, &diskCopy) - } - } - - return available -} - -// GetDiskLoad returns the current load on a disk (number of active tasks) -func (at *ActiveTopology) GetDiskLoad(nodeID string, diskID uint32) int { - at.mutex.RLock() - defer at.mutex.RUnlock() 
- - diskKey := fmt.Sprintf("%s:%d", nodeID, diskID) - disk, exists := at.disks[diskKey] - if !exists { - return 0 - } - - return len(disk.pendingTasks) + len(disk.assignedTasks) -} - -// HasRecentTaskForVolume checks if a volume had a recent task (to avoid immediate re-detection) -func (at *ActiveTopology) HasRecentTaskForVolume(volumeID uint32, taskType TaskType) bool { - at.mutex.RLock() - defer at.mutex.RUnlock() - - for _, task := range at.recentTasks { - if task.VolumeID == volumeID && task.TaskType == taskType { - return true - } - } - - return false -} - -// GetAllNodes returns information about all nodes (public interface) -func (at *ActiveTopology) GetAllNodes() map[string]*master_pb.DataNodeInfo { - at.mutex.RLock() - defer at.mutex.RUnlock() - - result := make(map[string]*master_pb.DataNodeInfo) - for nodeID, node := range at.nodes { - result[nodeID] = node.nodeInfo - } - return result -} - -// GetTopologyInfo returns the current topology information (read-only access) -func (at *ActiveTopology) GetTopologyInfo() *master_pb.TopologyInfo { - at.mutex.RLock() - defer at.mutex.RUnlock() - return at.topologyInfo -} - -// GetNodeDisks returns all disks for a specific node -func (at *ActiveTopology) GetNodeDisks(nodeID string) []*DiskInfo { - at.mutex.RLock() - defer at.mutex.RUnlock() - - node, exists := at.nodes[nodeID] - if !exists { - return nil - } - - var disks []*DiskInfo - for _, disk := range node.disks { - diskCopy := *disk.DiskInfo - diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks) - disks = append(disks, &diskCopy) - } - - return disks -} - -// DestinationPlan represents a planned destination for a volume/shard operation -type DestinationPlan struct { - TargetNode string `json:"target_node"` - TargetDisk uint32 `json:"target_disk"` - TargetRack string `json:"target_rack"` - TargetDC string `json:"target_dc"` - ExpectedSize uint64 `json:"expected_size"` - PlacementScore float64 `json:"placement_score"` - Conflicts []string `json:"conflicts"` -} - -// MultiDestinationPlan represents multiple planned destinations for operations like EC -type MultiDestinationPlan struct { - Plans []*DestinationPlan `json:"plans"` - TotalShards int `json:"total_shards"` - SuccessfulRack int `json:"successful_racks"` - SuccessfulDCs int `json:"successful_dcs"` -} - -// Private methods - -// reassignTaskStates assigns tasks to the appropriate disks -func (at *ActiveTopology) reassignTaskStates() { - // Clear existing task assignments - for _, disk := range at.disks { - disk.pendingTasks = nil - disk.assignedTasks = nil - disk.recentTasks = nil - } - - // Reassign pending tasks - for _, task := range at.pendingTasks { - at.assignTaskToDisk(task) - } - - // Reassign assigned tasks - for _, task := range at.assignedTasks { - at.assignTaskToDisk(task) - } - - // Reassign recent tasks - for _, task := range at.recentTasks { - at.assignTaskToDisk(task) - } -} - -// assignTaskToDisk assigns a task to the appropriate disk(s) -func (at *ActiveTopology) assignTaskToDisk(task *taskState) { - // Assign to source disk - sourceKey := fmt.Sprintf("%s:%d", task.SourceServer, task.SourceDisk) - if sourceDisk, exists := at.disks[sourceKey]; exists { - switch task.Status { - case TaskStatusPending: - sourceDisk.pendingTasks = append(sourceDisk.pendingTasks, task) - case TaskStatusInProgress: - sourceDisk.assignedTasks = append(sourceDisk.assignedTasks, task) - case TaskStatusCompleted: - sourceDisk.recentTasks = append(sourceDisk.recentTasks, task) - } - } - - // Assign to target disk if it 
exists and is different from source - if task.TargetServer != "" && (task.TargetServer != task.SourceServer || task.TargetDisk != task.SourceDisk) { - targetKey := fmt.Sprintf("%s:%d", task.TargetServer, task.TargetDisk) - if targetDisk, exists := at.disks[targetKey]; exists { - switch task.Status { - case TaskStatusPending: - targetDisk.pendingTasks = append(targetDisk.pendingTasks, task) - case TaskStatusInProgress: - targetDisk.assignedTasks = append(targetDisk.assignedTasks, task) - case TaskStatusCompleted: - targetDisk.recentTasks = append(targetDisk.recentTasks, task) - } - } - } -} - -// isDiskAvailable checks if a disk can accept new tasks -func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) bool { - // Check if disk has too many active tasks - activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks) - if activeLoad >= 2 { // Max 2 concurrent tasks per disk - return false - } - - // Check for conflicting task types - for _, task := range disk.assignedTasks { - if at.areTaskTypesConflicting(task.TaskType, taskType) { - return false - } - } - - return true -} - -// areTaskTypesConflicting checks if two task types conflict -func (at *ActiveTopology) areTaskTypesConflicting(existing, new TaskType) bool { - // Examples of conflicting task types - conflictMap := map[TaskType][]TaskType{ - TaskTypeVacuum: {TaskTypeBalance, TaskTypeErasureCoding}, - TaskTypeBalance: {TaskTypeVacuum, TaskTypeErasureCoding}, - TaskTypeErasureCoding: {TaskTypeVacuum, TaskTypeBalance}, - } - - if conflicts, exists := conflictMap[existing]; exists { - for _, conflictType := range conflicts { - if conflictType == new { - return true - } - } - } - - return false -} - -// cleanupRecentTasks removes old recent tasks -func (at *ActiveTopology) cleanupRecentTasks() { - cutoff := time.Now().Add(-time.Duration(at.recentTaskWindowSeconds) * time.Second) - - for taskID, task := range at.recentTasks { - if task.CompletedAt.Before(cutoff) { - delete(at.recentTasks, taskID) - } - } -} diff --git a/weed/admin/topology/active_topology_test.go b/weed/admin/topology/active_topology_test.go index 4e8b0b3a8..9b0990f21 100644 --- a/weed/admin/topology/active_topology_test.go +++ b/weed/admin/topology/active_topology_test.go @@ -1,6 +1,7 @@ package topology import ( + "fmt" "testing" "time" @@ -9,6 +10,16 @@ import ( "github.com/stretchr/testify/require" ) +// Helper function to find a disk by ID for testing - reduces code duplication +func findDiskByID(disks []*DiskInfo, diskID uint32) *DiskInfo { + for _, disk := range disks { + if disk.DiskID == diskID { + return disk + } + } + return nil +} + // TestActiveTopologyBasicOperations tests basic topology management func TestActiveTopologyBasicOperations(t *testing.T) { topology := NewActiveTopology(10) @@ -58,8 +69,19 @@ func TestTaskLifecycle(t *testing.T) { taskID := "balance-001" // 1. Add pending task - topology.AddPendingTask(taskID, TaskTypeBalance, 1001, - "10.0.0.1:8080", 0, "10.0.0.2:8080", 1) + err := topology.AddPendingTask(TaskSpec{ + TaskID: taskID, + TaskType: TaskTypeBalance, + VolumeID: 1001, + VolumeSize: 1024 * 1024 * 1024, + Sources: []TaskSourceSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 0}, + }, + Destinations: []TaskDestinationSpec{ + {ServerID: "10.0.0.2:8080", DiskID: 1}, + }, + }) + assert.NoError(t, err, "Should add pending task successfully") // Verify pending state assert.Equal(t, 1, len(topology.pendingTasks)) @@ -77,7 +99,7 @@ func TestTaskLifecycle(t *testing.T) { assert.Equal(t, 1, len(targetDisk.pendingTasks)) // 2. 
Assign task - err := topology.AssignTask(taskID) + err = topology.AssignTask(taskID) require.NoError(t, err) // Verify assigned state @@ -258,8 +280,7 @@ func TestTargetSelectionScenarios(t *testing.T) { assert.NotEqual(t, tt.excludeNode, disk.NodeID, "Available disk should not be on excluded node") - load := tt.topology.GetDiskLoad(disk.NodeID, disk.DiskID) - assert.Less(t, load, 2, "Disk load should be less than 2") + assert.Less(t, disk.LoadCount, 2, "Disk load should be less than 2") } }) } @@ -271,37 +292,65 @@ func TestDiskLoadCalculation(t *testing.T) { topology.UpdateTopology(createSampleTopology()) // Initially no load - load := topology.GetDiskLoad("10.0.0.1:8080", 0) - assert.Equal(t, 0, load) + disks := topology.GetNodeDisks("10.0.0.1:8080") + targetDisk := findDiskByID(disks, 0) + require.NotNil(t, targetDisk, "Should find disk with ID 0") + assert.Equal(t, 0, targetDisk.LoadCount) // Add pending task - topology.AddPendingTask("task1", TaskTypeBalance, 1001, - "10.0.0.1:8080", 0, "10.0.0.2:8080", 1) + err := topology.AddPendingTask(TaskSpec{ + TaskID: "task1", + TaskType: TaskTypeBalance, + VolumeID: 1001, + VolumeSize: 1024 * 1024 * 1024, + Sources: []TaskSourceSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 0}, + }, + Destinations: []TaskDestinationSpec{ + {ServerID: "10.0.0.2:8080", DiskID: 1}, + }, + }) + assert.NoError(t, err, "Should add pending task successfully") // Check load increased - load = topology.GetDiskLoad("10.0.0.1:8080", 0) - assert.Equal(t, 1, load) + disks = topology.GetNodeDisks("10.0.0.1:8080") + targetDisk = findDiskByID(disks, 0) + assert.Equal(t, 1, targetDisk.LoadCount) // Add another task to same disk - topology.AddPendingTask("task2", TaskTypeVacuum, 1002, - "10.0.0.1:8080", 0, "", 0) + err = topology.AddPendingTask(TaskSpec{ + TaskID: "task2", + TaskType: TaskTypeVacuum, + VolumeID: 1002, + VolumeSize: 0, + Sources: []TaskSourceSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 0}, + }, + Destinations: []TaskDestinationSpec{ + {ServerID: "", DiskID: 0}, // Vacuum doesn't have a destination + }, + }) + assert.NoError(t, err, "Should add vacuum task successfully") - load = topology.GetDiskLoad("10.0.0.1:8080", 0) - assert.Equal(t, 2, load) + disks = topology.GetNodeDisks("10.0.0.1:8080") + targetDisk = findDiskByID(disks, 0) + assert.Equal(t, 2, targetDisk.LoadCount) // Move one task to assigned topology.AssignTask("task1") // Load should still be 2 (1 pending + 1 assigned) - load = topology.GetDiskLoad("10.0.0.1:8080", 0) - assert.Equal(t, 2, load) + disks = topology.GetNodeDisks("10.0.0.1:8080") + targetDisk = findDiskByID(disks, 0) + assert.Equal(t, 2, targetDisk.LoadCount) // Complete one task topology.CompleteTask("task1") // Load should decrease to 1 - load = topology.GetDiskLoad("10.0.0.1:8080", 0) - assert.Equal(t, 1, load) + disks = topology.GetNodeDisks("10.0.0.1:8080") + targetDisk = findDiskByID(disks, 0) + assert.Equal(t, 1, targetDisk.LoadCount) } // TestTaskConflictDetection tests task conflict detection @@ -310,8 +359,19 @@ func TestTaskConflictDetection(t *testing.T) { topology.UpdateTopology(createSampleTopology()) // Add a balance task - topology.AddPendingTask("balance1", TaskTypeBalance, 1001, - "10.0.0.1:8080", 0, "10.0.0.2:8080", 1) + err := topology.AddPendingTask(TaskSpec{ + TaskID: "balance1", + TaskType: TaskTypeBalance, + VolumeID: 1001, + VolumeSize: 1024 * 1024 * 1024, + Sources: []TaskSourceSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 0}, + }, + Destinations: []TaskDestinationSpec{ + {ServerID: "10.0.0.2:8080", DiskID: 1}, + }, + 
}) + assert.NoError(t, err, "Should add balance task successfully") topology.AssignTask("balance1") // Try to get available disks for vacuum (conflicts with balance) @@ -448,8 +508,22 @@ func createTopologyWithLoad() *ActiveTopology { topology.UpdateTopology(createSampleTopology()) // Add some existing tasks to create load - topology.AddPendingTask("existing1", TaskTypeVacuum, 2001, - "10.0.0.1:8080", 0, "", 0) + err := topology.AddPendingTask(TaskSpec{ + TaskID: "existing1", + TaskType: TaskTypeVacuum, + VolumeID: 2001, + VolumeSize: 0, + Sources: []TaskSourceSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 0}, + }, + Destinations: []TaskDestinationSpec{ + {ServerID: "", DiskID: 0}, // Vacuum doesn't have a destination + }, + }) + if err != nil { + // In test helper function, just log error instead of failing + fmt.Printf("Warning: Failed to add existing task: %v\n", err) + } topology.AssignTask("existing1") return topology @@ -466,12 +540,38 @@ func createTopologyWithConflicts() *ActiveTopology { topology.UpdateTopology(createSampleTopology()) // Add conflicting tasks - topology.AddPendingTask("balance1", TaskTypeBalance, 3001, - "10.0.0.1:8080", 0, "10.0.0.2:8080", 0) + err := topology.AddPendingTask(TaskSpec{ + TaskID: "balance1", + TaskType: TaskTypeBalance, + VolumeID: 3001, + VolumeSize: 1024 * 1024 * 1024, + Sources: []TaskSourceSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 0}, + }, + Destinations: []TaskDestinationSpec{ + {ServerID: "10.0.0.2:8080", DiskID: 0}, + }, + }) + if err != nil { + fmt.Printf("Warning: Failed to add balance task: %v\n", err) + } topology.AssignTask("balance1") - topology.AddPendingTask("ec1", TaskTypeErasureCoding, 3002, - "10.0.0.1:8080", 1, "", 0) + err = topology.AddPendingTask(TaskSpec{ + TaskID: "ec1", + TaskType: TaskTypeErasureCoding, + VolumeID: 3002, + VolumeSize: 1024 * 1024 * 1024, + Sources: []TaskSourceSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 1}, + }, + Destinations: []TaskDestinationSpec{ + {ServerID: "", DiskID: 0}, // EC doesn't have single destination + }, + }) + if err != nil { + fmt.Printf("Warning: Failed to add EC task: %v\n", err) + } topology.AssignTask("ec1") return topology diff --git a/weed/admin/topology/capacity.go b/weed/admin/topology/capacity.go new file mode 100644 index 000000000..a595ed369 --- /dev/null +++ b/weed/admin/topology/capacity.go @@ -0,0 +1,300 @@ +package topology + +import ( + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" +) + +// GetEffectiveAvailableCapacity returns the effective available capacity for a disk +// This considers BOTH pending and assigned tasks for capacity reservation. 
+// +// Formula: BaseAvailable - (VolumeSlots + ShardSlots/ShardsPerVolumeSlot) from all tasks +// +// The calculation includes: +// - Pending tasks: Reserve capacity immediately when added +// - Assigned tasks: Continue to reserve capacity during execution +// - Recently completed tasks are NOT counted against capacity +func (at *ActiveTopology) GetEffectiveAvailableCapacity(nodeID string, diskID uint32) int64 { + at.mutex.RLock() + defer at.mutex.RUnlock() + + diskKey := fmt.Sprintf("%s:%d", nodeID, diskID) + disk, exists := at.disks[diskKey] + if !exists { + return 0 + } + + if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil { + return 0 + } + + // Use the same logic as getEffectiveAvailableCapacityUnsafe but with locking + capacity := at.getEffectiveAvailableCapacityUnsafe(disk) + return int64(capacity.VolumeSlots) +} + +// GetEffectiveAvailableCapacityDetailed returns detailed available capacity as StorageSlotChange +// This provides granular information about available volume slots and shard slots +func (at *ActiveTopology) GetEffectiveAvailableCapacityDetailed(nodeID string, diskID uint32) StorageSlotChange { + at.mutex.RLock() + defer at.mutex.RUnlock() + + diskKey := fmt.Sprintf("%s:%d", nodeID, diskID) + disk, exists := at.disks[diskKey] + if !exists { + return StorageSlotChange{} + } + + if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil { + return StorageSlotChange{} + } + + return at.getEffectiveAvailableCapacityUnsafe(disk) +} + +// GetEffectiveCapacityImpact returns the StorageSlotChange impact for a disk +// This shows the net impact from all pending and assigned tasks +func (at *ActiveTopology) GetEffectiveCapacityImpact(nodeID string, diskID uint32) StorageSlotChange { + at.mutex.RLock() + defer at.mutex.RUnlock() + + diskKey := fmt.Sprintf("%s:%d", nodeID, diskID) + disk, exists := at.disks[diskKey] + if !exists { + return StorageSlotChange{} + } + + return at.getEffectiveCapacityUnsafe(disk) +} + +// GetDisksWithEffectiveCapacity returns disks with sufficient effective capacity +// This method considers BOTH pending and assigned tasks for capacity reservation using StorageSlotChange. 
+// +// Parameters: +// - taskType: type of task to check compatibility for +// - excludeNodeID: node to exclude from results +// - minCapacity: minimum effective capacity required (in volume slots) +// +// Returns: DiskInfo objects where VolumeCount reflects capacity reserved by all tasks +func (at *ActiveTopology) GetDisksWithEffectiveCapacity(taskType TaskType, excludeNodeID string, minCapacity int64) []*DiskInfo { + at.mutex.RLock() + defer at.mutex.RUnlock() + + var available []*DiskInfo + + for _, disk := range at.disks { + if disk.NodeID == excludeNodeID { + continue // Skip excluded node + } + + if at.isDiskAvailable(disk, taskType) { + effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk) + + // Only include disks that meet minimum capacity requirement + if int64(effectiveCapacity.VolumeSlots) >= minCapacity { + // Create a new DiskInfo with current capacity information + diskCopy := DiskInfo{ + NodeID: disk.DiskInfo.NodeID, + DiskID: disk.DiskInfo.DiskID, + DiskType: disk.DiskInfo.DiskType, + DataCenter: disk.DiskInfo.DataCenter, + Rack: disk.DiskInfo.Rack, + LoadCount: len(disk.pendingTasks) + len(disk.assignedTasks), // Count all tasks + } + + // Create a new protobuf DiskInfo to avoid modifying the original + diskInfoCopy := &master_pb.DiskInfo{ + DiskId: disk.DiskInfo.DiskInfo.DiskId, + MaxVolumeCount: disk.DiskInfo.DiskInfo.MaxVolumeCount, + VolumeCount: disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(effectiveCapacity.VolumeSlots), + VolumeInfos: disk.DiskInfo.DiskInfo.VolumeInfos, + EcShardInfos: disk.DiskInfo.DiskInfo.EcShardInfos, + RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount, + ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount, + FreeVolumeCount: disk.DiskInfo.DiskInfo.FreeVolumeCount, + } + diskCopy.DiskInfo = diskInfoCopy + + available = append(available, &diskCopy) + } + } + } + + return available +} + +// GetDisksForPlanning returns disks considering both active and pending tasks for planning decisions +// This helps avoid over-scheduling tasks to the same disk +func (at *ActiveTopology) GetDisksForPlanning(taskType TaskType, excludeNodeID string, minCapacity int64) []*DiskInfo { + at.mutex.RLock() + defer at.mutex.RUnlock() + + var available []*DiskInfo + + for _, disk := range at.disks { + if disk.NodeID == excludeNodeID { + continue // Skip excluded node + } + + // Consider both pending and active tasks for scheduling decisions + if at.isDiskAvailableForPlanning(disk, taskType) { + // Check if disk can accommodate new task considering pending tasks + planningCapacity := at.getPlanningCapacityUnsafe(disk) + + if int64(planningCapacity.VolumeSlots) >= minCapacity { + // Create a new DiskInfo with planning information + diskCopy := DiskInfo{ + NodeID: disk.DiskInfo.NodeID, + DiskID: disk.DiskInfo.DiskID, + DiskType: disk.DiskInfo.DiskType, + DataCenter: disk.DiskInfo.DataCenter, + Rack: disk.DiskInfo.Rack, + LoadCount: len(disk.pendingTasks) + len(disk.assignedTasks), + } + + // Create a new protobuf DiskInfo to avoid modifying the original + diskInfoCopy := &master_pb.DiskInfo{ + DiskId: disk.DiskInfo.DiskInfo.DiskId, + MaxVolumeCount: disk.DiskInfo.DiskInfo.MaxVolumeCount, + VolumeCount: disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(planningCapacity.VolumeSlots), + VolumeInfos: disk.DiskInfo.DiskInfo.VolumeInfos, + EcShardInfos: disk.DiskInfo.DiskInfo.EcShardInfos, + RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount, + ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount, + FreeVolumeCount: 
disk.DiskInfo.DiskInfo.FreeVolumeCount, + } + diskCopy.DiskInfo = diskInfoCopy + + available = append(available, &diskCopy) + } + } + } + + return available +} + +// CanAccommodateTask checks if a disk can accommodate a new task considering all constraints +func (at *ActiveTopology) CanAccommodateTask(nodeID string, diskID uint32, taskType TaskType, volumesNeeded int64) bool { + at.mutex.RLock() + defer at.mutex.RUnlock() + + diskKey := fmt.Sprintf("%s:%d", nodeID, diskID) + disk, exists := at.disks[diskKey] + if !exists { + return false + } + + // Check basic availability + if !at.isDiskAvailable(disk, taskType) { + return false + } + + // Check effective capacity + effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk) + return int64(effectiveCapacity.VolumeSlots) >= volumesNeeded +} + +// getPlanningCapacityUnsafe considers both pending and active tasks for planning +func (at *ActiveTopology) getPlanningCapacityUnsafe(disk *activeDisk) StorageSlotChange { + if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil { + return StorageSlotChange{} + } + + baseAvailableVolumes := disk.DiskInfo.DiskInfo.MaxVolumeCount - disk.DiskInfo.DiskInfo.VolumeCount + + // Use the centralized helper function to calculate task storage impact + totalImpact := at.calculateTaskStorageImpact(disk) + + // Calculate available capacity considering impact (negative impact reduces availability) + availableVolumeSlots := baseAvailableVolumes - totalImpact.ToVolumeSlots() + if availableVolumeSlots < 0 { + availableVolumeSlots = 0 + } + + // Return detailed capacity information + return StorageSlotChange{ + VolumeSlots: int32(availableVolumeSlots), + ShardSlots: -totalImpact.ShardSlots, // Available shard capacity (negative impact becomes positive availability) + } +} + +// isDiskAvailableForPlanning checks if disk can accept new tasks considering pending load +func (at *ActiveTopology) isDiskAvailableForPlanning(disk *activeDisk, taskType TaskType) bool { + // Check total load including pending tasks + totalLoad := len(disk.pendingTasks) + len(disk.assignedTasks) + if totalLoad >= MaxTotalTaskLoadPerDisk { + return false + } + + // Check for conflicting task types in active tasks only + for _, task := range disk.assignedTasks { + if at.areTaskTypesConflicting(task.TaskType, taskType) { + return false + } + } + + return true +} + +// calculateTaskStorageImpact is a helper function that calculates the total storage impact +// from all tasks (pending and assigned) on a given disk. This eliminates code duplication +// between multiple capacity calculation functions. 
+func (at *ActiveTopology) calculateTaskStorageImpact(disk *activeDisk) StorageSlotChange { + if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil { + return StorageSlotChange{} + } + + totalImpact := StorageSlotChange{} + + // Process both pending and assigned tasks with identical logic + taskLists := [][]*taskState{disk.pendingTasks, disk.assignedTasks} + + for _, taskList := range taskLists { + for _, task := range taskList { + // Calculate impact for all source locations + for _, source := range task.Sources { + if source.SourceServer == disk.NodeID && source.SourceDisk == disk.DiskID { + totalImpact.AddInPlace(source.StorageChange) + } + } + + // Calculate impact for all destination locations + for _, dest := range task.Destinations { + if dest.TargetServer == disk.NodeID && dest.TargetDisk == disk.DiskID { + totalImpact.AddInPlace(dest.StorageChange) + } + } + } + } + + return totalImpact +} + +// getEffectiveCapacityUnsafe returns effective capacity impact without locking (for internal use) +// Returns StorageSlotChange representing the net impact from all tasks +func (at *ActiveTopology) getEffectiveCapacityUnsafe(disk *activeDisk) StorageSlotChange { + return at.calculateTaskStorageImpact(disk) +} + +// getEffectiveAvailableCapacityUnsafe returns detailed available capacity as StorageSlotChange +func (at *ActiveTopology) getEffectiveAvailableCapacityUnsafe(disk *activeDisk) StorageSlotChange { + if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil { + return StorageSlotChange{} + } + + baseAvailable := disk.DiskInfo.DiskInfo.MaxVolumeCount - disk.DiskInfo.DiskInfo.VolumeCount + netImpact := at.getEffectiveCapacityUnsafe(disk) + + // Calculate available volume slots (negative impact reduces availability) + availableVolumeSlots := baseAvailable - netImpact.ToVolumeSlots() + if availableVolumeSlots < 0 { + availableVolumeSlots = 0 + } + + // Return detailed capacity information + return StorageSlotChange{ + VolumeSlots: int32(availableVolumeSlots), + ShardSlots: -netImpact.ShardSlots, // Available shard capacity (negative impact becomes positive availability) + } +} diff --git a/weed/admin/topology/internal.go b/weed/admin/topology/internal.go new file mode 100644 index 000000000..72e37f6c1 --- /dev/null +++ b/weed/admin/topology/internal.go @@ -0,0 +1,114 @@ +package topology + +import ( + "fmt" + "time" +) + +// reassignTaskStates assigns tasks to the appropriate disks +func (at *ActiveTopology) reassignTaskStates() { + // Clear existing task assignments + for _, disk := range at.disks { + disk.pendingTasks = nil + disk.assignedTasks = nil + disk.recentTasks = nil + } + + // Reassign pending tasks + for _, task := range at.pendingTasks { + at.assignTaskToDisk(task) + } + + // Reassign assigned tasks + for _, task := range at.assignedTasks { + at.assignTaskToDisk(task) + } + + // Reassign recent tasks + for _, task := range at.recentTasks { + at.assignTaskToDisk(task) + } +} + +// assignTaskToDisk assigns a task to the appropriate disk(s) +func (at *ActiveTopology) assignTaskToDisk(task *taskState) { + addedDisks := make(map[string]bool) + + // Local helper function to assign task to a disk and avoid code duplication + assign := func(server string, diskID uint32) { + key := fmt.Sprintf("%s:%d", server, diskID) + if server == "" || addedDisks[key] { + return + } + if disk, exists := at.disks[key]; exists { + switch task.Status { + case TaskStatusPending: + disk.pendingTasks = append(disk.pendingTasks, task) + case TaskStatusInProgress: + disk.assignedTasks = 
append(disk.assignedTasks, task) + case TaskStatusCompleted: + disk.recentTasks = append(disk.recentTasks, task) + } + addedDisks[key] = true + } + } + + // Assign to all source disks + for _, source := range task.Sources { + assign(source.SourceServer, source.SourceDisk) + } + + // Assign to all destination disks (duplicates automatically avoided by helper) + for _, dest := range task.Destinations { + assign(dest.TargetServer, dest.TargetDisk) + } +} + +// isDiskAvailable checks if a disk can accept new tasks +func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) bool { + // Check if disk has too many pending and active tasks + activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks) + if activeLoad >= MaxConcurrentTasksPerDisk { + return false + } + + // Check for conflicting task types + for _, task := range disk.assignedTasks { + if at.areTaskTypesConflicting(task.TaskType, taskType) { + return false + } + } + + return true +} + +// areTaskTypesConflicting checks if two task types conflict +func (at *ActiveTopology) areTaskTypesConflicting(existing, new TaskType) bool { + // Examples of conflicting task types + conflictMap := map[TaskType][]TaskType{ + TaskTypeVacuum: {TaskTypeBalance, TaskTypeErasureCoding}, + TaskTypeBalance: {TaskTypeVacuum, TaskTypeErasureCoding}, + TaskTypeErasureCoding: {TaskTypeVacuum, TaskTypeBalance}, + } + + if conflicts, exists := conflictMap[existing]; exists { + for _, conflictType := range conflicts { + if conflictType == new { + return true + } + } + } + + return false +} + +// cleanupRecentTasks removes old recent tasks +func (at *ActiveTopology) cleanupRecentTasks() { + cutoff := time.Now().Add(-time.Duration(at.recentTaskWindowSeconds) * time.Second) + + for taskID, task := range at.recentTasks { + if task.CompletedAt.Before(cutoff) { + delete(at.recentTasks, taskID) + } + } +} diff --git a/weed/admin/topology/storage_impact.go b/weed/admin/topology/storage_impact.go new file mode 100644 index 000000000..e325fc9cf --- /dev/null +++ b/weed/admin/topology/storage_impact.go @@ -0,0 +1,50 @@ +package topology + +import ( + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" +) + +// CalculateTaskStorageImpact calculates storage impact for different task types +func CalculateTaskStorageImpact(taskType TaskType, volumeSize int64) (sourceChange, targetChange StorageSlotChange) { + switch taskType { + case TaskTypeErasureCoding: + // EC task: distributes shards to MULTIPLE targets, source reserves with zero impact + // Source reserves capacity but with zero StorageSlotChange (no actual capacity consumption during planning) + // WARNING: EC has multiple targets! 
Use AddPendingTask with multiple destinations for proper multi-target handling + // This simplified function returns zero impact; real EC requires specialized multi-destination calculation + return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 0, ShardSlots: 0} + + case TaskTypeBalance: + // Balance task: moves volume from source to target + // Source loses 1 volume, target gains 1 volume + return StorageSlotChange{VolumeSlots: -1, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 1, ShardSlots: 0} + + case TaskTypeVacuum: + // Vacuum task: frees space by removing deleted entries, no slot change + return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 0, ShardSlots: 0} + + case TaskTypeReplication: + // Replication task: creates new replica on target + return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 1, ShardSlots: 0} + + default: + // Unknown task type, assume minimal impact + glog.Warningf("unhandled task type %s in CalculateTaskStorageImpact, assuming default impact", taskType) + return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 1, ShardSlots: 0} + } +} + +// CalculateECShardStorageImpact calculates storage impact for EC shards specifically +func CalculateECShardStorageImpact(shardCount int32, expectedShardSize int64) StorageSlotChange { + // EC shards are typically much smaller than full volumes + // Use shard-level tracking for granular capacity planning + return StorageSlotChange{VolumeSlots: 0, ShardSlots: shardCount} +} + +// CalculateECShardCleanupImpact calculates storage impact for cleaning up existing EC shards +func CalculateECShardCleanupImpact(originalVolumeSize int64) StorageSlotChange { + // Cleaning up existing EC shards frees shard slots + // Use the actual EC configuration constants for accurate shard count + return StorageSlotChange{VolumeSlots: 0, ShardSlots: -int32(erasure_coding.TotalShardsCount)} // Negative = freed capacity +} diff --git a/weed/admin/topology/storage_slot_test.go b/weed/admin/topology/storage_slot_test.go new file mode 100644 index 000000000..5a0ed3ce5 --- /dev/null +++ b/weed/admin/topology/storage_slot_test.go @@ -0,0 +1,1004 @@ +package topology + +import ( + "fmt" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/stretchr/testify/assert" +) + +// NOTE: These tests are designed to work with any value of erasure_coding.DataShardsCount. +// This ensures compatibility with custom erasure coding configurations where DataShardsCount +// might be changed from the default value of 10. All shard-to-volume conversion calculations +// are done dynamically using the actual constant value. + +// testGetDiskStorageImpact is a test helper that provides the same interface as the removed +// GetDiskStorageImpact method. For simplicity, it returns the total impact as "planned" +// and zeros for "reserved" since the distinction is not critical for most test scenarios. 
+func testGetDiskStorageImpact(at *ActiveTopology, nodeID string, diskID uint32) (plannedVolumeSlots, reservedVolumeSlots int64, plannedShardSlots, reservedShardSlots int32, estimatedSize int64) { + impact := at.GetEffectiveCapacityImpact(nodeID, diskID) + // Return total impact as "planned" for test compatibility + return int64(impact.VolumeSlots), 0, impact.ShardSlots, 0, 0 +} + +// TestStorageSlotChangeArithmetic tests the arithmetic operations on StorageSlotChange +func TestStorageSlotChangeArithmetic(t *testing.T) { + // Test basic arithmetic operations + a := StorageSlotChange{VolumeSlots: 5, ShardSlots: 10} + b := StorageSlotChange{VolumeSlots: 3, ShardSlots: 8} + + // Test Add + sum := a.Add(b) + assert.Equal(t, StorageSlotChange{VolumeSlots: 8, ShardSlots: 18}, sum, "Add should work correctly") + + // Test Subtract + diff := a.Subtract(b) + assert.Equal(t, StorageSlotChange{VolumeSlots: 2, ShardSlots: 2}, diff, "Subtract should work correctly") + + // Test AddInPlace + c := StorageSlotChange{VolumeSlots: 1, ShardSlots: 2} + c.AddInPlace(b) + assert.Equal(t, StorageSlotChange{VolumeSlots: 4, ShardSlots: 10}, c, "AddInPlace should modify in place") + + // Test SubtractInPlace + d := StorageSlotChange{VolumeSlots: 10, ShardSlots: 20} + d.SubtractInPlace(b) + assert.Equal(t, StorageSlotChange{VolumeSlots: 7, ShardSlots: 12}, d, "SubtractInPlace should modify in place") + + // Test IsZero + zero := StorageSlotChange{VolumeSlots: 0, ShardSlots: 0} + nonZero := StorageSlotChange{VolumeSlots: 1, ShardSlots: 0} + assert.True(t, zero.IsZero(), "Zero struct should return true for IsZero") + assert.False(t, nonZero.IsZero(), "Non-zero struct should return false for IsZero") + + // Test ToVolumeSlots conversion + impact1 := StorageSlotChange{VolumeSlots: 5, ShardSlots: 10} + assert.Equal(t, int64(6), impact1.ToVolumeSlots(), fmt.Sprintf("ToVolumeSlots should be 5 + 10/%d = 6", erasure_coding.DataShardsCount)) + + impact2 := StorageSlotChange{VolumeSlots: -2, ShardSlots: 25} + assert.Equal(t, int64(0), impact2.ToVolumeSlots(), fmt.Sprintf("ToVolumeSlots should be -2 + 25/%d = 0", erasure_coding.DataShardsCount)) + + impact3 := StorageSlotChange{VolumeSlots: 3, ShardSlots: 7} + assert.Equal(t, int64(3), impact3.ToVolumeSlots(), fmt.Sprintf("ToVolumeSlots should be 3 + 7/%d = 3 (integer division)", erasure_coding.DataShardsCount)) +} + +// TestStorageSlotChange tests the new dual-level storage slot tracking +func TestStorageSlotChange(t *testing.T) { + activeTopology := NewActiveTopology(10) + + // Create test topology + topologyInfo := &master_pb.TopologyInfo{ + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + { + Id: "rack1", + DataNodeInfos: []*master_pb.DataNodeInfo{ + { + Id: "10.0.0.1:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "hdd": { + DiskId: 0, + Type: "hdd", + VolumeCount: 5, + MaxVolumeCount: 20, + }, + }, + }, + { + Id: "10.0.0.2:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "hdd": { + DiskId: 0, + Type: "hdd", + VolumeCount: 8, + MaxVolumeCount: 15, + }, + }, + }, + }, + }, + }, + }, + }, + } + + activeTopology.UpdateTopology(topologyInfo) + + // Test 1: Basic storage slot calculation + ecSourceChange, ecTargetChange := CalculateTaskStorageImpact(TaskTypeErasureCoding, 1024*1024*1024) + assert.Equal(t, int32(0), ecSourceChange.VolumeSlots, "EC source reserves with zero StorageSlotChange impact") + assert.Equal(t, int32(0), ecSourceChange.ShardSlots, "EC source should have zero shard impact") + assert.Equal(t, 
int32(0), ecTargetChange.VolumeSlots, "EC should not directly impact target volume slots") + assert.Equal(t, int32(0), ecTargetChange.ShardSlots, "EC target should have zero shard impact from this simplified function") + + balSourceChange, balTargetChange := CalculateTaskStorageImpact(TaskTypeBalance, 1024*1024*1024) + assert.Equal(t, int32(-1), balSourceChange.VolumeSlots, "Balance should free 1 volume slot on source") + assert.Equal(t, int32(1), balTargetChange.VolumeSlots, "Balance should consume 1 volume slot on target") + + // Test 2: EC shard impact calculation + shardImpact := CalculateECShardStorageImpact(3, 100*1024*1024) // 3 shards, 100MB each + assert.Equal(t, int32(0), shardImpact.VolumeSlots, "EC shards should not impact volume slots") + assert.Equal(t, int32(3), shardImpact.ShardSlots, "EC should impact 3 shard slots") + + // Test 3: Add EC task with shard-level tracking + sourceServer := "10.0.0.1:8080" + sourceDisk := uint32(0) + shardDestinations := []string{"10.0.0.2:8080", "10.0.0.2:8080"} + shardDiskIDs := []uint32{0, 0} + + expectedShardSize := int64(50 * 1024 * 1024) // 50MB per shard + originalVolumeSize := int64(1024 * 1024 * 1024) // 1GB original + + // Create source specs (single replica in this test) + sources := []TaskSourceSpec{ + {ServerID: sourceServer, DiskID: sourceDisk, CleanupType: CleanupVolumeReplica}, + } + + // Create destination specs + destinations := make([]TaskDestinationSpec, len(shardDestinations)) + shardImpact = CalculateECShardStorageImpact(1, expectedShardSize) + for i, dest := range shardDestinations { + destinations[i] = TaskDestinationSpec{ + ServerID: dest, + DiskID: shardDiskIDs[i], + StorageImpact: &shardImpact, + EstimatedSize: &expectedShardSize, + } + } + + err := activeTopology.AddPendingTask(TaskSpec{ + TaskID: "ec_test", + TaskType: TaskTypeErasureCoding, + VolumeID: 100, + VolumeSize: originalVolumeSize, + Sources: sources, + Destinations: destinations, + }) + assert.NoError(t, err, "Should add EC shard task successfully") + + // Test 4: Check storage impact on source (EC reserves with zero impact) + sourceImpact := activeTopology.GetEffectiveCapacityImpact("10.0.0.1:8080", 0) + assert.Equal(t, int32(0), sourceImpact.VolumeSlots, "Source should show 0 volume slot impact (EC reserves with zero impact)") + assert.Equal(t, int32(0), sourceImpact.ShardSlots, "Source should show 0 shard slot impact") + + // Test 5: Check storage impact on target (should gain shards) + targetImpact := activeTopology.GetEffectiveCapacityImpact("10.0.0.2:8080", 0) + assert.Equal(t, int32(0), targetImpact.VolumeSlots, "Target should show 0 volume slot impact (EC shards don't use volume slots)") + assert.Equal(t, int32(2), targetImpact.ShardSlots, "Target should show 2 shard slot impact") + + // Test 6: Check effective capacity calculation (EC source reserves with zero StorageSlotChange) + sourceCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0) + targetCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0) + + // Source: 15 original available (EC source reserves with zero StorageSlotChange impact) + assert.Equal(t, int64(15), sourceCapacity, "Source should have 15 available slots (EC source has zero StorageSlotChange impact)") + + // Target: 7 original available - (2 shards / 10) = 7 (since 2/10 rounds down to 0) + assert.Equal(t, int64(7), targetCapacity, "Target should have 7 available slots (minimal shard impact)") + + // Test 7: Add traditional balance task for comparison + err = 
activeTopology.AddPendingTask(TaskSpec{ + TaskID: "balance_test", + TaskType: TaskTypeBalance, + VolumeID: 101, + VolumeSize: 512 * 1024 * 1024, + Sources: []TaskSourceSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 0}, + }, + Destinations: []TaskDestinationSpec{ + {ServerID: "10.0.0.2:8080", DiskID: 0}, + }, + }) + assert.NoError(t, err, "Should add balance task successfully") + + // Check updated impacts after adding balance task + finalSourceImpact := activeTopology.GetEffectiveCapacityImpact("10.0.0.1:8080", 0) + finalTargetImpact := activeTopology.GetEffectiveCapacityImpact("10.0.0.2:8080", 0) + + assert.Equal(t, int32(-1), finalSourceImpact.VolumeSlots, "Source should show -1 volume slot impact (EC: 0, Balance: -1)") + assert.Equal(t, int32(1), finalTargetImpact.VolumeSlots, "Target should show 1 volume slot impact (Balance: +1)") + assert.Equal(t, int32(2), finalTargetImpact.ShardSlots, "Target should still show 2 shard slot impact (EC shards)") +} + +// TestStorageSlotChangeCapacityCalculation tests the capacity calculation with mixed slot types +func TestStorageSlotChangeCapacityCalculation(t *testing.T) { + activeTopology := NewActiveTopology(10) + + // Create simple topology + topologyInfo := &master_pb.TopologyInfo{ + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + { + Id: "rack1", + DataNodeInfos: []*master_pb.DataNodeInfo{ + { + Id: "10.0.0.1:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "hdd": { + DiskId: 0, + Type: "hdd", + VolumeCount: 10, + MaxVolumeCount: 100, // Large capacity for testing + }, + }, + }, + }, + }, + }, + }, + }, + } + + activeTopology.UpdateTopology(topologyInfo) + + // Initial capacity + initialCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0) + assert.Equal(t, int64(90), initialCapacity, "Should start with 90 available slots") + + // Add tasks with different shard slot impacts + targetImpact1 := StorageSlotChange{VolumeSlots: 0, ShardSlots: 5} // Target gains 5 shards + estimatedSize1 := int64(100 * 1024 * 1024) + err := activeTopology.AddPendingTask(TaskSpec{ + TaskID: "shard_test_1", + TaskType: TaskTypeErasureCoding, + VolumeID: 100, + VolumeSize: estimatedSize1, + Sources: []TaskSourceSpec{ + {ServerID: "", DiskID: 0}, // Source not applicable here + }, + Destinations: []TaskDestinationSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 0, StorageImpact: &targetImpact1, EstimatedSize: &estimatedSize1}, + }, + }) + assert.NoError(t, err, "Should add shard test 1 successfully") + + // Capacity should be reduced by pending tasks via StorageSlotChange + capacityAfterShards := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0) + // Dynamic calculation: 5 shards < DataShardsCount, so no volume impact + expectedImpact5 := int64(5 / erasure_coding.DataShardsCount) // Should be 0 for any reasonable DataShardsCount + assert.Equal(t, int64(90-expectedImpact5), capacityAfterShards, fmt.Sprintf("5 shard slots should consume %d volume slot equivalent (5/%d = %d)", expectedImpact5, erasure_coding.DataShardsCount, expectedImpact5)) + + // Add more shards to reach threshold + additionalShards := int32(erasure_coding.DataShardsCount) // Add exactly one volume worth of shards + targetImpact2 := StorageSlotChange{VolumeSlots: 0, ShardSlots: additionalShards} // Target gains additional shards + estimatedSize2 := int64(100 * 1024 * 1024) + err = activeTopology.AddPendingTask(TaskSpec{ + TaskID: "shard_test_2", + TaskType: TaskTypeErasureCoding, + VolumeID: 101, + VolumeSize: 
estimatedSize2, + Sources: []TaskSourceSpec{ + {ServerID: "", DiskID: 0}, // Source not applicable here + }, + Destinations: []TaskDestinationSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 0, StorageImpact: &targetImpact2, EstimatedSize: &estimatedSize2}, + }, + }) + assert.NoError(t, err, "Should add shard test 2 successfully") + + // Dynamic calculation: (5 + DataShardsCount) shards should consume 1 volume slot + totalShards := 5 + erasure_coding.DataShardsCount + expectedImpact15 := int64(totalShards / erasure_coding.DataShardsCount) // Should be 1 + capacityAfterMoreShards := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0) + assert.Equal(t, int64(90-expectedImpact15), capacityAfterMoreShards, fmt.Sprintf("%d shard slots should consume %d volume slot equivalent (%d/%d = %d)", totalShards, expectedImpact15, totalShards, erasure_coding.DataShardsCount, expectedImpact15)) + + // Add a full volume task + targetImpact3 := StorageSlotChange{VolumeSlots: 1, ShardSlots: 0} // Target gains 1 volume + estimatedSize3 := int64(1024 * 1024 * 1024) + err = activeTopology.AddPendingTask(TaskSpec{ + TaskID: "volume_test", + TaskType: TaskTypeBalance, + VolumeID: 102, + VolumeSize: estimatedSize3, + Sources: []TaskSourceSpec{ + {ServerID: "", DiskID: 0}, // Source not applicable here + }, + Destinations: []TaskDestinationSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 0, StorageImpact: &targetImpact3, EstimatedSize: &estimatedSize3}, + }, + }) + assert.NoError(t, err, "Should add volume test successfully") + + // Capacity should be reduced by 1 more volume slot + finalCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0) + assert.Equal(t, int64(88), finalCapacity, "1 volume + 15 shard slots should consume 2 volume slots total") + + // Verify the detailed storage impact + plannedVol, reservedVol, plannedShard, reservedShard, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.1:8080", 0) + assert.Equal(t, int64(1), plannedVol, "Should show 1 planned volume slot") + assert.Equal(t, int64(0), reservedVol, "Should show 0 reserved volume slots") + assert.Equal(t, int32(15), plannedShard, "Should show 15 planned shard slots") + assert.Equal(t, int32(0), reservedShard, "Should show 0 reserved shard slots") +} + +// TestECMultipleTargets demonstrates proper handling of EC operations with multiple targets +func TestECMultipleTargets(t *testing.T) { + activeTopology := NewActiveTopology(10) + + // Create test topology with multiple target nodes + topologyInfo := &master_pb.TopologyInfo{ + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + { + Id: "rack1", + DataNodeInfos: []*master_pb.DataNodeInfo{ + { + Id: "10.0.0.1:8080", // Source + DiskInfos: map[string]*master_pb.DiskInfo{ + "hdd": {DiskId: 0, Type: "hdd", VolumeCount: 10, MaxVolumeCount: 50}, + }, + }, + { + Id: "10.0.0.2:8080", // Target 1 + DiskInfos: map[string]*master_pb.DiskInfo{ + "hdd": {DiskId: 0, Type: "hdd", VolumeCount: 5, MaxVolumeCount: 30}, + }, + }, + { + Id: "10.0.0.3:8080", // Target 2 + DiskInfos: map[string]*master_pb.DiskInfo{ + "hdd": {DiskId: 0, Type: "hdd", VolumeCount: 8, MaxVolumeCount: 40}, + }, + }, + { + Id: "10.0.0.4:8080", // Target 3 + DiskInfos: map[string]*master_pb.DiskInfo{ + "hdd": {DiskId: 0, Type: "hdd", VolumeCount: 12, MaxVolumeCount: 35}, + }, + }, + }, + }, + }, + }, + }, + } + + activeTopology.UpdateTopology(topologyInfo) + + // Demonstrate why CalculateTaskStorageImpact is insufficient for EC + sourceChange, targetChange := 
CalculateTaskStorageImpact(TaskTypeErasureCoding, 1*1024*1024*1024) + assert.Equal(t, StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, sourceChange, "Source reserves with zero StorageSlotChange") + assert.Equal(t, StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, targetChange, "Target has zero impact from simplified function - insufficient for multi-target EC") + + // Proper way: Use AddPendingTask for multiple targets + sourceServer := "10.0.0.1:8080" + sourceDisk := uint32(0) + + // EC typically distributes shards across multiple targets + shardDestinations := []string{ + "10.0.0.2:8080", "10.0.0.2:8080", "10.0.0.2:8080", "10.0.0.2:8080", "10.0.0.2:8080", // 5 shards to target 1 + "10.0.0.3:8080", "10.0.0.3:8080", "10.0.0.3:8080", "10.0.0.3:8080", "10.0.0.3:8080", // 5 shards to target 2 + "10.0.0.4:8080", "10.0.0.4:8080", "10.0.0.4:8080", "10.0.0.4:8080", // 4 shards to target 3 + } + shardDiskIDs := make([]uint32, len(shardDestinations)) + for i := range shardDiskIDs { + shardDiskIDs[i] = 0 + } + + // Create source specs (single replica in this test) + sources := []TaskSourceSpec{ + {ServerID: sourceServer, DiskID: sourceDisk, CleanupType: CleanupVolumeReplica}, + } + + // Create destination specs + destinations := make([]TaskDestinationSpec, len(shardDestinations)) + expectedShardSize := int64(50 * 1024 * 1024) + shardImpact := CalculateECShardStorageImpact(1, expectedShardSize) + for i, dest := range shardDestinations { + destinations[i] = TaskDestinationSpec{ + ServerID: dest, + DiskID: shardDiskIDs[i], + StorageImpact: &shardImpact, + EstimatedSize: &expectedShardSize, + } + } + + err := activeTopology.AddPendingTask(TaskSpec{ + TaskID: "ec_multi_target", + TaskType: TaskTypeErasureCoding, + VolumeID: 200, + VolumeSize: 1 * 1024 * 1024 * 1024, + Sources: sources, + Destinations: destinations, + }) + assert.NoError(t, err, "Should add multi-target EC task successfully") + + // Verify source impact (EC reserves with zero StorageSlotChange) + sourcePlannedVol, sourceReservedVol, sourcePlannedShard, sourceReservedShard, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.1:8080", 0) + assert.Equal(t, int64(0), sourcePlannedVol, "Source should reserve with zero volume slot impact") + assert.Equal(t, int64(0), sourceReservedVol, "Source should not have reserved capacity yet") + assert.Equal(t, int32(0), sourcePlannedShard, "Source should not have planned shard impact") + assert.Equal(t, int32(0), sourceReservedShard, "Source should not have reserved shard impact") + // Note: EstimatedSize tracking is no longer exposed via public API + + // Verify target impacts (planned, not yet reserved) + target1PlannedVol, target1ReservedVol, target1PlannedShard, target1ReservedShard, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.2:8080", 0) + target2PlannedVol, target2ReservedVol, target2PlannedShard, target2ReservedShard, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.3:8080", 0) + target3PlannedVol, target3ReservedVol, target3PlannedShard, target3ReservedShard, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.4:8080", 0) + + assert.Equal(t, int64(0), target1PlannedVol, "Target 1 should not have planned volume impact") + assert.Equal(t, int32(5), target1PlannedShard, "Target 1 should plan to receive 5 shards") + assert.Equal(t, int64(0), target1ReservedVol, "Target 1 should not have reserved capacity yet") + assert.Equal(t, int32(0), target1ReservedShard, "Target 1 should not have reserved shards yet") + + assert.Equal(t, int64(0), target2PlannedVol, "Target 2 should not have 
planned volume impact") + assert.Equal(t, int32(5), target2PlannedShard, "Target 2 should plan to receive 5 shards") + assert.Equal(t, int64(0), target2ReservedVol, "Target 2 should not have reserved capacity yet") + assert.Equal(t, int32(0), target2ReservedShard, "Target 2 should not have reserved shards yet") + + assert.Equal(t, int64(0), target3PlannedVol, "Target 3 should not have planned volume impact") + assert.Equal(t, int32(4), target3PlannedShard, "Target 3 should plan to receive 4 shards") + assert.Equal(t, int64(0), target3ReservedVol, "Target 3 should not have reserved capacity yet") + assert.Equal(t, int32(0), target3ReservedShard, "Target 3 should not have reserved shards yet") + + // Verify effective capacity (considers both pending and active tasks via StorageSlotChange) + sourceCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0) + target1Capacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0) + target2Capacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.3:8080", 0) + target3Capacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.4:8080", 0) + + // Dynamic capacity calculations based on actual DataShardsCount + expectedTarget1Impact := int64(5 / erasure_coding.DataShardsCount) // 5 shards impact + expectedTarget2Impact := int64(5 / erasure_coding.DataShardsCount) // 5 shards impact + expectedTarget3Impact := int64(4 / erasure_coding.DataShardsCount) // 4 shards impact + + assert.Equal(t, int64(40), sourceCapacity, "Source: 40 (EC source reserves with zero StorageSlotChange impact)") + assert.Equal(t, int64(25-expectedTarget1Impact), target1Capacity, fmt.Sprintf("Target 1: 25 - %d (5 shards/%d = %d impact) = %d", expectedTarget1Impact, erasure_coding.DataShardsCount, expectedTarget1Impact, 25-expectedTarget1Impact)) + assert.Equal(t, int64(32-expectedTarget2Impact), target2Capacity, fmt.Sprintf("Target 2: 32 - %d (5 shards/%d = %d impact) = %d", expectedTarget2Impact, erasure_coding.DataShardsCount, expectedTarget2Impact, 32-expectedTarget2Impact)) + assert.Equal(t, int64(23-expectedTarget3Impact), target3Capacity, fmt.Sprintf("Target 3: 23 - %d (4 shards/%d = %d impact) = %d", expectedTarget3Impact, erasure_coding.DataShardsCount, expectedTarget3Impact, 23-expectedTarget3Impact)) + + t.Logf("EC operation distributed %d shards across %d targets", len(shardDestinations), 3) + t.Logf("Capacity impacts: EC source reserves with zero impact, Targets minimal (shards < %d)", erasure_coding.DataShardsCount) +} + +// TestCapacityReservationCycle demonstrates the complete task lifecycle and capacity management +func TestCapacityReservationCycle(t *testing.T) { + activeTopology := NewActiveTopology(10) + + // Create test topology + topologyInfo := &master_pb.TopologyInfo{ + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + { + Id: "rack1", + DataNodeInfos: []*master_pb.DataNodeInfo{ + { + Id: "10.0.0.1:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "hdd": {DiskId: 0, Type: "hdd", VolumeCount: 10, MaxVolumeCount: 20}, + }, + }, + { + Id: "10.0.0.2:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "hdd": {DiskId: 0, Type: "hdd", VolumeCount: 5, MaxVolumeCount: 15}, + }, + }, + }, + }, + }, + }, + }, + } + activeTopology.UpdateTopology(topologyInfo) + + // Initial capacity + sourceCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0) + targetCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0) + assert.Equal(t, 
int64(10), sourceCapacity, "Source initial capacity") + assert.Equal(t, int64(10), targetCapacity, "Target initial capacity") + + // Step 1: Add pending task (should reserve capacity via StorageSlotChange) + err := activeTopology.AddPendingTask(TaskSpec{ + TaskID: "balance_test", + TaskType: TaskTypeBalance, + VolumeID: 123, + VolumeSize: 1 * 1024 * 1024 * 1024, + Sources: []TaskSourceSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 0}, + }, + Destinations: []TaskDestinationSpec{ + {ServerID: "10.0.0.2:8080", DiskID: 0}, + }, + }) + assert.NoError(t, err, "Should add balance test successfully") + + sourceCapacityAfterPending := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0) + targetCapacityAfterPending := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0) + assert.Equal(t, int64(11), sourceCapacityAfterPending, "Source should gain capacity from pending balance task (balance source frees 1 slot)") + assert.Equal(t, int64(9), targetCapacityAfterPending, "Target should consume capacity from pending task (balance reserves 1 slot)") + + // Verify planning capacity considers the same pending tasks + planningDisks := activeTopology.GetDisksForPlanning(TaskTypeBalance, "", 1) + assert.Len(t, planningDisks, 2, "Both disks should be available for planning") + + // Step 2: Assign task (capacity already reserved by pending task) + err = activeTopology.AssignTask("balance_test") + assert.NoError(t, err, "Should assign task successfully") + + sourceCapacityAfterAssign := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0) + targetCapacityAfterAssign := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0) + + assert.Equal(t, int64(11), sourceCapacityAfterAssign, "Source capacity should remain same (already accounted by pending)") + assert.Equal(t, int64(9), targetCapacityAfterAssign, "Target capacity should remain same (already accounted by pending)") + + // Note: Detailed task state tracking (planned vs reserved) is no longer exposed via public API + // The important functionality is that capacity calculations remain consistent + + // Step 3: Complete task (should release reserved capacity) + err = activeTopology.CompleteTask("balance_test") + assert.NoError(t, err, "Should complete task successfully") + + sourceCapacityAfterComplete := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0) + targetCapacityAfterComplete := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0) + assert.Equal(t, int64(10), sourceCapacityAfterComplete, "Source should return to original capacity") + assert.Equal(t, int64(10), targetCapacityAfterComplete, "Target should return to original capacity") + + // Step 4: Apply actual storage change (simulates master topology update) + activeTopology.ApplyActualStorageChange("10.0.0.1:8080", 0, -1) // Source loses 1 volume + activeTopology.ApplyActualStorageChange("10.0.0.2:8080", 0, 1) // Target gains 1 volume + + // Final capacity should reflect actual topology changes + finalSourceCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0) + finalTargetCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0) + assert.Equal(t, int64(11), finalSourceCapacity, "Source: (20-9) = 11 after losing 1 volume") + assert.Equal(t, int64(9), finalTargetCapacity, "Target: (15-6) = 9 after gaining 1 volume") + + t.Logf("Capacity lifecycle with StorageSlotChange: Pending -> Assigned -> Released -> Applied") + t.Logf("Source: 10 -> 11 -> 11 -> 10 -> 11 (freed by pending balance, then 
applied)") + t.Logf("Target: 10 -> 9 -> 9 -> 10 -> 9 (reserved by pending, then applied)") +} + +// TestReplicatedVolumeECOperations tests EC operations on replicated volumes +func TestReplicatedVolumeECOperations(t *testing.T) { + activeTopology := NewActiveTopology(10) + + // Setup cluster with multiple servers for replicated volumes + activeTopology.UpdateTopology(&master_pb.TopologyInfo{ + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + { + Id: "rack1", + DataNodeInfos: []*master_pb.DataNodeInfo{ + { + Id: "10.0.0.1:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 10}, + }, + }, + { + Id: "10.0.0.2:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 5}, + }, + }, + { + Id: "10.0.0.3:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 3}, + }, + }, + { + Id: "10.0.0.4:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 15}, + }, + }, + { + Id: "10.0.0.5:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 20}, + }, + }, + { + Id: "10.0.0.6:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 25}, + }, + }, + }, + }, + }, + }, + }, + }) + + // Test: EC operation on replicated volume (3 replicas) + volumeID := uint32(300) + originalVolumeSize := int64(1024 * 1024 * 1024) // 1GB + + // Create source specs for replicated volume (3 replicas) + sources := []TaskSourceSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 0, CleanupType: CleanupVolumeReplica}, // Replica 1 + {ServerID: "10.0.0.2:8080", DiskID: 0, CleanupType: CleanupVolumeReplica}, // Replica 2 + {ServerID: "10.0.0.3:8080", DiskID: 0, CleanupType: CleanupVolumeReplica}, // Replica 3 + } + + // EC destinations (shards distributed across different servers than sources) + shardDestinations := []string{ + "10.0.0.4:8080", "10.0.0.4:8080", "10.0.0.4:8080", "10.0.0.4:8080", "10.0.0.4:8080", // 5 shards + "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", // 5 shards + "10.0.0.6:8080", "10.0.0.6:8080", "10.0.0.6:8080", "10.0.0.6:8080", // 4 shards + } + shardDiskIDs := make([]uint32, len(shardDestinations)) + for i := range shardDiskIDs { + shardDiskIDs[i] = 0 + } + + expectedShardSize := int64(50 * 1024 * 1024) // 50MB per shard + + // Create destination specs + destinations := make([]TaskDestinationSpec, len(shardDestinations)) + shardImpact := CalculateECShardStorageImpact(1, expectedShardSize) + for i, dest := range shardDestinations { + destinations[i] = TaskDestinationSpec{ + ServerID: dest, + DiskID: shardDiskIDs[i], + StorageImpact: &shardImpact, + EstimatedSize: &expectedShardSize, + } + } + + // Create EC task for replicated volume + err := activeTopology.AddPendingTask(TaskSpec{ + TaskID: "ec_replicated", + TaskType: TaskTypeErasureCoding, + VolumeID: volumeID, + VolumeSize: originalVolumeSize, + Sources: sources, + Destinations: destinations, + }) + assert.NoError(t, err, "Should successfully create EC task for replicated volume") + + // Verify capacity impact on all source replicas (each should reserve with zero impact) + for i, source := range sources { + plannedVol, reservedVol, plannedShard, reservedShard, _ := testGetDiskStorageImpact(activeTopology, 
source.ServerID, source.DiskID) + assert.Equal(t, int64(0), plannedVol, fmt.Sprintf("Source replica %d should reserve with zero volume slot impact", i+1)) + assert.Equal(t, int64(0), reservedVol, fmt.Sprintf("Source replica %d should have no active volume slots", i+1)) + assert.Equal(t, int32(0), plannedShard, fmt.Sprintf("Source replica %d should have no planned shard slots", i+1)) + assert.Equal(t, int32(0), reservedShard, fmt.Sprintf("Source replica %d should have no active shard slots", i+1)) + // Note: EstimatedSize tracking is no longer exposed via public API + } + + // Verify capacity impact on EC destinations + destinationCounts := make(map[string]int) + for _, dest := range shardDestinations { + destinationCounts[dest]++ + } + + for serverID, expectedShards := range destinationCounts { + plannedVol, _, plannedShard, _, _ := testGetDiskStorageImpact(activeTopology, serverID, 0) + assert.Equal(t, int64(0), plannedVol, fmt.Sprintf("Destination %s should have no planned volume slots", serverID)) + assert.Equal(t, int32(expectedShards), plannedShard, fmt.Sprintf("Destination %s should plan to receive %d shards", serverID, expectedShards)) + } + + // Verify effective capacity calculation for sources (should have zero EC impact) + sourceCapacity1 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0) + sourceCapacity2 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0) + sourceCapacity3 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.3:8080", 0) + + // All sources should have same capacity as baseline (EC source reserves with zero impact) + assert.Equal(t, int64(90), sourceCapacity1, "Source 1: 100 - 10 (current) - 0 (EC source impact) = 90") + assert.Equal(t, int64(95), sourceCapacity2, "Source 2: 100 - 5 (current) - 0 (EC source impact) = 95") + assert.Equal(t, int64(97), sourceCapacity3, "Source 3: 100 - 3 (current) - 0 (EC source impact) = 97") + + // Verify effective capacity calculation for destinations (should be reduced by shard slots) + destCapacity4 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.4:8080", 0) + destCapacity5 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.5:8080", 0) + destCapacity6 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.6:8080", 0) + + // Dynamic shard impact calculations + dest4ShardImpact := int64(5 / erasure_coding.DataShardsCount) // 5 shards impact + dest5ShardImpact := int64(5 / erasure_coding.DataShardsCount) // 5 shards impact + dest6ShardImpact := int64(4 / erasure_coding.DataShardsCount) // 4 shards impact + + // Destinations should be reduced by shard impact + assert.Equal(t, int64(85-dest4ShardImpact), destCapacity4, fmt.Sprintf("Dest 4: 100 - 15 (current) - %d (5 shards/%d = %d impact) = %d", dest4ShardImpact, erasure_coding.DataShardsCount, dest4ShardImpact, 85-dest4ShardImpact)) + assert.Equal(t, int64(80-dest5ShardImpact), destCapacity5, fmt.Sprintf("Dest 5: 100 - 20 (current) - %d (5 shards/%d = %d impact) = %d", dest5ShardImpact, erasure_coding.DataShardsCount, dest5ShardImpact, 80-dest5ShardImpact)) + assert.Equal(t, int64(75-dest6ShardImpact), destCapacity6, fmt.Sprintf("Dest 6: 100 - 25 (current) - %d (4 shards/%d = %d impact) = %d", dest6ShardImpact, erasure_coding.DataShardsCount, dest6ShardImpact, 75-dest6ShardImpact)) + + t.Logf("Replicated volume EC operation: %d source replicas, %d EC shards distributed across %d destinations", + len(sources), len(shardDestinations), len(destinationCounts)) + t.Logf("Each source replica reserves with zero capacity impact, 
destinations receive EC shards") +} + +// TestECWithOldShardCleanup tests EC operations that need to clean up old shards from previous failed attempts +func TestECWithOldShardCleanup(t *testing.T) { + activeTopology := NewActiveTopology(10) + + // Setup cluster with servers + activeTopology.UpdateTopology(&master_pb.TopologyInfo{ + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + { + Id: "rack1", + DataNodeInfos: []*master_pb.DataNodeInfo{ + { + Id: "10.0.0.1:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 10}, + }, + }, + { + Id: "10.0.0.2:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 5}, + }, + }, + { + Id: "10.0.0.3:8080", // Had old EC shards from previous failed attempt + DiskInfos: map[string]*master_pb.DiskInfo{ + "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 3}, + }, + }, + { + Id: "10.0.0.4:8080", // Had old EC shards from previous failed attempt + DiskInfos: map[string]*master_pb.DiskInfo{ + "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 7}, + }, + }, + { + Id: "10.0.0.5:8080", // New EC destination + DiskInfos: map[string]*master_pb.DiskInfo{ + "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 20}, + }, + }, + { + Id: "10.0.0.6:8080", // New EC destination + DiskInfos: map[string]*master_pb.DiskInfo{ + "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 25}, + }, + }, + }, + }, + }, + }, + }, + }) + + // Test: EC operation that needs to clean up both volume replicas AND old EC shards + volumeID := uint32(400) + originalVolumeSize := int64(1024 * 1024 * 1024) // 1GB + + // Create source specs: volume replicas + old EC shard locations + sources := []TaskSourceSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 0, CleanupType: CleanupVolumeReplica}, // Volume replica 1 + {ServerID: "10.0.0.2:8080", DiskID: 0, CleanupType: CleanupVolumeReplica}, // Volume replica 2 + {ServerID: "10.0.0.3:8080", DiskID: 0, CleanupType: CleanupECShards}, // Old EC shards from failed attempt + {ServerID: "10.0.0.4:8080", DiskID: 0, CleanupType: CleanupECShards}, // Old EC shards from failed attempt + } + + // EC destinations (new complete set of shards) + shardDestinations := []string{ + "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", // 5 shards + "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", // 4 more shards (9 total) + "10.0.0.6:8080", "10.0.0.6:8080", "10.0.0.6:8080", "10.0.0.6:8080", "10.0.0.6:8080", // 5 shards + } + shardDiskIDs := make([]uint32, len(shardDestinations)) + for i := range shardDiskIDs { + shardDiskIDs[i] = 0 + } + + expectedShardSize := int64(50 * 1024 * 1024) // 50MB per shard + + // Create destination specs + destinations := make([]TaskDestinationSpec, len(shardDestinations)) + shardImpact := CalculateECShardStorageImpact(1, expectedShardSize) + for i, dest := range shardDestinations { + destinations[i] = TaskDestinationSpec{ + ServerID: dest, + DiskID: shardDiskIDs[i], + StorageImpact: &shardImpact, + EstimatedSize: &expectedShardSize, + } + } + + // Create EC task that cleans up both volume replicas and old EC shards + err := activeTopology.AddPendingTask(TaskSpec{ + TaskID: "ec_cleanup", + TaskType: TaskTypeErasureCoding, + VolumeID: volumeID, + VolumeSize: originalVolumeSize, + Sources: sources, + Destinations: destinations, + }) + assert.NoError(t, err, "Should successfully 
create EC task with mixed cleanup types") + + // Verify capacity impact on volume replica sources (zero impact for EC) + for i := 0; i < 2; i++ { + source := sources[i] + plannedVol, _, plannedShard, _, _ := testGetDiskStorageImpact(activeTopology, source.ServerID, source.DiskID) + assert.Equal(t, int64(0), plannedVol, fmt.Sprintf("Volume replica source %d should have zero volume slot impact", i+1)) + assert.Equal(t, int32(0), plannedShard, fmt.Sprintf("Volume replica source %d should have zero shard slot impact", i+1)) + // Note: EstimatedSize tracking is no longer exposed via public API + } + + // Verify capacity impact on old EC shard sources (should free shard slots) + for i := 2; i < 4; i++ { + source := sources[i] + plannedVol, _, plannedShard, _, _ := testGetDiskStorageImpact(activeTopology, source.ServerID, source.DiskID) + assert.Equal(t, int64(0), plannedVol, fmt.Sprintf("EC shard source %d should have zero volume slot impact", i+1)) + assert.Equal(t, int32(-erasure_coding.TotalShardsCount), plannedShard, fmt.Sprintf("EC shard source %d should free %d shard slots", i+1, erasure_coding.TotalShardsCount)) + // Note: EstimatedSize tracking is no longer exposed via public API + } + + // Verify capacity impact on new EC destinations + destPlan5, _, destShard5, _, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.5:8080", 0) + destPlan6, _, destShard6, _, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.6:8080", 0) + + assert.Equal(t, int64(0), destPlan5, "New EC destination 5 should have no planned volume slots") + assert.Equal(t, int32(9), destShard5, "New EC destination 5 should plan to receive 9 shards") + assert.Equal(t, int64(0), destPlan6, "New EC destination 6 should have no planned volume slots") + assert.Equal(t, int32(5), destShard6, "New EC destination 6 should plan to receive 5 shards") + + // Verify effective capacity calculation shows proper impact + capacity3 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.3:8080", 0) // Freeing old EC shards + capacity4 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.4:8080", 0) // Freeing old EC shards + capacity5 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.5:8080", 0) // Receiving new EC shards + capacity6 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.6:8080", 0) // Receiving new EC shards + + // Servers freeing old EC shards should have INCREASED capacity (freed shard slots provide capacity) + assert.Equal(t, int64(98), capacity3, fmt.Sprintf("Server 3: 100 - 3 (current) + 1 (freeing %d shards) = 98", erasure_coding.TotalShardsCount)) + assert.Equal(t, int64(94), capacity4, fmt.Sprintf("Server 4: 100 - 7 (current) + 1 (freeing %d shards) = 94", erasure_coding.TotalShardsCount)) + + // Servers receiving new EC shards should have slightly reduced capacity + server5ShardImpact := int64(9 / erasure_coding.DataShardsCount) // 9 shards impact + server6ShardImpact := int64(5 / erasure_coding.DataShardsCount) // 5 shards impact + + assert.Equal(t, int64(80-server5ShardImpact), capacity5, fmt.Sprintf("Server 5: 100 - 20 (current) - %d (9 shards/%d = %d impact) = %d", server5ShardImpact, erasure_coding.DataShardsCount, server5ShardImpact, 80-server5ShardImpact)) + assert.Equal(t, int64(75-server6ShardImpact), capacity6, fmt.Sprintf("Server 6: 100 - 25 (current) - %d (5 shards/%d = %d impact) = %d", server6ShardImpact, erasure_coding.DataShardsCount, server6ShardImpact, 75-server6ShardImpact)) + + t.Logf("EC operation with cleanup: %d volume replicas + %d old EC shard locations → %d new EC 
shards", + 2, 2, len(shardDestinations)) + t.Logf("Volume sources have zero impact, old EC shard sources free capacity, new destinations consume shard slots") +} + +// TestDetailedCapacityCalculations tests the new StorageSlotChange-based capacity calculation functions +func TestDetailedCapacityCalculations(t *testing.T) { + activeTopology := NewActiveTopology(10) + + // Setup cluster + activeTopology.UpdateTopology(&master_pb.TopologyInfo{ + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + { + Id: "rack1", + DataNodeInfos: []*master_pb.DataNodeInfo{ + { + Id: "10.0.0.1:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 20}, + }, + }, + }, + }, + }, + }, + }, + }) + + // Test: Add an EC task and check detailed capacity + sources := []TaskSourceSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 0, CleanupType: CleanupVolumeReplica}, + } + + shardDestinations := []string{"10.0.0.1:8080", "10.0.0.1:8080", "10.0.0.1:8080", "10.0.0.1:8080", "10.0.0.1:8080"} + shardDiskIDs := []uint32{0, 0, 0, 0, 0} + + // Create destination specs + destinations := make([]TaskDestinationSpec, len(shardDestinations)) + expectedShardSize := int64(50 * 1024 * 1024) + shardImpact := CalculateECShardStorageImpact(1, expectedShardSize) + for i, dest := range shardDestinations { + destinations[i] = TaskDestinationSpec{ + ServerID: dest, + DiskID: shardDiskIDs[i], + StorageImpact: &shardImpact, + EstimatedSize: &expectedShardSize, + } + } + + err := activeTopology.AddPendingTask(TaskSpec{ + TaskID: "detailed_test", + TaskType: TaskTypeErasureCoding, + VolumeID: 500, + VolumeSize: 1024 * 1024 * 1024, + Sources: sources, + Destinations: destinations, + }) + assert.NoError(t, err, "Should add EC task successfully") + + // Test the new detailed capacity function + detailedCapacity := activeTopology.GetEffectiveAvailableCapacityDetailed("10.0.0.1:8080", 0) + simpleCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0) + + // The simple capacity should match the volume slots from detailed capacity + assert.Equal(t, int64(detailedCapacity.VolumeSlots), simpleCapacity, "Simple capacity should match detailed volume slots") + + // Verify detailed capacity has both volume and shard information + assert.Equal(t, int32(80), detailedCapacity.VolumeSlots, "Should have 80 available volume slots (100 - 20 current, no volume impact from EC)") + assert.Equal(t, int32(-5), detailedCapacity.ShardSlots, "Should show -5 available shard slots (5 destination shards)") + + // Verify capacity impact + capacityImpact := activeTopology.GetEffectiveCapacityImpact("10.0.0.1:8080", 0) + assert.Equal(t, int32(0), capacityImpact.VolumeSlots, "EC source should have zero volume slot impact") + assert.Equal(t, int32(5), capacityImpact.ShardSlots, "Should have positive shard slot impact (consuming 5 shards)") + + t.Logf("Detailed capacity calculation: VolumeSlots=%d, ShardSlots=%d", + detailedCapacity.VolumeSlots, detailedCapacity.ShardSlots) + t.Logf("Capacity impact: VolumeSlots=%d, ShardSlots=%d", + capacityImpact.VolumeSlots, capacityImpact.ShardSlots) + t.Logf("Simple capacity (backward compatible): %d", simpleCapacity) +} + +// TestStorageSlotChangeConversions tests the conversion and accommodation methods for StorageSlotChange +// This test is designed to work with any value of erasure_coding.DataShardsCount, making it +// compatible with custom erasure coding configurations. 
+func TestStorageSlotChangeConversions(t *testing.T) { + // Get the actual erasure coding constants for dynamic testing + dataShards := int32(erasure_coding.DataShardsCount) + + // Test conversion constants + assert.Equal(t, int(dataShards), ShardsPerVolumeSlot, fmt.Sprintf("Should use erasure_coding.DataShardsCount (%d) shards per volume slot", dataShards)) + + // Test basic conversions using dynamic values + volumeOnly := StorageSlotChange{VolumeSlots: 5, ShardSlots: 0} + shardOnly := StorageSlotChange{VolumeSlots: 0, ShardSlots: 2 * dataShards} // 2 volume equivalents in shards + mixed := StorageSlotChange{VolumeSlots: 2, ShardSlots: dataShards + 5} // 2 volumes + 1.5 volume equivalent in shards + + // Test ToVolumeSlots conversion - these should work regardless of DataShardsCount value + assert.Equal(t, int64(5), volumeOnly.ToVolumeSlots(), "5 volume slots = 5 volume slots") + assert.Equal(t, int64(2), shardOnly.ToVolumeSlots(), fmt.Sprintf("%d shard slots = 2 volume slots", 2*dataShards)) + expectedMixedVolumes := int64(2 + (dataShards+5)/dataShards) // 2 + floor((DataShardsCount+5)/DataShardsCount) + assert.Equal(t, expectedMixedVolumes, mixed.ToVolumeSlots(), fmt.Sprintf("2 volume + %d shards = %d volume slots", dataShards+5, expectedMixedVolumes)) + + // Test ToShardSlots conversion + expectedVolumeShards := int32(5 * dataShards) + assert.Equal(t, expectedVolumeShards, volumeOnly.ToShardSlots(), fmt.Sprintf("5 volume slots = %d shard slots", expectedVolumeShards)) + assert.Equal(t, 2*dataShards, shardOnly.ToShardSlots(), fmt.Sprintf("%d shard slots = %d shard slots", 2*dataShards, 2*dataShards)) + expectedMixedShards := int32(2*dataShards + dataShards + 5) + assert.Equal(t, expectedMixedShards, mixed.ToShardSlots(), fmt.Sprintf("2 volume + %d shards = %d shard slots", dataShards+5, expectedMixedShards)) + + // Test capacity accommodation checks using shard-based comparison + availableVolumes := int32(10) + available := StorageSlotChange{VolumeSlots: availableVolumes, ShardSlots: 0} // availableVolumes * dataShards shard slots available + + smallVolumeRequest := StorageSlotChange{VolumeSlots: 3, ShardSlots: 0} // Needs 3 * dataShards shard slots + largeVolumeRequest := StorageSlotChange{VolumeSlots: availableVolumes + 5, ShardSlots: 0} // Needs more than available + shardRequest := StorageSlotChange{VolumeSlots: 0, ShardSlots: 5 * dataShards} // Needs 5 volume equivalents in shards + mixedRequest := StorageSlotChange{VolumeSlots: 8, ShardSlots: 3 * dataShards} // Needs 11 volume equivalents total + + smallShardsNeeded := 3 * dataShards + availableShards := availableVolumes * dataShards + largeShardsNeeded := (availableVolumes + 5) * dataShards + shardShardsNeeded := 5 * dataShards + mixedShardsNeeded := 8*dataShards + 3*dataShards + + assert.True(t, available.CanAccommodate(smallVolumeRequest), fmt.Sprintf("Should accommodate small volume request (%d <= %d shards)", smallShardsNeeded, availableShards)) + assert.False(t, available.CanAccommodate(largeVolumeRequest), fmt.Sprintf("Should NOT accommodate large volume request (%d > %d shards)", largeShardsNeeded, availableShards)) + assert.True(t, available.CanAccommodate(shardRequest), fmt.Sprintf("Should accommodate shard request (%d <= %d shards)", shardShardsNeeded, availableShards)) + assert.False(t, available.CanAccommodate(mixedRequest), fmt.Sprintf("Should NOT accommodate mixed request (%d > %d shards)", mixedShardsNeeded, availableShards)) + + t.Logf("Conversion tests passed: %d shards = 1 volume slot", ShardsPerVolumeSlot) + 
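// Quick worked check of the shard-based comparison exercised above (illustrative only; it reuses the "available" capacity and helpers already defined in this test): + // a request exactly equal to the available capacity should still fit, because CanAccommodate compares both sides via ToShardSlots(). + exactRequest := StorageSlotChange{VolumeSlots: availableVolumes, ShardSlots: 0} + assert.True(t, available.CanAccommodate(exactRequest), fmt.Sprintf("Should accommodate a request equal to the full available capacity (%d shards)", availableShards)) + 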
t.Logf("Mixed capacity (%d volumes + %d shards) = %d equivalent volume slots", + mixed.VolumeSlots, mixed.ShardSlots, mixed.ToVolumeSlots()) + t.Logf("Available capacity (%d volumes) = %d total shard slots", + available.VolumeSlots, available.ToShardSlots()) + t.Logf("NOTE: This test adapts automatically to erasure_coding.DataShardsCount = %d", erasure_coding.DataShardsCount) +} diff --git a/weed/admin/topology/structs.go b/weed/admin/topology/structs.go new file mode 100644 index 000000000..f2d29eb5f --- /dev/null +++ b/weed/admin/topology/structs.go @@ -0,0 +1,120 @@ +package topology + +import ( + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" +) + +// TaskSource represents a single source in a multi-source task (for replicated volume cleanup) +type TaskSource struct { + SourceServer string `json:"source_server"` + SourceDisk uint32 `json:"source_disk"` + StorageChange StorageSlotChange `json:"storage_change"` // Storage impact on this source + EstimatedSize int64 `json:"estimated_size"` // Estimated size for this source +} + +// TaskDestination represents a single destination in a multi-destination task +type TaskDestination struct { + TargetServer string `json:"target_server"` + TargetDisk uint32 `json:"target_disk"` + StorageChange StorageSlotChange `json:"storage_change"` // Storage impact on this destination + EstimatedSize int64 `json:"estimated_size"` // Estimated size for this destination +} + +// taskState represents the current state of tasks affecting the topology (internal) +// Uses unified multi-source/multi-destination design: +// - Single-source tasks (balance, vacuum, replication): 1 source, 1 destination +// - Multi-source EC tasks (replicated volumes): N sources, M destinations +type taskState struct { + VolumeID uint32 `json:"volume_id"` + TaskType TaskType `json:"task_type"` + Status TaskStatus `json:"status"` + StartedAt time.Time `json:"started_at"` + CompletedAt time.Time `json:"completed_at,omitempty"` + EstimatedSize int64 `json:"estimated_size"` // Total estimated size of task + + // Unified source and destination arrays (always used) + Sources []TaskSource `json:"sources"` // Source locations (1+ for all task types) + Destinations []TaskDestination `json:"destinations"` // Destination locations (1+ for all task types) +} + +// DiskInfo represents a disk with its current state and ongoing tasks (public for external access) +type DiskInfo struct { + NodeID string `json:"node_id"` + DiskID uint32 `json:"disk_id"` + DiskType string `json:"disk_type"` + DataCenter string `json:"data_center"` + Rack string `json:"rack"` + DiskInfo *master_pb.DiskInfo `json:"disk_info"` + LoadCount int `json:"load_count"` // Number of active tasks +} + +// activeDisk represents internal disk state (private) +type activeDisk struct { + *DiskInfo + pendingTasks []*taskState + assignedTasks []*taskState + recentTasks []*taskState // Completed in last N seconds +} + +// activeNode represents a node with its disks (private) +type activeNode struct { + nodeID string + dataCenter string + rack string + nodeInfo *master_pb.DataNodeInfo + disks map[uint32]*activeDisk // DiskID -> activeDisk +} + +// ActiveTopology provides a real-time view of cluster state with task awareness +type ActiveTopology struct { + // Core topology from master + topologyInfo *master_pb.TopologyInfo + lastUpdated time.Time + + // Structured topology for easy access (private) + nodes map[string]*activeNode // NodeID -> activeNode + disks map[string]*activeDisk // "NodeID:DiskID" -> activeDisk + 
+ // Performance indexes for O(1) lookups (private) + volumeIndex map[uint32][]string // VolumeID -> list of "NodeID:DiskID" where volume replicas exist + ecShardIndex map[uint32][]string // VolumeID -> list of "NodeID:DiskID" where EC shards exist + + // Task states affecting the topology (private) + pendingTasks map[string]*taskState + assignedTasks map[string]*taskState + recentTasks map[string]*taskState + + // Configuration + recentTaskWindowSeconds int + + // Synchronization + mutex sync.RWMutex +} + +// DestinationPlan represents a planned destination for a volume/shard operation +type DestinationPlan struct { + TargetNode string `json:"target_node"` + TargetDisk uint32 `json:"target_disk"` + TargetRack string `json:"target_rack"` + TargetDC string `json:"target_dc"` + ExpectedSize uint64 `json:"expected_size"` + PlacementScore float64 `json:"placement_score"` + Conflicts []string `json:"conflicts"` +} + +// MultiDestinationPlan represents multiple planned destinations for operations like EC +type MultiDestinationPlan struct { + Plans []*DestinationPlan `json:"plans"` + TotalShards int `json:"total_shards"` + SuccessfulRack int `json:"successful_racks"` + SuccessfulDCs int `json:"successful_dcs"` +} + +// VolumeReplica represents a replica location with server and disk information +type VolumeReplica struct { + ServerID string `json:"server_id"` + DiskID uint32 `json:"disk_id"` +} diff --git a/weed/admin/topology/task_management.go b/weed/admin/topology/task_management.go new file mode 100644 index 000000000..b240adcd8 --- /dev/null +++ b/weed/admin/topology/task_management.go @@ -0,0 +1,264 @@ +package topology + +import ( + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" +) + +// AssignTask moves a task from pending to assigned and reserves capacity +func (at *ActiveTopology) AssignTask(taskID string) error { + at.mutex.Lock() + defer at.mutex.Unlock() + + task, exists := at.pendingTasks[taskID] + if !exists { + return fmt.Errorf("pending task %s not found", taskID) + } + + // Check if all destination disks have sufficient capacity to reserve + for _, dest := range task.Destinations { + targetKey := fmt.Sprintf("%s:%d", dest.TargetServer, dest.TargetDisk) + if targetDisk, exists := at.disks[targetKey]; exists { + availableCapacity := at.getEffectiveAvailableCapacityUnsafe(targetDisk) + + // Check if we have enough total capacity using the improved unified comparison + if !availableCapacity.CanAccommodate(dest.StorageChange) { + return fmt.Errorf("insufficient capacity on target disk %s:%d. 
Available: %+v, Required: %+v", + dest.TargetServer, dest.TargetDisk, availableCapacity, dest.StorageChange) + } + } else if dest.TargetServer != "" { + // Fail fast if destination disk is not found in topology + return fmt.Errorf("destination disk %s not found in topology", targetKey) + } + } + + // Move task to assigned and reserve capacity + delete(at.pendingTasks, taskID) + task.Status = TaskStatusInProgress + at.assignedTasks[taskID] = task + at.reassignTaskStates() + + // Log capacity reservation information for all sources and destinations + totalSourceImpact := StorageSlotChange{} + totalDestImpact := StorageSlotChange{} + for _, source := range task.Sources { + totalSourceImpact.AddInPlace(source.StorageChange) + } + for _, dest := range task.Destinations { + totalDestImpact.AddInPlace(dest.StorageChange) + } + + glog.V(2).Infof("Task %s assigned and capacity reserved: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)", + taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots, + len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots) + + return nil +} + +// CompleteTask moves a task from assigned to recent and releases reserved capacity +// NOTE: This only releases the reserved capacity. The actual topology update (VolumeCount changes) +// should be handled by the master when it receives the task completion notification. +func (at *ActiveTopology) CompleteTask(taskID string) error { + at.mutex.Lock() + defer at.mutex.Unlock() + + task, exists := at.assignedTasks[taskID] + if !exists { + return fmt.Errorf("assigned task %s not found", taskID) + } + + // Release reserved capacity by moving task to completed state + delete(at.assignedTasks, taskID) + task.Status = TaskStatusCompleted + task.CompletedAt = time.Now() + at.recentTasks[taskID] = task + at.reassignTaskStates() + + // Log capacity release information for all sources and destinations + totalSourceImpact := StorageSlotChange{} + totalDestImpact := StorageSlotChange{} + for _, source := range task.Sources { + totalSourceImpact.AddInPlace(source.StorageChange) + } + for _, dest := range task.Destinations { + totalDestImpact.AddInPlace(dest.StorageChange) + } + + glog.V(2).Infof("Task %s completed and capacity released: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)", + taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots, + len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots) + + // Clean up old recent tasks + at.cleanupRecentTasks() + + return nil +} + +// ApplyActualStorageChange updates the topology to reflect actual storage changes after task completion +// This should be called when the master updates the topology with new VolumeCount information +func (at *ActiveTopology) ApplyActualStorageChange(nodeID string, diskID uint32, volumeCountChange int64) { + at.mutex.Lock() + defer at.mutex.Unlock() + + diskKey := fmt.Sprintf("%s:%d", nodeID, diskID) + if disk, exists := at.disks[diskKey]; exists && disk.DiskInfo != nil && disk.DiskInfo.DiskInfo != nil { + oldCount := disk.DiskInfo.DiskInfo.VolumeCount + disk.DiskInfo.DiskInfo.VolumeCount += volumeCountChange + + glog.V(2).Infof("Applied actual storage change on disk %s: volume_count %d -> %d (change: %+d)", + diskKey, oldCount, disk.DiskInfo.DiskInfo.VolumeCount, volumeCountChange) + } +} + +// AddPendingTask is the unified function that handles both simple and complex 
task creation +func (at *ActiveTopology) AddPendingTask(spec TaskSpec) error { + // Validation + if len(spec.Sources) == 0 { + return fmt.Errorf("at least one source is required") + } + if len(spec.Destinations) == 0 { + return fmt.Errorf("at least one destination is required") + } + + at.mutex.Lock() + defer at.mutex.Unlock() + + // Build sources array + sources := make([]TaskSource, len(spec.Sources)) + for i, sourceSpec := range spec.Sources { + var storageImpact StorageSlotChange + var estimatedSize int64 + + if sourceSpec.StorageImpact != nil { + // Use manually specified impact + storageImpact = *sourceSpec.StorageImpact + } else { + // Auto-calculate based on task type and cleanup type + storageImpact = at.calculateSourceStorageImpact(spec.TaskType, sourceSpec.CleanupType, spec.VolumeSize) + } + + if sourceSpec.EstimatedSize != nil { + estimatedSize = *sourceSpec.EstimatedSize + } else { + estimatedSize = spec.VolumeSize // Default to volume size + } + + sources[i] = TaskSource{ + SourceServer: sourceSpec.ServerID, + SourceDisk: sourceSpec.DiskID, + StorageChange: storageImpact, + EstimatedSize: estimatedSize, + } + } + + // Build destinations array + destinations := make([]TaskDestination, len(spec.Destinations)) + for i, destSpec := range spec.Destinations { + var storageImpact StorageSlotChange + var estimatedSize int64 + + if destSpec.StorageImpact != nil { + // Use manually specified impact + storageImpact = *destSpec.StorageImpact + } else { + // Auto-calculate based on task type + _, storageImpact = CalculateTaskStorageImpact(spec.TaskType, spec.VolumeSize) + } + + if destSpec.EstimatedSize != nil { + estimatedSize = *destSpec.EstimatedSize + } else { + estimatedSize = spec.VolumeSize // Default to volume size + } + + destinations[i] = TaskDestination{ + TargetServer: destSpec.ServerID, + TargetDisk: destSpec.DiskID, + StorageChange: storageImpact, + EstimatedSize: estimatedSize, + } + } + + // Create the task + task := &taskState{ + VolumeID: spec.VolumeID, + TaskType: spec.TaskType, + Status: TaskStatusPending, + StartedAt: time.Now(), + EstimatedSize: spec.VolumeSize, + Sources: sources, + Destinations: destinations, + } + + at.pendingTasks[spec.TaskID] = task + at.assignTaskToDisk(task) + + glog.V(2).Infof("Added pending %s task %s: volume %d, %d sources, %d destinations", + spec.TaskType, spec.TaskID, spec.VolumeID, len(sources), len(destinations)) + + return nil +} + +// calculateSourceStorageImpact calculates storage impact for sources based on task type and cleanup type +func (at *ActiveTopology) calculateSourceStorageImpact(taskType TaskType, cleanupType SourceCleanupType, volumeSize int64) StorageSlotChange { + switch taskType { + case TaskTypeErasureCoding: + switch cleanupType { + case CleanupVolumeReplica: + impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize) + return impact + case CleanupECShards: + return CalculateECShardCleanupImpact(volumeSize) + default: + impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize) + return impact + } + default: + impact, _ := CalculateTaskStorageImpact(taskType, volumeSize) + return impact + } +} + +// SourceCleanupType indicates what type of data needs to be cleaned up from a source +type SourceCleanupType int + +const ( + CleanupVolumeReplica SourceCleanupType = iota // Clean up volume replica (frees volume slots) + CleanupECShards // Clean up existing EC shards (frees shard slots) +) + +// TaskSourceSpec represents a source specification for task creation +type TaskSourceSpec struct 
{ + ServerID string + DiskID uint32 + CleanupType SourceCleanupType // For EC: volume replica vs existing shards + StorageImpact *StorageSlotChange // Optional: manual override + EstimatedSize *int64 // Optional: manual override +} + +// TaskDestinationSpec represents a destination specification for task creation +type TaskDestinationSpec struct { + ServerID string + DiskID uint32 + StorageImpact *StorageSlotChange // Optional: manual override + EstimatedSize *int64 // Optional: manual override +} + +// TaskSpec represents a complete task specification +type TaskSpec struct { + TaskID string + TaskType TaskType + VolumeID uint32 + VolumeSize int64 // Used for auto-calculation when manual impacts not provided + Sources []TaskSourceSpec // Can be single or multiple + Destinations []TaskDestinationSpec // Can be single or multiple +} + +// TaskSourceLocation represents a source location for task creation (DEPRECATED: use TaskSourceSpec) +type TaskSourceLocation struct { + ServerID string + DiskID uint32 + CleanupType SourceCleanupType // What type of cleanup is needed +} diff --git a/weed/admin/topology/topology_management.go b/weed/admin/topology/topology_management.go new file mode 100644 index 000000000..e12839484 --- /dev/null +++ b/weed/admin/topology/topology_management.go @@ -0,0 +1,253 @@ +package topology + +import ( + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" +) + +// UpdateTopology updates the topology information from master +func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) error { + at.mutex.Lock() + defer at.mutex.Unlock() + + at.topologyInfo = topologyInfo + at.lastUpdated = time.Now() + + // Rebuild structured topology + at.nodes = make(map[string]*activeNode) + at.disks = make(map[string]*activeDisk) + + for _, dc := range topologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, nodeInfo := range rack.DataNodeInfos { + node := &activeNode{ + nodeID: nodeInfo.Id, + dataCenter: dc.Id, + rack: rack.Id, + nodeInfo: nodeInfo, + disks: make(map[uint32]*activeDisk), + } + + // Add disks for this node + for diskType, diskInfo := range nodeInfo.DiskInfos { + disk := &activeDisk{ + DiskInfo: &DiskInfo{ + NodeID: nodeInfo.Id, + DiskID: diskInfo.DiskId, + DiskType: diskType, + DataCenter: dc.Id, + Rack: rack.Id, + DiskInfo: diskInfo, + }, + } + + diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId) + node.disks[diskInfo.DiskId] = disk + at.disks[diskKey] = disk + } + + at.nodes[nodeInfo.Id] = node + } + } + } + + // Rebuild performance indexes for O(1) lookups + at.rebuildIndexes() + + // Reassign task states to updated topology + at.reassignTaskStates() + + glog.V(1).Infof("ActiveTopology updated: %d nodes, %d disks, %d volume entries, %d EC shard entries", + len(at.nodes), len(at.disks), len(at.volumeIndex), len(at.ecShardIndex)) + return nil +} + +// GetAvailableDisks returns disks that can accept new tasks of the given type +// NOTE: For capacity-aware operations, prefer GetDisksWithEffectiveCapacity +func (at *ActiveTopology) GetAvailableDisks(taskType TaskType, excludeNodeID string) []*DiskInfo { + at.mutex.RLock() + defer at.mutex.RUnlock() + + var available []*DiskInfo + + for _, disk := range at.disks { + if disk.NodeID == excludeNodeID { + continue // Skip excluded node + } + + if at.isDiskAvailable(disk, taskType) { + // Create a copy with current load count and effective capacity + diskCopy := *disk.DiskInfo + diskCopy.LoadCount = 
len(disk.pendingTasks) + len(disk.assignedTasks) + available = append(available, &diskCopy) + } + } + + return available +} + +// HasRecentTaskForVolume checks if a volume had a recent task (to avoid immediate re-detection) +func (at *ActiveTopology) HasRecentTaskForVolume(volumeID uint32, taskType TaskType) bool { + at.mutex.RLock() + defer at.mutex.RUnlock() + + for _, task := range at.recentTasks { + if task.VolumeID == volumeID && task.TaskType == taskType { + return true + } + } + + return false +} + +// GetAllNodes returns information about all nodes (public interface) +func (at *ActiveTopology) GetAllNodes() map[string]*master_pb.DataNodeInfo { + at.mutex.RLock() + defer at.mutex.RUnlock() + + result := make(map[string]*master_pb.DataNodeInfo) + for nodeID, node := range at.nodes { + result[nodeID] = node.nodeInfo + } + return result +} + +// GetTopologyInfo returns the current topology information (read-only access) +func (at *ActiveTopology) GetTopologyInfo() *master_pb.TopologyInfo { + at.mutex.RLock() + defer at.mutex.RUnlock() + return at.topologyInfo +} + +// GetNodeDisks returns all disks for a specific node +func (at *ActiveTopology) GetNodeDisks(nodeID string) []*DiskInfo { + at.mutex.RLock() + defer at.mutex.RUnlock() + + node, exists := at.nodes[nodeID] + if !exists { + return nil + } + + var disks []*DiskInfo + for _, disk := range node.disks { + diskCopy := *disk.DiskInfo + diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks) + disks = append(disks, &diskCopy) + } + + return disks +} + +// rebuildIndexes rebuilds the volume and EC shard indexes for O(1) lookups +func (at *ActiveTopology) rebuildIndexes() { + // Clear existing indexes + at.volumeIndex = make(map[uint32][]string) + at.ecShardIndex = make(map[uint32][]string) + + // Rebuild indexes from current topology + for _, dc := range at.topologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, nodeInfo := range rack.DataNodeInfos { + for _, diskInfo := range nodeInfo.DiskInfos { + diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId) + + // Index volumes + for _, volumeInfo := range diskInfo.VolumeInfos { + volumeID := volumeInfo.Id + at.volumeIndex[volumeID] = append(at.volumeIndex[volumeID], diskKey) + } + + // Index EC shards + for _, ecShardInfo := range diskInfo.EcShardInfos { + volumeID := ecShardInfo.Id + at.ecShardIndex[volumeID] = append(at.ecShardIndex[volumeID], diskKey) + } + } + } + } + } +} + +// GetVolumeLocations returns the disk locations for a volume using O(1) lookup +func (at *ActiveTopology) GetVolumeLocations(volumeID uint32, collection string) []VolumeReplica { + at.mutex.RLock() + defer at.mutex.RUnlock() + + diskKeys, exists := at.volumeIndex[volumeID] + if !exists { + return []VolumeReplica{} + } + + var replicas []VolumeReplica + for _, diskKey := range diskKeys { + if disk, diskExists := at.disks[diskKey]; diskExists { + // Verify collection matches (since index doesn't include collection) + if at.volumeMatchesCollection(disk, volumeID, collection) { + replicas = append(replicas, VolumeReplica{ + ServerID: disk.NodeID, + DiskID: disk.DiskID, + }) + } + } + } + + return replicas +} + +// GetECShardLocations returns the disk locations for EC shards using O(1) lookup +func (at *ActiveTopology) GetECShardLocations(volumeID uint32, collection string) []VolumeReplica { + at.mutex.RLock() + defer at.mutex.RUnlock() + + diskKeys, exists := at.ecShardIndex[volumeID] + if !exists { + return []VolumeReplica{} + } + + var ecShards []VolumeReplica + for _, 
diskKey := range diskKeys { + if disk, diskExists := at.disks[diskKey]; diskExists { + // Verify collection matches (since index doesn't include collection) + if at.ecShardMatchesCollection(disk, volumeID, collection) { + ecShards = append(ecShards, VolumeReplica{ + ServerID: disk.NodeID, + DiskID: disk.DiskID, + }) + } + } + } + + return ecShards +} + +// volumeMatchesCollection checks if a volume on a disk matches the given collection +func (at *ActiveTopology) volumeMatchesCollection(disk *activeDisk, volumeID uint32, collection string) bool { + if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil { + return false + } + + for _, volumeInfo := range disk.DiskInfo.DiskInfo.VolumeInfos { + if volumeInfo.Id == volumeID && volumeInfo.Collection == collection { + return true + } + } + return false +} + +// ecShardMatchesCollection checks if EC shards on a disk match the given collection +func (at *ActiveTopology) ecShardMatchesCollection(disk *activeDisk, volumeID uint32, collection string) bool { + if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil { + return false + } + + for _, ecShardInfo := range disk.DiskInfo.DiskInfo.EcShardInfos { + if ecShardInfo.Id == volumeID && ecShardInfo.Collection == collection { + return true + } + } + return false +} diff --git a/weed/admin/topology/types.go b/weed/admin/topology/types.go new file mode 100644 index 000000000..df0103529 --- /dev/null +++ b/weed/admin/topology/types.go @@ -0,0 +1,97 @@ +package topology + +import "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + +// TaskType represents different types of maintenance operations +type TaskType string + +// TaskStatus represents the current status of a task +type TaskStatus string + +// Common task type constants +const ( + TaskTypeVacuum TaskType = "vacuum" + TaskTypeBalance TaskType = "balance" + TaskTypeErasureCoding TaskType = "erasure_coding" + TaskTypeReplication TaskType = "replication" +) + +// Common task status constants +const ( + TaskStatusPending TaskStatus = "pending" + TaskStatusInProgress TaskStatus = "in_progress" + TaskStatusCompleted TaskStatus = "completed" +) + +// Task and capacity management configuration constants +const ( + // MaxConcurrentTasksPerDisk defines the maximum number of concurrent tasks per disk + // This prevents overloading a single disk with too many simultaneous operations + MaxConcurrentTasksPerDisk = 2 + + // MaxTotalTaskLoadPerDisk defines the maximum total task load (pending + active) per disk + // This allows more tasks to be queued but limits the total pipeline depth + MaxTotalTaskLoadPerDisk = 3 + + // MaxTaskLoadForECPlacement defines the maximum task load to consider a disk for EC placement + // This threshold ensures disks aren't overloaded when planning EC operations + MaxTaskLoadForECPlacement = 10 +) + +// StorageSlotChange represents storage impact at both volume and shard levels +type StorageSlotChange struct { + VolumeSlots int32 `json:"volume_slots"` // Volume-level slot changes (full volumes) + ShardSlots int32 `json:"shard_slots"` // Shard-level slot changes (EC shards, fractional capacity) +} + +// Add returns a new StorageSlotChange with the sum of this and other +func (s StorageSlotChange) Add(other StorageSlotChange) StorageSlotChange { + return StorageSlotChange{ + VolumeSlots: s.VolumeSlots + other.VolumeSlots, + ShardSlots: s.ShardSlots + other.ShardSlots, + } +} + +// Subtract returns a new StorageSlotChange with other subtracted from this +func (s StorageSlotChange) Subtract(other StorageSlotChange) 
StorageSlotChange { + return StorageSlotChange{ + VolumeSlots: s.VolumeSlots - other.VolumeSlots, + ShardSlots: s.ShardSlots - other.ShardSlots, + } +} + +// AddInPlace adds other to this StorageSlotChange in-place +func (s *StorageSlotChange) AddInPlace(other StorageSlotChange) { + s.VolumeSlots += other.VolumeSlots + s.ShardSlots += other.ShardSlots +} + +// SubtractInPlace subtracts other from this StorageSlotChange in-place +func (s *StorageSlotChange) SubtractInPlace(other StorageSlotChange) { + s.VolumeSlots -= other.VolumeSlots + s.ShardSlots -= other.ShardSlots +} + +// IsZero returns true if both VolumeSlots and ShardSlots are zero +func (s StorageSlotChange) IsZero() bool { + return s.VolumeSlots == 0 && s.ShardSlots == 0 +} + +// ShardsPerVolumeSlot defines how many EC shards are equivalent to one volume slot +const ShardsPerVolumeSlot = erasure_coding.DataShardsCount + +// ToVolumeSlots converts the entire StorageSlotChange to equivalent volume slots +func (s StorageSlotChange) ToVolumeSlots() int64 { + return int64(s.VolumeSlots) + int64(s.ShardSlots)/ShardsPerVolumeSlot +} + +// ToShardSlots converts the entire StorageSlotChange to equivalent shard slots +func (s StorageSlotChange) ToShardSlots() int32 { + return s.ShardSlots + s.VolumeSlots*ShardsPerVolumeSlot +} + +// CanAccommodate checks if this StorageSlotChange can accommodate the required StorageSlotChange +// Both are converted to shard slots for a more precise comparison +func (s StorageSlotChange) CanAccommodate(required StorageSlotChange) bool { + return s.ToShardSlots() >= required.ToShardSlots() +} diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go index 23fbbd546..a83b33341 100644 --- a/weed/filer/abstract_sql/abstract_sql_store.go +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -4,13 +4,14 @@ import ( "context" "database/sql" "fmt" + "strings" + "sync" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket" "github.com/seaweedfs/seaweedfs/weed/util" - "strings" - "sync" ) type SqlGenerator interface { diff --git a/weed/pb/worker.proto b/weed/pb/worker.proto index 0ab115bb2..811f94591 100644 --- a/weed/pb/worker.proto +++ b/weed/pb/worker.proto @@ -94,6 +94,7 @@ message TaskAssignment { // TaskParams contains task-specific parameters with typed variants message TaskParams { + string task_id = 12; // ActiveTopology task ID for lifecycle management uint32 volume_id = 1; string server = 2; string collection = 3; diff --git a/weed/pb/worker_pb/worker.pb.go b/weed/pb/worker_pb/worker.pb.go index f6b3e9fb1..ff7d60545 100644 --- a/weed/pb/worker_pb/worker.pb.go +++ b/weed/pb/worker_pb/worker.pb.go @@ -804,6 +804,7 @@ func (x *TaskAssignment) GetMetadata() map[string]string { // TaskParams contains task-specific parameters with typed variants type TaskParams struct { state protoimpl.MessageState `protogen:"open.v1"` + TaskId string `protobuf:"bytes,12,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"` // ActiveTopology task ID for lifecycle management VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId,proto3" json:"volume_id,omitempty"` Server string `protobuf:"bytes,2,opt,name=server,proto3" json:"server,omitempty"` Collection string `protobuf:"bytes,3,opt,name=collection,proto3" json:"collection,omitempty"` @@ -854,6 +855,13 @@ func (*TaskParams) Descriptor() ([]byte, []int) { return 
file_worker_proto_rawDescGZIP(), []int{8} } +func (x *TaskParams) GetTaskId() string { + if x != nil { + return x.TaskId + } + return "" +} + func (x *TaskParams) GetVolumeId() uint32 { if x != nil { return x.VolumeId @@ -2869,9 +2877,10 @@ const file_worker_proto_rawDesc = "" + "\bmetadata\x18\x06 \x03(\v2'.worker_pb.TaskAssignment.MetadataEntryR\bmetadata\x1a;\n" + "\rMetadataEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + - "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\x9a\x04\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\xb3\x04\n" + "\n" + - "TaskParams\x12\x1b\n" + + "TaskParams\x12\x17\n" + + "\atask_id\x18\f \x01(\tR\x06taskId\x12\x1b\n" + "\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x16\n" + "\x06server\x18\x02 \x01(\tR\x06server\x12\x1e\n" + "\n" + diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go index 102f532a8..be03fb92f 100644 --- a/weed/worker/tasks/balance/detection.go +++ b/weed/worker/tasks/balance/detection.go @@ -83,7 +83,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)", imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) + // Generate task ID for ActiveTopology integration + taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().Unix()) + task := &types.TaskDetectionResult{ + TaskID: taskID, // Link to ActiveTopology pending task TaskType: types.TaskTypeBalance, VolumeID: selectedVolume.VolumeID, Server: selectedVolume.Server, @@ -103,6 +107,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI // Create typed parameters with destination information task.TypedParams = &worker_pb.TaskParams{ + TaskId: taskID, // Link to ActiveTopology pending task VolumeId: selectedVolume.VolumeID, Server: selectedVolume.Server, Collection: selectedVolume.Collection, @@ -121,6 +126,35 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI glog.V(1).Infof("Planned balance destination for volume %d: %s -> %s (score: %.2f)", selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode, destinationPlan.PlacementScore) + + // Add pending balance task to ActiveTopology for capacity management + + // Find the actual disk containing the volume on the source server + sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) + if !found { + return nil, fmt.Errorf("BALANCE: Could not find volume %d (collection: %s) on source server %s - unable to create balance task", + selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) + } + targetDisk := destinationPlan.TargetDisk + + err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{ + TaskID: taskID, + TaskType: topology.TaskTypeBalance, + VolumeID: selectedVolume.VolumeID, + VolumeSize: int64(selectedVolume.Size), + Sources: []topology.TaskSourceSpec{ + {ServerID: selectedVolume.Server, DiskID: sourceDisk}, + }, + Destinations: []topology.TaskDestinationSpec{ + {ServerID: destinationPlan.TargetNode, DiskID: targetDisk}, + }, + }) + if err != nil { + return nil, fmt.Errorf("BALANCE: Failed to add pending task for volume %d: %v", selectedVolume.VolumeID, err) + } + + glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d", + taskID, selectedVolume.VolumeID, 
selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk) } else { glog.Warningf("No ActiveTopology available for destination planning in balance detection") return nil, nil diff --git a/weed/worker/tasks/base/volume_utils.go b/weed/worker/tasks/base/volume_utils.go new file mode 100644 index 000000000..2aaf795b2 --- /dev/null +++ b/weed/worker/tasks/base/volume_utils.go @@ -0,0 +1,36 @@ +package base + +import ( + "github.com/seaweedfs/seaweedfs/weed/admin/topology" +) + +// FindVolumeDisk finds the disk ID where a specific volume is located on a given server. +// Returns the disk ID and a boolean indicating whether the volume was found. +// Uses O(1) indexed lookup for optimal performance on large clusters. +// +// This is a shared utility function used by multiple task detection algorithms +// (balance, vacuum, etc.) to locate volumes efficiently. +// +// Example usage: +// +// // In balance task: find source disk for a volume that needs to be moved +// sourceDisk, found := base.FindVolumeDisk(topology, volumeID, collection, sourceServer) +// +// // In vacuum task: find disk containing volume that needs cleanup +// diskID, exists := base.FindVolumeDisk(topology, volumeID, collection, serverID) +func FindVolumeDisk(activeTopology *topology.ActiveTopology, volumeID uint32, collection string, serverID string) (uint32, bool) { + if activeTopology == nil { + return 0, false + } + + // Use the new O(1) indexed lookup for better performance + locations := activeTopology.GetVolumeLocations(volumeID, collection) + for _, loc := range locations { + if loc.ServerID == serverID { + return loc.DiskID, true + } + } + + // Volume not found on this server + return 0, false +} diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go index 9cf87cdf6..ec632436f 100644 --- a/weed/worker/tasks/erasure_coding/detection.go +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -61,7 +61,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI // Check quiet duration and fullness criteria if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio { + // Generate task ID for ActiveTopology integration + taskID := fmt.Sprintf("ec_vol_%d_%d", metric.VolumeID, now.Unix()) + result := &types.TaskDetectionResult{ + TaskID: taskID, // Link to ActiveTopology pending task TaskType: types.TaskTypeErasureCoding, VolumeID: metric.VolumeID, Server: metric.Server, @@ -81,12 +85,117 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI continue // Skip this volume if destination planning fails } - // Find all volume replicas from topology - replicas := findVolumeReplicas(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection) + // Calculate expected shard size for EC operation + // Each data shard will be approximately volumeSize / dataShards + expectedShardSize := uint64(metric.Size) / uint64(erasure_coding.DataShardsCount) + + // Add pending EC shard task to ActiveTopology for capacity management + + // Extract shard destinations from multiPlan + var shardDestinations []string + var shardDiskIDs []uint32 + for _, plan := range multiPlan.Plans { + shardDestinations = append(shardDestinations, plan.TargetNode) + shardDiskIDs = append(shardDiskIDs, plan.TargetDisk) + } + + // Find all volume replica locations (server + disk) from topology + replicaLocations := findVolumeReplicaLocations(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection) + if 
len(replicaLocations) == 0 { + glog.Warningf("No replica locations found for volume %d, skipping EC", metric.VolumeID) + continue + } + + // Find existing EC shards from previous failed attempts + existingECShards := findExistingECShards(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection) + + // Combine volume replicas and existing EC shards for cleanup + var allSourceLocations []topology.TaskSourceLocation + + // Add volume replicas (will free volume slots) + for _, replica := range replicaLocations { + allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{ + ServerID: replica.ServerID, + DiskID: replica.DiskID, + CleanupType: topology.CleanupVolumeReplica, + }) + } + + // Add existing EC shards (will free shard slots) + duplicateCheck := make(map[string]bool) + for _, replica := range replicaLocations { + key := fmt.Sprintf("%s:%d", replica.ServerID, replica.DiskID) + duplicateCheck[key] = true + } + + for _, shard := range existingECShards { + key := fmt.Sprintf("%s:%d", shard.ServerID, shard.DiskID) + if !duplicateCheck[key] { // Avoid duplicates if EC shards are on same disk as volume replicas + allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{ + ServerID: shard.ServerID, + DiskID: shard.DiskID, + CleanupType: topology.CleanupECShards, + }) + duplicateCheck[key] = true + } + } + + glog.V(2).Infof("Found %d volume replicas and %d existing EC shards for volume %d (total %d cleanup sources)", + len(replicaLocations), len(existingECShards), metric.VolumeID, len(allSourceLocations)) + + // Convert TaskSourceLocation to TaskSourceSpec + sources := make([]topology.TaskSourceSpec, len(allSourceLocations)) + for i, srcLoc := range allSourceLocations { + sources[i] = topology.TaskSourceSpec{ + ServerID: srcLoc.ServerID, + DiskID: srcLoc.DiskID, + CleanupType: srcLoc.CleanupType, + } + } + + // Convert shard destinations to TaskDestinationSpec + destinations := make([]topology.TaskDestinationSpec, len(shardDestinations)) + shardImpact := topology.CalculateECShardStorageImpact(1, int64(expectedShardSize)) // 1 shard per destination + shardSize := int64(expectedShardSize) + for i, dest := range shardDestinations { + destinations[i] = topology.TaskDestinationSpec{ + ServerID: dest, + DiskID: shardDiskIDs[i], + StorageImpact: &shardImpact, + EstimatedSize: &shardSize, + } + } + + err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{ + TaskID: taskID, + TaskType: topology.TaskTypeErasureCoding, + VolumeID: metric.VolumeID, + VolumeSize: int64(metric.Size), + Sources: sources, + Destinations: destinations, + }) + if err != nil { + glog.Warningf("Failed to add pending EC shard task to ActiveTopology for volume %d: %v", metric.VolumeID, err) + continue // Skip this volume if topology task addition fails + } + + glog.V(2).Infof("Added pending EC shard task %s to ActiveTopology for volume %d with %d cleanup sources and %d shard destinations", + taskID, metric.VolumeID, len(allSourceLocations), len(multiPlan.Plans)) + + // Find all volume replicas from topology (for legacy worker compatibility) + var replicas []string + serverSet := make(map[string]struct{}) + for _, loc := range replicaLocations { + if _, found := serverSet[loc.ServerID]; !found { + replicas = append(replicas, loc.ServerID) + serverSet[loc.ServerID] = struct{}{} + } + } glog.V(1).Infof("Found %d replicas for volume %d: %v", len(replicas), metric.VolumeID, replicas) // Create typed parameters with EC destination information and replicas result.TypedParams = 
&worker_pb.TaskParams{ + TaskId: taskID, // Link to ActiveTopology pending task VolumeId: metric.VolumeID, Server: metric.Server, Collection: metric.Collection, @@ -143,6 +252,9 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI // planECDestinations plans the destinations for erasure coding operation // This function implements EC destination planning logic directly in the detection phase func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config) (*topology.MultiDestinationPlan, error) { + // Calculate expected shard size for EC operation + expectedShardSize := uint64(metric.Size) / uint64(erasure_coding.DataShardsCount) + // Get source node information from topology var sourceRack, sourceDC string @@ -168,10 +280,12 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V } } - // Get available disks for EC placement (include source node for EC) - availableDisks := activeTopology.GetAvailableDisks(topology.TaskTypeErasureCoding, "") + // Get available disks for EC placement with effective capacity consideration (includes pending tasks) + // For EC, we typically need 1 volume slot per shard, so use minimum capacity of 1 + // For EC, we need at least 1 available volume slot on a disk to consider it for placement. + availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, metric.Server, 1) if len(availableDisks) < erasure_coding.MinTotalDisks { - return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d", erasure_coding.MinTotalDisks, len(availableDisks)) + return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d (considering pending/active tasks)", erasure_coding.MinTotalDisks, len(availableDisks)) } // Select best disks for EC placement with rack/DC diversity @@ -190,7 +304,7 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V TargetDisk: disk.DiskID, TargetRack: disk.Rack, TargetDC: disk.DataCenter, - ExpectedSize: 0, // EC shards don't have predetermined size + ExpectedSize: expectedShardSize, // Set calculated EC shard size PlacementScore: calculateECScore(disk, sourceRack, sourceDC), Conflicts: checkECPlacementConflicts(disk, sourceRack, sourceDC), } @@ -202,6 +316,22 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V dcCount[disk.DataCenter]++ } + // Log capacity utilization information using ActiveTopology's encapsulated logic + totalEffectiveCapacity := int64(0) + for _, plan := range plans { + effectiveCapacity := activeTopology.GetEffectiveAvailableCapacity(plan.TargetNode, plan.TargetDisk) + totalEffectiveCapacity += effectiveCapacity + } + + glog.V(1).Infof("Planned EC destinations for volume %d (size=%d bytes): expected shard size=%d bytes, %d shards across %d racks, %d DCs, total effective capacity=%d slots", + metric.VolumeID, metric.Size, expectedShardSize, len(plans), len(rackCount), len(dcCount), totalEffectiveCapacity) + + // Log storage impact for EC task (source only - EC has multiple targets handled individually) + sourceChange, _ := topology.CalculateTaskStorageImpact(topology.TaskTypeErasureCoding, int64(metric.Size)) + glog.V(2).Infof("EC task capacity management: source_reserves_with_zero_impact={VolumeSlots:%d, ShardSlots:%d}, %d_targets_will_receive_shards, estimated_size=%d", + sourceChange.VolumeSlots, sourceChange.ShardSlots, len(plans), metric.Size) + glog.V(2).Infof("EC source reserves capacity but 
with zero StorageSlotChange impact") + return &topology.MultiDestinationPlan{ Plans: plans, TotalShards: len(plans), @@ -354,13 +484,8 @@ func isDiskSuitableForEC(disk *topology.DiskInfo) bool { return false } - // Check if disk has capacity - if disk.DiskInfo.VolumeCount >= disk.DiskInfo.MaxVolumeCount { - return false - } - - // Check if disk is not overloaded - if disk.LoadCount > 10 { // Arbitrary threshold + // Check if disk is not overloaded with tasks + if disk.LoadCount > topology.MaxTaskLoadForECPlacement { return false } @@ -380,6 +505,24 @@ func checkECPlacementConflicts(disk *topology.DiskInfo, sourceRack, sourceDC str return conflicts } +// findVolumeReplicaLocations finds all replica locations (server + disk) for the specified volume +// Uses O(1) indexed lookup for optimal performance on large clusters. +func findVolumeReplicaLocations(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica { + if activeTopology == nil { + return nil + } + return activeTopology.GetVolumeLocations(volumeID, collection) +} + +// findExistingECShards finds existing EC shards for a volume (from previous failed EC attempts) +// Uses O(1) indexed lookup for optimal performance on large clusters. +func findExistingECShards(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica { + if activeTopology == nil { + return nil + } + return activeTopology.GetECShardLocations(volumeID, collection) +} + // findVolumeReplicas finds all servers that have replicas of the specified volume func findVolumeReplicas(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []string { if activeTopology == nil { diff --git a/weed/worker/tasks/vacuum/detection.go b/weed/worker/tasks/vacuum/detection.go index 23f82ad34..0c14bb956 100644 --- a/weed/worker/tasks/vacuum/detection.go +++ b/weed/worker/tasks/vacuum/detection.go @@ -1,6 +1,7 @@ package vacuum import ( + "fmt" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -31,7 +32,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI priority = types.TaskPriorityHigh } + // Generate task ID for future ActiveTopology integration + taskID := fmt.Sprintf("vacuum_vol_%d_%d", metric.VolumeID, time.Now().Unix()) + result := &types.TaskDetectionResult{ + TaskID: taskID, // For future ActiveTopology integration TaskType: types.TaskTypeVacuum, VolumeID: metric.VolumeID, Server: metric.Server, @@ -96,6 +101,7 @@ func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.Volum // Create typed protobuf parameters return &worker_pb.TaskParams{ + TaskId: task.TaskID, // Link to ActiveTopology pending task (if integrated) VolumeId: task.VolumeID, Server: task.Server, Collection: task.Collection, diff --git a/weed/worker/types/task_types.go b/weed/worker/types/task_types.go index ed7fc8f07..d5dbc4008 100644 --- a/weed/worker/types/task_types.go +++ b/weed/worker/types/task_types.go @@ -73,6 +73,7 @@ type TaskParams struct { // TaskDetectionResult represents the result of scanning for maintenance needs type TaskDetectionResult struct { + TaskID string `json:"task_id"` // ActiveTopology task ID for lifecycle management TaskType TaskType `json:"task_type"` VolumeID uint32 `json:"volume_id,omitempty"` Server string `json:"server,omitempty"`
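
A brief illustration of the slot arithmetic introduced in types.go may help: one volume slot is treated as ShardsPerVolumeSlot shard slots, and CanAccommodate deliberately compares in shard slots so that the integer division in ToVolumeSlots cannot hide a shortfall. The standalone sketch below re-declares a minimal version of the type purely for illustration (it is not the package's API surface) and assumes erasure_coding.DataShardsCount is 10:

package main

import "fmt"

const shardsPerVolumeSlot = 10 // assumed value of erasure_coding.DataShardsCount

type storageSlotChange struct {
	VolumeSlots int32
	ShardSlots  int32
}

// toShardSlots expresses the change entirely in shard slots, the finer unit.
func (s storageSlotChange) toShardSlots() int32 {
	return s.ShardSlots + s.VolumeSlots*shardsPerVolumeSlot
}

// canAccommodate compares in shard slots so fractional volume slots are not
// lost to integer division, mirroring the intent of the patch's CanAccommodate.
func (s storageSlotChange) canAccommodate(required storageSlotChange) bool {
	return s.toShardSlots() >= required.toShardSlots()
}

func main() {
	free := storageSlotChange{VolumeSlots: 1} // one free volume slot
	fmt.Println(free.canAccommodate(storageSlotChange{ShardSlots: 10})) // true: 1 volume slot == 10 shard slots
	fmt.Println(free.canAccommodate(storageSlotChange{ShardSlots: 11})) // false: one shard short
}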
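
The EC detection path now estimates each planned shard as the source volume size divided by the data-shard count and charges every destination disk one shard of that size (the ExpectedSize / EstimatedSize values above, which previously defaulted to 0). A hedged, self-contained arithmetic sketch, assuming 10 data shards and a hypothetical 30 GiB volume:

package main

import "fmt"

func main() {
	const dataShards = 10                        // assumed erasure_coding.DataShardsCount
	volumeSize := uint64(30) << 30               // hypothetical 30 GiB source volume
	expectedShardSize := volumeSize / dataShards // ~3 GiB reserved per destination disk

	fmt.Printf("each of the %d data shards is estimated at %d bytes\n", dataShards, expectedShardSize)
}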
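
All detection paths now reserve capacity through ActiveTopology.AddPendingTask with a TaskSpec listing sources and destinations, and the same task ID is echoed into worker_pb.TaskParams so the worker can later act on the matching pending task. The sketch below only illustrates the calling convention for a balance-style move; the helper name, servers, and sizes are hypothetical, while the topology types and fields are taken from the patch:

package example

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
)

// registerBalanceTask sketches how a detector could reserve capacity for a
// planned volume move. The same taskID should also be copied into the
// resulting worker_pb.TaskParams so the worker references this pending task.
func registerBalanceTask(at *topology.ActiveTopology, taskID string, volumeID uint32, volumeSize int64,
	srcServer string, srcDisk uint32, dstServer string, dstDisk uint32) error {

	err := at.AddPendingTask(topology.TaskSpec{
		TaskID:     taskID,
		TaskType:   topology.TaskTypeBalance,
		VolumeID:   volumeID,
		VolumeSize: volumeSize,
		Sources: []topology.TaskSourceSpec{
			{ServerID: srcServer, DiskID: srcDisk},
		},
		Destinations: []topology.TaskDestinationSpec{
			{ServerID: dstServer, DiskID: dstDisk},
		},
	})
	if err != nil {
		return fmt.Errorf("reserve capacity for volume %d: %w", volumeID, err)
	}
	return nil
}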