
update plans

pull/7231/head
chrislu 2 months ago
parent
commit d0059cb65b
1. weed/mq/KAFKA_DEV_PLAN.md (60 changed lines)
2. weed/mq/KAFKA_PHASE3_PLAN.md (12 changed lines)
3. weed/mq/kafka/API_VERSION_MATRIX.md (47 changed lines)
4. weed/mq/kafka/PROTOCOL_COMPATIBILITY_REVIEW.md (60 changed lines)
5. weed/mq/kafka/TODO.md (38 changed lines)

weed/mq/KAFKA_DEV_PLAN.md (60 changed lines)

@@ -24,11 +24,10 @@
- Message routing: `hash(key) -> kafka partition -> ring slot -> SMQ partition covering that slot`.
- SMQ’s internal segment split/merge remains transparent; ordering is preserved per Kafka partition.
### Offset Model
- Kafka requires strictly increasing integer offsets per partition; SMQ uses timestamps.
- Maintain a per-partition offset ledger mapping `kOffset -> tsNs (+ size)`, with a sparse index for seeks.
- Earliest/latest offsets and timestamp-based lookup are served from the ledger and its index.
- Consumer group commits store Kafka offsets (not timestamps). On Fetch, offsets are translated to timestamps.
### Offset Model (updated)
- Use SMQ native per-partition sequential offsets. Kafka offsets map 1:1 to SMQ offsets.
- Earliest/latest and timestamp-based lookups come from SMQ APIs; minimize translation.
- Consumer group commits store SMQ offsets directly.
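Under this model the gateway only needs bounds lookups from SMQ, not a translation table. A minimal sketch of that boundary, assuming a hypothetical SMQ client interface (the real SMQ API may differ):

```go
package gateway

import "context"

// smqPartitionReader is a hypothetical view of SMQ's per-partition API under
// the native-offset model: Kafka offsets are the same numbers as SMQ offsets,
// so no kOffset -> tsNs ledger is needed in the gateway.
type smqPartitionReader interface {
	EarliestOffset(ctx context.Context) (int64, error)
	LatestOffset(ctx context.Context) (int64, error) // high watermark / next offset to assign
	ReadFrom(ctx context.Context, offset int64, maxBytes int) ([]smqRecord, error)
}

type smqRecord struct {
	Offset int64 // returned to Kafka clients unchanged
	TsNs   int64
	Key    []byte
	Value  []byte
}
```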
### Consumer Groups and Assignment
- Gateway implements Kafka group coordinator: Join/Sync/Heartbeat/Leave.
@@ -37,8 +36,8 @@
### Protocol Coverage (initial)
- ApiVersions, Metadata, CreateTopics/DeleteTopics.
- Produce (v2+) uncompressed to start; Fetch (v2+) with wait/maxBytes semantics.
- ListOffsets (earliest/latest; timestamp in phase 2).
- Produce (v2+) with record-batch v2 parsing, compression, and CRC validation; Fetch (v2+) with wait/maxBytes semantics.
- ListOffsets (earliest/latest; timestamp in a later phase).
- FindCoordinator/JoinGroup/SyncGroup/Heartbeat/LeaveGroup.
- OffsetCommit/OffsetFetch.
@@ -52,12 +51,12 @@
### Compatibility Limits (initial)
- No idempotent producers, transactions, or compaction policies.
- Compression support added in phase 2 (GZIP/Snappy/LZ4/ZSTD).
- Compression codecs (GZIP/Snappy/LZ4/ZSTD) are available via the record-batch parser.
### Milestones
- **M1**: Gateway skeleton; ApiVersions/Metadata/Create/Delete; single-partition Produce/Fetch (no compression); plaintext; initial offset ledger.
- **M1**: Gateway skeleton; ApiVersions/Metadata/Create/Delete; single-partition Produce/Fetch; plaintext; SMQ native offsets.
- **M2**: Multi-partition mapping, ListOffsets (earliest/latest), OffsetCommit/Fetch, group coordinator (Range), TLS.
- **M3**: Compression codecs, timestamp ListOffsets, Sticky assignor, SASL/PLAIN, metrics.
- **M3**: Record-batch compression codecs + CRC; timestamp ListOffsets; Sticky assignor; SASL/PLAIN; metrics.
- **M4**: SCRAM, admin HTTP, ledger compaction tooling, performance tuning.
- **M5** (optional): Idempotent producers groundwork, EOS design exploration.
@@ -68,8 +67,8 @@
### Scope
- Kafka Gateway process scaffolding and configuration.
- Protocol: ApiVersions, Metadata, CreateTopics, DeleteTopics.
- Produce (single topic-partition path) and Fetch for uncompressed records.
- Basic filer-backed topic registry and offset ledger (append-only + sparse index stub).
- Produce (single topic-partition path) and Fetch using v2 record-batch parser with compression and CRC.
- Basic filer-backed topic registry; offsets via SMQ native offsets (no separate ledger files).
- Plaintext only; no consumer groups yet (direct Fetch by offset).
### Deliverables
@@ -95,9 +94,9 @@
- ApiVersions: advertise minimal supported versions.
- Metadata: topics/partitions and leader endpoints (this gateway instance).
- CreateTopics/DeleteTopics: validate, persist topic metadata in filer, create SMQ topic.
- ListOffsets: earliest/latest only using the ledger bounds.
- Produce: parse record batches (uncompressed); per record compute Kafka offset; publish to SMQ; return baseOffset.
- Fetch: translate Kafka offset -> tsNs via ledger; read from SMQ starting at tsNs; return records honoring `maxBytes`/`maxWait`.
- ListOffsets: earliest/latest using SMQ bounds.
- Produce: parse v2 record batches (compressed/uncompressed), extract records, publish to SMQ; return baseOffset.
- Fetch: read from SMQ starting at requested offset; construct proper v2 record batches honoring `maxBytes`/`maxWait`.
3) Topic Registry and Mapping
- Define `meta.json` schema:
@@ -105,39 +104,30 @@
- Map Kafka partition id to SMQ ring range: divide ring (4096) into `partitions` contiguous ranges.
- Enforce fixed partition count post-create.
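A sketch of the partition-to-ring mapping described above, assuming the 4096-slot ring is split into equal contiguous ranges (names are illustrative; the key hash here is a placeholder, not Kafka's default murmur2 partitioner):

```go
package gateway

// ringSize matches the plan's 4096-slot SMQ ring.
const ringSize = 4096

// partitionRange returns the contiguous [start, stop) slot range owned by a
// Kafka partition when the ring is divided into `partitions` equal ranges;
// the last partition absorbs any remainder.
func partitionRange(partitionID, partitions int32) (start, stop int32) {
	width := int32(ringSize) / partitions
	start = partitionID * width
	stop = start + width
	if partitionID == partitions-1 {
		stop = ringSize
	}
	return start, stop
}

// kafkaPartitionForKey mirrors the routing rule hash(key) -> kafka partition.
// FNV-1a is used here only as a stand-in hash.
func kafkaPartitionForKey(key []byte, partitions int32) int32 {
	h := uint32(2166136261)
	for _, b := range key {
		h ^= uint32(b)
		h *= 16777619
	}
	return int32(h % uint32(partitions))
}
```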
4) Offset Ledger (M1 minimal)
- Append-only `ledger.log` entries: `varint(kOffsetDelta), varint(tsNsDelta), varint(size)` per record; batched fsync policy.
- Maintain in-memory `lastKafkaOffset` and `lastTsNs` per partition; write periodic checkpoints every N records.
- `ledger.index` sparse index format (stub in M1): record periodic `(kOffset, filePos)`.
- APIs:
- `AssignOffsets(batchCount) -> baseOffset` (reserve range atomically per partition).
- `AppendOffsets(kOffset, tsNs, size)` batched.
- `Translate(kOffset) -> tsNs` (linear forward from nearest checkpoint/index in M1).
- `Earliest()`, `Latest()` from on-disk checkpoints + tail state.
4) Offset Handling (M1)
- Use SMQ native offsets; remove separate ledger and translation in the gateway.
- Earliest/Latest come from SMQ; timestamp lookups added in later phase.
5) Produce Path
- For each topic-partition in request:
- Validate topic existence and partition id.
- Reserve offsets for all records in the batch.
- For each record: compute SMQ key/value/headers; timestamp = client-provided or broker time.
- Publish to SMQ via broker gRPC (batch if available). On success, append `(kOffset, tsNs, size)` to ledger.
- Return `baseOffset` per partition.
- Parse record-batch v2; extract records (varints/headers), handle compression and CRC.
- Publish to SMQ via broker (batch if available); SMQ assigns offsets; return `baseOffset` per partition.
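For reference, the fixed v2 batch header the parser has to read before it can reach the records (field layout per the Kafka protocol; struct and function names are illustrative):

```go
package gateway

import (
	"encoding/binary"
	"fmt"
)

// recordBatchHeader is the fixed 61-byte prefix of a Kafka record batch (magic v2).
type recordBatchHeader struct {
	BaseOffset      int64
	BatchLength     int32
	LeaderEpoch     int32
	Magic           int8
	CRC             uint32 // CRC-32C over bytes from attributes to end of batch
	Attributes      int16
	LastOffsetDelta int32
	BaseTimestamp   int64
	MaxTimestamp    int64
	ProducerID      int64
	ProducerEpoch   int16
	BaseSequence    int32
	RecordCount     int32
}

func parseBatchHeader(buf []byte) (recordBatchHeader, error) {
	var h recordBatchHeader
	if len(buf) < 61 {
		return h, fmt.Errorf("record batch too short: %d bytes", len(buf))
	}
	h.BaseOffset = int64(binary.BigEndian.Uint64(buf[0:8]))
	h.BatchLength = int32(binary.BigEndian.Uint32(buf[8:12]))
	h.LeaderEpoch = int32(binary.BigEndian.Uint32(buf[12:16]))
	h.Magic = int8(buf[16])
	h.CRC = binary.BigEndian.Uint32(buf[17:21])
	h.Attributes = int16(binary.BigEndian.Uint16(buf[21:23]))
	h.LastOffsetDelta = int32(binary.BigEndian.Uint32(buf[23:27]))
	h.BaseTimestamp = int64(binary.BigEndian.Uint64(buf[27:35]))
	h.MaxTimestamp = int64(binary.BigEndian.Uint64(buf[35:43]))
	h.ProducerID = int64(binary.BigEndian.Uint64(buf[43:51]))
	h.ProducerEpoch = int16(binary.BigEndian.Uint16(buf[51:53]))
	h.BaseSequence = int32(binary.BigEndian.Uint32(buf[53:57]))
	h.RecordCount = int32(binary.BigEndian.Uint32(buf[57:61]))
	return h, nil
}

// codec extracts the compression codec from the lower 3 bits of attributes:
// 0 = none, 1 = gzip, 2 = snappy, 3 = lz4, 4 = zstd.
func (h recordBatchHeader) codec() int16 { return h.Attributes & 0x07 }
```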
6) Fetch Path (no groups)
- For each topic-partition in request:
- If offset is `-1` (latest) or `-2` (earliest), use ledger bounds.
- Translate offset to `tsNs` via ledger; start a bounded scan from SMQ at `tsNs`.
- Page results into Kafka record sets up to `maxBytes` or `minBytes`/`maxWait` semantics.
- Close scan when request satisfied; no long-lived group sessions in M1.
- If offset is `-1` (latest) or `-2` (earliest), use SMQ bounds.
- Read from SMQ starting at the requested offset; construct proper v2 record batches.
- Page results up to `maxBytes` or `minBytes`/`maxWait` semantics.
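A small sketch of resolving the Kafka fetch sentinels against SMQ bounds before reading (the bounds interface is hypothetical):

```go
package gateway

import "context"

const (
	offsetLatest   = -1 // Kafka sentinel: fetch from the log end
	offsetEarliest = -2 // Kafka sentinel: fetch from the log start
)

// boundsLookup is a hypothetical view over SMQ's per-partition offset bounds.
type boundsLookup interface {
	EarliestOffset(ctx context.Context) (int64, error)
	LatestOffset(ctx context.Context) (int64, error)
}

// resolveFetchOffset turns the client-requested offset into a concrete SMQ offset.
func resolveFetchOffset(ctx context.Context, b boundsLookup, requested int64) (int64, error) {
	switch requested {
	case offsetEarliest:
		return b.EarliestOffset(ctx)
	case offsetLatest:
		return b.LatestOffset(ctx)
	default:
		return requested, nil // native offsets: no translation needed
	}
}
```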
7) Metadata and SMQ Integration
- Create/delete topic maps to SMQ topic lifecycle using existing MQ APIs.
- No auto-scaling of partitions in M1 (Kafka partition count fixed).
8) Testing
- Unit tests for ledger encode/decode, earliest/latest, translate.
- Unit tests for record-batch parser (compression, CRC), earliest/latest via SMQ.
- E2E:
- sarama producer -> gateway -> SMQ; then fetch and validate ordering/offsets.
- sarama producer -> gateway -> SMQ; fetch and validate ordering/offsets.
- kafka-go fetch from earliest/latest.
- Metadata and create/delete topic via Kafka Admin client (happy path).
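As a starting point for the Sarama E2E case, a minimal round-trip test might look like this sketch (broker address and topic are placeholders; the import path assumes the current `github.com/IBM/sarama` module):

```go
package gateway_test

import (
	"testing"

	"github.com/IBM/sarama"
)

func TestProduceFetchRoundTrip(t *testing.T) {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true

	producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, cfg) // gateway address is a placeholder
	if err != nil {
		t.Fatal(err)
	}
	defer producer.Close()

	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "e2e-topic",
		Key:   sarama.StringEncoder("k1"),
		Value: sarama.StringEncoder("v1"),
	})
	if err != nil {
		t.Fatal(err)
	}

	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, cfg)
	if err != nil {
		t.Fatal(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("e2e-topic", partition, offset)
	if err != nil {
		t.Fatal(err)
	}
	defer pc.Close()

	msg := <-pc.Messages()
	if string(msg.Value) != "v1" || msg.Offset != offset {
		t.Fatalf("unexpected message: offset=%d value=%q", msg.Offset, msg.Value)
	}
}
```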
@@ -151,6 +141,6 @@
### Open Questions / Follow-ups
- Exact `ApiVersions` and version ranges to advertise for maximal client compatibility.
- Whether to expose namespace as Kafka cluster or encode in topic names (`ns.topic`).
- Offset ledger compaction cadence and background tasks (defer to M3/M4).
- Offset state compaction is not applicable in the gateway; defer SMQ-side retention considerations to later phases.

weed/mq/KAFKA_PHASE3_PLAN.md (12 changed lines)

@@ -36,8 +36,8 @@ Phase 3 transforms the Kafka Gateway from a basic producer/consumer system into
**Record Batch Improvements:**
- Full Kafka record format parsing (v0, v1, v2)
- Compression support (gzip, snappy, lz4, zstd)
- Proper CRC validation
- Compression support (gzip, snappy, lz4, zstd) — IMPLEMENTED
- Proper CRC validation — IMPLEMENTED
- Transaction markers handling
- Timestamp extraction and validation
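For context on the CRC item: the v2 batch CRC is CRC-32C (Castagnoli), computed over everything after the CRC field, i.e. from the attributes offset to the end of the batch. A sketch of the check (byte offsets follow the v2 layout; the function name is illustrative):

```go
package gateway

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// validateBatchCRC compares the CRC-32C stored at bytes 17..21 of a v2 record
// batch against a checksum of the bytes from the attributes field (offset 21)
// to the end of the batch.
func validateBatchCRC(batch []byte) error {
	if len(batch) < 21 {
		return fmt.Errorf("batch too short for CRC check: %d bytes", len(batch))
	}
	stored := binary.BigEndian.Uint32(batch[17:21])
	computed := crc32.Checksum(batch[21:], castagnoli)
	if stored != computed {
		return fmt.Errorf("crc mismatch: stored=%08x computed=%08x", stored, computed)
	}
	return nil
}
```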
@@ -104,8 +104,8 @@ Phase 3 transforms the Kafka Gateway from a basic producer/consumer system into
4. LeaveGroup handling
### Step 4: Advanced Record Processing (2-3 days)
1. Full record batch parsing
2. Compression codec support
1. Full record parsing and real Fetch batch construction
2. Compression codecs and CRC (done) — focus on integration and tests
3. Performance optimizations
4. Memory management
@@ -167,8 +167,8 @@ Consumer States:
- ✅ Automatic partition rebalancing
- ✅ Offset commit/fetch functionality
- ✅ Consumer failure handling
- ✅ Full Kafka record format support
- ✅ Compression support for major codecs
- ✅ Full Kafka record format support (v2 with real records)
- ✅ Compression support for major codecs (already available)
### Performance Requirements
- ✅ Handle 10k+ messages/second per partition

weed/mq/kafka/API_VERSION_MATRIX.md (47 changed lines)

@@ -14,36 +14,26 @@ This document audits the advertised API versions in `handleApiVersions()` against
| 0 | Produce | v0-v7 | v0-v7 | ✅ Match | None |
| 1 | Fetch | v0-v7 | v0-v7 | ✅ Match | None |
| 2 | ListOffsets | v0-v2 | v0-v2 | ✅ Match | None |
| 19 | CreateTopics | v0-v4 | v0-v5 | ❌ Mismatch | Update advertised to v0-v5 |
| 19 | CreateTopics | v0-v5 | v0-v5 | ✅ Match | None |
| 20 | DeleteTopics | v0-v4 | v0-v4 | ✅ Match | None |
| 11 | JoinGroup | v0-v7 | v0-v7 | ✅ Match | None |
| 14 | SyncGroup | v0-v5 | v0-v5 | ✅ Match | None |
| 8 | OffsetCommit | v0-v2 | v0-v2 | ✅ Match | None |
| 9 | OffsetFetch | v0-v2 | v0-v5+ | ❌ MAJOR Mismatch | Update advertised to v0-v5 |
| 9 | OffsetFetch | v0-v5 | v0-v5 | ✅ Match | None |
| 10 | FindCoordinator | v0-v2 | v0-v2 | ✅ Match | None |
| 12 | Heartbeat | v0-v4 | v0-v4 | ✅ Match | None |
| 13 | LeaveGroup | v0-v4 | v0-v4 | ✅ Match | None |
## Detailed Analysis
### 1. OffsetFetch API (Key 9) - CRITICAL MISMATCH
- **Advertised**: v0-v2 (max version 2)
- **Actually Implemented**: Up to v5+
- **Evidence**: `buildOffsetFetchResponse()` includes `if apiVersion >= 5` for leader epoch
- **Evidence**: `if apiVersion >= 3` for throttle time
- **Impact**: Clients may not use advanced features available in v3-v5
- **Action**: Update advertised max version from 2 to 5
### 2. CreateTopics API (Key 19) - MINOR MISMATCH
- **Advertised**: v0-v4 (max version 4)
- **Actually Implemented**: v0-v5
- **Evidence**: `handleCreateTopics()` routes v5 requests to `handleCreateTopicsV2Plus()`
- **Evidence**: Tests validate v0-v5 versions
- **Impact**: v5 clients may not connect expecting v5 support
- **Action**: Update advertised max version from 4 to 5
### 3. Validation vs Advertisement Inconsistency
The `validateAPIVersion()` function matches the advertised versions, which means it will incorrectly reject valid v3-v5 OffsetFetch requests that the handler can actually process.
### 1. OffsetFetch API (Key 9)
- Advertised and implemented ranges are aligned at v0–v5.
### 2. CreateTopics API (Key 19)
- Advertised and implemented ranges are aligned at v0–v5.
### Validation vs Advertisement Consistency
`validateAPIVersion()` and `handleApiVersions()` are consistent with the supported ranges.
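One way to keep advertised and validated ranges from drifting again is to drive both `handleApiVersions()` and `validateAPIVersion()` from a single table. A sketch using the ranges in the matrix above (the map and function names are hypothetical, not the handler's actual code):

```go
package gateway

import "fmt"

type versionRange struct{ Min, Max int16 }

// supportedAPIs is a single source of truth for advertising and validation.
var supportedAPIs = map[int16]versionRange{
	0:  {0, 7}, // Produce
	1:  {0, 7}, // Fetch
	2:  {0, 2}, // ListOffsets
	8:  {0, 2}, // OffsetCommit
	9:  {0, 5}, // OffsetFetch
	10: {0, 2}, // FindCoordinator
	11: {0, 7}, // JoinGroup
	12: {0, 4}, // Heartbeat
	13: {0, 4}, // LeaveGroup
	14: {0, 5}, // SyncGroup
	19: {0, 5}, // CreateTopics
	20: {0, 4}, // DeleteTopics
}

func validateAPIVersionSketch(apiKey, apiVersion int16) error {
	r, ok := supportedAPIs[apiKey]
	if !ok {
		return fmt.Errorf("unsupported api key %d", apiKey)
	}
	if apiVersion < r.Min || apiVersion > r.Max {
		return fmt.Errorf("api key %d: version %d outside supported range v%d-v%d",
			apiKey, apiVersion, r.Min, r.Max)
	}
	return nil
}
```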
## Implementation Details
@@ -58,18 +48,15 @@ The `validateAPIVersion()` function matches the advertised versions, which means
## Recommendations
1. **Immediate Fix**: Update `handleApiVersions()` to advertise correct max versions
2. **Consistency Check**: Update `validateAPIVersion()` to match the corrected advertised versions
3. **Testing**: Verify that higher version clients can successfully connect and use advanced features
4. **Documentation**: Update any client guidance to reflect the correct supported versions
1. Re-verify after protocol changes to keep the matrix accurate
2. Extend coverage as implementations grow; keep tests for version guards
## Test Verification Needed
## Test Verification
After fixes:
1. Test OffsetFetch v3, v4, v5 with real Kafka clients
2. Test CreateTopics v5 with real Kafka clients
3. Verify throttle_time_ms and leader_epoch are correctly populated
4. Ensure version validation doesn't reject valid higher version requests
1. Exercise OffsetFetch v3–v5 with kafka-go and Sarama
2. Exercise CreateTopics v5; verify fields and tagged fields
3. Verify throttle_time_ms and leader_epoch population
4. Ensure version validation permits supported versions only
## Conservative Alternative

weed/mq/kafka/PROTOCOL_COMPATIBILITY_REVIEW.md (60 changed lines)

@@ -8,21 +8,19 @@ This document identifies areas in the current Kafka implementation that need att
### HIGH PRIORITY - Protocol Breaking Issues
#### 1. **Record Batch Parsing (Produce API)**
**File**: `protocol/produce.go`
**Issues**:
- `parseRecordSet()` uses simplified parsing logic that doesn't handle the full Kafka record batch format
- Hardcoded assumptions about record batch structure
- Missing compression support (gzip, snappy, lz4, zstd)
- CRC validation is completely missing
**Files**: `protocol/record_batch_parser.go`, `protocol/produce.go`
**Status**:
- Compression implemented (gzip, snappy, lz4, zstd)
- CRC validation available in `ParseRecordBatchWithValidation`
**Remaining**:
- Parse individual records (varints, headers, control records, timestamps)
- Integrate real record extraction into Produce path (replace simplified fallbacks)
**TODOs**:
```go
// TODO: Implement full Kafka record batch parsing
// - Support all record batch versions (v0, v1, v2)
// - Handle compression codecs (gzip, snappy, lz4, zstd)
// - Validate CRC32 checksums
// - Parse individual record headers, keys, values, timestamps
// - Handle transaction markers and control records
// TODO: Implement full record parsing for v2 batches
// - Varint decode for per-record fields and headers
// - Control records and transaction markers
// - Accurate timestamp handling
```
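For the varint work above, a sketch of decoding one v2 record from an already-decompressed records section; Kafka's signed varints use the same zigzag scheme as protobuf, so Go's `binary.Varint` applies directly (most bounds checks omitted for brevity; names are illustrative):

```go
package gateway

import (
	"encoding/binary"
	"fmt"
)

type decodedRecord struct {
	TimestampDelta int64
	OffsetDelta    int64
	Key, Value     []byte
	Headers        map[string][]byte
}

// readOneRecord decodes a single record starting at buf and returns it plus
// the number of bytes consumed.
func readOneRecord(buf []byte) (decodedRecord, int, error) {
	var rec decodedRecord
	pos := 0

	length, n := binary.Varint(buf[pos:]) // record length, excluding this varint
	if n <= 0 || length < 0 {
		return rec, 0, fmt.Errorf("bad record length")
	}
	pos += n
	end := pos + int(length)

	pos++ // attributes (int8), currently unused

	rec.TimestampDelta, n = binary.Varint(buf[pos:])
	pos += n
	rec.OffsetDelta, n = binary.Varint(buf[pos:])
	pos += n

	readBytes := func() []byte {
		l, n := binary.Varint(buf[pos:])
		pos += n
		if l < 0 {
			return nil // -1 means null key/value
		}
		b := buf[pos : pos+int(l)]
		pos += int(l)
		return b
	}
	rec.Key = readBytes()
	rec.Value = readBytes()

	headerCount, n := binary.Varint(buf[pos:])
	pos += n
	rec.Headers = make(map[string][]byte, headerCount)
	for i := int64(0); i < headerCount; i++ {
		k := readBytes()
		v := readBytes()
		rec.Headers[string(k)] = v
	}
	if pos != end {
		return rec, 0, fmt.Errorf("record length mismatch: consumed %d, expected %d", pos, end)
	}
	return rec, pos, nil
}
```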
#### 2. **Request Parsing Assumptions**
@@ -49,27 +47,25 @@ This document identifies areas in the current Kafka implementation that need att
#### 3. **Fetch Record Construction**
**File**: `protocol/fetch.go`
**Issues**:
- `constructRecordBatch()` creates fake record batches with dummy data
- Varint encoding is simplified to single bytes (incorrect)
- Missing proper record headers, timestamps, and metadata
**Status**:
- Basic batching works; needs proper record encoding
**Remaining**:
- Build real record batches from stored records with proper varints and headers
- Honor timestamps and per-record metadata
**TODOs**:
```go
// TODO: Replace dummy record batch construction with real data
// - Read actual message data from SeaweedMQ/storage
// - Implement proper varint encoding/decoding
// - Support record headers and custom timestamps
// - Handle different record batch versions correctly
// TODO: Implement real batch construction for Fetch
// - Proper varint encode for lengths and deltas
// - Include headers and correct timestamps
// - Support v2 record batch layout end-to-end
```
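And the inverse for the Fetch side: a sketch of encoding a single record with `binary.PutVarint` (batch-header assembly, CRC, record headers, and null-key handling are omitted; names are illustrative):

```go
package gateway

import "encoding/binary"

// appendRecord encodes one v2 record (length-prefixed, zigzag varints) and
// appends it to dst.
func appendRecord(dst []byte, offsetDelta, timestampDelta int64, key, value []byte) []byte {
	var body []byte
	tmp := make([]byte, binary.MaxVarintLen64)

	putVarint := func(v int64) {
		n := binary.PutVarint(tmp, v)
		body = append(body, tmp[:n]...)
	}

	body = append(body, 0) // attributes (int8)
	putVarint(timestampDelta)
	putVarint(offsetDelta)
	putVarint(int64(len(key)))
	body = append(body, key...)
	putVarint(int64(len(value)))
	body = append(body, value...)
	putVarint(0) // header count

	// The record is prefixed with its body length as a varint.
	n := binary.PutVarint(tmp, int64(len(body)))
	dst = append(dst, tmp[:n]...)
	return append(dst, body...)
}
```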
### MEDIUM PRIORITY - Compatibility Issues
#### 4. **API Version Support**
**File**: `protocol/handler.go`
**Issues**:
- ApiVersions response advertises max versions but implementations may not support all features
- No version-specific handling in most APIs
**Status**: Advertised ranges updated to match implemented (e.g., CreateTopics v5, OffsetFetch v5)
**Remaining**: Maintain alignment as handlers evolve; add stricter per-version validation paths
**TODOs**:
```go
@@ -140,7 +136,7 @@ This document identifies areas in the current Kafka implementation that need att
**Files**: `protocol/produce.go`, `protocol/fetch.go`
**Issues**:
- Simplified timestamp handling
- Offset assignment may not match Kafka behavior exactly
**Note**: Offset assignment is handled by SMQ native offsets; ensure Kafka-visible semantics map correctly
**TODOs**:
```go
@@ -234,12 +230,10 @@ This document identifies areas in the current Kafka implementation that need att
## Immediate Action Items
### Phase 4 Priority List:
1. **Fix Record Batch Parsing** - Critical for real client compatibility
2. **Implement Proper Request Parsing** - Remove hardcoded assumptions
3. **Add Compression Support** - Essential for performance
4. **Real SeaweedMQ Integration** - Move beyond placeholder data
5. **Consumer Protocol Metadata** - Fix subscription handling
6. **API Version Handling** - Support multiple protocol versions
1. Full record parsing for Produce (v2) and real Fetch batch construction
2. Proper request parsing (arrays) in OffsetCommit/OffsetFetch and group APIs
3. Consumer protocol metadata parsing (Join/Sync)
4. Maintain API version alignment and validation
### Compatibility Validation:
1. **Test with kafka-go library** - Most popular Go Kafka client

weed/mq/kafka/TODO.md (38 changed lines)

@@ -0,0 +1,38 @@
# Kafka Gateway TODO (Concise)
## Produce/Fetch correctness
- Implement full v2 record parsing: varints, headers, control records, timestamps
- Replace dummy Fetch batches with real per-record construction and varint encoding
- Stream large batches; avoid loading entire sets in memory; enforce maxBytes/minBytes
## Protobuf completeness
- Resolve imports/nested types from FileDescriptorSet; support fully-qualified names
- Handle Confluent Protobuf indexes; pick correct message within descriptor
- Robust decode/encode for scalars, maps, repeated, oneof; add validation APIs
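For the Confluent Protobuf index item, a sketch of splitting the framed payload, assuming Confluent's documented wire layout (magic byte 0x0, 4-byte big-endian schema ID, zigzag-varint message-index array where a lone 0 byte means the first top-level message); verify against the actual registry client before relying on it:

```go
package gateway

import (
	"encoding/binary"
	"fmt"
)

// parseConfluentProtobufHeader splits a Confluent-framed Protobuf payload into
// schema ID, message-index path (which nested message inside the schema), and
// the remaining Protobuf bytes.
func parseConfluentProtobufHeader(payload []byte) (schemaID int32, indexes []int64, rest []byte, err error) {
	if len(payload) < 6 || payload[0] != 0 {
		return 0, nil, nil, fmt.Errorf("not a Confluent-framed payload")
	}
	schemaID = int32(binary.BigEndian.Uint32(payload[1:5]))
	pos := 5

	count, n := binary.Varint(payload[pos:])
	if n <= 0 {
		return 0, nil, nil, fmt.Errorf("bad message-index count")
	}
	pos += n
	if count == 0 {
		// Optimized single-byte form: refers to the first top-level message.
		return schemaID, []int64{0}, payload[pos:], nil
	}
	for i := int64(0); i < count; i++ {
		idx, n := binary.Varint(payload[pos:])
		if n <= 0 {
			return 0, nil, nil, fmt.Errorf("bad message index")
		}
		indexes = append(indexes, idx)
		pos += n
	}
	return schemaID, indexes, payload[pos:], nil
}
```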
## Protocol parsing and coordination
- Remove hardcoded topic/partition assumptions across handlers
- Properly parse arrays in OffsetCommit/OffsetFetch and other APIs
- Implement consumer protocol metadata parsing (JoinGroup/SyncGroup)
## API versioning and errors
- Validate per-API version shapes; align validateAPIVersion with advertised ranges
- Audit error codes against Kafka spec; add missing codes (quota, auth, timeouts)
## Schema registry and evolution
- Subject-level compatibility settings; lineage and soft-deletion
- Optional mirroring of schemas to Filer; cache TTL controls
## Offsets and consumer groups
- Prefer SMQ native offsets end-to-end; minimize legacy timestamp translation
- ListOffsets timestamp lookups backed by SMQ; group lag/high-watermark metrics
## Security and operations
- TLS listener config; SASL/PLAIN (then SCRAM) auth
- Backpressure, memory pooling, connection cleanup, metrics (Prometheus)
## Tests and benchmarks
- E2E with kafka-go, Sarama, Java; compressed batches; long-disconnect CG recovery
- Performance/regression tests for compression + CRC across clients
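For the kafka-go leg of the E2E matrix, a minimal fetch-from-earliest check could look like this sketch (broker address and topic are placeholders):

```go
package gateway_test

import (
	"context"
	"testing"
	"time"

	"github.com/segmentio/kafka-go"
)

func TestKafkaGoFetchFromEarliest(t *testing.T) {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{"127.0.0.1:9092"}, // gateway address is a placeholder
		Topic:     "e2e-topic",
		Partition: 0,
	})
	defer r.Close()

	if err := r.SetOffset(kafka.FirstOffset); err != nil {
		t.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	msg, err := r.ReadMessage(ctx)
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("offset=%d key=%q value=%q", msg.Offset, msg.Key, msg.Value)
}
```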