diff --git a/weed/storage/blockvol/iscsi/cmd/blockbench/main.go b/weed/storage/blockvol/iscsi/cmd/blockbench/main.go new file mode 100644 index 000000000..869d93edd --- /dev/null +++ b/weed/storage/blockvol/iscsi/cmd/blockbench/main.go @@ -0,0 +1,189 @@ +// blockbench is a simple block device benchmark tool. +// Usage: blockbench [-f path] [-size 256M] [-bs 4K] [-duration 10s] [-pattern seq|rand] [-rw read|write|mixed] +package main + +import ( + "crypto/rand" + "flag" + "fmt" + "math/big" + "os" + "strings" + "sync/atomic" + "time" +) + +func main() { + filePath := flag.String("f", "", "test file path (e.g. F:\\testfile.dat)") + fileSize := flag.String("size", "256M", "test file size") + blockSize := flag.String("bs", "4K", "block size") + duration := flag.Duration("duration", 10*time.Second, "test duration") + pattern := flag.String("pattern", "seq", "access pattern: seq or rand") + mode := flag.String("rw", "write", "read, write, or mixed") + workers := flag.Int("workers", 1, "number of concurrent workers") + direct := flag.Bool("direct", false, "use O_DIRECT (not supported on all OS)") + flag.Parse() + + if *filePath == "" { + fmt.Fprintln(os.Stderr, "error: -f is required") + flag.Usage() + os.Exit(1) + } + + fSize, err := parseSize(*fileSize) + if err != nil { + fmt.Fprintf(os.Stderr, "bad -size: %v\n", err) + os.Exit(1) + } + bSize, err := parseSize(*blockSize) + if err != nil { + fmt.Fprintf(os.Stderr, "bad -bs: %v\n", err) + os.Exit(1) + } + + _ = *direct // placeholder for future O_DIRECT support + + fmt.Printf("=== BlockBench ===\n") + fmt.Printf("file=%s size=%s bs=%s duration=%s pattern=%s mode=%s workers=%d\n", + *filePath, *fileSize, *blockSize, *duration, *pattern, *mode, *workers) + + // Create/open file + f, err := os.OpenFile(*filePath, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + fmt.Fprintf(os.Stderr, "open: %v\n", err) + os.Exit(1) + } + defer f.Close() + + // Pre-allocate + if err := f.Truncate(int64(fSize)); err != nil { + fmt.Fprintf(os.Stderr, "truncate: %v\n", err) + os.Exit(1) + } + + totalBlocks := fSize / bSize + + // Prepare write buffer + writeBuf := make([]byte, bSize) + rand.Read(writeBuf) + + var ops atomic.Int64 + var bytes atomic.Int64 + var maxLatUs atomic.Int64 + + isRand := *pattern == "rand" + doWrite := *mode == "write" || *mode == "mixed" + doRead := *mode == "read" || *mode == "mixed" + + stop := make(chan struct{}) + + fmt.Printf("running %s %s for %s...\n", *pattern, *mode, *duration) + + start := time.Now() + + for w := 0; w < *workers; w++ { + wf, err := os.OpenFile(*filePath, os.O_RDWR, 0644) + if err != nil { + fmt.Fprintf(os.Stderr, "worker open: %v\n", err) + os.Exit(1) + } + defer wf.Close() + + go func(wf *os.File, workerID int) { + buf := make([]byte, bSize) + copy(buf, writeBuf) + var seqBlock uint64 + readBuf := make([]byte, bSize) + + for { + select { + case <-stop: + return + default: + } + + var blockNum uint64 + if isRand { + n, _ := rand.Int(rand.Reader, big.NewInt(int64(totalBlocks))) + blockNum = n.Uint64() + } else { + blockNum = seqBlock % totalBlocks + seqBlock++ + } + + off := int64(blockNum * bSize) + opStart := time.Now() + + if doWrite && (!doRead || seqBlock%2 == 0) { + _, err = wf.WriteAt(buf, off) + } else if doRead { + _, err = wf.ReadAt(readBuf, off) + } + + us := time.Since(opStart).Microseconds() + if err == nil { + ops.Add(1) + bytes.Add(int64(bSize)) + for { + old := maxLatUs.Load() + if us <= old || maxLatUs.CompareAndSwap(old, us) { + break + } + } + } + } + }(wf, w) + } + + time.Sleep(*duration) + close(stop) + elapsed := time.Since(start) + + // Small sleep to let workers finish + time.Sleep(10 * time.Millisecond) + + totalOps := ops.Load() + totalBytes := bytes.Load() + maxLat := maxLatUs.Load() + + iops := float64(totalOps) / elapsed.Seconds() + mbps := float64(totalBytes) / (1024 * 1024) / elapsed.Seconds() + avgUs := int64(0) + if totalOps > 0 { + avgUs = int64(elapsed.Microseconds()) / totalOps * int64(*workers) + // Rough approximation — actual avg needs per-op tracking + avgUs = int64(float64(elapsed.Microseconds()) / float64(totalOps) * float64(*workers)) + } + + fmt.Printf("\n=== Results ===\n") + fmt.Printf("ops: %d\n", totalOps) + fmt.Printf("IOPS: %.0f\n", iops) + fmt.Printf("throughput: %.1f MB/s\n", mbps) + fmt.Printf("max_lat: %d us (%.1f ms)\n", maxLat, float64(maxLat)/1000) + fmt.Printf("elapsed: %.1fs\n", elapsed.Seconds()) + if totalOps > 0 { + fmt.Printf("avg_lat: ~%d us (%.2f ms) [estimated]\n", avgUs, float64(avgUs)/1000) + } +} + +func parseSize(s string) (uint64, error) { + s = strings.TrimSpace(s) + if len(s) == 0 { + return 0, fmt.Errorf("empty size") + } + mul := uint64(1) + switch s[len(s)-1] { + case 'K', 'k': + mul = 1024 + s = s[:len(s)-1] + case 'M', 'm': + mul = 1024 * 1024 + s = s[:len(s)-1] + case 'G', 'g': + mul = 1024 * 1024 * 1024 + s = s[:len(s)-1] + } + var n uint64 + _, err := fmt.Sscanf(s, "%d", &n) + return n * mul, err +} diff --git a/weed/storage/blockvol/iscsi/cmd/iscsi-target/admin.go b/weed/storage/blockvol/iscsi/cmd/iscsi-target/admin.go index 0e7469a0a..39aac99fe 100644 --- a/weed/storage/blockvol/iscsi/cmd/iscsi-target/admin.go +++ b/weed/storage/blockvol/iscsi/cmd/iscsi-target/admin.go @@ -12,6 +12,7 @@ import ( "log" "net" "net/http" + _ "net/http/pprof" // registers /debug/pprof/* handlers on DefaultServeMux "time" "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" @@ -189,13 +190,21 @@ func (a *adminServer) handleRebuild(w http.ResponseWriter, r *http.Request) { // startAdminServer starts the HTTP admin server in a background goroutine. // Returns the listener so tests can determine the actual bound port. +// Includes /debug/pprof/* endpoints for profiling. func startAdminServer(addr string, srv *adminServer) (net.Listener, error) { ln, err := net.Listen("tcp", addr) if err != nil { return nil, fmt.Errorf("admin listen %s: %w", addr, err) } + mux := http.NewServeMux() + mux.Handle("/assign", srv) + mux.Handle("/status", srv) + mux.Handle("/replica", srv) + mux.Handle("/rebuild", srv) + // pprof handlers registered on DefaultServeMux by net/http/pprof import. + mux.Handle("/debug/pprof/", http.DefaultServeMux) go func() { - if err := http.Serve(ln, srv); err != nil && !isClosedErr(err) { + if err := http.Serve(ln, mux); err != nil && !isClosedErr(err) { srv.logger.Printf("admin server error: %v", err) } }() diff --git a/weed/storage/blockvol/iscsi/cmd/iscsi-target/main.go b/weed/storage/blockvol/iscsi/cmd/iscsi-target/main.go index d183d2de4..8a1979ffb 100644 --- a/weed/storage/blockvol/iscsi/cmd/iscsi-target/main.go +++ b/weed/storage/blockvol/iscsi/cmd/iscsi-target/main.go @@ -13,7 +13,9 @@ import ( "os/signal" "strconv" "strings" + "sync/atomic" "syscall" + "time" "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" "github.com/seaweedfs/seaweedfs/weed/storage/blockvol/iscsi" @@ -98,19 +100,33 @@ func main() { logger.Printf("admin server: %s", ln.Addr()) } - // Create adapter - adapter := &blockVolAdapter{vol: vol} + // Create adapter with latency instrumentation + adapter := &instrumentedAdapter{ + inner: &blockVolAdapter{vol: vol}, + logger: logger, + } // Create target server config := iscsi.DefaultTargetConfig() config.TargetName = *iqn config.TargetAlias = "SeaweedFS BlockVol" + if *portal != "" { + // Parse portal group tag from "addr:port,tpgt" format + if idx := strings.LastIndex(*portal, ","); idx >= 0 { + if tpgt, err := strconv.Atoi((*portal)[idx+1:]); err == nil { + config.TargetPortalGroupTag = tpgt + } + } + } ts := iscsi.NewTargetServer(*addr, config, logger) if *portal != "" { ts.SetPortalAddr(*portal) } ts.AddVolume(*iqn, adapter) + // Start periodic performance stats logging (every 5 seconds). + adapter.StartStatsLogger(5 * time.Second) + // Graceful shutdown on signal sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) @@ -148,6 +164,119 @@ func (a *blockVolAdapter) BlockSize() uint32 { return a.vol.Info().BlockSize } func (a *blockVolAdapter) VolumeSize() uint64 { return a.vol.Info().VolumeSize } func (a *blockVolAdapter) IsHealthy() bool { return a.vol.Info().Healthy } +// instrumentedAdapter wraps a BlockDevice and logs latency stats periodically. +type instrumentedAdapter struct { + inner iscsi.BlockDevice + logger *log.Logger + + // Counters (atomic, lock-free) + writeOps atomic.Int64 + readOps atomic.Int64 + syncOps atomic.Int64 + writeTotUs atomic.Int64 // total microseconds + readTotUs atomic.Int64 + syncTotUs atomic.Int64 + writeMaxUs atomic.Int64 + readMaxUs atomic.Int64 + syncMaxUs atomic.Int64 + writeBytes atomic.Int64 + readBytes atomic.Int64 +} + +func (a *instrumentedAdapter) ReadAt(lba uint64, length uint32) ([]byte, error) { + start := time.Now() + data, err := a.inner.ReadAt(lba, length) + us := time.Since(start).Microseconds() + a.readOps.Add(1) + a.readTotUs.Add(us) + a.readBytes.Add(int64(length)) + atomicMax(&a.readMaxUs, us) + return data, err +} + +func (a *instrumentedAdapter) WriteAt(lba uint64, data []byte) error { + start := time.Now() + err := a.inner.WriteAt(lba, data) + us := time.Since(start).Microseconds() + a.writeOps.Add(1) + a.writeTotUs.Add(us) + a.writeBytes.Add(int64(len(data))) + atomicMax(&a.writeMaxUs, us) + return err +} + +func (a *instrumentedAdapter) Trim(lba uint64, length uint32) error { + return a.inner.Trim(lba, length) +} + +func (a *instrumentedAdapter) SyncCache() error { + start := time.Now() + err := a.inner.SyncCache() + us := time.Since(start).Microseconds() + a.syncOps.Add(1) + a.syncTotUs.Add(us) + atomicMax(&a.syncMaxUs, us) + return err +} + +func (a *instrumentedAdapter) BlockSize() uint32 { return a.inner.BlockSize() } +func (a *instrumentedAdapter) VolumeSize() uint64 { return a.inner.VolumeSize() } +func (a *instrumentedAdapter) IsHealthy() bool { return a.inner.IsHealthy() } + +// StartStatsLogger runs a goroutine that logs performance stats every interval. +func (a *instrumentedAdapter) StartStatsLogger(interval time.Duration) { + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for range ticker.C { + wr := a.writeOps.Swap(0) + rd := a.readOps.Swap(0) + sy := a.syncOps.Swap(0) + wrUs := a.writeTotUs.Swap(0) + rdUs := a.readTotUs.Swap(0) + syUs := a.syncTotUs.Swap(0) + wrMax := a.writeMaxUs.Swap(0) + rdMax := a.readMaxUs.Swap(0) + syMax := a.syncMaxUs.Swap(0) + wrBytes := a.writeBytes.Swap(0) + rdBytes := a.readBytes.Swap(0) + + if wr+rd+sy == 0 { + continue // quiet when idle + } + + wrAvg, rdAvg, syAvg := int64(0), int64(0), int64(0) + if wr > 0 { + wrAvg = wrUs / wr + } + if rd > 0 { + rdAvg = rdUs / rd + } + if sy > 0 { + syAvg = syUs / sy + } + a.logger.Printf("PERF[%ds] wr=%d(%.1fMB avg=%dus max=%dus) rd=%d(%.1fMB avg=%dus max=%dus) sync=%d(avg=%dus max=%dus)", + int(interval.Seconds()), + wr, float64(wrBytes)/(1024*1024), wrAvg, wrMax, + rd, float64(rdBytes)/(1024*1024), rdAvg, rdMax, + sy, syAvg, syMax, + ) + } + }() +} + +func atomicMax(addr *atomic.Int64, val int64) { + for { + old := addr.Load() + if val <= old { + return + } + if addr.CompareAndSwap(old, val) { + return + } + } +} + func parseSize(s string) (uint64, error) { s = strings.TrimSpace(s) if len(s) == 0 { diff --git a/weed/storage/blockvol/iscsi/dataio.go b/weed/storage/blockvol/iscsi/dataio.go index 94873075f..fbbef95fa 100644 --- a/weed/storage/blockvol/iscsi/dataio.go +++ b/weed/storage/blockvol/iscsi/dataio.go @@ -27,16 +27,21 @@ func NewDataInWriter(maxSegLen uint32) *DataInWriter { } // WriteDataIn splits data into Data-In PDUs and writes them to w. -// itt is the initiator task tag, ttt is the target transfer tag. +// itt is the initiator task tag, edtl is expected data transfer length. // statSN is the current StatSN (incremented when S-bit is set). // Returns the number of PDUs written. -func (d *DataInWriter) WriteDataIn(w io.Writer, data []byte, itt uint32, expCmdSN, maxCmdSN uint32, statSN *uint32) (int, error) { +func (d *DataInWriter) WriteDataIn(w io.Writer, data []byte, itt, edtl uint32, expCmdSN, maxCmdSN uint32, statSN *uint32) (int, error) { totalLen := uint32(len(data)) if totalLen == 0 { // Zero-length read -- send single Data-In with S-bit, no data pdu := &PDU{} pdu.SetOpcode(OpSCSIDataIn) - pdu.SetOpSpecific1(FlagF | FlagS) // Final + Status + flags := uint8(FlagF | FlagS) + if edtl > 0 { + flags |= FlagU + pdu.SetResidualCount(edtl) + } + pdu.SetOpSpecific1(flags) pdu.SetInitiatorTaskTag(itt) pdu.SetTargetTransferTag(0xFFFFFFFF) pdu.SetStatSN(*statSN) @@ -74,7 +79,15 @@ func (d *DataInWriter) WriteDataIn(w io.Writer, data []byte, itt uint32, expCmdS pdu.DataSegment = data[offset : offset+segLen] if isFinal { - pdu.SetOpSpecific1(FlagF | FlagS) // Final + Status + flags := uint8(FlagF | FlagS) + if totalLen < edtl { + flags |= FlagU + pdu.SetResidualCount(edtl - totalLen) + } else if totalLen > edtl { + flags |= FlagO + pdu.SetResidualCount(totalLen - edtl) + } + pdu.SetOpSpecific1(flags) pdu.SetStatSN(*statSN) *statSN++ pdu.SetSCSIStatus(SCSIStatusGood) @@ -219,14 +232,20 @@ func BuildSCSIResponse(result SCSIResult, itt uint32, expCmdSN, maxCmdSN uint32) } // BuildDataInPDUs splits data into Data-In PDUs and returns them. +// edtl is the Expected Data Transfer Length from the SCSI command PDU. // StatSN is NOT set on the final PDU -- the txLoop assigns it. // Intermediate PDUs (without S-bit) never carry StatSN. -func (d *DataInWriter) BuildDataInPDUs(data []byte, itt uint32, expCmdSN, maxCmdSN uint32) []*PDU { +func (d *DataInWriter) BuildDataInPDUs(data []byte, itt, edtl uint32, expCmdSN, maxCmdSN uint32) []*PDU { totalLen := uint32(len(data)) if totalLen == 0 { pdu := &PDU{} pdu.SetOpcode(OpSCSIDataIn) - pdu.SetOpSpecific1(FlagF | FlagS) + flags := uint8(FlagF | FlagS) + if edtl > 0 { + flags |= FlagU // underflow: expected data but got none + pdu.SetResidualCount(edtl) + } + pdu.SetOpSpecific1(flags) pdu.SetInitiatorTaskTag(itt) pdu.SetTargetTransferTag(0xFFFFFFFF) pdu.SetExpCmdSN(expCmdSN) @@ -262,7 +281,16 @@ func (d *DataInWriter) BuildDataInPDUs(data []byte, itt uint32, expCmdSN, maxCmd pdu.DataSegment = seg if isFinal { - pdu.SetOpSpecific1(FlagF | FlagS) + flags := uint8(FlagF | FlagS) + // RFC 7143: set underflow/overflow residual on final Data-In with S-bit. + if totalLen < edtl { + flags |= FlagU + pdu.SetResidualCount(edtl - totalLen) + } else if totalLen > edtl { + flags |= FlagO + pdu.SetResidualCount(totalLen - edtl) + } + pdu.SetOpSpecific1(flags) pdu.SetSCSIStatus(SCSIStatusGood) } else { pdu.SetOpSpecific1(0) diff --git a/weed/storage/blockvol/iscsi/dataio_test.go b/weed/storage/blockvol/iscsi/dataio_test.go index ab314f9d2..7aa0780e4 100644 --- a/weed/storage/blockvol/iscsi/dataio_test.go +++ b/weed/storage/blockvol/iscsi/dataio_test.go @@ -40,7 +40,7 @@ func testDataInSinglePDU(t *testing.T) { data := bytes.Repeat([]byte{0xAA}, 4096) statSN := uint32(1) - n, err := dw.WriteDataIn(w, data, 0x100, 1, 10, &statSN) + n, err := dw.WriteDataIn(w, data, 0x100, uint32(len(data)), 1, 10, &statSN) if err != nil { t.Fatal(err) } @@ -75,7 +75,7 @@ func testDataInMultiPDU(t *testing.T) { data := bytes.Repeat([]byte{0xBB}, 3000) statSN := uint32(1) - n, err := dw.WriteDataIn(w, data, 0x200, 1, 10, &statSN) + n, err := dw.WriteDataIn(w, data, 0x200, uint32(len(data)), 1, 10, &statSN) if err != nil { t.Fatal(err) } @@ -121,7 +121,7 @@ func testDataInExactBoundary(t *testing.T) { data := bytes.Repeat([]byte{0xCC}, 2048) // exact 2 PDUs statSN := uint32(1) - n, err := dw.WriteDataIn(w, data, 0x300, 1, 10, &statSN) + n, err := dw.WriteDataIn(w, data, 0x300, uint32(len(data)), 1, 10, &statSN) if err != nil { t.Fatal(err) } @@ -135,7 +135,7 @@ func testDataInZeroLength(t *testing.T) { dw := NewDataInWriter(8192) statSN := uint32(5) - n, err := dw.WriteDataIn(w, nil, 0x400, 1, 10, &statSN) + n, err := dw.WriteDataIn(w, nil, 0x400, 0, 1, 10, &statSN) if err != nil { t.Fatal(err) } @@ -153,7 +153,7 @@ func testDataInDataSNOrdering(t *testing.T) { data := bytes.Repeat([]byte{0xDD}, 2048) // 4 PDUs statSN := uint32(1) - dw.WriteDataIn(w, data, 0x500, 1, 10, &statSN) + dw.WriteDataIn(w, data, 0x500, uint32(len(data)), 1, 10, &statSN) for i := 0; i < 4; i++ { pdu, _ := ReadPDU(w) @@ -173,7 +173,7 @@ func testDataInFbitSbit(t *testing.T) { data := bytes.Repeat([]byte{0xEE}, 2500) statSN := uint32(1) - dw.WriteDataIn(w, data, 0x600, 1, 10, &statSN) + dw.WriteDataIn(w, data, 0x600, uint32(len(data)), 1, 10, &statSN) for i := 0; i < 3; i++ { pdu, _ := ReadPDU(w) @@ -399,7 +399,7 @@ func testDataInStatSNIncrement(t *testing.T) { data := bytes.Repeat([]byte{0x00}, 3072) // 3 PDUs statSN := uint32(100) - dw.WriteDataIn(w, data, 0x700, 1, 10, &statSN) + dw.WriteDataIn(w, data, 0x700, uint32(len(data)), 1, 10, &statSN) // Only the final PDU has S-bit, so StatSN increments once if statSN != 101 { t.Fatalf("StatSN should be 101, got %d", statSN) diff --git a/weed/storage/blockvol/iscsi/login.go b/weed/storage/blockvol/iscsi/login.go index 9d6f22d8e..19062915d 100644 --- a/weed/storage/blockvol/iscsi/login.go +++ b/weed/storage/blockvol/iscsi/login.go @@ -58,6 +58,7 @@ const ( type TargetConfig struct { TargetName string TargetAlias string + TargetPortalGroupTag int // TPGT (1-65535); 0 = use default 1 MaxRecvDataSegmentLength int MaxBurstLength int FirstBurstLength int @@ -272,6 +273,13 @@ func (ln *LoginNegotiator) HandleLoginPDU(req *PDU, resolver TargetResolver) *PD } resp.SetTSIH(ln.tsih) + // RFC 7143 Section 13.9: All Login Responses MUST carry TargetPortalGroupTag. + tpgt := ln.config.TargetPortalGroupTag + if tpgt <= 0 { + tpgt = 1 + } + respParams.Set("TargetPortalGroupTag", strconv.Itoa(tpgt)) + // Encode response params if respParams.Len() > 0 { resp.DataSegment = respParams.Encode() diff --git a/weed/storage/blockvol/iscsi/scsi.go b/weed/storage/blockvol/iscsi/scsi.go index f42d0ee78..281676493 100644 --- a/weed/storage/blockvol/iscsi/scsi.go +++ b/weed/storage/blockvol/iscsi/scsi.go @@ -6,21 +6,28 @@ import ( // SCSI opcode constants (SPC-5 / SBC-4) const ( - ScsiTestUnitReady uint8 = 0x00 - ScsiInquiry uint8 = 0x12 - ScsiModeSense6 uint8 = 0x1a - ScsiModeSense10 uint8 = 0x5a - ScsiReadCapacity10 uint8 = 0x25 - ScsiRead10 uint8 = 0x28 - ScsiWrite10 uint8 = 0x2a - ScsiSyncCache10 uint8 = 0x35 - ScsiUnmap uint8 = 0x42 - ScsiReportLuns uint8 = 0xa0 - ScsiRead16 uint8 = 0x88 - ScsiWrite16 uint8 = 0x8a - ScsiReadCapacity16 uint8 = 0x9e // SERVICE ACTION IN (16), SA=0x10 - ScsiSyncCache16 uint8 = 0x91 - ScsiWriteSame16 uint8 = 0x93 + ScsiTestUnitReady uint8 = 0x00 + ScsiRequestSense uint8 = 0x03 + ScsiInquiry uint8 = 0x12 + ScsiModeSelect6 uint8 = 0x15 + ScsiModeSense6 uint8 = 0x1a + ScsiStartStopUnit uint8 = 0x1b + ScsiReadCapacity10 uint8 = 0x25 + ScsiRead10 uint8 = 0x28 + ScsiWrite10 uint8 = 0x2a + ScsiSyncCache10 uint8 = 0x35 + ScsiUnmap uint8 = 0x42 + ScsiModeSelect10 uint8 = 0x55 + ScsiModeSense10 uint8 = 0x5a + ScsiPersistReserveIn uint8 = 0x5e + ScsiPersistReserveOut uint8 = 0x5f + ScsiRead16 uint8 = 0x88 + ScsiWrite16 uint8 = 0x8a + ScsiSyncCache16 uint8 = 0x91 + ScsiWriteSame16 uint8 = 0x93 + ScsiServiceActionIn16 uint8 = 0x9e // READ CAPACITY(16), etc. + ScsiReportLuns uint8 = 0xa0 + ScsiMaintenanceIn uint8 = 0xa3 // REPORT SUPPORTED OPCODES, etc. ) // Service action for READ CAPACITY (16) @@ -93,38 +100,52 @@ func (h *SCSIHandler) HandleCommand(cdb [16]byte, dataOut []byte) SCSIResult { switch opcode { case ScsiTestUnitReady: return h.testUnitReady() + case ScsiRequestSense: + return h.requestSense(cdb) case ScsiInquiry: return h.inquiry(cdb) + case ScsiModeSelect6: + return h.modeSelect6(cdb, dataOut) case ScsiModeSense6: return h.modeSense6(cdb) - case ScsiModeSense10: - return h.modeSense10(cdb) + case ScsiStartStopUnit: + return h.startStopUnit(cdb) case ScsiReadCapacity10: return h.readCapacity10() - case ScsiReadCapacity16: - sa := cdb[1] & 0x1f - if sa == ScsiSAReadCapacity16 { - return h.readCapacity16(cdb) - } - return illegalRequest(ASCInvalidOpcode, ASCQLuk) - case ScsiReportLuns: - return h.reportLuns(cdb) case ScsiRead10: return h.read10(cdb) - case ScsiRead16: - return h.read16(cdb) case ScsiWrite10: return h.write10(cdb, dataOut) - case ScsiWrite16: - return h.write16(cdb, dataOut) case ScsiSyncCache10: return h.syncCache() - case ScsiSyncCache16: - return h.syncCache() case ScsiUnmap: return h.unmap(cdb, dataOut) + case ScsiModeSelect10: + return h.modeSelect10(cdb, dataOut) + case ScsiModeSense10: + return h.modeSense10(cdb) + case ScsiPersistReserveIn: + return h.persistReserveIn(cdb) + case ScsiPersistReserveOut: + return h.persistReserveOut(cdb, dataOut) + case ScsiRead16: + return h.read16(cdb) + case ScsiWrite16: + return h.write16(cdb, dataOut) + case ScsiSyncCache16: + return h.syncCache() case ScsiWriteSame16: return h.writeSame16(cdb, dataOut) + case ScsiServiceActionIn16: + sa := cdb[1] & 0x1f + if sa == ScsiSAReadCapacity16 { + return h.readCapacity16(cdb) + } + return illegalRequest(ASCInvalidOpcode, ASCQLuk) + case ScsiReportLuns: + return h.reportLuns(cdb) + case ScsiMaintenanceIn: + return h.maintenanceIn(cdb) default: return illegalRequest(ASCInvalidOpcode, ASCQLuk) } @@ -144,6 +165,64 @@ func (h *SCSIHandler) testUnitReady() SCSIResult { return SCSIResult{Status: SCSIStatusGood} } +func (h *SCSIHandler) requestSense(cdb [16]byte) SCSIResult { + allocLen := cdb[4] + if allocLen == 0 { + allocLen = 18 + } + + // Return fixed-format sense data with NO SENSE (no pending error). + data := BuildSenseData(SenseNoSense, 0x00, 0x00) + if int(allocLen) < len(data) { + data = data[:allocLen] + } + return SCSIResult{Status: SCSIStatusGood, Data: data} +} + +// startStopUnit handles START STOP UNIT (0x1B). +// Windows sends this during disk init. We accept and ignore. +func (h *SCSIHandler) startStopUnit(cdb [16]byte) SCSIResult { + return SCSIResult{Status: SCSIStatusGood} +} + +// modeSelect6 handles MODE SELECT(6) (0x15). +// Windows sends this to set caching mode pages. We accept and ignore the data. +func (h *SCSIHandler) modeSelect6(cdb [16]byte, dataOut []byte) SCSIResult { + return SCSIResult{Status: SCSIStatusGood} +} + +// modeSelect10 handles MODE SELECT(10) (0x55). +// Same as modeSelect6 but with 10-byte CDB. +func (h *SCSIHandler) modeSelect10(cdb [16]byte, dataOut []byte) SCSIResult { + return SCSIResult{Status: SCSIStatusGood} +} + +// persistReserveIn handles PERSISTENT RESERVE IN (0x5E). +// Windows Cluster and MPIO use this. We don't support reservations, +// so return ILLEGAL_REQUEST with a specific ASC that tells Windows +// "not supported" rather than "broken device". +func (h *SCSIHandler) persistReserveIn(cdb [16]byte) SCSIResult { + return illegalRequest(ASCInvalidOpcode, ASCQLuk) +} + +// persistReserveOut handles PERSISTENT RESERVE OUT (0x5F). +func (h *SCSIHandler) persistReserveOut(cdb [16]byte, dataOut []byte) SCSIResult { + return illegalRequest(ASCInvalidOpcode, ASCQLuk) +} + +// maintenanceIn handles MAINTENANCE IN (0xA3). +// Service action 0x0C = REPORT SUPPORTED OPERATION CODES. +// Windows sends this to discover which commands we support. +func (h *SCSIHandler) maintenanceIn(cdb [16]byte) SCSIResult { + sa := cdb[1] & 0x1f + if sa == 0x0c { + // REPORT SUPPORTED OPERATION CODES -- return empty (not supported). + // This tells Windows to probe commands individually. + return illegalRequest(ASCInvalidOpcode, ASCQLuk) + } + return illegalRequest(ASCInvalidOpcode, ASCQLuk) +} + func (h *SCSIHandler) inquiry(cdb [16]byte) SCSIResult { evpd := cdb[1] & 0x01 pageCode := cdb[2] @@ -353,17 +432,22 @@ func (h *SCSIHandler) readCapacity16(cdb [16]byte) SCSIResult { } func (h *SCSIHandler) modeSense6(cdb [16]byte) SCSIResult { - // Minimal MODE SENSE(6) response -- no mode pages allocLen := cdb[4] if allocLen == 0 { allocLen = 4 } + pageCode := cdb[2] & 0x3f - data := make([]byte, 4) - data[0] = 3 // Mode data length (3 bytes follow) - data[1] = 0x00 // Medium type: default - data[2] = 0x00 // Device-specific parameter (no write protect) - data[3] = 0x00 // Block descriptor length = 0 + // Build mode page data based on requested page. + pages := h.buildModePages(pageCode) + + // MODE SENSE(6) header = 4 bytes + pages + data := make([]byte, 4+len(pages)) + data[0] = byte(3 + len(pages)) // Mode data length (everything after byte 0) + data[1] = 0x00 // Medium type: default + data[2] = 0x00 // Device-specific parameter (no write protect) + data[3] = 0x00 // Block descriptor length = 0 + copy(data[4:], pages) if int(allocLen) < len(data) { data = data[:allocLen] @@ -372,21 +456,24 @@ func (h *SCSIHandler) modeSense6(cdb [16]byte) SCSIResult { } func (h *SCSIHandler) modeSense10(cdb [16]byte) SCSIResult { - // MODE SENSE(10) -- 8-byte header, no mode pages allocLen := binary.BigEndian.Uint16(cdb[7:9]) if allocLen == 0 { allocLen = 8 } + pageCode := cdb[2] & 0x3f - data := make([]byte, 8) - // Mode data length = 6 (8-byte header minus the 2-byte length field) - binary.BigEndian.PutUint16(data[0:2], 6) + // Build mode page data based on requested page. + pages := h.buildModePages(pageCode) + + // MODE SENSE(10) header = 8 bytes + pages + data := make([]byte, 8+len(pages)) + binary.BigEndian.PutUint16(data[0:2], uint16(6+len(pages))) // Mode data length data[2] = 0x00 // Medium type: default data[3] = 0x00 // Device-specific parameter (no write protect) data[4] = 0x00 // Reserved (LONGLBA=0) data[5] = 0x00 // Reserved - // Block descriptor length = 0 (bytes 6-7) - binary.BigEndian.PutUint16(data[6:8], 0) + binary.BigEndian.PutUint16(data[6:8], 0) // Block descriptor length = 0 + copy(data[8:], pages) if int(allocLen) < len(data) { data = data[:allocLen] @@ -394,6 +481,42 @@ func (h *SCSIHandler) modeSense10(cdb [16]byte) SCSIResult { return SCSIResult{Status: SCSIStatusGood, Data: data} } +// buildModePages returns the mode page data for the requested page code. +// Page 0x3F = return all pages. +func (h *SCSIHandler) buildModePages(pageCode uint8) []byte { + switch pageCode { + case 0x08: // Caching mode page + return h.modePage08() + case 0x0a: // Control mode page + return h.modePage0A() + case 0x3f: // All pages + var pages []byte + pages = append(pages, h.modePage08()...) + pages = append(pages, h.modePage0A()...) + return pages + default: + return nil + } +} + +// modePage08 returns the Caching mode page (SBC-4, Section 7.5.5). +// Tells Windows we support write caching. +func (h *SCSIHandler) modePage08() []byte { + page := make([]byte, 20) + page[0] = 0x08 // Page code + page[1] = 18 // Page length (20 - 2) + page[2] = 0x04 // WCE=1 (Write Cache Enable), RCD=0 (Read Cache Disable) + return page +} + +// modePage0A returns the Control mode page (SPC-5, Section 8.4.8). +func (h *SCSIHandler) modePage0A() []byte { + page := make([]byte, 12) + page[0] = 0x0a // Page code + page[1] = 10 // Page length (12 - 2) + return page +} + func (h *SCSIHandler) reportLuns(cdb [16]byte) SCSIResult { allocLen := binary.BigEndian.Uint32(cdb[6:10]) if allocLen < 16 { diff --git a/weed/storage/blockvol/iscsi/scsi_test.go b/weed/storage/blockvol/iscsi/scsi_test.go index e6697bf91..655763f09 100644 --- a/weed/storage/blockvol/iscsi/scsi_test.go +++ b/weed/storage/blockvol/iscsi/scsi_test.go @@ -311,7 +311,7 @@ func testReadCapacity16(t *testing.T) { dev := newMockDevice(3 * 1024 * 1024 * 1024 * 1024) // 3 TB h := NewSCSIHandler(dev) var cdb [16]byte - cdb[0] = ScsiReadCapacity16 + cdb[0] = ScsiServiceActionIn16 cdb[1] = ScsiSAReadCapacity16 binary.BigEndian.PutUint32(cdb[10:14], 32) r := h.HandleCommand(cdb, nil) @@ -329,7 +329,7 @@ func testReadCapacity16LBPME(t *testing.T) { dev := newMockDevice(100 * 4096) h := NewSCSIHandler(dev) var cdb [16]byte - cdb[0] = ScsiReadCapacity16 + cdb[0] = ScsiServiceActionIn16 cdb[1] = ScsiSAReadCapacity16 binary.BigEndian.PutUint32(cdb[10:14], 32) r := h.HandleCommand(cdb, nil) @@ -925,7 +925,7 @@ func testReadCapacity16InvalidSA(t *testing.T) { dev := newMockDevice(100 * 4096) h := NewSCSIHandler(dev) var cdb [16]byte - cdb[0] = ScsiReadCapacity16 + cdb[0] = ScsiServiceActionIn16 cdb[1] = 0x05 // wrong service action r := h.HandleCommand(cdb, nil) if r.Status != SCSIStatusCheckCond { diff --git a/weed/storage/blockvol/iscsi/session.go b/weed/storage/blockvol/iscsi/session.go index d604700e7..fc12b6927 100644 --- a/weed/storage/blockvol/iscsi/session.go +++ b/weed/storage/blockvol/iscsi/session.go @@ -269,6 +269,10 @@ func (s *Session) close() { func (s *Session) dispatch(pdu *PDU) error { op := pdu.Opcode() + if op != OpSCSICmd && op != OpSCSIDataOut { + s.logger.Printf("dispatch: opcode=%s(0x%02x)", OpcodeName(op), op) + } + switch op { case OpLoginReq: return s.handleLogin(pdu) @@ -337,6 +341,8 @@ func (s *Session) handleText(pdu *PDU) error { targets = lister.ListTargets() } + s.logger.Printf("text request: params=%q targets=%d", string(pdu.DataSegment), len(targets)) + resp := HandleTextRequest(pdu, targets) // ExpCmdSN/MaxCmdSN are set by txLoop via pduNeedsStatSN. s.enqueue(resp) @@ -356,6 +362,9 @@ func (s *Session) handleSCSICmd(pdu *PDU) error { itt := pdu.InitiatorTaskTag() flags := pdu.OpSpecific1() + s.logger.Printf("SCSI CDB: opcode=0x%02x cdb=%x itt=0x%08x flags=0x%02x edtl=%d", + cdb[0], cdb[:], itt, flags, pdu.ExpectedDataTransferLength()) + // CmdSN validation for non-immediate commands if !pdu.Immediate() { cmdSN := pdu.CmdSN() @@ -400,7 +409,7 @@ func (s *Session) handleSCSICmd(pdu *PDU) error { if isRead && result.Status == SCSIStatusGood && len(result.Data) > 0 { // Build Data-In PDUs and enqueue them all. - pdus := s.dataInWriter.BuildDataInPDUs(result.Data, itt, s.expCmdSN.Load(), s.maxCmdSN.Load()) + pdus := s.dataInWriter.BuildDataInPDUs(result.Data, itt, expectedLen, s.expCmdSN.Load(), s.maxCmdSN.Load()) for _, p := range pdus { s.enqueue(p) } diff --git a/weed/storage/blockvol/qa_phase4a_cp4b4_test.go b/weed/storage/blockvol/qa_phase4a_cp4b4_test.go new file mode 100644 index 000000000..fdabc193f --- /dev/null +++ b/weed/storage/blockvol/qa_phase4a_cp4b4_test.go @@ -0,0 +1,755 @@ +package blockvol + +import ( + "fmt" + "net" + "path/filepath" + "sync" + "testing" + "time" +) + +// TestQAPhase4ACP4b4 tests Phase 4A CP4b-4 adversarial scenarios: +// partial channel failures, protocol edge cases, rebuild scenarios, +// edge interactions, and error injection. +func TestQAPhase4ACP4b4(t *testing.T) { + tests := []struct { + name string + run func(t *testing.T) + }{ + // Group A: Partial Channel Failures + {name: "barrier_stale_lsn_after_gap", run: testQA4b4BarrierStaleLSNAfterGap}, + {name: "ship_data_ok_ctrl_down", run: testQA4b4ShipDataOkCtrlDown}, + {name: "ship_ctrl_ok_data_down", run: testQA4b4ShipCtrlOkDataDown}, + + // Group B: Protocol Edge Cases + {name: "ship_large_entry_near_max", run: testQA4b4ShipLargeEntryNearMax}, + {name: "ship_entries_across_wal_wrap", run: testQA4b4ShipEntriesAcrossWALWrap}, + + // Group C: Rebuild Scenarios + {name: "rebuild_catchup_concurrent_writes", run: testQA4b4RebuildCatchupConcurrentWrites}, + {name: "rebuild_full_extent_midcopy_writes", run: testQA4b4RebuildFullExtentMidcopyWrites}, + {name: "rebuild_interrupted_retry", run: testQA4b4RebuildInterruptedRetry}, + + // Group D: Edge Interactions + {name: "read_during_wal_reclaim", run: testQA4b4ReadDuringWALReclaim}, + {name: "trim_blocks_survive_rebuild", run: testQA4b4TrimBlocksSurviveRebuild}, + {name: "demote_while_wal_full", run: testQA4b4DemoteWhileWALFull}, + + // Group E: Error Injection + {name: "rebuild_server_epoch_mismatch", run: testQA4b4RebuildServerEpochMismatch}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.run(t) + }) + } +} + +// ========== Group A: Partial Channel Failures ========== + +// A1: Ship LSN 1,2,4 (skip 3) to replica receiver, then barrier(4). +// Barrier must time out because receiver blocks waiting for contiguous LSN 3. +func testQA4b4BarrierStaleLSNAfterGap(t *testing.T) { + dir := t.TempDir() + cfg := DefaultConfig() + cfg.FlushInterval = 5 * time.Millisecond + + vol, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{ + VolumeSize: 1 * 1024 * 1024, BlockSize: 4096, WALSize: 256 * 1024, + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer vol.Close() + if err := vol.SetEpoch(1); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + + recv, err := NewReplicaReceiver(vol, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatalf("NewReplicaReceiver: %v", err) + } + recv.barrierTimeout = 200 * time.Millisecond // fast timeout for test + recv.Serve() + defer recv.Stop() + + dataConn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatalf("dial data: %v", err) + } + defer dataConn.Close() + + // Send LSN 1, 2, 4 (skip 3). + for _, lsn := range []uint64{1, 2, 4} { + entry := &WALEntry{LSN: lsn, Epoch: 1, Type: EntryTypeWrite, LBA: lsn - 1, Length: 4096, Data: makeBlock(byte('A' + lsn))} + encoded, _ := entry.Encode() + if err := WriteFrame(dataConn, MsgWALEntry, encoded); err != nil { + t.Fatalf("send LSN=%d: %v", lsn, err) + } + } + time.Sleep(50 * time.Millisecond) + + // receivedLSN should be 2 (gap at 3 blocks further progress). + if got := recv.ReceivedLSN(); got != 2 { + t.Fatalf("ReceivedLSN = %d after gap, want 2", got) + } + + // Barrier(4) must time out since LSN 3 is missing. + ctrlConn, err := net.Dial("tcp", recv.CtrlAddr()) + if err != nil { + t.Fatalf("dial ctrl: %v", err) + } + defer ctrlConn.Close() + + req := EncodeBarrierRequest(BarrierRequest{LSN: 4, Epoch: 1}) + if err := WriteFrame(ctrlConn, MsgBarrierReq, req); err != nil { + t.Fatalf("send barrier: %v", err) + } + + ctrlConn.SetReadDeadline(time.Now().Add(3 * time.Second)) + _, payload, err := ReadFrame(ctrlConn) + if err != nil { + t.Fatalf("read barrier response: %v", err) + } + if payload[0] == BarrierOK { + t.Error("barrier returned OK despite gap at LSN 3, expected timeout") + } + // receivedLSN should still be 2. + if got := recv.ReceivedLSN(); got != 2 { + t.Errorf("ReceivedLSN = %d after barrier timeout, want 2", got) + } +} + +// A2: Data channel works, ctrl listener closed before Barrier. +// Barrier must return error. Data entries still applied. +func testQA4b4ShipDataOkCtrlDown(t *testing.T) { + primary, replica := createReplicaPair(t) + defer primary.Close() + defer replica.Close() + + // Write data through primary -> shipped to replica via data channel. + for i := 0; i < 5; i++ { + if err := primary.WriteLBA(uint64(i), makeBlock(byte('A'+i))); err != nil { + t.Fatalf("WriteLBA(%d): %v", i, err) + } + } + time.Sleep(50 * time.Millisecond) + + // Verify data entries were applied to replica. + replicaLSN := replica.replRecv.ReceivedLSN() + if replicaLSN < 5 { + t.Fatalf("replica receivedLSN = %d, want >= 5", replicaLSN) + } + + // Kill ctrl listener to make barrier fail. + replica.replRecv.ctrlListener.Close() + + // Force primary shipper to reconnect ctrl. + primary.shipper.ctrlMu.Lock() + if primary.shipper.ctrlConn != nil { + primary.shipper.ctrlConn.Close() + primary.shipper.ctrlConn = nil + } + primary.shipper.ctrlMu.Unlock() + + // Barrier must fail (ctrl down). + err := primary.shipper.Barrier(replicaLSN) + if err == nil { + t.Error("Barrier should fail when ctrl listener is closed") + } + + // Data entries should still be applied (receivedLSN unchanged or higher). + if got := replica.replRecv.ReceivedLSN(); got < replicaLSN { + t.Errorf("receivedLSN regressed from %d to %d after ctrl failure", replicaLSN, got) + } +} + +// A3: Ctrl is up, data listener closed. Ship must fail, barrier may timeout. +func testQA4b4ShipCtrlOkDataDown(t *testing.T) { + dir := t.TempDir() + cfg := DefaultConfig() + cfg.FlushInterval = 5 * time.Millisecond + + vol, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{ + VolumeSize: 1 * 1024 * 1024, BlockSize: 4096, WALSize: 256 * 1024, + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer vol.Close() + if err := vol.SetEpoch(1); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + + recv, err := NewReplicaReceiver(vol, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatalf("NewReplicaReceiver: %v", err) + } + recv.barrierTimeout = 200 * time.Millisecond + recv.Serve() + defer recv.Stop() + + ctrlAddr := recv.CtrlAddr() + + // Close data listener -- data conn will fail. + recv.dataListener.Close() + + // Create shipper pointed at closed data port, working ctrl port. + shipper := NewWALShipper(recv.DataAddr(), ctrlAddr, func() uint64 { return 1 }) + defer shipper.Stop() + + // Ship should fail (can't connect data), shipper degrades. + entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('X')} + _ = shipper.Ship(entry) + + // receivedLSN should stay 0. + if got := recv.ReceivedLSN(); got != 0 { + t.Errorf("ReceivedLSN = %d, want 0 (data channel was down)", got) + } +} + +// ========== Group B: Protocol Edge Cases ========== + +// B1: Ship a large WAL entry through the frame codec and verify it round-trips. +// Uses WriteFrame/ReadFrame directly to test near-max frame payload. +// Then ships a multi-block entry through a real replica receiver. +func testQA4b4ShipLargeEntryNearMax(t *testing.T) { + // Part 1: Frame codec round-trip with a large payload (1MB). + const largeSize = 1 * 1024 * 1024 + largePayload := make([]byte, largeSize) + for i := range largePayload { + largePayload[i] = byte('L') + } + + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + writeDone := make(chan error, 1) + go func() { + writeDone <- WriteFrame(client, MsgWALEntry, largePayload) + }() + + msgType, payload, err := ReadFrame(server) + if err != nil { + t.Fatalf("ReadFrame large: %v", err) + } + if msgType != MsgWALEntry { + t.Errorf("msgType = 0x%02x, want MsgWALEntry", msgType) + } + if len(payload) != largeSize { + t.Errorf("payload len = %d, want %d", len(payload), largeSize) + } + if payload[0] != 'L' || payload[largeSize-1] != 'L' { + t.Error("payload data corrupted") + } + + if err := <-writeDone; err != nil { + t.Fatalf("WriteFrame: %v", err) + } + + // Part 2: Ship a multi-block entry (8 blocks = 32KB) through a real receiver. + dir := t.TempDir() + cfg := DefaultConfig() + cfg.FlushInterval = 5 * time.Millisecond + + vol, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{ + VolumeSize: 1 * 1024 * 1024, BlockSize: 4096, WALSize: 512 * 1024, + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer vol.Close() + if err := vol.SetEpoch(1); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + + recv, err := NewReplicaReceiver(vol, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatalf("NewReplicaReceiver: %v", err) + } + recv.Serve() + defer recv.Stop() + + dataConn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatalf("dial data: %v", err) + } + defer dataConn.Close() + + // 8 blocks = 32KB data. + const blockCount = 8 + bigData := make([]byte, blockCount*4096) + for i := range bigData { + bigData[i] = byte('B') + } + + entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: uint32(len(bigData)), Data: bigData} + encoded, err := entry.Encode() + if err != nil { + t.Fatalf("Encode: %v", err) + } + + if err := WriteFrame(dataConn, MsgWALEntry, encoded); err != nil { + t.Fatalf("WriteFrame: %v", err) + } + + time.Sleep(100 * time.Millisecond) + + if got := recv.ReceivedLSN(); got != 1 { + t.Errorf("ReceivedLSN = %d, want 1 (multi-block entry should be applied)", got) + } +} + +// B2: Write enough entries to force WAL wrap, ship all, verify replica has data. +func testQA4b4ShipEntriesAcrossWALWrap(t *testing.T) { + primary, replica := createReplicaPair(t) + defer primary.Close() + defer replica.Close() + + // createReplicaPair uses WALSize=512KB, VolumeSize=1MB. + // Each write is 4KB WAL entry + header. Fill enough to wrap. + // ~512KB WAL / ~4KB per entry ≈ 128 entries before wrap, but flusher + // reclaims space so we can write more. + const numWrites = 50 + for i := 0; i < numWrites; i++ { + if err := primary.WriteLBA(uint64(i%256), makeBlock(byte('W'+i%26))); err != nil { + // WAL full is acceptable if flusher can't keep up. + t.Logf("WriteLBA(%d): %v (expected if WAL fills)", i, err) + break + } + } + + // SyncCache to push barrier. + if err := primary.SyncCache(); err != nil { + // Degraded replica is ok -- we test that entries shipped correctly. + t.Logf("SyncCache: %v (may degrade)", err) + } + + time.Sleep(100 * time.Millisecond) + + // Replica should have received entries. + replicaLSN := replica.replRecv.ReceivedLSN() + if replicaLSN == 0 { + t.Error("replica receivedLSN = 0, expected > 0 after writes across WAL") + } + + // Verify first few LBAs on replica. + for i := 0; i < 5 && uint64(i) <= replicaLSN; i++ { + data, err := replica.ReadLBA(uint64(i), 4096) + if err != nil { + t.Fatalf("replica ReadLBA(%d): %v", i, err) + } + if data[0] == 0 { + t.Errorf("replica LBA %d is zero, expected non-zero data", i) + } + } +} + +// ========== Group C: Rebuild Scenarios ========== + +// C1: Start rebuild while primary receives concurrent writes. +func testQA4b4RebuildCatchupConcurrentWrites(t *testing.T) { + primary := cp3Primary(t, "rb_cc_pri.bv", 1) + defer primary.Close() + + // Write initial data. + for i := 0; i < 5; i++ { + primary.WriteLBA(uint64(i), makeBlock(byte('A'+i))) + } + + primary.StartRebuildServer("127.0.0.1:0") + defer primary.StopRebuildServer() + + // Start concurrent writes on primary while rebuild happens. + var wg sync.WaitGroup + stopWriting := make(chan struct{}) + wg.Add(1) + go func() { + defer wg.Done() + lba := uint64(10) + for { + select { + case <-stopWriting: + return + default: + } + primary.WriteLBA(lba, makeBlock(byte('C'))) + lba++ + if lba > 14 { + lba = 10 + } + time.Sleep(1 * time.Millisecond) + } + }() + + // Start rebuild client. + replica := cp3Rebuilding(t, "rb_cc_rep.bv", 1) + defer replica.Close() + + err := StartRebuild(replica, primary.rebuildServer.Addr(), 1, 1) + close(stopWriting) + wg.Wait() + + if err != nil { + t.Fatalf("StartRebuild: %v", err) + } + if replica.Role() != RoleReplica { + t.Errorf("role = %s, want Replica", replica.Role()) + } + + // Verify initial data on replica. + for i := 0; i < 5; i++ { + data, err := replica.ReadLBA(uint64(i), 4096) + if err != nil { + t.Fatalf("ReadLBA(%d): %v", i, err) + } + if data[0] != byte('A'+i) { + t.Errorf("LBA %d: got %c, want %c", i, data[0], byte('A'+i)) + } + } +} + +// C2: Write initial data, start full extent rebuild, concurrent writes during copy. +func testQA4b4RebuildFullExtentMidcopyWrites(t *testing.T) { + primary := cp3Primary(t, "rb_fe_pri.bv", 1) + defer primary.Close() + + // Write and flush to make data in extent. + for i := 0; i < 5; i++ { + primary.WriteLBA(uint64(i), makeBlock(byte('D'+i))) + } + primary.SyncCache() + time.Sleep(30 * time.Millisecond) // let flusher move to extent + + primary.StartRebuildServer("127.0.0.1:0") + defer primary.StopRebuildServer() + + // Start concurrent writes during rebuild. + var wg sync.WaitGroup + stopWriting := make(chan struct{}) + wg.Add(1) + go func() { + defer wg.Done() + lba := uint64(10) + for { + select { + case <-stopWriting: + return + default: + } + primary.WriteLBA(lba, makeBlock(byte('M'))) + lba++ + if lba > 14 { + lba = 10 + } + time.Sleep(1 * time.Millisecond) + } + }() + + // Force full extent rebuild (fromLSN=0 below checkpoint). + replica := cp3Rebuilding(t, "rb_fe_rep.bv", 1) + defer replica.Close() + + err := StartRebuild(replica, primary.rebuildServer.Addr(), 0, 1) + close(stopWriting) + wg.Wait() + + if err != nil { + t.Fatalf("StartRebuild: %v", err) + } + if replica.Role() != RoleReplica { + t.Errorf("role = %s, want Replica", replica.Role()) + } + + // Verify flushed data on replica (from extent). + for i := 0; i < 5; i++ { + data, err := replica.ReadLBA(uint64(i), 4096) + if err != nil { + t.Fatalf("ReadLBA(%d): %v", i, err) + } + if data[0] != byte('D'+i) { + t.Errorf("LBA %d: got %c, want %c", i, data[0], byte('D'+i)) + } + } +} + +// C3: Start rebuild, read half the entries, close connection, retry from start. +func testQA4b4RebuildInterruptedRetry(t *testing.T) { + primary := cp3Primary(t, "rb_int_pri.bv", 1) + defer primary.Close() + + for i := 0; i < 10; i++ { + primary.WriteLBA(uint64(i), makeBlock(byte('R'+i%26))) + } + + primary.StartRebuildServer("127.0.0.1:0") + defer primary.StopRebuildServer() + + // First attempt: connect, read a few entries, then close. + conn1, err := net.Dial("tcp", primary.rebuildServer.Addr()) + if err != nil { + t.Fatalf("dial 1: %v", err) + } + req := EncodeRebuildRequest(RebuildRequest{Type: RebuildWALCatchUp, FromLSN: 1, Epoch: 1}) + WriteFrame(conn1, MsgRebuildReq, req) + + // Read a few entries then abort. + for i := 0; i < 3; i++ { + _, _, err := ReadFrame(conn1) + if err != nil { + t.Fatalf("read entry %d: %v", i, err) + } + } + conn1.Close() // simulate crash + + // Second attempt: full rebuild should succeed. + replica := cp3Rebuilding(t, "rb_int_rep.bv", 1) + defer replica.Close() + + if err := StartRebuild(replica, primary.rebuildServer.Addr(), 1, 1); err != nil { + t.Fatalf("StartRebuild (retry): %v", err) + } + if replica.Role() != RoleReplica { + t.Errorf("role = %s, want Replica", replica.Role()) + } + + // Verify all data. + for i := 0; i < 10; i++ { + data, err := replica.ReadLBA(uint64(i), 4096) + if err != nil { + t.Fatalf("ReadLBA(%d): %v", i, err) + } + expected := byte('R' + i%26) + if data[0] != expected { + t.Errorf("LBA %d: got %c, want %c", i, data[0], expected) + } + } +} + +// ========== Group D: Edge Interactions ========== + +// D1: Write blocks, flush to reclaim WAL, concurrent ReadLBA on flushed blocks. +func testQA4b4ReadDuringWALReclaim(t *testing.T) { + vol := cp3Primary(t, "read_reclaim.bv", 1) + defer vol.Close() + + // Write data. + for i := 0; i < 10; i++ { + if err := vol.WriteLBA(uint64(i), makeBlock(byte('R'+i%26))); err != nil { + t.Fatalf("WriteLBA(%d): %v", i, err) + } + } + + // SyncCache + wait for flusher to reclaim WAL. + vol.SyncCache() + time.Sleep(50 * time.Millisecond) + + // Concurrent reads on recently-flushed blocks. + var wg sync.WaitGroup + errs := make([]error, 10) + for i := 0; i < 10; i++ { + wg.Add(1) + go func(lba int) { + defer wg.Done() + data, err := vol.ReadLBA(uint64(lba), 4096) + if err != nil { + errs[lba] = err + return + } + expected := byte('R' + lba%26) + if data[0] != expected { + errs[lba] = fmt.Errorf("LBA %d: got %c, want %c", lba, data[0], expected) + } + }(i) + } + wg.Wait() + + for i, err := range errs { + if err != nil { + t.Errorf("read LBA %d: %v", i, err) + } + } +} + +// D2: Write blocks, TRIM some, rebuild replica, verify trimmed blocks read as zeros. +func testQA4b4TrimBlocksSurviveRebuild(t *testing.T) { + primary := cp3Primary(t, "trim_rb_pri.bv", 1) + defer primary.Close() + + // Write 10 blocks. + for i := 0; i < 10; i++ { + primary.WriteLBA(uint64(i), makeBlock(byte('T'+i%26))) + } + + // TRIM blocks 3-5. + for i := 3; i <= 5; i++ { + if err := primary.Trim(uint64(i), 4096); err != nil { + t.Fatalf("Trim(%d): %v", i, err) + } + } + + primary.StartRebuildServer("127.0.0.1:0") + defer primary.StopRebuildServer() + + replica := cp3Rebuilding(t, "trim_rb_rep.bv", 1) + defer replica.Close() + + if err := StartRebuild(replica, primary.rebuildServer.Addr(), 1, 1); err != nil { + t.Fatalf("StartRebuild: %v", err) + } + + // Trimmed blocks should read as zeros. + for i := 3; i <= 5; i++ { + data, err := replica.ReadLBA(uint64(i), 4096) + if err != nil { + t.Fatalf("ReadLBA(%d): %v", i, err) + } + nonZero := false + for _, b := range data { + if b != 0 { + nonZero = true + break + } + } + if nonZero { + t.Errorf("LBA %d should be zero after TRIM+rebuild, got non-zero data", i) + } + } + + // Non-trimmed blocks should still have data. + data0, _ := replica.ReadLBA(0, 4096) + if data0[0] != 'T' { + t.Errorf("LBA 0: got %c, want 'T'", data0[0]) + } +} + +// D3: Fill WAL, start WriteLBA (blocks in WAL-full retry), concurrently demote. +func testQA4b4DemoteWhileWALFull(t *testing.T) { + // Create volume with tiny WAL. + dir := t.TempDir() + cfg := DefaultConfig() + cfg.FlushInterval = 100 * time.Millisecond // slow flusher + cfg.WALFullTimeout = 2 * time.Second + + vol, err := CreateBlockVol(filepath.Join(dir, "demote_walful.bv"), CreateOptions{ + VolumeSize: 64 * 1024, + BlockSize: 4096, + WALSize: 16 * 1024, // tiny WAL: ~4 entries + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer vol.Close() + + if err := vol.HandleAssignment(1, RolePrimary, 30*time.Second); err != nil { + t.Fatalf("assign primary: %v", err) + } + vol.drainTimeout = 3 * time.Second + + // Fill WAL. + writeCount := 0 + for i := 0; i < 20; i++ { + if err := vol.WriteLBA(uint64(i%16), makeBlock(byte('F'))); err != nil { + break + } + writeCount++ + } + if writeCount < 2 { + t.Fatalf("only wrote %d entries before WAL full", writeCount) + } + + // Start a WriteLBA that will likely block in WAL-full retry. + writeDone := make(chan error, 1) + go func() { + writeDone <- vol.WriteLBA(0, makeBlock('Z')) + }() + + // Give the write a moment to start retrying. + time.Sleep(10 * time.Millisecond) + + // Concurrently demote. + demoteDone := make(chan error, 1) + go func() { + demoteDone <- vol.HandleAssignment(2, RoleStale, 0) + }() + + // Both should complete within timeout (no deadlock). + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + + select { + case err := <-demoteDone: + // Demote may succeed or fail with drain timeout -- both OK. + t.Logf("demote result: %v", err) + case <-timer.C: + t.Fatal("demote hung for 5s -- deadlock between WAL-full retry and demote drain") + } + + select { + case err := <-writeDone: + // Write may fail (demoted, closed, WAL full timeout) -- all acceptable. + t.Logf("write result: %v", err) + case <-timer.C: + t.Fatal("write hung for 5s after demote") + } +} + +// ========== Group E: Error Injection ========== + +// E1: Start rebuild server on primary (epoch=1), client sends epoch=2. +// Server should return EPOCH_MISMATCH. +func testQA4b4RebuildServerEpochMismatch(t *testing.T) { + vol := cp3Primary(t, "rbs_epm.bv", 1) + defer vol.Close() + + vol.StartRebuildServer("127.0.0.1:0") + defer vol.StopRebuildServer() + + // Client sends mismatched epoch=2. + conn, err := net.Dial("tcp", vol.rebuildServer.Addr()) + if err != nil { + t.Fatalf("dial: %v", err) + } + defer conn.Close() + + req := EncodeRebuildRequest(RebuildRequest{Type: RebuildWALCatchUp, FromLSN: 1, Epoch: 2}) + if err := WriteFrame(conn, MsgRebuildReq, req); err != nil { + t.Fatalf("send request: %v", err) + } + + conn.SetReadDeadline(time.Now().Add(3 * time.Second)) + msgType, payload, err := ReadFrame(conn) + if err != nil { + t.Fatalf("read response: %v", err) + } + if msgType != MsgRebuildError { + t.Errorf("msgType = 0x%02x, want MsgRebuildError(0x%02x)", msgType, MsgRebuildError) + } + if string(payload) != "EPOCH_MISMATCH" { + t.Errorf("payload = %q, want EPOCH_MISMATCH", string(payload)) + } + + // Also test epoch=0 (lower than server's epoch=1). + conn2, err := net.Dial("tcp", vol.rebuildServer.Addr()) + if err != nil { + t.Fatalf("dial 2: %v", err) + } + defer conn2.Close() + + req2 := EncodeRebuildRequest(RebuildRequest{Type: RebuildFullExtent, Epoch: 0}) + if err := WriteFrame(conn2, MsgRebuildReq, req2); err != nil { + t.Fatalf("send request 2: %v", err) + } + + conn2.SetReadDeadline(time.Now().Add(3 * time.Second)) + msgType2, payload2, err := ReadFrame(conn2) + if err != nil { + t.Fatalf("read response 2: %v", err) + } + if msgType2 != MsgRebuildError || string(payload2) != "EPOCH_MISMATCH" { + t.Errorf("epoch=0: type=0x%02x payload=%q, want EPOCH_MISMATCH", msgType2, payload2) + } +} + diff --git a/weed/storage/blockvol/test/ha_test.go b/weed/storage/blockvol/test/ha_test.go index 5ce0e0044..4f4d54224 100644 --- a/weed/storage/blockvol/test/ha_test.go +++ b/weed/storage/blockvol/test/ha_test.go @@ -24,10 +24,10 @@ const ( haISCSIPort2 = 3261 // replica iSCSI (used after promotion) haAdminPort1 = 8080 // primary admin haAdminPort2 = 8081 // replica admin - haReplData1 = 9001 // replica receiver data (on replica node) - haReplCtrl1 = 9002 // replica receiver ctrl (on replica node) - haRebuildPort1 = 9003 // rebuild server (primary) - haRebuildPort2 = 9004 // rebuild server (replica, after promotion) + haReplData1 = 9011 // replica receiver data (on replica node) + haReplCtrl1 = 9012 // replica receiver ctrl (on replica node) + haRebuildPort1 = 9013 // rebuild server (primary) + haRebuildPort2 = 9014 // rebuild server (replica, after promotion) ) // newHAPair creates a primary HATarget on targetNode and a replica HATarget