From 63b3a105351acc59414022187a42caaffe1c48ef Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 13 Oct 2025 23:49:52 -0700 Subject: [PATCH] comment --- weed/mq/kafka/integration/broker_client_subscribe.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index 35bfdacdb..7e2bfa173 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -344,9 +344,11 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok key := session.Key() session.mu.Unlock() - // Close the old session completely + // CRITICAL FIX: Acquire the global lock FIRST, then re-check the session offset + // This prevents multiple threads from all deciding to recreate based on stale data bc.subscribersLock.Lock() - // CRITICAL: Double-check if another thread already recreated the session at the desired offset + + // Double-check if another thread already recreated the session at the desired offset // This prevents multiple concurrent threads from all trying to recreate the same session if existingSession, exists := bc.subscribers[key]; exists { existingSession.mu.Lock()