You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

92 lines
1.7 KiB

5 years ago
  1. package queue
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/golang/protobuf/proto"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "github.com/chrislusf/seaweedfs/weed/util"
  8. )
  9. type LogBuffer struct {
  10. buf []byte
  11. pos int
  12. startTime time.Time
  13. stopTime time.Time
  14. sizeBuf []byte
  15. flushInterval time.Duration
  16. flushFn func(startTime, stopTime time.Time, buf []byte)
  17. isStopping bool
  18. sync.Mutex
  19. }
  20. func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte)) *LogBuffer {
  21. lb := &LogBuffer{
  22. buf: make([]byte, 4*0124*1024),
  23. sizeBuf: make([]byte, 4),
  24. flushInterval: flushInterval,
  25. flushFn: flushFn,
  26. }
  27. go lb.loopFlush()
  28. return lb
  29. }
  30. func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) {
  31. logEntry := &filer_pb.LogEntry{
  32. TsNs: ts.UnixNano(),
  33. PartitionKeyHash: util.HashToInt32(key),
  34. Data: data,
  35. }
  36. logEntryData, _ := proto.Marshal(logEntry)
  37. size := len(logEntryData)
  38. m.Lock()
  39. defer m.Unlock()
  40. if m.pos == 0 {
  41. m.startTime = ts
  42. }
  43. if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 {
  44. m.flush()
  45. m.startTime = ts
  46. }
  47. m.stopTime = ts
  48. util.Uint32toBytes(m.sizeBuf, uint32(size))
  49. copy(m.buf[m.pos:m.pos+4], m.sizeBuf)
  50. copy(m.buf[m.pos+4:m.pos+4+size], logEntryData)
  51. m.pos += size + 4
  52. }
  53. func (m *LogBuffer) Shutdown() {
  54. if m.isStopping {
  55. return
  56. }
  57. m.isStopping = true
  58. m.Lock()
  59. m.flush()
  60. m.Unlock()
  61. }
  62. func (m *LogBuffer) loopFlush() {
  63. for !m.isStopping {
  64. m.Lock()
  65. m.flush()
  66. m.Unlock()
  67. time.Sleep(m.flushInterval)
  68. }
  69. }
  70. func (m *LogBuffer) flush() {
  71. if m.flushFn != nil && m.pos > 0 {
  72. m.flushFn(m.startTime, m.stopTime, m.buf[:m.pos])
  73. m.pos = 0
  74. }
  75. }