|
|
|
@ -311,13 +311,12 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok |
|
|
|
glog.V(0).Infof("[FETCH] Seeking backward: requested=%d < session=%d, creating fresh subscriber", |
|
|
|
requestedOffset, currentStartOffset) |
|
|
|
|
|
|
|
// Extract session details before unlocking
|
|
|
|
// Extract session details (note: session.mu was already unlocked at line 294)
|
|
|
|
topic := session.Topic |
|
|
|
partition := session.Partition |
|
|
|
consumerGroup := session.ConsumerGroup |
|
|
|
consumerID := session.ConsumerID |
|
|
|
key := session.Key() |
|
|
|
session.mu.Unlock() |
|
|
|
|
|
|
|
// CRITICAL FIX: Acquire the global lock FIRST, then re-check the session offset
|
|
|
|
// This prevents multiple threads from all deciding to recreate based on stale data
|
|
|
|
|