Browse Source

test: add integration test infrastructure for blockvol iSCSI

Test harness for running blockvol iSCSI tests on WSL2 and remote nodes
(m01/m02). Includes Node (SSH/local exec), ISCSIClient (discover/login/
logout), WeedTarget (weed volume server lifecycle), and test suites for
smoke, stress, crash recovery, chaos, perf benchmarks, and apps (fio/dd).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feature/sw-block
Ping Qiu 1 week ago
parent
commit
44da35faf6
  1. 311
      weed/storage/blockvol/test/apps_test.go
  2. 181
      weed/storage/blockvol/test/chaos_test.go
  3. 228
      weed/storage/blockvol/test/crash_test.go
  4. 281
      weed/storage/blockvol/test/integration_test.go
  5. 229
      weed/storage/blockvol/test/iscsi.go
  6. 316
      weed/storage/blockvol/test/node.go
  7. 168
      weed/storage/blockvol/test/perf_test.go
  8. 190
      weed/storage/blockvol/test/smoke_test.go
  9. 182
      weed/storage/blockvol/test/stress_test.go
  10. 212
      weed/storage/blockvol/test/weed_target.go
  11. 736
      weed/storage/blockvol/test/weedvol_test.go

311
weed/storage/blockvol/test/apps_test.go

@ -0,0 +1,311 @@
//go:build integration && apps
package test
import (
"context"
"fmt"
"strings"
"testing"
"time"
)
// TestApps is the entry point for the application-workload suite
// (build tags: integration && apps). Subtests run sequentially --
// they share the same target node and iSCSI port, so order matters
// and t.Parallel must not be added.
func TestApps(t *testing.T) {
	t.Run("Postgres", testAppsPostgres)
	t.Run("MySQL", testAppsMySQL)
	t.Run("SQLiteWAL", testAppsSQLiteWAL)
	t.Run("QemuBoot", testAppsQemuBoot)
	t.Run("QemuFio", testAppsQemuFio)
	t.Run("DockerOverlay", testAppsDockerOverlay)
	t.Run("LVMStripe", testAppsLVMStripe)
	t.Run("MdRaid1", testAppsMdRaid1)
}
// testAppsPostgres runs PostgreSQL + pgbench on an ext4 filesystem backed
// by the iSCSI device, then SIGKILLs the target and verifies the database
// recovers after restart. Skips when pg_isready/pgbench are missing.
func testAppsPostgres(t *testing.T) {
	requireCmd(t, "pg_isready")
	requireCmd(t, "pgbench")
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "500M", "")
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	mnt := "/tmp/blockvol-pg"
	pgdata := mnt + "/pgdata"
	// Best-effort teardown: stop postgres, unmount, remove the mount dir.
	// Uses its own short-lived context since the test ctx may be expired.
	t.Cleanup(func() {
		cleanCtx, c := context.WithTimeout(context.Background(), 15*time.Second)
		defer c()
		clientNode.RunRoot(cleanCtx, fmt.Sprintf("sudo -u postgres pg_ctl -D %s stop -m fast 2>/dev/null || true", pgdata))
		clientNode.RunRoot(cleanCtx, fmt.Sprintf("umount -f %s 2>/dev/null", mnt))
		clientNode.RunRoot(cleanCtx, fmt.Sprintf("rm -rf %s", mnt))
	})
	// mkfs + mount
	clientNode.RunRoot(ctx, fmt.Sprintf("mkfs.ext4 -F %s", dev))
	clientNode.RunRoot(ctx, fmt.Sprintf("mkdir -p %s", mnt))
	clientNode.RunRoot(ctx, fmt.Sprintf("mount %s %s", dev, mnt))
	// initdb -- use full path since sudo doesn't inherit PG bin dir
	// chown the entire mount point so postgres can write pg.log there
	clientNode.RunRoot(ctx, fmt.Sprintf("chown postgres:postgres %s", mnt))
	clientNode.RunRoot(ctx, fmt.Sprintf("mkdir -p %s", pgdata))
	clientNode.RunRoot(ctx, fmt.Sprintf("chown postgres:postgres %s", pgdata))
	// initdb refuses datadirs that are not mode 0700.
	clientNode.RunRoot(ctx, fmt.Sprintf("chmod 700 %s", pgdata))
	_, stderr, code, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("sudo -u postgres /usr/lib/postgresql/*/bin/initdb -D %s", pgdata))
	if code != 0 {
		t.Fatalf("initdb: code=%d stderr=%s", code, stderr)
	}
	// Start postgres with custom port to avoid conflict with system instance
	_, stderr, code, _ = clientNode.RunRoot(ctx,
		fmt.Sprintf("sudo -u postgres /usr/lib/postgresql/*/bin/pg_ctl -D %s -l %s/pg.log -o '-p 15432' start", pgdata, mnt))
	if code != 0 {
		t.Fatalf("pg_ctl start: code=%d stderr=%s", code, stderr)
	}
	// pgbench init + run (createdb errors ignored -- db may already exist)
	clientNode.RunRoot(ctx, "sudo -u postgres /usr/lib/postgresql/*/bin/createdb -p 15432 pgbench 2>/dev/null")
	_, stderr, code, _ = clientNode.RunRoot(ctx, "sudo -u postgres pgbench -p 15432 -i pgbench")
	if code != 0 {
		t.Fatalf("pgbench init: code=%d stderr=%s", code, stderr)
	}
	stdout, stderr, code, _ := clientNode.RunRoot(ctx, "sudo -u postgres pgbench -p 15432 -T 30 pgbench")
	if code != 0 {
		t.Fatalf("pgbench run: code=%d stderr=%s", code, stderr)
	}
	// Extract TPS from pgbench output
	for _, line := range strings.Split(stdout, "\n") {
		if strings.Contains(line, "tps") {
			t.Logf("pgbench: %s", strings.TrimSpace(line))
		}
	}
	// Clean shutdown of postgres + unmount, then SIGKILL the target and
	// tear down the iSCSI session so restart starts from a clean slate.
	clientNode.RunRoot(ctx, fmt.Sprintf("sudo -u postgres /usr/lib/postgresql/*/bin/pg_ctl -D %s stop -m fast 2>/dev/null || true", pgdata))
	clientNode.RunRoot(ctx, fmt.Sprintf("umount -f %s 2>/dev/null", mnt))
	iscsi.Logout(ctx, tgt.config.IQN)
	iscsi.CleanupAll(ctx, tgt.config.IQN)
	tgt.Kill9()
	// Restart and verify recovery (false = open existing volume, not create)
	if err := tgt.Start(ctx, false); err != nil {
		t.Fatalf("restart: %v", err)
	}
	dev, err := iscsi.Login(ctx, tgt.config.IQN)
	if err != nil {
		t.Fatalf("re-login: %v", err)
	}
	clientNode.RunRoot(ctx, fmt.Sprintf("mount %s %s", dev, mnt))
	_, stderr, code, _ = clientNode.RunRoot(ctx,
		fmt.Sprintf("sudo -u postgres /usr/lib/postgresql/*/bin/pg_ctl -D %s -l %s/pg.log -o '-p 15432' start", pgdata, mnt))
	if code != 0 {
		t.Fatalf("pg recovery start: code=%d stderr=%s", code, stderr)
	}
	// Verify recovery -- pg_isready should succeed
	_, _, code, _ = clientNode.RunRoot(ctx, "pg_isready -p 15432")
	if code != 0 {
		t.Fatalf("pg_isready failed after recovery")
	}
	t.Log("postgres recovery after Kill9 succeeded")
}
// testAppsMySQL runs mysqld + sysbench OLTP on an ext4 filesystem backed
// by the iSCSI device, using a custom datadir/socket/port so it cannot
// collide with a system MySQL instance. Skips when mysqld/sysbench are
// missing.
func testAppsMySQL(t *testing.T) {
	requireCmd(t, "mysqld")
	requireCmd(t, "sysbench")
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "500M", "")
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	mnt := "/tmp/blockvol-mysql"
	mysqlData := mnt + "/mysql"
	sock := "/tmp/mysql-blockvol-test.sock"
	// Best-effort teardown: shut mysqld down, give it a moment to release
	// the mount, then unmount and remove files.
	t.Cleanup(func() {
		cleanCtx, c := context.WithTimeout(context.Background(), 15*time.Second)
		defer c()
		clientNode.RunRoot(cleanCtx, fmt.Sprintf("mysqladmin -u root -S %s shutdown 2>/dev/null || true", sock))
		time.Sleep(2 * time.Second)
		clientNode.RunRoot(cleanCtx, fmt.Sprintf("umount -f %s 2>/dev/null", mnt))
		clientNode.RunRoot(cleanCtx, fmt.Sprintf("rm -rf %s %s", mnt, sock))
	})
	clientNode.RunRoot(ctx, fmt.Sprintf("mkfs.ext4 -F %s", dev))
	clientNode.RunRoot(ctx, fmt.Sprintf("mkdir -p %s", mnt))
	clientNode.RunRoot(ctx, fmt.Sprintf("mount %s %s", dev, mnt))
	// Stop any system mysqld to avoid port/socket conflicts
	clientNode.RunRoot(ctx, "systemctl stop mysql 2>/dev/null || true")
	clientNode.RunRoot(ctx, fmt.Sprintf("rm -f %s", sock))
	// Initialize MySQL with custom datadir
	// Run as root to avoid AppArmor ownership issues on iSCSI-backed ext4
	clientNode.RunRoot(ctx, fmt.Sprintf("chown -R mysql:mysql %s", mnt))
	_, stderr, code, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("mysqld --initialize-insecure --datadir=%s --user=root 2>&1", mysqlData))
	if code != 0 {
		t.Fatalf("mysqld init: code=%d stderr=%s", code, stderr)
	}
	// Start mysqld with custom socket and port (backgrounded via bash &;
	// --skip-grant-tables so no password setup is needed)
	clientNode.RunRoot(ctx, fmt.Sprintf(
		"bash -c 'mysqld --datadir=%s --socket=%s --port=13306 --user=root --skip-grant-tables &'",
		mysqlData, sock))
	// Wait for mysqld to be ready (poll ping up to 30s)
	for i := 0; i < 30; i++ {
		_, _, code, _ = clientNode.RunRoot(ctx, fmt.Sprintf("mysqladmin -u root -S %s ping 2>/dev/null", sock))
		if code == 0 {
			break
		}
		time.Sleep(time.Second)
	}
	if code != 0 {
		t.Fatalf("mysqld did not start")
	}
	// Sysbench: prepare then run a 30s OLTP read/write workload
	clientNode.RunRoot(ctx, fmt.Sprintf("mysql -u root -S %s -e 'CREATE DATABASE IF NOT EXISTS sbtest'", sock))
	_, stderr, code, _ = clientNode.RunRoot(ctx, fmt.Sprintf(
		"sysbench oltp_read_write --mysql-socket=%s --mysql-user=root --db-driver=mysql --tables=4 --table-size=1000 prepare", sock))
	if code != 0 {
		t.Fatalf("sysbench prepare: code=%d stderr=%s", code, stderr)
	}
	stdout, stderr, code, _ := clientNode.RunRoot(ctx, fmt.Sprintf(
		"sysbench oltp_read_write --mysql-socket=%s --mysql-user=root --db-driver=mysql --tables=4 --table-size=1000 --time=30 run", sock))
	if code != 0 {
		t.Fatalf("sysbench run: code=%d stderr=%s", code, stderr)
	}
	// Surface throughput numbers in the test log
	for _, line := range strings.Split(stdout, "\n") {
		if strings.Contains(line, "transactions:") || strings.Contains(line, "queries:") {
			t.Logf("sysbench: %s", strings.TrimSpace(line))
		}
	}
	// Clean shutdown
	clientNode.RunRoot(ctx, fmt.Sprintf("mysqladmin -u root -S %s shutdown", sock))
	t.Log("MySQL + sysbench test passed")
}
// testAppsSQLiteWAL inserts 10,000 rows into a WAL-mode SQLite database on
// an iSCSI-backed ext4 filesystem (100 transactions x 100 inserts), then
// verifies the row count. Skips when sqlite3 is missing.
func testAppsSQLiteWAL(t *testing.T) {
	requireCmd(t, "sqlite3")
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "100M", "")
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	mnt := "/tmp/blockvol-sqlite"
	t.Cleanup(func() {
		cleanCtx, c := context.WithTimeout(context.Background(), 10*time.Second)
		defer c()
		clientNode.RunRoot(cleanCtx, fmt.Sprintf("umount -f %s 2>/dev/null", mnt))
		clientNode.RunRoot(cleanCtx, fmt.Sprintf("rm -rf %s", mnt))
	})
	clientNode.RunRoot(ctx, fmt.Sprintf("mkfs.ext4 -F %s", dev))
	clientNode.RunRoot(ctx, fmt.Sprintf("mkdir -p %s && mount %s %s", mnt, dev, mnt))
	// Create DB in WAL mode, insert 10K rows via batched inserts
	// Use a script file to avoid shell quoting issues over SSH
	// (the '"'"' sequences escape single quotes inside the single-quoted
	// bash -c body; last command prints the final row count)
	script := fmt.Sprintf(`bash -c '
set -e
DB="%s/test.db"
rm -f "$DB" "$DB-wal" "$DB-shm"
sqlite3 "$DB" "PRAGMA journal_mode=WAL; CREATE TABLE t(id INTEGER PRIMARY KEY, val TEXT);"
for i in $(seq 1 100); do
SQL="BEGIN;"
for j in $(seq 1 100); do
n=$(( (i-1)*100 + j ))
SQL="${SQL} INSERT INTO t(val) VALUES('"'"'row_${n}'"'"');"
done
SQL="${SQL} COMMIT;"
sqlite3 "$DB" "$SQL"
done
sqlite3 "$DB" "SELECT count(*) FROM t;"
'`, mnt)
	stdout, stderr, code, _ := clientNode.RunRoot(ctx, script)
	if code != 0 {
		t.Fatalf("sqlite3 failed: code=%d stderr=%s", code, stderr)
	}
	// Last line of stdout should be the count
	lines := strings.Split(strings.TrimSpace(stdout), "\n")
	lastLine := lines[len(lines)-1]
	if lastLine != "10000" {
		t.Fatalf("expected 10000 rows, got last line: %q (full output: %s)", lastLine, stdout)
	}
	t.Log("SQLite WAL: 10K rows inserted and verified")
}
// testAppsQemuBoot is a placeholder: skipped until an Alpine ISO fixture is
// provisioned. The requireCmd check runs first so the skip reason reflects
// a missing binary when QEMU is not installed at all.
func testAppsQemuBoot(t *testing.T) {
	requireCmd(t, "qemu-system-x86_64")
	t.Skip("QEMU boot test requires Alpine ISO setup")
}
// testAppsQemuFio is a placeholder: skipped until a bootable VM image
// fixture is provisioned.
func testAppsQemuFio(t *testing.T) {
	requireCmd(t, "qemu-system-x86_64")
	t.Skip("QEMU fio test requires VM image setup")
}
// testAppsDockerOverlay bind-mounts an iSCSI-backed ext4 directory into an
// alpine container, writes a file from inside the container, and verifies
// the content both in-container and on the host. Skips when docker is
// missing.
func testAppsDockerOverlay(t *testing.T) {
	requireCmd(t, "docker")
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "500M", "")
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	mnt := "/tmp/blockvol-docker"
	t.Cleanup(func() {
		cleanCtx, c := context.WithTimeout(context.Background(), 15*time.Second)
		defer c()
		clientNode.RunRoot(cleanCtx, fmt.Sprintf("umount -f %s 2>/dev/null", mnt))
		clientNode.RunRoot(cleanCtx, fmt.Sprintf("rm -rf %s", mnt))
	})
	clientNode.RunRoot(ctx, fmt.Sprintf("mkfs.ext4 -F %s", dev))
	clientNode.RunRoot(ctx, fmt.Sprintf("mkdir -p %s && mount %s %s", mnt, dev, mnt))
	// Write a file via Docker bind-mount to the iSCSI-backed filesystem
	// (pull errors ignored -- the image may already be cached)
	clientNode.RunRoot(ctx, "docker pull alpine:latest 2>/dev/null")
	testContent := "blockvol-docker-integration-test"
	stdout, stderr, code, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("docker run --rm -v %s:/data alpine:latest sh -c 'echo %s > /data/docker-test.txt && cat /data/docker-test.txt'",
			mnt, testContent))
	if code != 0 {
		t.Fatalf("docker run failed: code=%d stderr=%s stdout=%s", code, stderr, stdout)
	}
	if !strings.Contains(stdout, testContent) {
		t.Fatalf("expected %q in output, got: %s", testContent, stdout)
	}
	// Verify file persists on host
	stdout2, _, _, _ := clientNode.RunRoot(ctx, fmt.Sprintf("cat %s/docker-test.txt", mnt))
	if !strings.Contains(stdout2, testContent) {
		t.Fatalf("file not persisted: %s", stdout2)
	}
	t.Log("Docker on iSCSI-backed ext4 passed")
}
// testAppsLVMStripe is a placeholder: striping needs two iSCSI volumes,
// which the harness does not support yet.
func testAppsLVMStripe(t *testing.T) {
	requireCmd(t, "pvcreate")
	t.Skip("LVM stripe test requires 2 iSCSI volumes")
}
// testAppsMdRaid1 is a placeholder: RAID-1 needs two iSCSI volumes, which
// the harness does not support yet.
func testAppsMdRaid1(t *testing.T) {
	requireCmd(t, "mdadm")
	t.Skip("MD RAID-1 test requires 2 iSCSI volumes")
}
// requireCmd skips the calling test when the named command is not
// available on the initiator node.
func requireCmd(t *testing.T, cmd string) {
	t.Helper()
	if clientNode.HasCommand(cmd) {
		return
	}
	t.Skipf("%s not available", cmd)
}

