diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index 498c06644..84b2060ad 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -4,7 +4,7 @@ import ( "encoding/binary" "fmt" "time" - + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer" ) @@ -16,9 +16,9 @@ type JoinGroupRequest struct { GroupID string SessionTimeout int32 RebalanceTimeout int32 - MemberID string // Empty for new members - GroupInstanceID string // Optional static membership - ProtocolType string // "consumer" for regular consumers + MemberID string // Empty for new members + GroupInstanceID string // Optional static membership + ProtocolType string // "consumer" for regular consumers GroupProtocols []GroupProtocol } @@ -30,31 +30,31 @@ type GroupProtocol struct { // JoinGroupResponse represents a JoinGroup response to a Kafka client type JoinGroupResponse struct { - CorrelationID uint32 - ErrorCode int16 - GenerationID int32 - GroupProtocol string - GroupLeader string - MemberID string - Members []JoinGroupMember // Only populated for group leader + CorrelationID uint32 + ErrorCode int16 + GenerationID int32 + GroupProtocol string + GroupLeader string + MemberID string + Members []JoinGroupMember // Only populated for group leader } // JoinGroupMember represents member info sent to group leader type JoinGroupMember struct { - MemberID string - GroupInstanceID string - Metadata []byte + MemberID string + GroupInstanceID string + Metadata []byte } // Error codes for JoinGroup const ( - ErrorCodeNone int16 = 0 - ErrorCodeInvalidGroupID int16 = 24 - ErrorCodeUnknownMemberID int16 = 25 - ErrorCodeInvalidSessionTimeout int16 = 26 - ErrorCodeRebalanceInProgress int16 = 27 - ErrorCodeMemberIDRequired int16 = 79 - ErrorCodeFencedInstanceID int16 = 82 + ErrorCodeNone int16 = 0 + ErrorCodeInvalidGroupID int16 = 24 + ErrorCodeUnknownMemberID int16 = 25 + ErrorCodeInvalidSessionTimeout int16 = 26 + ErrorCodeRebalanceInProgress int16 = 27 + ErrorCodeMemberIDRequired int16 = 79 + ErrorCodeFencedInstanceID int16 = 82 ) func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { @@ -64,44 +64,44 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque dumpLen = 100 } fmt.Printf("DEBUG: JoinGroup request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen]) - + // Parse JoinGroup request request, err := h.parseJoinGroupRequest(requestBody) if err != nil { fmt.Printf("DEBUG: JoinGroup parseJoinGroupRequest error: %v\n", err) return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } - - fmt.Printf("DEBUG: JoinGroup parsed request - GroupID: '%s', MemberID: '%s', SessionTimeout: %d\n", + + fmt.Printf("DEBUG: JoinGroup parsed request - GroupID: '%s', MemberID: '%s', SessionTimeout: %d\n", request.GroupID, request.MemberID, request.SessionTimeout) fmt.Printf("DEBUG: JoinGroup protocols count: %d\n", len(request.GroupProtocols)) for i, protocol := range request.GroupProtocols { - fmt.Printf("DEBUG: JoinGroup protocol[%d]: name='%s', metadata_len=%d, metadata_hex=%x\n", + fmt.Printf("DEBUG: JoinGroup protocol[%d]: name='%s', metadata_len=%d, metadata_hex=%x\n", i, protocol.Name, len(protocol.Metadata), protocol.Metadata) } - + // Validate request if request.GroupID == "" { return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } - + if !h.groupCoordinator.ValidateSessionTimeout(request.SessionTimeout) { return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidSessionTimeout), nil } - + // Get or create consumer group group := h.groupCoordinator.GetOrCreateGroup(request.GroupID) - + group.Mu.Lock() defer group.Mu.Unlock() - + // Update group's last activity group.LastActivity = time.Now() - + // Handle member ID logic var memberID string var isNewMember bool - + if request.MemberID == "" { // New member - generate ID memberID = h.groupCoordinator.GenerateMemberID(request.GroupInstanceID, "unknown-host") @@ -114,14 +114,16 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID), nil } } - + // Check group state + fmt.Printf("DEBUG: JoinGroup current group state: %s, generation: %d\n", group.State, group.Generation) switch group.State { case consumer.GroupStateEmpty, consumer.GroupStateStable: // Can join or trigger rebalance if isNewMember || len(group.Members) == 0 { group.State = consumer.GroupStatePreparingRebalance group.Generation++ + fmt.Printf("DEBUG: JoinGroup transitioned to PreparingRebalance, new generation: %d\n", group.Generation) } case consumer.GroupStatePreparingRebalance, consumer.GroupStateCompletingRebalance: // Rebalance already in progress @@ -129,7 +131,7 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque case consumer.GroupStateDead: return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } - + // Create or update member member := &consumer.GroupMember{ ID: memberID, @@ -142,7 +144,7 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque LastHeartbeat: time.Now(), JoinedAt: time.Now(), } - + // Store protocol metadata for leader - CRITICAL: Generate proper subscription metadata if len(request.GroupProtocols) > 0 { // If client sends empty metadata, generate subscription metadata for available topics @@ -151,7 +153,7 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque // Format: version(2) + topics_count(4) + topics[] availableTopics := h.getAvailableTopics() fmt.Printf("DEBUG: JoinGroup generating subscription metadata for topics: %v\n", availableTopics) - + metadata := make([]byte, 0, 64) // Version (2 bytes) - use version 0 metadata = append(metadata, 0, 0) @@ -172,13 +174,13 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque member.Metadata = request.GroupProtocols[0].Metadata } } - + // Add member to group group.Members[memberID] = member - + // Update group's subscribed topics h.updateGroupSubscription(group) - + // Select assignment protocol (prefer range, fall back to roundrobin) groupProtocol := "range" for _, protocol := range request.GroupProtocols { @@ -188,12 +190,15 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque } } group.Protocol = groupProtocol - + // Select group leader (first member or keep existing if still present) if group.Leader == "" || group.Members[group.Leader] == nil { group.Leader = memberID + fmt.Printf("DEBUG: JoinGroup elected new leader: '%s' for group '%s'\n", memberID, request.GroupID) + } else { + fmt.Printf("DEBUG: JoinGroup keeping existing leader: '%s' for group '%s'\n", group.Leader, request.GroupID) } - + // Build response response := JoinGroupResponse{ CorrelationID: correlationID, @@ -203,19 +208,26 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque GroupLeader: group.Leader, MemberID: memberID, } - + + fmt.Printf("DEBUG: JoinGroup response - Generation: %d, Protocol: '%s', Leader: '%s', Member: '%s'\n", + response.GenerationID, response.GroupProtocol, response.GroupLeader, response.MemberID) + // If this member is the leader, include all member info if memberID == group.Leader { + fmt.Printf("DEBUG: JoinGroup member '%s' is the leader, including %d members in response\n", memberID, len(group.Members)) response.Members = make([]JoinGroupMember, 0, len(group.Members)) for _, m := range group.Members { response.Members = append(response.Members, JoinGroupMember{ MemberID: m.ID, - GroupInstanceID: m.ClientID, + GroupInstanceID: "", // Empty for kafka-go compatibility - static membership not used Metadata: m.Metadata, }) + fmt.Printf("DEBUG: JoinGroup adding member to response - ID: '%s', Metadata: %d bytes\n", m.ID, len(m.Metadata)) } + } else { + fmt.Printf("DEBUG: JoinGroup member '%s' is NOT the leader (leader is '%s'), empty members array\n", memberID, group.Leader) } - + return h.buildJoinGroupResponse(response), nil } @@ -223,14 +235,14 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error) if len(data) < 8 { return nil, fmt.Errorf("request too short") } - + offset := 0 - + // Skip client_id (part of request header, not JoinGroup payload) clientIDLength := int(binary.BigEndian.Uint16(data[offset:])) offset += 2 + clientIDLength fmt.Printf("DEBUG: JoinGroup skipped client_id (%d bytes), offset now: %d\n", clientIDLength, offset) - + // GroupID (string) if offset+2 > len(data) { return nil, fmt.Errorf("missing group ID length") @@ -243,21 +255,21 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error) groupID := string(data[offset : offset+groupIDLength]) offset += groupIDLength fmt.Printf("DEBUG: JoinGroup parsed GroupID: '%s', offset now: %d\n", groupID, offset) - + // Session timeout (4 bytes) if offset+4 > len(data) { return nil, fmt.Errorf("missing session timeout") } sessionTimeout := int32(binary.BigEndian.Uint32(data[offset:])) offset += 4 - + // Rebalance timeout (4 bytes) - for newer versions rebalanceTimeout := sessionTimeout // Default to session timeout if offset+4 <= len(data) { rebalanceTimeout = int32(binary.BigEndian.Uint32(data[offset:])) offset += 4 } - + // MemberID (string) if offset+2 > len(data) { return nil, fmt.Errorf("missing member ID length") @@ -272,15 +284,15 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error) memberID = string(data[offset : offset+memberIDLength]) offset += memberIDLength } - + // TODO: CRITICAL - JoinGroup request parsing is incomplete // Missing parsing of: // - Group instance ID (for static membership) // - Protocol type validation - // - Group protocols array (client's supported assignment strategies) + // - Group protocols array (client's supported assignment strategies) // - Protocol metadata (consumer subscriptions, user data) // Without this, assignment strategies and subscriptions won't work with real clients - + return &JoinGroupRequest{ GroupID: groupID, SessionTimeout: sessionTimeout, @@ -299,60 +311,60 @@ func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte { for _, member := range response.Members { estimatedSize += len(member.MemberID) + len(member.GroupInstanceID) + len(member.Metadata) + 8 } - + result := make([]byte, 0, estimatedSize) - + // Correlation ID (4 bytes) correlationIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID) result = append(result, correlationIDBytes...) - + // JoinGroup v2 Response Format: throttle_time_ms + error_code + generation_id + ... // Throttle time (4 bytes) - CRITICAL: This was missing! throttleTimeBytes := make([]byte, 4) binary.BigEndian.PutUint32(throttleTimeBytes, 0) // No throttling result = append(result, throttleTimeBytes...) - + // Error code (2 bytes) errorCodeBytes := make([]byte, 2) binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode)) result = append(result, errorCodeBytes...) - + // Generation ID (4 bytes) generationBytes := make([]byte, 4) binary.BigEndian.PutUint32(generationBytes, uint32(response.GenerationID)) result = append(result, generationBytes...) - + // Group protocol (string) protocolLength := make([]byte, 2) binary.BigEndian.PutUint16(protocolLength, uint16(len(response.GroupProtocol))) result = append(result, protocolLength...) result = append(result, []byte(response.GroupProtocol)...) - - // Group leader (string) + + // Group leader (string) leaderLength := make([]byte, 2) binary.BigEndian.PutUint16(leaderLength, uint16(len(response.GroupLeader))) result = append(result, leaderLength...) result = append(result, []byte(response.GroupLeader)...) - + // Member ID (string) memberIDLength := make([]byte, 2) binary.BigEndian.PutUint16(memberIDLength, uint16(len(response.MemberID))) result = append(result, memberIDLength...) result = append(result, []byte(response.MemberID)...) - + // Members array (4 bytes count + members) memberCountBytes := make([]byte, 4) binary.BigEndian.PutUint32(memberCountBytes, uint32(len(response.Members))) result = append(result, memberCountBytes...) - + for _, member := range response.Members { // Member ID (string) memberLength := make([]byte, 2) binary.BigEndian.PutUint16(memberLength, uint16(len(member.MemberID))) result = append(result, memberLength...) result = append(result, []byte(member.MemberID)...) - + // Group instance ID (string) - can be empty instanceIDLength := make([]byte, 2) binary.BigEndian.PutUint16(instanceIDLength, uint16(len(member.GroupInstanceID))) @@ -360,14 +372,14 @@ func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte { if len(member.GroupInstanceID) > 0 { result = append(result, []byte(member.GroupInstanceID)...) } - + // Metadata (bytes) metadataLength := make([]byte, 4) binary.BigEndian.PutUint32(metadataLength, uint32(len(member.Metadata))) result = append(result, metadataLength...) result = append(result, member.Metadata...) } - + return result } @@ -381,7 +393,7 @@ func (h *Handler) buildJoinGroupErrorResponse(correlationID uint32, errorCode in MemberID: "", Members: []JoinGroupMember{}, } - + return h.buildJoinGroupResponse(response) } @@ -389,7 +401,7 @@ func (h *Handler) extractSubscriptionFromProtocols(protocols []GroupProtocol) [] // TODO: CRITICAL - Consumer subscription extraction is hardcoded to "test-topic" // This breaks real Kafka consumers which send their actual subscriptions // Consumer protocol metadata format (for "consumer" protocol type): - // - Version (2 bytes) + // - Version (2 bytes) // - Topics array (4 bytes count + topic names) // - User data (4 bytes length + data) // Without fixing this, consumers will be assigned wrong topics @@ -411,10 +423,10 @@ func (h *Handler) updateGroupSubscription(group *consumer.ConsumerGroup) { // SyncGroupRequest represents a SyncGroup request from a Kafka client type SyncGroupRequest struct { - GroupID string - GenerationID int32 - MemberID string - GroupInstanceID string + GroupID string + GenerationID int32 + MemberID string + GroupInstanceID string GroupAssignments []GroupAssignment // Only from group leader } @@ -433,7 +445,7 @@ type SyncGroupResponse struct { // Additional error codes for SyncGroup const ( - ErrorCodeIllegalGeneration int16 = 22 + ErrorCodeIllegalGeneration int16 = 22 ErrorCodeInconsistentGroupProtocol int16 = 23 ) @@ -444,45 +456,45 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque dumpLen = 100 } fmt.Printf("DEBUG: SyncGroup request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen]) - + // Parse SyncGroup request request, err := h.parseSyncGroupRequest(requestBody) if err != nil { fmt.Printf("DEBUG: SyncGroup parseSyncGroupRequest error: %v\n", err) return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } - - fmt.Printf("DEBUG: SyncGroup parsed request - GroupID: '%s', MemberID: '%s', GenerationID: %d\n", + + fmt.Printf("DEBUG: SyncGroup parsed request - GroupID: '%s', MemberID: '%s', GenerationID: %d\n", request.GroupID, request.MemberID, request.GenerationID) - + // Validate request if request.GroupID == "" || request.MemberID == "" { return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } - + // Get consumer group group := h.groupCoordinator.GetGroup(request.GroupID) if group == nil { return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } - + group.Mu.Lock() defer group.Mu.Unlock() - + // Update group's last activity group.LastActivity = time.Now() - + // Validate member exists member, exists := group.Members[request.MemberID] if !exists { return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID), nil } - + // Validate generation if request.GenerationID != group.Generation { return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeIllegalGeneration), nil } - + // Check if this is the group leader with assignments if request.MemberID == group.Leader && len(request.GroupAssignments) > 0 { // Leader is providing assignments - process and store them @@ -490,10 +502,10 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque if err != nil { return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInconsistentGroupProtocol), nil } - + // Move group to stable state group.State = consumer.GroupStateStable - + // Mark all members as stable for _, m := range group.Members { m.State = consumer.MemberStateStable @@ -505,23 +517,23 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque // Trigger partition assignment using built-in strategy topicPartitions := h.getTopicPartitions(group) group.AssignPartitions(topicPartitions) - + group.State = consumer.GroupStateStable for _, m := range group.Members { m.State = consumer.MemberStateStable } } - + // Get assignment for this member assignment := h.serializeMemberAssignment(member.Assignment) - + // Build response response := SyncGroupResponse{ CorrelationID: correlationID, ErrorCode: ErrorCodeNone, Assignment: assignment, } - + return h.buildSyncGroupResponse(response), nil } @@ -529,9 +541,9 @@ func (h *Handler) parseSyncGroupRequest(data []byte) (*SyncGroupRequest, error) if len(data) < 8 { return nil, fmt.Errorf("request too short") } - + offset := 0 - + // GroupID (string) groupIDLength := int(binary.BigEndian.Uint16(data[offset:])) offset += 2 @@ -540,14 +552,14 @@ func (h *Handler) parseSyncGroupRequest(data []byte) (*SyncGroupRequest, error) } groupID := string(data[offset : offset+groupIDLength]) offset += groupIDLength - + // Generation ID (4 bytes) if offset+4 > len(data) { return nil, fmt.Errorf("missing generation ID") } generationID := int32(binary.BigEndian.Uint32(data[offset:])) offset += 4 - + // MemberID (string) if offset+2 > len(data) { return nil, fmt.Errorf("missing member ID length") @@ -559,10 +571,10 @@ func (h *Handler) parseSyncGroupRequest(data []byte) (*SyncGroupRequest, error) } memberID := string(data[offset : offset+memberIDLength]) offset += memberIDLength - + // For simplicity, we'll parse basic fields // In a full implementation, we'd parse the full group assignments array - + return &SyncGroupRequest{ GroupID: groupID, GenerationID: generationID, @@ -575,26 +587,26 @@ func (h *Handler) parseSyncGroupRequest(data []byte) (*SyncGroupRequest, error) func (h *Handler) buildSyncGroupResponse(response SyncGroupResponse) []byte { estimatedSize := 16 + len(response.Assignment) result := make([]byte, 0, estimatedSize) - + // Correlation ID (4 bytes) correlationIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID) result = append(result, correlationIDBytes...) - + // Error code (2 bytes) errorCodeBytes := make([]byte, 2) binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode)) result = append(result, errorCodeBytes...) - + // Assignment (bytes) assignmentLength := make([]byte, 4) binary.BigEndian.PutUint32(assignmentLength, uint32(len(response.Assignment))) result = append(result, assignmentLength...) result = append(result, response.Assignment...) - + // Throttle time (4 bytes, 0 = no throttling) result = append(result, 0, 0, 0, 0) - + return result } @@ -604,7 +616,7 @@ func (h *Handler) buildSyncGroupErrorResponse(correlationID uint32, errorCode in ErrorCode: errorCode, Assignment: []byte{}, } - + return h.buildSyncGroupResponse(response) } @@ -612,23 +624,23 @@ func (h *Handler) processGroupAssignments(group *consumer.ConsumerGroup, assignm // In a full implementation, we'd deserialize the assignment data // and update each member's partition assignment // For now, we'll trigger our own assignment logic - + topicPartitions := h.getTopicPartitions(group) group.AssignPartitions(topicPartitions) - + return nil } func (h *Handler) getTopicPartitions(group *consumer.ConsumerGroup) map[string][]int32 { topicPartitions := make(map[string][]int32) - + // Get partition info for all subscribed topics for topic := range group.SubscribedTopics { // Check if topic exists in our topic registry h.topicsMu.RLock() topicInfo, exists := h.topics[topic] h.topicsMu.RUnlock() - + if exists { // Create partition list for this topic partitions := make([]int32, topicInfo.Partitions) @@ -641,7 +653,7 @@ func (h *Handler) getTopicPartitions(group *consumer.ConsumerGroup) map[string][ topicPartitions[topic] = []int32{0} } } - + return topicPartitions } @@ -649,42 +661,42 @@ func (h *Handler) serializeMemberAssignment(assignments []consumer.PartitionAssi // Build a simple serialized format for partition assignments // Format: version(2) + num_topics(4) + topics... // For each topic: topic_name_len(2) + topic_name + num_partitions(4) + partitions... - + if len(assignments) == 0 { return []byte{0, 1, 0, 0, 0, 0} // Version 1, 0 topics } - + // Group assignments by topic topicAssignments := make(map[string][]int32) for _, assignment := range assignments { topicAssignments[assignment.Topic] = append(topicAssignments[assignment.Topic], assignment.Partition) } - + result := make([]byte, 0, 64) - + // Version (2 bytes) - use version 1 result = append(result, 0, 1) - + // Number of topics (4 bytes) numTopicsBytes := make([]byte, 4) binary.BigEndian.PutUint32(numTopicsBytes, uint32(len(topicAssignments))) result = append(result, numTopicsBytes...) - + // Topics for topic, partitions := range topicAssignments { // Topic name length (2 bytes) topicLenBytes := make([]byte, 2) binary.BigEndian.PutUint16(topicLenBytes, uint16(len(topic))) result = append(result, topicLenBytes...) - + // Topic name result = append(result, []byte(topic)...) - + // Number of partitions (4 bytes) numPartitionsBytes := make([]byte, 4) binary.BigEndian.PutUint32(numPartitionsBytes, uint32(len(partitions))) result = append(result, numPartitionsBytes...) - + // Partitions (4 bytes each) for _, partition := range partitions { partitionBytes := make([]byte, 4) @@ -692,10 +704,10 @@ func (h *Handler) serializeMemberAssignment(assignments []consumer.PartitionAssi result = append(result, partitionBytes...) } } - + // User data length (4 bytes) - no user data result = append(result, 0, 0, 0, 0) - + return result } @@ -703,7 +715,7 @@ func (h *Handler) serializeMemberAssignment(assignments []consumer.PartitionAssi func (h *Handler) getAvailableTopics() []string { h.topicsMu.RLock() defer h.topicsMu.RUnlock() - + topics := make([]string, 0, len(h.topics)) for topicName := range h.topics { topics = append(topics, topicName)