diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index 6d2c3aba5..be9d92ecc 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -95,88 +95,49 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta bc.subscribersLock.RLock() if session, exists := bc.subscribers[key]; exists { - // CRITICAL: Only recreate if we need to seek BACKWARD - // Forward reads (startOffset >= session.StartOffset) can use the existing session + // Check if we can reuse the existing session session.mu.Lock() currentOffset := session.StartOffset - session.mu.Unlock() - - if startOffset >= currentOffset { - // Can read forward from existing session, or already at requested offset - bc.subscribersLock.RUnlock() - glog.V(2).Infof("[FETCH] Reusing existing session for %s: session at %d, requested %d (can read forward)", - key, currentOffset, startOffset) - return session, nil - } - // startOffset < currentOffset: need to seek backward - // Check cache first before recreating - session.mu.Lock() + // Check cache to see what offsets are available canUseCache := false if len(session.consumedRecords) > 0 { cacheStartOffset := session.consumedRecords[0].Offset cacheEndOffset := session.consumedRecords[len(session.consumedRecords)-1].Offset if startOffset >= cacheStartOffset && startOffset <= cacheEndOffset { canUseCache = true - glog.V(2).Infof("[FETCH] Session for %s at offset %d, requested %d (backward seek), but offset is in cache [%d-%d]", - key, currentOffset, startOffset, cacheStartOffset, cacheEndOffset) } } session.mu.Unlock() - if canUseCache { - // Offset is in cache, reuse session + // Decision logic: + // 1. Forward read (startOffset >= currentOffset): Always reuse - ReadRecordsFromOffset will handle it + // 2. Backward read with cache hit: Reuse - ReadRecordsFromOffset will serve from cache + // 3. Backward read without cache: Reuse if gap is small, let ReadRecordsFromOffset handle recreation + // This prevents GetOrCreateSubscriber from constantly recreating sessions for small offsets + + if startOffset >= currentOffset || canUseCache { + // Can read forward OR offset is in cache - reuse session bc.subscribersLock.RUnlock() + glog.V(2).Infof("[FETCH] Reusing existing session for %s: session at %d, requested %d (forward or cached)", + key, currentOffset, startOffset) return session, nil } - // Not in cache - need to recreate session at the requested offset - glog.V(0).Infof("[FETCH] Recreating session for %s: session at %d, requested %d (backward seek, not in cache)", - key, currentOffset, startOffset) - bc.subscribersLock.RUnlock() - - // Close and delete the old session - bc.subscribersLock.Lock() - // CRITICAL: Double-check if another thread already recreated the session at the desired offset - // This prevents multiple concurrent threads from all trying to recreate the same session - if existingSession, exists := bc.subscribers[key]; exists { - existingSession.mu.Lock() - existingOffset := existingSession.StartOffset - existingSession.mu.Unlock() - - // Check if the session was already recreated at (or before) the requested offset - if existingOffset <= startOffset { - bc.subscribersLock.Unlock() - glog.V(1).Infof("[FETCH] Session already recreated by another thread at offset %d (requested %d)", existingOffset, startOffset) - // Re-acquire the existing session and continue - return existingSession, nil - } - - // Session still needs recreation - close it - if existingSession.Stream != nil { - _ = existingSession.Stream.CloseSend() - } - if existingSession.Cancel != nil { - existingSession.Cancel() - } - delete(bc.subscribers, key) - } - // CRITICAL FIX: Don't unlock here! Keep the write lock to prevent race condition - // where another thread creates a session at the wrong offset between our delete and create - // Fall through to session creation below while holding the lock - // bc.subscribersLock.Unlock() - REMOVED to fix race condition - - // Write lock is already held - skip to session creation - goto createSession - } else { + // Backward seek, not in cache + // Let ReadRecordsFromOffset handle the recreation decision based on the actual read context bc.subscribersLock.RUnlock() + glog.V(2).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (will handle in ReadRecordsFromOffset)", + key, currentOffset, startOffset) + return session, nil } + // Session doesn't exist - need to create one + bc.subscribersLock.RUnlock() + // Create new subscriber stream // Need to acquire write lock since we don't have it from the paths above bc.subscribersLock.Lock() - -createSession: defer bc.subscribersLock.Unlock() // CRITICAL FIX: Double-check if session exists AND verify it's at the right offset