181
weed/storage/blockvol/test/chaos_test.go

@ -0,0 +1,181 @@
//go:build integration
package test
import (
"context"
"fmt"
"strings"
"testing"
"time"
)
// TestChaos groups the chaos suite. Subtests run sequentially -- they
// share the same target node and iSCSI port.
func TestChaos(t *testing.T) {
	t.Run("Reconnect20", testChaosReconnect20)
	t.Run("MultiSession4", testChaosMultiSession4)
	t.Run("WALFull", testChaosWALFull)
	t.Run("AttachDetach10", testChaosAttachDetach10)
	t.Run("ConfigRestart", testChaosConfigRestart)
}
// testChaosReconnect20 performs 20 discover/login/write/read/logout cycles
// (5 under -short) against a single long-lived target, verifying each cycle
// produces readable data. Note: the write is from /dev/urandom, so only a
// non-empty read checksum is asserted, not a specific value.
func testChaosReconnect20(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 25*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "100M", "")
	// true = create a fresh volume
	if err := tgt.Start(ctx, true); err != nil {
		t.Fatalf("start: %v", err)
	}
	n := 20
	if testing.Short() {
		n = 5
	}
	for i := 0; i < n; i++ {
		t.Logf("reconnect %d/%d", i+1, n)
		iscsi.Discover(ctx, host, tgt.config.Port)
		dev, err := iscsi.Login(ctx, tgt.config.IQN)
		if err != nil {
			t.Fatalf("iter %d login: %v", i, err)
		}
		// Write 1MB + verify
		_, _, code, _ := clientNode.RunRoot(ctx,
			fmt.Sprintf("dd if=/dev/urandom of=%s bs=1M count=1 oflag=direct 2>/dev/null", dev))
		if code != 0 {
			t.Fatalf("iter %d dd write failed", i)
		}
		sum, _, _, _ := clientNode.RunRoot(ctx,
			fmt.Sprintf("dd if=%s bs=1M count=1 iflag=direct 2>/dev/null | md5sum", dev))
		if firstLine(sum) == "" {
			t.Fatalf("iter %d empty checksum", i)
		}
		if err := iscsi.Logout(ctx, tgt.config.IQN); err != nil {
			t.Fatalf("iter %d logout: %v", i, err)
		}
		// Brief pause for session teardown
		time.Sleep(200 * time.Millisecond)
	}
	t.Logf("%dx reconnect completed", n)
}
// testChaosMultiSession4 is a placeholder: concurrent sessions need
// multiple target IQNs, which the harness does not support yet.
func testChaosMultiSession4(t *testing.T) {
	t.Skip("multi-session test requires multiple target IQN support")
}
// testChaosWALFull writes 80MB sequentially through a deliberately tiny
// (4MB) WAL, forcing repeated WAL wrap/flush cycles, and asserts fio
// completes cleanly.
func testChaosWALFull(t *testing.T) {
	if !clientNode.HasCommand("fio") {
		t.Skip("fio required")
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "100M", "4M") // tiny WAL
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	// Sustained write much larger than WAL
	stdout, stderr, code, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("fio --name=walfull --filename=%s --rw=write --bs=64k "+
			"--size=80M --direct=1 --ioengine=libaio", dev))
	if code != 0 {
		t.Fatalf("fio: code=%d stderr=%s stdout=%s", code, stderr, stdout)
	}
	t.Log("WAL full test passed (4MB WAL, 80MB write)")
}
// testChaosAttachDetach10 performs 10 (3 under -short) login/fio/logout
// cycles against one long-lived target, then asserts no stale iSCSI
// session remains for the target IQN.
func testChaosAttachDetach10(t *testing.T) {
	if !clientNode.HasCommand("fio") {
		t.Skip("fio required")
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "100M", "")
	if err := tgt.Start(ctx, true); err != nil {
		t.Fatalf("start: %v", err)
	}
	n2 := 10
	if testing.Short() {
		n2 = 3
	}
	for i := 0; i < n2; i++ {
		t.Logf("attach/detach %d/%d", i+1, n2)
		iscsi.Discover(ctx, host, tgt.config.Port)
		dev, err := iscsi.Login(ctx, tgt.config.IQN)
		if err != nil {
			t.Fatalf("iter %d login: %v", i, err)
		}
		// Quick fio with data verification (crc32)
		_, _, code, _ := clientNode.RunRoot(ctx,
			fmt.Sprintf("fio --name=ad%d --filename=%s --rw=randrw --verify=crc32 "+
				"--bs=4k --size=10M --direct=1 --ioengine=libaio --randrepeat=1", i, dev))
		if code != 0 {
			t.Fatalf("iter %d fio failed", i)
		}
		if err := iscsi.Logout(ctx, tgt.config.IQN); err != nil {
			t.Fatalf("iter %d logout: %v", i, err)
		}
		time.Sleep(200 * time.Millisecond)
	}
	// Verify no stale devices: our IQN must not appear in the session list
	stdout, _, _, _ := clientNode.RunRoot(ctx, "iscsiadm -m session 2>&1")
	if strings.Contains(stdout, tgt.config.IQN) {
		t.Fatalf("stale session after 10 cycles: %s", stdout)
	}
	t.Logf("%dx attach/detach completed, no stale devices", n2)
}
// testChaosConfigRestart runs fio, cleanly stops the target, restarts it
// against the existing volume, and runs fio again -- verifying a graceful
// stop/start cycle preserves a usable volume.
func testChaosConfigRestart(t *testing.T) {
	if !clientNode.HasCommand("fio") {
		t.Skip("fio required")
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "100M", "")
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	// fio with default config
	_, _, code, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("fio --name=cfg1 --filename=%s --rw=randrw --bs=4k "+
			"--size=10M --direct=1 --ioengine=libaio --randrepeat=1", dev))
	if code != 0 {
		t.Fatalf("fio phase 1 failed")
	}
	// Logout + stop (graceful, unlike the Kill9 crash tests)
	iscsi.Logout(ctx, tgt.config.IQN)
	tgt.Stop(ctx)
	// Restart (open existing vol)
	if err := tgt.Start(ctx, false); err != nil {
		t.Fatalf("restart: %v", err)
	}
	iscsi.Discover(ctx, host, tgt.config.Port)
	dev, err := iscsi.Login(ctx, tgt.config.IQN)
	if err != nil {
		t.Fatalf("re-login: %v", err)
	}
	// fio again
	_, _, code, _ = clientNode.RunRoot(ctx,
		fmt.Sprintf("fio --name=cfg2 --filename=%s --rw=randrw --bs=4k "+
			"--size=10M --direct=1 --ioengine=libaio --randrepeat=1", dev))
	if code != 0 {
		t.Fatalf("fio phase 2 failed")
	}
	t.Log("config restart test passed")
}

228
weed/storage/blockvol/test/crash_test.go

@ -0,0 +1,228 @@
//go:build integration
package test
import (
"context"
"fmt"
"testing"
"time"
)
// TestCrash groups the crash/recovery suite. The whole suite is skipped
// when fio is missing on the initiator; subtests run sequentially.
func TestCrash(t *testing.T) {
	if !clientNode.HasCommand("fio") {
		t.Skip("fio required for crash tests")
	}
	t.Run("Kill9Fsync", testCrashKill9Fsync)
	t.Run("Kill9NoSync", testCrashKill9NoSync)
	t.Run("WALReplay", testCrashWALReplay)
	t.Run("RapidKill10x", testCrashRapidKill10x)
	t.Run("FsckAfterCrash", testCrashFsckAfterCrash)
}
// testCrashKill9Fsync writes 10MB with fdatasync after every write, records
// a checksum of the synced region, SIGKILLs the target, restarts it, and
// asserts the checksum is unchanged -- i.e. fdatasync'd data survives an
// unclean target death.
func testCrashKill9Fsync(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "100M", "")
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	// Write with fdatasync
	_, _, code, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("fio --name=sync --filename=%s --rw=write --bs=4k --size=10M "+
			"--fdatasync=1 --direct=1 --ioengine=libaio", dev))
	if code != 0 {
		t.Fatalf("fio write failed: code=%d", code)
	}
	// Record checksum of synced data (2560 x 4k = the 10MB just written)
	sum1, _, _, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("dd if=%s bs=4k count=2560 iflag=direct 2>/dev/null | md5sum", dev))
	// Kill9
	t.Log("killing target...")
	tgt.Kill9()
	// Clean up stale iSCSI state before restart
	iscsi.Logout(ctx, tgt.config.IQN)
	iscsi.CleanupAll(ctx, tgt.config.IQN)
	// Restart (false = open existing volume)
	t.Log("restarting target...")
	if err := tgt.Start(ctx, false); err != nil {
		t.Fatalf("restart: %v", err)
	}
	// Re-login
	dev, err := iscsi.Login(ctx, tgt.config.IQN)
	if err != nil {
		t.Fatalf("re-login: %v", err)
	}
	// Verify synced data intact
	sum2, _, _, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("dd if=%s bs=4k count=2560 iflag=direct 2>/dev/null | md5sum", dev))
	if firstLine(sum1) != firstLine(sum2) {
		t.Fatalf("synced data corrupted: %s vs %s", firstLine(sum1), firstLine(sum2))
	}
	t.Log("synced data intact after Kill9")
}
// testCrashKill9NoSync SIGKILLs the target immediately after setup -- no
// writes, no sync -- and asserts the volume still opens and accepts a login
// afterwards. Tests that an unclean kill never leaves the volume in an
// unopenable state.
func testCrashKill9NoSync(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "100M", "")
	_ = startAndLogin(t, ctx, tgt, iscsi, host)
	// Kill9 without sync
	tgt.Kill9()
	iscsi.Logout(ctx, tgt.config.IQN)
	// Restart -- volume must open without corruption
	if err := tgt.Start(ctx, false); err != nil {
		t.Fatalf("restart after unclean kill: %v", err)
	}
	// Login to verify volume is usable
	_, err := iscsi.Login(ctx, tgt.config.IQN)
	if err != nil {
		t.Fatalf("login after restart: %v", err)
	}
	t.Log("volume opened successfully after Kill9 (no sync)")
}
// testCrashWALReplay writes a 4k block, SIGKILLs the target before any
// flush, restarts (triggering WAL replay), and verifies the volume is
// readable. The write was not fdatasync'd, so the before/after checksums
// are logged for information only -- only readability is asserted.
func testCrashWALReplay(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "100M", "4M") // small WAL
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	// Write a 4k block of known data (O_DIRECT requires sector-aligned writes)
	_, _, code, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("dd if=/dev/urandom of=%s bs=4k count=1 oflag=direct 2>/dev/null", dev))
	if code != 0 {
		t.Fatalf("pattern write failed")
	}
	// Read back the checksum before kill
	sumBefore, _, _, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("dd if=%s bs=4k count=1 iflag=direct 2>/dev/null | md5sum", dev))
	// Kill9 before flush can happen
	tgt.Kill9()
	iscsi.Logout(ctx, tgt.config.IQN)
	// Restart (WAL replay)
	if err := tgt.Start(ctx, false); err != nil {
		t.Fatalf("restart: %v", err)
	}
	// Re-login and verify
	dev, err := iscsi.Login(ctx, tgt.config.IQN)
	if err != nil {
		t.Fatalf("re-login: %v", err)
	}
	sumAfter, _, _, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("dd if=%s bs=4k count=1 iflag=direct 2>/dev/null | md5sum", dev))
	// Non-fdatasync writes have no durability guarantee, but volume must be readable
	if firstLine(sumAfter) == "" {
		t.Fatalf("could not read data after WAL replay")
	}
	t.Logf("WAL replay: before=%s after=%s (match=%v)",
		firstLine(sumBefore), firstLine(sumAfter), firstLine(sumBefore) == firstLine(sumAfter))
	t.Log("WAL replay completed, volume intact")
}
// testCrashRapidKill10x repeatedly start/login/write/logout/SIGKILLs the
// target 10 times (3 under -short). The volume is created on iteration 0
// and re-opened thereafter, so each restart exercises recovery of the
// previous unclean kill.
func testCrashRapidKill10x(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "50M", "")
	n := 10
	if testing.Short() {
		n = 3
	}
	for i := 0; i < n; i++ {
		t.Logf("iteration %d/%d", i+1, n)
		// Only the first iteration creates the volume
		create := (i == 0)
		if err := tgt.Start(ctx, create); err != nil {
			t.Fatalf("iter %d start: %v", i, err)
		}
		_, err := iscsi.Discover(ctx, host, tgt.config.Port)
		if err != nil {
			t.Fatalf("iter %d discover: %v", i, err)
		}
		dev, err := iscsi.Login(ctx, tgt.config.IQN)
		if err != nil {
			t.Fatalf("iter %d login: %v", i, err)
		}
		// Write 1MB
		_, _, code, _ := clientNode.RunRoot(ctx,
			fmt.Sprintf("dd if=/dev/urandom of=%s bs=1M count=1 oflag=direct 2>/dev/null", dev))
		if code != 0 {
			t.Fatalf("iter %d dd write failed", i)
		}
		iscsi.Logout(ctx, tgt.config.IQN)
		tgt.Kill9()
	}
	t.Logf("%dx rapid kill completed", n)
}
// testCrashFsckAfterCrash builds an XFS filesystem on the device, writes
// files, syncs + unmounts, SIGKILLs the target, restarts, and runs
// xfs_repair -n to assert the filesystem is clean.
//
// Currently disabled: the unconditional t.Skip below makes everything
// after it dead code until P3-BUG-11 (WRITE SAME(16) support) is fixed.
func testCrashFsckAfterCrash(t *testing.T) {
	t.Skip("P3-BUG-11: WRITE SAME(16) not implemented, XFS sends it for inode zeroing")
	if !clientNode.HasCommand("mkfs.xfs") {
		t.Skip("mkfs.xfs required")
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "500M", "") // XFS needs >= 300MB
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	mnt := "/tmp/blockvol-mnt"
	// mkfs.xfs + mount
	clientNode.RunRoot(ctx, fmt.Sprintf("mkfs.xfs -f %s", dev))
	clientNode.RunRoot(ctx, fmt.Sprintf("mkdir -p %s", mnt))
	_, _, code, _ := clientNode.RunRoot(ctx, fmt.Sprintf("mount %s %s", dev, mnt))
	if code != 0 {
		t.Fatalf("mount failed")
	}
	// Workload: create some files (best effort, results unchecked)
	for i := 0; i < 50; i++ {
		clientNode.RunRoot(ctx, fmt.Sprintf("dd if=/dev/urandom of=%s/file%d bs=4k count=10 2>/dev/null", mnt, i))
	}
	// Sync filesystem metadata to device, then unmount + Kill9
	clientNode.RunRoot(ctx, "sync")
	clientNode.RunRoot(ctx, fmt.Sprintf("umount %s 2>/dev/null", mnt))
	iscsi.Logout(ctx, tgt.config.IQN)
	iscsi.CleanupAll(ctx, tgt.config.IQN)
	tgt.Kill9()
	// Restart
	if err := tgt.Start(ctx, false); err != nil {
		t.Fatalf("restart: %v", err)
	}
	dev, err := iscsi.Login(ctx, tgt.config.IQN)
	if err != nil {
		t.Fatalf("re-login: %v", err)
	}
	// xfs_repair -n (read-only check)
	stdout, stderr, code, _ := clientNode.RunRoot(ctx, fmt.Sprintf("xfs_repair -n %s", dev))
	if code != 0 {
		t.Fatalf("xfs_repair failed: stdout=%s stderr=%s", stdout, stderr)
	}
	t.Log("xfs_repair -n passed (filesystem clean)")
}

