Browse Source

Fix critical race condition and improve error handling in worker client

- Capture channel pointers before checking for nil (prevents TOCTOU race with reconnect)
- Use async fallback goroutine for cmds send to prevent error loss when manager is busy
- Consistently close regWait channel on disconnect (matches streamFailed behavior)
- Complete cleanup of channels on failed registration
- Improve error messages for clarity (replace 'timeout' with 'failed' where appropriate)
pull/7838/head
Chris Lu 2 months ago
parent
commit
5e258a5ce7
  1. 23
      weed/worker/client.go

23
weed/worker/client.go

@ -142,11 +142,14 @@ out:
req.Resp <- nil req.Resp <- nil
continue continue
} }
if state.streamFailed == nil || state.regWait == nil {
// Capture channel pointers to avoid race condition with reconnect
streamFailedCh := state.streamFailed
regWaitCh := state.regWait
if streamFailedCh == nil || regWaitCh == nil {
req.Resp <- fmt.Errorf("stream not ready for registration") req.Resp <- fmt.Errorf("stream not ready for registration")
continue continue
} }
err := c.sendRegistration(req.Worker, state.streamFailed, state.regWait)
err := c.sendRegistration(req.Worker, streamFailedCh, regWaitCh)
req.Resp <- err req.Resp <- err
case ActionQueryConnected: case ActionQueryConnected:
respCh := cmd.data.(chan bool) respCh := cmd.data.(chan bool)
@ -241,6 +244,11 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error {
// Send registration via the normal outgoing channel and wait for response via incoming // Send registration via the normal outgoing channel and wait for response via incoming
if err := c.sendRegistration(s.lastWorkerInfo, s.streamFailed, s.regWait); err != nil { if err := c.sendRegistration(s.lastWorkerInfo, s.streamFailed, s.regWait); err != nil {
c.safeCloseChannel(&s.streamExit) c.safeCloseChannel(&s.streamExit)
c.safeCloseChannel(&s.streamFailed)
if s.regWait != nil {
close(s.regWait)
s.regWait = nil
}
s.streamCancel() s.streamCancel()
s.conn.Close() s.conn.Close()
s.connected = false s.connected = false
@ -510,7 +518,10 @@ func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) {
// Send shutdown signal to stop handlers loop // Send shutdown signal to stop handlers loop
c.safeCloseChannel(&s.streamExit) c.safeCloseChannel(&s.streamExit)
c.safeCloseChannel(&s.streamFailed) c.safeCloseChannel(&s.streamFailed)
s.regWait = nil
if s.regWait != nil {
close(s.regWait)
s.regWait = nil
}
// Cancel stream context // Cancel stream context
if s.streamCancel != nil { if s.streamCancel != nil {
@ -583,7 +594,7 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData, streamFaile
select { select {
case regResp := <-regWait: case regResp := <-regWait:
if regResp == nil { if regResp == nil {
return fmt.Errorf("registration timeout: registration channel closed")
return fmt.Errorf("registration failed: channel closed unexpectedly")
} }
if regResp.Success { if regResp.Success {
glog.Infof("Worker registered successfully: %s", regResp.Message) glog.Infof("Worker registered successfully: %s", regResp.Message)
@ -591,9 +602,9 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData, streamFaile
} }
return fmt.Errorf("registration failed: %s", regResp.Message) return fmt.Errorf("registration failed: %s", regResp.Message)
case <-streamFailed: case <-streamFailed:
return fmt.Errorf("registration timeout: stream closed by server")
return fmt.Errorf("registration failed: stream closed by server")
case <-timeout.C: case <-timeout.C:
return fmt.Errorf("registration timeout")
return fmt.Errorf("registration failed: timeout waiting for response")
} }
} }
} }

Loading…
Cancel
Save