diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 529233e7b..a1d038701 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -300,38 +300,61 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { readChan <- readResult{n: n, err: err} }() - // Wait for either the read to complete or context cancellation with a backup timeout + // Wait for either the read to complete or context cancellation with aggressive timeout + done := make(chan bool, 1) + var finalResult readResult + var finalErr error + + // Start a timeout goroutine that will force completion after 1 second + go func() { + time.Sleep(1 * time.Second) + select { + case done <- true: + fmt.Printf("DEBUG: [%s] Force timeout after 1 second, closing connection\n", connectionID) + finalErr = fmt.Errorf("force timeout") + default: + // Already completed + } + }() + + // Main waiting logic select { case <-ctx.Done(): fmt.Printf("DEBUG: [%s] Context cancelled during read, closing connection\n", connectionID) + done <- true return ctx.Err() - case <-time.After(2 * time.Second): - fmt.Printf("DEBUG: [%s] Read operation timed out after 2 seconds, closing connection\n", connectionID) - return fmt.Errorf("read timeout") case result := <-readChan: - if result.err != nil { - if result.err == io.EOF { - fmt.Printf("DEBUG: [%s] Client closed connection (clean EOF)\n", connectionID) - return nil // clean disconnect - } + done <- true + finalResult = result + case <-done: + if finalErr != nil { + return finalErr + } + } - // Check if it's a timeout error - if netErr, ok := result.err.(net.Error); ok && netErr.Timeout() { - fmt.Printf("DEBUG: [%s] Read timeout (likely due to context cancellation or client disconnect)\n", connectionID) - // Check if context was cancelled - select { - case <-ctx.Done(): - fmt.Printf("DEBUG: [%s] Context was cancelled, returning context error\n", connectionID) - return ctx.Err() - default: - fmt.Printf("DEBUG: [%s] Timeout without context cancellation, treating as client disconnect\n", connectionID) - return nil // treat as clean disconnect - } - } + // Process the result if we got one + if finalResult.err != nil { + if finalResult.err == io.EOF { + fmt.Printf("DEBUG: [%s] Client closed connection (clean EOF)\n", connectionID) + return nil // clean disconnect + } - fmt.Printf("DEBUG: [%s] Read error: %v\n", connectionID, result.err) - return fmt.Errorf("read message size: %w", result.err) + // Check if it's a timeout error + if netErr, ok := finalResult.err.(net.Error); ok && netErr.Timeout() { + fmt.Printf("DEBUG: [%s] Read timeout (likely due to context cancellation or client disconnect)\n", connectionID) + // Check if context was cancelled + select { + case <-ctx.Done(): + fmt.Printf("DEBUG: [%s] Context was cancelled, returning context error\n", connectionID) + return ctx.Err() + default: + fmt.Printf("DEBUG: [%s] Timeout without context cancellation, treating as client disconnect\n", connectionID) + return nil // treat as clean disconnect + } } + + fmt.Printf("DEBUG: [%s] Read error: %v\n", connectionID, finalResult.err) + return fmt.Errorf("read message size: %w", finalResult.err) } // Successfully read the message size