Browse Source

fix: sync flusher checkpointLSN after rebuild (CP13-7)

rebuildFullExtent updated superblock.WALCheckpointLSN but not the
flusher's internal checkpointLSN. NewReplicaReceiver then read
stale 0 from flusher.CheckpointLSN(), causing post-rebuild
flushedLSN to be wrong.

Added Flusher.SetCheckpointLSN() and call it after rebuild
superblock persist. TestRebuild_PostRebuild_FlushedLSN_IsCheckpoint
flips FAIL→PASS.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
pingqiu 1 week ago
parent
commit
aa4688d5d5
  1. 8
      weed/storage/blockvol/flusher.go
  2. 7
      weed/storage/blockvol/rebuild.go
  3. 210
      weed/storage/blockvol/rebuild_v1_test.go

8
weed/storage/blockvol/flusher.go

@ -467,6 +467,14 @@ func (f *Flusher) CheckpointLSN() uint64 {
return f.checkpointLSN
}
// SetCheckpointLSN updates the flusher's internal checkpoint state.
// Used after rebuild to sync flusher with the rebuilt superblock state.
func (f *Flusher) SetCheckpointLSN(lsn uint64) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.checkpointLSN = lsn
}
// CloseBatchIO releases the batch I/O backend resources (e.g. io_uring ring).
// Must be called after Stop() and the final FlushOnce().
func (f *Flusher) CloseBatchIO() error {

7
weed/storage/blockvol/rebuild.go

@ -337,6 +337,13 @@ extentDone:
}
vol.mu.Unlock()
// Sync flusher's internal checkpoint with the rebuilt superblock state.
// Without this, NewReplicaReceiver reads stale checkpointLSN=0 from
// flusher.CheckpointLSN(), causing post-rebuild flushedLSN to be wrong.
if vol.flusher != nil {
vol.flusher.SetCheckpointLSN(checkpointLSN)
}
conn.Close()
// Second catch-up scan: capture writes during extent copy.

210
weed/storage/blockvol/rebuild_v1_test.go

