Browse Source

mq(kafka): implement offset ledger system with thread-safe in-memory mapping from Kafka offsets to timestamps; integrate with ListOffsets handler and topic lifecycle

pull/7231/head
chrislu 2 months ago
parent
commit
3eaff0e787
  1. 171
      weed/mq/kafka/offset/ledger.go
  2. 280
      weed/mq/kafka/offset/ledger_test.go
  3. 109
      weed/mq/kafka/protocol/handler.go
  4. 4
      weed/mq/kafka/protocol/handler_test.go

171
weed/mq/kafka/offset/ledger.go

@@ -0,0 +1,171 @@
package offset
import (
"fmt"
"sort"
"sync"
"time"
)
// OffsetEntry is one mapping from a Kafka offset to the SeaweedMQ
// timestamp and size of the corresponding message.
type OffsetEntry struct {
	KafkaOffset int64 // Kafka offset (sequential integer)
	Timestamp   int64 // SeaweedMQ timestamp (nanoseconds)
	Size        int32 // Message size in bytes
}

// Ledger maintains, for a single topic partition, the mapping between
// Kafka offsets and SeaweedMQ timestamps. All methods are safe for
// concurrent use; entries are kept in ascending KafkaOffset order so
// lookups can binary-search.
type Ledger struct {
	mu           sync.RWMutex
	entries      []OffsetEntry // ascending by KafkaOffset
	nextOffset   int64         // next offset AssignOffsets will hand out
	earliestTime int64         // smallest timestamp appended so far (0 = none)
	latestTime   int64         // largest timestamp appended so far
}

// NewLedger creates an empty ledger whose first assigned offset is 0.
func NewLedger() *Ledger {
	// Pre-allocated capacity keeps early appends from reallocating.
	return &Ledger{entries: make([]OffsetEntry, 0, 1000)}
}

// AssignOffsets reserves count consecutive Kafka offsets and returns the
// first offset of the reserved range.
func (lg *Ledger) AssignOffsets(count int64) int64 {
	lg.mu.Lock()
	base := lg.nextOffset
	lg.nextOffset = base + count
	lg.mu.Unlock()
	return base
}

// AppendRecord records the timestamp and size for kafkaOffset, which must
// come from a prior AssignOffsets call and be appended in strictly
// increasing offset order.
func (lg *Ledger) AppendRecord(kafkaOffset, timestamp int64, size int32) error {
	lg.mu.Lock()
	defer lg.mu.Unlock()

	switch {
	case kafkaOffset < 0 || kafkaOffset >= lg.nextOffset:
		// Offset was never reserved via AssignOffsets.
		return fmt.Errorf("invalid offset %d, expected 0 <= offset < %d", kafkaOffset, lg.nextOffset)
	case len(lg.entries) > 0 && lg.entries[len(lg.entries)-1].KafkaOffset >= kafkaOffset:
		// Duplicate or out-of-order append would break the sorted invariant.
		return fmt.Errorf("offset %d already exists or is out of order", kafkaOffset)
	}

	lg.entries = append(lg.entries, OffsetEntry{
		KafkaOffset: kafkaOffset,
		Timestamp:   timestamp,
		Size:        size,
	})

	// Track the covered time range.
	if lg.earliestTime == 0 || timestamp < lg.earliestTime {
		lg.earliestTime = timestamp
	}
	if timestamp > lg.latestTime {
		lg.latestTime = timestamp
	}
	return nil
}

// GetRecord returns the timestamp and size stored for kafkaOffset, or an
// error when that offset has no appended record.
func (lg *Ledger) GetRecord(kafkaOffset int64) (timestamp int64, size int32, err error) {
	lg.mu.RLock()
	defer lg.mu.RUnlock()

	// entries is sorted by KafkaOffset, so binary search locates it.
	i := sort.Search(len(lg.entries), func(j int) bool {
		return lg.entries[j].KafkaOffset >= kafkaOffset
	})
	if i == len(lg.entries) || lg.entries[i].KafkaOffset != kafkaOffset {
		return 0, 0, fmt.Errorf("offset %d not found", kafkaOffset)
	}
	return lg.entries[i].Timestamp, lg.entries[i].Size, nil
}

