Browse Source

feat: Phase 4A CP4a -- simulated master, assignment sequence tests, BlockVolumeStatus

Add SimulatedMaster test helper + 20 assignment sequence tests (8 sequence,
5 failover, 5 adversarial, 2 status). Add BlockVolumeStatus struct and
Status() method. Includes QA test files for CP1-CP4a. 940 total unit tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feature/sw-block
Ping Qiu 1 week ago
parent
commit
09c7e40d29
  1. 40
      weed/storage/blockvol/blockvol.go
  2. 136
      weed/storage/blockvol/blockvol_qa_test.go
  3. 862
      weed/storage/blockvol/blockvol_test.go
  4. 2
      weed/storage/blockvol/dist_group_commit.go
  5. 6
      weed/storage/blockvol/flusher.go
  6. 2
      weed/storage/blockvol/flusher_test.go
  7. 2
      weed/storage/blockvol/group_commit.go
  8. 4
      weed/storage/blockvol/iscsi/bug_dataout_timeout_test.go
  9. 6
      weed/storage/blockvol/iscsi/cmd/iscsi-target/smoke-test.sh
  10. 8
      weed/storage/blockvol/iscsi/dataio.go
  11. 2
      weed/storage/blockvol/iscsi/discovery.go
  12. 2
      weed/storage/blockvol/iscsi/discovery_test.go
  13. 6
      weed/storage/blockvol/iscsi/integration_test.go
  14. 2
      weed/storage/blockvol/iscsi/login.go
  15. 2
      weed/storage/blockvol/iscsi/pdu.go
  16. 4
      weed/storage/blockvol/iscsi/pdu_test.go
  17. 4
      weed/storage/blockvol/iscsi/qa_test.go
  18. 18
      weed/storage/blockvol/iscsi/scsi.go
  19. 4
      weed/storage/blockvol/iscsi/scsi_test.go
  20. 2
      weed/storage/blockvol/iscsi/session.go
  21. 26
      weed/storage/blockvol/iscsi/session_test.go
  22. 6
      weed/storage/blockvol/iscsi/target_test.go
  23. 11
      weed/storage/blockvol/promotion.go
  24. 56
      weed/storage/blockvol/qa_phase4a_cp1_test.go
  25. 84
      weed/storage/blockvol/qa_phase4a_cp2_test.go
  26. 1088
      weed/storage/blockvol/qa_phase4a_cp3_test.go
  27. 840
      weed/storage/blockvol/qa_phase4a_cp4a_test.go
  28. 2
      weed/storage/blockvol/rebuild.go
  29. 2
      weed/storage/blockvol/recovery.go
  30. 10
      weed/storage/blockvol/repl_proto.go
  31. 2
      weed/storage/blockvol/replica_apply.go
  32. 2
      weed/storage/blockvol/replica_barrier.go
  33. 2
      weed/storage/blockvol/role.go
  34. 2
      weed/storage/blockvol/scripts/sw-block-attach.sh
  35. 4
      weed/storage/blockvol/wal_writer.go

40
weed/storage/blockvol/blockvol.go

