Browse Source

feat: replica-aware WAL retention (CP13-6)

Flusher now holds WAL entries needed by recoverable replicas.
Both AdvanceTail (physical space) and checkpointLSN (scan gate)
are gated by the minimum flushed LSN across catch-up-eligible
replicas.

New methods on ShipperGroup:
- MinRecoverableFlushedLSN() (uint64, bool): pure read, returns
  min flushed LSN across InSync/Degraded/Disconnected/CatchingUp
  replicas with known progress. Excludes NeedsRebuild.
- EvaluateRetentionBudgets(timeout): separate mutation step,
  escalates replicas that exceed walRetentionTimeout (5m default)
  to NeedsRebuild, releasing their WAL hold.

Flusher integration: evaluates budgets then queries floor on each
flush cycle. If floor < maxLSN, holds both checkpoint and tail.
Extent writes proceed normally (reads work), only WAL reclaim
is deferred.

LastContactTime on WALShipper: updated on barrier success,
handshake success, and catch-up completion. Not on Ship (TCP
write only). Avoids misclassifying idle-but-healthy replicas.

CP13-6 ships with timeout budget only. walRetentionMaxBytes
is deferred (documented as partial slice).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
Ping Qiu 5 days ago
parent
commit
47f0111cae
  1. 28
      weed/storage/blockvol/blockvol.go
  2. 54
      weed/storage/blockvol/flusher.go
  3. 53
      weed/storage/blockvol/shipper_group.go
  4. 16
      weed/storage/blockvol/wal_shipper.go

28
weed/storage/blockvol/blockvol.go

