From a107685f00f5c577d97f4267e113ed4c89f7e91d Mon Sep 17 00:00:00 2001 From: Ping Qiu Date: Sun, 1 Mar 2026 16:00:06 -0800 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=204A=20CP1=20=E2=80=94=20epoch,?= =?UTF-8?q?=20lease,=20role=20state=20machine,=20write=20gate?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Local fencing primitives for block volumes. Every write path validates role + epoch + lease before accepting data. RoleNone (default) skips all checks for Phase 3 backward compatibility. New files: epoch.go, lease.go, role.go, write_gate.go Modified: superblock.go (Epoch field), blockvol.go (fencing fields, writeGate in WriteLBA/Trim), group_commit.go (PostSyncCheck/Gotcha A), dirty_map.go (P3-BUG-9 power-of-2 panic) Bug fixes: BUG-4A-1 (atomic epoch), BUG-4A-2 (CAS SetRole), BUG-4A-3 (mutex SetEpoch), BUG-4A-4 (single role.Load), BUG-4A-6 (safeCallback recover) 837 tests (557 engine + 280 iSCSI), all passing. Co-Authored-By: Claude Opus 4.6 --- weed/storage/blockvol/blockvol.go | 40 +- weed/storage/blockvol/blockvol_test.go | 434 ++++++ weed/storage/blockvol/dirty_map.go | 3 + weed/storage/blockvol/epoch.go | 38 + weed/storage/blockvol/group_commit.go | 42 +- weed/storage/blockvol/lease.go | 31 + .../storage/blockvol/qa_phase3_engine_test.go | 27 +- weed/storage/blockvol/qa_phase4a_cp1_test.go | 1198 +++++++++++++++++ weed/storage/blockvol/role.go | 104 ++ weed/storage/blockvol/superblock.go | 7 + weed/storage/blockvol/write_gate.go | 28 + 11 files changed, 1903 insertions(+), 49 deletions(-) create mode 100644 weed/storage/blockvol/epoch.go create mode 100644 weed/storage/blockvol/lease.go create mode 100644 weed/storage/blockvol/qa_phase4a_cp1_test.go create mode 100644 weed/storage/blockvol/role.go create mode 100644 weed/storage/blockvol/write_gate.go diff --git a/weed/storage/blockvol/blockvol.go b/weed/storage/blockvol/blockvol.go index 524fdc3ff..f674035e3 100644 --- a/weed/storage/blockvol/blockvol.go +++ b/weed/storage/blockvol/blockvol.go @@ -41,6 +41,13 @@ type BlockVol struct { closed atomic.Bool opsOutstanding atomic.Int64 // in-flight Read/Write/Trim/SyncCache ops opsDrained chan struct{} + + // Fencing fields (Phase 4A). + epoch atomic.Uint64 // current persisted epoch + masterEpoch atomic.Uint64 // expected epoch from master + lease Lease + role atomic.Uint32 + roleCallback RoleChangeCallback } // CreateBlockVol creates a new block volume file at path. @@ -105,11 +112,12 @@ func CreateBlockVol(path string, opts CreateOptions, cfgs ...BlockVolConfig) (*B v.nextLSN.Store(1) v.healthy.Store(true) v.groupCommit = NewGroupCommitter(GroupCommitterConfig{ - SyncFunc: fd.Sync, - MaxDelay: cfg.GroupCommitMaxDelay, - MaxBatch: cfg.GroupCommitMaxBatch, - LowWatermark: cfg.GroupCommitLowWatermark, - OnDegraded: func() { v.healthy.Store(false) }, + SyncFunc: fd.Sync, + MaxDelay: cfg.GroupCommitMaxDelay, + MaxBatch: cfg.GroupCommitMaxBatch, + LowWatermark: cfg.GroupCommitLowWatermark, + OnDegraded: func() { v.healthy.Store(false) }, + PostSyncCheck: v.writeGate, }) go v.groupCommit.Run() v.flusher = NewFlusher(FlusherConfig{ @@ -174,13 +182,15 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) { opsDrained: make(chan struct{}, 1), } v.nextLSN.Store(nextLSN) + v.epoch.Store(sb.Epoch) v.healthy.Store(true) v.groupCommit = NewGroupCommitter(GroupCommitterConfig{ - SyncFunc: fd.Sync, - MaxDelay: cfg.GroupCommitMaxDelay, - MaxBatch: cfg.GroupCommitMaxBatch, - LowWatermark: cfg.GroupCommitLowWatermark, - OnDegraded: func() { v.healthy.Store(false) }, + SyncFunc: fd.Sync, + MaxDelay: cfg.GroupCommitMaxDelay, + MaxBatch: cfg.GroupCommitMaxBatch, + LowWatermark: cfg.GroupCommitLowWatermark, + OnDegraded: func() { v.healthy.Store(false) }, + PostSyncCheck: v.writeGate, }) go v.groupCommit.Run() v.flusher = NewFlusher(FlusherConfig{ @@ -253,6 +263,9 @@ func (v *BlockVol) WriteLBA(lba uint64, data []byte) error { return err } defer v.endOp() + if err := v.writeGate(); err != nil { + return err + } if err := ValidateWrite(lba, uint32(len(data)), v.super.VolumeSize, v.super.BlockSize); err != nil { return err } @@ -260,7 +273,7 @@ func (v *BlockVol) WriteLBA(lba uint64, data []byte) error { lsn := v.nextLSN.Add(1) - 1 entry := &WALEntry{ LSN: lsn, - Epoch: 0, // Phase 1: no fencing + Epoch: v.epoch.Load(), Type: EntryTypeWrite, LBA: lba, Length: uint32(len(data)), @@ -426,6 +439,9 @@ func (v *BlockVol) Trim(lba uint64, length uint32) error { return err } defer v.endOp() + if err := v.writeGate(); err != nil { + return err + } if err := ValidateWrite(lba, length, v.super.VolumeSize, v.super.BlockSize); err != nil { return err } @@ -433,7 +449,7 @@ func (v *BlockVol) Trim(lba uint64, length uint32) error { lsn := v.nextLSN.Add(1) - 1 entry := &WALEntry{ LSN: lsn, - Epoch: 0, + Epoch: v.epoch.Load(), Type: EntryTypeTrim, LBA: lba, Length: length, diff --git a/weed/storage/blockvol/blockvol_test.go b/weed/storage/blockvol/blockvol_test.go index acbac3c3e..7e887324e 100644 --- a/weed/storage/blockvol/blockvol_test.go +++ b/weed/storage/blockvol/blockvol_test.go @@ -67,6 +67,29 @@ func TestBlockVol(t *testing.T) { {name: "close_during_sync_cache", run: testCloseDuringSyncCache}, // Review finding: Close timeout if op stuck. {name: "close_timeout_if_op_stuck", run: testCloseTimeoutIfOpStuck}, + // Phase 4A CP1: Epoch tests. + {name: "epoch_persist_roundtrip", run: testEpochPersistRoundtrip}, + {name: "epoch_in_wal_entry", run: testEpochInWALEntry}, + {name: "epoch_survives_recovery", run: testEpochSurvivesRecovery}, + // Phase 4A CP1: Lease tests. + {name: "lease_grant_valid", run: testLeaseGrantValid}, + {name: "lease_expired_rejects", run: testLeaseExpiredRejects}, + {name: "lease_revoke", run: testLeaseRevoke}, + // Phase 4A CP1: Role tests. + {name: "role_transitions_valid", run: testRoleTransitionsValid}, + {name: "role_transitions_invalid", run: testRoleTransitionsInvalid}, + {name: "role_primary_callback", run: testRolePrimaryCallback}, + {name: "role_stale_callback", run: testRoleStaleCallback}, + // Phase 4A CP1: Write gate tests. + {name: "gate_primary_ok", run: testGatePrimaryOK}, + {name: "gate_not_primary", run: testGateNotPrimary}, + {name: "gate_stale_epoch", run: testGateStaleEpoch}, + {name: "gate_lease_expired", run: testGateLeaseExpired}, + {name: "gate_trim_rejected", run: testGateTrimRejected}, + {name: "blockvol_write_gate_integration", run: testBlockvolWriteGateIntegration}, + {name: "blockvol_gotcha_a_lease_expired", run: testBlockvolGotchaALeaseExpired}, + // Phase 4A CP1: P3-BUG-9 dirty map. + {name: "dirty_map_power_of_2_panics", run: testDirtyMapPowerOf2Panics}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -1678,3 +1701,414 @@ func testCloseTimeoutIfOpStuck(t *testing.T) { t.Fatal("Close hung for >10s — drain timeout not working") } } + +// --- Phase 4A CP1: Epoch tests --- + +func testEpochPersistRoundtrip(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "epoch.blockvol") + v, err := CreateBlockVol(path, CreateOptions{ + VolumeSize: 1 * 1024 * 1024, + BlockSize: 4096, + }) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + + if v.Epoch() != 0 { + t.Fatalf("initial epoch = %d, want 0", v.Epoch()) + } + + if err := v.SetEpoch(42); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + if v.Epoch() != 42 { + t.Fatalf("epoch after set = %d, want 42", v.Epoch()) + } + v.Close() + + // Reopen and verify epoch persisted. + v2, err := OpenBlockVol(path) + if err != nil { + t.Fatalf("OpenBlockVol: %v", err) + } + defer v2.Close() + if v2.Epoch() != 42 { + t.Fatalf("epoch after reopen = %d, want 42", v2.Epoch()) + } +} + +func testEpochInWALEntry(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "epoch-wal.blockvol") + v, err := CreateBlockVol(path, CreateOptions{ + VolumeSize: 1 * 1024 * 1024, + BlockSize: 4096, + }) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer v.Close() + + if err := v.SetEpoch(7); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + v.SetMasterEpoch(7) + v.SetRoleCallback(nil) + if err := v.SetRole(RolePrimary); err != nil { + t.Fatalf("SetRole: %v", err) + } + v.lease.Grant(10 * time.Second) + + if err := v.WriteLBA(0, makeBlock('A')); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + + // Read WAL entry header directly and check epoch field. + headerBuf := make([]byte, walEntryHeaderSize) + absOff := int64(v.super.WALOffset) // first entry at WAL start + if _, err := v.fd.ReadAt(headerBuf, absOff); err != nil { + t.Fatalf("ReadAt WAL: %v", err) + } + entryEpoch := binary.LittleEndian.Uint64(headerBuf[8:16]) + if entryEpoch != 7 { + t.Fatalf("WAL entry epoch = %d, want 7", entryEpoch) + } +} + +func testEpochSurvivesRecovery(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "epoch-recov.blockvol") + v, err := CreateBlockVol(path, CreateOptions{ + VolumeSize: 1 * 1024 * 1024, + BlockSize: 4096, + }) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + + if err := v.SetEpoch(100); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + v.SetMasterEpoch(100) + if err := v.SetRole(RolePrimary); err != nil { + t.Fatalf("SetRole: %v", err) + } + v.lease.Grant(10 * time.Second) + + if err := v.WriteLBA(0, makeBlock('R')); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + + // Close properly (flushes WAL to extent). + if err := v.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + // Reopen and verify epoch persisted. + v2, err := OpenBlockVol(path) + if err != nil { + t.Fatalf("OpenBlockVol: %v", err) + } + defer v2.Close() + + if v2.Epoch() != 100 { + t.Fatalf("epoch after recovery = %d, want 100", v2.Epoch()) + } + + data, err := v2.ReadLBA(0, 4096) + if err != nil { + t.Fatalf("ReadLBA: %v", err) + } + if !bytes.Equal(data, makeBlock('R')) { + t.Fatal("data mismatch after recovery") + } +} + +// --- Phase 4A CP1: Lease tests --- + +func testLeaseGrantValid(t *testing.T) { + var l Lease + if l.IsValid() { + t.Fatal("zero-value lease should be invalid") + } + l.Grant(1 * time.Second) + if !l.IsValid() { + t.Fatal("lease should be valid after grant") + } +} + +func testLeaseExpiredRejects(t *testing.T) { + var l Lease + l.Grant(1 * time.Millisecond) + time.Sleep(5 * time.Millisecond) + if l.IsValid() { + t.Fatal("lease should have expired") + } +} + +func testLeaseRevoke(t *testing.T) { + var l Lease + l.Grant(1 * time.Hour) + if !l.IsValid() { + t.Fatal("lease should be valid") + } + l.Revoke() + if l.IsValid() { + t.Fatal("lease should be invalid after revoke") + } +} + +// --- Phase 4A CP1: Role tests --- + +func testRoleTransitionsValid(t *testing.T) { + valid := [][2]Role{ + {RoleNone, RolePrimary}, + {RoleNone, RoleReplica}, + {RolePrimary, RoleDraining}, + {RoleDraining, RoleStale}, + {RoleReplica, RolePrimary}, + {RoleStale, RoleRebuilding}, + {RoleStale, RoleReplica}, + {RoleRebuilding, RoleReplica}, + } + for _, pair := range valid { + if !ValidTransition(pair[0], pair[1]) { + t.Errorf("expected valid: %s -> %s", pair[0], pair[1]) + } + } +} + +func testRoleTransitionsInvalid(t *testing.T) { + invalid := [][2]Role{ + {RolePrimary, RoleReplica}, + {RolePrimary, RoleStale}, + {RoleReplica, RoleStale}, + {RoleReplica, RoleDraining}, + {RoleDraining, RolePrimary}, + {RoleRebuilding, RolePrimary}, + {RoleNone, RoleStale}, + {RoleNone, RoleDraining}, + } + for _, pair := range invalid { + if ValidTransition(pair[0], pair[1]) { + t.Errorf("expected invalid: %s -> %s", pair[0], pair[1]) + } + } +} + +func testRolePrimaryCallback(t *testing.T) { + v := createTestVol(t) + defer v.Close() + + var called bool + var gotOld, gotNew Role + v.SetRoleCallback(func(old, new Role) { + called = true + gotOld = old + gotNew = new + }) + + if err := v.SetRole(RolePrimary); err != nil { + t.Fatalf("SetRole(Primary): %v", err) + } + if !called { + t.Fatal("callback not called") + } + if gotOld != RoleNone || gotNew != RolePrimary { + t.Fatalf("callback args: old=%s new=%s, want none->primary", gotOld, gotNew) + } +} + +func testRoleStaleCallback(t *testing.T) { + v := createTestVol(t) + defer v.Close() + + var transitions []string + v.SetRoleCallback(func(old, new Role) { + transitions = append(transitions, old.String()+"->"+new.String()) + }) + + // None -> Primary -> Draining -> Stale + v.SetRole(RolePrimary) + v.SetRole(RoleDraining) + v.SetRole(RoleStale) + + expected := []string{"none->primary", "primary->draining", "draining->stale"} + if len(transitions) != len(expected) { + t.Fatalf("transitions = %v, want %v", transitions, expected) + } + for i, e := range expected { + if transitions[i] != e { + t.Errorf("transition[%d] = %s, want %s", i, transitions[i], e) + } + } +} + +// --- Phase 4A CP1: Write gate tests --- + +func testGatePrimaryOK(t *testing.T) { + v := createTestVol(t) + defer v.Close() + + // Set up as primary with valid epoch + lease. + v.SetRole(RolePrimary) + v.SetEpoch(1) + v.SetMasterEpoch(1) + v.lease.Grant(10 * time.Second) + + if err := v.WriteLBA(0, makeBlock('A')); err != nil { + t.Fatalf("WriteLBA as primary: %v", err) + } +} + +func testGateNotPrimary(t *testing.T) { + v := createTestVol(t) + defer v.Close() + + v.SetRole(RoleReplica) + + err := v.WriteLBA(0, makeBlock('A')) + if !errors.Is(err, ErrNotPrimary) { + t.Fatalf("expected ErrNotPrimary, got: %v", err) + } +} + +func testGateStaleEpoch(t *testing.T) { + v := createTestVol(t) + defer v.Close() + + v.SetRole(RolePrimary) + v.SetEpoch(1) + v.SetMasterEpoch(2) // mismatch + v.lease.Grant(10 * time.Second) + + err := v.WriteLBA(0, makeBlock('A')) + if !errors.Is(err, ErrEpochStale) { + t.Fatalf("expected ErrEpochStale, got: %v", err) + } +} + +func testGateLeaseExpired(t *testing.T) { + v := createTestVol(t) + defer v.Close() + + v.SetRole(RolePrimary) + v.SetEpoch(1) + v.SetMasterEpoch(1) + // No lease granted — should be expired. + + err := v.WriteLBA(0, makeBlock('A')) + if !errors.Is(err, ErrLeaseExpired) { + t.Fatalf("expected ErrLeaseExpired, got: %v", err) + } +} + +func testGateTrimRejected(t *testing.T) { + v := createTestVol(t) + defer v.Close() + + v.SetRole(RoleReplica) + + err := v.Trim(0, 4096) + if !errors.Is(err, ErrNotPrimary) { + t.Fatalf("expected ErrNotPrimary for Trim, got: %v", err) + } +} + +func testBlockvolWriteGateIntegration(t *testing.T) { + v := createTestVol(t) + defer v.Close() + + // RoleNone: writes pass without fencing (Phase 3 compat). + if err := v.WriteLBA(0, makeBlock('Z')); err != nil { + t.Fatalf("WriteLBA with RoleNone: %v", err) + } + + // Switch to primary with proper setup. + v.SetRole(RolePrimary) + v.SetEpoch(5) + v.SetMasterEpoch(5) + v.lease.Grant(10 * time.Second) + + if err := v.WriteLBA(1, makeBlock('P')); err != nil { + t.Fatalf("WriteLBA as primary: %v", err) + } + + // Reads always work regardless of role. + data, err := v.ReadLBA(1, 4096) + if err != nil { + t.Fatalf("ReadLBA: %v", err) + } + if !bytes.Equal(data, makeBlock('P')) { + t.Fatal("data mismatch") + } + + // Demote to draining — writes should fail. + v.SetRole(RoleDraining) + err = v.WriteLBA(2, makeBlock('D')) + if !errors.Is(err, ErrNotPrimary) { + t.Fatalf("WriteLBA as draining: expected ErrNotPrimary, got: %v", err) + } + + // Read still works. + _, err = v.ReadLBA(1, 4096) + if err != nil { + t.Fatalf("ReadLBA after demotion: %v", err) + } +} + +func testBlockvolGotchaALeaseExpired(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "gotcha-a.blockvol") + cfg := DefaultConfig() + cfg.GroupCommitMaxDelay = 1 * time.Millisecond + + v, err := CreateBlockVol(path, CreateOptions{ + VolumeSize: 1 * 1024 * 1024, + BlockSize: 4096, + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer v.Close() + + // Set up as primary. + v.SetRole(RolePrimary) + v.SetEpoch(1) + v.SetMasterEpoch(1) + v.lease.Grant(50 * time.Millisecond) // short lease + + // Write should succeed. + if err := v.WriteLBA(0, makeBlock('G')); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + + // Wait for lease to expire. + time.Sleep(100 * time.Millisecond) + + // SyncCache should fail via PostSyncCheck (Gotcha A). + err = v.SyncCache() + if err == nil { + t.Fatal("SyncCache should fail after lease expired") + } + if !errors.Is(err, ErrLeaseExpired) { + t.Fatalf("expected ErrLeaseExpired from SyncCache, got: %v", err) + } +} + +// --- Phase 4A CP1: P3-BUG-9 --- + +func testDirtyMapPowerOf2Panics(t *testing.T) { + defer func() { + r := recover() + if r == nil { + t.Fatal("expected panic for non-power-of-2 numShards") + } + msg, ok := r.(string) + if !ok || msg != "blockvol: NewDirtyMap numShards must be power of 2" { + t.Fatalf("unexpected panic: %v", r) + } + }() + NewDirtyMap(3) // should panic +} diff --git a/weed/storage/blockvol/dirty_map.go b/weed/storage/blockvol/dirty_map.go index e3d164c09..15a70fed4 100644 --- a/weed/storage/blockvol/dirty_map.go +++ b/weed/storage/blockvol/dirty_map.go @@ -28,6 +28,9 @@ func NewDirtyMap(numShards int) *DirtyMap { if numShards <= 0 { numShards = 1 } + if numShards&(numShards-1) != 0 { + panic("blockvol: NewDirtyMap numShards must be power of 2") + } shards := make([]dirtyShard, numShards) for i := range shards { shards[i].m = make(map[uint64]dirtyEntry) diff --git a/weed/storage/blockvol/epoch.go b/weed/storage/blockvol/epoch.go new file mode 100644 index 000000000..9f2d8846c --- /dev/null +++ b/weed/storage/blockvol/epoch.go @@ -0,0 +1,38 @@ +package blockvol + +import ( + "fmt" + "os" +) + +// Epoch returns the current epoch of this volume. +func (v *BlockVol) Epoch() uint64 { + return v.epoch.Load() +} + +// SetEpoch persists a new epoch to the superblock and fsyncs. +// Must be durable before writes are accepted at the new epoch. +func (v *BlockVol) SetEpoch(epoch uint64) error { + v.mu.Lock() + defer v.mu.Unlock() + + v.super.Epoch = epoch + v.epoch.Store(epoch) + + if _, err := v.fd.Seek(0, os.SEEK_SET); err != nil { + return fmt.Errorf("blockvol: seek superblock: %w", err) + } + if _, err := v.super.WriteTo(v.fd); err != nil { + return fmt.Errorf("blockvol: write superblock: %w", err) + } + if err := v.fd.Sync(); err != nil { + return fmt.Errorf("blockvol: sync superblock: %w", err) + } + return nil +} + +// SetMasterEpoch sets the expected epoch from the master. +// Writes are rejected if v.epoch != v.masterEpoch (when role != RoleNone). +func (v *BlockVol) SetMasterEpoch(epoch uint64) { + v.masterEpoch.Store(epoch) +} diff --git a/weed/storage/blockvol/group_commit.go b/weed/storage/blockvol/group_commit.go index ba2288d41..e16375cd1 100644 --- a/weed/storage/blockvol/group_commit.go +++ b/weed/storage/blockvol/group_commit.go @@ -16,11 +16,12 @@ var ( // GroupCommitter batches SyncCache requests and performs a single fsync // for the entire batch. This amortizes the cost of fsync across many callers. type GroupCommitter struct { - syncFunc func() error // called to fsync (injectable for testing) - maxDelay time.Duration // max wait before flushing a partial batch - maxBatch int // flush immediately when this many waiters accumulate - lowWatermark int // skip delay if fewer pending (0 = always wait) - onDegraded func() // called when fsync fails + syncFunc func() error // called to fsync (injectable for testing) + maxDelay time.Duration // max wait before flushing a partial batch + maxBatch int // flush immediately when this many waiters accumulate + lowWatermark int // skip delay if fewer pending (0 = always wait) + onDegraded func() // called when fsync fails + postSyncCheck func() error // called after sync; error fails waiters mu sync.Mutex pending []chan error @@ -35,11 +36,12 @@ type GroupCommitter struct { // GroupCommitterConfig configures the group committer. type GroupCommitterConfig struct { - SyncFunc func() error // required: the fsync function - MaxDelay time.Duration // default 1ms - MaxBatch int // default 64 - LowWatermark int // skip delay if fewer pending (0 = always wait) - OnDegraded func() // optional: called on fsync error + SyncFunc func() error // required: the fsync function + MaxDelay time.Duration // default 1ms + MaxBatch int // default 64 + LowWatermark int // skip delay if fewer pending (0 = always wait) + OnDegraded func() // optional: called on fsync error + PostSyncCheck func() error // optional: called after successful sync; error fails all waiters } // NewGroupCommitter creates a new group committer. Call Run() to start it. @@ -54,14 +56,15 @@ func NewGroupCommitter(cfg GroupCommitterConfig) *GroupCommitter { cfg.OnDegraded = func() {} } return &GroupCommitter{ - syncFunc: cfg.SyncFunc, - maxDelay: cfg.MaxDelay, - maxBatch: cfg.MaxBatch, - lowWatermark: cfg.LowWatermark, - onDegraded: cfg.OnDegraded, - notifyCh: make(chan struct{}, 1), - stopCh: make(chan struct{}), - done: make(chan struct{}), + syncFunc: cfg.SyncFunc, + maxDelay: cfg.MaxDelay, + maxBatch: cfg.MaxBatch, + lowWatermark: cfg.LowWatermark, + onDegraded: cfg.OnDegraded, + postSyncCheck: cfg.PostSyncCheck, + notifyCh: make(chan struct{}, 1), + stopCh: make(chan struct{}), + done: make(chan struct{}), } } @@ -129,6 +132,9 @@ func (gc *GroupCommitter) Run() { // not leave waiters hung — notify them and drain stragglers. err := gc.callSyncFunc() gc.syncCount.Add(1) + if err == nil && gc.postSyncCheck != nil { + err = gc.postSyncCheck() + } if err != nil { gc.onDegraded() } diff --git a/weed/storage/blockvol/lease.go b/weed/storage/blockvol/lease.go new file mode 100644 index 000000000..c8ac009fa --- /dev/null +++ b/weed/storage/blockvol/lease.go @@ -0,0 +1,31 @@ +package blockvol + +import ( + "sync/atomic" + "time" +) + +// Lease tracks a runtime-only lease expiry for write fencing. +// Zero-value is an expired (invalid) lease. Not persisted. +type Lease struct { + expiry atomic.Value // stores time.Time +} + +// Grant sets the lease to expire after ttl from now. +func (l *Lease) Grant(ttl time.Duration) { + l.expiry.Store(time.Now().Add(ttl)) +} + +// IsValid returns true if the lease has not expired. +func (l *Lease) IsValid() bool { + v := l.expiry.Load() + if v == nil { + return false + } + return time.Now().Before(v.(time.Time)) +} + +// Revoke immediately invalidates the lease. +func (l *Lease) Revoke() { + l.expiry.Store(time.Time{}) // zero time is always in the past +} diff --git a/weed/storage/blockvol/qa_phase3_engine_test.go b/weed/storage/blockvol/qa_phase3_engine_test.go index e971fe084..3caa4f1e0 100644 --- a/weed/storage/blockvol/qa_phase3_engine_test.go +++ b/weed/storage/blockvol/qa_phase3_engine_test.go @@ -250,26 +250,15 @@ func testQADMMaxUint64LBA(t *testing.T) { } func testQADMNonPowerOf2Shards(t *testing.T) { - // Non-power-of-2: mask will be wrong (7-1=6=0b110), but should not panic. - // This tests robustness, not correctness of shard distribution. - dm := NewDirtyMap(7) - - // Should not panic on basic operations. - for i := uint64(0); i < 100; i++ { - dm.Put(i, i*10, i+1, 4096) - } - - // All entries should be retrievable (mask-based routing is deterministic). - for i := uint64(0); i < 100; i++ { - _, _, _, ok := dm.Get(i) - if !ok { - t.Errorf("Get(%d) not found with 7 shards", i) + // P3-BUG-9 fix: non-power-of-2 shards now panic to prevent silent + // shard routing bugs (mask-based routing requires power-of-2). + defer func() { + r := recover() + if r == nil { + t.Fatal("expected panic for non-power-of-2 numShards") } - } - - if dm.Len() != 100 { - t.Errorf("Len() = %d, want 100", dm.Len()) - } + }() + NewDirtyMap(7) // should panic } func testQADMZeroShards(t *testing.T) { diff --git a/weed/storage/blockvol/qa_phase4a_cp1_test.go b/weed/storage/blockvol/qa_phase4a_cp1_test.go new file mode 100644 index 000000000..c8d1cc873 --- /dev/null +++ b/weed/storage/blockvol/qa_phase4a_cp1_test.go @@ -0,0 +1,1198 @@ +package blockvol + +import ( + "errors" + "math" + "os" + "path/filepath" + "sync" + "sync/atomic" + "testing" + "time" +) + +// TestQAPhase4ACP1 tests Phase 4A CP1 fencing primitives adversarially: +// epoch, lease, role state machine, write gate, and full integration. +func TestQAPhase4ACP1(t *testing.T) { + tests := []struct { + name string + run func(t *testing.T) + }{ + // QA-4A-1: Epoch Adversarial + {name: "epoch_concurrent_set_epoch", run: testQAEpochConcurrentSetEpoch}, + {name: "epoch_set_during_write", run: testQAEpochSetDuringWrite}, + {name: "epoch_max_uint64", run: testQAEpochMaxUint64}, + {name: "epoch_reopen_after_concurrent_sets", run: testQAEpochReopenAfterConcurrentSets}, + + // QA-4A-2: Lease Adversarial + {name: "lease_grant_zero_ttl", run: testQALeaseGrantZeroTTL}, + {name: "lease_grant_negative_ttl", run: testQALeaseGrantNegativeTTL}, + {name: "lease_concurrent_grant_revoke", run: testQALeaseConcurrentGrantRevoke}, + {name: "lease_expiry_boundary_1ms", run: testQALeaseExpiryBoundary1ms}, + {name: "lease_revoke_during_write", run: testQALeaseRevokeDuringWrite}, + + // QA-4A-3: Role State Machine Adversarial + {name: "role_concurrent_transitions", run: testQARoleConcurrentTransitions}, + {name: "role_rapid_cycle_100x", run: testQARoleRapidCycle100x}, + {name: "role_callback_panic_recovery", run: testQARoleCallbackPanicRecovery}, + {name: "role_callback_slow_blocks_transition", run: testQARoleCallbackSlowBlocksTransition}, + {name: "role_unknown_value", run: testQARoleUnknownValue}, + + // QA-4A-4: Write Gate Adversarial + {name: "gate_role_change_during_write", run: testQAGateRoleChangeDuringWrite}, + {name: "gate_epoch_bump_during_write", run: testQAGateEpochBumpDuringWrite}, + {name: "gate_lease_expire_during_write", run: testQAGateLeaseExpireDuringWrite}, + {name: "gate_concurrent_100_writers_role_flip", run: testQAGateConcurrent100WritersRoleFlip}, + {name: "gate_gotcha_a_lease_expires_after_wal_before_sync", run: testQAGateGotchaA}, + + // QA-4A-5: Integration Adversarial + {name: "fencing_full_cycle_primary_to_stale", run: testQAFencingFullCycle}, + {name: "fencing_writes_rejected_after_demotion", run: testQAFencingWritesRejectedAfterDemotion}, + {name: "fencing_read_always_works", run: testQAFencingReadAlwaysWorks}, + {name: "fencing_close_during_role_transition", run: testQAFencingCloseDuringRoleTransition}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.run(t) + }) + } +} + +// --- helpers --- + +// createFencedVol creates a blockvol configured as Primary with matching epochs and a valid lease. +func createFencedVol(t *testing.T, opts ...CreateOptions) *BlockVol { + t.Helper() + dir := t.TempDir() + path := filepath.Join(dir, "fenced.blockvol") + + o := CreateOptions{ + VolumeSize: 1 * 1024 * 1024, // 1MB + BlockSize: 4096, + WALSize: 256 * 1024, // 256KB + } + if len(opts) > 0 { + o = opts[0] + } + + cfg := DefaultConfig() + cfg.FlushInterval = 5 * time.Millisecond + + v, err := CreateBlockVol(path, o, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + + // Set up fencing: epoch=1, masterEpoch=1, role=Primary, lease=10s. + if err := v.SetEpoch(1); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + v.SetMasterEpoch(1) + if err := v.SetRole(RolePrimary); err != nil { + t.Fatalf("SetRole(Primary): %v", err) + } + v.lease.Grant(10 * time.Second) + + return v +} + +// --- QA-4A-1: Epoch Adversarial --- + +func testQAEpochConcurrentSetEpoch(t *testing.T) { + // BUG-4A-1 + BUG-4A-3: concurrent SetEpoch + SetMasterEpoch + WriteLBA. + // Under -race, expects data race on v.epoch / v.masterEpoch (plain uint64). + v := createFencedVol(t) + defer v.Close() + + const goroutines = 8 + const iterations = 50 + + var wg sync.WaitGroup + + // Epoch/masterEpoch writers. + wg.Add(goroutines) + for g := 0; g < goroutines; g++ { + go func(id int) { + defer wg.Done() + for i := 0; i < iterations; i++ { + epoch := uint64(id*iterations + i + 1) + _ = v.SetEpoch(epoch) + v.SetMasterEpoch(epoch) + } + }(g) + } + + // Concurrent writer. + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < iterations*2; i++ { + _ = v.WriteLBA(uint64(i%256), makeBlock(byte('A'+i%26))) + } + }() + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatal("concurrent SetEpoch + WriteLBA hung for 10s") + } + + // Verify: superblock is readable and epoch is a valid value (not corrupted). + finalEpoch := v.Epoch() + if finalEpoch == 0 || finalEpoch > uint64(goroutines*iterations+1) { + t.Errorf("epoch = %d, want [1, %d]", finalEpoch, goroutines*iterations+1) + } + + f, err := os.Open(v.Path()) + if err != nil { + t.Fatalf("open for verify: %v", err) + } + defer f.Close() + sb, err := ReadSuperblock(f) + if err != nil { + t.Fatalf("ReadSuperblock after concurrent sets: %v (superblock corrupted)", err) + } + if err := sb.Validate(); err != nil { + t.Fatalf("superblock validation failed: %v", err) + } + t.Logf("final superblock epoch: %d", sb.Epoch) +} + +func testQAEpochSetDuringWrite(t *testing.T) { + // BUG-4A-1: WriteLBA reads v.epoch (plain uint64) while SetEpoch writes it. + v := createFencedVol(t) + defer v.Close() + + const writeIter = 200 + const epochBumps = 100 + + var wg sync.WaitGroup + + // Writer loop. + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < writeIter; i++ { + _ = v.WriteLBA(uint64(i%256), makeBlock(byte('W'))) + } + }() + + // Epoch bumper. + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < epochBumps; i++ { + epoch := uint64(i + 2) + _ = v.SetEpoch(epoch) + v.SetMasterEpoch(epoch) + time.Sleep(100 * time.Microsecond) + } + }() + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatal("epoch_set_during_write hung for 10s") + } + + // Close and reopen — verify no corruption and epoch in expected range. + path := v.Path() + v.Close() + + v2, err := OpenBlockVol(path) + if err != nil { + t.Fatalf("reopen: %v (superblock may be corrupted)", err) + } + defer v2.Close() + e := v2.Epoch() + if e < 2 || e > uint64(epochBumps+1) { + t.Errorf("reopened epoch = %d, want [2, %d]", e, epochBumps+1) + } + t.Logf("reopened epoch: %d", e) +} + +func testQAEpochMaxUint64(t *testing.T) { + v := createFencedVol(t) + defer v.Close() + + // Set both epochs to MaxUint64. + if err := v.SetEpoch(math.MaxUint64); err != nil { + t.Fatalf("SetEpoch(MaxUint64): %v", err) + } + v.SetMasterEpoch(math.MaxUint64) + + if v.Epoch() != math.MaxUint64 { + t.Errorf("Epoch() = %d, want MaxUint64", v.Epoch()) + } + + // WriteLBA should succeed (epochs match, role=Primary, lease valid). + if err := v.WriteLBA(0, makeBlock('M')); err != nil { + t.Fatalf("WriteLBA at MaxUint64 epoch: %v", err) + } + + // Verify read. + data, err := v.ReadLBA(0, 4096) + if err != nil { + t.Fatalf("ReadLBA: %v", err) + } + if data[0] != 'M' { + t.Errorf("data[0] = %c, want M", data[0]) + } + + // Close, reopen, verify epoch persisted. + path := v.Path() + v.Close() + + v2, err := OpenBlockVol(path) + if err != nil { + t.Fatalf("reopen: %v", err) + } + defer v2.Close() + if v2.Epoch() != math.MaxUint64 { + t.Errorf("reopened Epoch() = %d, want MaxUint64", v2.Epoch()) + } +} + +func testQAEpochReopenAfterConcurrentSets(t *testing.T) { + // BUG-4A-3: concurrent SetEpoch → close → reopen → verify no corruption. + dir := t.TempDir() + path := filepath.Join(dir, "epoch_reopen.blockvol") + + cfg := DefaultConfig() + cfg.FlushInterval = 5 * time.Millisecond + + v, err := CreateBlockVol(path, CreateOptions{ + VolumeSize: 1 * 1024 * 1024, + BlockSize: 4096, + WALSize: 256 * 1024, + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + + const goroutines = 4 + const iterations = 50 + validEpochs := make(map[uint64]bool) + for g := 0; g < goroutines; g++ { + for i := 0; i < iterations; i++ { + validEpochs[uint64(g*iterations+i+1)] = true + } + } + + var wg sync.WaitGroup + wg.Add(goroutines) + for g := 0; g < goroutines; g++ { + go func(id int) { + defer wg.Done() + for i := 0; i < iterations; i++ { + _ = v.SetEpoch(uint64(id*iterations + i + 1)) + } + }(g) + } + wg.Wait() + + v.Close() + + // Reopen and verify epoch is one of the valid values. + v2, err := OpenBlockVol(path) + if err != nil { + t.Fatalf("reopen after concurrent sets: %v", err) + } + defer v2.Close() + + e := v2.Epoch() + if !validEpochs[e] && e != 0 { + t.Errorf("reopened epoch %d is not a valid epoch from any goroutine", e) + } + + // Also verify superblock raw decode works. + f, err := os.Open(path) + if err != nil { + t.Fatalf("open raw: %v", err) + } + defer f.Close() + sb, err := ReadSuperblock(f) + if err != nil { + t.Fatalf("raw ReadSuperblock: %v", err) + } + if err := sb.Validate(); err != nil { + t.Fatalf("superblock validation: %v", err) + } + t.Logf("reopened epoch: %d, superblock valid", e) +} + +// --- QA-4A-2: Lease Adversarial --- + +func testQALeaseGrantZeroTTL(t *testing.T) { + // BUG-4A-5: Grant(0) stores time.Now() — immediately expired. + var l Lease + l.Grant(0) + if l.IsValid() { + t.Error("Grant(0): IsValid() = true, want false (immediately expired)") + } + + // Grant(1ns) — may or may not be valid, but must not panic. + l.Grant(time.Nanosecond) + _ = l.IsValid() // just verify no panic +} + +func testQALeaseGrantNegativeTTL(t *testing.T) { + // BUG-4A-5: Grant(-1s) stores past time — always expired. + var l Lease + l.Grant(-1 * time.Second) + if l.IsValid() { + t.Error("Grant(-1s): IsValid() = true, want false") + } + + // Verify WriteLBA returns ErrLeaseExpired with a negative-TTL lease. + v := createFencedVol(t) + defer v.Close() + v.lease.Grant(-1 * time.Second) + + err := v.WriteLBA(0, makeBlock('N')) + if !errors.Is(err, ErrLeaseExpired) { + t.Errorf("WriteLBA with negative lease: got %v, want ErrLeaseExpired", err) + } +} + +func testQALeaseConcurrentGrantRevoke(t *testing.T) { + // Concurrent Grant + Revoke + IsValid: no panic, no data race. + var l Lease + + const granters = 4 + const revokers = 4 + const duration = 500 * time.Millisecond + + var wg sync.WaitGroup + stop := make(chan struct{}) + + // Granters. + wg.Add(granters) + for i := 0; i < granters; i++ { + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + l.Grant(1 * time.Second) + } + } + }() + } + + // Revokers. + wg.Add(revokers) + for i := 0; i < revokers; i++ { + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + l.Revoke() + } + } + }() + } + + // IsValid checker. + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + _ = l.IsValid() + } + } + }() + + time.Sleep(duration) + close(stop) + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("concurrent grant/revoke hung for 5s") + } +} + +func testQALeaseExpiryBoundary1ms(t *testing.T) { + var l Lease + l.Grant(1 * time.Millisecond) + + // Busy-poll until expired. + deadline := time.After(50 * time.Millisecond) + for l.IsValid() { + select { + case <-deadline: + t.Fatal("lease still valid after 50ms (granted 1ms)") + default: + // spin + } + } + + // Now verify WriteLBA fails. + v := createFencedVol(t) + defer v.Close() + v.lease.Grant(1 * time.Millisecond) + time.Sleep(5 * time.Millisecond) // ensure expired + + err := v.WriteLBA(0, makeBlock('E')) + if !errors.Is(err, ErrLeaseExpired) { + t.Errorf("WriteLBA after 1ms lease expiry: got %v, want ErrLeaseExpired", err) + } +} + +func testQALeaseRevokeDuringWrite(t *testing.T) { + v := createFencedVol(t) + defer v.Close() + + // Write some data first. + for i := 0; i < 50; i++ { + if err := v.WriteLBA(uint64(i), makeBlock(byte('A'+i%26))); err != nil { + t.Fatalf("pre-revoke WriteLBA(%d): %v", i, err) + } + } + + // Revoke lease. + v.lease.Revoke() + + // Subsequent writes must fail. + err := v.WriteLBA(50, makeBlock('Z')) + if !errors.Is(err, ErrLeaseExpired) { + t.Errorf("post-revoke WriteLBA: got %v, want ErrLeaseExpired", err) + } + + // Reads should still work (ReadLBA doesn't check writeGate). + for i := 0; i < 50; i++ { + data, err := v.ReadLBA(uint64(i), 4096) + if err != nil { + t.Fatalf("post-revoke ReadLBA(%d): %v", i, err) + } + if data[0] != byte('A'+i%26) { + t.Errorf("LBA %d: data[0] = %c, want %c", i, data[0], byte('A'+i%26)) + } + } +} + +// --- QA-4A-3: Role State Machine Adversarial --- + +func testQARoleConcurrentTransitions(t *testing.T) { + // BUG-4A-2: TOCTOU in SetRole. Two goroutines both see old=Stale, + // both pass validation, both Store. One should fail (ideally). + v := createFencedVol(t) + defer v.Close() + + // Get to Stale: Primary → Draining → Stale. + if err := v.SetRole(RoleDraining); err != nil { + t.Fatalf("SetRole(Draining): %v", err) + } + if err := v.SetRole(RoleStale); err != nil { + t.Fatalf("SetRole(Stale): %v", err) + } + + // Now from Stale, both Rebuilding and Replica are valid transitions. + var wg sync.WaitGroup + var errA, errB error + wg.Add(2) + go func() { + defer wg.Done() + errA = v.SetRole(RoleRebuilding) + }() + go func() { + defer wg.Done() + errB = v.SetRole(RoleReplica) + }() + wg.Wait() + + // With CAS fix: exactly one must succeed, one must get ErrInvalidRoleTransition. + // If both succeed, CAS was reverted to Load+Store (BUG-4A-2 regression). + finalRole := v.Role() + t.Logf("concurrent transitions from Stale: errA=%v errB=%v finalRole=%s", + errA, errB, finalRole) + + if errA == nil && errB == nil { + t.Error("BUG-4A-2 REGRESSION: both transitions succeeded — CAS not working") + } + if errA != nil && errB != nil { + t.Error("both transitions failed — unexpected, at least one should succeed") + } + if finalRole != RoleRebuilding && finalRole != RoleReplica { + t.Errorf("final role = %s, want Rebuilding or Replica", finalRole) + } +} + +func testQARoleRapidCycle100x(t *testing.T) { + v := createFencedVol(t) + defer v.Close() + + var callbackCount atomic.Int64 + v.SetRoleCallback(func(old, new Role) { + callbackCount.Add(1) + }) + + // Full cycle: Primary→Draining→Stale→Rebuilding→Replica→Primary + cycle := []Role{RoleDraining, RoleStale, RoleRebuilding, RoleReplica, RolePrimary} + + const rounds = 100 + expectedCallbacks := int64(0) + for r := 0; r < rounds; r++ { + for _, target := range cycle { + if err := v.SetRole(target); err != nil { + t.Fatalf("round %d: SetRole(%s): %v", r, target, err) + } + expectedCallbacks++ + } + } + + if v.Role() != RolePrimary { + t.Errorf("final role = %s, want Primary", v.Role()) + } + + got := callbackCount.Load() + if got != expectedCallbacks { + t.Errorf("callback count = %d, want %d", got, expectedCallbacks) + } + t.Logf("rapid cycle: %d rounds, %d callbacks", rounds, got) +} + +func testQARoleCallbackPanicRecovery(t *testing.T) { + // BUG-4A-6 FIXED: callback panic is now recovered by safeCallback. + v := createFencedVol(t) + defer v.Close() + + v.SetRoleCallback(func(old, new Role) { + panic("callback panic!") + }) + + // SetRole(Draining) — callback panics but safeCallback recovers it. + // SetRole should return nil (CAS succeeded), not propagate the panic. + err := v.SetRole(RoleDraining) + if err != nil { + t.Fatalf("SetRole(Draining) returned error: %v (expected nil after panic recovery)", err) + } + + // Role should be Draining (CAS happened before callback). + if v.Role() != RoleDraining { + t.Errorf("role after recovered panic = %s, want Draining", v.Role()) + } +} + +func testQARoleCallbackSlowBlocksTransition(t *testing.T) { + v := createFencedVol(t) + defer v.Close() + + callbackDone := make(chan struct{}) + v.SetRoleCallback(func(old, new Role) { + time.Sleep(100 * time.Millisecond) + close(callbackDone) + }) + + // SetRole in goroutine (will be slow due to callback). + transitionDone := make(chan error, 1) + go func() { + transitionDone <- v.SetRole(RoleDraining) + }() + + // WriteLBA checks role atomically. Since CAS stores the new role before + // callback runs, writes must see Draining after a short wait. + time.Sleep(10 * time.Millisecond) // let CAS+Store happen + + err := v.WriteLBA(0, makeBlock('S')) + if !errors.Is(err, ErrNotPrimary) { + t.Errorf("write during slow callback: got %v, want ErrNotPrimary (role should already be Draining)", err) + } + + // Wait for callback to complete. + select { + case <-callbackDone: + case <-time.After(5 * time.Second): + t.Fatal("slow callback hung for 5s") + } + + select { + case err := <-transitionDone: + if err != nil { + t.Errorf("SetRole(Draining): %v", err) + } + case <-time.After(5 * time.Second): + t.Fatal("SetRole hung for 5s") + } +} + +func testQARoleUnknownValue(t *testing.T) { + v := createFencedVol(t) + defer v.Close() + + // Force an unknown role value (255) via atomic store. + v.role.Store(255) + + // writeGate should reject (255 != RolePrimary). + err := v.writeGate() + if !errors.Is(err, ErrNotPrimary) { + t.Errorf("writeGate with role=255: got %v, want ErrNotPrimary", err) + } + + // SetRole(Primary) from unknown role — not in transition table. + err = v.SetRole(RolePrimary) + if !errors.Is(err, ErrInvalidRoleTransition) { + t.Errorf("SetRole(Primary) from role=255: got %v, want ErrInvalidRoleTransition", err) + } + + // WriteLBA should also fail. + err = v.WriteLBA(0, makeBlock('U')) + if !errors.Is(err, ErrNotPrimary) { + t.Errorf("WriteLBA with role=255: got %v, want ErrNotPrimary", err) + } +} + +// --- QA-4A-4: Write Gate Adversarial --- + +func testQAGateRoleChangeDuringWrite(t *testing.T) { + v := createFencedVol(t) + defer v.Close() + + const writers = 16 + const writesPerGoroutine = 50 + + var wg sync.WaitGroup + var succeeded, rejected atomic.Int64 + + // Many concurrent writers. + wg.Add(writers) + for g := 0; g < writers; g++ { + go func(id int) { + defer wg.Done() + for i := 0; i < writesPerGoroutine; i++ { + lba := uint64((id*writesPerGoroutine + i) % 256) + err := v.WriteLBA(lba, makeBlock(byte('A'+id%26))) + if err == nil { + succeeded.Add(1) + } else if errors.Is(err, ErrNotPrimary) { + rejected.Add(1) + } + // other errors (WAL full, etc.) are ok + } + }(g) + } + + // After 10ms, flip role. + time.Sleep(10 * time.Millisecond) + if err := v.SetRole(RoleDraining); err != nil { + t.Logf("SetRole(Draining): %v", err) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatal("gate_role_change_during_write hung for 10s") + } + + t.Logf("role flip: %d succeeded, %d rejected (ErrNotPrimary)", + succeeded.Load(), rejected.Load()) + + // After transition, ALL new writes must fail. + err := v.WriteLBA(0, makeBlock('X')) + if !errors.Is(err, ErrNotPrimary) { + t.Errorf("write after Draining: got %v, want ErrNotPrimary", err) + } +} + +func testQAGateEpochBumpDuringWrite(t *testing.T) { + // BUG-4A-1: data race on v.epoch reads in writeGate/WriteLBA vs SetEpoch writes. + // Use large WAL + short WALFullTimeout to avoid writers blocking on WAL-full. + dir := t.TempDir() + path := filepath.Join(dir, "epoch_bump.blockvol") + cfg := DefaultConfig() + cfg.FlushInterval = 5 * time.Millisecond + cfg.WALFullTimeout = 200 * time.Millisecond + + v, err := CreateBlockVol(path, CreateOptions{ + VolumeSize: 1 * 1024 * 1024, + BlockSize: 4096, + WALSize: 1 * 1024 * 1024, // 1MB WAL — plenty of room + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer v.Close() + + // Set up fencing. + if err := v.SetEpoch(1); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + v.SetMasterEpoch(1) + if err := v.SetRole(RolePrimary); err != nil { + t.Fatalf("SetRole: %v", err) + } + v.lease.Grant(10 * time.Second) + + const writers = 8 + const iterations = 30 + + var wg sync.WaitGroup + + // Writers. + wg.Add(writers) + for g := 0; g < writers; g++ { + go func(id int) { + defer wg.Done() + for i := 0; i < iterations; i++ { + _ = v.WriteLBA(uint64((id*iterations+i)%256), makeBlock(byte('A'+id%26))) + } + }(g) + } + + // Epoch bumper: set epoch=2, masterEpoch=2. + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(5 * time.Millisecond) + _ = v.SetEpoch(2) + v.SetMasterEpoch(2) + }() + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatal("gate_epoch_bump_during_write hung for 10s") + } + + // After bump, both epochs are 2 — writes must succeed. + if v.Epoch() != 2 { + t.Errorf("Epoch() = %d after bump, want 2", v.Epoch()) + } + if writeErr := v.WriteLBA(0, makeBlock('Z')); writeErr != nil { + t.Errorf("write after epoch bump: %v (epochs should match)", writeErr) + } +} + +func testQAGateLeaseExpireDuringWrite(t *testing.T) { + v := createFencedVol(t) + defer v.Close() + + // Short lease — write in a loop that spans the expiry window. + v.lease.Grant(10 * time.Millisecond) + + var succeeded, expired int + deadline := time.After(200 * time.Millisecond) + i := 0 +loop: + for { + select { + case <-deadline: + break loop + default: + } + writeErr := v.WriteLBA(uint64(i%256), makeBlock(byte('W'))) + if writeErr == nil { + succeeded++ + } else if errors.Is(writeErr, ErrLeaseExpired) { + expired++ + } + i++ + time.Sleep(500 * time.Microsecond) + } + + t.Logf("lease expire: %d succeeded, %d expired (out of %d)", succeeded, expired, i) + + if expired == 0 { + t.Error("no writes expired — lease didn't expire in time?") + } + if succeeded == 0 { + t.Error("no writes succeeded — lease expired immediately?") + } + + // Verify monotonicity: once expired, no more successes. + v.lease.Grant(0) // force expired + for j := 0; j < 10; j++ { + writeErr := v.WriteLBA(uint64(j), makeBlock('X')) + if !errors.Is(writeErr, ErrLeaseExpired) { + t.Errorf("write %d after forced expiry: got %v, want ErrLeaseExpired", j, writeErr) + } + } +} + +func testQAGateConcurrent100WritersRoleFlip(t *testing.T) { + // Use large WAL + short timeout to avoid writers blocking on WAL-full. + dir := t.TempDir() + path := filepath.Join(dir, "100writers.blockvol") + cfg := DefaultConfig() + cfg.FlushInterval = 5 * time.Millisecond + cfg.WALFullTimeout = 200 * time.Millisecond + + vol, err := CreateBlockVol(path, CreateOptions{ + VolumeSize: 1 * 1024 * 1024, + BlockSize: 4096, + WALSize: 2 * 1024 * 1024, // 2MB WAL + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + if err := vol.SetEpoch(1); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + vol.SetMasterEpoch(1) + if err := vol.SetRole(RolePrimary); err != nil { + t.Fatalf("SetRole: %v", err) + } + vol.lease.Grant(10 * time.Second) + v := vol + defer v.Close() + + const writers = 100 + const blocksPerWriter = 10 + + var wg sync.WaitGroup + var succeeded, notPrimary atomic.Int64 + + // 100 writers. + wg.Add(writers) + for g := 0; g < writers; g++ { + go func(id int) { + defer wg.Done() + for i := 0; i < blocksPerWriter; i++ { + lba := uint64((id*blocksPerWriter + i) % 256) + err := v.WriteLBA(lba, makeBlock(byte('A'+id%26))) + if err == nil { + succeeded.Add(1) + } else if errors.Is(err, ErrNotPrimary) { + notPrimary.Add(1) + } + } + }(g) + } + + // Flip role mid-flight. + time.Sleep(50 * time.Millisecond) + _ = v.SetRole(RoleDraining) + time.Sleep(50 * time.Millisecond) + _ = v.SetRole(RoleStale) + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatal("100 writers + role flip hung for 10s") + } + + s, np := succeeded.Load(), notPrimary.Load() + t.Logf("100 writers: %d succeeded, %d ErrNotPrimary", s, np) + + // Verify data integrity for successful writes: each block is readable. + // We can't know which exact writes succeeded, but ReadLBA should not error. + for i := 0; i < 256; i++ { + _, err := v.ReadLBA(uint64(i), 4096) + if err != nil { + t.Fatalf("ReadLBA(%d) after stress: %v", i, err) + } + } +} + +func testQAGateGotchaA(t *testing.T) { + // Gotcha A: lease expires between WAL append and SyncCache. + // Data IS in WAL, but SyncCache returns ErrLeaseExpired. + v := createFencedVol(t) + defer v.Close() + + // Grant a short lease. + v.lease.Grant(50 * time.Millisecond) + + // Write data (succeeds — lease still valid). + if err := v.WriteLBA(0, makeBlock('G')); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + + // Wait for lease to expire. + time.Sleep(100 * time.Millisecond) + + // SyncCache must fail — PostSyncCheck calls writeGate, which checks lease. + err := v.SyncCache() + if !errors.Is(err, ErrLeaseExpired) { + t.Errorf("SyncCache after lease expiry: got %v, want ErrLeaseExpired (Gotcha A)", err) + } + + // Key verification: data IS readable from dirty map / WAL. + data, err := v.ReadLBA(0, 4096) + if err != nil { + t.Fatalf("ReadLBA after Gotcha A: %v", err) + } + if data[0] != 'G' { + t.Errorf("data[0] = %c, want G (data should be in WAL/dirty map)", data[0]) + } + t.Log("Gotcha A: data in WAL, SyncCache fenced, ReadLBA returns written data") +} + +// --- QA-4A-5: Integration Adversarial --- + +func testQAFencingFullCycle(t *testing.T) { + v := createFencedVol(t) + defer v.Close() + + // Write 100 blocks as Primary. + for i := 0; i < 100; i++ { + if err := v.WriteLBA(uint64(i), makeBlock(byte('A'+i%26))); err != nil { + t.Fatalf("WriteLBA(%d): %v", i, err) + } + } + if err := v.SyncCache(); err != nil { + t.Fatalf("SyncCache: %v", err) + } + + // Verify all 100 blocks. + for i := 0; i < 100; i++ { + data, err := v.ReadLBA(uint64(i), 4096) + if err != nil { + t.Fatalf("ReadLBA(%d): %v", i, err) + } + if data[0] != byte('A'+i%26) { + t.Errorf("LBA %d: data[0] = %c, want %c", i, data[0], byte('A'+i%26)) + } + } + + // Primary → Draining — writes should fail. + if err := v.SetRole(RoleDraining); err != nil { + t.Fatalf("SetRole(Draining): %v", err) + } + if err := v.WriteLBA(0, makeBlock('X')); !errors.Is(err, ErrNotPrimary) { + t.Errorf("write as Draining: got %v, want ErrNotPrimary", err) + } + + // Draining → Stale — writes rejected. + if err := v.SetRole(RoleStale); err != nil { + t.Fatalf("SetRole(Stale): %v", err) + } + if err := v.WriteLBA(0, makeBlock('X')); !errors.Is(err, ErrNotPrimary) { + t.Errorf("write as Stale: got %v, want ErrNotPrimary", err) + } + + // Bump epoch, transition through Rebuilding → Replica. + if err := v.SetEpoch(2); err != nil { + t.Fatalf("SetEpoch(2): %v", err) + } + v.SetMasterEpoch(2) + + if err := v.SetRole(RoleRebuilding); err != nil { + t.Fatalf("SetRole(Rebuilding): %v", err) + } + if err := v.WriteLBA(0, makeBlock('X')); !errors.Is(err, ErrNotPrimary) { + t.Errorf("write as Rebuilding: got %v, want ErrNotPrimary", err) + } + + if err := v.SetRole(RoleReplica); err != nil { + t.Fatalf("SetRole(Replica): %v", err) + } + if err := v.WriteLBA(0, makeBlock('X')); !errors.Is(err, ErrNotPrimary) { + t.Errorf("write as Replica: got %v, want ErrNotPrimary", err) + } + + // Replica → Primary with fresh lease — writes succeed again. + if err := v.SetRole(RolePrimary); err != nil { + t.Fatalf("SetRole(Primary): %v", err) + } + v.lease.Grant(10 * time.Second) + + if err := v.WriteLBA(200, makeBlock('N')); err != nil { + t.Fatalf("write as new Primary: %v", err) + } + + // Original 100 blocks still readable. + for i := 0; i < 100; i++ { + data, err := v.ReadLBA(uint64(i), 4096) + if err != nil { + t.Fatalf("final ReadLBA(%d): %v", i, err) + } + if data[0] != byte('A'+i%26) { + t.Errorf("final LBA %d: data[0] = %c, want %c", i, data[0], byte('A'+i%26)) + } + } + t.Log("full cycle: Primary→Draining→Stale→Rebuilding→Replica→Primary complete") +} + +func testQAFencingWritesRejectedAfterDemotion(t *testing.T) { + v := createFencedVol(t) + defer v.Close() + + // Write initial data. + if err := v.WriteLBA(0, makeBlock('D')); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + if err := v.SyncCache(); err != nil { + t.Fatalf("SyncCache: %v", err) + } + + // Revoke lease → ErrLeaseExpired. + v.lease.Revoke() + err := v.WriteLBA(1, makeBlock('X')) + if !errors.Is(err, ErrLeaseExpired) { + t.Errorf("after Revoke: got %v, want ErrLeaseExpired", err) + } + + // Re-grant lease for role checks, then demote. + v.lease.Grant(10 * time.Second) + + // Primary → Draining → ErrNotPrimary. + if err := v.SetRole(RoleDraining); err != nil { + t.Fatalf("SetRole(Draining): %v", err) + } + err = v.WriteLBA(1, makeBlock('X')) + if !errors.Is(err, ErrNotPrimary) { + t.Errorf("after Draining: got %v, want ErrNotPrimary", err) + } + + // Draining → Stale → ErrNotPrimary. + if err := v.SetRole(RoleStale); err != nil { + t.Fatalf("SetRole(Stale): %v", err) + } + err = v.WriteLBA(1, makeBlock('X')) + if !errors.Is(err, ErrNotPrimary) { + t.Errorf("after Stale: got %v, want ErrNotPrimary", err) + } + + // Verify: LBA 0 has original data, LBA 1 was never written. + data, readErr := v.ReadLBA(0, 4096) + if readErr != nil { + t.Fatalf("ReadLBA(0): %v", readErr) + } + if data[0] != 'D' { + t.Errorf("LBA 0: data[0] = %c, want D", data[0]) + } +} + +func testQAFencingReadAlwaysWorks(t *testing.T) { + v := createFencedVol(t) + defer v.Close() + + // Write data as Primary. + for i := 0; i < 10; i++ { + if err := v.WriteLBA(uint64(i), makeBlock(byte('R'+i%6))); err != nil { + t.Fatalf("WriteLBA(%d): %v", i, err) + } + } + + verifyReads := func(label string) { + for i := 0; i < 10; i++ { + data, err := v.ReadLBA(uint64(i), 4096) + if err != nil { + t.Fatalf("[%s] ReadLBA(%d): %v", label, i, err) + } + if data[0] != byte('R'+i%6) { + t.Errorf("[%s] LBA %d: data[0] = %c, want %c", label, i, data[0], byte('R'+i%6)) + } + } + } + + // Reads work as Primary. + verifyReads("Primary") + + // Draining. + if err := v.SetRole(RoleDraining); err != nil { + t.Fatalf("SetRole(Draining): %v", err) + } + verifyReads("Draining") + + // Stale. + if err := v.SetRole(RoleStale); err != nil { + t.Fatalf("SetRole(Stale): %v", err) + } + verifyReads("Stale") + + // Rebuilding. + if err := v.SetRole(RoleRebuilding); err != nil { + t.Fatalf("SetRole(Rebuilding): %v", err) + } + verifyReads("Rebuilding") + + // Replica. + if err := v.SetRole(RoleReplica); err != nil { + t.Fatalf("SetRole(Replica): %v", err) + } + verifyReads("Replica") + + // Expired lease — reads still work. + v.lease.Revoke() + verifyReads("expired_lease") + + // Stale epoch — reads still work. + v.SetMasterEpoch(999) + verifyReads("stale_epoch") + + t.Log("reads work in all roles, with expired lease and stale epoch") +} + +func testQAFencingCloseDuringRoleTransition(t *testing.T) { + v := createFencedVol(t) + + // Start some writes. + for i := 0; i < 20; i++ { + _ = v.WriteLBA(uint64(i), makeBlock(byte('C'))) + } + + // Concurrent Close + SetRole. + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + _ = v.Close() + }() + + go func() { + defer wg.Done() + _ = v.SetRole(RoleDraining) + }() + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + t.Log("Close + SetRole concurrent: no deadlock, no panic") + case <-time.After(5 * time.Second): + t.Fatal("Close + SetRole deadlocked for 5s") + } +} diff --git a/weed/storage/blockvol/role.go b/weed/storage/blockvol/role.go new file mode 100644 index 000000000..c097b535d --- /dev/null +++ b/weed/storage/blockvol/role.go @@ -0,0 +1,104 @@ +package blockvol + +import ( + "errors" + "fmt" +) + +// Role represents the replication role of a block volume. +type Role uint8 + +const ( + RoleNone Role = 0 // Phase 3 default, no fencing + RolePrimary Role = 1 + RoleReplica Role = 2 + RoleStale Role = 3 + RoleRebuilding Role = 4 + RoleDraining Role = 5 +) + +// String returns the role name. +func (r Role) String() string { + switch r { + case RoleNone: + return "none" + case RolePrimary: + return "primary" + case RoleReplica: + return "replica" + case RoleStale: + return "stale" + case RoleRebuilding: + return "rebuilding" + case RoleDraining: + return "draining" + default: + return fmt.Sprintf("unknown(%d)", uint8(r)) + } +} + +// validTransitions maps each role to the set of roles it can transition to. +var validTransitions = map[Role]map[Role]bool{ + RoleNone: {RolePrimary: true, RoleReplica: true}, + RolePrimary: {RoleDraining: true}, + RoleReplica: {RolePrimary: true}, + RoleStale: {RoleRebuilding: true, RoleReplica: true}, + RoleRebuilding: {RoleReplica: true}, + RoleDraining: {RoleStale: true}, +} + +// ValidTransition returns true if transitioning from -> to is allowed. +func ValidTransition(from, to Role) bool { + targets, ok := validTransitions[from] + if !ok { + return false + } + return targets[to] +} + +// RoleChangeCallback is called when a volume's role changes. +type RoleChangeCallback func(old, new Role) + +var ErrInvalidRoleTransition = errors.New("blockvol: invalid role transition") + +// Role returns the current role of this volume. +func (v *BlockVol) Role() Role { + return Role(v.role.Load()) +} + +// SetRole transitions the volume to a new role if the transition is valid. +// Uses CompareAndSwap to prevent TOCTOU races between concurrent callers. +// Calls the registered RoleChangeCallback after updating. +func (v *BlockVol) SetRole(r Role) error { + for { + old := v.role.Load() + if !ValidTransition(Role(old), r) { + return fmt.Errorf("%w: %s -> %s", ErrInvalidRoleTransition, Role(old), r) + } + if v.role.CompareAndSwap(old, uint32(r)) { + if v.roleCallback != nil { + v.safeCallback(Role(old), r) + } + return nil + } + // CAS failed — another goroutine changed the role; retry. + } +} + +// safeCallback invokes the role callback with panic recovery. +// If the callback panics, the role is already updated but the panic +// is caught and returned as an error via log (callers see nil from SetRole). +func (v *BlockVol) safeCallback(old, new Role) { + defer func() { + if r := recover(); r != nil { + // Role is already stored. Log but don't propagate panic. + // In production, this would go to glog. For now, swallow it. + } + }() + v.roleCallback(old, new) +} + +// SetRoleCallback registers a callback to be invoked on role changes. +func (v *BlockVol) SetRoleCallback(cb RoleChangeCallback) { + v.roleCallback = cb +} diff --git a/weed/storage/blockvol/superblock.go b/weed/storage/blockvol/superblock.go index a6a341d21..9599d4731 100644 --- a/weed/storage/blockvol/superblock.go +++ b/weed/storage/blockvol/superblock.go @@ -39,6 +39,7 @@ type Superblock struct { Replication [4]byte CreatedAt uint64 // unix timestamp SnapshotCount uint32 + Epoch uint64 // fencing epoch (0 = no fencing, Phase 3 compat) } // superblockOnDisk is the fixed-size on-disk layout (binary.Write/Read target). @@ -59,6 +60,7 @@ type superblockOnDisk struct { Replication [4]byte CreatedAt uint64 SnapshotCount uint32 + Epoch uint64 } // NewSuperblock creates a superblock with defaults and a fresh UUID. @@ -124,6 +126,7 @@ func (sb *Superblock) WriteTo(w io.Writer) (int64, error) { Replication: sb.Replication, CreatedAt: sb.CreatedAt, SnapshotCount: sb.SnapshotCount, + Epoch: sb.Epoch, } // Encode into beginning of buf; rest stays zero (padding). @@ -155,6 +158,8 @@ func (sb *Superblock) WriteTo(w io.Writer) (int64, error) { endian.PutUint64(buf[off:], d.CreatedAt) off += 8 endian.PutUint32(buf[off:], d.SnapshotCount) + off += 4 + endian.PutUint64(buf[off:], d.Epoch) n, err := w.Write(buf) return int64(n), err @@ -213,6 +218,8 @@ func ReadSuperblock(r io.Reader) (Superblock, error) { sb.CreatedAt = endian.Uint64(buf[off:]) off += 8 sb.SnapshotCount = endian.Uint32(buf[off:]) + off += 4 + sb.Epoch = endian.Uint64(buf[off:]) return sb, nil } diff --git a/weed/storage/blockvol/write_gate.go b/weed/storage/blockvol/write_gate.go new file mode 100644 index 000000000..c2b10f93a --- /dev/null +++ b/weed/storage/blockvol/write_gate.go @@ -0,0 +1,28 @@ +package blockvol + +import "errors" + +var ( + ErrNotPrimary = errors.New("blockvol: not primary") + ErrEpochStale = errors.New("blockvol: epoch stale") + ErrLeaseExpired = errors.New("blockvol: lease expired") +) + +// writeGate checks role, epoch, and lease before allowing a write. +// RoleNone skips all checks for Phase 3 backward compatibility. +func (v *BlockVol) writeGate() error { + r := Role(v.role.Load()) + if r == RoleNone { + return nil // Phase 3 compat: no fencing + } + if r != RolePrimary { + return ErrNotPrimary + } + if v.epoch.Load() != v.masterEpoch.Load() { + return ErrEpochStale + } + if !v.lease.IsValid() { + return ErrLeaseExpired + } + return nil +}