You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

168 lines
3.5 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package queue
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/golang/protobuf/proto"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. )
  // LogBuffer collects serialized log entries in a single in-memory byte
  // buffer and hands the accumulated bytes to a flush callback.
  type LogBuffer struct {
  	// The embedded lock is taken for writing by AddToBuffer, Shutdown and
  	// loopFlush, and for reading by ReadFromBuffer.
  	sync.RWMutex

  	// buf stores entries back to back; each one is a 4-byte size followed
  	// by the marshaled filer_pb.LogEntry.
  	buf []byte
  	// idx holds the offset in buf at which each buffered entry starts.
  	idx []int
  	// pos is the offset in buf where the next entry will be written.
  	pos int
  	// sizeBuf is scratch space used to encode an entry's 4-byte size.
  	sizeBuf []byte

  	// startTime is the timestamp of the first entry of the current batch.
  	startTime time.Time
  	// stopTime is the timestamp of the most recently added entry.
  	stopTime time.Time

  	// flushInterval is both the sleep between background flushes and the
  	// maximum time span of a batch before AddToBuffer flushes it.
  	flushInterval time.Duration
  	// flushFn, when non-nil, receives startTime, stopTime and the buffered
  	// bytes on every flush.
  	flushFn func(startTime, stopTime time.Time, buf []byte)
  	// notifyFn, when non-nil, is called after every AddToBuffer.
  	notifyFn func()

  	// isStopping is set by Shutdown and polled by loopFlush.
  	isStopping bool
  }
  23. func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte), notifyFn func()) *LogBuffer {
  24. lb := &LogBuffer{
  25. buf: make([]byte, 4*1024*1024),
  26. sizeBuf: make([]byte, 4),
  27. flushInterval: flushInterval,
  28. flushFn: flushFn,
  29. notifyFn: notifyFn,
  30. }
  31. go lb.loopFlush()
  32. return lb
  33. }
  34. func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) {
  35. logEntry := &filer_pb.LogEntry{
  36. TsNs: ts.UnixNano(),
  37. PartitionKeyHash: util.HashToInt32(key),
  38. Data: data,
  39. }
  40. logEntryData, _ := proto.Marshal(logEntry)
  41. size := len(logEntryData)
  42. m.Lock()
  43. defer func() {
  44. m.Unlock()
  45. if m.notifyFn != nil {
  46. m.notifyFn()
  47. }
  48. }()
  49. if m.pos == 0 {
  50. m.startTime = ts
  51. }
  52. if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 {
  53. m.flush()
  54. m.startTime = ts
  55. if len(m.buf) < size+4 {
  56. m.buf = make([]byte, 2*size+4)
  57. }
  58. }
  59. m.stopTime = ts
  60. m.idx = append(m.idx, m.pos)
  61. util.Uint32toBytes(m.sizeBuf, uint32(size))
  62. copy(m.buf[m.pos:m.pos+4], m.sizeBuf)
  63. copy(m.buf[m.pos+4:m.pos+4+size], logEntryData)
  64. m.pos += size + 4
  65. }
  66. func (m *LogBuffer) Shutdown() {
  67. if m.isStopping {
  68. return
  69. }
  70. m.isStopping = true
  71. m.Lock()
  72. m.flush()
  73. m.Unlock()
  74. }
  75. func (m *LogBuffer) loopFlush() {
  76. for !m.isStopping {
  77. m.Lock()
  78. m.flush()
  79. m.Unlock()
  80. time.Sleep(m.flushInterval)
  81. }
  82. }
  83. func (m *LogBuffer) flush() {
  84. if m.flushFn != nil && m.pos > 0 {
  85. // fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos)
  86. m.flushFn(m.startTime, m.stopTime, m.buf[:m.pos])
  87. m.pos = 0
  88. m.idx = m.idx[:0]
  89. }
  90. }
  91. func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, bufferCopy []byte) {
  92. m.RLock()
  93. defer m.RUnlock()
  94. // fmt.Printf("read from buffer: %v\n", lastReadTime)
  95. if lastReadTime.Equal(m.stopTime) {
  96. return lastReadTime, nil
  97. }
  98. if lastReadTime.After(m.stopTime) {
  99. // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime)
  100. return lastReadTime, nil
  101. }
  102. if lastReadTime.Before(m.startTime) {
  103. return m.stopTime, copiedBytes(m.buf[:m.pos])
  104. }
  105. lastTs := lastReadTime.UnixNano()
  106. l, h := 0, len(m.idx)-1
  107. // fmt.Printf("l=%d, h=%d\n", l, h)
  108. for {
  109. mid := (l + h) / 2
  110. pos := m.idx[mid]
  111. t := readTs(m.buf, m.idx[mid])
  112. if t <= lastTs {
  113. l = mid + 1
  114. } else if lastTs < t {
  115. var prevT int64
  116. if mid > 0 {
  117. prevT = readTs(m.buf, m.idx[mid-1])
  118. }
  119. if prevT <= lastTs {
  120. return time.Unix(0, t), copiedBytes(m.buf[pos:m.pos])
  121. }
  122. h = mid - 1
  123. }
  124. // fmt.Printf("l=%d, h=%d\n", l, h)
  125. }
  126. }
  // copiedBytes returns a newly allocated slice with the same length and
  // contents as buf, so the result does not share storage with buf. The result
  // is non-nil even when buf is empty.
  func copiedBytes(buf []byte) (copied []byte) {
  	copied = append(make([]byte, 0, len(buf)), buf...)
  	return copied
  }
  132. func readTs(buf []byte, pos int) int64 {
  133. size := util.BytesToUint32(buf[pos : pos+4])
  134. entryData := buf[pos+4 : pos+4+int(size)]
  135. logEntry := &filer_pb.LogEntry{}
  136. err := proto.Unmarshal(entryData, logEntry)
  137. if err != nil {
  138. glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err)
  139. }
  140. return logEntry.TsNs
  141. }