Phase 4: Clean up old SMQIntegratedStorage and fix compilation
- Remove old SMQIntegratedStorage implementation from persistence.go
- Update all integration modules to use SMQOffsetStorage instead
- Add delegation methods to PersistentLedger for backward compatibility
- Fix method signatures and compilation errors
- Maintain support for legacy offset operations through SeaweedMQStorage

pull/7231/head
4 changed files with 126 additions and 479 deletions
6	weed/mq/kafka/integration/persistent_handler.go
6	weed/mq/kafka/integration/smq_publisher.go
6	weed/mq/kafka/integration/smq_subscriber.go
577	weed/mq/kafka/offset/persistence.go
weed/mq/kafka/offset/persistence.go
@@ -1,552 +1,193 @@
 package offset
 
 import (
-	"context"
-	"encoding/json"
 	"fmt"
-	"sort"
-	"strconv"
-	"strings"
 	"sync"
-	"time"
-
-	"github.com/seaweedfs/seaweedfs/weed/filer"
-	"github.com/seaweedfs/seaweedfs/weed/filer_client"
-	"github.com/seaweedfs/seaweedfs/weed/pb"
-	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
-	"google.golang.org/grpc"
 )
 
-// PersistentLedger extends Ledger with persistence capabilities
-type PersistentLedger struct {
-	*Ledger
-	topicPartition string
-	storage        LedgerStorage
-}
-
-// ConsumerOffsetKey represents the full key for consumer offset storage
-type ConsumerOffsetKey struct {
-	Topic                 string
-	Partition             int32
-	ConsumerGroup         string
-	ConsumerGroupInstance string // Optional - can be empty
-}
-
-// String returns the string representation for use as map key
-func (k ConsumerOffsetKey) String() string {
-	if k.ConsumerGroupInstance != "" {
-		return fmt.Sprintf("%s:%d:%s:%s", k.Topic, k.Partition, k.ConsumerGroup, k.ConsumerGroupInstance)
-	}
-	return fmt.Sprintf("%s:%d:%s", k.Topic, k.Partition, k.ConsumerGroup)
-}
-
-// LedgerStorage interface for persisting consumer group offset mappings
+// LedgerStorage interface for consumer offset persistence
 type LedgerStorage interface {
-	// SaveConsumerOffset persists a consumer's committed Kafka offset -> SMQ timestamp mapping
 	SaveConsumerOffset(key ConsumerOffsetKey, kafkaOffset, smqTimestamp int64, size int32) error
-
-	// LoadConsumerOffsets restores all offset mappings for a consumer group's topic-partition
 	LoadConsumerOffsets(key ConsumerOffsetKey) ([]OffsetEntry, error)
-
-	// GetConsumerHighWaterMark returns the highest committed Kafka offset for a consumer
 	GetConsumerHighWaterMark(key ConsumerOffsetKey) (int64, error)
+	Close() error
 
-	// Legacy methods for backward compatibility (deprecated)
+	// Legacy methods for backward compatibility
 	SaveOffsetMapping(topicPartition string, kafkaOffset, smqTimestamp int64, size int32) error
 	LoadOffsetMappings(topicPartition string) ([]OffsetEntry, error)
 	GetHighWaterMark(topicPartition string) (int64, error)
 }
 
