package filer

import (
    "container/heap"
    "context"
    "fmt"
    "io"
    "math"
    "strings"
    "time"

    "google.golang.org/protobuf/proto"

    "github.com/seaweedfs/seaweedfs/weed/glog"
    "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
    "github.com/seaweedfs/seaweedfs/weed/util"
    "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
    "github.com/seaweedfs/seaweedfs/weed/wdclient"
)
// LogFileEntry pairs a flushed log file with the timestamp parsed from its name.
type LogFileEntry struct {
    TsNs      int64
    FileEntry *Entry
}

// collectPersistedLogBuffer lists the per-day log directories starting from the
// start position's date and builds an OrderedLogVisitor over them.
func (f *Filer) collectPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64) (v *OrderedLogVisitor, err error) {
    if stopTsNs != 0 && startPosition.Time.UnixNano() > stopTsNs {
        return nil, io.EOF
    }

    startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day())

    dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, math.MaxInt32, "", "", "")
    if listDayErr != nil {
        return nil, fmt.Errorf("fail to list log by day: %v", listDayErr)
    }

    return NewOrderedLogVisitor(f, startPosition, stopTsNs, dayEntries)
}
// ----------

type LogEntryItem struct {
    Entry *filer_pb.LogEntry
    filer string
}

// LogEntryItemPriorityQueue is a priority queue of log entries, ordered by timestamp.
type LogEntryItemPriorityQueue []*LogEntryItem

func (pq LogEntryItemPriorityQueue) Len() int { return len(pq) }
func (pq LogEntryItemPriorityQueue) Less(i, j int) bool {
    return pq[i].Entry.TsNs < pq[j].Entry.TsNs
}
func (pq LogEntryItemPriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }
func (pq *LogEntryItemPriorityQueue) Push(x any) {
    item := x.(*LogEntryItem)
    *pq = append(*pq, item)
}
func (pq *LogEntryItemPriorityQueue) Pop() any {
    n := len(*pq)
    item := (*pq)[n-1]
    *pq = (*pq)[:n-1]
    return item
}
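// For context: container/heap drives the five methods above. Callers go through
// heap.Push / heap.Pop (never pq.Push / pq.Pop directly) so the ordering
// invariant on TsNs is maintained. A minimal illustrative sketch, not part of
// the original file:
//
//    pq := &LogEntryItemPriorityQueue{}
//    heap.Init(pq)
//    heap.Push(pq, &LogEntryItem{Entry: &filer_pb.LogEntry{TsNs: 2}, filer: "b"})
//    heap.Push(pq, &LogEntryItem{Entry: &filer_pb.LogEntry{TsNs: 1}, filer: "a"})
//    earliest := heap.Pop(pq).(*LogEntryItem) // earliest.Entry.TsNs == 1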
// ----------

// OrderedLogVisitor merges the per-filer log file streams into a single stream
// ordered by timestamp, using a priority queue keyed on each entry's TsNs.
type OrderedLogVisitor struct {
    perFilerIteratorMap   map[string]*LogFileQueueIterator
    pq                    *LogEntryItemPriorityQueue
    logFileEntryCollector *LogFileEntryCollector
}

func NewOrderedLogVisitor(f *Filer, startPosition log_buffer.MessagePosition, stopTsNs int64, dayEntries []*Entry) (*OrderedLogVisitor, error) {
    perFilerQueueMap := make(map[string]*LogFileQueueIterator)

    // initialize the priority queue
    pq := &LogEntryItemPriorityQueue{}
    heap.Init(pq)

    t := &OrderedLogVisitor{
        perFilerIteratorMap:   perFilerQueueMap,
        pq:                    pq,
        logFileEntryCollector: NewLogFileEntryCollector(f, startPosition, stopTsNs, dayEntries),
    }
    if err := t.logFileEntryCollector.collectMore(t); err != nil && err != io.EOF {
        return nil, err
    }
    return t, nil
}
// GetNext pops the earliest log entry and refills the queue from the same filer.
// It returns io.EOF when all log entries have been consumed.
func (o *OrderedLogVisitor) GetNext() (logEntry *filer_pb.LogEntry, err error) {
    if o.pq.Len() == 0 {
        return nil, io.EOF
    }
    item := heap.Pop(o.pq).(*LogEntryItem)
    filerId := item.filer

    // fill the pq with the next log entry from the same filer
    it := o.perFilerIteratorMap[filerId]
    next, nextErr := it.getNext(o)
    if nextErr != nil {
        if nextErr == io.EOF {
            // do nothing since the filer has no more log entries
        } else {
            return nil, fmt.Errorf("failed to get next log entry: %v", nextErr)
        }
    } else {
        heap.Push(o.pq, &LogEntryItem{
            Entry: next,
            filer: filerId,
        })
    }
    return item.Entry, nil
}
// getFilerId extracts the filer identifier from a flushed log file name,
// i.e. the suffix after the last "." (an empty string if there is none).
func getFilerId(name string) string {
    idx := strings.LastIndex(name, ".")
    if idx < 0 {
        return ""
    }
    return name[idx+1:]
}
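// For illustration: the flushed log files under SystemLogDir are assumed to be
// named "<HH-MM>.<filerId>", so the per-filer iterators are keyed by the suffix
// after the last dot. A hypothetical sketch, not part of the original file:
//
//    getFilerId("10-30.filer2") // "filer2"
//    getFilerId("10-30")        // "" (no filer suffix)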
// ----------

// LogFileEntryCollector walks the per-day log directories and feeds the
// hour-minute log files into the per-filer iterators of an OrderedLogVisitor.
type LogFileEntryCollector struct {
    f               *Filer
    startTsNs       int64
    stopTsNs        int64
    dayEntryQueue   *util.Queue[*Entry]
    startDate       string
    startHourMinute string
    stopDate        string
    stopHourMinute  string
}

func NewLogFileEntryCollector(f *Filer, startPosition log_buffer.MessagePosition, stopTsNs int64, dayEntries []*Entry) *LogFileEntryCollector {
    dayEntryQueue := util.NewQueue[*Entry]()
    for _, dayEntry := range dayEntries {
        dayEntryQueue.Enqueue(dayEntry)
        println("enqueue day entry", dayEntry.Name())
    }

    startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day())
    startHourMinute := fmt.Sprintf("%02d-%02d", startPosition.Hour(), startPosition.Minute())
    var stopDate, stopHourMinute string
    if stopTsNs != 0 {
        stopTime := time.Unix(0, stopTsNs+24*60*60*int64(time.Nanosecond)).UTC()
        stopDate = fmt.Sprintf("%04d-%02d-%02d", stopTime.Year(), stopTime.Month(), stopTime.Day())
        stopHourMinute = fmt.Sprintf("%02d-%02d", stopTime.Hour(), stopTime.Minute())
    }

    return &LogFileEntryCollector{
        f:               f,
        startTsNs:       startPosition.UnixNano(),
        stopTsNs:        stopTsNs,
        dayEntryQueue:   dayEntryQueue,
        startDate:       startDate,
        startHourMinute: startHourMinute,
        stopDate:        stopDate,
        stopHourMinute:  stopHourMinute,
    }
}
func (c *LogFileEntryCollector) hasMore() bool {
    return c.dayEntryQueue.Len() > 0
}
// collectMore dequeues the next day directory and enqueues its hour-minute log
// files onto the per-filer iterators. For any filer seen for the first time, it
// also primes the priority queue with that filer's first log entry.
func (c *LogFileEntryCollector) collectMore(v *OrderedLogVisitor) (err error) {
    dayEntry := c.dayEntryQueue.Dequeue()
    if dayEntry == nil {
        return io.EOF
    }
    println("dequeue day entry", dayEntry.Name())
    if c.stopDate != "" {
        if strings.Compare(dayEntry.Name(), c.stopDate) > 0 {
            return io.EOF
        }
    }

    hourMinuteEntries, _, listHourMinuteErr := c.f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, math.MaxInt32, "", "", "")
    if listHourMinuteErr != nil {
        return fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
    }

    freshFilerIds := make(map[string]string)
    for _, hourMinuteEntry := range hourMinuteEntries {
        // println("checking hh-mm", hourMinuteEntry.FullPath)
        hourMinute := util.FileNameBase(hourMinuteEntry.Name())
        if dayEntry.Name() == c.startDate {
            if strings.Compare(hourMinute, c.startHourMinute) < 0 {
                continue
            }
        }
        if dayEntry.Name() == c.stopDate {
            if strings.Compare(hourMinute, c.stopHourMinute) > 0 {
                break
            }
        }

        tsMinute := fmt.Sprintf("%s-%s", dayEntry.Name(), hourMinute)
        println(" enqueue", tsMinute)
        t, parseErr := time.Parse("2006-01-02-15-04", tsMinute)
        if parseErr != nil {
            glog.Errorf("failed to parse %s: %v", tsMinute, parseErr)
            continue
        }
        filerId := getFilerId(hourMinuteEntry.Name())
        iter, found := v.perFilerIteratorMap[filerId]
        if !found {
            iter = newLogFileQueueIterator(c.f.MasterClient, util.NewQueue[*LogFileEntry](), c.startTsNs, c.stopTsNs)
            v.perFilerIteratorMap[filerId] = iter
            freshFilerIds[filerId] = hourMinuteEntry.Name()
        }
        iter.q.Enqueue(&LogFileEntry{
            TsNs:      t.UnixNano(),
            FileEntry: hourMinuteEntry,
        })
    }

    // fill the pq with the next log entry if it is a new filer
    for filerId, entryName := range freshFilerIds {
        iter, found := v.perFilerIteratorMap[filerId]
        if !found {
            glog.Errorf("Unexpected! failed to find iterator for filer %s", filerId)
            continue
        }
        next, nextErr := iter.getNext(v)
        if nextErr != nil {
            if nextErr == io.EOF {
                // do nothing since the filer has no more log entries
                continue
            }
            return fmt.Errorf("failed to get next log entry for %v: %v", entryName, nextErr)
        }
        heap.Push(v.pq, &LogEntryItem{
            Entry: next,
            filer: filerId,
        })
    }

    return nil
}
// ----------

// LogFileQueueIterator iterates over the queued log files of a single filer,
// streaming entries from one file at a time.
type LogFileQueueIterator struct {
    q            *util.Queue[*LogFileEntry]
    masterClient *wdclient.MasterClient
    startTsNs    int64
    stopTsNs     int64

    currentFileIterator *LogFileIterator
}

func newLogFileQueueIterator(masterClient *wdclient.MasterClient, q *util.Queue[*LogFileEntry], startTsNs, stopTsNs int64) *LogFileQueueIterator {
    return &LogFileQueueIterator{
        q:            q,
        masterClient: masterClient,
        startTsNs:    startTsNs,
        stopTsNs:     stopTsNs,
    }
}
// getNext will return io.EOF when done
func (iter *LogFileQueueIterator) getNext(v *OrderedLogVisitor) (logEntry *filer_pb.LogEntry, err error) {
    for {
        if iter.currentFileIterator != nil {
            logEntry, err = iter.currentFileIterator.getNext()
            if err != io.EOF {
                return
            }
        }
        // now either iter.currentFileIterator is nil or err is io.EOF
        if iter.q.Len() == 0 {
            return nil, io.EOF
        }
        t := iter.q.Dequeue()
        if t == nil {
            continue
        }
        // skip the file if it is after the stopTsNs
        if iter.stopTsNs != 0 && t.TsNs > iter.stopTsNs {
            return nil, io.EOF
        }
        next := iter.q.Peek()
        if next == nil {
            // the queue is running low; ask the collector to load the next day
            if collectErr := v.logFileEntryCollector.collectMore(v); collectErr != nil && collectErr != io.EOF {
                return nil, collectErr
            }
        }
        // skip the current file if the next file already starts before the
        // startTsNs, since all of its entries are then older than startTsNs
        if next != nil && next.TsNs <= iter.startTsNs {
            continue
        }
        iter.currentFileIterator = newLogFileIterator(iter.masterClient, t.FileEntry, iter.startTsNs, iter.stopTsNs)
    }
}
// ----------

// LogFileIterator decodes length-prefixed filer_pb.LogEntry records from a
// single log file, filtering by the start and stop timestamps.
type LogFileIterator struct {
    r         io.Reader
    sizeBuf   []byte
    startTsNs int64
    stopTsNs  int64
}

func newLogFileIterator(masterClient *wdclient.MasterClient, fileEntry *Entry, startTsNs, stopTsNs int64) *LogFileIterator {
    return &LogFileIterator{
        r:         NewChunkStreamReaderFromFiler(masterClient, fileEntry.Chunks),
        sizeBuf:   make([]byte, 4),
        startTsNs: startTsNs,
        stopTsNs:  stopTsNs,
    }
}
// getNext will return io.EOF when done
func (iter *LogFileIterator) getNext() (logEntry *filer_pb.LogEntry, err error) {
    var n int
    for {
        // each record is a 4-byte length prefix followed by a marshaled LogEntry
        n, err = iter.r.Read(iter.sizeBuf)
        if err != nil {
            return
        }
        if n != 4 {
            return nil, fmt.Errorf("size %d bytes, expected 4 bytes", n)
        }
        size := util.BytesToUint32(iter.sizeBuf)
        // println("entry size", size)
        entryData := make([]byte, size)
        n, err = iter.r.Read(entryData)
        if err != nil {
            return
        }
        if n != int(size) {
            return nil, fmt.Errorf("entry data %d bytes, expected %d bytes", n, size)
        }
        logEntry = &filer_pb.LogEntry{}
        if err = proto.Unmarshal(entryData, logEntry); err != nil {
            return
        }
        if logEntry.TsNs <= iter.startTsNs {
            continue
        }
        if iter.stopTsNs != 0 && logEntry.TsNs > iter.stopTsNs {
            return nil, io.EOF
        }
        return
    }
}
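
// Illustrative sketch (not part of the original file): how a caller inside this
// package might drain the persisted log entries in timestamp order. The helper
// name replayPersistedLogs and the callback signature are hypothetical; both
// collectPersistedLogBuffer and GetNext signal completion with io.EOF.
func replayPersistedLogs(f *Filer, startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn func(*filer_pb.LogEntry) error) error {
    visitor, err := f.collectPersistedLogBuffer(startPosition, stopTsNs)
    if err != nil {
        if err == io.EOF {
            return nil // nothing persisted in the requested time range
        }
        return fmt.Errorf("collect persisted log buffer: %v", err)
    }
    for {
        logEntry, nextErr := visitor.GetNext()
        if nextErr != nil {
            if nextErr == io.EOF {
                return nil // every per-filer queue has been drained
            }
            return fmt.Errorf("get next persisted log entry: %v", nextErr)
        }
        if handleErr := eachLogEntryFn(logEntry); handleErr != nil {
            return handleErr
        }
    }
}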