Browse Source

feat: WAL admission metrics for visibility into write pressure behavior

Add counters (total, soft, hard, timeout) and wait-time histogram to
WALAdmission, wired through EngineMetrics and exported as Prometheus
metrics. Six new tests verify all code paths. Nil-safe for backwards
compatibility.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feature/sw-block
Ping Qiu 4 days ago
parent
commit
e22e57a3f7
  1. 2
      weed/storage/blockvol/blockvol.go
  2. 27
      weed/storage/blockvol/engine_metrics.go
  3. 22
      weed/storage/blockvol/iscsi/cmd/iscsi-target/metrics.go
  4. 21
      weed/storage/blockvol/wal_admission.go
  5. 179
      weed/storage/blockvol/wal_admission_test.go

2
weed/storage/blockvol/blockvol.go

@ -175,6 +175,7 @@ func CreateBlockVol(path string, opts CreateOptions, cfgs ...BlockVolConfig) (*B
WALUsedFn: wal.UsedFraction,
NotifyFn: v.flusher.NotifyUrgent,
ClosedFn: v.closed.Load,
Metrics: v.Metrics,
})
return v, nil
}
@ -288,6 +289,7 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) {
WALUsedFn: wal.UsedFraction,
NotifyFn: v.flusher.NotifyUrgent,
ClosedFn: v.closed.Load,
Metrics: v.Metrics,
})
return v, nil

27
weed/storage/blockvol/engine_metrics.go

