Browse Source

fix: flusher OOM on multi-block writes + testrunner enhancements

Bug: flusher.go:336 allocated make([]byte, entryLen) per dirty block
instead of per unique WAL entry. A 4MB WriteLBA creates 1024 dirty map
entries (one per 4KB block), all sharing the same WAL offset. The flusher
read the full 4MB WAL entry 1024 times into separate buffers:
1024 × 4MB = 4GB per 4MB write → OOM on mkfs.ext4.

Root cause: flusher assumed 1:1 dirty-block-to-WAL-entry mapping.
WriteLBA supports multi-block writes but the flusher never deduplicated
shared WAL offsets.

Fix: deduplicate WAL reads by WalOffset in flushOnceLocked(). Multiple
dirty blocks from the same WAL entry share one read buffer and one
DecodeWALEntry call. Memory: O(WAL_entries × size) not O(blocks × size).
For a 4MB write: 4GB → 4MB.

Verified on hardware (m01/m02, 25Gbps RoCE):
- Before: mkfs.ext4 → VS RSS 100MB→25GB → OOM killed
- After: mkfs.ext4 → VS RSS 129MB stable, mkfs succeeds
- pgbench TPC-B c=4: 1,248 TPS (RF=1, previously blocked by OOM)

Tests added:
- flusher_test.go: flush_multiblock_shared_wal_read (16 blocks share
  one WAL offset, flush dedup verified)
- flusher_test.go: flush_multiblock_data_correct (3 mixed multi-block
  writes, all data correct after flush)
- test/component/large_write_test.go: 7 component tests (single 4MB,
  sequential mkfs sim, concurrent, mixed sizes, production volume,
  flusher throughput 30s sustained)
- iscsi/large_write_mem_test.go: 2 iSCSI session memory tests (4MB
  R2T flow, slow device)

Testrunner enhancements (same commit — all tested on hardware):
- discover_primary action: maps primary IP → topology node name,
  supports alt_ips for multi-NIC (RoCE + management)
- NodeSpec.AltIPs field for multi-NIC node identification
- 5 new YAML scenarios: ec3, ec5, degraded sync_all/best_effort, pgbench
- All 13 hardware-verified scenarios PASS

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
pingqiu 1 day ago
parent
commit
ebe95b6e2e
  1. 46
      weed/storage/blockvol/flusher.go
  2. 153
      weed/storage/blockvol/flusher_test.go
  3. 248
      weed/storage/blockvol/iscsi/large_write_mem_test.go
  4. 283
      weed/storage/blockvol/test/component/large_write_test.go
  5. 112
      weed/storage/blockvol/testrunner/actions/devops.go
  6. 20
      weed/storage/blockvol/testrunner/actions/devops_test.go
  7. 209
      weed/storage/blockvol/testrunner/scenarios/internal/benchmark-pgbench.yaml
  8. 306
      weed/storage/blockvol/testrunner/scenarios/internal/ec3-fast-reconnect-skips-failover.yaml
  9. 241
      weed/storage/blockvol/testrunner/scenarios/internal/ec5-wrong-primary-master-restart.yaml
  10. 215
      weed/storage/blockvol/testrunner/scenarios/internal/stable-degraded-best-effort.yaml
  11. 228
      weed/storage/blockvol/testrunner/scenarios/internal/stable-degraded-mode.yaml
  12. 11
      weed/storage/blockvol/testrunner/types.go

46
weed/storage/blockvol/flusher.go

