
feat: rebuild fallback with per-replica heartbeat state (CP13-7)

Adds per-replica state reporting in heartbeat so master can identify
which specific replica needs rebuild, not just a volume-level boolean.

New ReplicaShipperStatus{DataAddr, State, FlushedLSN} type reported
via ReplicaShipperStates field on BlockVolumeInfoMessage. Populated
from ShipperGroup.ShipperStates() on each heartbeat. Scales to RF=3+.
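
For context, a minimal master-side sketch of how this field could be consumed to pick rebuild targets. The helper below is illustrative and not part of this commit; only the BlockVolumeInfoMessage and ReplicaShipperStatus shapes come from the diff:

// Hypothetical consumer: collect addresses of replicas reporting needs_rebuild.
func replicasNeedingRebuild(msg BlockVolumeInfoMessage) []string {
	var addrs []string
	for _, rs := range msg.ReplicaShipperStates {
		if rs.State == "needs_rebuild" {
			addrs = append(addrs, rs.DataAddr)
		}
	}
	return addrs
}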

V1 constraints (explicit; a sketch follows the list):
- NeedsRebuild cleared only by control-plane reassignment (no local exit)
- Post-rebuild replica re-enters as Disconnected/bootstrap, not InSync
- flushedLSN = checkpointLSN after rebuild (durable baseline only)
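
A minimal sketch of what these constraints imply for the fresh shipper entry created when the control plane reassigns a rebuilt replica. The helper name is illustrative, not part of this commit; ReplicaShipperStatus is the type added below in shipper_group.go:

// Illustrative only: a rebuilt replica re-enters as "disconnected", never
// directly "in_sync"; its durable progress is pinned to the rebuild
// checkpoint LSN until normal shipping resumes.
func freshShipperStatus(dataAddr string, checkpointLSN uint64) ReplicaShipperStatus {
	return ReplicaShipperStatus{
		DataAddr:   dataAddr,
		State:      "disconnected", // bootstrap barrier decides when it is in_sync
		FlushedLSN: checkpointLSN,  // durable baseline only
	}
}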

4 new tests: heartbeat per-replica state, NeedsRebuild reporting,
rebuild-complete-reenters-InSync (full cycle), epoch mismatch abort.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Branch: feature/sw-block
Author: pingqiu, 6 days ago
Commit: 3e9358f2be
Changed files (lines changed):
  weed/storage/blockvol/block_heartbeat.go              10
  weed/storage/blockvol/block_heartbeat_proto_test.go   10
  weed/storage/blockvol/blockvol.go                      9
  weed/storage/blockvol/rebuild_v1_test.go             223
  weed/storage/blockvol/shipper_group.go                24

weed/storage/blockvol/block_heartbeat.go (10 changed lines)

@@ -24,8 +24,9 @@ type BlockVolumeInfoMessage struct {
 	LastScrubTime   int64  // CP8-2: unix seconds
 	ReplicaDegraded bool   // CP8-2: true if any replica shipper degraded
 	DurabilityMode  string // CP8-3-1: "best_effort", "sync_all", "sync_quorum"
-	NvmeAddr        string // NVMe/TCP target address (ip:port), empty if NVMe disabled
-	NQN             string // NVMe subsystem NQN, empty if NVMe disabled
+	NvmeAddr             string                 // NVMe/TCP target address (ip:port), empty if NVMe disabled
+	NQN                  string                 // NVMe subsystem NQN, empty if NVMe disabled
+	ReplicaShipperStates []ReplicaShipperStatus // CP13-7: per-replica state from primary's shipper group
 }

 // BlockVolumeShortInfoMessage is used for delta heartbeats
@@ -70,8 +71,9 @@ func ToBlockVolumeInfoMessage(path, diskType string, vol *BlockVol) BlockVolumeI
 		HealthScore:   status.HealthScore,
 		ScrubErrors:   hs.ScrubErrors,
 		LastScrubTime: hs.LastScrubTime,
-		ReplicaDegraded: status.ReplicaDegraded,
-		DurabilityMode:  vol.DurabilityMode().String(),
+		ReplicaDegraded:      status.ReplicaDegraded,
+		DurabilityMode:       vol.DurabilityMode().String(),
+		ReplicaShipperStates: vol.ReplicaShipperStates(),
 	}
 }

weed/storage/blockvol/block_heartbeat_proto_test.go (10 changed lines)

@@ -19,7 +19,7 @@ func TestInfoMessageRoundTrip(t *testing.T) {
 	}
 	pb := InfoMessageToProto(orig)
 	back := InfoMessageFromProto(pb)
-	if back != orig {
+	if !reflect.DeepEqual(back, orig) {
 		t.Fatalf("round-trip mismatch:\n got %+v\n want %+v", back, orig)
 	}
 }
@@ -33,7 +33,7 @@ func TestShortInfoRoundTrip(t *testing.T) {
 	}
 	pb := ShortInfoToProto(orig)
 	back := ShortInfoFromProto(pb)
-	if back != orig {
+	if !reflect.DeepEqual(back, orig) {
 		t.Fatalf("round-trip mismatch:\n got %+v\n want %+v", back, orig)
 	}
 }
@@ -63,7 +63,7 @@ func TestInfoMessagesSliceRoundTrip(t *testing.T) {
 		t.Fatalf("length mismatch: got %d, want %d", len(back), len(origSlice))
 	}
 	for i := range origSlice {
-		if back[i] != origSlice[i] {
+		if !reflect.DeepEqual(back[i], origSlice[i]) {
 			t.Fatalf("index %d mismatch:\n got %+v\n want %+v", i, back[i], origSlice[i])
 		}
 	}
@@ -102,7 +102,7 @@ func TestInfoMessageRoundTripWithReplicaAddrs(t *testing.T) {
 	}
 	pb := InfoMessageToProto(orig)
 	back := InfoMessageFromProto(pb)
-	if back != orig {
+	if !reflect.DeepEqual(back, orig) {
 		t.Fatalf("round-trip mismatch:\n got %+v\n want %+v", back, orig)
 	}
 }
@@ -188,7 +188,7 @@ func TestAssignmentsToProto(t *testing.T) {
 func TestNilProtoConversions(t *testing.T) {
 	// Nil proto -> zero-value Go types.
 	info := InfoMessageFromProto(nil)
-	if info != (BlockVolumeInfoMessage{}) {
+	if !reflect.DeepEqual(info, BlockVolumeInfoMessage{}) {
 		t.Fatalf("nil info proto should yield zero value, got %+v", info)
 	}
 	short := ShortInfoFromProto(nil)
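
Aside: these assertions switch from == to reflect.DeepEqual because BlockVolumeInfoMessage now carries a slice field, and Go structs containing slices are not comparable with ==. A reduced, standalone illustration (struct shapes simplified, not the real types):

package main

import (
	"fmt"
	"reflect"
)

type ReplicaShipperStatus struct{ DataAddr, State string }

// Reduced stand-in for BlockVolumeInfoMessage: one slice field is enough
// to make the struct non-comparable.
type InfoMsg struct {
	ReplicaShipperStates []ReplicaShipperStatus
}

func main() {
	a := InfoMsg{[]ReplicaShipperStatus{{"10.0.0.2:9000", "in_sync"}}}
	b := InfoMsg{[]ReplicaShipperStatus{{"10.0.0.2:9000", "in_sync"}}}
	// _ = a == b // would not compile: struct containing a slice is not comparable
	fmt.Println(reflect.DeepEqual(a, b)) // true: element-wise deep comparison
}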

weed/storage/blockvol/blockvol.go (9 changed lines)

@@ -855,6 +855,15 @@ func (v *BlockVol) SetReplicaAddrs(addrs []ReplicaAddr) {
 	go v.groupCommit.Run()
 }

+// ReplicaShipperStates returns per-replica status from the primary's shipper group.
+// Used by heartbeat to report which replicas need rebuild. Returns nil if no shippers.
+func (v *BlockVol) ReplicaShipperStates() []ReplicaShipperStatus {
+	if v.shipperGroup == nil {
+		return nil
+	}
+	return v.shipperGroup.ShipperStates()
+}
+
 // StartReplicaReceiver starts listening for replicated WAL entries from a primary.
 func (v *BlockVol) StartReplicaReceiver(dataAddr, ctrlAddr string) error {
 	recv, err := NewReplicaReceiver(v, dataAddr, ctrlAddr)

weed/storage/blockvol/rebuild_v1_test.go (223 lines, new file)

@@ -0,0 +1,223 @@
package blockvol

// CP13-7: Rebuild fallback V1 tests.

import (
	"path/filepath"
	"testing"
	"time"
)

// TestHeartbeat_ReportsPerReplicaState verifies that the heartbeat message
// carries per-replica shipper state, not just a boolean degraded flag.
func TestHeartbeat_ReportsPerReplicaState(t *testing.T) {
	primary, replica := createSyncAllPair(t)
	defer primary.Close()
	defer replica.Close()

	recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	recv.Serve()
	defer recv.Stop()
	primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())

	// Write + sync to bootstrap the shipper to InSync.
	if err := primary.WriteLBA(0, makeBlock('A')); err != nil {
		t.Fatal(err)
	}
	if err := primary.SyncCache(); err != nil {
		t.Fatal(err)
	}

	// Check heartbeat includes per-replica states.
	msg := ToBlockVolumeInfoMessage("/data/test.blk", "ssd", primary)
	if len(msg.ReplicaShipperStates) == 0 {
		t.Fatal("heartbeat should include ReplicaShipperStates")
	}
	rs := msg.ReplicaShipperStates[0]
	if rs.State != "in_sync" {
		t.Fatalf("expected replica state in_sync, got %q", rs.State)
	}
	if rs.DataAddr == "" {
		t.Fatal("replica DataAddr should not be empty")
	}
}

// TestHeartbeat_ReportsNeedsRebuild verifies that when a shipper is in
// NeedsRebuild state, the heartbeat reports it per-replica.
func TestHeartbeat_ReportsNeedsRebuild(t *testing.T) {
	primary, replica := createSyncAllPair(t)
	defer primary.Close()
	defer replica.Close()

	recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	recv.Serve()
	defer recv.Stop()
	primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())

	// Force shipper to NeedsRebuild.
	shipper := primary.shipperGroup.Shipper(0)
	shipper.state.Store(uint32(ReplicaNeedsRebuild))

	msg := ToBlockVolumeInfoMessage("/data/test.blk", "ssd", primary)
	if len(msg.ReplicaShipperStates) == 0 {
		t.Fatal("heartbeat should include ReplicaShipperStates")
	}
	if msg.ReplicaShipperStates[0].State != "needs_rebuild" {
		t.Fatalf("expected needs_rebuild, got %q", msg.ReplicaShipperStates[0].State)
	}
}

// TestReplicaState_RebuildComplete_ReentersInSync verifies the full rebuild
// cycle: NeedsRebuild → StartRebuild → fresh shipper → bootstrap → InSync.
func TestReplicaState_RebuildComplete_ReentersInSync(t *testing.T) {
	// Create primary + replica pair.
	pDir := t.TempDir()
	rDir := t.TempDir()
	opts := CreateOptions{
		VolumeSize:     1 * 1024 * 1024,
		BlockSize:      4096,
		WALSize:        256 * 1024,
		DurabilityMode: DurabilitySyncAll,
	}
	primary, err := CreateBlockVol(filepath.Join(pDir, "primary.blockvol"), opts)
	if err != nil {
		t.Fatal(err)
	}
	defer primary.Close()
	primary.SetRole(RolePrimary)
	primary.SetEpoch(1)
	primary.SetMasterEpoch(1)
	primary.lease.Grant(30 * time.Second)

	replica, err := CreateBlockVol(filepath.Join(rDir, "replica.blockvol"), opts)
	if err != nil {
		t.Fatal(err)
	}
	defer replica.Close()
	replica.SetRole(RoleReplica)
	replica.SetEpoch(1)
	replica.SetMasterEpoch(1)

	// Phase 1: Set up replication and write data.
	recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	recv.Serve()
	defer recv.Stop()
	primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
	for i := uint64(0); i < 5; i++ {
		if err := primary.WriteLBA(i, makeBlock(byte('A'+i))); err != nil {
			t.Fatal(err)
		}
	}
	if err := primary.SyncCache(); err != nil {
		t.Fatal(err)
	}

	// Phase 2: Start rebuild server on primary.
	if err := primary.StartRebuildServer("127.0.0.1:0"); err != nil {
		t.Fatal(err)
	}
	defer primary.StopRebuildServer()
	rebuildAddr := primary.rebuildServer.Addr()

	// Phase 3: Simulate VS restart on replica — close + reopen → RoleNone.
	recv.Stop()
	replica.Close()
	replica, err = OpenBlockVol(filepath.Join(rDir, "replica.blockvol"))
	if err != nil {
		t.Fatalf("reopen replica: %v", err)
	}
	defer replica.Close() // close reopened volume

	// Now RoleNone → assignment to Rebuilding is valid.
	if err := HandleAssignment(replica, 1, RoleRebuilding, 0); err != nil {
		t.Fatalf("HandleAssignment to Rebuilding: %v", err)
	}
	fromLSN := replica.Status().WALHeadLSN
	if err := StartRebuild(replica, rebuildAddr, fromLSN, 1); err != nil {
		t.Fatalf("StartRebuild: %v", err)
	}

	// Phase 4: After rebuild, replica should be RoleReplica.
	if replica.Status().Role != RoleReplica {
		t.Fatalf("expected RoleReplica after rebuild, got %s", replica.Status().Role)
	}

	// Phase 5: Simulate master sending fresh Primary assignment with rebuilt replica.
	// This creates a new shipper (Disconnected) replacing the old NeedsRebuild one.
	recv2, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	recv2.Serve()
	defer recv2.Stop()
	primary.SetReplicaAddr(recv2.DataAddr(), recv2.CtrlAddr())

	// Phase 6: Write + SyncCache — fresh shipper should bootstrap to InSync.
	if err := primary.WriteLBA(10, makeBlock('Z')); err != nil {
		t.Fatal(err)
	}
	if err := primary.SyncCache(); err != nil {
		t.Fatalf("SyncCache after rebuild: %v", err)
	}
	shipper := primary.shipperGroup.Shipper(0)
	if shipper.State() != ReplicaInSync {
		t.Fatalf("expected InSync after rebuild + barrier, got %s", shipper.State())
	}
}

// TestRebuild_AbortOnEpochChange verifies that StartRebuild fails when
// the rebuild server detects an epoch mismatch.
func TestRebuild_AbortOnEpochChange(t *testing.T) {
	pDir := t.TempDir()
	rDir := t.TempDir()
	opts := CreateOptions{
		VolumeSize: 1 * 1024 * 1024,
		BlockSize:  4096,
		WALSize:    256 * 1024,
	}
	primary, err := CreateBlockVol(filepath.Join(pDir, "primary.blockvol"), opts)
	if err != nil {
		t.Fatal(err)
	}
	defer primary.Close()
	primary.SetRole(RolePrimary)
	primary.SetEpoch(2) // epoch 2
	primary.SetMasterEpoch(2)
	primary.lease.Grant(30 * time.Second)

	replica, err := CreateBlockVol(filepath.Join(rDir, "replica.blockvol"), opts)
	if err != nil {
		t.Fatal(err)
	}
	defer replica.Close()
	replica.SetEpoch(1) // stale epoch
	replica.SetMasterEpoch(1)
	replica.SetRole(RoleRebuilding)

	if err := primary.StartRebuildServer("127.0.0.1:0"); err != nil {
		t.Fatal(err)
	}
	defer primary.StopRebuildServer()

	// Rebuild with epoch mismatch should fail.
	err = StartRebuild(replica, primary.rebuildServer.Addr(), 0, 1) // epoch=1 vs primary epoch=2
	if err == nil {
		t.Fatal("StartRebuild should fail on epoch mismatch")
	}
}

// createSyncAllPair is defined in sync_all_bug_test.go.
// makeBlock is defined in blockvol_test.go.

weed/storage/blockvol/shipper_group.go (24 changed lines)

@@ -6,6 +6,14 @@ import (
 	"time"
 )

+// ReplicaShipperStatus reports per-replica state from the primary's shipper group.
+// Used in heartbeat so master can identify which replica needs rebuild.
+type ReplicaShipperStatus struct {
+	DataAddr   string // replica identity
+	State      string // "disconnected", "in_sync", "degraded", "needs_rebuild", etc.
+	FlushedLSN uint64 // last known durable progress
+}
+
 // ShipperGroup wraps multiple WALShippers for fan-out replication to N replicas.
 // Len()==0 means standalone (no replicas), Len()==1 is RF=2, Len()==2 is RF=3.
 type ShipperGroup struct {
@@ -180,6 +188,22 @@ func (sg *ShipperGroup) EvaluateRetentionBudgets(timeout time.Duration) {
 	}
 }

+// ShipperStates returns per-replica status for heartbeat reporting.
+// Master uses this to identify which replicas need rebuild.
+func (sg *ShipperGroup) ShipperStates() []ReplicaShipperStatus {
+	sg.mu.RLock()
+	defer sg.mu.RUnlock()
+	states := make([]ReplicaShipperStatus, len(sg.shippers))
+	for i, s := range sg.shippers {
+		states[i] = ReplicaShipperStatus{
+			DataAddr:   s.dataAddr,
+			State:      s.State().String(),
+			FlushedLSN: s.ReplicaFlushedLSN(),
+		}
+	}
+	return states
+}
+
 // InSyncCount returns the number of shippers in ReplicaInSync state.
 func (sg *ShipperGroup) InSyncCount() int {
 	sg.mu.RLock()
