diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index 35bfdacdb..7e2bfa173 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -344,9 +344,11 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok key := session.Key() session.mu.Unlock() - // Close the old session completely + // CRITICAL FIX: Acquire the global lock FIRST, then re-check the session offset + // This prevents multiple threads from all deciding to recreate based on stale data bc.subscribersLock.Lock() - // CRITICAL: Double-check if another thread already recreated the session at the desired offset + + // Double-check if another thread already recreated the session at the desired offset // This prevents multiple concurrent threads from all trying to recreate the same session if existingSession, exists := bc.subscribers[key]; exists { existingSession.mu.Lock()