diff --git a/go.mod b/go.mod index fc956e42a..64e051b49 100644 --- a/go.mod +++ b/go.mod @@ -184,6 +184,7 @@ require ( github.com/fatih/color v1.13.0 // indirect github.com/fclairamb/go-log v0.3.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/flatbuffers v2.0.6+incompatible // indirect github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect github.com/googleapis/go-type-adapters v1.0.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect diff --git a/go.sum b/go.sum index 22a9a1ac4..93070fccc 100644 --- a/go.sum +++ b/go.sum @@ -415,6 +415,8 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= +github.com/google/flatbuffers v2.0.6+incompatible h1:XHFReMv7nFFusa+CEokzWbzaYocKXI6C7hdU5Kgh9Lw= +github.com/google/flatbuffers v2.0.6+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= diff --git a/weed/mq/README.md b/weed/mq/README.md new file mode 100644 index 000000000..2f1127869 --- /dev/null +++ b/weed/mq/README.md @@ -0,0 +1,66 @@ +# SeaweedMQ Message Queue on SeaweedFS (WIP, not ready) + +## What are the use cases it is designed for? + +Message queues are like water pipes. Messages flow in the pipes to their destinations. + +However, what if a flood comes? Of course, you can increase the number of partitions, add more brokers, restart, +and watch the traffic level closely. + +Sometimes the flood is expected. For example, backfill some old data in batch, and switch to online messages. +You may want to ensure enough brokers to handle the data and reduce them later to cut cost. + +SeaweedMQ is designed for use cases that need to: +* Receive and save large number of messages. +* Handle spike traffic automatically. + +## What is special about SeaweedMQ? + +* Separate computation and storage nodes that scales independently. + * Offline messages can be operated as normal files. + * Unlimited storage space by adding volume servers. + * Unlimited message brokers. +* Scale up and down with auto split and merge message topics. + * Topics can automatically split into segments when traffic increases, and vice verse. + * + +# Design + +# How it works? + +Brokers are just computation nodes without storage. When a broker starts, it reports itself to masters. +Among all the brokers, one of them will be selected as the leader by the masters. + +A topic needs to define its partition key on its messages. + +Messages for a topic are divided into segments. + +During write time, the client will ask the broker leader for a few brokers to process the segment. + +The broker leader will check whether the segment already has assigned the brokers. If not, select a few based +on their loads, save the selection into filer, and tell the client. + +The client will write the messages for this segment to the selected brokers. + +## Failover + +The broker leader does not contain any state. If it fails, the masters will select a different broker. + +For a segment, if any one of the selected brokers is down, the remaining brokers should try to write received messages +to the filer, and close the segment to the clients. + +Then the clients should start a new segment. The masters should other healthy brokers to handle the new segment. + +So any brokers can go down without losing data. + +## Auto Split or Merge + +(The idea is learned from Pravega.) + +The brokers should report its traffic load to the broker leader periodically. + +If any segment has too much load, the broker leader will ask the brokers to tell the client to +close current one and create two new segments. + +If 2 neighboring segments have the combined load below average load per segment, the broker leader will ask +the brokers to tell the client to close this 2 segments and create a new segment. diff --git a/weed/mq/segment/message_serde.go b/weed/mq/segment/message_serde.go new file mode 100644 index 000000000..b69d78cab --- /dev/null +++ b/weed/mq/segment/message_serde.go @@ -0,0 +1,48 @@ +package segment + +import ( + "github.com/chrislusf/seaweedfs/weed/pb/message_fbs" + flatbuffers "github.com/google/flatbuffers/go" +) + +func CreateMessage(b *flatbuffers.Builder, producerId int32, producerSeq int64, segmentId int32, segmentSeq int64, + eventTsNs int64, recvTsNs int64, properties map[string]string, key []byte, value []byte) { + b.Reset() + + var names, values, pairs []flatbuffers.UOffsetT + for k, v := range properties { + names = append(names, b.CreateString(k)) + values = append(values, b.CreateString(v)) + } + + for i, _ := range names { + message_fbs.NameValueStart(b) + message_fbs.NameValueAddName(b, names[i]) + message_fbs.NameValueAddValue(b, values[i]) + pair := message_fbs.NameValueEnd(b) + pairs = append(pairs, pair) + } + message_fbs.MessageStartPropertiesVector(b, len(properties)) + for i := len(pairs) - 1; i >= 0; i-- { + b.PrependUOffsetT(pairs[i]) + } + prop := b.EndVector(len(properties)) + + k := b.CreateByteVector(key) + v := b.CreateByteVector(value) + + message_fbs.MessageStart(b) + message_fbs.MessageAddProducerId(b, producerId) + message_fbs.MessageAddProducerSeq(b, producerSeq) + message_fbs.MessageAddSegmentId(b, segmentId) + message_fbs.MessageAddSegmentSeq(b, segmentSeq) + message_fbs.MessageAddEventTsNs(b, eventTsNs) + message_fbs.MessageAddRecvTsNs(b, recvTsNs) + + message_fbs.MessageAddProperties(b, prop) + message_fbs.MessageAddKey(b, k) + message_fbs.MessageAddData(b, v) + message := message_fbs.MessageEnd(b) + + b.Finish(message) +} diff --git a/weed/mq/segment/message_serde_test.go b/weed/mq/segment/message_serde_test.go new file mode 100644 index 000000000..7ba0febf0 --- /dev/null +++ b/weed/mq/segment/message_serde_test.go @@ -0,0 +1,47 @@ +package segment + +import ( + "github.com/chrislusf/seaweedfs/weed/pb/message_fbs" + flatbuffers "github.com/google/flatbuffers/go" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestMessageSerde(t *testing.T) { + b := flatbuffers.NewBuilder(1024) + + prop := make(map[string]string) + prop["n1"] = "v1" + prop["n2"] = "v2" + + CreateMessage(b, 1, 2, 3, 4, 5, 6, prop, + []byte("the primary key"), []byte("body is here")) + + buf := b.FinishedBytes() + + println("serialized size", len(buf)) + + m := message_fbs.GetRootAsMessage(buf, 0) + + assert.Equal(t, int32(1), m.ProducerId()) + assert.Equal(t, int64(2), m.ProducerSeq()) + assert.Equal(t, int32(3), m.SegmentId()) + assert.Equal(t, int64(4), m.SegmentSeq()) + assert.Equal(t, int64(5), m.EventTsNs()) + assert.Equal(t, int64(6), m.RecvTsNs()) + + assert.Equal(t, 2, m.PropertiesLength()) + nv := &message_fbs.NameValue{} + m.Properties(nv, 0) + assert.Equal(t, "n1", string(nv.Name())) + assert.Equal(t, "v1", string(nv.Value())) + m.Properties(nv, 1) + assert.Equal(t, "n2", string(nv.Name())) + assert.Equal(t, "v2", string(nv.Value())) + assert.Equal(t, []byte("the primary key"), m.Key()) + assert.Equal(t, []byte("body is here"), m.Data()) + + m.MutateSegmentSeq(123) + assert.Equal(t, int64(123), m.SegmentSeq()) + +} diff --git a/weed/mq/segment/segment_serde.go b/weed/mq/segment/segment_serde.go new file mode 100644 index 000000000..e076271d6 --- /dev/null +++ b/weed/mq/segment/segment_serde.go @@ -0,0 +1 @@ +package segment diff --git a/weed/pb/Makefile b/weed/pb/Makefile index 01322ffda..f76ec3814 100644 --- a/weed/pb/Makefile +++ b/weed/pb/Makefile @@ -13,3 +13,6 @@ gen: protoc mq.proto --go_out=./mq_pb --go-grpc_out=./mq_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative # protoc filer.proto --java_out=../../other/java/client/src/main/java cp filer.proto ../../other/java/client/src/main/proto + +fbs: + flatc --go -o . --go-namespace message_fbs message.fbs diff --git a/weed/pb/message.fbs b/weed/pb/message.fbs new file mode 100644 index 000000000..8ee6c5b55 --- /dev/null +++ b/weed/pb/message.fbs @@ -0,0 +1,15 @@ +table NameValue { + name:string (key); + value:string; +} +table Message { + producer_id:int32 (id:0); + producer_seq:int64 (id:2); + segment_id:int32 (id:1); + segment_seq:int64 (id:3); + event_ts_ns:int64 (id:4); + recv_ts_ns:int64 (id:5); + properties:[NameValue] (id:6); + key:string (id:7); // bytes + data:string (id:8); // bytes +} diff --git a/weed/pb/message_fbs/Message.go b/weed/pb/message_fbs/Message.go new file mode 100644 index 000000000..058abfeaa --- /dev/null +++ b/weed/pb/message_fbs/Message.go @@ -0,0 +1,179 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package message_fbs + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type Message struct { + _tab flatbuffers.Table +} + +func GetRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Message{} + x.Init(buf, n+offset) + return x +} + +func GetSizePrefixedRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &Message{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func (rcv *Message) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Message) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Message) ProducerId() int32 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.GetInt32(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *Message) MutateProducerId(n int32) bool { + return rcv._tab.MutateInt32Slot(4, n) +} + +func (rcv *Message) SegmentId() int32 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.GetInt32(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *Message) MutateSegmentId(n int32) bool { + return rcv._tab.MutateInt32Slot(6, n) +} + +func (rcv *Message) ProducerSeq() int64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.GetInt64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *Message) MutateProducerSeq(n int64) bool { + return rcv._tab.MutateInt64Slot(8, n) +} + +func (rcv *Message) SegmentSeq() int64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + return rcv._tab.GetInt64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *Message) MutateSegmentSeq(n int64) bool { + return rcv._tab.MutateInt64Slot(10, n) +} + +func (rcv *Message) EventTsNs() int64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + if o != 0 { + return rcv._tab.GetInt64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *Message) MutateEventTsNs(n int64) bool { + return rcv._tab.MutateInt64Slot(12, n) +} + +func (rcv *Message) RecvTsNs() int64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(14)) + if o != 0 { + return rcv._tab.GetInt64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *Message) MutateRecvTsNs(n int64) bool { + return rcv._tab.MutateInt64Slot(14, n) +} + +func (rcv *Message) Properties(obj *NameValue, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(16)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *Message) PropertiesLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(16)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Message) Key() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(18)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Message) Data() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(20)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func MessageStart(builder *flatbuffers.Builder) { + builder.StartObject(9) +} +func MessageAddProducerId(builder *flatbuffers.Builder, producerId int32) { + builder.PrependInt32Slot(0, producerId, 0) +} +func MessageAddSegmentId(builder *flatbuffers.Builder, segmentId int32) { + builder.PrependInt32Slot(1, segmentId, 0) +} +func MessageAddProducerSeq(builder *flatbuffers.Builder, producerSeq int64) { + builder.PrependInt64Slot(2, producerSeq, 0) +} +func MessageAddSegmentSeq(builder *flatbuffers.Builder, segmentSeq int64) { + builder.PrependInt64Slot(3, segmentSeq, 0) +} +func MessageAddEventTsNs(builder *flatbuffers.Builder, eventTsNs int64) { + builder.PrependInt64Slot(4, eventTsNs, 0) +} +func MessageAddRecvTsNs(builder *flatbuffers.Builder, recvTsNs int64) { + builder.PrependInt64Slot(5, recvTsNs, 0) +} +func MessageAddProperties(builder *flatbuffers.Builder, properties flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(6, flatbuffers.UOffsetT(properties), 0) +} +func MessageStartPropertiesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func MessageAddKey(builder *flatbuffers.Builder, key flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(7, flatbuffers.UOffsetT(key), 0) +} +func MessageAddData(builder *flatbuffers.Builder, data flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(8, flatbuffers.UOffsetT(data), 0) +} +func MessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/weed/pb/message_fbs/NameValue.go b/weed/pb/message_fbs/NameValue.go new file mode 100644 index 000000000..b5dfdad16 --- /dev/null +++ b/weed/pb/message_fbs/NameValue.go @@ -0,0 +1,63 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package message_fbs + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type NameValue struct { + _tab flatbuffers.Table +} + +func GetRootAsNameValue(buf []byte, offset flatbuffers.UOffsetT) *NameValue { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &NameValue{} + x.Init(buf, n+offset) + return x +} + +func GetSizePrefixedRootAsNameValue(buf []byte, offset flatbuffers.UOffsetT) *NameValue { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &NameValue{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func (rcv *NameValue) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *NameValue) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *NameValue) Name() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *NameValue) Value() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func NameValueStart(builder *flatbuffers.Builder) { + builder.StartObject(2) +} +func NameValueAddName(builder *flatbuffers.Builder, name flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(name), 0) +} +func NameValueAddValue(builder *flatbuffers.Builder, value flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(value), 0) +} +func NameValueEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +}