diff --git a/weed/mq/KAFKA_DEV_PLAN.md b/weed/mq/KAFKA_DEV_PLAN.md index fe94fce49..a7552fd8e 100644 --- a/weed/mq/KAFKA_DEV_PLAN.md +++ b/weed/mq/KAFKA_DEV_PLAN.md @@ -24,11 +24,10 @@ - Message routing: `hash(key) -> kafka partition -> ring slot -> SMQ partition covering that slot`. - SMQ’s internal segment split/merge remains transparent; ordering is preserved per Kafka partition. -### Offset Model -- Kafka requires strictly increasing integer offsets per partition; SMQ uses timestamps. -- Maintain a per-partition offset ledger mapping `kOffset -> tsNs (+ size)`, with a sparse index for seeks. -- Earliest/latest offsets and timestamp-based lookup are served from the ledger and its index. -- Consumer group commits store Kafka offsets (not timestamps). On Fetch, offsets are translated to timestamps. +### Offset Model (updated) +- Use SMQ native per-partition sequential offsets. Kafka offsets map 1:1 to SMQ offsets. +- Earliest/latest and timestamp-based lookups come from SMQ APIs; minimize translation. +- Consumer group commits store SMQ offsets directly. ### Consumer Groups and Assignment - Gateway implements Kafka group coordinator: Join/Sync/Heartbeat/Leave. @@ -37,8 +36,8 @@ ### Protocol Coverage (initial) - ApiVersions, Metadata, CreateTopics/DeleteTopics. -- Produce (v2+) uncompressed to start; Fetch (v2+) with wait/maxBytes semantics. -- ListOffsets (earliest/latest; timestamp in phase 2). +- Produce (v2+) with record-batch v2 parsing, compression, and CRC validation; Fetch (v2+) with wait/maxBytes semantics. +- ListOffsets (earliest/latest; timestamp in a later phase). - FindCoordinator/JoinGroup/SyncGroup/Heartbeat/LeaveGroup. - OffsetCommit/OffsetFetch. @@ -52,12 +51,12 @@ ### Compatibility Limits (initial) - No idempotent producers, transactions, or compaction policies. -- Compression support added in phase 2 (GZIP/Snappy/LZ4/ZSTD). +- Compression codecs (GZIP/Snappy/LZ4/ZSTD) are available via the record-batch parser. ### Milestones -- **M1**: Gateway skeleton; ApiVersions/Metadata/Create/Delete; single-partition Produce/Fetch (no compression); plaintext; initial offset ledger. +- **M1**: Gateway skeleton; ApiVersions/Metadata/Create/Delete; single-partition Produce/Fetch; plaintext; SMQ native offsets. - **M2**: Multi-partition mapping, ListOffsets (earliest/latest), OffsetCommit/Fetch, group coordinator (Range), TLS. -- **M3**: Compression codecs, timestamp ListOffsets, Sticky assignor, SASL/PLAIN, metrics. +- **M3**: Record-batch compression codecs + CRC; timestamp ListOffsets; Sticky assignor; SASL/PLAIN; metrics. - **M4**: SCRAM, admin HTTP, ledger compaction tooling, performance tuning. - **M5** (optional): Idempotent producers groundwork, EOS design exploration. @@ -68,8 +67,8 @@ ### Scope - Kafka Gateway process scaffolding and configuration. - Protocol: ApiVersions, Metadata, CreateTopics, DeleteTopics. -- Produce (single topic-partition path) and Fetch for uncompressed records. -- Basic filer-backed topic registry and offset ledger (append-only + sparse index stub). +- Produce (single topic-partition path) and Fetch using v2 record-batch parser with compression and CRC. +- Basic filer-backed topic registry; offsets via SMQ native offsets (no separate ledger files). - Plaintext only; no consumer groups yet (direct Fetch by offset). ### Deliverables @@ -95,9 +94,9 @@ - ApiVersions: advertise minimal supported versions. - Metadata: topics/partitions and leader endpoints (this gateway instance). 
  - CreateTopics/DeleteTopics: validate, persist topic metadata in filer, create SMQ topic.
- - ListOffsets: earliest/latest only using the ledger bounds.
- - Produce: parse record batches (uncompressed); per record compute Kafka offset; publish to SMQ; return baseOffset.
- - Fetch: translate Kafka offset -> tsNs via ledger; read from SMQ starting at tsNs; return records honoring `maxBytes`/`maxWait`.
+ - ListOffsets: earliest/latest using SMQ bounds.
+ - Produce: parse v2 record batches (compressed/uncompressed); extract records; publish to SMQ; return baseOffset.
+ - Fetch: read from SMQ starting at the requested offset; construct proper v2 record batches honoring `maxBytes`/`maxWait`.
 
 3) Topic Registry and Mapping
 - Define `meta.json` schema:
@@ -105,39 +104,30 @@
 - Map Kafka partition id to SMQ ring range: divide ring (4096) into `partitions` contiguous ranges.
 - Enforce fixed partition count post-create.
 
-4) Offset Ledger (M1 minimal)
-- Append-only `ledger.log` entries: `varint(kOffsetDelta), varint(tsNsDelta), varint(size)` per record; batched fsync policy.
-- Maintain in-memory `lastKafkaOffset` and `lastTsNs` per partition; write periodic checkpoints every N records.
-- `ledger.index` sparse index format (stub in M1): record periodic `(kOffset, filePos)`.
-- APIs:
-  - `AssignOffsets(batchCount) -> baseOffset` (reserve range atomically per partition).
-  - `AppendOffsets(kOffset, tsNs, size)` batched.
-  - `Translate(kOffset) -> tsNs` (linear forward from nearest checkpoint/index in M1).
-  - `Earliest()`, `Latest()` from on-disk checkpoints + tail state.
+4) Offset Handling (M1)
+- Use SMQ native offsets; remove the separate ledger and offset translation from the gateway.
+- Earliest/latest come from SMQ; timestamp lookups are added in a later phase.
 
 5) Produce Path
 - For each topic-partition in request:
   - Validate topic existence and partition id.
-  - Reserve offsets for all records in the batch.
-  - For each record: compute SMQ key/value/headers; timestamp = client-provided or broker time.
-  - Publish to SMQ via broker gRPC (batch if available). On success, append `(kOffset, tsNs, size)` to ledger.
-  - Return `baseOffset` per partition.
+  - Parse record-batch v2; extract records (varints/headers); handle compression and CRC.
+  - Publish to SMQ via broker (batch if available); SMQ assigns offsets; return `baseOffset` per partition.
 
 6) Fetch Path (no groups)
 - For each topic-partition in request:
-  - If offset is `-1` (latest) or `-2` (earliest), use ledger bounds.
-  - Translate offset to `tsNs` via ledger; start a bounded scan from SMQ at `tsNs`.
-  - Page results into Kafka record sets up to `maxBytes` or `minBytes`/`maxWait` semantics.
-  - Close scan when request satisfied; no long-lived group sessions in M1.
+  - If offset is `-1` (latest) or `-2` (earliest), use SMQ bounds.
+  - Read from SMQ starting at the requested offset; construct proper v2 record batches.
+  - Page results honoring `maxBytes` and `minBytes`/`maxWait` semantics.
 
 7) Metadata and SMQ Integration
 - Create/delete topic maps to SMQ topic lifecycle using existing MQ APIs.
 - No auto-scaling of partitions in M1 (Kafka partition count fixed).
 
 8) Testing
-- Unit tests for ledger encode/decode, earliest/latest, translate.
+- Unit tests for the record-batch parser (compression, CRC) and earliest/latest via SMQ.
 - E2E:
-  - sarama producer -> gateway -> SMQ; then fetch and validate ordering/offsets.
+  - sarama producer -> gateway -> SMQ; fetch and validate ordering/offsets.
   - kafka-go fetch from earliest/latest.
- Metadata and create/delete topic via Kafka Admin client (happy path). @@ -151,6 +141,6 @@ ### Open Questions / Follow-ups - Exact `ApiVersions` and version ranges to advertise for maximal client compatibility. - Whether to expose namespace as Kafka cluster or encode in topic names (`ns.topic`). -- Offset ledger compaction cadence and background tasks (defer to M3/M4). +- Offset state compaction not applicable in gateway; defer SMQ-side retention considerations to later phases. diff --git a/weed/mq/KAFKA_PHASE3_PLAN.md b/weed/mq/KAFKA_PHASE3_PLAN.md index 3be8abd8e..6c92addbb 100644 --- a/weed/mq/KAFKA_PHASE3_PLAN.md +++ b/weed/mq/KAFKA_PHASE3_PLAN.md @@ -36,8 +36,8 @@ Phase 3 transforms the Kafka Gateway from a basic producer/consumer system into **Record Batch Improvements:** - Full Kafka record format parsing (v0, v1, v2) -- Compression support (gzip, snappy, lz4, zstd) -- Proper CRC validation +- Compression support (gzip, snappy, lz4, zstd) — IMPLEMENTED +- Proper CRC validation — IMPLEMENTED - Transaction markers handling - Timestamp extraction and validation @@ -104,8 +104,8 @@ Phase 3 transforms the Kafka Gateway from a basic producer/consumer system into 4. LeaveGroup handling ### Step 4: Advanced Record Processing (2-3 days) -1. Full record batch parsing -2. Compression codec support +1. Full record parsing and real Fetch batch construction +2. Compression codecs and CRC (done) — focus on integration and tests 3. Performance optimizations 4. Memory management @@ -167,8 +167,8 @@ Consumer States: - ✅ Automatic partition rebalancing - ✅ Offset commit/fetch functionality - ✅ Consumer failure handling -- ✅ Full Kafka record format support -- ✅ Compression support for major codecs +- ✅ Full Kafka record format support (v2 with real records) +- ✅ Compression support for major codecs (already available) ### Performance Requirements - ✅ Handle 10k+ messages/second per partition diff --git a/weed/mq/kafka/API_VERSION_MATRIX.md b/weed/mq/kafka/API_VERSION_MATRIX.md index 80457102e..ef305ed84 100644 --- a/weed/mq/kafka/API_VERSION_MATRIX.md +++ b/weed/mq/kafka/API_VERSION_MATRIX.md @@ -14,36 +14,26 @@ This document audits the advertised API versions in `handleApiVersions()` agains | 0 | Produce | v0-v7 | v0-v7 | ✅ Match | None | | 1 | Fetch | v0-v7 | v0-v7 | ✅ Match | None | | 2 | ListOffsets | v0-v2 | v0-v2 | ✅ Match | None | -| 19 | CreateTopics | v0-v4 | v0-v5 | ❌ Mismatch | Update advertised to v0-v5 | +| 19 | CreateTopics | v0-v5 | v0-v5 | ✅ Match | None | | 20 | DeleteTopics | v0-v4 | v0-v4 | ✅ Match | None | | 11 | JoinGroup | v0-v7 | v0-v7 | ✅ Match | None | | 14 | SyncGroup | v0-v5 | v0-v5 | ✅ Match | None | | 8 | OffsetCommit | v0-v2 | v0-v2 | ✅ Match | None | -| 9 | OffsetFetch | v0-v2 | v0-v5+ | ❌ MAJOR Mismatch | Update advertised to v0-v5 | +| 9 | OffsetFetch | v0-v5 | v0-v5 | ✅ Match | None | | 10 | FindCoordinator | v0-v2 | v0-v2 | ✅ Match | None | | 12 | Heartbeat | v0-v4 | v0-v4 | ✅ Match | None | | 13 | LeaveGroup | v0-v4 | v0-v4 | ✅ Match | None | ## Detailed Analysis -### 1. OffsetFetch API (Key 9) - CRITICAL MISMATCH -- **Advertised**: v0-v2 (max version 2) -- **Actually Implemented**: Up to v5+ - - **Evidence**: `buildOffsetFetchResponse()` includes `if apiVersion >= 5` for leader epoch - - **Evidence**: `if apiVersion >= 3` for throttle time -- **Impact**: Clients may not use advanced features available in v3-v5 -- **Action**: Update advertised max version from 2 to 5 - -### 2. 
CreateTopics API (Key 19) - MINOR MISMATCH
-- **Advertised**: v0-v4 (max version 4)
-- **Actually Implemented**: v0-v5
-  - **Evidence**: `handleCreateTopics()` routes v5 requests to `handleCreateTopicsV2Plus()`
-  - **Evidence**: Tests validate v0-v5 versions
-- **Impact**: v5 clients may not connect expecting v5 support
-- **Action**: Update advertised max version from 4 to 5
-
-### 3. Validation vs Advertisement Inconsistency
-The `validateAPIVersion()` function matches the advertised versions, which means it will incorrectly reject valid v3-v5 OffsetFetch requests that the handler can actually process.
+### 1. OffsetFetch API (Key 9)
+- Advertised and implemented versions are aligned at v0–v5.
+
+### 2. CreateTopics API (Key 19)
+- Advertised and implemented versions are aligned at v0–v5.
+
+### Validation vs Advertisement Consistency
+`validateAPIVersion()` and `handleApiVersions()` are consistent with the supported ranges.
 
 ## Implementation Details
 
@@ -58,18 +48,15 @@ The `validateAPIVersion()` function matches the advertised versions, which means
 
 ## Recommendations
 
-1. **Immediate Fix**: Update `handleApiVersions()` to advertise correct max versions
-2. **Consistency Check**: Update `validateAPIVersion()` to match the corrected advertised versions
-3. **Testing**: Verify that higher version clients can successfully connect and use advanced features
-4. **Documentation**: Update any client guidance to reflect the correct supported versions
+1. Re-verify after protocol changes to keep the matrix accurate
+2. Extend coverage as implementations grow; keep version-guard tests in place
 
-## Test Verification Needed
+## Test Verification
 
-After fixes:
-1. Test OffsetFetch v3, v4, v5 with real Kafka clients
-2. Test CreateTopics v5 with real Kafka clients
-3. Verify throttle_time_ms and leader_epoch are correctly populated
-4. Ensure version validation doesn't reject valid higher version requests
+1. Exercise OffsetFetch v3–v5 with kafka-go and Sarama
+2. Exercise CreateTopics v5; verify fields and tagged fields
+3. Verify throttle_time_ms and leader_epoch population
+4. Ensure version validation permits only the supported versions
 
 ## Conservative Alternative
 
diff --git a/weed/mq/kafka/PROTOCOL_COMPATIBILITY_REVIEW.md b/weed/mq/kafka/PROTOCOL_COMPATIBILITY_REVIEW.md
index 71ef408f5..7ba4dfe23 100644
--- a/weed/mq/kafka/PROTOCOL_COMPATIBILITY_REVIEW.md
+++ b/weed/mq/kafka/PROTOCOL_COMPATIBILITY_REVIEW.md
@@ -8,21 +8,19 @@ This document identifies areas in the current Kafka implementation that need att
 
 ### HIGH PRIORITY - Protocol Breaking Issues
 
 #### 1. 
**Record Batch Parsing (Produce API)** -**File**: `protocol/produce.go` -**Issues**: -- `parseRecordSet()` uses simplified parsing logic that doesn't handle the full Kafka record batch format -- Hardcoded assumptions about record batch structure -- Missing compression support (gzip, snappy, lz4, zstd) -- CRC validation is completely missing - +**Files**: `protocol/record_batch_parser.go`, `protocol/produce.go` +**Status**: +- Compression implemented (gzip, snappy, lz4, zstd) +- CRC validation available in `ParseRecordBatchWithValidation` +**Remaining**: +- Parse individual records (varints, headers, control records, timestamps) +- Integrate real record extraction into Produce path (replace simplified fallbacks) **TODOs**: ```go -// TODO: Implement full Kafka record batch parsing -// - Support all record batch versions (v0, v1, v2) -// - Handle compression codecs (gzip, snappy, lz4, zstd) -// - Validate CRC32 checksums -// - Parse individual record headers, keys, values, timestamps -// - Handle transaction markers and control records +// TODO: Implement full record parsing for v2 batches +// - Varint decode for per-record fields and headers +// - Control records and transaction markers +// - Accurate timestamp handling ``` #### 2. **Request Parsing Assumptions** @@ -49,27 +47,25 @@ This document identifies areas in the current Kafka implementation that need att #### 3. **Fetch Record Construction** **File**: `protocol/fetch.go` -**Issues**: -- `constructRecordBatch()` creates fake record batches with dummy data -- Varint encoding is simplified to single bytes (incorrect) -- Missing proper record headers, timestamps, and metadata - +**Status**: +- Basic batching works; needs proper record encoding +**Remaining**: +- Build real record batches from stored records with proper varints and headers +- Honor timestamps and per-record metadata **TODOs**: ```go -// TODO: Replace dummy record batch construction with real data -// - Read actual message data from SeaweedMQ/storage -// - Implement proper varint encoding/decoding -// - Support record headers and custom timestamps -// - Handle different record batch versions correctly +// TODO: Implement real batch construction for Fetch +// - Proper varint encode for lengths and deltas +// - Include headers and correct timestamps +// - Support v2 record batch layout end-to-end ``` ### MEDIUM PRIORITY - Compatibility Issues #### 4. **API Version Support** **File**: `protocol/handler.go` -**Issues**: -- ApiVersions response advertises max versions but implementations may not support all features -- No version-specific handling in most APIs +**Status**: Advertised ranges updated to match implemented (e.g., CreateTopics v5, OffsetFetch v5) +**Remaining**: Maintain alignment as handlers evolve; add stricter per-version validation paths **TODOs**: ```go @@ -140,7 +136,7 @@ This document identifies areas in the current Kafka implementation that need att **Files**: `protocol/produce.go`, `protocol/fetch.go` **Issues**: - Simplified timestamp handling -- Offset assignment may not match Kafka behavior exactly +**Note**: Offset assignment is handled by SMQ native offsets; ensure Kafka-visible semantics map correctly **TODOs**: ```go @@ -234,12 +230,10 @@ This document identifies areas in the current Kafka implementation that need att ## Immediate Action Items ### Phase 4 Priority List: -1. **Fix Record Batch Parsing** - Critical for real client compatibility -2. **Implement Proper Request Parsing** - Remove hardcoded assumptions -3. 
**Add Compression Support** - Essential for performance
-4. **Real SeaweedMQ Integration** - Move beyond placeholder data
-5. **Consumer Protocol Metadata** - Fix subscription handling
-6. **API Version Handling** - Support multiple protocol versions
+1. Full record parsing for Produce (v2) and real Fetch batch construction
+2. Proper request parsing (arrays) in OffsetCommit/OffsetFetch and group APIs
+3. Consumer protocol metadata parsing (Join/Sync)
+4. Maintain API version alignment and validation
 
 ### Compatibility Validation:
 1. **Test with kafka-go library** - Most popular Go Kafka client
diff --git a/weed/mq/kafka/TODO.md b/weed/mq/kafka/TODO.md
new file mode 100644
index 000000000..5c40eb73c
--- /dev/null
+++ b/weed/mq/kafka/TODO.md
@@ -0,0 +1,38 @@
+# Kafka Gateway TODO (Concise)
+
+## Produce/Fetch correctness
+- Implement full v2 record parsing: varints, headers, control records, timestamps
+- Replace dummy Fetch batches with real per-record construction and varint encoding
+- Stream large batches; avoid loading entire sets in memory; enforce maxBytes/minBytes
+
+## Protobuf completeness
+- Resolve imports/nested types from FileDescriptorSet; support fully-qualified names
+- Handle Confluent Protobuf message indexes; pick the correct message within the descriptor
+- Robust decode/encode for scalars, maps, repeated fields, and oneof; add validation APIs
+
+## Protocol parsing and coordination
+- Remove hardcoded topic/partition assumptions across handlers
+- Properly parse arrays in OffsetCommit/OffsetFetch and other APIs
+- Implement consumer protocol metadata parsing (JoinGroup/SyncGroup)
+
+## API versioning and errors
+- Validate per-API version shapes; align validateAPIVersion with advertised ranges
+- Audit error codes against Kafka spec; add missing codes (quota, auth, timeouts)
+
+## Schema registry and evolution
+- Subject-level compatibility settings; lineage and soft-deletion
+- Optional mirroring of schemas to Filer; cache TTL controls
+
+## Offsets and consumer groups
+- Prefer SMQ native offsets end-to-end; minimize legacy timestamp translation
+- ListOffsets timestamp lookups backed by SMQ; group lag/high-watermark metrics
+
+## Security and operations
+- TLS listener config; SASL/PLAIN (then SCRAM) auth
+- Backpressure, memory pooling, connection cleanup, metrics (Prometheus)
+
+## Tests and benchmarks
+- E2E with kafka-go, Sarama, and Java clients; compressed batches; consumer-group recovery after long disconnects
+- Performance/regression tests for compression + CRC across clients
+
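+## Reference sketches (illustrative)
+
+The sketch below illustrates the partition mapping described in KAFKA_DEV_PLAN.md:
+divide the 4096-slot ring into `partitions` contiguous ranges and route each Kafka
+partition to the SMQ partition covering its range. Type and function names here are
+hypothetical, not the gateway's actual API.
+
+```go
+// Sketch only: map a Kafka partition id to a contiguous SMQ ring range.
+// Assumes a 4096-slot ring and a fixed partition count, per the dev plan.
+package sketch
+
+const ringSize = 4096
+
+// RingRange is a half-open [Start, Stop) slot range on the SMQ ring.
+type RingRange struct {
+    Start int32
+    Stop  int32
+}
+
+// kafkaPartitionToRingRange splits the ring into `partitions` contiguous
+// ranges and returns the range covered by partitionID.
+func kafkaPartitionToRingRange(partitionID, partitions int32) RingRange {
+    width := int32(ringSize) / partitions
+    start := partitionID * width
+    stop := start + width
+    if partitionID == partitions-1 {
+        stop = ringSize // the last range absorbs any remainder slots
+    }
+    return RingRange{Start: start, Stop: stop}
+}
+```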
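+
+The next sketch shows the v2 record-batch header layout and the CRC-32C check that the
+Produce/Fetch correctness items above depend on. Field offsets follow the Kafka
+record-batch v2 wire format; the function name is illustrative.
+
+```go
+// Sketch only: validate the CRC-32C of a single v2 record batch.
+package sketch
+
+import (
+    "encoding/binary"
+    "errors"
+    "hash/crc32"
+)
+
+var castagnoli = crc32.MakeTable(crc32.Castagnoli)
+
+// validateBatchCRC expects a batch laid out as:
+//   baseOffset(8) batchLength(4) leaderEpoch(4) magic(1) crc(4) attributes(2) ...
+// The stored CRC covers everything from the attributes field (byte 21) to the end.
+func validateBatchCRC(batch []byte) error {
+    if len(batch) < 61 { // minimum size of a v2 batch header
+        return errors.New("batch too short for v2 header")
+    }
+    if magic := batch[16]; magic != 2 {
+        return errors.New("not a v2 record batch")
+    }
+    stored := binary.BigEndian.Uint32(batch[17:21])
+    if computed := crc32.Checksum(batch[21:], castagnoli); computed != stored {
+        return errors.New("record batch CRC mismatch")
+    }
+    return nil
+}
+```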
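+
+Finally, a sketch of encoding one v2 record with zigzag varints, relevant to the
+"real per-record construction and varint encoding" item above. Go's encoding/binary
+varint helpers use the same zigzag scheme as Kafka record fields; the helper name and
+the no-headers, non-null-value simplifications are assumptions of this sketch.
+
+```go
+// Sketch only: append one v2 record (no headers, non-null value) to dst.
+package sketch
+
+import "encoding/binary"
+
+func appendRecord(dst []byte, timestampDelta, offsetDelta int64, key, value []byte) []byte {
+    body := make([]byte, 0, 16+len(key)+len(value))
+    body = append(body, 0)                           // attributes (unused)
+    body = binary.AppendVarint(body, timestampDelta) // zigzag varint
+    body = binary.AppendVarint(body, offsetDelta)    // zigzag varint
+    if key == nil {
+        body = binary.AppendVarint(body, -1) // null key
+    } else {
+        body = binary.AppendVarint(body, int64(len(key)))
+        body = append(body, key...)
+    }
+    body = binary.AppendVarint(body, int64(len(value)))
+    body = append(body, value...)
+    body = binary.AppendVarint(body, 0)              // header count
+    dst = binary.AppendVarint(dst, int64(len(body))) // record length prefix
+    return append(dst, body...)
+}
+```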