4.3 KiB
Kafka Gateway Implementation Phases
Phase 1: Core SeaweedMQ Integration (COMPLETED ✅)
Goal: Enable real message retrieval from SeaweedMQ storage
Tasks:
- Implement
integration.SeaweedMQHandler.GetStoredRecords()to return actual records - Add proper SMQ record conversion from SeaweedMQ format to Kafka format
- Wire Fetch API to use real SMQ records instead of synthetic batches
- Add integration tests for end-to-end message storage and retrieval
Files modified:
weed/mq/kafka/integration/seaweedmq_handler.goweed/mq/kafka/protocol/fetch.go(verification)- Added test file:
weed/mq/kafka/integration/record_retrieval_test.go
Verification: E2E tests show "Found X SMQ records" - real data retrieval working
Phase 2: CreateTopics Protocol Compliance (COMPLETED ✅)
Goal: Fix CreateTopics API parsing and partition handling
Tasks:
- Implement
handleCreateTopicsV0V1request parsing - Add support for partition count and basic topic configurations
- Wire CreateTopics to actual SeaweedMQ topic creation
- Add CreateTopics integration tests
Files modified:
weed/mq/kafka/protocol/handler.go- Added test file:
weed/mq/kafka/protocol/create_topics_test.go
Verification: All v0-v5 CreateTopics tests pass; proper partition handling
Phase 3: ApiVersions Matrix Accuracy (COMPLETED ✅)
Goal: Ensure advertised API versions match actual implementation
Tasks:
- Audit current
handleApiVersionsresponse against implemented features - Correct max versions for APIs with higher implementations than advertised
- Update version validation in request handlers to match advertisements
- Document supported vs unsupported features per API version
Files modified:
weed/mq/kafka/protocol/handler.go(handleApiVersions&validateAPIVersion)- Added file:
weed/mq/kafka/API_VERSION_MATRIX.md - Added test file:
weed/mq/kafka/protocol/api_versions_test.go
Verification: Critical fixes for OffsetFetch v0-v5 and CreateTopics v0-v5 accuracy
Phase 4: Consumer Group Protocol Metadata (PRIORITY MEDIUM)
Goal: Proper JoinGroup protocol metadata parsing
Tasks:
- Implement consumer protocol metadata parsing in JoinGroup
- Extract subscription topics and user data from metadata
- Populate ClientHost from connection information
- Support multiple assignment strategies properly
Files to modify:
weed/mq/kafka/protocol/joingroup.goweed/mq/kafka/protocol/consumer_group_metadata.go(new file)- Add test file:
weed/mq/kafka/protocol/consumer_group_metadata_test.go
Phase 5: Multi-Batch Fetch Support (PRIORITY MEDIUM)
Goal: Support multiple record batch concatenation in Fetch responses
Tasks:
- Implement proper record batch concatenation
- Handle batch size limits and chunking
- Add support for compressed record batches
- Performance optimization for large batch responses
Files to modify:
weed/mq/kafka/protocol/fetch.go- Add test file:
weed/mq/kafka/protocol/fetch_multi_batch_test.go
Phase 6: Flexible Versions Support (PRIORITY LOW)
Goal: Basic support for flexible versions and tagged fields
Tasks:
- Add flexible version detection in request headers
- Implement tagged field parsing/skipping
- Update response encoders for flexible versions
- Add flexible version tests
Files to modify:
weed/mq/kafka/protocol/handler.goweed/mq/kafka/protocol/flexible_versions.go(new file)- Add test file:
weed/mq/kafka/protocol/flexible_versions_test.go
Phase 7: Error Handling and Edge Cases (PRIORITY LOW)
Goal: Comprehensive error handling and Kafka spec compliance
Tasks:
- Audit all error codes against Kafka specification
- Add missing error codes for network/timeout scenarios
- Implement proper connection timeout handling
- Add comprehensive error handling tests
Files to modify:
weed/mq/kafka/protocol/errors.go- All protocol handler files
- Add test file:
weed/mq/kafka/protocol/error_handling_test.go
Current Status: Phase 1, 2 & 3 completed, ready for Phase 4
Implementation Notes:
- Each phase should include comprehensive tests
- Commit after each phase completion
- Backward compatibility must be maintained
- Integration tests should verify end-to-end functionality