@ -30,6 +30,13 @@ type EngineMetrics struct {
ScrubErrorsTotal atomic.Uint64
scrubDurationNs atomicHistogram
// WAL Admission
WALAdmitTotal atomic.Uint64 // total Acquire calls
WALAdmitSoftTotal atomic.Uint64 // soft watermark throttles
WALAdmitHardTotal atomic.Uint64 // hard watermark blocks
WALAdmitTimeoutTotal atomic.Uint64 // ErrWALFull timeouts
walAdmitWaitNs atomicHistogram // wait time in Acquire
// Durability (CP8-3-1)
DurabilityBarrierFailedTotal atomic.Uint64 // sync_all barrier failures
DurabilityQuorumLostTotal atomic.Uint64 // sync_quorum quorum lost
@ -73,6 +80,26 @@ func (m *EngineMetrics) RecordWALBarrier(dur time.Duration, failed bool) {
}
}
// RecordWALAdmit records a WAL admission Acquire call.
func (m *EngineMetrics) RecordWALAdmit(waitDur time.Duration, soft, hard, timedOut bool) {
m.WALAdmitTotal.Add(1)
m.walAdmitWaitNs.record(waitDur.Nanoseconds())
if soft {
m.WALAdmitSoftTotal.Add(1)
}
if hard {
m.WALAdmitHardTotal.Add(1)
}
if timedOut {
m.WALAdmitTimeoutTotal.Add(1)
}
}
// WALAdmitWaitSnapshot returns WAL admission wait stats.
func (m *EngineMetrics) WALAdmitWaitSnapshot() (count uint64, sumNs int64) {
return m.walAdmitWaitNs.snapshot()
}
// RecordScrubPass records a completed scrub pass.
func (m *EngineMetrics) RecordScrubPass(dur time.Duration, errors int64) {
m.ScrubPassesTotal.Add(1)

22
weed/storage/blockvol/iscsi/cmd/iscsi-target/metrics.go

@ -138,6 +138,28 @@ func newMetricsAdapter(inner iscsi.BlockDevice, vol *blockvol.BlockVol, reg prom
Name: "replica_failed_barriers_total", Help: "Total failed barrier requests",
}, func() float64 { return float64(em.WALFailedBarriersTotal.Load()) }))
// WAL Admission
reg.MustRegister(prometheus.NewCounterFunc(prometheus.CounterOpts{
Namespace: "seaweedfs", Subsystem: "blockvol",
Name: "wal_admit_total", Help: "Total WAL admission Acquire calls",
}, func() float64 { return float64(em.WALAdmitTotal.Load()) }))
reg.MustRegister(prometheus.NewCounterFunc(prometheus.CounterOpts{
Namespace: "seaweedfs", Subsystem: "blockvol",
Name: "wal_admit_soft_total", Help: "Soft watermark throttle events",
}, func() float64 { return float64(em.WALAdmitSoftTotal.Load()) }))
reg.MustRegister(prometheus.NewCounterFunc(prometheus.CounterOpts{
Namespace: "seaweedfs", Subsystem: "blockvol",
Name: "wal_admit_hard_total", Help: "Hard watermark block events",
}, func() float64 { return float64(em.WALAdmitHardTotal.Load()) }))
reg.MustRegister(prometheus.NewCounterFunc(prometheus.CounterOpts{
Namespace: "seaweedfs", Subsystem: "blockvol",
Name: "wal_admit_timeout_total", Help: "WAL admission timeouts (ErrWALFull)",
}, func() float64 { return float64(em.WALAdmitTimeoutTotal.Load()) }))
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "seaweedfs", Subsystem: "blockvol",
Name: "wal_admit_wait_seconds_total", Help: "Total time spent waiting in WAL admission (seconds)",
}, func() float64 { _, s := em.WALAdmitWaitSnapshot(); return float64(s) / 1e9 }))
// Scrub
reg.MustRegister(prometheus.NewCounterFunc(prometheus.CounterOpts{
Namespace: "seaweedfs", Subsystem: "blockvol",

21
weed/storage/blockvol/wal_admission.go

@ -28,6 +28,8 @@ type WALAdmission struct {
// sleepFn is the sleep function. Replaced in tests for determinism.
sleepFn func(time.Duration)
metrics *EngineMetrics // optional; if nil, no metrics recorded
}
// WALAdmissionConfig holds parameters for WALAdmission construction.
@ -38,6 +40,7 @@ type WALAdmissionConfig struct {
WALUsedFn func() float64 // returns WAL used fraction
NotifyFn func() // wake flusher on pressure
ClosedFn func() bool // check if volume is closed
Metrics *EngineMetrics // optional; if nil, no metrics recorded
}
// NewWALAdmission creates a WAL admission controller.
@ -50,6 +53,7 @@ func NewWALAdmission(cfg WALAdmissionConfig) *WALAdmission {
hardMark: cfg.HardWatermark,
closedFn: cfg.ClosedFn,
sleepFn: time.Sleep,
metrics: cfg.Metrics,
}
}
@ -57,6 +61,9 @@ func NewWALAdmission(cfg WALAdmissionConfig) *WALAdmission {
// The timeout covers both the watermark wait and semaphore acquisition.
// Returns ErrWALFull on timeout, ErrVolumeClosed if the volume closes.
func (a *WALAdmission) Acquire(timeout time.Duration) error {
start := time.Now()
var hitSoft, hitHard bool
deadline := time.NewTimer(timeout)
defer deadline.Stop()
@ -64,14 +71,17 @@ func (a *WALAdmission) Acquire(timeout time.Duration) error {
// Hard watermark gate: wait for flusher to drain before competing for semaphore.
if pressure >= a.hardMark {
hitHard = true
a.notifyFn()
for a.walUsed() >= a.hardMark {
if a.closedFn() {
a.recordAdmit(start, hitSoft, hitHard, false)
return ErrVolumeClosed
}
a.notifyFn()
select {
case <-deadline.C:
a.recordAdmit(start, hitSoft, hitHard, true)
return ErrWALFull
default:
}
@ -80,6 +90,7 @@ func (a *WALAdmission) Acquire(timeout time.Duration) error {
// Pressure dropped — fall through to semaphore acquisition.
} else if pressure >= a.softMark {
// Soft watermark: small delay to desynchronize herd.
hitSoft = true
a.notifyFn()
scale := (pressure - a.softMark) / (a.hardMark - a.softMark)
if scale > 1 {
@ -95,6 +106,7 @@ func (a *WALAdmission) Acquire(timeout time.Duration) error {
// Acquire semaphore slot using the same deadline.
select {
case a.sem <- struct{}{}:
a.recordAdmit(start, hitSoft, hitHard, false)
return nil
default:
}
@ -104,17 +116,26 @@ func (a *WALAdmission) Acquire(timeout time.Duration) error {
for {
select {
case a.sem <- struct{}{}:
a.recordAdmit(start, hitSoft, hitHard, false)
return nil
case <-deadline.C:
a.recordAdmit(start, hitSoft, hitHard, true)
return ErrWALFull
case <-closeTick.C:
if a.closedFn() {
a.recordAdmit(start, hitSoft, hitHard, false)
return ErrVolumeClosed
}
}
}
}
func (a *WALAdmission) recordAdmit(start time.Time, soft, hard, timedOut bool) {
if a.metrics != nil {
a.metrics.RecordWALAdmit(time.Since(start), soft, hard, timedOut)
}
}
// Release returns a write slot to the semaphore.
func (a *WALAdmission) Release() {
<-a.sem

179
weed/storage/blockvol/wal_admission_test.go

@ -352,3 +352,182 @@ func TestWALAdmission_CloseDuringSemaphoreWait(t *testing.T) {
// Drain.
<-a.sem
}
// --- WAL Admission Metrics Tests (Item 2: WAL visibility hooks) ---
func TestWALAdmission_Metrics_NoPressure(t *testing.T) {
m := NewEngineMetrics()
a := NewWALAdmission(WALAdmissionConfig{
MaxConcurrent: 16,
SoftWatermark: 0.7,
HardWatermark: 0.9,
WALUsedFn: func() float64 { return 0.0 },
NotifyFn: func() {},
ClosedFn: func() bool { return false },
Metrics: m,
})
if err := a.Acquire(100 * time.Millisecond); err != nil {
t.Fatalf("Acquire: %v", err)
}
a.Release()
if m.WALAdmitTotal.Load() != 1 {
t.Errorf("WALAdmitTotal = %d, want 1", m.WALAdmitTotal.Load())
}
if m.WALAdmitSoftTotal.Load() != 0 {
t.Errorf("WALAdmitSoftTotal = %d, want 0", m.WALAdmitSoftTotal.Load())
}
if m.WALAdmitHardTotal.Load() != 0 {
t.Errorf("WALAdmitHardTotal = %d, want 0", m.WALAdmitHardTotal.Load())
}
if m.WALAdmitTimeoutTotal.Load() != 0 {
t.Errorf("WALAdmitTimeoutTotal = %d, want 0", m.WALAdmitTimeoutTotal.Load())
}
}
func TestWALAdmission_Metrics_SoftWatermark(t *testing.T) {
m := NewEngineMetrics()
a := NewWALAdmission(WALAdmissionConfig{
MaxConcurrent: 16,
SoftWatermark: 0.7,
HardWatermark: 0.9,
WALUsedFn: func() float64 { return 0.8 },
NotifyFn: func() {},
ClosedFn: func() bool { return false },
Metrics: m,
})
a.sleepFn = func(d time.Duration) {} // no-op
if err := a.Acquire(100 * time.Millisecond); err != nil {
t.Fatalf("Acquire: %v", err)
}
a.Release()
if m.WALAdmitTotal.Load() != 1 {
t.Errorf("WALAdmitTotal = %d, want 1", m.WALAdmitTotal.Load())
}
if m.WALAdmitSoftTotal.Load() != 1 {
t.Errorf("WALAdmitSoftTotal = %d, want 1", m.WALAdmitSoftTotal.Load())
}
if m.WALAdmitHardTotal.Load() != 0 {
t.Errorf("WALAdmitHardTotal = %d, want 0", m.WALAdmitHardTotal.Load())
}
}
func TestWALAdmission_Metrics_HardWatermark(t *testing.T) {
m := NewEngineMetrics()
var pressure atomic.Int64
pressure.Store(95)
var sleepCount atomic.Int64
a := NewWALAdmission(WALAdmissionConfig{
MaxConcurrent: 16,
SoftWatermark: 0.7,
HardWatermark: 0.9,
WALUsedFn: func() float64 { return float64(pressure.Load()) / 100.0 },
NotifyFn: func() {},
ClosedFn: func() bool { return false },
Metrics: m,
})
a.sleepFn = func(d time.Duration) {
if sleepCount.Add(1) >= 3 {
pressure.Store(50) // drain
}
}
if err := a.Acquire(1 * time.Second); err != nil {
t.Fatalf("Acquire: %v", err)
}
a.Release()
if m.WALAdmitTotal.Load() != 1 {
t.Errorf("WALAdmitTotal = %d, want 1", m.WALAdmitTotal.Load())
}
if m.WALAdmitHardTotal.Load() != 1 {
t.Errorf("WALAdmitHardTotal = %d, want 1", m.WALAdmitHardTotal.Load())
}
// Wait snapshot should have recorded exactly 1 observation.
count, _ := m.WALAdmitWaitSnapshot()
if count != 1 {
t.Errorf("WALAdmitWait count = %d, want 1", count)
}
}
func TestWALAdmission_Metrics_Timeout(t *testing.T) {
m := NewEngineMetrics()
a := NewWALAdmission(WALAdmissionConfig{
MaxConcurrent: 16,
SoftWatermark: 0.7,
HardWatermark: 0.9,
WALUsedFn: func() float64 { return 0.95 }, // always above hard
NotifyFn: func() {},
ClosedFn: func() bool { return false },
Metrics: m,
})
a.sleepFn = func(d time.Duration) {}
err := a.Acquire(10 * time.Millisecond)
if !errors.Is(err, ErrWALFull) {
t.Fatalf("expected ErrWALFull, got %v", err)
}
if m.WALAdmitTotal.Load() != 1 {
t.Errorf("WALAdmitTotal = %d, want 1", m.WALAdmitTotal.Load())
}
if m.WALAdmitHardTotal.Load() != 1 {
t.Errorf("WALAdmitHardTotal = %d, want 1", m.WALAdmitHardTotal.Load())
}
if m.WALAdmitTimeoutTotal.Load() != 1 {
t.Errorf("WALAdmitTimeoutTotal = %d, want 1", m.WALAdmitTimeoutTotal.Load())
}
}
func TestWALAdmission_Metrics_NilMetrics(t *testing.T) {
// Ensure no panic when Metrics is nil (backwards compat).
a := NewWALAdmission(WALAdmissionConfig{
MaxConcurrent: 16,
SoftWatermark: 0.7,
HardWatermark: 0.9,
WALUsedFn: func() float64 { return 0.8 },
NotifyFn: func() {},
ClosedFn: func() bool { return false },
// Metrics: nil — intentionally omitted
})
a.sleepFn = func(d time.Duration) {}
if err := a.Acquire(100 * time.Millisecond); err != nil {
t.Fatalf("Acquire: %v", err)
}
a.Release()
// No panic = pass.
}
func TestWALAdmission_Metrics_ClosedDuringHard(t *testing.T) {
m := NewEngineMetrics()
var closed atomic.Bool
a := NewWALAdmission(WALAdmissionConfig{
MaxConcurrent: 16,
SoftWatermark: 0.7,
HardWatermark: 0.9,
WALUsedFn: func() float64 { return 0.95 },
NotifyFn: func() {},
ClosedFn: closed.Load,
Metrics: m,
})
a.sleepFn = func(d time.Duration) { closed.Store(true) }
err := a.Acquire(1 * time.Second)
if !errors.Is(err, ErrVolumeClosed) {
t.Fatalf("expected ErrVolumeClosed, got %v", err)
}
// Should still record metrics even on close.
if m.WALAdmitTotal.Load() != 1 {
t.Errorf("WALAdmitTotal = %d, want 1", m.WALAdmitTotal.Load())
}
if m.WALAdmitHardTotal.Load() != 1 {
t.Errorf("WALAdmitHardTotal = %d, want 1", m.WALAdmitHardTotal.Load())
}
}
Loading…
Cancel
Save