Browse Source

Simplified GetOrCreateSubscriber to always reuse existing sessions

pull/7329/head
chrislu 1 week ago
parent
commit
e2c6f47cf6
  1. 79
      weed/mq/kafka/integration/broker_client_subscribe.go

79
weed/mq/kafka/integration/broker_client_subscribe.go

@ -95,88 +95,49 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta
bc.subscribersLock.RLock()
if session, exists := bc.subscribers[key]; exists {
// CRITICAL: Only recreate if we need to seek BACKWARD
// Forward reads (startOffset >= session.StartOffset) can use the existing session
// Check if we can reuse the existing session
session.mu.Lock()
currentOffset := session.StartOffset
session.mu.Unlock()
if startOffset >= currentOffset {
// Can read forward from existing session, or already at requested offset
bc.subscribersLock.RUnlock()
glog.V(2).Infof("[FETCH] Reusing existing session for %s: session at %d, requested %d (can read forward)",
key, currentOffset, startOffset)
return session, nil
}
// startOffset < currentOffset: need to seek backward
// Check cache first before recreating
session.mu.Lock()
// Check cache to see what offsets are available
canUseCache := false
if len(session.consumedRecords) > 0 {
cacheStartOffset := session.consumedRecords[0].Offset
cacheEndOffset := session.consumedRecords[len(session.consumedRecords)-1].Offset
if startOffset >= cacheStartOffset && startOffset <= cacheEndOffset {
canUseCache = true
glog.V(2).Infof("[FETCH] Session for %s at offset %d, requested %d (backward seek), but offset is in cache [%d-%d]",
key, currentOffset, startOffset, cacheStartOffset, cacheEndOffset)
}
}
session.mu.Unlock()
if canUseCache {
// Offset is in cache, reuse session
// Decision logic:
// 1. Forward read (startOffset >= currentOffset): Always reuse - ReadRecordsFromOffset will handle it
// 2. Backward read with cache hit: Reuse - ReadRecordsFromOffset will serve from cache
// 3. Backward read without cache: Reuse if gap is small, let ReadRecordsFromOffset handle recreation
// This prevents GetOrCreateSubscriber from constantly recreating sessions for small offsets
if startOffset >= currentOffset || canUseCache {
// Can read forward OR offset is in cache - reuse session
bc.subscribersLock.RUnlock()
glog.V(2).Infof("[FETCH] Reusing existing session for %s: session at %d, requested %d (forward or cached)",
key, currentOffset, startOffset)
return session, nil
}
// Not in cache - need to recreate session at the requested offset
glog.V(0).Infof("[FETCH] Recreating session for %s: session at %d, requested %d (backward seek, not in cache)",
key, currentOffset, startOffset)
bc.subscribersLock.RUnlock()
// Close and delete the old session
bc.subscribersLock.Lock()
// CRITICAL: Double-check if another thread already recreated the session at the desired offset
// This prevents multiple concurrent threads from all trying to recreate the same session
if existingSession, exists := bc.subscribers[key]; exists {
existingSession.mu.Lock()
existingOffset := existingSession.StartOffset
existingSession.mu.Unlock()
// Check if the session was already recreated at (or before) the requested offset
if existingOffset <= startOffset {
bc.subscribersLock.Unlock()
glog.V(1).Infof("[FETCH] Session already recreated by another thread at offset %d (requested %d)", existingOffset, startOffset)
// Re-acquire the existing session and continue
return existingSession, nil
}
// Session still needs recreation - close it
if existingSession.Stream != nil {
_ = existingSession.Stream.CloseSend()
}
if existingSession.Cancel != nil {
existingSession.Cancel()
}
delete(bc.subscribers, key)
}
// CRITICAL FIX: Don't unlock here! Keep the write lock to prevent race condition
// where another thread creates a session at the wrong offset between our delete and create
// Fall through to session creation below while holding the lock
// bc.subscribersLock.Unlock() - REMOVED to fix race condition
// Write lock is already held - skip to session creation
goto createSession
} else {
// Backward seek, not in cache
// Let ReadRecordsFromOffset handle the recreation decision based on the actual read context
bc.subscribersLock.RUnlock()
glog.V(2).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (will handle in ReadRecordsFromOffset)",
key, currentOffset, startOffset)
return session, nil
}
// Session doesn't exist - need to create one
bc.subscribersLock.RUnlock()
// Create new subscriber stream
// Need to acquire write lock since we don't have it from the paths above
bc.subscribersLock.Lock()
createSession:
defer bc.subscribersLock.Unlock()
// CRITICAL FIX: Double-check if session exists AND verify it's at the right offset

Loading…
Cancel
Save