diff --git a/weed/command/mq_broker.go b/weed/command/mq_broker.go index a5a6e3566..da7f59596 100644 --- a/weed/command/mq_broker.go +++ b/weed/command/mq_broker.go @@ -10,10 +10,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/util/grace" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/mq/broker" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -100,7 +100,7 @@ func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool { glog.Fatalf("failed to listen on grpc port %d: %v", *mqBrokerOpt.port, err) } grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker")) - messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs) + mq_pb.RegisterSeaweedMessagingServer(grpcS, qs) reflection.Register(grpcS) grpcS.Serve(grpcL) diff --git a/weed/messaging/broker/broker_append.go b/weed/mq/broker/broker_append.go similarity index 92% rename from weed/messaging/broker/broker_append.go rename to weed/mq/broker/broker_append.go index 9a31a8ac0..4f3af0ff8 100644 --- a/weed/messaging/broker/broker_append.go +++ b/weed/mq/broker/broker_append.go @@ -10,11 +10,11 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" "github.com/chrislusf/seaweedfs/weed/util" ) -func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error { +func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *mq_pb.TopicConfiguration, data []byte) error { assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data) if err2 != nil { @@ -46,7 +46,7 @@ func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messag return nil } -func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) { +func (broker *MessageBroker) assignAndUpload(topicConfig *mq_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) { var assignResult = &operation.AssignResult{} diff --git a/weed/messaging/broker/broker_grpc_server.go b/weed/mq/broker/broker_grpc_server.go similarity index 70% rename from weed/messaging/broker/broker_grpc_server.go rename to weed/mq/broker/broker_grpc_server.go index ba141fdd0..9aa9b1908 100644 --- a/weed/messaging/broker/broker_grpc_server.go +++ b/weed/mq/broker/broker_grpc_server.go @@ -6,15 +6,15 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" ) -func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messaging_pb.ConfigureTopicRequest) (*messaging_pb.ConfigureTopicResponse, error) { +func (broker *MessageBroker) ConfigureTopic(c context.Context, request *mq_pb.ConfigureTopicRequest) (*mq_pb.ConfigureTopicResponse, error) { panic("implement me") } -func (broker *MessageBroker) DeleteTopic(c context.Context, request *messaging_pb.DeleteTopicRequest) (*messaging_pb.DeleteTopicResponse, error) { - resp := &messaging_pb.DeleteTopicResponse{} +func (broker *MessageBroker) DeleteTopic(c context.Context, request *mq_pb.DeleteTopicRequest) (*mq_pb.DeleteTopicResponse, error) { + resp := &mq_pb.DeleteTopicResponse{} dir, entry := genTopicDirEntry(request.Namespace, request.Topic) if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil { return nil, err @@ -24,7 +24,7 @@ func (broker *MessageBroker) DeleteTopic(c context.Context, request *messaging_p return resp, nil } -func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) { +func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *mq_pb.GetTopicConfigurationRequest) (*mq_pb.GetTopicConfigurationResponse, error) { panic("implement me") } diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/mq/broker/broker_grpc_server_discovery.go similarity index 94% rename from weed/messaging/broker/broker_grpc_server_discovery.go rename to weed/mq/broker/broker_grpc_server_discovery.go index 5cd8edd33..0c8d70e68 100644 --- a/weed/messaging/broker/broker_grpc_server_discovery.go +++ b/weed/mq/broker/broker_grpc_server_discovery.go @@ -10,7 +10,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" ) /* @@ -26,9 +26,9 @@ If one of the pub or sub connects very late, and the system topo changed quite a */ -func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb.FindBrokerRequest) (*messaging_pb.FindBrokerResponse, error) { +func (broker *MessageBroker) FindBroker(c context.Context, request *mq_pb.FindBrokerRequest) (*mq_pb.FindBrokerResponse, error) { - t := &messaging_pb.FindBrokerResponse{} + t := &mq_pb.FindBrokerResponse{} var peers []string targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition) diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/mq/broker/broker_grpc_server_publish.go similarity index 86% rename from weed/messaging/broker/broker_grpc_server_publish.go rename to weed/mq/broker/broker_grpc_server_publish.go index 6e6b723d1..4ff9ad809 100644 --- a/weed/messaging/broker/broker_grpc_server_publish.go +++ b/weed/mq/broker/broker_grpc_server_publish.go @@ -10,10 +10,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" ) -func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_PublishServer) error { +func (broker *MessageBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error { // process initial request in, err := stream.Recv() @@ -25,12 +25,12 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis } // TODO look it up - topicConfig := &messaging_pb.TopicConfiguration{ + topicConfig := &mq_pb.TopicConfiguration{ // IsTransient: true, } // send init response - initResponse := &messaging_pb.PublishResponse{ + initResponse := &mq_pb.PublishResponse{ Config: nil, Redirect: nil, } @@ -104,7 +104,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis // send the close ack // println("server send ack closing") - if err := stream.Send(&messaging_pb.PublishResponse{IsClosed: true}); err != nil { + if err := stream.Send(&mq_pb.PublishResponse{IsClosed: true}); err != nil { glog.V(0).Infof("err sending close response: %v", err) } return nil diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/mq/broker/broker_grpc_server_subscribe.go similarity index 88% rename from weed/messaging/broker/broker_grpc_server_subscribe.go rename to weed/mq/broker/broker_grpc_server_subscribe.go index 20d529239..1a9c62d75 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/mq/broker/broker_grpc_server_subscribe.go @@ -13,10 +13,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" ) -func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_SubscribeServer) error { +func (broker *MessageBroker) Subscribe(stream mq_pb.SeaweedMessaging_SubscribeServer) error { // process initial request in, err := stream.Recv() @@ -32,7 +32,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs subscriberId := in.Init.SubscriberId // TODO look it up - topicConfig := &messaging_pb.TopicConfiguration{ + topicConfig := &mq_pb.TopicConfiguration{ // IsTransient: true, } @@ -63,17 +63,17 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs lastReadTime := time.Now() switch in.Init.StartPosition { - case messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP: + case mq_pb.SubscriberMessage_InitMessage_TIMESTAMP: lastReadTime = time.Unix(0, in.Init.TimestampNs) - case messaging_pb.SubscriberMessage_InitMessage_LATEST: - case messaging_pb.SubscriberMessage_InitMessage_EARLIEST: + case mq_pb.SubscriberMessage_InitMessage_LATEST: + case mq_pb.SubscriberMessage_InitMessage_EARLIEST: lastReadTime = time.Unix(0, 0) } // how to process each message // an error returned will end the subscription - eachMessageFn := func(m *messaging_pb.Message) error { - err := stream.Send(&messaging_pb.BrokerMessage{ + eachMessageFn := func(m *mq_pb.Message) error { + err := stream.Send(&mq_pb.BrokerMessage{ Data: m, }) if err != nil { @@ -83,9 +83,9 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs } eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error { - m := &messaging_pb.Message{} + m := &mq_pb.Message{} if err = proto.Unmarshal(logEntry.Data, m); err != nil { - glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) + glog.Errorf("unexpected unmarshal mq_pb.Message: %v", err) return err } // fmt.Printf("sending : %d bytes ts %d\n", len(m.Value), logEntry.TsNs) diff --git a/weed/messaging/broker/broker_server.go b/weed/mq/broker/broker_server.go similarity index 96% rename from weed/messaging/broker/broker_server.go rename to weed/mq/broker/broker_server.go index acf2d6d34..5aa5285c9 100644 --- a/weed/messaging/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -2,7 +2,7 @@ package broker import ( "context" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" "time" "google.golang.org/grpc" @@ -23,7 +23,7 @@ type MessageBrokerOption struct { } type MessageBroker struct { - messaging_pb.UnimplementedSeaweedMessagingServer + mq_pb.UnimplementedSeaweedMessagingServer option *MessageBrokerOption grpcDialOption grpc.DialOption topicManager *TopicManager diff --git a/weed/messaging/broker/consistent_distribution.go b/weed/mq/broker/consistent_distribution.go similarity index 100% rename from weed/messaging/broker/consistent_distribution.go rename to weed/mq/broker/consistent_distribution.go diff --git a/weed/messaging/broker/consistent_distribution_test.go b/weed/mq/broker/consistent_distribution_test.go similarity index 100% rename from weed/messaging/broker/consistent_distribution_test.go rename to weed/mq/broker/consistent_distribution_test.go diff --git a/weed/messaging/broker/topic_manager.go b/weed/mq/broker/topic_manager.go similarity index 92% rename from weed/messaging/broker/topic_manager.go rename to weed/mq/broker/topic_manager.go index c303c29b3..1acf085fa 100644 --- a/weed/messaging/broker/topic_manager.go +++ b/weed/mq/broker/topic_manager.go @@ -7,7 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" "github.com/chrislusf/seaweedfs/weed/util/log_buffer" ) @@ -46,7 +46,7 @@ func NewTopicManager(messageBroker *MessageBroker) *TopicManager { } } -func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer { +func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *mq_pb.TopicConfiguration) *log_buffer.LogBuffer { flushFn := func(startTime, stopTime time.Time, buf []byte) { @@ -75,7 +75,7 @@ func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topi return logBuffer } -func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl { +func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *mq_pb.TopicConfiguration, isPublisher bool) *TopicControl { tm.Lock() defer tm.Unlock() diff --git a/weed/messaging/msgclient/chan_config.go b/weed/mq/msgclient/chan_config.go similarity index 100% rename from weed/messaging/msgclient/chan_config.go rename to weed/mq/msgclient/chan_config.go diff --git a/weed/messaging/msgclient/chan_pub.go b/weed/mq/msgclient/chan_pub.go similarity index 78% rename from weed/messaging/msgclient/chan_pub.go rename to weed/mq/msgclient/chan_pub.go index 9bc88f7c0..f4ffe832a 100644 --- a/weed/messaging/msgclient/chan_pub.go +++ b/weed/mq/msgclient/chan_pub.go @@ -8,12 +8,12 @@ import ( "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/messaging/broker" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/mq/broker" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" ) type PubChannel struct { - client messaging_pb.SeaweedMessaging_PublishClient + client mq_pb.SeaweedMessaging_PublishClient grpcConnection *grpc.ClientConn md5hash hash.Hash } @@ -40,8 +40,8 @@ func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) { } func (pc *PubChannel) Publish(m []byte) error { - err := pc.client.Send(&messaging_pb.PublishRequest{ - Data: &messaging_pb.Message{ + err := pc.client.Send(&mq_pb.PublishRequest{ + Data: &mq_pb.Message{ Value: m, }, }) @@ -53,8 +53,8 @@ func (pc *PubChannel) Publish(m []byte) error { func (pc *PubChannel) Close() error { // println("send closing") - if err := pc.client.Send(&messaging_pb.PublishRequest{ - Data: &messaging_pb.Message{ + if err := pc.client.Send(&mq_pb.PublishRequest{ + Data: &mq_pb.Message{ IsClose: true, }, }); err != nil { diff --git a/weed/messaging/msgclient/chan_sub.go b/weed/mq/msgclient/chan_sub.go similarity index 86% rename from weed/messaging/msgclient/chan_sub.go rename to weed/mq/msgclient/chan_sub.go index 213ff4666..859b482ef 100644 --- a/weed/messaging/msgclient/chan_sub.go +++ b/weed/mq/msgclient/chan_sub.go @@ -8,13 +8,13 @@ import ( "log" "time" - "github.com/chrislusf/seaweedfs/weed/messaging/broker" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/mq/broker" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" ) type SubChannel struct { ch chan []byte - stream messaging_pb.SeaweedMessaging_SubscribeClient + stream mq_pb.SeaweedMessaging_SubscribeClient md5hash hash.Hash cancel context.CancelFunc } @@ -57,7 +57,7 @@ func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubCha continue } if resp.Data.IsClose { - t.stream.Send(&messaging_pb.SubscriberMessage{ + t.stream.Send(&mq_pb.SubscriberMessage{ IsClose: true, }) close(t.ch) diff --git a/weed/messaging/msgclient/client.go b/weed/mq/msgclient/client.go similarity index 83% rename from weed/messaging/msgclient/client.go rename to weed/mq/msgclient/client.go index 4d7ef2b8e..cc64f1acb 100644 --- a/weed/messaging/msgclient/client.go +++ b/weed/mq/msgclient/client.go @@ -7,9 +7,9 @@ import ( "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/mq/broker" "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -38,8 +38,8 @@ func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientCon } defer grpcConnection.Close() - resp, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(), - &messaging_pb.FindBrokerRequest{ + resp, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(), + &mq_pb.FindBrokerRequest{ Namespace: tp.Namespace, Topic: tp.Topic, Parition: tp.Partition, diff --git a/weed/messaging/msgclient/config.go b/weed/mq/msgclient/config.go similarity index 62% rename from weed/messaging/msgclient/config.go rename to weed/mq/msgclient/config.go index 2b9eba1a8..263ee856e 100644 --- a/weed/messaging/msgclient/config.go +++ b/weed/mq/msgclient/config.go @@ -4,19 +4,19 @@ import ( "context" "log" - "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/mq/broker" "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" ) func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error { - return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error { + return mc.withAnyBroker(func(client mq_pb.SeaweedMessagingClient) error { _, err := client.ConfigureTopic(context.Background(), - &messaging_pb.ConfigureTopicRequest{ + &mq_pb.ConfigureTopicRequest{ Namespace: tp.Namespace, Topic: tp.Topic, - Configuration: &messaging_pb.TopicConfiguration{ + Configuration: &mq_pb.TopicConfiguration{ PartitionCount: 0, Collection: "", Replication: "", @@ -31,9 +31,9 @@ func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error { func (mc *MessagingClient) DeleteTopic(namespace, topic string) error { - return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error { + return mc.withAnyBroker(func(client mq_pb.SeaweedMessagingClient) error { _, err := client.DeleteTopic(context.Background(), - &messaging_pb.DeleteTopicRequest{ + &mq_pb.DeleteTopicRequest{ Namespace: namespace, Topic: topic, }) @@ -41,7 +41,7 @@ func (mc *MessagingClient) DeleteTopic(namespace, topic string) error { }) } -func (mc *MessagingClient) withAnyBroker(fn func(client messaging_pb.SeaweedMessagingClient) error) error { +func (mc *MessagingClient) withAnyBroker(fn func(client mq_pb.SeaweedMessagingClient) error) error { var lastErr error for _, broker := range mc.bootstrapBrokers { @@ -52,7 +52,7 @@ func (mc *MessagingClient) withAnyBroker(fn func(client messaging_pb.SeaweedMess } defer grpcConnection.Close() - err = fn(messaging_pb.NewSeaweedMessagingClient(grpcConnection)) + err = fn(mq_pb.NewSeaweedMessagingClient(grpcConnection)) if err == nil { return nil } diff --git a/weed/messaging/msgclient/publisher.go b/weed/mq/msgclient/publisher.go similarity index 65% rename from weed/messaging/msgclient/publisher.go rename to weed/mq/msgclient/publisher.go index 1aa483ff8..823791d10 100644 --- a/weed/messaging/msgclient/publisher.go +++ b/weed/mq/msgclient/publisher.go @@ -6,23 +6,23 @@ import ( "github.com/OneOfOne/xxhash" "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/messaging/broker" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/mq/broker" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" ) type Publisher struct { - publishClients []messaging_pb.SeaweedMessaging_PublishClient - topicConfiguration *messaging_pb.TopicConfiguration + publishClients []mq_pb.SeaweedMessaging_PublishClient + topicConfiguration *mq_pb.TopicConfiguration messageCount uint64 publisherId string } func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) { // read topic configuration - topicConfiguration := &messaging_pb.TopicConfiguration{ + topicConfiguration := &mq_pb.TopicConfiguration{ PartitionCount: 4, } - publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount) + publishClients := make([]mq_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount) for i := 0; i < int(topicConfiguration.PartitionCount); i++ { tp := broker.TopicPartition{ Namespace: namespace, @@ -45,16 +45,16 @@ func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (* }, nil } -func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (messaging_pb.SeaweedMessaging_PublishClient, error) { +func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (mq_pb.SeaweedMessaging_PublishClient, error) { - stream, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background()) + stream, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background()) if err != nil { return nil, err } // send init message - err = stream.Send(&messaging_pb.PublishRequest{ - Init: &messaging_pb.PublishRequest_InitMessage{ + err = stream.Send(&mq_pb.PublishRequest{ + Init: &mq_pb.PublishRequest_InitMessage{ Namespace: tp.Namespace, Topic: tp.Topic, Partition: tp.Partition, @@ -95,14 +95,14 @@ func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartit } -func (p *Publisher) Publish(m *messaging_pb.Message) error { +func (p *Publisher) Publish(m *mq_pb.Message) error { hashValue := p.messageCount p.messageCount++ - if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_NonNullKeyHash { + if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_NonNullKeyHash { if m.Key != nil { hashValue = xxhash.Checksum64(m.Key) } - } else if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_KeyHash { + } else if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_KeyHash { hashValue = xxhash.Checksum64(m.Key) } else { // round robin @@ -112,7 +112,7 @@ func (p *Publisher) Publish(m *messaging_pb.Message) error { if idx < 0 { idx += len(p.publishClients) } - return p.publishClients[idx].Send(&messaging_pb.PublishRequest{ + return p.publishClients[idx].Send(&mq_pb.PublishRequest{ Data: m, }) } diff --git a/weed/messaging/msgclient/subscriber.go b/weed/mq/msgclient/subscriber.go similarity index 70% rename from weed/messaging/msgclient/subscriber.go rename to weed/mq/msgclient/subscriber.go index 6c7dc1ab7..f3da40fb3 100644 --- a/weed/messaging/msgclient/subscriber.go +++ b/weed/mq/msgclient/subscriber.go @@ -6,23 +6,23 @@ import ( "sync" "time" - "github.com/chrislusf/seaweedfs/weed/messaging/broker" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/mq/broker" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc" ) type Subscriber struct { - subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient + subscriberClients []mq_pb.SeaweedMessaging_SubscribeClient subscriberCancels []context.CancelFunc subscriberId string } func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) { // read topic configuration - topicConfiguration := &messaging_pb.TopicConfiguration{ + topicConfiguration := &mq_pb.TopicConfiguration{ PartitionCount: 4, } - subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) + subscriberClients := make([]mq_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) subscriberCancels := make([]context.CancelFunc, topicConfiguration.PartitionCount) for i := 0; i < int(topicConfiguration.PartitionCount); i++ { @@ -54,19 +54,19 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, }, nil } -func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) { - stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx) +func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream mq_pb.SeaweedMessaging_SubscribeClient, err error) { + stream, err = mq_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx) if err != nil { return } // send init message - err = stream.Send(&messaging_pb.SubscriberMessage{ - Init: &messaging_pb.SubscriberMessage_InitMessage{ + err = stream.Send(&mq_pb.SubscriberMessage{ + Init: &mq_pb.SubscriberMessage_InitMessage{ Namespace: tp.Namespace, Topic: tp.Topic, Partition: tp.Partition, - StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP, + StartPosition: mq_pb.SubscriberMessage_InitMessage_TIMESTAMP, TimestampNs: startTime.UnixNano(), SubscriberId: subscriberId, }, @@ -78,7 +78,7 @@ func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, return stream, nil } -func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, processFn func(m *messaging_pb.Message)) error { +func doSubscribe(subscriberClient mq_pb.SeaweedMessaging_SubscribeClient, processFn func(m *mq_pb.Message)) error { for { resp, listenErr := subscriberClient.Recv() if listenErr == io.EOF { @@ -97,12 +97,12 @@ func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, } // Subscribe starts goroutines to process the messages -func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { +func (s *Subscriber) Subscribe(processFn func(m *mq_pb.Message)) { var wg sync.WaitGroup for i := 0; i < len(s.subscriberClients); i++ { if s.subscriberClients[i] != nil { wg.Add(1) - go func(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient) { + go func(subscriberClient mq_pb.SeaweedMessaging_SubscribeClient) { defer wg.Done() doSubscribe(subscriberClient, processFn) }(s.subscriberClients[i]) diff --git a/weed/pb/Makefile b/weed/pb/Makefile index a8992bde2..01322ffda 100644 --- a/weed/pb/Makefile +++ b/weed/pb/Makefile @@ -10,6 +10,6 @@ gen: protoc iam.proto --go_out=./iam_pb --go-grpc_out=./iam_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative protoc mount.proto --go_out=./mount_pb --go-grpc_out=./mount_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative protoc s3.proto --go_out=./s3_pb --go-grpc_out=./s3_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative - protoc messaging.proto --go_out=./messaging_pb --go-grpc_out=./messaging_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative + protoc mq.proto --go_out=./mq_pb --go-grpc_out=./mq_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative # protoc filer.proto --java_out=../../other/java/client/src/main/java cp filer.proto ../../other/java/client/src/main/proto diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index c7cb82a22..0a0d65a87 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -18,7 +18,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" ) const ( @@ -231,10 +231,10 @@ func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses map[stri return err } -func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client messaging_pb.SeaweedMessagingClient) error) error { +func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client mq_pb.SeaweedMessagingClient) error) error { return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { - client := messaging_pb.NewSeaweedMessagingClient(grpcConnection) + client := mq_pb.NewSeaweedMessagingClient(grpcConnection) return fn(client) }, brokerGrpcAddress, grpcDialOption) diff --git a/weed/pb/messaging.proto b/weed/pb/messaging.proto index 04446ad16..a73bc99a6 100644 --- a/weed/pb/messaging.proto +++ b/weed/pb/messaging.proto @@ -2,7 +2,7 @@ syntax = "proto3"; package messaging_pb; -option go_package = "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"; +option go_package = "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"; option java_package = "seaweedfs.client"; option java_outer_classname = "MessagingProto"; diff --git a/weed/pb/mount_pb/mount.pb.go b/weed/pb/mount_pb/mount.pb.go index cbaf533fe..642ffe59c 100644 --- a/weed/pb/mount_pb/mount.pb.go +++ b/weed/pb/mount_pb/mount.pb.go @@ -143,12 +143,12 @@ func file_mount_proto_rawDescGZIP() []byte { var file_mount_proto_msgTypes = make([]protoimpl.MessageInfo, 2) var file_mount_proto_goTypes = []interface{}{ - (*ConfigureRequest)(nil), // 0: messaging_pb.ConfigureRequest - (*ConfigureResponse)(nil), // 1: messaging_pb.ConfigureResponse + (*ConfigureRequest)(nil), // 0: mq_pb.ConfigureRequest + (*ConfigureResponse)(nil), // 1: mq_pb.ConfigureResponse } var file_mount_proto_depIdxs = []int32{ - 0, // 0: messaging_pb.SeaweedMount.Configure:input_type -> messaging_pb.ConfigureRequest - 1, // 1: messaging_pb.SeaweedMount.Configure:output_type -> messaging_pb.ConfigureResponse + 0, // 0: mq_pb.SeaweedMount.Configure:input_type -> mq_pb.ConfigureRequest + 1, // 1: mq_pb.SeaweedMount.Configure:output_type -> mq_pb.ConfigureResponse 1, // [1:2] is the sub-list for method output_type 0, // [0:1] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name diff --git a/weed/pb/mount_pb/mount_grpc.pb.go b/weed/pb/mount_pb/mount_grpc.pb.go index 41737aa21..1da6542dc 100644 --- a/weed/pb/mount_pb/mount_grpc.pb.go +++ b/weed/pb/mount_pb/mount_grpc.pb.go @@ -31,7 +31,7 @@ func NewSeaweedMountClient(cc grpc.ClientConnInterface) SeaweedMountClient { func (c *seaweedMountClient) Configure(ctx context.Context, in *ConfigureRequest, opts ...grpc.CallOption) (*ConfigureResponse, error) { out := new(ConfigureResponse) - err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMount/Configure", in, out, opts...) + err := c.cc.Invoke(ctx, "/mq_pb.SeaweedMount/Configure", in, out, opts...) if err != nil { return nil, err } @@ -76,7 +76,7 @@ func _SeaweedMount_Configure_Handler(srv interface{}, ctx context.Context, dec f } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/messaging_pb.SeaweedMount/Configure", + FullMethod: "/mq_pb.SeaweedMount/Configure", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SeaweedMountServer).Configure(ctx, req.(*ConfigureRequest)) @@ -88,7 +88,7 @@ func _SeaweedMount_Configure_Handler(srv interface{}, ctx context.Context, dec f // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) var SeaweedMount_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "messaging_pb.SeaweedMount", + ServiceName: "mq_pb.SeaweedMount", HandlerType: (*SeaweedMountServer)(nil), Methods: []grpc.MethodDesc{ { diff --git a/weed/pb/messaging_pb/messaging.pb.go b/weed/pb/mq_pb/messaging.pb.go similarity index 92% rename from weed/pb/messaging_pb/messaging.pb.go rename to weed/pb/mq_pb/messaging.pb.go index 5b9ca1ee3..8de152f1b 100644 --- a/weed/pb/messaging_pb/messaging.pb.go +++ b/weed/pb/mq_pb/messaging.pb.go @@ -2,9 +2,9 @@ // versions: // protoc-gen-go v1.26.0 // protoc v3.17.3 -// source: messaging.proto +// source: mq.proto -package messaging_pb +package mq_pb import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" @@ -840,7 +840,7 @@ type TopicConfiguration struct { Collection string `protobuf:"bytes,2,opt,name=collection,proto3" json:"collection,omitempty"` Replication string `protobuf:"bytes,3,opt,name=replication,proto3" json:"replication,omitempty"` IsTransient bool `protobuf:"varint,4,opt,name=is_transient,json=isTransient,proto3" json:"is_transient,omitempty"` - Partitoning TopicConfiguration_Partitioning `protobuf:"varint,5,opt,name=partitoning,proto3,enum=messaging_pb.TopicConfiguration_Partitioning" json:"partitoning,omitempty"` + Partitoning TopicConfiguration_Partitioning `protobuf:"varint,5,opt,name=partitoning,proto3,enum=mq_pb.TopicConfiguration_Partitioning" json:"partitoning,omitempty"` } func (x *TopicConfiguration) Reset() { @@ -918,7 +918,7 @@ type SubscriberMessage_InitMessage struct { Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"` Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"` Partition int32 `protobuf:"varint,3,opt,name=partition,proto3" json:"partition,omitempty"` - StartPosition SubscriberMessage_InitMessage_StartPosition `protobuf:"varint,4,opt,name=startPosition,proto3,enum=messaging_pb.SubscriberMessage_InitMessage_StartPosition" json:"startPosition,omitempty"` // Where to begin consuming from + StartPosition SubscriberMessage_InitMessage_StartPosition `protobuf:"varint,4,opt,name=startPosition,proto3,enum=mq_pb.SubscriberMessage_InitMessage_StartPosition" json:"startPosition,omitempty"` // Where to begin consuming from TimestampNs int64 `protobuf:"varint,5,opt,name=timestampNs,proto3" json:"timestampNs,omitempty"` // timestamp in nano seconds SubscriberId string `protobuf:"bytes,6,opt,name=subscriber_id,json=subscriberId,proto3" json:"subscriber_id,omitempty"` // uniquely identify a subscriber to track consumption } @@ -1407,54 +1407,54 @@ func file_messaging_proto_rawDescGZIP() []byte { var file_messaging_proto_enumTypes = make([]protoimpl.EnumInfo, 2) var file_messaging_proto_msgTypes = make([]protoimpl.MessageInfo, 20) var file_messaging_proto_goTypes = []interface{}{ - (SubscriberMessage_InitMessage_StartPosition)(0), // 0: messaging_pb.SubscriberMessage.InitMessage.StartPosition - (TopicConfiguration_Partitioning)(0), // 1: messaging_pb.TopicConfiguration.Partitioning - (*SubscriberMessage)(nil), // 2: messaging_pb.SubscriberMessage - (*Message)(nil), // 3: messaging_pb.Message - (*BrokerMessage)(nil), // 4: messaging_pb.BrokerMessage - (*PublishRequest)(nil), // 5: messaging_pb.PublishRequest - (*PublishResponse)(nil), // 6: messaging_pb.PublishResponse - (*DeleteTopicRequest)(nil), // 7: messaging_pb.DeleteTopicRequest - (*DeleteTopicResponse)(nil), // 8: messaging_pb.DeleteTopicResponse - (*ConfigureTopicRequest)(nil), // 9: messaging_pb.ConfigureTopicRequest - (*ConfigureTopicResponse)(nil), // 10: messaging_pb.ConfigureTopicResponse - (*GetTopicConfigurationRequest)(nil), // 11: messaging_pb.GetTopicConfigurationRequest - (*GetTopicConfigurationResponse)(nil), // 12: messaging_pb.GetTopicConfigurationResponse - (*FindBrokerRequest)(nil), // 13: messaging_pb.FindBrokerRequest - (*FindBrokerResponse)(nil), // 14: messaging_pb.FindBrokerResponse - (*TopicConfiguration)(nil), // 15: messaging_pb.TopicConfiguration - (*SubscriberMessage_InitMessage)(nil), // 16: messaging_pb.SubscriberMessage.InitMessage - (*SubscriberMessage_AckMessage)(nil), // 17: messaging_pb.SubscriberMessage.AckMessage - nil, // 18: messaging_pb.Message.HeadersEntry - (*PublishRequest_InitMessage)(nil), // 19: messaging_pb.PublishRequest.InitMessage - (*PublishResponse_ConfigMessage)(nil), // 20: messaging_pb.PublishResponse.ConfigMessage - (*PublishResponse_RedirectMessage)(nil), // 21: messaging_pb.PublishResponse.RedirectMessage + (SubscriberMessage_InitMessage_StartPosition)(0), // 0: mq_pb.SubscriberMessage.InitMessage.StartPosition + (TopicConfiguration_Partitioning)(0), // 1: mq_pb.TopicConfiguration.Partitioning + (*SubscriberMessage)(nil), // 2: mq_pb.SubscriberMessage + (*Message)(nil), // 3: mq_pb.Message + (*BrokerMessage)(nil), // 4: mq_pb.BrokerMessage + (*PublishRequest)(nil), // 5: mq_pb.PublishRequest + (*PublishResponse)(nil), // 6: mq_pb.PublishResponse + (*DeleteTopicRequest)(nil), // 7: mq_pb.DeleteTopicRequest + (*DeleteTopicResponse)(nil), // 8: mq_pb.DeleteTopicResponse + (*ConfigureTopicRequest)(nil), // 9: mq_pb.ConfigureTopicRequest + (*ConfigureTopicResponse)(nil), // 10: mq_pb.ConfigureTopicResponse + (*GetTopicConfigurationRequest)(nil), // 11: mq_pb.GetTopicConfigurationRequest + (*GetTopicConfigurationResponse)(nil), // 12: mq_pb.GetTopicConfigurationResponse + (*FindBrokerRequest)(nil), // 13: mq_pb.FindBrokerRequest + (*FindBrokerResponse)(nil), // 14: mq_pb.FindBrokerResponse + (*TopicConfiguration)(nil), // 15: mq_pb.TopicConfiguration + (*SubscriberMessage_InitMessage)(nil), // 16: mq_pb.SubscriberMessage.InitMessage + (*SubscriberMessage_AckMessage)(nil), // 17: mq_pb.SubscriberMessage.AckMessage + nil, // 18: mq_pb.Message.HeadersEntry + (*PublishRequest_InitMessage)(nil), // 19: mq_pb.PublishRequest.InitMessage + (*PublishResponse_ConfigMessage)(nil), // 20: mq_pb.PublishResponse.ConfigMessage + (*PublishResponse_RedirectMessage)(nil), // 21: mq_pb.PublishResponse.RedirectMessage } var file_messaging_proto_depIdxs = []int32{ - 16, // 0: messaging_pb.SubscriberMessage.init:type_name -> messaging_pb.SubscriberMessage.InitMessage - 17, // 1: messaging_pb.SubscriberMessage.ack:type_name -> messaging_pb.SubscriberMessage.AckMessage - 18, // 2: messaging_pb.Message.headers:type_name -> messaging_pb.Message.HeadersEntry - 3, // 3: messaging_pb.BrokerMessage.data:type_name -> messaging_pb.Message - 19, // 4: messaging_pb.PublishRequest.init:type_name -> messaging_pb.PublishRequest.InitMessage - 3, // 5: messaging_pb.PublishRequest.data:type_name -> messaging_pb.Message - 20, // 6: messaging_pb.PublishResponse.config:type_name -> messaging_pb.PublishResponse.ConfigMessage - 21, // 7: messaging_pb.PublishResponse.redirect:type_name -> messaging_pb.PublishResponse.RedirectMessage - 15, // 8: messaging_pb.ConfigureTopicRequest.configuration:type_name -> messaging_pb.TopicConfiguration - 15, // 9: messaging_pb.GetTopicConfigurationResponse.configuration:type_name -> messaging_pb.TopicConfiguration - 1, // 10: messaging_pb.TopicConfiguration.partitoning:type_name -> messaging_pb.TopicConfiguration.Partitioning - 0, // 11: messaging_pb.SubscriberMessage.InitMessage.startPosition:type_name -> messaging_pb.SubscriberMessage.InitMessage.StartPosition - 2, // 12: messaging_pb.SeaweedMessaging.Subscribe:input_type -> messaging_pb.SubscriberMessage - 5, // 13: messaging_pb.SeaweedMessaging.Publish:input_type -> messaging_pb.PublishRequest - 7, // 14: messaging_pb.SeaweedMessaging.DeleteTopic:input_type -> messaging_pb.DeleteTopicRequest - 9, // 15: messaging_pb.SeaweedMessaging.ConfigureTopic:input_type -> messaging_pb.ConfigureTopicRequest - 11, // 16: messaging_pb.SeaweedMessaging.GetTopicConfiguration:input_type -> messaging_pb.GetTopicConfigurationRequest - 13, // 17: messaging_pb.SeaweedMessaging.FindBroker:input_type -> messaging_pb.FindBrokerRequest - 4, // 18: messaging_pb.SeaweedMessaging.Subscribe:output_type -> messaging_pb.BrokerMessage - 6, // 19: messaging_pb.SeaweedMessaging.Publish:output_type -> messaging_pb.PublishResponse - 8, // 20: messaging_pb.SeaweedMessaging.DeleteTopic:output_type -> messaging_pb.DeleteTopicResponse - 10, // 21: messaging_pb.SeaweedMessaging.ConfigureTopic:output_type -> messaging_pb.ConfigureTopicResponse - 12, // 22: messaging_pb.SeaweedMessaging.GetTopicConfiguration:output_type -> messaging_pb.GetTopicConfigurationResponse - 14, // 23: messaging_pb.SeaweedMessaging.FindBroker:output_type -> messaging_pb.FindBrokerResponse + 16, // 0: mq_pb.SubscriberMessage.init:type_name -> mq_pb.SubscriberMessage.InitMessage + 17, // 1: mq_pb.SubscriberMessage.ack:type_name -> mq_pb.SubscriberMessage.AckMessage + 18, // 2: mq_pb.Message.headers:type_name -> mq_pb.Message.HeadersEntry + 3, // 3: mq_pb.BrokerMessage.data:type_name -> mq_pb.Message + 19, // 4: mq_pb.PublishRequest.init:type_name -> mq_pb.PublishRequest.InitMessage + 3, // 5: mq_pb.PublishRequest.data:type_name -> mq_pb.Message + 20, // 6: mq_pb.PublishResponse.config:type_name -> mq_pb.PublishResponse.ConfigMessage + 21, // 7: mq_pb.PublishResponse.redirect:type_name -> mq_pb.PublishResponse.RedirectMessage + 15, // 8: mq_pb.ConfigureTopicRequest.configuration:type_name -> mq_pb.TopicConfiguration + 15, // 9: mq_pb.GetTopicConfigurationResponse.configuration:type_name -> mq_pb.TopicConfiguration + 1, // 10: mq_pb.TopicConfiguration.partitoning:type_name -> mq_pb.TopicConfiguration.Partitioning + 0, // 11: mq_pb.SubscriberMessage.InitMessage.startPosition:type_name -> mq_pb.SubscriberMessage.InitMessage.StartPosition + 2, // 12: mq_pb.SeaweedMessaging.Subscribe:input_type -> mq_pb.SubscriberMessage + 5, // 13: mq_pb.SeaweedMessaging.Publish:input_type -> mq_pb.PublishRequest + 7, // 14: mq_pb.SeaweedMessaging.DeleteTopic:input_type -> mq_pb.DeleteTopicRequest + 9, // 15: mq_pb.SeaweedMessaging.ConfigureTopic:input_type -> mq_pb.ConfigureTopicRequest + 11, // 16: mq_pb.SeaweedMessaging.GetTopicConfiguration:input_type -> mq_pb.GetTopicConfigurationRequest + 13, // 17: mq_pb.SeaweedMessaging.FindBroker:input_type -> mq_pb.FindBrokerRequest + 4, // 18: mq_pb.SeaweedMessaging.Subscribe:output_type -> mq_pb.BrokerMessage + 6, // 19: mq_pb.SeaweedMessaging.Publish:output_type -> mq_pb.PublishResponse + 8, // 20: mq_pb.SeaweedMessaging.DeleteTopic:output_type -> mq_pb.DeleteTopicResponse + 10, // 21: mq_pb.SeaweedMessaging.ConfigureTopic:output_type -> mq_pb.ConfigureTopicResponse + 12, // 22: mq_pb.SeaweedMessaging.GetTopicConfiguration:output_type -> mq_pb.GetTopicConfigurationResponse + 14, // 23: mq_pb.SeaweedMessaging.FindBroker:output_type -> mq_pb.FindBrokerResponse 18, // [18:24] is the sub-list for method output_type 12, // [12:18] is the sub-list for method input_type 12, // [12:12] is the sub-list for extension type_name diff --git a/weed/pb/messaging_pb/messaging_grpc.pb.go b/weed/pb/mq_pb/messaging_grpc.pb.go similarity index 93% rename from weed/pb/messaging_pb/messaging_grpc.pb.go rename to weed/pb/mq_pb/messaging_grpc.pb.go index 234cffa95..0249a0b9c 100644 --- a/weed/pb/messaging_pb/messaging_grpc.pb.go +++ b/weed/pb/mq_pb/messaging_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. -package messaging_pb +package mq_pb import ( context "context" @@ -35,7 +35,7 @@ func NewSeaweedMessagingClient(cc grpc.ClientConnInterface) SeaweedMessagingClie } func (c *seaweedMessagingClient) Subscribe(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeClient, error) { - stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[0], "/messaging_pb.SeaweedMessaging/Subscribe", opts...) + stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[0], "/mq_pb.SeaweedMessaging/Subscribe", opts...) if err != nil { return nil, err } @@ -66,7 +66,7 @@ func (x *seaweedMessagingSubscribeClient) Recv() (*BrokerMessage, error) { } func (c *seaweedMessagingClient) Publish(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishClient, error) { - stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[1], "/messaging_pb.SeaweedMessaging/Publish", opts...) + stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[1], "/mq_pb.SeaweedMessaging/Publish", opts...) if err != nil { return nil, err } @@ -98,7 +98,7 @@ func (x *seaweedMessagingPublishClient) Recv() (*PublishResponse, error) { func (c *seaweedMessagingClient) DeleteTopic(ctx context.Context, in *DeleteTopicRequest, opts ...grpc.CallOption) (*DeleteTopicResponse, error) { out := new(DeleteTopicResponse) - err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/DeleteTopic", in, out, opts...) + err := c.cc.Invoke(ctx, "/mq_pb.SeaweedMessaging/DeleteTopic", in, out, opts...) if err != nil { return nil, err } @@ -107,7 +107,7 @@ func (c *seaweedMessagingClient) DeleteTopic(ctx context.Context, in *DeleteTopi func (c *seaweedMessagingClient) ConfigureTopic(ctx context.Context, in *ConfigureTopicRequest, opts ...grpc.CallOption) (*ConfigureTopicResponse, error) { out := new(ConfigureTopicResponse) - err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/ConfigureTopic", in, out, opts...) + err := c.cc.Invoke(ctx, "/mq_pb.SeaweedMessaging/ConfigureTopic", in, out, opts...) if err != nil { return nil, err } @@ -116,7 +116,7 @@ func (c *seaweedMessagingClient) ConfigureTopic(ctx context.Context, in *Configu func (c *seaweedMessagingClient) GetTopicConfiguration(ctx context.Context, in *GetTopicConfigurationRequest, opts ...grpc.CallOption) (*GetTopicConfigurationResponse, error) { out := new(GetTopicConfigurationResponse) - err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/GetTopicConfiguration", in, out, opts...) + err := c.cc.Invoke(ctx, "/mq_pb.SeaweedMessaging/GetTopicConfiguration", in, out, opts...) if err != nil { return nil, err } @@ -125,7 +125,7 @@ func (c *seaweedMessagingClient) GetTopicConfiguration(ctx context.Context, in * func (c *seaweedMessagingClient) FindBroker(ctx context.Context, in *FindBrokerRequest, opts ...grpc.CallOption) (*FindBrokerResponse, error) { out := new(FindBrokerResponse) - err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/FindBroker", in, out, opts...) + err := c.cc.Invoke(ctx, "/mq_pb.SeaweedMessaging/FindBroker", in, out, opts...) if err != nil { return nil, err } @@ -242,7 +242,7 @@ func _SeaweedMessaging_DeleteTopic_Handler(srv interface{}, ctx context.Context, } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/messaging_pb.SeaweedMessaging/DeleteTopic", + FullMethod: "/mq_pb.SeaweedMessaging/DeleteTopic", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SeaweedMessagingServer).DeleteTopic(ctx, req.(*DeleteTopicRequest)) @@ -260,7 +260,7 @@ func _SeaweedMessaging_ConfigureTopic_Handler(srv interface{}, ctx context.Conte } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/messaging_pb.SeaweedMessaging/ConfigureTopic", + FullMethod: "/mq_pb.SeaweedMessaging/ConfigureTopic", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SeaweedMessagingServer).ConfigureTopic(ctx, req.(*ConfigureTopicRequest)) @@ -278,7 +278,7 @@ func _SeaweedMessaging_GetTopicConfiguration_Handler(srv interface{}, ctx contex } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/messaging_pb.SeaweedMessaging/GetTopicConfiguration", + FullMethod: "/mq_pb.SeaweedMessaging/GetTopicConfiguration", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SeaweedMessagingServer).GetTopicConfiguration(ctx, req.(*GetTopicConfigurationRequest)) @@ -296,7 +296,7 @@ func _SeaweedMessaging_FindBroker_Handler(srv interface{}, ctx context.Context, } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/messaging_pb.SeaweedMessaging/FindBroker", + FullMethod: "/mq_pb.SeaweedMessaging/FindBroker", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SeaweedMessagingServer).FindBroker(ctx, req.(*FindBrokerRequest)) @@ -308,7 +308,7 @@ func _SeaweedMessaging_FindBroker_Handler(srv interface{}, ctx context.Context, // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "messaging_pb.SeaweedMessaging", + ServiceName: "mq_pb.SeaweedMessaging", HandlerType: (*SeaweedMessagingServer)(nil), Methods: []grpc.MethodDesc{ { @@ -342,5 +342,5 @@ var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{ ClientStreams: true, }, }, - Metadata: "messaging.proto", + Metadata: "mq.proto", } diff --git a/weed/pb/s3_pb/s3.pb.go b/weed/pb/s3_pb/s3.pb.go index c1bd23556..9923961a8 100644 --- a/weed/pb/s3_pb/s3.pb.go +++ b/weed/pb/s3_pb/s3.pb.go @@ -283,20 +283,20 @@ func file_s3_proto_rawDescGZIP() []byte { var file_s3_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_s3_proto_goTypes = []interface{}{ - (*S3ConfigureRequest)(nil), // 0: messaging_pb.S3ConfigureRequest - (*S3ConfigureResponse)(nil), // 1: messaging_pb.S3ConfigureResponse - (*S3CircuitBreakerConfig)(nil), // 2: messaging_pb.S3CircuitBreakerConfig - (*S3CircuitBreakerOptions)(nil), // 3: messaging_pb.S3CircuitBreakerOptions - nil, // 4: messaging_pb.S3CircuitBreakerConfig.BucketsEntry - nil, // 5: messaging_pb.S3CircuitBreakerOptions.ActionsEntry + (*S3ConfigureRequest)(nil), // 0: mq_pb.S3ConfigureRequest + (*S3ConfigureResponse)(nil), // 1: mq_pb.S3ConfigureResponse + (*S3CircuitBreakerConfig)(nil), // 2: mq_pb.S3CircuitBreakerConfig + (*S3CircuitBreakerOptions)(nil), // 3: mq_pb.S3CircuitBreakerOptions + nil, // 4: mq_pb.S3CircuitBreakerConfig.BucketsEntry + nil, // 5: mq_pb.S3CircuitBreakerOptions.ActionsEntry } var file_s3_proto_depIdxs = []int32{ - 3, // 0: messaging_pb.S3CircuitBreakerConfig.global:type_name -> messaging_pb.S3CircuitBreakerOptions - 4, // 1: messaging_pb.S3CircuitBreakerConfig.buckets:type_name -> messaging_pb.S3CircuitBreakerConfig.BucketsEntry - 5, // 2: messaging_pb.S3CircuitBreakerOptions.actions:type_name -> messaging_pb.S3CircuitBreakerOptions.ActionsEntry - 3, // 3: messaging_pb.S3CircuitBreakerConfig.BucketsEntry.value:type_name -> messaging_pb.S3CircuitBreakerOptions - 0, // 4: messaging_pb.SeaweedS3.Configure:input_type -> messaging_pb.S3ConfigureRequest - 1, // 5: messaging_pb.SeaweedS3.Configure:output_type -> messaging_pb.S3ConfigureResponse + 3, // 0: mq_pb.S3CircuitBreakerConfig.global:type_name -> mq_pb.S3CircuitBreakerOptions + 4, // 1: mq_pb.S3CircuitBreakerConfig.buckets:type_name -> mq_pb.S3CircuitBreakerConfig.BucketsEntry + 5, // 2: mq_pb.S3CircuitBreakerOptions.actions:type_name -> mq_pb.S3CircuitBreakerOptions.ActionsEntry + 3, // 3: mq_pb.S3CircuitBreakerConfig.BucketsEntry.value:type_name -> mq_pb.S3CircuitBreakerOptions + 0, // 4: mq_pb.SeaweedS3.Configure:input_type -> mq_pb.S3ConfigureRequest + 1, // 5: mq_pb.SeaweedS3.Configure:output_type -> mq_pb.S3ConfigureResponse 5, // [5:6] is the sub-list for method output_type 4, // [4:5] is the sub-list for method input_type 4, // [4:4] is the sub-list for extension type_name diff --git a/weed/pb/s3_pb/s3_grpc.pb.go b/weed/pb/s3_pb/s3_grpc.pb.go index 1bc956be6..c00268bad 100644 --- a/weed/pb/s3_pb/s3_grpc.pb.go +++ b/weed/pb/s3_pb/s3_grpc.pb.go @@ -31,7 +31,7 @@ func NewSeaweedS3Client(cc grpc.ClientConnInterface) SeaweedS3Client { func (c *seaweedS3Client) Configure(ctx context.Context, in *S3ConfigureRequest, opts ...grpc.CallOption) (*S3ConfigureResponse, error) { out := new(S3ConfigureResponse) - err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedS3/Configure", in, out, opts...) + err := c.cc.Invoke(ctx, "/mq_pb.SeaweedS3/Configure", in, out, opts...) if err != nil { return nil, err } @@ -76,7 +76,7 @@ func _SeaweedS3_Configure_Handler(srv interface{}, ctx context.Context, dec func } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/messaging_pb.SeaweedS3/Configure", + FullMethod: "/mq_pb.SeaweedS3/Configure", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SeaweedS3Server).Configure(ctx, req.(*S3ConfigureRequest)) @@ -88,7 +88,7 @@ func _SeaweedS3_Configure_Handler(srv interface{}, ctx context.Context, dec func // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) var SeaweedS3_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "messaging_pb.SeaweedS3", + ServiceName: "mq_pb.SeaweedS3", HandlerType: (*SeaweedS3Server)(nil), Methods: []grpc.MethodDesc{ { diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 99532b47b..35bc8ffd5 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -68,7 +68,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime logEntry := &filer_pb.LogEntry{} if err = proto.Unmarshal(entryData, logEntry); err != nil { - glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) + glog.Errorf("unexpected unmarshal mq_pb.Message: %v", err) pos += 4 + int(size) continue }