syntax = "proto3";

package messaging_pb;

option go_package = "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb";
option java_package = "seaweedfs.client";
option java_outer_classname = "MessagingProto";

//////////////////////////////////////////////////

service SeaweedMessaging {

    rpc Subscribe (stream SubscriberMessage) returns (stream BrokerMessage) {
    }

    rpc Publish (stream PublishRequest) returns (stream PublishResponse) {
    }

    rpc DeleteTopic (DeleteTopicRequest) returns (DeleteTopicResponse) {
    }

    rpc ConfigureTopic (ConfigureTopicRequest) returns (ConfigureTopicResponse) {
    }

    rpc GetTopicConfiguration (GetTopicConfigurationRequest) returns (GetTopicConfigurationResponse) {
    }

    rpc FindBroker (FindBrokerRequest) returns (FindBrokerResponse) {
    }

}

//////////////////////////////////////////////////

message SubscriberMessage {
    message InitMessage {
        string namespace = 1;
        string topic = 2;
        int32 partition = 3;
        enum StartPosition {
            LATEST = 0; // Start at the newest message
            EARLIEST = 1; // Start at the oldest message
            TIMESTAMP = 2; // Start after a specified timestamp, exclusive
        }
        StartPosition startPosition = 4; // Where to begin consuming from
        int64 timestampNs = 5; // timestamp in nano seconds
        string subscriber_id = 6; // uniquely identify a subscriber to track consumption
    }
    InitMessage init = 1;
    message AckMessage {
        int64 message_id = 1;
    }
    AckMessage ack = 2;
    bool is_close = 3;
}

message Message {
    int64 event_time_ns = 1 [jstype = JS_STRING];
    bytes key = 2; // Message key
    bytes value = 3; // Message payload
    map<string, bytes> headers = 4; // Message headers
    bool is_close = 5;
}

message BrokerMessage {
    Message data = 1;
}

message PublishRequest {
    message InitMessage {
        string namespace = 1; // only needed on the initial request
        string topic = 2; // only needed on the initial request
        int32 partition = 3;
    }
    InitMessage init = 1;
    Message data = 2;
}

message PublishResponse {
    message ConfigMessage {
        int32 partition_count = 1;
    }
    ConfigMessage config = 1;
    message RedirectMessage {
        string new_broker = 1;
    }
    RedirectMessage redirect = 2;
    bool is_closed = 3;
}

message DeleteTopicRequest {
    string namespace = 1;
    string topic = 2;
}
message DeleteTopicResponse {
}

message ConfigureTopicRequest {
    string namespace = 1;
    string topic = 2;
    TopicConfiguration configuration = 3;
}
message ConfigureTopicResponse {
}

message GetTopicConfigurationRequest {
    string namespace = 1;
    string topic = 2;
}
message GetTopicConfigurationResponse {
    TopicConfiguration configuration = 1;
}

message FindBrokerRequest {
    string namespace = 1;
    string topic = 2;
    int32 parition = 3;
}

message FindBrokerResponse {
    string broker = 1;
}

message TopicConfiguration {
    int32 partition_count = 1;
    string collection = 2;
    string replication = 3;
    bool is_transient = 4;
    enum Partitioning {
        NonNullKeyHash = 0; // If not null, hash by key value. If null, round robin
        KeyHash = 1; // hash by key value
        RoundRobin = 2; // round robin pick one partition
    }
    Partitioning partitoning = 5;
}