-// NewPersistentLedger creates a ledger that persists to storage
-func NewPersistentLedger(topicPartition string, storage LedgerStorage) (*PersistentLedger, error) {
-	// Try to restore from storage (legacy method for backward compatibility)
-	entries, err := storage.LoadOffsetMappings(topicPartition)
-	if err != nil {
-		return nil, fmt.Errorf("failed to load offset mappings: %w", err)
-	}
-
-	// Determine next offset
-	var nextOffset int64 = 0
-	if len(entries) > 0 {
-		// Sort entries by offset to find the highest
-		sort.Slice(entries, func(i, j int) bool {
-			return entries[i].KafkaOffset < entries[j].KafkaOffset
-		})
-		nextOffset = entries[len(entries)-1].KafkaOffset + 1
-	}
-
-	// Create base ledger with restored state
-	ledger := &Ledger{
-		entries:    entries,
-		nextOffset: nextOffset,
-	}
-
-	// Update earliest/latest timestamps
-	if len(entries) > 0 {
-		ledger.earliestTime = entries[0].Timestamp
-		ledger.latestTime = entries[len(entries)-1].Timestamp
-	}
-
-	return &PersistentLedger{
-		Ledger:         ledger,
-		topicPartition: topicPartition,
-		storage:        storage,
-	}, nil
-}
-
-// AppendRecord persists the offset mapping in addition to in-memory storage
-func (pl *PersistentLedger) AppendRecord(kafkaOffset, timestamp int64, size int32) error {
-	// First persist to storage (legacy method for backward compatibility)
-	if err := pl.storage.SaveOffsetMapping(pl.topicPartition, kafkaOffset, timestamp, size); err != nil {
-		return fmt.Errorf("failed to persist offset mapping: %w", err)
-	}
-
-	// Then update in-memory ledger
-	return pl.Ledger.AppendRecord(kafkaOffset, timestamp, size)
-}
-
-// GetEntries returns the offset entries from the underlying ledger
-func (pl *PersistentLedger) GetEntries() []OffsetEntry {
-	return pl.Ledger.GetEntries()
-}
-
-// SMQIntegratedStorage implements LedgerStorage using SMQ's in-memory replication pattern
-// This approach avoids the scalability issue by using checkpoints instead of reading full history
-type SMQIntegratedStorage struct {
-	filerAddress        string
-	filerClientAccessor *filer_client.FilerClientAccessor
-
-	// In-memory replicated state (SMQ pattern)
-	ledgers sync.Map // map[ConsumerOffsetKey.String()]*ReplicatedOffsetLedger
-
-	// Configuration
-	checkpointInterval time.Duration
-	maxMemoryMappings  int
-	ctx                context.Context
-	cancel             context.CancelFunc
-}
-
-// ReplicatedOffsetLedger represents in-memory consumer offset state with checkpoint persistence
-type ReplicatedOffsetLedger struct {
-	consumerKey ConsumerOffsetKey
-
-	// In-memory mappings (recent entries only)
-	mappings      sync.Map // map[int64]*OffsetEntry
-	currentOffset int64
-	maxOffset     int64
-
-	// Checkpoint state
-	lastCheckpoint     int64
-	lastCheckpointTime time.Time
-	lastPersistTime    time.Time
-
-	// State management
-	mu               sync.RWMutex
-	needsPersistence bool
-}
-
-// NewSMQIntegratedStorage creates SMQ-integrated offset storage
-// This uses SMQ's proven in-memory replication + checkpoint persistence pattern
-func NewSMQIntegratedStorage(brokers []string) (*SMQIntegratedStorage, error) {
-	if len(brokers) == 0 {
-		return nil, fmt.Errorf("no brokers provided")
-	}
-
-	ctx, cancel := context.WithCancel(context.Background())
-
-	// Use first broker as filer address (brokers typically run co-located with filer)
-	// In SMQ architecture, brokers connect to local filer instances
-	filerAddress := brokers[0]
-
-	// Create filer client accessor (like SMQ does)
-	filerClientAccessor := &filer_client.FilerClientAccessor{
-		GetFiler: func() pb.ServerAddress {
-			return pb.ServerAddress(filerAddress)
-		},
-		GetGrpcDialOption: func() grpc.DialOption {
-			return grpc.WithInsecure()
-		},
-	}
-
-	storage := &SMQIntegratedStorage{
-		filerAddress:        filerAddress,
-		filerClientAccessor: filerClientAccessor,
-		checkpointInterval:  30 * time.Second, // SMQ-style periodic checkpoints
-		maxMemoryMappings:   10000,            // Keep recent mappings in memory
-		ctx:                 ctx,
-		cancel:              cancel,
-	}
-
-	// Start background checkpoint persistence (SMQ pattern)
-	go storage.backgroundCheckpointPersistence()
-
-	return storage, nil
-}
-
-// SaveConsumerOffset stores consumer offset mapping in memory (SMQ pattern) and triggers checkpoint if needed
-func (s *SMQIntegratedStorage) SaveConsumerOffset(key ConsumerOffsetKey, kafkaOffset, smqTimestamp int64, size int32) error {
-	// Get or create replicated ledger for this consumer
-	ledger := s.getOrCreateLedger(key)
-
-	// Update in-memory state (like SMQ subscriber offsets)
-	entry := &OffsetEntry{
-		KafkaOffset: kafkaOffset,
-		Timestamp:   smqTimestamp,
-		Size:        size,
-	}
-
-	ledger.mu.Lock()
-	ledger.mappings.Store(kafkaOffset, entry)
-	ledger.currentOffset = kafkaOffset
-	if kafkaOffset > ledger.maxOffset {
-		ledger.maxOffset = kafkaOffset
-	}
-	ledger.needsPersistence = true
-	ledger.mu.Unlock()
-
-	// Trigger checkpoint if threshold reached (SMQ pattern)
-	if s.shouldCheckpoint(ledger) {
-		return s.persistCheckpoint(ledger)
-	}
-
-	return nil
-}
-
-// LoadConsumerOffsets loads checkpoint + in-memory state (SMQ pattern) - O(1) instead of O(n)!
-func (s *SMQIntegratedStorage) LoadConsumerOffsets(key ConsumerOffsetKey) ([]OffsetEntry, error) {
-	ledger := s.getOrCreateLedger(key)
-
-	// Load from checkpoint if not already loaded (SMQ pattern)
-	if err := s.loadCheckpointIfNeeded(ledger); err != nil {
-		return nil, fmt.Errorf("failed to load checkpoint: %w", err)
-	}
-
-	// Return current in-memory state (fast!)
-	return s.getCurrentMappings(ledger), nil
-}
-
-// GetConsumerHighWaterMark returns consumer's next offset from in-memory state (fast!)
-func (s *SMQIntegratedStorage) GetConsumerHighWaterMark(key ConsumerOffsetKey) (int64, error) {
-	ledger := s.getOrCreateLedger(key)
-
-	// Load checkpoint if needed
-	if err := s.loadCheckpointIfNeeded(ledger); err != nil {
-		return 0, fmt.Errorf("failed to load checkpoint: %w", err)
-	}
-
-	ledger.mu.RLock()
-	maxOffset := ledger.maxOffset
-	ledger.mu.RUnlock()
-
-	if maxOffset < 0 {
-		return 0, nil
-	}
-
-	return maxOffset + 1, nil
-}
-
-// Close persists all pending checkpoints and shuts down (SMQ pattern)
-func (s *SMQIntegratedStorage) Close() error {
-	s.cancel()
-
-	// Persist all ledgers before shutdown (like SMQ on disconnect)
-	s.ledgers.Range(func(key, value interface{}) bool {
-		ledger := value.(*ReplicatedOffsetLedger)
-		if ledger.needsPersistence {
-			s.persistCheckpoint(ledger)
-		}
-		return true
-	})
-
-	return nil
-}
-
-// SMQ-style helper methods for in-memory replication + checkpoint persistence
-
-// getOrCreateLedger gets or creates in-memory consumer ledger (SMQ pattern)
-func (s *SMQIntegratedStorage) getOrCreateLedger(key ConsumerOffsetKey) *ReplicatedOffsetLedger {
-	keyStr := key.String()
-	if existing, ok := s.ledgers.Load(keyStr); ok {
-		return existing.(*ReplicatedOffsetLedger)
-	}
-
-	// Create new consumer ledger
-	ledger := &ReplicatedOffsetLedger{
-		consumerKey:      key,
-		currentOffset:    -1,
-		maxOffset:        -1,
-		lastCheckpoint:   -1,
-		needsPersistence: false,
-	}
-
-	// Try to store, return existing if already created by another goroutine
-	if actual, loaded := s.ledgers.LoadOrStore(keyStr, ledger); loaded {
-		return actual.(*ReplicatedOffsetLedger)
-	}
-
-	return ledger
-}
-
-// loadCheckpointIfNeeded loads checkpoint from filer if not already loaded (SMQ pattern)
-func (s *SMQIntegratedStorage) loadCheckpointIfNeeded(ledger *ReplicatedOffsetLedger) error {
-	ledger.mu.Lock()
-	defer ledger.mu.Unlock()
-
-	// Already loaded?
-	if ledger.lastCheckpoint >= 0 {
-		return nil
-	}
-
-	// Load checkpoint from filer
-	checkpointDir := s.getCheckpointDir()
-	checkpointFile := ledger.consumerKey.String() + ".json"
-
-	err := s.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-		data, err := filer.ReadInsideFiler(client, checkpointDir, checkpointFile)
-		if err != nil {
-			return err // Will be handled below
-		}
-
-		var checkpoint CheckpointData
-		if err := json.Unmarshal(data, &checkpoint); err != nil {
-			return fmt.Errorf("failed to unmarshal checkpoint: %w", err)
-		}
-
-		// Restore state from checkpoint
-		ledger.lastCheckpoint = checkpoint.MaxOffset
-		ledger.maxOffset = checkpoint.MaxOffset
-		ledger.currentOffset = checkpoint.MaxOffset
-		ledger.lastCheckpointTime = time.Unix(0, checkpoint.TimestampNs)
-
-		// Load recent mappings (last N entries for fast access)
-		for _, entry := range checkpoint.RecentMappings {
-			ledger.mappings.Store(entry.KafkaOffset, &entry)
-		}
-
-		return nil
-	})
-
-	if err != nil && err != filer_pb.ErrNotFound {
-		return fmt.Errorf("failed to load checkpoint: %w", err)
-	}
-
-	// Mark as loaded even if no checkpoint found
-	if ledger.lastCheckpoint < 0 {
-		ledger.lastCheckpoint = 0
-	}
-
-	return nil
-}
-
-// getCurrentMappings returns current in-memory mappings (SMQ pattern)
-func (s *SMQIntegratedStorage) getCurrentMappings(ledger *ReplicatedOffsetLedger) []OffsetEntry {
-	var entries []OffsetEntry
-
-	ledger.mappings.Range(func(key, value interface{}) bool {
-		entry := value.(*OffsetEntry)
-		entries = append(entries, *entry)
-		return true
-	})
-
-	// Sort by Kafka offset
-	sort.Slice(entries, func(i, j int) bool {
-		return entries[i].KafkaOffset < entries[j].KafkaOffset
-	})
-
-	return entries
-}
-
-// shouldCheckpoint determines if checkpoint persistence is needed (SMQ pattern)
-func (s *SMQIntegratedStorage) shouldCheckpoint(ledger *ReplicatedOffsetLedger) bool {
-	ledger.mu.RLock()
-	defer ledger.mu.RUnlock()
-
-	// Persist if:
-	// 1. Enough time has passed
-	// 2. Too many in-memory entries
-	// 3. Significant offset advancement
-
-	timeSinceLastCheckpoint := time.Since(ledger.lastCheckpointTime)
-
-	mappingCount := 0
-	ledger.mappings.Range(func(key, value interface{}) bool {
-		mappingCount++
-		return mappingCount < s.maxMemoryMappings // Stop counting if too many
-	})
-
-	offsetDelta := ledger.currentOffset - ledger.lastCheckpoint
-
-	return timeSinceLastCheckpoint > s.checkpointInterval ||
-		mappingCount >= s.maxMemoryMappings ||
-		offsetDelta >= 1000 // Significant advancement
-}
-
-// persistCheckpoint saves checkpoint to filer (SMQ pattern)
-func (s *SMQIntegratedStorage) persistCheckpoint(ledger *ReplicatedOffsetLedger) error {
-	ledger.mu.Lock()
-	defer ledger.mu.Unlock()
-
-	// Collect recent mappings for checkpoint
-	var recentMappings []OffsetEntry
-	ledger.mappings.Range(func(key, value interface{}) bool {
-		entry := value.(*OffsetEntry)
-		recentMappings = append(recentMappings, *entry)
-		return len(recentMappings) < 1000 // Keep last 1000 entries in checkpoint
-	})
-
-	// Sort by offset (keep most recent)
-	sort.Slice(recentMappings, func(i, j int) bool {
-		return recentMappings[i].KafkaOffset > recentMappings[j].KafkaOffset
-	})
-	if len(recentMappings) > 1000 {
-		recentMappings = recentMappings[:1000]
-	}
-
-	// Create checkpoint
-	checkpoint := CheckpointData{
-		ConsumerKey:    ledger.consumerKey,
-		MaxOffset:      ledger.maxOffset,
-		TimestampNs:    time.Now().UnixNano(),
-		RecentMappings: recentMappings,
-		TopicPartition: fmt.Sprintf("%s:%d", ledger.consumerKey.Topic, ledger.consumerKey.Partition), // Legacy compatibility
-	}
-
-	// Marshal checkpoint
-	data, err := json.Marshal(checkpoint)
-	if err != nil {
-		return fmt.Errorf("failed to marshal checkpoint: %w", err)
-	}
-
-	// Write to filer
-	checkpointDir := s.getCheckpointDir()
-	checkpointFile := ledger.consumerKey.String() + ".json"
-	err = s.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-		return filer.SaveInsideFiler(client, checkpointDir, checkpointFile, data)
-	})
-
-	if err != nil {
-		return fmt.Errorf("failed to save checkpoint: %w", err)
-	}
-
-	// Update checkpoint state
-	ledger.lastCheckpoint = ledger.maxOffset
-	ledger.lastCheckpointTime = time.Now()
-	ledger.lastPersistTime = time.Now()
-	ledger.needsPersistence = false
-
-	return nil
-}
-
-// backgroundCheckpointPersistence runs periodic checkpoint saves (SMQ pattern)
-func (s *SMQIntegratedStorage) backgroundCheckpointPersistence() {
-	ticker := time.NewTicker(s.checkpointInterval)
-	defer ticker.Stop()
-
-	for {
-		select {
-		case <-s.ctx.Done():
-			return
-		case <-ticker.C:
-			// Persist all ledgers that need it
-			s.ledgers.Range(func(key, value interface{}) bool {
-				ledger := value.(*ReplicatedOffsetLedger)
-				if ledger.needsPersistence && s.shouldCheckpoint(ledger) {
-					if err := s.persistCheckpoint(ledger); err != nil {
-						// Log error but continue
-						fmt.Printf("Failed to persist checkpoint for %s: %v\n", ledger.consumerKey.String(), err)
-					}
-				}
-				return true
-			})
-		}
-	}
-}
-
-// getCheckpointDir returns filer directory for checkpoints
-func (s *SMQIntegratedStorage) getCheckpointDir() string {
-	return "/kafka-offsets/checkpoints"
-}
-
-// CheckpointData represents persisted consumer checkpoint state
-type CheckpointData struct {
-	ConsumerKey    ConsumerOffsetKey `json:"consumer_key"`
-	MaxOffset      int64             `json:"max_offset"`
-	TimestampNs    int64             `json:"timestamp_ns"`
-	RecentMappings []OffsetEntry     `json:"recent_mappings"`
-
-	// Legacy field for backward compatibility
-	TopicPartition string `json:"topic_partition,omitempty"`
-}
-
-// Legacy methods for backward compatibility (will be deprecated)
-
-// SaveOffsetMapping - legacy method that maps to topic-partition only (no consumer group info)
-func (s *SMQIntegratedStorage) SaveOffsetMapping(topicPartition string, kafkaOffset, smqTimestamp int64, size int32) error {
-	// Parse topic:partition format
-	parts := strings.Split(topicPartition, ":")
-	if len(parts) != 2 {
-		return fmt.Errorf("invalid topic-partition format: %s", topicPartition)
-	}
-
-	partition, err := strconv.ParseInt(parts[1], 10, 32)
-	if err != nil {
-		return fmt.Errorf("invalid partition number in %s: %w", topicPartition, err)
-	}
-
-	// Use legacy consumer key (no consumer group)
-	legacyKey := ConsumerOffsetKey{
-		Topic:                 parts[0],
-		Partition:             int32(partition),
-		ConsumerGroup:         "_legacy_",
-		ConsumerGroupInstance: "",
-	}
-
-	return s.SaveConsumerOffset(legacyKey, kafkaOffset, smqTimestamp, size)
-}
-
-// LoadOffsetMappings - legacy method that loads from topic-partition only
-func (s *SMQIntegratedStorage) LoadOffsetMappings(topicPartition string) ([]OffsetEntry, error) {
-	// Parse topic:partition format
-	parts := strings.Split(topicPartition, ":")
-	if len(parts) != 2 {
-		return nil, fmt.Errorf("invalid topic-partition format: %s", topicPartition)
-	}
-
-	partition, err := strconv.ParseInt(parts[1], 10, 32)
-	if err != nil {
-		return nil, fmt.Errorf("invalid partition number in %s: %w", topicPartition, err)
-	}
-
-	// Use legacy consumer key (no consumer group)
-	legacyKey := ConsumerOffsetKey{
-		Topic:                 parts[0],
-		Partition:             int32(partition),
-		ConsumerGroup:         "_legacy_",
-		ConsumerGroupInstance: "",
-	}
-
-	return s.LoadConsumerOffsets(legacyKey)
-}
-
-// GetHighWaterMark - legacy method that gets high water mark for topic-partition only
-func (s *SMQIntegratedStorage) GetHighWaterMark(topicPartition string) (int64, error) {
-	// Parse topic:partition format
-	parts := strings.Split(topicPartition, ":")
-	if len(parts) != 2 {
-		return 0, fmt.Errorf("invalid topic-partition format: %s", topicPartition)
-	}
-
-	partition, err := strconv.ParseInt(parts[1], 10, 32)
-	if err != nil {
-		return 0, fmt.Errorf("invalid partition number in %s: %w", topicPartition, err)
-	}
-
-	// Use legacy consumer key (no consumer group)
-	legacyKey := ConsumerOffsetKey{
-		Topic:                 parts[0],
-		Partition:             int32(partition),
-		ConsumerGroup:         "_legacy_",
-		ConsumerGroupInstance: "",
-	}
-
-	return s.GetConsumerHighWaterMark(legacyKey)
-}
+// ConsumerOffsetKey represents the full key for consumer offsets
+type ConsumerOffsetKey struct {
+	Topic                 string `json:"topic"`
+	Partition             int32  `json:"partition"`
+	ConsumerGroup         string `json:"consumer_group"`
+	ConsumerGroupInstance string `json:"consumer_group_instance,omitempty"` // Optional static membership ID
+}
+
+func (k ConsumerOffsetKey) String() string {
+	if k.ConsumerGroupInstance != "" {
+		return fmt.Sprintf("%s:%d:%s:%s", k.Topic, k.Partition, k.ConsumerGroup, k.ConsumerGroupInstance)
+	}
+	return fmt.Sprintf("%s:%d:%s", k.Topic, k.Partition, k.ConsumerGroup)
+}
+
+// OffsetEntry is already defined in ledger.go
+
+// Legacy storage implementation using SeaweedMQ's ledgers
+// This is kept for backward compatibility but should not be used for new deployments
+type SeaweedMQStorage struct {
+	ledgersMu sync.RWMutex
+	ledgers   map[string]*Ledger // key: topic-partition OR ConsumerOffsetKey.String()
+}
+
+// NewSeaweedMQStorage creates a SeaweedMQ-compatible storage backend
+func NewSeaweedMQStorage() *SeaweedMQStorage {
+	return &SeaweedMQStorage{
+		ledgers: make(map[string]*Ledger),
+	}
+}
+
+func (s *SeaweedMQStorage) SaveConsumerOffset(key ConsumerOffsetKey, kafkaOffset, smqTimestamp int64, size int32) error {
+	keyStr := key.String()
+	return s.SaveOffsetMapping(keyStr, kafkaOffset, smqTimestamp, size)
+}
+
+func (s *SeaweedMQStorage) LoadConsumerOffsets(key ConsumerOffsetKey) ([]OffsetEntry, error) {
+	keyStr := key.String()
+	return s.LoadOffsetMappings(keyStr)
+}
+
+func (s *SeaweedMQStorage) GetConsumerHighWaterMark(key ConsumerOffsetKey) (int64, error) {
+	keyStr := key.String()
+	return s.GetHighWaterMark(keyStr)
+}
+
+func (s *SeaweedMQStorage) SaveOffsetMapping(topicPartition string, kafkaOffset, smqTimestamp int64, size int32) error {
+	s.ledgersMu.Lock()
+	defer s.ledgersMu.Unlock()
+
+	ledger, exists := s.ledgers[topicPartition]
+	if !exists {
+		ledger = NewLedger()
+		s.ledgers[topicPartition] = ledger
+	}
+
+	return ledger.AppendRecord(kafkaOffset, smqTimestamp, size)
+}
+
+func (s *SeaweedMQStorage) LoadOffsetMappings(topicPartition string) ([]OffsetEntry, error) {
+	s.ledgersMu.RLock()
+	defer s.ledgersMu.RUnlock()
+
+	ledger, exists := s.ledgers[topicPartition]
+	if !exists {
+		return []OffsetEntry{}, nil
+	}
+
+	entries := ledger.GetEntries()
+	result := make([]OffsetEntry, len(entries))
+	for i, entry := range entries {
+		result[i] = OffsetEntry{
+			KafkaOffset: entry.KafkaOffset,
+			Timestamp:   entry.Timestamp,
+			Size:        entry.Size,
+		}
+	}
+
+	return result, nil
+}
+
+func (s *SeaweedMQStorage) GetHighWaterMark(topicPartition string) (int64, error) {
+	s.ledgersMu.RLock()
+	defer s.ledgersMu.RUnlock()
+
+	ledger, exists := s.ledgers[topicPartition]
+	if !exists {
+		return 0, nil
+	}
+
+	return ledger.GetHighWaterMark(), nil
+}
+
+func (s *SeaweedMQStorage) Close() error {
+	s.ledgersMu.Lock()
+	defer s.ledgersMu.Unlock()
+
+	// Ledgers don't need explicit closing in this implementation
+	s.ledgers = make(map[string]*Ledger)
+
+	return nil
+}
+
+// PersistentLedger wraps a Ledger with SeaweedMQ persistence
+type PersistentLedger struct {
+	Ledger         *Ledger
+	TopicPartition string
+	Storage        LedgerStorage
+}
+
+// NewPersistentLedger creates a new persistent ledger
+func NewPersistentLedger(topicPartition string, storage LedgerStorage) *PersistentLedger {
+	pl := &PersistentLedger{
+		Ledger:         NewLedger(),
+		TopicPartition: topicPartition,
+		Storage:        storage,
+	}
+
+	// Load existing mappings
+	if entries, err := storage.LoadOffsetMappings(topicPartition); err == nil {
+		for _, entry := range entries {
+			pl.Ledger.AppendRecord(entry.KafkaOffset, entry.Timestamp, entry.Size)
+		}
+	}
+
+	return pl
+}
+
+// AddEntry adds an offset mapping and persists it
+func (pl *PersistentLedger) AddEntry(kafkaOffset, smqTimestamp int64, size int32) error {
+	// Add to memory ledger
+	if err := pl.Ledger.AppendRecord(kafkaOffset, smqTimestamp, size); err != nil {
+		return err
+	}
+
+	// Persist to storage
+	return pl.Storage.SaveOffsetMapping(pl.TopicPartition, kafkaOffset, smqTimestamp, size)
+}
+
+// GetEntries returns all entries from the ledger
+func (pl *PersistentLedger) GetEntries() []OffsetEntry {
+	return pl.Ledger.GetEntries()
+}
+
+// AssignOffsets reserves a range of consecutive Kafka offsets
+func (pl *PersistentLedger) AssignOffsets(count int64) int64 {
+	return pl.Ledger.AssignOffsets(count)
+}
+
+// AppendRecord adds a record to the ledger (legacy compatibility method)
+func (pl *PersistentLedger) AppendRecord(kafkaOffset, timestamp int64, size int32) error {
+	return pl.AddEntry(kafkaOffset, timestamp, size)
+}
+
+// GetHighWaterMark returns the next offset to be assigned
+func (pl *PersistentLedger) GetHighWaterMark() int64 {
+	return pl.Ledger.GetHighWaterMark()
+}
+
+// GetEarliestOffset returns the earliest offset in the ledger
+func (pl *PersistentLedger) GetEarliestOffset() int64 {
+	return pl.Ledger.GetEarliestOffset()
+}
+
+// GetLatestOffset returns the latest offset in the ledger
+func (pl *PersistentLedger) GetLatestOffset() int64 {
+	return pl.Ledger.GetLatestOffset()
+}
+
+// GetStats returns statistics about the ledger
+func (pl *PersistentLedger) GetStats() (count int, earliestTime, latestTime int64) {
+	return pl.Ledger.GetStats()
+}