You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							348 lines
						
					
					
						
							7.8 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							348 lines
						
					
					
						
							7.8 KiB
						
					
					
				| package worker | |
| 
 | |
| import ( | |
| 	"fmt" | |
| 	"sync" | |
| 	"time" | |
| 
 | |
| 	"github.com/seaweedfs/seaweedfs/weed/worker/types" | |
| ) | |
| 
 | |
| // Registry manages workers and their statistics | |
| type Registry struct { | |
| 	workers map[string]*types.WorkerData | |
| 	stats   *types.RegistryStats | |
| 	mutex   sync.RWMutex | |
| } | |
| 
 | |
| // NewRegistry creates a new worker registry | |
| func NewRegistry() *Registry { | |
| 	return &Registry{ | |
| 		workers: make(map[string]*types.WorkerData), | |
| 		stats: &types.RegistryStats{ | |
| 			TotalWorkers:   0, | |
| 			ActiveWorkers:  0, | |
| 			BusyWorkers:    0, | |
| 			IdleWorkers:    0, | |
| 			TotalTasks:     0, | |
| 			CompletedTasks: 0, | |
| 			FailedTasks:    0, | |
| 			StartTime:      time.Now(), | |
| 		}, | |
| 	} | |
| } | |
| 
 | |
| // RegisterWorker registers a new worker | |
| func (r *Registry) RegisterWorker(worker *types.WorkerData) error { | |
| 	r.mutex.Lock() | |
| 	defer r.mutex.Unlock() | |
| 
 | |
| 	if _, exists := r.workers[worker.ID]; exists { | |
| 		return fmt.Errorf("worker %s already registered", worker.ID) | |
| 	} | |
| 
 | |
| 	r.workers[worker.ID] = worker | |
| 	r.updateStats() | |
| 	return nil | |
| } | |
| 
 | |
| // UnregisterWorker removes a worker from the registry | |
| func (r *Registry) UnregisterWorker(workerID string) error { | |
| 	r.mutex.Lock() | |
| 	defer r.mutex.Unlock() | |
| 
 | |
| 	if _, exists := r.workers[workerID]; !exists { | |
| 		return fmt.Errorf("worker %s not found", workerID) | |
| 	} | |
| 
 | |
| 	delete(r.workers, workerID) | |
| 	r.updateStats() | |
| 	return nil | |
| } | |
| 
 | |
| // GetWorker returns a worker by ID | |
| func (r *Registry) GetWorker(workerID string) (*types.WorkerData, bool) { | |
| 	r.mutex.RLock() | |
| 	defer r.mutex.RUnlock() | |
| 
 | |
| 	worker, exists := r.workers[workerID] | |
| 	return worker, exists | |
| } | |
| 
 | |
| // ListWorkers returns all registered workers | |
| func (r *Registry) ListWorkers() []*types.WorkerData { | |
| 	r.mutex.RLock() | |
| 	defer r.mutex.RUnlock() | |
| 
 | |
| 	workers := make([]*types.WorkerData, 0, len(r.workers)) | |
| 	for _, worker := range r.workers { | |
| 		workers = append(workers, worker) | |
| 	} | |
| 	return workers | |
| } | |
| 
 | |
| // GetWorkersByCapability returns workers that support a specific capability | |
| func (r *Registry) GetWorkersByCapability(capability types.TaskType) []*types.WorkerData { | |
| 	r.mutex.RLock() | |
| 	defer r.mutex.RUnlock() | |
| 
 | |
| 	var workers []*types.WorkerData | |
| 	for _, worker := range r.workers { | |
| 		for _, cap := range worker.Capabilities { | |
| 			if cap == capability { | |
| 				workers = append(workers, worker) | |
| 				break | |
| 			} | |
| 		} | |
| 	} | |
| 	return workers | |
| } | |
| 
 | |
| // GetAvailableWorkers returns workers that are available for new tasks | |
| func (r *Registry) GetAvailableWorkers() []*types.WorkerData { | |
| 	r.mutex.RLock() | |
| 	defer r.mutex.RUnlock() | |
| 
 | |
| 	var workers []*types.WorkerData | |
| 	for _, worker := range r.workers { | |
| 		if worker.Status == "active" && worker.CurrentLoad < worker.MaxConcurrent { | |
| 			workers = append(workers, worker) | |
| 		} | |
| 	} | |
| 	return workers | |
| } | |
| 
 | |
