Browse Source

add flatbuffer serde for message

pull/3379/head
chrislu 3 years ago
parent
commit
7f672b37e1
  1. 1
      go.mod
  2. 2
      go.sum
  3. 66
      weed/mq/README.md
  4. 48
      weed/mq/segment/message_serde.go
  5. 47
      weed/mq/segment/message_serde_test.go
  6. 1
      weed/mq/segment/segment_serde.go
  7. 3
      weed/pb/Makefile
  8. 15
      weed/pb/message.fbs
  9. 179
      weed/pb/message_fbs/Message.go
  10. 63
      weed/pb/message_fbs/NameValue.go

1
go.mod

@ -184,6 +184,7 @@ require (
github.com/fatih/color v1.13.0 // indirect
github.com/fclairamb/go-log v0.3.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/flatbuffers v2.0.6+incompatible // indirect
github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect
github.com/googleapis/go-type-adapters v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect

2
go.sum

@ -415,6 +415,8 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU=
github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
github.com/google/flatbuffers v2.0.6+incompatible h1:XHFReMv7nFFusa+CEokzWbzaYocKXI6C7hdU5Kgh9Lw=
github.com/google/flatbuffers v2.0.6+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=

66
weed/mq/README.md

@ -0,0 +1,66 @@
# SeaweedMQ Message Queue on SeaweedFS (WIP, not ready)
## What are the use cases it is designed for?
Message queues are like water pipes. Messages flow in the pipes to their destinations.
However, what if a flood comes? Of course, you can increase the number of partitions, add more brokers, restart,
and watch the traffic level closely.
Sometimes the flood is expected. For example, backfill some old data in batch, and switch to online messages.
You may want to ensure enough brokers to handle the data and reduce them later to cut cost.
SeaweedMQ is designed for use cases that need to:
* Receive and save large number of messages.
* Handle spike traffic automatically.
## What is special about SeaweedMQ?
* Separate computation and storage nodes that scales independently.
* Offline messages can be operated as normal files.
* Unlimited storage space by adding volume servers.
* Unlimited message brokers.
* Scale up and down with auto split and merge message topics.
* Topics can automatically split into segments when traffic increases, and vice verse.
*
# Design
# How it works?
Brokers are just computation nodes without storage. When a broker starts, it reports itself to masters.
Among all the brokers, one of them will be selected as the leader by the masters.
A topic needs to define its partition key on its messages.
Messages for a topic are divided into segments.
During write time, the client will ask the broker leader for a few brokers to process the segment.
The broker leader will check whether the segment already has assigned the brokers. If not, select a few based
on their loads, save the selection into filer, and tell the client.
The client will write the messages for this segment to the selected brokers.
## Failover
The broker leader does not contain any state. If it fails, the masters will select a different broker.
For a segment, if any one of the selected brokers is down, the remaining brokers should try to write received messages
to the filer, and close the segment to the clients.
Then the clients should start a new segment. The masters should other healthy brokers to handle the new segment.
So any brokers can go down without losing data.
## Auto Split or Merge
(The idea is learned from Pravega.)
The brokers should report its traffic load to the broker leader periodically.
If any segment has too much load, the broker leader will ask the brokers to tell the client to
close current one and create two new segments.
If 2 neighboring segments have the combined load below average load per segment, the broker leader will ask
the brokers to tell the client to close this 2 segments and create a new segment.

48
weed/mq/segment/message_serde.go

@ -0,0 +1,48 @@
package segment
import (
"github.com/chrislusf/seaweedfs/weed/pb/message_fbs"
flatbuffers "github.com/google/flatbuffers/go"
)
func CreateMessage(b *flatbuffers.Builder, producerId int32, producerSeq int64, segmentId int32, segmentSeq int64,
eventTsNs int64, recvTsNs int64, properties map[string]string, key []byte, value []byte) {
b.Reset()
var names, values, pairs []flatbuffers.UOffsetT
for k, v := range properties {
names = append(names, b.CreateString(k))
values = append(values, b.CreateString(v))
}
for i, _ := range names {
message_fbs.NameValueStart(b)
message_fbs.NameValueAddName(b, names[i])
message_fbs.NameValueAddValue(b, values[i])
pair := message_fbs.NameValueEnd(b)
pairs = append(pairs, pair)
}
message_fbs.MessageStartPropertiesVector(b, len(properties))
for i := len(pairs) - 1; i >= 0; i-- {
b.PrependUOffsetT(pairs[i])
}
prop := b.EndVector(len(properties))
k := b.CreateByteVector(key)
v := b.CreateByteVector(value)
message_fbs.MessageStart(b)
message_fbs.MessageAddProducerId(b, producerId)
message_fbs.MessageAddProducerSeq(b, producerSeq)
message_fbs.MessageAddSegmentId(b, segmentId)
message_fbs.MessageAddSegmentSeq(b, segmentSeq)
message_fbs.MessageAddEventTsNs(b, eventTsNs)
message_fbs.MessageAddRecvTsNs(b, recvTsNs)
message_fbs.MessageAddProperties(b, prop)
message_fbs.MessageAddKey(b, k)
message_fbs.MessageAddData(b, v)
message := message_fbs.MessageEnd(b)
b.Finish(message)
}

47
weed/mq/segment/message_serde_test.go

@ -0,0 +1,47 @@
package segment
import (
"github.com/chrislusf/seaweedfs/weed/pb/message_fbs"
flatbuffers "github.com/google/flatbuffers/go"
"github.com/stretchr/testify/assert"
"testing"
)
func TestMessageSerde(t *testing.T) {
b := flatbuffers.NewBuilder(1024)
prop := make(map[string]string)
prop["n1"] = "v1"
prop["n2"] = "v2"
CreateMessage(b, 1, 2, 3, 4, 5, 6, prop,
[]byte("the primary key"), []byte("body is here"))
buf := b.FinishedBytes()
println("serialized size", len(buf))
m := message_fbs.GetRootAsMessage(buf, 0)
assert.Equal(t, int32(1), m.ProducerId())
assert.Equal(t, int64(2), m.ProducerSeq())
assert.Equal(t, int32(3), m.SegmentId())
assert.Equal(t, int64(4), m.SegmentSeq())
assert.Equal(t, int64(5), m.EventTsNs())
assert.Equal(t, int64(6), m.RecvTsNs())
assert.Equal(t, 2, m.PropertiesLength())
nv := &message_fbs.NameValue{}
m.Properties(nv, 0)
assert.Equal(t, "n1", string(nv.Name()))
assert.Equal(t, "v1", string(nv.Value()))
m.Properties(nv, 1)
assert.Equal(t, "n2", string(nv.Name()))
assert.Equal(t, "v2", string(nv.Value()))
assert.Equal(t, []byte("the primary key"), m.Key())
assert.Equal(t, []byte("body is here"), m.Data())
m.MutateSegmentSeq(123)
assert.Equal(t, int64(123), m.SegmentSeq())
}

1
weed/mq/segment/segment_serde.go

@ -0,0 +1 @@
package segment

3
weed/pb/Makefile

@ -13,3 +13,6 @@ gen:
protoc mq.proto --go_out=./mq_pb --go-grpc_out=./mq_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
# protoc filer.proto --java_out=../../other/java/client/src/main/java
cp filer.proto ../../other/java/client/src/main/proto
fbs:
flatc --go -o . --go-namespace message_fbs message.fbs

15
weed/pb/message.fbs

@ -0,0 +1,15 @@
table NameValue {
name:string (key);
value:string;
}
table Message {
producer_id:int32 (id:0);
producer_seq:int64 (id:2);
segment_id:int32 (id:1);
segment_seq:int64 (id:3);
event_ts_ns:int64 (id:4);
recv_ts_ns:int64 (id:5);
properties:[NameValue] (id:6);
key:string (id:7); // bytes
data:string (id:8); // bytes
}

179
weed/pb/message_fbs/Message.go

@ -0,0 +1,179 @@
// Code generated by the FlatBuffers compiler. DO NOT EDIT.
package message_fbs
import (
flatbuffers "github.com/google/flatbuffers/go"
)
type Message struct {
_tab flatbuffers.Table
}
func GetRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message {
n := flatbuffers.GetUOffsetT(buf[offset:])
x := &Message{}
x.Init(buf, n+offset)
return x
}
func GetSizePrefixedRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message {
n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
x := &Message{}
x.Init(buf, n+offset+flatbuffers.SizeUint32)
return x
}
func (rcv *Message) Init(buf []byte, i flatbuffers.UOffsetT) {
rcv._tab.Bytes = buf
rcv._tab.Pos = i
}
func (rcv *Message) Table() flatbuffers.Table {
return rcv._tab
}
func (rcv *Message) ProducerId() int32 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
return rcv._tab.GetInt32(o + rcv._tab.Pos)
}
return 0
}
func (rcv *Message) MutateProducerId(n int32) bool {
return rcv._tab.MutateInt32Slot(4, n)
}
func (rcv *Message) SegmentId() int32 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
return rcv._tab.GetInt32(o + rcv._tab.Pos)
}
return 0
}
func (rcv *Message) MutateSegmentId(n int32) bool {
return rcv._tab.MutateInt32Slot(6, n)
}
func (rcv *Message) ProducerSeq() int64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
return rcv._tab.GetInt64(o + rcv._tab.Pos)
}
return 0
}
func (rcv *Message) MutateProducerSeq(n int64) bool {
return rcv._tab.MutateInt64Slot(8, n)
}
func (rcv *Message) SegmentSeq() int64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
if o != 0 {
return rcv._tab.GetInt64(o + rcv._tab.Pos)
}
return 0
}
func (rcv *Message) MutateSegmentSeq(n int64) bool {
return rcv._tab.MutateInt64Slot(10, n)
}
func (rcv *Message) EventTsNs() int64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
if o != 0 {
return rcv._tab.GetInt64(o + rcv._tab.Pos)
}
return 0
}
func (rcv *Message) MutateEventTsNs(n int64) bool {
return rcv._tab.MutateInt64Slot(12, n)
}
func (rcv *Message) RecvTsNs() int64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(14))
if o != 0 {
return rcv._tab.GetInt64(o + rcv._tab.Pos)
}
return 0
}
func (rcv *Message) MutateRecvTsNs(n int64) bool {
return rcv._tab.MutateInt64Slot(14, n)
}
func (rcv *Message) Properties(obj *NameValue, j int) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(16))
if o != 0 {
x := rcv._tab.Vector(o)
x += flatbuffers.UOffsetT(j) * 4
x = rcv._tab.Indirect(x)
obj.Init(rcv._tab.Bytes, x)
return true
}
return false
}
func (rcv *Message) PropertiesLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(16))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func (rcv *Message) Key() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(18))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func (rcv *Message) Data() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(20))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func MessageStart(builder *flatbuffers.Builder) {
builder.StartObject(9)
}
func MessageAddProducerId(builder *flatbuffers.Builder, producerId int32) {
builder.PrependInt32Slot(0, producerId, 0)
}
func MessageAddSegmentId(builder *flatbuffers.Builder, segmentId int32) {
builder.PrependInt32Slot(1, segmentId, 0)
}
func MessageAddProducerSeq(builder *flatbuffers.Builder, producerSeq int64) {
builder.PrependInt64Slot(2, producerSeq, 0)
}
func MessageAddSegmentSeq(builder *flatbuffers.Builder, segmentSeq int64) {
builder.PrependInt64Slot(3, segmentSeq, 0)
}
func MessageAddEventTsNs(builder *flatbuffers.Builder, eventTsNs int64) {
builder.PrependInt64Slot(4, eventTsNs, 0)
}
func MessageAddRecvTsNs(builder *flatbuffers.Builder, recvTsNs int64) {
builder.PrependInt64Slot(5, recvTsNs, 0)
}
func MessageAddProperties(builder *flatbuffers.Builder, properties flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(6, flatbuffers.UOffsetT(properties), 0)
}
func MessageStartPropertiesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(4, numElems, 4)
}
func MessageAddKey(builder *flatbuffers.Builder, key flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(7, flatbuffers.UOffsetT(key), 0)
}
func MessageAddData(builder *flatbuffers.Builder, data flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(8, flatbuffers.UOffsetT(data), 0)
}
func MessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject()
}

