
mq(kafka): Add comprehensive protocol compatibility review and TODOs

- Create PROTOCOL_COMPATIBILITY_REVIEW.md documenting all compatibility issues
- Add critical TODOs to the most problematic protocol implementations:
  * Produce: Record batch parsing is simplified, missing compression/CRC
  * Offset management: Hardcoded 'test-topic' parsing breaks real clients
  * JoinGroup: Consumer subscription extraction hardcoded, incomplete parsing
  * Fetch: Fake record batch construction with dummy data
  * Handler: Missing API version validation across all endpoints
- Identify high/medium/low priority fixes needed for real client compatibility
- Document specific areas needing work:
  * Record format parsing (v0/v1/v2, compression, CRC validation)
  * Request parsing (topics arrays, partition arrays, protocol metadata)
  * Consumer group protocol metadata parsing
  * Connection metadata extraction
  * Error code accuracy
- Add testing recommendations for kafka-go, Sarama, Java clients
- Provide roadmap for Phase 4 protocol compliance improvements

This review is essential before attempting integration with real Kafka clients,
as the current simplified implementations will fail with actual client libraries.
commit 5595dfd476 by chrislu, 2 months ago (pull/7231/head)
Changed files:

  273  weed/mq/kafka/PROTOCOL_COMPATIBILITY_REVIEW.md
    9  weed/mq/kafka/protocol/fetch.go
    8  weed/mq/kafka/protocol/handler.go
   24  weed/mq/kafka/protocol/joingroup.go
   27  weed/mq/kafka/protocol/offset_management.go
   14  weed/mq/kafka/protocol/produce.go

273  weed/mq/kafka/PROTOCOL_COMPATIBILITY_REVIEW.md

@@ -0,0 +1,273 @@
# Kafka Protocol Compatibility Review & TODOs
## Overview
This document identifies areas in the current Kafka implementation that need attention for full protocol compatibility, including assumptions, simplifications, and potential issues.
## Critical Protocol Issues
### 🚨 HIGH PRIORITY - Protocol Breaking Issues
#### 1. **Record Batch Parsing (Produce API)**
**File**: `protocol/produce.go`
**Issues**:
- `parseRecordSet()` uses simplified parsing logic that doesn't handle the full Kafka record batch format
- Hardcoded assumptions about record batch structure
- Missing compression support (gzip, snappy, lz4, zstd)
- CRC validation is completely missing
**TODOs**:
```go
// TODO: Implement full Kafka record batch parsing
// - Support all record batch versions (v0, v1, v2)
// - Handle compression codecs (gzip, snappy, lz4, zstd)
// - Validate CRC32 checksums
// - Parse individual record headers, keys, values, timestamps
// - Handle transaction markers and control records
```
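
For reference, a minimal sketch of what v2 batch header parsing with CRC-32C validation could look like (byte offsets follow the Kafka protocol guide for magic=2 batches; the type and function names are illustrative, not existing code in this repo):

```go
// Sketch: parse a v2 record batch header and validate its CRC-32C.
package kafka

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

type recordBatchHeader struct {
	BaseOffset      int64
	BatchLength     int32
	Magic           int8
	Attributes      int16
	LastOffsetDelta int32
	RecordCount     int32
}

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

func parseRecordBatchHeader(data []byte) (*recordBatchHeader, error) {
	const headerSize = 61 // bytes up to and including recordCount
	if len(data) < headerSize {
		return nil, fmt.Errorf("record batch too small: %d bytes", len(data))
	}
	h := &recordBatchHeader{
		BaseOffset:      int64(binary.BigEndian.Uint64(data[0:8])),
		BatchLength:     int32(binary.BigEndian.Uint32(data[8:12])),
		Magic:           int8(data[16]),
		Attributes:      int16(binary.BigEndian.Uint16(data[21:23])),
		LastOffsetDelta: int32(binary.BigEndian.Uint32(data[23:27])),
		RecordCount:     int32(binary.BigEndian.Uint32(data[57:61])),
	}
	if h.Magic != 2 {
		return nil, fmt.Errorf("unsupported record batch magic: %d", h.Magic)
	}
	// batchLength counts everything after the batchLength field itself.
	end := 12 + int(h.BatchLength)
	if end > len(data) {
		return nil, fmt.Errorf("truncated record batch: need %d bytes, have %d", end, len(data))
	}
	// CRC-32C covers the bytes from attributes (offset 21) to the end of the batch.
	want := binary.BigEndian.Uint32(data[17:21])
	if got := crc32.Checksum(data[21:end], castagnoli); got != want {
		return nil, fmt.Errorf("record batch CRC mismatch: got %x, want %x", got, want)
	}
	return h, nil
}
```

The compression codec lives in the low three bits of `attributes`, so decompression of the records section would branch on that value before individual records are parsed.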
#### 2. **Request Parsing Assumptions**
**Files**: `protocol/offset_management.go`, `protocol/joingroup.go`, `protocol/consumer_coordination.go`
**Issues**:
- Most parsing functions have hardcoded topic/partition assumptions
- Missing support for array parsing (topics, partitions, group protocols)
- Simplified request structures that don't match real Kafka clients
**TODOs**:
```go
// TODO: Fix OffsetCommit/OffsetFetch request parsing
// Currently returns hardcoded "test-topic" with partition 0
// Need to parse actual topics array from request body
// TODO: Fix JoinGroup protocol parsing
// Currently ignores group protocols array and subscription metadata
// Need to extract actual subscribed topics from consumer metadata
// TODO: Add support for batch operations
// OffsetCommit can commit multiple topic-partitions
// LeaveGroup can handle multiple members leaving
```
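
As an illustration, the topics-array parsing for OffsetCommit could look like the sketch below (non-flexible request versions; the per-partition layout shown matches the v2-v5 format with no timestamp or leader epoch, so real code must branch on the request version). The struct names mirror the ones discussed in this review, but the helper itself is hypothetical and bounds checks are abbreviated:

```go
// Sketch: parse the topics/partitions arrays of an OffsetCommit request body.
func parseOffsetCommitTopics(data []byte, offset int) ([]OffsetCommitTopic, int, error) {
	if offset+4 > len(data) {
		return nil, 0, fmt.Errorf("truncated topics array")
	}
	topicCount := int(int32(binary.BigEndian.Uint32(data[offset:])))
	offset += 4

	topics := make([]OffsetCommitTopic, 0, topicCount)
	for i := 0; i < topicCount; i++ {
		// Topic name: INT16 length followed by UTF-8 bytes.
		nameLen := int(int16(binary.BigEndian.Uint16(data[offset:])))
		offset += 2
		name := string(data[offset : offset+nameLen])
		offset += nameLen

		// Partitions array: INT32 count, then per-partition fields.
		partCount := int(int32(binary.BigEndian.Uint32(data[offset:])))
		offset += 4
		parts := make([]OffsetCommitPartition, 0, partCount)
		for j := 0; j < partCount; j++ {
			index := int32(binary.BigEndian.Uint32(data[offset:]))
			committed := int64(binary.BigEndian.Uint64(data[offset+4:]))
			offset += 12
			// Metadata: nullable string, length -1 means null.
			metaLen := int(int16(binary.BigEndian.Uint16(data[offset:])))
			offset += 2
			metadata := ""
			if metaLen > 0 {
				metadata = string(data[offset : offset+metaLen])
				offset += metaLen
			}
			parts = append(parts, OffsetCommitPartition{
				Index: index, Offset: committed, LeaderEpoch: -1, Metadata: metadata,
			})
		}
		topics = append(topics, OffsetCommitTopic{Name: name, Partitions: parts})
	}
	return topics, offset, nil
}
```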
#### 3. **Fetch Record Construction**
**File**: `protocol/fetch.go`
**Issues**:
- `constructRecordBatch()` creates fake record batches with dummy data
- Varint encoding is simplified to single bytes (incorrect)
- Missing proper record headers, timestamps, and metadata
**TODOs**:
```go
// TODO: Replace dummy record batch construction with real data
// - Read actual message data from SeaweedMQ/storage
// - Implement proper varint encoding/decoding
// - Support record headers and custom timestamps
// - Handle different record batch versions correctly
```
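
On the varint point specifically: the per-record fields of a v2 batch (record length, timestamp delta, offset delta, key/value lengths, header count) are zigzag-encoded varints, which is the same scheme as Go's signed varint helpers in `encoding/binary`. A minimal sketch:

```go
// Sketch: Kafka record-level varints via encoding/binary's zigzag helpers.
import (
	"encoding/binary"
	"fmt"
)

// appendVarint appends a Kafka-style zigzag varint to buf.
func appendVarint(buf []byte, v int64) []byte {
	tmp := make([]byte, binary.MaxVarintLen64)
	n := binary.PutVarint(tmp, v)
	return append(buf, tmp[:n]...)
}

// readVarint decodes a Kafka-style zigzag varint and reports bytes consumed.
func readVarint(data []byte) (int64, int, error) {
	v, n := binary.Varint(data)
	if n <= 0 {
		return 0, 0, fmt.Errorf("invalid varint")
	}
	return v, n, nil
}
```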
### ⚠️ MEDIUM PRIORITY - Compatibility Issues
#### 4. **API Version Support**
**File**: `protocol/handler.go`
**Issues**:
- ApiVersions response advertises max versions but implementations may not support all features
- No version-specific handling in most APIs
**TODOs**:
```go
// TODO: Add API version validation per request
// Different API versions have different request/response formats
// Need to validate apiVersion from request header and respond accordingly
// TODO: Update handleApiVersions to reflect actual supported features
// Current max versions may be too optimistic for partial implementations
```
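
A sketch of the kind of validation table this could use; the version ranges below are placeholders and must be kept in sync with whatever `handleApiVersions` actually advertises:

```go
// Sketch: reject requests whose apiVersion falls outside the advertised range.
type versionRange struct{ min, max uint16 }

var supportedVersions = map[uint16]versionRange{
	0:  {0, 7}, // Produce
	1:  {0, 7}, // Fetch
	3:  {0, 7}, // Metadata
	8:  {0, 2}, // OffsetCommit
	9:  {0, 2}, // OffsetFetch
	11: {0, 2}, // JoinGroup
}

func validateAPIVersion(apiKey, apiVersion uint16) error {
	r, ok := supportedVersions[apiKey]
	if !ok {
		return fmt.Errorf("unsupported API key %d", apiKey)
	}
	if apiVersion < r.min || apiVersion > r.max {
		// The response should carry error code 35 (UNSUPPORTED_VERSION).
		return fmt.Errorf("API key %d: version %d outside supported range [%d, %d]",
			apiKey, apiVersion, r.min, r.max)
	}
	return nil
}
```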
#### 5. **Consumer Group Protocol Metadata**
**File**: `protocol/joingroup.go`
**Issues**:
- Consumer subscription extraction is hardcoded to return `["test-topic"]`
- Group protocol metadata parsing is completely stubbed
**TODOs**:
```go
// TODO: Implement proper consumer protocol metadata parsing
// Consumer clients send subscription information in protocol metadata
// Need to decode consumer subscription protocol format:
// - Version(2) + subscription topics + user data
// TODO: Support multiple assignment strategies properly
// Currently only basic range/roundrobin, need to parse client preferences
```
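
A minimal sketch of decoding the subscription metadata for the "consumer" protocol type, assuming the layout above (an int16 metadata version, an int32-count array of int16-length topic strings, then user data; later metadata versions append fields that a real parser should tolerate):

```go
// Sketch: extract subscribed topics from "consumer" protocol metadata.
func parseConsumerSubscription(metadata []byte) ([]string, error) {
	if len(metadata) < 6 {
		return nil, fmt.Errorf("subscription metadata too short")
	}
	offset := 2 // skip the int16 metadata version
	topicCount := int(int32(binary.BigEndian.Uint32(metadata[offset:])))
	offset += 4

	topics := make([]string, 0, topicCount)
	for i := 0; i < topicCount; i++ {
		if offset+2 > len(metadata) {
			return nil, fmt.Errorf("truncated topic list")
		}
		nameLen := int(int16(binary.BigEndian.Uint16(metadata[offset:])))
		offset += 2
		if offset+nameLen > len(metadata) {
			return nil, fmt.Errorf("truncated topic name")
		}
		topics = append(topics, string(metadata[offset:offset+nameLen]))
		offset += nameLen
	}
	// User data (int32 length, -1 for null) and newer fields follow; ignored here.
	return topics, nil
}
```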
#### 6. **Error Code Mapping**
**Files**: Multiple protocol files
**Issues**:
- Some error codes may not match Kafka specifications exactly
- Missing error codes for edge cases
**TODOs**:
```go
// TODO: Verify all error codes match Kafka specification
// Check ErrorCode constants against official Kafka protocol docs
// Some custom error codes may not be recognized by clients
// TODO: Add missing error codes for:
// - Network errors, timeout errors
// - Quota exceeded, throttling
// - Security/authorization errors
```
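
For the verification pass, a handful of codes from the Kafka error table that this gateway already needs; the values should still be cross-checked against the official protocol docs:

```go
// A few well-known Kafka protocol error codes (verify the full table against
// https://kafka.apache.org/protocol.html before relying on it).
const (
	ErrNone                    int16 = 0
	ErrOffsetOutOfRange        int16 = 1
	ErrUnknownTopicOrPartition int16 = 3
	ErrNotCoordinator          int16 = 16
	ErrIllegalGeneration       int16 = 22
	ErrUnknownMemberID         int16 = 25
	ErrRebalanceInProgress     int16 = 27
	ErrUnsupportedVersion      int16 = 35
)
```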
### 🔧 LOW PRIORITY - Implementation Completeness
#### 7. **Connection Management**
**File**: `protocol/handler.go`
**Issues**:
- Basic connection handling without connection pooling
- No support for SASL authentication or SSL/TLS
- Missing connection metadata (client host, version)
**TODOs**:
```go
// TODO: Extract client connection metadata
// JoinGroup requests need actual client host instead of "unknown-host"
// Parse client version from request headers for better compatibility
// TODO: Add connection security support
// Support SASL/PLAIN, SASL/SCRAM authentication
// Support SSL/TLS encryption
```
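
Extracting the client host needs nothing beyond the accepted `net.Conn`; a minimal sketch (the helper name is illustrative):

```go
// Sketch: derive ClientHost from the accepted connection instead of "unknown-host".
func clientHostFromConn(conn net.Conn) string {
	host, _, err := net.SplitHostPort(conn.RemoteAddr().String())
	if err != nil {
		return conn.RemoteAddr().String()
	}
	return host
}
```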
#### 8. **Record Timestamps and Offsets**
**Files**: `protocol/produce.go`, `protocol/fetch.go`
**Issues**:
- Simplified timestamp handling
- Offset assignment may not match Kafka behavior exactly
**TODOs**:
```go
// TODO: Implement proper offset assignment strategy
// Kafka offsets are partition-specific and strictly increasing
// Current implementation may have gaps or inconsistencies
// TODO: Support timestamp types correctly
// Kafka supports CreateTime vs LogAppendTime
// Need to handle timestamp-based offset lookups properly
```
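
One way to get strictly increasing per-partition offsets is to reserve a contiguous range per produced batch under a lock; the ledger type below is hypothetical, not the one in this codebase:

```go
// Sketch: reserve a contiguous, strictly increasing offset range per batch.
type partitionLedger struct {
	mu         sync.Mutex
	nextOffset int64
}

// assignOffsets returns the base offset for n new records and advances the ledger,
// so offsets within a partition never repeat and never leave gaps.
func (l *partitionLedger) assignOffsets(n int64) int64 {
	l.mu.Lock()
	defer l.mu.Unlock()
	base := l.nextOffset
	l.nextOffset += n
	return base
}
```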
#### 9. **SeaweedMQ Integration Assumptions**
**File**: `integration/seaweedmq_handler.go`
**Issues**:
- Simplified record format conversion
- Single partition assumption for new topics
- Missing topic configuration support
**TODOs**:
```go
// TODO: Implement proper Kafka->SeaweedMQ record conversion
// Currently uses placeholder keys/values
// Need to extract actual record data from Kafka record batches
// TODO: Support configurable partition counts
// Currently hardcoded to 1 partition per topic
// Need to respect CreateTopics partition count requests
// TODO: Add topic configuration support
// Kafka topics have configs like retention, compression, cleanup policy
// Map these to SeaweedMQ topic settings
```
## Testing Compatibility Issues
### Missing Integration Tests
**TODOs**:
```go
// TODO: Add real Kafka client integration tests
// Test with kafka-go, Sarama, and other popular Go clients
// Verify producer/consumer workflows work end-to-end
// TODO: Add protocol conformance tests
// Use Kafka protocol test vectors if available
// Test edge cases and error conditions
// TODO: Add load testing
// Verify behavior under high throughput
// Test with multiple concurrent consumer groups
```
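
A sketch of the simplest such test with kafka-go (`github.com/segmentio/kafka-go`); the gateway address, topic name, and group ID are assumptions for illustration:

```go
// Sketch: end-to-end produce/consume through the gateway using kafka-go.
package integration

import (
	"context"
	"testing"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func TestProduceConsumeRoundTrip(t *testing.T) {
	const addr, topic = "127.0.0.1:9092", "test-topic" // assumed gateway address/topic
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	w := &kafka.Writer{Addr: kafka.TCP(addr), Topic: topic}
	defer w.Close()
	if err := w.WriteMessages(ctx, kafka.Message{Key: []byte("k1"), Value: []byte("v1")}); err != nil {
		t.Fatalf("produce failed: %v", err)
	}

	r := kafka.NewReader(kafka.ReaderConfig{Brokers: []string{addr}, Topic: topic, GroupID: "it-group"})
	defer r.Close()
	msg, err := r.ReadMessage(ctx)
	if err != nil {
		t.Fatalf("consume failed: %v", err)
	}
	if string(msg.Value) != "v1" {
		t.Fatalf("unexpected value: %q", msg.Value)
	}
}
```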
### Protocol Version Testing
**TODOs**:
```go
// TODO: Test multiple API versions
// Clients may use different API versions
// Ensure backward compatibility
// TODO: Test with different Kafka client libraries
// Java clients, Python clients, etc.
// Different clients may have different protocol expectations
```
## Performance & Scalability TODOs
### Memory Management
**TODOs**:
```go
// TODO: Add memory pooling for large messages
// Avoid allocating large byte slices for each request
// Reuse buffers for protocol encoding/decoding
// TODO: Implement streaming for large record batches
// Don't load entire batches into memory at once
// Stream records directly from storage to client
```
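
A minimal sketch of buffer reuse with `sync.Pool` for response encoding (the helper name is illustrative):

```go
// Sketch: reuse encoding buffers across requests with sync.Pool.
var bufPool = sync.Pool{
	New: func() any { return new(bytes.Buffer) },
}

func encodeWithPooledBuffer(encode func(*bytes.Buffer)) []byte {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()
	defer bufPool.Put(buf)

	encode(buf)
	// Copy out before the buffer goes back into the pool.
	out := make([]byte, buf.Len())
	copy(out, buf.Bytes())
	return out
}
```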
### Connection Handling
**TODOs**:
```go
// TODO: Add connection timeout handling
// Implement proper client timeout detection
// Clean up stale connections and consumer group members
// TODO: Add backpressure handling
// Implement flow control for high-throughput scenarios
// Prevent memory exhaustion during load spikes
```
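
Timeout detection can start with per-read deadlines on the connection; a minimal sketch (the timeout value and helper name are assumptions):

```go
// Sketch: bound each read with a deadline so idle/stale connections surface as errors.
func readWithDeadline(conn net.Conn, buf []byte, timeout time.Duration) (int, error) {
	if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
		return 0, err
	}
	n, err := io.ReadFull(conn, buf)
	if ne, ok := err.(net.Error); ok && ne.Timeout() {
		return n, fmt.Errorf("client idle for %v: %w", timeout, err)
	}
	return n, err
}
```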
## Immediate Action Items
### Phase 4 Priority List:
1. **Fix Record Batch Parsing** - Critical for real client compatibility
2. **Implement Proper Request Parsing** - Remove hardcoded assumptions
3. **Add Compression Support** - Essential for performance
4. **Real SeaweedMQ Integration** - Move beyond placeholder data
5. **Consumer Protocol Metadata** - Fix subscription handling
6. **API Version Handling** - Support multiple protocol versions
### Compatibility Validation:
1. **Test with kafka-go library** - Most popular Go Kafka client
2. **Test with Sarama library** - Alternative popular Go client
3. **Test with Java Kafka clients** - Reference implementation
4. **Performance benchmarking** - Compare against Apache Kafka
## Protocol Standards References
- **Kafka Protocol Guide**: https://kafka.apache.org/protocol.html
- **Record Batch Format**: Kafka protocol v2 record format specification
- **Consumer Protocol**: Group coordination and assignment protocol details
- **API Versioning**: How different API versions affect request/response format
## Notes on Current State
### What Works Well:
- Basic produce/consume flow for simple cases
- Consumer group coordination state management
- In-memory testing mode for development
- Graceful error handling for most common cases
### What Needs Work:
- Real-world client compatibility (requires fixing parsing issues)
- Performance under load (needs compression, streaming)
- Production deployment (needs security, monitoring)
- Edge case handling (various protocol versions, error conditions)
---
**This review should be updated as protocol implementations improve and more compatibility issues are discovered.**

9  weed/mq/kafka/protocol/fetch.go

@@ -193,7 +193,14 @@ func (h *Handler) handleFetch(correlationID uint32, requestBody []byte) ([]byte,
 }

 // constructRecordBatch creates a simplified Kafka record batch for testing
-// In a real implementation, this would read actual message data from storage
+// TODO: CRITICAL - This function creates fake record batches with dummy data
+// For real client compatibility need to:
+// - Read actual message data from SeaweedMQ/storage
+// - Construct proper record batch headers with correct CRC
+// - Use proper varint encoding (not single-byte shortcuts)
+// - Support different record batch versions
+// - Handle compressed batches if messages were stored compressed
+// Currently returns fake "message-N" data that no real client expects
 func (h *Handler) constructRecordBatch(ledger interface{}, fetchOffset, highWaterMark int64) []byte {
 	// For Phase 1, create a simple record batch with dummy messages
 	// This simulates what would come from real message storage

8  weed/mq/kafka/protocol/handler.go

@@ -142,6 +142,9 @@ func (h *Handler) HandleConn(conn net.Conn) error {
 		size := binary.BigEndian.Uint32(sizeBytes[:])

 		if size == 0 || size > 1024*1024 { // 1MB limit
+			// TODO: Consider making message size limit configurable
+			// 1MB might be too restrictive for some use cases
+			// Kafka default max.message.bytes is often higher
 			return fmt.Errorf("invalid message size: %d", size)
 		}

@@ -160,6 +163,11 @@ func (h *Handler) HandleConn(conn net.Conn) error {
 		apiVersion := binary.BigEndian.Uint16(messageBuf[2:4])
 		correlationID := binary.BigEndian.Uint32(messageBuf[4:8])

+		// TODO: IMPORTANT - API version validation is missing
+		// Different API versions have different request/response formats
+		// Need to validate apiVersion against supported versions for each API
+		// Currently ignoring apiVersion completely which may cause parsing errors
+
 		// Handle the request based on API key
 		var response []byte
 		var err error

24  weed/mq/kafka/protocol/joingroup.go

@@ -118,7 +118,7 @@ func (h *Handler) handleJoinGroup(correlationID uint32, requestBody []byte) ([]b
 	member := &consumer.GroupMember{
 		ID:               memberID,
 		ClientID:         request.GroupInstanceID,
-		ClientHost:       "unknown", // TODO: extract from connection
+		ClientHost:       "unknown", // TODO: extract from connection - needed for consumer group metadata
 		SessionTimeout:   request.SessionTimeout,
 		RebalanceTimeout: request.RebalanceTimeout,
 		Subscription:     h.extractSubscriptionFromProtocols(request.GroupProtocols),

@@ -223,17 +223,22 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error)
 		offset += memberIDLength
 	}

-	// For simplicity, we'll assume basic protocol parsing
-	// In a full implementation, we'd parse the protocol type and protocols array
+	// TODO: CRITICAL - JoinGroup request parsing is incomplete
+	// Missing parsing of:
+	// - Group instance ID (for static membership)
+	// - Protocol type validation
+	// - Group protocols array (client's supported assignment strategies)
+	// - Protocol metadata (consumer subscriptions, user data)
+	// Without this, assignment strategies and subscriptions won't work with real clients
 	return &JoinGroupRequest{
 		GroupID:          groupID,
 		SessionTimeout:   sessionTimeout,
 		RebalanceTimeout: rebalanceTimeout,
 		MemberID:         memberID,
-		ProtocolType:     "consumer",
+		ProtocolType:     "consumer", // TODO: Parse from request
 		GroupProtocols: []GroupProtocol{
-			{Name: "range", Metadata: []byte{}},
+			{Name: "range", Metadata: []byte{}}, // TODO: Parse actual protocols from request
 		},
 	}, nil
 }

@@ -328,8 +333,13 @@ func (h *Handler) buildJoinGroupErrorResponse(correlationID uint32, errorCode in
 }

 func (h *Handler) extractSubscriptionFromProtocols(protocols []GroupProtocol) []string {
-	// For simplicity, return a default subscription
-	// In a real implementation, we'd parse the protocol metadata to extract subscribed topics
+	// TODO: CRITICAL - Consumer subscription extraction is hardcoded to "test-topic"
+	// This breaks real Kafka consumers which send their actual subscriptions
+	// Consumer protocol metadata format (for "consumer" protocol type):
+	// - Version (2 bytes)
+	// - Topics array (4 bytes count + topic names)
+	// - User data (4 bytes length + data)
+	// Without fixing this, consumers will be assigned wrong topics
 	return []string{"test-topic"}
 }

27  weed/mq/kafka/protocol/offset_management.go

@@ -292,8 +292,14 @@ func (h *Handler) parseOffsetCommitRequest(data []byte) (*OffsetCommitRequest, e
 	memberID := string(data[offset : offset+memberIDLength])
 	offset += memberIDLength

-	// For simplicity, we'll create a basic request structure
-	// In a full implementation, we'd parse the full topics array
+	// TODO: CRITICAL - This parsing is completely broken for real clients
+	// Currently hardcoded to return "test-topic" with partition 0
+	// Real OffsetCommit requests contain:
+	// - RetentionTime (8 bytes, -1 for broker default)
+	// - Topics array with actual topic names
+	// - Partitions array with actual partition IDs and offsets
+	// - Optional group instance ID for static membership
+	// Without fixing this, no real Kafka client can commit offsets properly

 	return &OffsetCommitRequest{
 		GroupID: groupID,

@@ -302,9 +308,9 @@ func (h *Handler) parseOffsetCommitRequest(data []byte) (*OffsetCommitRequest, e
 		RetentionTime: -1, // Use broker default
 		Topics: []OffsetCommitTopic{
 			{
-				Name: "test-topic", // Simplified
+				Name: "test-topic", // TODO: Parse actual topic from request
 				Partitions: []OffsetCommitPartition{
-					{Index: 0, Offset: 0, LeaderEpoch: -1, Metadata: ""},
+					{Index: 0, Offset: 0, LeaderEpoch: -1, Metadata: ""}, // TODO: Parse actual partition data
 				},
 			},
 		},

@@ -327,15 +333,20 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, err
 	groupID := string(data[offset : offset+groupIDLength])
 	offset += groupIDLength

-	// For simplicity, we'll create a basic request structure
-	// In a full implementation, we'd parse the full topics array
+	// TODO: CRITICAL - OffsetFetch parsing is also hardcoded
+	// Real clients send topics array with specific partitions to fetch
+	// Need to parse:
+	// - Topics array (4 bytes count + topics)
+	// - For each topic: name + partitions array
+	// - RequireStable flag for transactional consistency
+	// Currently will fail with any real Kafka client doing offset fetches

 	return &OffsetFetchRequest{
 		GroupID: groupID,
 		Topics: []OffsetFetchTopic{
 			{
-				Name:       "test-topic", // Simplified
-				Partitions: []int32{0},   // Fetch partition 0
+				Name:       "test-topic", // TODO: Parse actual topics from request
+				Partitions: []int32{0},   // TODO: Parse actual partitions or empty for "all"
 			},
 		},
 		RequireStable: false,

14  weed/mq/kafka/protocol/produce.go

@@ -169,7 +169,13 @@ func (h *Handler) handleProduce(correlationID uint32, requestBody []byte) ([]byt
 }

 // parseRecordSet parses a Kafka record set and returns the number of records and total size
-// This is a simplified parser for Phase 1 - just counts valid records
+// TODO: CRITICAL - This is a simplified parser that needs complete rewrite for protocol compatibility
+// Missing:
+// - Proper record batch format parsing (v0, v1, v2)
+// - Compression support (gzip, snappy, lz4, zstd)
+// - CRC32 validation
+// - Transaction markers and control records
+// - Individual record extraction (key, value, headers, timestamps)
 func (h *Handler) parseRecordSet(recordSetData []byte) (recordCount int32, totalSize int32, err error) {
 	if len(recordSetData) < 12 { // minimum record set size
 		return 0, 0, fmt.Errorf("record set too small")

@@ -218,6 +224,12 @@ func (h *Handler) produceToSeaweedMQ(topic string, partition int32, recordSetDat
 }

 // extractFirstRecord extracts the first record from a Kafka record set (simplified)
+// TODO: CRITICAL - This function returns placeholder data instead of parsing real records
+// For real client compatibility, need to:
+// - Parse record batch header properly
+// - Extract actual key/value from first record in batch
+// - Handle compressed record batches
+// - Support all record formats (v0, v1, v2)
 func (h *Handler) extractFirstRecord(recordSetData []byte) ([]byte, []byte) {
 	// For Phase 2, create a simple placeholder record
 	// This represents what would be extracted from the actual Kafka record batch
