Browse Source

timeouts

pull/7231/head
chrislu 2 months ago
parent
commit
5d5c820c88
  1. 8
      weed/mq/kafka/protocol/fetch.go
  2. 5
      weed/mq/kafka/protocol/handler.go

8
weed/mq/kafka/protocol/fetch.go

@ -54,7 +54,13 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
shouldLongPoll := fetchRequest.MinBytes > 0 && maxWaitMs > 0 && !hasDataAvailable() && allTopicsExist() shouldLongPoll := fetchRequest.MinBytes > 0 && maxWaitMs > 0 && !hasDataAvailable() && allTopicsExist()
if shouldLongPoll { if shouldLongPoll {
start := time.Now() start := time.Now()
deadline := start.Add(time.Duration(maxWaitMs) * time.Millisecond)
// Limit polling time to maximum 2 seconds to prevent hanging in CI
maxPollTime := time.Duration(maxWaitMs) * time.Millisecond
if maxPollTime > 2*time.Second {
maxPollTime = 2 * time.Second
fmt.Printf("DEBUG: Limiting fetch polling to 2 seconds to prevent hanging\n")
}
deadline := start.Add(maxPollTime)
for time.Now().Before(deadline) { for time.Now().Before(deadline) {
// Use context-aware sleep instead of blocking time.Sleep // Use context-aware sleep instead of blocking time.Sleep
select { select {

5
weed/mq/kafka/protocol/handler.go

@ -300,11 +300,14 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
readChan <- readResult{n: n, err: err} readChan <- readResult{n: n, err: err}
}() }()
// Wait for either the read to complete or context cancellation
// Wait for either the read to complete or context cancellation with a backup timeout
select { select {
case <-ctx.Done(): case <-ctx.Done():
fmt.Printf("DEBUG: [%s] Context cancelled during read, closing connection\n", connectionID) fmt.Printf("DEBUG: [%s] Context cancelled during read, closing connection\n", connectionID)
return ctx.Err() return ctx.Err()
case <-time.After(2 * time.Second):
fmt.Printf("DEBUG: [%s] Read operation timed out after 2 seconds, closing connection\n", connectionID)
return fmt.Errorf("read timeout")
case result := <-readChan: case result := <-readChan:
if result.err != nil { if result.err != nil {
if result.err == io.EOF { if result.err == io.EOF {

Loading…
Cancel
Save