From 3833dac3f71b23ef246753e79448a9402a4ddd02 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 17 Oct 2021 13:53:04 -0700 Subject: [PATCH] continue to read from memory if there is no flush --- weed/util/log_buffer/log_buffer.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index ed56a710e..d5e6cb214 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -29,6 +29,7 @@ type LogBuffer struct { pos int startTime time.Time stopTime time.Time + lastFlushTime time.Time sizeBuf []byte flushInterval time.Duration flushFn func(startTime, stopTime time.Time, buf []byte) @@ -132,6 +133,8 @@ func (m *LogBuffer) loopFlush() { // glog.V(4).Infof("%s flush [%v, %v] size %d", m.name, d.startTime, d.stopTime, len(d.data.Bytes())) m.flushFn(d.startTime, d.stopTime, d.data.Bytes()) d.releaseMemory() + // local logbuffer is different from aggregate logbuffer here + m.lastFlushTime = d.stopTime } } } @@ -162,6 +165,9 @@ func (m *LogBuffer) copyToFlush() *dataToFlush { data: copiedBytes(m.buf[:m.pos]), } // glog.V(4).Infof("%s flushing [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime) + } else { + // glog.V(4).Infof("%s removed from memory [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime) + m.lastFlushTime = m.stopTime } m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos) m.startTime = time.Unix(0, 0) @@ -203,7 +209,10 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu if tsMemory.IsZero() { // case 2.2 return nil, nil } else if lastReadTime.Before(tsMemory) { // case 2.3 - return nil, ResumeFromDiskError + if !m.lastFlushTime.IsZero() { + glog.V(0).Infof("resume with last flush time: %v", m.lastFlushTime) + return nil, ResumeFromDiskError + } } // the following is case 2.1