diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index d3a453224..0600bdd5d 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -298,7 +298,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok // // The session will naturally advance as records are consumed, so we should NOT // recreate it just because requestedOffset != session.StartOffset - + // CRITICAL: Re-check the offset under session lock to prevent race conditions // Another thread might be reading from this session right now and advancing the offset session.mu.Lock()