Browse Source

debug fetch offset APIs

pull/7329/head
chrislu 6 days ago
parent
commit
1807b8093c
  1. 11
      test/kafka/kafka-client-loadtest/go.mod
  2. 14
      test/kafka/kafka-client-loadtest/go.sum
  3. 86
      test/kafka/kafka-client-loadtest/test_offset_fetch.go
  4. 12
      weed/mq/kafka/consumer_offset/filer_storage.go
  5. 11
      weed/mq/kafka/consumer_offset/filer_storage_test.go
  6. 3
      weed/mq/kafka/protocol/find_coordinator.go
  7. 6
      weed/mq/kafka/protocol/handler.go
  8. 25
      weed/mq/kafka/protocol/offset_management.go
  9. 11
      weed/util/log_buffer/log_read_stateless.go

11
test/kafka/kafka-client-loadtest/go.mod

@ -8,6 +8,7 @@ require (
github.com/IBM/sarama v1.46.1
github.com/linkedin/goavro/v2 v2.14.0
github.com/prometheus/client_golang v1.23.2
google.golang.org/protobuf v1.36.8
gopkg.in/yaml.v3 v3.0.1
)
@ -34,8 +35,10 @@ require (
github.com/prometheus/procfs v0.16.1 // indirect
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
golang.org/x/crypto v0.42.0 // indirect
golang.org/x/net v0.44.0 // indirect
golang.org/x/sys v0.36.0 // indirect
google.golang.org/protobuf v1.36.8 // indirect
golang.org/x/crypto v0.43.0 // indirect
golang.org/x/net v0.46.0 // indirect
golang.org/x/sys v0.37.0 // indirect
)
// Use local Sarama repo for debugging
replace github.com/IBM/sarama => /Users/chrislu/dev/sarama

14
test/kafka/kafka-client-loadtest/go.sum

@ -1,5 +1,3 @@
github.com/IBM/sarama v1.46.1 h1:AlDkvyQm4LKktoQZxv0sbTfH3xukeH7r/UFBbUmFV9M=
github.com/IBM/sarama v1.46.1/go.mod h1:ipyOREIx+o9rMSrrPGLZHGuT0mzecNzKd19Quq+Q8AA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
@ -84,8 +82,8 @@ go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@ -93,8 +91,8 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I=
golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4=
golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
@ -105,8 +103,8 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=

86
test/kafka/kafka-client-loadtest/test_offset_fetch.go

@ -0,0 +1,86 @@
package main
import (
"context"
"log"
"time"
"github.com/IBM/sarama"
)
// main exercises the OffsetFetch code path against a local broker using a
// debug build of Sarama (wired in via the replace directive in go.mod).
// It joins a consumer group, consumes briefly, and exits after a 30s window.
func main() {
	log.Println("=== Testing OffsetFetch with Debug Sarama ===")

	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_8_0_0
	cfg.Consumer.Return.Errors = true
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
	cfg.Consumer.Offsets.AutoCommit.Enable = true
	cfg.Consumer.Offsets.AutoCommit.Interval = 100 * time.Millisecond
	cfg.Consumer.Group.Session.Timeout = 30 * time.Second
	cfg.Consumer.Group.Heartbeat.Interval = 3 * time.Second

	var (
		brokers = []string{"localhost:9093"}
		group   = "test-offset-fetch-group"
		topics  = []string{"loadtest-topic-0"}
	)
	log.Printf("Creating consumer group: group=%s brokers=%v topics=%v", group, brokers, topics)

	cg, err := sarama.NewConsumerGroup(brokers, group, cfg)
	if err != nil {
		log.Fatalf("Failed to create consumer group: %v", err)
	}
	defer cg.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	h := &testHandler{}
	log.Println("Starting consumer group session...")
	log.Println("Watch for 🔍 [SARAMA-DEBUG] logs to trace OffsetFetch calls")

	go func() {
		// Consume blocks until a rebalance or error; loop to rejoin until
		// the test window expires.
		for ctx.Err() == nil {
			if err := cg.Consume(ctx, topics, h); err != nil {
				log.Printf("Error from consumer: %v", err)
			}
		}
	}()

	// Block until the 30s timeout fires.
	<-ctx.Done()
	log.Println("Test completed")
}
// testHandler is a minimal sarama.ConsumerGroupHandler used to observe
// session lifecycle and offset-fetch behavior during the test run.
type testHandler struct{}

// Setup runs at the start of a new session, before any ConsumeClaim calls.
func (h *testHandler) Setup(session sarama.ConsumerGroupSession) error {
	gen, member := session.GenerationID(), session.MemberID()
	log.Printf("✓ Consumer group session setup: generation=%d memberID=%s", gen, member)
	return nil
}
// Cleanup runs at the end of a session, after all ConsumeClaim goroutines
// have exited. The session argument is unused here.
func (h *testHandler) Cleanup(sarama.ConsumerGroupSession) error {
	log.Println("Consumer group session cleanup")
	return nil
}
// ConsumeClaim consumes messages from a single claimed partition, marking
// each message as processed, and returns after five messages (or when the
// claim's message channel is closed by a rebalance/shutdown).
func (h *testHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	log.Printf("✓ Started consuming: topic=%s partition=%d offset=%d", claim.Topic(), claim.Partition(), claim.InitialOffset())
	received := 0
	for msg := range claim.Messages() {
		received++
		log.Printf(" Received message #%d: offset=%d", received, msg.Offset)
		// Mark the offset so auto-commit persists it (the behavior under test).
		session.MarkMessage(msg, "")
		if received < 5 {
			continue
		}
		log.Println("Received 5 messages, stopping")
		return nil
	}
	return nil
}

12
weed/mq/kafka/consumer_offset/filer_storage.go

