Browse Source

fix remaining tests

pull/7231/head
chrislu 2 months ago
parent
commit
6eafc87413
  1. 114
      weed/mq/kafka/protocol/consumer_coordination.go
  2. 13
      weed/mq/kafka/protocol/fetch.go

114
weed/mq/kafka/protocol/consumer_coordination.go

@ -4,7 +4,7 @@ import (
"encoding/binary"
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
)
@ -13,9 +13,9 @@ import (
// HeartbeatRequest represents a Heartbeat request from a Kafka client
type HeartbeatRequest struct {
GroupID string
GenerationID int32
MemberID string
GroupID string
GenerationID int32
MemberID string
GroupInstanceID string // Optional static membership ID
}
@ -30,10 +30,10 @@ type HeartbeatResponse struct {
// LeaveGroupRequest represents a LeaveGroup request from a Kafka client
type LeaveGroupRequest struct {
GroupID string
MemberID string
GroupInstanceID string // Optional static membership ID
Members []LeaveGroupMember // For newer versions, can leave multiple members
GroupID string
MemberID string
GroupInstanceID string // Optional static membership ID
Members []LeaveGroupMember // For newer versions, can leave multiple members
}
// LeaveGroupMember represents a member leaving the group (for batch departures)
@ -69,38 +69,38 @@ func (h *Handler) handleHeartbeat(correlationID uint32, requestBody []byte) ([]b
if err != nil {
return h.buildHeartbeatErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
}
// Validate request
if request.GroupID == "" || request.MemberID == "" {
return h.buildHeartbeatErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
}
// Get consumer group
group := h.groupCoordinator.GetGroup(request.GroupID)
if group == nil {
return h.buildHeartbeatErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
}
group.Mu.Lock()
defer group.Mu.Unlock()
// Update group's last activity
group.LastActivity = time.Now()
// Validate member exists
member, exists := group.Members[request.MemberID]
if !exists {
return h.buildHeartbeatErrorResponse(correlationID, ErrorCodeUnknownMemberID), nil
}
// Validate generation
if request.GenerationID != group.Generation {
return h.buildHeartbeatErrorResponse(correlationID, ErrorCodeIllegalGeneration), nil
}
// Update member's last heartbeat
member.LastHeartbeat = time.Now()
// Check if rebalancing is in progress
var errorCode int16 = ErrorCodeNone
switch group.State {
@ -116,13 +116,13 @@ func (h *Handler) handleHeartbeat(correlationID uint32, requestBody []byte) ([]b
// Normal case - heartbeat accepted
errorCode = ErrorCodeNone
}
// Build successful response
response := HeartbeatResponse{
CorrelationID: correlationID,
ErrorCode: errorCode,
}
return h.buildHeartbeatResponse(response), nil
}
@ -132,33 +132,33 @@ func (h *Handler) handleLeaveGroup(correlationID uint32, requestBody []byte) ([]
if err != nil {
return h.buildLeaveGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
}
// Validate request
if request.GroupID == "" || request.MemberID == "" {
return h.buildLeaveGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
}
// Get consumer group
group := h.groupCoordinator.GetGroup(request.GroupID)
if group == nil {
return h.buildLeaveGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
}
group.Mu.Lock()
defer group.Mu.Unlock()
// Update group's last activity
group.LastActivity = time.Now()
// Validate member exists
_, exists := group.Members[request.MemberID]
if !exists {
return h.buildLeaveGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID), nil
}
// Remove the member from the group
delete(group.Members, request.MemberID)
// Update group state based on remaining members
if len(group.Members) == 0 {
// Group becomes empty
@ -169,7 +169,7 @@ func (h *Handler) handleLeaveGroup(correlationID uint32, requestBody []byte) ([]
// Trigger rebalancing for remaining members
group.State = consumer.GroupStatePreparingRebalance
group.Generation++
// If the leaving member was the leader, select a new leader
if group.Leader == request.MemberID {
// Select first remaining member as new leader
@ -178,16 +178,16 @@ func (h *Handler) handleLeaveGroup(correlationID uint32, requestBody []byte) ([]
break
}
}
// Mark remaining members as pending to trigger rebalancing
for _, member := range group.Members {
member.State = consumer.MemberStatePending
}
}
// Update group's subscribed topics (may have changed with member leaving)
h.updateGroupSubscriptionFromMembers(group)
// Build successful response
response := LeaveGroupResponse{
CorrelationID: correlationID,
@ -200,7 +200,7 @@ func (h *Handler) handleLeaveGroup(correlationID uint32, requestBody []byte) ([]
},
},
}
return h.buildLeaveGroupResponse(response), nil
}
@ -208,9 +208,9 @@ func (h *Handler) parseHeartbeatRequest(data []byte) (*HeartbeatRequest, error)
if len(data) < 8 {
return nil, fmt.Errorf("request too short")
}
offset := 0
// GroupID (string)
groupIDLength := int(binary.BigEndian.Uint16(data[offset:]))
offset += 2
@ -219,14 +219,14 @@ func (h *Handler) parseHeartbeatRequest(data []byte) (*HeartbeatRequest, error)
}
groupID := string(data[offset : offset+groupIDLength])
offset += groupIDLength
// Generation ID (4 bytes)
if offset+4 > len(data) {
return nil, fmt.Errorf("missing generation ID")
}
generationID := int32(binary.BigEndian.Uint32(data[offset:]))
offset += 4
// MemberID (string)
if offset+2 > len(data) {
return nil, fmt.Errorf("missing member ID length")
@ -238,7 +238,7 @@ func (h *Handler) parseHeartbeatRequest(data []byte) (*HeartbeatRequest, error)
}
memberID := string(data[offset : offset+memberIDLength])
offset += memberIDLength
return &HeartbeatRequest{
GroupID: groupID,
GenerationID: generationID,
@ -251,9 +251,9 @@ func (h *Handler) parseLeaveGroupRequest(data []byte) (*LeaveGroupRequest, error
if len(data) < 4 {
return nil, fmt.Errorf("request too short")
}
offset := 0
// GroupID (string)
groupIDLength := int(binary.BigEndian.Uint16(data[offset:]))
offset += 2
@ -262,7 +262,7 @@ func (h *Handler) parseLeaveGroupRequest(data []byte) (*LeaveGroupRequest, error
}
groupID := string(data[offset : offset+groupIDLength])
offset += groupIDLength
// MemberID (string)
if offset+2 > len(data) {
return nil, fmt.Errorf("missing member ID length")
@ -274,31 +274,31 @@ func (h *Handler) parseLeaveGroupRequest(data []byte) (*LeaveGroupRequest, error
}
memberID := string(data[offset : offset+memberIDLength])
offset += memberIDLength
return &LeaveGroupRequest{
GroupID: groupID,
MemberID: memberID,
GroupInstanceID: "", // Simplified - would parse from remaining data
GroupInstanceID: "", // Simplified - would parse from remaining data
Members: []LeaveGroupMember{}, // Would parse members array for batch operations
}, nil
}
func (h *Handler) buildHeartbeatResponse(response HeartbeatResponse) []byte {
result := make([]byte, 0, 12)
// Correlation ID (4 bytes)
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID)
result = append(result, correlationIDBytes...)
// Error code (2 bytes)
errorCodeBytes := make([]byte, 2)
binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode))
result = append(result, errorCodeBytes...)
// Throttle time (4 bytes, 0 = no throttling)
result = append(result, 0, 0, 0, 0)
return result
}
@ -307,53 +307,53 @@ func (h *Handler) buildLeaveGroupResponse(response LeaveGroupResponse) []byte {
for _, member := range response.Members {
estimatedSize += len(member.MemberID) + len(member.GroupInstanceID) + 8
}
result := make([]byte, 0, estimatedSize)
// Correlation ID (4 bytes)
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID)
result = append(result, correlationIDBytes...)
// Error code (2 bytes)
errorCodeBytes := make([]byte, 2)
binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode))
result = append(result, errorCodeBytes...)
// Members array length (4 bytes)
membersLengthBytes := make([]byte, 4)
binary.BigEndian.PutUint32(membersLengthBytes, uint32(len(response.Members)))
result = append(result, membersLengthBytes...)
// Members
for _, member := range response.Members {
// Member ID length (2 bytes)
memberIDLength := make([]byte, 2)
binary.BigEndian.PutUint16(memberIDLength, uint16(len(member.MemberID)))
result = append(result, memberIDLength...)
// Member ID
result = append(result, []byte(member.MemberID)...)
// Group instance ID length (2 bytes)
instanceIDLength := make([]byte, 2)
binary.BigEndian.PutUint16(instanceIDLength, uint16(len(member.GroupInstanceID)))
result = append(result, instanceIDLength...)
// Group instance ID
if len(member.GroupInstanceID) > 0 {
result = append(result, []byte(member.GroupInstanceID)...)
}
// Error code (2 bytes)
memberErrorBytes := make([]byte, 2)
binary.BigEndian.PutUint16(memberErrorBytes, uint16(member.ErrorCode))
result = append(result, memberErrorBytes...)
}
// Throttle time (4 bytes, 0 = no throttling)
result = append(result, 0, 0, 0, 0)
return result
}
@ -362,7 +362,7 @@ func (h *Handler) buildHeartbeatErrorResponse(correlationID uint32, errorCode in
CorrelationID: correlationID,
ErrorCode: errorCode,
}
return h.buildHeartbeatResponse(response)
}
@ -372,7 +372,7 @@ func (h *Handler) buildLeaveGroupErrorResponse(correlationID uint32, errorCode i
ErrorCode: errorCode,
Members: []LeaveGroupMemberResponse{},
}
return h.buildLeaveGroupResponse(response)
}

