Browse Source
feat: add real v2bridge integration tests against file-backed BlockVol
feat: add real v2bridge integration tests against file-backed BlockVol
7 tests in weed/storage/blockvol/v2bridge/bridge_test.go: Reader (2 tests): - StatusSnapshot reads real nextLSN, WALCheckpointLSN, flusher state - HeadLSN advances with real writes Pinner (2 tests): - HoldWALRetention: hold tracked, MinWALRetentionFloor reports position, release clears hold - HoldRejectsRecycled: validates against real WAL tail Executor (2 tests): - StreamWALEntries: real ScanFrom reads WAL entries from disk - StreamPartialRange: partial range scan works Stubs (1 test): - TransferSnapshot/TransferFullBase/TruncateWAL return not-implemented All tests use createTestVol (1MB file-backed BlockVol with 256KB WAL). No mock/push adapters — direct real blockvol instances. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>feature/sw-block
1 changed files with 231 additions and 0 deletions
@ -0,0 +1,231 @@ |
|||
package v2bridge |
|||
|
|||
import ( |
|||
"path/filepath" |
|||
"testing" |
|||
|
|||
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol" |
|||
) |
|||
|
|||
// createTestVol creates a real file-backed BlockVol for integration tests.
|
|||
func createTestVol(t *testing.T) *blockvol.BlockVol { |
|||
t.Helper() |
|||
dir := t.TempDir() |
|||
path := filepath.Join(dir, "test.blockvol") |
|||
v, err := blockvol.CreateBlockVol(path, blockvol.CreateOptions{ |
|||
VolumeSize: 1 * 1024 * 1024, |
|||
BlockSize: 4096, |
|||
WALSize: 256 * 1024, |
|||
}) |
|||
if err != nil { |
|||
t.Fatalf("CreateBlockVol: %v", err) |
|||
} |
|||
return v |
|||
} |
|||
|
|||
func makeBlock(fill byte) []byte { |
|||
b := make([]byte, 4096) |
|||
for i := range b { |
|||
b[i] = fill |
|||
} |
|||
return b |
|||
} |
|||
|
|||
// --- Real Reader ---
|
|||
|
|||
func TestReader_RealBlockVol_StatusSnapshot(t *testing.T) { |
|||
vol := createTestVol(t) |
|||
defer vol.Close() |
|||
|
|||
reader := NewReader(vol) |
|||
|
|||
// Before any writes: head=0, tail=0, committed=0.
|
|||
state := reader.ReadState() |
|||
if state.WALHeadLSN != 0 { |
|||
t.Fatalf("initial HeadLSN=%d, want 0", state.WALHeadLSN) |
|||
} |
|||
|
|||
// Write some data.
|
|||
vol.WriteLBA(0, makeBlock('A')) |
|||
vol.WriteLBA(1, makeBlock('B')) |
|||
vol.SyncCache() |
|||
|
|||
state = reader.ReadState() |
|||
if state.WALHeadLSN < 2 { |
|||
t.Fatalf("after writes: HeadLSN=%d, want >= 2", state.WALHeadLSN) |
|||
} |
|||
// CommittedLSN should reflect flushed state.
|
|||
if state.CommittedLSN == 0 { |
|||
// After SyncCache, the flusher should have checkpointed.
|
|||
// This may or may not be > 0 depending on flusher timing.
|
|||
t.Log("CommittedLSN=0 after SyncCache (flusher may not have run yet)") |
|||
} |
|||
// WALTailLSN is an LSN boundary (from super.WALCheckpointLSN).
|
|||
// It should be 0 initially (nothing checkpointed yet).
|
|||
if state.WALTailLSN != 0 { |
|||
t.Logf("WALTailLSN=%d (checkpoint advanced)", state.WALTailLSN) |
|||
} |
|||
} |
|||
|
|||
func TestReader_RealBlockVol_HeadAdvancesWithWrites(t *testing.T) { |
|||
vol := createTestVol(t) |
|||
defer vol.Close() |
|||
|
|||
reader := NewReader(vol) |
|||
|
|||
state0 := reader.ReadState() |
|||
vol.WriteLBA(0, makeBlock('X')) |
|||
state1 := reader.ReadState() |
|||
|
|||
if state1.WALHeadLSN <= state0.WALHeadLSN { |
|||
t.Fatalf("HeadLSN should advance: before=%d after=%d", |
|||
state0.WALHeadLSN, state1.WALHeadLSN) |
|||
} |
|||
} |
|||
|
|||
// --- Real Pinner ---
|
|||
|
|||
func TestPinner_RealBlockVol_HoldWALRetention(t *testing.T) { |
|||
vol := createTestVol(t) |
|||
defer vol.Close() |
|||
|
|||
pinner := NewPinner(vol) |
|||
|
|||
// Write data so WAL has entries.
|
|||
vol.WriteLBA(0, makeBlock('A')) |
|||
vol.WriteLBA(1, makeBlock('B')) |
|||
|
|||
// Hold WAL from LSN 0 (should succeed — nothing recycled yet).
|
|||
release, err := pinner.HoldWALRetention(0) |
|||
if err != nil { |
|||
t.Fatalf("HoldWALRetention: %v", err) |
|||
} |
|||
|
|||
if pinner.ActiveHoldCount() != 1 { |
|||
t.Fatalf("holds=%d, want 1", pinner.ActiveHoldCount()) |
|||
} |
|||
|
|||
// MinWALRetentionFloor should report the held position.
|
|||
floor, hasFloor := pinner.MinWALRetentionFloor() |
|||
if !hasFloor || floor != 0 { |
|||
t.Fatalf("floor=%d hasFloor=%v, want 0/true", floor, hasFloor) |
|||
} |
|||
|
|||
// Release.
|
|||
release() |
|||
|
|||
if pinner.ActiveHoldCount() != 0 { |
|||
t.Fatal("hold should be released") |
|||
} |
|||
|
|||
_, hasFloor = pinner.MinWALRetentionFloor() |
|||
if hasFloor { |
|||
t.Fatal("no floor after release") |
|||
} |
|||
} |
|||
|
|||
func TestPinner_RealBlockVol_HoldRejectsRecycled(t *testing.T) { |
|||
vol := createTestVol(t) |
|||
defer vol.Close() |
|||
|
|||
// Write + flush to advance the checkpoint (WAL tail).
|
|||
for i := 0; i < 10; i++ { |
|||
vol.WriteLBA(uint64(i), makeBlock(byte('A'+i))) |
|||
} |
|||
vol.SyncCache() |
|||
|
|||
pinner := NewPinner(vol) |
|||
|
|||
// Check current tail.
|
|||
state := NewReader(vol).ReadState() |
|||
if state.WALTailLSN > 0 { |
|||
// Tail advanced — try to hold below it.
|
|||
_, err := pinner.HoldWALRetention(0) |
|||
if err == nil { |
|||
t.Fatal("should reject hold below recycled tail") |
|||
} |
|||
} else { |
|||
t.Log("WALTailLSN=0, checkpoint not advanced (hold from 0 is valid)") |
|||
} |
|||
} |
|||
|
|||
// --- Real Executor ---
|
|||
|
|||
func TestExecutor_RealBlockVol_StreamWALEntries(t *testing.T) { |
|||
vol := createTestVol(t) |
|||
defer vol.Close() |
|||
|
|||
// Write entries.
|
|||
vol.WriteLBA(0, makeBlock('A')) |
|||
vol.WriteLBA(1, makeBlock('B')) |
|||
vol.WriteLBA(2, makeBlock('C')) |
|||
|
|||
reader := NewReader(vol) |
|||
state := reader.ReadState() |
|||
headLSN := state.WALHeadLSN |
|||
if headLSN < 3 { |
|||
t.Fatalf("HeadLSN=%d, want >= 3", headLSN) |
|||
} |
|||
|
|||
executor := NewExecutor(vol) |
|||
|
|||
// Stream from start to head.
|
|||
transferred, err := executor.StreamWALEntries(0, headLSN) |
|||
if err != nil { |
|||
t.Fatalf("StreamWALEntries: %v", err) |
|||
} |
|||
if transferred == 0 { |
|||
t.Fatal("should have transferred entries") |
|||
} |
|||
t.Logf("streamed: transferred to LSN %d (head=%d)", transferred, headLSN) |
|||
} |
|||
|
|||
func TestExecutor_RealBlockVol_StreamPartialRange(t *testing.T) { |
|||
vol := createTestVol(t) |
|||
defer vol.Close() |
|||
|
|||
// Write 5 entries.
|
|||
for i := 0; i < 5; i++ { |
|||
vol.WriteLBA(uint64(i), makeBlock(byte('A'+i))) |
|||
} |
|||
|
|||
reader := NewReader(vol) |
|||
state := reader.ReadState() |
|||
|
|||
executor := NewExecutor(vol) |
|||
|
|||
// Stream only entries 2-4 (partial range).
|
|||
startLSN := uint64(1) // exclusive: start after LSN 1
|
|||
endLSN := uint64(3) // inclusive
|
|||
if endLSN > state.WALHeadLSN { |
|||
endLSN = state.WALHeadLSN |
|||
} |
|||
|
|||
transferred, err := executor.StreamWALEntries(startLSN, endLSN) |
|||
if err != nil { |
|||
t.Fatalf("StreamWALEntries partial: %v", err) |
|||
} |
|||
if transferred == 0 { |
|||
t.Fatal("should have transferred partial entries") |
|||
} |
|||
t.Logf("partial stream: %d→%d, transferred to %d", startLSN, endLSN, transferred) |
|||
} |
|||
|
|||
// --- Stubs remain stubs ---
|
|||
|
|||
func TestExecutor_Stubs_ReturnError(t *testing.T) { |
|||
vol := createTestVol(t) |
|||
defer vol.Close() |
|||
|
|||
executor := NewExecutor(vol) |
|||
|
|||
if err := executor.TransferSnapshot(50); err == nil { |
|||
t.Fatal("TransferSnapshot should be stub") |
|||
} |
|||
if err := executor.TransferFullBase(100); err == nil { |
|||
t.Fatal("TransferFullBase should be stub") |
|||
} |
|||
if err := executor.TruncateWAL(50); err == nil { |
|||
t.Fatal("TruncateWAL should be stub") |
|||
} |
|||
} |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue