From 3ddb87adc9a961440197d3e6972ad9c12672aa13 Mon Sep 17 00:00:00 2001 From: Ping Qiu Date: Tue, 24 Mar 2026 00:09:14 -0700 Subject: [PATCH] fix: superblock write coordination (superMu) + remove debug logs Adds sync.Mutex (superMu) to BlockVol, shared between group commit's syncWithWALProgress() and flusher's updateSuperblockCheckpoint(). Both paths now serialize superblock mutation + persist, preventing WALTail/WALCheckpointLSN regression when flusher and group commit write the full superblock concurrently. persistSuperblock() also guarded for consistency. Removes temporary log.Printf lines in the open/recovery path that were added during BUG-RESTART-ZEROS investigation. Co-Authored-By: Claude Opus 4.6 (1M context) --- weed/storage/blockvol/blockvol.go | 12 ++++++++---- weed/storage/blockvol/flusher.go | 11 +++++++++++ 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/weed/storage/blockvol/blockvol.go b/weed/storage/blockvol/blockvol.go index 89abc8692..4e23c55f7 100644 --- a/weed/storage/blockvol/blockvol.go +++ b/weed/storage/blockvol/blockvol.go @@ -36,6 +36,7 @@ var ErrVolumeClosed = errors.New("blockvol: volume closed") type BlockVol struct { mu sync.RWMutex ioMu sync.RWMutex // guards local data mutation (WAL/dirtyMap/extent); Lock for restore/import/expand + superMu sync.Mutex // serializes superblock mutation + persist (group commit vs flusher) fd *os.File path string super Superblock @@ -170,6 +171,7 @@ func CreateBlockVol(path string, opts CreateOptions, cfgs ...BlockVolConfig) (*B v.flusher = NewFlusher(FlusherConfig{ FD: fd, Super: &v.super, + SuperMu: &v.superMu, WAL: wal, DirtyMap: dm, Interval: cfg.FlushInterval, @@ -237,15 +239,11 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) { dirtyMap := NewDirtyMap(cfg.DirtyMapShards) // Run WAL recovery: replay entries from tail to head. - log.Printf("blockvol: open %s: WALHead=%d WALTail=%d CheckpointLSN=%d WALOffset=%d WALSize=%d VolumeSize=%d Epoch=%d", - path, sb.WALHead, sb.WALTail, sb.WALCheckpointLSN, sb.WALOffset, sb.WALSize, sb.VolumeSize, sb.Epoch) result, err := RecoverWAL(fd, &sb, dirtyMap) if err != nil { fd.Close() return nil, fmt.Errorf("blockvol: recovery: %w", err) } - log.Printf("blockvol: recovery %s: replayed=%d highestLSN=%d torn=%d dirtyEntries=%d", - path, result.EntriesReplayed, result.HighestLSN, result.TornEntries, dirtyMap.Len()) nextLSN := sb.WALCheckpointLSN + 1 if result.HighestLSN >= nextLSN { @@ -285,6 +283,7 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) { v.flusher = NewFlusher(FlusherConfig{ FD: fd, Super: &v.super, + SuperMu: &v.superMu, WAL: wal, DirtyMap: dirtyMap, Interval: cfg.FlushInterval, @@ -340,6 +339,9 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) { // Sequence: update superblock WALHead → pwrite superblock → fd.Sync. // The fd.Sync flushes both WAL data and superblock in one syscall. func (v *BlockVol) syncWithWALProgress() error { + v.superMu.Lock() + defer v.superMu.Unlock() + // Update superblock WALHead from the live WAL writer. // WALTail and CheckpointLSN are updated by the flusher, not here. v.super.WALHead = v.wal.LogicalHead() @@ -1367,6 +1369,8 @@ func (v *BlockVol) ExpandState() (preparedSize, expandEpoch uint64) { // persistSuperblock writes the superblock to disk and fsyncs. func (v *BlockVol) persistSuperblock() error { + v.superMu.Lock() + defer v.superMu.Unlock() if _, err := v.fd.Seek(0, 0); err != nil { return fmt.Errorf("blockvol: persist superblock seek: %w", err) } diff --git a/weed/storage/blockvol/flusher.go b/weed/storage/blockvol/flusher.go index 5ecc6ab98..b233f2022 100644 --- a/weed/storage/blockvol/flusher.go +++ b/weed/storage/blockvol/flusher.go @@ -16,6 +16,7 @@ import ( type Flusher struct { fd *os.File super *Superblock + superMu *sync.Mutex // serializes superblock writes (shared with group commit) wal *WALWriter dirtyMap *DirtyMap walOffset uint64 // absolute file offset of WAL region @@ -53,6 +54,7 @@ type Flusher struct { type FlusherConfig struct { FD *os.File Super *Superblock + SuperMu *sync.Mutex // serializes superblock writes (shared with group commit) WAL *WALWriter DirtyMap *DirtyMap Interval time.Duration // default 100ms @@ -75,6 +77,7 @@ func NewFlusher(cfg FlusherConfig) *Flusher { return &Flusher{ fd: cfg.FD, super: cfg.Super, + superMu: cfg.SuperMu, wal: cfg.WAL, dirtyMap: cfg.DirtyMap, walOffset: cfg.Super.WALOffset, @@ -401,6 +404,8 @@ func (f *Flusher) flushOnceLocked() error { } // Update superblock checkpoint. + log.Printf("flusher: checkpoint LSN=%d entries=%d WALTail=%d WALHead=%d", + maxLSN, len(entries), f.wal.LogicalTail(), f.wal.LogicalHead()) f.updateSuperblockCheckpoint(maxLSN, f.wal.Tail()) // Record metrics. @@ -413,7 +418,13 @@ func (f *Flusher) flushOnceLocked() error { } // updateSuperblockCheckpoint writes the updated checkpoint to disk. +// Acquires superMu to serialize against syncWithWALProgress (group commit). func (f *Flusher) updateSuperblockCheckpoint(checkpointLSN uint64, walTail uint64) error { + if f.superMu != nil { + f.superMu.Lock() + defer f.superMu.Unlock() + } + f.super.WALCheckpointLSN = checkpointLSN f.super.WALHead = f.wal.LogicalHead() f.super.WALTail = f.wal.LogicalTail()