
less logs

pull/7481/head
chrislu 1 month ago
commit befe0a149e
1 changed file: weed/util/log_buffer/log_buffer.go (62 changed lines)

@@ -2,7 +2,6 @@ package log_buffer
 import (
 	"bytes"
-	"fmt"
 	"math"
 	"sync"
 	"sync/atomic"
@@ -10,7 +9,6 @@ import (
 	"google.golang.org/protobuf/proto"
-	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
 	"github.com/seaweedfs/seaweedfs/weed/util"
@@ -196,7 +194,7 @@ func (logBuffer *LogBuffer) notifySubscribers() {
 		return // No subscribers, skip notification
 	}
-	for subscriberID, notifyChan := range logBuffer.subscribers {
+	for _, notifyChan := range logBuffer.subscribers {
 		select {
 		case notifyChan <- struct{}{}:
 			// Notification sent successfully
@@ -204,7 +202,6 @@ func (logBuffer *LogBuffer) notifySubscribers() {
 			// Channel full - subscriber hasn't consumed previous notification yet
 			// This is OK because one notification is sufficient to wake the subscriber
 		}
-		_ = subscriberID
 	}
 }
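The rewritten loop drops the unused `subscriberID` (and the `_ = subscriberID` workaround) while keeping the non-blocking send: each subscriber channel holds at most one pending wake-up, and a full channel is skipped rather than blocking the writer. A minimal sketch of that pattern, with illustrative names rather than the real LogBuffer fields:

```go
package main

import "fmt"

// notifyAll wakes every subscriber without ever blocking the producer.
// With capacity-1 channels, one queued notification is enough to wake
// a sleeping reader, so extra notifications can be dropped safely.
func notifyAll(subscribers map[string]chan struct{}) {
	for _, ch := range subscribers {
		select {
		case ch <- struct{}{}:
			// notification queued
		default:
			// channel already holds a pending wake-up; drop this one
		}
	}
}

func main() {
	subs := map[string]chan struct{}{
		"reader-a": make(chan struct{}, 1),
	}
	notifyAll(subs)
	notifyAll(subs) // dropped, not blocked on
	fmt.Println(len(subs["reader-a"])) // 1
}
```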
@@ -217,7 +214,6 @@ func (logBuffer *LogBuffer) InitializeOffsetFromExistingData(getHighestOffsetFn
 	highestOffset, err := getHighestOffsetFn()
 	if err != nil {
-		glog.V(0).Infof("Failed to get highest offset for %s: %v, starting from 0", logBuffer.name, err)
 		return nil // Continue with offset 0 if we can't read existing data
 	}
@@ -233,12 +229,9 @@ func (logBuffer *LogBuffer) InitializeOffsetFromExistingData(getHighestOffsetFn
 		logBuffer.lastFlushedOffset.Store(highestOffset)
 		// Set lastFlushedTime to current time (we know data up to highestOffset is on disk)
 		logBuffer.lastFlushTsNs.Store(time.Now().UnixNano())
-		glog.V(0).Infof("Initialized LogBuffer %s offset to %d (highest existing: %d), buffer starts at %d, lastFlushedOffset=%d, lastFlushedTime=%v",
-			logBuffer.name, nextOffset, highestOffset, nextOffset, highestOffset, time.Now())
 	} else {
 		logBuffer.bufferStartOffset = 0 // Start from offset 0
 		// No data on disk yet
-		glog.V(0).Infof("No existing data found for %s, starting from offset 0, lastFlushedOffset=-1, lastFlushedTime=0", logBuffer.name)
 	}
 	return nil
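With the log lines removed, the initialization logic reads plainly: on error fall back to offset 0, otherwise resume after the highest offset found on disk. A rough standalone sketch of that decision, assuming (from the variables visible in the hunk, not confirmed by the diff) that `nextOffset` is one past `highestOffset`:

```go
package main

import "fmt"

// initialOffset mirrors the shape of InitializeOffsetFromExistingData:
// an unreadable disk state degrades to offset 0 rather than failing.
func initialOffset(getHighestOffsetFn func() (int64, error)) int64 {
	highestOffset, err := getHighestOffsetFn()
	if err != nil || highestOffset < 0 {
		return 0 // no existing data (or unreadable): start fresh
	}
	return highestOffset + 1 // assumed: next offset to assign
}

func main() {
	fmt.Println(initialOffset(func() (int64, error) { return 41, nil }))               // 42
	fmt.Println(initialOffset(func() (int64, error) { return 0, fmt.Errorf("boom") })) // 0
}
```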
@@ -250,8 +243,6 @@ func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) {
 // AddLogEntryToBuffer directly adds a LogEntry to the buffer, preserving offset information
 func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) {
-	logEntryData, _ := proto.Marshal(logEntry)
-
 	var toFlush *dataToFlush
 	logBuffer.Lock()
 	defer func() {
@@ -275,11 +266,11 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) {
 		ts = time.Unix(0, processingTsNs)
 		// Re-marshal with corrected timestamp
 		logEntry.TsNs = processingTsNs
-		logEntryData, _ = proto.Marshal(logEntry)
 	} else {
 		logBuffer.LastTsNs.Store(processingTsNs)
 	}
+	logEntryData, _ := proto.Marshal(logEntry)
 	size := len(logEntryData)
 	if logBuffer.pos == 0 {
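The marshal now happens once, after the timestamp correction, instead of an eager `proto.Marshal` at function entry plus a re-marshal in the correction branch. A sketch of that finalize-then-marshal-once shape, using the `filer_pb.LogEntry` fields visible in the diff (the `encodeEntry` helper is illustrative):

```go
package main

import (
	"fmt"
	"time"

	"google.golang.org/protobuf/proto"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

// encodeEntry finalizes every field before serializing, so the entry
// is marshaled exactly once (no marshal -> mutate -> re-marshal).
func encodeEntry(entry *filer_pb.LogEntry, correctedTsNs int64) ([]byte, error) {
	if entry.TsNs == 0 {
		entry.TsNs = correctedTsNs // fix the timestamp before encoding
	}
	return proto.Marshal(entry)
}

func main() {
	data, err := encodeEntry(&filer_pb.LogEntry{Data: []byte("hello")}, time.Now().UnixNano())
	if err != nil {
		panic(err)
	}
	fmt.Println(len(data) > 0) // true
}
```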
@@ -313,7 +304,6 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) {
 		const maxBufferSize = 1 << 30 // 1 GiB practical limit
 		// Ensure 2*size + 4 won't overflow int and stays within practical bounds
 		if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 {
-			glog.Errorf("Buffer size out of valid range: %d bytes, skipping", size)
 			return
 		}
 		// Safe to compute now that we've validated size is in valid range
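The bound `size <= (math.MaxInt-4)/2` is what makes the later `2*size+4` arithmetic safe: multiplying through gives `2*size+4 <= math.MaxInt`, and the `maxBufferSize` cap applies the same algebra to the 1 GiB limit. A standalone sketch of the guard (the `safeGrow` helper is illustrative, not from the repo):

```go
package main

import (
	"fmt"
	"math"
)

const maxBufferSize = 1 << 30 // 1 GiB practical limit, as in the diff

// safeGrow allocates 2*size+4 bytes only when that expression can
// neither overflow int nor exceed the practical cap.
func safeGrow(size int) ([]byte, bool) {
	if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 {
		return nil, false // silently skip, matching the new behavior
	}
	return make([]byte, 2*size+4), true
}

func main() {
	buf, ok := safeGrow(1024)
	fmt.Println(len(buf), ok) // 2052 true

	_, ok = safeGrow(math.MaxInt / 2)
	fmt.Println(ok) // false: 2*size+4 would overflow
}
```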
@@ -350,8 +340,6 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
 		Key: partitionKey,
 	}
-	logEntryData, _ := proto.Marshal(logEntry)
-
 	var toFlush *dataToFlush
 	logBuffer.Lock()
 	defer func() {
@@ -380,20 +368,8 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
 	// Note: This also enables AddToBuffer to work correctly with Kafka-style offset-based reads
 	logEntry.Offset = logBuffer.offset
-	// DEBUG: Log data being added to buffer for GitHub Actions debugging
-	dataPreview := ""
-	if len(data) > 0 {
-		if len(data) <= 50 {
-			dataPreview = string(data)
-		} else {
-			dataPreview = fmt.Sprintf("%s...(total %d bytes)", string(data[:50]), len(data))
-		}
-	}
-	glog.V(2).Infof("[LOG_BUFFER_ADD] buffer=%s offset=%d dataLen=%d dataPreview=%q",
-		logBuffer.name, logBuffer.offset, len(data), dataPreview)
-
 	// Marshal with correct timestamp and offset
-	logEntryData, _ = proto.Marshal(logEntry)
+	logEntryData, _ := proto.Marshal(logEntry)
 	size := len(logEntryData)
@ -419,7 +395,6 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
} }
if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 { if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 {
// glog.V(0).Infof("%s copyToFlush1 offset:%d count:%d start time %v, ts %v, remaining %d bytes", logBuffer.name, logBuffer.offset, len(logBuffer.idx), logBuffer.startTime, ts, len(logBuffer.buf)-logBuffer.pos)
toFlush = logBuffer.copyToFlush() toFlush = logBuffer.copyToFlush()
logBuffer.startTime = ts logBuffer.startTime = ts
if len(logBuffer.buf) < size+4 { if len(logBuffer.buf) < size+4 {
@ -427,7 +402,6 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
const maxBufferSize = 1 << 30 // 1 GiB practical limit const maxBufferSize = 1 << 30 // 1 GiB practical limit
// Ensure 2*size + 4 won't overflow int and stays within practical bounds // Ensure 2*size + 4 won't overflow int and stays within practical bounds
if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 { if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 {
glog.Errorf("Buffer size out of valid range: %d bytes, skipping", size)
return return
} }
// Safe to compute now that we've validated size is in valid range // Safe to compute now that we've validated size is in valid range
@@ -470,14 +444,11 @@ func (logBuffer *LogBuffer) ForceFlush() {
 			select {
 			case <-toFlush.done:
 				// Flush completed successfully
-				glog.V(1).Infof("ForceFlush completed for %s", logBuffer.name)
 			case <-time.After(5 * time.Second):
 				// Timeout waiting for flush - this shouldn't happen
-				glog.Warningf("ForceFlush timed out waiting for completion on %s", logBuffer.name)
 			}
 		case <-time.After(2 * time.Second):
 			// If flush channel is still blocked after 2s, something is wrong
-			glog.Warningf("ForceFlush channel timeout for %s - flush channel busy for 2s", logBuffer.name)
 		}
 	}
 }
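Even without the warnings, ForceFlush still bounds both stages: up to 2 seconds to hand the work to the flush goroutine, then up to 5 seconds for the `done` signal. A compact sketch of that nested-select shape with placeholder types (not the real `dataToFlush`):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type flushJob struct{ done chan struct{} }

// forceFlush bounds both the hand-off (2s) and the wait for completion
// (5s), so a wedged flusher can never block the caller indefinitely.
func forceFlush(flushChan chan *flushJob) error {
	job := &flushJob{done: make(chan struct{})}
	select {
	case flushChan <- job:
		select {
		case <-job.done:
			return nil // flush completed
		case <-time.After(5 * time.Second):
			return errors.New("flush accepted but not completed in 5s")
		}
	case <-time.After(2 * time.Second):
		return errors.New("flush channel busy for 2s")
	}
}

func main() {
	flushChan := make(chan *flushJob, 1)
	go func() {
		job := <-flushChan
		close(job.done) // pretend the data hit disk
	}()
	fmt.Println(forceFlush(flushChan)) // <nil>
}
```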
@@ -501,7 +472,6 @@ func (logBuffer *LogBuffer) IsAllFlushed() bool {
 func (logBuffer *LogBuffer) loopFlush() {
 	for d := range logBuffer.flushChan {
 		if d != nil {
-			// glog.V(4).Infof("%s flush [%v, %v] size %d", m.name, d.startTime, d.stopTime, len(d.data.Bytes()))
 			logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes(), d.minOffset, d.maxOffset)
 			d.releaseMemory()
 			// local logbuffer is different from aggregate logbuffer here
@@ -536,10 +506,7 @@ func (logBuffer *LogBuffer) loopInterval() {
 		toFlush := logBuffer.copyToFlush()
 		logBuffer.Unlock()
 		if toFlush != nil {
-			glog.V(4).Infof("%s flush [%v, %v] size %d", logBuffer.name, toFlush.startTime, toFlush.stopTime, len(toFlush.data.Bytes()))
 			logBuffer.flushChan <- toFlush
-		} else {
-			// glog.V(0).Infof("%s no flush", m.name)
 		}
 	}
 }
@@ -568,9 +535,7 @@ func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush
 		if withCallback {
 			d.done = make(chan struct{})
 		}
-		// glog.V(4).Infof("%s flushing [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime)
 	} else {
-		// glog.V(4).Infof("%s removed from memory [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime)
 		logBuffer.lastFlushDataTime = logBuffer.stopTime
 	}
 	// CRITICAL: logBuffer.offset is the "next offset to assign", so last offset in buffer is offset-1
@@ -637,8 +602,6 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
 	defer logBuffer.RUnlock()
 	isOffsetBased := lastReadPosition.IsOffsetBased
-	glog.V(2).Infof("[ReadFromBuffer] %s: isOffsetBased=%v, position=%+v, bufferStartOffset=%d, offset=%d, pos=%d",
-		logBuffer.name, isOffsetBased, lastReadPosition, logBuffer.bufferStartOffset, logBuffer.offset, logBuffer.pos)
 	// For offset-based subscriptions, use offset comparisons, not time comparisons!
 	if isOffsetBased {
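The surviving context line states the key invariant of this function: an offset-based subscription must never be routed through time comparisons. A toy sketch of the two addressing modes (the field names are assumptions modeled on `MessagePosition`, not the actual struct):

```go
package main

import (
	"fmt"
	"time"
)

// position models the two read-cursor modes seen in the diff:
// Kafka-style offsets versus wall-clock timestamps.
type position struct {
	IsOffsetBased bool
	Offset        int64
	Time          time.Time
}

// inMemoryBuffer reports whether the cursor still falls inside the
// in-memory window: [startOffset, nextOffset) for offset mode, or
// at/after startTime for time mode.
func inMemoryBuffer(p position, startOffset, nextOffset int64, startTime time.Time) bool {
	if p.IsOffsetBased {
		return p.Offset >= startOffset && p.Offset < nextOffset
	}
	return !p.Time.Before(startTime)
}

func main() {
	fmt.Println(inMemoryBuffer(position{IsOffsetBased: true, Offset: 5}, 0, 10, time.Time{}))  // true
	fmt.Println(inMemoryBuffer(position{IsOffsetBased: true, Offset: 12}, 0, 10, time.Time{})) // false
}
```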
@@ -719,11 +682,7 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
 	if !logBuffer.startTime.IsZero() {
 		tsMemory = logBuffer.startTime
 	}
-	glog.V(2).Infof("[ReadFromBuffer] %s: checking prevBuffers, count=%d, currentStartTime=%v",
-		logBuffer.name, len(logBuffer.prevBuffers.buffers), logBuffer.startTime)
-	for i, prevBuf := range logBuffer.prevBuffers.buffers {
-		glog.V(2).Infof("[ReadFromBuffer] %s: prevBuf[%d]: startTime=%v stopTime=%v size=%d startOffset=%d endOffset=%d",
-			logBuffer.name, i, prevBuf.startTime, prevBuf.stopTime, prevBuf.size, prevBuf.startOffset, prevBuf.offset)
+	for _, prevBuf := range logBuffer.prevBuffers.buffers {
 		if !prevBuf.startTime.IsZero() {
 			// If tsMemory is zero, assign directly; otherwise compare
 			if tsMemory.IsZero() || prevBuf.startTime.Before(tsMemory) {
@@ -744,19 +703,12 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
 			// Fall through to case 2.1 to read from earliest buffer
 		} else if lastReadPosition.Offset <= 0 && lastReadPosition.Time.Before(tsMemory) {
 			// Treat first read with sentinel/zero offset as inclusive of earliest in-memory data
-			glog.V(4).Infof("first read (offset=%d) at time %v before earliest memory %v, reading from memory",
-				lastReadPosition.Offset, lastReadPosition.Time, tsMemory)
 		} else {
 			// Data not in memory buffers - read from disk
-			glog.V(0).Infof("[ReadFromBuffer] %s resume from disk: requested time %v < earliest memory time %v",
-				logBuffer.name, lastReadPosition.Time, tsMemory)
 			return nil, -2, ResumeFromDiskError
 		}
 	}
-	glog.V(2).Infof("[ReadFromBuffer] %s: time-based read continuing, tsMemory=%v, lastReadPos=%v",
-		logBuffer.name, tsMemory, lastReadPosition.Time)
 	// the following is case 2.1
 	if lastReadPosition.Time.Equal(logBuffer.stopTime) && !logBuffer.stopTime.IsZero() {
@@ -766,14 +718,12 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
 		}
 	}
 	if lastReadPosition.Time.After(logBuffer.stopTime) && !logBuffer.stopTime.IsZero() {
-		// glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadPosition, m.stopTime)
 		return nil, logBuffer.offset, nil
 	}
 	// Also check prevBuffers when current buffer is empty (startTime is zero)
 	if lastReadPosition.Time.Before(logBuffer.startTime) || logBuffer.startTime.IsZero() {
 		for _, buf := range logBuffer.prevBuffers.buffers {
 			if buf.startTime.After(lastReadPosition.Time) {
-				// glog.V(4).Infof("%s return the %d sealed buffer %v", m.name, i, buf.startTime)
 				return copiedBytes(buf.buf[:buf.size]), buf.offset, nil
 			}
 			if !buf.startTime.After(lastReadPosition.Time) && buf.stopTime.After(lastReadPosition.Time) {
@@ -782,13 +732,11 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
 				searchTime = searchTime.Add(-time.Nanosecond)
 			}
 			pos := buf.locateByTs(searchTime)
-			glog.V(2).Infof("[ReadFromBuffer] %s: found data in prevBuffer at pos %d, bufSize=%d", logBuffer.name, pos, buf.size)
 			return copiedBytes(buf.buf[pos:buf.size]), buf.offset, nil
 		}
 	}
 	// If current buffer is not empty, return it
 	if logBuffer.pos > 0 {
-		// glog.V(4).Infof("%s return the current buf %v", m.name, lastReadPosition)
 		return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil
 	}
 	// Buffer is empty and no data in prevBuffers - wait for new data
@@ -879,7 +827,7 @@ func readTs(buf []byte, pos int) (size int, ts int64) {
 	err := proto.Unmarshal(entryData, logEntry)
 	if err != nil {
-		glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err)
+		return 0, 0
 	}
 	return size, logEntry.TsNs
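The readTs change swaps a process-killing `glog.Fatalf` for a `(0, 0)` sentinel, so a corrupt entry degrades gracefully; callers should treat a zero size as a stop signal rather than advancing by it. A sketch of that contract (`decodeTs` is an illustrative stand-in, not the repo's readTs):

```go
package main

import (
	"fmt"

	"google.golang.org/protobuf/proto"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

// decodeTs mirrors readTs's new behavior: corrupt bytes yield (0, 0)
// instead of aborting the whole process.
func decodeTs(entryData []byte) (size int, ts int64) {
	logEntry := &filer_pb.LogEntry{}
	if err := proto.Unmarshal(entryData, logEntry); err != nil {
		return 0, 0 // sentinel: caller must stop scanning this buffer
	}
	return len(entryData), logEntry.TsNs
}

func main() {
	good, _ := proto.Marshal(&filer_pb.LogEntry{TsNs: 42})
	fmt.Println(decodeTs(good))               // entry size, 42
	fmt.Println(decodeTs([]byte{0xff, 0xff})) // 0 0 (invalid wire type)
}
```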
