diff --git a/weed/storage/blockvol/blockvol.go b/weed/storage/blockvol/blockvol.go index dcf276a11..ec64c07b3 100644 --- a/weed/storage/blockvol/blockvol.go +++ b/weed/storage/blockvol/blockvol.go @@ -53,6 +53,11 @@ type BlockVol struct { lease Lease role atomic.Uint32 roleCallback RoleChangeCallback + + // Promotion/rebuild fields (Phase 4A CP3). + rebuildServer *RebuildServer + assignMu sync.Mutex // serializes HandleAssignment calls + drainTimeout time.Duration // default 10s, for demote drain } // CreateBlockVol creates a new block volume file at path. @@ -547,6 +552,30 @@ func (v *BlockVol) StartReplicaReceiver(dataAddr, ctrlAddr string) error { return nil } +// HandleAssignment processes a role/epoch/lease assignment from master. +func (v *BlockVol) HandleAssignment(epoch uint64, role Role, leaseTTL time.Duration) error { + return HandleAssignment(v, epoch, role, leaseTTL) +} + +// StartRebuildServer creates and starts a rebuild server on the given address. +func (v *BlockVol) StartRebuildServer(addr string) error { + srv, err := NewRebuildServer(v, addr) + if err != nil { + return err + } + v.rebuildServer = srv + srv.Serve() + return nil +} + +// StopRebuildServer stops the rebuild server if running. +func (v *BlockVol) StopRebuildServer() { + if v.rebuildServer != nil { + v.rebuildServer.Stop() + v.rebuildServer = nil + } +} + // degradeReplica marks the shipper as degraded and logs a warning. func (v *BlockVol) degradeReplica(err error) { if v.shipper != nil { @@ -556,7 +585,7 @@ func (v *BlockVol) degradeReplica(err error) { } // Close shuts down the block volume and closes the file. -// Shutdown order: shipper → replica receiver → drain ops → group committer → flusher → final flush → close fd. +// Shutdown order: shipper -> replica receiver -> rebuild server -> drain ops -> group committer -> flusher -> final flush -> close fd. 
func (v *BlockVol) Close() error { v.closed.Store(true) @@ -568,6 +597,8 @@ func (v *BlockVol) Close() error { if v.replRecv != nil { v.replRecv.Stop() } + // Stop rebuild server. + v.StopRebuildServer() // Drain in-flight ops: beginOp checks closed and returns ErrVolumeClosed, // so no new ops can start. Wait for existing ones to finish (max 5s). diff --git a/weed/storage/blockvol/blockvol_test.go b/weed/storage/blockvol/blockvol_test.go index 06f42c655..3473b09c6 100644 --- a/weed/storage/blockvol/blockvol_test.go +++ b/weed/storage/blockvol/blockvol_test.go @@ -127,6 +127,42 @@ func TestBlockVol(t *testing.T) { {name: "replica_reject_lsn_gap", run: testReplicaRejectLSNGap}, {name: "barrier_fsync_failed_status", run: testBarrierFsyncFailedStatus}, {name: "barrier_configurable_timeout", run: testBarrierConfigurableTimeout}, + // Phase 4A CP3: WAL scanner. + {name: "wal_scan_from_middle", run: testWALScanFromMiddle}, + {name: "wal_scan_empty", run: testWALScanEmpty}, + {name: "wal_scan_recycled", run: testWALScanRecycled}, + {name: "wal_scan_wrap_padding", run: testWALScanWrapPadding}, + {name: "wal_scan_entry_crosses_end", run: testWALScanEntryCrossesEnd}, + // Phase 4A CP3: Promotion + Demotion. + {name: "promote_replica_to_primary", run: testPromoteReplicaToPrimary}, + {name: "promote_rejects_non_replica", run: testPromoteRejectsNonReplica}, + {name: "demote_primary_to_stale", run: testDemotePrimaryToStale}, + {name: "demote_drains_inflight_ops", run: testDemoteDrainsInflightOps}, + {name: "demote_stops_shipper", run: testDemoteStopsShipper}, + {name: "assignment_refresh_lease", run: testAssignmentRefreshLease}, + {name: "assignment_invalid_transition", run: testAssignmentInvalidTransition}, + // Phase 4A CP3: Rebuild protocol types. + {name: "rebuild_request_roundtrip", run: testRebuildRequestRoundtrip}, + // Phase 4A CP3: Rebuild server. 
+ {name: "rebuild_server_wal_catchup", run: testRebuildServerWALCatchUp}, + {name: "rebuild_server_wal_recycled", run: testRebuildServerWALRecycled}, + {name: "rebuild_server_full_extent", run: testRebuildServerFullExtent}, + {name: "rebuild_server_epoch_mismatch", run: testRebuildServerEpochMismatch}, + // Phase 4A CP3: Rebuild client. + {name: "rebuild_wal_catchup_happy", run: testRebuildWALCatchUpHappy}, + {name: "rebuild_wal_catchup_to_replica", run: testRebuildWALCatchUpToReplica}, + {name: "rebuild_fallback_full_extent", run: testRebuildFallbackFullExtent}, + {name: "rebuild_full_extent_data_correct", run: testRebuildFullExtentDataCorrect}, + {name: "rebuild_full_extent_resets_dirty_map", run: testRebuildFullExtentResetsDirtyMap}, + // Phase 4A CP3: Split-brain tests. + {name: "split_brain_dead_zone", run: testSplitBrainDeadZone}, + {name: "split_brain_stale_primary_fenced", run: testSplitBrainStalePrimaryFenced}, + {name: "split_brain_epoch_rejects_stale_write", run: testSplitBrainEpochRejectsStaleWrite}, + {name: "split_brain_no_self_promotion", run: testSplitBrainNoSelfPromotion}, + {name: "split_brain_concurrent_assignment", run: testSplitBrainConcurrentAssignment}, + // Phase 4A CP3: Lifecycle tests. + {name: "blockvol_full_lifecycle", run: testBlockvolFullLifecycle}, + {name: "blockvol_rebuild_lifecycle", run: testBlockvolRebuildLifecycle}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -3212,6 +3248,886 @@ func testBarrierConfigurableTimeout(t *testing.T) { } } +// --------------------------------------------------------------------------- +// Phase 4A CP3: WAL Scanner tests +// --------------------------------------------------------------------------- + +func testWALScanFromMiddle(t *testing.T) { + v := createTestVol(t) + defer v.Close() + + // Write 10 entries. 
+ for i := 0; i < 10; i++ { + data := makeBlock(byte('A' + i)) + if err := v.WriteLBA(uint64(i), data); err != nil { + t.Fatalf("WriteLBA(%d): %v", i, err) + } + } + + // Scan from LSN=5 (0-indexed: LSN 1..10, so fromLSN=5 gets LSN 5..10 = 6 entries). + var scanned []uint64 + err := v.wal.ScanFrom(v.fd, v.super.WALOffset, 0, 5, func(e *WALEntry) error { + scanned = append(scanned, e.LSN) + return nil + }) + if err != nil { + t.Fatalf("ScanFrom: %v", err) + } + if len(scanned) != 6 { + t.Fatalf("expected 6 entries, got %d: %v", len(scanned), scanned) + } + if scanned[0] != 5 || scanned[5] != 10 { + t.Errorf("expected LSN range [5..10], got %v", scanned) + } +} + +func testWALScanEmpty(t *testing.T) { + v := createTestVol(t) + defer v.Close() + + var count int + err := v.wal.ScanFrom(v.fd, v.super.WALOffset, 0, 1, func(e *WALEntry) error { + count++ + return nil + }) + if err != nil { + t.Fatalf("ScanFrom: %v", err) + } + if count != 0 { + t.Errorf("expected 0 entries on empty WAL, got %d", count) + } +} + +func testWALScanRecycled(t *testing.T) { + v := createTestVol(t) + defer v.Close() + + // Write some entries. + for i := 0; i < 5; i++ { + v.WriteLBA(uint64(i), makeBlock(byte('A'+i))) + } + + // Simulate checkpointLSN=3 (entries 1-3 flushed). + err := v.wal.ScanFrom(v.fd, v.super.WALOffset, 3, 2, func(e *WALEntry) error { + return nil + }) + if !errors.Is(err, ErrWALRecycled) { + t.Fatalf("expected ErrWALRecycled, got %v", err) + } +} + +func testWALScanWrapPadding(t *testing.T) { + // Use a small WAL that will wrap. + dir := t.TempDir() + path := filepath.Join(dir, "test.blockvol") + v, err := CreateBlockVol(path, CreateOptions{ + VolumeSize: 64 * 1024, // 64KB + BlockSize: 4096, + WALSize: 4096 * 4, // 16KB WAL — very small, forces wrapping + }) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer v.Close() + + // Write entries until we wrap. WAL entry for 4KB write = 38 + 4096 = 4134 bytes. + // 16KB WAL can hold ~3 entries before wrapping. 
Write 2, flush, write 2 more. + v.WriteLBA(0, makeBlock('A')) + v.WriteLBA(1, makeBlock('B')) + + // Force flush to free WAL space. + v.flusher.FlushOnce() + + // Write more to trigger wrap. + v.WriteLBA(2, makeBlock('C')) + v.WriteLBA(3, makeBlock('D')) + + // Scan all entries from LSN=1. We should get whatever is still in WAL. + var scanned []uint64 + checkpointLSN := v.flusher.CheckpointLSN() + err = v.wal.ScanFrom(v.fd, v.super.WALOffset, checkpointLSN, checkpointLSN+1, func(e *WALEntry) error { + scanned = append(scanned, e.LSN) + return nil + }) + if err != nil { + t.Fatalf("ScanFrom with wrap: %v", err) + } + if len(scanned) == 0 { + t.Fatal("expected entries after wrap, got 0") + } +} + +func testWALScanEntryCrossesEnd(t *testing.T) { + // Similar to wrap padding — an entry that would span WAL end triggers padding. + dir := t.TempDir() + path := filepath.Join(dir, "test.blockvol") + v, err := CreateBlockVol(path, CreateOptions{ + VolumeSize: 64 * 1024, + BlockSize: 4096, + WALSize: 4096 * 5, // 20KB WAL + }) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer v.Close() + + // Write 3 entries, flush 2, write 2 more (forces padding + wrap). 
+ v.WriteLBA(0, makeBlock('A')) + v.WriteLBA(1, makeBlock('B')) + v.WriteLBA(2, makeBlock('C')) + v.flusher.FlushOnce() + v.WriteLBA(3, makeBlock('D')) + v.WriteLBA(4, makeBlock('E')) + + checkpointLSN := v.flusher.CheckpointLSN() + var scanned []uint64 + err = v.wal.ScanFrom(v.fd, v.super.WALOffset, checkpointLSN, checkpointLSN+1, func(e *WALEntry) error { + scanned = append(scanned, e.LSN) + return nil + }) + if err != nil { + t.Fatalf("ScanFrom: %v", err) + } + if len(scanned) == 0 { + t.Fatal("expected entries after padding/wrap, got 0") + } +} + +// --------------------------------------------------------------------------- +// Phase 4A CP3: Promotion + Demotion tests +// --------------------------------------------------------------------------- + +// setupPrimary creates a volume and promotes it to Primary. +func setupPrimary(t *testing.T) *BlockVol { + t.Helper() + v := createTestVol(t) + if err := v.HandleAssignment(1, RolePrimary, 30*time.Second); err != nil { + t.Fatalf("promote to primary: %v", err) + } + return v +} + +// setupReplica creates a volume and sets it to Replica role. +func setupReplica(t *testing.T) *BlockVol { + t.Helper() + v := createTestVol(t) + if err := v.HandleAssignment(1, RoleReplica, 0); err != nil { + t.Fatalf("set replica: %v", err) + } + return v +} + +func testPromoteReplicaToPrimary(t *testing.T) { + v := setupReplica(t) + defer v.Close() + + if err := v.HandleAssignment(2, RolePrimary, 30*time.Second); err != nil { + t.Fatalf("promote: %v", err) + } + if v.Role() != RolePrimary { + t.Errorf("role: got %s, want Primary", v.Role()) + } + if v.Epoch() != 2 { + t.Errorf("epoch: got %d, want 2", v.Epoch()) + } + // Writes should succeed. + if err := v.WriteLBA(0, makeBlock('A')); err != nil { + t.Errorf("write after promote: %v", err) + } +} + +func testPromoteRejectsNonReplica(t *testing.T) { + v := setupPrimary(t) + defer v.Close() + + // Primary can't be promoted again. 
+ err := v.HandleAssignment(3, RolePrimary, 30*time.Second) + if err != nil { + t.Errorf("same-role assignment should be a lease refresh, got error: %v", err) + } + + // Stale can't be promoted to Primary directly. + v2 := createTestVol(t) + defer v2.Close() + v2.HandleAssignment(1, RolePrimary, 30*time.Second) + v2.HandleAssignment(2, RoleStale, 0) + err = v2.HandleAssignment(3, RolePrimary, 30*time.Second) + if !errors.Is(err, ErrInvalidAssignment) { + t.Errorf("expected ErrInvalidAssignment for Stale→Primary, got: %v", err) + } +} + +func testDemotePrimaryToStale(t *testing.T) { + v := setupPrimary(t) + defer v.Close() + + if err := v.HandleAssignment(2, RoleStale, 0); err != nil { + t.Fatalf("demote: %v", err) + } + if v.Role() != RoleStale { + t.Errorf("role: got %s, want Stale", v.Role()) + } + if v.Epoch() != 2 { + t.Errorf("epoch: got %d, want 2", v.Epoch()) + } + // Writes should fail. + err := v.WriteLBA(0, makeBlock('A')) + if err == nil { + t.Error("expected write to fail after demotion") + } +} + +func testDemoteDrainsInflightOps(t *testing.T) { + v := setupPrimary(t) + defer v.Close() + + // Start a write that will hold an op outstanding. + var wg sync.WaitGroup + started := make(chan struct{}) + wg.Add(1) + go func() { + defer wg.Done() + v.beginOp() + close(started) + // Hold the op for a bit. + time.Sleep(50 * time.Millisecond) + v.endOp() + }() + + <-started + + // Demote should wait for the op to complete. + v.drainTimeout = 2 * time.Second + errCh := make(chan error, 1) + go func() { + errCh <- v.HandleAssignment(2, RoleStale, 0) + }() + + wg.Wait() + err := <-errCh + if err != nil { + t.Fatalf("demote: %v", err) + } + if v.Role() != RoleStale { + t.Errorf("role: got %s, want Stale", v.Role()) + } +} + +func testDemoteStopsShipper(t *testing.T) { + v := setupPrimary(t) + defer v.Close() + + // Create a shipper (won't connect but that's fine for this test). 
+ v.shipper = NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { + return v.epoch.Load() + }) + + if err := v.HandleAssignment(2, RoleStale, 0); err != nil { + t.Fatalf("demote: %v", err) + } + if !v.shipper.stopped.Load() { + t.Error("shipper should be stopped after demotion") + } +} + +func testAssignmentRefreshLease(t *testing.T) { + v := setupPrimary(t) + defer v.Close() + + // Same role + same epoch -> refresh lease. + if err := v.HandleAssignment(1, RolePrimary, 1*time.Hour); err != nil { + t.Fatalf("refresh: %v", err) + } + if v.Role() != RolePrimary { + t.Errorf("role: got %s, want Primary", v.Role()) + } + if !v.lease.IsValid() { + t.Error("lease should be valid after refresh") + } + + // Same role + bumped epoch -> epoch updated, writes still work. + if err := v.HandleAssignment(5, RolePrimary, 1*time.Hour); err != nil { + t.Fatalf("refresh with epoch bump: %v", err) + } + if v.Epoch() != 5 { + t.Errorf("epoch after bump: got %d, want 5", v.Epoch()) + } + if err := v.WriteLBA(0, makeBlock('X')); err != nil { + t.Errorf("write after epoch bump refresh: %v", err) + } +} + +func testAssignmentInvalidTransition(t *testing.T) { + v := setupReplica(t) + defer v.Close() + + // Replica → Stale is invalid. 
+ err := v.HandleAssignment(2, RoleStale, 0) + if !errors.Is(err, ErrInvalidAssignment) { + t.Errorf("expected ErrInvalidAssignment, got: %v", err) + } +} + +// --------------------------------------------------------------------------- +// Phase 4A CP3: Rebuild protocol roundtrip +// --------------------------------------------------------------------------- + +func testRebuildRequestRoundtrip(t *testing.T) { + req := RebuildRequest{ + Type: RebuildWALCatchUp, + FromLSN: 42, + Epoch: 7, + } + buf := EncodeRebuildRequest(req) + decoded, err := DecodeRebuildRequest(buf) + if err != nil { + t.Fatal(err) + } + if decoded.Type != req.Type || decoded.FromLSN != req.FromLSN || decoded.Epoch != req.Epoch { + t.Errorf("roundtrip mismatch: got %+v, want %+v", decoded, req) + } +} + +// --------------------------------------------------------------------------- +// Phase 4A CP3: Rebuild Server tests +// --------------------------------------------------------------------------- + +func testRebuildServerWALCatchUp(t *testing.T) { + v := setupPrimary(t) + defer v.Close() + + // Write some data. + for i := 0; i < 5; i++ { + v.WriteLBA(uint64(i), makeBlock(byte('A'+i))) + } + + srv, err := NewRebuildServer(v, "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + srv.Serve() + defer srv.Stop() + + // Connect and request catch-up from LSN=1. 
+ conn, err := net.Dial("tcp", srv.Addr()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + req := RebuildRequest{Type: RebuildWALCatchUp, FromLSN: 1, Epoch: v.Epoch()} + WriteFrame(conn, MsgRebuildReq, EncodeRebuildRequest(req)) + + var entries int + for { + msgType, payload, err := ReadFrame(conn) + if err != nil { + t.Fatal(err) + } + if msgType == MsgRebuildDone { + break + } + if msgType == MsgRebuildEntry { + _, decErr := DecodeWALEntry(payload) + if decErr != nil { + t.Fatalf("decode entry: %v", decErr) + } + entries++ + } + if msgType == MsgRebuildError { + t.Fatalf("server error: %s", string(payload)) + } + } + if entries != 5 { + t.Errorf("expected 5 entries, got %d", entries) + } +} + +func testRebuildServerWALRecycled(t *testing.T) { + v := setupPrimary(t) + defer v.Close() + + // Write and flush to advance checkpoint. + for i := 0; i < 5; i++ { + v.WriteLBA(uint64(i), makeBlock(byte('A'+i))) + } + v.flusher.FlushOnce() + + srv, err := NewRebuildServer(v, "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + srv.Serve() + defer srv.Stop() + + conn, err := net.Dial("tcp", srv.Addr()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + // Request from LSN=1, but checkpoint is past that → WAL_RECYCLED. + req := RebuildRequest{Type: RebuildWALCatchUp, FromLSN: 1, Epoch: v.Epoch()} + WriteFrame(conn, MsgRebuildReq, EncodeRebuildRequest(req)) + + msgType, payload, err := ReadFrame(conn) + if err != nil { + t.Fatal(err) + } + if msgType != MsgRebuildError { + t.Fatalf("expected MsgRebuildError, got 0x%02x", msgType) + } + if string(payload) != "WAL_RECYCLED" { + t.Errorf("expected WAL_RECYCLED error, got: %s", string(payload)) + } +} + +func testRebuildServerFullExtent(t *testing.T) { + v := setupPrimary(t) + defer v.Close() + + // Write and flush so data is in extent. 
+ v.WriteLBA(0, makeBlock('X')) + v.flusher.FlushOnce() + + srv, err := NewRebuildServer(v, "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + srv.Serve() + defer srv.Stop() + + conn, err := net.Dial("tcp", srv.Addr()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + req := RebuildRequest{Type: RebuildFullExtent, Epoch: v.Epoch()} + WriteFrame(conn, MsgRebuildReq, EncodeRebuildRequest(req)) + + var totalBytes int + for { + msgType, payload, err := ReadFrame(conn) + if err != nil { + t.Fatal(err) + } + if msgType == MsgRebuildDone { + break + } + if msgType == MsgRebuildExtent { + totalBytes += len(payload) + } + if msgType == MsgRebuildError { + t.Fatalf("server error: %s", string(payload)) + } + } + if uint64(totalBytes) != v.super.VolumeSize { + t.Errorf("expected %d bytes, got %d", v.super.VolumeSize, totalBytes) + } +} + +func testRebuildServerEpochMismatch(t *testing.T) { + v := setupPrimary(t) + defer v.Close() + + srv, err := NewRebuildServer(v, "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + srv.Serve() + defer srv.Stop() + + conn, err := net.Dial("tcp", srv.Addr()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + // Wrong epoch. + req := RebuildRequest{Type: RebuildWALCatchUp, FromLSN: 1, Epoch: 999} + WriteFrame(conn, MsgRebuildReq, EncodeRebuildRequest(req)) + + msgType, payload, err := ReadFrame(conn) + if err != nil { + t.Fatal(err) + } + if msgType != MsgRebuildError { + t.Fatalf("expected MsgRebuildError, got 0x%02x", msgType) + } + if string(payload) != "EPOCH_MISMATCH" { + t.Errorf("expected EPOCH_MISMATCH, got: %s", string(payload)) + } +} + +// --------------------------------------------------------------------------- +// Phase 4A CP3: Rebuild Client tests +// --------------------------------------------------------------------------- + +// setupRebuilding creates a volume in RoleRebuilding state with the given epoch. 
+func setupRebuilding(t *testing.T, epoch uint64) *BlockVol { + t.Helper() + v := createTestVol(t) + // Path: None → Primary → Stale → Rebuilding + if err := v.HandleAssignment(epoch, RolePrimary, 30*time.Second); err != nil { + t.Fatalf("setup rebuilding: promote: %v", err) + } + if err := v.HandleAssignment(epoch, RoleStale, 0); err != nil { + t.Fatalf("setup rebuilding: demote: %v", err) + } + if err := v.HandleAssignment(epoch, RoleRebuilding, 0); err != nil { + t.Fatalf("setup rebuilding: set rebuilding: %v", err) + } + return v +} + +func testRebuildWALCatchUpHappy(t *testing.T) { + primary := setupPrimary(t) + defer primary.Close() + + for i := 0; i < 5; i++ { + primary.WriteLBA(uint64(i), makeBlock(byte('A'+i))) + } + + srv, err := NewRebuildServer(primary, "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + srv.Serve() + defer srv.Stop() + + stale := setupRebuilding(t, primary.Epoch()) + defer stale.Close() + + if err := StartRebuild(stale, srv.Addr(), 1, primary.Epoch()); err != nil { + t.Fatalf("StartRebuild: %v", err) + } + if stale.Role() != RoleReplica { + t.Errorf("role after rebuild: got %s, want Replica", stale.Role()) + } +} + +func testRebuildWALCatchUpToReplica(t *testing.T) { + primary := setupPrimary(t) + defer primary.Close() + + primary.WriteLBA(0, makeBlock('Z')) + + srv, err := NewRebuildServer(primary, "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + srv.Serve() + defer srv.Stop() + + stale := setupRebuilding(t, primary.Epoch()) + defer stale.Close() + + if err := StartRebuild(stale, srv.Addr(), 1, primary.Epoch()); err != nil { + t.Fatalf("StartRebuild: %v", err) + } + + // After rebuild, reads should work. + data, err := stale.ReadLBA(0, 4096) + if err != nil { + t.Fatalf("ReadLBA: %v", err) + } + if data[0] != 'Z' { + t.Errorf("data mismatch: got %c, want Z", data[0]) + } +} + +func testRebuildFallbackFullExtent(t *testing.T) { + primary := setupPrimary(t) + defer primary.Close() + + // Write and flush so WAL is recycled. 
+ primary.WriteLBA(0, makeBlock('M')) + primary.flusher.FlushOnce() + + srv, err := NewRebuildServer(primary, "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + srv.Serve() + defer srv.Stop() + + stale := setupRebuilding(t, primary.Epoch()) + defer stale.Close() + + // Request catch-up from LSN=1, which is recycled → falls back to full extent. + if err := StartRebuild(stale, srv.Addr(), 1, primary.Epoch()); err != nil { + t.Fatalf("StartRebuild (fallback): %v", err) + } + if stale.Role() != RoleReplica { + t.Errorf("role: got %s, want Replica", stale.Role()) + } + + // Verify data matches. + data, err := stale.ReadLBA(0, 4096) + if err != nil { + t.Fatalf("ReadLBA: %v", err) + } + if data[0] != 'M' { + t.Errorf("data mismatch: got %c, want M", data[0]) + } +} + +func testRebuildFullExtentDataCorrect(t *testing.T) { + primary := setupPrimary(t) + defer primary.Close() + + // Write several blocks and flush. + for i := 0; i < 10; i++ { + primary.WriteLBA(uint64(i), makeBlock(byte('0'+i))) + } + primary.flusher.FlushOnce() + + srv, err := NewRebuildServer(primary, "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + srv.Serve() + defer srv.Stop() + + stale := setupRebuilding(t, primary.Epoch()) + defer stale.Close() + + // Trigger full extent (LSN=1 recycled after flush). + if err := StartRebuild(stale, srv.Addr(), 1, primary.Epoch()); err != nil { + t.Fatalf("StartRebuild: %v", err) + } + + // Verify all blocks. 
+ for i := 0; i < 10; i++ { + data, err := stale.ReadLBA(uint64(i), 4096) + if err != nil { + t.Fatalf("ReadLBA(%d): %v", i, err) + } + if data[0] != byte('0'+i) { + t.Errorf("block %d: got %c, want %c", i, data[0], byte('0'+i)) + } + } +} + +func testRebuildFullExtentResetsDirtyMap(t *testing.T) { + primary := setupPrimary(t) + defer primary.Close() + + primary.WriteLBA(0, makeBlock('A')) + primary.flusher.FlushOnce() + + srv, err := NewRebuildServer(primary, "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + srv.Serve() + defer srv.Stop() + + stale := setupRebuilding(t, primary.Epoch()) + defer stale.Close() + + // Write some data directly to WAL to create dirty map entries on the stale volume. + // (Can't use WriteLBA since role is Rebuilding, not Primary.) + entry := &WALEntry{LSN: 100, Epoch: primary.Epoch(), Type: EntryTypeWrite, LBA: 5, Length: 4096, Data: makeBlock('Z')} + walOff, _ := stale.wal.Append(entry) + stale.dirtyMap.Put(5, walOff, 100, 4096) + if stale.dirtyMap.Len() == 0 { + t.Fatal("expected dirty entries before rebuild") + } + + if err := StartRebuild(stale, srv.Addr(), 1, primary.Epoch()); err != nil { + t.Fatalf("StartRebuild: %v", err) + } + + // After full extent rebuild, dirty map should be cleared. + if stale.dirtyMap.Len() != 0 { + t.Errorf("dirty map should be empty after full extent rebuild, got %d entries", stale.dirtyMap.Len()) + } +} + +// --------------------------------------------------------------------------- +// Phase 4A CP3: Split-brain tests +// --------------------------------------------------------------------------- + +func testSplitBrainDeadZone(t *testing.T) { + // After demote (old primary), before promote (new primary) — no node accepts writes. + oldPrimary := setupPrimary(t) + defer oldPrimary.Close() + newReplica := setupReplica(t) + defer newReplica.Close() + + // Demote old primary. 
+ if err := oldPrimary.HandleAssignment(2, RoleStale, 0); err != nil { + t.Fatalf("demote: %v", err) + } + + // Old primary can't write. + if err := oldPrimary.WriteLBA(0, makeBlock('A')); err == nil { + t.Error("old primary should reject writes after demotion") + } + + // New replica hasn't been promoted yet — can't write. + if err := newReplica.WriteLBA(0, makeBlock('B')); err == nil { + t.Error("replica should reject writes before promotion") + } +} + +func testSplitBrainStalePrimaryFenced(t *testing.T) { + v := setupPrimary(t) + defer v.Close() + + // Let lease expire. + v.lease.Grant(1 * time.Millisecond) + time.Sleep(5 * time.Millisecond) + + err := v.WriteLBA(0, makeBlock('A')) + if !errors.Is(err, ErrLeaseExpired) { + t.Errorf("expected ErrLeaseExpired, got: %v", err) + } +} + +func testSplitBrainEpochRejectsStaleWrite(t *testing.T) { + v := setupPrimary(t) + defer v.Close() + + // Simulate master bumping epoch without this node knowing. + v.masterEpoch.Store(99) + + err := v.WriteLBA(0, makeBlock('A')) + if !errors.Is(err, ErrEpochStale) { + t.Errorf("expected ErrEpochStale, got: %v", err) + } +} + +func testSplitBrainNoSelfPromotion(t *testing.T) { + v := createTestVol(t) + defer v.Close() + + // Set to Replica. + v.HandleAssignment(1, RoleReplica, 0) + + // Try direct SetRole without going through HandleAssignment. + // This should work because SetRole itself is valid (Replica→Primary), + // but without setting epoch/lease, writes will fail. + if err := v.SetRole(RolePrimary); err != nil { + t.Fatalf("SetRole: %v", err) + } + + // Writes fail because epoch/masterEpoch mismatch (self-promotion + // didn't set masterEpoch). + err := v.WriteLBA(0, makeBlock('A')) + if err == nil { + t.Error("self-promotion without proper assignment should fail writes") + } +} + +func testSplitBrainConcurrentAssignment(t *testing.T) { + v := createTestVol(t) + defer v.Close() + + // Set up as Replica first. 
+ v.HandleAssignment(1, RoleReplica, 0) + + // Concurrent assignment attempts: promote + invalid. + var wg sync.WaitGroup + results := make([]error, 2) + wg.Add(2) + go func() { + defer wg.Done() + results[0] = v.HandleAssignment(2, RolePrimary, 30*time.Second) + }() + go func() { + defer wg.Done() + results[1] = v.HandleAssignment(3, RolePrimary, 30*time.Second) + }() + wg.Wait() + + // With assignMu serialization, one should succeed and the other + // should either succeed (refresh on already-promoted) or fail. + // The key guarantee is no panic and consistent state. + if v.Role() != RolePrimary { + t.Errorf("role: got %s, want Primary after concurrent assignments", v.Role()) + } +} + +// --------------------------------------------------------------------------- +// Phase 4A CP3: Lifecycle tests +// --------------------------------------------------------------------------- + +func testBlockvolFullLifecycle(t *testing.T) { + // Primary writes, promotes replica, demotes old primary, new primary serves writes. + primary := setupPrimary(t) + defer primary.Close() + replica := setupReplica(t) + defer replica.Close() + + // Primary writes. + if err := primary.WriteLBA(0, makeBlock('A')); err != nil { + t.Fatalf("primary write: %v", err) + } + + // Demote old primary. + if err := primary.HandleAssignment(2, RoleStale, 0); err != nil { + t.Fatalf("demote: %v", err) + } + + // Promote replica. + if err := replica.HandleAssignment(2, RolePrimary, 30*time.Second); err != nil { + t.Fatalf("promote replica: %v", err) + } + + // New primary can write. + if err := replica.WriteLBA(1, makeBlock('B')); err != nil { + t.Fatalf("new primary write: %v", err) + } + + // Old primary can't write. + if err := primary.WriteLBA(2, makeBlock('C')); err == nil { + t.Error("old primary should reject writes") + } +} + +func testBlockvolRebuildLifecycle(t *testing.T) { + primary := setupPrimary(t) + defer primary.Close() + + // Write data. 
+ primary.WriteLBA(0, makeBlock('R')) + primary.WriteLBA(1, makeBlock('S')) + + srv, err := NewRebuildServer(primary, "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + srv.Serve() + defer srv.Stop() + + // Create stale and rebuild. + stale := setupRebuilding(t, primary.Epoch()) + defer stale.Close() + + if err := StartRebuild(stale, srv.Addr(), 1, primary.Epoch()); err != nil { + t.Fatalf("rebuild: %v", err) + } + + // Verify data. + data, err := stale.ReadLBA(0, 4096) + if err != nil { + t.Fatalf("ReadLBA(0): %v", err) + } + if data[0] != 'R' { + t.Errorf("block 0: got %c, want R", data[0]) + } + data, err = stale.ReadLBA(1, 4096) + if err != nil { + t.Fatalf("ReadLBA(1): %v", err) + } + if data[0] != 'S' { + t.Errorf("block 1: got %c, want S", data[0]) + } +} + // Suppress unused import warnings. var _ = fmt.Sprintf var _ io.Reader diff --git a/weed/storage/blockvol/dirty_map.go b/weed/storage/blockvol/dirty_map.go index 15a70fed4..c3e15481f 100644 --- a/weed/storage/blockvol/dirty_map.go +++ b/weed/storage/blockvol/dirty_map.go @@ -105,6 +105,16 @@ func (d *DirtyMap) Range(start uint64, count uint32, fn func(lba, walOffset, lsn } } +// Clear removes all entries from the dirty map. +func (d *DirtyMap) Clear() { + for i := range d.shards { + s := &d.shards[i] + s.mu.Lock() + s.m = make(map[uint64]dirtyEntry) + s.mu.Unlock() + } +} + // Len returns the number of dirty entries across all shards. 
 func (d *DirtyMap) Len() int {
 	n := 0
diff --git a/weed/storage/blockvol/promotion.go b/weed/storage/blockvol/promotion.go
new file mode 100644
index 000000000..4edc256be
--- /dev/null
+++ b/weed/storage/blockvol/promotion.go
@@ -0,0 +1,114 @@
+package blockvol
+
+import (
+	"errors"
+	"fmt"
+	"time"
+)
+
+var (
+	ErrInvalidAssignment = errors.New("blockvol: invalid assignment transition")
+	ErrDrainTimeout      = errors.New("blockvol: drain timeout waiting for in-flight ops")
+)
+
+const defaultDrainTimeout = 10 * time.Second // used when BlockVol.drainTimeout is zero
+
+// HandleAssignment processes a role/epoch/lease assignment from master.
+// Serialized with vol.assignMu to prevent concurrent assignment races.
+func HandleAssignment(vol *BlockVol, newEpoch uint64, newRole Role, leaseTTL time.Duration) error {
+	vol.assignMu.Lock()
+	defer vol.assignMu.Unlock()
+
+	current := vol.Role()
+
+	// Same role -> refresh lease and update epoch if bumped.
+	if current == newRole {
+		if newEpoch > vol.Epoch() { // equal/older epochs are ignored: SetEpoch only on a bump
+			if err := vol.SetEpoch(newEpoch); err != nil {
+				return fmt.Errorf("assignment refresh: set epoch: %w", err)
+			}
+			vol.SetMasterEpoch(newEpoch)
+		}
+		if current == RolePrimary { // only primaries hold a write lease
+			vol.lease.Grant(leaseTTL)
+		}
+		return nil
+	}
+
+	switch {
+	case current == RoleReplica && newRole == RolePrimary:
+		return promote(vol, newEpoch, leaseTTL)
+	case current == RolePrimary && newRole == RoleStale:
+		return demote(vol, newEpoch)
+	case current == RoleStale && newRole == RoleRebuilding:
+		// Rebuild started externally via StartRebuild.
+		return vol.SetRole(RoleRebuilding)
+	case current == RoleNone && newRole == RolePrimary:
+		return promote(vol, newEpoch, leaseTTL)
+	case current == RoleNone && newRole == RoleReplica:
+		vol.SetMasterEpoch(newEpoch)
+		return vol.SetRole(RoleReplica)
+	default:
+		return fmt.Errorf("%w: %s -> %s", ErrInvalidAssignment, current, newRole)
+	}
+}
+
+// promote transitions Replica/None -> Primary.
+// Order matters: epoch durable BEFORE writes possible.
+func promote(vol *BlockVol, newEpoch uint64, leaseTTL time.Duration) error {
+	if err := vol.SetEpoch(newEpoch); err != nil { // persist epoch first: no write may carry the new epoch before it is durable
+		return fmt.Errorf("promote: set epoch: %w", err)
+	}
+	vol.SetMasterEpoch(newEpoch)
+	if err := vol.SetRole(RolePrimary); err != nil {
+		return fmt.Errorf("promote: set role: %w", err)
+	}
+	vol.lease.Grant(leaseTTL) // lease last: writes only open once role+epoch are in place
+	return nil
+}
+
+// demote transitions Primary -> Draining -> Stale.
+// Revokes lease first, drains in-flight ops, then persists new epoch.
+func demote(vol *BlockVol, newEpoch uint64) error {
+	// Revoke lease — writeGate blocks new writes immediately.
+	vol.lease.Revoke()
+
+	// Transition to Draining.
+	if err := vol.SetRole(RoleDraining); err != nil {
+		return fmt.Errorf("demote: set draining: %w", err)
+	}
+
+	// Wait for in-flight ops to drain.
+	drainTTL := vol.drainTimeout
+	if drainTTL == 0 {
+		drainTTL = defaultDrainTimeout
+	}
+	deadline := time.NewTimer(drainTTL)
+	defer deadline.Stop()
+	ticker := time.NewTicker(1 * time.Millisecond) // poll opsOutstanding; 1ms keeps demotion latency low
+	defer ticker.Stop()
+	for vol.opsOutstanding.Load() > 0 {
+		select {
+		case <-deadline.C:
+			return ErrDrainTimeout // NOTE(review): volume stays in RoleDraining here — confirm master retries/forces the demotion
+		case <-ticker.C:
+		}
+	}
+
+	// Stop shipper if present.
+	if vol.shipper != nil {
+		vol.shipper.Stop()
+	}
+
+	// Transition Draining -> Stale.
+	if err := vol.SetRole(RoleStale); err != nil {
+		return fmt.Errorf("demote: set stale: %w", err)
+	}
+
+	// Persist new epoch.
+	if err := vol.SetEpoch(newEpoch); err != nil { // epoch persisted last, after all primary-side activity has stopped
+		return fmt.Errorf("demote: set epoch: %w", err)
+	}
+	vol.SetMasterEpoch(newEpoch)
+	return nil
+}
diff --git a/weed/storage/blockvol/rebuild.go b/weed/storage/blockvol/rebuild.go
new file mode 100644
index 000000000..76848875b
--- /dev/null
+++ b/weed/storage/blockvol/rebuild.go
@@ -0,0 +1,438 @@
+package blockvol
+
+import (
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"io"
+	"log"
+	"net"
+	"sync"
+)
+
+const rebuildExtentChunkSize = 64 * 1024 // 64KB chunks for extent streaming
+
+// ---------------------------------------------------------------------------
+// Rebuild Server (primary side)
+// ---------------------------------------------------------------------------
+
+// RebuildServer serves WAL catch-up and full extent data to rebuilding replicas.
+type RebuildServer struct {
+	vol      *BlockVol
+	listener net.Listener
+	stopCh   chan struct{}  // closed by Stop; signals acceptLoop to exit quietly
+	wg       sync.WaitGroup // tracks acceptLoop plus one goroutine per connection
+}
+
+// NewRebuildServer creates a rebuild server listening on addr.
+// The server does not accept connections until Serve is called.
+func NewRebuildServer(vol *BlockVol, addr string) (*RebuildServer, error) {
+	ln, err := net.Listen("tcp", addr)
+	if err != nil {
+		return nil, fmt.Errorf("rebuild: listen %s: %w", addr, err)
+	}
+	return &RebuildServer{
+		vol:      vol,
+		listener: ln,
+		stopCh:   make(chan struct{}),
+	}, nil
+}
+
+// Serve starts accepting rebuild connections.
+func (s *RebuildServer) Serve() {
+	s.wg.Add(1)
+	go s.acceptLoop()
+}
+
+// Stop shuts down the rebuild server.
+// Idempotent for a single caller; blocks until all connection goroutines exit.
+func (s *RebuildServer) Stop() {
+	select {
+	case <-s.stopCh:
+		return // already stopped
+	default:
+	}
+	// NOTE(review): two goroutines racing past the select above would both
+	// reach this close and panic — callers must serialize Stop (BlockVol.Close
+	// does via StopRebuildServer); consider sync.Once to harden.
+	close(s.stopCh)
+	s.listener.Close()
+	s.wg.Wait()
+}
+
+// Addr returns the listener's address.
+func (s *RebuildServer) Addr() string {
+	return s.listener.Addr().String()
+}
+
+// acceptLoop accepts connections until the listener is closed by Stop.
+func (s *RebuildServer) acceptLoop() {
+	defer s.wg.Done()
+	for {
+		conn, err := s.listener.Accept()
+		if err != nil {
+			select {
+			case <-s.stopCh:
+				return // expected: listener closed by Stop
+			default:
+				log.Printf("rebuild: accept error: %v", err)
+				return
+			}
+		}
+		s.wg.Add(1)
+		go func() {
+			defer s.wg.Done()
+			s.handleConn(conn)
+		}()
+	}
+}
+
+// handleConn serves one rebuild request per connection:
+// read request frame, validate epoch, dispatch by request type.
+func (s *RebuildServer) handleConn(conn net.Conn) {
+	defer conn.Close()
+
+	msgType, payload, err := ReadFrame(conn)
+	if err != nil {
+		log.Printf("rebuild: read request: %v", err)
+		return
+	}
+	if msgType != MsgRebuildReq {
+		log.Printf("rebuild: unexpected message type 0x%02x", msgType)
+		return
+	}
+
+	req, err := DecodeRebuildRequest(payload)
+	if err != nil {
+		log.Printf("rebuild: decode request: %v", err)
+		return
+	}
+
+	// Validate epoch.
+	if req.Epoch != s.vol.epoch.Load() {
+		WriteFrame(conn, MsgRebuildError, []byte("EPOCH_MISMATCH"))
+		return
+	}
+
+	switch req.Type {
+	case RebuildWALCatchUp:
+		s.handleWALCatchUp(conn, req)
+	case RebuildFullExtent:
+		s.handleFullExtent(conn)
+	default:
+		WriteFrame(conn, MsgRebuildError, []byte("UNKNOWN_TYPE"))
+	}
+}
+
+// handleWALCatchUp streams WAL entries with LSN >= req.FromLSN to the client,
+// or reports WAL_RECYCLED when those entries have already been flushed away.
+func (s *RebuildServer) handleWALCatchUp(conn net.Conn, req RebuildRequest) {
+	checkpointLSN := s.vol.flusher.CheckpointLSN()
+
+	err := s.vol.wal.ScanFrom(s.vol.fd, s.vol.super.WALOffset,
+		checkpointLSN, req.FromLSN, func(entry *WALEntry) error {
+			encoded, err := entry.Encode()
+			if err != nil {
+				return err
+			}
+			return WriteFrame(conn, MsgRebuildEntry, encoded)
+		})
+
+	if errors.Is(err, ErrWALRecycled) {
+		WriteFrame(conn, MsgRebuildError, []byte("WAL_RECYCLED"))
+		return
+	}
+	if err != nil {
+		WriteFrame(conn, MsgRebuildError, []byte(err.Error()))
+		return
+	}
+
+	// Send done with the current nextLSN as the snapshot point.
+	// The client uses this to know where to start a second catch-up
+	// after a full extent copy.
+ lsnBuf := make([]byte, 8) + binary.BigEndian.PutUint64(lsnBuf, s.vol.nextLSN.Load()) + WriteFrame(conn, MsgRebuildDone, lsnBuf) +} + +func (s *RebuildServer) handleFullExtent(conn net.Conn) { + // Capture snapshot LSN before streaming — client will use this + // for a second catch-up scan to capture writes during copy. + snapshotLSN := s.vol.nextLSN.Load() + + extentStart := s.vol.super.WALOffset + s.vol.super.WALSize + volumeSize := s.vol.super.VolumeSize + + buf := make([]byte, rebuildExtentChunkSize) + var offset uint64 + for offset < volumeSize { + chunkSize := uint64(rebuildExtentChunkSize) + if offset+chunkSize > volumeSize { + chunkSize = volumeSize - offset + } + n, err := s.vol.fd.ReadAt(buf[:chunkSize], int64(extentStart+offset)) + if err != nil && err != io.EOF { + WriteFrame(conn, MsgRebuildError, []byte(err.Error())) + return + } + if err := WriteFrame(conn, MsgRebuildExtent, buf[:n]); err != nil { + return + } + offset += uint64(n) + } + + // Send done with snapshot LSN. + lsnBuf := make([]byte, 8) + binary.BigEndian.PutUint64(lsnBuf, snapshotLSN) + WriteFrame(conn, MsgRebuildDone, lsnBuf) +} + +// --------------------------------------------------------------------------- +// Rebuild Client (rebuilding replica side) +// --------------------------------------------------------------------------- + +// StartRebuild initiates rebuild from primary. The volume must already be +// in RoleRebuilding (set via HandleAssignment). On success, transitions +// the volume to RoleReplica. +func StartRebuild(vol *BlockVol, primaryAddr string, fromLSN uint64, epoch uint64) error { + if vol.Role() != RoleRebuilding { + return fmt.Errorf("rebuild: expected role Rebuilding, got %s", vol.Role()) + } + + conn, err := net.Dial("tcp", primaryAddr) + if err != nil { + return fmt.Errorf("rebuild: connect to %s: %w", primaryAddr, err) + } + defer conn.Close() + + // Try WAL catch-up first. 
+ req := RebuildRequest{ + Type: RebuildWALCatchUp, + FromLSN: fromLSN, + Epoch: epoch, + } + if err := WriteFrame(conn, MsgRebuildReq, EncodeRebuildRequest(req)); err != nil { + return fmt.Errorf("rebuild: send request: %w", err) + } + + // Read responses. + var snapshotLSN uint64 + for { + msgType, payload, err := ReadFrame(conn) + if err != nil { + return fmt.Errorf("rebuild: read response: %w", err) + } + + switch msgType { + case MsgRebuildEntry: + if err := applyRebuildEntry(vol, payload); err != nil { + return fmt.Errorf("rebuild: apply entry: %w", err) + } + + case MsgRebuildDone: + if len(payload) >= 8 { + snapshotLSN = binary.BigEndian.Uint64(payload[:8]) + } + goto catchUpDone + + case MsgRebuildError: + errMsg := string(payload) + if errMsg == "WAL_RECYCLED" { + // Fall back to full extent rebuild. + conn.Close() + return rebuildFullExtent(vol, primaryAddr, epoch) + } + return fmt.Errorf("rebuild: server error: %s", errMsg) + + default: + return fmt.Errorf("rebuild: unexpected message type 0x%02x", msgType) + } + } + +catchUpDone: + conn.Close() + + // Second catch-up: capture writes that arrived on the primary after + // the scan snapshot. Without this, those writes are lost. + if err := rebuildSecondCatchUp(vol, primaryAddr, snapshotLSN, epoch); err != nil { + return err + } + return vol.SetRole(RoleReplica) +} + +// rebuildFullExtent streams the full extent from primary, then does a +// second WAL catch-up to capture writes during the copy. 
+// rebuildFullExtent copies the primary's entire extent region into this
+// volume, resets local WAL/dirty-map state, persists a clean superblock,
+// then runs a second WAL catch-up for writes that landed during the copy.
+func rebuildFullExtent(vol *BlockVol, primaryAddr string, epoch uint64) error {
+	conn, err := net.Dial("tcp", primaryAddr)
+	if err != nil {
+		return fmt.Errorf("rebuild: connect for full extent: %w", err)
+	}
+	defer conn.Close()
+
+	req := RebuildRequest{
+		Type:  RebuildFullExtent,
+		Epoch: epoch,
+	}
+	if err := WriteFrame(conn, MsgRebuildReq, EncodeRebuildRequest(req)); err != nil {
+		return fmt.Errorf("rebuild: send extent request: %w", err)
+	}
+
+	extentStart := vol.super.WALOffset + vol.super.WALSize
+	var offset uint64
+	var snapshotLSN uint64
+
+	for {
+		msgType, payload, err := ReadFrame(conn)
+		if err != nil {
+			return fmt.Errorf("rebuild: read extent response: %w", err)
+		}
+
+		switch msgType {
+		case MsgRebuildExtent:
+			// Chunks arrive in order; offset advances by exactly what was sent.
+			if _, err := vol.fd.WriteAt(payload, int64(extentStart+offset)); err != nil {
+				return fmt.Errorf("rebuild: write extent at offset %d: %w", offset, err)
+			}
+			offset += uint64(len(payload))
+
+		case MsgRebuildDone:
+			if len(payload) >= 8 {
+				snapshotLSN = binary.BigEndian.Uint64(payload[:8])
+			}
+			goto extentDone
+
+		case MsgRebuildError:
+			return fmt.Errorf("rebuild: server error during extent: %s", string(payload))
+
+		default:
+			return fmt.Errorf("rebuild: unexpected message type 0x%02x during extent", msgType)
+		}
+	}
+
+extentDone:
+	// Fsync the extent data. Ordering below matters for crash consistency:
+	// extent durable first, then in-memory state reset, then superblock.
+	if err := vol.fd.Sync(); err != nil {
+		return fmt.Errorf("rebuild: fsync extent: %w", err)
+	}
+
+	// Clear dirty map -- all data now in extent, stale dirty entries invalid.
+	vol.dirtyMap.Clear()
+
+	// Reset WAL state -- no valid WAL entries for old data.
+	// NOTE(review): direct field access assumes rebuild owns the volume
+	// exclusively at this point (RoleRebuilding, no appenders) — confirm.
+	vol.wal.mu.Lock()
+	vol.wal.logicalHead = 0
+	vol.wal.logicalTail = 0
+	vol.wal.mu.Unlock()
+
+	// Persist clean superblock state so crash recovery doesn't replay stale WAL.
+	// Checkpoint is snapshotLSN-1: everything below the snapshot is in the extent.
+	checkpointLSN := uint64(0)
+	if snapshotLSN > 0 {
+		checkpointLSN = snapshotLSN - 1
+	}
+	vol.mu.Lock()
+	vol.super.WALHead = 0
+	vol.super.WALTail = 0
+	vol.super.WALCheckpointLSN = checkpointLSN
+	if _, err := vol.fd.Seek(0, 0); err != nil {
+		vol.mu.Unlock()
+		return fmt.Errorf("rebuild: seek superblock: %w", err)
+	}
+	if _, err := vol.super.WriteTo(vol.fd); err != nil {
+		vol.mu.Unlock()
+		return fmt.Errorf("rebuild: write superblock: %w", err)
+	}
+	if err := vol.fd.Sync(); err != nil {
+		vol.mu.Unlock()
+		return fmt.Errorf("rebuild: sync superblock: %w", err)
+	}
+	vol.mu.Unlock()
+
+	conn.Close() // explicit close; the deferred Close on the same conn is a harmless no-op error
+
+	// Second catch-up scan: capture writes during extent copy.
+	if err := rebuildSecondCatchUp(vol, primaryAddr, snapshotLSN, epoch); err != nil {
+		return err
+	}
+	return vol.SetRole(RoleReplica)
+}
+
+// rebuildSecondCatchUp connects to the primary and streams WAL entries
+// from snapshotLSN to capture writes that arrived after the initial scan
+// or extent copy. Shared by both WAL catch-up and full-extent paths.
+func rebuildSecondCatchUp(vol *BlockVol, primaryAddr string, snapshotLSN uint64, epoch uint64) error {
+	if snapshotLSN == 0 {
+		return nil // no snapshot point reported by the server; nothing to catch up
+	}
+	conn, err := net.Dial("tcp", primaryAddr)
+	if err != nil {
+		return fmt.Errorf("rebuild: connect for second catch-up: %w", err)
+	}
+	defer conn.Close()
+
+	req := RebuildRequest{
+		Type:    RebuildWALCatchUp,
+		FromLSN: snapshotLSN,
+		Epoch:   epoch,
+	}
+	if err := WriteFrame(conn, MsgRebuildReq, EncodeRebuildRequest(req)); err != nil {
+		return fmt.Errorf("rebuild: send second catch-up request: %w", err)
+	}
+
+	for {
+		msgType, payload, err := ReadFrame(conn)
+		if err != nil {
+			return fmt.Errorf("rebuild: read second catch-up: %w", err)
+		}
+
+		switch msgType {
+		case MsgRebuildEntry:
+			if err := applyRebuildEntry(vol, payload); err != nil {
+				return fmt.Errorf("rebuild: apply second catch-up entry: %w", err)
+			}
+		case MsgRebuildDone:
+			return nil
+		case MsgRebuildError:
+			return fmt.Errorf("rebuild: server error during second catch-up: %s", string(payload))
+		default:
+			return fmt.Errorf("rebuild: unexpected message type 0x%02x during second catch-up", msgType)
+		}
+	}
+}
+
+// applyRebuildEntry decodes and applies a WAL entry during rebuild.
+// Unlike ReplicaReceiver.applyEntry, no contiguous LSN enforcement
+// (catch-up entries arrive in order but may have gaps from flushed entries).
+func applyRebuildEntry(vol *BlockVol, payload []byte) error {
+	entry, err := DecodeWALEntry(payload)
+	if err != nil {
+		return fmt.Errorf("decode: %w", err)
+	}
+
+	walOff, err := vol.wal.Append(&entry)
+	if errors.Is(err, ErrWALFull) {
+		// Trigger flusher and retry.
+		if vol.flusher != nil {
+			vol.flusher.NotifyUrgent()
+		}
+		// NOTE(review): tight retry without backoff — burns CPU if the flusher
+		// is slow to free WAL space; a short sleep per attempt would be kinder.
+		for i := 0; i < 100 && errors.Is(err, ErrWALFull); i++ {
+			if vol.flusher != nil {
+				vol.flusher.NotifyUrgent()
+			}
+			walOff, err = vol.wal.Append(&entry)
+		}
+	}
+	if err != nil {
+		return fmt.Errorf("WAL append: %w", err)
+	}
+
+	// Mark affected blocks dirty so reads resolve to the fresh WAL data.
+	switch entry.Type {
+	case EntryTypeWrite, EntryTypeTrim:
+		blocks := entry.Length / vol.super.BlockSize
+		for i := uint32(0); i < blocks; i++ {
+			vol.dirtyMap.Put(entry.LBA+uint64(i), walOff, entry.LSN, vol.super.BlockSize)
+		}
+	}
+
+	// Update nextLSN if this entry has a higher LSN.
+	for {
+		current := vol.nextLSN.Load()
+		next := entry.LSN + 1
+		if next <= current {
+			break
+		}
+		if vol.nextLSN.CompareAndSwap(current, next) { // CAS loop: only ever moves nextLSN forward
+			break
+		}
+	}
+
+	return nil
+}
diff --git a/weed/storage/blockvol/repl_proto.go b/weed/storage/blockvol/repl_proto.go
index 8f15024a9..3f12a1da6 100644
--- a/weed/storage/blockvol/repl_proto.go
+++ b/weed/storage/blockvol/repl_proto.go
@@ -85,6 +85,49 @@ func ReadFrame(r io.Reader) (msgType byte, payload []byte, err error) {
 	return msgType, payload, nil
 }
 
+// Rebuild message types (on rebuild channel).
+const (
+	MsgRebuildReq    byte = 0x10 // client → server
+	MsgRebuildEntry  byte = 0x11 // server → client: WAL entry
+	MsgRebuildExtent byte = 0x12 // server → client: extent chunk
+	MsgRebuildDone   byte = 0x13 // server → client: stream complete
+	MsgRebuildError  byte = 0x14 // server → client: error
+)
+
+// Rebuild request types.
+const (
+	RebuildWALCatchUp byte = 0x01
+	RebuildFullExtent byte = 0x02
+)
+
+// RebuildRequest is sent by the rebuilding replica to the primary.
+type RebuildRequest struct {
+	Type    byte // RebuildWALCatchUp or RebuildFullExtent
+	FromLSN uint64
+	Epoch   uint64
+}
+
+// EncodeRebuildRequest serializes a RebuildRequest (1+8+8 = 17 bytes).
+func EncodeRebuildRequest(req RebuildRequest) []byte { + buf := make([]byte, 17) + buf[0] = req.Type + binary.BigEndian.PutUint64(buf[1:9], req.FromLSN) + binary.BigEndian.PutUint64(buf[9:17], req.Epoch) + return buf +} + +// DecodeRebuildRequest deserializes a RebuildRequest. +func DecodeRebuildRequest(buf []byte) (RebuildRequest, error) { + if len(buf) < 17 { + return RebuildRequest{}, fmt.Errorf("repl: rebuild request too short: %d bytes", len(buf)) + } + return RebuildRequest{ + Type: buf[0], + FromLSN: binary.BigEndian.Uint64(buf[1:9]), + Epoch: binary.BigEndian.Uint64(buf[9:17]), + }, nil +} + // EncodeBarrierRequest serializes a BarrierRequest (4+8+8 = 20 bytes). func EncodeBarrierRequest(req BarrierRequest) []byte { buf := make([]byte, 20) diff --git a/weed/storage/blockvol/wal_writer.go b/weed/storage/blockvol/wal_writer.go index f8343c678..9ddf50a20 100644 --- a/weed/storage/blockvol/wal_writer.go +++ b/weed/storage/blockvol/wal_writer.go @@ -10,7 +10,8 @@ import ( ) var ( - ErrWALFull = errors.New("blockvol: WAL region full") + ErrWALFull = errors.New("blockvol: WAL region full") + ErrWALRecycled = errors.New("blockvol: WAL entries recycled past requested LSN") ) // WALWriter appends entries to the circular WAL region of a blockvol file. @@ -198,6 +199,95 @@ func (w *WALWriter) UsedFraction() float64 { return float64(u) / float64(s) } +// ScanFrom reads WAL entries starting at the first entry with LSN >= fromLSN. +// Calls fn for each valid WRITE or TRIM entry. Returns ErrWALRecycled if +// fromLSN is below checkpointLSN (those entries have been flushed to extent +// and the WAL space may have been reused). +func (w *WALWriter) ScanFrom(fd *os.File, walOffset uint64, + checkpointLSN uint64, fromLSN uint64, fn func(*WALEntry) error) error { + + if fromLSN <= checkpointLSN && checkpointLSN > 0 { + return ErrWALRecycled + } + + // Snapshot logical positions under lock. 
+ w.mu.Lock() + logicalTail := w.logicalTail + logicalHead := w.logicalHead + walSize := w.walSize + w.mu.Unlock() + + if logicalHead == logicalTail { + return nil // empty WAL + } + + pos := logicalTail + for pos < logicalHead { + physPos := pos % walSize + remaining := walSize - physPos + + // Need at least a header to proceed. + if remaining < uint64(walEntryHeaderSize) { + // Too small for a header — skip padding at end of region. + pos += remaining + continue + } + + // Read header. + headerBuf := make([]byte, walEntryHeaderSize) + absOff := int64(walOffset + physPos) + if _, err := fd.ReadAt(headerBuf, absOff); err != nil { + return fmt.Errorf("ScanFrom: read header at WAL+%d: %w", physPos, err) + } + + entryType := headerBuf[16] + lengthField := binary.LittleEndian.Uint32(headerBuf[26:]) + + // Calculate entry size based on type. + var payloadLen uint64 + switch entryType { + case EntryTypePadding: + entrySize := uint64(walEntryHeaderSize) + uint64(lengthField) + pos += entrySize + continue + case EntryTypeWrite: + payloadLen = uint64(lengthField) + default: + // TRIM, BARRIER: no data payload + } + + entrySize := uint64(walEntryHeaderSize) + payloadLen + + // Read full entry. + fullBuf := make([]byte, entrySize) + if physPos+entrySize <= walSize { + if _, err := fd.ReadAt(fullBuf, absOff); err != nil { + return fmt.Errorf("ScanFrom: read entry at WAL+%d: %w", physPos, err) + } + } else { + // Entry should not span WAL boundary (padding prevents this), + // but guard against it. + return fmt.Errorf("ScanFrom: entry at WAL+%d spans boundary", physPos) + } + + entry, err := DecodeWALEntry(fullBuf) + if err != nil { + // CRC failure — stop scanning (torn write). + return nil + } + + if entry.LSN >= fromLSN && (entry.Type == EntryTypeWrite || entry.Type == EntryTypeTrim) { + if err := fn(&entry); err != nil { + return err + } + } + + pos += entrySize + } + + return nil +} + // Sync fsyncs the underlying file descriptor. 
func (w *WALWriter) Sync() error { return w.fd.Sync()