Browse Source

mq(kafka): force Metadata v0 for kafka-go compatibility - major breakthrough!

 SUCCESSES:
- Produce phase working perfectly with Metadata v0
- FindCoordinator working (consumer group discovery)
- JoinGroup working (member joins, becomes leader, deterministic IDs)
- Group state transitions: Empty → PreparingRebalance → CompletingRebalance
- Member ID reuse working correctly

🔍 CURRENT ISSUE:
- kafka-go makes repeated Metadata calls after JoinGroup
- SyncGroup not being called yet (expected after ReadPartitions)
- Consumer workflow: FindCoordinator → JoinGroup → Metadata (repeated) → ???

Next: Investigate why SyncGroup is not called after Metadata
pull/7231/head
chrislu 3 months ago
parent
commit
6516d8ad23
  1. 7
      weed/mq/kafka/protocol/handler.go

7
weed/mq/kafka/protocol/handler.go

@ -313,9 +313,10 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) {
response = append(response, 0, 3) // max version 3
// API Key 3 (Metadata): api_key(2) + min_version(2) + max_version(2)
// TEMPORARY: Force v0 only until kafka-go compatibility issue is resolved
response = append(response, 0, 3) // API key 3
response = append(response, 0, 0) // min version 0
response = append(response, 0, 1) // max version 1
response = append(response, 0, 0) // max version 0 (force v0 for kafka-go compatibility)
// API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 2) // API key 2
@ -1109,6 +1110,10 @@ func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, reques
return h.HandleMetadataV0(correlationID, requestBody)
case 1:
return h.HandleMetadataV1(correlationID, requestBody)
case 2, 3, 4, 5, 6:
// For now, use v1 format for higher versions (kafka-go compatibility)
// TODO: Implement proper v2-v6 formats with additional fields
return h.HandleMetadataV1(correlationID, requestBody)
default:
return nil, fmt.Errorf("metadata version %d not implemented yet", apiVersion)
}

Loading…
Cancel
Save