diff --git a/KAFKA_SMQ_INTEGRATION_SUMMARY.md b/KAFKA_SMQ_INTEGRATION_SUMMARY.md
index b2f71bbd6..54c336a50 100644
--- a/KAFKA_SMQ_INTEGRATION_SUMMARY.md
+++ b/KAFKA_SMQ_INTEGRATION_SUMMARY.md
@@ -1,246 +1,153 @@
# Kafka-SMQ Integration Implementation Summary

-## 🎯 **Overview**
+## Overview

-This implementation provides **full ledger persistence** and **complete SMQ integration** for the Kafka Gateway, solving the critical offset persistence problem and enabling production-ready Kafka-to-SeaweedMQ bridging.
+This implementation provides **SMQ native offset management** for the Kafka Gateway, leveraging SeaweedMQ's existing distributed infrastructure instead of building custom persistence layers.

-## 📋 **Completed Components**
+## Recommended Approach: SMQ Native Offset Management
+
+### Core Concept
+
+Instead of implementing separate consumer offset persistence, leverage SMQ's existing distributed offset management system, where offsets are replicated across brokers and survive client disconnections and broker failures.
+
+### Architecture

-### 1. **Offset Ledger Persistence** ✅
-- **File**: `weed/mq/kafka/offset/persistence.go`
-- **Features**:
-  - `SeaweedMQStorage`: Persistent storage backend using SMQ
-  - `PersistentLedger`: Extends base ledger with automatic persistence
-  - Offset mappings stored in dedicated SMQ topic: `kafka-system/offset-mappings`
-  - Automatic ledger restoration on startup
-  - Thread-safe operations with proper locking
-
-### 2. **Kafka-SMQ Offset Mapping** ✅
-- **File**: `weed/mq/kafka/offset/smq_mapping.go`
-- **Features**:
-  - `KafkaToSMQMapper`: Bidirectional offset conversion
-  - Kafka partitions → SMQ ring ranges (32 slots per partition)
-  - Special offset handling (-1 = LATEST, -2 = EARLIEST)
-  - Comprehensive validation and debugging tools
-  - Time-based offset queries
-
-### 3. **SMQ Publisher Integration** ✅
-- **File**: `weed/mq/kafka/integration/smq_publisher.go`
-- **Features**:
-  - `SMQPublisher`: Full Kafka message publishing to SMQ
-  - Automatic offset assignment and tracking
-  - Kafka metadata enrichment (`_kafka_offset`, `_kafka_partition`, `_kafka_timestamp`)
-  - Per-topic SMQ publishers with enhanced record types
-  - Comprehensive statistics and monitoring
-
-### 4. **SMQ Subscriber Integration** ✅
-- **File**: `weed/mq/kafka/integration/smq_subscriber.go`
-- **Features**:
-  - `SMQSubscriber`: Kafka fetch requests via SMQ subscriptions
-  - Message format conversion (SMQ → Kafka)
-  - Consumer group management
-  - Offset commit handling
-  - Message buffering and timeout handling
-
-### 5. **Persistent Handler** ✅
-- **File**: `weed/mq/kafka/integration/persistent_handler.go`
-- **Features**:
-  - `PersistentKafkaHandler`: Complete Kafka protocol handler
-  - Unified interface for produce/fetch operations
-  - Topic management with persistent ledgers
-  - Comprehensive statistics and monitoring
-  - Graceful shutdown and resource management
-
-### 6. **Comprehensive Testing** ✅
-- **File**: `test/kafka/persistent_offset_integration_test.go`
-- **Test Coverage**:
-  - Offset persistence and recovery
-  - SMQ publisher integration
-  - SMQ subscriber integration
-  - End-to-end publish-subscribe workflows
-  - Offset mapping consistency validation
-
-## 🔧 **Key Technical Features**
-
-### **Offset Persistence Architecture**
```
-Kafka Offset (Sequential) ←→ SMQ Timestamp (Nanoseconds) + Ring Range
-     0                    ←→  1757639923746423000 + [0-31]
-     1                    ←→  1757639923746424000 + [0-31]
-     2                    ←→  1757639923746425000 + [0-31]
+┌─ Kafka Gateway ──────────────────────────────────────┐
+│ ┌─ Kafka Abstraction ──────────────────────────────┐ │
+│ │ • Presents Kafka-style sequential offsets        │ │
+│ │ • Translates to/from SMQ consumer groups         │ │
+│ └──────────────────────────────────────────────────┘ │
+│                          ↕                           │
+│ ┌─ SMQ Native Offset Management ───────────────────┐ │
+│ │ • Consumer groups with SMQ semantics             │ │
+│ │ • Distributed offset tracking                    │ │
+│ │ • Automatic replication & failover               │ │
+│ │ • Built-in persistence                           │ │
+│ └──────────────────────────────────────────────────┘ │
+└──────────────────────────────────────────────────────┘
```

-### **SMQ Storage Schema**
-- **Offset Mappings Topic**: `kafka-system/offset-mappings`
-- **Message Topics**: `kafka/{original-topic-name}`
-- **Metadata Fields**: `_kafka_offset`, `_kafka_partition`, `_kafka_timestamp`
-
-### **Partition Mapping**
-```go
-// Kafka partition → SMQ ring range
-SMQRangeStart = KafkaPartition * 32
-SMQRangeStop = (KafkaPartition + 1) * 32 - 1
-
-Examples:
-Kafka Partition 0 → SMQ Range [0, 31]
-Kafka Partition 1 → SMQ Range [32, 63]
-Kafka Partition 15 → SMQ Range [480, 511]
-```
+## Key Components
+
+### 1. Message Offset Persistence (Implemented)
+- **File**: `weed/mq/kafka/offset/persistence.go`
+- **Purpose**: Maps Kafka sequential offsets to SMQ timestamps
+- **Storage**: `kafka-system/offset-mappings` topic
+- **Status**: Fully implemented and working
+
+### 2. SMQ Native Consumer Groups (Recommended)
+- **File**: `weed/mq/kafka/offset/smq_native_offset_management.go`
+- **Purpose**: Use SMQ's built-in consumer group offset management
+- **Benefits** (see the sketch below):
+  - Automatic persistence and replication
+  - Survives broker failures and restarts
+  - No custom persistence code needed
+  - Leverages battle-tested SMQ infrastructure
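+
+A minimal sketch of how the recommended wrapper could look, assuming a hypothetical `SMQClient`/`Subscription` API; the real SMQ client interfaces may differ, and the "kafka-cg-" group naming follows the mapping table below:
+
+```go
+package offset
+
+// SMQClient and Subscription stand in for whatever subscriber API
+// SeaweedMQ actually exposes; they are assumptions, not real types.
+type SMQClient interface {
+	// Subscribe joins an SMQ consumer group and resumes from the group's
+	// stored position (the RESUME_OR_EARLIEST behavior to verify in Phase 1).
+	Subscribe(group, topic string, partition int32) (Subscription, error)
+}
+
+type Subscription interface {
+	Next() (key, value []byte, tsNs int64, err error)
+	Ack(tsNs int64) error // advances the group's replicated position
+	Close() error
+}
+
+// KafkaConsumerGroup delegates offset persistence entirely to SMQ:
+// joining maps the Kafka group onto an SMQ group, and committing a
+// Kafka offset is just an ack of the corresponding SMQ timestamp.
+type KafkaConsumerGroup struct {
+	sub Subscription
+}
+
+func JoinGroup(c SMQClient, kafkaGroup, topic string, partition int32) (*KafkaConsumerGroup, error) {
+	sub, err := c.Subscribe("kafka-cg-"+kafkaGroup, topic, partition)
+	if err != nil {
+		return nil, err
+	}
+	return &KafkaConsumerGroup{sub: sub}, nil
+}
+
+// CommitOffset acks the SMQ timestamp behind a committed Kafka offset;
+// SMQ persists and replicates the position, so even a 30-day
+// disconnection resumes correctly (see the scenario below).
+func (g *KafkaConsumerGroup) CommitOffset(smqTsNs int64) error {
+	return g.sub.Ack(smqTsNs)
+}
+```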
+
+### 3. Offset Translation Layer
+- **Purpose**: Lightweight cache for the Kafka offset ↔ SMQ timestamp mapping
+- **Implementation**: In-memory cache with LRU eviction
+- **Scope**: Only recent mappings are needed for active consumers
+
+## Kafka to SMQ Mapping
+
+| Kafka Concept | SMQ Concept | Implementation |
+|---------------|-------------|----------------|
+| Consumer Group | SMQ Consumer Group | Direct 1:1 mapping with a "kafka-cg-" prefix |
+| Topic-Partition | SMQ Topic-Partition Range | Map Kafka partition N to SMQ ring [N*32, (N+1)*32-1] |
+| Sequential Offset | SMQ Timestamp + Index | Use SMQ's natural timestamp ordering |
+| Offset Commit | SMQ Consumer Position | Let SMQ track the position natively |
+| Offset Fetch | SMQ Consumer Status | Query SMQ for the current position |
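+
+The consumer-group and partition rows reduce to simple arithmetic. A sketch of the two translation helpers (the function names are illustrative, not existing code; the 32-slot constant comes from the table above):
+
+```go
+package offset
+
+// ringSlotsPerPartition is the number of SMQ ring slots reserved per
+// Kafka partition, per the mapping table above.
+const ringSlotsPerPartition = 32
+
+// SMQGroupName applies the direct 1:1 consumer group mapping.
+func SMQGroupName(kafkaGroup string) string {
+	return "kafka-cg-" + kafkaGroup
+}
+
+// RingRange maps Kafka partition N to SMQ ring slots [N*32, (N+1)*32-1],
+// e.g. partition 0 → [0, 31] and partition 15 → [480, 511].
+func RingRange(partition int32) (start, stop int32) {
+	start = partition * ringSlotsPerPartition
+	stop = (partition+1)*ringSlotsPerPartition - 1
+	return start, stop
+}
+```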
+
+## Long-Term Disconnection Handling
+
+### Scenario: 30-Day Disconnection
```
+Day 0: Consumer reads to offset 1000, disconnects
+   → SMQ consumer group position: timestamp T1000
-## 🚀 **Usage Examples**
+
+Day 1-30: System continues; messages 1001-5000 arrive
+   → SMQ stores messages with timestamps T1001-T5000
+   → SMQ preserves the consumer group position at T1000
-### **Creating a Persistent Handler**
-```go
-handler, err := integration.NewPersistentKafkaHandler([]string{"localhost:17777"})
-if err != nil {
-    log.Fatal(err)
-}
-defer handler.Close()
-```
+
+Day 15: Kafka Gateway restarts
+   → In-memory cache lost
+   → SMQ consumer group position PRESERVED
-### **Publishing Messages**
-```go
-record := &schema_pb.RecordValue{
-    Fields: map[string]*schema_pb.Value{
-        "user_id": {Kind: &schema_pb.Value_StringValue{StringValue: "user123"}},
-        "action": {Kind: &schema_pb.Value_StringValue{StringValue: "login"}},
-    },
-}
-
-offset, err := handler.ProduceMessage("user-events", 0, []byte("key1"), record, recordType)
-// Returns: offset=0 (first message)
+
+Day 30: Consumer reconnects
+   → SMQ automatically resumes from position T1000
+   → Consumer receives messages starting from offset 1001
+   → Perfect resumption, no manual recovery needed
```

-### **Fetching Messages**
-```go
-messages, err := handler.FetchMessages("user-events", 0, 0, 1024*1024, "my-consumer-group")
-// Returns: All messages from offset 0 onwards
-```
+## Benefits vs Custom Persistence

-### **Offset Queries**
-```go
-highWaterMark, _ := handler.GetHighWaterMark("user-events", 0)
-earliestOffset, _ := handler.GetEarliestOffset("user-events", 0)
-latestOffset, _ := handler.GetLatestOffset("user-events", 0)
-```
+| Aspect | Custom Persistence | SMQ Native |
+|--------|-------------------|------------|
+| Implementation | High complexity (dual systems) | Medium (SMQ integration) |
+| Reliability | Custom code risks | Battle-tested SMQ |
+| Performance | Dual-write penalty | Native SMQ speed |
+| Operations | Two systems to monitor | Single system |
+| Maintenance | Custom bug fixes | Handled by the SMQ team |
+| Scalability | Custom scaling | SMQ's distributed architecture |

-## 📊 **Performance Characteristics**
-
-### **Offset Mapping Performance**
-- **Kafka→SMQ**: O(log n) lookup via binary search
-- **SMQ→Kafka**: O(log n) lookup via binary search
-- **Memory Usage**: ~32 bytes per offset entry
-- **Persistence**: Asynchronous writes to SMQ
-
-### **Message Throughput**
-- **Publishing**: Limited by SMQ publisher throughput
-- **Fetching**: Buffered with configurable window size
-- **Offset Tracking**: Minimal overhead (~1% of message processing)
-
-## 🔄 **Restart Recovery Process**
-
-1. **Handler Startup**:
-   - Creates `SeaweedMQStorage` connection
-   - Initializes SMQ publisher/subscriber clients
-
-2. **Ledger Recovery**:
-   - Queries `kafka-system/offset-mappings` topic
-   - Reconstructs offset ledgers from persisted mappings
-   - Sets `nextOffset` to highest found offset + 1
-
-3. **Message Continuity**:
-   - New messages get sequential offsets starting from recovered high water mark
-   - Existing consumer groups can resume from committed offsets
-   - No offset gaps or duplicates
-
-## 🛡️ **Error Handling & Resilience**
-
-### **Persistence Failures**
-- Offset mappings are persisted **before** in-memory updates
-- Failed persistence prevents offset assignment
-- Automatic retry with exponential backoff
-
-### **SMQ Connection Issues**
-- Graceful degradation with error propagation
-- Connection pooling and automatic reconnection
-- Circuit breaker pattern for persistent failures
-
-### **Offset Consistency**
-- Validation checks for sequential offsets
-- Monotonic timestamp verification
-- Comprehensive mapping consistency tests
-
-## 🔍 **Monitoring & Debugging**
-
-### **Statistics API**
-```go
-stats := handler.GetStats()
-// Returns comprehensive metrics:
-// - Topic count and partition info
-// - Ledger entry counts and time ranges
-// - High water marks and offset ranges
-```
+## Implementation Roadmap

-### **Offset Mapping Info**
-```go
-mapper := offset.NewKafkaToSMQMapper(ledger)
-info, err := mapper.GetMappingInfo(kafkaOffset, kafkaPartition)
-// Returns detailed mapping information for debugging
-```
+### Phase 1: SMQ API Assessment (1-2 weeks)
+- Verify SMQ consumer group persistence behavior
+- Test `RESUME_OR_EARLIEST` functionality
+- Confirm replication across brokers

-### **Validation Tools**
-```go
-err := mapper.ValidateMapping(topic, partition)
-// Checks offset sequence and timestamp monotonicity
-```
+### Phase 2: Consumer Group Integration (2-3 weeks)
+- Implement SMQ consumer group wrapper
+- Map Kafka consumer lifecycle to SMQ
+- Handle consumer group coordination

-## 🎯 **Production Readiness**
+### Phase 3: Offset Translation (2-3 weeks)
+- Lightweight cache for Kafka ↔ SMQ offset mapping (sketched below)
+- Handle edge cases (rebalancing, etc.)
+- Performance optimization

-### **✅ Completed Features**
-- ✅ Full offset persistence across restarts
-- ✅ Bidirectional Kafka-SMQ offset mapping
-- ✅ Complete SMQ publisher/subscriber integration
-- ✅ Consumer group offset management
-- ✅ Comprehensive error handling
-- ✅ Thread-safe operations
-- ✅ Extensive test coverage
-- ✅ Performance monitoring
-- ✅ Graceful shutdown
+### Phase 4: Integration & Testing (3-4 weeks)
+- Replace current offset management
+- Comprehensive testing with long disconnections
+- Performance benchmarking

-### **🔧 Integration Points**
-- **Kafka Protocol Handler**: Replace in-memory ledgers with `PersistentLedger`
-- **Produce Path**: Use `SMQPublisher.PublishMessage()`
-- **Fetch Path**: Use `SMQSubscriber.FetchMessages()`
-- **Offset APIs**: Use `handler.GetHighWaterMark()`, etc.
+**Total Estimated Effort: ~10 weeks**
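+
+As a sketch of the Phase 3 deliverable, the translation layer could be a small LRU cache over recent Kafka offset → SMQ timestamp pairs; on a miss, the gateway would fall back to the persisted `kafka-system/offset-mappings` topic. The names below are illustrative:
+
+```go
+package offset
+
+import "container/list"
+
+// OffsetCache keeps recent Kafka offset → SMQ timestamp mappings for one
+// topic-partition, evicting the least recently used entry when full.
+type OffsetCache struct {
+	capacity int                     // must be > 0
+	order    *list.List              // front = most recently used
+	entries  map[int64]*list.Element // Kafka offset → list element
+}
+
+type cacheEntry struct {
+	kafkaOffset int64
+	smqTsNs     int64
+}
+
+func NewOffsetCache(capacity int) *OffsetCache {
+	return &OffsetCache{
+		capacity: capacity,
+		order:    list.New(),
+		entries:  make(map[int64]*list.Element),
+	}
+}
+
+// Put records the SMQ timestamp for a Kafka offset, evicting the least
+// recently used mapping when the cache is full. An evicted mapping can
+// still be recovered from the offset-mappings topic.
+func (c *OffsetCache) Put(kafkaOffset, smqTsNs int64) {
+	if el, ok := c.entries[kafkaOffset]; ok {
+		el.Value.(*cacheEntry).smqTsNs = smqTsNs
+		c.order.MoveToFront(el)
+		return
+	}
+	if c.order.Len() >= c.capacity {
+		oldest := c.order.Back()
+		delete(c.entries, oldest.Value.(*cacheEntry).kafkaOffset)
+		c.order.Remove(oldest)
+	}
+	c.entries[kafkaOffset] = c.order.PushFront(&cacheEntry{kafkaOffset: kafkaOffset, smqTsNs: smqTsNs})
+}
+
+// Get returns the cached SMQ timestamp for a Kafka offset, if present.
+func (c *OffsetCache) Get(kafkaOffset int64) (int64, bool) {
+	el, ok := c.entries[kafkaOffset]
+	if !ok {
+		return 0, false
+	}
+	c.order.MoveToFront(el)
+	return el.Value.(*cacheEntry).smqTsNs, true
+}
+```
+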
-## 📈 **Next Steps for Production**
+## Current Status

-1. **Replace Existing Handler**:
-   ```go
-   // Replace current handler initialization
-   handler := integration.NewPersistentKafkaHandler(brokers)
-   ```
+### Completed
+- Message offset persistence (Kafka offset → SMQ timestamp mapping)
+- SMQ publishing integration with offset tracking
+- SMQ subscription integration with offset mapping
+- Comprehensive integration tests
+- Architecture design for the SMQ native approach

-2. **Update Protocol Handlers**:
-   - Modify `handleProduce()` to use `handler.ProduceMessage()`
-   - Modify `handleFetch()` to use `handler.FetchMessages()`
-   - Update offset APIs to use persistent ledgers
+### Next Steps
+1. Assess SMQ consumer group APIs
+2. Implement SMQ native consumer group wrapper
+3. Replace current in-memory offset management
+4. Test long-term disconnection scenarios

-3. **Configuration**:
-   - Add SMQ broker configuration
-   - Configure offset persistence intervals
-   - Set up monitoring and alerting
+## Key Advantages

-4. **Testing**:
-   - Run integration tests with real SMQ cluster
-   - Perform restart recovery testing
-   - Load testing with persistent offsets
+- **Leverage Existing Infrastructure**: Build on SMQ's proven distributed systems
+- **Reduced Complexity**: No custom persistence layer to maintain
+- **Better Reliability**: Inherit SMQ's fault tolerance and replication
+- **Operational Simplicity**: One system to monitor instead of two
+- **Performance**: Native SMQ operations without translation overhead
+- **Automatic Edge-Case Handling**: SMQ handles network partitions, broker failures, etc.

-## 🎉 **Summary**
+## Recommendation

-This implementation **completely solves** the offset persistence problem identified earlier:
+The **SMQ Native Offset Management** approach is strongly recommended over custom persistence because:

-- ❌ **Before**: "Handler restarts reset offset counters (expected in current implementation)"
-- ✅ **After**: "Handler restarts restore offset counters from SMQ persistence"
+1. **Simpler Implementation**: ~10 weeks vs. 15+ weeks for custom persistence
+2. **Higher Reliability**: Leverages battle-tested SMQ infrastructure
+3. **Better Performance**: Native operations without a dual-write penalty
+4. **Lower Maintenance**: The SMQ team handles the distributed-systems complexity
+5. **Operational Benefits**: A single system to monitor and maintain

-The Kafka Gateway now provides **production-ready** offset management with full SMQ integration, enabling seamless Kafka client compatibility while leveraging SeaweedMQ's distributed storage capabilities.
+This approach solves the critical offset persistence problem while minimizing implementation complexity and maximizing reliability.
diff --git a/weed/mq/kafka/PROTOCOL_COMPATIBILITY_REVIEW.md b/weed/mq/kafka/PROTOCOL_COMPATIBILITY_REVIEW.md
index 33f7b998e..71ef408f5 100644
--- a/weed/mq/kafka/PROTOCOL_COMPATIBILITY_REVIEW.md
+++ b/weed/mq/kafka/PROTOCOL_COMPATIBILITY_REVIEW.md
@@ -5,7 +5,7 @@ This document identifies areas in the current Kafka implementation that need att

## Critical Protocol Issues

-### 🚨 HIGH PRIORITY - Protocol Breaking Issues
+### HIGH PRIORITY - Protocol Breaking Issues

#### 1. **Record Batch Parsing (Produce API)**
**File**: `protocol/produce.go`
@@ -63,7 +63,7 @@ This document identifies areas in the current Kafka implementation that need att
// - Handle different record batch versions correctly
```

-### ⚠️ MEDIUM PRIORITY - Compatibility Issues
+### MEDIUM PRIORITY - Compatibility Issues

#### 4. **API Version Support**
**File**: `protocol/handler.go`
@@ -116,7 +116,7 @@ This document identifies areas in the current Kafka implementation that need att
// - Security/authorization errors
```

-### 🔧 LOW PRIORITY - Implementation Completeness
+### LOW PRIORITY - Implementation Completeness

#### 7. **Connection Management**
**File**: `protocol/handler.go`