From 3eaff0e787684a32020bd8198962563e064b40f0 Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 10 Sep 2025 12:46:29 -0700 Subject: [PATCH] mq(kafka): implement offset ledger system with thread-safe in-memory mapping from Kafka offsets to timestamps; integrate with ListOffsets handler and topic lifecycle --- weed/mq/kafka/offset/ledger.go | 171 +++++++++++++++ weed/mq/kafka/offset/ledger_test.go | 280 +++++++++++++++++++++++++ weed/mq/kafka/protocol/handler.go | 109 +++++++++- weed/mq/kafka/protocol/handler_test.go | 4 +- 4 files changed, 551 insertions(+), 13 deletions(-) create mode 100644 weed/mq/kafka/offset/ledger.go create mode 100644 weed/mq/kafka/offset/ledger_test.go diff --git a/weed/mq/kafka/offset/ledger.go b/weed/mq/kafka/offset/ledger.go new file mode 100644 index 000000000..b5dcaf75b --- /dev/null +++ b/weed/mq/kafka/offset/ledger.go @@ -0,0 +1,171 @@ +package offset + +import ( + "fmt" + "sort" + "sync" + "time" +) + +// OffsetEntry represents a single offset mapping +type OffsetEntry struct { + KafkaOffset int64 // Kafka offset (sequential integer) + Timestamp int64 // SeaweedMQ timestamp (nanoseconds) + Size int32 // Message size in bytes +} + +// Ledger maintains the mapping between Kafka offsets and SeaweedMQ timestamps +// for a single topic partition +type Ledger struct { + mu sync.RWMutex + entries []OffsetEntry // sorted by KafkaOffset + nextOffset int64 // next offset to assign + earliestTime int64 // timestamp of earliest message + latestTime int64 // timestamp of latest message +} + +// NewLedger creates a new offset ledger starting from offset 0 +func NewLedger() *Ledger { + return &Ledger{ + entries: make([]OffsetEntry, 0, 1000), // pre-allocate for performance + nextOffset: 0, + } +} + +// AssignOffsets reserves a range of consecutive Kafka offsets +// Returns the base offset of the reserved range +func (l *Ledger) AssignOffsets(count int64) int64 { + l.mu.Lock() + defer l.mu.Unlock() + + baseOffset := l.nextOffset + l.nextOffset += count + return baseOffset +} + +// AppendRecord adds a new offset entry to the ledger +// The kafkaOffset should be from a previous AssignOffsets call +func (l *Ledger) AppendRecord(kafkaOffset, timestamp int64, size int32) error { + l.mu.Lock() + defer l.mu.Unlock() + + // Validate offset is in expected range + if kafkaOffset < 0 || kafkaOffset >= l.nextOffset { + return fmt.Errorf("invalid offset %d, expected 0 <= offset < %d", kafkaOffset, l.nextOffset) + } + + // Check for duplicate offset (shouldn't happen in normal operation) + if len(l.entries) > 0 && l.entries[len(l.entries)-1].KafkaOffset >= kafkaOffset { + return fmt.Errorf("offset %d already exists or is out of order", kafkaOffset) + } + + entry := OffsetEntry{ + KafkaOffset: kafkaOffset, + Timestamp: timestamp, + Size: size, + } + + l.entries = append(l.entries, entry) + + // Update earliest/latest timestamps + if l.earliestTime == 0 || timestamp < l.earliestTime { + l.earliestTime = timestamp + } + if timestamp > l.latestTime { + l.latestTime = timestamp + } + + return nil +} + +// GetRecord retrieves the record information for a given Kafka offset +func (l *Ledger) GetRecord(kafkaOffset int64) (timestamp int64, size int32, err error) { + l.mu.RLock() + defer l.mu.RUnlock() + + // Binary search for the offset + idx := sort.Search(len(l.entries), func(i int) bool { + return l.entries[i].KafkaOffset >= kafkaOffset + }) + + if idx >= len(l.entries) || l.entries[idx].KafkaOffset != kafkaOffset { + return 0, 0, fmt.Errorf("offset %d not found", kafkaOffset) + } + + entry := 
l.entries[idx] + return entry.Timestamp, entry.Size, nil +} + +// GetEarliestOffset returns the smallest Kafka offset in the ledger +func (l *Ledger) GetEarliestOffset() int64 { + l.mu.RLock() + defer l.mu.RUnlock() + + if len(l.entries) == 0 { + return 0 // no messages yet, earliest is 0 + } + return l.entries[0].KafkaOffset +} + +// GetLatestOffset returns the largest Kafka offset in the ledger +func (l *Ledger) GetLatestOffset() int64 { + l.mu.RLock() + defer l.mu.RUnlock() + + if len(l.entries) == 0 { + return 0 // no messages yet, latest is 0 + } + return l.entries[len(l.entries)-1].KafkaOffset +} + +// GetHighWaterMark returns the next offset that will be assigned +// (i.e., one past the latest offset) +func (l *Ledger) GetHighWaterMark() int64 { + l.mu.RLock() + defer l.mu.RUnlock() + return l.nextOffset +} + +// FindOffsetByTimestamp returns the first offset with a timestamp >= target +// Used for timestamp-based offset lookup +func (l *Ledger) FindOffsetByTimestamp(targetTimestamp int64) int64 { + l.mu.RLock() + defer l.mu.RUnlock() + + if len(l.entries) == 0 { + return 0 + } + + // Binary search for first entry with timestamp >= targetTimestamp + idx := sort.Search(len(l.entries), func(i int) bool { + return l.entries[i].Timestamp >= targetTimestamp + }) + + if idx >= len(l.entries) { + // Target timestamp is after all entries, return high water mark + return l.nextOffset + } + + return l.entries[idx].KafkaOffset +} + +// GetStats returns basic statistics about the ledger +func (l *Ledger) GetStats() (entryCount int, earliestTime, latestTime, nextOffset int64) { + l.mu.RLock() + defer l.mu.RUnlock() + + return len(l.entries), l.earliestTime, l.latestTime, l.nextOffset +} + +// GetTimestampRange returns the time range covered by this ledger +func (l *Ledger) GetTimestampRange() (earliest, latest int64) { + l.mu.RLock() + defer l.mu.RUnlock() + + if len(l.entries) == 0 { + now := time.Now().UnixNano() + return now, now // stub values when no data + } + + return l.earliestTime, l.latestTime +} diff --git a/weed/mq/kafka/offset/ledger_test.go b/weed/mq/kafka/offset/ledger_test.go new file mode 100644 index 000000000..1940097ee --- /dev/null +++ b/weed/mq/kafka/offset/ledger_test.go @@ -0,0 +1,280 @@ +package offset + +import ( + "testing" + "time" +) + +func TestLedger_BasicOperations(t *testing.T) { + ledger := NewLedger() + + // Initially empty + if earliest := ledger.GetEarliestOffset(); earliest != 0 { + t.Errorf("earliest offset: got %d, want 0", earliest) + } + if latest := ledger.GetLatestOffset(); latest != 0 { + t.Errorf("latest offset: got %d, want 0", latest) + } + if hwm := ledger.GetHighWaterMark(); hwm != 0 { + t.Errorf("high water mark: got %d, want 0", hwm) + } + + // Assign some offsets + baseOffset1 := ledger.AssignOffsets(3) + if baseOffset1 != 0 { + t.Errorf("first base offset: got %d, want 0", baseOffset1) + } + + baseOffset2 := ledger.AssignOffsets(2) + if baseOffset2 != 3 { + t.Errorf("second base offset: got %d, want 3", baseOffset2) + } + + // High water mark should be updated + if hwm := ledger.GetHighWaterMark(); hwm != 5 { + t.Errorf("high water mark after assignment: got %d, want 5", hwm) + } + + // But no records yet, so earliest/latest still 0 + if latest := ledger.GetLatestOffset(); latest != 0 { + t.Errorf("latest offset with no records: got %d, want 0", latest) + } +} + +func TestLedger_AppendAndRetrieve(t *testing.T) { + ledger := NewLedger() + + // Assign and append some records + baseOffset := ledger.AssignOffsets(3) + if baseOffset != 0 { + 
t.Fatalf("unexpected base offset: %d", baseOffset) + } + + // Append records with different timestamps + timestamp1 := time.Now().UnixNano() + timestamp2 := timestamp1 + 1000000 // +1ms + timestamp3 := timestamp2 + 2000000 // +2ms + + if err := ledger.AppendRecord(0, timestamp1, 100); err != nil { + t.Fatalf("append record 0: %v", err) + } + if err := ledger.AppendRecord(1, timestamp2, 200); err != nil { + t.Fatalf("append record 1: %v", err) + } + if err := ledger.AppendRecord(2, timestamp3, 150); err != nil { + t.Fatalf("append record 2: %v", err) + } + + // Check earliest/latest + if earliest := ledger.GetEarliestOffset(); earliest != 0 { + t.Errorf("earliest offset: got %d, want 0", earliest) + } + if latest := ledger.GetLatestOffset(); latest != 2 { + t.Errorf("latest offset: got %d, want 2", latest) + } + + // Retrieve records + ts, size, err := ledger.GetRecord(0) + if err != nil { + t.Fatalf("get record 0: %v", err) + } + if ts != timestamp1 { + t.Errorf("record 0 timestamp: got %d, want %d", ts, timestamp1) + } + if size != 100 { + t.Errorf("record 0 size: got %d, want 100", size) + } + + ts, size, err = ledger.GetRecord(2) + if err != nil { + t.Fatalf("get record 2: %v", err) + } + if ts != timestamp3 { + t.Errorf("record 2 timestamp: got %d, want %d", ts, timestamp3) + } + if size != 150 { + t.Errorf("record 2 size: got %d, want 150", size) + } + + // Try to get non-existent record + _, _, err = ledger.GetRecord(5) + if err == nil { + t.Errorf("expected error for non-existent offset 5") + } +} + +func TestLedger_FindOffsetByTimestamp(t *testing.T) { + ledger := NewLedger() + + // Add some records with known timestamps + baseTime := time.Now().UnixNano() + ledger.AssignOffsets(5) + + ledger.AppendRecord(0, baseTime, 100) + ledger.AppendRecord(1, baseTime+1000000, 200) // +1ms + ledger.AppendRecord(2, baseTime+3000000, 150) // +3ms + ledger.AppendRecord(3, baseTime+10000000, 300) // +10ms + ledger.AppendRecord(4, baseTime+20000000, 250) // +20ms + + // Find offset by timestamp + testCases := []struct { + timestamp int64 + expectedOffset int64 + description string + }{ + {baseTime - 1000000, 0, "before first record"}, // Before any record + {baseTime, 0, "exact first timestamp"}, // Exact match first + {baseTime + 500000, 1, "between first and second"}, // Between records + {baseTime + 1000000, 1, "exact second timestamp"}, // Exact match middle + {baseTime + 5000000, 3, "between records"}, // Between records + {baseTime + 20000000, 4, "exact last timestamp"}, // Exact match last + {baseTime + 30000000, 5, "after last record"}, // After all records (should return HWM) + } + + for _, tc := range testCases { + offset := ledger.FindOffsetByTimestamp(tc.timestamp) + if offset != tc.expectedOffset { + t.Errorf("%s: got offset %d, want %d", tc.description, offset, tc.expectedOffset) + } + } +} + +func TestLedger_ErrorConditions(t *testing.T) { + ledger := NewLedger() + + // Try to append without assigning offsets first + err := ledger.AppendRecord(0, time.Now().UnixNano(), 100) + if err == nil { + t.Errorf("expected error when appending without assignment") + } + + // Assign some offsets + ledger.AssignOffsets(3) + + // Try to append out-of-range offset + err = ledger.AppendRecord(5, time.Now().UnixNano(), 100) + if err == nil { + t.Errorf("expected error for out-of-range offset") + } + + // Try negative offset + err = ledger.AppendRecord(-1, time.Now().UnixNano(), 100) + if err == nil { + t.Errorf("expected error for negative offset") + } + + // Append a record successfully + timestamp 
:= time.Now().UnixNano() + if err := ledger.AppendRecord(0, timestamp, 100); err != nil { + t.Fatalf("failed to append valid record: %v", err) + } + + // Try to append the same offset again + err = ledger.AppendRecord(0, timestamp+1000, 200) + if err == nil { + t.Errorf("expected error for duplicate offset") + } +} + +func TestLedger_ConcurrentAccess(t *testing.T) { + ledger := NewLedger() + + // Test concurrent offset assignment + done := make(chan bool, 2) + + go func() { + for i := 0; i < 100; i++ { + ledger.AssignOffsets(1) + } + done <- true + }() + + go func() { + for i := 0; i < 100; i++ { + ledger.AssignOffsets(1) + } + done <- true + }() + + // Wait for both goroutines + <-done + <-done + + // Should have assigned 200 total offsets + hwm := ledger.GetHighWaterMark() + if hwm != 200 { + t.Errorf("concurrent assignment: got HWM %d, want 200", hwm) + } + + // Test concurrent reads + ledger.AssignOffsets(1) + timestamp := time.Now().UnixNano() + ledger.AppendRecord(200, timestamp, 100) + + // Multiple concurrent reads should work fine + readDone := make(chan bool, 5) + for i := 0; i < 5; i++ { + go func() { + ts, size, err := ledger.GetRecord(200) + if err != nil || ts != timestamp || size != 100 { + t.Errorf("concurrent read failed: ts=%d size=%d err=%v", ts, size, err) + } + readDone <- true + }() + } + + // Wait for all reads + for i := 0; i < 5; i++ { + <-readDone + } +} + +func TestLedger_GetStats(t *testing.T) { + ledger := NewLedger() + + // Initially empty + count, earliest, latest, next := ledger.GetStats() + if count != 0 || earliest != 0 || latest != 0 || next != 0 { + t.Errorf("initial stats: got count=%d earliest=%d latest=%d next=%d", count, earliest, latest, next) + } + + // Add some data + baseTime := time.Now().UnixNano() + ledger.AssignOffsets(3) + ledger.AppendRecord(0, baseTime, 100) + ledger.AppendRecord(1, baseTime+1000000, 200) + ledger.AppendRecord(2, baseTime+2000000, 150) + + count, earliest, latest, next = ledger.GetStats() + if count != 3 { + t.Errorf("entry count: got %d, want 3", count) + } + if earliest != baseTime { + t.Errorf("earliest time: got %d, want %d", earliest, baseTime) + } + if latest != baseTime+2000000 { + t.Errorf("latest time: got %d, want %d", latest, baseTime+2000000) + } + if next != 3 { + t.Errorf("next offset: got %d, want 3", next) + } +} + +func TestLedger_EmptyLedgerTimestampLookup(t *testing.T) { + ledger := NewLedger() + + // Empty ledger should handle timestamp queries gracefully + offset := ledger.FindOffsetByTimestamp(time.Now().UnixNano()) + if offset != 0 { + t.Errorf("empty ledger timestamp lookup: got %d, want 0", offset) + } + + earliest, latest := ledger.GetTimestampRange() + if earliest <= 0 || latest <= 0 { + t.Errorf("empty ledger timestamp range: earliest=%d latest=%d", earliest, latest) + } + // For empty ledger, should return current time as both earliest and latest + if earliest != latest { + t.Errorf("empty ledger should have same earliest and latest time") + } +} diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 52fda1456..860b25dc7 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -8,6 +8,8 @@ import ( "net" "sync" "time" + + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset" ) // TopicInfo holds basic information about a topic @@ -17,16 +19,64 @@ type TopicInfo struct { CreatedAt int64 } +// TopicPartitionKey uniquely identifies a topic partition +type TopicPartitionKey struct { + Topic string + Partition int32 +} + // Handler 
processes Kafka protocol requests from clients type Handler struct { topicsMu sync.RWMutex topics map[string]*TopicInfo // topic name -> topic info + + ledgersMu sync.RWMutex + ledgers map[TopicPartitionKey]*offset.Ledger // topic-partition -> offset ledger } func NewHandler() *Handler { return &Handler{ - topics: make(map[string]*TopicInfo), + topics: make(map[string]*TopicInfo), + ledgers: make(map[TopicPartitionKey]*offset.Ledger), + } +} + +// GetOrCreateLedger returns the offset ledger for a topic-partition, creating it if needed +func (h *Handler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger { + key := TopicPartitionKey{Topic: topic, Partition: partition} + + // First try to get existing ledger with read lock + h.ledgersMu.RLock() + ledger, exists := h.ledgers[key] + h.ledgersMu.RUnlock() + + if exists { + return ledger + } + + // Create new ledger with write lock + h.ledgersMu.Lock() + defer h.ledgersMu.Unlock() + + // Double-check after acquiring write lock + if ledger, exists := h.ledgers[key]; exists { + return ledger } + + // Create and store new ledger + ledger = offset.NewLedger() + h.ledgers[key] = ledger + return ledger +} + +// GetLedger returns the offset ledger for a topic-partition, or nil if not found +func (h *Handler) GetLedger(topic string, partition int32) *offset.Ledger { + key := TopicPartitionKey{Topic: topic, Partition: partition} + + h.ledgersMu.RLock() + defer h.ledgersMu.RUnlock() + + return h.ledgers[key] } // HandleConn processes a single client connection @@ -278,20 +328,43 @@ func (h *Handler) handleListOffsets(correlationID uint32, requestBody []byte) ([ // Error code (0 = no error) response = append(response, 0, 0) - // For stub: return the original timestamp for timestamp queries, or current time for earliest/latest + // Get the ledger for this topic-partition + ledger := h.GetOrCreateLedger(string(topicName), int32(partitionID)) + var responseTimestamp int64 var responseOffset int64 - + switch timestamp { case -2: // earliest offset - responseTimestamp = 0 - responseOffset = 0 - case -1: // latest offset - responseTimestamp = 1000000000 // some timestamp - responseOffset = 0 // stub: no messages yet - default: // specific timestamp + responseOffset = ledger.GetEarliestOffset() + if responseOffset == ledger.GetHighWaterMark() { + // No messages yet, return current time + responseTimestamp = time.Now().UnixNano() + } else { + // Get timestamp of earliest message + if ts, _, err := ledger.GetRecord(responseOffset); err == nil { + responseTimestamp = ts + } else { + responseTimestamp = time.Now().UnixNano() + } + } + case -1: // latest offset + responseOffset = ledger.GetLatestOffset() + if responseOffset == 0 && ledger.GetHighWaterMark() == 0 { + // No messages yet + responseTimestamp = time.Now().UnixNano() + responseOffset = 0 + } else { + // Get timestamp of latest message + if ts, _, err := ledger.GetRecord(responseOffset); err == nil { + responseTimestamp = ts + } else { + responseTimestamp = time.Now().UnixNano() + } + } + default: // specific timestamp - find offset by timestamp + responseOffset = ledger.FindOffsetByTimestamp(timestamp) responseTimestamp = timestamp - responseOffset = 0 // stub: no messages at any timestamp } timestampBytes := make([]byte, 8) @@ -417,6 +490,11 @@ func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) ( Partitions: int32(numPartitions), CreatedAt: time.Now().UnixNano(), } + + // Initialize ledgers for all partitions + for partitionID := int32(0); partitionID < 
int32(numPartitions); partitionID++ { + h.GetOrCreateLedger(topicName, partitionID) + } } // Error code @@ -500,12 +578,21 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ( var errorCode uint16 = 0 var errorMessage string = "" - if _, exists := h.topics[topicName]; !exists { + topicInfo, exists := h.topics[topicName] + if !exists { errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION errorMessage = "Unknown topic" } else { // Delete the topic delete(h.topics, topicName) + + // Clean up associated ledgers + h.ledgersMu.Lock() + for partitionID := int32(0); partitionID < topicInfo.Partitions; partitionID++ { + key := TopicPartitionKey{Topic: topicName, Partition: partitionID} + delete(h.ledgers, key) + } + h.ledgersMu.Unlock() } // Error code diff --git a/weed/mq/kafka/protocol/handler_test.go b/weed/mq/kafka/protocol/handler_test.go index 3d881fc86..f15689c6d 100644 --- a/weed/mq/kafka/protocol/handler_test.go +++ b/weed/mq/kafka/protocol/handler_test.go @@ -408,8 +408,8 @@ func TestHandler_handleListOffsets(t *testing.T) { timestamp := int64(binary.BigEndian.Uint64(response[offset : offset+8])) offset += 8 - if timestamp != 0 { - t.Errorf("partition 0 timestamp: got %d, want 0", timestamp) + if timestamp <= 0 { + t.Errorf("partition 0 timestamp: got %d, want > 0", timestamp) } offsetValue := int64(binary.BigEndian.Uint64(response[offset : offset+8]))
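
Usage sketch (illustration only, not part of the patch): a produce path is expected to reserve offsets with AssignOffsets and then record each persisted message's SeaweedMQ timestamp with AppendRecord, while ListOffsets-style lookups go through the read-only getters. The standalone snippet below exercises the API added in weed/mq/kafka/offset/ledger.go; the record sizes and the millisecond spacing are made-up example values.

package main

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
)

func main() {
	ledger := offset.NewLedger()

	// Produce path: reserve three consecutive Kafka offsets, then map each
	// offset to the timestamp/size of the message stored in SeaweedMQ.
	base := ledger.AssignOffsets(3) // base == 0 on a fresh ledger
	ts := make([]int64, 3)
	for i := int64(0); i < 3; i++ {
		ts[i] = time.Now().UnixNano() + i*int64(time.Millisecond) // spread out for illustration
		if err := ledger.AppendRecord(base+i, ts[i], 128); err != nil {
			fmt.Println("append failed:", err)
			return
		}
	}

	// ListOffsets-style lookups.
	fmt.Println("earliest:", ledger.GetEarliestOffset()) // 0
	fmt.Println("latest:", ledger.GetLatestOffset())     // 2
	fmt.Println("hwm:", ledger.GetHighWaterMark())       // 3

	// Timestamp-based lookup: first offset whose timestamp is >= ts[1].
	fmt.Println("by timestamp:", ledger.FindOffsetByTimestamp(ts[1])) // 1

	// Per-record lookup used by the earliest/latest branches of handleListOffsets.
	if recTs, size, err := ledger.GetRecord(base + 2); err == nil {
		fmt.Println("record 2:", recTs, size)
	}
}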
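
A likely follow-up (not in this patch) is to drive the ledger from the Produce path via the handler's GetOrCreateLedger. The method below is a sketch only; its name, parameters, and the use of time.Now as the record timestamp are assumptions, and real code would use the timestamp assigned when the record is persisted to SeaweedMQ.

// Hypothetical produce-path helper inside package protocol (sketch, not part of this patch).
func (h *Handler) recordProducedBatch(topic string, partition int32, recordSizes []int32) (int64, error) {
	ledger := h.GetOrCreateLedger(topic, partition)

	// Reserve a contiguous block of Kafka offsets for the whole batch.
	baseOffset := ledger.AssignOffsets(int64(len(recordSizes)))

	// Map each Kafka offset to the SeaweedMQ timestamp of the stored record so
	// that ListOffsets (and later Fetch) can resolve offsets back to timestamps.
	for i, size := range recordSizes {
		ts := time.Now().UnixNano() // assumption: real code would use the broker-assigned timestamp
		if err := ledger.AppendRecord(baseOffset+int64(i), ts, size); err != nil {
			return 0, err
		}
	}

	// The base offset is what a Produce response would report back to the client.
	return baseOffset, nil
}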