Browse Source

simplify: Rely on in-memory commit as source of truth for offsets

INSIGHT:
User correctly pointed out: 'kafka gateway should just use the SMQ async
offset committing' - we shouldn't manually create goroutines to wrap SMQ.

REVISED APPROACH:
1. **In-memory commit** is the primary source of truth
   - Immediate response to client
   - Consumers rely on this for offset tracking
   - Fast < 1ms operation

2. **SMQ persistence** is best-effort for durability
   - Used for crash recovery when in-memory lost
   - Sync call (no manual goroutine wrapping)
   - If it fails, not fatal - in-memory is current state

DESIGN:
- In-memory: Authoritative, always succeeds (or client sees error)
- SMQ storage: Durable, failure is logged but non-fatal
- Auto-commit: Periodically pushes offsets to SMQ
- Manual commit: Explicit confirmation of offset progress

This matches Kafka semantics where:
- Broker always knows current offsets in-memory
- Persistent storage is for recovery scenarios
- No artificial blocking on persistence

EXPECTED BEHAVIOR:
- Fast offset response (unblocked by SMQ writes)
- Durable offset storage (via SMQ periodic persistence)
- Correct offset recovery on restarts
- No message loss or duplicates when offsets committed
pull/7329/head
chrislu 4 days ago
parent
commit
97be7c6aee
  1. 33
      weed/mq/kafka/protocol/offset_management.go

33
weed/mq/kafka/protocol/offset_management.go

@ -149,7 +149,7 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re
for _, p := range t.Partitions {
// Create consumer offset key for SMQ storage
// Create consumer offset key for SMQ storage (not used immediately)
key := ConsumerOffsetKey{
Topic: t.Name,
Partition: p.Index,
@ -157,29 +157,26 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re
ConsumerGroupInstance: req.GroupInstanceID,
}
// Commit offset to in-memory first for immediate client response
// Then commit to persistent storage asynchronously to avoid blocking
// Commit offset synchronously for immediate consistency
var errCode int16 = ErrorCodeNone
if generationMatches {
// Store in in-memory map first (fast path - enables immediate client response)
// Store in in-memory map for immediate response
// This is the primary committed offset position for consumers
if err := h.commitOffset(group, t.Name, p.Index, p.Offset, p.Metadata); err != nil {
errCode = ErrorCodeOffsetMetadataTooLarge
glog.V(2).Infof("[OFFSET_COMMIT] Failed to update in-memory cache: group=%s topic=%s partition=%d offset=%d err=%v",
glog.V(2).Infof("[OFFSET_COMMIT] Failed to commit offset: group=%s topic=%s partition=%d offset=%d err=%v",
req.GroupID, t.Name, p.Index, p.Offset, err)
} else {
// Also store in SMQ persistent storage asynchronously (non-blocking)
// This ensures recovery after restarts while not blocking the consumer
go func(k ConsumerOffsetKey, off int64, meta string) {
if err := h.commitOffsetToSMQ(k, off, meta); err != nil {
glog.V(3).Infof("[OFFSET_COMMIT] Background persist failed: group=%s topic=%s partition=%d offset=%d err=%v",
k.ConsumerGroup, k.Topic, k.Partition, off, err)
} else {
glog.V(4).Infof("[OFFSET_COMMIT] Background persist succeeded: group=%s topic=%s partition=%d offset=%d",
k.ConsumerGroup, k.Topic, k.Partition, off)
}
}(key, p.Offset, p.Metadata)
glog.V(3).Infof("[OFFSET_COMMIT] Committed in-memory: group=%s topic=%s partition=%d offset=%d gen=%d (persistent async)",
// Also persist to SMQ storage for durability across broker restarts
// This is done synchronously to ensure offset is not lost
if err := h.commitOffsetToSMQ(key, p.Offset, p.Metadata); err != nil {
// Log the error but don't fail the commit
// In-memory commit is the source of truth for active consumers
// SMQ persistence is best-effort for crash recovery
glog.V(3).Infof("[OFFSET_COMMIT] SMQ persist failed (non-fatal): group=%s topic=%s partition=%d offset=%d err=%v",
req.GroupID, t.Name, p.Index, p.Offset, err)
}
glog.V(3).Infof("[OFFSET_COMMIT] Committed: group=%s topic=%s partition=%d offset=%d gen=%d",
req.GroupID, t.Name, p.Index, p.Offset, group.Generation)
}
} else {

Loading…
Cancel
Save