diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 29b81c800..64397bdc1 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -233,24 +233,25 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { default: } - // Set a very short read deadline to prevent hanging in CI - shortDeadline := time.Now().Add(1 * time.Second) - // Set a read deadline for the connection based on context or default timeout var readDeadline time.Time + var timeoutDuration time.Duration + if deadline, ok := ctx.Deadline(); ok { readDeadline = deadline + timeoutDuration = time.Until(deadline) + fmt.Printf("DEBUG: [%s] Using context deadline: %v\n", connectionID, timeoutDuration) } else { // Use configurable read timeout instead of hardcoded 5 seconds - readDeadline = time.Now().Add(timeoutConfig.ReadTimeout) + timeoutDuration = timeoutConfig.ReadTimeout + readDeadline = time.Now().Add(timeoutDuration) + fmt.Printf("DEBUG: [%s] Using config timeout: %v\n", connectionID, timeoutDuration) } - // Always use the shorter of the two deadlines - if shortDeadline.Before(readDeadline) { - readDeadline = shortDeadline - fmt.Printf("DEBUG: [%s] Using short deadline (1s) to prevent hanging\n", connectionID) - } else { - fmt.Printf("DEBUG: [%s] Using context/config deadline\n", connectionID) + // If context is about to be cancelled or timeout is very long, use a shorter deadline to prevent hanging + if timeoutDuration > 5*time.Second { + readDeadline = time.Now().Add(5 * time.Second) + fmt.Printf("DEBUG: [%s] Limiting timeout to 5s to prevent hanging\n", connectionID) } if err := conn.SetReadDeadline(readDeadline); err != nil { @@ -266,6 +267,16 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { time.Sleep(100 * time.Millisecond) return ctx.Err() default: + // If context is close to being cancelled, set a very short timeout + if deadline, ok := ctx.Deadline(); ok { + timeUntilDeadline := time.Until(deadline) + if timeUntilDeadline < 2*time.Second && timeUntilDeadline > 0 { + shortDeadline := time.Now().Add(500 * time.Millisecond) + if err := conn.SetReadDeadline(shortDeadline); err == nil { + fmt.Printf("DEBUG: [%s] Context deadline approaching, using 500ms timeout\n", connectionID) + } + } + } } // Read message size (4 bytes)