From fe9e0161d538710e345250b43c02dc5cd8ae03a2 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 14 Oct 2025 07:47:18 -0700 Subject: [PATCH] fmt --- weed/mq/kafka/integration/broker_client_subscribe.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index d3a453224..0600bdd5d 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -298,7 +298,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok // // The session will naturally advance as records are consumed, so we should NOT // recreate it just because requestedOffset != session.StartOffset - + // CRITICAL: Re-check the offset under session lock to prevent race conditions // Another thread might be reading from this session right now and advancing the offset session.mu.Lock()