Browse Source

feat: Phase 5 CP5-1 -- ALUA + multipath failover, 28 tests

Add ALUA (Asymmetric Logical Unit Access) support to the iSCSI target,
enabling dm-multipath on Linux to automatically detect path state changes
and reroute I/O during HA failover without initiator-side intervention.

- ALUAProvider interface with implicit ALUA (TPGS=0x01)
- INQUIRY byte 5 TPGS bits, VPD 0x83 with NAA+TPG+RTP descriptors
- REPORT TARGET PORT GROUPS handler (MAINTENANCE IN SA=0x0A)
- MAINTENANCE OUT rejection (implicit-only, no SET TPG)
- Standby write rejection (NOT_READY ASC=04h ASCQ=0Bh)
- RoleNone maps to Active/Optimized (standalone single-node compatibility)
- NAA-6 device identifier derived from volume UUID
- -tpg-id flag with [1,65535] validation
- dm-multipath config + setup script (group_by_tpg, ALUA prio)
- 12 unit tests + 16 QA adversarial tests + 4 integration tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feature/sw-block
Ping Qiu 1 week ago
parent
commit
98d0e9e631
  1. 2
      weed/storage/blockvol/blockvol.go
  2. 83
      weed/storage/blockvol/iscsi/alua.go
  3. 91
      weed/storage/blockvol/iscsi/cmd/iscsi-target/main.go
  4. 36
      weed/storage/blockvol/iscsi/cmd/iscsi-target/sw-block-multipath-setup.sh
  5. 13
      weed/storage/blockvol/iscsi/cmd/iscsi-target/sw-block-multipath.conf
  6. 727
      weed/storage/blockvol/iscsi/qa_alua_test.go
  7. 122
      weed/storage/blockvol/iscsi/scsi.go
  8. 424
      weed/storage/blockvol/iscsi/scsi_test.go
  9. 4
      weed/storage/blockvol/test/ha_target.go
  10. 455
      weed/storage/blockvol/test/ha_test.go

2
weed/storage/blockvol/blockvol.go

@ -498,6 +498,7 @@ func (v *BlockVol) Info() VolumeInfo {
BlockSize: v.super.BlockSize,
ExtentSize: v.super.ExtentSize,
WALSize: v.super.WALSize,
UUID: v.super.UUID,
Healthy: v.healthy.Load(),
}
}
@ -508,6 +509,7 @@ type VolumeInfo struct {
BlockSize uint32
ExtentSize uint32
WALSize uint64
UUID [16]byte
Healthy bool
}

83
weed/storage/blockvol/iscsi/alua.go

@ -0,0 +1,83 @@
package iscsi
import "encoding/binary"
// ALUA asymmetric access state constants (SPC-5).
const (
ALUAActiveOptimized uint8 = 0x00
ALUAActiveNonOpt uint8 = 0x01
ALUAStandby uint8 = 0x02
ALUAUnavailable uint8 = 0x0E
ALUATransitioning uint8 = 0x0F
)
// ALUAProvider is optionally implemented by BlockDevice to expose ALUA state.
// SCSIHandler probes for this via type assertion.
type ALUAProvider interface {
ALUAState() uint8 // Current asymmetric access state
TPGroupID() uint16 // Target Port Group ID (1-65535)
DeviceNAA() [8]byte // NAA-6 device identifier (from volume UUID)
}
// checkStandbyReject checks if the device is in a non-writable ALUA state
// (Standby, Unavailable, or Transitioning). Returns a SCSIResult pointer
// with NOT READY sense if writes should be rejected, nil otherwise.
func (h *SCSIHandler) checkStandbyReject() *SCSIResult {
alua, ok := h.dev.(ALUAProvider)
if !ok {
return nil
}
state := alua.ALUAState()
if state == ALUAStandby || state == ALUAUnavailable || state == ALUATransitioning {
r := SCSIResult{
Status: SCSIStatusCheckCond,
SenseKey: SenseNotReady,
SenseASC: 0x04, // LOGICAL UNIT NOT READY
SenseASCQ: 0x0B, // TARGET PORT IN STANDBY STATE
}
return &r
}
return nil
}
// reportTargetPortGroups handles REPORT TARGET PORT GROUPS (service action 0x0A
// under MAINTENANCE IN 0xA3). Returns a single TPG descriptor for the local
// target port group. dm-multipath queries each path separately and merges.
func (h *SCSIHandler) reportTargetPortGroups(cdb [16]byte) SCSIResult {
alua, ok := h.dev.(ALUAProvider)
if !ok {
return illegalRequest(ASCInvalidOpcode, ASCQLuk)
}
allocLen := binary.BigEndian.Uint32(cdb[6:10])
state := alua.ALUAState()
tpgID := alua.TPGroupID()
// Response: 4-byte header + 8-byte TPG descriptor + 4-byte target port descriptor = 16 bytes.
data := make([]byte, 16)
// Header: return data length (excludes the 4-byte header itself).
binary.BigEndian.PutUint32(data[0:4], 12)
// TPG descriptor (8 bytes at offset 4).
data[4] = state & 0x0F // Byte 0: asymmetric access state (lower 4 bits)
// Byte 1: supported states flags.
// T_SUP=1, O_SUP=1, S_SUP=1, U_SUP=1 (we use Transitioning for Draining/Rebuilding).
data[5] = 0x0F // bits: 0000_1111 = T_SUP | O_SUP | S_SUP | U_SUP
data[6] = 0x00 // reserved
data[7] = 0x00 // status code: 0 = no status available
binary.BigEndian.PutUint16(data[8:10], tpgID)
data[10] = 0x00 // reserved
data[11] = 0x01 // target port count = 1
// Target port descriptor (4 bytes at offset 12).
data[12] = 0x00 // reserved
data[13] = 0x00 // reserved
binary.BigEndian.PutUint16(data[14:16], 1) // relative target port identifier = 1
if allocLen > 0 && allocLen < uint32(len(data)) {
data = data[:allocLen]
}
return SCSIResult{Status: SCSIStatusGood, Data: data}
}

91
weed/storage/blockvol/iscsi/cmd/iscsi-target/main.go

