syntax = "proto3";

package messaging_pb;

import "mq_schema.proto";

option go_package = "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb";
option java_package = "seaweedfs.mq";
option java_outer_classname = "MessageQueueProto";

//////////////////////////////////////////////////

// gRPC service exposed by the message queue brokers. It groups control-plane
// RPCs (leader lookup, balancing, topic configuration, partition assignment)
// and data-plane RPCs (publish / subscribe streams).
service SeaweedMessaging {
  // control plane
  rpc FindBrokerLeader (FindBrokerLeaderRequest) returns (FindBrokerLeaderResponse) {}

  // control plane for balancer
  rpc PublisherToPubBalancer (stream PublisherToPubBalancerRequest) returns (stream PublisherToPubBalancerResponse) {}
  rpc BalanceTopics (BalanceTopicsRequest) returns (BalanceTopicsResponse) {}

  // control plane for topic partitions
  rpc ListTopics (ListTopicsRequest) returns (ListTopicsResponse) {}
  rpc ConfigureTopic (ConfigureTopicRequest) returns (ConfigureTopicResponse) {}
  rpc LookupTopicBrokers (LookupTopicBrokersRequest) returns (LookupTopicBrokersResponse) {}

  // invoked by the balancer, running on each broker
  rpc AssignTopicPartitions (AssignTopicPartitionsRequest) returns (AssignTopicPartitionsResponse) {}
  rpc ClosePublishers (ClosePublishersRequest) returns (ClosePublishersResponse) {}
  rpc CloseSubscribers (CloseSubscribersRequest) returns (CloseSubscribersResponse) {}

  // subscriber connects to broker balancer, which coordinates with the subscribers
  rpc SubscriberToSubCoordinator (stream SubscriberToSubCoordinatorRequest) returns (stream SubscriberToSubCoordinatorResponse) {}

  // data plane for each topic partition
  rpc PublishMessage (stream PublishMessageRequest) returns (stream PublishMessageResponse) {}
  rpc SubscribeMessage (stream SubscribeMessageRequest) returns (stream SubscribeMessageResponse) {}

  // The lead broker asks a follower broker to follow itself
  rpc PublishFollowMe (stream PublishFollowMeRequest) returns (stream PublishFollowMeResponse) {}
  // Note: the request is a client stream, the response is a single message.
  rpc SubscribeFollowMe (stream SubscribeFollowMeRequest) returns (SubscribeFollowMeResponse) {}
}

//////////////////////////////////////////////////

// Request for the FindBrokerLeader RPC.
message FindBrokerLeaderRequest {
  string filer_group = 1;
}

// Response for the FindBrokerLeader RPC.
message FindBrokerLeaderResponse {
  string broker = 1;
}

//////////////////////////////////////////////////

// Broker statistics; carried as the `stats` variant of
// PublisherToPubBalancerRequest.
message BrokerStats {
  int32 cpu_usage_percent = 1;
  // Statistics entries, one TopicPartitionStats per map key.
  // NOTE(review): the key/value type arguments were missing from the reviewed
  // text and have been restored as <string, TopicPartitionStats>; confirm
  // against the generated code before merging.
  map<string, TopicPartitionStats> stats = 2;
}

// Publisher/subscriber counts and follower for a single topic partition.
message TopicPartitionStats {
  schema_pb.Topic topic = 1;
  schema_pb.Partition partition = 2;
  int32 publisher_count = 3;
  int32 subscriber_count = 4;
  string follower = 5;
}

// Stream element sent on the PublisherToPubBalancer RPC. Exactly one of the
// `message` variants is set per element.
message PublisherToPubBalancerRequest {
  message InitMessage {
    string broker = 1;
  }
  oneof message {
    InitMessage init = 1;
    BrokerStats stats = 2;
  }
}

// Stream element returned on the PublisherToPubBalancer RPC (currently empty).
message PublisherToPubBalancerResponse {
}

// Request for the BalanceTopics RPC (currently empty).
message BalanceTopicsRequest {
}

// Response for the BalanceTopics RPC (currently empty).
message BalanceTopicsResponse {
}

//////////////////////////////////////////////////

// Request for the ConfigureTopic RPC.
message ConfigureTopicRequest {
  schema_pb.Topic topic = 1;
  int32 partition_count = 2;
  schema_pb.RecordType record_type = 3;
}

// Response for the ConfigureTopic RPC.
// NOTE(review): field number 1 is not used in this message; confirm whether
// it should be declared `reserved`.
message ConfigureTopicResponse {
  repeated BrokerPartitionAssignment broker_partition_assignments = 2;
  schema_pb.RecordType record_type = 3;
}

// Request for the ListTopics RPC (currently empty).
message ListTopicsRequest {
}

// Response for the ListTopics RPC.
message ListTopicsResponse {
  repeated schema_pb.Topic topics = 1;
}

// Request for the LookupTopicBrokers RPC.
message LookupTopicBrokersRequest {
  schema_pb.Topic topic = 1;
}

// Response for the LookupTopicBrokers RPC.
message LookupTopicBrokersResponse {
  schema_pb.Topic topic = 1;
  repeated BrokerPartitionAssignment broker_partition_assignments = 2;
}

// Associates one partition with its leader broker and follower broker.
message BrokerPartitionAssignment {
  schema_pb.Partition partition = 1;
  string leader_broker = 2;
  string follower_broker = 3;
}

// Request for the AssignTopicPartitions RPC.
message AssignTopicPartitionsRequest {
  schema_pb.Topic topic = 1;
  repeated BrokerPartitionAssignment broker_partition_assignments = 2;
  bool is_leader = 3;
  bool is_draining = 4;
}

// Response for the AssignTopicPartitions RPC (currently empty).
message AssignTopicPartitionsResponse {
}