63
weed/pb/message_fbs/NameValue.go

@ -0,0 +1,63 @@
// Code generated by the FlatBuffers compiler. DO NOT EDIT.
package message_fbs
import (
flatbuffers "github.com/google/flatbuffers/go"
)
type NameValue struct {
_tab flatbuffers.Table
}
func GetRootAsNameValue(buf []byte, offset flatbuffers.UOffsetT) *NameValue {
n := flatbuffers.GetUOffsetT(buf[offset:])
x := &NameValue{}
x.Init(buf, n+offset)
return x
}
func GetSizePrefixedRootAsNameValue(buf []byte, offset flatbuffers.UOffsetT) *NameValue {
n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
x := &NameValue{}
x.Init(buf, n+offset+flatbuffers.SizeUint32)
return x
}
func (rcv *NameValue) Init(buf []byte, i flatbuffers.UOffsetT) {
rcv._tab.Bytes = buf
rcv._tab.Pos = i
}
func (rcv *NameValue) Table() flatbuffers.Table {
return rcv._tab
}
func (rcv *NameValue) Name() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func (rcv *NameValue) Value() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func NameValueStart(builder *flatbuffers.Builder) {
builder.StartObject(2)
}
func NameValueAddName(builder *flatbuffers.Builder, name flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(name), 0)
}
func NameValueAddValue(builder *flatbuffers.Builder, value flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(value), 0)
}
func NameValueEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject()
}
Loading…
Cancel
Save