diff --git a/docker/admin_integration/docker-compose-ec-test.yml b/docker/admin_integration/docker-compose-ec-test.yml index 5fc3aa704..05369b88c 100644 --- a/docker/admin_integration/docker-compose-ec-test.yml +++ b/docker/admin_integration/docker-compose-ec-test.yml @@ -128,7 +128,7 @@ services: worker1: image: chrislusf/seaweedfs:local - command: "worker -admin=admin:23646 -capabilities=ec,vacuum -maxConcurrent=2" + command: "worker -admin=admin:23646 -capabilities=erasure_coding,vacuum -maxConcurrent=2" depends_on: - admin volumes: @@ -140,7 +140,7 @@ services: worker2: image: chrislusf/seaweedfs:local - command: "worker -admin=admin:23646 -capabilities=ec,vacuum -maxConcurrent=2" + command: "worker -admin=admin:23646 -capabilities=erasure_coding,vacuum -maxConcurrent=2" depends_on: - admin volumes: @@ -152,7 +152,7 @@ services: worker3: image: chrislusf/seaweedfs:local - command: "worker -admin=admin:23646 -capabilities=ec,vacuum -maxConcurrent=2" + command: "worker -admin=admin:23646 -capabilities=erasure_coding,vacuum -maxConcurrent=2" depends_on: - admin volumes: diff --git a/weed/command/worker.go b/weed/command/worker.go index c9d69b12b..1d1296e0a 100644 --- a/weed/command/worker.go +++ b/weed/command/worker.go @@ -143,7 +143,7 @@ func runWorker(cmd *Command, args []string) bool { // Start the worker err = workerInstance.Start() if err != nil { - glog.Fatalf("Failed to start worker: %v", err) + glog.Errorf("Failed to start worker: %v", err) return false } diff --git a/weed/worker/tasks/erasure_coding/ec_detector.go b/weed/worker/tasks/erasure_coding/ec_detector.go index 567e3ce51..6bcca4d64 100644 --- a/weed/worker/tasks/erasure_coding/ec_detector.go +++ b/weed/worker/tasks/erasure_coding/ec_detector.go @@ -26,11 +26,11 @@ var ( // NewEcDetector creates a new erasure coding detector with configurable defaults func NewEcDetector() *EcDetector { return &EcDetector{ - enabled: false, // Disabled by default for safety - volumeAgeHours: 24, // Require 24 hours age by default - fullnessRatio: 0.8, // 80% full by default - minSizeMB: 100, // Minimum 100MB before considering EC - scanInterval: 2 * time.Hour, + enabled: true, // Enabled for testing + volumeAgeHours: 0, // No age requirement for testing (was 24) + fullnessRatio: 0.8, // 80% full by default + minSizeMB: 50, // Minimum 50MB for testing (was 100MB) + scanInterval: 30 * time.Second, // Faster scanning for testing } } diff --git a/weed/worker/tasks/erasure_coding/ec_register.go b/weed/worker/tasks/erasure_coding/ec_register.go index e66e9295c..70d3fc05f 100644 --- a/weed/worker/tasks/erasure_coding/ec_register.go +++ b/weed/worker/tasks/erasure_coding/ec_register.go @@ -50,3 +50,35 @@ func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) { return task, nil } + +// getSharedInstances returns shared detector and scheduler instances +func getSharedInstances() (*EcDetector, *Scheduler) { + // Create shared instances (singleton pattern) + detector := NewEcDetector() + scheduler := NewScheduler() + return detector, scheduler +} + +// GetSharedInstances returns the shared detector and scheduler instances (public API) +func GetSharedInstances() (*EcDetector, *Scheduler) { + return getSharedInstances() +} + +// Auto-register this task when the package is imported +func init() { + factory := NewFactory() + tasks.AutoRegister(types.TaskTypeErasureCoding, factory) + + // Get shared instances for all registrations + detector, scheduler := getSharedInstances() + + // Register with types registry + tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) { + registry.RegisterTask(detector, scheduler) + }) + + // Register with UI registry using the same instances + tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) { + RegisterUI(uiRegistry, detector, scheduler) + }) +} diff --git a/weed/worker/worker.go b/weed/worker/worker.go index b333c8775..f152ffecb 100644 --- a/weed/worker/worker.go +++ b/weed/worker/worker.go @@ -90,35 +90,28 @@ func (w *Worker) Start() error { return fmt.Errorf("admin client is not set") } - // Connect to admin server - if err := w.adminClient.Connect(); err != nil { - return fmt.Errorf("failed to connect to admin server: %w", err) - } - w.running = true w.startTime = time.Now() - // Register with admin server - workerInfo := &types.Worker{ - ID: w.id, - Capabilities: w.config.Capabilities, - MaxConcurrent: w.config.MaxConcurrent, - Status: "active", - CurrentLoad: 0, - LastHeartbeat: time.Now(), - } + // Start connection attempt (will retry automatically via reconnection loop) + glog.Infof("Worker %s starting, attempting to connect to admin server...", w.id) - if err := w.adminClient.RegisterWorker(workerInfo); err != nil { - w.running = false - w.adminClient.Disconnect() - return fmt.Errorf("failed to register worker: %w", err) + // Try initial connection, but don't fail if it doesn't work immediately + if err := w.adminClient.Connect(); err != nil { + glog.Warningf("Initial connection to admin server failed, will keep retrying: %v", err) + // Don't return error - let the reconnection loop handle it + } else { + // Connection succeeded, register immediately + w.registerWorker() } - // Start worker loops + // Start worker loops regardless of initial connection status + // They will handle connection failures gracefully go w.heartbeatLoop() go w.taskRequestLoop() + go w.connectionMonitorLoop() - glog.Infof("Worker %s started", w.id) + glog.Infof("Worker %s started (connection attempts will continue in background)", w.id) return nil } @@ -390,6 +383,63 @@ func (w *Worker) GetCurrentTasks() map[string]*types.Task { return tasks } +// registerWorker registers the worker with the admin server +func (w *Worker) registerWorker() { + workerInfo := &types.Worker{ + ID: w.id, + Capabilities: w.config.Capabilities, + MaxConcurrent: w.config.MaxConcurrent, + Status: "active", + CurrentLoad: 0, + LastHeartbeat: time.Now(), + } + + if err := w.adminClient.RegisterWorker(workerInfo); err != nil { + glog.Warningf("Failed to register worker (will retry on next heartbeat): %v", err) + } else { + glog.Infof("Worker %s registered successfully with admin server", w.id) + } +} + +// connectionMonitorLoop monitors connection status and registers when connected +func (w *Worker) connectionMonitorLoop() { + ticker := time.NewTicker(10 * time.Second) // Check every 10 seconds + defer ticker.Stop() + + lastConnected := false + + for { + select { + case <-w.stopChan: + return + case <-ticker.C: + // Check if we're now connected when we weren't before + if w.adminClient != nil { + // Note: We can't easily check connection status from the interface + // so we'll try to send a heartbeat as a connectivity test + err := w.adminClient.SendHeartbeat(w.id, &types.WorkerStatus{ + WorkerID: w.id, + Status: "active", + Capabilities: w.config.Capabilities, + MaxConcurrent: w.config.MaxConcurrent, + CurrentLoad: len(w.currentTasks), + LastHeartbeat: time.Now(), + }) + + currentlyConnected := (err == nil) + + // If we just became connected, register the worker + if currentlyConnected && !lastConnected { + glog.Infof("Connection to admin server established, registering worker...") + w.registerWorker() + } + + lastConnected = currentlyConnected + } + } + } +} + // GetConfig returns the worker configuration func (w *Worker) GetConfig() *types.WorkerConfig { return w.config