diff --git a/weed/messaging/client/publisher.go b/weed/messaging/client/publisher.go index 4854ab92e..68e5729c1 100644 --- a/weed/messaging/client/publisher.go +++ b/weed/messaging/client/publisher.go @@ -3,14 +3,38 @@ package client import ( "context" + "github.com/OneOfOne/xxhash" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" ) type Publisher struct { - publishClient messaging_pb.SeaweedMessaging_PublishClient + publishClients []messaging_pb.SeaweedMessaging_PublishClient + topicConfiguration *messaging_pb.TopicConfiguration + messageCount uint64 + publisherId string } -func (mc *MessagingClient) NewPublisher(namespace, topic string) (*Publisher, error) { +func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) { + // read topic configuration + topicConfiguration := &messaging_pb.TopicConfiguration{ + PartitionCount: 4, + } + publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount) + for i := 0; i < int(topicConfiguration.PartitionCount); i++ { + client, err := mc.setupPublisherClient(namespace, topic, int32(i)) + if err != nil { + return nil, err + } + publishClients[i] = client + } + return &Publisher{ + publishClients: publishClients, + topicConfiguration: topicConfiguration, + }, nil +} + +func (mc *MessagingClient) setupPublisherClient(namespace, topic string, partition int32) (messaging_pb.SeaweedMessaging_PublishClient, error) { stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Publish(context.Background()) if err != nil { @@ -22,7 +46,7 @@ func (mc *MessagingClient) NewPublisher(namespace, topic string) (*Publisher, er Init: &messaging_pb.PublishRequest_InitMessage{ Namespace: namespace, Topic: topic, - Partition: 0, + Partition: partition, }, }) if err != nil { @@ -56,20 +80,34 @@ func (mc *MessagingClient) NewPublisher(namespace, topic string) (*Publisher, er } }() - return &Publisher{ - publishClient: stream, - }, nil + return stream, nil + } func (p *Publisher) Publish(m *messaging_pb.Message) error { + hashValue := p.messageCount + p.messageCount++ + if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_NonNullKeyHash { + if m.Key != nil { + hashValue = xxhash.Checksum64(m.Key) + } + } else if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_KeyHash { + hashValue = xxhash.Checksum64(m.Key) + } else { + // round robin + } - return p.publishClient.Send(&messaging_pb.PublishRequest{ + idx := int(hashValue) % len(p.publishClients) + if idx < 0 { + idx += len(p.publishClients) + } + return p.publishClients[idx].Send(&messaging_pb.PublishRequest{ Data: m, }) - } -func (p *Publisher) Close() error { - - return p.publishClient.CloseSend() +func (p *Publisher) Shutdown() { + for _, client := range p.publishClients { + client.CloseSend() + } } diff --git a/weed/messaging/client/subscriber.go b/weed/messaging/client/subscriber.go index 407cd4ac6..ddf1f82e6 100644 --- a/weed/messaging/client/subscriber.go +++ b/weed/messaging/client/subscriber.go @@ -9,10 +9,33 @@ import ( ) type Subscriber struct { - subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient + subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient + subscriberId string } func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) (*Subscriber, error) { + // read topic configuration + topicConfiguration := &messaging_pb.TopicConfiguration{ + PartitionCount: 4, + } + subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) + + for i := 0; i < int(topicConfiguration.PartitionCount); i++ { + client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i)) + if err != nil { + return nil, err + } + subscriberClients[i] = client + } + + return &Subscriber{ + subscriberClients: subscriberClients, + subscriberId: subscriberId, + }, nil +} + +func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32) (messaging_pb.SeaweedMessaging_SubscribeClient, error) { + stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Subscribe(context.Background()) if err != nil { return nil, err @@ -23,7 +46,7 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) Init: &messaging_pb.SubscriberMessage_InitMessage{ Namespace: namespace, Topic: topic, - Partition: 0, + Partition: partition, StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP, TimestampNs: time.Now().UnixNano(), SubscriberId: subscriberId, @@ -42,20 +65,27 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) // TODO follow redirection } - return &Subscriber{ - subscriberClient: stream, - }, nil + return stream, nil + } -func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) error { +func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error { for { - resp, listenErr := s.subscriberClient.Recv() + resp, listenErr := s.subscriberClients[partition].Recv() if listenErr == io.EOF { return nil } if listenErr != nil { + println(listenErr.Error()) return listenErr } processFn(resp.Data) } } + +// Subscribe starts goroutines to process the messages +func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { + for i:=0;i