Browse Source

feat: Phase 4A CP3 -- promotion, rebuild, split-brain prevention

Add master-driven lifecycle operations: promotion, demotion, rebuild,
and split-brain prevention. All testable on Windows with mock TCP.

New files:
- promotion.go: HandleAssignment (single entry point for role changes),
  promote (Replica/None -> Primary with durable epoch), demote
  (Primary -> Draining -> Stale with drain timeout)
- rebuild.go: RebuildServer (WAL catch-up + full extent streaming),
  StartRebuild client (WAL catch-up with full extent fallback,
  two-phase rebuild with second catch-up for concurrent writes)

Modified:
- wal_writer.go: ScanFrom() method, ErrWALRecycled sentinel
- repl_proto.go: rebuild message types + RebuildRequest encode/decode
- blockvol.go: assignMu, drainTimeout, rebuildServer fields;
  HandleAssignment/StartRebuildServer/StopRebuildServer methods;
  rebuild server stop in Close()
- dirty_map.go: Clear() method for full extent rebuild

32 new tests covering WAL scan, promotion/demotion, rebuild server,
rebuild client, split-brain prevention, and full lifecycle scenarios.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feature/sw-block
Ping Qiu 1 week ago
parent
commit
b31383e294
  1. 33
      weed/storage/blockvol/blockvol.go
  2. 916
      weed/storage/blockvol/blockvol_test.go
  3. 10
      weed/storage/blockvol/dirty_map.go
  4. 114
      weed/storage/blockvol/promotion.go
  5. 438
      weed/storage/blockvol/rebuild.go
  6. 43
      weed/storage/blockvol/repl_proto.go
  7. 92
      weed/storage/blockvol/wal_writer.go

33
weed/storage/blockvol/blockvol.go

@ -53,6 +53,11 @@ type BlockVol struct {
lease Lease
role atomic.Uint32
roleCallback RoleChangeCallback
// Promotion/rebuild fields (Phase 4A CP3).
rebuildServer *RebuildServer
assignMu sync.Mutex // serializes HandleAssignment calls
drainTimeout time.Duration // default 10s, for demote drain
}
// CreateBlockVol creates a new block volume file at path.
@ -547,6 +552,30 @@ func (v *BlockVol) StartReplicaReceiver(dataAddr, ctrlAddr string) error {
return nil
}
// HandleAssignment processes a role/epoch/lease assignment from master.
func (v *BlockVol) HandleAssignment(epoch uint64, role Role, leaseTTL time.Duration) error {
return HandleAssignment(v, epoch, role, leaseTTL)
}
// StartRebuildServer creates and starts a rebuild server on the given address.
func (v *BlockVol) StartRebuildServer(addr string) error {
srv, err := NewRebuildServer(v, addr)
if err != nil {
return err
}
v.rebuildServer = srv
srv.Serve()
return nil
}
// StopRebuildServer stops the rebuild server if running.
func (v *BlockVol) StopRebuildServer() {
if v.rebuildServer != nil {
v.rebuildServer.Stop()
v.rebuildServer = nil
}
}
// degradeReplica marks the shipper as degraded and logs a warning.
func (v *BlockVol) degradeReplica(err error) {
if v.shipper != nil {
@ -556,7 +585,7 @@ func (v *BlockVol) degradeReplica(err error) {
}
// Close shuts down the block volume and closes the file.
// Shutdown order: shipper → replica receiver → drain ops → group committer → flusher → final flush → close fd.
// Shutdown order: shipper -> replica receiver -> rebuild server -> drain ops -> group committer -> flusher -> final flush -> close fd.
func (v *BlockVol) Close() error {
v.closed.Store(true)
@ -568,6 +597,8 @@ func (v *BlockVol) Close() error {
if v.replRecv != nil {
v.replRecv.Stop()
}
// Stop rebuild server.
v.StopRebuildServer()
// Drain in-flight ops: beginOp checks closed and returns ErrVolumeClosed,
// so no new ops can start. Wait for existing ones to finish (max 5s).

916
weed/storage/blockvol/blockvol_test.go

@ -127,6 +127,42 @@ func TestBlockVol(t *testing.T) {
{name: "replica_reject_lsn_gap", run: testReplicaRejectLSNGap},
{name: "barrier_fsync_failed_status", run: testBarrierFsyncFailedStatus},
{name: "barrier_configurable_timeout", run: testBarrierConfigurableTimeout},
// Phase 4A CP3: WAL scanner.
{name: "wal_scan_from_middle", run: testWALScanFromMiddle},
{name: "wal_scan_empty", run: testWALScanEmpty},
{name: "wal_scan_recycled", run: testWALScanRecycled},
{name: "wal_scan_wrap_padding", run: testWALScanWrapPadding},
{name: "wal_scan_entry_crosses_end", run: testWALScanEntryCrossesEnd},
// Phase 4A CP3: Promotion + Demotion.
{name: "promote_replica_to_primary", run: testPromoteReplicaToPrimary},
{name: "promote_rejects_non_replica", run: testPromoteRejectsNonReplica},
{name: "demote_primary_to_stale", run: testDemotePrimaryToStale},
{name: "demote_drains_inflight_ops", run: testDemoteDrainsInflightOps},
{name: "demote_stops_shipper", run: testDemoteStopsShipper},
{name: "assignment_refresh_lease", run: testAssignmentRefreshLease},
{name: "assignment_invalid_transition", run: testAssignmentInvalidTransition},
// Phase 4A CP3: Rebuild protocol types.
{name: "rebuild_request_roundtrip", run: testRebuildRequestRoundtrip},
// Phase 4A CP3: Rebuild server.
{name: "rebuild_server_wal_catchup", run: testRebuildServerWALCatchUp},
{name: "rebuild_server_wal_recycled", run: testRebuildServerWALRecycled},
{name: "rebuild_server_full_extent", run: testRebuildServerFullExtent},
{name: "rebuild_server_epoch_mismatch", run: testRebuildServerEpochMismatch},
// Phase 4A CP3: Rebuild client.
{name: "rebuild_wal_catchup_happy", run: testRebuildWALCatchUpHappy},
{name: "rebuild_wal_catchup_to_replica", run: testRebuildWALCatchUpToReplica},
{name: "rebuild_fallback_full_extent", run: testRebuildFallbackFullExtent},
{name: "rebuild_full_extent_data_correct", run: testRebuildFullExtentDataCorrect},
{name: "rebuild_full_extent_resets_dirty_map", run: testRebuildFullExtentResetsDirtyMap},
// Phase 4A CP3: Split-brain tests.
{name: "split_brain_dead_zone", run: testSplitBrainDeadZone},
{name: "split_brain_stale_primary_fenced", run: testSplitBrainStalePrimaryFenced},
{name: "split_brain_epoch_rejects_stale_write", run: testSplitBrainEpochRejectsStaleWrite},
{name: "split_brain_no_self_promotion", run: testSplitBrainNoSelfPromotion},
{name: "split_brain_concurrent_assignment", run: testSplitBrainConcurrentAssignment},
// Phase 4A CP3: Lifecycle tests.
{name: "blockvol_full_lifecycle", run: testBlockvolFullLifecycle},
{name: "blockvol_rebuild_lifecycle", run: testBlockvolRebuildLifecycle},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -3212,6 +3248,886 @@ func testBarrierConfigurableTimeout(t *testing.T) {
}
}
// ---------------------------------------------------------------------------
// Phase 4A CP3: WAL Scanner tests
// ---------------------------------------------------------------------------
func testWALScanFromMiddle(t *testing.T) {
v := createTestVol(t)
defer v.Close()
// Write 10 entries.
for i := 0; i < 10; i++ {
data := makeBlock(byte('A' + i))
if err := v.WriteLBA(uint64(i), data); err != nil {
t.Fatalf("WriteLBA(%d): %v", i, err)
}
}
// Scan from LSN=5 (0-indexed: LSN 1..10, so fromLSN=5 gets LSN 5..10 = 6 entries).
var scanned []uint64
err := v.wal.ScanFrom(v.fd, v.super.WALOffset, 0, 5, func(e *WALEntry) error {
scanned = append(scanned, e.LSN)
return nil
})
if err != nil {
t.Fatalf("ScanFrom: %v", err)
}
if len(scanned) != 6 {
t.Fatalf("expected 6 entries, got %d: %v", len(scanned), scanned)
}
if scanned[0] != 5 || scanned[5] != 10 {
t.Errorf("expected LSN range [5..10], got %v", scanned)
}
}
func testWALScanEmpty(t *testing.T) {
v := createTestVol(t)
defer v.Close()
var count int
err := v.wal.ScanFrom(v.fd, v.super.WALOffset, 0, 1, func(e *WALEntry) error {
count++
return nil
})
if err != nil {
t.Fatalf("ScanFrom: %v", err)
}
if count != 0 {
t.Errorf("expected 0 entries on empty WAL, got %d", count)
}
}
func testWALScanRecycled(t *testing.T) {
v := createTestVol(t)
defer v.Close()
// Write some entries.
for i := 0; i < 5; i++ {
v.WriteLBA(uint64(i), makeBlock(byte('A'+i)))
}
// Simulate checkpointLSN=3 (entries 1-3 flushed).
err := v.wal.ScanFrom(v.fd, v.super.WALOffset, 3, 2, func(e *WALEntry) error {
return nil
})
if !errors.Is(err, ErrWALRecycled) {
t.Fatalf("expected ErrWALRecycled, got %v", err)
}
}
func testWALScanWrapPadding(t *testing.T) {
// Use a small WAL that will wrap.
dir := t.TempDir()
path := filepath.Join(dir, "test.blockvol")
v, err := CreateBlockVol(path, CreateOptions{
VolumeSize: 64 * 1024, // 64KB
BlockSize: 4096,
WALSize: 4096 * 4, // 16KB WAL — very small, forces wrapping
})
if err != nil {
t.Fatalf("CreateBlockVol: %v", err)
}
defer v.Close()
// Write entries until we wrap. WAL entry for 4KB write = 38 + 4096 = 4134 bytes.
// 16KB WAL can hold ~3 entries before wrapping. Write 2, flush, write 2 more.
v.WriteLBA(0, makeBlock('A'))
v.WriteLBA(1, makeBlock('B'))
// Force flush to free WAL space.
v.flusher.FlushOnce()
// Write more to trigger wrap.
v.WriteLBA(2, makeBlock('C'))
v.WriteLBA(3, makeBlock('D'))
// Scan all entries from LSN=1. We should get whatever is still in WAL.
var scanned []uint64
checkpointLSN := v.flusher.CheckpointLSN()
err = v.wal.ScanFrom(v.fd, v.super.WALOffset, checkpointLSN, checkpointLSN+1, func(e *WALEntry) error {
scanned = append(scanned, e.LSN)
return nil
})
if err != nil {
t.Fatalf("ScanFrom with wrap: %v", err)
}
if len(scanned) == 0 {
t.Fatal("expected entries after wrap, got 0")
}
}
func testWALScanEntryCrossesEnd(t *testing.T) {
// Similar to wrap padding — an entry that would span WAL end triggers padding.
dir := t.TempDir()
path := filepath.Join(dir, "test.blockvol")
v, err := CreateBlockVol(path, CreateOptions{
VolumeSize: 64 * 1024,
BlockSize: 4096,
WALSize: 4096 * 5, // 20KB WAL
})
if err != nil {
t.Fatalf("CreateBlockVol: %v", err)
}
defer v.Close()
// Write 3 entries, flush 2, write 2 more (forces padding + wrap).
v.WriteLBA(0, makeBlock('A'))
v.WriteLBA(1, makeBlock('B'))
v.WriteLBA(2, makeBlock('C'))
v.flusher.FlushOnce()
v.WriteLBA(3, makeBlock('D'))
v.WriteLBA(4, makeBlock('E'))
checkpointLSN := v.flusher.CheckpointLSN()
var scanned []uint64
err = v.wal.ScanFrom(v.fd, v.super.WALOffset, checkpointLSN, checkpointLSN+1, func(e *WALEntry) error {
scanned = append(scanned, e.LSN)
return nil
})
if err != nil {
t.Fatalf("ScanFrom: %v", err)
}
if len(scanned) == 0 {
t.Fatal("expected entries after padding/wrap, got 0")
}
}
// ---------------------------------------------------------------------------
// Phase 4A CP3: Promotion + Demotion tests
// ---------------------------------------------------------------------------
// setupPrimary creates a volume and promotes it to Primary.
func setupPrimary(t *testing.T) *BlockVol {
t.Helper()
v := createTestVol(t)
if err := v.HandleAssignment(1, RolePrimary, 30*time.Second); err != nil {
t.Fatalf("promote to primary: %v", err)
}
return v
}
// setupReplica creates a volume and sets it to Replica role.
func setupReplica(t *testing.T) *BlockVol {
t.Helper()
v := createTestVol(t)
if err := v.HandleAssignment(1, RoleReplica, 0); err != nil {
t.Fatalf("set replica: %v", err)
}
return v
}
func testPromoteReplicaToPrimary(t *testing.T) {
v := setupReplica(t)
defer v.Close()
if err := v.HandleAssignment(2, RolePrimary, 30*time.Second); err != nil {
t.Fatalf("promote: %v", err)
}
if v.Role() != RolePrimary {
t.Errorf("role: got %s, want Primary", v.Role())
}
if v.Epoch() != 2 {
t.Errorf("epoch: got %d, want 2", v.Epoch())
}
// Writes should succeed.
if err := v.WriteLBA(0, makeBlock('A')); err != nil {
t.Errorf("write after promote: %v", err)
}
}
func testPromoteRejectsNonReplica(t *testing.T) {
v := setupPrimary(t)
defer v.Close()
// Primary can't be promoted again.
err := v.HandleAssignment(3, RolePrimary, 30*time.Second)
if err != nil {
t.Errorf("same-role assignment should be a lease refresh, got error: %v", err)
}
// Stale can't be promoted to Primary directly.
v2 := createTestVol(t)
defer v2.Close()
v2.HandleAssignment(1, RolePrimary, 30*time.Second)
v2.HandleAssignment(2, RoleStale, 0)
err = v2.HandleAssignment(3, RolePrimary, 30*time.Second)
if !errors.Is(err, ErrInvalidAssignment) {
t.Errorf("expected ErrInvalidAssignment for Stale→Primary, got: %v", err)
}
}
func testDemotePrimaryToStale(t *testing.T) {
v := setupPrimary(t)
defer v.Close()
if err := v.HandleAssignment(2, RoleStale, 0); err != nil {
t.Fatalf("demote: %v", err)
}
if v.Role() != RoleStale {
t.Errorf("role: got %s, want Stale", v.Role())
}
if v.Epoch() != 2 {
t.Errorf("epoch: got %d, want 2", v.Epoch())
}
// Writes should fail.
err := v.WriteLBA(0, makeBlock('A'))
if err == nil {
t.Error("expected write to fail after demotion")
}
}
func testDemoteDrainsInflightOps(t *testing.T) {
v := setupPrimary(t)
defer v.Close()
// Start a write that will hold an op outstanding.
var wg sync.WaitGroup
started := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
v.beginOp()
close(started)
// Hold the op for a bit.
time.Sleep(50 * time.Millisecond)
v.endOp()
}()
<-started
// Demote should wait for the op to complete.
v.drainTimeout = 2 * time.Second
errCh := make(chan error, 1)
go func() {
errCh <- v.HandleAssignment(2, RoleStale, 0)
}()
wg.Wait()
err := <-errCh
if err != nil {
t.Fatalf("demote: %v", err)
}
if v.Role() != RoleStale {
t.Errorf("role: got %s, want Stale", v.Role())
}
}
func testDemoteStopsShipper(t *testing.T) {
v := setupPrimary(t)
defer v.Close()
// Create a shipper (won't connect but that's fine for this test).
v.shipper = NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 {
return v.epoch.Load()
})
if err := v.HandleAssignment(2, RoleStale, 0); err != nil {
t.Fatalf("demote: %v", err)
}
if !v.shipper.stopped.Load() {
t.Error("shipper should be stopped after demotion")
}
}
func testAssignmentRefreshLease(t *testing.T) {
v := setupPrimary(t)
defer v.Close()
// Same role + same epoch -> refresh lease.
if err := v.HandleAssignment(1, RolePrimary, 1*time.Hour); err != nil {
t.Fatalf("refresh: %v", err)
}
if v.Role() != RolePrimary {
t.Errorf("role: got %s, want Primary", v.Role())
}
if !v.lease.IsValid() {
t.Error("lease should be valid after refresh")
}
// Same role + bumped epoch -> epoch updated, writes still work.
if err := v.HandleAssignment(5, RolePrimary, 1*time.Hour); err != nil {
t.Fatalf("refresh with epoch bump: %v", err)
}
if v.Epoch() != 5 {
t.Errorf("epoch after bump: got %d, want 5", v.Epoch())
}
if err := v.WriteLBA(0, makeBlock('X')); err != nil {
t.Errorf("write after epoch bump refresh: %v", err)
}
}
func testAssignmentInvalidTransition(t *testing.T) {
v := setupReplica(t)
defer v.Close()
// Replica → Stale is invalid.
err := v.HandleAssignment(2, RoleStale, 0)
if !errors.Is(err, ErrInvalidAssignment) {
t.Errorf("expected ErrInvalidAssignment, got: %v", err)
}
}
// ---------------------------------------------------------------------------
// Phase 4A CP3: Rebuild protocol roundtrip
// ---------------------------------------------------------------------------
func testRebuildRequestRoundtrip(t *testing.T) {
req := RebuildRequest{
Type: RebuildWALCatchUp,
FromLSN: 42,
Epoch: 7,
}
buf := EncodeRebuildRequest(req)
decoded, err := DecodeRebuildRequest(buf)
if err != nil {
t.Fatal(err)
}
if decoded.Type != req.Type || decoded.FromLSN != req.FromLSN || decoded.Epoch != req.Epoch {
t.Errorf("roundtrip mismatch: got %+v, want %+v", decoded, req)
}
}
// ---------------------------------------------------------------------------
// Phase 4A CP3: Rebuild Server tests
// ---------------------------------------------------------------------------
func testRebuildServerWALCatchUp(t *testing.T) {
v := setupPrimary(t)
defer v.Close()
// Write some data.
for i := 0; i < 5; i++ {
v.WriteLBA(uint64(i), makeBlock(byte('A'+i)))
}
srv, err := NewRebuildServer(v, "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
srv.Serve()
defer srv.Stop()
// Connect and request catch-up from LSN=1.
conn, err := net.Dial("tcp", srv.Addr())
if err != nil {
t.Fatal(err)
}
defer conn.Close()
req := RebuildRequest{Type: RebuildWALCatchUp, FromLSN: 1, Epoch: v.Epoch()}
WriteFrame(conn, MsgRebuildReq, EncodeRebuildRequest(req))
var entries int
for {
msgType, payload, err := ReadFrame(conn)
if err != nil {
t.Fatal(err)
}
if msgType == MsgRebuildDone {
break
}
if msgType == MsgRebuildEntry {
_, decErr := DecodeWALEntry(payload)
if decErr != nil {
t.Fatalf("decode entry: %v", decErr)
}
entries++
}
if msgType == MsgRebuildError {
t.Fatalf("server error: %s", string(payload))
}
}
if entries != 5 {
t.Errorf("expected 5 entries, got %d", entries)
}
}
func testRebuildServerWALRecycled(t *testing.T) {
v := setupPrimary(t)
defer v.Close()
// Write and flush to advance checkpoint.
for i := 0; i < 5; i++ {
v.WriteLBA(uint64(i), makeBlock(byte('A'+i)))
}
v.flusher.FlushOnce()
srv, err := NewRebuildServer(v, "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
srv.Serve()
defer srv.Stop()
conn, err := net.Dial("tcp", srv.Addr())
if err != nil {
t.Fatal(err)
}
defer conn.Close()
// Request from LSN=1, but checkpoint is past that → WAL_RECYCLED.
req := RebuildRequest{Type: RebuildWALCatchUp, FromLSN: 1, Epoch: v.Epoch()}
WriteFrame(conn, MsgRebuildReq, EncodeRebuildRequest(req))
msgType, payload, err := ReadFrame(conn)
if err != nil {
t.Fatal(err)
}
if msgType != MsgRebuildError {
t.Fatalf("expected MsgRebuildError, got 0x%02x", msgType)
}
if string(payload) != "WAL_RECYCLED" {
t.Errorf("expected WAL_RECYCLED error, got: %s", string(payload))
}
}
func testRebuildServerFullExtent(t *testing.T) {
v := setupPrimary(t)
defer v.Close()
// Write and flush so data is in extent.
v.WriteLBA(0, makeBlock('X'))
v.flusher.FlushOnce()
srv, err := NewRebuildServer(v, "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
srv.Serve()
defer srv.Stop()
conn, err := net.Dial("tcp", srv.Addr())
if err != nil {
t.Fatal(err)
}
defer conn.Close()
req := RebuildRequest{Type: RebuildFullExtent, Epoch: v.Epoch()}
WriteFrame(conn, MsgRebuildReq, EncodeRebuildRequest(req))
var totalBytes int
for {
msgType, payload, err := ReadFrame(conn)
if err != nil {
t.Fatal(err)
}
if msgType == MsgRebuildDone {
break
}
if msgType == MsgRebuildExtent {
totalBytes += len(payload)
}
if msgType == MsgRebuildError {
t.Fatalf("server error: %s", string(payload))
}
}
if uint64(totalBytes) != v.super.VolumeSize {
t.Errorf("expected %d bytes, got %d", v.super.VolumeSize, totalBytes)
}
}
func testRebuildServerEpochMismatch(t *testing.T) {
v := setupPrimary(t)
defer v.Close()
srv, err := NewRebuildServer(v, "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
srv.Serve()
defer srv.Stop()
conn, err := net.Dial("tcp", srv.Addr())
if err != nil {
t.Fatal(err)
}
defer conn.Close()
// Wrong epoch.
req := RebuildRequest{Type: RebuildWALCatchUp, FromLSN: 1, Epoch: 999}
WriteFrame(conn, MsgRebuildReq, EncodeRebuildRequest(req))
msgType, payload, err := ReadFrame(conn)
if err != nil {
t.Fatal(err)
}
if msgType != MsgRebuildError {
t.Fatalf("expected MsgRebuildError, got 0x%02x", msgType)
}
if string(payload) != "EPOCH_MISMATCH" {
t.Errorf("expected EPOCH_MISMATCH, got: %s", string(payload))
}
}
// ---------------------------------------------------------------------------
// Phase 4A CP3: Rebuild Client tests
// ---------------------------------------------------------------------------
// setupRebuilding creates a volume in RoleRebuilding state with the given epoch.
func setupRebuilding(t *testing.T, epoch uint64) *BlockVol {
t.Helper()
v := createTestVol(t)
// Path: None → Primary → Stale → Rebuilding
if err := v.HandleAssignment(epoch, RolePrimary, 30*time.Second); err != nil {
t.Fatalf("setup rebuilding: promote: %v", err)
}
if err := v.HandleAssignment(epoch, RoleStale, 0); err != nil {
t.Fatalf("setup rebuilding: demote: %v", err)
}
if err := v.HandleAssignment(epoch, RoleRebuilding, 0); err != nil {
t.Fatalf("setup rebuilding: set rebuilding: %v", err)
}
return v
}
func testRebuildWALCatchUpHappy(t *testing.T) {
primary := setupPrimary(t)
defer primary.Close()
for i := 0; i < 5; i++ {
primary.WriteLBA(uint64(i), makeBlock(byte('A'+i)))
}
srv, err := NewRebuildServer(primary, "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
srv.Serve()
defer srv.Stop()
stale := setupRebuilding(t, primary.Epoch())
defer stale.Close()
if err := StartRebuild(stale, srv.Addr(), 1, primary.Epoch()); err != nil {
t.Fatalf("StartRebuild: %v", err)
}
if stale.Role() != RoleReplica {
t.Errorf("role after rebuild: got %s, want Replica", stale.Role())
}
}
func testRebuildWALCatchUpToReplica(t *testing.T) {
primary := setupPrimary(t)
defer primary.Close()
primary.WriteLBA(0, makeBlock('Z'))
srv, err := NewRebuildServer(primary, "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
srv.Serve()
defer srv.Stop()
stale := setupRebuilding(t, primary.Epoch())
defer stale.Close()
if err := StartRebuild(stale, srv.Addr(), 1, primary.Epoch()); err != nil {
t.Fatalf("StartRebuild: %v", err)
}
// After rebuild, reads should work.
data, err := stale.ReadLBA(0, 4096)
if err != nil {
t.Fatalf("ReadLBA: %v", err)
}
if data[0] != 'Z' {
t.Errorf("data mismatch: got %c, want Z", data[0])
}
}
func testRebuildFallbackFullExtent(t *testing.T) {
primary := setupPrimary(t)
defer primary.Close()
// Write and flush so WAL is recycled.
primary.WriteLBA(0, makeBlock('M'))
primary.flusher.FlushOnce()
srv, err := NewRebuildServer(primary, "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
srv.Serve()
defer srv.Stop()
stale := setupRebuilding(t, primary.Epoch())
defer stale.Close()
// Request catch-up from LSN=1, which is recycled → falls back to full extent.
if err := StartRebuild(stale, srv.Addr(), 1, primary.Epoch()); err != nil {
t.Fatalf("StartRebuild (fallback): %v", err)
}
if stale.Role() != RoleReplica {
t.Errorf("role: got %s, want Replica", stale.Role())
}
// Verify data matches.
data, err := stale.ReadLBA(0, 4096)
if err != nil {
t.Fatalf("ReadLBA: %v", err)
}
if data[0] != 'M' {
t.Errorf("data mismatch: got %c, want M", data[0])
}
}
func testRebuildFullExtentDataCorrect(t *testing.T) {
primary := setupPrimary(t)
defer primary.Close()
// Write several blocks and flush.
for i := 0; i < 10; i++ {
primary.WriteLBA(uint64(i), makeBlock(byte('0'+i)))
}
primary.flusher.FlushOnce()
srv, err := NewRebuildServer(primary, "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
srv.Serve()
defer srv.Stop()
stale := setupRebuilding(t, primary.Epoch())
defer stale.Close()
// Trigger full extent (LSN=1 recycled after flush).
if err := StartRebuild(stale, srv.Addr(), 1, primary.Epoch()); err != nil {
t.Fatalf("StartRebuild: %v", err)
}
// Verify all blocks.
for i := 0; i < 10; i++ {
data, err := stale.ReadLBA(uint64(i), 4096)
if err != nil {
t.Fatalf("ReadLBA(%d): %v", i, err)
}
if data[0] != byte('0'+i) {
t.Errorf("block %d: got %c, want %c", i, data[0], byte('0'+i))
}
}
}
func testRebuildFullExtentResetsDirtyMap(t *testing.T) {
primary := setupPrimary(t)
defer primary.Close()
primary.WriteLBA(0, makeBlock('A'))
primary.flusher.FlushOnce()
srv, err := NewRebuildServer(primary, "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
srv.Serve()
defer srv.Stop()
stale := setupRebuilding(t, primary.Epoch())
defer stale.Close()
// Write some data directly to WAL to create dirty map entries on the stale volume.
// (Can't use WriteLBA since role is Rebuilding, not Primary.)
entry := &WALEntry{LSN: 100, Epoch: primary.Epoch(), Type: EntryTypeWrite, LBA: 5, Length: 4096, Data: makeBlock('Z')}
walOff, _ := stale.wal.Append(entry)
stale.dirtyMap.Put(5, walOff, 100, 4096)
if stale.dirtyMap.Len() == 0 {
t.Fatal("expected dirty entries before rebuild")
}
if err := StartRebuild(stale, srv.Addr(), 1, primary.Epoch()); err != nil {
t.Fatalf("StartRebuild: %v", err)
}
// After full extent rebuild, dirty map should be cleared.
if stale.dirtyMap.Len() != 0 {
t.Errorf("dirty map should be empty after full extent rebuild, got %d entries", stale.dirtyMap.Len())
}
}
// ---------------------------------------------------------------------------
// Phase 4A CP3: Split-brain tests
// ---------------------------------------------------------------------------
func testSplitBrainDeadZone(t *testing.T) {
// After demote (old primary), before promote (new primary) — no node accepts writes.
oldPrimary := setupPrimary(t)
defer oldPrimary.Close()
newReplica := setupReplica(t)
defer newReplica.Close()
// Demote old primary.
if err := oldPrimary.HandleAssignment(2, RoleStale, 0); err != nil {
t.Fatalf("demote: %v", err)
}
// Old primary can't write.
if err := oldPrimary.WriteLBA(0, makeBlock('A')); err == nil {
t.Error("old primary should reject writes after demotion")
}
// New replica hasn't been promoted yet — can't write.
if err := newReplica.WriteLBA(0, makeBlock('B')); err == nil {
t.Error("replica should reject writes before promotion")
}
}
func testSplitBrainStalePrimaryFenced(t *testing.T) {
v := setupPrimary(t)
defer v.Close()
// Let lease expire.
v.lease.Grant(1 * time.Millisecond)
time.Sleep(5 * time.Millisecond)
err := v.WriteLBA(0, makeBlock('A'))
if !errors.Is(err, ErrLeaseExpired) {
t.Errorf("expected ErrLeaseExpired, got: %v", err)
}
}
func testSplitBrainEpochRejectsStaleWrite(t *testing.T) {
v := setupPrimary(t)
defer v.Close()
// Simulate master bumping epoch without this node knowing.
v.masterEpoch.Store(99)
err := v.WriteLBA(0, makeBlock('A'))
if !errors.Is(err, ErrEpochStale) {
t.Errorf("expected ErrEpochStale, got: %v", err)
}
}
func testSplitBrainNoSelfPromotion(t *testing.T) {
v := createTestVol(t)
defer v.Close()
// Set to Replica.
v.HandleAssignment(1, RoleReplica, 0)
// Try direct SetRole without going through HandleAssignment.
// This should work because SetRole itself is valid (Replica→Primary),
// but without setting epoch/lease, writes will fail.
if err := v.SetRole(RolePrimary); err != nil {
t.Fatalf("SetRole: %v", err)
}
// Writes fail because epoch/masterEpoch mismatch (self-promotion
// didn't set masterEpoch).
err := v.WriteLBA(0, makeBlock('A'))
if err == nil {
t.Error("self-promotion without proper assignment should fail writes")
}
}
func testSplitBrainConcurrentAssignment(t *testing.T) {
v := createTestVol(t)
defer v.Close()
// Set up as Replica first.
v.HandleAssignment(1, RoleReplica, 0)
// Concurrent assignment attempts: promote + invalid.
var wg sync.WaitGroup
results := make([]error, 2)
wg.Add(2)
go func() {
defer wg.Done()
results[0] = v.HandleAssignment(2, RolePrimary, 30*time.Second)
}()
go func() {
defer wg.Done()
results[1] = v.HandleAssignment(3, RolePrimary, 30*time.Second)
}()
wg.Wait()
// With assignMu serialization, one should succeed and the other
// should either succeed (refresh on already-promoted) or fail.
// The key guarantee is no panic and consistent state.
if v.Role() != RolePrimary {
t.Errorf("role: got %s, want Primary after concurrent assignments", v.Role())
}
}
// ---------------------------------------------------------------------------
// Phase 4A CP3: Lifecycle tests
// ---------------------------------------------------------------------------
func testBlockvolFullLifecycle(t *testing.T) {
// Primary writes, promotes replica, demotes old primary, new primary serves writes.
primary := setupPrimary(t)
defer primary.Close()
replica := setupReplica(t)
defer replica.Close()
// Primary writes.
if err := primary.WriteLBA(0, makeBlock('A')); err != nil {
t.Fatalf("primary write: %v", err)
}
// Demote old primary.
if err := primary.HandleAssignment(2, RoleStale, 0); err != nil {
t.Fatalf("demote: %v", err)
}
// Promote replica.
if err := replica.HandleAssignment(2, RolePrimary, 30*time.Second); err != nil {
t.Fatalf("promote replica: %v", err)
}
// New primary can write.
if err := replica.WriteLBA(1, makeBlock('B')); err != nil {
t.Fatalf("new primary write: %v", err)
}
// Old primary can't write.
if err := primary.WriteLBA(2, makeBlock('C')); err == nil {
t.Error("old primary should reject writes")
}
}
func testBlockvolRebuildLifecycle(t *testing.T) {
primary := setupPrimary(t)
defer primary.Close()
// Write data.
primary.WriteLBA(0, makeBlock('R'))
primary.WriteLBA(1, makeBlock('S'))
srv, err := NewRebuildServer(primary, "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
srv.Serve()
defer srv.Stop()
// Create stale and rebuild.
stale := setupRebuilding(t, primary.Epoch())
defer stale.Close()
if err := StartRebuild(stale, srv.Addr(), 1, primary.Epoch()); err != nil {
t.Fatalf("rebuild: %v", err)
}
// Verify data.
data, err := stale.ReadLBA(0, 4096)
if err != nil {
t.Fatalf("ReadLBA(0): %v", err)
}
if data[0] != 'R' {
t.Errorf("block 0: got %c, want R", data[0])
}
data, err = stale.ReadLBA(1, 4096)
if err != nil {
t.Fatalf("ReadLBA(1): %v", err)
}
if data[0] != 'S' {
t.Errorf("block 1: got %c, want S", data[0])
}
}
// Suppress unused import warnings.
var _ = fmt.Sprintf
var _ io.Reader

10
weed/storage/blockvol/dirty_map.go

@ -105,6 +105,16 @@ func (d *DirtyMap) Range(start uint64, count uint32, fn func(lba, walOffset, lsn
}
}
// Clear removes all entries from the dirty map.
func (d *DirtyMap) Clear() {
for i := range d.shards {
s := &d.shards[i]
s.mu.Lock()
s.m = make(map[uint64]dirtyEntry)
s.mu.Unlock()
}
}
// Len returns the number of dirty entries across all shards.
func (d *DirtyMap) Len() int {
n := 0

114
weed/storage/blockvol/promotion.go

@ -0,0 +1,114 @@
package blockvol
import (
"errors"
"fmt"
"time"
)
var (
ErrInvalidAssignment = errors.New("blockvol: invalid assignment transition")
ErrDrainTimeout = errors.New("blockvol: drain timeout waiting for in-flight ops")
)
const defaultDrainTimeout = 10 * time.Second
// HandleAssignment processes a role/epoch/lease assignment from master.
// Serialized with vol.assignMu to prevent concurrent assignment races.
func HandleAssignment(vol *BlockVol, newEpoch uint64, newRole Role, leaseTTL time.Duration) error {
vol.assignMu.Lock()
defer vol.assignMu.Unlock()
current := vol.Role()
// Same role -> refresh lease and update epoch if bumped.
if current == newRole {
if newEpoch > vol.Epoch() {
if err := vol.SetEpoch(newEpoch); err != nil {
return fmt.Errorf("assignment refresh: set epoch: %w", err)
}
vol.SetMasterEpoch(newEpoch)
}
if current == RolePrimary {
vol.lease.Grant(leaseTTL)
}
return nil
}
switch {
case current == RoleReplica && newRole == RolePrimary:
return promote(vol, newEpoch, leaseTTL)
case current == RolePrimary && newRole == RoleStale:
return demote(vol, newEpoch)
case current == RoleStale && newRole == RoleRebuilding:
// Rebuild started externally via StartRebuild.
return vol.SetRole(RoleRebuilding)
case current == RoleNone && newRole == RolePrimary:
return promote(vol, newEpoch, leaseTTL)
case current == RoleNone && newRole == RoleReplica:
vol.SetMasterEpoch(newEpoch)
return vol.SetRole(RoleReplica)
default:
return fmt.Errorf("%w: %s -> %s", ErrInvalidAssignment, current, newRole)
}
}
// promote transitions Replica/None -> Primary.
// Order matters: epoch durable BEFORE writes possible.
func promote(vol *BlockVol, newEpoch uint64, leaseTTL time.Duration) error {
if err := vol.SetEpoch(newEpoch); err != nil {
return fmt.Errorf("promote: set epoch: %w", err)
}
vol.SetMasterEpoch(newEpoch)
if err := vol.SetRole(RolePrimary); err != nil {
return fmt.Errorf("promote: set role: %w", err)
}
vol.lease.Grant(leaseTTL)
return nil
}
// demote transitions Primary -> Draining -> Stale.
// Revokes lease first, drains in-flight ops, then persists new epoch.
func demote(vol *BlockVol, newEpoch uint64) error {
// Revoke lease — writeGate blocks new writes immediately.
vol.lease.Revoke()
// Transition to Draining.
if err := vol.SetRole(RoleDraining); err != nil {
return fmt.Errorf("demote: set draining: %w", err)
}
// Wait for in-flight ops to drain.
drainTTL := vol.drainTimeout
if drainTTL == 0 {
drainTTL = defaultDrainTimeout
}
deadline := time.NewTimer(drainTTL)
defer deadline.Stop()
ticker := time.NewTicker(1 * time.Millisecond)
defer ticker.Stop()
for vol.opsOutstanding.Load() > 0 {
select {
case <-deadline.C:
return ErrDrainTimeout
case <-ticker.C:
}
}
// Stop shipper if present.
if vol.shipper != nil {
vol.shipper.Stop()
}
// Transition Draining -> Stale.
if err := vol.SetRole(RoleStale); err != nil {
return fmt.Errorf("demote: set stale: %w", err)
}
// Persist new epoch.
if err := vol.SetEpoch(newEpoch); err != nil {
return fmt.Errorf("demote: set epoch: %w", err)
}
vol.SetMasterEpoch(newEpoch)
return nil
}

438
weed/storage/blockvol/rebuild.go

@ -0,0 +1,438 @@
package blockvol
import (
"encoding/binary"
"errors"
"fmt"
"io"
"log"
"net"
"sync"
)
const rebuildExtentChunkSize = 64 * 1024 // 64KB chunks for extent streaming
// ---------------------------------------------------------------------------
// Rebuild Server (primary side)
// ---------------------------------------------------------------------------
// RebuildServer serves WAL catch-up and full extent data to rebuilding replicas.
type RebuildServer struct {
vol *BlockVol
listener net.Listener
stopCh chan struct{}
wg sync.WaitGroup
}
// NewRebuildServer creates a rebuild server listening on addr.
func NewRebuildServer(vol *BlockVol, addr string) (*RebuildServer, error) {
ln, err := net.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("rebuild: listen %s: %w", addr, err)
}
return &RebuildServer{
vol: vol,
listener: ln,
stopCh: make(chan struct{}),
}, nil
}
// Serve starts accepting rebuild connections.
func (s *RebuildServer) Serve() {
s.wg.Add(1)
go s.acceptLoop()
}
// Stop shuts down the rebuild server.
func (s *RebuildServer) Stop() {
select {
case <-s.stopCh:
return // already stopped
default:
}
close(s.stopCh)
s.listener.Close()
s.wg.Wait()
}
// Addr returns the listener's address.
func (s *RebuildServer) Addr() string {
return s.listener.Addr().String()
}
func (s *RebuildServer) acceptLoop() {
defer s.wg.Done()
for {
conn, err := s.listener.Accept()
if err != nil {
select {
case <-s.stopCh:
return
default:
log.Printf("rebuild: accept error: %v", err)
return
}
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.handleConn(conn)
}()
}
}
func (s *RebuildServer) handleConn(conn net.Conn) {
defer conn.Close()
msgType, payload, err := ReadFrame(conn)
if err != nil {
log.Printf("rebuild: read request: %v", err)
return
}
if msgType != MsgRebuildReq {
log.Printf("rebuild: unexpected message type 0x%02x", msgType)
return
}
req, err := DecodeRebuildRequest(payload)
if err != nil {
log.Printf("rebuild: decode request: %v", err)
return
}
// Validate epoch.
if req.Epoch != s.vol.epoch.Load() {
WriteFrame(conn, MsgRebuildError, []byte("EPOCH_MISMATCH"))
return
}
switch req.Type {
case RebuildWALCatchUp:
s.handleWALCatchUp(conn, req)
case RebuildFullExtent:
s.handleFullExtent(conn)
default:
WriteFrame(conn, MsgRebuildError, []byte("UNKNOWN_TYPE"))
}
}
func (s *RebuildServer) handleWALCatchUp(conn net.Conn, req RebuildRequest) {
checkpointLSN := s.vol.flusher.CheckpointLSN()
err := s.vol.wal.ScanFrom(s.vol.fd, s.vol.super.WALOffset,
checkpointLSN, req.FromLSN, func(entry *WALEntry) error {
encoded, err := entry.Encode()
if err != nil {
return err
}
return WriteFrame(conn, MsgRebuildEntry, encoded)
})
if errors.Is(err, ErrWALRecycled) {
WriteFrame(conn, MsgRebuildError, []byte("WAL_RECYCLED"))
return
}
if err != nil {
WriteFrame(conn, MsgRebuildError, []byte(err.Error()))
return
}
// Send done with the current nextLSN as the snapshot point.
// The client uses this to know where to start a second catch-up
// after a full extent copy.
lsnBuf := make([]byte, 8)
binary.BigEndian.PutUint64(lsnBuf, s.vol.nextLSN.Load())
WriteFrame(conn, MsgRebuildDone, lsnBuf)
}
func (s *RebuildServer) handleFullExtent(conn net.Conn) {
// Capture snapshot LSN before streaming — client will use this
// for a second catch-up scan to capture writes during copy.
snapshotLSN := s.vol.nextLSN.Load()
extentStart := s.vol.super.WALOffset + s.vol.super.WALSize
volumeSize := s.vol.super.VolumeSize
buf := make([]byte, rebuildExtentChunkSize)
var offset uint64
for offset < volumeSize {
chunkSize := uint64(rebuildExtentChunkSize)
if offset+chunkSize > volumeSize {
chunkSize = volumeSize - offset
}
n, err := s.vol.fd.ReadAt(buf[:chunkSize], int64(extentStart+offset))
if err != nil && err != io.EOF {
WriteFrame(conn, MsgRebuildError, []byte(err.Error()))
return
}
if err := WriteFrame(conn, MsgRebuildExtent, buf[:n]); err != nil {
return
}
offset += uint64(n)
}
// Send done with snapshot LSN.
lsnBuf := make([]byte, 8)
binary.BigEndian.PutUint64(lsnBuf, snapshotLSN)
WriteFrame(conn, MsgRebuildDone, lsnBuf)
}
// ---------------------------------------------------------------------------
// Rebuild Client (rebuilding replica side)
// ---------------------------------------------------------------------------
// StartRebuild initiates rebuild from primary. The volume must already be
// in RoleRebuilding (set via HandleAssignment). On success, transitions
// the volume to RoleReplica.
func StartRebuild(vol *BlockVol, primaryAddr string, fromLSN uint64, epoch uint64) error {
if vol.Role() != RoleRebuilding {
return fmt.Errorf("rebuild: expected role Rebuilding, got %s", vol.Role())
}
conn, err := net.Dial("tcp", primaryAddr)
if err != nil {
return fmt.Errorf("rebuild: connect to %s: %w", primaryAddr, err)
}
defer conn.Close()
// Try WAL catch-up first.
req := RebuildRequest{
Type: RebuildWALCatchUp,
FromLSN: fromLSN,
Epoch: epoch,
}
if err := WriteFrame(conn, MsgRebuildReq, EncodeRebuildRequest(req)); err != nil {
return fmt.Errorf("rebuild: send request: %w", err)
}
// Read responses.
var snapshotLSN uint64
for {
msgType, payload, err := ReadFrame(conn)
if err != nil {
return fmt.Errorf("rebuild: read response: %w", err)
}
switch msgType {
case MsgRebuildEntry:
if err := applyRebuildEntry(vol, payload); err != nil {
return fmt.Errorf("rebuild: apply entry: %w", err)
}
case MsgRebuildDone:
if len(payload) >= 8 {
snapshotLSN = binary.BigEndian.Uint64(payload[:8])
}
goto catchUpDone
case MsgRebuildError:
errMsg := string(payload)
if errMsg == "WAL_RECYCLED" {
// Fall back to full extent rebuild.
conn.Close()
return rebuildFullExtent(vol, primaryAddr, epoch)
}
return fmt.Errorf("rebuild: server error: %s", errMsg)
default:
return fmt.Errorf("rebuild: unexpected message type 0x%02x", msgType)
}
}
catchUpDone:
conn.Close()
// Second catch-up: capture writes that arrived on the primary after
// the scan snapshot. Without this, those writes are lost.
if err := rebuildSecondCatchUp(vol, primaryAddr, snapshotLSN, epoch); err != nil {
return err
}
return vol.SetRole(RoleReplica)
}
// rebuildFullExtent streams the full extent from primary, then does a
// second WAL catch-up to capture writes during the copy.
func rebuildFullExtent(vol *BlockVol, primaryAddr string, epoch uint64) error {
conn, err := net.Dial("tcp", primaryAddr)
if err != nil {
return fmt.Errorf("rebuild: connect for full extent: %w", err)
}
defer conn.Close()
req := RebuildRequest{
Type: RebuildFullExtent,
Epoch: epoch,
}
if err := WriteFrame(conn, MsgRebuildReq, EncodeRebuildRequest(req)); err != nil {
return fmt.Errorf("rebuild: send extent request: %w", err)
}
extentStart := vol.super.WALOffset + vol.super.WALSize
var offset uint64
var snapshotLSN uint64
for {
msgType, payload, err := ReadFrame(conn)
if err != nil {
return fmt.Errorf("rebuild: read extent response: %w", err)
}
switch msgType {
case MsgRebuildExtent:
if _, err := vol.fd.WriteAt(payload, int64(extentStart+offset)); err != nil {
return fmt.Errorf("rebuild: write extent at offset %d: %w", offset, err)
}
offset += uint64(len(payload))
case MsgRebuildDone:
if len(payload) >= 8 {
snapshotLSN = binary.BigEndian.Uint64(payload[:8])
}
goto extentDone
case MsgRebuildError:
return fmt.Errorf("rebuild: server error during extent: %s", string(payload))
default:
return fmt.Errorf("rebuild: unexpected message type 0x%02x during extent", msgType)
}
}
extentDone:
// Fsync the extent data.
if err := vol.fd.Sync(); err != nil {
return fmt.Errorf("rebuild: fsync extent: %w", err)
}
// Clear dirty map -- all data now in extent, stale dirty entries invalid.
vol.dirtyMap.Clear()
// Reset WAL state -- no valid WAL entries for old data.
vol.wal.mu.Lock()
vol.wal.logicalHead = 0
vol.wal.logicalTail = 0
vol.wal.mu.Unlock()
// Persist clean superblock state so crash recovery doesn't replay stale WAL.
checkpointLSN := uint64(0)
if snapshotLSN > 0 {
checkpointLSN = snapshotLSN - 1
}
vol.mu.Lock()
vol.super.WALHead = 0
vol.super.WALTail = 0
vol.super.WALCheckpointLSN = checkpointLSN
if _, err := vol.fd.Seek(0, 0); err != nil {
vol.mu.Unlock()
return fmt.Errorf("rebuild: seek superblock: %w", err)
}
if _, err := vol.super.WriteTo(vol.fd); err != nil {
vol.mu.Unlock()
return fmt.Errorf("rebuild: write superblock: %w", err)
}
if err := vol.fd.Sync(); err != nil {
vol.mu.Unlock()
return fmt.Errorf("rebuild: sync superblock: %w", err)
}
vol.mu.Unlock()
conn.Close()
// Second catch-up scan: capture writes during extent copy.
if err := rebuildSecondCatchUp(vol, primaryAddr, snapshotLSN, epoch); err != nil {
return err
}
return vol.SetRole(RoleReplica)
}
// rebuildSecondCatchUp connects to the primary and streams WAL entries
// from snapshotLSN to capture writes that arrived after the initial scan
// or extent copy. Shared by both WAL catch-up and full-extent paths.
func rebuildSecondCatchUp(vol *BlockVol, primaryAddr string, snapshotLSN uint64, epoch uint64) error {
if snapshotLSN == 0 {
return nil
}
conn, err := net.Dial("tcp", primaryAddr)
if err != nil {
return fmt.Errorf("rebuild: connect for second catch-up: %w", err)
}
defer conn.Close()
req := RebuildRequest{
Type: RebuildWALCatchUp,
FromLSN: snapshotLSN,
Epoch: epoch,
}
if err := WriteFrame(conn, MsgRebuildReq, EncodeRebuildRequest(req)); err != nil {
return fmt.Errorf("rebuild: send second catch-up request: %w", err)
}
for {
msgType, payload, err := ReadFrame(conn)
if err != nil {
return fmt.Errorf("rebuild: read second catch-up: %w", err)
}
switch msgType {
case MsgRebuildEntry:
if err := applyRebuildEntry(vol, payload); err != nil {
return fmt.Errorf("rebuild: apply second catch-up entry: %w", err)
}
case MsgRebuildDone:
return nil
case MsgRebuildError:
return fmt.Errorf("rebuild: server error during second catch-up: %s", string(payload))
default:
return fmt.Errorf("rebuild: unexpected message type 0x%02x during second catch-up", msgType)
}
}
}
// applyRebuildEntry decodes and applies a WAL entry during rebuild.
// Unlike ReplicaReceiver.applyEntry, no contiguous LSN enforcement
// (catch-up entries arrive in order but may have gaps from flushed entries).
func applyRebuildEntry(vol *BlockVol, payload []byte) error {
entry, err := DecodeWALEntry(payload)
if err != nil {
return fmt.Errorf("decode: %w", err)
}
walOff, err := vol.wal.Append(&entry)
if errors.Is(err, ErrWALFull) {
// Trigger flusher and retry.
if vol.flusher != nil {
vol.flusher.NotifyUrgent()
}
for i := 0; i < 100 && errors.Is(err, ErrWALFull); i++ {
if vol.flusher != nil {
vol.flusher.NotifyUrgent()
}
walOff, err = vol.wal.Append(&entry)
}
}
if err != nil {
return fmt.Errorf("WAL append: %w", err)
}
switch entry.Type {
case EntryTypeWrite, EntryTypeTrim:
blocks := entry.Length / vol.super.BlockSize
for i := uint32(0); i < blocks; i++ {
vol.dirtyMap.Put(entry.LBA+uint64(i), walOff, entry.LSN, vol.super.BlockSize)
}
}
// Update nextLSN if this entry has a higher LSN.
for {
current := vol.nextLSN.Load()
next := entry.LSN + 1
if next <= current {
break
}
if vol.nextLSN.CompareAndSwap(current, next) {
break
}
}
return nil
}

43
weed/storage/blockvol/repl_proto.go

@ -85,6 +85,49 @@ func ReadFrame(r io.Reader) (msgType byte, payload []byte, err error) {
return msgType, payload, nil
}
// Rebuild message types (on rebuild channel).
const (
MsgRebuildReq byte = 0x10 // client → server
MsgRebuildEntry byte = 0x11 // server → client: WAL entry
MsgRebuildExtent byte = 0x12 // server → client: extent chunk
MsgRebuildDone byte = 0x13 // server → client: stream complete
MsgRebuildError byte = 0x14 // server → client: error
)
// Rebuild request types.
const (
RebuildWALCatchUp byte = 0x01
RebuildFullExtent byte = 0x02
)
// RebuildRequest is sent by the rebuilding replica to the primary.
type RebuildRequest struct {
Type byte // RebuildWALCatchUp or RebuildFullExtent
FromLSN uint64
Epoch uint64
}
// EncodeRebuildRequest serializes a RebuildRequest (1+8+8 = 17 bytes).
func EncodeRebuildRequest(req RebuildRequest) []byte {
buf := make([]byte, 17)
buf[0] = req.Type
binary.BigEndian.PutUint64(buf[1:9], req.FromLSN)
binary.BigEndian.PutUint64(buf[9:17], req.Epoch)
return buf
}
// DecodeRebuildRequest deserializes a RebuildRequest.
func DecodeRebuildRequest(buf []byte) (RebuildRequest, error) {
if len(buf) < 17 {
return RebuildRequest{}, fmt.Errorf("repl: rebuild request too short: %d bytes", len(buf))
}
return RebuildRequest{
Type: buf[0],
FromLSN: binary.BigEndian.Uint64(buf[1:9]),
Epoch: binary.BigEndian.Uint64(buf[9:17]),
}, nil
}
// EncodeBarrierRequest serializes a BarrierRequest (4+8+8 = 20 bytes).
func EncodeBarrierRequest(req BarrierRequest) []byte {
buf := make([]byte, 20)

92
weed/storage/blockvol/wal_writer.go

@ -10,7 +10,8 @@ import (
)
var (
ErrWALFull = errors.New("blockvol: WAL region full")
ErrWALFull = errors.New("blockvol: WAL region full")
ErrWALRecycled = errors.New("blockvol: WAL entries recycled past requested LSN")
)
// WALWriter appends entries to the circular WAL region of a blockvol file.
@ -198,6 +199,95 @@ func (w *WALWriter) UsedFraction() float64 {
return float64(u) / float64(s)
}
// ScanFrom reads WAL entries starting at the first entry with LSN >= fromLSN.
// Calls fn for each valid WRITE or TRIM entry. Returns ErrWALRecycled if
// fromLSN is below checkpointLSN (those entries have been flushed to extent
// and the WAL space may have been reused).
func (w *WALWriter) ScanFrom(fd *os.File, walOffset uint64,
checkpointLSN uint64, fromLSN uint64, fn func(*WALEntry) error) error {
if fromLSN <= checkpointLSN && checkpointLSN > 0 {
return ErrWALRecycled
}
// Snapshot logical positions under lock.
w.mu.Lock()
logicalTail := w.logicalTail
logicalHead := w.logicalHead
walSize := w.walSize
w.mu.Unlock()
if logicalHead == logicalTail {
return nil // empty WAL
}
pos := logicalTail
for pos < logicalHead {
physPos := pos % walSize
remaining := walSize - physPos
// Need at least a header to proceed.
if remaining < uint64(walEntryHeaderSize) {
// Too small for a header — skip padding at end of region.
pos += remaining
continue
}
// Read header.
headerBuf := make([]byte, walEntryHeaderSize)
absOff := int64(walOffset + physPos)
if _, err := fd.ReadAt(headerBuf, absOff); err != nil {
return fmt.Errorf("ScanFrom: read header at WAL+%d: %w", physPos, err)
}
entryType := headerBuf[16]
lengthField := binary.LittleEndian.Uint32(headerBuf[26:])
// Calculate entry size based on type.
var payloadLen uint64
switch entryType {
case EntryTypePadding:
entrySize := uint64(walEntryHeaderSize) + uint64(lengthField)
pos += entrySize
continue
case EntryTypeWrite:
payloadLen = uint64(lengthField)
default:
// TRIM, BARRIER: no data payload
}
entrySize := uint64(walEntryHeaderSize) + payloadLen
// Read full entry.
fullBuf := make([]byte, entrySize)
if physPos+entrySize <= walSize {
if _, err := fd.ReadAt(fullBuf, absOff); err != nil {
return fmt.Errorf("ScanFrom: read entry at WAL+%d: %w", physPos, err)
}
} else {
// Entry should not span WAL boundary (padding prevents this),
// but guard against it.
return fmt.Errorf("ScanFrom: entry at WAL+%d spans boundary", physPos)
}
entry, err := DecodeWALEntry(fullBuf)
if err != nil {
// CRC failure — stop scanning (torn write).
return nil
}
if entry.LSN >= fromLSN && (entry.Type == EntryTypeWrite || entry.Type == EntryTypeTrim) {
if err := fn(&entry); err != nil {
return err
}
}
pos += entrySize
}
return nil
}
// Sync fsyncs the underlying file descriptor.
func (w *WALWriter) Sync() error {
return w.fd.Sync()

Loading…
Cancel
Save