@@ -17,9 +17,12 @@ type PartitionOffsetManager struct {
 	nextOffset int64
 
 	// Checkpointing for recovery
-	lastCheckpoint     int64
-	checkpointInterval int64
-	storage            OffsetStorage
+	lastCheckpoint         int64
+	lastCheckpointedOffset int64
+	storage                OffsetStorage
+
+	// Background checkpointing
+	stopCheckpoint chan struct{}
 }
 
 // OffsetStorage interface for persisting offset state
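Taken together with the later hunks, this change swaps the old per-offset checkpointing (`checkpointInterval`) for a background goroutine plus a final checkpoint on `Close`. For reference, a sketch of how the struct reads after the patch; the fields above `nextOffset` are not part of this hunk and are only inferred from their use elsewhere in the diff:

```go
type PartitionOffsetManager struct {
	namespace string               // inferred from m.namespace usage below
	topicName string               // inferred from m.topicName usage below
	partition *schema_pb.Partition // inferred from m.partition usage below
	mu        sync.RWMutex         // inferred from m.mu.RLock()/Lock() usage below

	nextOffset int64 // next offset to hand out

	// Checkpointing for recovery
	lastCheckpoint         int64
	lastCheckpointedOffset int64
	storage                OffsetStorage

	// Background checkpointing
	stopCheckpoint chan struct{}
}
```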
@@ -38,11 +41,11 @@ type OffsetStorage interface {
 // NewPartitionOffsetManager creates a new offset manager for a partition
 func NewPartitionOffsetManager(namespace, topicName string, partition *schema_pb.Partition, storage OffsetStorage) (*PartitionOffsetManager, error) {
 	manager := &PartitionOffsetManager{
-		namespace:          namespace,
-		topicName:          topicName,
-		partition:          partition,
-		checkpointInterval: 1, // Checkpoint every offset for immediate persistence
-		storage:            storage,
+		namespace:      namespace,
+		topicName:      topicName,
+		partition:      partition,
+		storage:        storage,
+		stopCheckpoint: make(chan struct{}),
 	}
 
 	// Recover offset state
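A minimal usage sketch of the new lifecycle, assuming it sits in the same package as the diffed file and the caller already has an `OffsetStorage` implementation and a partition (the namespace and topic names are placeholders):

```go
// Construct, assign a few offsets, then Close to stop the background
// goroutine and flush a final checkpoint.
func usePartition(storage OffsetStorage, partition *schema_pb.Partition) error {
	mgr, err := NewPartitionOffsetManager("example-ns", "example-topic", partition, storage)
	if err != nil {
		return err
	}
	defer mgr.Close()

	for i := 0; i < 10; i++ {
		_ = mgr.AssignOffset() // strictly sequential offsets
	}
	return nil
}
```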
@@ -50,55 +53,46 @@ func NewPartitionOffsetManager(namespace, topicName string, partition *schema_pb
 		return nil, fmt.Errorf("failed to recover offset state: %w", err)
 	}
 
+	// Start background checkpoint goroutine
+	go manager.runPeriodicCheckpoint()
+
 	return manager, nil
 }
 
+// Close stops the background checkpoint goroutine and performs a final checkpoint
+func (m *PartitionOffsetManager) Close() error {
+	close(m.stopCheckpoint)
+
+	// Perform final checkpoint
+	m.mu.RLock()
+	currentOffset := m.nextOffset - 1 // Last assigned offset
+	lastCheckpointed := m.lastCheckpointedOffset
+	m.mu.RUnlock()
+
+	if currentOffset >= 0 && currentOffset > lastCheckpointed {
+		return m.storage.SaveCheckpoint(m.namespace, m.topicName, m.partition, currentOffset)
+	}
+	return nil
+}
+
 // AssignOffset assigns the next sequential offset
 func (m *PartitionOffsetManager) AssignOffset() int64 {
-	var shouldCheckpoint bool
-	var checkpointOffset int64
-
 	m.mu.Lock()
 	offset := m.nextOffset
 	m.nextOffset++
-
-	// Check if we should checkpoint (but don't do it inside the lock)
-	if offset-m.lastCheckpoint >= m.checkpointInterval {
-		shouldCheckpoint = true
-		checkpointOffset = offset
-	}
 	m.mu.Unlock()
 
-	// Checkpoint outside the lock to avoid deadlock
-	if shouldCheckpoint {
-		m.checkpoint(checkpointOffset)
-	}
-
 	return offset
 }
 
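With checkpointing off the hot path, `AssignOffset` is now just a mutex-guarded increment, so concurrent producers contend only briefly on `m.mu`. An illustrative stress sketch (the function and its names are invented, same-package assumption as above):

```go
// Each goroutine receives a disjoint set of offsets; uniqueness is guaranteed
// by the assignment happening under the manager's mutex.
func assignConcurrently(mgr *PartitionOffsetManager, workers, perWorker int) {
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				_ = mgr.AssignOffset()
			}
		}()
	}
	wg.Wait()
}
```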
 // AssignOffsets assigns a batch of sequential offsets
 func (m *PartitionOffsetManager) AssignOffsets(count int64) (baseOffset int64, lastOffset int64) {
-	var shouldCheckpoint bool
-	var checkpointOffset int64
-
 	m.mu.Lock()
 	baseOffset = m.nextOffset
 	lastOffset = m.nextOffset + count - 1
 	m.nextOffset += count
-
-	// Check if we should checkpoint (but don't do it inside the lock)
-	if lastOffset-m.lastCheckpoint >= m.checkpointInterval {
-		shouldCheckpoint = true
-		checkpointOffset = lastOffset
-	}
 	m.mu.Unlock()
 
-	// Checkpoint outside the lock to avoid deadlock
-	if shouldCheckpoint {
-		m.checkpoint(checkpointOffset)
-	}
-
 	return baseOffset, lastOffset
 }
 
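`AssignOffsets` returns an inclusive range `[baseOffset, lastOffset]`. A sketch of stamping a batch of records in a single call (the `record` type is invented for illustration):

```go
type record struct {
	offset int64
	value  []byte
}

// stampBatch reserves len(values) consecutive offsets with one lock
// acquisition and assigns them in order.
func stampBatch(mgr *PartitionOffsetManager, values [][]byte) []record {
	base, last := mgr.AssignOffsets(int64(len(values)))
	_ = last // last == base + int64(len(values)) - 1
	out := make([]record, len(values))
	for i, v := range values {
		out[i] = record{offset: base + int64(i), value: v}
	}
	return out
}
```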
@@ -134,35 +128,68 @@ func (m *PartitionOffsetManager) recover() error {
 		if highestOffset > checkpointOffset {
 			m.nextOffset = highestOffset + 1
 			m.lastCheckpoint = highestOffset
+			m.lastCheckpointedOffset = highestOffset
 		} else {
 			m.nextOffset = checkpointOffset + 1
 			m.lastCheckpoint = checkpointOffset
+			m.lastCheckpointedOffset = checkpointOffset
 		}
 	} else if checkpointOffset >= 0 {
 		m.nextOffset = checkpointOffset + 1
 		m.lastCheckpoint = checkpointOffset
+		m.lastCheckpointedOffset = checkpointOffset
 	} else if highestOffset >= 0 {
 		m.nextOffset = highestOffset + 1
 		m.lastCheckpoint = highestOffset
+		m.lastCheckpointedOffset = highestOffset
 	} else {
 		// No data exists, start from 0
 		m.nextOffset = 0
 		m.lastCheckpoint = -1
+		m.lastCheckpointedOffset = -1
 	}
 
 	return nil
 }
 
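The recovery rule amounts to "resume one past whichever marker is further ahead: the persisted checkpoint or the highest offset actually found in storage". Restated as a standalone helper (not code from this PR; it assumes -1 means "absent"):

```go
func nextOffsetAfterRecovery(checkpointOffset, highestOffset int64) int64 {
	switch {
	case highestOffset > checkpointOffset:
		return highestOffset + 1 // data was written past the last checkpoint
	case checkpointOffset >= 0:
		return checkpointOffset + 1
	default:
		return 0 // neither exists: fresh partition
	}
}

// Example: checkpoint persisted at 41, but records up to offset 45 survived a
// crash -> nextOffsetAfterRecovery(41, 45) == 46, so offsets are never reused.
```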
-// checkpoint saves the current offset state
-func (m *PartitionOffsetManager) checkpoint(offset int64) {
-	if err := m.storage.SaveCheckpoint(m.namespace, m.topicName, m.partition, offset); err != nil {
+// runPeriodicCheckpoint runs in the background and checkpoints every 2 seconds if the offset changed
+func (m *PartitionOffsetManager) runPeriodicCheckpoint() {
+	ticker := time.NewTicker(2 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			m.performCheckpointIfChanged()
+		case <-m.stopCheckpoint:
+			return
+		}
+	}
+}
+
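This is the usual ticker-plus-stop-channel loop; `close(m.stopCheckpoint)` in `Close` makes the second case fire immediately. In generic form (illustration only, not code from this PR):

```go
// Run work on every tick until the stop channel is closed; a receive on a
// closed channel returns immediately, which is what ends the loop.
func runEvery(interval time.Duration, stop <-chan struct{}, work func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			work()
		case <-stop:
			return
		}
	}
}
```

The practical consequence of the 2-second interval is that a hard crash loses at most roughly two seconds of checkpoint progress; the `highestOffset` scan in `recover()` and the final checkpoint in `Close` cover that window.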
+// performCheckpointIfChanged saves checkpoint only if offset has changed since last checkpoint
+func (m *PartitionOffsetManager) performCheckpointIfChanged() {
+	m.mu.RLock()
+	currentOffset := m.nextOffset - 1 // Last assigned offset
+	lastCheckpointed := m.lastCheckpointedOffset
+	m.mu.RUnlock()
+
+	// Skip if no messages have been assigned, or no change since last checkpoint
+	if currentOffset < 0 || currentOffset == lastCheckpointed {
+		return
+	}
+
+	// Perform checkpoint
+	if err := m.storage.SaveCheckpoint(m.namespace, m.topicName, m.partition, currentOffset); err != nil {
 		// Log error but don't fail - checkpointing is for optimization
-		fmt.Printf("Failed to checkpoint offset %d: %v\n", offset, err)
+		fmt.Printf("Failed to checkpoint offset %d for %s/%s: %v\n", currentOffset, m.namespace, m.topicName, err)
+		return
 	}
 
 	// Update last checkpointed offset
 	m.mu.Lock()
-	m.lastCheckpoint = offset
+	m.lastCheckpointedOffset = currentOffset
+	m.lastCheckpoint = currentOffset
 	m.mu.Unlock()
 }
 
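One way to see the "only if changed" guard in action is a storage fake that records every save. `countingStore` below is hypothetical and implements only the `SaveCheckpoint` call that appears in this diff; the real `OffsetStorage` interface likely has more methods:

```go
// countingStore records every checkpointed offset so the dedup behavior is
// observable in a test. It is a sketch, not part of this PR.
type countingStore struct {
	mu    sync.Mutex
	saved []int64
}

func (s *countingStore) SaveCheckpoint(namespace, topicName string, partition *schema_pb.Partition, offset int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.saved = append(s.saved, offset)
	return nil
}

// Expected trace with the 2s ticker:
//   assign offsets 0..4  -> next tick saves 4
//   assign nothing       -> next tick skips (currentOffset == lastCheckpointedOffset)
//   assign offsets 5..7  -> next tick saves 7
```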
@@ -245,6 +272,21 @@ func (r *PartitionOffsetRegistry) GetHighWaterMark(namespace, topicName string,
 	return manager.GetHighWaterMark(), nil
 }
 
+// Close stops all partition managers and performs final checkpoints
+func (r *PartitionOffsetRegistry) Close() error {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	var firstErr error
+	for _, manager := range r.managers {
+		if err := manager.Close(); err != nil && firstErr == nil {
+			firstErr = err
+		}
+	}
+
+	return firstErr
+}
+
 // TopicPartitionKey generates a unique key for a topic-partition combination
 // This is the canonical key format used across the offset management system
 func TopicPartitionKey(namespace, topicName string, partition *schema_pb.Partition) string {
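Finally, whoever owns the registry should call its `Close` during shutdown so every partition gets a final checkpoint. A wiring sketch; the surrounding broker type and field are invented:

```go
type broker struct {
	offsets *PartitionOffsetRegistry
}

func (b *broker) Shutdown() {
	// Registry Close fans out to every PartitionOffsetManager and reports
	// only the first error, matching the code above.
	if err := b.offsets.Close(); err != nil {
		fmt.Printf("closing offset registry: %v\n", err)
	}
}
```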