diff --git a/weed/storage/blockvol/blockvol.go b/weed/storage/blockvol/blockvol.go index f674035e3..dcf276a11 100644 --- a/weed/storage/blockvol/blockvol.go +++ b/weed/storage/blockvol/blockvol.go @@ -7,6 +7,7 @@ import ( "encoding/binary" "errors" "fmt" + "log" "os" "sync" "sync/atomic" @@ -42,6 +43,10 @@ type BlockVol struct { opsOutstanding atomic.Int64 // in-flight Read/Write/Trim/SyncCache ops opsDrained chan struct{} + // Replication fields (Phase 4A CP2). + shipper *WALShipper + replRecv *ReplicaReceiver + // Fencing fields (Phase 4A). epoch atomic.Uint64 // current persisted epoch masterEpoch atomic.Uint64 // expected epoch from master @@ -253,6 +258,12 @@ func (v *BlockVol) appendWithRetry(entry *WALEntry) (uint64, error) { if v.closed.Load() { return 0, ErrVolumeClosed } + + // Ship to replica if configured (fire-and-forget). + if v.shipper != nil { + v.shipper.Ship(entry) + } + return walOff, nil } @@ -505,11 +516,59 @@ func (v *BlockVol) SyncCache() error { return v.groupCommit.Submit() } +// SetReplicaAddr configures the replica endpoint and creates a WAL shipper +// with distributed group commit. Must be called before any writes. +func (v *BlockVol) SetReplicaAddr(dataAddr, ctrlAddr string) { + v.shipper = NewWALShipper(dataAddr, ctrlAddr, func() uint64 { + return v.epoch.Load() + }) + + // Replace the group committer's sync function with a distributed version. + v.groupCommit.Stop() + v.groupCommit = NewGroupCommitter(GroupCommitterConfig{ + SyncFunc: MakeDistributedSync(v.fd.Sync, v.shipper, v), + MaxDelay: v.config.GroupCommitMaxDelay, + MaxBatch: v.config.GroupCommitMaxBatch, + LowWatermark: v.config.GroupCommitLowWatermark, + OnDegraded: func() { v.healthy.Store(false) }, + PostSyncCheck: v.writeGate, + }) + go v.groupCommit.Run() +} + +// StartReplicaReceiver starts listening for replicated WAL entries from a primary. +func (v *BlockVol) StartReplicaReceiver(dataAddr, ctrlAddr string) error { + recv, err := NewReplicaReceiver(v, dataAddr, ctrlAddr) + if err != nil { + return err + } + v.replRecv = recv + recv.Serve() + return nil +} + +// degradeReplica marks the shipper as degraded and logs a warning. +func (v *BlockVol) degradeReplica(err error) { + if v.shipper != nil { + v.shipper.degraded.Store(true) + } + log.Printf("blockvol: replica degraded: %v", err) +} + // Close shuts down the block volume and closes the file. -// Shutdown order: drain in-flight ops → group committer → flusher → final flush → close fd. +// Shutdown order: shipper → replica receiver → drain ops → group committer → flusher → final flush → close fd. func (v *BlockVol) Close() error { v.closed.Store(true) + // Stop shipper first: no more Ship calls. + if v.shipper != nil { + v.shipper.Stop() + } + // Stop replica receiver: no more barrier waits. + if v.replRecv != nil { + v.replRecv.Stop() + } + // Drain in-flight ops: beginOp checks closed and returns ErrVolumeClosed, // so no new ops can start. Wait for existing ones to finish (max 5s). if v.opsOutstanding.Load() > 0 { diff --git a/weed/storage/blockvol/blockvol_test.go b/weed/storage/blockvol/blockvol_test.go index 7e887324e..06f42c655 100644 --- a/weed/storage/blockvol/blockvol_test.go +++ b/weed/storage/blockvol/blockvol_test.go @@ -4,6 +4,9 @@ import ( "bytes" "encoding/binary" "errors" + "fmt" + "io" + "net" "os" "path/filepath" "sync" @@ -90,6 +93,40 @@ func TestBlockVol(t *testing.T) { {name: "blockvol_gotcha_a_lease_expired", run: testBlockvolGotchaALeaseExpired}, // Phase 4A CP1: P3-BUG-9 dirty map. 
{name: "dirty_map_power_of_2_panics", run: testDirtyMapPowerOf2Panics}, + // Phase 4A CP2: Replication wire protocol. + {name: "frame_roundtrip", run: testFrameRoundtrip}, + {name: "frame_large_payload", run: testFrameLargePayload}, + // Phase 4A CP2: WAL shipper. + {name: "ship_single_entry", run: testShipSingleEntry}, + {name: "ship_batch", run: testShipBatch}, + {name: "ship_epoch_mismatch_dropped", run: testShipEpochMismatchDropped}, + {name: "ship_degraded_on_error", run: testShipDegradedOnError}, + {name: "ship_no_replica_noop", run: testShipNoReplicaNoop}, + // Phase 4A CP2: Replica apply. + {name: "replica_apply_entry", run: testReplicaApplyEntry}, + {name: "replica_reject_stale_epoch", run: testReplicaRejectStaleEpoch}, + {name: "replica_apply_updates_dirty_map", run: testReplicaApplyUpdatesDirtyMap}, + {name: "replica_reject_duplicate_lsn", run: testReplicaRejectDuplicateLSN}, + {name: "replica_flusher_works", run: testReplicaFlusherWorks}, + // Phase 4A CP2: Replica barrier. + {name: "barrier_already_received", run: testBarrierAlreadyReceived}, + {name: "barrier_wait_for_entries", run: testBarrierWaitForEntries}, + {name: "barrier_timeout", run: testBarrierTimeout}, + {name: "barrier_epoch_mismatch_fast_fail", run: testBarrierEpochMismatchFastFail}, + {name: "barrier_concurrent_append", run: testBarrierConcurrentAppend}, + // Phase 4A CP2: Distributed group commit. + {name: "dist_commit_both_pass", run: testDistCommitBothPass}, + {name: "dist_commit_local_fail", run: testDistCommitLocalFail}, + {name: "dist_commit_remote_fail_degrades", run: testDistCommitRemoteFailDegrades}, + {name: "dist_commit_no_replica", run: testDistCommitNoReplica}, + // Phase 4A CP2: BlockVol integration. + {name: "blockvol_write_with_replica", run: testBlockvolWriteWithReplica}, + {name: "blockvol_no_replica_compat", run: testBlockvolNoReplicaCompat}, + // Phase 4A CP2 bug fixes. + {name: "replica_reject_future_epoch", run: testReplicaRejectFutureEpoch}, + {name: "replica_reject_lsn_gap", run: testReplicaRejectLSNGap}, + {name: "barrier_fsync_failed_status", run: testBarrierFsyncFailedStatus}, + {name: "barrier_configurable_timeout", run: testBarrierConfigurableTimeout}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -2112,3 +2149,1070 @@ func testDirtyMapPowerOf2Panics(t *testing.T) { }() NewDirtyMap(3) // should panic } + +// ============================================================================= +// Phase 4A CP2: Replication wire protocol tests +// ============================================================================= + +func testFrameRoundtrip(t *testing.T) { + // Test frame write + read roundtrip with various payloads. 
+ tests := []struct { + msgType byte + payload []byte + }{ + {MsgWALEntry, []byte("hello")}, + {MsgBarrierReq, EncodeBarrierRequest(BarrierRequest{Vid: 1, LSN: 42, Epoch: 7})}, + {MsgBarrierResp, []byte{BarrierOK}}, + {0xFF, []byte{}}, // empty payload + } + for _, tc := range tests { + var buf bytes.Buffer + if err := WriteFrame(&buf, tc.msgType, tc.payload); err != nil { + t.Fatalf("WriteFrame: %v", err) + } + gotType, gotPayload, err := ReadFrame(&buf) + if err != nil { + t.Fatalf("ReadFrame: %v", err) + } + if gotType != tc.msgType { + t.Errorf("type: got 0x%02x, want 0x%02x", gotType, tc.msgType) + } + if !bytes.Equal(gotPayload, tc.payload) { + t.Errorf("payload mismatch") + } + } +} + +func testFrameLargePayload(t *testing.T) { + payload := make([]byte, 1024*1024) // 1MB + for i := range payload { + payload[i] = byte(i % 256) + } + var buf bytes.Buffer + if err := WriteFrame(&buf, MsgWALEntry, payload); err != nil { + t.Fatalf("WriteFrame: %v", err) + } + gotType, gotPayload, err := ReadFrame(&buf) + if err != nil { + t.Fatalf("ReadFrame: %v", err) + } + if gotType != MsgWALEntry { + t.Errorf("type: got 0x%02x, want 0x%02x", gotType, MsgWALEntry) + } + if !bytes.Equal(gotPayload, payload) { + t.Error("large payload mismatch") + } +} + +// ============================================================================= +// Phase 4A CP2: WAL shipper tests +// ============================================================================= + +// mockDataServer starts a TCP server that reads frames and collects them. +func mockDataServer(t *testing.T) (addr string, frames *[][]byte, done chan struct{}) { + t.Helper() + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + collected := &[][]byte{} + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + conn, err := ln.Accept() + if err != nil { + return + } + defer conn.Close() + for { + _, payload, err := ReadFrame(conn) + if err != nil { + return + } + *collected = append(*collected, payload) + } + }() + t.Cleanup(func() { ln.Close() }) + return ln.Addr().String(), collected, doneCh +} + +// mockCtrlServer starts a TCP server that reads barrier requests and responds OK. 
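+// The respStatus parameter selects the reply byte, so tests can simulate
+// BarrierOK, BarrierEpochMismatch, or any other status without a real replica.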
+func mockCtrlServer(t *testing.T, respStatus byte) (addr string, done chan struct{}) { + t.Helper() + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + conn, err := ln.Accept() + if err != nil { + return + } + defer conn.Close() + for { + msgType, _, err := ReadFrame(conn) + if err != nil { + return + } + if msgType == MsgBarrierReq { + WriteFrame(conn, MsgBarrierResp, []byte{respStatus}) + } + } + }() + t.Cleanup(func() { ln.Close() }) + return ln.Addr().String(), doneCh +} + +func testShipSingleEntry(t *testing.T) { + dataAddr, frames, done := mockDataServer(t) + ctrlAddr, _ := mockCtrlServer(t, BarrierOK) + + epoch := uint64(1) + s := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return epoch }) + defer s.Stop() + + entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: make([]byte, 4096)} + if err := s.Ship(entry); err != nil { + t.Fatalf("Ship: %v", err) + } + + if s.ShippedLSN() != 1 { + t.Errorf("ShippedLSN: got %d, want 1", s.ShippedLSN()) + } + + s.Stop() + <-done + + if len(*frames) != 1 { + t.Fatalf("frames: got %d, want 1", len(*frames)) + } + decoded, err := DecodeWALEntry((*frames)[0]) + if err != nil { + t.Fatalf("decode: %v", err) + } + if decoded.LSN != 1 { + t.Errorf("LSN: got %d, want 1", decoded.LSN) + } +} + +func testShipBatch(t *testing.T) { + dataAddr, frames, done := mockDataServer(t) + ctrlAddr, _ := mockCtrlServer(t, BarrierOK) + + epoch := uint64(1) + s := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return epoch }) + defer s.Stop() + + for i := uint64(1); i <= 5; i++ { + entry := &WALEntry{LSN: i, Epoch: 1, Type: EntryTypeTrim, LBA: i, Length: 4096} + if err := s.Ship(entry); err != nil { + t.Fatalf("Ship(%d): %v", i, err) + } + } + + if s.ShippedLSN() != 5 { + t.Errorf("ShippedLSN: got %d, want 5", s.ShippedLSN()) + } + + s.Stop() + <-done + + if len(*frames) != 5 { + t.Fatalf("frames: got %d, want 5", len(*frames)) + } +} + +func testShipEpochMismatchDropped(t *testing.T) { + // Use a real server so connections work, but verify no frames arrive. + dataAddr, frames, done := mockDataServer(t) + ctrlAddr, _ := mockCtrlServer(t, BarrierOK) + + epoch := uint64(2) // shipper epoch is 2 + s := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return epoch }) + + // First ship a valid entry to establish connection. + validEntry := &WALEntry{LSN: 1, Epoch: 2, Type: EntryTypeTrim, LBA: 0, Length: 4096} + if err := s.Ship(validEntry); err != nil { + t.Fatalf("Ship valid: %v", err) + } + + // Now ship an entry with old epoch 1 — should be silently dropped. + staleEntry := &WALEntry{LSN: 2, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: make([]byte, 4096)} + if err := s.Ship(staleEntry); err != nil { + t.Fatalf("Ship stale: %v", err) + } + + // ShippedLSN should be 1 (only the valid entry). + if s.ShippedLSN() != 1 { + t.Errorf("ShippedLSN should be 1, got %d", s.ShippedLSN()) + } + + s.Stop() + <-done + + // Only the valid entry should have been shipped. + if len(*frames) != 1 { + t.Errorf("frames: got %d, want 1 (stale entry should be dropped)", len(*frames)) + } +} + +func testShipDegradedOnError(t *testing.T) { + // Start a server that immediately closes the connection. 
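+	// A write to a peer-closed TCP connection can still succeed once or twice
+	// (the kernel buffers it before the RST arrives), so degradation may take
+	// several Ship attempts to surface; the retry loop below accounts for that.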
+ ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + go func() { + conn, err := ln.Accept() + if err != nil { + return + } + conn.Close() // close immediately — writes will fail + }() + defer ln.Close() + + ctrlAddr, _ := mockCtrlServer(t, BarrierOK) + epoch := uint64(1) + s := NewWALShipper(ln.Addr().String(), ctrlAddr, func() uint64 { return epoch }) + defer s.Stop() + + entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: make([]byte, 4096)} + // Ship may succeed on first write (kernel buffer) or fail. Keep shipping until degraded. + for i := 0; i < 10; i++ { + entry.LSN = uint64(i + 1) + s.Ship(entry) + if s.IsDegraded() { + break + } + time.Sleep(5 * time.Millisecond) + } + + if !s.IsDegraded() { + t.Error("expected shipper to be degraded after write error") + } + + // Subsequent Ship calls should be no-ops. + entry.LSN = 100 + if err := s.Ship(entry); err != nil { + t.Fatalf("Ship after degraded: %v", err) + } + + // Barrier should return ErrReplicaDegraded. + if err := s.Barrier(100); !errors.Is(err, ErrReplicaDegraded) { + t.Errorf("Barrier after degraded: got %v, want ErrReplicaDegraded", err) + } +} + +func testShipNoReplicaNoop(t *testing.T) { + // A nil shipper should not be called, but test that a stopped shipper is safe. + s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return 1 }) + s.Stop() + + entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: make([]byte, 4096)} + if err := s.Ship(entry); err != nil { + t.Fatalf("Ship after stop: %v", err) + } + if err := s.Barrier(1); !errors.Is(err, ErrShipperStopped) { + t.Errorf("Barrier after stop: got %v, want ErrShipperStopped", err) + } +} + +// ============================================================================= +// Phase 4A CP2: Replica apply tests +// ============================================================================= + +func createReplicaVolPair(t *testing.T) (primary *BlockVol, replica *BlockVol) { + t.Helper() + pDir := t.TempDir() + rDir := t.TempDir() + opts := CreateOptions{ + VolumeSize: 1 * 1024 * 1024, + BlockSize: 4096, + WALSize: 256 * 1024, + } + p, err := CreateBlockVol(filepath.Join(pDir, "primary.blockvol"), opts) + if err != nil { + t.Fatalf("CreateBlockVol primary: %v", err) + } + r, err := CreateBlockVol(filepath.Join(rDir, "replica.blockvol"), opts) + if err != nil { + p.Close() + t.Fatalf("CreateBlockVol replica: %v", err) + } + return p, r +} + +func testReplicaApplyEntry(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + // Connect to data port and send a WAL entry. + conn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + data := makeBlock('X') + entry := &WALEntry{LSN: 1, Epoch: 0, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: data} + encoded, _ := entry.Encode() + if err := WriteFrame(conn, MsgWALEntry, encoded); err != nil { + t.Fatal(err) + } + + // Wait for apply. 
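+	// The receiver applies entries asynchronously after reading the frame,
+	// so poll ReceivedLSN under a deadline instead of a fixed sleep.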
+ deadline := time.After(2 * time.Second) + for { + if recv.ReceivedLSN() >= 1 { + break + } + select { + case <-deadline: + t.Fatal("timeout waiting for entry to be applied") + default: + time.Sleep(time.Millisecond) + } + } + + if recv.ReceivedLSN() != 1 { + t.Errorf("ReceivedLSN: got %d, want 1", recv.ReceivedLSN()) + } +} + +func testReplicaRejectStaleEpoch(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + replica.epoch.Store(5) // replica at epoch 5 + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + conn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + // Entry with epoch 3 (stale) — should be rejected. + entry := &WALEntry{LSN: 1, Epoch: 3, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: make([]byte, 4096)} + encoded, _ := entry.Encode() + WriteFrame(conn, MsgWALEntry, encoded) + + // Give it time to process. + time.Sleep(50 * time.Millisecond) + + if recv.ReceivedLSN() != 0 { + t.Errorf("ReceivedLSN should be 0 (stale entry rejected), got %d", recv.ReceivedLSN()) + } +} + +func testReplicaApplyUpdatesDirtyMap(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + conn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + data := makeBlock('D') + entry := &WALEntry{LSN: 1, Epoch: 0, Type: EntryTypeWrite, LBA: 5, Length: 4096, Data: data} + encoded, _ := entry.Encode() + WriteFrame(conn, MsgWALEntry, encoded) + + // Wait for apply. + deadline := time.After(2 * time.Second) + for recv.ReceivedLSN() < 1 { + select { + case <-deadline: + t.Fatal("timeout") + default: + time.Sleep(time.Millisecond) + } + } + + // Check dirty map has an entry for LBA 5. + _, lsn, _, ok := replica.dirtyMap.Get(5) + if !ok { + t.Fatal("dirty map: LBA 5 not found") + } + if lsn != 1 { + t.Errorf("dirty map LSN: got %d, want 1", lsn) + } +} + +func testReplicaRejectDuplicateLSN(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + conn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + // Send LSN=1. + entry := &WALEntry{LSN: 1, Epoch: 0, Type: EntryTypeTrim, LBA: 0, Length: 4096} + encoded, _ := entry.Encode() + WriteFrame(conn, MsgWALEntry, encoded) + + deadline := time.After(2 * time.Second) + for recv.ReceivedLSN() < 1 { + select { + case <-deadline: + t.Fatal("timeout") + default: + time.Sleep(time.Millisecond) + } + } + + // Send duplicate LSN=1 — should be skipped. + WriteFrame(conn, MsgWALEntry, encoded) + time.Sleep(50 * time.Millisecond) + + // ReceivedLSN should still be 1. + if recv.ReceivedLSN() != 1 { + t.Errorf("ReceivedLSN after dup: got %d, want 1", recv.ReceivedLSN()) + } +} + +func testReplicaFlusherWorks(t *testing.T) { + // Verify the replica vol's flusher can flush replicated entries. 
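+	// Replicated entries take the same WAL and dirty-map path as local writes,
+	// so FlushOnce should migrate them to their home LBAs and make them readable.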
+ primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + conn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + data := makeBlock('F') + entry := &WALEntry{LSN: 1, Epoch: 0, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: data} + encoded, _ := entry.Encode() + WriteFrame(conn, MsgWALEntry, encoded) + + deadline := time.After(2 * time.Second) + for recv.ReceivedLSN() < 1 { + select { + case <-deadline: + t.Fatal("timeout") + default: + time.Sleep(time.Millisecond) + } + } + + // Trigger flusher and verify data is readable from replica. + replica.flusher.FlushOnce() + + got, err := replica.ReadLBA(0, 4096) + if err != nil { + t.Fatalf("ReadLBA: %v", err) + } + if !bytes.Equal(got, data) { + t.Error("flushed data mismatch on replica") + } +} + +// ============================================================================= +// Phase 4A CP2: Replica barrier tests +// ============================================================================= + +func testBarrierAlreadyReceived(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + // Send an entry first. + dataConn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatal(err) + } + defer dataConn.Close() + + entry := &WALEntry{LSN: 1, Epoch: 0, Type: EntryTypeTrim, LBA: 0, Length: 4096} + encoded, _ := entry.Encode() + WriteFrame(dataConn, MsgWALEntry, encoded) + + deadline := time.After(2 * time.Second) + for recv.ReceivedLSN() < 1 { + select { + case <-deadline: + t.Fatal("timeout") + default: + time.Sleep(time.Millisecond) + } + } + + // Now send a barrier for LSN=1 — should succeed immediately. + ctrlConn, err := net.Dial("tcp", recv.CtrlAddr()) + if err != nil { + t.Fatal(err) + } + defer ctrlConn.Close() + ctrlConn.SetDeadline(time.Now().Add(5 * time.Second)) + + req := EncodeBarrierRequest(BarrierRequest{LSN: 1, Epoch: 0}) + WriteFrame(ctrlConn, MsgBarrierReq, req) + + msgType, payload, err := ReadFrame(ctrlConn) + if err != nil { + t.Fatal(err) + } + if msgType != MsgBarrierResp { + t.Fatalf("unexpected msg type 0x%02x", msgType) + } + if payload[0] != BarrierOK { + t.Errorf("barrier status: got %d, want BarrierOK", payload[0]) + } +} + +func testBarrierWaitForEntries(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + // Send barrier BEFORE the entry arrives. + ctrlConn, err := net.Dial("tcp", recv.CtrlAddr()) + if err != nil { + t.Fatal(err) + } + defer ctrlConn.Close() + ctrlConn.SetDeadline(time.Now().Add(5 * time.Second)) + + barrierDone := make(chan byte, 1) + go func() { + req := EncodeBarrierRequest(BarrierRequest{LSN: 1, Epoch: 0}) + WriteFrame(ctrlConn, MsgBarrierReq, req) + _, payload, err := ReadFrame(ctrlConn) + if err != nil { + barrierDone <- 0xFF + return + } + barrierDone <- payload[0] + }() + + // Wait a bit, then send the entry. 
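+	// The sleep is a best-effort way to ensure the barrier goroutine is parked
+	// before the entry arrives; the apply path must then wake it.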
+ time.Sleep(50 * time.Millisecond) + + dataConn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatal(err) + } + defer dataConn.Close() + + entry := &WALEntry{LSN: 1, Epoch: 0, Type: EntryTypeTrim, LBA: 0, Length: 4096} + encoded, _ := entry.Encode() + WriteFrame(dataConn, MsgWALEntry, encoded) + + select { + case status := <-barrierDone: + if status != BarrierOK { + t.Errorf("barrier status: got %d, want BarrierOK", status) + } + case <-time.After(10 * time.Second): + t.Fatal("barrier did not complete") + } +} + +func testBarrierTimeout(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.barrierTimeout = 100 * time.Millisecond // fast timeout for test + recv.Serve() + defer recv.Stop() + + // Send barrier for LSN=999 that will never arrive. + ctrlConn, err := net.Dial("tcp", recv.CtrlAddr()) + if err != nil { + t.Fatal(err) + } + defer ctrlConn.Close() + ctrlConn.SetDeadline(time.Now().Add(5 * time.Second)) + + req := EncodeBarrierRequest(BarrierRequest{LSN: 999, Epoch: 0}) + WriteFrame(ctrlConn, MsgBarrierReq, req) + + start := time.Now() + _, payload, err := ReadFrame(ctrlConn) + elapsed := time.Since(start) + if err != nil { + t.Fatal(err) + } + if payload[0] != BarrierTimeout { + t.Errorf("barrier status: got %d, want BarrierTimeout", payload[0]) + } + if elapsed > 2*time.Second { + t.Errorf("barrier timeout took %v, expected ~100ms", elapsed) + } +} + +func testBarrierEpochMismatchFastFail(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + replica.epoch.Store(5) + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + ctrlConn, err := net.Dial("tcp", recv.CtrlAddr()) + if err != nil { + t.Fatal(err) + } + defer ctrlConn.Close() + ctrlConn.SetDeadline(time.Now().Add(5 * time.Second)) + + // Barrier with epoch=3 (mismatch with replica epoch=5). + req := EncodeBarrierRequest(BarrierRequest{LSN: 1, Epoch: 3}) + WriteFrame(ctrlConn, MsgBarrierReq, req) + + start := time.Now() + _, payload, err := ReadFrame(ctrlConn) + elapsed := time.Since(start) + if err != nil { + t.Fatal(err) + } + if payload[0] != BarrierEpochMismatch { + t.Errorf("barrier status: got %d, want BarrierEpochMismatch", payload[0]) + } + // Should be fast — no waiting. + if elapsed > 500*time.Millisecond { + t.Errorf("epoch mismatch took %v, expected fast fail", elapsed) + } +} + +func testBarrierConcurrentAppend(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + // Start barrier waiting for LSN=10. + ctrlConn, err := net.Dial("tcp", recv.CtrlAddr()) + if err != nil { + t.Fatal(err) + } + defer ctrlConn.Close() + ctrlConn.SetDeadline(time.Now().Add(10 * time.Second)) + + barrierDone := make(chan byte, 1) + go func() { + req := EncodeBarrierRequest(BarrierRequest{LSN: 10, Epoch: 0}) + WriteFrame(ctrlConn, MsgBarrierReq, req) + _, payload, err := ReadFrame(ctrlConn) + if err != nil { + barrierDone <- 0xFF + return + } + barrierDone <- payload[0] + }() + + // Stream 10 entries concurrently. 
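+	// The barrier waits for LSN=10 while entries 1..10 stream in; a lost
+	// wakeup in the receiver would surface as the 10s timeout below.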
+ dataConn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatal(err) + } + defer dataConn.Close() + + for i := uint64(1); i <= 10; i++ { + entry := &WALEntry{LSN: i, Epoch: 0, Type: EntryTypeTrim, LBA: i, Length: 4096} + encoded, _ := entry.Encode() + WriteFrame(dataConn, MsgWALEntry, encoded) + } + + select { + case status := <-barrierDone: + if status != BarrierOK { + t.Errorf("barrier status: got %d, want BarrierOK", status) + } + case <-time.After(10 * time.Second): + t.Fatal("barrier did not complete") + } +} + +// ============================================================================= +// Phase 4A CP2: Distributed group commit tests +// ============================================================================= + +func testDistCommitBothPass(t *testing.T) { + syncCalled := atomic.Bool{} + walSync := func() error { + syncCalled.Store(true) + return nil + } + + // Mock shipper with barrier that succeeds. + dataAddr, _, _ := mockDataServer(t) + ctrlAddr, _ := mockCtrlServer(t, BarrierOK) + + v := createTestVol(t) + defer v.Close() + + shipper := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return 0 }) + defer shipper.Stop() + + distSync := MakeDistributedSync(walSync, shipper, v) + if err := distSync(); err != nil { + t.Fatalf("distSync: %v", err) + } + if !syncCalled.Load() { + t.Error("local walSync was not called") + } +} + +func testDistCommitLocalFail(t *testing.T) { + walSync := func() error { + return fmt.Errorf("disk error") + } + + dataAddr, _, _ := mockDataServer(t) + ctrlAddr, _ := mockCtrlServer(t, BarrierOK) + + v := createTestVol(t) + defer v.Close() + + shipper := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return 0 }) + defer shipper.Stop() + + distSync := MakeDistributedSync(walSync, shipper, v) + err := distSync() + if err == nil { + t.Fatal("expected error from local sync failure") + } +} + +func testDistCommitRemoteFailDegrades(t *testing.T) { + walSync := func() error { return nil } + + dataAddr, _, _ := mockDataServer(t) + // Control server that returns epoch mismatch. + ctrlAddr, _ := mockCtrlServer(t, BarrierEpochMismatch) + + v := createTestVol(t) + defer v.Close() + + shipper := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return 0 }) + defer shipper.Stop() + + // Set shipper on vol so degradeReplica can mark it degraded. + v.shipper = shipper + + distSync := MakeDistributedSync(walSync, shipper, v) + // Should NOT return error (local succeeded, remote degraded). + if err := distSync(); err != nil { + t.Fatalf("distSync should not fail on remote error: %v", err) + } + + if !shipper.IsDegraded() { + t.Error("shipper should be degraded after barrier failure") + } + + // Second call should fall back to local-only. + if err := distSync(); err != nil { + t.Fatalf("distSync local-only: %v", err) + } +} + +func testDistCommitNoReplica(t *testing.T) { + syncCalled := atomic.Bool{} + walSync := func() error { + syncCalled.Store(true) + return nil + } + + v := createTestVol(t) + defer v.Close() + + // nil shipper — local-only mode. 
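+	// With a nil shipper, MakeDistributedSync must reduce to plain walSync
+	// (the Phase 3 path), spawning no goroutine and issuing no barrier.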
+ distSync := MakeDistributedSync(walSync, nil, v) + if err := distSync(); err != nil { + t.Fatalf("distSync: %v", err) + } + if !syncCalled.Load() { + t.Error("local walSync was not called") + } +} + +// ============================================================================= +// Phase 4A CP2: BlockVol integration tests +// ============================================================================= + +func testBlockvolWriteWithReplica(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + // Start replica receiver. + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + // Configure primary to ship to replica. + primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr()) + + // Write data on primary. + data := makeBlock('R') + if err := primary.WriteLBA(0, data); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + + // Wait for replica to receive. + deadline := time.After(5 * time.Second) + for recv.ReceivedLSN() < 1 { + select { + case <-deadline: + t.Fatal("timeout waiting for replica") + default: + time.Sleep(time.Millisecond) + } + } + + // Flush replica and read back. + replica.flusher.FlushOnce() + got, err := replica.ReadLBA(0, 4096) + if err != nil { + t.Fatalf("replica ReadLBA: %v", err) + } + if !bytes.Equal(got, data) { + t.Error("replica data mismatch") + } +} + +func testBlockvolNoReplicaCompat(t *testing.T) { + // Verify that a BlockVol without replica works identically to Phase 3. + v := createTestVol(t) + defer v.Close() + + data := makeBlock('N') + if err := v.WriteLBA(0, data); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + if err := v.SyncCache(); err != nil { + t.Fatalf("SyncCache: %v", err) + } + + got, err := v.ReadLBA(0, 4096) + if err != nil { + t.Fatalf("ReadLBA: %v", err) + } + if !bytes.Equal(got, data) { + t.Error("data mismatch in no-replica mode") + } + + // Verify shipper is nil. + if v.shipper != nil { + t.Error("shipper should be nil without SetReplicaAddr") + } +} + +// ============================================================================= +// Phase 4A CP2 bug fix tests +// ============================================================================= + +func testReplicaRejectFutureEpoch(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + replica.epoch.Store(3) // replica at epoch 3 + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + conn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + // Entry with epoch 5 (future) — must be rejected. Replicas must NOT + // accept epoch bumps from WAL stream; only master can change epoch. 
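+	// (SetEpoch, driven by the master path, is the only sanctioned way to move
+	// a replica's epoch forward.)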
+ entry := &WALEntry{LSN: 1, Epoch: 5, Type: EntryTypeTrim, LBA: 0, Length: 4096} + encoded, _ := entry.Encode() + WriteFrame(conn, MsgWALEntry, encoded) + + time.Sleep(50 * time.Millisecond) + + if recv.ReceivedLSN() != 0 { + t.Errorf("ReceivedLSN should be 0 (future epoch rejected), got %d", recv.ReceivedLSN()) + } +} + +func testReplicaRejectLSNGap(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + conn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + // Send LSN=1 — should succeed. + entry1 := &WALEntry{LSN: 1, Epoch: 0, Type: EntryTypeTrim, LBA: 0, Length: 4096} + encoded1, _ := entry1.Encode() + WriteFrame(conn, MsgWALEntry, encoded1) + + deadline := time.After(2 * time.Second) + for recv.ReceivedLSN() < 1 { + select { + case <-deadline: + t.Fatal("timeout waiting for LSN 1") + default: + time.Sleep(time.Millisecond) + } + } + + // Send LSN=3 (gap — skips LSN=2) — must be rejected. + entry3 := &WALEntry{LSN: 3, Epoch: 0, Type: EntryTypeTrim, LBA: 1, Length: 4096} + encoded3, _ := entry3.Encode() + WriteFrame(conn, MsgWALEntry, encoded3) + + time.Sleep(50 * time.Millisecond) + + if recv.ReceivedLSN() != 1 { + t.Errorf("ReceivedLSN should be 1 (gap rejected), got %d", recv.ReceivedLSN()) + } +} + +func testBarrierFsyncFailedStatus(t *testing.T) { + // Verify BarrierFsyncFailed is a distinct status code. + if BarrierFsyncFailed == BarrierTimeout { + t.Error("BarrierFsyncFailed should be distinct from BarrierTimeout") + } + if BarrierFsyncFailed == BarrierOK { + t.Error("BarrierFsyncFailed should be distinct from BarrierOK") + } + if BarrierFsyncFailed != 0x03 { + t.Errorf("BarrierFsyncFailed: got 0x%02x, want 0x03", BarrierFsyncFailed) + } +} + +func testBarrierConfigurableTimeout(t *testing.T) { + primary, replica := createReplicaVolPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + // Set a very short timeout. + recv.barrierTimeout = 50 * time.Millisecond + recv.Serve() + defer recv.Stop() + + ctrlConn, err := net.Dial("tcp", recv.CtrlAddr()) + if err != nil { + t.Fatal(err) + } + defer ctrlConn.Close() + ctrlConn.SetDeadline(time.Now().Add(5 * time.Second)) + + req := EncodeBarrierRequest(BarrierRequest{LSN: 999, Epoch: 0}) + start := time.Now() + WriteFrame(ctrlConn, MsgBarrierReq, req) + + _, payload, err := ReadFrame(ctrlConn) + elapsed := time.Since(start) + if err != nil { + t.Fatal(err) + } + if payload[0] != BarrierTimeout { + t.Errorf("barrier status: got %d, want BarrierTimeout", payload[0]) + } + // Should complete quickly — well under 1s. + if elapsed > 1*time.Second { + t.Errorf("configurable timeout took %v, expected ~50ms", elapsed) + } +} + +// Suppress unused import warnings. +var _ = fmt.Sprintf +var _ io.Reader +var _ net.Conn diff --git a/weed/storage/blockvol/dist_group_commit.go b/weed/storage/blockvol/dist_group_commit.go new file mode 100644 index 000000000..1a489da35 --- /dev/null +++ b/weed/storage/blockvol/dist_group_commit.go @@ -0,0 +1,46 @@ +package blockvol + +import ( + "sync" +) + +// MakeDistributedSync creates a sync function that runs local WAL fsync and +// replica barrier in parallel. 
If no replica is configured or the replica is +// degraded, it falls back to local-only sync (Phase 3 behavior). +// +// walSync: the local fsync function (typically fd.Sync) +// shipper: the WAL shipper to the replica (may be nil) +// vol: the BlockVol (used to read nextLSN and trigger degradation) +func MakeDistributedSync(walSync func() error, shipper *WALShipper, vol *BlockVol) func() error { + return func() error { + if shipper == nil || shipper.IsDegraded() { + return walSync() + } + + // The highest LSN that needs to be durable is nextLSN-1. + lsnMax := vol.nextLSN.Load() - 1 + + var localErr, remoteErr error + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + localErr = walSync() + }() + go func() { + defer wg.Done() + remoteErr = shipper.Barrier(lsnMax) + }() + wg.Wait() + + if localErr != nil { + return localErr + } + if remoteErr != nil { + // Local succeeded, replica failed — degrade but don't fail the client. + vol.degradeReplica(remoteErr) + return nil + } + return nil + } +} diff --git a/weed/storage/blockvol/qa_phase4a_cp2_test.go b/weed/storage/blockvol/qa_phase4a_cp2_test.go new file mode 100644 index 000000000..dc7c6197c --- /dev/null +++ b/weed/storage/blockvol/qa_phase4a_cp2_test.go @@ -0,0 +1,1104 @@ +package blockvol + +import ( + "bytes" + "errors" + "io" + "math" + "net" + "path/filepath" + "sync" + "sync/atomic" + "testing" + "time" +) + +// TestQAPhase4ACP2 tests Phase 4A CP2 replication primitives adversarially: +// frame protocol, WAL shipper, replica receiver, barrier, distributed sync. +func TestQAPhase4ACP2(t *testing.T) { + tests := []struct { + name string + run func(t *testing.T) + }{ + // QA-4A-CP2-1: Frame Protocol Adversarial + {name: "frame_max_payload_boundary", run: testQAFrameMaxPayloadBoundary}, + {name: "frame_truncated_header", run: testQAFrameTruncatedHeader}, + {name: "frame_truncated_payload", run: testQAFrameTruncatedPayload}, + {name: "frame_zero_payload", run: testQAFrameZeroPayload}, + {name: "frame_concurrent_writes_no_interleave", run: testQAFrameConcurrentWrites}, + + // QA-4A-CP2-2: WALShipper Adversarial + {name: "shipper_ship_after_stop", run: testQAShipperShipAfterStop}, + {name: "shipper_barrier_after_degraded", run: testQAShipperBarrierAfterDegraded}, + {name: "shipper_stale_epoch_no_shippedlsn", run: testQAShipperStaleEpochNoShippedLSN}, + {name: "shipper_degraded_is_permanent", run: testQAShipperDegradedPermanent}, + {name: "shipper_concurrent_ship_stop", run: testQAShipperConcurrentShipStop}, + + // QA-4A-CP2-3: ReplicaReceiver Adversarial + {name: "receiver_out_of_order_lsn_skipped", run: testQAReceiverOutOfOrderLSN}, + {name: "receiver_concurrent_data_conns", run: testQAReceiverConcurrentDataConns}, + {name: "receiver_future_epoch_rejected", run: testQAReceiverFutureEpochRejected}, + {name: "receiver_barrier_before_entries_waits", run: testQAReceiverBarrierBeforeEntries}, + {name: "receiver_barrier_timeout_no_entries", run: testQAReceiverBarrierTimeoutNoEntries}, + {name: "receiver_barrier_epoch_mismatch", run: testQAReceiverBarrierEpochMismatch}, + {name: "receiver_stop_unblocks_barrier", run: testQAReceiverStopUnblocksBarrier}, + + // QA-4A-CP2-4: DistributedSync Adversarial + {name: "dsync_local_fail_returns_error", run: testQADSyncLocalFailReturnsError}, + {name: "dsync_remote_fail_degrades_not_errors", run: testQADSyncRemoteFailDegrades}, + {name: "dsync_both_fail_returns_local", run: testQADSyncBothFail}, + {name: "dsync_parallel_execution", run: testQADSyncParallelExecution}, + + // QA-4A-CP2-5: 
End-to-end Adversarial + {name: "e2e_replica_data_matches_primary", run: testQAE2EReplicaDataMatchesPrimary}, + {name: "e2e_close_primary_during_ship", run: testQAE2EClosePrimaryDuringShip}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.run(t) + }) + } +} + +// --- helpers --- + +// createReplicaPair creates a fenced primary + replica vol connected via loopback. +// Returns primary, replica, and a cleanup function. +func createReplicaPair(t *testing.T) (primary, replica *BlockVol) { + t.Helper() + dir := t.TempDir() + cfg := DefaultConfig() + cfg.FlushInterval = 5 * time.Millisecond + + // Create primary. + p, err := CreateBlockVol(filepath.Join(dir, "primary.blockvol"), CreateOptions{ + VolumeSize: 1 * 1024 * 1024, + BlockSize: 4096, + WALSize: 512 * 1024, + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol(primary): %v", err) + } + if err := p.SetEpoch(1); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + p.SetMasterEpoch(1) + if err := p.SetRole(RolePrimary); err != nil { + t.Fatalf("SetRole: %v", err) + } + p.lease.Grant(30 * time.Second) + + // Create replica. + r, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{ + VolumeSize: 1 * 1024 * 1024, + BlockSize: 4096, + WALSize: 512 * 1024, + }, cfg) + if err != nil { + p.Close() + t.Fatalf("CreateBlockVol(replica): %v", err) + } + if err := r.SetEpoch(1); err != nil { + p.Close() + r.Close() + t.Fatalf("SetEpoch(replica): %v", err) + } + + // Start replica receiver on ephemeral ports. + if err := r.StartReplicaReceiver("127.0.0.1:0", "127.0.0.1:0"); err != nil { + p.Close() + r.Close() + t.Fatalf("StartReplicaReceiver: %v", err) + } + + // Connect primary's shipper to replica's addresses. + p.SetReplicaAddr(r.replRecv.DataAddr(), r.replRecv.CtrlAddr()) + + return p, r +} + +// --- QA-4A-CP2-1: Frame Protocol Adversarial --- + +func testQAFrameMaxPayloadBoundary(t *testing.T) { + // Payload exactly at maxFramePayload → should succeed. + // Payload at maxFramePayload+1 → ReadFrame must return ErrFrameTooLarge. + + // Test at boundary: we can't allocate 256MB in a test, but we can test + // the ReadFrame parser with a crafted header. + var buf bytes.Buffer + + // Write a frame header claiming payload = maxFramePayload + 1 + hdr := make([]byte, frameHeaderSize) + hdr[0] = MsgWALEntry + bigEndianPut32(hdr[1:5], uint32(maxFramePayload)+1) + buf.Write(hdr) + + _, _, err := ReadFrame(&buf) + if !errors.Is(err, ErrFrameTooLarge) { + t.Errorf("ReadFrame with oversized payload: got %v, want ErrFrameTooLarge", err) + } + + // Exactly at max: header says maxFramePayload but we won't supply the data. + // ReadFrame should try to read and get EOF (not ErrFrameTooLarge). + buf.Reset() + hdr[0] = MsgWALEntry + bigEndianPut32(hdr[1:5], uint32(maxFramePayload)) + buf.Write(hdr) + // No payload data → ReadFrame should return io.ErrUnexpectedEOF (not ErrFrameTooLarge). + _, _, err = ReadFrame(&buf) + if errors.Is(err, ErrFrameTooLarge) { + t.Error("payload exactly at maxFramePayload should not be rejected as too large") + } + // Should be some read error (EOF/UnexpectedEOF) since we didn't write payload bytes. + if err == nil { + t.Error("ReadFrame with missing payload data should return error") + } +} + +func testQAFrameTruncatedHeader(t *testing.T) { + // Only 3 bytes (< 5 byte header) → ReadFrame must return error. 
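+	// A conforming reader will typically surface this as io.ErrUnexpectedEOF,
+	// but the contract checked here is only that the error is non-nil.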
+ buf := bytes.NewReader([]byte{0x01, 0x00, 0x00}) + _, _, err := ReadFrame(buf) + if err == nil { + t.Error("ReadFrame with 3-byte truncated header should return error") + } +} + +func testQAFrameTruncatedPayload(t *testing.T) { + // Header says 100 bytes payload, but only 10 bytes follow. + var buf bytes.Buffer + hdr := make([]byte, frameHeaderSize) + hdr[0] = MsgWALEntry + bigEndianPut32(hdr[1:5], 100) + buf.Write(hdr) + buf.Write(make([]byte, 10)) // only 10 of 100 bytes + + _, _, err := ReadFrame(&buf) + if err == nil { + t.Error("ReadFrame with truncated payload should return error") + } +} + +func testQAFrameZeroPayload(t *testing.T) { + // Zero-length payload must roundtrip correctly. + var buf bytes.Buffer + if err := WriteFrame(&buf, 0x42, nil); err != nil { + t.Fatalf("WriteFrame(nil payload): %v", err) + } + if err := WriteFrame(&buf, 0x43, []byte{}); err != nil { + t.Fatalf("WriteFrame(empty payload): %v", err) + } + + msgType, payload, err := ReadFrame(&buf) + if err != nil { + t.Fatalf("ReadFrame(nil payload): %v", err) + } + if msgType != 0x42 || len(payload) != 0 { + t.Errorf("frame 1: type=0x%02x len=%d, want 0x42 len=0", msgType, len(payload)) + } + + msgType, payload, err = ReadFrame(&buf) + if err != nil { + t.Fatalf("ReadFrame(empty payload): %v", err) + } + if msgType != 0x43 || len(payload) != 0 { + t.Errorf("frame 2: type=0x%02x len=%d, want 0x43 len=0", msgType, len(payload)) + } +} + +func testQAFrameConcurrentWrites(t *testing.T) { + // Multiple goroutines writing frames to the same connection. + // Frames must not interleave (each frame readable as a complete unit). + // This relies on the WALShipper's mutex — test at the connection level. + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + const writers = 8 + const framesPerWriter = 20 + var mu sync.Mutex // simulate WALShipper's mutex + + var wg sync.WaitGroup + wg.Add(writers) + for g := 0; g < writers; g++ { + go func(id int) { + defer wg.Done() + payload := []byte{byte(id)} + for i := 0; i < framesPerWriter; i++ { + mu.Lock() + _ = WriteFrame(client, MsgWALEntry, payload) + mu.Unlock() + } + }(g) + } + + // Reader: read all frames and verify integrity. + totalFrames := writers * framesPerWriter + readDone := make(chan error, 1) + var frameCount int + go func() { + for i := 0; i < totalFrames; i++ { + msgType, payload, err := ReadFrame(server) + if err != nil { + readDone <- err + return + } + if msgType != MsgWALEntry { + readDone <- errors.New("wrong message type") + return + } + if len(payload) != 1 { + readDone <- errors.New("wrong payload length") + return + } + if payload[0] >= byte(writers) { + readDone <- errors.New("invalid writer ID in payload") + return + } + frameCount++ + } + readDone <- nil + }() + + wg.Wait() + + select { + case err := <-readDone: + if err != nil { + t.Fatalf("frame read error: %v (after %d frames)", err, frameCount) + } + case <-time.After(5 * time.Second): + t.Fatal("frame reader hung for 5s") + } + + if frameCount != totalFrames { + t.Errorf("read %d frames, want %d", frameCount, totalFrames) + } +} + +// --- QA-4A-CP2-2: WALShipper Adversarial --- + +func testQAShipperShipAfterStop(t *testing.T) { + s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return 1 }) + s.Stop() + + // Ship after Stop must not panic, must return nil (silently drop). 
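+	// This mirrors the fire-and-forget contract in appendWithRetry: Ship
+	// errors must never fail the primary's write path.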
+ entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('S')} + err := s.Ship(entry) + if err != nil { + t.Errorf("Ship after Stop: got %v, want nil", err) + } + if s.ShippedLSN() != 0 { + t.Errorf("ShippedLSN after stopped Ship: got %d, want 0", s.ShippedLSN()) + } +} + +func testQAShipperBarrierAfterDegraded(t *testing.T) { + s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return 1 }) + s.degraded.Store(true) + + err := s.Barrier(1) + if !errors.Is(err, ErrReplicaDegraded) { + t.Errorf("Barrier when degraded: got %v, want ErrReplicaDegraded", err) + } +} + +func testQAShipperStaleEpochNoShippedLSN(t *testing.T) { + // Ship with epoch != current → entry silently dropped, shippedLSN unchanged. + currentEpoch := uint64(5) + s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return currentEpoch }) + + entry := &WALEntry{LSN: 10, Epoch: 3, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('X')} + err := s.Ship(entry) + if err != nil { + t.Errorf("Ship with stale epoch: got %v, want nil", err) + } + if s.ShippedLSN() != 0 { + t.Errorf("ShippedLSN after stale epoch Ship: got %d, want 0 (entry should be dropped)", s.ShippedLSN()) + } +} + +func testQAShipperDegradedPermanent(t *testing.T) { + // After degradation, Ship and Barrier must fail immediately forever. + // Create a listener that accepts but immediately closes (triggers write error). + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + defer ln.Close() + + go func() { + for { + conn, err := ln.Accept() + if err != nil { + return + } + conn.Close() // immediately close → Ship will get write error + } + }() + + s := NewWALShipper(ln.Addr().String(), ln.Addr().String(), func() uint64 { return 1 }) + defer s.Stop() + + // Ship triggers connection → immediate close → write error → degraded. + entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('D')} + _ = s.Ship(entry) // first attempt may or may not degrade + _ = s.Ship(entry) // second attempt more likely to hit closed conn + _ = s.Ship(entry) // third attempt for good measure + + // Force degraded if the connection race didn't trigger it. + if !s.IsDegraded() { + // The connection might have worked briefly. Manually degrade for the rest of the test. + s.degraded.Store(true) + } + + // Once degraded, subsequent Ship must not attempt connection. + lsnBefore := s.ShippedLSN() + for i := 0; i < 100; i++ { + _ = s.Ship(entry) + } + lsnAfter := s.ShippedLSN() + if lsnAfter > lsnBefore { + t.Errorf("ShippedLSN advanced from %d to %d after degradation — should not ship when degraded", lsnBefore, lsnAfter) + } + + // Barrier must immediately return ErrReplicaDegraded. + err = s.Barrier(1) + if !errors.Is(err, ErrReplicaDegraded) { + t.Errorf("Barrier after degraded: got %v, want ErrReplicaDegraded", err) + } +} + +func testQAShipperConcurrentShipStop(t *testing.T) { + // Ship and Stop from concurrent goroutines → no deadlock, no panic. + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + go func() { + for { + conn, err := ln.Accept() + if err != nil { + return + } + // Just consume data. + go io.Copy(io.Discard, conn) + } + }() + defer ln.Close() + + s := NewWALShipper(ln.Addr().String(), ln.Addr().String(), func() uint64 { return 1 }) + + var wg sync.WaitGroup + // Concurrent shippers. 
+ wg.Add(4) + for g := 0; g < 4; g++ { + go func() { + defer wg.Done() + entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('C')} + for i := 0; i < 50; i++ { + _ = s.Ship(entry) + } + }() + } + + // Stop mid-flight. + time.Sleep(5 * time.Millisecond) + s.Stop() + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("concurrent Ship + Stop deadlocked for 5s") + } +} + +// --- QA-4A-CP2-3: ReplicaReceiver Adversarial --- + +func testQAReceiverOutOfOrderLSN(t *testing.T) { + // Tests contiguity enforcement: + // 1. Contiguous LSN=1..5 → all applied + // 2. Duplicate LSN=3 → skipped + // 3. Gap LSN=7 (skipping 6) → rejected + // 4. Correct next LSN=6 → accepted + dir := t.TempDir() + cfg := DefaultConfig() + cfg.FlushInterval = 5 * time.Millisecond + + vol, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{ + VolumeSize: 1 * 1024 * 1024, BlockSize: 4096, WALSize: 256 * 1024, + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer vol.Close() + if err := vol.SetEpoch(1); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + + recv, err := NewReplicaReceiver(vol, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatalf("NewReplicaReceiver: %v", err) + } + recv.Serve() + defer recv.Stop() + + conn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatalf("dial data: %v", err) + } + defer conn.Close() + + // Case 1: Contiguous LSN=1..5 → all applied. + for lsn := uint64(1); lsn <= 5; lsn++ { + entry := &WALEntry{LSN: lsn, Epoch: 1, Type: EntryTypeWrite, LBA: lsn - 1, Length: 4096, Data: makeBlock(byte('A' + lsn))} + encoded, _ := entry.Encode() + if err := WriteFrame(conn, MsgWALEntry, encoded); err != nil { + t.Fatalf("send LSN=%d: %v", lsn, err) + } + } + time.Sleep(20 * time.Millisecond) + if recv.ReceivedLSN() != 5 { + t.Fatalf("ReceivedLSN = %d after LSN 1-5, want 5", recv.ReceivedLSN()) + } + + // Case 2: Duplicate LSN=3 → skipped. + entry3 := &WALEntry{LSN: 3, Epoch: 1, Type: EntryTypeWrite, LBA: 2, Length: 4096, Data: makeBlock('Z')} + encoded3, _ := entry3.Encode() + if err := WriteFrame(conn, MsgWALEntry, encoded3); err != nil { + t.Fatalf("send duplicate LSN=3: %v", err) + } + time.Sleep(20 * time.Millisecond) + if recv.ReceivedLSN() != 5 { + t.Errorf("ReceivedLSN = %d after duplicate LSN=3, want 5", recv.ReceivedLSN()) + } + + // Case 3: Gap LSN=7 (skips 6) → rejected by contiguity check. + entry7 := &WALEntry{LSN: 7, Epoch: 1, Type: EntryTypeWrite, LBA: 6, Length: 4096, Data: makeBlock('G')} + encoded7, _ := entry7.Encode() + if err := WriteFrame(conn, MsgWALEntry, encoded7); err != nil { + t.Fatalf("send gap LSN=7: %v", err) + } + time.Sleep(20 * time.Millisecond) + if recv.ReceivedLSN() != 5 { + t.Errorf("ReceivedLSN = %d after gap LSN=7, want 5 (gap must be rejected)", recv.ReceivedLSN()) + } + + // Case 4: Correct next LSN=6 → accepted. + entry6 := &WALEntry{LSN: 6, Epoch: 1, Type: EntryTypeWrite, LBA: 5, Length: 4096, Data: makeBlock('F')} + encoded6, _ := entry6.Encode() + if err := WriteFrame(conn, MsgWALEntry, encoded6); err != nil { + t.Fatalf("send LSN=6: %v", err) + } + time.Sleep(20 * time.Millisecond) + if recv.ReceivedLSN() != 6 { + t.Errorf("ReceivedLSN = %d after correct LSN=6, want 6", recv.ReceivedLSN()) + } +} + +func testQAReceiverConcurrentDataConns(t *testing.T) { + // With contiguous LSN enforcement, entries must arrive in order on a single + // connection. 
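+	// (TCP preserves ordering only within a single connection, which is what
+	// makes the contiguity check enforceable at all.)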
Test that a stream of 40 contiguous entries from one connection + // is fully applied, then a second connection sending a duplicate is rejected. + dir := t.TempDir() + cfg := DefaultConfig() + cfg.FlushInterval = 5 * time.Millisecond + + vol, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{ + VolumeSize: 1 * 1024 * 1024, BlockSize: 4096, WALSize: 512 * 1024, + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer vol.Close() + if err := vol.SetEpoch(1); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + + recv, err := NewReplicaReceiver(vol, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatalf("NewReplicaReceiver: %v", err) + } + recv.Serve() + defer recv.Stop() + + // Send 40 contiguous entries from a single connection. + conn1, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatalf("dial: %v", err) + } + defer conn1.Close() + + for lsn := uint64(1); lsn <= 40; lsn++ { + entry := &WALEntry{LSN: lsn, Epoch: 1, Type: EntryTypeWrite, LBA: lsn, Length: 4096, Data: makeBlock(byte('A' + lsn%26))} + encoded, _ := entry.Encode() + if err := WriteFrame(conn1, MsgWALEntry, encoded); err != nil { + t.Fatalf("send LSN=%d: %v", lsn, err) + } + } + + time.Sleep(100 * time.Millisecond) + + got := recv.ReceivedLSN() + if got != 40 { + t.Errorf("ReceivedLSN = %d after 40 contiguous entries, want 40", got) + } + + // Second connection sending a duplicate LSN=5 → must be skipped. + conn2, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatalf("dial conn2: %v", err) + } + defer conn2.Close() + + dup := &WALEntry{LSN: 5, Epoch: 1, Type: EntryTypeWrite, LBA: 5, Length: 4096, Data: makeBlock('Z')} + encodedDup, _ := dup.Encode() + WriteFrame(conn2, MsgWALEntry, encodedDup) + + time.Sleep(20 * time.Millisecond) + + // receivedLSN must still be 40. + if recv.ReceivedLSN() != 40 { + t.Errorf("ReceivedLSN = %d after duplicate on conn2, want 40", recv.ReceivedLSN()) + } +} + +func testQAReceiverFutureEpochRejected(t *testing.T) { + // R1 fix: replica must reject entries with epoch > local (not just <). + // A future epoch in the WAL stream is a protocol violation — only master + // can bump epochs via SetEpoch. + dir := t.TempDir() + cfg := DefaultConfig() + cfg.FlushInterval = 5 * time.Millisecond + + vol, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{ + VolumeSize: 1 * 1024 * 1024, BlockSize: 4096, WALSize: 256 * 1024, + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer vol.Close() + if err := vol.SetEpoch(5); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + + recv, err := NewReplicaReceiver(vol, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatalf("NewReplicaReceiver: %v", err) + } + recv.Serve() + defer recv.Stop() + + conn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatalf("dial data: %v", err) + } + defer conn.Close() + + // Send entry with stale epoch=3 → rejected. + stale := &WALEntry{LSN: 1, Epoch: 3, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('S')} + encodedStale, _ := stale.Encode() + if err := WriteFrame(conn, MsgWALEntry, encodedStale); err != nil { + t.Fatalf("send stale epoch: %v", err) + } + time.Sleep(20 * time.Millisecond) + if recv.ReceivedLSN() != 0 { + t.Errorf("ReceivedLSN = %d after stale epoch entry, want 0", recv.ReceivedLSN()) + } + + // Send entry with future epoch=10 → also rejected (R1 fix). 
+ future := &WALEntry{LSN: 1, Epoch: 10, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('F')} + encodedFuture, _ := future.Encode() + if err := WriteFrame(conn, MsgWALEntry, encodedFuture); err != nil { + t.Fatalf("send future epoch: %v", err) + } + time.Sleep(20 * time.Millisecond) + if recv.ReceivedLSN() != 0 { + t.Errorf("ReceivedLSN = %d after future epoch entry, want 0 (future epoch must be rejected)", recv.ReceivedLSN()) + } + + // Send entry with correct epoch=5 → accepted. + correct := &WALEntry{LSN: 1, Epoch: 5, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('C')} + encodedCorrect, _ := correct.Encode() + if err := WriteFrame(conn, MsgWALEntry, encodedCorrect); err != nil { + t.Fatalf("send correct epoch: %v", err) + } + time.Sleep(20 * time.Millisecond) + if recv.ReceivedLSN() != 1 { + t.Errorf("ReceivedLSN = %d after correct epoch entry, want 1", recv.ReceivedLSN()) + } +} + +func testQAReceiverBarrierEpochMismatch(t *testing.T) { + // Barrier with wrong epoch → immediate BarrierEpochMismatch (no wait). + dir := t.TempDir() + cfg := DefaultConfig() + cfg.FlushInterval = 100 * time.Millisecond + + vol, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{ + VolumeSize: 1 * 1024 * 1024, BlockSize: 4096, WALSize: 256 * 1024, + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer vol.Close() + if err := vol.SetEpoch(5); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + + recv, err := NewReplicaReceiver(vol, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatalf("NewReplicaReceiver: %v", err) + } + recv.Serve() + defer recv.Stop() + + ctrlConn, err := net.Dial("tcp", recv.CtrlAddr()) + if err != nil { + t.Fatalf("dial ctrl: %v", err) + } + defer ctrlConn.Close() + + // Barrier with stale epoch=3 → immediate EpochMismatch. + req := EncodeBarrierRequest(BarrierRequest{LSN: 1, Epoch: 3}) + start := time.Now() + if err := WriteFrame(ctrlConn, MsgBarrierReq, req); err != nil { + t.Fatalf("send barrier: %v", err) + } + + ctrlConn.SetReadDeadline(time.Now().Add(3 * time.Second)) + _, payload, err := ReadFrame(ctrlConn) + elapsed := time.Since(start) + if err != nil { + t.Fatalf("read barrier response: %v", err) + } + if payload[0] != BarrierEpochMismatch { + t.Errorf("barrier status = %d, want BarrierEpochMismatch(%d)", payload[0], BarrierEpochMismatch) + } + // Must be fast (no waiting), well under 100ms. + if elapsed > 500*time.Millisecond { + t.Errorf("epoch mismatch barrier took %v, expected immediate response", elapsed) + } + + // Barrier with future epoch=99 → also immediate EpochMismatch. + req2 := EncodeBarrierRequest(BarrierRequest{LSN: 1, Epoch: 99}) + if err := WriteFrame(ctrlConn, MsgBarrierReq, req2); err != nil { + t.Fatalf("send barrier future: %v", err) + } + + ctrlConn.SetReadDeadline(time.Now().Add(3 * time.Second)) + _, payload2, err := ReadFrame(ctrlConn) + if err != nil { + t.Fatalf("read barrier response (future): %v", err) + } + if payload2[0] != BarrierEpochMismatch { + t.Errorf("future epoch barrier status = %d, want BarrierEpochMismatch(%d)", payload2[0], BarrierEpochMismatch) + } +} + +func testQAReceiverBarrierBeforeEntries(t *testing.T) { + // Barrier arrives BEFORE data entries → must wait, then succeed when entries arrive. 
+ dir := t.TempDir() + cfg := DefaultConfig() + cfg.FlushInterval = 5 * time.Millisecond + + vol, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{ + VolumeSize: 1 * 1024 * 1024, BlockSize: 4096, WALSize: 256 * 1024, + }, cfg) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + defer vol.Close() + if err := vol.SetEpoch(1); err != nil { + t.Fatalf("SetEpoch: %v", err) + } + + recv, err := NewReplicaReceiver(vol, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatalf("NewReplicaReceiver: %v", err) + } + recv.Serve() + defer recv.Stop() + + // Send barrier for LSN=3 BEFORE any data. + ctrlConn, err := net.Dial("tcp", recv.CtrlAddr()) + if err != nil { + t.Fatalf("dial ctrl: %v", err) + } + defer ctrlConn.Close() + + barrierDone := make(chan BarrierResponse, 1) + go func() { + req := EncodeBarrierRequest(BarrierRequest{LSN: 3, Epoch: 1}) + if err := WriteFrame(ctrlConn, MsgBarrierReq, req); err != nil { + return + } + _, payload, err := ReadFrame(ctrlConn) + if err != nil { + return + } + barrierDone <- BarrierResponse{Status: payload[0]} + }() + + // Barrier should be waiting (not returned yet). + select { + case resp := <-barrierDone: + t.Fatalf("barrier returned immediately with status %d, expected it to wait", resp.Status) + case <-time.After(50 * time.Millisecond): + // Good — barrier is waiting. + } + + // Now send entries LSN=1,2,3. + dataConn, err := net.Dial("tcp", recv.DataAddr()) + if err != nil { + t.Fatalf("dial data: %v", err) + } + defer dataConn.Close() + + for lsn := uint64(1); lsn <= 3; lsn++ { + entry := &WALEntry{LSN: lsn, Epoch: 1, Type: EntryTypeWrite, LBA: lsn, Length: 4096, Data: makeBlock(byte('0' + lsn))} + encoded, _ := entry.Encode() + if err := WriteFrame(dataConn, MsgWALEntry, encoded); err != nil { + t.Fatalf("send LSN=%d: %v", lsn, err) + } + } + + // Barrier should now complete with OK. + select { + case resp := <-barrierDone: + if resp.Status != BarrierOK { + t.Errorf("barrier status = %d, want BarrierOK(0)", resp.Status) + } + case <-time.After(5 * time.Second): + t.Fatal("barrier hung for 5s after entries were sent") + } +} + +func testQAReceiverBarrierTimeoutNoEntries(t *testing.T) { + // Barrier for LSN=999 with no entries → must timeout (not hang forever). + // Uses short configurable barrierTimeout for fast test. 
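+	// (barrierTimeout is a ReplicaReceiver field defaulting to 5s; override it
+	// before Serve, since handleBarrier reads the field without a lock.)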
+	dir := t.TempDir()
+	cfg := DefaultConfig()
+	cfg.FlushInterval = 100 * time.Millisecond
+
+	vol, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{
+		VolumeSize: 1 * 1024 * 1024, BlockSize: 4096, WALSize: 256 * 1024,
+	}, cfg)
+	if err != nil {
+		t.Fatalf("CreateBlockVol: %v", err)
+	}
+	defer vol.Close()
+	if err := vol.SetEpoch(1); err != nil {
+		t.Fatalf("SetEpoch: %v", err)
+	}
+
+	recv, err := NewReplicaReceiver(vol, "127.0.0.1:0", "127.0.0.1:0")
+	if err != nil {
+		t.Fatalf("NewReplicaReceiver: %v", err)
+	}
+	recv.barrierTimeout = 100 * time.Millisecond // fast timeout for test
+	recv.Serve()
+	defer recv.Stop()
+
+	ctrlConn, err := net.Dial("tcp", recv.CtrlAddr())
+	if err != nil {
+		t.Fatalf("dial ctrl: %v", err)
+	}
+	defer ctrlConn.Close()
+
+	req := EncodeBarrierRequest(BarrierRequest{LSN: 999, Epoch: 1})
+	start := time.Now()
+	if err := WriteFrame(ctrlConn, MsgBarrierReq, req); err != nil {
+		t.Fatalf("send barrier: %v", err)
+	}
+
+	ctrlConn.SetReadDeadline(time.Now().Add(3 * time.Second))
+	_, payload, err := ReadFrame(ctrlConn)
+	elapsed := time.Since(start)
+	if err != nil {
+		t.Fatalf("read barrier response: %v", err)
+	}
+	if payload[0] != BarrierTimeout {
+		t.Errorf("barrier status = %d, want BarrierTimeout(%d)", payload[0], BarrierTimeout)
+	}
+	// Verify timeout was fast (~100ms, not 5s).
+	if elapsed > 1*time.Second {
+		t.Errorf("barrier timeout took %v, expected ~100ms (configurable timeout)", elapsed)
+	}
+}
+
+func testQAReceiverStopUnblocksBarrier(t *testing.T) {
+	// Barrier waiting for entries → Stop called → barrier must return (not hang).
+	dir := t.TempDir()
+	cfg := DefaultConfig()
+	cfg.FlushInterval = 100 * time.Millisecond
+
+	vol, err := CreateBlockVol(filepath.Join(dir, "replica.blockvol"), CreateOptions{
+		VolumeSize: 1 * 1024 * 1024, BlockSize: 4096, WALSize: 256 * 1024,
+	}, cfg)
+	if err != nil {
+		t.Fatalf("CreateBlockVol: %v", err)
+	}
+	defer vol.Close()
+	if err := vol.SetEpoch(1); err != nil {
+		t.Fatalf("SetEpoch: %v", err)
+	}
+
+	recv, err := NewReplicaReceiver(vol, "127.0.0.1:0", "127.0.0.1:0")
+	if err != nil {
+		t.Fatalf("NewReplicaReceiver: %v", err)
+	}
+	recv.Serve()
+	defer recv.Stop() // idempotent; also covers early t.Fatalf exits below
+
+	ctrlConn, err := net.Dial("tcp", recv.CtrlAddr())
+	if err != nil {
+		t.Fatalf("dial ctrl: %v", err)
+	}
+	defer ctrlConn.Close()
+
+	// Send barrier for the maximum LSN (will wait). ^uint64(0) is
+	// math.MaxUint64 without adding a math import to this file.
+	req := EncodeBarrierRequest(BarrierRequest{LSN: ^uint64(0), Epoch: 1})
+	if err := WriteFrame(ctrlConn, MsgBarrierReq, req); err != nil {
+		t.Fatalf("send barrier: %v", err)
+	}
+
+	// Let barrier start waiting.
+	time.Sleep(50 * time.Millisecond)
+
+	// Stop should unblock the barrier.
+	recv.Stop()
+
+	ctrlConn.SetReadDeadline(time.Now().Add(3 * time.Second))
+	_, payload, err := ReadFrame(ctrlConn)
+	if err != nil {
+		// Connection closed by Stop → also acceptable.
+		t.Logf("barrier read after Stop: %v (connection closed — acceptable)", err)
+		return
+	}
+	if payload[0] != BarrierTimeout {
+		t.Errorf("barrier after Stop: status = %d, want BarrierTimeout(%d)", payload[0], BarrierTimeout)
+	}
+}
+
+// --- QA-4A-CP2-4: DistributedSync Adversarial ---
+
+func testQADSyncLocalFailReturnsError(t *testing.T) {
+	// Local fsync fails → must return error regardless of remote result.
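+	// Expected precedence in MakeDistributedSync — a behavioral sketch
+	// inferred from the three tests below, not its exact implementation:
+	//
+	//	local fails       → return the local error (data is not durable)
+	//	only remote fails → degrade the shipper, return nil
+	//	both fail         → the local error wins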
+	localErr := errors.New("disk on fire")
+
+	vol := &BlockVol{}
+	vol.nextLSN.Store(10)
+
+	syncFn := MakeDistributedSync(
+		func() error { return localErr },
+		nil, // no shipper → local only
+		vol,
+	)
+
+	err := syncFn()
+	if !errors.Is(err, localErr) {
+		t.Errorf("dsync with local fail: got %v, want %v", err, localErr)
+	}
+}
+
+func testQADSyncRemoteFailDegrades(t *testing.T) {
+	// Remote barrier fails → local succeeded → must return nil + degrade shipper.
+	primary, replica := createReplicaPair(t)
+	defer primary.Close()
+	defer replica.Close()
+
+	// Write some data so there's something to sync.
+	if err := primary.WriteLBA(0, makeBlock('D')); err != nil {
+		t.Fatalf("WriteLBA: %v", err)
+	}
+
+	// Stop the replica receiver to make barrier fail.
+	replica.replRecv.Stop()
+
+	// Force shipper to reconnect (old ctrl conn may still be open).
+	primary.shipper.ctrlMu.Lock()
+	if primary.shipper.ctrlConn != nil {
+		primary.shipper.ctrlConn.Close()
+		primary.shipper.ctrlConn = nil
+	}
+	primary.shipper.ctrlMu.Unlock()
+
+	// SyncCache triggers distributed sync → barrier fails → degrade.
+	err := primary.SyncCache()
+	// Should succeed (local fsync succeeded, remote degraded — returned nil).
+	if err != nil {
+		t.Errorf("SyncCache after replica stop: got %v, want nil (local succeeded, remote should degrade silently)", err)
+	}
+
+	// Shipper should be degraded.
+	if !primary.shipper.IsDegraded() {
+		t.Error("shipper should be degraded after replica receiver stopped")
+	}
+}
+
+func testQADSyncBothFail(t *testing.T) {
+	// Both local and remote fail → must return local error.
+	localErr := errors.New("local disk fail")
+
+	vol := &BlockVol{}
+	vol.nextLSN.Store(10)
+
+	shipper := NewWALShipper("127.0.0.1:1", "127.0.0.1:1", func() uint64 { return 1 })
+
+	syncFn := MakeDistributedSync(
+		func() error { return localErr },
+		shipper,
+		vol,
+	)
+
+	err := syncFn()
+	if !errors.Is(err, localErr) {
+		t.Errorf("dsync with both fail: got %v, want %v", err, localErr)
+	}
+}
+
+func testQADSyncParallelExecution(t *testing.T) {
+	// Verify local and remote execute in parallel, not sequentially.
+	//
+	// Strategy: create a real replica pair, let the WAL entry ship ahead of
+	// time so the barrier is near-instant, and swap in a local fsync that
+	// sleeps ~80ms. Parallel execution completes in ≈ max(80ms, barrier) ≈
+	// 80ms; a sequential implementation adds the full barrier round trip on
+	// top of the 80ms. The 150ms assertion below leaves headroom for CI.
+	primary, replica := createReplicaPair(t)
+	defer primary.Close()
+	defer replica.Close()
+
+	// Write some data to primary so there's a WAL entry to sync.
+	if err := primary.WriteLBA(0, makeBlock('P')); err != nil {
+		t.Fatalf("WriteLBA: %v", err)
+	}
+
+	// Give replica time to receive the shipped entry.
+	time.Sleep(30 * time.Millisecond)
+
+	// Replace the distributed sync with one that has a slow local fsync.
+	// The barrier should be near-instant since replica already has the data.
+	var localStart, localEnd atomic.Int64
+	slowLocalSync := func() error {
+		localStart.Store(time.Now().UnixNano())
+		time.Sleep(80 * time.Millisecond)
+		localEnd.Store(time.Now().UnixNano())
+		return nil
+	}
+
+	syncFn := MakeDistributedSync(slowLocalSync, primary.shipper, primary)
+
+	start := time.Now()
+	err := syncFn()
+	elapsed := time.Since(start)
+
+	if err != nil {
+		t.Fatalf("dsync: %v", err)
+	}
+
+	// Parallel: total ≈ max(local=80ms, barrier=fast) ≈ 80ms.
+	// Sequential: total ≈ local + barrier ≈ 80ms plus the barrier round trip.
+	// Key check: elapsed < 150ms — near the parallel 80ms, with CI headroom.
+	if elapsed > 150*time.Millisecond {
+		t.Errorf("dsync took %v, suggesting sequential execution (expected <150ms for parallel)", elapsed)
+	}
+
+	// Verify local fsync actually ran (not skipped).
+	if localStart.Load() == 0 {
+		t.Error("local fsync was never called — distributed sync may have fallen back to local-only")
+	}
+}
+
+// --- QA-4A-CP2-5: End-to-end Adversarial ---
+
+func testQAE2EReplicaDataMatchesPrimary(t *testing.T) {
+	// Write N blocks on primary → verify replica has identical data via ReadLBA.
+	primary, replica := createReplicaPair(t)
+	defer primary.Close()
+	defer replica.Close()
+
+	const numBlocks = 20
+	for i := 0; i < numBlocks; i++ {
+		if err := primary.WriteLBA(uint64(i), makeBlock(byte('A'+i%26))); err != nil {
+			t.Fatalf("WriteLBA(%d): %v", i, err)
+		}
+	}
+
+	// SyncCache ensures WAL entries are durable on both nodes.
+	if err := primary.SyncCache(); err != nil {
+		t.Fatalf("SyncCache: %v", err)
+	}
+
+	// Give replica time to process entries.
+	time.Sleep(50 * time.Millisecond)
+
+	// Verify replica has the data.
+	for i := 0; i < numBlocks; i++ {
+		data, err := replica.ReadLBA(uint64(i), 4096)
+		if err != nil {
+			t.Fatalf("replica ReadLBA(%d): %v", i, err)
+		}
+		expected := byte('A' + i%26)
+		if data[0] != expected {
+			t.Errorf("replica LBA %d: data[0] = %c, want %c", i, data[0], expected)
+		}
+	}
+
+	// Verify receivedLSN on replica matches what primary shipped.
+	primaryLSN := primary.shipper.ShippedLSN()
+	replicaLSN := replica.replRecv.ReceivedLSN()
+	if replicaLSN < primaryLSN {
+		t.Errorf("replica receivedLSN = %d < primary shippedLSN = %d", replicaLSN, primaryLSN)
+	}
+}
+
+func testQAE2EClosePrimaryDuringShip(t *testing.T) {
+	// Close primary while writes + shipping in progress → no hang, no panic.
+	primary, replica := createReplicaPair(t)
+	defer replica.Close()
+
+	var wg sync.WaitGroup
+
+	// Writer goroutine.
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 100; i++ {
+			err := primary.WriteLBA(uint64(i%256), makeBlock(byte('W')))
+			if err != nil {
+				return // closed or WAL full — expected
+			}
+		}
+	}()
+
+	// Close after a few writes.
+	time.Sleep(10 * time.Millisecond)
+	primary.Close()
+
+	done := make(chan struct{})
+	go func() {
+		wg.Wait()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+	case <-time.After(10 * time.Second):
+		t.Fatal("Close primary during ship hung for 10s")
+	}
+}
+
+// --- helper ---
+
+func bigEndianPut32(b []byte, v uint32) {
+	b[0] = byte(v >> 24)
+	b[1] = byte(v >> 16)
+	b[2] = byte(v >> 8)
+	b[3] = byte(v)
+}
diff --git a/weed/storage/blockvol/repl_proto.go b/weed/storage/blockvol/repl_proto.go
new file mode 100644
index 000000000..8f15024a9
--- /dev/null
+++ b/weed/storage/blockvol/repl_proto.go
@@ -0,0 +1,107 @@
+package blockvol
+
+import (
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"io"
+)
+
+// Data channel message types.
+const (
+	MsgWALEntry byte = 0x01
+)
+
+// Control channel message types.
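+//
+// A barrier round trip on the control channel, as the shipper performs it
+// (sketch mirroring WALShipper.Barrier below; Vid is left zero in this phase):
+//
+//	req := EncodeBarrierRequest(BarrierRequest{LSN: lsnMax, Epoch: epoch})
+//	_ = WriteFrame(conn, MsgBarrierReq, req) // primary → replica
+//	_, payload, _ := ReadFrame(conn)         // replica → primary
+//	status := payload[0]                     // BarrierOK, BarrierEpochMismatch, ...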
+const ( + MsgBarrierReq byte = 0x01 + MsgBarrierResp byte = 0x02 +) + +// Barrier response status codes. +const ( + BarrierOK byte = 0x00 + BarrierEpochMismatch byte = 0x01 + BarrierTimeout byte = 0x02 + BarrierFsyncFailed byte = 0x03 +) + +// BarrierRequest is sent by the primary to the replica on the control channel. +type BarrierRequest struct { + Vid uint32 + LSN uint64 + Epoch uint64 +} + +// BarrierResponse is the replica's reply to a barrier request. +type BarrierResponse struct { + Status byte +} + +// Frame header: [1B type][4B payload_len]. +const frameHeaderSize = 5 + +// maxFramePayload caps the payload size to prevent OOM on corrupt data. +const maxFramePayload = 256 * 1024 * 1024 // 256MB + +var ( + ErrFrameTooLarge = errors.New("repl: frame payload exceeds maximum size") + ErrFrameEmpty = errors.New("repl: empty frame payload") +) + +// WriteFrame writes a length-prefixed frame: [1B type][4B len][payload]. +func WriteFrame(w io.Writer, msgType byte, payload []byte) error { + hdr := make([]byte, frameHeaderSize) + hdr[0] = msgType + binary.BigEndian.PutUint32(hdr[1:5], uint32(len(payload))) + if _, err := w.Write(hdr); err != nil { + return fmt.Errorf("repl: write frame header: %w", err) + } + if len(payload) > 0 { + if _, err := w.Write(payload); err != nil { + return fmt.Errorf("repl: write frame payload: %w", err) + } + } + return nil +} + +// ReadFrame reads a length-prefixed frame and returns message type and payload. +func ReadFrame(r io.Reader) (msgType byte, payload []byte, err error) { + hdr := make([]byte, frameHeaderSize) + if _, err = io.ReadFull(r, hdr); err != nil { + return 0, nil, fmt.Errorf("repl: read frame header: %w", err) + } + msgType = hdr[0] + payloadLen := binary.BigEndian.Uint32(hdr[1:5]) + if payloadLen > maxFramePayload { + return 0, nil, ErrFrameTooLarge + } + payload = make([]byte, payloadLen) + if payloadLen > 0 { + if _, err = io.ReadFull(r, payload); err != nil { + return 0, nil, fmt.Errorf("repl: read frame payload: %w", err) + } + } + return msgType, payload, nil +} + +// EncodeBarrierRequest serializes a BarrierRequest (4+8+8 = 20 bytes). +func EncodeBarrierRequest(req BarrierRequest) []byte { + buf := make([]byte, 20) + binary.BigEndian.PutUint32(buf[0:4], req.Vid) + binary.BigEndian.PutUint64(buf[4:12], req.LSN) + binary.BigEndian.PutUint64(buf[12:20], req.Epoch) + return buf +} + +// DecodeBarrierRequest deserializes a BarrierRequest. +func DecodeBarrierRequest(buf []byte) (BarrierRequest, error) { + if len(buf) < 20 { + return BarrierRequest{}, fmt.Errorf("repl: barrier request too short: %d bytes", len(buf)) + } + return BarrierRequest{ + Vid: binary.BigEndian.Uint32(buf[0:4]), + LSN: binary.BigEndian.Uint64(buf[4:12]), + Epoch: binary.BigEndian.Uint64(buf[12:20]), + }, nil +} diff --git a/weed/storage/blockvol/replica_apply.go b/weed/storage/blockvol/replica_apply.go new file mode 100644 index 000000000..81ff53fb7 --- /dev/null +++ b/weed/storage/blockvol/replica_apply.go @@ -0,0 +1,287 @@ +package blockvol + +import ( + "errors" + "fmt" + "log" + "net" + "sync" + "time" +) + +var ( + ErrStaleEpoch = errors.New("blockvol: stale epoch") + ErrDuplicateLSN = errors.New("blockvol: duplicate or out-of-order LSN") +) + +// ReplicaReceiver listens for WAL entries from a primary and applies them +// to the local BlockVol. It runs two listeners: one for the data channel +// (WAL entries) and one for the control channel (barrier requests). 
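+//
+// Typical wiring on a replica node (sketch; addresses are illustrative):
+//
+//	recv, err := NewReplicaReceiver(vol, "0.0.0.0:9301", "0.0.0.0:9302")
+//	if err != nil {
+//		return err
+//	}
+//	recv.Serve()      // non-blocking: starts both accept loops
+//	defer recv.Stop() // closes listeners and unblocks barrier waiters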
+type ReplicaReceiver struct { + vol *BlockVol + barrierTimeout time.Duration + + mu sync.Mutex + receivedLSN uint64 + cond *sync.Cond + + connMu sync.Mutex // protects activeConns + activeConns map[net.Conn]struct{} + + dataListener net.Listener + ctrlListener net.Listener + stopCh chan struct{} + stopped bool + wg sync.WaitGroup +} + +const defaultBarrierTimeout = 5 * time.Second + +// NewReplicaReceiver creates and starts listening on the data and control ports. +func NewReplicaReceiver(vol *BlockVol, dataAddr, ctrlAddr string) (*ReplicaReceiver, error) { + dataLn, err := net.Listen("tcp", dataAddr) + if err != nil { + return nil, fmt.Errorf("replica: listen data %s: %w", dataAddr, err) + } + ctrlLn, err := net.Listen("tcp", ctrlAddr) + if err != nil { + dataLn.Close() + return nil, fmt.Errorf("replica: listen ctrl %s: %w", ctrlAddr, err) + } + + r := &ReplicaReceiver{ + vol: vol, + barrierTimeout: defaultBarrierTimeout, + dataListener: dataLn, + ctrlListener: ctrlLn, + stopCh: make(chan struct{}), + activeConns: make(map[net.Conn]struct{}), + } + r.cond = sync.NewCond(&r.mu) + return r, nil +} + +// Serve starts accept loops for both listeners. Call Stop() to shut down. +func (r *ReplicaReceiver) Serve() { + r.wg.Add(2) + go r.acceptDataLoop() + go r.acceptCtrlLoop() +} + +// Stop shuts down both listeners, closes active connections, and waits for goroutines. +func (r *ReplicaReceiver) Stop() { + r.mu.Lock() + if r.stopped { + r.mu.Unlock() + return + } + r.stopped = true + r.mu.Unlock() + + close(r.stopCh) + r.dataListener.Close() + r.ctrlListener.Close() + + // Close all active connections to unblock ReadFrame calls. + r.connMu.Lock() + for conn := range r.activeConns { + conn.Close() + } + r.connMu.Unlock() + + // Wake any barrier waiters so they can exit (must hold mu for cond). + r.mu.Lock() + r.cond.Broadcast() + r.mu.Unlock() + r.wg.Wait() +} + +func (r *ReplicaReceiver) trackConn(conn net.Conn) { + r.connMu.Lock() + r.activeConns[conn] = struct{}{} + r.connMu.Unlock() +} + +func (r *ReplicaReceiver) untrackConn(conn net.Conn) { + r.connMu.Lock() + delete(r.activeConns, conn) + r.connMu.Unlock() +} + +func (r *ReplicaReceiver) acceptDataLoop() { + defer r.wg.Done() + for { + conn, err := r.dataListener.Accept() + if err != nil { + select { + case <-r.stopCh: + return + default: + log.Printf("replica: data accept error: %v", err) + return + } + } + r.trackConn(conn) + r.wg.Add(1) + go func() { + defer r.wg.Done() + defer r.untrackConn(conn) + r.handleDataConn(conn) + }() + } +} + +func (r *ReplicaReceiver) acceptCtrlLoop() { + defer r.wg.Done() + for { + conn, err := r.ctrlListener.Accept() + if err != nil { + select { + case <-r.stopCh: + return + default: + log.Printf("replica: ctrl accept error: %v", err) + return + } + } + r.trackConn(conn) + r.wg.Add(1) + go func() { + defer r.wg.Done() + defer r.untrackConn(conn) + r.handleControlConn(conn) + }() + } +} + +// handleDataConn reads WAL entry frames and applies them to the local volume. 
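+// Each accepted frame flows through applyEntry, which enforces two invariants
+// before touching the WAL (sketch):
+//
+//	entry.Epoch == vol.epoch        // exact match; the stream cannot bump epochs
+//	entry.LSN   == receivedLSN + 1  // strictly contiguous; old LSNs skipped, gaps rejected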
+func (r *ReplicaReceiver) handleDataConn(conn net.Conn) { + defer conn.Close() + for { + select { + case <-r.stopCh: + return + default: + } + + msgType, payload, err := ReadFrame(conn) + if err != nil { + select { + case <-r.stopCh: + default: + log.Printf("replica: data read error: %v", err) + } + return + } + + if msgType != MsgWALEntry { + log.Printf("replica: unexpected data message type 0x%02x", msgType) + continue + } + + if err := r.applyEntry(payload); err != nil { + log.Printf("replica: apply entry error: %v", err) + } + } +} + +// applyEntry decodes and applies a single WAL entry to the local volume. +// The entire apply (LSN check → WAL append → dirty map → receivedLSN update) +// is serialized under mu to prevent TOCTOU races between concurrent entries. +func (r *ReplicaReceiver) applyEntry(payload []byte) error { + entry, err := DecodeWALEntry(payload) + if err != nil { + return fmt.Errorf("decode WAL entry: %w", err) + } + + // Validate epoch: replicas must NOT accept epoch bumps from WAL stream. + // Only the master can change epochs (via SetEpoch in CP3). + localEpoch := r.vol.epoch.Load() + if entry.Epoch != localEpoch { + return fmt.Errorf("%w: entry epoch %d != local %d", ErrStaleEpoch, entry.Epoch, localEpoch) + } + + r.mu.Lock() + defer r.mu.Unlock() + + // Enforce contiguous LSN: only accept the next expected entry. + // This prevents gaps that would let a barrier pass incorrectly. + if entry.LSN <= r.receivedLSN { + log.Printf("replica: skipping duplicate/old LSN %d (received %d)", entry.LSN, r.receivedLSN) + return nil + } + if entry.LSN != r.receivedLSN+1 { + return fmt.Errorf("%w: expected LSN %d, got %d (gap)", ErrDuplicateLSN, r.receivedLSN+1, entry.LSN) + } + + // Append to local WAL (with retry on WAL full). + walOff, err := r.replicaAppendWithRetry(&entry) + if err != nil { + return fmt.Errorf("WAL append: %w", err) + } + + // Update dirty map. + switch entry.Type { + case EntryTypeWrite, EntryTypeTrim: + blocks := entry.Length / r.vol.super.BlockSize + for i := uint32(0); i < blocks; i++ { + r.vol.dirtyMap.Put(entry.LBA+uint64(i), walOff, entry.LSN, r.vol.super.BlockSize) + } + } + + // Update receivedLSN and signal barrier waiters. + r.receivedLSN = entry.LSN + r.cond.Broadcast() + + return nil +} + +// replicaAppendWithRetry appends a WAL entry, retrying on WAL-full by +// triggering the flusher. Caller must hold r.mu. +func (r *ReplicaReceiver) replicaAppendWithRetry(entry *WALEntry) (uint64, error) { + walOff, err := r.vol.wal.Append(entry) + if !errors.Is(err, ErrWALFull) { + return walOff, err + } + + deadline := time.After(r.vol.config.WALFullTimeout) + for errors.Is(err, ErrWALFull) { + select { + case <-r.stopCh: + return 0, fmt.Errorf("replica: stopped during WAL retry") + default: + } + if r.vol.flusher != nil { + r.vol.flusher.NotifyUrgent() + } + // Release mu briefly so barrier waiters can proceed and + // the flusher can make progress (it may need dirty map lock). + r.mu.Unlock() + select { + case <-deadline: + r.mu.Lock() + return 0, fmt.Errorf("replica: WAL full timeout: %w", ErrWALFull) + case <-time.After(1 * time.Millisecond): + } + r.mu.Lock() + walOff, err = r.vol.wal.Append(entry) + } + return walOff, err +} + +// ReceivedLSN returns the highest LSN received and written to the local WAL. +func (r *ReplicaReceiver) ReceivedLSN() uint64 { + r.mu.Lock() + defer r.mu.Unlock() + return r.receivedLSN +} + +// DataAddr returns the data listener's address (useful for tests with :0 ports). 
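+// With ":0" the OS assigns free ports, so a test can wire a primary to a
+// replica without fixed port numbers (sketch):
+//
+//	recv, _ := NewReplicaReceiver(replicaVol, "127.0.0.1:0", "127.0.0.1:0")
+//	recv.Serve()
+//	primaryVol.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())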
+func (r *ReplicaReceiver) DataAddr() string {
+	return r.dataListener.Addr().String()
+}
+
+// CtrlAddr returns the control listener's address.
+func (r *ReplicaReceiver) CtrlAddr() string {
+	return r.ctrlListener.Addr().String()
+}
diff --git a/weed/storage/blockvol/replica_barrier.go b/weed/storage/blockvol/replica_barrier.go
new file mode 100644
index 000000000..76e3b2830
--- /dev/null
+++ b/weed/storage/blockvol/replica_barrier.go
@@ -0,0 +1,101 @@
+package blockvol
+
+import (
+	"log"
+	"net"
+	"time"
+)
+
+// handleControlConn reads barrier requests from the control channel and
+// responds with barrier status after ensuring durability.
+func (r *ReplicaReceiver) handleControlConn(conn net.Conn) {
+	defer conn.Close()
+	for {
+		select {
+		case <-r.stopCh:
+			return
+		default:
+		}
+
+		msgType, payload, err := ReadFrame(conn)
+		if err != nil {
+			select {
+			case <-r.stopCh:
+			default:
+				log.Printf("replica: ctrl read error: %v", err)
+			}
+			return
+		}
+
+		if msgType != MsgBarrierReq {
+			log.Printf("replica: unexpected ctrl message type 0x%02x", msgType)
+			continue
+		}
+
+		req, err := DecodeBarrierRequest(payload)
+		if err != nil {
+			log.Printf("replica: decode barrier request: %v", err)
+			continue
+		}
+
+		resp := r.handleBarrier(req)
+
+		respPayload := []byte{resp.Status}
+		if err := WriteFrame(conn, MsgBarrierResp, respPayload); err != nil {
+			log.Printf("replica: write barrier response: %v", err)
+			return
+		}
+	}
+}
+
+// handleBarrier waits until all WAL entries up to req.LSN have been received,
+// then fsyncs the WAL to ensure durability.
+func (r *ReplicaReceiver) handleBarrier(req BarrierRequest) BarrierResponse {
+	// Fail fast on epoch mismatch.
+	localEpoch := r.vol.epoch.Load()
+	if req.Epoch != localEpoch {
+		return BarrierResponse{Status: BarrierEpochMismatch}
+	}
+
+	// Use a timer goroutine to wake us on deadline. The done channel lets the
+	// goroutine exit when the barrier completes; without it, every satisfied
+	// barrier would leave a goroutine parked on stopCh until Stop().
+	timer := time.NewTimer(r.barrierTimeout)
+	defer timer.Stop()
+	timedOut := make(chan struct{})
+	done := make(chan struct{})
+	defer close(done)
+	go func() {
+		select {
+		case <-timer.C:
+			close(timedOut)
+			// Broadcast to wake any cond.Wait blocked in the loop below.
+			r.mu.Lock()
+			r.cond.Broadcast()
+			r.mu.Unlock()
+		case <-done:
+		case <-r.stopCh:
+		}
+	}()
+
+	r.mu.Lock()
+	for r.receivedLSN < req.LSN {
+		// Check if timed out or stopped.
+		select {
+		case <-timedOut:
+			r.mu.Unlock()
+			return BarrierResponse{Status: BarrierTimeout}
+		case <-r.stopCh:
+			r.mu.Unlock()
+			return BarrierResponse{Status: BarrierTimeout}
+		default:
+		}
+
+		// Block on cond.Wait — woken by applyEntry or timeout goroutine.
+		r.cond.Wait()
+	}
+	r.mu.Unlock()
+
+	// fsync WAL AFTER confirming all entries received.
+	if err := r.vol.fd.Sync(); err != nil {
+		log.Printf("replica: barrier fsync error: %v", err)
+		return BarrierResponse{Status: BarrierFsyncFailed}
+	}
+
+	return BarrierResponse{Status: BarrierOK}
+}
diff --git a/weed/storage/blockvol/wal_shipper.go b/weed/storage/blockvol/wal_shipper.go
new file mode 100644
index 000000000..5ca9348d2
--- /dev/null
+++ b/weed/storage/blockvol/wal_shipper.go
@@ -0,0 +1,197 @@
+package blockvol
+
+import (
+	"errors"
+	"fmt"
+	"log"
+	"net"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+var (
+	ErrReplicaDegraded = errors.New("blockvol: replica degraded")
+	ErrShipperStopped  = errors.New("blockvol: shipper stopped")
+)
+
+const barrierTimeout = 5 * time.Second
+
+// WALShipper streams WAL entries from the primary to a replica over TCP.
+// Fire-and-forget: no per-entry ACK. Barriers provide durability confirmation.
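+//
+// Degraded mode is sticky: the first transport error flips degraded to true,
+// after which Ship becomes a no-op and Barrier fails fast with
+// ErrReplicaDegraded. Both methods apply the same gate (sketch):
+//
+//	if s.stopped.Load() || s.degraded.Load() {
+//		return // nothing is shipped; writes proceed locally
+//	}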
+type WALShipper struct { + dataAddr string + controlAddr string + epochFn func() uint64 + + mu sync.Mutex // protects dataConn + dataConn net.Conn + + ctrlMu sync.Mutex // protects ctrlConn + ctrlConn net.Conn + + shippedLSN atomic.Uint64 + degraded atomic.Bool + stopped atomic.Bool +} + +// NewWALShipper creates a WAL shipper. Connections are established lazily on +// first Ship/Barrier call. epochFn returns the current epoch for validation. +func NewWALShipper(dataAddr, controlAddr string, epochFn func() uint64) *WALShipper { + return &WALShipper{ + dataAddr: dataAddr, + controlAddr: controlAddr, + epochFn: epochFn, + } +} + +// Ship sends a WAL entry to the replica over the data channel. +// On write error, the shipper enters degraded mode permanently. +func (s *WALShipper) Ship(entry *WALEntry) error { + if s.stopped.Load() || s.degraded.Load() { + return nil + } + + // Validate epoch: drop stale entries. + if entry.Epoch != s.epochFn() { + log.Printf("wal_shipper: dropping entry LSN=%d with stale epoch %d (current %d)", + entry.LSN, entry.Epoch, s.epochFn()) + return nil + } + + encoded, err := entry.Encode() + if err != nil { + return fmt.Errorf("wal_shipper: encode entry: %w", err) + } + + s.mu.Lock() + defer s.mu.Unlock() + + if err := s.ensureDataConn(); err != nil { + s.markDegraded() + return nil + } + + if err := WriteFrame(s.dataConn, MsgWALEntry, encoded); err != nil { + s.markDegraded() + return nil + } + + s.shippedLSN.Store(entry.LSN) + return nil +} + +// Barrier sends a barrier request on the control channel and waits for the +// replica to confirm durability up to lsnMax. Returns ErrReplicaDegraded if +// the shipper is in degraded mode. +func (s *WALShipper) Barrier(lsnMax uint64) error { + if s.stopped.Load() { + return ErrShipperStopped + } + if s.degraded.Load() { + return ErrReplicaDegraded + } + + req := EncodeBarrierRequest(BarrierRequest{ + LSN: lsnMax, + Epoch: s.epochFn(), + }) + + s.ctrlMu.Lock() + defer s.ctrlMu.Unlock() + + if err := s.ensureCtrlConn(); err != nil { + s.markDegraded() + return ErrReplicaDegraded + } + + s.ctrlConn.SetDeadline(time.Now().Add(barrierTimeout)) + + if err := WriteFrame(s.ctrlConn, MsgBarrierReq, req); err != nil { + s.markDegraded() + return ErrReplicaDegraded + } + + msgType, payload, err := ReadFrame(s.ctrlConn) + if err != nil { + s.markDegraded() + return ErrReplicaDegraded + } + + if msgType != MsgBarrierResp || len(payload) < 1 { + s.markDegraded() + return ErrReplicaDegraded + } + + switch payload[0] { + case BarrierOK: + return nil + case BarrierEpochMismatch: + return fmt.Errorf("wal_shipper: barrier epoch mismatch") + case BarrierTimeout: + return fmt.Errorf("wal_shipper: barrier timeout on replica") + case BarrierFsyncFailed: + return fmt.Errorf("wal_shipper: barrier fsync failed on replica") + default: + return fmt.Errorf("wal_shipper: unknown barrier status %d", payload[0]) + } +} + +// ShippedLSN returns the highest LSN successfully sent to the replica. +func (s *WALShipper) ShippedLSN() uint64 { + return s.shippedLSN.Load() +} + +// IsDegraded returns true if the replica is unreachable. +func (s *WALShipper) IsDegraded() bool { + return s.degraded.Load() +} + +// Stop shuts down the shipper and closes connections. 
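+// Stop is idempotent: stopped.Swap(true) lets only the first caller proceed,
+// so BlockVol.Close and deferred test cleanup can both call it safely.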
+func (s *WALShipper) Stop() { + if s.stopped.Swap(true) { + return + } + s.mu.Lock() + if s.dataConn != nil { + s.dataConn.Close() + s.dataConn = nil + } + s.mu.Unlock() + + s.ctrlMu.Lock() + if s.ctrlConn != nil { + s.ctrlConn.Close() + s.ctrlConn = nil + } + s.ctrlMu.Unlock() +} + +func (s *WALShipper) ensureDataConn() error { + if s.dataConn != nil { + return nil + } + conn, err := net.DialTimeout("tcp", s.dataAddr, 3*time.Second) + if err != nil { + return err + } + s.dataConn = conn + return nil +} + +func (s *WALShipper) ensureCtrlConn() error { + if s.ctrlConn != nil { + return nil + } + conn, err := net.DialTimeout("tcp", s.controlAddr, 3*time.Second) + if err != nil { + return err + } + s.ctrlConn = conn + return nil +} + +func (s *WALShipper) markDegraded() { + s.degraded.Store(true) + log.Printf("wal_shipper: replica degraded (data=%s, ctrl=%s)", s.dataAddr, s.controlAddr) +}