@ -74,6 +74,8 @@ type grpcState struct {
lastWorkerInfo * types . WorkerData
reconnectStop chan struct { }
streamExit chan struct { }
streamFailed chan struct { } // Signals when stream has failed
regWait chan * worker_pb . RegistrationResponse
}
// NewGrpcAdminClient creates a new gRPC admin client
@ -140,7 +142,11 @@ out:
req . Resp <- nil
continue
}
err := c . sendRegistration ( req . Worker )
if state . streamFailed == nil || state . regWait == nil {
req . Resp <- fmt . Errorf ( "stream not ready for registration" )
continue
}
err := c . sendRegistration ( req . Worker , state . streamFailed , state . regWait )
req . Resp <- err
case ActionQueryConnected :
respCh := cmd . data . ( chan bool )
@ -225,13 +231,15 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error {
// Start stream handlers BEFORE sending registration
// This ensures handleIncoming is ready to receive the registration response
s . streamExit = make ( chan struct { } )
s . streamFailed = make ( chan struct { } )
s . regWait = make ( chan * worker_pb . RegistrationResponse , 1 )
go handleOutgoing ( s . stream , s . streamExit , c . outgoing , c . cmds )
go handleIncoming ( c . workerID , s . stream , s . streamExit , c . incoming , c . cmds )
go handleIncoming ( c . workerID , s . stream , s . streamExit , c . incoming , c . cmds , s . streamFailed , s . regWait )
// Always check for worker info and send registration immediately as the very first message
if s . lastWorkerInfo != nil {
// Send registration via the normal outgoing channel and wait for response via incoming
if err := c . sendRegistration ( s . lastWorkerInfo ) ; err != nil {
if err := c . sendRegistration ( s . lastWorkerInfo , s . streamFailed , s . regWait ) ; err != nil {
c . safeCloseChannel ( & s . streamExit )
s . streamCancel ( )
s . conn . Close ( )
@ -252,6 +260,8 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error {
func ( c * GrpcAdminClient ) reconnect ( s * grpcState ) error {
// Clean up existing connection completely
c . safeCloseChannel ( & s . streamExit )
c . safeCloseChannel ( & s . streamFailed )
s . regWait = nil
if s . streamCancel != nil {
s . streamCancel ( )
}
@ -324,32 +334,51 @@ func handleOutgoing(
streamExit <- chan struct { } ,
outgoing <- chan * worker_pb . WorkerMessage ,
cmds chan <- grpcCommand ) {
msgCh := make ( chan * worker_pb . WorkerMessage )
msgCh := make ( chan * worker_pb . WorkerMessage , 1 )
errCh := make ( chan error , 1 ) // Buffered to prevent blocking if the manager is busy
// Goroutine to handle blocking stream.Recv() and simultaneously handle exit
// signals
// Goroutine that performs the blocking stream.Send() calls.
go func ( ) {
for msg := range msgCh {
if err := stream . Send ( msg ) ; err != nil {
errCh <- err
return // Exit the receiver goroutine on error/EOF
return
}
}
close ( errCh )
} ( )
for msg := range outgoing {
for {
select {
case msgCh <- msg :
case err := <- errCh :
glog . Errorf ( "Failed to send message to admin: %v" , err )
cmds <- grpcCommand { action : ActionStreamError , data : err }
return
case <- streamExit :
close ( msgCh )
<- errCh
return
case err := <- errCh :
if err != nil {
glog . Errorf ( "Failed to send message to admin: %v" , err )
cmds <- grpcCommand { action : ActionStreamError , data : err }
}
return
case msg , ok := <- outgoing :
if ! ok {
close ( msgCh )
<- errCh
return
}
select {
case msgCh <- msg :
case <- streamExit :
close ( msgCh )
<- errCh
return
case err := <- errCh :
if err != nil {
glog . Errorf ( "Failed to send message to admin: %v" , err )
cmds <- grpcCommand { action : ActionStreamError , data : err }
}
return
}
}
}
}
@ -360,7 +389,9 @@ func handleIncoming(
stream worker_pb . WorkerService_WorkerStreamClient ,
streamExit <- chan struct { } ,
incoming chan <- * worker_pb . AdminMessage ,
cmds chan <- grpcCommand ) {
cmds chan <- grpcCommand ,
streamFailed chan <- struct { } ,
regWait chan <- * worker_pb . RegistrationResponse ) {
glog . V ( 1 ) . Infof ( "INCOMING HANDLER STARTED: Worker %s incoming message handler started" , workerID )
msgCh := make ( chan * worker_pb . AdminMessage )
errCh := make ( chan error , 1 ) // Buffered to prevent blocking if the manager is busy
@ -385,7 +416,15 @@ func handleIncoming(
// Message successfully received from the stream
glog . V ( 4 ) . Infof ( "MESSAGE RECEIVED: Worker %s received message from admin server: %T" , workerID , msg . Message )
// Route message to waiting goroutines or general handler (original select logic)
// If this is a registration response, also publish to the registration waiter.
if rr := msg . GetRegistrationResponse ( ) ; rr != nil {
select {
case regWait <- rr :
default :
}
}
// Route message to general handler.
select {
case incoming <- msg :
glog . V ( 3 ) . Infof ( "MESSAGE ROUTED: Worker %s successfully routed message to handler" , workerID )
@ -401,6 +440,12 @@ func handleIncoming(
glog . Errorf ( "RECEIVE ERROR: Worker %s failed to receive message from admin: %v" , workerID , err )
}
// Signal that stream has failed (non-blocking)
select {
case streamFailed <- struct { } { } :
default :
}
// Report the failure as a command to the managerLoop (blocking)
cmds <- grpcCommand { action : ActionStreamError , data : err }
@ -460,6 +505,8 @@ func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) {
// Send shutdown signal to stop handlers loop
c . safeCloseChannel ( & s . streamExit )
c . safeCloseChannel ( & s . streamFailed )
s . regWait = nil
// Cancel stream context
if s . streamCancel != nil {
@ -495,7 +542,7 @@ func (c *GrpcAdminClient) RegisterWorker(worker *types.WorkerData) error {
}
// sendRegistration sends the registration message and waits for response
func ( c * GrpcAdminClient ) sendRegistration ( worker * types . WorkerData ) error {
func ( c * GrpcAdminClient ) sendRegistration ( worker * types . WorkerData , streamFailed <- chan struct { } , regWait <- chan * worker_pb . RegistrationResponse ) error {
capabilities := make ( [ ] string , len ( worker . Capabilities ) )
for i , cap := range worker . Capabilities {
capabilities [ i ] = string ( cap )
@ -519,6 +566,8 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error {
case c . outgoing <- msg :
case <- time . After ( 5 * time . Second ) :
return fmt . Errorf ( "failed to send registration message: timeout" )
case <- streamFailed :
return fmt . Errorf ( "stream failed while sending registration" )
}
// Wait for registration response
@ -528,14 +577,17 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error {
for {
select {
case response := <- c . incoming :
if regResp : = response . GetRegistrationResponse ( ) ; regResp ! = nil {
if regResp . Success {
glog . Infof ( "Worker registered successfully: %s" , regResp . Message )
return nil
}
return fmt . Errorf ( "registration failed: %s" , regResp . Message )
case regRe sp := <- regWait :
if regResp == nil {
return fmt . Errorf ( "registration timeout: registration channel closed" )
}
if regResp . Success {
glog . Infof ( "Worker registered successfully: %s" , regResp . Message )
return nil
}
return fmt . Errorf ( "registration failed: %s" , regResp . Message )
case <- streamFailed :
return fmt . Errorf ( "registration timeout: stream closed by server" )
case <- timeout . C :
return fmt . Errorf ( "registration timeout" )
}