From 47a4963d7ae1b827b51fe1d0c0748ad6a31fcae5 Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 3 Jan 2024 15:57:36 -0800 Subject: [PATCH] subscription start from specified timestamp --- weed/mq/broker/broker_grpc_sub.go | 4 +--- weed/mq/client/cmd/weed_sub/subscriber.go | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index c98ce4684..ecf771b9f 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -38,11 +38,9 @@ func (b *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb }() ctx := stream.Context() - var startTime time.Time + startTime := time.Now() if startTs := req.GetInit().GetStartTimestampNs(); startTs > 0 { startTime = time.Unix(0, startTs) - } else { - startTime = time.Now() } localTopicPartition.Subscribe(clientName, startTime, func() bool { diff --git a/weed/mq/client/cmd/weed_sub/subscriber.go b/weed/mq/client/cmd/weed_sub/subscriber.go index 310e5ac78..7488e60f0 100644 --- a/weed/mq/client/cmd/weed_sub/subscriber.go +++ b/weed/mq/client/cmd/weed_sub/subscriber.go @@ -30,7 +30,7 @@ func main() { Namespace: *namespace, Topic: *topic, Filter: "", - StartTime: time.Unix(0, 0), + StartTime: time.Unix(1, 1), } processorConfig := sub_client.ProcessorConfiguration{