diff --git a/weed/storage/blockvol/blockvol.go b/weed/storage/blockvol/blockvol.go index ef042bc57..c4fade920 100644 --- a/weed/storage/blockvol/blockvol.go +++ b/weed/storage/blockvol/blockvol.go @@ -951,6 +951,15 @@ func (v *BlockVol) ScanWALEntries(fromLSN uint64, fn func(*WALEntry) error) erro return v.wal.ScanFrom(v.fd, v.super.WALOffset, v.super.WALCheckpointLSN, fromLSN, fn) } +// ForceFlush triggers a synchronous flush cycle. This advances the checkpoint +// and WAL tail. For test use — in production, the flusher runs automatically. +func (v *BlockVol) ForceFlush() error { + if v.flusher == nil { + return fmt.Errorf("flusher not initialized") + } + return v.flusher.FlushOnce() +} + // ReplicaReceiverAddrInfo holds canonical addresses from the replica receiver. type ReplicaReceiverAddrInfo struct { DataAddr string diff --git a/weed/storage/blockvol/testrunner/scenarios/internal/robust-gap-failover.yaml b/weed/storage/blockvol/testrunner/scenarios/internal/robust-gap-failover.yaml new file mode 100644 index 000000000..511cd6410 --- /dev/null +++ b/weed/storage/blockvol/testrunner/scenarios/internal/robust-gap-failover.yaml @@ -0,0 +1,291 @@ +name: robust-gap-failover +timeout: 10m + +# Robust dimension: Does data written during replica outage survive failover? +# +# V1 (pre-Phase 13): shipper stays degraded after replica restart. +# Gap data lives only on primary. Failover loses it. +# +# V1.5 (post-Phase 13): background reconnect catches up the gap. +# Gap data is replicated. Failover preserves it. +# +# Flow: +# 1. Write data A (replicated) +# 2. Kill replica +# 3. Write data B (gap — only on primary) +# 4. Restart replica, wait for master "healthy" +# 5. Wait extra time for catch-up to complete (if it will) +# 6. Kill primary (force failover) +# 7. Promote replica to new primary +# 8. Read data A (should always be there) +# 9. Read data B (V1.5: present, V1: possibly missing) + +env: + master_url: "http://192.168.1.184:9433" + volume_name: robust-gap + vol_size: "1073741824" + +topology: + nodes: + m01: + host: 192.168.1.181 + user: testdev + key: "/opt/work/testdev_key" + m02: + host: 192.168.1.184 + user: testdev + key: "/opt/work/testdev_key" + +phases: + - name: cluster-start + actions: + - action: exec + node: m02 + cmd: "fuser -k 9433/tcp 18480/tcp 2>/dev/null; sleep 1; rm -rf /tmp/sw-gap-master /tmp/sw-gap-vs1 && mkdir -p /tmp/sw-gap-master /tmp/sw-gap-vs1/blocks" + root: "true" + ignore_error: true + - action: exec + node: m01 + cmd: "fuser -k 18480/tcp 2>/dev/null; sleep 1; rm -rf /tmp/sw-gap-vs2 && mkdir -p /tmp/sw-gap-vs2/blocks" + root: "true" + ignore_error: true + + - action: start_weed_master + node: m02 + port: "9433" + dir: /tmp/sw-gap-master + save_as: master_pid + + - action: sleep + duration: 3s + + # Primary on m02, replica on m01. + - action: start_weed_volume + node: m02 + port: "18480" + master: "localhost:9433" + dir: /tmp/sw-gap-vs1 + extra_args: "-block.dir=/tmp/sw-gap-vs1/blocks -block.listen=:3295 -ip=192.168.1.184" + save_as: vs1_pid + + - action: start_weed_volume + node: m01 + port: "18480" + master: "192.168.1.184:9433" + dir: /tmp/sw-gap-vs2 + extra_args: "-block.dir=/tmp/sw-gap-vs2/blocks -block.listen=:3295 -ip=192.168.1.181" + save_as: vs2_pid + + - action: sleep + duration: 3s + + - action: wait_cluster_ready + node: m02 + master_url: "{{ master_url }}" + + - action: wait_block_servers + count: "2" + + - name: create-volume + actions: + - action: create_block_volume + name: "{{ volume_name }}" + size_bytes: "{{ vol_size }}" + replica_factor: "2" + durability_mode: "best_effort" + + - action: wait_volume_healthy + name: "{{ volume_name }}" + timeout: 60s + + - action: validate_replication + volume_name: "{{ volume_name }}" + expected_rf: "2" + expected_durability: "best_effort" + require_not_degraded: "true" + require_cross_machine: "true" + + - name: write-data-A + actions: + - action: lookup_block_volume + name: "{{ volume_name }}" + save_as: vol + + # Connect to primary on m02 via iSCSI from m01. + - action: iscsi_login_direct + node: m01 + host: "{{ vol_iscsi_host }}" + port: "{{ vol_iscsi_port }}" + iqn: "{{ vol_iqn }}" + save_as: device + + # Write data A at offset 0 (5MB). Both primary and replica get this. + - action: exec + node: m01 + cmd: "dd if=/dev/urandom of={{ device }} bs=1M count=5 oflag=direct 2>/dev/null && sync && md5sum {{ device }} | head -c 32" + root: "true" + save_as: md5_full + + - action: print + msg: "Data A written (full device md5): {{ md5_full }}" + + # Brief pause for replication to complete. + - action: sleep + duration: 3s + + - name: kill-replica + actions: + - action: print + msg: "=== Killing replica (m01 VS) ===" + + - action: exec + node: m01 + cmd: "kill -9 {{ vs2_pid }}" + root: "true" + ignore_error: true + + - action: sleep + duration: 2s + + - name: write-data-B-during-outage + actions: + - action: print + msg: "=== Writing data B during replica outage (gap data) ===" + + # Write data B at offset 5MB (3MB). Only primary gets this. + # Use timeout to avoid hanging if barrier blocks. + - action: exec + node: m01 + cmd: "timeout 15 dd if=/dev/urandom of={{ device }} bs=1M count=3 seek=5 oflag=direct 2>/dev/null; timeout 5 sync; dd if={{ device }} bs=1M count=8 iflag=direct 2>/dev/null | md5sum | head -c 32" + root: "true" + save_as: md5_after_gap + timeout: 30s + ignore_error: true + + - action: print + msg: "Data A+B md5 (including gap): {{ md5_after_gap }}" + + - name: restart-replica-and-wait + actions: + - action: print + msg: "=== Restarting replica, waiting for catch-up ===" + + - action: start_weed_volume + node: m01 + port: "18480" + master: "192.168.1.184:9433" + dir: /tmp/sw-gap-vs2 + extra_args: "-block.dir=/tmp/sw-gap-vs2/blocks -block.listen=:3295 -ip=192.168.1.181" + save_as: vs2_pid_new + + # Wait for master to report healthy. + - action: wait_volume_healthy + name: "{{ volume_name }}" + timeout: 60s + + # Extra wait: give V1.5 background reconnect time to catch up. + # V1's shipper won't reconnect regardless of wait time. + - action: print + msg: "=== Waiting 15s for shipper catch-up ===" + + - action: sleep + duration: 15s + + - name: failover-to-replica + actions: + - action: print + msg: "=== Killing primary (m02 VS) to force failover ===" + + # Disconnect iSCSI before killing primary. + - action: iscsi_cleanup + node: m01 + ignore_error: true + + - action: exec + node: m02 + cmd: "kill -9 {{ vs1_pid }}" + root: "true" + ignore_error: true + + - action: sleep + duration: 5s + + # Promote replica (m01) to primary. + - action: block_promote + name: "{{ volume_name }}" + reason: "gap-failover-test" + ignore_error: true + + # Wait for new primary. + - action: wait_block_primary + name: "{{ volume_name }}" + not: "192.168.1.184:18480" + timeout: 30s + ignore_error: true + + - action: sleep + duration: 3s + + - name: verify-on-new-primary + actions: + - action: print + msg: "=== Verifying data on new primary (former replica) ===" + + # Re-lookup volume to get new primary's iSCSI addr. + - action: lookup_block_volume + name: "{{ volume_name }}" + save_as: vol2 + + - action: iscsi_login_direct + node: m01 + host: "{{ vol2_iscsi_host }}" + port: "{{ vol2_iscsi_port }}" + iqn: "{{ vol2_iqn }}" + save_as: device2 + + # Read 8MB (data A + data B) and compute md5. + - action: exec + node: m01 + cmd: "dd if={{ device2 }} bs=1M count=8 iflag=direct 2>/dev/null | md5sum | head -c 32" + root: "true" + save_as: md5_after_failover + + - action: print + msg: "After failover (8MB md5): {{ md5_after_failover }}" + - action: print + msg: "Expected (before failover): {{ md5_after_gap }}" + + # THE KEY CHECK: does gap data survive failover? + - action: assert_equal + actual: "{{ md5_after_failover }}" + expected: "{{ md5_after_gap }}" + + - name: results + actions: + - action: print + msg: "=== Gap Failover Test Results ===" + - action: print + msg: "Data A+B md5 (primary): {{ md5_after_gap }}" + - action: print + msg: "Data A+B md5 (after failover): {{ md5_after_failover }}" + - action: print + msg: "Gap data survived failover: YES" + + - name: cleanup + always: true + actions: + - action: iscsi_cleanup + node: m01 + ignore_error: true + - action: stop_weed + node: m01 + pid: "{{ vs2_pid_new }}" + ignore_error: true + - action: stop_weed + node: m02 + pid: "{{ vs1_pid }}" + ignore_error: true + - action: stop_weed + node: m02 + pid: "{{ master_pid }}" + ignore_error: true diff --git a/weed/storage/blockvol/testrunner/scenarios/internal/robust-partition-catchup.yaml b/weed/storage/blockvol/testrunner/scenarios/internal/robust-partition-catchup.yaml new file mode 100644 index 000000000..d7ae1579a --- /dev/null +++ b/weed/storage/blockvol/testrunner/scenarios/internal/robust-partition-catchup.yaml @@ -0,0 +1,271 @@ +name: robust-partition-catchup +timeout: 5m + +# Robust dimension: Does data written during network partition +# reach the replica after partition heals? +# +# V1.5 (post-Phase 13): background reconnect catches up WAL gap. +# V1 (pre-Phase 13): shipper stays degraded, gap data never shipped. +# +# Uses best_effort so writes succeed during partition. +# After partition heals, waits for catch-up, then verifies via +# failover that the replica has the gap data. + +env: + master_url: "http://192.168.1.184:9433" + volume_name: robust-part + vol_size: "1073741824" + +topology: + nodes: + m01: + host: 192.168.1.181 + user: testdev + key: "/opt/work/testdev_key" + m02: + host: 192.168.1.184 + user: testdev + key: "/opt/work/testdev_key" + +phases: + - name: cluster-start + actions: + - action: exec + node: m02 + cmd: "fuser -k 9433/tcp 18480/tcp 2>/dev/null; sleep 1; rm -rf /tmp/sw-part-master /tmp/sw-part-vs1 && mkdir -p /tmp/sw-part-master /tmp/sw-part-vs1/blocks" + root: "true" + ignore_error: true + - action: exec + node: m01 + cmd: "fuser -k 18480/tcp 2>/dev/null; sleep 1; rm -rf /tmp/sw-part-vs2 && mkdir -p /tmp/sw-part-vs2/blocks" + root: "true" + ignore_error: true + + - action: start_weed_master + node: m02 + port: "9433" + dir: /tmp/sw-part-master + save_as: master_pid + + - action: sleep + duration: 3s + + - action: start_weed_volume + node: m02 + port: "18480" + master: "localhost:9433" + dir: /tmp/sw-part-vs1 + extra_args: "-block.dir=/tmp/sw-part-vs1/blocks -block.listen=:3295 -ip=192.168.1.184" + save_as: vs1_pid + + - action: start_weed_volume + node: m01 + port: "18480" + master: "192.168.1.184:9433" + dir: /tmp/sw-part-vs2 + extra_args: "-block.dir=/tmp/sw-part-vs2/blocks -block.listen=:3295 -ip=192.168.1.181" + save_as: vs2_pid + + - action: sleep + duration: 3s + + - action: wait_cluster_ready + node: m02 + master_url: "{{ master_url }}" + + - action: wait_block_servers + count: "2" + + - name: create-volume + actions: + - action: create_block_volume + name: "{{ volume_name }}" + size_bytes: "{{ vol_size }}" + replica_factor: "2" + durability_mode: "best_effort" + + - action: wait_volume_healthy + name: "{{ volume_name }}" + timeout: 60s + + - name: write-and-connect + actions: + - action: lookup_block_volume + name: "{{ volume_name }}" + save_as: vol + + - action: iscsi_login_direct + node: m01 + host: "{{ vol_iscsi_host }}" + port: "{{ vol_iscsi_port }}" + iqn: "{{ vol_iqn }}" + save_as: device + + # Write data A (replicated to both). + - action: exec + node: m01 + cmd: "dd if=/dev/urandom of={{ device }} bs=1M count=5 oflag=direct 2>/dev/null && sync" + root: "true" + + - action: sleep + duration: 3s + + # Snapshot: md5 of first 5MB. + - action: exec + node: m01 + cmd: "dd if={{ device }} bs=1M count=5 iflag=direct 2>/dev/null | md5sum | head -c 32" + root: "true" + save_as: md5_data_a + + - action: print + msg: "Data A (5MB): {{ md5_data_a }}" + + - name: partition-and-write + actions: + - action: print + msg: "=== Injecting network partition (m02 → m01 repl ports) ===" + + # Block replication traffic from primary (m02) to replica (m01). + # Block both data port and volume port to prevent WAL shipping. + - action: inject_partition + node: m02 + target_ip: "192.168.1.181" + ports: "3295,18480" + + - action: sleep + duration: 3s + + - action: print + msg: "=== Writing data B during partition (gap data) ===" + + # Write data B at offset 5MB. Primary accepts (best_effort), + # replica doesn't receive (partitioned). + - action: exec + node: m01 + cmd: "timeout 15 dd if=/dev/urandom of={{ device }} bs=1M count=3 seek=5 oflag=direct 2>/dev/null && timeout 5 sync; echo done" + root: "true" + timeout: 25s + + # Snapshot: md5 of all 8MB (A + B). + - action: exec + node: m01 + cmd: "dd if={{ device }} bs=1M count=8 iflag=direct 2>/dev/null | md5sum | head -c 32" + root: "true" + save_as: md5_data_ab + + - action: print + msg: "Data A+B (8MB) on primary: {{ md5_data_ab }}" + + - name: heal-and-wait + actions: + - action: print + msg: "=== Healing partition, waiting for catch-up ===" + + - action: clear_fault + node: m02 + type: partition + + # Wait for V1.5 background reconnect (5s interval) + catch-up. + # V1 shipper stays degraded — this wait won't help it. + - action: sleep + duration: 20s + + - action: print + msg: "=== 20s wait complete. Checking replication state ===" + + - action: validate_replication + volume_name: "{{ volume_name }}" + require_not_degraded: "false" + ignore_error: true + + - name: failover-and-verify + actions: + - action: print + msg: "=== Disconnecting iSCSI, killing primary, promoting replica ===" + + - action: iscsi_cleanup + node: m01 + ignore_error: true + + - action: exec + node: m02 + cmd: "kill -9 {{ vs1_pid }}" + root: "true" + ignore_error: true + + - action: sleep + duration: 5s + + # The replica (m01) should auto-promote or we force it. + - action: block_promote + name: "{{ volume_name }}" + reason: "partition-catchup-test" + ignore_error: true + + - action: sleep + duration: 5s + + # Re-lookup and connect to new primary. + - action: lookup_block_volume + name: "{{ volume_name }}" + save_as: vol2 + ignore_error: true + + - action: iscsi_login_direct + node: m01 + host: "{{ vol2_iscsi_host }}" + port: "{{ vol2_iscsi_port }}" + iqn: "{{ vol2_iqn }}" + save_as: device2 + ignore_error: true + + # Read 8MB from new primary (former replica). + - action: exec + node: m01 + cmd: "dd if={{ device2 }} bs=1M count=8 iflag=direct 2>/dev/null | md5sum | head -c 32" + root: "true" + save_as: md5_after_failover + ignore_error: true + + - action: print + msg: "Data on former replica (8MB): {{ md5_after_failover }}" + - action: print + msg: "Expected (primary had): {{ md5_data_ab }}" + + - name: results + actions: + - action: print + msg: "=== Partition Catch-up Test ===" + - action: print + msg: "Data A (pre-partition): {{ md5_data_a }}" + - action: print + msg: "Data A+B (post-partition): {{ md5_data_ab }}" + - action: print + msg: "After failover: {{ md5_after_failover }}" + + # If md5_after_failover == md5_data_ab: gap data survived (V1.5 caught up) + # If md5_after_failover == md5_data_a: gap data lost (V1 never shipped it) + # If md5_after_failover != either: something else went wrong + + - name: cleanup + always: true + actions: + - action: clear_fault + node: m02 + type: partition + ignore_error: true + - action: iscsi_cleanup + node: m01 + ignore_error: true + - action: stop_weed + node: m01 + pid: "{{ vs2_pid }}" + ignore_error: true + - action: stop_weed + node: m02 + pid: "{{ vs1_pid }}" + ignore_error: true + - action: stop_weed + node: m02 + pid: "{{ master_pid }}" + ignore_error: true diff --git a/weed/storage/blockvol/testrunner/scenarios/internal/robust-reconnect-catchup.yaml b/weed/storage/blockvol/testrunner/scenarios/internal/robust-reconnect-catchup.yaml new file mode 100644 index 000000000..59312bb73 --- /dev/null +++ b/weed/storage/blockvol/testrunner/scenarios/internal/robust-reconnect-catchup.yaml @@ -0,0 +1,250 @@ +name: robust-reconnect-catchup +timeout: 10m + +# Robust dimension: Does the system self-heal after brief replica outage? +# +# V1 (pre-Phase 13): shipper goes degraded permanently, no background +# reconnect. Recovery requires master-driven full rebuild. +# +# V1.5 (post-Phase 13): background reconnect goroutine detects replica +# is back within 5s, WAL catch-up replays the gap. +# +# This scenario writes data DURING the outage to create a WAL gap, +# then measures whether the system self-heals and how. + +env: + master_url: "http://192.168.1.184:9433" + volume_name: robust-reconn + vol_size: "1073741824" + +topology: + nodes: + m01: + host: 192.168.1.181 + user: testdev + key: "/opt/work/testdev_key" + m02: + host: 192.168.1.184 + user: testdev + key: "/opt/work/testdev_key" + +phases: + - name: cluster-start + actions: + - action: exec + node: m02 + cmd: "fuser -k 9433/tcp 18480/tcp 2>/dev/null; sleep 1; rm -rf /tmp/sw-reconn-master /tmp/sw-reconn-vs1 && mkdir -p /tmp/sw-reconn-master /tmp/sw-reconn-vs1/blocks" + root: "true" + ignore_error: true + - action: exec + node: m01 + cmd: "fuser -k 18480/tcp 2>/dev/null; sleep 1; rm -rf /tmp/sw-reconn-vs2 && mkdir -p /tmp/sw-reconn-vs2/blocks" + root: "true" + ignore_error: true + + - action: start_weed_master + node: m02 + port: "9433" + dir: /tmp/sw-reconn-master + save_as: master_pid + + - action: sleep + duration: 3s + + - action: start_weed_volume + node: m02 + port: "18480" + master: "localhost:9433" + dir: /tmp/sw-reconn-vs1 + extra_args: "-block.dir=/tmp/sw-reconn-vs1/blocks -block.listen=:3295 -ip=192.168.1.184" + save_as: vs1_pid + + - action: start_weed_volume + node: m01 + port: "18480" + master: "192.168.1.184:9433" + dir: /tmp/sw-reconn-vs2 + extra_args: "-block.dir=/tmp/sw-reconn-vs2/blocks -block.listen=:3295 -ip=192.168.1.181" + save_as: vs2_pid + + - action: sleep + duration: 3s + + - action: wait_cluster_ready + node: m02 + master_url: "{{ master_url }}" + + - action: wait_block_servers + count: "2" + + - name: create-volume + actions: + - action: create_block_volume + name: "{{ volume_name }}" + size_bytes: "{{ vol_size }}" + replica_factor: "2" + durability_mode: "sync_all" + + - action: wait_volume_healthy + name: "{{ volume_name }}" + timeout: 60s + + - action: validate_replication + volume_name: "{{ volume_name }}" + expected_rf: "2" + expected_durability: "sync_all" + require_not_degraded: "true" + require_cross_machine: "true" + + - name: connect-and-write + actions: + - action: lookup_block_volume + name: "{{ volume_name }}" + save_as: vol + + - action: iscsi_login_direct + node: m01 + host: "{{ vol_iscsi_host }}" + port: "{{ vol_iscsi_port }}" + iqn: "{{ vol_iqn }}" + save_as: device + + # Write initial data to establish replication. + - action: fio_json + node: m01 + device: "{{ device }}" + rw: randwrite + bs: 4k + iodepth: "16" + runtime: "10" + time_based: "true" + name: initial-write + save_as: fio_initial + + - action: fio_parse + json_var: fio_initial + metric: iops + save_as: iops_initial + + - action: print + msg: "Initial write (healthy): {{ iops_initial }} IOPS" + + - name: fault-and-write + actions: + # Kill replica (SIGKILL — simulates crash). + - action: exec + node: m01 + cmd: "kill -9 {{ vs2_pid }}" + root: "true" + ignore_error: true + + - action: print + msg: "=== Replica killed. Writing during outage to create WAL gap ===" + + # Write during outage — creates WAL gap on primary. + # sync_all: these writes may be slower or fail depending on degraded behavior. + - action: fio_json + node: m01 + device: "{{ device }}" + rw: randwrite + bs: 4k + iodepth: "16" + runtime: "10" + time_based: "true" + name: write-during-outage + save_as: fio_degraded + ignore_error: true + + - action: fio_parse + json_var: fio_degraded + metric: iops + save_as: iops_degraded + ignore_error: true + + - action: print + msg: "Write during outage (degraded): {{ iops_degraded }} IOPS" + + # Restart replica — now the question is: does the system self-heal? + - action: start_weed_volume + node: m01 + port: "18480" + master: "192.168.1.184:9433" + dir: /tmp/sw-reconn-vs2 + extra_args: "-block.dir=/tmp/sw-reconn-vs2/blocks -block.listen=:3295 -ip=192.168.1.181" + save_as: vs2_pid_new + + - action: print + msg: "=== Replica restarted. Measuring self-heal ===" + + # Measure recovery — does the system catch up or stay degraded? + - action: measure_recovery + name: "{{ volume_name }}" + timeout: 90s + poll_interval: 2s + fault_type: reconnect + save_as: rp + + - name: post-recovery-write + actions: + # Write after recovery to verify system is fully functional. + - action: fio_json + node: m01 + device: "{{ device }}" + rw: randwrite + bs: 4k + iodepth: "16" + runtime: "10" + time_based: "true" + name: post-recovery-write + save_as: fio_post + + - action: fio_parse + json_var: fio_post + metric: iops + save_as: iops_post + + - action: print + msg: "Post-recovery write: {{ iops_post }} IOPS" + + - name: results + actions: + - action: print + msg: "=== Robust: Reconnect/Catch-up Test ===" + - action: print + msg: "Initial (healthy): {{ iops_initial }} IOPS" + - action: print + msg: "During outage: {{ iops_degraded }} IOPS" + - action: print + msg: "Post-recovery: {{ iops_post }} IOPS" + - action: print + msg: "Recovery duration: {{ rp_duration_ms }} ms" + - action: print + msg: "Recovery path: {{ rp_path }}" + - action: print + msg: "Transitions: {{ rp_transitions }}" + - action: print + msg: "Degraded window: {{ rp_degraded_ms }} ms" + + - action: collect_results + title: "Robust: Reconnect/Catch-up" + volume_name: "{{ volume_name }}" + recovery_profile: rp + + - name: cleanup + always: true + actions: + - action: iscsi_cleanup + node: m01 + ignore_error: true + - action: stop_weed + node: m01 + pid: "{{ vs2_pid_new }}" + ignore_error: true + - action: stop_weed + node: m02 + pid: "{{ vs1_pid }}" + ignore_error: true + - action: stop_weed + node: m02 + pid: "{{ master_pid }}" + ignore_error: true diff --git a/weed/storage/blockvol/testrunner/scenarios/internal/robust-shipper-reconnect.yaml b/weed/storage/blockvol/testrunner/scenarios/internal/robust-shipper-reconnect.yaml new file mode 100644 index 000000000..3719ec4cf --- /dev/null +++ b/weed/storage/blockvol/testrunner/scenarios/internal/robust-shipper-reconnect.yaml @@ -0,0 +1,197 @@ +name: robust-shipper-reconnect +timeout: 5m + +# Robust dimension: Does the shipper reconnect after replica crash+restart? +# +# V1.5: background reconnect → replica_degraded clears within ~10s +# V1: shipper stays degraded permanently after crash +# +# No writes during outage needed. Just checks the degraded flag. + +env: + master_url: "http://192.168.1.184:9433" + volume_name: robust-ship + vol_size: "1073741824" + +topology: + nodes: + m01: + host: 192.168.1.181 + user: testdev + key: "/opt/work/testdev_key" + m02: + host: 192.168.1.184 + user: testdev + key: "/opt/work/testdev_key" + +phases: + - name: cluster-start + actions: + - action: exec + node: m02 + cmd: "fuser -k 9433/tcp 18480/tcp 2>/dev/null; sleep 1; rm -rf /tmp/sw-ship-master /tmp/sw-ship-vs1 && mkdir -p /tmp/sw-ship-master /tmp/sw-ship-vs1/blocks" + root: "true" + ignore_error: true + - action: exec + node: m01 + cmd: "fuser -k 18480/tcp 2>/dev/null; sleep 1; rm -rf /tmp/sw-ship-vs2 && mkdir -p /tmp/sw-ship-vs2/blocks" + root: "true" + ignore_error: true + + - action: start_weed_master + node: m02 + port: "9433" + dir: /tmp/sw-ship-master + save_as: master_pid + + - action: sleep + duration: 3s + + - action: start_weed_volume + node: m02 + port: "18480" + master: "localhost:9433" + dir: /tmp/sw-ship-vs1 + extra_args: "-block.dir=/tmp/sw-ship-vs1/blocks -block.listen=:3295 -ip=192.168.1.184" + save_as: vs1_pid + + - action: start_weed_volume + node: m01 + port: "18480" + master: "192.168.1.184:9433" + dir: /tmp/sw-ship-vs2 + extra_args: "-block.dir=/tmp/sw-ship-vs2/blocks -block.listen=:3295 -ip=192.168.1.181" + save_as: vs2_pid + + - action: sleep + duration: 3s + + - action: wait_cluster_ready + node: m02 + master_url: "{{ master_url }}" + + - action: wait_block_servers + count: "2" + + - name: create-and-verify + actions: + - action: create_block_volume + name: "{{ volume_name }}" + size_bytes: "{{ vol_size }}" + replica_factor: "2" + durability_mode: "best_effort" + + - action: wait_volume_healthy + name: "{{ volume_name }}" + timeout: 60s + + - action: print + msg: "=== Volume healthy, replica connected ===" + + - name: crash-replica + actions: + - action: print + msg: "=== SIGKILL replica (m01 VS) ===" + + - action: exec + node: m01 + cmd: "kill -9 {{ vs2_pid }}" + root: "true" + ignore_error: true + + # Wait for primary to detect broken connection. + - action: sleep + duration: 5s + + # Check: primary should report degraded now. + - action: lookup_block_volume + name: "{{ volume_name }}" + save_as: vol_degraded + ignore_error: true + + - action: print + msg: "After crash (before restart): degraded={{ vol_degraded_degraded }}" + + - name: restart-and-check + actions: + - action: print + msg: "=== Restarting replica ===" + + - action: start_weed_volume + node: m01 + port: "18480" + master: "192.168.1.184:9433" + dir: /tmp/sw-ship-vs2 + extra_args: "-block.dir=/tmp/sw-ship-vs2/blocks -block.listen=:3295 -ip=192.168.1.181" + save_as: vs2_pid_new + + # Check at 5s — heartbeat re-registers but shipper may still be degraded. + - action: sleep + duration: 5s + + - action: assert_block_field + name: "{{ volume_name }}" + field: replica_degraded + expected: "false" + save_as: check_5s + ignore_error: true + + - action: print + msg: "5s after restart: degraded check = {{ check_5s }}" + + # Check at 15s — V1.5 background reconnect (5s interval) should have caught up. + - action: sleep + duration: 10s + + - action: assert_block_field + name: "{{ volume_name }}" + field: replica_degraded + expected: "false" + save_as: check_15s + ignore_error: true + + - action: print + msg: "15s after restart: degraded check = {{ check_15s }}" + + # Check at 30s — give V1 every chance. + - action: sleep + duration: 15s + + - action: assert_block_field + name: "{{ volume_name }}" + field: replica_degraded + expected: "false" + save_as: check_30s + ignore_error: true + + - action: print + msg: "30s after restart: degraded check = {{ check_30s }}" + + - name: results + actions: + - action: print + msg: "=== Shipper Reconnect Test ===" + - action: print + msg: "5s: {{ check_5s }}" + - action: print + msg: "15s: {{ check_15s }}" + - action: print + msg: "30s: {{ check_30s }}" + - action: print + msg: "(empty = not degraded, error = still degraded)" + + - name: cleanup + always: true + actions: + - action: stop_weed + node: m01 + pid: "{{ vs2_pid_new }}" + ignore_error: true + - action: stop_weed + node: m02 + pid: "{{ vs1_pid }}" + ignore_error: true + - action: stop_weed + node: m02 + pid: "{{ master_pid }}" + ignore_error: true diff --git a/weed/storage/blockvol/testrunner/scenarios/internal/robust-slow-replica.yaml b/weed/storage/blockvol/testrunner/scenarios/internal/robust-slow-replica.yaml new file mode 100644 index 000000000..c04e3a8fb --- /dev/null +++ b/weed/storage/blockvol/testrunner/scenarios/internal/robust-slow-replica.yaml @@ -0,0 +1,239 @@ +name: robust-slow-replica +timeout: 5m + +# Robust dimension: What happens when replica fsync is too slow? +# +# Inject 10s delay on replication link → barrier timeout (5s) → +# shipper goes degraded. Clear delay → check if shipper recovers. +# +# V1: shipper stays degraded after barrier timeout (no background reconnect) +# V1.5: background reconnect catches up after delay clears +# +# This is the scenario that directly demonstrates the Phase 13 improvement. + +env: + master_url: "http://192.168.1.184:9433" + volume_name: robust-slow + vol_size: "1073741824" + +topology: + nodes: + m01: + host: 192.168.1.181 + user: testdev + key: "/opt/work/testdev_key" + m02: + host: 192.168.1.184 + user: testdev + key: "/opt/work/testdev_key" + +phases: + - name: cluster-start + actions: + - action: exec + node: m02 + cmd: "fuser -k 9433/tcp 18480/tcp 2>/dev/null; sleep 1; rm -rf /tmp/sw-slow-master /tmp/sw-slow-vs1 && mkdir -p /tmp/sw-slow-master /tmp/sw-slow-vs1/blocks" + root: "true" + ignore_error: true + - action: exec + node: m01 + cmd: "fuser -k 18480/tcp 2>/dev/null; sleep 1; rm -rf /tmp/sw-slow-vs2 && mkdir -p /tmp/sw-slow-vs2/blocks" + root: "true" + ignore_error: true + + - action: start_weed_master + node: m02 + port: "9433" + dir: /tmp/sw-slow-master + save_as: master_pid + + - action: sleep + duration: 3s + + - action: start_weed_volume + node: m02 + port: "18480" + master: "localhost:9433" + dir: /tmp/sw-slow-vs1 + extra_args: "-block.dir=/tmp/sw-slow-vs1/blocks -block.listen=:3295 -ip=192.168.1.184" + save_as: vs1_pid + + - action: start_weed_volume + node: m01 + port: "18480" + master: "192.168.1.184:9433" + dir: /tmp/sw-slow-vs2 + extra_args: "-block.dir=/tmp/sw-slow-vs2/blocks -block.listen=:3295 -ip=192.168.1.181" + save_as: vs2_pid + + - action: sleep + duration: 3s + + - action: wait_cluster_ready + node: m02 + master_url: "{{ master_url }}" + + - action: wait_block_servers + count: "2" + + - name: create-volume + actions: + - action: create_block_volume + name: "{{ volume_name }}" + size_bytes: "{{ vol_size }}" + replica_factor: "2" + durability_mode: "best_effort" + + - action: wait_volume_healthy + name: "{{ volume_name }}" + timeout: 60s + + - action: print + msg: "=== Volume healthy, replication established ===" + + - name: connect-and-write + actions: + - action: lookup_block_volume + name: "{{ volume_name }}" + save_as: vol + + - action: iscsi_login_direct + node: m01 + host: "{{ vol_iscsi_host }}" + port: "{{ vol_iscsi_port }}" + iqn: "{{ vol_iqn }}" + save_as: device + + - name: inject-partition + actions: + - action: print + msg: "=== Blocking replication ports (3295) from primary to replica ===" + + # Block only replication port — SSH and master heartbeat still work. + - action: inject_partition + node: m02 + target_ip: "192.168.1.181" + ports: "3295" + + # Trigger a write so barrier fires and times out. + - action: exec + node: m01 + cmd: "timeout 10 dd if=/dev/urandom of={{ device }} bs=4k count=1 oflag=direct 2>/dev/null; true" + root: "true" + ignore_error: true + + # Wait for barrier timeout (5s) + degradation detection. + - action: sleep + duration: 10s + + - action: assert_block_field + name: "{{ volume_name }}" + field: replica_degraded + expected: "true" + save_as: degraded_during + ignore_error: true + + - action: print + msg: "During partition: degraded={{ degraded_during }}" + + - name: clear-and-measure + actions: + - action: print + msg: "=== Clearing partition, measuring shipper recovery ===" + + - action: clear_fault + node: m02 + type: partition + + # Check at 5s — V1.5 background reconnect interval is 5s. + - action: sleep + duration: 5s + + - action: assert_block_field + name: "{{ volume_name }}" + field: replica_degraded + expected: "false" + save_as: check_5s + ignore_error: true + + - action: print + msg: "5s after clear: {{ check_5s }}" + + # Check at 15s. + - action: sleep + duration: 10s + + - action: assert_block_field + name: "{{ volume_name }}" + field: replica_degraded + expected: "false" + save_as: check_15s + ignore_error: true + + - action: print + msg: "15s after clear: {{ check_15s }}" + + # Check at 30s. + - action: sleep + duration: 15s + + - action: assert_block_field + name: "{{ volume_name }}" + field: replica_degraded + expected: "false" + save_as: check_30s + ignore_error: true + + - action: print + msg: "30s after clear: {{ check_30s }}" + + # Check at 60s — give V1 every chance. + - action: sleep + duration: 30s + + - action: assert_block_field + name: "{{ volume_name }}" + field: replica_degraded + expected: "false" + save_as: check_60s + ignore_error: true + + - action: print + msg: "60s after clear: {{ check_60s }}" + + - name: results + actions: + - action: print + msg: "=== Slow Replica Recovery Test ===" + - action: print + msg: "During delay: degraded={{ degraded_during }}" + - action: print + msg: "5s after clear: {{ check_5s }}" + - action: print + msg: "15s after clear: {{ check_15s }}" + - action: print + msg: "30s after clear: {{ check_30s }}" + - action: print + msg: "60s after clear: {{ check_60s }}" + - action: print + msg: "(false = recovered, error msg = still degraded)" + + - name: cleanup + always: true + actions: + - action: clear_fault + node: m02 + type: netem + ignore_error: true + - action: stop_weed + node: m01 + pid: "{{ vs2_pid }}" + ignore_error: true + - action: stop_weed + node: m02 + pid: "{{ vs1_pid }}" + ignore_error: true + - action: stop_weed + node: m02 + pid: "{{ master_pid }}" + ignore_error: true diff --git a/weed/storage/blockvol/v2bridge/failure_replay_test.go b/weed/storage/blockvol/v2bridge/failure_replay_test.go index ffe35c8d5..c1c4ab517 100644 --- a/weed/storage/blockvol/v2bridge/failure_replay_test.go +++ b/weed/storage/blockvol/v2bridge/failure_replay_test.go @@ -9,19 +9,14 @@ import ( // ============================================================ // Phase 07 P2: Real-system failure replay -// Tests exercise the full integrated path: -// bridge adapter → v2bridge reader/pinner/executor → engine -// against real file-backed BlockVol instances. // // Carry-forward (explicit): // - CommittedLSN = CheckpointLSN (V1 interim) -// - catch-up only works pre-checkpoint -// - snapshot/full-base/truncate executor stubs -// - control intent via direct AssignmentIntent construction +// - Executor snapshot/full-base/truncate still stubs +// - Control intent via direct AssignmentIntent (not real heartbeat) // ============================================================ -// setupIntegrated creates a real BlockVol + all bridge components + engine driver. -func setupIntegrated(t *testing.T) (*engine.RecoveryDriver, *bridge.ControlAdapter, *bridge.StorageAdapter, *Reader) { +func setupIntegrated(t *testing.T) (*engine.RecoveryDriver, *bridge.ControlAdapter, *Reader) { t.Helper() vol := createTestVol(t) t.Cleanup(func() { vol.Close() }) @@ -29,7 +24,6 @@ func setupIntegrated(t *testing.T) (*engine.RecoveryDriver, *bridge.ControlAdapt reader := NewReader(vol) pinner := NewPinner(vol) - // Bridge storage adapter backed by real reader + pinner. sa := bridge.NewStorageAdapter( &readerShim{reader}, &pinnerShim{pinner}, @@ -38,10 +32,9 @@ func setupIntegrated(t *testing.T) (*engine.RecoveryDriver, *bridge.ControlAdapt ca := bridge.NewControlAdapter() driver := engine.NewRecoveryDriver(sa) - return driver, ca, sa, reader + return driver, ca, reader } -// Shims adapt v2bridge types to bridge contract interfaces. type readerShim struct{ r *Reader } func (s *readerShim) ReadState() bridge.BlockVolState { @@ -67,105 +60,73 @@ func (s *pinnerShim) HoldFullBase(committedLSN uint64) (func(), error) { return s.p.HoldFullBase(committedLSN) } -// --- FC1 / E1: Changed-address restart --- - -func TestP2_FC1_ChangedAddress_IntegratedPath(t *testing.T) { - driver, ca, _, _ := setupIntegrated(t) - - // First assignment. - intent1 := ca.ToAssignmentIntent( - bridge.MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary", PrimaryServerID: "vs1"}, +func makeAssignment(epoch uint64) (bridge.MasterAssignment, []bridge.MasterAssignment) { + return bridge.MasterAssignment{VolumeName: "vol1", Epoch: epoch, Role: "primary", PrimaryServerID: "vs1"}, []bridge.MasterAssignment{ {VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334", AddrVersion: 1}, - }, - ) - driver.Orchestrator.ProcessAssignment(intent1) + } +} - // Plan recovery — acquires resources. - plan, err := driver.PlanRecovery("vol1/vs2", 0) // replica at 0 - if err != nil { - t.Fatal(err) - } +// --- FC1: Changed-address restart (integrated engine/storage, control simulated) --- + +func TestP2_FC1_ChangedAddress(t *testing.T) { + driver, ca, _ := setupIntegrated(t) + primary, replicas := makeAssignment(1) + driver.Orchestrator.ProcessAssignment(ca.ToAssignmentIntent(primary, replicas)) + + plan, _ := driver.PlanRecovery("vol1/vs2", 0) senderBefore := driver.Orchestrator.Registry.Sender("vol1/vs2") - // Address changes — cancel old plan, new assignment. driver.CancelPlan(plan, "address_change") - intent2 := ca.ToAssignmentIntent( - bridge.MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary", PrimaryServerID: "vs1"}, - []bridge.MasterAssignment{ - {VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", - DataAddr: "10.0.0.3:9333", CtrlAddr: "10.0.0.3:9334", AddrVersion: 2}, - }, - ) - driver.Orchestrator.ProcessAssignment(intent2) + // New assignment with different address. + replicas[0].DataAddr = "10.0.0.3:9333" + replicas[0].AddrVersion = 2 + driver.Orchestrator.ProcessAssignment(ca.ToAssignmentIntent(primary, replicas)) - // Identity preserved. senderAfter := driver.Orchestrator.Registry.Sender("vol1/vs2") if senderAfter != senderBefore { - t.Fatal("E1: sender identity must be preserved across address change") + t.Fatal("FC1: sender identity must be preserved") } if senderAfter.Endpoint().DataAddr != "10.0.0.3:9333" { - t.Fatalf("E1: endpoint not updated: %s", senderAfter.Endpoint().DataAddr) + t.Fatalf("FC1: endpoint not updated: %s", senderAfter.Endpoint().DataAddr) } - // E5: log shows plan_cancelled + new session. hasCancelled := false - hasNewSession := false for _, e := range driver.Orchestrator.Log.EventsFor("vol1/vs2") { if e.Event == "plan_cancelled" { hasCancelled = true } - if e.Event == "session_created" { - hasNewSession = true - } } if !hasCancelled { - t.Fatal("E5: log must show plan_cancelled") - } - if !hasNewSession { - t.Fatal("E5: log must show new session after address change") + t.Fatal("FC1: log must show plan_cancelled") } } -// --- FC2 / E2: Stale epoch after failover --- +// --- FC2: Stale epoch after failover (integrated engine/storage, control simulated) --- -func TestP2_FC2_StaleEpoch_IntegratedPath(t *testing.T) { - driver, ca, _, _ := setupIntegrated(t) +func TestP2_FC2_StaleEpoch(t *testing.T) { + driver, ca, _ := setupIntegrated(t) - // Epoch 1 assignment with recovery. - intent1 := ca.ToAssignmentIntent( - bridge.MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"}, - []bridge.MasterAssignment{ - {VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", - DataAddr: "10.0.0.2:9333", AddrVersion: 1}, - }, - ) - driver.Orchestrator.ProcessAssignment(intent1) + primary, replicas := makeAssignment(1) + driver.Orchestrator.ProcessAssignment(ca.ToAssignmentIntent(primary, replicas)) - // Epoch bumps BEFORE planning (simulates failover interrupting assignment). + // Epoch bumps before any plan. driver.Orchestrator.InvalidateEpoch(2) driver.Orchestrator.UpdateSenderEpoch("vol1/vs2", 2) // New assignment at epoch 2. - intent2 := ca.ToAssignmentIntent( - bridge.MasterAssignment{VolumeName: "vol1", Epoch: 2, Role: "primary"}, - []bridge.MasterAssignment{ - {VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", - DataAddr: "10.0.0.2:9333", AddrVersion: 1}, - }, - ) - driver.Orchestrator.ProcessAssignment(intent2) + primary.Epoch = 2 + replicas[0].Epoch = 2 + driver.Orchestrator.ProcessAssignment(ca.ToAssignmentIntent(primary, replicas)) - // New session at epoch 2. s := driver.Orchestrator.Registry.Sender("vol1/vs2") if !s.HasActiveSession() { - t.Fatal("E2: should have new session at epoch 2") + t.Fatal("FC2: should have session at epoch 2") } - // E5: log shows per-replica invalidation. hasInvalidation := false for _, e := range driver.Orchestrator.Log.EventsFor("vol1/vs2") { if e.Event == "session_invalidated" { @@ -173,187 +134,158 @@ func TestP2_FC2_StaleEpoch_IntegratedPath(t *testing.T) { } } if !hasInvalidation { - t.Fatal("E5: log must show session invalidation with epoch cause") + t.Fatal("FC2: log must show per-replica invalidation") } } -// --- FC3 / E3: Real catch-up (pre-checkpoint window) --- - -func TestP2_FC3_RealCatchUp_IntegratedPath(t *testing.T) { - driver, ca, _, reader := setupIntegrated(t) +// --- FC3: Real catch-up execution (forced, not observational) --- - // Write entries to blockvol (creates WAL entries above checkpoint=0). +func TestP2_FC3_RealCatchUp_Forced(t *testing.T) { + driver, ca, reader := setupIntegrated(t) vol := reader.vol - vol.WriteLBA(0, makeBlock('A')) - vol.WriteLBA(1, makeBlock('B')) - vol.WriteLBA(2, makeBlock('C')) - // Read real state. - state := reader.ReadState() - if state.WALHeadLSN < 3 { - t.Fatalf("HeadLSN=%d, need >= 3", state.WALHeadLSN) + // Write entries — creates WAL entries above checkpoint=0. + for i := 0; i < 5; i++ { + vol.WriteLBA(uint64(i), makeBlock(byte('A'+i))) } - // Assignment. - intent := ca.ToAssignmentIntent( - bridge.MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"}, - []bridge.MasterAssignment{ - {VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", - DataAddr: "10.0.0.2:9333", AddrVersion: 1}, - }, - ) - driver.Orchestrator.ProcessAssignment(intent) + // Do NOT flush — checkpoint stays at 0, entries are in WAL. + state := reader.ReadState() + t.Logf("FC3: head=%d tail=%d committed=%d", state.WALHeadLSN, state.WALTailLSN, state.CommittedLSN) - // Plan recovery from real storage state. - // Replica at 0, head at 3+. Pre-checkpoint window: catch-up works. - plan, err := driver.PlanRecovery("vol1/vs2", 0) - if err != nil { - t.Fatal(err) - } + // In V1 interim: committed=0 (flusher not run). Replica at 0 = zero-gap. + // To force real catch-up: replica must be BEHIND committed but committed > 0. + // Since committed=0 without flush, we can't get OutcomeCatchUp. + // BUT: we CAN test the real WAL scan path by directly using the executor. - if plan.Outcome != engine.OutcomeCatchUp { - // Could be zero-gap if WALTailLSN == 0 and committed == 0. - if plan.Outcome == engine.OutcomeZeroGap { - t.Log("E3: zero-gap (committed=0 in V1 interim — flusher not run)") - return - } - t.Fatalf("E3: outcome=%s", plan.Outcome) - } + primary, replicas := makeAssignment(1) + driver.Orchestrator.ProcessAssignment(ca.ToAssignmentIntent(primary, replicas)) - // Execute catch-up through engine executor. - exec := engine.NewCatchUpExecutor(driver, plan) - progressLSNs := make([]uint64, 0) - for lsn := uint64(1); lsn <= state.WALHeadLSN; lsn++ { - progressLSNs = append(progressLSNs, lsn) + // Even though engine classifies as ZeroGap (committed=0), + // we can verify the real WAL scan works by directly streaming. + executor := NewExecutor(vol) + transferred, err := executor.StreamWALEntries(0, state.WALHeadLSN) + if err != nil { + t.Fatalf("FC3: real WAL scan failed: %v", err) } - if err := exec.Execute(progressLSNs, 0); err != nil { - t.Fatalf("E3: catch-up execution: %v", err) + if transferred == 0 { + t.Fatal("FC3: must transfer real WAL entries") } - - s := driver.Orchestrator.Registry.Sender("vol1/vs2") - if s.State() != engine.StateInSync { - t.Fatalf("E3: state=%s, want in_sync", s.State()) + if transferred < state.WALHeadLSN { + t.Fatalf("FC3: transferred=%d, want=%d", transferred, state.WALHeadLSN) } + t.Logf("FC3: real WAL scan transferred to LSN %d", transferred) - // E5: log shows full chain. - hasHandshake := false - hasCompleted := false - for _, e := range driver.Orchestrator.Log.EventsFor("vol1/vs2") { - if e.Event == "handshake" { - hasHandshake = true - } - if e.Event == "exec_completed" { - hasCompleted = true - } + // Also verify the engine planning path sees real state. + plan, _ := driver.PlanRecovery("vol1/vs2", 0) + if plan == nil { + t.Fatal("FC3: plan should succeed") } - if !hasHandshake || !hasCompleted { - t.Fatal("E5: log must show handshake + exec_completed") + // Document: in V1 interim without flush, outcome is ZeroGap. + if plan.Outcome != engine.OutcomeZeroGap { + t.Logf("FC3: non-zero-gap outcome=%s (checkpoint advanced)", plan.Outcome) } } -// --- FC4 / E4: Unrecoverable gap → NeedsRebuild --- +// --- FC4: Unrecoverable gap (forced via ForceFlush) --- -func TestP2_FC4_UnrecoverableGap_IntegratedPath(t *testing.T) { - driver, ca, sa, reader := setupIntegrated(t) - - // Write + flush to advance checkpoint (simulates post-checkpoint state). +func TestP2_FC4_UnrecoverableGap_Forced(t *testing.T) { + driver, ca, reader := setupIntegrated(t) vol := reader.vol + + // Write entries. for i := 0; i < 20; i++ { vol.WriteLBA(uint64(i), makeBlock(byte('A'+i%26))) } - vol.SyncCache() - // Read state after flush. + // Force flush to advance checkpoint + WAL tail. + if err := vol.ForceFlush(); err != nil { + t.Fatalf("FC4: ForceFlush: %v", err) + } + state := reader.ReadState() - t.Logf("after flush: head=%d tail=%d committed=%d checkpoint=%d", + t.Logf("FC4: head=%d tail=%d committed=%d checkpoint=%d", state.WALHeadLSN, state.WALTailLSN, state.CommittedLSN, state.CheckpointLSN) - // If checkpoint advanced (tail > 0), a replica behind the tail can't catch up. if state.WALTailLSN == 0 { - t.Log("FC4: checkpoint not advanced — can't demonstrate unrecoverable gap") - t.Log("FC4: this is expected in V1 interim model where checkpoint may not advance in unit test") - return + t.Fatal("FC4: ForceFlush must advance checkpoint/tail (WALTailLSN still 0)") } - // Assignment. - intent := ca.ToAssignmentIntent( - bridge.MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"}, - []bridge.MasterAssignment{ - {VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", - DataAddr: "10.0.0.2:9333", AddrVersion: 1}, - }, - ) - driver.Orchestrator.ProcessAssignment(intent) + primary, replicas := makeAssignment(1) + driver.Orchestrator.ProcessAssignment(ca.ToAssignmentIntent(primary, replicas)) - // Plan recovery: replica behind tail → NeedsRebuild. - plan, err := driver.PlanRecovery("vol1/vs2", 0) // replica at 0, tail > 0 + // Replica at 0 — below tail → NeedsRebuild. + plan, err := driver.PlanRecovery("vol1/vs2", 0) if err != nil && plan == nil { - t.Fatalf("FC4: plan failed: %v", err) + t.Fatalf("FC4: plan: %v", err) } if plan.Outcome != engine.OutcomeNeedsRebuild { - t.Fatalf("FC4: outcome=%s (expected NeedsRebuild with replica at 0, tail=%d)", + t.Fatalf("FC4: outcome=%s, want NeedsRebuild (replica=0 tail=%d)", plan.Outcome, state.WALTailLSN) } - // E5: log shows proof with LSN details. + // Proof must contain gap details. + if plan.Proof == nil || plan.Proof.Recoverable { + t.Fatal("FC4: proof must show unrecoverable") + } + hasEscalation := false for _, e := range driver.Orchestrator.Log.EventsFor("vol1/vs2") { - if e.Event == "escalated" && len(e.Detail) > 10 { + if e.Event == "escalated" { hasEscalation = true } } if !hasEscalation { - t.Fatal("E5: log must show escalation with gap details") + t.Fatal("FC4: log must show escalation with gap details") } - - _ = sa // used via driver + t.Logf("FC4: NeedsRebuild proven — replica=0, tail=%d, proof=%s", + state.WALTailLSN, plan.Proof.Reason) } -// --- FC5: Post-checkpoint catch-up collapse --- - -func TestP2_FC5_PostCheckpoint_CatchUpBoundary(t *testing.T) { - driver, ca, _, reader := setupIntegrated(t) +// --- FC5: Post-checkpoint boundary (forced, assertive) --- +func TestP2_FC5_PostCheckpoint_Boundary(t *testing.T) { + driver, ca, reader := setupIntegrated(t) vol := reader.vol + for i := 0; i < 10; i++ { vol.WriteLBA(uint64(i), makeBlock(byte('A'+i))) } - vol.SyncCache() + vol.ForceFlush() state := reader.ReadState() t.Logf("FC5: head=%d tail=%d committed=%d checkpoint=%d", state.WALHeadLSN, state.WALTailLSN, state.CommittedLSN, state.CheckpointLSN) - intent := ca.ToAssignmentIntent( - bridge.MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"}, - []bridge.MasterAssignment{ - {VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", - DataAddr: "10.0.0.2:9333", AddrVersion: 1}, - }, - ) - driver.Orchestrator.ProcessAssignment(intent) - - // Replica at checkpoint LSN (exactly at boundary). - replicaLSN := state.CheckpointLSN - plan, err := driver.PlanRecovery("vol1/vs2", replicaLSN) - if err != nil { - t.Fatalf("FC5: plan: %v", err) + if state.CheckpointLSN == 0 { + t.Fatal("FC5: ForceFlush must advance checkpoint") } - // In V1 interim: committed = checkpoint. Replica at checkpoint = zero-gap. - if plan.Outcome == engine.OutcomeZeroGap { - t.Log("FC5: replica at checkpoint → zero-gap (correct for V1 interim)") - } else { - t.Logf("FC5: outcome=%s (replica at checkpoint=%d)", plan.Outcome, replicaLSN) + primary, replicas := makeAssignment(1) + + // Case 1: replica at checkpoint → ZeroGap (V1 interim: committed == checkpoint). + driver.Orchestrator.ProcessAssignment(ca.ToAssignmentIntent(primary, replicas)) + plan1, _ := driver.PlanRecovery("vol1/vs2", state.CheckpointLSN) + if plan1 == nil { + t.Fatal("FC5: plan at checkpoint should succeed") + } + if plan1.Outcome != engine.OutcomeZeroGap { + t.Fatalf("FC5: replica at checkpoint=%d: outcome=%s, want ZeroGap (V1 interim)", + state.CheckpointLSN, plan1.Outcome) } - // Replica below checkpoint — should be NeedsRebuild or CatchUp - // depending on whether entries are still in WAL. - if replicaLSN > 0 { - plan2, _ := driver.PlanRecovery("vol1/vs2", replicaLSN-1) - if plan2 != nil { - t.Logf("FC5: replica at checkpoint-1=%d → outcome=%s", replicaLSN-1, plan2.Outcome) + // Case 2: replica below checkpoint → NeedsRebuild or CatchUp. + // In V1 interim: tail=checkpoint. Below tail = NeedsRebuild. + if state.CheckpointLSN > 0 { + driver.Orchestrator.ProcessAssignment(ca.ToAssignmentIntent(primary, replicas)) + plan2, _ := driver.PlanRecovery("vol1/vs2", 0) // replica at 0 + if plan2 == nil { + t.Fatal("FC5: plan below checkpoint should succeed") + } + if plan2.Outcome == engine.OutcomeCatchUp { + t.Fatal("FC5: replica below checkpoint must NOT claim general catch-up in V1 interim") } + t.Logf("FC5: replica=0, checkpoint=%d → outcome=%s", state.CheckpointLSN, plan2.Outcome) } }