281
weed/storage/blockvol/test/integration_test.go

@ -0,0 +1,281 @@
//go:build integration
package test
import (
"context"
"flag"
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"time"
)
// Command-line flags selecting the test environment. In "wsl2" mode the
// target and initiator are the same local node; in "remote" mode both are
// reached over SSH.
var (
	flagEnv        = flag.String("env", "wsl2", "wsl2 or remote")
	flagTargetHost = flag.String("target-host", "127.0.0.1", "target node IP (SSH)")
	flagClientHost = flag.String("client-host", "127.0.0.1", "initiator node IP (SSH)")
	flagISCSIHost  = flag.String("iscsi-host", "", "iSCSI target IP for discovery/login (defaults to target-host)")
	flagSSHKey     = flag.String("ssh-key", "", "SSH private key path")
	flagSSHUser    = flag.String("ssh-user", "testdev", "SSH user")
	flagRepoDir    = flag.String("repo-dir", "C:/work/seaweedfs", "seaweedfs repo path")
)
// Global state shared across tests. Initialized once in TestMain; tests
// must not run in parallel because they share these nodes.
var (
	targetNode *Node              // node running the weed/iscsi target
	clientNode *Node              // node running the iSCSI initiator (== targetNode in wsl2 mode)
	artifacts  *ArtifactCollector // collects logs on test failure
)

// iqnPrefix is the base IQN for all test targets; per-test IQNs append a
// sanitized test name. NOTE(review): RFC 3720 IQNs are
// "iqn.yyyy-mm.<reversed-domain>:..." -- this prefix omits the month.
// open-iscsi evidently accepts it, but confirm before shipping.
const iqnPrefix = "iqn.2024.com.seaweedfs:test"
// TestMain sets up the shared environment: connects to the target and
// initiator nodes, runs preflight version checks, builds + deploys the
// iscsi-target binary, then runs the suite and performs unconditional
// global cleanup before exiting. Note: os.Exit skips deferred calls, so
// cleanup() is invoked explicitly rather than deferred.
func TestMain(m *testing.M) {
	flag.Parse()
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	// Setup nodes
	if *flagEnv == "wsl2" {
		targetNode = &Node{IsLocal: true}
		clientNode = targetNode // same node for WSL2
	} else {
		targetNode = &Node{Host: *flagTargetHost, User: *flagSSHUser, KeyFile: *flagSSHKey}
		clientNode = &Node{Host: *flagClientHost, User: *flagSSHUser, KeyFile: *flagSSHKey}
	}
	if err := targetNode.Connect(); err != nil {
		fmt.Fprintf(os.Stderr, "FATAL: target connect: %v\n", err)
		os.Exit(1)
	}
	// Avoid a second connect when client and target share one node.
	if clientNode != targetNode {
		if err := clientNode.Connect(); err != nil {
			fmt.Fprintf(os.Stderr, "FATAL: client connect: %v\n", err)
			os.Exit(1)
		}
	}
	// Preflight: print versions
	preflight(ctx)
	// Build target binary
	fmt.Println("=== Building iscsi-target binary ===")
	tgt := NewTarget(targetNode, DefaultTargetConfig())
	if err := tgt.Build(ctx, *flagRepoDir); err != nil {
		fmt.Fprintf(os.Stderr, "FATAL: build target: %v\n", err)
		os.Exit(1)
	}
	if err := tgt.Deploy(*flagRepoDir + "/iscsi-target-linux"); err != nil {
		fmt.Fprintf(os.Stderr, "FATAL: deploy target: %v\n", err)
		os.Exit(1)
	}
	fmt.Println("=== Build + deploy complete ===")
	// Setup artifact collector (no Target -- each test provides its own)
	artDir, _ := filepath.Abs("artifacts")
	artifacts = NewArtifactCollector(artDir, clientNode)
	// Run tests
	code := m.Run()
	// Global cleanup (unconditional)
	cleanup()
	os.Exit(code)
}
// preflight prints the versions of the tools the suite depends on, one
// line per tool, marking unavailable ones as MISSING. Purely informational
// -- missing tools are reported, not fatal.
func preflight(ctx context.Context) {
	fmt.Println("=== Preflight ===")
	type toolCheck struct {
		name string
		cmd  string
		node *Node
	}
	checks := []toolCheck{
		{"fio", "fio --version", clientNode},
		{"iscsiadm", "iscsiadm --version 2>&1", clientNode},
		{"go", "go version", targetNode},
		{"kernel", "uname -r", targetNode},
	}
	for _, chk := range checks {
		stdout, _, code, err := chk.node.Run(ctx, chk.cmd)
		if err == nil && code == 0 {
			fmt.Printf("  %-10s %s\n", chk.name, firstLine(stdout))
			continue
		}
		fmt.Printf("  %-10s MISSING\n", chk.name)
	}
	fmt.Println("=== End Preflight ===")
}
// cleanup performs unconditional, best-effort teardown after the whole
// suite: logs out of all test sessions, unmounts test mounts, kills any
// leftover target process, removes temp files, and closes node
// connections. All command errors are intentionally ignored.
func cleanup() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	fmt.Println("=== Global Cleanup ===")
	iscsi := NewISCSIClient(clientNode)
	iscsi.CleanupAll(ctx, iqnPrefix)
	// Unmount any test mount points
	clientNode.RunRoot(ctx, "umount -f /tmp/blockvol-mnt 2>/dev/null")
	// Kill any leftover target process
	targetNode.Run(ctx, "pkill -f iscsi-target-test 2>/dev/null")
	// Remove temp files
	targetNode.Run(ctx, "rm -f /tmp/blockvol-test.blk /tmp/blockvol-test.blk.wal /tmp/iscsi-target-test /tmp/iscsi-target-test.log")
	// Close the client connection first; skip when it aliases targetNode.
	if clientNode != targetNode {
		clientNode.Close()
	}
	targetNode.Close()
	fmt.Println("=== Cleanup Done ===")
}
// TestHarnessSelfCheck validates the test framework itself: it exercises
// the full start/discover/login/write/logout/stop path once with a small
// volume. Run first: go test -tags integration -run TestHarnessSelfCheck -v
func TestHarnessSelfCheck(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancel()
	cfg := DefaultTargetConfig()
	cfg.IQN = iqnPrefix + "-harness"
	cfg.VolSize = "50M"
	tgt := NewTarget(targetNode, cfg)
	iscsi := NewISCSIClient(clientNode)
	host := targetHost()
	// Cleanups run LIFO: artifacts are collected before teardown below.
	t.Cleanup(func() {
		iscsi.Logout(ctx, cfg.IQN)
		tgt.Stop(ctx)
		tgt.Cleanup(ctx)
	})
	t.Cleanup(func() { artifacts.Collect(t, tgt) })
	// Start target (true = create fresh volume)
	t.Log("starting target...")
	if err := tgt.Start(ctx, true); err != nil {
		t.Fatalf("start target: %v", err)
	}
	// Discovery -- our IQN must appear in the SendTargets response
	t.Log("discovering...")
	iqns, err := iscsi.Discover(ctx, host, cfg.Port)
	if err != nil {
		t.Fatalf("discover: %v", err)
	}
	found := false
	for _, iqn := range iqns {
		if iqn == cfg.IQN {
			found = true
		}
	}
	if !found {
		t.Fatalf("IQN %s not in discovery: %v", cfg.IQN, iqns)
	}
	// Login
	t.Log("logging in...")
	dev, err := iscsi.Login(ctx, cfg.IQN)
	if err != nil {
		t.Fatalf("login: %v", err)
	}
	t.Logf("device: %s", dev)
	// DD 1MB write + read + verify (urandom source, so only the round-trip
	// checksum is logged, not asserted)
	t.Log("dd write/read verify...")
	_, _, code, err := clientNode.RunRoot(ctx,
		fmt.Sprintf("dd if=/dev/urandom of=%s bs=1M count=1 oflag=direct 2>/dev/null", dev))
	if err != nil || code != 0 {
		t.Fatalf("dd write failed: code=%d err=%v", code, err)
	}
	wSum, _, _, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("dd if=%s bs=1M count=1 iflag=direct 2>/dev/null | md5sum", dev))
	t.Logf("md5: %s", firstLine(wSum))
	// Logout
	t.Log("logging out...")
	if err := iscsi.Logout(ctx, cfg.IQN); err != nil {
		t.Fatalf("logout: %v", err)
	}
	// Stop target
	t.Log("stopping target...")
	if err := tgt.Stop(ctx); err != nil {
		t.Fatalf("stop: %v", err)
	}
	t.Log("harness self-check passed")
}
// targetHost returns the iSCSI target address used for discovery/login
// from the initiator. Precedence: wsl2 mode always uses loopback
// (overriding -iscsi-host); otherwise -iscsi-host wins when set, falling
// back to -target-host.
func targetHost() string {
	switch {
	case *flagEnv == "wsl2":
		return "127.0.0.1"
	case *flagISCSIHost != "":
		return *flagISCSIHost
	default:
		return *flagTargetHost
	}
}
// firstLine returns s truncated at the first newline or carriage return,
// or s unchanged when it contains neither.
func firstLine(s string) string {
	if i := strings.IndexAny(s, "\n\r"); i >= 0 {
		return s[:i]
	}
	return s
}
// newTestTarget creates a target with test-specific IQN, unique vol file, and cleanup.
// Tests must not run in parallel -- they share the same target node and port.
// Empty volSize/walSize keep the DefaultTargetConfig values. Returns the
// target, an iSCSI client bound to the initiator node, and the discovery
// host address.
func newTestTarget(t *testing.T, volSize, walSize string) (*Target, *ISCSIClient, string) {
	cfg := DefaultTargetConfig()
	// Sanitize test name for IQN -- replace / with - (subtests use /)
	name := strings.ReplaceAll(t.Name(), "/", "-")
	cfg.IQN = iqnPrefix + "-" + strings.ToLower(name)
	if volSize != "" {
		cfg.VolSize = volSize
	}
	if walSize != "" {
		cfg.WALSize = walSize
	}
	tgt := NewTarget(targetNode, cfg)
	iscsi := NewISCSIClient(clientNode)
	host := targetHost()
	// Teardown uses a fresh context -- the test's own ctx may be expired.
	// Cleanups run LIFO, so artifact collection happens before teardown.
	t.Cleanup(func() {
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		iscsi.Logout(ctx, cfg.IQN)
		tgt.Stop(ctx)
		tgt.Cleanup(ctx)
	})
	t.Cleanup(func() { artifacts.Collect(t, tgt) })
	return tgt, iscsi, host
}
// startAndLogin creates volume, starts target, discovers, logs in, returns device.
// Any failure is fatal to the calling test. Note Start(ctx, true) always
// creates a fresh volume; tests that reopen an existing one call
// tgt.Start(ctx, false) directly.
func startAndLogin(t *testing.T, ctx context.Context, tgt *Target, iscsi *ISCSIClient, host string) string {
	t.Helper()
	if err := tgt.Start(ctx, true); err != nil {
		t.Fatalf("start target: %v", err)
	}
	if _, err := iscsi.Discover(ctx, host, tgt.config.Port); err != nil {
		t.Fatalf("discover: %v", err)
	}
	dev, err := iscsi.Login(ctx, tgt.config.IQN)
	if err != nil {
		t.Fatalf("login: %v", err)
	}
	return dev
}

229
weed/storage/blockvol/test/iscsi.go

