Browse Source

feat: major consumer group breakthrough - fix FindCoordinator v2 and JoinGroup v5

🎉 MAJOR PROGRESS:
- Fixed FindCoordinator v2 response format (added the throttle_time and error_message fields required by the v2 schema; error_code and node_id were already present in v0)
- Fixed JoinGroup v5 request parsing (added GroupInstanceID field parsing)
- Consumer group coordination now working: FindCoordinator -> JoinGroup -> SyncGroup
- Sarama consumer successfully joins group, gets member ID, calls Setup handler

 Working:
- FindCoordinator v2: Sarama finds coordinator successfully
- JoinGroup v5: Consumer joins group, gets generation 1, member ID assigned
- Consumer group session setup called with generation 1

 Current issue:
- SyncGroup v3 parsing error: 'invalid member ID length'
- Consumer has no partition assignments (Claims: map[])
- Need to fix SyncGroup parsing to complete consumer group flow

Next: Fix SyncGroup v3 parsing to enable partition assignment and message consumption
pull/7231/head
chrislu 2 months ago
parent
commit
56608aead3
  1. 18
      test/kafka/consumer_group_debug_test.go
  2. 18
      weed/mq/kafka/protocol/find_coordinator.go
  3. 2
      weed/mq/kafka/protocol/handler.go
  4. 44
      weed/mq/kafka/protocol/joingroup.go

18
test/kafka/consumer_group_debug_test.go

