|
|
@@ -80,6 +80,21 @@ func (c *GrpcAdminClient) Connect() error { |
|
|
return fmt.Errorf("already connected") |
|
|
return fmt.Errorf("already connected") |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Always start the reconnection loop, even if initial connection fails
|
|
|
|
|
|
go c.reconnectionLoop() |
|
|
|
|
|
|
|
|
|
|
|
// Attempt initial connection
|
|
|
|
|
|
err := c.attemptConnection() |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
glog.V(1).Infof("Initial connection failed, reconnection loop will retry: %v", err) |
|
|
|
|
|
return err |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// attemptConnection tries to establish the connection without managing the reconnection loop
|
|
|
|
|
|
func (c *GrpcAdminClient) attemptConnection() error { |
|
|
// Detect TLS support and create appropriate connection
|
|
|
// Detect TLS support and create appropriate connection
|
|
|
conn, err := c.createConnection() |
|
|
conn, err := c.createConnection() |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
@@ -100,10 +115,9 @@ func (c *GrpcAdminClient) Connect() error { |
|
|
c.stream = stream |
|
|
c.stream = stream |
|
|
c.connected = true |
|
|
c.connected = true |
|
|
|
|
|
|
|
|
// Start stream handlers and reconnection loop
|
|
|
|
|
|
|
|
|
// Start stream handlers
|
|
|
go c.handleOutgoing() |
|
|
go c.handleOutgoing() |
|
|
go c.handleIncoming() |
|
|
go c.handleIncoming() |
|
|
go c.reconnectionLoop() |
|
|
|
|
|
|
|
|
|
|
|
glog.Infof("Connected to admin server at %s", c.adminAddress) |
|
|
glog.Infof("Connected to admin server at %s", c.adminAddress) |
|
|
return nil |
|
|
return nil |
|
|
@@ -268,39 +282,15 @@ func (c *GrpcAdminClient) reconnect() error { |
|
|
if c.conn != nil { |
|
|
if c.conn != nil { |
|
|
c.conn.Close() |
|
|
c.conn.Close() |
|
|
} |
|
|
} |
|
|
|
|
|
c.connected = false |
|
|
c.mutex.Unlock() |
|
|
c.mutex.Unlock() |
|
|
|
|
|
|
|
|
// Create new connection
|
|
|
|
|
|
conn, err := c.createConnection() |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("failed to create connection: %w", err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
client := worker_pb.NewWorkerServiceClient(conn) |
|
|
|
|
|
|
|
|
|
|
|
// Create new stream
|
|
|
|
|
|
streamCtx, streamCancel := context.WithCancel(context.Background()) |
|
|
|
|
|
stream, err := client.WorkerStream(streamCtx) |
|
|
|
|
|
|
|
|
// Attempt to re-establish connection using the same logic as initial connection
|
|
|
|
|
|
err := c.attemptConnection() |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
conn.Close() |
|
|
|
|
|
streamCancel() |
|
|
|
|
|
return fmt.Errorf("failed to create stream: %w", err) |
|
|
|
|
|
|
|
|
return fmt.Errorf("failed to reconnect: %w", err) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Update client state
|
|
|
|
|
|
c.mutex.Lock() |
|
|
|
|
|
c.conn = conn |
|
|
|
|
|
c.client = client |
|
|
|
|
|
c.stream = stream |
|
|
|
|
|
c.streamCtx = streamCtx |
|
|
|
|
|
c.streamCancel = streamCancel |
|
|
|
|
|
c.connected = true |
|
|
|
|
|
c.mutex.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
// Restart stream handlers
|
|
|
|
|
|
go c.handleOutgoing() |
|
|
|
|
|
go c.handleIncoming() |
|
|
|
|
|
|
|
|
|
|
|
// Re-register worker if we have previous registration info
|
|
|
// Re-register worker if we have previous registration info
|
|
|
c.mutex.RLock() |
|
|
c.mutex.RLock() |
|
|
workerInfo := c.lastWorkerInfo |
|
|
workerInfo := c.lastWorkerInfo |
|
|
@@ -308,10 +298,9 @@ func (c *GrpcAdminClient) reconnect() error { |
|
|
|
|
|
|
|
|
if workerInfo != nil { |
|
|
if workerInfo != nil { |
|
|
glog.Infof("Re-registering worker after reconnection...") |
|
|
glog.Infof("Re-registering worker after reconnection...") |
|
|
if err := c.sendRegistration(workerInfo); err != nil { |
|
|
|
|
|
glog.Errorf("Failed to re-register worker: %v", err) |
|
|
|
|
|
|
|
|
if err := c.RegisterWorker(workerInfo); err != nil { |
|
|
|
|
|
glog.Warningf("Failed to re-register worker after reconnection: %v", err) |
|
|
// Don't fail the reconnection because of registration failure
|
|
|
// Don't fail the reconnection because of registration failure
|
|
|
// The registration will be retried on next heartbeat or operation
|
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@@ -438,6 +427,17 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.Worker) error { |
|
|
// SendHeartbeat sends heartbeat to admin server
|
|
|
// SendHeartbeat sends heartbeat to admin server
|
|
|
func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error { |
|
|
func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error { |
|
|
if !c.connected { |
|
|
if !c.connected { |
|
|
|
|
|
// If we're currently reconnecting, don't wait - just skip the heartbeat
|
|
|
|
|
|
c.mutex.RLock() |
|
|
|
|
|
reconnecting := c.reconnecting |
|
|
|
|
|
c.mutex.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
if reconnecting { |
|
|
|
|
|
// Don't treat as an error - reconnection is in progress
|
|
|
|
|
|
glog.V(2).Infof("Skipping heartbeat during reconnection") |
|
|
|
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// Wait for reconnection for a short time
|
|
|
// Wait for reconnection for a short time
|
|
|
if err := c.waitForConnection(10 * time.Second); err != nil { |
|
|
if err := c.waitForConnection(10 * time.Second); err != nil { |
|
|
return fmt.Errorf("not connected to admin server: %w", err) |
|
|
return fmt.Errorf("not connected to admin server: %w", err) |
|
|
@@ -477,6 +477,17 @@ func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerSta |
|
|
// RequestTask requests a new task from admin server
|
|
|
// RequestTask requests a new task from admin server
|
|
|
func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.Task, error) { |
|
|
func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.Task, error) { |
|
|
if !c.connected { |
|
|
if !c.connected { |
|
|
|
|
|
// If we're currently reconnecting, don't wait - just return no task
|
|
|
|
|
|
c.mutex.RLock() |
|
|
|
|
|
reconnecting := c.reconnecting |
|
|
|
|
|
c.mutex.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
if reconnecting { |
|
|
|
|
|
// Don't treat as an error - reconnection is in progress
|
|
|
|
|
|
glog.V(2).Infof("Skipping task request during reconnection") |
|
|
|
|
|
return nil, nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// Wait for reconnection for a short time
|
|
|
// Wait for reconnection for a short time
|
|
|
if err := c.waitForConnection(5 * time.Second); err != nil { |
|
|
if err := c.waitForConnection(5 * time.Second); err != nil { |
|
|
return nil, fmt.Errorf("not connected to admin server: %w", err) |
|
|
return nil, fmt.Errorf("not connected to admin server: %w", err) |
|
|
@@ -543,6 +554,17 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task |
|
|
// CompleteTask reports task completion to admin server
|
|
|
// CompleteTask reports task completion to admin server
|
|
|
func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error { |
|
|
func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error { |
|
|
if !c.connected { |
|
|
if !c.connected { |
|
|
|
|
|
// If we're currently reconnecting, don't wait - just skip the completion report
|
|
|
|
|
|
c.mutex.RLock() |
|
|
|
|
|
reconnecting := c.reconnecting |
|
|
|
|
|
c.mutex.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
if reconnecting { |
|
|
|
|
|
// Don't treat as an error - reconnection is in progress
|
|
|
|
|
|
glog.V(2).Infof("Skipping task completion report during reconnection for task %s", taskID) |
|
|
|
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// Wait for reconnection for a short time
|
|
|
// Wait for reconnection for a short time
|
|
|
if err := c.waitForConnection(5 * time.Second); err != nil { |
|
|
if err := c.waitForConnection(5 * time.Second); err != nil { |
|
|
return fmt.Errorf("not connected to admin server: %w", err) |
|
|
return fmt.Errorf("not connected to admin server: %w", err) |
|
|
@@ -574,6 +596,17 @@ func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg str |
|
|
// UpdateTaskProgress updates task progress to admin server
|
|
|
// UpdateTaskProgress updates task progress to admin server
|
|
|
func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) error { |
|
|
func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) error { |
|
|
if !c.connected { |
|
|
if !c.connected { |
|
|
|
|
|
// If we're currently reconnecting, don't wait - just skip the progress update
|
|
|
|
|
|
c.mutex.RLock() |
|
|
|
|
|
reconnecting := c.reconnecting |
|
|
|
|
|
c.mutex.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
if reconnecting { |
|
|
|
|
|
// Don't treat as an error - reconnection is in progress
|
|
|
|
|
|
glog.V(2).Infof("Skipping task progress update during reconnection for task %s", taskID) |
|
|
|
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// Wait for reconnection for a short time
|
|
|
// Wait for reconnection for a short time
|
|
|
if err := c.waitForConnection(5 * time.Second); err != nil { |
|
|
if err := c.waitForConnection(5 * time.Second); err != nil { |
|
|
return fmt.Errorf("not connected to admin server: %w", err) |
|
|
return fmt.Errorf("not connected to admin server: %w", err) |
|
|
|