From 233ade4187205f957cc4079ef090f6e784f19c82 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 13 Oct 2025 23:03:02 -0700 Subject: [PATCH] fix race condition --- .../integration/broker_client_subscribe.go | 34 +++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index 09015ec60..d1cdefb5d 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -151,7 +151,13 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta } delete(bc.subscribers, key) } - bc.subscribersLock.Unlock() + // CRITICAL FIX: Don't unlock here! Keep the write lock to prevent race condition + // where another thread creates a session at the wrong offset between our delete and create + // Fall through to session creation below while holding the lock + // bc.subscribersLock.Unlock() - REMOVED to fix race condition + + // Write lock is already held - skip to session creation + goto createSession } else { // Exact match - reuse bc.subscribersLock.RUnlock() @@ -162,11 +168,35 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta } // Create new subscriber stream + // Need to acquire write lock since we don't have it from the paths above bc.subscribersLock.Lock() + +createSession: defer bc.subscribersLock.Unlock() + // CRITICAL FIX: Double-check if session exists AND verify it's at the right offset + // This can happen if another thread created a session while we were acquiring the lock + // (only possible in the non-recreation path where we released the read lock) if session, exists := bc.subscribers[key]; exists { - return session, nil + session.mu.Lock() + existingOffset := session.StartOffset + session.mu.Unlock() + + // Only reuse if the session is at or before the requested offset + if existingOffset <= startOffset { + glog.V(1).Infof("[FETCH] Session already exists at offset %d (requested %d), reusing", existingOffset, startOffset) + return session, nil + } + + // Session is at wrong offset - must recreate + glog.V(0).Infof("[FETCH] Session exists at wrong offset %d (requested %d), recreating", existingOffset, startOffset) + if session.Stream != nil { + _ = session.Stream.CloseSend() + } + if session.Cancel != nil { + session.Cancel() + } + delete(bc.subscribers, key) } // CRITICAL FIX: Use background context for subscriber to prevent premature cancellation