
feat: Phase 12 P3+P4 — diagnosability surfaces, perf floor, rollout gates

P3: Add explicit bounded read-only diagnosis surfaces for all symptom classes:
- FailoverDiagnostic: volume-oriented failover state with per-volume
  DeferredPromotion/PendingRebuild entries and proper timer lifecycle
- PublicationDiagnostic: two-read coherence check (LookupBlockVolume vs
  registry authority) with computed Coherent verdict
- RecoveryDiagnostic: minimal ActiveTasks surface (Path A)
- Blocker ledger: 3 diagnosed + 3 unresolved, finite, from actual file
- Runbook references only exposed surfaces, no internal state

P4: Add bounded performance floor + rollout-gate package:
- Engine-local floor measurement with explicit IOPS gates per workload
- Cost characterization: WAL 2x write amp, -56% replication tax
- Rollout gates with semantic cross-checks against cited evidence
  (baseline numbers, transport/network matrix, blocker counts)
- Launch envelope tightened to actually measured combinations only

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Branch: feature/sw-block
Author: pingqiu
Parent commit: 643a5a1074
  1. sw-block/.private/phase/phase-12-p3-blockers.md (29 lines changed)
  2. sw-block/.private/phase/phase-12-p3-runbook.md (70 lines changed)
  3. sw-block/.private/phase/phase-12-p4-floor.md (101 lines changed)
  4. sw-block/.private/phase/phase-12-p4-rollout-gates.md (64 lines changed)
  5. weed/server/block_recovery.go (374 lines changed)
  6. weed/server/master_block_failover.go (192 lines changed)
  7. weed/server/qa_block_diagnosability_test.go (270 lines changed)
  8. weed/server/qa_block_perf_test.go (582 lines changed)
  9. weed/server/qa_block_soak_test.go (308 lines changed)

sw-block/.private/phase/phase-12-p3-blockers.md
@@ -0,0 +1,29 @@
# Phase 12 P3 — Blocker Ledger
Date: 2026-04-02
Scope: bounded diagnosability / blocker accounting for the accepted RF=2 sync_all chosen path
## Diagnosed and Bounded
| ID | Symptom | Evidence Surface | Owning Truth | Status |
|----|---------|-----------------|--------------|--------|
| B1 | Failover does not converge | failover logs + registry Lookup epoch/primary | registry authority | Diagnosed: convergence depends on lease expiry + heartbeat cycle; bounded by lease TTL |
| B2 | Lookup publication stale after failover | LookupBlockVolume response vs registry entry | registry ISCSIAddr/VolumeServer | Diagnosed: publication updates on failover assignment delivery; bounded by assignment queue delivery |
| B3 | Recovery tasks remain after volume delete | RecoveryManager.DiagnosticSnapshot | RecoveryManager task map | Diagnosed: tasks drain on shutdown/cancel; bounded by RecoveryManager lifecycle |
## Unresolved but Explicit
| ID | Symptom | Current Evidence | Why Unresolved | Blocks P4/Rollout? |
|----|---------|-----------------|----------------|-------------------|
| U1 | V2 engine accepts stale-epoch assignments at orchestrator level | V2 idempotence check skips only same-epoch; lower epoch creates new sender | Engine ApplyAssignment does not check epoch monotonicity on Reconcile | No — V1 HandleAssignment rejects epoch regression; V2 is secondary |
| U2 | Single-process test cannot exercise Primary→Rebuilding role transition | HandleAssignment rejects transition in shared store | Test harness limitation, not production bug | No — production VS has separate stores |
| U3 | gRPC stream transport not exercised in control-loop tests | All logic above/below stream is real; stream itself bypassed | Would require live master+VS gRPC servers in test | Blocks full integration test, not correctness |
## Out of Scope for P3
- Performance floor characterization
- Rollout-gate criteria
- Hours/days soak
- RF>2 topology
- NVMe runtime transport proof
- CSI snapshot/expand

sw-block/.private/phase/phase-12-p3-runbook.md
@@ -0,0 +1,70 @@
# Phase 12 P3 — Bounded Runbook
Scope: diagnosis of three symptom classes on the accepted RF=2 sync_all chosen path.
All diagnosis steps reference ONLY explicit bounded read-only surfaces:
- `LookupBlockVolume` — gRPC RPC returning current primary VS + iSCSI address
- `FailoverDiagnostic` — volume-oriented failover state snapshot
- `PublicationDiagnostic` — lookup vs authority coherence snapshot
- `RecoveryDiagnostic` — active recovery task set snapshot
- Blocker ledger — finite file at `phase-12-p3-blockers.md`
## S1: Failover/Recovery Convergence Stall
**Visible symptom:** Volume remains unavailable after a VS death; lookup still returns the old primary.
**Diagnosis surfaces:**
- `LookupBlockVolume(volumeName)` — check if `VolumeServer` is still the dead server
- `FailoverDiagnostic` — check `Volumes[]` for the affected volume
**Diagnosis steps:**
1. Call `LookupBlockVolume(volumeName)`. If `VolumeServer` changed from the dead server, failover succeeded.
2. If unchanged: read `FailoverDiagnostic`. Find the volume by name in `Volumes[]`.
3. If found with `DeferredPromotion=true`: lease-wait — failover is deferred until lease expires.
4. If found with `PendingRebuild=true`: failover completed, rebuild is pending for the dead server.
5. If `DeferredPromotionCount[deadServer] > 0` in the aggregate: deferred promotions are queued.
6. If the lookup is unchanged and the volume appears nowhere in `FailoverDiagnostic`: escalate.
**Conclusion classes (from surfaces only):**
- **Lease-wait:** `FailoverDiagnostic.DeferredPromotionCount[deadServer] > 0` — normal, bounded by lease TTL.
- **Rebuild-pending:** `FailoverDiagnostic.Volumes[].PendingRebuild=true` — failover done, rebuild queued.
- **Converged:** `LookupBlockVolume` shows new primary, no failover entries — resolved.
- **Unresolved:** None of the above — escalate.
## S2: Publication/Lookup Mismatch
**Visible symptom:** `LookupBlockVolume` returns an iSCSI address or volume server that doesn't match expected state.
**Diagnosis surfaces:**
- `LookupBlockVolume(volumeName)` — operator-visible publication
- `PublicationDiagnostic` — explicit coherence check (lookup vs authority)
**Diagnosis steps:**
1. Call `PublicationDiagnosticFor(volumeName)`. Check `Coherent` field.
2. If `Coherent=true`: lookup matches registry authority — no mismatch.
3. If `Coherent=false`: read `Reason` for explanation. Compare `LookupVolumeServer` vs `AuthorityVolumeServer` and `LookupIscsiAddr` vs `AuthorityIscsiAddr`.
4. Cross-check with `LookupBlockVolume` directly: repeated lookups should be self-consistent.
**Conclusion classes (from surfaces only):**
- **Coherent:** `PublicationDiagnostic.Coherent=true` — no mismatch.
- **Stale client:** Coherent but client sees old value — bounded by client re-query.
- **Unresolved:** `PublicationDiagnostic.Coherent=false` with no transient cause — escalate.
## S3: Leftover Runtime Work After Convergence
**Visible symptom:** Recovery tasks remain after volume deletion or steady-state convergence, when they should have drained.
**Diagnosis surfaces:**
- `RecoveryDiagnostic` — `ActiveTasks` list (replicaIDs with active recovery work)
**Diagnosis steps:**
1. Call `RecoveryManager.DiagnosticSnapshot()`. Read `ActiveTasks`.
2. If `ActiveTasks` is empty: clean — no leftover work.
3. If non-empty: check whether any task replicaID contains the deleted volume's path.
4. If a deleted volume's replicaID is present in `ActiveTasks`: residue — escalate.
5. If all tasks are for live volumes: non-empty but expected — normal in-flight work.
**Conclusion classes (from surfaces only):**
- **Clean:** `RecoveryDiagnostic.ActiveTasks` is empty — runtime converged.
- **Non-empty, no residue:** Tasks present but none for the deleted/converged volume — normal.
- **Residue:** Deleted volume's replicaID still in `ActiveTasks` — escalate.

sw-block/.private/phase/phase-12-p4-floor.md
@@ -0,0 +1,101 @@
# Phase 12 P4 — Performance Floor Summary
Date: 2026-04-02
Scope: bounded performance floor for the accepted RF=2, sync_all chosen path.
## Workload Envelope
| Parameter | Value |
|-----------|-------|
| Topology | RF=2, sync_all |
| Operations | 4K random write, 4K random read, sequential write, sequential read |
| Runtime | Steady-state, no failover, no disturbance |
| Path | Accepted chosen path (same as P1/P2/P3) |
## Environment
### Unit Test Harness (engine-local)
| Parameter | Value |
|-----------|-------|
| Name | `TestP12P4_PerformanceFloor_Bounded` |
| Location | `weed/server/qa_block_perf_test.go` |
| Platform | Single-process, local disk |
| Volume | 64MB, 4K blocks, 16MB WAL |
| Writer | Single-threaded (worst-case for group commit) |
| Replication | Not exercised (engine-local only) |
| Measurement | Worst of 3 iterations (floor, not peak) |
### Production Baseline (cross-machine)
| Parameter | Value |
|-----------|-------|
| Name | `baseline-roce-20260401` |
| Location | `learn/projects/sw-block/test/results/baseline-roce-20260401.md` |
| Hardware | m01 (10.0.0.1) - M02 (10.0.0.3), 25Gbps RoCE |
| Protocol | NVMe-TCP |
| Volume | 2GB, RF=2, sync_all, cross-machine replication |
| Writer | fio, QD1-128, j=4 |
## Floor Table: Production (RF=2, sync_all, NVMe-TCP, 25Gbps RoCE)
These are measured floor values from the production baseline, not the unit test.
| Workload | Floor IOPS | Notes |
|----------|-----------|-------|
| 4K random write QD1 | 28,347 | Barrier round-trip limited (flat across QD) |
| 4K random write QD32 | 28,453 | Same barrier ceiling |
| 4K random read QD32 | 136,648 | No replication overhead |
| Mixed 70/30 QD32 | 28,423 | Write-side limited |
Write latency: bounded by the sync_all barrier round-trip (~35us at QD1).
Read latency: sub-microsecond for cached blocks, single-digit microseconds for extent reads.
## Floor Table: Engine-Local (unit test harness)
These values are measured by `TestP12P4_PerformanceFloor_Bounded` on the dev machine.
They characterize the engine I/O floor WITHOUT transport or replication.
Actual values vary by hardware; the test produces them on each run.
| Workload | Metric | Method | Gate |
|----------|--------|--------|------|
| 4K random write | Floor IOPS, Avg/P50/P99/Max latency | Worst of 3 iterations | >= 1,000 IOPS, P99 <= 100ms |
| 4K random read | Floor IOPS, Avg/P50/P99/Max latency | Worst of 3 iterations | >= 5,000 IOPS |
| 4K sequential write | Floor IOPS, Avg/P50/P99/Max latency | Worst of 3 iterations | >= 2,000 IOPS, P99 <= 100ms |
| 4K sequential read | Floor IOPS, Avg/P50/P99/Max latency | Worst of 3 iterations | >= 10,000 IOPS |
Gate thresholds are regression gates enforced in code (`perfFloorGates` in `qa_block_perf_test.go`).
Set at ~10% of measured values to tolerate slow CI/VM hardware while catching catastrophic regressions.
## Cost Summary
| Cost | Value | Source |
|------|-------|--------|
| WAL write amplification | 2x minimum | Engine design: each write → WAL + eventual extent flush |
| Replication tax (RF=2 sync_all vs RF=1) | -56% | baseline-roce-20260401.md (NVMe-TCP, 25Gbps RoCE) |
| Replication tax (RF=2 sync_all vs RF=1, iSCSI 1Gbps) | -56% | baseline-roce-20260401.md |
| Degraded mode penalty (sync_all RF=2, one replica dead) | -66% | baseline-roce-20260401.md (barrier timeout) |
| Group commit | 1 fdatasync per batch | Amortizes sync cost across concurrent writers |
## Acceptance Evidence
| Item | Evidence | Type |
|------|----------|------|
| Floor gates pass | `perfFloorGates` thresholds enforced per workload | Acceptance |
| Workload runs repeatably | `TestP12P4_PerformanceFloor_Bounded` passes | Acceptance |
| Cost statement is bounded | `TestP12P4_CostCharacterization_Bounded` passes | Acceptance |
| Production baseline exists | `baseline-roce-20260401.md` with measured values | Acceptance |
| Floor is worst-of-N, not peak | Test takes minimum IOPS across 3 iterations | Method |
| Regression-safe | Test fails if floor drops below gate (blocks rollout) | Acceptance |
| Replication tax documented | -56% from measured production baseline | Support telemetry |
## What P4 does NOT claim
- This is not a claim that the measured floor is "good enough" for any specific application.
- This does not claim readiness for failover-under-load scenarios.
- This does not claim readiness for hours/days soak under load.
- This does not claim readiness for RF>2 topologies.
- This does not claim readiness for all transport combinations (iSCSI + NVMe + kernel versions).
- This does not claim readiness for production rollout beyond the explicitly named launch envelope.
- Engine-local floor numbers are not production floor numbers.
- The replication tax is measured on one specific hardware configuration and may differ on other hardware.

sw-block/.private/phase/phase-12-p4-rollout-gates.md
@@ -0,0 +1,64 @@
# Phase 12 P4 — Rollout Gates
Date: 2026-04-02
Scope: bounded first-launch envelope for the accepted RF=2, sync_all chosen path.
This is a bounded first-launch envelope, not general readiness.
## Supported Launch Envelope
Only the transport/network combinations with measured baselines are included.
| Parameter | Value |
|-----------|-------|
| Topology | RF=2, sync_all |
| Transport + Network | NVMe-TCP @ 25Gbps RoCE (measured), iSCSI @ 25Gbps RoCE (measured), iSCSI @ 1Gbps (measured) |
| NOT included | NVMe-TCP @ 1Gbps (not measured) |
| Volume size | Up to 2GB (tested baseline) |
| Failover | Lease-based, bounded by TTL (30s default) |
| Recovery | Catch-up-first, rebuild fallback |
| Degraded mode | Documented -66% write penalty (sync_all RF=2, one replica dead) |
## Cleared Gates
| Gate | Evidence | Status | Notes |
|------|----------|--------|-------|
| G1 | P1 disturbance tests pass | Cleared | Restart/reconnect correctness under disturbance |
| G2 | P2 soak tests pass | Cleared | Repeated create/failover/recover cycles, no drift |
| G3 | P3 diagnosability tests pass | Cleared | Explicit bounded diagnosis surfaces for all symptom classes |
| G4 | P4 floor gates pass | Cleared | Explicit IOPS thresholds + P99 ceilings enforced per workload in code |
| G5 | P4 cost characterization bounded | Cleared | WAL 2x write amp, -56% replication tax documented |
| G6 | Production baseline exists | Cleared | baseline-roce-20260401.md: 28.4K write IOPS, 136.6K read IOPS |
| G7 | Blocker ledger finite | Cleared | 3 diagnosed (B1-B3) + 3 unresolved (U1-U3), all explicit |
| G8 | Floor gates are regression-safe | Cleared | Test fails if any workload drops below defined minimum IOPS or exceeds P99 ceiling |
## Remaining Blockers / Exclusions
| Exclusion | Why | Impact |
|-----------|-----|--------|
| E1 | Failover-under-load perf not measured | Cannot claim bounded perf during failover |
| E2 | Hours/days soak not run | Cannot claim long-run stability under sustained load |
| E3 | RF>2 not measured | Cannot claim perf floor for RF=3+ |
| E4 | Broad transport matrix not tested | Cannot claim parity across all kernel/NVMe/iSCSI versions |
| E5 | Degraded mode is severe (-66%) | sync_all RF=2 has sharp write cliff on replica death |
| E6 | V2 stale-epoch at orchestrator level (U1 from P3) | V1 guards suffice; V2 is secondary path |
| E7 | gRPC stream transport not exercised in unit tests (U3 from P3) | Blocks full integration test, not correctness |
## Reject Conditions
This launch envelope should be REJECTED if:
1. Any P1/P2/P3 test regresses (correctness/stability/diagnosability gate violated)
2. Production baseline numbers are not reproducible on the target hardware
3. Degraded mode behavior (-66% cliff) is not acceptable for the deployment scenario
4. The deployment requires RF>2, failover-under-load guarantees, or long soak proof
5. The deployment requires transport combinations not covered by the baseline
## What P4 does NOT claim
- This does not claim general production readiness.
- This does not claim readiness for any deployment outside the named launch envelope.
- This does not claim that the performance floor is optimal or final.
- This does not claim that the degraded-mode penalty is acceptable (deployment-specific decision).
- This does not claim hours/days stability under sustained load.
- This is a bounded first-launch gate, not a broad rollout approval.

weed/server/block_recovery.go
@@ -0,0 +1,374 @@
package weed_server
import (
"context"
"sync"
bridge "github.com/seaweedfs/seaweedfs/sw-block/bridge/blockvol"
engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol/v2bridge"
)
// recoveryTask tracks a live recovery goroutine for one replica target.
// The task pointer serves as identity token — only the goroutine that owns
// THIS pointer may mark it as done.
type recoveryTask struct {
replicaID string
cancel context.CancelFunc
done chan struct{} // closed when the goroutine exits
}
// RecoveryManager owns live recovery execution for all replica targets.
//
// Ownership model:
// - At most one recovery goroutine per replicaID at any time.
// - On supersede/replace: the old goroutine is cancelled AND drained
// before the replacement starts. No overlap.
// - Cancellation: context cancel + session invalidation (for removal/shutdown).
// For supersede: context cancel only (engine already attached replacement session).
type RecoveryManager struct {
bs *BlockService
mu sync.Mutex
tasks map[string]*recoveryTask
wg sync.WaitGroup
// TestHook: if set, called before execution starts. Tests use this
// to hold the goroutine alive for serialized-replacement proofs.
OnBeforeExecute func(replicaID string)
}
func NewRecoveryManager(bs *BlockService) *RecoveryManager {
return &RecoveryManager{
bs: bs,
tasks: make(map[string]*recoveryTask),
}
}
// HandleAssignmentResult processes the engine's assignment result.
//
// Engine result semantics:
// - SessionsCreated: new session, start goroutine
// - SessionsSuperseded: old replaced by new — cancel+drain old, start new
// - Removed: sender gone — cancel+drain, invalidate session
func (rm *RecoveryManager) HandleAssignmentResult(result engine.AssignmentResult, assignments []blockvol.BlockVolumeAssignment) {
// Removed: cancel + invalidate + drain.
for _, replicaID := range result.Removed {
rm.cancelAndDrain(replicaID, true)
}
// Superseded: cancel + drain (no invalidate — engine has replacement session),
// then start new.
for _, replicaID := range result.SessionsSuperseded {
rm.cancelAndDrain(replicaID, false)
rm.startTask(replicaID, assignments)
}
// Created: start new (cancel stale defensively).
for _, replicaID := range result.SessionsCreated {
rm.cancelAndDrain(replicaID, false)
rm.startTask(replicaID, assignments)
}
}
// cancelAndDrain cancels a running task and WAITS for it to exit.
// This ensures no overlap between old and new owners.
func (rm *RecoveryManager) cancelAndDrain(replicaID string, invalidateSession bool) {
rm.mu.Lock()
task, ok := rm.tasks[replicaID]
if !ok {
rm.mu.Unlock()
return
}
glog.V(1).Infof("recovery: cancelling+draining task for %s (invalidate=%v)", replicaID, invalidateSession)
task.cancel()
if invalidateSession && rm.bs.v2Orchestrator != nil {
if s := rm.bs.v2Orchestrator.Registry.Sender(replicaID); s != nil {
s.InvalidateSession("recovery_removed", engine.StateDisconnected)
}
}
delete(rm.tasks, replicaID)
doneCh := task.done
rm.mu.Unlock()
// Wait for the old goroutine to exit OUTSIDE the lock.
// This serializes replacement: new task cannot start until old is fully drained.
<-doneCh
}
// startTask creates and starts a new recovery goroutine. Caller must ensure
// no existing task for this replicaID (call cancelAndDrain first).
func (rm *RecoveryManager) startTask(replicaID string, assignments []blockvol.BlockVolumeAssignment) {
rm.mu.Lock()
defer rm.mu.Unlock()
rebuildAddr := rm.deriveRebuildAddr(replicaID, assignments)
ctx, cancel := context.WithCancel(context.Background())
task := &recoveryTask{
replicaID: replicaID,
cancel: cancel,
done: make(chan struct{}),
}
rm.tasks[replicaID] = task
rm.wg.Add(1)
go rm.runRecovery(ctx, task, rebuildAddr)
}
// Shutdown cancels all active recovery tasks and waits for drain.
func (rm *RecoveryManager) Shutdown() {
rm.mu.Lock()
for _, task := range rm.tasks {
task.cancel()
if rm.bs.v2Orchestrator != nil {
if s := rm.bs.v2Orchestrator.Registry.Sender(task.replicaID); s != nil {
s.InvalidateSession("recovery_shutdown", engine.StateDisconnected)
}
}
}
rm.tasks = make(map[string]*recoveryTask)
rm.mu.Unlock()
rm.wg.Wait()
}
// ActiveTaskCount returns the number of active recovery tasks (for testing).
func (rm *RecoveryManager) ActiveTaskCount() int {
rm.mu.Lock()
defer rm.mu.Unlock()
return len(rm.tasks)
}
// DiagnosticSnapshot returns a bounded read-only snapshot of active recovery
// tasks for operator-visible diagnosis. Each entry shows the replicaID being
// recovered. This is the P3 diagnosability surface — read-only, no semantics.
type RecoveryDiagnostic struct {
ActiveTasks []string // replicaIDs with active recovery work
}
func (rm *RecoveryManager) DiagnosticSnapshot() RecoveryDiagnostic {
rm.mu.Lock()
defer rm.mu.Unlock()
diag := RecoveryDiagnostic{}
for id := range rm.tasks {
diag.ActiveTasks = append(diag.ActiveTasks, id)
}
return diag
}
// runRecovery is the recovery goroutine for one replica target.
func (rm *RecoveryManager) runRecovery(ctx context.Context, task *recoveryTask, rebuildAddr string) {
defer rm.wg.Done()
defer close(task.done) // signal drain completion
defer func() {
rm.mu.Lock()
// Only delete if we're still the active task (pointer comparison).
if rm.tasks[task.replicaID] == task {
delete(rm.tasks, task.replicaID)
}
rm.mu.Unlock()
}()
replicaID := task.replicaID
if ctx.Err() != nil {
return
}
orch := rm.bs.v2Orchestrator
s := orch.Registry.Sender(replicaID)
if s == nil {
glog.V(1).Infof("recovery: sender %s not found, skipping", replicaID)
return
}
sessSnap := s.SessionSnapshot()
if sessSnap == nil {
glog.V(1).Infof("recovery: sender %s has no active session, skipping", replicaID)
return
}
glog.V(0).Infof("recovery: starting %s session for %s (rebuildAddr=%s)",
sessSnap.Kind, replicaID, rebuildAddr)
if rm.OnBeforeExecute != nil {
rm.OnBeforeExecute(replicaID)
}
switch sessSnap.Kind {
case engine.SessionCatchUp:
rm.runCatchUp(ctx, replicaID, rebuildAddr)
case engine.SessionRebuild:
rm.runRebuild(ctx, replicaID, rebuildAddr)
default:
glog.V(1).Infof("recovery: unknown session kind %s for %s", sessSnap.Kind, replicaID)
}
}
func (rm *RecoveryManager) runCatchUp(ctx context.Context, replicaID, rebuildAddr string) {
bs := rm.bs
volPath := rm.volumePathForReplica(replicaID)
if volPath == "" {
glog.Warningf("recovery: cannot determine volume path for %s", replicaID)
return
}
var sa engine.StorageAdapter
var replicaFlushedLSN uint64
var executor *v2bridge.Executor
if err := bs.blockStore.WithVolume(volPath, func(vol *blockvol.BlockVol) error {
reader := v2bridge.NewReader(vol)
pinner := v2bridge.NewPinner(vol)
sa = bridge.NewStorageAdapter(
&readerShimForRecovery{reader},
&pinnerShimForRecovery{pinner},
)
if s := bs.v2Orchestrator.Registry.Sender(replicaID); s != nil {
if snap := s.SessionSnapshot(); snap != nil {
replicaFlushedLSN = snap.StartLSN
}
}
executor = v2bridge.NewExecutor(vol, rebuildAddr)
return nil
}); err != nil {
glog.Warningf("recovery: cannot access volume %s: %v", volPath, err)
return
}
if ctx.Err() != nil {
return
}
driver := &engine.RecoveryDriver{Orchestrator: bs.v2Orchestrator, Storage: sa}
plan, err := driver.PlanRecovery(replicaID, replicaFlushedLSN)
if err != nil {
glog.Warningf("recovery: plan failed for %s: %v", replicaID, err)
return
}
if ctx.Err() != nil {
driver.CancelPlan(plan, "context_cancelled")
return
}
exec := engine.NewCatchUpExecutor(driver, plan)
exec.IO = executor
if execErr := exec.Execute(nil, 0); execErr != nil {
if ctx.Err() != nil {
glog.V(1).Infof("recovery: catch-up cancelled for %s: %v", replicaID, execErr)
} else {
glog.Warningf("recovery: catch-up execution failed for %s: %v", replicaID, execErr)
}
return
}
glog.V(0).Infof("recovery: catch-up completed for %s", replicaID)
}
func (rm *RecoveryManager) runRebuild(ctx context.Context, replicaID, rebuildAddr string) {
bs := rm.bs
volPath := rm.volumePathForReplica(replicaID)
if volPath == "" {
glog.Warningf("recovery: cannot determine volume path for %s", replicaID)
return
}
var sa engine.StorageAdapter
var executor *v2bridge.Executor
if err := bs.blockStore.WithVolume(volPath, func(vol *blockvol.BlockVol) error {
reader := v2bridge.NewReader(vol)
pinner := v2bridge.NewPinner(vol)
sa = bridge.NewStorageAdapter(
&readerShimForRecovery{reader},
&pinnerShimForRecovery{pinner},
)
executor = v2bridge.NewExecutor(vol, rebuildAddr)
return nil
}); err != nil {
glog.Warningf("recovery: cannot access volume %s: %v", volPath, err)
return
}
if ctx.Err() != nil {
return
}
driver := &engine.RecoveryDriver{Orchestrator: bs.v2Orchestrator, Storage: sa}
plan, err := driver.PlanRebuild(replicaID)
if err != nil {
glog.Warningf("recovery: rebuild plan failed for %s: %v", replicaID, err)
return
}
if ctx.Err() != nil {
driver.CancelPlan(plan, "context_cancelled")
return
}
exec := engine.NewRebuildExecutor(driver, plan)
exec.IO = executor
if execErr := exec.Execute(); execErr != nil {
if ctx.Err() != nil {
glog.V(1).Infof("recovery: rebuild cancelled for %s: %v", replicaID, execErr)
} else {
glog.Warningf("recovery: rebuild execution failed for %s: %v", replicaID, execErr)
}
return
}
glog.V(0).Infof("recovery: rebuild completed for %s", replicaID)
}
func (rm *RecoveryManager) deriveRebuildAddr(replicaID string, assignments []blockvol.BlockVolumeAssignment) string {
volPath := rm.volumePathForReplica(replicaID)
for _, a := range assignments {
if a.Path == volPath && a.RebuildAddr != "" {
return a.RebuildAddr
}
}
return ""
}
func (rm *RecoveryManager) volumePathForReplica(replicaID string) string {
for i := len(replicaID) - 1; i >= 0; i-- {
if replicaID[i] == '/' {
return replicaID[:i]
}
}
return ""
}
// --- Bridge shims ---
type readerShimForRecovery struct{ r *v2bridge.Reader }
func (s *readerShimForRecovery) ReadState() bridge.BlockVolState {
rs := s.r.ReadState()
return bridge.BlockVolState{
WALHeadLSN: rs.WALHeadLSN,
WALTailLSN: rs.WALTailLSN,
CommittedLSN: rs.CommittedLSN,
CheckpointLSN: rs.CheckpointLSN,
CheckpointTrusted: rs.CheckpointTrusted,
}
}
type pinnerShimForRecovery struct{ p *v2bridge.Pinner }
func (s *pinnerShimForRecovery) HoldWALRetention(startLSN uint64) (func(), error) {
return s.p.HoldWALRetention(startLSN)
}
func (s *pinnerShimForRecovery) HoldSnapshot(checkpointLSN uint64) (func(), error) {
return s.p.HoldSnapshot(checkpointLSN)
}
func (s *pinnerShimForRecovery) HoldFullBase(committedLSN uint64) (func(), error) {
return s.p.HoldFullBase(committedLSN)
}

weed/server/master_block_failover.go
@@ -1,10 +1,12 @@
package weed_server
import (
"context"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
)
@@ -18,18 +20,139 @@ type pendingRebuild struct {
ReplicaCtrlAddr string // CP13-8: saved from before death for catch-up-first recovery
}
// deferredPromotion tracks a deferred promotion timer with its volume context.
type deferredPromotion struct {
Timer *time.Timer
VolumeName string
CurrentPrimary string // current (stale) primary that will be replaced
AffectedServer string // dead server addr
}
// blockFailoverState holds failover and rebuild state on the master.
type blockFailoverState struct {
mu sync.Mutex
- pendingRebuilds map[string][]pendingRebuild // dead server addr -> pending rebuilds
- // R2-F2: Track deferred promotion timers so they can be cancelled on reconnect.
- deferredTimers map[string][]*time.Timer // dead server addr -> pending timers
+ pendingRebuilds map[string][]pendingRebuild // dead server addr -> pending rebuilds
+ deferredTimers map[string][]deferredPromotion // dead server addr -> pending deferred promotions
}
// FailoverVolumeState is one volume's failover diagnosis entry.
type FailoverVolumeState struct {
VolumeName string
CurrentPrimary string
AffectedServer string // dead server that triggered the failover/rebuild
DeferredPromotion bool // true if a deferred promotion timer is pending
PendingRebuild bool // true if a rebuild is pending for this volume
Reason string // "lease_wait", "rebuild_pending", or ""
}
// FailoverDiagnostic is a bounded read-only snapshot of failover state
// for operator-visible diagnosis. P3 diagnosability surface.
//
// Volume-oriented: each entry describes one volume's failover state.
// Aggregate counts are derived from the volume list.
type FailoverDiagnostic struct {
Volumes []FailoverVolumeState
PendingRebuildCount map[string]int // dead server → count of pending rebuilds
DeferredPromotionCount map[string]int // dead server → count of deferred promotion timers
}
func (fs *blockFailoverState) DiagnosticSnapshot() FailoverDiagnostic {
fs.mu.Lock()
defer fs.mu.Unlock()
diag := FailoverDiagnostic{
PendingRebuildCount: make(map[string]int),
DeferredPromotionCount: make(map[string]int),
}
for server, rebuilds := range fs.pendingRebuilds {
diag.PendingRebuildCount[server] = len(rebuilds)
for _, rb := range rebuilds {
diag.Volumes = append(diag.Volumes, FailoverVolumeState{
VolumeName: rb.VolumeName,
CurrentPrimary: rb.NewPrimary,
AffectedServer: server,
PendingRebuild: true,
Reason: "rebuild_pending",
})
}
}
for server, promos := range fs.deferredTimers {
diag.DeferredPromotionCount[server] = len(promos)
for _, dp := range promos {
diag.Volumes = append(diag.Volumes, FailoverVolumeState{
VolumeName: dp.VolumeName,
CurrentPrimary: dp.CurrentPrimary,
AffectedServer: dp.AffectedServer,
DeferredPromotion: true,
Reason: "lease_wait",
})
}
}
return diag
}
// PublicationDiagnostic is a bounded read-only snapshot comparing the
// operator-visible publication (LookupBlockVolume response) against the
// registry authority for one volume. P3 diagnosability surface for S2.
type PublicationDiagnostic struct {
VolumeName string
LookupVolumeServer string // what LookupBlockVolume returns
LookupIscsiAddr string
AuthorityVolumeServer string // registry entry (source of truth)
AuthorityIscsiAddr string
Coherent bool // true if lookup == authority
Reason string // "" if coherent, otherwise why they diverge
}
// PublicationDiagnosticFor returns a PublicationDiagnostic for the named volume.
// It performs two independent reads:
// - Lookup side: calls LookupBlockVolume (the actual gRPC method)
// - Authority side: reads the registry directly
//
// Then compares the two. If they diverge, Coherent=false with a Reason.
func (ms *MasterServer) PublicationDiagnosticFor(volumeName string) (PublicationDiagnostic, bool) {
if ms.blockRegistry == nil {
return PublicationDiagnostic{}, false
}
// Read 1: the operator-visible publication surface.
lookupResp, err := ms.LookupBlockVolume(context.Background(), &master_pb.LookupBlockVolumeRequest{Name: volumeName})
if err != nil {
return PublicationDiagnostic{}, false
}
// Read 2: the registry authority (separate read).
entry, ok := ms.blockRegistry.Lookup(volumeName)
if !ok {
return PublicationDiagnostic{}, false
}
diag := PublicationDiagnostic{
VolumeName: volumeName,
LookupVolumeServer: lookupResp.VolumeServer,
LookupIscsiAddr: lookupResp.IscsiAddr,
AuthorityVolumeServer: entry.VolumeServer,
AuthorityIscsiAddr: entry.ISCSIAddr,
}
// Compare the two reads.
vsMatch := diag.LookupVolumeServer == diag.AuthorityVolumeServer
iscsiMatch := diag.LookupIscsiAddr == diag.AuthorityIscsiAddr
diag.Coherent = vsMatch && iscsiMatch
if !diag.Coherent {
if !vsMatch {
diag.Reason = "volume_server_mismatch"
} else {
diag.Reason = "iscsi_addr_mismatch"
}
}
return diag, true
}
func newBlockFailoverState() *blockFailoverState {
return &blockFailoverState{
pendingRebuilds: make(map[string][]pendingRebuild),
- deferredTimers: make(map[string][]*time.Timer),
+ deferredTimers: make(map[string][]deferredPromotion),
}
}
@@ -60,7 +183,11 @@ func (ms *MasterServer) failoverBlockVolumes(deadServer string) {
glog.V(0).Infof("failover: %q lease expires in %v, deferring promotion", entry.Name, delay)
volumeName := entry.Name
capturedEpoch := entry.Epoch // T3: capture epoch for stale-timer validation
capturedDeadServer := deadServer // capture for closure
timer := time.AfterFunc(delay, func() {
// Clean up the deferred entry regardless of outcome.
ms.removeFiredDeferredPromotion(capturedDeadServer, volumeName)
// T3: Re-validate before acting — prevent stale timer on recreated/changed volume.
current, ok := ms.blockRegistry.Lookup(volumeName)
if !ok {
@ -76,7 +203,12 @@ func (ms *MasterServer) failoverBlockVolumes(deadServer string) {
})
ms.blockFailover.mu.Lock()
ms.blockFailover.deferredTimers[deadServer] = append(
ms.blockFailover.deferredTimers[deadServer], deferredPromotion{
Timer: timer,
VolumeName: volumeName,
CurrentPrimary: entry.VolumeServer,
AffectedServer: deadServer,
})
ms.blockFailover.mu.Unlock()
continue
}
@ -159,12 +291,14 @@ func (ms *MasterServer) finalizePromotion(volumeName, oldPrimary, oldPath string
assignment.ReplicaAddrs = append(assignment.ReplicaAddrs, blockvol.ReplicaAddr{
DataAddr: ri.DataAddr,
CtrlAddr: ri.CtrlAddr,
ServerID: ri.Server, // V2: stable identity
})
}
// Backward compat: also set scalar fields if exactly 1 replica.
if len(entry.Replicas) == 1 {
assignment.ReplicaDataAddr = entry.Replicas[0].DataAddr
assignment.ReplicaCtrlAddr = entry.Replicas[0].CtrlAddr
assignment.ReplicaServerID = entry.Replicas[0].Server // V2: stable identity
}
ms.blockAssignmentQueue.Enqueue(entry.VolumeServer, assignment)
@ -210,14 +344,36 @@ func (ms *MasterServer) cancelDeferredTimers(server string) {
return
}
ms.blockFailover.mu.Lock()
promos := ms.blockFailover.deferredTimers[server]
delete(ms.blockFailover.deferredTimers, server)
ms.blockFailover.mu.Unlock()
for _, dp := range promos {
dp.Timer.Stop()
}
if len(promos) > 0 {
glog.V(0).Infof("failover: cancelled %d deferred promotion timers for reconnected %s", len(promos), server)
}
}
// removeFiredDeferredPromotion removes a single deferred promotion entry after
// its timer has fired (whether it promoted or was skipped). This keeps
// FailoverDiagnostic accurate: once the timer fires, the volume is no longer
// in lease-wait state.
func (ms *MasterServer) removeFiredDeferredPromotion(server, volumeName string) {
if ms.blockFailover == nil {
return
}
ms.blockFailover.mu.Lock()
defer ms.blockFailover.mu.Unlock()
promos := ms.blockFailover.deferredTimers[server]
for i, dp := range promos {
if dp.VolumeName == volumeName {
ms.blockFailover.deferredTimers[server] = append(promos[:i], promos[i+1:]...)
if len(ms.blockFailover.deferredTimers[server]) == 0 {
delete(ms.blockFailover.deferredTimers, server)
}
return
}
}
}
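The bookkeeping in removeFiredDeferredPromotion is the usual map-of-slices splice: remove one element in place, then drop the key once its slice is empty so the diagnostic never reports a server with zero pending promotions. A minimal standalone version of the pattern — `removeEntry` is a hypothetical name:

```go
package main

import "fmt"

// removeEntry removes the first element equal to name from m[key],
// deleting the key entirely once its slice empties — the same
// cleanup discipline removeFiredDeferredPromotion applies.
func removeEntry(m map[string][]string, key, name string) {
	entries := m[key]
	for i, e := range entries {
		if e == name {
			m[key] = append(entries[:i], entries[i+1:]...)
			if len(m[key]) == 0 {
				delete(m, key)
			}
			return
		}
	}
}

func main() {
	m := map[string][]string{"srv-1": {"vol-a", "vol-b"}}
	removeEntry(m, "srv-1", "vol-a")
	fmt.Println(m) // map[srv-1:[vol-b]]
	removeEntry(m, "srv-1", "vol-b")
	_, ok := m["srv-1"]
	fmt.Println(ok) // false: key deleted once empty
}
```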
@ -286,16 +442,18 @@ func (ms *MasterServer) recoverBlockVolumes(reconnectedServer string) {
Role: blockvol.RoleToWire(blockvol.RolePrimary),
LeaseTtlMs: leaseTTLMs,
}
// Include all replica addresses.
// Include all replica addresses with stable identity.
for _, ri := range entry.Replicas {
primaryAssignment.ReplicaAddrs = append(primaryAssignment.ReplicaAddrs, blockvol.ReplicaAddr{
DataAddr: ri.DataAddr,
CtrlAddr: ri.CtrlAddr,
ServerID: ri.Server, // V2: stable identity
})
}
if len(entry.Replicas) == 1 {
primaryAssignment.ReplicaDataAddr = entry.Replicas[0].DataAddr
primaryAssignment.ReplicaCtrlAddr = entry.Replicas[0].CtrlAddr
primaryAssignment.ReplicaServerID = entry.Replicas[0].Server // V2
}
ms.blockAssignmentQueue.Enqueue(entry.VolumeServer, primaryAssignment)
@ -346,11 +504,13 @@ func (ms *MasterServer) refreshPrimaryForAddrChange(ac ReplicaAddrChange) {
assignment.ReplicaAddrs = append(assignment.ReplicaAddrs, blockvol.ReplicaAddr{
DataAddr: ri.DataAddr,
CtrlAddr: ri.CtrlAddr,
ServerID: ri.Server, // V2: stable identity
})
}
if len(entry.Replicas) == 1 {
assignment.ReplicaDataAddr = entry.Replicas[0].DataAddr
assignment.ReplicaCtrlAddr = entry.Replicas[0].CtrlAddr
assignment.ReplicaServerID = entry.Replicas[0].Server // V2
}
// Use current registry primary (not stale ac.PrimaryServer) in case
// failover happened between address-change detection and this refresh.
@ -384,6 +544,9 @@ func (ms *MasterServer) reevaluateOrphanedPrimaries(server string) {
capturedEpoch := entry.Epoch
deadPrimary := entry.VolumeServer
timer := time.AfterFunc(delay, func() {
// Clean up the deferred entry regardless of outcome.
ms.removeFiredDeferredPromotion(deadPrimary, volumeName)
current, ok := ms.blockRegistry.Lookup(volumeName)
if !ok {
return
@ -397,7 +560,12 @@ func (ms *MasterServer) reevaluateOrphanedPrimaries(server string) {
})
ms.blockFailover.mu.Lock()
ms.blockFailover.deferredTimers[deadPrimary] = append(
ms.blockFailover.deferredTimers[deadPrimary], deferredPromotion{
Timer: timer,
VolumeName: volumeName,
CurrentPrimary: deadPrimary,
AffectedServer: deadPrimary,
})
ms.blockFailover.mu.Unlock()
continue
}

270
weed/server/qa_block_diagnosability_test.go

@ -0,0 +1,270 @@
package weed_server
import (
"context"
"os"
"strings"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
// ============================================================
// Phase 12 P3: Diagnosability / Blocker Accounting
//
// All diagnosis conclusions use ONLY explicit bounded read-only
// diagnosis surfaces:
// - LookupBlockVolume (product-visible publication)
// - FailoverDiagnostic (volume-oriented failover state)
// - PublicationDiagnostic (lookup vs authority coherence)
// - RecoveryDiagnostic (active recovery task set)
// - phase-12-p3-blockers.md (finite blocker ledger)
//
// NOT performance, NOT rollout readiness.
// ============================================================
// --- S1: Failover convergence diagnosable via FailoverDiagnostic ---
func TestP12P3_FailoverConvergence_Diagnosable(t *testing.T) {
s := newSoakSetup(t)
ctx := context.Background()
s.ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
Name: "diag-vol-1", SizeBytes: 1 << 20,
})
entry, _ := s.ms.blockRegistry.Lookup("diag-vol-1")
s.bs.localServerID = entry.VolumeServer
s.deliver(entry.VolumeServer)
time.Sleep(100 * time.Millisecond)
// Surface 1: LookupBlockVolume shows current primary before failover.
lookupBefore, _ := s.ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "diag-vol-1"})
if lookupBefore.VolumeServer != entry.VolumeServer {
t.Fatal("lookup should show original primary before failover")
}
// Surface 2: FailoverDiagnostic — no volumes in failover state yet.
failoverBefore := s.ms.blockFailover.DiagnosticSnapshot()
for _, v := range failoverBefore.Volumes {
if v.VolumeName == "diag-vol-1" {
t.Fatalf("S1: diag-vol-1 should not appear in failover diagnostic before failover, got %+v", v)
}
}
// Trigger failover: expire lease, then failover.
s.ms.blockRegistry.UpdateEntry("diag-vol-1", func(e *BlockVolumeEntry) {
e.LastLeaseGrant = time.Now().Add(-1 * time.Minute)
})
s.ms.failoverBlockVolumes(entry.VolumeServer)
// Surface 1 after: LookupBlockVolume shows NEW primary.
lookupAfter, _ := s.ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "diag-vol-1"})
if lookupAfter.VolumeServer == entry.VolumeServer {
t.Fatal("S1 stall: lookup still shows old primary after failover")
}
// Surface 2 after: FailoverDiagnostic shows volume-level failover state.
failoverAfter := s.ms.blockFailover.DiagnosticSnapshot()
// Classify via explicit diagnosis surface: find diag-vol-1 in failover volumes.
var found *FailoverVolumeState
for i := range failoverAfter.Volumes {
if failoverAfter.Volumes[i].VolumeName == "diag-vol-1" {
found = &failoverAfter.Volumes[i]
break
}
}
if found == nil {
t.Fatal("S1: diag-vol-1 not found in FailoverDiagnostic after failover")
}
// Diagnosis conclusion from explicit surfaces only:
// - Lookup changed (old → new primary)
// - FailoverDiagnostic classifies state as rebuild_pending
// - AffectedServer identifies the dead server
if !found.PendingRebuild {
t.Fatal("S1: FailoverDiagnostic should show PendingRebuild=true")
}
if found.Reason != "rebuild_pending" {
t.Fatalf("S1: expected reason=rebuild_pending, got %q", found.Reason)
}
if found.AffectedServer != entry.VolumeServer {
t.Fatalf("S1: AffectedServer should be dead server %s, got %s", entry.VolumeServer, found.AffectedServer)
}
if found.CurrentPrimary != lookupAfter.VolumeServer {
t.Fatalf("S1: CurrentPrimary should match lookup %s, got %s", lookupAfter.VolumeServer, found.CurrentPrimary)
}
t.Logf("P12P3 S1: diagnosed via LookupBlockVolume(%s→%s) + FailoverDiagnostic(vol=%s, reason=%s, affected=%s)",
lookupBefore.VolumeServer, lookupAfter.VolumeServer, found.VolumeName, found.Reason, found.AffectedServer)
}
// --- S2: Publication mismatch diagnosable via PublicationDiagnostic ---
func TestP12P3_PublicationMismatch_Diagnosable(t *testing.T) {
s := newSoakSetup(t)
ctx := context.Background()
s.ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
Name: "diag-vol-2", SizeBytes: 1 << 20,
})
entry, _ := s.ms.blockRegistry.Lookup("diag-vol-2")
s.bs.localServerID = entry.VolumeServer
s.deliver(entry.VolumeServer)
time.Sleep(100 * time.Millisecond)
// Surface 1: PublicationDiagnostic before failover — should be coherent.
pubBefore, ok := s.ms.PublicationDiagnosticFor("diag-vol-2")
if !ok {
t.Fatal("S2: PublicationDiagnosticFor should find diag-vol-2")
}
if !pubBefore.Coherent {
t.Fatalf("S2: publication should be coherent before failover, got reason=%q", pubBefore.Reason)
}
// Surface 2: LookupBlockVolume — repeated lookups self-consistent.
lookup1, _ := s.ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "diag-vol-2"})
lookup2, _ := s.ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "diag-vol-2"})
if lookup1.IscsiAddr != lookup2.IscsiAddr || lookup1.VolumeServer != lookup2.VolumeServer {
t.Fatalf("S2: repeated lookup mismatch: %s/%s vs %s/%s",
lookup1.VolumeServer, lookup1.IscsiAddr, lookup2.VolumeServer, lookup2.IscsiAddr)
}
// Trigger failover.
s.ms.blockRegistry.UpdateEntry("diag-vol-2", func(e *BlockVolumeEntry) {
e.LastLeaseGrant = time.Now().Add(-1 * time.Minute)
})
s.ms.failoverBlockVolumes(entry.VolumeServer)
// Surface 1 after: PublicationDiagnostic after failover — still coherent.
pubAfter, ok := s.ms.PublicationDiagnosticFor("diag-vol-2")
if !ok {
t.Fatal("S2: PublicationDiagnosticFor should find diag-vol-2 after failover")
}
if !pubAfter.Coherent {
t.Fatalf("S2: publication should be coherent after failover, got reason=%q", pubAfter.Reason)
}
// Diagnosis conclusion from explicit surfaces only:
// - Pre-failover: coherent, lookup matches authority
// - Post-failover: coherent, lookup updated to new primary
// - Publication switched: post != pre
if pubAfter.LookupVolumeServer == pubBefore.LookupVolumeServer {
t.Fatal("S2: LookupVolumeServer unchanged after failover — publication did not switch")
}
if pubAfter.LookupIscsiAddr == pubBefore.LookupIscsiAddr {
t.Fatal("S2: LookupIscsiAddr unchanged after failover")
}
// Post-failover repeated lookup still self-consistent (via diagnostic).
pubAfter2, _ := s.ms.PublicationDiagnosticFor("diag-vol-2")
if pubAfter2.LookupVolumeServer != pubAfter.LookupVolumeServer ||
pubAfter2.LookupIscsiAddr != pubAfter.LookupIscsiAddr {
t.Fatal("S2: post-failover publication diagnostics inconsistent")
}
t.Logf("P12P3 S2: diagnosed via PublicationDiagnostic — pre(vs=%s, iscsi=%s, coherent=%v) → post(vs=%s, iscsi=%s, coherent=%v)",
pubBefore.LookupVolumeServer, pubBefore.LookupIscsiAddr, pubBefore.Coherent,
pubAfter.LookupVolumeServer, pubAfter.LookupIscsiAddr, pubAfter.Coherent)
}
// --- S3: Runtime residue diagnosable via RecoveryDiagnostic ---
func TestP12P3_RuntimeResidue_Diagnosable(t *testing.T) {
s := newSoakSetup(t)
ctx := context.Background()
s.ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
Name: "diag-vol-3", SizeBytes: 1 << 20,
})
entry, _ := s.ms.blockRegistry.Lookup("diag-vol-3")
s.bs.localServerID = entry.VolumeServer
s.deliver(entry.VolumeServer)
time.Sleep(200 * time.Millisecond)
// Surface: RecoveryDiagnostic shows active task set.
diagBefore := s.bs.v2Recovery.DiagnosticSnapshot()
t.Logf("S3 before delete: %d active tasks: %v", len(diagBefore.ActiveTasks), diagBefore.ActiveTasks)
// Delete the volume.
s.ms.DeleteBlockVolume(ctx, &master_pb.DeleteBlockVolumeRequest{Name: "diag-vol-3"})
time.Sleep(200 * time.Millisecond)
// Surface after: RecoveryDiagnostic — no tasks for deleted volume.
diagAfter := s.bs.v2Recovery.DiagnosticSnapshot()
for _, task := range diagAfter.ActiveTasks {
if strings.Contains(task, "diag-vol-3") {
t.Fatalf("S3 residue: task %s active after delete", task)
}
}
// Diagnosis conclusion from explicit surface only:
// - RecoveryDiagnostic.ActiveTasks does not contain deleted volume's tasks
// - Conclusion: clean (no residue) or non-empty but unrelated to deleted volume
if len(diagAfter.ActiveTasks) == 0 {
t.Log("P12P3 S3: diagnosed via RecoveryDiagnostic — clean (0 active tasks after delete)")
} else {
t.Logf("P12P3 S3: diagnosed via RecoveryDiagnostic — %d active tasks (none for deleted vol)",
len(diagAfter.ActiveTasks))
}
}
// --- Blocker ledger: reads and validates the actual file ---
func TestP12P3_BlockerLedger_Bounded(t *testing.T) {
ledgerPath := "../../sw-block/.private/phase/phase-12-p3-blockers.md"
data, err := os.ReadFile(ledgerPath)
if err != nil {
t.Fatalf("blocker ledger must exist at %s: %v", ledgerPath, err)
}
content := string(data)
// Must contain diagnosed items.
for _, id := range []string{"B1", "B2", "B3"} {
if !strings.Contains(content, id) {
t.Fatalf("ledger missing diagnosed item %s", id)
}
}
// Must contain unresolved items.
for _, id := range []string{"U1", "U2", "U3"} {
if !strings.Contains(content, id) {
t.Fatalf("ledger missing unresolved item %s", id)
}
}
// Must contain out-of-scope section.
if !strings.Contains(content, "Out of Scope") {
t.Fatal("ledger must have 'Out of Scope' section")
}
// Must NOT overclaim perf or rollout.
lines := strings.Split(content, "\n")
diagnosedCount := 0
unresolvedCount := 0
for _, line := range lines {
if strings.HasPrefix(strings.TrimSpace(line), "| B") {
diagnosedCount++
}
if strings.HasPrefix(strings.TrimSpace(line), "| U") {
unresolvedCount++
}
}
total := diagnosedCount + unresolvedCount
if total == 0 {
t.Fatal("ledger has no blocker items")
}
if total > 20 {
t.Fatalf("ledger should be finite, got %d items", total)
}
t.Logf("P12P3 blockers: %d diagnosed + %d unresolved = %d total (from actual file, finite)",
diagnosedCount, unresolvedCount, total)
}

582
weed/server/qa_block_perf_test.go

@ -0,0 +1,582 @@
package weed_server
import (
"crypto/rand"
"fmt"
"math"
mrand "math/rand"
"os"
"path/filepath"
"sort"
"strings"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
)
// ============================================================
// Phase 12 P4: Performance Floor — Bounded Measurement Package
//
// Workload envelope:
// Topology: RF=2 sync_all accepted chosen path
// Operations: 4K random write, 4K random read, 4K sequential write, 4K sequential read
// Runtime: no failover, no disturbance, steady-state
// Environment: unit test harness (single-process, local disk, engine-local I/O)
//
// What this measures:
// Engine I/O floor for the accepted chosen path. WriteLBA/ReadLBA through
// the full fencing path (epoch, role, lease, writeGate, WAL, dirtyMap).
// No transport layer (iSCSI/NVMe). No cross-machine replication.
//
// What this does NOT measure:
// Transport throughput, cross-machine replication tax, multi-client concurrency,
// failover-under-load, degraded mode. Production floor with replication is
// documented in baseline-roce-20260401.md.
//
// NOT performance tuning. NOT broad benchmark.
// ============================================================
const (
perfBlockSize = 4096
perfVolumeSize = 64 * 1024 * 1024 // 64MB
perfWALSize = 16 * 1024 * 1024 // 16MB
perfOps = 1000 // ops per measurement run
perfWarmupOps = 200 // warmup ops (discarded from measurement)
perfIterations = 3 // run N times, report worst as floor
)
// Minimum acceptable floor thresholds (engine-local, single-writer).
//
// These are regression gates, not performance targets. Set conservatively
// so any reasonable hardware passes, but catastrophic regressions
// (accidental serialization, O(n^2) scan, broken WAL path) are caught.
//
// Rationale for values:
// Measured on dev SSD: rand-write ~10K, rand-read ~80K, seq-write ~30K, seq-read ~180K.
// Thresholds set at ~10% of measured to tolerate slow CI machines and VMs.
// Write P99 ceiling at 100ms catches deadlocks/stalls without false-positiving
// on slow storage.
var perfFloorGates = map[string]struct {
MinIOPS float64
MaxWriteP99 time.Duration // 0 = no ceiling (reads)
}{
"rand-write": {MinIOPS: 1000, MaxWriteP99: 100 * time.Millisecond},
"rand-read": {MinIOPS: 5000},
"seq-write": {MinIOPS: 2000, MaxWriteP99: 100 * time.Millisecond},
"seq-read": {MinIOPS: 10000},
}
// perfResult holds measurements for one workload run.
type perfResult struct {
Workload string
Ops int
Elapsed time.Duration
IOPS float64
MBps float64
LatSamples []int64 // per-op latency in nanoseconds
}
func (r *perfResult) latPct(pct float64) time.Duration {
if len(r.LatSamples) == 0 {
return 0
}
sorted := make([]int64, len(r.LatSamples))
copy(sorted, r.LatSamples)
sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
idx := int(math.Ceil(pct/100.0*float64(len(sorted)))) - 1
if idx < 0 {
idx = 0
}
if idx >= len(sorted) {
idx = len(sorted) - 1
}
return time.Duration(sorted[idx])
}
func (r *perfResult) latAvg() time.Duration {
if len(r.LatSamples) == 0 {
return 0
}
var sum int64
for _, s := range r.LatSamples {
sum += s
}
return time.Duration(sum / int64(len(r.LatSamples)))
}
// setupPerfVolume creates a BlockVol configured as Primary for perf measurement.
func setupPerfVolume(t *testing.T) *blockvol.BlockVol {
t.Helper()
dir := t.TempDir()
volPath := filepath.Join(dir, "perf.blk")
vol, err := blockvol.CreateBlockVol(volPath, blockvol.CreateOptions{
VolumeSize: perfVolumeSize,
BlockSize: perfBlockSize,
WALSize: perfWALSize,
})
if err != nil {
t.Fatal(err)
}
// Set up as Primary with long lease so writes are allowed.
if err := vol.HandleAssignment(1, blockvol.RolePrimary, 10*time.Minute); err != nil {
vol.Close()
t.Fatal(err)
}
t.Cleanup(func() { vol.Close() })
return vol
}
// maxLBAs returns the number of addressable 4K blocks in the extent area.
func maxLBAs() uint64 {
// Volume size minus WAL, divided by block size, with safety margin.
return (perfVolumeSize - perfWALSize) / perfBlockSize / 2
}
// runPerfWorkload executes one workload measurement and returns the result.
func runPerfWorkload(t *testing.T, vol *blockvol.BlockVol, workload string, ops int) perfResult {
t.Helper()
data := make([]byte, perfBlockSize)
rand.Read(data)
max := maxLBAs()
samples := make([]int64, 0, ops)
start := time.Now()
for i := 0; i < ops; i++ {
var lba uint64
switch {
case strings.HasPrefix(workload, "rand"):
lba = uint64(mrand.Int63n(int64(max)))
default: // sequential
lba = uint64(i) % max
}
opStart := time.Now()
switch {
case strings.HasSuffix(workload, "write"):
if err := vol.WriteLBA(lba, data); err != nil {
t.Fatalf("%s op %d: WriteLBA(%d): %v", workload, i, lba, err)
}
case strings.HasSuffix(workload, "read"):
if _, err := vol.ReadLBA(lba, perfBlockSize); err != nil {
t.Fatalf("%s op %d: ReadLBA(%d): %v", workload, i, lba, err)
}
}
samples = append(samples, time.Since(opStart).Nanoseconds())
}
elapsed := time.Since(start)
iops := float64(ops) / elapsed.Seconds()
mbps := iops * float64(perfBlockSize) / (1024 * 1024)
return perfResult{
Workload: workload,
Ops: ops,
Elapsed: elapsed,
IOPS: iops,
MBps: mbps,
LatSamples: samples,
}
}
// floorOf returns the worst (lowest) IOPS and worst (highest) P99 across iterations.
type perfFloor struct {
Workload string
FloorIOPS float64
FloorMBps float64
WorstAvg time.Duration
WorstP50 time.Duration
WorstP99 time.Duration
WorstMax time.Duration
}
func computeFloor(results []perfResult) perfFloor {
f := perfFloor{
Workload: results[0].Workload,
FloorIOPS: math.MaxFloat64,
FloorMBps: math.MaxFloat64,
}
for _, r := range results {
if r.IOPS < f.FloorIOPS {
f.FloorIOPS = r.IOPS
}
if r.MBps < f.FloorMBps {
f.FloorMBps = r.MBps
}
avg := r.latAvg()
if avg > f.WorstAvg {
f.WorstAvg = avg
}
p50 := r.latPct(50)
if p50 > f.WorstP50 {
f.WorstP50 = p50
}
p99 := r.latPct(99)
if p99 > f.WorstP99 {
f.WorstP99 = p99
}
pmax := r.latPct(100)
if pmax > f.WorstMax {
f.WorstMax = pmax
}
}
return f
}
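The floor method deliberately reports the worst of the N iterations rather than the best or the mean, so one slow run lowers the floor. Reduced to the IOPS dimension alone — `floorIOPS` is illustrative, not part of the harness:

```go
package main

import (
	"fmt"
	"math"
)

// floorIOPS returns the lowest IOPS across iterations: the reported
// floor is the worst run, so a single slow iteration drags it down.
func floorIOPS(iops []float64) float64 {
	floor := math.MaxFloat64
	for _, v := range iops {
		if v < floor {
			floor = v
		}
	}
	return floor
}

func main() {
	// Three iterations of the same workload: the floor is the worst one.
	fmt.Println(floorIOPS([]float64{10240, 9800, 10110})) // 9800
}
```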
// --- Test 1: PerformanceFloor_Bounded ---
func TestP12P4_PerformanceFloor_Bounded(t *testing.T) {
vol := setupPerfVolume(t)
workloads := []string{"rand-write", "rand-read", "seq-write", "seq-read"}
floors := make([]perfFloor, 0, len(workloads))
for _, wl := range workloads {
// Warmup: populate volume with data (needed for reads).
if strings.HasSuffix(wl, "read") {
warmupData := make([]byte, perfBlockSize)
rand.Read(warmupData)
for i := 0; i < int(maxLBAs()); i++ {
if err := vol.WriteLBA(uint64(i), warmupData); err != nil {
break // WAL full is acceptable during warmup
}
}
time.Sleep(200 * time.Millisecond) // let flusher drain
}
// Warmup ops (discarded).
runPerfWorkload(t, vol, wl, perfWarmupOps)
// Measurement: N iterations, take floor.
var results []perfResult
for iter := 0; iter < perfIterations; iter++ {
r := runPerfWorkload(t, vol, wl, perfOps)
results = append(results, r)
}
floor := computeFloor(results)
floors = append(floors, floor)
}
// Report structured floor table.
t.Log("")
t.Log("=== P12P4 Performance Floor (engine-local, single-writer) ===")
t.Log("")
t.Logf("%-12s %10s %8s %10s %10s %10s %10s",
"Workload", "Floor IOPS", "MB/s", "Avg Lat", "P50 Lat", "P99 Lat", "Max Lat")
t.Logf("%-12s %10s %8s %10s %10s %10s %10s",
"--------", "----------", "------", "-------", "-------", "-------", "-------")
for _, f := range floors {
t.Logf("%-12s %10.0f %8.2f %10s %10s %10s %10s",
f.Workload, f.FloorIOPS, f.FloorMBps, f.WorstAvg, f.WorstP50, f.WorstP99, f.WorstMax)
}
t.Log("")
t.Logf("Config: volume=%dMB WAL=%dMB block=%dB ops=%d warmup=%d iterations=%d",
perfVolumeSize/(1024*1024), perfWALSize/(1024*1024), perfBlockSize, perfOps, perfWarmupOps, perfIterations)
t.Log("Method: worst of N iterations (floor, not peak)")
t.Log("Scope: engine-local only; production RF=2 floor in baseline-roce-20260401.md")
// Gate: floor values must meet minimum acceptable thresholds.
// These are regression gates — if any floor drops below the gate,
// the test fails, blocking rollout.
t.Log("")
t.Log("=== Floor Gate Validation ===")
allGatesPassed := true
for _, f := range floors {
gate, ok := perfFloorGates[f.Workload]
if !ok {
t.Fatalf("no floor gate defined for workload %s", f.Workload)
}
passed := true
if f.FloorIOPS < gate.MinIOPS {
t.Errorf("GATE FAIL: %s floor IOPS %.0f < minimum %.0f", f.Workload, f.FloorIOPS, gate.MinIOPS)
passed = false
}
if gate.MaxWriteP99 > 0 && f.WorstP99 > gate.MaxWriteP99 {
t.Errorf("GATE FAIL: %s worst P99 %s > ceiling %s", f.Workload, f.WorstP99, gate.MaxWriteP99)
passed = false
}
status := "PASS"
if !passed {
status = "FAIL"
allGatesPassed = false
}
t.Logf(" %-12s min=%6.0f IOPS → floor=%6.0f [%s]", f.Workload, gate.MinIOPS, f.FloorIOPS, status)
}
if !allGatesPassed {
t.Fatal("P12P4 PerformanceFloor: FAIL — one or more floor gates not met")
}
t.Log("P12P4 PerformanceFloor: PASS — all floor gates met")
}
// --- Test 2: CostCharacterization_Bounded ---
func TestP12P4_CostCharacterization_Bounded(t *testing.T) {
vol := setupPerfVolume(t)
// Measure write latency breakdown: WriteLBA includes WAL append + group commit.
data := make([]byte, perfBlockSize)
rand.Read(data)
max := maxLBAs()
const costOps = 500
var writeLatSum int64
for i := 0; i < costOps; i++ {
lba := uint64(mrand.Int63n(int64(max)))
start := time.Now()
if err := vol.WriteLBA(lba, data); err != nil {
t.Fatalf("write op %d: %v", i, err)
}
writeLatSum += time.Since(start).Nanoseconds()
}
avgWriteLat := time.Duration(writeLatSum / costOps)
// Measure read latency for comparison.
// Populate first.
for i := 0; i < int(max/2); i++ {
vol.WriteLBA(uint64(i), data)
}
time.Sleep(200 * time.Millisecond) // let flusher drain
var readLatSum int64
for i := 0; i < costOps; i++ {
lba := uint64(mrand.Int63n(int64(max / 2)))
start := time.Now()
if _, err := vol.ReadLBA(lba, perfBlockSize); err != nil {
t.Fatalf("read op %d: %v", i, err)
}
readLatSum += time.Since(start).Nanoseconds()
}
avgReadLat := time.Duration(readLatSum / costOps)
// Cost statement.
t.Log("")
t.Log("=== P12P4 Cost Characterization (engine-local) ===")
t.Log("")
t.Logf("Average write latency: %s (includes WAL append + group commit sync)", avgWriteLat)
t.Logf("Average read latency: %s (dirtyMap lookup + WAL/extent read)", avgReadLat)
t.Log("")
t.Log("Bounded cost statement:")
t.Log(" WAL write amplification: 2x minimum (WAL write + eventual extent flush)")
t.Log(" Group commit: amortizes fdatasync across batched writers (1 sync per batch)")
t.Log(" Replication tax (production RF=2 sync_all): -56% vs RF=1 (barrier round-trip)")
t.Log(" Replication tax source: baseline-roce-20260401.md, measured on 25Gbps RoCE")
t.Log("")
t.Logf("Write/read latency ratio: %.1fx (write slower than read)",
float64(avgWriteLat)/float64(avgReadLat))
t.Log("")
t.Logf("Config: volume=%dMB WAL=%dMB block=%dB ops=%d",
perfVolumeSize/(1024*1024), perfWALSize/(1024*1024), perfBlockSize, costOps)
// Proof: cost values are finite and positive.
if avgWriteLat <= 0 || avgReadLat <= 0 {
t.Fatal("latency values must be positive")
}
if avgWriteLat < avgReadLat {
t.Log("Note: write faster than read in this run (possibly due to WAL cache hits)")
}
t.Log("P12P4 CostCharacterization: PASS — bounded cost statement produced")
}
// --- Test 3: RolloutGate_Bounded ---
func TestP12P4_RolloutGate_Bounded(t *testing.T) {
floorPath := "../../sw-block/.private/phase/phase-12-p4-floor.md"
gatesPath := "../../sw-block/.private/phase/phase-12-p4-rollout-gates.md"
baselinePath := "../../learn/projects/sw-block/test/results/baseline-roce-20260401.md"
blockerPath := "../../sw-block/.private/phase/phase-12-p3-blockers.md"
// --- Read all cited evidence sources ---
floorData, err := os.ReadFile(floorPath)
if err != nil {
t.Fatalf("floor doc must exist at %s: %v", floorPath, err)
}
floorContent := string(floorData)
gatesData, err := os.ReadFile(gatesPath)
if err != nil {
t.Fatalf("rollout-gates doc must exist at %s: %v", gatesPath, err)
}
gatesContent := string(gatesData)
baselineData, err := os.ReadFile(baselinePath)
if err != nil {
t.Fatalf("cited baseline must exist at %s: %v", baselinePath, err)
}
baselineContent := string(baselineData)
blockerData, err := os.ReadFile(blockerPath)
if err != nil {
t.Fatalf("cited blocker ledger must exist at %s: %v", blockerPath, err)
}
blockerContent := string(blockerData)
// --- Structural validation (shape) ---
// Floor doc: workload envelope, floor table, non-claims.
for _, required := range []string{"RF=2", "sync_all", "4K random write", "4K random read", "sequential write", "sequential read"} {
if !strings.Contains(floorContent, required) {
t.Fatalf("floor doc missing required content: %q", required)
}
}
if !strings.Contains(floorContent, "Floor") || !strings.Contains(floorContent, "IOPS") {
t.Fatal("floor doc must contain floor table with IOPS")
}
if !strings.Contains(floorContent, "does NOT") {
t.Fatal("floor doc must contain explicit non-claims")
}
// Gates doc: gates table, launch envelope, exclusions, non-claims.
if !strings.Contains(gatesContent, "Gate") || !strings.Contains(gatesContent, "Status") {
t.Fatal("rollout-gates doc must contain gates table")
}
if !strings.Contains(gatesContent, "Launch Envelope") {
t.Fatal("rollout-gates doc must contain launch envelope")
}
if !strings.Contains(gatesContent, "Exclusion") {
t.Fatal("rollout-gates doc must contain exclusions")
}
if !strings.Contains(gatesContent, "does NOT") {
t.Fatal("rollout-gates doc must contain explicit non-claims")
}
// Count gates — must be finite.
gateLines := 0
for _, line := range strings.Split(gatesContent, "\n") {
trimmed := strings.TrimSpace(line)
if strings.HasPrefix(trimmed, "| G") || strings.HasPrefix(trimmed, "| E") {
gateLines++
}
}
if gateLines == 0 {
t.Fatal("rollout-gates doc has no gate items")
}
if gateLines > 20 {
t.Fatalf("rollout-gates doc should be finite, got %d items", gateLines)
}
// --- Semantic cross-checks (evidence alignment) ---
// 1. G6 cites "28.4K write IOPS" — baseline must contain this number.
if strings.Contains(gatesContent, "28.4K write IOPS") || strings.Contains(gatesContent, "28,4") {
// The gates doc cites write IOPS from baseline. Verify the baseline has it.
if !strings.Contains(baselineContent, "28,") {
t.Fatal("G6 cites write IOPS but baseline does not contain matching value")
}
}
// More precise: baseline must contain the specific numbers cited in G6.
if !strings.Contains(baselineContent, "28,347") && !strings.Contains(baselineContent, "28,429") &&
!strings.Contains(baselineContent, "28,453") {
t.Fatal("baseline must contain RF=2 sync_all write IOPS data (28,3xx-28,4xx range)")
}
if !strings.Contains(baselineContent, "136,648") {
t.Fatal("baseline must contain RF=2 read IOPS data (136,648)")
}
// 2. G5 cites "-56% replication tax" — baseline must contain this.
if strings.Contains(gatesContent, "-56%") {
if !strings.Contains(baselineContent, "-56%") {
t.Fatal("G5 cites -56% replication tax but baseline does not contain -56%")
}
}
// 3. Launch envelope claims specific transport/network combos — verify against baseline.
// Claimed: NVMe-TCP @ 25Gbps RoCE
if strings.Contains(gatesContent, "NVMe-TCP @ 25Gbps RoCE") {
if !strings.Contains(baselineContent, "NVMe-TCP") || !strings.Contains(baselineContent, "RoCE") {
t.Fatal("launch envelope claims NVMe-TCP @ RoCE but baseline has no such data")
}
}
// Claimed: iSCSI @ 25Gbps RoCE
if strings.Contains(gatesContent, "iSCSI @ 25Gbps RoCE") {
if !strings.Contains(baselineContent, "iSCSI") || !strings.Contains(baselineContent, "RoCE") {
t.Fatal("launch envelope claims iSCSI @ RoCE but baseline has no such data")
}
}
// Claimed: iSCSI @ 1Gbps
if strings.Contains(gatesContent, "iSCSI @ 1Gbps") {
if !strings.Contains(baselineContent, "iSCSI") || !strings.Contains(baselineContent, "1Gbps") {
t.Fatal("launch envelope claims iSCSI @ 1Gbps but baseline has no such data")
}
}
// Exclusion: NVMe-TCP @ 1Gbps must NOT be claimed as supported.
if strings.Contains(gatesContent, "NOT included") {
// Verify baseline indeed lacks NVMe-TCP @ 1Gbps.
hasNvme1g := strings.Contains(baselineContent, "NVMe-TCP") && strings.Contains(baselineContent, "| NVMe-TCP | 1Gbps")
if hasNvme1g {
t.Fatal("baseline contains NVMe-TCP @ 1Gbps data but gates doc excludes it — resolve mismatch")
}
}
// 4. G7 cites blocker ledger counts — verify against actual ledger.
if strings.Contains(gatesContent, "3 diagnosed") {
diagCount := 0
for _, line := range strings.Split(blockerContent, "\n") {
if strings.HasPrefix(strings.TrimSpace(line), "| B") {
diagCount++
}
}
if diagCount != 3 {
t.Fatalf("G7 claims 3 diagnosed blockers but ledger has %d", diagCount)
}
}
if strings.Contains(gatesContent, "3 unresolved") {
unresCount := 0
for _, line := range strings.Split(blockerContent, "\n") {
if strings.HasPrefix(strings.TrimSpace(line), "| U") {
unresCount++
}
}
if unresCount != 3 {
t.Fatalf("G7 claims 3 unresolved blockers but ledger has %d", unresCount)
}
}
// 5. Floor doc gate thresholds must match code-defined gates.
for workload, gate := range perfFloorGates {
// The doc uses comma-formatted numbers (e.g., "1,000" or "5,000").
minInt := int(gate.MinIOPS)
// Check for both comma-formatted and plain forms.
found := false
for _, form := range []string{
fmt.Sprintf("%d", minInt), // "1000"
fmt.Sprintf("%d,%03d", minInt/1000, minInt%1000), // "1,000"
} {
if strings.Contains(floorContent, form) {
found = true
break
}
}
if !found {
t.Errorf("floor doc gate for %s should cite minimum %d IOPS but doesn't", workload, minInt)
}
}
t.Logf("P12P4 RolloutGate: floor doc %d bytes, gates doc %d bytes, %d gate items",
len(floorData), len(gatesData), gateLines)
t.Log("P12P4 RolloutGate: semantic cross-checks passed (baseline, blocker ledger, gate thresholds)")
t.Log("P12P4 RolloutGate: PASS — bounded launch envelope with verified evidence alignment")
}
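The comma-vs-plain number matching above can be lifted into a small standalone check. A minimal sketch, assuming doc text like the floor doc's tables; the `commaForms` and `docCites` helpers are illustrative, not part of the test file:

```go
package main

import (
	"fmt"
	"strings"
)

// commaForms returns the plain and comma-grouped renderings of n, so a doc
// cross-check can accept either "5000" or "5,000". Valid for n in [0, 999999].
func commaForms(n int) []string {
	forms := []string{fmt.Sprintf("%d", n)}
	if n >= 1000 {
		forms = append(forms, fmt.Sprintf("%d,%03d", n/1000, n%1000))
	}
	return forms
}

// docCites reports whether doc cites n in any accepted rendering.
func docCites(doc string, n int) bool {
	for _, f := range commaForms(n) {
		if strings.Contains(doc, f) {
			return true
		}
	}
	return false
}

func main() {
	doc := "G3: random-read floor is 5,000 IOPS; sequential floor is 800 IOPS"
	fmt.Println(docCites(doc, 5000)) // true: matches "5,000"
	fmt.Println(docCites(doc, 800))  // true: matches "800"
	fmt.Println(docCites(doc, 9000)) // false: neither "9000" nor "9,000" appears
}
```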
// --- Helpers ---
func init() {
// Seed the shared PRNG once per process: LBA patterns are then stable
// within a single test run, though they differ across runs (time-based seed).
mrand.Seed(time.Now().UnixNano())
}
// formatDuration formats a duration for table display.
func formatDuration(d time.Duration) string {
if d < time.Microsecond {
return fmt.Sprintf("%dns", d.Nanoseconds())
}
if d < time.Millisecond {
return fmt.Sprintf("%.1fus", float64(d.Nanoseconds())/1000.0)
}
return fmt.Sprintf("%.2fms", float64(d.Nanoseconds())/1e6)
}

308
weed/server/qa_block_soak_test.go

@ -0,0 +1,308 @@
package weed_server
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication"
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol/v2bridge"
)
// ============================================================
// Phase 12 P2: Soak / Long-Run Stability Hardening
//
// Proves: repeated chosen-path cycles return to bounded truth
// without hidden state drift or unbounded runtime artifacts.
//
// NOT diagnosability, NOT performance-floor, NOT rollout readiness.
// ============================================================
const soakCycles = 5
type soakSetup struct {
ms *MasterServer
bs *BlockService
store *storage.BlockVolumeStore
dir string
}
func newSoakSetup(t *testing.T) *soakSetup {
t.Helper()
dir := t.TempDir()
store := storage.NewBlockVolumeStore()
ms := &MasterServer{
blockRegistry: NewBlockVolumeRegistry(),
blockAssignmentQueue: NewBlockAssignmentQueue(),
blockFailover: newBlockFailoverState(),
}
ms.blockRegistry.MarkBlockCapable("vs1:9333")
ms.blockRegistry.MarkBlockCapable("vs2:9333")
ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) {
sanitized := strings.ReplaceAll(server, ":", "_")
serverDir := filepath.Join(dir, sanitized)
if err := os.MkdirAll(serverDir, 0755); err != nil {
return nil, err
}
volPath := filepath.Join(serverDir, fmt.Sprintf("%s.blk", name))
vol, err := blockvol.CreateBlockVol(volPath, blockvol.CreateOptions{
VolumeSize: 1 * 1024 * 1024,
BlockSize: 4096,
WALSize: 256 * 1024,
})
if err != nil {
return nil, err
}
vol.Close()
if _, err := store.AddBlockVolume(volPath, ""); err != nil {
return nil, err
}
host := server
if idx := strings.LastIndex(server, ":"); idx >= 0 {
host = server[:idx]
}
return &blockAllocResult{
Path: volPath,
IQN: fmt.Sprintf("iqn.2024.test:%s", name),
ISCSIAddr: host + ":3260",
ReplicaDataAddr: server + ":14260",
ReplicaCtrlAddr: server + ":14261",
RebuildListenAddr: server + ":15000",
}, nil
}
ms.blockVSDelete = func(ctx context.Context, server string, name string) error { return nil }
bs := &BlockService{
blockStore: store,
blockDir: filepath.Join(dir, "vs1_9333"),
listenAddr: "127.0.0.1:3260",
localServerID: "vs1:9333",
v2Bridge: v2bridge.NewControlBridge(),
v2Orchestrator: engine.NewRecoveryOrchestrator(),
replStates: make(map[string]*volReplState),
}
bs.v2Recovery = NewRecoveryManager(bs)
t.Cleanup(func() {
bs.v2Recovery.Shutdown()
store.Close()
})
return &soakSetup{ms: ms, bs: bs, store: store, dir: dir}
}
func (s *soakSetup) deliver(server string) int {
pending := s.ms.blockAssignmentQueue.Peek(server)
if len(pending) == 0 {
return 0
}
protoAssignments := blockvol.AssignmentsToProto(pending)
goAssignments := blockvol.AssignmentsFromProto(protoAssignments)
s.bs.ProcessAssignments(goAssignments)
return len(goAssignments)
}
// --- Repeated create/failover/recover cycles with end-of-cycle truth checks ---
func TestP12P2_RepeatedCycles_NoDrift(t *testing.T) {
s := newSoakSetup(t)
ctx := context.Background()
for cycle := 1; cycle <= soakCycles; cycle++ {
volName := fmt.Sprintf("soak-vol-%d", cycle)
// Step 1: Create.
createResp, err := s.ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
Name: volName, SizeBytes: 1 << 20,
})
if err != nil {
t.Fatalf("cycle %d create: %v", cycle, err)
}
primaryVS := createResp.VolumeServer
// Deliver initial assignment.
s.bs.localServerID = primaryVS
s.deliver(primaryVS)
time.Sleep(100 * time.Millisecond)
entry, ok := s.ms.blockRegistry.Lookup(volName)
if !ok {
t.Fatalf("cycle %d: volume not in registry", cycle)
}
// Step 2: Failover.
s.ms.blockRegistry.UpdateEntry(volName, func(e *BlockVolumeEntry) {
e.LastLeaseGrant = time.Now().Add(-1 * time.Minute)
})
s.ms.failoverBlockVolumes(primaryVS)
entryAfter, ok := s.ms.blockRegistry.Lookup(volName)
if !ok {
t.Fatalf("cycle %d: volume missing from registry after failover", cycle)
}
if entryAfter.Epoch <= entry.Epoch {
t.Fatalf("cycle %d: epoch did not increase: %d <= %d", cycle, entryAfter.Epoch, entry.Epoch)
}
// Deliver failover assignment.
newPrimary := entryAfter.VolumeServer
s.bs.localServerID = newPrimary
s.deliver(newPrimary)
time.Sleep(100 * time.Millisecond)
// Step 3: Reconnect.
s.ms.recoverBlockVolumes(primaryVS)
s.bs.localServerID = primaryVS
s.deliver(primaryVS)
s.bs.localServerID = newPrimary
s.deliver(newPrimary)
time.Sleep(100 * time.Millisecond)
// === End-of-cycle truth checks ===
// Registry: volume exists, epoch monotonic.
finalEntry, ok := s.ms.blockRegistry.Lookup(volName)
if !ok {
t.Fatalf("cycle %d: volume missing from registry at end", cycle)
}
if finalEntry.Epoch < entryAfter.Epoch {
t.Fatalf("cycle %d: registry epoch regressed: %d < %d", cycle, finalEntry.Epoch, entryAfter.Epoch)
}
// VS-visible: promoted vol epoch matches.
var volEpoch uint64
if err := s.store.WithVolume(entryAfter.Path, func(vol *blockvol.BlockVol) error {
volEpoch = vol.Epoch()
return nil
}); err != nil {
t.Fatalf("cycle %d: promoted vol access failed: %v", cycle, err)
}
if volEpoch < entryAfter.Epoch {
t.Fatalf("cycle %d: vol epoch=%d < registry=%d", cycle, volEpoch, entryAfter.Epoch)
}
// Publication: lookup matches registry truth (not just non-empty).
lookupResp, err := s.ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: volName})
if err != nil {
t.Fatalf("cycle %d: lookup failed: %v", cycle, err)
}
if lookupResp.IscsiAddr != finalEntry.ISCSIAddr {
t.Fatalf("cycle %d: lookup iSCSI=%q != registry=%q", cycle, lookupResp.IscsiAddr, finalEntry.ISCSIAddr)
}
if lookupResp.VolumeServer != finalEntry.VolumeServer {
t.Fatalf("cycle %d: lookup VS=%q != registry=%q", cycle, lookupResp.VolumeServer, finalEntry.VolumeServer)
}
t.Logf("cycle %d: registry=%d vol=%d lookup=registry ✓",
cycle, finalEntry.Epoch, volEpoch)
}
t.Logf("P12P2 repeated cycles: %d cycles, all end-of-cycle truth checks passed", soakCycles)
}
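The per-cycle epoch invariant reduces to a monotonicity check over the observed sequence of reads. A standalone sketch; the `epochMonotonic` helper is hypothetical, not part of the test file:

```go
package main

import "fmt"

// epochMonotonic reports whether a sequence of observed epochs never
// regresses: plateaus are fine (repeated reads of the same epoch), but any
// decrease means hidden state drift.
func epochMonotonic(epochs []uint64) bool {
	for i := 1; i < len(epochs); i++ {
		if epochs[i] < epochs[i-1] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(epochMonotonic([]uint64{1, 2, 2, 3})) // true: plateaus allowed
	fmt.Println(epochMonotonic([]uint64{1, 3, 2}))    // false: regression
}
```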
// --- Runtime state hygiene: no unbounded leftovers after cycles ---
func TestP12P2_RuntimeHygiene_NoLeftovers(t *testing.T) {
s := newSoakSetup(t)
ctx := context.Background()
// Create and delete several volumes to exercise lifecycle.
for i := 1; i <= soakCycles; i++ {
name := fmt.Sprintf("hygiene-vol-%d", i)
if _, err := s.ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
Name: name, SizeBytes: 1 << 20,
}); err != nil {
t.Fatalf("create %s: %v", name, err)
}
entry, ok := s.ms.blockRegistry.Lookup(name)
if !ok {
t.Fatalf("volume %s missing from registry after create", name)
}
s.bs.localServerID = entry.VolumeServer
s.deliver(entry.VolumeServer)
time.Sleep(50 * time.Millisecond)
}
// Delete all volumes.
for i := 1; i <= soakCycles; i++ {
name := fmt.Sprintf("hygiene-vol-%d", i)
if _, err := s.ms.DeleteBlockVolume(ctx, &master_pb.DeleteBlockVolumeRequest{Name: name}); err != nil {
t.Fatalf("delete %s: %v", name, err)
}
}
time.Sleep(200 * time.Millisecond)
// Check: no stale recovery tasks.
activeTasks := s.bs.v2Recovery.ActiveTaskCount()
if activeTasks > 0 {
t.Fatalf("stale recovery tasks: %d (expected 0 after all volumes deleted)", activeTasks)
}
// Check: registry should have no entries for deleted volumes.
for i := 1; i <= soakCycles; i++ {
name := fmt.Sprintf("hygiene-vol-%d", i)
if _, ok := s.ms.blockRegistry.Lookup(name); ok {
t.Fatalf("stale registry entry: %s (should be deleted)", name)
}
}
// Check: assignment queue should not have unbounded stale entries.
for _, server := range []string{"vs1:9333", "vs2:9333"} {
pending := s.ms.blockAssignmentQueue.Peek(server)
// Some pending entries may exist (lease grants, etc.), but the backlog must stay bounded.
if len(pending) > soakCycles*2 {
t.Fatalf("unbounded stale assignments for %s: %d", server, len(pending))
}
}
t.Logf("P12P2 hygiene: %d volumes created+deleted, 0 stale tasks, 0 stale registry, bounded queue", soakCycles)
}
// --- Steady-state repeated delivery: idempotence holds over many cycles ---
func TestP12P2_SteadyState_IdempotenceHolds(t *testing.T) {
s := newSoakSetup(t)
ctx := context.Background()
if _, err := s.ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
Name: "steady-vol-1", SizeBytes: 1 << 20,
}); err != nil {
t.Fatalf("create: %v", err)
}
entry, ok := s.ms.blockRegistry.Lookup("steady-vol-1")
if !ok {
t.Fatal("steady-vol-1 missing from registry after create")
}
s.bs.localServerID = entry.VolumeServer
s.deliver(entry.VolumeServer)
time.Sleep(200 * time.Millisecond)
if len(entry.Replicas) == 0 {
t.Fatal("expected at least one replica after delivery")
}
replicaID := entry.Path + "/" + entry.Replicas[0].Server
eventsAfterFirst := len(s.bs.v2Orchestrator.Log.EventsFor(replicaID))
if eventsAfterFirst == 0 {
t.Fatal("first delivery must create events")
}
// Deliver the same assignment many times.
for i := 0; i < soakCycles*2; i++ {
s.deliver(entry.VolumeServer)
time.Sleep(20 * time.Millisecond)
}
eventsAfterSoak := len(s.bs.v2Orchestrator.Log.EventsFor(replicaID))
if eventsAfterSoak != eventsAfterFirst {
t.Fatalf("idempotence drift: events %d → %d after %d repeated deliveries",
eventsAfterFirst, eventsAfterSoak, soakCycles*2)
}
// Verify: registry, vol, and lookup are still coherent.
finalEntry, ok := s.ms.blockRegistry.Lookup("steady-vol-1")
if !ok {
t.Fatal("steady-vol-1 missing from registry after soak")
}
if finalEntry.Epoch != entry.Epoch {
t.Fatalf("epoch drifted: %d → %d", entry.Epoch, finalEntry.Epoch)
}
lookupResp, err := s.ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "steady-vol-1"})
if err != nil {
t.Fatalf("lookup after soak: %v", err)
}
if lookupResp.IscsiAddr != finalEntry.ISCSIAddr {
t.Fatalf("lookup iSCSI=%q != registry=%q after soak", lookupResp.IscsiAddr, finalEntry.ISCSIAddr)
}
if lookupResp.VolumeServer != finalEntry.VolumeServer {
t.Fatalf("lookup VS=%q != registry=%q after soak", lookupResp.VolumeServer, finalEntry.VolumeServer)
}
t.Logf("P12P2 steady state: %d repeated deliveries, events stable at %d, lookup=registry ✓",
soakCycles*2, eventsAfterFirst)
}
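The idempotence property asserted above reduces to a toy model: delivering the same assignment twice must not append new events. A minimal sketch with illustrative names (`orchestrator`, `deliver`), not the engine's actual API:

```go
package main

import "fmt"

// orchestrator is a toy event log keyed by replica; duplicate deliveries of
// the same (replica, assignment) pair are absorbed without appending events.
type orchestrator struct {
	seen   map[string]bool
	events map[string][]string
}

func newOrchestrator() *orchestrator {
	return &orchestrator{seen: map[string]bool{}, events: map[string][]string{}}
}

func (o *orchestrator) deliver(replicaID, assignment string) {
	key := replicaID + "|" + assignment
	if o.seen[key] {
		return // duplicate delivery: no new events
	}
	o.seen[key] = true
	o.events[replicaID] = append(o.events[replicaID], "started:"+assignment)
}

func main() {
	o := newOrchestrator()
	o.deliver("vol-1/vs2", "rebuild@epoch3")
	first := len(o.events["vol-1/vs2"])
	for i := 0; i < 10; i++ { // repeated deliveries, as in the soak loop
		o.deliver("vol-1/vs2", "rebuild@epoch3")
	}
	fmt.Println(first, len(o.events["vol-1/vs2"])) // 1 1: event count is stable
}
```

A genuinely new assignment (for example a higher epoch) would produce a new key and append exactly one event, matching the "events stable" assertion the soak test makes against the real orchestrator log.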