From 47f0111cae8a36db03d38ae1a9c2a7c1413b4aa2 Mon Sep 17 00:00:00 2001 From: Ping Qiu Date: Wed, 25 Mar 2026 22:04:23 -0700 Subject: [PATCH] feat: replica-aware WAL retention (CP13-6) Flusher now holds WAL entries needed by recoverable replicas. Both AdvanceTail (physical space) and checkpointLSN (scan gate) are gated by the minimum flushed LSN across catch-up-eligible replicas. New methods on ShipperGroup: - MinRecoverableFlushedLSN() (uint64, bool): pure read, returns min flushed LSN across InSync/Degraded/Disconnected/CatchingUp replicas with known progress. Excludes NeedsRebuild. - EvaluateRetentionBudgets(timeout): separate mutation step, escalates replicas that exceed walRetentionTimeout (5m default) to NeedsRebuild, releasing their WAL hold. Flusher integration: evaluates budgets then queries floor on each flush cycle. If floor < maxLSN, holds both checkpoint and tail. Extent writes proceed normally (reads work), only WAL reclaim is deferred. LastContactTime on WALShipper: updated on barrier success, handshake success, and catch-up completion. Not on Ship (TCP write only). Avoids misclassifying idle-but-healthy replicas. CP13-6 ships with timeout budget only. walRetentionMaxBytes is deferred (documented as partial slice). Co-Authored-By: Claude Opus 4.6 (1M context) --- weed/storage/blockvol/blockvol.go | 28 +++++++++++++ weed/storage/blockvol/flusher.go | 54 +++++++++++++++++++------- weed/storage/blockvol/shipper_group.go | 53 +++++++++++++++++++++++++ weed/storage/blockvol/wal_shipper.go | 16 ++++++++ 4 files changed, 136 insertions(+), 15 deletions(-) diff --git a/weed/storage/blockvol/blockvol.go b/weed/storage/blockvol/blockvol.go index 150d78d34..0683169f8 100644 --- a/weed/storage/blockvol/blockvol.go +++ b/weed/storage/blockvol/blockvol.go @@ -33,6 +33,11 @@ type CreateOptions struct { var ErrVolumeClosed = errors.New("blockvol: volume closed") // BlockVol is the core block volume engine. +// walRetentionTimeout is the maximum time a recoverable replica can hold +// WAL entries. After this, the replica is escalated to NeedsRebuild and +// its WAL hold is released. CP13-6: timeout-only budget (max-bytes deferred). +const walRetentionTimeout = 5 * time.Minute + type BlockVol struct { mu sync.RWMutex ioMu sync.RWMutex // guards local data mutation (WAL/dirtyMap/extent); Lock for restore/import/expand @@ -178,6 +183,18 @@ func CreateBlockVol(path string, opts CreateOptions, cfgs ...BlockVolConfig) (*B Interval: cfg.FlushInterval, Metrics: v.Metrics, BatchIO: bio, + // CP13-6: replica-aware WAL retention. + RetentionFloorFn: func() (uint64, bool) { + if v.shipperGroup == nil { + return 0, false + } + return v.shipperGroup.MinRecoverableFlushedLSN() + }, + EvaluateRetentionBudgetsFn: func() { + if v.shipperGroup != nil { + v.shipperGroup.EvaluateRetentionBudgets(walRetentionTimeout) + } + }, }) go v.flusher.Run() v.walAdmission = NewWALAdmission(WALAdmissionConfig{ @@ -290,6 +307,17 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) { Interval: cfg.FlushInterval, Metrics: v.Metrics, BatchIO: bio, + RetentionFloorFn: func() (uint64, bool) { + if v.shipperGroup == nil { + return 0, false + } + return v.shipperGroup.MinRecoverableFlushedLSN() + }, + EvaluateRetentionBudgetsFn: func() { + if v.shipperGroup != nil { + v.shipperGroup.EvaluateRetentionBudgets(walRetentionTimeout) + } + }, }) go v.flusher.Run() diff --git a/weed/storage/blockvol/flusher.go b/weed/storage/blockvol/flusher.go index fe902fd9d..ba0dbb414 100644 --- a/weed/storage/blockvol/flusher.go +++ b/weed/storage/blockvol/flusher.go @@ -18,6 +18,8 @@ type Flusher struct { super *Superblock superMu *sync.Mutex // serializes superblock writes (shared with group commit) wal *WALWriter + retentionFloorFn func() (uint64, bool) // CP13-6 + evaluateRetentionBudgetsFn func() // CP13-6 dirtyMap *DirtyMap walOffset uint64 // absolute file offset of WAL region walSize uint64 @@ -61,6 +63,9 @@ type FlusherConfig struct { Logger *log.Logger // optional; defaults to log.Default() Metrics *EngineMetrics // optional; if nil, no metrics recorded BatchIO batchio.BatchIO // optional; defaults to batchio.NewStandard() + // CP13-6: replica-aware WAL retention. + RetentionFloorFn func() (floorLSN uint64, hasFloor bool) // nil = no replica hold + EvaluateRetentionBudgetsFn func() // nil = no budget evaluation } // NewFlusher creates a flusher. Call Run() in a goroutine. @@ -87,12 +92,14 @@ func NewFlusher(cfg FlusherConfig) *Flusher { bio: cfg.BatchIO, logger: cfg.Logger, metrics: cfg.Metrics, - checkpointLSN: cfg.Super.WALCheckpointLSN, - checkpointTail: 0, - interval: cfg.Interval, - notifyCh: make(chan struct{}, 1), - stopCh: make(chan struct{}), - done: make(chan struct{}), + checkpointLSN: cfg.Super.WALCheckpointLSN, + checkpointTail: 0, + interval: cfg.Interval, + notifyCh: make(chan struct{}, 1), + stopCh: make(chan struct{}), + done: make(chan struct{}), + retentionFloorFn: cfg.RetentionFloorFn, + evaluateRetentionBudgetsFn: cfg.EvaluateRetentionBudgetsFn, } } @@ -394,19 +401,35 @@ func (f *Flusher) flushOnceLocked() error { f.dirtyMap.Delete(e.Lba) } } - f.checkpointLSN = maxLSN - f.checkpointTail = maxWALEnd + // CP13-6: replica-aware WAL retention. + // Evaluate retention budgets first (may escalate stale replicas). + if f.evaluateRetentionBudgetsFn != nil { + f.evaluateRetentionBudgetsFn() + } + // Check retention floor — hold WAL for recoverable replicas. + effectiveLSN := maxLSN + effectiveWALEnd := maxWALEnd + if f.retentionFloorFn != nil { + floorLSN, hasFloor := f.retentionFloorFn() + if hasFloor && maxLSN > floorLSN { + // Hold: don't advance checkpoint/tail past the floor. + // Extent writes are done (reads work), but WAL space is kept. + effectiveLSN = floorLSN + effectiveWALEnd = 0 + } + } + + f.checkpointLSN = effectiveLSN + f.checkpointTail = effectiveWALEnd f.mu.Unlock() - // Advance WAL tail to free space. - if maxWALEnd > 0 { - f.wal.AdvanceTail(maxWALEnd) + // Advance WAL tail to free space (only if not held by retention). + if effectiveWALEnd > 0 { + f.wal.AdvanceTail(effectiveWALEnd) } // Update superblock checkpoint. - log.Printf("flusher: checkpoint LSN=%d entries=%d WALTail=%d WALHead=%d", - maxLSN, len(entries), f.wal.LogicalTail(), f.wal.LogicalHead()) - f.updateSuperblockCheckpoint(maxLSN, f.wal.Tail()) + f.updateSuperblockCheckpoint(effectiveLSN, f.wal.Tail()) // Record metrics. if f.metrics != nil { @@ -418,7 +441,8 @@ func (f *Flusher) flushOnceLocked() error { } // updateSuperblockCheckpoint writes the updated checkpoint to disk. -// Acquires superMu to serialize against syncWithWALProgress (group commit). +// This is the primary place where WALHead is persisted to the superblock. +// Recovery's extended scan handles the gap between checkpoints. func (f *Flusher) updateSuperblockCheckpoint(checkpointLSN uint64, walTail uint64) error { f.superMu.Lock() defer f.superMu.Unlock() diff --git a/weed/storage/blockvol/shipper_group.go b/weed/storage/blockvol/shipper_group.go index 58c6a8759..07ddd8695 100644 --- a/weed/storage/blockvol/shipper_group.go +++ b/weed/storage/blockvol/shipper_group.go @@ -1,7 +1,9 @@ package blockvol import ( + "log" "sync" + "time" ) // ShipperGroup wraps multiple WALShippers for fan-out replication to N replicas. @@ -127,6 +129,57 @@ func (sg *ShipperGroup) MinReplicaFlushedLSN() (uint64, bool) { return min, found } +// MinRecoverableFlushedLSN returns the minimum replicaFlushedLSN across +// shippers that are catch-up candidates (not NeedsRebuild, have flushed progress). +// Pure read — does not mutate state. Returns (0, false) if no recoverable +// replica has known progress. +func (sg *ShipperGroup) MinRecoverableFlushedLSN() (uint64, bool) { + sg.mu.RLock() + defer sg.mu.RUnlock() + var min uint64 + found := false + for _, s := range sg.shippers { + if !s.HasFlushedProgress() { + continue + } + if s.State() == ReplicaNeedsRebuild { + continue + } + lsn := s.ReplicaFlushedLSN() + if !found || lsn < min { + min = lsn + found = true + } + } + return min, found +} + +// EvaluateRetentionBudgets checks each recoverable replica's contact time +// against the timeout. Replicas that exceed the timeout are transitioned to +// NeedsRebuild, releasing their WAL hold. Must be called before +// MinRecoverableFlushedLSN to ensure stale replicas are escalated first. +func (sg *ShipperGroup) EvaluateRetentionBudgets(timeout time.Duration) { + sg.mu.RLock() + defer sg.mu.RUnlock() + for _, s := range sg.shippers { + if !s.HasFlushedProgress() { + continue // bootstrap shippers — not retention candidates + } + if s.State() == ReplicaNeedsRebuild { + continue + } + ct := s.LastContactTime() + if ct.IsZero() { + continue // no contact yet — skip + } + if time.Since(ct) > timeout { + s.state.Store(uint32(ReplicaNeedsRebuild)) + log.Printf("shipper_group: retention timeout for %s (last contact %v ago), transitioning to NeedsRebuild", + s.dataAddr, time.Since(ct).Round(time.Second)) + } + } +} + // InSyncCount returns the number of shippers in ReplicaInSync state. func (sg *ShipperGroup) InSyncCount() int { sg.mu.RLock() diff --git a/weed/storage/blockvol/wal_shipper.go b/weed/storage/blockvol/wal_shipper.go index ec0f09e56..d3785ce2f 100644 --- a/weed/storage/blockvol/wal_shipper.go +++ b/weed/storage/blockvol/wal_shipper.go @@ -69,6 +69,7 @@ type WALShipper struct { hasFlushedProgress atomic.Bool // true once replica returns a valid (non-zero) FlushedLSN state atomic.Uint32 // ReplicaState catchupFailures int // consecutive catch-up failures; reset on success + lastContactTime atomic.Value // time.Time: last successful barrier/handshake/catch-up stopped atomic.Bool } @@ -277,6 +278,20 @@ func (s *WALShipper) State() ReplicaState { return ReplicaState(s.state.Load()) } +// LastContactTime returns the last time this replica had successful +// durable contact (barrier success, reconnect handshake, catch-up completion). +// Returns zero time if no contact has occurred. +func (s *WALShipper) LastContactTime() time.Time { + if v := s.lastContactTime.Load(); v != nil { + return v.(time.Time) + } + return time.Time{} +} + +func (s *WALShipper) touchContactTime() { + s.lastContactTime.Store(time.Now()) +} + // IsDegraded returns true if the replica is not sync-eligible (any state // other than InSync). This overloads Disconnected, Connecting, CatchingUp, // NeedsRebuild, and Degraded into one "not healthy" shape for backward @@ -391,6 +406,7 @@ func (s *WALShipper) doReconnectAndCatchUp() error { func (s *WALShipper) markInSync() { s.state.Store(uint32(ReplicaInSync)) s.catchupFailures = 0 + s.touchContactTime() log.Printf("wal_shipper: replica in-sync (data=%s, ctrl=%s)", s.dataAddr, s.controlAddr) }