diff --git a/weed/mq/kafka/PROTOCOL_COMPATIBILITY_REVIEW.md b/weed/mq/kafka/PROTOCOL_COMPATIBILITY_REVIEW.md new file mode 100644 index 000000000..33f7b998e --- /dev/null +++ b/weed/mq/kafka/PROTOCOL_COMPATIBILITY_REVIEW.md @@ -0,0 +1,273 @@ +# Kafka Protocol Compatibility Review & TODOs + +## Overview +This document identifies areas in the current Kafka implementation that need attention for full protocol compatibility, including assumptions, simplifications, and potential issues. + +## Critical Protocol Issues + +### 🚨 HIGH PRIORITY - Protocol Breaking Issues + +#### 1. **Record Batch Parsing (Produce API)** +**File**: `protocol/produce.go` +**Issues**: +- `parseRecordSet()` uses simplified parsing logic that doesn't handle the full Kafka record batch format +- Hardcoded assumptions about record batch structure +- Missing compression support (gzip, snappy, lz4, zstd) +- CRC validation is completely missing + +**TODOs**: +```go +// TODO: Implement full Kafka record batch parsing +// - Support all record batch versions (v0, v1, v2) +// - Handle compression codecs (gzip, snappy, lz4, zstd) +// - Validate CRC32 checksums +// - Parse individual record headers, keys, values, timestamps +// - Handle transaction markers and control records +``` + +#### 2. **Request Parsing Assumptions** +**Files**: `protocol/offset_management.go`, `protocol/joingroup.go`, `protocol/consumer_coordination.go` +**Issues**: +- Most parsing functions have hardcoded topic/partition assumptions +- Missing support for array parsing (topics, partitions, group protocols) +- Simplified request structures that don't match real Kafka clients + +**TODOs**: +```go +// TODO: Fix OffsetCommit/OffsetFetch request parsing +// Currently returns hardcoded "test-topic" with partition 0 +// Need to parse actual topics array from request body + +// TODO: Fix JoinGroup protocol parsing +// Currently ignores group protocols array and subscription metadata +// Need to extract actual subscribed topics from consumer metadata + +// TODO: Add support for batch operations +// OffsetCommit can commit multiple topic-partitions +// LeaveGroup can handle multiple members leaving +``` + +#### 3. **Fetch Record Construction** +**File**: `protocol/fetch.go` +**Issues**: +- `constructRecordBatch()` creates fake record batches with dummy data +- Varint encoding is simplified to single bytes (incorrect) +- Missing proper record headers, timestamps, and metadata + +**TODOs**: +```go +// TODO: Replace dummy record batch construction with real data +// - Read actual message data from SeaweedMQ/storage +// - Implement proper varint encoding/decoding +// - Support record headers and custom timestamps +// - Handle different record batch versions correctly +``` + +### ⚠️ MEDIUM PRIORITY - Compatibility Issues + +#### 4. **API Version Support** +**File**: `protocol/handler.go` +**Issues**: +- ApiVersions response advertises max versions but implementations may not support all features +- No version-specific handling in most APIs + +**TODOs**: +```go +// TODO: Add API version validation per request +// Different API versions have different request/response formats +// Need to validate apiVersion from request header and respond accordingly + +// TODO: Update handleApiVersions to reflect actual supported features +// Current max versions may be too optimistic for partial implementations +``` + +#### 5. 
**Consumer Group Protocol Metadata** +**File**: `protocol/joingroup.go` +**Issues**: +- Consumer subscription extraction is hardcoded to return `["test-topic"]` +- Group protocol metadata parsing is completely stubbed + +**TODOs**: +```go +// TODO: Implement proper consumer protocol metadata parsing +// Consumer clients send subscription information in protocol metadata +// Need to decode consumer subscription protocol format: +// - Version(2) + subscription topics + user data + +// TODO: Support multiple assignment strategies properly +// Currently only basic range/roundrobin, need to parse client preferences +``` + +#### 6. **Error Code Mapping** +**Files**: Multiple protocol files +**Issues**: +- Some error codes may not match Kafka specifications exactly +- Missing error codes for edge cases + +**TODOs**: +```go +// TODO: Verify all error codes match Kafka specification +// Check ErrorCode constants against official Kafka protocol docs +// Some custom error codes may not be recognized by clients + +// TODO: Add missing error codes for: +// - Network errors, timeout errors +// - Quota exceeded, throttling +// - Security/authorization errors +``` + +### 🔧 LOW PRIORITY - Implementation Completeness + +#### 7. **Connection Management** +**File**: `protocol/handler.go` +**Issues**: +- Basic connection handling without connection pooling +- No support for SASL authentication or SSL/TLS +- Missing connection metadata (client host, version) + +**TODOs**: +```go +// TODO: Extract client connection metadata +// JoinGroup requests need actual client host instead of "unknown-host" +// Parse client version from request headers for better compatibility + +// TODO: Add connection security support +// Support SASL/PLAIN, SASL/SCRAM authentication +// Support SSL/TLS encryption +``` + +#### 8. **Record Timestamps and Offsets** +**Files**: `protocol/produce.go`, `protocol/fetch.go` +**Issues**: +- Simplified timestamp handling +- Offset assignment may not match Kafka behavior exactly + +**TODOs**: +```go +// TODO: Implement proper offset assignment strategy +// Kafka offsets are partition-specific and strictly increasing +// Current implementation may have gaps or inconsistencies + +// TODO: Support timestamp types correctly +// Kafka supports CreateTime vs LogAppendTime +// Need to handle timestamp-based offset lookups properly +``` + +#### 9. 
**SeaweedMQ Integration Assumptions** +**File**: `integration/seaweedmq_handler.go` +**Issues**: +- Simplified record format conversion +- Single partition assumption for new topics +- Missing topic configuration support + +**TODOs**: +```go +// TODO: Implement proper Kafka->SeaweedMQ record conversion +// Currently uses placeholder keys/values +// Need to extract actual record data from Kafka record batches + +// TODO: Support configurable partition counts +// Currently hardcoded to 1 partition per topic +// Need to respect CreateTopics partition count requests + +// TODO: Add topic configuration support +// Kafka topics have configs like retention, compression, cleanup policy +// Map these to SeaweedMQ topic settings +``` + +## Testing Compatibility Issues + +### Missing Integration Tests +**TODOs**: +```go +// TODO: Add real Kafka client integration tests +// Test with kafka-go, Sarama, and other popular Go clients +// Verify producer/consumer workflows work end-to-end + +// TODO: Add protocol conformance tests +// Use Kafka protocol test vectors if available +// Test edge cases and error conditions + +// TODO: Add load testing +// Verify behavior under high throughput +// Test with multiple concurrent consumer groups +``` + +### Protocol Version Testing +**TODOs**: +```go +// TODO: Test multiple API versions +// Clients may use different API versions +// Ensure backward compatibility + +// TODO: Test with different Kafka client libraries +// Java clients, Python clients, etc. +// Different clients may have different protocol expectations +``` + +## Performance & Scalability TODOs + +### Memory Management +**TODOs**: +```go +// TODO: Add memory pooling for large messages +// Avoid allocating large byte slices for each request +// Reuse buffers for protocol encoding/decoding + +// TODO: Implement streaming for large record batches +// Don't load entire batches into memory at once +// Stream records directly from storage to client +``` + +### Connection Handling +**TODOs**: +```go +// TODO: Add connection timeout handling +// Implement proper client timeout detection +// Clean up stale connections and consumer group members + +// TODO: Add backpressure handling +// Implement flow control for high-throughput scenarios +// Prevent memory exhaustion during load spikes +``` + +## Immediate Action Items + +### Phase 4 Priority List: +1. **Fix Record Batch Parsing** - Critical for real client compatibility +2. **Implement Proper Request Parsing** - Remove hardcoded assumptions +3. **Add Compression Support** - Essential for performance +4. **Real SeaweedMQ Integration** - Move beyond placeholder data +5. **Consumer Protocol Metadata** - Fix subscription handling +6. **API Version Handling** - Support multiple protocol versions + +### Compatibility Validation: +1. **Test with kafka-go library** - Most popular Go Kafka client +2. **Test with Sarama library** - Alternative popular Go client +3. **Test with Java Kafka clients** - Reference implementation +4. 
**Performance benchmarking** - Compare against Apache Kafka + +## Protocol Standards References + +- **Kafka Protocol Guide**: https://kafka.apache.org/protocol.html +- **Record Batch Format**: Kafka protocol v2 record format specification +- **Consumer Protocol**: Group coordination and assignment protocol details +- **API Versioning**: How different API versions affect request/response format + +## Notes on Current State + +### What Works Well: +- Basic produce/consume flow for simple cases +- Consumer group coordination state management +- In-memory testing mode for development +- Graceful error handling for most common cases + +### What Needs Work: +- Real-world client compatibility (requires fixing parsing issues) +- Performance under load (needs compression, streaming) +- Production deployment (needs security, monitoring) +- Edge case handling (various protocol versions, error conditions) + +--- + +**This review should be updated as protocol implementations improve and more compatibility issues are discovered.** diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index bcdd2ce5e..5f5f2088c 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -193,7 +193,14 @@ func (h *Handler) handleFetch(correlationID uint32, requestBody []byte) ([]byte, } // constructRecordBatch creates a simplified Kafka record batch for testing -// In a real implementation, this would read actual message data from storage +// TODO: CRITICAL - This function creates fake record batches with dummy data +// For real client compatibility need to: +// - Read actual message data from SeaweedMQ/storage +// - Construct proper record batch headers with correct CRC +// - Use proper varint encoding (not single-byte shortcuts) +// - Support different record batch versions +// - Handle compressed batches if messages were stored compressed +// Currently returns fake "message-N" data that no real client expects func (h *Handler) constructRecordBatch(ledger interface{}, fetchOffset, highWaterMark int64) []byte { // For Phase 1, create a simple record batch with dummy messages // This simulates what would come from real message storage diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 7aad2eba8..656ad71bd 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -142,6 +142,9 @@ func (h *Handler) HandleConn(conn net.Conn) error { size := binary.BigEndian.Uint32(sizeBytes[:]) if size == 0 || size > 1024*1024 { // 1MB limit + // TODO: Consider making message size limit configurable + // 1MB might be too restrictive for some use cases + // Kafka default max.message.bytes is often higher return fmt.Errorf("invalid message size: %d", size) } @@ -159,6 +162,11 @@ func (h *Handler) HandleConn(conn net.Conn) error { apiKey := binary.BigEndian.Uint16(messageBuf[0:2]) apiVersion := binary.BigEndian.Uint16(messageBuf[2:4]) correlationID := binary.BigEndian.Uint32(messageBuf[4:8]) + + // TODO: IMPORTANT - API version validation is missing + // Different API versions have different request/response formats + // Need to validate apiVersion against supported versions for each API + // Currently ignoring apiVersion completely which may cause parsing errors // Handle the request based on API key var response []byte diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index 03f1d70ff..08aba4451 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ 
-118,7 +118,7 @@ func (h *Handler) handleJoinGroup(correlationID uint32, requestBody []byte) ([]b member := &consumer.GroupMember{ ID: memberID, ClientID: request.GroupInstanceID, - ClientHost: "unknown", // TODO: extract from connection + ClientHost: "unknown", // TODO: extract from connection - needed for consumer group metadata SessionTimeout: request.SessionTimeout, RebalanceTimeout: request.RebalanceTimeout, Subscription: h.extractSubscriptionFromProtocols(request.GroupProtocols), @@ -223,17 +223,22 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error) offset += memberIDLength } - // For simplicity, we'll assume basic protocol parsing - // In a full implementation, we'd parse the protocol type and protocols array + // TODO: CRITICAL - JoinGroup request parsing is incomplete + // Missing parsing of: + // - Group instance ID (for static membership) + // - Protocol type validation + // - Group protocols array (client's supported assignment strategies) + // - Protocol metadata (consumer subscriptions, user data) + // Without this, assignment strategies and subscriptions won't work with real clients return &JoinGroupRequest{ GroupID: groupID, SessionTimeout: sessionTimeout, RebalanceTimeout: rebalanceTimeout, MemberID: memberID, - ProtocolType: "consumer", + ProtocolType: "consumer", // TODO: Parse from request GroupProtocols: []GroupProtocol{ - {Name: "range", Metadata: []byte{}}, + {Name: "range", Metadata: []byte{}}, // TODO: Parse actual protocols from request }, }, nil } @@ -328,8 +333,13 @@ func (h *Handler) buildJoinGroupErrorResponse(correlationID uint32, errorCode in } func (h *Handler) extractSubscriptionFromProtocols(protocols []GroupProtocol) []string { - // For simplicity, return a default subscription - // In a real implementation, we'd parse the protocol metadata to extract subscribed topics + // TODO: CRITICAL - Consumer subscription extraction is hardcoded to "test-topic" + // This breaks real Kafka consumers which send their actual subscriptions + // Consumer protocol metadata format (for "consumer" protocol type): + // - Version (2 bytes) + // - Topics array (4 bytes count + topic names) + // - User data (4 bytes length + data) + // Without fixing this, consumers will be assigned wrong topics return []string{"test-topic"} } diff --git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go index d110974ec..bba5f8fb9 100644 --- a/weed/mq/kafka/protocol/offset_management.go +++ b/weed/mq/kafka/protocol/offset_management.go @@ -292,8 +292,14 @@ func (h *Handler) parseOffsetCommitRequest(data []byte) (*OffsetCommitRequest, e memberID := string(data[offset : offset+memberIDLength]) offset += memberIDLength - // For simplicity, we'll create a basic request structure - // In a full implementation, we'd parse the full topics array + // TODO: CRITICAL - This parsing is completely broken for real clients + // Currently hardcoded to return "test-topic" with partition 0 + // Real OffsetCommit requests contain: + // - RetentionTime (8 bytes, -1 for broker default) + // - Topics array with actual topic names + // - Partitions array with actual partition IDs and offsets + // - Optional group instance ID for static membership + // Without fixing this, no real Kafka client can commit offsets properly return &OffsetCommitRequest{ GroupID: groupID, @@ -302,9 +308,9 @@ func (h *Handler) parseOffsetCommitRequest(data []byte) (*OffsetCommitRequest, e RetentionTime: -1, // Use broker default Topics: []OffsetCommitTopic{ { - Name: 
"test-topic", // Simplified + Name: "test-topic", // TODO: Parse actual topic from request Partitions: []OffsetCommitPartition{ - {Index: 0, Offset: 0, LeaderEpoch: -1, Metadata: ""}, + {Index: 0, Offset: 0, LeaderEpoch: -1, Metadata: ""}, // TODO: Parse actual partition data }, }, }, @@ -327,15 +333,20 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, err groupID := string(data[offset : offset+groupIDLength]) offset += groupIDLength - // For simplicity, we'll create a basic request structure - // In a full implementation, we'd parse the full topics array + // TODO: CRITICAL - OffsetFetch parsing is also hardcoded + // Real clients send topics array with specific partitions to fetch + // Need to parse: + // - Topics array (4 bytes count + topics) + // - For each topic: name + partitions array + // - RequireStable flag for transactional consistency + // Currently will fail with any real Kafka client doing offset fetches return &OffsetFetchRequest{ GroupID: groupID, Topics: []OffsetFetchTopic{ { - Name: "test-topic", // Simplified - Partitions: []int32{0}, // Fetch partition 0 + Name: "test-topic", // TODO: Parse actual topics from request + Partitions: []int32{0}, // TODO: Parse actual partitions or empty for "all" }, }, RequireStable: false, diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go index 3a341214a..c401a3178 100644 --- a/weed/mq/kafka/protocol/produce.go +++ b/weed/mq/kafka/protocol/produce.go @@ -169,7 +169,13 @@ func (h *Handler) handleProduce(correlationID uint32, requestBody []byte) ([]byt } // parseRecordSet parses a Kafka record set and returns the number of records and total size -// This is a simplified parser for Phase 1 - just counts valid records +// TODO: CRITICAL - This is a simplified parser that needs complete rewrite for protocol compatibility +// Missing: +// - Proper record batch format parsing (v0, v1, v2) +// - Compression support (gzip, snappy, lz4, zstd) +// - CRC32 validation +// - Transaction markers and control records +// - Individual record extraction (key, value, headers, timestamps) func (h *Handler) parseRecordSet(recordSetData []byte) (recordCount int32, totalSize int32, err error) { if len(recordSetData) < 12 { // minimum record set size return 0, 0, fmt.Errorf("record set too small") @@ -218,6 +224,12 @@ func (h *Handler) produceToSeaweedMQ(topic string, partition int32, recordSetDat } // extractFirstRecord extracts the first record from a Kafka record set (simplified) +// TODO: CRITICAL - This function returns placeholder data instead of parsing real records +// For real client compatibility, need to: +// - Parse record batch header properly +// - Extract actual key/value from first record in batch +// - Handle compressed record batches +// - Support all record formats (v0, v1, v2) func (h *Handler) extractFirstRecord(recordSetData []byte) ([]byte, []byte) { // For Phase 2, create a simple placeholder record // This represents what would be extracted from the actual Kafka record batch