From 06cd491abc82a6b799c47ae600a4bb81d24091ce Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 22 Jul 2022 01:12:32 -0700 Subject: [PATCH] add message batch --- weed/mq/segment/message_serde.go | 113 ++++++++++++---- weed/mq/segment/message_serde_test.go | 41 +++--- weed/pb/message.fbs | 26 ++-- weed/pb/message_fbs/Message.go | 92 +++---------- weed/pb/message_fbs/MessageBatch.go | 187 ++++++++++++++++++++++++++ 5 files changed, 332 insertions(+), 127 deletions(-) create mode 100644 weed/pb/message_fbs/MessageBatch.go diff --git a/weed/mq/segment/message_serde.go b/weed/mq/segment/message_serde.go index b69d78cab..8efa9fe21 100644 --- a/weed/mq/segment/message_serde.go +++ b/weed/mq/segment/message_serde.go @@ -5,44 +5,105 @@ import ( flatbuffers "github.com/google/flatbuffers/go" ) -func CreateMessage(b *flatbuffers.Builder, producerId int32, producerSeq int64, segmentId int32, segmentSeq int64, - eventTsNs int64, recvTsNs int64, properties map[string]string, key []byte, value []byte) { +type MessageBatchBuilder struct { + b *flatbuffers.Builder + producerId int32 + producerEpoch int32 + segmentId int32 + flags int32 + messageOffsets []flatbuffers.UOffsetT + segmentSeqBase int64 + segmentSeqLast int64 + tsMsBase int64 + tsMsLast int64 +} + +func NewMessageBatchBuilder(b *flatbuffers.Builder, + producerId int32, + producerEpoch int32, + segmentId int32, + flags int32) *MessageBatchBuilder { + b.Reset() + return &MessageBatchBuilder{ + b: b, + producerId: producerId, + producerEpoch: producerEpoch, + segmentId: segmentId, + flags: flags, + } +} + +func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, properties map[string][]byte, key []byte, value []byte) { + if builder.segmentSeqBase == 0 { + builder.segmentSeqBase = segmentSeq + } + builder.segmentSeqLast = segmentSeq + if builder.tsMsBase == 0 { + builder.tsMsBase = tsMs + } + builder.tsMsLast = tsMs + var names, values, pairs []flatbuffers.UOffsetT for k, v := range properties { - names = append(names, b.CreateString(k)) - values = append(values, b.CreateString(v)) + names = append(names, builder.b.CreateString(k)) + values = append(values, builder.b.CreateByteVector(v)) } - for i, _ := range names { - message_fbs.NameValueStart(b) - message_fbs.NameValueAddName(b, names[i]) - message_fbs.NameValueAddValue(b, values[i]) - pair := message_fbs.NameValueEnd(b) + message_fbs.NameValueStart(builder.b) + message_fbs.NameValueAddName(builder.b, names[i]) + message_fbs.NameValueAddValue(builder.b, values[i]) + pair := message_fbs.NameValueEnd(builder.b) pairs = append(pairs, pair) } - message_fbs.MessageStartPropertiesVector(b, len(properties)) + + message_fbs.MessageStartPropertiesVector(builder.b, len(properties)) for i := len(pairs) - 1; i >= 0; i-- { - b.PrependUOffsetT(pairs[i]) + builder.b.PrependUOffsetT(pairs[i]) } - prop := b.EndVector(len(properties)) + propOffset := builder.b.EndVector(len(properties)) - k := b.CreateByteVector(key) - v := b.CreateByteVector(value) + keyOffset := builder.b.CreateByteVector(key) + valueOffset := builder.b.CreateByteVector(value) - message_fbs.MessageStart(b) - message_fbs.MessageAddProducerId(b, producerId) - message_fbs.MessageAddProducerSeq(b, producerSeq) - message_fbs.MessageAddSegmentId(b, segmentId) - message_fbs.MessageAddSegmentSeq(b, segmentSeq) - message_fbs.MessageAddEventTsNs(b, eventTsNs) - message_fbs.MessageAddRecvTsNs(b, recvTsNs) + message_fbs.MessageStart(builder.b) + message_fbs.MessageAddSeqDelta(builder.b, int32(segmentSeq-builder.segmentSeqBase)) + message_fbs.MessageAddTsMsDelta(builder.b, int32(tsMs-builder.tsMsBase)) - message_fbs.MessageAddProperties(b, prop) - message_fbs.MessageAddKey(b, k) - message_fbs.MessageAddData(b, v) - message := message_fbs.MessageEnd(b) + message_fbs.MessageAddProperties(builder.b, propOffset) + message_fbs.MessageAddKey(builder.b, keyOffset) + message_fbs.MessageAddData(builder.b, valueOffset) + messageOffset := message_fbs.MessageEnd(builder.b) + + builder.messageOffsets = append(builder.messageOffsets, messageOffset) + +} + +func (builder *MessageBatchBuilder) BuildMessageBatch() { + message_fbs.MessageBatchStartMessagesVector(builder.b, len(builder.messageOffsets)) + for i := len(builder.messageOffsets) - 1; i >= 0; i-- { + builder.b.PrependUOffsetT(builder.messageOffsets[i]) + } + messagesOffset := builder.b.EndVector(len(builder.messageOffsets)) + + message_fbs.MessageBatchStart(builder.b) + message_fbs.MessageBatchAddProducerId(builder.b, builder.producerId) + message_fbs.MessageBatchAddProducerEpoch(builder.b, builder.producerEpoch) + message_fbs.MessageBatchAddSegmentId(builder.b, builder.segmentId) + message_fbs.MessageBatchAddFlags(builder.b, builder.flags) + message_fbs.MessageBatchAddSegmentSeqBase(builder.b, builder.segmentSeqBase) + message_fbs.MessageBatchAddSegmentSeqMaxDelta(builder.b, int32(builder.segmentSeqLast-builder.segmentSeqBase)) + message_fbs.MessageBatchAddTsMsBase(builder.b, builder.tsMsBase) + message_fbs.MessageBatchAddTsMsMaxDelta(builder.b, int32(builder.tsMsLast-builder.tsMsBase)) + + message_fbs.MessageBatchAddMessages(builder.b, messagesOffset) + + messageBatch := message_fbs.MessageBatchEnd(builder.b) + + builder.b.Finish(messageBatch) +} - b.Finish(message) +func (builder *MessageBatchBuilder) GetBytes() []byte { + return builder.b.FinishedBytes() } diff --git a/weed/mq/segment/message_serde_test.go b/weed/mq/segment/message_serde_test.go index 7ba0febf0..bf561d2c0 100644 --- a/weed/mq/segment/message_serde_test.go +++ b/weed/mq/segment/message_serde_test.go @@ -10,27 +10,36 @@ import ( func TestMessageSerde(t *testing.T) { b := flatbuffers.NewBuilder(1024) - prop := make(map[string]string) - prop["n1"] = "v1" - prop["n2"] = "v2" + prop := make(map[string][]byte) + prop["n1"] = []byte("v1") + prop["n2"] = []byte("v2") - CreateMessage(b, 1, 2, 3, 4, 5, 6, prop, - []byte("the primary key"), []byte("body is here")) + bb := NewMessageBatchBuilder(b, 1, 2, 3, 4) - buf := b.FinishedBytes() + bb.AddMessage(5, 6, prop, []byte("the primary key"), []byte("body is here")) + + bb.BuildMessageBatch() + + buf := bb.GetBytes() println("serialized size", len(buf)) - m := message_fbs.GetRootAsMessage(buf, 0) + mb := message_fbs.GetRootAsMessageBatch(buf, 0) + + assert.Equal(t, int32(1), mb.ProducerId()) + assert.Equal(t, int32(2), mb.ProducerEpoch()) + assert.Equal(t, int32(3), mb.SegmentId()) + assert.Equal(t, int32(4), mb.Flags()) + assert.Equal(t, int64(5), mb.SegmentSeqBase()) + assert.Equal(t, int32(0), mb.SegmentSeqMaxDelta()) + assert.Equal(t, int64(6), mb.TsMsBase()) + assert.Equal(t, int32(0), mb.TsMsMaxDelta()) + + assert.Equal(t, 1, mb.MessagesLength()) - assert.Equal(t, int32(1), m.ProducerId()) - assert.Equal(t, int64(2), m.ProducerSeq()) - assert.Equal(t, int32(3), m.SegmentId()) - assert.Equal(t, int64(4), m.SegmentSeq()) - assert.Equal(t, int64(5), m.EventTsNs()) - assert.Equal(t, int64(6), m.RecvTsNs()) + m := &message_fbs.Message{} + mb.Messages(m, 0) - assert.Equal(t, 2, m.PropertiesLength()) nv := &message_fbs.NameValue{} m.Properties(nv, 0) assert.Equal(t, "n1", string(nv.Name())) @@ -41,7 +50,7 @@ func TestMessageSerde(t *testing.T) { assert.Equal(t, []byte("the primary key"), m.Key()) assert.Equal(t, []byte("body is here"), m.Data()) - m.MutateSegmentSeq(123) - assert.Equal(t, int64(123), m.SegmentSeq()) + assert.Equal(t, int32(0), m.SeqDelta()) + assert.Equal(t, int32(0), m.TsMsDelta()) } diff --git a/weed/pb/message.fbs b/weed/pb/message.fbs index 8ee6c5b55..170551df7 100644 --- a/weed/pb/message.fbs +++ b/weed/pb/message.fbs @@ -3,13 +3,21 @@ table NameValue { value:string; } table Message { - producer_id:int32 (id:0); - producer_seq:int64 (id:2); - segment_id:int32 (id:1); - segment_seq:int64 (id:3); - event_ts_ns:int64 (id:4); - recv_ts_ns:int64 (id:5); - properties:[NameValue] (id:6); - key:string (id:7); // bytes - data:string (id:8); // bytes + seq_delta:int32 (id:0); + ts_ms_delta:int32 (id:1); + properties:[NameValue] (id:2); + key:string (id:3); // bytes + data:string (id:4); // bytes +} + +table MessageBatch { + producer_id:int32 (id:0); + producer_epoch:int32 (id:1); + segment_id:int32 (id:2); + flags: int32 (id:3); + segment_seq_base:int64 (id:4); + segment_seq_max_delta:int32 (id:5); + ts_ms_base:int64 (id:6); + ts_ms_max_delta:int32 (id:7); + messages: [Message] (id:8); } diff --git a/weed/pb/message_fbs/Message.go b/weed/pb/message_fbs/Message.go index 058abfeaa..e9ef83616 100644 --- a/weed/pb/message_fbs/Message.go +++ b/weed/pb/message_fbs/Message.go @@ -33,7 +33,7 @@ func (rcv *Message) Table() flatbuffers.Table { return rcv._tab } -func (rcv *Message) ProducerId() int32 { +func (rcv *Message) SeqDelta() int32 { o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) if o != 0 { return rcv._tab.GetInt32(o + rcv._tab.Pos) @@ -41,11 +41,11 @@ func (rcv *Message) ProducerId() int32 { return 0 } -func (rcv *Message) MutateProducerId(n int32) bool { +func (rcv *Message) MutateSeqDelta(n int32) bool { return rcv._tab.MutateInt32Slot(4, n) } -func (rcv *Message) SegmentId() int32 { +func (rcv *Message) TsMsDelta() int32 { o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) if o != 0 { return rcv._tab.GetInt32(o + rcv._tab.Pos) @@ -53,60 +53,12 @@ func (rcv *Message) SegmentId() int32 { return 0 } -func (rcv *Message) MutateSegmentId(n int32) bool { +func (rcv *Message) MutateTsMsDelta(n int32) bool { return rcv._tab.MutateInt32Slot(6, n) } -func (rcv *Message) ProducerSeq() int64 { - o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) - if o != 0 { - return rcv._tab.GetInt64(o + rcv._tab.Pos) - } - return 0 -} - -func (rcv *Message) MutateProducerSeq(n int64) bool { - return rcv._tab.MutateInt64Slot(8, n) -} - -func (rcv *Message) SegmentSeq() int64 { - o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) - if o != 0 { - return rcv._tab.GetInt64(o + rcv._tab.Pos) - } - return 0 -} - -func (rcv *Message) MutateSegmentSeq(n int64) bool { - return rcv._tab.MutateInt64Slot(10, n) -} - -func (rcv *Message) EventTsNs() int64 { - o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) - if o != 0 { - return rcv._tab.GetInt64(o + rcv._tab.Pos) - } - return 0 -} - -func (rcv *Message) MutateEventTsNs(n int64) bool { - return rcv._tab.MutateInt64Slot(12, n) -} - -func (rcv *Message) RecvTsNs() int64 { - o := flatbuffers.UOffsetT(rcv._tab.Offset(14)) - if o != 0 { - return rcv._tab.GetInt64(o + rcv._tab.Pos) - } - return 0 -} - -func (rcv *Message) MutateRecvTsNs(n int64) bool { - return rcv._tab.MutateInt64Slot(14, n) -} - func (rcv *Message) Properties(obj *NameValue, j int) bool { - o := flatbuffers.UOffsetT(rcv._tab.Offset(16)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) if o != 0 { x := rcv._tab.Vector(o) x += flatbuffers.UOffsetT(j) * 4 @@ -118,7 +70,7 @@ func (rcv *Message) Properties(obj *NameValue, j int) bool { } func (rcv *Message) PropertiesLength() int { - o := flatbuffers.UOffsetT(rcv._tab.Offset(16)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) if o != 0 { return rcv._tab.VectorLen(o) } @@ -126,7 +78,7 @@ func (rcv *Message) PropertiesLength() int { } func (rcv *Message) Key() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(18)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) if o != 0 { return rcv._tab.ByteVector(o + rcv._tab.Pos) } @@ -134,7 +86,7 @@ func (rcv *Message) Key() []byte { } func (rcv *Message) Data() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(20)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) if o != 0 { return rcv._tab.ByteVector(o + rcv._tab.Pos) } @@ -142,37 +94,25 @@ func (rcv *Message) Data() []byte { } func MessageStart(builder *flatbuffers.Builder) { - builder.StartObject(9) -} -func MessageAddProducerId(builder *flatbuffers.Builder, producerId int32) { - builder.PrependInt32Slot(0, producerId, 0) -} -func MessageAddSegmentId(builder *flatbuffers.Builder, segmentId int32) { - builder.PrependInt32Slot(1, segmentId, 0) -} -func MessageAddProducerSeq(builder *flatbuffers.Builder, producerSeq int64) { - builder.PrependInt64Slot(2, producerSeq, 0) -} -func MessageAddSegmentSeq(builder *flatbuffers.Builder, segmentSeq int64) { - builder.PrependInt64Slot(3, segmentSeq, 0) + builder.StartObject(5) } -func MessageAddEventTsNs(builder *flatbuffers.Builder, eventTsNs int64) { - builder.PrependInt64Slot(4, eventTsNs, 0) +func MessageAddSeqDelta(builder *flatbuffers.Builder, seqDelta int32) { + builder.PrependInt32Slot(0, seqDelta, 0) } -func MessageAddRecvTsNs(builder *flatbuffers.Builder, recvTsNs int64) { - builder.PrependInt64Slot(5, recvTsNs, 0) +func MessageAddTsMsDelta(builder *flatbuffers.Builder, tsMsDelta int32) { + builder.PrependInt32Slot(1, tsMsDelta, 0) } func MessageAddProperties(builder *flatbuffers.Builder, properties flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(6, flatbuffers.UOffsetT(properties), 0) + builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(properties), 0) } func MessageStartPropertiesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(4, numElems, 4) } func MessageAddKey(builder *flatbuffers.Builder, key flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(7, flatbuffers.UOffsetT(key), 0) + builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(key), 0) } func MessageAddData(builder *flatbuffers.Builder, data flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(8, flatbuffers.UOffsetT(data), 0) + builder.PrependUOffsetTSlot(4, flatbuffers.UOffsetT(data), 0) } func MessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() diff --git a/weed/pb/message_fbs/MessageBatch.go b/weed/pb/message_fbs/MessageBatch.go new file mode 100644 index 000000000..19d6a4816 --- /dev/null +++ b/weed/pb/message_fbs/MessageBatch.go @@ -0,0 +1,187 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package message_fbs + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type MessageBatch struct { + _tab flatbuffers.Table +} + +func GetRootAsMessageBatch(buf []byte, offset flatbuffers.UOffsetT) *MessageBatch { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &MessageBatch{} + x.Init(buf, n+offset) + return x +} + +func GetSizePrefixedRootAsMessageBatch(buf []byte, offset flatbuffers.UOffsetT) *MessageBatch { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &MessageBatch{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func (rcv *MessageBatch) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *MessageBatch) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *MessageBatch) ProducerId() int32 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.GetInt32(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *MessageBatch) MutateProducerId(n int32) bool { + return rcv._tab.MutateInt32Slot(4, n) +} + +func (rcv *MessageBatch) ProducerEpoch() int32 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.GetInt32(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *MessageBatch) MutateProducerEpoch(n int32) bool { + return rcv._tab.MutateInt32Slot(6, n) +} + +func (rcv *MessageBatch) SegmentId() int32 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.GetInt32(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *MessageBatch) MutateSegmentId(n int32) bool { + return rcv._tab.MutateInt32Slot(8, n) +} + +func (rcv *MessageBatch) Flags() int32 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + return rcv._tab.GetInt32(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *MessageBatch) MutateFlags(n int32) bool { + return rcv._tab.MutateInt32Slot(10, n) +} + +func (rcv *MessageBatch) SegmentSeqBase() int64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + if o != 0 { + return rcv._tab.GetInt64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *MessageBatch) MutateSegmentSeqBase(n int64) bool { + return rcv._tab.MutateInt64Slot(12, n) +} + +func (rcv *MessageBatch) SegmentSeqMaxDelta() int32 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(14)) + if o != 0 { + return rcv._tab.GetInt32(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *MessageBatch) MutateSegmentSeqMaxDelta(n int32) bool { + return rcv._tab.MutateInt32Slot(14, n) +} + +func (rcv *MessageBatch) TsMsBase() int64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(16)) + if o != 0 { + return rcv._tab.GetInt64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *MessageBatch) MutateTsMsBase(n int64) bool { + return rcv._tab.MutateInt64Slot(16, n) +} + +func (rcv *MessageBatch) TsMsMaxDelta() int32 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(18)) + if o != 0 { + return rcv._tab.GetInt32(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *MessageBatch) MutateTsMsMaxDelta(n int32) bool { + return rcv._tab.MutateInt32Slot(18, n) +} + +func (rcv *MessageBatch) Messages(obj *Message, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(20)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *MessageBatch) MessagesLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(20)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func MessageBatchStart(builder *flatbuffers.Builder) { + builder.StartObject(9) +} +func MessageBatchAddProducerId(builder *flatbuffers.Builder, producerId int32) { + builder.PrependInt32Slot(0, producerId, 0) +} +func MessageBatchAddProducerEpoch(builder *flatbuffers.Builder, producerEpoch int32) { + builder.PrependInt32Slot(1, producerEpoch, 0) +} +func MessageBatchAddSegmentId(builder *flatbuffers.Builder, segmentId int32) { + builder.PrependInt32Slot(2, segmentId, 0) +} +func MessageBatchAddFlags(builder *flatbuffers.Builder, flags int32) { + builder.PrependInt32Slot(3, flags, 0) +} +func MessageBatchAddSegmentSeqBase(builder *flatbuffers.Builder, segmentSeqBase int64) { + builder.PrependInt64Slot(4, segmentSeqBase, 0) +} +func MessageBatchAddSegmentSeqMaxDelta(builder *flatbuffers.Builder, segmentSeqMaxDelta int32) { + builder.PrependInt32Slot(5, segmentSeqMaxDelta, 0) +} +func MessageBatchAddTsMsBase(builder *flatbuffers.Builder, tsMsBase int64) { + builder.PrependInt64Slot(6, tsMsBase, 0) +} +func MessageBatchAddTsMsMaxDelta(builder *flatbuffers.Builder, tsMsMaxDelta int32) { + builder.PrependInt32Slot(7, tsMsMaxDelta, 0) +} +func MessageBatchAddMessages(builder *flatbuffers.Builder, messages flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(8, flatbuffers.UOffsetT(messages), 0) +} +func MessageBatchStartMessagesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func MessageBatchEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +}