Browse Source

timout

pull/7231/head
chrislu 2 months ago
parent
commit
1c21527179
  1. 71
      weed/mq/kafka/protocol/handler.go

71
weed/mq/kafka/protocol/handler.go

@ -300,38 +300,61 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
readChan <- readResult{n: n, err: err}
}()
// Wait for either the read to complete or context cancellation with a backup timeout
// Wait for either the read to complete or context cancellation with aggressive timeout
done := make(chan bool, 1)
var finalResult readResult
var finalErr error
// Start a timeout goroutine that will force completion after 1 second
go func() {
time.Sleep(1 * time.Second)
select {
case done <- true:
fmt.Printf("DEBUG: [%s] Force timeout after 1 second, closing connection\n", connectionID)
finalErr = fmt.Errorf("force timeout")
default:
// Already completed
}
}()
// Main waiting logic
select {
case <-ctx.Done():
fmt.Printf("DEBUG: [%s] Context cancelled during read, closing connection\n", connectionID)
done <- true
return ctx.Err()
case <-time.After(2 * time.Second):
fmt.Printf("DEBUG: [%s] Read operation timed out after 2 seconds, closing connection\n", connectionID)
return fmt.Errorf("read timeout")
case result := <-readChan:
if result.err != nil {
if result.err == io.EOF {
fmt.Printf("DEBUG: [%s] Client closed connection (clean EOF)\n", connectionID)
return nil // clean disconnect
}
done <- true
finalResult = result
case <-done:
if finalErr != nil {
return finalErr
}
}
// Check if it's a timeout error
if netErr, ok := result.err.(net.Error); ok && netErr.Timeout() {
fmt.Printf("DEBUG: [%s] Read timeout (likely due to context cancellation or client disconnect)\n", connectionID)
// Check if context was cancelled
select {
case <-ctx.Done():
fmt.Printf("DEBUG: [%s] Context was cancelled, returning context error\n", connectionID)
return ctx.Err()
default:
fmt.Printf("DEBUG: [%s] Timeout without context cancellation, treating as client disconnect\n", connectionID)
return nil // treat as clean disconnect
}
}
// Process the result if we got one
if finalResult.err != nil {
if finalResult.err == io.EOF {
fmt.Printf("DEBUG: [%s] Client closed connection (clean EOF)\n", connectionID)
return nil // clean disconnect
}
fmt.Printf("DEBUG: [%s] Read error: %v\n", connectionID, result.err)
return fmt.Errorf("read message size: %w", result.err)
// Check if it's a timeout error
if netErr, ok := finalResult.err.(net.Error); ok && netErr.Timeout() {
fmt.Printf("DEBUG: [%s] Read timeout (likely due to context cancellation or client disconnect)\n", connectionID)
// Check if context was cancelled
select {
case <-ctx.Done():
fmt.Printf("DEBUG: [%s] Context was cancelled, returning context error\n", connectionID)
return ctx.Err()
default:
fmt.Printf("DEBUG: [%s] Timeout without context cancellation, treating as client disconnect\n", connectionID)
return nil // treat as clean disconnect
}
}
fmt.Printf("DEBUG: [%s] Read error: %v\n", connectionID, finalResult.err)
return fmt.Errorf("read message size: %w", finalResult.err)
}
// Successfully read the message size

Loading…
Cancel
Save