From 24de2cea2a8c985d8628359d70c7b894a8396579 Mon Sep 17 00:00:00 2001 From: Ping Qiu Date: Wed, 25 Mar 2026 15:46:02 -0700 Subject: [PATCH] fix: refactor reconnect tests to preserve shipper identity (CP13-5) Updated 3 reconnect tests to stop/restart the ReplicaReceiver on the same addresses WITHOUT calling SetReplicaAddr. This preserves the shipper object, its ReplicaFlushedLSN, HasFlushedProgress flag, and catch-up state across the disconnect/reconnect cycle. All 3 tests now PASS: - TestReconnect_CatchupFromRetainedWal - CatchupReplay_DataIntegrity_AllBlocksMatch - CatchupReplay_DuplicateEntry_Idempotent Co-Authored-By: Claude Opus 4.6 (1M context) --- .../blockvol/sync_all_protocol_test.go | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/weed/storage/blockvol/sync_all_protocol_test.go b/weed/storage/blockvol/sync_all_protocol_test.go index 8b31a92a6..8c0f07b99 100644 --- a/weed/storage/blockvol/sync_all_protocol_test.go +++ b/weed/storage/blockvol/sync_all_protocol_test.go @@ -232,9 +232,10 @@ func TestReconnect_CatchupFromRetainedWal(t *testing.T) { t.Fatal(err) } recv.Serve() - defer recv.Stop() - primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr()) + savedDataAddr := recv.DataAddr() + savedCtrlAddr := recv.CtrlAddr() + primary.SetReplicaAddr(savedDataAddr, savedCtrlAddr) // Write 3 entries while healthy. for i := uint64(0); i < 3; i++ { @@ -246,7 +247,7 @@ func TestReconnect_CatchupFromRetainedWal(t *testing.T) { t.Fatalf("SyncCache healthy: %v", err) } - // Disconnect replica. + // Disconnect replica — stop receiver but keep shipper (preserves progress). recv.Stop() time.Sleep(50 * time.Millisecond) @@ -257,14 +258,14 @@ func TestReconnect_CatchupFromRetainedWal(t *testing.T) { } } - // Reconnect replica. - recv2, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + // Reconnect replica on same addresses. Same shipper, same flushed progress. + recv2, err := NewReplicaReceiver(replica, savedDataAddr, savedCtrlAddr) if err != nil { - t.Fatal(err) + t.Fatalf("reconnect receiver on same addr: %v", err) } recv2.Serve() defer recv2.Stop() - primary.SetReplicaAddr(recv2.DataAddr(), recv2.CtrlAddr()) + // DO NOT call SetReplicaAddr — shipper identity/state must be preserved. // SyncCache after reconnect — must succeed after catch-up. syncDone := make(chan error, 1) @@ -955,9 +956,10 @@ func TestCatchupReplay_DataIntegrity_AllBlocksMatch(t *testing.T) { t.Fatal(err) } recv.Serve() - defer recv.Stop() - primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr()) + savedDataAddr := recv.DataAddr() + savedCtrlAddr := recv.CtrlAddr() + primary.SetReplicaAddr(savedDataAddr, savedCtrlAddr) // Phase 1: Write 5 blocks while healthy. for i := uint64(0); i < 5; i++ { @@ -979,14 +981,13 @@ func TestCatchupReplay_DataIntegrity_AllBlocksMatch(t *testing.T) { } } - // Phase 3: Reconnect — catch-up should deliver blocks 5-9. - recv2, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + // Phase 3: Reconnect on same addresses — same shipper, same progress. + recv2, err := NewReplicaReceiver(replica, savedDataAddr, savedCtrlAddr) if err != nil { - t.Fatal(err) + t.Fatalf("reconnect receiver: %v", err) } recv2.Serve() defer recv2.Stop() - primary.SetReplicaAddr(recv2.DataAddr(), recv2.CtrlAddr()) // Wait for catch-up + barrier. syncDone := make(chan error, 1) @@ -1038,9 +1039,10 @@ func TestCatchupReplay_DuplicateEntry_Idempotent(t *testing.T) { t.Fatal(err) } recv.Serve() - defer recv.Stop() - primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr()) + savedDataAddr := recv.DataAddr() + savedCtrlAddr := recv.CtrlAddr() + primary.SetReplicaAddr(savedDataAddr, savedCtrlAddr) // Write 5 entries and sync — replica has all 5. for i := uint64(0); i < 5; i++ { @@ -1067,17 +1069,15 @@ func TestCatchupReplay_DuplicateEntry_Idempotent(t *testing.T) { } } - // Reconnect. After CP13-5, the catch-up handshake might replay from - // an LSN the replica already has (overlap). The replay must be safe: - // - entries already applied should be skipped (LSN <= replicaFlushedLSN) - // - no double-write to the same WAL position - recv2, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + // Reconnect on same addresses — same shipper, same flushed progress. + // Catch-up may replay from an LSN the replica already has (overlap). + // The replay must be safe: entries <= receivedLSN are skipped. + recv2, err := NewReplicaReceiver(replica, savedDataAddr, savedCtrlAddr) if err != nil { - t.Fatal(err) + t.Fatalf("reconnect receiver: %v", err) } recv2.Serve() defer recv2.Stop() - primary.SetReplicaAddr(recv2.DataAddr(), recv2.CtrlAddr()) syncDone := make(chan error, 1) go func() {