diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index 3d8045430..6a6800a70 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -287,7 +287,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok session.Key(), requestedOffset, cacheStartOffset, cacheEndOffset) } } - + // CRITICAL: Get the current offset atomically before making recreation decision // We need to unlock first (lock acquired at line 257) then re-acquire for atomic read currentStartOffset := session.StartOffset @@ -435,8 +435,8 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok // - Exact match (requestedOffset == session.StartOffset) // - Reading ahead (requestedOffset > session.StartOffset, e.g., from cache) glog.V(2).Infof("[FETCH] Using persistent session: requested=%d session=%d (persistent connection)", - requestedOffset, session.StartOffset) - session.mu.Unlock() + requestedOffset, currentStartOffset) + // Note: session.mu was already unlocked at line 294 after reading currentStartOffset return bc.ReadRecords(ctx, session, maxRecords) }