@ -7,6 +7,8 @@ import (
"io"
"time"
"crypto/rand"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
@ -26,6 +28,9 @@ type GrpcAdminClient struct {
cmds chan grpcCommand
// Session identification for logging
sessionID string
// Reconnection parameters
maxReconnectAttempts int
reconnectBackoff time . Duration
@ -49,6 +54,7 @@ const (
ActionQueryReconnecting grpcAction = "query_reconnecting"
ActionQueryConnected grpcAction = "query_connected"
ActionQueryShouldReconnect grpcAction = "query_shouldreconnect"
ActionResetReconnectStop grpcAction = "reset_reconnect_stop"
)
type registrationRequest struct {
@ -83,9 +89,15 @@ func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.Di
// Admin uses HTTP port + 10000 as gRPC port
grpcAddress := pb . ServerToGrpcAddress ( adminAddress )
// Generate a unique session ID for logging
sessionBytes := make ( [ ] byte , 2 )
rand . Read ( sessionBytes )
sessionID := fmt . Sprintf ( "%x" , sessionBytes )
c := & GrpcAdminClient {
adminAddress : grpcAddress ,
workerID : workerID ,
sessionID : sessionID ,
dialOption : dialOption ,
maxReconnectAttempts : 0 , // 0 means infinite attempts
reconnectBackoff : 1 * time . Second ,
@ -133,8 +145,11 @@ func (c *GrpcAdminClient) safeCloseChannel(chPtr *chan struct{}) {
func ( c * GrpcAdminClient ) managerLoop ( ) {
state := & grpcState { shouldReconnect : true }
glog . V ( 1 ) . Infof ( "[session %s] Manager loop started for worker %s" , c . sessionID , c . workerID )
out :
for cmd := range c . cmds {
glog . V ( 4 ) . Infof ( "[session %s] Manager received command: %s" , c . sessionID , cmd . action )
switch cmd . action {
case ActionConnect :
c . handleConnect ( cmd , state )
@ -152,11 +167,20 @@ out:
cmd . resp <- err
case ActionStreamError :
state . connected = false
// Restart reconnection loop if needed
if state . shouldReconnect && state . reconnectStop == nil {
glog . V ( 1 ) . Infof ( "[session %s] Stream error, starting reconnection loop" , c . sessionID )
stop := make ( chan struct { } )
state . reconnectStop = stop
go c . reconnectionLoop ( stop , func ( ) {
c . cmds <- grpcCommand { action : ActionResetReconnectStop , data : state }
} )
}
case ActionRegisterWorker :
req := cmd . data . ( registrationRequest )
state . lastWorkerInfo = req . Worker
if ! state . connected {
glog . V ( 1 ) . Infof ( "Not connected yet, worker info stored for registration upon connection" )
glog . V ( 1 ) . Infof ( "[session %s] Not connected yet, worker info stored for registration upon connection" , c . sessionID )
// Respond immediately with success (registration will happen later)
req . Resp <- nil
continue
@ -179,13 +203,17 @@ out:
case ActionQueryShouldReconnect :
respCh := cmd . data . ( chan bool )
respCh <- state . shouldReconnect
case ActionResetReconnectStop :
// This is an internal action to reset the stop channel when reconnectionLoop exits
s := cmd . data . ( * grpcState )
s . reconnectStop = nil
}
}
}
// Connect establishes gRPC connection to admin server with TLS detection
func ( c * GrpcAdminClient ) Connect ( ) error {
resp := make ( chan error )
resp := make ( chan error , 1 )
c . cmds <- grpcCommand {
action : ActionConnect ,
resp : resp ,
@ -199,15 +227,24 @@ func (c *GrpcAdminClient) handleConnect(cmd grpcCommand, s *grpcState) {
return
}
// Start reconnection loop immediately (async)
stop := make ( chan struct { } )
s . reconnectStop = stop
go c . reconnectionLoop ( stop )
// Start reconnection loop immediately if not already running (async)
if s . reconnectStop == nil {
glog . V ( 1 ) . Infof ( "[session %s] Starting reconnection loop" , c . sessionID )
stop := make ( chan struct { } )
s . reconnectStop = stop
go c . reconnectionLoop ( stop , func ( ) {
// This callback is executed when the reconnectionLoop exits.
// It ensures that s.reconnectStop is reset to nil, allowing a new loop to be started later.
c . cmds <- grpcCommand { action : ActionResetReconnectStop , data : s }
} )
} else {
glog . V ( 1 ) . Infof ( "[session %s] Reconnection loop already running" , c . sessionID )
}
// Attempt the initial connection
err := c . attemptConnection ( s )
if err != nil {
glog . V ( 1 ) . Infof ( "Initial connection failed, reconnection loop will retry: %v" , err )
glog . Warningf ( "[session %s] Initial connection failed, reconnection loop will retry: %v" , c . sessionID , err )
cmd . resp <- err
return
}
@ -221,10 +258,10 @@ func (c *GrpcAdminClient) createConnection() (*grpc.ClientConn, error) {
conn , err := pb . GrpcDial ( ctx , c . adminAddress , false , c . dialOption )
if err != nil {
return nil , fmt . Errorf ( "failed to connect to admin server: %w" , err )
return nil , fmt . Errorf ( "failed to connect to admin server %s : %w" , c . adminAddress , err )
}
glog . Infof ( "Connected to admin server at %s" , c . adminAddress )
glog . Infof ( "[session %s] Connected to admin server at %s" , c . sessionID , c . adminAddress )
return conn , nil
}
@ -242,7 +279,7 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error {
// Create bidirectional stream
s . streamCtx , s . streamCancel = context . WithCancel ( context . Background ( ) )
stream , err := s . client . WorkerStream ( s . streamCtx )
glog . Infof ( "Worker stream created" )
glog . Infof ( "[session %s] Worker stream created" , c . sessionID )
if err != nil {
s . conn . Close ( )
return fmt . Errorf ( "failed to create worker stream: %w" , err )
@ -255,8 +292,8 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error {
s . streamExit = make ( chan struct { } )
s . streamFailed = make ( chan struct { } )
s . regWait = make ( chan * worker_pb . RegistrationResponse , 1 )
go handleOutgoing ( s . stream , s . streamExit , c . outgoing , c . cmds )
go handleIncoming ( c . workerID , s . stream , s . streamExit , c . incoming , c . cmds , s . streamFailed , s . regWait )
go handleOutgoing ( c . sessionID , s . stream , s . streamExit , c . outgoing , c . cmds )
go handleIncoming ( c . sessionID , c . workerID , s . stream , s . streamExit , c . incoming , c . cmds , s . streamFailed , s . regWait )
// Always check for worker info and send registration immediately as the very first message
if s . lastWorkerInfo != nil {
@ -270,18 +307,19 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error {
s . connected = false
return fmt . Errorf ( "failed to register worker: %w" , err )
}
glog . Infof ( "Worker registered successfully with admin server" )
glog . Infof ( "[session %s] Worker %s registered successfully with admin server" , c . sessionID , c . workerID )
} else {
// No worker info yet - stream will wait for registration
glog . V ( 1 ) . Infof ( "Connected to admin server, waiting for worker registration info" )
glog . V ( 1 ) . Infof ( "[session %s] Connected to admin server, waiting for worker registration info" , c . sessionID )
}
glog . Infof ( "Connected to admin server at %s " , c . adminAddress )
glog . V ( 1 ) . Infof ( "[session %s] attemptConnection successful " , c . sessionID )
return nil
}
// reconnect attempts to re-establish the connection
func ( c * GrpcAdminClient ) reconnect ( s * grpcState ) error {
glog . V ( 1 ) . Infof ( "[session %s] Reconnection attempt starting" , c . sessionID )
// Clean up existing connection completely
c . safeCloseChannel ( & s . streamExit )
c . safeCloseChannel ( & s . streamFailed )
@ -299,25 +337,31 @@ func (c *GrpcAdminClient) reconnect(s *grpcState) error {
return fmt . Errorf ( "failed to reconnect: %w" , err )
}
glog . Infof ( "[session %s] Successfully reconnected to admin server" , c . sessionID )
// Registration is now handled in attemptConnection if worker info is available
return nil
}
// reconnectionLoop handles automatic reconnection with exponential backoff
func ( c * GrpcAdminClient ) reconnectionLoop ( reconnectStop chan struct { } ) {
func ( c * GrpcAdminClient ) reconnectionLoop ( reconnectStop chan struct { } , onExit func ( ) ) {
defer onExit ( ) // Ensure the cleanup callback is called when the loop exits
backoff := c . reconnectBackoff
attempts := 0
for {
attempts ++ // Count this attempt (starts at 1)
waitDuration := backoff
if attempts == 0 {
waitDuration = time . Second
if attempts == 1 {
waitDuration = 100 * time . Millisecond // Quick retry for the very first failure
}
glog . V ( 2 ) . Infof ( "[session %s] Reconnection loop sleeping for %v before attempt %d" , c . sessionID , waitDuration , attempts )
select {
case <- reconnectStop :
glog . V ( 1 ) . Infof ( "[session %s] Reconnection loop stopping (received signal)" , c . sessionID )
return
case <- time . After ( waitDuration ) :
}
resp := make ( chan error , 1 )
c . cmds <- grpcCommand {
action : ActionReconnect ,
@ -326,19 +370,17 @@ func (c *GrpcAdminClient) reconnectionLoop(reconnectStop chan struct{}) {
err := <- resp
if err == nil {
// Successful reconnection
attempts = 0
backoff = c . reconnectBackoff
glog . Infof ( "Successfully reconnected to admin server" )
glog . Infof ( "[session %s] Successfully reconnected to admin server, stopping reconnection loop" , c . sessionID )
return // EXIT ON SUCCESS
} else if errors . Is ( err , ErrAlreadyConnected ) {
attempts = 0
backoff = c . reconnectBackoff
glog . V ( 1 ) . Infof ( "[session %s] Already connected, stopping reconnection loop" , c . sessionID )
return // EXIT ON SUCCESS (already connected)
} else {
attempts ++
glog . Errorf ( "Reconnection attempt %d failed: %v" , attempts , err )
glog . Warningf ( "[session %s] Reconnection attempt %d failed: %v" , c . sessionID , attempts , err )
// Check if we should give up
if c . maxReconnectAttempts > 0 && attempts >= c . maxReconnectAttempts {
glog . Errorf ( "Max reconnection attempts (%d) reached, giving up" , c . maxReconnectAttempts )
glog . Errorf ( "[session %s] Max reconnection attempts (%d) reached, giving up" , c . sessionID , c . maxReconnectAttempts )
return
}
@ -347,17 +389,21 @@ func (c *GrpcAdminClient) reconnectionLoop(reconnectStop chan struct{}) {
if backoff > c . maxReconnectBackoff {
backoff = c . maxReconnectBackoff
}
glog . Infof ( "Waiting %v before next reconnection attempt" , backoff )
glog . V ( 1 ) . Infof ( "[session %s] Waiting %v before next reconnection attempt" , c . sessionID , backoff )
}
}
}
// handleOutgoing processes outgoing messages to admin
func handleOutgoing (
sessionID string ,
stream worker_pb . WorkerService_WorkerStreamClient ,
streamExit <- chan struct { } ,
outgoing <- chan * worker_pb . WorkerMessage ,
cmds chan <- grpcCommand ) {
glog . V ( 1 ) . Infof ( "[session %s] Outgoing message handler started" , sessionID )
defer glog . V ( 1 ) . Infof ( "[session %s] Outgoing message handler stopping" , sessionID )
msgCh := make ( chan * worker_pb . WorkerMessage , 1 )
errCh := make ( chan error , 1 ) // Buffered to prevent blocking if the manager is busy
@ -375,18 +421,18 @@ func handleOutgoing(
// Helper function to handle stream errors and cleanup
handleStreamError := func ( err error ) {
if err != nil {
glog . Errorf ( "Failed to send message to admin: %v" , err )
glog . Warningf ( "[session %s] Stream send error: %v" , sessionID , err )
select {
case cmds <- grpcCommand { action : ActionStreamError , data : err } :
// Successfully queued
default :
// Manager busy, queue asynchronously to avoid blocking
glog . V ( 2 ) . Infof ( "Manager busy, queuing stream error asynchronously from outgoing handler: %v" , err )
glog . V ( 2 ) . Infof ( "[session %s] Manager busy, queuing stream error asynchronously from outgoing handler: %v" , sessionID , err )
go func ( e error ) {
select {
case cmds <- grpcCommand { action : ActionStreamError , data : e } :
case <- time . After ( 2 * time . Second ) :
glog . Warningf ( "Failed to send stream error to manager from outgoing handler, channel blocked: %v" , e )
glog . Warningf ( "[session %s] Failed to send stream error to manager from outgoing handler, channel blocked: %v" , sessionID , e )
}
} ( err )
}
@ -428,14 +474,17 @@ func handleOutgoing(
// handleIncoming processes incoming messages from admin
func handleIncoming (
sessionID string ,
workerID string ,
stream worker_pb . WorkerService_WorkerStreamClient ,
streamExit <- chan struct { } ,
incoming chan <- * worker_pb . AdminMessage ,
cmds chan <- grpcCommand ,
streamFailed chan <- struct { } ,
regWait chan <- * worker_pb . RegistrationResponse ) {
glog . V ( 1 ) . Infof ( "INCOMING HANDLER STARTED: Worker %s incoming message handler started" , workerID )
streamFailed chan struct { } ,
regWait chan * worker_pb . RegistrationResponse ) {
glog . V ( 1 ) . Infof ( "[session %s] Incoming message handler started" , sessionID )
defer glog . V ( 1 ) . Infof ( "[session %s] Incoming message handler stopping" , sessionID )
msgCh := make ( chan * worker_pb . AdminMessage )
errCh := make ( chan error , 1 ) // Buffered to prevent blocking if the manager is busy
// regWait is buffered with size 1 so that the registration response can be sent
@ -455,12 +504,10 @@ func handleIncoming(
} ( )
for {
glog . V ( 4 ) . Infof ( "LISTENING: Worker %s waiting for message from admin server" , workerID )
select {
case msg := <- msgCh :
// Message successfully received from the stream
glog . V ( 4 ) . Infof ( "MESSAGE RECEIVED: Worker %s received message from admin server: %T" , worker ID , msg . Message )
glog . V ( 4 ) . Infof ( "[session %s] Received message from admin server: %T" , session ID , msg . Message )
// If this is a registration response, also publish to the registration waiter.
// regWait is buffered (size 1) so that the response can be sent even if sendRegistration
@ -468,26 +515,26 @@ func handleIncoming(
if rr := msg . GetRegistrationResponse ( ) ; rr != nil {
select {
case regWait <- rr :
glog . V ( 3 ) . Infof ( "REGISTRATION RESPONSE: Worker %s routed registration response to waiter" , worker ID )
glog . V ( 3 ) . Infof ( "[session %s] Registration response routed to waiter" , session ID )
default :
glog . V ( 2 ) . Infof ( "REGISTRATION RESPONSE DROPPED: Worker %s registration response dropped (no waiter)" , worker ID )
glog . V ( 2 ) . Infof ( "[session %s] Registration response dropped (no waiter)" , session ID )
}
}
// Route message to general handler.
select {
case incoming <- msg :
glog . V ( 3 ) . Infof ( "MESSAGE ROUTED: Worker %s successfully routed message to handler" , worker ID )
glog . V ( 3 ) . Infof ( "[session %s] Message routed to incoming channel" , session ID )
case <- time . After ( time . Second ) :
glog . Warningf ( "MESSAGE DROPPED: Worker %s i ncoming message buffer full, dropping message: %T" , worker ID, msg . Message )
glog . Warningf ( "[session %s] I ncoming message buffer full, dropping message: %T" , session ID, msg . Message )
}
case err := <- errCh :
// Stream Receiver goroutine reported an error (EOF or network error)
if err == io . EOF {
glog . Infof ( "STREAM CLOSED: Worker %s admin server closed the stream" , worker ID )
glog . Infof ( "[session %s] Admin server closed the stream" , session ID )
} else {
glog . Errorf ( "RECEIVE ERROR: Worker %s failed to receive message from admin: %v" , worker ID, err )
glog . Warningf ( "[session %s] Stream receive error: %v" , session ID, err )
}
// Signal that stream has failed (non-blocking)
@ -502,23 +549,23 @@ func handleIncoming(
select {
case cmds <- grpcCommand { action : ActionStreamError , data : err } :
default :
glog . V ( 2 ) . Infof ( "Manager busy, queuing stream error asynchronously: %v" , err )
glog . V ( 2 ) . Infof ( "[session %s] Manager busy, queuing stream error asynchronously from incoming handler : %v" , sessionID , err )
go func ( e error ) {
select {
case cmds <- grpcCommand { action : ActionStreamError , data : e } :
case <- time . After ( 2 * time . Second ) :
glog . Warningf ( "Failed to send stream error to manager, channel blocked: %v" , e )
glog . Warningf ( "[session %s] Failed to send stream error to manager from incoming handler , channel blocked: %v" , sessionID , e )
}
} ( err )
}
// Exit the main handler loop
glog . V ( 1 ) . Infof ( "INCOMING HANDLER STOPPED: Worker %s stopping incoming handler due to stream error" , worker ID )
glog . V ( 1 ) . Infof ( "[session %s] Incoming message handler stopping due to stream error" , session ID )
return
case <- streamExit :
// Manager closed this channel, signaling a controlled disconnection.
glog . V ( 1 ) . Infof ( "INCOMING HANDLER STOPPED: Worker %s stopping incoming handler - received exit signal" , worker ID )
glog . V ( 1 ) . Infof ( "[session %s] Incoming message handler stopping - received exit signal" , session ID )
return
}
}