@@ -38,8 +38,8 @@ func createSubscribeInitMessage(topic string, actualPartition *schema_pb.Partiti
 // This ensures each fetch gets fresh data from the requested offset
 // consumerGroup and consumerID are passed from Kafka client for proper tracking in SMQ
 func (bc *BrokerClient) CreateFreshSubscriber(topic string, partition int32, startOffset int64, consumerGroup string, consumerID string) (*BrokerSubscriberSession, error) {
-    // Create a dedicated context for this subscriber
-    subscriberCtx := context.Background()
+    // Use BrokerClient's context so subscriber is cancelled when connection closes
+    subscriberCtx, subscriberCancel := context.WithCancel(bc.ctx)

     stream, err := bc.client.SubscribeMessage(subscriberCtx)
     if err != nil {
@@ -78,6 +78,8 @@ func (bc *BrokerClient) CreateFreshSubscriber(topic string, partition int32, sta
         StartOffset:   startOffset,
         ConsumerGroup: consumerGroup,
         ConsumerID:    consumerID,
+        Ctx:           subscriberCtx,
+        Cancel:        subscriberCancel,
     }

     return session, nil
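Taken together, the two hunks above tie each subscriber stream to the lifetime of its `BrokerClient`: the stream context now derives from `bc.ctx`, and the cancel function is stored on the session so later teardown can reach it. Below is a minimal sketch of the assumed lifetime chain; the constructor and `Close` method are hypothetical illustrations, not part of this diff:

```go
package integration // hypothetical package name

import "context"

type BrokerClient struct {
    ctx    context.Context    // root context shared by every subscriber stream
    cancel context.CancelFunc // invoked when the client shuts down
}

// NewBrokerClient shows where bc.ctx would come from (illustrative only).
func NewBrokerClient() *BrokerClient {
    ctx, cancel := context.WithCancel(context.Background())
    return &BrokerClient{ctx: ctx, cancel: cancel}
}

// Close cancels bc.ctx; every subscriberCtx created via
// context.WithCancel(bc.ctx) is cancelled transitively, so no
// subscriber stream can outlive the connection.
func (bc *BrokerClient) Close() {
    bc.cancel()
}
```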
@@ -157,21 +159,21 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta
         // Session is at wrong offset - must recreate
         glog.V(2).Infof("[FETCH] Session exists at wrong offset %d (requested %d), recreating", existingOffset, startOffset)

+        // CRITICAL: Hold session lock while cancelling to prevent race with active Recv() calls
+        session.mu.Lock()
         if session.Stream != nil {
             _ = session.Stream.CloseSend()
         }
         if session.Cancel != nil {
             session.Cancel()
         }
+        session.mu.Unlock()
         delete(bc.subscribers, key)
     }

-    // CRITICAL FIX: Use background context for subscriber to prevent premature cancellation
-    // Subscribers need to continue reading data even when the connection is closing,
-    // otherwise Schema Registry and other clients can't read existing data.
-    // The subscriber will be cleaned up when the stream is explicitly closed.
-    subscriberCtx := context.Background()
-    subscriberCancel := func() {} // No-op cancel
+    // Use BrokerClient's context so subscribers are automatically cancelled when connection closes
+    // This ensures proper cleanup without artificial timeouts
+    subscriberCtx, subscriberCancel := context.WithCancel(bc.ctx)

     stream, err := bc.client.SubscribeMessage(subscriberCtx)
     if err != nil {
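The recreate path above always tears a session down in the same order: take the session lock so no reader is mid-`Recv()`, half-close the stream, cancel its derived context, then drop the entry from the map. Condensed into a hypothetical helper (not present in the diff) for reference:

```go
// closeSessionLocked distills the teardown sequence repeated in this file.
// The caller must already hold bc.subscribersLock.
func (bc *BrokerClient) closeSessionLocked(key string, s *BrokerSubscriberSession) {
    s.mu.Lock()
    if s.Stream != nil {
        _ = s.Stream.CloseSend() // graceful half-close toward the broker
    }
    if s.Cancel != nil {
        s.Cancel() // cancels the context derived from bc.ctx
    }
    s.mu.Unlock()
    delete(bc.subscribers, key)
}
```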
@@ -287,12 +289,21 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
     // The session will naturally advance as records are consumed, so we should NOT
     // recreate it just because requestedOffset != session.StartOffset

-    if requestedOffset < currentStartOffset {
-        // Need to seek backward - close old session and create a fresh subscriber
+    // OPTIMIZATION: Only recreate for EXTREMELY LARGE backward seeks (>1000000 offsets back)
+    // Most backward seeks should be served from cache or tolerated as forward reads
+    // This prevents creating zombie streams that never get cleaned up on the broker
+    // gRPC's stream.Recv() NEVER unblocks when streams are cancelled, leaving goroutines
+    // orphaned forever. Each recreation leaves 2 goroutines (first record + loop) blocked.
+    // With 14K recreations, that's 28K leaked goroutines. Solution: almost never recreate.
+    const maxBackwardGap = 1000000
+    offsetGap := currentStartOffset - requestedOffset
+
+    if requestedOffset < currentStartOffset && offsetGap > maxBackwardGap {
+        // Need to seek backward significantly - close old session and create a fresh subscriber
         // Restarting an existing stream doesn't work reliably because the broker may still
         // have old data buffered in the stream pipeline
-        glog.V(2).Infof("[FETCH] Seeking backward: requested=%d < session=%d, creating fresh subscriber",
-            requestedOffset, currentStartOffset)
+        glog.V(2).Infof("[FETCH] Seeking backward significantly: requested=%d < session=%d (gap=%d), creating fresh subscriber",
+            requestedOffset, currentStartOffset, offsetGap)

         // Extract session details (note: session.mu was already unlocked at line 294)
         topic := session.Topic
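The new guard reduces to a small predicate: only a backward seek of more than `maxBackwardGap` offsets forces a fresh subscriber; anything smaller is expected to be served from cache or tolerated as a forward read. A sketch of that predicate (the function name is hypothetical):

```go
const maxBackwardGap = 1000000 // same threshold as in the diff

// shouldRecreateForBackwardSeek mirrors the condition above: recreating
// a stream is so expensive (potentially leaked Recv() goroutines) that
// only an extreme backward seek justifies it.
func shouldRecreateForBackwardSeek(requestedOffset, sessionOffset int64) bool {
    return requestedOffset < sessionOffset &&
        sessionOffset-requestedOffset > maxBackwardGap
}
```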
@@ -325,12 +336,15 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
         glog.V(2).Infof("[FETCH] Session %s still at wrong offset %d (requested %d) - must recreate", key, existingOffset, requestedOffset)

         // Session still needs recreation - close it
+        // CRITICAL: Hold session lock while cancelling to prevent race with active Recv() calls
+        existingSession.mu.Lock()
         if existingSession.Stream != nil {
             _ = existingSession.Stream.CloseSend()
         }
         if existingSession.Cancel != nil {
             existingSession.Cancel()
         }
+        existingSession.mu.Unlock()
         delete(bc.subscribers, key)
         glog.V(2).Infof("[FETCH] Closed old subscriber session for backward seek: %s (was at offset %d, need offset %d)", key, existingOffset, requestedOffset)
     }
@@ -341,8 +355,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
     // Create a completely fresh subscriber at the requested offset
     // INLINE SESSION CREATION to hold the lock continuously
     glog.V(1).Infof("[FETCH] Creating inline subscriber session while holding lock: %s at offset %d", key, requestedOffset)
-    subscriberCtx := context.Background()
-    subscriberCancel := func() {} // No-op cancel
+    subscriberCtx, subscriberCancel := context.WithCancel(bc.ctx)

     stream, err := bc.client.SubscribeMessage(subscriberCtx)
     if err != nil {
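Because recreation happens inline while the subscriber-map lock is held, no concurrent fetch can observe the key missing and insert a duplicate session. The simplified shape of that critical section, with error handling elided and session fields abbreviated, is:

```go
// Simplified sketch: delete and re-create under one critical section.
bc.subscribersLock.Lock()
defer bc.subscribersLock.Unlock()

delete(bc.subscribers, key)
subscriberCtx, subscriberCancel := context.WithCancel(bc.ctx)
stream, err := bc.client.SubscribeMessage(subscriberCtx)
if err != nil {
    subscriberCancel()
    return nil, err
}
bc.subscribers[key] = &BrokerSubscriberSession{
    Stream: stream,
    Ctx:    subscriberCtx,
    Cancel: subscriberCancel,
    // remaining fields populated as in CreateFreshSubscriber
}
```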
@@ -483,24 +496,32 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
         defer cancel()
     }

+    // CRITICAL: Capture stream pointer while holding lock to prevent TOCTOU race
+    // If we access session.Stream in the goroutine, it could become nil between check and use
+    stream := session.Stream
+    if stream == nil {
+        glog.V(2).Infof("[FETCH] Stream is nil, cannot read")
+        return records, nil
+    }
+
     type recvResult struct {
         resp *mq_pb.SubscribeMessageResponse
         err  error
     }
     recvChan := make(chan recvResult, 1)

-    // Try to receive first record
+    // Try to receive first record using captured stream pointer
     go func() {
-        // Check if stream is nil (can happen during session recreation race condition)
-        if session.Stream == nil {
-            select {
-            case recvChan <- recvResult{resp: nil, err: fmt.Errorf("stream is nil")}:
-            case <-ctx.Done():
-                // Context cancelled, don't send (avoid blocking)
+        // Recover from panics caused by stream being closed during Recv()
+        defer func() {
+            if r := recover(); r != nil {
+                select {
+                case recvChan <- recvResult{resp: nil, err: fmt.Errorf("stream recv panicked: %v", r)}:
+                case <-ctx.Done():
+                }
             }
-            return
-        }
-        resp, err := session.Stream.Recv()
+        }()
+        resp, err := stream.Recv()
         select {
         case recvChan <- recvResult{resp: resp, err: err}:
         case <-ctx.Done():
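The goroutine-plus-select shape above is the usual way to bound a blocking gRPC `Recv()` with a context. Distilled into a hypothetical helper; the stream interface name follows standard protoc-gen-go-grpc naming for a `SubscribeMessage` RPC and is an assumption here:

```go
// recvWithContext races a blocking Recv() against ctx. The buffered
// channel means the goroutine's send never blocks, so it can exit as
// soon as Recv() returns even if the caller already timed out. If the
// stream never terminates, the goroutine itself still blocks inside
// Recv() - exactly the leak the comments in this diff describe.
func recvWithContext(ctx context.Context, stream mq_pb.SeaweedMessaging_SubscribeMessageClient) (*mq_pb.SubscribeMessageResponse, error) {
    type recvResult struct {
        resp *mq_pb.SubscribeMessageResponse
        err  error
    }
    ch := make(chan recvResult, 1)
    go func() {
        defer func() { // surface panics from a stream torn down mid-Recv()
            if r := recover(); r != nil {
                ch <- recvResult{nil, fmt.Errorf("stream recv panicked: %v", r)}
            }
        }()
        resp, err := stream.Recv()
        ch <- recvResult{resp, err}
    }()
    select {
    case r := <-ch:
        return r.resp, r.err
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}
```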
@@ -561,16 +582,17 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
     recvChan2 := make(chan recvResult, 1)

     go func() {
-        // Check if stream is nil (can happen during session recreation race condition)
-        if session.Stream == nil {
-            select {
-            case recvChan2 <- recvResult{resp: nil, err: fmt.Errorf("stream is nil")}:
-            case <-ctx2.Done():
-                // Context cancelled
+        // Recover from panics caused by stream being closed during Recv()
+        defer func() {
+            if r := recover(); r != nil {
+                select {
+                case recvChan2 <- recvResult{resp: nil, err: fmt.Errorf("stream recv panicked: %v", r)}:
+                case <-ctx2.Done():
+                }
             }
-            return
-        }
-        resp, err := session.Stream.Recv()
+        }()
+        // Use captured stream pointer to prevent TOCTOU race
+        resp, err := stream.Recv()
         select {
         case recvChan2 <- recvResult{resp: resp, err: err}:
         case <-ctx2.Done():
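This second receive loop reuses the `stream` pointer captured before the first read instead of re-reading `session.Stream`, which concurrent teardown may set to nil between a check and the call. A hypothetical interleaving showing the hazard the capture avoids:

```go
// T1 (reader):   if session.Stream != nil {           // check passes
// T2 (teardown): session.Cancel(); session.Stream = nil
// T1 (reader):   resp, err := session.Stream.Recv()   // panics on nil
//
// Capturing once under the session lock makes T1 immune to T2's write:
stream := session.Stream // T2 can no longer change what T1 calls Recv() on
```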
@@ -651,12 +673,15 @@ func (bc *BrokerClient) CloseSubscriber(topic string, partition int32, consumerG
     defer bc.subscribersLock.Unlock()

     if session, exists := bc.subscribers[key]; exists {
+        // CRITICAL: Hold session lock while cancelling to prevent race with active Recv() calls
+        session.mu.Lock()
         if session.Stream != nil {
             _ = session.Stream.CloseSend()
         }
         if session.Cancel != nil {
             session.Cancel()
         }
+        session.mu.Unlock()
         delete(bc.subscribers, key)
         glog.V(2).Infof("[FETCH] Closed subscriber for %s", key)
     }
@@ -721,7 +746,7 @@ func (bc *BrokerClient) RestartSubscriber(session *BrokerSubscriberSession, newO
     session.nextOffsetToRead = newOffset

     // Create new stream from new offset
-    subscriberCtx, cancel := context.WithCancel(context.Background())
+    subscriberCtx, cancel := context.WithCancel(bc.ctx)

     stream, err := bc.client.SubscribeMessage(subscriberCtx)
     if err != nil {
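With this last hunk, every stream in the file derives from `bc.ctx`, giving a three-level hierarchy: client root, per-subscriber, per-read. A fragment illustrating the chain (the one-second timeout is illustrative, not taken from this diff):

```go
rootCtx, rootCancel := context.WithCancel(context.Background()) // bc.ctx: closed with the client
defer rootCancel()

subCtx, subCancel := context.WithCancel(rootCtx) // per subscriber, as in RestartSubscriber
defer subCancel()

readCtx, readCancel := context.WithTimeout(subCtx, time.Second) // per fetch inside ReadRecords
defer readCancel()

_ = readCtx // cancelling rootCtx tears down subCtx and readCtx as well
```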