diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index 6a6800a70..828225e74 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -311,13 +311,12 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok glog.V(0).Infof("[FETCH] Seeking backward: requested=%d < session=%d, creating fresh subscriber", requestedOffset, currentStartOffset) - // Extract session details before unlocking + // Extract session details (note: session.mu was already unlocked at line 294) topic := session.Topic partition := session.Partition consumerGroup := session.ConsumerGroup consumerID := session.ConsumerID key := session.Key() - session.mu.Unlock() // CRITICAL FIX: Acquire the global lock FIRST, then re-check the session offset // This prevents multiple threads from all deciding to recreate based on stale data