From 9eae9e1fedea97e9bef4a6412b75d39b950bffc5 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 14 Oct 2025 11:02:42 -0700 Subject: [PATCH] unlock --- weed/mq/kafka/integration/broker_client_subscribe.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index 6a6800a70..828225e74 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -311,13 +311,12 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok glog.V(0).Infof("[FETCH] Seeking backward: requested=%d < session=%d, creating fresh subscriber", requestedOffset, currentStartOffset) - // Extract session details before unlocking + // Extract session details (note: session.mu was already unlocked at line 294) topic := session.Topic partition := session.Partition consumerGroup := session.ConsumerGroup consumerID := session.ConsumerID key := session.Key() - session.mu.Unlock() // CRITICAL FIX: Acquire the global lock FIRST, then re-check the session offset // This prevents multiple threads from all deciding to recreate based on stale data