Browse Source

debug: force Metadata v0 to fix kafka-go readPartitions issue

- Set max_version=0 for Metadata API to avoid kafka-go parsing issues
- Add detailed debugging for Metadata v0 responses
- Improve SyncGroup debug messages
- Root cause: kafka-go's readPartitions fails with v1+ but works with v0
- Issue: kafka-go still not calling SyncGroup after successful readPartitions

Progress:
 Produce phase working perfectly
 JoinGroup working with leader election
 Metadata v0 working (no more 'multiple Read calls' error)
 SyncGroup never called - investigating assignTopicPartitions phase
pull/7231/head
chrislu 2 months ago
parent
commit
0c918b223b
  1. 15
      weed/mq/kafka/protocol/handler.go

15
weed/mq/kafka/protocol/handler.go

@ -235,8 +235,7 @@ func (h *Handler) HandleConn(conn net.Conn) error {
fmt.Printf("DEBUG: JoinGroup response hex dump (%d bytes): %x\n", len(response), response) fmt.Printf("DEBUG: JoinGroup response hex dump (%d bytes): %x\n", len(response), response)
} }
case 14: // SyncGroup case 14: // SyncGroup
fmt.Printf("DEBUG: *** SYNCGROUP REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion)
fmt.Printf("DEBUG: *** THIS IS CRITICAL - SYNCGROUP WAS CALLED! ***\n")
fmt.Printf("DEBUG: *** 🎉 SYNCGROUP API CALLED! Version: %d, Correlation: %d ***\n", apiVersion, correlationID)
response, err = h.handleSyncGroup(correlationID, apiVersion, messageBuf[8:]) // skip header response, err = h.handleSyncGroup(correlationID, apiVersion, messageBuf[8:]) // skip header
if err != nil { if err != nil {
fmt.Printf("DEBUG: SyncGroup error: %v\n", err) fmt.Printf("DEBUG: SyncGroup error: %v\n", err)
@ -314,10 +313,10 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) {
response = append(response, 0, 3) // max version 3 response = append(response, 0, 3) // max version 3
// API Key 3 (Metadata): api_key(2) + min_version(2) + max_version(2) // API Key 3 (Metadata): api_key(2) + min_version(2) + max_version(2)
// Advertise Metadata v7 for Kafka 2.1+ compatibility
// Force kafka-go to use v0 to avoid readPartitions parsing issues
response = append(response, 0, 3) // API key 3 response = append(response, 0, 3) // API key 3
response = append(response, 0, 0) // min version 0 response = append(response, 0, 0) // min version 0
response = append(response, 0, 7) // max version 7
response = append(response, 0, 0) // max version 0
// API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2) // API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 2) // API key 2 response = append(response, 0, 2) // API key 2
@ -476,6 +475,14 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([]
} }
fmt.Printf("DEBUG: Metadata v0 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn) fmt.Printf("DEBUG: Metadata v0 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn)
fmt.Printf("DEBUG: *** METADATA v0 RESPONSE DETAILS ***\n")
fmt.Printf("DEBUG: Response size: %d bytes\n", len(response))
fmt.Printf("DEBUG: Broker: %s:%d\n", h.brokerHost, h.brokerPort)
fmt.Printf("DEBUG: Topics: %v\n", topicsToReturn)
for i, topic := range topicsToReturn {
fmt.Printf("DEBUG: Topic[%d]: %s (1 partition)\n", i, topic)
}
fmt.Printf("DEBUG: *** END METADATA v0 RESPONSE ***\n")
return response, nil return response, nil
} }

Loading…
Cancel
Save