@@ -163,35 +163,20 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re
 			// Store in in-memory map first (works for both mock and SMQ backends)
 			if err := h.commitOffset(group, t.Name, p.Index, p.Offset, p.Metadata); err != nil {
 				errCode = ErrorCodeOffsetMetadataTooLarge
 				glog.Warningf("[OFFSET_COMMIT] Failed to commit in-memory: group=%s topic=%s partition=%d offset=%d err=%v",
 					req.GroupID, t.Name, p.Index, p.Offset, err)
 			}
 
-			// CRITICAL: Store in SMQ persistent storage - MUST NOT FAIL SILENTLY
-			// If SMQ storage fails, we must notify the client, as offset won't survive rebalancing
+			// Also store in SMQ persistent storage if available
 			if err := h.commitOffsetToSMQ(key, p.Offset, p.Metadata); err != nil {
-				// Only suppress error for mock backends where SMQ handler is not available
-				// For real deployments, this is a critical error that must be reported
-				if errCode == ErrorCodeNone {
-					errCode = 45 // OutOfOrderSequenceException (closest to "storage unavailable")
-					glog.Errorf("[OFFSET_COMMIT] CRITICAL: Failed to persist offset to SMQ: group=%s topic=%s partition=%d offset=%d err=%v",
-						req.GroupID, t.Name, p.Index, p.Offset, err)
-					glog.Errorf("[OFFSET_COMMIT] WARNING: This offset will NOT survive rebalancing and may cause message loss!")
-				}
-			} else {
-				// Successfully persisted
-				glog.V(3).Infof("[OFFSET_COMMIT] Persisted to SMQ: group=%s topic=%s partition=%d offset=%d",
-					req.GroupID, t.Name, p.Index, p.Offset)
+				// SMQ storage may not be available (e.g., in mock mode) - that's okay
+				glog.V(4).Infof("[OFFSET_COMMIT] SMQ storage not available: %v", err)
 			}
 
 			if errCode == ErrorCodeNone {
 				if groupIsEmpty {
 					glog.V(3).Infof("[OFFSET_COMMIT] Committed (empty group): group=%s topic=%s partition=%d offset=%d",
 						req.GroupID, t.Name, p.Index, p.Offset)
 				} else {
 					glog.V(3).Infof("[OFFSET_COMMIT] Committed: group=%s topic=%s partition=%d offset=%d gen=%d",
 						req.GroupID, t.Name, p.Index, p.Offset, group.Generation)
 				}
 			} else {
 				// Do not store commit if generation mismatch
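
Note: after this hunk, the in-memory commit alone decides the error code returned to the client, and the SMQ write becomes best-effort (a failure is only logged at V(4)). The following is a self-contained sketch of that dual-write pattern, assuming hypothetical names; it is not the SeaweedFS handler code.

package main

import "log"

// commitDual illustrates the pattern from the hunk above: the primary
// (in-memory) commit decides the error surfaced to the caller, while the
// secondary (persistent) commit is best-effort and only logged on failure.
// All names here are illustrative, not the actual handler API.
func commitDual(primary, secondary func(offset int64) error, offset int64) error {
	if err := primary(offset); err != nil {
		return err // surfaced to the Kafka client as an error code
	}
	if err := secondary(offset); err != nil {
		log.Printf("persistent store unavailable, offset kept in memory only: %v", err)
	}
	return nil
}

func main() {
	inMemory := map[string]int64{}
	primary := func(off int64) error { inMemory["group/topic/0"] = off; return nil }
	secondary := func(off int64) error { return nil } // e.g. a persistent write that may fail in mock mode
	if err := commitDual(primary, secondary, 42); err != nil {
		log.Fatal(err)
	}
	log.Printf("committed offset %d in memory", inMemory["group/topic/0"])
}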
@@ -212,36 +197,37 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re
 	return h.buildOffsetCommitResponse(resp, apiVersion), nil
 }
 
-func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16,
-	requestBody []byte) ([]byte, error) {
-	req, err := h.parseOffsetFetchRequest(requestBody)
+func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
+	// Parse OffsetFetch request
+	request, err := h.parseOffsetFetchRequest(requestBody)
 	if err != nil {
 		return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
 	}
 
-	glog.V(2).Infof("[OFFSET_FETCH] group=%s topics=%d", req.GroupID, len(req.Topics))
-	for _, t := range req.Topics {
-		for _, p := range t.Partitions {
-			glog.V(2).Infof("[OFFSET_FETCH] topic=%s partition=%d", t.Name, p)
-		}
-	}
+	// Validate request
+	if request.GroupID == "" {
+		return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
+	}
 
-	// Get the consumer group
-	group := h.groupCoordinator.GetOrCreateGroup(req.GroupID)
+	// Get or create consumer group
+	// IMPORTANT: Use GetOrCreateGroup (not GetGroup) to allow fetching persisted offsets
+	// even if the group doesn't exist in memory yet. This is critical for consumer restarts.
+	// Kafka allows offset fetches for groups that haven't joined yet (e.g., simple consumers).
+	group := h.groupCoordinator.GetOrCreateGroup(request.GroupID)
 
 	group.Mu.RLock()
 	defer group.Mu.RUnlock()
 
-	glog.V(4).Infof("[OFFSET_FETCH] Request: group=%s topics=%d", req.GroupID, len(req.Topics))
+	glog.V(4).Infof("[OFFSET_FETCH] Request: group=%s topics=%d", request.GroupID, len(request.Topics))
 
 	// Build response
 	response := OffsetFetchResponse{
 		CorrelationID: correlationID,
-		Topics:        make([]OffsetFetchTopicResponse, 0, len(req.Topics)),
+		Topics:        make([]OffsetFetchTopicResponse, 0, len(request.Topics)),
 		ErrorCode:     ErrorCodeNone,
 	}
 
-	for _, topic := range req.Topics {
+	for _, topic := range request.Topics {
 		topicResponse := OffsetFetchTopicResponse{
 			Name:       topic.Name,
 			Partitions: make([]OffsetFetchPartitionResponse, 0),
@@ -268,41 +254,25 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16,
 			if off, meta, err := h.fetchOffset(group, topic.Name, partition); err == nil && off >= 0 {
 				fetchedOffset = off
 				metadata = meta
-				glog.V(2).Infof("[OFFSET_FETCH] Found in memory: group=%s topic=%s partition=%d offset=%d",
-					req.GroupID, topic.Name, partition, off)
+				glog.V(4).Infof("[OFFSET_FETCH] Found in memory: group=%s topic=%s partition=%d offset=%d",
+					request.GroupID, topic.Name, partition, off)
 			} else {
 				// Fallback: try fetching from SMQ persistent storage
 				// This handles cases where offsets are stored in SMQ but not yet loaded into memory
 				key := ConsumerOffsetKey{
 					Topic:                 topic.Name,
 					Partition:             partition,
-					ConsumerGroup:         req.GroupID,
-					ConsumerGroupInstance: req.GroupInstanceID,
-				}
-
-				// CRITICAL FIX: Retry offset fetch from SMQ with backoff
-				// During rebalancing, there can be a brief race where the offset was committed
-				// but not yet persisted to SMQ. A single retry with small delay fixes this.
-				var smqErr error
-				for attempt := 0; attempt < 3; attempt++ {
-					if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 {
-						fetchedOffset = off
-						metadata = meta
-						glog.V(2).Infof("[OFFSET_FETCH] Found in storage (attempt %d): group=%s topic=%s partition=%d offset=%d",
-							attempt+1, req.GroupID, topic.Name, partition, off)
-						break
-					} else {
-						smqErr = err
-						if attempt < 2 {
-							// Brief backoff before retry: 5ms * attempt
-							time.Sleep(time.Duration(5*(attempt+1)) * time.Millisecond)
-						}
-					}
-				}
+					ConsumerGroup:         request.GroupID,
+					ConsumerGroupInstance: request.GroupInstanceID,
+				}
 
-				if fetchedOffset < 0 && smqErr != nil {
-					glog.V(2).Infof("[OFFSET_FETCH] No offset found after retries: group=%s topic=%s partition=%d err=%v (will start from auto.offset.reset)",
-						req.GroupID, topic.Name, partition, smqErr)
-				}
+				if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 {
+					fetchedOffset = off
+					metadata = meta
+					glog.V(4).Infof("[OFFSET_FETCH] Found in storage: group=%s topic=%s partition=%d offset=%d",
+						request.GroupID, topic.Name, partition, off)
+				} else {
+					glog.V(4).Infof("[OFFSET_FETCH] No offset found: group=%s topic=%s partition=%d (will start from auto.offset.reset)",
+						request.GroupID, topic.Name, partition)
+				}
 				// No offset found in either location (-1 indicates no committed offset)
 			}
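
Note: the rewritten fetch path is a two-level lookup: the in-memory group state is consulted first, then SMQ persistent storage, and -1 is returned when neither has a committed offset so the consumer falls back to its auto.offset.reset policy. Below is a minimal self-contained sketch of that lookup order, with illustrative names only; it is not the actual handler API.

package main

import "fmt"

// fetchCommitted mirrors the lookup order in the hunk above: in-memory state
// first, then the persistent store; -1 means "no committed offset" and the
// consumer falls back to its auto.offset.reset policy.
func fetchCommitted(memory, persistent func() (int64, error)) int64 {
	if off, err := memory(); err == nil && off >= 0 {
		return off
	}
	if off, err := persistent(); err == nil && off >= 0 {
		return off
	}
	return -1
}

func main() {
	memory := func() (int64, error) { return -1, fmt.Errorf("not in memory") }
	persistent := func() (int64, error) { return 100, nil } // e.g. a read from persistent offset storage
	fmt.Println(fetchCommitted(memory, persistent)) // prints 100
}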