@ -29,7 +29,7 @@ func TestConsumerGroup_Debug(t *testing.T) {
// Test configuration
topicName := "debug-test"
groupID := "debug-group"
// Add topic for testing
gatewayServer.GetHandler().AddTopicForTesting(topicName, 1)
@ -70,7 +70,7 @@ func TestConsumerGroup_Debug(t *testing.T) {
// Start one consumer
t.Logf("=== Starting 1 consumer in group '%s' ===", groupID)
consumerGroup, err := sarama.NewConsumerGroup([]string{brokerAddr}, groupID, config)
if err != nil {
t.Fatalf("Failed to create consumer group: %v", err)
@ -95,16 +95,16 @@ func TestConsumerGroup_Debug(t *testing.T) {
select {
case <-handler.ready:
t.Logf("✅ Consumer is ready!")
// Try to consume the message
select {
case msg := <-handler.messages:
t.Logf("✅ Consumed message: key=%s, value=%s, offset=%d",
t.Logf("✅ Consumed message: key=%s, value=%s, offset=%d",
string(msg.Key), string(msg.Value), msg.Offset)
case <-time.After(5 * time.Second):
t.Logf("⚠️ No message received within timeout")
}
case <-time.After(8 * time.Second):
t.Logf("❌ Timeout waiting for consumer to be ready")
}
@ -120,7 +120,7 @@ type DebugHandler struct {
}
func (h *DebugHandler) Setup(session sarama.ConsumerGroupSession) error {
h.t.Logf("🔧 Consumer group session setup - Generation: %d, Claims: %v",
h.t.Logf("🔧 Consumer group session setup - Generation: %d, Claims: %v",
session.GenerationID(), session.Claims())
close(h.ready)
return nil
@ -132,9 +132,9 @@ func (h *DebugHandler) Cleanup(session sarama.ConsumerGroupSession) error {
}
func (h *DebugHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
h.t.Logf("🍽️ Starting to consume partition %d from offset %d",
h.t.Logf("🍽️ Starting to consume partition %d from offset %d",
claim.Partition(), claim.InitialOffset())
for {
select {
case message := <-claim.Messages():
@ -142,7 +142,7 @@ func (h *DebugHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim s
h.t.Logf("📭 Received nil message, ending consumption")
return nil
}
h.t.Logf("📨 Received message: key=%s, value=%s, offset=%d",
h.t.Logf("📨 Received message: key=%s, value=%s, offset=%d",
string(message.Key), string(message.Value), message.Offset)
h.messages <- message
session.MarkMessage(message, "")

18
weed/mq/kafka/protocol/find_coordinator.go

@ -6,6 +6,10 @@ import (
)
// handleFindCoordinator dispatches every FindCoordinator request to the
// v2 handler, regardless of the API version the client actually sent.
// NOTE(review): the caller in handler.go logs apiVersion but does not pass
// it here, so a client negotiating v0/v1 would receive a v2-shaped response
// (extra throttle_time_ms and error_message fields) — confirm all supported
// clients negotiate FindCoordinator v2+ or thread apiVersion through.
func (h *Handler) handleFindCoordinator(correlationID uint32, requestBody []byte) ([]byte, error) {
return h.handleFindCoordinatorV2(correlationID, requestBody)
}
func (h *Handler) handleFindCoordinatorV2(correlationID uint32, requestBody []byte) ([]byte, error) {
// Parse FindCoordinator request
// Request format: client_id + coordinator_key + coordinator_type(1)
@ -57,17 +61,25 @@ func (h *Handler) handleFindCoordinator(correlationID uint32, requestBody []byte
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
response = append(response, correlationIDBytes...)
// FindCoordinator v0 Response Format (no throttle_time or error_message):
// FindCoordinator v2 Response Format:
// - throttle_time_ms (INT32)
// - error_code (INT16)
// - error_message (STRING) - nullable
// - node_id (INT32)
// - host (STRING)
// - port (INT32)
// Throttle time (4 bytes, 0 = no throttling)
response = append(response, 0, 0, 0, 0)
// Error code (2 bytes, 0 = no error)
response = append(response, 0, 0)
// Coordinator node_id (4 bytes) - use broker 0 (this gateway)
response = append(response, 0, 0, 0, 0)
// Error message (nullable string) - null for success
response = append(response, 0xff, 0xff) // -1 length indicates null
// Coordinator node_id (4 bytes) - use broker 1 (this gateway)
response = append(response, 0, 0, 0, 1)
// Coordinator host (string)
host := h.brokerHost

2
weed/mq/kafka/protocol/handler.go

@ -364,7 +364,7 @@ func (h *Handler) HandleConn(conn net.Conn) error {
case 9: // OffsetFetch
response, err = h.handleOffsetFetch(correlationID, messageBuf[8:]) // skip header
case 10: // FindCoordinator
fmt.Printf("DEBUG: *** FINDCOORDINATOR REQUEST RECEIVED *** Correlation: %d\n", correlationID)
fmt.Printf("DEBUG: *** FINDCOORDINATOR REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion)
response, err = h.handleFindCoordinator(correlationID, messageBuf[8:]) // skip header
if err != nil {
fmt.Printf("DEBUG: FindCoordinator error: %v\n", err)

44
weed/mq/kafka/protocol/joingroup.go

@ -269,10 +269,11 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error)
offset := 0
// Skip client_id (part of request header, not JoinGroup payload)
// Skip client_id (part of JoinGroup v5 payload)
clientIDLength := int(binary.BigEndian.Uint16(data[offset:]))
offset += 2 + clientIDLength
fmt.Printf("DEBUG: JoinGroup skipped client_id (%d bytes), offset now: %d\n", clientIDLength, offset)
fmt.Printf("DEBUG: JoinGroup v5 skipped client_id (%d bytes: '%s'), offset now: %d\n",
clientIDLength, string(data[2:2+clientIDLength]), offset)
// GroupID (string)
if offset+2 > len(data) {
@ -316,6 +317,27 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error)
offset += memberIDLength
}
// Parse Group Instance ID (nullable string) - for JoinGroup v5+
var groupInstanceID string
if offset+2 > len(data) {
return nil, fmt.Errorf("missing group instance ID length")
}
instanceIDLength := int16(binary.BigEndian.Uint16(data[offset:]))
offset += 2
if instanceIDLength == -1 {
groupInstanceID = "" // null string
} else if instanceIDLength >= 0 {
if offset+int(instanceIDLength) > len(data) {
return nil, fmt.Errorf("invalid group instance ID length")
}
groupInstanceID = string(data[offset : offset+int(instanceIDLength)])
offset += int(instanceIDLength)
}
fmt.Printf("DEBUG: JoinGroup v5 - MemberID: '%s', GroupInstanceID: '%s' (len=%d), offset now: %d\n",
memberID, groupInstanceID, instanceIDLength, offset)
// Parse Protocol Type
if len(data) < offset+2 {
return nil, fmt.Errorf("JoinGroup request missing protocol type")
@ -377,29 +399,13 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error)
fmt.Printf("DEBUG: JoinGroup - Protocol: %s, MetadataLength: %d\n", protocolName, metadataLength)
}
// Parse Group Instance ID (nullable string) - for static membership (Kafka 2.3+)
var groupInstanceID string
if len(data) >= offset+2 {
instanceIDLength := int16(binary.BigEndian.Uint16(data[offset : offset+2]))
offset += 2
if instanceIDLength == -1 {
groupInstanceID = "" // null string
} else if instanceIDLength >= 0 && len(data) >= offset+int(instanceIDLength) {
groupInstanceID = string(data[offset : offset+int(instanceIDLength)])
offset += int(instanceIDLength)
}
if groupInstanceID != "" {
fmt.Printf("DEBUG: JoinGroup - GroupInstanceID: %s\n", groupInstanceID)
}
}
return &JoinGroupRequest{
GroupID: groupID,
SessionTimeout: sessionTimeout,
RebalanceTimeout: rebalanceTimeout,
MemberID: memberID,
GroupInstanceID: groupInstanceID,
ProtocolType: protocolType,
GroupProtocols: protocols,
}, nil

Loading…
Cancel
Save