
Fix worker reconnection race condition causing context canceled errors (#7825)

* Fix worker reconnection race condition causing context canceled errors

Fixes #7824

This commit fixes critical connection stability issues between the admin server
and workers that manifested as rapid reconnection cycles with 'context canceled'
errors, particularly after 24+ hours of operation in containerized environments.

Root Cause:
-----------
Race condition where TWO goroutines were calling stream.Recv() on the same
gRPC bidirectional stream concurrently:

1. sendRegistrationSync() started a goroutine that called stream.Recv()
2. handleIncoming() also calls stream.Recv() in a loop

Per the gRPC specification, only ONE goroutine can call Recv() on a stream at a
time. Concurrent Recv() calls cause undefined behavior, manifesting as
'context canceled' errors and stream corruption.

The race occurred during worker reconnection:
- Sometimes the sendRegistrationSync goroutine read the registration response first (success)
- Sometimes handleIncoming read it first, causing sendRegistrationSync to time out
- This left the stream in an inconsistent state, triggering a 'context canceled' error
- The error triggered rapid reconnection attempts, creating a reconnection storm

Why it happened after 24 hours:
Container orchestration systems (Docker Swarm/Kubernetes) periodically restart
pods. Over time, workers reconnect multiple times. Each reconnection had a chance
of hitting the race condition. Eventually the race manifested and caused the
connection storm.
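
For illustration, here is a minimal sketch of the single-reader pattern the fix
enforces. It is not the actual SeaweedFS code: the sketch package, message type,
and bidiStream interface are invented stand-ins for the generated worker_pb
stream client; only the handler ordering mirrors weed/worker/client.go.

package sketch

// message and bidiStream are stand-ins for the generated worker_pb types.
type message struct {
    payload string
}

type bidiStream interface {
    Send(*message) error
    Recv() (*message, error)
}

// startStreamHandlers starts the sole writer (handleOutgoing) and the sole
// reader (handleIncoming) BEFORE any registration message is queued, so no
// other goroutine ever calls Recv() on the stream.
func startStreamHandlers(stream bidiStream, outgoing <-chan *message, incoming chan<- *message, exit <-chan struct{}) {
    go func() { // handleOutgoing: the only caller of stream.Send()
        for {
            select {
            case <-exit:
                return
            case msg, ok := <-outgoing:
                if !ok || stream.Send(msg) != nil {
                    return
                }
            }
        }
    }()
    go func() { // handleIncoming: the only caller of stream.Recv()
        for {
            msg, err := stream.Recv()
            if err != nil {
                return // e.g. context canceled or EOF; reconnect logic takes over
            }
            select {
            case incoming <- msg:
            case <-exit:
                return
            }
        }
    }()
}

With this ordering, registration is queued on the outgoing channel like any other
message and its response arrives through the incoming channel, so nothing else
ever competes with handleIncoming for stream.Recv().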

Changes:
--------

weed/worker/client.go:
- Start handleIncoming and handleOutgoing goroutines BEFORE sending registration
- Use sendRegistration() instead of sendRegistrationSync()
- Ensures only ONE goroutine (handleIncoming) calls stream.Recv()
- Eliminates race condition entirely

weed/admin/dash/worker_grpc_server.go:
- Clean up old connection when worker reconnects with same ID
- Cancel old connection context to stop its goroutines
- Prevents resource leaks and stale connection accumulation

Impact:
-------
Before: Random 'context canceled' errors during reconnection, rapid reconnection
        cycles, resource leaks, manual restart required to recover
After:  Reliable reconnection, single Recv() goroutine, proper cleanup,
        stable operation over 24+ hours

Testing:
--------
Build verified successfully with no compilation errors.

How to reproduce the bug:
1. Start admin server and worker
2. Restart admin server (simulates container recreation)
3. Worker reconnects
4. The race condition may manifest, causing a 'context canceled' error
5. Observe rapid reconnection cycles in logs

The fix is backward compatible and requires no configuration changes.

* Add MaxConnectionAge to gRPC server for Docker Swarm DNS handling

- Configure MaxConnectionAge and MaxConnectionAgeGrace for gRPC server
- Expand error detection in shouldInvalidateConnection for better cache invalidation
- Add connection lifecycle logging for debugging
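
As a rough sketch of the resulting server configuration (values mirror the
GrpcMaxConnectionAge and GrpcMaxConnectionAgeGrace constants added in
weed/pb/grpc_client_server.go below; the helper name here is illustrative):
once a connection reaches MaxConnectionAge the server sends GOAWAY, in-flight
RPCs get MaxConnectionAgeGrace to finish, and the reconnecting client
re-resolves DNS, so stale entries after a Docker Swarm service restart are
picked up.

package main

import (
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/keepalive"
)

// newGrpcServer builds a gRPC server whose connections are recycled
// periodically so that clients reconnect and re-resolve DNS.
func newGrpcServer() *grpc.Server {
    return grpc.NewServer(
        grpc.KeepaliveParams(keepalive.ServerParameters{
            Time:                  60 * time.Second, // ping interval when no activity
            Timeout:               20 * time.Second, // ping timeout
            MaxConnectionAge:      5 * time.Minute,  // force periodic connection recycling
            MaxConnectionAgeGrace: 30 * time.Second, // grace period for in-flight RPCs
        }),
    )
}

func main() {
    _ = newGrpcServer()
}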

* Add topology validation and nil-safety checks

- Add validation guards in UpdateTopology to prevent invalid updates
- Add nil-safety checks in rebuildIndexes
- Add GetDiskCount method for diagnostic purposes

* Fix worker registration race condition

- Reorder goroutine startup in WorkerStream to prevent race conditions
- Add defensive cleanup in unregisterWorker with panic-safe channel closing

* Add comprehensive topology update logging

- Enhance UpdateTopologyInfo with detailed logging of datacenter/node/disk counts
- Add metrics logging for topology changes

* Add periodic diagnostic status logging

- Implement topologyStatusLoop running every 5 minutes
- Add logTopologyStatus function reporting system metrics
- Run as background goroutine in maintenance manager

* Enhance master client connection logging

- Add connection timing logs in tryConnectToMaster
- Add reconnection attempt counting in KeepConnectedToMaster
- Improve diagnostic visibility for connection issues

* Remove unused sendRegistrationSync function

- Function is no longer called after switching to asynchronous sendRegistration
- Contained the problematic concurrent stream.Recv() pattern that caused the race condition
- Cleanup as suggested in PR review

* Clarify comment for channel closing during disconnection

- Improve comment to explain why channels are closed and their effect
- Make the code more self-documenting as suggested in PR review

* Address code review feedback: refactor and improvements

- Extract topology counting logic to shared helper function
  CountTopologyResources() to eliminate duplication between
  topology_management.go and maintenance_integration.go

- Use gRPC status codes for more robust error detection in
  shouldInvalidateConnection(), falling back to string matching
  for transport-level errors

- Add recover wrapper for channel close consistency in
  cleanupStaleConnections() to match unregisterWorker() pattern

* Update grpc_client_server.go

* Fix data race on lastSeen field access

- Add mutex protection around conn.lastSeen = time.Now() in WorkerStream method
- Ensures thread-safe access consistent with cleanupStaleConnections

* Fix goroutine leaks in worker reconnection logic

- Close streamExit in reconnect() before creating new connection
- Close streamExit in attemptConnection() when sendRegistration fails
- Prevents orphaned handleOutgoing/handleIncoming goroutines from previous connections
- Ensures proper cleanup of goroutines competing for shared outgoing channel

* Minor cleanup improvements for consistency and clarity

- Remove redundant string checks in shouldInvalidateConnection that overlap with gRPC status codes
- Add recover block to Stop() method for consistency with other channel close operations
- Maintains valuable DNS and transport-specific error detection while eliminating redundancy

* Improve topology update error handling

- Return descriptive errors instead of silently preserving topology for invalid updates
- Change nil topologyInfo case to return 'rejected invalid topology update: nil topologyInfo'
- Change empty DataCenterInfos case to return 'rejected invalid topology update: empty DataCenterInfos (had X nodes, Y disks)'
- Keep existing glog.Warningf calls but append error details to logs before returning errors
- Allows callers to distinguish rejected updates and handle them appropriately

* Refactor safe channel closing into helper method

- Add safeCloseOutgoingChannel helper method to eliminate code duplication
- Replace repeated recover blocks in Stop, unregisterWorker, and cleanupStaleConnections
- Improves maintainability and ensures consistent error handling across all channel close operations
- Maintains same panic recovery behavior with contextual source identification

* Make connection invalidation string matching case-insensitive

- Convert error string to lowercase once for all string.Contains checks
- Improves robustness by catching error message variations from different sources
- Eliminates need for separate 'DNS resolution' and 'dns' checks
- Maintains same error detection coverage with better reliability

* Clean up warning logs in UpdateTopology to avoid duplicating error text

- Remove duplicated error phrases from glog.Warningf messages
- Keep concise contextual warnings that don't repeat the fmt.Errorf content
- Maintain same error returns for backward compatibility

* Add robust validation to prevent topology wipeout during master restart

- Reject topology updates with 0 nodes when current topology has nodes
- Prevents transient empty topology from overwriting valid state
- Improves resilience during master restart scenarios
- Maintains backward compatibility for legitimate empty topology updates
Chris Lu (commit 5b86d33c3c)
Changed files (lines changed):
- weed/admin/dash/worker_grpc_server.go (54)
- weed/admin/maintenance/maintenance_integration.go (21)
- weed/admin/maintenance/maintenance_manager.go (49)
- weed/admin/topology/topology_management.go (55)
- weed/pb/grpc_client_server.go (48)
- weed/wdclient/masterclient.go (12)
- weed/worker/client.go (93)

weed/admin/dash/worker_grpc_server.go

@@ -117,7 +117,7 @@ func (s *WorkerGrpcServer) Stop() error {
s.connMutex.Lock()
for _, conn := range s.connections {
conn.cancel()
close(conn.outgoing)
s.safeCloseOutgoingChannel(conn, "Stop")
}
s.connections = make(map[string]*WorkerConnection)
s.connMutex.Unlock()
@@ -182,15 +182,27 @@ func (s *WorkerGrpcServer) WorkerStream(stream worker_pb.WorkerService_WorkerStr
}
conn.capabilities = capabilities
// Register connection
// Register connection - clean up old connection if worker is reconnecting
s.connMutex.Lock()
if oldConn, exists := s.connections[workerID]; exists {
glog.Infof("Worker %s reconnected, cleaning up old connection", workerID)
// Cancel old connection to stop its goroutines
oldConn.cancel()
// Don't close oldConn.outgoing here as it may cause panic in handleOutgoingMessages
// Let the goroutine exit naturally when it detects context cancellation
}
s.connections[workerID] = conn
s.connMutex.Unlock()
// Register worker with maintenance manager
s.registerWorkerWithManager(conn)
// Send registration response
// IMPORTANT: Start outgoing message handler BEFORE sending registration response
// This ensures the handler is ready to process messages and prevents race conditions
// where the worker might send requests before we're ready to respond
go s.handleOutgoingMessages(conn)
// Send registration response (after handler is started)
regResponse := &worker_pb.AdminMessage{
Timestamp: time.Now().Unix(),
Message: &worker_pb.AdminMessage_RegistrationResponse{
@@ -203,13 +215,11 @@ func (s *WorkerGrpcServer) WorkerStream(stream worker_pb.WorkerService_WorkerStr
select {
case conn.outgoing <- regResponse:
glog.V(1).Infof("Registration response sent to worker %s", workerID)
case <-time.After(5 * time.Second):
glog.Errorf("Failed to send registration response to worker %s", workerID)
}
// Start outgoing message handler
go s.handleOutgoingMessages(conn)
// Handle incoming messages
for {
select {
@@ -235,7 +245,9 @@ func (s *WorkerGrpcServer) WorkerStream(stream worker_pb.WorkerService_WorkerStr
return err
}
s.connMutex.Lock()
conn.lastSeen = time.Now()
s.connMutex.Unlock()
s.handleWorkerMessage(conn, msg)
}
}
@@ -440,16 +452,36 @@ func (s *WorkerGrpcServer) handleTaskLogResponse(conn *WorkerConnection, respons
s.logRequestsMutex.Unlock()
}
// safeCloseOutgoingChannel safely closes the outgoing channel for a worker connection.
func (s *WorkerGrpcServer) safeCloseOutgoingChannel(conn *WorkerConnection, source string) {
defer func() {
if r := recover(); r != nil {
glog.V(1).Infof("%s: recovered from panic closing outgoing channel for worker %s: %v", source, conn.workerID, r)
}
}()
close(conn.outgoing)
}
// unregisterWorker removes a worker connection
func (s *WorkerGrpcServer) unregisterWorker(workerID string) {
s.connMutex.Lock()
if conn, exists := s.connections[workerID]; exists {
conn.cancel()
close(conn.outgoing)
delete(s.connections, workerID)
conn, exists := s.connections[workerID]
if !exists {
s.connMutex.Unlock()
glog.V(2).Infof("unregisterWorker: worker %s not found in connections map (already unregistered)", workerID)
return
}
// Remove from map first to prevent duplicate cleanup attempts
delete(s.connections, workerID)
s.connMutex.Unlock()
// Cancel context to signal goroutines to stop
conn.cancel()
// Safely close the outgoing channel with recover to handle potential double-close
s.safeCloseOutgoingChannel(conn, "unregisterWorker")
glog.V(1).Infof("Unregistered worker %s", workerID)
}
@@ -479,7 +511,7 @@ func (s *WorkerGrpcServer) cleanupStaleConnections() {
if conn.lastSeen.Before(cutoff) {
glog.Warningf("Cleaning up stale worker connection: %s", workerID)
conn.cancel()
close(conn.outgoing)
s.safeCloseOutgoingChannel(conn, "cleanupStaleConnections")
delete(s.connections, workerID)
}
}

weed/admin/maintenance/maintenance_integration.go

@@ -266,7 +266,26 @@ func (s *MaintenanceIntegration) ScanWithTaskDetectors(volumeMetrics []*types.Vo
// UpdateTopologyInfo updates the volume shard tracker with topology information for empty servers
func (s *MaintenanceIntegration) UpdateTopologyInfo(topologyInfo *master_pb.TopologyInfo) error {
return s.activeTopology.UpdateTopology(topologyInfo)
// Log topology details before update for diagnostics
if topologyInfo != nil {
dcCount, nodeCount, diskCount := topology.CountTopologyResources(topologyInfo)
glog.V(2).Infof("UpdateTopologyInfo: received topology with %d datacenters, %d nodes, %d disks",
dcCount, nodeCount, diskCount)
} else {
glog.Warningf("UpdateTopologyInfo: received nil topologyInfo")
}
err := s.activeTopology.UpdateTopology(topologyInfo)
if err != nil {
glog.Errorf("UpdateTopologyInfo: topology update failed: %v", err)
} else {
// Log success with current disk count
currentDiskCount := s.activeTopology.GetDiskCount()
glog.V(1).Infof("UpdateTopologyInfo: topology update successful, active topology now has %d disks", currentDiskCount)
}
return err
}
// convertToExistingFormat converts task results to existing system format using dynamic mapping

weed/admin/maintenance/maintenance_manager.go

@@ -137,6 +137,7 @@ func (mm *MaintenanceManager) Start() error {
// Start background processes
go mm.scanLoop()
go mm.cleanupLoop()
go mm.topologyStatusLoop() // Periodic diagnostic logging
glog.Infof("Maintenance manager started with scan interval %ds", mm.config.ScanIntervalSeconds)
return nil
@@ -255,6 +256,54 @@ func (mm *MaintenanceManager) cleanupLoop() {
}
}
// topologyStatusLoop periodically logs topology status for diagnostics
func (mm *MaintenanceManager) topologyStatusLoop() {
// Log topology status every 5 minutes for diagnostic purposes
statusInterval := 5 * time.Minute
ticker := time.NewTicker(statusInterval)
defer ticker.Stop()
for mm.running {
select {
case <-mm.stopChan:
return
case <-ticker.C:
mm.logTopologyStatus()
}
}
}
// logTopologyStatus logs current topology and worker status for diagnostics
func (mm *MaintenanceManager) logTopologyStatus() {
if mm.scanner == nil || mm.scanner.integration == nil {
glog.V(2).Infof("Topology status: scanner/integration not available")
return
}
activeTopology := mm.scanner.integration.GetActiveTopology()
if activeTopology == nil {
glog.V(1).Infof("Topology status: ActiveTopology is nil")
return
}
diskCount := activeTopology.GetDiskCount()
nodeCount := len(activeTopology.GetAllNodes())
// Get queue stats
stats := mm.queue.GetStats()
workerCount := len(mm.queue.GetWorkers())
mm.mutex.RLock()
errorCount := mm.errorCount
mm.mutex.RUnlock()
glog.V(0).Infof("Topology status: %d nodes, %d disks, %d workers, %d pending tasks, %d running tasks, errors: %d",
nodeCount, diskCount, workerCount,
stats.TasksByStatus[TaskStatusPending],
stats.TasksByStatus[TaskStatusInProgress]+stats.TasksByStatus[TaskStatusAssigned],
errorCount)
}
// performScan executes a maintenance scan with error handling and backoff
func (mm *MaintenanceManager) performScan() {
defer func() {

weed/admin/topology/topology_management.go

@@ -8,11 +8,53 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
// CountTopologyResources counts datacenters, nodes, and disks in topology info
func CountTopologyResources(topologyInfo *master_pb.TopologyInfo) (dcCount, nodeCount, diskCount int) {
if topologyInfo == nil {
return 0, 0, 0
}
dcCount = len(topologyInfo.DataCenterInfos)
for _, dc := range topologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
nodeCount += len(rack.DataNodeInfos)
for _, node := range rack.DataNodeInfos {
diskCount += len(node.DiskInfos)
}
}
}
return
}
// UpdateTopology updates the topology information from master
func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) error {
at.mutex.Lock()
defer at.mutex.Unlock()
// Validate topology updates to prevent clearing disk maps with invalid data
if topologyInfo == nil {
glog.Warningf("UpdateTopology received nil topologyInfo, preserving last-known-good topology")
return fmt.Errorf("rejected invalid topology update: nil topologyInfo")
}
if len(topologyInfo.DataCenterInfos) == 0 {
glog.Warningf("UpdateTopology received empty DataCenterInfos, preserving last-known-good topology (had %d nodes, %d disks)",
len(at.nodes), len(at.disks))
return fmt.Errorf("rejected invalid topology update: empty DataCenterInfos (had %d nodes, %d disks)", len(at.nodes), len(at.disks))
}
// Count incoming topology for validation logging
dcCount, incomingNodes, incomingDisks := CountTopologyResources(topologyInfo)
// Reject updates that would wipe out a valid topology with an empty one (e.g. during master restart)
if incomingNodes == 0 && len(at.nodes) > 0 {
glog.Warningf("UpdateTopology received topology with 0 nodes, preserving last-known-good topology (had %d nodes, %d disks)",
len(at.nodes), len(at.disks))
return fmt.Errorf("rejected invalid topology update: 0 nodes (had %d nodes, %d disks)", len(at.nodes), len(at.disks))
}
glog.V(2).Infof("UpdateTopology: validating update with %d datacenters, %d nodes, %d disks (current: %d nodes, %d disks)",
dcCount, incomingNodes, incomingDisks, len(at.nodes), len(at.disks))
at.topologyInfo = topologyInfo
at.lastUpdated = time.Now()
@@ -142,8 +184,21 @@ func (at *ActiveTopology) GetNodeDisks(nodeID string) []*DiskInfo {
return disks
}
// GetDiskCount returns the total number of disks in the active topology
func (at *ActiveTopology) GetDiskCount() int {
at.mutex.RLock()
defer at.mutex.RUnlock()
return len(at.disks)
}
// rebuildIndexes rebuilds the volume and EC shard indexes for O(1) lookups
func (at *ActiveTopology) rebuildIndexes() {
// Nil-safety guard: return early if topology is not valid
if at.topologyInfo == nil || at.topologyInfo.DataCenterInfos == nil {
glog.V(1).Infof("rebuildIndexes: skipping rebuild due to nil topology or DataCenterInfos")
return
}
// Clear existing indexes
at.volumeIndex = make(map[uint32][]string)
at.ecShardIndex = make(map[uint32][]string)

weed/pb/grpc_client_server.go

@@ -19,7 +19,9 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@@ -33,6 +35,11 @@ const (
// gRPC keepalive settings - must be consistent between client and server
GrpcKeepAliveTime = 60 * time.Second // ping interval when no activity
GrpcKeepAliveTimeout = 20 * time.Second // ping timeout
// Connection recycling for Docker Swarm environments
// Forces connections to be recycled periodically to handle DNS changes
GrpcMaxConnectionAge = 5 * time.Minute // max time a connection may exist
GrpcMaxConnectionAgeGrace = 30 * time.Second // grace period for RPCs to complete
)
var (
@@ -56,9 +63,10 @@ func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server {
var options []grpc.ServerOption
options = append(options,
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: GrpcKeepAliveTime, // server pings client if no activity for this long
Timeout: GrpcKeepAliveTimeout, // ping timeout
// MaxConnectionAge: 10 * time.Hour,
Time: GrpcKeepAliveTime, // server pings client if no activity for this long
Timeout: GrpcKeepAliveTimeout, // ping timeout
MaxConnectionAge: GrpcMaxConnectionAge, // max connection age for Docker Swarm DNS refresh
MaxConnectionAgeGrace: GrpcMaxConnectionAgeGrace, // grace period for in-flight RPCs
}),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: GrpcKeepAliveTime, // min time a client should wait before sending a ping
@@ -112,9 +120,11 @@ func getOrCreateConnection(address string, waitForReady bool, opts ...grpc.DialO
existingConnection, found := grpcClients[address]
if found {
glog.V(3).Infof("gRPC cache hit for %s (version %d)", address, existingConnection.version)
return existingConnection, nil
}
glog.V(2).Infof("Creating new gRPC connection to %s", address)
ctx := context.Background()
grpcConnection, err := GrpcDial(ctx, address, waitForReady, opts...)
if err != nil {
@@ -127,6 +137,7 @@ func getOrCreateConnection(address string, waitForReady bool, opts ...grpc.DialO
0,
}
grpcClients[address] = vgc
glog.V(2).Infof("New gRPC connection established to %s (version %d)", address, vgc.version)
return vgc, nil
}
@@ -163,6 +174,33 @@ func requestIDUnaryInterceptor() grpc.UnaryServerInterceptor {
}
}
// shouldInvalidateConnection checks if an error indicates the cached connection should be invalidated
func shouldInvalidateConnection(err error) bool {
if err == nil {
return false
}
// Check gRPC status codes first (more reliable)
if s, ok := status.FromError(err); ok {
code := s.Code()
switch code {
case codes.Unavailable, codes.Canceled, codes.DeadlineExceeded, codes.Aborted, codes.Internal:
return true
}
}
// Fall back to string matching for transport-level errors not captured by gRPC codes
errStr := err.Error()
errLower := strings.ToLower(errStr)
return strings.Contains(errLower, "transport") ||
strings.Contains(errLower, "connection closed") ||
strings.Contains(errLower, "dns") ||
strings.Contains(errLower, "connection refused") ||
strings.Contains(errLower, "no route to host") ||
strings.Contains(errLower, "network is unreachable") ||
strings.Contains(errLower, "connection reset")
}
// WithGrpcClient In streamingMode, always use a fresh connection. Otherwise, try to reuse an existing connection.
func WithGrpcClient(streamingMode bool, signature int32, fn func(*grpc.ClientConn) error, address string, waitForReady bool, opts ...grpc.DialOption) error {
@@ -173,11 +211,11 @@ func WithGrpcClient(streamingMode bool, signature int32, fn func(*grpc.ClientCon
}
executionErr := fn(vgc.ClientConn)
if executionErr != nil {
if strings.Contains(executionErr.Error(), "transport") ||
strings.Contains(executionErr.Error(), "connection closed") {
if shouldInvalidateConnection(executionErr) {
grpcClientsLock.Lock()
if t, ok := grpcClients[address]; ok {
if t.version == vgc.version {
glog.V(1).Infof("Removing cached gRPC connection to %s due to error: %v", address, executionErr)
vgc.Close()
delete(grpcClients, address)
}

weed/wdclient/masterclient.go

@@ -165,6 +165,7 @@ func (mc *MasterClient) tryAllMasters(ctx context.Context) {
func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) {
glog.V(1).Infof("%s.%s masterClient Connecting to master %v", mc.FilerGroup, mc.clientType, master)
stats.MasterClientConnectCounter.WithLabelValues("total").Inc()
connectStartTime := time.Now()
gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -175,6 +176,7 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server
stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToKeepConnected).Inc()
return err
}
glog.V(0).Infof("%s.%s masterClient gRPC stream established to %s in %v", mc.FilerGroup, mc.clientType, master, time.Since(connectStartTime))
if err = stream.Send(&master_pb.KeepConnectedRequest{
FilerGroup: mc.FilerGroup,
@@ -380,14 +382,22 @@ func (mc *MasterClient) WaitUntilConnected(ctx context.Context) {
}
func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) {
glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters)
glog.V(0).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters)
reconnectCount := 0
for {
select {
case <-ctx.Done():
glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err())
return
default:
reconnectStart := time.Now()
if reconnectCount > 0 {
glog.V(0).Infof("%s.%s masterClient reconnection attempt #%d", mc.FilerGroup, mc.clientType, reconnectCount)
}
mc.tryAllMasters(ctx)
reconnectCount++
glog.V(1).Infof("%s.%s masterClient connection cycle completed in %v, sleeping before retry",
mc.FilerGroup, mc.clientType, time.Since(reconnectStart))
time.Sleep(time.Second)
}
}

weed/worker/client.go

@@ -211,10 +211,18 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error {
s.connected = true
s.stream = stream
// Start stream handlers BEFORE sending registration
// This ensures handleIncoming is ready to receive the registration response
s.streamExit = make(chan struct{})
go handleOutgoing(s.stream, s.streamExit, c.outgoing, c.cmds)
go handleIncoming(c.workerID, s.stream, s.streamExit, c.incoming, c.cmds)
// Always check for worker info and send registration immediately as the very first message
if s.lastWorkerInfo != nil {
// Send registration synchronously as the very first message
if err := c.sendRegistrationSync(s.lastWorkerInfo, s.stream); err != nil {
// Send registration via the normal outgoing channel and wait for response via incoming
if err := c.sendRegistration(s.lastWorkerInfo); err != nil {
close(s.streamExit)
s.streamCancel()
s.conn.Close()
s.connected = false
return fmt.Errorf("failed to register worker: %w", err)
@@ -225,11 +233,6 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error {
glog.V(1).Infof("Connected to admin server, waiting for worker registration info")
}
// Start stream handlers
s.streamExit = make(chan struct{})
go handleOutgoing(s.stream, s.streamExit, c.outgoing, c.cmds)
go handleIncoming(c.workerID, s.stream, s.streamExit, c.incoming, c.cmds)
glog.Infof("Connected to admin server at %s", c.adminAddress)
return nil
}
@@ -237,6 +240,9 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error {
// reconnect attempts to re-establish the connection
func (c *GrpcAdminClient) reconnect(s *grpcState) error {
// Clean up existing connection completely
if s.streamExit != nil {
close(s.streamExit)
}
if s.streamCancel != nil {
s.streamCancel()
}
@@ -456,7 +462,8 @@ func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) {
s.conn.Close()
}
// Close channels
// Close channels to signal all goroutines to stop
// This will cause any pending sends/receives to fail gracefully
close(c.outgoing)
close(c.incoming)
@@ -525,76 +532,6 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error {
}
}
// sendRegistrationSync sends the registration message synchronously
func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData, stream worker_pb.WorkerService_WorkerStreamClient) error {
capabilities := make([]string, len(worker.Capabilities))
for i, cap := range worker.Capabilities {
capabilities[i] = string(cap)
}
msg := &worker_pb.WorkerMessage{
WorkerId: c.workerID,
Timestamp: time.Now().Unix(),
Message: &worker_pb.WorkerMessage_Registration{
Registration: &worker_pb.WorkerRegistration{
WorkerId: c.workerID,
Address: worker.Address,
Capabilities: capabilities,
MaxConcurrent: int32(worker.MaxConcurrent),
Metadata: make(map[string]string),
},
},
}
// Send directly to stream to ensure it's the first message
if err := stream.Send(msg); err != nil {
return fmt.Errorf("failed to send registration message: %w", err)
}
// Create a channel to receive the response
responseChan := make(chan *worker_pb.AdminMessage, 1)
errChan := make(chan error, 1)
// Start a goroutine to listen for the response
go func() {
for {
response, err := stream.Recv()
if err != nil {
errChan <- fmt.Errorf("failed to receive registration response: %w", err)
return
}
if regResp := response.GetRegistrationResponse(); regResp != nil {
responseChan <- response
return
}
// Continue waiting if it's not a registration response
// If stream is stuck, reconnect() will kill it, cleaning up this
// goroutine
}
}()
// Wait for registration response with timeout
timeout := time.NewTimer(10 * time.Second)
defer timeout.Stop()
select {
case response := <-responseChan:
if regResp := response.GetRegistrationResponse(); regResp != nil {
if regResp.Success {
glog.V(1).Infof("Worker registered successfully: %s", regResp.Message)
return nil
}
return fmt.Errorf("registration failed: %s", regResp.Message)
}
return fmt.Errorf("unexpected response type")
case err := <-errChan:
return err
case <-timeout.C:
return fmt.Errorf("registration timeout")
}
}
func (c *GrpcAdminClient) IsConnected() bool {
respCh := make(chan bool, 1)