@ -0,0 +1,229 @@
//go:build integration
package test
import (
"context"
"fmt"
"strings"
"time"
)
// ISCSIClient wraps iscsiadm commands on a node. Not safe for concurrent
// use: Discover mutates targetHost/targetPort, which Login reads.
type ISCSIClient struct {
	node       *Node
	targetHost string // set after Discover, used to fix wildcard portals
	targetPort int    // set after Discover, paired with targetHost
}
// NewISCSIClient creates an iSCSI client bound to a node. targetHost and
// targetPort stay zero until Discover is called.
func NewISCSIClient(node *Node) *ISCSIClient {
	return &ISCSIClient{node: node}
}
// Discover runs iSCSI SendTargets discovery and returns discovered IQNs.
// Remembers the target host for subsequent Login calls. Also rewrites any
// node records whose portal came back as a wildcard address (see
// fixNodePortal).
func (c *ISCSIClient) Discover(ctx context.Context, host string, port int) ([]string, error) {
	c.targetHost = host
	c.targetPort = port
	cmd := fmt.Sprintf("iscsiadm -m discovery -t sendtargets -p %s:%d", host, port)
	stdout, stderr, code, err := c.node.RunRoot(ctx, cmd)
	if err != nil {
		return nil, fmt.Errorf("discovery error: %w", err)
	}
	if code != 0 {
		return nil, fmt.Errorf("discovery failed (code %d): %s", code, stderr)
	}
	var iqns []string
	for _, line := range strings.Split(stdout, "\n") {
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}
		// Format: "10.0.0.1:3260,1 iqn.2024.com.seaweedfs:vol1"
		// or:     "[::]:3260,-1 iqn.2024.com.seaweedfs:vol1"
		// -- second whitespace-separated field is the IQN.
		parts := strings.Fields(line)
		if len(parts) >= 2 {
			iqns = append(iqns, parts[1])
		}
	}
	// Fix wildcard portals: target may advertise [::]:3260 but remote
	// initiators need the real IP. Delete wildcard records and re-create
	// with the correct portal address.
	for _, iqn := range iqns {
		c.fixNodePortal(ctx, iqn, host, port)
	}
	return iqns, nil
}
// fixNodePortal ensures the node record for iqn uses the actual target
// host address, not a wildcard like [::] or 0.0.0.0.
//
// The wildcard check parses the "node.conn[0].address = <addr>" value and
// compares it exactly: a naive substring match on "::" would also hit
// legitimate compressed IPv6 portals (e.g. fd00::1) and needlessly delete
// and re-create valid records. All iscsiadm errors are best-effort ignored,
// matching the original behavior.
func (c *ISCSIClient) fixNodePortal(ctx context.Context, iqn, host string, port int) {
	// List node records for this IQN
	stdout, _, _, _ := c.node.RunRoot(ctx,
		fmt.Sprintf("iscsiadm -m node -T %s 2>/dev/null", iqn))
	// Check if any record has a wildcard address
	hasWildcard := false
	for _, line := range strings.Split(stdout, "\n") {
		if !strings.Contains(line, "node.conn[0].address") {
			continue
		}
		// Line format: "node.conn[0].address = <addr>"
		kv := strings.SplitN(line, "=", 2)
		if len(kv) != 2 {
			continue
		}
		switch strings.TrimSpace(kv[1]) {
		case "::", "[::]", "0.0.0.0":
			hasWildcard = true
		}
	}
	if !hasWildcard {
		return
	}
	// Delete ALL node records for this IQN (wildcard ones)
	c.node.RunRoot(ctx, fmt.Sprintf("iscsiadm -m node -T %s -o delete 2>/dev/null", iqn))
	// Create a new node record with the correct portal
	portal := fmt.Sprintf("%s:%d", host, port)
	c.node.RunRoot(ctx, fmt.Sprintf("iscsiadm -m node -T %s -p %s -o new 2>/dev/null", iqn, portal))
}
// Login connects to the target and returns the device path (e.g. /dev/sda).
// When Discover recorded a remote portal, it is passed explicitly so a stale
// wildcard node record is never used.
func (c *ISCSIClient) Login(ctx context.Context, iqn string) (string, error) {
	// Local/WSL2 mode: an IQN-only login works fine.
	cmd := fmt.Sprintf("iscsiadm -m node -T %s --login", iqn)
	if c.targetHost != "" && c.targetHost != "127.0.0.1" && c.targetHost != "localhost" {
		// Remote mode: pin the portal to avoid the wildcard [::] issue.
		cmd = fmt.Sprintf("iscsiadm -m node -T %s -p %s:%d --login", iqn, c.targetHost, c.targetPort)
	}
	_, stderr, code, err := c.node.RunRoot(ctx, cmd)
	if err != nil {
		return "", fmt.Errorf("login error: %w", err)
	}
	if code != 0 {
		return "", fmt.Errorf("login failed (code %d): %s", code, stderr)
	}
	// The kernel creates /dev/sdX asynchronously; poll until it appears.
	return c.waitForDevice(ctx, iqn)
}
// Logout disconnects the iSCSI session for iqn.
func (c *ISCSIClient) Logout(ctx context.Context, iqn string) error {
	_, stderr, code, err := c.node.RunRoot(ctx,
		fmt.Sprintf("iscsiadm -m node -T %s --logout", iqn))
	switch {
	case err != nil:
		return fmt.Errorf("logout error: %w", err)
	case code != 0:
		return fmt.Errorf("logout failed (code %d): %s", code, stderr)
	}
	return nil
}
// GetDevice returns the block device path backing an active session for iqn,
// waiting for the kernel to surface it if necessary.
func (c *ISCSIClient) GetDevice(ctx context.Context, iqn string) (string, error) {
	dev, err := c.waitForDevice(ctx, iqn)
	return dev, err
}
// waitForDevice polls `iscsiadm -m session -P3` for up to 30s until a
// /dev/sdX entry for iqn appears. Once, after ~5s without a device, it
// forces a LUN rescan (`iscsiadm -m session -R`) -- WSL2 kernels need this
// nudge before the disk shows up.
func (c *ISCSIClient) waitForDevice(ctx context.Context, iqn string) (string, error) {
	deadline := time.Now().Add(30 * time.Second)
	rescanned := false
	for time.Now().Before(deadline) {
		// Bail out promptly if the caller's context is cancelled.
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		default:
		}
		// Parse session details to find the attached disk
		stdout, _, code, _ := c.node.RunRoot(ctx, "iscsiadm -m session -P3")
		if code == 0 {
			dev := parseDeviceFromSession(stdout, iqn)
			if dev != "" {
				return dev, nil
			}
		}
		// After 5s without device, force a LUN rescan (WSL2 needs this)
		if !rescanned && time.Until(deadline) < 25*time.Second {
			c.node.RunRoot(ctx, "iscsiadm -m session -R")
			rescanned = true
		}
		time.Sleep(500 * time.Millisecond)
	}
	return "", fmt.Errorf("device for %s did not appear within 30s", iqn)
}
// parseDeviceFromSession extracts /dev/sdX from `iscsiadm -m session -P3`
// output for the given target IQN. Returns "" when no attached disk is found.
//
// The IQN is compared as an exact token: the previous substring match
// ("Target: "+iqn) wrongly matched a target whose IQN merely has iqn as a
// prefix (e.g. searching vol1 matched vol10).
func parseDeviceFromSession(output, iqn string) string {
	inTarget := false
	for _, line := range strings.Split(output, "\n") {
		if idx := strings.Index(line, "Target: "); idx >= 0 {
			// Line looks like "Target: iqn.x (non-flash)"; the IQN is the
			// first field after the prefix. Track whether we are inside the
			// section for the requested target.
			rest := line[idx+len("Target: "):]
			name := rest
			if fields := strings.Fields(rest); len(fields) > 0 {
				name = fields[0]
			}
			inTarget = name == iqn
			continue
		}
		if inTarget && strings.Contains(line, "Attached scsi disk") {
			// "Attached scsi disk sda State: running"
			fields := strings.Fields(line)
			for i, f := range fields {
				if f == "disk" && i+1 < len(fields) {
					return "/dev/" + fields[i+1]
				}
			}
		}
	}
	return ""
}
// WaitForSession polls `iscsiadm -m session` until a session mentioning iqn
// appears, or the context is cancelled. Used after Kill9+Restart to wait for
// iSCSI session recovery.
func (c *ISCSIClient) WaitForSession(ctx context.Context, iqn string) error {
	for {
		out, _, code, _ := c.node.RunRoot(ctx, "iscsiadm -m session")
		if code == 0 && strings.Contains(out, iqn) {
			return nil
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf("session %s did not recover: %w", iqn, ctx.Err())
		case <-time.After(500 * time.Millisecond):
		}
	}
}
// CleanupAll force-logs-out and deletes node records for every session whose
// IQN starts with iqnPrefix. Other iSCSI sessions on the node are untouched.
func (c *ISCSIClient) CleanupAll(ctx context.Context, iqnPrefix string) error {
	stdout, _, _, _ := c.node.RunRoot(ctx, "iscsiadm -m session 2>&1")
	if stdout == "" || strings.Contains(stdout, "No active sessions") {
		return nil
	}
	// Session lines look like:
	//   "tcp: [N] 10.0.0.1:3260,1 iqn.2024.com.seaweedfs:test-..."
	for _, raw := range strings.Split(stdout, "\n") {
		line := strings.TrimSpace(raw)
		if !strings.Contains(line, iqnPrefix) {
			continue
		}
		// Pick out the IQN field(s) and tear each one down.
		for _, field := range strings.Fields(line) {
			if !strings.HasPrefix(field, iqnPrefix) {
				continue
			}
			c.node.RunRoot(ctx, fmt.Sprintf("iscsiadm -m node -T %s --logout 2>/dev/null", field))
			c.node.RunRoot(ctx, fmt.Sprintf("iscsiadm -m node -T %s -o delete 2>/dev/null", field))
		}
	}
	return nil
}

316
weed/storage/blockvol/test/node.go

@ -0,0 +1,316 @@
//go:build integration
package test
import (
"bytes"
"context"
"fmt"
"io"
"net"
"os"
"os/exec"
"strings"
"sync"
"time"
"golang.org/x/crypto/ssh"
)
// Node represents an SSH-accessible (or local WSL2) machine. The zero value
// is unusable: set Host/User/KeyFile (or IsLocal) and call Connect before
// running commands.
type Node struct {
	Host    string // SSH host, optionally "host:port" (defaults to :22)
	User    string // SSH login user
	KeyFile string // path to the private key used for public-key auth
	IsLocal bool   // WSL2 mode: use exec.CommandContext instead of SSH
	mu      sync.Mutex  // guards client
	client  *ssh.Client // nil until Connect succeeds (SSH mode only)
}
// Connect establishes the SSH connection (no-op for local mode).
// NOTE(review): host keys are not verified (InsecureIgnoreHostKey), which is
// acceptable for a throwaway test lab but must not leak into production code.
func (n *Node) Connect() error {
	if n.IsLocal {
		return nil
	}
	key, err := os.ReadFile(n.KeyFile)
	if err != nil {
		return fmt.Errorf("read SSH key %s: %w", n.KeyFile, err)
	}
	signer, err := ssh.ParsePrivateKey(key)
	if err != nil {
		return fmt.Errorf("parse SSH key: %w", err)
	}
	config := &ssh.ClientConfig{
		User:            n.User,
		Auth:            []ssh.AuthMethod{ssh.PublicKeys(signer)},
		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
		Timeout:         10 * time.Second,
	}
	// Default to port 22 when Host carries no explicit port.
	addr := n.Host
	if !strings.Contains(addr, ":") {
		addr += ":22"
	}
	n.mu.Lock()
	defer n.mu.Unlock()
	n.client, err = ssh.Dial("tcp", addr, config)
	if err != nil {
		return fmt.Errorf("SSH dial %s: %w", addr, err)
	}
	return nil
}
// Run executes cmd on the node and returns its stdout, stderr, and exit
// code. A cancelled context kills the command.
func (n *Node) Run(ctx context.Context, cmd string) (stdout, stderr string, exitCode int, err error) {
	if !n.IsLocal {
		return n.runSSH(ctx, cmd)
	}
	return n.runLocal(ctx, cmd)
}
// runLocal executes cmd inside WSL2 via `wsl -e bash -c`, treating a nonzero
// exit status as a normal result rather than an error.
func (n *Node) runLocal(ctx context.Context, cmd string) (string, string, int, error) {
	var outBuf, errBuf bytes.Buffer
	proc := exec.CommandContext(ctx, "wsl", "-e", "bash", "-c", cmd)
	proc.Stdout = &outBuf
	proc.Stderr = &errBuf
	runErr := proc.Run()
	switch {
	case ctx.Err() != nil:
		return outBuf.String(), errBuf.String(), -1, fmt.Errorf("command timed out: %w", ctx.Err())
	case runErr == nil:
		return outBuf.String(), errBuf.String(), 0, nil
	default:
		if exitErr, ok := runErr.(*exec.ExitError); ok {
			return outBuf.String(), errBuf.String(), exitErr.ExitCode(), nil
		}
		return outBuf.String(), errBuf.String(), -1, runErr
	}
}
// runSSH executes cmd in a fresh SSH session and returns stdout, stderr, and
// the remote exit status. On context cancellation it attempts to SIGKILL the
// remote command and reports exit code -1.
// NOTE(review): some sshd builds ignore session signals, so the remote
// process may survive a timeout -- confirm against the lab nodes.
func (n *Node) runSSH(ctx context.Context, cmd string) (string, string, int, error) {
	n.mu.Lock()
	if n.client == nil {
		n.mu.Unlock()
		return "", "", -1, fmt.Errorf("SSH not connected")
	}
	session, err := n.client.NewSession()
	n.mu.Unlock()
	if err != nil {
		return "", "", -1, fmt.Errorf("new SSH session: %w", err)
	}
	defer session.Close()
	var outBuf, errBuf bytes.Buffer
	session.Stdout = &outBuf
	session.Stderr = &errBuf
	// Run in a goroutine so completion can race context cancellation.
	done := make(chan error, 1)
	go func() { done <- session.Run(cmd) }()
	select {
	case <-ctx.Done():
		_ = session.Signal(ssh.SIGKILL)
		return outBuf.String(), errBuf.String(), -1, fmt.Errorf("command timed out: %w", ctx.Err())
	case err := <-done:
		if err != nil {
			// ssh.ExitError carries the remote exit status; a nonzero status
			// is a normal result, not a transport failure.
			if exitErr, ok := err.(*ssh.ExitError); ok {
				return outBuf.String(), errBuf.String(), exitErr.ExitStatus(), nil
			}
			return outBuf.String(), errBuf.String(), -1, err
		}
		return outBuf.String(), errBuf.String(), 0, nil
	}
}
// RunRoot runs cmd under `sudo -n` (non-interactive): if sudo would prompt
// for a password the command fails immediately instead of hanging.
// Note that only the first command of a `a && b` chain is elevated.
func (n *Node) RunRoot(ctx context.Context, cmd string) (string, string, int, error) {
	elevated := "sudo -n " + cmd
	return n.Run(ctx, elevated)
}
// Upload copies a local file to the node and marks it executable.
// Local mode translates the Windows path and uses cp inside WSL; SSH mode
// streams the file via the SCP protocol.
func (n *Node) Upload(local, remote string) error {
	if !n.IsLocal {
		return n.scpUpload(local, remote)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()
	// Convert the Windows path to its WSL mount so cp can see it.
	src := toWSLPath(local)
	_, stderr, code, err := n.Run(ctx, fmt.Sprintf("cp %s %s && chmod +x %s", src, remote, remote))
	if err != nil || code != 0 {
		return fmt.Errorf("local upload: code=%d stderr=%s err=%v", code, stderr, err)
	}
	return nil
}
// scpUpload streams a local file to the remote node using the SCP sink
// protocol ("scp -t"), creating it with mode 0755.
//
// The stdin pipe is obtained BEFORE the remote command starts: the previous
// version called session.StdinPipe() inside a goroutine racing session.Run,
// which the ssh package does not permit (pipes must be set up pre-start),
// and it also ignored the StdinPipe error.
func (n *Node) scpUpload(local, remote string) error {
	data, err := os.ReadFile(local)
	if err != nil {
		return fmt.Errorf("read local file %s: %w", local, err)
	}
	n.mu.Lock()
	if n.client == nil {
		n.mu.Unlock()
		return fmt.Errorf("SSH not connected")
	}
	session, err := n.client.NewSession()
	n.mu.Unlock()
	if err != nil {
		return fmt.Errorf("new SSH session: %w", err)
	}
	defer session.Close()
	w, err := session.StdinPipe()
	if err != nil {
		return fmt.Errorf("stdin pipe: %w", err)
	}
	// Feed the SCP sink concurrently with Run: header, payload, then the
	// zero byte that terminates the transfer. Closing stdin lets scp exit.
	go func() {
		defer w.Close()
		fmt.Fprintf(w, "C0755 %d %s\n", len(data), remoteName(remote))
		w.Write(data)
		fmt.Fprint(w, "\x00")
	}()
	dir := remoteDir(remote)
	return session.Run(fmt.Sprintf("scp -t %s", dir))
}
// Download copies a remote file to a local path: plain cp (with Windows→WSL
// path translation) in local mode, streamed over SSH otherwise.
func (n *Node) Download(remote, local string) error {
	if !n.IsLocal {
		return n.scpDownload(remote, local)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()
	dst := toWSLPath(local)
	_, stderr, code, err := n.Run(ctx, fmt.Sprintf("cp %s %s", remote, dst))
	if err != nil || code != 0 {
		return fmt.Errorf("local download: code=%d stderr=%s err=%v", code, stderr, err)
	}
	return nil
}
// scpDownload copies a remote file's contents to local. Despite the name it
// simply streams `cat remote` over an SSH session -- sufficient for reads
// without implementing the SCP source protocol.
func (n *Node) scpDownload(remote, local string) error {
	n.mu.Lock()
	client := n.client
	n.mu.Unlock()
	if client == nil {
		return fmt.Errorf("SSH not connected")
	}
	session, err := client.NewSession()
	if err != nil {
		return fmt.Errorf("new SSH session: %w", err)
	}
	defer session.Close()
	var content bytes.Buffer
	session.Stdout = &content
	if err := session.Run(fmt.Sprintf("cat %s", remote)); err != nil {
		return fmt.Errorf("read remote %s: %w", remote, err)
	}
	return os.WriteFile(local, content.Bytes(), 0644)
}
// Kill sends SIGKILL to pid via sudo, with its own short timeout.
func (n *Node) Kill(pid int) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	cmd := fmt.Sprintf("kill -9 %d", pid)
	_, _, _, err := n.RunRoot(ctx, cmd)
	return err
}
// HasCommand reports whether cmd resolves to an executable on the node.
// Uses the POSIX `command -v` shell builtin, which works even on minimal
// images that ship without the external `which` utility.
func (n *Node) HasCommand(cmd string) bool {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	_, _, code, err := n.Run(ctx, fmt.Sprintf("command -v %s >/dev/null 2>&1", cmd))
	return err == nil && code == 0
}
// Close tears down the SSH connection; safe to call more than once.
func (n *Node) Close() {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.client == nil {
		return
	}
	n.client.Close()
	n.client = nil
}
// DialTCP opens a TCP connection to addr: dialed directly (5s timeout) in
// local mode, or tunneled through the SSH connection otherwise.
func (n *Node) DialTCP(addr string) (net.Conn, error) {
	if n.IsLocal {
		return net.DialTimeout("tcp", addr, 5*time.Second)
	}
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.client == nil {
		return nil, fmt.Errorf("SSH not connected")
	}
	return n.client.Dial("tcp", addr)
}
// StreamRun executes a command and streams its combined stdout+stderr to w
// as it is produced. Returns the command's error, or ctx.Err() on timeout.
// NOTE(review): as in runSSH, the SIGKILL on cancellation relies on the
// remote sshd honoring session signals.
func (n *Node) StreamRun(ctx context.Context, cmd string, w io.Writer) error {
	if n.IsLocal {
		c := exec.CommandContext(ctx, "wsl", "-e", "bash", "-c", cmd)
		c.Stdout = w
		c.Stderr = w
		return c.Run()
	}
	n.mu.Lock()
	if n.client == nil {
		n.mu.Unlock()
		return fmt.Errorf("SSH not connected")
	}
	session, err := n.client.NewSession()
	n.mu.Unlock()
	if err != nil {
		return err
	}
	defer session.Close()
	session.Stdout = w
	session.Stderr = w
	// Race command completion against context cancellation.
	done := make(chan error, 1)
	go func() { done <- session.Run(cmd) }()
	select {
	case <-ctx.Done():
		_ = session.Signal(ssh.SIGKILL)
		return ctx.Err()
	case err := <-done:
		return err
	}
}
// Helper functions

// toWSLPath maps a Windows path such as C:\foo\bar onto its WSL mount
// point /mnt/c/foo/bar. Paths without a drive prefix pass through unchanged.
func toWSLPath(winPath string) string {
	p := strings.ReplaceAll(winPath, "\\", "/")
	if len(p) < 2 || p[1] != ':' {
		return p
	}
	return "/mnt/" + strings.ToLower(p[:1]) + p[2:]
}
// remoteName returns the final path component (the file name).
func remoteName(path string) string {
	if idx := strings.LastIndex(path, "/"); idx >= 0 {
		return path[idx+1:]
	}
	return path
}
// remoteDir returns everything before the final "/", or "." when the path
// has no directory component.
func remoteDir(path string) string {
	dir := "."
	if i := strings.LastIndex(path, "/"); i >= 0 {
		dir = path[:i]
	}
	return dir
}

168
weed/storage/blockvol/test/perf_test.go

@ -0,0 +1,168 @@
//go:build integration
package test
import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"time"
)
// TestPerf runs the performance suite. Requires remote mode (-env remote)
// and fio on the client node.
func TestPerf(t *testing.T) {
	if *flagEnv != "remote" {
		t.Skip("perf tests require remote mode (-env remote)")
	}
	if !clientNode.HasCommand("fio") {
		t.Skip("fio required")
	}
	subtests := []struct {
		name string
		fn   func(*testing.T)
	}{
		{"GoBench", testPerfGoBench},
		{"FioRandWrite", testPerfFioRandWrite},
		{"FioRandRead", testPerfFioRandRead},
		{"LatencyP99", testPerfLatencyP99},
	}
	for _, st := range subtests {
		t.Run(st.name, st.fn)
	}
}
// testPerfGoBench runs the package's Go micro-benchmarks on the target node
// and archives the output.
func testPerfGoBench(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()
	const benchDir = "/opt/work/seaweedfs/weed/storage/blockvol"
	cmd := fmt.Sprintf("cd %s && go test -run=^$ -bench=. -benchmem -count=1 -timeout=5m ./...", benchDir)
	stdout, stderr, code, err := targetNode.Run(ctx, cmd)
	if err != nil || code != 0 {
		t.Fatalf("go bench: code=%d stderr=%s err=%v", code, stderr, err)
	}
	t.Log(stdout)
	artifacts.CollectPerf(t, "gobench", stdout)
}
// testPerfFioRandWrite measures 4k random-write IOPS (4 jobs, QD32) and
// enforces a 10k IOPS floor.
func testPerfFioRandWrite(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "1G", "")
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	fioCmd := fmt.Sprintf("fio --name=randwrite --filename=%s --rw=randwrite "+
		"--bs=4k --size=500M --direct=1 --ioengine=libaio --iodepth=32 "+
		"--numjobs=4 --runtime=120 --time_based --group_reporting "+
		"--output-format=json", dev)
	stdout, stderr, code, _ := clientNode.RunRoot(ctx, fioCmd)
	if code != 0 {
		t.Fatalf("fio: code=%d stderr=%s", code, stderr)
	}
	iops := extractIOPS(stdout, "write")
	t.Logf("random write IOPS: %.0f", iops)
	if iops < 10000 {
		t.Fatalf("IOPS %.0f below threshold 10000", iops)
	}
	artifacts.CollectPerf(t, "fio-randwrite", stdout)
}
// testPerfFioRandRead pre-fills the device, then measures 4k random-read
// IOPS (4 jobs, QD32) against a 10k floor.
func testPerfFioRandRead(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "1G", "")
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	// Pre-fill so the reads below hit previously-written data.
	clientNode.RunRoot(ctx, fmt.Sprintf(
		"fio --name=prefill --filename=%s --rw=write --bs=1M "+
			"--size=500M --direct=1 --ioengine=libaio", dev))
	readCmd := fmt.Sprintf("fio --name=randread --filename=%s --rw=randread "+
		"--bs=4k --size=500M --direct=1 --ioengine=libaio --iodepth=32 "+
		"--numjobs=4 --runtime=120 --time_based --group_reporting "+
		"--output-format=json", dev)
	stdout, stderr, code, _ := clientNode.RunRoot(ctx, readCmd)
	if code != 0 {
		t.Fatalf("fio: code=%d stderr=%s", code, stderr)
	}
	iops := extractIOPS(stdout, "read")
	t.Logf("random read IOPS: %.0f", iops)
	if iops < 10000 {
		t.Fatalf("IOPS %.0f below threshold 10000", iops)
	}
	artifacts.CollectPerf(t, "fio-randread", stdout)
}
// testPerfLatencyP99 measures single-depth 4k write latency and enforces a
// 10ms P99 ceiling.
func testPerfLatencyP99(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "1G", "")
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	latCmd := fmt.Sprintf("fio --name=latency --filename=%s --rw=randwrite "+
		"--bs=4k --size=500M --direct=1 --ioengine=libaio --iodepth=1 "+
		"--numjobs=1 --runtime=60 --time_based "+
		"--lat_percentiles=1 --output-format=json", dev)
	stdout, stderr, code, _ := clientNode.RunRoot(ctx, latCmd)
	if code != 0 {
		t.Fatalf("fio: code=%d stderr=%s", code, stderr)
	}
	p99ns := extractP99Latency(stdout) // nanoseconds (fio clat_ns)
	p99ms := p99ns / 1_000_000
	t.Logf("P99 latency: %.2f ms", p99ms)
	if p99ms > 10 {
		t.Fatalf("P99 %.2fms exceeds 10ms threshold", p99ms)
	}
	artifacts.CollectPerf(t, "fio-latency", stdout)
}
// extractIOPS pulls the read or write IOPS figure for the first job out of
// fio's JSON output. Returns 0 on malformed JSON or an empty job list.
func extractIOPS(fioJSON string, rw string) float64 {
	type side struct {
		IOPS float64 `json:"iops"`
	}
	var parsed struct {
		Jobs []struct {
			Read  side `json:"read"`
			Write side `json:"write"`
		} `json:"jobs"`
	}
	if json.Unmarshal([]byte(fioJSON), &parsed) != nil || len(parsed.Jobs) == 0 {
		return 0
	}
	job := parsed.Jobs[0]
	if rw == "read" {
		return job.Read.IOPS
	}
	return job.Write.IOPS
}
// extractP99Latency scans fio JSON output for the first "99.000000"
// percentile entry and returns its raw value. With this suite's fio flags
// that entry comes from the clat_ns table, so the value is in NANOSECONDS --
// the caller (testPerfLatencyP99) divides by 1e6 to get milliseconds.
// (The previous comment wrongly said "microseconds".)
func extractP99Latency(fioJSON string) float64 {
	// Look for clat_ns percentile 99.000000
	idx := strings.Index(fioJSON, "99.000000")
	if idx < 0 {
		return 0
	}
	// Find the value after the colon
	sub := fioJSON[idx:]
	colonIdx := strings.Index(sub, ":")
	if colonIdx < 0 {
		return 0
	}
	valStr := strings.TrimSpace(sub[colonIdx+1:])
	// Take until comma or closing bracket
	for i, c := range valStr {
		if c == ',' || c == '}' || c == ']' {
			valStr = valStr[:i]
			break
		}
	}
	var val float64
	fmt.Sscanf(strings.TrimSpace(valStr), "%f", &val)
	return val
}

