chrislu
2 years ago
10 changed files with 425 additions and 0 deletions
-
1go.mod
-
2go.sum
-
66weed/mq/README.md
-
48weed/mq/segment/message_serde.go
-
47weed/mq/segment/message_serde_test.go
-
1weed/mq/segment/segment_serde.go
-
3weed/pb/Makefile
-
15weed/pb/message.fbs
-
179weed/pb/message_fbs/Message.go
-
63weed/pb/message_fbs/NameValue.go
@ -0,0 +1,66 @@ |
|||
# SeaweedMQ Message Queue on SeaweedFS (WIP, not ready) |
|||
|
|||
## What are the use cases it is designed for? |
|||
|
|||
Message queues are like water pipes. Messages flow in the pipes to their destinations. |
|||
|
|||
However, what if a flood comes? Of course, you can increase the number of partitions, add more brokers, restart, |
|||
and watch the traffic level closely. |
|||
|
|||
Sometimes the flood is expected. For example, backfill some old data in batch, and switch to online messages. |
|||
You may want to ensure enough brokers to handle the data and reduce them later to cut cost. |
|||
|
|||
SeaweedMQ is designed for use cases that need to: |
|||
* Receive and save large number of messages. |
|||
* Handle spike traffic automatically. |
|||
|
|||
## What is special about SeaweedMQ? |
|||
|
|||
* Separate computation and storage nodes that scales independently. |
|||
* Offline messages can be operated as normal files. |
|||
* Unlimited storage space by adding volume servers. |
|||
* Unlimited message brokers. |
|||
* Scale up and down with auto split and merge message topics. |
|||
* Topics can automatically split into segments when traffic increases, and vice verse. |
|||
* |
|||
|
|||
# Design |
|||
|
|||
# How it works? |
|||
|
|||
Brokers are just computation nodes without storage. When a broker starts, it reports itself to masters. |
|||
Among all the brokers, one of them will be selected as the leader by the masters. |
|||
|
|||
A topic needs to define its partition key on its messages. |
|||
|
|||
Messages for a topic are divided into segments. |
|||
|
|||
During write time, the client will ask the broker leader for a few brokers to process the segment. |
|||
|
|||
The broker leader will check whether the segment already has assigned the brokers. If not, select a few based |
|||
on their loads, save the selection into filer, and tell the client. |
|||
|
|||
The client will write the messages for this segment to the selected brokers. |
|||
|
|||
## Failover |
|||
|
|||
The broker leader does not contain any state. If it fails, the masters will select a different broker. |
|||
|
|||
For a segment, if any one of the selected brokers is down, the remaining brokers should try to write received messages |
|||
to the filer, and close the segment to the clients. |
|||
|
|||
Then the clients should start a new segment. The masters should other healthy brokers to handle the new segment. |
|||
|
|||
So any brokers can go down without losing data. |
|||
|
|||
## Auto Split or Merge |
|||
|
|||
(The idea is learned from Pravega.) |
|||
|
|||
The brokers should report its traffic load to the broker leader periodically. |
|||
|
|||
If any segment has too much load, the broker leader will ask the brokers to tell the client to |
|||
close current one and create two new segments. |
|||
|
|||
If 2 neighboring segments have the combined load below average load per segment, the broker leader will ask |
|||
the brokers to tell the client to close this 2 segments and create a new segment. |
@ -0,0 +1,48 @@ |
|||
package segment |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/pb/message_fbs" |
|||
flatbuffers "github.com/google/flatbuffers/go" |
|||
) |
|||
|
|||
func CreateMessage(b *flatbuffers.Builder, producerId int32, producerSeq int64, segmentId int32, segmentSeq int64, |
|||
eventTsNs int64, recvTsNs int64, properties map[string]string, key []byte, value []byte) { |
|||
b.Reset() |
|||
|
|||
var names, values, pairs []flatbuffers.UOffsetT |
|||
for k, v := range properties { |
|||
names = append(names, b.CreateString(k)) |
|||
values = append(values, b.CreateString(v)) |
|||
} |
|||
|
|||
for i, _ := range names { |
|||
message_fbs.NameValueStart(b) |
|||
message_fbs.NameValueAddName(b, names[i]) |
|||
message_fbs.NameValueAddValue(b, values[i]) |
|||
pair := message_fbs.NameValueEnd(b) |
|||
pairs = append(pairs, pair) |
|||
} |
|||
message_fbs.MessageStartPropertiesVector(b, len(properties)) |
|||
for i := len(pairs) - 1; i >= 0; i-- { |
|||
b.PrependUOffsetT(pairs[i]) |
|||
} |
|||
prop := b.EndVector(len(properties)) |
|||
|
|||
k := b.CreateByteVector(key) |
|||
v := b.CreateByteVector(value) |
|||
|
|||
message_fbs.MessageStart(b) |
|||
message_fbs.MessageAddProducerId(b, producerId) |
|||
message_fbs.MessageAddProducerSeq(b, producerSeq) |
|||
message_fbs.MessageAddSegmentId(b, segmentId) |
|||
message_fbs.MessageAddSegmentSeq(b, segmentSeq) |
|||
message_fbs.MessageAddEventTsNs(b, eventTsNs) |
|||
message_fbs.MessageAddRecvTsNs(b, recvTsNs) |
|||
|
|||
message_fbs.MessageAddProperties(b, prop) |
|||
message_fbs.MessageAddKey(b, k) |
|||
message_fbs.MessageAddData(b, v) |
|||
message := message_fbs.MessageEnd(b) |
|||
|
|||
b.Finish(message) |
|||
} |
@ -0,0 +1,47 @@ |
|||
package segment |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/pb/message_fbs" |
|||
flatbuffers "github.com/google/flatbuffers/go" |
|||
"github.com/stretchr/testify/assert" |
|||
"testing" |
|||
) |
|||
|
|||
func TestMessageSerde(t *testing.T) { |
|||
b := flatbuffers.NewBuilder(1024) |
|||
|
|||
prop := make(map[string]string) |
|||
prop["n1"] = "v1" |
|||
prop["n2"] = "v2" |
|||
|
|||
CreateMessage(b, 1, 2, 3, 4, 5, 6, prop, |
|||
[]byte("the primary key"), []byte("body is here")) |
|||
|
|||
buf := b.FinishedBytes() |
|||
|
|||
println("serialized size", len(buf)) |
|||
|
|||
m := message_fbs.GetRootAsMessage(buf, 0) |
|||
|
|||
assert.Equal(t, int32(1), m.ProducerId()) |
|||
assert.Equal(t, int64(2), m.ProducerSeq()) |
|||
assert.Equal(t, int32(3), m.SegmentId()) |
|||
assert.Equal(t, int64(4), m.SegmentSeq()) |
|||
assert.Equal(t, int64(5), m.EventTsNs()) |
|||
assert.Equal(t, int64(6), m.RecvTsNs()) |
|||
|
|||
assert.Equal(t, 2, m.PropertiesLength()) |
|||
nv := &message_fbs.NameValue{} |
|||
m.Properties(nv, 0) |
|||
assert.Equal(t, "n1", string(nv.Name())) |
|||
assert.Equal(t, "v1", string(nv.Value())) |
|||
m.Properties(nv, 1) |
|||
assert.Equal(t, "n2", string(nv.Name())) |
|||
assert.Equal(t, "v2", string(nv.Value())) |
|||
assert.Equal(t, []byte("the primary key"), m.Key()) |
|||
assert.Equal(t, []byte("body is here"), m.Data()) |
|||
|
|||
m.MutateSegmentSeq(123) |
|||
assert.Equal(t, int64(123), m.SegmentSeq()) |
|||
|
|||
} |
@ -0,0 +1 @@ |
|||
package segment |
@ -0,0 +1,15 @@ |
|||
table NameValue { |
|||
name:string (key); |
|||
value:string; |
|||
} |
|||
table Message { |
|||
producer_id:int32 (id:0); |
|||
producer_seq:int64 (id:2); |
|||
segment_id:int32 (id:1); |
|||
segment_seq:int64 (id:3); |
|||
event_ts_ns:int64 (id:4); |
|||
recv_ts_ns:int64 (id:5); |
|||
properties:[NameValue] (id:6); |
|||
key:string (id:7); // bytes |
|||
data:string (id:8); // bytes |
|||
} |
@ -0,0 +1,179 @@ |
|||
// Code generated by the FlatBuffers compiler. DO NOT EDIT.
|
|||
|
|||
package message_fbs |
|||
|
|||
import ( |
|||
flatbuffers "github.com/google/flatbuffers/go" |
|||
) |
|||
|
|||
type Message struct { |
|||
_tab flatbuffers.Table |
|||
} |
|||
|
|||
func GetRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message { |
|||
n := flatbuffers.GetUOffsetT(buf[offset:]) |
|||
x := &Message{} |
|||
x.Init(buf, n+offset) |
|||
return x |
|||
} |
|||
|
|||
func GetSizePrefixedRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message { |
|||
n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) |
|||
x := &Message{} |
|||
x.Init(buf, n+offset+flatbuffers.SizeUint32) |
|||
return x |
|||
} |
|||
|
|||
func (rcv *Message) Init(buf []byte, i flatbuffers.UOffsetT) { |
|||
rcv._tab.Bytes = buf |
|||
rcv._tab.Pos = i |
|||
} |
|||
|
|||
func (rcv *Message) Table() flatbuffers.Table { |
|||
return rcv._tab |
|||
} |
|||
|
|||
func (rcv *Message) ProducerId() int32 { |
|||
o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) |
|||
if o != 0 { |
|||
return rcv._tab.GetInt32(o + rcv._tab.Pos) |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func (rcv *Message) MutateProducerId(n int32) bool { |
|||
return rcv._tab.MutateInt32Slot(4, n) |
|||
} |
|||
|
|||
func (rcv *Message) SegmentId() int32 { |
|||
o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) |
|||
if o != 0 { |
|||
return rcv._tab.GetInt32(o + rcv._tab.Pos) |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func (rcv *Message) MutateSegmentId(n int32) bool { |
|||
return rcv._tab.MutateInt32Slot(6, n) |
|||
} |
|||
|
|||
func (rcv *Message) ProducerSeq() int64 { |
|||
o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) |
|||
if o != 0 { |
|||
return rcv._tab.GetInt64(o + rcv._tab.Pos) |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func (rcv *Message) MutateProducerSeq(n int64) bool { |
|||
return rcv._tab.MutateInt64Slot(8, n) |
|||
} |
|||
|
|||
func (rcv *Message) SegmentSeq() int64 { |
|||
o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) |
|||
if o != 0 { |
|||
return rcv._tab.GetInt64(o + rcv._tab.Pos) |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func (rcv *Message) MutateSegmentSeq(n int64) bool { |
|||
return rcv._tab.MutateInt64Slot(10, n) |
|||
} |
|||
|
|||
func (rcv *Message) EventTsNs() int64 { |
|||
o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) |
|||
if o != 0 { |
|||
return rcv._tab.GetInt64(o + rcv._tab.Pos) |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func (rcv *Message) MutateEventTsNs(n int64) bool { |
|||
return rcv._tab.MutateInt64Slot(12, n) |
|||
} |
|||
|
|||
func (rcv *Message) RecvTsNs() int64 { |
|||
o := flatbuffers.UOffsetT(rcv._tab.Offset(14)) |
|||
if o != 0 { |
|||
return rcv._tab.GetInt64(o + rcv._tab.Pos) |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func (rcv *Message) MutateRecvTsNs(n int64) bool { |
|||
return rcv._tab.MutateInt64Slot(14, n) |
|||
} |
|||
|
|||
func (rcv *Message) Properties(obj *NameValue, j int) bool { |
|||
o := flatbuffers.UOffsetT(rcv._tab.Offset(16)) |
|||
if o != 0 { |
|||
x := rcv._tab.Vector(o) |
|||
x += flatbuffers.UOffsetT(j) * 4 |
|||
x = rcv._tab.Indirect(x) |
|||
obj.Init(rcv._tab.Bytes, x) |
|||
return true |
|||
} |
|||
return false |
|||
} |
|||
|
|||
func (rcv *Message) PropertiesLength() int { |
|||
o := flatbuffers.UOffsetT(rcv._tab.Offset(16)) |
|||
if o != 0 { |
|||
return rcv._tab.VectorLen(o) |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func (rcv *Message) Key() []byte { |
|||
o := flatbuffers.UOffsetT(rcv._tab.Offset(18)) |
|||
if o != 0 { |
|||
return rcv._tab.ByteVector(o + rcv._tab.Pos) |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (rcv *Message) Data() []byte { |
|||
o := flatbuffers.UOffsetT(rcv._tab.Offset(20)) |
|||
if o != 0 { |
|||
return rcv._tab.ByteVector(o + rcv._tab.Pos) |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func MessageStart(builder *flatbuffers.Builder) { |
|||
builder.StartObject(9) |
|||
} |
|||
func MessageAddProducerId(builder *flatbuffers.Builder, producerId int32) { |
|||
builder.PrependInt32Slot(0, producerId, 0) |
|||
} |
|||
func MessageAddSegmentId(builder *flatbuffers.Builder, segmentId int32) { |
|||
builder.PrependInt32Slot(1, segmentId, 0) |
|||
} |
|||
func MessageAddProducerSeq(builder *flatbuffers.Builder, producerSeq int64) { |
|||
builder.PrependInt64Slot(2, producerSeq, 0) |
|||
} |
|||
func MessageAddSegmentSeq(builder *flatbuffers.Builder, segmentSeq int64) { |
|||
builder.PrependInt64Slot(3, segmentSeq, 0) |
|||
} |
|||
func MessageAddEventTsNs(builder *flatbuffers.Builder, eventTsNs int64) { |
|||
builder.PrependInt64Slot(4, eventTsNs, 0) |
|||
} |
|||
func MessageAddRecvTsNs(builder *flatbuffers.Builder, recvTsNs int64) { |
|||
builder.PrependInt64Slot(5, recvTsNs, 0) |
|||
} |
|||
func MessageAddProperties(builder *flatbuffers.Builder, properties flatbuffers.UOffsetT) { |
|||
builder.PrependUOffsetTSlot(6, flatbuffers.UOffsetT(properties), 0) |
|||
} |
|||
func MessageStartPropertiesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { |
|||
return builder.StartVector(4, numElems, 4) |
|||
} |
|||
func MessageAddKey(builder *flatbuffers.Builder, key flatbuffers.UOffsetT) { |
|||
builder.PrependUOffsetTSlot(7, flatbuffers.UOffsetT(key), 0) |
|||
} |
|||
func MessageAddData(builder *flatbuffers.Builder, data flatbuffers.UOffsetT) { |
|||
builder.PrependUOffsetTSlot(8, flatbuffers.UOffsetT(data), 0) |
|||
} |
|||
func MessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { |
|||
return builder.EndObject() |
|||
} |
@ -0,0 +1,63 @@ |
|||
// Code generated by the FlatBuffers compiler. DO NOT EDIT.
|
|||
|
|||
package message_fbs |
|||
|
|||
import ( |
|||
flatbuffers "github.com/google/flatbuffers/go" |
|||
) |
|||
|
|||
type NameValue struct { |
|||
_tab flatbuffers.Table |
|||
} |
|||
|
|||
func GetRootAsNameValue(buf []byte, offset flatbuffers.UOffsetT) *NameValue { |
|||
n := flatbuffers.GetUOffsetT(buf[offset:]) |
|||
x := &NameValue{} |
|||
x.Init(buf, n+offset) |
|||
return x |
|||
} |
|||
|
|||
func GetSizePrefixedRootAsNameValue(buf []byte, offset flatbuffers.UOffsetT) *NameValue { |
|||
n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) |
|||
x := &NameValue{} |
|||
x.Init(buf, n+offset+flatbuffers.SizeUint32) |
|||
return x |
|||
} |
|||
|
|||
func (rcv *NameValue) Init(buf []byte, i flatbuffers.UOffsetT) { |
|||
rcv._tab.Bytes = buf |
|||
rcv._tab.Pos = i |
|||
} |
|||
|
|||
func (rcv *NameValue) Table() flatbuffers.Table { |
|||
return rcv._tab |
|||
} |
|||
|
|||
func (rcv *NameValue) Name() []byte { |
|||
o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) |
|||
if o != 0 { |
|||
return rcv._tab.ByteVector(o + rcv._tab.Pos) |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (rcv *NameValue) Value() []byte { |
|||
o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) |
|||
if o != 0 { |
|||
return rcv._tab.ByteVector(o + rcv._tab.Pos) |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func NameValueStart(builder *flatbuffers.Builder) { |
|||
builder.StartObject(2) |
|||
} |
|||
func NameValueAddName(builder *flatbuffers.Builder, name flatbuffers.UOffsetT) { |
|||
builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(name), 0) |
|||
} |
|||
func NameValueAddValue(builder *flatbuffers.Builder, value flatbuffers.UOffsetT) { |
|||
builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(value), 0) |
|||
} |
|||
func NameValueEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { |
|||
return builder.EndObject() |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue