You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

53 lines
1.1 KiB

  1. package messages
  2. import (
  3. flatbuffers "github.com/google/flatbuffers/go"
  4. "github.com/seaweedfs/seaweedfs/weed/mq/segment"
  5. )
  6. type MessageBuffer struct {
  7. fbsBuffer *flatbuffers.Builder
  8. sequenceBase int64
  9. counter int64
  10. bb *segment.MessageBatchBuilder
  11. isSealed bool
  12. }
  13. func NewMessageBuffer() *MessageBuffer {
  14. t := &MessageBuffer{
  15. fbsBuffer: flatbuffers.NewBuilder(4 * 1024 * 1024),
  16. }
  17. t.bb = segment.NewMessageBatchBuilder(t.fbsBuffer)
  18. return t
  19. }
  20. func (mb *MessageBuffer) Reset(sequenceBase int64) {
  21. mb.sequenceBase = sequenceBase
  22. mb.counter = 0
  23. mb.bb.Reset()
  24. }
  25. func (mb *MessageBuffer) AddMessage(message *Message) {
  26. mb.bb.AddMessage(mb.sequenceBase, message.Ts.UnixMilli(), message.Properties, message.Key, message.Content)
  27. mb.sequenceBase++
  28. mb.counter++
  29. }
  30. func (mb *MessageBuffer) Len() int {
  31. return int(mb.counter)
  32. }
  33. func (mb *MessageBuffer) Seal(producerId int32,
  34. producerEpoch int32,
  35. segmentId int32,
  36. flags int32) {
  37. mb.isSealed = true
  38. mb.bb.BuildMessageBatch(producerId, producerEpoch, segmentId, flags)
  39. }
  40. func (mb *MessageBuffer) Bytes() []byte {
  41. if !mb.isSealed {
  42. return nil
  43. }
  44. return mb.bb.GetBytes()
  45. }