From baed1e156a51a51673ef9e588ecb4b29793983c6 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 12 Sep 2025 12:51:26 -0700 Subject: [PATCH] fmt --- test/kafka/metadata_comparison_test.go | 10 +- test/kafka/metadata_debug_test.go | 6 +- weed/mq/kafka/protocol/joingroup.go | 48 ++--- weed/mq/kafka/protocol/offset_management.go | 220 ++++++++++---------- 4 files changed, 142 insertions(+), 142 deletions(-) diff --git a/test/kafka/metadata_comparison_test.go b/test/kafka/metadata_comparison_test.go index 12fafcc97..a9c27919b 100644 --- a/test/kafka/metadata_comparison_test.go +++ b/test/kafka/metadata_comparison_test.go @@ -22,7 +22,7 @@ func TestMetadataResponseComparison(t *testing.T) { host, port := gatewayServer.GetListenerAddr() addr := fmt.Sprintf("%s:%d", host, port) - + // Add the same topic for both tests topic := "comparison-topic" gatewayServer.GetHandler().AddTopicForTesting(topic, 1) @@ -30,17 +30,17 @@ func TestMetadataResponseComparison(t *testing.T) { t.Logf("=== COMPARISON TEST ===") t.Logf("Gateway: %s", addr) t.Logf("Topic: %s", topic) - + // The key insight: Both Sarama and kafka-go should get the SAME metadata response // But Sarama works and kafka-go doesn't - this suggests kafka-go has stricter validation - + // Let's examine what our current Metadata v4 response looks like t.Logf("Run Sarama test and kafka-go test separately to compare logs") t.Logf("Look for differences in:") t.Logf("1. Response byte counts") - t.Logf("2. Broker ID consistency") + t.Logf("2. Broker ID consistency") t.Logf("3. Partition leader/ISR values") t.Logf("4. Error codes") - + // This test is just for documentation - the real comparison happens in logs } diff --git a/test/kafka/metadata_debug_test.go b/test/kafka/metadata_debug_test.go index 490607d5b..06e7550f8 100644 --- a/test/kafka/metadata_debug_test.go +++ b/test/kafka/metadata_debug_test.go @@ -5,8 +5,8 @@ import ( "testing" "time" - "github.com/segmentio/kafka-go" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway" + "github.com/segmentio/kafka-go" ) func TestMetadataV6Debug(t *testing.T) { @@ -41,7 +41,7 @@ func TestMetadataV6Debug(t *testing.T) { t.Logf("Successfully read %d partitions for topic %s", len(partitions), topic) for _, p := range partitions { - t.Logf("Partition %d: Leader=%d, Replicas=%v, ISR=%v", + t.Logf("Partition %d: Leader=%d, Replicas=%v, ISR=%v", p.ID, p.Leader.ID, p.Replicas, p.Isr) } -} \ No newline at end of file +} diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index 835099ffa..6c2b71798 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -322,25 +322,25 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error) } protocolTypeLength := binary.BigEndian.Uint16(data[offset : offset+2]) offset += 2 - + if len(data) < offset+int(protocolTypeLength) { return nil, fmt.Errorf("JoinGroup request protocol type too short") } protocolType := string(data[offset : offset+int(protocolTypeLength)]) offset += int(protocolTypeLength) - + // Parse Group Protocols array if len(data) < offset+4 { return nil, fmt.Errorf("JoinGroup request missing group protocols") } protocolsCount := binary.BigEndian.Uint32(data[offset : offset+4]) offset += 4 - + fmt.Printf("DEBUG: JoinGroup - GroupID: %s, SessionTimeout: %d, RebalanceTimeout: %d, MemberID: %s, ProtocolType: %s, ProtocolsCount: %d\n", groupID, sessionTimeout, rebalanceTimeout, memberID, protocolType, protocolsCount) - + protocols := make([]GroupProtocol, 0, protocolsCount) - + for i := uint32(0); i < protocolsCount && offset < len(data); i++ { // Parse protocol name if len(data) < offset+2 { @@ -348,48 +348,48 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error) } protocolNameLength := binary.BigEndian.Uint16(data[offset : offset+2]) offset += 2 - + if len(data) < offset+int(protocolNameLength) { break } protocolName := string(data[offset : offset+int(protocolNameLength)]) offset += int(protocolNameLength) - + // Parse protocol metadata if len(data) < offset+4 { break } metadataLength := binary.BigEndian.Uint32(data[offset : offset+4]) offset += 4 - + var metadata []byte if metadataLength > 0 && len(data) >= offset+int(metadataLength) { metadata = make([]byte, metadataLength) copy(metadata, data[offset:offset+int(metadataLength)]) offset += int(metadataLength) } - + protocols = append(protocols, GroupProtocol{ Name: protocolName, Metadata: metadata, }) - + fmt.Printf("DEBUG: JoinGroup - Protocol: %s, MetadataLength: %d\n", protocolName, metadataLength) } - + // Parse Group Instance ID (nullable string) - for static membership (Kafka 2.3+) var groupInstanceID string if len(data) >= offset+2 { instanceIDLength := int16(binary.BigEndian.Uint16(data[offset : offset+2])) offset += 2 - + if instanceIDLength == -1 { groupInstanceID = "" // null string } else if instanceIDLength >= 0 && len(data) >= offset+int(instanceIDLength) { groupInstanceID = string(data[offset : offset+int(instanceIDLength)]) offset += int(instanceIDLength) } - + if groupInstanceID != "" { fmt.Printf("DEBUG: JoinGroup - GroupInstanceID: %s\n", groupInstanceID) } @@ -572,7 +572,7 @@ func (h *Handler) extractSubscriptionFromProtocols(protocols []GroupProtocol) [] // - Version (2 bytes) // - Topics array (4 bytes count + topic names) // - User data (4 bytes length + data) - + for _, protocol := range protocols { if protocol.Name == "range" || protocol.Name == "roundrobin" || protocol.Name == "sticky" { topics := h.parseConsumerProtocolMetadata(protocol.Metadata) @@ -582,7 +582,7 @@ func (h *Handler) extractSubscriptionFromProtocols(protocols []GroupProtocol) [] } } } - + // Fallback to default if parsing fails fmt.Printf("DEBUG: Failed to extract subscription, using fallback topic\n") return []string{"test-topic"} @@ -592,24 +592,24 @@ func (h *Handler) parseConsumerProtocolMetadata(metadata []byte) []string { if len(metadata) < 6 { // version(2) + topics_count(4) return nil } - + offset := 0 - + // Parse version (2 bytes) version := binary.BigEndian.Uint16(metadata[offset : offset+2]) offset += 2 - + // Parse topics array if len(metadata) < offset+4 { return nil } topicsCount := binary.BigEndian.Uint32(metadata[offset : offset+4]) offset += 4 - + fmt.Printf("DEBUG: Consumer protocol metadata - Version: %d, TopicsCount: %d\n", version, topicsCount) - + topics := make([]string, 0, topicsCount) - + for i := uint32(0); i < topicsCount && offset < len(metadata); i++ { // Parse topic name if len(metadata) < offset+2 { @@ -617,17 +617,17 @@ func (h *Handler) parseConsumerProtocolMetadata(metadata []byte) []string { } topicNameLength := binary.BigEndian.Uint16(metadata[offset : offset+2]) offset += 2 - + if len(metadata) < offset+int(topicNameLength) { break } topicName := string(metadata[offset : offset+int(topicNameLength)]) offset += int(topicNameLength) - + topics = append(topics, topicName) fmt.Printf("DEBUG: Consumer subscribed to topic: %s\n", topicName) } - + return topics } diff --git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go index 83fcc2938..0d5007925 100644 --- a/weed/mq/kafka/protocol/offset_management.go +++ b/weed/mq/kafka/protocol/offset_management.go @@ -4,7 +4,7 @@ import ( "encoding/binary" "fmt" "time" - + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer" ) @@ -13,12 +13,12 @@ import ( // OffsetCommitRequest represents an OffsetCommit request from a Kafka client type OffsetCommitRequest struct { - GroupID string - GenerationID int32 - MemberID string - GroupInstanceID string // Optional static membership ID - RetentionTime int64 // Offset retention time (-1 for broker default) - Topics []OffsetCommitTopic + GroupID string + GenerationID int32 + MemberID string + GroupInstanceID string // Optional static membership ID + RetentionTime int64 // Offset retention time (-1 for broker default) + Topics []OffsetCommitTopic } // OffsetCommitTopic represents topic-level offset commit data @@ -29,10 +29,10 @@ type OffsetCommitTopic struct { // OffsetCommitPartition represents partition-level offset commit data type OffsetCommitPartition struct { - Index int32 // Partition index - Offset int64 // Offset to commit - LeaderEpoch int32 // Leader epoch (-1 if not available) - Metadata string // Optional metadata + Index int32 // Partition index + Offset int64 // Offset to commit + LeaderEpoch int32 // Leader epoch (-1 if not available) + Metadata string // Optional metadata } // OffsetCommitResponse represents an OffsetCommit response to a Kafka client @@ -61,7 +61,7 @@ type OffsetFetchRequest struct { GroupID string GroupInstanceID string // Optional static membership ID Topics []OffsetFetchTopic - RequireStable bool // Only fetch stable offsets + RequireStable bool // Only fetch stable offsets } // OffsetFetchTopic represents topic-level offset fetch data @@ -94,10 +94,10 @@ type OffsetFetchPartitionResponse struct { // Error codes specific to offset management const ( - ErrorCodeInvalidCommitOffsetSize int16 = 28 - ErrorCodeOffsetMetadataTooLarge int16 = 12 - ErrorCodeOffsetLoadInProgress int16 = 14 - ErrorCodeNotCoordinatorForGroup int16 = 16 + ErrorCodeInvalidCommitOffsetSize int16 = 28 + ErrorCodeOffsetMetadataTooLarge int16 = 12 + ErrorCodeOffsetLoadInProgress int16 = 14 + ErrorCodeNotCoordinatorForGroup int16 = 16 ErrorCodeGroupAuthorizationFailed int16 = 30 ) @@ -107,51 +107,51 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, requestBody []byte) ( if err != nil { return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeInvalidCommitOffsetSize), nil } - + // Validate request if request.GroupID == "" || request.MemberID == "" { return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } - + // Get consumer group group := h.groupCoordinator.GetGroup(request.GroupID) if group == nil { return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } - + group.Mu.Lock() defer group.Mu.Unlock() - + // Update group's last activity group.LastActivity = time.Now() - + // Validate member exists and is in stable state member, exists := group.Members[request.MemberID] if !exists { return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeUnknownMemberID), nil } - + if member.State != consumer.MemberStateStable { return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeRebalanceInProgress), nil } - + // Validate generation if request.GenerationID != group.Generation { return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeIllegalGeneration), nil } - + // Process offset commits response := OffsetCommitResponse{ CorrelationID: correlationID, Topics: make([]OffsetCommitTopicResponse, 0, len(request.Topics)), } - + for _, topic := range request.Topics { topicResponse := OffsetCommitTopicResponse{ Name: topic.Name, Partitions: make([]OffsetCommitPartitionResponse, 0, len(topic.Partitions)), } - + for _, partition := range topic.Partitions { // Validate partition assignment - consumer should only commit offsets for assigned partitions assigned := false @@ -161,7 +161,7 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, requestBody []byte) ( break } } - + var errorCode int16 = ErrorCodeNone if !assigned && group.State == consumer.GroupStateStable { // Allow commits during rebalancing, but restrict during stable state @@ -173,17 +173,17 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, requestBody []byte) ( errorCode = ErrorCodeOffsetMetadataTooLarge // Generic error } } - + partitionResponse := OffsetCommitPartitionResponse{ Index: partition.Index, ErrorCode: errorCode, } topicResponse.Partitions = append(topicResponse.Partitions, partitionResponse) } - + response.Topics = append(response.Topics, topicResponse) } - + return h.buildOffsetCommitResponse(response), nil } @@ -193,34 +193,34 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, requestBody []byte) ([ if err != nil { return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } - + // Validate request if request.GroupID == "" { return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } - + // Get consumer group group := h.groupCoordinator.GetGroup(request.GroupID) if group == nil { return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } - + group.Mu.RLock() defer group.Mu.RUnlock() - + // Build response response := OffsetFetchResponse{ CorrelationID: correlationID, Topics: make([]OffsetFetchTopicResponse, 0, len(request.Topics)), ErrorCode: ErrorCodeNone, } - + for _, topic := range request.Topics { topicResponse := OffsetFetchTopicResponse{ Name: topic.Name, Partitions: make([]OffsetFetchPartitionResponse, 0), } - + // If no partitions specified, fetch all partitions for the topic partitionsToFetch := topic.Partitions if len(partitionsToFetch) == 0 { @@ -231,16 +231,16 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, requestBody []byte) ([ } } } - + // Fetch offsets for requested partitions for _, partition := range partitionsToFetch { offset, metadata, err := h.fetchOffset(group, topic.Name, partition) - + var errorCode int16 = ErrorCodeNone if err != nil { errorCode = ErrorCodeOffsetLoadInProgress // Generic error } - + partitionResponse := OffsetFetchPartitionResponse{ Index: partition, Offset: offset, @@ -250,10 +250,10 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, requestBody []byte) ([ } topicResponse.Partitions = append(topicResponse.Partitions, partitionResponse) } - + response.Topics = append(response.Topics, topicResponse) } - + return h.buildOffsetFetchResponse(response), nil } @@ -261,9 +261,9 @@ func (h *Handler) parseOffsetCommitRequest(data []byte) (*OffsetCommitRequest, e if len(data) < 8 { return nil, fmt.Errorf("request too short") } - + offset := 0 - + // GroupID (string) groupIDLength := int(binary.BigEndian.Uint16(data[offset:])) offset += 2 @@ -272,14 +272,14 @@ func (h *Handler) parseOffsetCommitRequest(data []byte) (*OffsetCommitRequest, e } groupID := string(data[offset : offset+groupIDLength]) offset += groupIDLength - + // Generation ID (4 bytes) if offset+4 > len(data) { return nil, fmt.Errorf("missing generation ID") } generationID := int32(binary.BigEndian.Uint32(data[offset:])) offset += 4 - + // MemberID (string) if offset+2 > len(data) { return nil, fmt.Errorf("missing member ID length") @@ -291,26 +291,26 @@ func (h *Handler) parseOffsetCommitRequest(data []byte) (*OffsetCommitRequest, e } memberID := string(data[offset : offset+memberIDLength]) offset += memberIDLength - + // Parse RetentionTime (8 bytes, -1 for broker default) if len(data) < offset+8 { return nil, fmt.Errorf("OffsetCommit request missing retention time") } retentionTime := int64(binary.BigEndian.Uint64(data[offset : offset+8])) offset += 8 - + // Parse Topics array if len(data) < offset+4 { return nil, fmt.Errorf("OffsetCommit request missing topics array") } topicsCount := binary.BigEndian.Uint32(data[offset : offset+4]) offset += 4 - - fmt.Printf("DEBUG: OffsetCommit - GroupID: %s, GenerationID: %d, MemberID: %s, RetentionTime: %d, TopicsCount: %d\n", + + fmt.Printf("DEBUG: OffsetCommit - GroupID: %s, GenerationID: %d, MemberID: %s, RetentionTime: %d, TopicsCount: %d\n", groupID, generationID, memberID, retentionTime, topicsCount) - + topics := make([]OffsetCommitTopic, 0, topicsCount) - + for i := uint32(0); i < topicsCount && offset < len(data); i++ { // Parse topic name if len(data) < offset+2 { @@ -318,22 +318,22 @@ func (h *Handler) parseOffsetCommitRequest(data []byte) (*OffsetCommitRequest, e } topicNameLength := binary.BigEndian.Uint16(data[offset : offset+2]) offset += 2 - + if len(data) < offset+int(topicNameLength) { break } topicName := string(data[offset : offset+int(topicNameLength)]) offset += int(topicNameLength) - + // Parse partitions array if len(data) < offset+4 { break } partitionsCount := binary.BigEndian.Uint32(data[offset : offset+4]) offset += 4 - + partitions := make([]OffsetCommitPartition, 0, partitionsCount) - + for j := uint32(0); j < partitionsCount && offset < len(data); j++ { // Parse partition index (4 bytes) if len(data) < offset+4 { @@ -341,28 +341,28 @@ func (h *Handler) parseOffsetCommitRequest(data []byte) (*OffsetCommitRequest, e } partitionIndex := int32(binary.BigEndian.Uint32(data[offset : offset+4])) offset += 4 - + // Parse committed offset (8 bytes) if len(data) < offset+8 { break } committedOffset := int64(binary.BigEndian.Uint64(data[offset : offset+8])) offset += 8 - + // Parse leader epoch (4 bytes) if len(data) < offset+4 { break } leaderEpoch := int32(binary.BigEndian.Uint32(data[offset : offset+4])) offset += 4 - + // Parse metadata (nullable string) if len(data) < offset+2 { break } metadataLength := int16(binary.BigEndian.Uint16(data[offset : offset+2])) offset += 2 - + var metadata string if metadataLength == -1 { metadata = "" // null string @@ -370,24 +370,24 @@ func (h *Handler) parseOffsetCommitRequest(data []byte) (*OffsetCommitRequest, e metadata = string(data[offset : offset+int(metadataLength)]) offset += int(metadataLength) } - + partitions = append(partitions, OffsetCommitPartition{ Index: partitionIndex, Offset: committedOffset, LeaderEpoch: leaderEpoch, Metadata: metadata, }) - + fmt.Printf("DEBUG: OffsetCommit - Topic: %s, Partition: %d, Offset: %d, LeaderEpoch: %d, Metadata: %s\n", topicName, partitionIndex, committedOffset, leaderEpoch, metadata) } - + topics = append(topics, OffsetCommitTopic{ Name: topicName, Partitions: partitions, }) } - + return &OffsetCommitRequest{ GroupID: groupID, GenerationID: generationID, @@ -401,9 +401,9 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, err if len(data) < 4 { return nil, fmt.Errorf("request too short") } - + offset := 0 - + // GroupID (string) groupIDLength := int(binary.BigEndian.Uint16(data[offset:])) offset += 2 @@ -412,18 +412,18 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, err } groupID := string(data[offset : offset+groupIDLength]) offset += groupIDLength - + // Parse Topics array if len(data) < offset+4 { return nil, fmt.Errorf("OffsetFetch request missing topics array") } topicsCount := binary.BigEndian.Uint32(data[offset : offset+4]) offset += 4 - + fmt.Printf("DEBUG: OffsetFetch - GroupID: %s, TopicsCount: %d\n", groupID, topicsCount) - + topics := make([]OffsetFetchTopic, 0, topicsCount) - + for i := uint32(0); i < topicsCount && offset < len(data); i++ { // Parse topic name if len(data) < offset+2 { @@ -431,22 +431,22 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, err } topicNameLength := binary.BigEndian.Uint16(data[offset : offset+2]) offset += 2 - + if len(data) < offset+int(topicNameLength) { break } topicName := string(data[offset : offset+int(topicNameLength)]) offset += int(topicNameLength) - + // Parse partitions array if len(data) < offset+4 { break } partitionsCount := binary.BigEndian.Uint32(data[offset : offset+4]) offset += 4 - + partitions := make([]int32, 0, partitionsCount) - + // If partitionsCount is 0, it means "fetch all partitions" if partitionsCount == 0 { fmt.Printf("DEBUG: OffsetFetch - Topic: %s, Partitions: ALL\n", topicName) @@ -459,18 +459,18 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, err } partitionIndex := int32(binary.BigEndian.Uint32(data[offset : offset+4])) offset += 4 - + partitions = append(partitions, partitionIndex) fmt.Printf("DEBUG: OffsetFetch - Topic: %s, Partition: %d\n", topicName, partitionIndex) } } - + topics = append(topics, OffsetFetchTopic{ Name: topicName, Partitions: partitions, }) } - + // Parse RequireStable flag (1 byte) - for transactional consistency var requireStable bool if len(data) >= offset+1 { @@ -478,7 +478,7 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, err offset += 1 fmt.Printf("DEBUG: OffsetFetch - RequireStable: %v\n", requireStable) } - + return &OffsetFetchRequest{ GroupID: groupID, Topics: topics, @@ -491,18 +491,18 @@ func (h *Handler) commitOffset(group *consumer.ConsumerGroup, topic string, part if group.OffsetCommits == nil { group.OffsetCommits = make(map[string]map[int32]consumer.OffsetCommit) } - + if group.OffsetCommits[topic] == nil { group.OffsetCommits[topic] = make(map[int32]consumer.OffsetCommit) } - + // Store the offset commit group.OffsetCommits[topic][partition] = consumer.OffsetCommit{ Offset: offset, Metadata: metadata, Timestamp: time.Now(), } - + return nil } @@ -511,17 +511,17 @@ func (h *Handler) fetchOffset(group *consumer.ConsumerGroup, topic string, parti if group.OffsetCommits == nil { return -1, "", nil // No committed offset } - + topicOffsets, exists := group.OffsetCommits[topic] if !exists { return -1, "", nil // No committed offset for topic } - + offsetCommit, exists := topicOffsets[partition] if !exists { return -1, "", nil // No committed offset for partition } - + return offsetCommit.Offset, offsetCommit.Metadata, nil } @@ -530,51 +530,51 @@ func (h *Handler) buildOffsetCommitResponse(response OffsetCommitResponse) []byt for _, topic := range response.Topics { estimatedSize += len(topic.Name) + 8 + len(topic.Partitions)*8 } - + result := make([]byte, 0, estimatedSize) - + // Correlation ID (4 bytes) correlationIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID) result = append(result, correlationIDBytes...) - + // Topics array length (4 bytes) topicsLengthBytes := make([]byte, 4) binary.BigEndian.PutUint32(topicsLengthBytes, uint32(len(response.Topics))) result = append(result, topicsLengthBytes...) - + // Topics for _, topic := range response.Topics { // Topic name length (2 bytes) nameLength := make([]byte, 2) binary.BigEndian.PutUint16(nameLength, uint16(len(topic.Name))) result = append(result, nameLength...) - + // Topic name result = append(result, []byte(topic.Name)...) - + // Partitions array length (4 bytes) partitionsLength := make([]byte, 4) binary.BigEndian.PutUint32(partitionsLength, uint32(len(topic.Partitions))) result = append(result, partitionsLength...) - + // Partitions for _, partition := range topic.Partitions { // Partition index (4 bytes) indexBytes := make([]byte, 4) binary.BigEndian.PutUint32(indexBytes, uint32(partition.Index)) result = append(result, indexBytes...) - + // Error code (2 bytes) errorBytes := make([]byte, 2) binary.BigEndian.PutUint16(errorBytes, uint16(partition.ErrorCode)) result = append(result, errorBytes...) } } - + // Throttle time (4 bytes, 0 = no throttling) result = append(result, 0, 0, 0, 0) - + return result } @@ -586,74 +586,74 @@ func (h *Handler) buildOffsetFetchResponse(response OffsetFetchResponse) []byte estimatedSize += len(partition.Metadata) } } - + result := make([]byte, 0, estimatedSize) - + // Correlation ID (4 bytes) correlationIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID) result = append(result, correlationIDBytes...) - + // Topics array length (4 bytes) topicsLengthBytes := make([]byte, 4) binary.BigEndian.PutUint32(topicsLengthBytes, uint32(len(response.Topics))) result = append(result, topicsLengthBytes...) - + // Topics for _, topic := range response.Topics { // Topic name length (2 bytes) nameLength := make([]byte, 2) binary.BigEndian.PutUint16(nameLength, uint16(len(topic.Name))) result = append(result, nameLength...) - + // Topic name result = append(result, []byte(topic.Name)...) - + // Partitions array length (4 bytes) partitionsLength := make([]byte, 4) binary.BigEndian.PutUint32(partitionsLength, uint32(len(topic.Partitions))) result = append(result, partitionsLength...) - + // Partitions for _, partition := range topic.Partitions { // Partition index (4 bytes) indexBytes := make([]byte, 4) binary.BigEndian.PutUint32(indexBytes, uint32(partition.Index)) result = append(result, indexBytes...) - + // Committed offset (8 bytes) offsetBytes := make([]byte, 8) binary.BigEndian.PutUint64(offsetBytes, uint64(partition.Offset)) result = append(result, offsetBytes...) - + // Leader epoch (4 bytes) epochBytes := make([]byte, 4) binary.BigEndian.PutUint32(epochBytes, uint32(partition.LeaderEpoch)) result = append(result, epochBytes...) - + // Metadata length (2 bytes) metadataLength := make([]byte, 2) binary.BigEndian.PutUint16(metadataLength, uint16(len(partition.Metadata))) result = append(result, metadataLength...) - + // Metadata result = append(result, []byte(partition.Metadata)...) - + // Error code (2 bytes) errorBytes := make([]byte, 2) binary.BigEndian.PutUint16(errorBytes, uint16(partition.ErrorCode)) result = append(result, errorBytes...) } } - + // Group-level error code (2 bytes) groupErrorBytes := make([]byte, 2) binary.BigEndian.PutUint16(groupErrorBytes, uint16(response.ErrorCode)) result = append(result, groupErrorBytes...) - + // Throttle time (4 bytes, 0 = no throttling) result = append(result, 0, 0, 0, 0) - + return result } @@ -669,7 +669,7 @@ func (h *Handler) buildOffsetCommitErrorResponse(correlationID uint32, errorCode }, }, } - + return h.buildOffsetCommitResponse(response) } @@ -679,6 +679,6 @@ func (h *Handler) buildOffsetFetchErrorResponse(correlationID uint32, errorCode Topics: []OffsetFetchTopicResponse{}, ErrorCode: errorCode, } - + return h.buildOffsetFetchResponse(response) }