Browse Source

add control message to data message

mq-subscribe
chrislu 9 months ago
parent
commit
66a878af39
  1. 3
      weed/mq/broker/broker_grpc_pub.go
  2. 4
      weed/mq/client/pub_client/publish.go
  3. 2
      weed/mq/client/pub_client/scheduler.go
  4. 9
      weed/pb/mq.proto
  5. 911
      weed/pb/mq_pb/mq.pb.go

3
weed/mq/broker/broker_grpc_pub.go

@ -140,6 +140,9 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
continue continue
} }
// The control message should still be sent to the follower
// to avoid timing issue when ack messages.
// send to the local partition // send to the local partition
if err = localTopicPartition.Publish(dataMessage); err != nil { if err = localTopicPartition.Publish(dataMessage); err != nil {
return fmt.Errorf("topic %v partition %v publish error: %v", initMessage.Topic, initMessage.Partition, err) return fmt.Errorf("topic %v partition %v publish error: %v", initMessage.Topic, initMessage.Partition, err)

4
weed/mq/client/pub_client/publish.go

@ -29,8 +29,10 @@ func (p *TopicPublisher) FinishPublish() error {
if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found { if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
for _, inputBuffer := range inputBuffers { for _, inputBuffer := range inputBuffers {
inputBuffer.Enqueue(&mq_pb.DataMessage{ inputBuffer.Enqueue(&mq_pb.DataMessage{
IsClose: true,
TsNs: time.Now().UnixNano(), TsNs: time.Now().UnixNano(),
Ctrl: &mq_pb.ControlMessage{
IsClose: true,
},
}) })
} }
} }

2
weed/mq/client/pub_client/scheduler.go

@ -195,7 +195,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
publishCounter := 0 publishCounter := 0
for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() { for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
if data.IsClose {
if data.Ctrl != nil && data.Ctrl.IsClose {
// need to set this before sending to brokers, to avoid timing issue // need to set this before sending to brokers, to avoid timing issue
atomic.StoreInt32(&hasMoreData, 0) atomic.StoreInt32(&hasMoreData, 0)
} }

9
weed/pb/mq.proto

@ -180,11 +180,14 @@ message SubscriberToSubCoordinatorResponse {
} }
////////////////////////////////////////////////// //////////////////////////////////////////////////
message ControlMessage {
bool is_close = 1;
}
message DataMessage { message DataMessage {
bytes key = 1; bytes key = 1;
bytes value = 2; bytes value = 2;
int64 ts_ns = 3; int64 ts_ns = 3;
bool is_close = 4;
ControlMessage ctrl = 4;
} }
message PublishMessageRequest { message PublishMessageRequest {
message InitMessage { message InitMessage {
@ -244,13 +247,13 @@ message SubscribeMessageRequest {
} }
} }
message SubscribeMessageResponse { message SubscribeMessageResponse {
message CtrlMessage {
message SubscribeCtrlMessage {
string error = 1; string error = 1;
bool is_end_of_stream = 2; bool is_end_of_stream = 2;
bool is_end_of_topic = 3; bool is_end_of_topic = 3;
} }
oneof message { oneof message {
CtrlMessage ctrl = 1;
SubscribeCtrlMessage ctrl = 1;
DataMessage data = 2; DataMessage data = 2;
} }
} }

911
weed/pb/mq_pb/mq.pb.go
File diff suppressed because it is too large
View File

Loading…
Cancel
Save