Browse Source

refactoring

message_send
chrislu 2 years ago
parent
commit
bfd0601fd3
  1. 11
      weed/mq/client/publish_stream_processor.go
  2. 12
      weed/mq/client/publisher.go
  3. 3
      weed/mq/cmd/qsend/qsend.go
  4. 10
      weed/mq/messages/messages.go

11
weed/mq/client/publish_stream_processor.go

@ -3,6 +3,7 @@ package client
import ( import (
"context" "context"
flatbuffers "github.com/google/flatbuffers/go" flatbuffers "github.com/google/flatbuffers/go"
"github.com/seaweedfs/seaweedfs/weed/mq/messages"
"github.com/seaweedfs/seaweedfs/weed/mq/segment" "github.com/seaweedfs/seaweedfs/weed/mq/segment"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@ -31,7 +32,7 @@ type PublishStreamProcessor struct {
timeout time.Duration timeout time.Duration
// convert into bytes // convert into bytes
messagesChan chan *Message
messagesChan chan *messages.Message
builders chan *flatbuffers.Builder builders chan *flatbuffers.Builder
batchMessageCountLimit int batchMessageCountLimit int
@ -51,7 +52,7 @@ func NewPublishStreamProcessor(batchMessageCountLimit int, timeout time.Duration
grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
batchMessageCountLimit: batchMessageCountLimit, batchMessageCountLimit: batchMessageCountLimit,
builders: make(chan *flatbuffers.Builder, batchCountLimit), builders: make(chan *flatbuffers.Builder, batchCountLimit),
messagesChan: make(chan *Message, 1024),
messagesChan: make(chan *messages.Message, 1024),
doneChan: make(chan struct{}), doneChan: make(chan struct{}),
timeout: timeout, timeout: timeout,
} }
@ -62,7 +63,7 @@ func NewPublishStreamProcessor(batchMessageCountLimit int, timeout time.Duration
return t return t
} }
func (p *PublishStreamProcessor) AddMessage(m *Message) error {
func (p *PublishStreamProcessor) AddMessage(m *messages.Message) error {
p.messagesChan <- m p.messagesChan <- m
return nil return nil
} }
@ -72,7 +73,7 @@ func (p *PublishStreamProcessor) Shutdown() error {
return nil return nil
} }
func (p *PublishStreamProcessor) doFlush(stream mq_pb.SeaweedMessaging_PublishMessageClient, messages []*Message) error {
func (p *PublishStreamProcessor) doFlush(stream mq_pb.SeaweedMessaging_PublishMessageClient, messages []*messages.Message) error {
if len(messages) == 0 { if len(messages) == 0 {
return nil return nil
@ -102,7 +103,7 @@ func (p *PublishStreamProcessor) doLoopUpload() {
brokerGrpcAddress := "localhost:17777" brokerGrpcAddress := "localhost:17777"
// TOOD parallelize the uploading with separate uploader // TOOD parallelize the uploading with separate uploader
messages := make([]*Message, 0, p.batchMessageCountLimit)
messages := make([]*messages.Message, 0, p.batchMessageCountLimit)
util.RetryForever("publish message", func() error { util.RetryForever("publish message", func() error {
return pb.WithBrokerGrpcClient(false, brokerGrpcAddress, p.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { return pb.WithBrokerGrpcClient(false, brokerGrpcAddress, p.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {

12
weed/mq/client/publisher.go

@ -1,12 +1,13 @@
package client package client
import ( import (
"github.com/seaweedfs/seaweedfs/weed/mq/messages"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"time" "time"
) )
type PublishProcessor interface { type PublishProcessor interface {
AddMessage(m *Message) error
AddMessage(m *messages.Message) error
Shutdown() error Shutdown() error
} }
@ -30,14 +31,7 @@ func NewPublisher(option *PublisherOption) *Publisher {
return p return p
} }
type Message struct {
Key []byte
Content []byte
Properties map[string]string
Ts time.Time
}
func (p Publisher) Publish(m *Message) error {
func (p Publisher) Publish(m *messages.Message) error {
return p.processor.AddMessage(m) return p.processor.AddMessage(m)
} }

3
weed/mq/cmd/qsend/qsend.go

@ -5,6 +5,7 @@ import (
"flag" "flag"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/client" "github.com/seaweedfs/seaweedfs/weed/mq/client"
"github.com/seaweedfs/seaweedfs/weed/mq/messages"
"os" "os"
"time" "time"
) )
@ -24,7 +25,7 @@ func main() {
err := eachLineStdin(func(line string) error { err := eachLineStdin(func(line string) error {
if len(line) > 0 { if len(line) > 0 {
if err := publisher.Publish(&client.Message{
if err := publisher.Publish(&messages.Message{
Key: nil, Key: nil,
Content: []byte(line), Content: []byte(line),
Properties: nil, Properties: nil,

10
weed/mq/messages/messages.go

@ -0,0 +1,10 @@
package messages
import "time"
type Message struct {
Key []byte
Content []byte
Properties map[string]string
Ts time.Time
}
Loading…
Cancel
Save