From befe0a149e982d3054a0818499386f4274c86d67 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 17 Nov 2025 13:11:15 -0800 Subject: [PATCH] less logs --- weed/util/log_buffer/log_buffer.go | 62 +++--------------------------- 1 file changed, 5 insertions(+), 57 deletions(-) diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index e87aa318f..63297fd1f 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -2,7 +2,6 @@ package log_buffer import ( "bytes" - "fmt" "math" "sync" "sync/atomic" @@ -10,7 +9,6 @@ import ( "google.golang.org/protobuf/proto" - "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util" @@ -196,7 +194,7 @@ func (logBuffer *LogBuffer) notifySubscribers() { return // No subscribers, skip notification } - for subscriberID, notifyChan := range logBuffer.subscribers { + for _, notifyChan := range logBuffer.subscribers { select { case notifyChan <- struct{}{}: // Notification sent successfully @@ -204,7 +202,6 @@ func (logBuffer *LogBuffer) notifySubscribers() { // Channel full - subscriber hasn't consumed previous notification yet // This is OK because one notification is sufficient to wake the subscriber } - _ = subscriberID } } @@ -217,7 +214,6 @@ func (logBuffer *LogBuffer) InitializeOffsetFromExistingData(getHighestOffsetFn highestOffset, err := getHighestOffsetFn() if err != nil { - glog.V(0).Infof("Failed to get highest offset for %s: %v, starting from 0", logBuffer.name, err) return nil // Continue with offset 0 if we can't read existing data } @@ -233,12 +229,9 @@ func (logBuffer *LogBuffer) InitializeOffsetFromExistingData(getHighestOffsetFn logBuffer.lastFlushedOffset.Store(highestOffset) // Set lastFlushedTime to current time (we know data up to highestOffset is on disk) logBuffer.lastFlushTsNs.Store(time.Now().UnixNano()) - glog.V(0).Infof("Initialized LogBuffer %s offset to %d (highest existing: %d), buffer starts at %d, lastFlushedOffset=%d, lastFlushedTime=%v", - logBuffer.name, nextOffset, highestOffset, nextOffset, highestOffset, time.Now()) } else { logBuffer.bufferStartOffset = 0 // Start from offset 0 // No data on disk yet - glog.V(0).Infof("No existing data found for %s, starting from offset 0, lastFlushedOffset=-1, lastFlushedTime=0", logBuffer.name) } return nil @@ -250,8 +243,6 @@ func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) { // AddLogEntryToBuffer directly adds a LogEntry to the buffer, preserving offset information func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) { - logEntryData, _ := proto.Marshal(logEntry) - var toFlush *dataToFlush logBuffer.Lock() defer func() { @@ -275,11 +266,11 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) { ts = time.Unix(0, processingTsNs) // Re-marshal with corrected timestamp logEntry.TsNs = processingTsNs - logEntryData, _ = proto.Marshal(logEntry) } else { logBuffer.LastTsNs.Store(processingTsNs) } + logEntryData, _ := proto.Marshal(logEntry) size := len(logEntryData) if logBuffer.pos == 0 { @@ -313,7 +304,6 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) { const maxBufferSize = 1 << 30 // 1 GiB practical limit // Ensure 2*size + 4 won't overflow int and stays within practical bounds if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 { - glog.Errorf("Buffer size out of valid range: %d bytes, skipping", size) return } // Safe to compute now that we've validated size is in valid range @@ -350,8 +340,6 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin Key: partitionKey, } - logEntryData, _ := proto.Marshal(logEntry) - var toFlush *dataToFlush logBuffer.Lock() defer func() { @@ -380,20 +368,8 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin // Note: This also enables AddToBuffer to work correctly with Kafka-style offset-based reads logEntry.Offset = logBuffer.offset - // DEBUG: Log data being added to buffer for GitHub Actions debugging - dataPreview := "" - if len(data) > 0 { - if len(data) <= 50 { - dataPreview = string(data) - } else { - dataPreview = fmt.Sprintf("%s...(total %d bytes)", string(data[:50]), len(data)) - } - } - glog.V(2).Infof("[LOG_BUFFER_ADD] buffer=%s offset=%d dataLen=%d dataPreview=%q", - logBuffer.name, logBuffer.offset, len(data), dataPreview) - // Marshal with correct timestamp and offset - logEntryData, _ = proto.Marshal(logEntry) + logEntryData, _ := proto.Marshal(logEntry) size := len(logEntryData) @@ -419,7 +395,6 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin } if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 { - // glog.V(0).Infof("%s copyToFlush1 offset:%d count:%d start time %v, ts %v, remaining %d bytes", logBuffer.name, logBuffer.offset, len(logBuffer.idx), logBuffer.startTime, ts, len(logBuffer.buf)-logBuffer.pos) toFlush = logBuffer.copyToFlush() logBuffer.startTime = ts if len(logBuffer.buf) < size+4 { @@ -427,7 +402,6 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin const maxBufferSize = 1 << 30 // 1 GiB practical limit // Ensure 2*size + 4 won't overflow int and stays within practical bounds if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 { - glog.Errorf("Buffer size out of valid range: %d bytes, skipping", size) return } // Safe to compute now that we've validated size is in valid range @@ -470,14 +444,11 @@ func (logBuffer *LogBuffer) ForceFlush() { select { case <-toFlush.done: // Flush completed successfully - glog.V(1).Infof("ForceFlush completed for %s", logBuffer.name) case <-time.After(5 * time.Second): // Timeout waiting for flush - this shouldn't happen - glog.Warningf("ForceFlush timed out waiting for completion on %s", logBuffer.name) } case <-time.After(2 * time.Second): // If flush channel is still blocked after 2s, something is wrong - glog.Warningf("ForceFlush channel timeout for %s - flush channel busy for 2s", logBuffer.name) } } } @@ -501,7 +472,6 @@ func (logBuffer *LogBuffer) IsAllFlushed() bool { func (logBuffer *LogBuffer) loopFlush() { for d := range logBuffer.flushChan { if d != nil { - // glog.V(4).Infof("%s flush [%v, %v] size %d", m.name, d.startTime, d.stopTime, len(d.data.Bytes())) logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes(), d.minOffset, d.maxOffset) d.releaseMemory() // local logbuffer is different from aggregate logbuffer here @@ -536,10 +506,7 @@ func (logBuffer *LogBuffer) loopInterval() { toFlush := logBuffer.copyToFlush() logBuffer.Unlock() if toFlush != nil { - glog.V(4).Infof("%s flush [%v, %v] size %d", logBuffer.name, toFlush.startTime, toFlush.stopTime, len(toFlush.data.Bytes())) logBuffer.flushChan <- toFlush - } else { - // glog.V(0).Infof("%s no flush", m.name) } } } @@ -568,9 +535,7 @@ func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush if withCallback { d.done = make(chan struct{}) } - // glog.V(4).Infof("%s flushing [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime) } else { - // glog.V(4).Infof("%s removed from memory [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime) logBuffer.lastFlushDataTime = logBuffer.stopTime } // CRITICAL: logBuffer.offset is the "next offset to assign", so last offset in buffer is offset-1 @@ -637,8 +602,6 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu defer logBuffer.RUnlock() isOffsetBased := lastReadPosition.IsOffsetBased - glog.V(2).Infof("[ReadFromBuffer] %s: isOffsetBased=%v, position=%+v, bufferStartOffset=%d, offset=%d, pos=%d", - logBuffer.name, isOffsetBased, lastReadPosition, logBuffer.bufferStartOffset, logBuffer.offset, logBuffer.pos) // For offset-based subscriptions, use offset comparisons, not time comparisons! if isOffsetBased { @@ -719,11 +682,7 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu if !logBuffer.startTime.IsZero() { tsMemory = logBuffer.startTime } - glog.V(2).Infof("[ReadFromBuffer] %s: checking prevBuffers, count=%d, currentStartTime=%v", - logBuffer.name, len(logBuffer.prevBuffers.buffers), logBuffer.startTime) - for i, prevBuf := range logBuffer.prevBuffers.buffers { - glog.V(2).Infof("[ReadFromBuffer] %s: prevBuf[%d]: startTime=%v stopTime=%v size=%d startOffset=%d endOffset=%d", - logBuffer.name, i, prevBuf.startTime, prevBuf.stopTime, prevBuf.size, prevBuf.startOffset, prevBuf.offset) + for _, prevBuf := range logBuffer.prevBuffers.buffers { if !prevBuf.startTime.IsZero() { // If tsMemory is zero, assign directly; otherwise compare if tsMemory.IsZero() || prevBuf.startTime.Before(tsMemory) { @@ -744,19 +703,12 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu // Fall through to case 2.1 to read from earliest buffer } else if lastReadPosition.Offset <= 0 && lastReadPosition.Time.Before(tsMemory) { // Treat first read with sentinel/zero offset as inclusive of earliest in-memory data - glog.V(4).Infof("first read (offset=%d) at time %v before earliest memory %v, reading from memory", - lastReadPosition.Offset, lastReadPosition.Time, tsMemory) } else { // Data not in memory buffers - read from disk - glog.V(0).Infof("[ReadFromBuffer] %s resume from disk: requested time %v < earliest memory time %v", - logBuffer.name, lastReadPosition.Time, tsMemory) return nil, -2, ResumeFromDiskError } } - glog.V(2).Infof("[ReadFromBuffer] %s: time-based read continuing, tsMemory=%v, lastReadPos=%v", - logBuffer.name, tsMemory, lastReadPosition.Time) - // the following is case 2.1 if lastReadPosition.Time.Equal(logBuffer.stopTime) && !logBuffer.stopTime.IsZero() { @@ -766,14 +718,12 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu } } if lastReadPosition.Time.After(logBuffer.stopTime) && !logBuffer.stopTime.IsZero() { - // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadPosition, m.stopTime) return nil, logBuffer.offset, nil } // Also check prevBuffers when current buffer is empty (startTime is zero) if lastReadPosition.Time.Before(logBuffer.startTime) || logBuffer.startTime.IsZero() { for _, buf := range logBuffer.prevBuffers.buffers { if buf.startTime.After(lastReadPosition.Time) { - // glog.V(4).Infof("%s return the %d sealed buffer %v", m.name, i, buf.startTime) return copiedBytes(buf.buf[:buf.size]), buf.offset, nil } if !buf.startTime.After(lastReadPosition.Time) && buf.stopTime.After(lastReadPosition.Time) { @@ -782,13 +732,11 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu searchTime = searchTime.Add(-time.Nanosecond) } pos := buf.locateByTs(searchTime) - glog.V(2).Infof("[ReadFromBuffer] %s: found data in prevBuffer at pos %d, bufSize=%d", logBuffer.name, pos, buf.size) return copiedBytes(buf.buf[pos:buf.size]), buf.offset, nil } } // If current buffer is not empty, return it if logBuffer.pos > 0 { - // glog.V(4).Infof("%s return the current buf %v", m.name, lastReadPosition) return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil } // Buffer is empty and no data in prevBuffers - wait for new data @@ -879,7 +827,7 @@ func readTs(buf []byte, pos int) (size int, ts int64) { err := proto.Unmarshal(entryData, logEntry) if err != nil { - glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err) + return 0, 0 } return size, logEntry.TsNs