You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							944 lines
						
					
					
						
							26 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							944 lines
						
					
					
						
							26 KiB
						
					
					
				
								package worker
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"context"
							 | 
						|
									"fmt"
							 | 
						|
									"io"
							 | 
						|
									"sync"
							 | 
						|
									"time"
							 | 
						|
								
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/glog"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/worker/types"
							 | 
						|
									"google.golang.org/grpc"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								// GrpcAdminClient implements AdminClient using gRPC bidirectional streaming
							 | 
						|
								type GrpcAdminClient struct {
							 | 
						|
									adminAddress string
							 | 
						|
									workerID     string
							 | 
						|
									dialOption   grpc.DialOption
							 | 
						|
								
							 | 
						|
									conn         *grpc.ClientConn
							 | 
						|
									client       worker_pb.WorkerServiceClient
							 | 
						|
									stream       worker_pb.WorkerService_WorkerStreamClient
							 | 
						|
									streamCtx    context.Context
							 | 
						|
									streamCancel context.CancelFunc
							 | 
						|
								
							 | 
						|
									connected       bool
							 | 
						|
									reconnecting    bool
							 | 
						|
									shouldReconnect bool
							 | 
						|
									mutex           sync.RWMutex
							 | 
						|
								
							 | 
						|
									// Reconnection parameters
							 | 
						|
									maxReconnectAttempts int
							 | 
						|
									reconnectBackoff     time.Duration
							 | 
						|
									maxReconnectBackoff  time.Duration
							 | 
						|
									reconnectMultiplier  float64
							 | 
						|
								
							 | 
						|
									// Worker registration info for re-registration after reconnection
							 | 
						|
									lastWorkerInfo *types.WorkerData
							 | 
						|
								
							 | 
						|
									// Channels for communication
							 | 
						|
									outgoing       chan *worker_pb.WorkerMessage
							 | 
						|
									incoming       chan *worker_pb.AdminMessage
							 | 
						|
									responseChans  map[string]chan *worker_pb.AdminMessage
							 | 
						|
									responsesMutex sync.RWMutex
							 | 
						|
								
							 | 
						|
									// Shutdown channel
							 | 
						|
									shutdownChan chan struct{}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// NewGrpcAdminClient creates a new gRPC admin client
							 | 
						|
								func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.DialOption) *GrpcAdminClient {
							 | 
						|
									// Admin uses HTTP port + 10000 as gRPC port
							 | 
						|
									grpcAddress := pb.ServerToGrpcAddress(adminAddress)
							 | 
						|
								
							 | 
						|
									return &GrpcAdminClient{
							 | 
						|
										adminAddress:         grpcAddress,
							 | 
						|
										workerID:             workerID,
							 | 
						|
										dialOption:           dialOption,
							 | 
						|
										shouldReconnect:      true,
							 | 
						|
										maxReconnectAttempts: 0, // 0 means infinite attempts
							 | 
						|
										reconnectBackoff:     1 * time.Second,
							 | 
						|
										maxReconnectBackoff:  30 * time.Second,
							 | 
						|
										reconnectMultiplier:  1.5,
							 | 
						|
										outgoing:             make(chan *worker_pb.WorkerMessage, 100),
							 | 
						|
										incoming:             make(chan *worker_pb.AdminMessage, 100),
							 | 
						|
										responseChans:        make(map[string]chan *worker_pb.AdminMessage),
							 | 
						|
										shutdownChan:         make(chan struct{}),
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// Connect establishes gRPC connection to admin server with TLS detection
							 | 
						|
								func (c *GrpcAdminClient) Connect() error {
							 | 
						|
									c.mutex.Lock()
							 | 
						|
									defer c.mutex.Unlock()
							 | 
						|
								
							 | 
						|
									if c.connected {
							 | 
						|
										return fmt.Errorf("already connected")
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Always start the reconnection loop, even if initial connection fails
							 | 
						|
									go c.reconnectionLoop()
							 | 
						|
								
							 | 
						|
									// Attempt initial connection
							 | 
						|
									err := c.attemptConnection()
							 | 
						|
									if err != nil {
							 | 
						|
										glog.V(1).Infof("Initial connection failed, reconnection loop will retry: %v", err)
							 | 
						|
										return err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// attemptConnection tries to establish the connection without managing the reconnection loop
							 | 
						|
								func (c *GrpcAdminClient) attemptConnection() error {
							 | 
						|
									// Detect TLS support and create appropriate connection
							 | 
						|
									conn, err := c.createConnection()
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("failed to connect to admin server: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									c.conn = conn
							 | 
						|
									c.client = worker_pb.NewWorkerServiceClient(conn)
							 | 
						|
								
							 | 
						|
									// Create bidirectional stream
							 | 
						|
									c.streamCtx, c.streamCancel = context.WithCancel(context.Background())
							 | 
						|
									stream, err := c.client.WorkerStream(c.streamCtx)
							 | 
						|
									if err != nil {
							 | 
						|
										c.conn.Close()
							 | 
						|
										return fmt.Errorf("failed to create worker stream: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									c.stream = stream
							 | 
						|
									c.connected = true
							 | 
						|
								
							 | 
						|
									// Always check for worker info and send registration immediately as the very first message
							 | 
						|
									c.mutex.RLock()
							 | 
						|
									workerInfo := c.lastWorkerInfo
							 | 
						|
									c.mutex.RUnlock()
							 | 
						|
								
							 | 
						|
									if workerInfo != nil {
							 | 
						|
										// Send registration synchronously as the very first message
							 | 
						|
										if err := c.sendRegistrationSync(workerInfo); err != nil {
							 | 
						|
											c.conn.Close()
							 | 
						|
											c.connected = false
							 | 
						|
											return fmt.Errorf("failed to register worker: %w", err)
							 | 
						|
										}
							 | 
						|
										glog.Infof("Worker registered successfully with admin server")
							 | 
						|
									} else {
							 | 
						|
										// No worker info yet - stream will wait for registration
							 | 
						|
										glog.V(1).Infof("Connected to admin server, waiting for worker registration info")
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Start stream handlers with synchronization
							 | 
						|
									outgoingReady := make(chan struct{})
							 | 
						|
									incomingReady := make(chan struct{})
							 | 
						|
								
							 | 
						|
									go c.handleOutgoingWithReady(outgoingReady)
							 | 
						|
									go c.handleIncomingWithReady(incomingReady)
							 | 
						|
								
							 | 
						|
									// Wait for both handlers to be ready
							 | 
						|
									<-outgoingReady
							 | 
						|
									<-incomingReady
							 | 
						|
								
							 | 
						|
									glog.Infof("Connected to admin server at %s", c.adminAddress)
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// createConnection attempts to connect using the provided dial option
							 | 
						|
								func (c *GrpcAdminClient) createConnection() (*grpc.ClientConn, error) {
							 | 
						|
									ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
							 | 
						|
									defer cancel()
							 | 
						|
								
							 | 
						|
									conn, err := pb.GrpcDial(ctx, c.adminAddress, false, c.dialOption)
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, fmt.Errorf("failed to connect to admin server: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									glog.Infof("Connected to admin server at %s", c.adminAddress)
							 | 
						|
									return conn, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// Disconnect closes the gRPC connection
							 | 
						|
								func (c *GrpcAdminClient) Disconnect() error {
							 | 
						|
									c.mutex.Lock()
							 | 
						|
									defer c.mutex.Unlock()
							 | 
						|
								
							 | 
						|
									if !c.connected {
							 | 
						|
										return nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									c.connected = false
							 | 
						|
									c.shouldReconnect = false
							 | 
						|
								
							 | 
						|
									// Send shutdown signal to stop reconnection loop
							 | 
						|
									select {
							 | 
						|
									case c.shutdownChan <- struct{}{}:
							 | 
						|
									default:
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Send shutdown message
							 | 
						|
									shutdownMsg := &worker_pb.WorkerMessage{
							 | 
						|
										WorkerId:  c.workerID,
							 | 
						|
										Timestamp: time.Now().Unix(),
							 | 
						|
										Message: &worker_pb.WorkerMessage_Shutdown{
							 | 
						|
											Shutdown: &worker_pb.WorkerShutdown{
							 | 
						|
												WorkerId: c.workerID,
							 | 
						|
												Reason:   "normal shutdown",
							 | 
						|
											},
							 | 
						|
										},
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									select {
							 | 
						|
									case c.outgoing <- shutdownMsg:
							 | 
						|
									case <-time.After(time.Second):
							 | 
						|
										glog.Warningf("Failed to send shutdown message")
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Cancel stream context
							 | 
						|
									if c.streamCancel != nil {
							 | 
						|
										c.streamCancel()
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Close stream
							 | 
						|
									if c.stream != nil {
							 | 
						|
										c.stream.CloseSend()
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Close connection
							 | 
						|
									if c.conn != nil {
							 | 
						|
										c.conn.Close()
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Close channels
							 | 
						|
									close(c.outgoing)
							 | 
						|
									close(c.incoming)
							 | 
						|
								
							 | 
						|
									glog.Infof("Disconnected from admin server")
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// reconnectionLoop handles automatic reconnection with exponential backoff
							 | 
						|
								func (c *GrpcAdminClient) reconnectionLoop() {
							 | 
						|
									backoff := c.reconnectBackoff
							 | 
						|
									attempts := 0
							 | 
						|
								
							 | 
						|
									for {
							 | 
						|
										select {
							 | 
						|
										case <-c.shutdownChan:
							 | 
						|
											return
							 | 
						|
										default:
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										c.mutex.RLock()
							 | 
						|
										shouldReconnect := c.shouldReconnect && !c.connected && !c.reconnecting
							 | 
						|
										c.mutex.RUnlock()
							 | 
						|
								
							 | 
						|
										if !shouldReconnect {
							 | 
						|
											time.Sleep(time.Second)
							 | 
						|
											continue
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										c.mutex.Lock()
							 | 
						|
										c.reconnecting = true
							 | 
						|
										c.mutex.Unlock()
							 | 
						|
								
							 | 
						|
										glog.Infof("Attempting to reconnect to admin server (attempt %d)", attempts+1)
							 | 
						|
								
							 | 
						|
										// Attempt to reconnect
							 | 
						|
										if err := c.reconnect(); err != nil {
							 | 
						|
											attempts++
							 | 
						|
											glog.Errorf("Reconnection attempt %d failed: %v", attempts, err)
							 | 
						|
								
							 | 
						|
											// Reset reconnecting flag
							 | 
						|
											c.mutex.Lock()
							 | 
						|
											c.reconnecting = false
							 | 
						|
											c.mutex.Unlock()
							 | 
						|
								
							 | 
						|
											// Check if we should give up
							 | 
						|
											if c.maxReconnectAttempts > 0 && attempts >= c.maxReconnectAttempts {
							 | 
						|
												glog.Errorf("Max reconnection attempts (%d) reached, giving up", c.maxReconnectAttempts)
							 | 
						|
												c.mutex.Lock()
							 | 
						|
												c.shouldReconnect = false
							 | 
						|
												c.mutex.Unlock()
							 | 
						|
												return
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											// Wait with exponential backoff
							 | 
						|
											glog.Infof("Waiting %v before next reconnection attempt", backoff)
							 | 
						|
								
							 | 
						|
											select {
							 | 
						|
											case <-c.shutdownChan:
							 | 
						|
												return
							 | 
						|
											case <-time.After(backoff):
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											// Increase backoff
							 | 
						|
											backoff = time.Duration(float64(backoff) * c.reconnectMultiplier)
							 | 
						|
											if backoff > c.maxReconnectBackoff {
							 | 
						|
												backoff = c.maxReconnectBackoff
							 | 
						|
											}
							 | 
						|
										} else {
							 | 
						|
											// Successful reconnection
							 | 
						|
											attempts = 0
							 | 
						|
											backoff = c.reconnectBackoff
							 | 
						|
											glog.Infof("Successfully reconnected to admin server")
							 | 
						|
								
							 | 
						|
											c.mutex.Lock()
							 | 
						|
											c.reconnecting = false
							 | 
						|
											c.mutex.Unlock()
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// reconnect attempts to re-establish the connection
							 | 
						|
								func (c *GrpcAdminClient) reconnect() error {
							 | 
						|
									// Clean up existing connection completely
							 | 
						|
									c.mutex.Lock()
							 | 
						|
									if c.streamCancel != nil {
							 | 
						|
										c.streamCancel()
							 | 
						|
									}
							 | 
						|
									if c.stream != nil {
							 | 
						|
										c.stream.CloseSend()
							 | 
						|
									}
							 | 
						|
									if c.conn != nil {
							 | 
						|
										c.conn.Close()
							 | 
						|
									}
							 | 
						|
									c.connected = false
							 | 
						|
									c.mutex.Unlock()
							 | 
						|
								
							 | 
						|
									// Attempt to re-establish connection using the same logic as initial connection
							 | 
						|
									err := c.attemptConnection()
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("failed to reconnect: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Registration is now handled in attemptConnection if worker info is available
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// handleOutgoing processes outgoing messages to admin
							 | 
						|
								func (c *GrpcAdminClient) handleOutgoing() {
							 | 
						|
									for msg := range c.outgoing {
							 | 
						|
										c.mutex.RLock()
							 | 
						|
										connected := c.connected
							 | 
						|
										stream := c.stream
							 | 
						|
										c.mutex.RUnlock()
							 | 
						|
								
							 | 
						|
										if !connected {
							 | 
						|
											break
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										if err := stream.Send(msg); err != nil {
							 | 
						|
											glog.Errorf("Failed to send message to admin: %v", err)
							 | 
						|
											c.mutex.Lock()
							 | 
						|
											c.connected = false
							 | 
						|
											c.mutex.Unlock()
							 | 
						|
											break
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// handleOutgoingWithReady processes outgoing messages and signals when ready
							 | 
						|
								func (c *GrpcAdminClient) handleOutgoingWithReady(ready chan struct{}) {
							 | 
						|
									// Signal that this handler is ready to process messages
							 | 
						|
									close(ready)
							 | 
						|
								
							 | 
						|
									// Now process messages normally
							 | 
						|
									c.handleOutgoing()
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// handleIncoming processes incoming messages from admin
							 | 
						|
								func (c *GrpcAdminClient) handleIncoming() {
							 | 
						|
									glog.V(1).Infof("📡 INCOMING HANDLER STARTED: Worker %s incoming message handler started", c.workerID)
							 | 
						|
								
							 | 
						|
									for {
							 | 
						|
										c.mutex.RLock()
							 | 
						|
										connected := c.connected
							 | 
						|
										stream := c.stream
							 | 
						|
										c.mutex.RUnlock()
							 | 
						|
								
							 | 
						|
										if !connected {
							 | 
						|
											glog.V(1).Infof("🔌 INCOMING HANDLER STOPPED: Worker %s stopping incoming handler - not connected", c.workerID)
							 | 
						|
											break
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										glog.V(4).Infof("👂 LISTENING: Worker %s waiting for message from admin server", c.workerID)
							 | 
						|
										msg, err := stream.Recv()
							 | 
						|
										if err != nil {
							 | 
						|
											if err == io.EOF {
							 | 
						|
												glog.Infof("🔚 STREAM CLOSED: Worker %s admin server closed the stream", c.workerID)
							 | 
						|
											} else {
							 | 
						|
												glog.Errorf("❌ RECEIVE ERROR: Worker %s failed to receive message from admin: %v", c.workerID, err)
							 | 
						|
											}
							 | 
						|
											c.mutex.Lock()
							 | 
						|
											c.connected = false
							 | 
						|
											c.mutex.Unlock()
							 | 
						|
											break
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										glog.V(4).Infof("📨 MESSAGE RECEIVED: Worker %s received message from admin server: %T", c.workerID, msg.Message)
							 | 
						|
								
							 | 
						|
										// Route message to waiting goroutines or general handler
							 | 
						|
										select {
							 | 
						|
										case c.incoming <- msg:
							 | 
						|
											glog.V(3).Infof("✅ MESSAGE ROUTED: Worker %s successfully routed message to handler", c.workerID)
							 | 
						|
										case <-time.After(time.Second):
							 | 
						|
											glog.Warningf("🚫 MESSAGE DROPPED: Worker %s incoming message buffer full, dropping message: %T", c.workerID, msg.Message)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									glog.V(1).Infof("🏁 INCOMING HANDLER FINISHED: Worker %s incoming message handler finished", c.workerID)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// handleIncomingWithReady processes incoming messages and signals when ready
							 | 
						|
								func (c *GrpcAdminClient) handleIncomingWithReady(ready chan struct{}) {
							 | 
						|
									// Signal that this handler is ready to process messages
							 | 
						|
									close(ready)
							 | 
						|
								
							 | 
						|
									// Now process messages normally
							 | 
						|
									c.handleIncoming()
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// RegisterWorker registers the worker with the admin server
							 | 
						|
								func (c *GrpcAdminClient) RegisterWorker(worker *types.WorkerData) error {
							 | 
						|
									// Store worker info for re-registration after reconnection
							 | 
						|
									c.mutex.Lock()
							 | 
						|
									c.lastWorkerInfo = worker
							 | 
						|
									c.mutex.Unlock()
							 | 
						|
								
							 | 
						|
									// If not connected, registration will happen when connection is established
							 | 
						|
									if !c.connected {
							 | 
						|
										glog.V(1).Infof("Not connected yet, worker info stored for registration upon connection")
							 | 
						|
										return nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return c.sendRegistration(worker)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// sendRegistration sends the registration message and waits for response
							 | 
						|
								func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error {
							 | 
						|
									capabilities := make([]string, len(worker.Capabilities))
							 | 
						|
									for i, cap := range worker.Capabilities {
							 | 
						|
										capabilities[i] = string(cap)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									msg := &worker_pb.WorkerMessage{
							 | 
						|
										WorkerId:  c.workerID,
							 | 
						|
										Timestamp: time.Now().Unix(),
							 | 
						|
										Message: &worker_pb.WorkerMessage_Registration{
							 | 
						|
											Registration: &worker_pb.WorkerRegistration{
							 | 
						|
												WorkerId:      c.workerID,
							 | 
						|
												Address:       worker.Address,
							 | 
						|
												Capabilities:  capabilities,
							 | 
						|
												MaxConcurrent: int32(worker.MaxConcurrent),
							 | 
						|
												Metadata:      make(map[string]string),
							 | 
						|
											},
							 | 
						|
										},
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									select {
							 | 
						|
									case c.outgoing <- msg:
							 | 
						|
									case <-time.After(5 * time.Second):
							 | 
						|
										return fmt.Errorf("failed to send registration message: timeout")
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Wait for registration response
							 | 
						|
									timeout := time.NewTimer(10 * time.Second)
							 | 
						|
									defer timeout.Stop()
							 | 
						|
								
							 | 
						|
									for {
							 | 
						|
										select {
							 | 
						|
										case response := <-c.incoming:
							 | 
						|
											if regResp := response.GetRegistrationResponse(); regResp != nil {
							 | 
						|
												if regResp.Success {
							 | 
						|
													glog.Infof("Worker registered successfully: %s", regResp.Message)
							 | 
						|
													return nil
							 | 
						|
												}
							 | 
						|
												return fmt.Errorf("registration failed: %s", regResp.Message)
							 | 
						|
											}
							 | 
						|
										case <-timeout.C:
							 | 
						|
											return fmt.Errorf("registration timeout")
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// sendRegistrationSync sends the registration message synchronously
							 | 
						|
								func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData) error {
							 | 
						|
									capabilities := make([]string, len(worker.Capabilities))
							 | 
						|
									for i, cap := range worker.Capabilities {
							 | 
						|
										capabilities[i] = string(cap)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									msg := &worker_pb.WorkerMessage{
							 | 
						|
										WorkerId:  c.workerID,
							 | 
						|
										Timestamp: time.Now().Unix(),
							 | 
						|
										Message: &worker_pb.WorkerMessage_Registration{
							 | 
						|
											Registration: &worker_pb.WorkerRegistration{
							 | 
						|
												WorkerId:      c.workerID,
							 | 
						|
												Address:       worker.Address,
							 | 
						|
												Capabilities:  capabilities,
							 | 
						|
												MaxConcurrent: int32(worker.MaxConcurrent),
							 | 
						|
												Metadata:      make(map[string]string),
							 | 
						|
											},
							 | 
						|
										},
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Send directly to stream to ensure it's the first message
							 | 
						|
									if err := c.stream.Send(msg); err != nil {
							 | 
						|
										return fmt.Errorf("failed to send registration message: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Create a channel to receive the response
							 | 
						|
									responseChan := make(chan *worker_pb.AdminMessage, 1)
							 | 
						|
									errChan := make(chan error, 1)
							 | 
						|
								
							 | 
						|
									// Start a goroutine to listen for the response
							 | 
						|
									go func() {
							 | 
						|
										for {
							 | 
						|
											response, err := c.stream.Recv()
							 | 
						|
											if err != nil {
							 | 
						|
												errChan <- fmt.Errorf("failed to receive registration response: %w", err)
							 | 
						|
												return
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											if regResp := response.GetRegistrationResponse(); regResp != nil {
							 | 
						|
												responseChan <- response
							 | 
						|
												return
							 | 
						|
											}
							 | 
						|
											// Continue waiting if it's not a registration response
							 | 
						|
										}
							 | 
						|
									}()
							 | 
						|
								
							 | 
						|
									// Wait for registration response with timeout
							 | 
						|
									timeout := time.NewTimer(10 * time.Second)
							 | 
						|
									defer timeout.Stop()
							 | 
						|
								
							 | 
						|
									select {
							 | 
						|
									case response := <-responseChan:
							 | 
						|
										if regResp := response.GetRegistrationResponse(); regResp != nil {
							 | 
						|
											if regResp.Success {
							 | 
						|
												glog.V(1).Infof("Worker registered successfully: %s", regResp.Message)
							 | 
						|
												return nil
							 | 
						|
											}
							 | 
						|
											return fmt.Errorf("registration failed: %s", regResp.Message)
							 | 
						|
										}
							 | 
						|
										return fmt.Errorf("unexpected response type")
							 | 
						|
									case err := <-errChan:
							 | 
						|
										return err
							 | 
						|
									case <-timeout.C:
							 | 
						|
										return fmt.Errorf("registration timeout")
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// SendHeartbeat sends heartbeat to admin server
							 | 
						|
								func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error {
							 | 
						|
									if !c.connected {
							 | 
						|
										// If we're currently reconnecting, don't wait - just skip the heartbeat
							 | 
						|
										c.mutex.RLock()
							 | 
						|
										reconnecting := c.reconnecting
							 | 
						|
										c.mutex.RUnlock()
							 | 
						|
								
							 | 
						|
										if reconnecting {
							 | 
						|
											// Don't treat as an error - reconnection is in progress
							 | 
						|
											glog.V(2).Infof("Skipping heartbeat during reconnection")
							 | 
						|
											return nil
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										// Wait for reconnection for a short time
							 | 
						|
										if err := c.waitForConnection(10 * time.Second); err != nil {
							 | 
						|
											return fmt.Errorf("not connected to admin server: %w", err)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									taskIds := make([]string, len(status.CurrentTasks))
							 | 
						|
									for i, task := range status.CurrentTasks {
							 | 
						|
										taskIds[i] = task.ID
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									msg := &worker_pb.WorkerMessage{
							 | 
						|
										WorkerId:  c.workerID,
							 | 
						|
										Timestamp: time.Now().Unix(),
							 | 
						|
										Message: &worker_pb.WorkerMessage_Heartbeat{
							 | 
						|
											Heartbeat: &worker_pb.WorkerHeartbeat{
							 | 
						|
												WorkerId:       c.workerID,
							 | 
						|
												Status:         status.Status,
							 | 
						|
												CurrentLoad:    int32(status.CurrentLoad),
							 | 
						|
												MaxConcurrent:  int32(status.MaxConcurrent),
							 | 
						|
												CurrentTaskIds: taskIds,
							 | 
						|
												TasksCompleted: int32(status.TasksCompleted),
							 | 
						|
												TasksFailed:    int32(status.TasksFailed),
							 | 
						|
												UptimeSeconds:  int64(status.Uptime.Seconds()),
							 | 
						|
											},
							 | 
						|
										},
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									select {
							 | 
						|
									case c.outgoing <- msg:
							 | 
						|
										return nil
							 | 
						|
									case <-time.After(time.Second):
							 | 
						|
										return fmt.Errorf("failed to send heartbeat: timeout")
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// RequestTask requests a new task from admin server
							 | 
						|
								func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error) {
							 | 
						|
									if !c.connected {
							 | 
						|
										// If we're currently reconnecting, don't wait - just return no task
							 | 
						|
										c.mutex.RLock()
							 | 
						|
										reconnecting := c.reconnecting
							 | 
						|
										c.mutex.RUnlock()
							 | 
						|
								
							 | 
						|
										if reconnecting {
							 | 
						|
											// Don't treat as an error - reconnection is in progress
							 | 
						|
											glog.V(2).Infof("🔄 RECONNECTING: Worker %s skipping task request during reconnection", workerID)
							 | 
						|
											return nil, nil
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										// Wait for reconnection for a short time
							 | 
						|
										if err := c.waitForConnection(5 * time.Second); err != nil {
							 | 
						|
											return nil, fmt.Errorf("not connected to admin server: %w", err)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									caps := make([]string, len(capabilities))
							 | 
						|
									for i, cap := range capabilities {
							 | 
						|
										caps[i] = string(cap)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									glog.V(3).Infof("📤 SENDING TASK REQUEST: Worker %s sending task request to admin server with capabilities: %v",
							 | 
						|
										workerID, capabilities)
							 | 
						|
								
							 | 
						|
									msg := &worker_pb.WorkerMessage{
							 | 
						|
										WorkerId:  c.workerID,
							 | 
						|
										Timestamp: time.Now().Unix(),
							 | 
						|
										Message: &worker_pb.WorkerMessage_TaskRequest{
							 | 
						|
											TaskRequest: &worker_pb.TaskRequest{
							 | 
						|
												WorkerId:       c.workerID,
							 | 
						|
												Capabilities:   caps,
							 | 
						|
												AvailableSlots: 1, // Request one task
							 | 
						|
											},
							 | 
						|
										},
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									select {
							 | 
						|
									case c.outgoing <- msg:
							 | 
						|
										glog.V(3).Infof("✅ TASK REQUEST SENT: Worker %s successfully sent task request to admin server", workerID)
							 | 
						|
									case <-time.After(time.Second):
							 | 
						|
										glog.Errorf("❌ TASK REQUEST TIMEOUT: Worker %s failed to send task request: timeout", workerID)
							 | 
						|
										return nil, fmt.Errorf("failed to send task request: timeout")
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Wait for task assignment
							 | 
						|
									glog.V(3).Infof("⏳ WAITING FOR RESPONSE: Worker %s waiting for task assignment response (5s timeout)", workerID)
							 | 
						|
									timeout := time.NewTimer(5 * time.Second)
							 | 
						|
									defer timeout.Stop()
							 | 
						|
								
							 | 
						|
									for {
							 | 
						|
										select {
							 | 
						|
										case response := <-c.incoming:
							 | 
						|
											glog.V(3).Infof("📨 RESPONSE RECEIVED: Worker %s received response from admin server: %T", workerID, response.Message)
							 | 
						|
											if taskAssign := response.GetTaskAssignment(); taskAssign != nil {
							 | 
						|
												glog.V(1).Infof("Worker %s received task assignment in response: %s (type: %s, volume: %d)",
							 | 
						|
													workerID, taskAssign.TaskId, taskAssign.TaskType, taskAssign.Params.VolumeId)
							 | 
						|
								
							 | 
						|
												// Convert to our task type
							 | 
						|
												task := &types.TaskInput{
							 | 
						|
													ID:         taskAssign.TaskId,
							 | 
						|
													Type:       types.TaskType(taskAssign.TaskType),
							 | 
						|
													Status:     types.TaskStatusAssigned,
							 | 
						|
													VolumeID:   taskAssign.Params.VolumeId,
							 | 
						|
													Server:     getServerFromParams(taskAssign.Params),
							 | 
						|
													Collection: taskAssign.Params.Collection,
							 | 
						|
													Priority:   types.TaskPriority(taskAssign.Priority),
							 | 
						|
													CreatedAt:  time.Unix(taskAssign.CreatedTime, 0),
							 | 
						|
													// Use typed protobuf parameters directly
							 | 
						|
													TypedParams: taskAssign.Params,
							 | 
						|
												}
							 | 
						|
												return task, nil
							 | 
						|
											} else {
							 | 
						|
												glog.V(3).Infof("📭 NON-TASK RESPONSE: Worker %s received non-task response: %T", workerID, response.Message)
							 | 
						|
											}
							 | 
						|
										case <-timeout.C:
							 | 
						|
											glog.V(3).Infof("⏰ TASK REQUEST TIMEOUT: Worker %s - no task assignment received within 5 seconds", workerID)
							 | 
						|
											return nil, nil // No task available
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// CompleteTask reports task completion to admin server
							 | 
						|
								func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error {
							 | 
						|
									return c.CompleteTaskWithMetadata(taskID, success, errorMsg, nil)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// CompleteTaskWithMetadata reports task completion with additional metadata
							 | 
						|
								func (c *GrpcAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error {
							 | 
						|
									if !c.connected {
							 | 
						|
										// If we're currently reconnecting, don't wait - just skip the completion report
							 | 
						|
										c.mutex.RLock()
							 | 
						|
										reconnecting := c.reconnecting
							 | 
						|
										c.mutex.RUnlock()
							 | 
						|
								
							 | 
						|
										if reconnecting {
							 | 
						|
											// Don't treat as an error - reconnection is in progress
							 | 
						|
											glog.V(2).Infof("Skipping task completion report during reconnection for task %s", taskID)
							 | 
						|
											return nil
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										// Wait for reconnection for a short time
							 | 
						|
										if err := c.waitForConnection(5 * time.Second); err != nil {
							 | 
						|
											return fmt.Errorf("not connected to admin server: %w", err)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									taskComplete := &worker_pb.TaskComplete{
							 | 
						|
										TaskId:         taskID,
							 | 
						|
										WorkerId:       c.workerID,
							 | 
						|
										Success:        success,
							 | 
						|
										ErrorMessage:   errorMsg,
							 | 
						|
										CompletionTime: time.Now().Unix(),
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Add metadata if provided
							 | 
						|
									if metadata != nil {
							 | 
						|
										taskComplete.ResultMetadata = metadata
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									msg := &worker_pb.WorkerMessage{
							 | 
						|
										WorkerId:  c.workerID,
							 | 
						|
										Timestamp: time.Now().Unix(),
							 | 
						|
										Message: &worker_pb.WorkerMessage_TaskComplete{
							 | 
						|
											TaskComplete: taskComplete,
							 | 
						|
										},
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									select {
							 | 
						|
									case c.outgoing <- msg:
							 | 
						|
										return nil
							 | 
						|
									case <-time.After(time.Second):
							 | 
						|
										return fmt.Errorf("failed to send task completion: timeout")
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// UpdateTaskProgress updates task progress to admin server
							 | 
						|
								func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) error {
							 | 
						|
									if !c.connected {
							 | 
						|
										// If we're currently reconnecting, don't wait - just skip the progress update
							 | 
						|
										c.mutex.RLock()
							 | 
						|
										reconnecting := c.reconnecting
							 | 
						|
										c.mutex.RUnlock()
							 | 
						|
								
							 | 
						|
										if reconnecting {
							 | 
						|
											// Don't treat as an error - reconnection is in progress
							 | 
						|
											glog.V(2).Infof("Skipping task progress update during reconnection for task %s", taskID)
							 | 
						|
											return nil
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										// Wait for reconnection for a short time
							 | 
						|
										if err := c.waitForConnection(5 * time.Second); err != nil {
							 | 
						|
											return fmt.Errorf("not connected to admin server: %w", err)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									msg := &worker_pb.WorkerMessage{
							 | 
						|
										WorkerId:  c.workerID,
							 | 
						|
										Timestamp: time.Now().Unix(),
							 | 
						|
										Message: &worker_pb.WorkerMessage_TaskUpdate{
							 | 
						|
											TaskUpdate: &worker_pb.TaskUpdate{
							 | 
						|
												TaskId:   taskID,
							 | 
						|
												WorkerId: c.workerID,
							 | 
						|
												Status:   "in_progress",
							 | 
						|
												Progress: float32(progress),
							 | 
						|
											},
							 | 
						|
										},
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									select {
							 | 
						|
									case c.outgoing <- msg:
							 | 
						|
										return nil
							 | 
						|
									case <-time.After(time.Second):
							 | 
						|
										return fmt.Errorf("failed to send task progress: timeout")
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// IsConnected returns whether the client is connected
							 | 
						|
								func (c *GrpcAdminClient) IsConnected() bool {
							 | 
						|
									c.mutex.RLock()
							 | 
						|
									defer c.mutex.RUnlock()
							 | 
						|
									return c.connected
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// IsReconnecting returns whether the client is currently attempting to reconnect
							 | 
						|
								func (c *GrpcAdminClient) IsReconnecting() bool {
							 | 
						|
									c.mutex.RLock()
							 | 
						|
									defer c.mutex.RUnlock()
							 | 
						|
									return c.reconnecting
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// SetReconnectionSettings allows configuration of reconnection behavior
							 | 
						|
								func (c *GrpcAdminClient) SetReconnectionSettings(maxAttempts int, initialBackoff, maxBackoff time.Duration, multiplier float64) {
							 | 
						|
									c.mutex.Lock()
							 | 
						|
									defer c.mutex.Unlock()
							 | 
						|
									c.maxReconnectAttempts = maxAttempts
							 | 
						|
									c.reconnectBackoff = initialBackoff
							 | 
						|
									c.maxReconnectBackoff = maxBackoff
							 | 
						|
									c.reconnectMultiplier = multiplier
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// StopReconnection stops the reconnection loop
							 | 
						|
								func (c *GrpcAdminClient) StopReconnection() {
							 | 
						|
									c.mutex.Lock()
							 | 
						|
									defer c.mutex.Unlock()
							 | 
						|
									c.shouldReconnect = false
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// StartReconnection starts the reconnection loop
							 | 
						|
								func (c *GrpcAdminClient) StartReconnection() {
							 | 
						|
									c.mutex.Lock()
							 | 
						|
									defer c.mutex.Unlock()
							 | 
						|
									c.shouldReconnect = true
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// waitForConnection waits for the connection to be established or timeout
							 | 
						|
								func (c *GrpcAdminClient) waitForConnection(timeout time.Duration) error {
							 | 
						|
									deadline := time.Now().Add(timeout)
							 | 
						|
								
							 | 
						|
									for time.Now().Before(deadline) {
							 | 
						|
										c.mutex.RLock()
							 | 
						|
										connected := c.connected
							 | 
						|
										shouldReconnect := c.shouldReconnect
							 | 
						|
										c.mutex.RUnlock()
							 | 
						|
								
							 | 
						|
										if connected {
							 | 
						|
											return nil
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										if !shouldReconnect {
							 | 
						|
											return fmt.Errorf("reconnection is disabled")
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										time.Sleep(100 * time.Millisecond)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return fmt.Errorf("timeout waiting for connection")
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// GetIncomingChannel returns the incoming message channel for message processing
							 | 
						|
								// This allows the worker to process admin messages directly
							 | 
						|
								func (c *GrpcAdminClient) GetIncomingChannel() <-chan *worker_pb.AdminMessage {
							 | 
						|
									return c.incoming
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// MockAdminClient provides a mock implementation for testing
							 | 
						|
								type MockAdminClient struct {
							 | 
						|
									workerID  string
							 | 
						|
									connected bool
							 | 
						|
									tasks     []*types.TaskInput
							 | 
						|
									mutex     sync.RWMutex
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// NewMockAdminClient creates a new mock admin client
							 | 
						|
								func NewMockAdminClient() *MockAdminClient {
							 | 
						|
									return &MockAdminClient{
							 | 
						|
										connected: true,
							 | 
						|
										tasks:     make([]*types.TaskInput, 0),
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// Connect mock implementation
							 | 
						|
								func (m *MockAdminClient) Connect() error {
							 | 
						|
									m.mutex.Lock()
							 | 
						|
									defer m.mutex.Unlock()
							 | 
						|
									m.connected = true
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// Disconnect mock implementation
							 | 
						|
								func (m *MockAdminClient) Disconnect() error {
							 | 
						|
									m.mutex.Lock()
							 | 
						|
									defer m.mutex.Unlock()
							 | 
						|
									m.connected = false
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// RegisterWorker mock implementation
							 | 
						|
								func (m *MockAdminClient) RegisterWorker(worker *types.WorkerData) error {
							 | 
						|
									m.workerID = worker.ID
							 | 
						|
									glog.Infof("Mock: Worker %s registered with capabilities: %v", worker.ID, worker.Capabilities)
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// SendHeartbeat mock implementation
							 | 
						|
								func (m *MockAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error {
							 | 
						|
									glog.V(2).Infof("Mock: Heartbeat from worker %s, status: %s, load: %d/%d",
							 | 
						|
										workerID, status.Status, status.CurrentLoad, status.MaxConcurrent)
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// RequestTask mock implementation
							 | 
						|
								func (m *MockAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error) {
							 | 
						|
									m.mutex.Lock()
							 | 
						|
									defer m.mutex.Unlock()
							 | 
						|
								
							 | 
						|
									if len(m.tasks) > 0 {
							 | 
						|
										task := m.tasks[0]
							 | 
						|
										m.tasks = m.tasks[1:]
							 | 
						|
										glog.Infof("Mock: Assigned task %s to worker %s", task.ID, workerID)
							 | 
						|
										return task, nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// No tasks available
							 | 
						|
									return nil, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// CompleteTask mock implementation
							 | 
						|
								func (m *MockAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error {
							 | 
						|
									if success {
							 | 
						|
										glog.Infof("Mock: Task %s completed successfully", taskID)
							 | 
						|
									} else {
							 | 
						|
										glog.Infof("Mock: Task %s failed: %s", taskID, errorMsg)
							 | 
						|
									}
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// UpdateTaskProgress mock implementation
							 | 
						|
								func (m *MockAdminClient) UpdateTaskProgress(taskID string, progress float64) error {
							 | 
						|
									glog.V(2).Infof("Mock: Task %s progress: %.1f%%", taskID, progress)
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// CompleteTaskWithMetadata mock implementation
							 | 
						|
								func (m *MockAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error {
							 | 
						|
									glog.Infof("Mock: Task %s completed: success=%v, error=%s, metadata=%v", taskID, success, errorMsg, metadata)
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// IsConnected mock implementation
							 | 
						|
								func (m *MockAdminClient) IsConnected() bool {
							 | 
						|
									m.mutex.RLock()
							 | 
						|
									defer m.mutex.RUnlock()
							 | 
						|
									return m.connected
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// AddMockTask adds a mock task for testing
							 | 
						|
								func (m *MockAdminClient) AddMockTask(task *types.TaskInput) {
							 | 
						|
									m.mutex.Lock()
							 | 
						|
									defer m.mutex.Unlock()
							 | 
						|
									m.tasks = append(m.tasks, task)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// CreateAdminClient creates an admin client with the provided dial option
							 | 
						|
								func CreateAdminClient(adminServer string, workerID string, dialOption grpc.DialOption) (AdminClient, error) {
							 | 
						|
									return NewGrpcAdminClient(adminServer, workerID, dialOption), nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// getServerFromParams extracts server address from unified sources
							 | 
						|
								func getServerFromParams(params *worker_pb.TaskParams) string {
							 | 
						|
									if len(params.Sources) > 0 {
							 | 
						|
										return params.Sources[0].Node
							 | 
						|
									}
							 | 
						|
									return ""
							 | 
						|
								}
							 |