From 8c73410a51441d7f9f1140a8996dd3eb1f191f2e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 30 Apr 2020 02:19:51 -0700 Subject: [PATCH] subscribe from a timestamp --- weed/messaging/client/subscriber.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/weed/messaging/client/subscriber.go b/weed/messaging/client/subscriber.go index 2ebad4ce6..53e7ffc7d 100644 --- a/weed/messaging/client/subscriber.go +++ b/weed/messaging/client/subscriber.go @@ -13,7 +13,7 @@ type Subscriber struct { subscriberId string } -func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) (*Subscriber, error) { +func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) { // read topic configuration topicConfiguration := &messaging_pb.TopicConfiguration{ PartitionCount: 4, @@ -21,7 +21,7 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) for i := 0; i < int(topicConfiguration.PartitionCount); i++ { - client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i)) + client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i), startTime) if err != nil { return nil, err } @@ -34,7 +34,7 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) }, nil } -func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32) (messaging_pb.SeaweedMessaging_SubscribeClient, error) { +func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) { stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Subscribe(context.Background()) if err != nil { @@ -48,7 +48,7 @@ func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic Topic: topic, Partition: partition, StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP, - TimestampNs: time.Now().UnixNano(), + TimestampNs: startTime.UnixNano(), SubscriberId: subscriberId, }, })