fix: improve worker reconnection robustness and prevent handleOutgoing hang (#7838)

* feat: add automatic port detection and fallback for mini command

- Added port availability detection using TCP binding tests
- Implemented port fallback mechanism searching for available ports
- Support for both HTTP and gRPC port handling
- IP-aware port checking using actual service bind address
- Dual-interface verification (specific IP and wildcard 0.0.0.0)
- All services (Master, Volume, Filer, S3, WebDAV, Admin) auto-reallocate to available ports
- Enables multiple mini instances to run simultaneously without conflicts
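
As a sketch of the detection approach (the real helpers in weed/command/mini.go
take extra parameters such as a reserved-port map; see the diff below),
availability is probed by attempting an actual TCP bind:

    import (
        "net"
        "strconv"
    )

    // isPortOpenOnIP reports whether a TCP listener can currently bind ip:port.
    // Minimal sketch; the full version also cross-checks the wildcard interface.
    func isPortOpenOnIP(ip string, port int) bool {
        ln, err := net.Listen("tcp", net.JoinHostPort(ip, strconv.Itoa(port)))
        if err != nil {
            return false
        }
        ln.Close() // release immediately; the bind was only a probe
        return true
    }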

* fix: use actual bind IP for service health checks

- Previously health checks were hardcoded to localhost (127.0.0.1)
- This caused failures when services bind to actual IP (e.g., 10.21.153.8)
- Now health checks use the same IP that services are binding to
- Fixes Volume and other service health check failures on non-localhost IPs
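
A plausible sketch of the IP-aware health probe (waitForPortReady is an
illustrative name, not the actual helper; assumes the net, strconv, and time
packages are imported):

    // waitForPortReady polls bindIp:port until a TCP connection succeeds or
    // the deadline passes. Dialing the real bind IP avoids false negatives
    // when the service is not listening on 127.0.0.1.
    func waitForPortReady(bindIp string, port int, deadline time.Duration) bool {
        addr := net.JoinHostPort(bindIp, strconv.Itoa(port))
        end := time.Now().Add(deadline)
        for time.Now().Before(end) {
            if conn, err := net.DialTimeout("tcp", addr, time.Second); err == nil {
                conn.Close()
                return true
            }
            time.Sleep(200 * time.Millisecond)
        }
        return false
    }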

* refactor: improve port detection logic and remove gRPC handling duplication

- findAvailablePortOnIP now returns 0 on failure instead of an unavailable port
  Allows callers to detect when port finding fails and handle it appropriately

- Remove duplicate gRPC port handling from ensureAllPortsAvailableOnIP
  All gRPC port logic is now centralized in initializeGrpcPortsOnIP

- Log final port configuration only after all ports are finalized
  Both HTTP and gRPC ports are now correctly initialized before logging

- Add error logging when port allocation fails
  Makes debugging easier when ports can't be found
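
The zero sentinel makes failure explicit at the call site; the intended caller
pattern (mirroring the code in the diff below):

    newPort := findAvailablePortOnIP(bindIp, calculatedPort+1, 100, allocatedGrpcPorts)
    if newPort == 0 {
        // no free port in the search window: log it and let the bind fail loudly
        glog.Errorf("Could not find available gRPC port starting from %d", calculatedPort+1)
    } else {
        calculatedPort = newPort
    }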

* refactor: fix race condition and clean up port detection code

- Convert parallel HTTP port checks to sequential to prevent race conditions
  where multiple goroutines could allocate the same available port
- Remove unused 'sync' import since WaitGroup is no longer used
- Add documentation to localhost wrapper functions explaining they are
  kept for backwards compatibility and future use
- All gRPC port logic is now exclusively handled in initializeGrpcPortsOnIP
  eliminating any duplication in ensureAllPortsAvailableOnIP

* refactor: address code review comments - constants, helper function, and cleanup

- Define GrpcPortOffset constant (10000) to replace magic numbers throughout
  the code for better maintainability and consistency
- Extract bindIp determination logic into getBindIp() helper function
  to eliminate code duplication between runMini and startMiniServices
- Remove redundant 'calculatedPort = calculatedPort' assignment that had no effect
- Update all gRPC port calculations to use GrpcPortOffset constant
  (lines 489, 886 and the error logging at line 501)
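
In sketch form (the option field read by getBindIp is an assumption; the real
helper reads whatever option backs the -ip flag):

    // GrpcPortOffset is the fixed offset between a service's HTTP port and
    // its default gRPC port: gRPC = HTTP + 10000.
    const GrpcPortOffset = 10000

    // getBindIp centralizes bind-address resolution for runMini and
    // startMiniServices. miniOptions.ip is a hypothetical field name.
    func getBindIp() string {
        if *miniOptions.ip != "" {
            return *miniOptions.ip
        }
        return "127.0.0.1"
    }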

* refactor: remove unused wrapper functions and update documentation

- Remove unused localhost wrapper functions that were never called:
  - isPortOpen() - wrapper around isPortOpenOnIP with hardcoded 127.0.0.1
  - findAvailablePort() - wrapper around findAvailablePortOnIP with hardcoded 127.0.0.1
  - ensurePortAvailable() - wrapper around ensurePortAvailableOnIP with hardcoded 127.0.0.1
  - ensureAllPortsAvailable() - wrapper around ensureAllPortsAvailableOnIP with hardcoded 127.0.0.1

  Since this is new functionality with no backwards compatibility concerns,
  these wrapper functions were not needed. The comments claiming they were
  'kept for future use or backwards compatibility' are no longer valid.

- Update documentation to reference GrpcPortOffset constant instead of hardcoded 10000:
  - Update comment in ensureAllPortsAvailableOnIP to use GrpcPortOffset
  - Update admin.port.grpc flag help text to reference GrpcPortOffset

Note: getBindIp() is actually being used and should be retained (contrary to
the review comment suggesting it was unused - it's called in both runMini
and startMiniServices functions)

* refactor: prevent HTTP/gRPC port collisions and improve error handling

- Add upfront reservation of all calculated gRPC ports before allocating HTTP ports
  to prevent collisions where an HTTP port allocation could use a port that will
  later be needed for a gRPC port calculation.

  Example scenario that is now prevented:
  - Master HTTP reallocated from 9333 to 9334 (original in use)
  - Filer HTTP search finds 19334 available and assigns it
  - Master gRPC calculated as 9334 + GrpcPortOffset = 19334 → collision!

  Now: reserved gRPC ports are tracked upfront and HTTP port search skips them.

- Improve admin server gRPC port fallback error handling:
  - Change from silent V(1) verbose log to Warningf to make the error visible
  - Update comment to clarify this indicates a problem in the port initialization sequence
  - Add explanation that the fallback calculation may cause bind failure

- Update ensureAllPortsAvailableOnIP comment to clarify it avoids reserved ports
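
A sketch of the upfront reservation described above (the serviceConfigs slice
and its field names are illustrative):

    // Reserve every gRPC port implied by the current HTTP ports before any
    // HTTP fallback search runs, so the search can never claim them.
    reservedPorts := make(map[int]bool)
    for _, svc := range serviceConfigs { // hypothetical per-service config slice
        if *svc.grpcPort == 0 {
            reservedPorts[*svc.httpPort+GrpcPortOffset] = true
        } else {
            reservedPorts[*svc.grpcPort] = true
        }
    }
    // HTTP port allocation then skips anything in reservedPorts.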

* fix: enforce reserved ports in HTTP allocation and improve admin gRPC fallback

Critical fixes for port allocation safety:

1. Make findAvailablePortOnIP and ensurePortAvailableOnIP aware of reservedPorts:
   - Add reservedPorts map parameter to both functions
   - findAvailablePortOnIP now skips reserved ports when searching for alternatives
   - ensurePortAvailableOnIP passes reservedPorts through to findAvailablePortOnIP
   - This prevents HTTP ports from being allocated to ports reserved for gRPC

2. Update ensureAllPortsAvailableOnIP to pass reservedPorts:
   - Pass the reservedPorts map to ensurePortAvailableOnIP calls
   - Maintains the map updates (delete/add) for accuracy as ports change

3. Replace blind admin gRPC port fallback with proper availability checks:
   - Previous code just calculated *miniAdminOptions.port + GrpcPortOffset
   - New code checks both the calculated port and finds alternatives if needed
   - Uses the same availability checking logic as initializeGrpcPortsOnIP
   - Properly logs the fallback process and any port changes
   - Will fail gracefully if no available ports are found (consistent with other services)

These changes eliminate two critical vulnerabilities:
- HTTP port allocation can no longer accidentally claim gRPC ports
- Admin gRPC port fallback no longer blindly uses an unchecked port
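
A minimal sketch of the reserved-aware search, matching the four-argument
shape used in the diff below:

    // findAvailablePortOnIP scans up to maxAttempts ports from startPort,
    // skipping reserved ones, and returns 0 when nothing is free.
    func findAvailablePortOnIP(ip string, startPort, maxAttempts int, reserved map[int]bool) int {
        for port := startPort; port < startPort+maxAttempts; port++ {
            if reserved[port] {
                continue // never hand out a port earmarked for gRPC
            }
            if isPortOpenOnIP(ip, port) && isPortAvailable(port) {
                return port
            }
        }
        return 0
    }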

* fix: prevent gRPC port collisions during multi-service fallback allocation

Critical fix for gRPC port allocation safety across multiple services:

Problem: When multiple services need gRPC port fallback allocation in sequence
(e.g., Master gRPC unavailable → finds alternative, then Filer gRPC unavailable
→ searches from calculated port), there was no tracking of previously allocated
gRPC ports. This could allow two services to claim the same port.

Scenario that is now prevented:
- Master gRPC: calculated 19333 unavailable → finds 19334 → assigns 19334
- Filer gRPC: calculated 18888 unavailable → searches from 18889, might land on
  19334 if consecutive ports in range are unavailable (especially with custom
  port configurations or in high-port-contention environments)

Solution:
- Add allocatedGrpcPorts map to track gRPC ports allocated within the function
- Check allocatedGrpcPorts before using calculated port for each service
- Pass allocatedGrpcPorts to findAvailablePortOnIP when finding fallback ports
- Add allocatedGrpcPorts[port] = true after each successful allocation
- This ensures no two services can allocate the same gRPC port

The fix handles both:
1. Calculated gRPC ports (when grpcPort == 0)
2. Explicitly set gRPC ports (when the user provides a -service.port.grpc value)

While default port spacing makes collision unlikely, this fix is essential for:
- Custom port configurations
- High-contention environments
- Edge cases with many unavailable consecutive ports
- Correctness and safety guarantees

* feat: enforce hard-fail behavior for explicitly specified ports

When users explicitly specify a port via command-line flags (e.g., -s3.port=8333),
the server should fail immediately if the port is unavailable, rather than silently
falling back to an alternative port. This prevents user confusion and makes misconfiguration
failures obvious.

Changes:
- Modified ensurePortAvailableOnIP() to check if a port was explicitly passed via isFlagPassed()
- If an explicit port is unavailable, return error instead of silently allocating alternative
- Updated ensureAllPortsAvailableOnIP() to handle the returned error and fail startup
- Modified runMini() to check error from ensureAllPortsAvailableOnIP() and return false on failure
- Default ports (not explicitly specified) continue to fall back to available alternatives

This ensures:
- Explicit ports: fail if unavailable (e.g., -s3.port=8333 fails if 8333 is taken)
- Default ports: fall back to alternatives (e.g., s3.port without the flag falls back to 8334 if 8333 is taken)
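
isFlagPassed() follows the standard flag.Visit idiom, which only visits flags
that were actually set; a sketch:

    import "flag"

    // isFlagPassed reports whether the named flag was set on the command line.
    func isFlagPassed(name string) bool {
        passed := false
        flag.Visit(func(f *flag.Flag) {
            if f.Name == name {
                passed = true
            }
        })
        return passed
    }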

* fix: accurate error messages for explicitly specified unavailable ports

When a port is explicitly specified via CLI flags but is unavailable, the error message
now correctly reports the originally requested port instead of reporting a fallback port
that was calculated internally.

The issue was that applying the config file after CLI flag parsing caused isFlagPassed()
to return true for ports loaded from the config file (applying the config file sets those
flags, so flag.Visit() reports them as passed), incorrectly marking them as explicitly specified.

Solution: Capture which port flags were explicitly passed on the CLI BEFORE the config file
is applied, storing them in the explicitPortFlags map. This preserves the accurate
distinction between user-specified ports and defaults/config-file ports.

Example:
- User runs: weed mini -dir=. -s3.port=22
- Now correctly shows: 'port 22 for S3 (specified by flag s3.port) is not available'
- Previously incorrectly showed: 'port 8334 for S3...' (some calculated fallback)
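
A sketch of the capture-before-config step (the suffix filter is illustrative;
assumes the flag and strings packages are imported):

    // Snapshot which port flags were genuinely passed on the CLI, before the
    // config file is applied and marks more flags as set.
    explicitPortFlags := make(map[string]bool)
    flag.Visit(func(f *flag.Flag) {
        if strings.HasSuffix(f.Name, ".port") || strings.HasSuffix(f.Name, ".port.grpc") {
            explicitPortFlags[f.Name] = true
        }
    })

    // Later, when the config file is applied:
    if !explicitPortFlags["s3.port"] {
        // only a port not given on the CLI may take the config file value
    }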

* fix: respect explicitly specified ports and prevent config file override

When a port is explicitly specified via CLI flags (e.g., -s3.port=8333),
the config file options should NOT override it. Previously, config file
options would be applied if the flag value differed from default, but
this check wasn't sufficient to prevent override in all cases.

Solution: Check the explicitPortFlags map before applying any config file
port options. If a port was explicitly passed on the CLI, skip applying
the config file option for that port.

This ensures:
- Explicit ports take absolute precedence over config file ports
- Config file ports are only used if port wasn't specified on CLI
- Example: 'weed mini -s3.port=8333' will use 8333, never the config file value

* fix: don't print usage on port allocation error

When port allocation fails (e.g., an explicitly specified port is unavailable),
exit immediately without showing the usage example. This produces cleaner
error output when the failure is expected (a port conflict).

* refactor: clean up code quality issues

Remove no-op assignment (calculatedPort = calculatedPort) that had no effect.
The variable already holds the correct value when no alternative port is
found.

Improve documentation for the defensive gRPC port initialization fallback
in startAdminServer. While this code shouldn't execute in normal flow
because ensureAllPortsAvailableOnIP is called earlier in runMini, the
fallback handles edge cases where port initialization may have been skipped
or failed silently due to configuration changes or error handling paths.

* fix: improve worker reconnection robustness and prevent handleOutgoing hang

- Add dedicated streamFailed signaling channel to abort registration waits early when stream dies
- Add per-connection regWait channel to route RegistrationResponse separately from shared incoming channel, avoiding race where other consumers steal the response
- Refactor handleOutgoing() loop to use select on streamExit/errCh, ensuring old handlers exit cleanly on reconnect (prevents stale senders competing with new stream)
- Buffer msgCh to reduce shutdown edge cases
- Add cleanup of streamFailed and regWait channels on reconnect/disconnect
- Fixes registration timeout and potential stream lifecycle hangs on aggressive server max_age recycling

* fix: prevent deadlock when stream error occurs - make cmds send non-blocking

If managerLoop is blocked (e.g., waiting on regWait), a blocking send to cmds
will deadlock handleIncoming. Make the send non-blocking to prevent this.
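
The pattern in isolation:

    // Non-blocking notify: if managerLoop is busy (possibly blocked on
    // regWait), handleIncoming must not deadlock behind it.
    select {
    case cmds <- grpcCommand{action: ActionStreamError, data: err}:
        // delivered
    default:
        // manager busy; a later commit adds an async fallback so the
        // error is still delivered rather than dropped
    }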

* fix: address code review comments on mini.go port allocation

- Remove flawed fallback gRPC port initialization and convert to fatal error
  (ensures port initialization issues are caught immediately instead of silently
  failing with an empty reserved ports map)
- Extract common port validation logic to eliminate duplication between
  calculated and explicitly set gRPC port handling

* Fix critical race condition and improve error handling in worker client

- Capture channel pointers before checking for nil (prevents TOCTOU race with reconnect)
- Use async fallback goroutine for cmds send to prevent error loss when manager is busy
- Consistently close regWait channel on disconnect (matches streamFailed behavior)
- Complete cleanup of channels on failed registration
- Improve error messages for clarity (replace 'timeout' with 'failed' where appropriate)

* Add debug logging for registration response routing

Add glog.V(3) and glog.V(2) logs to track successful and dropped registration
responses in handleIncoming, helping diagnose registration issues in production.

* Update weed/worker/client.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Ensure stream errors are never lost by using async fallback

When handleIncoming detects a stream error, queue ActionStreamError to managerLoop
with a non-blocking send. If managerLoop is busy and the cmds channel is full, spawn a
goroutine to queue the error asynchronously. This ensures the manager is
always notified of stream failures, preventing the connection from remaining in an
inconsistent state (connected=true while stream is dead).

* Refactor handleOutgoing to eliminate duplicate error handling code

Extract error handling and cleanup logic into helper functions to avoid duplication
in nested select statements. This improves maintainability and reduces the risk of
inconsistencies when updating error handling logic.

* Prevent goroutine leaks by adding timeouts to blocking cmds sends

Add 2-second timeouts to both handleStreamError and the async fallback goroutine
when sending ActionStreamError to cmds channel. This prevents the handleOutgoing
and handleIncoming goroutines from blocking indefinitely if the managerLoop is
no longer receiving (e.g., during shutdown), preventing resource leaks.

* Properly close regWait channel in reconnect to prevent resource leaks

Close the regWait channel before setting it to nil in reconnect(), matching the
pattern used in handleDisconnect(). This ensures any goroutines waiting on this
channel during reconnection are properly signaled, preventing them from hanging.

* Use non-blocking async pattern in handleOutgoing error reporting

Refactor handleStreamError to use non-blocking send with async fallback goroutine,
matching the pattern used in handleIncoming. This allows handleOutgoing to exit
immediately when errors occur rather than blocking for up to 2 seconds, improving
responsiveness and consistency across handlers.

* fix: drain regWait channel before closing to prevent message loss

- Add drain loop before closing regWait in reconnect() cleanup
- Add drain loop before closing regWait in handleDisconnect() cleanup
- Ensures no pending RegistrationResponse messages are lost during channel closure

* docs: add comments explaining regWait buffered channel design

- Document that regWait buffer size 1 prevents race conditions
- Explain non-blocking send pattern between sendRegistration and handleIncoming
- Clarify timing of registration response handling in handleIncoming

* fix: improve error messages and channel handling in sendRegistration

- Clarify error message when stream fails before registration sent
- Use two-value receive form to properly detect closed channels
- Better distinguish between closed channel and nil value scenarios
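
The two-value receive distinguishes a closed channel from a delivered nil
value, as in the diff below:

    case regResp, ok := <-regWait:
        if !ok || regResp == nil {
            // closed during cleanup, or a nil response slipped through
            return fmt.Errorf("registration failed: channel closed unexpectedly")
        }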

* refactor: extract drain and close channel logic into helper function

- Create drainAndCloseRegWaitChannel() helper to eliminate code duplication
- Replace 3 copies of drain-and-close logic with single function call
- Improves maintainability and consistency across cleanup paths

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Chris Lu committed 14df5d1bb5 (pull/7852/head)

weed/command/mini.go (67 changed lines)

@@ -517,39 +517,26 @@ func initializeGrpcPortsOnIP(bindIp string) {
continue
}
// If gRPC port is 0, calculate it
// If gRPC port is 0, calculate it from HTTP port
if *config.grpcPort == 0 {
calculatedPort := *config.httpPort + GrpcPortOffset
// Check if calculated port is available (on both specific IP and all interfaces)
// Also check if it was already allocated to another service in this function
if !isPortOpenOnIP(bindIp, calculatedPort) || !isPortAvailable(calculatedPort) || allocatedGrpcPorts[calculatedPort] {
glog.Warningf("Calculated gRPC port %d for %s is not available, finding alternative...", calculatedPort, config.name)
newPort := findAvailablePortOnIP(bindIp, calculatedPort+1, 100, allocatedGrpcPorts)
if newPort == 0 {
glog.Errorf("Could not find available gRPC port for %s starting from %d, will use calculated %d and fail on binding", config.name, calculatedPort+1, calculatedPort)
} else {
calculatedPort = newPort
glog.Infof("gRPC port %d for %s is available, using it instead of calculated %d", newPort, config.name, *config.httpPort+GrpcPortOffset)
}
}
*config.grpcPort = calculatedPort
allocatedGrpcPorts[calculatedPort] = true
glog.V(1).Infof("%s gRPC port initialized to %d", config.name, calculatedPort)
} else {
// gRPC port was explicitly set, verify it's still available (check on both specific IP and all interfaces)
// Also check if it was already allocated to another service in this function
if !isPortOpenOnIP(bindIp, *config.grpcPort) || !isPortAvailable(*config.grpcPort) || allocatedGrpcPorts[*config.grpcPort] {
glog.Warningf("Explicitly set gRPC port %d for %s is not available, finding alternative...", *config.grpcPort, config.name)
newPort := findAvailablePortOnIP(bindIp, *config.grpcPort+1, 100, allocatedGrpcPorts)
if newPort == 0 {
glog.Errorf("Could not find available gRPC port for %s starting from %d, will use original %d and fail on binding", config.name, *config.grpcPort+1, *config.grpcPort)
} else {
glog.Infof("gRPC port %d for %s is available, using it instead of %d", newPort, config.name, *config.grpcPort)
*config.grpcPort = newPort
}
*config.grpcPort = *config.httpPort + GrpcPortOffset
}
// Verify the gRPC port is available (whether calculated or explicitly set)
// Check on both specific IP and all interfaces, and check against already allocated ports
if !isPortOpenOnIP(bindIp, *config.grpcPort) || !isPortAvailable(*config.grpcPort) || allocatedGrpcPorts[*config.grpcPort] {
glog.Warningf("gRPC port %d for %s is not available, finding alternative...", *config.grpcPort, config.name)
originalPort := *config.grpcPort
newPort := findAvailablePortOnIP(bindIp, originalPort+1, 100, allocatedGrpcPorts)
if newPort == 0 {
glog.Errorf("Could not find available gRPC port for %s starting from %d, will use %d and fail on binding", config.name, originalPort+1, originalPort)
} else {
glog.Infof("gRPC port %d for %s is available, using it instead of %d", newPort, config.name, originalPort)
*config.grpcPort = newPort
}
allocatedGrpcPorts[*config.grpcPort] = true
}
allocatedGrpcPorts[*config.grpcPort] = true
glog.V(1).Infof("%s gRPC port set to %d", config.name, *config.grpcPort)
}
}
@@ -934,26 +921,8 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) {
// gRPC port should have been initialized by ensureAllPortsAvailableOnIP in runMini
// If it's still 0, that indicates a problem with the port initialization sequence
// This defensive fallback handles edge cases where port initialization may have been skipped
// or failed silently (e.g., due to configuration changes or error handling paths)
if *miniAdminOptions.grpcPort == 0 {
glog.Warningf("Admin gRPC port was not initialized before startAdminServer, attempting fallback initialization...")
// Use the same availability checking logic as initializeGrpcPortsOnIP
calculatedPort := *miniAdminOptions.port + GrpcPortOffset
if !isPortOpenOnIP(getBindIp(), calculatedPort) || !isPortAvailable(calculatedPort) {
glog.Warningf("Calculated fallback gRPC port %d is not available, finding alternative...", calculatedPort)
newPort := findAvailablePortOnIP(getBindIp(), calculatedPort+1, 100, make(map[int]bool))
if newPort == 0 {
glog.Errorf("Could not find available gRPC port for Admin starting from %d, will use calculated %d and fail on binding", calculatedPort+1, calculatedPort)
*miniAdminOptions.grpcPort = calculatedPort
} else {
glog.Infof("Fallback: using gRPC port %d for Admin", newPort)
*miniAdminOptions.grpcPort = newPort
}
} else {
*miniAdminOptions.grpcPort = calculatedPort
glog.Infof("Fallback: Admin gRPC port initialized to %d", calculatedPort)
}
glog.Fatalf("Admin gRPC port was not initialized before startAdminServer. This indicates a problem with the port initialization sequence.")
}
// Create data directory if specified

weed/worker/client.go (173 changed lines)

@@ -74,6 +74,8 @@ type grpcState struct {
lastWorkerInfo *types.WorkerData
reconnectStop chan struct{}
streamExit chan struct{}
streamFailed chan struct{} // Signals when stream has failed
regWait chan *worker_pb.RegistrationResponse
}
// NewGrpcAdminClient creates a new gRPC admin client
@@ -98,6 +100,25 @@ func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.Di
return c
}
// drainAndCloseRegWaitChannel drains any pending messages from the regWait channel
// and then safely closes it. This prevents losing RegistrationResponse messages
// that were sent before the channel is closed.
func drainAndCloseRegWaitChannel(ch *chan *worker_pb.RegistrationResponse) {
if ch == nil || *ch == nil {
return
}
for {
select {
case <-*ch:
// continue draining until channel is empty
default:
close(*ch)
*ch = nil
return
}
}
}
// safeCloseChannel safely closes a channel and sets it to nil to prevent double-close panics.
// NOTE: This function is NOT thread-safe. It is safe to use in this codebase because all calls
// are serialized within the managerLoop goroutine. If this function is used in concurrent contexts
@@ -140,7 +161,14 @@ out:
req.Resp <- nil
continue
}
err := c.sendRegistration(req.Worker)
// Capture channel pointers to avoid race condition with reconnect
streamFailedCh := state.streamFailed
regWaitCh := state.regWait
if streamFailedCh == nil || regWaitCh == nil {
req.Resp <- fmt.Errorf("stream not ready for registration")
continue
}
err := c.sendRegistration(req.Worker, streamFailedCh, regWaitCh)
req.Resp <- err
case ActionQueryConnected:
respCh := cmd.data.(chan bool)
@@ -225,14 +253,18 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error {
// Start stream handlers BEFORE sending registration
// This ensures handleIncoming is ready to receive the registration response
s.streamExit = make(chan struct{})
s.streamFailed = make(chan struct{})
s.regWait = make(chan *worker_pb.RegistrationResponse, 1)
go handleOutgoing(s.stream, s.streamExit, c.outgoing, c.cmds)
go handleIncoming(c.workerID, s.stream, s.streamExit, c.incoming, c.cmds)
go handleIncoming(c.workerID, s.stream, s.streamExit, c.incoming, c.cmds, s.streamFailed, s.regWait)
// Always check for worker info and send registration immediately as the very first message
if s.lastWorkerInfo != nil {
// Send registration via the normal outgoing channel and wait for response via incoming
if err := c.sendRegistration(s.lastWorkerInfo); err != nil {
if err := c.sendRegistration(s.lastWorkerInfo, s.streamFailed, s.regWait); err != nil {
c.safeCloseChannel(&s.streamExit)
c.safeCloseChannel(&s.streamFailed)
drainAndCloseRegWaitChannel(&s.regWait)
s.streamCancel()
s.conn.Close()
s.connected = false
@@ -252,6 +284,8 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error {
func (c *GrpcAdminClient) reconnect(s *grpcState) error {
// Clean up existing connection completely
c.safeCloseChannel(&s.streamExit)
c.safeCloseChannel(&s.streamFailed)
drainAndCloseRegWaitChannel(&s.regWait)
if s.streamCancel != nil {
s.streamCancel()
}
@@ -324,32 +358,70 @@ func handleOutgoing(
streamExit <-chan struct{},
outgoing <-chan *worker_pb.WorkerMessage,
cmds chan<- grpcCommand) {
msgCh := make(chan *worker_pb.WorkerMessage)
msgCh := make(chan *worker_pb.WorkerMessage, 1)
errCh := make(chan error, 1) // Buffered to prevent blocking if the manager is busy
// Goroutine to handle blocking stream.Recv() and simultaneously handle exit
// signals
// Goroutine that reads from msgCh and performs the blocking stream.Send() calls.
go func() {
for msg := range msgCh {
if err := stream.Send(msg); err != nil {
errCh <- err
return // Exit the receiver goroutine on error/EOF
return
}
}
close(errCh)
}()
for msg := range outgoing {
select {
case msgCh <- msg:
case err := <-errCh:
// Helper function to handle stream errors and cleanup
handleStreamError := func(err error) {
if err != nil {
glog.Errorf("Failed to send message to admin: %v", err)
cmds <- grpcCommand{action: ActionStreamError, data: err}
return
select {
case cmds <- grpcCommand{action: ActionStreamError, data: err}:
// Successfully queued
default:
// Manager busy, queue asynchronously to avoid blocking
glog.V(2).Infof("Manager busy, queuing stream error asynchronously from outgoing handler: %v", err)
go func(e error) {
select {
case cmds <- grpcCommand{action: ActionStreamError, data: e}:
case <-time.After(2 * time.Second):
glog.Warningf("Failed to send stream error to manager from outgoing handler, channel blocked: %v", e)
}
}(err)
}
}
}
// Helper function to cleanup resources
cleanup := func() {
close(msgCh)
<-errCh
}
for {
select {
case <-streamExit:
close(msgCh)
<-errCh
cleanup()
return
case err := <-errCh:
handleStreamError(err)
return
case msg, ok := <-outgoing:
if !ok {
cleanup()
return
}
select {
case msgCh <- msg:
// Message queued successfully
case <-streamExit:
cleanup()
return
case err := <-errCh:
handleStreamError(err)
return
}
}
}
}
@@ -360,10 +432,15 @@ func handleIncoming(
stream worker_pb.WorkerService_WorkerStreamClient,
streamExit <-chan struct{},
incoming chan<- *worker_pb.AdminMessage,
cmds chan<- grpcCommand) {
cmds chan<- grpcCommand,
streamFailed chan<- struct{},
regWait chan<- *worker_pb.RegistrationResponse) {
glog.V(1).Infof("INCOMING HANDLER STARTED: Worker %s incoming message handler started", workerID)
msgCh := make(chan *worker_pb.AdminMessage)
errCh := make(chan error, 1) // Buffered to prevent blocking if the manager is busy
// regWait is buffered with size 1 so that the registration response can be sent
// even if the receiver goroutine has not yet started waiting on the channel.
// This non-blocking send pattern avoids a race between sendRegistration and handleIncoming.
// Goroutine to handle blocking stream.Recv() and simultaneously handle exit
// signals
go func() {
@@ -385,7 +462,19 @@ func handleIncoming(
// Message successfully received from the stream
glog.V(4).Infof("MESSAGE RECEIVED: Worker %s received message from admin server: %T", workerID, msg.Message)
// Route message to waiting goroutines or general handler (original select logic)
// If this is a registration response, also publish to the registration waiter.
// regWait is buffered (size 1) so that the response can be sent even if sendRegistration
// hasn't started waiting yet, preventing a race condition between the two goroutines.
if rr := msg.GetRegistrationResponse(); rr != nil {
select {
case regWait <- rr:
glog.V(3).Infof("REGISTRATION RESPONSE: Worker %s routed registration response to waiter", workerID)
default:
glog.V(2).Infof("REGISTRATION RESPONSE DROPPED: Worker %s registration response dropped (no waiter)", workerID)
}
}
// Route message to general handler.
select {
case incoming <- msg:
glog.V(3).Infof("MESSAGE ROUTED: Worker %s successfully routed message to handler", workerID)
@@ -401,8 +490,27 @@
glog.Errorf("RECEIVE ERROR: Worker %s failed to receive message from admin: %v", workerID, err)
}
// Report the failure as a command to the managerLoop (blocking)
cmds <- grpcCommand{action: ActionStreamError, data: err}
// Signal that stream has failed (non-blocking)
select {
case streamFailed <- struct{}{}:
default:
}
// Report the failure as a command to the managerLoop.
// Try non-blocking first; if the manager is busy and the channel is full,
// fall back to an asynchronous blocking send so the error is not lost.
select {
case cmds <- grpcCommand{action: ActionStreamError, data: err}:
default:
glog.V(2).Infof("Manager busy, queuing stream error asynchronously: %v", err)
go func(e error) {
select {
case cmds <- grpcCommand{action: ActionStreamError, data: e}:
case <-time.After(2 * time.Second):
glog.Warningf("Failed to send stream error to manager, channel blocked: %v", e)
}
}(err)
}
// Exit the main handler loop
glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler due to stream error", workerID)
@@ -460,6 +568,8 @@ func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) {
// Send shutdown signal to stop handlers loop
c.safeCloseChannel(&s.streamExit)
c.safeCloseChannel(&s.streamFailed)
drainAndCloseRegWaitChannel(&s.regWait)
// Cancel stream context
if s.streamCancel != nil {
@@ -495,7 +605,7 @@ func (c *GrpcAdminClient) RegisterWorker(worker *types.WorkerData) error {
}
// sendRegistration sends the registration message and waits for response
func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error {
func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData, streamFailed <-chan struct{}, regWait <-chan *worker_pb.RegistrationResponse) error {
capabilities := make([]string, len(worker.Capabilities))
for i, cap := range worker.Capabilities {
capabilities[i] = string(cap)
@@ -519,6 +629,8 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error {
case c.outgoing <- msg:
case <-time.After(5 * time.Second):
return fmt.Errorf("failed to send registration message: timeout")
case <-streamFailed:
return fmt.Errorf("stream failed before registration message could be sent")
}
// Wait for registration response
@@ -528,16 +640,19 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error {
for {
select {
case response := <-c.incoming:
if regResp := response.GetRegistrationResponse(); regResp != nil {
if regResp.Success {
glog.Infof("Worker registered successfully: %s", regResp.Message)
return nil
}
return fmt.Errorf("registration failed: %s", regResp.Message)
case regResp, ok := <-regWait:
if !ok || regResp == nil {
return fmt.Errorf("registration failed: channel closed unexpectedly")
}
if regResp.Success {
glog.Infof("Worker registered successfully: %s", regResp.Message)
return nil
}
return fmt.Errorf("registration failed: %s", regResp.Message)
case <-streamFailed:
return fmt.Errorf("registration failed: stream closed by server")
case <-timeout.C:
return fmt.Errorf("registration timeout")
return fmt.Errorf("registration failed: timeout waiting for response")
}
}
}
