Browse Source

subscribe from a timestamp

pull/1298/head
Chris Lu 5 years ago
parent
commit
8c73410a51
  1. 8
      weed/messaging/client/subscriber.go

8
weed/messaging/client/subscriber.go

@ -13,7 +13,7 @@ type Subscriber struct {
subscriberId string subscriberId string
} }
func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) (*Subscriber, error) {
func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) {
// read topic configuration // read topic configuration
topicConfiguration := &messaging_pb.TopicConfiguration{ topicConfiguration := &messaging_pb.TopicConfiguration{
PartitionCount: 4, PartitionCount: 4,
@ -21,7 +21,7 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string)
subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
for i := 0; i < int(topicConfiguration.PartitionCount); i++ { for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i))
client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i), startTime)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -34,7 +34,7 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string)
}, nil }, nil
} }
func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32) (messaging_pb.SeaweedMessaging_SubscribeClient, error) {
func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) {
stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Subscribe(context.Background()) stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Subscribe(context.Background())
if err != nil { if err != nil {
@ -48,7 +48,7 @@ func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic
Topic: topic, Topic: topic,
Partition: partition, Partition: partition,
StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP, StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP,
TimestampNs: time.Now().UnixNano(),
TimestampNs: startTime.UnixNano(),
SubscriberId: subscriberId, SubscriberId: subscriberId,
}, },
}) })

Loading…
Cancel
Save