Browse Source

continue for reading from sealed memory buffer

pull/1293/head
Chris Lu 5 years ago
parent
commit
258fba8a0f
  1. 36
      weed/util/log_buffer/log_buffer.go

36
weed/util/log_buffer/log_buffer.go

@ -92,6 +92,9 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
copy(m.buf[m.pos:m.pos+4], m.sizeBuf) copy(m.buf[m.pos:m.pos+4], m.sizeBuf)
copy(m.buf[m.pos+4:m.pos+4+size], logEntryData) copy(m.buf[m.pos+4:m.pos+4+size], logEntryData)
m.pos += size + 4 m.pos += size + 4
// fmt.Printf("entry size %d total %d count %d\n", size, m.pos, len(m.idx))
} }
func (m *LogBuffer) Shutdown() { func (m *LogBuffer) Shutdown() {
@ -117,16 +120,12 @@ func (m *LogBuffer) loopFlush() {
func (m *LogBuffer) loopInterval() { func (m *LogBuffer) loopInterval() {
for !m.isStopping { for !m.isStopping {
time.Sleep(m.flushInterval)
m.Lock() m.Lock()
// println("loop interval")
toFlush := m.copyToFlush() toFlush := m.copyToFlush()
m.Unlock() m.Unlock()
m.flushChan <- toFlush m.flushChan <- toFlush
time.Sleep(m.flushInterval)
if m.notifyFn != nil {
// check whether blocked clients are already disconnected
println("notifying log buffer readers")
m.notifyFn()
}
} }
} }
@ -139,7 +138,8 @@ func (m *LogBuffer) copyToFlush() *dataToFlush {
stopTime: m.stopTime, stopTime: m.stopTime,
data: copiedBytes(m.buf[:m.pos]), data: copiedBytes(m.buf[:m.pos]),
} }
m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf)
// fmt.Printf("flusing [0,%d) with %d entries\n", m.pos, len(m.idx))
m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos)
m.pos = 0 m.pos = 0
m.idx = m.idx[:0] m.idx = m.idx[:0]
return d return d
@ -166,6 +166,20 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
return nil return nil
} }
if lastReadTime.Before(m.startTime) { if lastReadTime.Before(m.startTime) {
// println("checking ", lastReadTime.UnixNano())
for i, buf := range m.prevBuffers.buffers {
if buf.startTime.After(lastReadTime) {
if i == 0 {
println("return the earliest in memory")
return copiedBytes(buf.buf[:buf.size])
}
return copiedBytes(buf.buf[:buf.size])
}
if !buf.startTime.After(lastReadTime) && buf.stopTime.After(lastReadTime) {
pos := buf.locateByTs(lastReadTime)
return copiedBytes(buf.buf[pos:])
}
}
return copiedBytes(m.buf[:m.pos]) return copiedBytes(m.buf[:m.pos])
} }
@ -227,16 +241,16 @@ func copiedBytes(buf []byte) (copied *bytes.Buffer) {
return return
} }
func readTs(buf []byte, pos int) (*filer_pb.LogEntry, int64) {
func readTs(buf []byte, pos int) (size int, ts int64) {
size := util.BytesToUint32(buf[pos : pos+4])
entryData := buf[pos+4 : pos+4+int(size)]
size = int(util.BytesToUint32(buf[pos : pos+4]))
entryData := buf[pos+4 : pos+4+size]
logEntry := &filer_pb.LogEntry{} logEntry := &filer_pb.LogEntry{}
err := proto.Unmarshal(entryData, logEntry) err := proto.Unmarshal(entryData, logEntry)
if err != nil { if err != nil {
glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err) glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err)
} }
return logEntry, logEntry.TsNs
return size, logEntry.TsNs
} }
Loading…
Cancel
Save