Browse Source

subscription start from specified timestamp

pull/5637/head
chrislu 1 year ago
parent
commit
47a4963d7a
  1. 4
      weed/mq/broker/broker_grpc_sub.go
  2. 2
      weed/mq/client/cmd/weed_sub/subscriber.go

4
weed/mq/broker/broker_grpc_sub.go

@ -38,11 +38,9 @@ func (b *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb
}() }()
ctx := stream.Context() ctx := stream.Context()
var startTime time.Time
startTime := time.Now()
if startTs := req.GetInit().GetStartTimestampNs(); startTs > 0 { if startTs := req.GetInit().GetStartTimestampNs(); startTs > 0 {
startTime = time.Unix(0, startTs) startTime = time.Unix(0, startTs)
} else {
startTime = time.Now()
} }
localTopicPartition.Subscribe(clientName, startTime, func() bool { localTopicPartition.Subscribe(clientName, startTime, func() bool {

2
weed/mq/client/cmd/weed_sub/subscriber.go

@ -30,7 +30,7 @@ func main() {
Namespace: *namespace, Namespace: *namespace,
Topic: *topic, Topic: *topic,
Filter: "", Filter: "",
StartTime: time.Unix(0, 0),
StartTime: time.Unix(1, 1),
} }
processorConfig := sub_client.ProcessorConfiguration{ processorConfig := sub_client.ProcessorConfiguration{

Loading…
Cancel
Save