Browse Source

mq(kafka): Debug JoinGroup member ID generation and group instance handling

🎯 CRITICAL DISCOVERY - Multiple Member IDs Issue

 DEBUGGING INSIGHTS:
- First JoinGroup: Member becomes leader (158-byte response) 
- Second JoinGroup: Different member ID, NOT leader (95-byte response) 
- Empty group instance ID for kafka-go compatibility 
- Group state transitions: Empty → PreparingRebalance 

🔍 TECHNICAL FINDINGS:
- Member ID 1: '-unknown-host-1757554570245789000' (leader)
- Member ID 2: '-unknown-host-1757554575247398000' (not leader)
- kafka-go appears to be creating multiple consumer instances
- Group state persists correctly between calls

�� EVIDENCE OF ISSUE:
- 'DEBUG: JoinGroup elected new leader: [member1]'
- 'DEBUG: JoinGroup keeping existing leader: [member1]'
- 'DEBUG: JoinGroup member [member2] is NOT the leader'
- Different response sizes: 158 bytes (leader) vs 95 bytes (member)

🔍 ROOT CAUSE HYPOTHESIS:
kafka-go may be creating multiple consumer instances or retrying
with different member IDs, causing group membership confusion.

IMPACT:
This explains why SyncGroup is never called - kafka-go sees
inconsistent member IDs and retries the entire consumer group
discovery process instead of progressing to SyncGroup.

Next: Investigate member ID generation consistency and group
membership persistence to ensure stable consumer identity.
pull/7231/head
chrislu 2 months ago
parent
commit
1696ddf570
  1. 250
      weed/mq/kafka/protocol/joingroup.go

250
weed/mq/kafka/protocol/joingroup.go