190
weed/storage/blockvol/test/smoke_test.go

@ -0,0 +1,190 @@
//go:build integration
package test
import (
"context"
"fmt"
"strings"
"testing"
"time"
)
// TestSmoke is the basic end-to-end suite: discovery, raw I/O integrity,
// mkfs round-trips, fio verification, and clean logout.
func TestSmoke(t *testing.T) {
	cases := []struct {
		name string
		fn   func(*testing.T)
	}{
		{"Discovery", testSmokeDiscovery},
		{"DDIntegrity", testSmokeDDIntegrity},
		{"MkfsExt4", testSmokeMkfsExt4},
		{"MkfsXfs", testSmokeMkfsXfs},
		{"FioVerify", testSmokeFioVerify},
		{"LogoutClean", testSmokeLogoutClean},
	}
	for _, tc := range cases {
		t.Run(tc.name, tc.fn)
	}
}
// testSmokeDiscovery verifies the target's IQN appears in SendTargets
// discovery output.
func testSmokeDiscovery(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "50M", "")
	if err := tgt.Start(ctx, true); err != nil {
		t.Fatalf("start: %v", err)
	}
	iqns, err := iscsi.Discover(ctx, host, tgt.config.Port)
	if err != nil {
		t.Fatalf("discover: %v", err)
	}
	for _, iqn := range iqns {
		if iqn == tgt.config.IQN {
			return // found it
		}
	}
	t.Fatalf("IQN %s not found in discovery response: %v", tgt.config.IQN, iqns)
}
// testSmokeDDIntegrity writes known random data to the device and verifies
// the read-back checksum matches the SOURCE data's checksum.
//
// The previous version only compared two device reads against each other,
// which proves read stability but not that the written bytes survived; we
// now stage the data in a file so the expected checksum is known.
func testSmokeDDIntegrity(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "50M", "")
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	src := "/tmp/blockvol-smoke.bin"
	t.Cleanup(func() {
		cleanCtx, cleanCancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cleanCancel()
		clientNode.RunRoot(cleanCtx, fmt.Sprintf("rm -f %s", src))
	})
	// Stage 1MB of random data in a file so the expected checksum is known.
	_, _, code, err := clientNode.RunRoot(ctx,
		fmt.Sprintf("dd if=/dev/urandom of=%s bs=1M count=1 2>/dev/null", src))
	if err != nil || code != 0 {
		t.Fatalf("dd stage: code=%d err=%v", code, err)
	}
	// Write it to the device with O_DIRECT.
	_, _, code, err = clientNode.RunRoot(ctx,
		fmt.Sprintf("dd if=%s of=%s bs=1M count=1 oflag=direct 2>/dev/null", src, dev))
	if err != nil || code != 0 {
		t.Fatalf("dd write: code=%d err=%v", code, err)
	}
	// Compare source checksum vs device read-back checksum (first md5sum
	// field only -- the trailing file name differs between the two).
	want, _, _, _ := clientNode.RunRoot(ctx, fmt.Sprintf("md5sum %s", src))
	got, _, _, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("dd if=%s bs=1M count=1 iflag=direct 2>/dev/null | md5sum", dev))
	w := strings.Fields(firstLine(want))
	g := strings.Fields(firstLine(got))
	if len(w) == 0 || len(g) == 0 || w[0] != g[0] {
		t.Fatalf("checksum mismatch: want %v, got %v", w, g)
	}
	t.Logf("checksums match: %s", w[0])
}
// testSmokeMkfsExt4 formats the device as ext4 on a 100M volume and checks
// that a written file survives a remount.
func testSmokeMkfsExt4(t *testing.T) {
	testSmokeMkfs(t, "ext4", "mkfs.ext4", "100M")
}
// testSmokeMkfsXfs would exercise XFS but is skipped until the target
// implements WRITE SAME(16), which mkfs.xfs issues for inode zeroing
// (tracked as P3-BUG-11). The HasCommand check below is unreachable until
// the t.Skip is removed -- kept so the guard survives the un-skip.
func testSmokeMkfsXfs(t *testing.T) {
	t.Skip("P3-BUG-11: WRITE SAME(16) not implemented, XFS sends it for inode zeroing")
	if !clientNode.HasCommand("mkfs.xfs") {
		t.Skip("mkfs.xfs not available")
	}
	testSmokeMkfs(t, "xfs", "mkfs.xfs", "500M") // XFS needs >= 300MB
}
// testSmokeMkfs formats the iSCSI device with the given filesystem, mounts
// it, writes a marker file, unmounts, remounts, and verifies the file
// survived. volSize must satisfy the filesystem's minimum (XFS >= 300MB).
func testSmokeMkfs(t *testing.T, fstype, mkfsCmd, volSize string) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, volSize, "")
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	mnt := "/tmp/blockvol-mnt"
	t.Cleanup(func() {
		cleanCtx, cleanCancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cleanCancel()
		clientNode.RunRoot(cleanCtx, fmt.Sprintf("umount -f %s 2>/dev/null", mnt))
		clientNode.RunRoot(cleanCtx, fmt.Sprintf("rm -rf %s", mnt))
	})
	// mkfs
	mkfsArgs := " -F" // ext4: force, xfs: force overwrite
	if fstype == "xfs" {
		mkfsArgs = " -f"
	}
	_, stderr, code, err := clientNode.RunRoot(ctx,
		fmt.Sprintf("%s%s %s", mkfsCmd, mkfsArgs, dev))
	if err != nil || code != 0 {
		t.Fatalf("mkfs.%s: code=%d stderr=%s err=%v", fstype, code, stderr, err)
	}
	// Mount, write, unmount
	clientNode.RunRoot(ctx, fmt.Sprintf("mkdir -p %s", mnt))
	_, _, code, _ = clientNode.RunRoot(ctx, fmt.Sprintf("mount %s %s", dev, mnt))
	if code != 0 {
		t.Fatalf("mount failed")
	}
	testContent := "blockvol-integration-test-data"
	// Use bash -c with tee to ensure redirect works under sudo
	clientNode.RunRoot(ctx, fmt.Sprintf("bash -c 'echo %s | tee %s/testfile.txt'", testContent, mnt))
	clientNode.RunRoot(ctx, "sync")
	clientNode.RunRoot(ctx, fmt.Sprintf("umount %s", mnt))
	// Brief pause to let device settle after unmount
	time.Sleep(1 * time.Second)
	// Remount and verify
	mountOpts := ""
	if fstype == "xfs" {
		mountOpts = "-o nouuid" // avoid UUID conflict with stale kernel state
	}
	_, stderr2, code, _ := clientNode.RunRoot(ctx, fmt.Sprintf("mount %s %s %s", mountOpts, dev, mnt))
	if code != 0 {
		t.Fatalf("remount failed: %s", stderr2)
	}
	stdout, _, _, _ := clientNode.RunRoot(ctx, fmt.Sprintf("cat %s/testfile.txt", mnt))
	if !strings.Contains(stdout, testContent) {
		t.Fatalf("file content mismatch: got %q, want %q", stdout, testContent)
	}
	t.Logf("%s: file persists across mount cycles", fstype)
}
// testSmokeFioVerify runs a short fio randrw pass with crc32 data
// verification against the raw device.
func testSmokeFioVerify(t *testing.T) {
	if !clientNode.HasCommand("fio") {
		t.Skip("fio not available")
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "100M", "")
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	stdout, stderr, code, err := clientNode.RunRoot(ctx, fmt.Sprintf(
		"fio --name=verify --filename=%s --rw=randrw --verify=crc32 "+
			"--bs=4k --size=50M --randrepeat=1 --direct=1 --ioengine=libaio "+
			"--runtime=60 --time_based=0 --output-format=json", dev))
	if err != nil || code != 0 {
		t.Fatalf("fio: code=%d stderr=%s err=%v", code, stderr, err)
	}
	hasField := strings.Contains(stdout, "\"verify_errors\"")
	zeroErrors := strings.Contains(stdout, "\"verify_errors\" : 0")
	if hasField && !zeroErrors {
		t.Fatalf("fio verify errors detected")
	}
	t.Log("fio verify passed with 0 errors")
}
// testSmokeLogoutClean logs out and confirms no session for the IQN lingers.
func testSmokeLogoutClean(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "50M", "")
	_ = startAndLogin(t, ctx, tgt, iscsi, host)
	if err := iscsi.Logout(ctx, tgt.config.IQN); err != nil {
		t.Fatalf("logout: %v", err)
	}
	// A clean logout leaves no trace of the IQN in the session list.
	sessions, _, _, _ := clientNode.RunRoot(ctx, "iscsiadm -m session 2>&1")
	if strings.Contains(sessions, tgt.config.IQN) {
		t.Fatalf("stale session found after logout: %s", sessions)
	}
	t.Log("no stale sessions after logout")
}

182
weed/storage/blockvol/test/stress_test.go

