diff --git a/weed/storage/blockvol/qa_wal_admission_test.go b/weed/storage/blockvol/qa_wal_admission_test.go index b29487944..45f5cf8ac 100644 --- a/weed/storage/blockvol/qa_wal_admission_test.go +++ b/weed/storage/blockvol/qa_wal_admission_test.go @@ -460,3 +460,261 @@ func TestQA_Admission_WriteLBAIntegration(t *testing.T) { } t.Logf("all %d writes succeeded with maxConcurrent=4", writers*writesPerWriter) }

// =============================================================================
// QA Adversarial Tests for WAL Admission Metrics (Item 2: WAL Visibility Hooks)
//
// These tests verify counter correctness under concurrent pressure, metrics
// consistency across all code paths, and integration with EngineMetrics.
// =============================================================================

// TestQA_Admission_Metrics_ConcurrentCountersConsistent verifies that under
// heavy concurrent load, Total >= Soft + Hard and Timeout <= Hard.
func TestQA_Admission_Metrics_ConcurrentCountersConsistent(t *testing.T) {
	m := NewEngineMetrics()
	var pressure atomic.Int64
	pressure.Store(50)

	a := NewWALAdmission(WALAdmissionConfig{
		MaxConcurrent: 8,
		SoftWatermark: 0.7,
		HardWatermark: 0.9,
		WALUsedFn:     func() float64 { return float64(pressure.Load()) / 100.0 },
		NotifyFn:      func() {},
		ClosedFn:      func() bool { return false },
		Metrics:       m,
	})

	// Oscillate pressure so writers hit all code paths.
	stopOsc := make(chan struct{})
	go func() {
		zones := []int64{30, 80, 95, 50, 75, 92, 40, 85}
		i := 0
		for {
			select {
			case <-stopOsc:
				return
			default:
				pressure.Store(zones[i%len(zones)])
				i++
				time.Sleep(time.Millisecond)
			}
		}
	}()

	var wg sync.WaitGroup
	const writers = 16
	const iters = 30

	wg.Add(writers)
	for i := 0; i < writers; i++ {
		go func() {
			defer wg.Done()
			for j := 0; j < iters; j++ {
				err := a.Acquire(50 * time.Millisecond)
				if err == nil {
					time.Sleep(time.Duration(rand.Intn(50)) * time.Microsecond)
					a.Release()
				}
			}
		}()
	}
	wg.Wait()
	close(stopOsc)

	total := m.WALAdmitTotal.Load()
	soft := m.WALAdmitSoftTotal.Load()
	hard := m.WALAdmitHardTotal.Load()
	timeout := m.WALAdmitTimeoutTotal.Load()
	waitCount, waitSumNs := m.WALAdmitWaitSnapshot()

	// Every Acquire call records exactly one total count.
	if total != writers*iters {
		t.Fatalf("WALAdmitTotal = %d, want %d", total, writers*iters)
	}

	// soft and hard are mutually exclusive per call.
	// But a call hitting hard also doesn't count as soft.
	// FIX: the doc contract above promises Total >= Soft + Hard, but the
	// original test never asserted it. Since soft and hard are mutually
	// exclusive per call, their sum can never exceed the total admit count.
	if soft+hard > total {
		t.Fatalf("Soft (%d) + Hard (%d) > Total (%d): soft and hard must not exceed total", soft, hard, total)
	}

	// Timeout is a subset of hard path calls.
	if timeout > hard {
		t.Fatalf("Timeout (%d) > Hard (%d): timeouts only happen in hard path", timeout, hard)
	}

	// Wait histogram count must equal total.
	if waitCount != total {
		t.Fatalf("WALAdmitWait count = %d, want %d (same as total)", waitCount, total)
	}

	// Sum of wait times must be non-negative.
	if waitSumNs < 0 {
		t.Fatalf("WALAdmitWait sum = %d, must be >= 0", waitSumNs)
	}

	t.Logf("total=%d soft=%d hard=%d timeout=%d waitCount=%d waitSumNs=%d",
		total, soft, hard, timeout, waitCount, waitSumNs)
}

// TestQA_Admission_Metrics_SemaphoreWaitPathRecords verifies that metrics are
// recorded even when the semaphore-wait path (not hard watermark) is the
// bottleneck. The semaphore-wait code has a separate recordAdmit call.
+func TestQA_Admission_Metrics_SemaphoreWaitPathRecords(t *testing.T) { + m := NewEngineMetrics() + a := NewWALAdmission(WALAdmissionConfig{ + MaxConcurrent: 1, + SoftWatermark: 0.7, + HardWatermark: 0.9, + WALUsedFn: func() float64 { return 0.0 }, // no pressure + NotifyFn: func() {}, + ClosedFn: func() bool { return false }, + Metrics: m, + }) + + // Fill semaphore. + a.sem <- struct{}{} + + // Release after 10ms. + go func() { + time.Sleep(10 * time.Millisecond) + <-a.sem + }() + + if err := a.Acquire(500 * time.Millisecond); err != nil { + t.Fatalf("Acquire: %v", err) + } + a.Release() + + if m.WALAdmitTotal.Load() != 1 { + t.Fatalf("WALAdmitTotal = %d, want 1", m.WALAdmitTotal.Load()) + } + // No pressure → no soft/hard flags. + if m.WALAdmitSoftTotal.Load() != 0 { + t.Fatalf("WALAdmitSoftTotal = %d, want 0", m.WALAdmitSoftTotal.Load()) + } + if m.WALAdmitHardTotal.Load() != 0 { + t.Fatalf("WALAdmitHardTotal = %d, want 0", m.WALAdmitHardTotal.Load()) + } + + // Wait time should be >10ms (semaphore wait). + _, waitSum := m.WALAdmitWaitSnapshot() + if waitSum < int64(5*time.Millisecond) { + t.Fatalf("WALAdmitWait sum = %dns, expected >= 5ms from semaphore wait", waitSum) + } +} + +// TestQA_Admission_Metrics_SemaphoreTimeoutRecords verifies that a timeout +// during semaphore wait (not hard watermark) still records timedOut=true. +func TestQA_Admission_Metrics_SemaphoreTimeoutRecords(t *testing.T) { + m := NewEngineMetrics() + a := NewWALAdmission(WALAdmissionConfig{ + MaxConcurrent: 1, + SoftWatermark: 0.7, + HardWatermark: 0.9, + WALUsedFn: func() float64 { return 0.0 }, // no pressure + NotifyFn: func() {}, + ClosedFn: func() bool { return false }, + Metrics: m, + }) + + // Fill semaphore permanently. 
+ a.sem <- struct{}{} + + err := a.Acquire(10 * time.Millisecond) + if !errors.Is(err, ErrWALFull) { + t.Fatalf("expected ErrWALFull, got %v", err) + } + + if m.WALAdmitTotal.Load() != 1 { + t.Fatalf("WALAdmitTotal = %d, want 1", m.WALAdmitTotal.Load()) + } + if m.WALAdmitTimeoutTotal.Load() != 1 { + t.Fatalf("WALAdmitTimeoutTotal = %d, want 1 (semaphore timeout)", m.WALAdmitTimeoutTotal.Load()) + } + // Hard should NOT be set — timeout was in semaphore path, not hard pressure path. + if m.WALAdmitHardTotal.Load() != 0 { + t.Fatalf("WALAdmitHardTotal = %d, want 0", m.WALAdmitHardTotal.Load()) + } + + <-a.sem // cleanup +} + +// TestQA_Admission_Metrics_CloseDuringSemaphoreRecords verifies that volume +// close during semaphore wait records metrics correctly (no timeout flag). +func TestQA_Admission_Metrics_CloseDuringSemaphoreRecords(t *testing.T) { + m := NewEngineMetrics() + var closed atomic.Bool + + a := NewWALAdmission(WALAdmissionConfig{ + MaxConcurrent: 1, + SoftWatermark: 0.7, + HardWatermark: 0.9, + WALUsedFn: func() float64 { return 0.0 }, + NotifyFn: func() {}, + ClosedFn: closed.Load, + Metrics: m, + }) + + a.sem <- struct{}{} + + go func() { + time.Sleep(10 * time.Millisecond) + closed.Store(true) + }() + + err := a.Acquire(2 * time.Second) + if !errors.Is(err, ErrVolumeClosed) { + t.Fatalf("expected ErrVolumeClosed, got %v", err) + } + + if m.WALAdmitTotal.Load() != 1 { + t.Fatalf("WALAdmitTotal = %d, want 1", m.WALAdmitTotal.Load()) + } + // Close is NOT a timeout. + if m.WALAdmitTimeoutTotal.Load() != 0 { + t.Fatalf("WALAdmitTimeoutTotal = %d, want 0 (close, not timeout)", m.WALAdmitTimeoutTotal.Load()) + } + + <-a.sem +} + +// TestQA_Admission_Metrics_Integration_WriteLBA verifies that real WriteLBA +// calls produce correct admission metrics in the engine. 
+func TestQA_Admission_Metrics_Integration_WriteLBA(t *testing.T) { + dir := t.TempDir() + cfg := DefaultConfig() + cfg.WALMaxConcurrentWrites = 4 + cfg.FlushInterval = 5 * time.Millisecond + cfg.WALFullTimeout = 2 * time.Second + + vol, err := CreateBlockVol(dir+"/test.blk", CreateOptions{ + VolumeSize: 256 * 1024, + BlockSize: 4096, + WALSize: 128 * 1024, + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer vol.Close() + + // Write 20 blocks. + data := make([]byte, 4096) + for i := 0; i < 20; i++ { + data[0] = byte(i) + if err := vol.WriteLBA(uint64(i%64), data); err != nil { + t.Fatalf("WriteLBA %d: %v", i, err) + } + } + + // Engine metrics should show 20 admits with 0 timeouts. + m := vol.Metrics + if m == nil { + t.Skip("volume has no metrics (nil)") + } + + total := m.WALAdmitTotal.Load() + if total != 20 { + t.Fatalf("WALAdmitTotal = %d, want 20", total) + } + if m.WALAdmitTimeoutTotal.Load() != 0 { + t.Fatalf("WALAdmitTimeoutTotal = %d, want 0", m.WALAdmitTimeoutTotal.Load()) + } +}