Browse Source

fix: serialize LSN allocation + shipping with shipMu

Concurrent WriteLBA/Trim calls could deliver WAL entries to replicas
out of LSN order: two goroutines allocate LSN 4 and 5 concurrently,
but LSN 5 could reach the replica first via ShipAll, causing the
replica to reject it as an LSN gap.

shipMu now wraps nextLSN.Add + wal.Append + ShipAll in both
WriteLBA and Trim, guaranteeing LSN-ordered delivery to replicas
under concurrent writers.

The dirty map update and WAL pressure check happen after shipMu
is released — they don't need ordering guarantees.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
Ping Qiu 7 days ago
parent
commit
9e481a83e9
  1. 16
      weed/storage/blockvol/blockvol.go

16
weed/storage/blockvol/blockvol.go

@ -37,6 +37,7 @@ type BlockVol struct {
mu sync.RWMutex
ioMu sync.RWMutex // guards local data mutation (WAL/dirtyMap/extent); Lock for restore/import/expand
superMu sync.Mutex // serializes superblock mutation + persist (group commit vs flusher)
shipMu sync.Mutex // serializes ShipAll calls to guarantee LSN-order delivery
fd *os.File
path string
super Superblock
@ -406,6 +407,8 @@ func (v *BlockVol) appendWithRetry(entry *WALEntry) (uint64, error) {
}
// Ship to replicas if configured (fire-and-forget).
// LSN-order delivery is guaranteed by shipMu held from LSN allocation
// through shipping in WriteLBA/Trim.
if v.shipperGroup != nil {
v.shipperGroup.ShipAll(entry)
}
@ -442,6 +445,10 @@ func (v *BlockVol) WriteLBA(lba uint64, data []byte) error {
defer v.walAdmission.Release()
}
// shipMu serializes LSN allocation + WAL append + Ship to guarantee
// LSN-order delivery to replicas under concurrent writes.
v.shipMu.Lock()
lsn := v.nextLSN.Add(1) - 1
entry := &WALEntry{
LSN: lsn,
@ -454,9 +461,12 @@ func (v *BlockVol) WriteLBA(lba uint64, data []byte) error {
walOff, err := v.appendWithRetry(entry)
if err != nil {
v.shipMu.Unlock()
return err
}
v.shipMu.Unlock()
// Check WAL pressure and notify flusher if threshold exceeded.
if v.wal.UsedFraction() >= v.config.WALPressureThreshold {
v.flusher.NotifyUrgent()
@ -645,6 +655,9 @@ func (v *BlockVol) Trim(lba uint64, length uint32) error {
defer v.walAdmission.Release()
}
// shipMu serializes LSN allocation + WAL append + Ship (same as WriteLBA).
v.shipMu.Lock()
lsn := v.nextLSN.Add(1) - 1
entry := &WALEntry{
LSN: lsn,
@ -656,9 +669,12 @@ func (v *BlockVol) Trim(lba uint64, length uint32) error {
walOff, err := v.appendWithRetry(entry)
if err != nil {
v.shipMu.Unlock()
return err
}
v.shipMu.Unlock()
// Update dirty map: mark each trimmed block so the flusher sees it.
// readOneBlock checks entry type and returns zeros for TRIM entries.
blocks := length / v.super.BlockSize

Loading…
Cancel
Save