From 4bf959edf0195bdd80fc268f795f6e9710e0b269 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 20 Apr 2020 17:26:38 -0700 Subject: [PATCH] message broker: read also from sealed memory buffer --- weed/util/log_buffer/log_buffer.go | 5 +++++ weed/util/log_buffer/log_buffer_test.go | 2 +- weed/util/log_buffer/sealed_buffer.go | 17 ++++++++++++++++- 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index d875dd54b..f84a58c74 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -122,6 +122,11 @@ func (m *LogBuffer) loopInterval() { m.Unlock() m.flushChan <- toFlush time.Sleep(m.flushInterval) + if m.notifyFn != nil { + // check whether blocked clients are already disconnected + println("notifying log buffer readers") + m.notifyFn() + } } } diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go index bf1d51703..f9ccc95c2 100644 --- a/weed/util/log_buffer/log_buffer_test.go +++ b/weed/util/log_buffer/log_buffer_test.go @@ -19,7 +19,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) { startTime := time.Now() messageSize := 1024 - messageCount := 100 + messageCount := 5000 var buf = make([]byte, messageSize) for i := 0; i < messageCount; i++ { rand.Read(buf) diff --git a/weed/util/log_buffer/sealed_buffer.go b/weed/util/log_buffer/sealed_buffer.go index c5160fad0..e412b5f32 100644 --- a/weed/util/log_buffer/sealed_buffer.go +++ b/weed/util/log_buffer/sealed_buffer.go @@ -4,6 +4,7 @@ import "time" type MemBuffer struct { buf []byte + size int startTime time.Time stopTime time.Time } @@ -25,16 +26,30 @@ func newSealedBuffers(size int) *SealedBuffers { return sbs } -func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte) (newBuf []byte) { +func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int) (newBuf []byte) { oldMemBuffer := sbs.buffers[0] size := len(sbs.buffers) for i := 0; i < size-1; i++ { sbs.buffers[i].buf = sbs.buffers[i+1].buf + sbs.buffers[i].size = sbs.buffers[i+1].size sbs.buffers[i].startTime = sbs.buffers[i+1].startTime sbs.buffers[i].stopTime = sbs.buffers[i+1].stopTime } sbs.buffers[size-1].buf = buf + sbs.buffers[size-1].size = pos sbs.buffers[size-1].startTime = startTime sbs.buffers[size-1].stopTime = stopTime return oldMemBuffer.buf } + +func (mb *MemBuffer) locateByTs(lastReadTime time.Time) (pos int) { + lastReadTs := lastReadTime.UnixNano() + for pos < len(mb.buf) { + size, t := readTs(mb.buf, pos) + if t > lastReadTs { + return + } + pos += size + 4 + } + return len(mb.buf) +}