@ -378,13 +378,13 @@ func (v *BlockVol) readBlockFromWAL(walOff uint64, lba uint64, expectedLSN uint6
// WAL reuse guard: validate LSN before trusting the entry.
// If the flusher reclaimed this slot and a new write reused it,
// the LSN will differ fall back to extent read.
// the LSN will differ -- fall back to extent read.
entryLSN := binary.LittleEndian.Uint64(headerBuf[0:8])
if entryLSN != expectedLSN {
return nil, true, nil // stale WAL slot reused
return nil, true, nil // stale -- WAL slot reused
}
// Check entry type first TRIM has no data payload, so Length is
// Check entry type first -- TRIM has no data payload, so Length is
// metadata (trim extent), not a data size to allocate.
entryType := headerBuf[16] // Type is at offset LSN(8) + Epoch(8) = 16
if entryType == EntryTypeTrim {
@ -392,14 +392,14 @@ func (v *BlockVol) readBlockFromWAL(walOff uint64, lba uint64, expectedLSN uint6
return make([]byte, v.super.BlockSize), false, nil
}
if entryType != EntryTypeWrite {
// Unexpected type at a supposedly valid offset treat as stale.
// Unexpected type at a supposedly valid offset -- treat as stale.
return nil, true, nil
}
// Parse and validate the data Length field before allocating (WRITE only).
dataLen := v.parseDataLength(headerBuf)
if uint64(dataLen) > v.super.WALSize || uint64(dataLen) > maxWALEntryDataLen {
// LSN matched but length is corrupt real data integrity error.
// LSN matched but length is corrupt -- real data integrity error.
return nil, false, fmt.Errorf("readBlockFromWAL: corrupt entry length %d exceeds WAL size %d", dataLen, v.super.WALSize)
}
@ -411,17 +411,17 @@ func (v *BlockVol) readBlockFromWAL(walOff uint64, lba uint64, expectedLSN uint6
entry, err := DecodeWALEntry(fullBuf)
if err != nil {
// LSN matched but CRC failed real corruption.
// LSN matched but CRC failed -- real corruption.
return nil, false, fmt.Errorf("readBlockFromWAL: decode: %w", err)
}
// Final guard: verify the entry actually covers this LBA.
if lba < entry.LBA {
return nil, true, nil // stale different entry at same offset
return nil, true, nil // stale -- different entry at same offset
}
blockOffset := (lba - entry.LBA) * uint64(v.super.BlockSize)
if blockOffset+uint64(v.super.BlockSize) > uint64(len(entry.Data)) {
return nil, true, nil // stale LBA out of range
return nil, true, nil // stale -- LBA out of range
}
block := make([]byte, v.super.BlockSize)
@ -584,6 +584,30 @@ func (v *BlockVol) degradeReplica(err error) {
log.Printf("blockvol: replica degraded: %v", err)
}
// BlockVolumeStatus contains block volume state for heartbeat reporting.
// It is a point-in-time snapshot assembled by (*BlockVol).Status; the
// individual fields are read independently and are not mutually atomic.
type BlockVolumeStatus struct {
Epoch uint64 // current volume epoch (from v.epoch)
WALHeadLSN uint64 // LSN of the most recently assigned WAL entry (nextLSN - 1)
Role Role // current role of this volume, as reported by v.Role()
CheckpointLSN uint64 // flusher checkpoint LSN; zero when no flusher is attached
HasLease bool // whether the volume currently holds a valid lease
}
// Status returns the current block volume status for heartbeat reporting.
func (v *BlockVol) Status() BlockVolumeStatus {
st := BlockVolumeStatus{
Epoch: v.epoch.Load(),
WALHeadLSN: v.nextLSN.Load() - 1,
Role: v.Role(),
HasLease: v.lease.IsValid(),
}
// CheckpointLSN stays zero when no flusher is attached.
if f := v.flusher; f != nil {
st.CheckpointLSN = f.CheckpointLSN()
}
return st
}
// Close shuts down the block volume and closes the file.
// Shutdown order: shipper -> replica receiver -> rebuild server -> drain ops -> group committer -> flusher -> final flush -> close fd.
func (v *BlockVol) Close() error {

136
weed/storage/blockvol/blockvol_qa_test.go

@ -1,6 +1,6 @@
package blockvol
// QA adversarial tests written by QA Manager (separate from dev team's unit tests).
// QA adversarial tests -- written by QA Manager (separate from dev team's unit tests).
// Attack vectors: boundary conditions, multi-block I/O, trim semantics,
// concurrency, oracle pattern, corruption injection, lifecycle edge cases.
@ -69,7 +69,7 @@ func testQAMultiBlockWriteReadMiddle(t *testing.T) {
t.Fatalf("WriteLBA: %v", err)
}
// Read only block at LBA 6 (the middle one should be 'B')
// Read only block at LBA 6 (the middle one -- should be 'B')
got, err := v.ReadLBA(6, 4096)
if err != nil {
t.Fatalf("ReadLBA(6): %v", err)
@ -78,7 +78,7 @@ func testQAMultiBlockWriteReadMiddle(t *testing.T) {
t.Errorf("middle block: got %q..., want all 'B'", got[:8])
}
// Read only block at LBA 7 (last should be 'C')
// Read only block at LBA 7 (last -- should be 'C')
got, err = v.ReadLBA(7, 4096)
if err != nil {
t.Fatalf("ReadLBA(7): %v", err)
@ -99,7 +99,7 @@ func testQAMultiBlockWriteReadMiddle(t *testing.T) {
// --- Trim semantics ---
// testQATrimThenReadZeros: Write a block, trim it, read back must get zeros.
// testQATrimThenReadZeros: Write a block, trim it, read back -- must get zeros.
func testQATrimThenReadZeros(t *testing.T) {
v := createTestVol(t)
defer v.Close()
@ -123,7 +123,7 @@ func testQATrimThenReadZeros(t *testing.T) {
t.Fatalf("Trim: %v", err)
}
// Read after trim must be zeros (from extent, which was never written).
// Read after trim -- must be zeros (from extent, which was never written).
got, err = v.ReadLBA(0, 4096)
if err != nil {
t.Fatalf("ReadLBA after trim: %v", err)
@ -223,7 +223,7 @@ func testQAOverwriteWider(t *testing.T) {
t.Fatalf("WriteLBA 2-block: %v", err)
}
// Read LBA 0 should be 'X' (not 'A').
// Read LBA 0 -- should be 'X' (not 'A').
got, err := v.ReadLBA(0, 4096)
if err != nil {
t.Fatalf("ReadLBA(0): %v", err)
@ -232,7 +232,7 @@ func testQAOverwriteWider(t *testing.T) {
t.Error("LBA 0 should be 'X' after wider overwrite")
}
// Read LBA 1 should be 'Y'.
// Read LBA 1 -- should be 'Y'.
got, err = v.ReadLBA(1, 4096)
if err != nil {
t.Fatalf("ReadLBA(1): %v", err)
@ -278,7 +278,7 @@ func testQAOverwriteNarrower(t *testing.T) {
t.Fatalf("ReadLBA(1): %v", err)
}
if !bytes.Equal(got, makeBlock('B')) {
t.Error("LBA 1 should still be 'B' narrower overwrite shouldn't touch it")
t.Error("LBA 1 should still be 'B' -- narrower overwrite shouldn't touch it")
}
}
@ -467,7 +467,7 @@ func testQAWALFillAdvanceRefill(t *testing.T) {
halfEntries := uint64(lastOK/2+1) * entrySize
v.wal.AdvanceTail(halfEntries)
// Write more should succeed now.
// Write more -- should succeed now.
for i := 0; i < 5; i++ {
if err := v.WriteLBA(uint64(i), makeBlock(byte('a'+i))); err != nil {
t.Fatalf("write after tail advance %d: %v", i, err)
@ -554,7 +554,7 @@ func testQAValidateWriteZeroLength(t *testing.T) {
// This should arguably be rejected, but current code may allow it.
// If it's allowed, at least it shouldn't crash.
if err != nil {
// Good zero-length writes rejected.
// Good -- zero-length writes rejected.
return
}
// If allowed, WriteLBA with empty data should be caught by WAL entry validation.
@ -888,7 +888,7 @@ func TestQAWALWriterEdgeCases(t *testing.T) {
walSize := entrySize + uint64(walEntryHeaderSize) - 5
// gap = walSize - entrySize = walEntryHeaderSize - 5 = 33 bytes
// free after wrap = walSize - gap = entrySize = 102 bytes
// entrySize = 102. free == entrySize fits with logical tracking (uses >, not <)
// entrySize = 102. free == entrySize -> fits with logical tracking (uses >, not <)
fd, cleanup := createTestWAL(t, walOffset, walSize)
defer cleanup()
@ -932,7 +932,7 @@ func TestQAWALWriterEdgeCases(t *testing.T) {
t.Fatalf("first append: %v", err)
}
// Write second entry pushes head to 2*entrySize = 204.
// Write second entry -- pushes head to 2*entrySize = 204.
entry2 := &WALEntry{LSN: 2, Type: EntryTypeWrite, LBA: 1, Length: 64, Data: make([]byte, 64)}
if _, err := w.Append(entry2); err != nil {
t.Fatalf("second append: %v", err)
@ -942,8 +942,8 @@ func TestQAWALWriterEdgeCases(t *testing.T) {
w.AdvanceTail(entrySize * 2)
// remaining = walSize - 204 = 30 bytes (< walEntryHeaderSize=38)
// padding uses zero-fill path, head wraps to 0
// → free = tail - head = 204 - 0 = 204 > 102 → fits!
// -> padding uses zero-fill path, head wraps to 0
// -> free = tail - head = 204 - 0 = 204 > 102 -> fits!
entry3 := &WALEntry{LSN: 3, Type: EntryTypeWrite, LBA: 2, Length: 64, Data: make([]byte, 64)}
off, err := w.Append(entry3)
if err != nil {
@ -994,11 +994,11 @@ func TestQAWALEntryEdgeCases(t *testing.T) {
})
t.Run("unknown_entry_type_encode", func(t *testing.T) {
// Type 0x99 is not recognized Encode should still work
// Type 0x99 is not recognized -- Encode should still work
// (only WRITE/TRIM/BARRIER have special validation).
entry := &WALEntry{LSN: 1, Type: 0x99, LBA: 0}
_, err := entry.Encode()
// Unknown type with no data may or may not error.
// Unknown type with no data -- may or may not error.
// Just verify no panic.
_ = err
})
@ -1012,7 +1012,7 @@ func min(a, b int) int {
}
// ============================================================================
// QA Adversarial Tests Tasks 1.7 (GroupCommitter), 1.8 (Flusher), 1.9 (Recovery)
// QA Adversarial Tests -- Tasks 1.7 (GroupCommitter), 1.8 (Flusher), 1.9 (Recovery)
// ============================================================================
// --- Task 1.7: GroupCommitter adversarial tests ---
@ -1046,7 +1046,7 @@ func testQAGCDoubleStop(t *testing.T) {
gc.Stop()
// Second stop must not panic or deadlock.
// Second stop -- must not panic or deadlock.
done := make(chan struct{})
go func() {
gc.Stop()
@ -1062,7 +1062,7 @@ func testQAGCDoubleStop(t *testing.T) {
}
// testQAGCSubmitStormDuringStop: Many goroutines Submit() while Stop() is called.
// All must either succeed or get ErrGroupCommitShutdown no panics, no deadlocks.
// All must either succeed or get ErrGroupCommitShutdown -- no panics, no deadlocks.
//
// BUG QA-002: There is a race between drainPending() and close(gc.done) in Run().
// Goroutines that pass the gc.done double-check AFTER drainPending() releases gc.mu
@ -1070,9 +1070,9 @@ func testQAGCDoubleStop(t *testing.T) {
// causing a permanent hang on <-ch in Submit().
//
// Race sequence:
// 1. Run(): drainPending() → gc.mu.Lock → take pending → gc.mu.Unlock → send errors
// 1. Run(): drainPending() -> gc.mu.Lock -> take pending -> gc.mu.Unlock -> send errors
// 2. Submit(): passes first select<-gc.done (not closed yet)
// 3. Submit(): gc.mu.Lock → passes second select<-gc.done → append ch → gc.mu.Unlock
// 3. Submit(): gc.mu.Lock -> passes second select<-gc.done -> append ch -> gc.mu.Unlock
// 4. Run(): close(gc.done) ← too late, ch already enqueued with no consumer
// 5. Submit(): <-ch blocks forever
func testQAGCSubmitStormDuringStop(t *testing.T) {
@ -1120,7 +1120,7 @@ func testQAGCSubmitStormDuringStop(t *testing.T) {
// All goroutines exited cleanly.
case <-time.After(5 * time.Second):
// QA-002: Submit() goroutines stuck waiting for response after Stop().
t.Fatal("BUG QA-002: Submit() goroutines deadlocked during Stop() " +
t.Fatal("BUG QA-002: Submit() goroutines deadlocked during Stop() -- " +
"drainPending/close(done) race allows enqueue after drain")
}
@ -1208,7 +1208,7 @@ func testQAGCMaxBatchExact(t *testing.T) {
const maxBatch = 8
gc := NewGroupCommitter(GroupCommitterConfig{
SyncFunc: func() error { return nil },
MaxDelay: 10 * time.Second, // very long should NOT wait
MaxDelay: 10 * time.Second, // very long -- should NOT wait
MaxBatch: maxBatch,
})
go gc.Run()
@ -1231,7 +1231,7 @@ func testQAGCMaxBatchExact(t *testing.T) {
select {
case <-done:
// Good completed quickly.
// Good -- completed quickly.
case <-time.After(3 * time.Second):
t.Fatal("maxBatch exact count did not trigger immediate flush")
}
@ -1314,7 +1314,7 @@ func testQAFlushEmptyDirtyMap(t *testing.T) {
v, f := createTestVolWithFlusher(t)
defer v.Close()
// No writes flush should not error or change anything.
// No writes -- flush should not error or change anything.
if err := f.FlushOnce(); err != nil {
t.Fatalf("FlushOnce on empty: %v", err)
}
@ -1334,7 +1334,7 @@ func testQAFlushOverwriteDuringFlush(t *testing.T) {
t.Fatalf("WriteLBA(A): %v", err)
}
// Flush moves 'A' to extent.
// Flush -- moves 'A' to extent.
if err := f.FlushOnce(); err != nil {
t.Fatalf("FlushOnce 1: %v", err)
}
@ -1353,7 +1353,7 @@ func testQAFlushOverwriteDuringFlush(t *testing.T) {
t.Error("before second flush: should read 'B' from WAL")
}
// Flush again moves 'B' to extent.
// Flush again -- moves 'B' to extent.
if err := f.FlushOnce(); err != nil {
t.Fatalf("FlushOnce 2: %v", err)
}
@ -1379,7 +1379,7 @@ func testQAFlushTrimZerosExtent(t *testing.T) {
t.Fatalf("WriteLBA: %v", err)
}
// Flush 'X' goes to extent.
// Flush -- 'X' goes to extent.
if err := f.FlushOnce(); err != nil {
t.Fatalf("FlushOnce 1: %v", err)
}
@ -1398,7 +1398,7 @@ func testQAFlushTrimZerosExtent(t *testing.T) {
t.Fatalf("Trim: %v", err)
}
// Read from dirty map (TRIM entry) should return zeros.
// Read from dirty map (TRIM entry) -- should return zeros.
got, err = v.ReadLBA(0, 4096)
if err != nil {
t.Fatalf("ReadLBA after trim: %v", err)
@ -1407,7 +1407,7 @@ func testQAFlushTrimZerosExtent(t *testing.T) {
t.Error("after trim: dirty map read should return zeros")
}
// Flush again flusher zeros the extent.
// Flush again -- flusher zeros the extent.
if err := f.FlushOnce(); err != nil {
t.Fatalf("FlushOnce 2: %v", err)
}
@ -1417,7 +1417,7 @@ func testQAFlushTrimZerosExtent(t *testing.T) {
t.Errorf("dirty map should be empty after flush, got %d", v.dirtyMap.Len())
}
// Read from extent should be zeros (flusher zeroed it).
// Read from extent -- should be zeros (flusher zeroed it).
got, err = v.ReadLBA(0, 4096)
if err != nil {
t.Fatalf("ReadLBA after trim flush: %v", err)
@ -1453,7 +1453,7 @@ func testQAFlushPreservesNewerWrites(t *testing.T) {
t.Fatalf("WriteLBA(B): %v", err)
}
// Now flush again flusher should see the new entry for LBA 0.
// Now flush again -- flusher should see the new entry for LBA 0.
if err := f.FlushOnce(); err != nil {
t.Fatalf("FlushOnce 2: %v", err)
}
@ -1521,7 +1521,7 @@ func testQAFlushCheckpointPersists(t *testing.T) {
// Update superblock and crash.
path = simulateCrashWithSuper(v)
// Reopen recovery should skip LSN <= checkpoint and replay 5-9.
// Reopen -- recovery should skip LSN <= checkpoint and replay 5-9.
v2, err := OpenBlockVol(path)
if err != nil {
t.Fatalf("OpenBlockVol: %v", err)
@ -1582,7 +1582,7 @@ func testQAFlushWALReclaimThenWrite(t *testing.T) {
}
}
// Flush reclaim all WAL space.
// Flush -- reclaim all WAL space.
f := NewFlusher(FlusherConfig{
FD: v.fd,
Super: &v.super,
@ -1594,7 +1594,7 @@ func testQAFlushWALReclaimThenWrite(t *testing.T) {
t.Fatalf("FlushOnce: %v", err)
}
// WAL should be empty now. Write again should succeed.
// WAL should be empty now. Write again -- should succeed.
for i := 0; i < 5; i++ {
if err := v.WriteLBA(uint64(i), makeBlock(byte('a'+i))); err != nil {
t.Fatalf("write after reclaim %d: %v", i, err)
@ -1655,7 +1655,7 @@ func testQAFlusherStopIdempotent(t *testing.T) {
go f.Run()
f.Stop()
// Second stop must not panic or deadlock.
// Second stop -- must not panic or deadlock.
done := make(chan struct{})
go func() {
f.Stop()
@ -1795,7 +1795,7 @@ func testQARecoverMixedWriteTrimBarrier(t *testing.T) {
t.Error("LBA 0 should be 'A'")
}
// LBA 1: trimmed zeros.
// LBA 1: trimmed -- zeros.
got, err = v2.ReadLBA(1, 4096)
if err != nil {
t.Fatalf("ReadLBA(1): %v", err)
@ -1932,7 +1932,7 @@ func testQARecoverOverwriteSameLBA(t *testing.T) {
}
}
// testQARecoverCrashLoop: Write, sync, crash, recover 20 iterations.
// testQARecoverCrashLoop: Write, sync, crash, recover -- 20 iterations.
// Each iteration writes new data and verifies previous data survived.
func testQARecoverCrashLoop(t *testing.T) {
dir := t.TempDir()
@ -2107,8 +2107,8 @@ func testQARecoverMultiBlockWrite(t *testing.T) {
}
// testQARecoverOracleWithCrash: Oracle pattern with periodic crash+recover.
// This is the most valuable adversarial test it exercises the full
// write→sync→crash→recover→verify cycle with random operations.
// This is the most valuable adversarial test -- it exercises the full
// write->sync->crash->recover->verify cycle with random operations.
func testQARecoverOracleWithCrash(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "oracle_crash.blockvol")
@ -2210,7 +2210,7 @@ func testQARecoverOracleWithCrash(t *testing.T) {
}
// ============================================================================
// QA Adversarial Tests Tasks 1.10 (Lifecycle), 1.11 (Crash Stress)
// QA Adversarial Tests -- Tasks 1.10 (Lifecycle), 1.11 (Crash Stress)
// ============================================================================
// --- Task 1.10: Lifecycle adversarial tests ---
@ -2241,7 +2241,7 @@ func testQALifecycleWriteAfterClose(t *testing.T) {
v := createTestVol(t)
v.Close()
// Write after close fd is closed, should get an error, never a panic.
// Write after close -- fd is closed, should get an error, never a panic.
err := v.WriteLBA(0, makeBlock('X'))
if err == nil {
t.Error("WriteLBA after Close should fail")
@ -2259,7 +2259,7 @@ func testQALifecycleReadAfterClose(t *testing.T) {
v.Close()
// Read after close fd is closed, should error, not panic.
// Read after close -- fd is closed, should error, not panic.
_, err := v.ReadLBA(0, 4096)
if err == nil {
t.Error("ReadLBA after Close should fail")
@ -2281,7 +2281,7 @@ func testQALifecycleSyncAfterClose(t *testing.T) {
}
}
// testQALifecycleCloseDrainsDirty: Close does a final flush dirty map should
// testQALifecycleCloseDrainsDirty: Close does a final flush -- dirty map should
// be empty and data should be in extent region. Reopen should find blocks in
// extent (not WAL) and dirty map should be empty.
func testQALifecycleCloseDrainsDirty(t *testing.T) {
@ -2303,12 +2303,12 @@ func testQALifecycleCloseDrainsDirty(t *testing.T) {
}
}
// Close should flush all dirty blocks to extent.
// Close -- should flush all dirty blocks to extent.
if err := v.Close(); err != nil {
t.Fatalf("Close: %v", err)
}
// Reopen recovery should find nothing in WAL (all flushed).
// Reopen -- recovery should find nothing in WAL (all flushed).
v2, err := OpenBlockVol(path)
if err != nil {
t.Fatalf("OpenBlockVol: %v", err)
@ -2333,7 +2333,7 @@ func testQALifecycleCloseDrainsDirty(t *testing.T) {
}
}
// testQALifecycleMultiCycleAccumulate: Write→sync→close→reopen→write more, 5 cycles.
// testQALifecycleMultiCycleAccumulate: Write->sync->close->reopen->write more, 5 cycles.
// Each cycle adds new blocks. Verify all accumulated blocks survive.
func testQALifecycleMultiCycleAccumulate(t *testing.T) {
dir := t.TempDir()
@ -2413,7 +2413,7 @@ func testQALifecycleCloseWithBackgroundFlusher(t *testing.T) {
// Wait a bit to let the flusher potentially run.
time.Sleep(150 * time.Millisecond)
// Close must coordinate with flusher goroutine.
// Close -- must coordinate with flusher goroutine.
if err := v.Close(); err != nil {
t.Fatalf("Close: %v", err)
}
@ -2493,7 +2493,7 @@ func testQALifecycleOpenCloseRapid(t *testing.T) {
}
}
// Final open verify data survived 20 open/close cycles.
// Final open -- verify data survived 20 open/close cycles.
v, err = OpenBlockVol(path)
if err != nil {
t.Fatalf("final open: %v", err)
@ -2532,7 +2532,7 @@ func TestQACrashStress(t *testing.T) {
}
// testQACrashNoSyncDataLossOK: Write WITHOUT SyncCache, crash, recover.
// Un-synced data MAY be lost this is correct behavior, not a bug.
// Un-synced data MAY be lost -- this is correct behavior, not a bug.
// The key invariant: volume must open without error and previously synced
// data must survive.
func testQACrashNoSyncDataLossOK(t *testing.T) {
@ -2566,7 +2566,7 @@ func testQACrashNoSyncDataLossOK(t *testing.T) {
v.super.WriteTo(v.fd)
v.fd.Sync()
// Write block 1 WITHOUT sync this write's WAL head is NOT in the superblock.
// Write block 1 WITHOUT sync -- this write's WAL head is NOT in the superblock.
if err := v.WriteLBA(1, makeBlock('U')); err != nil {
t.Fatalf("WriteLBA(unsynced): %v", err)
}
@ -2590,7 +2590,7 @@ func testQACrashNoSyncDataLossOK(t *testing.T) {
t.Error("synced block 0 should survive crash")
}
// Un-synced block 1: may or may not be there both are correct.
// Un-synced block 1: may or may not be there -- both are correct.
// Just verify we can read without error.
_, err = v2.ReadLBA(1, 4096)
if err != nil {
@ -2988,7 +2988,7 @@ func testQACrashOverwriteStorm(t *testing.T) {
}
// ============================================================================
// QA Adversarial Tests Round 4 (Architect-directed edge cases)
// QA Adversarial Tests -- Round 4 (Architect-directed edge cases)
// ============================================================================
// --- WAL / Recovery Edge Cases ---
@ -3075,7 +3075,7 @@ func testQARecoverEntrySizeMismatchAtTail(t *testing.T) {
}
}
// Entry 3 (corrupt) should NOT be recovered read returns zeros.
// Entry 3 (corrupt) should NOT be recovered -- read returns zeros.
got, err := v2.ReadLBA(2, 4096)
if err != nil {
t.Fatalf("ReadLBA(2): %v", err)
@ -3121,7 +3121,7 @@ func testQARecoverPartialPadding(t *testing.T) {
// Advance tail past entry 1 so we have room to wrap.
v.wal.AdvanceTail(entrySize)
// Write entry 3 this should trigger padding (50 bytes) at end and wrap to 0.
// Write entry 3 -- this should trigger padding (50 bytes) at end and wrap to 0.
if err := v.WriteLBA(2, makeBlock('C')); err != nil {
t.Fatalf("WriteLBA(2) after wrap: %v", err)
}
@ -3150,7 +3150,7 @@ func testQARecoverPartialPadding(t *testing.T) {
v.fd.Close()
path = v.Path()
// Recovery should handle this either skip corrupt padding and find
// Recovery should handle this -- either skip corrupt padding and find
// entry 3, or stop at the corruption. Either way, no panic.
v2, err := OpenBlockVol(path)
if err != nil {
@ -3255,7 +3255,7 @@ func testQARecoverWriteThenTrimSameLBA(t *testing.T) {
}
defer v2.Close()
// TRIM is latest should return zeros.
// TRIM is latest -- should return zeros.
got, err := v2.ReadLBA(7, 4096)
if err != nil {
t.Fatalf("ReadLBA(7): %v", err)
@ -3310,7 +3310,7 @@ func testQARecoverBarrierOnlyFullWAL(t *testing.T) {
}
defer v2.Close()
// No data changes from barriers read should return zeros.
// No data changes from barriers -- read should return zeros.
got, err := v2.ReadLBA(0, 4096)
if err != nil {
t.Fatalf("ReadLBA: %v", err)
@ -3352,7 +3352,7 @@ func testQAFlushInterleavedOverwrite(t *testing.T) {
t.Fatalf("WriteLBA(A): %v", err)
}
// Flush moves 'A' to extent, removes dirty entry for LSN 1.
// Flush -- moves 'A' to extent, removes dirty entry for LSN 1.
if err := f.FlushOnce(); err != nil {
t.Fatalf("FlushOnce 1: %v", err)
}
@ -3373,7 +3373,7 @@ func testQAFlushInterleavedOverwrite(t *testing.T) {
t.Fatal("LBA 0 should be in dirty map")
}
// Flush snapshot captures LSN 3. After flush, extent has 'C'.
// Flush -- snapshot captures LSN 3. After flush, extent has 'C'.
if err := f.FlushOnce(); err != nil {
t.Fatalf("FlushOnce 2: %v", err)
}
@ -3429,12 +3429,12 @@ func testQAFlushPartialWALWrap(t *testing.T) {
}
}
// Flush moves all to extent, advances tail.
// Flush -- moves all to extent, advances tail.
if err := f.FlushOnce(); err != nil {
t.Fatalf("FlushOnce 1: %v", err)
}
// Write more these will wrap around in the WAL.
// Write more -- these will wrap around in the WAL.
for i := 0; i < firstBatch; i++ {
lba := uint64(firstBatch + i)
if lba >= 256 { // stay within volume
@ -3448,7 +3448,7 @@ func testQAFlushPartialWALWrap(t *testing.T) {
}
}
// Flush again should handle wrapped entries correctly.
// Flush again -- should handle wrapped entries correctly.
if err := f.FlushOnce(); err != nil {
t.Fatalf("FlushOnce 2: %v", err)
}
@ -3498,7 +3498,7 @@ func testQAFlushTrimMixedWrite(t *testing.T) {
t.Fatalf("Trim(3): %v", err)
}
// Flush should write data for 0,2,4 and zeros for 1,3.
// Flush -- should write data for 0,2,4 and zeros for 1,3.
if err := f.FlushOnce(); err != nil {
t.Fatalf("FlushOnce: %v", err)
}
@ -3652,7 +3652,7 @@ func testQACloseWhileSyncCacheWaits(t *testing.T) {
select {
case err := <-closeDone:
// Close may return nil or an error from final flush both are OK.
// Close may return nil or an error from final flush -- both are OK.
_ = err
case <-time.After(5 * time.Second):
t.Fatal("Close deadlocked")
@ -3680,12 +3680,12 @@ func testQACloseWithPendingDirtyMap(t *testing.T) {
}
}
// Close should stop group committer, stop flusher, do final flush.
// Close -- should stop group committer, stop flusher, do final flush.
if err := v.Close(); err != nil {
t.Fatalf("Close: %v", err)
}
// Reopen verify dirty map is empty (all data in extent).
// Reopen -- verify dirty map is empty (all data in extent).
v2, err := OpenBlockVol(path)
if err != nil {
t.Fatalf("OpenBlockVol: %v", err)
@ -3783,7 +3783,7 @@ func testQAWALSizeMinHeader(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "tiny_wal.blockvol")
// WAL = walEntryHeaderSize + 1 byte can't fit any entry with data.
// WAL = walEntryHeaderSize + 1 byte -- can't fit any entry with data.
walSize := uint64(walEntryHeaderSize + 1)
v, err := CreateBlockVol(path, CreateOptions{

862
weed/storage/blockvol/blockvol_test.go
File diff suppressed because it is too large
View File

2
weed/storage/blockvol/dist_group_commit.go

@ -37,7 +37,7 @@ func MakeDistributedSync(walSync func() error, shipper *WALShipper, vol *BlockVo
return localErr
}
if remoteErr != nil {
// Local succeeded, replica failed degrade but don't fail the client.
// Local succeeded, replica failed -- degrade but don't fail the client.
vol.degradeReplica(remoteErr)
return nil
}

6
weed/storage/blockvol/flusher.go

@ -160,7 +160,7 @@ func (f *Flusher) FlushOnce() error {
// reused (by a previous flush cycle advancing the tail + new writes).
entryLSN := binary.LittleEndian.Uint64(headerBuf[0:8])
if entryLSN != e.Lsn {
continue // stale WAL slot reused, skip this entry
continue // stale -- WAL slot reused, skip this entry
}
// Parse entry type and length.
@ -177,14 +177,14 @@ func (f *Flusher) FlushOnce() error {
entry, err := DecodeWALEntry(fullBuf)
if err != nil {
continue // corrupt or partially overwritten skip
continue // corrupt or partially overwritten -- skip
}
// Write only this block's data to extent (not all blocks in the
// WAL entry). Other blocks may have been overwritten by newer
// writes and their dirty map entries point elsewhere.
if e.Lba < entry.LBA {
continue // LBA mismatch stale entry
continue // LBA mismatch -- stale entry
}
blockIdx := e.Lba - entry.LBA
dataStart := blockIdx * uint64(f.blockSize)

2
weed/storage/blockvol/flusher_test.go

@ -349,7 +349,7 @@ func testFlusherErrorLogged(t *testing.T) {
Logger: logger,
})
// Run flusher briefly FlushOnce should error, and Run should log it.
// Run flusher briefly -- FlushOnce should error, and Run should log it.
go f.Run()
f.Notify()
time.Sleep(50 * time.Millisecond)

2
weed/storage/blockvol/group_commit.go

@ -129,7 +129,7 @@ func (gc *GroupCommitter) Run() {
}
// Perform fsync with panic recovery. A panic in syncFunc must
// not leave waiters hung notify them and drain stragglers.
// not leave waiters hung -- notify them and drain stragglers.
err := gc.callSyncFunc()
gc.syncCount.Add(1)
if err == nil && gc.postSyncCheck != nil {

4
weed/storage/blockvol/iscsi/bug_dataout_timeout_test.go

@ -40,7 +40,7 @@ func TestBugCollectDataOutNoTimeout(t *testing.T) {
binary.BigEndian.PutUint32(cdb[2:6], 0) // LBA 0
binary.BigEndian.PutUint16(cdb[7:9], 1) // 1 block
cmd.SetCDB(cdb)
// No DataSegment forces R2T.
// No DataSegment -- forces R2T.
if err := WritePDU(env.clientConn, cmd); err != nil {
t.Fatalf("WritePDU: %v", err)
@ -56,7 +56,7 @@ func TestBugCollectDataOutNoTimeout(t *testing.T) {
}
// DO NOT send Data-Out. The session should time out and close.
// Currently it blocks forever in collectDataOut ReadPDU(s.conn).
// Currently it blocks forever in collectDataOut -> ReadPDU(s.conn).
select {
case err := <-env.done:

6
weed/storage/blockvol/iscsi/cmd/iscsi-target/smoke-test.sh

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# smoke-test.sh iscsiadm smoke test for SeaweedFS iSCSI target
# smoke-test.sh -- iscsiadm smoke test for SeaweedFS iSCSI target
#
# Prerequisites:
# - Linux host with iscsiadm (open-iscsi) installed
@ -122,7 +122,7 @@ log "Login OK"
sleep 2
ISCSI_DEV=$(iscsiadm -m session -P 3 2>/dev/null | grep "Attached scsi disk" | awk '{print $4}' | head -1)
if [[ -z "$ISCSI_DEV" ]]; then
warn "Could not determine attached device trying /dev/sdb"
warn "Could not determine attached device -- trying /dev/sdb"
ISCSI_DEV="sdb"
fi
DEV_PATH="/dev/$ISCSI_DEV"
@ -158,7 +158,7 @@ rm -f /tmp/iscsi-smoke-pattern /tmp/iscsi-smoke-readback
# -------------------------------------------------------
log "Creating ext4 filesystem..."
mkfs.ext4 -F -q "$DEV_PATH" || {
warn "mkfs.ext4 failed skipping filesystem tests"
warn "mkfs.ext4 failed -- skipping filesystem tests"
# Still consider the test a pass if dd worked
log "SMOKE TEST PASSED (dd only, mkfs skipped)"
exit 0

8
weed/storage/blockvol/iscsi/dataio.go

@ -33,7 +33,7 @@ func NewDataInWriter(maxSegLen uint32) *DataInWriter {
func (d *DataInWriter) WriteDataIn(w io.Writer, data []byte, itt uint32, expCmdSN, maxCmdSN uint32, statSN *uint32) (int, error) {
totalLen := uint32(len(data))
if totalLen == 0 {
// Zero-length read send single Data-In with S-bit, no data
// Zero-length read -- send single Data-In with S-bit, no data
pdu := &PDU{}
pdu.SetOpcode(OpSCSIDataIn)
pdu.SetOpSpecific1(FlagF | FlagS) // Final + Status
@ -172,7 +172,7 @@ func (c *DataOutCollector) Remaining() uint32 {
}
// BuildR2T creates an R2T PDU requesting more data from the initiator.
// StatSN is NOT set here txLoop assigns it (statSNCopy mode, no increment).
// StatSN is NOT set here -- txLoop assigns it (statSNCopy mode, no increment).
func BuildR2T(itt, ttt uint32, r2tSN uint32, bufferOffset, desiredLen uint32, expCmdSN, maxCmdSN uint32) *PDU {
pdu := &PDU{}
pdu.SetOpcode(OpR2T)
@ -196,7 +196,7 @@ func SendSCSIResponse(w io.Writer, result SCSIResult, itt uint32, statSN *uint32
}
// BuildSCSIResponse creates a SCSI Response PDU without writing it.
// StatSN is NOT set the caller (or txLoop) assigns it.
// StatSN is NOT set -- the caller (or txLoop) assigns it.
func BuildSCSIResponse(result SCSIResult, itt uint32, expCmdSN, maxCmdSN uint32) *PDU {
pdu := &PDU{}
pdu.SetOpcode(OpSCSIResp)
@ -219,7 +219,7 @@ func BuildSCSIResponse(result SCSIResult, itt uint32, expCmdSN, maxCmdSN uint32)
}
// BuildDataInPDUs splits data into Data-In PDUs and returns them.
// StatSN is NOT set on the final PDU the txLoop assigns it.
// StatSN is NOT set on the final PDU -- the txLoop assigns it.
// Intermediate PDUs (without S-bit) never carry StatSN.
func (d *DataInWriter) BuildDataInPDUs(data []byte, itt uint32, expCmdSN, maxCmdSN uint32) []*PDU {
totalLen := uint32(len(data))

2
weed/storage/blockvol/iscsi/discovery.go

@ -11,7 +11,7 @@ func HandleTextRequest(req *PDU, targets []DiscoveryTarget) *PDU {
params, err := ParseParams(req.DataSegment)
if err != nil {
// Malformed return empty response
// Malformed -- return empty response
return resp
}

2
weed/storage/blockvol/iscsi/discovery_test.go

@ -168,7 +168,7 @@ func testMultipleTargets(t *testing.T) {
body := string(resp.DataSegment)
// The response uses EncodeDiscoveryTargets internally via the params,
// but since Params doesn't allow duplicate keys, the last one wins.
// This is a known limitation for multi-target discovery, we use
// This is a known limitation -- for multi-target discovery, we use
// EncodeDiscoveryTargets directly. Let's verify at least the last target.
if !strings.Contains(body, "TargetName=") {
t.Fatal("no TargetName in response")

6
weed/storage/blockvol/iscsi/integration_test.go

@ -229,7 +229,7 @@ func testMultiBlockWriteRead(t *testing.T) {
resp2 := sendSCSICmd(t, conn, rCDB, 3, true, false, nil, uint32(dataLen))
// May be split across multiple Data-In PDUs reassemble
// May be split across multiple Data-In PDUs -- reassemble
var readData []byte
readData = append(readData, resp2.DataSegment...)
@ -360,7 +360,7 @@ func testWriteAtBoundary(t *testing.T) {
t.Fatal("boundary write failed")
}
// Write past end should fail
// Write past end -- should fail
var wCDB2 [16]byte
wCDB2[0] = iscsi.ScsiWrite10
binary.BigEndian.PutUint32(wCDB2[2:6], 1024) // out of bounds
@ -396,7 +396,7 @@ func testUnmapIntegration(t *testing.T) {
t.Fatalf("unmap failed: %d", resp.SCSIStatus())
}
// Read back should be zeros
// Read back -- should be zeros
var rCDB [16]byte
rCDB[0] = iscsi.ScsiRead10
binary.BigEndian.PutUint32(rCDB[2:6], 5)

2
weed/storage/blockvol/iscsi/login.go

@ -197,7 +197,7 @@ func (ln *LoginNegotiator) HandleLoginPDU(req *PDU, resolver TargetResolver) *PD
// ISID
ln.isid = req.ISID()
// We don't implement CHAP declare AuthMethod=None
// We don't implement CHAP -- declare AuthMethod=None
respParams.Set("AuthMethod", "None")
if transit {

2
weed/storage/blockvol/iscsi/pdu.go

@ -11,7 +11,7 @@ import (
// RFC 7143, Section 12.1
const BHSLength = 48
// Opcode constants initiator opcodes (RFC 7143, Section 12.1.1)
// Opcode constants -- initiator opcodes (RFC 7143, Section 12.1.1)
const (
OpNOPOut uint8 = 0x00
OpSCSICmd uint8 = 0x01

4
weed/storage/blockvol/iscsi/pdu_test.go

@ -165,7 +165,7 @@ func testDataPadding(t *testing.T) {
func testOpcodeAccessors(t *testing.T) {
p := &PDU{}
// Set immediate first, then opcode verify no interference
// Set immediate first, then opcode -- verify no interference
p.SetImmediate(true)
p.SetOpcode(OpLoginReq)
if p.Opcode() != OpLoginReq {
@ -411,7 +411,7 @@ func testOpcodeName(t *testing.T) {
}
func testReadTruncatedBHS(t *testing.T) {
// Only 20 bytes not enough for BHS
// Only 20 bytes -- not enough for BHS
buf := bytes.NewReader(make([]byte, 20))
_, err := ReadPDU(buf)
if err == nil {

4
weed/storage/blockvol/iscsi/qa_test.go

@ -411,7 +411,7 @@ func testQA_Login(t *testing.T) {
// Send SCSI Command during login phase -> rejected
conn, _ := qaSessionDefault(t)
// Don't login send a SCSI command directly
// Don't login -- send a SCSI command directly
cmd := &PDU{}
cmd.SetOpcode(OpSCSICmd)
cmd.SetOpSpecific1(FlagF | FlagR)
@ -881,7 +881,7 @@ func testQA_DataIO(t *testing.T) {
cmd.SetCmdSN(cmdSN)
cmd.SetCDB(cdb)
// With default config: ImmediateData=true, InitialR2T=true
// No immediate data sent let R2T drive everything
// No immediate data sent -- let R2T drive everything
if err := WritePDU(conn, cmd); err != nil {
t.Fatal(err)
}

18
weed/storage/blockvol/iscsi/scsi.go

@ -257,7 +257,7 @@ func (h *SCSIHandler) inquiryVPDB0(allocLen uint16) SCSIResult {
data[1] = 0xb0 // page code
binary.BigEndian.PutUint16(data[2:4], 0x003c) // page length = 60
// Byte 5: WSNZ (Write Same No Zero) = 0 we accept zero-length WRITE SAME
// Byte 5: WSNZ (Write Same No Zero) = 0 -- we accept zero-length WRITE SAME
// Bytes 6-7: Maximum compare and write length = 0 (not supported)
// Bytes 8-9: Optimal transfer length granularity = 1 block
@ -275,7 +275,7 @@ func (h *SCSIHandler) inquiryVPDB0(allocLen uint16) SCSIResult {
// Bytes 32-35: Optimal UNMAP granularity = 1 block
binary.BigEndian.PutUint32(data[32:36], 1)
// Bytes 36-39: UNMAP granularity alignment (bit 31 = UGAVALID)
// Not set no alignment requirement.
// Not set -- no alignment requirement.
// Bytes 40-47: Maximum WRITE SAME length (uint64)
binary.BigEndian.PutUint64(data[40:48], maxWS)
@ -304,9 +304,9 @@ func (h *SCSIHandler) inquiryVPDB2(allocLen uint16) SCSIResult {
data[5] = 0x02
// Byte 6: Provisioning group descriptor flags
// Bit 7: LBPU (Logical Block Provisioning UNMAP) = 1 we support UNMAP
// Bit 6: LBPWS (LBP Write Same) = 1 we support WRITE SAME with UNMAP bit
// Bit 5: LBPWS10 = 0 we don't support WRITE SAME(10)
// Bit 7: LBPU (Logical Block Provisioning UNMAP) = 1 -- we support UNMAP
// Bit 6: LBPWS (LBP Write Same) = 1 -- we support WRITE SAME with UNMAP bit
// Bit 5: LBPWS10 = 0 -- we don't support WRITE SAME(10)
data[6] = 0xC0 // LBPU=1, LBPWS=1
if int(allocLen) < len(data) {
@ -353,7 +353,7 @@ func (h *SCSIHandler) readCapacity16(cdb [16]byte) SCSIResult {
}
func (h *SCSIHandler) modeSense6(cdb [16]byte) SCSIResult {
// Minimal MODE SENSE(6) response no mode pages
// Minimal MODE SENSE(6) response -- no mode pages
allocLen := cdb[4]
if allocLen == 0 {
allocLen = 4
@ -372,7 +372,7 @@ func (h *SCSIHandler) modeSense6(cdb [16]byte) SCSIResult {
}
func (h *SCSIHandler) modeSense10(cdb [16]byte) SCSIResult {
// MODE SENSE(10) 8-byte header, no mode pages
// MODE SENSE(10) -- 8-byte header, no mode pages
allocLen := binary.BigEndian.Uint16(cdb[7:9])
if allocLen == 0 {
allocLen = 8
@ -545,7 +545,7 @@ func (h *SCSIHandler) unmap(cdb [16]byte, dataOut []byte) SCSIResult {
return SCSIResult{Status: SCSIStatusGood}
}
// writeSame16 handles WRITE SAME(16) SBC-4, Section 5.42.
// writeSame16 handles WRITE SAME(16) -- SBC-4, Section 5.42.
// If UNMAP bit is set, the range is trimmed (zeroed). Otherwise the single
// logical block from dataOut is written repeatedly across the range.
// NDOB (No Data-Out Buffer) means zero the range with no data payload.
@ -585,7 +585,7 @@ func (h *SCSIHandler) writeSame16(cdb [16]byte, dataOut []byte) SCSIResult {
}
pattern := dataOut[:blockSize]
// Check if the pattern is all zeros use Trim for efficiency.
// Check if the pattern is all zeros -- use Trim for efficiency.
allZero := true
for _, b := range pattern {
if b != 0 {

4
weed/storage/blockvol/iscsi/scsi_test.go

@ -759,7 +759,7 @@ func testWriteSame16Pattern(t *testing.T) {
var cdb [16]byte
cdb[0] = ScsiWriteSame16
// No UNMAP, no NDOB normal write same
// No UNMAP, no NDOB -- normal write same
binary.BigEndian.PutUint64(cdb[2:10], 30)
binary.BigEndian.PutUint32(cdb[10:14], 3)
@ -785,7 +785,7 @@ func testWriteSame16Zeros(t *testing.T) {
dev.blocks[40] = make([]byte, 4096)
dev.blocks[40][0] = 0xFF
// WRITE SAME with all-zero pattern should use Trim for efficiency
// WRITE SAME with all-zero pattern -- should use Trim for efficiency
pattern := make([]byte, 4096)
var cdb [16]byte
cdb[0] = ScsiWriteSame16

2
weed/storage/blockvol/iscsi/session.go

@ -370,7 +370,7 @@ func (s *Session) handleSCSICmd(pdu *PDU) error {
isRead := flags&FlagR != 0
expectedLen := pdu.ExpectedDataTransferLength()
// Handle write commands collect data
// Handle write commands -- collect data
var dataOut []byte
if isWrite && expectedLen > 0 {
collector := NewDataOutCollector(expectedLen)

26
weed/storage/blockvol/iscsi/session_test.go

@ -285,7 +285,7 @@ func testLogout(t *testing.T) {
}
// After logout, the connection should be closed by the target.
// Verify by trying to read should get EOF.
// Verify by trying to read -- should get EOF.
_, err = ReadPDU(env.clientConn)
if err == nil {
t.Fatal("expected EOF after logout")
@ -512,7 +512,7 @@ func testRXTXWriteWithR2T(t *testing.T) {
env := setupSession(t)
doLogin(t, env.clientConn)
// Write without immediate data should trigger R2T + Data-Out.
// Write without immediate data -- should trigger R2T + Data-Out.
cmd := &PDU{}
cmd.SetOpcode(OpSCSICmd)
cmd.SetOpSpecific1(FlagF | FlagW) // no immediate data
@ -794,7 +794,7 @@ func testRXTXConnDropReader(t *testing.T) {
env := setupSession(t)
doLogin(t, env.clientConn)
// Close client conn reader should detect EOF.
// Close client conn -- reader should detect EOF.
env.clientConn.Close()
select {
@ -829,7 +829,7 @@ func testRXTXConnDropWriter(t *testing.T) {
// Session should exit.
select {
case <-done:
// Good session exited.
// Good -- session exited.
case <-time.After(2 * time.Second):
t.Fatal("session did not exit after server-side close")
}
@ -843,7 +843,7 @@ func testRXTXR2TStatSNFresh(t *testing.T) {
env := setupSession(t)
doLogin(t, env.clientConn)
// Send a NOP-Out (immediate) its NOP-In response will consume one StatSN.
// Send a NOP-Out (immediate) -- its NOP-In response will consume one StatSN.
nop := &PDU{}
nop.SetOpcode(OpNOPOut)
nop.SetOpSpecific1(FlagF)
@ -881,7 +881,7 @@ func testRXTXR2TStatSNFresh(t *testing.T) {
t.Fatal(err)
}
// Read R2T its StatSN should be nopStatSN+1 (fresh, not stale).
// Read R2T -- its StatSN should be nopStatSN+1 (fresh, not stale).
r2t, err := ReadPDU(env.clientConn)
if err != nil {
t.Fatal(err)
@ -910,7 +910,7 @@ func testRXTXR2TStatSNFresh(t *testing.T) {
t.Fatal(err)
}
// Read SCSI response its StatSN should be nopStatSN+1 (R2T didn't increment).
// Read SCSI response -- its StatSN should be nopStatSN+1 (R2T didn't increment).
resp, err := ReadPDU(env.clientConn)
if err != nil {
t.Fatal(err)
@ -941,7 +941,7 @@ func testRXTXTxErrorExitsClean(t *testing.T) {
doLogin(t, client)
// Close the server side this makes txLoop's WritePDU fail.
// Close the server side -- this makes txLoop's WritePDU fail.
server.Close()
// Send something from client side so rxLoop dispatches and enqueues a response.
@ -965,9 +965,9 @@ func testRXTXTxErrorExitsClean(t *testing.T) {
// Verify txDone is closed (no goroutine leak).
select {
case <-sess.txDone:
// Good txLoop exited.
// Good -- txLoop exited.
default:
t.Fatal("txDone not closed txLoop may be leaked")
t.Fatal("txDone not closed -- txLoop may be leaked")
}
client.Close()
@ -994,7 +994,7 @@ func testRXTXLoginPhaseReject(t *testing.T) {
client.Close()
}()
// Send a TextReq before login should get a Reject (inline, not buffered).
// Send a TextReq before login -- should get a Reject (inline, not buffered).
textParams := NewParams()
textParams.Set("SendTargets", "All")
textReq := makeTextReq(textParams)
@ -1087,7 +1087,7 @@ func testRXTXPendingQueueOverflow(t *testing.T) {
// Session should exit with an error (pending queue overflow).
select {
case <-env.done:
// Good session exited.
// Good -- session exited.
case <-time.After(3 * time.Second):
t.Fatal("session did not exit after pending overflow")
}
@ -1134,6 +1134,6 @@ func testRXTXDataOutTimeout(t *testing.T) {
case err := <-env.done:
t.Logf("session exited: %v", err)
case <-time.After(3 * time.Second):
t.Fatal("session did not time out DataOutTimeout not working")
t.Fatal("session did not time out -- DataOutTimeout not working")
}
}

6
weed/storage/blockvol/iscsi/target_test.go

@ -186,7 +186,7 @@ func testGracefulShutdown(t *testing.T) {
conn := dialTarget(t, addr)
loginToTarget(t, conn)
// Close the target should shut down cleanly
// Close the target -- should shut down cleanly
ts.Close()
// Connection should be dropped
@ -289,13 +289,13 @@ func testCloseRejectsLateConn(t *testing.T) {
select {
case <-done:
// Good handleConn returned promptly.
// Good -- handleConn returned promptly.
case <-time.After(2 * time.Second):
t.Fatal("handleConn did not return after ts.closed was signaled")
}
// The server-side conn should be closed by handleConn's defer.
// Verify by reading from client should get EOF.
// Verify by reading from client -- should get EOF.
buf := make([]byte, 1)
_, err := client.Read(buf)
if err == nil {

11
weed/storage/blockvol/promotion.go

@ -9,6 +9,7 @@ import (
var (
ErrInvalidAssignment = errors.New("blockvol: invalid assignment transition")
ErrDrainTimeout = errors.New("blockvol: drain timeout waiting for in-flight ops")
ErrEpochRegression = errors.New("blockvol: epoch regression")
)
const defaultDrainTimeout = 10 * time.Second
@ -56,6 +57,9 @@ func HandleAssignment(vol *BlockVol, newEpoch uint64, newRole Role, leaseTTL tim
// promote transitions Replica/None -> Primary.
// Order matters: epoch durable BEFORE writes possible.
func promote(vol *BlockVol, newEpoch uint64, leaseTTL time.Duration) error {
if newEpoch < vol.Epoch() {
return fmt.Errorf("%w: new %d < current %d", ErrEpochRegression, newEpoch, vol.Epoch())
}
if err := vol.SetEpoch(newEpoch); err != nil {
return fmt.Errorf("promote: set epoch: %w", err)
}
@ -70,7 +74,12 @@ func promote(vol *BlockVol, newEpoch uint64, leaseTTL time.Duration) error {
// demote transitions Primary -> Draining -> Stale.
// Revokes lease first, drains in-flight ops, then persists new epoch.
func demote(vol *BlockVol, newEpoch uint64) error {
// Revoke lease — writeGate blocks new writes immediately.
// Guard epoch monotonicity before any state changes.
if newEpoch < vol.Epoch() {
return fmt.Errorf("%w: new %d < current %d", ErrEpochRegression, newEpoch, vol.Epoch())
}
// Revoke lease -- writeGate blocks new writes immediately.
vol.lease.Revoke()
// Transition to Draining.

56
weed/storage/blockvol/qa_phase4a_cp1_test.go

@ -207,7 +207,7 @@ func testQAEpochSetDuringWrite(t *testing.T) {
t.Fatal("epoch_set_during_write hung for 10s")
}
// Close and reopen verify no corruption and epoch in expected range.
// Close and reopen -- verify no corruption and epoch in expected range.
path := v.Path()
v.Close()
@ -266,7 +266,7 @@ func testQAEpochMaxUint64(t *testing.T) {
}
func testQAEpochReopenAfterConcurrentSets(t *testing.T) {
// BUG-4A-3: concurrent SetEpoch → close → reopen → verify no corruption.
// BUG-4A-3: concurrent SetEpoch -> close -> reopen -> verify no corruption.
dir := t.TempDir()
path := filepath.Join(dir, "epoch_reopen.blockvol")
@ -336,20 +336,20 @@ func testQAEpochReopenAfterConcurrentSets(t *testing.T) {
// --- QA-4A-2: Lease Adversarial ---
func testQALeaseGrantZeroTTL(t *testing.T) {
// BUG-4A-5: Grant(0) stores time.Now() immediately expired.
// BUG-4A-5: Grant(0) stores time.Now() -- immediately expired.
var l Lease
l.Grant(0)
if l.IsValid() {
t.Error("Grant(0): IsValid() = true, want false (immediately expired)")
}
// Grant(1ns) may or may not be valid, but must not panic.
// Grant(1ns) -- may or may not be valid, but must not panic.
l.Grant(time.Nanosecond)
_ = l.IsValid() // just verify no panic
}
func testQALeaseGrantNegativeTTL(t *testing.T) {
// BUG-4A-5: Grant(-1s) stores past time always expired.
// BUG-4A-5: Grant(-1s) stores past time -- always expired.
var l Lease
l.Grant(-1 * time.Second)
if l.IsValid() {
@ -507,7 +507,7 @@ func testQARoleConcurrentTransitions(t *testing.T) {
v := createFencedVol(t)
defer v.Close()
// Get to Stale: Primary → Draining → Stale.
// Get to Stale: Primary -> Draining -> Stale.
if err := v.SetRole(RoleDraining); err != nil {
t.Fatalf("SetRole(Draining): %v", err)
}
@ -536,10 +536,10 @@ func testQARoleConcurrentTransitions(t *testing.T) {
errA, errB, finalRole)
if errA == nil && errB == nil {
t.Error("BUG-4A-2 REGRESSION: both transitions succeeded CAS not working")
t.Error("BUG-4A-2 REGRESSION: both transitions succeeded -- CAS not working")
}
if errA != nil && errB != nil {
t.Error("both transitions failed unexpected, at least one should succeed")
t.Error("both transitions failed -- unexpected, at least one should succeed")
}
if finalRole != RoleRebuilding && finalRole != RoleReplica {
t.Errorf("final role = %s, want Rebuilding or Replica", finalRole)
@ -555,7 +555,7 @@ func testQARoleRapidCycle100x(t *testing.T) {
callbackCount.Add(1)
})
// Full cycle: Primary→Draining→Stale→Rebuilding→Replica→Primary
// Full cycle: Primary->Draining->Stale->Rebuilding->Replica->Primary
cycle := []Role{RoleDraining, RoleStale, RoleRebuilding, RoleReplica, RolePrimary}
const rounds = 100
@ -589,7 +589,7 @@ func testQARoleCallbackPanicRecovery(t *testing.T) {
panic("callback panic!")
})
// SetRole(Draining) callback panics but safeCallback recovers it.
// SetRole(Draining) -- callback panics but safeCallback recovers it.
// SetRole should return nil (CAS succeeded), not propagate the panic.
err := v.SetRole(RoleDraining)
if err != nil {
@ -657,7 +657,7 @@ func testQARoleUnknownValue(t *testing.T) {
t.Errorf("writeGate with role=255: got %v, want ErrNotPrimary", err)
}
// SetRole(Primary) from unknown role not in transition table.
// SetRole(Primary) from unknown role -- not in transition table.
err = v.SetRole(RolePrimary)
if !errors.Is(err, ErrInvalidRoleTransition) {
t.Errorf("SetRole(Primary) from role=255: got %v, want ErrInvalidRoleTransition", err)
@ -740,7 +740,7 @@ func testQAGateEpochBumpDuringWrite(t *testing.T) {
v, err := CreateBlockVol(path, CreateOptions{
VolumeSize: 1 * 1024 * 1024,
BlockSize: 4096,
WALSize: 1 * 1024 * 1024, // 1MB WAL plenty of room
WALSize: 1 * 1024 * 1024, // 1MB WAL -- plenty of room
}, cfg)
if err != nil {
t.Fatalf("CreateBlockVol: %v", err)
@ -794,7 +794,7 @@ func testQAGateEpochBumpDuringWrite(t *testing.T) {
t.Fatal("gate_epoch_bump_during_write hung for 10s")
}
// After bump, both epochs are 2 writes must succeed.
// After bump, both epochs are 2 -- writes must succeed.
if v.Epoch() != 2 {
t.Errorf("Epoch() = %d after bump, want 2", v.Epoch())
}
@ -807,7 +807,7 @@ func testQAGateLeaseExpireDuringWrite(t *testing.T) {
v := createFencedVol(t)
defer v.Close()
// Short lease write in a loop that spans the expiry window.
// Short lease -- write in a loop that spans the expiry window.
v.lease.Grant(10 * time.Millisecond)
var succeeded, expired int
@ -833,10 +833,10 @@ loop:
t.Logf("lease expire: %d succeeded, %d expired (out of %d)", succeeded, expired, i)
if expired == 0 {
t.Error("no writes expired lease didn't expire in time?")
t.Error("no writes expired -- lease didn't expire in time?")
}
if succeeded == 0 {
t.Error("no writes succeeded lease expired immediately?")
t.Error("no writes succeeded -- lease expired immediately?")
}
// Verify monotonicity: once expired, no more successes.
@ -939,7 +939,7 @@ func testQAGateGotchaA(t *testing.T) {
// Grant a short lease.
v.lease.Grant(50 * time.Millisecond)
// Write data (succeeds lease still valid).
// Write data (succeeds -- lease still valid).
if err := v.WriteLBA(0, makeBlock('G')); err != nil {
t.Fatalf("WriteLBA: %v", err)
}
@ -947,7 +947,7 @@ func testQAGateGotchaA(t *testing.T) {
// Wait for lease to expire.
time.Sleep(100 * time.Millisecond)
// SyncCache must fail PostSyncCheck calls writeGate, which checks lease.
// SyncCache must fail -- PostSyncCheck calls writeGate, which checks lease.
err := v.SyncCache()
if !errors.Is(err, ErrLeaseExpired) {
t.Errorf("SyncCache after lease expiry: got %v, want ErrLeaseExpired (Gotcha A)", err)
@ -991,7 +991,7 @@ func testQAFencingFullCycle(t *testing.T) {
}
}
// Primary → Draining — writes should fail.
// Primary -> Draining -- writes should fail.
if err := v.SetRole(RoleDraining); err != nil {
t.Fatalf("SetRole(Draining): %v", err)
}
@ -999,7 +999,7 @@ func testQAFencingFullCycle(t *testing.T) {
t.Errorf("write as Draining: got %v, want ErrNotPrimary", err)
}
// Draining → Stale — writes rejected.
// Draining -> Stale -- writes rejected.
if err := v.SetRole(RoleStale); err != nil {
t.Fatalf("SetRole(Stale): %v", err)
}
@ -1007,7 +1007,7 @@ func testQAFencingFullCycle(t *testing.T) {
t.Errorf("write as Stale: got %v, want ErrNotPrimary", err)
}
// Bump epoch, transition through Rebuilding Replica.
// Bump epoch, transition through Rebuilding -> Replica.
if err := v.SetEpoch(2); err != nil {
t.Fatalf("SetEpoch(2): %v", err)
}
@ -1027,7 +1027,7 @@ func testQAFencingFullCycle(t *testing.T) {
t.Errorf("write as Replica: got %v, want ErrNotPrimary", err)
}
// Replica → Primary with fresh lease — writes succeed again.
// Replica -> Primary with fresh lease -- writes succeed again.
if err := v.SetRole(RolePrimary); err != nil {
t.Fatalf("SetRole(Primary): %v", err)
}
@ -1047,7 +1047,7 @@ func testQAFencingFullCycle(t *testing.T) {
t.Errorf("final LBA %d: data[0] = %c, want %c", i, data[0], byte('A'+i%26))
}
}
t.Log("full cycle: Primary→Draining→Stale→Rebuilding→Replica→Primary complete")
t.Log("full cycle: Primary->Draining->Stale->Rebuilding->Replica->Primary complete")
}
func testQAFencingWritesRejectedAfterDemotion(t *testing.T) {
@ -1062,7 +1062,7 @@ func testQAFencingWritesRejectedAfterDemotion(t *testing.T) {
t.Fatalf("SyncCache: %v", err)
}
// Revoke lease ErrLeaseExpired.
// Revoke lease -> ErrLeaseExpired.
v.lease.Revoke()
err := v.WriteLBA(1, makeBlock('X'))
if !errors.Is(err, ErrLeaseExpired) {
@ -1072,7 +1072,7 @@ func testQAFencingWritesRejectedAfterDemotion(t *testing.T) {
// Re-grant lease for role checks, then demote.
v.lease.Grant(10 * time.Second)
// Primary → Draining → ErrNotPrimary.
// Primary -> Draining -> ErrNotPrimary.
if err := v.SetRole(RoleDraining); err != nil {
t.Fatalf("SetRole(Draining): %v", err)
}
@ -1081,7 +1081,7 @@ func testQAFencingWritesRejectedAfterDemotion(t *testing.T) {
t.Errorf("after Draining: got %v, want ErrNotPrimary", err)
}
// Draining → Stale → ErrNotPrimary.
// Draining -> Stale -> ErrNotPrimary.
if err := v.SetRole(RoleStale); err != nil {
t.Fatalf("SetRole(Stale): %v", err)
}
@ -1150,11 +1150,11 @@ func testQAFencingReadAlwaysWorks(t *testing.T) {
}
verifyReads("Replica")
// Expired lease reads still work.
// Expired lease -- reads still work.
v.lease.Revoke()
verifyReads("expired_lease")
// Stale epoch reads still work.
// Stale epoch -- reads still work.
v.SetMasterEpoch(999)
verifyReads("stale_epoch")

84
weed/storage/blockvol/qa_phase4a_cp2_test.go

@ -120,8 +120,8 @@ func createReplicaPair(t *testing.T) (primary, replica *BlockVol) {
// --- QA-4A-CP2-1: Frame Protocol Adversarial ---
func testQAFrameMaxPayloadBoundary(t *testing.T) {
// Payload exactly at maxFramePayload should succeed.
// Payload at maxFramePayload+1 ReadFrame must return ErrFrameTooLarge.
// Payload exactly at maxFramePayload -> should succeed.
// Payload at maxFramePayload+1 -> ReadFrame must return ErrFrameTooLarge.
// Test at boundary: we can't allocate 256MB in a test, but we can test
// the ReadFrame parser with a crafted header.
@ -144,7 +144,7 @@ func testQAFrameMaxPayloadBoundary(t *testing.T) {
hdr[0] = MsgWALEntry
bigEndianPut32(hdr[1:5], uint32(maxFramePayload))
buf.Write(hdr)
// No payload data ReadFrame should return io.ErrUnexpectedEOF (not ErrFrameTooLarge).
// No payload data -> ReadFrame should return io.ErrUnexpectedEOF (not ErrFrameTooLarge).
_, _, err = ReadFrame(&buf)
if errors.Is(err, ErrFrameTooLarge) {
t.Error("payload exactly at maxFramePayload should not be rejected as too large")
@ -156,7 +156,7 @@ func testQAFrameMaxPayloadBoundary(t *testing.T) {
}
func testQAFrameTruncatedHeader(t *testing.T) {
// Only 3 bytes (< 5 byte header) ReadFrame must return error.
// Only 3 bytes (< 5 byte header) -> ReadFrame must return error.
buf := bytes.NewReader([]byte{0x01, 0x00, 0x00})
_, _, err := ReadFrame(buf)
if err == nil {
@ -209,7 +209,7 @@ func testQAFrameZeroPayload(t *testing.T) {
func testQAFrameConcurrentWrites(t *testing.T) {
// Multiple goroutines writing frames to the same connection.
// Frames must not interleave (each frame readable as a complete unit).
// This relies on the WALShipper's mutex test at the connection level.
// This relies on the WALShipper's mutex -- test at the connection level.
server, client := net.Pipe()
defer server.Close()
defer client.Close()
@ -304,7 +304,7 @@ func testQAShipperBarrierAfterDegraded(t *testing.T) {
}
func testQAShipperStaleEpochNoShippedLSN(t *testing.T) {
// Ship with epoch != current entry silently dropped, shippedLSN unchanged.
// Ship with epoch != current -> entry silently dropped, shippedLSN unchanged.
currentEpoch := uint64(5)
s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return currentEpoch })
@ -333,14 +333,14 @@ func testQAShipperDegradedPermanent(t *testing.T) {
if err != nil {
return
}
conn.Close() // immediately close Ship will get write error
conn.Close() // immediately close -> Ship will get write error
}
}()
s := NewWALShipper(ln.Addr().String(), ln.Addr().String(), func() uint64 { return 1 })
defer s.Stop()
// Ship triggers connection → immediate close → write error → degraded.
// Ship triggers connection -> immediate close -> write error -> degraded.
entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('D')}
_ = s.Ship(entry) // first attempt may or may not degrade
_ = s.Ship(entry) // second attempt -- more likely to hit closed conn
@ -359,7 +359,7 @@ func testQAShipperDegradedPermanent(t *testing.T) {
}
lsnAfter := s.ShippedLSN()
if lsnAfter > lsnBefore {
t.Errorf("ShippedLSN advanced from %d to %d after degradation should not ship when degraded", lsnBefore, lsnAfter)
t.Errorf("ShippedLSN advanced from %d to %d after degradation -- should not ship when degraded", lsnBefore, lsnAfter)
}
// Barrier must immediately return ErrReplicaDegraded.
@ -370,7 +370,7 @@ func testQAShipperDegradedPermanent(t *testing.T) {
}
func testQAShipperConcurrentShipStop(t *testing.T) {
// Ship and Stop from concurrent goroutines no deadlock, no panic.
// Ship and Stop from concurrent goroutines -> no deadlock, no panic.
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("listen: %v", err)
@ -423,10 +423,10 @@ func testQAShipperConcurrentShipStop(t *testing.T) {
func testQAReceiverOutOfOrderLSN(t *testing.T) {
// Tests contiguity enforcement:
// 1. Contiguous LSN=1..5 all applied
// 2. Duplicate LSN=3 skipped
// 3. Gap LSN=7 (skipping 6) rejected
// 4. Correct next LSN=6 accepted
// 1. Contiguous LSN=1..5 -> all applied
// 2. Duplicate LSN=3 -> skipped
// 3. Gap LSN=7 (skipping 6) -> rejected
// 4. Correct next LSN=6 -> accepted
dir := t.TempDir()
cfg := DefaultConfig()
cfg.FlushInterval = 5 * time.Millisecond
@ -455,7 +455,7 @@ func testQAReceiverOutOfOrderLSN(t *testing.T) {
}
defer conn.Close()
// Case 1: Contiguous LSN=1..5 all applied.
// Case 1: Contiguous LSN=1..5 -> all applied.
for lsn := uint64(1); lsn <= 5; lsn++ {
entry := &WALEntry{LSN: lsn, Epoch: 1, Type: EntryTypeWrite, LBA: lsn - 1, Length: 4096, Data: makeBlock(byte('A' + lsn))}
encoded, _ := entry.Encode()
@ -468,7 +468,7 @@ func testQAReceiverOutOfOrderLSN(t *testing.T) {
t.Fatalf("ReceivedLSN = %d after LSN 1-5, want 5", recv.ReceivedLSN())
}
// Case 2: Duplicate LSN=3 skipped.
// Case 2: Duplicate LSN=3 -> skipped.
entry3 := &WALEntry{LSN: 3, Epoch: 1, Type: EntryTypeWrite, LBA: 2, Length: 4096, Data: makeBlock('Z')}
encoded3, _ := entry3.Encode()
if err := WriteFrame(conn, MsgWALEntry, encoded3); err != nil {
@ -479,7 +479,7 @@ func testQAReceiverOutOfOrderLSN(t *testing.T) {
t.Errorf("ReceivedLSN = %d after duplicate LSN=3, want 5", recv.ReceivedLSN())
}
// Case 3: Gap LSN=7 (skips 6) rejected by contiguity check.
// Case 3: Gap LSN=7 (skips 6) -> rejected by contiguity check.
entry7 := &WALEntry{LSN: 7, Epoch: 1, Type: EntryTypeWrite, LBA: 6, Length: 4096, Data: makeBlock('G')}
encoded7, _ := entry7.Encode()
if err := WriteFrame(conn, MsgWALEntry, encoded7); err != nil {
@ -490,7 +490,7 @@ func testQAReceiverOutOfOrderLSN(t *testing.T) {
t.Errorf("ReceivedLSN = %d after gap LSN=7, want 5 (gap must be rejected)", recv.ReceivedLSN())
}
// Case 4: Correct next LSN=6 accepted.
// Case 4: Correct next LSN=6 -> accepted.
entry6 := &WALEntry{LSN: 6, Epoch: 1, Type: EntryTypeWrite, LBA: 5, Length: 4096, Data: makeBlock('F')}
encoded6, _ := entry6.Encode()
if err := WriteFrame(conn, MsgWALEntry, encoded6); err != nil {
@ -550,7 +550,7 @@ func testQAReceiverConcurrentDataConns(t *testing.T) {
t.Errorf("ReceivedLSN = %d after 40 contiguous entries, want 40", got)
}
// Second connection sending a duplicate LSN=5 must be skipped.
// Second connection sending a duplicate LSN=5 -> must be skipped.
conn2, err := net.Dial("tcp", recv.DataAddr())
if err != nil {
t.Fatalf("dial conn2: %v", err)
@ -571,7 +571,7 @@ func testQAReceiverConcurrentDataConns(t *testing.T) {
func testQAReceiverFutureEpochRejected(t *testing.T) {
// R1 fix: replica must reject entries with epoch > local (not just <).
// A future epoch in the WAL stream is a protocol violation only master
// A future epoch in the WAL stream is a protocol violation -- only master
// can bump epochs via SetEpoch.
dir := t.TempDir()
cfg := DefaultConfig()
@ -601,7 +601,7 @@ func testQAReceiverFutureEpochRejected(t *testing.T) {
}
defer conn.Close()
// Send entry with stale epoch=3 rejected.
// Send entry with stale epoch=3 -> rejected.
stale := &WALEntry{LSN: 1, Epoch: 3, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('S')}
encodedStale, _ := stale.Encode()
if err := WriteFrame(conn, MsgWALEntry, encodedStale); err != nil {
@ -612,7 +612,7 @@ func testQAReceiverFutureEpochRejected(t *testing.T) {
t.Errorf("ReceivedLSN = %d after stale epoch entry, want 0", recv.ReceivedLSN())
}
// Send entry with future epoch=10 also rejected (R1 fix).
// Send entry with future epoch=10 -> also rejected (R1 fix).
future := &WALEntry{LSN: 1, Epoch: 10, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('F')}
encodedFuture, _ := future.Encode()
if err := WriteFrame(conn, MsgWALEntry, encodedFuture); err != nil {
@ -623,7 +623,7 @@ func testQAReceiverFutureEpochRejected(t *testing.T) {
t.Errorf("ReceivedLSN = %d after future epoch entry, want 0 (future epoch must be rejected)", recv.ReceivedLSN())
}
// Send entry with correct epoch=5 accepted.
// Send entry with correct epoch=5 -> accepted.
correct := &WALEntry{LSN: 1, Epoch: 5, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('C')}
encodedCorrect, _ := correct.Encode()
if err := WriteFrame(conn, MsgWALEntry, encodedCorrect); err != nil {
@ -636,7 +636,7 @@ func testQAReceiverFutureEpochRejected(t *testing.T) {
}
func testQAReceiverBarrierEpochMismatch(t *testing.T) {
// Barrier with wrong epoch immediate BarrierEpochMismatch (no wait).
// Barrier with wrong epoch -> immediate BarrierEpochMismatch (no wait).
dir := t.TempDir()
cfg := DefaultConfig()
cfg.FlushInterval = 100 * time.Millisecond
@ -665,7 +665,7 @@ func testQAReceiverBarrierEpochMismatch(t *testing.T) {
}
defer ctrlConn.Close()
// Barrier with stale epoch=3 immediate EpochMismatch.
// Barrier with stale epoch=3 -> immediate EpochMismatch.
req := EncodeBarrierRequest(BarrierRequest{LSN: 1, Epoch: 3})
start := time.Now()
if err := WriteFrame(ctrlConn, MsgBarrierReq, req); err != nil {
@ -686,7 +686,7 @@ func testQAReceiverBarrierEpochMismatch(t *testing.T) {
t.Errorf("epoch mismatch barrier took %v, expected immediate response", elapsed)
}
// Barrier with future epoch=99 also immediate EpochMismatch.
// Barrier with future epoch=99 -> also immediate EpochMismatch.
req2 := EncodeBarrierRequest(BarrierRequest{LSN: 1, Epoch: 99})
if err := WriteFrame(ctrlConn, MsgBarrierReq, req2); err != nil {
t.Fatalf("send barrier future: %v", err)
@ -703,7 +703,7 @@ func testQAReceiverBarrierEpochMismatch(t *testing.T) {
}
func testQAReceiverBarrierBeforeEntries(t *testing.T) {
// Barrier arrives BEFORE data entries must wait, then succeed when entries arrive.
// Barrier arrives BEFORE data entries -> must wait, then succeed when entries arrive.
dir := t.TempDir()
cfg := DefaultConfig()
cfg.FlushInterval = 5 * time.Millisecond
@ -751,7 +751,7 @@ func testQAReceiverBarrierBeforeEntries(t *testing.T) {
case resp := <-barrierDone:
t.Fatalf("barrier returned immediately with status %d, expected it to wait", resp.Status)
case <-time.After(50 * time.Millisecond):
// Good barrier is waiting.
// Good -- barrier is waiting.
}
// Now send entries LSN=1,2,3.
@ -781,7 +781,7 @@ func testQAReceiverBarrierBeforeEntries(t *testing.T) {
}
func testQAReceiverBarrierTimeoutNoEntries(t *testing.T) {
// Barrier for LSN=999 with no entries must timeout (not hang forever).
// Barrier for LSN=999 with no entries -> must timeout (not hang forever).
// Uses short configurable barrierTimeout for fast test.
dir := t.TempDir()
cfg := DefaultConfig()
@ -834,7 +834,7 @@ func testQAReceiverBarrierTimeoutNoEntries(t *testing.T) {
}
func testQAReceiverStopUnblocksBarrier(t *testing.T) {
// Barrier waiting for entries → Stop called → barrier must return (not hang).
// Barrier waiting for entries -> Stop called -> barrier must return (not hang).
dir := t.TempDir()
cfg := DefaultConfig()
cfg.FlushInterval = 100 * time.Millisecond
@ -877,8 +877,8 @@ func testQAReceiverStopUnblocksBarrier(t *testing.T) {
ctrlConn.SetReadDeadline(time.Now().Add(3 * time.Second))
_, payload, err := ReadFrame(ctrlConn)
if err != nil {
// Connection closed by Stop also acceptable.
t.Logf("barrier read after Stop: %v (connection closed acceptable)", err)
// Connection closed by Stop -> also acceptable.
t.Logf("barrier read after Stop: %v (connection closed -- acceptable)", err)
return
}
if payload[0] != BarrierTimeout {
@ -889,7 +889,7 @@ func testQAReceiverStopUnblocksBarrier(t *testing.T) {
// --- QA-4A-CP2-4: DistributedSync Adversarial ---
func testQADSyncLocalFailReturnsError(t *testing.T) {
// Local fsync fails must return error regardless of remote result.
// Local fsync fails -> must return error regardless of remote result.
localErr := errors.New("disk on fire")
vol := &BlockVol{}
@ -897,7 +897,7 @@ func testQADSyncLocalFailReturnsError(t *testing.T) {
syncFn := MakeDistributedSync(
func() error { return localErr },
nil, // no shipper local only
nil, // no shipper -> local only
vol,
)
@ -908,7 +908,7 @@ func testQADSyncLocalFailReturnsError(t *testing.T) {
}
func testQADSyncRemoteFailDegrades(t *testing.T) {
// Remote barrier fails → local succeeded → must return nil + degrade shipper.
// Remote barrier fails -> local succeeded -> must return nil + degrade shipper.
primary, replica := createReplicaPair(t)
defer primary.Close()
defer replica.Close()
@ -929,9 +929,9 @@ func testQADSyncRemoteFailDegrades(t *testing.T) {
}
primary.shipper.ctrlMu.Unlock()
// SyncCache triggers distributed sync → barrier fails → degrade.
// SyncCache triggers distributed sync -> barrier fails -> degrade.
err := primary.SyncCache()
// Should succeed (local fsync succeeded, remote degraded returned nil).
// Should succeed (local fsync succeeded, remote degraded -- returned nil).
if err != nil {
t.Errorf("SyncCache after replica stop: got %v, want nil (local succeeded, remote should degrade silently)", err)
}
@ -943,7 +943,7 @@ func testQADSyncRemoteFailDegrades(t *testing.T) {
}
func testQADSyncBothFail(t *testing.T) {
// Both local and remote fail must return local error.
// Both local and remote fail -> must return local error.
localErr := errors.New("local disk fail")
vol := &BlockVol{}
@ -1011,14 +1011,14 @@ func testQADSyncParallelExecution(t *testing.T) {
// Verify local fsync actually ran (not skipped).
if localStart.Load() == 0 {
t.Error("local fsync was never called distributed sync may have fallen back to local-only")
t.Error("local fsync was never called -- distributed sync may have fallen back to local-only")
}
}
// --- QA-4A-CP2-5: End-to-end Adversarial ---
func testQAE2EReplicaDataMatchesPrimary(t *testing.T) {
// Write N blocks on primary verify replica has identical data via ReadLBA.
// Write N blocks on primary -> verify replica has identical data via ReadLBA.
primary, replica := createReplicaPair(t)
defer primary.Close()
defer replica.Close()
@ -1059,7 +1059,7 @@ func testQAE2EReplicaDataMatchesPrimary(t *testing.T) {
}
func testQAE2EClosePrimaryDuringShip(t *testing.T) {
// Close primary while writes + shipping in progress no hang, no panic.
// Close primary while writes + shipping in progress -> no hang, no panic.
primary, replica := createReplicaPair(t)
defer replica.Close()
@ -1072,7 +1072,7 @@ func testQAE2EClosePrimaryDuringShip(t *testing.T) {
for i := 0; i < 100; i++ {
err := primary.WriteLBA(uint64(i%256), makeBlock(byte('W')))
if err != nil {
return // closed or WAL full expected
return // closed or WAL full -- expected
}
}
}()

1088
weed/storage/blockvol/qa_phase4a_cp3_test.go
File diff suppressed because it is too large
View File

840
weed/storage/blockvol/qa_phase4a_cp4a_test.go

@ -0,0 +1,840 @@
package blockvol
import (
"errors"
"fmt"
"path/filepath"
"sync"
"sync/atomic"
"testing"
"time"
)
// TestQAPhase4ACP4a runs adversarial tests for Phase 4A CP4a:
// SimulatedMaster, HandleAssignment sequences, failover, and Status().
func TestQAPhase4ACP4a(t *testing.T) {
	type subtest struct {
		name string
		run  func(t *testing.T)
	}
	cases := []subtest{
		// --- SimulatedMaster correctness ---
		{name: "master_epoch_monotonic_across_operations", run: testMasterEpochMonotonic},
		{name: "master_promote_replica_both_fenced", run: testMasterPromoteReplicaBothFenced},
		{name: "master_demote_then_grant_reuses_vol", run: testMasterDemoteThenGrantReusesVol},
		// --- HandleAssignment edge cases ---
		{name: "assign_same_role_equal_epoch_no_persist", run: testAssignSameRoleEqualEpochNoPersist},
		{name: "assign_none_to_replica_epoch_not_persisted", run: testAssignNoneToReplicaEpochNotPersisted},
		{name: "assign_demote_epoch_less_than_current", run: testAssignDemoteEpochLessThanCurrent},
		{name: "assign_closed_volume", run: testAssignClosedVolume},
		{name: "assign_concurrent_demote_and_refresh", run: testAssignConcurrentDemoteAndRefresh},
		{name: "assign_rapid_promote_demote_10x", run: testAssignRapidPromoteDemote10x},
		// --- Status() edge cases ---
		{name: "status_fresh_primary_no_writes", run: testStatusFreshPrimaryNoWrites},
		{name: "status_during_writes", run: testStatusDuringWrites},
		{name: "status_all_roles", run: testStatusAllRoles},
		{name: "status_after_close", run: testStatusAfterClose},
		{name: "status_checkpoint_advances_after_flush", run: testStatusCheckpointAdvances},
		// --- Failover adversarial ---
		{name: "failover_triple_A_B_A_B", run: testFailoverTriple},
		{name: "failover_concurrent_promote_two_vols", run: testFailoverConcurrentPromoteTwoVols},
		{name: "failover_demote_during_active_writes", run: testFailoverDemoteDuringActiveWrites},
		{name: "failover_epoch_always_increases", run: testFailoverEpochAlwaysIncreases},
		{name: "failover_rebuild_then_immediate_failover", run: testFailoverRebuildThenImmediateFailover},
		{name: "failover_dead_zone_no_writes_anywhere", run: testFailoverDeadZoneNoWritesAnywhere},
		// --- beginOp/endOp adversarial ---
		{name: "ops_begin_after_close", run: testOpsBeginAfterClose},
		{name: "ops_drain_blocks_demote", run: testOpsDrainBlocksDemote},
		{name: "ops_concurrent_begin_end_100", run: testOpsConcurrentBeginEnd100},
	}
	for _, tc := range cases {
		// Delegate directly: each case already has the subtest signature.
		t.Run(tc.name, tc.run)
	}
}
// cp4aVol creates a minimal test volume.
func cp4aVol(t *testing.T, cfgs ...BlockVolConfig) *BlockVol {
	t.Helper()
	// An optional single config may be supplied; default is the zero value.
	var cfg BlockVolConfig
	if len(cfgs) > 0 {
		cfg = cfgs[0]
	}
	path := filepath.Join(t.TempDir(), "cp4a.blockvol")
	vol, err := CreateBlockVol(path, CreateOptions{
		VolumeSize: 1 * 1024 * 1024,
		BlockSize:  4096,
	}, cfg)
	if err != nil {
		t.Fatalf("CreateBlockVol: %v", err)
	}
	return vol
}
// cp4aBlock returns a 4 KiB buffer with every byte set to b.
func cp4aBlock(b byte) []byte {
	blk := make([]byte, 0, 4096)
	for len(blk) < cap(blk) {
		blk = append(blk, b)
	}
	return blk
}
// ---------------------------------------------------------------------------
// SimulatedMaster correctness
// ---------------------------------------------------------------------------
// testMasterEpochMonotonic verifies that SimulatedMaster operations
// (grant, bump, demote, rebuild + re-grant) only ever move the volume's
// epoch forward.
func testMasterEpochMonotonic(t *testing.T) {
	v := cp4aVol(t)
	defer v.Close()
	m := NewSimulatedMaster(30 * time.Second)
	// GrantPrimary -> epoch 1
	m.GrantPrimary(v)
	e1 := v.Epoch()
	// BumpEpoch -> epoch 2
	m.BumpEpoch(v)
	e2 := v.Epoch()
	// Demote -> epoch 3
	m.Demote(v)
	e3 := v.Epoch()
	// Strict inequality at each step: any equality is a regression.
	if e1 >= e2 || e2 >= e3 {
		t.Errorf("epoch not monotonic: %d -> %d -> %d", e1, e2, e3)
	}
	// Rebuild back to Primary -> epoch 4 then 5
	m.InitiateRebuild(v)
	v.SetRole(RoleReplica)
	m.GrantPrimary(v)
	e4 := v.Epoch()
	if e4 <= e3 {
		t.Errorf("epoch after re-promote (%d) should be > demote epoch (%d)", e4, e3)
	}
}
// testMasterPromoteReplicaBothFenced verifies that after PromoteReplica
// the old primary is fenced (writes rejected), the new primary accepts
// writes, and both volumes end up at the same epoch.
func testMasterPromoteReplicaBothFenced(t *testing.T) {
	a := cp4aVol(t)
	defer a.Close()
	b := cp4aVol(t)
	defer b.Close()
	m := NewSimulatedMaster(30 * time.Second)
	m.GrantPrimary(a)
	// Write to A.
	if err := a.WriteLBA(0, cp4aBlock('A')); err != nil {
		t.Fatalf("A write: %v", err)
	}
	// Assign B as replica.
	m.AssignReplica(b)
	// PromoteReplica: demotes A, promotes B.
	_, err := m.PromoteReplica(a, b)
	if err != nil {
		t.Fatalf("PromoteReplica: %v", err)
	}
	// A must be fenced (Stale).
	if err := a.WriteLBA(0, cp4aBlock('X')); err == nil {
		t.Error("old primary should be fenced after PromoteReplica")
	}
	// B must be writable (Primary).
	if err := b.WriteLBA(0, cp4aBlock('B')); err != nil {
		t.Errorf("new primary should accept writes: %v", err)
	}
	// Both should have the same epoch.
	if a.Epoch() != b.Epoch() {
		t.Errorf("epochs should match: A=%d B=%d", a.Epoch(), b.Epoch())
	}
}
// testMasterDemoteThenGrantReusesVol walks one volume through the full
// role cycle (Primary -> Stale -> Rebuilding -> Replica -> Primary) and
// verifies data written before and after the cycle is readable.
func testMasterDemoteThenGrantReusesVol(t *testing.T) {
	v := cp4aVol(t)
	defer v.Close()
	m := NewSimulatedMaster(30 * time.Second)
	// Primary -> Stale -> Rebuilding -> Replica -> Primary again.
	m.GrantPrimary(v)
	if err := v.WriteLBA(0, cp4aBlock('1')); err != nil {
		t.Fatalf("first write: %v", err)
	}
	m.Demote(v)
	m.InitiateRebuild(v)
	v.SetRole(RoleReplica)
	m.GrantPrimary(v)
	// Write again with new epoch.
	if err := v.WriteLBA(1, cp4aBlock('2')); err != nil {
		t.Fatalf("second write after re-promotion: %v", err)
	}
	// Read both blocks. Fail cleanly on a read error -- the previous
	// `d0, _ :=` form would panic on a nil slice instead of failing.
	d0, err := v.ReadLBA(0, 4096)
	if err != nil {
		t.Fatalf("read block 0: %v", err)
	}
	d1, err := v.ReadLBA(1, 4096)
	if err != nil {
		t.Fatalf("read block 1: %v", err)
	}
	if d0[0] != '1' {
		t.Errorf("block 0: got %c, want '1'", d0[0])
	}
	if d1[0] != '2' {
		t.Errorf("block 1: got %c, want '2'", d1[0])
	}
}
// ---------------------------------------------------------------------------
// HandleAssignment edge cases
// ---------------------------------------------------------------------------
// testAssignSameRoleEqualEpochNoPersist checks that re-assigning the same
// role at the same epoch leaves the persisted epoch untouched but DOES
// refresh the lease (this is the lease-renewal path).
func testAssignSameRoleEqualEpochNoPersist(t *testing.T) {
	v := cp4aVol(t)
	defer v.Close()
	// Promote to Primary at epoch 1.
	v.HandleAssignment(1, RolePrimary, 30*time.Second)
	// Same role, same epoch -- should be a no-op on epoch, but refreshes lease.
	// Revoke first so the refresh below is observable.
	v.lease.Revoke()
	if v.lease.IsValid() {
		t.Fatal("lease should be revoked")
	}
	err := v.HandleAssignment(1, RolePrimary, 30*time.Second)
	if err != nil {
		t.Fatalf("same role/epoch assignment: %v", err)
	}
	if !v.lease.IsValid() {
		t.Error("lease should be refreshed on same-role assignment")
	}
	if v.Epoch() != 1 {
		t.Errorf("epoch should remain 1, got %d", v.Epoch())
	}
}
// testAssignNoneToReplicaEpochNotPersisted exercises the None -> Replica
// transition: the in-memory masterEpoch is recorded at 5, while the
// persisted Epoch() is not required to follow until a later promotion.
func testAssignNoneToReplicaEpochNotPersisted(t *testing.T) {
	v := cp4aVol(t)
	defer v.Close()
	// None -> Replica at epoch 5.
	if err := v.HandleAssignment(5, RoleReplica, 0); err != nil {
		t.Fatalf("None->Replica: %v", err)
	}
	if got := v.Role(); got != RoleReplica {
		t.Errorf("role: got %s, want Replica", got)
	}
	// masterEpoch is set, but Epoch() (persisted) may or may not be 5.
	// This is by design: replicas don't need persisted epoch until promoted.
	// Key invariant: promote() will call SetEpoch, so the epoch gets
	// persisted when it matters.
	if got := v.masterEpoch.Load(); got != 5 {
		t.Errorf("masterEpoch: got %d, want 5", got)
	}
}
// testAssignDemoteEpochLessThanCurrent verifies the BUG-CP4A-1 guard:
// a demotion carrying an epoch lower than the current one is rejected
// with ErrEpochRegression and leaves role and epoch untouched.
func testAssignDemoteEpochLessThanCurrent(t *testing.T) {
	// BUG-CP4A-1 (fixed): demote with epoch < current must be rejected
	// to preserve monotonicity.
	v := cp4aVol(t)
	defer v.Close()
	// Promote at epoch 5.
	v.HandleAssignment(5, RolePrimary, 30*time.Second)
	// Demote with epoch 3 (stale) -- must fail with ErrEpochRegression.
	err := v.HandleAssignment(3, RoleStale, 0)
	if err == nil {
		t.Fatal("expected error for epoch regression, got nil")
	}
	if !errors.Is(err, ErrEpochRegression) {
		t.Fatalf("expected ErrEpochRegression, got: %v", err)
	}
	// Volume should still be Primary at epoch 5 (demote was rejected).
	if v.Role() != RolePrimary {
		t.Errorf("role should still be Primary, got %s", v.Role())
	}
	if v.Epoch() != 5 {
		t.Errorf("epoch should still be 5, got %d", v.Epoch())
	}
}
// testAssignClosedVolume documents (rather than prescribes) the behavior
// of HandleAssignment on a closed volume: the call may error, but it
// must not panic.
func testAssignClosedVolume(t *testing.T) {
	v := cp4aVol(t)
	v.HandleAssignment(1, RolePrimary, 30*time.Second)
	v.Close()
	// HandleAssignment on closed volume -- should it error?
	// Currently it acquires assignMu and proceeds. SetEpoch writes to fd,
	// which is closed. This should fail with an I/O error.
	err := v.HandleAssignment(2, RolePrimary, 30*time.Second)
	t.Logf("HandleAssignment on closed vol: %v", err)
	// We don't prescribe the exact error, but it shouldn't panic.
}
// testAssignConcurrentDemoteAndRefresh races a demotion against a
// same-role lease refresh. assignMu serializes the two, so either
// ordering is acceptable; the test only requires no panic and a
// determinate final role.
func testAssignConcurrentDemoteAndRefresh(t *testing.T) {
	v := cp4aVol(t)
	defer v.Close()
	v.HandleAssignment(1, RolePrimary, 30*time.Second)
	v.drainTimeout = 2 * time.Second
	var wg sync.WaitGroup
	// Buffered to capacity 2 so neither goroutine can block on send.
	results := make(chan string, 2)
	// Goroutine 1: demote.
	wg.Add(1)
	go func() {
		defer wg.Done()
		err := v.HandleAssignment(2, RoleStale, 0)
		if err != nil {
			results <- fmt.Sprintf("demote: %v", err)
		} else {
			results <- "demote: ok"
		}
	}()
	// Goroutine 2: same-role refresh (races with demote for assignMu).
	wg.Add(1)
	go func() {
		defer wg.Done()
		err := v.HandleAssignment(1, RolePrimary, 30*time.Second)
		if err != nil {
			results <- fmt.Sprintf("refresh: %v", err)
		} else {
			results <- "refresh: ok"
		}
	}()
	wg.Wait()
	close(results)
	// One should win the assignMu. If demote wins first, the refresh
	// will see role=Stale and try Primary->Primary (same-role) or
	// Stale->Primary (invalid). Either way, no panic.
	for r := range results {
		t.Logf(" %s", r)
	}
	// Final role should be determinate.
	role := v.Role()
	t.Logf("final role: %s", role)
}
// testAssignRapidPromoteDemote10x hammers the promote/demote path ten
// times with strictly increasing epochs, routing through the
// Rebuilding -> Replica path once the volume is fenced, and checks the
// final role (Stale) and epoch (20).
func testAssignRapidPromoteDemote10x(t *testing.T) {
	v := cp4aVol(t)
	defer v.Close()
	v.drainTimeout = 100 * time.Millisecond
	for i := 0; i < 10; i++ {
		epoch := uint64(i*2 + 1)
		// Promote.
		if err := v.HandleAssignment(epoch, RolePrimary, 30*time.Second); err != nil {
			// After first demote, role is Stale. Can't go directly to Primary.
			// Need to go through Rebuilding -> Replica first. Check each
			// step: a silent failure here would surface as a confusing
			// promote or write error later in the loop.
			if errors.Is(err, ErrInvalidAssignment) && i > 0 {
				if err := v.HandleAssignment(epoch, RoleRebuilding, 0); err != nil {
					t.Fatalf("rebuilding assignment iter %d: %v", i, err)
				}
				if err := v.SetRole(RoleReplica); err != nil {
					t.Fatalf("SetRole(Replica) iter %d: %v", i, err)
				}
				if err := v.HandleAssignment(epoch, RolePrimary, 30*time.Second); err != nil {
					t.Fatalf("promote iter %d after rebuild: %v", i, err)
				}
			} else {
				t.Fatalf("promote iter %d: %v", i, err)
			}
		}
		// Write to verify writable.
		if err := v.WriteLBA(0, cp4aBlock(byte('0'+i%10))); err != nil {
			t.Fatalf("write iter %d: %v", i, err)
		}
		// Demote.
		demoteEpoch := epoch + 1
		if err := v.HandleAssignment(demoteEpoch, RoleStale, 0); err != nil {
			t.Fatalf("demote iter %d: %v", i, err)
		}
	}
	// Final: role should be Stale, epoch should be 20.
	if v.Role() != RoleStale {
		t.Errorf("final role: got %s, want Stale", v.Role())
	}
	if v.Epoch() != 20 {
		t.Errorf("final epoch: got %d, want 20", v.Epoch())
	}
}
// ---------------------------------------------------------------------------
// Status() edge cases
// ---------------------------------------------------------------------------
// testStatusFreshPrimaryNoWrites checks the Status() snapshot of a
// freshly promoted primary: epoch and role reflect the assignment, the
// lease is held, and WALHeadLSN is still 0 because nothing was written.
func testStatusFreshPrimaryNoWrites(t *testing.T) {
	v := cp4aVol(t)
	defer v.Close()
	v.HandleAssignment(1, RolePrimary, 30*time.Second)
	st := v.Status()
	if st.Epoch != 1 {
		t.Errorf("Epoch: got %d, want 1", st.Epoch)
	}
	if st.Role != RolePrimary {
		t.Errorf("Role: got %s, want Primary", st.Role)
	}
	if !st.HasLease {
		t.Error("HasLease: got false, want true")
	}
	// WALHeadLSN = nextLSN - 1 = 1 - 1 = 0 (no writes yet).
	if st.WALHeadLSN != 0 {
		t.Errorf("WALHeadLSN: got %d, want 0 (no writes)", st.WALHeadLSN)
	}
}
// testStatusDuringWrites checks that Status().WALHeadLSN advances as
// blocks are written.
func testStatusDuringWrites(t *testing.T) {
	v := cp4aVol(t)
	defer v.Close()
	v.HandleAssignment(1, RolePrimary, 30*time.Second)
	// Write some blocks, check status advances. Fail fast on write errors
	// so a fenced or broken volume doesn't masquerade as a Status() bug.
	for i := 0; i < 5; i++ {
		if err := v.WriteLBA(uint64(i), cp4aBlock(byte('A'+i))); err != nil {
			t.Fatalf("write %d: %v", i, err)
		}
	}
	st := v.Status()
	if st.WALHeadLSN < 4 {
		t.Errorf("WALHeadLSN: got %d, want >= 4 after 5 writes", st.WALHeadLSN)
	}
}
// testStatusAllRoles walks the volume through every role transition and
// checks that Status() reports the matching role and lease validity.
// The steps are order-dependent: each builds on the previous state.
func testStatusAllRoles(t *testing.T) {
	v := cp4aVol(t)
	defer v.Close()
	v.drainTimeout = 100 * time.Millisecond
	roleChecks := []struct {
		setup func()
		role  Role
		lease bool
	}{
		{
			setup: func() { v.HandleAssignment(1, RolePrimary, 30*time.Second) },
			role:  RolePrimary, lease: true,
		},
		{
			setup: func() { v.HandleAssignment(2, RoleStale, 0) },
			role:  RoleStale, lease: false,
		},
		{
			setup: func() { v.HandleAssignment(2, RoleRebuilding, 0) },
			role:  RoleRebuilding, lease: false,
		},
		{
			// Rebuild completion is simulated by a direct role set.
			setup: func() { v.SetRole(RoleReplica) },
			role:  RoleReplica, lease: false,
		},
		{
			setup: func() { v.HandleAssignment(3, RolePrimary, 30*time.Second) },
			role:  RolePrimary, lease: true,
		},
	}
	for _, rc := range roleChecks {
		rc.setup()
		st := v.Status()
		if st.Role != rc.role {
			t.Errorf("after transition: Role got %s, want %s", st.Role, rc.role)
		}
		if st.HasLease != rc.lease {
			t.Errorf("role %s: HasLease got %v, want %v", rc.role, st.HasLease, rc.lease)
		}
	}
}
// testStatusAfterClose ensures Status() on a closed volume does not
// panic; the returned values are logged rather than asserted.
func testStatusAfterClose(t *testing.T) {
	v := cp4aVol(t)
	v.HandleAssignment(1, RolePrimary, 30*time.Second)
	v.WriteLBA(0, cp4aBlock('X'))
	v.Close()
	// Status() on closed volume should not panic.
	st := v.Status()
	t.Logf("Status after close: epoch=%d role=%s walHead=%d lease=%v cp=%d",
		st.Epoch, st.Role, st.WALHeadLSN, st.HasLease, st.CheckpointLSN)
}
// testStatusCheckpointAdvances smoke-tests CheckpointLSN reporting around
// a flush cycle. It is deliberately timing-tolerant: the checkpoint may
// or may not advance within the waits, so the test logs before/after
// values and only requires that nothing panics.
func testStatusCheckpointAdvances(t *testing.T) {
	v := cp4aVol(t, BlockVolConfig{
		FlushInterval: 50 * time.Millisecond,
	})
	defer v.Close()
	v.HandleAssignment(1, RolePrimary, 30*time.Second)
	// Write a block.
	v.WriteLBA(0, cp4aBlock('C'))
	st1 := v.Status()
	initialCP := st1.CheckpointLSN
	// Force flush by syncing and waiting for flusher.
	v.SyncCache()
	time.Sleep(200 * time.Millisecond) // wait for flusher interval
	v.flusher.NotifyUrgent()
	time.Sleep(200 * time.Millisecond)
	st2 := v.Status()
	t.Logf("checkpoint: before=%d after=%d", initialCP, st2.CheckpointLSN)
	// Checkpoint may or may not have advanced depending on flusher timing.
	// The key thing is no panic and the value is plausible.
}
// ---------------------------------------------------------------------------
// Failover adversarial
// ---------------------------------------------------------------------------
// testFailoverTriple performs three consecutive failovers (A->B, B->A,
// A->B) with a simulated rebuild between each, then verifies final roles
// and that both volumes agree on an advanced epoch. Every failover and
// write step is checked so an early failure cannot make the final
// assertions pass vacuously.
func testFailoverTriple(t *testing.T) {
	a := cp4aVol(t)
	defer a.Close()
	b := cp4aVol(t)
	defer b.Close()
	m := NewSimulatedMaster(30 * time.Second)
	// Round 1: A primary.
	m.GrantPrimary(a)
	if err := a.WriteLBA(0, cp4aBlock('1')); err != nil {
		t.Fatalf("A write round 1: %v", err)
	}
	m.AssignReplica(b)
	// Failover 1: A -> B.
	if _, err := m.PromoteReplica(a, b); err != nil {
		t.Fatalf("failover 1 (A->B): %v", err)
	}
	if err := b.WriteLBA(1, cp4aBlock('2')); err != nil {
		t.Fatalf("B write after failover 1: %v", err)
	}
	// Rebuild A from B (simulate).
	m.InitiateRebuild(a)
	if err := a.SetRole(RoleReplica); err != nil {
		t.Fatalf("A SetRole(Replica): %v", err)
	}
	// Failover 2: B -> A.
	if _, err := m.PromoteReplica(b, a); err != nil {
		t.Fatalf("failover 2 (B->A): %v", err)
	}
	if err := a.WriteLBA(2, cp4aBlock('3')); err != nil {
		t.Fatalf("A write after failover 2: %v", err)
	}
	// Rebuild B from A (simulate).
	m.InitiateRebuild(b)
	if err := b.SetRole(RoleReplica); err != nil {
		t.Fatalf("B SetRole(Replica): %v", err)
	}
	// Failover 3: A -> B.
	if _, err := m.PromoteReplica(a, b); err != nil {
		t.Fatalf("failover 3 (A->B): %v", err)
	}
	if err := b.WriteLBA(3, cp4aBlock('4')); err != nil {
		t.Fatalf("B write after failover 3: %v", err)
	}
	// Verify B is primary, A is stale.
	if b.Role() != RolePrimary {
		t.Errorf("B role: %s, want Primary", b.Role())
	}
	if a.Role() != RoleStale {
		t.Errorf("A role: %s, want Stale", a.Role())
	}
	// After PromoteReplica, both volumes share the same epoch.
	if a.Epoch() != b.Epoch() {
		t.Errorf("epochs should match after PromoteReplica: A=%d B=%d", a.Epoch(), b.Epoch())
	}
	// Epoch should be > 1 after 3 failovers.
	if b.Epoch() < 3 {
		t.Errorf("epoch after 3 failovers: got %d, want >= 3", b.Epoch())
	}
}
// testFailoverConcurrentPromoteTwoVols promotes two independent volumes
// to Primary at the same epoch concurrently. Both succeed by design:
// per-volume code cannot detect the conflict, so split-brain prevention
// is the master's responsibility, not the volume's.
func testFailoverConcurrentPromoteTwoVols(t *testing.T) {
	// Adversarial: two volumes both try to become Primary at same epoch.
	// This simulates a split-brain scenario where master sends conflicting assignments.
	a := cp4aVol(t)
	defer a.Close()
	b := cp4aVol(t)
	defer b.Close()
	var wg sync.WaitGroup
	errA := make(chan error, 1)
	errB := make(chan error, 1)
	// Both try None -> Primary at epoch 1 concurrently.
	wg.Add(2)
	go func() {
		defer wg.Done()
		errA <- a.HandleAssignment(1, RolePrimary, 30*time.Second)
	}()
	go func() {
		defer wg.Done()
		errB <- b.HandleAssignment(1, RolePrimary, 30*time.Second)
	}()
	wg.Wait()
	// Both should succeed (they're independent volumes).
	// The split-brain prevention is at the master level, not per-volume.
	eA := <-errA
	eB := <-errB
	if eA != nil {
		t.Errorf("A promote: %v", eA)
	}
	if eB != nil {
		t.Errorf("B promote: %v", eB)
	}
	// Both are Primary -- this is the split-brain that master must prevent.
	if a.Role() != RolePrimary || b.Role() != RolePrimary {
		t.Errorf("both should be Primary: A=%s B=%s", a.Role(), b.Role())
	}
	t.Log("split-brain: both volumes Primary at epoch 1 -- master must prevent this")
}
// testFailoverDemoteDuringActiveWrites demotes a primary while 50 writer
// goroutines are in flight. Writes racing the drain may succeed or fail
// (both counts are logged, not asserted); the hard requirement is that
// after the demote returns, no write can succeed.
func testFailoverDemoteDuringActiveWrites(t *testing.T) {
	v := cp4aVol(t)
	defer v.Close()
	v.HandleAssignment(1, RolePrimary, 30*time.Second)
	v.drainTimeout = 2 * time.Second
	var wg sync.WaitGroup
	var writeSucceeded, writeFailed atomic.Int64
	// Start 50 concurrent writers.
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func(lba int) {
			defer wg.Done()
			for j := 0; j < 10; j++ {
				if err := v.WriteLBA(uint64(lba%256), cp4aBlock(byte('A'+j%26))); err != nil {
					writeFailed.Add(1)
				} else {
					writeSucceeded.Add(1)
				}
			}
		}(i)
	}
	// Let writers get started.
	time.Sleep(5 * time.Millisecond)
	// Demote.
	demoteErr := v.HandleAssignment(2, RoleStale, 0)
	wg.Wait()
	t.Logf("writes: %d succeeded, %d failed; demote err: %v",
		writeSucceeded.Load(), writeFailed.Load(), demoteErr)
	// After demote, no more writes should succeed.
	if err := v.WriteLBA(0, cp4aBlock('Z')); err == nil {
		t.Error("write should fail after demote")
	}
}
// testFailoverEpochAlwaysIncreases cycles promote/demote five times with
// strictly increasing master epochs and asserts the persisted epoch
// never regresses at either transition. (The original demote-side check
// was an empty if-branch that asserted nothing; it is now a real check.)
func testFailoverEpochAlwaysIncreases(t *testing.T) {
	v := cp4aVol(t)
	defer v.Close()
	v.drainTimeout = 100 * time.Millisecond
	var lastEpoch uint64
	for i := 0; i < 5; i++ {
		promoteEpoch := uint64(i*2 + 1)
		demoteEpoch := uint64(i*2 + 2)
		// Route through rebuild path after first demote: Stale cannot go
		// directly back to Primary.
		if i > 0 {
			if err := v.HandleAssignment(promoteEpoch, RoleRebuilding, 0); err != nil {
				t.Fatalf("rebuilding assignment iter %d: %v", i, err)
			}
			if err := v.SetRole(RoleReplica); err != nil {
				t.Fatalf("SetRole(Replica) iter %d: %v", i, err)
			}
		}
		if err := v.HandleAssignment(promoteEpoch, RolePrimary, 30*time.Second); err != nil {
			t.Fatalf("promote iter %d: %v", i, err)
		}
		if v.Epoch() <= lastEpoch && i > 0 {
			t.Errorf("epoch regression at promote iter %d: %d <= %d", i, v.Epoch(), lastEpoch)
		}
		lastEpoch = v.Epoch()
		if err := v.HandleAssignment(demoteEpoch, RoleStale, 0); err != nil {
			t.Fatalf("demote iter %d: %v", i, err)
		}
		// BUG-CP4A-1 would cause epoch to regress if demoteEpoch < promoteEpoch.
		// Epochs here always increase, so the persisted epoch after the
		// demote must strictly exceed the promote-time epoch.
		if v.Epoch() <= lastEpoch {
			t.Errorf("epoch regression at demote iter %d: %d <= %d", i, v.Epoch(), lastEpoch)
		}
		lastEpoch = v.Epoch()
	}
	t.Logf("final epoch after 5 failovers: %d", v.Epoch())
}
// testFailoverRebuildThenImmediateFailover rebuilds B from A over the
// rebuild server and then immediately fails over to B, verifying B is
// writable and A is fenced.
func testFailoverRebuildThenImmediateFailover(t *testing.T) {
	a := cp4aVol(t)
	defer a.Close()
	b := cp4aVol(t)
	defer b.Close()
	m := NewSimulatedMaster(30 * time.Second)
	// A primary, writes data.
	m.GrantPrimary(a)
	a.WriteLBA(0, cp4aBlock('R'))
	// Start rebuild server on A.
	// NOTE(review): return values of StartRebuildServer and StartRebuild
	// below are ignored -- if either can fail, the test should fail fast
	// here instead of producing confusing errors later; confirm signatures.
	a.StartRebuildServer("127.0.0.1:0")
	defer a.StopRebuildServer()
	// B: None -> Primary -> Stale -> Rebuilding.
	bEpoch := m.epoch + 1
	b.HandleAssignment(bEpoch, RolePrimary, 30*time.Second)
	b.HandleAssignment(bEpoch, RoleStale, 0)
	b.HandleAssignment(bEpoch, RoleRebuilding, 0)
	// Rebuild B from A.
	StartRebuild(b, a.rebuildServer.Addr(), 1, a.Epoch())
	// B is now Replica. Immediately failover: demote A, promote B.
	m.epoch = bEpoch + 1
	a.HandleAssignment(m.epoch, RoleStale, 0)
	b.HandleAssignment(m.epoch, RolePrimary, 30*time.Second)
	// Verify B can write.
	if err := b.WriteLBA(1, cp4aBlock('S')); err != nil {
		t.Fatalf("B write after immediate failover: %v", err)
	}
	// Verify A is fenced.
	if err := a.WriteLBA(0, cp4aBlock('X')); err == nil {
		t.Error("A should be fenced after demote")
	}
}
// testFailoverDeadZoneNoWritesAnywhere checks the window between demoting
// the old primary and promoting the new one: in that "dead zone" no
// volume (old primary, replica, or bystander) may accept a write.
func testFailoverDeadZoneNoWritesAnywhere(t *testing.T) {
	// After demote of A but before promote of B, NO volume should accept writes.
	// This is the "dead zone" -- both A and B must reject.
	a := cp4aVol(t)
	defer a.Close()
	b := cp4aVol(t)
	defer b.Close()
	c := cp4aVol(t)
	defer c.Close()
	m := NewSimulatedMaster(30 * time.Second)
	m.GrantPrimary(a)
	m.AssignReplica(b)
	// C is a third volume that also has a stale view.
	c.HandleAssignment(m.epoch, RoleReplica, 0)
	// Demote A.
	m.Demote(a)
	// Dead zone: nobody should accept writes.
	vols := []*BlockVol{a, b, c}
	for i, v := range vols {
		if err := v.WriteLBA(0, cp4aBlock('X')); err == nil {
			t.Errorf("volume %d (role=%s) accepted write in dead zone", i, v.Role())
		}
	}
	// Now promote B.
	b.HandleAssignment(m.epoch, RolePrimary, 30*time.Second)
	if err := b.WriteLBA(0, cp4aBlock('Y')); err != nil {
		t.Errorf("B should accept writes after promotion: %v", err)
	}
}
// ---------------------------------------------------------------------------
// beginOp/endOp adversarial
// ---------------------------------------------------------------------------
// testOpsBeginAfterClose verifies beginOp on a closed volume returns
// ErrVolumeClosed and leaves the outstanding-op counter balanced at 0.
func testOpsBeginAfterClose(t *testing.T) {
	v := cp4aVol(t)
	v.HandleAssignment(1, RolePrimary, 30*time.Second)
	v.Close()
	// beginOp after close should return ErrVolumeClosed.
	err := v.beginOp()
	if !errors.Is(err, ErrVolumeClosed) {
		t.Errorf("beginOp after close: got %v, want ErrVolumeClosed", err)
	}
	// opsOutstanding should be 0 (beginOp increments then decrements on closed).
	if ops := v.opsOutstanding.Load(); ops != 0 {
		t.Errorf("opsOutstanding: got %d, want 0", ops)
	}
}
// testOpsDrainBlocksDemote verifies a demotion waits for outstanding ops
// to drain: with 3 in-flight ops the demote must stay blocked until the
// last endOp, then complete and leave the volume Stale.
func testOpsDrainBlocksDemote(t *testing.T) {
	v := cp4aVol(t)
	defer v.Close()
	if err := v.HandleAssignment(1, RolePrimary, 30*time.Second); err != nil {
		t.Fatalf("promote: %v", err)
	}
	v.drainTimeout = 2 * time.Second
	// Simulate 3 in-flight ops. Fail fast if beginOp errors -- an
	// unbalanced begin/end count would make the demote below hang.
	for i := 0; i < 3; i++ {
		if err := v.beginOp(); err != nil {
			t.Fatalf("beginOp %d: %v", i, err)
		}
	}
	demoteDone := make(chan error, 1)
	go func() {
		demoteDone <- v.HandleAssignment(2, RoleStale, 0)
	}()
	// Demote should be blocked.
	select {
	case <-demoteDone:
		t.Fatal("demote should be blocked by outstanding ops")
	case <-time.After(50 * time.Millisecond):
		// Good, still blocked.
	}
	// Release 2 ops -- still blocked.
	v.endOp()
	v.endOp()
	select {
	case <-demoteDone:
		t.Fatal("demote should still be blocked with 1 op outstanding")
	case <-time.After(50 * time.Millisecond):
	}
	// Release last op.
	v.endOp()
	select {
	case err := <-demoteDone:
		if err != nil {
			t.Fatalf("demote failed: %v", err)
		}
	case <-time.After(2 * time.Second):
		t.Fatal("demote should have completed after all ops drained")
	}
	if v.Role() != RoleStale {
		t.Errorf("role: got %s, want Stale", v.Role())
	}
}
// testOpsConcurrentBeginEnd100 hammers beginOp/endOp from 100 goroutines
// (100 iterations each) and checks the outstanding-op counter returns to
// zero once every goroutine has finished.
func testOpsConcurrentBeginEnd100(t *testing.T) {
	v := cp4aVol(t)
	defer v.Close()
	v.HandleAssignment(1, RolePrimary, 30*time.Second)
	var workers sync.WaitGroup
	workers.Add(100)
	for g := 0; g < 100; g++ {
		go func() {
			defer workers.Done()
			for iter := 0; iter < 100; iter++ {
				if v.beginOp() != nil {
					return
				}
				// Brief work.
				v.endOp()
			}
		}()
	}
	workers.Wait()
	if ops := v.opsOutstanding.Load(); ops != 0 {
		t.Errorf("opsOutstanding: got %d, want 0 after all goroutines done", ops)
	}
}

2
weed/storage/blockvol/rebuild.go

@ -146,7 +146,7 @@ func (s *RebuildServer) handleWALCatchUp(conn net.Conn, req RebuildRequest) {
}
func (s *RebuildServer) handleFullExtent(conn net.Conn) {
// Capture snapshot LSN before streaming client will use this
// Capture snapshot LSN before streaming -- client will use this
// for a second catch-up scan to capture writes during copy.
snapshotLSN := s.vol.nextLSN.Load()

2
weed/storage/blockvol/recovery.go

@ -110,7 +110,7 @@ func RecoverWAL(fd *os.File, sb *Superblock, dirtyMap *DirtyMap) (RecoveryResult
// Decode and validate CRC.
entry, err := DecodeWALEntry(fullBuf)
if err != nil {
// CRC failure or corrupt entry stop here (torn write).
// CRC failure or corrupt entry -- stop here (torn write).
result.TornEntries++
break
}

10
weed/storage/blockvol/repl_proto.go

@ -87,11 +87,11 @@ func ReadFrame(r io.Reader) (msgType byte, payload []byte, err error) {
// Rebuild message types (on rebuild channel).
const (
MsgRebuildReq byte = 0x10 // client server
MsgRebuildEntry byte = 0x11 // server client: WAL entry
MsgRebuildExtent byte = 0x12 // server client: extent chunk
MsgRebuildDone byte = 0x13 // server client: stream complete
MsgRebuildError byte = 0x14 // server client: error
MsgRebuildReq byte = 0x10 // client -> server
MsgRebuildEntry byte = 0x11 // server -> client: WAL entry
MsgRebuildExtent byte = 0x12 // server -> client: extent chunk
MsgRebuildDone byte = 0x13 // server -> client: stream complete
MsgRebuildError byte = 0x14 // server -> client: error
)
// Rebuild request types.

2
weed/storage/blockvol/replica_apply.go

@ -186,7 +186,7 @@ func (r *ReplicaReceiver) handleDataConn(conn net.Conn) {
}
// applyEntry decodes and applies a single WAL entry to the local volume.
// The entire apply (LSN check → WAL append → dirty map → receivedLSN update)
// The entire apply (LSN check -> WAL append -> dirty map -> receivedLSN update)
// is serialized under mu to prevent TOCTOU races between concurrent entries.
func (r *ReplicaReceiver) applyEntry(payload []byte) error {
entry, err := DecodeWALEntry(payload)

2
weed/storage/blockvol/replica_barrier.go

@ -86,7 +86,7 @@ func (r *ReplicaReceiver) handleBarrier(req BarrierRequest) BarrierResponse {
default:
}
// Block on cond.Wait woken by applyEntry or timeout goroutine.
// Block on cond.Wait -- woken by applyEntry or timeout goroutine.
r.cond.Wait()
}
r.mu.Unlock()

2
weed/storage/blockvol/role.go

@ -81,7 +81,7 @@ func (v *BlockVol) SetRole(r Role) error {
}
return nil
}
// CAS failed another goroutine changed the role; retry.
// CAS failed -- another goroutine changed the role; retry.
}
}

2
weed/storage/blockvol/scripts/sw-block-attach.sh

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# sw-block-attach.sh Discover and attach a SeaweedFS block volume via iSCSI
# sw-block-attach.sh -- Discover and attach a SeaweedFS block volume via iSCSI
#
# Prerequisites:
# - Linux host with iscsiadm (open-iscsi) installed

4
weed/storage/blockvol/wal_writer.go

@ -228,7 +228,7 @@ func (w *WALWriter) ScanFrom(fd *os.File, walOffset uint64,
// Need at least a header to proceed.
if remaining < uint64(walEntryHeaderSize) {
// Too small for a header skip padding at end of region.
// Too small for a header -- skip padding at end of region.
pos += remaining
continue
}
@ -272,7 +272,7 @@ func (w *WALWriter) ScanFrom(fd *os.File, walOffset uint64,
entry, err := DecodeWALEntry(fullBuf)
if err != nil {
// CRC failure stop scanning (torn write).
// CRC failure -- stop scanning (torn write).
return nil
}

Loading…
Cancel
Save