From 643a5a10742d716354277d6fa26485ec0c3bc70b Mon Sep 17 00:00:00 2001 From: pingqiu Date: Thu, 2 Apr 2026 16:20:22 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=2012=20P3+P4=20=E2=80=94=20diagno?= =?UTF-8?q?sability=20surfaces,=20perf=20floor,=20rollout=20gates?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P3: Add explicit bounded read-only diagnosis surfaces for all symptom classes: - FailoverDiagnostic: volume-oriented failover state with per-volume DeferredPromotion/PendingRebuild entries and proper timer lifecycle - PublicationDiagnostic: two-read coherence check (LookupBlockVolume vs registry authority) with computed Coherent verdict - RecoveryDiagnostic: minimal ActiveTasks surface (Path A) - Blocker ledger: 3 diagnosed + 3 unresolved, finite, from actual file - Runbook references only exposed surfaces, no internal state P4: Add bounded performance floor + rollout-gate package: - Engine-local floor measurement with explicit IOPS gates per workload - Cost characterization: WAL 2x write amp, -56% replication tax - Rollout gates with semantic cross-checks against cited evidence (baseline numbers, transport/network matrix, blocker counts) - Launch envelope tightened to actually measured combinations only Co-Authored-By: Claude Opus 4.6 (1M context) --- .../.private/phase/phase-12-p3-blockers.md | 29 + .../.private/phase/phase-12-p3-runbook.md | 70 +++ sw-block/.private/phase/phase-12-p4-floor.md | 101 +++ .../phase/phase-12-p4-rollout-gates.md | 64 ++ weed/server/block_recovery.go | 374 +++++++++++ weed/server/master_block_failover.go | 192 +++++- weed/server/qa_block_diagnosability_test.go | 270 ++++++++ weed/server/qa_block_perf_test.go | 582 ++++++++++++++++++ weed/server/qa_block_soak_test.go | 308 +++++++++ 9 files changed, 1978 insertions(+), 12 deletions(-) create mode 100644 sw-block/.private/phase/phase-12-p3-blockers.md create mode 100644 sw-block/.private/phase/phase-12-p3-runbook.md create mode 100644 sw-block/.private/phase/phase-12-p4-floor.md create mode 100644 sw-block/.private/phase/phase-12-p4-rollout-gates.md create mode 100644 weed/server/block_recovery.go create mode 100644 weed/server/qa_block_diagnosability_test.go create mode 100644 weed/server/qa_block_perf_test.go create mode 100644 weed/server/qa_block_soak_test.go diff --git a/sw-block/.private/phase/phase-12-p3-blockers.md b/sw-block/.private/phase/phase-12-p3-blockers.md new file mode 100644 index 000000000..97cd1864a --- /dev/null +++ b/sw-block/.private/phase/phase-12-p3-blockers.md @@ -0,0 +1,29 @@ +# Phase 12 P3 — Blocker Ledger + +Date: 2026-04-02 +Scope: bounded diagnosability / blocker accounting for the accepted RF=2 sync_all chosen path + +## Diagnosed and Bounded + +| ID | Symptom | Evidence Surface | Owning Truth | Status | +|----|---------|-----------------|--------------|--------| +| B1 | Failover does not converge | failover logs + registry Lookup epoch/primary | registry authority | Diagnosed: convergence depends on lease expiry + heartbeat cycle; bounded by lease TTL | +| B2 | Lookup publication stale after failover | LookupBlockVolume response vs registry entry | registry ISCSIAddr/VolumeServer | Diagnosed: publication updates on failover assignment delivery; bounded by assignment queue delivery | +| B3 | Recovery tasks remain after volume delete | RecoveryManager.DiagnosticSnapshot | RecoveryManager task map | Diagnosed: tasks drain on shutdown/cancel; bounded by RecoveryManager lifecycle | + +## Unresolved but Explicit + +| ID | Symptom | Current Evidence | Why Unresolved | Blocks P4/Rollout? | +|----|---------|-----------------|----------------|-------------------| +| U1 | V2 engine accepts stale-epoch assignments at orchestrator level | V2 idempotence check skips only same-epoch; lower epoch creates new sender | Engine ApplyAssignment does not check epoch monotonicity on Reconcile | No — V1 HandleAssignment rejects epoch regression; V2 is secondary | +| U2 | Single-process test cannot exercise Primary→Rebuilding role transition | HandleAssignment rejects transition in shared store | Test harness limitation, not production bug | No — production VS has separate stores | +| U3 | gRPC stream transport not exercised in control-loop tests | All logic above/below stream is real; stream itself bypassed | Would require live master+VS gRPC servers in test | Blocks full integration test, not correctness | + +## Out of Scope for P3 + +- Performance floor characterization +- Rollout-gate criteria +- Hours/days soak +- RF>2 topology +- NVMe runtime transport proof +- CSI snapshot/expand diff --git a/sw-block/.private/phase/phase-12-p3-runbook.md b/sw-block/.private/phase/phase-12-p3-runbook.md new file mode 100644 index 000000000..a27c6d2e9 --- /dev/null +++ b/sw-block/.private/phase/phase-12-p3-runbook.md @@ -0,0 +1,70 @@ +# Phase 12 P3 — Bounded Runbook + +Scope: diagnosis of three symptom classes on the accepted RF=2 sync_all chosen path. + +All diagnosis steps reference ONLY explicit bounded read-only surfaces: +- `LookupBlockVolume` — gRPC RPC returning current primary VS + iSCSI address +- `FailoverDiagnostic` — volume-oriented failover state snapshot +- `PublicationDiagnostic` — lookup vs authority coherence snapshot +- `RecoveryDiagnostic` — active recovery task set snapshot +- Blocker ledger — finite file at `phase-12-p3-blockers.md` + +## S1: Failover/Recovery Convergence Stall + +**Visible symptom:** Volume remains unavailable after a VS death; lookup still returns the old primary. + +**Diagnosis surfaces:** +- `LookupBlockVolume(volumeName)` — check if `VolumeServer` is still the dead server +- `FailoverDiagnostic` — check `Volumes[]` for the affected volume + +**Diagnosis steps:** +1. Call `LookupBlockVolume(volumeName)`. If `VolumeServer` changed from the dead server, failover succeeded. +2. If unchanged: read `FailoverDiagnostic`. Find the volume by name in `Volumes[]`. +3. If found with `DeferredPromotion=true`: lease-wait — failover is deferred until lease expires. +4. If found with `PendingRebuild=true`: failover completed, rebuild is pending for the dead server. +5. If `DeferredPromotionCount[deadServer] > 0` in the aggregate: deferred promotions are queued. +6. If the volume does not appear in either lookup change or `FailoverDiagnostic`: escalate. + +**Conclusion classes (from surfaces only):** +- **Lease-wait:** `FailoverDiagnostic.DeferredPromotionCount[deadServer] > 0` — normal, bounded by lease TTL. +- **Rebuild-pending:** `FailoverDiagnostic.Volumes[].PendingRebuild=true` — failover done, rebuild queued. +- **Converged:** `LookupBlockVolume` shows new primary, no failover entries — resolved. +- **Unresolved:** None of the above — escalate. + +## S2: Publication/Lookup Mismatch + +**Visible symptom:** `LookupBlockVolume` returns an iSCSI address or volume server that doesn't match expected state. + +**Diagnosis surfaces:** +- `LookupBlockVolume(volumeName)` — operator-visible publication +- `PublicationDiagnostic` — explicit coherence check (lookup vs authority) + +**Diagnosis steps:** +1. Call `PublicationDiagnosticFor(volumeName)`. Check `Coherent` field. +2. If `Coherent=true`: lookup matches registry authority — no mismatch. +3. If `Coherent=false`: read `Reason` for explanation. Compare `LookupVolumeServer` vs `AuthorityVolumeServer` and `LookupIscsiAddr` vs `AuthorityIscsiAddr`. +4. Cross-check with `LookupBlockVolume` directly: repeated lookups should be self-consistent. + +**Conclusion classes (from surfaces only):** +- **Coherent:** `PublicationDiagnostic.Coherent=true` — no mismatch. +- **Stale client:** Coherent but client sees old value — bounded by client re-query. +- **Unresolved:** `PublicationDiagnostic.Coherent=false` with no transient cause — escalate. + +## S3: Leftover Runtime Work After Convergence + +**Visible symptom:** After volume deletion or steady-state convergence, recovery tasks should have drained. + +**Diagnosis surfaces:** +- `RecoveryDiagnostic` — `ActiveTasks` list (replicaIDs with active recovery work) + +**Diagnosis steps:** +1. Call `RecoveryManager.DiagnosticSnapshot()`. Read `ActiveTasks`. +2. If `ActiveTasks` is empty: clean — no leftover work. +3. If non-empty: check whether any task replicaID contains the deleted volume's path. +4. If a deleted volume's replicaID is present in `ActiveTasks`: residue — escalate. +5. If all tasks are for live volumes: non-empty but expected — normal in-flight work. + +**Conclusion classes (from surfaces only):** +- **Clean:** `RecoveryDiagnostic.ActiveTasks` is empty — runtime converged. +- **Non-empty, no residue:** Tasks present but none for the deleted/converged volume — normal. +- **Residue:** Deleted volume's replicaID still in `ActiveTasks` — escalate. diff --git a/sw-block/.private/phase/phase-12-p4-floor.md b/sw-block/.private/phase/phase-12-p4-floor.md new file mode 100644 index 000000000..feb9f23d6 --- /dev/null +++ b/sw-block/.private/phase/phase-12-p4-floor.md @@ -0,0 +1,101 @@ +# Phase 12 P4 — Performance Floor Summary + +Date: 2026-04-02 +Scope: bounded performance floor for the accepted RF=2, sync_all chosen path. + +## Workload Envelope + +| Parameter | Value | +|-----------|-------| +| Topology | RF=2, sync_all | +| Operations | 4K random write, 4K random read, sequential write, sequential read | +| Runtime | Steady-state, no failover, no disturbance | +| Path | Accepted chosen path (same as P1/P2/P3) | + +## Environment + +### Unit Test Harness (engine-local) + +| Parameter | Value | +|-----------|-------| +| Name | `TestP12P4_PerformanceFloor_Bounded` | +| Location | `weed/server/qa_block_perf_test.go` | +| Platform | Single-process, local disk | +| Volume | 64MB, 4K blocks, 16MB WAL | +| Writer | Single-threaded (worst-case for group commit) | +| Replication | Not exercised (engine-local only) | +| Measurement | Worst of 3 iterations (floor, not peak) | + +### Production Baseline (cross-machine) + +| Parameter | Value | +|-----------|-------| +| Name | `baseline-roce-20260401` | +| Location | `learn/projects/sw-block/test/results/baseline-roce-20260401.md` | +| Hardware | m01 (10.0.0.1) - M02 (10.0.0.3), 25Gbps RoCE | +| Protocol | NVMe-TCP | +| Volume | 2GB, RF=2, sync_all, cross-machine replication | +| Writer | fio, QD1-128, j=4 | + +## Floor Table: Production (RF=2, sync_all, NVMe-TCP, 25Gbps RoCE) + +These are measured floor values from the production baseline, not the unit test. + +| Workload | Floor IOPS | Notes | +|----------|-----------|-------| +| 4K random write QD1 | 28,347 | Barrier round-trip limited (flat across QD) | +| 4K random write QD32 | 28,453 | Same barrier ceiling | +| 4K random read QD32 | 136,648 | No replication overhead | +| Mixed 70/30 QD32 | 28,423 | Write-side limited | + +Latency: Write latency is bounded by sync_all barrier round-trip (~35us at QD1). +Read latency: sub-microsecond for cached, single-digit microseconds for extent. + +## Floor Table: Engine-Local (unit test harness) + +These values are measured by `TestP12P4_PerformanceFloor_Bounded` on the dev machine. +They characterize the engine I/O floor WITHOUT transport or replication. +Actual values vary by hardware; the test produces them on each run. + +| Workload | Metric | Method | Gate | +|----------|--------|--------|------| +| 4K random write | Floor IOPS, Avg/P50/P99/Max latency | Worst of 3 iterations | >= 1,000 IOPS, P99 <= 100ms | +| 4K random read | Floor IOPS, Avg/P50/P99/Max latency | Worst of 3 iterations | >= 5,000 IOPS | +| 4K sequential write | Floor IOPS, Avg/P50/P99/Max latency | Worst of 3 iterations | >= 2,000 IOPS, P99 <= 100ms | +| 4K sequential read | Floor IOPS, Avg/P50/P99/Max latency | Worst of 3 iterations | >= 10,000 IOPS | + +Gate thresholds are regression gates enforced in code (`perfFloorGates` in `qa_block_perf_test.go`). +Set at ~10% of measured values to tolerate slow CI/VM hardware while catching catastrophic regressions. + +## Cost Summary + +| Cost | Value | Source | +|------|-------|--------| +| WAL write amplification | 2x minimum | Engine design: each write → WAL + eventual extent flush | +| Replication tax (RF=2 sync_all vs RF=1) | -56% | baseline-roce-20260401.md (NVMe-TCP, 25Gbps RoCE) | +| Replication tax (RF=2 sync_all vs RF=1, iSCSI 1Gbps) | -56% | baseline-roce-20260401.md | +| Degraded mode penalty (sync_all RF=2, one replica dead) | -66% | baseline-roce-20260401.md (barrier timeout) | +| Group commit | 1 fdatasync per batch | Amortizes sync cost across concurrent writers | + +## Acceptance Evidence + +| Item | Evidence | Type | +|------|----------|------| +| Floor gates pass | `perfFloorGates` thresholds enforced per workload | Acceptance | +| Workload runs repeatably | `TestP12P4_PerformanceFloor_Bounded` passes | Acceptance | +| Cost statement is bounded | `TestP12P4_CostCharacterization_Bounded` passes | Acceptance | +| Production baseline exists | `baseline-roce-20260401.md` with measured values | Acceptance | +| Floor is worst-of-N, not peak | Test takes minimum IOPS across 3 iterations | Method | +| Regression-safe | Test fails if floor drops below gate (blocks rollout) | Acceptance | +| Replication tax documented | -56% from measured production baseline | Support telemetry | + +## What P4 does NOT claim + +- This is not a claim that the measured floor is "good enough" for any specific application. +- This does not claim readiness for failover-under-load scenarios. +- This does not claim readiness for hours/days soak under load. +- This does not claim readiness for RF>2 topologies. +- This does not claim readiness for all transport combinations (iSCSI + NVMe + kernel versions). +- This does not claim readiness for production rollout beyond the explicitly named launch envelope. +- Engine-local floor numbers are not production floor numbers. +- The replication tax is measured on one specific hardware configuration and may differ on other hardware. diff --git a/sw-block/.private/phase/phase-12-p4-rollout-gates.md b/sw-block/.private/phase/phase-12-p4-rollout-gates.md new file mode 100644 index 000000000..643a9e50c --- /dev/null +++ b/sw-block/.private/phase/phase-12-p4-rollout-gates.md @@ -0,0 +1,64 @@ +# Phase 12 P4 — Rollout Gates + +Date: 2026-04-02 +Scope: bounded first-launch envelope for the accepted RF=2, sync_all chosen path. + +This is a bounded first-launch envelope, not general readiness. + +## Supported Launch Envelope + +Only the transport/network combinations with measured baselines are included. + +| Parameter | Value | +|-----------|-------| +| Topology | RF=2, sync_all | +| Transport + Network | NVMe-TCP @ 25Gbps RoCE (measured), iSCSI @ 25Gbps RoCE (measured), iSCSI @ 1Gbps (measured) | +| NOT included | NVMe-TCP @ 1Gbps (not measured) | +| Volume size | Up to 2GB (tested baseline) | +| Failover | Lease-based, bounded by TTL (30s default) | +| Recovery | Catch-up-first, rebuild fallback | +| Degraded mode | Documented -66% write penalty (sync_all RF=2, one replica dead) | + +## Cleared Gates + +| Gate | Evidence | Status | Notes | +|------|----------|--------|-------| +| G1 | P1 disturbance tests pass | Cleared | Restart/reconnect correctness under disturbance | +| G2 | P2 soak tests pass | Cleared | Repeated create/failover/recover cycles, no drift | +| G3 | P3 diagnosability tests pass | Cleared | Explicit bounded diagnosis surfaces for all symptom classes | +| G4 | P4 floor gates pass | Cleared | Explicit IOPS thresholds + P99 ceilings enforced per workload in code | +| G5 | P4 cost characterization bounded | Cleared | WAL 2x write amp, -56% replication tax documented | +| G6 | Production baseline exists | Cleared | baseline-roce-20260401.md: 28.4K write IOPS, 136.6K read IOPS | +| G8 | Floor gates are regression-safe | Cleared | Test fails if any workload drops below defined minimum IOPS or exceeds P99 ceiling | +| G7 | Blocker ledger finite | Cleared | 3 diagnosed (B1-B3) + 3 unresolved (U1-U3), all explicit | + +## Remaining Blockers / Exclusions + +| Exclusion | Why | Impact | +|-----------|-----|--------| +| E1 | Failover-under-load perf not measured | Cannot claim bounded perf during failover | +| E2 | Hours/days soak not run | Cannot claim long-run stability under sustained load | +| E3 | RF>2 not measured | Cannot claim perf floor for RF=3+ | +| E4 | Broad transport matrix not tested | Cannot claim parity across all kernel/NVMe/iSCSI versions | +| E5 | Degraded mode is severe (-66%) | sync_all RF=2 has sharp write cliff on replica death | +| E6 | V2 stale-epoch at orchestrator level (U1 from P3) | V1 guards suffice; V2 is secondary path | +| E7 | gRPC stream transport not exercised in unit tests (U3 from P3) | Blocks full integration test, not correctness | + +## Reject Conditions + +This launch envelope should be REJECTED if: + +1. Any P1/P2/P3 test regresses (correctness/stability/diagnosability gate violated) +2. Production baseline numbers are not reproducible on the target hardware +3. Degraded mode behavior (-66% cliff) is not acceptable for the deployment scenario +4. The deployment requires RF>2, failover-under-load guarantees, or long soak proof +5. The deployment requires transport combinations not covered by the baseline + +## What P4 does NOT claim + +- This does not claim general production readiness. +- This does not claim readiness for any deployment outside the named launch envelope. +- This does not claim that the performance floor is optimal or final. +- This does not claim that the degraded-mode penalty is acceptable (deployment-specific decision). +- This does not claim hours/days stability under sustained load. +- This is a bounded first-launch gate, not a broad rollout approval. diff --git a/weed/server/block_recovery.go b/weed/server/block_recovery.go new file mode 100644 index 000000000..8691d52a4 --- /dev/null +++ b/weed/server/block_recovery.go @@ -0,0 +1,374 @@ +package weed_server + +import ( + "context" + "sync" + + bridge "github.com/seaweedfs/seaweedfs/sw-block/bridge/blockvol" + engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" + "github.com/seaweedfs/seaweedfs/weed/storage/blockvol/v2bridge" +) + +// recoveryTask tracks a live recovery goroutine for one replica target. +// The task pointer serves as identity token — only the goroutine that owns +// THIS pointer may mark it as done. +type recoveryTask struct { + replicaID string + cancel context.CancelFunc + done chan struct{} // closed when the goroutine exits +} + +// RecoveryManager owns live recovery execution for all replica targets. +// +// Ownership model: +// - At most one recovery goroutine per replicaID at any time. +// - On supersede/replace: the old goroutine is cancelled AND drained +// before the replacement starts. No overlap. +// - Cancellation: context cancel + session invalidation (for removal/shutdown). +// For supersede: context cancel only (engine already attached replacement session). +type RecoveryManager struct { + bs *BlockService + + mu sync.Mutex + tasks map[string]*recoveryTask + wg sync.WaitGroup + + // TestHook: if set, called before execution starts. Tests use this + // to hold the goroutine alive for serialized-replacement proofs. + OnBeforeExecute func(replicaID string) +} + +func NewRecoveryManager(bs *BlockService) *RecoveryManager { + return &RecoveryManager{ + bs: bs, + tasks: make(map[string]*recoveryTask), + } +} + +// HandleAssignmentResult processes the engine's assignment result. +// +// Engine result semantics: +// - SessionsCreated: new session, start goroutine +// - SessionsSuperseded: old replaced by new — cancel+drain old, start new +// - Removed: sender gone — cancel+drain, invalidate session +func (rm *RecoveryManager) HandleAssignmentResult(result engine.AssignmentResult, assignments []blockvol.BlockVolumeAssignment) { + // Removed: cancel + invalidate + drain. + for _, replicaID := range result.Removed { + rm.cancelAndDrain(replicaID, true) + } + + // Superseded: cancel + drain (no invalidate — engine has replacement session), + // then start new. + for _, replicaID := range result.SessionsSuperseded { + rm.cancelAndDrain(replicaID, false) + rm.startTask(replicaID, assignments) + } + + // Created: start new (cancel stale defensively). + for _, replicaID := range result.SessionsCreated { + rm.cancelAndDrain(replicaID, false) + rm.startTask(replicaID, assignments) + } +} + +// cancelAndDrain cancels a running task and WAITS for it to exit. +// This ensures no overlap between old and new owners. +func (rm *RecoveryManager) cancelAndDrain(replicaID string, invalidateSession bool) { + rm.mu.Lock() + task, ok := rm.tasks[replicaID] + if !ok { + rm.mu.Unlock() + return + } + glog.V(1).Infof("recovery: cancelling+draining task for %s (invalidate=%v)", replicaID, invalidateSession) + task.cancel() + if invalidateSession && rm.bs.v2Orchestrator != nil { + if s := rm.bs.v2Orchestrator.Registry.Sender(replicaID); s != nil { + s.InvalidateSession("recovery_removed", engine.StateDisconnected) + } + } + delete(rm.tasks, replicaID) + doneCh := task.done + rm.mu.Unlock() + + // Wait for the old goroutine to exit OUTSIDE the lock. + // This serializes replacement: new task cannot start until old is fully drained. + <-doneCh +} + +// startTask creates and starts a new recovery goroutine. Caller must ensure +// no existing task for this replicaID (call cancelAndDrain first). +func (rm *RecoveryManager) startTask(replicaID string, assignments []blockvol.BlockVolumeAssignment) { + rm.mu.Lock() + defer rm.mu.Unlock() + + rebuildAddr := rm.deriveRebuildAddr(replicaID, assignments) + + ctx, cancel := context.WithCancel(context.Background()) + task := &recoveryTask{ + replicaID: replicaID, + cancel: cancel, + done: make(chan struct{}), + } + rm.tasks[replicaID] = task + + rm.wg.Add(1) + go rm.runRecovery(ctx, task, rebuildAddr) +} + +// Shutdown cancels all active recovery tasks and waits for drain. +func (rm *RecoveryManager) Shutdown() { + rm.mu.Lock() + for _, task := range rm.tasks { + task.cancel() + if rm.bs.v2Orchestrator != nil { + if s := rm.bs.v2Orchestrator.Registry.Sender(task.replicaID); s != nil { + s.InvalidateSession("recovery_shutdown", engine.StateDisconnected) + } + } + } + rm.tasks = make(map[string]*recoveryTask) + rm.mu.Unlock() + rm.wg.Wait() +} + +// ActiveTaskCount returns the number of active recovery tasks (for testing). +func (rm *RecoveryManager) ActiveTaskCount() int { + rm.mu.Lock() + defer rm.mu.Unlock() + return len(rm.tasks) +} + +// DiagnosticSnapshot returns a bounded read-only snapshot of active recovery +// tasks for operator-visible diagnosis. Each entry shows the replicaID being +// recovered. This is the P3 diagnosability surface — read-only, no semantics. +type RecoveryDiagnostic struct { + ActiveTasks []string // replicaIDs with active recovery work +} + +func (rm *RecoveryManager) DiagnosticSnapshot() RecoveryDiagnostic { + rm.mu.Lock() + defer rm.mu.Unlock() + diag := RecoveryDiagnostic{} + for id := range rm.tasks { + diag.ActiveTasks = append(diag.ActiveTasks, id) + } + return diag +} + +// runRecovery is the recovery goroutine for one replica target. +func (rm *RecoveryManager) runRecovery(ctx context.Context, task *recoveryTask, rebuildAddr string) { + defer rm.wg.Done() + defer close(task.done) // signal drain completion + defer func() { + rm.mu.Lock() + // Only delete if we're still the active task (pointer comparison). + if rm.tasks[task.replicaID] == task { + delete(rm.tasks, task.replicaID) + } + rm.mu.Unlock() + }() + + replicaID := task.replicaID + + if ctx.Err() != nil { + return + } + + orch := rm.bs.v2Orchestrator + s := orch.Registry.Sender(replicaID) + if s == nil { + glog.V(1).Infof("recovery: sender %s not found, skipping", replicaID) + return + } + + sessSnap := s.SessionSnapshot() + if sessSnap == nil { + glog.V(1).Infof("recovery: sender %s has no active session, skipping", replicaID) + return + } + + glog.V(0).Infof("recovery: starting %s session for %s (rebuildAddr=%s)", + sessSnap.Kind, replicaID, rebuildAddr) + + if rm.OnBeforeExecute != nil { + rm.OnBeforeExecute(replicaID) + } + + switch sessSnap.Kind { + case engine.SessionCatchUp: + rm.runCatchUp(ctx, replicaID, rebuildAddr) + case engine.SessionRebuild: + rm.runRebuild(ctx, replicaID, rebuildAddr) + default: + glog.V(1).Infof("recovery: unknown session kind %s for %s", sessSnap.Kind, replicaID) + } +} + +func (rm *RecoveryManager) runCatchUp(ctx context.Context, replicaID, rebuildAddr string) { + bs := rm.bs + volPath := rm.volumePathForReplica(replicaID) + if volPath == "" { + glog.Warningf("recovery: cannot determine volume path for %s", replicaID) + return + } + + var sa engine.StorageAdapter + var replicaFlushedLSN uint64 + var executor *v2bridge.Executor + + if err := bs.blockStore.WithVolume(volPath, func(vol *blockvol.BlockVol) error { + reader := v2bridge.NewReader(vol) + pinner := v2bridge.NewPinner(vol) + sa = bridge.NewStorageAdapter( + &readerShimForRecovery{reader}, + &pinnerShimForRecovery{pinner}, + ) + if s := bs.v2Orchestrator.Registry.Sender(replicaID); s != nil { + if snap := s.SessionSnapshot(); snap != nil { + replicaFlushedLSN = snap.StartLSN + } + } + executor = v2bridge.NewExecutor(vol, rebuildAddr) + return nil + }); err != nil { + glog.Warningf("recovery: cannot access volume %s: %v", volPath, err) + return + } + + if ctx.Err() != nil { + return + } + + driver := &engine.RecoveryDriver{Orchestrator: bs.v2Orchestrator, Storage: sa} + + plan, err := driver.PlanRecovery(replicaID, replicaFlushedLSN) + if err != nil { + glog.Warningf("recovery: plan failed for %s: %v", replicaID, err) + return + } + + if ctx.Err() != nil { + driver.CancelPlan(plan, "context_cancelled") + return + } + + exec := engine.NewCatchUpExecutor(driver, plan) + exec.IO = executor + + if execErr := exec.Execute(nil, 0); execErr != nil { + if ctx.Err() != nil { + glog.V(1).Infof("recovery: catch-up cancelled for %s: %v", replicaID, execErr) + } else { + glog.Warningf("recovery: catch-up execution failed for %s: %v", replicaID, execErr) + } + return + } + + glog.V(0).Infof("recovery: catch-up completed for %s", replicaID) +} + +func (rm *RecoveryManager) runRebuild(ctx context.Context, replicaID, rebuildAddr string) { + bs := rm.bs + volPath := rm.volumePathForReplica(replicaID) + if volPath == "" { + glog.Warningf("recovery: cannot determine volume path for %s", replicaID) + return + } + + var sa engine.StorageAdapter + var executor *v2bridge.Executor + + if err := bs.blockStore.WithVolume(volPath, func(vol *blockvol.BlockVol) error { + reader := v2bridge.NewReader(vol) + pinner := v2bridge.NewPinner(vol) + sa = bridge.NewStorageAdapter( + &readerShimForRecovery{reader}, + &pinnerShimForRecovery{pinner}, + ) + executor = v2bridge.NewExecutor(vol, rebuildAddr) + return nil + }); err != nil { + glog.Warningf("recovery: cannot access volume %s: %v", volPath, err) + return + } + + if ctx.Err() != nil { + return + } + + driver := &engine.RecoveryDriver{Orchestrator: bs.v2Orchestrator, Storage: sa} + + plan, err := driver.PlanRebuild(replicaID) + if err != nil { + glog.Warningf("recovery: rebuild plan failed for %s: %v", replicaID, err) + return + } + + if ctx.Err() != nil { + driver.CancelPlan(plan, "context_cancelled") + return + } + + exec := engine.NewRebuildExecutor(driver, plan) + exec.IO = executor + + if execErr := exec.Execute(); execErr != nil { + if ctx.Err() != nil { + glog.V(1).Infof("recovery: rebuild cancelled for %s: %v", replicaID, execErr) + } else { + glog.Warningf("recovery: rebuild execution failed for %s: %v", replicaID, execErr) + } + return + } + + glog.V(0).Infof("recovery: rebuild completed for %s", replicaID) +} + +func (rm *RecoveryManager) deriveRebuildAddr(replicaID string, assignments []blockvol.BlockVolumeAssignment) string { + volPath := rm.volumePathForReplica(replicaID) + for _, a := range assignments { + if a.Path == volPath && a.RebuildAddr != "" { + return a.RebuildAddr + } + } + return "" +} + +func (rm *RecoveryManager) volumePathForReplica(replicaID string) string { + for i := len(replicaID) - 1; i >= 0; i-- { + if replicaID[i] == '/' { + return replicaID[:i] + } + } + return "" +} + +// --- Bridge shims --- + +type readerShimForRecovery struct{ r *v2bridge.Reader } + +func (s *readerShimForRecovery) ReadState() bridge.BlockVolState { + rs := s.r.ReadState() + return bridge.BlockVolState{ + WALHeadLSN: rs.WALHeadLSN, + WALTailLSN: rs.WALTailLSN, + CommittedLSN: rs.CommittedLSN, + CheckpointLSN: rs.CheckpointLSN, + CheckpointTrusted: rs.CheckpointTrusted, + } +} + +type pinnerShimForRecovery struct{ p *v2bridge.Pinner } + +func (s *pinnerShimForRecovery) HoldWALRetention(startLSN uint64) (func(), error) { + return s.p.HoldWALRetention(startLSN) +} +func (s *pinnerShimForRecovery) HoldSnapshot(checkpointLSN uint64) (func(), error) { + return s.p.HoldSnapshot(checkpointLSN) +} +func (s *pinnerShimForRecovery) HoldFullBase(committedLSN uint64) (func(), error) { + return s.p.HoldFullBase(committedLSN) +} diff --git a/weed/server/master_block_failover.go b/weed/server/master_block_failover.go index 8a97079b6..4d17e773c 100644 --- a/weed/server/master_block_failover.go +++ b/weed/server/master_block_failover.go @@ -1,10 +1,12 @@ package weed_server import ( + "context" "sync" "time" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" ) @@ -18,18 +20,139 @@ type pendingRebuild struct { ReplicaCtrlAddr string // CP13-8: saved from before death for catch-up-first recovery } +// deferredPromotion tracks a deferred promotion timer with its volume context. +type deferredPromotion struct { + Timer *time.Timer + VolumeName string + CurrentPrimary string // current (stale) primary that will be replaced + AffectedServer string // dead server addr +} + // blockFailoverState holds failover and rebuild state on the master. type blockFailoverState struct { mu sync.Mutex - pendingRebuilds map[string][]pendingRebuild // dead server addr -> pending rebuilds - // R2-F2: Track deferred promotion timers so they can be cancelled on reconnect. - deferredTimers map[string][]*time.Timer // dead server addr -> pending timers + pendingRebuilds map[string][]pendingRebuild // dead server addr -> pending rebuilds + deferredTimers map[string][]deferredPromotion // dead server addr -> pending deferred promotions +} + +// FailoverVolumeState is one volume's failover diagnosis entry. +type FailoverVolumeState struct { + VolumeName string + CurrentPrimary string + AffectedServer string // dead server that triggered the failover/rebuild + DeferredPromotion bool // true if a deferred promotion timer is pending + PendingRebuild bool // true if a rebuild is pending for this volume + Reason string // "lease_wait", "rebuild_pending", or "" +} + +// FailoverDiagnostic is a bounded read-only snapshot of failover state +// for operator-visible diagnosis. P3 diagnosability surface. +// +// Volume-oriented: each entry describes one volume's failover state. +// Aggregate counts are derived from the volume list. +type FailoverDiagnostic struct { + Volumes []FailoverVolumeState + PendingRebuildCount map[string]int // dead server → count of pending rebuilds + DeferredPromotionCount map[string]int // dead server → count of deferred promotion timers +} + +func (fs *blockFailoverState) DiagnosticSnapshot() FailoverDiagnostic { + fs.mu.Lock() + defer fs.mu.Unlock() + diag := FailoverDiagnostic{ + PendingRebuildCount: make(map[string]int), + DeferredPromotionCount: make(map[string]int), + } + for server, rebuilds := range fs.pendingRebuilds { + diag.PendingRebuildCount[server] = len(rebuilds) + for _, rb := range rebuilds { + diag.Volumes = append(diag.Volumes, FailoverVolumeState{ + VolumeName: rb.VolumeName, + CurrentPrimary: rb.NewPrimary, + AffectedServer: server, + PendingRebuild: true, + Reason: "rebuild_pending", + }) + } + } + for server, promos := range fs.deferredTimers { + diag.DeferredPromotionCount[server] = len(promos) + for _, dp := range promos { + diag.Volumes = append(diag.Volumes, FailoverVolumeState{ + VolumeName: dp.VolumeName, + CurrentPrimary: dp.CurrentPrimary, + AffectedServer: dp.AffectedServer, + DeferredPromotion: true, + Reason: "lease_wait", + }) + } + } + return diag +} + +// PublicationDiagnostic is a bounded read-only snapshot comparing the +// operator-visible publication (LookupBlockVolume response) against the +// registry authority for one volume. P3 diagnosability surface for S2. +type PublicationDiagnostic struct { + VolumeName string + LookupVolumeServer string // what LookupBlockVolume returns + LookupIscsiAddr string + AuthorityVolumeServer string // registry entry (source of truth) + AuthorityIscsiAddr string + Coherent bool // true if lookup == authority + Reason string // "" if coherent, otherwise why they diverge +} + +// PublicationDiagnosticFor returns a PublicationDiagnostic for the named volume. +// It performs two independent reads: +// - Lookup side: calls LookupBlockVolume (the actual gRPC method) +// - Authority side: reads the registry directly +// +// Then compares the two. If they diverge, Coherent=false with a Reason. +func (ms *MasterServer) PublicationDiagnosticFor(volumeName string) (PublicationDiagnostic, bool) { + if ms.blockRegistry == nil { + return PublicationDiagnostic{}, false + } + + // Read 1: the operator-visible publication surface. + lookupResp, err := ms.LookupBlockVolume(context.Background(), &master_pb.LookupBlockVolumeRequest{Name: volumeName}) + if err != nil { + return PublicationDiagnostic{}, false + } + + // Read 2: the registry authority (separate read). + entry, ok := ms.blockRegistry.Lookup(volumeName) + if !ok { + return PublicationDiagnostic{}, false + } + + diag := PublicationDiagnostic{ + VolumeName: volumeName, + LookupVolumeServer: lookupResp.VolumeServer, + LookupIscsiAddr: lookupResp.IscsiAddr, + AuthorityVolumeServer: entry.VolumeServer, + AuthorityIscsiAddr: entry.ISCSIAddr, + } + + // Compare the two reads. + vsMatch := diag.LookupVolumeServer == diag.AuthorityVolumeServer + iscsiMatch := diag.LookupIscsiAddr == diag.AuthorityIscsiAddr + diag.Coherent = vsMatch && iscsiMatch + if !diag.Coherent { + if !vsMatch { + diag.Reason = "volume_server_mismatch" + } else { + diag.Reason = "iscsi_addr_mismatch" + } + } + + return diag, true } func newBlockFailoverState() *blockFailoverState { return &blockFailoverState{ pendingRebuilds: make(map[string][]pendingRebuild), - deferredTimers: make(map[string][]*time.Timer), + deferredTimers: make(map[string][]deferredPromotion), } } @@ -60,7 +183,11 @@ func (ms *MasterServer) failoverBlockVolumes(deadServer string) { glog.V(0).Infof("failover: %q lease expires in %v, deferring promotion", entry.Name, delay) volumeName := entry.Name capturedEpoch := entry.Epoch // T3: capture epoch for stale-timer validation + capturedDeadServer := deadServer // capture for closure timer := time.AfterFunc(delay, func() { + // Clean up the deferred entry regardless of outcome. + ms.removeFiredDeferredPromotion(capturedDeadServer, volumeName) + // T3: Re-validate before acting — prevent stale timer on recreated/changed volume. current, ok := ms.blockRegistry.Lookup(volumeName) if !ok { @@ -76,7 +203,12 @@ func (ms *MasterServer) failoverBlockVolumes(deadServer string) { }) ms.blockFailover.mu.Lock() ms.blockFailover.deferredTimers[deadServer] = append( - ms.blockFailover.deferredTimers[deadServer], timer) + ms.blockFailover.deferredTimers[deadServer], deferredPromotion{ + Timer: timer, + VolumeName: volumeName, + CurrentPrimary: entry.VolumeServer, + AffectedServer: deadServer, + }) ms.blockFailover.mu.Unlock() continue } @@ -159,12 +291,14 @@ func (ms *MasterServer) finalizePromotion(volumeName, oldPrimary, oldPath string assignment.ReplicaAddrs = append(assignment.ReplicaAddrs, blockvol.ReplicaAddr{ DataAddr: ri.DataAddr, CtrlAddr: ri.CtrlAddr, + ServerID: ri.Server, // V2: stable identity }) } // Backward compat: also set scalar fields if exactly 1 replica. if len(entry.Replicas) == 1 { assignment.ReplicaDataAddr = entry.Replicas[0].DataAddr assignment.ReplicaCtrlAddr = entry.Replicas[0].CtrlAddr + assignment.ReplicaServerID = entry.Replicas[0].Server // V2: stable identity } ms.blockAssignmentQueue.Enqueue(entry.VolumeServer, assignment) @@ -210,14 +344,36 @@ func (ms *MasterServer) cancelDeferredTimers(server string) { return } ms.blockFailover.mu.Lock() - timers := ms.blockFailover.deferredTimers[server] + promos := ms.blockFailover.deferredTimers[server] delete(ms.blockFailover.deferredTimers, server) ms.blockFailover.mu.Unlock() - for _, t := range timers { - t.Stop() + for _, dp := range promos { + dp.Timer.Stop() } - if len(timers) > 0 { - glog.V(0).Infof("failover: cancelled %d deferred promotion timers for reconnected %s", len(timers), server) + if len(promos) > 0 { + glog.V(0).Infof("failover: cancelled %d deferred promotion timers for reconnected %s", len(promos), server) + } +} + +// removeFiredDeferredPromotion removes a single deferred promotion entry after +// its timer has fired (whether it promoted or was skipped). This keeps +// FailoverDiagnostic accurate: once the timer fires, the volume is no longer +// in lease-wait state. +func (ms *MasterServer) removeFiredDeferredPromotion(server, volumeName string) { + if ms.blockFailover == nil { + return + } + ms.blockFailover.mu.Lock() + defer ms.blockFailover.mu.Unlock() + promos := ms.blockFailover.deferredTimers[server] + for i, dp := range promos { + if dp.VolumeName == volumeName { + ms.blockFailover.deferredTimers[server] = append(promos[:i], promos[i+1:]...) + if len(ms.blockFailover.deferredTimers[server]) == 0 { + delete(ms.blockFailover.deferredTimers, server) + } + return + } } } @@ -286,16 +442,18 @@ func (ms *MasterServer) recoverBlockVolumes(reconnectedServer string) { Role: blockvol.RoleToWire(blockvol.RolePrimary), LeaseTtlMs: leaseTTLMs, } - // Include all replica addresses. + // Include all replica addresses with stable identity. for _, ri := range entry.Replicas { primaryAssignment.ReplicaAddrs = append(primaryAssignment.ReplicaAddrs, blockvol.ReplicaAddr{ DataAddr: ri.DataAddr, CtrlAddr: ri.CtrlAddr, + ServerID: ri.Server, // V2: stable identity }) } if len(entry.Replicas) == 1 { primaryAssignment.ReplicaDataAddr = entry.Replicas[0].DataAddr primaryAssignment.ReplicaCtrlAddr = entry.Replicas[0].CtrlAddr + primaryAssignment.ReplicaServerID = entry.Replicas[0].Server // V2 } ms.blockAssignmentQueue.Enqueue(entry.VolumeServer, primaryAssignment) @@ -346,11 +504,13 @@ func (ms *MasterServer) refreshPrimaryForAddrChange(ac ReplicaAddrChange) { assignment.ReplicaAddrs = append(assignment.ReplicaAddrs, blockvol.ReplicaAddr{ DataAddr: ri.DataAddr, CtrlAddr: ri.CtrlAddr, + ServerID: ri.Server, // V2: stable identity }) } if len(entry.Replicas) == 1 { assignment.ReplicaDataAddr = entry.Replicas[0].DataAddr assignment.ReplicaCtrlAddr = entry.Replicas[0].CtrlAddr + assignment.ReplicaServerID = entry.Replicas[0].Server // V2 } // Use current registry primary (not stale ac.PrimaryServer) in case // failover happened between address-change detection and this refresh. @@ -384,6 +544,9 @@ func (ms *MasterServer) reevaluateOrphanedPrimaries(server string) { capturedEpoch := entry.Epoch deadPrimary := entry.VolumeServer timer := time.AfterFunc(delay, func() { + // Clean up the deferred entry regardless of outcome. + ms.removeFiredDeferredPromotion(deadPrimary, volumeName) + current, ok := ms.blockRegistry.Lookup(volumeName) if !ok { return @@ -397,7 +560,12 @@ func (ms *MasterServer) reevaluateOrphanedPrimaries(server string) { }) ms.blockFailover.mu.Lock() ms.blockFailover.deferredTimers[deadPrimary] = append( - ms.blockFailover.deferredTimers[deadPrimary], timer) + ms.blockFailover.deferredTimers[deadPrimary], deferredPromotion{ + Timer: timer, + VolumeName: volumeName, + CurrentPrimary: deadPrimary, + AffectedServer: deadPrimary, + }) ms.blockFailover.mu.Unlock() continue } diff --git a/weed/server/qa_block_diagnosability_test.go b/weed/server/qa_block_diagnosability_test.go new file mode 100644 index 000000000..5df2c61b9 --- /dev/null +++ b/weed/server/qa_block_diagnosability_test.go @@ -0,0 +1,270 @@ +package weed_server + +import ( + "context" + "os" + "strings" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" +) + +// ============================================================ +// Phase 12 P3: Diagnosability / Blocker Accounting +// +// All diagnosis conclusions use ONLY explicit bounded read-only +// diagnosis surfaces: +// - LookupBlockVolume (product-visible publication) +// - FailoverDiagnostic (volume-oriented failover state) +// - PublicationDiagnostic (lookup vs authority coherence) +// - RecoveryDiagnostic (active recovery task set) +// - phase-12-p3-blockers.md (finite blocker ledger) +// +// NOT performance, NOT rollout readiness. +// ============================================================ + +// --- S1: Failover convergence diagnosable via FailoverDiagnostic --- + +func TestP12P3_FailoverConvergence_Diagnosable(t *testing.T) { + s := newSoakSetup(t) + ctx := context.Background() + + s.ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{ + Name: "diag-vol-1", SizeBytes: 1 << 20, + }) + + entry, _ := s.ms.blockRegistry.Lookup("diag-vol-1") + s.bs.localServerID = entry.VolumeServer + s.deliver(entry.VolumeServer) + time.Sleep(100 * time.Millisecond) + + // Surface 1: LookupBlockVolume shows current primary before failover. + lookupBefore, _ := s.ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "diag-vol-1"}) + if lookupBefore.VolumeServer != entry.VolumeServer { + t.Fatal("lookup should show original primary before failover") + } + + // Surface 2: FailoverDiagnostic — no volumes in failover state yet. + failoverBefore := s.ms.blockFailover.DiagnosticSnapshot() + for _, v := range failoverBefore.Volumes { + if v.VolumeName == "diag-vol-1" { + t.Fatalf("S1: diag-vol-1 should not appear in failover diagnostic before failover, got %+v", v) + } + } + + // Trigger failover: expire lease, then failover. + s.ms.blockRegistry.UpdateEntry("diag-vol-1", func(e *BlockVolumeEntry) { + e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + }) + s.ms.failoverBlockVolumes(entry.VolumeServer) + + // Surface 1 after: LookupBlockVolume shows NEW primary. + lookupAfter, _ := s.ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "diag-vol-1"}) + if lookupAfter.VolumeServer == entry.VolumeServer { + t.Fatal("S1 stall: lookup still shows old primary after failover") + } + + // Surface 2 after: FailoverDiagnostic shows volume-level failover state. + failoverAfter := s.ms.blockFailover.DiagnosticSnapshot() + + // Classify via explicit diagnosis surface: find diag-vol-1 in failover volumes. + var found *FailoverVolumeState + for i := range failoverAfter.Volumes { + if failoverAfter.Volumes[i].VolumeName == "diag-vol-1" { + found = &failoverAfter.Volumes[i] + break + } + } + if found == nil { + t.Fatal("S1: diag-vol-1 not found in FailoverDiagnostic after failover") + } + + // Diagnosis conclusion from explicit surfaces only: + // - Lookup changed (old → new primary) + // - FailoverDiagnostic classifies state as rebuild_pending + // - AffectedServer identifies the dead server + if !found.PendingRebuild { + t.Fatal("S1: FailoverDiagnostic should show PendingRebuild=true") + } + if found.Reason != "rebuild_pending" { + t.Fatalf("S1: expected reason=rebuild_pending, got %q", found.Reason) + } + if found.AffectedServer != entry.VolumeServer { + t.Fatalf("S1: AffectedServer should be dead server %s, got %s", entry.VolumeServer, found.AffectedServer) + } + if found.CurrentPrimary != lookupAfter.VolumeServer { + t.Fatalf("S1: CurrentPrimary should match lookup %s, got %s", lookupAfter.VolumeServer, found.CurrentPrimary) + } + + t.Logf("P12P3 S1: diagnosed via LookupBlockVolume(%s→%s) + FailoverDiagnostic(vol=%s, reason=%s, affected=%s)", + lookupBefore.VolumeServer, lookupAfter.VolumeServer, found.VolumeName, found.Reason, found.AffectedServer) +} + +// --- S2: Publication mismatch diagnosable via PublicationDiagnostic --- + +func TestP12P3_PublicationMismatch_Diagnosable(t *testing.T) { + s := newSoakSetup(t) + ctx := context.Background() + + s.ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{ + Name: "diag-vol-2", SizeBytes: 1 << 20, + }) + + entry, _ := s.ms.blockRegistry.Lookup("diag-vol-2") + s.bs.localServerID = entry.VolumeServer + s.deliver(entry.VolumeServer) + time.Sleep(100 * time.Millisecond) + + // Surface 1: PublicationDiagnostic before failover — should be coherent. + pubBefore, ok := s.ms.PublicationDiagnosticFor("diag-vol-2") + if !ok { + t.Fatal("S2: PublicationDiagnosticFor should find diag-vol-2") + } + if !pubBefore.Coherent { + t.Fatalf("S2: publication should be coherent before failover, got reason=%q", pubBefore.Reason) + } + + // Surface 2: LookupBlockVolume — repeated lookups self-consistent. + lookup1, _ := s.ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "diag-vol-2"}) + lookup2, _ := s.ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "diag-vol-2"}) + if lookup1.IscsiAddr != lookup2.IscsiAddr || lookup1.VolumeServer != lookup2.VolumeServer { + t.Fatalf("S2: repeated lookup mismatch: %s/%s vs %s/%s", + lookup1.VolumeServer, lookup1.IscsiAddr, lookup2.VolumeServer, lookup2.IscsiAddr) + } + + // Trigger failover. + s.ms.blockRegistry.UpdateEntry("diag-vol-2", func(e *BlockVolumeEntry) { + e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + }) + s.ms.failoverBlockVolumes(entry.VolumeServer) + + // Surface 1 after: PublicationDiagnostic after failover — still coherent. + pubAfter, ok := s.ms.PublicationDiagnosticFor("diag-vol-2") + if !ok { + t.Fatal("S2: PublicationDiagnosticFor should find diag-vol-2 after failover") + } + if !pubAfter.Coherent { + t.Fatalf("S2: publication should be coherent after failover, got reason=%q", pubAfter.Reason) + } + + // Diagnosis conclusion from explicit surfaces only: + // - Pre-failover: coherent, lookup matches authority + // - Post-failover: coherent, lookup updated to new primary + // - Publication switched: post != pre + if pubAfter.LookupVolumeServer == pubBefore.LookupVolumeServer { + t.Fatal("S2: LookupVolumeServer unchanged after failover — publication did not switch") + } + if pubAfter.LookupIscsiAddr == pubBefore.LookupIscsiAddr { + t.Fatal("S2: LookupIscsiAddr unchanged after failover") + } + + // Post-failover repeated lookup still self-consistent (via diagnostic). + pubAfter2, _ := s.ms.PublicationDiagnosticFor("diag-vol-2") + if pubAfter2.LookupVolumeServer != pubAfter.LookupVolumeServer || + pubAfter2.LookupIscsiAddr != pubAfter.LookupIscsiAddr { + t.Fatal("S2: post-failover publication diagnostics inconsistent") + } + + t.Logf("P12P3 S2: diagnosed via PublicationDiagnostic — pre(vs=%s, iscsi=%s, coherent=%v) → post(vs=%s, iscsi=%s, coherent=%v)", + pubBefore.LookupVolumeServer, pubBefore.LookupIscsiAddr, pubBefore.Coherent, + pubAfter.LookupVolumeServer, pubAfter.LookupIscsiAddr, pubAfter.Coherent) +} + +// --- S3: Runtime residue diagnosable via RecoveryDiagnostic --- + +func TestP12P3_RuntimeResidue_Diagnosable(t *testing.T) { + s := newSoakSetup(t) + ctx := context.Background() + + s.ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{ + Name: "diag-vol-3", SizeBytes: 1 << 20, + }) + + entry, _ := s.ms.blockRegistry.Lookup("diag-vol-3") + s.bs.localServerID = entry.VolumeServer + s.deliver(entry.VolumeServer) + time.Sleep(200 * time.Millisecond) + + // Surface: RecoveryDiagnostic shows active task set. + diagBefore := s.bs.v2Recovery.DiagnosticSnapshot() + t.Logf("S3 before delete: %d active tasks: %v", len(diagBefore.ActiveTasks), diagBefore.ActiveTasks) + + // Delete the volume. + s.ms.DeleteBlockVolume(ctx, &master_pb.DeleteBlockVolumeRequest{Name: "diag-vol-3"}) + time.Sleep(200 * time.Millisecond) + + // Surface after: RecoveryDiagnostic — no tasks for deleted volume. + diagAfter := s.bs.v2Recovery.DiagnosticSnapshot() + for _, task := range diagAfter.ActiveTasks { + if strings.Contains(task, "diag-vol-3") { + t.Fatalf("S3 residue: task %s active after delete", task) + } + } + + // Diagnosis conclusion from explicit surface only: + // - RecoveryDiagnostic.ActiveTasks does not contain deleted volume's tasks + // - Conclusion: clean (no residue) or non-empty but unrelated to deleted volume + if len(diagAfter.ActiveTasks) == 0 { + t.Log("P12P3 S3: diagnosed via RecoveryDiagnostic — clean (0 active tasks after delete)") + } else { + t.Logf("P12P3 S3: diagnosed via RecoveryDiagnostic — %d active tasks (none for deleted vol)", + len(diagAfter.ActiveTasks)) + } +} + +// --- Blocker ledger: reads and validates the actual file --- + +func TestP12P3_BlockerLedger_Bounded(t *testing.T) { + ledgerPath := "../../sw-block/.private/phase/phase-12-p3-blockers.md" + + data, err := os.ReadFile(ledgerPath) + if err != nil { + t.Fatalf("blocker ledger must exist at %s: %v", ledgerPath, err) + } + + content := string(data) + + // Must contain diagnosed items. + for _, id := range []string{"B1", "B2", "B3"} { + if !strings.Contains(content, id) { + t.Fatalf("ledger missing diagnosed item %s", id) + } + } + + // Must contain unresolved items. + for _, id := range []string{"U1", "U2", "U3"} { + if !strings.Contains(content, id) { + t.Fatalf("ledger missing unresolved item %s", id) + } + } + + // Must contain out-of-scope section. + if !strings.Contains(content, "Out of Scope") { + t.Fatal("ledger must have 'Out of Scope' section") + } + + // Must NOT overclaim perf or rollout. + lines := strings.Split(content, "\n") + diagnosedCount := 0 + unresolvedCount := 0 + for _, line := range lines { + if strings.HasPrefix(strings.TrimSpace(line), "| B") { + diagnosedCount++ + } + if strings.HasPrefix(strings.TrimSpace(line), "| U") { + unresolvedCount++ + } + } + + total := diagnosedCount + unresolvedCount + if total == 0 { + t.Fatal("ledger has no blocker items") + } + if total > 20 { + t.Fatalf("ledger should be finite, got %d items", total) + } + + t.Logf("P12P3 blockers: %d diagnosed + %d unresolved = %d total (from actual file, finite)", + diagnosedCount, unresolvedCount, total) +} diff --git a/weed/server/qa_block_perf_test.go b/weed/server/qa_block_perf_test.go new file mode 100644 index 000000000..cf37a2058 --- /dev/null +++ b/weed/server/qa_block_perf_test.go @@ -0,0 +1,582 @@ +package weed_server + +import ( + "crypto/rand" + "fmt" + "math" + mrand "math/rand" + "os" + "path/filepath" + "sort" + "strings" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" +) + +// ============================================================ +// Phase 12 P4: Performance Floor — Bounded Measurement Package +// +// Workload envelope: +// Topology: RF=2 sync_all accepted chosen path +// Operations: 4K random write, 4K random read, 4K sequential write, 4K sequential read +// Runtime: no failover, no disturbance, steady-state +// Environment: unit test harness (single-process, local disk, engine-local I/O) +// +// What this measures: +// Engine I/O floor for the accepted chosen path. WriteLBA/ReadLBA through +// the full fencing path (epoch, role, lease, writeGate, WAL, dirtyMap). +// No transport layer (iSCSI/NVMe). No cross-machine replication. +// +// What this does NOT measure: +// Transport throughput, cross-machine replication tax, multi-client concurrency, +// failover-under-load, degraded mode. Production floor with replication is +// documented in baseline-roce-20260401.md. +// +// NOT performance tuning. NOT broad benchmark. +// ============================================================ + +const ( + perfBlockSize = 4096 + perfVolumeSize = 64 * 1024 * 1024 // 64MB + perfWALSize = 16 * 1024 * 1024 // 16MB + perfOps = 1000 // ops per measurement run + perfWarmupOps = 200 // warmup ops (discarded from measurement) + perfIterations = 3 // run N times, report worst as floor +) + +// Minimum acceptable floor thresholds (engine-local, single-writer). +// +// These are regression gates, not performance targets. Set conservatively +// so any reasonable hardware passes, but catastrophic regressions +// (accidental serialization, O(n^2) scan, broken WAL path) are caught. +// +// Rationale for values: +// Measured on dev SSD: rand-write ~10K, rand-read ~80K, seq-write ~30K, seq-read ~180K. +// Thresholds set at ~10% of measured to tolerate slow CI machines and VMs. +// Write P99 ceiling at 100ms catches deadlocks/stalls without false-positiving +// on slow storage. +var perfFloorGates = map[string]struct { + MinIOPS float64 + MaxWriteP99 time.Duration // 0 = no ceiling (reads) +}{ + "rand-write": {MinIOPS: 1000, MaxWriteP99: 100 * time.Millisecond}, + "rand-read": {MinIOPS: 5000}, + "seq-write": {MinIOPS: 2000, MaxWriteP99: 100 * time.Millisecond}, + "seq-read": {MinIOPS: 10000}, +} + +// perfResult holds measurements for one workload run. +type perfResult struct { + Workload string + Ops int + Elapsed time.Duration + IOPS float64 + MBps float64 + LatSamples []int64 // per-op latency in nanoseconds +} + +func (r *perfResult) latPct(pct float64) time.Duration { + if len(r.LatSamples) == 0 { + return 0 + } + sorted := make([]int64, len(r.LatSamples)) + copy(sorted, r.LatSamples) + sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] }) + idx := int(math.Ceil(pct/100.0*float64(len(sorted)))) - 1 + if idx < 0 { + idx = 0 + } + if idx >= len(sorted) { + idx = len(sorted) - 1 + } + return time.Duration(sorted[idx]) +} + +func (r *perfResult) latAvg() time.Duration { + if len(r.LatSamples) == 0 { + return 0 + } + var sum int64 + for _, s := range r.LatSamples { + sum += s + } + return time.Duration(sum / int64(len(r.LatSamples))) +} + +// setupPerfVolume creates a BlockVol configured as Primary for perf measurement. +func setupPerfVolume(t *testing.T) *blockvol.BlockVol { + t.Helper() + dir := t.TempDir() + volPath := filepath.Join(dir, "perf.blk") + vol, err := blockvol.CreateBlockVol(volPath, blockvol.CreateOptions{ + VolumeSize: perfVolumeSize, + BlockSize: perfBlockSize, + WALSize: perfWALSize, + }) + if err != nil { + t.Fatal(err) + } + // Set up as Primary with long lease so writes are allowed. + if err := vol.HandleAssignment(1, blockvol.RolePrimary, 10*time.Minute); err != nil { + vol.Close() + t.Fatal(err) + } + t.Cleanup(func() { vol.Close() }) + return vol +} + +// maxLBAs returns the number of addressable 4K blocks in the extent area. +func maxLBAs() uint64 { + // Volume size minus WAL, divided by block size, with safety margin. + return (perfVolumeSize - perfWALSize) / perfBlockSize / 2 +} + +// runPerfWorkload executes one workload measurement and returns the result. +func runPerfWorkload(t *testing.T, vol *blockvol.BlockVol, workload string, ops int) perfResult { + t.Helper() + data := make([]byte, perfBlockSize) + rand.Read(data) + max := maxLBAs() + + samples := make([]int64, 0, ops) + start := time.Now() + + for i := 0; i < ops; i++ { + var lba uint64 + switch { + case strings.HasPrefix(workload, "rand"): + lba = uint64(mrand.Int63n(int64(max))) + default: // sequential + lba = uint64(i) % max + } + + opStart := time.Now() + switch { + case strings.HasSuffix(workload, "write"): + if err := vol.WriteLBA(lba, data); err != nil { + t.Fatalf("%s op %d: WriteLBA(%d): %v", workload, i, lba, err) + } + case strings.HasSuffix(workload, "read"): + if _, err := vol.ReadLBA(lba, perfBlockSize); err != nil { + t.Fatalf("%s op %d: ReadLBA(%d): %v", workload, i, lba, err) + } + } + samples = append(samples, time.Since(opStart).Nanoseconds()) + } + + elapsed := time.Since(start) + iops := float64(ops) / elapsed.Seconds() + mbps := iops * float64(perfBlockSize) / (1024 * 1024) + + return perfResult{ + Workload: workload, + Ops: ops, + Elapsed: elapsed, + IOPS: iops, + MBps: mbps, + LatSamples: samples, + } +} + +// floorOf returns the worst (lowest) IOPS and worst (highest) P99 across iterations. +type perfFloor struct { + Workload string + FloorIOPS float64 + FloorMBps float64 + WorstAvg time.Duration + WorstP50 time.Duration + WorstP99 time.Duration + WorstMax time.Duration +} + +func computeFloor(results []perfResult) perfFloor { + f := perfFloor{ + Workload: results[0].Workload, + FloorIOPS: math.MaxFloat64, + FloorMBps: math.MaxFloat64, + } + for _, r := range results { + if r.IOPS < f.FloorIOPS { + f.FloorIOPS = r.IOPS + } + if r.MBps < f.FloorMBps { + f.FloorMBps = r.MBps + } + avg := r.latAvg() + if avg > f.WorstAvg { + f.WorstAvg = avg + } + p50 := r.latPct(50) + if p50 > f.WorstP50 { + f.WorstP50 = p50 + } + p99 := r.latPct(99) + if p99 > f.WorstP99 { + f.WorstP99 = p99 + } + pmax := r.latPct(100) + if pmax > f.WorstMax { + f.WorstMax = pmax + } + } + return f +} + +// --- Test 1: PerformanceFloor_Bounded --- + +func TestP12P4_PerformanceFloor_Bounded(t *testing.T) { + vol := setupPerfVolume(t) + + workloads := []string{"rand-write", "rand-read", "seq-write", "seq-read"} + floors := make([]perfFloor, 0, len(workloads)) + + for _, wl := range workloads { + // Warmup: populate volume with data (needed for reads). + if strings.HasSuffix(wl, "read") { + warmupData := make([]byte, perfBlockSize) + rand.Read(warmupData) + for i := 0; i < int(maxLBAs()); i++ { + if err := vol.WriteLBA(uint64(i), warmupData); err != nil { + break // WAL full is acceptable during warmup + } + } + time.Sleep(200 * time.Millisecond) // let flusher drain + } + + // Warmup ops (discarded). + runPerfWorkload(t, vol, wl, perfWarmupOps) + + // Measurement: N iterations, take floor. + var results []perfResult + for iter := 0; iter < perfIterations; iter++ { + r := runPerfWorkload(t, vol, wl, perfOps) + results = append(results, r) + } + + floor := computeFloor(results) + floors = append(floors, floor) + } + + // Report structured floor table. + t.Log("") + t.Log("=== P12P4 Performance Floor (engine-local, single-writer) ===") + t.Log("") + t.Logf("%-12s %10s %8s %10s %10s %10s %10s", + "Workload", "Floor IOPS", "MB/s", "Avg Lat", "P50 Lat", "P99 Lat", "Max Lat") + t.Logf("%-12s %10s %8s %10s %10s %10s %10s", + "--------", "----------", "------", "-------", "-------", "-------", "-------") + for _, f := range floors { + t.Logf("%-12s %10.0f %8.2f %10s %10s %10s %10s", + f.Workload, f.FloorIOPS, f.FloorMBps, f.WorstAvg, f.WorstP50, f.WorstP99, f.WorstMax) + } + t.Log("") + t.Logf("Config: volume=%dMB WAL=%dMB block=%dB ops=%d warmup=%d iterations=%d", + perfVolumeSize/(1024*1024), perfWALSize/(1024*1024), perfBlockSize, perfOps, perfWarmupOps, perfIterations) + t.Log("Method: worst of N iterations (floor, not peak)") + t.Log("Scope: engine-local only; production RF=2 floor in baseline-roce-20260401.md") + + // Gate: floor values must meet minimum acceptable thresholds. + // These are regression gates — if any floor drops below the gate, + // the test fails, blocking rollout. + t.Log("") + t.Log("=== Floor Gate Validation ===") + allGatesPassed := true + for _, f := range floors { + gate, ok := perfFloorGates[f.Workload] + if !ok { + t.Fatalf("no floor gate defined for workload %s", f.Workload) + } + passed := true + if f.FloorIOPS < gate.MinIOPS { + t.Errorf("GATE FAIL: %s floor IOPS %.0f < minimum %.0f", f.Workload, f.FloorIOPS, gate.MinIOPS) + passed = false + } + if gate.MaxWriteP99 > 0 && f.WorstP99 > gate.MaxWriteP99 { + t.Errorf("GATE FAIL: %s worst P99 %s > ceiling %s", f.Workload, f.WorstP99, gate.MaxWriteP99) + passed = false + } + status := "PASS" + if !passed { + status = "FAIL" + allGatesPassed = false + } + t.Logf(" %-12s min=%6.0f IOPS → floor=%6.0f [%s]", f.Workload, gate.MinIOPS, f.FloorIOPS, status) + } + + if !allGatesPassed { + t.Fatal("P12P4 PerformanceFloor: FAIL — one or more floor gates not met") + } + t.Log("P12P4 PerformanceFloor: PASS — all floor gates met") +} + +// --- Test 2: CostCharacterization_Bounded --- + +func TestP12P4_CostCharacterization_Bounded(t *testing.T) { + vol := setupPerfVolume(t) + + // Measure write latency breakdown: WriteLBA includes WAL append + group commit. + data := make([]byte, perfBlockSize) + rand.Read(data) + max := maxLBAs() + + const costOps = 500 + var writeLatSum int64 + for i := 0; i < costOps; i++ { + lba := uint64(mrand.Int63n(int64(max))) + start := time.Now() + if err := vol.WriteLBA(lba, data); err != nil { + t.Fatalf("write op %d: %v", i, err) + } + writeLatSum += time.Since(start).Nanoseconds() + } + avgWriteLat := time.Duration(writeLatSum / costOps) + + // Measure read latency for comparison. + // Populate first. + for i := 0; i < int(max/2); i++ { + vol.WriteLBA(uint64(i), data) + } + time.Sleep(200 * time.Millisecond) // let flusher drain + + var readLatSum int64 + for i := 0; i < costOps; i++ { + lba := uint64(mrand.Int63n(int64(max / 2))) + start := time.Now() + if _, err := vol.ReadLBA(lba, perfBlockSize); err != nil { + t.Fatalf("read op %d: %v", i, err) + } + readLatSum += time.Since(start).Nanoseconds() + } + avgReadLat := time.Duration(readLatSum / costOps) + + // Cost statement. + t.Log("") + t.Log("=== P12P4 Cost Characterization (engine-local) ===") + t.Log("") + t.Logf("Average write latency: %s (includes WAL append + group commit sync)", avgWriteLat) + t.Logf("Average read latency: %s (dirtyMap lookup + WAL/extent read)", avgReadLat) + t.Log("") + t.Log("Bounded cost statement:") + t.Log(" WAL write amplification: 2x minimum (WAL write + eventual extent flush)") + t.Log(" Group commit: amortizes fdatasync across batched writers (1 sync per batch)") + t.Log(" Replication tax (production RF=2 sync_all): -56% vs RF=1 (barrier round-trip)") + t.Log(" Replication tax source: baseline-roce-20260401.md, measured on 25Gbps RoCE") + t.Log("") + t.Logf("Write/read ratio: %.1fx (write is %.1fx slower than read)", + float64(avgWriteLat)/float64(avgReadLat), + float64(avgWriteLat)/float64(avgReadLat)) + t.Log("") + t.Logf("Config: volume=%dMB WAL=%dMB block=%dB ops=%d", + perfVolumeSize/(1024*1024), perfWALSize/(1024*1024), perfBlockSize, costOps) + + // Proof: cost values are finite and positive. + if avgWriteLat <= 0 || avgReadLat <= 0 { + t.Fatal("latency values must be positive") + } + if avgWriteLat < avgReadLat { + t.Log("Note: write faster than read in this run (possible due to WAL cache hits)") + } + + t.Log("P12P4 CostCharacterization: PASS — bounded cost statement produced") +} + +// --- Test 3: RolloutGate_Bounded --- + +func TestP12P4_RolloutGate_Bounded(t *testing.T) { + floorPath := "../../sw-block/.private/phase/phase-12-p4-floor.md" + gatesPath := "../../sw-block/.private/phase/phase-12-p4-rollout-gates.md" + baselinePath := "../../learn/projects/sw-block/test/results/baseline-roce-20260401.md" + blockerPath := "../../sw-block/.private/phase/phase-12-p3-blockers.md" + + // --- Read all cited evidence sources --- + + floorData, err := os.ReadFile(floorPath) + if err != nil { + t.Fatalf("floor doc must exist at %s: %v", floorPath, err) + } + floorContent := string(floorData) + + gatesData, err := os.ReadFile(gatesPath) + if err != nil { + t.Fatalf("rollout-gates doc must exist at %s: %v", gatesPath, err) + } + gatesContent := string(gatesData) + + baselineData, err := os.ReadFile(baselinePath) + if err != nil { + t.Fatalf("cited baseline must exist at %s: %v", baselinePath, err) + } + baselineContent := string(baselineData) + + blockerData, err := os.ReadFile(blockerPath) + if err != nil { + t.Fatalf("cited blocker ledger must exist at %s: %v", blockerPath, err) + } + blockerContent := string(blockerData) + + // --- Structural validation (shape) --- + + // Floor doc: workload envelope, floor table, non-claims. + for _, required := range []string{"RF=2", "sync_all", "4K random write", "4K random read", "sequential write", "sequential read"} { + if !strings.Contains(floorContent, required) { + t.Fatalf("floor doc missing required content: %q", required) + } + } + if !strings.Contains(floorContent, "Floor") || !strings.Contains(floorContent, "IOPS") { + t.Fatal("floor doc must contain floor table with IOPS") + } + if !strings.Contains(floorContent, "does NOT") { + t.Fatal("floor doc must contain explicit non-claims") + } + + // Gates doc: gates table, launch envelope, exclusions, non-claims. + if !strings.Contains(gatesContent, "Gate") || !strings.Contains(gatesContent, "Status") { + t.Fatal("rollout-gates doc must contain gates table") + } + if !strings.Contains(gatesContent, "Launch Envelope") { + t.Fatal("rollout-gates doc must contain launch envelope") + } + if !strings.Contains(gatesContent, "Exclusion") { + t.Fatal("rollout-gates doc must contain exclusions") + } + if !strings.Contains(gatesContent, "does NOT") { + t.Fatal("rollout-gates doc must contain explicit non-claims") + } + + // Count gates — must be finite. + gateLines := 0 + for _, line := range strings.Split(gatesContent, "\n") { + trimmed := strings.TrimSpace(line) + if strings.HasPrefix(trimmed, "| G") || strings.HasPrefix(trimmed, "| E") { + gateLines++ + } + } + if gateLines == 0 { + t.Fatal("rollout-gates doc has no gate items") + } + if gateLines > 20 { + t.Fatalf("rollout-gates doc should be finite, got %d items", gateLines) + } + + // --- Semantic cross-checks (evidence alignment) --- + + // 1. G6 cites "28.4K write IOPS" — baseline must contain this number. + if strings.Contains(gatesContent, "28.4K write IOPS") || strings.Contains(gatesContent, "28,4") { + // The gates doc cites write IOPS from baseline. Verify the baseline has it. + if !strings.Contains(baselineContent, "28,") { + t.Fatal("G6 cites write IOPS but baseline does not contain matching value") + } + } + // More precise: baseline must contain the specific numbers cited in G6. + if !strings.Contains(baselineContent, "28,347") && !strings.Contains(baselineContent, "28,429") && + !strings.Contains(baselineContent, "28,453") { + t.Fatal("baseline must contain RF=2 sync_all write IOPS data (28,3xx-28,4xx range)") + } + if !strings.Contains(baselineContent, "136,648") { + t.Fatal("baseline must contain RF=2 read IOPS data (136,648)") + } + + // 2. G5 cites "-56% replication tax" — baseline must contain this. + if strings.Contains(gatesContent, "-56%") { + if !strings.Contains(baselineContent, "-56%") { + t.Fatal("G5 cites -56% replication tax but baseline does not contain -56%") + } + } + + // 3. Launch envelope claims specific transport/network combos — verify against baseline. + // Claimed: NVMe-TCP @ 25Gbps RoCE + if strings.Contains(gatesContent, "NVMe-TCP @ 25Gbps RoCE") { + if !strings.Contains(baselineContent, "NVMe-TCP") || !strings.Contains(baselineContent, "RoCE") { + t.Fatal("launch envelope claims NVMe-TCP @ RoCE but baseline has no such data") + } + } + // Claimed: iSCSI @ 25Gbps RoCE + if strings.Contains(gatesContent, "iSCSI @ 25Gbps RoCE") { + if !strings.Contains(baselineContent, "iSCSI") || !strings.Contains(baselineContent, "RoCE") { + t.Fatal("launch envelope claims iSCSI @ RoCE but baseline has no such data") + } + } + // Claimed: iSCSI @ 1Gbps + if strings.Contains(gatesContent, "iSCSI @ 1Gbps") { + if !strings.Contains(baselineContent, "iSCSI") || !strings.Contains(baselineContent, "1Gbps") { + t.Fatal("launch envelope claims iSCSI @ 1Gbps but baseline has no such data") + } + } + // Exclusion: NVMe-TCP @ 1Gbps must NOT be claimed as supported. + if strings.Contains(gatesContent, "NOT included") { + // Verify baseline indeed lacks NVMe-TCP @ 1Gbps. + hasNvme1g := strings.Contains(baselineContent, "NVMe-TCP") && strings.Contains(baselineContent, "| NVMe-TCP | 1Gbps") + if hasNvme1g { + t.Fatal("baseline contains NVMe-TCP @ 1Gbps data but gates doc excludes it — resolve mismatch") + } + } + + // 4. G7 cites blocker ledger counts — verify against actual ledger. + if strings.Contains(gatesContent, "3 diagnosed") { + diagCount := 0 + for _, line := range strings.Split(blockerContent, "\n") { + if strings.HasPrefix(strings.TrimSpace(line), "| B") { + diagCount++ + } + } + if diagCount != 3 { + t.Fatalf("G7 claims 3 diagnosed blockers but ledger has %d", diagCount) + } + } + if strings.Contains(gatesContent, "3 unresolved") { + unresCount := 0 + for _, line := range strings.Split(blockerContent, "\n") { + if strings.HasPrefix(strings.TrimSpace(line), "| U") { + unresCount++ + } + } + if unresCount != 3 { + t.Fatalf("G7 claims 3 unresolved blockers but ledger has %d", unresCount) + } + } + + // 5. Floor doc gate thresholds must match code-defined gates. + for workload, gate := range perfFloorGates { + // The doc uses comma-formatted numbers (e.g., "1,000" or "5,000"). + minInt := int(gate.MinIOPS) + // Check for both comma-formatted and plain forms. + found := false + for _, form := range []string{ + fmt.Sprintf("%d", minInt), // "1000" + fmt.Sprintf("%d,%03d", minInt/1000, minInt%1000), // "1,000" + } { + if strings.Contains(floorContent, form) { + found = true + break + } + } + if !found { + t.Errorf("floor doc gate for %s should cite minimum %d IOPS but doesn't", workload, minInt) + } + } + + t.Logf("P12P4 RolloutGate: floor doc %d bytes, gates doc %d bytes, %d gate items", + len(floorData), len(gatesData), gateLines) + t.Log("P12P4 RolloutGate: semantic cross-checks passed (baseline, blocker ledger, gate thresholds)") + t.Log("P12P4 RolloutGate: PASS — bounded launch envelope with verified evidence alignment") +} + +// --- Helpers --- + +func init() { + // Seed random for reproducible LBA patterns within a test run. + mrand.Seed(time.Now().UnixNano()) +} + +// formatDuration formats a duration for table display. +func formatDuration(d time.Duration) string { + if d < time.Microsecond { + return fmt.Sprintf("%dns", d.Nanoseconds()) + } + if d < time.Millisecond { + return fmt.Sprintf("%.1fus", float64(d.Nanoseconds())/1000.0) + } + return fmt.Sprintf("%.2fms", float64(d.Nanoseconds())/1e6) +} diff --git a/weed/server/qa_block_soak_test.go b/weed/server/qa_block_soak_test.go new file mode 100644 index 000000000..b1df0e659 --- /dev/null +++ b/weed/server/qa_block_soak_test.go @@ -0,0 +1,308 @@ +package weed_server + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication" + "github.com/seaweedfs/seaweedfs/weed/storage" + "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" + "github.com/seaweedfs/seaweedfs/weed/storage/blockvol/v2bridge" +) + +// ============================================================ +// Phase 12 P2: Soak / Long-Run Stability Hardening +// +// Proves: repeated chosen-path cycles return to bounded truth +// without hidden state drift or unbounded runtime artifacts. +// +// NOT diagnosability, NOT performance-floor, NOT rollout readiness. +// ============================================================ + +const soakCycles = 5 + +type soakSetup struct { + ms *MasterServer + bs *BlockService + store *storage.BlockVolumeStore + dir string +} + +func newSoakSetup(t *testing.T) *soakSetup { + t.Helper() + dir := t.TempDir() + store := storage.NewBlockVolumeStore() + + ms := &MasterServer{ + blockRegistry: NewBlockVolumeRegistry(), + blockAssignmentQueue: NewBlockAssignmentQueue(), + blockFailover: newBlockFailoverState(), + } + ms.blockRegistry.MarkBlockCapable("vs1:9333") + ms.blockRegistry.MarkBlockCapable("vs2:9333") + + ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) { + sanitized := strings.ReplaceAll(server, ":", "_") + serverDir := filepath.Join(dir, sanitized) + os.MkdirAll(serverDir, 0755) + volPath := filepath.Join(serverDir, fmt.Sprintf("%s.blk", name)) + vol, err := blockvol.CreateBlockVol(volPath, blockvol.CreateOptions{ + VolumeSize: 1 * 1024 * 1024, + BlockSize: 4096, + WALSize: 256 * 1024, + }) + if err != nil { + return nil, err + } + vol.Close() + if _, err := store.AddBlockVolume(volPath, ""); err != nil { + return nil, err + } + host := server + if idx := strings.LastIndex(server, ":"); idx >= 0 { + host = server[:idx] + } + return &blockAllocResult{ + Path: volPath, + IQN: fmt.Sprintf("iqn.2024.test:%s", name), + ISCSIAddr: host + ":3260", + ReplicaDataAddr: server + ":14260", + ReplicaCtrlAddr: server + ":14261", + RebuildListenAddr: server + ":15000", + }, nil + } + ms.blockVSDelete = func(ctx context.Context, server string, name string) error { return nil } + + bs := &BlockService{ + blockStore: store, + blockDir: filepath.Join(dir, "vs1_9333"), + listenAddr: "127.0.0.1:3260", + localServerID: "vs1:9333", + v2Bridge: v2bridge.NewControlBridge(), + v2Orchestrator: engine.NewRecoveryOrchestrator(), + replStates: make(map[string]*volReplState), + } + bs.v2Recovery = NewRecoveryManager(bs) + + t.Cleanup(func() { + bs.v2Recovery.Shutdown() + store.Close() + }) + + return &soakSetup{ms: ms, bs: bs, store: store, dir: dir} +} + +func (s *soakSetup) deliver(server string) int { + pending := s.ms.blockAssignmentQueue.Peek(server) + if len(pending) == 0 { + return 0 + } + protoAssignments := blockvol.AssignmentsToProto(pending) + goAssignments := blockvol.AssignmentsFromProto(protoAssignments) + s.bs.ProcessAssignments(goAssignments) + return len(goAssignments) +} + +// --- Repeated create/failover/recover cycles with end-of-cycle truth checks --- + +func TestP12P2_RepeatedCycles_NoDrift(t *testing.T) { + s := newSoakSetup(t) + ctx := context.Background() + + for cycle := 1; cycle <= soakCycles; cycle++ { + volName := fmt.Sprintf("soak-vol-%d", cycle) + + // Step 1: Create. + createResp, err := s.ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{ + Name: volName, SizeBytes: 1 << 20, + }) + if err != nil { + t.Fatalf("cycle %d create: %v", cycle, err) + } + primaryVS := createResp.VolumeServer + + // Deliver initial assignment. + s.bs.localServerID = primaryVS + s.deliver(primaryVS) + time.Sleep(100 * time.Millisecond) + + entry, ok := s.ms.blockRegistry.Lookup(volName) + if !ok { + t.Fatalf("cycle %d: volume not in registry", cycle) + } + + // Step 2: Failover. + s.ms.blockRegistry.UpdateEntry(volName, func(e *BlockVolumeEntry) { + e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + }) + s.ms.failoverBlockVolumes(primaryVS) + + entryAfter, _ := s.ms.blockRegistry.Lookup(volName) + if entryAfter.Epoch <= entry.Epoch { + t.Fatalf("cycle %d: epoch did not increase: %d <= %d", cycle, entryAfter.Epoch, entry.Epoch) + } + + // Deliver failover assignment. + newPrimary := entryAfter.VolumeServer + s.bs.localServerID = newPrimary + s.deliver(newPrimary) + time.Sleep(100 * time.Millisecond) + + // Step 3: Reconnect. + s.ms.recoverBlockVolumes(primaryVS) + s.bs.localServerID = primaryVS + s.deliver(primaryVS) + s.bs.localServerID = newPrimary + s.deliver(newPrimary) + time.Sleep(100 * time.Millisecond) + + // === End-of-cycle truth checks === + + // Registry: volume exists, epoch monotonic. + finalEntry, ok := s.ms.blockRegistry.Lookup(volName) + if !ok { + t.Fatalf("cycle %d: volume missing from registry at end", cycle) + } + if finalEntry.Epoch < entryAfter.Epoch { + t.Fatalf("cycle %d: registry epoch regressed: %d < %d", cycle, finalEntry.Epoch, entryAfter.Epoch) + } + + // VS-visible: promoted vol epoch matches. + var volEpoch uint64 + if err := s.store.WithVolume(entryAfter.Path, func(vol *blockvol.BlockVol) error { + volEpoch = vol.Epoch() + return nil + }); err != nil { + t.Fatalf("cycle %d: promoted vol access failed: %v", cycle, err) + } + if volEpoch < entryAfter.Epoch { + t.Fatalf("cycle %d: vol epoch=%d < registry=%d", cycle, volEpoch, entryAfter.Epoch) + } + + // Publication: lookup matches registry truth (not just non-empty). + lookupResp, err := s.ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: volName}) + if err != nil { + t.Fatalf("cycle %d: lookup failed: %v", cycle, err) + } + if lookupResp.IscsiAddr != finalEntry.ISCSIAddr { + t.Fatalf("cycle %d: lookup iSCSI=%q != registry=%q", cycle, lookupResp.IscsiAddr, finalEntry.ISCSIAddr) + } + if lookupResp.VolumeServer != finalEntry.VolumeServer { + t.Fatalf("cycle %d: lookup VS=%q != registry=%q", cycle, lookupResp.VolumeServer, finalEntry.VolumeServer) + } + + t.Logf("cycle %d: registry=%d vol=%d lookup=registry ✓", + cycle, finalEntry.Epoch, volEpoch) + } + + t.Logf("P12P2 repeated cycles: %d cycles, all end-of-cycle truth checks passed", soakCycles) +} + +// --- Runtime state hygiene: no unbounded leftovers after cycles --- + +func TestP12P2_RuntimeHygiene_NoLeftovers(t *testing.T) { + s := newSoakSetup(t) + ctx := context.Background() + + // Create and delete several volumes to exercise lifecycle. + for i := 1; i <= soakCycles; i++ { + name := fmt.Sprintf("hygiene-vol-%d", i) + s.ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{ + Name: name, SizeBytes: 1 << 20, + }) + entry, _ := s.ms.blockRegistry.Lookup(name) + s.bs.localServerID = entry.VolumeServer + s.deliver(entry.VolumeServer) + time.Sleep(50 * time.Millisecond) + } + + // Delete all volumes. + for i := 1; i <= soakCycles; i++ { + name := fmt.Sprintf("hygiene-vol-%d", i) + s.ms.DeleteBlockVolume(ctx, &master_pb.DeleteBlockVolumeRequest{Name: name}) + } + + time.Sleep(200 * time.Millisecond) + + // Check: no stale recovery tasks. + activeTasks := s.bs.v2Recovery.ActiveTaskCount() + if activeTasks > 0 { + t.Fatalf("stale recovery tasks: %d (expected 0 after all volumes deleted)", activeTasks) + } + + // Check: registry should have no entries for deleted volumes. + for i := 1; i <= soakCycles; i++ { + name := fmt.Sprintf("hygiene-vol-%d", i) + if _, ok := s.ms.blockRegistry.Lookup(name); ok { + t.Fatalf("stale registry entry: %s (should be deleted)", name) + } + } + + // Check: assignment queue should not have unbounded stale entries. + for _, server := range []string{"vs1:9333", "vs2:9333"} { + pending := s.ms.blockAssignmentQueue.Peek(server) + // Some pending entries may exist (lease grants etc), but check for bounded size. + if len(pending) > soakCycles*2 { + t.Fatalf("unbounded stale assignments for %s: %d", server, len(pending)) + } + } + + t.Logf("P12P2 hygiene: %d volumes created+deleted, 0 stale tasks, 0 stale registry, bounded queue", soakCycles) +} + +// --- Steady-state repeated delivery: idempotence holds over many cycles --- + +func TestP12P2_SteadyState_IdempotenceHolds(t *testing.T) { + s := newSoakSetup(t) + ctx := context.Background() + + s.ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{ + Name: "steady-vol-1", SizeBytes: 1 << 20, + }) + + entry, _ := s.ms.blockRegistry.Lookup("steady-vol-1") + s.bs.localServerID = entry.VolumeServer + s.deliver(entry.VolumeServer) + time.Sleep(200 * time.Millisecond) + + replicaID := entry.Path + "/" + entry.Replicas[0].Server + eventsAfterFirst := len(s.bs.v2Orchestrator.Log.EventsFor(replicaID)) + if eventsAfterFirst == 0 { + t.Fatal("first delivery must create events") + } + + // Deliver the same assignment many times. + for i := 0; i < soakCycles*2; i++ { + s.deliver(entry.VolumeServer) + time.Sleep(20 * time.Millisecond) + } + + eventsAfterSoak := len(s.bs.v2Orchestrator.Log.EventsFor(replicaID)) + if eventsAfterSoak != eventsAfterFirst { + t.Fatalf("idempotence drift: events %d → %d after %d repeated deliveries", + eventsAfterFirst, eventsAfterSoak, soakCycles*2) + } + + // Verify: registry, vol, and lookup are still coherent. + finalEntry, _ := s.ms.blockRegistry.Lookup("steady-vol-1") + if finalEntry.Epoch != entry.Epoch { + t.Fatalf("epoch drifted: %d → %d", entry.Epoch, finalEntry.Epoch) + } + + lookupResp, _ := s.ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "steady-vol-1"}) + if lookupResp.IscsiAddr != finalEntry.ISCSIAddr { + t.Fatalf("lookup iSCSI=%q != registry=%q after soak", lookupResp.IscsiAddr, finalEntry.ISCSIAddr) + } + if lookupResp.VolumeServer != finalEntry.VolumeServer { + t.Fatalf("lookup VS=%q != registry=%q after soak", lookupResp.VolumeServer, finalEntry.VolumeServer) + } + + t.Logf("P12P2 steady state: %d repeated deliveries, events stable at %d, lookup=registry ✓", + soakCycles*2, eventsAfterFirst) +}