diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index a0b8504bf..e3ee6954b 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -442,6 +442,15 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib // Try to receive first record go func() { + // Check if stream is nil (can happen during session recreation race condition) + if session.Stream == nil { + select { + case recvChan <- recvResult{resp: nil, err: fmt.Errorf("stream is nil")}: + case <-ctx.Done(): + // Context cancelled, don't send (avoid blocking) + } + return + } resp, err := session.Stream.Recv() select { case recvChan <- recvResult{resp: resp, err: err}: @@ -501,6 +510,15 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib recvChan2 := make(chan recvResult, 1) go func() { + // Check if stream is nil (can happen during session recreation race condition) + if session.Stream == nil { + select { + case recvChan2 <- recvResult{resp: nil, err: fmt.Errorf("stream is nil")}: + case <-ctx2.Done(): + // Context cancelled + } + return + } resp, err := session.Stream.Recv() select { case recvChan2 <- recvResult{resp: resp, err: err}: