Browse Source

feat: Phase 4A CP1 — epoch, lease, role state machine, write gate

Local fencing primitives for block volumes. Every write path validates
role + epoch + lease before accepting data. RoleNone (default) skips
all checks for Phase 3 backward compatibility.

New files: epoch.go, lease.go, role.go, write_gate.go
Modified: superblock.go (Epoch field), blockvol.go (fencing fields,
writeGate in WriteLBA/Trim), group_commit.go (PostSyncCheck/Gotcha A),
dirty_map.go (P3-BUG-9 power-of-2 panic)

Bug fixes: BUG-4A-1 (atomic epoch), BUG-4A-2 (CAS SetRole),
BUG-4A-3 (mutex SetEpoch), BUG-4A-4 (single role.Load),
BUG-4A-6 (safeCallback recover)

837 tests (557 engine + 280 iSCSI), all passing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feature/sw-block
Ping Qiu 3 weeks ago
parent
commit
a107685f00
  1. 40
      weed/storage/blockvol/blockvol.go
  2. 434
      weed/storage/blockvol/blockvol_test.go
  3. 3
      weed/storage/blockvol/dirty_map.go
  4. 38
      weed/storage/blockvol/epoch.go
  5. 42
      weed/storage/blockvol/group_commit.go
  6. 31
      weed/storage/blockvol/lease.go
  7. 27
      weed/storage/blockvol/qa_phase3_engine_test.go
  8. 1198
      weed/storage/blockvol/qa_phase4a_cp1_test.go
  9. 104
      weed/storage/blockvol/role.go
  10. 7
      weed/storage/blockvol/superblock.go
  11. 28
      weed/storage/blockvol/write_gate.go

40
weed/storage/blockvol/blockvol.go

@ -41,6 +41,13 @@ type BlockVol struct {
closed atomic.Bool
opsOutstanding atomic.Int64 // in-flight Read/Write/Trim/SyncCache ops
opsDrained chan struct{}
// Fencing fields (Phase 4A).
epoch atomic.Uint64 // current persisted epoch
masterEpoch atomic.Uint64 // expected epoch from master
lease Lease
role atomic.Uint32
roleCallback RoleChangeCallback
}
// CreateBlockVol creates a new block volume file at path.
@ -105,11 +112,12 @@ func CreateBlockVol(path string, opts CreateOptions, cfgs ...BlockVolConfig) (*B
v.nextLSN.Store(1)
v.healthy.Store(true)
v.groupCommit = NewGroupCommitter(GroupCommitterConfig{
SyncFunc: fd.Sync,
MaxDelay: cfg.GroupCommitMaxDelay,
MaxBatch: cfg.GroupCommitMaxBatch,
LowWatermark: cfg.GroupCommitLowWatermark,
OnDegraded: func() { v.healthy.Store(false) },
SyncFunc: fd.Sync,
MaxDelay: cfg.GroupCommitMaxDelay,
MaxBatch: cfg.GroupCommitMaxBatch,
LowWatermark: cfg.GroupCommitLowWatermark,
OnDegraded: func() { v.healthy.Store(false) },
PostSyncCheck: v.writeGate,
})
go v.groupCommit.Run()
v.flusher = NewFlusher(FlusherConfig{
@ -174,13 +182,15 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) {
opsDrained: make(chan struct{}, 1),
}
v.nextLSN.Store(nextLSN)
v.epoch.Store(sb.Epoch)
v.healthy.Store(true)
v.groupCommit = NewGroupCommitter(GroupCommitterConfig{
SyncFunc: fd.Sync,
MaxDelay: cfg.GroupCommitMaxDelay,
MaxBatch: cfg.GroupCommitMaxBatch,
LowWatermark: cfg.GroupCommitLowWatermark,
OnDegraded: func() { v.healthy.Store(false) },
SyncFunc: fd.Sync,
MaxDelay: cfg.GroupCommitMaxDelay,
MaxBatch: cfg.GroupCommitMaxBatch,
LowWatermark: cfg.GroupCommitLowWatermark,
OnDegraded: func() { v.healthy.Store(false) },
PostSyncCheck: v.writeGate,
})
go v.groupCommit.Run()
v.flusher = NewFlusher(FlusherConfig{
@ -253,6 +263,9 @@ func (v *BlockVol) WriteLBA(lba uint64, data []byte) error {
return err
}
defer v.endOp()
if err := v.writeGate(); err != nil {
return err
}
if err := ValidateWrite(lba, uint32(len(data)), v.super.VolumeSize, v.super.BlockSize); err != nil {
return err
}
@ -260,7 +273,7 @@ func (v *BlockVol) WriteLBA(lba uint64, data []byte) error {
lsn := v.nextLSN.Add(1) - 1
entry := &WALEntry{
LSN: lsn,
Epoch: 0, // Phase 1: no fencing
Epoch: v.epoch.Load(),
Type: EntryTypeWrite,
LBA: lba,
Length: uint32(len(data)),
@ -426,6 +439,9 @@ func (v *BlockVol) Trim(lba uint64, length uint32) error {
return err
}
defer v.endOp()
if err := v.writeGate(); err != nil {
return err
}
if err := ValidateWrite(lba, length, v.super.VolumeSize, v.super.BlockSize); err != nil {
return err
}
@ -433,7 +449,7 @@ func (v *BlockVol) Trim(lba uint64, length uint32) error {
lsn := v.nextLSN.Add(1) - 1
entry := &WALEntry{
LSN: lsn,
Epoch: 0,
Epoch: v.epoch.Load(),
Type: EntryTypeTrim,
LBA: lba,
Length: length,

434
weed/storage/blockvol/blockvol_test.go

@ -67,6 +67,29 @@ func TestBlockVol(t *testing.T) {
{name: "close_during_sync_cache", run: testCloseDuringSyncCache},
// Review finding: Close timeout if op stuck.
{name: "close_timeout_if_op_stuck", run: testCloseTimeoutIfOpStuck},
// Phase 4A CP1: Epoch tests.
{name: "epoch_persist_roundtrip", run: testEpochPersistRoundtrip},
{name: "epoch_in_wal_entry", run: testEpochInWALEntry},
{name: "epoch_survives_recovery", run: testEpochSurvivesRecovery},
// Phase 4A CP1: Lease tests.
{name: "lease_grant_valid", run: testLeaseGrantValid},
{name: "lease_expired_rejects", run: testLeaseExpiredRejects},
{name: "lease_revoke", run: testLeaseRevoke},
// Phase 4A CP1: Role tests.
{name: "role_transitions_valid", run: testRoleTransitionsValid},
{name: "role_transitions_invalid", run: testRoleTransitionsInvalid},
{name: "role_primary_callback", run: testRolePrimaryCallback},
{name: "role_stale_callback", run: testRoleStaleCallback},
// Phase 4A CP1: Write gate tests.
{name: "gate_primary_ok", run: testGatePrimaryOK},
{name: "gate_not_primary", run: testGateNotPrimary},
{name: "gate_stale_epoch", run: testGateStaleEpoch},
{name: "gate_lease_expired", run: testGateLeaseExpired},
{name: "gate_trim_rejected", run: testGateTrimRejected},
{name: "blockvol_write_gate_integration", run: testBlockvolWriteGateIntegration},
{name: "blockvol_gotcha_a_lease_expired", run: testBlockvolGotchaALeaseExpired},
// Phase 4A CP1: P3-BUG-9 dirty map.
{name: "dirty_map_power_of_2_panics", run: testDirtyMapPowerOf2Panics},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -1678,3 +1701,414 @@ func testCloseTimeoutIfOpStuck(t *testing.T) {
t.Fatal("Close hung for >10s — drain timeout not working")
}
}
// --- Phase 4A CP1: Epoch tests ---
// testEpochPersistRoundtrip verifies that SetEpoch persists the epoch to the
// superblock and that a reopened volume observes the same value.
func testEpochPersistRoundtrip(t *testing.T) {
	dir := t.TempDir()
	path := filepath.Join(dir, "epoch.blockvol")
	v, err := CreateBlockVol(path, CreateOptions{
		VolumeSize: 1 * 1024 * 1024,
		BlockSize:  4096,
	})
	if err != nil {
		t.Fatalf("CreateBlockVol: %v", err)
	}
	if got := v.Epoch(); got != 0 {
		t.Fatalf("initial epoch = %d, want 0", got)
	}
	if err := v.SetEpoch(42); err != nil {
		t.Fatalf("SetEpoch: %v", err)
	}
	if got := v.Epoch(); got != 42 {
		t.Fatalf("epoch after set = %d, want 42", got)
	}
	// Close must succeed: the persistence half of the round-trip depends on
	// a clean, durable shutdown (previously this error was ignored).
	if err := v.Close(); err != nil {
		t.Fatalf("Close: %v", err)
	}
	// Reopen and verify epoch persisted.
	v2, err := OpenBlockVol(path)
	if err != nil {
		t.Fatalf("OpenBlockVol: %v", err)
	}
	defer v2.Close()
	if got := v2.Epoch(); got != 42 {
		t.Fatalf("epoch after reopen = %d, want 42", got)
	}
}
// testEpochInWALEntry checks that WriteLBA stamps the volume's current epoch
// into the on-disk WAL entry header (bytes 8..16 of the header).
func testEpochInWALEntry(t *testing.T) {
	path := filepath.Join(t.TempDir(), "epoch-wal.blockvol")
	vol, err := CreateBlockVol(path, CreateOptions{
		VolumeSize: 1 * 1024 * 1024,
		BlockSize:  4096,
	})
	if err != nil {
		t.Fatalf("CreateBlockVol: %v", err)
	}
	defer vol.Close()
	if err := vol.SetEpoch(7); err != nil {
		t.Fatalf("SetEpoch: %v", err)
	}
	vol.SetMasterEpoch(7)
	vol.SetRoleCallback(nil)
	if err := vol.SetRole(RolePrimary); err != nil {
		t.Fatalf("SetRole: %v", err)
	}
	vol.lease.Grant(10 * time.Second)
	if err := vol.WriteLBA(0, makeBlock('A')); err != nil {
		t.Fatalf("WriteLBA: %v", err)
	}
	// Inspect the first WAL entry header directly on disk.
	hdr := make([]byte, walEntryHeaderSize)
	if _, err := vol.fd.ReadAt(hdr, int64(vol.super.WALOffset)); err != nil {
		t.Fatalf("ReadAt WAL: %v", err)
	}
	if got := binary.LittleEndian.Uint64(hdr[8:16]); got != 7 {
		t.Fatalf("WAL entry epoch = %d, want 7", got)
	}
}
// testEpochSurvivesRecovery verifies that both the epoch and written data
// survive a clean close/reopen cycle (Close flushes the WAL to the extent).
func testEpochSurvivesRecovery(t *testing.T) {
	path := filepath.Join(t.TempDir(), "epoch-recov.blockvol")
	vol, err := CreateBlockVol(path, CreateOptions{
		VolumeSize: 1 * 1024 * 1024,
		BlockSize:  4096,
	})
	if err != nil {
		t.Fatalf("CreateBlockVol: %v", err)
	}
	if err := vol.SetEpoch(100); err != nil {
		t.Fatalf("SetEpoch: %v", err)
	}
	vol.SetMasterEpoch(100)
	if err := vol.SetRole(RolePrimary); err != nil {
		t.Fatalf("SetRole: %v", err)
	}
	vol.lease.Grant(10 * time.Second)
	if err := vol.WriteLBA(0, makeBlock('R')); err != nil {
		t.Fatalf("WriteLBA: %v", err)
	}
	// A clean Close flushes the WAL into the extent area.
	if err := vol.Close(); err != nil {
		t.Fatalf("Close: %v", err)
	}
	reopened, err := OpenBlockVol(path)
	if err != nil {
		t.Fatalf("OpenBlockVol: %v", err)
	}
	defer reopened.Close()
	if got := reopened.Epoch(); got != 100 {
		t.Fatalf("epoch after recovery = %d, want 100", got)
	}
	got, err := reopened.ReadLBA(0, 4096)
	if err != nil {
		t.Fatalf("ReadLBA: %v", err)
	}
	if !bytes.Equal(got, makeBlock('R')) {
		t.Fatal("data mismatch after recovery")
	}
}
// --- Phase 4A CP1: Lease tests ---
// testLeaseGrantValid: a zero-value Lease is expired; Grant makes it valid.
func testLeaseGrantValid(t *testing.T) {
	var lease Lease
	if lease.IsValid() {
		t.Fatal("zero-value lease should be invalid")
	}
	lease.Grant(1 * time.Second)
	if !lease.IsValid() {
		t.Fatal("lease should be valid after grant")
	}
}

// testLeaseExpiredRejects: a lease becomes invalid once its TTL elapses.
func testLeaseExpiredRejects(t *testing.T) {
	var lease Lease
	lease.Grant(1 * time.Millisecond)
	time.Sleep(5 * time.Millisecond)
	if lease.IsValid() {
		t.Fatal("lease should have expired")
	}
}

// testLeaseRevoke: Revoke invalidates a lease immediately, well before TTL.
func testLeaseRevoke(t *testing.T) {
	var lease Lease
	lease.Grant(1 * time.Hour)
	if !lease.IsValid() {
		t.Fatal("lease should be valid")
	}
	lease.Revoke()
	if lease.IsValid() {
		t.Fatal("lease should be invalid after revoke")
	}
}
// --- Phase 4A CP1: Role tests ---
// testRoleTransitionsValid enumerates every edge the role state machine must
// allow and asserts ValidTransition accepts each one.
func testRoleTransitionsValid(t *testing.T) {
	allowed := []struct{ from, to Role }{
		{RoleNone, RolePrimary},
		{RoleNone, RoleReplica},
		{RolePrimary, RoleDraining},
		{RoleDraining, RoleStale},
		{RoleReplica, RolePrimary},
		{RoleStale, RoleRebuilding},
		{RoleStale, RoleReplica},
		{RoleRebuilding, RoleReplica},
	}
	for _, tr := range allowed {
		if !ValidTransition(tr.from, tr.to) {
			t.Errorf("expected valid: %s -> %s", tr.from, tr.to)
		}
	}
}

// testRoleTransitionsInvalid asserts ValidTransition rejects edges that are
// deliberately absent from the state machine.
func testRoleTransitionsInvalid(t *testing.T) {
	forbidden := []struct{ from, to Role }{
		{RolePrimary, RoleReplica},
		{RolePrimary, RoleStale},
		{RoleReplica, RoleStale},
		{RoleReplica, RoleDraining},
		{RoleDraining, RolePrimary},
		{RoleRebuilding, RolePrimary},
		{RoleNone, RoleStale},
		{RoleNone, RoleDraining},
	}
	for _, tr := range forbidden {
		if ValidTransition(tr.from, tr.to) {
			t.Errorf("expected invalid: %s -> %s", tr.from, tr.to)
		}
	}
}
// testRolePrimaryCallback verifies the role-change callback fires exactly once
// with the correct old/new pair on a None -> Primary transition.
func testRolePrimaryCallback(t *testing.T) {
	v := createTestVol(t)
	defer v.Close()
	var called bool
	var gotOld, gotNew Role
	// Parameters renamed from old/new to avoid shadowing the builtin new.
	v.SetRoleCallback(func(oldRole, newRole Role) {
		called = true
		gotOld = oldRole
		gotNew = newRole
	})
	if err := v.SetRole(RolePrimary); err != nil {
		t.Fatalf("SetRole(Primary): %v", err)
	}
	if !called {
		t.Fatal("callback not called")
	}
	if gotOld != RoleNone || gotNew != RolePrimary {
		t.Fatalf("callback args: old=%s new=%s, want none->primary", gotOld, gotNew)
	}
}

// testRoleStaleCallback walks None -> Primary -> Draining -> Stale and checks
// the callback observed each hop in order. Each SetRole is error-checked so a
// rejected transition fails loudly instead of silently skipping a hop
// (previously the errors were discarded).
func testRoleStaleCallback(t *testing.T) {
	v := createTestVol(t)
	defer v.Close()
	var transitions []string
	v.SetRoleCallback(func(oldRole, newRole Role) {
		transitions = append(transitions, oldRole.String()+"->"+newRole.String())
	})
	for _, r := range []Role{RolePrimary, RoleDraining, RoleStale} {
		if err := v.SetRole(r); err != nil {
			t.Fatalf("SetRole(%s): %v", r, err)
		}
	}
	expected := []string{"none->primary", "primary->draining", "draining->stale"}
	if len(transitions) != len(expected) {
		t.Fatalf("transitions = %v, want %v", transitions, expected)
	}
	for i, e := range expected {
		if transitions[i] != e {
			t.Errorf("transition[%d] = %s, want %s", i, transitions[i], e)
		}
	}
}
// --- Phase 4A CP1: Write gate tests ---
// testGatePrimaryOK: a primary with matching epoch and a live lease may write.
// SetRole/SetEpoch errors are now checked so a failed setup cannot masquerade
// as a passing gate test.
func testGatePrimaryOK(t *testing.T) {
	v := createTestVol(t)
	defer v.Close()
	if err := v.SetRole(RolePrimary); err != nil {
		t.Fatalf("SetRole: %v", err)
	}
	if err := v.SetEpoch(1); err != nil {
		t.Fatalf("SetEpoch: %v", err)
	}
	v.SetMasterEpoch(1)
	v.lease.Grant(10 * time.Second)
	if err := v.WriteLBA(0, makeBlock('A')); err != nil {
		t.Fatalf("WriteLBA as primary: %v", err)
	}
}

// testGateNotPrimary: replicas must not accept writes.
func testGateNotPrimary(t *testing.T) {
	v := createTestVol(t)
	defer v.Close()
	if err := v.SetRole(RoleReplica); err != nil {
		t.Fatalf("SetRole: %v", err)
	}
	if err := v.WriteLBA(0, makeBlock('A')); !errors.Is(err, ErrNotPrimary) {
		t.Fatalf("expected ErrNotPrimary, got: %v", err)
	}
}

// testGateStaleEpoch: a primary whose local epoch differs from the master's
// must reject writes even with a valid lease.
func testGateStaleEpoch(t *testing.T) {
	v := createTestVol(t)
	defer v.Close()
	if err := v.SetRole(RolePrimary); err != nil {
		t.Fatalf("SetRole: %v", err)
	}
	if err := v.SetEpoch(1); err != nil {
		t.Fatalf("SetEpoch: %v", err)
	}
	v.SetMasterEpoch(2) // mismatch
	v.lease.Grant(10 * time.Second)
	if err := v.WriteLBA(0, makeBlock('A')); !errors.Is(err, ErrEpochStale) {
		t.Fatalf("expected ErrEpochStale, got: %v", err)
	}
}

// testGateLeaseExpired: with no lease ever granted the zero-value lease is
// expired, so a correctly-epoched primary still cannot write.
func testGateLeaseExpired(t *testing.T) {
	v := createTestVol(t)
	defer v.Close()
	if err := v.SetRole(RolePrimary); err != nil {
		t.Fatalf("SetRole: %v", err)
	}
	if err := v.SetEpoch(1); err != nil {
		t.Fatalf("SetEpoch: %v", err)
	}
	v.SetMasterEpoch(1)
	if err := v.WriteLBA(0, makeBlock('A')); !errors.Is(err, ErrLeaseExpired) {
		t.Fatalf("expected ErrLeaseExpired, got: %v", err)
	}
}

// testGateTrimRejected: Trim goes through the same write gate as WriteLBA.
func testGateTrimRejected(t *testing.T) {
	v := createTestVol(t)
	defer v.Close()
	if err := v.SetRole(RoleReplica); err != nil {
		t.Fatalf("SetRole: %v", err)
	}
	if err := v.Trim(0, 4096); !errors.Is(err, ErrNotPrimary) {
		t.Fatalf("expected ErrNotPrimary for Trim, got: %v", err)
	}
}
// testBlockvolWriteGateIntegration exercises the gate end-to-end on a real
// volume: RoleNone bypass, primary writes, reads unaffected by role, and
// rejection after demotion to draining. Setup errors are now checked.
func testBlockvolWriteGateIntegration(t *testing.T) {
	v := createTestVol(t)
	defer v.Close()
	// RoleNone: writes pass without fencing (Phase 3 compat).
	if err := v.WriteLBA(0, makeBlock('Z')); err != nil {
		t.Fatalf("WriteLBA with RoleNone: %v", err)
	}
	// Promote to primary with matching epoch and a live lease.
	if err := v.SetRole(RolePrimary); err != nil {
		t.Fatalf("SetRole(Primary): %v", err)
	}
	if err := v.SetEpoch(5); err != nil {
		t.Fatalf("SetEpoch: %v", err)
	}
	v.SetMasterEpoch(5)
	v.lease.Grant(10 * time.Second)
	if err := v.WriteLBA(1, makeBlock('P')); err != nil {
		t.Fatalf("WriteLBA as primary: %v", err)
	}
	// Reads always work regardless of role.
	data, err := v.ReadLBA(1, 4096)
	if err != nil {
		t.Fatalf("ReadLBA: %v", err)
	}
	if !bytes.Equal(data, makeBlock('P')) {
		t.Fatal("data mismatch")
	}
	// Demote to draining — writes must now fail.
	if err := v.SetRole(RoleDraining); err != nil {
		t.Fatalf("SetRole(Draining): %v", err)
	}
	if err := v.WriteLBA(2, makeBlock('D')); !errors.Is(err, ErrNotPrimary) {
		t.Fatalf("WriteLBA as draining: expected ErrNotPrimary, got: %v", err)
	}
	// Reads still work after demotion.
	if _, err := v.ReadLBA(1, 4096); err != nil {
		t.Fatalf("ReadLBA after demotion: %v", err)
	}
}
// testBlockvolGotchaALeaseExpired covers Gotcha A: a write may be accepted
// while the lease is live, but the group committer's PostSyncCheck must fail
// SyncCache if the lease has expired by fsync time. Setup errors are checked.
func testBlockvolGotchaALeaseExpired(t *testing.T) {
	path := filepath.Join(t.TempDir(), "gotcha-a.blockvol")
	cfg := DefaultConfig()
	cfg.GroupCommitMaxDelay = 1 * time.Millisecond
	v, err := CreateBlockVol(path, CreateOptions{
		VolumeSize: 1 * 1024 * 1024,
		BlockSize:  4096,
	}, cfg)
	if err != nil {
		t.Fatalf("CreateBlockVol: %v", err)
	}
	defer v.Close()
	// Set up as primary with a deliberately short lease.
	if err := v.SetRole(RolePrimary); err != nil {
		t.Fatalf("SetRole: %v", err)
	}
	if err := v.SetEpoch(1); err != nil {
		t.Fatalf("SetEpoch: %v", err)
	}
	v.SetMasterEpoch(1)
	v.lease.Grant(50 * time.Millisecond)
	// The write itself is accepted while the lease is live.
	if err := v.WriteLBA(0, makeBlock('G')); err != nil {
		t.Fatalf("WriteLBA: %v", err)
	}
	// Let the lease lapse, then sync: PostSyncCheck must surface the expiry.
	time.Sleep(100 * time.Millisecond)
	err = v.SyncCache()
	if err == nil {
		t.Fatal("SyncCache should fail after lease expired")
	}
	if !errors.Is(err, ErrLeaseExpired) {
		t.Fatalf("expected ErrLeaseExpired from SyncCache, got: %v", err)
	}
}
// --- Phase 4A CP1: P3-BUG-9 ---
// testDirtyMapPowerOf2Panics (P3-BUG-9): NewDirtyMap must panic on a
// non-power-of-2 shard count, since mask-based routing requires powers of 2.
func testDirtyMapPowerOf2Panics(t *testing.T) {
	defer func() {
		rec := recover()
		if rec == nil {
			t.Fatal("expected panic for non-power-of-2 numShards")
		}
		if msg, ok := rec.(string); !ok || msg != "blockvol: NewDirtyMap numShards must be power of 2" {
			t.Fatalf("unexpected panic: %v", rec)
		}
	}()
	NewDirtyMap(3) // should panic
}

3
weed/storage/blockvol/dirty_map.go

@ -28,6 +28,9 @@ func NewDirtyMap(numShards int) *DirtyMap {
if numShards <= 0 {
numShards = 1
}
if numShards&(numShards-1) != 0 {
panic("blockvol: NewDirtyMap numShards must be power of 2")
}
shards := make([]dirtyShard, numShards)
for i := range shards {
shards[i].m = make(map[uint64]dirtyEntry)

38
weed/storage/blockvol/epoch.go

@ -0,0 +1,38 @@
package blockvol
import (
"fmt"
"os"
)
// Epoch returns the current persisted epoch of this volume.
func (v *BlockVol) Epoch() uint64 {
	return v.epoch.Load()
}

// SetEpoch persists a new epoch to the superblock and fsyncs it.
// Fix: the in-memory epoch is published only after the superblock write is
// durable, so the write gate can never accept writes at an epoch that could
// be lost on crash (previously v.epoch was stored before the fsync). On
// failure the previous in-memory superblock epoch is restored so a later
// superblock write does not silently persist the unadopted value.
func (v *BlockVol) SetEpoch(epoch uint64) error {
	v.mu.Lock()
	defer v.mu.Unlock()
	prev := v.super.Epoch
	v.super.Epoch = epoch
	// os.SEEK_SET is the legacy spelling of io.SeekStart; kept so this
	// file's imports stay unchanged.
	if _, err := v.fd.Seek(0, os.SEEK_SET); err != nil {
		v.super.Epoch = prev
		return fmt.Errorf("blockvol: seek superblock: %w", err)
	}
	if _, err := v.super.WriteTo(v.fd); err != nil {
		v.super.Epoch = prev
		return fmt.Errorf("blockvol: write superblock: %w", err)
	}
	if err := v.fd.Sync(); err != nil {
		v.super.Epoch = prev
		return fmt.Errorf("blockvol: sync superblock: %w", err)
	}
	// Durable on disk: now make it visible to the write gate.
	v.epoch.Store(epoch)
	return nil
}

// SetMasterEpoch records the epoch the master expects. The write gate
// rejects writes when v.epoch != v.masterEpoch (unless role is RoleNone).
func (v *BlockVol) SetMasterEpoch(epoch uint64) {
	v.masterEpoch.Store(epoch)
}

42
weed/storage/blockvol/group_commit.go

@ -16,11 +16,12 @@ var (
// GroupCommitter batches SyncCache requests and performs a single fsync
// for the entire batch. This amortizes the cost of fsync across many callers.
type GroupCommitter struct {
syncFunc func() error // called to fsync (injectable for testing)
maxDelay time.Duration // max wait before flushing a partial batch
maxBatch int // flush immediately when this many waiters accumulate
lowWatermark int // skip delay if fewer pending (0 = always wait)
onDegraded func() // called when fsync fails
syncFunc func() error // called to fsync (injectable for testing)
maxDelay time.Duration // max wait before flushing a partial batch
maxBatch int // flush immediately when this many waiters accumulate
lowWatermark int // skip delay if fewer pending (0 = always wait)
onDegraded func() // called when fsync fails
postSyncCheck func() error // called after sync; error fails waiters
mu sync.Mutex
pending []chan error
@ -35,11 +36,12 @@ type GroupCommitter struct {
// GroupCommitterConfig configures the group committer.
type GroupCommitterConfig struct {
SyncFunc func() error // required: the fsync function
MaxDelay time.Duration // default 1ms
MaxBatch int // default 64
LowWatermark int // skip delay if fewer pending (0 = always wait)
OnDegraded func() // optional: called on fsync error
SyncFunc func() error // required: the fsync function
MaxDelay time.Duration // default 1ms
MaxBatch int // default 64
LowWatermark int // skip delay if fewer pending (0 = always wait)
OnDegraded func() // optional: called on fsync error
PostSyncCheck func() error // optional: called after successful sync; error fails all waiters
}
// NewGroupCommitter creates a new group committer. Call Run() to start it.
@ -54,14 +56,15 @@ func NewGroupCommitter(cfg GroupCommitterConfig) *GroupCommitter {
cfg.OnDegraded = func() {}
}
return &GroupCommitter{
syncFunc: cfg.SyncFunc,
maxDelay: cfg.MaxDelay,
maxBatch: cfg.MaxBatch,
lowWatermark: cfg.LowWatermark,
onDegraded: cfg.OnDegraded,
notifyCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
done: make(chan struct{}),
syncFunc: cfg.SyncFunc,
maxDelay: cfg.MaxDelay,
maxBatch: cfg.MaxBatch,
lowWatermark: cfg.LowWatermark,
onDegraded: cfg.OnDegraded,
postSyncCheck: cfg.PostSyncCheck,
notifyCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
done: make(chan struct{}),
}
}
@ -129,6 +132,9 @@ func (gc *GroupCommitter) Run() {
// not leave waiters hung — notify them and drain stragglers.
err := gc.callSyncFunc()
gc.syncCount.Add(1)
if err == nil && gc.postSyncCheck != nil {
err = gc.postSyncCheck()
}
if err != nil {
gc.onDegraded()
}

31
weed/storage/blockvol/lease.go

@ -0,0 +1,31 @@
package blockvol
import (
"sync/atomic"
"time"
)
// Lease tracks a runtime-only lease expiry for write fencing.
// Zero-value is an expired (invalid) lease. Not persisted.
type Lease struct {
expiry atomic.Value // stores time.Time
}
// Grant sets the lease to expire after ttl from now.
func (l *Lease) Grant(ttl time.Duration) {
l.expiry.Store(time.Now().Add(ttl))
}
// IsValid returns true if the lease has not expired.
func (l *Lease) IsValid() bool {
v := l.expiry.Load()
if v == nil {
return false
}
return time.Now().Before(v.(time.Time))
}
// Revoke immediately invalidates the lease.
func (l *Lease) Revoke() {
l.expiry.Store(time.Time{}) // zero time is always in the past
}

27
weed/storage/blockvol/qa_phase3_engine_test.go

@ -250,26 +250,15 @@ func testQADMMaxUint64LBA(t *testing.T) {
}
func testQADMNonPowerOf2Shards(t *testing.T) {
// Non-power-of-2: mask will be wrong (7-1=6=0b110), but should not panic.
// This tests robustness, not correctness of shard distribution.
dm := NewDirtyMap(7)
// Should not panic on basic operations.
for i := uint64(0); i < 100; i++ {
dm.Put(i, i*10, i+1, 4096)
}
// All entries should be retrievable (mask-based routing is deterministic).
for i := uint64(0); i < 100; i++ {
_, _, _, ok := dm.Get(i)
if !ok {
t.Errorf("Get(%d) not found with 7 shards", i)
// P3-BUG-9 fix: non-power-of-2 shards now panic to prevent silent
// shard routing bugs (mask-based routing requires power-of-2).
defer func() {
r := recover()
if r == nil {
t.Fatal("expected panic for non-power-of-2 numShards")
}
}
if dm.Len() != 100 {
t.Errorf("Len() = %d, want 100", dm.Len())
}
}()
NewDirtyMap(7) // should panic
}
func testQADMZeroShards(t *testing.T) {

1198
weed/storage/blockvol/qa_phase4a_cp1_test.go
File diff suppressed because it is too large
View File

104
weed/storage/blockvol/role.go

@ -0,0 +1,104 @@
package blockvol
import (
"errors"
"fmt"
)
// Role represents the replication role of a block volume.
type Role uint8

const (
	RoleNone       Role = 0 // Phase 3 default, no fencing
	RolePrimary    Role = 1
	RoleReplica    Role = 2
	RoleStale      Role = 3
	RoleRebuilding Role = 4
	RoleDraining   Role = 5
)

// roleNames maps each known role to its display name.
var roleNames = [...]string{
	RoleNone:       "none",
	RolePrimary:    "primary",
	RoleReplica:    "replica",
	RoleStale:      "stale",
	RoleRebuilding: "rebuilding",
	RoleDraining:   "draining",
}

// String returns the role name.
func (r Role) String() string {
	if int(r) < len(roleNames) {
		return roleNames[r]
	}
	return fmt.Sprintf("unknown(%d)", uint8(r))
}

// validTransitions maps each role to the set of roles it may move to.
var validTransitions = map[Role]map[Role]bool{
	RoleNone:       {RolePrimary: true, RoleReplica: true},
	RolePrimary:    {RoleDraining: true},
	RoleReplica:    {RolePrimary: true},
	RoleStale:      {RoleRebuilding: true, RoleReplica: true},
	RoleRebuilding: {RoleReplica: true},
	RoleDraining:   {RoleStale: true},
}

// ValidTransition reports whether from -> to is an allowed edge in the role
// state machine. Lookups on a missing "from" fall through to false via the
// nil inner map, so unknown roles have no outgoing edges.
func ValidTransition(from, to Role) bool {
	return validTransitions[from][to]
}

// RoleChangeCallback is invoked when a volume's role changes.
type RoleChangeCallback func(oldRole, newRole Role)

// ErrInvalidRoleTransition is returned by SetRole for a disallowed edge.
var ErrInvalidRoleTransition = errors.New("blockvol: invalid role transition")
// Role returns the volume's current replication role (RoleNone, the zero
// value, means unfenced).
func (v *BlockVol) Role() Role {
	return Role(v.role.Load())
}
// SetRole transitions the volume to role r after validating the edge against
// the role state machine. A CompareAndSwap retry loop prevents TOCTOU races
// between concurrent callers; the registered callback runs (via safeCallback)
// after a successful swap.
func (v *BlockVol) SetRole(r Role) error {
	for {
		cur := v.role.Load()
		if !ValidTransition(Role(cur), r) {
			return fmt.Errorf("%w: %s -> %s", ErrInvalidRoleTransition, Role(cur), r)
		}
		if !v.role.CompareAndSwap(cur, uint32(r)) {
			continue // lost the race: re-validate against the new current role
		}
		// NOTE(review): roleCallback is read here without synchronization
		// while SetRoleCallback writes it — confirm callbacks are only
		// registered before concurrent SetRole calls are possible.
		if v.roleCallback != nil {
			v.safeCallback(Role(cur), r)
		}
		return nil
	}
}
// safeCallback invokes the registered role callback with panic recovery.
// The role change has already been stored by the time this runs, so a
// panicking callback cannot roll it back; the panic is swallowed (in
// production it would be logged via glog) and SetRole still returns nil.
func (v *BlockVol) safeCallback(oldRole, newRole Role) {
	defer func() {
		if rec := recover(); rec != nil {
			// Intentionally swallowed; see comment above.
			_ = rec
		}
	}()
	v.roleCallback(oldRole, newRole)
}
// SetRoleCallback registers a callback to be invoked after each successful
// role change (see SetRole). NOTE(review): this plain assignment is not
// synchronized with SetRole's read of the field — looks like callbacks are
// meant to be registered during setup, before concurrent SetRole calls are
// possible; confirm with callers.
func (v *BlockVol) SetRoleCallback(cb RoleChangeCallback) {
v.roleCallback = cb
}

7
weed/storage/blockvol/superblock.go

@ -39,6 +39,7 @@ type Superblock struct {
Replication [4]byte
CreatedAt uint64 // unix timestamp
SnapshotCount uint32
Epoch uint64 // fencing epoch (0 = no fencing, Phase 3 compat)
}
// superblockOnDisk is the fixed-size on-disk layout (binary.Write/Read target).
@ -59,6 +60,7 @@ type superblockOnDisk struct {
Replication [4]byte
CreatedAt uint64
SnapshotCount uint32
Epoch uint64
}
// NewSuperblock creates a superblock with defaults and a fresh UUID.
@ -124,6 +126,7 @@ func (sb *Superblock) WriteTo(w io.Writer) (int64, error) {
Replication: sb.Replication,
CreatedAt: sb.CreatedAt,
SnapshotCount: sb.SnapshotCount,
Epoch: sb.Epoch,
}
// Encode into beginning of buf; rest stays zero (padding).
@ -155,6 +158,8 @@ func (sb *Superblock) WriteTo(w io.Writer) (int64, error) {
endian.PutUint64(buf[off:], d.CreatedAt)
off += 8
endian.PutUint32(buf[off:], d.SnapshotCount)
off += 4
endian.PutUint64(buf[off:], d.Epoch)
n, err := w.Write(buf)
return int64(n), err
@ -213,6 +218,8 @@ func ReadSuperblock(r io.Reader) (Superblock, error) {
sb.CreatedAt = endian.Uint64(buf[off:])
off += 8
sb.SnapshotCount = endian.Uint32(buf[off:])
off += 4
sb.Epoch = endian.Uint64(buf[off:])
return sb, nil
}

28
weed/storage/blockvol/write_gate.go

@ -0,0 +1,28 @@
package blockvol
import "errors"
// Sentinel errors produced by the write gate.
var (
	ErrNotPrimary   = errors.New("blockvol: not primary")
	ErrEpochStale   = errors.New("blockvol: epoch stale")
	ErrLeaseExpired = errors.New("blockvol: lease expired")
)

// writeGate validates role, epoch, and lease before a write (or trim) is
// accepted. A volume in RoleNone bypasses every check so Phase 3 callers
// keep working without fencing.
func (v *BlockVol) writeGate() error {
	// Single role load (BUG-4A-4): all role decisions use one snapshot.
	switch r := Role(v.role.Load()); {
	case r == RoleNone:
		return nil // Phase 3 compat: no fencing
	case r != RolePrimary:
		return ErrNotPrimary
	}
	if v.epoch.Load() != v.masterEpoch.Load() {
		return ErrEpochStale
	}
	if !v.lease.IsValid() {
		return ErrLeaseExpired
	}
	return nil
}
Loading…
Cancel
Save