Browse Source

align package decoding

pull/7231/head
chrislu 2 months ago
parent
commit
42aea1dc68
  1. 3
      test/kafka/sarama_e2e_test.go
  2. 8
      weed/mq/kafka/protocol/handler.go
  3. 23
      weed/mq/kafka/protocol/joingroup.go
  4. 24
      weed/mq/kafka/protocol/offset_management.go
  5. 6
      weed/mq/kafka/protocol/offset_management_test.go

3
test/kafka/sarama_e2e_test.go

@ -146,6 +146,9 @@ func TestSaramaConsumerGroup(t *testing.T) {
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Return.Errors = true
// Producer configuration for SyncProducer
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
t.Logf("=== Testing Sarama Consumer Group ===")

8
weed/mq/kafka/protocol/handler.go

@ -527,7 +527,13 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
case 8: // OffsetCommit
response, err = h.handleOffsetCommit(correlationID, requestBody)
case 9: // OffsetFetch
response, err = h.handleOffsetFetch(correlationID, requestBody)
fmt.Printf("DEBUG: *** OFFSETFETCH REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion)
response, err = h.handleOffsetFetch(correlationID, apiVersion, requestBody)
if err != nil {
fmt.Printf("DEBUG: OffsetFetch error: %v\n", err)
} else {
fmt.Printf("DEBUG: OffsetFetch response hex dump (%d bytes): %x\n", len(response), response)
}
case 10: // FindCoordinator
fmt.Printf("DEBUG: *** FINDCOORDINATOR REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion)
response, err = h.handleFindCoordinator(correlationID, requestBody)

23
weed/mq/kafka/protocol/joingroup.go

@ -897,13 +897,8 @@ func (h *Handler) getTopicPartitions(group *consumer.ConsumerGroup) map[string][
}
func (h *Handler) serializeMemberAssignment(assignments []consumer.PartitionAssignment) []byte {
// Build a simple serialized format for partition assignments
// Format: version(2) + num_topics(4) + topics...
// For each topic: topic_name_len(2) + topic_name + num_partitions(4) + partitions...
if len(assignments) == 0 {
return []byte{0, 1, 0, 0, 0, 0} // Version 1, 0 topics
}
// Build ConsumerGroupMemberAssignment format exactly as Sarama expects:
// Version(2) + Topics array + UserData bytes
// Group assignments by topic
topicAssignments := make(map[string][]int32)
@ -916,22 +911,20 @@ func (h *Handler) serializeMemberAssignment(assignments []consumer.PartitionAssi
// Version (2 bytes) - use version 1
result = append(result, 0, 1)
// Number of topics (4 bytes)
// Number of topics (4 bytes) - array length
numTopicsBytes := make([]byte, 4)
binary.BigEndian.PutUint32(numTopicsBytes, uint32(len(topicAssignments)))
result = append(result, numTopicsBytes...)
// Topics
// Topics - each topic follows Kafka string + int32 array format
for topic, partitions := range topicAssignments {
// Topic name length (2 bytes)
// Topic name as Kafka string: length(2) + content
topicLenBytes := make([]byte, 2)
binary.BigEndian.PutUint16(topicLenBytes, uint16(len(topic)))
result = append(result, topicLenBytes...)
// Topic name
result = append(result, []byte(topic)...)
// Number of partitions (4 bytes)
// Partitions as int32 array: length(4) + elements
numPartitionsBytes := make([]byte, 4)
binary.BigEndian.PutUint32(numPartitionsBytes, uint32(len(partitions)))
result = append(result, numPartitionsBytes...)
@ -944,9 +937,11 @@ func (h *Handler) serializeMemberAssignment(assignments []consumer.PartitionAssi
}
}
// User data length (4 bytes) - no user data
// UserData as Kafka bytes: length(4) + data (empty in our case)
// For empty user data, just put length = 0
result = append(result, 0, 0, 0, 0)
fmt.Printf("DEBUG: Generated assignment bytes (%d): %x\n", len(result), result)
return result
}

24
weed/mq/kafka/protocol/offset_management.go

@ -180,7 +180,7 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, requestBody []byte) (
return h.buildOffsetCommitResponse(response), nil
}
func (h *Handler) handleOffsetFetch(correlationID uint32, requestBody []byte) ([]byte, error) {
func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
// Parse OffsetFetch request
request, err := h.parseOffsetFetchRequest(requestBody)
if err != nil {
@ -266,7 +266,7 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, requestBody []byte) ([
response.Topics = append(response.Topics, topicResponse)
}
return h.buildOffsetFetchResponse(response), nil
return h.buildOffsetFetchResponse(response, apiVersion), nil
}
func (h *Handler) parseOffsetCommitRequest(data []byte) (*OffsetCommitRequest, error) {
@ -579,7 +579,7 @@ func (h *Handler) buildOffsetCommitResponse(response OffsetCommitResponse) []byt
return result
}
func (h *Handler) buildOffsetFetchResponse(response OffsetFetchResponse) []byte {
func (h *Handler) buildOffsetFetchResponse(response OffsetFetchResponse, apiVersion uint16) []byte {
estimatedSize := 32
for _, topic := range response.Topics {
estimatedSize += len(topic.Name) + 16 + len(topic.Partitions)*32
@ -627,10 +627,12 @@ func (h *Handler) buildOffsetFetchResponse(response OffsetFetchResponse) []byte
binary.BigEndian.PutUint64(offsetBytes, uint64(partition.Offset))
result = append(result, offsetBytes...)
// Leader epoch (4 bytes)
epochBytes := make([]byte, 4)
binary.BigEndian.PutUint32(epochBytes, uint32(partition.LeaderEpoch))
result = append(result, epochBytes...)
// Leader epoch (4 bytes) - only included in version 5+
if apiVersion >= 5 {
epochBytes := make([]byte, 4)
binary.BigEndian.PutUint32(epochBytes, uint32(partition.LeaderEpoch))
result = append(result, epochBytes...)
}
// Metadata length (2 bytes)
metadataLength := make([]byte, 2)
@ -652,8 +654,10 @@ func (h *Handler) buildOffsetFetchResponse(response OffsetFetchResponse) []byte
binary.BigEndian.PutUint16(groupErrorBytes, uint16(response.ErrorCode))
result = append(result, groupErrorBytes...)
// Throttle time (4 bytes, 0 = no throttling)
result = append(result, 0, 0, 0, 0)
// Throttle time (4 bytes) - only included in version 3+
if apiVersion >= 3 {
result = append(result, 0, 0, 0, 0)
}
return result
}
@ -681,5 +685,5 @@ func (h *Handler) buildOffsetFetchErrorResponse(correlationID uint32, errorCode
ErrorCode: errorCode,
}
return h.buildOffsetFetchResponse(response)
return h.buildOffsetFetchResponse(response, 0)
}

6
weed/mq/kafka/protocol/offset_management_test.go

@ -156,7 +156,7 @@ func TestHandler_handleOffsetFetch(t *testing.T) {
requestBody := createOffsetFetchRequestBody("test-group")
correlationID := uint32(126)
response, err := h.handleOffsetFetch(correlationID, requestBody)
response, err := h.handleOffsetFetch(correlationID, 2, requestBody)
if err != nil {
t.Fatalf("handleOffsetFetch failed: %v", err)
@ -188,7 +188,7 @@ func TestHandler_handleOffsetFetch_NoCommittedOffset(t *testing.T) {
requestBody := createOffsetFetchRequestBody("test-group")
correlationID := uint32(127)
response, err := h.handleOffsetFetch(correlationID, requestBody)
response, err := h.handleOffsetFetch(correlationID, 2, requestBody)
if err != nil {
t.Fatalf("handleOffsetFetch failed: %v", err)
@ -369,7 +369,7 @@ func TestHandler_OffsetCommitFetch_EndToEnd(t *testing.T) {
// Test offset fetch
fetchRequestBody := createOffsetFetchRequestBody("test-group")
fetchResponse, err := server.handleOffsetFetch(457, fetchRequestBody)
fetchResponse, err := server.handleOffsetFetch(457, 2, fetchRequestBody)
if err != nil {
t.Fatalf("offset fetch failed: %v", err)
}

Loading…
Cancel
Save