From 1904ba93a849d034fc3bc7105a7190aac65ff27a Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 13 Sep 2025 20:07:08 -0700 Subject: [PATCH] update --- test/kafka/docker-compose.yml | 8 ++++---- weed/mq/kafka/protocol/handler.go | 19 +++++++++++++++++-- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/test/kafka/docker-compose.yml b/test/kafka/docker-compose.yml index 8b835807d..40491171a 100644 --- a/test/kafka/docker-compose.yml +++ b/test/kafka/docker-compose.yml @@ -90,7 +90,7 @@ services: volumes: - seaweedfs-master-data:/data healthcheck: - test: ["CMD-SHELL", "wget --quiet --tries=1 --spider http://localhost:9333/cluster/status || curl -sf http://localhost:9333/cluster/status"] + test: ["CMD-SHELL", "wget --quiet --tries=1 --spider http://seaweedfs-master:9333/cluster/status || curl -sf http://seaweedfs-master:9333/cluster/status"] interval: 10s timeout: 5s retries: 10 @@ -119,7 +119,7 @@ services: volumes: - seaweedfs-volume-data:/data healthcheck: - test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost:8080/status"] + test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://seaweedfs-volume:8080/status"] interval: 10s timeout: 5s retries: 3 @@ -148,7 +148,7 @@ services: volumes: - seaweedfs-filer-data:/data healthcheck: - test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost:8888/"] + test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://seaweedfs-filer:8888/"] interval: 10s timeout: 5s retries: 3 @@ -164,7 +164,7 @@ services: - "17777:17777" # MQ Broker port command: - mq.broker - - -filer=seaweedfs-filer:8888 + - -master=seaweedfs-master:9333 - -ip=seaweedfs-mq-broker - -port=17777 depends_on: diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 90c0b9639..bd7bc5aa0 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -251,10 +251,17 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { select { case <-ctx.Done(): fmt.Printf("DEBUG: [%s] Context cancelled before reading message header\n", connectionID) + // Give a small delay to ensure proper cleanup + time.Sleep(100 * time.Millisecond) return ctx.Err() default: } + // Set a much shorter read deadline to prevent hanging in CI + if err := conn.SetReadDeadline(time.Now().Add(2 * time.Second)); err != nil { + fmt.Printf("DEBUG: [%s] Failed to set short read deadline: %v\n", connectionID, err) + } + // Read message size (4 bytes) fmt.Printf("DEBUG: [%s] About to read message size header\n", connectionID) var sizeBytes [4]byte @@ -266,8 +273,16 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Check if it's a timeout error if netErr, ok := err.(net.Error); ok && netErr.Timeout() { - fmt.Printf("DEBUG: [%s] Read timeout due to context cancellation\n", connectionID) - return ctx.Err() + fmt.Printf("DEBUG: [%s] Read timeout (likely due to context cancellation or client disconnect)\n", connectionID) + // Check if context was cancelled + select { + case <-ctx.Done(): + fmt.Printf("DEBUG: [%s] Context was cancelled, returning context error\n", connectionID) + return ctx.Err() + default: + fmt.Printf("DEBUG: [%s] Timeout without context cancellation, treating as client disconnect\n", connectionID) + return nil // treat as clean disconnect + } } // Use centralized error classification