From 19df3fcc5d65296f63a37b6028a66da70014353a Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 25 Jul 2025 13:22:33 -0700 Subject: [PATCH] worker register itself first --- weed/worker/worker.go | 89 ++++++++++++++++++++++++++++++++----------- 1 file changed, 66 insertions(+), 23 deletions(-) diff --git a/weed/worker/worker.go b/weed/worker/worker.go index a60168bbd..6f78fe6ff 100644 --- a/weed/worker/worker.go +++ b/weed/worker/worker.go @@ -3,6 +3,7 @@ package worker import ( "crypto/rand" "fmt" + "net" "os" "path/filepath" "strings" @@ -73,24 +74,43 @@ func GenerateOrLoadWorkerID(workingDir string) (string, error) { } } - // Generate new unique worker ID + // Generate new unique worker ID with host information hostname, _ := os.Hostname() if hostname == "" { hostname = "unknown" } + // Get local IP address for better host identification + var hostIP string + if addrs, err := net.InterfaceAddrs(); err == nil { + for _, addr := range addrs { + if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if ipnet.IP.To4() != nil { + hostIP = ipnet.IP.String() + break + } + } + } + } + if hostIP == "" { + hostIP = "noip" + } + + // Create host identifier combining hostname and IP + hostID := fmt.Sprintf("%s@%s", hostname, hostIP) + // Generate random component for uniqueness randomBytes := make([]byte, 4) var workerID string if _, err := rand.Read(randomBytes); err != nil { // Fallback to timestamp if crypto/rand fails - workerID = fmt.Sprintf("worker-%s-%d", hostname, time.Now().Unix()) + workerID = fmt.Sprintf("worker-%s-%d", hostID, time.Now().Unix()) glog.Infof("Generated fallback worker ID: %s", workerID) } else { // Use random bytes + timestamp for uniqueness randomHex := fmt.Sprintf("%x", randomBytes) timestamp := time.Now().Unix() - workerID = fmt.Sprintf("worker-%s-%s-%d", hostname, randomHex, timestamp) + workerID = fmt.Sprintf("worker-%s-%s-%d", hostID, randomHex, timestamp) glog.Infof("Generated new worker ID: %s", workerID) } @@ -161,10 +181,8 @@ func (w *Worker) Start() error { if err := w.adminClient.Connect(); err != nil { glog.Warningf("Initial connection to admin server failed, will keep retrying: %v", err) // Don't return error - let the reconnection loop handle it - } else { - // Connection succeeded, register immediately - w.registerWorker() } + // Note: Registration is handled by connectionMonitorLoop to ensure proper protocol ordering // Start worker loops regardless of initial connection status // They will handle connection failures gracefully @@ -468,31 +486,56 @@ func (w *Worker) connectionMonitorLoop() { defer ticker.Stop() lastConnected := false + registrationCompleted := false for { select { case <-w.stopChan: return case <-ticker.C: - // Check if we're now connected when we weren't before if w.adminClient != nil { - // Note: We can't easily check connection status from the interface - // so we'll try to send a heartbeat as a connectivity test - err := w.adminClient.SendHeartbeat(w.id, &types.WorkerStatus{ - WorkerID: w.id, - Status: "active", - Capabilities: w.config.Capabilities, - MaxConcurrent: w.config.MaxConcurrent, - CurrentLoad: len(w.currentTasks), - LastHeartbeat: time.Now(), - }) - - currentlyConnected := (err == nil) - - // If we just became connected, register the worker - if currentlyConnected && !lastConnected { + var currentlyConnected bool + + if !registrationCompleted { + // Before registration, test connectivity by attempting registration glog.Infof("Connection to admin server established, registering worker...") - w.registerWorker() + + workerInfo := &types.Worker{ + ID: w.id, + Capabilities: w.config.Capabilities, + MaxConcurrent: w.config.MaxConcurrent, + Status: "active", + CurrentLoad: 0, + LastHeartbeat: time.Now(), + } + + err := w.adminClient.RegisterWorker(workerInfo) + if err == nil { + currentlyConnected = true + registrationCompleted = true + glog.Infof("Worker %s registered successfully with admin server", w.id) + } else { + glog.V(2).Infof("Registration failed, will retry: %v", err) + currentlyConnected = false + } + } else { + // After registration, use heartbeat to test connectivity + err := w.adminClient.SendHeartbeat(w.id, &types.WorkerStatus{ + WorkerID: w.id, + Status: "active", + Capabilities: w.config.Capabilities, + MaxConcurrent: w.config.MaxConcurrent, + CurrentLoad: len(w.currentTasks), + LastHeartbeat: time.Now(), + }) + + currentlyConnected = (err == nil) + + // If we lost connection, reset registration status + if !currentlyConnected && lastConnected { + glog.Warningf("Lost connection to admin server, will re-register on reconnection") + registrationCompleted = false + } } lastConnected = currentlyConnected