Fixed GetHighWaterMark() to use correct partition managers
Fixed GetPartitionOffsetInfo() with proper struct fields
Fixed GetOffsetMetrics() with correct types and system
- Design comprehensive SQL schema for offset storage with future _index column support
- Implement SQLOffsetStorage with full database operations:
- Partition offset checkpoints with UPSERT functionality
- Detailed offset mappings with range queries and statistics
- Database migration system with version tracking
- Performance optimizations with proper indexing
- Add database migration manager with automatic schema updates
- Create comprehensive test suite with 11 test cases covering:
- Schema initialization and table creation
- Checkpoint save/load operations with error handling
- Offset mapping storage and retrieval with sorting
- Range queries and highest offset detection
- Partition statistics with NULL value handling
- Cleanup operations for old data retention
- Concurrent access safety and database vacuum
- Extend BrokerOffsetManager with SQL storage integration:
- NewBrokerOffsetManagerWithSQL for database-backed storage
- Configurable storage backends (in-memory fallback, SQL preferred)
- Database connection management and error handling
- Add SQLite driver dependency and configure for optimal performance
- Support for future database types (PostgreSQL, MySQL) with abstraction layer
Key TODOs and Assumptions:
- TODO: Add _index as computed column when database supports it
- TODO: Implement database backup and restore functionality
- TODO: Add configuration for database path and connection parameters
- ASSUMPTION: Using SQLite for now, extensible to other databases
- ASSUMPTION: WAL mode and performance pragmas for production use
- ASSUMPTION: Migration system handles schema evolution gracefully
All 11 SQL storage tests pass, providing robust persistent offset management.
- Add SW_COLUMN_NAME_OFFSET field to parquet storage for offset persistence
- Create BrokerOffsetManager for coordinating offset assignment across partitions
- Integrate offset manager into MessageQueueBroker initialization
- Add PublishWithOffset method to LocalPartition for offset-aware publishing
- Update broker publish flow to assign offsets during message processing
- Create offset-aware subscription handlers for consume operations
- Add comprehensive broker offset integration tests
- Support both single and batch offset assignment
- Implement offset-based subscription creation and management
- Add partition offset information and metrics APIs
Key TODOs and Assumptions:
- TODO: Replace in-memory storage with SQL-based persistence in Phase 5
- TODO: Integrate LogBuffer to natively handle offset assignment
- TODO: Add proper partition field access in subscription requests
- ASSUMPTION: LogEntry.Offset field populated by broker during publishing
- ASSUMPTION: Offset information preserved through parquet storage integration
- ASSUMPTION: BrokerOffsetManager handles all partition offset coordination
Tests show basic functionality working, some integration issues expected
until Phase 5 SQL storage backend is implemented.