You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
348 lines
7.8 KiB
348 lines
7.8 KiB
package worker
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
|
)
|
|
|
|
// Registry manages workers and their statistics
|
|
type Registry struct {
|
|
workers map[string]*types.Worker
|
|
stats *types.RegistryStats
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
// NewRegistry creates a new worker registry
|
|
func NewRegistry() *Registry {
|
|
return &Registry{
|
|
workers: make(map[string]*types.Worker),
|
|
stats: &types.RegistryStats{
|
|
TotalWorkers: 0,
|
|
ActiveWorkers: 0,
|
|
BusyWorkers: 0,
|
|
IdleWorkers: 0,
|
|
TotalTasks: 0,
|
|
CompletedTasks: 0,
|
|
FailedTasks: 0,
|
|
StartTime: time.Now(),
|
|
},
|
|
}
|
|
}
|
|
|
|
// RegisterWorker registers a new worker
|
|
func (r *Registry) RegisterWorker(worker *types.Worker) error {
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
|
|
if _, exists := r.workers[worker.ID]; exists {
|
|
return fmt.Errorf("worker %s already registered", worker.ID)
|
|
}
|
|
|
|
r.workers[worker.ID] = worker
|
|
r.updateStats()
|
|
return nil
|
|
}
|
|
|
|
// UnregisterWorker removes a worker from the registry
|
|
func (r *Registry) UnregisterWorker(workerID string) error {
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
|
|
if _, exists := r.workers[workerID]; !exists {
|
|
return fmt.Errorf("worker %s not found", workerID)
|
|
}
|
|
|
|
delete(r.workers, workerID)
|
|
r.updateStats()
|
|
return nil
|
|
}
|
|
|
|
// GetWorker returns a worker by ID
|
|
func (r *Registry) GetWorker(workerID string) (*types.Worker, bool) {
|
|
r.mutex.RLock()
|
|
defer r.mutex.RUnlock()
|
|
|
|
worker, exists := r.workers[workerID]
|
|
return worker, exists
|
|
}
|
|
|
|
// ListWorkers returns all registered workers
|
|
func (r *Registry) ListWorkers() []*types.Worker {
|
|
r.mutex.RLock()
|
|
defer r.mutex.RUnlock()
|
|
|
|
workers := make([]*types.Worker, 0, len(r.workers))
|
|
for _, worker := range r.workers {
|
|
workers = append(workers, worker)
|
|
}
|
|
return workers
|
|
}
|
|
|
|
// GetWorkersByCapability returns workers that support a specific capability
|
|
func (r *Registry) GetWorkersByCapability(capability types.TaskType) []*types.Worker {
|
|
r.mutex.RLock()
|
|
defer r.mutex.RUnlock()
|
|
|
|
var workers []*types.Worker
|
|
for _, worker := range r.workers {
|
|
for _, cap := range worker.Capabilities {
|
|
if cap == capability {
|
|
workers = append(workers, worker)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
return workers
|
|
}
|
|
|
|
// GetAvailableWorkers returns workers that are available for new tasks
|
|
func (r *Registry) GetAvailableWorkers() []*types.Worker {
|
|
r.mutex.RLock()
|
|
defer r.mutex.RUnlock()
|
|
|
|
var workers []*types.Worker
|
|
for _, worker := range r.workers {
|
|
if worker.Status == "active" && worker.CurrentLoad < worker.MaxConcurrent {
|
|
workers = append(workers, worker)
|
|
}
|
|
}
|
|
return workers
|
|
}
|
|
|
|
// GetBestWorkerForTask returns the best worker for a specific task
|
|
func (r *Registry) GetBestWorkerForTask(taskType types.TaskType) *types.Worker {
|
|
r.mutex.RLock()
|
|
defer r.mutex.RUnlock()
|
|
|
|
var bestWorker *types.Worker
|
|
var bestScore float64
|
|
|
|
for _, worker := range r.workers {
|
|
// Check if worker supports this task type
|
|
supportsTask := false
|
|
for _, cap := range worker.Capabilities {
|
|
if cap == taskType {
|
|
supportsTask = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if !supportsTask {
|
|
continue
|
|
}
|
|
|
|
// Check if worker is available
|
|
if worker.Status != "active" || worker.CurrentLoad >= worker.MaxConcurrent {
|
|
continue
|
|
}
|
|
|
|
// Calculate score based on current load and capacity
|
|
score := float64(worker.MaxConcurrent-worker.CurrentLoad) / float64(worker.MaxConcurrent)
|
|
if bestWorker == nil || score > bestScore {
|
|
bestWorker = worker
|
|
bestScore = score
|
|
}
|
|
}
|
|
|
|
return bestWorker
|
|
}
|
|
|
|
// UpdateWorkerHeartbeat updates the last heartbeat time for a worker
|
|
func (r *Registry) UpdateWorkerHeartbeat(workerID string) error {
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
|
|
worker, exists := r.workers[workerID]
|
|
if !exists {
|
|
return fmt.Errorf("worker %s not found", workerID)
|
|
}
|
|
|
|
worker.LastHeartbeat = time.Now()
|
|
return nil
|
|
}
|
|
|
|
// UpdateWorkerLoad updates the current load for a worker
|
|
func (r *Registry) UpdateWorkerLoad(workerID string, load int) error {
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
|
|
worker, exists := r.workers[workerID]
|
|
if !exists {
|
|
return fmt.Errorf("worker %s not found", workerID)
|
|
}
|
|
|
|
worker.CurrentLoad = load
|
|
if load >= worker.MaxConcurrent {
|
|
worker.Status = "busy"
|
|
} else {
|
|
worker.Status = "active"
|
|
}
|
|
|
|
r.updateStats()
|
|
return nil
|
|
}
|
|
|
|
// UpdateWorkerStatus updates the status of a worker
|
|
func (r *Registry) UpdateWorkerStatus(workerID string, status string) error {
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
|
|
worker, exists := r.workers[workerID]
|
|
if !exists {
|
|
return fmt.Errorf("worker %s not found", workerID)
|
|
}
|
|
|
|
worker.Status = status
|
|
r.updateStats()
|
|
return nil
|
|
}
|
|
|
|
// CleanupStaleWorkers removes workers that haven't sent heartbeats recently
|
|
func (r *Registry) CleanupStaleWorkers(timeout time.Duration) int {
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
|
|
var removedCount int
|
|
cutoff := time.Now().Add(-timeout)
|
|
|
|
for workerID, worker := range r.workers {
|
|
if worker.LastHeartbeat.Before(cutoff) {
|
|
delete(r.workers, workerID)
|
|
removedCount++
|
|
}
|
|
}
|
|
|
|
if removedCount > 0 {
|
|
r.updateStats()
|
|
}
|
|
|
|
return removedCount
|
|
}
|
|
|
|
// GetStats returns current registry statistics
|
|
func (r *Registry) GetStats() *types.RegistryStats {
|
|
r.mutex.RLock()
|
|
defer r.mutex.RUnlock()
|
|
|
|
// Create a copy of the stats to avoid race conditions
|
|
stats := *r.stats
|
|
return &stats
|
|
}
|
|
|
|
// updateStats updates the registry statistics (must be called with lock held)
|
|
func (r *Registry) updateStats() {
|
|
r.stats.TotalWorkers = len(r.workers)
|
|
r.stats.ActiveWorkers = 0
|
|
r.stats.BusyWorkers = 0
|
|
r.stats.IdleWorkers = 0
|
|
|
|
for _, worker := range r.workers {
|
|
switch worker.Status {
|
|
case "active":
|
|
if worker.CurrentLoad > 0 {
|
|
r.stats.ActiveWorkers++
|
|
} else {
|
|
r.stats.IdleWorkers++
|
|
}
|
|
case "busy":
|
|
r.stats.BusyWorkers++
|
|
}
|
|
}
|
|
|
|
r.stats.Uptime = time.Since(r.stats.StartTime)
|
|
r.stats.LastUpdated = time.Now()
|
|
}
|
|
|
|
// GetTaskCapabilities returns all task capabilities available in the registry
|
|
func (r *Registry) GetTaskCapabilities() []types.TaskType {
|
|
r.mutex.RLock()
|
|
defer r.mutex.RUnlock()
|
|
|
|
capabilitySet := make(map[types.TaskType]bool)
|
|
for _, worker := range r.workers {
|
|
for _, cap := range worker.Capabilities {
|
|
capabilitySet[cap] = true
|
|
}
|
|
}
|
|
|
|
var capabilities []types.TaskType
|
|
for cap := range capabilitySet {
|
|
capabilities = append(capabilities, cap)
|
|
}
|
|
|
|
return capabilities
|
|
}
|
|
|
|
// GetWorkersByStatus returns workers filtered by status
|
|
func (r *Registry) GetWorkersByStatus(status string) []*types.Worker {
|
|
r.mutex.RLock()
|
|
defer r.mutex.RUnlock()
|
|
|
|
var workers []*types.Worker
|
|
for _, worker := range r.workers {
|
|
if worker.Status == status {
|
|
workers = append(workers, worker)
|
|
}
|
|
}
|
|
return workers
|
|
}
|
|
|
|
// GetWorkerCount returns the total number of registered workers
|
|
func (r *Registry) GetWorkerCount() int {
|
|
r.mutex.RLock()
|
|
defer r.mutex.RUnlock()
|
|
return len(r.workers)
|
|
}
|
|
|
|
// GetWorkerIDs returns all worker IDs
|
|
func (r *Registry) GetWorkerIDs() []string {
|
|
r.mutex.RLock()
|
|
defer r.mutex.RUnlock()
|
|
|
|
ids := make([]string, 0, len(r.workers))
|
|
for id := range r.workers {
|
|
ids = append(ids, id)
|
|
}
|
|
return ids
|
|
}
|
|
|
|
// GetWorkerSummary returns a summary of all workers
|
|
func (r *Registry) GetWorkerSummary() *types.WorkerSummary {
|
|
r.mutex.RLock()
|
|
defer r.mutex.RUnlock()
|
|
|
|
summary := &types.WorkerSummary{
|
|
TotalWorkers: len(r.workers),
|
|
ByStatus: make(map[string]int),
|
|
ByCapability: make(map[types.TaskType]int),
|
|
TotalLoad: 0,
|
|
MaxCapacity: 0,
|
|
}
|
|
|
|
for _, worker := range r.workers {
|
|
summary.ByStatus[worker.Status]++
|
|
summary.TotalLoad += worker.CurrentLoad
|
|
summary.MaxCapacity += worker.MaxConcurrent
|
|
|
|
for _, cap := range worker.Capabilities {
|
|
summary.ByCapability[cap]++
|
|
}
|
|
}
|
|
|
|
return summary
|
|
}
|
|
|
|
// Default global registry instance
|
|
var defaultRegistry *Registry
|
|
var registryOnce sync.Once
|
|
|
|
// GetDefaultRegistry returns the default global registry
|
|
func GetDefaultRegistry() *Registry {
|
|
registryOnce.Do(func() {
|
|
defaultRegistry = NewRegistry()
|
|
})
|
|
return defaultRegistry
|
|
}
|