# Kafka-SMQ Integration Implementation Summary

## Overview

This implementation provides **SMQ native offset management** for the Kafka Gateway, leveraging SeaweedMQ's existing distributed infrastructure instead of building custom persistence layers.
## Recommended Approach: SMQ Native Offset Management

### Core Concept
Instead of implementing a separate consumer offset persistence layer, the gateway leverages SMQ's existing distributed offset management, in which offsets are replicated across brokers and survive both client disconnections and broker failures.
### Architecture
```
┌─ Kafka Gateway ──────────────────────────────────┐
│ ┌─ Kafka Abstraction ──────────────────────────┐ │
│ │ • Presents Kafka-style sequential offsets    │ │
│ │ • Translates to/from SMQ consumer groups     │ │
│ └──────────────────────────────────────────────┘ │
│                        ↕                         │
│ ┌─ SMQ Native Offset Management ───────────────┐ │
│ │ • Consumer groups with SMQ semantics         │ │
│ │ • Distributed offset tracking                │ │
│ │ • Automatic replication & failover           │ │
│ │ • Built-in persistence                       │ │
│ └──────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────┘
```
## Key Components
### 1. Message Offset Persistence (Implemented)

- **File**: `weed/mq/kafka/offset/persistence.go`
- **Purpose**: Maps Kafka sequential offsets to SMQ timestamps
- **Storage**: `kafka-system/offset-mappings` topic
- **Status**: Fully implemented and working (sketched below)
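
The mapping can be pictured with a small in-memory ledger sketch. This is illustrative only: the type and method names are assumptions rather than the actual `persistence.go` API, and the real component also writes each entry to the `kafka-system/offset-mappings` topic instead of keeping it only in memory.

```go
package offset

import (
	"fmt"
	"sort"
	"sync"
)

// OffsetEntry is one persisted mapping between a Kafka sequential
// offset and the SMQ timestamp of the corresponding stored message.
type OffsetEntry struct {
	KafkaOffset    int64
	SMQTimestampNs int64
}

// Ledger holds the mappings for one topic-partition, ordered by
// Kafka offset (and therefore by timestamp, since both only grow).
type Ledger struct {
	mu      sync.RWMutex
	entries []OffsetEntry
}

// Append records the next mapping; entries arrive in offset order.
func (l *Ledger) Append(kafkaOffset, smqTimestampNs int64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.entries = append(l.entries, OffsetEntry{kafkaOffset, smqTimestampNs})
}

// SMQTimestamp resolves a Kafka offset to its SMQ timestamp using
// binary search over the sorted entries.
func (l *Ledger) SMQTimestamp(kafkaOffset int64) (int64, error) {
	l.mu.RLock()
	defer l.mu.RUnlock()
	i := sort.Search(len(l.entries), func(i int) bool {
		return l.entries[i].KafkaOffset >= kafkaOffset
	})
	if i == len(l.entries) || l.entries[i].KafkaOffset != kafkaOffset {
		return 0, fmt.Errorf("offset %d not found", kafkaOffset)
	}
	return l.entries[i].SMQTimestampNs, nil
}
```

The reverse direction (SMQ timestamp → Kafka offset) is the same binary search keyed on `SMQTimestampNs`.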
### 2. SMQ Native Consumer Groups (Recommended)

- **File**: `weed/mq/kafka/offset/smq_native_offset_management.go`
- **Purpose**: Use SMQ's built-in consumer group offset management (see the adapter sketch below)
- **Benefits**:
  - Automatic persistence and replication
  - Survives broker failures and restarts
  - No custom persistence code needed
  - Leverages battle-tested SMQ infrastructure
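
What the wrapper could look like is sketched below against a hypothetical `smqConsumerGroup` interface; the concrete SMQ client API is exactly what Phase 1 of the roadmap is meant to pin down, so every name here is an assumption.

```go
package offset

// smqConsumerGroup is a hypothetical stand-in for SMQ's native
// consumer group client; the real API is assessed in Phase 1.
type smqConsumerGroup interface {
	// Commit advances the group's position to an SMQ timestamp;
	// SMQ persists and replicates it.
	Commit(topic string, partition int32, tsNs int64) error
	// Position returns the group's committed SMQ timestamp.
	Position(topic string, partition int32) (int64, error)
}

// GroupAdapter exposes Kafka-style commit/fetch on top of SMQ's
// native positions; the translation funcs come from the ledger
// and translation cache.
type GroupAdapter struct {
	group       smqConsumerGroup
	toTimestamp func(kafkaOffset int64) (tsNs int64, err error)
	toOffset    func(tsNs int64) (kafkaOffset int64, err error)
}

// CommitOffset handles a Kafka OffsetCommit by translating the
// offset to a timestamp and committing it natively in SMQ.
func (a *GroupAdapter) CommitOffset(topic string, partition int32, kafkaOffset int64) error {
	tsNs, err := a.toTimestamp(kafkaOffset)
	if err != nil {
		return err
	}
	return a.group.Commit(topic, partition, tsNs)
}

// FetchOffset handles a Kafka OffsetFetch by reading SMQ's native
// position and translating it back to a Kafka offset.
func (a *GroupAdapter) FetchOffset(topic string, partition int32) (int64, error) {
	tsNs, err := a.group.Position(topic, partition)
	if err != nil {
		return 0, err
	}
	return a.toOffset(tsNs)
}
```

The point of this shape is that the gateway never stores a committed offset itself; it only translates and delegates.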
### 3. Offset Translation Layer

- **Purpose**: Lightweight cache for the Kafka offset ↔ SMQ timestamp mapping
- **Implementation**: In-memory cache with LRU eviction (sketched below)
- **Scope**: Only recent mappings needed for active consumers
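
A self-contained sketch of such a cache, using Go's `container/list` for the recency order. Keying on the Kafka offset alone is a simplification; a real cache would key on topic-partition as well.

```go
package offset

import "container/list"

// lruCache keeps only the most recent Kafka offset → SMQ timestamp
// mappings in memory; evicted entries must be re-read from the
// kafka-system/offset-mappings topic if needed again.
type lruCache struct {
	capacity int
	order    *list.List              // front = most recently used
	items    map[int64]*list.Element // Kafka offset → list element
}

type lruEntry struct {
	kafkaOffset    int64
	smqTimestampNs int64
}

func newLRUCache(capacity int) *lruCache {
	return &lruCache{
		capacity: capacity,
		order:    list.New(),
		items:    make(map[int64]*list.Element),
	}
}

// Get returns the cached SMQ timestamp and refreshes recency.
func (c *lruCache) Get(kafkaOffset int64) (int64, bool) {
	el, ok := c.items[kafkaOffset]
	if !ok {
		return 0, false
	}
	c.order.MoveToFront(el)
	return el.Value.(*lruEntry).smqTimestampNs, true
}

// Put inserts or refreshes a mapping, evicting the oldest entry
// once the cache exceeds its capacity.
func (c *lruCache) Put(kafkaOffset, smqTimestampNs int64) {
	if el, ok := c.items[kafkaOffset]; ok {
		el.Value.(*lruEntry).smqTimestampNs = smqTimestampNs
		c.order.MoveToFront(el)
		return
	}
	el := c.order.PushFront(&lruEntry{kafkaOffset, smqTimestampNs})
	c.items[kafkaOffset] = el
	if c.order.Len() > c.capacity {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*lruEntry).kafkaOffset)
	}
}
```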
## Kafka to SMQ Mapping

| Kafka Concept | SMQ Concept | Implementation |
|---------------|-------------|----------------|
| Consumer Group | SMQ Consumer Group | Direct 1:1 mapping with the `kafka-cg-` prefix |
| Topic-Partition | SMQ Topic-Partition Range | Map Kafka partition N to SMQ ring range [N*32, (N+1)*32-1] |
| Sequential Offset | SMQ Timestamp + Index | Use SMQ's natural timestamp ordering |
| Offset Commit | SMQ Consumer Position | Let SMQ track the position natively |
| Offset Fetch | SMQ Consumer Status | Query SMQ for the current position |
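
The two mechanical rules in this table are simple enough to state in code. A sketch of the arithmetic, with assumed function names:

```go
package offset

// SMQ ring slots reserved per Kafka partition, per the table above.
const slotsPerPartition = 32

// ringRange maps Kafka partition N to its SMQ ring range
// [N*32, (N+1)*32-1].
func ringRange(kafkaPartition int32) (start, stop int32) {
	start = kafkaPartition * slotsPerPartition
	stop = (kafkaPartition+1)*slotsPerPartition - 1
	return start, stop
}

// smqConsumerGroupName applies the direct 1:1 "kafka-cg-" naming rule.
func smqConsumerGroupName(kafkaGroupID string) string {
	return "kafka-cg-" + kafkaGroupID
}
```

For example, `ringRange(15)` yields [480, 511], and a Kafka consumer group named `analytics` becomes the SMQ group `kafka-cg-analytics`.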
## Long-Term Disconnection Handling

### Scenario: 30-Day Disconnection

```
Day 0:    Consumer reads to offset 1000, disconnects
          → SMQ consumer group position: timestamp T1000

Day 1-30: System continues, messages 1001-5000 arrive
          → SMQ stores messages with timestamps T1001-T5000
          → SMQ preserves the consumer group position at T1000

Day 15:   Kafka Gateway restarts
          → In-memory cache lost
          → SMQ consumer group position PRESERVED

Day 30:   Consumer reconnects
          → SMQ automatically resumes from position T1000
          → Consumer receives messages starting from offset 1001
          → Perfect resumption, no manual recovery needed
```
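
The Day 30 step is where the translation layer does its work: given the committed SMQ position T1000, the gateway must hand the consumer offset 1001 next. A sketch of that resolution, reusing the `OffsetEntry` type (and the `sort` import) from the ledger sketch above:

```go
// nextKafkaOffset returns the Kafka offset a reconnecting consumer
// should receive next, given the group's committed SMQ position.
// entries must be sorted by timestamp, which holds because offsets
// and timestamps grow together.
func nextKafkaOffset(entries []OffsetEntry, positionTsNs int64) int64 {
	if len(entries) == 0 {
		return 0 // nothing stored yet
	}
	// Find the first entry strictly after the committed position.
	i := sort.Search(len(entries), func(i int) bool {
		return entries[i].SMQTimestampNs > positionTsNs
	})
	if i == len(entries) {
		// Consumer is fully caught up; next offset is past the end.
		return entries[len(entries)-1].KafkaOffset + 1
	}
	return entries[i].KafkaOffset
}
```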
## Benefits vs Custom Persistence

| Aspect | Custom Persistence | SMQ Native |
|--------|--------------------|------------|
| Implementation | High complexity (dual systems) | Medium (SMQ integration) |
| Reliability | Custom code risks | Battle-tested SMQ |
| Performance | Dual-write penalty | Native SMQ speed |
| Operations | Two systems to monitor | A single system |
| Maintenance | Custom bugs and fixes | Handled by the SMQ team |
| Scalability | Custom scaling work | SMQ's distributed architecture |
## Implementation Roadmap

### Phase 1: SMQ API Assessment (1-2 weeks)

- Verify SMQ consumer group persistence behavior
- Test `RESUME_OR_EARLIEST` functionality (see the test sketch below)
- Confirm replication across brokers
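
A sketch of how the first two checks could be structured. The subscribe function is a placeholder, since the concrete SMQ client signature is precisely what this phase assesses; group and topic names are illustrative only.

```go
package offset_test

import "testing"

// smqSubscribeFn stands in for an SMQ subscribe call and returns the
// timestamp of the first delivered message. It is a placeholder for
// whatever client API the Phase 1 assessment settles on.
type smqSubscribeFn func(group, topic, offsetType string) (firstTsNs int64, err error)

// verifyResumeOrEarliest checks the expected RESUME_OR_EARLIEST
// semantics: a group with a committed position resumes there, while
// a brand-new group starts from the earliest message.
func verifyResumeOrEarliest(t *testing.T, subscribe smqSubscribeFn, committedTsNs, earliestTsNs int64) {
	got, err := subscribe("kafka-cg-existing", "user-events", "RESUME_OR_EARLIEST")
	if err != nil {
		t.Fatal(err)
	}
	if got != committedTsNs {
		t.Errorf("existing group: resumed at %d, want committed %d", got, committedTsNs)
	}

	got, err = subscribe("kafka-cg-new", "user-events", "RESUME_OR_EARLIEST")
	if err != nil {
		t.Fatal(err)
	}
	if got != earliestTsNs {
		t.Errorf("new group: started at %d, want earliest %d", got, earliestTsNs)
	}
}
```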
### Phase 2: Consumer Group Integration (2-3 weeks)

- Implement SMQ consumer group wrapper
- Map Kafka consumer lifecycle to SMQ
- Handle consumer group coordination
### Phase 3: Offset Translation (2-3 weeks)

- Lightweight cache for Kafka ↔ SMQ offset mapping
- Handle edge cases (rebalancing, etc.)
- Performance optimization
### Phase 4: Integration & Testing (3-4 weeks)

- Replace current offset management
- Comprehensive testing with long disconnections
- Performance benchmarking

**Total Estimated Effort: ~10 weeks**
## Current Status

### Completed

- Message offset persistence (Kafka offset → SMQ timestamp mapping)
- SMQ publishing integration with offset tracking
- SMQ subscription integration with offset mapping
- Comprehensive integration tests
- Architecture design for the SMQ native approach
### Next Steps

1. Assess SMQ consumer group APIs
2. Implement SMQ native consumer group wrapper
3. Replace current in-memory offset management
4. Test long-term disconnection scenarios
## Key Advantages
- **Leverage Existing Infrastructure**: Use SMQ's proven distributed systems
- **Reduced Complexity**: No custom persistence layer to maintain
- **Better Reliability**: Inherit SMQ's fault tolerance and replication
- **Operational Simplicity**: One system to monitor instead of two
- **Performance**: Native SMQ operations without translation overhead
- **Automatic Edge Cases**: SMQ handles network partitions, broker failures, etc.
## Recommendation
The **SMQ Native Offset Management** approach is strongly recommended over custom persistence because:

1. **Simpler Implementation**: ~10 weeks vs 15+ weeks for custom persistence
2. **Higher Reliability**: Leverages battle-tested SMQ infrastructure
3. **Better Performance**: Native operations without a dual-write penalty
4. **Lower Maintenance**: The SMQ team handles the distributed-systems complexity
5. **Operational Benefits**: A single system to monitor and maintain

This approach solves the critical offset persistence problem while minimizing implementation complexity and maximizing reliability.