@ -33,6 +33,11 @@ type CreateOptions struct {
var ErrVolumeClosed = errors.New("blockvol: volume closed")
// BlockVol is the core block volume engine.
// walRetentionTimeout is the maximum time a recoverable replica can hold
// WAL entries. After this, the replica is escalated to NeedsRebuild and
// its WAL hold is released. CP13-6: timeout-only budget (max-bytes deferred).
const walRetentionTimeout = 5 * time.Minute
type BlockVol struct {
mu sync.RWMutex
ioMu sync.RWMutex // guards local data mutation (WAL/dirtyMap/extent); Lock for restore/import/expand
@ -178,6 +183,18 @@ func CreateBlockVol(path string, opts CreateOptions, cfgs ...BlockVolConfig) (*B
Interval: cfg.FlushInterval,
Metrics: v.Metrics,
BatchIO: bio,
// CP13-6: replica-aware WAL retention.
RetentionFloorFn: func() (uint64, bool) {
if v.shipperGroup == nil {
return 0, false
}
return v.shipperGroup.MinRecoverableFlushedLSN()
},
EvaluateRetentionBudgetsFn: func() {
if v.shipperGroup != nil {
v.shipperGroup.EvaluateRetentionBudgets(walRetentionTimeout)
}
},
})
go v.flusher.Run()
v.walAdmission = NewWALAdmission(WALAdmissionConfig{
@ -290,6 +307,17 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) {
Interval: cfg.FlushInterval,
Metrics: v.Metrics,
BatchIO: bio,
RetentionFloorFn: func() (uint64, bool) {
if v.shipperGroup == nil {
return 0, false
}
return v.shipperGroup.MinRecoverableFlushedLSN()
},
EvaluateRetentionBudgetsFn: func() {
if v.shipperGroup != nil {
v.shipperGroup.EvaluateRetentionBudgets(walRetentionTimeout)
}
},
})
go v.flusher.Run()

54
weed/storage/blockvol/flusher.go

@ -18,6 +18,8 @@ type Flusher struct {
super *Superblock
superMu *sync.Mutex // serializes superblock writes (shared with group commit)
wal *WALWriter
retentionFloorFn func() (uint64, bool) // CP13-6
evaluateRetentionBudgetsFn func() // CP13-6
dirtyMap *DirtyMap
walOffset uint64 // absolute file offset of WAL region
walSize uint64
@ -61,6 +63,9 @@ type FlusherConfig struct {
Logger *log.Logger // optional; defaults to log.Default()
Metrics *EngineMetrics // optional; if nil, no metrics recorded
BatchIO batchio.BatchIO // optional; defaults to batchio.NewStandard()
// CP13-6: replica-aware WAL retention.
RetentionFloorFn func() (floorLSN uint64, hasFloor bool) // nil = no replica hold
EvaluateRetentionBudgetsFn func() // nil = no budget evaluation
}
// NewFlusher creates a flusher. Call Run() in a goroutine.
@ -87,12 +92,14 @@ func NewFlusher(cfg FlusherConfig) *Flusher {
bio: cfg.BatchIO,
logger: cfg.Logger,
metrics: cfg.Metrics,
checkpointLSN: cfg.Super.WALCheckpointLSN,
checkpointTail: 0,
interval: cfg.Interval,
notifyCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
done: make(chan struct{}),
checkpointLSN: cfg.Super.WALCheckpointLSN,
checkpointTail: 0,
interval: cfg.Interval,
notifyCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
done: make(chan struct{}),
retentionFloorFn: cfg.RetentionFloorFn,
evaluateRetentionBudgetsFn: cfg.EvaluateRetentionBudgetsFn,
}
}
@ -394,19 +401,35 @@ func (f *Flusher) flushOnceLocked() error {
f.dirtyMap.Delete(e.Lba)
}
}
f.checkpointLSN = maxLSN
f.checkpointTail = maxWALEnd
// CP13-6: replica-aware WAL retention.
// Evaluate retention budgets first (may escalate stale replicas).
if f.evaluateRetentionBudgetsFn != nil {
f.evaluateRetentionBudgetsFn()
}
// Check retention floor — hold WAL for recoverable replicas.
effectiveLSN := maxLSN
effectiveWALEnd := maxWALEnd
if f.retentionFloorFn != nil {
floorLSN, hasFloor := f.retentionFloorFn()
if hasFloor && maxLSN > floorLSN {
// Hold: don't advance checkpoint/tail past the floor.
// Extent writes are done (reads work), but WAL space is kept.
effectiveLSN = floorLSN
effectiveWALEnd = 0
}
}
f.checkpointLSN = effectiveLSN
f.checkpointTail = effectiveWALEnd
f.mu.Unlock()
// Advance WAL tail to free space.
if maxWALEnd > 0 {
f.wal.AdvanceTail(maxWALEnd)
// Advance WAL tail to free space (only if not held by retention).
if effectiveWALEnd > 0 {
f.wal.AdvanceTail(effectiveWALEnd)
}
// Update superblock checkpoint.
log.Printf("flusher: checkpoint LSN=%d entries=%d WALTail=%d WALHead=%d",
maxLSN, len(entries), f.wal.LogicalTail(), f.wal.LogicalHead())
f.updateSuperblockCheckpoint(maxLSN, f.wal.Tail())
f.updateSuperblockCheckpoint(effectiveLSN, f.wal.Tail())
// Record metrics.
if f.metrics != nil {
@ -418,7 +441,8 @@ func (f *Flusher) flushOnceLocked() error {
}
// updateSuperblockCheckpoint writes the updated checkpoint to disk.
// Acquires superMu to serialize against syncWithWALProgress (group commit).
// This is the primary place where WALHead is persisted to the superblock.
// Recovery's extended scan handles the gap between checkpoints.
func (f *Flusher) updateSuperblockCheckpoint(checkpointLSN uint64, walTail uint64) error {
f.superMu.Lock()
defer f.superMu.Unlock()

53
weed/storage/blockvol/shipper_group.go

@ -1,7 +1,9 @@
package blockvol
import (
"log"
"sync"
"time"
)
// ShipperGroup wraps multiple WALShippers for fan-out replication to N replicas.
@ -127,6 +129,57 @@ func (sg *ShipperGroup) MinReplicaFlushedLSN() (uint64, bool) {
return min, found
}
// MinRecoverableFlushedLSN reports the lowest replicaFlushedLSN among
// shippers that remain catch-up candidates: those with known flushed
// progress whose state is anything other than NeedsRebuild. It is a
// pure read and never mutates shipper state. Returns (0, false) when no
// recoverable replica has reported progress.
func (sg *ShipperGroup) MinRecoverableFlushedLSN() (uint64, bool) {
	sg.mu.RLock()
	defer sg.mu.RUnlock()
	var (
		floor uint64
		ok    bool
	)
	for _, sh := range sg.shippers {
		// Skip bootstrap shippers (no flushed progress yet) and replicas
		// already written off for rebuild — neither holds WAL.
		if sh.State() == ReplicaNeedsRebuild || !sh.HasFlushedProgress() {
			continue
		}
		if lsn := sh.ReplicaFlushedLSN(); !ok || lsn < floor {
			floor, ok = lsn, true
		}
	}
	return floor, ok
}
// EvaluateRetentionBudgets checks each recoverable replica's contact time
// against the timeout. Replicas that exceed the timeout are transitioned to
// NeedsRebuild, releasing their WAL hold. Must be called before
// MinRecoverableFlushedLSN to ensure stale replicas are escalated first.
//
// Holds only the read lock: sg.mu presumably guards the shippers slice,
// while the per-shipper state transition goes through the atomic
// s.state — TODO confirm no other writer relies on the write lock.
func (sg *ShipperGroup) EvaluateRetentionBudgets(timeout time.Duration) {
	sg.mu.RLock()
	defer sg.mu.RUnlock()
	for _, s := range sg.shippers {
		if !s.HasFlushedProgress() {
			continue // bootstrap shippers — not retention candidates
		}
		if s.State() == ReplicaNeedsRebuild {
			continue // already escalated; no WAL hold to release
		}
		ct := s.LastContactTime()
		if ct.IsZero() {
			continue // no contact yet — skip
		}
		// Compute the age once so the value compared against the timeout
		// is the same value that gets logged (time.Since advances between
		// calls; the original computed it twice).
		age := time.Since(ct)
		if age > timeout {
			s.state.Store(uint32(ReplicaNeedsRebuild))
			log.Printf("shipper_group: retention timeout for %s (last contact %v ago), transitioning to NeedsRebuild",
				s.dataAddr, age.Round(time.Second))
		}
	}
}
// InSyncCount returns the number of shippers in ReplicaInSync state.
func (sg *ShipperGroup) InSyncCount() int {
sg.mu.RLock()

16
weed/storage/blockvol/wal_shipper.go

@ -69,6 +69,7 @@ type WALShipper struct {
hasFlushedProgress atomic.Bool // true once replica returns a valid (non-zero) FlushedLSN
state atomic.Uint32 // ReplicaState
catchupFailures int // consecutive catch-up failures; reset on success
lastContactTime atomic.Value // time.Time: last successful barrier/handshake/catch-up
stopped atomic.Bool
}
@ -277,6 +278,20 @@ func (s *WALShipper) State() ReplicaState {
return ReplicaState(s.state.Load())
}
// LastContactTime returns the last time this replica had successful
// durable contact (barrier success, reconnect handshake, catch-up
// completion). The zero time means no contact has been recorded yet.
func (s *WALShipper) LastContactTime() time.Time {
	// A nil atomic.Value load fails the comma-ok assertion, yielding the
	// zero time — same result as the original's explicit nil check.
	t, ok := s.lastContactTime.Load().(time.Time)
	if !ok {
		return time.Time{}
	}
	return t
}
// touchContactTime records the current wall-clock time as this replica's
// last successful durable contact. Per the commit description, callers
// invoke it on barrier success, handshake success, and catch-up
// completion — not on plain Ship (TCP write only).
func (s *WALShipper) touchContactTime() {
	s.lastContactTime.Store(time.Now())
}
// IsDegraded returns true if the replica is not sync-eligible (any state
// other than InSync). This overloads Disconnected, Connecting, CatchingUp,
// NeedsRebuild, and Degraded into one "not healthy" shape for backward
@ -391,6 +406,7 @@ func (s *WALShipper) doReconnectAndCatchUp() error {
// markInSync transitions the replica to ReplicaInSync, resets the
// consecutive catch-up failure counter, and records successful durable
// contact (which feeds the WAL-retention budget). Statement order is
// deliberate-looking and left untouched: the atomic state store happens
// before the contact-time update, so concurrent readers may briefly see
// the new state with a stale contact time.
func (s *WALShipper) markInSync() {
	s.state.Store(uint32(ReplicaInSync))
	// catchupFailures is a plain int — presumably only mutated from the
	// shipper's own goroutine; confirm before adding concurrent callers.
	s.catchupFailures = 0
	s.touchContactTime()
	log.Printf("wal_shipper: replica in-sync (data=%s, ctrl=%s)", s.dataAddr, s.controlAddr)
}

Loading…
Cancel
Save