
ack messages to broker

pull/7329/head
chrislu, 4 weeks ago
commit 7e934d6283

2 changed files:
  1. weed/mq/broker/broker_grpc_sub.go (98 lines changed)
  2. weed/mq/kafka/integration/broker_client_subscribe.go (67 lines changed)

weed/mq/broker/broker_grpc_sub.go (98 lines changed)

@@ -230,60 +230,60 @@ subscribeLoop:
        // After waking up, check if we should stop
        return subscribeCtx.Err() == nil && isConnected
    }, func(logEntry *filer_pb.LogEntry) (bool, error) {
        // Wait for the message to be acknowledged with a timeout to prevent infinite loops
        const maxWaitTime = 30 * time.Second
        const checkInterval = 137 * time.Millisecond
        startTime := time.Now()
        for imt.IsInflight(logEntry.Key) {
            // Check if we've exceeded the maximum wait time
            if time.Since(startTime) > maxWaitTime {
                glog.Warningf("Subscriber %s: message with key %s has been in-flight for more than %v, forcing acknowledgment",
                    clientName, string(logEntry.Key), maxWaitTime)
                // Force remove the message from in-flight tracking to prevent infinite loop
                imt.AcknowledgeMessage(logEntry.Key, logEntry.TsNs)
                break
            }
            time.Sleep(checkInterval)
            // Check if the client has disconnected by monitoring the context
            select {
            case <-subscribeCtx.Done():
                err := subscribeCtx.Err()
                if err == context.Canceled {
                    // Subscribe cancelled (seek or disconnect)
                    return false, nil
                }
                glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
                return false, nil
            default:
                // Continue processing the request
            }
        }
        if logEntry.Key != nil {
            imt.EnflightMessage(logEntry.Key, logEntry.TsNs)
        }
        // Create the message to send
        dataMsg := &mq_pb.DataMessage{
            Key:   logEntry.Key,
            Value: logEntry.Data,
            TsNs:  logEntry.TsNs,
        }
        if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
            Data: dataMsg,
        }}); err != nil {
            glog.Errorf("Error sending data: %v", err)
            return false, err
        }
        // Update received offset and last seen time for this subscriber
        subscriber.UpdateReceivedOffset(logEntry.TsNs)
        counter++
        return false, nil
    })
    subscribeDone <- subscribeErr
}()
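The loop above only makes progress once something acknowledges the key tracked by imt; as the comments in the second file note, the Kafka gateway previously never sent those acks, so every message sat in-flight until the 30-second force-ack. A minimal standalone sketch of that wait pattern, with a simplified stand-in for the broker's in-flight tracker (the tracker type and waitForAck helper are illustrative, not SeaweedFS types; only the IsInflight/EnflightMessage/AcknowledgeMessage names mirror the patch):

// Sketch only: a simplified in-flight tracker and the broker's bounded wait.
package trackersketch

import (
	"context"
	"sync"
	"time"
)

type inflightTracker struct {
	mu   sync.Mutex
	keys map[string]int64 // message key -> timestamp of the unacked message
}

func newInflightTracker() *inflightTracker {
	return &inflightTracker{keys: make(map[string]int64)}
}

func (t *inflightTracker) EnflightMessage(key []byte, tsNs int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.keys[string(key)] = tsNs
}

func (t *inflightTracker) IsInflight(key []byte) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	_, ok := t.keys[string(key)]
	return ok
}

func (t *inflightTracker) AcknowledgeMessage(key []byte, tsNs int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.keys, string(key))
}

// waitForAck mirrors the loop above: poll until the key is acknowledged,
// force-ack after maxWait to avoid spinning forever, and stop early when the
// subscriber's context is cancelled (seek or disconnect).
func waitForAck(ctx context.Context, imt *inflightTracker, key []byte, tsNs int64) bool {
	const maxWait = 30 * time.Second
	const checkInterval = 137 * time.Millisecond
	start := time.Now()
	for imt.IsInflight(key) {
		if time.Since(start) > maxWait {
			imt.AcknowledgeMessage(key, tsNs)
			break
		}
		select {
		case <-ctx.Done():
			return false
		case <-time.After(checkInterval):
		}
	}
	return true
}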

weed/mq/kafka/integration/broker_client_subscribe.go (67 lines changed)

@@ -3,6 +3,7 @@ package integration
 import (
 	"context"
 	"fmt"
+	"io"
 	"time"

 	"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -248,8 +249,11 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 			}
 			glog.V(2).Infof("[FETCH] Returning %d cached records for %s at offset %d (cache: %d-%d)",
 				endIdx-startIdx, session.Key(), requestedOffset, cacheStartOffset, cacheEndOffset)
+			// CRITICAL: Capture slice while holding lock to prevent race condition
+			// If we unlock before slicing, another goroutine could clear consumedRecords
+			result := session.consumedRecords[startIdx:endIdx]
 			session.mu.Unlock()
-			return session.consumedRecords[startIdx:endIdx], nil
+			return result, nil
 		}
 	} else {
 		glog.V(2).Infof("[FETCH] Cache miss for %s: requested=%d, cache=[%d-%d]",
@@ -291,6 +295,11 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 	}
 	if err := session.Stream.Send(seekMsg); err != nil {
+		// Handle graceful shutdown: EOF means stream is closing
+		if err == io.EOF {
+			glog.V(2).Infof("[FETCH] Stream closing during seek to offset %d, returning empty", requestedOffset)
+			return []*SeaweedRecord{}, nil
+		}
 		return nil, fmt.Errorf("seek to offset %d failed: %v", requestedOffset, err)
 	}
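grpc-go's stream Send returns io.EOF once the stream has already terminated (the concrete status only surfaces from a later Recv), which is why the seek path above treats io.EOF as "stream closing" rather than a hard failure. A small sketch of that pattern in isolation (sender and sendOrIgnoreClosing are illustrative names, not part of the patch):

// Sketch only: Send, then classify io.EOF as graceful shutdown.
package streamsketch

import (
	"errors"
	"fmt"
	"io"
)

type sender interface {
	Send(msg any) error
}

// sendOrIgnoreClosing forwards msg and swallows io.EOF, which grpc-go returns
// from Send when the stream has already terminated; the real status would come
// from a subsequent Recv.
func sendOrIgnoreClosing(s sender, msg any) error {
	if err := s.Send(msg); err != nil {
		if errors.Is(err, io.EOF) {
			return nil // stream closing; treat as graceful shutdown
		}
		return fmt.Errorf("send failed: %v", err)
	}
	return nil
}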
@@ -363,7 +372,9 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 		}
 		glog.V(2).Infof("[FETCH] Returning %d cached records from index %d to %d", endIdx-startIdx, startIdx, endIdx-1)
-		return session.consumedRecords[startIdx:endIdx], nil
+		// CRITICAL: Capture slice result while holding lock (defer will unlock after return)
+		result := session.consumedRecords[startIdx:endIdx]
+		return result, nil
 	}
 }
@@ -438,6 +449,22 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 			currentOffset++
 			glog.V(2).Infof("[FETCH] Received record: offset=%d, keyLen=%d, valueLen=%d",
 				record.Offset, len(record.Key), len(record.Value))
+			// CRITICAL: Auto-acknowledge first message immediately for Kafka gateway
+			// Kafka uses offset commits (not per-message acks) so we must ack to prevent
+			// broker from blocking on in-flight messages waiting for acks that will never come
+			ackMsg := &mq_pb.SubscribeMessageRequest{
+				Message: &mq_pb.SubscribeMessageRequest_Ack{
+					Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
+						Key:  dataMsg.Key,
+						TsNs: dataMsg.TsNs,
+					},
+				},
+			}
+			if err := stream.Send(ackMsg); err != nil {
+				glog.V(2).Infof("[FETCH] Failed to send ack for first record offset %d: %v (continuing)", record.Offset, err)
+				// Don't fail the fetch if ack fails - continue reading
+			}
 		}
 	case <-ctx.Done():
@@ -518,6 +545,22 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 			glog.V(2).Infof("[FETCH] Received record %d: offset=%d, keyLen=%d, valueLen=%d, readTime=%v",
 				len(records), record.Offset, len(record.Key), len(record.Value), readDuration)
+			// CRITICAL: Auto-acknowledge message immediately for Kafka gateway
+			// Kafka uses offset commits (not per-message acks) so we must ack to prevent
+			// broker from blocking on in-flight messages waiting for acks that will never come
+			ackMsg := &mq_pb.SubscribeMessageRequest{
+				Message: &mq_pb.SubscribeMessageRequest_Ack{
+					Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
+						Key:  dataMsg.Key,
+						TsNs: dataMsg.TsNs,
+					},
+				},
+			}
+			if err := stream.Send(ackMsg); err != nil {
+				glog.V(2).Infof("[FETCH] Failed to send ack for offset %d: %v (continuing)", record.Offset, err)
+				// Don't fail the fetch if ack fails - continue reading
+			}
 		}
 	case <-ctx2.Done():
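Both Recv sites above inline the same step: a fire-and-forget acknowledgment sent for every record the gateway pulls, which is what lets the broker's in-flight wait loop (first file) release immediately instead of hitting its 30-second force-ack. A sketch of that step pulled out as a helper (ackRecord and the ackStream interface are illustrative; the patch keeps the logic inline, and the sketch assumes the mq_pb and glog imports already present in this file):

// Sketch only: per-record auto-ack extracted as a helper, not part of the patch.
type ackStream interface {
	Send(*mq_pb.SubscribeMessageRequest) error
}

func ackRecord(stream ackStream, dataMsg *mq_pb.DataMessage) {
	ackMsg := &mq_pb.SubscribeMessageRequest{
		Message: &mq_pb.SubscribeMessageRequest_Ack{
			Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
				Key:  dataMsg.Key,
				TsNs: dataMsg.TsNs,
			},
		},
	}
	// Ack failures are logged and ignored: a fetch should not fail just because
	// an acknowledgment could not be delivered.
	if err := stream.Send(ackMsg); err != nil {
		glog.V(2).Infof("[FETCH] failed to ack record ts=%d: %v (continuing)", dataMsg.TsNs, err)
	}
}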
@@ -698,6 +741,11 @@ func (session *BrokerSubscriberSession) SeekToOffset(offset int64) error {
 	}
 	if err := session.Stream.Send(seekMsg); err != nil {
+		// Handle graceful shutdown
+		if err == io.EOF {
+			glog.V(2).Infof("[SEEK] Stream closing during seek to offset %d for %s[%d]", offset, session.Topic, session.Partition)
+			return nil // Not an error during shutdown
+		}
 		return fmt.Errorf("seek to offset %d failed: %v", offset, err)
 	}
@@ -725,6 +773,11 @@ func (session *BrokerSubscriberSession) SeekToTimestamp(timestampNs int64) error
 	}
 	if err := session.Stream.Send(seekMsg); err != nil {
+		// Handle graceful shutdown
+		if err == io.EOF {
+			glog.V(2).Infof("[SEEK] Stream closing during seek to timestamp %d for %s[%d]", timestampNs, session.Topic, session.Partition)
+			return nil // Not an error during shutdown
+		}
 		return fmt.Errorf("seek to timestamp %d failed: %v", timestampNs, err)
 	}
@@ -752,6 +805,11 @@ func (session *BrokerSubscriberSession) SeekToEarliest() error {
 	}
 	if err := session.Stream.Send(seekMsg); err != nil {
+		// Handle graceful shutdown
+		if err == io.EOF {
+			glog.V(2).Infof("[SEEK] Stream closing during seek to earliest for %s[%d]", session.Topic, session.Partition)
+			return nil // Not an error during shutdown
+		}
 		return fmt.Errorf("seek to earliest failed: %v", err)
 	}
@@ -778,6 +836,11 @@ func (session *BrokerSubscriberSession) SeekToLatest() error {
 	}
 	if err := session.Stream.Send(seekMsg); err != nil {
+		// Handle graceful shutdown
+		if err == io.EOF {
+			glog.V(2).Infof("[SEEK] Stream closing during seek to latest for %s[%d]", session.Topic, session.Partition)
+			return nil // Not an error during shutdown
+		}
 		return fmt.Errorf("seek to latest failed: %v", err)
 	}
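The same EOF check is now repeated in SeekToOffset, SeekToTimestamp, SeekToEarliest, and SeekToLatest (and once more in ReadRecordsFromOffset); if it keeps spreading, one option is to funnel the Send through a single session helper, roughly as sketched below (sendSeek is hypothetical, not part of the patch, and assumes the io, fmt, glog, and mq_pb imports already in this file):

// Sketch only: a possible consolidation of the repeated Send + io.EOF check.
func (session *BrokerSubscriberSession) sendSeek(seekMsg *mq_pb.SubscribeMessageRequest, what string) error {
	if err := session.Stream.Send(seekMsg); err != nil {
		if err == io.EOF {
			// Stream is closing (graceful shutdown); not an error.
			glog.V(2).Infof("[SEEK] Stream closing during seek to %s for %s[%d]", what, session.Topic, session.Partition)
			return nil
		}
		return fmt.Errorf("seek to %s failed: %v", what, err)
	}
	return nil
}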
