You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

457 lines
14 KiB

package protocol
import (
"encoding/binary"
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
)
// Heartbeat API (key 12) - Consumer group heartbeat
// Consumers send periodic heartbeats to stay in the group and receive rebalancing signals
// HeartbeatRequest represents a Heartbeat request from a Kafka client
type HeartbeatRequest struct {
GroupID string
GenerationID int32
MemberID string
GroupInstanceID string // Optional static membership ID
}
// HeartbeatResponse represents a Heartbeat response to a Kafka client
type HeartbeatResponse struct {
CorrelationID uint32
ErrorCode int16
}
// LeaveGroup API (key 13) - Consumer graceful departure
// Consumers call this when shutting down to trigger immediate rebalancing
// LeaveGroupRequest represents a LeaveGroup request from a Kafka client
type LeaveGroupRequest struct {
GroupID string
MemberID string
GroupInstanceID string // Optional static membership ID
Members []LeaveGroupMember // For newer versions, can leave multiple members
}
// LeaveGroupMember represents a member leaving the group (for batch departures)
type LeaveGroupMember struct {
MemberID string
GroupInstanceID string
Reason string // Optional reason for leaving
}
// LeaveGroupResponse represents a LeaveGroup response to a Kafka client
type LeaveGroupResponse struct {
CorrelationID uint32
ErrorCode int16
Members []LeaveGroupMemberResponse // Per-member responses for newer versions
}
// LeaveGroupMemberResponse represents per-member leave group response
type LeaveGroupMemberResponse struct {
MemberID string
GroupInstanceID string
ErrorCode int16
}
// Error codes specific to consumer coordination are imported from errors.go
func (h *Handler) handleHeartbeat(correlationID uint32, requestBody []byte) ([]byte, error) {
// Parse Heartbeat request
request, err := h.parseHeartbeatRequest(requestBody)
if err != nil {
fmt.Printf("DEBUG: Heartbeat parse error: %v\n", err)
return h.buildHeartbeatErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
}
fmt.Printf("DEBUG: Heartbeat request - GroupID: '%s', MemberID: '%s', GenerationID: %d\n",
request.GroupID, request.MemberID, request.GenerationID)
// Validate request
if request.GroupID == "" || request.MemberID == "" {
return h.buildHeartbeatErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
}
// Get consumer group
group := h.groupCoordinator.GetGroup(request.GroupID)
if group == nil {
fmt.Printf("DEBUG: Heartbeat group '%s' not found\n", request.GroupID)
return h.buildHeartbeatErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
}
group.Mu.Lock()
defer group.Mu.Unlock()
// Update group's last activity
group.LastActivity = time.Now()
fmt.Printf("DEBUG: Heartbeat pre-check - group='%s' gen=%d state=%s members=%d leader='%s'\n",
group.ID, group.Generation, group.State, len(group.Members), group.Leader)
// Validate member exists
member, exists := group.Members[request.MemberID]
if !exists {
fmt.Printf("DEBUG: Heartbeat unknown member '%s' in group '%s' (gen=%d)\n", request.MemberID, group.ID, group.Generation)
return h.buildHeartbeatErrorResponse(correlationID, ErrorCodeUnknownMemberID), nil
}
// Validate generation
if request.GenerationID != group.Generation {
fmt.Printf("DEBUG: Heartbeat illegal generation - req=%d, group=%d\n", request.GenerationID, group.Generation)
return h.buildHeartbeatErrorResponse(correlationID, ErrorCodeIllegalGeneration), nil
}
// Update member's last heartbeat
member.LastHeartbeat = time.Now()
fmt.Printf("DEBUG: Heartbeat accepted - member='%s' state=%s lastHeartbeat=%v\n",
member.ID, member.State, member.LastHeartbeat)
// Check if rebalancing is in progress
var errorCode int16 = ErrorCodeNone
switch group.State {
case consumer.GroupStatePreparingRebalance, consumer.GroupStateCompletingRebalance:
// Signal the consumer that rebalancing is happening
errorCode = ErrorCodeRebalanceInProgress
case consumer.GroupStateDead:
errorCode = ErrorCodeInvalidGroupID
case consumer.GroupStateEmpty:
// This shouldn't happen if member exists, but handle gracefully
errorCode = ErrorCodeUnknownMemberID
case consumer.GroupStateStable:
// Normal case - heartbeat accepted
errorCode = ErrorCodeNone
}
// Build successful response
response := HeartbeatResponse{
CorrelationID: correlationID,
ErrorCode: errorCode,
}
fmt.Printf("DEBUG: Heartbeat response - group='%s' gen=%d state=%s errorCode=%d\n",
group.ID, group.Generation, group.State, errorCode)
return h.buildHeartbeatResponse(response), nil
}
func (h *Handler) handleLeaveGroup(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
// Parse LeaveGroup request
request, err := h.parseLeaveGroupRequest(requestBody)
if err != nil {
fmt.Printf("DEBUG: LeaveGroup parse error: %v\n", err)
return h.buildLeaveGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil
}
fmt.Printf("DEBUG: LeaveGroup request - GroupID: '%s', MemberID: '%s', InstanceID: '%s'\n",
request.GroupID, request.MemberID, request.GroupInstanceID)
// Validate request
if request.GroupID == "" || request.MemberID == "" {
return h.buildLeaveGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil
}
// Get consumer group
group := h.groupCoordinator.GetGroup(request.GroupID)
if group == nil {
fmt.Printf("DEBUG: LeaveGroup group '%s' not found\n", request.GroupID)
return h.buildLeaveGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil
}
group.Mu.Lock()
defer group.Mu.Unlock()
// Update group's last activity
group.LastActivity = time.Now()
fmt.Printf("DEBUG: LeaveGroup pre-state - group='%s' gen=%d state=%s members=%d leader='%s'\n",
group.ID, group.Generation, group.State, len(group.Members), group.Leader)
// Validate member exists
member, exists := group.Members[request.MemberID]
if !exists {
return h.buildLeaveGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID, apiVersion), nil
}
// For static members, only remove if GroupInstanceID matches or is not provided
if h.groupCoordinator.IsStaticMember(member) {
if request.GroupInstanceID != "" && *member.GroupInstanceID != request.GroupInstanceID {
return h.buildLeaveGroupErrorResponse(correlationID, ErrorCodeFencedInstanceID, apiVersion), nil
}
// Unregister static member
h.groupCoordinator.UnregisterStaticMemberLocked(group, *member.GroupInstanceID)
}
// Remove the member from the group
delete(group.Members, request.MemberID)
// Update group state based on remaining members
if len(group.Members) == 0 {
// Group becomes empty
group.State = consumer.GroupStateEmpty
group.Generation++
group.Leader = ""
} else {
// Trigger rebalancing for remaining members
group.State = consumer.GroupStatePreparingRebalance
group.Generation++
// If the leaving member was the leader, select a new leader
if group.Leader == request.MemberID {
// Select first remaining member as new leader
for memberID := range group.Members {
group.Leader = memberID
break
}
}
// Mark remaining members as pending to trigger rebalancing
for _, member := range group.Members {
member.State = consumer.MemberStatePending
}
}
fmt.Printf("DEBUG: LeaveGroup post-state - group='%s' gen=%d state=%s members=%d leader='%s'\n",
group.ID, group.Generation, group.State, len(group.Members), group.Leader)
// Update group's subscribed topics (may have changed with member leaving)
h.updateGroupSubscriptionFromMembers(group)
// Build successful response
response := LeaveGroupResponse{
CorrelationID: correlationID,
ErrorCode: ErrorCodeNone,
Members: []LeaveGroupMemberResponse{
{
MemberID: request.MemberID,
GroupInstanceID: request.GroupInstanceID,
ErrorCode: ErrorCodeNone,
},
},
}
fmt.Printf("DEBUG: LeaveGroup response - group='%s' gen=%d state=%s remainingMembers=%d\n",
group.ID, group.Generation, group.State, len(group.Members))
return h.buildLeaveGroupResponse(response, apiVersion), nil
}
func (h *Handler) parseHeartbeatRequest(data []byte) (*HeartbeatRequest, error) {
if len(data) < 8 {
return nil, fmt.Errorf("request too short")
}
offset := 0
// GroupID (string)
groupIDLength := int(binary.BigEndian.Uint16(data[offset:]))
offset += 2
if offset+groupIDLength > len(data) {
return nil, fmt.Errorf("invalid group ID length")
}
groupID := string(data[offset : offset+groupIDLength])
offset += groupIDLength
// Generation ID (4 bytes)
if offset+4 > len(data) {
return nil, fmt.Errorf("missing generation ID")
}
generationID := int32(binary.BigEndian.Uint32(data[offset:]))
offset += 4
// MemberID (string)
if offset+2 > len(data) {
return nil, fmt.Errorf("missing member ID length")
}
memberIDLength := int(binary.BigEndian.Uint16(data[offset:]))
offset += 2
if offset+memberIDLength > len(data) {
return nil, fmt.Errorf("invalid member ID length")
}
memberID := string(data[offset : offset+memberIDLength])
offset += memberIDLength
return &HeartbeatRequest{
GroupID: groupID,
GenerationID: generationID,
MemberID: memberID,
GroupInstanceID: "", // Simplified - would parse from remaining data
}, nil
}
func (h *Handler) parseLeaveGroupRequest(data []byte) (*LeaveGroupRequest, error) {
if len(data) < 4 {
return nil, fmt.Errorf("request too short")
}
offset := 0
// GroupID (string)
groupIDLength := int(binary.BigEndian.Uint16(data[offset:]))
offset += 2
if offset+groupIDLength > len(data) {
return nil, fmt.Errorf("invalid group ID length")
}
groupID := string(data[offset : offset+groupIDLength])
offset += groupIDLength
// MemberID (string)
if offset+2 > len(data) {
return nil, fmt.Errorf("missing member ID length")
}
memberIDLength := int(binary.BigEndian.Uint16(data[offset:]))
offset += 2
if offset+memberIDLength > len(data) {
return nil, fmt.Errorf("invalid member ID length")
}
memberID := string(data[offset : offset+memberIDLength])
offset += memberIDLength
// GroupInstanceID (string, v3+) - optional field
var groupInstanceID string
if offset+2 <= len(data) {
instanceIDLength := int(binary.BigEndian.Uint16(data[offset:]))
offset += 2
if instanceIDLength != 0xFFFF && offset+instanceIDLength <= len(data) {
groupInstanceID = string(data[offset : offset+instanceIDLength])
}
}
return &LeaveGroupRequest{
GroupID: groupID,
MemberID: memberID,
GroupInstanceID: groupInstanceID,
Members: []LeaveGroupMember{}, // Would parse members array for batch operations
}, nil
}
func (h *Handler) buildHeartbeatResponse(response HeartbeatResponse) []byte {
result := make([]byte, 0, 12)
// Correlation ID (4 bytes)
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID)
result = append(result, correlationIDBytes...)
// Error code (2 bytes)
errorCodeBytes := make([]byte, 2)
binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode))
result = append(result, errorCodeBytes...)
// Throttle time (4 bytes, 0 = no throttling)
result = append(result, 0, 0, 0, 0)
return result
}
func (h *Handler) buildLeaveGroupResponse(response LeaveGroupResponse, apiVersion uint16) []byte {
// LeaveGroup v0 only includes correlation_id and error_code (no throttle_time_ms, no members)
if apiVersion == 0 {
return h.buildLeaveGroupV0Response(response)
}
// For v1+ use the full response format
return h.buildLeaveGroupFullResponse(response)
}
func (h *Handler) buildLeaveGroupV0Response(response LeaveGroupResponse) []byte {
result := make([]byte, 0, 6)
// Correlation ID (4 bytes)
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID)
result = append(result, correlationIDBytes...)
// Error code (2 bytes) - that's it for v0!
errorCodeBytes := make([]byte, 2)
binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode))
result = append(result, errorCodeBytes...)
return result
}
func (h *Handler) buildLeaveGroupFullResponse(response LeaveGroupResponse) []byte {
estimatedSize := 16
for _, member := range response.Members {
estimatedSize += len(member.MemberID) + len(member.GroupInstanceID) + 8
}
result := make([]byte, 0, estimatedSize)
// Correlation ID (4 bytes)
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID)
result = append(result, correlationIDBytes...)
// Error code (2 bytes)
errorCodeBytes := make([]byte, 2)
binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode))
result = append(result, errorCodeBytes...)
// Members array length (4 bytes)
membersLengthBytes := make([]byte, 4)
binary.BigEndian.PutUint32(membersLengthBytes, uint32(len(response.Members)))
result = append(result, membersLengthBytes...)
// Members
for _, member := range response.Members {
// Member ID length (2 bytes)
memberIDLength := make([]byte, 2)
binary.BigEndian.PutUint16(memberIDLength, uint16(len(member.MemberID)))
result = append(result, memberIDLength...)
// Member ID
result = append(result, []byte(member.MemberID)...)
// Group instance ID length (2 bytes)
instanceIDLength := make([]byte, 2)
binary.BigEndian.PutUint16(instanceIDLength, uint16(len(member.GroupInstanceID)))
result = append(result, instanceIDLength...)
// Group instance ID
if len(member.GroupInstanceID) > 0 {
result = append(result, []byte(member.GroupInstanceID)...)
}
// Error code (2 bytes)
memberErrorBytes := make([]byte, 2)
binary.BigEndian.PutUint16(memberErrorBytes, uint16(member.ErrorCode))
result = append(result, memberErrorBytes...)
}
// Throttle time (4 bytes, 0 = no throttling)
result = append(result, 0, 0, 0, 0)
return result
}
func (h *Handler) buildHeartbeatErrorResponse(correlationID uint32, errorCode int16) []byte {
response := HeartbeatResponse{
CorrelationID: correlationID,
ErrorCode: errorCode,
}
return h.buildHeartbeatResponse(response)
}
func (h *Handler) buildLeaveGroupErrorResponse(correlationID uint32, errorCode int16, apiVersion uint16) []byte {
response := LeaveGroupResponse{
CorrelationID: correlationID,
ErrorCode: errorCode,
Members: []LeaveGroupMemberResponse{},
}
return h.buildLeaveGroupResponse(response, apiVersion)
}
func (h *Handler) updateGroupSubscriptionFromMembers(group *consumer.ConsumerGroup) {
// Update group's subscribed topics from remaining members
group.SubscribedTopics = make(map[string]bool)
for _, member := range group.Members {
for _, topic := range member.Subscription {
group.SubscribedTopics[topic] = true
}
}
}