@ -0,0 +1,182 @@
//go:build integration
package test
import (
"context"
"fmt"
"strings"
"testing"
"time"
)
// TestStress runs the sustained-load suites; every subtest relies on fio.
func TestStress(t *testing.T) {
	if !clientNode.HasCommand("fio") {
		t.Skip("fio required for stress tests")
	}
	cases := []struct {
		name string
		fn   func(*testing.T)
	}{
		{"Fio5Min", testStressFio5Min},
		{"WALPressure", testStressWALPressure},
		{"SyncBatch", testStressSyncBatch},
		{"TarExtract", testStressTarExtract},
		{"Soak30Min", testStressSoak30Min},
		{"MixedBlockSize", testStressMixedBlockSize},
	}
	for _, tc := range cases {
		t.Run(tc.name, tc.fn)
	}
}
// testStressFio5Min runs a 5-minute (30s with -short) fio randrw pass with
// crc32 verification.
func testStressFio5Min(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "200M", "")
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	seconds := 300
	if testing.Short() {
		seconds = 30
	}
	cmd := fmt.Sprintf("fio --name=stress5m --filename=%s --rw=randrw --verify=crc32 "+
		"--bs=4k --size=100M --randrepeat=1 --direct=1 --ioengine=libaio "+
		"--runtime=%d --time_based --output-format=json", dev, seconds)
	stdout, stderr, code, _ := clientNode.RunRoot(ctx, cmd)
	if code != 0 {
		t.Fatalf("fio: code=%d stderr=%s", code, stderr)
	}
	if strings.Contains(stdout, "\"verify_errors\"") && !strings.Contains(stdout, "\"verify_errors\" : 0") {
		t.Fatal("fio verify errors")
	}
	t.Log("5-minute fio randrw+verify passed")
}
// testStressWALPressure writes far more data (50MB) than the deliberately
// tiny 4MB WAL holds, forcing wrap-around handling.
func testStressWALPressure(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "100M", "4M") // small WAL
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	stdout, stderr, code, _ := clientNode.RunRoot(ctx, fmt.Sprintf(
		"fio --name=walpressure --filename=%s --rw=write --bs=64k "+
			"--size=50M --direct=1 --ioengine=libaio", dev))
	if code != 0 {
		t.Fatalf("fio: code=%d stderr=%s stdout=%s", code, stderr, stdout)
	}
	t.Log("WAL pressure test passed (4MB WAL, 50MB write)")
}
// testStressSyncBatch hammers the target with 16 parallel jobs that
// fdatasync after every write, exercising sync batching.
func testStressSyncBatch(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "100M", "")
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	stdout, stderr, code, _ := clientNode.RunRoot(ctx, fmt.Sprintf(
		"fio --name=syncbatch --filename=%s --rw=randwrite --bs=4k "+
			"--size=50M --fdatasync=1 --numjobs=16 --direct=1 --ioengine=libaio "+
			"--runtime=60 --time_based --group_reporting --output-format=json", dev))
	if code != 0 {
		t.Fatalf("fio: code=%d stderr=%s", code, stderr)
	}
	// Log a short snippet of the reported IOPS for eyeballing.
	if idx := strings.Index(stdout, "\"iops\""); idx >= 0 {
		snippet := stdout[idx:]
		if len(snippet) > 30 {
			snippet = snippet[:30]
		}
		t.Logf("sync batch IOPS: %s...", snippet)
	}
	t.Log("sync batch test passed (16 jobs, fdatasync)")
}
// testStressTarExtract formats and mounts the device, creates 100 random
// files, tars and re-extracts them, and compares per-file md5 sums.
//
// Compound commands are wrapped in `bash -c '...'`: RunRoot prefixes only
// the FIRST command of an `a && b` chain with `sudo -n`, and `sudo cd` fails
// outright (cd is a shell builtin sudo cannot exec) -- so the previous
// version never ran tar/find at all and compared two empty strings.
func testStressTarExtract(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "200M", "")
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	mnt := "/tmp/blockvol-mnt"
	t.Cleanup(func() {
		cleanCtx, cleanCancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cleanCancel()
		clientNode.RunRoot(cleanCtx, fmt.Sprintf("umount -f %s 2>/dev/null", mnt))
	})
	// mkfs + mount
	clientNode.RunRoot(ctx, fmt.Sprintf("mkfs.ext4 -F %s", dev))
	clientNode.RunRoot(ctx, fmt.Sprintf("mkdir -p %s", mnt))
	_, _, code, _ := clientNode.RunRoot(ctx, fmt.Sprintf("mount %s %s", dev, mnt))
	if code != 0 {
		t.Fatalf("mount failed")
	}
	// Populate 100 small files with random content.
	clientNode.RunRoot(ctx, fmt.Sprintf("mkdir -p %s/src", mnt))
	for i := 0; i < 100; i++ {
		clientNode.RunRoot(ctx, fmt.Sprintf("dd if=/dev/urandom of=%s/src/file%d bs=1k count=10 2>/dev/null", mnt, i))
	}
	// Tar and extract (whole pipeline inside one root shell).
	_, _, code, _ = clientNode.RunRoot(ctx, fmt.Sprintf("bash -c 'cd %s && tar cf archive.tar src/'", mnt))
	if code != 0 {
		t.Fatalf("tar create failed")
	}
	_, _, code, _ = clientNode.RunRoot(ctx,
		fmt.Sprintf("bash -c 'mkdir -p %s/dst && cd %s/dst && tar xf %s/archive.tar'", mnt, mnt, mnt))
	if code != 0 {
		t.Fatalf("tar extract failed")
	}
	// Verify per-file checksums match -- and actually produced output.
	sumCmd := `bash -c 'cd %s && find . -type f -exec md5sum {} \; | sort'`
	sum1, _, _, _ := clientNode.RunRoot(ctx, fmt.Sprintf(sumCmd, mnt+"/src"))
	sum2, _, _, _ := clientNode.RunRoot(ctx, fmt.Sprintf(sumCmd, mnt+"/dst/src"))
	if strings.TrimSpace(sum1) == "" || sum1 != sum2 {
		t.Fatalf("tar extract checksums empty or differ")
	}
	t.Log("tar extract + verify passed (100 files)")
}
// testStressSoak30Min runs a 30-minute fio randrw+verify soak. Skipped
// entirely in -short mode, so no reduced-duration branch is needed (the old
// `if testing.Short() { soakTime = 60 }` was unreachable dead code).
func testStressSoak30Min(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping 30-minute soak in short mode")
	}
	ctx, cancel := context.WithTimeout(context.Background(), 35*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "200M", "")
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	const soakTime = 1800 // seconds
	stdout, stderr, code, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("fio --name=soak --filename=%s --rw=randrw --verify=crc32 "+
			"--bs=4k --size=100M --randrepeat=1 --direct=1 --ioengine=libaio "+
			"--runtime=%d --time_based --output-format=json", dev, soakTime))
	if code != 0 {
		t.Fatalf("fio: code=%d stderr=%s", code, stderr)
	}
	if strings.Contains(stdout, "\"verify_errors\"") && !strings.Contains(stdout, "\"verify_errors\" : 0") {
		t.Fatal("fio verify errors during 30-min soak")
	}
	t.Log("30-minute soak passed")
}
// testStressMixedBlockSize runs fio randrw+verify at several block sizes.
// 512-byte I/O is intentionally excluded: it is below the volume's 4096-byte
// logical block size.
func testStressMixedBlockSize(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	tgt, iscsi, host := newTestTarget(t, "200M", "")
	dev := startAndLogin(t, ctx, tgt, iscsi, host)
	sizes := []string{"4k", "64k", "1M"} // 512 below logical block size (4096)
	for _, bs := range sizes {
		t.Logf("testing bs=%s", bs)
		stdout, stderr, code, _ := clientNode.RunRoot(ctx,
			fmt.Sprintf("fio --name=mixed_%s --filename=%s --rw=randrw --verify=crc32 "+
				"--bs=%s --size=20M --randrepeat=1 --direct=1 --ioengine=libaio", bs, dev, bs))
		if code != 0 {
			t.Fatalf("fio bs=%s: code=%d stderr=%s", bs, code, stderr)
		}
		if strings.Contains(stdout, "\"verify_errors\"") && !strings.Contains(stdout, "\"verify_errors\" : 0") {
			t.Fatalf("fio verify errors at bs=%s", bs)
		}
	}
	// Report only the sizes actually exercised (the old message wrongly
	// claimed 512 was covered).
	t.Logf("mixed block size test passed (%s)", strings.Join(sizes, ", "))
}

212
weed/storage/blockvol/test/weed_target.go

@ -0,0 +1,212 @@
//go:build integration
package test
import (
"context"
"fmt"
"os"
"os/exec"
"strconv"
"strings"
"time"
)
// WeedTarget manages the lifecycle of a `weed volume --block.listen` process.
// Unlike Target (standalone iscsi-target binary), this builds and runs the
// full weed binary with block volume support.
type WeedTarget struct {
	node      *Node
	config    TargetConfig
	binPath   string // remote path to weed binary
	pid       int    // daemon PID discovered after Start; 0 when not running
	logFile   string // remote path capturing the daemon's stdout+stderr
	blockDir  string // remote dir containing .blk files
	volFile   string // remote path to the .blk file (set by Start with create=true)
	iqnPrefix string // fixed IQN prefix; full IQN is iqnPrefix + volName()
}
// NewWeedTarget returns a WeedTarget bound to node, using fixed /tmp paths
// for the binary, log, and block directory.
func NewWeedTarget(node *Node, config TargetConfig) *WeedTarget {
	t := &WeedTarget{node: node, config: config}
	t.binPath = "/tmp/weed-test"
	t.logFile = "/tmp/weed-test.log"
	t.blockDir = "/tmp/blockvol-weedtest"
	t.iqnPrefix = "iqn.2024-01.com.seaweedfs:vol."
	return t
}
// Build cross-compiles the weed binary from repoDir for linux/amd64 with
// CGO disabled, writing it to <repoDir>/weed-linux.
func (t *WeedTarget) Build(ctx context.Context, repoDir string) error {
	outPath := repoDir + "/weed-linux"
	cmd := exec.CommandContext(ctx, "go", "build", "-o", outPath, ".")
	cmd.Dir = repoDir + "/weed"
	cmd.Env = append(os.Environ(), "GOOS=linux", "GOARCH=amd64", "CGO_ENABLED=0")
	if out, err := cmd.CombinedOutput(); err != nil {
		return fmt.Errorf("build weed failed: %s\n%w", out, err)
	}
	return nil
}
// Deploy uploads the pre-built weed binary (see Build) to t.binPath on the
// target node; Node.Upload also marks it executable.
func (t *WeedTarget) Deploy(localBin string) error {
	return t.node.Upload(localBin, t.binPath)
}
// Start launches `weed volume --block.listen`. If create is true, the block
// directory and a .blk volume file (sized via truncate) are recreated first;
// with create=false the existing on-disk state is reused (the restart path).
// After launching it waits for the iSCSI port, then discovers the daemon's
// PID from ps output.
func (t *WeedTarget) Start(ctx context.Context, create bool) error {
	// Remove old log
	t.node.Run(ctx, fmt.Sprintf("rm -f %s", t.logFile))
	if create {
		// Create block directory and volume file
		t.node.Run(ctx, fmt.Sprintf("rm -rf %s", t.blockDir))
		t.node.Run(ctx, fmt.Sprintf("mkdir -p %s", t.blockDir))
		// Derive volume name from IQN suffix
		volName := t.volName()
		t.volFile = fmt.Sprintf("%s/%s.blk", t.blockDir, volName)
		// Create the .blk file (truncate to size)
		_, _, code, err := t.node.Run(ctx,
			fmt.Sprintf("truncate -s %s %s", t.config.VolSize, t.volFile))
		if err != nil || code != 0 {
			return fmt.Errorf("create volume file: code=%d err=%v", code, err)
		}
	}
	// Start weed volume with block support; setsid -f detaches it so the
	// daemon outlives this (short-lived) command shell.
	args := fmt.Sprintf("volume -port=19333 -block.listen=:%d -block.dir=%s",
		t.config.Port, t.blockDir)
	cmd := fmt.Sprintf("setsid -f %s %s >%s 2>&1", t.binPath, args, t.logFile)
	_, stderr, code, err := t.node.Run(ctx, cmd)
	if err != nil || code != 0 {
		return fmt.Errorf("start weed volume: code=%d stderr=%s err=%v", code, stderr, err)
	}
	// Wait for iSCSI port
	if err := t.WaitForPort(ctx); err != nil {
		return err
	}
	// Discover PID: setsid -f means the launching shell cannot report it,
	// so grep ps output for the binary path and keep the first match.
	stdout, _, _, _ := t.node.Run(ctx,
		fmt.Sprintf("ps -eo pid,args | grep '%s' | grep -v grep | awk '{print $1}'", t.binPath))
	pidStr := strings.TrimSpace(stdout)
	if idx := strings.IndexByte(pidStr, '\n'); idx > 0 {
		pidStr = pidStr[:idx]
	}
	pid, err := strconv.Atoi(pidStr)
	if err != nil {
		return fmt.Errorf("find weed PID: %q: %w", pidStr, err)
	}
	t.pid = pid
	return nil
}
// Stop sends SIGTERM and waits up to 10s for the process to exit,
// escalating to SIGKILL via Kill9 if it does not.
func (t *WeedTarget) Stop(ctx context.Context) error {
	if t.pid == 0 {
		return nil
	}
	t.node.Run(ctx, fmt.Sprintf("kill %d", t.pid))
	for deadline := time.Now().Add(10 * time.Second); time.Now().Before(deadline); {
		// kill -0 probes existence without signaling; nonzero means gone.
		_, _, code, _ := t.node.Run(ctx, fmt.Sprintf("kill -0 %d 2>/dev/null", t.pid))
		if code != 0 {
			t.pid = 0
			return nil
		}
		time.Sleep(500 * time.Millisecond)
	}
	return t.Kill9()
}
// Kill9 sends SIGKILL immediately and clears the tracked PID.
func (t *WeedTarget) Kill9() error {
	if t.pid == 0 {
		return nil
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	cmd := fmt.Sprintf("kill -9 %d", t.pid)
	t.node.Run(ctx, cmd)
	t.pid = 0
	return nil
}
// Restart bounces the weed volume process while preserving the on-disk
// volume file (Start is called with create=false).
func (t *WeedTarget) Restart(ctx context.Context) error {
	err := t.Stop(ctx)
	if err != nil {
		return fmt.Errorf("restart stop: %w", err)
	}
	return t.Start(ctx, false)
}
// WaitForPort polls the target node until the iSCSI port is listening, or
// the context is cancelled.
//
// The port number is anchored in the grep pattern: a plain substring match
// (the previous behavior) would let a short port false-match a longer one —
// e.g. searching ":80" inside ":8080". Requiring a non-digit or end-of-line
// after the port closes that hole. The sleep between probes also honors
// context cancellation now, instead of blocking unconditionally.
func (t *WeedTarget) WaitForPort(ctx context.Context) error {
	// ss -tln prints local addresses as host:port followed by whitespace,
	// so ':PORT' must be followed by a non-digit (or line end) to be a hit.
	probe := fmt.Sprintf("ss -tln | grep -E ':%d([^0-9]|$)'", t.config.Port)
	for {
		// grep exit code 0 means the listening port was found.
		if _, _, code, _ := t.node.Run(ctx, probe); code == 0 {
			return nil
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf("wait for port %d: %w", t.config.Port, ctx.Err())
		case <-time.After(200 * time.Millisecond):
		}
	}
}
// CollectLog reads the remote log file and returns its contents. A missing
// log yields an empty string, since cat's errors go to /dev/null.
func (t *WeedTarget) CollectLog() (string, error) {
	readCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	contents, _, _, err := t.node.Run(readCtx, fmt.Sprintf("cat %s 2>/dev/null", t.logFile))
	if err != nil {
		return "", err
	}
	return contents, nil
}
// Cleanup removes the block directory, volume files, and log.
// Best-effort: the rm exit status is deliberately ignored so that teardown
// can never fail a test on its own.
func (t *WeedTarget) Cleanup(ctx context.Context) {
	t.node.Run(ctx, fmt.Sprintf("rm -rf %s %s", t.blockDir, t.logFile))
}
// IQN returns the expected IQN for the volume: the configured prefix plus
// the name derived by volName.
func (t *WeedTarget) IQN() string {
	return t.iqnPrefix + t.volName()
}
// volName derives the volume name: the text after the last ':' in the
// configured IQN, or "test" when no IQN is configured (or it has no ':').
func (t *WeedTarget) volName() string {
	if iqn := t.config.IQN; iqn != "" {
		if i := strings.LastIndex(iqn, ":"); i >= 0 {
			return iqn[i+1:]
		}
	}
	return "test"
}
// PID returns the current process ID (0 after Stop/Kill9, i.e. not running).
func (t *WeedTarget) PID() int { return t.pid }
// VolFilePath returns the remote volume file path (.blk under blockDir).
func (t *WeedTarget) VolFilePath() string { return t.volFile }
// LogFile returns the remote log file path used when starting the process.
func (t *WeedTarget) LogFile() string { return t.logFile }

736
weed/storage/blockvol/test/weedvol_test.go

@ -0,0 +1,736 @@
//go:build integration
package test
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"time"
)
// weedBinary is built once in TestWeedVol and reused across subtests.
// It stays empty until TestWeedVol runs; ensureWeedBinaryDeployed skips
// any subtest that runs before it is set.
var weedBinary string
// TestWeedVol is the umbrella integration test: it builds and deploys the
// weed binary exactly once, then drives every weedvol subtest suite
// (smoke, WAL pressure, chaos, filesystem stress) against it.
func TestWeedVol(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancel()
	// Build weed binary once
	repoDir := *flagRepoDir
	t.Log("building weed binary...")
	wt := NewWeedTarget(targetNode, DefaultTargetConfig())
	if err := wt.Build(ctx, repoDir); err != nil {
		t.Fatalf("build weed: %v", err)
	}
	// filepath.Join instead of string concatenation: avoids a doubled
	// separator when repoDir is passed with a trailing "/".
	weedBinary = filepath.Join(repoDir, "weed-linux")
	if err := wt.Deploy(weedBinary); err != nil {
		t.Fatalf("deploy weed: %v", err)
	}
	t.Log("weed binary built and deployed")
	// 3B-1: Smoke
	t.Run("Discovery", testWeedVolDiscovery)
	t.Run("LoginIO", testWeedVolLoginIO)
	t.Run("MkfsExt4", testWeedVolMkfsExt4)
	t.Run("FioVerify", testWeedVolFioVerify)
	t.Run("Heartbeat", testWeedVolHeartbeat)
	t.Run("AttachScript", testWeedVolAttachScript)
	// 3B-2: WAL Pressure
	t.Run("PressureSustained", testWeedVolPressureSustained)
	t.Run("PressureSync", testWeedVolPressureSync)
	t.Run("PressureCrash", testWeedVolPressureCrash)
	t.Run("PressureBatch", testWeedVolPressureBatch)
	// 3B-3: Chaos
	t.Run("MonkeyReconnect", testWeedVolMonkeyReconnect)
	t.Run("MonkeyMultiVol", testWeedVolMonkeyMultiVol)
	t.Run("MonkeyConfigRestart", testWeedVolMonkeyConfigRestart)
	t.Run("MonkeyAttachDetach", testWeedVolMonkeyAttachDetach)
	t.Run("MonkeyWALFull", testWeedVolMonkeyWALFull)
	// 3B-4: Filesystem Stress
	t.Run("FsMkfsExt4Stress", testWeedVolFsMkfsStress)
	t.Run("FsTarExtract", testWeedVolFsTarExtract)
	t.Run("FsLongSoak", testWeedVolFsLongSoak)
	t.Run("FsPostgres", testWeedVolFsPostgres)
	t.Run("FsFsstress", testWeedVolFsFsstress)
}
// newWeedTestTarget builds a WeedTarget whose IQN is derived from the test
// name, registers teardown (iSCSI logout/cleanup, target stop/cleanup) plus
// artifact collection on test end, and returns the target, an iSCSI client,
// and the target host.
func newWeedTestTarget(t *testing.T, volSize string) (*WeedTarget, *ISCSIClient, string) {
	cfg := DefaultTargetConfig()
	// Test names contain '/' separators; flatten and lowercase for the IQN.
	cfg.IQN = "weedvol:" + strings.ToLower(strings.ReplaceAll(t.Name(), "/", "-"))
	if volSize != "" {
		cfg.VolSize = volSize
	}
	target := NewWeedTarget(targetNode, cfg)
	client := NewISCSIClient(clientNode)
	host := targetHost()
	t.Cleanup(func() {
		teardown, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		client.Logout(teardown, target.IQN())
		client.CleanupAll(teardown, target.iqnPrefix)
		target.Stop(teardown)
		target.Cleanup(teardown)
	})
	// Registered second so it runs first (LIFO), before the target is torn down.
	t.Cleanup(func() { artifacts.Collect(t, target) })
	return target, client, host
}
// startAndLoginWeed boots the weed target (creating the volume file), runs
// iSCSI discovery against it, logs in, and returns the local device path.
// Any failure aborts the calling test immediately.
func startAndLoginWeed(t *testing.T, ctx context.Context, wt *WeedTarget, iscsiC *ISCSIClient, host string) string {
	t.Helper()
	if err := wt.Start(ctx, true); err != nil {
		t.Fatalf("start weed: %v", err)
	}
	_, err := iscsiC.Discover(ctx, host, wt.config.Port)
	if err != nil {
		t.Fatalf("discover: %v", err)
	}
	dev, err := iscsiC.Login(ctx, wt.IQN())
	if err != nil {
		t.Fatalf("login: %v", err)
	}
	return dev
}
// ============================================================
// 3B-1: Smoke Tests
// ============================================================
// testWeedVolDiscovery starts the target and checks its IQN is present in
// the iSCSI discovery response.
func testWeedVolDiscovery(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()
	wt, iscsiC, host := newWeedTestTarget(t, "50M")
	if err := wt.Start(ctx, true); err != nil {
		t.Fatalf("start: %v", err)
	}
	iqns, err := iscsiC.Discover(ctx, host, wt.config.Port)
	if err != nil {
		t.Fatalf("discover: %v", err)
	}
	found := false
	for _, candidate := range iqns {
		if candidate == wt.IQN() {
			found = true
			break
		}
	}
	if !found {
		t.Fatalf("IQN %s not found in discovery: %v", wt.IQN(), iqns)
	}
	t.Logf("discovered IQN: %s", wt.IQN())
}
// testWeedVolLoginIO writes 1000 4k blocks (~4MB) of random data with
// O_DIRECT, then reads the same range twice and requires both reads to
// checksum identically (read-stability check).
func testWeedVolLoginIO(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancel()
	wt, iscsiC, host := newWeedTestTarget(t, "50M")
	dev := startAndLoginWeed(t, ctx, wt, iscsiC, host)
	if _, _, rc, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("dd if=/dev/urandom of=%s bs=4k count=1000 oflag=direct 2>/dev/null", dev)); rc != 0 {
		t.Fatalf("dd write failed")
	}
	// Same read command both times; the checksums must agree.
	readBack := fmt.Sprintf("dd if=%s bs=4k count=1000 iflag=direct 2>/dev/null | md5sum", dev)
	first, _, _, _ := clientNode.RunRoot(ctx, readBack)
	second, _, _, _ := clientNode.RunRoot(ctx, readBack)
	if firstLine(first) != firstLine(second) {
		t.Fatalf("checksum mismatch: %s vs %s", firstLine(first), firstLine(second))
	}
	t.Logf("checksums match: %s", firstLine(first))
}
// testWeedVolMkfsExt4 formats the exported device as ext4, writes a file,
// then unmounts and remounts to prove the data persists through the target.
func testWeedVolMkfsExt4(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	wt, iscsiC, host := newWeedTestTarget(t, "100M")
	dev := startAndLoginWeed(t, ctx, wt, iscsiC, host)
	mnt := "/tmp/blockvol-mnt"
	// Force-unmount on exit so a failure mid-test cannot leave the mount
	// point busy for the next test.
	t.Cleanup(func() {
		cctx, cc := context.WithTimeout(context.Background(), 10*time.Second)
		defer cc()
		clientNode.RunRoot(cctx, fmt.Sprintf("umount -f %s 2>/dev/null", mnt))
	})
	// mkfs + mount + write + unmount + remount + verify
	_, stderr, code, _ := clientNode.RunRoot(ctx, fmt.Sprintf("mkfs.ext4 -F %s", dev))
	if code != 0 {
		t.Fatalf("mkfs.ext4 failed: %s", stderr)
	}
	clientNode.RunRoot(ctx, fmt.Sprintf("mkdir -p %s", mnt))
	_, _, code, _ = clientNode.RunRoot(ctx, fmt.Sprintf("mount %s %s", dev, mnt))
	if code != 0 {
		t.Fatalf("mount failed")
	}
	// NOTE(review): tee is used instead of shell redirection — presumably so
	// the file write runs with elevated privileges under RunRoot; confirm.
	clientNode.RunRoot(ctx, fmt.Sprintf("bash -c 'echo weedvol-test-data | tee %s/testfile.txt'", mnt))
	clientNode.RunRoot(ctx, "sync")
	clientNode.RunRoot(ctx, fmt.Sprintf("umount %s", mnt))
	// Brief settle time between unmount and remount.
	time.Sleep(1 * time.Second)
	_, _, code, _ = clientNode.RunRoot(ctx, fmt.Sprintf("mount %s %s", dev, mnt))
	if code != 0 {
		t.Fatalf("remount failed")
	}
	stdout, _, _, _ := clientNode.RunRoot(ctx, fmt.Sprintf("cat %s/testfile.txt", mnt))
	if !strings.Contains(stdout, "weedvol-test-data") {
		t.Fatalf("file content mismatch: %q", stdout)
	}
	t.Log("ext4: file persists across mount cycles via weed volume")
}
// testWeedVolFioVerify runs fio randrw with crc32 data verification against
// the raw device and fails on any reported verify error.
func testWeedVolFioVerify(t *testing.T) {
	if !clientNode.HasCommand("fio") {
		t.Skip("fio required")
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	wt, iscsiC, host := newWeedTestTarget(t, "100M")
	dev := startAndLoginWeed(t, ctx, wt, iscsiC, host)
	report, errOut, rc, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("fio --name=wv-verify --filename=%s --rw=randrw --verify=crc32 "+
			"--bs=4k --size=50M --randrepeat=1 --direct=1 --ioengine=libaio "+
			"--output-format=json", dev))
	if rc != 0 {
		t.Fatalf("fio: code=%d stderr=%s", rc, errOut)
	}
	// fio's JSON reports verify_errors; any value other than 0 is a failure.
	if strings.Contains(report, "\"verify_errors\"") && !strings.Contains(report, "\"verify_errors\" : 0") {
		t.Fatal("fio verify errors")
	}
	t.Log("fio verify passed via weed volume")
}
// testWeedVolHeartbeat is a placeholder for heartbeat verification.
func testWeedVolHeartbeat(t *testing.T) {
	// Heartbeat requires a running weed master, which this harness does not
	// stand up; a full master+volume deployment would be needed. Skipped.
	t.Skip("requires weed master for heartbeat verification")
}
// testWeedVolAttachScript is a placeholder for the attach-script flow.
func testWeedVolAttachScript(t *testing.T) {
	// The attach script resolves volumes through the weed master API, which
	// this harness does not run. Skipped.
	t.Skip("requires weed master for attach script")
}
// ============================================================
// 3B-2: WAL Pressure + Group Commit
// ============================================================
// testWeedVolPressureSustained streams 80M of sequential 64k writes —
// larger than the default WAL — to confirm sustained pressure is absorbed.
func testWeedVolPressureSustained(t *testing.T) {
	if !clientNode.HasCommand("fio") {
		t.Skip("fio required")
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	wt, iscsiC, host := newWeedTestTarget(t, "100M")
	dev := startAndLoginWeed(t, ctx, wt, iscsiC, host)
	_, errOut, rc, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("fio --name=wv-sustained --filename=%s --rw=write --bs=64k "+
			"--size=80M --direct=1 --ioengine=libaio", dev))
	if rc != 0 {
		t.Fatalf("fio: code=%d stderr=%s", rc, errOut)
	}
	t.Log("sustained write pressure passed via weed volume")
}
// testWeedVolPressureSync hammers the device with 16 concurrent fdatasync
// writers for up to a minute (15s under -short) and logs a snippet of the
// reported IOPS from fio's JSON output.
func testWeedVolPressureSync(t *testing.T) {
	if !clientNode.HasCommand("fio") {
		t.Skip("fio required")
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	wt, iscsiC, host := newWeedTestTarget(t, "100M")
	dev := startAndLoginWeed(t, ctx, wt, iscsiC, host)
	secs := 60
	if testing.Short() {
		secs = 15
	}
	report, errOut, rc, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("fio --name=wv-sync --filename=%s --rw=randwrite --bs=4k "+
			"--size=50M --fdatasync=1 --numjobs=16 --direct=1 --ioengine=libaio "+
			"--runtime=%d --time_based --group_reporting --output-format=json", dev, secs))
	if rc != 0 {
		t.Fatalf("fio: code=%d stderr=%s", rc, errOut)
	}
	// Log a short window around the first "iops" key for quick eyeballing.
	if pos := strings.Index(report, "\"iops\""); pos >= 0 {
		stop := pos + 30
		if stop > len(report) {
			stop = len(report)
		}
		t.Logf("sync batch IOPS: %s...", report[pos:stop])
	}
	t.Log("fdatasync pressure passed via weed volume")
}
// testWeedVolPressureCrash verifies that fdatasync'd data survives a
// SIGKILL of the target process followed by a restart on the same volume.
func testWeedVolPressureCrash(t *testing.T) {
	if !clientNode.HasCommand("fio") {
		t.Skip("fio required")
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	wt, iscsiC, host := newWeedTestTarget(t, "100M")
	dev := startAndLoginWeed(t, ctx, wt, iscsiC, host)
	// Write with fdatasync so every block is durable before the kill.
	_, _, code, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("fio --name=wv-crash --filename=%s --rw=write --bs=4k --size=10M "+
			"--fdatasync=1 --direct=1 --ioengine=libaio", dev))
	if code != 0 {
		t.Fatalf("fio write failed")
	}
	// Record checksum of the written range (2560 x 4k = 10M, matching --size).
	sum1, _, _, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("dd if=%s bs=4k count=2560 iflag=direct 2>/dev/null | md5sum", dev))
	// Kill: log out first so the initiator is not left holding a dead session.
	t.Log("killing weed volume...")
	iscsiC.Logout(ctx, wt.IQN())
	iscsiC.CleanupAll(ctx, wt.iqnPrefix)
	wt.Kill9()
	// Restart with the create flag false so the existing volume file is reused.
	t.Log("restarting weed volume...")
	if err := wt.Start(ctx, false); err != nil {
		t.Fatalf("restart: %v", err)
	}
	iscsiC.Discover(ctx, host, wt.config.Port)
	dev, err := iscsiC.Login(ctx, wt.IQN())
	if err != nil {
		t.Fatalf("re-login: %v", err)
	}
	// The same range must checksum identically after crash + recovery.
	sum2, _, _, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("dd if=%s bs=4k count=2560 iflag=direct 2>/dev/null | md5sum", dev))
	if firstLine(sum1) != firstLine(sum2) {
		t.Fatalf("synced data corrupted: %s vs %s", firstLine(sum1), firstLine(sum2))
	}
	t.Log("crash recovery: synced data intact via weed volume")
}
// testWeedVolPressureBatch floods the target with 32 concurrent fdatasync
// writers, which should exercise group-commit batching of flushes.
func testWeedVolPressureBatch(t *testing.T) {
	if !clientNode.HasCommand("fio") {
		t.Skip("fio required")
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	wt, iscsiC, host := newWeedTestTarget(t, "100M")
	dev := startAndLoginWeed(t, ctx, wt, iscsiC, host)
	secs := 30
	if testing.Short() {
		secs = 10
	}
	_, errOut, rc, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("fio --name=wv-batch --filename=%s --rw=randwrite --bs=4k "+
			"--size=50M --fdatasync=1 --numjobs=32 --direct=1 --ioengine=libaio "+
			"--runtime=%d --time_based --group_reporting", dev, secs))
	if rc != 0 {
		t.Fatalf("fio: code=%d stderr=%s", rc, errOut)
	}
	t.Log("group commit batch pressure passed via weed volume")
}
// ============================================================
// 3B-3: Chaos Monkey
// ============================================================
// testWeedVolMonkeyReconnect loops discover/login/write/logout to exercise
// repeated iSCSI session setup and teardown against a live target.
func testWeedVolMonkeyReconnect(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
	defer cancel()
	wt, iscsiC, host := newWeedTestTarget(t, "100M")
	if err := wt.Start(ctx, true); err != nil {
		t.Fatalf("start: %v", err)
	}
	iterations := 10
	if testing.Short() {
		iterations = 3
	}
	for i := 0; i < iterations; i++ {
		t.Logf("reconnect %d/%d", i+1, iterations)
		iscsiC.Discover(ctx, host, wt.config.Port)
		dev, err := iscsiC.Login(ctx, wt.IQN())
		if err != nil {
			t.Fatalf("iter %d login: %v", i, err)
		}
		if _, _, rc, _ := clientNode.RunRoot(ctx,
			fmt.Sprintf("dd if=/dev/urandom of=%s bs=1M count=1 oflag=direct 2>/dev/null", dev)); rc != 0 {
			t.Fatalf("iter %d dd write failed", i)
		}
		if err := iscsiC.Logout(ctx, wt.IQN()); err != nil {
			t.Fatalf("iter %d logout: %v", i, err)
		}
		time.Sleep(200 * time.Millisecond)
	}
	t.Logf("%dx reconnect completed via weed volume", iterations)
}
// testWeedVolMonkeyMultiVol places two .blk files in the block directory,
// verifies both IQNs are discoverable, logs into each, and checks they hold
// independent data. Bypasses newWeedTestTarget because it needs a custom
// block-directory layout rather than a single auto-created volume.
func testWeedVolMonkeyMultiVol(t *testing.T) {
	// Multi-volume: create 2 .blk files in block dir, verify both discoverable
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	wt := NewWeedTarget(targetNode, DefaultTargetConfig())
	iscsiC := NewISCSIClient(clientNode)
	host := targetHost()
	t.Cleanup(func() {
		cctx, cc := context.WithTimeout(context.Background(), 30*time.Second)
		defer cc()
		iscsiC.CleanupAll(cctx, wt.iqnPrefix)
		wt.Stop(cctx)
		wt.Cleanup(cctx)
	})
	t.Cleanup(func() { artifacts.Collect(t, wt) })
	// Create a fresh block dir with two sparse 50M volume files.
	wt.node.Run(ctx, fmt.Sprintf("rm -rf %s && mkdir -p %s", wt.blockDir, wt.blockDir))
	wt.node.Run(ctx, fmt.Sprintf("truncate -s 50M %s/vol1.blk", wt.blockDir))
	wt.node.Run(ctx, fmt.Sprintf("truncate -s 50M %s/vol2.blk", wt.blockDir))
	// createVol=false: the files above are the volumes to serve.
	if err := wt.Start(ctx, false); err != nil {
		t.Fatalf("start: %v", err)
	}
	iqns, err := iscsiC.Discover(ctx, host, wt.config.Port)
	if err != nil {
		t.Fatalf("discover: %v", err)
	}
	iqn1 := wt.iqnPrefix + "vol1"
	iqn2 := wt.iqnPrefix + "vol2"
	found1, found2 := false, false
	for _, iqn := range iqns {
		if iqn == iqn1 {
			found1 = true
		}
		if iqn == iqn2 {
			found2 = true
		}
	}
	if !found1 || !found2 {
		t.Fatalf("expected both %s and %s in discovery, got: %v", iqn1, iqn2, iqns)
	}
	// Login to both and do I/O
	dev1, err := iscsiC.Login(ctx, iqn1)
	if err != nil {
		t.Fatalf("login vol1: %v", err)
	}
	dev2, err := iscsiC.Login(ctx, iqn2)
	if err != nil {
		t.Fatalf("login vol2: %v", err)
	}
	// Write different random data to each device.
	clientNode.RunRoot(ctx, fmt.Sprintf("dd if=/dev/urandom of=%s bs=4k count=100 oflag=direct 2>/dev/null", dev1))
	clientNode.RunRoot(ctx, fmt.Sprintf("dd if=/dev/urandom of=%s bs=4k count=100 oflag=direct 2>/dev/null", dev2))
	sum1, _, _, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("dd if=%s bs=4k count=100 iflag=direct 2>/dev/null | md5sum", dev1))
	sum2, _, _, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("dd if=%s bs=4k count=100 iflag=direct 2>/dev/null | md5sum", dev2))
	// Independent urandom writes: matching checksums would mean the two IQNs
	// are backed by the same storage.
	if firstLine(sum1) == firstLine(sum2) {
		t.Fatalf("volumes should have different data")
	}
	iscsiC.Logout(ctx, iqn1)
	iscsiC.Logout(ctx, iqn2)
	t.Logf("2 volumes served independently: %s %s", dev1, dev2)
}
// testWeedVolMonkeyConfigRestart runs fio, performs a clean logout + target
// restart cycle, re-logs-in, and runs fio again to confirm the target keeps
// serving correctly across a graceful restart.
func testWeedVolMonkeyConfigRestart(t *testing.T) {
	if !clientNode.HasCommand("fio") {
		t.Skip("fio required")
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	wt, iscsiC, host := newWeedTestTarget(t, "100M")
	dev := startAndLoginWeed(t, ctx, wt, iscsiC, host)
	// fio phase 1
	_, _, code, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("fio --name=wv-cfg1 --filename=%s --rw=randrw --bs=4k "+
			"--size=10M --direct=1 --ioengine=libaio --randrepeat=1", dev))
	if code != 0 {
		t.Fatalf("fio phase 1 failed")
	}
	// Logout before stopping so the initiator sees a clean teardown, then
	// restart without re-creating the volume file.
	iscsiC.Logout(ctx, wt.IQN())
	iscsiC.CleanupAll(ctx, wt.iqnPrefix)
	wt.Stop(ctx)
	if err := wt.Start(ctx, false); err != nil {
		t.Fatalf("restart: %v", err)
	}
	iscsiC.Discover(ctx, host, wt.config.Port)
	dev, err := iscsiC.Login(ctx, wt.IQN())
	if err != nil {
		t.Fatalf("re-login: %v", err)
	}
	// fio phase 2 (same workload against the re-attached device)
	_, _, code, _ = clientNode.RunRoot(ctx,
		fmt.Sprintf("fio --name=wv-cfg2 --filename=%s --rw=randrw --bs=4k "+
			"--size=10M --direct=1 --ioengine=libaio --randrepeat=1", dev))
	if code != 0 {
		t.Fatalf("fio phase 2 failed")
	}
	t.Log("config restart passed via weed volume")
}
// testWeedVolMonkeyAttachDetach repeats login + fio-with-verify + logout to
// confirm data-integrity checks hold across session churn.
func testWeedVolMonkeyAttachDetach(t *testing.T) {
	if !clientNode.HasCommand("fio") {
		t.Skip("fio required")
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()
	wt, iscsiC, host := newWeedTestTarget(t, "100M")
	if err := wt.Start(ctx, true); err != nil {
		t.Fatalf("start: %v", err)
	}
	iterations := 5
	if testing.Short() {
		iterations = 3
	}
	for i := 0; i < iterations; i++ {
		t.Logf("attach/detach %d/%d", i+1, iterations)
		iscsiC.Discover(ctx, host, wt.config.Port)
		dev, err := iscsiC.Login(ctx, wt.IQN())
		if err != nil {
			t.Fatalf("iter %d login: %v", i, err)
		}
		if _, _, rc, _ := clientNode.RunRoot(ctx,
			fmt.Sprintf("fio --name=wv-ad%d --filename=%s --rw=randrw --verify=crc32 "+
				"--bs=4k --size=10M --direct=1 --ioengine=libaio --randrepeat=1", i, dev)); rc != 0 {
			t.Fatalf("iter %d fio failed", i)
		}
		if err := iscsiC.Logout(ctx, wt.IQN()); err != nil {
			t.Fatalf("iter %d logout: %v", i, err)
		}
		time.Sleep(200 * time.Millisecond)
	}
	t.Logf("%dx attach/detach completed via weed volume", iterations)
}
// testWeedVolMonkeyWALFull writes most of a deliberately small (50M) volume
// in one sequential pass to push the WAL toward full.
func testWeedVolMonkeyWALFull(t *testing.T) {
	if !clientNode.HasCommand("fio") {
		t.Skip("fio required")
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	wt, iscsiC, host := newWeedTestTarget(t, "50M")
	dev := startAndLoginWeed(t, ctx, wt, iscsiC, host)
	_, errOut, rc, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("fio --name=wv-walfull --filename=%s --rw=write --bs=64k "+
			"--size=40M --direct=1 --ioengine=libaio", dev))
	if rc != 0 {
		t.Fatalf("fio: code=%d stderr=%s", rc, errOut)
	}
	t.Log("WAL full pressure passed via weed volume")
}
// ============================================================
// 3B-4: Filesystem Stress
// ============================================================
// testWeedVolFsMkfsStress formats the device as ext4, creates 200 small
// files, and verifies the directory entry count survives an unmount/remount
// cycle.
func testWeedVolFsMkfsStress(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	wt, iscsiC, host := newWeedTestTarget(t, "100M")
	dev := startAndLoginWeed(t, ctx, wt, iscsiC, host)
	mnt := "/tmp/blockvol-mnt"
	t.Cleanup(func() {
		cctx, cc := context.WithTimeout(context.Background(), 10*time.Second)
		defer cc()
		clientNode.RunRoot(cctx, fmt.Sprintf("umount -f %s 2>/dev/null", mnt))
	})
	// mkfs + mount + create many files + verify.
	// Check the mkfs exit code (consistent with testWeedVolMkfsExt4); the
	// code previously ignored it, so a failed format surfaced only as a
	// confusing mount failure.
	_, stderr, code, _ := clientNode.RunRoot(ctx, fmt.Sprintf("mkfs.ext4 -F %s", dev))
	if code != 0 {
		t.Fatalf("mkfs.ext4 failed: %s", stderr)
	}
	clientNode.RunRoot(ctx, fmt.Sprintf("mkdir -p %s", mnt))
	_, _, code, _ = clientNode.RunRoot(ctx, fmt.Sprintf("mount %s %s", dev, mnt))
	if code != 0 {
		t.Fatalf("mount failed")
	}
	// Create 200 files (5k of random data each).
	for i := 0; i < 200; i++ {
		clientNode.RunRoot(ctx,
			fmt.Sprintf("dd if=/dev/urandom of=%s/file%d bs=1k count=5 2>/dev/null", mnt, i))
	}
	clientNode.RunRoot(ctx, "sync")
	// Count directory entries before the remount.
	stdout, _, _, _ := clientNode.RunRoot(ctx, fmt.Sprintf("ls %s | wc -l", mnt))
	count := strings.TrimSpace(stdout)
	t.Logf("created %s files on ext4 via weed volume", count)
	// Unmount + remount + verify the count is unchanged.
	clientNode.RunRoot(ctx, fmt.Sprintf("umount %s", mnt))
	time.Sleep(1 * time.Second)
	_, _, code, _ = clientNode.RunRoot(ctx, fmt.Sprintf("mount %s %s", dev, mnt))
	if code != 0 {
		t.Fatalf("remount failed")
	}
	stdout2, _, _, _ := clientNode.RunRoot(ctx, fmt.Sprintf("ls %s | wc -l", mnt))
	if strings.TrimSpace(stdout2) != count {
		t.Fatalf("file count mismatch after remount: %s vs %s", count, strings.TrimSpace(stdout2))
	}
	t.Log("ext4 stress: 200 files persist via weed volume")
}
// testWeedVolFsTarExtract creates 100 random files on an ext4-formatted
// device, archives them with tar, extracts into a second directory, and
// compares the sorted per-file md5 listings of source and destination.
func testWeedVolFsTarExtract(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()
	wt, iscsiC, host := newWeedTestTarget(t, "200M")
	dev := startAndLoginWeed(t, ctx, wt, iscsiC, host)
	mnt := "/tmp/blockvol-mnt"
	t.Cleanup(func() {
		cctx, cc := context.WithTimeout(context.Background(), 10*time.Second)
		defer cc()
		clientNode.RunRoot(cctx, fmt.Sprintf("umount -f %s 2>/dev/null", mnt))
	})
	// Fail fast on format errors (previously unchecked) instead of letting
	// them surface as an unrelated mount failure.
	_, stderr, code, _ := clientNode.RunRoot(ctx, fmt.Sprintf("mkfs.ext4 -F %s", dev))
	if code != 0 {
		t.Fatalf("mkfs.ext4 failed: %s", stderr)
	}
	clientNode.RunRoot(ctx, fmt.Sprintf("mkdir -p %s", mnt))
	_, _, code, _ = clientNode.RunRoot(ctx, fmt.Sprintf("mount %s %s", dev, mnt))
	if code != 0 {
		t.Fatalf("mount failed")
	}
	// Create source files, tar, extract, verify checksums
	clientNode.RunRoot(ctx, fmt.Sprintf("mkdir -p %s/src", mnt))
	for i := 0; i < 100; i++ {
		clientNode.RunRoot(ctx,
			fmt.Sprintf("dd if=/dev/urandom of=%s/src/file%d bs=1k count=10 2>/dev/null", mnt, i))
	}
	// Check the tar exit codes: previously they were dropped, so a failed
	// create+extract pair could compare two empty listings as "equal".
	_, _, code, _ = clientNode.RunRoot(ctx, fmt.Sprintf("cd %s && tar cf archive.tar src/", mnt))
	if code != 0 {
		t.Fatalf("tar create failed")
	}
	_, _, code, _ = clientNode.RunRoot(ctx, fmt.Sprintf("mkdir -p %s/dst && cd %s/dst && tar xf %s/archive.tar", mnt, mnt, mnt))
	if code != 0 {
		t.Fatalf("tar extract failed")
	}
	sum1, _, _, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("cd %s/src && find . -type f -exec md5sum {} \\; | sort", mnt))
	sum2, _, _, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("cd %s/dst/src && find . -type f -exec md5sum {} \\; | sort", mnt))
	// Guard against the degenerate empty-vs-empty match.
	if strings.TrimSpace(sum1) == "" {
		t.Fatalf("no source checksums produced")
	}
	if sum1 != sum2 {
		t.Fatalf("tar extract checksums differ")
	}
	t.Log("tar extract + verify passed via weed volume")
}
// testWeedVolFsLongSoak runs a 30-minute fio randrw soak with crc32
// verification; skipped in -short mode and when fio is unavailable.
func testWeedVolFsLongSoak(t *testing.T) {
	if !clientNode.HasCommand("fio") {
		t.Skip("fio required")
	}
	if testing.Short() {
		t.Skip("skipping long soak in short mode")
	}
	ctx, cancel := context.WithTimeout(context.Background(), 35*time.Minute)
	defer cancel()
	wt, iscsiC, host := newWeedTestTarget(t, "200M")
	dev := startAndLoginWeed(t, ctx, wt, iscsiC, host)
	report, errOut, rc, _ := clientNode.RunRoot(ctx,
		fmt.Sprintf("fio --name=wv-soak --filename=%s --rw=randrw --verify=crc32 "+
			"--bs=4k --size=100M --randrepeat=1 --direct=1 --ioengine=libaio "+
			"--runtime=1800 --time_based --output-format=json", dev))
	if rc != 0 {
		t.Fatalf("fio: code=%d stderr=%s", rc, errOut)
	}
	// Any non-zero verify_errors count in the JSON report is a failure.
	if strings.Contains(report, "\"verify_errors\"") && !strings.Contains(report, "\"verify_errors\" : 0") {
		t.Fatal("fio verify errors during soak")
	}
	t.Log("30-minute soak passed via weed volume")
}
// testWeedVolFsPostgres is a placeholder for a database-on-blockvol run.
func testWeedVolFsPostgres(t *testing.T) {
	if !clientNode.HasCommand("pg_isready") {
		t.Skip("postgresql not available")
	}
	// Even with postgres tooling present, this is unconditionally skipped:
	// a full database-on-blockvol run needs dedicated setup.
	t.Skip("postgres integration requires dedicated setup")
}
// testWeedVolFsFsstress is a placeholder for the xfstests fsstress workload.
func testWeedVolFsFsstress(t *testing.T) {
	if !clientNode.HasCommand("fsstress") {
		t.Skip("fsstress not available (xfstests)")
	}
	// Blocked on XFS support, tracked as P3-BUG-11; skipped until resolved.
	t.Skip("fsstress requires XFS support (P3-BUG-11)")
}
// ensureWeedBinaryDeployed skips the calling test unless TestWeedVol has
// already built and deployed the weed binary. Subtests depend on that setup
// and therefore cannot run standalone.
func ensureWeedBinaryDeployed(t *testing.T) {
	t.Helper()
	if weedBinary == "" {
		t.Skip("weed binary not built -- run TestWeedVol parent test")
	}
	// The variable may be set but the file gone (e.g. cleaned workspace).
	if _, err := os.Stat(weedBinary); err != nil {
		abs, _ := filepath.Abs(weedBinary)
		t.Skipf("weed binary not found at %s", abs)
	}
}
Loading…
Cancel
Save