| // GetBestWorkerForTask returns the best worker for a specific task | |
| func (r *Registry) GetBestWorkerForTask(taskType types.TaskType) *types.WorkerData { | |
| 	r.mutex.RLock() | |
| 	defer r.mutex.RUnlock() | |
| 
 | |
| 	var bestWorker *types.WorkerData | |
| 	var bestScore float64 | |
| 
 | |
| 	for _, worker := range r.workers { | |
| 		// Check if worker supports this task type | |
| 		supportsTask := false | |
| 		for _, cap := range worker.Capabilities { | |
| 			if cap == taskType { | |
| 				supportsTask = true | |
| 				break | |
| 			} | |
| 		} | |
| 
 | |
| 		if !supportsTask { | |
| 			continue | |
| 		} | |
| 
 | |
| 		// Check if worker is available | |
| 		if worker.Status != "active" || worker.CurrentLoad >= worker.MaxConcurrent { | |
| 			continue | |
| 		} | |
| 
 | |
| 		// Calculate score based on current load and capacity | |
| 		score := float64(worker.MaxConcurrent-worker.CurrentLoad) / float64(worker.MaxConcurrent) | |
| 		if bestWorker == nil || score > bestScore { | |
| 			bestWorker = worker | |
| 			bestScore = score | |
| 		} | |
| 	} | |
| 
 | |
| 	return bestWorker | |
| } | |
| 
 | |
| // UpdateWorkerHeartbeat updates the last heartbeat time for a worker | |
| func (r *Registry) UpdateWorkerHeartbeat(workerID string) error { | |
| 	r.mutex.Lock() | |
| 	defer r.mutex.Unlock() | |
| 
 | |
| 	worker, exists := r.workers[workerID] | |
| 	if !exists { | |
| 		return fmt.Errorf("worker %s not found", workerID) | |
| 	} | |
| 
 | |
| 	worker.LastHeartbeat = time.Now() | |
| 	return nil | |
| } | |
| 
 | |
| // UpdateWorkerLoad updates the current load for a worker | |
| func (r *Registry) UpdateWorkerLoad(workerID string, load int) error { | |
| 	r.mutex.Lock() | |
| 	defer r.mutex.Unlock() | |
| 
 | |
| 	worker, exists := r.workers[workerID] | |
| 	if !exists { | |
| 		return fmt.Errorf("worker %s not found", workerID) | |
| 	} | |
| 
 | |
| 	worker.CurrentLoad = load | |
| 	if load >= worker.MaxConcurrent { | |
| 		worker.Status = "busy" | |
| 	} else { | |
| 		worker.Status = "active" | |
| 	} | |
| 
 | |
| 	r.updateStats() | |
| 	return nil | |
| } | |
| 
 | |
| // UpdateWorkerStatus updates the status of a worker | |
| func (r *Registry) UpdateWorkerStatus(workerID string, status string) error { | |
| 	r.mutex.Lock() | |
| 	defer r.mutex.Unlock() | |
| 
 | |
| 	worker, exists := r.workers[workerID] | |
| 	if !exists { | |
| 		return fmt.Errorf("worker %s not found", workerID) | |
| 	} | |
| 
 | |
| 	worker.Status = status | |
| 	r.updateStats() | |
| 	return nil | |
| } | |
| 
 | |
| // CleanupStaleWorkers removes workers that haven't sent heartbeats recently | |
| func (r *Registry) CleanupStaleWorkers(timeout time.Duration) int { | |
| 	r.mutex.Lock() | |
| 	defer r.mutex.Unlock() | |
| 
 | |
| 	var removedCount int | |
| 	cutoff := time.Now().Add(-timeout) | |
| 
 | |
| 	for workerID, worker := range r.workers { | |
| 		if worker.LastHeartbeat.Before(cutoff) { | |
| 			delete(r.workers, workerID) | |
| 			removedCount++ | |
| 		} | |
| 	} | |
| 
 | |
| 	if removedCount > 0 { | |
| 		r.updateStats() | |
| 	} | |
| 
 | |
| 	return removedCount | |
| } | |
| 
 | |
| // GetStats returns current registry statistics | |
| func (r *Registry) GetStats() *types.RegistryStats { | |
| 	r.mutex.RLock() | |
| 	defer r.mutex.RUnlock() | |
| 
 | |
| 	// Create a copy of the stats to avoid race conditions | |
| 	stats := *r.stats | |
| 	return &stats | |
| } | |
| 
 | |
