Browse Source

worker register itself first

worker-execute-ec-tasks
chrislu 5 months ago
parent
commit
19df3fcc5d
  1. 89
      weed/worker/worker.go

89
weed/worker/worker.go

@ -3,6 +3,7 @@ package worker
import (
"crypto/rand"
"fmt"
"net"
"os"
"path/filepath"
"strings"
@ -73,24 +74,43 @@ func GenerateOrLoadWorkerID(workingDir string) (string, error) {
}
}
// Generate new unique worker ID
// Generate new unique worker ID with host information
hostname, _ := os.Hostname()
if hostname == "" {
hostname = "unknown"
}
// Get local IP address for better host identification
var hostIP string
if addrs, err := net.InterfaceAddrs(); err == nil {
for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
hostIP = ipnet.IP.String()
break
}
}
}
}
if hostIP == "" {
hostIP = "noip"
}
// Create host identifier combining hostname and IP
hostID := fmt.Sprintf("%s@%s", hostname, hostIP)
// Generate random component for uniqueness
randomBytes := make([]byte, 4)
var workerID string
if _, err := rand.Read(randomBytes); err != nil {
// Fallback to timestamp if crypto/rand fails
workerID = fmt.Sprintf("worker-%s-%d", hostname, time.Now().Unix())
workerID = fmt.Sprintf("worker-%s-%d", hostID, time.Now().Unix())
glog.Infof("Generated fallback worker ID: %s", workerID)
} else {
// Use random bytes + timestamp for uniqueness
randomHex := fmt.Sprintf("%x", randomBytes)
timestamp := time.Now().Unix()
workerID = fmt.Sprintf("worker-%s-%s-%d", hostname, randomHex, timestamp)
workerID = fmt.Sprintf("worker-%s-%s-%d", hostID, randomHex, timestamp)
glog.Infof("Generated new worker ID: %s", workerID)
}
@ -161,10 +181,8 @@ func (w *Worker) Start() error {
if err := w.adminClient.Connect(); err != nil {
glog.Warningf("Initial connection to admin server failed, will keep retrying: %v", err)
// Don't return error - let the reconnection loop handle it
} else {
// Connection succeeded, register immediately
w.registerWorker()
}
// Note: Registration is handled by connectionMonitorLoop to ensure proper protocol ordering
// Start worker loops regardless of initial connection status
// They will handle connection failures gracefully
@ -468,31 +486,56 @@ func (w *Worker) connectionMonitorLoop() {
defer ticker.Stop()
lastConnected := false
registrationCompleted := false
for {
select {
case <-w.stopChan:
return
case <-ticker.C:
// Check if we're now connected when we weren't before
if w.adminClient != nil {
// Note: We can't easily check connection status from the interface
// so we'll try to send a heartbeat as a connectivity test
err := w.adminClient.SendHeartbeat(w.id, &types.WorkerStatus{
WorkerID: w.id,
Status: "active",
Capabilities: w.config.Capabilities,
MaxConcurrent: w.config.MaxConcurrent,
CurrentLoad: len(w.currentTasks),
LastHeartbeat: time.Now(),
})
currentlyConnected := (err == nil)
// If we just became connected, register the worker
if currentlyConnected && !lastConnected {
var currentlyConnected bool
if !registrationCompleted {
// Before registration, test connectivity by attempting registration
glog.Infof("Connection to admin server established, registering worker...")
w.registerWorker()
workerInfo := &types.Worker{
ID: w.id,
Capabilities: w.config.Capabilities,
MaxConcurrent: w.config.MaxConcurrent,
Status: "active",
CurrentLoad: 0,
LastHeartbeat: time.Now(),
}
err := w.adminClient.RegisterWorker(workerInfo)
if err == nil {
currentlyConnected = true
registrationCompleted = true
glog.Infof("Worker %s registered successfully with admin server", w.id)
} else {
glog.V(2).Infof("Registration failed, will retry: %v", err)
currentlyConnected = false
}
} else {
// After registration, use heartbeat to test connectivity
err := w.adminClient.SendHeartbeat(w.id, &types.WorkerStatus{
WorkerID: w.id,
Status: "active",
Capabilities: w.config.Capabilities,
MaxConcurrent: w.config.MaxConcurrent,
CurrentLoad: len(w.currentTasks),
LastHeartbeat: time.Now(),
})
currentlyConnected = (err == nil)
// If we lost connection, reset registration status
if !currentlyConnected && lastConnected {
glog.Warningf("Lost connection to admin server, will re-register on reconnection")
registrationCompleted = false
}
}
lastConnected = currentlyConnected

Loading…
Cancel
Save