Browse Source

clean up connections

pull/7231/head
chrislu 2 months ago
parent
commit
8743c5453a
  1. 2
      weed/mq/kafka/gateway/server.go
  2. 29
      weed/mq/kafka/protocol/handler.go
  3. 7
      weed/mq/kafka/protocol/handler_test.go

2
weed/mq/kafka/gateway/server.go

@ -118,7 +118,7 @@ func (s *Server) Start() error {
s.wg.Add(1) s.wg.Add(1)
go func(c net.Conn) { go func(c net.Conn) {
defer s.wg.Done() defer s.wg.Done()
if err := s.handler.HandleConn(c); err != nil {
if err := s.handler.HandleConn(s.ctx, c); err != nil {
glog.V(1).Infof("handle conn %v: %v", c.RemoteAddr(), err) glog.V(1).Infof("handle conn %v: %v", c.RemoteAddr(), err)
} }
}(conn) }(conn)

29
weed/mq/kafka/protocol/handler.go

@ -3,6 +3,7 @@ package protocol
import ( import (
"bufio" "bufio"
"bytes" "bytes"
"context"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"io" "io"
@ -333,7 +334,7 @@ func (h *Handler) SetBrokerAddress(host string, port int) {
} }
// HandleConn processes a single client connection // HandleConn processes a single client connection
func (h *Handler) HandleConn(conn net.Conn) error {
func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
connectionID := fmt.Sprintf("%s->%s", conn.RemoteAddr(), conn.LocalAddr()) connectionID := fmt.Sprintf("%s->%s", conn.RemoteAddr(), conn.LocalAddr())
defer func() { defer func() {
fmt.Printf("DEBUG: [%s] Connection closing\n", connectionID) fmt.Printf("DEBUG: [%s] Connection closing\n", connectionID)
@ -345,6 +346,22 @@ func (h *Handler) HandleConn(conn net.Conn) error {
defer w.Flush() defer w.Flush()
for { for {
// Check if context is cancelled
select {
case <-ctx.Done():
fmt.Printf("DEBUG: [%s] Context cancelled, closing connection\n", connectionID)
return ctx.Err()
default:
}
// Set a read deadline for the connection based on context
if deadline, ok := ctx.Deadline(); ok {
conn.SetReadDeadline(deadline)
} else {
// Set a reasonable timeout if no deadline is set
conn.SetReadDeadline(time.Now().Add(5 * time.Second))
}
// Read message size (4 bytes) // Read message size (4 bytes)
var sizeBytes [4]byte var sizeBytes [4]byte
if _, err := io.ReadFull(r, sizeBytes[:]); err != nil { if _, err := io.ReadFull(r, sizeBytes[:]); err != nil {
@ -352,6 +369,16 @@ func (h *Handler) HandleConn(conn net.Conn) error {
fmt.Printf("DEBUG: Client closed connection (clean EOF)\n") fmt.Printf("DEBUG: Client closed connection (clean EOF)\n")
return nil // clean disconnect return nil // clean disconnect
} }
// Check if error is due to context cancellation
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
select {
case <-ctx.Done():
fmt.Printf("DEBUG: [%s] Read timeout due to context cancellation\n", connectionID)
return ctx.Err()
default:
// Actual timeout, continue with error
}
}
fmt.Printf("DEBUG: Error reading message size: %v\n", err) fmt.Printf("DEBUG: Error reading message size: %v\n", err)
return fmt.Errorf("read size: %w", err) return fmt.Errorf("read size: %w", err)
} }

7
weed/mq/kafka/protocol/handler_test.go

@ -1,6 +1,7 @@
package protocol package protocol
import ( import (
"context"
"encoding/binary" "encoding/binary"
"net" "net"
"testing" "testing"
@ -19,7 +20,7 @@ func TestHandler_ApiVersions(t *testing.T) {
// Handle connection in background // Handle connection in background
done := make(chan error, 1) done := make(chan error, 1)
go func() { go func() {
done <- h.HandleConn(server)
done <- h.HandleConn(context.Background(), server)
}() }()
// Create ApiVersions request manually // Create ApiVersions request manually
@ -361,7 +362,7 @@ func TestHandler_ListOffsets_EndToEnd(t *testing.T) {
// Handle connection in background // Handle connection in background
done := make(chan error, 1) done := make(chan error, 1)
go func() { go func() {
done <- h.HandleConn(server)
done <- h.HandleConn(context.Background(), server)
}() }()
// Create ListOffsets request // Create ListOffsets request
@ -471,7 +472,7 @@ func TestHandler_Metadata_EndToEnd(t *testing.T) {
// Handle connection in background // Handle connection in background
done := make(chan error, 1) done := make(chan error, 1)
go func() { go func() {
done <- h.HandleConn(server)
done <- h.HandleConn(context.Background(), server)
}() }()
// Create Metadata request // Create Metadata request

Loading…
Cancel
Save