diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 59bf24aaf..a80de9a66 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -235,8 +235,7 @@ func (h *Handler) HandleConn(conn net.Conn) error { fmt.Printf("DEBUG: JoinGroup response hex dump (%d bytes): %x\n", len(response), response) } case 14: // SyncGroup - fmt.Printf("DEBUG: *** SYNCGROUP REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion) - fmt.Printf("DEBUG: *** THIS IS CRITICAL - SYNCGROUP WAS CALLED! ***\n") + fmt.Printf("DEBUG: *** 🎉 SYNCGROUP API CALLED! Version: %d, Correlation: %d ***\n", apiVersion, correlationID) response, err = h.handleSyncGroup(correlationID, apiVersion, messageBuf[8:]) // skip header if err != nil { fmt.Printf("DEBUG: SyncGroup error: %v\n", err) @@ -314,10 +313,10 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { response = append(response, 0, 3) // max version 3 // API Key 3 (Metadata): api_key(2) + min_version(2) + max_version(2) - // Advertise Metadata v7 for Kafka 2.1+ compatibility + // Force kafka-go to use v0 to avoid readPartitions parsing issues response = append(response, 0, 3) // API key 3 response = append(response, 0, 0) // min version 0 - response = append(response, 0, 7) // max version 7 + response = append(response, 0, 0) // max version 0 // API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2) response = append(response, 0, 2) // API key 2 @@ -476,6 +475,14 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([] } fmt.Printf("DEBUG: Metadata v0 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn) + fmt.Printf("DEBUG: *** METADATA v0 RESPONSE DETAILS ***\n") + fmt.Printf("DEBUG: Response size: %d bytes\n", len(response)) + fmt.Printf("DEBUG: Broker: %s:%d\n", h.brokerHost, h.brokerPort) + fmt.Printf("DEBUG: Topics: %v\n", topicsToReturn) + for i, topic := range topicsToReturn { + fmt.Printf("DEBUG: Topic[%d]: %s (1 partition)\n", i, topic) + } + fmt.Printf("DEBUG: *** END METADATA v0 RESPONSE ***\n") return response, nil } @@ -588,7 +595,7 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([] func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([]byte, error) { // Metadata v2 adds ClusterID field (nullable string) // v2 response layout: correlation_id(4) + brokers(ARRAY) + cluster_id(NULLABLE_STRING) + controller_id(4) + topics(ARRAY) - + // Parse requested topics (empty means all) requestedTopics := h.parseMetadataTopics(requestBody) fmt.Printf("DEBUG: 🔍 METADATA v2 REQUEST - Requested: %v (empty=all)\n", requestedTopics) @@ -681,7 +688,7 @@ func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([] func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ([]byte, error) { // Metadata v3/v4 adds ThrottleTimeMs field at the beginning // v3/v4 response layout: correlation_id(4) + throttle_time_ms(4) + brokers(ARRAY) + cluster_id(NULLABLE_STRING) + controller_id(4) + topics(ARRAY) - + // Parse requested topics (empty means all) requestedTopics := h.parseMetadataTopics(requestBody) fmt.Printf("DEBUG: 🔍 METADATA v3/v4 REQUEST - Requested: %v (empty=all)\n", requestedTopics) @@ -778,7 +785,7 @@ func (h *Handler) HandleMetadataV5V6(correlationID uint32, requestBody []byte) ( // Metadata v5/v6 adds OfflineReplicas field to partitions // v5/v6 response layout: correlation_id(4) + throttle_time_ms(4) + brokers(ARRAY) + cluster_id(NULLABLE_STRING) + controller_id(4) + topics(ARRAY) // Each partition now includes: error_code(2) + partition_index(4) + leader_id(4) + replica_nodes(ARRAY) + isr_nodes(ARRAY) + offline_replicas(ARRAY) - + // Parse requested topics (empty means all) requestedTopics := h.parseMetadataTopics(requestBody) fmt.Printf("DEBUG: 🔍 METADATA v5/v6 REQUEST - Requested: %v (empty=all)\n", requestedTopics) @@ -878,7 +885,7 @@ func (h *Handler) HandleMetadataV7(correlationID uint32, requestBody []byte) ([] // Metadata v7 adds LeaderEpoch field to partitions // v7 response layout: correlation_id(4) + throttle_time_ms(4) + brokers(ARRAY) + cluster_id(NULLABLE_STRING) + controller_id(4) + topics(ARRAY) // Each partition now includes: error_code(2) + partition_index(4) + leader_id(4) + leader_epoch(4) + replica_nodes(ARRAY) + isr_nodes(ARRAY) + offline_replicas(ARRAY) - + // Parse requested topics (empty means all) requestedTopics := h.parseMetadataTopics(requestBody) fmt.Printf("DEBUG: 🔍 METADATA v7 REQUEST - Requested: %v (empty=all)\n", requestedTopics)