// GetEarliestOffset returns the smallest Kafka offset with a record, or 0
// when nothing has been appended yet.
func (lg *Ledger) GetEarliestOffset() int64 {
	lg.mu.RLock()
	defer lg.mu.RUnlock()
	if len(lg.entries) > 0 {
		return lg.entries[0].KafkaOffset
	}
	return 0
}

// GetLatestOffset returns the largest Kafka offset with a record, or 0
// when nothing has been appended yet.
func (lg *Ledger) GetLatestOffset() int64 {
	lg.mu.RLock()
	defer lg.mu.RUnlock()
	if n := len(lg.entries); n > 0 {
		return lg.entries[n-1].KafkaOffset
	}
	return 0
}

// GetHighWaterMark returns the next offset that will be assigned, i.e. one
// past the highest reserved offset.
func (lg *Ledger) GetHighWaterMark() int64 {
	lg.mu.RLock()
	defer lg.mu.RUnlock()
	return lg.nextOffset
}

// FindOffsetByTimestamp returns the offset of the first record whose
// timestamp is >= targetTimestamp. When every record is older than the
// target it returns the high water mark; an empty ledger yields 0.
func (lg *Ledger) FindOffsetByTimestamp(targetTimestamp int64) int64 {
	lg.mu.RLock()
	defer lg.mu.RUnlock()

	if len(lg.entries) == 0 {
		return 0
	}
	i := sort.Search(len(lg.entries), func(j int) bool {
		return lg.entries[j].Timestamp >= targetTimestamp
	})
	if i == len(lg.entries) {
		// Target is newer than everything recorded.
		return lg.nextOffset
	}
	return lg.entries[i].KafkaOffset
}

// GetStats reports the entry count, covered time range, and next offset.
func (lg *Ledger) GetStats() (entryCount int, earliestTime, latestTime, nextOffset int64) {
	lg.mu.RLock()
	defer lg.mu.RUnlock()
	return len(lg.entries), lg.earliestTime, lg.latestTime, lg.nextOffset
}

// GetTimestampRange returns the earliest and latest timestamps recorded.
// With no records it returns the current time for both bounds.
func (lg *Ledger) GetTimestampRange() (earliest, latest int64) {
	lg.mu.RLock()
	defer lg.mu.RUnlock()
	if len(lg.entries) == 0 {
		now := time.Now().UnixNano()
		return now, now // stand-in values until data arrives
	}
	return lg.earliestTime, lg.latestTime
}

280
weed/mq/kafka/offset/ledger_test.go

