package topology

import (
	"fmt"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)

// TaskType represents different types of maintenance operations
type TaskType string

// TaskStatus represents the current status of a task
type TaskStatus string

// Common task type constants
const (
	TaskTypeVacuum        TaskType = "vacuum"
	TaskTypeBalance       TaskType = "balance"
	TaskTypeErasureCoding TaskType = "erasure_coding"
	TaskTypeReplication   TaskType = "replication"
)

// Common task status constants
const (
	TaskStatusPending    TaskStatus = "pending"
	TaskStatusInProgress TaskStatus = "in_progress"
	TaskStatusCompleted  TaskStatus = "completed"
)

// taskState represents the current state of a task affecting the topology (internal)
type taskState struct {
	VolumeID     uint32     `json:"volume_id"`
	TaskType     TaskType   `json:"task_type"`
	SourceServer string     `json:"source_server"`
	SourceDisk   uint32     `json:"source_disk"`
	TargetServer string     `json:"target_server,omitempty"`
	TargetDisk   uint32     `json:"target_disk,omitempty"`
	Status       TaskStatus `json:"status"`
	StartedAt    time.Time  `json:"started_at"`
	CompletedAt  time.Time  `json:"completed_at,omitempty"`
}

// DiskInfo represents a disk with its current state and ongoing tasks (public for external access)
type DiskInfo struct {
	NodeID     string              `json:"node_id"`
	DiskID     uint32              `json:"disk_id"`
	DiskType   string              `json:"disk_type"`
	DataCenter string              `json:"data_center"`
	Rack       string              `json:"rack"`
	DiskInfo   *master_pb.DiskInfo `json:"disk_info"`
	LoadCount  int                 `json:"load_count"` // Number of active tasks
}

// activeDisk represents internal disk state (private)
type activeDisk struct {
	*DiskInfo
	pendingTasks  []*taskState
	assignedTasks []*taskState
	recentTasks   []*taskState // Completed within the last recentTaskWindowSeconds
}

// activeNode represents a node with its disks (private)
type activeNode struct {
	nodeID     string
	dataCenter string
	rack       string
	nodeInfo   *master_pb.DataNodeInfo
	disks      map[uint32]*activeDisk // DiskID -> activeDisk
}

// ActiveTopology provides a real-time view of cluster state with task awareness
type ActiveTopology struct {
	// Core topology from master
	topologyInfo *master_pb.TopologyInfo
	lastUpdated  time.Time

	// Structured topology for easy access (private)
	nodes map[string]*activeNode // NodeID -> activeNode
	disks map[string]*activeDisk // "NodeID:DiskID" -> activeDisk

	// Task states affecting the topology (private)
	pendingTasks  map[string]*taskState
	assignedTasks map[string]*taskState
	recentTasks   map[string]*taskState

	// Configuration
	recentTaskWindowSeconds int

	// Synchronization
	mutex sync.RWMutex
}

// NewActiveTopology creates a new ActiveTopology instance
func NewActiveTopology(recentTaskWindowSeconds int) *ActiveTopology {
	if recentTaskWindowSeconds <= 0 {
		recentTaskWindowSeconds = 10 // Default 10 seconds
	}
	return &ActiveTopology{
		nodes:                   make(map[string]*activeNode),
		disks:                   make(map[string]*activeDisk),
		pendingTasks:            make(map[string]*taskState),
		assignedTasks:           make(map[string]*taskState),
		recentTasks:             make(map[string]*taskState),
		recentTaskWindowSeconds: recentTaskWindowSeconds,
	}
}

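// Typical lifecycle, as a minimal sketch (the topologyInfo value comes from
// the master; the task ID and parameters below are illustrative only):
//
//	at := NewActiveTopology(10)
//	_ = at.UpdateTopology(topologyInfo)
//	at.AddPendingTask("task-1", TaskTypeBalance, 1001, "src:8080", 0, "dst:8080", 0)
//	_ = at.AssignTask("task-1")   // pending -> assigned
//	_ = at.CompleteTask("task-1") // assigned -> recent, expires after the window
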
// UpdateTopology updates the topology information from master
func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) error {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	at.topologyInfo = topologyInfo
	at.lastUpdated = time.Now()

	// Rebuild structured topology
	at.nodes = make(map[string]*activeNode)
	at.disks = make(map[string]*activeDisk)

	for _, dc := range topologyInfo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, nodeInfo := range rack.DataNodeInfos {
				node := &activeNode{
					nodeID:     nodeInfo.Id,
					dataCenter: dc.Id,
					rack:       rack.Id,
					nodeInfo:   nodeInfo,
					disks:      make(map[uint32]*activeDisk),
				}

				// Add disks for this node
				for diskType, diskInfo := range nodeInfo.DiskInfos {
					disk := &activeDisk{
						DiskInfo: &DiskInfo{
							NodeID:     nodeInfo.Id,
							DiskID:     diskInfo.DiskId,
							DiskType:   diskType,
							DataCenter: dc.Id,
							Rack:       rack.Id,
							DiskInfo:   diskInfo,
						},
					}
					diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId)
					node.disks[diskInfo.DiskId] = disk
					at.disks[diskKey] = disk
				}

				at.nodes[nodeInfo.Id] = node
			}
		}
	}

	// Reassign task states to updated topology
	at.reassignTaskStates()

	glog.V(1).Infof("ActiveTopology updated: %d nodes, %d disks", len(at.nodes), len(at.disks))
	return nil
}

// AddPendingTask adds a pending task to the topology
func (at *ActiveTopology) AddPendingTask(taskID string, taskType TaskType, volumeID uint32,
	sourceServer string, sourceDisk uint32, targetServer string, targetDisk uint32) {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	task := &taskState{
		VolumeID:     volumeID,
		TaskType:     taskType,
		SourceServer: sourceServer,
		SourceDisk:   sourceDisk,
		TargetServer: targetServer,
		TargetDisk:   targetDisk,
		Status:       TaskStatusPending,
		StartedAt:    time.Now(),
	}

	at.pendingTasks[taskID] = task
	at.assignTaskToDisk(task)
}

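// For example, registering a vacuum task that needs no separate target disk
// (illustrative values only; a minimal sketch, not a prescribed call pattern):
//
//	at.AddPendingTask("vacuum-1001", TaskTypeVacuum, 1001, "10.0.0.1:8080", 0, "", 0)
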
// AssignTask moves a task from pending to assigned
func (at *ActiveTopology) AssignTask(taskID string) error {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	task, exists := at.pendingTasks[taskID]
	if !exists {
		return fmt.Errorf("pending task %s not found", taskID)
	}

	delete(at.pendingTasks, taskID)
	task.Status = TaskStatusInProgress
	at.assignedTasks[taskID] = task
	at.reassignTaskStates()
	return nil
}

// CompleteTask moves a task from assigned to recent
func (at *ActiveTopology) CompleteTask(taskID string) error {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	task, exists := at.assignedTasks[taskID]
	if !exists {
		return fmt.Errorf("assigned task %s not found", taskID)
	}

	delete(at.assignedTasks, taskID)
	task.Status = TaskStatusCompleted
	task.CompletedAt = time.Now()
	at.recentTasks[taskID] = task
	at.reassignTaskStates()

	// Clean up old recent tasks
	at.cleanupRecentTasks()
	return nil
}

// GetAvailableDisks returns disks that can accept new tasks of the given type
func (at *ActiveTopology) GetAvailableDisks(taskType TaskType, excludeNodeID string) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	var available []*DiskInfo
	for _, disk := range at.disks {
		if disk.NodeID == excludeNodeID {
			continue // Skip excluded node
		}
		if at.isDiskAvailable(disk, taskType) {
			// Create a copy with current load count
			diskCopy := *disk.DiskInfo
			diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
			available = append(available, &diskCopy)
		}
	}
	return available
}

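// Callers will often prefer the least-loaded candidate, e.g. (a minimal
// sketch; selecting by LoadCount is an assumed policy, not required here):
//
//	candidates := at.GetAvailableDisks(TaskTypeBalance, sourceNode)
//	sort.Slice(candidates, func(i, j int) bool {
//		return candidates[i].LoadCount < candidates[j].LoadCount
//	})
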
// GetDiskLoad returns the current load on a disk (number of active tasks)
func (at *ActiveTopology) GetDiskLoad(nodeID string, diskID uint32) int {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return 0
	}
	return len(disk.pendingTasks) + len(disk.assignedTasks)
}

// HasRecentTaskForVolume checks if a volume had a recent task (to avoid immediate re-detection)
func (at *ActiveTopology) HasRecentTaskForVolume(volumeID uint32, taskType TaskType) bool {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	for _, task := range at.recentTasks {
		if task.VolumeID == volumeID && task.TaskType == taskType {
			return true
		}
	}
	return false
}

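// A detection loop can use this as a debounce, e.g. (a minimal sketch;
// findVacuumCandidates is a hypothetical helper, not part of this package):
//
//	for _, volumeID := range findVacuumCandidates() {
//		if at.HasRecentTaskForVolume(volumeID, TaskTypeVacuum) {
//			continue // Recently processed; skip to avoid immediate re-detection.
//		}
//		// ...schedule a new vacuum task for volumeID...
//	}
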
// GetAllNodes returns information about all nodes (public interface)
func (at *ActiveTopology) GetAllNodes() map[string]*master_pb.DataNodeInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	result := make(map[string]*master_pb.DataNodeInfo)
	for nodeID, node := range at.nodes {
		result[nodeID] = node.nodeInfo
	}
	return result
}

// GetTopologyInfo returns the current topology information. The returned
// pointer is shared, so callers must treat it as read-only.
func (at *ActiveTopology) GetTopologyInfo() *master_pb.TopologyInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()
	return at.topologyInfo
}

// GetNodeDisks returns all disks for a specific node
func (at *ActiveTopology) GetNodeDisks(nodeID string) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	node, exists := at.nodes[nodeID]
	if !exists {
		return nil
	}

	var disks []*DiskInfo
	for _, disk := range node.disks {
		diskCopy := *disk.DiskInfo
		diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
		disks = append(disks, &diskCopy)
	}
	return disks
}

// DestinationPlan represents a planned destination for a volume/shard operation
type DestinationPlan struct {
	TargetNode     string   `json:"target_node"`
	TargetDisk     uint32   `json:"target_disk"`
	TargetRack     string   `json:"target_rack"`
	TargetDC       string   `json:"target_dc"`
	ExpectedSize   uint64   `json:"expected_size"`
	PlacementScore float64  `json:"placement_score"`
	Conflicts      []string `json:"conflicts"`
}

// MultiDestinationPlan represents multiple planned destinations for operations like EC
type MultiDestinationPlan struct {
	Plans           []*DestinationPlan `json:"plans"`
	TotalShards     int                `json:"total_shards"`
	SuccessfulRacks int                `json:"successful_racks"`
	SuccessfulDCs   int                `json:"successful_dcs"`
}

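// A planner might populate a DestinationPlan from an available disk, e.g.
// (a minimal sketch; the chosen disk, expected size, and score policy are
// assumptions rather than fixed rules of this package):
//
//	disks := at.GetAvailableDisks(TaskTypeErasureCoding, sourceNode)
//	if len(disks) > 0 {
//		d := disks[0]
//		plan := &DestinationPlan{
//			TargetNode: d.NodeID,
//			TargetDisk: d.DiskID,
//			TargetRack: d.Rack,
//			TargetDC:   d.DataCenter,
//		}
//		_ = plan
//	}
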
// Private methods

// reassignTaskStates assigns tasks to the appropriate disks
func (at *ActiveTopology) reassignTaskStates() {
	// Clear existing task assignments
	for _, disk := range at.disks {
		disk.pendingTasks = nil
		disk.assignedTasks = nil
		disk.recentTasks = nil
	}

	// Reassign pending tasks
	for _, task := range at.pendingTasks {
		at.assignTaskToDisk(task)
	}

	// Reassign assigned tasks
	for _, task := range at.assignedTasks {
		at.assignTaskToDisk(task)
	}

	// Reassign recent tasks
	for _, task := range at.recentTasks {
		at.assignTaskToDisk(task)
	}
}

// assignTaskToDisk assigns a task to the appropriate disk(s)
func (at *ActiveTopology) assignTaskToDisk(task *taskState) {
	// Assign to source disk
	sourceKey := fmt.Sprintf("%s:%d", task.SourceServer, task.SourceDisk)
	if sourceDisk, exists := at.disks[sourceKey]; exists {
		switch task.Status {
		case TaskStatusPending:
			sourceDisk.pendingTasks = append(sourceDisk.pendingTasks, task)
		case TaskStatusInProgress:
			sourceDisk.assignedTasks = append(sourceDisk.assignedTasks, task)
		case TaskStatusCompleted:
			sourceDisk.recentTasks = append(sourceDisk.recentTasks, task)
		}
	}

	// Assign to target disk if it exists and is different from source
	if task.TargetServer != "" && (task.TargetServer != task.SourceServer || task.TargetDisk != task.SourceDisk) {
		targetKey := fmt.Sprintf("%s:%d", task.TargetServer, task.TargetDisk)
		if targetDisk, exists := at.disks[targetKey]; exists {
			switch task.Status {
			case TaskStatusPending:
				targetDisk.pendingTasks = append(targetDisk.pendingTasks, task)
			case TaskStatusInProgress:
				targetDisk.assignedTasks = append(targetDisk.assignedTasks, task)
			case TaskStatusCompleted:
				targetDisk.recentTasks = append(targetDisk.recentTasks, task)
			}
		}
	}
}

// isDiskAvailable checks if a disk can accept new tasks
func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) bool {
	// Reject disks that already have too many active tasks
	activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
	if activeLoad >= 2 { // Max 2 concurrent tasks per disk
		return false
	}

	// Reject disks whose in-progress tasks conflict with the new task type
	for _, task := range disk.assignedTasks {
		if at.areTaskTypesConflicting(task.TaskType, taskType) {
			return false
		}
	}
	return true
}

// areTaskTypesConflicting checks if two task types conflict
func (at *ActiveTopology) areTaskTypesConflicting(existing, newType TaskType) bool {
	// Examples of conflicting task types
	conflictMap := map[TaskType][]TaskType{
		TaskTypeVacuum:        {TaskTypeBalance, TaskTypeErasureCoding},
		TaskTypeBalance:       {TaskTypeVacuum, TaskTypeErasureCoding},
		TaskTypeErasureCoding: {TaskTypeVacuum, TaskTypeBalance},
	}

	if conflicts, exists := conflictMap[existing]; exists {
		for _, conflictType := range conflicts {
			if conflictType == newType {
				return true
			}
		}
	}
	return false
}

// cleanupRecentTasks removes old recent tasks
func (at *ActiveTopology) cleanupRecentTasks() {
	cutoff := time.Now().Add(-time.Duration(at.recentTaskWindowSeconds) * time.Second)
	for taskID, task := range at.recentTasks {
		if task.CompletedAt.Before(cutoff) {
			delete(at.recentTasks, taskID)
		}
	}
}