From 73ebc69a824fe0abd9fbe36ebb79d76f811e3188 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 14 Oct 2025 07:51:06 -0700 Subject: [PATCH] avoid deadlock --- weed/mq/kafka/integration/broker_client_subscribe.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index 0600bdd5d..3d8045430 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -287,6 +287,11 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok session.Key(), requestedOffset, cacheStartOffset, cacheEndOffset) } } + + // CRITICAL: Get the current offset atomically before making recreation decision + // We need to unlock first (lock acquired at line 257) then re-acquire for atomic read + currentStartOffset := session.StartOffset + session.mu.Unlock() // CRITICAL FIX for Schema Registry: Keep subscriber alive across multiple fetch requests // Schema Registry expects to make multiple poll() calls on the same consumer connection @@ -299,12 +304,6 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok // The session will naturally advance as records are consumed, so we should NOT // recreate it just because requestedOffset != session.StartOffset - // CRITICAL: Re-check the offset under session lock to prevent race conditions - // Another thread might be reading from this session right now and advancing the offset - session.mu.Lock() - currentStartOffset := session.StartOffset - session.mu.Unlock() - if requestedOffset < currentStartOffset { // Need to seek backward - close old session and create a fresh subscriber // Restarting an existing stream doesn't work reliably because the broker may still