From 60e6e637061c4c90fae50c8f93fe8c6585169162 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 14 Oct 2025 23:49:10 -0700 Subject: [PATCH] avoid goroutine leak --- weed/mq/broker/broker_grpc_sub.go | 79 ++++++++-------- .../integration/broker_client_subscribe.go | 93 ++++++++++++------- .../inflight_message_tracker.go | 11 +++ 3 files changed, 111 insertions(+), 72 deletions(-) diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 0d3298ae8..2c2326cbd 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -4,8 +4,6 @@ import ( "context" "fmt" "io" - "sync" - "sync/atomic" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -57,8 +55,15 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs isConnected := true var counter int64 + startPosition := b.getRequestPosition(req.GetInit()) + imt := sub_coordinator.NewInflightMessageTracker(int(req.GetInit().SlidingWindowSize)) + defer func() { isConnected = false + // Clean up any in-flight messages to prevent them from blocking other subscribers + if cleanedCount := imt.Cleanup(); cleanedCount > 0 { + glog.V(0).Infof("Subscriber %s cleaned up %d in-flight messages on disconnect", clientName, cleanedCount) + } localTopicPartition.Subscribers.RemoveSubscriber(clientName) glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter) // Use topic-aware shutdown logic to prevent aggressive removal of system topics @@ -67,9 +72,6 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs } }() - startPosition := b.getRequestPosition(req.GetInit()) - imt := sub_coordinator.NewInflightMessageTracker(int(req.GetInit().SlidingWindowSize)) - // connect to the follower var subscribeFollowMeStream mq_pb.SeaweedMessaging_SubscribeFollowMeClient glog.V(0).Infof("follower broker: %v", req.GetInit().FollowerBroker) @@ -106,9 +108,13 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs } go func() { + defer cancel() // CRITICAL: Cancel context when Recv goroutine exits (client disconnect) + var lastOffset int64 + for { ack, err := stream.Recv() + if err != nil { if err == io.EOF { // the client has called CloseSend(). This is to ack the close. @@ -166,50 +172,47 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs } }() - var cancelOnce sync.Once + // Create a goroutine to handle context cancellation and wake up the condition variable + // This is created ONCE per subscriber, not per callback invocation + go func() { + <-ctx.Done() + // Wake up the condition variable when context is cancelled + localTopicPartition.ListenersLock.Lock() + localTopicPartition.ListenersCond.Broadcast() + localTopicPartition.ListenersLock.Unlock() + }() err = localTopicPartition.Subscribe(clientName, startPosition, func() bool { - // Check if context is cancelled FIRST before any blocking operations - select { - case <-ctx.Done(): + // Check cancellation before waiting + if ctx.Err() != nil || !isConnected { return false - default: } - if !isConnected { - return false - } - - // Ensure we will wake any Wait() when the client disconnects - cancelOnce.Do(func() { - go func() { - <-ctx.Done() - localTopicPartition.ListenersLock.Lock() - localTopicPartition.ListenersCond.Broadcast() - localTopicPartition.ListenersLock.Unlock() - }() - }) - - // Block until new data is available or the client disconnects + // Wait for new data using condition variable (blocking, not polling) localTopicPartition.ListenersLock.Lock() - atomic.AddInt64(&localTopicPartition.ListenersWaits, 1) localTopicPartition.ListenersCond.Wait() - atomic.AddInt64(&localTopicPartition.ListenersWaits, -1) localTopicPartition.ListenersLock.Unlock() - // Add a small sleep to avoid CPU busy-wait when checking for new data - time.Sleep(10 * time.Millisecond) - - if ctx.Err() != nil { - return false - } - if !isConnected { - return false - } - return true + // After waking up, check if we should stop + return ctx.Err() == nil && isConnected }, func(logEntry *filer_pb.LogEntry) (bool, error) { + // Wait for the message to be acknowledged with a timeout to prevent infinite loops + const maxWaitTime = 30 * time.Second + const checkInterval = 137 * time.Millisecond + startTime := time.Now() + for imt.IsInflight(logEntry.Key) { - time.Sleep(137 * time.Millisecond) + // Check if we've exceeded the maximum wait time + if time.Since(startTime) > maxWaitTime { + glog.Warningf("Subscriber %s: message with key %s has been in-flight for more than %v, forcing acknowledgment", + clientName, string(logEntry.Key), maxWaitTime) + // Force remove the message from in-flight tracking to prevent infinite loop + imt.AcknowledgeMessage(logEntry.Key, logEntry.TsNs) + break + } + + time.Sleep(checkInterval) + // Check if the client has disconnected by monitoring the context select { case <-ctx.Done(): diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index b674c3aeb..432cdfe04 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -38,8 +38,8 @@ func createSubscribeInitMessage(topic string, actualPartition *schema_pb.Partiti // This ensures each fetch gets fresh data from the requested offset // consumerGroup and consumerID are passed from Kafka client for proper tracking in SMQ func (bc *BrokerClient) CreateFreshSubscriber(topic string, partition int32, startOffset int64, consumerGroup string, consumerID string) (*BrokerSubscriberSession, error) { - // Create a dedicated context for this subscriber - subscriberCtx := context.Background() + // Use BrokerClient's context so subscriber is cancelled when connection closes + subscriberCtx, subscriberCancel := context.WithCancel(bc.ctx) stream, err := bc.client.SubscribeMessage(subscriberCtx) if err != nil { @@ -78,6 +78,8 @@ func (bc *BrokerClient) CreateFreshSubscriber(topic string, partition int32, sta StartOffset: startOffset, ConsumerGroup: consumerGroup, ConsumerID: consumerID, + Ctx: subscriberCtx, + Cancel: subscriberCancel, } return session, nil @@ -157,21 +159,21 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta // Session is at wrong offset - must recreate glog.V(2).Infof("[FETCH] Session exists at wrong offset %d (requested %d), recreating", existingOffset, startOffset) + // CRITICAL: Hold session lock while cancelling to prevent race with active Recv() calls + session.mu.Lock() if session.Stream != nil { _ = session.Stream.CloseSend() } if session.Cancel != nil { session.Cancel() } + session.mu.Unlock() delete(bc.subscribers, key) } - // CRITICAL FIX: Use background context for subscriber to prevent premature cancellation - // Subscribers need to continue reading data even when the connection is closing, - // otherwise Schema Registry and other clients can't read existing data. - // The subscriber will be cleaned up when the stream is explicitly closed. - subscriberCtx := context.Background() - subscriberCancel := func() {} // No-op cancel + // Use BrokerClient's context so subscribers are automatically cancelled when connection closes + // This ensures proper cleanup without artificial timeouts + subscriberCtx, subscriberCancel := context.WithCancel(bc.ctx) stream, err := bc.client.SubscribeMessage(subscriberCtx) if err != nil { @@ -287,12 +289,21 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok // The session will naturally advance as records are consumed, so we should NOT // recreate it just because requestedOffset != session.StartOffset - if requestedOffset < currentStartOffset { - // Need to seek backward - close old session and create a fresh subscriber + // OPTIMIZATION: Only recreate for EXTREMELY LARGE backward seeks (>1000000 offsets back) + // Most backward seeks should be served from cache or tolerated as forward reads + // This prevents creating zombie streams that never get cleaned up on the broker + // gRPC's stream.Recv() NEVER unblocks when streams are cancelled, leaving goroutines + // orphaned forever. Each recreation leaves 2 goroutines (first record + loop) blocked. + // With 14K recreations, that's 28K leaked goroutines. Solution: almost never recreate. + const maxBackwardGap = 1000000 + offsetGap := currentStartOffset - requestedOffset + + if requestedOffset < currentStartOffset && offsetGap > maxBackwardGap { + // Need to seek backward significantly - close old session and create a fresh subscriber // Restarting an existing stream doesn't work reliably because the broker may still // have old data buffered in the stream pipeline - glog.V(2).Infof("[FETCH] Seeking backward: requested=%d < session=%d, creating fresh subscriber", - requestedOffset, currentStartOffset) + glog.V(2).Infof("[FETCH] Seeking backward significantly: requested=%d < session=%d (gap=%d), creating fresh subscriber", + requestedOffset, currentStartOffset, offsetGap) // Extract session details (note: session.mu was already unlocked at line 294) topic := session.Topic @@ -325,12 +336,15 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok glog.V(2).Infof("[FETCH] Session %s still at wrong offset %d (requested %d) - must recreate", key, existingOffset, requestedOffset) // Session still needs recreation - close it + // CRITICAL: Hold session lock while cancelling to prevent race with active Recv() calls + existingSession.mu.Lock() if existingSession.Stream != nil { _ = existingSession.Stream.CloseSend() } if existingSession.Cancel != nil { existingSession.Cancel() } + existingSession.mu.Unlock() delete(bc.subscribers, key) glog.V(2).Infof("[FETCH] Closed old subscriber session for backward seek: %s (was at offset %d, need offset %d)", key, existingOffset, requestedOffset) } @@ -341,8 +355,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok // Create a completely fresh subscriber at the requested offset // INLINE SESSION CREATION to hold the lock continuously glog.V(1).Infof("[FETCH] Creating inline subscriber session while holding lock: %s at offset %d", key, requestedOffset) - subscriberCtx := context.Background() - subscriberCancel := func() {} // No-op cancel + subscriberCtx, subscriberCancel := context.WithCancel(bc.ctx) stream, err := bc.client.SubscribeMessage(subscriberCtx) if err != nil { @@ -483,24 +496,32 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib defer cancel() } + // CRITICAL: Capture stream pointer while holding lock to prevent TOCTOU race + // If we access session.Stream in the goroutine, it could become nil between check and use + stream := session.Stream + if stream == nil { + glog.V(2).Infof("[FETCH] Stream is nil, cannot read") + return records, nil + } + type recvResult struct { resp *mq_pb.SubscribeMessageResponse err error } recvChan := make(chan recvResult, 1) - // Try to receive first record + // Try to receive first record using captured stream pointer go func() { - // Check if stream is nil (can happen during session recreation race condition) - if session.Stream == nil { - select { - case recvChan <- recvResult{resp: nil, err: fmt.Errorf("stream is nil")}: - case <-ctx.Done(): - // Context cancelled, don't send (avoid blocking) + // Recover from panics caused by stream being closed during Recv() + defer func() { + if r := recover(); r != nil { + select { + case recvChan <- recvResult{resp: nil, err: fmt.Errorf("stream recv panicked: %v", r)}: + case <-ctx.Done(): + } } - return - } - resp, err := session.Stream.Recv() + }() + resp, err := stream.Recv() select { case recvChan <- recvResult{resp: resp, err: err}: case <-ctx.Done(): @@ -561,16 +582,17 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib recvChan2 := make(chan recvResult, 1) go func() { - // Check if stream is nil (can happen during session recreation race condition) - if session.Stream == nil { - select { - case recvChan2 <- recvResult{resp: nil, err: fmt.Errorf("stream is nil")}: - case <-ctx2.Done(): - // Context cancelled + // Recover from panics caused by stream being closed during Recv() + defer func() { + if r := recover(); r != nil { + select { + case recvChan2 <- recvResult{resp: nil, err: fmt.Errorf("stream recv panicked: %v", r)}: + case <-ctx2.Done(): + } } - return - } - resp, err := session.Stream.Recv() + }() + // Use captured stream pointer to prevent TOCTOU race + resp, err := stream.Recv() select { case recvChan2 <- recvResult{resp: resp, err: err}: case <-ctx2.Done(): @@ -651,12 +673,15 @@ func (bc *BrokerClient) CloseSubscriber(topic string, partition int32, consumerG defer bc.subscribersLock.Unlock() if session, exists := bc.subscribers[key]; exists { + // CRITICAL: Hold session lock while cancelling to prevent race with active Recv() calls + session.mu.Lock() if session.Stream != nil { _ = session.Stream.CloseSend() } if session.Cancel != nil { session.Cancel() } + session.mu.Unlock() delete(bc.subscribers, key) glog.V(2).Infof("[FETCH] Closed subscriber for %s", key) } @@ -721,7 +746,7 @@ func (bc *BrokerClient) RestartSubscriber(session *BrokerSubscriberSession, newO session.nextOffsetToRead = newOffset // Create new stream from new offset - subscriberCtx, cancel := context.WithCancel(context.Background()) + subscriberCtx, cancel := context.WithCancel(bc.ctx) stream, err := bc.client.SubscribeMessage(subscriberCtx) if err != nil { diff --git a/weed/mq/sub_coordinator/inflight_message_tracker.go b/weed/mq/sub_coordinator/inflight_message_tracker.go index 2cdfbc4e5..8ecbb2ccd 100644 --- a/weed/mq/sub_coordinator/inflight_message_tracker.go +++ b/weed/mq/sub_coordinator/inflight_message_tracker.go @@ -77,6 +77,17 @@ func (imt *InflightMessageTracker) IsInflight(key []byte) bool { return found } +// Cleanup clears all in-flight messages. This should be called when a subscriber disconnects +// to prevent messages from being stuck in the in-flight state indefinitely. +func (imt *InflightMessageTracker) Cleanup() int { + imt.mu.Lock() + defer imt.mu.Unlock() + count := len(imt.messages) + // Clear all in-flight messages + imt.messages = make(map[string]int64) + return count +} + type TimestampStatus struct { Timestamp int64 Acked bool