From 53626734d4ac7724604ca434aaab7c892b22510d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 5 Apr 2020 16:51:30 -0700 Subject: [PATCH] iterate through the log buffer --- weed/filer2/filer_notify.go | 5 ++++ weed/queue/log_buffer.go | 49 +++++++++++++++++++++++++++++++------ 2 files changed, 47 insertions(+), 7 deletions(-) diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go index 29ba3eead..f85ee1db4 100644 --- a/weed/filer2/filer_notify.go +++ b/weed/filer2/filer_notify.go @@ -81,6 +81,7 @@ func (f *Filer) ReadLogBuffer(lastReadTime time.Time, eachEventFn func(fullpath var buf []byte newLastReadTime, buf = f.metaLogBuffer.ReadFromBuffer(lastReadTime) + var processedTs int64 for pos := 0; pos+4 < len(buf); { @@ -103,7 +104,10 @@ func (f *Filer) ReadLogBuffer(lastReadTime time.Time, eachEventFn func(fullpath err = eachEventFn(event.Directory, event.EventNotification) + processedTs = logEntry.TsNs + if err != nil { + newLastReadTime = time.Unix(0, processedTs) return } @@ -111,6 +115,7 @@ func (f *Filer) ReadLogBuffer(lastReadTime time.Time, eachEventFn func(fullpath } + newLastReadTime = time.Unix(0, processedTs) return } diff --git a/weed/queue/log_buffer.go b/weed/queue/log_buffer.go index 325fcbf48..0a8298be4 100644 --- a/weed/queue/log_buffer.go +++ b/weed/queue/log_buffer.go @@ -1,6 +1,8 @@ package queue import ( + "fmt" + "runtime/debug" "sync" "time" @@ -11,6 +13,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) +type dataToFlush struct { + startTime time.Time + stopTime time.Time + data []byte +} + type LogBuffer struct { buf []byte idx []int @@ -22,6 +30,7 @@ type LogBuffer struct { flushFn func(startTime, stopTime time.Time, buf []byte) notifyFn func() isStopping bool + flushChan chan *dataToFlush sync.RWMutex } @@ -32,8 +41,10 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime flushInterval: flushInterval, flushFn: flushFn, notifyFn: notifyFn, + flushChan: make(chan *dataToFlush, 256), } go lb.loopFlush() + go lb.loopInterval() return lb } @@ -62,7 +73,7 @@ func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) { } if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 { - m.flush() + m.flushChan <- m.copyToFlush() m.startTime = ts if len(m.buf) < size+4 { m.buf = make([]byte, 2*size+4) @@ -83,27 +94,45 @@ func (m *LogBuffer) Shutdown() { } m.isStopping = true m.Lock() - m.flush() + toFlush := m.copyToFlush() m.Unlock() + m.flushChan <- toFlush + close(m.flushChan) } func (m *LogBuffer) loopFlush() { + for d := range m.flushChan { + if d != nil { + m.flushFn(d.startTime, d.stopTime, d.data) + } + } +} + +func (m *LogBuffer) loopInterval() { for !m.isStopping { m.Lock() - m.flush() + toFlush := m.copyToFlush() m.Unlock() + m.flushChan <- toFlush time.Sleep(m.flushInterval) } } -func (m *LogBuffer) flush() { +func (m *LogBuffer) copyToFlush() *dataToFlush { if m.flushFn != nil && m.pos > 0 { - // fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos) - m.flushFn(m.startTime, m.stopTime, m.buf[:m.pos]) + fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos) + debug.PrintStack() + d := &dataToFlush{ + startTime: m.startTime, + stopTime: m.stopTime, + data: copiedBytes(m.buf[:m.pos]), + } m.pos = 0 m.idx = m.idx[:0] + return d } + return nil } func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, bufferCopy []byte) { @@ -125,8 +154,14 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer lastTs := lastReadTime.UnixNano() l, h := 0, len(m.idx)-1 + /* + for i, pos := range m.idx { + ts := readTs(m.buf, pos) + fmt.Printf("entry %d ts: %v offset:%d\n", i, time.Unix(0,ts), pos) + } + fmt.Printf("l=%d, h=%d\n", l, h) + */ - // fmt.Printf("l=%d, h=%d\n", l, h) for { mid := (l + h) / 2 pos := m.idx[mid]