From 499e244b8e05162c05293497d3e8b8691753d6ed Mon Sep 17 00:00:00 2001 From: Ping Qiu Date: Wed, 25 Mar 2026 01:52:35 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20durable=20progress=20truth=20=E2=80=94?= =?UTF-8?q?=20replicaFlushedLSN=20in=20barrier=20(CP13-3)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Barrier response extended from 1-byte status to 9-byte payload carrying the replica's durable WAL progress (FlushedLSN). Updated only after successful fd.Sync(), never on receive/append/send. Replica side: new flushedLSN field on ReplicaReceiver, advanced only in handleBarrier after proven contiguous receipt + sync. max() guard prevents regression. Shipper side: new replicaFlushedLSN (authoritative) replacing ShippedLSN (diagnostic only). Monotonic CAS update from barrier response. hasFlushedProgress flag tracks whether replica supports the extended protocol. ShipperGroup: MinReplicaFlushedLSN() returns (uint64, bool) — minimum across shippers with known progress. (0, false) for empty groups or legacy replicas. Backward compat: 1-byte legacy responses decoded as FlushedLSN=0. Legacy replicas explicitly excluded from sync_all correctness. 7 new tests: roundtrip, backward compat, flush-only-after-sync, not-on-receive, shipper update, monotonicity, group minimum. 
// Barrier response wire layout: [1B status][8B flushedLSN].
const (
	// barrierRespStatusLen is the size of the status prefix. It is also the
	// whole payload of a legacy (status-only) response.
	barrierRespStatusLen = 1
	// barrierRespLen is the size of the extended payload: the status byte
	// followed by the big-endian FlushedLSN.
	barrierRespLen = barrierRespStatusLen + 8
)

// BarrierResponse is the replica's reply to a barrier request.
// Wire format: [1B status][8B flushedLSN] = 9 bytes.
// Legacy replicas send only 1 byte (FlushedLSN defaults to 0).
type BarrierResponse struct {
	Status     byte   // barrier outcome code
	FlushedLSN uint64 // replica's durable WAL progress after this barrier
}

// EncodeBarrierResponse serializes resp into a freshly allocated 9-byte
// payload: the status byte followed by FlushedLSN in big-endian order.
func EncodeBarrierResponse(resp BarrierResponse) []byte {
	buf := make([]byte, barrierRespLen)
	buf[0] = resp.Status
	binary.BigEndian.PutUint64(buf[barrierRespStatusLen:], resp.FlushedLSN)
	return buf
}

