|
|
|
@ -198,10 +198,6 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re |
|
|
|
} |
|
|
|
|
|
|
|
func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { |
|
|
|
glog.V(0).Infof("═══════════════════════════════════════════════════════════════") |
|
|
|
glog.V(0).Infof(" 🔍 OFFSET_FETCH API CALLED (ApiKey 9)") |
|
|
|
glog.V(0).Infof("═══════════════════════════════════════════════════════════════") |
|
|
|
|
|
|
|
// Parse OffsetFetch request
|
|
|
|
request, err := h.parseOffsetFetchRequest(requestBody) |
|
|
|
if err != nil { |
|
|
|
@ -222,7 +218,7 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req |
|
|
|
group.Mu.RLock() |
|
|
|
defer group.Mu.RUnlock() |
|
|
|
|
|
|
|
glog.V(0).Infof("[OFFSET_FETCH] Request: group=%s topics=%d", request.GroupID, len(request.Topics)) |
|
|
|
glog.V(2).Infof("[OFFSET_FETCH] Request: group=%s topics=%d", request.GroupID, len(request.Topics)) |
|
|
|
|
|
|
|
// Build response
|
|
|
|
response := OffsetFetchResponse{ |
|
|
|
@ -258,7 +254,7 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req |
|
|
|
if off, meta, err := h.fetchOffset(group, topic.Name, partition); err == nil && off >= 0 { |
|
|
|
fetchedOffset = off |
|
|
|
metadata = meta |
|
|
|
glog.V(0).Infof("[OFFSET_FETCH] ✓ Found in memory: group=%s topic=%s partition=%d offset=%d", |
|
|
|
glog.V(2).Infof("[OFFSET_FETCH] Found in memory: group=%s topic=%s partition=%d offset=%d", |
|
|
|
request.GroupID, topic.Name, partition, off) |
|
|
|
} else { |
|
|
|
// Fallback: try fetching from SMQ persistent storage
|
|
|
|
@ -272,10 +268,10 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req |
|
|
|
if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 { |
|
|
|
fetchedOffset = off |
|
|
|
metadata = meta |
|
|
|
glog.V(0).Infof("[OFFSET_FETCH] ✓ Found in storage: group=%s topic=%s partition=%d offset=%d", |
|
|
|
glog.V(2).Infof("[OFFSET_FETCH] Found in storage: group=%s topic=%s partition=%d offset=%d", |
|
|
|
request.GroupID, topic.Name, partition, off) |
|
|
|
} else { |
|
|
|
glog.V(0).Infof("[OFFSET_FETCH] ✗ No offset found: group=%s topic=%s partition=%d (will start from auto.offset.reset)", |
|
|
|
glog.V(2).Infof("[OFFSET_FETCH] No offset found: group=%s topic=%s partition=%d (will start from auto.offset.reset)", |
|
|
|
request.GroupID, topic.Name, partition) |
|
|
|
} |
|
|
|
// No offset found in either location (-1 indicates no committed offset)
|
|
|
|
|