| // updateStats updates the registry statistics (must be called with lock held) | |
| func (r *Registry) updateStats() { | |
| 	r.stats.TotalWorkers = len(r.workers) | |
| 	r.stats.ActiveWorkers = 0 | |
| 	r.stats.BusyWorkers = 0 | |
| 	r.stats.IdleWorkers = 0 | |
| 
 | |
| 	for _, worker := range r.workers { | |
| 		switch worker.Status { | |
| 		case "active": | |
| 			if worker.CurrentLoad > 0 { | |
| 				r.stats.ActiveWorkers++ | |
| 			} else { | |
| 				r.stats.IdleWorkers++ | |
| 			} | |
| 		case "busy": | |
| 			r.stats.BusyWorkers++ | |
| 		} | |
| 	} | |
| 
 | |
| 	r.stats.Uptime = time.Since(r.stats.StartTime) | |
| 	r.stats.LastUpdated = time.Now() | |
| } | |
| 
 | |
| // GetTaskCapabilities returns all task capabilities available in the registry | |
| func (r *Registry) GetTaskCapabilities() []types.TaskType { | |
| 	r.mutex.RLock() | |
| 	defer r.mutex.RUnlock() | |
| 
 | |
| 	capabilitySet := make(map[types.TaskType]bool) | |
| 	for _, worker := range r.workers { | |
| 		for _, cap := range worker.Capabilities { | |
| 			capabilitySet[cap] = true | |
| 		} | |
| 	} | |
| 
 | |
| 	var capabilities []types.TaskType | |
| 	for cap := range capabilitySet { | |
| 		capabilities = append(capabilities, cap) | |
| 	} | |
| 
 | |
| 	return capabilities | |
| } | |
| 
 | |
| // GetWorkersByStatus returns workers filtered by status | |
| func (r *Registry) GetWorkersByStatus(status string) []*types.WorkerData { | |
| 	r.mutex.RLock() | |
| 	defer r.mutex.RUnlock() | |
| 
 | |
| 	var workers []*types.WorkerData | |
| 	for _, worker := range r.workers { | |
| 		if worker.Status == status { | |
| 			workers = append(workers, worker) | |
| 		} | |
| 	} | |
| 	return workers | |
| } | |
| 
 | |
| // GetWorkerCount returns the total number of registered workers | |
| func (r *Registry) GetWorkerCount() int { | |
| 	r.mutex.RLock() | |
| 	defer r.mutex.RUnlock() | |
| 	return len(r.workers) | |
| } | |
| 
 | |
| // GetWorkerIDs returns all worker IDs | |
| func (r *Registry) GetWorkerIDs() []string { | |
| 	r.mutex.RLock() | |
| 	defer r.mutex.RUnlock() | |
| 
 | |
| 	ids := make([]string, 0, len(r.workers)) | |
| 	for id := range r.workers { | |
| 		ids = append(ids, id) | |
| 	} | |
| 	return ids | |
| } | |
| 
 | |
| // GetWorkerSummary returns a summary of all workers | |
| func (r *Registry) GetWorkerSummary() *types.WorkerSummary { | |
| 	r.mutex.RLock() | |
| 	defer r.mutex.RUnlock() | |
| 
 | |
| 	summary := &types.WorkerSummary{ | |
| 		TotalWorkers: len(r.workers), | |
| 		ByStatus:     make(map[string]int), | |
| 		ByCapability: make(map[types.TaskType]int), | |
| 		TotalLoad:    0, | |
| 		MaxCapacity:  0, | |
| 	} | |
| 
 | |
| 	for _, worker := range r.workers { | |
| 		summary.ByStatus[worker.Status]++ | |
| 		summary.TotalLoad += worker.CurrentLoad | |
| 		summary.MaxCapacity += worker.MaxConcurrent | |
| 
 | |
| 		for _, cap := range worker.Capabilities { | |
| 			summary.ByCapability[cap]++ | |
| 		} | |
| 	} | |
| 
 | |
| 	return summary | |
| } | |
| 
 | |
| // Default global registry instance | |
| var defaultRegistry *Registry | |
| var registryOnce sync.Once | |
| 
 | |
| // GetDefaultRegistry returns the default global registry | |
| func GetDefaultRegistry() *Registry { | |
| 	registryOnce.Do(func() { | |
| 		defaultRegistry = NewRegistry() | |
| 	}) | |
| 	return defaultRegistry | |
| }
 |