
fmt

pull/7231/head
chrislu 4 months ago
parent
commit
baed1e156a
  10  test/kafka/metadata_comparison_test.go
   6  test/kafka/metadata_debug_test.go
  48  weed/mq/kafka/protocol/joingroup.go
 220  weed/mq/kafka/protocol/offset_management.go
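
The hunks below are whitespace and ordering changes only, consistent with the commit message "fmt"; most of the 220-line delta in offset_management.go is gofmt column alignment. As a hypothetical before/after (not taken from this commit), this is the kind of rewrite gofmt applies:

package main

import "fmt"

// Before gofmt (uneven columns):
//
//	type offsetEntry struct {
//		Index int32 // partition index
//		Metadata string // optional metadata
//	}

// After gofmt, field names, types, and trailing comments align:
type offsetEntry struct {
    Index    int32  // partition index
    Metadata string // optional metadata
}

func main() {
    fmt.Println(offsetEntry{Index: 0, Metadata: "example"})
}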

10
test/kafka/metadata_comparison_test.go

@@ -22,7 +22,7 @@ func TestMetadataResponseComparison(t *testing.T) {
 	host, port := gatewayServer.GetListenerAddr()
 	addr := fmt.Sprintf("%s:%d", host, port)

 	// Add the same topic for both tests
 	topic := "comparison-topic"
 	gatewayServer.GetHandler().AddTopicForTesting(topic, 1)
@@ -30,17 +30,17 @@ func TestMetadataResponseComparison(t *testing.T) {
 	t.Logf("=== COMPARISON TEST ===")
 	t.Logf("Gateway: %s", addr)
 	t.Logf("Topic: %s", topic)

 	// The key insight: Both Sarama and kafka-go should get the SAME metadata response
 	// But Sarama works and kafka-go doesn't - this suggests kafka-go has stricter validation
 	// Let's examine what our current Metadata v4 response looks like
 	t.Logf("Run Sarama test and kafka-go test separately to compare logs")
 	t.Logf("Look for differences in:")
 	t.Logf("1. Response byte counts")
-	t.Logf("2. Broker ID consistency")
+	t.Logf("2. Broker ID consistency")
 	t.Logf("3. Partition leader/ISR values")
 	t.Logf("4. Error codes")

 	// This test is just for documentation - the real comparison happens in logs
 }
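
For readers reproducing the comparison described above, a minimal sketch of the kafka-go side; the address and topic are placeholders (in the test they come from gatewayServer.GetListenerAddr() and AddTopicForTesting), while kafka.Dial and Conn.ReadPartitions are standard kafka-go calls:

package main

import (
    "fmt"
    "log"

    "github.com/segmentio/kafka-go"
)

func main() {
    // Placeholder address; the test obtains this from the gateway listener.
    conn, err := kafka.Dial("tcp", "127.0.0.1:9092")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // ReadPartitions issues the Metadata request the test exercises;
    // kafka-go surfaces metadata it considers invalid as an error here,
    // which is exactly the stricter validation the comments suspect.
    partitions, err := conn.ReadPartitions("comparison-topic")
    if err != nil {
        log.Fatal(err)
    }
    for _, p := range partitions {
        fmt.Printf("partition %d: leader=%d replicas=%v isr=%v\n",
            p.ID, p.Leader.ID, p.Replicas, p.Isr)
    }
}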

6
test/kafka/metadata_debug_test.go

@@ -5,8 +5,8 @@ import (
 	"testing"
 	"time"

-	"github.com/segmentio/kafka-go"
 	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
+	"github.com/segmentio/kafka-go"
 )

 func TestMetadataV6Debug(t *testing.T) {
@@ -41,7 +41,7 @@ func TestMetadataV6Debug(t *testing.T) {
 	t.Logf("Successfully read %d partitions for topic %s", len(partitions), topic)

 	for _, p := range partitions {
-		t.Logf("Partition %d: Leader=%d, Replicas=%v, ISR=%v",
+		t.Logf("Partition %d: Leader=%d, Replicas=%v, ISR=%v",
 			p.ID, p.Leader.ID, p.Replicas, p.Isr)
 	}
-}
+}
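
The substantive change above is import order rather than whitespace: gofmt keeps each import group sorted by path, and "seaweedfs" sorts before "segmentio" ('a' < 'g' at the third letter). A tiny self-contained illustration:

package main

import (
    "fmt"
    "sort"
)

func main() {
    // gofmt sorts each import group lexicographically by import path.
    paths := []string{
        "github.com/segmentio/kafka-go",
        "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway",
    }
    sort.Strings(paths)
    fmt.Println(paths) // seaweedfs/... now precedes segmentio/...
}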

48
weed/mq/kafka/protocol/joingroup.go

@@ -322,25 +322,25 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error) {
 	}
 	protocolTypeLength := binary.BigEndian.Uint16(data[offset : offset+2])
 	offset += 2

 	if len(data) < offset+int(protocolTypeLength) {
 		return nil, fmt.Errorf("JoinGroup request protocol type too short")
 	}
 	protocolType := string(data[offset : offset+int(protocolTypeLength)])
 	offset += int(protocolTypeLength)

 	// Parse Group Protocols array
 	if len(data) < offset+4 {
 		return nil, fmt.Errorf("JoinGroup request missing group protocols")
 	}
 	protocolsCount := binary.BigEndian.Uint32(data[offset : offset+4])
 	offset += 4

 	fmt.Printf("DEBUG: JoinGroup - GroupID: %s, SessionTimeout: %d, RebalanceTimeout: %d, MemberID: %s, ProtocolType: %s, ProtocolsCount: %d\n",
 		groupID, sessionTimeout, rebalanceTimeout, memberID, protocolType, protocolsCount)

 	protocols := make([]GroupProtocol, 0, protocolsCount)
 	for i := uint32(0); i < protocolsCount && offset < len(data); i++ {
 		// Parse protocol name
 		if len(data) < offset+2 {
@@ -348,48 +348,48 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error) {
 		}
 		protocolNameLength := binary.BigEndian.Uint16(data[offset : offset+2])
 		offset += 2

 		if len(data) < offset+int(protocolNameLength) {
 			break
 		}
 		protocolName := string(data[offset : offset+int(protocolNameLength)])
 		offset += int(protocolNameLength)

 		// Parse protocol metadata
 		if len(data) < offset+4 {
 			break
 		}
 		metadataLength := binary.BigEndian.Uint32(data[offset : offset+4])
 		offset += 4

 		var metadata []byte
 		if metadataLength > 0 && len(data) >= offset+int(metadataLength) {
 			metadata = make([]byte, metadataLength)
 			copy(metadata, data[offset:offset+int(metadataLength)])
 			offset += int(metadataLength)
 		}

 		protocols = append(protocols, GroupProtocol{
 			Name:     protocolName,
 			Metadata: metadata,
 		})
 		fmt.Printf("DEBUG: JoinGroup - Protocol: %s, MetadataLength: %d\n", protocolName, metadataLength)
 	}

 	// Parse Group Instance ID (nullable string) - for static membership (Kafka 2.3+)
 	var groupInstanceID string
 	if len(data) >= offset+2 {
 		instanceIDLength := int16(binary.BigEndian.Uint16(data[offset : offset+2]))
 		offset += 2

 		if instanceIDLength == -1 {
 			groupInstanceID = "" // null string
 		} else if instanceIDLength >= 0 && len(data) >= offset+int(instanceIDLength) {
 			groupInstanceID = string(data[offset : offset+int(instanceIDLength)])
 			offset += int(instanceIDLength)
 		}

 		if groupInstanceID != "" {
 			fmt.Printf("DEBUG: JoinGroup - GroupInstanceID: %s\n", groupInstanceID)
 		}
@@ -572,7 +572,7 @@ func (h *Handler) extractSubscriptionFromProtocols(protocols []GroupProtocol) []string {
 	// - Version (2 bytes)
 	// - Topics array (4 bytes count + topic names)
 	// - User data (4 bytes length + data)

 	for _, protocol := range protocols {
 		if protocol.Name == "range" || protocol.Name == "roundrobin" || protocol.Name == "sticky" {
 			topics := h.parseConsumerProtocolMetadata(protocol.Metadata)
@@ -582,7 +582,7 @@ func (h *Handler) extractSubscriptionFromProtocols(protocols []GroupProtocol) []string {
 			}
 		}
 	}

 	// Fallback to default if parsing fails
 	fmt.Printf("DEBUG: Failed to extract subscription, using fallback topic\n")
 	return []string{"test-topic"}
@@ -592,24 +592,24 @@ func (h *Handler) parseConsumerProtocolMetadata(metadata []byte) []string {
 	if len(metadata) < 6 { // version(2) + topics_count(4)
 		return nil
 	}

 	offset := 0

 	// Parse version (2 bytes)
 	version := binary.BigEndian.Uint16(metadata[offset : offset+2])
 	offset += 2

 	// Parse topics array
 	if len(metadata) < offset+4 {
 		return nil
 	}
 	topicsCount := binary.BigEndian.Uint32(metadata[offset : offset+4])
 	offset += 4

 	fmt.Printf("DEBUG: Consumer protocol metadata - Version: %d, TopicsCount: %d\n", version, topicsCount)

 	topics := make([]string, 0, topicsCount)
 	for i := uint32(0); i < topicsCount && offset < len(metadata); i++ {
 		// Parse topic name
 		if len(metadata) < offset+2 {
@@ -617,17 +617,17 @@ func (h *Handler) parseConsumerProtocolMetadata(metadata []byte) []string {
 		}
 		topicNameLength := binary.BigEndian.Uint16(metadata[offset : offset+2])
 		offset += 2

 		if len(metadata) < offset+int(topicNameLength) {
 			break
 		}
 		topicName := string(metadata[offset : offset+int(topicNameLength)])
 		offset += int(topicNameLength)

 		topics = append(topics, topicName)
 		fmt.Printf("DEBUG: Consumer subscribed to topic: %s\n", topicName)
 	}

 	return topics
 }
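
The parser above expects the classic consumer-protocol subscription layout: version (int16), topic count (int32), length-prefixed topic names, then user data. A minimal sketch of the matching encoder, handy for building test fixtures; the package and function name are hypothetical, and Go 1.19's binary append helpers are assumed:

package kafkatest

import "encoding/binary"

// encodeSubscription builds consumer-protocol subscription metadata in the
// layout parseConsumerProtocolMetadata expects: version (2 bytes), topic
// count (4 bytes), length-prefixed topic names, then user data
// (4-byte length prefix + bytes).
func encodeSubscription(version uint16, topics []string, userData []byte) []byte {
    buf := make([]byte, 0, 64)
    buf = binary.BigEndian.AppendUint16(buf, version)
    buf = binary.BigEndian.AppendUint32(buf, uint32(len(topics)))
    for _, t := range topics {
        buf = binary.BigEndian.AppendUint16(buf, uint16(len(t)))
        buf = append(buf, t...)
    }
    buf = binary.BigEndian.AppendUint32(buf, uint32(len(userData)))
    buf = append(buf, userData...)
    return buf
}

Feeding encodeSubscription(0, []string{"test-topic"}, nil) through parseConsumerProtocolMetadata should return []string{"test-topic"}, which is a cheap way to pin the parser's behavior in a unit test.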

220
weed/mq/kafka/protocol/offset_management.go

@@ -4,7 +4,7 @@ import (
 	"encoding/binary"
 	"fmt"
 	"time"

 	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
 )
@@ -13,12 +13,12 @@ import (
 // OffsetCommitRequest represents an OffsetCommit request from a Kafka client
 type OffsetCommitRequest struct {
-	GroupID string
-	GenerationID int32
-	MemberID string
-	GroupInstanceID string // Optional static membership ID
-	RetentionTime int64 // Offset retention time (-1 for broker default)
-	Topics []OffsetCommitTopic
+	GroupID         string
+	GenerationID    int32
+	MemberID        string
+	GroupInstanceID string // Optional static membership ID
+	RetentionTime   int64  // Offset retention time (-1 for broker default)
+	Topics          []OffsetCommitTopic
 }

 // OffsetCommitTopic represents topic-level offset commit data
@@ -29,10 +29,10 @@ type OffsetCommitTopic struct {
 // OffsetCommitPartition represents partition-level offset commit data
 type OffsetCommitPartition struct {
-	Index int32 // Partition index
-	Offset int64 // Offset to commit
-	LeaderEpoch int32 // Leader epoch (-1 if not available)
-	Metadata string // Optional metadata
+	Index       int32  // Partition index
+	Offset      int64  // Offset to commit
+	LeaderEpoch int32  // Leader epoch (-1 if not available)
+	Metadata    string // Optional metadata
 }

 // OffsetCommitResponse represents an OffsetCommit response to a Kafka client
@@ -61,7 +61,7 @@ type OffsetFetchRequest struct {
 	GroupID         string
 	GroupInstanceID string // Optional static membership ID
 	Topics          []OffsetFetchTopic
-	RequireStable bool // Only fetch stable offsets
+	RequireStable   bool // Only fetch stable offsets
 }

 // OffsetFetchTopic represents topic-level offset fetch data
@@ -94,10 +94,10 @@ type OffsetFetchPartitionResponse struct {
 // Error codes specific to offset management
 const (
-	ErrorCodeInvalidCommitOffsetSize int16 = 28
-	ErrorCodeOffsetMetadataTooLarge int16 = 12
-	ErrorCodeOffsetLoadInProgress int16 = 14
-	ErrorCodeNotCoordinatorForGroup int16 = 16
+	ErrorCodeInvalidCommitOffsetSize  int16 = 28
+	ErrorCodeOffsetMetadataTooLarge   int16 = 12
+	ErrorCodeOffsetLoadInProgress     int16 = 14
+	ErrorCodeNotCoordinatorForGroup   int16 = 16
 	ErrorCodeGroupAuthorizationFailed int16 = 30
 )
@@ -107,51 +107,51 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, requestBody []byte) ([]byte, error) {
 	if err != nil {
 		return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeInvalidCommitOffsetSize), nil
 	}

 	// Validate request
 	if request.GroupID == "" || request.MemberID == "" {
 		return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
 	}

 	// Get consumer group
 	group := h.groupCoordinator.GetGroup(request.GroupID)
 	if group == nil {
 		return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
 	}

 	group.Mu.Lock()
 	defer group.Mu.Unlock()

 	// Update group's last activity
 	group.LastActivity = time.Now()

 	// Validate member exists and is in stable state
 	member, exists := group.Members[request.MemberID]
 	if !exists {
 		return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeUnknownMemberID), nil
 	}
 	if member.State != consumer.MemberStateStable {
 		return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeRebalanceInProgress), nil
 	}

 	// Validate generation
 	if request.GenerationID != group.Generation {
 		return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeIllegalGeneration), nil
 	}

 	// Process offset commits
 	response := OffsetCommitResponse{
 		CorrelationID: correlationID,
 		Topics:        make([]OffsetCommitTopicResponse, 0, len(request.Topics)),
 	}

 	for _, topic := range request.Topics {
 		topicResponse := OffsetCommitTopicResponse{
 			Name:       topic.Name,
 			Partitions: make([]OffsetCommitPartitionResponse, 0, len(topic.Partitions)),
 		}

 		for _, partition := range topic.Partitions {
 			// Validate partition assignment - consumer should only commit offsets for assigned partitions
 			assigned := false
@@ -161,7 +161,7 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, requestBody []byte) ([]byte, error) {
 					break
 				}
 			}

 			var errorCode int16 = ErrorCodeNone
 			if !assigned && group.State == consumer.GroupStateStable {
 				// Allow commits during rebalancing, but restrict during stable state
@@ -173,17 +173,17 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, requestBody []byte) ([]byte, error) {
 					errorCode = ErrorCodeOffsetMetadataTooLarge // Generic error
 				}
 			}

 			partitionResponse := OffsetCommitPartitionResponse{
 				Index:     partition.Index,
 				ErrorCode: errorCode,
 			}
 			topicResponse.Partitions = append(topicResponse.Partitions, partitionResponse)
 		}

 		response.Topics = append(response.Topics, topicResponse)
 	}

 	return h.buildOffsetCommitResponse(response), nil
 }
@@ -193,34 +193,34 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, requestBody []byte) ([]byte, error) {
 	if err != nil {
 		return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
 	}

 	// Validate request
 	if request.GroupID == "" {
 		return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
 	}

 	// Get consumer group
 	group := h.groupCoordinator.GetGroup(request.GroupID)
 	if group == nil {
 		return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
 	}

 	group.Mu.RLock()
 	defer group.Mu.RUnlock()

 	// Build response
 	response := OffsetFetchResponse{
 		CorrelationID: correlationID,
 		Topics:        make([]OffsetFetchTopicResponse, 0, len(request.Topics)),
 		ErrorCode:     ErrorCodeNone,
 	}

 	for _, topic := range request.Topics {
 		topicResponse := OffsetFetchTopicResponse{
 			Name:       topic.Name,
 			Partitions: make([]OffsetFetchPartitionResponse, 0),
 		}

 		// If no partitions specified, fetch all partitions for the topic
 		partitionsToFetch := topic.Partitions
 		if len(partitionsToFetch) == 0 {
@@ -231,16 +231,16 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, requestBody []byte) ([]byte, error) {
 				}
 			}
 		}

 		// Fetch offsets for requested partitions
 		for _, partition := range partitionsToFetch {
 			offset, metadata, err := h.fetchOffset(group, topic.Name, partition)

 			var errorCode int16 = ErrorCodeNone
 			if err != nil {
 				errorCode = ErrorCodeOffsetLoadInProgress // Generic error
 			}

 			partitionResponse := OffsetFetchPartitionResponse{
 				Index:  partition,
 				Offset: offset,
@@ -250,10 +250,10 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, requestBody []byte) ([]byte, error) {
 			}
 			topicResponse.Partitions = append(topicResponse.Partitions, partitionResponse)
 		}

 		response.Topics = append(response.Topics, topicResponse)
 	}

 	return h.buildOffsetFetchResponse(response), nil
 }
@@ -261,9 +261,9 @@ func (h *Handler) parseOffsetCommitRequest(data []byte) (*OffsetCommitRequest, error) {
 	if len(data) < 8 {
 		return nil, fmt.Errorf("request too short")
 	}

 	offset := 0

 	// GroupID (string)
 	groupIDLength := int(binary.BigEndian.Uint16(data[offset:]))
 	offset += 2
@@ -272,14 +272,14 @@ func (h *Handler) parseOffsetCommitRequest(data []byte) (*OffsetCommitRequest, error) {
 	}
 	groupID := string(data[offset : offset+groupIDLength])
 	offset += groupIDLength

 	// Generation ID (4 bytes)
 	if offset+4 > len(data) {
 		return nil, fmt.Errorf("missing generation ID")
 	}
 	generationID := int32(binary.BigEndian.Uint32(data[offset:]))
 	offset += 4

 	// MemberID (string)
 	if offset+2 > len(data) {
 		return nil, fmt.Errorf("missing member ID length")
@@ -291,26 +291,26 @@ func (h *Handler) parseOffsetCommitRequest(data []byte) (*OffsetCommitRequest, error) {
 	}
 	memberID := string(data[offset : offset+memberIDLength])
 	offset += memberIDLength

 	// Parse RetentionTime (8 bytes, -1 for broker default)
 	if len(data) < offset+8 {
 		return nil, fmt.Errorf("OffsetCommit request missing retention time")
 	}
 	retentionTime := int64(binary.BigEndian.Uint64(data[offset : offset+8]))
 	offset += 8

 	// Parse Topics array
 	if len(data) < offset+4 {
 		return nil, fmt.Errorf("OffsetCommit request missing topics array")
 	}
 	topicsCount := binary.BigEndian.Uint32(data[offset : offset+4])
 	offset += 4

-	fmt.Printf("DEBUG: OffsetCommit - GroupID: %s, GenerationID: %d, MemberID: %s, RetentionTime: %d, TopicsCount: %d\n",
+	fmt.Printf("DEBUG: OffsetCommit - GroupID: %s, GenerationID: %d, MemberID: %s, RetentionTime: %d, TopicsCount: %d\n",
 		groupID, generationID, memberID, retentionTime, topicsCount)

 	topics := make([]OffsetCommitTopic, 0, topicsCount)
 	for i := uint32(0); i < topicsCount && offset < len(data); i++ {
 		// Parse topic name
 		if len(data) < offset+2 {
@@ -318,22 +318,22 @@ func (h *Handler) parseOffsetCommitRequest(data []byte) (*OffsetCommitRequest, error) {
 		}
 		topicNameLength := binary.BigEndian.Uint16(data[offset : offset+2])
 		offset += 2

 		if len(data) < offset+int(topicNameLength) {
 			break
 		}
 		topicName := string(data[offset : offset+int(topicNameLength)])
 		offset += int(topicNameLength)

 		// Parse partitions array
 		if len(data) < offset+4 {
 			break
 		}
 		partitionsCount := binary.BigEndian.Uint32(data[offset : offset+4])
 		offset += 4

 		partitions := make([]OffsetCommitPartition, 0, partitionsCount)
 		for j := uint32(0); j < partitionsCount && offset < len(data); j++ {
 			// Parse partition index (4 bytes)
 			if len(data) < offset+4 {
@@ -341,28 +341,28 @@ func (h *Handler) parseOffsetCommitRequest(data []byte) (*OffsetCommitRequest, error) {
 			}
 			partitionIndex := int32(binary.BigEndian.Uint32(data[offset : offset+4]))
 			offset += 4

 			// Parse committed offset (8 bytes)
 			if len(data) < offset+8 {
 				break
 			}
 			committedOffset := int64(binary.BigEndian.Uint64(data[offset : offset+8]))
 			offset += 8

 			// Parse leader epoch (4 bytes)
 			if len(data) < offset+4 {
 				break
 			}
 			leaderEpoch := int32(binary.BigEndian.Uint32(data[offset : offset+4]))
 			offset += 4

 			// Parse metadata (nullable string)
 			if len(data) < offset+2 {
 				break
 			}
 			metadataLength := int16(binary.BigEndian.Uint16(data[offset : offset+2]))
 			offset += 2

 			var metadata string
 			if metadataLength == -1 {
 				metadata = "" // null string
@@ -370,24 +370,24 @@ func (h *Handler) parseOffsetCommitRequest(data []byte) (*OffsetCommitRequest, error) {
 				metadata = string(data[offset : offset+int(metadataLength)])
 				offset += int(metadataLength)
 			}

 			partitions = append(partitions, OffsetCommitPartition{
 				Index:       partitionIndex,
 				Offset:      committedOffset,
 				LeaderEpoch: leaderEpoch,
 				Metadata:    metadata,
 			})
 			fmt.Printf("DEBUG: OffsetCommit - Topic: %s, Partition: %d, Offset: %d, LeaderEpoch: %d, Metadata: %s\n",
 				topicName, partitionIndex, committedOffset, leaderEpoch, metadata)
 		}

 		topics = append(topics, OffsetCommitTopic{
 			Name:       topicName,
 			Partitions: partitions,
 		})
 	}

 	return &OffsetCommitRequest{
 		GroupID:      groupID,
 		GenerationID: generationID,
@@ -401,9 +401,9 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, error) {
 	if len(data) < 4 {
 		return nil, fmt.Errorf("request too short")
 	}

 	offset := 0

 	// GroupID (string)
 	groupIDLength := int(binary.BigEndian.Uint16(data[offset:]))
 	offset += 2
@@ -412,18 +412,18 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, error) {
 	}
 	groupID := string(data[offset : offset+groupIDLength])
 	offset += groupIDLength

 	// Parse Topics array
 	if len(data) < offset+4 {
 		return nil, fmt.Errorf("OffsetFetch request missing topics array")
 	}
 	topicsCount := binary.BigEndian.Uint32(data[offset : offset+4])
 	offset += 4

 	fmt.Printf("DEBUG: OffsetFetch - GroupID: %s, TopicsCount: %d\n", groupID, topicsCount)

 	topics := make([]OffsetFetchTopic, 0, topicsCount)
 	for i := uint32(0); i < topicsCount && offset < len(data); i++ {
 		// Parse topic name
 		if len(data) < offset+2 {
@@ -431,22 +431,22 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, error) {
 		}
 		topicNameLength := binary.BigEndian.Uint16(data[offset : offset+2])
 		offset += 2

 		if len(data) < offset+int(topicNameLength) {
 			break
 		}
 		topicName := string(data[offset : offset+int(topicNameLength)])
 		offset += int(topicNameLength)

 		// Parse partitions array
 		if len(data) < offset+4 {
 			break
 		}
 		partitionsCount := binary.BigEndian.Uint32(data[offset : offset+4])
 		offset += 4

 		partitions := make([]int32, 0, partitionsCount)

 		// If partitionsCount is 0, it means "fetch all partitions"
 		if partitionsCount == 0 {
 			fmt.Printf("DEBUG: OffsetFetch - Topic: %s, Partitions: ALL\n", topicName)
@@ -459,18 +459,18 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, error) {
 				}
 				partitionIndex := int32(binary.BigEndian.Uint32(data[offset : offset+4]))
 				offset += 4

 				partitions = append(partitions, partitionIndex)
 				fmt.Printf("DEBUG: OffsetFetch - Topic: %s, Partition: %d\n", topicName, partitionIndex)
 			}
 		}

 		topics = append(topics, OffsetFetchTopic{
 			Name:       topicName,
 			Partitions: partitions,
 		})
 	}

 	// Parse RequireStable flag (1 byte) - for transactional consistency
 	var requireStable bool
 	if len(data) >= offset+1 {
@@ -478,7 +478,7 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, error) {
 		offset += 1
 		fmt.Printf("DEBUG: OffsetFetch - RequireStable: %v\n", requireStable)
 	}

 	return &OffsetFetchRequest{
 		GroupID: groupID,
 		Topics:  topics,
@@ -491,18 +491,18 @@ func (h *Handler) commitOffset(group *consumer.ConsumerGroup, topic string, partition int32, offset int64, metadata string) error {
 	if group.OffsetCommits == nil {
 		group.OffsetCommits = make(map[string]map[int32]consumer.OffsetCommit)
 	}

 	if group.OffsetCommits[topic] == nil {
 		group.OffsetCommits[topic] = make(map[int32]consumer.OffsetCommit)
 	}

 	// Store the offset commit
 	group.OffsetCommits[topic][partition] = consumer.OffsetCommit{
 		Offset:    offset,
 		Metadata:  metadata,
 		Timestamp: time.Now(),
 	}

 	return nil
 }
@@ -511,17 +511,17 @@ func (h *Handler) fetchOffset(group *consumer.ConsumerGroup, topic string, partition int32) (int64, string, error) {
 	if group.OffsetCommits == nil {
 		return -1, "", nil // No committed offset
 	}

 	topicOffsets, exists := group.OffsetCommits[topic]
 	if !exists {
 		return -1, "", nil // No committed offset for topic
 	}

 	offsetCommit, exists := topicOffsets[partition]
 	if !exists {
 		return -1, "", nil // No committed offset for partition
 	}

 	return offsetCommit.Offset, offsetCommit.Metadata, nil
 }
@@ -530,51 +530,51 @@ func (h *Handler) buildOffsetCommitResponse(response OffsetCommitResponse) []byte {
 	for _, topic := range response.Topics {
 		estimatedSize += len(topic.Name) + 8 + len(topic.Partitions)*8
 	}

 	result := make([]byte, 0, estimatedSize)

 	// Correlation ID (4 bytes)
 	correlationIDBytes := make([]byte, 4)
 	binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID)
 	result = append(result, correlationIDBytes...)

 	// Topics array length (4 bytes)
 	topicsLengthBytes := make([]byte, 4)
 	binary.BigEndian.PutUint32(topicsLengthBytes, uint32(len(response.Topics)))
 	result = append(result, topicsLengthBytes...)

 	// Topics
 	for _, topic := range response.Topics {
 		// Topic name length (2 bytes)
 		nameLength := make([]byte, 2)
 		binary.BigEndian.PutUint16(nameLength, uint16(len(topic.Name)))
 		result = append(result, nameLength...)

 		// Topic name
 		result = append(result, []byte(topic.Name)...)

 		// Partitions array length (4 bytes)
 		partitionsLength := make([]byte, 4)
 		binary.BigEndian.PutUint32(partitionsLength, uint32(len(topic.Partitions)))
 		result = append(result, partitionsLength...)

 		// Partitions
 		for _, partition := range topic.Partitions {
 			// Partition index (4 bytes)
 			indexBytes := make([]byte, 4)
 			binary.BigEndian.PutUint32(indexBytes, uint32(partition.Index))
 			result = append(result, indexBytes...)

 			// Error code (2 bytes)
 			errorBytes := make([]byte, 2)
 			binary.BigEndian.PutUint16(errorBytes, uint16(partition.ErrorCode))
 			result = append(result, errorBytes...)
 		}
 	}

 	// Throttle time (4 bytes, 0 = no throttling)
 	result = append(result, 0, 0, 0, 0)

 	return result
 }
@@ -586,74 +586,74 @@ func (h *Handler) buildOffsetFetchResponse(response OffsetFetchResponse) []byte {
 			estimatedSize += len(partition.Metadata)
 		}
 	}

 	result := make([]byte, 0, estimatedSize)

 	// Correlation ID (4 bytes)
 	correlationIDBytes := make([]byte, 4)
 	binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID)
 	result = append(result, correlationIDBytes...)

 	// Topics array length (4 bytes)
 	topicsLengthBytes := make([]byte, 4)
 	binary.BigEndian.PutUint32(topicsLengthBytes, uint32(len(response.Topics)))
 	result = append(result, topicsLengthBytes...)

 	// Topics
 	for _, topic := range response.Topics {
 		// Topic name length (2 bytes)
 		nameLength := make([]byte, 2)
 		binary.BigEndian.PutUint16(nameLength, uint16(len(topic.Name)))
 		result = append(result, nameLength...)

 		// Topic name
 		result = append(result, []byte(topic.Name)...)

 		// Partitions array length (4 bytes)
 		partitionsLength := make([]byte, 4)
 		binary.BigEndian.PutUint32(partitionsLength, uint32(len(topic.Partitions)))
 		result = append(result, partitionsLength...)

 		// Partitions
 		for _, partition := range topic.Partitions {
 			// Partition index (4 bytes)
 			indexBytes := make([]byte, 4)
 			binary.BigEndian.PutUint32(indexBytes, uint32(partition.Index))
 			result = append(result, indexBytes...)

 			// Committed offset (8 bytes)
 			offsetBytes := make([]byte, 8)
 			binary.BigEndian.PutUint64(offsetBytes, uint64(partition.Offset))
 			result = append(result, offsetBytes...)

 			// Leader epoch (4 bytes)
 			epochBytes := make([]byte, 4)
 			binary.BigEndian.PutUint32(epochBytes, uint32(partition.LeaderEpoch))
 			result = append(result, epochBytes...)

 			// Metadata length (2 bytes)
 			metadataLength := make([]byte, 2)
 			binary.BigEndian.PutUint16(metadataLength, uint16(len(partition.Metadata)))
 			result = append(result, metadataLength...)

 			// Metadata
 			result = append(result, []byte(partition.Metadata)...)

 			// Error code (2 bytes)
 			errorBytes := make([]byte, 2)
 			binary.BigEndian.PutUint16(errorBytes, uint16(partition.ErrorCode))
 			result = append(result, errorBytes...)
 		}
 	}

 	// Group-level error code (2 bytes)
 	groupErrorBytes := make([]byte, 2)
 	binary.BigEndian.PutUint16(groupErrorBytes, uint16(response.ErrorCode))
 	result = append(result, groupErrorBytes...)

 	// Throttle time (4 bytes, 0 = no throttling)
 	result = append(result, 0, 0, 0, 0)

 	return result
 }
@@ -669,7 +669,7 @@ func (h *Handler) buildOffsetCommitErrorResponse(correlationID uint32, errorCode int16) []byte {
 			},
 		},
 	}

 	return h.buildOffsetCommitResponse(response)
 }
@@ -679,6 +679,6 @@ func (h *Handler) buildOffsetFetchErrorResponse(correlationID uint32, errorCode int16) []byte {
 		Topics:    []OffsetFetchTopicResponse{},
 		ErrorCode: errorCode,
 	}

 	return h.buildOffsetFetchResponse(response)
 }
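
For orientation when reading buildOffsetFetchResponse above: each partition entry on the wire is index (4 bytes), committed offset (8), leader epoch (4), metadata (2-byte length + bytes), error code (2). A sketch of a matching decoder one might write as a test helper (hypothetical, not part of this commit):

package kafkatest

import (
    "encoding/binary"
    "fmt"
)

// decodePartition reads one OffsetFetch partition entry in the layout
// buildOffsetFetchResponse writes: index(4) offset(8) epoch(4)
// metadataLen(2) metadata errorCode(2). It returns the bytes consumed
// so callers can walk a sequence of entries.
func decodePartition(b []byte) (index int32, offset int64, metadata string, errCode int16, n int, err error) {
    if len(b) < 20 { // 18 fixed header bytes + 2-byte error code, empty metadata
        return 0, 0, "", 0, 0, fmt.Errorf("partition entry too short: %d bytes", len(b))
    }
    index = int32(binary.BigEndian.Uint32(b[0:4]))
    offset = int64(binary.BigEndian.Uint64(b[4:12]))
    // b[12:16] holds the leader epoch; skipped here for brevity.
    mLen := int(binary.BigEndian.Uint16(b[16:18]))
    if len(b) < 20+mLen {
        return 0, 0, "", 0, 0, fmt.Errorf("metadata truncated")
    }
    metadata = string(b[18 : 18+mLen])
    errCode = int16(binary.BigEndian.Uint16(b[18+mLen : 20+mLen]))
    return index, offset, metadata, errCode, 20 + mLen, nil
}

Round-tripping buildOffsetFetchResponse output through a decoder like this is a cheap way to catch accidental wire-format regressions in future refactors.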