You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

383 lines
11 KiB

package protocol
import (
"encoding/binary"
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
)
// Heartbeat API (key 12) - Consumer group heartbeat
// Consumers send periodic heartbeats to stay in the group and receive rebalancing signals
// HeartbeatRequest is the decoded form of a Heartbeat request body sent by a
// Kafka client. It is produced by parseHeartbeatRequest and consumed by
// handleHeartbeat.
type HeartbeatRequest struct {
// GroupID names the consumer group; handleHeartbeat rejects an empty value.
GroupID string
// GenerationID must equal the group's current generation, otherwise
// handleHeartbeat answers with ErrorCodeIllegalGeneration.
GenerationID int32
// MemberID identifies the heartbeating member; handleHeartbeat rejects an
// empty value and answers ErrorCodeUnknownMemberID when it is not in the group.
MemberID string
GroupInstanceID string // Optional static membership ID; parseHeartbeatRequest currently always leaves it empty
}
// HeartbeatResponse holds the values that buildHeartbeatResponse serializes
// into the reply for a Heartbeat request.
type HeartbeatResponse struct {
// CorrelationID is echoed from the request; written as 4 big-endian bytes.
CorrelationID uint32
// ErrorCode is the outcome of the heartbeat; written as 2 big-endian bytes.
ErrorCode int16
}
// LeaveGroup API (key 13) - Consumer graceful departure
// Consumers call this when shutting down to trigger immediate rebalancing
// LeaveGroupRequest is the decoded form of a LeaveGroup request body sent by a
// Kafka client. It is produced by parseLeaveGroupRequest and consumed by
// handleLeaveGroup.
type LeaveGroupRequest struct {
// GroupID names the consumer group; handleLeaveGroup rejects an empty value.
GroupID string
// MemberID identifies the departing member; handleLeaveGroup rejects an empty
// value and removes this member from the group.
MemberID string
GroupInstanceID string // Optional static membership ID; parseLeaveGroupRequest currently always leaves it empty
Members []LeaveGroupMember // For newer versions, can leave multiple members; parseLeaveGroupRequest currently always returns it empty
}
// LeaveGroupMember represents one member leaving the group in a batch
// departure. It is the element type of LeaveGroupRequest.Members.
type LeaveGroupMember struct {
// MemberID identifies the departing member.
MemberID string
// GroupInstanceID is the member's static membership ID, if any.
GroupInstanceID string
Reason string // Optional reason for leaving
}
// LeaveGroupResponse holds the values that buildLeaveGroupResponse serializes
// into the reply for a LeaveGroup request.
type LeaveGroupResponse struct {
// CorrelationID is echoed from the request; written as 4 big-endian bytes.
CorrelationID uint32
// ErrorCode is the top-level outcome; written as 2 big-endian bytes.
ErrorCode int16
Members []LeaveGroupMemberResponse // Per-member responses for newer versions; written as a 4-byte count followed by each entry
}
// LeaveGroupMemberResponse is the per-member entry of a LeaveGroupResponse.
// buildLeaveGroupResponse writes each string as a 2-byte big-endian length
// followed by its bytes, then the 2-byte error code.
type LeaveGroupMemberResponse struct {
// MemberID identifies the member this entry refers to.
MemberID string
// GroupInstanceID is the member's static membership ID; may be empty.
GroupInstanceID string
// ErrorCode is the outcome for this member.
ErrorCode int16
}
// Error codes specific to consumer coordination are defined in errors.go
// handleHeartbeat serves a Heartbeat request and returns the encoded response.
//
// The request body is decoded, the group and member are looked up, and the
// member's generation is checked before its heartbeat timestamp is refreshed.
// Every failure is reported through the response's error code; the returned
// error is always nil.
func (h *Handler) handleHeartbeat(correlationID uint32, requestBody []byte) ([]byte, error) {
	// reject encodes an error-only response for this correlation ID.
	reject := func(code int16) ([]byte, error) {
		return h.buildHeartbeatErrorResponse(correlationID, code), nil
	}

	req, parseErr := h.parseHeartbeatRequest(requestBody)
	if parseErr != nil {
		return reject(ErrorCodeInvalidGroupID)
	}

	// Both identifiers are mandatory.
	if req.GroupID == "" || req.MemberID == "" {
		return reject(ErrorCodeInvalidGroupID)
	}

	group := h.groupCoordinator.GetGroup(req.GroupID)
	if group == nil {
		return reject(ErrorCodeInvalidGroupID)
	}

	group.Mu.Lock()
	defer group.Mu.Unlock()

	// Any heartbeat that reaches an existing group counts as group activity,
	// even if the member or generation check below fails.
	group.LastActivity = time.Now()

	member, known := group.Members[req.MemberID]
	if !known {
		return reject(ErrorCodeUnknownMemberID)
	}

	if group.Generation != req.GenerationID {
		return reject(ErrorCodeIllegalGeneration)
	}

	member.LastHeartbeat = time.Now()

	// Translate the group state into the code reported to the consumer.
	// A stable group (and any state not listed) accepts the heartbeat.
	var code int16 = ErrorCodeNone
	switch group.State {
	case consumer.GroupStatePreparingRebalance, consumer.GroupStateCompletingRebalance:
		// Tell the consumer a rebalance is under way.
		code = ErrorCodeRebalanceInProgress
	case consumer.GroupStateDead:
		code = ErrorCodeInvalidGroupID
	case consumer.GroupStateEmpty:
		// Not expected while the member is still registered; answer defensively.
		code = ErrorCodeUnknownMemberID
	}

	return h.buildHeartbeatResponse(HeartbeatResponse{
		CorrelationID: correlationID,
		ErrorCode:     code,
	}), nil
}
// handleLeaveGroup serves a LeaveGroup request and returns the encoded response.
//
// The member named in the request is removed from its group. The group's
// generation is bumped and the group either becomes empty or enters the
// preparing-rebalance state with every remaining member marked pending.
// Every failure is reported through the response's error code; the returned
// error is always nil.
func (h *Handler) handleLeaveGroup(correlationID uint32, requestBody []byte) ([]byte, error) {
	// reject encodes an error-only response for this correlation ID.
	reject := func(code int16) ([]byte, error) {
		return h.buildLeaveGroupErrorResponse(correlationID, code), nil
	}

	req, parseErr := h.parseLeaveGroupRequest(requestBody)
	if parseErr != nil {
		return reject(ErrorCodeInvalidGroupID)
	}

	// Both identifiers are mandatory.
	if req.GroupID == "" || req.MemberID == "" {
		return reject(ErrorCodeInvalidGroupID)
	}

	group := h.groupCoordinator.GetGroup(req.GroupID)
	if group == nil {
		return reject(ErrorCodeInvalidGroupID)
	}

	group.Mu.Lock()
	defer group.Mu.Unlock()

	group.LastActivity = time.Now()

	if _, known := group.Members[req.MemberID]; !known {
		return reject(ErrorCodeUnknownMemberID)
	}

	delete(group.Members, req.MemberID)

	// Membership changed, so the generation advances in either outcome.
	group.Generation++

	if len(group.Members) > 0 {
		// Survivors must rebalance.
		group.State = consumer.GroupStatePreparingRebalance

		// Replace a departed leader with whichever member map iteration
		// yields first.
		if group.Leader == req.MemberID {
			for candidate := range group.Members {
				group.Leader = candidate
				break
			}
		}

		// Pending members are what triggers the rebalance.
		for _, remaining := range group.Members {
			remaining.State = consumer.MemberStatePending
		}
	} else {
		// Last member left: the group is now empty and has no leader.
		group.State = consumer.GroupStateEmpty
		group.Leader = ""
	}

	// The set of subscribed topics may shrink when a member leaves.
	h.updateGroupSubscriptionFromMembers(group)

	departed := LeaveGroupMemberResponse{
		MemberID:        req.MemberID,
		GroupInstanceID: req.GroupInstanceID,
		ErrorCode:       ErrorCodeNone,
	}
	return h.buildLeaveGroupResponse(LeaveGroupResponse{
		CorrelationID: correlationID,
		ErrorCode:     ErrorCodeNone,
		Members:       []LeaveGroupMemberResponse{departed},
	}), nil
}
// parseHeartbeatRequest decodes a Heartbeat request body.
//
// Layout read from data, all integers big-endian:
//
//	group ID      2-byte length + bytes
//	generation ID 4 bytes
//	member ID     2-byte length + bytes
//
// Any bytes after the member ID are ignored, so GroupInstanceID is always left
// empty. A non-nil error is returned when data is shorter than 8 bytes or a
// field runs past the end of data.
func (h *Handler) parseHeartbeatRequest(data []byte) (*HeartbeatRequest, error) {
	// Smallest body: two empty strings (2 bytes each) plus the generation ID.
	const minRequestSize = 2 + 4 + 2
	if len(data) < minRequestSize {
		return nil, fmt.Errorf("request too short")
	}

	// rest is the still-unread tail of data.
	rest := data

	// Group ID; the size check above guarantees its length prefix is present.
	n := int(binary.BigEndian.Uint16(rest))
	rest = rest[2:]
	if n > len(rest) {
		return nil, fmt.Errorf("invalid group ID length")
	}
	groupID := string(rest[:n])
	rest = rest[n:]

	// Generation ID.
	if len(rest) < 4 {
		return nil, fmt.Errorf("missing generation ID")
	}
	generation := int32(binary.BigEndian.Uint32(rest))
	rest = rest[4:]

	// Member ID.
	if len(rest) < 2 {
		return nil, fmt.Errorf("missing member ID length")
	}
	n = int(binary.BigEndian.Uint16(rest))
	rest = rest[2:]
	if n > len(rest) {
		return nil, fmt.Errorf("invalid member ID length")
	}
	memberID := string(rest[:n])

	return &HeartbeatRequest{
		GroupID:      groupID,
		GenerationID: generation,
		MemberID:     memberID,
		// GroupInstanceID stays empty: the remaining bytes are not parsed.
	}, nil
}
// parseLeaveGroupRequest decodes a LeaveGroup request body.
//
// Layout read from data, all integers big-endian:
//
//	group ID  2-byte length + bytes
//	member ID 2-byte length + bytes
//
// Any bytes after the member ID are ignored, so GroupInstanceID is always left
// empty and Members is always an empty slice. A non-nil error is returned when
// data is shorter than 4 bytes or a field runs past the end of data.
func (h *Handler) parseLeaveGroupRequest(data []byte) (*LeaveGroupRequest, error) {
	// Smallest body: two empty strings, 2 length bytes each.
	const minRequestSize = 2 + 2
	if len(data) < minRequestSize {
		return nil, fmt.Errorf("request too short")
	}

	// rest is the still-unread tail of data.
	rest := data

	// Group ID; the size check above guarantees its length prefix is present.
	n := int(binary.BigEndian.Uint16(rest))
	rest = rest[2:]
	if n > len(rest) {
		return nil, fmt.Errorf("invalid group ID length")
	}
	groupID := string(rest[:n])
	rest = rest[n:]

	// Member ID.
	if len(rest) < 2 {
		return nil, fmt.Errorf("missing member ID length")
	}
	n = int(binary.BigEndian.Uint16(rest))
	rest = rest[2:]
	if n > len(rest) {
		return nil, fmt.Errorf("invalid member ID length")
	}
	memberID := string(rest[:n])

	return &LeaveGroupRequest{
		GroupID:         groupID,
		MemberID:        memberID,
		GroupInstanceID: "",                   // not parsed from the remaining bytes
		Members:         []LeaveGroupMember{}, // batch members are not parsed
	}, nil
}
// buildHeartbeatResponse serializes response into a 10-byte big-endian buffer:
//
//	bytes 0-3  correlation ID
//	bytes 4-5  error code
//	bytes 6-9  throttle time, always zero (no throttling)
//
// NOTE(review): the Kafka protocol guide lists throttle_time_ms before
// error_code for Heartbeat v1+ — confirm this ordering against the API
// version the caller negotiates.
func (h *Handler) buildHeartbeatResponse(response HeartbeatResponse) []byte {
	const (
		correlationIDEnd = 4
		errorCodeEnd     = correlationIDEnd + 2
		responseSize     = errorCodeEnd + 4
	)

	// make zero-fills the buffer, which already encodes the throttle time.
	out := make([]byte, responseSize, 12)
	binary.BigEndian.PutUint32(out[:correlationIDEnd], response.CorrelationID)
	binary.BigEndian.PutUint16(out[correlationIDEnd:errorCodeEnd], uint16(response.ErrorCode))
	return out
}
// buildLeaveGroupResponse serializes response into a big-endian buffer:
//
//	correlation ID   4 bytes
//	error code       2 bytes
//	member count     4 bytes
//	per member:      member ID (2-byte length + bytes),
//	                 group instance ID (2-byte length + bytes, length 0 when empty),
//	                 error code (2 bytes)
//	throttle time    4 bytes, always zero (no throttling)
//
// NOTE(review): the Kafka protocol guide lists throttle_time_ms first for
// LeaveGroup v1+ — confirm this ordering against the API version the caller
// negotiates.
func (h *Handler) buildLeaveGroupResponse(response LeaveGroupResponse) []byte {
	// Size the buffer up front: fixed fields plus a per-member estimate.
	capacity := 16
	for i := range response.Members {
		entry := &response.Members[i]
		capacity += len(entry.MemberID) + len(entry.GroupInstanceID) + 8
	}
	out := make([]byte, 0, capacity)

	// scratch is reused for every fixed-width integer appended below.
	var scratch [4]byte
	putUint32 := func(v uint32) {
		binary.BigEndian.PutUint32(scratch[:], v)
		out = append(out, scratch[:4]...)
	}
	putUint16 := func(v uint16) {
		binary.BigEndian.PutUint16(scratch[:2], v)
		out = append(out, scratch[:2]...)
	}
	// putString writes a 2-byte length prefix followed by the string's bytes.
	putString := func(s string) {
		putUint16(uint16(len(s)))
		out = append(out, s...)
	}

	putUint32(response.CorrelationID)
	putUint16(uint16(response.ErrorCode))
	putUint32(uint32(len(response.Members)))

	for _, entry := range response.Members {
		putString(entry.MemberID)
		putString(entry.GroupInstanceID)
		putUint16(uint16(entry.ErrorCode))
	}

	// Throttle time: zero means no throttling.
	putUint32(0)
	return out
}
// buildHeartbeatErrorResponse encodes a Heartbeat response that carries only
// the given correlation ID and error code. It delegates the serialization to
// buildHeartbeatResponse.
func (h *Handler) buildHeartbeatErrorResponse(correlationID uint32, errorCode int16) []byte {
	return h.buildHeartbeatResponse(HeartbeatResponse{
		CorrelationID: correlationID,
		ErrorCode:     errorCode,
	})
}
// buildLeaveGroupErrorResponse encodes a LeaveGroup response that carries the
// given correlation ID and error code and no per-member entries. It delegates
// the serialization to buildLeaveGroupResponse.
func (h *Handler) buildLeaveGroupErrorResponse(correlationID uint32, errorCode int16) []byte {
	return h.buildLeaveGroupResponse(LeaveGroupResponse{
		CorrelationID: correlationID,
		ErrorCode:     errorCode,
		Members:       []LeaveGroupMemberResponse{},
	})
}
// updateGroupSubscriptionFromMembers replaces group.SubscribedTopics with the
// union of the topics listed in every current member's Subscription. The
// previous map is discarded, so topics no remaining member subscribes to drop
// out. It takes no lock itself.
func (h *Handler) updateGroupSubscriptionFromMembers(group *consumer.ConsumerGroup) {
	topics := make(map[string]bool)
	for _, m := range group.Members {
		for _, name := range m.Subscription {
			topics[name] = true
		}
	}
	group.SubscribedTopics = topics
}