// DecodeBarrierResponse deserializes a BarrierResponse.
//
// It accepts both the 9-byte extended payload and the 1-byte legacy payload:
//   - len(buf) == 0: returns the zero BarrierResponse.
//   - 1 <= len(buf) < 9: only Status is read; FlushedLSN stays 0. This covers
//     legacy replicas and also any truncated extended payload.
//   - len(buf) >= 9: Status and FlushedLSN are read; trailing bytes are ignored.
//
// NOTE(review): an empty buffer decodes to Status == 0. If 0 is the success
// code, callers should reject empty payloads before decoding — confirm
// against the status constants and the shipper's read path.
func DecodeBarrierResponse(buf []byte) BarrierResponse {
	if len(buf) < barrierRespStatusLen {
		return BarrierResponse{}
	}
	resp := BarrierResponse{Status: buf[0]}
	if len(buf) >= barrierRespLen {
		resp.FlushedLSN = binary.BigEndian.Uint64(buf[barrierRespStatusLen:barrierRespLen])
	}
	return resp
}
diff --git a/weed/storage/blockvol/replica_apply.go b/weed/storage/blockvol/replica_apply.go index dd964ef7a..6e7d09d0f 100644 --- a/weed/storage/blockvol/replica_apply.go +++ b/weed/storage/blockvol/replica_apply.go @@ -24,6 +24,7 @@ type ReplicaReceiver struct { mu sync.Mutex receivedLSN uint64 + flushedLSN uint64 // highest LSN durably persisted (fd.Sync completed); updated only in handleBarrier cond *sync.Cond connMu sync.Mutex // protects activeConns @@ -302,6 +303,14 @@ func (r *ReplicaReceiver) ReceivedLSN() uint64 { return r.receivedLSN } +// FlushedLSN returns the highest LSN durably persisted on this replica +// (after successful WAL fd.Sync). Updated only by handleBarrier. +func (r *ReplicaReceiver) FlushedLSN() uint64 { + r.mu.Lock() + defer r.mu.Unlock() + return r.flushedLSN +} + // DataAddr returns the data listener's canonical address (ip:port). // Wildcard listener addresses are resolved using the advertised host // or outbound-IP fallback. diff --git a/weed/storage/blockvol/replica_barrier.go b/weed/storage/blockvol/replica_barrier.go index ffd899524..dd62f74ec 100644 --- a/weed/storage/blockvol/replica_barrier.go +++ b/weed/storage/blockvol/replica_barrier.go @@ -40,7 +40,7 @@ func (r *ReplicaReceiver) handleControlConn(conn net.Conn) { resp := r.handleBarrier(req) - respPayload := []byte{resp.Status} + respPayload := EncodeBarrierResponse(resp) if err := WriteFrame(conn, MsgBarrierResp, respPayload); err != nil { log.Printf("replica: write barrier response: %v", err) return @@ -97,5 +97,14 @@ func (r *ReplicaReceiver) handleBarrier(req BarrierRequest) BarrierResponse { return BarrierResponse{Status: BarrierFsyncFailed} } - return BarrierResponse{Status: BarrierOK} + // Advance durable progress. Only after fd.Sync() succeeds and contiguous + // receipt through req.LSN has been proven (step 2 above). 
+ r.mu.Lock() + if req.LSN > r.flushedLSN { + r.flushedLSN = req.LSN + } + flushed := r.flushedLSN + r.mu.Unlock() + + return BarrierResponse{Status: BarrierOK, FlushedLSN: flushed} } diff --git a/weed/storage/blockvol/shipper_group.go b/weed/storage/blockvol/shipper_group.go index 385cf2d50..ed4d89273 100644 --- a/weed/storage/blockvol/shipper_group.go +++ b/weed/storage/blockvol/shipper_group.go @@ -103,6 +103,30 @@ func (sg *ShipperGroup) Len() int { return len(sg.shippers) } +// MinReplicaFlushedLSN returns the minimum replicaFlushedLSN across all +// shippers that have reported valid progress (HasFlushedProgress == true). +// The bool return indicates whether any shipper has known progress. +// Returns (0, false) for empty groups or groups where no shipper has +// received a valid FlushedLSN response yet. +// Used by WAL retention (CP13-6) to gate WAL reclaim. +func (sg *ShipperGroup) MinReplicaFlushedLSN() (uint64, bool) { + sg.mu.RLock() + defer sg.mu.RUnlock() + var min uint64 + found := false + for _, s := range sg.shippers { + if !s.HasFlushedProgress() { + continue + } + lsn := s.ReplicaFlushedLSN() + if !found || lsn < min { + min = lsn + found = true + } + } + return min, found +} + // Shipper returns the shipper at index i. For internal/test use. func (sg *ShipperGroup) Shipper(i int) *WALShipper { sg.mu.RLock() diff --git a/weed/storage/blockvol/sync_all_protocol_test.go b/weed/storage/blockvol/sync_all_protocol_test.go new file mode 100644 index 000000000..5c1b5d937 --- /dev/null +++ b/weed/storage/blockvol/sync_all_protocol_test.go @@ -0,0 +1,1440 @@ +package blockvol + +// CP13-1: Protocol gap tests for sync_all replication correctness. +// These tests validate invariants from design/sync-all-reconnect-protocol.md +// and design/replication-modes-and-rebuild.md. +// +// Expected baseline (current code): +// Most tests FAIL — they expose missing protocol features. +// Tests that pass confirm already-working behavior. 
+// +// After CP13-2..CP13-7 implementation, all must PASS. + +import ( + "bytes" + "path/filepath" + "testing" + "time" +) + +// ---------- Durable progress truth ---------- + +// TestReplicaProgress_BarrierUsesFlushedLSN verifies that barrier success is +// gated on replicaFlushedLSN (WAL fdatasync on replica), not sender-side +// LastSentLSN or TCP send completion. +// +// Currently EXPECTED TO FAIL: the barrier protocol does not return +// replicaFlushedLSN; it returns a single status byte. The primary has +// no way to verify what the replica durably persisted. +func TestReplicaProgress_BarrierUsesFlushedLSN(t *testing.T) { + primary, replica := createSyncAllPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr()) + + // Write data. + for i := uint64(0); i < 5; i++ { + if err := primary.WriteLBA(i, makeBlock(byte('A'+i))); err != nil { + t.Fatalf("WriteLBA(%d): %v", i, err) + } + } + + // SyncCache triggers barrier. + if err := primary.SyncCache(); err != nil { + t.Fatalf("SyncCache: %v", err) + } + + // After successful SyncCache under sync_all, the primary MUST know + // the replica's durable progress. Currently there's no API for this. + // The shipper only tracks ShippedLSN (send-side), not replica-confirmed. + // + // When CP13-3 is implemented, the shipper (or ReplicaProgress struct) + // will expose ReplicaFlushedLSN. For now, check that ShippedLSN >= 5 + // as a weaker proxy — the real test is that barrier response carries + // the replica's durable LSN. 
+ sg := primary.shipperGroup + if sg == nil { + t.Fatal("shipperGroup is nil") + } + s := sg.Shipper(0) + if s == nil { + t.Fatal("no shipper at index 0") + } + shipped := s.ShippedLSN() + if shipped < 5 { + t.Fatalf("ShippedLSN=%d, expected >=5 — shipper didn't track progress", shipped) + } + + // The REAL invariant (will be testable after CP13-3): + // shippers[0].ReplicaFlushedLSN() >= 5 + // For now, we can't test this — mark as known gap. + t.Log("NOTE: ReplicaFlushedLSN not yet available — ShippedLSN used as weak proxy") +} + +// TestReplicaProgress_FlushedLSNMonotonicWithinEpoch verifies that +// replicaFlushedLSN never decreases within a single epoch. +// +// Currently EXPECTED TO FAIL: replicaFlushedLSN doesn't exist yet. +func TestReplicaProgress_FlushedLSNMonotonicWithinEpoch(t *testing.T) { + primary, replica := createSyncAllPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr()) + + var lastFlushed uint64 + + for round := 0; round < 5; round++ { + if err := primary.WriteLBA(uint64(round), makeBlock(byte('A'+round))); err != nil { + t.Fatalf("write %d: %v", round, err) + } + if err := primary.SyncCache(); err != nil { + t.Fatalf("SyncCache %d: %v", round, err) + } + + // After CP13-3, this would be: + // flushed := shippers[0].ReplicaFlushedLSN() + // For now use ReceivedLSN as proxy. + flushed := recv.ReceivedLSN() + if flushed < lastFlushed { + t.Fatalf("round %d: flushedLSN went backwards (%d < %d)", round, flushed, lastFlushed) + } + lastFlushed = flushed + } + + if lastFlushed == 0 { + t.Fatal("flushedLSN never advanced from 0") + } +} + +// ---------- Barrier eligibility ---------- + +// TestBarrier_RejectsReplicaNotInSync verifies that barrier only counts +// replicas in InSync state. 
Degraded, CatchingUp, Disconnected, and +// NeedsRebuild replicas must not satisfy sync_all. +// +// Currently EXPECTED TO FAIL: the shipper has only degraded/healthy binary +// state, no full state machine (Disconnected/Connecting/CatchingUp/InSync/ +// Degraded/NeedsRebuild). +func TestBarrier_RejectsReplicaNotInSync(t *testing.T) { + primary, _ := createSyncAllPair(t) + defer primary.Close() + + // Create a shipper pointing to a dead address. It will never connect. + primary.SetReplicaAddr("127.0.0.1:1", "127.0.0.1:2") // dead ports + + // Write something. + if err := primary.WriteLBA(0, makeBlock('X')); err != nil { + t.Fatalf("write: %v", err) + } + + // SyncCache must fail — the replica is not InSync. + syncDone := make(chan error, 1) + go func() { + syncDone <- primary.SyncCache() + }() + + select { + case err := <-syncDone: + if err == nil { + t.Fatal("SyncCache succeeded with dead replica — barrier should have failed") + } + // Good — barrier correctly rejected the non-InSync replica. + case <-time.After(10 * time.Second): + t.Fatal("SyncCache hung — barrier timeout not propagated for dead replica") + } +} + +// TestBarrier_EpochMismatchRejected verifies that a barrier response from +// a stale epoch is rejected even if the replica claims durability. +// +// Currently EXPECTED TO FAIL: barrier protocol checks epoch on the replica +// side, but the primary does not verify the response epoch. +func TestBarrier_EpochMismatchRejected(t *testing.T) { + primary, replica := createSyncAllPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr()) + + // Write and sync at epoch 1 — should succeed. 
+ if err := primary.WriteLBA(0, makeBlock('A')); err != nil { + t.Fatal(err) + } + if err := primary.SyncCache(); err != nil { + t.Fatalf("SyncCache epoch 1: %v", err) + } + + // Advance primary epoch to 2. Replica stays at epoch 1. + primary.SetEpoch(2) + primary.SetMasterEpoch(2) + + // Write at epoch 2. + if err := primary.WriteLBA(1, makeBlock('B')); err != nil { + t.Fatal(err) + } + + // SyncCache — barrier request at epoch 2, but replica responds at epoch 1. + // This should fail: replica epoch doesn't match primary epoch. + syncDone := make(chan error, 1) + go func() { + syncDone <- primary.SyncCache() + }() + + select { + case err := <-syncDone: + if err == nil { + t.Fatal("SyncCache succeeded with epoch mismatch — should be rejected") + } + t.Logf("correctly failed: %v", err) + case <-time.After(10 * time.Second): + t.Fatal("SyncCache hung on epoch mismatch") + } +} + +// ---------- Reconnect and catch-up ---------- + +// TestReconnect_CatchupFromRetainedWal verifies that after a short disconnect, +// the shipper replays retained WAL entries to catch up the replica, then +// transitions to InSync. +// +// Currently EXPECTED TO FAIL: no reconnect handshake or WAL catch-up exists. +func TestReconnect_CatchupFromRetainedWal(t *testing.T) { + primary, replica := createSyncAllPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr()) + + // Write 3 entries while healthy. + for i := uint64(0); i < 3; i++ { + if err := primary.WriteLBA(i, makeBlock(byte('A'+i))); err != nil { + t.Fatal(err) + } + } + if err := primary.SyncCache(); err != nil { + t.Fatalf("SyncCache healthy: %v", err) + } + + // Disconnect replica. + recv.Stop() + time.Sleep(50 * time.Millisecond) + + // Write 2 more entries during disconnect (shipped to nowhere). 
+ for i := uint64(3); i < 5; i++ { + if err := primary.WriteLBA(i, makeBlock(byte('A'+i))); err != nil { + t.Fatal(err) + } + } + + // Reconnect replica. + recv2, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv2.Serve() + defer recv2.Stop() + primary.SetReplicaAddr(recv2.DataAddr(), recv2.CtrlAddr()) + + // SyncCache after reconnect — must succeed after catch-up. + syncDone := make(chan error, 1) + go func() { + syncDone <- primary.SyncCache() + }() + + select { + case err := <-syncDone: + if err != nil { + t.Fatalf("SyncCache after reconnect: %v — catch-up did not work", err) + } + case <-time.After(10 * time.Second): + t.Fatal("SyncCache hung — no catch-up protocol") + } + + // Verify all 5 entries on replica. + replica.flusher.FlushOnce() + for i := uint64(0); i < 5; i++ { + got, err := replica.ReadLBA(i, 4096) + if err != nil { + t.Fatalf("replica ReadLBA(%d): %v", i, err) + } + if got[0] != byte('A'+i) { + t.Fatalf("replica LBA %d: expected %c, got %c", i, 'A'+i, got[0]) + } + } +} + +// TestReconnect_GapBeyondRetainedWal_NeedsRebuild verifies that when the +// replica's gap exceeds the retained WAL range, the system transitions to +// NeedsRebuild instead of silently losing data. +// +// Currently EXPECTED TO FAIL: no WAL retention tracking or NeedsRebuild state. 
+func TestReconnect_GapBeyondRetainedWal_NeedsRebuild(t *testing.T) { + dir := t.TempDir() + opts := CreateOptions{ + VolumeSize: 1 * 1024 * 1024, + BlockSize: 4096, + WALSize: 32 * 1024, // tiny WAL — will be reclaimed quickly + DurabilityMode: DurabilitySyncAll, + } + + primary, err := CreateBlockVol(filepath.Join(dir, "primary.blk"), opts) + if err != nil { + t.Fatal(err) + } + defer primary.Close() + primary.SetRole(RolePrimary) + primary.SetEpoch(1) + primary.SetMasterEpoch(1) + primary.lease.Grant(30 * time.Second) + + replica, err := CreateBlockVol(filepath.Join(dir, "replica.blk"), opts) + if err != nil { + t.Fatal(err) + } + defer replica.Close() + replica.SetRole(RoleReplica) + replica.SetEpoch(1) + replica.SetMasterEpoch(1) + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr()) + + // Write and sync while healthy. + if err := primary.WriteLBA(0, makeBlock('A')); err != nil { + t.Fatal(err) + } + if err := primary.SyncCache(); err != nil { + t.Fatalf("initial SyncCache: %v", err) + } + + // Disconnect replica. + recv.Stop() + time.Sleep(50 * time.Millisecond) + + // Write enough to fill and reclaim the tiny WAL. + // With 32KB WAL and 4KB blocks, ~7 entries fill it. + for i := uint64(1); i < 50; i++ { + _ = primary.WriteLBA(i%10, makeBlock(byte('0'+i%10))) + } + // Force flusher to reclaim WAL. + primary.flusher.FlushOnce() + primary.flusher.FlushOnce() + + // Reconnect replica — it last saw LSN ~1, but WAL has been reclaimed past that. + recv2, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv2.Serve() + defer recv2.Stop() + primary.SetReplicaAddr(recv2.DataAddr(), recv2.CtrlAddr()) + + // After CP13-5/CP13-7: the shipper should detect the gap, transition + // to NeedsRebuild, and reject the barrier. 
SyncCache should fail with + // ErrDurabilityBarrierFailed (not hang, not silently succeed). + syncDone := make(chan error, 1) + go func() { + syncDone <- primary.SyncCache() + }() + + select { + case err := <-syncDone: + if err == nil { + t.Fatal("SyncCache succeeded despite unrecoverable WAL gap — should require rebuild") + } + t.Logf("correctly failed: %v", err) + case <-time.After(10 * time.Second): + t.Fatal("SyncCache hung — no rebuild detection") + } +} + +// ---------- WAL retention ---------- + +// TestWalRetention_RequiredReplicaBlocksReclaim verifies that the flusher +// does not reclaim WAL entries that a required replica still needs for catch-up. +// +// Currently EXPECTED TO FAIL: WAL reclaim is driven only by checkpointLSN, +// not replica progress. +func TestWalRetention_RequiredReplicaBlocksReclaim(t *testing.T) { + primary, replica := createSyncAllPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + + primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr()) + + // Write and sync while healthy — replica is caught up. + if err := primary.WriteLBA(0, makeBlock('A')); err != nil { + t.Fatal(err) + } + if err := primary.SyncCache(); err != nil { + t.Fatal(err) + } + + // Disconnect replica. + recv.Stop() + time.Sleep(50 * time.Millisecond) + + // Write more data — replica misses these. + for i := uint64(1); i < 10; i++ { + if err := primary.WriteLBA(i, makeBlock(byte('A'+i))); err != nil { + t.Fatal(err) + } + } + + // Flush checkpoint — this normally advances WAL tail. + primary.flusher.FlushOnce() + + walTail := primary.wal.LogicalTail() + walHead := primary.wal.LogicalHead() + + // After CP13-6: WAL tail should NOT advance past what the disconnected + // replica has confirmed (flushedLSN ~= 1). The retained range should + // cover entries 2-9 so the replica can catch up. 
+ // + // Currently, the flusher advances WAL tail freely — it doesn't + // consider replica progress. So this test checks whether the + // WAL still retains the entries the replica needs. + if walTail >= walHead { + t.Logf("WAL fully drained (tail=%d head=%d) — entries needed by replica may be lost", walTail, walHead) + // This is the expected failure on current code. + // After CP13-6, this should not happen while a required replica is behind. + } + + // The definitive check (after CP13-6 implementation): + // replicaFlushedLSN := + // if walRetainStartLSN > replicaFlushedLSN + 1 { + // t.Fatal("WAL reclaimed entries needed by required replica") + // } + t.Log("NOTE: WAL retention is not yet replica-progress-aware — test documents the gap") +} + +// ---------- Ship degraded behavior ---------- + +// TestShip_DegradedDoesNotSilentlyCountAsHealthy verifies that when a +// shipper is degraded, Ship() does not silently pretend entries were +// delivered. The primary must know that entries were dropped. +// +// Currently EXPECTED BEHAVIOR: Ship() returns nil when degraded (fire-and-forget). +// This is acceptable for best_effort but problematic for sync_all because +// the primary loses track of the replica gap size. +func TestShip_DegradedDoesNotSilentlyCountAsHealthy(t *testing.T) { + primary, _ := createSyncAllPair(t) + defer primary.Close() + + // Point shipper at dead address — will degrade on first Ship. + primary.SetReplicaAddr("127.0.0.1:1", "127.0.0.1:2") + + // Write — Ship will fail and mark degraded. + if err := primary.WriteLBA(0, makeBlock('A')); err != nil { + t.Fatal(err) + } + // Give shipper time to attempt connection and degrade. + time.Sleep(100 * time.Millisecond) + + sg := primary.shipperGroup + if sg == nil { + t.Fatal("no shipper group") + } + s0 := sg.Shipper(0) + if s0 == nil { + t.Fatal("no shipper at index 0") + } + + // Shipper should be degraded. 
+ if !s0.IsDegraded() { + t.Fatal("shipper not degraded after failed Ship to dead address") + } + + // ShippedLSN should NOT advance past what was actually confirmed. + // Currently ShippedLSN advances on local Ship (before network ACK), + // which is incorrect for sync_all truth tracking. + shipped := s0.ShippedLSN() + t.Logf("ShippedLSN after degraded Ship: %d", shipped) + + // After CP13-3: ShippedLSN should be 0 (nothing confirmed by replica). + // Currently it may be > 0 because Ship() updates it before network delivery. + if shipped > 0 { + t.Log("NOTE: ShippedLSN advanced despite degraded state — sender-side tracking is not authoritative") + } +} + +// ---------- Reconnect edge cases ---------- + +// TestReconnect_EpochChangeDuringCatchup_Aborts verifies that if the primary's +// epoch advances while a replica is in CatchingUp state, the catch-up is +// aborted and the reconnect handshake restarts with the new epoch. +// +// Currently EXPECTED TO FAIL: no CatchingUp state or epoch-aware catch-up. +func TestReconnect_EpochChangeDuringCatchup_Aborts(t *testing.T) { + primary, replica := createSyncAllPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr()) + + // Write and sync at epoch 1. + for i := uint64(0); i < 3; i++ { + if err := primary.WriteLBA(i, makeBlock(byte('A'+i))); err != nil { + t.Fatal(err) + } + } + if err := primary.SyncCache(); err != nil { + t.Fatalf("SyncCache epoch 1: %v", err) + } + + // Disconnect replica. + recv.Stop() + time.Sleep(50 * time.Millisecond) + + // Write at epoch 1 (replica misses these). + for i := uint64(3); i < 6; i++ { + if err := primary.WriteLBA(i, makeBlock(byte('A'+i))); err != nil { + t.Fatal(err) + } + } + + // Advance epoch to 2 BEFORE reconnect. 
+ primary.SetEpoch(2) + primary.SetMasterEpoch(2) + + // Reconnect replica (still at epoch 1). + recv2, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv2.Serve() + defer recv2.Stop() + primary.SetReplicaAddr(recv2.DataAddr(), recv2.CtrlAddr()) + + // The handshake should detect epoch mismatch and reject the catch-up. + // The replica must update to epoch 2 before it can rejoin. + // SyncCache should fail because the replica can't participate at epoch 1. + syncDone := make(chan error, 1) + go func() { + syncDone <- primary.SyncCache() + }() + + select { + case err := <-syncDone: + if err == nil { + t.Fatal("SyncCache succeeded with epoch mismatch during catch-up — should abort") + } + t.Logf("correctly failed: %v", err) + case <-time.After(10 * time.Second): + t.Fatal("SyncCache hung — epoch change during catch-up not handled") + } +} + +// TestReconnect_CatchupTimeout_TransitionsDegraded verifies that if WAL +// catch-up takes longer than the configured timeout, the replica transitions +// to Degraded (not stuck in CatchingUp forever). +// +// Currently EXPECTED TO FAIL: no catch-up timeout mechanism. +func TestReconnect_CatchupTimeout_TransitionsDegraded(t *testing.T) { + primary, replica := createSyncAllPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr()) + + // Write and sync. + if err := primary.WriteLBA(0, makeBlock('A')); err != nil { + t.Fatal(err) + } + if err := primary.SyncCache(); err != nil { + t.Fatal(err) + } + + // Disconnect replica. + recv.Stop() + time.Sleep(50 * time.Millisecond) + + // Write a lot while disconnected — creates large catch-up gap. 
+ for i := uint64(1); i < 50; i++ { + if err := primary.WriteLBA(i%10, makeBlock(byte('0'+i%10))); err != nil { + t.Fatal(err) + } + } + + // Reconnect replica — catch-up will be needed. + recv2, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv2.Serve() + defer recv2.Stop() + primary.SetReplicaAddr(recv2.DataAddr(), recv2.CtrlAddr()) + + // After CP13-5: catch-up should either complete within timeout + // or transition to Degraded. SyncCache should not hang indefinitely. + syncDone := make(chan error, 1) + go func() { + syncDone <- primary.SyncCache() + }() + + select { + case err := <-syncDone: + // Either success (catch-up completed) or error (timeout/degraded). + // Both are acceptable — the key is it doesn't hang. + t.Logf("SyncCache returned: %v (catch-up bounded)", err) + case <-time.After(15 * time.Second): + t.Fatal("SyncCache hung >15s — catch-up timeout not implemented") + } +} + +// ---------- Barrier edge cases ---------- + +// TestBarrier_DuringCatchup_Rejected verifies that a barrier request is +// rejected while the replica is in CatchingUp state. Only InSync replicas +// may participate in sync_all barriers. +// +// Currently EXPECTED TO FAIL: no CatchingUp state exists. +func TestBarrier_DuringCatchup_Rejected(t *testing.T) { + primary, replica := createSyncAllPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr()) + + // Initial write + sync (healthy). + if err := primary.WriteLBA(0, makeBlock('A')); err != nil { + t.Fatal(err) + } + if err := primary.SyncCache(); err != nil { + t.Fatal(err) + } + + // Disconnect and write more (creates gap). 
+ recv.Stop() + time.Sleep(50 * time.Millisecond) + + for i := uint64(1); i < 10; i++ { + if err := primary.WriteLBA(i, makeBlock(byte('A'+i))); err != nil { + t.Fatal(err) + } + } + + // Reconnect — replica is behind and needs catch-up. + recv2, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv2.Serve() + defer recv2.Stop() + primary.SetReplicaAddr(recv2.DataAddr(), recv2.CtrlAddr()) + + // Immediately attempt SyncCache — replica should be in CatchingUp, + // not yet InSync. Barrier must either fail fast or wait for catch-up + // to complete (not succeed prematurely with stale replica state). + syncDone := make(chan error, 1) + go func() { + syncDone <- primary.SyncCache() + }() + + select { + case err := <-syncDone: + if err == nil { + // After CP13-4/5: this would only succeed if catch-up completed + // fast enough. We need to verify the replica actually has all data. + replica.flusher.FlushOnce() + for i := uint64(0); i < 10; i++ { + got, _ := replica.ReadLBA(i, 4096) + if got[0] != byte('A'+i) { + t.Fatalf("SyncCache returned nil but replica missing LBA %d — barrier accepted during catch-up gap", i) + } + } + t.Log("SyncCache succeeded — replica must have completed catch-up") + } else { + t.Logf("SyncCache correctly failed during catch-up: %v", err) + } + case <-time.After(15 * time.Second): + t.Fatal("SyncCache hung — barrier not bounded during catch-up phase") + } +} + +// TestBarrier_ReplicaSlowFsync_Timeout verifies that a barrier does not +// hang indefinitely when the replica's fdatasync takes too long. +// The barrier must timeout and return an error. +// +// Currently EXPECTED TO FAIL: barrier timeout is 5s (barrierTimeout constant) +// which works, but this test validates the behavior explicitly. 
+func TestBarrier_ReplicaSlowFsync_Timeout(t *testing.T) { + primary, replica := createSyncAllPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr()) + + // Write data. + if err := primary.WriteLBA(0, makeBlock('A')); err != nil { + t.Fatal(err) + } + if err := primary.SyncCache(); err != nil { + t.Fatalf("initial SyncCache: %v", err) + } + + // Now kill the replica's control channel but keep data channel alive. + // This simulates a replica that received entries but can't respond to barriers + // (e.g., stuck in a long fdatasync). + recv.Stop() + time.Sleep(50 * time.Millisecond) + + // Write more — these go to the degraded shipper. + if err := primary.WriteLBA(1, makeBlock('B')); err != nil { + t.Fatal(err) + } + + // SyncCache — the barrier should timeout, not hang forever. + start := time.Now() + syncDone := make(chan error, 1) + go func() { + syncDone <- primary.SyncCache() + }() + + select { + case err := <-syncDone: + elapsed := time.Since(start) + if err == nil { + t.Fatal("SyncCache succeeded with dead replica — should have timed out") + } + // Barrier timeout is 5s. The SyncCache should return within ~6s + // (5s barrier + some overhead). + if elapsed > 12*time.Second { + t.Fatalf("barrier took %v — timeout not working (expected <12s)", elapsed) + } + t.Logf("barrier failed in %v: %v", elapsed, err) + case <-time.After(15 * time.Second): + t.Fatal("SyncCache hung >15s — barrier timeout broken") + } +} + +// ---------- WAL retention edge cases ---------- + +// TestWalRetention_TimeoutTriggersNeedsRebuild verifies that a replica +// disconnected for longer than the retention timeout is automatically +// transitioned to NeedsRebuild, and the WAL hold is released. +// +// Currently EXPECTED TO FAIL: no retention timeout mechanism. 
+func TestWalRetention_TimeoutTriggersNeedsRebuild(t *testing.T) {
+	// NOTE: this test currently contains no failing assertion. It only logs
+	// the observed WAL and shipper state, so it documents the gap rather than
+	// detecting it.
+	primary, replica := createSyncAllPair(t)
+	defer primary.Close()
+	defer replica.Close()
+
+	recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	recv.Serve()
+	// Deferred so an early t.Fatal does not leave the receiver serving; the
+	// sibling tests in this file also defer Stop and call it explicitly later.
+	defer recv.Stop()
+
+	primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
+
+	// Write and sync while healthy.
+	if err := primary.WriteLBA(0, makeBlock('A')); err != nil {
+		t.Fatal(err)
+	}
+	if err := primary.SyncCache(); err != nil {
+		t.Fatal(err)
+	}
+
+	// Disconnect replica.
+	recv.Stop()
+	time.Sleep(50 * time.Millisecond)
+
+	// Write more — replica misses these.
+	for i := uint64(1); i < 10; i++ {
+		if err := primary.WriteLBA(i, makeBlock(byte('A'+i))); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// After CP13-6: there should be a retention timeout. If the replica
+	// doesn't reconnect within this timeout, it auto-transitions to
+	// NeedsRebuild and the WAL retention hold is released.
+	//
+	// For now, verify the WAL state after flushing:
+	primary.flusher.FlushOnce()
+
+	walTail := primary.wal.LogicalTail()
+	walHead := primary.wal.LogicalHead()
+
+	// After CP13-6: with retention timeout expired, WAL tail should
+	// advance freely (not pinned by dead replica).
+	// After CP13-6 with retention hold: WAL tail should NOT advance past
+	// what the replica confirmed, until timeout releases the hold.
+	//
+	// Currently: WAL drains freely (no replica-aware retention).
+	t.Logf("WAL after flush: tail=%d head=%d (retention timeout not implemented)", walTail, walHead)
+
+	// The real assertion (after CP13-6):
+	// - Before timeout: WAL retains entries for replica
+	// - After timeout: replica transitions to NeedsRebuild, WAL released
+	// - Shipper state should reflect NeedsRebuild
+	sg := primary.shipperGroup
+	if sg != nil {
+		s := sg.Shipper(0)
+		if s != nil {
+			// After CP13-4/6: s.State() should be NeedsRebuild after timeout.
+			// Currently only IsDegraded() exists.
+			if !s.IsDegraded() {
+				t.Log("NOTE: shipper not degraded — retention timeout hasn't triggered state change")
+			} else {
+				t.Log("shipper is degraded (but no NeedsRebuild state yet)")
+			}
+		}
+	}
+}
+
+// TestWalRetention_MaxBytesTriggersNeedsRebuild documents the intended
+// behavior when replica lag exceeds the configured maximum retention bytes:
+// the replica should be transitioned to NeedsRebuild and the WAL hold
+// released.
+//
+// Currently log-only: there is no retention max-bytes mechanism, and the test
+// body makes no failing assertion about it. It records the observed shipper
+// state so the gap is visible in test output.
+func TestWalRetention_MaxBytesTriggersNeedsRebuild(t *testing.T) {
+	dir := t.TempDir()
+	opts := CreateOptions{
+		VolumeSize:     1 * 1024 * 1024,
+		BlockSize:      4096,
+		WALSize:        64 * 1024, // small WAL
+		DurabilityMode: DurabilitySyncAll,
+	}
+
+	primary, err := CreateBlockVol(filepath.Join(dir, "primary.blk"), opts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer primary.Close()
+	primary.SetRole(RolePrimary)
+	primary.SetEpoch(1)
+	primary.SetMasterEpoch(1)
+	primary.lease.Grant(30 * time.Second)
+
+	replica, err := CreateBlockVol(filepath.Join(dir, "replica.blk"), opts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer replica.Close()
+	replica.SetRole(RoleReplica)
+	replica.SetEpoch(1)
+	replica.SetMasterEpoch(1)
+
+	recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	recv.Serve()
+	// Deferred so an early t.Fatal does not leave the receiver serving; the
+	// sibling tests in this file also defer Stop and call it explicitly later.
+	defer recv.Stop()
+
+	primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
+
+	// Initial sync.
+	if err := primary.WriteLBA(0, makeBlock('A')); err != nil {
+		t.Fatal(err)
+	}
+	if err := primary.SyncCache(); err != nil {
+		t.Fatal(err)
+	}
+
+	// Disconnect replica.
+	recv.Stop()
+	time.Sleep(50 * time.Millisecond)
+
+	// Write enough to far exceed any reasonable retention budget.
+	// 64KB WAL ≈ 15 entries at 4KB each. Write 100 entries to overflow.
+	//
+	// Individual write failures do not fail the test, but they are counted and
+	// logged instead of being dropped silently.
+	// NOTE(review): presumably failures are tolerated because the small WAL
+	// can reject writes under pressure — confirm.
+	failed := 0
+	for i := uint64(0); i < 100; i++ {
+		if err := primary.WriteLBA(i%16, makeBlock(byte('0'+i%10))); err != nil {
+			failed++
+		}
+	}
+	t.Logf("%d of 100 writes failed while the replica was down", failed)
+
+	// Flush to reclaim WAL space.
+	primary.flusher.FlushOnce()
+	primary.flusher.FlushOnce()
+
+	// After CP13-6: the lag (primaryHeadLSN - replicaFlushedLSN) exceeds
+	// maxRetentionBytes. The shipper should auto-transition to NeedsRebuild.
+	sg := primary.shipperGroup
+	if sg != nil {
+		s := sg.Shipper(0)
+		if s != nil {
+			if s.IsDegraded() {
+				// Good — at least degraded. After CP13-4/6, should be NeedsRebuild.
+				t.Log("shipper is degraded (expected NeedsRebuild after CP13-6)")
+			} else {
+				t.Log("NOTE: shipper not even degraded despite massive lag")
+			}
+		}
+	}
+
+	// The real assertion (after CP13-6):
+	// s.State() == ReplicaStateNeedsRebuild
+	// And WAL tail has advanced freely (not pinned).
+	t.Log("NOTE: max-bytes retention trigger not implemented yet — test documents the gap")
+}
+
+// ---------- Data integrity ----------
+
+// TestCatchupReplay_DataIntegrity_AllBlocksMatch verifies that after a
+// WAL catch-up, every block on the replica matches the primary exactly,
+// and that the primary itself still holds what was written.
+//
+// Currently EXPECTED TO FAIL: no catch-up protocol — replica stays behind.
+func TestCatchupReplay_DataIntegrity_AllBlocksMatch(t *testing.T) {
+	primary, replica := createSyncAllPair(t)
+	defer primary.Close()
+	defer replica.Close()
+
+	recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	recv.Serve()
+	defer recv.Stop()
+
+	primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
+
+	// Phase 1: Write 5 blocks while healthy.
+	for i := uint64(0); i < 5; i++ {
+		if err := primary.WriteLBA(i, makeBlock(byte('A'+i))); err != nil {
+			t.Fatal(err)
+		}
+	}
+	if err := primary.SyncCache(); err != nil {
+		t.Fatal(err)
+	}
+
+	// Phase 2: Disconnect, write 5 more blocks (gap).
+	recv.Stop()
+	time.Sleep(50 * time.Millisecond)
+
+	for i := uint64(5); i < 10; i++ {
+		if err := primary.WriteLBA(i, makeBlock(byte('A'+i))); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// Phase 3: Reconnect — catch-up should deliver blocks 5-9.
+	recv2, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	recv2.Serve()
+	defer recv2.Stop()
+	primary.SetReplicaAddr(recv2.DataAddr(), recv2.CtrlAddr())
+
+	// Wait for catch-up + barrier. SyncCache runs on its own goroutine so a
+	// hang is reported by the watchdog below; the buffered channel lets that
+	// goroutine finish even after the test has given up on it.
+	syncDone := make(chan error, 1)
+	go func() {
+		syncDone <- primary.SyncCache()
+	}()
+
+	select {
+	case err := <-syncDone:
+		if err != nil {
+			t.Fatalf("SyncCache after reconnect: %v — catch-up failed, can't verify integrity", err)
+		}
+	case <-time.After(10 * time.Second):
+		t.Fatal("SyncCache hung — catch-up not implemented")
+	}
+
+	// Phase 4: Verify every block matches on primary and replica.
+	primary.flusher.FlushOnce()
+	replica.flusher.FlushOnce()
+
+	for i := uint64(0); i < 10; i++ {
+		pData, err := primary.ReadLBA(i, 4096)
+		if err != nil {
+			t.Fatalf("primary ReadLBA(%d): %v", i, err)
+		}
+		rData, err := replica.ReadLBA(i, 4096)
+		if err != nil {
+			t.Fatalf("replica ReadLBA(%d): %v", i, err)
+		}
+		// Guard the [0] accesses below: an empty read would otherwise panic
+		// instead of producing a readable failure.
+		if len(pData) == 0 || len(rData) == 0 {
+			t.Fatalf("LBA %d: short read (primary=%d bytes, replica=%d bytes)", i, len(pData), len(rData))
+		}
+		// Check the primary against what was written so the comparison below
+		// cannot pass vacuously when both sides return the same wrong data.
+		if pData[0] != byte('A'+i) {
+			t.Fatalf("LBA %d: primary holds %c, want %c", i, pData[0], 'A'+i)
+		}
+		if !bytes.Equal(pData, rData) {
+			t.Fatalf("LBA %d: primary=%c replica=%c — data divergence after catch-up",
+				i, pData[0], rData[0])
+		}
+	}
+}
+
+// TestCatchupReplay_DuplicateEntry_Idempotent verifies that if the catch-up
+// replays an entry the replica already has (overlap between shipped and
+// catch-up range), the replay is idempotent — no double-apply, no error.
+//
+// Currently EXPECTED TO FAIL: no catch-up protocol.
+func TestCatchupReplay_DuplicateEntry_Idempotent(t *testing.T) {
+	primary, replica := createSyncAllPair(t)
+	defer primary.Close()
+	defer replica.Close()
+
+	recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	recv.Serve()
+	defer recv.Stop()
+
+	primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
+
+	// Write 5 entries and sync — replica has all 5.
+	for i := uint64(0); i < 5; i++ {
+		if err := primary.WriteLBA(i, makeBlock(byte('A'+i))); err != nil {
+			t.Fatal(err)
+		}
+	}
+	if err := primary.SyncCache(); err != nil {
+		t.Fatal(err)
+	}
+
+	replicaLSN := recv.ReceivedLSN()
+	if replicaLSN < 5 {
+		t.Fatalf("replica only at LSN %d, expected >=5 before disconnect", replicaLSN)
+	}
+
+	// Disconnect briefly, write 2 more.
+	recv.Stop()
+	time.Sleep(50 * time.Millisecond)
+
+	for i := uint64(5); i < 7; i++ {
+		if err := primary.WriteLBA(i, makeBlock(byte('A'+i))); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// Reconnect. After CP13-5, the catch-up handshake might replay from
+	// an LSN the replica already has (overlap). The replay must be safe:
+	// - entries already applied should be skipped (LSN <= replicaFlushedLSN)
+	// - no double-write to the same WAL position
+	recv2, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	recv2.Serve()
+	defer recv2.Stop()
+	primary.SetReplicaAddr(recv2.DataAddr(), recv2.CtrlAddr())
+
+	syncDone := make(chan error, 1)
+	go func() {
+		syncDone <- primary.SyncCache()
+	}()
+
+	select {
+	case err := <-syncDone:
+		if err != nil {
+			t.Fatalf("SyncCache after reconnect with overlap: %v", err)
+		}
+	case <-time.After(10 * time.Second):
+		t.Fatal("SyncCache hung — catch-up with duplicate handling not implemented")
+	}
+
+	// Verify data integrity — all 7 blocks must be correct.
+	replica.flusher.FlushOnce()
+	for i := uint64(0); i < 7; i++ {
+		got, err := replica.ReadLBA(i, 4096)
+		if err != nil {
+			t.Fatalf("replica ReadLBA(%d): %v", i, err)
+		}
+		// Guard the [0] access: an empty read should fail, not panic.
+		if len(got) == 0 {
+			t.Fatalf("replica ReadLBA(%d): empty result", i)
+		}
+		if got[0] != byte('A'+i) {
+			t.Fatalf("LBA %d: expected %c, got %c — duplicate entry corrupted data", i, 'A'+i, got[0])
+		}
+	}
+}
+
+// ---------- best_effort mode ----------
+
+// TestBestEffort_FlushSucceeds_ReplicaDown verifies that under best_effort
+// mode, SyncCache (FLUSH) succeeds even when all replicas are down.
+// best_effort = primary-local durability only.
+//
+// Currently EXPECTED: PASS — best_effort should already work this way.
+func TestBestEffort_FlushSucceeds_ReplicaDown(t *testing.T) {
+	pDir := t.TempDir()
+	opts := CreateOptions{
+		VolumeSize:     1 * 1024 * 1024,
+		BlockSize:      4096,
+		WALSize:        256 * 1024,
+		DurabilityMode: DurabilityBestEffort,
+	}
+
+	primary, err := CreateBlockVol(filepath.Join(pDir, "primary.blockvol"), opts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer primary.Close()
+	primary.SetRole(RolePrimary)
+	primary.SetEpoch(1)
+	primary.SetMasterEpoch(1)
+	primary.lease.Grant(30 * time.Second)
+
+	// Point shipper at dead address — immediately degraded.
+	primary.SetReplicaAddr("127.0.0.1:1", "127.0.0.1:2")
+
+	// Write data.
+	for i := uint64(0); i < 5; i++ {
+		if err := primary.WriteLBA(i, makeBlock(byte('A'+i))); err != nil {
+			t.Fatalf("WriteLBA(%d): %v", i, err)
+		}
+	}
+
+	// SyncCache under best_effort with all replicas dead MUST succeed.
+	// best_effort only requires primary-local durability.
+	// It runs on its own goroutine so a hang is reported by the watchdog.
+	syncDone := make(chan error, 1)
+	go func() {
+		syncDone <- primary.SyncCache()
+	}()
+
+	select {
+	case err := <-syncDone:
+		if err != nil {
+			t.Fatalf("best_effort SyncCache failed with dead replica: %v — should succeed (primary-local only)", err)
+		}
+	case <-time.After(10 * time.Second):
+		t.Fatal("best_effort SyncCache hung — should be primary-local only, no barrier wait")
+	}
+
+	// Verify data is readable from primary.
+	for i := uint64(0); i < 5; i++ {
+		got, err := primary.ReadLBA(i, 4096)
+		if err != nil {
+			t.Fatalf("ReadLBA(%d): %v", i, err)
+		}
+		// Guard the [0] access: an empty read should fail, not panic.
+		if len(got) == 0 {
+			t.Fatalf("ReadLBA(%d): empty result", i)
+		}
+		if got[0] != byte('A'+i) {
+			t.Fatalf("LBA %d: expected %c, got %c", i, 'A'+i, got[0])
+		}
+	}
+}
+
+// ============================================================
+// CP13-3: Durable Progress Truth Tests
+// ============================================================
+
+// TestBarrierResp_FlushedLSN_Roundtrip checks that a BarrierResponse survives
+// EncodeBarrierResponse followed by DecodeBarrierResponse, and that the
+// encoded form is 9 bytes.
+func TestBarrierResp_FlushedLSN_Roundtrip(t *testing.T) {
+	// Zero, a small value, and the all-ones pattern: the last one exercises
+	// every one of the eight payload bytes.
+	for _, lsn := range []uint64{0, 42, ^uint64(0)} {
+		resp := BarrierResponse{Status: BarrierOK, FlushedLSN: lsn}
+		encoded := EncodeBarrierResponse(resp)
+		if len(encoded) != 9 {
+			t.Fatalf("expected 9 bytes, got %d", len(encoded))
+		}
+		decoded := DecodeBarrierResponse(encoded)
+		if decoded.Status != BarrierOK {
+			t.Fatalf("status: got %d, want %d", decoded.Status, BarrierOK)
+		}
+		if decoded.FlushedLSN != lsn {
+			t.Fatalf("FlushedLSN: got %d, want %d", decoded.FlushedLSN, lsn)
+		}
+	}
+}
+
+// TestBarrierResp_BackwardCompat_1Byte checks how DecodeBarrierResponse treats
+// payloads shorter than the 9-byte format: the status byte is kept when
+// present and FlushedLSN stays 0.
+func TestBarrierResp_BackwardCompat_1Byte(t *testing.T) {
+	// Legacy replica sends only 1 status byte.
+	legacy := []byte{BarrierOK}
+	decoded := DecodeBarrierResponse(legacy)
+	if decoded.Status != BarrierOK {
+		t.Fatalf("status: got %d, want %d", decoded.Status, BarrierOK)
+	}
+	if decoded.FlushedLSN != 0 {
+		t.Fatalf("FlushedLSN should be 0 for legacy response, got %d", decoded.FlushedLSN)
+	}
+
+	// A truncated payload (status + 7 bytes) is below the 9-byte threshold, so
+	// it must not be partially decoded into a FlushedLSN.
+	truncated := []byte{BarrierOK, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
+	decoded = DecodeBarrierResponse(truncated)
+	if decoded.Status != BarrierOK {
+		t.Fatalf("truncated status: got %d, want %d", decoded.Status, BarrierOK)
+	}
+	if decoded.FlushedLSN != 0 {
+		t.Fatalf("FlushedLSN should be 0 for truncated response, got %d", decoded.FlushedLSN)
+	}
+
+	// An empty payload decodes to the zero value rather than panicking.
+	if got := DecodeBarrierResponse(nil); got != (BarrierResponse{}) {
+		t.Fatalf("empty payload: got %+v, want zero value", got)
+	}
+}
+
+// TestReplica_FlushedLSN_OnlyAfterSync checks that the receiver's FlushedLSN
+// stays 0 while an entry has merely been received, and becomes >= 1 once
+// SyncCache on the primary has returned successfully.
+func TestReplica_FlushedLSN_OnlyAfterSync(t *testing.T) {
+	primary, replica := createReplicaVolPair(t)
+	defer primary.Close()
+	defer replica.Close()
+
+	recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	recv.Serve()
+	defer recv.Stop()
+
+	primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
+
+	// Before any barrier, FlushedLSN must be 0.
+	if recv.FlushedLSN() != 0 {
+		t.Fatalf("FlushedLSN before barrier: got %d, want 0", recv.FlushedLSN())
+	}
+
+	// Configure roles, epochs and the primary lease before writing.
+	primary.SetRole(RolePrimary)
+	primary.SetEpoch(1)
+	primary.SetMasterEpoch(1)
+	primary.lease.Grant(30 * time.Second)
+	replica.SetRole(RoleReplica)
+	replica.SetEpoch(1)
+	replica.SetMasterEpoch(1)
+
+	// Write data.
+	if err := primary.WriteLBA(0, makeBlock('A')); err != nil {
+		t.Fatalf("write: %v", err)
+	}
+
+	// Wait for replica to receive.
+	waitForReceivedLSN(t, recv, 1, 5*time.Second)
+
+	// ReceivedLSN should be 1, but FlushedLSN still 0 (no barrier yet).
+	if recv.ReceivedLSN() < 1 {
+		t.Fatalf("ReceivedLSN: got %d, want >= 1", recv.ReceivedLSN())
+	}
+	if recv.FlushedLSN() != 0 {
+		t.Fatalf("FlushedLSN should still be 0 before barrier, got %d", recv.FlushedLSN())
+	}
+
+	// SyncCache triggers barrier → fd.Sync → FlushedLSN advances.
+	if err := primary.SyncCache(); err != nil {
+		t.Fatalf("SyncCache: %v", err)
+	}
+
+	// Now FlushedLSN should match.
+	if recv.FlushedLSN() < 1 {
+		t.Fatalf("FlushedLSN after barrier: got %d, want >= 1", recv.FlushedLSN())
+	}
+}
+
+// TestReplica_FlushedLSN_NotOnReceive checks that receiving five entries
+// without any SyncCache leaves the receiver's FlushedLSN at 0.
+func TestReplica_FlushedLSN_NotOnReceive(t *testing.T) {
+	primary, replica := createReplicaVolPair(t)
+	defer primary.Close()
+	defer replica.Close()
+
+	recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	recv.Serve()
+	defer recv.Stop()
+
+	primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
+	primary.SetRole(RolePrimary)
+	primary.SetEpoch(1)
+	primary.SetMasterEpoch(1)
+	primary.lease.Grant(30 * time.Second)
+	replica.SetRole(RoleReplica)
+	replica.SetEpoch(1)
+	replica.SetMasterEpoch(1)
+
+	// Write 5 entries — shipped to replica.
+	for i := 0; i < 5; i++ {
+		if err := primary.WriteLBA(uint64(i), makeBlock(byte('A'+i))); err != nil {
+			t.Fatalf("write %d: %v", i, err)
+		}
+	}
+	waitForReceivedLSN(t, recv, 5, 5*time.Second)
+
+	// ReceivedLSN=5 but FlushedLSN must still be 0 (no barrier).
+	if recv.FlushedLSN() != 0 {
+		t.Fatalf("FlushedLSN should be 0 without barrier, got %d", recv.FlushedLSN())
+	}
+}
+
+// TestShipper_ReplicaFlushedLSN_UpdatedOnBarrier checks that the primary's
+// shipper reports no flushed progress before the first SyncCache, and
+// reports HasFlushedProgress with ReplicaFlushedLSN >= 1 after it.
+func TestShipper_ReplicaFlushedLSN_UpdatedOnBarrier(t *testing.T) {
+	primary, replica := createReplicaVolPair(t)
+	defer primary.Close()
+	defer replica.Close()
+
+	recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	recv.Serve()
+	defer recv.Stop()
+
+	primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
+	primary.SetRole(RolePrimary)
+	primary.SetEpoch(1)
+	primary.SetMasterEpoch(1)
+	primary.lease.Grant(30 * time.Second)
+	replica.SetRole(RoleReplica)
+	replica.SetEpoch(1)
+	replica.SetMasterEpoch(1)
+
+	// Before barrier, shipper has no flushed progress.
+	shipper := primary.shipperGroup.Shipper(0)
+	if shipper == nil {
+		t.Fatal("no shipper configured")
+	}
+	if shipper.ReplicaFlushedLSN() != 0 {
+		t.Fatalf("ReplicaFlushedLSN before barrier: got %d, want 0", shipper.ReplicaFlushedLSN())
+	}
+	if shipper.HasFlushedProgress() {
+		t.Fatal("HasFlushedProgress should be false before any barrier")
+	}
+
+	// Write + SyncCache (barrier).
+	if err := primary.WriteLBA(0, makeBlock('X')); err != nil {
+		t.Fatalf("write: %v", err)
+	}
+	if err := primary.SyncCache(); err != nil {
+		t.Fatalf("SyncCache: %v", err)
+	}
+
+	// Shipper should now have flushed progress.
+	if !shipper.HasFlushedProgress() {
+		t.Fatal("HasFlushedProgress should be true after successful barrier")
+	}
+	if shipper.ReplicaFlushedLSN() < 1 {
+		t.Fatalf("ReplicaFlushedLSN after barrier: got %d, want >= 1", shipper.ReplicaFlushedLSN())
+	}
+}
+
+// TestShipper_ReplicaFlushedLSN_Monotonic runs three write+SyncCache rounds
+// and checks that the shipper's ReplicaFlushedLSN never decreases between
+// rounds and ends at >= 3.
+func TestShipper_ReplicaFlushedLSN_Monotonic(t *testing.T) {
+	primary, replica := createReplicaVolPair(t)
+	defer primary.Close()
+	defer replica.Close()
+
+	recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	recv.Serve()
+	defer recv.Stop()
+
+	primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
+	primary.SetRole(RolePrimary)
+	primary.SetEpoch(1)
+	primary.SetMasterEpoch(1)
+	primary.lease.Grant(30 * time.Second)
+	replica.SetRole(RoleReplica)
+	replica.SetEpoch(1)
+	replica.SetMasterEpoch(1)
+
+	// Fail cleanly instead of dereferencing a nil shipper in the loop below.
+	shipper := primary.shipperGroup.Shipper(0)
+	if shipper == nil {
+		t.Fatal("no shipper configured")
+	}
+
+	// Write + sync 3 times.
+	var prevFlushed uint64
+	for round := 0; round < 3; round++ {
+		if err := primary.WriteLBA(uint64(round), makeBlock(byte('A'+round))); err != nil {
+			t.Fatalf("write round %d: %v", round, err)
+		}
+		if err := primary.SyncCache(); err != nil {
+			t.Fatalf("SyncCache round %d: %v", round, err)
+		}
+		cur := shipper.ReplicaFlushedLSN()
+		if cur < prevFlushed {
+			t.Fatalf("round %d: FlushedLSN regressed from %d to %d", round, prevFlushed, cur)
+		}
+		prevFlushed = cur
+	}
+	if prevFlushed < 3 {
+		t.Fatalf("final FlushedLSN: got %d, want >= 3", prevFlushed)
+	}
+}
+
+// TestShipperGroup_MinReplicaFlushedLSN checks MinReplicaFlushedLSN for an
+// empty group, a group where no shipper has progress, a group where one
+// shipper has progress, and a group where both do.
+func TestShipperGroup_MinReplicaFlushedLSN(t *testing.T) {
+	// Test with no shippers.
+	emptyGroup := NewShipperGroup(nil)
+	_, ok := emptyGroup.MinReplicaFlushedLSN()
+	if ok {
+		t.Fatal("empty group should return (_, false)")
+	}
+
+	// Test with shippers that have no progress.
+	s1 := NewWALShipper("127.0.0.1:9001", "127.0.0.1:9002", func() uint64 { return 1 })
+	s2 := NewWALShipper("127.0.0.1:9003", "127.0.0.1:9004", func() uint64 { return 1 })
+	group := NewShipperGroup([]*WALShipper{s1, s2})
+
+	_, ok = group.MinReplicaFlushedLSN()
+	if ok {
+		t.Fatal("no shipper has flushed progress yet, should return false")
+	}
+
+	// Simulate s1 getting progress by setting its fields directly.
+	s1.replicaFlushedLSN.Store(10)
+	s1.hasFlushedProgress.Store(true)
+
+	// Named minLSN rather than min so the Go 1.21 builtin is not shadowed.
+	minLSN, ok := group.MinReplicaFlushedLSN()
+	if !ok {
+		t.Fatal("s1 has progress, should return true")
+	}
+	if minLSN != 10 {
+		t.Fatalf("min: got %d, want 10", minLSN)
+	}
+
+	// Simulate s2 getting lower progress.
+	s2.replicaFlushedLSN.Store(5)
+	s2.hasFlushedProgress.Store(true)
+
+	minLSN, ok = group.MinReplicaFlushedLSN()
+	if !ok {
+		t.Fatal("both have progress, should return true")
+	}
+	if minLSN != 5 {
+		t.Fatalf("min: got %d, want 5 (the lower one)", minLSN)
+	}
+}
+
+// waitForReceivedLSN polls until the receiver reaches the target LSN or times out.
+func waitForReceivedLSN(t *testing.T, recv *ReplicaReceiver, target uint64, timeout time.Duration) {
+	t.Helper()
+	// A single timer covers the whole wait; it is checked without blocking
+	// between 1ms polls of ReceivedLSN.
+	expired := time.After(timeout)
+	for {
+		if recv.ReceivedLSN() >= target {
+			return
+		}
+		select {
+		case <-expired:
+			// Fatalf ends the calling test goroutine, so the sleep below is
+			// never reached on this path.
+			t.Fatalf("timeout waiting for ReceivedLSN >= %d (got %d)", target, recv.ReceivedLSN())
+		default:
+		}
+		time.Sleep(time.Millisecond)
+	}
+}
diff --git a/weed/storage/blockvol/wal_shipper.go b/weed/storage/blockvol/wal_shipper.go index d45a0c79f..92f825590 100644 --- a/weed/storage/blockvol/wal_shipper.go +++ b/weed/storage/blockvol/wal_shipper.go @@ -31,9 +31,11 @@ type WALShipper struct { ctrlMu sync.Mutex // protects ctrlConn ctrlConn net.Conn - shippedLSN atomic.Uint64 - degraded atomic.Bool - stopped atomic.Bool + shippedLSN atomic.Uint64 // diagnostic: highest LSN sent to TCP socket + replicaFlushedLSN atomic.Uint64 // authoritative: highest LSN durably persisted on replica + hasFlushedProgress atomic.Bool // true once replica returns a valid (non-zero) FlushedLSN + degraded atomic.Bool + stopped atomic.Bool } // NewWALShipper creates a WAL shipper. Connections are established lazily on @@ -53,7 +55,8 @@ func NewWALShipper(dataAddr, controlAddr string, epochFn func() uint64, metrics } // Ship sends a WAL entry to the replica over the data channel. -// On write error, the shipper enters degraded mode permanently. +// On write error, the shipper enters degraded mode. Recovery requires +// the full reconnect protocol. See design/sync-all-reconnect-protocol.md. func (s *WALShipper) Ship(entry *WALEntry) error { if s.stopped.Load() || s.degraded.Load() { return nil @@ -98,7 +101,9 @@ func (s *WALShipper) Ship(entry *WALEntry) error { // Barrier sends a barrier request on the control channel and waits for the // replica to confirm durability up to lsnMax. Returns ErrReplicaDegraded if -// the shipper is in degraded mode. +// the shipper is in degraded mode.
Reconnection requires the full reconnect +// protocol (ResumeShipReq handshake + WAL catch-up), not just TCP retry. +// See design/sync-all-reconnect-protocol.md. func (s *WALShipper) Barrier(lsnMax uint64) error { if s.stopped.Load() { return ErrShipperStopped @@ -144,8 +149,23 @@ func (s *WALShipper) Barrier(lsnMax uint64) error { return ErrReplicaDegraded } - switch payload[0] { + resp := DecodeBarrierResponse(payload) + + switch resp.Status { case BarrierOK: + // Update authoritative durable progress (monotonic: only advance). + if resp.FlushedLSN > 0 { + s.hasFlushedProgress.Store(true) + for { + cur := s.replicaFlushedLSN.Load() + if resp.FlushedLSN <= cur { + break + } + if s.replicaFlushedLSN.CompareAndSwap(cur, resp.FlushedLSN) { + break + } + } + } s.recordBarrierMetric(barrierStart, false) return nil case BarrierEpochMismatch: @@ -173,11 +193,26 @@ func (s *WALShipper) recordBarrierMetric(start time.Time, failed bool) { } } -// ShippedLSN returns the highest LSN successfully sent to the replica. +// ShippedLSN returns the highest LSN successfully sent to the replica (diagnostic only). +// This is NOT authoritative for sync durability — use ReplicaFlushedLSN() instead. func (s *WALShipper) ShippedLSN() uint64 { return s.shippedLSN.Load() } +// ReplicaFlushedLSN returns the highest LSN durably persisted on the replica, +// as reported in the barrier response after fd.Sync(). This is the authoritative +// durable progress variable for sync_all correctness. +func (s *WALShipper) ReplicaFlushedLSN() uint64 { + return s.replicaFlushedLSN.Load() +} + +// HasFlushedProgress returns true if the replica has ever reported a valid +// (non-zero) FlushedLSN. Legacy replicas that only support 1-byte barrier +// responses will never set this, and must not count toward sync_all. +func (s *WALShipper) HasFlushedProgress() bool { + return s.hasFlushedProgress.Load() +} + // IsDegraded returns true if the replica is unreachable. 
func (s *WALShipper) IsDegraded() bool { return s.degraded.Load()