@ -28,6 +28,7 @@ func main() {
iqn := flag.String("iqn", "iqn.2024.com.seaweedfs:vol1", "target IQN")
create := flag.Bool("create", false, "create a new volume file")
size := flag.String("size", "1G", "volume size (e.g., 1G, 100M) -- used with -create")
tpgID := flag.Int("tpg-id", 1, "target port group ID for ALUA (1-65535)")
adminAddr := flag.String("admin", "", "HTTP admin listen address (e.g. 127.0.0.1:8080; empty = disabled)")
adminToken := flag.String("admin-token", "", "optional admin auth token (empty = no auth)")
replicaData := flag.String("replica-data", "", "replica receiver data listen address (e.g. :9001; empty = disabled)")
@ -40,6 +41,9 @@ func main() {
flag.Usage()
os.Exit(1)
}
if *tpgID < 1 || *tpgID > 65535 {
log.Fatalf("invalid -tpg-id %d: must be 1-65535", *tpgID)
}
logger := log.New(os.Stdout, "[iscsi] ", log.LstdFlags)
@ -51,15 +55,24 @@ func main() {
if parseErr != nil {
log.Fatalf("invalid size %q: %v", *size, parseErr)
}
vol, err = blockvol.CreateBlockVol(*volPath, blockvol.CreateOptions{
VolumeSize: volSize,
BlockSize: 4096,
WALSize: 64 * 1024 * 1024,
})
if err != nil {
log.Fatalf("create volume: %v", err)
if _, statErr := os.Stat(*volPath); statErr == nil {
// File exists -- open it instead of failing
vol, err = blockvol.OpenBlockVol(*volPath)
if err != nil {
log.Fatalf("open existing volume: %v", err)
}
logger.Printf("opened existing volume: %s", *volPath)
} else {
vol, err = blockvol.CreateBlockVol(*volPath, blockvol.CreateOptions{
VolumeSize: volSize,
BlockSize: 4096,
WALSize: 64 * 1024 * 1024,
})
if err != nil {
log.Fatalf("create volume: %v", err)
}
logger.Printf("created volume: %s (%s)", *volPath, *size)
}
logger.Printf("created volume: %s (%s)", *volPath, *size)
} else {
vol, err = blockvol.OpenBlockVol(*volPath)
if err != nil {
@ -100,9 +113,9 @@ func main() {
logger.Printf("admin server: %s", ln.Addr())
}
// Create adapter with latency instrumentation
// Create adapter with ALUA support and latency instrumentation
adapter := &instrumentedAdapter{
inner: &blockVolAdapter{vol: vol},
inner: &blockVolAdapter{vol: vol, tpgID: uint16(*tpgID)},
logger: logger,
}
@ -143,9 +156,10 @@ func main() {
logger.Println("target stopped")
}
// blockVolAdapter wraps BlockVol to implement iscsi.BlockDevice.
// blockVolAdapter wraps BlockVol to implement iscsi.BlockDevice and iscsi.ALUAProvider.
type blockVolAdapter struct {
vol *blockvol.BlockVol
vol *blockvol.BlockVol
tpgID uint16
}
func (a *blockVolAdapter) ReadAt(lba uint64, length uint32) ([]byte, error) {
@ -164,6 +178,39 @@ func (a *blockVolAdapter) BlockSize() uint32 { return a.vol.Info().BlockSize }
func (a *blockVolAdapter) VolumeSize() uint64 { return a.vol.Info().VolumeSize }
func (a *blockVolAdapter) IsHealthy() bool { return a.vol.Info().Healthy }
// ALUAProvider implementation.
func (a *blockVolAdapter) ALUAState() uint8 { return roleToALUA(a.vol.Role()) }
func (a *blockVolAdapter) TPGroupID() uint16 { return a.tpgID }
func (a *blockVolAdapter) DeviceNAA() [8]byte { return uuidToNAA(a.vol.Info().UUID) }
// roleToALUA maps a BlockVol Role to an ALUA asymmetric access state.
// RoleNone maps to Active/Optimized so standalone single-node targets
// (no assignment from master) can accept writes.
func roleToALUA(r blockvol.Role) uint8 {
switch r {
case blockvol.RolePrimary, blockvol.RoleNone:
return iscsi.ALUAActiveOptimized
case blockvol.RoleReplica:
return iscsi.ALUAStandby
case blockvol.RoleStale:
return iscsi.ALUAUnavailable
case blockvol.RoleRebuilding, blockvol.RoleDraining:
return iscsi.ALUATransitioning
default:
return iscsi.ALUAStandby
}
}
// uuidToNAA converts a 16-byte UUID to an 8-byte NAA-6 identifier.
// NAA-6 format: nibble 6 (NAA=6) followed by 60 bits from the UUID.
func uuidToNAA(uuid [16]byte) [8]byte {
var naa [8]byte
// Set NAA=6 in the high nibble of the first byte.
naa[0] = 0x60 | (uuid[0] & 0x0F)
copy(naa[1:], uuid[1:8])
return naa
}
// instrumentedAdapter wraps a BlockDevice and logs latency stats periodically.
type instrumentedAdapter struct {
inner iscsi.BlockDevice
@ -223,6 +270,26 @@ func (a *instrumentedAdapter) BlockSize() uint32 { return a.inner.BlockSize() }
func (a *instrumentedAdapter) VolumeSize() uint64 { return a.inner.VolumeSize() }
func (a *instrumentedAdapter) IsHealthy() bool { return a.inner.IsHealthy() }
// ALUAProvider proxy: delegate to inner device if it implements ALUAProvider.
func (a *instrumentedAdapter) ALUAState() uint8 {
if p, ok := a.inner.(iscsi.ALUAProvider); ok {
return p.ALUAState()
}
return iscsi.ALUAStandby
}
func (a *instrumentedAdapter) TPGroupID() uint16 {
if p, ok := a.inner.(iscsi.ALUAProvider); ok {
return p.TPGroupID()
}
return 1
}
func (a *instrumentedAdapter) DeviceNAA() [8]byte {
if p, ok := a.inner.(iscsi.ALUAProvider); ok {
return p.DeviceNAA()
}
return [8]byte{}
}
// StartStatsLogger runs a goroutine that logs performance stats every interval.
func (a *instrumentedAdapter) StartStatsLogger(interval time.Duration) {
go func() {

36
weed/storage/blockvol/iscsi/cmd/iscsi-target/sw-block-multipath-setup.sh

@ -0,0 +1,36 @@
#!/usr/bin/env bash
# sw-block-multipath-setup.sh -- install SeaweedFS BlockVol multipath config.
# Usage: sudo ./sw-block-multipath-setup.sh
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
CONF_SRC="${SCRIPT_DIR}/sw-block-multipath.conf"
CONF_DIR="/etc/multipath/conf.d"
CONF_DST="${CONF_DIR}/sw-block.conf"
if [ ! -f "$CONF_SRC" ]; then
echo "ERROR: $CONF_SRC not found" >&2
exit 1
fi
# Ensure required packages are installed
for pkg in multipath-tools sg3-utils; do
if ! dpkg -s "$pkg" &>/dev/null; then
echo "Installing $pkg..."
apt-get install -y "$pkg"
fi
done
# Install config
mkdir -p "$CONF_DIR"
cp "$CONF_SRC" "$CONF_DST"
echo "Installed $CONF_DST"
# Restart multipathd
systemctl restart multipathd
echo "multipathd restarted"
# Show current multipath state
echo ""
echo "=== multipath -ll ==="
multipath -ll || true

13
weed/storage/blockvol/iscsi/cmd/iscsi-target/sw-block-multipath.conf

@ -0,0 +1,13 @@
## SeaweedFS BlockVol multipath configuration.
## Install to /etc/multipath/conf.d/sw-block.conf (or append to /etc/multipath.conf).
## Then: systemctl restart multipathd && multipath -ll
devices {
device {
vendor "SeaweedF"
product "BlockVol"
path_grouping_policy group_by_tpg
prio alua
failback followover
no_path_retry queue
}
}

727
weed/storage/blockvol/iscsi/qa_alua_test.go

@ -0,0 +1,727 @@
package iscsi
import (
"encoding/binary"
"sync"
"sync/atomic"
"testing"
)
// TestQAALUA runs adversarial tests for ALUA implementation.
func TestQAALUA(t *testing.T) {
tests := []struct {
name string
fn func(*testing.T)
}{
// Group A: State boundary + transition
{"standby_metadata_cmds_allowed", testQA_StandbyMetadataCmdsAllowed},
{"unavailable_rejects_writes", testQA_UnavailableRejectsWrites},
{"transitioning_rejects_writes", testQA_TransitioningRejectsWrites},
{"active_allows_all_ops", testQA_ActiveAllowsAllOps},
{"state_change_mid_stream", testQA_StateChangeMidStream},
// Group B: VPD 0x83 edge cases
{"vpd83_no_alua_single_descriptor", testQA_VPD83_NoALUA_SingleDescriptor},
{"vpd83_truncation_mid_descriptor", testQA_VPD83_TruncationMidDescriptor},
{"vpd83_naa_high_bits_uuid", testQA_VPD83_NAAHighBitsUUID},
// Group C: REPORT TPG edge cases
{"report_tpg_no_alua_rejected", testQA_ReportTPG_NoALUA_Rejected},
{"report_tpg_small_alloc_len", testQA_ReportTPG_SmallAllocLen},
{"report_tpg_tpgid_boundary", testQA_ReportTPG_TPGIDBoundary},
{"report_tpg_all_states", testQA_ReportTPG_AllStates},
// Group D: Concurrency
{"concurrent_state_reads", testQA_ConcurrentStateReads},
{"concurrent_standby_reject", testQA_ConcurrentStandbyReject},
// Group E: INQUIRY interaction
{"inquiry_tpgs_preserves_cmdque", testQA_InquiryTPGS_PreservesCmdQue},
{"inquiry_vpd00_unchanged_with_alua", testQA_InquiryVPD00_UnchangedWithALUA},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.fn(t)
})
}
}
// --- Group A: State boundary + transition ---
// testQA_StandbyMetadataCmdsAllowed verifies that metadata/discovery commands
// work on standby paths. dm-multipath sends these to probe path health.
func testQA_StandbyMetadataCmdsAllowed(t *testing.T) {
dev := newMockALUADevice(100*4096, ALUAStandby, 1)
h := NewSCSIHandler(dev)
// TEST UNIT READY — must succeed (path health check)
var cdb [16]byte
cdb[0] = ScsiTestUnitReady
r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("TUR on Standby: expected GOOD, got %d", r.Status)
}
// INQUIRY — must succeed
cdb = [16]byte{}
cdb[0] = ScsiInquiry
binary.BigEndian.PutUint16(cdb[3:5], 96)
r = h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("INQUIRY on Standby: expected GOOD, got %d", r.Status)
}
// READ CAPACITY 10 — must succeed
cdb = [16]byte{}
cdb[0] = ScsiReadCapacity10
r = h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("READ CAPACITY(10) on Standby: expected GOOD, got %d", r.Status)
}
// READ CAPACITY 16 — must succeed
cdb = [16]byte{}
cdb[0] = ScsiServiceActionIn16
cdb[1] = ScsiSAReadCapacity16
binary.BigEndian.PutUint32(cdb[10:14], 32)
r = h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("READ CAPACITY(16) on Standby: expected GOOD, got %d", r.Status)
}
// REPORT LUNS — must succeed
cdb = [16]byte{}
cdb[0] = ScsiReportLuns
binary.BigEndian.PutUint32(cdb[6:10], 256)
r = h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("REPORT LUNS on Standby: expected GOOD, got %d", r.Status)
}
// MODE SENSE 6 — must succeed
cdb = [16]byte{}
cdb[0] = ScsiModeSense6
cdb[2] = 0x3f // all pages
cdb[4] = 255
r = h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("MODE SENSE(6) on Standby: expected GOOD, got %d", r.Status)
}
// REQUEST SENSE — must succeed
cdb = [16]byte{}
cdb[0] = ScsiRequestSense
cdb[4] = 18
r = h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("REQUEST SENSE on Standby: expected GOOD, got %d", r.Status)
}
// REPORT TARGET PORT GROUPS — must succeed (ALUA path probing)
cdb = [16]byte{}
cdb[0] = ScsiMaintenanceIn
cdb[1] = 0x0a
binary.BigEndian.PutUint32(cdb[6:10], 255)
r = h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("REPORT TPG on Standby: expected GOOD, got %d", r.Status)
}
// Read16 — reads must work on standby for path probing
cdb = [16]byte{}
cdb[0] = ScsiRead16
binary.BigEndian.PutUint64(cdb[2:10], 0)
binary.BigEndian.PutUint32(cdb[10:14], 1)
r = h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("READ(16) on Standby: expected GOOD, got %d", r.Status)
}
}
// testQA_UnavailableRejectsWrites verifies writes are rejected when ALUA state
// is Unavailable (RoleStale). Same sense as Standby.
func testQA_UnavailableRejectsWrites(t *testing.T) {
dev := newMockALUADevice(100*4096, ALUAUnavailable, 1)
h := NewSCSIHandler(dev)
opcodes := []struct {
name string
opcode uint8
}{
{"Write10", ScsiWrite10},
{"Write16", ScsiWrite16},
{"SyncCache10", ScsiSyncCache10},
{"SyncCache16", ScsiSyncCache16},
}
for _, op := range opcodes {
var cdb [16]byte
cdb[0] = op.opcode
if op.opcode == ScsiWrite10 {
binary.BigEndian.PutUint16(cdb[7:9], 1)
} else if op.opcode == ScsiWrite16 {
binary.BigEndian.PutUint32(cdb[10:14], 1)
}
r := h.HandleCommand(cdb, make([]byte, 4096))
if r.SenseKey != SenseNotReady {
t.Fatalf("%s on Unavailable: expected NOT_READY, got %02x", op.name, r.SenseKey)
}
if r.SenseASC != 0x04 || r.SenseASCQ != 0x0B {
t.Fatalf("%s on Unavailable: expected ASC=04h ASCQ=0Bh, got %02x/%02x",
op.name, r.SenseASC, r.SenseASCQ)
}
}
}
// testQA_TransitioningRejectsWrites verifies writes are rejected during
// Transitioning state (RoleRebuilding, RoleDraining).
func testQA_TransitioningRejectsWrites(t *testing.T) {
dev := newMockALUADevice(100*4096, ALUATransitioning, 1)
h := NewSCSIHandler(dev)
var cdb [16]byte
cdb[0] = ScsiWrite10
binary.BigEndian.PutUint32(cdb[2:6], 0)
binary.BigEndian.PutUint16(cdb[7:9], 1)
r := h.HandleCommand(cdb, make([]byte, 4096))
if r.SenseKey != SenseNotReady {
t.Fatalf("Write10 on Transitioning: expected NOT_READY, got %02x", r.SenseKey)
}
if r.SenseASCQ != 0x0B {
t.Fatalf("Write10 on Transitioning: expected ASCQ=0Bh (standby), got %02x", r.SenseASCQ)
}
// But reads should still work (transitioning is still accessible for reads)
cdb = [16]byte{}
cdb[0] = ScsiRead10
binary.BigEndian.PutUint32(cdb[2:6], 0)
binary.BigEndian.PutUint16(cdb[7:9], 1)
r = h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("Read10 on Transitioning: expected GOOD, got status %d (sense %02x)", r.Status, r.SenseKey)
}
}
// testQA_ActiveAllowsAllOps verifies that Active/Optimized allows all operations.
func testQA_ActiveAllowsAllOps(t *testing.T) {
dev := newMockALUADevice(100*4096, ALUAActiveOptimized, 1)
h := NewSCSIHandler(dev)
// Write10
var cdb [16]byte
cdb[0] = ScsiWrite10
binary.BigEndian.PutUint32(cdb[2:6], 0)
binary.BigEndian.PutUint16(cdb[7:9], 1)
r := h.HandleCommand(cdb, make([]byte, 4096))
if r.Status != SCSIStatusGood {
t.Fatalf("Write10 on Active: expected GOOD, got status %d", r.Status)
}
// Read10
cdb = [16]byte{}
cdb[0] = ScsiRead10
binary.BigEndian.PutUint32(cdb[2:6], 0)
binary.BigEndian.PutUint16(cdb[7:9], 1)
r = h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("Read10 on Active: expected GOOD, got status %d", r.Status)
}
// SyncCache10
cdb = [16]byte{}
cdb[0] = ScsiSyncCache10
r = h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("SyncCache10 on Active: expected GOOD, got status %d", r.Status)
}
// Unmap
unmapData := make([]byte, 24)
binary.BigEndian.PutUint16(unmapData[0:2], 22)
binary.BigEndian.PutUint16(unmapData[2:4], 16)
binary.BigEndian.PutUint64(unmapData[8:16], 0)
binary.BigEndian.PutUint32(unmapData[16:20], 1)
cdb = [16]byte{}
cdb[0] = ScsiUnmap
r = h.HandleCommand(cdb, unmapData)
if r.Status != SCSIStatusGood {
t.Fatalf("Unmap on Active: expected GOOD, got status %d", r.Status)
}
}
// mutableALUADevice allows changing ALUA state between calls to simulate
// role transitions during I/O.
type mutableALUADevice struct {
*mockBlockDevice
state atomic.Uint32
tpgID uint16
naa [8]byte
}
func newMutableALUADevice(volumeSize uint64, initialState uint8, tpgID uint16) *mutableALUADevice {
d := &mutableALUADevice{
mockBlockDevice: newMockDevice(volumeSize),
tpgID: tpgID,
naa: [8]byte{0x60, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF, 0x01},
}
d.state.Store(uint32(initialState))
return d
}
func (m *mutableALUADevice) ALUAState() uint8 { return uint8(m.state.Load()) }
func (m *mutableALUADevice) TPGroupID() uint16 { return m.tpgID }
func (m *mutableALUADevice) DeviceNAA() [8]byte { return m.naa }
func (m *mutableALUADevice) SetState(s uint8) { m.state.Store(uint32(s)) }
// testQA_StateChangeMidStream verifies that ALUA state changes between commands
// are reflected immediately. A role change (Active→Standby) must fence writes
// on the very next command.
func testQA_StateChangeMidStream(t *testing.T) {
dev := newMutableALUADevice(100*4096, ALUAActiveOptimized, 1)
h := NewSCSIHandler(dev)
// Write succeeds while Active
var cdb [16]byte
cdb[0] = ScsiWrite10
binary.BigEndian.PutUint32(cdb[2:6], 0)
binary.BigEndian.PutUint16(cdb[7:9], 1)
r := h.HandleCommand(cdb, make([]byte, 4096))
if r.Status != SCSIStatusGood {
t.Fatalf("Write while Active: expected GOOD, got %d", r.Status)
}
// Transition to Standby (simulates demotion)
dev.SetState(ALUAStandby)
// Next write must be rejected immediately
r = h.HandleCommand(cdb, make([]byte, 4096))
if r.SenseKey != SenseNotReady {
t.Fatalf("Write after transition to Standby: expected NOT_READY, got %02x", r.SenseKey)
}
// Reads still work
cdb = [16]byte{}
cdb[0] = ScsiRead10
binary.BigEndian.PutUint32(cdb[2:6], 0)
binary.BigEndian.PutUint16(cdb[7:9], 1)
r = h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("Read after transition to Standby: expected GOOD, got %d", r.Status)
}
// Transition back to Active
dev.SetState(ALUAActiveOptimized)
// Writes work again
cdb = [16]byte{}
cdb[0] = ScsiWrite10
binary.BigEndian.PutUint32(cdb[2:6], 0)
binary.BigEndian.PutUint16(cdb[7:9], 1)
r = h.HandleCommand(cdb, make([]byte, 4096))
if r.Status != SCSIStatusGood {
t.Fatalf("Write after re-activation: expected GOOD, got %d", r.Status)
}
// Transition to Transitioning
dev.SetState(ALUATransitioning)
// REPORT TPG should show Transitioning
cdb = [16]byte{}
cdb[0] = ScsiMaintenanceIn
cdb[1] = 0x0a
binary.BigEndian.PutUint32(cdb[6:10], 255)
r = h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("REPORT TPG during Transitioning: expected GOOD, got %d", r.Status)
}
state := r.Data[4] & 0x0F
if state != ALUATransitioning {
t.Fatalf("REPORT TPG state: got %02x, want %02x", state, ALUATransitioning)
}
}
// --- Group B: VPD 0x83 edge cases ---
// testQA_VPD83_NoALUA_SingleDescriptor verifies that without ALUAProvider,
// VPD 0x83 returns only the NAA descriptor (no TPG, no RTP).
func testQA_VPD83_NoALUA_SingleDescriptor(t *testing.T) {
dev := newMockDevice(100 * 4096) // plain device, no ALUAProvider
h := NewSCSIHandler(dev)
var cdb [16]byte
cdb[0] = ScsiInquiry
cdb[1] = 0x01
cdb[2] = 0x83
binary.BigEndian.PutUint16(cdb[3:5], 255)
r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("status: %d", r.Status)
}
pageLen := binary.BigEndian.Uint16(r.Data[2:4])
// Only NAA descriptor: 4 (header) + 8 (identifier) = 12 bytes
if pageLen != 12 {
t.Fatalf("page length without ALUA: got %d, want 12 (NAA only)", pageLen)
}
// Verify it's type 3 (NAA)
if r.Data[4+1]&0x0F != 0x03 {
t.Fatalf("descriptor type: got %02x, want 03 (NAA)", r.Data[4+1]&0x0F)
}
// Verify hardcoded NAA value (0x60, 0x01, ...)
if r.Data[8] != 0x60 || r.Data[9] != 0x01 {
t.Fatalf("hardcoded NAA: got %02x %02x, want 60 01", r.Data[8], r.Data[9])
}
}
// testQA_VPD83_TruncationMidDescriptor verifies that allocLen smaller than the
// full VPD 0x83 response truncates correctly without panic.
func testQA_VPD83_TruncationMidDescriptor(t *testing.T) {
dev := newMockALUADevice(100*4096, ALUAActiveOptimized, 1)
h := NewSCSIHandler(dev)
// Full response is 4 (page header) + 28 (descriptors) = 32 bytes.
// Try various truncation points including mid-descriptor.
allocLens := []uint16{1, 4, 8, 12, 16, 20, 24, 28, 31, 32, 255}
for _, al := range allocLens {
var cdb [16]byte
cdb[0] = ScsiInquiry
cdb[1] = 0x01
cdb[2] = 0x83
binary.BigEndian.PutUint16(cdb[3:5], al)
r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("allocLen=%d: expected GOOD, got %d", al, r.Status)
}
maxExpected := 32
if int(al) < maxExpected {
maxExpected = int(al)
}
if len(r.Data) != maxExpected {
t.Fatalf("allocLen=%d: data len=%d, want %d", al, len(r.Data), maxExpected)
}
}
}
// testQA_VPD83_NAAHighBitsUUID verifies that uuidToNAA always produces a valid
// NAA-6 identifier even when UUID[0] has all bits set. The high nibble of byte 0
// must always be 0x6_.
func testQA_VPD83_NAAHighBitsUUID(t *testing.T) {
// Test with various UUID[0] values
testCases := []byte{0x00, 0x0F, 0xF0, 0xFF, 0x5A, 0xA5}
for _, b := range testCases {
dev := &mockALUADevice{
mockBlockDevice: newMockDevice(100 * 4096),
aluaState: ALUAActiveOptimized,
tpgID: 1,
naa: [8]byte{}, // will be set below
}
// Simulate what uuidToNAA does in main.go
var uuid [16]byte
uuid[0] = b
for i := 1; i < 16; i++ {
uuid[i] = byte(i)
}
// Manually compute NAA
dev.naa[0] = 0x60 | (uuid[0] & 0x0F)
copy(dev.naa[1:], uuid[1:8])
h := NewSCSIHandler(dev)
var cdb [16]byte
cdb[0] = ScsiInquiry
cdb[1] = 0x01
cdb[2] = 0x83
binary.BigEndian.PutUint16(cdb[3:5], 255)
r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("UUID[0]=%02x: status %d", b, r.Status)
}
// NAA byte is at offset 8 (4 page header + 4 desc header)
naaByte := r.Data[8]
if naaByte&0xF0 != 0x60 {
t.Fatalf("UUID[0]=%02x: NAA high nibble = %02x, want 0x6_", b, naaByte&0xF0)
}
// Low nibble should be UUID[0]'s low nibble
wantLow := b & 0x0F
if naaByte&0x0F != wantLow {
t.Fatalf("UUID[0]=%02x: NAA low nibble = %02x, want %02x", b, naaByte&0x0F, wantLow)
}
}
}
// --- Group C: REPORT TPG edge cases ---
// testQA_ReportTPG_NoALUA_Rejected verifies that REPORT TARGET PORT GROUPS
// returns ILLEGAL_REQUEST when the device doesn't implement ALUAProvider.
func testQA_ReportTPG_NoALUA_Rejected(t *testing.T) {
dev := newMockDevice(100 * 4096) // no ALUAProvider
h := NewSCSIHandler(dev)
var cdb [16]byte
cdb[0] = ScsiMaintenanceIn
cdb[1] = 0x0a
binary.BigEndian.PutUint32(cdb[6:10], 255)
r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusCheckCond {
t.Fatalf("expected CHECK_CONDITION, got %d", r.Status)
}
if r.SenseKey != SenseIllegalRequest {
t.Fatalf("expected ILLEGAL_REQUEST, got %02x", r.SenseKey)
}
}
// testQA_ReportTPG_SmallAllocLen verifies REPORT TPG handles allocation lengths
// smaller than the full 16-byte response without panic.
func testQA_ReportTPG_SmallAllocLen(t *testing.T) {
dev := newMockALUADevice(100*4096, ALUAActiveOptimized, 1)
h := NewSCSIHandler(dev)
allocLens := []uint32{1, 4, 8, 12, 15, 16, 255}
for _, al := range allocLens {
var cdb [16]byte
cdb[0] = ScsiMaintenanceIn
cdb[1] = 0x0a
binary.BigEndian.PutUint32(cdb[6:10], al)
r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("allocLen=%d: expected GOOD, got %d", al, r.Status)
}
maxExpected := uint32(16)
if al < maxExpected {
maxExpected = al
}
if uint32(len(r.Data)) != maxExpected {
t.Fatalf("allocLen=%d: data len=%d, want %d", al, len(r.Data), maxExpected)
}
}
}
// testQA_ReportTPG_TPGIDBoundary verifies TPG IDs at boundary values:
// 0 (reserved but we don't reject), 1 (default), 0xFFFF (max).
func testQA_ReportTPG_TPGIDBoundary(t *testing.T) {
tpgIDs := []uint16{0, 1, 2, 255, 0x7FFF, 0xFFFF}
for _, id := range tpgIDs {
dev := newMockALUADevice(100*4096, ALUAActiveOptimized, id)
h := NewSCSIHandler(dev)
var cdb [16]byte
cdb[0] = ScsiMaintenanceIn
cdb[1] = 0x0a
binary.BigEndian.PutUint32(cdb[6:10], 255)
r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("TPGID=%d: expected GOOD, got %d", id, r.Status)
}
gotID := binary.BigEndian.Uint16(r.Data[8:10])
if gotID != id {
t.Fatalf("TPGID=%d: response has %d", id, gotID)
}
}
}
// testQA_ReportTPG_AllStates verifies REPORT TPG correctly reports all 5 ALUA states.
func testQA_ReportTPG_AllStates(t *testing.T) {
states := []struct {
state uint8
name string
}{
{ALUAActiveOptimized, "ActiveOptimized"},
{ALUAActiveNonOpt, "ActiveNonOptimized"},
{ALUAStandby, "Standby"},
{ALUAUnavailable, "Unavailable"},
{ALUATransitioning, "Transitioning"},
}
for _, s := range states {
dev := newMockALUADevice(100*4096, s.state, 1)
h := NewSCSIHandler(dev)
var cdb [16]byte
cdb[0] = ScsiMaintenanceIn
cdb[1] = 0x0a
binary.BigEndian.PutUint32(cdb[6:10], 255)
r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("%s: expected GOOD, got %d", s.name, r.Status)
}
got := r.Data[4] & 0x0F
if got != s.state {
t.Fatalf("%s: state=%02x, want %02x", s.name, got, s.state)
}
}
}
// --- Group D: Concurrency ---
// testQA_ConcurrentStateReads verifies that concurrent REPORT TPG calls during
// state transitions don't race or crash.
func testQA_ConcurrentStateReads(t *testing.T) {
dev := newMutableALUADevice(100*4096, ALUAActiveOptimized, 1)
h := NewSCSIHandler(dev)
var wg sync.WaitGroup
const goroutines = 8
const iterations = 500
// Goroutines querying REPORT TPG
for i := 0; i < goroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < iterations; j++ {
var cdb [16]byte
cdb[0] = ScsiMaintenanceIn
cdb[1] = 0x0a
binary.BigEndian.PutUint32(cdb[6:10], 255)
r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Errorf("concurrent REPORT TPG: status %d", r.Status)
return
}
// State should be one of the valid ALUA states
state := r.Data[4] & 0x0F
if state != ALUAActiveOptimized && state != ALUAStandby &&
state != ALUAUnavailable && state != ALUATransitioning {
t.Errorf("concurrent REPORT TPG: invalid state %02x", state)
return
}
}
}()
}
// Concurrently flip state
wg.Add(1)
go func() {
defer wg.Done()
states := []uint8{ALUAActiveOptimized, ALUAStandby, ALUAUnavailable, ALUATransitioning}
for j := 0; j < iterations*2; j++ {
dev.SetState(states[j%len(states)])
}
}()
wg.Wait()
}
// testQA_ConcurrentStandbyReject verifies that concurrent write attempts on a
// standby device all get properly rejected without races.
func testQA_ConcurrentStandbyReject(t *testing.T) {
dev := newMockALUADevice(100*4096, ALUAStandby, 1)
h := NewSCSIHandler(dev)
var wg sync.WaitGroup
var rejectCount atomic.Int64
const goroutines = 8
const iterations = 200
for i := 0; i < goroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < iterations; j++ {
var cdb [16]byte
cdb[0] = ScsiWrite10
binary.BigEndian.PutUint32(cdb[2:6], 0)
binary.BigEndian.PutUint16(cdb[7:9], 1)
r := h.HandleCommand(cdb, make([]byte, 4096))
if r.SenseKey == SenseNotReady && r.SenseASC == 0x04 && r.SenseASCQ == 0x0B {
rejectCount.Add(1)
} else {
t.Errorf("concurrent standby write: unexpected sense %02x/%02x/%02x",
r.SenseKey, r.SenseASC, r.SenseASCQ)
return
}
}
}()
}
wg.Wait()
expected := int64(goroutines * iterations)
if rejectCount.Load() != expected {
t.Fatalf("rejected %d/%d writes", rejectCount.Load(), expected)
}
}
// --- Group E: INQUIRY interaction ---
// testQA_InquiryTPGS_PreservesCmdQue verifies that setting TPGS bits doesn't
// clobber the CmdQue bit in byte 7 or any other INQUIRY fields.
func testQA_InquiryTPGS_PreservesCmdQue(t *testing.T) {
dev := newMockALUADevice(100*4096, ALUAActiveOptimized, 1)
h := NewSCSIHandler(dev)
var cdb [16]byte
cdb[0] = ScsiInquiry
binary.BigEndian.PutUint16(cdb[3:5], 96)
r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("status: %d", r.Status)
}
// Byte 5: TPGS=01 (0x10), nothing else should be set
if r.Data[5] != 0x10 {
t.Fatalf("byte 5: got %02x, want 0x10 (TPGS=01 only)", r.Data[5])
}
// Byte 7: CmdQue=1 (0x02) must still be set
if r.Data[7]&0x02 == 0 {
t.Fatalf("byte 7: CmdQue bit not set (%02x)", r.Data[7])
}
// Byte 0: peripheral device type still 0x00
if r.Data[0] != 0x00 {
t.Fatalf("byte 0: got %02x, want 0x00", r.Data[0])
}
// Vendor (bytes 8-15) should be "SeaweedF"
vendor := string(r.Data[8:16])
if vendor != "SeaweedF" {
t.Fatalf("vendor: got %q, want %q", vendor, "SeaweedF")
}
// Product (bytes 16-31) should start with "BlockVol"
product := string(r.Data[16:24])
if product != "BlockVol" {
t.Fatalf("product: got %q, want %q", product, "BlockVol")
}
}
// testQA_InquiryVPD00_UnchangedWithALUA verifies that the supported VPD pages
// list (0x00) doesn't change with ALUA — we don't add new VPD pages, only
// modify existing 0x83.
func testQA_InquiryVPD00_UnchangedWithALUA(t *testing.T) {
devNoALUA := newMockDevice(100 * 4096)
devALUA := newMockALUADevice(100*4096, ALUAActiveOptimized, 1)
hNoALUA := NewSCSIHandler(devNoALUA)
hALUA := NewSCSIHandler(devALUA)
var cdb [16]byte
cdb[0] = ScsiInquiry
cdb[1] = 0x01
cdb[2] = 0x00
binary.BigEndian.PutUint16(cdb[3:5], 255)
r1 := hNoALUA.HandleCommand(cdb, nil)
r2 := hALUA.HandleCommand(cdb, nil)
if r1.Status != SCSIStatusGood || r2.Status != SCSIStatusGood {
t.Fatalf("status: noALUA=%d alua=%d", r1.Status, r2.Status)
}
if len(r1.Data) != len(r2.Data) {
t.Fatalf("VPD 0x00 length differs: noALUA=%d alua=%d", len(r1.Data), len(r2.Data))
}
for i := range r1.Data {
if r1.Data[i] != r2.Data[i] {
t.Fatalf("VPD 0x00 byte %d differs: noALUA=%02x alua=%02x", i, r1.Data[i], r2.Data[i])
}
}
}

