You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

166 lines
3.4 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package queue
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/golang/protobuf/proto"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. )
  10. type LogBuffer struct {
  11. buf []byte
  12. idx []int
  13. pos int
  14. startTime time.Time
  15. stopTime time.Time
  16. sizeBuf []byte
  17. flushInterval time.Duration
  18. flushFn func(startTime, stopTime time.Time, buf []byte)
  19. notifyFn func()
  20. isStopping bool
  21. sync.RWMutex
  22. }
  23. func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte), notifyFn func()) *LogBuffer {
  24. lb := &LogBuffer{
  25. buf: make([]byte, 4*0124*1024),
  26. sizeBuf: make([]byte, 4),
  27. flushInterval: flushInterval,
  28. flushFn: flushFn,
  29. notifyFn: notifyFn,
  30. }
  31. go lb.loopFlush()
  32. return lb
  33. }
  34. func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) {
  35. logEntry := &filer_pb.LogEntry{
  36. TsNs: ts.UnixNano(),
  37. PartitionKeyHash: util.HashToInt32(key),
  38. Data: data,
  39. }
  40. logEntryData, _ := proto.Marshal(logEntry)
  41. size := len(logEntryData)
  42. m.Lock()
  43. defer func() {
  44. m.Unlock()
  45. if m.notifyFn != nil {
  46. m.notifyFn()
  47. }
  48. }()
  49. if m.pos == 0 {
  50. m.startTime = ts
  51. }
  52. if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 {
  53. m.flush()
  54. m.startTime = ts
  55. if len(m.buf) < size+4 {
  56. m.buf = make([]byte, 2*size+4)
  57. }
  58. }
  59. m.stopTime = ts
  60. m.idx = append(m.idx, m.pos)
  61. util.Uint32toBytes(m.sizeBuf, uint32(size))
  62. copy(m.buf[m.pos:m.pos+4], m.sizeBuf)
  63. copy(m.buf[m.pos+4:m.pos+4+size], logEntryData)
  64. m.pos += size + 4
  65. }
  66. func (m *LogBuffer) Shutdown() {
  67. if m.isStopping {
  68. return
  69. }
  70. m.isStopping = true
  71. m.Lock()
  72. m.flush()
  73. m.Unlock()
  74. }
  75. func (m *LogBuffer) loopFlush() {
  76. for !m.isStopping {
  77. m.Lock()
  78. m.flush()
  79. m.Unlock()
  80. time.Sleep(m.flushInterval)
  81. }
  82. }
  83. func (m *LogBuffer) flush() {
  84. if m.flushFn != nil && m.pos > 0 {
  85. m.flushFn(m.startTime, m.stopTime, m.buf[:m.pos])
  86. m.pos = 0
  87. m.idx = m.idx[:0]
  88. }
  89. }
  90. func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, bufferCopy []byte) {
  91. m.RLock()
  92. defer m.RUnlock()
  93. // fmt.Printf("read from buffer: %v\n", lastReadTime)
  94. if lastReadTime.Equal(m.stopTime) {
  95. return lastReadTime, nil
  96. }
  97. if lastReadTime.After(m.stopTime) {
  98. // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime)
  99. return lastReadTime, nil
  100. }
  101. if lastReadTime.Before(m.startTime) {
  102. return m.stopTime, copiedBytes(m.buf[:m.pos])
  103. }
  104. lastTs := lastReadTime.UnixNano()
  105. l, h := 0, len(m.idx)-1
  106. // fmt.Printf("l=%d, h=%d\n", l, h)
  107. for {
  108. mid := (l + h) / 2
  109. pos := m.idx[mid]
  110. t := readTs(m.buf, m.idx[mid])
  111. if t <= lastTs {
  112. l = mid + 1
  113. } else if lastTs < t {
  114. var prevT int64
  115. if mid > 0 {
  116. prevT = readTs(m.buf, m.idx[mid-1])
  117. }
  118. if prevT <= lastTs {
  119. return time.Unix(0, t), copiedBytes(m.buf[pos:m.pos])
  120. }
  121. h = mid - 1
  122. }
  123. // fmt.Printf("l=%d, h=%d\n", l, h)
  124. }
  125. }
  126. func copiedBytes(buf []byte) (copied []byte) {
  127. copied = make([]byte, len(buf))
  128. copy(copied, buf)
  129. return
  130. }
  131. func readTs(buf []byte, pos int) int64 {
  132. size := util.BytesToUint32(buf[pos : pos+4])
  133. entryData := buf[pos+4 : pos+4+int(size)]
  134. logEntry := &filer_pb.LogEntry{}
  135. err := proto.Unmarshal(entryData, logEntry)
  136. if err != nil {
  137. glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err)
  138. }
  139. return logEntry.TsNs
  140. }