From 6eafc8741344b8abb7cfd8f8b15ab8a427368e8a Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 13 Sep 2025 08:47:46 -0700 Subject: [PATCH] fix remaining tests --- .../kafka/protocol/consumer_coordination.go | 114 +++++++++--------- weed/mq/kafka/protocol/fetch.go | 13 +- 2 files changed, 68 insertions(+), 59 deletions(-) diff --git a/weed/mq/kafka/protocol/consumer_coordination.go b/weed/mq/kafka/protocol/consumer_coordination.go index baac1e8bf..8856387d1 100644 --- a/weed/mq/kafka/protocol/consumer_coordination.go +++ b/weed/mq/kafka/protocol/consumer_coordination.go @@ -4,7 +4,7 @@ import ( "encoding/binary" "fmt" "time" - + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer" ) @@ -13,9 +13,9 @@ import ( // HeartbeatRequest represents a Heartbeat request from a Kafka client type HeartbeatRequest struct { - GroupID string - GenerationID int32 - MemberID string + GroupID string + GenerationID int32 + MemberID string GroupInstanceID string // Optional static membership ID } @@ -30,10 +30,10 @@ type HeartbeatResponse struct { // LeaveGroupRequest represents a LeaveGroup request from a Kafka client type LeaveGroupRequest struct { - GroupID string - MemberID string - GroupInstanceID string // Optional static membership ID - Members []LeaveGroupMember // For newer versions, can leave multiple members + GroupID string + MemberID string + GroupInstanceID string // Optional static membership ID + Members []LeaveGroupMember // For newer versions, can leave multiple members } // LeaveGroupMember represents a member leaving the group (for batch departures) @@ -69,38 +69,38 @@ func (h *Handler) handleHeartbeat(correlationID uint32, requestBody []byte) ([]b if err != nil { return h.buildHeartbeatErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } - + // Validate request if request.GroupID == "" || request.MemberID == "" { return h.buildHeartbeatErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } - + // Get consumer group group := h.groupCoordinator.GetGroup(request.GroupID) if group == nil { return h.buildHeartbeatErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } - + group.Mu.Lock() defer group.Mu.Unlock() - + // Update group's last activity group.LastActivity = time.Now() - + // Validate member exists member, exists := group.Members[request.MemberID] if !exists { return h.buildHeartbeatErrorResponse(correlationID, ErrorCodeUnknownMemberID), nil } - + // Validate generation if request.GenerationID != group.Generation { return h.buildHeartbeatErrorResponse(correlationID, ErrorCodeIllegalGeneration), nil } - + // Update member's last heartbeat member.LastHeartbeat = time.Now() - + // Check if rebalancing is in progress var errorCode int16 = ErrorCodeNone switch group.State { @@ -116,13 +116,13 @@ func (h *Handler) handleHeartbeat(correlationID uint32, requestBody []byte) ([]b // Normal case - heartbeat accepted errorCode = ErrorCodeNone } - + // Build successful response response := HeartbeatResponse{ CorrelationID: correlationID, ErrorCode: errorCode, } - + return h.buildHeartbeatResponse(response), nil } @@ -132,33 +132,33 @@ func (h *Handler) handleLeaveGroup(correlationID uint32, requestBody []byte) ([] if err != nil { return h.buildLeaveGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } - + // Validate request if request.GroupID == "" || request.MemberID == "" { return h.buildLeaveGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } - + // Get consumer group group := h.groupCoordinator.GetGroup(request.GroupID) if group == nil { return h.buildLeaveGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } - + group.Mu.Lock() defer group.Mu.Unlock() - + // Update group's last activity group.LastActivity = time.Now() - + // Validate member exists _, exists := group.Members[request.MemberID] if !exists { return h.buildLeaveGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID), nil } - + // Remove the member from the group delete(group.Members, request.MemberID) - + // Update group state based on remaining members if len(group.Members) == 0 { // Group becomes empty @@ -169,7 +169,7 @@ func (h *Handler) handleLeaveGroup(correlationID uint32, requestBody []byte) ([] // Trigger rebalancing for remaining members group.State = consumer.GroupStatePreparingRebalance group.Generation++ - + // If the leaving member was the leader, select a new leader if group.Leader == request.MemberID { // Select first remaining member as new leader @@ -178,16 +178,16 @@ func (h *Handler) handleLeaveGroup(correlationID uint32, requestBody []byte) ([] break } } - + // Mark remaining members as pending to trigger rebalancing for _, member := range group.Members { member.State = consumer.MemberStatePending } } - + // Update group's subscribed topics (may have changed with member leaving) h.updateGroupSubscriptionFromMembers(group) - + // Build successful response response := LeaveGroupResponse{ CorrelationID: correlationID, @@ -200,7 +200,7 @@ func (h *Handler) handleLeaveGroup(correlationID uint32, requestBody []byte) ([] }, }, } - + return h.buildLeaveGroupResponse(response), nil } @@ -208,9 +208,9 @@ func (h *Handler) parseHeartbeatRequest(data []byte) (*HeartbeatRequest, error) if len(data) < 8 { return nil, fmt.Errorf("request too short") } - + offset := 0 - + // GroupID (string) groupIDLength := int(binary.BigEndian.Uint16(data[offset:])) offset += 2 @@ -219,14 +219,14 @@ func (h *Handler) parseHeartbeatRequest(data []byte) (*HeartbeatRequest, error) } groupID := string(data[offset : offset+groupIDLength]) offset += groupIDLength - + // Generation ID (4 bytes) if offset+4 > len(data) { return nil, fmt.Errorf("missing generation ID") } generationID := int32(binary.BigEndian.Uint32(data[offset:])) offset += 4 - + // MemberID (string) if offset+2 > len(data) { return nil, fmt.Errorf("missing member ID length") @@ -238,7 +238,7 @@ func (h *Handler) parseHeartbeatRequest(data []byte) (*HeartbeatRequest, error) } memberID := string(data[offset : offset+memberIDLength]) offset += memberIDLength - + return &HeartbeatRequest{ GroupID: groupID, GenerationID: generationID, @@ -251,9 +251,9 @@ func (h *Handler) parseLeaveGroupRequest(data []byte) (*LeaveGroupRequest, error if len(data) < 4 { return nil, fmt.Errorf("request too short") } - + offset := 0 - + // GroupID (string) groupIDLength := int(binary.BigEndian.Uint16(data[offset:])) offset += 2 @@ -262,7 +262,7 @@ func (h *Handler) parseLeaveGroupRequest(data []byte) (*LeaveGroupRequest, error } groupID := string(data[offset : offset+groupIDLength]) offset += groupIDLength - + // MemberID (string) if offset+2 > len(data) { return nil, fmt.Errorf("missing member ID length") @@ -274,31 +274,31 @@ func (h *Handler) parseLeaveGroupRequest(data []byte) (*LeaveGroupRequest, error } memberID := string(data[offset : offset+memberIDLength]) offset += memberIDLength - + return &LeaveGroupRequest{ GroupID: groupID, MemberID: memberID, - GroupInstanceID: "", // Simplified - would parse from remaining data + GroupInstanceID: "", // Simplified - would parse from remaining data Members: []LeaveGroupMember{}, // Would parse members array for batch operations }, nil } func (h *Handler) buildHeartbeatResponse(response HeartbeatResponse) []byte { result := make([]byte, 0, 12) - + // Correlation ID (4 bytes) correlationIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID) result = append(result, correlationIDBytes...) - + // Error code (2 bytes) errorCodeBytes := make([]byte, 2) binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode)) result = append(result, errorCodeBytes...) - + // Throttle time (4 bytes, 0 = no throttling) result = append(result, 0, 0, 0, 0) - + return result } @@ -307,53 +307,53 @@ func (h *Handler) buildLeaveGroupResponse(response LeaveGroupResponse) []byte { for _, member := range response.Members { estimatedSize += len(member.MemberID) + len(member.GroupInstanceID) + 8 } - + result := make([]byte, 0, estimatedSize) - + // Correlation ID (4 bytes) correlationIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID) result = append(result, correlationIDBytes...) - + // Error code (2 bytes) errorCodeBytes := make([]byte, 2) binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode)) result = append(result, errorCodeBytes...) - + // Members array length (4 bytes) membersLengthBytes := make([]byte, 4) binary.BigEndian.PutUint32(membersLengthBytes, uint32(len(response.Members))) result = append(result, membersLengthBytes...) - + // Members for _, member := range response.Members { // Member ID length (2 bytes) memberIDLength := make([]byte, 2) binary.BigEndian.PutUint16(memberIDLength, uint16(len(member.MemberID))) result = append(result, memberIDLength...) - + // Member ID result = append(result, []byte(member.MemberID)...) - + // Group instance ID length (2 bytes) instanceIDLength := make([]byte, 2) binary.BigEndian.PutUint16(instanceIDLength, uint16(len(member.GroupInstanceID))) result = append(result, instanceIDLength...) - + // Group instance ID if len(member.GroupInstanceID) > 0 { result = append(result, []byte(member.GroupInstanceID)...) } - + // Error code (2 bytes) memberErrorBytes := make([]byte, 2) binary.BigEndian.PutUint16(memberErrorBytes, uint16(member.ErrorCode)) result = append(result, memberErrorBytes...) } - + // Throttle time (4 bytes, 0 = no throttling) result = append(result, 0, 0, 0, 0) - + return result } @@ -362,7 +362,7 @@ func (h *Handler) buildHeartbeatErrorResponse(correlationID uint32, errorCode in CorrelationID: correlationID, ErrorCode: errorCode, } - + return h.buildHeartbeatResponse(response) } @@ -372,7 +372,7 @@ func (h *Handler) buildLeaveGroupErrorResponse(correlationID uint32, errorCode i ErrorCode: errorCode, Members: []LeaveGroupMemberResponse{}, } - + return h.buildLeaveGroupResponse(response) } diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index 31325e2ad..c3240310a 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -344,8 +344,9 @@ func (h *Handler) constructRecordBatchFromLedger(ledger interface{}, fetchOffset batch = append(batch, 0, 0, 0, 0) // partition leader epoch (4 bytes) batch = append(batch, 2) // magic byte (version 2) (1 byte) - // CRC placeholder (4 bytes) - for testing, use 0 - batch = append(batch, 0, 0, 0, 0) // CRC32 + // CRC placeholder (4 bytes) - will be calculated at the end + crcPos := len(batch) + batch = append(batch, 0, 0, 0, 0) // CRC32 placeholder // Batch attributes (2 bytes) - no compression, no transactional batch = append(batch, 0, 0) // attributes @@ -449,6 +450,14 @@ func (h *Handler) constructRecordBatchFromLedger(ledger interface{}, fetchOffset batchLength := uint32(len(batch) - batchLengthPos - 4) binary.BigEndian.PutUint32(batch[batchLengthPos:batchLengthPos+4], batchLength) + // Calculate CRC32 for the batch + // CRC is calculated over: attributes + last_offset_delta + first_timestamp + max_timestamp + producer_id + producer_epoch + base_sequence + records_count + records + // This starts after the CRC field (which comes after magic byte) + crcStartPos := crcPos + 4 // start after the CRC field + crcData := batch[crcStartPos:] + crc := crc32.ChecksumIEEE(crcData) + binary.BigEndian.PutUint32(batch[crcPos:crcPos+4], crc) + return batch }