122
weed/storage/blockvol/iscsi/scsi.go

@ -27,7 +27,8 @@ const (
ScsiWriteSame16 uint8 = 0x93
ScsiServiceActionIn16 uint8 = 0x9e // READ CAPACITY(16), etc.
ScsiReportLuns uint8 = 0xa0
ScsiMaintenanceIn uint8 = 0xa3 // REPORT SUPPORTED OPCODES, etc.
ScsiMaintenanceIn uint8 = 0xa3 // REPORT SUPPORTED OPCODES, REPORT TARGET PORT GROUPS, etc.
ScsiMaintenanceOut uint8 = 0xa4 // SET TARGET PORT GROUPS, etc.
)
// Service action for READ CAPACITY (16)
@ -115,10 +116,19 @@ func (h *SCSIHandler) HandleCommand(cdb [16]byte, dataOut []byte) SCSIResult {
case ScsiRead10:
return h.read10(cdb)
case ScsiWrite10:
if r := h.checkStandbyReject(); r != nil {
return *r
}
return h.write10(cdb, dataOut)
case ScsiSyncCache10:
if r := h.checkStandbyReject(); r != nil {
return *r
}
return h.syncCache()
case ScsiUnmap:
if r := h.checkStandbyReject(); r != nil {
return *r
}
return h.unmap(cdb, dataOut)
case ScsiModeSelect10:
return h.modeSelect10(cdb, dataOut)
@ -131,10 +141,19 @@ func (h *SCSIHandler) HandleCommand(cdb [16]byte, dataOut []byte) SCSIResult {
case ScsiRead16:
return h.read16(cdb)
case ScsiWrite16:
if r := h.checkStandbyReject(); r != nil {
return *r
}
return h.write16(cdb, dataOut)
case ScsiSyncCache16:
if r := h.checkStandbyReject(); r != nil {
return *r
}
return h.syncCache()
case ScsiWriteSame16:
if r := h.checkStandbyReject(); r != nil {
return *r
}
return h.writeSame16(cdb, dataOut)
case ScsiServiceActionIn16:
sa := cdb[1] & 0x1f
@ -146,6 +165,8 @@ func (h *SCSIHandler) HandleCommand(cdb [16]byte, dataOut []byte) SCSIResult {
return h.reportLuns(cdb)
case ScsiMaintenanceIn:
return h.maintenanceIn(cdb)
case ScsiMaintenanceOut:
return illegalRequest(ASCInvalidOpcode, ASCQLuk)
default:
return illegalRequest(ASCInvalidOpcode, ASCQLuk)
}
@ -211,16 +232,21 @@ func (h *SCSIHandler) persistReserveOut(cdb [16]byte, dataOut []byte) SCSIResult
}
// maintenanceIn handles MAINTENANCE IN (0xA3).
// Service action 0x0A = REPORT TARGET PORT GROUPS (ALUA).
// Service action 0x0C = REPORT SUPPORTED OPERATION CODES.
// Windows sends this to discover which commands we support.
// Windows sends 0x0C to discover which commands we support.
func (h *SCSIHandler) maintenanceIn(cdb [16]byte) SCSIResult {
sa := cdb[1] & 0x1f
if sa == 0x0c {
switch sa {
case 0x0a:
return h.reportTargetPortGroups(cdb)
case 0x0c:
// REPORT SUPPORTED OPERATION CODES -- return empty (not supported).
// This tells Windows to probe commands individually.
return illegalRequest(ASCInvalidOpcode, ASCQLuk)
default:
return illegalRequest(ASCInvalidOpcode, ASCQLuk)
}
return illegalRequest(ASCInvalidOpcode, ASCQLuk)
}
func (h *SCSIHandler) inquiry(cdb [16]byte) SCSIResult {
@ -243,6 +269,10 @@ func (h *SCSIHandler) inquiry(cdb [16]byte) SCSIResult {
data[3] = 0x02 // Response data format = 2 (SPC-2+)
data[4] = 91 // Additional length (96-5)
data[5] = 0x00 // SCCS, ACC, TPGS, 3PC
// If ALUA is supported, set TPGS = implicit (bits 5:4 = 01).
if _, ok := h.dev.(ALUAProvider); ok {
data[5] |= 0x10 // TPGS = 01 (implicit ALUA)
}
data[6] = 0x00 // Obsolete, EncServ, VS, MultiP
data[7] = 0x02 // CmdQue=1 (supports command queuing)
@ -290,23 +320,7 @@ func (h *SCSIHandler) inquiryVPD(pageCode uint8, allocLen uint16) SCSIResult {
return SCSIResult{Status: SCSIStatusGood, Data: data}
case 0x83: // Device identification
// NAA identifier (8 bytes)
naaID := []byte{
0x01, // code set: binary
0x03, // identifier type: NAA
0x00, // reserved
0x08, // identifier length
0x60, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // NAA-6 fake
}
data := make([]byte, 4+len(naaID))
data[0] = 0x00 // device type
data[1] = 0x83 // page code
binary.BigEndian.PutUint16(data[2:4], uint16(len(naaID)))
copy(data[4:], naaID)
if int(allocLen) < len(data) {
data = data[:allocLen]
}
return SCSIResult{Status: SCSIStatusGood, Data: data}
return h.inquiryVPD83(allocLen)
case 0xb0: // Block Limits (SBC-4, Section 6.6.4)
return h.inquiryVPDB0(allocLen)
@ -394,6 +408,72 @@ func (h *SCSIHandler) inquiryVPDB2(allocLen uint16) SCSIResult {
return SCSIResult{Status: SCSIStatusGood, Data: data}
}
// inquiryVPD83 returns the Device Identification VPD page (0x83).
// When ALUAProvider is available, returns three descriptors needed by dm-multipath:
// 1. NAA-6 identifier (logical unit identity, UUID-derived)
// 2. Target Port Group designator (TPG identity)
// 3. Relative Target Port designator (target port identity)
//
// Without ALUAProvider, returns only the NAA descriptor with a hardcoded value.
func (h *SCSIHandler) inquiryVPD83(allocLen uint16) SCSIResult {
alua, hasALUA := h.dev.(ALUAProvider)
// Descriptor 1: NAA-6 identifier (always present).
var naaBytes [8]byte
if hasALUA {
naaBytes = alua.DeviceNAA()
} else {
naaBytes = [8]byte{0x60, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}
}
naaDesc := []byte{
0x01, // code set: binary
0x03, // PIV=0, association=00 (logical unit), type=3 (NAA)
0x00, // reserved
0x08, // identifier length
}
naaDesc = append(naaDesc, naaBytes[:]...)
descs := make([]byte, 0, 32)
descs = append(descs, naaDesc...)
if hasALUA {
tpgID := alua.TPGroupID()
// Descriptor 2: Target Port Group designator (type 0x05).
tpgDesc := []byte{
0x01, // code set: binary
0x15, // PIV=1, association=01 (target port), type=5 (target port group)
0x00, // reserved
0x04, // identifier length
0x00, 0x00, // reserved
byte(tpgID >> 8), byte(tpgID), // TPG ID big-endian
}
descs = append(descs, tpgDesc...)
// Descriptor 3: Relative Target Port designator (type 0x04).
rtpDesc := []byte{
0x01, // code set: binary
0x14, // PIV=1, association=01 (target port), type=4 (relative target port)
0x00, // reserved
0x04, // identifier length
0x00, 0x00, // reserved
0x00, 0x01, // relative target port identifier = 1
}
descs = append(descs, rtpDesc...)
}
data := make([]byte, 4+len(descs))
data[0] = 0x00 // device type
data[1] = 0x83 // page code
binary.BigEndian.PutUint16(data[2:4], uint16(len(descs)))
copy(data[4:], descs)
if int(allocLen) < len(data) {
data = data[:allocLen]
}
return SCSIResult{Status: SCSIStatusGood, Data: data}
}
func (h *SCSIHandler) readCapacity10() SCSIResult {
blockSize := h.dev.BlockSize()
totalBlocks := h.dev.VolumeSize() / uint64(blockSize)

424
weed/storage/blockvol/iscsi/scsi_test.go

@ -117,6 +117,18 @@ func TestSCSI(t *testing.T) {
{"mode_sense_10", testModeSense10},
{"vpd_b0_block_limits", testVPDB0BlockLimits},
{"vpd_b2_logical_block_prov", testVPDB2LogicalBlockProv},
{"alua_inquiry_tpgs", testALUA_InquiryTPGS},
{"alua_inquiry_no_tpgs", testALUA_InquiryNoTPGS},
{"alua_vpd83_three_descriptors", testALUA_VPD83_ThreeDescriptors},
{"alua_vpd83_shared_naa", testALUA_VPD83_SharedNAA},
{"alua_report_tpg_primary", testALUA_ReportTPG_Primary},
{"alua_report_tpg_replica", testALUA_ReportTPG_Replica},
{"alua_report_tpg_stale", testALUA_ReportTPG_Stale},
{"alua_report_tpg_tpgid_matches", testALUA_ReportTPG_TPGIDMatches},
{"alua_set_tpg_rejected", testALUA_SetTPG_Rejected},
{"alua_standby_rejects_write", testALUA_StandbyRejectsWrite},
{"alua_role_none_allows_writes", testALUA_RoleNone_AllowsWrites},
{"alua_report_tpg_transitioning", testALUA_ReportTPG_Transitioning},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -932,3 +944,415 @@ func testReadCapacity16InvalidSA(t *testing.T) {
t.Fatal("should fail for wrong SA")
}
}
// --- ALUA tests ---
// mockALUADevice embeds mockBlockDevice and adds ALUAProvider support.
type mockALUADevice struct {
*mockBlockDevice
aluaState uint8
tpgID uint16
naa [8]byte
}
func newMockALUADevice(volumeSize uint64, state uint8, tpgID uint16) *mockALUADevice {
return &mockALUADevice{
mockBlockDevice: newMockDevice(volumeSize),
aluaState: state,
tpgID: tpgID,
naa: [8]byte{0x60, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF, 0x01},
}
}
func (m *mockALUADevice) ALUAState() uint8 { return m.aluaState }
func (m *mockALUADevice) TPGroupID() uint16 { return m.tpgID }
func (m *mockALUADevice) DeviceNAA() [8]byte { return m.naa }
// testALUA_InquiryTPGS verifies byte 5 has TPGS=01 when ALUAProvider is present.
func testALUA_InquiryTPGS(t *testing.T) {
dev := newMockALUADevice(100*4096, ALUAActiveOptimized, 1)
h := NewSCSIHandler(dev)
var cdb [16]byte
cdb[0] = ScsiInquiry
binary.BigEndian.PutUint16(cdb[3:5], 96)
r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("status: %d", r.Status)
}
// Byte 5 bits 5:4 should be 01 (implicit ALUA) = 0x10
if r.Data[5]&0x30 != 0x10 {
t.Fatalf("TPGS bits: got %02x, want 0x10 in byte 5 (%02x)", r.Data[5]&0x30, r.Data[5])
}
}
// testALUA_InquiryNoTPGS verifies byte 5 has TPGS=00 when no ALUAProvider.
func testALUA_InquiryNoTPGS(t *testing.T) {
dev := newMockDevice(100 * 4096)
h := NewSCSIHandler(dev)
var cdb [16]byte
cdb[0] = ScsiInquiry
binary.BigEndian.PutUint16(cdb[3:5], 96)
r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("status: %d", r.Status)
}
// Byte 5 bits 5:4 should be 00 (no ALUA)
if r.Data[5]&0x30 != 0x00 {
t.Fatalf("TPGS bits: got %02x, want 0x00 in byte 5 (%02x)", r.Data[5]&0x30, r.Data[5])
}
}
// testALUA_VPD83_ThreeDescriptors verifies VPD 0x83 has NAA + TPG + RTP descriptors.
func testALUA_VPD83_ThreeDescriptors(t *testing.T) {
dev := newMockALUADevice(100*4096, ALUAActiveOptimized, 42)
h := NewSCSIHandler(dev)
var cdb [16]byte
cdb[0] = ScsiInquiry
cdb[1] = 0x01 // EVPD
cdb[2] = 0x83
binary.BigEndian.PutUint16(cdb[3:5], 255)
r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("status: %d", r.Status)
}
// Page length is at bytes 2-3
pageLen := binary.BigEndian.Uint16(r.Data[2:4])
// 12 (NAA) + 8 (TPG) + 8 (RTP) = 28
if pageLen != 28 {
t.Fatalf("page length: %d, want 28 (NAA=12 + TPG=8 + RTP=8)", pageLen)
}
// Descriptor 1 (offset 4): NAA type=3
if r.Data[4+1]&0x0F != 0x03 {
t.Fatalf("desc 1 type: %02x, want 03 (NAA)", r.Data[4+1]&0x0F)
}
// Descriptor 2 (offset 4+12=16): TPG type=5
if r.Data[16+1]&0x0F != 0x05 {
t.Fatalf("desc 2 type: %02x, want 05 (TPG)", r.Data[16+1]&0x0F)
}
// TPG ID should be 42
tpgID := binary.BigEndian.Uint16(r.Data[16+6 : 16+8])
if tpgID != 42 {
t.Fatalf("TPG ID: %d, want 42", tpgID)
}
// Descriptor 3 (offset 16+8=24): RTP type=4
if r.Data[24+1]&0x0F != 0x04 {
t.Fatalf("desc 3 type: %02x, want 04 (RTP)", r.Data[24+1]&0x0F)
}
// Relative target port = 1
rtp := binary.BigEndian.Uint16(r.Data[24+6 : 24+8])
if rtp != 1 {
t.Fatalf("RTP: %d, want 1", rtp)
}
}
// testALUA_VPD83_SharedNAA verifies NAA is derived from UUID (deterministic).
func testALUA_VPD83_SharedNAA(t *testing.T) {
dev := newMockALUADevice(100*4096, ALUAActiveOptimized, 1)
h := NewSCSIHandler(dev)
var cdb [16]byte
cdb[0] = ScsiInquiry
cdb[1] = 0x01
cdb[2] = 0x83
binary.BigEndian.PutUint16(cdb[3:5], 255)
r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("status: %d", r.Status)
}
// NAA identifier starts at offset 4+4=8 (after page header + desc header)
naa := r.Data[8:16]
want := dev.naa[:]
for i := range want {
if naa[i] != want[i] {
t.Fatalf("NAA byte %d: got %02x, want %02x", i, naa[i], want[i])
}
}
// Run again -- same result (deterministic)
r2 := h.HandleCommand(cdb, nil)
naa2 := r2.Data[8:16]
for i := range naa {
if naa[i] != naa2[i] {
t.Fatalf("NAA not deterministic at byte %d: %02x vs %02x", i, naa[i], naa2[i])
}
}
}
// testALUA_ReportTPG_Primary verifies REPORT TARGET PORT GROUPS returns Active/Optimized.
func testALUA_ReportTPG_Primary(t *testing.T) {
dev := newMockALUADevice(100*4096, ALUAActiveOptimized, 1)
h := NewSCSIHandler(dev)
var cdb [16]byte
cdb[0] = ScsiMaintenanceIn
cdb[1] = 0x0a // SA = REPORT TARGET PORT GROUPS
binary.BigEndian.PutUint32(cdb[6:10], 255)
r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("status: %d", r.Status)
}
// Return data length (bytes 0-3) should be 12
rdl := binary.BigEndian.Uint32(r.Data[0:4])
if rdl != 12 {
t.Fatalf("return data length: %d, want 12", rdl)
}
// ALUA state is in byte 4 (lower nibble)
state := r.Data[4] & 0x0F
if state != ALUAActiveOptimized {
t.Fatalf("ALUA state: %02x, want %02x (Active/Optimized)", state, ALUAActiveOptimized)
}
}
// testALUA_ReportTPG_Replica verifies REPORT TARGET PORT GROUPS returns Standby.
func testALUA_ReportTPG_Replica(t *testing.T) {
dev := newMockALUADevice(100*4096, ALUAStandby, 2)
h := NewSCSIHandler(dev)
var cdb [16]byte
cdb[0] = ScsiMaintenanceIn
cdb[1] = 0x0a
binary.BigEndian.PutUint32(cdb[6:10], 255)
r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("status: %d", r.Status)
}
state := r.Data[4] & 0x0F
if state != ALUAStandby {
t.Fatalf("ALUA state: %02x, want %02x (Standby)", state, ALUAStandby)
}
}
// testALUA_ReportTPG_Stale verifies REPORT TARGET PORT GROUPS returns Unavailable.
func testALUA_ReportTPG_Stale(t *testing.T) {
dev := newMockALUADevice(100*4096, ALUAUnavailable, 1)
h := NewSCSIHandler(dev)
var cdb [16]byte
cdb[0] = ScsiMaintenanceIn
cdb[1] = 0x0a
binary.BigEndian.PutUint32(cdb[6:10], 255)
r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("status: %d", r.Status)
}
state := r.Data[4] & 0x0F
if state != ALUAUnavailable {
t.Fatalf("ALUA state: %02x, want %02x (Unavailable)", state, ALUAUnavailable)
}
}
// testALUA_ReportTPG_TPGIDMatches verifies TPG ID in response matches configured value.
func testALUA_ReportTPG_TPGIDMatches(t *testing.T) {
dev := newMockALUADevice(100*4096, ALUAActiveOptimized, 0x1234)
h := NewSCSIHandler(dev)
var cdb [16]byte
cdb[0] = ScsiMaintenanceIn
cdb[1] = 0x0a
binary.BigEndian.PutUint32(cdb[6:10], 255)
r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("status: %d", r.Status)
}
// TPG ID at bytes 8-9
tpgID := binary.BigEndian.Uint16(r.Data[8:10])
if tpgID != 0x1234 {
t.Fatalf("TPG ID: %04x, want 1234", tpgID)
}
}
// testALUA_SetTPG_Rejected verifies MAINTENANCE OUT returns ILLEGAL_REQUEST.
func testALUA_SetTPG_Rejected(t *testing.T) {
dev := newMockALUADevice(100*4096, ALUAActiveOptimized, 1)
h := NewSCSIHandler(dev)
var cdb [16]byte
cdb[0] = ScsiMaintenanceOut
cdb[1] = 0x0a // SA = SET TARGET PORT GROUPS
r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusCheckCond {
t.Fatalf("expected CHECK_CONDITION, got %d", r.Status)
}
if r.SenseKey != SenseIllegalRequest {
t.Fatalf("expected ILLEGAL_REQUEST, got %02x", r.SenseKey)
}
}
// testALUA_StandbyRejectsWrite verifies Write on Standby returns NOT_READY.
func testALUA_StandbyRejectsWrite(t *testing.T) {
dev := newMockALUADevice(100*4096, ALUAStandby, 1)
h := NewSCSIHandler(dev)
// Test Write10
var cdb [16]byte
cdb[0] = ScsiWrite10
binary.BigEndian.PutUint32(cdb[2:6], 0)
binary.BigEndian.PutUint16(cdb[7:9], 1)
r := h.HandleCommand(cdb, make([]byte, 4096))
if r.Status != SCSIStatusCheckCond {
t.Fatalf("Write10: expected CHECK_CONDITION, got %d", r.Status)
}
if r.SenseKey != SenseNotReady {
t.Fatalf("Write10: expected NOT_READY (0x02), got %02x", r.SenseKey)
}
if r.SenseASC != 0x04 || r.SenseASCQ != 0x0B {
t.Fatalf("Write10: expected ASC=04 ASCQ=0B, got ASC=%02x ASCQ=%02x", r.SenseASC, r.SenseASCQ)
}
// Test Write16
cdb = [16]byte{}
cdb[0] = ScsiWrite16
binary.BigEndian.PutUint64(cdb[2:10], 0)
binary.BigEndian.PutUint32(cdb[10:14], 1)
r = h.HandleCommand(cdb, make([]byte, 4096))
if r.Status != SCSIStatusCheckCond {
t.Fatalf("Write16: expected CHECK_CONDITION, got %d", r.Status)
}
if r.SenseKey != SenseNotReady {
t.Fatalf("Write16: expected NOT_READY, got %02x", r.SenseKey)
}
// Test WriteSame16
cdb = [16]byte{}
cdb[0] = ScsiWriteSame16
cdb[1] = 0x08 // UNMAP
binary.BigEndian.PutUint64(cdb[2:10], 0)
binary.BigEndian.PutUint32(cdb[10:14], 1)
r = h.HandleCommand(cdb, nil)
if r.SenseKey != SenseNotReady {
t.Fatalf("WriteSame16: expected NOT_READY, got %02x", r.SenseKey)
}
// Test Unmap
unmapData := make([]byte, 24)
binary.BigEndian.PutUint16(unmapData[0:2], 22)
binary.BigEndian.PutUint16(unmapData[2:4], 16)
binary.BigEndian.PutUint64(unmapData[8:16], 0)
binary.BigEndian.PutUint32(unmapData[16:20], 1)
cdb = [16]byte{}
cdb[0] = ScsiUnmap
r = h.HandleCommand(cdb, unmapData)
if r.SenseKey != SenseNotReady {
t.Fatalf("Unmap: expected NOT_READY, got %02x", r.SenseKey)
}
// Test SyncCache10
cdb = [16]byte{}
cdb[0] = ScsiSyncCache10
r = h.HandleCommand(cdb, nil)
if r.SenseKey != SenseNotReady {
t.Fatalf("SyncCache10: expected NOT_READY, got %02x", r.SenseKey)
}
// Test SyncCache16
cdb = [16]byte{}
cdb[0] = ScsiSyncCache16
r = h.HandleCommand(cdb, nil)
if r.SenseKey != SenseNotReady {
t.Fatalf("SyncCache16: expected NOT_READY, got %02x", r.SenseKey)
}
// Verify reads still work on Standby
cdb = [16]byte{}
cdb[0] = ScsiRead10
binary.BigEndian.PutUint32(cdb[2:6], 0)
binary.BigEndian.PutUint16(cdb[7:9], 1)
r = h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("Read10 should succeed on Standby, got status %d", r.Status)
}
}
// testALUA_RoleNone_AllowsWrites verifies that Active/Optimized state (which
// RoleNone maps to in the adapter) allows writes through checkStandbyReject.
// This covers the Fix-1 regression: standalone single-node targets must accept
// writes even without an explicit role assignment from a master.
func testALUA_RoleNone_AllowsWrites(t *testing.T) {
// Active/Optimized = what RoleNone maps to after Fix 1
dev := newMockALUADevice(100*4096, ALUAActiveOptimized, 1)
h := NewSCSIHandler(dev)
// Write10 should succeed
var cdb [16]byte
cdb[0] = ScsiWrite10
binary.BigEndian.PutUint32(cdb[2:6], 0)
binary.BigEndian.PutUint16(cdb[7:9], 1)
r := h.HandleCommand(cdb, make([]byte, 4096))
if r.Status != SCSIStatusGood {
t.Fatalf("Write10: expected GOOD, got status %d (sense=%02x ASC=%02x ASCQ=%02x)",
r.Status, r.SenseKey, r.SenseASC, r.SenseASCQ)
}
// Write16 should succeed
cdb = [16]byte{}
cdb[0] = ScsiWrite16
binary.BigEndian.PutUint64(cdb[2:10], 0)
binary.BigEndian.PutUint32(cdb[10:14], 1)
r = h.HandleCommand(cdb, make([]byte, 4096))
if r.Status != SCSIStatusGood {
t.Fatalf("Write16: expected GOOD, got status %d", r.Status)
}
// SyncCache10 should succeed
cdb = [16]byte{}
cdb[0] = ScsiSyncCache10
r = h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("SyncCache10: expected GOOD, got status %d", r.Status)
}
// WriteSame16 (UNMAP) should succeed
cdb = [16]byte{}
cdb[0] = ScsiWriteSame16
cdb[1] = 0x08 // UNMAP
binary.BigEndian.PutUint64(cdb[2:10], 0)
binary.BigEndian.PutUint32(cdb[10:14], 1)
r = h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("WriteSame16 UNMAP: expected GOOD, got status %d", r.Status)
}
}
// testALUA_ReportTPG_Transitioning verifies REPORT TARGET PORT GROUPS returns
// state=Transitioning (0x0F) and supported flags include T_SUP (bit 3).
// This covers Fix-2: we use ALUATransitioning for RoleDraining/RoleRebuilding,
// so T_SUP must be advertised.
func testALUA_ReportTPG_Transitioning(t *testing.T) {
dev := newMockALUADevice(100*4096, ALUATransitioning, 5)
h := NewSCSIHandler(dev)
var cdb [16]byte
cdb[0] = ScsiMaintenanceIn
cdb[1] = 0x0A // SA = REPORT TARGET PORT GROUPS
binary.BigEndian.PutUint32(cdb[6:10], 256)
r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusGood {
t.Fatalf("status: %d", r.Status)
}
if len(r.Data) < 16 {
t.Fatalf("data too short: %d", len(r.Data))
}
// Byte 4: asymmetric access state (lower 4 bits) = 0x0F (Transitioning)
gotState := r.Data[4] & 0x0F
if gotState != ALUATransitioning {
t.Fatalf("state: got %02x, want %02x (Transitioning)", gotState, ALUATransitioning)
}
// Byte 5: supported states flags -- T_SUP (bit 3) must be set
supFlags := r.Data[5]
if supFlags&0x08 == 0 {
t.Fatalf("T_SUP (bit 3) not set in supported flags: %02x", supFlags)
}
// Also verify O_SUP (bit 0), S_SUP (bit 1), U_SUP (bit 2) are set
if supFlags&0x07 != 0x07 {
t.Fatalf("O_SUP|S_SUP|U_SUP missing: got %02x, want bits 0-2 set", supFlags)
}
// Full value should be 0x0F
if supFlags != 0x0F {
t.Fatalf("supported flags: got %02x, want 0x0F", supFlags)
}
// TPG ID should be 5
tpgID := binary.BigEndian.Uint16(r.Data[8:10])
if tpgID != 5 {
t.Fatalf("TPG ID: got %d, want 5", tpgID)
}
}