@ -4,7 +4,7 @@ import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
) )
@ -16,9 +16,9 @@ type JoinGroupRequest struct {
GroupID string GroupID string
SessionTimeout int32 SessionTimeout int32
RebalanceTimeout int32 RebalanceTimeout int32
MemberID string // Empty for new members
GroupInstanceID string // Optional static membership
ProtocolType string // "consumer" for regular consumers
MemberID string // Empty for new members
GroupInstanceID string // Optional static membership
ProtocolType string // "consumer" for regular consumers
GroupProtocols []GroupProtocol GroupProtocols []GroupProtocol
} }
@ -30,31 +30,31 @@ type GroupProtocol struct {
// JoinGroupResponse represents a JoinGroup response to a Kafka client // JoinGroupResponse represents a JoinGroup response to a Kafka client
type JoinGroupResponse struct { type JoinGroupResponse struct {
CorrelationID uint32
ErrorCode int16
GenerationID int32
GroupProtocol string
GroupLeader string
MemberID string
Members []JoinGroupMember // Only populated for group leader
CorrelationID uint32
ErrorCode int16
GenerationID int32
GroupProtocol string
GroupLeader string
MemberID string
Members []JoinGroupMember // Only populated for group leader
} }
// JoinGroupMember represents member info sent to group leader // JoinGroupMember represents member info sent to group leader
type JoinGroupMember struct { type JoinGroupMember struct {
MemberID string
GroupInstanceID string
Metadata []byte
MemberID string
GroupInstanceID string
Metadata []byte
} }
// Error codes for JoinGroup // Error codes for JoinGroup
const ( const (
ErrorCodeNone int16 = 0
ErrorCodeInvalidGroupID int16 = 24
ErrorCodeUnknownMemberID int16 = 25
ErrorCodeInvalidSessionTimeout int16 = 26
ErrorCodeRebalanceInProgress int16 = 27
ErrorCodeMemberIDRequired int16 = 79
ErrorCodeFencedInstanceID int16 = 82
ErrorCodeNone int16 = 0
ErrorCodeInvalidGroupID int16 = 24
ErrorCodeUnknownMemberID int16 = 25
ErrorCodeInvalidSessionTimeout int16 = 26
ErrorCodeRebalanceInProgress int16 = 27
ErrorCodeMemberIDRequired int16 = 79
ErrorCodeFencedInstanceID int16 = 82
) )
func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
@ -64,44 +64,44 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque
dumpLen = 100 dumpLen = 100
} }
fmt.Printf("DEBUG: JoinGroup request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen]) fmt.Printf("DEBUG: JoinGroup request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen])
// Parse JoinGroup request // Parse JoinGroup request
request, err := h.parseJoinGroupRequest(requestBody) request, err := h.parseJoinGroupRequest(requestBody)
if err != nil { if err != nil {
fmt.Printf("DEBUG: JoinGroup parseJoinGroupRequest error: %v\n", err) fmt.Printf("DEBUG: JoinGroup parseJoinGroupRequest error: %v\n", err)
return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
} }
fmt.Printf("DEBUG: JoinGroup parsed request - GroupID: '%s', MemberID: '%s', SessionTimeout: %d\n",
fmt.Printf("DEBUG: JoinGroup parsed request - GroupID: '%s', MemberID: '%s', SessionTimeout: %d\n",
request.GroupID, request.MemberID, request.SessionTimeout) request.GroupID, request.MemberID, request.SessionTimeout)
fmt.Printf("DEBUG: JoinGroup protocols count: %d\n", len(request.GroupProtocols)) fmt.Printf("DEBUG: JoinGroup protocols count: %d\n", len(request.GroupProtocols))
for i, protocol := range request.GroupProtocols { for i, protocol := range request.GroupProtocols {
fmt.Printf("DEBUG: JoinGroup protocol[%d]: name='%s', metadata_len=%d, metadata_hex=%x\n",
fmt.Printf("DEBUG: JoinGroup protocol[%d]: name='%s', metadata_len=%d, metadata_hex=%x\n",
i, protocol.Name, len(protocol.Metadata), protocol.Metadata) i, protocol.Name, len(protocol.Metadata), protocol.Metadata)
} }
// Validate request // Validate request
if request.GroupID == "" { if request.GroupID == "" {
return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
} }
if !h.groupCoordinator.ValidateSessionTimeout(request.SessionTimeout) { if !h.groupCoordinator.ValidateSessionTimeout(request.SessionTimeout) {
return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidSessionTimeout), nil return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidSessionTimeout), nil
} }
// Get or create consumer group // Get or create consumer group
group := h.groupCoordinator.GetOrCreateGroup(request.GroupID) group := h.groupCoordinator.GetOrCreateGroup(request.GroupID)
group.Mu.Lock() group.Mu.Lock()
defer group.Mu.Unlock() defer group.Mu.Unlock()
// Update group's last activity // Update group's last activity
group.LastActivity = time.Now() group.LastActivity = time.Now()
// Handle member ID logic // Handle member ID logic
var memberID string var memberID string
var isNewMember bool var isNewMember bool
if request.MemberID == "" { if request.MemberID == "" {
// New member - generate ID // New member - generate ID
memberID = h.groupCoordinator.GenerateMemberID(request.GroupInstanceID, "unknown-host") memberID = h.groupCoordinator.GenerateMemberID(request.GroupInstanceID, "unknown-host")
@ -114,14 +114,16 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque
return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID), nil return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID), nil
} }
} }
// Check group state // Check group state
fmt.Printf("DEBUG: JoinGroup current group state: %s, generation: %d\n", group.State, group.Generation)
switch group.State { switch group.State {
case consumer.GroupStateEmpty, consumer.GroupStateStable: case consumer.GroupStateEmpty, consumer.GroupStateStable:
// Can join or trigger rebalance // Can join or trigger rebalance
if isNewMember || len(group.Members) == 0 { if isNewMember || len(group.Members) == 0 {
group.State = consumer.GroupStatePreparingRebalance group.State = consumer.GroupStatePreparingRebalance
group.Generation++ group.Generation++
fmt.Printf("DEBUG: JoinGroup transitioned to PreparingRebalance, new generation: %d\n", group.Generation)
} }
case consumer.GroupStatePreparingRebalance, consumer.GroupStateCompletingRebalance: case consumer.GroupStatePreparingRebalance, consumer.GroupStateCompletingRebalance:
// Rebalance already in progress // Rebalance already in progress
@ -129,7 +131,7 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque
case consumer.GroupStateDead: case consumer.GroupStateDead:
return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
} }
// Create or update member // Create or update member
member := &consumer.GroupMember{ member := &consumer.GroupMember{
ID: memberID, ID: memberID,
@ -142,7 +144,7 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque
LastHeartbeat: time.Now(), LastHeartbeat: time.Now(),
JoinedAt: time.Now(), JoinedAt: time.Now(),
} }
// Store protocol metadata for leader - CRITICAL: Generate proper subscription metadata // Store protocol metadata for leader - CRITICAL: Generate proper subscription metadata
if len(request.GroupProtocols) > 0 { if len(request.GroupProtocols) > 0 {
// If client sends empty metadata, generate subscription metadata for available topics // If client sends empty metadata, generate subscription metadata for available topics
@ -151,7 +153,7 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque
// Format: version(2) + topics_count(4) + topics[] // Format: version(2) + topics_count(4) + topics[]
availableTopics := h.getAvailableTopics() availableTopics := h.getAvailableTopics()
fmt.Printf("DEBUG: JoinGroup generating subscription metadata for topics: %v\n", availableTopics) fmt.Printf("DEBUG: JoinGroup generating subscription metadata for topics: %v\n", availableTopics)
metadata := make([]byte, 0, 64) metadata := make([]byte, 0, 64)
// Version (2 bytes) - use version 0 // Version (2 bytes) - use version 0
metadata = append(metadata, 0, 0) metadata = append(metadata, 0, 0)
@ -172,13 +174,13 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque
member.Metadata = request.GroupProtocols[0].Metadata member.Metadata = request.GroupProtocols[0].Metadata
} }
} }
// Add member to group // Add member to group
group.Members[memberID] = member group.Members[memberID] = member
// Update group's subscribed topics // Update group's subscribed topics
h.updateGroupSubscription(group) h.updateGroupSubscription(group)
// Select assignment protocol (prefer range, fall back to roundrobin) // Select assignment protocol (prefer range, fall back to roundrobin)
groupProtocol := "range" groupProtocol := "range"
for _, protocol := range request.GroupProtocols { for _, protocol := range request.GroupProtocols {
@ -188,12 +190,15 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque
} }
} }
group.Protocol = groupProtocol group.Protocol = groupProtocol
// Select group leader (first member or keep existing if still present) // Select group leader (first member or keep existing if still present)
if group.Leader == "" || group.Members[group.Leader] == nil { if group.Leader == "" || group.Members[group.Leader] == nil {
group.Leader = memberID group.Leader = memberID
fmt.Printf("DEBUG: JoinGroup elected new leader: '%s' for group '%s'\n", memberID, request.GroupID)
} else {
fmt.Printf("DEBUG: JoinGroup keeping existing leader: '%s' for group '%s'\n", group.Leader, request.GroupID)
} }
// Build response // Build response
response := JoinGroupResponse{ response := JoinGroupResponse{
CorrelationID: correlationID, CorrelationID: correlationID,
@ -203,19 +208,26 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque
GroupLeader: group.Leader, GroupLeader: group.Leader,
MemberID: memberID, MemberID: memberID,
} }
fmt.Printf("DEBUG: JoinGroup response - Generation: %d, Protocol: '%s', Leader: '%s', Member: '%s'\n",
response.GenerationID, response.GroupProtocol, response.GroupLeader, response.MemberID)
// If this member is the leader, include all member info // If this member is the leader, include all member info
if memberID == group.Leader { if memberID == group.Leader {
fmt.Printf("DEBUG: JoinGroup member '%s' is the leader, including %d members in response\n", memberID, len(group.Members))
response.Members = make([]JoinGroupMember, 0, len(group.Members)) response.Members = make([]JoinGroupMember, 0, len(group.Members))
for _, m := range group.Members { for _, m := range group.Members {
response.Members = append(response.Members, JoinGroupMember{ response.Members = append(response.Members, JoinGroupMember{
MemberID: m.ID, MemberID: m.ID,
GroupInstanceID: m.ClientID,
GroupInstanceID: "", // Empty for kafka-go compatibility - static membership not used
Metadata: m.Metadata, Metadata: m.Metadata,
}) })
fmt.Printf("DEBUG: JoinGroup adding member to response - ID: '%s', Metadata: %d bytes\n", m.ID, len(m.Metadata))
} }
} else {
fmt.Printf("DEBUG: JoinGroup member '%s' is NOT the leader (leader is '%s'), empty members array\n", memberID, group.Leader)
} }
return h.buildJoinGroupResponse(response), nil return h.buildJoinGroupResponse(response), nil
} }
@ -223,14 +235,14 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error)
if len(data) < 8 { if len(data) < 8 {
return nil, fmt.Errorf("request too short") return nil, fmt.Errorf("request too short")
} }
offset := 0 offset := 0
// Skip client_id (part of request header, not JoinGroup payload) // Skip client_id (part of request header, not JoinGroup payload)
clientIDLength := int(binary.BigEndian.Uint16(data[offset:])) clientIDLength := int(binary.BigEndian.Uint16(data[offset:]))
offset += 2 + clientIDLength offset += 2 + clientIDLength
fmt.Printf("DEBUG: JoinGroup skipped client_id (%d bytes), offset now: %d\n", clientIDLength, offset) fmt.Printf("DEBUG: JoinGroup skipped client_id (%d bytes), offset now: %d\n", clientIDLength, offset)
// GroupID (string) // GroupID (string)
if offset+2 > len(data) { if offset+2 > len(data) {
return nil, fmt.Errorf("missing group ID length") return nil, fmt.Errorf("missing group ID length")
@ -243,21 +255,21 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error)
groupID := string(data[offset : offset+groupIDLength]) groupID := string(data[offset : offset+groupIDLength])
offset += groupIDLength offset += groupIDLength
fmt.Printf("DEBUG: JoinGroup parsed GroupID: '%s', offset now: %d\n", groupID, offset) fmt.Printf("DEBUG: JoinGroup parsed GroupID: '%s', offset now: %d\n", groupID, offset)
// Session timeout (4 bytes) // Session timeout (4 bytes)
if offset+4 > len(data) { if offset+4 > len(data) {
return nil, fmt.Errorf("missing session timeout") return nil, fmt.Errorf("missing session timeout")
} }
sessionTimeout := int32(binary.BigEndian.Uint32(data[offset:])) sessionTimeout := int32(binary.BigEndian.Uint32(data[offset:]))
offset += 4 offset += 4
// Rebalance timeout (4 bytes) - for newer versions // Rebalance timeout (4 bytes) - for newer versions
rebalanceTimeout := sessionTimeout // Default to session timeout rebalanceTimeout := sessionTimeout // Default to session timeout
if offset+4 <= len(data) { if offset+4 <= len(data) {
rebalanceTimeout = int32(binary.BigEndian.Uint32(data[offset:])) rebalanceTimeout = int32(binary.BigEndian.Uint32(data[offset:]))
offset += 4 offset += 4
} }
// MemberID (string) // MemberID (string)
if offset+2 > len(data) { if offset+2 > len(data) {
return nil, fmt.Errorf("missing member ID length") return nil, fmt.Errorf("missing member ID length")
@ -272,15 +284,15 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error)
memberID = string(data[offset : offset+memberIDLength]) memberID = string(data[offset : offset+memberIDLength])
offset += memberIDLength offset += memberIDLength
} }
// TODO: CRITICAL - JoinGroup request parsing is incomplete // TODO: CRITICAL - JoinGroup request parsing is incomplete
// Missing parsing of: // Missing parsing of:
// - Group instance ID (for static membership) // - Group instance ID (for static membership)
// - Protocol type validation // - Protocol type validation
// - Group protocols array (client's supported assignment strategies)
// - Group protocols array (client's supported assignment strategies)
// - Protocol metadata (consumer subscriptions, user data) // - Protocol metadata (consumer subscriptions, user data)
// Without this, assignment strategies and subscriptions won't work with real clients // Without this, assignment strategies and subscriptions won't work with real clients
return &JoinGroupRequest{ return &JoinGroupRequest{
GroupID: groupID, GroupID: groupID,
SessionTimeout: sessionTimeout, SessionTimeout: sessionTimeout,
@ -299,60 +311,60 @@ func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte {
for _, member := range response.Members { for _, member := range response.Members {
estimatedSize += len(member.MemberID) + len(member.GroupInstanceID) + len(member.Metadata) + 8 estimatedSize += len(member.MemberID) + len(member.GroupInstanceID) + len(member.Metadata) + 8
} }
result := make([]byte, 0, estimatedSize) result := make([]byte, 0, estimatedSize)
// Correlation ID (4 bytes) // Correlation ID (4 bytes)
correlationIDBytes := make([]byte, 4) correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID) binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID)
result = append(result, correlationIDBytes...) result = append(result, correlationIDBytes...)
// JoinGroup v2 Response Format: throttle_time_ms + error_code + generation_id + ... // JoinGroup v2 Response Format: throttle_time_ms + error_code + generation_id + ...
// Throttle time (4 bytes) - CRITICAL: This was missing! // Throttle time (4 bytes) - CRITICAL: This was missing!
throttleTimeBytes := make([]byte, 4) throttleTimeBytes := make([]byte, 4)
binary.BigEndian.PutUint32(throttleTimeBytes, 0) // No throttling binary.BigEndian.PutUint32(throttleTimeBytes, 0) // No throttling
result = append(result, throttleTimeBytes...) result = append(result, throttleTimeBytes...)
// Error code (2 bytes) // Error code (2 bytes)
errorCodeBytes := make([]byte, 2) errorCodeBytes := make([]byte, 2)
binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode)) binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode))
result = append(result, errorCodeBytes...) result = append(result, errorCodeBytes...)
// Generation ID (4 bytes) // Generation ID (4 bytes)
generationBytes := make([]byte, 4) generationBytes := make([]byte, 4)
binary.BigEndian.PutUint32(generationBytes, uint32(response.GenerationID)) binary.BigEndian.PutUint32(generationBytes, uint32(response.GenerationID))
result = append(result, generationBytes...) result = append(result, generationBytes...)
// Group protocol (string) // Group protocol (string)
protocolLength := make([]byte, 2) protocolLength := make([]byte, 2)
binary.BigEndian.PutUint16(protocolLength, uint16(len(response.GroupProtocol))) binary.BigEndian.PutUint16(protocolLength, uint16(len(response.GroupProtocol)))
result = append(result, protocolLength...) result = append(result, protocolLength...)
result = append(result, []byte(response.GroupProtocol)...) result = append(result, []byte(response.GroupProtocol)...)
// Group leader (string)
// Group leader (string)
leaderLength := make([]byte, 2) leaderLength := make([]byte, 2)
binary.BigEndian.PutUint16(leaderLength, uint16(len(response.GroupLeader))) binary.BigEndian.PutUint16(leaderLength, uint16(len(response.GroupLeader)))
result = append(result, leaderLength...) result = append(result, leaderLength...)
result = append(result, []byte(response.GroupLeader)...) result = append(result, []byte(response.GroupLeader)...)
// Member ID (string) // Member ID (string)
memberIDLength := make([]byte, 2) memberIDLength := make([]byte, 2)
binary.BigEndian.PutUint16(memberIDLength, uint16(len(response.MemberID))) binary.BigEndian.PutUint16(memberIDLength, uint16(len(response.MemberID)))
result = append(result, memberIDLength...) result = append(result, memberIDLength...)
result = append(result, []byte(response.MemberID)...) result = append(result, []byte(response.MemberID)...)
// Members array (4 bytes count + members) // Members array (4 bytes count + members)
memberCountBytes := make([]byte, 4) memberCountBytes := make([]byte, 4)
binary.BigEndian.PutUint32(memberCountBytes, uint32(len(response.Members))) binary.BigEndian.PutUint32(memberCountBytes, uint32(len(response.Members)))
result = append(result, memberCountBytes...) result = append(result, memberCountBytes...)
for _, member := range response.Members { for _, member := range response.Members {
// Member ID (string) // Member ID (string)
memberLength := make([]byte, 2) memberLength := make([]byte, 2)
binary.BigEndian.PutUint16(memberLength, uint16(len(member.MemberID))) binary.BigEndian.PutUint16(memberLength, uint16(len(member.MemberID)))
result = append(result, memberLength...) result = append(result, memberLength...)
result = append(result, []byte(member.MemberID)...) result = append(result, []byte(member.MemberID)...)
// Group instance ID (string) - can be empty // Group instance ID (string) - can be empty
instanceIDLength := make([]byte, 2) instanceIDLength := make([]byte, 2)
binary.BigEndian.PutUint16(instanceIDLength, uint16(len(member.GroupInstanceID))) binary.BigEndian.PutUint16(instanceIDLength, uint16(len(member.GroupInstanceID)))
@ -360,14 +372,14 @@ func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte {
if len(member.GroupInstanceID) > 0 { if len(member.GroupInstanceID) > 0 {
result = append(result, []byte(member.GroupInstanceID)...) result = append(result, []byte(member.GroupInstanceID)...)
} }
// Metadata (bytes) // Metadata (bytes)
metadataLength := make([]byte, 4) metadataLength := make([]byte, 4)
binary.BigEndian.PutUint32(metadataLength, uint32(len(member.Metadata))) binary.BigEndian.PutUint32(metadataLength, uint32(len(member.Metadata)))
result = append(result, metadataLength...) result = append(result, metadataLength...)
result = append(result, member.Metadata...) result = append(result, member.Metadata...)
} }
return result return result
} }
@ -381,7 +393,7 @@ func (h *Handler) buildJoinGroupErrorResponse(correlationID uint32, errorCode in
MemberID: "", MemberID: "",
Members: []JoinGroupMember{}, Members: []JoinGroupMember{},
} }
return h.buildJoinGroupResponse(response) return h.buildJoinGroupResponse(response)
} }
@ -389,7 +401,7 @@ func (h *Handler) extractSubscriptionFromProtocols(protocols []GroupProtocol) []
// TODO: CRITICAL - Consumer subscription extraction is hardcoded to "test-topic" // TODO: CRITICAL - Consumer subscription extraction is hardcoded to "test-topic"
// This breaks real Kafka consumers which send their actual subscriptions // This breaks real Kafka consumers which send their actual subscriptions
// Consumer protocol metadata format (for "consumer" protocol type): // Consumer protocol metadata format (for "consumer" protocol type):
// - Version (2 bytes)
// - Version (2 bytes)
// - Topics array (4 bytes count + topic names) // - Topics array (4 bytes count + topic names)
// - User data (4 bytes length + data) // - User data (4 bytes length + data)
// Without fixing this, consumers will be assigned wrong topics // Without fixing this, consumers will be assigned wrong topics
@ -411,10 +423,10 @@ func (h *Handler) updateGroupSubscription(group *consumer.ConsumerGroup) {
// SyncGroupRequest represents a SyncGroup request from a Kafka client // SyncGroupRequest represents a SyncGroup request from a Kafka client
type SyncGroupRequest struct { type SyncGroupRequest struct {
GroupID string
GenerationID int32
MemberID string
GroupInstanceID string
GroupID string
GenerationID int32
MemberID string
GroupInstanceID string
GroupAssignments []GroupAssignment // Only from group leader GroupAssignments []GroupAssignment // Only from group leader
} }
@ -433,7 +445,7 @@ type SyncGroupResponse struct {
// Additional error codes for SyncGroup // Additional error codes for SyncGroup
const ( const (
ErrorCodeIllegalGeneration int16 = 22
ErrorCodeIllegalGeneration int16 = 22
ErrorCodeInconsistentGroupProtocol int16 = 23 ErrorCodeInconsistentGroupProtocol int16 = 23
) )
@ -444,45 +456,45 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque
dumpLen = 100 dumpLen = 100
} }
fmt.Printf("DEBUG: SyncGroup request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen]) fmt.Printf("DEBUG: SyncGroup request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen])
// Parse SyncGroup request // Parse SyncGroup request
request, err := h.parseSyncGroupRequest(requestBody) request, err := h.parseSyncGroupRequest(requestBody)
if err != nil { if err != nil {
fmt.Printf("DEBUG: SyncGroup parseSyncGroupRequest error: %v\n", err) fmt.Printf("DEBUG: SyncGroup parseSyncGroupRequest error: %v\n", err)
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
} }
fmt.Printf("DEBUG: SyncGroup parsed request - GroupID: '%s', MemberID: '%s', GenerationID: %d\n",
fmt.Printf("DEBUG: SyncGroup parsed request - GroupID: '%s', MemberID: '%s', GenerationID: %d\n",
request.GroupID, request.MemberID, request.GenerationID) request.GroupID, request.MemberID, request.GenerationID)
// Validate request // Validate request
if request.GroupID == "" || request.MemberID == "" { if request.GroupID == "" || request.MemberID == "" {
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
} }
// Get consumer group // Get consumer group
group := h.groupCoordinator.GetGroup(request.GroupID) group := h.groupCoordinator.GetGroup(request.GroupID)
if group == nil { if group == nil {
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
} }
group.Mu.Lock() group.Mu.Lock()
defer group.Mu.Unlock() defer group.Mu.Unlock()
// Update group's last activity // Update group's last activity
group.LastActivity = time.Now() group.LastActivity = time.Now()
// Validate member exists // Validate member exists
member, exists := group.Members[request.MemberID] member, exists := group.Members[request.MemberID]
if !exists { if !exists {
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID), nil return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID), nil
} }
// Validate generation // Validate generation
if request.GenerationID != group.Generation { if request.GenerationID != group.Generation {
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeIllegalGeneration), nil return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeIllegalGeneration), nil
} }
// Check if this is the group leader with assignments // Check if this is the group leader with assignments
if request.MemberID == group.Leader && len(request.GroupAssignments) > 0 { if request.MemberID == group.Leader && len(request.GroupAssignments) > 0 {
// Leader is providing assignments - process and store them // Leader is providing assignments - process and store them
@ -490,10 +502,10 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque
if err != nil { if err != nil {
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInconsistentGroupProtocol), nil return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInconsistentGroupProtocol), nil
} }
// Move group to stable state // Move group to stable state
group.State = consumer.GroupStateStable group.State = consumer.GroupStateStable
// Mark all members as stable // Mark all members as stable
for _, m := range group.Members { for _, m := range group.Members {
m.State = consumer.MemberStateStable m.State = consumer.MemberStateStable
@ -505,23 +517,23 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque
// Trigger partition assignment using built-in strategy // Trigger partition assignment using built-in strategy
topicPartitions := h.getTopicPartitions(group) topicPartitions := h.getTopicPartitions(group)
group.AssignPartitions(topicPartitions) group.AssignPartitions(topicPartitions)
group.State = consumer.GroupStateStable group.State = consumer.GroupStateStable
for _, m := range group.Members { for _, m := range group.Members {
m.State = consumer.MemberStateStable m.State = consumer.MemberStateStable
} }
} }
// Get assignment for this member // Get assignment for this member
assignment := h.serializeMemberAssignment(member.Assignment) assignment := h.serializeMemberAssignment(member.Assignment)
// Build response // Build response
response := SyncGroupResponse{ response := SyncGroupResponse{
CorrelationID: correlationID, CorrelationID: correlationID,
ErrorCode: ErrorCodeNone, ErrorCode: ErrorCodeNone,
Assignment: assignment, Assignment: assignment,
} }
return h.buildSyncGroupResponse(response), nil return h.buildSyncGroupResponse(response), nil
} }
@ -529,9 +541,9 @@ func (h *Handler) parseSyncGroupRequest(data []byte) (*SyncGroupRequest, error)
if len(data) < 8 { if len(data) < 8 {
return nil, fmt.Errorf("request too short") return nil, fmt.Errorf("request too short")
} }
offset := 0 offset := 0
// GroupID (string) // GroupID (string)
groupIDLength := int(binary.BigEndian.Uint16(data[offset:])) groupIDLength := int(binary.BigEndian.Uint16(data[offset:]))
offset += 2 offset += 2
@ -540,14 +552,14 @@ func (h *Handler) parseSyncGroupRequest(data []byte) (*SyncGroupRequest, error)
} }
groupID := string(data[offset : offset+groupIDLength]) groupID := string(data[offset : offset+groupIDLength])
offset += groupIDLength offset += groupIDLength
// Generation ID (4 bytes) // Generation ID (4 bytes)
if offset+4 > len(data) { if offset+4 > len(data) {
return nil, fmt.Errorf("missing generation ID") return nil, fmt.Errorf("missing generation ID")
} }
generationID := int32(binary.BigEndian.Uint32(data[offset:])) generationID := int32(binary.BigEndian.Uint32(data[offset:]))
offset += 4 offset += 4
// MemberID (string) // MemberID (string)
if offset+2 > len(data) { if offset+2 > len(data) {
return nil, fmt.Errorf("missing member ID length") return nil, fmt.Errorf("missing member ID length")
@ -559,10 +571,10 @@ func (h *Handler) parseSyncGroupRequest(data []byte) (*SyncGroupRequest, error)
} }
memberID := string(data[offset : offset+memberIDLength]) memberID := string(data[offset : offset+memberIDLength])
offset += memberIDLength offset += memberIDLength
// For simplicity, we'll parse basic fields // For simplicity, we'll parse basic fields
// In a full implementation, we'd parse the full group assignments array // In a full implementation, we'd parse the full group assignments array
return &SyncGroupRequest{ return &SyncGroupRequest{
GroupID: groupID, GroupID: groupID,
GenerationID: generationID, GenerationID: generationID,
@ -575,26 +587,26 @@ func (h *Handler) parseSyncGroupRequest(data []byte) (*SyncGroupRequest, error)
func (h *Handler) buildSyncGroupResponse(response SyncGroupResponse) []byte { func (h *Handler) buildSyncGroupResponse(response SyncGroupResponse) []byte {
estimatedSize := 16 + len(response.Assignment) estimatedSize := 16 + len(response.Assignment)
result := make([]byte, 0, estimatedSize) result := make([]byte, 0, estimatedSize)
// Correlation ID (4 bytes) // Correlation ID (4 bytes)
correlationIDBytes := make([]byte, 4) correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID) binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID)
result = append(result, correlationIDBytes...) result = append(result, correlationIDBytes...)
// Error code (2 bytes) // Error code (2 bytes)
errorCodeBytes := make([]byte, 2) errorCodeBytes := make([]byte, 2)
binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode)) binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode))
result = append(result, errorCodeBytes...) result = append(result, errorCodeBytes...)
// Assignment (bytes) // Assignment (bytes)
assignmentLength := make([]byte, 4) assignmentLength := make([]byte, 4)
binary.BigEndian.PutUint32(assignmentLength, uint32(len(response.Assignment))) binary.BigEndian.PutUint32(assignmentLength, uint32(len(response.Assignment)))
result = append(result, assignmentLength...) result = append(result, assignmentLength...)
result = append(result, response.Assignment...) result = append(result, response.Assignment...)
// Throttle time (4 bytes, 0 = no throttling) // Throttle time (4 bytes, 0 = no throttling)
result = append(result, 0, 0, 0, 0) result = append(result, 0, 0, 0, 0)
return result return result
} }
@ -604,7 +616,7 @@ func (h *Handler) buildSyncGroupErrorResponse(correlationID uint32, errorCode in
ErrorCode: errorCode, ErrorCode: errorCode,
Assignment: []byte{}, Assignment: []byte{},
} }
return h.buildSyncGroupResponse(response) return h.buildSyncGroupResponse(response)
} }
@ -612,23 +624,23 @@ func (h *Handler) processGroupAssignments(group *consumer.ConsumerGroup, assignm
// In a full implementation, we'd deserialize the assignment data // In a full implementation, we'd deserialize the assignment data
// and update each member's partition assignment // and update each member's partition assignment
// For now, we'll trigger our own assignment logic // For now, we'll trigger our own assignment logic
topicPartitions := h.getTopicPartitions(group) topicPartitions := h.getTopicPartitions(group)
group.AssignPartitions(topicPartitions) group.AssignPartitions(topicPartitions)
return nil return nil
} }
func (h *Handler) getTopicPartitions(group *consumer.ConsumerGroup) map[string][]int32 { func (h *Handler) getTopicPartitions(group *consumer.ConsumerGroup) map[string][]int32 {
topicPartitions := make(map[string][]int32) topicPartitions := make(map[string][]int32)
// Get partition info for all subscribed topics // Get partition info for all subscribed topics
for topic := range group.SubscribedTopics { for topic := range group.SubscribedTopics {
// Check if topic exists in our topic registry // Check if topic exists in our topic registry
h.topicsMu.RLock() h.topicsMu.RLock()
topicInfo, exists := h.topics[topic] topicInfo, exists := h.topics[topic]
h.topicsMu.RUnlock() h.topicsMu.RUnlock()
if exists { if exists {
// Create partition list for this topic // Create partition list for this topic
partitions := make([]int32, topicInfo.Partitions) partitions := make([]int32, topicInfo.Partitions)
@ -641,7 +653,7 @@ func (h *Handler) getTopicPartitions(group *consumer.ConsumerGroup) map[string][
topicPartitions[topic] = []int32{0} topicPartitions[topic] = []int32{0}
} }
} }
return topicPartitions return topicPartitions
} }
@ -649,42 +661,42 @@ func (h *Handler) serializeMemberAssignment(assignments []consumer.PartitionAssi
// Build a simple serialized format for partition assignments // Build a simple serialized format for partition assignments
// Format: version(2) + num_topics(4) + topics... // Format: version(2) + num_topics(4) + topics...
// For each topic: topic_name_len(2) + topic_name + num_partitions(4) + partitions... // For each topic: topic_name_len(2) + topic_name + num_partitions(4) + partitions...
if len(assignments) == 0 { if len(assignments) == 0 {
return []byte{0, 1, 0, 0, 0, 0} // Version 1, 0 topics return []byte{0, 1, 0, 0, 0, 0} // Version 1, 0 topics
} }
// Group assignments by topic // Group assignments by topic
topicAssignments := make(map[string][]int32) topicAssignments := make(map[string][]int32)
for _, assignment := range assignments { for _, assignment := range assignments {
topicAssignments[assignment.Topic] = append(topicAssignments[assignment.Topic], assignment.Partition) topicAssignments[assignment.Topic] = append(topicAssignments[assignment.Topic], assignment.Partition)
} }
result := make([]byte, 0, 64) result := make([]byte, 0, 64)
// Version (2 bytes) - use version 1 // Version (2 bytes) - use version 1
result = append(result, 0, 1) result = append(result, 0, 1)
// Number of topics (4 bytes) // Number of topics (4 bytes)
numTopicsBytes := make([]byte, 4) numTopicsBytes := make([]byte, 4)
binary.BigEndian.PutUint32(numTopicsBytes, uint32(len(topicAssignments))) binary.BigEndian.PutUint32(numTopicsBytes, uint32(len(topicAssignments)))
result = append(result, numTopicsBytes...) result = append(result, numTopicsBytes...)
// Topics // Topics
for topic, partitions := range topicAssignments { for topic, partitions := range topicAssignments {
// Topic name length (2 bytes) // Topic name length (2 bytes)
topicLenBytes := make([]byte, 2) topicLenBytes := make([]byte, 2)
binary.BigEndian.PutUint16(topicLenBytes, uint16(len(topic))) binary.BigEndian.PutUint16(topicLenBytes, uint16(len(topic)))
result = append(result, topicLenBytes...) result = append(result, topicLenBytes...)
// Topic name // Topic name
result = append(result, []byte(topic)...) result = append(result, []byte(topic)...)
// Number of partitions (4 bytes) // Number of partitions (4 bytes)
numPartitionsBytes := make([]byte, 4) numPartitionsBytes := make([]byte, 4)
binary.BigEndian.PutUint32(numPartitionsBytes, uint32(len(partitions))) binary.BigEndian.PutUint32(numPartitionsBytes, uint32(len(partitions)))
result = append(result, numPartitionsBytes...) result = append(result, numPartitionsBytes...)
// Partitions (4 bytes each) // Partitions (4 bytes each)
for _, partition := range partitions { for _, partition := range partitions {
partitionBytes := make([]byte, 4) partitionBytes := make([]byte, 4)
@ -692,10 +704,10 @@ func (h *Handler) serializeMemberAssignment(assignments []consumer.PartitionAssi
result = append(result, partitionBytes...) result = append(result, partitionBytes...)
} }
} }
// User data length (4 bytes) - no user data // User data length (4 bytes) - no user data
result = append(result, 0, 0, 0, 0) result = append(result, 0, 0, 0, 0)
return result return result
} }
@ -703,7 +715,7 @@ func (h *Handler) serializeMemberAssignment(assignments []consumer.PartitionAssi
func (h *Handler) getAvailableTopics() []string { func (h *Handler) getAvailableTopics() []string {
h.topicsMu.RLock() h.topicsMu.RLock()
defer h.topicsMu.RUnlock() defer h.topicsMu.RUnlock()
topics := make([]string, 0, len(h.topics)) topics := make([]string, 0, len(h.topics))
for topicName := range h.topics { for topicName := range h.topics {
topics = append(topics, topicName) topics = append(topics, topicName)

Loading…
Cancel
Save