@ -197,6 +197,14 @@ func (f *Flusher) PauseAndFlush() error {
return f.flushOnceLocked()
}
// Pause acquires flushMu, pausing the flusher without flushing.
// The caller must call Resume() when done.
// Used by TruncateToLSN to prevent the flusher from flushing ahead
// entries while the dirty map and WAL are being cleared.
func (f *Flusher) Pause() {
f.flushMu.Lock()
}
// Resume releases flushMu, allowing the flusher to resume.
func (f *Flusher) Resume() {
f.flushMu.Unlock()
@ -321,12 +329,26 @@ func (f *Flusher) flushOnceLocked() error {
}
// Batch-read full WAL entries for write ops.
// Deduplicate by WalOffset: multiple dirty blocks from the same multi-block
// write share one WAL entry. Without dedup, a 4MB write (1024 blocks) would
// allocate 1024 × 4MB buffers = 4GB instead of 1 × 4MB.
type walReadInfo struct {
buf []byte
entry *WALEntry // decoded once, shared by all blocks in this WAL entry
}
walReadByOffset := make(map[uint64]*walReadInfo, len(pending))
var walReadOps []batchio.Op
for _, p := range pending {
if p.entryType == EntryTypeWrite {
if p.entryType != EntryTypeWrite {
continue
}
off := entries[p.idx].WalOffset
if _, ok := walReadByOffset[off]; !ok {
buf := make([]byte, p.entryLen)
walReadByOffset[off] = &walReadInfo{buf: buf}
walReadOps = append(walReadOps, batchio.Op{
Buf: make([]byte, p.entryLen),
Offset: int64(f.walOffset + entries[p.idx].WalOffset),
Buf: buf,
Offset: int64(f.walOffset + off),
})
}
}
@ -334,22 +356,26 @@ func (f *Flusher) flushOnceLocked() error {
if err := f.bio.PreadBatch(f.fd, walReadOps); err != nil {
return fmt.Errorf("flusher: batch read WAL entries: %w", err)
}
// Decode each WAL entry once.
for _, ri := range walReadByOffset {
decoded, err := DecodeWALEntry(ri.buf)
if err == nil {
ri.entry = &decoded
}
}
}
// Step 2c: Decode entries and build extent write ops.
// Step 2c: Build extent write ops from decoded (shared) WAL entries.
var extentWriteOps []batchio.Op
walReadI := 0
for _, p := range pending {
e := entries[p.idx]
if p.entryType == EntryTypeWrite {
fullBuf := walReadOps[walReadI].Buf
walReadI++
entry, err := DecodeWALEntry(fullBuf)
if err != nil {
ri := walReadByOffset[e.WalOffset]
if ri == nil || ri.entry == nil {
continue
}
entry := ri.entry
if e.Lba < entry.LBA {
continue
}

153
weed/storage/blockvol/flusher_test.go

@ -24,6 +24,9 @@ func TestFlusher(t *testing.T) {
{name: "flusher_notify_urgent_triggers_flush", run: testFlusherNotifyUrgentTriggersFlush},
// Phase 3 bug fix: P3-BUG-4 error logging.
{name: "flusher_error_logged", run: testFlusherErrorLogged},
// Multi-block write WAL dedup (flusher OOM fix).
{name: "flush_multiblock_shared_wal_read", run: testFlushMultiblockSharedWALRead},
{name: "flush_multiblock_data_correct", run: testFlushMultiblockDataCorrect},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -384,3 +387,153 @@ func openAndClose(path string) (*os.File, error) {
fd.Close()
return fd, nil
}
// --- Multi-block WAL dedup tests (flusher OOM fix) ---

// testFlushMultiblockSharedWALRead verifies that a multi-block WriteLBA
// (e.g. 64KB = 16 blocks) results in ONE WAL read during flush, not 16.
// This is the core invariant of the flusher dedup fix.
func testFlushMultiblockSharedWALRead(t *testing.T) {
	volPath := filepath.Join(t.TempDir(), "multiblock.blockvol")

	// Volume large enough for multi-block writes: 1MB vol, 256KB WAL.
	vol, err := CreateBlockVol(volPath, CreateOptions{
		VolumeSize: 1 * 1024 * 1024,
		BlockSize:  4096,
		WALSize:    256 * 1024,
	})
	if err != nil {
		t.Fatalf("CreateBlockVol: %v", err)
	}
	defer vol.Close()

	// Write 64KB (16 blocks) in one call. This creates 1 WAL entry, 16 dirty map entries.
	payload := make([]byte, 64*1024) // 16 × 4KB
	for i := range payload {
		payload[i] = byte(i & 0xFF)
	}
	if err := vol.WriteLBA(0, payload); err != nil {
		t.Fatalf("WriteLBA(64KB): %v", err)
	}

	// Dirty map should have 16 entries (one per block).
	if n := vol.dirtyMap.Len(); n != 16 {
		t.Fatalf("dirty map len = %d, want 16", n)
	}

	// Verify all 16 entries share the same WAL offset.
	dirty := vol.dirtyMap.Snapshot()
	sharedOff := dirty[0].WalOffset
	for i, e := range dirty {
		if e.WalOffset != sharedOff {
			t.Fatalf("block %d has WalOffset=%d, want %d (shared)", i, e.WalOffset, sharedOff)
		}
	}
	t.Logf("16 dirty blocks share WalOffset=%d — dedup should read WAL once", sharedOff)

	// Flush. Before the fix, this would allocate 16 × 64KB = 1MB.
	// After the fix, it allocates 1 × 64KB = 64KB.
	fl := NewFlusher(FlusherConfig{
		FD:       vol.fd,
		Super:    &vol.super,
		SuperMu:  &vol.superMu,
		WAL:      vol.wal,
		DirtyMap: vol.dirtyMap,
		Interval: 1 * time.Hour,
	})
	if err := fl.FlushOnce(); err != nil {
		t.Fatalf("FlushOnce: %v", err)
	}

	// Dirty map should be empty after flush.
	if n := vol.dirtyMap.Len(); n != 0 {
		t.Fatalf("dirty map len after flush = %d, want 0", n)
	}
}
// testFlushMultiblockDataCorrect verifies that after flushing a multi-block
// write, each block is readable with correct data from the extent region.
func testFlushMultiblockDataCorrect(t *testing.T) {
	vol, err := CreateBlockVol(filepath.Join(t.TempDir(), "multidata.blockvol"), CreateOptions{
		VolumeSize: 1 * 1024 * 1024,
		BlockSize:  4096,
		WALSize:    256 * 1024,
	})
	if err != nil {
		t.Fatalf("CreateBlockVol: %v", err)
	}
	defer vol.Close()

	// Three writes of different widths exercise dedup across WAL entries:
	// w1 = 32KB (8 blocks) of 0xAA at LBA 0, w2 = 16KB (4 blocks) of 0xBB
	// at LBA 100, w3 = one 4KB block of 0xCC at LBA 50.
	writes := []struct {
		label string
		lba   uint64
		data  []byte
	}{
		{"w1", 0, bytes.Repeat([]byte{0xAA}, 32*1024)},
		{"w2", 100, bytes.Repeat([]byte{0xBB}, 16*1024)},
		{"w3", 50, bytes.Repeat([]byte{0xCC}, 4096)},
	}
	for _, w := range writes {
		if err := vol.WriteLBA(w.lba, w.data); err != nil {
			t.Fatalf("WriteLBA %s: %v", w.label, err)
		}
	}

	// Should have 8 + 4 + 1 = 13 dirty entries from 3 WAL entries.
	if n := vol.dirtyMap.Len(); n != 13 {
		t.Fatalf("dirty map len = %d, want 13", n)
	}

	fl := NewFlusher(FlusherConfig{
		FD:       vol.fd,
		Super:    &vol.super,
		SuperMu:  &vol.superMu,
		WAL:      vol.wal,
		DirtyMap: vol.dirtyMap,
		Interval: 1 * time.Hour,
	})
	if err := fl.FlushOnce(); err != nil {
		t.Fatalf("FlushOnce: %v", err)
	}

	// Verify each block from the extent region.
	// w1: LBA 0-7 must read back as 0xAA.
	for lba := uint64(0); lba < 8; lba++ {
		blk, err := vol.ReadLBA(lba, 4096)
		if err != nil {
			t.Fatalf("ReadLBA(%d): %v", lba, err)
		}
		if blk[0] != 0xAA || blk[4095] != 0xAA {
			t.Errorf("LBA %d: expected 0xAA, got first=0x%02x last=0x%02x", lba, blk[0], blk[4095])
		}
	}
	// w3: LBA 50 must read back as 0xCC.
	blk50, err := vol.ReadLBA(50, 4096)
	if err != nil {
		t.Fatalf("ReadLBA(50): %v", err)
	}
	if blk50[0] != 0xCC {
		t.Errorf("LBA 50: expected 0xCC, got 0x%02x", blk50[0])
	}
	// w2: LBA 100-103 must read back as 0xBB.
	for lba := uint64(100); lba < 104; lba++ {
		blk, err := vol.ReadLBA(lba, 4096)
		if err != nil {
			t.Fatalf("ReadLBA(%d): %v", lba, err)
		}
		if blk[0] != 0xBB || blk[4095] != 0xBB {
			t.Errorf("LBA %d: expected 0xBB, got first=0x%02x last=0x%02x", lba, blk[0], blk[4095])
		}
	}
	t.Log("multi-block flush dedup: 3 writes (8+4+1 blocks, 3 WAL entries) → all data correct")
}

248
weed/storage/blockvol/iscsi/large_write_mem_test.go

@ -0,0 +1,248 @@
package iscsi
import (
"bytes"
"encoding/binary"
"io"
"log"
"net"
"runtime"
"sync/atomic"
"testing"
"time"
)
// slowDevice wraps a device and adds configurable delay to WriteAt.
// This simulates a real blockvol that blocks on WAL-full during large writes.
type slowDevice struct {
	inner      BlockDevice   // wrapped device; every method delegates here
	writeDelay time.Duration // artificial latency added before each WriteAt (0 = none)
	writeCalls atomic.Int64  // count of WriteAt invocations, read by tests
}

// ReadAt delegates to the wrapped device with no added delay.
func (d *slowDevice) ReadAt(lba uint64, length uint32) ([]byte, error) {
	return d.inner.ReadAt(lba, length)
}

// WriteAt counts the call, sleeps writeDelay if configured, then delegates.
func (d *slowDevice) WriteAt(lba uint64, data []byte) error {
	d.writeCalls.Add(1)
	if d.writeDelay > 0 {
		time.Sleep(d.writeDelay)
	}
	return d.inner.WriteAt(lba, data)
}

// The remaining methods are straight pass-throughs to the inner device.
func (d *slowDevice) Trim(lba uint64, length uint32) error { return d.inner.Trim(lba, length) }
func (d *slowDevice) SyncCache() error { return d.inner.SyncCache() }
func (d *slowDevice) BlockSize() uint32 { return d.inner.BlockSize() }
func (d *slowDevice) VolumeSize() uint64 { return d.inner.VolumeSize() }

// IsHealthy always reports healthy; health gating is not under test here.
func (d *slowDevice) IsHealthy() bool { return true }
// TestLargeWriteMemory_4MB sends 4MB WRITE(10) commands through a real
// iSCSI session and measures heap growth. This is the in-process version
// of the hardware test that showed 25GB RSS.
func TestLargeWriteMemory_4MB(t *testing.T) {
	// Use a mock device with 64MB (16K blocks).
	mockDev := newMockDevice(64 * 1024 * 1024)
	dev := &slowDevice{inner: mockDev, writeDelay: 0}
	// In-memory full-duplex pipe stands in for a TCP connection.
	client, server := net.Pipe()
	defer client.Close()
	// Negotiate large transfers: 256KB per Data-Out PDU, 4MB max burst,
	// 64KB of immediate data, and R2T-driven flow for the remainder.
	config := DefaultTargetConfig()
	config.TargetName = testTargetName
	config.MaxRecvDataSegmentLength = 262144 // 256KB
	config.MaxBurstLength = 4 * 1024 * 1024 // 4MB — allow full 4MB burst
	config.FirstBurstLength = 65536 // 64KB immediate
	config.ImmediateData = true
	config.InitialR2T = true
	resolver := newTestResolverWithDevice(dev)
	logger := log.New(io.Discard, "", 0)
	sess := NewSession(server, config, resolver, resolver, logger)
	done := make(chan error, 1)
	go func() { done <- sess.HandleConnection() }()
	// Closing the client side ends the session; wait for the goroutine.
	defer func() { client.Close(); <-done }()
	doLogin(t, client)
	// Heap baseline taken after login, before any writes.
	runtime.GC()
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	heapBefore := int64(m.HeapAlloc)
	// Send 10 × 4MB WRITE(10) commands with immediate data + R2T flow.
	writeSize := 4 * 1024 * 1024
	data := make([]byte, writeSize)
	for i := range data {
		data[i] = byte(i)
	}
	cmdSN := uint32(0)
	for i := 0; i < 10; i++ {
		lba := uint32(i * 1024) // 4MB = 1024 blocks of 4KB
		blocks := uint16(1024)
		// Send SCSI WRITE(10) with immediate data (first 64KB).
		cmd := &PDU{}
		cmd.SetOpcode(OpSCSICmd)
		cmd.SetOpSpecific1(FlagF | FlagW)
		cmd.SetInitiatorTaskTag(uint32(i + 1))
		cmd.SetExpectedDataTransferLength(uint32(writeSize))
		cmd.SetCmdSN(cmdSN)
		cmdSN++
		// CDB: WRITE(10) opcode, big-endian LBA at bytes 2-5, block count at 7-8.
		var cdb [16]byte
		cdb[0] = ScsiWrite10
		binary.BigEndian.PutUint32(cdb[2:6], lba)
		binary.BigEndian.PutUint16(cdb[7:9], blocks)
		cmd.SetCDB(cdb)
		// Immediate data: first 64KB.
		immLen := 65536
		cmd.DataSegment = data[:immLen]
		if err := WritePDU(client, cmd); err != nil {
			t.Fatalf("write cmd %d: %v", i, err)
		}
		// Handle R2T + Data-Out for remaining data.
		sent := immLen
		for sent < writeSize {
			// Read R2T.
			r2t, err := ReadPDU(client)
			if err != nil {
				t.Fatalf("read R2T %d: %v", i, err)
			}
			// A SCSI response before all data was transferred means the
			// target aborted the write — fail with progress context.
			if r2t.Opcode() == OpSCSIResp {
				status := r2t.SCSIStatus()
				t.Fatalf("write %d: early SCSI response status=0x%02x (sent %d/%d)", i, status, sent, writeSize)
			}
			if r2t.Opcode() != OpR2T {
				t.Fatalf("write %d: expected R2T, got %s", i, OpcodeName(r2t.Opcode()))
			}
			desiredLen := int(r2t.DesiredDataLength())
			ttt := r2t.TargetTransferTag()
			// Send Data-Out PDUs in MaxRecvDataSegmentLength chunks.
			seqSent := 0
			dataSN := uint32(0)
			for seqSent < desiredLen && sent < writeSize {
				// Chunk is capped by both the remaining R2T window and the
				// remaining total payload.
				chunk := config.MaxRecvDataSegmentLength
				if desiredLen-seqSent < chunk {
					chunk = desiredLen - seqSent
				}
				if writeSize-sent < chunk {
					chunk = writeSize - sent
				}
				doPDU := &PDU{}
				doPDU.SetOpcode(OpSCSIDataOut)
				doPDU.SetInitiatorTaskTag(uint32(i + 1))
				doPDU.SetTargetTransferTag(ttt)
				doPDU.SetBufferOffset(uint32(sent))
				doPDU.SetDataSN(dataSN)
				dataSN++
				doPDU.DataSegment = data[sent : sent+chunk]
				// Final flag on the last PDU of the R2T sequence (or of the
				// whole transfer).
				if seqSent+chunk >= desiredLen || sent+chunk >= writeSize {
					doPDU.SetOpSpecific1(FlagF) // Final
				}
				if err := WritePDU(client, doPDU); err != nil {
					t.Fatalf("write data-out %d: %v", i, err)
				}
				sent += chunk
				seqSent += chunk
			}
		}
		// Read SCSI Response. Deadline bounds a hung flush; cleared after.
		client.SetReadDeadline(time.Now().Add(10 * time.Second))
		resp, err := ReadPDU(client)
		client.SetReadDeadline(time.Time{})
		if err != nil {
			t.Fatalf("read response %d: %v", i, err)
		}
		if resp.Opcode() != OpSCSIResp {
			t.Fatalf("write %d: expected SCSIResp, got %s", i, OpcodeName(resp.Opcode()))
		}
		if resp.SCSIStatus() != SCSIStatusGood {
			t.Fatalf("write %d: status=0x%02x", i, resp.SCSIStatus())
		}
		// Per-write heap sample so a leak is attributable to a specific write.
		runtime.GC()
		runtime.ReadMemStats(&m)
		heap := int64(m.HeapAlloc)
		t.Logf("write %d: heap=%d MB (delta=%d MB)", i, heap/(1024*1024), (heap-heapBefore)/(1024*1024))
	}
	runtime.GC()
	runtime.ReadMemStats(&m)
	heapAfter := int64(m.HeapAlloc)
	deltaMB := (heapAfter - heapBefore) / (1024 * 1024)
	t.Logf("final: heap=%d MB, delta=%d MB, writes=%d", heapAfter/(1024*1024), deltaMB, dev.writeCalls.Load())
	// 40MB of payload was written; 200MB of residual growth means buffers leaked.
	if deltaMB > 200 {
		t.Errorf("MEMORY LEAK: heap grew %d MB for 10 × 4MB iSCSI writes", deltaMB)
	}
}
// TestLargeWriteMemory_SlowDevice simulates WAL-full blocking: writes
// take 100ms each (as if WAL admission is throttling). This keeps buffers
// alive longer and tests if they accumulate.
func TestLargeWriteMemory_SlowDevice(t *testing.T) {
	backing := newMockDevice(64 * 1024 * 1024)
	dev := &slowDevice{inner: backing, writeDelay: 100 * time.Millisecond}

	client, server := net.Pipe()
	defer client.Close()

	config := DefaultTargetConfig()
	config.TargetName = testTargetName
	config.ImmediateData = true
	config.InitialR2T = false // allow full immediate data

	resolver := newTestResolverWithDevice(dev)
	sess := NewSession(server, config, resolver, resolver, log.New(io.Discard, "", 0))
	sessionDone := make(chan error, 1)
	go func() { sessionDone <- sess.HandleConnection() }()
	defer func() { client.Close(); <-sessionDone }()
	doLogin(t, client)

	// Heap baseline after login.
	runtime.GC()
	var stats runtime.MemStats
	runtime.ReadMemStats(&stats)
	heapBefore := int64(stats.HeapAlloc)

	// Fire 200 small writes back-to-back (simulating inode table writes),
	// each throttled 100ms by the device.
	for i, sn := 0, uint32(0); i < 200; i, sn = i+1, sn+1 {
		sendSCSIWriteImmediate(t, client, uint32(i), bytes.Repeat([]byte{0xBB}, 4096), uint32(i+1), sn)
		resp, err := ReadPDU(client)
		if err != nil {
			t.Fatalf("write %d response: %v", i, err)
		}
		if resp.SCSIStatus() != SCSIStatusGood {
			t.Fatalf("write %d: status=0x%02x", i, resp.SCSIStatus())
		}
	}

	runtime.GC()
	runtime.ReadMemStats(&stats)
	delta := (int64(stats.HeapAlloc) - heapBefore) / (1024 * 1024)
	t.Logf("200 × 4KB writes with 100ms delay: heap delta=%d MB, writes=%d", delta, dev.writeCalls.Load())
	if delta > 100 {
		t.Errorf("MEMORY LEAK with slow device: heap grew %d MB", delta)
	}
}

283
weed/storage/blockvol/test/component/large_write_test.go

@ -0,0 +1,283 @@
package component
import (
"fmt"
"runtime"
"sync"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
)
// These tests verify that multi-block writes (4MB from mkfs.ext4) work
// correctly and do not cause excessive memory usage in the flusher.
//
// Bug: flusher.go allocated make([]byte, 4MB) per dirty block instead of
// per unique WAL entry. A 4MB write (1024 blocks) caused 1024 × 4MB = 4GB.
// Fix: deduplicate WAL reads by WalOffset.

// createLargeWriteVol creates a fresh block volume (4KB blocks) in a
// temporary directory and registers cleanup to close it with the test.
func createLargeWriteVol(t *testing.T, volSize, walSize uint64) *blockvol.BlockVol {
	t.Helper()
	v, err := blockvol.CreateBlockVol(t.TempDir()+"/test.blk", blockvol.CreateOptions{
		VolumeSize: volSize,
		BlockSize:  4096,
		WALSize:    walSize,
	})
	if err != nil {
		t.Fatalf("CreateBlockVol: %v", err)
	}
	t.Cleanup(func() { v.Close() })
	return v
}
// heapMB samples the runtime's current HeapAlloc and reports it in whole
// megabytes. The int64 return makes before/after subtraction safe.
func heapMB() int64 {
	var stats runtime.MemStats
	runtime.ReadMemStats(&stats)
	const mb = 1024 * 1024
	return int64(stats.HeapAlloc) / mb
}
// Test 1: Single 4MB write — does it succeed at all?
func TestLargeWrite_Single4MB(t *testing.T) {
	vol := createLargeWriteVol(t, 64*1024*1024, 64*1024*1024)
	const size = 4 * 1024 * 1024
	payload := make([]byte, size)
	for i := 0; i < size; i++ {
		payload[i] = 0xAA
	}
	if err := vol.WriteLBA(0, payload); err != nil {
		t.Fatalf("4MB WriteLBA failed: %v", err)
	}
	echo, err := vol.ReadLBA(0, size)
	if err != nil {
		t.Fatalf("4MB ReadLBA failed: %v", err)
	}
	// Spot-check first and last byte of the round-tripped payload.
	if echo[0] != 0xAA || echo[size-1] != 0xAA {
		t.Fatal("read data mismatch")
	}
	t.Log("single 4MB write+read: OK")
}
// Test 2: Sequential 4MB writes — simulates mkfs journal creation.
func TestLargeWrite_Sequential4MB_MkfsSim(t *testing.T) {
	vol := createLargeWriteVol(t, 128*1024*1024, 64*1024*1024)
	data := make([]byte, 4*1024*1024)
	for i := range data {
		data[i] = byte(i & 0xFF)
	}
	before := heapMB()
	t.Logf("heap before: %d MB", before)
	start := time.Now()
	const writeCount = 20
	for i := range writeCount {
		// Non-overlapping 4MB regions: 1024 four-KB blocks apart.
		lba := uint64(i) * 1024
		if err := vol.WriteLBA(lba, data); err != nil {
			t.Fatalf("write %d at LBA %d failed after %s: %v", i, lba, time.Since(start), err)
		}
		// Progress log every 5 writes so a stall is visible in test output.
		if i > 0 && i%5 == 0 {
			t.Logf(" write %d/%d: elapsed=%s heap=%dMB", i, writeCount, time.Since(start), heapMB())
		}
	}
	after := heapMB()
	delta := after - before
	t.Logf("heap after: %d MB (delta=%d MB)", after, delta)
	t.Logf("%d × 4MB writes in %s (%.1f MB/s)", writeCount, time.Since(start),
		float64(writeCount*4)/time.Since(start).Seconds())
	// Pre-fix, each 4MB write could inflate the heap by gigabytes; 500MB is
	// a generous bound for 80MB of payload.
	if delta > 500 {
		t.Errorf("MEMORY BLOWUP: heap grew by %d MB for %d MB of writes", delta, writeCount*4)
	}
}
// Test 3: Concurrent 4MB writes — worst case for WAL admission.
func TestLargeWrite_Concurrent4MB(t *testing.T) {
	vol := createLargeWriteVol(t, 512*1024*1024, 64*1024*1024)
	data := make([]byte, 4*1024*1024)
	for i := range data {
		data[i] = byte(i & 0xFF)
	}
	before := heapMB()
	t.Logf("heap before concurrent: %d MB", before)
	const writers = 16
	const writesPerWriter = 5
	var wg sync.WaitGroup
	// One error slot per writer: each goroutine writes only errs[w], so no
	// extra locking is needed.
	errs := make([]error, writers)
	start := time.Now()
	for w := range writers {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range writesPerWriter {
				// Disjoint 4MB region per (writer, iteration) — no overlap.
				lba := uint64(w*writesPerWriter+i) * 1024
				if err := vol.WriteLBA(lba, data); err != nil {
					errs[w] = fmt.Errorf("writer %d write %d: %v", w, i, err)
					return
				}
			}
		}()
	}
	// Wait with a 60s watchdog so a deadlocked WAL fails this test instead
	// of hanging the whole test binary.
	done := make(chan struct{})
	go func() { wg.Wait(); close(done) }()
	select {
	case <-done:
	case <-time.After(60 * time.Second):
		// NOTE(review): on timeout, writer goroutines are still running and
		// t.Cleanup will Close the volume under them — confirm Close is safe
		// against in-flight writes.
		t.Fatal("TIMEOUT: concurrent 4MB writes hung for 60s")
	}
	for _, err := range errs {
		if err != nil {
			t.Error(err)
		}
	}
	after := heapMB()
	delta := after - before
	t.Logf("heap after concurrent: %d MB (delta=%d MB)", after, delta)
	t.Logf("%d writers × %d × 4MB = %d MB in %s",
		writers, writesPerWriter, writers*writesPerWriter*4, time.Since(start))
	// 320MB of payload total; 1GB of residual heap growth indicates leaked buffers.
	if delta > 1000 {
		t.Errorf("MEMORY BLOWUP: heap grew by %d MB", delta)
	}
}
// Test 4: WAL size vs entry size — 4MB entry in 1MB WAL should fail, not hang.
// Either outcome is acceptable; the test only documents which one occurred.
func TestLargeWrite_EntryLargerThanWAL(t *testing.T) {
	vol := createLargeWriteVol(t, 64*1024*1024, 1*1024*1024)
	switch err := vol.WriteLBA(0, make([]byte, 4*1024*1024)); err {
	case nil:
		t.Log("4MB write succeeded in 1MB WAL — entry was split or WAL grew")
	default:
		t.Logf("4MB write in 1MB WAL returned error (expected): %v", err)
	}
}
// Test 5: Mixed 4K+1M+4M writes — simulates real mkfs pattern.
func TestLargeWrite_MixedSizes(t *testing.T) {
	vol := createLargeWriteVol(t, 128*1024*1024, 64*1024*1024)
	// Three payload sizes: 1 block, 256 blocks, 1024 blocks; distinct fill
	// bytes so misdirected writes would be detectable.
	small := make([]byte, 4096)
	large := make([]byte, 1<<20)
	huge := make([]byte, 4<<20)
	for i := range small {
		small[i] = 0x11
	}
	for i := range large {
		large[i] = 0x22
	}
	for i := range huge {
		huge[i] = 0x33
	}
	start := time.Now()
	// Phase 1: many single-block writes (metadata-like pattern).
	for i := range 100 {
		if err := vol.WriteLBA(uint64(i), small); err != nil {
			t.Fatalf("small write %d: %v", i, err)
		}
	}
	t.Logf("phase 1: 100 × 4KB in %s", time.Since(start))
	phase2 := time.Now()
	// Phase 2: 1MB writes spaced 256 blocks apart (non-overlapping).
	for i := range 10 {
		if err := vol.WriteLBA(uint64(1000+i*256), large); err != nil {
			t.Fatalf("large write %d: %v", i, err)
		}
	}
	t.Logf("phase 2: 10 × 1MB in %s", time.Since(phase2))
	phase3 := time.Now()
	// Phase 3: 4MB writes spaced 1024 blocks apart — the OOM-trigger shape.
	for i := range 10 {
		if err := vol.WriteLBA(uint64(10000+i*1024), huge); err != nil {
			t.Fatalf("huge write %d: %v (elapsed %s)", i, err, time.Since(phase3))
		}
	}
	t.Logf("phase 3: 10 × 4MB in %s", time.Since(phase3))
	t.Logf("total: %s, final heap: %d MB", time.Since(start), heapMB())
}
// Test 6: Production-sized volume (2GB) — memory stays bounded.
func TestLargeWrite_ProductionVolumeMemory(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping: creates 2GB file on disk")
	}
	runtime.GC()
	before := heapMB()
	t.Logf("heap before create: %d MB", before)
	vol := createLargeWriteVol(t, 2*1024*1024*1024, 64*1024*1024)
	runtime.GC()
	afterCreate := heapMB()
	t.Logf("heap after create: %d MB (delta=%d MB)", afterCreate, afterCreate-before)
	// Scattered single-block writes at a 100-block stride across the volume.
	small := make([]byte, 4096)
	for i := range 1000 {
		if err := vol.WriteLBA(uint64(i)*100, small); err != nil {
			t.Fatalf("small write %d: %v", i, err)
		}
	}
	runtime.GC()
	afterSmall := heapMB()
	t.Logf("heap after 1000 × 4KB: %d MB (delta=%d MB)", afterSmall, afterSmall-before)
	// 4MB writes are best-effort: failures are logged, not fatal —
	// NOTE(review): presumably so the memory check below still runs when
	// the WAL rejects them; confirm that intent.
	large := make([]byte, 4*1024*1024)
	for i := range 10 {
		if err := vol.WriteLBA(uint64(200000+i*1024), large); err != nil {
			t.Logf("large write %d failed: %v", i, err)
			break
		}
	}
	runtime.GC()
	afterLarge := heapMB()
	delta := afterLarge - before
	t.Logf("heap after 4MB writes: %d MB (delta=%d MB)", afterLarge, delta)
	if delta > 500 {
		t.Errorf("EXCESSIVE MEMORY: heap grew %d MB for 2GB volume", delta)
	}
}
// Test 7: Sustained 4MB writes for 30s — flusher keeps up.
func TestLargeWrite_FlusherThroughput(t *testing.T) {
	if testing.Short() {
		t.Skip("30s sustained write test")
	}
	vol := createLargeWriteVol(t, 256*1024*1024, 64*1024*1024)
	payload := make([]byte, 4*1024*1024)
	start := time.Now()
	deadline := start.Add(30 * time.Second)
	count := 0
	for time.Now().Before(deadline) {
		// Rotate through 50 disjoint 4MB slots so blocks keep getting re-dirtied.
		slot := uint64(count % 50)
		if err := vol.WriteLBA(slot*1024, payload); err != nil {
			t.Fatalf("write %d failed after %s: %v", count, time.Since(start), err)
		}
		count++
	}
	t.Logf("flusher throughput: %d × 4MB in %s (%.1f writes/s, %.1f MB/s)",
		count, time.Since(start), float64(count)/time.Since(start).Seconds(),
		float64(count*4)/time.Since(start).Seconds())
	t.Logf("final heap: %d MB", heapMB())
}

112
weed/storage/blockvol/testrunner/actions/devops.go

@ -31,6 +31,7 @@ func RegisterDevOpsActions(r *tr.Registry) {
r.RegisterFunc("block_status", tr.TierDevOps, blockStatus)
r.RegisterFunc("block_promote", tr.TierDevOps, blockPromote)
r.RegisterFunc("wait_volume_healthy", tr.TierDevOps, waitVolumeHealthy)
r.RegisterFunc("discover_primary", tr.TierDevOps, discoverPrimary)
}
// setISCSIVars sets the save_as_iscsi_host/port/addr/iqn vars from a VolumeInfo.
@ -364,9 +365,25 @@ func lookupBlockVolume(ctx context.Context, actx *tr.ActionContext, act tr.Actio
if act.SaveAs != "" {
setISCSIVars(actx, act.SaveAs, info)
// Also save replica data/ctrl addresses + ports for fault injection.
if info.ReplicaDataAddr != "" {
actx.Vars[act.SaveAs+"_replica_data_addr"] = info.ReplicaDataAddr
_, dataPort, _ := net.SplitHostPort(info.ReplicaDataAddr)
actx.Vars[act.SaveAs+"_replica_data_port"] = dataPort
}
if info.ReplicaCtrlAddr != "" {
actx.Vars[act.SaveAs+"_replica_ctrl_addr"] = info.ReplicaCtrlAddr
_, ctrlPort, _ := net.SplitHostPort(info.ReplicaCtrlAddr)
actx.Vars[act.SaveAs+"_replica_ctrl_port"] = ctrlPort
}
// Save primary server host for iptables targeting.
if info.VolumeServer != "" {
host, _, _ := net.SplitHostPort(info.VolumeServer)
actx.Vars[act.SaveAs+"_primary_host"] = host
}
}
actx.Log(" looked up %s: size=%d iscsi=%s", name, info.SizeBytes, info.ISCSIAddr)
actx.Log(" looked up %s: size=%d iscsi=%s repl_data=%s", name, info.SizeBytes, info.ISCSIAddr, info.ReplicaDataAddr)
return map[string]string{"value": strconv.FormatUint(info.SizeBytes, 10)}, nil
}
@ -741,6 +758,99 @@ func waitVolumeHealthy(ctx context.Context, actx *tr.ActionContext, act tr.Actio
}
}
// topologyNodeForHost returns the name of the topology node whose Host or
// any AltIPs entry equals host. It returns "" when no scenario is attached
// or no node matches. Shared by the primary and replica lookups below so
// the Host/AltIPs matching rule lives in exactly one place.
func topologyNodeForHost(actx *tr.ActionContext, host string) string {
	if actx.Scenario == nil {
		return ""
	}
	for nodeName, nodeSpec := range actx.Scenario.Topology.Nodes {
		if nodeSpec.Host == host {
			return nodeName
		}
		for _, altIP := range nodeSpec.AltIPs {
			if altIP == host {
				return nodeName
			}
		}
	}
	return ""
}

// discoverPrimary looks up a block volume and maps the primary's IP to a topology node name.
// This solves the "which node to kill?" problem for degraded-mode and failover scenarios.
//
// Params:
// - name: volume name (required)
// - master_url: master API (or from var)
//
// Saves (using save_as prefix):
// - save_as = node name (e.g. "m01")
// - save_as_server = full server address (e.g. "10.0.0.3:18480")
// - save_as_host = server IP (e.g. "10.0.0.3")
// - save_as_replica_node = replica node name (if RF>1 and replica found)
func discoverPrimary(ctx context.Context, actx *tr.ActionContext, act tr.Action) (map[string]string, error) {
	client, err := blockAPIClient(actx, act)
	if err != nil {
		return nil, fmt.Errorf("discover_primary: %w", err)
	}
	// Volume name from params, falling back to the scenario's volume_name var.
	name := act.Params["name"]
	if name == "" {
		name = actx.Vars["volume_name"]
	}
	if name == "" {
		return nil, fmt.Errorf("discover_primary: name param required")
	}
	info, err := client.LookupVolume(ctx, name)
	if err != nil {
		return nil, fmt.Errorf("discover_primary: lookup %s: %w", name, err)
	}
	primaryHost, _, _ := net.SplitHostPort(info.VolumeServer)
	if primaryHost == "" {
		return nil, fmt.Errorf("discover_primary: volume_server %q has no host", info.VolumeServer)
	}
	// Match primary host to a topology node (checks Host and AltIPs).
	primaryNode := topologyNodeForHost(actx, primaryHost)
	if primaryNode == "" {
		return nil, fmt.Errorf("discover_primary: no topology node matches primary host %s", primaryHost)
	}
	result := map[string]string{"value": primaryNode}
	if act.SaveAs != "" {
		actx.Vars[act.SaveAs] = primaryNode
		actx.Vars[act.SaveAs+"_server"] = info.VolumeServer
		actx.Vars[act.SaveAs+"_host"] = primaryHost
	}
	// Also discover the replica node if available; an unmatched replica is
	// not an error — we fall through to the primary-only log.
	if info.ReplicaServer != "" {
		replicaHost, _, _ := net.SplitHostPort(info.ReplicaServer)
		if replicaNode := topologyNodeForHost(actx, replicaHost); replicaNode != "" {
			if act.SaveAs != "" {
				actx.Vars[act.SaveAs+"_replica_node"] = replicaNode
			}
			actx.Log(" primary=%s (node %s), replica=%s (node %s)", info.VolumeServer, primaryNode, info.ReplicaServer, replicaNode)
			return result, nil
		}
	}
	actx.Log(" primary=%s (node %s)", info.VolumeServer, primaryNode)
	return result, nil
}
// clusterStatus fetches the full cluster status JSON.
func clusterStatus(ctx context.Context, actx *tr.ActionContext, act tr.Action) (map[string]string, error) {
node, err := GetNode(actx, act.Node)

20
weed/storage/blockvol/testrunner/actions/devops_test.go

@ -27,6 +27,8 @@ func TestDevOpsActions_Registration(t *testing.T) {
"assert_block_field",
"block_status",
"block_promote",
"wait_volume_healthy",
"discover_primary",
}
for _, name := range expected {
@ -43,8 +45,8 @@ func TestDevOpsActions_Tier(t *testing.T) {
byTier := registry.ListByTier()
devopsActions := byTier[tr.TierDevOps]
if len(devopsActions) != 16 {
t.Errorf("devops tier has %d actions, want 16", len(devopsActions))
if len(devopsActions) != 17 {
t.Errorf("devops tier has %d actions, want 17", len(devopsActions))
}
// Verify all are in devops tier.
@ -97,11 +99,11 @@ func TestAllActions_Registration(t *testing.T) {
if n := len(byTier[tr.TierCore]); n != 17 {
t.Errorf("core: %d, want 17", n)
}
if n := len(byTier[tr.TierBlock]); n != 62 {
t.Errorf("block: %d, want 62", n)
if n := len(byTier[tr.TierBlock]); n != 64 {
t.Errorf("block: %d, want 64", n)
}
if n := len(byTier[tr.TierDevOps]); n != 16 {
t.Errorf("devops: %d, want 16", n)
if n := len(byTier[tr.TierDevOps]); n != 17 {
t.Errorf("devops: %d, want 17", n)
}
if n := len(byTier[tr.TierChaos]); n != 5 {
t.Errorf("chaos: %d, want 5", n)
@ -110,13 +112,13 @@ func TestAllActions_Registration(t *testing.T) {
t.Errorf("k8s: %d, want 14", n)
}
// Total should be 114 (112 prev + 2 recovery: measure_recovery, validate_recovery_regression).
// Total should be 117 (114 prev + 2 block actions + 1 devops: discover_primary).
total := 0
for _, actions := range byTier {
total += len(actions)
}
if total != 114 {
t.Errorf("total actions: %d, want 114", total)
if total != 117 {
t.Errorf("total actions: %d, want 117", total)
}
}

209
weed/storage/blockvol/testrunner/scenarios/internal/benchmark-pgbench.yaml

@ -0,0 +1,209 @@
name: benchmark-pgbench
timeout: 15m
# Database benchmark: PostgreSQL pgbench on sw-block.
#
# Measures TPS for TPC-B (mixed read/write) and SELECT-only at various
# client counts. This is the most product-relevant benchmark — operators
# care about database performance on block storage.
#
# Competitive reference (March 2024, RF=2 sync, 25Gbps):
# Ceph RBD: 431 TPS
# Longhorn V1: 2,754 TPS
# sw-block: 3,226 TPS (old code, commit e92263b)
# Mayastor: 5,801 TPS
#
# Current sw-block number after syncWithWALProgress fix: unknown.
env:
master_url: "http://10.0.0.3:9433"
volume_name: bench-pgbench
# 2GB is enough for pgbench scale=10 (~150MB data).
# 4GB caused OOM on VS (20GB RSS → OOM killed).
vol_size: "2147483648"
topology:
nodes:
m01:
host: 192.168.1.181
alt_ips: ["10.0.0.1"]
user: testdev
key: "/opt/work/testdev_key"
m02:
host: 192.168.1.184
alt_ips: ["10.0.0.3"]
user: testdev
key: "/opt/work/testdev_key"
phases:
- name: cluster-start
actions:
- action: exec
node: m02
cmd: "fuser -k 9433/tcp 18480/tcp 2>/dev/null; sleep 1; rm -rf /tmp/sw-pgb-master /tmp/sw-pgb-vs1; mkdir -p /tmp/sw-pgb-master /tmp/sw-pgb-vs1/blocks"
root: "true"
ignore_error: true
- action: exec
node: m01
cmd: "fuser -k 18480/tcp 2>/dev/null; sleep 1; rm -rf /tmp/sw-pgb-vs2; mkdir -p /tmp/sw-pgb-vs2/blocks"
root: "true"
ignore_error: true
- action: start_weed_master
node: m02
port: "9433"
dir: /tmp/sw-pgb-master
extra_args: "-ip=10.0.0.3"
save_as: master_pid
- action: sleep
duration: 3s
- action: start_weed_volume
node: m02
port: "18480"
master: "10.0.0.3:9433"
dir: /tmp/sw-pgb-vs1
extra_args: "-block.dir=/tmp/sw-pgb-vs1/blocks -block.listen=:3295 -ip=10.0.0.3"
save_as: vs1_pid
- action: start_weed_volume
node: m01
port: "18480"
master: "10.0.0.3:9433"
dir: /tmp/sw-pgb-vs2
extra_args: "-block.dir=/tmp/sw-pgb-vs2/blocks -block.listen=:3295 -ip=10.0.0.1"
save_as: vs2_pid
- action: sleep
duration: 3s
- action: wait_cluster_ready
node: m02
master_url: "{{ master_url }}"
- action: wait_block_servers
count: "2"
- name: create-volume
actions:
- action: create_block_volume
name: "{{ volume_name }}"
size_bytes: "{{ vol_size }}"
replica_factor: "2"
durability_mode: "sync_all"
- action: wait_volume_healthy
name: "{{ volume_name }}"
timeout: 60s
- name: connect
actions:
- action: lookup_block_volume
name: "{{ volume_name }}"
save_as: vol
- action: iscsi_login_direct
node: m01
host: "{{ vol_iscsi_host }}"
port: "{{ vol_iscsi_port }}"
iqn: "{{ vol_iqn }}"
save_as: device
# Short sleep for device to settle.
- action: sleep
duration: 3s
- name: pgbench-setup
actions:
- action: pgbench_init
node: m01
device: "{{ device }}"
mount: "/mnt/pgbench"
port: "5434"
scale: "10"
fstype: "ext4"
- name: tpcb-c1
actions:
- action: pgbench_run
node: m01
clients: "1"
duration: "30"
save_as: tps_c1
- action: print
msg: "TPC-B c=1: {{ tps_c1 }} TPS"
- name: tpcb-c4
actions:
- action: pgbench_run
node: m01
clients: "4"
duration: "30"
save_as: tps_c4
- action: print
msg: "TPC-B c=4: {{ tps_c4 }} TPS"
- name: tpcb-c8
actions:
- action: pgbench_run
node: m01
clients: "8"
duration: "30"
save_as: tps_c8
- action: print
msg: "TPC-B c=8: {{ tps_c8 }} TPS"
- name: select-c4
actions:
- action: pgbench_run
node: m01
clients: "4"
duration: "30"
select_only: "true"
save_as: tps_sel
- action: print
msg: "SELECT-only c=4: {{ tps_sel }} TPS"
- name: results
actions:
- action: print
msg: "=== pgbench: RF=2 sync_all iSCSI/RoCE ==="
- action: print
msg: "TPC-B c=1: {{ tps_c1 }} TPS"
- action: print
msg: "TPC-B c=4: {{ tps_c4 }} TPS"
- action: print
msg: "TPC-B c=8: {{ tps_c8 }} TPS"
- action: print
msg: "SELECT-only c=4: {{ tps_sel }} TPS"
- action: collect_results
title: "pgbench: RF=2 sync_all iSCSI/RoCE"
volume_name: "{{ volume_name }}"
- name: cleanup
always: true
actions:
- action: pgbench_cleanup
node: m01
ignore_error: true
- action: iscsi_cleanup
node: m01
ignore_error: true
- action: stop_weed
node: m01
pid: "{{ vs2_pid }}"
ignore_error: true
- action: stop_weed
node: m02
pid: "{{ vs1_pid }}"
ignore_error: true
- action: stop_weed
node: m02
pid: "{{ master_pid }}"
ignore_error: true

306
weed/storage/blockvol/testrunner/scenarios/internal/ec3-fast-reconnect-skips-failover.yaml

@ -0,0 +1,306 @@
name: ec3-fast-reconnect-skips-failover
timeout: 10m
# Edge Case #3: Fast reconnect skips failover.
#
# Scenario: Kill primary VS and restart it BEFORE lease expires (~30s TTL).
# Risk: The restarted VS re-registers via heartbeat and master accepts it
# as primary again without going through the failover+promotion path.
# The VS may serve with stale epoch/role if the fast reconnect bypasses
# the assignment pipeline.
#
# What we verify:
#   1. After restart, the volume returns to healthy and its epoch is
#      looked up and printed for inspection (not asserted — see verify-state-after)
#   2. I/O works after the fast restart
#   3. Replication is healthy (not stuck in degraded)
env:
master_url: "http://10.0.0.3:9433"
volume_name: ec3-test
vol_size: "1073741824"
topology:
nodes:
m01:
host: 192.168.1.181
alt_ips: ["10.0.0.1"]
user: testdev
key: "/opt/work/testdev_key"
m02:
host: 192.168.1.184
alt_ips: ["10.0.0.3"]
user: testdev
key: "/opt/work/testdev_key"
phases:
- name: cluster-start
actions:
- action: exec
node: m02
cmd: "fuser -k 9433/tcp 18480/tcp 2>/dev/null; sleep 1; rm -rf /tmp/sw-ec3-master /tmp/sw-ec3-vs1 && mkdir -p /tmp/sw-ec3-master /tmp/sw-ec3-vs1/blocks"
root: "true"
ignore_error: true
- action: exec
node: m01
cmd: "fuser -k 18480/tcp 2>/dev/null; sleep 1; rm -rf /tmp/sw-ec3-vs2 && mkdir -p /tmp/sw-ec3-vs2/blocks"
root: "true"
ignore_error: true
- action: start_weed_master
node: m02
port: "9433"
dir: /tmp/sw-ec3-master
extra_args: "-ip=10.0.0.3"
save_as: master_pid
- action: sleep
duration: 3s
- action: start_weed_volume
node: m02
port: "18480"
master: "10.0.0.3:9433"
dir: /tmp/sw-ec3-vs1
extra_args: "-block.dir=/tmp/sw-ec3-vs1/blocks -block.listen=:3295 -ip=10.0.0.3"
save_as: vs1_pid
- action: start_weed_volume
node: m01
port: "18480"
master: "10.0.0.3:9433"
dir: /tmp/sw-ec3-vs2
extra_args: "-block.dir=/tmp/sw-ec3-vs2/blocks -block.listen=:3295 -ip=10.0.0.1"
save_as: vs2_pid
- action: sleep
duration: 3s
- action: wait_cluster_ready
node: m02
master_url: "{{ master_url }}"
- action: wait_block_servers
count: "2"
- name: create-volume
actions:
- action: create_block_volume
name: "{{ volume_name }}"
size_bytes: "{{ vol_size }}"
replica_factor: "2"
durability_mode: "sync_all"
- action: wait_volume_healthy
name: "{{ volume_name }}"
timeout: 60s
- name: record-state-before
actions:
- action: discover_primary
name: "{{ volume_name }}"
save_as: before
- action: lookup_block_volume
name: "{{ volume_name }}"
save_as: vol_before
- action: assert_block_field
name: "{{ volume_name }}"
field: epoch
expected: "1"
# Write data so state is non-trivial.
- action: iscsi_login_direct
node: m01
host: "{{ vol_before_iscsi_host }}"
port: "{{ vol_before_iscsi_port }}"
iqn: "{{ vol_before_iqn }}"
save_as: device
- action: fio_json
node: m01
device: "{{ device }}"
rw: randwrite
bs: 4k
iodepth: "16"
runtime: "10"
time_based: "true"
name: pre-write
- action: iscsi_cleanup
node: m01
ignore_error: true
- action: print
msg: "Before: primary={{ before }} server={{ before_server }}"
- name: force-primary-m02
actions:
# Force primary to m02 so we have a known node to kill.
- action: block_promote
name: "{{ volume_name }}"
target_server: "10.0.0.3:18480"
force: "true"
reason: "ec3-setup"
- action: sleep
duration: 5s
- action: wait_volume_healthy
name: "{{ volume_name }}"
timeout: 60s
- action: discover_primary
name: "{{ volume_name }}"
save_as: primary_pre
- action: print
msg: "Primary forced to m02: {{ primary_pre_server }}"
- name: fast-kill-restart
actions:
# Kill PRIMARY VS on m02 with SIGKILL.
- action: exec
node: m02
cmd: "kill -9 {{ vs1_pid }}"
root: "true"
ignore_error: true
- action: print
msg: "Primary killed on m02. Fast restart in 3s (before 30s lease expires)..."
# CRITICAL: restart FAST, well within the 30s lease TTL.
- action: sleep
duration: 3s
- action: start_weed_volume
node: m02
port: "18480"
master: "10.0.0.3:9433"
dir: /tmp/sw-ec3-vs1
extra_args: "-block.dir=/tmp/sw-ec3-vs1/blocks -block.listen=:3295 -ip=10.0.0.3"
save_as: vs1_pid_new
- action: print
msg: "VS restarted on m02. Waiting for re-registration..."
# Wait for heartbeat re-registration.
- action: sleep
duration: 10s
- action: wait_block_servers
count: "2"
timeout: 30s
- name: verify-state-after
actions:
# Wait for volume to become healthy again.
# Use longer timeout — primary fast-restart may need full re-assignment cycle.
- action: wait_volume_healthy
name: "{{ volume_name }}"
timeout: 180s
- action: discover_primary
name: "{{ volume_name }}"
save_as: after
- action: print
msg: "After fast restart: primary={{ after }} server={{ after_server }}"
# Verify replication is not stuck degraded.
- action: assert_block_field
name: "{{ volume_name }}"
field: replica_degraded
expected: "false"
# Fetch current volume state; the epoch is printed below for manual
# inspection only — no automated epoch assertion is performed here.
# TODO: assert epoch >= pre-restart value once it is exposed as a field.
- action: lookup_block_volume
name: "{{ volume_name }}"
save_as: vol_after
- action: print
msg: "Epoch after: checking via lookup..."
- name: verify-io-after
actions:
# Fresh lookup to get current primary's iSCSI address
# (primary moved to m02 after promote, so original vars point to wrong host).
- action: lookup_block_volume
name: "{{ volume_name }}"
save_as: vol_cur
- action: iscsi_login_direct
node: m01
host: "{{ vol_cur_iscsi_host }}"
port: "{{ vol_cur_iscsi_port }}"
iqn: "{{ vol_cur_iqn }}"
save_as: device2
- action: fio_json
node: m01
device: "{{ device2 }}"
rw: randwrite
bs: 4k
iodepth: "16"
runtime: "10"
time_based: "true"
name: post-restart-write
save_as: fio_after
- action: fio_parse
json_var: fio_after
metric: iops
save_as: iops_after
- action: print
msg: "EC-3 post-restart write IOPS: {{ iops_after }}"
# Also verify read works.
- action: fio_json
node: m01
device: "{{ device2 }}"
rw: randread
bs: 4k
iodepth: "16"
runtime: "5"
time_based: "true"
name: post-restart-read
save_as: fio_read
- action: fio_parse
json_var: fio_read
metric: iops
save_as: iops_read
- action: print
msg: "EC-3 post-restart read IOPS: {{ iops_read }}"
- action: iscsi_cleanup
node: m01
ignore_error: true
- action: print
msg: "EC-3 PASS: fast reconnect did not leave system in broken state"
- name: cleanup
always: true
actions:
- action: iscsi_cleanup
node: m01
ignore_error: true
- action: stop_weed
node: m01
pid: "{{ vs2_pid }}"
ignore_error: true
- action: stop_weed
node: m02
pid: "{{ vs1_pid_new }}"
ignore_error: true
- action: stop_weed
node: m02
pid: "{{ vs1_pid }}"
ignore_error: true
- action: stop_weed
node: m02
pid: "{{ master_pid }}"
ignore_error: true

241
weed/storage/blockvol/testrunner/scenarios/internal/ec5-wrong-primary-master-restart.yaml

@ -0,0 +1,241 @@
name: ec5-wrong-primary-master-restart
timeout: 10m
# Edge Case #5: Wrong primary after master restart.
#
# Scenario: Create RF=2 volume, note primary. Kill master, restart it.
# Verify the SAME volume server remains primary (master state survived restart
# via heartbeat re-registration, not random re-assignment).
#
# Risk: If master loses all state on restart and re-assigns primary based on
# first heartbeat, the "wrong" VS could become primary, causing split-brain
# or data loss if the old primary still has unflushed WAL entries.
env:
master_url: "http://10.0.0.3:9433"
volume_name: ec5-test
vol_size: "1073741824"
topology:
nodes:
m01:
host: 192.168.1.181
alt_ips: ["10.0.0.1"]
user: testdev
key: "/opt/work/testdev_key"
m02:
host: 192.168.1.184
alt_ips: ["10.0.0.3"]
user: testdev
key: "/opt/work/testdev_key"
phases:
- name: cluster-start
actions:
- action: exec
node: m02
cmd: "fuser -k 9433/tcp 18480/tcp 2>/dev/null; sleep 1; rm -rf /tmp/sw-ec5-master /tmp/sw-ec5-vs1 && mkdir -p /tmp/sw-ec5-master /tmp/sw-ec5-vs1/blocks"
root: "true"
ignore_error: true
- action: exec
node: m01
cmd: "fuser -k 18480/tcp 2>/dev/null; sleep 1; rm -rf /tmp/sw-ec5-vs2 && mkdir -p /tmp/sw-ec5-vs2/blocks"
root: "true"
ignore_error: true
- action: start_weed_master
node: m02
port: "9433"
dir: /tmp/sw-ec5-master
extra_args: "-ip=10.0.0.3"
save_as: master_pid
- action: sleep
duration: 3s
- action: start_weed_volume
node: m02
port: "18480"
master: "10.0.0.3:9433"
dir: /tmp/sw-ec5-vs1
extra_args: "-block.dir=/tmp/sw-ec5-vs1/blocks -block.listen=:3295 -ip=10.0.0.3"
save_as: vs1_pid
- action: start_weed_volume
node: m01
port: "18480"
master: "10.0.0.3:9433"
dir: /tmp/sw-ec5-vs2
extra_args: "-block.dir=/tmp/sw-ec5-vs2/blocks -block.listen=:3295 -ip=10.0.0.1"
save_as: vs2_pid
- action: sleep
duration: 3s
- action: wait_cluster_ready
node: m02
master_url: "{{ master_url }}"
- action: wait_block_servers
count: "2"
- name: create-volume
actions:
- action: create_block_volume
name: "{{ volume_name }}"
size_bytes: "{{ vol_size }}"
replica_factor: "2"
durability_mode: "sync_all"
- action: wait_volume_healthy
name: "{{ volume_name }}"
timeout: 60s
- name: record-primary-before
actions:
- action: discover_primary
name: "{{ volume_name }}"
save_as: before
- action: print
msg: "Before master restart: primary={{ before }} (server={{ before_server }})"
# Write some data so primary has real state.
- action: lookup_block_volume
name: "{{ volume_name }}"
save_as: vol
- action: iscsi_login_direct
node: m01
host: "{{ vol_iscsi_host }}"
port: "{{ vol_iscsi_port }}"
iqn: "{{ vol_iqn }}"
save_as: device
- action: fio_json
node: m01
device: "{{ device }}"
rw: randwrite
bs: 4k
iodepth: "16"
runtime: "10"
time_based: "true"
name: pre-write
- action: iscsi_cleanup
node: m01
ignore_error: true
- name: kill-master
actions:
- action: exec
node: m02
cmd: "kill -9 {{ master_pid }}"
root: "true"
- action: print
msg: "Master killed. Waiting 5s before restart..."
- action: sleep
duration: 5s
- name: restart-master
actions:
- action: start_weed_master
node: m02
port: "9433"
dir: /tmp/sw-ec5-master
extra_args: "-ip=10.0.0.3"
save_as: master_pid_new
- action: sleep
duration: 5s
- action: wait_cluster_ready
node: m02
master_url: "{{ master_url }}"
# Wait for volume servers to re-register via heartbeat.
- action: wait_block_servers
count: "2"
timeout: 30s
# Give time for block volume re-registration via heartbeat.
- action: sleep
duration: 10s
- name: verify-primary-after
actions:
- action: discover_primary
name: "{{ volume_name }}"
save_as: after
- action: print
msg: "After master restart: primary={{ after }} (server={{ after_server }})"
# CRITICAL ASSERTION: primary must be the same node as before.
- action: assert_equal
actual: "{{ after_server }}"
expected: "{{ before_server }}"
msg: "EC-5 FAIL: primary changed from {{ before_server }} to {{ after_server }} after master restart"
- action: print
msg: "EC-5 PASS: primary unchanged after master restart ({{ before_server }})"
- name: verify-io-after
actions:
# Verify I/O still works after master restart.
# Use original lookup vars (vol_*) since re-registered volume
# may not have iSCSI addr populated until VS re-reports it.
- action: iscsi_login_direct
node: m01
host: "{{ vol_iscsi_host }}"
port: "{{ vol_iscsi_port }}"
iqn: "{{ vol_iqn }}"
save_as: device2
- action: fio_json
node: m01
device: "{{ device2 }}"
rw: randwrite
bs: 4k
iodepth: "16"
runtime: "10"
time_based: "true"
name: post-restart-write
save_as: fio_after
- action: fio_parse
json_var: fio_after
metric: iops
save_as: iops_after
- action: print
msg: "Post-restart write IOPS: {{ iops_after }}"
- action: iscsi_cleanup
node: m01
ignore_error: true
- name: cleanup
always: true
actions:
- action: iscsi_cleanup
node: m01
ignore_error: true
- action: stop_weed
node: m01
pid: "{{ vs2_pid }}"
ignore_error: true
- action: stop_weed
node: m02
pid: "{{ vs1_pid }}"
ignore_error: true
- action: stop_weed
node: m02
pid: "{{ master_pid_new }}"
ignore_error: true
- action: stop_weed
node: m02
pid: "{{ master_pid }}"
ignore_error: true

215
weed/storage/blockvol/testrunner/scenarios/internal/stable-degraded-best-effort.yaml

@ -0,0 +1,215 @@
name: stable-degraded-best-effort
timeout: 10m
# Stable dimension: RF=2 best_effort with replica down.
# best_effort writes succeed without barrier — should show ~0% IOPS impact
# when replica dies. Contrast with sync_all degraded mode.
env:
master_url: "http://10.0.0.3:9433"
volume_name: stable-be
vol_size: "1073741824"
topology:
nodes:
m01:
host: 192.168.1.181
alt_ips: ["10.0.0.1"]
user: testdev
key: "/opt/work/testdev_key"
m02:
host: 192.168.1.184
alt_ips: ["10.0.0.3"]
user: testdev
key: "/opt/work/testdev_key"
phases:
- name: cluster-start
actions:
- action: exec
node: m02
cmd: "fuser -k 9433/tcp 18480/tcp 2>/dev/null; sleep 1; rm -rf /tmp/sw-be-master /tmp/sw-be-vs1 && mkdir -p /tmp/sw-be-master /tmp/sw-be-vs1/blocks"
root: "true"
ignore_error: true
- action: exec
node: m01
cmd: "fuser -k 18480/tcp 2>/dev/null; sleep 1; rm -rf /tmp/sw-be-vs2 && mkdir -p /tmp/sw-be-vs2/blocks"
root: "true"
ignore_error: true
- action: start_weed_master
node: m02
port: "9433"
dir: /tmp/sw-be-master
extra_args: "-ip=10.0.0.3"
save_as: master_pid
- action: sleep
duration: 3s
- action: start_weed_volume
node: m02
port: "18480"
master: "10.0.0.3:9433"
dir: /tmp/sw-be-vs1
extra_args: "-block.dir=/tmp/sw-be-vs1/blocks -block.listen=:3295 -ip=10.0.0.3"
save_as: vs1_pid
- action: start_weed_volume
node: m01
port: "18480"
master: "10.0.0.3:9433"
dir: /tmp/sw-be-vs2
extra_args: "-block.dir=/tmp/sw-be-vs2/blocks -block.listen=:3295 -ip=10.0.0.1"
save_as: vs2_pid
- action: sleep
duration: 3s
- action: wait_cluster_ready
node: m02
master_url: "{{ master_url }}"
- action: wait_block_servers
count: "2"
- name: create-volume
actions:
- action: create_block_volume
name: "{{ volume_name }}"
size_bytes: "{{ vol_size }}"
replica_factor: "2"
durability_mode: "best_effort"
- action: wait_volume_healthy
name: "{{ volume_name }}"
timeout: 60s
# Force primary to m02 so we can kill m01 (replica) cleanly.
- action: block_promote
name: "{{ volume_name }}"
target_server: "10.0.0.3:18480"
force: "true"
reason: "be-degraded-setup"
- action: sleep
duration: 5s
- action: wait_volume_healthy
name: "{{ volume_name }}"
timeout: 60s
- action: discover_primary
name: "{{ volume_name }}"
save_as: pri
- action: print
msg: "Primary on {{ pri }} ({{ pri_server }}), replica on {{ pri_replica_node }}"
- name: connect
actions:
- action: lookup_block_volume
name: "{{ volume_name }}"
save_as: vol
- action: iscsi_login_direct
node: m01
host: "{{ vol_iscsi_host }}"
port: "{{ vol_iscsi_port }}"
iqn: "{{ vol_iqn }}"
save_as: device
- name: baseline
actions:
- action: print
msg: "=== Baseline: RF=2 best_effort, healthy ==="
- action: fio_json
node: m01
device: "{{ device }}"
rw: randwrite
bs: 4k
iodepth: "32"
runtime: "30"
time_based: "true"
name: baseline
save_as: fio_baseline
- action: fio_parse
json_var: fio_baseline
metric: iops
save_as: iops_baseline
- action: print
msg: "Healthy: {{ iops_baseline }} IOPS"
- name: kill-replica
actions:
- action: print
msg: "=== Killing replica VS on m01 ==="
- action: exec
node: m01
cmd: "kill -9 {{ vs2_pid }}"
root: "true"
ignore_error: true
# Wait for shipper to detect dead replica.
- action: sleep
duration: 10s
- name: degraded-write
actions:
- action: print
msg: "=== Degraded: best_effort, primary only ==="
- action: fio_json
node: m01
device: "{{ device }}"
rw: randwrite
bs: 4k
iodepth: "32"
runtime: "30"
time_based: "true"
name: degraded
save_as: fio_degraded
- action: fio_parse
json_var: fio_degraded
metric: iops
save_as: iops_degraded
- action: print
msg: "Degraded: {{ iops_degraded }} IOPS"
- name: results
actions:
- action: print
msg: "=== Stable: Degraded Mode (best_effort RF=2, iSCSI/RoCE) ==="
- action: print
msg: "Healthy: {{ iops_baseline }} IOPS"
- action: print
msg: "Degraded: {{ iops_degraded }} IOPS"
- action: collect_results
title: "Stable: Degraded Mode (best_effort RF=2)"
volume_name: "{{ volume_name }}"
- name: cleanup
always: true
actions:
- action: iscsi_cleanup
node: m01
ignore_error: true
- action: stop_weed
node: m01
pid: "{{ vs2_pid }}"
ignore_error: true
- action: stop_weed
node: m02
pid: "{{ vs1_pid }}"
ignore_error: true
- action: stop_weed
node: m02
pid: "{{ master_pid }}"
ignore_error: true

228
weed/storage/blockvol/testrunner/scenarios/internal/stable-degraded-mode.yaml

@ -0,0 +1,228 @@
name: stable-degraded-mode
timeout: 10m
# Stable dimension: write IOPS with replica down (sync_all RF=2).
#
# Answers: how much does primary performance change when replication
# barrier has no healthy replica to wait for?
#
# Flow:
# 1. Create volume, force primary to m02
# 2. Baseline: healthy RF=2 sync_all fio → IOPS
# 3. Kill replica (m01 VS)
# 4. Degraded: fio → IOPS (sync_all with dead replica)
# 5. Compare
env:
master_url: "http://10.0.0.3:9433"
volume_name: stable-deg
vol_size: "1073741824"
topology:
nodes:
m01:
host: 192.168.1.181
alt_ips: ["10.0.0.1"]
user: testdev
key: "/opt/work/testdev_key"
m02:
host: 192.168.1.184
alt_ips: ["10.0.0.3"]
user: testdev
key: "/opt/work/testdev_key"
phases:
- name: cluster-start
actions:
- action: exec
node: m02
cmd: "fuser -k 9433/tcp 18480/tcp 2>/dev/null; sleep 1; rm -rf /tmp/sw-deg-master /tmp/sw-deg-vs1 && mkdir -p /tmp/sw-deg-master /tmp/sw-deg-vs1/blocks"
root: "true"
ignore_error: true
- action: exec
node: m01
cmd: "fuser -k 18480/tcp 2>/dev/null; sleep 1; rm -rf /tmp/sw-deg-vs2 && mkdir -p /tmp/sw-deg-vs2/blocks"
root: "true"
ignore_error: true
- action: start_weed_master
node: m02
port: "9433"
dir: /tmp/sw-deg-master
extra_args: "-ip=10.0.0.3"
save_as: master_pid
- action: sleep
duration: 3s
- action: start_weed_volume
node: m02
port: "18480"
master: "10.0.0.3:9433"
dir: /tmp/sw-deg-vs1
extra_args: "-block.dir=/tmp/sw-deg-vs1/blocks -block.listen=:3295 -ip=10.0.0.3"
save_as: vs1_pid
- action: start_weed_volume
node: m01
port: "18480"
master: "10.0.0.3:9433"
dir: /tmp/sw-deg-vs2
extra_args: "-block.dir=/tmp/sw-deg-vs2/blocks -block.listen=:3295 -ip=10.0.0.1"
save_as: vs2_pid
- action: sleep
duration: 3s
- action: wait_cluster_ready
node: m02
master_url: "{{ master_url }}"
- action: wait_block_servers
count: "2"
- name: create-volume
actions:
- action: create_block_volume
name: "{{ volume_name }}"
size_bytes: "{{ vol_size }}"
replica_factor: "2"
durability_mode: "sync_all"
- action: wait_volume_healthy
name: "{{ volume_name }}"
timeout: 60s
# Force primary to m02 so we know which node to keep alive.
- action: block_promote
name: "{{ volume_name }}"
target_server: "10.0.0.3:18480"
force: "true"
reason: "degraded-setup"
- action: sleep
duration: 5s
- action: wait_volume_healthy
name: "{{ volume_name }}"
timeout: 60s
- action: discover_primary
name: "{{ volume_name }}"
save_as: pri
- action: print
msg: "Primary on {{ pri }} ({{ pri_server }}), replica on {{ pri_replica_node }}"
- name: connect
actions:
- action: lookup_block_volume
name: "{{ volume_name }}"
save_as: vol
# Connect iSCSI client (on m01) to primary (on m02).
- action: iscsi_login_direct
node: m01
host: "{{ vol_iscsi_host }}"
port: "{{ vol_iscsi_port }}"
iqn: "{{ vol_iqn }}"
save_as: device
- name: baseline-healthy
actions:
- action: print
msg: "=== Baseline: RF=2 sync_all, both replicas healthy ==="
- action: fio_json
node: m01
device: "{{ device }}"
rw: randwrite
bs: 4k
iodepth: "32"
runtime: "30"
time_based: "true"
name: baseline-healthy
save_as: fio_healthy
- action: fio_parse
json_var: fio_healthy
metric: iops
save_as: iops_healthy
- action: print
msg: "Healthy: {{ iops_healthy }} IOPS"
- name: kill-replica
actions:
- action: print
msg: "=== Killing replica VS on m01 ==="
# Kill replica (m01 VS). Primary (m02) keeps serving.
- action: exec
node: m01
cmd: "kill -9 {{ vs2_pid }}"
root: "true"
ignore_error: true
# Wait for shipper to detect dead replica and enter degraded mode.
- action: sleep
duration: 10s
- action: print
msg: "Replica killed. Shipper should be degraded now."
- name: degraded-write
actions:
- action: print
msg: "=== Degraded: sync_all primary only, replica dead ==="
- action: fio_json
node: m01
device: "{{ device }}"
rw: randwrite
bs: 4k
iodepth: "32"
runtime: "30"
time_based: "true"
name: degraded-write
save_as: fio_degraded
- action: fio_parse
json_var: fio_degraded
metric: iops
save_as: iops_degraded
- action: print
msg: "Degraded: {{ iops_degraded }} IOPS"
- name: results
actions:
- action: print
msg: "=== Stable: Degraded Mode (sync_all RF=2, iSCSI/RoCE) ==="
- action: print
msg: "Healthy: {{ iops_healthy }} IOPS"
- action: print
msg: "Degraded: {{ iops_degraded }} IOPS"
- action: collect_results
title: "Stable: Degraded Mode (sync_all RF=2)"
volume_name: "{{ volume_name }}"
- name: cleanup
always: true
actions:
- action: iscsi_cleanup
node: m01
ignore_error: true
- action: stop_weed
node: m01
pid: "{{ vs2_pid }}"
ignore_error: true
- action: stop_weed
node: m02
pid: "{{ vs1_pid }}"
ignore_error: true
- action: stop_weed
node: m02
pid: "{{ master_pid }}"
ignore_error: true

11
weed/storage/blockvol/testrunner/types.go

@ -76,11 +76,12 @@ type Topology struct {
// NodeSpec defines a remote (or local) machine.
type NodeSpec struct {
Host string `yaml:"host"`
User string `yaml:"user"`
KeyFile string `yaml:"key"`
IsLocal bool `yaml:"is_local"`
Agent string `yaml:"agent"` // maps node to an agent (coordinator mode)
Host string `yaml:"host"`
AltIPs []string `yaml:"alt_ips"` // additional IPs (e.g. RDMA) for node identification
User string `yaml:"user"`
KeyFile string `yaml:"key"`
IsLocal bool `yaml:"is_local"`
Agent string `yaml:"agent"` // maps node to an agent (coordinator mode)
}
// TargetSpec defines an iSCSI/NVMe target instance.

Loading…
Cancel
Save