diff --git a/weed/mq/broker/brokder_grpc_pub.go b/weed/mq/broker/brokder_grpc_pub.go
index a26be5171..cbcc83f9b 100644
--- a/weed/mq/broker/brokder_grpc_pub.go
+++ b/weed/mq/broker/brokder_grpc_pub.go
@@ -1,6 +1,7 @@
 package broker
 
 import (
+	"github.com/seaweedfs/seaweedfs/weed/pb/message_fbs"
 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
 )
 
@@ -11,6 +12,36 @@ The messages is buffered in memory, and saved to filer under
 	/topics/<topic>/info/segment_<id>.meta
 */
 
-func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
+func (broker *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer) error {
+	println("connected")
+	for {
+		request, recvErr := stream.Recv()
+		if recvErr != nil {
+			return recvErr
+		}
+
+		print(">")
+		if request.Control != nil {
+
+		}
+		if request.Data != nil {
+			if err := broker.processDataMessage(stream, request.Data); err != nil {
+				return err
+			}
+		}
+
+	}
+	return nil
+}
+
+func (broker *MessageQueueBroker) processDataMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer, data *mq_pb.PublishRequest_DataMessage) error {
+	mb := message_fbs.GetRootAsMessageBatch(data.Message, 0)
+
+	println("message count:", mb.MessagesLength(), len(data.Message))
+	m := &message_fbs.Message{}
+	for i := 0; i < mb.MessagesLength(); i++ {
+		mb.Messages(m, i)
+		println(i, ">", string(m.Data()))
+	}
 	return nil
 }
