diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index bd7bc5aa0..29b81c800 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -233,6 +233,9 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { default: } + // Set a very short read deadline to prevent hanging in CI + shortDeadline := time.Now().Add(1 * time.Second) + // Set a read deadline for the connection based on context or default timeout var readDeadline time.Time if deadline, ok := ctx.Deadline(); ok { @@ -242,6 +245,14 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { readDeadline = time.Now().Add(timeoutConfig.ReadTimeout) } + // Always use the shorter of the two deadlines + if shortDeadline.Before(readDeadline) { + readDeadline = shortDeadline + fmt.Printf("DEBUG: [%s] Using short deadline (1s) to prevent hanging\n", connectionID) + } else { + fmt.Printf("DEBUG: [%s] Using context/config deadline\n", connectionID) + } + if err := conn.SetReadDeadline(readDeadline); err != nil { fmt.Printf("DEBUG: [%s] Failed to set read deadline: %v\n", connectionID, err) return fmt.Errorf("set read deadline: %w", err) @@ -257,11 +268,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { default: } - // Set a much shorter read deadline to prevent hanging in CI - if err := conn.SetReadDeadline(time.Now().Add(2 * time.Second)); err != nil { - fmt.Printf("DEBUG: [%s] Failed to set short read deadline: %v\n", connectionID, err) - } - // Read message size (4 bytes) fmt.Printf("DEBUG: [%s] About to read message size header\n", connectionID) var sizeBytes [4]byte