From d220875ef4601e74b6ab49e47f8e2dd36510482c Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 24 Oct 2025 00:49:14 -0700 Subject: [PATCH] Revert "fix reading" This reverts commit 64a4ce93580258b8d5537416dfb5d9a1a8f14ee2. --- weed/filer/filer_notify.go | 3 +- weed/server/filer_grpc_server_sub_meta.go | 78 ++++----------- weed/util/log_buffer/log_buffer.go | 97 ++++++------------- .../log_buffer_queryability_test.go | 55 ----------- weed/util/log_buffer/log_read.go | 5 +- weed/util/log_buffer/log_read_stateless.go | 2 +- 6 files changed, 58 insertions(+), 182 deletions(-) diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index 2921d709b..1867ccc07 100644 --- a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -3,13 +3,12 @@ package filer import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "io" "regexp" "strings" "time" - "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" - "google.golang.org/protobuf/proto" "github.com/seaweedfs/seaweedfs/weed/glog" diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 566f24ad7..f4df550e6 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -69,30 +69,14 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, if processedTsNs != 0 { lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2) } else { - // No data found on disk - // Check if we previously got ResumeFromDiskError from memory, meaning we're in a gap - if errors.Is(readInMemoryLogErr, log_buffer.ResumeFromDiskError) { - // We have a gap: requested time < earliest memory time, but no data on disk - // Skip forward to earliest memory time to avoid infinite loop - earliestTime := fs.filer.MetaAggregator.MetaLogBuffer.GetEarliestTime() - if !earliestTime.IsZero() && earliestTime.After(lastReadTime.Time) { - glog.V(3).Infof("gap detected: skipping from %v to earliest memory time %v for %v", - lastReadTime.Time, earliestTime, clientName) - // Position at earliest time; time-based reader will include it - lastReadTime = log_buffer.NewMessagePosition(earliestTime.UnixNano(), -2) - readInMemoryLogErr = nil // Clear the error since we're skipping forward - } - } else { - // First pass or no ResumeFromDiskError yet - check the next day for logs - nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano()) - position := log_buffer.NewMessagePosition(nextDayTs, -2) - found, err := fs.filer.HasPersistedLogFiles(position) - if err != nil { - return fmt.Errorf("checking persisted log files: %w", err) - } - if found { - lastReadTime = position - } + nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano()) + position := log_buffer.NewMessagePosition(nextDayTs, -2) + found, err := fs.filer.HasPersistedLogFiles(position) + if err != nil { + return fmt.Errorf("checking persisted log files: %w", err) + } + if found { + lastReadTime = position } } @@ -107,16 +91,12 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, } fs.filer.MetaAggregator.ListenersLock.Lock() - atomic.AddInt64(&fs.filer.MetaAggregator.ListenersWaits, 1) fs.filer.MetaAggregator.ListenersCond.Wait() - atomic.AddInt64(&fs.filer.MetaAggregator.ListenersWaits, -1) fs.filer.MetaAggregator.ListenersLock.Unlock() return fs.hasClient(req.ClientId, req.ClientEpoch) }, eachLogEntryFn) if readInMemoryLogErr != nil { if errors.Is(readInMemoryLogErr, log_buffer.ResumeFromDiskError) { - // Memory says data is too old - will read from disk on next iteration - // But if disk also has no data (gap in history), we'll skip forward continue } glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr) @@ -186,39 +166,23 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq if processedTsNs != 0 { lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2) } else { - // No data found on disk - // Check if we previously got ResumeFromDiskError from memory, meaning we're in a gap if readInMemoryLogErr == log_buffer.ResumeFromDiskError { - // We have a gap: requested time < earliest memory time, but no data on disk - // Skip forward to earliest memory time to avoid infinite loop - earliestTime := fs.filer.LocalMetaLogBuffer.GetEarliestTime() - if !earliestTime.IsZero() && earliestTime.After(lastReadTime.Time) { - glog.V(3).Infof("gap detected: skipping from %v to earliest memory time %v for %v", - lastReadTime.Time, earliestTime, clientName) - // Position at earliest time; time-based reader will include it - lastReadTime = log_buffer.NewMessagePosition(earliestTime.UnixNano(), -2) - readInMemoryLogErr = nil // Clear the error since we're skipping forward - } else { - // No memory data yet, just wait - time.Sleep(1127 * time.Millisecond) - continue - } - } else { - // First pass or no ResumeFromDiskError yet - // Check the next day for logs - nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano()) - position := log_buffer.NewMessagePosition(nextDayTs, -2) - found, err := fs.filer.HasPersistedLogFiles(position) - if err != nil { - return fmt.Errorf("checking persisted log files: %w", err) - } - if found { - lastReadTime = position - } + time.Sleep(1127 * time.Millisecond) + continue + } + // If no persisted entries were read for this day, check the next day for logs + nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano()) + position := log_buffer.NewMessagePosition(nextDayTs, -2) + found, err := fs.filer.HasPersistedLogFiles(position) + if err != nil { + return fmt.Errorf("checking persisted log files: %w", err) + } + if found { + lastReadTime = position } } - glog.V(3).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool { diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 5f4dc8099..5aeb285ca 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -235,7 +235,7 @@ func (logBuffer *LogBuffer) InitializeOffsetFromExistingData(getHighestOffsetFn // Set the next offset to be one after the highest existing offset nextOffset := highestOffset + 1 logBuffer.offset = nextOffset - // bufferStartOffset should match offset after initialization + // CRITICAL FIX: bufferStartOffset should match offset after initialization // This ensures that reads for old offsets (0...highestOffset) will trigger disk reads // New data written after this will start at nextOffset logBuffer.bufferStartOffset = nextOffset @@ -299,7 +299,7 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) { } // Track offset ranges for Kafka integration - // Use >= 0 to include offset 0 (first message in a topic) + // CRITICAL FIX: Use >= 0 to include offset 0 (first message in a topic) if logEntry.Offset >= 0 { if !logBuffer.hasOffsets { logBuffer.minOffset = logEntry.Offset @@ -385,7 +385,7 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin logBuffer.LastTsNs.Store(processingTsNs) } - // Set the offset in the LogEntry before marshaling + // CRITICAL FIX: Set the offset in the LogEntry before marshaling // This ensures the flushed data contains the correct offset information // Note: This also enables AddToBuffer to work correctly with Kafka-style offset-based reads logEntry.Offset = logBuffer.offset @@ -414,7 +414,7 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin } // Track offset ranges for Kafka integration - // Track the current offset being written + // CRITICAL FIX: Track the current offset being written if !logBuffer.hasOffsets { logBuffer.minOffset = logBuffer.offset logBuffer.maxOffset = logBuffer.offset @@ -518,7 +518,7 @@ func (logBuffer *LogBuffer) loopFlush() { logBuffer.lastFlushDataTime = d.stopTime // CRITICAL: Track what's been flushed to disk for both offset-based and time-based reads - // Use >= 0 to include offset 0 (first message in a topic) + // CRITICAL FIX: Use >= 0 to include offset 0 (first message in a topic) if d.maxOffset >= 0 { logBuffer.lastFlushedOffset.Store(d.maxOffset) } @@ -586,10 +586,8 @@ func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush // CRITICAL: logBuffer.offset is the "next offset to assign", so last offset in buffer is offset-1 lastOffsetInBuffer := logBuffer.offset - 1 logBuffer.buf = logBuffer.prevBuffers.SealBuffer(logBuffer.startTime, logBuffer.stopTime, logBuffer.buf, logBuffer.pos, logBuffer.bufferStartOffset, lastOffsetInBuffer) - // Use zero time (time.Time{}) not epoch time (time.Unix(0,0)) - // Epoch time (1970) breaks time-based reads after flush - logBuffer.startTime = time.Time{} - logBuffer.stopTime = time.Time{} + logBuffer.startTime = time.Unix(0, 0) + logBuffer.stopTime = time.Unix(0, 0) logBuffer.pos = 0 logBuffer.idx = logBuffer.idx[:0] // DON'T increment offset - it's already pointing to the next offset! @@ -600,7 +598,7 @@ func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush logBuffer.minOffset = 0 logBuffer.maxOffset = 0 - // Invalidate disk cache chunks after flush + // CRITICAL FIX: Invalidate disk cache chunks after flush // The cache may contain stale data from before this flush // Invalidating ensures consumers will re-read fresh data from disk after flush logBuffer.invalidateAllDiskCacheChunks() @@ -641,10 +639,8 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu defer logBuffer.RUnlock() isOffsetBased := lastReadPosition.IsOffsetBased - glog.V(2).Infof("[ReadFromBuffer] %s: isOffsetBased=%v, position=%+v, bufferStartOffset=%d, offset=%d, pos=%d", - logBuffer.name, isOffsetBased, lastReadPosition, logBuffer.bufferStartOffset, logBuffer.offset, logBuffer.pos) - // For offset-based subscriptions, use offset comparisons, not time comparisons! + // CRITICAL FIX: For offset-based subscriptions, use offset comparisons, not time comparisons! if isOffsetBased { requestedOffset := lastReadPosition.Offset @@ -652,7 +648,7 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu if requestedOffset >= logBuffer.bufferStartOffset && requestedOffset <= logBuffer.offset { // If current buffer is empty (pos=0), check if data is on disk or not yet written if logBuffer.pos == 0 { - // If buffer is empty but offset range covers the request, + // CRITICAL FIX: If buffer is empty but offset range covers the request, // it means data was in memory and has been flushed/moved out. // The bufferStartOffset advancing to cover this offset proves data existed. // @@ -723,22 +719,15 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu if !logBuffer.startTime.IsZero() { tsMemory = logBuffer.startTime } - glog.V(2).Infof("[ReadFromBuffer] %s: checking prevBuffers, count=%d, currentStartTime=%v", - logBuffer.name, len(logBuffer.prevBuffers.buffers), logBuffer.startTime) - for i, prevBuf := range logBuffer.prevBuffers.buffers { - glog.V(2).Infof("[ReadFromBuffer] %s: prevBuf[%d]: startTime=%v stopTime=%v size=%d startOffset=%d endOffset=%d", - logBuffer.name, i, prevBuf.startTime, prevBuf.stopTime, prevBuf.size, prevBuf.startOffset, prevBuf.offset) - if !prevBuf.startTime.IsZero() { - // If tsMemory is zero, assign directly; otherwise compare - if tsMemory.IsZero() || prevBuf.startTime.Before(tsMemory) { - tsMemory = prevBuf.startTime - } + for _, prevBuf := range logBuffer.prevBuffers.buffers { + if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) { + tsMemory = prevBuf.startTime } } if tsMemory.IsZero() { // case 2.2 return nil, -2, nil } else if lastReadPosition.Time.Before(tsMemory) { // case 2.3 - // For time-based reads, only check timestamp for disk reads + // CRITICAL FIX: For time-based reads, only check timestamp for disk reads // Don't use offset comparisons as they're not meaningful for time-based subscriptions // Special case: If requested time is zero (Unix epoch), treat as "start from beginning" @@ -746,67 +735,45 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu if lastReadPosition.Time.IsZero() || lastReadPosition.Time.Unix() == 0 { // Start from the beginning of memory // Fall through to case 2.1 to read from earliest buffer - } else if lastReadPosition.Offset <= 0 && lastReadPosition.Time.Before(tsMemory) { - // Treat first read with sentinel/zero offset as inclusive of earliest in-memory data - glog.V(4).Infof("first read (offset=%d) at time %v before earliest memory %v, reading from memory", - lastReadPosition.Offset, lastReadPosition.Time, tsMemory) + } else if lastReadPosition.Offset == 0 && lastReadPosition.Time.Before(tsMemory) { + // CRITICAL FIX: If this is the first read (offset=0) and time is slightly before memory, + // it's likely a race between starting to read and first message being written + // Fall through to case 2.1 to read from earliest buffer instead of triggering disk read + glog.V(2).Infof("first read at time %v before earliest memory %v, reading from memory", + lastReadPosition.Time, tsMemory) } else { // Data not in memory buffers - read from disk - glog.V(0).Infof("[ReadFromBuffer] %s resume from disk: requested time %v < earliest memory time %v", - logBuffer.name, lastReadPosition.Time, tsMemory) + glog.V(0).Infof("resume from disk: requested time %v < earliest memory time %v", + lastReadPosition.Time, tsMemory) return nil, -2, ResumeFromDiskError } } - glog.V(2).Infof("[ReadFromBuffer] %s: time-based read continuing, tsMemory=%v, lastReadPos=%v", - logBuffer.name, tsMemory, lastReadPosition.Time) - // the following is case 2.1 - if lastReadPosition.Time.Equal(logBuffer.stopTime) && !logBuffer.stopTime.IsZero() { - // For first-read sentinel/zero offset, allow inclusive read at the boundary - if lastReadPosition.Offset > 0 { - return nil, logBuffer.offset, nil - } + if lastReadPosition.Time.Equal(logBuffer.stopTime) { + return nil, logBuffer.offset, nil } - if lastReadPosition.Time.After(logBuffer.stopTime) && !logBuffer.stopTime.IsZero() { + if lastReadPosition.Time.After(logBuffer.stopTime) { // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadPosition, m.stopTime) return nil, logBuffer.offset, nil } - // Also check prevBuffers when current buffer is empty (startTime is zero) - if lastReadPosition.Time.Before(logBuffer.startTime) || logBuffer.startTime.IsZero() { + if lastReadPosition.Time.Before(logBuffer.startTime) { for _, buf := range logBuffer.prevBuffers.buffers { if buf.startTime.After(lastReadPosition.Time) { // glog.V(4).Infof("%s return the %d sealed buffer %v", m.name, i, buf.startTime) return copiedBytes(buf.buf[:buf.size]), buf.offset, nil } if !buf.startTime.After(lastReadPosition.Time) && buf.stopTime.After(lastReadPosition.Time) { - searchTime := lastReadPosition.Time - if lastReadPosition.Offset <= 0 { - searchTime = searchTime.Add(-time.Nanosecond) - } - pos := buf.locateByTs(searchTime) - glog.V(2).Infof("[ReadFromBuffer] %s: found data in prevBuffer at pos %d, bufSize=%d", logBuffer.name, pos, buf.size) + pos := buf.locateByTs(lastReadPosition.Time) return copiedBytes(buf.buf[pos:buf.size]), buf.offset, nil } } - // If current buffer is not empty, return it - if logBuffer.pos > 0 { - // glog.V(4).Infof("%s return the current buf %v", m.name, lastReadPosition) - return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil - } - // Buffer is empty and no data in prevBuffers - wait for new data - return nil, logBuffer.offset, nil + // glog.V(4).Infof("%s return the current buf %v", m.name, lastReadPosition) + return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil } lastTs := lastReadPosition.Time.UnixNano() - // Inclusive boundary for first-read sentinel/zero offset - searchTs := lastTs - if lastReadPosition.Offset <= 0 { - if searchTs > math.MinInt64+1 { // prevent underflow - searchTs = searchTs - 1 - } - } l, h := 0, len(logBuffer.idx)-1 /* @@ -825,14 +792,14 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu mid := (l + h) / 2 pos := logBuffer.idx[mid] _, t := readTs(logBuffer.buf, pos) - if t <= searchTs { + if t <= lastTs { l = mid + 1 - } else if searchTs < t { + } else if lastTs < t { var prevT int64 if mid > 0 { _, prevT = readTs(logBuffer.buf, logBuffer.idx[mid-1]) } - if prevT <= searchTs { + if prevT <= lastTs { return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.offset, nil } h = mid diff --git a/weed/util/log_buffer/log_buffer_queryability_test.go b/weed/util/log_buffer/log_buffer_queryability_test.go index 16dd0f9b0..6e372d2b3 100644 --- a/weed/util/log_buffer/log_buffer_queryability_test.go +++ b/weed/util/log_buffer/log_buffer_queryability_test.go @@ -236,58 +236,3 @@ func TestSchemaRegistryScenario(t *testing.T) { t.Logf("Schema registry scenario test passed - schema value preserved: %d bytes", len(retrievedEntry.Data)) } - -// TestTimeBasedFirstReadBeforeEarliest ensures starting slightly before earliest memory -// does not force a disk resume and returns in-memory data (regression test) -func TestTimeBasedFirstReadBeforeEarliest(t *testing.T) { - flushed := false - logBuffer := NewLogBuffer("local", 10*time.Minute, - func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { - // keep in memory; we just want earliest time populated - _ = buf - }, - func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) { - // disk should not be consulted in this regression path - return startPosition, false, nil - }, - func() {}) - - // Seed one entry so earliestTime is set - baseTs := time.Now().Add(-time.Second) - entry := &filer_pb.LogEntry{TsNs: baseTs.UnixNano(), Data: []byte("x"), Key: []byte("k"), Offset: 0} - logBuffer.AddLogEntryToBuffer(entry) - _ = flushed - - // Start read 1ns before earliest memory, with offset sentinel (-2) - startPos := NewMessagePosition(baseTs.Add(-time.Nanosecond).UnixNano(), -2) - buf, _, err := logBuffer.ReadFromBuffer(startPos) - if err != nil { - t.Fatalf("ReadFromBuffer returned err: %v", err) - } - if buf == nil { - t.Fatalf("Expected in-memory data, got nil buffer") - } -} - -// TestEarliestTimeExactRead ensures starting exactly at earliest time returns first entry (no skip) -func TestEarliestTimeExactRead(t *testing.T) { - logBuffer := NewLogBuffer("local", 10*time.Minute, - func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {}, - func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) { - return startPosition, false, nil - }, - func() {}) - - ts := time.Now() - entry := &filer_pb.LogEntry{TsNs: ts.UnixNano(), Data: []byte("a"), Key: []byte("k"), Offset: 0} - logBuffer.AddLogEntryToBuffer(entry) - - startPos := NewMessagePosition(ts.UnixNano(), -2) - buf, _, err := logBuffer.ReadFromBuffer(startPos) - if err != nil { - t.Fatalf("ReadFromBuffer err: %v", err) - } - if buf == nil || buf.Len() == 0 { - t.Fatalf("Expected data at earliest time, got nil/empty") - } -} diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 950604022..3b7b99ada 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -270,7 +270,7 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star return } - // If we're reading offset-based and there's no data in LogBuffer, + // CRITICAL FIX: If we're reading offset-based and there's no data in LogBuffer, // return ResumeFromDiskError to let Subscribe try reading from disk again. // This prevents infinite blocking when all data is on disk (e.g., after restart). if startPosition.IsOffsetBased { @@ -355,6 +355,7 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star continue } + glog.V(4).Infof("Unmarshaled log entry %d: TsNs=%d, Offset=%d, Key=%s", batchSize+1, logEntry.TsNs, logEntry.Offset, string(logEntry.Key)) // Handle offset-based filtering for offset-based start positions @@ -376,7 +377,7 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star // println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs) return } - // Use logEntry.Offset + 1 to move PAST the current entry + // CRITICAL FIX: Use logEntry.Offset + 1 to move PAST the current entry // This prevents infinite loops where we keep requesting the same offset lastReadPosition = NewMessagePosition(logEntry.TsNs, logEntry.Offset+1) diff --git a/weed/util/log_buffer/log_read_stateless.go b/weed/util/log_buffer/log_read_stateless.go index abc7d9ac0..b48413bc8 100644 --- a/weed/util/log_buffer/log_read_stateless.go +++ b/weed/util/log_buffer/log_read_stateless.go @@ -121,7 +121,7 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages logBuffer.RUnlock() // Data not in memory - attempt disk read if configured - // Don't return error here - data may be on disk! + // CRITICAL FIX: Don't return error here - data may be on disk! // Fall through to disk read logic below glog.V(2).Infof("[StatelessRead] Data at offset %d not in memory (buffer: %d-%d), attempting disk read", startOffset, bufferStartOffset, currentBufferEnd)