|
|
@ -13,10 +13,17 @@ import ( |
|
|
|
|
|
|
|
func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, stream mq_pb.SeaweedMessaging_SubscribeMessageServer) error { |
|
|
|
|
|
|
|
ctx := stream.Context() |
|
|
|
clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId) |
|
|
|
|
|
|
|
t := topic.FromPbTopic(req.GetInit().Topic) |
|
|
|
partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition()) |
|
|
|
localTopicPartition := b.localTopicManager.GetTopicPartition(t, partition) |
|
|
|
if localTopicPartition == nil { |
|
|
|
|
|
|
|
glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition) |
|
|
|
|
|
|
|
var localTopicPartition *topic.LocalPartition |
|
|
|
localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition) |
|
|
|
for localTopicPartition == nil { |
|
|
|
stream.Send(&mq_pb.SubscribeMessageResponse{ |
|
|
|
Message: &mq_pb.SubscribeMessageResponse_Ctrl{ |
|
|
|
Ctrl: &mq_pb.SubscribeMessageResponse_CtrlMessage{ |
|
|
@ -24,10 +31,23 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest |
|
|
|
}, |
|
|
|
}, |
|
|
|
}) |
|
|
|
return nil |
|
|
|
time.Sleep(337 * time.Millisecond) |
|
|
|
// Check if the client has disconnected by monitoring the context
|
|
|
|
select { |
|
|
|
case <-ctx.Done(): |
|
|
|
err := ctx.Err() |
|
|
|
if err == context.Canceled { |
|
|
|
// Client disconnected
|
|
|
|
return nil |
|
|
|
} |
|
|
|
glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err) |
|
|
|
return nil |
|
|
|
default: |
|
|
|
// Continue processing the request
|
|
|
|
} |
|
|
|
localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition) |
|
|
|
} |
|
|
|
|
|
|
|
clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId) |
|
|
|
localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber()) |
|
|
|
glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition) |
|
|
|
isConnected := true |
|
|
@ -38,7 +58,6 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest |
|
|
|
glog.V(0).Infof("Subscriber %s on %v %v disconnected", clientName, t, partition) |
|
|
|
}() |
|
|
|
|
|
|
|
ctx := stream.Context() |
|
|
|
var startPosition log_buffer.MessagePosition |
|
|
|
var inMemoryOnly bool |
|
|
|
if req.GetInit()!=nil && req.GetInit().GetPartitionOffset() != nil { |
|
|
|