From 5895f22d69031630918931e049c4a9a4b9332864 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 13 Sep 2025 20:02:11 -0700 Subject: [PATCH] context is cancelled, the server will detect it immediately and exit gracefully --- test/kafka/docker-compose.yml | 6 +++--- weed/mq/kafka/protocol/handler.go | 16 +++++++++++++++- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/test/kafka/docker-compose.yml b/test/kafka/docker-compose.yml index f5d363388..8b835807d 100644 --- a/test/kafka/docker-compose.yml +++ b/test/kafka/docker-compose.yml @@ -90,11 +90,11 @@ services: volumes: - seaweedfs-master-data:/data healthcheck: - test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost:9333/cluster/status"] + test: ["CMD-SHELL", "wget --quiet --tries=1 --spider http://localhost:9333/cluster/status || curl -sf http://localhost:9333/cluster/status"] interval: 10s timeout: 5s - retries: 3 - start_period: 10s + retries: 10 + start_period: 20s networks: - kafka-test-net diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index ad5e42fbf..90c0b9639 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -247,15 +247,29 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { return fmt.Errorf("set read deadline: %w", err) } + // Check context before reading + select { + case <-ctx.Done(): + fmt.Printf("DEBUG: [%s] Context cancelled before reading message header\n", connectionID) + return ctx.Err() + default: + } + // Read message size (4 bytes) fmt.Printf("DEBUG: [%s] About to read message size header\n", connectionID) var sizeBytes [4]byte if _, err := io.ReadFull(r, sizeBytes[:]); err != nil { if err == io.EOF { - fmt.Printf("DEBUG: Client closed connection (clean EOF)\n") + fmt.Printf("DEBUG: [%s] Client closed connection (clean EOF)\n", connectionID) return nil // clean disconnect } + // Check if it's a timeout error + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + fmt.Printf("DEBUG: [%s] Read timeout due to context cancellation\n", connectionID) + return ctx.Err() + } + // Use centralized error classification errorCode := ClassifyNetworkError(err) switch errorCode {