Core SeaweedMQ Integration completed:
## Implementation
- Implement SeaweedMQHandler.GetStoredRecords() to retrieve actual records from SeaweedMQ
- Add a SeaweedSMQRecord wrapper implementing the offset.SMQRecord interface (sketched below)
- Wire the Fetch API to use real SMQ records instead of synthetic batches
- Support both agent and broker client connections for record retrieval
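A minimal sketch of the wrapper, assuming offset.SMQRecord exposes plain accessor methods; the exact method set and field names upstream may differ:

```go
package integration

// SMQRecord mirrors the accessor interface assumed here for
// offset.SMQRecord; the real definition lives in the offset package.
type SMQRecord interface {
	GetKey() []byte
	GetValue() []byte
	GetTimestamp() int64
	GetOffset() int64
}

// SeaweedSMQRecord adapts a raw SeaweedMQ record to the Kafka-facing
// SMQRecord interface. Fields are illustrative.
type SeaweedSMQRecord struct {
	key         []byte
	value       []byte
	timestampNs int64 // SMQ timestamps in nanoseconds since epoch
	kafkaOffset int64 // record position mapped into the Kafka offset space
}

func (r *SeaweedSMQRecord) GetKey() []byte      { return r.key }
func (r *SeaweedSMQRecord) GetValue() []byte    { return r.value }
func (r *SeaweedSMQRecord) GetTimestamp() int64 { return r.timestampNs }
func (r *SeaweedSMQRecord) GetOffset() int64    { return r.kafkaOffset }
```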
## Key Features
- Proper Kafka offset mapping from SeaweedMQ records
- Respects the maxRecords limit and batch-size constraints
- Graceful error handling for missing topics and partitions
- High water mark boundary checking (see the sketch after this list)
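The boundary rules amount to a simple clamp. A hedged sketch, continuing in the same illustrative package as the wrapper above; the function name and signature are hypothetical:

```go
// fetchUpTo returns records in [fetchOffset, highWaterMark), capped at
// maxRecords, mirroring the Fetch-path boundary checks described above.
func fetchUpTo(records []SMQRecord, fetchOffset, highWaterMark int64, maxRecords int) []SMQRecord {
	out := make([]SMQRecord, 0, maxRecords)
	for _, rec := range records {
		if rec.GetOffset() < fetchOffset {
			continue // before the requested start offset
		}
		if rec.GetOffset() >= highWaterMark {
			break // not yet visible to consumers
		}
		out = append(out, rec)
		if len(out) >= maxRecords {
			break // respect the maxRecords limit
		}
	}
	return out
}
```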
## Tests
- Unit tests for SMQRecord interface compliance
- Edge case testing (empty topics, offset boundaries, limits)
- Integration with existing end-to-end Kafka tests
- Benchmark tests for record accessor performance (example below)
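The accessor benchmark can look something like this, in the same illustrative package as the wrapper sketch; the test name and fixture values are ours, not the actual test file:

```go
package integration

import (
	"bytes"
	"testing"
	"time"
)

func BenchmarkSMQRecordAccessors(b *testing.B) {
	rec := &SeaweedSMQRecord{
		key:         []byte("k"),
		value:       bytes.Repeat([]byte("v"), 1024),
		timestampNs: time.Now().UnixNano(),
		kafkaOffset: 42,
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_ = rec.GetKey()
		_ = rec.GetValue()
		_ = rec.GetTimestamp()
		_ = rec.GetOffset()
	}
}
```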
## Verification
- All integration tests pass
- E2E Sarama test shows 'Found X SMQ records' debug output
- GetStoredRecords now returns real data instead of the TODO placeholder
Ready for Phase 2: CreateTopics protocol compliance
## s3-tests Cleanup
- Remove the _create_objects patches, since upstream s3-tests resolved the NameError
- SeaweedFS strong consistency should eliminate the 404 errors
- Let the tests run naturally to surface real issues
## Offset API Fixes
- Fixed GetHighWaterMark() to use the correct partition managers
- Fixed GetPartitionOffsetInfo() to populate the proper struct fields (an illustrative shape follows this list)
- Fixed GetOffsetMetrics() to use the correct types and metrics system
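For context, an illustrative shape for the struct GetPartitionOffsetInfo() fills in; these field names are assumptions, not the upstream definition:

```go
// PartitionOffsetInfo is a hypothetical per-partition offset summary.
type PartitionOffsetInfo struct {
	Topic          string
	Partition      int32
	EarliestOffset int64
	HighWaterMark  int64 // next offset to be assigned
	RecordCount    int64 // HighWaterMark - EarliestOffset
}
```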
## Test Fixes
- Fix gateway tests: replace AgentAddress with Masters in the Options struct
- Fix consumer test: correct the GenerateMemberID test to expect deterministic behavior
- Fix schema tests: remove incorrect error assertions for mock-broker scenarios
- All core offset management and protocol tests now pass
- The gateway, consumer, protocol, and offset packages compile and test successfully
## Offset Storage Consolidation
- Remove the old SMQIntegratedStorage implementation from persistence.go
- Update all integration modules to use SMQOffsetStorage instead
- Add delegation methods to PersistentLedger for backward compatibility (a hedged sketch follows this list)
- Fix method signatures and compilation errors
- Maintain support for legacy offset operations through SeaweedMQStorage
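A hedged sketch of what such a delegation method can look like; the key type, the SaveConsumerOffset method, and the field layout are assumptions for illustration:

```go
// ConsumerOffsetKey identifies one group's position on one partition
// (illustrative; the real key type may carry more fields).
type ConsumerOffsetKey struct {
	ConsumerGroup string
	Topic         string
	Partition     int32
}

// PersistentLedger keeps its old public API but forwards persistence
// to the SMQOffsetStorage backend; all names here are illustrative.
type PersistentLedger struct {
	storage *SMQOffsetStorage
}

func (l *PersistentLedger) Commit(group, topic string, partition int32, offset int64) error {
	key := ConsumerOffsetKey{
		ConsumerGroup: group,
		Topic:         topic,
		Partition:     partition,
	}
	return l.storage.SaveConsumerOffset(key, offset)
}
```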
## End-to-End Offset Flow Tests
- Add end-to-end flow tests covering Kafka OffsetCommit through to SMQ storage (see the test sketch after this list)
- Test multiple consumer groups with independent offset tracking
- Validate SMQ file path and format compatibility
- Test error handling and edge cases (negative, zero, and maximum offsets)
- Verify that offset encoding/decoding matches the SMQ broker format
- Ensure consumer group isolation and proper key generation
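A sketch of the round-trip and group-isolation test shape, reusing the ConsumerOffsetKey sketch above; the fixture constructor and the Save/Fetch method names are hypothetical:

```go
import "testing"

func TestOffsetCommitFetchRoundTrip(t *testing.T) {
	storage := newTestSMQOffsetStorage(t) // hypothetical filer-backed fixture

	keyA := ConsumerOffsetKey{ConsumerGroup: "group-a", Topic: "orders", Partition: 0}
	keyB := ConsumerOffsetKey{ConsumerGroup: "group-b", Topic: "orders", Partition: 0}

	// Two groups on the same partition must be tracked independently.
	if err := storage.SaveConsumerOffset(keyA, 1234); err != nil {
		t.Fatal(err)
	}
	if err := storage.SaveConsumerOffset(keyB, 99); err != nil {
		t.Fatal(err)
	}

	got, err := storage.FetchConsumerOffset(keyA)
	if err != nil || got != 1234 {
		t.Fatalf("group-a offset = %d, err = %v; want 1234", got, err)
	}
}
```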
## Protocol Handler Integration
- Update the Kafka protocol handler to use SMQOffsetStorage for consumer offsets
- Modify OffsetCommit to save consumer offsets using SMQ's filer format
- Modify OffsetFetch to read consumer offsets from SMQ's filer location
- Add proper ConsumerOffsetKey creation with the consumer group and instance ID
- Maintain backward compatibility with an in-memory storage fallback (sketched below)
- Include comprehensive test coverage for the offset handler integration
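A hedged sketch of the commit path with the in-memory fallback; the handler fields and method names are assumptions, and the key type is restated here with the instance ID added:

```go
import "sync"

// Handler wiring, illustrative only.
type Handler struct {
	smqStorage *SMQOffsetStorage // nil when no filer is configured
	memOffsets sync.Map          // fallback map: ConsumerOffsetKey -> int64
}

// ConsumerOffsetKey with the group instance ID for static membership.
type ConsumerOffsetKey struct {
	ConsumerGroup string
	InstanceID    string
	Topic         string
	Partition     int32
}

func (h *Handler) commitOffset(key ConsumerOffsetKey, offset int64) error {
	if h.smqStorage != nil {
		if err := h.smqStorage.SaveConsumerOffset(key, offset); err == nil {
			return nil
		}
		// fall through to the in-memory fallback on filer errors
	}
	h.memOffsets.Store(key, offset)
	return nil
}
```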
## SMQOffsetStorage
- Add SMQOffsetStorage, which uses the same filer locations and format as the SMQ brokers
- Store offsets in <topic-dir>/<partition-dir>/<consumerGroup>.offset files
- Use an 8-byte big-endian format matching the SMQ broker implementation (encode/decode sketch below)
- Include comprehensive test coverage for core functionality
- Maintain backward compatibility through legacy method support
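The on-disk format is small enough to show directly. This sketch encodes and decodes the stated 8-byte big-endian value and builds the stated file path; the helper names are ours, not the upstream API:

```go
package offset

import (
	"encoding/binary"
	"fmt"
	"path"
)

// offsetFilePath builds <topic-dir>/<partition-dir>/<consumerGroup>.offset.
func offsetFilePath(topicDir, partitionDir, consumerGroup string) string {
	return path.Join(topicDir, partitionDir, consumerGroup+".offset")
}

// encodeOffset serializes an offset in the 8-byte big-endian format
// used by the SMQ brokers.
func encodeOffset(offset int64) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(offset))
	return buf
}

// decodeOffset is the inverse; it rejects malformed files rather than
// silently misreading them.
func decodeOffset(data []byte) (int64, error) {
	if len(data) != 8 {
		return 0, fmt.Errorf("offset file has %d bytes, want 8", len(data))
	}
	return int64(binary.BigEndian.Uint64(data)), nil
}
```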