@ -219,5 +219,215 @@ func TestRebuild_AbortOnEpochChange(t *testing.T) {
}
}
// TestRebuild_PostRebuild_FlushedLSN_IsCheckpoint verifies that after a
// full extent rebuild, the replica's state is correctly initialized:
//   - flushedLSN = checkpointLSN (baseline durable, not 0, not headLSN)
//   - receivedLSN = nextLSN - 1 (ready to accept next entry from primary)
//   - replica re-enters as Disconnected/bootstrap, NOT InSync
//
// BUG FOUND: rebuildFullExtent updates superblock.WALCheckpointLSN but does
// NOT update flusher.checkpointLSN. So NewReplicaReceiver reads stale 0
// from flusher.CheckpointLSN(). Fix: rebuild must sync flusher state.
func TestRebuild_PostRebuild_FlushedLSN_IsCheckpoint(t *testing.T) {
	pDir := t.TempDir()
	rDir := t.TempDir()
	opts := CreateOptions{
		VolumeSize:     1 * 1024 * 1024,
		BlockSize:      4096,
		WALSize:        256 * 1024,
		DurabilityMode: DurabilitySyncAll,
	}
	primary, err := CreateBlockVol(filepath.Join(pDir, "primary.blockvol"), opts)
	if err != nil {
		t.Fatal(err)
	}
	defer primary.Close()
	primary.SetRole(RolePrimary)
	primary.SetEpoch(1)
	primary.SetMasterEpoch(1)
	primary.lease.Grant(30 * time.Second)

	// Write data on primary and flush to extent.
	for i := uint64(0); i < 10; i++ {
		if err := primary.WriteLBA(i, makeBlock(byte('A'+i))); err != nil {
			t.Fatal(err)
		}
	}
	// No replica configured — SyncCache does a local fsync only, so any
	// error here is a genuine local failure and must fail the test
	// (an empty branch here would silently swallow it).
	if err := primary.SyncCache(); err != nil {
		t.Fatal(err)
	}
	primary.flusher.FlushOnce()
	checkpointBefore := primary.flusher.CheckpointLSN()
	if checkpointBefore == 0 {
		t.Fatal("primary checkpointLSN should be >0 after flush")
	}
	t.Logf("primary checkpointLSN=%d, nextLSN=%d", checkpointBefore, primary.nextLSN.Load())

	// Start rebuild server.
	if err := primary.StartRebuildServer("127.0.0.1:0"); err != nil {
		t.Fatal(err)
	}
	defer primary.StopRebuildServer()

	// Create replica and rebuild from primary.
	replica, err := CreateBlockVol(filepath.Join(rDir, "replica.blockvol"), opts)
	if err != nil {
		t.Fatal(err)
	}
	defer replica.Close()
	replica.SetEpoch(1)
	replica.SetMasterEpoch(1)
	if err := HandleAssignment(replica, 1, RoleRebuilding, 0); err != nil {
		t.Fatal(err)
	}
	if err := StartRebuild(replica, primary.rebuildServer.Addr(), 0, 1); err != nil {
		t.Fatalf("StartRebuild: %v", err)
	}

	// After rebuild: check replica state.
	status := replica.Status()
	t.Logf("post-rebuild: role=%s nextLSN=%d walHeadLSN=%d", status.Role, replica.nextLSN.Load(), status.WALHeadLSN)

	// Role should be Replica (not InSync — InSync is a shipper concept).
	if status.Role != RoleReplica {
		t.Fatalf("expected RoleReplica, got %s", status.Role)
	}
	// nextLSN should be > 1 (set by syncLSNAfterRebuild).
	if replica.nextLSN.Load() <= 1 {
		t.Fatalf("nextLSN=%d, expected > 1 after rebuild", replica.nextLSN.Load())
	}

	// Create a receiver — it initializes flushedLSN from vol state.
	recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	defer recv.Stop()

	// FlushedLSN should be initialized to vol.nextLSN-1 (the rebuilt baseline).
	// It should NOT be 0 (would cause catch-up to replay everything).
	flushed := recv.FlushedLSN()
	if flushed == 0 {
		t.Fatal("post-rebuild flushedLSN=0 — should be initialized from rebuilt baseline")
	}
	t.Logf("post-rebuild receiver: flushedLSN=%d receivedLSN=%d", flushed, recv.ReceivedLSN())

	// Verify data integrity: all 10 blocks should be present on replica.
	for i := uint64(0); i < 10; i++ {
		got, err := replica.ReadLBA(i, 4096)
		if err != nil {
			t.Fatalf("ReadLBA(%d): %v", i, err)
		}
		if got[0] != byte('A'+i) {
			t.Fatalf("LBA %d: expected %c, got %c — rebuild data integrity failure", i, 'A'+i, got[0])
		}
	}
}
// TestRebuild_MissingTailRestartsOrFailsCleanly verifies that if the
// trailing WAL required after a full extent rebuild has been reclaimed,
// the rebuild fails cleanly instead of promoting a partial replica.
//
// Scenario: Primary writes 100 entries → flush + reclaim → rebuild starts.
// The full extent copy succeeds. But the second catch-up (trailing WAL)
// requests entries that have been reclaimed → should error, not promote.
func TestRebuild_MissingTailRestartsOrFailsCleanly(t *testing.T) {
	pDir := t.TempDir()
	rDir := t.TempDir()
	opts := CreateOptions{
		VolumeSize:     1 * 1024 * 1024,
		BlockSize:      4096,
		WALSize:        32 * 1024, // tiny WAL — reclaimed quickly
		DurabilityMode: DurabilitySyncAll,
	}
	primary, err := CreateBlockVol(filepath.Join(pDir, "primary.blockvol"), opts)
	if err != nil {
		t.Fatal(err)
	}
	defer primary.Close()
	primary.SetRole(RolePrimary)
	primary.SetEpoch(1)
	primary.SetMasterEpoch(1)
	primary.lease.Grant(30 * time.Second)

	// Write enough data to fill and reclaim the tiny WAL multiple times.
	for i := uint64(0); i < 100; i++ {
		if err := primary.WriteLBA(i%16, makeBlock(byte('0'+i%10))); err != nil {
			t.Fatal(err)
		}
	}
	// No replica configured — SyncCache does a local fsync only, so any
	// error is a genuine local failure and must fail the test
	// (an empty branch here would silently swallow it).
	if err := primary.SyncCache(); err != nil {
		t.Fatal(err)
	}
	// Flush aggressively to reclaim WAL.
	for j := 0; j < 5; j++ {
		primary.flusher.FlushOnce()
	}
	headLSN := primary.nextLSN.Load() - 1
	checkpointLSN := primary.flusher.CheckpointLSN()
	t.Logf("primary: headLSN=%d checkpointLSN=%d", headLSN, checkpointLSN)

	// Start rebuild server.
	if err := primary.StartRebuildServer("127.0.0.1:0"); err != nil {
		t.Fatal(err)
	}
	defer primary.StopRebuildServer()

	// Now write MORE data AFTER starting the rebuild server.
	// This creates entries between snapshotLSN and the current head.
	// The second catch-up will need these entries.
	for i := uint64(0); i < 100; i++ {
		if err := primary.WriteLBA(i%16, makeBlock(byte('A'+i%10))); err != nil {
			t.Fatal(err)
		}
	}
	// Flush aggressively again — reclaim the entries the second catch-up needs.
	for j := 0; j < 5; j++ {
		primary.flusher.FlushOnce()
	}
	newHead := primary.nextLSN.Load() - 1
	t.Logf("primary after more writes: headLSN=%d (entries between %d and %d likely reclaimed)", newHead, headLSN, newHead)

	// Create replica and attempt rebuild.
	replica, err := CreateBlockVol(filepath.Join(rDir, "replica.blockvol"), opts)
	if err != nil {
		t.Fatal(err)
	}
	defer replica.Close()
	replica.SetEpoch(1)
	replica.SetMasterEpoch(1)
	if err := HandleAssignment(replica, 1, RoleRebuilding, 0); err != nil {
		t.Fatal(err)
	}

	// StartRebuild: WAL catch-up fails (WAL_RECYCLED) → falls back to full extent.
	// Full extent succeeds. Second catch-up may fail if trailing WAL is reclaimed.
	err = StartRebuild(replica, primary.rebuildServer.Addr(), 0, 1)
	if err != nil {
		// Expected: rebuild failed because trailing WAL was reclaimed.
		// Replica should NOT be promoted to RoleReplica.
		t.Logf("rebuild correctly failed: %v", err)
		if replica.Role() == RoleReplica {
			t.Fatal("replica should NOT be RoleReplica after failed rebuild — partial data")
		}
	} else {
		// Rebuild succeeded — the trailing WAL was still available.
		// This is also acceptable (depends on timing/flusher behavior).
		t.Log("rebuild succeeded — trailing WAL was still retained")
		if replica.Role() != RoleReplica {
			t.Fatalf("expected RoleReplica after successful rebuild, got %s", replica.Role())
		}
	}
}
// createSyncAllPair is defined in sync_all_bug_test.go.
// makeBlock is defined in blockvol_test.go.
Loading…
Cancel
Save