diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index a06dcea8d..4b0abe7a0 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -306,7 +306,24 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok // Update session state after successful seek session.mu.Lock() session.StartOffset = requestedOffset - session.consumedRecords = nil // Clear cache after seek + + // CRITICAL: Only clear cache if seeking forward past cached data + // For backward seeks, keep cache to avoid re-reading same data from broker + shouldClearCache := true + if len(session.consumedRecords) > 0 { + cacheStartOffset := session.consumedRecords[0].Offset + cacheEndOffset := session.consumedRecords[len(session.consumedRecords)-1].Offset + // Keep cache if seeking to an offset within or before cached range + if requestedOffset <= cacheEndOffset { + shouldClearCache = false + glog.V(2).Infof("[FETCH] Keeping cache after seek to %d (cache: [%d-%d])", + requestedOffset, cacheStartOffset, cacheEndOffset) + } + } + if shouldClearCache { + session.consumedRecords = nil + glog.V(2).Infof("[FETCH] Cleared cache after forward seek to %d", requestedOffset) + } session.mu.Unlock() glog.V(2).Infof("[FETCH] Seek to offset %d successful", requestedOffset) @@ -751,7 +768,17 @@ func (session *BrokerSubscriberSession) SeekToOffset(offset int64) error { session.mu.Lock() session.StartOffset = offset - session.consumedRecords = nil + // Only clear cache if seeking forward past cached data + shouldClearCache := true + if len(session.consumedRecords) > 0 { + cacheEndOffset := session.consumedRecords[len(session.consumedRecords)-1].Offset + if offset <= cacheEndOffset { + shouldClearCache = false + } + } + if shouldClearCache { + session.consumedRecords = nil + } session.mu.Unlock() glog.V(2).Infof("[SEEK] Seeked to offset %d for %s[%d]", offset, session.Topic, session.Partition)