13
weed/mq/kafka/protocol/fetch.go

@ -344,8 +344,9 @@ func (h *Handler) constructRecordBatchFromLedger(ledger interface{}, fetchOffset
batch = append(batch, 0, 0, 0, 0) // partition leader epoch (4 bytes)
batch = append(batch, 2) // magic byte (version 2) (1 byte)
// CRC placeholder (4 bytes) - for testing, use 0
batch = append(batch, 0, 0, 0, 0) // CRC32
// CRC placeholder (4 bytes) - will be calculated at the end
crcPos := len(batch)
batch = append(batch, 0, 0, 0, 0) // CRC32 placeholder
// Batch attributes (2 bytes) - no compression, no transactional
batch = append(batch, 0, 0) // attributes
@ -449,6 +450,14 @@ func (h *Handler) constructRecordBatchFromLedger(ledger interface{}, fetchOffset
batchLength := uint32(len(batch) - batchLengthPos - 4)
binary.BigEndian.PutUint32(batch[batchLengthPos:batchLengthPos+4], batchLength)
// Calculate CRC32 for the batch
// CRC is calculated over: attributes + last_offset_delta + first_timestamp + max_timestamp + producer_id + producer_epoch + base_sequence + records_count + records
// This starts after the CRC field (which comes after magic byte)
crcStartPos := crcPos + 4 // start after the CRC field
crcData := batch[crcStartPos:]
crc := crc32.ChecksumIEEE(crcData)
binary.BigEndian.PutUint32(batch[crcPos:crcPos+4], crc)
return batch
}

Loading…
Cancel
Save