diff --git a/weed/mq/kafka/gateway/server.go b/weed/mq/kafka/gateway/server.go index ae7d8fa37..39615830b 100644 --- a/weed/mq/kafka/gateway/server.go +++ b/weed/mq/kafka/gateway/server.go @@ -146,9 +146,9 @@ func (s *Server) Close() error { select { case <-done: // Normal shutdown - case <-time.After(2 * time.Second): + case <-time.After(1 * time.Second): // Timeout - force shutdown - glog.Warningf("Server shutdown timed out after 2 seconds, forcing close") + glog.Warningf("Server shutdown timed out after 1 second, forcing close") } // Close the handler (important for SeaweedMQ mode) diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index b2388b29d..7c764ebe1 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -324,6 +324,10 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { default: // Already completed } + case <-ctx.Done(): + // Context cancelled, exit timeout goroutine + fmt.Printf("DEBUG: [%s] Force timeout goroutine cancelled due to context\n", connectionID) + return case <-done: // Operation completed, exit timeout goroutine return