From acc5f8110415a1b9650f7ca2d3d2ba6c5c68e249 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 17 Oct 2025 13:38:00 -0700 Subject: [PATCH] Revert "fix: Add offset increment to AddDataToBuffer to prevent flush gaps" This reverts commit 2c28860aadbc598d22a94d048f03f1eac81d48cf. --- weed/util/log_buffer/log_buffer.go | 4 - .../log_buffer/log_buffer_flush_gap_test.go | 178 ------------------ 2 files changed, 182 deletions(-) diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 36a18c44a..4795227fa 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -417,10 +417,6 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData) logBuffer.pos += size + 4 - // CRITICAL FIX: Increment offset to match AddLogEntryToBuffer behavior - // This ensures bufferStartOffset advances correctly after flush - // Without this, gaps are created between flushed and in-memory offsets - logBuffer.offset++ } func (logBuffer *LogBuffer) IsStopping() bool { diff --git a/weed/util/log_buffer/log_buffer_flush_gap_test.go b/weed/util/log_buffer/log_buffer_flush_gap_test.go index b05d94a3c..e00b3754f 100644 --- a/weed/util/log_buffer/log_buffer_flush_gap_test.go +++ b/weed/util/log_buffer/log_buffer_flush_gap_test.go @@ -334,184 +334,6 @@ func TestFlushOffsetGap_ConcurrentWriteAndFlush(t *testing.T) { } } -// TestFlushOffsetGap_AddToBufferDoesNotIncrementOffset reproduces THE ACTUAL BUG -// The broker uses AddToBuffer() which calls AddDataToBuffer() -// AddDataToBuffer() does NOT increment logBuffer.offset! -// When flush happens, bufferStartOffset = stale offset → GAP! -func TestFlushOffsetGap_AddToBufferDoesNotIncrementOffset(t *testing.T) { - flushedRanges := []struct{ min, max int64 }{} - var flushMu sync.Mutex - - flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { - flushMu.Lock() - flushedRanges = append(flushedRanges, struct{ min, max int64 }{minOffset, maxOffset}) - flushMu.Unlock() - t.Logf("FLUSH: offsets %d-%d (%d bytes)", minOffset, maxOffset, len(buf)) - } - - logBuffer := NewLogBuffer("test", time.Hour, flushFn, nil, nil) - defer logBuffer.ShutdownLogBuffer() - - // Manually initialize offset to simulate broker state after restart - // (e.g., broker finds 1000 messages on disk, sets offset=1000) - logBuffer.Lock() - logBuffer.offset = 1000 - logBuffer.bufferStartOffset = 1000 - logBuffer.Unlock() - - t.Logf("Initial state: offset=%d, bufferStartOffset=%d", 1000, 1000) - - // Now add messages using AddToBuffer (like the broker does) - // This is the REAL PRODUCTION PATH - for i := 0; i < 100; i++ { - message := &mq_pb.DataMessage{ - Key: []byte(fmt.Sprintf("key-%d", i)), - Value: []byte(fmt.Sprintf("message-%d", i)), - TsNs: time.Now().UnixNano(), - } - logBuffer.AddToBuffer(message) // ← BUG: Does NOT increment logBuffer.offset! - } - - // Check state before flush - logBuffer.RLock() - beforeFlushOffset := logBuffer.offset - beforeFlushStart := logBuffer.bufferStartOffset - logBuffer.RUnlock() - - t.Logf("Before flush: offset=%d, bufferStartOffset=%d", beforeFlushOffset, beforeFlushStart) - - // Force flush - logBuffer.ForceFlush() - time.Sleep(100 * time.Millisecond) - - // Check state after flush - logBuffer.RLock() - afterFlushOffset := logBuffer.offset - afterFlushStart := logBuffer.bufferStartOffset - logBuffer.RUnlock() - - flushMu.Lock() - if len(flushedRanges) > 0 { - flushedMin := flushedRanges[0].min - flushedMax := flushedRanges[0].max - flushMu.Unlock() - - t.Logf("After flush: offset=%d, bufferStartOffset=%d", afterFlushOffset, afterFlushStart) - t.Logf("Flushed range: %d-%d (but these are minOffset/maxOffset, which are 0!)", flushedMin, flushedMax) - - // Expected behavior: offset should increment from 1000 to 1100 - expectedOffsetAfterAdd := int64(1100) - expectedBufferStartAfterFlush := int64(1100) - - // Check if offset was incremented - if beforeFlushOffset != expectedOffsetAfterAdd { - t.Errorf("") - t.Errorf("❌❌❌ BUG: offset not incremented correctly ❌❌❌") - t.Errorf(" Expected offset after adding 100 messages: %d", expectedOffsetAfterAdd) - t.Errorf(" Actual offset: %d", beforeFlushOffset) - t.Errorf(" This means AddDataToBuffer() is not incrementing offset!") - } else { - t.Logf("✅ offset correctly incremented to %d", beforeFlushOffset) - } - - // Check if bufferStartOffset was set correctly after flush - if afterFlushStart != expectedBufferStartAfterFlush { - t.Errorf("") - t.Errorf("❌❌❌ BUG: bufferStartOffset not set correctly after flush ❌❌❌") - t.Errorf(" Expected bufferStartOffset after flush: %d", expectedBufferStartAfterFlush) - t.Errorf(" Actual bufferStartOffset: %d", afterFlushStart) - t.Errorf(" Gap: %d offsets will be LOST!", expectedBufferStartAfterFlush-afterFlushStart) - } else { - t.Logf("✅ bufferStartOffset correctly set to %d after flush", afterFlushStart) - } - - // Overall verdict - if beforeFlushOffset == expectedOffsetAfterAdd && afterFlushStart == expectedBufferStartAfterFlush { - t.Logf("") - t.Logf("🎉🎉🎉 FIX VERIFIED: Buffer flush offset gap bug is FIXED! 🎉🎉🎉") - } - } else { - flushMu.Unlock() - t.Error("No flush occurred!") - } -} - -// TestFlushOffsetGap_WithExplicitOffsets reproduces the bug using explicit offset assignment -// This matches how the MQ broker uses LogBuffer in production. -func TestFlushOffsetGap_WithExplicitOffsets(t *testing.T) { - flushedRanges := []struct{ min, max int64 }{} - var flushMu sync.Mutex - - flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { - flushMu.Lock() - flushedRanges = append(flushedRanges, struct{ min, max int64 }{minOffset, maxOffset}) - flushMu.Unlock() - t.Logf("FLUSH: offsets %d-%d (%d bytes)", minOffset, maxOffset, len(buf)) - } - - logBuffer := NewLogBuffer("test", time.Hour, flushFn, nil, nil) - defer logBuffer.ShutdownLogBuffer() - - // Simulate MQ broker behavior: add messages with explicit offsets - for i := int64(0); i < 30; i++ { - logEntry := &filer_pb.LogEntry{ - Key: []byte(fmt.Sprintf("key-%d", i)), - Data: []byte(fmt.Sprintf("message-%d", i)), - TsNs: time.Now().UnixNano(), - Offset: i, // EXPLICIT OFFSET - } - logBuffer.AddLogEntryToBuffer(logEntry) - } - - // Check state before flush - logBuffer.RLock() - beforeFlushOffset := logBuffer.offset - beforeFlushStart := logBuffer.bufferStartOffset - logBuffer.RUnlock() - - t.Logf("Before flush: offset=%d, bufferStartOffset=%d", beforeFlushOffset, beforeFlushStart) - - // Force flush - logBuffer.ForceFlush() - time.Sleep(100 * time.Millisecond) - - // Check state after flush - logBuffer.RLock() - afterFlushOffset := logBuffer.offset - afterFlushStart := logBuffer.bufferStartOffset - logBuffer.RUnlock() - - flushMu.Lock() - if len(flushedRanges) > 0 { - flushedMin := flushedRanges[0].min - flushedMax := flushedRanges[0].max - flushMu.Unlock() - - t.Logf("After flush: offset=%d, bufferStartOffset=%d", afterFlushOffset, afterFlushStart) - t.Logf("Flushed range: %d-%d", flushedMin, flushedMax) - - // CRITICAL BUG CHECK: bufferStartOffset should be flushedMax + 1 - expectedBufferStart := flushedMax + 1 - if afterFlushStart != expectedBufferStart { - t.Errorf("❌ CRITICAL BUG REPRODUCED!") - t.Errorf(" Flushed messages: offsets %d-%d", flushedMin, flushedMax) - t.Errorf(" Expected bufferStartOffset: %d (= maxOffset + 1)", expectedBufferStart) - t.Errorf(" Actual bufferStartOffset: %d", afterFlushStart) - t.Errorf(" This creates a GAP: offsets %d-%d will be LOST!", expectedBufferStart, afterFlushStart-1) - t.Errorf("") - t.Errorf("ROOT CAUSE: logBuffer.offset is not updated when AddLogEntryToBuffer is called") - t.Errorf(" AddLogEntryToBuffer only updates minOffset/maxOffset") - t.Errorf(" But copyToFlush sets bufferStartOffset = logBuffer.offset") - t.Errorf(" Since logBuffer.offset is stale, a gap is created!") - } else { - t.Logf("✅ bufferStartOffset correctly set to %d", afterFlushStart) - } - } else { - flushMu.Unlock() - t.Error("No flush occurred!") - } -} - // TestFlushOffsetGap_ForceFlushAdvancesBuffer tests if ForceFlush // properly advances bufferStartOffset after flushing. func TestFlushOffsetGap_ForceFlushAdvancesBuffer(t *testing.T) {