|
|
@ -1,8 +1,6 @@ |
|
|
|
package queue |
|
|
|
|
|
|
|
import ( |
|
|
|
"fmt" |
|
|
|
"runtime/debug" |
|
|
|
"sync" |
|
|
|
"time" |
|
|
|
|
|
|
@ -121,8 +119,7 @@ func (m *LogBuffer) loopInterval() { |
|
|
|
func (m *LogBuffer) copyToFlush() *dataToFlush { |
|
|
|
|
|
|
|
if m.flushFn != nil && m.pos > 0 { |
|
|
|
fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos) |
|
|
|
debug.PrintStack() |
|
|
|
// fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos)
|
|
|
|
d := &dataToFlush{ |
|
|
|
startTime: m.startTime, |
|
|
|
stopTime: m.stopTime, |
|
|
@ -154,26 +151,34 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer |
|
|
|
|
|
|
|
lastTs := lastReadTime.UnixNano() |
|
|
|
l, h := 0, len(m.idx)-1 |
|
|
|
|
|
|
|
/* |
|
|
|
for i, pos := range m.idx { |
|
|
|
ts := readTs(m.buf, pos) |
|
|
|
fmt.Printf("entry %d ts: %v offset:%d\n", i, time.Unix(0,ts), pos) |
|
|
|
for i, pos := range m.idx { |
|
|
|
logEntry, ts := readTs(m.buf, pos) |
|
|
|
event := &filer_pb.FullEventNotification{} |
|
|
|
proto.Unmarshal(logEntry.Data, event) |
|
|
|
entry := event.EventNotification.OldEntry |
|
|
|
if entry == nil { |
|
|
|
entry = event.EventNotification.NewEntry |
|
|
|
} |
|
|
|
fmt.Printf("l=%d, h=%d\n", l, h) |
|
|
|
*/ |
|
|
|
fmt.Printf("entry %d ts: %v offset:%d dir:%s name:%s\n", i, time.Unix(0, ts), pos, event.Directory, entry.Name) |
|
|
|
} |
|
|
|
fmt.Printf("l=%d, h=%d\n", l, h) |
|
|
|
*/ |
|
|
|
|
|
|
|
for l <= h { |
|
|
|
mid := (l + h) / 2 |
|
|
|
pos := m.idx[mid] |
|
|
|
t := readTs(m.buf, m.idx[mid]) |
|
|
|
_, t := readTs(m.buf, m.idx[mid]) |
|
|
|
if t <= lastTs { |
|
|
|
l = mid + 1 |
|
|
|
} else if lastTs < t { |
|
|
|
var prevT int64 |
|
|
|
if mid > 0 { |
|
|
|
prevT = readTs(m.buf, m.idx[mid-1]) |
|
|
|
_, prevT = readTs(m.buf, m.idx[mid-1]) |
|
|
|
} |
|
|
|
if prevT <= lastTs { |
|
|
|
// println("found mid = ", mid)
|
|
|
|
return time.Unix(0, t), copiedBytes(m.buf[pos:m.pos]) |
|
|
|
} |
|
|
|
h = mid - 1 |
|
|
@ -181,6 +186,10 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer |
|
|
|
// fmt.Printf("l=%d, h=%d\n", l, h)
|
|
|
|
} |
|
|
|
|
|
|
|
// FIXME: this could be that the buffer has been flushed already
|
|
|
|
// println("not found")
|
|
|
|
return lastReadTime, nil |
|
|
|
|
|
|
|
} |
|
|
|
func copiedBytes(buf []byte) (copied []byte) { |
|
|
|
copied = make([]byte, len(buf)) |
|
|
@ -188,7 +197,7 @@ func copiedBytes(buf []byte) (copied []byte) { |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
func readTs(buf []byte, pos int) int64 { |
|
|
|
func readTs(buf []byte, pos int) (*filer_pb.LogEntry, int64) { |
|
|
|
|
|
|
|
size := util.BytesToUint32(buf[pos : pos+4]) |
|
|
|
entryData := buf[pos+4 : pos+4+int(size)] |
|
|
@ -198,6 +207,6 @@ func readTs(buf []byte, pos int) int64 { |
|
|
|
if err != nil { |
|
|
|
glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err) |
|
|
|
} |
|
|
|
return logEntry.TsNs |
|
|
|
return logEntry, logEntry.TsNs |
|
|
|
|
|
|
|
} |