|
@ -116,7 +116,7 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) |
|
|
copy(m.buf[m.pos+4:m.pos+4+size], logEntryData) |
|
|
copy(m.buf[m.pos+4:m.pos+4+size], logEntryData) |
|
|
m.pos += size + 4 |
|
|
m.pos += size + 4 |
|
|
|
|
|
|
|
|
// fmt.Printf("entry size %d total %d count %d, buffer:%p\n", size, m.pos, len(m.idx), m)
|
|
|
|
|
|
|
|
|
// fmt.Printf("partitionKey %v entry size %d total %d count %d\n", string(partitionKey), size, m.pos, len(m.idx))
|
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -157,6 +157,8 @@ func (m *LogBuffer) loopInterval() { |
|
|
m.Unlock() |
|
|
m.Unlock() |
|
|
if toFlush != nil { |
|
|
if toFlush != nil { |
|
|
m.flushChan <- toFlush |
|
|
m.flushChan <- toFlush |
|
|
|
|
|
} else { |
|
|
|
|
|
// glog.V(0).Infof("%s no flush", m.name)
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|