diff --git a/go.sum b/go.sum index 4d77e6b7e..d24dc27c5 100644 --- a/go.sum +++ b/go.sum @@ -1629,8 +1629,6 @@ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/stretchr/testify v1.11.0 h1:ib4sjIrwZKxE5u/Japgo/7SJV3PvgjGiRNAvTVGqQl8= -github.com/stretchr/testify v1.11.0/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM= diff --git a/weed/mq/broker/broker_grpc_query.go b/weed/mq/broker/broker_grpc_query.go index b024ab70b..1c0b3a072 100644 --- a/weed/mq/broker/broker_grpc_query.go +++ b/weed/mq/broker/broker_grpc_query.go @@ -50,33 +50,13 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage flushedBufferRanges = make([]BufferRange, 0) } - // Determine filtering criteria based on oneof start_filter + // Use buffer_start index for precise deduplication lastFlushTsNs := localPartition.LogBuffer.LastFlushTsNs - var startTimeNs int64 - var startBufferIndex int64 - var filterType string - - // Handle oneof start_filter - switch filter := req.StartFilter.(type) { - case *mq_pb.GetUnflushedMessagesRequest_StartTimeNs: - startTimeNs = filter.StartTimeNs - filterType = "timestamp" - // Use the more restrictive of lastFlushTsNs vs requested startTimeNs - if lastFlushTsNs > startTimeNs { - startTimeNs = lastFlushTsNs - } - case *mq_pb.GetUnflushedMessagesRequest_StartBufferIndex: - startBufferIndex = filter.StartBufferIndex - startTimeNs = lastFlushTsNs // Still respect last flush time - filterType = "buffer_index" - default: - // No specific filter provided, use lastFlushTsNs as default - startTimeNs = lastFlushTsNs - filterType = "default" - } + startBufferIndex := req.StartBufferIndex + startTimeNs := lastFlushTsNs // Still respect last flush time for safety - glog.V(2).Infof("Streaming unflushed messages for %v %v, filter_type=%s, timestamp >= %d, buffer >= %d, excluding %d flushed buffer ranges", - t, partition, filterType, startTimeNs, startBufferIndex, len(flushedBufferRanges)) + glog.V(2).Infof("Streaming unflushed messages for %v %v, buffer >= %d, timestamp >= %d (safety), excluding %d flushed buffer ranges", + t, partition, startBufferIndex, startTimeNs, len(flushedBufferRanges)) // Stream messages from LogBuffer with filtering messageCount := 0 diff --git a/weed/pb/mq_broker.proto b/weed/pb/mq_broker.proto index 93fe8940d..0f12edc85 100644 --- a/weed/pb/mq_broker.proto +++ b/weed/pb/mq_broker.proto @@ -361,10 +361,7 @@ message CloseSubscribersResponse { message GetUnflushedMessagesRequest { schema_pb.Topic topic = 1; schema_pb.Partition partition = 2; - oneof start_filter { - int64 start_time_ns = 3; // Filter by timestamp (messages after this time) - int64 start_buffer_index = 4; // Filter by buffer index (messages from buffers >= this index) - } + int64 start_buffer_index = 3; // Filter by buffer index (messages from buffers >= this index) } message GetUnflushedMessagesResponse { diff --git a/weed/pb/mq_pb/mq_broker.pb.go b/weed/pb/mq_pb/mq_broker.pb.go index e9ae81d09..6b06f6cfa 100644 --- a/weed/pb/mq_pb/mq_broker.pb.go +++ b/weed/pb/mq_pb/mq_broker.pb.go @@ -2574,16 +2574,12 @@ func (*CloseSubscribersResponse) Descriptor() ([]byte, []int) { } type GetUnflushedMessagesRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - Topic *schema_pb.Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` - Partition *schema_pb.Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"` - // Types that are valid to be assigned to StartFilter: - // - // *GetUnflushedMessagesRequest_StartTimeNs - // *GetUnflushedMessagesRequest_StartBufferIndex - StartFilter isGetUnflushedMessagesRequest_StartFilter `protobuf_oneof:"start_filter"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Topic *schema_pb.Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Partition *schema_pb.Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"` + StartBufferIndex int64 `protobuf:"varint,3,opt,name=start_buffer_index,json=startBufferIndex,proto3" json:"start_buffer_index,omitempty"` // Filter by buffer index (messages from buffers >= this index) + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *GetUnflushedMessagesRequest) Reset() { @@ -2630,47 +2626,13 @@ func (x *GetUnflushedMessagesRequest) GetPartition() *schema_pb.Partition { return nil } -func (x *GetUnflushedMessagesRequest) GetStartFilter() isGetUnflushedMessagesRequest_StartFilter { - if x != nil { - return x.StartFilter - } - return nil -} - -func (x *GetUnflushedMessagesRequest) GetStartTimeNs() int64 { - if x != nil { - if x, ok := x.StartFilter.(*GetUnflushedMessagesRequest_StartTimeNs); ok { - return x.StartTimeNs - } - } - return 0 -} - func (x *GetUnflushedMessagesRequest) GetStartBufferIndex() int64 { if x != nil { - if x, ok := x.StartFilter.(*GetUnflushedMessagesRequest_StartBufferIndex); ok { - return x.StartBufferIndex - } + return x.StartBufferIndex } return 0 } -type isGetUnflushedMessagesRequest_StartFilter interface { - isGetUnflushedMessagesRequest_StartFilter() -} - -type GetUnflushedMessagesRequest_StartTimeNs struct { - StartTimeNs int64 `protobuf:"varint,3,opt,name=start_time_ns,json=startTimeNs,proto3,oneof"` // Filter by timestamp (messages after this time) -} - -type GetUnflushedMessagesRequest_StartBufferIndex struct { - StartBufferIndex int64 `protobuf:"varint,4,opt,name=start_buffer_index,json=startBufferIndex,proto3,oneof"` // Filter by buffer index (messages from buffers >= this index) -} - -func (*GetUnflushedMessagesRequest_StartTimeNs) isGetUnflushedMessagesRequest_StartFilter() {} - -func (*GetUnflushedMessagesRequest_StartBufferIndex) isGetUnflushedMessagesRequest_StartFilter() {} - type GetUnflushedMessagesResponse struct { state protoimpl.MessageState `protogen:"open.v1"` Message *LogEntry `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` // Single message per response (streaming) @@ -3895,13 +3857,11 @@ const file_mq_broker_proto_rawDesc = "" + "\x05topic\x18\x01 \x01(\v2\x10.schema_pb.TopicR\x05topic\x12 \n" + "\funix_time_ns\x18\x02 \x01(\x03R\n" + "unixTimeNs\"\x1a\n" + - "\x18CloseSubscribersResponse\"\xdf\x01\n" + + "\x18CloseSubscribersResponse\"\xa7\x01\n" + "\x1bGetUnflushedMessagesRequest\x12&\n" + "\x05topic\x18\x01 \x01(\v2\x10.schema_pb.TopicR\x05topic\x122\n" + - "\tpartition\x18\x02 \x01(\v2\x14.schema_pb.PartitionR\tpartition\x12$\n" + - "\rstart_time_ns\x18\x03 \x01(\x03H\x00R\vstartTimeNs\x12.\n" + - "\x12start_buffer_index\x18\x04 \x01(\x03H\x00R\x10startBufferIndexB\x0e\n" + - "\fstart_filter\"\x8a\x01\n" + + "\tpartition\x18\x02 \x01(\v2\x14.schema_pb.PartitionR\tpartition\x12,\n" + + "\x12start_buffer_index\x18\x03 \x01(\x03R\x10startBufferIndex\"\x8a\x01\n" + "\x1cGetUnflushedMessagesResponse\x120\n" + "\amessage\x18\x01 \x01(\v2\x16.messaging_pb.LogEntryR\amessage\x12\x14\n" + "\x05error\x18\x02 \x01(\tR\x05error\x12\"\n" + @@ -4168,10 +4128,6 @@ func file_mq_broker_proto_init() { (*SubscribeFollowMeRequest_Ack)(nil), (*SubscribeFollowMeRequest_Close)(nil), } - file_mq_broker_proto_msgTypes[42].OneofWrappers = []any{ - (*GetUnflushedMessagesRequest_StartTimeNs)(nil), - (*GetUnflushedMessagesRequest_StartBufferIndex)(nil), - } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ diff --git a/weed/query/engine/broker_client.go b/weed/query/engine/broker_client.go index d016169a2..76055be25 100644 --- a/weed/query/engine/broker_client.go +++ b/weed/query/engine/broker_client.go @@ -2,6 +2,7 @@ package engine import ( "context" + "encoding/json" "fmt" "io" "strconv" @@ -16,6 +17,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" jsonpb "google.golang.org/protobuf/encoding/protojson" @@ -429,7 +431,7 @@ func (c *BrokerClient) ListTopicPartitions(ctx context.Context, namespace, topic } // GetUnflushedMessages returns only messages that haven't been flushed to disk yet -// Uses buffer_start metadata to determine what data has been persisted vs still in-memory +// Uses buffer_start metadata from disk files for precise deduplication // This prevents double-counting when combining with disk-based data func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topicName string, partition topic.Partition, startTimeNs int64) ([]*filer_pb.LogEntry, error) { // Step 1: Find the broker that hosts this partition @@ -438,7 +440,7 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi return []*filer_pb.LogEntry{}, nil } - // Step 2: Connect to broker and call the GetUnflushedMessages gRPC method + // Step 2: Connect to broker conn, err := grpc.Dial(c.brokerAddress, c.grpcDialOption) if err != nil { // Return empty slice if connection fails - prevents double-counting @@ -448,7 +450,16 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi client := mq_pb.NewSeaweedMessagingClient(conn) - // Step 3: Prepare the request using oneof start_filter (timestamp-based) + // Step 3: Get earliest buffer_start from disk files for precise deduplication + topicObj := topic.Topic{Namespace: namespace, Name: topicName} + partitionPath := topic.PartitionDir(topicObj, partition) + earliestBufferIndex, err := c.getEarliestBufferStart(ctx, partitionPath) + if err != nil { + // If we can't get buffer info, use 0 (get all unflushed data) + earliestBufferIndex = 0 + } + + // Step 4: Prepare request using buffer index filtering only request := &mq_pb.GetUnflushedMessagesRequest{ Topic: &schema_pb.Topic{ Namespace: namespace, @@ -460,16 +471,10 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi RangeStop: partition.RangeStop, UnixTimeNs: partition.UnixTimeNs, }, - StartFilter: &mq_pb.GetUnflushedMessagesRequest_StartTimeNs{ - StartTimeNs: startTimeNs, - }, - // TODO: Could use buffer index filtering for more precision: - // StartFilter: &mq_pb.GetUnflushedMessagesRequest_StartBufferIndex{ - // StartBufferIndex: latestBufferIndex, - // }, + StartBufferIndex: earliestBufferIndex, } - // Step 4: Call the broker streaming API + // Step 5: Call the broker streaming API stream, err := client.GetUnflushedMessages(ctx, request) if err != nil { // Return empty slice if gRPC call fails - prevents double-counting @@ -510,3 +515,58 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi return logEntries, nil } + +// getEarliestBufferStart finds the earliest buffer_start index from disk files in the partition +// This is used for precise deduplication - any buffer index >= this value may still be in memory +func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath string) (int64, error) { + filerClient, err := c.GetFilerClient() + if err != nil { + return 0, fmt.Errorf("failed to get filer client: %v", err) + } + + var earliestBufferIndex int64 = -1 // -1 means no buffer_start found + + err = filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error { + // Skip directories and parquet files + if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") { + return nil + } + + // Extract buffer_start from file extended attributes + bufferStart := c.getBufferStartFromEntry(entry) + if bufferStart != nil && bufferStart.StartIndex > 0 { + if earliestBufferIndex == -1 || bufferStart.StartIndex < earliestBufferIndex { + earliestBufferIndex = bufferStart.StartIndex + } + } + + return nil + }) + + if err != nil { + return 0, fmt.Errorf("failed to scan partition directory: %v", err) + } + + if earliestBufferIndex == -1 { + return 0, fmt.Errorf("no buffer_start metadata found in partition") + } + + return earliestBufferIndex, nil +} + +// getBufferStartFromEntry extracts LogBufferStart from file entry metadata +func (c *BrokerClient) getBufferStartFromEntry(entry *filer_pb.Entry) *LogBufferStart { + if entry.Extended == nil { + return nil + } + + // Parse buffer_start format + if startJson, exists := entry.Extended["buffer_start"]; exists { + var bufferStart LogBufferStart + if err := json.Unmarshal(startJson, &bufferStart); err == nil { + return &bufferStart + } + } + + return nil +} diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go index 73d8efde4..b225e1cc0 100644 --- a/weed/query/engine/hybrid_message_scanner.go +++ b/weed/query/engine/hybrid_message_scanner.go @@ -174,8 +174,8 @@ func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partitio return results, nil } - // Step 1: Get unflushed data from broker using our new interface method - // This method uses buffer_start metadata to avoid double-counting + // Step 1: Get unflushed data from broker using buffer_start-based method + // This method uses buffer_start metadata to avoid double-counting with exact precision unflushedEntries, err := hms.brokerClient.GetUnflushedMessages(ctx, hms.topic.Namespace, hms.topic.Name, partition, options.StartTimeNs) if err != nil { // Log error but don't fail the query - continue with disk data only