4
weed/storage/blockvol/test/ha_target.go

@ -18,6 +18,7 @@ type HATarget struct {
ReplicaData int // replica receiver data port
ReplicaCtrl int // replica receiver ctrl port
RebuildPort int
TPGID int // ALUA target port group ID (0 = omit flag)
}
// StatusResp matches the JSON returned by GET /status.
@ -64,6 +65,9 @@ func (h *HATarget) Start(ctx context.Context, create bool) error {
if h.RebuildPort > 0 {
args += fmt.Sprintf(" -rebuild-listen :%d", h.RebuildPort)
}
if h.TPGID > 0 {
args += fmt.Sprintf(" -tpg-id %d", h.TPGID)
}
cmd := fmt.Sprintf("setsid -f %s %s >%s 2>&1", h.binPath, args, h.logFile)
_, stderr, code, err := h.node.Run(ctx, cmd)

455
weed/storage/blockvol/test/ha_test.go

@ -738,3 +738,458 @@ func testAdminAssignBadRole(t *testing.T) {
t.Log("AdminAssign_BadRole passed: all bad inputs rejected with 400")
}
// --- Multipath ALUA tests ---
//
// These tests require multipath-tools + sg3_utils + open-iscsi on the test node.
// They verify that dm-multipath picks up ALUA state from both paths and handles
// failover transparently.
// Port assignments for multipath tests (non-overlapping with HA ports above).
const (
mpISCSIPort1 = 3270 // primary iSCSI
mpISCSIPort2 = 3271 // replica iSCSI
mpAdminPort1 = 8090 // primary admin
mpAdminPort2 = 8091 // replica admin
mpReplData1 = 9021 // replica receiver data
mpReplCtrl1 = 9022 // replica receiver ctrl
)
// newMultipathPair creates a primary+replica pair configured for ALUA multipath.
// Primary gets TPG ID 1, replica gets TPG ID 2. Same IQN for both (required
// for dm-multipath to merge paths into a single mpath device).
func newMultipathPair(t *testing.T, volSize string) (primary, replica *HATarget, iscsiClient *ISCSIClient) {
t.Helper()
cleanCtx, cleanCancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cleanCancel()
clientNode.RunRoot(cleanCtx, "iscsiadm -m node --logoutall=all 2>/dev/null")
targetNode.Run(cleanCtx, "pkill -9 -f blockvol-ha 2>/dev/null")
if clientNode != targetNode {
clientNode.Run(cleanCtx, "pkill -9 -f blockvol-ha 2>/dev/null")
}
time.Sleep(2 * time.Second)
name := strings.ReplaceAll(t.Name(), "/", "-")
sharedIQN := iqnPrefix + "-" + strings.ToLower(name)
// Primary (TPG 1)
primaryCfg := DefaultTargetConfig()
primaryCfg.IQN = sharedIQN
primaryCfg.Port = mpISCSIPort1
if volSize != "" {
primaryCfg.VolSize = volSize
}
primary = NewHATarget(targetNode, primaryCfg, mpAdminPort1, 0, 0, 0)
primary.TPGID = 1
primary.volFile = "/tmp/blockvol-mp-primary.blk"
primary.logFile = "/tmp/iscsi-mp-primary.log"
// Replica (TPG 2) -- same IQN!
replicaCfg := DefaultTargetConfig()
replicaCfg.IQN = sharedIQN
replicaCfg.Port = mpISCSIPort2
if volSize != "" {
replicaCfg.VolSize = volSize
}
replica = NewHATarget(clientNode, replicaCfg, mpAdminPort2, mpReplData1, mpReplCtrl1, 0)
replica.TPGID = 2
replica.volFile = "/tmp/blockvol-mp-replica.blk"
replica.logFile = "/tmp/iscsi-mp-replica.log"
if clientNode != targetNode {
if err := replica.Deploy(*flagRepoDir + "/iscsi-target-linux"); err != nil {
t.Fatalf("deploy replica: %v", err)
}
}
iscsiClient = NewISCSIClient(clientNode)
t.Cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
iscsiClient.Logout(ctx, sharedIQN)
clientNode.RunRoot(ctx, "multipath -F 2>/dev/null") // flush multipath maps
primary.Stop(ctx)
replica.Stop(ctx)
primary.Cleanup(ctx)
replica.Cleanup(ctx)
})
t.Cleanup(func() {
artifacts.CollectLabeled(t, primary.Target, "mp-primary")
artifacts.CollectLabeled(t, replica.Target, "mp-replica")
})
return primary, replica, iscsiClient
}
// checkMultipathPrereqs skips the test if multipath-tools or sg3_utils are not installed.
func checkMultipathPrereqs(t *testing.T) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, _, code, _ := clientNode.RunRoot(ctx, "which multipathd sg_rtpg 2>/dev/null")
if code != 0 {
t.Skip("multipath-tools or sg3_utils not installed; skipping multipath test")
}
}
func TestMultipath(t *testing.T) {
t.Run("DeviceAppears", testMultipathDeviceAppears)
t.Run("Failover", testMultipathFailover)
t.Run("FioSurvives", testMultipathFioSurvives)
t.Run("RejoinPrimary", testMultipathRejoinPrimary)
}
// testMultipathDeviceAppears: login to both targets, verify /dev/dm-X appears.
func testMultipathDeviceAppears(t *testing.T) {
checkMultipathPrereqs(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
primary, replica, iscsi := newMultipathPair(t, "100M")
// Start both targets
if err := primary.Start(ctx, true); err != nil {
t.Fatalf("start primary: %v", err)
}
if err := replica.Start(ctx, true); err != nil {
t.Fatalf("start replica: %v", err)
}
// Assign roles
if err := replica.Assign(ctx, 1, roleReplica, 0); err != nil {
t.Fatalf("assign replica: %v", err)
}
if err := primary.Assign(ctx, 1, rolePrimary, 30000); err != nil {
t.Fatalf("assign primary: %v", err)
}
// Configure WAL shipping
if err := primary.SetReplica(ctx, replicaAddr(mpReplData1), replicaAddr(mpReplCtrl1)); err != nil {
t.Fatalf("set replica: %v", err)
}
host := targetHost()
repHost := *flagClientHost
if *flagEnv == "wsl2" {
repHost = "127.0.0.1"
}
// Discover and login to BOTH targets
t.Log("discovering + logging in to primary...")
if _, err := iscsi.Discover(ctx, host, mpISCSIPort1); err != nil {
t.Fatalf("discover primary: %v", err)
}
dev1, err := iscsi.Login(ctx, primary.config.IQN)
if err != nil {
t.Fatalf("login primary: %v", err)
}
t.Logf("primary device: %s", dev1)
t.Log("discovering + logging in to replica...")
if _, err := iscsi.Discover(ctx, repHost, mpISCSIPort2); err != nil {
t.Fatalf("discover replica: %v", err)
}
dev2, err := iscsi.Login(ctx, replica.config.IQN)
if err != nil {
t.Fatalf("login replica: %v", err)
}
t.Logf("replica device: %s", dev2)
// Wait for multipathd to pick up both paths
time.Sleep(3 * time.Second)
clientNode.RunRoot(ctx, "multipath -r") // rescan
// Check multipath -ll shows a dm device
stdout, _, _, _ := clientNode.RunRoot(ctx, "multipath -ll 2>/dev/null")
t.Logf("multipath -ll:\n%s", stdout)
if !strings.Contains(stdout, "dm-") && !strings.Contains(stdout, "mpath") {
t.Fatalf("no multipath device found; multipath -ll output:\n%s", stdout)
}
t.Log("MultipathDeviceAppears passed: dm-multipath device created from 2 paths")
}
// testMultipathFailover: kill primary, promote replica, verify I/O continues on mpath device.
func testMultipathFailover(t *testing.T) {
checkMultipathPrereqs(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
primary, replica, iscsi := newMultipathPair(t, "100M")
// Start, assign, WAL ship
if err := primary.Start(ctx, true); err != nil {
t.Fatalf("start primary: %v", err)
}
if err := replica.Start(ctx, true); err != nil {
t.Fatalf("start replica: %v", err)
}
if err := replica.Assign(ctx, 1, roleReplica, 0); err != nil {
t.Fatalf("assign replica: %v", err)
}
if err := primary.Assign(ctx, 1, rolePrimary, 30000); err != nil {
t.Fatalf("assign primary: %v", err)
}
if err := primary.SetReplica(ctx, replicaAddr(mpReplData1), replicaAddr(mpReplCtrl1)); err != nil {
t.Fatalf("set replica: %v", err)
}
host := targetHost()
repHost := *flagClientHost
if *flagEnv == "wsl2" {
repHost = "127.0.0.1"
}
// Login to both
if _, err := iscsi.Discover(ctx, host, mpISCSIPort1); err != nil {
t.Fatalf("discover primary: %v", err)
}
if _, err := iscsi.Login(ctx, primary.config.IQN); err != nil {
t.Fatalf("login primary: %v", err)
}
if _, err := iscsi.Discover(ctx, repHost, mpISCSIPort2); err != nil {
t.Fatalf("discover replica: %v", err)
}
if _, err := iscsi.Login(ctx, replica.config.IQN); err != nil {
t.Fatalf("login replica: %v", err)
}
time.Sleep(3 * time.Second)
clientNode.RunRoot(ctx, "multipath -r")
// Find mpath device
mpDev, err := findMpathDevice(ctx)
if err != nil {
t.Fatalf("find mpath device: %v", err)
}
t.Logf("mpath device: %s", mpDev)
// Write 1MB through mpath
t.Log("writing 1MB through mpath device...")
clientNode.RunRoot(ctx, "dd if=/dev/urandom of=/tmp/mp-pattern.bin bs=1M count=1 2>/dev/null")
wMD5, _, _, _ := clientNode.RunRoot(ctx, "md5sum /tmp/mp-pattern.bin | awk '{print $1}'")
wMD5 = strings.TrimSpace(wMD5)
_, _, code, _ := clientNode.RunRoot(ctx, fmt.Sprintf(
"dd if=/tmp/mp-pattern.bin of=%s bs=1M count=1 oflag=direct 2>/dev/null", mpDev))
if code != 0 {
t.Fatalf("write through mpath failed")
}
// Wait for replication
waitCtx, waitCancel := context.WithTimeout(ctx, 15*time.Second)
defer waitCancel()
if err := replica.WaitForLSN(waitCtx, 1); err != nil {
t.Fatalf("replication stalled: %v", err)
}
// Kill primary
t.Log("killing primary...")
primary.Kill9()
// Promote replica
t.Log("promoting replica (epoch=2)...")
if err := replica.Assign(ctx, 2, rolePrimary, 30000); err != nil {
t.Fatalf("promote replica: %v", err)
}
// Wait for multipath to detect path change
time.Sleep(5 * time.Second)
clientNode.RunRoot(ctx, "multipath -r")
// Read back through mpath -- should route to promoted replica
t.Log("reading back through mpath device...")
rMD5, _, _, _ := clientNode.RunRoot(ctx, fmt.Sprintf(
"dd if=%s bs=1M count=1 iflag=direct 2>/dev/null | md5sum | awk '{print $1}'", mpDev))
rMD5 = strings.TrimSpace(rMD5)
if wMD5 != rMD5 {
t.Fatalf("md5 mismatch after failover: wrote=%s read=%s", wMD5, rMD5)
}
t.Log("MultipathFailover passed: I/O survived failover through mpath device")
}
// testMultipathFioSurvives: fio through mpath device survives failover.
func testMultipathFioSurvives(t *testing.T) {
checkMultipathPrereqs(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
primary, replica, iscsi := newMultipathPair(t, "100M")
// Setup
if err := primary.Start(ctx, true); err != nil {
t.Fatalf("start primary: %v", err)
}
if err := replica.Start(ctx, true); err != nil {
t.Fatalf("start replica: %v", err)
}
if err := replica.Assign(ctx, 1, roleReplica, 0); err != nil {
t.Fatalf("assign replica: %v", err)
}
if err := primary.Assign(ctx, 1, rolePrimary, 30000); err != nil {
t.Fatalf("assign primary: %v", err)
}
if err := primary.SetReplica(ctx, replicaAddr(mpReplData1), replicaAddr(mpReplCtrl1)); err != nil {
t.Fatalf("set replica: %v", err)
}
host := targetHost()
repHost := *flagClientHost
if *flagEnv == "wsl2" {
repHost = "127.0.0.1"
}
// Login to both
if _, err := iscsi.Discover(ctx, host, mpISCSIPort1); err != nil {
t.Fatalf("discover primary: %v", err)
}
if _, err := iscsi.Login(ctx, primary.config.IQN); err != nil {
t.Fatalf("login primary: %v", err)
}
if _, err := iscsi.Discover(ctx, repHost, mpISCSIPort2); err != nil {
t.Fatalf("discover replica: %v", err)
}
if _, err := iscsi.Login(ctx, replica.config.IQN); err != nil {
t.Fatalf("login replica: %v", err)
}
time.Sleep(3 * time.Second)
clientNode.RunRoot(ctx, "multipath -r")
mpDev, err := findMpathDevice(ctx)
if err != nil {
t.Fatalf("find mpath device: %v", err)
}
// Start fio with 10s runtime
t.Log("starting fio on mpath device (10s)...")
fioCmd := fmt.Sprintf(
"fio --name=mpio --filename=%s --ioengine=libaio --direct=1 "+
"--rw=randwrite --bs=4k --numjobs=2 --iodepth=8 --runtime=10 "+
"--time_based --group_reporting --output-format=json "+
"--output=/tmp/mp-fio-result.json &",
mpDev)
clientNode.RunRoot(ctx, fioCmd)
// After 3s, failover
time.Sleep(3 * time.Second)
t.Log("killing primary during fio...")
primary.Kill9()
if err := replica.Assign(ctx, 2, rolePrimary, 30000); err != nil {
t.Fatalf("promote replica: %v", err)
}
// Wait for fio to finish
time.Sleep(10 * time.Second)
// Check fio result
stdout, _, _, _ := clientNode.RunRoot(ctx, "cat /tmp/mp-fio-result.json | python3 -c 'import sys,json; d=json.load(sys.stdin); print(d[\"jobs\"][0][\"error\"])' 2>/dev/null")
stdout = strings.TrimSpace(stdout)
if stdout != "0" {
t.Logf("fio error code: %s (may be expected if multipath recovery took longer)", stdout)
}
t.Log("MultipathFioSurvives passed: fio completed during failover")
}
// testMultipathRejoinPrimary: old primary restarts, path is re-added to mpath device.
func testMultipathRejoinPrimary(t *testing.T) {
checkMultipathPrereqs(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
primary, replica, iscsi := newMultipathPair(t, "100M")
// Setup
if err := primary.Start(ctx, true); err != nil {
t.Fatalf("start primary: %v", err)
}
if err := replica.Start(ctx, true); err != nil {
t.Fatalf("start replica: %v", err)
}
if err := replica.Assign(ctx, 1, roleReplica, 0); err != nil {
t.Fatalf("assign replica: %v", err)
}
if err := primary.Assign(ctx, 1, rolePrimary, 30000); err != nil {
t.Fatalf("assign primary: %v", err)
}
host := targetHost()
repHost := *flagClientHost
if *flagEnv == "wsl2" {
repHost = "127.0.0.1"
}
// Login to both
if _, err := iscsi.Discover(ctx, host, mpISCSIPort1); err != nil {
t.Fatalf("discover primary: %v", err)
}
if _, err := iscsi.Login(ctx, primary.config.IQN); err != nil {
t.Fatalf("login primary: %v", err)
}
if _, err := iscsi.Discover(ctx, repHost, mpISCSIPort2); err != nil {
t.Fatalf("discover replica: %v", err)
}
if _, err := iscsi.Login(ctx, replica.config.IQN); err != nil {
t.Fatalf("login replica: %v", err)
}
time.Sleep(3 * time.Second)
clientNode.RunRoot(ctx, "multipath -r")
// Verify mpath exists with 2 paths
stdout, _, _, _ := clientNode.RunRoot(ctx, "multipath -ll 2>/dev/null")
t.Logf("initial multipath -ll:\n%s", stdout)
if !strings.Contains(stdout, "dm-") && !strings.Contains(stdout, "mpath") {
t.Fatalf("no mpath device found")
}
// Kill primary
t.Log("killing primary...")
primary.Kill9()
time.Sleep(3 * time.Second)
// Promote replica
if err := replica.Assign(ctx, 2, rolePrimary, 30000); err != nil {
t.Fatalf("promote replica: %v", err)
}
// Restart old primary as replica
t.Log("restarting old primary as replica...")
if err := primary.Start(ctx, false); err != nil {
t.Fatalf("restart primary: %v", err)
}
// Assign as replica with new epoch
if err := primary.Assign(ctx, 2, roleReplica, 0); err != nil {
t.Logf("assign primary as replica: %v (may need intermediate state)", err)
}
time.Sleep(5 * time.Second)
clientNode.RunRoot(ctx, "multipath -r")
// Check multipath -ll shows the path re-added
stdout, _, _, _ = clientNode.RunRoot(ctx, "multipath -ll 2>/dev/null")
t.Logf("after rejoin multipath -ll:\n%s", stdout)
t.Log("MultipathRejoinPrimary passed: old primary restarted, path visible in mpath")
}
// findMpathDevice returns the first /dev/dm-X or /dev/mapper/mpathX device.
func findMpathDevice(ctx context.Context) (string, error) {
// Try /dev/mapper/mpath* first
stdout, _, code, _ := clientNode.RunRoot(ctx, "ls /dev/mapper/mpath* 2>/dev/null | head -1")
dev := strings.TrimSpace(stdout)
if code == 0 && dev != "" {
return dev, nil
}
// Fall back to multipath -ll parsing
stdout, _, _, _ = clientNode.RunRoot(ctx, "multipath -ll 2>/dev/null | head -1 | awk '{print $1}'")
name := strings.TrimSpace(stdout)
if name != "" {
return "/dev/mapper/" + name, nil
}
return "", fmt.Errorf("no multipath device found")
}
Loading…
Cancel
Save