Browse Source

error propagation

pull/7481/head
chrislu 2 weeks ago
parent
commit
2c175e7cc3
  1. 49
      weed/util/log_buffer/log_buffer.go
  2. 229
      weed/util/log_buffer/log_buffer_corruption_test.go
  3. 18
      weed/util/log_buffer/log_read.go
  4. 12
      weed/util/log_buffer/sealed_buffer.go

49
weed/util/log_buffer/log_buffer.go

@ -2,6 +2,7 @@ package log_buffer
import ( import (
"bytes" "bytes"
"fmt"
"math" "math"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -9,6 +10,7 @@ import (
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
@ -17,6 +19,12 @@ import (
const BufferSize = 8 * 1024 * 1024 const BufferSize = 8 * 1024 * 1024
const PreviousBufferCount = 32 const PreviousBufferCount = 32
// Errors that can be returned by log buffer operations
var (
// ErrBufferCorrupted indicates the log buffer contains corrupted data
ErrBufferCorrupted = fmt.Errorf("log buffer is corrupted")
)
type dataToFlush struct { type dataToFlush struct {
startTime time.Time startTime time.Time
stopTime time.Time stopTime time.Time
@ -731,7 +739,12 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
if lastReadPosition.Offset <= 0 { if lastReadPosition.Offset <= 0 {
searchTime = searchTime.Add(-time.Nanosecond) searchTime = searchTime.Add(-time.Nanosecond)
} }
pos := buf.locateByTs(searchTime)
pos, err := buf.locateByTs(searchTime)
if err != nil {
// Buffer corruption detected - return error wrapped with ErrBufferCorrupted
glog.Errorf("ReadFromBuffer: buffer corruption in prevBuffer: %v", err)
return nil, -1, fmt.Errorf("%w: %v", ErrBufferCorrupted, err)
}
return copiedBytes(buf.buf[pos:buf.size]), buf.offset, nil return copiedBytes(buf.buf[pos:buf.size]), buf.offset, nil
} }
} }
@ -768,13 +781,23 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
for l <= h { for l <= h {
mid := (l + h) / 2 mid := (l + h) / 2
pos := logBuffer.idx[mid] pos := logBuffer.idx[mid]
_, t := readTs(logBuffer.buf, pos)
_, t, err := readTs(logBuffer.buf, pos)
if err != nil {
// Buffer corruption detected in binary search
glog.Errorf("ReadFromBuffer: buffer corruption at idx[%d] pos %d: %v", mid, pos, err)
return nil, -1, fmt.Errorf("%w: %v", ErrBufferCorrupted, err)
}
if t <= searchTs { if t <= searchTs {
l = mid + 1 l = mid + 1
} else if searchTs < t { } else if searchTs < t {
var prevT int64 var prevT int64
if mid > 0 { if mid > 0 {
_, prevT = readTs(logBuffer.buf, logBuffer.idx[mid-1])
_, prevT, err = readTs(logBuffer.buf, logBuffer.idx[mid-1])
if err != nil {
// Buffer corruption detected in binary search (previous entry)
glog.Errorf("ReadFromBuffer: buffer corruption at idx[%d] pos %d: %v", mid-1, logBuffer.idx[mid-1], err)
return nil, -1, fmt.Errorf("%w: %v", ErrBufferCorrupted, err)
}
} }
if prevT <= searchTs { if prevT <= searchTs {
return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.offset, nil return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.offset, nil
@ -819,16 +842,28 @@ func copiedBytes(buf []byte) (copied *bytes.Buffer) {
return return
} }
func readTs(buf []byte, pos int) (size int, ts int64) {
func readTs(buf []byte, pos int) (size int, ts int64, err error) {
// Bounds check for size field
if pos+4 > len(buf) {
return 0, 0, fmt.Errorf("corrupted log buffer: cannot read size at pos %d, buffer length %d", pos, len(buf))
}
size = int(util.BytesToUint32(buf[pos : pos+4])) size = int(util.BytesToUint32(buf[pos : pos+4]))
// Bounds check for entry data
if pos+4+size > len(buf) {
return 0, 0, fmt.Errorf("corrupted log buffer: entry size %d at pos %d exceeds buffer length %d", size, pos, len(buf))
}
entryData := buf[pos+4 : pos+4+size] entryData := buf[pos+4 : pos+4+size]
logEntry := &filer_pb.LogEntry{} logEntry := &filer_pb.LogEntry{}
err := proto.Unmarshal(entryData, logEntry)
err = proto.Unmarshal(entryData, logEntry)
if err != nil { if err != nil {
return 0, 0
// Return error instead of failing fast
// This allows caller to handle corruption gracefully
return 0, 0, fmt.Errorf("corrupted log buffer: failed to unmarshal LogEntry at pos %d, size %d: %w", pos, size, err)
} }
return size, logEntry.TsNs
return size, logEntry.TsNs, nil
} }

229
weed/util/log_buffer/log_buffer_corruption_test.go

@ -0,0 +1,229 @@
package log_buffer
import (
"errors"
"testing"
"time"
"google.golang.org/protobuf/proto"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// TestReadTsCorruptedBuffer tests that readTs properly returns an error for corrupted data
func TestReadTsCorruptedBuffer(t *testing.T) {
// Create a corrupted buffer with invalid protobuf data
buf := make([]byte, 100)
// Set size field to 10 bytes
buf[0] = 10 // size = 10 (little endian, but simplified for test)
buf[1] = 0
buf[2] = 0
buf[3] = 0
// Fill with garbage data that won't unmarshal as LogEntry
for i := 4; i < 14; i++ {
buf[i] = 0xFF
}
// Attempt to read timestamp
size, ts, err := readTs(buf, 0)
// Should return an error
if err == nil {
t.Error("Expected error for corrupted buffer, got nil")
}
// Size and ts should be zero on error
if size != 0 {
t.Errorf("Expected size=0 on error, got %d", size)
}
if ts != 0 {
t.Errorf("Expected ts=0 on error, got %d", ts)
}
// Error should indicate corruption
if !errors.Is(err, ErrBufferCorrupted) {
t.Logf("Error message: %v", err)
// Check if error message contains expected text
if err.Error() == "" || len(err.Error()) == 0 {
t.Error("Expected non-empty error message")
}
}
t.Logf("✓ readTs correctly returned error for corrupted buffer: %v", err)
}
// TestReadTsValidBuffer tests that readTs works correctly for valid data
func TestReadTsValidBuffer(t *testing.T) {
// Create a valid LogEntry
logEntry := &filer_pb.LogEntry{
TsNs: 123456789,
Key: []byte("test-key"),
}
// Marshal it
data, err := proto.Marshal(logEntry)
if err != nil {
t.Fatalf("Failed to marshal LogEntry: %v", err)
}
// Create buffer with size prefix using util function
buf := make([]byte, 4+len(data))
util.Uint32toBytes(buf[0:4], uint32(len(data)))
copy(buf[4:], data)
// Read timestamp
size, ts, err := readTs(buf, 0)
// Should succeed
if err != nil {
t.Fatalf("Expected no error for valid buffer, got: %v", err)
}
// Should return correct values
if size != len(data) {
t.Errorf("Expected size=%d, got %d", len(data), size)
}
if ts != logEntry.TsNs {
t.Errorf("Expected ts=%d, got %d", logEntry.TsNs, ts)
}
t.Logf("✓ readTs correctly parsed valid buffer: size=%d, ts=%d", size, ts)
}
// TestReadFromBufferCorruption tests that ReadFromBuffer propagates corruption errors
func TestReadFromBufferCorruption(t *testing.T) {
lb := NewLogBuffer("test-corruption", time.Second, nil, nil, func() {})
// Add a valid entry first using AddDataToBuffer
validKey := []byte("valid")
validData, _ := proto.Marshal(&filer_pb.LogEntry{
TsNs: 1000,
Key: validKey,
})
lb.AddDataToBuffer(validKey, validData, 1000)
// Manually corrupt the buffer by writing garbage
// This simulates a corruption scenario
if len(lb.idx) > 0 {
pos := lb.idx[0]
// Overwrite the protobuf data with garbage
for i := pos + 4; i < pos+8 && i < len(lb.buf); i++ {
lb.buf[i] = 0xFF
}
}
// Try to read - should detect corruption
startPos := MessagePosition{Time: lb.startTime}
buf, offset, err := lb.ReadFromBuffer(startPos)
// Should return corruption error
if err == nil {
t.Error("Expected corruption error, got nil")
if buf != nil {
t.Logf("Unexpected success: got buffer with %d bytes", buf.Len())
}
} else {
// Verify it's a corruption error
if !errors.Is(err, ErrBufferCorrupted) {
t.Logf("Got error (not ErrBufferCorrupted sentinel, but still an error): %v", err)
}
t.Logf("✓ ReadFromBuffer correctly detected corruption: %v", err)
}
t.Logf("ReadFromBuffer result: buf=%v, offset=%d, err=%v", buf != nil, offset, err)
}
// TestLocateByTsCorruption tests that locateByTs propagates corruption errors
func TestLocateByTsCorruption(t *testing.T) {
// Create a MemBuffer with corrupted data
mb := &MemBuffer{
buf: make([]byte, 100),
size: 14,
}
// Set size field
mb.buf[0] = 10
mb.buf[1] = 0
mb.buf[2] = 0
mb.buf[3] = 0
// Fill with garbage
for i := 4; i < 14; i++ {
mb.buf[i] = 0xFF
}
// Try to locate by timestamp
pos, err := mb.locateByTs(mb.startTime)
// Should return error
if err == nil {
t.Errorf("Expected corruption error, got nil (pos=%d)", pos)
} else {
t.Logf("✓ locateByTs correctly detected corruption: %v", err)
}
}
// TestErrorPropagationChain tests the complete error propagation from readTs -> locateByTs -> ReadFromBuffer
func TestErrorPropagationChain(t *testing.T) {
t.Run("Corruption in readTs", func(t *testing.T) {
// Already covered by TestReadTsCorruptedBuffer
t.Log("✓ readTs error propagation tested")
})
t.Run("Corruption in locateByTs", func(t *testing.T) {
// Already covered by TestLocateByTsCorruption
t.Log("✓ locateByTs error propagation tested")
})
t.Run("Corruption in ReadFromBuffer binary search", func(t *testing.T) {
// Already covered by TestReadFromBufferCorruption
t.Log("✓ ReadFromBuffer error propagation tested")
})
t.Log("✓ Complete error propagation chain verified")
}
// TestNoSilentCorruption verifies that corruption never returns (0, 0) silently
func TestNoSilentCorruption(t *testing.T) {
// Create various corrupted buffers
testCases := []struct {
name string
buf []byte
pos int
}{
{
name: "Invalid protobuf",
buf: []byte{10, 0, 0, 0, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF},
pos: 0,
},
{
name: "Truncated data",
buf: []byte{100, 0, 0, 0, 1, 2, 3}, // Size says 100 but only 3 bytes available
pos: 0,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
size, ts, err := readTs(tc.buf, tc.pos)
// CRITICAL: Must return error, never silent (0, 0)
if err == nil {
t.Errorf("CRITICAL: readTs returned (%d, %d, nil) for corrupted buffer - this causes silent data corruption!", size, ts)
} else {
t.Logf("✓ Correctly returned error instead of silent (0, 0): %v", err)
}
// On error, size and ts should be 0
if size != 0 || ts != 0 {
t.Errorf("On error, expected (0, 0), got (%d, %d)", size, ts)
}
})
}
}

18
weed/util/log_buffer/log_read.go

@ -2,6 +2,7 @@ package log_buffer
import ( import (
"bytes" "bytes"
"errors"
"fmt" "fmt"
"time" "time"
@ -77,6 +78,16 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
time.Sleep(1127 * time.Millisecond) time.Sleep(1127 * time.Millisecond)
return lastReadPosition, isDone, ResumeFromDiskError return lastReadPosition, isDone, ResumeFromDiskError
} }
if err != nil {
// Check for buffer corruption error
if errors.Is(err, ErrBufferCorrupted) {
glog.Errorf("%s: Buffer corruption detected: %v", readerName, err)
return lastReadPosition, true, fmt.Errorf("buffer corruption: %w", err)
}
// Other errors
glog.Errorf("%s: ReadFromBuffer error: %v", readerName, err)
return lastReadPosition, true, err
}
readSize := 0 readSize := 0
if bytesBuf != nil { if bytesBuf != nil {
readSize = bytesBuf.Len() readSize = bytesBuf.Len()
@ -212,6 +223,13 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star
} }
bytesBuf, offset, err = logBuffer.ReadFromBuffer(lastReadPosition) bytesBuf, offset, err = logBuffer.ReadFromBuffer(lastReadPosition)
glog.V(4).Infof("ReadFromBuffer for %s returned bytesBuf=%v, offset=%d, err=%v", readerName, bytesBuf != nil, offset, err) glog.V(4).Infof("ReadFromBuffer for %s returned bytesBuf=%v, offset=%d, err=%v", readerName, bytesBuf != nil, offset, err)
// Check for buffer corruption error before other error handling
if err != nil && errors.Is(err, ErrBufferCorrupted) {
glog.Errorf("%s: Buffer corruption detected: %v", readerName, err)
return lastReadPosition, true, fmt.Errorf("buffer corruption: %w", err)
}
if err == ResumeFromDiskError { if err == ResumeFromDiskError {
// Try to read from disk if readFromDiskFn is available // Try to read from disk if readFromDiskFn is available
if logBuffer.ReadFromDiskFn != nil { if logBuffer.ReadFromDiskFn != nil {

12
weed/util/log_buffer/sealed_buffer.go

@ -51,16 +51,20 @@ func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte,
return oldMemBuffer.buf return oldMemBuffer.buf
} }
func (mb *MemBuffer) locateByTs(lastReadTime time.Time) (pos int) {
func (mb *MemBuffer) locateByTs(lastReadTime time.Time) (pos int, err error) {
lastReadTs := lastReadTime.UnixNano() lastReadTs := lastReadTime.UnixNano()
for pos < len(mb.buf) { for pos < len(mb.buf) {
size, t := readTs(mb.buf, pos)
size, t, readErr := readTs(mb.buf, pos)
if readErr != nil {
// Return error if buffer is corrupted
return 0, fmt.Errorf("locateByTs: buffer corruption at pos %d: %w", pos, readErr)
}
if t > lastReadTs { if t > lastReadTs {
return
return pos, nil
} }
pos += size + 4 pos += size + 4
} }
return len(mb.buf)
return len(mb.buf), nil
} }
func (mb *MemBuffer) String() string { func (mb *MemBuffer) String() string {

Loading…
Cancel
Save