Browse Source

feat: Phase 4A CP4b-4 Windows iSCSI + instrumentation + QA tests

Windows iSCSI Initiator compatibility:
- Add TargetPortalGroupTag to login response (RFC 7143 S13.9)
- Add REQUEST_SENSE, START_STOP_UNIT, MODE_SELECT(6/10) handlers
- Add PERSISTENT_RESERVE_IN/OUT, MAINTENANCE_IN (REPORT SUPPORTED OPCODES)
- Implement MODE SENSE caching (page 0x08) and control (page 0x0A) pages
- Fix Data-In residual underflow/overflow flags (U/O bits on final PDU)
- Rename ScsiReadCapacity16 -> ScsiServiceActionIn16 for correctness

Instrumentation and tooling:
- Add instrumentedAdapter with periodic PERF stats logging
- Add pprof endpoints on admin HTTP server (/debug/pprof/*)
- Add blockbench CLI tool for standalone block device benchmarking
- Add SCSI CDB debug logging in session dispatch

HA integration fixes:
- Move HA test replica ports to 9011-9014 to avoid conflicts
- Add QA adversarial tests for Phase 4A CP4b-4 (755 lines)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feature/sw-block
Ping Qiu 1 week ago
parent
commit
7940e6b7c9
  1. 189
      weed/storage/blockvol/iscsi/cmd/blockbench/main.go
  2. 11
      weed/storage/blockvol/iscsi/cmd/iscsi-target/admin.go
  3. 133
      weed/storage/blockvol/iscsi/cmd/iscsi-target/main.go
  4. 42
      weed/storage/blockvol/iscsi/dataio.go
  5. 14
      weed/storage/blockvol/iscsi/dataio_test.go
  6. 8
      weed/storage/blockvol/iscsi/login.go
  7. 209
      weed/storage/blockvol/iscsi/scsi.go
  8. 6
      weed/storage/blockvol/iscsi/scsi_test.go
  9. 11
      weed/storage/blockvol/iscsi/session.go
  10. 755
      weed/storage/blockvol/qa_phase4a_cp4b4_test.go
  11. 8
      weed/storage/blockvol/test/ha_test.go

189
weed/storage/blockvol/iscsi/cmd/blockbench/main.go

@ -0,0 +1,189 @@
// blockbench is a simple block device benchmark tool.
// Usage: blockbench [-f path] [-size 256M] [-bs 4K] [-duration 10s] [-pattern seq|rand] [-rw read|write|mixed]
package main
import (
	"crypto/rand"
	"flag"
	"fmt"
	"math/big"
	"os"
	"strconv"
	"strings"
	"sync/atomic"
	"time"
)
// main runs a simple block device benchmark: it pre-allocates a test file,
// spawns workers that issue sequential or random reads/writes of a fixed
// block size for a fixed duration, then reports IOPS, throughput, and
// (approximate) latency.
func main() {
	filePath := flag.String("f", "", "test file path (e.g. F:\\testfile.dat)")
	fileSize := flag.String("size", "256M", "test file size")
	blockSize := flag.String("bs", "4K", "block size")
	duration := flag.Duration("duration", 10*time.Second, "test duration")
	pattern := flag.String("pattern", "seq", "access pattern: seq or rand")
	mode := flag.String("rw", "write", "read, write, or mixed")
	workers := flag.Int("workers", 1, "number of concurrent workers")
	direct := flag.Bool("direct", false, "use O_DIRECT (not supported on all OS)")
	flag.Parse()
	if *filePath == "" {
		fmt.Fprintln(os.Stderr, "error: -f is required")
		flag.Usage()
		os.Exit(1)
	}
	fSize, err := parseSize(*fileSize)
	if err != nil {
		fmt.Fprintf(os.Stderr, "bad -size: %v\n", err)
		os.Exit(1)
	}
	bSize, err := parseSize(*blockSize)
	if err != nil {
		fmt.Fprintf(os.Stderr, "bad -bs: %v\n", err)
		os.Exit(1)
	}
	// Guard against -bs 0 (division by zero below) and -bs larger than
	// -size (rand.Int would panic on a zero-block file).
	if bSize == 0 || fSize/bSize == 0 {
		fmt.Fprintln(os.Stderr, "error: -size must hold at least one block of -bs")
		os.Exit(1)
	}
	_ = *direct // placeholder for future O_DIRECT support
	fmt.Printf("=== BlockBench ===\n")
	fmt.Printf("file=%s size=%s bs=%s duration=%s pattern=%s mode=%s workers=%d\n",
		*filePath, *fileSize, *blockSize, *duration, *pattern, *mode, *workers)
	// Create/open file
	f, err := os.OpenFile(*filePath, os.O_RDWR|os.O_CREATE, 0644)
	if err != nil {
		fmt.Fprintf(os.Stderr, "open: %v\n", err)
		os.Exit(1)
	}
	defer f.Close()
	// Pre-allocate to the full test size so reads never hit EOF.
	if err := f.Truncate(int64(fSize)); err != nil {
		fmt.Fprintf(os.Stderr, "truncate: %v\n", err)
		os.Exit(1)
	}
	totalBlocks := fSize / bSize
	// Prepare a random write buffer (crypto/rand keeps it incompressible).
	writeBuf := make([]byte, bSize)
	if _, err := rand.Read(writeBuf); err != nil {
		fmt.Fprintf(os.Stderr, "rand: %v\n", err)
		os.Exit(1)
	}
	var ops atomic.Int64
	var bytes atomic.Int64
	var maxLatUs atomic.Int64
	isRand := *pattern == "rand"
	doWrite := *mode == "write" || *mode == "mixed"
	doRead := *mode == "read" || *mode == "mixed"
	stop := make(chan struct{})
	fmt.Printf("running %s %s for %s...\n", *pattern, *mode, *duration)
	start := time.Now()
	for w := 0; w < *workers; w++ {
		// Each worker gets its own fd so kernel file offsets never contend.
		wf, err := os.OpenFile(*filePath, os.O_RDWR, 0644)
		if err != nil {
			fmt.Fprintf(os.Stderr, "worker open: %v\n", err)
			os.Exit(1)
		}
		defer wf.Close()
		go func(wf *os.File, workerID int) {
			buf := make([]byte, bSize)
			copy(buf, writeBuf)
			readBuf := make([]byte, bSize)
			var seqBlock uint64
			for {
				select {
				case <-stop:
					return
				default:
				}
				var blockNum uint64
				if isRand {
					n, _ := rand.Int(rand.Reader, big.NewInt(int64(totalBlocks)))
					blockNum = n.Uint64()
				} else {
					blockNum = seqBlock % totalBlocks
					seqBlock++
				}
				off := int64(blockNum * bSize)
				opStart := time.Now()
				// opErr is per-goroutine; the original wrote into main's
				// shared err variable, a data race across workers.
				var opErr error
				if doWrite && (!doRead || seqBlock%2 == 0) {
					_, opErr = wf.WriteAt(buf, off)
				} else if doRead {
					_, opErr = wf.ReadAt(readBuf, off)
				}
				us := time.Since(opStart).Microseconds()
				if opErr == nil {
					ops.Add(1)
					bytes.Add(int64(bSize))
					// CAS loop: raise the max only if we still hold it.
					for {
						old := maxLatUs.Load()
						if us <= old || maxLatUs.CompareAndSwap(old, us) {
							break
						}
					}
				}
			}
		}(wf, w)
	}
	time.Sleep(*duration)
	close(stop)
	elapsed := time.Since(start)
	// Small sleep to let workers finish their in-flight op.
	time.Sleep(10 * time.Millisecond)
	totalOps := ops.Load()
	totalBytes := bytes.Load()
	maxLat := maxLatUs.Load()
	iops := float64(totalOps) / elapsed.Seconds()
	mbps := float64(totalBytes) / (1024 * 1024) / elapsed.Seconds()
	avgUs := int64(0)
	if totalOps > 0 {
		// Rough approximation — a true average needs per-op latency
		// accumulation. (The original computed this twice; the first,
		// integer-truncating assignment was dead code and is removed.)
		avgUs = int64(float64(elapsed.Microseconds()) / float64(totalOps) * float64(*workers))
	}
	fmt.Printf("\n=== Results ===\n")
	fmt.Printf("ops: %d\n", totalOps)
	fmt.Printf("IOPS: %.0f\n", iops)
	fmt.Printf("throughput: %.1f MB/s\n", mbps)
	fmt.Printf("max_lat: %d us (%.1f ms)\n", maxLat, float64(maxLat)/1000)
	fmt.Printf("elapsed: %.1fs\n", elapsed.Seconds())
	if totalOps > 0 {
		fmt.Printf("avg_lat: ~%d us (%.2f ms) [estimated]\n", avgUs, float64(avgUs)/1000)
	}
}
// parseSize parses a human-readable size such as "4K", "256M", "1g", or
// "4096" into a byte count. Suffixes are case-insensitive and an optional
// trailing "B" (as in "4KB") is accepted. Returns an error for empty,
// non-numeric, or overflowing input.
//
// The original used fmt.Sscanf("%d"), which silently accepts trailing
// garbage — "4KB" parsed as 4 bytes. strconv.ParseUint rejects that.
func parseSize(s string) (uint64, error) {
	s = strings.TrimSpace(s)
	if len(s) == 0 {
		return 0, fmt.Errorf("empty size")
	}
	// Accept an optional "B" suffix: "4KB" == "4K".
	if len(s) > 1 && (s[len(s)-1] == 'B' || s[len(s)-1] == 'b') {
		s = s[:len(s)-1]
	}
	mul := uint64(1)
	switch s[len(s)-1] {
	case 'K', 'k':
		mul = 1024
		s = s[:len(s)-1]
	case 'M', 'm':
		mul = 1024 * 1024
		s = s[:len(s)-1]
	case 'G', 'g':
		mul = 1024 * 1024 * 1024
		s = s[:len(s)-1]
	}
	n, err := strconv.ParseUint(s, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("bad size %q: %w", s, err)
	}
	if mul > 1 && n > ^uint64(0)/mul {
		return 0, fmt.Errorf("size %d%s overflows uint64", n, s)
	}
	return n * mul, nil
}

11
weed/storage/blockvol/iscsi/cmd/iscsi-target/admin.go

@ -12,6 +12,7 @@ import (
"log" "log"
"net" "net"
"net/http" "net/http"
_ "net/http/pprof" // registers /debug/pprof/* handlers on DefaultServeMux
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol" "github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
@ -189,13 +190,21 @@ func (a *adminServer) handleRebuild(w http.ResponseWriter, r *http.Request) {
// startAdminServer starts the HTTP admin server in a background goroutine. // startAdminServer starts the HTTP admin server in a background goroutine.
// Returns the listener so tests can determine the actual bound port. // Returns the listener so tests can determine the actual bound port.
// Includes /debug/pprof/* endpoints for profiling.
func startAdminServer(addr string, srv *adminServer) (net.Listener, error) { func startAdminServer(addr string, srv *adminServer) (net.Listener, error) {
ln, err := net.Listen("tcp", addr) ln, err := net.Listen("tcp", addr)
if err != nil { if err != nil {
return nil, fmt.Errorf("admin listen %s: %w", addr, err) return nil, fmt.Errorf("admin listen %s: %w", addr, err)
} }
mux := http.NewServeMux()
mux.Handle("/assign", srv)
mux.Handle("/status", srv)
mux.Handle("/replica", srv)
mux.Handle("/rebuild", srv)
// pprof handlers registered on DefaultServeMux by net/http/pprof import.
mux.Handle("/debug/pprof/", http.DefaultServeMux)
go func() { go func() {
if err := http.Serve(ln, srv); err != nil && !isClosedErr(err) {
if err := http.Serve(ln, mux); err != nil && !isClosedErr(err) {
srv.logger.Printf("admin server error: %v", err) srv.logger.Printf("admin server error: %v", err)
} }
}() }()

133
weed/storage/blockvol/iscsi/cmd/iscsi-target/main.go

@ -13,7 +13,9 @@ import (
"os/signal" "os/signal"
"strconv" "strconv"
"strings" "strings"
"sync/atomic"
"syscall" "syscall"
"time"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol" "github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol/iscsi" "github.com/seaweedfs/seaweedfs/weed/storage/blockvol/iscsi"
@ -98,19 +100,33 @@ func main() {
logger.Printf("admin server: %s", ln.Addr()) logger.Printf("admin server: %s", ln.Addr())
} }
// Create adapter
adapter := &blockVolAdapter{vol: vol}
// Create adapter with latency instrumentation
adapter := &instrumentedAdapter{
inner: &blockVolAdapter{vol: vol},
logger: logger,
}
// Create target server // Create target server
config := iscsi.DefaultTargetConfig() config := iscsi.DefaultTargetConfig()
config.TargetName = *iqn config.TargetName = *iqn
config.TargetAlias = "SeaweedFS BlockVol" config.TargetAlias = "SeaweedFS BlockVol"
if *portal != "" {
// Parse portal group tag from "addr:port,tpgt" format
if idx := strings.LastIndex(*portal, ","); idx >= 0 {
if tpgt, err := strconv.Atoi((*portal)[idx+1:]); err == nil {
config.TargetPortalGroupTag = tpgt
}
}
}
ts := iscsi.NewTargetServer(*addr, config, logger) ts := iscsi.NewTargetServer(*addr, config, logger)
if *portal != "" { if *portal != "" {
ts.SetPortalAddr(*portal) ts.SetPortalAddr(*portal)
} }
ts.AddVolume(*iqn, adapter) ts.AddVolume(*iqn, adapter)
// Start periodic performance stats logging (every 5 seconds).
adapter.StartStatsLogger(5 * time.Second)
// Graceful shutdown on signal // Graceful shutdown on signal
sigCh := make(chan os.Signal, 1) sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
@ -148,6 +164,119 @@ func (a *blockVolAdapter) BlockSize() uint32 { return a.vol.Info().BlockSize }
func (a *blockVolAdapter) VolumeSize() uint64 { return a.vol.Info().VolumeSize } func (a *blockVolAdapter) VolumeSize() uint64 { return a.vol.Info().VolumeSize }
func (a *blockVolAdapter) IsHealthy() bool { return a.vol.Info().Healthy } func (a *blockVolAdapter) IsHealthy() bool { return a.vol.Info().Healthy }
// instrumentedAdapter wraps a BlockDevice and logs latency stats periodically.
//
// All counters are reset-on-read by StartStatsLogger (via Swap(0)), so at
// any moment they hold per-interval, not lifetime, totals. Updates are
// lock-free atomics so the I/O data path pays no mutex cost.
type instrumentedAdapter struct {
	inner  iscsi.BlockDevice // wrapped device that performs the real I/O
	logger *log.Logger       // destination for the periodic PERF lines
	// Counters (atomic, lock-free)
	writeOps   atomic.Int64
	readOps    atomic.Int64
	syncOps    atomic.Int64
	writeTotUs atomic.Int64 // total microseconds
	readTotUs  atomic.Int64
	syncTotUs  atomic.Int64
	writeMaxUs atomic.Int64 // max single-op latency in the current interval
	readMaxUs  atomic.Int64
	syncMaxUs  atomic.Int64
	writeBytes atomic.Int64
	readBytes  atomic.Int64
}
// ReadAt reads length bytes at lba from the wrapped device, recording op
// count, byte count, total latency, and per-interval max latency. Counters
// are updated whether or not the read succeeded.
func (a *instrumentedAdapter) ReadAt(lba uint64, length uint32) ([]byte, error) {
	began := time.Now()
	data, err := a.inner.ReadAt(lba, length)
	elapsedUs := time.Since(began).Microseconds()
	a.readBytes.Add(int64(length))
	a.readTotUs.Add(elapsedUs)
	a.readOps.Add(1)
	atomicMax(&a.readMaxUs, elapsedUs)
	return data, err
}
// WriteAt writes data at lba through the wrapped device, recording op
// count, byte count, total latency, and per-interval max latency. Counters
// are updated whether or not the write succeeded.
func (a *instrumentedAdapter) WriteAt(lba uint64, data []byte) error {
	began := time.Now()
	err := a.inner.WriteAt(lba, data)
	elapsedUs := time.Since(began).Microseconds()
	a.writeBytes.Add(int64(len(data)))
	a.writeTotUs.Add(elapsedUs)
	a.writeOps.Add(1)
	atomicMax(&a.writeMaxUs, elapsedUs)
	return err
}
// Trim forwards an UNMAP/TRIM to the wrapped device. Trims are not
// instrumented — no counters are kept for them.
func (a *instrumentedAdapter) Trim(lba uint64, length uint32) error {
	return a.inner.Trim(lba, length)
}
// SyncCache flushes the wrapped device's cache, recording op count, total
// latency, and per-interval max latency. Counters are updated whether or
// not the flush succeeded.
func (a *instrumentedAdapter) SyncCache() error {
	began := time.Now()
	err := a.inner.SyncCache()
	elapsedUs := time.Since(began).Microseconds()
	a.syncTotUs.Add(elapsedUs)
	a.syncOps.Add(1)
	atomicMax(&a.syncMaxUs, elapsedUs)
	return err
}
// BlockSize, VolumeSize, and IsHealthy are uninstrumented passthroughs to
// the wrapped device — they are metadata queries, not data-path I/O.
func (a *instrumentedAdapter) BlockSize() uint32 { return a.inner.BlockSize() }
func (a *instrumentedAdapter) VolumeSize() uint64 { return a.inner.VolumeSize() }
func (a *instrumentedAdapter) IsHealthy() bool { return a.inner.IsHealthy() }
// StartStatsLogger runs a goroutine that logs performance stats every interval.
//
// Each tick atomically swaps every counter back to zero, so each PERF line
// reports that interval only. Because the Swap calls are separate, a count
// and its matching total can be momentarily skewed relative to each other;
// that is acceptable for coarse stats. Idle intervals (no ops) are skipped.
//
// NOTE(review): the goroutine has no stop channel and runs for the remaining
// lifetime of the process — fine for this daemon, but it cannot be shut
// down, and calling StartStatsLogger twice would start two loggers.
func (a *instrumentedAdapter) StartStatsLogger(interval time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for range ticker.C {
			// Read-and-reset all counters for this interval.
			wr := a.writeOps.Swap(0)
			rd := a.readOps.Swap(0)
			sy := a.syncOps.Swap(0)
			wrUs := a.writeTotUs.Swap(0)
			rdUs := a.readTotUs.Swap(0)
			syUs := a.syncTotUs.Swap(0)
			wrMax := a.writeMaxUs.Swap(0)
			rdMax := a.readMaxUs.Swap(0)
			syMax := a.syncMaxUs.Swap(0)
			wrBytes := a.writeBytes.Swap(0)
			rdBytes := a.readBytes.Swap(0)
			if wr+rd+sy == 0 {
				continue // quiet when idle
			}
			// Per-op averages; guard each divisor against zero ops.
			wrAvg, rdAvg, syAvg := int64(0), int64(0), int64(0)
			if wr > 0 {
				wrAvg = wrUs / wr
			}
			if rd > 0 {
				rdAvg = rdUs / rd
			}
			if sy > 0 {
				syAvg = syUs / sy
			}
			a.logger.Printf("PERF[%ds] wr=%d(%.1fMB avg=%dus max=%dus) rd=%d(%.1fMB avg=%dus max=%dus) sync=%d(avg=%dus max=%dus)",
				int(interval.Seconds()),
				wr, float64(wrBytes)/(1024*1024), wrAvg, wrMax,
				rd, float64(rdBytes)/(1024*1024), rdAvg, rdMax,
				sy, syAvg, syMax,
			)
		}
	}()
}
func atomicMax(addr *atomic.Int64, val int64) {
for {
old := addr.Load()
if val <= old {
return
}
if addr.CompareAndSwap(old, val) {
return
}
}
}
func parseSize(s string) (uint64, error) { func parseSize(s string) (uint64, error) {
s = strings.TrimSpace(s) s = strings.TrimSpace(s)
if len(s) == 0 { if len(s) == 0 {

42
weed/storage/blockvol/iscsi/dataio.go

@ -27,16 +27,21 @@ func NewDataInWriter(maxSegLen uint32) *DataInWriter {
} }
// WriteDataIn splits data into Data-In PDUs and writes them to w. // WriteDataIn splits data into Data-In PDUs and writes them to w.
// itt is the initiator task tag, ttt is the target transfer tag.
// itt is the initiator task tag, edtl is expected data transfer length.
// statSN is the current StatSN (incremented when S-bit is set). // statSN is the current StatSN (incremented when S-bit is set).
// Returns the number of PDUs written. // Returns the number of PDUs written.
func (d *DataInWriter) WriteDataIn(w io.Writer, data []byte, itt uint32, expCmdSN, maxCmdSN uint32, statSN *uint32) (int, error) {
func (d *DataInWriter) WriteDataIn(w io.Writer, data []byte, itt, edtl uint32, expCmdSN, maxCmdSN uint32, statSN *uint32) (int, error) {
totalLen := uint32(len(data)) totalLen := uint32(len(data))
if totalLen == 0 { if totalLen == 0 {
// Zero-length read -- send single Data-In with S-bit, no data // Zero-length read -- send single Data-In with S-bit, no data
pdu := &PDU{} pdu := &PDU{}
pdu.SetOpcode(OpSCSIDataIn) pdu.SetOpcode(OpSCSIDataIn)
pdu.SetOpSpecific1(FlagF | FlagS) // Final + Status
flags := uint8(FlagF | FlagS)
if edtl > 0 {
flags |= FlagU
pdu.SetResidualCount(edtl)
}
pdu.SetOpSpecific1(flags)
pdu.SetInitiatorTaskTag(itt) pdu.SetInitiatorTaskTag(itt)
pdu.SetTargetTransferTag(0xFFFFFFFF) pdu.SetTargetTransferTag(0xFFFFFFFF)
pdu.SetStatSN(*statSN) pdu.SetStatSN(*statSN)
@ -74,7 +79,15 @@ func (d *DataInWriter) WriteDataIn(w io.Writer, data []byte, itt uint32, expCmdS
pdu.DataSegment = data[offset : offset+segLen] pdu.DataSegment = data[offset : offset+segLen]
if isFinal { if isFinal {
pdu.SetOpSpecific1(FlagF | FlagS) // Final + Status
flags := uint8(FlagF | FlagS)
if totalLen < edtl {
flags |= FlagU
pdu.SetResidualCount(edtl - totalLen)
} else if totalLen > edtl {
flags |= FlagO
pdu.SetResidualCount(totalLen - edtl)
}
pdu.SetOpSpecific1(flags)
pdu.SetStatSN(*statSN) pdu.SetStatSN(*statSN)
*statSN++ *statSN++
pdu.SetSCSIStatus(SCSIStatusGood) pdu.SetSCSIStatus(SCSIStatusGood)
@ -219,14 +232,20 @@ func BuildSCSIResponse(result SCSIResult, itt uint32, expCmdSN, maxCmdSN uint32)
} }
// BuildDataInPDUs splits data into Data-In PDUs and returns them. // BuildDataInPDUs splits data into Data-In PDUs and returns them.
// edtl is the Expected Data Transfer Length from the SCSI command PDU.
// StatSN is NOT set on the final PDU -- the txLoop assigns it. // StatSN is NOT set on the final PDU -- the txLoop assigns it.
// Intermediate PDUs (without S-bit) never carry StatSN. // Intermediate PDUs (without S-bit) never carry StatSN.
func (d *DataInWriter) BuildDataInPDUs(data []byte, itt uint32, expCmdSN, maxCmdSN uint32) []*PDU {
func (d *DataInWriter) BuildDataInPDUs(data []byte, itt, edtl uint32, expCmdSN, maxCmdSN uint32) []*PDU {
totalLen := uint32(len(data)) totalLen := uint32(len(data))
if totalLen == 0 { if totalLen == 0 {
pdu := &PDU{} pdu := &PDU{}
pdu.SetOpcode(OpSCSIDataIn) pdu.SetOpcode(OpSCSIDataIn)
pdu.SetOpSpecific1(FlagF | FlagS)
flags := uint8(FlagF | FlagS)
if edtl > 0 {
flags |= FlagU // underflow: expected data but got none
pdu.SetResidualCount(edtl)
}
pdu.SetOpSpecific1(flags)
pdu.SetInitiatorTaskTag(itt) pdu.SetInitiatorTaskTag(itt)
pdu.SetTargetTransferTag(0xFFFFFFFF) pdu.SetTargetTransferTag(0xFFFFFFFF)
pdu.SetExpCmdSN(expCmdSN) pdu.SetExpCmdSN(expCmdSN)
@ -262,7 +281,16 @@ func (d *DataInWriter) BuildDataInPDUs(data []byte, itt uint32, expCmdSN, maxCmd
pdu.DataSegment = seg pdu.DataSegment = seg
if isFinal { if isFinal {
pdu.SetOpSpecific1(FlagF | FlagS)
flags := uint8(FlagF | FlagS)
// RFC 7143: set underflow/overflow residual on final Data-In with S-bit.
if totalLen < edtl {
flags |= FlagU
pdu.SetResidualCount(edtl - totalLen)
} else if totalLen > edtl {
flags |= FlagO
pdu.SetResidualCount(totalLen - edtl)
}
pdu.SetOpSpecific1(flags)
pdu.SetSCSIStatus(SCSIStatusGood) pdu.SetSCSIStatus(SCSIStatusGood)
} else { } else {
pdu.SetOpSpecific1(0) pdu.SetOpSpecific1(0)

14
weed/storage/blockvol/iscsi/dataio_test.go

@ -40,7 +40,7 @@ func testDataInSinglePDU(t *testing.T) {
data := bytes.Repeat([]byte{0xAA}, 4096) data := bytes.Repeat([]byte{0xAA}, 4096)
statSN := uint32(1) statSN := uint32(1)
n, err := dw.WriteDataIn(w, data, 0x100, 1, 10, &statSN)
n, err := dw.WriteDataIn(w, data, 0x100, uint32(len(data)), 1, 10, &statSN)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -75,7 +75,7 @@ func testDataInMultiPDU(t *testing.T) {
data := bytes.Repeat([]byte{0xBB}, 3000) data := bytes.Repeat([]byte{0xBB}, 3000)
statSN := uint32(1) statSN := uint32(1)
n, err := dw.WriteDataIn(w, data, 0x200, 1, 10, &statSN)
n, err := dw.WriteDataIn(w, data, 0x200, uint32(len(data)), 1, 10, &statSN)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -121,7 +121,7 @@ func testDataInExactBoundary(t *testing.T) {
data := bytes.Repeat([]byte{0xCC}, 2048) // exact 2 PDUs data := bytes.Repeat([]byte{0xCC}, 2048) // exact 2 PDUs
statSN := uint32(1) statSN := uint32(1)
n, err := dw.WriteDataIn(w, data, 0x300, 1, 10, &statSN)
n, err := dw.WriteDataIn(w, data, 0x300, uint32(len(data)), 1, 10, &statSN)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -135,7 +135,7 @@ func testDataInZeroLength(t *testing.T) {
dw := NewDataInWriter(8192) dw := NewDataInWriter(8192)
statSN := uint32(5) statSN := uint32(5)
n, err := dw.WriteDataIn(w, nil, 0x400, 1, 10, &statSN)
n, err := dw.WriteDataIn(w, nil, 0x400, 0, 1, 10, &statSN)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -153,7 +153,7 @@ func testDataInDataSNOrdering(t *testing.T) {
data := bytes.Repeat([]byte{0xDD}, 2048) // 4 PDUs data := bytes.Repeat([]byte{0xDD}, 2048) // 4 PDUs
statSN := uint32(1) statSN := uint32(1)
dw.WriteDataIn(w, data, 0x500, 1, 10, &statSN)
dw.WriteDataIn(w, data, 0x500, uint32(len(data)), 1, 10, &statSN)
for i := 0; i < 4; i++ { for i := 0; i < 4; i++ {
pdu, _ := ReadPDU(w) pdu, _ := ReadPDU(w)
@ -173,7 +173,7 @@ func testDataInFbitSbit(t *testing.T) {
data := bytes.Repeat([]byte{0xEE}, 2500) data := bytes.Repeat([]byte{0xEE}, 2500)
statSN := uint32(1) statSN := uint32(1)
dw.WriteDataIn(w, data, 0x600, 1, 10, &statSN)
dw.WriteDataIn(w, data, 0x600, uint32(len(data)), 1, 10, &statSN)
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
pdu, _ := ReadPDU(w) pdu, _ := ReadPDU(w)
@ -399,7 +399,7 @@ func testDataInStatSNIncrement(t *testing.T) {
data := bytes.Repeat([]byte{0x00}, 3072) // 3 PDUs data := bytes.Repeat([]byte{0x00}, 3072) // 3 PDUs
statSN := uint32(100) statSN := uint32(100)
dw.WriteDataIn(w, data, 0x700, 1, 10, &statSN)
dw.WriteDataIn(w, data, 0x700, uint32(len(data)), 1, 10, &statSN)
// Only the final PDU has S-bit, so StatSN increments once // Only the final PDU has S-bit, so StatSN increments once
if statSN != 101 { if statSN != 101 {
t.Fatalf("StatSN should be 101, got %d", statSN) t.Fatalf("StatSN should be 101, got %d", statSN)

8
weed/storage/blockvol/iscsi/login.go

@ -58,6 +58,7 @@ const (
type TargetConfig struct { type TargetConfig struct {
TargetName string TargetName string
TargetAlias string TargetAlias string
TargetPortalGroupTag int // TPGT (1-65535); 0 = use default 1
MaxRecvDataSegmentLength int MaxRecvDataSegmentLength int
MaxBurstLength int MaxBurstLength int
FirstBurstLength int FirstBurstLength int
@ -272,6 +273,13 @@ func (ln *LoginNegotiator) HandleLoginPDU(req *PDU, resolver TargetResolver) *PD
} }
resp.SetTSIH(ln.tsih) resp.SetTSIH(ln.tsih)
// RFC 7143 Section 13.9: All Login Responses MUST carry TargetPortalGroupTag.
tpgt := ln.config.TargetPortalGroupTag
if tpgt <= 0 {
tpgt = 1
}
respParams.Set("TargetPortalGroupTag", strconv.Itoa(tpgt))
// Encode response params // Encode response params
if respParams.Len() > 0 { if respParams.Len() > 0 {
resp.DataSegment = respParams.Encode() resp.DataSegment = respParams.Encode()

209
weed/storage/blockvol/iscsi/scsi.go

@ -6,21 +6,28 @@ import (
// SCSI opcode constants (SPC-5 / SBC-4) // SCSI opcode constants (SPC-5 / SBC-4)
const ( const (
ScsiTestUnitReady uint8 = 0x00
ScsiInquiry uint8 = 0x12
ScsiModeSense6 uint8 = 0x1a
ScsiModeSense10 uint8 = 0x5a
ScsiReadCapacity10 uint8 = 0x25
ScsiRead10 uint8 = 0x28
ScsiWrite10 uint8 = 0x2a
ScsiSyncCache10 uint8 = 0x35
ScsiUnmap uint8 = 0x42
ScsiReportLuns uint8 = 0xa0
ScsiRead16 uint8 = 0x88
ScsiWrite16 uint8 = 0x8a
ScsiReadCapacity16 uint8 = 0x9e // SERVICE ACTION IN (16), SA=0x10
ScsiSyncCache16 uint8 = 0x91
ScsiWriteSame16 uint8 = 0x93
ScsiTestUnitReady uint8 = 0x00
ScsiRequestSense uint8 = 0x03
ScsiInquiry uint8 = 0x12
ScsiModeSelect6 uint8 = 0x15
ScsiModeSense6 uint8 = 0x1a
ScsiStartStopUnit uint8 = 0x1b
ScsiReadCapacity10 uint8 = 0x25
ScsiRead10 uint8 = 0x28
ScsiWrite10 uint8 = 0x2a
ScsiSyncCache10 uint8 = 0x35
ScsiUnmap uint8 = 0x42
ScsiModeSelect10 uint8 = 0x55
ScsiModeSense10 uint8 = 0x5a
ScsiPersistReserveIn uint8 = 0x5e
ScsiPersistReserveOut uint8 = 0x5f
ScsiRead16 uint8 = 0x88
ScsiWrite16 uint8 = 0x8a
ScsiSyncCache16 uint8 = 0x91
ScsiWriteSame16 uint8 = 0x93
ScsiServiceActionIn16 uint8 = 0x9e // READ CAPACITY(16), etc.
ScsiReportLuns uint8 = 0xa0
ScsiMaintenanceIn uint8 = 0xa3 // REPORT SUPPORTED OPCODES, etc.
) )
// Service action for READ CAPACITY (16) // Service action for READ CAPACITY (16)
@ -93,38 +100,52 @@ func (h *SCSIHandler) HandleCommand(cdb [16]byte, dataOut []byte) SCSIResult {
switch opcode { switch opcode {
case ScsiTestUnitReady: case ScsiTestUnitReady:
return h.testUnitReady() return h.testUnitReady()
case ScsiRequestSense:
return h.requestSense(cdb)
case ScsiInquiry: case ScsiInquiry:
return h.inquiry(cdb) return h.inquiry(cdb)
case ScsiModeSelect6:
return h.modeSelect6(cdb, dataOut)
case ScsiModeSense6: case ScsiModeSense6:
return h.modeSense6(cdb) return h.modeSense6(cdb)
case ScsiModeSense10:
return h.modeSense10(cdb)
case ScsiStartStopUnit:
return h.startStopUnit(cdb)
case ScsiReadCapacity10: case ScsiReadCapacity10:
return h.readCapacity10() return h.readCapacity10()
case ScsiReadCapacity16:
sa := cdb[1] & 0x1f
if sa == ScsiSAReadCapacity16 {
return h.readCapacity16(cdb)
}
return illegalRequest(ASCInvalidOpcode, ASCQLuk)
case ScsiReportLuns:
return h.reportLuns(cdb)
case ScsiRead10: case ScsiRead10:
return h.read10(cdb) return h.read10(cdb)
case ScsiRead16:
return h.read16(cdb)
case ScsiWrite10: case ScsiWrite10:
return h.write10(cdb, dataOut) return h.write10(cdb, dataOut)
case ScsiWrite16:
return h.write16(cdb, dataOut)
case ScsiSyncCache10: case ScsiSyncCache10:
return h.syncCache() return h.syncCache()
case ScsiSyncCache16:
return h.syncCache()
case ScsiUnmap: case ScsiUnmap:
return h.unmap(cdb, dataOut) return h.unmap(cdb, dataOut)
case ScsiModeSelect10:
return h.modeSelect10(cdb, dataOut)
case ScsiModeSense10:
return h.modeSense10(cdb)
case ScsiPersistReserveIn:
return h.persistReserveIn(cdb)
case ScsiPersistReserveOut:
return h.persistReserveOut(cdb, dataOut)
case ScsiRead16:
return h.read16(cdb)
case ScsiWrite16:
return h.write16(cdb, dataOut)
case ScsiSyncCache16:
return h.syncCache()
case ScsiWriteSame16: case ScsiWriteSame16:
return h.writeSame16(cdb, dataOut) return h.writeSame16(cdb, dataOut)
case ScsiServiceActionIn16:
sa := cdb[1] & 0x1f
if sa == ScsiSAReadCapacity16 {
return h.readCapacity16(cdb)
}
return illegalRequest(ASCInvalidOpcode, ASCQLuk)
case ScsiReportLuns:
return h.reportLuns(cdb)
case ScsiMaintenanceIn:
return h.maintenanceIn(cdb)
default: default:
return illegalRequest(ASCInvalidOpcode, ASCQLuk) return illegalRequest(ASCInvalidOpcode, ASCQLuk)
} }
@ -144,6 +165,64 @@ func (h *SCSIHandler) testUnitReady() SCSIResult {
return SCSIResult{Status: SCSIStatusGood} return SCSIResult{Status: SCSIStatusGood}
} }
// requestSense handles REQUEST SENSE (0x03). No deferred errors are
// tracked, so it always reports fixed-format NO SENSE data, truncated to
// the CDB allocation length (byte 4). An allocation length of zero is
// treated as the 18-byte fixed-format default.
func (h *SCSIHandler) requestSense(cdb [16]byte) SCSIResult {
	limit := int(cdb[4])
	if limit == 0 {
		limit = 18
	}
	sense := BuildSenseData(SenseNoSense, 0x00, 0x00)
	if len(sense) > limit {
		sense = sense[:limit]
	}
	return SCSIResult{Status: SCSIStatusGood, Data: sense}
}
// startStopUnit handles START STOP UNIT (0x1B).
// Windows sends this during disk init. We accept and ignore: a
// network-backed volume has no spin-up/spin-down state to manage, so
// reporting GOOD unconditionally is safe.
func (h *SCSIHandler) startStopUnit(cdb [16]byte) SCSIResult {
	return SCSIResult{Status: SCSIStatusGood}
}
// modeSelect6 handles MODE SELECT(6) (0x15).
// Windows sends this to set caching mode pages. We accept and ignore the
// data — mode changes are not persisted. NOTE(review): assumes initiators
// do not read back and depend on the values they set; confirm if a client
// ever re-reads mode pages after a MODE SELECT.
func (h *SCSIHandler) modeSelect6(cdb [16]byte, dataOut []byte) SCSIResult {
	return SCSIResult{Status: SCSIStatusGood}
}
// modeSelect10 handles MODE SELECT(10) (0x55).
// Same as modeSelect6 but with a 10-byte CDB: the parameter data is
// accepted and discarded, and GOOD is returned unconditionally.
func (h *SCSIHandler) modeSelect10(cdb [16]byte, dataOut []byte) SCSIResult {
	return SCSIResult{Status: SCSIStatusGood}
}
// persistReserveIn handles PERSISTENT RESERVE IN (0x5E).
// Windows Cluster and MPIO use this. Reservations are not supported, so
// we answer CHECK CONDITION / ILLEGAL REQUEST with ASC "invalid operation
// code" — the standard way to report an unimplemented command, which
// initiators treat as "not supported" rather than "broken device".
// (Note: this is the same generic ASC used for any unknown opcode, not a
// reservation-specific code.)
func (h *SCSIHandler) persistReserveIn(cdb [16]byte) SCSIResult {
	return illegalRequest(ASCInvalidOpcode, ASCQLuk)
}
// persistReserveOut handles PERSISTENT RESERVE OUT (0x5F).
// Like persistReserveIn: reservations are unsupported, so the service
// action and parameter data are ignored and ILLEGAL REQUEST is returned.
func (h *SCSIHandler) persistReserveOut(cdb [16]byte, dataOut []byte) SCSIResult {
	return illegalRequest(ASCInvalidOpcode, ASCQLuk)
}
// maintenanceIn handles MAINTENANCE IN (0xA3).
// Service action 0x0C = REPORT SUPPORTED OPERATION CODES, which Windows
// sends to discover supported commands. No MAINTENANCE IN service action
// is implemented, so every request — including 0x0C — gets ILLEGAL
// REQUEST / invalid opcode, which tells the initiator to probe commands
// individually.
//
// The original branched on the service action but returned the identical
// result on both paths; the redundant branch is collapsed here (behavior
// unchanged).
func (h *SCSIHandler) maintenanceIn(cdb [16]byte) SCSIResult {
	return illegalRequest(ASCInvalidOpcode, ASCQLuk)
}
func (h *SCSIHandler) inquiry(cdb [16]byte) SCSIResult { func (h *SCSIHandler) inquiry(cdb [16]byte) SCSIResult {
evpd := cdb[1] & 0x01 evpd := cdb[1] & 0x01
pageCode := cdb[2] pageCode := cdb[2]
@ -353,17 +432,22 @@ func (h *SCSIHandler) readCapacity16(cdb [16]byte) SCSIResult {
} }
func (h *SCSIHandler) modeSense6(cdb [16]byte) SCSIResult { func (h *SCSIHandler) modeSense6(cdb [16]byte) SCSIResult {
// Minimal MODE SENSE(6) response -- no mode pages
allocLen := cdb[4] allocLen := cdb[4]
if allocLen == 0 { if allocLen == 0 {
allocLen = 4 allocLen = 4
} }
pageCode := cdb[2] & 0x3f
data := make([]byte, 4)
data[0] = 3 // Mode data length (3 bytes follow)
data[1] = 0x00 // Medium type: default
data[2] = 0x00 // Device-specific parameter (no write protect)
data[3] = 0x00 // Block descriptor length = 0
// Build mode page data based on requested page.
pages := h.buildModePages(pageCode)
// MODE SENSE(6) header = 4 bytes + pages
data := make([]byte, 4+len(pages))
data[0] = byte(3 + len(pages)) // Mode data length (everything after byte 0)
data[1] = 0x00 // Medium type: default
data[2] = 0x00 // Device-specific parameter (no write protect)
data[3] = 0x00 // Block descriptor length = 0
copy(data[4:], pages)
if int(allocLen) < len(data) { if int(allocLen) < len(data) {
data = data[:allocLen] data = data[:allocLen]
@ -372,21 +456,24 @@ func (h *SCSIHandler) modeSense6(cdb [16]byte) SCSIResult {
} }
func (h *SCSIHandler) modeSense10(cdb [16]byte) SCSIResult { func (h *SCSIHandler) modeSense10(cdb [16]byte) SCSIResult {
// MODE SENSE(10) -- 8-byte header, no mode pages
allocLen := binary.BigEndian.Uint16(cdb[7:9]) allocLen := binary.BigEndian.Uint16(cdb[7:9])
if allocLen == 0 { if allocLen == 0 {
allocLen = 8 allocLen = 8
} }
pageCode := cdb[2] & 0x3f
data := make([]byte, 8)
// Mode data length = 6 (8-byte header minus the 2-byte length field)
binary.BigEndian.PutUint16(data[0:2], 6)
// Build mode page data based on requested page.
pages := h.buildModePages(pageCode)
// MODE SENSE(10) header = 8 bytes + pages
data := make([]byte, 8+len(pages))
binary.BigEndian.PutUint16(data[0:2], uint16(6+len(pages))) // Mode data length
data[2] = 0x00 // Medium type: default data[2] = 0x00 // Medium type: default
data[3] = 0x00 // Device-specific parameter (no write protect) data[3] = 0x00 // Device-specific parameter (no write protect)
data[4] = 0x00 // Reserved (LONGLBA=0) data[4] = 0x00 // Reserved (LONGLBA=0)
data[5] = 0x00 // Reserved data[5] = 0x00 // Reserved
// Block descriptor length = 0 (bytes 6-7)
binary.BigEndian.PutUint16(data[6:8], 0)
binary.BigEndian.PutUint16(data[6:8], 0) // Block descriptor length = 0
copy(data[8:], pages)
if int(allocLen) < len(data) { if int(allocLen) < len(data) {
data = data[:allocLen] data = data[:allocLen]
@ -394,6 +481,42 @@ func (h *SCSIHandler) modeSense10(cdb [16]byte) SCSIResult {
return SCSIResult{Status: SCSIStatusGood, Data: data} return SCSIResult{Status: SCSIStatusGood, Data: data}
} }
// buildModePages returns the mode page data for the requested page code.
// Supported pages: 0x08 (caching) and 0x0A (control); 0x3F means "return
// all pages". Unknown page codes yield nil (no page data).
func (h *SCSIHandler) buildModePages(pageCode uint8) []byte {
	const (
		pageCaching = 0x08
		pageControl = 0x0a
		pageAll     = 0x3f
	)
	switch pageCode {
	case pageCaching:
		return h.modePage08()
	case pageControl:
		return h.modePage0A()
	case pageAll:
		combined := append([]byte{}, h.modePage08()...)
		return append(combined, h.modePage0A()...)
	}
	return nil
}
// modePage08 returns the Caching mode page (SBC-4, Section 7.5.5).
// Byte 2 bit 2 (0x04) sets WCE=1, advertising a volatile write cache so
// initiators issue SYNCHRONIZE CACHE when they need durability. RCD
// (bit 0) is 0, meaning the read cache is NOT disabled — the original
// comment's "Read Cache Disable" wording was misleading. All other
// fields are left zero (defaults).
func (h *SCSIHandler) modePage08() []byte {
	page := make([]byte, 20)
	page[0] = 0x08 // Page code
	page[1] = 18 // Page length (20 - 2)
	page[2] = 0x04 // WCE=1 (write cache enabled); RCD=0 (read cache not disabled)
	return page
}
// modePage0A returns the Control mode page (SPC-5, Section 8.4.8).
// All fields after the header are zero — no special queueing, timeout,
// or task-management behavior is advertised.
func (h *SCSIHandler) modePage0A() []byte {
	page := make([]byte, 12)
	page[0] = 0x0a // Page code
	page[1] = 10 // Page length (12 - 2)
	return page
}
func (h *SCSIHandler) reportLuns(cdb [16]byte) SCSIResult { func (h *SCSIHandler) reportLuns(cdb [16]byte) SCSIResult {
allocLen := binary.BigEndian.Uint32(cdb[6:10]) allocLen := binary.BigEndian.Uint32(cdb[6:10])
if allocLen < 16 { if allocLen < 16 {

6
weed/storage/blockvol/iscsi/scsi_test.go

@ -311,7 +311,7 @@ func testReadCapacity16(t *testing.T) {
dev := newMockDevice(3 * 1024 * 1024 * 1024 * 1024) // 3 TB dev := newMockDevice(3 * 1024 * 1024 * 1024 * 1024) // 3 TB
h := NewSCSIHandler(dev) h := NewSCSIHandler(dev)
var cdb [16]byte var cdb [16]byte
cdb[0] = ScsiReadCapacity16
cdb[0] = ScsiServiceActionIn16
cdb[1] = ScsiSAReadCapacity16 cdb[1] = ScsiSAReadCapacity16
binary.BigEndian.PutUint32(cdb[10:14], 32) binary.BigEndian.PutUint32(cdb[10:14], 32)
r := h.HandleCommand(cdb, nil) r := h.HandleCommand(cdb, nil)
@ -329,7 +329,7 @@ func testReadCapacity16LBPME(t *testing.T) {
dev := newMockDevice(100 * 4096) dev := newMockDevice(100 * 4096)
h := NewSCSIHandler(dev) h := NewSCSIHandler(dev)
var cdb [16]byte var cdb [16]byte
cdb[0] = ScsiReadCapacity16
cdb[0] = ScsiServiceActionIn16
cdb[1] = ScsiSAReadCapacity16 cdb[1] = ScsiSAReadCapacity16
binary.BigEndian.PutUint32(cdb[10:14], 32) binary.BigEndian.PutUint32(cdb[10:14], 32)
r := h.HandleCommand(cdb, nil) r := h.HandleCommand(cdb, nil)
@ -925,7 +925,7 @@ func testReadCapacity16InvalidSA(t *testing.T) {
dev := newMockDevice(100 * 4096) dev := newMockDevice(100 * 4096)
h := NewSCSIHandler(dev) h := NewSCSIHandler(dev)
var cdb [16]byte var cdb [16]byte
cdb[0] = ScsiReadCapacity16
cdb[0] = ScsiServiceActionIn16
cdb[1] = 0x05 // wrong service action cdb[1] = 0x05 // wrong service action
r := h.HandleCommand(cdb, nil) r := h.HandleCommand(cdb, nil)
if r.Status != SCSIStatusCheckCond { if r.Status != SCSIStatusCheckCond {

11
weed/storage/blockvol/iscsi/session.go

@ -269,6 +269,10 @@ func (s *Session) close() {
func (s *Session) dispatch(pdu *PDU) error { func (s *Session) dispatch(pdu *PDU) error {
op := pdu.Opcode() op := pdu.Opcode()
if op != OpSCSICmd && op != OpSCSIDataOut {
s.logger.Printf("dispatch: opcode=%s(0x%02x)", OpcodeName(op), op)
}
switch op { switch op {
case OpLoginReq: case OpLoginReq:
return s.handleLogin(pdu) return s.handleLogin(pdu)
@ -337,6 +341,8 @@ func (s *Session) handleText(pdu *PDU) error {
targets = lister.ListTargets() targets = lister.ListTargets()
} }
s.logger.Printf("text request: params=%q targets=%d", string(pdu.DataSegment), len(targets))
resp := HandleTextRequest(pdu, targets) resp := HandleTextRequest(pdu, targets)
// ExpCmdSN/MaxCmdSN are set by txLoop via pduNeedsStatSN. // ExpCmdSN/MaxCmdSN are set by txLoop via pduNeedsStatSN.
s.enqueue(resp) s.enqueue(resp)
@ -356,6 +362,9 @@ func (s *Session) handleSCSICmd(pdu *PDU) error {
itt := pdu.InitiatorTaskTag() itt := pdu.InitiatorTaskTag()
flags := pdu.OpSpecific1() flags := pdu.OpSpecific1()
s.logger.Printf("SCSI CDB: opcode=0x%02x cdb=%x itt=0x%08x flags=0x%02x edtl=%d",
cdb[0], cdb[:], itt, flags, pdu.ExpectedDataTransferLength())
// CmdSN validation for non-immediate commands // CmdSN validation for non-immediate commands
if !pdu.Immediate() { if !pdu.Immediate() {
cmdSN := pdu.CmdSN() cmdSN := pdu.CmdSN()
@ -400,7 +409,7 @@ func (s *Session) handleSCSICmd(pdu *PDU) error {
if isRead && result.Status == SCSIStatusGood && len(result.Data) > 0 { if isRead && result.Status == SCSIStatusGood && len(result.Data) > 0 {
// Build Data-In PDUs and enqueue them all. // Build Data-In PDUs and enqueue them all.
pdus := s.dataInWriter.BuildDataInPDUs(result.Data, itt, s.expCmdSN.Load(), s.maxCmdSN.Load())
pdus := s.dataInWriter.BuildDataInPDUs(result.Data, itt, expectedLen, s.expCmdSN.Load(), s.maxCmdSN.Load())
for _, p := range pdus { for _, p := range pdus {
s.enqueue(p) s.enqueue(p)
} }

755
weed/storage/blockvol/qa_phase4a_cp4b4_test.go

@ -0,0 +1,755 @@
package blockvol
import (
"fmt"
"net"
"path/filepath"
"sync"
"testing"
"time"
)
// TestQAPhase4ACP4b4 tests Phase 4A CP4b-4 adversarial scenarios:
// partial channel failures, protocol edge cases, rebuild scenarios,
// edge interactions, and error injection.
func TestQAPhase4ACP4b4(t *testing.T) {
	type subtest struct {
		name string
		run  func(t *testing.T)
	}
	cases := []subtest{
		// Group A: Partial Channel Failures
		{name: "barrier_stale_lsn_after_gap", run: testQA4b4BarrierStaleLSNAfterGap},
		{name: "ship_data_ok_ctrl_down", run: testQA4b4ShipDataOkCtrlDown},
		{name: "ship_ctrl_ok_data_down", run: testQA4b4ShipCtrlOkDataDown},
		// Group B: Protocol Edge Cases
		{name: "ship_large_entry_near_max", run: testQA4b4ShipLargeEntryNearMax},
		{name: "ship_entries_across_wal_wrap", run: testQA4b4ShipEntriesAcrossWALWrap},
		// Group C: Rebuild Scenarios
		{name: "rebuild_catchup_concurrent_writes", run: testQA4b4RebuildCatchupConcurrentWrites},
		{name: "rebuild_full_extent_midcopy_writes", run: testQA4b4RebuildFullExtentMidcopyWrites},
		{name: "rebuild_interrupted_retry", run: testQA4b4RebuildInterruptedRetry},
		// Group D: Edge Interactions
		{name: "read_during_wal_reclaim", run: testQA4b4ReadDuringWALReclaim},
		{name: "trim_blocks_survive_rebuild", run: testQA4b4TrimBlocksSurviveRebuild},
		{name: "demote_while_wal_full", run: testQA4b4DemoteWhileWALFull},
		// Group E: Error Injection
		{name: "rebuild_server_epoch_mismatch", run: testQA4b4RebuildServerEpochMismatch},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			tc.run(t)
		})
	}
}
// ========== Group A: Partial Channel Failures ==========
// A1: Ship LSN 1,2,4 (skip 3) to replica receiver, then barrier(4).
// Barrier must time out because receiver blocks waiting for contiguous LSN 3.
func testQA4b4BarrierStaleLSNAfterGap(t *testing.T) {
	dir := t.TempDir()
	cfg := DefaultConfig()
	cfg.FlushInterval = 5 * time.Millisecond
	// Stand up a small replica-side volume for the receiver to apply entries to.
	vol, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{
		VolumeSize: 1 * 1024 * 1024, BlockSize: 4096, WALSize: 256 * 1024,
	}, cfg)
	if err != nil {
		t.Fatalf("CreateBlockVol: %v", err)
	}
	defer vol.Close()
	if err := vol.SetEpoch(1); err != nil {
		t.Fatalf("SetEpoch: %v", err)
	}
	recv, err := NewReplicaReceiver(vol, "127.0.0.1:0", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("NewReplicaReceiver: %v", err)
	}
	recv.barrierTimeout = 200 * time.Millisecond // fast timeout for test
	recv.Serve()
	defer recv.Stop()
	dataConn, err := net.Dial("tcp", recv.DataAddr())
	if err != nil {
		t.Fatalf("dial data: %v", err)
	}
	defer dataConn.Close()
	// Send LSN 1, 2, 4 (skip 3).
	for _, lsn := range []uint64{1, 2, 4} {
		entry := &WALEntry{LSN: lsn, Epoch: 1, Type: EntryTypeWrite, LBA: lsn - 1, Length: 4096, Data: makeBlock(byte('A' + lsn))}
		encoded, _ := entry.Encode()
		if err := WriteFrame(dataConn, MsgWALEntry, encoded); err != nil {
			t.Fatalf("send LSN=%d: %v", lsn, err)
		}
	}
	time.Sleep(50 * time.Millisecond)
	// receivedLSN should be 2: the gap at LSN 3 blocks further progress.
	if got := recv.ReceivedLSN(); got != 2 {
		t.Fatalf("ReceivedLSN = %d after gap, want 2", got)
	}
	// Barrier(4) must time out since LSN 3 is missing.
	ctrlConn, err := net.Dial("tcp", recv.CtrlAddr())
	if err != nil {
		t.Fatalf("dial ctrl: %v", err)
	}
	defer ctrlConn.Close()
	req := EncodeBarrierRequest(BarrierRequest{LSN: 4, Epoch: 1})
	if err := WriteFrame(ctrlConn, MsgBarrierReq, req); err != nil {
		t.Fatalf("send barrier: %v", err)
	}
	// Read deadline comfortably exceeds the receiver's 200ms barrier timeout.
	ctrlConn.SetReadDeadline(time.Now().Add(3 * time.Second))
	_, payload, err := ReadFrame(ctrlConn)
	if err != nil {
		t.Fatalf("read barrier response: %v", err)
	}
	if payload[0] == BarrierOK {
		t.Error("barrier returned OK despite gap at LSN 3, expected timeout")
	}
	// receivedLSN should still be 2 -- the barrier must not advance past the gap.
	if got := recv.ReceivedLSN(); got != 2 {
		t.Errorf("ReceivedLSN = %d after barrier timeout, want 2", got)
	}
}
// A2: Data channel works, ctrl listener closed before Barrier.
// Barrier must return error. Data entries still applied.
func testQA4b4ShipDataOkCtrlDown(t *testing.T) {
	primary, replica := createReplicaPair(t)
	defer primary.Close()
	defer replica.Close()
	// Write data through primary -> shipped to replica via data channel.
	for i := 0; i < 5; i++ {
		if err := primary.WriteLBA(uint64(i), makeBlock(byte('A'+i))); err != nil {
			t.Fatalf("WriteLBA(%d): %v", i, err)
		}
	}
	// Give the shipper a moment to deliver the entries.
	time.Sleep(50 * time.Millisecond)
	// Verify data entries were applied to replica.
	replicaLSN := replica.replRecv.ReceivedLSN()
	if replicaLSN < 5 {
		t.Fatalf("replica receivedLSN = %d, want >= 5", replicaLSN)
	}
	// Kill ctrl listener to make barrier fail.
	replica.replRecv.ctrlListener.Close()
	// Force primary shipper to reconnect ctrl: drop its cached ctrl conn so
	// the next Barrier must dial the (now closed) listener.
	primary.shipper.ctrlMu.Lock()
	if primary.shipper.ctrlConn != nil {
		primary.shipper.ctrlConn.Close()
		primary.shipper.ctrlConn = nil
	}
	primary.shipper.ctrlMu.Unlock()
	// Barrier must fail (ctrl down).
	err := primary.shipper.Barrier(replicaLSN)
	if err == nil {
		t.Error("Barrier should fail when ctrl listener is closed")
	}
	// Data entries should still be applied (receivedLSN unchanged or higher).
	if got := replica.replRecv.ReceivedLSN(); got < replicaLSN {
		t.Errorf("receivedLSN regressed from %d to %d after ctrl failure", replicaLSN, got)
	}
}
// A3: Ctrl is up, data listener closed. Ship must fail, barrier may timeout.
func testQA4b4ShipCtrlOkDataDown(t *testing.T) {
	dir := t.TempDir()
	cfg := DefaultConfig()
	cfg.FlushInterval = 5 * time.Millisecond
	// Replica-side volume backing the receiver.
	vol, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{
		VolumeSize: 1 * 1024 * 1024, BlockSize: 4096, WALSize: 256 * 1024,
	}, cfg)
	if err != nil {
		t.Fatalf("CreateBlockVol: %v", err)
	}
	defer vol.Close()
	if err := vol.SetEpoch(1); err != nil {
		t.Fatalf("SetEpoch: %v", err)
	}
	recv, err := NewReplicaReceiver(vol, "127.0.0.1:0", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("NewReplicaReceiver: %v", err)
	}
	recv.barrierTimeout = 200 * time.Millisecond
	recv.Serve()
	defer recv.Stop()
	ctrlAddr := recv.CtrlAddr()
	// Close data listener -- data conn will fail.
	recv.dataListener.Close()
	// Create shipper pointed at closed data port, working ctrl port.
	shipper := NewWALShipper(recv.DataAddr(), ctrlAddr, func() uint64 { return 1 })
	defer shipper.Stop()
	// Ship should fail (can't connect data), shipper degrades.
	// Error deliberately ignored: the failure is observed via ReceivedLSN below.
	entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('X')}
	_ = shipper.Ship(entry)
	// receivedLSN should stay 0.
	if got := recv.ReceivedLSN(); got != 0 {
		t.Errorf("ReceivedLSN = %d, want 0 (data channel was down)", got)
	}
}
// ========== Group B: Protocol Edge Cases ==========
// B1: Ship a large WAL entry through the frame codec and verify it round-trips.
// Uses WriteFrame/ReadFrame directly to test near-max frame payload.
// Then ships a multi-block entry through a real replica receiver.
func testQA4b4ShipLargeEntryNearMax(t *testing.T) {
	// Part 1: Frame codec round-trip with a large payload (1MB).
	const largeSize = 1 * 1024 * 1024
	largePayload := make([]byte, largeSize)
	for i := range largePayload {
		largePayload[i] = byte('L')
	}
	// net.Pipe is synchronous, so the write must run in its own goroutine
	// while this goroutine reads; otherwise both sides would deadlock.
	server, client := net.Pipe()
	defer server.Close()
	defer client.Close()
	writeDone := make(chan error, 1)
	go func() {
		writeDone <- WriteFrame(client, MsgWALEntry, largePayload)
	}()
	msgType, payload, err := ReadFrame(server)
	if err != nil {
		t.Fatalf("ReadFrame large: %v", err)
	}
	if msgType != MsgWALEntry {
		t.Errorf("msgType = 0x%02x, want MsgWALEntry", msgType)
	}
	if len(payload) != largeSize {
		t.Errorf("payload len = %d, want %d", len(payload), largeSize)
	}
	// Spot-check first and last bytes for corruption.
	if payload[0] != 'L' || payload[largeSize-1] != 'L' {
		t.Error("payload data corrupted")
	}
	if err := <-writeDone; err != nil {
		t.Fatalf("WriteFrame: %v", err)
	}
	// Part 2: Ship a multi-block entry (8 blocks = 32KB) through a real receiver.
	dir := t.TempDir()
	cfg := DefaultConfig()
	cfg.FlushInterval = 5 * time.Millisecond
	vol, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{
		VolumeSize: 1 * 1024 * 1024, BlockSize: 4096, WALSize: 512 * 1024,
	}, cfg)
	if err != nil {
		t.Fatalf("CreateBlockVol: %v", err)
	}
	defer vol.Close()
	if err := vol.SetEpoch(1); err != nil {
		t.Fatalf("SetEpoch: %v", err)
	}
	recv, err := NewReplicaReceiver(vol, "127.0.0.1:0", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("NewReplicaReceiver: %v", err)
	}
	recv.Serve()
	defer recv.Stop()
	dataConn, err := net.Dial("tcp", recv.DataAddr())
	if err != nil {
		t.Fatalf("dial data: %v", err)
	}
	defer dataConn.Close()
	// 8 blocks = 32KB data.
	const blockCount = 8
	bigData := make([]byte, blockCount*4096)
	for i := range bigData {
		bigData[i] = byte('B')
	}
	entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: uint32(len(bigData)), Data: bigData}
	encoded, err := entry.Encode()
	if err != nil {
		t.Fatalf("Encode: %v", err)
	}
	if err := WriteFrame(dataConn, MsgWALEntry, encoded); err != nil {
		t.Fatalf("WriteFrame: %v", err)
	}
	// Let the receiver apply the entry before checking progress.
	time.Sleep(100 * time.Millisecond)
	if got := recv.ReceivedLSN(); got != 1 {
		t.Errorf("ReceivedLSN = %d, want 1 (multi-block entry should be applied)", got)
	}
}
// B2: Write enough entries to force WAL wrap, ship all, verify replica has data.
func testQA4b4ShipEntriesAcrossWALWrap(t *testing.T) {
	primary, replica := createReplicaPair(t)
	defer primary.Close()
	defer replica.Close()
	// createReplicaPair uses WALSize=512KB, VolumeSize=1MB.
	// Each write is 4KB WAL entry + header. Fill enough to wrap.
	// ~512KB WAL / ~4KB per entry ≈ 128 entries before wrap, but the flusher
	// reclaims space so we can write more.
	const numWrites = 50
	for i := 0; i < numWrites; i++ {
		if err := primary.WriteLBA(uint64(i%256), makeBlock(byte('W'+i%26))); err != nil {
			// WAL full is acceptable if flusher can't keep up.
			t.Logf("WriteLBA(%d): %v (expected if WAL fills)", i, err)
			break
		}
	}
	// SyncCache to push barrier.
	if err := primary.SyncCache(); err != nil {
		// Degraded replica is ok -- we test that entries shipped correctly.
		t.Logf("SyncCache: %v (may degrade)", err)
	}
	time.Sleep(100 * time.Millisecond)
	// Replica should have received entries.
	replicaLSN := replica.replRecv.ReceivedLSN()
	if replicaLSN == 0 {
		t.Error("replica receivedLSN = 0, expected > 0 after writes across WAL")
	}
	// Verify first few LBAs on replica; only check LBAs the replica has
	// confirmed receiving (LSN i+1 corresponds to the write to LBA i).
	for i := 0; i < 5 && uint64(i) <= replicaLSN; i++ {
		data, err := replica.ReadLBA(uint64(i), 4096)
		if err != nil {
			t.Fatalf("replica ReadLBA(%d): %v", i, err)
		}
		if data[0] == 0 {
			t.Errorf("replica LBA %d is zero, expected non-zero data", i)
		}
	}
}
// ========== Group C: Rebuild Scenarios ==========
// C1: Start rebuild while primary receives concurrent writes.
func testQA4b4RebuildCatchupConcurrentWrites(t *testing.T) {
	primary := cp3Primary(t, "rb_cc_pri.bv", 1)
	defer primary.Close()
	// Write initial data. (Errors ignored: seeding is best-effort; the
	// verification loop below reads these LBAs back.)
	for i := 0; i < 5; i++ {
		primary.WriteLBA(uint64(i), makeBlock(byte('A'+i)))
	}
	primary.StartRebuildServer("127.0.0.1:0")
	defer primary.StopRebuildServer()
	// Start concurrent writes on primary while rebuild happens.
	var wg sync.WaitGroup
	stopWriting := make(chan struct{})
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Cycle best-effort writes over LBAs 10-14 until told to stop.
		lba := uint64(10)
		for {
			select {
			case <-stopWriting:
				return
			default:
			}
			primary.WriteLBA(lba, makeBlock(byte('C')))
			lba++
			if lba > 14 {
				lba = 10
			}
			time.Sleep(1 * time.Millisecond)
		}
	}()
	// Start rebuild client.
	replica := cp3Rebuilding(t, "rb_cc_rep.bv", 1)
	defer replica.Close()
	err := StartRebuild(replica, primary.rebuildServer.Addr(), 1, 1)
	// Stop the writer before checking the rebuild result so wg.Wait
	// cannot hang if StartRebuild failed.
	close(stopWriting)
	wg.Wait()
	if err != nil {
		t.Fatalf("StartRebuild: %v", err)
	}
	if replica.Role() != RoleReplica {
		t.Errorf("role = %s, want Replica", replica.Role())
	}
	// Verify initial data on replica.
	for i := 0; i < 5; i++ {
		data, err := replica.ReadLBA(uint64(i), 4096)
		if err != nil {
			t.Fatalf("ReadLBA(%d): %v", i, err)
		}
		if data[0] != byte('A'+i) {
			t.Errorf("LBA %d: got %c, want %c", i, data[0], byte('A'+i))
		}
	}
}
// C2: Write initial data, start full extent rebuild, concurrent writes during copy.
func testQA4b4RebuildFullExtentMidcopyWrites(t *testing.T) {
	primary := cp3Primary(t, "rb_fe_pri.bv", 1)
	defer primary.Close()
	// Write and flush to make data in extent.
	for i := 0; i < 5; i++ {
		primary.WriteLBA(uint64(i), makeBlock(byte('D'+i)))
	}
	primary.SyncCache()
	time.Sleep(30 * time.Millisecond) // let flusher move to extent
	primary.StartRebuildServer("127.0.0.1:0")
	defer primary.StopRebuildServer()
	// Start concurrent writes during rebuild.
	var wg sync.WaitGroup
	stopWriting := make(chan struct{})
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Cycle best-effort writes over LBAs 10-14 until told to stop.
		lba := uint64(10)
		for {
			select {
			case <-stopWriting:
				return
			default:
			}
			primary.WriteLBA(lba, makeBlock(byte('M')))
			lba++
			if lba > 14 {
				lba = 10
			}
			time.Sleep(1 * time.Millisecond)
		}
	}()
	// Force full extent rebuild (fromLSN=0 below checkpoint).
	replica := cp3Rebuilding(t, "rb_fe_rep.bv", 1)
	defer replica.Close()
	err := StartRebuild(replica, primary.rebuildServer.Addr(), 0, 1)
	// Stop the writer before inspecting the result so wg.Wait cannot hang.
	close(stopWriting)
	wg.Wait()
	if err != nil {
		t.Fatalf("StartRebuild: %v", err)
	}
	if replica.Role() != RoleReplica {
		t.Errorf("role = %s, want Replica", replica.Role())
	}
	// Verify flushed data on replica (from extent).
	for i := 0; i < 5; i++ {
		data, err := replica.ReadLBA(uint64(i), 4096)
		if err != nil {
			t.Fatalf("ReadLBA(%d): %v", i, err)
		}
		if data[0] != byte('D'+i) {
			t.Errorf("LBA %d: got %c, want %c", i, data[0], byte('D'+i))
		}
	}
}
// C3: Start rebuild, read half the entries, close connection, retry from start.
// Simulates a rebuild client crash mid-stream and verifies the server can
// serve a fresh, complete rebuild afterwards.
func testQA4b4RebuildInterruptedRetry(t *testing.T) {
	primary := cp3Primary(t, "rb_int_pri.bv", 1)
	defer primary.Close()
	// Seed ten distinct blocks (best-effort; verified on the replica below).
	for i := 0; i < 10; i++ {
		primary.WriteLBA(uint64(i), makeBlock(byte('R'+i%26)))
	}
	primary.StartRebuildServer("127.0.0.1:0")
	defer primary.StopRebuildServer()
	// First attempt: connect, read a few entries, then close.
	conn1, err := net.Dial("tcp", primary.rebuildServer.Addr())
	if err != nil {
		t.Fatalf("dial 1: %v", err)
	}
	req := EncodeRebuildRequest(RebuildRequest{Type: RebuildWALCatchUp, FromLSN: 1, Epoch: 1})
	// Bug fix: the send error was previously discarded; a failed send would
	// have surfaced only as a confusing ReadFrame failure below.
	if err := WriteFrame(conn1, MsgRebuildReq, req); err != nil {
		conn1.Close()
		t.Fatalf("send rebuild request: %v", err)
	}
	// Read a few entries then abort.
	for i := 0; i < 3; i++ {
		_, _, err := ReadFrame(conn1)
		if err != nil {
			t.Fatalf("read entry %d: %v", i, err)
		}
	}
	conn1.Close() // simulate crash
	// Second attempt: full rebuild should succeed.
	replica := cp3Rebuilding(t, "rb_int_rep.bv", 1)
	defer replica.Close()
	if err := StartRebuild(replica, primary.rebuildServer.Addr(), 1, 1); err != nil {
		t.Fatalf("StartRebuild (retry): %v", err)
	}
	if replica.Role() != RoleReplica {
		t.Errorf("role = %s, want Replica", replica.Role())
	}
	// Verify all data.
	for i := 0; i < 10; i++ {
		data, err := replica.ReadLBA(uint64(i), 4096)
		if err != nil {
			t.Fatalf("ReadLBA(%d): %v", i, err)
		}
		expected := byte('R' + i%26)
		if data[0] != expected {
			t.Errorf("LBA %d: got %c, want %c", i, data[0], expected)
		}
	}
}
// ========== Group D: Edge Interactions ==========
// D1: Write blocks, flush to reclaim WAL, concurrent ReadLBA on flushed blocks.
func testQA4b4ReadDuringWALReclaim(t *testing.T) {
	const blocks = 10
	vol := cp3Primary(t, "read_reclaim.bv", 1)
	defer vol.Close()
	// Seed the volume with distinct per-LBA payloads.
	for i := 0; i < blocks; i++ {
		if err := vol.WriteLBA(uint64(i), makeBlock(byte('R'+i%26))); err != nil {
			t.Fatalf("WriteLBA(%d): %v", i, err)
		}
	}
	// Flush and give the flusher time to reclaim WAL space.
	vol.SyncCache()
	time.Sleep(50 * time.Millisecond)
	// Issue all reads in parallel; each goroutine records into its own slot,
	// so no additional synchronization is needed beyond the WaitGroup.
	errs := make([]error, blocks)
	var wg sync.WaitGroup
	for i := 0; i < blocks; i++ {
		wg.Add(1)
		go func(lba int) {
			defer wg.Done()
			data, err := vol.ReadLBA(uint64(lba), 4096)
			switch {
			case err != nil:
				errs[lba] = err
			case data[0] != byte('R'+lba%26):
				errs[lba] = fmt.Errorf("LBA %d: got %c, want %c", lba, data[0], byte('R'+lba%26))
			}
		}(i)
	}
	wg.Wait()
	for i, err := range errs {
		if err != nil {
			t.Errorf("read LBA %d: %v", i, err)
		}
	}
}
// D2: Write blocks, TRIM some, rebuild replica, verify trimmed blocks read as zeros.
func testQA4b4TrimBlocksSurviveRebuild(t *testing.T) {
	primary := cp3Primary(t, "trim_rb_pri.bv", 1)
	defer primary.Close()
	// Write 10 blocks (best-effort seeding; verified on the replica below).
	for i := 0; i < 10; i++ {
		primary.WriteLBA(uint64(i), makeBlock(byte('T'+i%26)))
	}
	// TRIM blocks 3-5.
	for i := 3; i <= 5; i++ {
		if err := primary.Trim(uint64(i), 4096); err != nil {
			t.Fatalf("Trim(%d): %v", i, err)
		}
	}
	primary.StartRebuildServer("127.0.0.1:0")
	defer primary.StopRebuildServer()
	replica := cp3Rebuilding(t, "trim_rb_rep.bv", 1)
	defer replica.Close()
	if err := StartRebuild(replica, primary.rebuildServer.Addr(), 1, 1); err != nil {
		t.Fatalf("StartRebuild: %v", err)
	}
	// Trimmed blocks should read as zeros.
	for i := 3; i <= 5; i++ {
		data, err := replica.ReadLBA(uint64(i), 4096)
		if err != nil {
			t.Fatalf("ReadLBA(%d): %v", i, err)
		}
		nonZero := false
		for _, b := range data {
			if b != 0 {
				nonZero = true
				break
			}
		}
		if nonZero {
			t.Errorf("LBA %d should be zero after TRIM+rebuild, got non-zero data", i)
		}
	}
	// Non-trimmed blocks should still have data.
	// Bug fix: the read error was previously discarded, so a failed read
	// would panic on the nil slice below instead of reporting cleanly.
	data0, err := replica.ReadLBA(0, 4096)
	if err != nil {
		t.Fatalf("ReadLBA(0): %v", err)
	}
	if data0[0] != 'T' {
		t.Errorf("LBA 0: got %c, want 'T'", data0[0])
	}
}
// D3: Fill WAL, start WriteLBA (blocks in WAL-full retry), concurrently demote.
// Verifies there is no deadlock between the WAL-full retry loop and the
// demote drain path.
func testQA4b4DemoteWhileWALFull(t *testing.T) {
	// Create volume with tiny WAL.
	dir := t.TempDir()
	cfg := DefaultConfig()
	cfg.FlushInterval = 100 * time.Millisecond // slow flusher
	cfg.WALFullTimeout = 2 * time.Second
	vol, err := CreateBlockVol(filepath.Join(dir, "demote_walful.bv"), CreateOptions{
		VolumeSize: 64 * 1024,
		BlockSize:  4096,
		WALSize:    16 * 1024, // tiny WAL: ~4 entries
	}, cfg)
	if err != nil {
		t.Fatalf("CreateBlockVol: %v", err)
	}
	defer vol.Close()
	if err := vol.HandleAssignment(1, RolePrimary, 30*time.Second); err != nil {
		t.Fatalf("assign primary: %v", err)
	}
	vol.drainTimeout = 3 * time.Second
	// Fill WAL: write until the first WAL-full error.
	writeCount := 0
	for i := 0; i < 20; i++ {
		if err := vol.WriteLBA(uint64(i%16), makeBlock(byte('F'))); err != nil {
			break
		}
		writeCount++
	}
	if writeCount < 2 {
		t.Fatalf("only wrote %d entries before WAL full", writeCount)
	}
	// Start a WriteLBA that will likely block in WAL-full retry.
	writeDone := make(chan error, 1)
	go func() {
		writeDone <- vol.WriteLBA(0, makeBlock('Z'))
	}()
	// Give the write a moment to start retrying.
	time.Sleep(10 * time.Millisecond)
	// Concurrently demote.
	demoteDone := make(chan error, 1)
	go func() {
		demoteDone <- vol.HandleAssignment(2, RoleStale, 0)
	}()
	// Both should complete within timeout (no deadlock). A single timer
	// bounds both waits; if the first select fires on the timer the test
	// aborts, so the second select never observes a drained timer channel.
	timer := time.NewTimer(5 * time.Second)
	defer timer.Stop()
	select {
	case err := <-demoteDone:
		// Demote may succeed or fail with drain timeout -- both OK.
		t.Logf("demote result: %v", err)
	case <-timer.C:
		t.Fatal("demote hung for 5s -- deadlock between WAL-full retry and demote drain")
	}
	select {
	case err := <-writeDone:
		// Write may fail (demoted, closed, WAL full timeout) -- all acceptable.
		t.Logf("write result: %v", err)
	case <-timer.C:
		t.Fatal("write hung for 5s after demote")
	}
}
// ========== Group E: Error Injection ==========
// E1: Start rebuild server on primary (epoch=1), client sends epoch=2.
// Server should return EPOCH_MISMATCH.
func testQA4b4RebuildServerEpochMismatch(t *testing.T) {
vol := cp3Primary(t, "rbs_epm.bv", 1)
defer vol.Close()
vol.StartRebuildServer("127.0.0.1:0")
defer vol.StopRebuildServer()
// Client sends mismatched epoch=2.
conn, err := net.Dial("tcp", vol.rebuildServer.Addr())
if err != nil {
t.Fatalf("dial: %v", err)
}
defer conn.Close()
req := EncodeRebuildRequest(RebuildRequest{Type: RebuildWALCatchUp, FromLSN: 1, Epoch: 2})
if err := WriteFrame(conn, MsgRebuildReq, req); err != nil {
t.Fatalf("send request: %v", err)
}
conn.SetReadDeadline(time.Now().Add(3 * time.Second))
msgType, payload, err := ReadFrame(conn)
if err != nil {
t.Fatalf("read response: %v", err)
}
if msgType != MsgRebuildError {
t.Errorf("msgType = 0x%02x, want MsgRebuildError(0x%02x)", msgType, MsgRebuildError)
}
if string(payload) != "EPOCH_MISMATCH" {
t.Errorf("payload = %q, want EPOCH_MISMATCH", string(payload))
}
// Also test epoch=0 (lower than server's epoch=1).
conn2, err := net.Dial("tcp", vol.rebuildServer.Addr())
if err != nil {
t.Fatalf("dial 2: %v", err)
}
defer conn2.Close()
req2 := EncodeRebuildRequest(RebuildRequest{Type: RebuildFullExtent, Epoch: 0})
if err := WriteFrame(conn2, MsgRebuildReq, req2); err != nil {
t.Fatalf("send request 2: %v", err)
}
conn2.SetReadDeadline(time.Now().Add(3 * time.Second))
msgType2, payload2, err := ReadFrame(conn2)
if err != nil {
t.Fatalf("read response 2: %v", err)
}
if msgType2 != MsgRebuildError || string(payload2) != "EPOCH_MISMATCH" {
t.Errorf("epoch=0: type=0x%02x payload=%q, want EPOCH_MISMATCH", msgType2, payload2)
}
}

8
weed/storage/blockvol/test/ha_test.go

@ -24,10 +24,10 @@ const (
haISCSIPort2 = 3261 // replica iSCSI (used after promotion) haISCSIPort2 = 3261 // replica iSCSI (used after promotion)
haAdminPort1 = 8080 // primary admin haAdminPort1 = 8080 // primary admin
haAdminPort2 = 8081 // replica admin haAdminPort2 = 8081 // replica admin
haReplData1 = 9001 // replica receiver data (on replica node)
haReplCtrl1 = 9002 // replica receiver ctrl (on replica node)
haRebuildPort1 = 9003 // rebuild server (primary)
haRebuildPort2 = 9004 // rebuild server (replica, after promotion)
haReplData1 = 9011 // replica receiver data (on replica node)
haReplCtrl1 = 9012 // replica receiver ctrl (on replica node)
haRebuildPort1 = 9013 // rebuild server (primary)
haRebuildPort2 = 9014 // rebuild server (replica, after promotion)
) )
// newHAPair creates a primary HATarget on targetNode and a replica HATarget // newHAPair creates a primary HATarget on targetNode and a replica HATarget

Loading…
Cancel
Save