diff --git a/weed/mq/kafka/IMPLEMENTATION_PHASES.md b/weed/mq/kafka/IMPLEMENTATION_PHASES.md new file mode 100644 index 000000000..611d5326d --- /dev/null +++ b/weed/mq/kafka/IMPLEMENTATION_PHASES.md @@ -0,0 +1,106 @@ +# Kafka Gateway Implementation Phases + +## Phase 1: Core SeaweedMQ Integration (PRIORITY HIGH) +**Goal**: Enable real message retrieval from SeaweedMQ storage + +### Tasks: +- [ ] Implement `integration.SeaweedMQHandler.GetStoredRecords()` to return actual records +- [ ] Add proper SMQ record conversion from SeaweedMQ format to Kafka format +- [ ] Wire Fetch API to use real SMQ records instead of synthetic batches +- [ ] Add integration tests for end-to-end message storage and retrieval + +**Files to modify**: +- `weed/mq/kafka/integration/seaweedmq_handler.go` +- `weed/mq/kafka/protocol/fetch.go` (verification) +- Add test file: `weed/mq/kafka/integration/record_retrieval_test.go` + +## Phase 2: CreateTopics Protocol Compliance (PRIORITY HIGH) +**Goal**: Fix CreateTopics API parsing and partition handling + +### Tasks: +- [ ] Implement `handleCreateTopicsV0V1` request parsing +- [ ] Add support for partition count and basic topic configurations +- [ ] Wire CreateTopics to actual SeaweedMQ topic creation +- [ ] Add CreateTopics integration tests + +**Files to modify**: +- `weed/mq/kafka/protocol/handler.go` +- `weed/mq/kafka/protocol/create_topics.go` (new file) +- Add test file: `weed/mq/kafka/protocol/create_topics_test.go` + +## Phase 3: ApiVersions Matrix Accuracy (PRIORITY MEDIUM) +**Goal**: Ensure advertised API versions match actual implementation + +### Tasks: +- [ ] Audit current `handleApiVersions` response against implemented features +- [ ] Lower max versions for APIs with incomplete implementations +- [ ] Add version validation in request handlers +- [ ] Document supported vs unsupported features per API version + +**Files to modify**: +- `weed/mq/kafka/protocol/handler.go` (`handleApiVersions`) +- Add file: `weed/mq/kafka/API_VERSION_MATRIX.md` +- Add test file: `weed/mq/kafka/protocol/api_versions_test.go` + +## Phase 4: Consumer Group Protocol Metadata (PRIORITY MEDIUM) +**Goal**: Proper JoinGroup protocol metadata parsing + +### Tasks: +- [ ] Implement consumer protocol metadata parsing in JoinGroup +- [ ] Extract subscription topics and user data from metadata +- [ ] Populate ClientHost from connection information +- [ ] Support multiple assignment strategies properly + +**Files to modify**: +- `weed/mq/kafka/protocol/joingroup.go` +- `weed/mq/kafka/protocol/consumer_group_metadata.go` (new file) +- Add test file: `weed/mq/kafka/protocol/consumer_group_metadata_test.go` + +## Phase 5: Multi-Batch Fetch Support (PRIORITY MEDIUM) +**Goal**: Support multiple record batch concatenation in Fetch responses + +### Tasks: +- [ ] Implement proper record batch concatenation +- [ ] Handle batch size limits and chunking +- [ ] Add support for compressed record batches +- [ ] Performance optimization for large batch responses + +**Files to modify**: +- `weed/mq/kafka/protocol/fetch.go` +- Add test file: `weed/mq/kafka/protocol/fetch_multi_batch_test.go` + +## Phase 6: Flexible Versions Support (PRIORITY LOW) +**Goal**: Basic support for flexible versions and tagged fields + +### Tasks: +- [ ] Add flexible version detection in request headers +- [ ] Implement tagged field parsing/skipping +- [ ] Update response encoders for flexible versions +- [ ] Add flexible version tests + +**Files to modify**: +- `weed/mq/kafka/protocol/handler.go` +- `weed/mq/kafka/protocol/flexible_versions.go` (new file) +- Add test file: `weed/mq/kafka/protocol/flexible_versions_test.go` + +## Phase 7: Error Handling and Edge Cases (PRIORITY LOW) +**Goal**: Comprehensive error handling and Kafka spec compliance + +### Tasks: +- [ ] Audit all error codes against Kafka specification +- [ ] Add missing error codes for network/timeout scenarios +- [ ] Implement proper connection timeout handling +- [ ] Add comprehensive error handling tests + +**Files to modify**: +- `weed/mq/kafka/protocol/errors.go` +- All protocol handler files +- Add test file: `weed/mq/kafka/protocol/error_handling_test.go` + +## Current Status: Ready to start Phase 1 + +### Implementation Notes: +- Each phase should include comprehensive tests +- Commit after each phase completion +- Backward compatibility must be maintained +- Integration tests should verify end-to-end functionality