@ -2,6 +2,7 @@ package engine
import (
"context"
"encoding/json"
"fmt"
"io"
"strconv"
@ -16,6 +17,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
jsonpb "google.golang.org/protobuf/encoding/protojson"
@ -429,7 +431,7 @@ func (c *BrokerClient) ListTopicPartitions(ctx context.Context, namespace, topic
}
// GetUnflushedMessages returns only messages that haven't been flushed to disk yet
// Uses buffer_start metadata to determine what data has been persisted vs still in-memory
// Uses buffer_start metadata from disk files for precise deduplication
// This prevents double-counting when combining with disk-based data
func ( c * BrokerClient ) GetUnflushedMessages ( ctx context . Context , namespace , topicName string , partition topic . Partition , startTimeNs int64 ) ( [ ] * filer_pb . LogEntry , error ) {
// Step 1: Find the broker that hosts this partition
@ -438,7 +440,7 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi
return [ ] * filer_pb . LogEntry { } , nil
}
// Step 2: Connect to broker and call the GetUnflushedMessages gRPC method
// Step 2: Connect to broker
conn , err := grpc . Dial ( c . brokerAddress , c . grpcDialOption )
if err != nil {
// Return empty slice if connection fails - prevents double-counting
@ -448,7 +450,16 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi
client := mq_pb . NewSeaweedMessagingClient ( conn )
// Step 3: Prepare the request using oneof start_filter (timestamp-based)
// Step 3: Get earliest buffer_start from disk files for precise deduplication
topicObj := topic . Topic { Namespace : namespace , Name : topicName }
partitionPath := topic . PartitionDir ( topicObj , partition )
earliestBufferIndex , err := c . getEarliestBufferStart ( ctx , partitionPath )
if err != nil {
// If we can't get buffer info, use 0 (get all unflushed data)
earliestBufferIndex = 0
}
// Step 4: Prepare request using buffer index filtering only
request := & mq_pb . GetUnflushedMessagesRequest {
Topic : & schema_pb . Topic {
Namespace : namespace ,
@ -460,16 +471,10 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi
RangeStop : partition . RangeStop ,
UnixTimeNs : partition . UnixTimeNs ,
} ,
StartFilter : & mq_pb . GetUnflushedMessagesRequest_StartTimeNs {
StartTimeNs : startTimeNs ,
} ,
// TODO: Could use buffer index filtering for more precision:
// StartFilter: &mq_pb.GetUnflushedMessagesRequest_StartBufferIndex{
// StartBufferIndex: latestBufferIndex,
// },
StartBufferIndex : earliestBufferIndex ,
}
// Step 4 : Call the broker streaming API
// Step 5: Call the broker streaming API
stream , err := client . GetUnflushedMessages ( ctx , request )
if err != nil {
// Return empty slice if gRPC call fails - prevents double-counting
@ -510,3 +515,58 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi
return logEntries , nil
}
// getEarliestBufferStart finds the earliest buffer_start index from disk files in the partition
// This is used for precise deduplication - any buffer index >= this value may still be in memory
func ( c * BrokerClient ) getEarliestBufferStart ( ctx context . Context , partitionPath string ) ( int64 , error ) {
filerClient , err := c . GetFilerClient ( )
if err != nil {
return 0 , fmt . Errorf ( "failed to get filer client: %v" , err )
}
var earliestBufferIndex int64 = - 1 // -1 means no buffer_start found
err = filer_pb . ReadDirAllEntries ( ctx , filerClient , util . FullPath ( partitionPath ) , "" , func ( entry * filer_pb . Entry , isLast bool ) error {
// Skip directories and parquet files
if entry . IsDirectory || strings . HasSuffix ( entry . Name , ".parquet" ) {
return nil
}
// Extract buffer_start from file extended attributes
bufferStart := c . getBufferStartFromEntry ( entry )
if bufferStart != nil && bufferStart . StartIndex > 0 {
if earliestBufferIndex == - 1 || bufferStart . StartIndex < earliestBufferIndex {
earliestBufferIndex = bufferStart . StartIndex
}
}
return nil
} )
if err != nil {
return 0 , fmt . Errorf ( "failed to scan partition directory: %v" , err )
}
if earliestBufferIndex == - 1 {
return 0 , fmt . Errorf ( "no buffer_start metadata found in partition" )
}
return earliestBufferIndex , nil
}
// getBufferStartFromEntry extracts LogBufferStart from file entry metadata
func ( c * BrokerClient ) getBufferStartFromEntry ( entry * filer_pb . Entry ) * LogBufferStart {
if entry . Extended == nil {
return nil
}
// Parse buffer_start format
if startJson , exists := entry . Extended [ "buffer_start" ] ; exists {
var bufferStart LogBufferStart
if err := json . Unmarshal ( startJson , & bufferStart ) ; err == nil {
return & bufferStart
}
}
return nil
}