@ -13,6 +13,11 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
const (
// ConsumerOffsetsBasePath is the base path for storing Kafka consumer offsets in SeaweedFS
ConsumerOffsetsBasePath = "/topics/kafka/.meta/consumer_offsets"
)
// KafkaConsumerPosition represents a Kafka consumer's position
// Can be either offset-based or timestamp-based
type KafkaConsumerPosition struct {
@ -23,7 +28,7 @@ type KafkaConsumerPosition struct {
}
// FilerStorage implements OffsetStorage using SeaweedFS filer
// Offsets are stored in JSON format: /kafka/consumer_offsets/{group}/{topic}/{partition}/offset
// Offsets are stored in JSON format: {ConsumerOffsetsBasePath}/{group}/{topic}/{partition}/offset
// Supports both offset and timestamp positioning
type FilerStorage struct {
fca *filer_client.FilerClientAccessor
@ -160,8 +165,7 @@ func (f *FilerStorage) ListGroups() ([]string, error) {
return nil, ErrStorageClosed
}
basePath := "/kafka/consumer_offsets"
return f.listDirectory(basePath)
return f.listDirectory(ConsumerOffsetsBasePath)
}
// Close releases resources
@ -173,7 +177,7 @@ func (f *FilerStorage) Close() error {
// Helper methods
func (f *FilerStorage) getGroupPath(group string) string {
return fmt.Sprintf("/kafka/consumer_offsets/%s", group)
return fmt.Sprintf("%s/%s", ConsumerOffsetsBasePath, group)
}
func (f *FilerStorage) getTopicPath(group, topic string) string {

11
weed/mq/kafka/consumer_offset/filer_storage_test.go

@ -49,18 +49,17 @@ func TestFilerStoragePath(t *testing.T) {
partition := int32(5)
groupPath := storage.getGroupPath(group)
assert.Equal(t, "/kafka/consumer_offsets/test-group", groupPath)
assert.Equal(t, ConsumerOffsetsBasePath+"/test-group", groupPath)
topicPath := storage.getTopicPath(group, topic)
assert.Equal(t, "/kafka/consumer_offsets/test-group/test-topic", topicPath)
assert.Equal(t, ConsumerOffsetsBasePath+"/test-group/test-topic", topicPath)
partitionPath := storage.getPartitionPath(group, topic, partition)
assert.Equal(t, "/kafka/consumer_offsets/test-group/test-topic/5", partitionPath)
assert.Equal(t, ConsumerOffsetsBasePath+"/test-group/test-topic/5", partitionPath)
offsetPath := storage.getOffsetPath(group, topic, partition)
assert.Equal(t, "/kafka/consumer_offsets/test-group/test-topic/5/offset", offsetPath)
assert.Equal(t, ConsumerOffsetsBasePath+"/test-group/test-topic/5/offset", offsetPath)
metadataPath := storage.getMetadataPath(group, topic, partition)
assert.Equal(t, "/kafka/consumer_offsets/test-group/test-topic/5/metadata", metadataPath)
assert.Equal(t, ConsumerOffsetsBasePath+"/test-group/test-topic/5/metadata", metadataPath)
}

3
weed/mq/kafka/protocol/find_coordinator.go

@ -29,6 +29,9 @@ type CoordinatorAssignment struct {
}
func (h *Handler) handleFindCoordinator(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
glog.V(0).Infof("═══════════════════════════════════════════════════════════════")
glog.V(0).Infof(" 🔍 FIND_COORDINATOR API CALLED (ApiKey 10) version=%d", apiVersion)
glog.V(0).Infof("═══════════════════════════════════════════════════════════════")
glog.V(4).Infof("FindCoordinator ENTRY: version=%d, correlation=%d, bodyLen=%d", apiVersion, correlationID, len(requestBody))
switch apiVersion {
case 0:

6
weed/mq/kafka/protocol/handler.go

@ -1036,6 +1036,12 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) {
requestStart := time.Now()
apiName := getAPIName(APIKey(req.apiKey))
// ═══════════════════════════════════════════════════════════════
// LOG ALL INCOMING KAFKA API CALLS
// ═══════════════════════════════════════════════════════════════
glog.V(0).Infof("🔵 [API] %s (key=%d, ver=%d, corr=%d)",
apiName, req.apiKey, req.apiVersion, req.correlationID)
var response []byte
var err error

25
weed/mq/kafka/protocol/offset_management.go

@ -198,6 +198,10 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re
}
func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
glog.V(0).Infof("═══════════════════════════════════════════════════════════════")
glog.V(0).Infof(" 🔍 OFFSET_FETCH API CALLED (ApiKey 9)")
glog.V(0).Infof("═══════════════════════════════════════════════════════════════")
// Parse OffsetFetch request
request, err := h.parseOffsetFetchRequest(requestBody)
if err != nil {
@ -209,17 +213,16 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req
return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
}
// Get consumer group
group := h.groupCoordinator.GetGroup(request.GroupID)
if group == nil {
glog.V(1).Infof("[OFFSET_FETCH] Group not found: %s", request.GroupID)
return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
}
// Get or create consumer group
// IMPORTANT: Use GetOrCreateGroup (not GetGroup) to allow fetching persisted offsets
// even if the group doesn't exist in memory yet. This is critical for consumer restarts.
// Kafka allows offset fetches for groups that haven't joined yet (e.g., simple consumers).
group := h.groupCoordinator.GetOrCreateGroup(request.GroupID)
group.Mu.RLock()
defer group.Mu.RUnlock()
glog.V(1).Infof("[OFFSET_FETCH] Request: group=%s topics=%v", request.GroupID, request.Topics)
glog.V(0).Infof("[OFFSET_FETCH] Request: group=%s topics=%d", request.GroupID, len(request.Topics))
// Build response
response := OffsetFetchResponse{
@ -255,7 +258,7 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req
if off, meta, err := h.fetchOffset(group, topic.Name, partition); err == nil && off >= 0 {
fetchedOffset = off
metadata = meta
glog.V(1).Infof("[OFFSET_FETCH] Found in memory: group=%s topic=%s partition=%d offset=%d",
glog.V(0).Infof("[OFFSET_FETCH] Found in memory: group=%s topic=%s partition=%d offset=%d",
request.GroupID, topic.Name, partition, off)
} else {
// Fallback: try fetching from SMQ persistent storage
@ -269,10 +272,10 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req
if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 {
fetchedOffset = off
metadata = meta
glog.V(1).Infof("[OFFSET_FETCH] Found in SMQ: group=%s topic=%s partition=%d offset=%d",
glog.V(0).Infof("[OFFSET_FETCH] ✓ Found in storage: group=%s topic=%s partition=%d offset=%d",
request.GroupID, topic.Name, partition, off)
} else {
glog.V(1).Infof("[OFFSET_FETCH] No offset found: group=%s topic=%s partition=%d",
glog.V(0).Infof("[OFFSET_FETCH] No offset found: group=%s topic=%s partition=%d (will start from auto.offset.reset)",
request.GroupID, topic.Name, partition)
}
// No offset found in either location (-1 indicates no committed offset)
@ -285,8 +288,6 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req
Metadata: metadata,
ErrorCode: errorCode,
}
glog.V(1).Infof("[OFFSET_FETCH] Returning: group=%s topic=%s partition=%d offset=%d",
request.GroupID, topic.Name, partition, fetchedOffset)
topicResponse.Partitions = append(topicResponse.Partitions, partitionResponse)
}

11
weed/util/log_buffer/log_read_stateless.go

@ -54,8 +54,17 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages
bufferStartOffset := logBuffer.bufferStartOffset
highWaterMark = currentBufferEnd
// Special case: empty buffer (no data written yet)
if currentBufferEnd == 0 && bufferStartOffset == 0 && logBuffer.pos == 0 {
logBuffer.RUnlock()
glog.V(4).Infof("[StatelessRead] Empty buffer, returning no data with endOfPartition=true")
// Return empty result - partition exists but has no data yet
// Preserve the requested offset in nextOffset
return messages, startOffset, 0, true, nil
}
// Check if requested offset is in current buffer
if startOffset >= bufferStartOffset && startOffset <= currentBufferEnd {
if startOffset >= bufferStartOffset && startOffset < currentBufferEnd {
// Read from current buffer
glog.V(4).Infof("[StatelessRead] Reading from current buffer: start=%d, end=%d",
bufferStartOffset, currentBufferEnd)

Loading…
Cancel
Save