From 8743c5453a10bceaff8743cd6770359c69021608 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 13 Sep 2025 08:17:20 -0700 Subject: [PATCH] clean up connections --- weed/mq/kafka/gateway/server.go | 2 +- weed/mq/kafka/protocol/handler.go | 29 +++++++++++++++++++++++++- weed/mq/kafka/protocol/handler_test.go | 7 ++++--- 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/weed/mq/kafka/gateway/server.go b/weed/mq/kafka/gateway/server.go index c8dd17d4e..4acb7b103 100644 --- a/weed/mq/kafka/gateway/server.go +++ b/weed/mq/kafka/gateway/server.go @@ -118,7 +118,7 @@ func (s *Server) Start() error { s.wg.Add(1) go func(c net.Conn) { defer s.wg.Done() - if err := s.handler.HandleConn(c); err != nil { + if err := s.handler.HandleConn(s.ctx, c); err != nil { glog.V(1).Infof("handle conn %v: %v", c.RemoteAddr(), err) } }(conn) diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index d2e773e32..266252b12 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -3,6 +3,7 @@ package protocol import ( "bufio" "bytes" + "context" "encoding/binary" "fmt" "io" @@ -333,7 +334,7 @@ func (h *Handler) SetBrokerAddress(host string, port int) { } // HandleConn processes a single client connection -func (h *Handler) HandleConn(conn net.Conn) error { +func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { connectionID := fmt.Sprintf("%s->%s", conn.RemoteAddr(), conn.LocalAddr()) defer func() { fmt.Printf("DEBUG: [%s] Connection closing\n", connectionID) @@ -345,6 +346,22 @@ func (h *Handler) HandleConn(conn net.Conn) error { defer w.Flush() for { + // Check if context is cancelled + select { + case <-ctx.Done(): + fmt.Printf("DEBUG: [%s] Context cancelled, closing connection\n", connectionID) + return ctx.Err() + default: + } + + // Set a read deadline for the connection based on context + if deadline, ok := ctx.Deadline(); ok { + conn.SetReadDeadline(deadline) + } else { + // Set a reasonable timeout if no deadline is set + conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + } + // Read message size (4 bytes) var sizeBytes [4]byte if _, err := io.ReadFull(r, sizeBytes[:]); err != nil { @@ -352,6 +369,16 @@ func (h *Handler) HandleConn(conn net.Conn) error { fmt.Printf("DEBUG: Client closed connection (clean EOF)\n") return nil // clean disconnect } + // Check if error is due to context cancellation + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + select { + case <-ctx.Done(): + fmt.Printf("DEBUG: [%s] Read timeout due to context cancellation\n", connectionID) + return ctx.Err() + default: + // Actual timeout, continue with error + } + } fmt.Printf("DEBUG: Error reading message size: %v\n", err) return fmt.Errorf("read size: %w", err) } diff --git a/weed/mq/kafka/protocol/handler_test.go b/weed/mq/kafka/protocol/handler_test.go index 02ff70083..a347da771 100644 --- a/weed/mq/kafka/protocol/handler_test.go +++ b/weed/mq/kafka/protocol/handler_test.go @@ -1,6 +1,7 @@ package protocol import ( + "context" "encoding/binary" "net" "testing" @@ -19,7 +20,7 @@ func TestHandler_ApiVersions(t *testing.T) { // Handle connection in background done := make(chan error, 1) go func() { - done <- h.HandleConn(server) + done <- h.HandleConn(context.Background(), server) }() // Create ApiVersions request manually @@ -361,7 +362,7 @@ func TestHandler_ListOffsets_EndToEnd(t *testing.T) { // Handle connection in background done := make(chan error, 1) go func() { - done <- h.HandleConn(server) + done <- h.HandleConn(context.Background(), server) }() // Create ListOffsets request @@ -471,7 +472,7 @@ func TestHandler_Metadata_EndToEnd(t *testing.T) { // Handle connection in background done := make(chan error, 1) go func() { - done <- h.HandleConn(server) + done <- h.HandleConn(context.Background(), server) }() // Create Metadata request