From 1807b8093c7d9d4658ff2cf1f0cba6de7c6a9217 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Wed, 15 Oct 2025 18:59:46 -0700
Subject: [PATCH] debug fetch offset APIs

---
 test/kafka/kafka-client-loadtest/go.mod        | 11 ++-
 test/kafka/kafka-client-loadtest/go.sum        | 14 ++-
 .../test_offset_fetch.go                       | 86 +++++++++++++++++++
 .../mq/kafka/consumer_offset/filer_storage.go  | 12 ++-
 .../consumer_offset/filer_storage_test.go      | 11 ++-
 weed/mq/kafka/protocol/find_coordinator.go     |  3 +
 weed/mq/kafka/protocol/handler.go              |  6 ++
 weed/mq/kafka/protocol/offset_management.go    | 25 +++---
 weed/util/log_buffer/log_read_stateless.go     | 11 ++-
 9 files changed, 144 insertions(+), 35 deletions(-)
 create mode 100644 test/kafka/kafka-client-loadtest/test_offset_fetch.go

diff --git a/test/kafka/kafka-client-loadtest/go.mod b/test/kafka/kafka-client-loadtest/go.mod
index 6ebbfc396..d333a4225 100644
--- a/test/kafka/kafka-client-loadtest/go.mod
+++ b/test/kafka/kafka-client-loadtest/go.mod
@@ -8,6 +8,7 @@ require (
 	github.com/IBM/sarama v1.46.1
 	github.com/linkedin/goavro/v2 v2.14.0
 	github.com/prometheus/client_golang v1.23.2
+	google.golang.org/protobuf v1.36.8
 	gopkg.in/yaml.v3 v3.0.1
 )
 
@@ -34,8 +35,10 @@ require (
 	github.com/prometheus/procfs v0.16.1 // indirect
 	github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect
 	go.yaml.in/yaml/v2 v2.4.2 // indirect
-	golang.org/x/crypto v0.42.0 // indirect
-	golang.org/x/net v0.44.0 // indirect
-	golang.org/x/sys v0.36.0 // indirect
-	google.golang.org/protobuf v1.36.8 // indirect
+	golang.org/x/crypto v0.43.0 // indirect
+	golang.org/x/net v0.46.0 // indirect
+	golang.org/x/sys v0.37.0 // indirect
 )
+
+// Use local Sarama repo for debugging
+replace github.com/IBM/sarama => /Users/chrislu/dev/sarama
diff --git a/test/kafka/kafka-client-loadtest/go.sum b/test/kafka/kafka-client-loadtest/go.sum
index d1869c0fc..cdafa918f 100644
--- a/test/kafka/kafka-client-loadtest/go.sum
+++ b/test/kafka/kafka-client-loadtest/go.sum
@@ -1,5 +1,3 @@
-github.com/IBM/sarama v1.46.1 h1:AlDkvyQm4LKktoQZxv0sbTfH3xukeH7r/UFBbUmFV9M=
-github.com/IBM/sarama v1.46.1/go.mod h1:ipyOREIx+o9rMSrrPGLZHGuT0mzecNzKd19Quq+Q8AA=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
@@ -84,8 +82,8 @@ go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
-golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
-golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
+golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
+golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
 golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
 golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
 golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
 golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
-golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I=
-golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
+golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4=
+golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
@@ -105,8 +103,8 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
-golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
+golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
+golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
 golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
diff --git a/test/kafka/kafka-client-loadtest/test_offset_fetch.go b/test/kafka/kafka-client-loadtest/test_offset_fetch.go
new file mode 100644
index 000000000..0cb99dbf7
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/test_offset_fetch.go
@@ -0,0 +1,86 @@
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"github.com/IBM/sarama"
+)
+
+func main() {
+	log.Println("=== Testing OffsetFetch with Debug Sarama ===")
+
+	config := sarama.NewConfig()
+	config.Version = sarama.V2_8_0_0
+	config.Consumer.Return.Errors = true
+	config.Consumer.Offsets.Initial = sarama.OffsetOldest
+	config.Consumer.Offsets.AutoCommit.Enable = true
+	config.Consumer.Offsets.AutoCommit.Interval = 100 * time.Millisecond
+	config.Consumer.Group.Session.Timeout = 30 * time.Second
+	config.Consumer.Group.Heartbeat.Interval = 3 * time.Second
+
+	brokers := []string{"localhost:9093"}
+	group := "test-offset-fetch-group"
+	topics := []string{"loadtest-topic-0"}
+
+	log.Printf("Creating consumer group: group=%s brokers=%v topics=%v", group, brokers, topics)
+
+	consumerGroup, err := sarama.NewConsumerGroup(brokers, group, config)
+	if err != nil {
+		log.Fatalf("Failed to create consumer group: %v", err)
+	}
+	defer consumerGroup.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	handler := &testHandler{}
+
+	log.Println("Starting consumer group session...")
+	log.Println("Watch for 🔍 [SARAMA-DEBUG] logs to trace OffsetFetch calls")
+
+	go func() {
+		for {
+			if err := consumerGroup.Consume(ctx, topics, handler); err != nil {
+				log.Printf("Error from consumer: %v", err)
+			}
+			if ctx.Err() != nil {
+				return
+			}
+		}
+	}()
+
+	// Wait for context to be done
+	<-ctx.Done()
+	log.Println("Test completed")
+}
+
+type testHandler struct{}
+
+func (h *testHandler) Setup(session sarama.ConsumerGroupSession) error {
+	log.Printf("✓ Consumer group session setup: generation=%d memberID=%s", session.GenerationID(), session.MemberID())
+	return nil
+}
+
+func (h *testHandler) Cleanup(session sarama.ConsumerGroupSession) error {
+	log.Println("Consumer group session cleanup")
+	return nil
+}
+
+func (h *testHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	log.Printf("✓ Started consuming: topic=%s partition=%d offset=%d", claim.Topic(), claim.Partition(), claim.InitialOffset())
+
+	count := 0
+	for message := range claim.Messages() {
+		count++
+		log.Printf("  Received message #%d: offset=%d", count, message.Offset)
+		session.MarkMessage(message, "")
+
+		if count >= 5 {
+			log.Println("Received 5 messages, stopping")
+			return nil
+		}
+	}
+	return nil
+}
diff --git a/weed/mq/kafka/consumer_offset/filer_storage.go b/weed/mq/kafka/consumer_offset/filer_storage.go
index 6edc9d5aa..8eeceb660 100644
--- a/weed/mq/kafka/consumer_offset/filer_storage.go
+++ b/weed/mq/kafka/consumer_offset/filer_storage.go
@@ -13,6 +13,11 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/util"
 )
 
+const (
+	// ConsumerOffsetsBasePath is the base path for storing Kafka consumer offsets in SeaweedFS
+	ConsumerOffsetsBasePath = "/topics/kafka/.meta/consumer_offsets"
+)
+
 // KafkaConsumerPosition represents a Kafka consumer's position
 // Can be either offset-based or timestamp-based
 type KafkaConsumerPosition struct {
@@ -23,7 +28,7 @@ type KafkaConsumerPosition struct {
 }
 
 // FilerStorage implements OffsetStorage using SeaweedFS filer
-// Offsets are stored in JSON format: /kafka/consumer_offsets/{group}/{topic}/{partition}/offset
+// Offsets are stored in JSON format: {ConsumerOffsetsBasePath}/{group}/{topic}/{partition}/offset
 // Supports both offset and timestamp positioning
 type FilerStorage struct {
 	fca *filer_client.FilerClientAccessor
@@ -160,8 +165,7 @@ func (f *FilerStorage) ListGroups() ([]string, error) {
 		return nil, ErrStorageClosed
 	}
 
-	basePath := "/kafka/consumer_offsets"
-	return f.listDirectory(basePath)
+	return f.listDirectory(ConsumerOffsetsBasePath)
 }
 
 // Close releases resources
@@ -173,7 +177,7 @@ func (f *FilerStorage) Close() error {
 // Helper methods
 
 func (f *FilerStorage) getGroupPath(group string) string {
-	return fmt.Sprintf("/kafka/consumer_offsets/%s", group)
+	return fmt.Sprintf("%s/%s", ConsumerOffsetsBasePath, group)
 }
 
 func (f *FilerStorage) getTopicPath(group, topic string) string {
diff --git a/weed/mq/kafka/consumer_offset/filer_storage_test.go b/weed/mq/kafka/consumer_offset/filer_storage_test.go
index 6f2f533c5..67a0e7e09 100644
--- a/weed/mq/kafka/consumer_offset/filer_storage_test.go
+++ b/weed/mq/kafka/consumer_offset/filer_storage_test.go
@@ -49,18 +49,17 @@ func TestFilerStoragePath(t *testing.T) {
 	partition := int32(5)
 
 	groupPath := storage.getGroupPath(group)
-	assert.Equal(t, "/kafka/consumer_offsets/test-group", groupPath)
+	assert.Equal(t, ConsumerOffsetsBasePath+"/test-group", groupPath)
 
 	topicPath := storage.getTopicPath(group, topic)
-	assert.Equal(t, "/kafka/consumer_offsets/test-group/test-topic", topicPath)
+	assert.Equal(t, ConsumerOffsetsBasePath+"/test-group/test-topic", topicPath)
 
 	partitionPath := storage.getPartitionPath(group, topic, partition)
-	assert.Equal(t, "/kafka/consumer_offsets/test-group/test-topic/5", partitionPath)
+	assert.Equal(t, ConsumerOffsetsBasePath+"/test-group/test-topic/5", partitionPath)
 
 	offsetPath := storage.getOffsetPath(group, topic, partition)
-	assert.Equal(t, "/kafka/consumer_offsets/test-group/test-topic/5/offset", offsetPath)
+	assert.Equal(t, ConsumerOffsetsBasePath+"/test-group/test-topic/5/offset", offsetPath)
 
 	metadataPath := storage.getMetadataPath(group, topic, partition)
-	assert.Equal(t, "/kafka/consumer_offsets/test-group/test-topic/5/metadata", metadataPath)
+	assert.Equal(t, ConsumerOffsetsBasePath+"/test-group/test-topic/5/metadata", metadataPath)
 }
-
diff --git a/weed/mq/kafka/protocol/find_coordinator.go b/weed/mq/kafka/protocol/find_coordinator.go
index 946c51ef6..f6f719051 100644
--- a/weed/mq/kafka/protocol/find_coordinator.go
+++ b/weed/mq/kafka/protocol/find_coordinator.go
@@ -29,6 +29,9 @@ type CoordinatorAssignment struct {
 }
 
 func (h *Handler) handleFindCoordinator(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
+	glog.V(0).Infof("═══════════════════════════════════════════════════════════════")
+	glog.V(0).Infof(" 🔍 FIND_COORDINATOR API CALLED (ApiKey 10) version=%d", apiVersion)
+	glog.V(0).Infof("═══════════════════════════════════════════════════════════════")
 	glog.V(4).Infof("FindCoordinator ENTRY: version=%d, correlation=%d, bodyLen=%d", apiVersion, correlationID, len(requestBody))
 	switch apiVersion {
 	case 0:
diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go
index 9e694af43..8730d00ac 100644
--- a/weed/mq/kafka/protocol/handler.go
+++ b/weed/mq/kafka/protocol/handler.go
@@ -1036,6 +1036,12 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) {
 	requestStart := time.Now()
 	apiName := getAPIName(APIKey(req.apiKey))
 
+	// ═══════════════════════════════════════════════════════════════
+	// LOG ALL INCOMING KAFKA API CALLS
+	// ═══════════════════════════════════════════════════════════════
+	glog.V(0).Infof("🔵 [API] %s (key=%d, ver=%d, corr=%d)",
+		apiName, req.apiKey, req.apiVersion, req.correlationID)
+
 	var response []byte
 	var err error
 
diff --git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go
index 95a94538d..c8aa6744e 100644
--- a/weed/mq/kafka/protocol/offset_management.go
+++ b/weed/mq/kafka/protocol/offset_management.go
@@ -198,6 +198,10 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re
 }
 
 func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
+	glog.V(0).Infof("═══════════════════════════════════════════════════════════════")
+	glog.V(0).Infof(" 🔍 OFFSET_FETCH API CALLED (ApiKey 9)")
+	glog.V(0).Infof("═══════════════════════════════════════════════════════════════")
+
 	// Parse OffsetFetch request
 	request, err := h.parseOffsetFetchRequest(requestBody)
 	if err != nil {
@@ -209,17 +213,16 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req
 		return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
 	}
 
-	// Get consumer group
-	group := h.groupCoordinator.GetGroup(request.GroupID)
-	if group == nil {
-		glog.V(1).Infof("[OFFSET_FETCH] Group not found: %s", request.GroupID)
-		return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
-	}
+	// Get or create consumer group
+	// IMPORTANT: Use GetOrCreateGroup (not GetGroup) to allow fetching persisted offsets
+	// even if the group doesn't exist in memory yet. This is critical for consumer restarts.
+	// Kafka allows offset fetches for groups that haven't joined yet (e.g., simple consumers).
+	group := h.groupCoordinator.GetOrCreateGroup(request.GroupID)
 
 	group.Mu.RLock()
 	defer group.Mu.RUnlock()
 
-	glog.V(1).Infof("[OFFSET_FETCH] Request: group=%s topics=%v", request.GroupID, request.Topics)
+	glog.V(0).Infof("[OFFSET_FETCH] Request: group=%s topics=%d", request.GroupID, len(request.Topics))
 
 	// Build response
 	response := OffsetFetchResponse{
@@ -255,7 +258,7 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req
 			if off, meta, err := h.fetchOffset(group, topic.Name, partition); err == nil && off >= 0 {
 				fetchedOffset = off
 				metadata = meta
-				glog.V(1).Infof("[OFFSET_FETCH] Found in memory: group=%s topic=%s partition=%d offset=%d",
+				glog.V(0).Infof("[OFFSET_FETCH] ✓ Found in memory: group=%s topic=%s partition=%d offset=%d",
 					request.GroupID, topic.Name, partition, off)
 			} else {
 				// Fallback: try fetching from SMQ persistent storage
@@ -269,10 +272,10 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req
 				if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 {
 					fetchedOffset = off
 					metadata = meta
-					glog.V(1).Infof("[OFFSET_FETCH] Found in SMQ: group=%s topic=%s partition=%d offset=%d",
+					glog.V(0).Infof("[OFFSET_FETCH] ✓ Found in storage: group=%s topic=%s partition=%d offset=%d",
 						request.GroupID, topic.Name, partition, off)
 				} else {
-					glog.V(1).Infof("[OFFSET_FETCH] No offset found: group=%s topic=%s partition=%d",
+					glog.V(0).Infof("[OFFSET_FETCH] ✗ No offset found: group=%s topic=%s partition=%d (will start from auto.offset.reset)",
 						request.GroupID, topic.Name, partition)
 				}
 				// No offset found in either location (-1 indicates no committed offset)
@@ -285,8 +288,6 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req
 				Metadata:  metadata,
 				ErrorCode: errorCode,
 			}
-			glog.V(1).Infof("[OFFSET_FETCH] Returning: group=%s topic=%s partition=%d offset=%d",
-				request.GroupID, topic.Name, partition, fetchedOffset)
 
 			topicResponse.Partitions = append(topicResponse.Partitions, partitionResponse)
 		}
diff --git a/weed/util/log_buffer/log_read_stateless.go b/weed/util/log_buffer/log_read_stateless.go
index 4339962d4..8d2b42033 100644
--- a/weed/util/log_buffer/log_read_stateless.go
+++ b/weed/util/log_buffer/log_read_stateless.go
@@ -54,8 +54,17 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages
 	bufferStartOffset := logBuffer.bufferStartOffset
 	highWaterMark = currentBufferEnd
 
+	// Special case: empty buffer (no data written yet)
+	if currentBufferEnd == 0 && bufferStartOffset == 0 && logBuffer.pos == 0 {
+		logBuffer.RUnlock()
+		glog.V(4).Infof("[StatelessRead] Empty buffer, returning no data with endOfPartition=true")
+		// Return empty result - partition exists but has no data yet
+		// Preserve the requested offset in nextOffset
+		return messages, startOffset, 0, true, nil
+	}
+
 	// Check if requested offset is in current buffer
-	if startOffset >= bufferStartOffset && startOffset <= currentBufferEnd {
+	if startOffset >= bufferStartOffset && startOffset < currentBufferEnd {
 		// Read from current buffer
 		glog.V(4).Infof("[StatelessRead] Reading from current buffer: start=%d, end=%d",
 			bufferStartOffset, currentBufferEnd)
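Reviewer note (not part of the patch): the offset path layout introduced by ConsumerOffsetsBasePath and the lookup order used in handleOffsetFetch can be sanity-checked in isolation. The sketch below is a standalone approximation: offsetPath mirrors the layout asserted in TestFilerStoragePath, and resolveCommittedOffset stands in for the in-memory h.fetchOffset / persisted h.fetchOffsetFromSMQ fallback. The map-based lookups are hypothetical placeholders; only the path format and the fallback order come from the diff.

    // offset_path_sketch.go - illustrative only; maps stand in for the real stores
    package main

    import "fmt"

    const consumerOffsetsBasePath = "/topics/kafka/.meta/consumer_offsets"

    // offsetPath builds {base}/{group}/{topic}/{partition}/offset,
    // the layout tested in TestFilerStoragePath.
    func offsetPath(group, topic string, partition int32) string {
        return fmt.Sprintf("%s/%s/%s/%d/offset", consumerOffsetsBasePath, group, topic, partition)
    }

    // resolveCommittedOffset mirrors the fallback order in handleOffsetFetch:
    // in-memory group state first, then persistent storage, else -1 so the
    // client falls back to auto.offset.reset.
    func resolveCommittedOffset(inMemory, persisted map[string]int64, key string) int64 {
        if off, ok := inMemory[key]; ok {
            return off
        }
        if off, ok := persisted[key]; ok {
            return off
        }
        return -1
    }

    func main() {
        key := offsetPath("test-group", "test-topic", 5)
        fmt.Println(key) // /topics/kafka/.meta/consumer_offsets/test-group/test-topic/5/offset

        persisted := map[string]int64{key: 42}
        fmt.Println(resolveCommittedOffset(nil, persisted, key)) // 42: found in persistent storage
        fmt.Println(resolveCommittedOffset(nil, nil, key))       // -1: no committed offset
    }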
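Reviewer note (not part of the patch): the log_read_stateless.go hunk replaces an inclusive upper bound with an exclusive one and short-circuits the empty-buffer case. The sketch below assumes currentBufferEnd is the next offset to be written (one past the last buffered message); that semantic is inferred from the fix rather than stated in the diff.

    // boundary_sketch.go - illustrative only
    package main

    import "fmt"

    // inCurrentBuffer mirrors the corrected range check in ReadMessagesAtOffset.
    // With `<=` the not-yet-written offset currentBufferEnd was treated as readable;
    // `<` excludes it.
    func inCurrentBuffer(startOffset, bufferStartOffset, currentBufferEnd int64) bool {
        return startOffset >= bufferStartOffset && startOffset < currentBufferEnd
    }

    func main() {
        // Buffer currently holds offsets 100..109; 110 is the next offset to be written.
        fmt.Println(inCurrentBuffer(109, 100, 110)) // true: last buffered message
        fmt.Println(inCurrentBuffer(110, 100, 110)) // false: nothing at offset 110 yet
        fmt.Println(inCurrentBuffer(0, 0, 0))       // false: empty buffer, now handled by the new special case
    }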