diff --git a/weed/mq/client/publish_stream_processor.go b/weed/mq/client/publish_stream_processor.go
new file mode 100644
index 000000000..c23c6a64a
--- /dev/null
+++ b/weed/mq/client/publish_stream_processor.go
@@ -0,0 +1,178 @@
+package client
+
+import (
+	"context"
+	flatbuffers "github.com/google/flatbuffers/go"
+	"github.com/seaweedfs/seaweedfs/weed/mq/segment"
+	"github.com/seaweedfs/seaweedfs/weed/pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+	"github.com/seaweedfs/seaweedfs/weed/util"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	"log"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+const (
+	batchCountLimit = 3
+)
+
+type PublishStreamProcessor struct {
+	// attributes
+	ProducerId     int32
+	ProducerEpoch  int32
+	grpcDialOption grpc.DialOption
+
+	// input
+	sync.Mutex
+
+	timeout time.Duration
+
+	// convert into bytes
+	messagesChan           chan *Message
+	builders               chan *flatbuffers.Builder
+	batchMessageCountLimit int
+
+	messagesSequence int64
+
+	// done channel
+	doneChan chan struct{}
+}
+
+type UploadProcess struct {
+	bufferBuilder *flatbuffers.Builder
+	batchBuilder  *segment.MessageBatchBuilder
+}
+
+func NewPublishStreamProcessor(batchMessageCountLimit int, timeout time.Duration) *PublishStreamProcessor {
+	t := &PublishStreamProcessor{
+		grpcDialOption:         grpc.WithTransportCredentials(insecure.NewCredentials()),
+		batchMessageCountLimit: batchMessageCountLimit,
+		builders:               make(chan *flatbuffers.Builder, batchCountLimit),
+		messagesChan:           make(chan *Message, 1024),
+		doneChan:               make(chan struct{}),
+		timeout:                timeout,
+	}
+	for i := 0; i < batchCountLimit; i++ {
+		t.builders <- flatbuffers.NewBuilder(4 * 1024 * 1024)
+	}
+	go t.doLoopUpload()
+	return t
+}
+
+func (p *PublishStreamProcessor) AddMessage(m *Message) error {
+	p.messagesChan <- m
+	return nil
+}
+
+func (p *PublishStreamProcessor) Shutdown() error {
+	p.doneChan <- struct{}{}
+	return nil
+}
+
+func (p *PublishStreamProcessor) doFlush(stream mq_pb.SeaweedMessaging_PublishMessageClient, messages []*Message) error {
+
+	if len(messages) == 0 {
+		return nil
+	}
+
+	builder := <-p.builders
+	bb := segment.NewMessageBatchBuilder(builder, p.ProducerId, p.ProducerEpoch, 3, 4)
+	for _, m := range messages {
+		bb.AddMessage(p.messagesSequence, m.Ts.UnixNano(), m.Properties, m.Key, m.Content)
+		p.messagesSequence++
+	}
+	bb.BuildMessageBatch()
+	defer func() {
+		p.builders <- builder
+	}()
+
+	return stream.Send(&mq_pb.PublishRequest{
+		Data: &mq_pb.PublishRequest_DataMessage{
+			Message: bb.GetBytes(),
+		},
+	})
+
+}
+
+func (p *PublishStreamProcessor) doLoopUpload() {
+
+	brokerGrpcAddress := "localhost:17777"
+
+	// TOOD parallelize the uploading with separate uploader
+	messages := make([]*Message, 0, p.batchMessageCountLimit)
+
+	util.RetryForever("publish message", func() error {
+		return pb.WithBrokerGrpcClient(false, brokerGrpcAddress, p.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			stream, err := client.PublishMessage(ctx)
+			if err != nil {
+				log.Printf("grpc PublishMessage: %v", err)
+				return err
+			}
+
+			var atomicStatus int64
+			go func() {
+				resp, err := stream.Recv()
+				if err != nil {
+					log.Printf("response error: %v", err)
+				} else {
+					log.Printf("response: %v", resp.AckSequence)
+				}
+				if atomic.LoadInt64(&atomicStatus) < 0 {
+					return
+				}
+			}()
+
+			var flushErr error
+			// retry previously failed messages
+			if len(messages) >= p.batchMessageCountLimit {
+				flushErr = p.doFlush(stream, messages)
+				if flushErr != nil {
+					return flushErr
+				}
+				messages = messages[:0]
+			}
+
+			for {
+				select {
+				case m := <-p.messagesChan:
+					messages = append(messages, m)
+					if len(messages) >= p.batchMessageCountLimit {
+						if flushErr = p.doFlush(stream, messages); flushErr != nil {
+							return flushErr
+						}
+						messages = messages[:0]
+					}
+				case <-time.After(p.timeout):
+					if flushErr = p.doFlush(stream, messages); flushErr != nil {
+						return flushErr
+					}
+					messages = messages[:0]
+				case <-p.doneChan:
+					if flushErr = p.doFlush(stream, messages); flushErr != nil {
+						return flushErr
+					}
+					messages = messages[:0]
+					println("$ stopping ...")
+					break
+				}
+			}
+
+			// stop the response consuming goroutine
+			atomic.StoreInt64(&atomicStatus, -1)
+
+			return flushErr
+
+		})
+	}, func(err error) (shouldContinue bool) {
+		log.Printf("failed with grpc %s: %v", brokerGrpcAddress, err)
+		return true
+	})
+
+}
diff --git a/weed/mq/client/publisher.go b/weed/mq/client/publisher.go
new file mode 100644
index 000000000..30de47665
--- /dev/null
+++ b/weed/mq/client/publisher.go
@@ -0,0 +1,46 @@
+package client
+
+import (
+	"github.com/seaweedfs/seaweedfs/weed/pb"
+	"time"
+)
+
+type PublishProcessor interface {
+	AddMessage(m *Message) error
+	Shutdown() error
+}
+
+type PublisherOption struct {
+	Masters string
+	Topic   string
+}
+
+type Publisher struct {
+	option    *PublisherOption
+	masters   []pb.ServerAddress
+	processor *PublishStreamProcessor
+}
+
+func NewPublisher(option *PublisherOption) *Publisher {
+	p := &Publisher{
+		masters:   pb.ServerAddresses(option.Masters).ToAddresses(),
+		option:    option,
+		processor: NewPublishStreamProcessor(3, 887*time.Millisecond),
+	}
+	return p
+}
+
+type Message struct {
+	Key        []byte
+	Content    []byte
+	Properties map[string]string
+	Ts         time.Time
+}
+
+func (p Publisher) Publish(m *Message) error {
+	return p.processor.AddMessage(m)
+}
+
+func (p Publisher) Shutdown() error {
+	return p.processor.Shutdown()
+}
diff --git a/weed/mq/cmd/qsend/qsend.go b/weed/mq/cmd/qsend/qsend.go
new file mode 100644
index 000000000..34f7e6dc5
--- /dev/null
+++ b/weed/mq/cmd/qsend/qsend.go
@@ -0,0 +1,61 @@
+package main
+
+import (
+	"bufio"
+	"flag"
+	"fmt"
+	"github.com/seaweedfs/seaweedfs/weed/mq/client"
+	"os"
+	"time"
+)
+
+var (
+	master = flag.String("master", "localhost:9333", "master csv list")
+	topic  = flag.String("topic", "", "topic name")
+)
+
+func main() {
+	flag.Parse()
+
+	publisher := client.NewPublisher(&client.PublisherOption{
+		Masters: *master,
+		Topic:   *topic,
+	})
+
+	err := eachLineStdin(func(line string) error {
+		if len(line) > 0 {
+			if err := publisher.Publish(&client.Message{
+				Key:        nil,
+				Content:    []byte(line),
+				Properties: nil,
+				Ts:         time.Time{},
+			}); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+
+	publisher.Shutdown()
+
+	if err != nil {
+		fmt.Printf("error: %v\n", err)
+	}
+}
+
+func eachLineStdin(eachLineFn func(string) error) error {
+	scanner := bufio.NewScanner(os.Stdin)
+	for scanner.Scan() {
+		text := scanner.Text()
+		if err := eachLineFn(text); err != nil {
+			return err
+		}
+	}
+
+	// handle error
+	if scanner.Err() != nil {
+		return fmt.Errorf("scan stdin: %v", scanner.Err())
+	}
+
+	return nil
+}
diff --git a/weed/mq/segment/message_serde.go b/weed/mq/segment/message_serde.go
index 66a76c57d..ee55b18a7 100644
--- a/weed/mq/segment/message_serde.go
+++ b/weed/mq/segment/message_serde.go
@@ -35,7 +35,7 @@ func NewMessageBatchBuilder(b *flatbuffers.Builder,
 	}
 }
 
-func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, properties map[string][]byte, key []byte, value []byte) {
+func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, properties map[string]string, key []byte, value []byte) {
 	if builder.segmentSeqBase == 0 {
 		builder.segmentSeqBase = segmentSeq
 	}
@@ -48,7 +48,7 @@ func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, pro
 	var names, values, pairs []flatbuffers.UOffsetT
 	for k, v := range properties {
 		names = append(names, builder.b.CreateString(k))
-		values = append(values, builder.b.CreateByteVector(v))
+		values = append(values, builder.b.CreateString(v))
 	}
 	for i, _ := range names {
 		message_fbs.NameValueStart(builder.b)
diff --git a/weed/mq/segment/message_serde_test.go b/weed/mq/segment/message_serde_test.go
index c65bffb84..30c3d784a 100644
--- a/weed/mq/segment/message_serde_test.go
+++ b/weed/mq/segment/message_serde_test.go
@@ -10,9 +10,9 @@ import (
 func TestMessageSerde(t *testing.T) {
 	b := flatbuffers.NewBuilder(1024)
 
-	prop := make(map[string][]byte)
-	prop["n1"] = []byte("v1")
-	prop["n2"] = []byte("v2")
+	prop := make(map[string]string)
+	prop["n1"] = "v1"
+	prop["n2"] = "v2"
 
 	bb := NewMessageBatchBuilder(b, 1, 2, 3, 4)
 
diff --git a/weed/pb/mq.proto b/weed/pb/mq.proto
index bb53f635e..90b68b6c1 100644
--- a/weed/pb/mq.proto
+++ b/weed/pb/mq.proto
@@ -21,7 +21,7 @@ service SeaweedMessaging {
     }
 
     // data plane
-    rpc Publish (stream PublishRequest) returns (stream PublishResponse) {
+    rpc PublishMessage (stream PublishRequest) returns (stream PublishResponse) {
     }
 }
 
@@ -85,11 +85,14 @@ message CheckBrokerLoadResponse {
 
 //////////////////////////////////////////////////
 message PublishRequest {
-    message InitMessage {
+    message DataMessage {
+        bytes message = 1;
+    }
+    DataMessage data = 1;
+    message ControlMessage {
         Segment segment = 1;
     }
-    InitMessage init = 1;
-    bytes message = 2;
+    ControlMessage control = 2;
 }
 message PublishResponse {
     int64 ack_sequence = 1;
diff --git a/weed/pb/mq_pb/mq.pb.go b/weed/pb/mq_pb/mq.pb.go
index a9a3cb890..57dfe6353 100644
--- a/weed/pb/mq_pb/mq.pb.go
+++ b/weed/pb/mq_pb/mq.pb.go
@@ -1,7 +1,7 @@
 // Code generated by protoc-gen-go. DO NOT EDIT.
 // versions:
-// 	protoc-gen-go v1.28.0
-// 	protoc        v3.21.4
+// 	protoc-gen-go v1.26.0
+// 	protoc        v3.17.3
 // source: mq.proto
 
 package mq_pb
@@ -623,8 +623,8 @@ type PublishRequest struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	Init    *PublishRequest_InitMessage `protobuf:"bytes,1,opt,name=init,proto3" json:"init,omitempty"`
-	Message []byte                      `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
+	Data    *PublishRequest_DataMessage    `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
+	Control *PublishRequest_ControlMessage `protobuf:"bytes,2,opt,name=control,proto3" json:"control,omitempty"`
 }
 
 func (x *PublishRequest) Reset() {
@@ -659,16 +659,16 @@ func (*PublishRequest) Descriptor() ([]byte, []int) {
 	return file_mq_proto_rawDescGZIP(), []int{11}
 }
 
-func (x *PublishRequest) GetInit() *PublishRequest_InitMessage {
+func (x *PublishRequest) GetData() *PublishRequest_DataMessage {
 	if x != nil {
-		return x.Init
+		return x.Data
 	}
 	return nil
 }
 
-func (x *PublishRequest) GetMessage() []byte {
+func (x *PublishRequest) GetControl() *PublishRequest_ControlMessage {
 	if x != nil {
-		return x.Message
+		return x.Control
 	}
 	return nil
 }
@@ -728,16 +728,16 @@ func (x *PublishResponse) GetIsClosed() bool {
 	return false
 }
 
-type PublishRequest_InitMessage struct {
+type PublishRequest_DataMessage struct {
 	state         protoimpl.MessageState
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	Segment *Segment `protobuf:"bytes,1,opt,name=segment,proto3" json:"segment,omitempty"`
+	Message []byte `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
 }
 
-func (x *PublishRequest_InitMessage) Reset() {
-	*x = PublishRequest_InitMessage{}
+func (x *PublishRequest_DataMessage) Reset() {
+	*x = PublishRequest_DataMessage{}
 	if protoimpl.UnsafeEnabled {
 		mi := &file_mq_proto_msgTypes[13]
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -745,13 +745,13 @@ func (x *PublishRequest_InitMessage) Reset() {
 	}
 }
 
-func (x *PublishRequest_InitMessage) String() string {
+func (x *PublishRequest_DataMessage) String() string {
 	return protoimpl.X.MessageStringOf(x)
 }
 
-func (*PublishRequest_InitMessage) ProtoMessage() {}
+func (*PublishRequest_DataMessage) ProtoMessage() {}
 
-func (x *PublishRequest_InitMessage) ProtoReflect() protoreflect.Message {
+func (x *PublishRequest_DataMessage) ProtoReflect() protoreflect.Message {
 	mi := &file_mq_proto_msgTypes[13]
 	if protoimpl.UnsafeEnabled && x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -763,12 +763,59 @@ func (x *PublishRequest_InitMessage) ProtoReflect() protoreflect.Message {
 	return mi.MessageOf(x)
 }
 
-// Deprecated: Use PublishRequest_InitMessage.ProtoReflect.Descriptor instead.
-func (*PublishRequest_InitMessage) Descriptor() ([]byte, []int) {
+// Deprecated: Use PublishRequest_DataMessage.ProtoReflect.Descriptor instead.
+func (*PublishRequest_DataMessage) Descriptor() ([]byte, []int) {
 	return file_mq_proto_rawDescGZIP(), []int{11, 0}
 }
 
-func (x *PublishRequest_InitMessage) GetSegment() *Segment {
+func (x *PublishRequest_DataMessage) GetMessage() []byte {
+	if x != nil {
+		return x.Message
+	}
+	return nil
+}
+
+type PublishRequest_ControlMessage struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Segment *Segment `protobuf:"bytes,1,opt,name=segment,proto3" json:"segment,omitempty"`
+}
+
+func (x *PublishRequest_ControlMessage) Reset() {
+	*x = PublishRequest_ControlMessage{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_mq_proto_msgTypes[14]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *PublishRequest_ControlMessage) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*PublishRequest_ControlMessage) ProtoMessage() {}
+
+func (x *PublishRequest_ControlMessage) ProtoReflect() protoreflect.Message {
+	mi := &file_mq_proto_msgTypes[14]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use PublishRequest_ControlMessage.ProtoReflect.Descriptor instead.
+func (*PublishRequest_ControlMessage) Descriptor() ([]byte, []int) {
+	return file_mq_proto_rawDescGZIP(), []int{11, 1}
+}
+
+func (x *PublishRequest_ControlMessage) GetSegment() *Segment {
 	if x != nil {
 		return x.Segment
 	}
@@ -841,51 +888,57 @@ var file_mq_proto_rawDesc = []byte{
 	0x01, 0x28, 0x03, 0x52, 0x0c, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x43, 0x6f, 0x75, 0x6e,
 	0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x62, 0x79, 0x74, 0x65, 0x73, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74,
 	0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x62, 0x79, 0x74, 0x65, 0x73, 0x43, 0x6f, 0x75,
-	0x6e, 0x74, 0x22, 0xa8, 0x01, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65,
-	0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3c, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20,
+	0x6e, 0x74, 0x22, 0x81, 0x02, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65,
+	0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3c, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20,
 	0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f,
 	0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
-	0x74, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x04, 0x69,
-	0x6e, 0x69, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02,
-	0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x3e, 0x0a,
-	0x0b, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x2f, 0x0a, 0x07,
-	0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e,
-	0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x65, 0x67,
-	0x6d, 0x65, 0x6e, 0x74, 0x52, 0x07, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x51, 0x0a,
-	0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
-	0x12, 0x21, 0x0a, 0x0c, 0x61, 0x63, 0x6b, 0x5f, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65,
-	0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x61, 0x63, 0x6b, 0x53, 0x65, 0x71, 0x75, 0x65,
-	0x6e, 0x63, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x69, 0x73, 0x5f, 0x63, 0x6c, 0x6f, 0x73, 0x65, 0x64,
-	0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x69, 0x73, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x64,
-	0x32, 0x83, 0x04, 0x0a, 0x10, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73,
-	0x61, 0x67, 0x69, 0x6e, 0x67, 0x12, 0x63, 0x0a, 0x10, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f,
-	0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73,
+	0x74, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x04, 0x64,
+	0x61, 0x74, 0x61, 0x12, 0x45, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x18, 0x02,
+	0x20, 0x01, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67,
+	0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65,
+	0x73, 0x74, 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67,
+	0x65, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x1a, 0x27, 0x0a, 0x0b, 0x44, 0x61,
+	0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73,
+	0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73,
+	0x61, 0x67, 0x65, 0x1a, 0x41, 0x0a, 0x0e, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x4d, 0x65,
+	0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x2f, 0x0a, 0x07, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74,
+	0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69,
+	0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x07, 0x73,
+	0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x51, 0x0a, 0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73,
+	0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x63, 0x6b,
+	0x5f, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52,
+	0x0b, 0x61, 0x63, 0x6b, 0x53, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x12, 0x1b, 0x0a, 0x09,
+	0x69, 0x73, 0x5f, 0x63, 0x6c, 0x6f, 0x73, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52,
+	0x08, 0x69, 0x73, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x64, 0x32, 0x8a, 0x04, 0x0a, 0x10, 0x53, 0x65,
+	0x61, 0x77, 0x65, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x12, 0x63,
+	0x0a, 0x10, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64,
+	0x65, 0x72, 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
+	0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64,
+	0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73,
 	0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f,
-	0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
-	0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e,
-	0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72,
-	0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x6f, 0x0a, 0x14, 0x41, 0x73,
-	0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65,
-	0x72, 0x73, 0x12, 0x29, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
-	0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42,
-	0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2a, 0x2e,
-	0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73,
-	0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72,
-	0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x69, 0x0a, 0x12, 0x43,
-	0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75,
-	0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62,
-	0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61,
-	0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x6d, 0x65, 0x73,
+	0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
+	0x65, 0x22, 0x00, 0x12, 0x6f, 0x0a, 0x14, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67,
+	0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x29, 0x2e, 0x6d, 0x65,
+	0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67,
+	0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52,
+	0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69,
+	0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d,
+	0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
+	0x73, 0x65, 0x22, 0x00, 0x12, 0x69, 0x0a, 0x12, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67,
+	0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73,
 	0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53,
-	0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70,
-	0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, 0x0f, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42,
-	0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x12, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73,
-	0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72,
-	0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
-	0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43,
-	0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65,
-	0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4c, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c,
-	0x69, 0x73, 0x68, 0x12, 0x1c, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f,
+	0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75,
+	0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f,
+	0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53,
+	0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12,
+	0x60, 0x0a, 0x0f, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f,
+	0x61, 0x64, 0x12, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
+	0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61,
+	0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61,
+	0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f,
+	0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22,
+	0x00, 0x12, 0x53, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x4d, 0x65, 0x73, 0x73,
+	0x61, 0x67, 0x65, 0x12, 0x1c, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f,
 	0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
 	0x74, 0x1a, 0x1d, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62,
 	0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
@@ -909,45 +962,47 @@ func file_mq_proto_rawDescGZIP() []byte {
 	return file_mq_proto_rawDescData
 }
 
-var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 14)
+var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 15)
 var file_mq_proto_goTypes = []interface{}{
-	(*SegmentInfo)(nil),                  // 0: messaging_pb.SegmentInfo
-	(*FindBrokerLeaderRequest)(nil),      // 1: messaging_pb.FindBrokerLeaderRequest
-	(*FindBrokerLeaderResponse)(nil),     // 2: messaging_pb.FindBrokerLeaderResponse
-	(*Partition)(nil),                    // 3: messaging_pb.Partition
-	(*Segment)(nil),                      // 4: messaging_pb.Segment
-	(*AssignSegmentBrokersRequest)(nil),  // 5: messaging_pb.AssignSegmentBrokersRequest
-	(*AssignSegmentBrokersResponse)(nil), // 6: messaging_pb.AssignSegmentBrokersResponse
-	(*CheckSegmentStatusRequest)(nil),    // 7: messaging_pb.CheckSegmentStatusRequest
-	(*CheckSegmentStatusResponse)(nil),   // 8: messaging_pb.CheckSegmentStatusResponse
-	(*CheckBrokerLoadRequest)(nil),       // 9: messaging_pb.CheckBrokerLoadRequest
-	(*CheckBrokerLoadResponse)(nil),      // 10: messaging_pb.CheckBrokerLoadResponse
-	(*PublishRequest)(nil),               // 11: messaging_pb.PublishRequest
-	(*PublishResponse)(nil),              // 12: messaging_pb.PublishResponse
-	(*PublishRequest_InitMessage)(nil),   // 13: messaging_pb.PublishRequest.InitMessage
+	(*SegmentInfo)(nil),                   // 0: messaging_pb.SegmentInfo
+	(*FindBrokerLeaderRequest)(nil),       // 1: messaging_pb.FindBrokerLeaderRequest
+	(*FindBrokerLeaderResponse)(nil),      // 2: messaging_pb.FindBrokerLeaderResponse
+	(*Partition)(nil),                     // 3: messaging_pb.Partition
+	(*Segment)(nil),                       // 4: messaging_pb.Segment
+	(*AssignSegmentBrokersRequest)(nil),   // 5: messaging_pb.AssignSegmentBrokersRequest
+	(*AssignSegmentBrokersResponse)(nil),  // 6: messaging_pb.AssignSegmentBrokersResponse
+	(*CheckSegmentStatusRequest)(nil),     // 7: messaging_pb.CheckSegmentStatusRequest
+	(*CheckSegmentStatusResponse)(nil),    // 8: messaging_pb.CheckSegmentStatusResponse
+	(*CheckBrokerLoadRequest)(nil),        // 9: messaging_pb.CheckBrokerLoadRequest
+	(*CheckBrokerLoadResponse)(nil),       // 10: messaging_pb.CheckBrokerLoadResponse
+	(*PublishRequest)(nil),                // 11: messaging_pb.PublishRequest
+	(*PublishResponse)(nil),               // 12: messaging_pb.PublishResponse
+	(*PublishRequest_DataMessage)(nil),    // 13: messaging_pb.PublishRequest.DataMessage
+	(*PublishRequest_ControlMessage)(nil), // 14: messaging_pb.PublishRequest.ControlMessage
 }
 var file_mq_proto_depIdxs = []int32{
 	4,  // 0: messaging_pb.SegmentInfo.segment:type_name -> messaging_pb.Segment
 	3,  // 1: messaging_pb.Segment.partition:type_name -> messaging_pb.Partition
 	4,  // 2: messaging_pb.AssignSegmentBrokersRequest.segment:type_name -> messaging_pb.Segment
 	4,  // 3: messaging_pb.CheckSegmentStatusRequest.segment:type_name -> messaging_pb.Segment
-	13, // 4: messaging_pb.PublishRequest.init:type_name -> messaging_pb.PublishRequest.InitMessage
-	4,  // 5: messaging_pb.PublishRequest.InitMessage.segment:type_name -> messaging_pb.Segment
-	1,  // 6: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest
-	5,  // 7: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:input_type -> messaging_pb.AssignSegmentBrokersRequest
-	7,  // 8: messaging_pb.SeaweedMessaging.CheckSegmentStatus:input_type -> messaging_pb.CheckSegmentStatusRequest
-	9,  // 9: messaging_pb.SeaweedMessaging.CheckBrokerLoad:input_type -> messaging_pb.CheckBrokerLoadRequest
-	11, // 10: messaging_pb.SeaweedMessaging.Publish:input_type -> messaging_pb.PublishRequest
-	2,  // 11: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse
-	6,  // 12: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:output_type -> messaging_pb.AssignSegmentBrokersResponse
-	8,  // 13: messaging_pb.SeaweedMessaging.CheckSegmentStatus:output_type -> messaging_pb.CheckSegmentStatusResponse
-	10, // 14: messaging_pb.SeaweedMessaging.CheckBrokerLoad:output_type -> messaging_pb.CheckBrokerLoadResponse
-	12, // 15: messaging_pb.SeaweedMessaging.Publish:output_type -> messaging_pb.PublishResponse
-	11, // [11:16] is the sub-list for method output_type
-	6,  // [6:11] is the sub-list for method input_type
-	6,  // [6:6] is the sub-list for extension type_name
-	6,  // [6:6] is the sub-list for extension extendee
-	0,  // [0:6] is the sub-list for field type_name
+	13, // 4: messaging_pb.PublishRequest.data:type_name -> messaging_pb.PublishRequest.DataMessage
+	14, // 5: messaging_pb.PublishRequest.control:type_name -> messaging_pb.PublishRequest.ControlMessage
+	4,  // 6: messaging_pb.PublishRequest.ControlMessage.segment:type_name -> messaging_pb.Segment
+	1,  // 7: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest
+	5,  // 8: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:input_type -> messaging_pb.AssignSegmentBrokersRequest
+	7,  // 9: messaging_pb.SeaweedMessaging.CheckSegmentStatus:input_type -> messaging_pb.CheckSegmentStatusRequest
+	9,  // 10: messaging_pb.SeaweedMessaging.CheckBrokerLoad:input_type -> messaging_pb.CheckBrokerLoadRequest
+	11, // 11: messaging_pb.SeaweedMessaging.PublishMessage:input_type -> messaging_pb.PublishRequest
+	2,  // 12: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse
+	6,  // 13: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:output_type -> messaging_pb.AssignSegmentBrokersResponse
+	8,  // 14: messaging_pb.SeaweedMessaging.CheckSegmentStatus:output_type -> messaging_pb.CheckSegmentStatusResponse
+	10, // 15: messaging_pb.SeaweedMessaging.CheckBrokerLoad:output_type -> messaging_pb.CheckBrokerLoadResponse
+	12, // 16: messaging_pb.SeaweedMessaging.PublishMessage:output_type -> messaging_pb.PublishResponse
+	12, // [12:17] is the sub-list for method output_type
+	7,  // [7:12] is the sub-list for method input_type
+	7,  // [7:7] is the sub-list for extension type_name
+	7,  // [7:7] is the sub-list for extension extendee
+	0,  // [0:7] is the sub-list for field type_name
 }
 
 func init() { file_mq_proto_init() }
@@ -1113,7 +1168,19 @@ func file_mq_proto_init() {
 			}
 		}
 		file_mq_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} {
-			switch v := v.(*PublishRequest_InitMessage); i {
+			switch v := v.(*PublishRequest_DataMessage); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_mq_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*PublishRequest_ControlMessage); i {
 			case 0:
 				return &v.state
 			case 1:
@@ -1131,7 +1198,7 @@ func file_mq_proto_init() {
 			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
 			RawDescriptor: file_mq_proto_rawDesc,
 			NumEnums:      0,
-			NumMessages:   14,
+			NumMessages:   15,
 			NumExtensions: 0,
 			NumServices:   1,
 		},
diff --git a/weed/pb/mq_pb/mq_grpc.pb.go b/weed/pb/mq_pb/mq_grpc.pb.go
index 4793f060f..3fe4f9a3c 100644
--- a/weed/pb/mq_pb/mq_grpc.pb.go
+++ b/weed/pb/mq_pb/mq_grpc.pb.go
@@ -11,6 +11,7 @@ import (
 
 // This is a compile-time assertion to ensure that this generated file
 // is compatible with the grpc package it is being compiled against.
+// Requires gRPC-Go v1.32.0 or later.
 const _ = grpc.SupportPackageIsVersion7
 
 // SeaweedMessagingClient is the client API for SeaweedMessaging service.
@@ -23,7 +24,7 @@ type SeaweedMessagingClient interface {
 	CheckSegmentStatus(ctx context.Context, in *CheckSegmentStatusRequest, opts ...grpc.CallOption) (*CheckSegmentStatusResponse, error)
 	CheckBrokerLoad(ctx context.Context, in *CheckBrokerLoadRequest, opts ...grpc.CallOption) (*CheckBrokerLoadResponse, error)
 	// data plane
-	Publish(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishClient, error)
+	PublishMessage(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishMessageClient, error)
 }
 
 type seaweedMessagingClient struct {
@@ -70,30 +71,30 @@ func (c *seaweedMessagingClient) CheckBrokerLoad(ctx context.Context, in *CheckB
 	return out, nil
 }
 
-func (c *seaweedMessagingClient) Publish(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishClient, error) {
-	stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[0], "/messaging_pb.SeaweedMessaging/Publish", opts...)
+func (c *seaweedMessagingClient) PublishMessage(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishMessageClient, error) {
+	stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[0], "/messaging_pb.SeaweedMessaging/PublishMessage", opts...)
 	if err != nil {
 		return nil, err
 	}
-	x := &seaweedMessagingPublishClient{stream}
+	x := &seaweedMessagingPublishMessageClient{stream}
 	return x, nil
 }
 
-type SeaweedMessaging_PublishClient interface {
+type SeaweedMessaging_PublishMessageClient interface {
 	Send(*PublishRequest) error
 	Recv() (*PublishResponse, error)
 	grpc.ClientStream
 }
 
-type seaweedMessagingPublishClient struct {
+type seaweedMessagingPublishMessageClient struct {
 	grpc.ClientStream
 }
 
-func (x *seaweedMessagingPublishClient) Send(m *PublishRequest) error {
+func (x *seaweedMessagingPublishMessageClient) Send(m *PublishRequest) error {
 	return x.ClientStream.SendMsg(m)
 }
 
-func (x *seaweedMessagingPublishClient) Recv() (*PublishResponse, error) {
+func (x *seaweedMessagingPublishMessageClient) Recv() (*PublishResponse, error) {
 	m := new(PublishResponse)
 	if err := x.ClientStream.RecvMsg(m); err != nil {
 		return nil, err
@@ -111,7 +112,7 @@ type SeaweedMessagingServer interface {
 	CheckSegmentStatus(context.Context, *CheckSegmentStatusRequest) (*CheckSegmentStatusResponse, error)
 	CheckBrokerLoad(context.Context, *CheckBrokerLoadRequest) (*CheckBrokerLoadResponse, error)
 	// data plane
-	Publish(SeaweedMessaging_PublishServer) error
+	PublishMessage(SeaweedMessaging_PublishMessageServer) error
 	mustEmbedUnimplementedSeaweedMessagingServer()
 }
 
@@ -131,8 +132,8 @@ func (UnimplementedSeaweedMessagingServer) CheckSegmentStatus(context.Context, *
 func (UnimplementedSeaweedMessagingServer) CheckBrokerLoad(context.Context, *CheckBrokerLoadRequest) (*CheckBrokerLoadResponse, error) {
 	return nil, status.Errorf(codes.Unimplemented, "method CheckBrokerLoad not implemented")
 }
-func (UnimplementedSeaweedMessagingServer) Publish(SeaweedMessaging_PublishServer) error {
-	return status.Errorf(codes.Unimplemented, "method Publish not implemented")
+func (UnimplementedSeaweedMessagingServer) PublishMessage(SeaweedMessaging_PublishMessageServer) error {
+	return status.Errorf(codes.Unimplemented, "method PublishMessage not implemented")
 }
 func (UnimplementedSeaweedMessagingServer) mustEmbedUnimplementedSeaweedMessagingServer() {}
 
@@ -219,25 +220,25 @@ func _SeaweedMessaging_CheckBrokerLoad_Handler(srv interface{}, ctx context.Cont
 	return interceptor(ctx, in, info, handler)
 }
 
-func _SeaweedMessaging_Publish_Handler(srv interface{}, stream grpc.ServerStream) error {
-	return srv.(SeaweedMessagingServer).Publish(&seaweedMessagingPublishServer{stream})
+func _SeaweedMessaging_PublishMessage_Handler(srv interface{}, stream grpc.ServerStream) error {
+	return srv.(SeaweedMessagingServer).PublishMessage(&seaweedMessagingPublishMessageServer{stream})
 }
 
-type SeaweedMessaging_PublishServer interface {
+type SeaweedMessaging_PublishMessageServer interface {
 	Send(*PublishResponse) error
 	Recv() (*PublishRequest, error)
 	grpc.ServerStream
 }
 
-type seaweedMessagingPublishServer struct {
+type seaweedMessagingPublishMessageServer struct {
 	grpc.ServerStream
 }
 
-func (x *seaweedMessagingPublishServer) Send(m *PublishResponse) error {
+func (x *seaweedMessagingPublishMessageServer) Send(m *PublishResponse) error {
 	return x.ServerStream.SendMsg(m)
 }
 
-func (x *seaweedMessagingPublishServer) Recv() (*PublishRequest, error) {
+func (x *seaweedMessagingPublishMessageServer) Recv() (*PublishRequest, error) {
 	m := new(PublishRequest)
 	if err := x.ServerStream.RecvMsg(m); err != nil {
 		return nil, err
@@ -271,8 +272,8 @@ var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{
 	},
 	Streams: []grpc.StreamDesc{
 		{
-			StreamName:    "Publish",
-			Handler:       _SeaweedMessaging_Publish_Handler,
+			StreamName:    "PublishMessage",
+			Handler:       _SeaweedMessaging_PublishMessage_Handler,
 			ServerStreams: true,
 			ClientStreams: true,
 		},