|
|
@ -344,9 +344,11 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok |
|
|
|
key := session.Key() |
|
|
|
session.mu.Unlock() |
|
|
|
|
|
|
|
// Close the old session completely
|
|
|
|
// CRITICAL FIX: Acquire the global lock FIRST, then re-check the session offset
|
|
|
|
// This prevents multiple threads from all deciding to recreate based on stale data
|
|
|
|
bc.subscribersLock.Lock() |
|
|
|
// CRITICAL: Double-check if another thread already recreated the session at the desired offset
|
|
|
|
|
|
|
|
// Double-check if another thread already recreated the session at the desired offset
|
|
|
|
// This prevents multiple concurrent threads from all trying to recreate the same session
|
|
|
|
if existingSession, exists := bc.subscribers[key]; exists { |
|
|
|
existingSession.mu.Lock() |
|
|
|