Browse Source

fix: superblock write coordination (superMu) + remove debug logs

Adds sync.Mutex (superMu) to BlockVol, shared between group commit's
syncWithWALProgress() and flusher's updateSuperblockCheckpoint().
Both paths now serialize superblock mutation + persist, preventing
WALTail/WALCheckpointLSN regression when flusher and group commit
write the full superblock concurrently.

persistSuperblock() also guarded for consistency.

Removes temporary log.Printf lines in the open/recovery path that
were added during BUG-RESTART-ZEROS investigation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
Ping Qiu 1 week ago
parent
commit
3ddb87adc9
  1. 12
      weed/storage/blockvol/blockvol.go
  2. 11
      weed/storage/blockvol/flusher.go

12
weed/storage/blockvol/blockvol.go

@ -36,6 +36,7 @@ var ErrVolumeClosed = errors.New("blockvol: volume closed")
type BlockVol struct {
mu sync.RWMutex
ioMu sync.RWMutex // guards local data mutation (WAL/dirtyMap/extent); Lock for restore/import/expand
superMu sync.Mutex // serializes superblock mutation + persist (group commit vs flusher)
fd *os.File
path string
super Superblock
@ -170,6 +171,7 @@ func CreateBlockVol(path string, opts CreateOptions, cfgs ...BlockVolConfig) (*B
v.flusher = NewFlusher(FlusherConfig{
FD: fd,
Super: &v.super,
SuperMu: &v.superMu,
WAL: wal,
DirtyMap: dm,
Interval: cfg.FlushInterval,
@ -237,15 +239,11 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) {
dirtyMap := NewDirtyMap(cfg.DirtyMapShards)
// Run WAL recovery: replay entries from tail to head.
log.Printf("blockvol: open %s: WALHead=%d WALTail=%d CheckpointLSN=%d WALOffset=%d WALSize=%d VolumeSize=%d Epoch=%d",
path, sb.WALHead, sb.WALTail, sb.WALCheckpointLSN, sb.WALOffset, sb.WALSize, sb.VolumeSize, sb.Epoch)
result, err := RecoverWAL(fd, &sb, dirtyMap)
if err != nil {
fd.Close()
return nil, fmt.Errorf("blockvol: recovery: %w", err)
}
log.Printf("blockvol: recovery %s: replayed=%d highestLSN=%d torn=%d dirtyEntries=%d",
path, result.EntriesReplayed, result.HighestLSN, result.TornEntries, dirtyMap.Len())
nextLSN := sb.WALCheckpointLSN + 1
if result.HighestLSN >= nextLSN {
@ -285,6 +283,7 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) {
v.flusher = NewFlusher(FlusherConfig{
FD: fd,
Super: &v.super,
SuperMu: &v.superMu,
WAL: wal,
DirtyMap: dirtyMap,
Interval: cfg.FlushInterval,
@ -340,6 +339,9 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) {
// Sequence: update superblock WALHead → pwrite superblock → fd.Sync.
// The fd.Sync flushes both WAL data and superblock in one syscall.
func (v *BlockVol) syncWithWALProgress() error {
v.superMu.Lock()
defer v.superMu.Unlock()
// Update superblock WALHead from the live WAL writer.
// WALTail and CheckpointLSN are updated by the flusher, not here.
v.super.WALHead = v.wal.LogicalHead()
@ -1367,6 +1369,8 @@ func (v *BlockVol) ExpandState() (preparedSize, expandEpoch uint64) {
// persistSuperblock writes the superblock to disk and fsyncs.
func (v *BlockVol) persistSuperblock() error {
v.superMu.Lock()
defer v.superMu.Unlock()
if _, err := v.fd.Seek(0, 0); err != nil {
return fmt.Errorf("blockvol: persist superblock seek: %w", err)
}

11
weed/storage/blockvol/flusher.go

@ -16,6 +16,7 @@ import (
type Flusher struct {
fd *os.File
super *Superblock
superMu *sync.Mutex // serializes superblock writes (shared with group commit)
wal *WALWriter
dirtyMap *DirtyMap
walOffset uint64 // absolute file offset of WAL region
@ -53,6 +54,7 @@ type Flusher struct {
type FlusherConfig struct {
FD *os.File
Super *Superblock
SuperMu *sync.Mutex // serializes superblock writes (shared with group commit)
WAL *WALWriter
DirtyMap *DirtyMap
Interval time.Duration // default 100ms
@ -75,6 +77,7 @@ func NewFlusher(cfg FlusherConfig) *Flusher {
return &Flusher{
fd: cfg.FD,
super: cfg.Super,
superMu: cfg.SuperMu,
wal: cfg.WAL,
dirtyMap: cfg.DirtyMap,
walOffset: cfg.Super.WALOffset,
@ -401,6 +404,8 @@ func (f *Flusher) flushOnceLocked() error {
}
// Update superblock checkpoint.
log.Printf("flusher: checkpoint LSN=%d entries=%d WALTail=%d WALHead=%d",
maxLSN, len(entries), f.wal.LogicalTail(), f.wal.LogicalHead())
f.updateSuperblockCheckpoint(maxLSN, f.wal.Tail())
// Record metrics.
@ -413,7 +418,13 @@ func (f *Flusher) flushOnceLocked() error {
}
// updateSuperblockCheckpoint writes the updated checkpoint to disk.
// Acquires superMu to serialize against syncWithWALProgress (group commit).
func (f *Flusher) updateSuperblockCheckpoint(checkpointLSN uint64, walTail uint64) error {
if f.superMu != nil {
f.superMu.Lock()
defer f.superMu.Unlock()
}
f.super.WALCheckpointLSN = checkpointLSN
f.super.WALHead = f.wal.LogicalHead()
f.super.WALTail = f.wal.LogicalTail()

Loading…
Cancel
Save