diff --git a/test/kafka/produce_consume_cycle_test.go b/test/kafka/produce_consume_cycle_test.go index 0be37e94b..ac1dc48b4 100644 --- a/test/kafka/produce_consume_cycle_test.go +++ b/test/kafka/produce_consume_cycle_test.go @@ -58,10 +58,12 @@ func TestKafkaProduceConsumeE2E(t *testing.T) { // Produce messages for i, msg := range testMessages { - err := writer.WriteMessages(context.Background(), kafka.Message{ + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + err := writer.WriteMessages(ctx, kafka.Message{ Key: []byte(fmt.Sprintf("key-%d", i)), Value: []byte(msg), }) + cancel() if err != nil { t.Fatalf("Failed to produce message %d: %v", i, err) } diff --git a/weed/mq/kafka/protocol/find_coordinator.go b/weed/mq/kafka/protocol/find_coordinator.go index 155be2eac..e099a5152 100644 --- a/weed/mq/kafka/protocol/find_coordinator.go +++ b/weed/mq/kafka/protocol/find_coordinator.go @@ -9,41 +9,47 @@ func (h *Handler) handleFindCoordinator(correlationID uint32, requestBody []byte // Parse FindCoordinator request // Request format: client_id + coordinator_key + coordinator_type(1) + // DEBUG: Hex dump the request to understand format + dumpLen := len(requestBody) + if dumpLen > 50 { + dumpLen = 50 + } + fmt.Printf("DEBUG: FindCoordinator request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen]) + if len(requestBody) < 2 { // client_id_size(2) return nil, fmt.Errorf("FindCoordinator request too short") } // Skip client_id clientIDSize := binary.BigEndian.Uint16(requestBody[0:2]) + fmt.Printf("DEBUG: FindCoordinator client_id_size: %d\n", clientIDSize) offset := 2 + int(clientIDSize) if len(requestBody) < offset+3 { // coordinator_key_size(2) + coordinator_type(1) - return nil, fmt.Errorf("FindCoordinator request missing data") + return nil, fmt.Errorf("FindCoordinator request missing data (need %d bytes, have %d)", offset+3, len(requestBody)) } // Parse coordinator key (group ID for consumer groups) coordinatorKeySize := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + fmt.Printf("DEBUG: FindCoordinator coordinator_key_size: %d, offset: %d\n", coordinatorKeySize, offset) offset += 2 - if len(requestBody) < offset+int(coordinatorKeySize)+1 { - return nil, fmt.Errorf("FindCoordinator request missing coordinator key") + if len(requestBody) < offset+int(coordinatorKeySize) { + return nil, fmt.Errorf("FindCoordinator request missing coordinator key (need %d bytes, have %d)", offset+int(coordinatorKeySize), len(requestBody)) } coordinatorKey := string(requestBody[offset : offset+int(coordinatorKeySize)]) offset += int(coordinatorKeySize) - coordinatorType := requestBody[offset] + // Coordinator type is optional in some versions, default to 0 (group coordinator) + var coordinatorType byte = 0 + if offset < len(requestBody) { + coordinatorType = requestBody[offset] + } _ = coordinatorType // 0 = group coordinator, 1 = transaction coordinator fmt.Printf("DEBUG: FindCoordinator request for key '%s' (type: %d)\n", coordinatorKey, coordinatorType) - // DEBUG: Hex dump the request to understand format - dumpLen := len(requestBody) - if dumpLen > 50 { - dumpLen = 50 - } - fmt.Printf("DEBUG: FindCoordinator request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen]) - response := make([]byte, 0, 64) // Correlation ID diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index e50e17736..27a2e4e39 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -234,7 +234,11 @@ func (h *Handler) HandleConn(conn net.Conn) error { case 9: // OffsetFetch response, err = h.handleOffsetFetch(correlationID, messageBuf[8:]) // skip header case 10: // FindCoordinator + fmt.Printf("DEBUG: *** FINDCOORDINATOR REQUEST RECEIVED *** Correlation: %d\n", correlationID) response, err = h.handleFindCoordinator(correlationID, messageBuf[8:]) // skip header + if err != nil { + fmt.Printf("DEBUG: FindCoordinator error: %v\n", err) + } case 12: // Heartbeat response, err = h.handleHeartbeat(correlationID, messageBuf[8:]) // skip header case 13: // LeaveGroup