From e3a56d7c30c969957af962748a777b6ba6a51ac2 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 2 Sep 2025 00:10:07 -0700 Subject: [PATCH] filter out already flushed messages --- other/java/client/src/main/proto/filer.proto | 2 +- weed/mq/broker/broker_grpc_query.go | 229 ++++++++ weed/mq/broker/broker_write.go | 5 - weed/pb/mq_broker.proto | 29 + weed/pb/mq_pb/mq_broker.pb.go | 550 ++++++++++++++----- weed/pb/mq_pb/mq_broker_grpc.pb.go | 43 ++ weed/query/engine/broker_client.go | 83 +++ weed/query/engine/catalog.go | 4 + weed/query/engine/hybrid_message_scanner.go | 74 ++- weed/query/engine/mocks_test.go | 22 + 10 files changed, 864 insertions(+), 177 deletions(-) create mode 100644 weed/mq/broker/broker_grpc_query.go diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 8116a6589..3eb3d3a14 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -162,7 +162,7 @@ message FileChunk { bool is_compressed = 10; bool is_chunk_manifest = 11; // content is a list of FileChunks SSEType sse_type = 12; // Server-side encryption type - bytes sse_kms_metadata = 13; // Serialized SSE-KMS metadata for this chunk + bytes sse_metadata = 13; // Serialized SSE metadata for this chunk (SSE-C, SSE-KMS, or SSE-S3) } message FileChunkManifest { diff --git a/weed/mq/broker/broker_grpc_query.go b/weed/mq/broker/broker_grpc_query.go new file mode 100644 index 000000000..b024ab70b --- /dev/null +++ b/weed/mq/broker/broker_grpc_query.go @@ -0,0 +1,229 @@ +package broker + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" +) + +// BufferRange represents a range of buffer indexes that have been flushed to disk +type BufferRange struct { + start int64 + end int64 +} + +// GetUnflushedMessages returns messages from the broker's in-memory LogBuffer +// that haven't been flushed to disk yet, using buffer_start metadata for deduplication +// Now supports streaming responses and buffer index filtering for better performance +func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessagesRequest, stream mq_pb.SeaweedMessaging_GetUnflushedMessagesServer) error { + // Convert protobuf types to internal types + t := topic.FromPbTopic(req.Topic) + partition := topic.FromPbPartition(req.Partition) + + glog.V(2).Infof("GetUnflushedMessages request for %v %v", t, partition) + + // Get the local partition for this topic/partition + b.accessLock.Lock() + localPartition := b.localTopicManager.GetLocalPartition(t, partition) + b.accessLock.Unlock() + + if localPartition == nil { + return stream.Send(&mq_pb.GetUnflushedMessagesResponse{ + Error: fmt.Sprintf("partition %v %v not found on this broker", t, partition), + EndOfStream: true, + }) + } + + // Build deduplication map from existing log files using buffer_start metadata + partitionDir := topic.PartitionDir(t, partition) + flushedBufferRanges, err := b.buildBufferStartDeduplicationMap(partitionDir) + if err != nil { + glog.Errorf("Failed to build deduplication map for %v %v: %v", t, partition, err) + // Continue with empty map - better to potentially duplicate than to miss data + flushedBufferRanges = make([]BufferRange, 0) + } + + // Determine filtering criteria based on oneof start_filter + lastFlushTsNs := localPartition.LogBuffer.LastFlushTsNs + var startTimeNs int64 + var startBufferIndex int64 + var filterType string + + // Handle oneof start_filter + switch filter := req.StartFilter.(type) { + case *mq_pb.GetUnflushedMessagesRequest_StartTimeNs: + startTimeNs = filter.StartTimeNs + filterType = "timestamp" + // Use the more restrictive of lastFlushTsNs vs requested startTimeNs + if lastFlushTsNs > startTimeNs { + startTimeNs = lastFlushTsNs + } + case *mq_pb.GetUnflushedMessagesRequest_StartBufferIndex: + startBufferIndex = filter.StartBufferIndex + startTimeNs = lastFlushTsNs // Still respect last flush time + filterType = "buffer_index" + default: + // No specific filter provided, use lastFlushTsNs as default + startTimeNs = lastFlushTsNs + filterType = "default" + } + + glog.V(2).Infof("Streaming unflushed messages for %v %v, filter_type=%s, timestamp >= %d, buffer >= %d, excluding %d flushed buffer ranges", + t, partition, filterType, startTimeNs, startBufferIndex, len(flushedBufferRanges)) + + // Stream messages from LogBuffer with filtering + messageCount := 0 + startPosition := log_buffer.NewMessagePosition(startTimeNs, startBufferIndex) + _, _, err = localPartition.LogBuffer.LoopProcessLogData("sql_query_stream", startPosition, 0, + func() bool { return false }, // Don't wait for more data + func(logEntry *filer_pb.LogEntry) (isDone bool, err error) { + // Apply buffer index filtering if specified + currentBatchIndex := localPartition.LogBuffer.GetBatchIndex() + if startBufferIndex > 0 && currentBatchIndex < startBufferIndex { + glog.V(3).Infof("Skipping message from buffer index %d (< %d)", currentBatchIndex, startBufferIndex) + return false, nil + } + + // Check if this message is from a buffer range that's already been flushed + if b.isBufferIndexFlushed(currentBatchIndex, flushedBufferRanges) { + glog.V(3).Infof("Skipping message from flushed buffer index %d", currentBatchIndex) + return false, nil + } + + // Stream this message + err = stream.Send(&mq_pb.GetUnflushedMessagesResponse{ + Message: &mq_pb.LogEntry{ + TsNs: logEntry.TsNs, + Key: logEntry.Key, + Data: logEntry.Data, + PartitionKeyHash: uint32(logEntry.PartitionKeyHash), + }, + EndOfStream: false, + }) + + if err != nil { + glog.Errorf("Failed to stream message: %v", err) + return true, err // Stop streaming on error + } + + messageCount++ + return false, nil + }) + + // Handle collection errors + if err != nil && err != log_buffer.ResumeFromDiskError { + streamErr := stream.Send(&mq_pb.GetUnflushedMessagesResponse{ + Error: fmt.Sprintf("failed to stream unflushed messages: %v", err), + EndOfStream: true, + }) + if streamErr != nil { + glog.Errorf("Failed to send error response: %v", streamErr) + } + return err + } + + // Send end-of-stream marker + err = stream.Send(&mq_pb.GetUnflushedMessagesResponse{ + EndOfStream: true, + }) + + if err != nil { + glog.Errorf("Failed to send end-of-stream marker: %v", err) + return err + } + + glog.V(1).Infof("Streamed %d unflushed messages for %v %v", messageCount, t, partition) + return nil +} + +// buildBufferStartDeduplicationMap scans log files to build a map of buffer ranges +// that have been flushed to disk, using the buffer_start metadata +func (b *MessageQueueBroker) buildBufferStartDeduplicationMap(partitionDir string) ([]BufferRange, error) { + var flushedRanges []BufferRange + + // List all files in the partition directory using filer client accessor + err := b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return filer_pb.SeaweedList(context.Background(), client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error { + if entry.IsDirectory { + return nil + } + + // Skip Parquet files - they don't represent buffer ranges + if strings.HasSuffix(entry.Name, ".parquet") { + return nil + } + + // Skip offset files + if strings.HasSuffix(entry.Name, ".offset") { + return nil + } + + // Get buffer start for this file + bufferStart, err := b.getLogBufferStartFromFile(entry) + if err != nil { + glog.V(2).Infof("Failed to get buffer start from file %s: %v", entry.Name, err) + return nil // Continue with other files + } + + if bufferStart == nil { + // File has no buffer metadata - skip deduplication for this file + glog.V(2).Infof("File %s has no buffer_start metadata", entry.Name) + return nil + } + + // Calculate the buffer range covered by this file + chunkCount := int64(len(entry.GetChunks())) + if chunkCount > 0 { + fileRange := BufferRange{ + start: bufferStart.StartIndex, + end: bufferStart.StartIndex + chunkCount - 1, + } + flushedRanges = append(flushedRanges, fileRange) + glog.V(3).Infof("File %s covers buffer range [%d-%d]", entry.Name, fileRange.start, fileRange.end) + } + + return nil + }, "", true, 1000) + }) + + if err != nil { + return flushedRanges, fmt.Errorf("failed to list partition directory %s: %v", partitionDir, err) + } + + return flushedRanges, nil +} + +// getLogBufferStartFromFile extracts LogBufferStart metadata from a log file +func (b *MessageQueueBroker) getLogBufferStartFromFile(entry *filer_pb.Entry) (*LogBufferStart, error) { + if entry.Extended == nil { + return nil, nil + } + + // Only support buffer_start format + if startJson, exists := entry.Extended["buffer_start"]; exists { + var bufferStart LogBufferStart + if err := json.Unmarshal(startJson, &bufferStart); err != nil { + return nil, fmt.Errorf("failed to parse buffer start: %v", err) + } + return &bufferStart, nil + } + + return nil, nil +} + +// isBufferIndexFlushed checks if a buffer index is covered by any of the flushed ranges +func (b *MessageQueueBroker) isBufferIndexFlushed(bufferIndex int64, flushedRanges []BufferRange) bool { + for _, flushedRange := range flushedRanges { + if bufferIndex >= flushedRange.start && bufferIndex <= flushedRange.end { + return true + } + } + return false +} diff --git a/weed/mq/broker/broker_write.go b/weed/mq/broker/broker_write.go index 4f4ead23f..c1b22bff9 100644 --- a/weed/mq/broker/broker_write.go +++ b/weed/mq/broker/broker_write.go @@ -13,11 +13,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) -// LogBufferStart tracks the starting buffer index for a file -type LogBufferStart struct { - StartIndex int64 `json:"start_index"` // Starting buffer index (count = len(chunks)) -} - func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error { return b.appendToFileWithBufferIndex(targetFile, data, 0) } diff --git a/weed/pb/mq_broker.proto b/weed/pb/mq_broker.proto index 1c9619d48..93fe8940d 100644 --- a/weed/pb/mq_broker.proto +++ b/weed/pb/mq_broker.proto @@ -58,6 +58,10 @@ service SeaweedMessaging { } rpc SubscribeFollowMe (stream SubscribeFollowMeRequest) returns (SubscribeFollowMeResponse) { } + + // SQL query support - get unflushed messages from broker's in-memory buffer (streaming) + rpc GetUnflushedMessages (GetUnflushedMessagesRequest) returns (stream GetUnflushedMessagesResponse) { + } } ////////////////////////////////////////////////// @@ -350,3 +354,28 @@ message CloseSubscribersRequest { } message CloseSubscribersResponse { } + +////////////////////////////////////////////////// +// SQL query support messages + +message GetUnflushedMessagesRequest { + schema_pb.Topic topic = 1; + schema_pb.Partition partition = 2; + oneof start_filter { + int64 start_time_ns = 3; // Filter by timestamp (messages after this time) + int64 start_buffer_index = 4; // Filter by buffer index (messages from buffers >= this index) + } +} + +message GetUnflushedMessagesResponse { + LogEntry message = 1; // Single message per response (streaming) + string error = 2; // Error message if any + bool end_of_stream = 3; // Indicates this is the final response +} + +message LogEntry { + int64 ts_ns = 1; + bytes key = 2; + bytes data = 3; + uint32 partition_key_hash = 4; +} diff --git a/weed/pb/mq_pb/mq_broker.pb.go b/weed/pb/mq_pb/mq_broker.pb.go index 355b02fcb..e9ae81d09 100644 --- a/weed/pb/mq_pb/mq_broker.pb.go +++ b/weed/pb/mq_pb/mq_broker.pb.go @@ -2573,6 +2573,232 @@ func (*CloseSubscribersResponse) Descriptor() ([]byte, []int) { return file_mq_broker_proto_rawDescGZIP(), []int{41} } +type GetUnflushedMessagesRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Topic *schema_pb.Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Partition *schema_pb.Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"` + // Types that are valid to be assigned to StartFilter: + // + // *GetUnflushedMessagesRequest_StartTimeNs + // *GetUnflushedMessagesRequest_StartBufferIndex + StartFilter isGetUnflushedMessagesRequest_StartFilter `protobuf_oneof:"start_filter"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetUnflushedMessagesRequest) Reset() { + *x = GetUnflushedMessagesRequest{} + mi := &file_mq_broker_proto_msgTypes[42] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetUnflushedMessagesRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetUnflushedMessagesRequest) ProtoMessage() {} + +func (x *GetUnflushedMessagesRequest) ProtoReflect() protoreflect.Message { + mi := &file_mq_broker_proto_msgTypes[42] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetUnflushedMessagesRequest.ProtoReflect.Descriptor instead. +func (*GetUnflushedMessagesRequest) Descriptor() ([]byte, []int) { + return file_mq_broker_proto_rawDescGZIP(), []int{42} +} + +func (x *GetUnflushedMessagesRequest) GetTopic() *schema_pb.Topic { + if x != nil { + return x.Topic + } + return nil +} + +func (x *GetUnflushedMessagesRequest) GetPartition() *schema_pb.Partition { + if x != nil { + return x.Partition + } + return nil +} + +func (x *GetUnflushedMessagesRequest) GetStartFilter() isGetUnflushedMessagesRequest_StartFilter { + if x != nil { + return x.StartFilter + } + return nil +} + +func (x *GetUnflushedMessagesRequest) GetStartTimeNs() int64 { + if x != nil { + if x, ok := x.StartFilter.(*GetUnflushedMessagesRequest_StartTimeNs); ok { + return x.StartTimeNs + } + } + return 0 +} + +func (x *GetUnflushedMessagesRequest) GetStartBufferIndex() int64 { + if x != nil { + if x, ok := x.StartFilter.(*GetUnflushedMessagesRequest_StartBufferIndex); ok { + return x.StartBufferIndex + } + } + return 0 +} + +type isGetUnflushedMessagesRequest_StartFilter interface { + isGetUnflushedMessagesRequest_StartFilter() +} + +type GetUnflushedMessagesRequest_StartTimeNs struct { + StartTimeNs int64 `protobuf:"varint,3,opt,name=start_time_ns,json=startTimeNs,proto3,oneof"` // Filter by timestamp (messages after this time) +} + +type GetUnflushedMessagesRequest_StartBufferIndex struct { + StartBufferIndex int64 `protobuf:"varint,4,opt,name=start_buffer_index,json=startBufferIndex,proto3,oneof"` // Filter by buffer index (messages from buffers >= this index) +} + +func (*GetUnflushedMessagesRequest_StartTimeNs) isGetUnflushedMessagesRequest_StartFilter() {} + +func (*GetUnflushedMessagesRequest_StartBufferIndex) isGetUnflushedMessagesRequest_StartFilter() {} + +type GetUnflushedMessagesResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Message *LogEntry `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` // Single message per response (streaming) + Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` // Error message if any + EndOfStream bool `protobuf:"varint,3,opt,name=end_of_stream,json=endOfStream,proto3" json:"end_of_stream,omitempty"` // Indicates this is the final response + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetUnflushedMessagesResponse) Reset() { + *x = GetUnflushedMessagesResponse{} + mi := &file_mq_broker_proto_msgTypes[43] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetUnflushedMessagesResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetUnflushedMessagesResponse) ProtoMessage() {} + +func (x *GetUnflushedMessagesResponse) ProtoReflect() protoreflect.Message { + mi := &file_mq_broker_proto_msgTypes[43] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetUnflushedMessagesResponse.ProtoReflect.Descriptor instead. +func (*GetUnflushedMessagesResponse) Descriptor() ([]byte, []int) { + return file_mq_broker_proto_rawDescGZIP(), []int{43} +} + +func (x *GetUnflushedMessagesResponse) GetMessage() *LogEntry { + if x != nil { + return x.Message + } + return nil +} + +func (x *GetUnflushedMessagesResponse) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +func (x *GetUnflushedMessagesResponse) GetEndOfStream() bool { + if x != nil { + return x.EndOfStream + } + return false +} + +type LogEntry struct { + state protoimpl.MessageState `protogen:"open.v1"` + TsNs int64 `protobuf:"varint,1,opt,name=ts_ns,json=tsNs,proto3" json:"ts_ns,omitempty"` + Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` + Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` + PartitionKeyHash uint32 `protobuf:"varint,4,opt,name=partition_key_hash,json=partitionKeyHash,proto3" json:"partition_key_hash,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *LogEntry) Reset() { + *x = LogEntry{} + mi := &file_mq_broker_proto_msgTypes[44] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *LogEntry) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LogEntry) ProtoMessage() {} + +func (x *LogEntry) ProtoReflect() protoreflect.Message { + mi := &file_mq_broker_proto_msgTypes[44] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LogEntry.ProtoReflect.Descriptor instead. +func (*LogEntry) Descriptor() ([]byte, []int) { + return file_mq_broker_proto_rawDescGZIP(), []int{44} +} + +func (x *LogEntry) GetTsNs() int64 { + if x != nil { + return x.TsNs + } + return 0 +} + +func (x *LogEntry) GetKey() []byte { + if x != nil { + return x.Key + } + return nil +} + +func (x *LogEntry) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +func (x *LogEntry) GetPartitionKeyHash() uint32 { + if x != nil { + return x.PartitionKeyHash + } + return 0 +} + type PublisherToPubBalancerRequest_InitMessage struct { state protoimpl.MessageState `protogen:"open.v1"` Broker string `protobuf:"bytes,1,opt,name=broker,proto3" json:"broker,omitempty"` @@ -2582,7 +2808,7 @@ type PublisherToPubBalancerRequest_InitMessage struct { func (x *PublisherToPubBalancerRequest_InitMessage) Reset() { *x = PublisherToPubBalancerRequest_InitMessage{} - mi := &file_mq_broker_proto_msgTypes[43] + mi := &file_mq_broker_proto_msgTypes[46] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2594,7 +2820,7 @@ func (x *PublisherToPubBalancerRequest_InitMessage) String() string { func (*PublisherToPubBalancerRequest_InitMessage) ProtoMessage() {} func (x *PublisherToPubBalancerRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[43] + mi := &file_mq_broker_proto_msgTypes[46] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2638,7 +2864,7 @@ type SubscriberToSubCoordinatorRequest_InitMessage struct { func (x *SubscriberToSubCoordinatorRequest_InitMessage) Reset() { *x = SubscriberToSubCoordinatorRequest_InitMessage{} - mi := &file_mq_broker_proto_msgTypes[44] + mi := &file_mq_broker_proto_msgTypes[47] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2650,7 +2876,7 @@ func (x *SubscriberToSubCoordinatorRequest_InitMessage) String() string { func (*SubscriberToSubCoordinatorRequest_InitMessage) ProtoMessage() {} func (x *SubscriberToSubCoordinatorRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[44] + mi := &file_mq_broker_proto_msgTypes[47] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2710,7 +2936,7 @@ type SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage struct { func (x *SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) Reset() { *x = SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage{} - mi := &file_mq_broker_proto_msgTypes[45] + mi := &file_mq_broker_proto_msgTypes[48] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2722,7 +2948,7 @@ func (x *SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) String() stri func (*SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) ProtoMessage() {} func (x *SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[45] + mi := &file_mq_broker_proto_msgTypes[48] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2754,7 +2980,7 @@ type SubscriberToSubCoordinatorRequest_AckAssignmentMessage struct { func (x *SubscriberToSubCoordinatorRequest_AckAssignmentMessage) Reset() { *x = SubscriberToSubCoordinatorRequest_AckAssignmentMessage{} - mi := &file_mq_broker_proto_msgTypes[46] + mi := &file_mq_broker_proto_msgTypes[49] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2766,7 +2992,7 @@ func (x *SubscriberToSubCoordinatorRequest_AckAssignmentMessage) String() string func (*SubscriberToSubCoordinatorRequest_AckAssignmentMessage) ProtoMessage() {} func (x *SubscriberToSubCoordinatorRequest_AckAssignmentMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[46] + mi := &file_mq_broker_proto_msgTypes[49] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2798,7 +3024,7 @@ type SubscriberToSubCoordinatorResponse_Assignment struct { func (x *SubscriberToSubCoordinatorResponse_Assignment) Reset() { *x = SubscriberToSubCoordinatorResponse_Assignment{} - mi := &file_mq_broker_proto_msgTypes[47] + mi := &file_mq_broker_proto_msgTypes[50] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2810,7 +3036,7 @@ func (x *SubscriberToSubCoordinatorResponse_Assignment) String() string { func (*SubscriberToSubCoordinatorResponse_Assignment) ProtoMessage() {} func (x *SubscriberToSubCoordinatorResponse_Assignment) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[47] + mi := &file_mq_broker_proto_msgTypes[50] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2842,7 +3068,7 @@ type SubscriberToSubCoordinatorResponse_UnAssignment struct { func (x *SubscriberToSubCoordinatorResponse_UnAssignment) Reset() { *x = SubscriberToSubCoordinatorResponse_UnAssignment{} - mi := &file_mq_broker_proto_msgTypes[48] + mi := &file_mq_broker_proto_msgTypes[51] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2854,7 +3080,7 @@ func (x *SubscriberToSubCoordinatorResponse_UnAssignment) String() string { func (*SubscriberToSubCoordinatorResponse_UnAssignment) ProtoMessage() {} func (x *SubscriberToSubCoordinatorResponse_UnAssignment) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[48] + mi := &file_mq_broker_proto_msgTypes[51] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2890,7 +3116,7 @@ type PublishMessageRequest_InitMessage struct { func (x *PublishMessageRequest_InitMessage) Reset() { *x = PublishMessageRequest_InitMessage{} - mi := &file_mq_broker_proto_msgTypes[49] + mi := &file_mq_broker_proto_msgTypes[52] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2902,7 +3128,7 @@ func (x *PublishMessageRequest_InitMessage) String() string { func (*PublishMessageRequest_InitMessage) ProtoMessage() {} func (x *PublishMessageRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[49] + mi := &file_mq_broker_proto_msgTypes[52] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2963,7 +3189,7 @@ type PublishFollowMeRequest_InitMessage struct { func (x *PublishFollowMeRequest_InitMessage) Reset() { *x = PublishFollowMeRequest_InitMessage{} - mi := &file_mq_broker_proto_msgTypes[50] + mi := &file_mq_broker_proto_msgTypes[53] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2975,7 +3201,7 @@ func (x *PublishFollowMeRequest_InitMessage) String() string { func (*PublishFollowMeRequest_InitMessage) ProtoMessage() {} func (x *PublishFollowMeRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[50] + mi := &file_mq_broker_proto_msgTypes[53] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3014,7 +3240,7 @@ type PublishFollowMeRequest_FlushMessage struct { func (x *PublishFollowMeRequest_FlushMessage) Reset() { *x = PublishFollowMeRequest_FlushMessage{} - mi := &file_mq_broker_proto_msgTypes[51] + mi := &file_mq_broker_proto_msgTypes[54] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3026,7 +3252,7 @@ func (x *PublishFollowMeRequest_FlushMessage) String() string { func (*PublishFollowMeRequest_FlushMessage) ProtoMessage() {} func (x *PublishFollowMeRequest_FlushMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[51] + mi := &file_mq_broker_proto_msgTypes[54] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3057,7 +3283,7 @@ type PublishFollowMeRequest_CloseMessage struct { func (x *PublishFollowMeRequest_CloseMessage) Reset() { *x = PublishFollowMeRequest_CloseMessage{} - mi := &file_mq_broker_proto_msgTypes[52] + mi := &file_mq_broker_proto_msgTypes[55] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3069,7 +3295,7 @@ func (x *PublishFollowMeRequest_CloseMessage) String() string { func (*PublishFollowMeRequest_CloseMessage) ProtoMessage() {} func (x *PublishFollowMeRequest_CloseMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[52] + mi := &file_mq_broker_proto_msgTypes[55] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3102,7 +3328,7 @@ type SubscribeMessageRequest_InitMessage struct { func (x *SubscribeMessageRequest_InitMessage) Reset() { *x = SubscribeMessageRequest_InitMessage{} - mi := &file_mq_broker_proto_msgTypes[53] + mi := &file_mq_broker_proto_msgTypes[56] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3114,7 +3340,7 @@ func (x *SubscribeMessageRequest_InitMessage) String() string { func (*SubscribeMessageRequest_InitMessage) ProtoMessage() {} func (x *SubscribeMessageRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[53] + mi := &file_mq_broker_proto_msgTypes[56] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3203,7 +3429,7 @@ type SubscribeMessageRequest_AckMessage struct { func (x *SubscribeMessageRequest_AckMessage) Reset() { *x = SubscribeMessageRequest_AckMessage{} - mi := &file_mq_broker_proto_msgTypes[54] + mi := &file_mq_broker_proto_msgTypes[57] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3215,7 +3441,7 @@ func (x *SubscribeMessageRequest_AckMessage) String() string { func (*SubscribeMessageRequest_AckMessage) ProtoMessage() {} func (x *SubscribeMessageRequest_AckMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[54] + mi := &file_mq_broker_proto_msgTypes[57] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3256,7 +3482,7 @@ type SubscribeMessageResponse_SubscribeCtrlMessage struct { func (x *SubscribeMessageResponse_SubscribeCtrlMessage) Reset() { *x = SubscribeMessageResponse_SubscribeCtrlMessage{} - mi := &file_mq_broker_proto_msgTypes[55] + mi := &file_mq_broker_proto_msgTypes[58] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3268,7 +3494,7 @@ func (x *SubscribeMessageResponse_SubscribeCtrlMessage) String() string { func (*SubscribeMessageResponse_SubscribeCtrlMessage) ProtoMessage() {} func (x *SubscribeMessageResponse_SubscribeCtrlMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[55] + mi := &file_mq_broker_proto_msgTypes[58] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3316,7 +3542,7 @@ type SubscribeFollowMeRequest_InitMessage struct { func (x *SubscribeFollowMeRequest_InitMessage) Reset() { *x = SubscribeFollowMeRequest_InitMessage{} - mi := &file_mq_broker_proto_msgTypes[56] + mi := &file_mq_broker_proto_msgTypes[59] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3328,7 +3554,7 @@ func (x *SubscribeFollowMeRequest_InitMessage) String() string { func (*SubscribeFollowMeRequest_InitMessage) ProtoMessage() {} func (x *SubscribeFollowMeRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[56] + mi := &file_mq_broker_proto_msgTypes[59] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3374,7 +3600,7 @@ type SubscribeFollowMeRequest_AckMessage struct { func (x *SubscribeFollowMeRequest_AckMessage) Reset() { *x = SubscribeFollowMeRequest_AckMessage{} - mi := &file_mq_broker_proto_msgTypes[57] + mi := &file_mq_broker_proto_msgTypes[60] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3386,7 +3612,7 @@ func (x *SubscribeFollowMeRequest_AckMessage) String() string { func (*SubscribeFollowMeRequest_AckMessage) ProtoMessage() {} func (x *SubscribeFollowMeRequest_AckMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[57] + mi := &file_mq_broker_proto_msgTypes[60] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3417,7 +3643,7 @@ type SubscribeFollowMeRequest_CloseMessage struct { func (x *SubscribeFollowMeRequest_CloseMessage) Reset() { *x = SubscribeFollowMeRequest_CloseMessage{} - mi := &file_mq_broker_proto_msgTypes[58] + mi := &file_mq_broker_proto_msgTypes[61] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3429,7 +3655,7 @@ func (x *SubscribeFollowMeRequest_CloseMessage) String() string { func (*SubscribeFollowMeRequest_CloseMessage) ProtoMessage() {} func (x *SubscribeFollowMeRequest_CloseMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[58] + mi := &file_mq_broker_proto_msgTypes[61] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3669,7 +3895,22 @@ const file_mq_broker_proto_rawDesc = "" + "\x05topic\x18\x01 \x01(\v2\x10.schema_pb.TopicR\x05topic\x12 \n" + "\funix_time_ns\x18\x02 \x01(\x03R\n" + "unixTimeNs\"\x1a\n" + - "\x18CloseSubscribersResponse2\x97\x0e\n" + + "\x18CloseSubscribersResponse\"\xdf\x01\n" + + "\x1bGetUnflushedMessagesRequest\x12&\n" + + "\x05topic\x18\x01 \x01(\v2\x10.schema_pb.TopicR\x05topic\x122\n" + + "\tpartition\x18\x02 \x01(\v2\x14.schema_pb.PartitionR\tpartition\x12$\n" + + "\rstart_time_ns\x18\x03 \x01(\x03H\x00R\vstartTimeNs\x12.\n" + + "\x12start_buffer_index\x18\x04 \x01(\x03H\x00R\x10startBufferIndexB\x0e\n" + + "\fstart_filter\"\x8a\x01\n" + + "\x1cGetUnflushedMessagesResponse\x120\n" + + "\amessage\x18\x01 \x01(\v2\x16.messaging_pb.LogEntryR\amessage\x12\x14\n" + + "\x05error\x18\x02 \x01(\tR\x05error\x12\"\n" + + "\rend_of_stream\x18\x03 \x01(\bR\vendOfStream\"s\n" + + "\bLogEntry\x12\x13\n" + + "\x05ts_ns\x18\x01 \x01(\x03R\x04tsNs\x12\x10\n" + + "\x03key\x18\x02 \x01(\fR\x03key\x12\x12\n" + + "\x04data\x18\x03 \x01(\fR\x04data\x12,\n" + + "\x12partition_key_hash\x18\x04 \x01(\rR\x10partitionKeyHash2\x8a\x0f\n" + "\x10SeaweedMessaging\x12c\n" + "\x10FindBrokerLeader\x12%.messaging_pb.FindBrokerLeaderRequest\x1a&.messaging_pb.FindBrokerLeaderResponse\"\x00\x12y\n" + "\x16PublisherToPubBalancer\x12+.messaging_pb.PublisherToPubBalancerRequest\x1a,.messaging_pb.PublisherToPubBalancerResponse\"\x00(\x010\x01\x12Z\n" + @@ -3688,7 +3929,8 @@ const file_mq_broker_proto_rawDesc = "" + "\x0ePublishMessage\x12#.messaging_pb.PublishMessageRequest\x1a$.messaging_pb.PublishMessageResponse\"\x00(\x010\x01\x12g\n" + "\x10SubscribeMessage\x12%.messaging_pb.SubscribeMessageRequest\x1a&.messaging_pb.SubscribeMessageResponse\"\x00(\x010\x01\x12d\n" + "\x0fPublishFollowMe\x12$.messaging_pb.PublishFollowMeRequest\x1a%.messaging_pb.PublishFollowMeResponse\"\x00(\x010\x01\x12h\n" + - "\x11SubscribeFollowMe\x12&.messaging_pb.SubscribeFollowMeRequest\x1a'.messaging_pb.SubscribeFollowMeResponse\"\x00(\x01BO\n" + + "\x11SubscribeFollowMe\x12&.messaging_pb.SubscribeFollowMeRequest\x1a'.messaging_pb.SubscribeFollowMeResponse\"\x00(\x01\x12q\n" + + "\x14GetUnflushedMessages\x12).messaging_pb.GetUnflushedMessagesRequest\x1a*.messaging_pb.GetUnflushedMessagesResponse\"\x000\x01BO\n" + "\fseaweedfs.mqB\x11MessageQueueProtoZ,github.com/seaweedfs/seaweedfs/weed/pb/mq_pbb\x06proto3" var ( @@ -3703,7 +3945,7 @@ func file_mq_broker_proto_rawDescGZIP() []byte { return file_mq_broker_proto_rawDescData } -var file_mq_broker_proto_msgTypes = make([]protoimpl.MessageInfo, 59) +var file_mq_broker_proto_msgTypes = make([]protoimpl.MessageInfo, 62) var file_mq_broker_proto_goTypes = []any{ (*FindBrokerLeaderRequest)(nil), // 0: messaging_pb.FindBrokerLeaderRequest (*FindBrokerLeaderResponse)(nil), // 1: messaging_pb.FindBrokerLeaderResponse @@ -3747,134 +3989,142 @@ var file_mq_broker_proto_goTypes = []any{ (*ClosePublishersResponse)(nil), // 39: messaging_pb.ClosePublishersResponse (*CloseSubscribersRequest)(nil), // 40: messaging_pb.CloseSubscribersRequest (*CloseSubscribersResponse)(nil), // 41: messaging_pb.CloseSubscribersResponse - nil, // 42: messaging_pb.BrokerStats.StatsEntry - (*PublisherToPubBalancerRequest_InitMessage)(nil), // 43: messaging_pb.PublisherToPubBalancerRequest.InitMessage - (*SubscriberToSubCoordinatorRequest_InitMessage)(nil), // 44: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage - (*SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage)(nil), // 45: messaging_pb.SubscriberToSubCoordinatorRequest.AckUnAssignmentMessage - (*SubscriberToSubCoordinatorRequest_AckAssignmentMessage)(nil), // 46: messaging_pb.SubscriberToSubCoordinatorRequest.AckAssignmentMessage - (*SubscriberToSubCoordinatorResponse_Assignment)(nil), // 47: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment - (*SubscriberToSubCoordinatorResponse_UnAssignment)(nil), // 48: messaging_pb.SubscriberToSubCoordinatorResponse.UnAssignment - (*PublishMessageRequest_InitMessage)(nil), // 49: messaging_pb.PublishMessageRequest.InitMessage - (*PublishFollowMeRequest_InitMessage)(nil), // 50: messaging_pb.PublishFollowMeRequest.InitMessage - (*PublishFollowMeRequest_FlushMessage)(nil), // 51: messaging_pb.PublishFollowMeRequest.FlushMessage - (*PublishFollowMeRequest_CloseMessage)(nil), // 52: messaging_pb.PublishFollowMeRequest.CloseMessage - (*SubscribeMessageRequest_InitMessage)(nil), // 53: messaging_pb.SubscribeMessageRequest.InitMessage - (*SubscribeMessageRequest_AckMessage)(nil), // 54: messaging_pb.SubscribeMessageRequest.AckMessage - (*SubscribeMessageResponse_SubscribeCtrlMessage)(nil), // 55: messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessage - (*SubscribeFollowMeRequest_InitMessage)(nil), // 56: messaging_pb.SubscribeFollowMeRequest.InitMessage - (*SubscribeFollowMeRequest_AckMessage)(nil), // 57: messaging_pb.SubscribeFollowMeRequest.AckMessage - (*SubscribeFollowMeRequest_CloseMessage)(nil), // 58: messaging_pb.SubscribeFollowMeRequest.CloseMessage - (*schema_pb.Topic)(nil), // 59: schema_pb.Topic - (*schema_pb.Partition)(nil), // 60: schema_pb.Partition - (*schema_pb.RecordType)(nil), // 61: schema_pb.RecordType - (*schema_pb.PartitionOffset)(nil), // 62: schema_pb.PartitionOffset - (schema_pb.OffsetType)(0), // 63: schema_pb.OffsetType + (*GetUnflushedMessagesRequest)(nil), // 42: messaging_pb.GetUnflushedMessagesRequest + (*GetUnflushedMessagesResponse)(nil), // 43: messaging_pb.GetUnflushedMessagesResponse + (*LogEntry)(nil), // 44: messaging_pb.LogEntry + nil, // 45: messaging_pb.BrokerStats.StatsEntry + (*PublisherToPubBalancerRequest_InitMessage)(nil), // 46: messaging_pb.PublisherToPubBalancerRequest.InitMessage + (*SubscriberToSubCoordinatorRequest_InitMessage)(nil), // 47: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage + (*SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage)(nil), // 48: messaging_pb.SubscriberToSubCoordinatorRequest.AckUnAssignmentMessage + (*SubscriberToSubCoordinatorRequest_AckAssignmentMessage)(nil), // 49: messaging_pb.SubscriberToSubCoordinatorRequest.AckAssignmentMessage + (*SubscriberToSubCoordinatorResponse_Assignment)(nil), // 50: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment + (*SubscriberToSubCoordinatorResponse_UnAssignment)(nil), // 51: messaging_pb.SubscriberToSubCoordinatorResponse.UnAssignment + (*PublishMessageRequest_InitMessage)(nil), // 52: messaging_pb.PublishMessageRequest.InitMessage + (*PublishFollowMeRequest_InitMessage)(nil), // 53: messaging_pb.PublishFollowMeRequest.InitMessage + (*PublishFollowMeRequest_FlushMessage)(nil), // 54: messaging_pb.PublishFollowMeRequest.FlushMessage + (*PublishFollowMeRequest_CloseMessage)(nil), // 55: messaging_pb.PublishFollowMeRequest.CloseMessage + (*SubscribeMessageRequest_InitMessage)(nil), // 56: messaging_pb.SubscribeMessageRequest.InitMessage + (*SubscribeMessageRequest_AckMessage)(nil), // 57: messaging_pb.SubscribeMessageRequest.AckMessage + (*SubscribeMessageResponse_SubscribeCtrlMessage)(nil), // 58: messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessage + (*SubscribeFollowMeRequest_InitMessage)(nil), // 59: messaging_pb.SubscribeFollowMeRequest.InitMessage + (*SubscribeFollowMeRequest_AckMessage)(nil), // 60: messaging_pb.SubscribeFollowMeRequest.AckMessage + (*SubscribeFollowMeRequest_CloseMessage)(nil), // 61: messaging_pb.SubscribeFollowMeRequest.CloseMessage + (*schema_pb.Topic)(nil), // 62: schema_pb.Topic + (*schema_pb.Partition)(nil), // 63: schema_pb.Partition + (*schema_pb.RecordType)(nil), // 64: schema_pb.RecordType + (*schema_pb.PartitionOffset)(nil), // 65: schema_pb.PartitionOffset + (schema_pb.OffsetType)(0), // 66: schema_pb.OffsetType } var file_mq_broker_proto_depIdxs = []int32{ - 42, // 0: messaging_pb.BrokerStats.stats:type_name -> messaging_pb.BrokerStats.StatsEntry - 59, // 1: messaging_pb.TopicPartitionStats.topic:type_name -> schema_pb.Topic - 60, // 2: messaging_pb.TopicPartitionStats.partition:type_name -> schema_pb.Partition - 43, // 3: messaging_pb.PublisherToPubBalancerRequest.init:type_name -> messaging_pb.PublisherToPubBalancerRequest.InitMessage + 45, // 0: messaging_pb.BrokerStats.stats:type_name -> messaging_pb.BrokerStats.StatsEntry + 62, // 1: messaging_pb.TopicPartitionStats.topic:type_name -> schema_pb.Topic + 63, // 2: messaging_pb.TopicPartitionStats.partition:type_name -> schema_pb.Partition + 46, // 3: messaging_pb.PublisherToPubBalancerRequest.init:type_name -> messaging_pb.PublisherToPubBalancerRequest.InitMessage 2, // 4: messaging_pb.PublisherToPubBalancerRequest.stats:type_name -> messaging_pb.BrokerStats - 59, // 5: messaging_pb.ConfigureTopicRequest.topic:type_name -> schema_pb.Topic - 61, // 6: messaging_pb.ConfigureTopicRequest.record_type:type_name -> schema_pb.RecordType + 62, // 5: messaging_pb.ConfigureTopicRequest.topic:type_name -> schema_pb.Topic + 64, // 6: messaging_pb.ConfigureTopicRequest.record_type:type_name -> schema_pb.RecordType 8, // 7: messaging_pb.ConfigureTopicRequest.retention:type_name -> messaging_pb.TopicRetention 15, // 8: messaging_pb.ConfigureTopicResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment - 61, // 9: messaging_pb.ConfigureTopicResponse.record_type:type_name -> schema_pb.RecordType + 64, // 9: messaging_pb.ConfigureTopicResponse.record_type:type_name -> schema_pb.RecordType 8, // 10: messaging_pb.ConfigureTopicResponse.retention:type_name -> messaging_pb.TopicRetention - 59, // 11: messaging_pb.ListTopicsResponse.topics:type_name -> schema_pb.Topic - 59, // 12: messaging_pb.LookupTopicBrokersRequest.topic:type_name -> schema_pb.Topic - 59, // 13: messaging_pb.LookupTopicBrokersResponse.topic:type_name -> schema_pb.Topic + 62, // 11: messaging_pb.ListTopicsResponse.topics:type_name -> schema_pb.Topic + 62, // 12: messaging_pb.LookupTopicBrokersRequest.topic:type_name -> schema_pb.Topic + 62, // 13: messaging_pb.LookupTopicBrokersResponse.topic:type_name -> schema_pb.Topic 15, // 14: messaging_pb.LookupTopicBrokersResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment - 60, // 15: messaging_pb.BrokerPartitionAssignment.partition:type_name -> schema_pb.Partition - 59, // 16: messaging_pb.GetTopicConfigurationRequest.topic:type_name -> schema_pb.Topic - 59, // 17: messaging_pb.GetTopicConfigurationResponse.topic:type_name -> schema_pb.Topic - 61, // 18: messaging_pb.GetTopicConfigurationResponse.record_type:type_name -> schema_pb.RecordType + 63, // 15: messaging_pb.BrokerPartitionAssignment.partition:type_name -> schema_pb.Partition + 62, // 16: messaging_pb.GetTopicConfigurationRequest.topic:type_name -> schema_pb.Topic + 62, // 17: messaging_pb.GetTopicConfigurationResponse.topic:type_name -> schema_pb.Topic + 64, // 18: messaging_pb.GetTopicConfigurationResponse.record_type:type_name -> schema_pb.RecordType 15, // 19: messaging_pb.GetTopicConfigurationResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment 8, // 20: messaging_pb.GetTopicConfigurationResponse.retention:type_name -> messaging_pb.TopicRetention - 59, // 21: messaging_pb.GetTopicPublishersRequest.topic:type_name -> schema_pb.Topic + 62, // 21: messaging_pb.GetTopicPublishersRequest.topic:type_name -> schema_pb.Topic 22, // 22: messaging_pb.GetTopicPublishersResponse.publishers:type_name -> messaging_pb.TopicPublisher - 59, // 23: messaging_pb.GetTopicSubscribersRequest.topic:type_name -> schema_pb.Topic + 62, // 23: messaging_pb.GetTopicSubscribersRequest.topic:type_name -> schema_pb.Topic 23, // 24: messaging_pb.GetTopicSubscribersResponse.subscribers:type_name -> messaging_pb.TopicSubscriber - 60, // 25: messaging_pb.TopicPublisher.partition:type_name -> schema_pb.Partition - 60, // 26: messaging_pb.TopicSubscriber.partition:type_name -> schema_pb.Partition - 59, // 27: messaging_pb.AssignTopicPartitionsRequest.topic:type_name -> schema_pb.Topic + 63, // 25: messaging_pb.TopicPublisher.partition:type_name -> schema_pb.Partition + 63, // 26: messaging_pb.TopicSubscriber.partition:type_name -> schema_pb.Partition + 62, // 27: messaging_pb.AssignTopicPartitionsRequest.topic:type_name -> schema_pb.Topic 15, // 28: messaging_pb.AssignTopicPartitionsRequest.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment - 44, // 29: messaging_pb.SubscriberToSubCoordinatorRequest.init:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage - 46, // 30: messaging_pb.SubscriberToSubCoordinatorRequest.ack_assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.AckAssignmentMessage - 45, // 31: messaging_pb.SubscriberToSubCoordinatorRequest.ack_un_assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.AckUnAssignmentMessage - 47, // 32: messaging_pb.SubscriberToSubCoordinatorResponse.assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorResponse.Assignment - 48, // 33: messaging_pb.SubscriberToSubCoordinatorResponse.un_assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorResponse.UnAssignment + 47, // 29: messaging_pb.SubscriberToSubCoordinatorRequest.init:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage + 49, // 30: messaging_pb.SubscriberToSubCoordinatorRequest.ack_assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.AckAssignmentMessage + 48, // 31: messaging_pb.SubscriberToSubCoordinatorRequest.ack_un_assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.AckUnAssignmentMessage + 50, // 32: messaging_pb.SubscriberToSubCoordinatorResponse.assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorResponse.Assignment + 51, // 33: messaging_pb.SubscriberToSubCoordinatorResponse.un_assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorResponse.UnAssignment 28, // 34: messaging_pb.DataMessage.ctrl:type_name -> messaging_pb.ControlMessage - 49, // 35: messaging_pb.PublishMessageRequest.init:type_name -> messaging_pb.PublishMessageRequest.InitMessage + 52, // 35: messaging_pb.PublishMessageRequest.init:type_name -> messaging_pb.PublishMessageRequest.InitMessage 29, // 36: messaging_pb.PublishMessageRequest.data:type_name -> messaging_pb.DataMessage - 50, // 37: messaging_pb.PublishFollowMeRequest.init:type_name -> messaging_pb.PublishFollowMeRequest.InitMessage + 53, // 37: messaging_pb.PublishFollowMeRequest.init:type_name -> messaging_pb.PublishFollowMeRequest.InitMessage 29, // 38: messaging_pb.PublishFollowMeRequest.data:type_name -> messaging_pb.DataMessage - 51, // 39: messaging_pb.PublishFollowMeRequest.flush:type_name -> messaging_pb.PublishFollowMeRequest.FlushMessage - 52, // 40: messaging_pb.PublishFollowMeRequest.close:type_name -> messaging_pb.PublishFollowMeRequest.CloseMessage - 53, // 41: messaging_pb.SubscribeMessageRequest.init:type_name -> messaging_pb.SubscribeMessageRequest.InitMessage - 54, // 42: messaging_pb.SubscribeMessageRequest.ack:type_name -> messaging_pb.SubscribeMessageRequest.AckMessage - 55, // 43: messaging_pb.SubscribeMessageResponse.ctrl:type_name -> messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessage + 54, // 39: messaging_pb.PublishFollowMeRequest.flush:type_name -> messaging_pb.PublishFollowMeRequest.FlushMessage + 55, // 40: messaging_pb.PublishFollowMeRequest.close:type_name -> messaging_pb.PublishFollowMeRequest.CloseMessage + 56, // 41: messaging_pb.SubscribeMessageRequest.init:type_name -> messaging_pb.SubscribeMessageRequest.InitMessage + 57, // 42: messaging_pb.SubscribeMessageRequest.ack:type_name -> messaging_pb.SubscribeMessageRequest.AckMessage + 58, // 43: messaging_pb.SubscribeMessageResponse.ctrl:type_name -> messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessage 29, // 44: messaging_pb.SubscribeMessageResponse.data:type_name -> messaging_pb.DataMessage - 56, // 45: messaging_pb.SubscribeFollowMeRequest.init:type_name -> messaging_pb.SubscribeFollowMeRequest.InitMessage - 57, // 46: messaging_pb.SubscribeFollowMeRequest.ack:type_name -> messaging_pb.SubscribeFollowMeRequest.AckMessage - 58, // 47: messaging_pb.SubscribeFollowMeRequest.close:type_name -> messaging_pb.SubscribeFollowMeRequest.CloseMessage - 59, // 48: messaging_pb.ClosePublishersRequest.topic:type_name -> schema_pb.Topic - 59, // 49: messaging_pb.CloseSubscribersRequest.topic:type_name -> schema_pb.Topic - 3, // 50: messaging_pb.BrokerStats.StatsEntry.value:type_name -> messaging_pb.TopicPartitionStats - 59, // 51: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage.topic:type_name -> schema_pb.Topic - 60, // 52: messaging_pb.SubscriberToSubCoordinatorRequest.AckUnAssignmentMessage.partition:type_name -> schema_pb.Partition - 60, // 53: messaging_pb.SubscriberToSubCoordinatorRequest.AckAssignmentMessage.partition:type_name -> schema_pb.Partition - 15, // 54: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment.partition_assignment:type_name -> messaging_pb.BrokerPartitionAssignment - 60, // 55: messaging_pb.SubscriberToSubCoordinatorResponse.UnAssignment.partition:type_name -> schema_pb.Partition - 59, // 56: messaging_pb.PublishMessageRequest.InitMessage.topic:type_name -> schema_pb.Topic - 60, // 57: messaging_pb.PublishMessageRequest.InitMessage.partition:type_name -> schema_pb.Partition - 59, // 58: messaging_pb.PublishFollowMeRequest.InitMessage.topic:type_name -> schema_pb.Topic - 60, // 59: messaging_pb.PublishFollowMeRequest.InitMessage.partition:type_name -> schema_pb.Partition - 59, // 60: messaging_pb.SubscribeMessageRequest.InitMessage.topic:type_name -> schema_pb.Topic - 62, // 61: messaging_pb.SubscribeMessageRequest.InitMessage.partition_offset:type_name -> schema_pb.PartitionOffset - 63, // 62: messaging_pb.SubscribeMessageRequest.InitMessage.offset_type:type_name -> schema_pb.OffsetType - 59, // 63: messaging_pb.SubscribeFollowMeRequest.InitMessage.topic:type_name -> schema_pb.Topic - 60, // 64: messaging_pb.SubscribeFollowMeRequest.InitMessage.partition:type_name -> schema_pb.Partition - 0, // 65: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest - 4, // 66: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:input_type -> messaging_pb.PublisherToPubBalancerRequest - 6, // 67: messaging_pb.SeaweedMessaging.BalanceTopics:input_type -> messaging_pb.BalanceTopicsRequest - 11, // 68: messaging_pb.SeaweedMessaging.ListTopics:input_type -> messaging_pb.ListTopicsRequest - 9, // 69: messaging_pb.SeaweedMessaging.ConfigureTopic:input_type -> messaging_pb.ConfigureTopicRequest - 13, // 70: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest - 16, // 71: messaging_pb.SeaweedMessaging.GetTopicConfiguration:input_type -> messaging_pb.GetTopicConfigurationRequest - 18, // 72: messaging_pb.SeaweedMessaging.GetTopicPublishers:input_type -> messaging_pb.GetTopicPublishersRequest - 20, // 73: messaging_pb.SeaweedMessaging.GetTopicSubscribers:input_type -> messaging_pb.GetTopicSubscribersRequest - 24, // 74: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest - 38, // 75: messaging_pb.SeaweedMessaging.ClosePublishers:input_type -> messaging_pb.ClosePublishersRequest - 40, // 76: messaging_pb.SeaweedMessaging.CloseSubscribers:input_type -> messaging_pb.CloseSubscribersRequest - 26, // 77: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:input_type -> messaging_pb.SubscriberToSubCoordinatorRequest - 30, // 78: messaging_pb.SeaweedMessaging.PublishMessage:input_type -> messaging_pb.PublishMessageRequest - 34, // 79: messaging_pb.SeaweedMessaging.SubscribeMessage:input_type -> messaging_pb.SubscribeMessageRequest - 32, // 80: messaging_pb.SeaweedMessaging.PublishFollowMe:input_type -> messaging_pb.PublishFollowMeRequest - 36, // 81: messaging_pb.SeaweedMessaging.SubscribeFollowMe:input_type -> messaging_pb.SubscribeFollowMeRequest - 1, // 82: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse - 5, // 83: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:output_type -> messaging_pb.PublisherToPubBalancerResponse - 7, // 84: messaging_pb.SeaweedMessaging.BalanceTopics:output_type -> messaging_pb.BalanceTopicsResponse - 12, // 85: messaging_pb.SeaweedMessaging.ListTopics:output_type -> messaging_pb.ListTopicsResponse - 10, // 86: messaging_pb.SeaweedMessaging.ConfigureTopic:output_type -> messaging_pb.ConfigureTopicResponse - 14, // 87: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse - 17, // 88: messaging_pb.SeaweedMessaging.GetTopicConfiguration:output_type -> messaging_pb.GetTopicConfigurationResponse - 19, // 89: messaging_pb.SeaweedMessaging.GetTopicPublishers:output_type -> messaging_pb.GetTopicPublishersResponse - 21, // 90: messaging_pb.SeaweedMessaging.GetTopicSubscribers:output_type -> messaging_pb.GetTopicSubscribersResponse - 25, // 91: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse - 39, // 92: messaging_pb.SeaweedMessaging.ClosePublishers:output_type -> messaging_pb.ClosePublishersResponse - 41, // 93: messaging_pb.SeaweedMessaging.CloseSubscribers:output_type -> messaging_pb.CloseSubscribersResponse - 27, // 94: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:output_type -> messaging_pb.SubscriberToSubCoordinatorResponse - 31, // 95: messaging_pb.SeaweedMessaging.PublishMessage:output_type -> messaging_pb.PublishMessageResponse - 35, // 96: messaging_pb.SeaweedMessaging.SubscribeMessage:output_type -> messaging_pb.SubscribeMessageResponse - 33, // 97: messaging_pb.SeaweedMessaging.PublishFollowMe:output_type -> messaging_pb.PublishFollowMeResponse - 37, // 98: messaging_pb.SeaweedMessaging.SubscribeFollowMe:output_type -> messaging_pb.SubscribeFollowMeResponse - 82, // [82:99] is the sub-list for method output_type - 65, // [65:82] is the sub-list for method input_type - 65, // [65:65] is the sub-list for extension type_name - 65, // [65:65] is the sub-list for extension extendee - 0, // [0:65] is the sub-list for field type_name + 59, // 45: messaging_pb.SubscribeFollowMeRequest.init:type_name -> messaging_pb.SubscribeFollowMeRequest.InitMessage + 60, // 46: messaging_pb.SubscribeFollowMeRequest.ack:type_name -> messaging_pb.SubscribeFollowMeRequest.AckMessage + 61, // 47: messaging_pb.SubscribeFollowMeRequest.close:type_name -> messaging_pb.SubscribeFollowMeRequest.CloseMessage + 62, // 48: messaging_pb.ClosePublishersRequest.topic:type_name -> schema_pb.Topic + 62, // 49: messaging_pb.CloseSubscribersRequest.topic:type_name -> schema_pb.Topic + 62, // 50: messaging_pb.GetUnflushedMessagesRequest.topic:type_name -> schema_pb.Topic + 63, // 51: messaging_pb.GetUnflushedMessagesRequest.partition:type_name -> schema_pb.Partition + 44, // 52: messaging_pb.GetUnflushedMessagesResponse.message:type_name -> messaging_pb.LogEntry + 3, // 53: messaging_pb.BrokerStats.StatsEntry.value:type_name -> messaging_pb.TopicPartitionStats + 62, // 54: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage.topic:type_name -> schema_pb.Topic + 63, // 55: messaging_pb.SubscriberToSubCoordinatorRequest.AckUnAssignmentMessage.partition:type_name -> schema_pb.Partition + 63, // 56: messaging_pb.SubscriberToSubCoordinatorRequest.AckAssignmentMessage.partition:type_name -> schema_pb.Partition + 15, // 57: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment.partition_assignment:type_name -> messaging_pb.BrokerPartitionAssignment + 63, // 58: messaging_pb.SubscriberToSubCoordinatorResponse.UnAssignment.partition:type_name -> schema_pb.Partition + 62, // 59: messaging_pb.PublishMessageRequest.InitMessage.topic:type_name -> schema_pb.Topic + 63, // 60: messaging_pb.PublishMessageRequest.InitMessage.partition:type_name -> schema_pb.Partition + 62, // 61: messaging_pb.PublishFollowMeRequest.InitMessage.topic:type_name -> schema_pb.Topic + 63, // 62: messaging_pb.PublishFollowMeRequest.InitMessage.partition:type_name -> schema_pb.Partition + 62, // 63: messaging_pb.SubscribeMessageRequest.InitMessage.topic:type_name -> schema_pb.Topic + 65, // 64: messaging_pb.SubscribeMessageRequest.InitMessage.partition_offset:type_name -> schema_pb.PartitionOffset + 66, // 65: messaging_pb.SubscribeMessageRequest.InitMessage.offset_type:type_name -> schema_pb.OffsetType + 62, // 66: messaging_pb.SubscribeFollowMeRequest.InitMessage.topic:type_name -> schema_pb.Topic + 63, // 67: messaging_pb.SubscribeFollowMeRequest.InitMessage.partition:type_name -> schema_pb.Partition + 0, // 68: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest + 4, // 69: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:input_type -> messaging_pb.PublisherToPubBalancerRequest + 6, // 70: messaging_pb.SeaweedMessaging.BalanceTopics:input_type -> messaging_pb.BalanceTopicsRequest + 11, // 71: messaging_pb.SeaweedMessaging.ListTopics:input_type -> messaging_pb.ListTopicsRequest + 9, // 72: messaging_pb.SeaweedMessaging.ConfigureTopic:input_type -> messaging_pb.ConfigureTopicRequest + 13, // 73: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest + 16, // 74: messaging_pb.SeaweedMessaging.GetTopicConfiguration:input_type -> messaging_pb.GetTopicConfigurationRequest + 18, // 75: messaging_pb.SeaweedMessaging.GetTopicPublishers:input_type -> messaging_pb.GetTopicPublishersRequest + 20, // 76: messaging_pb.SeaweedMessaging.GetTopicSubscribers:input_type -> messaging_pb.GetTopicSubscribersRequest + 24, // 77: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest + 38, // 78: messaging_pb.SeaweedMessaging.ClosePublishers:input_type -> messaging_pb.ClosePublishersRequest + 40, // 79: messaging_pb.SeaweedMessaging.CloseSubscribers:input_type -> messaging_pb.CloseSubscribersRequest + 26, // 80: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:input_type -> messaging_pb.SubscriberToSubCoordinatorRequest + 30, // 81: messaging_pb.SeaweedMessaging.PublishMessage:input_type -> messaging_pb.PublishMessageRequest + 34, // 82: messaging_pb.SeaweedMessaging.SubscribeMessage:input_type -> messaging_pb.SubscribeMessageRequest + 32, // 83: messaging_pb.SeaweedMessaging.PublishFollowMe:input_type -> messaging_pb.PublishFollowMeRequest + 36, // 84: messaging_pb.SeaweedMessaging.SubscribeFollowMe:input_type -> messaging_pb.SubscribeFollowMeRequest + 42, // 85: messaging_pb.SeaweedMessaging.GetUnflushedMessages:input_type -> messaging_pb.GetUnflushedMessagesRequest + 1, // 86: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse + 5, // 87: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:output_type -> messaging_pb.PublisherToPubBalancerResponse + 7, // 88: messaging_pb.SeaweedMessaging.BalanceTopics:output_type -> messaging_pb.BalanceTopicsResponse + 12, // 89: messaging_pb.SeaweedMessaging.ListTopics:output_type -> messaging_pb.ListTopicsResponse + 10, // 90: messaging_pb.SeaweedMessaging.ConfigureTopic:output_type -> messaging_pb.ConfigureTopicResponse + 14, // 91: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse + 17, // 92: messaging_pb.SeaweedMessaging.GetTopicConfiguration:output_type -> messaging_pb.GetTopicConfigurationResponse + 19, // 93: messaging_pb.SeaweedMessaging.GetTopicPublishers:output_type -> messaging_pb.GetTopicPublishersResponse + 21, // 94: messaging_pb.SeaweedMessaging.GetTopicSubscribers:output_type -> messaging_pb.GetTopicSubscribersResponse + 25, // 95: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse + 39, // 96: messaging_pb.SeaweedMessaging.ClosePublishers:output_type -> messaging_pb.ClosePublishersResponse + 41, // 97: messaging_pb.SeaweedMessaging.CloseSubscribers:output_type -> messaging_pb.CloseSubscribersResponse + 27, // 98: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:output_type -> messaging_pb.SubscriberToSubCoordinatorResponse + 31, // 99: messaging_pb.SeaweedMessaging.PublishMessage:output_type -> messaging_pb.PublishMessageResponse + 35, // 100: messaging_pb.SeaweedMessaging.SubscribeMessage:output_type -> messaging_pb.SubscribeMessageResponse + 33, // 101: messaging_pb.SeaweedMessaging.PublishFollowMe:output_type -> messaging_pb.PublishFollowMeResponse + 37, // 102: messaging_pb.SeaweedMessaging.SubscribeFollowMe:output_type -> messaging_pb.SubscribeFollowMeResponse + 43, // 103: messaging_pb.SeaweedMessaging.GetUnflushedMessages:output_type -> messaging_pb.GetUnflushedMessagesResponse + 86, // [86:104] is the sub-list for method output_type + 68, // [68:86] is the sub-list for method input_type + 68, // [68:68] is the sub-list for extension type_name + 68, // [68:68] is the sub-list for extension extendee + 0, // [0:68] is the sub-list for field type_name } func init() { file_mq_broker_proto_init() } @@ -3918,13 +4168,17 @@ func file_mq_broker_proto_init() { (*SubscribeFollowMeRequest_Ack)(nil), (*SubscribeFollowMeRequest_Close)(nil), } + file_mq_broker_proto_msgTypes[42].OneofWrappers = []any{ + (*GetUnflushedMessagesRequest_StartTimeNs)(nil), + (*GetUnflushedMessagesRequest_StartBufferIndex)(nil), + } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_mq_broker_proto_rawDesc), len(file_mq_broker_proto_rawDesc)), NumEnums: 0, - NumMessages: 59, + NumMessages: 62, NumExtensions: 0, NumServices: 1, }, diff --git a/weed/pb/mq_pb/mq_broker_grpc.pb.go b/weed/pb/mq_pb/mq_broker_grpc.pb.go index 5241861bc..3a6c6dc59 100644 --- a/weed/pb/mq_pb/mq_broker_grpc.pb.go +++ b/weed/pb/mq_pb/mq_broker_grpc.pb.go @@ -36,6 +36,7 @@ const ( SeaweedMessaging_SubscribeMessage_FullMethodName = "/messaging_pb.SeaweedMessaging/SubscribeMessage" SeaweedMessaging_PublishFollowMe_FullMethodName = "/messaging_pb.SeaweedMessaging/PublishFollowMe" SeaweedMessaging_SubscribeFollowMe_FullMethodName = "/messaging_pb.SeaweedMessaging/SubscribeFollowMe" + SeaweedMessaging_GetUnflushedMessages_FullMethodName = "/messaging_pb.SeaweedMessaging/GetUnflushedMessages" ) // SeaweedMessagingClient is the client API for SeaweedMessaging service. @@ -66,6 +67,8 @@ type SeaweedMessagingClient interface { // The lead broker asks a follower broker to follow itself PublishFollowMe(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[PublishFollowMeRequest, PublishFollowMeResponse], error) SubscribeFollowMe(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[SubscribeFollowMeRequest, SubscribeFollowMeResponse], error) + // SQL query support - get unflushed messages from broker's in-memory buffer (streaming) + GetUnflushedMessages(ctx context.Context, in *GetUnflushedMessagesRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[GetUnflushedMessagesResponse], error) } type seaweedMessagingClient struct { @@ -264,6 +267,25 @@ func (c *seaweedMessagingClient) SubscribeFollowMe(ctx context.Context, opts ... // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type SeaweedMessaging_SubscribeFollowMeClient = grpc.ClientStreamingClient[SubscribeFollowMeRequest, SubscribeFollowMeResponse] +func (c *seaweedMessagingClient) GetUnflushedMessages(ctx context.Context, in *GetUnflushedMessagesRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[GetUnflushedMessagesResponse], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[6], SeaweedMessaging_GetUnflushedMessages_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[GetUnflushedMessagesRequest, GetUnflushedMessagesResponse]{ClientStream: stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type SeaweedMessaging_GetUnflushedMessagesClient = grpc.ServerStreamingClient[GetUnflushedMessagesResponse] + // SeaweedMessagingServer is the server API for SeaweedMessaging service. // All implementations must embed UnimplementedSeaweedMessagingServer // for forward compatibility. @@ -292,6 +314,8 @@ type SeaweedMessagingServer interface { // The lead broker asks a follower broker to follow itself PublishFollowMe(grpc.BidiStreamingServer[PublishFollowMeRequest, PublishFollowMeResponse]) error SubscribeFollowMe(grpc.ClientStreamingServer[SubscribeFollowMeRequest, SubscribeFollowMeResponse]) error + // SQL query support - get unflushed messages from broker's in-memory buffer (streaming) + GetUnflushedMessages(*GetUnflushedMessagesRequest, grpc.ServerStreamingServer[GetUnflushedMessagesResponse]) error mustEmbedUnimplementedSeaweedMessagingServer() } @@ -353,6 +377,9 @@ func (UnimplementedSeaweedMessagingServer) PublishFollowMe(grpc.BidiStreamingSer func (UnimplementedSeaweedMessagingServer) SubscribeFollowMe(grpc.ClientStreamingServer[SubscribeFollowMeRequest, SubscribeFollowMeResponse]) error { return status.Errorf(codes.Unimplemented, "method SubscribeFollowMe not implemented") } +func (UnimplementedSeaweedMessagingServer) GetUnflushedMessages(*GetUnflushedMessagesRequest, grpc.ServerStreamingServer[GetUnflushedMessagesResponse]) error { + return status.Errorf(codes.Unimplemented, "method GetUnflushedMessages not implemented") +} func (UnimplementedSeaweedMessagingServer) mustEmbedUnimplementedSeaweedMessagingServer() {} func (UnimplementedSeaweedMessagingServer) testEmbeddedByValue() {} @@ -614,6 +641,17 @@ func _SeaweedMessaging_SubscribeFollowMe_Handler(srv interface{}, stream grpc.Se // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type SeaweedMessaging_SubscribeFollowMeServer = grpc.ClientStreamingServer[SubscribeFollowMeRequest, SubscribeFollowMeResponse] +func _SeaweedMessaging_GetUnflushedMessages_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(GetUnflushedMessagesRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(SeaweedMessagingServer).GetUnflushedMessages(m, &grpc.GenericServerStream[GetUnflushedMessagesRequest, GetUnflushedMessagesResponse]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type SeaweedMessaging_GetUnflushedMessagesServer = grpc.ServerStreamingServer[GetUnflushedMessagesResponse] + // SeaweedMessaging_ServiceDesc is the grpc.ServiceDesc for SeaweedMessaging service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -702,6 +740,11 @@ var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{ Handler: _SeaweedMessaging_SubscribeFollowMe_Handler, ClientStreams: true, }, + { + StreamName: "GetUnflushedMessages", + Handler: _SeaweedMessaging_GetUnflushedMessages_Handler, + ServerStreams: true, + }, }, Metadata: "mq_broker.proto", } diff --git a/weed/query/engine/broker_client.go b/weed/query/engine/broker_client.go index d6216a04e..d016169a2 100644 --- a/weed/query/engine/broker_client.go +++ b/weed/query/engine/broker_client.go @@ -427,3 +427,86 @@ func (c *BrokerClient) ListTopicPartitions(ctx context.Context, namespace, topic return partitions, nil } + +// GetUnflushedMessages returns only messages that haven't been flushed to disk yet +// Uses buffer_start metadata to determine what data has been persisted vs still in-memory +// This prevents double-counting when combining with disk-based data +func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topicName string, partition topic.Partition, startTimeNs int64) ([]*filer_pb.LogEntry, error) { + // Step 1: Find the broker that hosts this partition + if err := c.findBrokerBalancer(); err != nil { + // Return empty slice if we can't find broker - prevents double-counting + return []*filer_pb.LogEntry{}, nil + } + + // Step 2: Connect to broker and call the GetUnflushedMessages gRPC method + conn, err := grpc.Dial(c.brokerAddress, c.grpcDialOption) + if err != nil { + // Return empty slice if connection fails - prevents double-counting + return []*filer_pb.LogEntry{}, nil + } + defer conn.Close() + + client := mq_pb.NewSeaweedMessagingClient(conn) + + // Step 3: Prepare the request using oneof start_filter (timestamp-based) + request := &mq_pb.GetUnflushedMessagesRequest{ + Topic: &schema_pb.Topic{ + Namespace: namespace, + Name: topicName, + }, + Partition: &schema_pb.Partition{ + RingSize: partition.RingSize, + RangeStart: partition.RangeStart, + RangeStop: partition.RangeStop, + UnixTimeNs: partition.UnixTimeNs, + }, + StartFilter: &mq_pb.GetUnflushedMessagesRequest_StartTimeNs{ + StartTimeNs: startTimeNs, + }, + // TODO: Could use buffer index filtering for more precision: + // StartFilter: &mq_pb.GetUnflushedMessagesRequest_StartBufferIndex{ + // StartBufferIndex: latestBufferIndex, + // }, + } + + // Step 4: Call the broker streaming API + stream, err := client.GetUnflushedMessages(ctx, request) + if err != nil { + // Return empty slice if gRPC call fails - prevents double-counting + return []*filer_pb.LogEntry{}, nil + } + + // Step 5: Receive streaming responses + var logEntries []*filer_pb.LogEntry + for { + response, err := stream.Recv() + if err != nil { + // End of stream or error - return what we have to prevent double-counting + break + } + + // Handle error messages + if response.Error != "" { + // Log the error but return empty slice - prevents double-counting + // (In debug mode, this would be visible) + return []*filer_pb.LogEntry{}, nil + } + + // Check for end of stream + if response.EndOfStream { + break + } + + // Convert and collect the message + if response.Message != nil { + logEntries = append(logEntries, &filer_pb.LogEntry{ + TsNs: response.Message.TsNs, + Key: response.Message.Key, + Data: response.Message.Data, + PartitionKeyHash: int32(response.Message.PartitionKeyHash), // Convert uint32 to int32 + }) + } + } + + return logEntries, nil +} diff --git a/weed/query/engine/catalog.go b/weed/query/engine/catalog.go index 1dc2ed652..4e21a54fc 100644 --- a/weed/query/engine/catalog.go +++ b/weed/query/engine/catalog.go @@ -7,6 +7,7 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/mq/schema" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) @@ -20,6 +21,9 @@ type BrokerClientInterface interface { GetFilerClient() (filer_pb.FilerClient, error) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error DeleteTopic(ctx context.Context, namespace, topicName string) error + // GetUnflushedMessages returns only messages that haven't been flushed to disk yet + // This prevents double-counting when combining with disk-based data + GetUnflushedMessages(ctx context.Context, namespace, topicName string, partition topic.Partition, startTimeNs int64) ([]*filer_pb.LogEntry, error) } // SchemaCatalog manages the mapping between MQ topics and SQL tables diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go index b0d9d8c9e..73d8efde4 100644 --- a/weed/query/engine/hybrid_message_scanner.go +++ b/weed/query/engine/hybrid_message_scanner.go @@ -165,7 +165,7 @@ func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOpt return results, nil } -// scanUnflushedData queries brokers for unflushed in-memory data +// scanUnflushedData queries brokers for unflushed in-memory data using buffer_start deduplication func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) { var results []HybridScanResult @@ -174,31 +174,33 @@ func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partitio return results, nil } - // Get broker address for this partition - // TODO: Implement proper broker discovery for partition - // For now, assume broker client knows how to reach the right broker - - // Create a temporary slice to collect unflushed messages - unflushedMessages := make([]*mq_pb.DataMessage, 0) - - // We need to call the broker to get unflushed data - // For now, we'll implement this as a best-effort approach - // In a full implementation, this would require a new gRPC method on the broker - // TODO: Implement actual broker gRPC call to get unflushed data + // Step 1: Get unflushed data from broker using our new interface method + // This method uses buffer_start metadata to avoid double-counting + unflushedEntries, err := hms.brokerClient.GetUnflushedMessages(ctx, hms.topic.Namespace, hms.topic.Name, partition, options.StartTimeNs) + if err != nil { + // Log error but don't fail the query - continue with disk data only + if isDebugMode(ctx) { + fmt.Printf("Debug: Failed to get unflushed messages: %v\n", err) + } + return results, nil + } - // Convert unflushed messages to HybridScanResult format - for _, msg := range unflushedMessages { + // Step 2: Process unflushed entries (already deduplicated by broker) + for _, logEntry := range unflushedEntries { // Skip messages outside time range - if options.StartTimeNs > 0 && msg.TsNs < options.StartTimeNs { + if options.StartTimeNs > 0 && logEntry.TsNs < options.StartTimeNs { continue } - if options.StopTimeNs > 0 && msg.TsNs > options.StopTimeNs { + if options.StopTimeNs > 0 && logEntry.TsNs > options.StopTimeNs { continue } - // Convert DataMessage to RecordValue format - recordValue, _, err := hms.convertDataMessageToRecord(msg) + // Convert LogEntry to RecordValue format (same as disk data) + recordValue, _, err := hms.convertLogEntryToRecordValue(logEntry) if err != nil { + if isDebugMode(ctx) { + fmt.Printf("Debug: Failed to convert unflushed log entry: %v\n", err) + } continue // Skip malformed messages } @@ -207,12 +209,34 @@ func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partitio continue } - // Convert to HybridScanResult + // Extract system columns for result + timestamp := recordValue.Fields[SW_COLUMN_NAME_TS].GetInt64Value() + key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue() + + // Apply column projection + values := make(map[string]*schema_pb.Value) + if len(options.Columns) == 0 { + // Select all columns (excluding system columns from user view) + for name, value := range recordValue.Fields { + if name != SW_COLUMN_NAME_TS && name != SW_COLUMN_NAME_KEY { + values[name] = value + } + } + } else { + // Select specified columns only + for _, columnName := range options.Columns { + if value, exists := recordValue.Fields[columnName]; exists { + values[columnName] = value + } + } + } + + // Create result with proper source tagging result := HybridScanResult{ - Values: recordValue.Fields, - Timestamp: msg.TsNs, - Key: msg.Key, - Source: "in_memory_broker", + Values: values, + Timestamp: timestamp, + Key: key, + Source: "in_memory_broker", // Tag for debugging/analysis } results = append(results, result) @@ -223,6 +247,10 @@ func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partitio } } + if isDebugMode(ctx) { + fmt.Printf("Debug: Retrieved %d unflushed messages from broker\n", len(results)) + } + return results, nil } diff --git a/weed/query/engine/mocks_test.go b/weed/query/engine/mocks_test.go index 08246865e..10a01f07c 100644 --- a/weed/query/engine/mocks_test.go +++ b/weed/query/engine/mocks_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" @@ -221,3 +222,24 @@ func (m *MockBrokerClient) DeleteTopic(ctx context.Context, namespace, topicName return nil } + +// GetUnflushedMessages returns mock unflushed data for testing +// Always returns empty slice to simulate safe deduplication behavior +func (m *MockBrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topicName string, partition topic.Partition, startTimeNs int64) ([]*filer_pb.LogEntry, error) { + if m.shouldFail { + return nil, fmt.Errorf("mock broker failed to get unflushed messages: %s", m.failMessage) + } + + // For testing, return empty slice to simulate: + // 1. No unflushed data available + // 2. Safe deduplication behavior (prevents double-counting) + // 3. Successful broker communication + // + // In a real implementation, this would: + // - Connect to actual broker + // - Access LocalPartition's LogBuffer + // - Use buffer_start metadata for deduplication + // - Return only truly unflushed messages + + return []*filer_pb.LogEntry{}, nil +}