From 16a796e56d7b8608a8121cdc959bef2594f609b9 Mon Sep 17 00:00:00 2001 From: Ping Qiu Date: Sun, 1 Mar 2026 20:12:39 -0800 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=204A=20CP2=20=E2=80=94=20WAL=20sh?= =?UTF-8?q?ipping,=20replica=20barrier,=20distributed=20group=20commit?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Primary ships WAL entries to replica over TCP (data channel), confirms durability via barrier RPC (control channel). SyncCache runs local fsync and replica barrier in parallel via MakeDistributedSync. When replica is unreachable, shipper enters permanent degraded mode and falls back to local-only sync (Phase 3 behavior). Key design: two separate TCP ports (data+control), contiguous LSN enforcement, epoch equality check, WAL-full retry on replica, cond.Wait-based barrier with configurable timeout, BarrierFsyncFailed status code. Close lifecycle: shipper → receiver → drain → committer → flusher → fd. New files: repl_proto.go, wal_shipper.go, replica_apply.go, replica_barrier.go, dist_group_commit.go Modified: blockvol.go, blockvol_test.go 27 dev tests + 21 QA tests = 48 new tests; 889 total (609 engine + 280 iSCSI), all passing. Co-Authored-By: Claude Opus 4.6 --- weed/storage/blockvol/blockvol.go | 61 +- weed/storage/blockvol/blockvol_test.go | 1104 ++++++++++++++++++ weed/storage/blockvol/dist_group_commit.go | 46 + weed/storage/blockvol/qa_phase4a_cp2_test.go | 1104 ++++++++++++++++++ weed/storage/blockvol/repl_proto.go | 107 ++ weed/storage/blockvol/replica_apply.go | 287 +++++ weed/storage/blockvol/replica_barrier.go | 101 ++ weed/storage/blockvol/wal_shipper.go | 197 ++++ 8 files changed, 3006 insertions(+), 1 deletion(-) create mode 100644 weed/storage/blockvol/dist_group_commit.go create mode 100644 weed/storage/blockvol/qa_phase4a_cp2_test.go create mode 100644 weed/storage/blockvol/repl_proto.go create mode 100644 weed/storage/blockvol/replica_apply.go create mode 100644 weed/storage/blockvol/replica_barrier.go create mode 100644 weed/storage/blockvol/wal_shipper.go diff --git a/weed/storage/blockvol/blockvol.go b/weed/storage/blockvol/blockvol.go index f674035e3..dcf276a11 100644 --- a/weed/storage/blockvol/blockvol.go +++ b/weed/storage/blockvol/blockvol.go @@ -7,6 +7,7 @@ import ( "encoding/binary" "errors" "fmt" + "log" "os" "sync" "sync/atomic" @@ -42,6 +43,10 @@ type BlockVol struct { opsOutstanding atomic.Int64 // in-flight Read/Write/Trim/SyncCache ops opsDrained chan struct{} + // Replication fields (Phase 4A CP2). + shipper *WALShipper + replRecv *ReplicaReceiver + // Fencing fields (Phase 4A). epoch atomic.Uint64 // current persisted epoch masterEpoch atomic.Uint64 // expected epoch from master @@ -253,6 +258,12 @@ func (v *BlockVol) appendWithRetry(entry *WALEntry) (uint64, error) { if v.closed.Load() { return 0, ErrVolumeClosed } + + // Ship to replica if configured (fire-and-forget). + if v.shipper != nil { + v.shipper.Ship(entry) + } + return walOff, nil } @@ -505,11 +516,59 @@ func (v *BlockVol) SyncCache() error { return v.groupCommit.Submit() } +// SetReplicaAddr configures the replica endpoint and creates a WAL shipper +// with distributed group commit. Must be called before any writes. +func (v *BlockVol) SetReplicaAddr(dataAddr, ctrlAddr string) { + v.shipper = NewWALShipper(dataAddr, ctrlAddr, func() uint64 { + return v.epoch.Load() + }) + + // Replace the group committer's sync function with a distributed version. + v.groupCommit.Stop() + v.groupCommit = NewGroupCommitter(GroupCommitterConfig{ + SyncFunc: MakeDistributedSync(v.fd.Sync, v.shipper, v), + MaxDelay: v.config.GroupCommitMaxDelay, + MaxBatch: v.config.GroupCommitMaxBatch, + LowWatermark: v.config.GroupCommitLowWatermark, + OnDegraded: func() { v.healthy.Store(false) }, + PostSyncCheck: v.writeGate, + }) + go v.groupCommit.Run() +} + +// StartReplicaReceiver starts listening for replicated WAL entries from a primary. +func (v *BlockVol) StartReplicaReceiver(dataAddr, ctrlAddr string) error { + recv, err := NewReplicaReceiver(v, dataAddr, ctrlAddr) + if err != nil { + return err + } + v.replRecv = recv + recv.Serve() + return nil +} + +// degradeReplica marks the shipper as degraded and logs a warning. +func (v *BlockVol) degradeReplica(err error) { + if v.shipper != nil { + v.shipper.degraded.Store(true) + } + log.Printf("blockvol: replica degraded: %v", err) +} + // Close shuts down the block volume and closes the file. -// Shutdown order: drain in-flight ops → group committer → flusher → final flush → close fd. +// Shutdown order: shipper → replica receiver → drain ops → group committer → flusher → final flush → close fd. func (v *BlockVol) Close() error { v.closed.Store(true) + // Stop shipper first: no more Ship calls. + if v.shipper != nil { + v.shipper.Stop() + } + // Stop replica receiver: no more barrier waits. + if v.replRecv != nil { + v.replRecv.Stop() + } + // Drain in-flight ops: beginOp checks closed and returns ErrVolumeClosed, // so no new ops can start. Wait for existing ones to finish (max 5s). if v.opsOutstanding.Load() > 0 { diff --git a/weed/storage/blockvol/blockvol_test.go b/weed/storage/blockvol/blockvol_test.go index 7e887324e..06f42c655 100644 --- a/weed/storage/blockvol/blockvol_test.go +++ b/weed/storage/blockvol/blockvol_test.go @@ -4,6 +4,9 @@ import ( "bytes" "encoding/binary" "errors" + "fmt" + "io" + "net" "os" "path/filepath" "sync" @@ -90,6 +93,40 @@ func TestBlockVol(t *testing.T) { {name: "blockvol_gotcha_a_lease_expired", run: testBlockvolGotchaALeaseExpired}, // Phase 4A CP1: P3-BUG-9 dirty map. {name: "dirty_map_power_of_2_panics", run: testDirtyMapPowerOf2Panics}, + // Phase 4A CP2: Replication wire protocol. + {name: "frame_roundtrip", run: testFrameRoundtrip}, + {name: "frame_large_payload", run: testFrameLargePayload}, + // Phase 4A CP2: WAL shipper. + {name: "ship_single_entry", run: testShipSingleEntry}, + {name: "ship_batch", run: testShipBatch}, + {name: "ship_epoch_mismatch_dropped", run: testShipEpochMismatchDropped}, + {name: "ship_degraded_on_error", run: testShipDegradedOnError}, + {name: "ship_no_replica_noop", run: testShipNoReplicaNoop}, + // Phase 4A CP2: Replica apply. + {name: "replica_apply_entry", run: testReplicaApplyEntry}, + {name: "replica_reject_stale_epoch", run: testReplicaRejectStaleEpoch}, + {name: "replica_apply_updates_dirty_map", run: testReplicaApplyUpdatesDirtyMap}, + {name: "replica_reject_duplicate_lsn", run: testReplicaRejectDuplicateLSN}, + {name: "replica_flusher_works", run: testReplicaFlusherWorks}, + // Phase 4A CP2: Replica barrier. + {name: "barrier_already_received", run: testBarrierAlreadyReceived}, + {name: "barrier_wait_for_entries", run: testBarrierWaitForEntries}, + {name: "barrier_timeout", run: testBarrierTimeout}, + {name: "barrier_epoch_mismatch_fast_fail", run: testBarrierEpochMismatchFastFail}, + {name: "barrier_concurrent_append", run: testBarrierConcurrentAppend}, + // Phase 4A CP2: Distributed group commit. + {name: "dist_commit_both_pass", run: testDistCommitBothPass}, + {name: "dist_commit_local_fail", run: testDistCommitLocalFail}, + {name: "dist_commit_remote_fail_degrades", run: testDistCommitRemoteFailDegrades}, + {name: "dist_commit_no_replica", run: testDistCommitNoReplica}, + // Phase 4A CP2: BlockVol integration. + {name: "blockvol_write_with_replica", run: testBlockvolWriteWithReplica}, + {name: "blockvol_no_replica_compat", run: testBlockvolNoReplicaCompat}, + // Phase 4A CP2 bug fixes. + {name: "replica_reject_future_epoch", run: testReplicaRejectFutureEpoch}, + {name: "replica_reject_lsn_gap", run: testReplicaRejectLSNGap}, + {name: "barrier_fsync_failed_status", run: testBarrierFsyncFailedStatus}, + {name: "barrier_configurable_timeout", run: testBarrierConfigurableTimeout}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -2112,3 +2149,1070 @@ func testDirtyMapPowerOf2Panics(t *testing.T) { }() NewDirtyMap(3) // should panic } + +// ============================================================================= +// Phase 4A CP2: Replication wire protocol tests +// ============================================================================= + +func testFrameRoundtrip(t *testing.T) { + // Test frame write + read roundtrip with various payloads. + tests := []struct { + msgType byte + payload []byte + }{ + {MsgWALEntry, []byte("hello")}, + {MsgBarrierReq, EncodeBarrierRequest(BarrierRequest{Vid: 1, LSN: 42, Epoch: 7})}, + {MsgBarrierResp, []byte{BarrierOK}}, + {0xFF, []byte{}}, // empty payload + } + for _, tc := range tests { + var buf bytes.Buffer + if err := WriteFrame(&buf, tc.msgType, tc.payload); err != nil { + t.Fatalf("WriteFrame: %v", err) + } + gotType, gotPayload, err := ReadFrame(&buf) + if err != nil { + t.Fatalf("ReadFrame: %v", err) + } + if gotType != tc.msgType { + t.Errorf("type: got 0x%02x, want 0x%02x", gotType, tc.msgType) + } + if !bytes.Equal(gotPayload, tc.payload) { + t.Errorf("payload mismatch") + } + } +} + +func testFrameLargePayload(t *testing.T) { + payload := make([]byte, 1024*1024) // 1MB + for i := range payload { + payload[i] = byte(i % 256) + } + var buf bytes.Buffer + if err := WriteFrame(&buf, MsgWALEntry, payload); err != nil { + t.Fatalf("WriteFrame: %v", err) + } + gotType, gotPayload, err := ReadFrame(&buf) + if err != nil { + t.Fatalf("ReadFrame: %v", err) + } + if gotType != MsgWALEntry { + t.Errorf("type: got 0x%02x, want 0x%02x", gotType, MsgWALEntry) + } + if !bytes.Equal(gotPayload, payload) { + t.Error("large payload mismatch") + } +} + +// ============================================================================= +// Phase 4A CP2: WAL shipper tests +// ============================================================================= + +// mockDataServer starts a TCP server that reads frames and collects them. +func mockDataServer(t *testing.T) (addr string, frames *[][]byte, done chan struct{}) { + t.Helper() + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + collected := &[][]byte{} + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + conn, err := ln.Accept() + if err != nil { + return + } + defer conn.Close() + for { + _, payload, err := ReadFrame(conn) + if err != nil { + return + } + *collected = append(*collected, payload) + } + }() + t.Cleanup(func() { ln.Close() }) + return ln.Addr().String(), collected, doneCh +} + +// mockCtrlServer starts a TCP server that reads barrier requests and responds OK. +func mockCtrlServer(t *testing.T, respStatus byte) (addr string, done chan struct{}) { + t.Helper() + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + conn, err := ln.Accept() + if err != nil { + return + } + defer conn.Close() + for { + msgType, _, err := ReadFrame(conn) + if err != nil { + return + } + if msgType == MsgBarrierReq { + WriteFrame(conn, MsgBarrierResp, []byte{respStatus}) + } + } + }() + t.Cleanup(func() { ln.Close() }) + return ln.Addr().String(), doneCh +} + +func testShipSingleEntry(t *testing.T) { + dataAddr, frames, done := mockDataServer(t) + ctrlAddr, _ := mockCtrlServer(t, BarrierOK) + + epoch := uint64(1) + s := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return epoch }) + defer s.Stop() + + entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: make([]byte, 4096)} + if err := s.Ship(entry); err != nil { + t.Fatalf("Ship: %v", err) + } + + if s.ShippedLSN() != 1 { + t.Errorf("ShippedLSN: got %d, want 1", s.ShippedLSN()) + } + + s.Stop() + <-done + + if len(*frames) != 1 { + t.Fatalf("frames: got %d, want 1", len(*frames)) + } + decoded, err := DecodeWALEntry((*frames)[0]) + if err != nil { + t.Fatalf("decode: %v", err) + } + if decoded.LSN != 1 { + t.Errorf("LSN: got %d, want 1", decoded.LSN) + } +} + +func testShipBatch(t *testing.T) { + dataAddr, frames, done := mockDataServer(t) + ctrlAddr, _ := mockCtrlServer(t, BarrierOK) + + epoch := uint64(1) + s := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return epoch }) + defer s.Stop() + + for i := uint64(1); i <= 5; i++ { + entry := &WALEntry{LSN: i, Epoch: 1, Type: EntryTypeTrim, LBA: i, Length: 4096} + if err := s.Ship(entry); err != nil { + t.Fatalf("Ship(%d): %v", i, err) + } + } + + if s.ShippedLSN() != 5 { + t.Errorf("ShippedLSN: got %d, want 5", s.ShippedLSN()) + } + + s.Stop() + <-done + + if len(*frames) != 5 { + t.Fatalf("frames: got %d, want 5", len(*frames)) + } +} + +func testShipEpochMismatchDropped(t *testing.T) { + // Use a real server so connections work, but verify no frames arrive. + dataAddr, frames, done := mockDataServer(t) + ctrlAddr, _ := mockCtrlServer(t, BarrierOK) + + epoch := uint64(2) // shipper epoch is 2 + s := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return epoch }) + + // First ship a valid entry to establish connection. + validEntry := &WALEntry{LSN: 1, Epoch: 2, Type: EntryTypeTrim, LBA: 0, Length: 4096} + if err := s.Ship(validEntry); err != nil { + t.Fatalf("Ship valid: %v", err) + } + + // Now ship an entry with old epoch 1 — should be silently dropped. + staleEntry := &WALEntry{LSN: 2, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: make([]byte, 4096)} + if err := s.Ship(staleEntry); err != nil { + t.Fatalf("Ship stale: %v", err) + } + + // ShippedLSN should be 1 (only the valid entry). + if s.ShippedLSN() != 1 { + t.Errorf("ShippedLSN should be 1, got %d", s.ShippedLSN()) + } + + s.Stop() + <-done + + // Only the valid entry should have been shipped. + if len(*frames) != 1 { + t.Errorf("frames: got %d, want 1 (stale entry should be dropped)", len(*frames)) + } +} + +func testShipDegradedOnError(t *testing.T) { + // Start a server that immediately closes the connection. + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + go func() { + conn, err := ln.Accept() + if err != nil { + return + } + conn.Close() // close immediately — writes will fail + }() + defer ln.Close() + + ctrlAddr, _ := mockCtrlServer(t, BarrierOK) + epoch := uint64(1) + s := NewWALShipper(ln.Addr().String(), ctrlAddr, func() uint64 { return epoch }) + defer s.Stop() + + entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: make([]byte, 4096)} + // Ship may succeed on first write (kernel buffer) or fail. Keep shipping until degraded. + for i := 0; i < 10; i++ { + entry.LSN = uint64(i + 1) + s.Ship(entry) + if s.IsDegraded() { + break + } + time.Sleep(5 * time.Millisecond) + } + + if !s.IsDegraded() { + t.Error("expected shipper to be degraded after write error") + } + + // Subsequent Ship calls should be no-ops. + entry.LSN = 100 + if err := s.Ship(entry); err != nil { + t.Fatalf("Ship after degraded: %v", err) + } + + // Barrier should return ErrReplicaDegraded. + if err := s.Barrier(100); !errors.Is(err, ErrReplicaDegraded) { + t.Errorf("Barrier after degraded: got %v, want ErrReplicaDegraded", err) + } +} + +func testShipNoReplicaNoop(t *testing.T) { + // A nil shipper should not be called, but test that a stopped shipper is safe. + s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return 1 }) + s.Stop() + + entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: make([]byte, 4096)} + if err := s.Ship(entry); err != nil { + t.Fatalf("Ship after stop: %v", err) + } + if err := s.Barrier(1); !errors.Is(err, ErrShipperStopped) { + t.Errorf("Barrier after stop: got %v, want ErrShipperStopped", err) + } +} + +// ============================================================================= +// Phase 4A CP2: Replica apply tests +// ============================================================================= + +func createReplicaVolPair(t *testing.T) (primary *BlockVol, replica *BlockVol) { + t.Helper() + pDir := t.TempDir() + rDir := t.TempDir() + opts := CreateOptions{ + VolumeSize: 1 * 1024 * 1024, + BlockSize: 4096, + WALSize: 256 * 1024, + } + p, err := CreateBlockVol(filepath.Join(pDir, "primary.blockvol"), opts) + if err != nil { + t.Fatalf("CreateBlockVol primary: %v", err) + } + r, err := CreateBlockVol(filepath.Join(rDir, "replica.blockvol"), opts) + if err != nil { + p.Close() + t.Fatalf("CreateBlockVol replica: %v", err) + } + return p, r +} + +func testReplicaApplyEntry(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + // Connect to data port and send a WAL entry. + conn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + data := makeBlock('X') + entry := &WALEntry{LSN: 1, Epoch: 0, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: data} + encoded, _ := entry.Encode() + if err := WriteFrame(conn, MsgWALEntry, encoded); err != nil { + t.Fatal(err) + } + + // Wait for apply. + deadline := time.After(2 * time.Second) + for { + if recv.ReceivedLSN() >= 1 { + break + } + select { + case <-deadline: + t.Fatal("timeout waiting for entry to be applied") + default: + time.Sleep(time.Millisecond) + } + } + + if recv.ReceivedLSN() != 1 { + t.Errorf("ReceivedLSN: got %d, want 1", recv.ReceivedLSN()) + } +} + +func testReplicaRejectStaleEpoch(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + replica.epoch.Store(5) // replica at epoch 5 + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + conn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + // Entry with epoch 3 (stale) — should be rejected. + entry := &WALEntry{LSN: 1, Epoch: 3, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: make([]byte, 4096)} + encoded, _ := entry.Encode() + WriteFrame(conn, MsgWALEntry, encoded) + + // Give it time to process. + time.Sleep(50 * time.Millisecond) + + if recv.ReceivedLSN() != 0 { + t.Errorf("ReceivedLSN should be 0 (stale entry rejected), got %d", recv.ReceivedLSN()) + } +} + +func testReplicaApplyUpdatesDirtyMap(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + conn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + data := makeBlock('D') + entry := &WALEntry{LSN: 1, Epoch: 0, Type: EntryTypeWrite, LBA: 5, Length: 4096, Data: data} + encoded, _ := entry.Encode() + WriteFrame(conn, MsgWALEntry, encoded) + + // Wait for apply. + deadline := time.After(2 * time.Second) + for recv.ReceivedLSN() < 1 { + select { + case <-deadline: + t.Fatal("timeout") + default: + time.Sleep(time.Millisecond) + } + } + + // Check dirty map has an entry for LBA 5. + _, lsn, _, ok := replica.dirtyMap.Get(5) + if !ok { + t.Fatal("dirty map: LBA 5 not found") + } + if lsn != 1 { + t.Errorf("dirty map LSN: got %d, want 1", lsn) + } +} + +func testReplicaRejectDuplicateLSN(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + conn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + // Send LSN=1. + entry := &WALEntry{LSN: 1, Epoch: 0, Type: EntryTypeTrim, LBA: 0, Length: 4096} + encoded, _ := entry.Encode() + WriteFrame(conn, MsgWALEntry, encoded) + + deadline := time.After(2 * time.Second) + for recv.ReceivedLSN() < 1 { + select { + case <-deadline: + t.Fatal("timeout") + default: + time.Sleep(time.Millisecond) + } + } + + // Send duplicate LSN=1 — should be skipped. + WriteFrame(conn, MsgWALEntry, encoded) + time.Sleep(50 * time.Millisecond) + + // ReceivedLSN should still be 1. + if recv.ReceivedLSN() != 1 { + t.Errorf("ReceivedLSN after dup: got %d, want 1", recv.ReceivedLSN()) + } +} + +func testReplicaFlusherWorks(t *testing.T) { + // Verify the replica vol's flusher can flush replicated entries. + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + conn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + data := makeBlock('F') + entry := &WALEntry{LSN: 1, Epoch: 0, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: data} + encoded, _ := entry.Encode() + WriteFrame(conn, MsgWALEntry, encoded) + + deadline := time.After(2 * time.Second) + for recv.ReceivedLSN() < 1 { + select { + case <-deadline: + t.Fatal("timeout") + default: + time.Sleep(time.Millisecond) + } + } + + // Trigger flusher and verify data is readable from replica. + replica.flusher.FlushOnce() + + got, err := replica.ReadLBA(0, 4096) + if err != nil { + t.Fatalf("ReadLBA: %v", err) + } + if !bytes.Equal(got, data) { + t.Error("flushed data mismatch on replica") + } +} + +// ============================================================================= +// Phase 4A CP2: Replica barrier tests +// ============================================================================= + +func testBarrierAlreadyReceived(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + // Send an entry first. + dataConn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatal(err) + } + defer dataConn.Close() + + entry := &WALEntry{LSN: 1, Epoch: 0, Type: EntryTypeTrim, LBA: 0, Length: 4096} + encoded, _ := entry.Encode() + WriteFrame(dataConn, MsgWALEntry, encoded) + + deadline := time.After(2 * time.Second) + for recv.ReceivedLSN() < 1 { + select { + case <-deadline: + t.Fatal("timeout") + default: + time.Sleep(time.Millisecond) + } + } + + // Now send a barrier for LSN=1 — should succeed immediately. + ctrlConn, err := net.Dial("tcp", recv.CtrlAddr()) + if err != nil { + t.Fatal(err) + } + defer ctrlConn.Close() + ctrlConn.SetDeadline(time.Now().Add(5 * time.Second)) + + req := EncodeBarrierRequest(BarrierRequest{LSN: 1, Epoch: 0}) + WriteFrame(ctrlConn, MsgBarrierReq, req) + + msgType, payload, err := ReadFrame(ctrlConn) + if err != nil { + t.Fatal(err) + } + if msgType != MsgBarrierResp { + t.Fatalf("unexpected msg type 0x%02x", msgType) + } + if payload[0] != BarrierOK { + t.Errorf("barrier status: got %d, want BarrierOK", payload[0]) + } +} + +func testBarrierWaitForEntries(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + // Send barrier BEFORE the entry arrives. + ctrlConn, err := net.Dial("tcp", recv.CtrlAddr()) + if err != nil { + t.Fatal(err) + } + defer ctrlConn.Close() + ctrlConn.SetDeadline(time.Now().Add(5 * time.Second)) + + barrierDone := make(chan byte, 1) + go func() { + req := EncodeBarrierRequest(BarrierRequest{LSN: 1, Epoch: 0}) + WriteFrame(ctrlConn, MsgBarrierReq, req) + _, payload, err := ReadFrame(ctrlConn) + if err != nil { + barrierDone <- 0xFF + return + } + barrierDone <- payload[0] + }() + + // Wait a bit, then send the entry. + time.Sleep(50 * time.Millisecond) + + dataConn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatal(err) + } + defer dataConn.Close() + + entry := &WALEntry{LSN: 1, Epoch: 0, Type: EntryTypeTrim, LBA: 0, Length: 4096} + encoded, _ := entry.Encode() + WriteFrame(dataConn, MsgWALEntry, encoded) + + select { + case status := <-barrierDone: + if status != BarrierOK { + t.Errorf("barrier status: got %d, want BarrierOK", status) + } + case <-time.After(10 * time.Second): + t.Fatal("barrier did not complete") + } +} + +func testBarrierTimeout(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.barrierTimeout = 100 * time.Millisecond // fast timeout for test + recv.Serve() + defer recv.Stop() + + // Send barrier for LSN=999 that will never arrive. + ctrlConn, err := net.Dial("tcp", recv.CtrlAddr()) + if err != nil { + t.Fatal(err) + } + defer ctrlConn.Close() + ctrlConn.SetDeadline(time.Now().Add(5 * time.Second)) + + req := EncodeBarrierRequest(BarrierRequest{LSN: 999, Epoch: 0}) + WriteFrame(ctrlConn, MsgBarrierReq, req) + + start := time.Now() + _, payload, err := ReadFrame(ctrlConn) + elapsed := time.Since(start) + if err != nil { + t.Fatal(err) + } + if payload[0] != BarrierTimeout { + t.Errorf("barrier status: got %d, want BarrierTimeout", payload[0]) + } + if elapsed > 2*time.Second { + t.Errorf("barrier timeout took %v, expected ~100ms", elapsed) + } +} + +func testBarrierEpochMismatchFastFail(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + replica.epoch.Store(5) + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + ctrlConn, err := net.Dial("tcp", recv.CtrlAddr()) + if err != nil { + t.Fatal(err) + } + defer ctrlConn.Close() + ctrlConn.SetDeadline(time.Now().Add(5 * time.Second)) + + // Barrier with epoch=3 (mismatch with replica epoch=5). + req := EncodeBarrierRequest(BarrierRequest{LSN: 1, Epoch: 3}) + WriteFrame(ctrlConn, MsgBarrierReq, req) + + start := time.Now() + _, payload, err := ReadFrame(ctrlConn) + elapsed := time.Since(start) + if err != nil { + t.Fatal(err) + } + if payload[0] != BarrierEpochMismatch { + t.Errorf("barrier status: got %d, want BarrierEpochMismatch", payload[0]) + } + // Should be fast — no waiting. + if elapsed > 500*time.Millisecond { + t.Errorf("epoch mismatch took %v, expected fast fail", elapsed) + } +} + +func testBarrierConcurrentAppend(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + // Start barrier waiting for LSN=10. + ctrlConn, err := net.Dial("tcp", recv.CtrlAddr()) + if err != nil { + t.Fatal(err) + } + defer ctrlConn.Close() + ctrlConn.SetDeadline(time.Now().Add(10 * time.Second)) + + barrierDone := make(chan byte, 1) + go func() { + req := EncodeBarrierRequest(BarrierRequest{LSN: 10, Epoch: 0}) + WriteFrame(ctrlConn, MsgBarrierReq, req) + _, payload, err := ReadFrame(ctrlConn) + if err != nil { + barrierDone <- 0xFF + return + } + barrierDone <- payload[0] + }() + + // Stream 10 entries concurrently. + dataConn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatal(err) + } + defer dataConn.Close() + + for i := uint64(1); i <= 10; i++ { + entry := &WALEntry{LSN: i, Epoch: 0, Type: EntryTypeTrim, LBA: i, Length: 4096} + encoded, _ := entry.Encode() + WriteFrame(dataConn, MsgWALEntry, encoded) + } + + select { + case status := <-barrierDone: + if status != BarrierOK { + t.Errorf("barrier status: got %d, want BarrierOK", status) + } + case <-time.After(10 * time.Second): + t.Fatal("barrier did not complete") + } +} + +// ============================================================================= +// Phase 4A CP2: Distributed group commit tests +// ============================================================================= + +func testDistCommitBothPass(t *testing.T) { + syncCalled := atomic.Bool{} + walSync := func() error { + syncCalled.Store(true) + return nil + } + + // Mock shipper with barrier that succeeds. + dataAddr, _, _ := mockDataServer(t) + ctrlAddr, _ := mockCtrlServer(t, BarrierOK) + + v := createTestVol(t) + defer v.Close() + + shipper := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return 0 }) + defer shipper.Stop() + + distSync := MakeDistributedSync(walSync, shipper, v) + if err := distSync(); err != nil { + t.Fatalf("distSync: %v", err) + } + if !syncCalled.Load() { + t.Error("local walSync was not called") + } +} + +func testDistCommitLocalFail(t *testing.T) { + walSync := func() error { + return fmt.Errorf("disk error") + } + + dataAddr, _, _ := mockDataServer(t) + ctrlAddr, _ := mockCtrlServer(t, BarrierOK) + + v := createTestVol(t) + defer v.Close() + + shipper := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return 0 }) + defer shipper.Stop() + + distSync := MakeDistributedSync(walSync, shipper, v) + err := distSync() + if err == nil { + t.Fatal("expected error from local sync failure") + } +} + +func testDistCommitRemoteFailDegrades(t *testing.T) { + walSync := func() error { return nil } + + dataAddr, _, _ := mockDataServer(t) + // Control server that returns epoch mismatch. + ctrlAddr, _ := mockCtrlServer(t, BarrierEpochMismatch) + + v := createTestVol(t) + defer v.Close() + + shipper := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return 0 }) + defer shipper.Stop() + + // Set shipper on vol so degradeReplica can mark it degraded. + v.shipper = shipper + + distSync := MakeDistributedSync(walSync, shipper, v) + // Should NOT return error (local succeeded, remote degraded). + if err := distSync(); err != nil { + t.Fatalf("distSync should not fail on remote error: %v", err) + } + + if !shipper.IsDegraded() { + t.Error("shipper should be degraded after barrier failure") + } + + // Second call should fall back to local-only. + if err := distSync(); err != nil { + t.Fatalf("distSync local-only: %v", err) + } +} + +func testDistCommitNoReplica(t *testing.T) { + syncCalled := atomic.Bool{} + walSync := func() error { + syncCalled.Store(true) + return nil + } + + v := createTestVol(t) + defer v.Close() + + // nil shipper — local-only mode. + distSync := MakeDistributedSync(walSync, nil, v) + if err := distSync(); err != nil { + t.Fatalf("distSync: %v", err) + } + if !syncCalled.Load() { + t.Error("local walSync was not called") + } +} + +// ============================================================================= +// Phase 4A CP2: BlockVol integration tests +// ============================================================================= + +func testBlockvolWriteWithReplica(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + // Start replica receiver. + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + // Configure primary to ship to replica. + primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr()) + + // Write data on primary. + data := makeBlock('R') + if err := primary.WriteLBA(0, data); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + + // Wait for replica to receive. + deadline := time.After(5 * time.Second) + for recv.ReceivedLSN() < 1 { + select { + case <-deadline: + t.Fatal("timeout waiting for replica") + default: + time.Sleep(time.Millisecond) + } + } + + // Flush replica and read back. + replica.flusher.FlushOnce() + got, err := replica.ReadLBA(0, 4096) + if err != nil { + t.Fatalf("replica ReadLBA: %v", err) + } + if !bytes.Equal(got, data) { + t.Error("replica data mismatch") + } +} + +func testBlockvolNoReplicaCompat(t *testing.T) { + // Verify that a BlockVol without replica works identically to Phase 3. + v := createTestVol(t) + defer v.Close() + + data := makeBlock('N') + if err := v.WriteLBA(0, data); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + if err := v.SyncCache(); err != nil { + t.Fatalf("SyncCache: %v", err) + } + + got, err := v.ReadLBA(0, 4096) + if err != nil { + t.Fatalf("ReadLBA: %v", err) + } + if !bytes.Equal(got, data) { + t.Error("data mismatch in no-replica mode") + } + + // Verify shipper is nil. + if v.shipper != nil { + t.Error("shipper should be nil without SetReplicaAddr") + } +} + +// ============================================================================= +// Phase 4A CP2 bug fix tests +// ============================================================================= + +func testReplicaRejectFutureEpoch(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + replica.epoch.Store(3) // replica at epoch 3 + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + conn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + // Entry with epoch 5 (future) — must be rejected. Replicas must NOT + // accept epoch bumps from WAL stream; only master can change epoch. + entry := &WALEntry{LSN: 1, Epoch: 5, Type: EntryTypeTrim, LBA: 0, Length: 4096} + encoded, _ := entry.Encode() + WriteFrame(conn, MsgWALEntry, encoded) + + time.Sleep(50 * time.Millisecond) + + if recv.ReceivedLSN() != 0 { + t.Errorf("ReceivedLSN should be 0 (future epoch rejected), got %d", recv.ReceivedLSN()) + } +} + +func testReplicaRejectLSNGap(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + conn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + // Send LSN=1 — should succeed. + entry1 := &WALEntry{LSN: 1, Epoch: 0, Type: EntryTypeTrim, LBA: 0, Length: 4096} + encoded1, _ := entry1.Encode() + WriteFrame(conn, MsgWALEntry, encoded1) + + deadline := time.After(2 * time.Second) + for recv.ReceivedLSN() < 1 { + select { + case <-deadline: + t.Fatal("timeout waiting for LSN 1") + default: + time.Sleep(time.Millisecond) + } + } + + // Send LSN=3 (gap — skips LSN=2) — must be rejected. + entry3 := &WALEntry{LSN: 3, Epoch: 0, Type: EntryTypeTrim, LBA: 1, Length: 4096} + encoded3, _ := entry3.Encode() + WriteFrame(conn, MsgWALEntry, encoded3) + + time.Sleep(50 * time.Millisecond) + + if recv.ReceivedLSN() != 1 { + t.Errorf("ReceivedLSN should be 1 (gap rejected), got %d", recv.ReceivedLSN()) + } +} + +func testBarrierFsyncFailedStatus(t *testing.T) { + // Verify BarrierFsyncFailed is a distinct status code. + if BarrierFsyncFailed == BarrierTimeout { + t.Error("BarrierFsyncFailed should be distinct from BarrierTimeout") + } + if BarrierFsyncFailed == BarrierOK { + t.Error("BarrierFsyncFailed should be distinct from BarrierOK") + } + if BarrierFsyncFailed != 0x03 { + t.Errorf("BarrierFsyncFailed: got 0x%02x, want 0x03", BarrierFsyncFailed) + } +} + +func testBarrierConfigurableTimeout(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + // Set a very short timeout. + recv.barrierTimeout = 50 * time.Millisecond + recv.Serve() + defer recv.Stop() + + ctrlConn, err := net.Dial("tcp", recv.CtrlAddr()) + if err != nil { + t.Fatal(err) + } + defer ctrlConn.Close() + ctrlConn.SetDeadline(time.Now().Add(5 * time.Second)) + + req := EncodeBarrierRequest(BarrierRequest{LSN: 999, Epoch: 0}) + start := time.Now() + WriteFrame(ctrlConn, MsgBarrierReq, req) + + _, payload, err := ReadFrame(ctrlConn) + elapsed := time.Since(start) + if err != nil { + t.Fatal(err) + } + if payload[0] != BarrierTimeout { + t.Errorf("barrier status: got %d, want BarrierTimeout", payload[0]) + } + // Should complete quickly — well under 1s. + if elapsed > 1*time.Second { + t.Errorf("configurable timeout took %v, expected ~50ms", elapsed) + } +} + +// Suppress unused import warnings. +var _ = fmt.Sprintf +var _ io.Reader +var _ net.Conn diff --git a/weed/storage/blockvol/dist_group_commit.go b/weed/storage/blockvol/dist_group_commit.go new file mode 100644 index 000000000..1a489da35 --- /dev/null +++ b/weed/storage/blockvol/dist_group_commit.go @@ -0,0 +1,46 @@ +package blockvol + +import ( + "sync" +) + +// MakeDistributedSync creates a sync function that runs local WAL fsync and +// replica barrier in parallel. If no replica is configured or the replica is +// degraded, it falls back to local-only sync (Phase 3 behavior). +// +// walSync: the local fsync function (typically fd.Sync) +// shipper: the WAL shipper to the replica (may be nil) +// vol: the BlockVol (used to read nextLSN and trigger degradation) +func MakeDistributedSync(walSync func() error, shipper *WALShipper, vol *BlockVol) func() error { + return func() error { + if shipper == nil || shipper.IsDegraded() { + return walSync() + } + + // The highest LSN that needs to be durable is nextLSN-1. + lsnMax := vol.nextLSN.Load() - 1 + + var localErr, remoteErr error + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + localErr = walSync() + }() + go func() { + defer wg.Done() + remoteErr = shipper.Barrier(lsnMax) + }() + wg.Wait() + + if localErr != nil { + return localErr + } + if remoteErr != nil { + // Local succeeded, replica failed — degrade but don't fail the client. + vol.degradeReplica(remoteErr) + return nil + } + return nil + } +} diff --git a/weed/storage/blockvol/qa_phase4a_cp2_test.go b/weed/storage/blockvol/qa_phase4a_cp2_test.go new file mode 100644 index 000000000..dc7c6197c --- /dev/null +++ b/weed/storage/blockvol/qa_phase4a_cp2_test.go @@ -0,0 +1,1104 @@ +package blockvol + +import ( + "bytes" + "errors" + "io" + "math" + "net" + "path/filepath" + "sync" + "sync/atomic" + "testing" + "time" +) + +// TestQAPhase4ACP2 tests Phase 4A CP2 replication primitives adversarially: +// frame protocol, WAL shipper, replica receiver, barrier, distributed sync. +func TestQAPhase4ACP2(t *testing.T) { + tests := []struct { + name string + run func(t *testing.T) + }{ + // QA-4A-CP2-1: Frame Protocol Adversarial + {name: "frame_max_payload_boundary", run: testQAFrameMaxPayloadBoundary}, + {name: "frame_truncated_header", run: testQAFrameTruncatedHeader}, + {name: "frame_truncated_payload", run: testQAFrameTruncatedPayload}, + {name: "frame_zero_payload", run: testQAFrameZeroPayload}, + {name: "frame_concurrent_writes_no_interleave", run: testQAFrameConcurrentWrites}, + + // QA-4A-CP2-2: WALShipper Adversarial + {name: "shipper_ship_after_stop", run: testQAShipperShipAfterStop}, + {name: "shipper_barrier_after_degraded", run: testQAShipperBarrierAfterDegraded}, + {name: "shipper_stale_epoch_no_shippedlsn", run: testQAShipperStaleEpochNoShippedLSN}, + {name: "shipper_degraded_is_permanent", run: testQAShipperDegradedPermanent}, + {name: "shipper_concurrent_ship_stop", run: testQAShipperConcurrentShipStop}, + + // QA-4A-CP2-3: ReplicaReceiver Adversarial + {name: "receiver_out_of_order_lsn_skipped", run: testQAReceiverOutOfOrderLSN}, + {name: "receiver_concurrent_data_conns", run: testQAReceiverConcurrentDataConns}, + {name: "receiver_future_epoch_rejected", run: testQAReceiverFutureEpochRejected}, + {name: "receiver_barrier_before_entries_waits", run: testQAReceiverBarrierBeforeEntries}, + {name: "receiver_barrier_timeout_no_entries", run: testQAReceiverBarrierTimeoutNoEntries}, + {name: "receiver_barrier_epoch_mismatch", run: testQAReceiverBarrierEpochMismatch}, + {name: "receiver_stop_unblocks_barrier", run: testQAReceiverStopUnblocksBarrier}, + + // QA-4A-CP2-4: DistributedSync Adversarial + {name: "dsync_local_fail_returns_error", run: testQADSyncLocalFailReturnsError}, + {name: "dsync_remote_fail_degrades_not_errors", run: testQADSyncRemoteFailDegrades}, + {name: "dsync_both_fail_returns_local", run: testQADSyncBothFail}, + {name: "dsync_parallel_execution", run: testQADSyncParallelExecution}, + + // QA-4A-CP2-5: End-to-end Adversarial + {name: "e2e_replica_data_matches_primary", run: testQAE2EReplicaDataMatchesPrimary}, + {name: "e2e_close_primary_during_ship", run: testQAE2EClosePrimaryDuringShip}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.run(t) + }) + } +} + +// --- helpers --- + +// createReplicaPair creates a fenced primary + replica vol connected via loopback. +// Returns primary, replica, and a cleanup function. +func createReplicaPair(t *testing.T) (primary, replica *BlockVol) { + t.Helper() + dir := t.TempDir() + cfg := DefaultConfig() + cfg.FlushInterval = 5 * time.Millisecond + + // Create primary. + p, err := CreateBlockVol(filepath.Join(dir, "primary.blockvol"), CreateOptions{ + VolumeSize: 1 * 1024 * 1024, + BlockSize: 4096, + WALSize: 512 * 1024, + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol(primary): %v", err) + } + if err := p.SetEpoch(1); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + p.SetMasterEpoch(1) + if err := p.SetRole(RolePrimary); err != nil { + t.Fatalf("SetRole: %v", err) + } + p.lease.Grant(30 * time.Second) + + // Create replica. + r, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{ + VolumeSize: 1 * 1024 * 1024, + BlockSize: 4096, + WALSize: 512 * 1024, + }, cfg) + if err != nil { + p.Close() + t.Fatalf("CreateBlockVol(replica): %v", err) + } + if err := r.SetEpoch(1); err != nil { + p.Close() + r.Close() + t.Fatalf("SetEpoch(replica): %v", err) + } + + // Start replica receiver on ephemeral ports. + if err := r.StartReplicaReceiver("127.0.0.1:0", "127.0.0.1:0"); err != nil { + p.Close() + r.Close() + t.Fatalf("StartReplicaReceiver: %v", err) + } + + // Connect primary's shipper to replica's addresses. + p.SetReplicaAddr(r.replRecv.DataAddr(), r.replRecv.CtrlAddr()) + + return p, r +} + +// --- QA-4A-CP2-1: Frame Protocol Adversarial --- + +func testQAFrameMaxPayloadBoundary(t *testing.T) { + // Payload exactly at maxFramePayload → should succeed. + // Payload at maxFramePayload+1 → ReadFrame must return ErrFrameTooLarge. + + // Test at boundary: we can't allocate 256MB in a test, but we can test + // the ReadFrame parser with a crafted header. + var buf bytes.Buffer + + // Write a frame header claiming payload = maxFramePayload + 1 + hdr := make([]byte, frameHeaderSize) + hdr[0] = MsgWALEntry + bigEndianPut32(hdr[1:5], uint32(maxFramePayload)+1) + buf.Write(hdr) + + _, _, err := ReadFrame(&buf) + if !errors.Is(err, ErrFrameTooLarge) { + t.Errorf("ReadFrame with oversized payload: got %v, want ErrFrameTooLarge", err) + } + + // Exactly at max: header says maxFramePayload but we won't supply the data. + // ReadFrame should try to read and get EOF (not ErrFrameTooLarge). + buf.Reset() + hdr[0] = MsgWALEntry + bigEndianPut32(hdr[1:5], uint32(maxFramePayload)) + buf.Write(hdr) + // No payload data → ReadFrame should return io.ErrUnexpectedEOF (not ErrFrameTooLarge). + _, _, err = ReadFrame(&buf) + if errors.Is(err, ErrFrameTooLarge) { + t.Error("payload exactly at maxFramePayload should not be rejected as too large") + } + // Should be some read error (EOF/UnexpectedEOF) since we didn't write payload bytes. + if err == nil { + t.Error("ReadFrame with missing payload data should return error") + } +} + +func testQAFrameTruncatedHeader(t *testing.T) { + // Only 3 bytes (< 5 byte header) → ReadFrame must return error. + buf := bytes.NewReader([]byte{0x01, 0x00, 0x00}) + _, _, err := ReadFrame(buf) + if err == nil { + t.Error("ReadFrame with 3-byte truncated header should return error") + } +} + +func testQAFrameTruncatedPayload(t *testing.T) { + // Header says 100 bytes payload, but only 10 bytes follow. + var buf bytes.Buffer + hdr := make([]byte, frameHeaderSize) + hdr[0] = MsgWALEntry + bigEndianPut32(hdr[1:5], 100) + buf.Write(hdr) + buf.Write(make([]byte, 10)) // only 10 of 100 bytes + + _, _, err := ReadFrame(&buf) + if err == nil { + t.Error("ReadFrame with truncated payload should return error") + } +} + +func testQAFrameZeroPayload(t *testing.T) { + // Zero-length payload must roundtrip correctly. + var buf bytes.Buffer + if err := WriteFrame(&buf, 0x42, nil); err != nil { + t.Fatalf("WriteFrame(nil payload): %v", err) + } + if err := WriteFrame(&buf, 0x43, []byte{}); err != nil { + t.Fatalf("WriteFrame(empty payload): %v", err) + } + + msgType, payload, err := ReadFrame(&buf) + if err != nil { + t.Fatalf("ReadFrame(nil payload): %v", err) + } + if msgType != 0x42 || len(payload) != 0 { + t.Errorf("frame 1: type=0x%02x len=%d, want 0x42 len=0", msgType, len(payload)) + } + + msgType, payload, err = ReadFrame(&buf) + if err != nil { + t.Fatalf("ReadFrame(empty payload): %v", err) + } + if msgType != 0x43 || len(payload) != 0 { + t.Errorf("frame 2: type=0x%02x len=%d, want 0x43 len=0", msgType, len(payload)) + } +} + +func testQAFrameConcurrentWrites(t *testing.T) { + // Multiple goroutines writing frames to the same connection. + // Frames must not interleave (each frame readable as a complete unit). + // This relies on the WALShipper's mutex — test at the connection level. + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + const writers = 8 + const framesPerWriter = 20 + var mu sync.Mutex // simulate WALShipper's mutex + + var wg sync.WaitGroup + wg.Add(writers) + for g := 0; g < writers; g++ { + go func(id int) { + defer wg.Done() + payload := []byte{byte(id)} + for i := 0; i < framesPerWriter; i++ { + mu.Lock() + _ = WriteFrame(client, MsgWALEntry, payload) + mu.Unlock() + } + }(g) + } + + // Reader: read all frames and verify integrity. + totalFrames := writers * framesPerWriter + readDone := make(chan error, 1) + var frameCount int + go func() { + for i := 0; i < totalFrames; i++ { + msgType, payload, err := ReadFrame(server) + if err != nil { + readDone <- err + return + } + if msgType != MsgWALEntry { + readDone <- errors.New("wrong message type") + return + } + if len(payload) != 1 { + readDone <- errors.New("wrong payload length") + return + } + if payload[0] >= byte(writers) { + readDone <- errors.New("invalid writer ID in payload") + return + } + frameCount++ + } + readDone <- nil + }() + + wg.Wait() + + select { + case err := <-readDone: + if err != nil { + t.Fatalf("frame read error: %v (after %d frames)", err, frameCount) + } + case <-time.After(5 * time.Second): + t.Fatal("frame reader hung for 5s") + } + + if frameCount != totalFrames { + t.Errorf("read %d frames, want %d", frameCount, totalFrames) + } +} + +// --- QA-4A-CP2-2: WALShipper Adversarial --- + +func testQAShipperShipAfterStop(t *testing.T) { + s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return 1 }) + s.Stop() + + // Ship after Stop must not panic, must return nil (silently drop). + entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('S')} + err := s.Ship(entry) + if err != nil { + t.Errorf("Ship after Stop: got %v, want nil", err) + } + if s.ShippedLSN() != 0 { + t.Errorf("ShippedLSN after stopped Ship: got %d, want 0", s.ShippedLSN()) + } +} + +func testQAShipperBarrierAfterDegraded(t *testing.T) { + s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return 1 }) + s.degraded.Store(true) + + err := s.Barrier(1) + if !errors.Is(err, ErrReplicaDegraded) { + t.Errorf("Barrier when degraded: got %v, want ErrReplicaDegraded", err) + } +} + +func testQAShipperStaleEpochNoShippedLSN(t *testing.T) { + // Ship with epoch != current → entry silently dropped, shippedLSN unchanged. + currentEpoch := uint64(5) + s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return currentEpoch }) + + entry := &WALEntry{LSN: 10, Epoch: 3, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('X')} + err := s.Ship(entry) + if err != nil { + t.Errorf("Ship with stale epoch: got %v, want nil", err) + } + if s.ShippedLSN() != 0 { + t.Errorf("ShippedLSN after stale epoch Ship: got %d, want 0 (entry should be dropped)", s.ShippedLSN()) + } +} + +func testQAShipperDegradedPermanent(t *testing.T) { + // After degradation, Ship and Barrier must fail immediately forever. + // Create a listener that accepts but immediately closes (triggers write error). + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + defer ln.Close() + + go func() { + for { + conn, err := ln.Accept() + if err != nil { + return + } + conn.Close() // immediately close → Ship will get write error + } + }() + + s := NewWALShipper(ln.Addr().String(), ln.Addr().String(), func() uint64 { return 1 }) + defer s.Stop() + + // Ship triggers connection → immediate close → write error → degraded. + entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('D')} + _ = s.Ship(entry) // first attempt may or may not degrade + _ = s.Ship(entry) // second attempt more likely to hit closed conn + _ = s.Ship(entry) // third attempt for good measure + + // Force degraded if the connection race didn't trigger it. + if !s.IsDegraded() { + // The connection might have worked briefly. Manually degrade for the rest of the test. + s.degraded.Store(true) + } + + // Once degraded, subsequent Ship must not attempt connection. + lsnBefore := s.ShippedLSN() + for i := 0; i < 100; i++ { + _ = s.Ship(entry) + } + lsnAfter := s.ShippedLSN() + if lsnAfter > lsnBefore { + t.Errorf("ShippedLSN advanced from %d to %d after degradation — should not ship when degraded", lsnBefore, lsnAfter) + } + + // Barrier must immediately return ErrReplicaDegraded. + err = s.Barrier(1) + if !errors.Is(err, ErrReplicaDegraded) { + t.Errorf("Barrier after degraded: got %v, want ErrReplicaDegraded", err) + } +} + +func testQAShipperConcurrentShipStop(t *testing.T) { + // Ship and Stop from concurrent goroutines → no deadlock, no panic. + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + go func() { + for { + conn, err := ln.Accept() + if err != nil { + return + } + // Just consume data. + go io.Copy(io.Discard, conn) + } + }() + defer ln.Close() + + s := NewWALShipper(ln.Addr().String(), ln.Addr().String(), func() uint64 { return 1 }) + + var wg sync.WaitGroup + // Concurrent shippers. + wg.Add(4) + for g := 0; g < 4; g++ { + go func() { + defer wg.Done() + entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('C')} + for i := 0; i < 50; i++ { + _ = s.Ship(entry) + } + }() + } + + // Stop mid-flight. + time.Sleep(5 * time.Millisecond) + s.Stop() + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("concurrent Ship + Stop deadlocked for 5s") + } +} + +// --- QA-4A-CP2-3: ReplicaReceiver Adversarial --- + +func testQAReceiverOutOfOrderLSN(t *testing.T) { + // Tests contiguity enforcement: + // 1. Contiguous LSN=1..5 → all applied + // 2. Duplicate LSN=3 → skipped + // 3. Gap LSN=7 (skipping 6) → rejected + // 4. Correct next LSN=6 → accepted + dir := t.TempDir() + cfg := DefaultConfig() + cfg.FlushInterval = 5 * time.Millisecond + + vol, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{ + VolumeSize: 1 * 1024 * 1024, BlockSize: 4096, WALSize: 256 * 1024, + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer vol.Close() + if err := vol.SetEpoch(1); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + + recv, err := NewReplicaReceiver(vol, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatalf("NewReplicaReceiver: %v", err) + } + recv.Serve() + defer recv.Stop() + + conn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatalf("dial data: %v", err) + } + defer conn.Close() + + // Case 1: Contiguous LSN=1..5 → all applied. + for lsn := uint64(1); lsn <= 5; lsn++ { + entry := &WALEntry{LSN: lsn, Epoch: 1, Type: EntryTypeWrite, LBA: lsn - 1, Length: 4096, Data: makeBlock(byte('A' + lsn))} + encoded, _ := entry.Encode() + if err := WriteFrame(conn, MsgWALEntry, encoded); err != nil { + t.Fatalf("send LSN=%d: %v", lsn, err) + } + } + time.Sleep(20 * time.Millisecond) + if recv.ReceivedLSN() != 5 { + t.Fatalf("ReceivedLSN = %d after LSN 1-5, want 5", recv.ReceivedLSN()) + } + + // Case 2: Duplicate LSN=3 → skipped. + entry3 := &WALEntry{LSN: 3, Epoch: 1, Type: EntryTypeWrite, LBA: 2, Length: 4096, Data: makeBlock('Z')} + encoded3, _ := entry3.Encode() + if err := WriteFrame(conn, MsgWALEntry, encoded3); err != nil { + t.Fatalf("send duplicate LSN=3: %v", err) + } + time.Sleep(20 * time.Millisecond) + if recv.ReceivedLSN() != 5 { + t.Errorf("ReceivedLSN = %d after duplicate LSN=3, want 5", recv.ReceivedLSN()) + } + + // Case 3: Gap LSN=7 (skips 6) → rejected by contiguity check. + entry7 := &WALEntry{LSN: 7, Epoch: 1, Type: EntryTypeWrite, LBA: 6, Length: 4096, Data: makeBlock('G')} + encoded7, _ := entry7.Encode() + if err := WriteFrame(conn, MsgWALEntry, encoded7); err != nil { + t.Fatalf("send gap LSN=7: %v", err) + } + time.Sleep(20 * time.Millisecond) + if recv.ReceivedLSN() != 5 { + t.Errorf("ReceivedLSN = %d after gap LSN=7, want 5 (gap must be rejected)", recv.ReceivedLSN()) + } + + // Case 4: Correct next LSN=6 → accepted. + entry6 := &WALEntry{LSN: 6, Epoch: 1, Type: EntryTypeWrite, LBA: 5, Length: 4096, Data: makeBlock('F')} + encoded6, _ := entry6.Encode() + if err := WriteFrame(conn, MsgWALEntry, encoded6); err != nil { + t.Fatalf("send LSN=6: %v", err) + } + time.Sleep(20 * time.Millisecond) + if recv.ReceivedLSN() != 6 { + t.Errorf("ReceivedLSN = %d after correct LSN=6, want 6", recv.ReceivedLSN()) + } +} + +func testQAReceiverConcurrentDataConns(t *testing.T) { + // With contiguous LSN enforcement, entries must arrive in order on a single + // connection. Test that a stream of 40 contiguous entries from one connection + // is fully applied, then a second connection sending a duplicate is rejected. + dir := t.TempDir() + cfg := DefaultConfig() + cfg.FlushInterval = 5 * time.Millisecond + + vol, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{ + VolumeSize: 1 * 1024 * 1024, BlockSize: 4096, WALSize: 512 * 1024, + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer vol.Close() + if err := vol.SetEpoch(1); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + + recv, err := NewReplicaReceiver(vol, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatalf("NewReplicaReceiver: %v", err) + } + recv.Serve() + defer recv.Stop() + + // Send 40 contiguous entries from a single connection. + conn1, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatalf("dial: %v", err) + } + defer conn1.Close() + + for lsn := uint64(1); lsn <= 40; lsn++ { + entry := &WALEntry{LSN: lsn, Epoch: 1, Type: EntryTypeWrite, LBA: lsn, Length: 4096, Data: makeBlock(byte('A' + lsn%26))} + encoded, _ := entry.Encode() + if err := WriteFrame(conn1, MsgWALEntry, encoded); err != nil { + t.Fatalf("send LSN=%d: %v", lsn, err) + } + } + + time.Sleep(100 * time.Millisecond) + + got := recv.ReceivedLSN() + if got != 40 { + t.Errorf("ReceivedLSN = %d after 40 contiguous entries, want 40", got) + } + + // Second connection sending a duplicate LSN=5 → must be skipped. + conn2, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatalf("dial conn2: %v", err) + } + defer conn2.Close() + + dup := &WALEntry{LSN: 5, Epoch: 1, Type: EntryTypeWrite, LBA: 5, Length: 4096, Data: makeBlock('Z')} + encodedDup, _ := dup.Encode() + WriteFrame(conn2, MsgWALEntry, encodedDup) + + time.Sleep(20 * time.Millisecond) + + // receivedLSN must still be 40. + if recv.ReceivedLSN() != 40 { + t.Errorf("ReceivedLSN = %d after duplicate on conn2, want 40", recv.ReceivedLSN()) + } +} + +func testQAReceiverFutureEpochRejected(t *testing.T) { + // R1 fix: replica must reject entries with epoch > local (not just <). + // A future epoch in the WAL stream is a protocol violation — only master + // can bump epochs via SetEpoch. + dir := t.TempDir() + cfg := DefaultConfig() + cfg.FlushInterval = 5 * time.Millisecond + + vol, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{ + VolumeSize: 1 * 1024 * 1024, BlockSize: 4096, WALSize: 256 * 1024, + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer vol.Close() + if err := vol.SetEpoch(5); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + + recv, err := NewReplicaReceiver(vol, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatalf("NewReplicaReceiver: %v", err) + } + recv.Serve() + defer recv.Stop() + + conn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatalf("dial data: %v", err) + } + defer conn.Close() + + // Send entry with stale epoch=3 → rejected. + stale := &WALEntry{LSN: 1, Epoch: 3, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('S')} + encodedStale, _ := stale.Encode() + if err := WriteFrame(conn, MsgWALEntry, encodedStale); err != nil { + t.Fatalf("send stale epoch: %v", err) + } + time.Sleep(20 * time.Millisecond) + if recv.ReceivedLSN() != 0 { + t.Errorf("ReceivedLSN = %d after stale epoch entry, want 0", recv.ReceivedLSN()) + } + + // Send entry with future epoch=10 → also rejected (R1 fix). + future := &WALEntry{LSN: 1, Epoch: 10, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('F')} + encodedFuture, _ := future.Encode() + if err := WriteFrame(conn, MsgWALEntry, encodedFuture); err != nil { + t.Fatalf("send future epoch: %v", err) + } + time.Sleep(20 * time.Millisecond) + if recv.ReceivedLSN() != 0 { + t.Errorf("ReceivedLSN = %d after future epoch entry, want 0 (future epoch must be rejected)", recv.ReceivedLSN()) + } + + // Send entry with correct epoch=5 → accepted. + correct := &WALEntry{LSN: 1, Epoch: 5, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('C')} + encodedCorrect, _ := correct.Encode() + if err := WriteFrame(conn, MsgWALEntry, encodedCorrect); err != nil { + t.Fatalf("send correct epoch: %v", err) + } + time.Sleep(20 * time.Millisecond) + if recv.ReceivedLSN() != 1 { + t.Errorf("ReceivedLSN = %d after correct epoch entry, want 1", recv.ReceivedLSN()) + } +} + +func testQAReceiverBarrierEpochMismatch(t *testing.T) { + // Barrier with wrong epoch → immediate BarrierEpochMismatch (no wait). + dir := t.TempDir() + cfg := DefaultConfig() + cfg.FlushInterval = 100 * time.Millisecond + + vol, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{ + VolumeSize: 1 * 1024 * 1024, BlockSize: 4096, WALSize: 256 * 1024, + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer vol.Close() + if err := vol.SetEpoch(5); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + + recv, err := NewReplicaReceiver(vol, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatalf("NewReplicaReceiver: %v", err) + } + recv.Serve() + defer recv.Stop() + + ctrlConn, err := net.Dial("tcp", recv.CtrlAddr()) + if err != nil { + t.Fatalf("dial ctrl: %v", err) + } + defer ctrlConn.Close() + + // Barrier with stale epoch=3 → immediate EpochMismatch. + req := EncodeBarrierRequest(BarrierRequest{LSN: 1, Epoch: 3}) + start := time.Now() + if err := WriteFrame(ctrlConn, MsgBarrierReq, req); err != nil { + t.Fatalf("send barrier: %v", err) + } + + ctrlConn.SetReadDeadline(time.Now().Add(3 * time.Second)) + _, payload, err := ReadFrame(ctrlConn) + elapsed := time.Since(start) + if err != nil { + t.Fatalf("read barrier response: %v", err) + } + if payload[0] != BarrierEpochMismatch { + t.Errorf("barrier status = %d, want BarrierEpochMismatch(%d)", payload[0], BarrierEpochMismatch) + } + // Must be fast (no waiting), well under 100ms. + if elapsed > 500*time.Millisecond { + t.Errorf("epoch mismatch barrier took %v, expected immediate response", elapsed) + } + + // Barrier with future epoch=99 → also immediate EpochMismatch. + req2 := EncodeBarrierRequest(BarrierRequest{LSN: 1, Epoch: 99}) + if err := WriteFrame(ctrlConn, MsgBarrierReq, req2); err != nil { + t.Fatalf("send barrier future: %v", err) + } + + ctrlConn.SetReadDeadline(time.Now().Add(3 * time.Second)) + _, payload2, err := ReadFrame(ctrlConn) + if err != nil { + t.Fatalf("read barrier response (future): %v", err) + } + if payload2[0] != BarrierEpochMismatch { + t.Errorf("future epoch barrier status = %d, want BarrierEpochMismatch(%d)", payload2[0], BarrierEpochMismatch) + } +} + +func testQAReceiverBarrierBeforeEntries(t *testing.T) { + // Barrier arrives BEFORE data entries → must wait, then succeed when entries arrive. + dir := t.TempDir() + cfg := DefaultConfig() + cfg.FlushInterval = 5 * time.Millisecond + + vol, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{ + VolumeSize: 1 * 1024 * 1024, BlockSize: 4096, WALSize: 256 * 1024, + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer vol.Close() + if err := vol.SetEpoch(1); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + + recv, err := NewReplicaReceiver(vol, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatalf("NewReplicaReceiver: %v", err) + } + recv.Serve() + defer recv.Stop() + + // Send barrier for LSN=3 BEFORE any data. + ctrlConn, err := net.Dial("tcp", recv.CtrlAddr()) + if err != nil { + t.Fatalf("dial ctrl: %v", err) + } + defer ctrlConn.Close() + + barrierDone := make(chan BarrierResponse, 1) + go func() { + req := EncodeBarrierRequest(BarrierRequest{LSN: 3, Epoch: 1}) + if err := WriteFrame(ctrlConn, MsgBarrierReq, req); err != nil { + return + } + _, payload, err := ReadFrame(ctrlConn) + if err != nil { + return + } + barrierDone <- BarrierResponse{Status: payload[0]} + }() + + // Barrier should be waiting (not returned yet). + select { + case resp := <-barrierDone: + t.Fatalf("barrier returned immediately with status %d, expected it to wait", resp.Status) + case <-time.After(50 * time.Millisecond): + // Good — barrier is waiting. + } + + // Now send entries LSN=1,2,3. + dataConn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatalf("dial data: %v", err) + } + defer dataConn.Close() + + for lsn := uint64(1); lsn <= 3; lsn++ { + entry := &WALEntry{LSN: lsn, Epoch: 1, Type: EntryTypeWrite, LBA: lsn, Length: 4096, Data: makeBlock(byte('0' + lsn))} + encoded, _ := entry.Encode() + if err := WriteFrame(dataConn, MsgWALEntry, encoded); err != nil { + t.Fatalf("send LSN=%d: %v", lsn, err) + } + } + + // Barrier should now complete with OK. + select { + case resp := <-barrierDone: + if resp.Status != BarrierOK { + t.Errorf("barrier status = %d, want BarrierOK(0)", resp.Status) + } + case <-time.After(5 * time.Second): + t.Fatal("barrier hung for 5s after entries were sent") + } +} + +func testQAReceiverBarrierTimeoutNoEntries(t *testing.T) { + // Barrier for LSN=999 with no entries → must timeout (not hang forever). + // Uses short configurable barrierTimeout for fast test. + dir := t.TempDir() + cfg := DefaultConfig() + cfg.FlushInterval = 100 * time.Millisecond + + vol, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{ + VolumeSize: 1 * 1024 * 1024, BlockSize: 4096, WALSize: 256 * 1024, + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer vol.Close() + if err := vol.SetEpoch(1); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + + recv, err := NewReplicaReceiver(vol, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatalf("NewReplicaReceiver: %v", err) + } + recv.barrierTimeout = 100 * time.Millisecond // fast timeout for test + recv.Serve() + defer recv.Stop() + + ctrlConn, err := net.Dial("tcp", recv.CtrlAddr()) + if err != nil { + t.Fatalf("dial ctrl: %v", err) + } + defer ctrlConn.Close() + + req := EncodeBarrierRequest(BarrierRequest{LSN: 999, Epoch: 1}) + start := time.Now() + if err := WriteFrame(ctrlConn, MsgBarrierReq, req); err != nil { + t.Fatalf("send barrier: %v", err) + } + + ctrlConn.SetReadDeadline(time.Now().Add(3 * time.Second)) + _, payload, err := ReadFrame(ctrlConn) + elapsed := time.Since(start) + if err != nil { + t.Fatalf("read barrier response: %v", err) + } + if payload[0] != BarrierTimeout { + t.Errorf("barrier status = %d, want BarrierTimeout(%d)", payload[0], BarrierTimeout) + } + // Verify timeout was fast (~100ms, not 5s). + if elapsed > 1*time.Second { + t.Errorf("barrier timeout took %v, expected ~100ms (configurable timeout)", elapsed) + } +} + +func testQAReceiverStopUnblocksBarrier(t *testing.T) { + // Barrier waiting for entries → Stop called → barrier must return (not hang). + dir := t.TempDir() + cfg := DefaultConfig() + cfg.FlushInterval = 100 * time.Millisecond + + vol, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{ + VolumeSize: 1 * 1024 * 1024, BlockSize: 4096, WALSize: 256 * 1024, + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer vol.Close() + if err := vol.SetEpoch(1); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + + recv, err := NewReplicaReceiver(vol, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatalf("NewReplicaReceiver: %v", err) + } + recv.Serve() + + ctrlConn, err := net.Dial("tcp", recv.CtrlAddr()) + if err != nil { + t.Fatalf("dial ctrl: %v", err) + } + defer ctrlConn.Close() + + // Send barrier for high LSN (will wait). + req := EncodeBarrierRequest(BarrierRequest{LSN: math.MaxUint64, Epoch: 1}) + if err := WriteFrame(ctrlConn, MsgBarrierReq, req); err != nil { + t.Fatalf("send barrier: %v", err) + } + + // Let barrier start waiting. + time.Sleep(50 * time.Millisecond) + + // Stop should unblock the barrier. + recv.Stop() + + ctrlConn.SetReadDeadline(time.Now().Add(3 * time.Second)) + _, payload, err := ReadFrame(ctrlConn) + if err != nil { + // Connection closed by Stop → also acceptable. + t.Logf("barrier read after Stop: %v (connection closed — acceptable)", err) + return + } + if payload[0] != BarrierTimeout { + t.Errorf("barrier after Stop: status = %d, want BarrierTimeout(%d)", payload[0], BarrierTimeout) + } +} + +// --- QA-4A-CP2-4: DistributedSync Adversarial --- + +func testQADSyncLocalFailReturnsError(t *testing.T) { + // Local fsync fails → must return error regardless of remote result. + localErr := errors.New("disk on fire") + + vol := &BlockVol{} + vol.nextLSN.Store(10) + + syncFn := MakeDistributedSync( + func() error { return localErr }, + nil, // no shipper → local only + vol, + ) + + err := syncFn() + if !errors.Is(err, localErr) { + t.Errorf("dsync with local fail: got %v, want %v", err, localErr) + } +} + +func testQADSyncRemoteFailDegrades(t *testing.T) { + // Remote barrier fails → local succeeded → must return nil + degrade shipper. + primary, replica := createReplicaPair(t) + defer primary.Close() + defer replica.Close() + + // Write some data so there's something to sync. + if err := primary.WriteLBA(0, makeBlock('D')); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + + // Stop the replica receiver to make barrier fail. + replica.replRecv.Stop() + + // Force shipper to reconnect (old ctrl conn may still be open). + primary.shipper.ctrlMu.Lock() + if primary.shipper.ctrlConn != nil { + primary.shipper.ctrlConn.Close() + primary.shipper.ctrlConn = nil + } + primary.shipper.ctrlMu.Unlock() + + // SyncCache triggers distributed sync → barrier fails → degrade. + err := primary.SyncCache() + // Should succeed (local fsync succeeded, remote degraded — returned nil). + if err != nil { + t.Errorf("SyncCache after replica stop: got %v, want nil (local succeeded, remote should degrade silently)", err) + } + + // Shipper should be degraded. + if !primary.shipper.IsDegraded() { + t.Error("shipper should be degraded after replica receiver stopped") + } +} + +func testQADSyncBothFail(t *testing.T) { + // Both local and remote fail → must return local error. + localErr := errors.New("local disk fail") + + vol := &BlockVol{} + vol.nextLSN.Store(10) + + shipper := NewWALShipper("127.0.0.1:1", "127.0.0.1:1", func() uint64 { return 1 }) + + syncFn := MakeDistributedSync( + func() error { return localErr }, + shipper, + vol, + ) + + err := syncFn() + if !errors.Is(err, localErr) { + t.Errorf("dsync with both fail: got %v, want %v", err, localErr) + } +} + +func testQADSyncParallelExecution(t *testing.T) { + // Verify local and remote execute in parallel, not sequentially. + // If sequential, total time ≈ 200ms. If parallel, ≈ 100ms. + // + // Strategy: create a real replica pair where barrier takes ~80ms + // (because data arrives with delay) and local fsync also takes ~80ms. + // If parallel: ~80ms total. If sequential: ~160ms. + primary, replica := createReplicaPair(t) + defer primary.Close() + defer replica.Close() + + // Write some data to primary so there's a WAL entry to sync. + if err := primary.WriteLBA(0, makeBlock('P')); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + + // Give replica time to receive the shipped entry. + time.Sleep(30 * time.Millisecond) + + // Replace the distributed sync with one that has a slow local fsync. + // The barrier should be near-instant since replica already has the data. + var localStart, localEnd atomic.Int64 + slowLocalSync := func() error { + localStart.Store(time.Now().UnixNano()) + time.Sleep(80 * time.Millisecond) + localEnd.Store(time.Now().UnixNano()) + return nil + } + + syncFn := MakeDistributedSync(slowLocalSync, primary.shipper, primary) + + start := time.Now() + err := syncFn() + elapsed := time.Since(start) + + if err != nil { + t.Fatalf("dsync: %v", err) + } + + // Parallel: total ≈ max(local=80ms, barrier=fast) ≈ 80ms + // Sequential: total ≈ local + barrier ≈ 80ms + barrier_overhead + // Key check: elapsed should be < 150ms (well under sequential 160ms+) + if elapsed > 150*time.Millisecond { + t.Errorf("dsync took %v, suggesting sequential execution (expected <150ms for parallel)", elapsed) + } + + // Verify local fsync actually ran (not skipped). + if localStart.Load() == 0 { + t.Error("local fsync was never called — distributed sync may have fallen back to local-only") + } +} + +// --- QA-4A-CP2-5: End-to-end Adversarial --- + +func testQAE2EReplicaDataMatchesPrimary(t *testing.T) { + // Write N blocks on primary → verify replica has identical data via ReadLBA. + primary, replica := createReplicaPair(t) + defer primary.Close() + defer replica.Close() + + const numBlocks = 20 + for i := 0; i < numBlocks; i++ { + if err := primary.WriteLBA(uint64(i), makeBlock(byte('A'+i%26))); err != nil { + t.Fatalf("WriteLBA(%d): %v", i, err) + } + } + + // SyncCache ensures WAL entries are durable on both nodes. + if err := primary.SyncCache(); err != nil { + t.Fatalf("SyncCache: %v", err) + } + + // Give replica time to process entries. + time.Sleep(50 * time.Millisecond) + + // Verify replica has the data. + for i := 0; i < numBlocks; i++ { + data, err := replica.ReadLBA(uint64(i), 4096) + if err != nil { + t.Fatalf("replica ReadLBA(%d): %v", i, err) + } + expected := byte('A' + i%26) + if data[0] != expected { + t.Errorf("replica LBA %d: data[0] = %c, want %c", i, data[0], expected) + } + } + + // Verify receivedLSN on replica matches what primary shipped. + primaryLSN := primary.shipper.ShippedLSN() + replicaLSN := replica.replRecv.ReceivedLSN() + if replicaLSN < primaryLSN { + t.Errorf("replica receivedLSN = %d < primary shippedLSN = %d", replicaLSN, primaryLSN) + } +} + +func testQAE2EClosePrimaryDuringShip(t *testing.T) { + // Close primary while writes + shipping in progress → no hang, no panic. + primary, replica := createReplicaPair(t) + defer replica.Close() + + var wg sync.WaitGroup + + // Writer goroutine. + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + err := primary.WriteLBA(uint64(i%256), makeBlock(byte('W'))) + if err != nil { + return // closed or WAL full — expected + } + } + }() + + // Close after a few writes. + time.Sleep(10 * time.Millisecond) + primary.Close() + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatal("Close primary during ship hung for 10s") + } +} + +// --- helper --- + +func bigEndianPut32(b []byte, v uint32) { + b[0] = byte(v >> 24) + b[1] = byte(v >> 16) + b[2] = byte(v >> 8) + b[3] = byte(v) +} diff --git a/weed/storage/blockvol/repl_proto.go b/weed/storage/blockvol/repl_proto.go new file mode 100644 index 000000000..8f15024a9 --- /dev/null +++ b/weed/storage/blockvol/repl_proto.go @@ -0,0 +1,107 @@ +package blockvol + +import ( + "encoding/binary" + "errors" + "fmt" + "io" +) + +// Data channel message types. +const ( + MsgWALEntry byte = 0x01 +) + +// Control channel message types. +const ( + MsgBarrierReq byte = 0x01 + MsgBarrierResp byte = 0x02 +) + +// Barrier response status codes. +const ( + BarrierOK byte = 0x00 + BarrierEpochMismatch byte = 0x01 + BarrierTimeout byte = 0x02 + BarrierFsyncFailed byte = 0x03 +) + +// BarrierRequest is sent by the primary to the replica on the control channel. +type BarrierRequest struct { + Vid uint32 + LSN uint64 + Epoch uint64 +} + +// BarrierResponse is the replica's reply to a barrier request. +type BarrierResponse struct { + Status byte +} + +// Frame header: [1B type][4B payload_len]. +const frameHeaderSize = 5 + +// maxFramePayload caps the payload size to prevent OOM on corrupt data. +const maxFramePayload = 256 * 1024 * 1024 // 256MB + +var ( + ErrFrameTooLarge = errors.New("repl: frame payload exceeds maximum size") + ErrFrameEmpty = errors.New("repl: empty frame payload") +) + +// WriteFrame writes a length-prefixed frame: [1B type][4B len][payload]. +func WriteFrame(w io.Writer, msgType byte, payload []byte) error { + hdr := make([]byte, frameHeaderSize) + hdr[0] = msgType + binary.BigEndian.PutUint32(hdr[1:5], uint32(len(payload))) + if _, err := w.Write(hdr); err != nil { + return fmt.Errorf("repl: write frame header: %w", err) + } + if len(payload) > 0 { + if _, err := w.Write(payload); err != nil { + return fmt.Errorf("repl: write frame payload: %w", err) + } + } + return nil +} + +// ReadFrame reads a length-prefixed frame and returns message type and payload. +func ReadFrame(r io.Reader) (msgType byte, payload []byte, err error) { + hdr := make([]byte, frameHeaderSize) + if _, err = io.ReadFull(r, hdr); err != nil { + return 0, nil, fmt.Errorf("repl: read frame header: %w", err) + } + msgType = hdr[0] + payloadLen := binary.BigEndian.Uint32(hdr[1:5]) + if payloadLen > maxFramePayload { + return 0, nil, ErrFrameTooLarge + } + payload = make([]byte, payloadLen) + if payloadLen > 0 { + if _, err = io.ReadFull(r, payload); err != nil { + return 0, nil, fmt.Errorf("repl: read frame payload: %w", err) + } + } + return msgType, payload, nil +} + +// EncodeBarrierRequest serializes a BarrierRequest (4+8+8 = 20 bytes). +func EncodeBarrierRequest(req BarrierRequest) []byte { + buf := make([]byte, 20) + binary.BigEndian.PutUint32(buf[0:4], req.Vid) + binary.BigEndian.PutUint64(buf[4:12], req.LSN) + binary.BigEndian.PutUint64(buf[12:20], req.Epoch) + return buf +} + +// DecodeBarrierRequest deserializes a BarrierRequest. +func DecodeBarrierRequest(buf []byte) (BarrierRequest, error) { + if len(buf) < 20 { + return BarrierRequest{}, fmt.Errorf("repl: barrier request too short: %d bytes", len(buf)) + } + return BarrierRequest{ + Vid: binary.BigEndian.Uint32(buf[0:4]), + LSN: binary.BigEndian.Uint64(buf[4:12]), + Epoch: binary.BigEndian.Uint64(buf[12:20]), + }, nil +} diff --git a/weed/storage/blockvol/replica_apply.go b/weed/storage/blockvol/replica_apply.go new file mode 100644 index 000000000..81ff53fb7 --- /dev/null +++ b/weed/storage/blockvol/replica_apply.go @@ -0,0 +1,287 @@ +package blockvol + +import ( + "errors" + "fmt" + "log" + "net" + "sync" + "time" +) + +var ( + ErrStaleEpoch = errors.New("blockvol: stale epoch") + ErrDuplicateLSN = errors.New("blockvol: duplicate or out-of-order LSN") +) + +// ReplicaReceiver listens for WAL entries from a primary and applies them +// to the local BlockVol. It runs two listeners: one for the data channel +// (WAL entries) and one for the control channel (barrier requests). +type ReplicaReceiver struct { + vol *BlockVol + barrierTimeout time.Duration + + mu sync.Mutex + receivedLSN uint64 + cond *sync.Cond + + connMu sync.Mutex // protects activeConns + activeConns map[net.Conn]struct{} + + dataListener net.Listener + ctrlListener net.Listener + stopCh chan struct{} + stopped bool + wg sync.WaitGroup +} + +const defaultBarrierTimeout = 5 * time.Second + +// NewReplicaReceiver creates and starts listening on the data and control ports. +func NewReplicaReceiver(vol *BlockVol, dataAddr, ctrlAddr string) (*ReplicaReceiver, error) { + dataLn, err := net.Listen("tcp", dataAddr) + if err != nil { + return nil, fmt.Errorf("replica: listen data %s: %w", dataAddr, err) + } + ctrlLn, err := net.Listen("tcp", ctrlAddr) + if err != nil { + dataLn.Close() + return nil, fmt.Errorf("replica: listen ctrl %s: %w", ctrlAddr, err) + } + + r := &ReplicaReceiver{ + vol: vol, + barrierTimeout: defaultBarrierTimeout, + dataListener: dataLn, + ctrlListener: ctrlLn, + stopCh: make(chan struct{}), + activeConns: make(map[net.Conn]struct{}), + } + r.cond = sync.NewCond(&r.mu) + return r, nil +} + +// Serve starts accept loops for both listeners. Call Stop() to shut down. +func (r *ReplicaReceiver) Serve() { + r.wg.Add(2) + go r.acceptDataLoop() + go r.acceptCtrlLoop() +} + +// Stop shuts down both listeners, closes active connections, and waits for goroutines. +func (r *ReplicaReceiver) Stop() { + r.mu.Lock() + if r.stopped { + r.mu.Unlock() + return + } + r.stopped = true + r.mu.Unlock() + + close(r.stopCh) + r.dataListener.Close() + r.ctrlListener.Close() + + // Close all active connections to unblock ReadFrame calls. + r.connMu.Lock() + for conn := range r.activeConns { + conn.Close() + } + r.connMu.Unlock() + + // Wake any barrier waiters so they can exit (must hold mu for cond). + r.mu.Lock() + r.cond.Broadcast() + r.mu.Unlock() + r.wg.Wait() +} + +func (r *ReplicaReceiver) trackConn(conn net.Conn) { + r.connMu.Lock() + r.activeConns[conn] = struct{}{} + r.connMu.Unlock() +} + +func (r *ReplicaReceiver) untrackConn(conn net.Conn) { + r.connMu.Lock() + delete(r.activeConns, conn) + r.connMu.Unlock() +} + +func (r *ReplicaReceiver) acceptDataLoop() { + defer r.wg.Done() + for { + conn, err := r.dataListener.Accept() + if err != nil { + select { + case <-r.stopCh: + return + default: + log.Printf("replica: data accept error: %v", err) + return + } + } + r.trackConn(conn) + r.wg.Add(1) + go func() { + defer r.wg.Done() + defer r.untrackConn(conn) + r.handleDataConn(conn) + }() + } +} + +func (r *ReplicaReceiver) acceptCtrlLoop() { + defer r.wg.Done() + for { + conn, err := r.ctrlListener.Accept() + if err != nil { + select { + case <-r.stopCh: + return + default: + log.Printf("replica: ctrl accept error: %v", err) + return + } + } + r.trackConn(conn) + r.wg.Add(1) + go func() { + defer r.wg.Done() + defer r.untrackConn(conn) + r.handleControlConn(conn) + }() + } +} + +// handleDataConn reads WAL entry frames and applies them to the local volume. +func (r *ReplicaReceiver) handleDataConn(conn net.Conn) { + defer conn.Close() + for { + select { + case <-r.stopCh: + return + default: + } + + msgType, payload, err := ReadFrame(conn) + if err != nil { + select { + case <-r.stopCh: + default: + log.Printf("replica: data read error: %v", err) + } + return + } + + if msgType != MsgWALEntry { + log.Printf("replica: unexpected data message type 0x%02x", msgType) + continue + } + + if err := r.applyEntry(payload); err != nil { + log.Printf("replica: apply entry error: %v", err) + } + } +} + +// applyEntry decodes and applies a single WAL entry to the local volume. +// The entire apply (LSN check → WAL append → dirty map → receivedLSN update) +// is serialized under mu to prevent TOCTOU races between concurrent entries. +func (r *ReplicaReceiver) applyEntry(payload []byte) error { + entry, err := DecodeWALEntry(payload) + if err != nil { + return fmt.Errorf("decode WAL entry: %w", err) + } + + // Validate epoch: replicas must NOT accept epoch bumps from WAL stream. + // Only the master can change epochs (via SetEpoch in CP3). + localEpoch := r.vol.epoch.Load() + if entry.Epoch != localEpoch { + return fmt.Errorf("%w: entry epoch %d != local %d", ErrStaleEpoch, entry.Epoch, localEpoch) + } + + r.mu.Lock() + defer r.mu.Unlock() + + // Enforce contiguous LSN: only accept the next expected entry. + // This prevents gaps that would let a barrier pass incorrectly. + if entry.LSN <= r.receivedLSN { + log.Printf("replica: skipping duplicate/old LSN %d (received %d)", entry.LSN, r.receivedLSN) + return nil + } + if entry.LSN != r.receivedLSN+1 { + return fmt.Errorf("%w: expected LSN %d, got %d (gap)", ErrDuplicateLSN, r.receivedLSN+1, entry.LSN) + } + + // Append to local WAL (with retry on WAL full). + walOff, err := r.replicaAppendWithRetry(&entry) + if err != nil { + return fmt.Errorf("WAL append: %w", err) + } + + // Update dirty map. + switch entry.Type { + case EntryTypeWrite, EntryTypeTrim: + blocks := entry.Length / r.vol.super.BlockSize + for i := uint32(0); i < blocks; i++ { + r.vol.dirtyMap.Put(entry.LBA+uint64(i), walOff, entry.LSN, r.vol.super.BlockSize) + } + } + + // Update receivedLSN and signal barrier waiters. + r.receivedLSN = entry.LSN + r.cond.Broadcast() + + return nil +} + +// replicaAppendWithRetry appends a WAL entry, retrying on WAL-full by +// triggering the flusher. Caller must hold r.mu. +func (r *ReplicaReceiver) replicaAppendWithRetry(entry *WALEntry) (uint64, error) { + walOff, err := r.vol.wal.Append(entry) + if !errors.Is(err, ErrWALFull) { + return walOff, err + } + + deadline := time.After(r.vol.config.WALFullTimeout) + for errors.Is(err, ErrWALFull) { + select { + case <-r.stopCh: + return 0, fmt.Errorf("replica: stopped during WAL retry") + default: + } + if r.vol.flusher != nil { + r.vol.flusher.NotifyUrgent() + } + // Release mu briefly so barrier waiters can proceed and + // the flusher can make progress (it may need dirty map lock). + r.mu.Unlock() + select { + case <-deadline: + r.mu.Lock() + return 0, fmt.Errorf("replica: WAL full timeout: %w", ErrWALFull) + case <-time.After(1 * time.Millisecond): + } + r.mu.Lock() + walOff, err = r.vol.wal.Append(entry) + } + return walOff, err +} + +// ReceivedLSN returns the highest LSN received and written to the local WAL. +func (r *ReplicaReceiver) ReceivedLSN() uint64 { + r.mu.Lock() + defer r.mu.Unlock() + return r.receivedLSN +} + +// DataAddr returns the data listener's address (useful for tests with :0 ports). +func (r *ReplicaReceiver) DataAddr() string { + return r.dataListener.Addr().String() +} + +// CtrlAddr returns the control listener's address. +func (r *ReplicaReceiver) CtrlAddr() string { + return r.ctrlListener.Addr().String() +} diff --git a/weed/storage/blockvol/replica_barrier.go b/weed/storage/blockvol/replica_barrier.go new file mode 100644 index 000000000..76e3b2830 --- /dev/null +++ b/weed/storage/blockvol/replica_barrier.go @@ -0,0 +1,101 @@ +package blockvol + +import ( + "log" + "net" + "time" +) + +// handleControlConn reads barrier requests from the control channel and +// responds with barrier status after ensuring durability. +func (r *ReplicaReceiver) handleControlConn(conn net.Conn) { + defer conn.Close() + for { + select { + case <-r.stopCh: + return + default: + } + + msgType, payload, err := ReadFrame(conn) + if err != nil { + select { + case <-r.stopCh: + default: + log.Printf("replica: ctrl read error: %v", err) + } + return + } + + if msgType != MsgBarrierReq { + log.Printf("replica: unexpected ctrl message type 0x%02x", msgType) + continue + } + + req, err := DecodeBarrierRequest(payload) + if err != nil { + log.Printf("replica: decode barrier request: %v", err) + continue + } + + resp := r.handleBarrier(req) + + respPayload := []byte{resp.Status} + if err := WriteFrame(conn, MsgBarrierResp, respPayload); err != nil { + log.Printf("replica: write barrier response: %v", err) + return + } + } +} + +// handleBarrier waits until all WAL entries up to req.LSN have been received, +// then fsyncs the WAL to ensure durability. +func (r *ReplicaReceiver) handleBarrier(req BarrierRequest) BarrierResponse { + // Fail fast on epoch mismatch. + localEpoch := r.vol.epoch.Load() + if req.Epoch != localEpoch { + return BarrierResponse{Status: BarrierEpochMismatch} + } + + // Use a timer goroutine to wake us on deadline. + timer := time.NewTimer(r.barrierTimeout) + defer timer.Stop() + timedOut := make(chan struct{}) + go func() { + select { + case <-timer.C: + close(timedOut) + // Broadcast to wake any cond.Wait blocked in the loop below. + r.mu.Lock() + r.cond.Broadcast() + r.mu.Unlock() + case <-r.stopCh: + } + }() + + r.mu.Lock() + for r.receivedLSN < req.LSN { + // Check if timed out or stopped. + select { + case <-timedOut: + r.mu.Unlock() + return BarrierResponse{Status: BarrierTimeout} + case <-r.stopCh: + r.mu.Unlock() + return BarrierResponse{Status: BarrierTimeout} + default: + } + + // Block on cond.Wait — woken by applyEntry or timeout goroutine. + r.cond.Wait() + } + r.mu.Unlock() + + // fsync WAL AFTER confirming all entries received. + if err := r.vol.fd.Sync(); err != nil { + log.Printf("replica: barrier fsync error: %v", err) + return BarrierResponse{Status: BarrierFsyncFailed} + } + + return BarrierResponse{Status: BarrierOK} +} diff --git a/weed/storage/blockvol/wal_shipper.go b/weed/storage/blockvol/wal_shipper.go new file mode 100644 index 000000000..5ca9348d2 --- /dev/null +++ b/weed/storage/blockvol/wal_shipper.go @@ -0,0 +1,197 @@ +package blockvol + +import ( + "errors" + "fmt" + "log" + "net" + "sync" + "sync/atomic" + "time" +) + +var ( + ErrReplicaDegraded = errors.New("blockvol: replica degraded") + ErrShipperStopped = errors.New("blockvol: shipper stopped") +) + +const barrierTimeout = 5 * time.Second + +// WALShipper streams WAL entries from the primary to a replica over TCP. +// Fire-and-forget: no per-entry ACK. Barriers provide durability confirmation. +type WALShipper struct { + dataAddr string + controlAddr string + epochFn func() uint64 + + mu sync.Mutex // protects dataConn + dataConn net.Conn + + ctrlMu sync.Mutex // protects ctrlConn + ctrlConn net.Conn + + shippedLSN atomic.Uint64 + degraded atomic.Bool + stopped atomic.Bool +} + +// NewWALShipper creates a WAL shipper. Connections are established lazily on +// first Ship/Barrier call. epochFn returns the current epoch for validation. +func NewWALShipper(dataAddr, controlAddr string, epochFn func() uint64) *WALShipper { + return &WALShipper{ + dataAddr: dataAddr, + controlAddr: controlAddr, + epochFn: epochFn, + } +} + +// Ship sends a WAL entry to the replica over the data channel. +// On write error, the shipper enters degraded mode permanently. +func (s *WALShipper) Ship(entry *WALEntry) error { + if s.stopped.Load() || s.degraded.Load() { + return nil + } + + // Validate epoch: drop stale entries. + if entry.Epoch != s.epochFn() { + log.Printf("wal_shipper: dropping entry LSN=%d with stale epoch %d (current %d)", + entry.LSN, entry.Epoch, s.epochFn()) + return nil + } + + encoded, err := entry.Encode() + if err != nil { + return fmt.Errorf("wal_shipper: encode entry: %w", err) + } + + s.mu.Lock() + defer s.mu.Unlock() + + if err := s.ensureDataConn(); err != nil { + s.markDegraded() + return nil + } + + if err := WriteFrame(s.dataConn, MsgWALEntry, encoded); err != nil { + s.markDegraded() + return nil + } + + s.shippedLSN.Store(entry.LSN) + return nil +} + +// Barrier sends a barrier request on the control channel and waits for the +// replica to confirm durability up to lsnMax. Returns ErrReplicaDegraded if +// the shipper is in degraded mode. +func (s *WALShipper) Barrier(lsnMax uint64) error { + if s.stopped.Load() { + return ErrShipperStopped + } + if s.degraded.Load() { + return ErrReplicaDegraded + } + + req := EncodeBarrierRequest(BarrierRequest{ + LSN: lsnMax, + Epoch: s.epochFn(), + }) + + s.ctrlMu.Lock() + defer s.ctrlMu.Unlock() + + if err := s.ensureCtrlConn(); err != nil { + s.markDegraded() + return ErrReplicaDegraded + } + + s.ctrlConn.SetDeadline(time.Now().Add(barrierTimeout)) + + if err := WriteFrame(s.ctrlConn, MsgBarrierReq, req); err != nil { + s.markDegraded() + return ErrReplicaDegraded + } + + msgType, payload, err := ReadFrame(s.ctrlConn) + if err != nil { + s.markDegraded() + return ErrReplicaDegraded + } + + if msgType != MsgBarrierResp || len(payload) < 1 { + s.markDegraded() + return ErrReplicaDegraded + } + + switch payload[0] { + case BarrierOK: + return nil + case BarrierEpochMismatch: + return fmt.Errorf("wal_shipper: barrier epoch mismatch") + case BarrierTimeout: + return fmt.Errorf("wal_shipper: barrier timeout on replica") + case BarrierFsyncFailed: + return fmt.Errorf("wal_shipper: barrier fsync failed on replica") + default: + return fmt.Errorf("wal_shipper: unknown barrier status %d", payload[0]) + } +} + +// ShippedLSN returns the highest LSN successfully sent to the replica. +func (s *WALShipper) ShippedLSN() uint64 { + return s.shippedLSN.Load() +} + +// IsDegraded returns true if the replica is unreachable. +func (s *WALShipper) IsDegraded() bool { + return s.degraded.Load() +} + +// Stop shuts down the shipper and closes connections. +func (s *WALShipper) Stop() { + if s.stopped.Swap(true) { + return + } + s.mu.Lock() + if s.dataConn != nil { + s.dataConn.Close() + s.dataConn = nil + } + s.mu.Unlock() + + s.ctrlMu.Lock() + if s.ctrlConn != nil { + s.ctrlConn.Close() + s.ctrlConn = nil + } + s.ctrlMu.Unlock() +} + +func (s *WALShipper) ensureDataConn() error { + if s.dataConn != nil { + return nil + } + conn, err := net.DialTimeout("tcp", s.dataAddr, 3*time.Second) + if err != nil { + return err + } + s.dataConn = conn + return nil +} + +func (s *WALShipper) ensureCtrlConn() error { + if s.ctrlConn != nil { + return nil + } + conn, err := net.DialTimeout("tcp", s.controlAddr, 3*time.Second) + if err != nil { + return err + } + s.ctrlConn = conn + return nil +} + +func (s *WALShipper) markDegraded() { + s.degraded.Store(true) + log.Printf("wal_shipper: replica degraded (data=%s, ctrl=%s)", s.dataAddr, s.controlAddr) +}