Browse Source

Add Kafka gateway implementation phases roadmap

- Document 7 phases of implementation from high to low priority
- Phase 1: Core SeaweedMQ integration for real message retrieval
- Phase 2: CreateTopics protocol compliance
- Phase 3: ApiVersions matrix accuracy
- Phase 4: Consumer group protocol metadata parsing
- Phase 5: Multi-batch Fetch support
- Phase 6: Flexible versions support
- Phase 7: Error handling and edge cases

Ready to start Phase 1 implementation.
pull/7231/head
chrislu 2 months ago
parent
commit
dec63c22e4
  1. 106
      weed/mq/kafka/IMPLEMENTATION_PHASES.md

106
weed/mq/kafka/IMPLEMENTATION_PHASES.md

@ -0,0 +1,106 @@
# Kafka Gateway Implementation Phases
## Phase 1: Core SeaweedMQ Integration (PRIORITY HIGH)
**Goal**: Enable real message retrieval from SeaweedMQ storage
### Tasks:
- [ ] Implement `integration.SeaweedMQHandler.GetStoredRecords()` to return actual records
- [ ] Add proper SMQ record conversion from SeaweedMQ format to Kafka format
- [ ] Wire Fetch API to use real SMQ records instead of synthetic batches
- [ ] Add integration tests for end-to-end message storage and retrieval
**Files to modify**:
- `weed/mq/kafka/integration/seaweedmq_handler.go`
- `weed/mq/kafka/protocol/fetch.go` (verification)
- Add test file: `weed/mq/kafka/integration/record_retrieval_test.go`
## Phase 2: CreateTopics Protocol Compliance (PRIORITY HIGH)
**Goal**: Fix CreateTopics API parsing and partition handling
### Tasks:
- [ ] Implement `handleCreateTopicsV0V1` request parsing
- [ ] Add support for partition count and basic topic configurations
- [ ] Wire CreateTopics to actual SeaweedMQ topic creation
- [ ] Add CreateTopics integration tests
**Files to modify**:
- `weed/mq/kafka/protocol/handler.go`
- `weed/mq/kafka/protocol/create_topics.go` (new file)
- Add test file: `weed/mq/kafka/protocol/create_topics_test.go`
## Phase 3: ApiVersions Matrix Accuracy (PRIORITY MEDIUM)
**Goal**: Ensure advertised API versions match actual implementation
### Tasks:
- [ ] Audit current `handleApiVersions` response against implemented features
- [ ] Lower max versions for APIs with incomplete implementations
- [ ] Add version validation in request handlers
- [ ] Document supported vs unsupported features per API version
**Files to modify**:
- `weed/mq/kafka/protocol/handler.go` (`handleApiVersions`)
- Add file: `weed/mq/kafka/API_VERSION_MATRIX.md`
- Add test file: `weed/mq/kafka/protocol/api_versions_test.go`
## Phase 4: Consumer Group Protocol Metadata (PRIORITY MEDIUM)
**Goal**: Proper JoinGroup protocol metadata parsing
### Tasks:
- [ ] Implement consumer protocol metadata parsing in JoinGroup
- [ ] Extract subscription topics and user data from metadata
- [ ] Populate ClientHost from connection information
- [ ] Support multiple assignment strategies properly
**Files to modify**:
- `weed/mq/kafka/protocol/joingroup.go`
- `weed/mq/kafka/protocol/consumer_group_metadata.go` (new file)
- Add test file: `weed/mq/kafka/protocol/consumer_group_metadata_test.go`
## Phase 5: Multi-Batch Fetch Support (PRIORITY MEDIUM)
**Goal**: Support multiple record batch concatenation in Fetch responses
### Tasks:
- [ ] Implement proper record batch concatenation
- [ ] Handle batch size limits and chunking
- [ ] Add support for compressed record batches
- [ ] Performance optimization for large batch responses
**Files to modify**:
- `weed/mq/kafka/protocol/fetch.go`
- Add test file: `weed/mq/kafka/protocol/fetch_multi_batch_test.go`
## Phase 6: Flexible Versions Support (PRIORITY LOW)
**Goal**: Basic support for flexible versions and tagged fields
### Tasks:
- [ ] Add flexible version detection in request headers
- [ ] Implement tagged field parsing/skipping
- [ ] Update response encoders for flexible versions
- [ ] Add flexible version tests
**Files to modify**:
- `weed/mq/kafka/protocol/handler.go`
- `weed/mq/kafka/protocol/flexible_versions.go` (new file)
- Add test file: `weed/mq/kafka/protocol/flexible_versions_test.go`
## Phase 7: Error Handling and Edge Cases (PRIORITY LOW)
**Goal**: Comprehensive error handling and Kafka spec compliance
### Tasks:
- [ ] Audit all error codes against Kafka specification
- [ ] Add missing error codes for network/timeout scenarios
- [ ] Implement proper connection timeout handling
- [ ] Add comprehensive error handling tests
**Files to modify**:
- `weed/mq/kafka/protocol/errors.go`
- All protocol handler files
- Add test file: `weed/mq/kafka/protocol/error_handling_test.go`
## Current Status: Ready to start Phase 1
### Implementation Notes:
- Each phase should include comprehensive tests
- Commit after each phase completion
- Backward compatibility must be maintained
- Integration tests should verify end-to-end functionality
Loading…
Cancel
Save