diff --git a/weed/mq/offset/benchmark_test.go b/weed/mq/offset/benchmark_test.go
index d6f33206f..d82729142 100644
--- a/weed/mq/offset/benchmark_test.go
+++ b/weed/mq/offset/benchmark_test.go
@@ -445,10 +445,10 @@ func BenchmarkMemoryUsage(b *testing.B) {
 		b.ResetTimer()
 		for i := 0; i < b.N; i++ {
 			manager.AssignOffset()
-			if i%1000 == 0 {
-				// Periodic checkpoint to simulate real usage
-				manager.checkpoint(int64(i))
-			}
+			// Note: Checkpointing now happens automatically in background every 2 seconds
 		}
+
+		// Clean up background goroutine
+		manager.Close()
 	})
 }
diff --git a/weed/mq/offset/end_to_end_test.go b/weed/mq/offset/end_to_end_test.go
index a4db891e1..f2b57b843 100644
--- a/weed/mq/offset/end_to_end_test.go
+++ b/weed/mq/offset/end_to_end_test.go
@@ -241,7 +241,8 @@ func TestOffsetPersistenceAcrossRestarts(t *testing.T) {
 
 		lastOffset = response.LastOffset
 
-		// Close connections
+		// Close connections - Close integration first to trigger final checkpoint
+		integration.Close()
 		storage.Close()
 		db.Close()
 	}
diff --git a/weed/mq/offset/integration.go b/weed/mq/offset/integration.go
index 4b9ee6183..53bc113e7 100644
--- a/weed/mq/offset/integration.go
+++ b/weed/mq/offset/integration.go
@@ -12,6 +12,7 @@ import (
 // SMQOffsetIntegration provides integration between offset management and SMQ broker
 type SMQOffsetIntegration struct {
 	mu               sync.RWMutex
+	registry         *PartitionOffsetRegistry
 	offsetAssigner   *OffsetAssigner
 	offsetSubscriber *OffsetSubscriber
 	offsetSeeker     *OffsetSeeker
@@ -23,12 +24,18 @@ func NewSMQOffsetIntegration(storage OffsetStorage) *SMQOffsetIntegration {
 	assigner := &OffsetAssigner{registry: registry}
 
 	return &SMQOffsetIntegration{
+		registry:         registry,
 		offsetAssigner:   assigner,
 		offsetSubscriber: NewOffsetSubscriber(registry),
 		offsetSeeker:     NewOffsetSeeker(registry),
 	}
 }
 
+// Close stops all background checkpoint goroutines and performs final checkpoints
+func (integration *SMQOffsetIntegration) Close() error {
+	return integration.registry.Close()
+}
+
 // PublishRecord publishes a record and assigns it an offset
 func (integration *SMQOffsetIntegration) PublishRecord(
 	namespace, topicName string,
diff --git a/weed/mq/offset/manager.go b/weed/mq/offset/manager.go
index 01976a8bf..53388d82f 100644
--- a/weed/mq/offset/manager.go
+++ b/weed/mq/offset/manager.go
@@ -17,9 +17,12 @@ type PartitionOffsetManager struct {
 	nextOffset int64
 
 	// Checkpointing for recovery
-	lastCheckpoint     int64
-	checkpointInterval int64
-	storage            OffsetStorage
+	lastCheckpoint         int64
+	lastCheckpointedOffset int64
+	storage                OffsetStorage
+
+	// Background checkpointing
+	stopCheckpoint chan struct{}
 }
 
 // OffsetStorage interface for persisting offset state
@@ -38,11 +41,11 @@ type OffsetStorage interface {
 // NewPartitionOffsetManager creates a new offset manager for a partition
 func NewPartitionOffsetManager(namespace, topicName string, partition *schema_pb.Partition, storage OffsetStorage) (*PartitionOffsetManager, error) {
 	manager := &PartitionOffsetManager{
-		namespace:          namespace,
-		topicName:          topicName,
-		partition:          partition,
-		checkpointInterval: 1, // Checkpoint every offset for immediate persistence
-		storage:            storage,
+		namespace:      namespace,
+		topicName:      topicName,
+		partition:      partition,
+		storage:        storage,
+		stopCheckpoint: make(chan struct{}),
 	}
 
 	// Recover offset state
@@ -50,55 +53,46 @@ func NewPartitionOffsetManager(namespace, topicName string, partition *schema_pb
 		return nil, fmt.Errorf("failed to recover offset state: %w", err)
 	}
 
+	// Start background checkpoint goroutine
+	go manager.runPeriodicCheckpoint()
+
 	return manager, nil
 }
 
+// Close stops the background checkpoint goroutine and performs a final checkpoint
+func (m *PartitionOffsetManager) Close() error {
+	close(m.stopCheckpoint)
+
+	// Perform final checkpoint
+	m.mu.RLock()
+	currentOffset := m.nextOffset - 1 // Last assigned offset
+	lastCheckpointed := m.lastCheckpointedOffset
+	m.mu.RUnlock()
+
+	if currentOffset >= 0 && currentOffset > lastCheckpointed {
+		return m.storage.SaveCheckpoint(m.namespace, m.topicName, m.partition, currentOffset)
+	}
+	return nil
+}
+
 // AssignOffset assigns the next sequential offset
 func (m *PartitionOffsetManager) AssignOffset() int64 {
-	var shouldCheckpoint bool
-	var checkpointOffset int64
-
 	m.mu.Lock()
 	offset := m.nextOffset
 	m.nextOffset++
-
-	// Check if we should checkpoint (but don't do it inside the lock)
-	if offset-m.lastCheckpoint >= m.checkpointInterval {
-		shouldCheckpoint = true
-		checkpointOffset = offset
-	}
 	m.mu.Unlock()
 
-	// Checkpoint outside the lock to avoid deadlock
-	if shouldCheckpoint {
-		m.checkpoint(checkpointOffset)
-	}
-
 	return offset
 }
 
 // AssignOffsets assigns a batch of sequential offsets
 func (m *PartitionOffsetManager) AssignOffsets(count int64) (baseOffset int64, lastOffset int64) {
-	var shouldCheckpoint bool
-	var checkpointOffset int64
-
 	m.mu.Lock()
 	baseOffset = m.nextOffset
 	lastOffset = m.nextOffset + count - 1
 	m.nextOffset += count
-
-	// Check if we should checkpoint (but don't do it inside the lock)
-	if lastOffset-m.lastCheckpoint >= m.checkpointInterval {
-		shouldCheckpoint = true
-		checkpointOffset = lastOffset
-	}
 	m.mu.Unlock()
 
-	// Checkpoint outside the lock to avoid deadlock
-	if shouldCheckpoint {
-		m.checkpoint(checkpointOffset)
-	}
-
 	return baseOffset, lastOffset
 }
 
@@ -134,35 +128,68 @@ func (m *PartitionOffsetManager) recover() error {
 		if highestOffset > checkpointOffset {
 			m.nextOffset = highestOffset + 1
 			m.lastCheckpoint = highestOffset
+			m.lastCheckpointedOffset = highestOffset
 		} else {
 			m.nextOffset = checkpointOffset + 1
 			m.lastCheckpoint = checkpointOffset
+			m.lastCheckpointedOffset = checkpointOffset
 		}
 	} else if checkpointOffset >= 0 {
 		m.nextOffset = checkpointOffset + 1
 		m.lastCheckpoint = checkpointOffset
+		m.lastCheckpointedOffset = checkpointOffset
 	} else if highestOffset >= 0 {
 		m.nextOffset = highestOffset + 1
 		m.lastCheckpoint = highestOffset
+		m.lastCheckpointedOffset = highestOffset
 	} else {
 		// No data exists, start from 0
 		m.nextOffset = 0
 		m.lastCheckpoint = -1
+		m.lastCheckpointedOffset = -1
 	}
 
 	return nil
 }
 
-// checkpoint saves the current offset state
-func (m *PartitionOffsetManager) checkpoint(offset int64) {
-	if err := m.storage.SaveCheckpoint(m.namespace, m.topicName, m.partition, offset); err != nil {
+// runPeriodicCheckpoint runs in the background and checkpoints every 2 seconds if the offset changed
+func (m *PartitionOffsetManager) runPeriodicCheckpoint() {
+	ticker := time.NewTicker(2 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			m.performCheckpointIfChanged()
+		case <-m.stopCheckpoint:
+			return
+		}
+	}
+}
+
+// performCheckpointIfChanged saves checkpoint only if offset has changed since last checkpoint
+func (m *PartitionOffsetManager) performCheckpointIfChanged() {
+	m.mu.RLock()
+	currentOffset := m.nextOffset - 1 // Last assigned offset
+	lastCheckpointed := m.lastCheckpointedOffset
+	m.mu.RUnlock()
+
+	// Skip if no messages have been assigned, or no change since last checkpoint
+	if currentOffset < 0 || currentOffset == lastCheckpointed {
+		return
+	}
+
+	// Perform checkpoint
+	if err := m.storage.SaveCheckpoint(m.namespace, m.topicName, m.partition, currentOffset); err != nil {
 		// Log error but don't fail - checkpointing is for optimization
-		fmt.Printf("Failed to checkpoint offset %d: %v\n", offset, err)
+		fmt.Printf("Failed to checkpoint offset %d for %s/%s: %v\n", currentOffset, m.namespace, m.topicName, err)
 		return
 	}
 
+	// Update last checkpointed offset
 	m.mu.Lock()
-	m.lastCheckpoint = offset
+	m.lastCheckpointedOffset = currentOffset
+	m.lastCheckpoint = currentOffset
 	m.mu.Unlock()
 }
 
@@ -245,6 +272,21 @@ func (r *PartitionOffsetRegistry) GetHighWaterMark(namespace, topicName string,
 	return manager.GetHighWaterMark(), nil
 }
 
+// Close stops all partition managers and performs final checkpoints
+func (r *PartitionOffsetRegistry) Close() error {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	var firstErr error
+	for _, manager := range r.managers {
+		if err := manager.Close(); err != nil && firstErr == nil {
+			firstErr = err
+		}
+	}
+
+	return firstErr
+}
+
 // TopicPartitionKey generates a unique key for a topic-partition combination
 // This is the canonical key format used across the offset management system
 func TopicPartitionKey(namespace, topicName string, partition *schema_pb.Partition) string {
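For reference, a minimal caller-side sketch (not part of the diff) of how the new lifecycle is intended to be used: offsets are assigned with no synchronous checkpoint on the hot path, the background goroutine persists progress roughly every 2 seconds, and `Close` stops that goroutine and flushes a final checkpoint. The function name and literal namespace/topic values below are hypothetical; `storage` is any `OffsetStorage` implementation.

```go
// Illustrative sketch only; exampleUsage and the literal values are hypothetical.
func exampleUsage(partition *schema_pb.Partition, storage OffsetStorage) error {
	manager, err := NewPartitionOffsetManager("test", "test-topic", partition, storage)
	if err != nil {
		return err
	}
	// Stop the background checkpointer and write a final checkpoint on shutdown.
	defer manager.Close()

	// Hot path: no per-assignment checkpoint anymore.
	_ = manager.AssignOffset()
	base, last := manager.AssignOffsets(10)
	_, _ = base, last
	return nil
}
```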