From 92a7e423683c6400419c18ca2e473c2340621fb1 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 14 Oct 2025 00:54:15 -0700 Subject: [PATCH] atomic currentStartOffset --- weed/mq/kafka/integration/broker_client_subscribe.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index be9d92ecc..d3a453224 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -298,13 +298,19 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok // // The session will naturally advance as records are consumed, so we should NOT // recreate it just because requestedOffset != session.StartOffset + + // CRITICAL: Re-check the offset under session lock to prevent race conditions + // Another thread might be reading from this session right now and advancing the offset + session.mu.Lock() + currentStartOffset := session.StartOffset + session.mu.Unlock() - if requestedOffset < session.StartOffset { + if requestedOffset < currentStartOffset { // Need to seek backward - close old session and create a fresh subscriber // Restarting an existing stream doesn't work reliably because the broker may still // have old data buffered in the stream pipeline glog.V(0).Infof("[FETCH] Seeking backward: requested=%d < session=%d, creating fresh subscriber", - requestedOffset, session.StartOffset) + requestedOffset, currentStartOffset) // Extract session details before unlocking topic := session.Topic