diff --git a/weed/command/mini.go b/weed/command/mini.go index fc359f904..d52dc1c21 100644 --- a/weed/command/mini.go +++ b/weed/command/mini.go @@ -517,39 +517,26 @@ func initializeGrpcPortsOnIP(bindIp string) { continue } - // If gRPC port is 0, calculate it + // If gRPC port is 0, calculate it from HTTP port if *config.grpcPort == 0 { - calculatedPort := *config.httpPort + GrpcPortOffset - // Check if calculated port is available (on both specific IP and all interfaces) - // Also check if it was already allocated to another service in this function - if !isPortOpenOnIP(bindIp, calculatedPort) || !isPortAvailable(calculatedPort) || allocatedGrpcPorts[calculatedPort] { - glog.Warningf("Calculated gRPC port %d for %s is not available, finding alternative...", calculatedPort, config.name) - newPort := findAvailablePortOnIP(bindIp, calculatedPort+1, 100, allocatedGrpcPorts) - if newPort == 0 { - glog.Errorf("Could not find available gRPC port for %s starting from %d, will use calculated %d and fail on binding", config.name, calculatedPort+1, calculatedPort) - } else { - calculatedPort = newPort - glog.Infof("gRPC port %d for %s is available, using it instead of calculated %d", newPort, config.name, *config.httpPort+GrpcPortOffset) - } - } - *config.grpcPort = calculatedPort - allocatedGrpcPorts[calculatedPort] = true - glog.V(1).Infof("%s gRPC port initialized to %d", config.name, calculatedPort) - } else { - // gRPC port was explicitly set, verify it's still available (check on both specific IP and all interfaces) - // Also check if it was already allocated to another service in this function - if !isPortOpenOnIP(bindIp, *config.grpcPort) || !isPortAvailable(*config.grpcPort) || allocatedGrpcPorts[*config.grpcPort] { - glog.Warningf("Explicitly set gRPC port %d for %s is not available, finding alternative...", *config.grpcPort, config.name) - newPort := findAvailablePortOnIP(bindIp, *config.grpcPort+1, 100, allocatedGrpcPorts) - if newPort == 0 { - 
glog.Errorf("Could not find available gRPC port for %s starting from %d, will use original %d and fail on binding", config.name, *config.grpcPort+1, *config.grpcPort) - } else { - glog.Infof("gRPC port %d for %s is available, using it instead of %d", newPort, config.name, *config.grpcPort) - *config.grpcPort = newPort - } + *config.grpcPort = *config.httpPort + GrpcPortOffset + } + + // Verify the gRPC port is available (whether calculated or explicitly set) + // Check on both specific IP and all interfaces, and check against already allocated ports + if !isPortOpenOnIP(bindIp, *config.grpcPort) || !isPortAvailable(*config.grpcPort) || allocatedGrpcPorts[*config.grpcPort] { + glog.Warningf("gRPC port %d for %s is not available, finding alternative...", *config.grpcPort, config.name) + originalPort := *config.grpcPort + newPort := findAvailablePortOnIP(bindIp, originalPort+1, 100, allocatedGrpcPorts) + if newPort == 0 { + glog.Errorf("Could not find available gRPC port for %s starting from %d, will use %d and fail on binding", config.name, originalPort+1, originalPort) + } else { + glog.Infof("gRPC port %d for %s is available, using it instead of %d", newPort, config.name, originalPort) + *config.grpcPort = newPort } - allocatedGrpcPorts[*config.grpcPort] = true } + allocatedGrpcPorts[*config.grpcPort] = true + glog.V(1).Infof("%s gRPC port set to %d", config.name, *config.grpcPort) } } @@ -934,26 +921,8 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) { // gRPC port should have been initialized by ensureAllPortsAvailableOnIP in runMini // If it's still 0, that indicates a problem with the port initialization sequence - // This defensive fallback handles edge cases where port initialization may have been skipped - // or failed silently (e.g., due to configuration changes or error handling paths) if *miniAdminOptions.grpcPort == 0 { - glog.Warningf("Admin gRPC port was not initialized before startAdminServer, attempting fallback initialization...") - 
// Use the same availability checking logic as initializeGrpcPortsOnIP - calculatedPort := *miniAdminOptions.port + GrpcPortOffset - if !isPortOpenOnIP(getBindIp(), calculatedPort) || !isPortAvailable(calculatedPort) { - glog.Warningf("Calculated fallback gRPC port %d is not available, finding alternative...", calculatedPort) - newPort := findAvailablePortOnIP(getBindIp(), calculatedPort+1, 100, make(map[int]bool)) - if newPort == 0 { - glog.Errorf("Could not find available gRPC port for Admin starting from %d, will use calculated %d and fail on binding", calculatedPort+1, calculatedPort) - *miniAdminOptions.grpcPort = calculatedPort - } else { - glog.Infof("Fallback: using gRPC port %d for Admin", newPort) - *miniAdminOptions.grpcPort = newPort - } - } else { - *miniAdminOptions.grpcPort = calculatedPort - glog.Infof("Fallback: Admin gRPC port initialized to %d", calculatedPort) - } + glog.Fatalf("Admin gRPC port was not initialized before startAdminServer. This indicates a problem with the port initialization sequence.") } // Create data directory if specified diff --git a/weed/worker/client.go b/weed/worker/client.go index d562b8703..f4d15e155 100644 --- a/weed/worker/client.go +++ b/weed/worker/client.go @@ -74,6 +74,8 @@ type grpcState struct { lastWorkerInfo *types.WorkerData reconnectStop chan struct{} streamExit chan struct{} + streamFailed chan struct{} // Signals when stream has failed + regWait chan *worker_pb.RegistrationResponse } // NewGrpcAdminClient creates a new gRPC admin client @@ -98,6 +100,25 @@ func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.Di return c } +// drainAndCloseRegWaitChannel discards any pending messages from the regWait channel +// and then closes it and sets it to nil. Drained RegistrationResponse messages are +// dropped rather than delivered to any waiter. 
+func drainAndCloseRegWaitChannel(ch *chan *worker_pb.RegistrationResponse) { + if ch == nil || *ch == nil { + return + } + for { + select { + case <-*ch: + // continue draining until channel is empty + default: + close(*ch) + *ch = nil + return + } + } +} + // safeCloseChannel safely closes a channel and sets it to nil to prevent double-close panics. // NOTE: This function is NOT thread-safe. It is safe to use in this codebase because all calls // are serialized within the managerLoop goroutine. If this function is used in concurrent contexts @@ -140,7 +161,14 @@ out: req.Resp <- nil continue } - err := c.sendRegistration(req.Worker) + // Capture channel pointers to avoid race condition with reconnect + streamFailedCh := state.streamFailed + regWaitCh := state.regWait + if streamFailedCh == nil || regWaitCh == nil { + req.Resp <- fmt.Errorf("stream not ready for registration") + continue + } + err := c.sendRegistration(req.Worker, streamFailedCh, regWaitCh) req.Resp <- err case ActionQueryConnected: respCh := cmd.data.(chan bool) @@ -225,14 +253,18 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error { // Start stream handlers BEFORE sending registration // This ensures handleIncoming is ready to receive the registration response s.streamExit = make(chan struct{}) + s.streamFailed = make(chan struct{}) + s.regWait = make(chan *worker_pb.RegistrationResponse, 1) go handleOutgoing(s.stream, s.streamExit, c.outgoing, c.cmds) - go handleIncoming(c.workerID, s.stream, s.streamExit, c.incoming, c.cmds) + go handleIncoming(c.workerID, s.stream, s.streamExit, c.incoming, c.cmds, s.streamFailed, s.regWait) // Always check for worker info and send registration immediately as the very first message if s.lastWorkerInfo != nil { // Send registration via the normal outgoing channel and wait for response via incoming - if err := c.sendRegistration(s.lastWorkerInfo); err != nil { + if err := c.sendRegistration(s.lastWorkerInfo, s.streamFailed, s.regWait); err != nil 
{ c.safeCloseChannel(&s.streamExit) + c.safeCloseChannel(&s.streamFailed) + drainAndCloseRegWaitChannel(&s.regWait) s.streamCancel() s.conn.Close() s.connected = false @@ -252,6 +284,8 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error { func (c *GrpcAdminClient) reconnect(s *grpcState) error { // Clean up existing connection completely c.safeCloseChannel(&s.streamExit) + c.safeCloseChannel(&s.streamFailed) + drainAndCloseRegWaitChannel(&s.regWait) if s.streamCancel != nil { s.streamCancel() } @@ -324,32 +358,70 @@ func handleOutgoing( streamExit <-chan struct{}, outgoing <-chan *worker_pb.WorkerMessage, cmds chan<- grpcCommand) { - - msgCh := make(chan *worker_pb.WorkerMessage) + msgCh := make(chan *worker_pb.WorkerMessage, 1) errCh := make(chan error, 1) // Buffered to prevent blocking if the manager is busy - // Goroutine to handle blocking stream.Recv() and simultaneously handle exit - // signals + + // Goroutine that reads from msgCh and performs the blocking stream.Send() calls. 
go func() { for msg := range msgCh { if err := stream.Send(msg); err != nil { errCh <- err - return // Exit the receiver goroutine on error/EOF + return } } close(errCh) }() - for msg := range outgoing { - select { - case msgCh <- msg: - case err := <-errCh: + // Helper function to handle stream errors and cleanup + handleStreamError := func(err error) { + if err != nil { glog.Errorf("Failed to send message to admin: %v", err) - cmds <- grpcCommand{action: ActionStreamError, data: err} - return + select { + case cmds <- grpcCommand{action: ActionStreamError, data: err}: + // Successfully queued + default: + // Manager busy, queue asynchronously to avoid blocking + glog.V(2).Infof("Manager busy, queuing stream error asynchronously from outgoing handler: %v", err) + go func(e error) { + select { + case cmds <- grpcCommand{action: ActionStreamError, data: e}: + case <-time.After(2 * time.Second): + glog.Warningf("Failed to send stream error to manager from outgoing handler, channel blocked: %v", e) + } + }(err) + } + } + } + + // Helper function to cleanup resources + cleanup := func() { + close(msgCh) + <-errCh + } + + for { + select { case <-streamExit: - close(msgCh) - <-errCh + cleanup() + return + case err := <-errCh: + handleStreamError(err) return + case msg, ok := <-outgoing: + if !ok { + cleanup() + return + } + select { + case msgCh <- msg: + // Message queued successfully + case <-streamExit: + cleanup() + return + case err := <-errCh: + handleStreamError(err) + return + } } } } @@ -360,10 +432,15 @@ func handleIncoming( stream worker_pb.WorkerService_WorkerStreamClient, streamExit <-chan struct{}, incoming chan<- *worker_pb.AdminMessage, - cmds chan<- grpcCommand) { + cmds chan<- grpcCommand, + streamFailed chan<- struct{}, + regWait chan<- *worker_pb.RegistrationResponse) { glog.V(1).Infof("INCOMING HANDLER STARTED: Worker %s incoming message handler started", workerID) msgCh := make(chan *worker_pb.AdminMessage) errCh := make(chan error, 1) // Buffered 
to prevent blocking if the manager is busy + // regWait is buffered with size 1 so that the registration response can be sent + // even if the receiver goroutine has not yet started waiting on the channel. + // This non-blocking send pattern avoids a race between sendRegistration and handleIncoming. // Goroutine to handle blocking stream.Recv() and simultaneously handle exit // signals go func() { @@ -385,7 +462,19 @@ func handleIncoming( // Message successfully received from the stream glog.V(4).Infof("MESSAGE RECEIVED: Worker %s received message from admin server: %T", workerID, msg.Message) - // Route message to waiting goroutines or general handler (original select logic) + // If this is a registration response, also publish to the registration waiter. + // regWait is buffered (size 1) so that the response can be sent even if sendRegistration + // hasn't started waiting yet, preventing a race condition between the two goroutines. + if rr := msg.GetRegistrationResponse(); rr != nil { + select { + case regWait <- rr: + glog.V(3).Infof("REGISTRATION RESPONSE: Worker %s routed registration response to waiter", workerID) + default: + glog.V(2).Infof("REGISTRATION RESPONSE DROPPED: Worker %s registration response dropped (no waiter)", workerID) + } + } + + // Route message to general handler. select { case incoming <- msg: glog.V(3).Infof("MESSAGE ROUTED: Worker %s successfully routed message to handler", workerID) @@ -401,8 +490,27 @@ func handleIncoming( glog.Errorf("RECEIVE ERROR: Worker %s failed to receive message from admin: %v", workerID, err) } - // Report the failure as a command to the managerLoop (blocking) - cmds <- grpcCommand{action: ActionStreamError, data: err} + // Signal that stream has failed (non-blocking) + select { + case streamFailed <- struct{}{}: + default: + } + + // Report the failure as a command to the managerLoop. 
+ // Try non-blocking first; if the manager is busy and the channel is full, + // fall back to an asynchronous send that gives up (and logs) after 2 seconds. + select { + case cmds <- grpcCommand{action: ActionStreamError, data: err}: + default: + glog.V(2).Infof("Manager busy, queuing stream error asynchronously: %v", err) + go func(e error) { + select { + case cmds <- grpcCommand{action: ActionStreamError, data: e}: + case <-time.After(2 * time.Second): + glog.Warningf("Failed to send stream error to manager, channel blocked: %v", e) + } + }(err) + } // Exit the main handler loop glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler due to stream error", workerID) @@ -460,6 +568,8 @@ func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) { // Send shutdown signal to stop handlers loop c.safeCloseChannel(&s.streamExit) + c.safeCloseChannel(&s.streamFailed) + drainAndCloseRegWaitChannel(&s.regWait) // Cancel stream context if s.streamCancel != nil { @@ -495,7 +605,7 @@ func (c *GrpcAdminClient) RegisterWorker(worker *types.WorkerData) error { } // sendRegistration sends the registration message and waits for response -func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error { +func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData, streamFailed <-chan struct{}, regWait <-chan *worker_pb.RegistrationResponse) error { capabilities := make([]string, len(worker.Capabilities)) for i, cap := range worker.Capabilities { capabilities[i] = string(cap) @@ -519,6 +629,8 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error { case c.outgoing <- msg: case <-time.After(5 * time.Second): return fmt.Errorf("failed to send registration message: timeout") + case <-streamFailed: + return fmt.Errorf("stream failed before registration message could be sent") } // Wait for registration response @@ -528,16 +640,19 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error { for 
{ select { - case response := <-c.incoming: - if regResp := response.GetRegistrationResponse(); regResp != nil { - if regResp.Success { - glog.Infof("Worker registered successfully: %s", regResp.Message) - return nil - } - return fmt.Errorf("registration failed: %s", regResp.Message) + case regResp, ok := <-regWait: + if !ok || regResp == nil { + return fmt.Errorf("registration failed: channel closed unexpectedly") + } + if regResp.Success { + glog.Infof("Worker registered successfully: %s", regResp.Message) + return nil } + return fmt.Errorf("registration failed: %s", regResp.Message) + case <-streamFailed: + return fmt.Errorf("registration failed: stream closed by server") case <-timeout.C: - return fmt.Errorf("registration timeout") + return fmt.Errorf("registration failed: timeout waiting for response") } } }