// Stream element sent by a subscriber on the SubscriberToSubCoordinator RPC.
// Exactly one of the `message` variants is set per element.
message SubscriberToSubCoordinatorRequest {
  message InitMessage {
    string consumer_group = 1;
    string consumer_group_instance_id = 2;
    schema_pb.Topic topic = 3;
    // The consumer group instance will be assigned at most max_partition_count partitions.
    // If the number of partitions is less than the sum of max_partition_count,
    // the consumer group instance may be assigned partitions less than max_partition_count.
    // Default is 1.
    int32 max_partition_count = 4;
    // If consumer group instance changes, wait for rebalance_seconds before reassigning partitions
    // Exception: if adding a new consumer group instance and sum of max_partition_count equals the number of partitions,
    // the rebalance will happen immediately.
    // Default is 10 seconds.
    int32 rebalance_seconds = 5;
  }
  message AckUnAssignmentMessage {
    schema_pb.Partition partition = 1;
  }
  message AckAssignmentMessage {
    schema_pb.Partition partition = 1;
  }
  oneof message {
    InitMessage init = 1;
    AckAssignmentMessage ack_assignment = 2;
    AckUnAssignmentMessage ack_un_assignment = 3;
  }
}

// Stream element returned on the SubscriberToSubCoordinator RPC. Carries
// either an assignment or an un-assignment.
message SubscriberToSubCoordinatorResponse {
  message Assignment {
    BrokerPartitionAssignment partition_assignment = 1;
  }
  message UnAssignment {
    schema_pb.Partition partition = 1;
  }
  oneof message {
    Assignment assignment = 1;
    UnAssignment un_assignment = 2;
  }
}

//////////////////////////////////////////////////

// Control information embedded in a DataMessage via its `ctrl` field.
message ControlMessage {
  bool is_close = 1;
  string publisher_name = 2;
}

// A single key/value record with a timestamp and optional control information.
// Shared by the publish, publish-follow and subscribe streams.
message DataMessage {
  bytes key = 1;
  bytes value = 2;
  // NOTE(review): presumably a timestamp in nanoseconds, judging by the `_ns`
  // suffix; confirm the epoch with the producer code.
  int64 ts_ns = 3;
  ControlMessage ctrl = 4;
}

// Stream element sent on the PublishMessage RPC: either an init message or a
// data message.
message PublishMessageRequest {
  message InitMessage {
    schema_pb.Topic topic = 1;
    schema_pb.Partition partition = 2;
    int32 ack_interval = 3;
    string follower_broker = 4;
    // for debugging
    string publisher_name = 5;
  }
  oneof message {
    InitMessage init = 1;
    DataMessage data = 2;
  }
}

// Stream element returned on the PublishMessage RPC.
message PublishMessageResponse {
  int64 ack_sequence = 1;
  string error = 2;
  bool should_close = 3;
}

// Stream element sent on the PublishFollowMe RPC. Exactly one of the
// `message` variants is set per element.
message PublishFollowMeRequest {
  message InitMessage {
    schema_pb.Topic topic = 1;
    schema_pb.Partition partition = 2;
  }
  message FlushMessage {
    int64 ts_ns = 1;
  }
  message CloseMessage {
  }
  oneof message {
    InitMessage init = 1;
    DataMessage data = 2;
    FlushMessage flush = 3;
    CloseMessage close = 4;
  }
}

// Stream element returned on the PublishFollowMe RPC.
message PublishFollowMeResponse {
  int64 ack_ts_ns = 1;
}

// Stream element sent on the SubscribeMessage RPC: either an init message or
// an acknowledgement.
message SubscribeMessageRequest {
  message InitMessage {
    string consumer_group = 1;
    string consumer_id = 2;
    string client_id = 3;
    schema_pb.Topic topic = 4;
    schema_pb.PartitionOffset partition_offset = 5;
    string filter = 6;
    string follower_broker = 7;
    int32 sliding_window_size = 8;
  }
  message AckMessage {
    int64 sequence = 1;
    bytes key = 2;
  }
  oneof message {
    InitMessage init = 1;
    AckMessage ack = 2;
  }
}

// Stream element returned on the SubscribeMessage RPC: either a control
// message or a data message.
message SubscribeMessageResponse {
  message SubscribeCtrlMessage {
    string error = 1;
    bool is_end_of_stream = 2;
    bool is_end_of_topic = 3;
  }
  oneof message {
    SubscribeCtrlMessage ctrl = 1;
    DataMessage data = 2;
  }
}

// Stream element sent on the SubscribeFollowMe RPC. Exactly one of the
// `message` variants is set per element.
message SubscribeFollowMeRequest {
  message InitMessage {
    schema_pb.Topic topic = 1;
    schema_pb.Partition partition = 2;
    string consumer_group = 3;
  }
  message AckMessage {
    int64 ts_ns = 1;
  }
  message CloseMessage {
  }
  oneof message {
    InitMessage init = 1;
    AckMessage ack = 2;
    CloseMessage close = 3;
  }
}

// Single response returned when the SubscribeFollowMe request stream ends.
message SubscribeFollowMeResponse {
  int64 ack_ts_ns = 1;
}

// Request for the ClosePublishers RPC.
message ClosePublishersRequest {
  schema_pb.Topic topic = 1;
  int64 unix_time_ns = 2;
}

// Response for the ClosePublishers RPC (currently empty).
message ClosePublishersResponse {
}

// Request for the CloseSubscribers RPC.
message CloseSubscribersRequest {
  schema_pb.Topic topic = 1;
  int64 unix_time_ns = 2;
}

// Response for the CloseSubscribers RPC (currently empty).
message CloseSubscribersResponse {
}