Browse Source

fix: ioMu data-plane exclusion for restore/import/expand

Adds a sync.RWMutex (ioMu) to BlockVol, enforcing mutual exclusion
between normal I/O and destructive state operations.

Shared (RLock): WriteLBA, ReadLBA, Trim, SyncCache, replica
applyEntry, rebuild applyRebuildEntry — concurrent I/O safe.

Exclusive (Lock): RestoreSnapshot, ImportSnapshot, Expand,
PrepareExpand, CommitExpand, CancelExpand — drains all in-flight
I/O before modifying extent/WAL/dirtyMap.

Scope rule: RLock covers local data-structure mutation only.
Replication shipping is asynchronous and outside the lock, so
exclusive holders block only behind local I/O, not network stalls.

Lock ordering: ioMu > snapMu > assignMu > mu.

Closes the critical ER item: restore/import vs concurrent WriteLBA
silent data corruption gap.

3 new tests: concurrent writes allowed, real restore-vs-write
contention with data integrity check, close coordination.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
Ping Qiu 1 week ago
parent
commit
e92263b4f4
  1. 64
      weed/storage/blockvol/blockvol.go
  2. 157
      weed/storage/blockvol/blockvol_test.go
  3. 4
      weed/storage/blockvol/rebuild.go
  4. 4
      weed/storage/blockvol/replica_apply.go
  5. 4
      weed/storage/blockvol/snapshot_export.go

64
weed/storage/blockvol/blockvol.go

