From 7e934d62834a6acf282850a77ae8810d3334e281 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Wed, 15 Oct 2025 09:12:03 -0700
Subject: [PATCH] ack messages to broker

---
 weed/mq/broker/broker_grpc_sub.go           | 108 +++++++++---------
 .../integration/broker_client_subscribe.go  |  77 +++++++++++--
 2 files changed, 124 insertions(+), 61 deletions(-)

diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 1d607999e..51a74c6a9 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -230,60 +230,60 @@ subscribeLoop:
 			// After waking up, check if we should stop
 			return subscribeCtx.Err() == nil && isConnected
 		}, func(logEntry *filer_pb.LogEntry) (bool, error) {
-			// Wait for the message to be acknowledged with a timeout to prevent infinite loops
-			const maxWaitTime = 30 * time.Second
-			const checkInterval = 137 * time.Millisecond
-			startTime := time.Now()
-
-			for imt.IsInflight(logEntry.Key) {
-				// Check if we've exceeded the maximum wait time
-				if time.Since(startTime) > maxWaitTime {
-					glog.Warningf("Subscriber %s: message with key %s has been in-flight for more than %v, forcing acknowledgment",
-						clientName, string(logEntry.Key), maxWaitTime)
-					// Force remove the message from in-flight tracking to prevent infinite loop
-					imt.AcknowledgeMessage(logEntry.Key, logEntry.TsNs)
-					break
-				}
-
-				time.Sleep(checkInterval)
-
-				// Check if the client has disconnected by monitoring the context
-				select {
-				case <-subscribeCtx.Done():
-					err := subscribeCtx.Err()
-					if err == context.Canceled {
-						// Subscribe cancelled (seek or disconnect)
-						return false, nil
+			// Wait for the message to be acknowledged with a timeout to prevent infinite loops
+			const maxWaitTime = 30 * time.Second
+			const checkInterval = 137 * time.Millisecond
+			startTime := time.Now()
+
+			for imt.IsInflight(logEntry.Key) {
+				// Check if we've exceeded the maximum wait time
+				if time.Since(startTime) > maxWaitTime {
+					glog.Warningf("Subscriber %s: message with key %s has been in-flight for more than %v, forcing acknowledgment",
+						clientName, string(logEntry.Key), maxWaitTime)
+					// Force remove the message from in-flight tracking to prevent infinite loop
+					imt.AcknowledgeMessage(logEntry.Key, logEntry.TsNs)
+					break
+				}
+
+				time.Sleep(checkInterval)
+
+				// Check if the client has disconnected by monitoring the context
+				select {
+				case <-subscribeCtx.Done():
+					err := subscribeCtx.Err()
+					if err == context.Canceled {
+						// Subscribe cancelled (seek or disconnect)
+						return false, nil
+					}
+					glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
+					return false, nil
+				default:
+					// Continue processing the request
+				}
+			}
+			if logEntry.Key != nil {
+				imt.EnflightMessage(logEntry.Key, logEntry.TsNs)
 			}
-					glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
-					return false, nil
-				default:
-					// Continue processing the request
-				}
-			}
-			if logEntry.Key != nil {
-				imt.EnflightMessage(logEntry.Key, logEntry.TsNs)
-			}
-			// Create the message to send
-			dataMsg := &mq_pb.DataMessage{
-				Key:   logEntry.Key,
-				Value: logEntry.Data,
-				TsNs:  logEntry.TsNs,
-			}
+			// Create the message to send
+			dataMsg := &mq_pb.DataMessage{
+				Key:   logEntry.Key,
+				Value: logEntry.Data,
+				TsNs:  logEntry.TsNs,
+			}
 
-			if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
-				Data: dataMsg,
-			}}); err != nil {
-				glog.Errorf("Error sending data: %v", err)
-				return false, err
-			}
+			if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
+				Data: dataMsg,
+			}}); err != nil {
+				glog.Errorf("Error sending data: %v", err)
+				return false, err
+			}
 
-			// Update received offset and last seen time for this subscriber
-			subscriber.UpdateReceivedOffset(logEntry.TsNs)
+			// Update received offset and last seen time for this subscriber
+			subscriber.UpdateReceivedOffset(logEntry.TsNs)
 
-			counter++
-			return false, nil
+			counter++
+			return false, nil
 		})
 		subscribeDone <- subscribeErr
 	}()
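The per-key backpressure in the hunk above reduces to a small pattern: a message key stays "in flight" from delivery until the client acks it, and the delivery loop polls with a bounded wait (30s here) before force-acking so a lost ack cannot stall the partition forever. A minimal, self-contained sketch of that pattern follows; the InflightTracker type and its internals are hypothetical stand-ins, and only the method names IsInflight / EnflightMessage / AcknowledgeMessage mirror the broker's imt as used in the patch.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// InflightTracker is a hypothetical stand-in for the broker's imt:
// it remembers which message keys have been delivered but not yet acked.
type InflightTracker struct {
	mu       sync.Mutex
	inflight map[string]int64 // key -> timestamp (ns) of the in-flight delivery
}

func NewInflightTracker() *InflightTracker {
	return &InflightTracker{inflight: make(map[string]int64)}
}

func (t *InflightTracker) EnflightMessage(key []byte, tsNs int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.inflight[string(key)] = tsNs
}

func (t *InflightTracker) IsInflight(key []byte) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	_, ok := t.inflight[string(key)]
	return ok
}

func (t *InflightTracker) AcknowledgeMessage(key []byte, tsNs int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.inflight, string(key))
}

// waitUntilAcked polls until the key is acked, the context is cancelled, or
// maxWait elapses; on timeout it force-acks, mirroring the patch's escape hatch.
func waitUntilAcked(ctx context.Context, t *InflightTracker, key []byte, tsNs int64, maxWait, interval time.Duration) {
	start := time.Now()
	for t.IsInflight(key) {
		if time.Since(start) > maxWait {
			t.AcknowledgeMessage(key, tsNs) // force-ack to avoid an infinite loop
			return
		}
		select {
		case <-ctx.Done():
			return // client disconnected or seek cancelled the subscribe
		case <-time.After(interval):
		}
	}
}

func main() {
	t := NewInflightTracker()
	t.EnflightMessage([]byte("k1"), 1)
	go func() {
		time.Sleep(200 * time.Millisecond)
		t.AcknowledgeMessage([]byte("k1"), 1) // ack arrives from the client
	}()
	waitUntilAcked(context.Background(), t, []byte("k1"), 1, 30*time.Second, 137*time.Millisecond)
	fmt.Println("k1 acked:", !t.IsInflight([]byte("k1")))
}
```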
@@ -303,24 +303,24 @@ subscribeLoop:
 				// Seek requested - cancel current Subscribe and restart from new offset
 				glog.V(0).Infof("Subscriber %s seeking from offset %d to offset %d (type %v)",
 					clientName, currentPosition.GetOffset(), seekMsg.Offset, seekMsg.OffsetType)
-
+
 				// Cancel current Subscribe iteration
 				subscribeCancel()
-
+
 				// Wait for Subscribe to finish cancelling
 				<-subscribeDone
-
+
 				// Update position for next iteration
 				currentPosition = b.getRequestPositionFromSeek(seekMsg)
 				glog.V(0).Infof("Subscriber %s restarting Subscribe from new offset %d", clientName, seekMsg.Offset)
-
+
 				// Send acknowledgment that seek completed
 				stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Ctrl{
 					Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{
 						Error: "", // Empty error means success
 					},
 				}})
-
+
 				// Loop will restart with new position
 			}
 		}
diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go
index e9adcd234..a06dcea8d 100644
--- a/weed/mq/kafka/integration/broker_client_subscribe.go
+++ b/weed/mq/kafka/integration/broker_client_subscribe.go
@@ -3,6 +3,7 @@ package integration
 import (
 	"context"
 	"fmt"
+	"io"
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -248,8 +249,11 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 			}
 			glog.V(2).Infof("[FETCH] Returning %d cached records for %s at offset %d (cache: %d-%d)",
 				endIdx-startIdx, session.Key(), requestedOffset, cacheStartOffset, cacheEndOffset)
+			// CRITICAL: Capture slice while holding lock to prevent race condition
+			// If we unlock before slicing, another goroutine could clear consumedRecords
+			result := session.consumedRecords[startIdx:endIdx]
 			session.mu.Unlock()
-			return session.consumedRecords[startIdx:endIdx], nil
+			return result, nil
 		}
 	} else {
 		glog.V(2).Infof("[FETCH] Cache miss for %s: requested=%d, cache=[%d-%d]",
@@ -291,6 +295,11 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 	}
 
 	if err := session.Stream.Send(seekMsg); err != nil {
+		// Handle graceful shutdown: EOF means stream is closing
+		if err == io.EOF {
+			glog.V(2).Infof("[FETCH] Stream closing during seek to offset %d, returning empty", requestedOffset)
+			return []*SeaweedRecord{}, nil
+		}
 		return nil, fmt.Errorf("seek to offset %d failed: %v", requestedOffset, err)
 	}
 
@@ -363,7 +372,9 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 		}
 
 		glog.V(2).Infof("[FETCH] Returning %d cached records from index %d to %d", endIdx-startIdx, startIdx, endIdx-1)
-		return session.consumedRecords[startIdx:endIdx], nil
+		// CRITICAL: Capture slice result while holding lock (defer will unlock after return)
+		result := session.consumedRecords[startIdx:endIdx]
+		return result, nil
 	}
 }
 
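The two cache hunks above fix the same race: the slice expression session.consumedRecords[startIdx:endIdx] reads the consumedRecords field, so it must be evaluated while the mutex is still held; otherwise a concurrent goroutine can nil the field between Unlock and the read. A minimal sketch with hypothetical Session/Record types showing the racy and the fixed version:

```go
package main

import (
	"fmt"
	"sync"
)

type Record struct{ Offset int64 }

type Session struct {
	mu      sync.Mutex
	records []*Record
}

// Racy: the slice expression reads s.records AFTER the lock is released,
// so a concurrent Clear() can nil the field between Unlock and the read.
func (s *Session) GetRacy(start, end int) []*Record {
	s.mu.Lock()
	s.mu.Unlock()
	return s.records[start:end] // unsynchronized field read: data race / possible panic
}

// Fixed, mirroring the patch: capture the slice while still holding the lock.
func (s *Session) Get(start, end int) []*Record {
	s.mu.Lock()
	result := s.records[start:end] // field read and slicing happen under the lock
	s.mu.Unlock()
	return result
}

func (s *Session) Clear() {
	s.mu.Lock()
	s.records = nil
	s.mu.Unlock()
}

func main() {
	s := &Session{records: []*Record{{1}, {2}, {3}}}
	got := s.Get(0, 2)
	s.Clear() // got still points at the old backing array; the field read was safe
	fmt.Println(len(got), got[0].Offset, got[1].Offset)
}
```

Note the returned slice still aliases the old backing array after Clear; the fix removes the unsynchronized read of the field itself, which is exactly what the patch comments call out.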
@@ -438,6 +449,22 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 			currentOffset++
 			glog.V(2).Infof("[FETCH] Received record: offset=%d, keyLen=%d, valueLen=%d",
 				record.Offset, len(record.Key), len(record.Value))
+
+			// CRITICAL: Auto-acknowledge first message immediately for Kafka gateway.
+			// Kafka uses offset commits (not per-message acks), so we must ack to prevent
+			// the broker from blocking on in-flight messages waiting for acks that will never come.
+			ackMsg := &mq_pb.SubscribeMessageRequest{
+				Message: &mq_pb.SubscribeMessageRequest_Ack{
+					Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
+						Key:  dataMsg.Key,
+						TsNs: dataMsg.TsNs,
+					},
+				},
+			}
+			if err := stream.Send(ackMsg); err != nil {
+				glog.V(2).Infof("[FETCH] Failed to send ack for first record offset %d: %v (continuing)", record.Offset, err)
+				// Don't fail the fetch if the ack fails - continue reading
+			}
 		}
 
 	case <-ctx.Done():
@@ -518,6 +545,22 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 			glog.V(2).Infof("[FETCH] Received record %d: offset=%d, keyLen=%d, valueLen=%d, readTime=%v",
 				len(records), record.Offset, len(record.Key), len(record.Value), readDuration)
+
+			// CRITICAL: Auto-acknowledge message immediately for Kafka gateway.
+			// Kafka uses offset commits (not per-message acks), so we must ack to prevent
+			// the broker from blocking on in-flight messages waiting for acks that will never come.
+			ackMsg := &mq_pb.SubscribeMessageRequest{
+				Message: &mq_pb.SubscribeMessageRequest_Ack{
+					Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
+						Key:  dataMsg.Key,
+						TsNs: dataMsg.TsNs,
+					},
+				},
+			}
+			if err := stream.Send(ackMsg); err != nil {
+				glog.V(2).Infof("[FETCH] Failed to send ack for offset %d: %v (continuing)", record.Offset, err)
+				// Don't fail the fetch if the ack fails - continue reading
+			}
 		}
 
 	case <-ctx2.Done():
@@ -682,12 +725,12 @@ func (session *BrokerSubscriberSession) SeekToOffset(offset int64) error {
 	session.mu.Lock()
 	currentOffset := session.StartOffset
 	session.mu.Unlock()
-
+
 	if currentOffset == offset {
 		glog.V(2).Infof("[SEEK] Already at offset %d for %s[%d], skipping seek", offset, session.Topic, session.Partition)
 		return nil
 	}
-
+
 	seekMsg := &mq_pb.SubscribeMessageRequest{
 		Message: &mq_pb.SubscribeMessageRequest_Seek{
 			Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{
@@ -696,16 +739,21 @@ func (session *BrokerSubscriberSession) SeekToOffset(offset int64) error {
 			},
 		},
 	}
-
+
 	if err := session.Stream.Send(seekMsg); err != nil {
+		// Handle graceful shutdown
+		if err == io.EOF {
+			glog.V(2).Infof("[SEEK] Stream closing during seek to offset %d for %s[%d]", offset, session.Topic, session.Partition)
+			return nil // Not an error during shutdown
+		}
 		return fmt.Errorf("seek to offset %d failed: %v", offset, err)
 	}
-
+
 	session.mu.Lock()
 	session.StartOffset = offset
 	session.consumedRecords = nil
 	session.mu.Unlock()
-
+
 	glog.V(2).Infof("[SEEK] Seeked to offset %d for %s[%d]", offset, session.Topic, session.Partition)
 	return nil
 }
@@ -725,6 +773,11 @@ func (session *BrokerSubscriberSession) SeekToTimestamp(timestampNs int64) error
 	}
 
 	if err := session.Stream.Send(seekMsg); err != nil {
+		// Handle graceful shutdown
+		if err == io.EOF {
+			glog.V(2).Infof("[SEEK] Stream closing during seek to timestamp %d for %s[%d]", timestampNs, session.Topic, session.Partition)
+			return nil // Not an error during shutdown
+		}
 		return fmt.Errorf("seek to timestamp %d failed: %v", timestampNs, err)
 	}
 
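Both auto-ack hunks share one pattern: build a SubscribeMessageRequest carrying an Ack with the received record's Key/TsNs, send it back on the subscribe stream, and treat a send failure as non-fatal so the fetch keeps reading. A runnable sketch of that pattern, using local stand-in types (the real ones are the mq_pb request messages; fakeStream exists only so the example runs standalone):

```go
package main

import "fmt"

// Hypothetical local stand-ins for the mq_pb request types used in the patch:
// a SubscribeMessageRequest whose oneof-style member is an Ack with Key/TsNs.
type AckMessage struct {
	Key  []byte
	TsNs int64
}
type SubscribeMessageRequest struct {
	Ack *AckMessage
}

// ackStream narrows the gRPC client stream to the one method the ack path needs.
type ackStream interface {
	Send(*SubscribeMessageRequest) error
}

// autoAck mirrors the gateway's behavior: Kafka clients commit offsets rather
// than acking individual messages, so the gateway acks each record as soon as
// it is read, and a failed ack is logged but never fails the fetch.
func autoAck(stream ackStream, key []byte, tsNs int64) {
	ack := &SubscribeMessageRequest{Ack: &AckMessage{Key: key, TsNs: tsNs}}
	if err := stream.Send(ack); err != nil {
		fmt.Printf("[FETCH] ack for ts %d failed: %v (continuing)\n", tsNs, err)
	}
}

// fakeStream records sent requests so the example is self-contained.
type fakeStream struct{ sent []*SubscribeMessageRequest }

func (f *fakeStream) Send(m *SubscribeMessageRequest) error {
	f.sent = append(f.sent, m)
	return nil
}

func main() {
	s := &fakeStream{}
	autoAck(s, []byte("k1"), 42)
	fmt.Println("acks sent:", len(s.sent))
}
```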
@@ -752,6 +805,11 @@ func (session *BrokerSubscriberSession) SeekToEarliest() error {
 	}
 
 	if err := session.Stream.Send(seekMsg); err != nil {
+		// Handle graceful shutdown
+		if err == io.EOF {
+			glog.V(2).Infof("[SEEK] Stream closing during seek to earliest for %s[%d]", session.Topic, session.Partition)
+			return nil // Not an error during shutdown
+		}
 		return fmt.Errorf("seek to earliest failed: %v", err)
 	}
 
@@ -778,6 +836,11 @@ func (session *BrokerSubscriberSession) SeekToLatest() error {
 	}
 
 	if err := session.Stream.Send(seekMsg); err != nil {
+		// Handle graceful shutdown
+		if err == io.EOF {
+			glog.V(2).Infof("[SEEK] Stream closing during seek to latest for %s[%d]", session.Topic, session.Partition)
+			return nil // Not an error during shutdown
+		}
 		return fmt.Errorf("seek to latest failed: %v", err)
 	}
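The four seek hunks repeat the same io.EOF check verbatim. If this were ever factored out, a small helper could centralize it; the sketch below is only an illustration under stated assumptions (seekRequest and seekStream are hypothetical stand-ins for the mq_pb request and the gRPC stream, and errors.Is is used where the patch compares with == io.EOF):

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// seekRequest is a hypothetical stand-in for *mq_pb.SubscribeMessageRequest.
type seekRequest struct{ desc string }

// seekStream narrows the stream to the one method the seek path uses.
type seekStream interface {
	Send(*seekRequest) error
}

// sendSeek centralizes the pattern the four seek hunks repeat: io.EOF during
// shutdown is swallowed (the stream is closing, not failing), while any other
// send error is wrapped with a description of the attempted operation.
func sendSeek(stream seekStream, msg *seekRequest, what string) error {
	if err := stream.Send(msg); err != nil {
		if errors.Is(err, io.EOF) {
			return nil // stream closing during shutdown; not an error
		}
		return fmt.Errorf("%s failed: %v", what, err)
	}
	return nil
}

// closedStream simulates a stream that is already shutting down.
type closedStream struct{}

func (closedStream) Send(*seekRequest) error { return io.EOF }

func main() {
	err := sendSeek(closedStream{}, &seekRequest{desc: "seek to earliest"}, "seek to earliest")
	fmt.Println("error during shutdown:", err) // <nil>: EOF is treated as a clean close
}
```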