package topology

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
)

// AssignTask moves a task from pending to assigned and reserves capacity
func (at *ActiveTopology) AssignTask(taskID string) error {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	task, exists := at.pendingTasks[taskID]
	if !exists {
		return fmt.Errorf("pending task %s not found", taskID)
	}

	// Verify that every destination disk has enough capacity for the reservation
	for _, dest := range task.Destinations {
		targetKey := fmt.Sprintf("%s:%d", dest.TargetServer, dest.TargetDisk)
		if targetDisk, exists := at.disks[targetKey]; exists {
			availableCapacity := at.getEffectiveAvailableCapacityUnsafe(targetDisk)

			// Compare total capacity using the unified StorageSlotChange comparison
			if !availableCapacity.CanAccommodate(dest.StorageChange) {
				return fmt.Errorf("insufficient capacity on target disk %s:%d. Available: %+v, Required: %+v",
					dest.TargetServer, dest.TargetDisk, availableCapacity, dest.StorageChange)
			}
		} else if dest.TargetServer != "" {
			// Fail fast if the destination disk is not found in the topology
			return fmt.Errorf("destination disk %s not found in topology", targetKey)
		}
	}

	// Move the task to assigned and reserve capacity
	delete(at.pendingTasks, taskID)
	task.Status = TaskStatusInProgress
	at.assignedTasks[taskID] = task
	at.reassignTaskStates()

	// Log the reserved capacity across all sources and destinations
	totalSourceImpact := StorageSlotChange{}
	totalDestImpact := StorageSlotChange{}
	for _, source := range task.Sources {
		totalSourceImpact.AddInPlace(source.StorageChange)
	}
	for _, dest := range task.Destinations {
		totalDestImpact.AddInPlace(dest.StorageChange)
	}

	glog.V(2).Infof("Task %s assigned and capacity reserved: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)",
		taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots,
		len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots)

	return nil
}

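// exampleAssignTask is a minimal sketch of the pending -> assigned flow;
// the task ID here is illustrative, not a real call site. Assignment fails
// if any destination disk cannot accommodate the reserved StorageSlotChange.
func exampleAssignTask(at *ActiveTopology) {
	if err := at.AssignTask("balance-vol-42"); err != nil {
		glog.Warningf("task not assigned: %v", err)
	}
}
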
// CompleteTask moves a task from assigned to recent and releases reserved capacity
// NOTE: This only releases the reserved capacity. The actual topology update (VolumeCount changes)
// should be handled by the master when it receives the task completion notification.
func (at *ActiveTopology) CompleteTask(taskID string) error {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	task, exists := at.assignedTasks[taskID]
	if !exists {
		return fmt.Errorf("assigned task %s not found", taskID)
	}

	// Release reserved capacity by moving the task to the completed state
	delete(at.assignedTasks, taskID)
	task.Status = TaskStatusCompleted
	task.CompletedAt = time.Now()
	at.recentTasks[taskID] = task
	at.reassignTaskStates()

	// Log the released capacity across all sources and destinations
	totalSourceImpact := StorageSlotChange{}
	totalDestImpact := StorageSlotChange{}
	for _, source := range task.Sources {
		totalSourceImpact.AddInPlace(source.StorageChange)
	}
	for _, dest := range task.Destinations {
		totalDestImpact.AddInPlace(dest.StorageChange)
	}

	glog.V(2).Infof("Task %s completed and capacity released: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)",
		taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots,
		len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots)

	// Clean up old recent tasks
	at.cleanupRecentTasks()

	return nil
}

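// exampleCompleteTask is a minimal sketch of the normal completion path;
// the task ID is illustrative. Note that this only releases the reservation:
// the VolumeCount itself is updated separately via ApplyActualStorageChange.
func exampleCompleteTask(at *ActiveTopology) {
	if err := at.CompleteTask("balance-vol-42"); err != nil {
		glog.Warningf("task not completed: %v", err)
	}
}
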
// ApplyActualStorageChange updates the topology to reflect actual storage changes after task completion
// This should be called when the master updates the topology with new VolumeCount information
func (at *ActiveTopology) ApplyActualStorageChange(nodeID string, diskID uint32, volumeCountChange int64) {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	if disk, exists := at.disks[diskKey]; exists && disk.DiskInfo != nil && disk.DiskInfo.DiskInfo != nil {
		oldCount := disk.DiskInfo.DiskInfo.VolumeCount
		disk.DiskInfo.DiskInfo.VolumeCount += volumeCountChange

		glog.V(2).Infof("Applied actual storage change on disk %s: volume_count %d -> %d (change: %+d)",
			diskKey, oldCount, disk.DiskInfo.DiskInfo.VolumeCount, volumeCountChange)
	}
}

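// exampleApplyActualChange sketches the master-side follow-up to a completed
// volume move; the server addresses, disk IDs, and +/-1 deltas are illustrative.
func exampleApplyActualChange(at *ActiveTopology) {
	// The destination disk gained one volume, the source disk lost one.
	at.ApplyActualStorageChange("10.0.0.5:8080", 0, 1)
	at.ApplyActualStorageChange("10.0.0.6:8080", 1, -1)
}
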
// AddPendingTask creates a pending task from a TaskSpec, handling both simple
// (single source/destination) and complex (multi-source/destination) cases
func (at *ActiveTopology) AddPendingTask(spec TaskSpec) error {
	// Validation
	if len(spec.Sources) == 0 {
		return fmt.Errorf("at least one source is required")
	}
	if len(spec.Destinations) == 0 {
		return fmt.Errorf("at least one destination is required")
	}

	at.mutex.Lock()
	defer at.mutex.Unlock()

	// Build sources array
	sources := make([]TaskSource, len(spec.Sources))
	for i, sourceSpec := range spec.Sources {
		var storageImpact StorageSlotChange
		var estimatedSize int64

		if sourceSpec.StorageImpact != nil {
			// Use the manually specified impact
			storageImpact = *sourceSpec.StorageImpact
		} else {
			// Auto-calculate based on task type and cleanup type
			storageImpact = at.calculateSourceStorageImpact(spec.TaskType, sourceSpec.CleanupType, spec.VolumeSize)
		}

		if sourceSpec.EstimatedSize != nil {
			estimatedSize = *sourceSpec.EstimatedSize
		} else {
			estimatedSize = spec.VolumeSize // Default to volume size
		}

		sources[i] = TaskSource{
			SourceServer:  sourceSpec.ServerID,
			SourceDisk:    sourceSpec.DiskID,
			StorageChange: storageImpact,
			EstimatedSize: estimatedSize,
		}
	}

	// Build destinations array
	destinations := make([]TaskDestination, len(spec.Destinations))
	for i, destSpec := range spec.Destinations {
		var storageImpact StorageSlotChange
		var estimatedSize int64

		if destSpec.StorageImpact != nil {
			// Use the manually specified impact
			storageImpact = *destSpec.StorageImpact
		} else {
			// Auto-calculate based on task type
			_, storageImpact = CalculateTaskStorageImpact(spec.TaskType, spec.VolumeSize)
		}

		if destSpec.EstimatedSize != nil {
			estimatedSize = *destSpec.EstimatedSize
		} else {
			estimatedSize = spec.VolumeSize // Default to volume size
		}

		destinations[i] = TaskDestination{
			TargetServer:  destSpec.ServerID,
			TargetDisk:    destSpec.DiskID,
			StorageChange: storageImpact,
			EstimatedSize: estimatedSize,
		}
	}

	// Create the task
	task := &taskState{
		VolumeID:      spec.VolumeID,
		TaskType:      spec.TaskType,
		Status:        TaskStatusPending,
		StartedAt:     time.Now(),
		EstimatedSize: spec.VolumeSize,
		Sources:       sources,
		Destinations:  destinations,
	}

	at.pendingTasks[spec.TaskID] = task
	at.assignTaskToDisk(task)

	glog.V(2).Infof("Added pending %s task %s: volume %d, %d sources, %d destinations",
		spec.TaskType, spec.TaskID, spec.VolumeID, len(sources), len(destinations))

	return nil
}

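// exampleAddPendingTask is a minimal sketch of queueing a single-source,
// single-destination move; all IDs, addresses, and sizes are illustrative,
// and TaskTypeBalance is assumed to be defined alongside TaskTypeErasureCoding.
func exampleAddPendingTask(at *ActiveTopology) {
	spec := TaskSpec{
		TaskID:     "balance-vol-42",
		TaskType:   TaskTypeBalance, // assumed task type constant
		VolumeID:   42,
		VolumeSize: 1 << 30, // 1 GiB, used when impacts are auto-calculated
		Sources: []TaskSourceSpec{
			{ServerID: "10.0.0.6:8080", DiskID: 1, CleanupType: CleanupVolumeReplica},
		},
		Destinations: []TaskDestinationSpec{
			{ServerID: "10.0.0.5:8080", DiskID: 0},
		},
	}
	if err := at.AddPendingTask(spec); err != nil {
		glog.Warningf("task not queued: %v", err)
	}
}
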
// calculateSourceStorageImpact calculates storage impact for sources based on task type and cleanup type
func (at *ActiveTopology) calculateSourceStorageImpact(taskType TaskType, cleanupType SourceCleanupType, volumeSize int64) StorageSlotChange {
	switch taskType {
	case TaskTypeErasureCoding:
		switch cleanupType {
		case CleanupVolumeReplica:
			impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
			return impact
		case CleanupECShards:
			return CalculateECShardCleanupImpact(volumeSize)
		default:
			impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
			return impact
		}
	default:
		impact, _ := CalculateTaskStorageImpact(taskType, volumeSize)
		return impact
	}
}

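// exampleSourceImpact is a sketch contrasting the two EC cleanup variants:
// freeing a full volume replica versus freeing already-placed shards.
// The 1 GiB volume size is illustrative.
func exampleSourceImpact(at *ActiveTopology) {
	replicaImpact := at.calculateSourceStorageImpact(TaskTypeErasureCoding, CleanupVolumeReplica, 1<<30)
	shardImpact := at.calculateSourceStorageImpact(TaskTypeErasureCoding, CleanupECShards, 1<<30)
	glog.V(2).Infof("replica cleanup: %+v, shard cleanup: %+v", replicaImpact, shardImpact)
}
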
// SourceCleanupType indicates what type of data needs to be cleaned up from a source
type SourceCleanupType int

const (
	CleanupVolumeReplica SourceCleanupType = iota // Clean up a volume replica (frees volume slots)
	CleanupECShards                               // Clean up existing EC shards (frees shard slots)
)

// TaskSourceSpec represents a source specification for task creation
type TaskSourceSpec struct {
	ServerID      string
	DiskID        uint32
	DataCenter    string             // Data center of the source server
	Rack          string             // Rack of the source server
	CleanupType   SourceCleanupType  // For EC: volume replica vs. existing shards
	StorageImpact *StorageSlotChange // Optional: manual override
	EstimatedSize *int64             // Optional: manual override
}

// TaskDestinationSpec represents a destination specification for task creation
type TaskDestinationSpec struct {
	ServerID      string
	DiskID        uint32
	StorageImpact *StorageSlotChange // Optional: manual override
	EstimatedSize *int64             // Optional: manual override
}

// TaskSpec represents a complete task specification
type TaskSpec struct {
	TaskID       string
	TaskType     TaskType
	VolumeID     uint32
	VolumeSize   int64                 // Used for auto-calculation when manual impacts are not provided
	Sources      []TaskSourceSpec      // Can be single or multiple
	Destinations []TaskDestinationSpec // Can be single or multiple
}
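
// exampleManualOverride is a sketch of the manual-override path: when
// StorageImpact or EstimatedSize is non-nil, AddPendingTask uses the given
// value instead of auto-calculating. The values below are illustrative.
func exampleManualOverride() TaskDestinationSpec {
	impact := StorageSlotChange{VolumeSlots: 1}
	size := int64(512 << 20) // 512 MiB
	return TaskDestinationSpec{
		ServerID:      "10.0.0.7:8080",
		DiskID:        2,
		StorageImpact: &impact,
		EstimatedSize: &size,
	}
}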