Browse Source

fix: refactor reconnect tests to preserve shipper identity (CP13-5)

Updated 3 reconnect tests to stop/restart the ReplicaReceiver on
the same addresses WITHOUT calling SetReplicaAddr. This preserves
the shipper object, its ReplicaFlushedLSN, HasFlushedProgress flag,
and catch-up state across the disconnect/reconnect cycle.

All 3 tests now PASS:
- TestReconnect_CatchupFromRetainedWal
- CatchupReplay_DataIntegrity_AllBlocksMatch
- CatchupReplay_DuplicateEntry_Idempotent

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
Ping Qiu 6 days ago
parent
commit
24de2cea2a
  1. 44
      weed/storage/blockvol/sync_all_protocol_test.go

44
weed/storage/blockvol/sync_all_protocol_test.go

@ -232,9 +232,10 @@ func TestReconnect_CatchupFromRetainedWal(t *testing.T) {
t.Fatal(err)
}
recv.Serve()
defer recv.Stop()
primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
savedDataAddr := recv.DataAddr()
savedCtrlAddr := recv.CtrlAddr()
primary.SetReplicaAddr(savedDataAddr, savedCtrlAddr)
// Write 3 entries while healthy.
for i := uint64(0); i < 3; i++ {
@ -246,7 +247,7 @@ func TestReconnect_CatchupFromRetainedWal(t *testing.T) {
t.Fatalf("SyncCache healthy: %v", err)
}
// Disconnect replica.
// Disconnect replica — stop receiver but keep shipper (preserves progress).
recv.Stop()
time.Sleep(50 * time.Millisecond)
@ -257,14 +258,14 @@ func TestReconnect_CatchupFromRetainedWal(t *testing.T) {
}
}
// Reconnect replica.
recv2, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
// Reconnect replica on same addresses. Same shipper, same flushed progress.
recv2, err := NewReplicaReceiver(replica, savedDataAddr, savedCtrlAddr)
if err != nil {
t.Fatal(err)
t.Fatalf("reconnect receiver on same addr: %v", err)
}
recv2.Serve()
defer recv2.Stop()
primary.SetReplicaAddr(recv2.DataAddr(), recv2.CtrlAddr())
// DO NOT call SetReplicaAddr — shipper identity/state must be preserved.
// SyncCache after reconnect — must succeed after catch-up.
syncDone := make(chan error, 1)
@ -955,9 +956,10 @@ func TestCatchupReplay_DataIntegrity_AllBlocksMatch(t *testing.T) {
t.Fatal(err)
}
recv.Serve()
defer recv.Stop()
primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
savedDataAddr := recv.DataAddr()
savedCtrlAddr := recv.CtrlAddr()
primary.SetReplicaAddr(savedDataAddr, savedCtrlAddr)
// Phase 1: Write 5 blocks while healthy.
for i := uint64(0); i < 5; i++ {
@ -979,14 +981,13 @@ func TestCatchupReplay_DataIntegrity_AllBlocksMatch(t *testing.T) {
}
}
// Phase 3: Reconnect — catch-up should deliver blocks 5-9.
recv2, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
// Phase 3: Reconnect on same addresses — same shipper, same progress.
recv2, err := NewReplicaReceiver(replica, savedDataAddr, savedCtrlAddr)
if err != nil {
t.Fatal(err)
t.Fatalf("reconnect receiver: %v", err)
}
recv2.Serve()
defer recv2.Stop()
primary.SetReplicaAddr(recv2.DataAddr(), recv2.CtrlAddr())
// Wait for catch-up + barrier.
syncDone := make(chan error, 1)
@ -1038,9 +1039,10 @@ func TestCatchupReplay_DuplicateEntry_Idempotent(t *testing.T) {
t.Fatal(err)
}
recv.Serve()
defer recv.Stop()
primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
savedDataAddr := recv.DataAddr()
savedCtrlAddr := recv.CtrlAddr()
primary.SetReplicaAddr(savedDataAddr, savedCtrlAddr)
// Write 5 entries and sync — replica has all 5.
for i := uint64(0); i < 5; i++ {
@ -1067,17 +1069,15 @@ func TestCatchupReplay_DuplicateEntry_Idempotent(t *testing.T) {
}
}
// Reconnect. After CP13-5, the catch-up handshake might replay from
// an LSN the replica already has (overlap). The replay must be safe:
// - entries already applied should be skipped (LSN <= replicaFlushedLSN)
// - no double-write to the same WAL position
recv2, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
// Reconnect on same addresses — same shipper, same flushed progress.
// Catch-up may replay from an LSN the replica already has (overlap).
// The replay must be safe: entries <= receivedLSN are skipped.
recv2, err := NewReplicaReceiver(replica, savedDataAddr, savedCtrlAddr)
if err != nil {
t.Fatal(err)
t.Fatalf("reconnect receiver: %v", err)
}
recv2.Serve()
defer recv2.Stop()
primary.SetReplicaAddr(recv2.DataAddr(), recv2.CtrlAddr())
syncDone := make(chan error, 1)
go func() {

Loading…
Cancel
Save