@@ -0,0 +1,280 @@
package offset
import (
"testing"
"time"
)
// TestLedger_BasicOperations verifies offset assignment and the derived
// earliest/latest/high-water-mark values on a fresh ledger.
func TestLedger_BasicOperations(t *testing.T) {
	ledger := NewLedger()

	// A fresh ledger reports zero for every offset statistic.
	if got := ledger.GetEarliestOffset(); got != 0 {
		t.Errorf("earliest offset: got %d, want 0", got)
	}
	if got := ledger.GetLatestOffset(); got != 0 {
		t.Errorf("latest offset: got %d, want 0", got)
	}
	if got := ledger.GetHighWaterMark(); got != 0 {
		t.Errorf("high water mark: got %d, want 0", got)
	}

	// Reserve two ranges and check the returned base offsets are sequential.
	if base := ledger.AssignOffsets(3); base != 0 {
		t.Errorf("first base offset: got %d, want 0", base)
	}
	if base := ledger.AssignOffsets(2); base != 3 {
		t.Errorf("second base offset: got %d, want 3", base)
	}

	// Assignment alone moves the high water mark...
	if got := ledger.GetHighWaterMark(); got != 5 {
		t.Errorf("high water mark after assignment: got %d, want 5", got)
	}
	// ...but with no appended records the latest offset stays 0.
	if got := ledger.GetLatestOffset(); got != 0 {
		t.Errorf("latest offset with no records: got %d, want 0", got)
	}
}
// TestLedger_AppendAndRetrieve exercises the assign -> append -> lookup
// cycle: records appended at assigned offsets must be retrievable with their
// exact timestamp and size, and earliest/latest offsets must track the data.
func TestLedger_AppendAndRetrieve(t *testing.T) {
ledger := NewLedger()
// Assign and append some records
baseOffset := ledger.AssignOffsets(3)
if baseOffset != 0 {
t.Fatalf("unexpected base offset: %d", baseOffset)
}
// Append records with different timestamps
timestamp1 := time.Now().UnixNano()
timestamp2 := timestamp1 + 1000000 // +1ms
timestamp3 := timestamp2 + 2000000 // +2ms
if err := ledger.AppendRecord(0, timestamp1, 100); err != nil {
t.Fatalf("append record 0: %v", err)
}
if err := ledger.AppendRecord(1, timestamp2, 200); err != nil {
t.Fatalf("append record 1: %v", err)
}
if err := ledger.AppendRecord(2, timestamp3, 150); err != nil {
t.Fatalf("append record 2: %v", err)
}
// Check earliest/latest
if earliest := ledger.GetEarliestOffset(); earliest != 0 {
t.Errorf("earliest offset: got %d, want 0", earliest)
}
if latest := ledger.GetLatestOffset(); latest != 2 {
t.Errorf("latest offset: got %d, want 2", latest)
}
// Retrieve records: first and last should round-trip timestamp and size
ts, size, err := ledger.GetRecord(0)
if err != nil {
t.Fatalf("get record 0: %v", err)
}
if ts != timestamp1 {
t.Errorf("record 0 timestamp: got %d, want %d", ts, timestamp1)
}
if size != 100 {
t.Errorf("record 0 size: got %d, want 100", size)
}
ts, size, err = ledger.GetRecord(2)
if err != nil {
t.Fatalf("get record 2: %v", err)
}
if ts != timestamp3 {
t.Errorf("record 2 timestamp: got %d, want %d", ts, timestamp3)
}
if size != 150 {
t.Errorf("record 2 size: got %d, want 150", size)
}
// Try to get non-existent record (offset 5 was never assigned or appended)
_, _, err = ledger.GetRecord(5)
if err == nil {
t.Errorf("expected error for non-existent offset 5")
}
}
// TestLedger_FindOffsetByTimestamp verifies timestamp -> offset lookup for
// exact matches, in-between values, and targets outside the recorded range.
func TestLedger_FindOffsetByTimestamp(t *testing.T) {
	ledger := NewLedger()

	// Add some records with known timestamps.
	baseTime := time.Now().UnixNano()
	ledger.AssignOffsets(5)
	setup := []struct {
		offset    int64
		timestamp int64
		size      int32
	}{
		{0, baseTime, 100},
		{1, baseTime + 1000000, 200},  // +1ms
		{2, baseTime + 3000000, 150},  // +3ms
		{3, baseTime + 10000000, 300}, // +10ms
		{4, baseTime + 20000000, 250}, // +20ms
	}
	for _, r := range setup {
		// Fail fast if a setup append is rejected; otherwise the lookup
		// assertions below would produce misleading failures.
		if err := ledger.AppendRecord(r.offset, r.timestamp, r.size); err != nil {
			t.Fatalf("append offset %d: %v", r.offset, err)
		}
	}

	// Find offset by timestamp.
	testCases := []struct {
		timestamp      int64
		expectedOffset int64
		description    string
	}{
		{baseTime - 1000000, 0, "before first record"},  // Before any record
		{baseTime, 0, "exact first timestamp"},          // Exact match first
		{baseTime + 500000, 1, "between first and second"}, // Between records
		{baseTime + 1000000, 1, "exact second timestamp"},  // Exact match middle
		{baseTime + 5000000, 3, "between records"},         // Between records
		{baseTime + 20000000, 4, "exact last timestamp"},   // Exact match last
		{baseTime + 30000000, 5, "after last record"},      // After all records (should return HWM)
	}
	for _, tc := range testCases {
		offset := ledger.FindOffsetByTimestamp(tc.timestamp)
		if offset != tc.expectedOffset {
			t.Errorf("%s: got offset %d, want %d", tc.description, offset, tc.expectedOffset)
		}
	}
}
// TestLedger_ErrorConditions verifies that AppendRecord rejects offsets that
// were never assigned, negative offsets, and duplicate appends. The order of
// the steps matters: each later check builds on state from the earlier ones.
func TestLedger_ErrorConditions(t *testing.T) {
ledger := NewLedger()
// Try to append without assigning offsets first (nextOffset is still 0,
// so every offset is out of range)
err := ledger.AppendRecord(0, time.Now().UnixNano(), 100)
if err == nil {
t.Errorf("expected error when appending without assignment")
}
// Assign some offsets
ledger.AssignOffsets(3)
// Try to append out-of-range offset (only 0..2 were reserved)
err = ledger.AppendRecord(5, time.Now().UnixNano(), 100)
if err == nil {
t.Errorf("expected error for out-of-range offset")
}
// Try negative offset
err = ledger.AppendRecord(-1, time.Now().UnixNano(), 100)
if err == nil {
t.Errorf("expected error for negative offset")
}
// Append a record successfully
timestamp := time.Now().UnixNano()
if err := ledger.AppendRecord(0, timestamp, 100); err != nil {
t.Fatalf("failed to append valid record: %v", err)
}
// Try to append the same offset again (duplicates are rejected)
err = ledger.AppendRecord(0, timestamp+1000, 200)
if err == nil {
t.Errorf("expected error for duplicate offset")
}
}
// TestLedger_ConcurrentAccess exercises the ledger's locking: two goroutines
// assign offsets concurrently, then several goroutines read the same record
// concurrently. Intended to be run with -race to surface data races.
func TestLedger_ConcurrentAccess(t *testing.T) {
ledger := NewLedger()
// Test concurrent offset assignment
done := make(chan bool, 2)
go func() {
for i := 0; i < 100; i++ {
ledger.AssignOffsets(1)
}
done <- true
}()
go func() {
for i := 0; i < 100; i++ {
ledger.AssignOffsets(1)
}
done <- true
}()
// Wait for both goroutines
<-done
<-done
// Should have assigned 200 total offsets
hwm := ledger.GetHighWaterMark()
if hwm != 200 {
t.Errorf("concurrent assignment: got HWM %d, want 200", hwm)
}
// Test concurrent reads: reserve one more offset so offset 200 (the 201st)
// becomes valid to append
ledger.AssignOffsets(1)
timestamp := time.Now().UnixNano()
ledger.AppendRecord(200, timestamp, 100)
// Multiple concurrent reads should work fine
readDone := make(chan bool, 5)
for i := 0; i < 5; i++ {
go func() {
ts, size, err := ledger.GetRecord(200)
if err != nil || ts != timestamp || size != 100 {
t.Errorf("concurrent read failed: ts=%d size=%d err=%v", ts, size, err)
}
readDone <- true
}()
}
// Wait for all reads
for i := 0; i < 5; i++ {
<-readDone
}
}
// TestLedger_GetStats verifies the aggregate statistics on an empty ledger
// and again after a few records have been appended.
func TestLedger_GetStats(t *testing.T) {
	ledger := NewLedger()

	// Initially empty: every statistic must be zero.
	count, earliest, latest, next := ledger.GetStats()
	if count != 0 || earliest != 0 || latest != 0 || next != 0 {
		t.Errorf("initial stats: got count=%d earliest=%d latest=%d next=%d", count, earliest, latest, next)
	}

	// Add some data, failing fast if any setup append is rejected so the
	// stats assertions below cannot report misleading failures.
	baseTime := time.Now().UnixNano()
	ledger.AssignOffsets(3)
	if err := ledger.AppendRecord(0, baseTime, 100); err != nil {
		t.Fatalf("append offset 0: %v", err)
	}
	if err := ledger.AppendRecord(1, baseTime+1000000, 200); err != nil {
		t.Fatalf("append offset 1: %v", err)
	}
	if err := ledger.AppendRecord(2, baseTime+2000000, 150); err != nil {
		t.Fatalf("append offset 2: %v", err)
	}

	count, earliest, latest, next = ledger.GetStats()
	if count != 3 {
		t.Errorf("entry count: got %d, want 3", count)
	}
	if earliest != baseTime {
		t.Errorf("earliest time: got %d, want %d", earliest, baseTime)
	}
	if latest != baseTime+2000000 {
		t.Errorf("latest time: got %d, want %d", latest, baseTime+2000000)
	}
	if next != 3 {
		t.Errorf("next offset: got %d, want 3", next)
	}
}
// TestLedger_EmptyLedgerTimestampLookup checks that timestamp queries on a
// ledger with no records degrade gracefully instead of failing.
func TestLedger_EmptyLedgerTimestampLookup(t *testing.T) {
	ledger := NewLedger()

	// A lookup on an empty ledger falls back to offset 0.
	if got := ledger.FindOffsetByTimestamp(time.Now().UnixNano()); got != 0 {
		t.Errorf("empty ledger timestamp lookup: got %d, want 0", got)
	}

	earliest, latest := ledger.GetTimestampRange()
	if earliest <= 0 || latest <= 0 {
		t.Errorf("empty ledger timestamp range: earliest=%d latest=%d", earliest, latest)
	}
	// For empty ledger, should return current time as both earliest and latest
	if earliest != latest {
		t.Errorf("empty ledger should have same earliest and latest time")
	}
}

