You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
413 lines
13 KiB
413 lines
13 KiB
package maintenance
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
|
|
|
// Import task packages to trigger their auto-registration
|
|
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
|
|
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
|
|
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
|
|
)
|
|
|
|
// MaintenanceWorkerService manages maintenance task execution
|
|
// TaskExecutor defines the function signature for task execution
|
|
type TaskExecutor func(*MaintenanceWorkerService, *MaintenanceTask) error
|
|
|
|
// TaskExecutorFactory creates a task executor for a given worker service
|
|
type TaskExecutorFactory func() TaskExecutor
|
|
|
|
// Global registry for task executor factories
|
|
var taskExecutorFactories = make(map[MaintenanceTaskType]TaskExecutorFactory)
|
|
var executorRegistryMutex sync.RWMutex
|
|
var executorRegistryInitOnce sync.Once
|
|
|
|
// initializeExecutorFactories dynamically registers executor factories for all auto-registered task types
|
|
func initializeExecutorFactories() {
|
|
executorRegistryInitOnce.Do(func() {
|
|
// Get all registered task types from the global registry
|
|
typesRegistry := tasks.GetGlobalTypesRegistry()
|
|
|
|
var taskTypes []MaintenanceTaskType
|
|
for workerTaskType := range typesRegistry.GetAllDetectors() {
|
|
// Convert types.TaskType to MaintenanceTaskType by string conversion
|
|
maintenanceTaskType := MaintenanceTaskType(string(workerTaskType))
|
|
taskTypes = append(taskTypes, maintenanceTaskType)
|
|
}
|
|
|
|
// Register generic executor for all task types
|
|
for _, taskType := range taskTypes {
|
|
RegisterTaskExecutorFactory(taskType, createGenericTaskExecutor)
|
|
}
|
|
|
|
glog.V(1).Infof("Dynamically registered generic task executor for %d task types: %v", len(taskTypes), taskTypes)
|
|
})
|
|
}
|
|
|
|
// RegisterTaskExecutorFactory registers a factory function for creating task executors
|
|
func RegisterTaskExecutorFactory(taskType MaintenanceTaskType, factory TaskExecutorFactory) {
|
|
executorRegistryMutex.Lock()
|
|
defer executorRegistryMutex.Unlock()
|
|
taskExecutorFactories[taskType] = factory
|
|
glog.V(2).Infof("Registered executor factory for task type: %s", taskType)
|
|
}
|
|
|
|
// GetTaskExecutorFactory returns the factory for a task type
|
|
func GetTaskExecutorFactory(taskType MaintenanceTaskType) (TaskExecutorFactory, bool) {
|
|
// Ensure executor factories are initialized
|
|
initializeExecutorFactories()
|
|
|
|
executorRegistryMutex.RLock()
|
|
defer executorRegistryMutex.RUnlock()
|
|
factory, exists := taskExecutorFactories[taskType]
|
|
return factory, exists
|
|
}
|
|
|
|
// GetSupportedExecutorTaskTypes returns all task types with registered executor factories
|
|
func GetSupportedExecutorTaskTypes() []MaintenanceTaskType {
|
|
// Ensure executor factories are initialized
|
|
initializeExecutorFactories()
|
|
|
|
executorRegistryMutex.RLock()
|
|
defer executorRegistryMutex.RUnlock()
|
|
|
|
taskTypes := make([]MaintenanceTaskType, 0, len(taskExecutorFactories))
|
|
for taskType := range taskExecutorFactories {
|
|
taskTypes = append(taskTypes, taskType)
|
|
}
|
|
return taskTypes
|
|
}
|
|
|
|
// createGenericTaskExecutor creates a generic task executor that uses the task registry
|
|
func createGenericTaskExecutor() TaskExecutor {
|
|
return func(mws *MaintenanceWorkerService, task *MaintenanceTask) error {
|
|
return mws.executeGenericTask(task)
|
|
}
|
|
}
|
|
|
|
// init does minimal initialization - actual registration happens lazily
|
|
func init() {
|
|
// Executor factory registration will happen lazily when first accessed
|
|
glog.V(1).Infof("Maintenance worker initialized - executor factories will be registered on first access")
|
|
}
|
|
|
|
type MaintenanceWorkerService struct {
|
|
workerID string
|
|
address string
|
|
adminServer string
|
|
capabilities []MaintenanceTaskType
|
|
maxConcurrent int
|
|
currentTasks map[string]*MaintenanceTask
|
|
queue *MaintenanceQueue
|
|
adminClient AdminClient
|
|
running bool
|
|
stopChan chan struct{}
|
|
|
|
// Task execution registry
|
|
taskExecutors map[MaintenanceTaskType]TaskExecutor
|
|
|
|
// Task registry for creating task instances
|
|
taskRegistry *tasks.TaskRegistry
|
|
}
|
|
|
|
// NewMaintenanceWorkerService creates a new maintenance worker service
|
|
func NewMaintenanceWorkerService(workerID, address, adminServer string) *MaintenanceWorkerService {
|
|
// Get all registered maintenance task types dynamically
|
|
capabilities := GetRegisteredMaintenanceTaskTypes()
|
|
|
|
worker := &MaintenanceWorkerService{
|
|
workerID: workerID,
|
|
address: address,
|
|
adminServer: adminServer,
|
|
capabilities: capabilities,
|
|
maxConcurrent: 2, // Default concurrent task limit
|
|
currentTasks: make(map[string]*MaintenanceTask),
|
|
stopChan: make(chan struct{}),
|
|
taskExecutors: make(map[MaintenanceTaskType]TaskExecutor),
|
|
taskRegistry: tasks.GetGlobalRegistry(), // Use global registry with auto-registered tasks
|
|
}
|
|
|
|
// Initialize task executor registry
|
|
worker.initializeTaskExecutors()
|
|
|
|
glog.V(1).Infof("Created maintenance worker with %d registered task types", len(worker.taskRegistry.GetSupportedTypes()))
|
|
|
|
return worker
|
|
}
|
|
|
|
// executeGenericTask executes a task using the task registry instead of hardcoded methods
|
|
func (mws *MaintenanceWorkerService) executeGenericTask(task *MaintenanceTask) error {
|
|
glog.V(2).Infof("Executing generic task %s: %s for volume %d", task.ID, task.Type, task.VolumeID)
|
|
|
|
// Convert MaintenanceTask to types.TaskType
|
|
taskType := types.TaskType(string(task.Type))
|
|
|
|
// Create task parameters
|
|
taskParams := types.TaskParams{
|
|
VolumeID: task.VolumeID,
|
|
Server: task.Server,
|
|
Collection: task.Collection,
|
|
Parameters: task.Parameters,
|
|
}
|
|
|
|
// Create task instance using the registry
|
|
taskInstance, err := mws.taskRegistry.CreateTask(taskType, taskParams)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create task instance: %v", err)
|
|
}
|
|
|
|
// Update progress to show task has started
|
|
mws.updateTaskProgress(task.ID, 5)
|
|
|
|
// Execute the task
|
|
err = taskInstance.Execute(taskParams)
|
|
if err != nil {
|
|
return fmt.Errorf("task execution failed: %v", err)
|
|
}
|
|
|
|
// Update progress to show completion
|
|
mws.updateTaskProgress(task.ID, 100)
|
|
|
|
glog.V(2).Infof("Generic task %s completed successfully", task.ID)
|
|
return nil
|
|
}
|
|
|
|
// initializeTaskExecutors sets up the task execution registry dynamically
|
|
func (mws *MaintenanceWorkerService) initializeTaskExecutors() {
|
|
mws.taskExecutors = make(map[MaintenanceTaskType]TaskExecutor)
|
|
|
|
// Get all registered executor factories and create executors
|
|
executorRegistryMutex.RLock()
|
|
defer executorRegistryMutex.RUnlock()
|
|
|
|
for taskType, factory := range taskExecutorFactories {
|
|
executor := factory()
|
|
mws.taskExecutors[taskType] = executor
|
|
glog.V(3).Infof("Initialized executor for task type: %s", taskType)
|
|
}
|
|
|
|
glog.V(2).Infof("Initialized %d task executors", len(mws.taskExecutors))
|
|
}
|
|
|
|
// RegisterTaskExecutor allows dynamic registration of new task executors
|
|
func (mws *MaintenanceWorkerService) RegisterTaskExecutor(taskType MaintenanceTaskType, executor TaskExecutor) {
|
|
if mws.taskExecutors == nil {
|
|
mws.taskExecutors = make(map[MaintenanceTaskType]TaskExecutor)
|
|
}
|
|
mws.taskExecutors[taskType] = executor
|
|
glog.V(1).Infof("Registered executor for task type: %s", taskType)
|
|
}
|
|
|
|
// GetSupportedTaskTypes returns all task types that this worker can execute
|
|
func (mws *MaintenanceWorkerService) GetSupportedTaskTypes() []MaintenanceTaskType {
|
|
return GetSupportedExecutorTaskTypes()
|
|
}
|
|
|
|
// Start begins the worker service
|
|
func (mws *MaintenanceWorkerService) Start() error {
|
|
mws.running = true
|
|
|
|
// Register with admin server
|
|
worker := &MaintenanceWorker{
|
|
ID: mws.workerID,
|
|
Address: mws.address,
|
|
Capabilities: mws.capabilities,
|
|
MaxConcurrent: mws.maxConcurrent,
|
|
}
|
|
|
|
if mws.queue != nil {
|
|
mws.queue.RegisterWorker(worker)
|
|
}
|
|
|
|
// Start worker loop
|
|
go mws.workerLoop()
|
|
|
|
glog.Infof("Maintenance worker %s started at %s", mws.workerID, mws.address)
|
|
return nil
|
|
}
|
|
|
|
// Stop terminates the worker service
|
|
func (mws *MaintenanceWorkerService) Stop() {
|
|
mws.running = false
|
|
close(mws.stopChan)
|
|
|
|
// Wait for current tasks to complete or timeout
|
|
timeout := time.NewTimer(30 * time.Second)
|
|
defer timeout.Stop()
|
|
|
|
for len(mws.currentTasks) > 0 {
|
|
select {
|
|
case <-timeout.C:
|
|
glog.Warningf("Worker %s stopping with %d tasks still running", mws.workerID, len(mws.currentTasks))
|
|
return
|
|
case <-time.After(time.Second):
|
|
// Check again
|
|
}
|
|
}
|
|
|
|
glog.Infof("Maintenance worker %s stopped", mws.workerID)
|
|
}
|
|
|
|
// workerLoop is the main worker event loop
|
|
func (mws *MaintenanceWorkerService) workerLoop() {
|
|
heartbeatTicker := time.NewTicker(30 * time.Second)
|
|
defer heartbeatTicker.Stop()
|
|
|
|
taskRequestTicker := time.NewTicker(5 * time.Second)
|
|
defer taskRequestTicker.Stop()
|
|
|
|
for mws.running {
|
|
select {
|
|
case <-mws.stopChan:
|
|
return
|
|
case <-heartbeatTicker.C:
|
|
mws.sendHeartbeat()
|
|
case <-taskRequestTicker.C:
|
|
mws.requestTasks()
|
|
}
|
|
}
|
|
}
|
|
|
|
// sendHeartbeat sends heartbeat to admin server
|
|
func (mws *MaintenanceWorkerService) sendHeartbeat() {
|
|
if mws.queue != nil {
|
|
mws.queue.UpdateWorkerHeartbeat(mws.workerID)
|
|
}
|
|
}
|
|
|
|
// requestTasks requests new tasks from the admin server
|
|
func (mws *MaintenanceWorkerService) requestTasks() {
|
|
if len(mws.currentTasks) >= mws.maxConcurrent {
|
|
return // Already at capacity
|
|
}
|
|
|
|
if mws.queue != nil {
|
|
task := mws.queue.GetNextTask(mws.workerID, mws.capabilities)
|
|
if task != nil {
|
|
mws.executeTask(task)
|
|
}
|
|
}
|
|
}
|
|
|
|
// executeTask executes a maintenance task
|
|
func (mws *MaintenanceWorkerService) executeTask(task *MaintenanceTask) {
|
|
mws.currentTasks[task.ID] = task
|
|
|
|
go func() {
|
|
defer func() {
|
|
delete(mws.currentTasks, task.ID)
|
|
}()
|
|
|
|
glog.Infof("Worker %s executing task %s: %s", mws.workerID, task.ID, task.Type)
|
|
|
|
// Execute task using dynamic executor registry
|
|
var err error
|
|
if executor, exists := mws.taskExecutors[task.Type]; exists {
|
|
err = executor(mws, task)
|
|
} else {
|
|
err = fmt.Errorf("unsupported task type: %s", task.Type)
|
|
glog.Errorf("No executor registered for task type: %s", task.Type)
|
|
}
|
|
|
|
// Report task completion
|
|
if mws.queue != nil {
|
|
errorMsg := ""
|
|
if err != nil {
|
|
errorMsg = err.Error()
|
|
}
|
|
mws.queue.CompleteTask(task.ID, errorMsg)
|
|
}
|
|
|
|
if err != nil {
|
|
glog.Errorf("Worker %s failed to execute task %s: %v", mws.workerID, task.ID, err)
|
|
} else {
|
|
glog.Infof("Worker %s completed task %s successfully", mws.workerID, task.ID)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// updateTaskProgress updates the progress of a task
|
|
func (mws *MaintenanceWorkerService) updateTaskProgress(taskID string, progress float64) {
|
|
if mws.queue != nil {
|
|
mws.queue.UpdateTaskProgress(taskID, progress)
|
|
}
|
|
}
|
|
|
|
// GetStatus returns the current status of the worker
|
|
func (mws *MaintenanceWorkerService) GetStatus() map[string]interface{} {
|
|
return map[string]interface{}{
|
|
"worker_id": mws.workerID,
|
|
"address": mws.address,
|
|
"running": mws.running,
|
|
"capabilities": mws.capabilities,
|
|
"max_concurrent": mws.maxConcurrent,
|
|
"current_tasks": len(mws.currentTasks),
|
|
"task_details": mws.currentTasks,
|
|
}
|
|
}
|
|
|
|
// SetQueue sets the maintenance queue for the worker
|
|
func (mws *MaintenanceWorkerService) SetQueue(queue *MaintenanceQueue) {
|
|
mws.queue = queue
|
|
}
|
|
|
|
// SetAdminClient sets the admin client for the worker
|
|
func (mws *MaintenanceWorkerService) SetAdminClient(client AdminClient) {
|
|
mws.adminClient = client
|
|
}
|
|
|
|
// SetCapabilities sets the worker capabilities
|
|
func (mws *MaintenanceWorkerService) SetCapabilities(capabilities []MaintenanceTaskType) {
|
|
mws.capabilities = capabilities
|
|
}
|
|
|
|
// SetMaxConcurrent sets the maximum concurrent tasks
|
|
func (mws *MaintenanceWorkerService) SetMaxConcurrent(max int) {
|
|
mws.maxConcurrent = max
|
|
}
|
|
|
|
// SetHeartbeatInterval sets the heartbeat interval (placeholder for future use)
|
|
func (mws *MaintenanceWorkerService) SetHeartbeatInterval(interval time.Duration) {
|
|
// Future implementation for configurable heartbeat
|
|
}
|
|
|
|
// SetTaskRequestInterval sets the task request interval (placeholder for future use)
|
|
func (mws *MaintenanceWorkerService) SetTaskRequestInterval(interval time.Duration) {
|
|
// Future implementation for configurable task requests
|
|
}
|
|
|
|
// MaintenanceWorkerCommand represents a standalone maintenance worker command
|
|
type MaintenanceWorkerCommand struct {
|
|
workerService *MaintenanceWorkerService
|
|
}
|
|
|
|
// NewMaintenanceWorkerCommand creates a new worker command
|
|
func NewMaintenanceWorkerCommand(workerID, address, adminServer string) *MaintenanceWorkerCommand {
|
|
return &MaintenanceWorkerCommand{
|
|
workerService: NewMaintenanceWorkerService(workerID, address, adminServer),
|
|
}
|
|
}
|
|
|
|
// Run starts the maintenance worker as a standalone service
|
|
func (mwc *MaintenanceWorkerCommand) Run() error {
|
|
// Generate worker ID if not provided
|
|
if mwc.workerService.workerID == "" {
|
|
hostname, _ := os.Hostname()
|
|
mwc.workerService.workerID = fmt.Sprintf("worker-%s-%d", hostname, time.Now().Unix())
|
|
}
|
|
|
|
// Start the worker service
|
|
err := mwc.workerService.Start()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to start maintenance worker: %v", err)
|
|
}
|
|
|
|
// Wait for interrupt signal
|
|
select {}
|
|
}
|