Browse Source

update

pull/7231/head
chrislu 2 months ago
parent
commit
1904ba93a8
  1. 8
      test/kafka/docker-compose.yml
  2. 19
      weed/mq/kafka/protocol/handler.go

8
test/kafka/docker-compose.yml

@ -90,7 +90,7 @@ services:
volumes: volumes:
- seaweedfs-master-data:/data - seaweedfs-master-data:/data
healthcheck: healthcheck:
test: ["CMD-SHELL", "wget --quiet --tries=1 --spider http://localhost:9333/cluster/status || curl -sf http://localhost:9333/cluster/status"]
test: ["CMD-SHELL", "wget --quiet --tries=1 --spider http://seaweedfs-master:9333/cluster/status || curl -sf http://seaweedfs-master:9333/cluster/status"]
interval: 10s interval: 10s
timeout: 5s timeout: 5s
retries: 10 retries: 10
@ -119,7 +119,7 @@ services:
volumes: volumes:
- seaweedfs-volume-data:/data - seaweedfs-volume-data:/data
healthcheck: healthcheck:
test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost:8080/status"]
test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://seaweedfs-volume:8080/status"]
interval: 10s interval: 10s
timeout: 5s timeout: 5s
retries: 3 retries: 3
@ -148,7 +148,7 @@ services:
volumes: volumes:
- seaweedfs-filer-data:/data - seaweedfs-filer-data:/data
healthcheck: healthcheck:
test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost:8888/"]
test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://seaweedfs-filer:8888/"]
interval: 10s interval: 10s
timeout: 5s timeout: 5s
retries: 3 retries: 3
@ -164,7 +164,7 @@ services:
- "17777:17777" # MQ Broker port - "17777:17777" # MQ Broker port
command: command:
- mq.broker - mq.broker
- -filer=seaweedfs-filer:8888
- -master=seaweedfs-master:9333
- -ip=seaweedfs-mq-broker - -ip=seaweedfs-mq-broker
- -port=17777 - -port=17777
depends_on: depends_on:

19
weed/mq/kafka/protocol/handler.go

@ -251,10 +251,17 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
fmt.Printf("DEBUG: [%s] Context cancelled before reading message header\n", connectionID) fmt.Printf("DEBUG: [%s] Context cancelled before reading message header\n", connectionID)
// Give a small delay to ensure proper cleanup
time.Sleep(100 * time.Millisecond)
return ctx.Err() return ctx.Err()
default: default:
} }
// Set a much shorter read deadline to prevent hanging in CI
if err := conn.SetReadDeadline(time.Now().Add(2 * time.Second)); err != nil {
fmt.Printf("DEBUG: [%s] Failed to set short read deadline: %v\n", connectionID, err)
}
// Read message size (4 bytes) // Read message size (4 bytes)
fmt.Printf("DEBUG: [%s] About to read message size header\n", connectionID) fmt.Printf("DEBUG: [%s] About to read message size header\n", connectionID)
var sizeBytes [4]byte var sizeBytes [4]byte
@ -266,8 +273,16 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// Check if it's a timeout error // Check if it's a timeout error
if netErr, ok := err.(net.Error); ok && netErr.Timeout() { if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
fmt.Printf("DEBUG: [%s] Read timeout due to context cancellation\n", connectionID)
return ctx.Err()
fmt.Printf("DEBUG: [%s] Read timeout (likely due to context cancellation or client disconnect)\n", connectionID)
// Check if context was cancelled
select {
case <-ctx.Done():
fmt.Printf("DEBUG: [%s] Context was cancelled, returning context error\n", connectionID)
return ctx.Err()
default:
fmt.Printf("DEBUG: [%s] Timeout without context cancellation, treating as client disconnect\n", connectionID)
return nil // treat as clean disconnect
}
} }
// Use centralized error classification // Use centralized error classification

Loading…
Cancel
Save