@ -4,6 +4,7 @@
package blockvol
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
@ -34,6 +35,7 @@ var ErrVolumeClosed = errors.New("blockvol: volume closed")
// BlockVol is the core block volume engine.
type BlockVol struct {
mu sync.RWMutex
ioMu sync.RWMutex // guards local data mutation (WAL/dirtyMap/extent); Lock for restore/import/expand
fd *os.File
path string
super Superblock
@ -150,7 +152,7 @@ func CreateBlockVol(path string, opts CreateOptions, cfgs ...BlockVolConfig) (*B
v.nextLSN.Store(1)
v.healthy.Store(true)
v.groupCommit = NewGroupCommitter(GroupCommitterConfig{
SyncFunc: fd.Sync,
SyncFunc: v.syncWithWALProgress,
MaxDelay: cfg.GroupCommitMaxDelay,
MaxBatch: cfg.GroupCommitMaxBatch,
LowWatermark: cfg.GroupCommitLowWatermark,
@ -235,11 +237,15 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) {
dirtyMap := NewDirtyMap(cfg.DirtyMapShards)
// Run WAL recovery: replay entries from tail to head.
log.Printf("blockvol: open %s: WALHead=%d WALTail=%d CheckpointLSN=%d WALOffset=%d WALSize=%d VolumeSize=%d Epoch=%d",
path, sb.WALHead, sb.WALTail, sb.WALCheckpointLSN, sb.WALOffset, sb.WALSize, sb.VolumeSize, sb.Epoch)
result, err := RecoverWAL(fd, &sb, dirtyMap)
if err != nil {
fd.Close()
return nil, fmt.Errorf("blockvol: recovery: %w", err)
}
log.Printf("blockvol: recovery %s: replayed=%d highestLSN=%d torn=%d dirtyEntries=%d",
path, result.EntriesReplayed, result.HighestLSN, result.TornEntries, dirtyMap.Len())
nextLSN := sb.WALCheckpointLSN + 1
if result.HighestLSN >= nextLSN {
@ -262,7 +268,7 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) {
v.epoch.Store(sb.Epoch)
v.healthy.Store(true)
v.groupCommit = NewGroupCommitter(GroupCommitterConfig{
SyncFunc: fd.Sync,
SyncFunc: v.syncWithWALProgress,
MaxDelay: cfg.GroupCommitMaxDelay,
MaxBatch: cfg.GroupCommitMaxBatch,
LowWatermark: cfg.GroupCommitLowWatermark,
@ -323,6 +329,34 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) {
// beginOp increments the in-flight ops counter. Returns ErrVolumeClosed if
// the volume is already closed, so callers must not proceed.
// syncWithWALProgress is the GroupCommitter sync function: it records the
// current WAL head in the superblock and fsyncs, so that crash recovery
// always sees the WAL entries that were durable at the time of the sync.
//
// Without this, a crash before the first flusher checkpoint would leave
// WALHead=0 in the on-disk superblock, making recovery skip every WAL
// entry (BUG-RESTART-ZEROS).
//
// Order matters: advance the in-memory WALHead, rewrite the superblock at
// offset 0, then issue a single fd.Sync that flushes both the WAL data and
// the superblock in one syscall.
func (v *BlockVol) syncWithWALProgress() error {
	// Only WALHead is advanced here; WALTail and the checkpoint LSN are
	// owned by the flusher, not the group committer.
	v.super.WALHead = v.wal.LogicalHead()

	// Serialize the full superblock and rewrite it in place at offset 0.
	var sb bytes.Buffer
	if _, err := v.super.WriteTo(&sb); err != nil {
		return fmt.Errorf("blockvol: serialize superblock for WAL progress: %w", err)
	}
	if _, err := v.fd.WriteAt(sb.Bytes(), 0); err != nil {
		return fmt.Errorf("blockvol: persist WAL progress: %w", err)
	}

	// One fsync makes both the WAL payload and the superblock durable.
	return v.fd.Sync()
}
func (v *BlockVol) beginOp() error {
v.opsOutstanding.Add(1)
if v.closed.Load() {
@ -391,6 +425,8 @@ func (v *BlockVol) WriteLBA(lba uint64, data []byte) error {
return err
}
defer v.endOp()
v.ioMu.RLock()
defer v.ioMu.RUnlock()
if err := v.writeGate(); err != nil {
return err
}
@ -443,6 +479,8 @@ func (v *BlockVol) ReadLBA(lba uint64, length uint32) ([]byte, error) {
return nil, err
}
defer v.endOp()
v.ioMu.RLock()
defer v.ioMu.RUnlock()
if err := ValidateWrite(lba, length, v.super.VolumeSize, v.super.BlockSize); err != nil {
return nil, err
}
@ -590,6 +628,8 @@ func (v *BlockVol) Trim(lba uint64, length uint32) error {
return err
}
defer v.endOp()
v.ioMu.RLock()
defer v.ioMu.RUnlock()
if err := v.writeGate(); err != nil {
return err
}
@ -689,6 +729,8 @@ func (v *BlockVol) SyncCache() error {
return err
}
defer v.endOp()
v.ioMu.RLock()
defer v.ioMu.RUnlock()
return v.groupCommit.Submit()
}
@ -718,7 +760,7 @@ func (v *BlockVol) SetReplicaAddrs(addrs []ReplicaAddr) {
// Replace the group committer's sync function with a distributed version.
v.groupCommit.Stop()
v.groupCommit = NewGroupCommitter(GroupCommitterConfig{
SyncFunc: MakeDistributedSync(v.fd.Sync, v.shipperGroup, v),
SyncFunc: MakeDistributedSync(v.syncWithWALProgress, v.shipperGroup, v),
MaxDelay: v.config.GroupCommitMaxDelay,
MaxBatch: v.config.GroupCommitMaxBatch,
LowWatermark: v.config.GroupCommitLowWatermark,
@ -1072,6 +1114,10 @@ func (v *BlockVol) RestoreSnapshot(id uint32) error {
}
defer v.endOp()
// Exclusive lock: drains all in-flight I/O before modifying extent/WAL/dirtyMap.
v.ioMu.Lock()
defer v.ioMu.Unlock()
v.snapMu.RLock()
snap, ok := v.snapshots[id]
v.snapMu.RUnlock()
@ -1203,6 +1249,9 @@ func (v *BlockVol) Expand(newSize uint64) error {
return err
}
defer v.endOp()
// Exclusive ioMu: drain I/O before file growth + VolumeSize mutation.
v.ioMu.Lock()
defer v.ioMu.Unlock()
if err := v.writeGate(); err != nil {
return err
}
@ -1231,6 +1280,10 @@ func (v *BlockVol) PrepareExpand(newSize, expandEpoch uint64) error {
}
defer v.endOp()
// Exclusive ioMu: drain I/O before file growth.
v.ioMu.Lock()
defer v.ioMu.Unlock()
v.mu.Lock()
defer v.mu.Unlock()
@ -1262,6 +1315,8 @@ func (v *BlockVol) CommitExpand(expandEpoch uint64) error {
return err
}
defer v.endOp()
v.ioMu.Lock()
defer v.ioMu.Unlock()
v.mu.Lock()
defer v.mu.Unlock()
@ -1288,6 +1343,9 @@ func (v *BlockVol) CancelExpand(expandEpoch uint64) error {
}
defer v.endOp()
v.ioMu.Lock()
defer v.ioMu.Unlock()
v.mu.Lock()
defer v.mu.Unlock()

157
weed/storage/blockvol/blockvol_test.go

@ -70,6 +70,10 @@ func TestBlockVol(t *testing.T) {
{name: "close_during_sync_cache", run: testCloseDuringSyncCache},
// Review finding: Close timeout if op stuck.
{name: "close_timeout_if_op_stuck", run: testCloseTimeoutIfOpStuck},
// ER Fix 1: ioMu restore/import guard.
{name: "iomu_concurrent_writes_allowed", run: testIoMuConcurrentWritesAllowed},
{name: "iomu_restore_blocks_writes", run: testIoMuRestoreBlocksWrites},
{name: "iomu_close_with_iomu", run: testIoMuCloseCoordinates},
// Phase 4A CP1: Epoch tests.
{name: "epoch_persist_roundtrip", run: testEpochPersistRoundtrip},
{name: "epoch_in_wal_entry", run: testEpochInWALEntry},
@ -4928,6 +4932,159 @@ func testStatusStaleNoLease(t *testing.T) {
}
}
// --- ER Fix 1: ioMu tests ---
// testIoMuConcurrentWritesAllowed verifies that multiple concurrent WriteLBA
// calls succeed (ioMu.RLock is shared, so normal data-plane writers never
// serialize on each other).
func testIoMuConcurrentWritesAllowed(t *testing.T) {
	v := createTestVol(t)
	defer v.Close()
	v.SetRole(RolePrimary)
	v.SetEpoch(1)
	v.SetMasterEpoch(1)
	v.lease.Grant(10 * time.Second)

	const goroutines = 8
	const writes = 50

	var wg sync.WaitGroup
	var errCount atomic.Int64
	// Capture the first failure so the fatal message is actionable instead
	// of reporting only an opaque error count.
	var firstErr atomic.Pointer[error]
	for g := 0; g < goroutines; g++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for w := 0; w < writes; w++ {
				lba := uint64((id*writes + w) % 256) // within 1MB volume
				data := makeBlock(byte(id))
				if err := v.WriteLBA(lba, data); err != nil {
					errCount.Add(1)
					firstErr.CompareAndSwap(nil, &err)
				}
			}
		}(g)
	}
	wg.Wait()
	if n := errCount.Load(); n > 0 {
		t.Fatalf("concurrent writes had %d errors; first: %v", n, *firstErr.Load())
	}
}
// testIoMuRestoreBlocksWrites runs a real RestoreSnapshot under write contention.
// Concurrent writers run before and during restore. After restore completes,
// verify the volume reflects snapshot state (not corrupted by concurrent writes).
func testIoMuRestoreBlocksWrites(t *testing.T) {
	v := createTestVol(t)
	defer v.Close()
	v.SetRole(RolePrimary)
	v.SetEpoch(1)
	v.SetMasterEpoch(1)
	v.lease.Grant(30 * time.Second)
	// Write known data ('A') to LBA 0 — this is what the snapshot must preserve.
	if err := v.WriteLBA(0, makeBlock('A')); err != nil {
		t.Fatalf("WriteLBA(A): %v", err)
	}
	if err := v.SyncCache(); err != nil {
		t.Fatalf("SyncCache: %v", err)
	}
	// Create snapshot capturing 'A' at LBA 0.
	snapID := uint32(1)
	if err := v.CreateSnapshot(snapID); err != nil {
		t.Fatalf("CreateSnapshot: %v", err)
	}
	// Overwrite LBA 0 with 'B' after snapshot; a successful restore must undo this.
	if err := v.WriteLBA(0, makeBlock('B')); err != nil {
		t.Fatalf("WriteLBA(B): %v", err)
	}
	if err := v.SyncCache(); err != nil {
		t.Fatalf("SyncCache: %v", err)
	}
	// Start concurrent writers on LBAs 1-4 (one goroutine per LBA) while
	// restore runs. They target LBAs other than 0 so the integrity check
	// below is unaffected by how far they got before being drained.
	var writerWg sync.WaitGroup
	stopWriters := make(chan struct{})
	var writeCount atomic.Int64
	for g := 0; g < 4; g++ {
		writerWg.Add(1)
		go func(id int) {
			defer writerWg.Done()
			lba := uint64(1 + id)
			for {
				select {
				case <-stopWriters:
					return
				default:
				}
				if err := v.WriteLBA(lba, makeBlock(byte('W'+id))); err != nil {
					return // volume closed or restore draining — expected
				}
				writeCount.Add(1)
			}
		}(g)
	}
	// Let writers run for a bit, then restore concurrently.
	time.Sleep(5 * time.Millisecond)
	// Restore in the main goroutine — this acquires ioMu.Lock(),
	// draining all in-flight writers before modifying extent/WAL/dirty.
	if err := v.RestoreSnapshot(snapID); err != nil {
		close(stopWriters)
		writerWg.Wait()
		t.Fatalf("RestoreSnapshot: %v", err)
	}
	close(stopWriters)
	writerWg.Wait()
	// Verify: LBA 0 must be 'A' (snapshot data), not 'B' (post-snapshot write).
	got, err := v.ReadLBA(0, 4096)
	if err != nil {
		t.Fatalf("ReadLBA after restore: %v", err)
	}
	if got[0] != 'A' {
		t.Fatalf("LBA 0: expected 'A' after restore, got %c — restore/write race", got[0])
	}
	// Sanity: writers did run (not zero iterations). Timing-dependent, so
	// only log — never fail — if the scheduler happened not to run them.
	if writeCount.Load() == 0 {
		t.Log("warning: concurrent writers did not execute any iterations")
	}
}
// testIoMuCloseCoordinates verifies that Close still works correctly
// with the ioMu in the struct (no deadlock).
func testIoMuCloseCoordinates(t *testing.T) {
	v := createTestVol(t)
	v.SetRole(RolePrimary)
	v.SetEpoch(1)
	v.SetMasterEpoch(1)
	v.lease.Grant(10 * time.Second)

	// Push at least one block through the write path first.
	if err := v.WriteLBA(0, makeBlock('X')); err != nil {
		t.Fatalf("WriteLBA: %v", err)
	}

	// Run Close in its own goroutine so a hang shows up as a test failure
	// after 5s instead of wedging the whole test binary.
	closed := make(chan error, 1)
	go func() { closed <- v.Close() }()

	select {
	case err := <-closed:
		if err != nil {
			t.Fatalf("Close: %v", err)
		}
	case <-time.After(5 * time.Second):
		t.Fatal("Close deadlocked")
	}
}
// Suppress unused import warnings.
// NOTE(review): blank-uses like these usually indicate stale imports; prefer
// running goimports to drop unused ones — TODO confirm fmt/io are otherwise
// unused in this file before removing.
var _ = fmt.Sprintf
var _ io.Reader

4
weed/storage/blockvol/rebuild.go

@ -429,6 +429,10 @@ func applyRebuildEntry(vol *BlockVol, payload []byte) error {
return fmt.Errorf("decode: %w", err)
}
// ioMu.RLock: protect WAL/dirtyMap mutation against exclusive restore/import.
vol.ioMu.RLock()
defer vol.ioMu.RUnlock()
walOff, err := vol.wal.Append(&entry)
if errors.Is(err, ErrWALFull) {
// Trigger flusher and retry.

4
weed/storage/blockvol/replica_apply.go

@ -194,6 +194,10 @@ func (r *ReplicaReceiver) applyEntry(payload []byte) error {
return fmt.Errorf("decode WAL entry: %w", err)
}
// ioMu.RLock: protect WAL/dirtyMap mutation against exclusive restore/import.
r.vol.ioMu.RLock()
defer r.vol.ioMu.RUnlock()
// Validate epoch: replicas must NOT accept epoch bumps from WAL stream.
// Only the master can change epochs (via SetEpoch in CP3).
localEpoch := r.vol.epoch.Load()

4
weed/storage/blockvol/snapshot_export.go

@ -221,6 +221,10 @@ func (v *BlockVol) ImportSnapshot(ctx context.Context, manifest *SnapshotArtifac
}
defer v.endOp()
// Exclusive lock: drains all in-flight I/O before modifying extent/WAL/dirtyMap.
v.ioMu.Lock()
defer v.ioMu.Unlock()
if err := ValidateManifest(manifest); err != nil {
return err
}

Loading…
Cancel
Save