From 31a4f57cd9c3211c863b70762207e15545c7366c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 31 Dec 2025 11:55:09 -0800 Subject: [PATCH] Fix: Add -admin.grpc flag to worker for explicit gRPC port (#7926) (#7927) * Fix: Add -admin.grpc flag to worker for explicit gRPC port configuration * Fix(helm): Add adminGrpcServer to worker configuration * Refactor: Support host:port.grpcPort address format, revert -admin.grpc flag * Helm: Conditionally append grpcPort to worker admin address * weed/admin: fix "send on closed channel" panic in worker gRPC server Make unregisterWorker connection-aware to prevent closing channels belonging to newer connections. * weed/worker: improve gRPC client stability and logging - Fix goroutine leak in reconnection logic - Refactor reconnection loop to exit on success and prevent busy-waiting - Add session identification and enhanced logging to client handlers - Use constant for internal reset action and remove unused variables * weed/worker: fix worker state initialization and add lifecycle logs - Revert workerState to use running boolean correctly - Prevent handleStart failing by checking running state instead of startTime - Add more detailed logs for worker startup events --- .../templates/worker/worker-deployment.yaml | 6 +- k8s/charts/seaweedfs/values.yaml | 2 +- weed/admin/dash/worker_grpc_server.go | 42 ++++-- weed/command/worker.go | 1 + weed/pb/grpc_client_server.go | 22 +++ weed/worker/client.go | 139 ++++++++++++------ weed/worker/worker.go | 3 +- 7 files changed, 146 insertions(+), 69 deletions(-) diff --git a/k8s/charts/seaweedfs/templates/worker/worker-deployment.yaml b/k8s/charts/seaweedfs/templates/worker/worker-deployment.yaml index d57a900d5..83e827e41 100644 --- a/k8s/charts/seaweedfs/templates/worker/worker-deployment.yaml +++ b/k8s/charts/seaweedfs/templates/worker/worker-deployment.yaml @@ -131,11 +131,7 @@ spec: -v={{ .Values.global.loggingLevel }} \ {{- end }} worker \ - {{- if .Values.worker.adminServer }} - -admin={{ .Values.worker.adminServer }} \ - {{- else }} - -admin={{ template "seaweedfs.name" . }}-admin.{{ .Release.Namespace }}:{{ .Values.admin.port }} \ - {{- end }} + -admin={{ template "seaweedfs.name" . 
}}-admin.{{ .Release.Namespace }}:{{ .Values.admin.port }}{{ if .Values.admin.grpcPort }}.{{ .Values.admin.grpcPort }}{{ end }} \ -capabilities={{ .Values.worker.capabilities }} \ -maxConcurrent={{ .Values.worker.maxConcurrent }} \ -workingDir={{ .Values.worker.workingDir }}{{- if or .Values.worker.metricsPort .Values.worker.extraArgs }} \{{ end }} diff --git a/k8s/charts/seaweedfs/values.yaml b/k8s/charts/seaweedfs/values.yaml index a94d7f183..947369c88 100644 --- a/k8s/charts/seaweedfs/values.yaml +++ b/k8s/charts/seaweedfs/values.yaml @@ -1231,9 +1231,9 @@ worker: metricsPort: 9327 # Admin server to connect to - # Format: "host:port" or auto-discover from admin service adminServer: "" + # Worker capabilities - comma-separated list # Available: vacuum, balance, erasure_coding # Default: "vacuum,balance,erasure_coding" (all capabilities) diff --git a/weed/admin/dash/worker_grpc_server.go b/weed/admin/dash/worker_grpc_server.go index ba52a79e2..c4b187797 100644 --- a/weed/admin/dash/worker_grpc_server.go +++ b/weed/admin/dash/worker_grpc_server.go @@ -225,11 +225,11 @@ func (s *WorkerGrpcServer) WorkerStream(stream worker_pb.WorkerService_WorkerStr select { case <-ctx.Done(): glog.Infof("Worker %s connection closed: %v", workerID, ctx.Err()) - s.unregisterWorker(workerID) + s.unregisterWorker(conn) return nil case <-connCtx.Done(): glog.Infof("Worker %s connection cancelled", workerID) - s.unregisterWorker(workerID) + s.unregisterWorker(conn) return nil default: } @@ -241,7 +241,7 @@ func (s *WorkerGrpcServer) WorkerStream(stream worker_pb.WorkerService_WorkerStr } else { glog.Errorf("Error receiving from worker %s: %v", workerID, err) } - s.unregisterWorker(workerID) + s.unregisterWorker(conn) return err } @@ -294,7 +294,7 @@ func (s *WorkerGrpcServer) handleWorkerMessage(conn *WorkerConnection, msg *work case *worker_pb.WorkerMessage_Shutdown: glog.Infof("Worker %s shutting down: %s", workerID, m.Shutdown.Reason) - s.unregisterWorker(workerID) + s.unregisterWorker(conn) default: glog.Warningf("Unknown message type from worker %s", workerID) @@ -463,17 +463,24 @@ func (s *WorkerGrpcServer) safeCloseOutgoingChannel(conn *WorkerConnection, sour } // unregisterWorker removes a worker connection -func (s *WorkerGrpcServer) unregisterWorker(workerID string) { +func (s *WorkerGrpcServer) unregisterWorker(conn *WorkerConnection) { s.connMutex.Lock() - conn, exists := s.connections[workerID] + existingConn, exists := s.connections[conn.workerID] if !exists { s.connMutex.Unlock() - glog.V(2).Infof("unregisterWorker: worker %s not found in connections map (already unregistered)", workerID) + glog.V(2).Infof("unregisterWorker: worker %s not found in connections map (already unregistered)", conn.workerID) + return + } + + // Only remove if it matches the specific connection instance + if existingConn != conn { + s.connMutex.Unlock() + glog.V(1).Infof("unregisterWorker: worker %s connection replaced, skipping unregister for old connection", conn.workerID) return } // Remove from map first to prevent duplicate cleanup attempts - delete(s.connections, workerID) + delete(s.connections, conn.workerID) s.connMutex.Unlock() // Cancel context to signal goroutines to stop @@ -482,7 +489,7 @@ func (s *WorkerGrpcServer) unregisterWorker(workerID string) { // Safely close the outgoing channel with recover to handle potential double-close s.safeCloseOutgoingChannel(conn, "unregisterWorker") - glog.V(1).Infof("Unregistered worker %s", workerID) + glog.V(1).Infof("Unregistered worker %s", conn.workerID) } // 
cleanupRoutine periodically cleans up stale connections @@ -505,16 +512,19 @@ func (s *WorkerGrpcServer) cleanupStaleConnections() { cutoff := time.Now().Add(-2 * time.Minute) s.connMutex.Lock() - defer s.connMutex.Unlock() - - for workerID, conn := range s.connections { + // collect connections to remove first to avoid deadlock if unregisterWorker locks + var toRemove []*WorkerConnection + for _, conn := range s.connections { if conn.lastSeen.Before(cutoff) { - glog.Warningf("Cleaning up stale worker connection: %s", workerID) - conn.cancel() - s.safeCloseOutgoingChannel(conn, "cleanupStaleConnections") - delete(s.connections, workerID) + toRemove = append(toRemove, conn) } } + s.connMutex.Unlock() + + for _, conn := range toRemove { + glog.Warningf("Cleaning up stale worker connection: %s", conn.workerID) + s.unregisterWorker(conn) + } } // GetConnectedWorkers returns a list of currently connected workers diff --git a/weed/command/worker.go b/weed/command/worker.go index 1ff6678a0..16c14c738 100644 --- a/weed/command/worker.go +++ b/weed/command/worker.go @@ -23,6 +23,7 @@ import ( _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum" + // TODO: Implement additional task packages (add to default capabilities when ready): // _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/remote" - for uploading volumes to remote/cloud storage // _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/replication" - for fixing replication issues and maintaining data consistency diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index 3460339ca..75ef51b9d 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -257,6 +257,15 @@ func hostAndPort(address string) (host string, port uint64, err error) { if colonIndex < 0 { return "", 0, fmt.Errorf("server should have hostname:port format: %v", address) } + dotIndex := strings.LastIndex(address, ".") + if dotIndex > colonIndex { + // port format is "port.grpcPort" + port, err = strconv.ParseUint(address[colonIndex+1:dotIndex], 10, 64) + if err != nil { + return "", 0, fmt.Errorf("server port parse error: %w", err) + } + return address[:colonIndex], port, err + } port, err = strconv.ParseUint(address[colonIndex+1:], 10, 64) if err != nil { return "", 0, fmt.Errorf("server port parse error: %w", err) @@ -267,6 +276,19 @@ func hostAndPort(address string) (host string, port uint64, err error) { func ServerToGrpcAddress(server string) (serverGrpcAddress string) { + colonIndex := strings.LastIndex(server, ":") + if colonIndex >= 0 { + if dotIndex := strings.LastIndex(server, "."); dotIndex > colonIndex { + // port format is "port.grpcPort" + // return the host:grpcPort + host := server[:colonIndex] + grpcPort := server[dotIndex+1:] + if _, err := strconv.ParseUint(grpcPort, 10, 64); err == nil { + return util.JoinHostPort(host, int(0+util.ParseInt(grpcPort, 0))) + } + } + } + host, port, parseErr := hostAndPort(server) if parseErr != nil { glog.Fatalf("server address %s parse error: %v", server, parseErr) diff --git a/weed/worker/client.go b/weed/worker/client.go index f4d15e155..812308636 100644 --- a/weed/worker/client.go +++ b/weed/worker/client.go @@ -7,6 +7,8 @@ import ( "io" "time" + "crypto/rand" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" @@ -26,6 +28,9 @@ type GrpcAdminClient struct { 
cmds chan grpcCommand + // Session identification for logging + sessionID string + // Reconnection parameters maxReconnectAttempts int reconnectBackoff time.Duration @@ -49,6 +54,7 @@ const ( ActionQueryReconnecting grpcAction = "query_reconnecting" ActionQueryConnected grpcAction = "query_connected" ActionQueryShouldReconnect grpcAction = "query_shouldreconnect" + ActionResetReconnectStop grpcAction = "reset_reconnect_stop" ) type registrationRequest struct { @@ -83,9 +89,15 @@ func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.Di // Admin uses HTTP port + 10000 as gRPC port grpcAddress := pb.ServerToGrpcAddress(adminAddress) + // Generate a unique session ID for logging + sessionBytes := make([]byte, 2) + rand.Read(sessionBytes) + sessionID := fmt.Sprintf("%x", sessionBytes) + c := &GrpcAdminClient{ adminAddress: grpcAddress, workerID: workerID, + sessionID: sessionID, dialOption: dialOption, maxReconnectAttempts: 0, // 0 means infinite attempts reconnectBackoff: 1 * time.Second, @@ -133,8 +145,11 @@ func (c *GrpcAdminClient) safeCloseChannel(chPtr *chan struct{}) { func (c *GrpcAdminClient) managerLoop() { state := &grpcState{shouldReconnect: true} + glog.V(1).Infof("[session %s] Manager loop started for worker %s", c.sessionID, c.workerID) + out: for cmd := range c.cmds { + glog.V(4).Infof("[session %s] Manager received command: %s", c.sessionID, cmd.action) switch cmd.action { case ActionConnect: c.handleConnect(cmd, state) @@ -152,11 +167,20 @@ out: cmd.resp <- err case ActionStreamError: state.connected = false + // Restart reconnection loop if needed + if state.shouldReconnect && state.reconnectStop == nil { + glog.V(1).Infof("[session %s] Stream error, starting reconnection loop", c.sessionID) + stop := make(chan struct{}) + state.reconnectStop = stop + go c.reconnectionLoop(stop, func() { + c.cmds <- grpcCommand{action: ActionResetReconnectStop, data: state} + }) + } case ActionRegisterWorker: req := cmd.data.(registrationRequest) state.lastWorkerInfo = req.Worker if !state.connected { - glog.V(1).Infof("Not connected yet, worker info stored for registration upon connection") + glog.V(1).Infof("[session %s] Not connected yet, worker info stored for registration upon connection", c.sessionID) // Respond immediately with success (registration will happen later) req.Resp <- nil continue @@ -179,13 +203,17 @@ out: case ActionQueryShouldReconnect: respCh := cmd.data.(chan bool) respCh <- state.shouldReconnect + case ActionResetReconnectStop: + // This is an internal action to reset the stop channel when reconnectionLoop exits + s := cmd.data.(*grpcState) + s.reconnectStop = nil } } } // Connect establishes gRPC connection to admin server with TLS detection func (c *GrpcAdminClient) Connect() error { - resp := make(chan error) + resp := make(chan error, 1) c.cmds <- grpcCommand{ action: ActionConnect, resp: resp, @@ -199,15 +227,24 @@ func (c *GrpcAdminClient) handleConnect(cmd grpcCommand, s *grpcState) { return } - // Start reconnection loop immediately (async) - stop := make(chan struct{}) - s.reconnectStop = stop - go c.reconnectionLoop(stop) + // Start reconnection loop immediately if not already running (async) + if s.reconnectStop == nil { + glog.V(1).Infof("[session %s] Starting reconnection loop", c.sessionID) + stop := make(chan struct{}) + s.reconnectStop = stop + go c.reconnectionLoop(stop, func() { + // This callback is executed when the reconnectionLoop exits. 
+ // It ensures that s.reconnectStop is reset to nil, allowing a new loop to be started later. + c.cmds <- grpcCommand{action: ActionResetReconnectStop, data: s} + }) + } else { + glog.V(1).Infof("[session %s] Reconnection loop already running", c.sessionID) + } // Attempt the initial connection err := c.attemptConnection(s) if err != nil { - glog.V(1).Infof("Initial connection failed, reconnection loop will retry: %v", err) + glog.Warningf("[session %s] Initial connection failed, reconnection loop will retry: %v", c.sessionID, err) cmd.resp <- err return } @@ -221,10 +258,10 @@ func (c *GrpcAdminClient) createConnection() (*grpc.ClientConn, error) { conn, err := pb.GrpcDial(ctx, c.adminAddress, false, c.dialOption) if err != nil { - return nil, fmt.Errorf("failed to connect to admin server: %w", err) + return nil, fmt.Errorf("failed to connect to admin server %s: %w", c.adminAddress, err) } - glog.Infof("Connected to admin server at %s", c.adminAddress) + glog.Infof("[session %s] Connected to admin server at %s", c.sessionID, c.adminAddress) return conn, nil } @@ -242,7 +279,7 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error { // Create bidirectional stream s.streamCtx, s.streamCancel = context.WithCancel(context.Background()) stream, err := s.client.WorkerStream(s.streamCtx) - glog.Infof("Worker stream created") + glog.Infof("[session %s] Worker stream created", c.sessionID) if err != nil { s.conn.Close() return fmt.Errorf("failed to create worker stream: %w", err) @@ -255,8 +292,8 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error { s.streamExit = make(chan struct{}) s.streamFailed = make(chan struct{}) s.regWait = make(chan *worker_pb.RegistrationResponse, 1) - go handleOutgoing(s.stream, s.streamExit, c.outgoing, c.cmds) - go handleIncoming(c.workerID, s.stream, s.streamExit, c.incoming, c.cmds, s.streamFailed, s.regWait) + go handleOutgoing(c.sessionID, s.stream, s.streamExit, c.outgoing, c.cmds) + go handleIncoming(c.sessionID, c.workerID, s.stream, s.streamExit, c.incoming, c.cmds, s.streamFailed, s.regWait) // Always check for worker info and send registration immediately as the very first message if s.lastWorkerInfo != nil { @@ -270,18 +307,19 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error { s.connected = false return fmt.Errorf("failed to register worker: %w", err) } - glog.Infof("Worker registered successfully with admin server") + glog.Infof("[session %s] Worker %s registered successfully with admin server", c.sessionID, c.workerID) } else { // No worker info yet - stream will wait for registration - glog.V(1).Infof("Connected to admin server, waiting for worker registration info") + glog.V(1).Infof("[session %s] Connected to admin server, waiting for worker registration info", c.sessionID) } - glog.Infof("Connected to admin server at %s", c.adminAddress) + glog.V(1).Infof("[session %s] attemptConnection successful", c.sessionID) return nil } // reconnect attempts to re-establish the connection func (c *GrpcAdminClient) reconnect(s *grpcState) error { + glog.V(1).Infof("[session %s] Reconnection attempt starting", c.sessionID) // Clean up existing connection completely c.safeCloseChannel(&s.streamExit) c.safeCloseChannel(&s.streamFailed) @@ -299,25 +337,31 @@ func (c *GrpcAdminClient) reconnect(s *grpcState) error { return fmt.Errorf("failed to reconnect: %w", err) } + glog.Infof("[session %s] Successfully reconnected to admin server", c.sessionID) // Registration is now handled in attemptConnection if worker info is 
available return nil } // reconnectionLoop handles automatic reconnection with exponential backoff -func (c *GrpcAdminClient) reconnectionLoop(reconnectStop chan struct{}) { +func (c *GrpcAdminClient) reconnectionLoop(reconnectStop chan struct{}, onExit func()) { + defer onExit() // Ensure the cleanup callback is called when the loop exits backoff := c.reconnectBackoff attempts := 0 for { + attempts++ // Count this attempt (starts at 1) waitDuration := backoff - if attempts == 0 { - waitDuration = time.Second + if attempts == 1 { + waitDuration = 100 * time.Millisecond // Quick retry for the very first failure } + glog.V(2).Infof("[session %s] Reconnection loop sleeping for %v before attempt %d", c.sessionID, waitDuration, attempts) select { case <-reconnectStop: + glog.V(1).Infof("[session %s] Reconnection loop stopping (received signal)", c.sessionID) return case <-time.After(waitDuration): } + resp := make(chan error, 1) c.cmds <- grpcCommand{ action: ActionReconnect, @@ -326,19 +370,17 @@ func (c *GrpcAdminClient) reconnectionLoop(reconnectStop chan struct{}) { err := <-resp if err == nil { // Successful reconnection - attempts = 0 - backoff = c.reconnectBackoff - glog.Infof("Successfully reconnected to admin server") + glog.Infof("[session %s] Successfully reconnected to admin server, stopping reconnection loop", c.sessionID) + return // EXIT ON SUCCESS } else if errors.Is(err, ErrAlreadyConnected) { - attempts = 0 - backoff = c.reconnectBackoff + glog.V(1).Infof("[session %s] Already connected, stopping reconnection loop", c.sessionID) + return // EXIT ON SUCCESS (already connected) } else { - attempts++ - glog.Errorf("Reconnection attempt %d failed: %v", attempts, err) + glog.Warningf("[session %s] Reconnection attempt %d failed: %v", c.sessionID, attempts, err) // Check if we should give up if c.maxReconnectAttempts > 0 && attempts >= c.maxReconnectAttempts { - glog.Errorf("Max reconnection attempts (%d) reached, giving up", c.maxReconnectAttempts) + glog.Errorf("[session %s] Max reconnection attempts (%d) reached, giving up", c.sessionID, c.maxReconnectAttempts) return } @@ -347,17 +389,21 @@ func (c *GrpcAdminClient) reconnectionLoop(reconnectStop chan struct{}) { if backoff > c.maxReconnectBackoff { backoff = c.maxReconnectBackoff } - glog.Infof("Waiting %v before next reconnection attempt", backoff) + glog.V(1).Infof("[session %s] Waiting %v before next reconnection attempt", c.sessionID, backoff) } } } // handleOutgoing processes outgoing messages to admin func handleOutgoing( + sessionID string, stream worker_pb.WorkerService_WorkerStreamClient, streamExit <-chan struct{}, outgoing <-chan *worker_pb.WorkerMessage, cmds chan<- grpcCommand) { + + glog.V(1).Infof("[session %s] Outgoing message handler started", sessionID) + defer glog.V(1).Infof("[session %s] Outgoing message handler stopping", sessionID) msgCh := make(chan *worker_pb.WorkerMessage, 1) errCh := make(chan error, 1) // Buffered to prevent blocking if the manager is busy @@ -375,18 +421,18 @@ func handleOutgoing( // Helper function to handle stream errors and cleanup handleStreamError := func(err error) { if err != nil { - glog.Errorf("Failed to send message to admin: %v", err) + glog.Warningf("[session %s] Stream send error: %v", sessionID, err) select { case cmds <- grpcCommand{action: ActionStreamError, data: err}: // Successfully queued default: // Manager busy, queue asynchronously to avoid blocking - glog.V(2).Infof("Manager busy, queuing stream error asynchronously from outgoing handler: %v", err) + 
glog.V(2).Infof("[session %s] Manager busy, queuing stream error asynchronously from outgoing handler: %v", sessionID, err) go func(e error) { select { case cmds <- grpcCommand{action: ActionStreamError, data: e}: case <-time.After(2 * time.Second): - glog.Warningf("Failed to send stream error to manager from outgoing handler, channel blocked: %v", e) + glog.Warningf("[session %s] Failed to send stream error to manager from outgoing handler, channel blocked: %v", sessionID, e) } }(err) } @@ -428,14 +474,17 @@ func handleOutgoing( // handleIncoming processes incoming messages from admin func handleIncoming( + sessionID string, workerID string, stream worker_pb.WorkerService_WorkerStreamClient, streamExit <-chan struct{}, incoming chan<- *worker_pb.AdminMessage, cmds chan<- grpcCommand, - streamFailed chan<- struct{}, - regWait chan<- *worker_pb.RegistrationResponse) { - glog.V(1).Infof("INCOMING HANDLER STARTED: Worker %s incoming message handler started", workerID) + streamFailed chan struct{}, + regWait chan *worker_pb.RegistrationResponse) { + + glog.V(1).Infof("[session %s] Incoming message handler started", sessionID) + defer glog.V(1).Infof("[session %s] Incoming message handler stopping", sessionID) msgCh := make(chan *worker_pb.AdminMessage) errCh := make(chan error, 1) // Buffered to prevent blocking if the manager is busy // regWait is buffered with size 1 so that the registration response can be sent @@ -455,12 +504,10 @@ func handleIncoming( }() for { - glog.V(4).Infof("LISTENING: Worker %s waiting for message from admin server", workerID) - select { case msg := <-msgCh: // Message successfully received from the stream - glog.V(4).Infof("MESSAGE RECEIVED: Worker %s received message from admin server: %T", workerID, msg.Message) + glog.V(4).Infof("[session %s] Received message from admin server: %T", sessionID, msg.Message) // If this is a registration response, also publish to the registration waiter. // regWait is buffered (size 1) so that the response can be sent even if sendRegistration @@ -468,26 +515,26 @@ func handleIncoming( if rr := msg.GetRegistrationResponse(); rr != nil { select { case regWait <- rr: - glog.V(3).Infof("REGISTRATION RESPONSE: Worker %s routed registration response to waiter", workerID) + glog.V(3).Infof("[session %s] Registration response routed to waiter", sessionID) default: - glog.V(2).Infof("REGISTRATION RESPONSE DROPPED: Worker %s registration response dropped (no waiter)", workerID) + glog.V(2).Infof("[session %s] Registration response dropped (no waiter)", sessionID) } } // Route message to general handler. 
select { case incoming <- msg: - glog.V(3).Infof("MESSAGE ROUTED: Worker %s successfully routed message to handler", workerID) + glog.V(3).Infof("[session %s] Message routed to incoming channel", sessionID) case <-time.After(time.Second): - glog.Warningf("MESSAGE DROPPED: Worker %s incoming message buffer full, dropping message: %T", workerID, msg.Message) + glog.Warningf("[session %s] Incoming message buffer full, dropping message: %T", sessionID, msg.Message) } case err := <-errCh: // Stream Receiver goroutine reported an error (EOF or network error) if err == io.EOF { - glog.Infof("STREAM CLOSED: Worker %s admin server closed the stream", workerID) + glog.Infof("[session %s] Admin server closed the stream", sessionID) } else { - glog.Errorf("RECEIVE ERROR: Worker %s failed to receive message from admin: %v", workerID, err) + glog.Warningf("[session %s] Stream receive error: %v", sessionID, err) } // Signal that stream has failed (non-blocking) @@ -502,23 +549,23 @@ func handleIncoming( select { case cmds <- grpcCommand{action: ActionStreamError, data: err}: default: - glog.V(2).Infof("Manager busy, queuing stream error asynchronously: %v", err) + glog.V(2).Infof("[session %s] Manager busy, queuing stream error asynchronously from incoming handler: %v", sessionID, err) go func(e error) { select { case cmds <- grpcCommand{action: ActionStreamError, data: e}: case <-time.After(2 * time.Second): - glog.Warningf("Failed to send stream error to manager, channel blocked: %v", e) + glog.Warningf("[session %s] Failed to send stream error to manager from incoming handler, channel blocked: %v", sessionID, e) } }(err) } // Exit the main handler loop - glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler due to stream error", workerID) + glog.V(1).Infof("[session %s] Incoming message handler stopping due to stream error", sessionID) return case <-streamExit: // Manager closed this channel, signaling a controlled disconnection. - glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler - received exit signal", workerID) + glog.V(1).Infof("[session %s] Incoming message handler stopping - received exit signal", sessionID) return } } diff --git a/weed/worker/worker.go b/weed/worker/worker.go index 97e6e7a1e..5c7ebfd86 100644 --- a/weed/worker/worker.go +++ b/weed/worker/worker.go @@ -195,7 +195,7 @@ func NewWorker(config *types.WorkerConfig) (*Worker, error) { func (w *Worker) managerLoop() { w.state = &workerState{ - startTime: time.Now(), + running: false, stopChan: make(chan struct{}), currentTasks: make(map[string]*types.TaskInput), } @@ -428,6 +428,7 @@ func (w *Worker) Start() error { // Start starts the worker func (w *Worker) handleStart(cmd workerCommand) { + glog.Infof("Worker %s handleStart called", w.id) if w.state.running { cmd.resp <- fmt.Errorf("worker is already running") return
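
For readers following the "host:port.grpcPort" change above, the sketch below is an illustrative, self-contained parser (a hypothetical splitAdminAddress helper, not the SeaweedFS functions themselves) that mirrors the behaviour added to hostAndPort and ServerToGrpcAddress in weed/pb/grpc_client_server.go: an optional ".grpcPort" suffix after the HTTP port, with the existing HTTP-port+10000 convention as the fallback. The example host name and the 23646/33646 ports are assumptions for illustration only.

// Minimal sketch, assuming the parsing rules described in this patch.
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// splitAdminAddress splits "host:port" or "host:port.grpcPort".
// When no explicit gRPC port is given, it falls back to httpPort+10000,
// matching the convention noted in weed/worker/client.go.
func splitAdminAddress(address string) (host string, httpPort, grpcPort uint64, err error) {
	colonIndex := strings.LastIndex(address, ":")
	if colonIndex < 0 {
		return "", 0, 0, fmt.Errorf("address should have host:port format: %v", address)
	}
	host = address[:colonIndex]
	portPart := address[colonIndex+1:]

	if dotIndex := strings.LastIndex(portPart, "."); dotIndex >= 0 {
		// "port.grpcPort" form: both ports are given explicitly.
		if httpPort, err = strconv.ParseUint(portPart[:dotIndex], 10, 64); err != nil {
			return "", 0, 0, fmt.Errorf("http port parse error: %w", err)
		}
		if grpcPort, err = strconv.ParseUint(portPart[dotIndex+1:], 10, 64); err != nil {
			return "", 0, 0, fmt.Errorf("grpc port parse error: %w", err)
		}
		return host, httpPort, grpcPort, nil
	}

	// Plain "host:port" form: derive the gRPC port from the HTTP port.
	if httpPort, err = strconv.ParseUint(portPart, 10, 64); err != nil {
		return "", 0, 0, fmt.Errorf("http port parse error: %w", err)
	}
	return host, httpPort, httpPort + 10000, nil
}

func main() {
	for _, addr := range []string{
		"seaweedfs-admin.default:23646",       // gRPC port derived as 33646
		"seaweedfs-admin.default:23646.33646", // gRPC port given explicitly
	} {
		host, httpPort, grpcPort, err := splitAdminAddress(addr)
		fmt.Println(addr, "->", host, httpPort, grpcPort, err)
	}
}

On the helm side, the same format is produced by the worker-deployment.yaml hunk above, which conditionally appends ".{{ .Values.admin.grpcPort }}" to the -admin address when admin.grpcPort is set.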
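
The "send on closed channel" fix in weed/admin/dash/worker_grpc_server.go comes down to making cleanup connection-aware: a stale connection may only unregister itself, never a newer connection registered under the same worker ID. The sketch below uses hypothetical registry/conn types (not WorkerGrpcServer/WorkerConnection) to show the compare-before-delete pattern under the mutex.

// Minimal sketch, assuming only the pattern described in the commit message.
package main

import (
	"fmt"
	"sync"
)

type conn struct {
	workerID string
	outgoing chan string
}

type registry struct {
	mu    sync.Mutex
	conns map[string]*conn
}

func (r *registry) register(c *conn) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.conns[c.workerID] = c // a newer connection may replace an older one
}

// unregister removes c only if it is still the registered connection for its
// worker ID, so a late cleanup of a replaced connection becomes a no-op.
func (r *registry) unregister(c *conn) {
	r.mu.Lock()
	current, ok := r.conns[c.workerID]
	if !ok || current != c {
		r.mu.Unlock()
		return // already gone, or replaced by a newer connection
	}
	delete(r.conns, c.workerID)
	r.mu.Unlock()
	close(c.outgoing) // safe: only the owning connection reaches this point
}

func main() {
	r := &registry{conns: map[string]*conn{}}
	old := &conn{workerID: "w1", outgoing: make(chan string, 1)}
	newer := &conn{workerID: "w1", outgoing: make(chan string, 1)}

	r.register(old)
	r.register(newer) // worker reconnected; old entry replaced
	r.unregister(old) // no-op: does not close newer.outgoing
	newer.outgoing <- "still usable"
	fmt.Println(<-newer.outgoing)
}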