From 85369414f9caa3f02d3a4d1eb7bd1181ab46964c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 22 Dec 2025 00:10:47 -0800 Subject: [PATCH] fix: improve worker reconnection robustness and prevent handleOutgoing hang - Add dedicated streamFailed signaling channel to abort registration waits early when stream dies - Add per-connection regWait channel to route RegistrationResponse separately from shared incoming channel, avoiding race where other consumers steal the response - Refactor handleOutgoing() loop to use select on streamExit/errCh, ensuring old handlers exit cleanly on reconnect (prevents stale senders competing with new stream) - Buffer msgCh to reduce shutdown edge cases - Add cleanup of streamFailed and regWait channels on reconnect/disconnect - Fixes registration timeout and potential stream lifecycle hangs on aggressive server max_age recycling --- weed/worker/client.go | 100 ++++++++++++++++++++++++++++++++---------- 1 file changed, 76 insertions(+), 24 deletions(-) diff --git a/weed/worker/client.go b/weed/worker/client.go index d562b8703..7d5a8a8b2 100644 --- a/weed/worker/client.go +++ b/weed/worker/client.go @@ -74,6 +74,8 @@ type grpcState struct { lastWorkerInfo *types.WorkerData reconnectStop chan struct{} streamExit chan struct{} + streamFailed chan struct{} // Signals when stream has failed + regWait chan *worker_pb.RegistrationResponse } // NewGrpcAdminClient creates a new gRPC admin client @@ -140,7 +142,11 @@ out: req.Resp <- nil continue } - err := c.sendRegistration(req.Worker) + if state.streamFailed == nil || state.regWait == nil { + req.Resp <- fmt.Errorf("stream not ready for registration") + continue + } + err := c.sendRegistration(req.Worker, state.streamFailed, state.regWait) req.Resp <- err case ActionQueryConnected: respCh := cmd.data.(chan bool) @@ -225,13 +231,15 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error { // Start stream handlers BEFORE sending registration // This ensures handleIncoming is ready to receive the registration response s.streamExit = make(chan struct{}) + s.streamFailed = make(chan struct{}) + s.regWait = make(chan *worker_pb.RegistrationResponse, 1) go handleOutgoing(s.stream, s.streamExit, c.outgoing, c.cmds) - go handleIncoming(c.workerID, s.stream, s.streamExit, c.incoming, c.cmds) + go handleIncoming(c.workerID, s.stream, s.streamExit, c.incoming, c.cmds, s.streamFailed, s.regWait) // Always check for worker info and send registration immediately as the very first message if s.lastWorkerInfo != nil { // Send registration via the normal outgoing channel and wait for response via incoming - if err := c.sendRegistration(s.lastWorkerInfo); err != nil { + if err := c.sendRegistration(s.lastWorkerInfo, s.streamFailed, s.regWait); err != nil { c.safeCloseChannel(&s.streamExit) s.streamCancel() s.conn.Close() @@ -252,6 +260,8 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error { func (c *GrpcAdminClient) reconnect(s *grpcState) error { // Clean up existing connection completely c.safeCloseChannel(&s.streamExit) + c.safeCloseChannel(&s.streamFailed) + s.regWait = nil if s.streamCancel != nil { s.streamCancel() } @@ -324,32 +334,51 @@ func handleOutgoing( streamExit <-chan struct{}, outgoing <-chan *worker_pb.WorkerMessage, cmds chan<- grpcCommand) { - - msgCh := make(chan *worker_pb.WorkerMessage) + msgCh := make(chan *worker_pb.WorkerMessage, 1) errCh := make(chan error, 1) // Buffered to prevent blocking if the manager is busy - // Goroutine to handle blocking stream.Recv() and simultaneously handle exit - // signals + + // Goroutine that performs the blocking stream.Send() calls. go func() { for msg := range msgCh { if err := stream.Send(msg); err != nil { errCh <- err - return // Exit the receiver goroutine on error/EOF + return } } close(errCh) }() - for msg := range outgoing { + for { select { - case msgCh <- msg: - case err := <-errCh: - glog.Errorf("Failed to send message to admin: %v", err) - cmds <- grpcCommand{action: ActionStreamError, data: err} - return case <-streamExit: close(msgCh) <-errCh return + case err := <-errCh: + if err != nil { + glog.Errorf("Failed to send message to admin: %v", err) + cmds <- grpcCommand{action: ActionStreamError, data: err} + } + return + case msg, ok := <-outgoing: + if !ok { + close(msgCh) + <-errCh + return + } + select { + case msgCh <- msg: + case <-streamExit: + close(msgCh) + <-errCh + return + case err := <-errCh: + if err != nil { + glog.Errorf("Failed to send message to admin: %v", err) + cmds <- grpcCommand{action: ActionStreamError, data: err} + } + return + } } } } @@ -360,7 +389,9 @@ func handleIncoming( stream worker_pb.WorkerService_WorkerStreamClient, streamExit <-chan struct{}, incoming chan<- *worker_pb.AdminMessage, - cmds chan<- grpcCommand) { + cmds chan<- grpcCommand, + streamFailed chan<- struct{}, + regWait chan<- *worker_pb.RegistrationResponse) { glog.V(1).Infof("INCOMING HANDLER STARTED: Worker %s incoming message handler started", workerID) msgCh := make(chan *worker_pb.AdminMessage) errCh := make(chan error, 1) // Buffered to prevent blocking if the manager is busy @@ -385,7 +416,15 @@ func handleIncoming( // Message successfully received from the stream glog.V(4).Infof("MESSAGE RECEIVED: Worker %s received message from admin server: %T", workerID, msg.Message) - // Route message to waiting goroutines or general handler (original select logic) + // If this is a registration response, also publish to the registration waiter. + if rr := msg.GetRegistrationResponse(); rr != nil { + select { + case regWait <- rr: + default: + } + } + + // Route message to general handler. select { case incoming <- msg: glog.V(3).Infof("MESSAGE ROUTED: Worker %s successfully routed message to handler", workerID) @@ -401,6 +440,12 @@ func handleIncoming( glog.Errorf("RECEIVE ERROR: Worker %s failed to receive message from admin: %v", workerID, err) } + // Signal that stream has failed (non-blocking) + select { + case streamFailed <- struct{}{}: + default: + } + // Report the failure as a command to the managerLoop (blocking) cmds <- grpcCommand{action: ActionStreamError, data: err} @@ -460,6 +505,8 @@ func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) { // Send shutdown signal to stop handlers loop c.safeCloseChannel(&s.streamExit) + c.safeCloseChannel(&s.streamFailed) + s.regWait = nil // Cancel stream context if s.streamCancel != nil { @@ -495,7 +542,7 @@ func (c *GrpcAdminClient) RegisterWorker(worker *types.WorkerData) error { } // sendRegistration sends the registration message and waits for response -func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error { +func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData, streamFailed <-chan struct{}, regWait <-chan *worker_pb.RegistrationResponse) error { capabilities := make([]string, len(worker.Capabilities)) for i, cap := range worker.Capabilities { capabilities[i] = string(cap) @@ -519,6 +566,8 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error { case c.outgoing <- msg: case <-time.After(5 * time.Second): return fmt.Errorf("failed to send registration message: timeout") + case <-streamFailed: + return fmt.Errorf("stream failed while sending registration") } // Wait for registration response @@ -528,14 +577,17 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error { for { select { - case response := <-c.incoming: - if regResp := response.GetRegistrationResponse(); regResp != nil { - if regResp.Success { - glog.Infof("Worker registered successfully: %s", regResp.Message) - return nil - } - return fmt.Errorf("registration failed: %s", regResp.Message) + case regResp := <-regWait: + if regResp == nil { + return fmt.Errorf("registration timeout: registration channel closed") + } + if regResp.Success { + glog.Infof("Worker registered successfully: %s", regResp.Message) + return nil } + return fmt.Errorf("registration failed: %s", regResp.Message) + case <-streamFailed: + return fmt.Errorf("registration timeout: stream closed by server") case <-timeout.C: return fmt.Errorf("registration timeout") }