109
weed/mq/kafka/protocol/handler.go

@@ -8,6 +8,8 @@ import (
"net"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
)
// TopicInfo holds basic information about a topic
@@ -17,16 +19,64 @@ type TopicInfo struct {
CreatedAt int64
}
// TopicPartitionKey uniquely identifies a topic partition
// and serves as the map key for per-partition offset ledgers.
type TopicPartitionKey struct {
Topic string // topic name
Partition int32 // partition index within the topic
}
// Handler processes Kafka protocol requests from clients
// and tracks per-topic metadata plus per-partition offset ledgers.
type Handler struct {
topicsMu sync.RWMutex // guards topics
topics map[string]*TopicInfo // topic name -> topic info
ledgersMu sync.RWMutex // guards ledgers
ledgers map[TopicPartitionKey]*offset.Ledger // topic-partition -> offset ledger
}
// NewHandler creates a Handler with empty topic and ledger registries.
// Note: the rendered diff showed the topics initializer twice (old and new
// line interleaved); a duplicate struct-literal field does not compile, so
// only the single initializer is kept here.
func NewHandler() *Handler {
	return &Handler{
		topics:  make(map[string]*TopicInfo),
		ledgers: make(map[TopicPartitionKey]*offset.Ledger),
	}
}
// GetOrCreateLedger returns the offset ledger for a topic-partition, creating it if needed
func (h *Handler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger {
	key := TopicPartitionKey{Topic: topic, Partition: partition}

	// Fast path: the ledger usually exists already, so look it up under
	// the cheaper read lock first.
	h.ledgersMu.RLock()
	existing, ok := h.ledgers[key]
	h.ledgersMu.RUnlock()
	if ok {
		return existing
	}

	// Slow path: take the write lock and re-check, since another caller
	// may have created the ledger between the two lock acquisitions.
	h.ledgersMu.Lock()
	defer h.ledgersMu.Unlock()
	if racing, ok := h.ledgers[key]; ok {
		return racing
	}

	created := offset.NewLedger()
	h.ledgers[key] = created
	return created
}
// GetLedger returns the offset ledger for a topic-partition, or nil if not found
func (h *Handler) GetLedger(topic string, partition int32) *offset.Ledger {
	h.ledgersMu.RLock()
	defer h.ledgersMu.RUnlock()
	// A missing key yields the map's zero value, i.e. a nil *offset.Ledger.
	return h.ledgers[TopicPartitionKey{Topic: topic, Partition: partition}]
}
// HandleConn processes a single client connection
@@ -278,20 +328,43 @@ func (h *Handler) handleListOffsets(correlationID uint32, requestBody []byte) ([
// Error code (0 = no error)
response = append(response, 0, 0)
// For stub: return the original timestamp for timestamp queries, or current time for earliest/latest
// Get the ledger for this topic-partition
ledger := h.GetOrCreateLedger(string(topicName), int32(partitionID))
var responseTimestamp int64
var responseOffset int64
switch timestamp {
case -2: // earliest offset
responseTimestamp = 0
responseOffset = 0
case -1: // latest offset
responseTimestamp = 1000000000 // some timestamp
responseOffset = 0 // stub: no messages yet
default: // specific timestamp
responseOffset = ledger.GetEarliestOffset()
if responseOffset == ledger.GetHighWaterMark() {
// No messages yet, return current time
responseTimestamp = time.Now().UnixNano()
} else {
// Get timestamp of earliest message
if ts, _, err := ledger.GetRecord(responseOffset); err == nil {
responseTimestamp = ts
} else {
responseTimestamp = time.Now().UnixNano()
}
}
case -1: // latest offset
responseOffset = ledger.GetLatestOffset()
if responseOffset == 0 && ledger.GetHighWaterMark() == 0 {
// No messages yet
responseTimestamp = time.Now().UnixNano()
responseOffset = 0
} else {
// Get timestamp of latest message
if ts, _, err := ledger.GetRecord(responseOffset); err == nil {
responseTimestamp = ts
} else {
responseTimestamp = time.Now().UnixNano()
}
}
default: // specific timestamp - find offset by timestamp
responseOffset = ledger.FindOffsetByTimestamp(timestamp)
responseTimestamp = timestamp
responseOffset = 0 // stub: no messages at any timestamp
}
timestampBytes := make([]byte, 8)
@@ -417,6 +490,11 @@ func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) (
Partitions: int32(numPartitions),
CreatedAt: time.Now().UnixNano(),
}
// Initialize ledgers for all partitions
for partitionID := int32(0); partitionID < int32(numPartitions); partitionID++ {
h.GetOrCreateLedger(topicName, partitionID)
}
}
// Error code
@@ -500,12 +578,21 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) (
var errorCode uint16 = 0
var errorMessage string = ""
if _, exists := h.topics[topicName]; !exists {
topicInfo, exists := h.topics[topicName]
if !exists {
errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
errorMessage = "Unknown topic"
} else {
// Delete the topic
delete(h.topics, topicName)
// Clean up associated ledgers
h.ledgersMu.Lock()
for partitionID := int32(0); partitionID < topicInfo.Partitions; partitionID++ {
key := TopicPartitionKey{Topic: topicName, Partition: partitionID}
delete(h.ledgers, key)
}
h.ledgersMu.Unlock()
}
// Error code

4
weed/mq/kafka/protocol/handler_test.go

@@ -408,8 +408,8 @@ func TestHandler_handleListOffsets(t *testing.T) {
timestamp := int64(binary.BigEndian.Uint64(response[offset : offset+8]))
offset += 8
if timestamp != 0 {
t.Errorf("partition 0 timestamp: got %d, want 0", timestamp)
if timestamp <= 0 {
t.Errorf("partition 0 timestamp: got %d, want > 0", timestamp)
}
offsetValue := int64(binary.BigEndian.Uint64(response[offset : offset+8]))

Loading…
Cancel
Save