diff --git a/test/kafka/sarama_e2e_test.go b/test/kafka/sarama_e2e_test.go index c9d0ebade..04a822c42 100644 --- a/test/kafka/sarama_e2e_test.go +++ b/test/kafka/sarama_e2e_test.go @@ -146,6 +146,9 @@ func TestSaramaConsumerGroup(t *testing.T) { config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin config.Consumer.Offsets.Initial = sarama.OffsetOldest config.Consumer.Return.Errors = true + // Producer configuration for SyncProducer + config.Producer.Return.Successes = true + config.Producer.Return.Errors = true t.Logf("=== Testing Sarama Consumer Group ===") diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index d35c5bc2d..606be38d0 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -527,7 +527,13 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { case 8: // OffsetCommit response, err = h.handleOffsetCommit(correlationID, requestBody) case 9: // OffsetFetch - response, err = h.handleOffsetFetch(correlationID, requestBody) + fmt.Printf("DEBUG: *** OFFSETFETCH REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion) + response, err = h.handleOffsetFetch(correlationID, apiVersion, requestBody) + if err != nil { + fmt.Printf("DEBUG: OffsetFetch error: %v\n", err) + } else { + fmt.Printf("DEBUG: OffsetFetch response hex dump (%d bytes): %x\n", len(response), response) + } case 10: // FindCoordinator fmt.Printf("DEBUG: *** FINDCOORDINATOR REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion) response, err = h.handleFindCoordinator(correlationID, requestBody) diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index 6bf145e25..a807acabd 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -897,13 +897,8 @@ func (h *Handler) getTopicPartitions(group *consumer.ConsumerGroup) map[string][ } func (h *Handler) serializeMemberAssignment(assignments []consumer.PartitionAssignment) []byte { - // Build a simple serialized format for partition assignments - // Format: version(2) + num_topics(4) + topics... - // For each topic: topic_name_len(2) + topic_name + num_partitions(4) + partitions... - - if len(assignments) == 0 { - return []byte{0, 1, 0, 0, 0, 0} // Version 1, 0 topics - } + // Build ConsumerGroupMemberAssignment format exactly as Sarama expects: + // Version(2) + Topics array + UserData bytes // Group assignments by topic topicAssignments := make(map[string][]int32) @@ -916,22 +911,20 @@ func (h *Handler) serializeMemberAssignment(assignments []consumer.PartitionAssi // Version (2 bytes) - use version 1 result = append(result, 0, 1) - // Number of topics (4 bytes) + // Number of topics (4 bytes) - array length numTopicsBytes := make([]byte, 4) binary.BigEndian.PutUint32(numTopicsBytes, uint32(len(topicAssignments))) result = append(result, numTopicsBytes...) - // Topics + // Topics - each topic follows Kafka string + int32 array format for topic, partitions := range topicAssignments { - // Topic name length (2 bytes) + // Topic name as Kafka string: length(2) + content topicLenBytes := make([]byte, 2) binary.BigEndian.PutUint16(topicLenBytes, uint16(len(topic))) result = append(result, topicLenBytes...) - - // Topic name result = append(result, []byte(topic)...) - // Number of partitions (4 bytes) + // Partitions as int32 array: length(4) + elements numPartitionsBytes := make([]byte, 4) binary.BigEndian.PutUint32(numPartitionsBytes, uint32(len(partitions))) result = append(result, numPartitionsBytes...) @@ -944,9 +937,11 @@ func (h *Handler) serializeMemberAssignment(assignments []consumer.PartitionAssi } } - // User data length (4 bytes) - no user data + // UserData as Kafka bytes: length(4) + data (empty in our case) + // For empty user data, just put length = 0 result = append(result, 0, 0, 0, 0) + fmt.Printf("DEBUG: Generated assignment bytes (%d): %x\n", len(result), result) return result } diff --git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go index 8e3695da6..3f9b687ea 100644 --- a/weed/mq/kafka/protocol/offset_management.go +++ b/weed/mq/kafka/protocol/offset_management.go @@ -180,7 +180,7 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, requestBody []byte) ( return h.buildOffsetCommitResponse(response), nil } -func (h *Handler) handleOffsetFetch(correlationID uint32, requestBody []byte) ([]byte, error) { +func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { // Parse OffsetFetch request request, err := h.parseOffsetFetchRequest(requestBody) if err != nil { @@ -266,7 +266,7 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, requestBody []byte) ([ response.Topics = append(response.Topics, topicResponse) } - return h.buildOffsetFetchResponse(response), nil + return h.buildOffsetFetchResponse(response, apiVersion), nil } func (h *Handler) parseOffsetCommitRequest(data []byte) (*OffsetCommitRequest, error) { @@ -579,7 +579,7 @@ func (h *Handler) buildOffsetCommitResponse(response OffsetCommitResponse) []byt return result } -func (h *Handler) buildOffsetFetchResponse(response OffsetFetchResponse) []byte { +func (h *Handler) buildOffsetFetchResponse(response OffsetFetchResponse, apiVersion uint16) []byte { estimatedSize := 32 for _, topic := range response.Topics { estimatedSize += len(topic.Name) + 16 + len(topic.Partitions)*32 @@ -627,10 +627,12 @@ func (h *Handler) buildOffsetFetchResponse(response OffsetFetchResponse) []byte binary.BigEndian.PutUint64(offsetBytes, uint64(partition.Offset)) result = append(result, offsetBytes...) - // Leader epoch (4 bytes) - epochBytes := make([]byte, 4) - binary.BigEndian.PutUint32(epochBytes, uint32(partition.LeaderEpoch)) - result = append(result, epochBytes...) + // Leader epoch (4 bytes) - only included in version 5+ + if apiVersion >= 5 { + epochBytes := make([]byte, 4) + binary.BigEndian.PutUint32(epochBytes, uint32(partition.LeaderEpoch)) + result = append(result, epochBytes...) + } // Metadata length (2 bytes) metadataLength := make([]byte, 2) @@ -652,8 +654,10 @@ func (h *Handler) buildOffsetFetchResponse(response OffsetFetchResponse) []byte binary.BigEndian.PutUint16(groupErrorBytes, uint16(response.ErrorCode)) result = append(result, groupErrorBytes...) - // Throttle time (4 bytes, 0 = no throttling) - result = append(result, 0, 0, 0, 0) + // Throttle time (4 bytes) - only included in version 3+ + if apiVersion >= 3 { + result = append(result, 0, 0, 0, 0) + } return result } @@ -681,5 +685,5 @@ func (h *Handler) buildOffsetFetchErrorResponse(correlationID uint32, errorCode ErrorCode: errorCode, } - return h.buildOffsetFetchResponse(response) + return h.buildOffsetFetchResponse(response, 0) } diff --git a/weed/mq/kafka/protocol/offset_management_test.go b/weed/mq/kafka/protocol/offset_management_test.go index 41a9659d3..7c405f98e 100644 --- a/weed/mq/kafka/protocol/offset_management_test.go +++ b/weed/mq/kafka/protocol/offset_management_test.go @@ -156,7 +156,7 @@ func TestHandler_handleOffsetFetch(t *testing.T) { requestBody := createOffsetFetchRequestBody("test-group") correlationID := uint32(126) - response, err := h.handleOffsetFetch(correlationID, requestBody) + response, err := h.handleOffsetFetch(correlationID, 2, requestBody) if err != nil { t.Fatalf("handleOffsetFetch failed: %v", err) @@ -188,7 +188,7 @@ func TestHandler_handleOffsetFetch_NoCommittedOffset(t *testing.T) { requestBody := createOffsetFetchRequestBody("test-group") correlationID := uint32(127) - response, err := h.handleOffsetFetch(correlationID, requestBody) + response, err := h.handleOffsetFetch(correlationID, 2, requestBody) if err != nil { t.Fatalf("handleOffsetFetch failed: %v", err) @@ -369,7 +369,7 @@ func TestHandler_OffsetCommitFetch_EndToEnd(t *testing.T) { // Test offset fetch fetchRequestBody := createOffsetFetchRequestBody("test-group") - fetchResponse, err := server.handleOffsetFetch(457, fetchRequestBody) + fetchResponse, err := server.handleOffsetFetch(457, 2, fetchRequestBody) if err != nil { t.Fatalf("offset fetch failed: %v", err) }