diff --git a/weed/storage/blockvol/block_heartbeat.go b/weed/storage/blockvol/block_heartbeat.go
index 7bf21a781..4e788b689 100644
--- a/weed/storage/blockvol/block_heartbeat.go
+++ b/weed/storage/blockvol/block_heartbeat.go
@@ -24,8 +24,9 @@ type BlockVolumeInfoMessage struct {
 	LastScrubTime   int64  // CP8-2: unix seconds
 	ReplicaDegraded bool   // CP8-2: true if any replica shipper degraded
 	DurabilityMode  string // CP8-3-1: "best_effort", "sync_all", "sync_quorum"
-	NvmeAddr        string // NVMe/TCP target address (ip:port), empty if NVMe disabled
-	NQN             string // NVMe subsystem NQN, empty if NVMe disabled
+	NvmeAddr             string                 // NVMe/TCP target address (ip:port), empty if NVMe disabled
+	NQN                  string                 // NVMe subsystem NQN, empty if NVMe disabled
+	ReplicaShipperStates []ReplicaShipperStatus // CP13-7: per-replica state from primary's shipper group
 }
 
 // BlockVolumeShortInfoMessage is used for delta heartbeats
@@ -70,8 +71,9 @@ func ToBlockVolumeInfoMessage(path, diskType string, vol *BlockVol) BlockVolumeI
 		HealthScore:   status.HealthScore,
 		ScrubErrors:   hs.ScrubErrors,
 		LastScrubTime: hs.LastScrubTime,
-		ReplicaDegraded: status.ReplicaDegraded,
-		DurabilityMode:  vol.DurabilityMode().String(),
+		ReplicaDegraded:      status.ReplicaDegraded,
+		DurabilityMode:       vol.DurabilityMode().String(),
+		ReplicaShipperStates: vol.ReplicaShipperStates(),
 	}
 }
 
diff --git a/weed/storage/blockvol/block_heartbeat_proto_test.go b/weed/storage/blockvol/block_heartbeat_proto_test.go
index 5e6cd842b..591ebc8fd 100644
--- a/weed/storage/blockvol/block_heartbeat_proto_test.go
+++ b/weed/storage/blockvol/block_heartbeat_proto_test.go
@@ -19,7 +19,7 @@ func TestInfoMessageRoundTrip(t *testing.T) {
 	}
 	pb := InfoMessageToProto(orig)
 	back := InfoMessageFromProto(pb)
-	if back != orig {
+	if !reflect.DeepEqual(back, orig) {
 		t.Fatalf("round-trip mismatch:\n got %+v\n want %+v", back, orig)
 	}
 }
@@ -33,7 +33,7 @@ func TestShortInfoRoundTrip(t *testing.T) {
 	}
 	pb := ShortInfoToProto(orig)
 	back := ShortInfoFromProto(pb)
-	if back != orig {
+	if !reflect.DeepEqual(back, orig) {
 		t.Fatalf("round-trip mismatch:\n got %+v\n want %+v", back, orig)
 	}
 }
@@ -63,7 +63,7 @@ func TestInfoMessagesSliceRoundTrip(t *testing.T) {
 		t.Fatalf("length mismatch: got %d, want %d", len(back), len(origSlice))
 	}
 	for i := range origSlice {
-		if back[i] != origSlice[i] {
+		if !reflect.DeepEqual(back[i], origSlice[i]) {
 			t.Fatalf("index %d mismatch:\n got %+v\n want %+v", i, back[i], origSlice[i])
 		}
 	}
@@ -102,7 +102,7 @@ func TestInfoMessageRoundTripWithReplicaAddrs(t *testing.T) {
 	}
 	pb := InfoMessageToProto(orig)
 	back := InfoMessageFromProto(pb)
-	if back != orig {
+	if !reflect.DeepEqual(back, orig) {
 		t.Fatalf("round-trip mismatch:\n got %+v\n want %+v", back, orig)
 	}
 }
@@ -188,7 +188,7 @@ func TestAssignmentsToProto(t *testing.T) {
 func TestNilProtoConversions(t *testing.T) {
 	// Nil proto -> zero-value Go types.
 	info := InfoMessageFromProto(nil)
-	if info != (BlockVolumeInfoMessage{}) {
+	if !reflect.DeepEqual(info, BlockVolumeInfoMessage{}) {
 		t.Fatalf("nil info proto should yield zero value, got %+v", info)
 	}
 	short := ShortInfoFromProto(nil)
diff --git a/weed/storage/blockvol/blockvol.go b/weed/storage/blockvol/blockvol.go
index 0683169f8..db4297fd2 100644
--- a/weed/storage/blockvol/blockvol.go
+++ b/weed/storage/blockvol/blockvol.go
@@ -855,6 +855,15 @@ func (v *BlockVol) SetReplicaAddrs(addrs []ReplicaAddr) {
 	go v.groupCommit.Run()
 }
 
+// ReplicaShipperStates returns per-replica status from the primary's shipper group.
+// Used by heartbeat to report which replicas need rebuild. Returns nil if no shippers.
+func (v *BlockVol) ReplicaShipperStates() []ReplicaShipperStatus {
+	if v.shipperGroup == nil {
+		return nil
+	}
+	return v.shipperGroup.ShipperStates()
+}
+
 // StartReplicaReceiver starts listening for replicated WAL entries from a primary.
 func (v *BlockVol) StartReplicaReceiver(dataAddr, ctrlAddr string) error {
 	recv, err := NewReplicaReceiver(v, dataAddr, ctrlAddr)
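With `ReplicaShipperStates` in the heartbeat, the master can tell which replica is behind rather than only that one is. A minimal master-side sketch follows; only `ReplicaShipperStatus` and its state strings come from this diff, while `planRebuilds` and its surrounding package are hypothetical.

```go
package master

import "log"

// ReplicaShipperStatus mirrors the struct added in shipper_group.go;
// in a real build it would arrive via the heartbeat proto.
type ReplicaShipperStatus struct {
	DataAddr   string
	State      string
	FlushedLSN uint64
}

// planRebuilds (hypothetical) selects replicas whose shipper reports
// needs_rebuild. With only the old ReplicaDegraded bool, the master knew
// a volume was degraded but not which replica to rebuild.
func planRebuilds(volumeID uint32, states []ReplicaShipperStatus) []string {
	var targets []string
	for _, s := range states {
		if s.State == "needs_rebuild" {
			targets = append(targets, s.DataAddr)
		}
	}
	if len(targets) > 0 {
		log.Printf("volume %d: scheduling rebuild for %v", volumeID, targets)
	}
	return targets
}
```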
diff --git a/weed/storage/blockvol/rebuild_v1_test.go b/weed/storage/blockvol/rebuild_v1_test.go
new file mode 100644
index 000000000..e0f62862c
--- /dev/null
+++ b/weed/storage/blockvol/rebuild_v1_test.go
@@ -0,0 +1,223 @@
+package blockvol
+
+// CP13-7: Rebuild fallback V1 tests.
+
+import (
+	"path/filepath"
+	"testing"
+	"time"
+)
+
+// TestHeartbeat_ReportsPerReplicaState verifies that the heartbeat message
+// carries per-replica shipper state, not just a boolean degraded flag.
+func TestHeartbeat_ReportsPerReplicaState(t *testing.T) {
+	primary, replica := createSyncAllPair(t)
+	defer primary.Close()
+	defer replica.Close()
+
+	recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	recv.Serve()
+	defer recv.Stop()
+
+	primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
+
+	// Write + sync to bootstrap the shipper to InSync.
+	if err := primary.WriteLBA(0, makeBlock('A')); err != nil {
+		t.Fatal(err)
+	}
+	if err := primary.SyncCache(); err != nil {
+		t.Fatal(err)
+	}
+
+	// Check heartbeat includes per-replica states.
+	msg := ToBlockVolumeInfoMessage("/data/test.blk", "ssd", primary)
+	if len(msg.ReplicaShipperStates) == 0 {
+		t.Fatal("heartbeat should include ReplicaShipperStates")
+	}
+	rs := msg.ReplicaShipperStates[0]
+	if rs.State != "in_sync" {
+		t.Fatalf("expected replica state in_sync, got %q", rs.State)
+	}
+	if rs.DataAddr == "" {
+		t.Fatal("replica DataAddr should not be empty")
+	}
+}
+
+// TestHeartbeat_ReportsNeedsRebuild verifies that when a shipper is in
+// NeedsRebuild state, the heartbeat reports it per-replica.
+func TestHeartbeat_ReportsNeedsRebuild(t *testing.T) {
+	primary, replica := createSyncAllPair(t)
+	defer primary.Close()
+	defer replica.Close()
+
+	recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	recv.Serve()
+	defer recv.Stop()
+
+	primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
+
+	// Force shipper to NeedsRebuild.
+	shipper := primary.shipperGroup.Shipper(0)
+	shipper.state.Store(uint32(ReplicaNeedsRebuild))
+
+	msg := ToBlockVolumeInfoMessage("/data/test.blk", "ssd", primary)
+	if len(msg.ReplicaShipperStates) == 0 {
+		t.Fatal("heartbeat should include ReplicaShipperStates")
+	}
+	if msg.ReplicaShipperStates[0].State != "needs_rebuild" {
+		t.Fatalf("expected needs_rebuild, got %q", msg.ReplicaShipperStates[0].State)
+	}
+}
+
+// TestReplicaState_RebuildComplete_ReentersInSync verifies the full rebuild
+// cycle: NeedsRebuild → StartRebuild → fresh shipper → bootstrap → InSync.
+func TestReplicaState_RebuildComplete_ReentersInSync(t *testing.T) {
+	// Create primary + replica pair.
+	pDir := t.TempDir()
+	rDir := t.TempDir()
+	opts := CreateOptions{
+		VolumeSize:     1 * 1024 * 1024,
+		BlockSize:      4096,
+		WALSize:        256 * 1024,
+		DurabilityMode: DurabilitySyncAll,
+	}
+	primary, err := CreateBlockVol(filepath.Join(pDir, "primary.blockvol"), opts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer primary.Close()
+	primary.SetRole(RolePrimary)
+	primary.SetEpoch(1)
+	primary.SetMasterEpoch(1)
+	primary.lease.Grant(30 * time.Second)
+
+	replica, err := CreateBlockVol(filepath.Join(rDir, "replica.blockvol"), opts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer replica.Close()
+	replica.SetRole(RoleReplica)
+	replica.SetEpoch(1)
+	replica.SetMasterEpoch(1)
+
+	// Phase 1: Set up replication and write data.
+	recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	recv.Serve()
+	defer recv.Stop()
+
+	primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
+
+	for i := uint64(0); i < 5; i++ {
+		if err := primary.WriteLBA(i, makeBlock(byte('A'+i))); err != nil {
+			t.Fatal(err)
+		}
+	}
+	if err := primary.SyncCache(); err != nil {
+		t.Fatal(err)
+	}
+
+	// Phase 2: Start rebuild server on primary.
+	if err := primary.StartRebuildServer("127.0.0.1:0"); err != nil {
+		t.Fatal(err)
+	}
+	defer primary.StopRebuildServer()
+	rebuildAddr := primary.rebuildServer.Addr()
+
+	// Phase 3: Simulate VS restart on replica — close + reopen → RoleNone.
+	recv.Stop()
+	replica.Close()
+	replica, err = OpenBlockVol(filepath.Join(rDir, "replica.blockvol"))
+	if err != nil {
+		t.Fatalf("reopen replica: %v", err)
+	}
+	defer replica.Close() // close reopened volume
+
+	// Now RoleNone → assignment to Rebuilding is valid.
+	if err := HandleAssignment(replica, 1, RoleRebuilding, 0); err != nil {
+		t.Fatalf("HandleAssignment to Rebuilding: %v", err)
+	}
+	fromLSN := replica.Status().WALHeadLSN
+	if err := StartRebuild(replica, rebuildAddr, fromLSN, 1); err != nil {
+		t.Fatalf("StartRebuild: %v", err)
+	}
+
+	// Phase 4: After rebuild, replica should be RoleReplica.
+	if replica.Status().Role != RoleReplica {
+		t.Fatalf("expected RoleReplica after rebuild, got %s", replica.Status().Role)
+	}
+
+	// Phase 5: Simulate master sending fresh Primary assignment with rebuilt replica.
+	// This creates a new shipper (Disconnected) replacing the old NeedsRebuild one.
+	recv2, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	recv2.Serve()
+	defer recv2.Stop()
+	primary.SetReplicaAddr(recv2.DataAddr(), recv2.CtrlAddr())
+
+	// Phase 6: Write + SyncCache — fresh shipper should bootstrap to InSync.
+	if err := primary.WriteLBA(10, makeBlock('Z')); err != nil {
+		t.Fatal(err)
+	}
+	if err := primary.SyncCache(); err != nil {
+		t.Fatalf("SyncCache after rebuild: %v", err)
+	}
+
+	shipper := primary.shipperGroup.Shipper(0)
+	if shipper.State() != ReplicaInSync {
+		t.Fatalf("expected InSync after rebuild + barrier, got %s", shipper.State())
+	}
+}
+
+// TestRebuild_AbortOnEpochChange verifies that StartRebuild fails when
+// the rebuild server detects an epoch mismatch.
+func TestRebuild_AbortOnEpochChange(t *testing.T) {
+	pDir := t.TempDir()
+	rDir := t.TempDir()
+	opts := CreateOptions{
+		VolumeSize: 1 * 1024 * 1024,
+		BlockSize:  4096,
+		WALSize:    256 * 1024,
+	}
+	primary, err := CreateBlockVol(filepath.Join(pDir, "primary.blockvol"), opts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer primary.Close()
+	primary.SetRole(RolePrimary)
+	primary.SetEpoch(2) // epoch 2
+	primary.SetMasterEpoch(2)
+	primary.lease.Grant(30 * time.Second)
+
+	replica, err := CreateBlockVol(filepath.Join(rDir, "replica.blockvol"), opts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer replica.Close()
+	replica.SetEpoch(1) // stale epoch
+	replica.SetMasterEpoch(1)
+	replica.SetRole(RoleRebuilding)
+
+	if err := primary.StartRebuildServer("127.0.0.1:0"); err != nil {
+		t.Fatal(err)
+	}
+	defer primary.StopRebuildServer()
+
+	// Rebuild with epoch mismatch should fail.
+	err = StartRebuild(replica, primary.rebuildServer.Addr(), 0, 1) // epoch=1 vs primary epoch=2
+	if err == nil {
+		t.Fatal("StartRebuild should fail on epoch mismatch")
+	}
+}
+
+// createSyncAllPair is defined in sync_all_bug_test.go.
+// makeBlock is defined in blockvol_test.go.
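The third test above walks the full recovery sequence in phases; condensed below as a hypothetical volume-server helper. `recoverReplica` is illustrative, the epoch is assumed to be `uint64`, and the argument meanings of `HandleAssignment` and `StartRebuild` (epoch, role, from-LSN) are inferred from the tests, not confirmed signatures.

```go
// recoverReplica (hypothetical) condenses the flow exercised by
// TestReplicaState_RebuildComplete_ReentersInSync.
func recoverReplica(path, rebuildAddr string, epoch uint64) (*BlockVol, error) {
	// A restarted volume server reopens the volume; it comes back RoleNone.
	vol, err := OpenBlockVol(path)
	if err != nil {
		return nil, err
	}
	// Master assigns RoleRebuilding, a valid transition from RoleNone.
	if err := HandleAssignment(vol, epoch, RoleRebuilding, 0); err != nil {
		vol.Close()
		return nil, err
	}
	// Stream missing WAL from the primary's rebuild server, starting at the
	// local WAL head; on success the volume re-enters RoleReplica.
	if err := StartRebuild(vol, rebuildAddr, vol.Status().WALHeadLSN, epoch); err != nil {
		vol.Close()
		return nil, err
	}
	return vol, nil
}
```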
diff --git a/weed/storage/blockvol/shipper_group.go b/weed/storage/blockvol/shipper_group.go
index 07ddd8695..96db846d0 100644
--- a/weed/storage/blockvol/shipper_group.go
+++ b/weed/storage/blockvol/shipper_group.go
@@ -6,6 +6,14 @@ import (
 	"time"
 )
 
+// ReplicaShipperStatus reports per-replica state from the primary's shipper group.
+// Used in heartbeat so master can identify which replica needs rebuild.
+type ReplicaShipperStatus struct {
+	DataAddr   string // replica identity
+	State      string // "disconnected", "in_sync", "degraded", "needs_rebuild", etc.
+	FlushedLSN uint64 // last known durable progress
+}
+
 // ShipperGroup wraps multiple WALShippers for fan-out replication to N replicas.
 // Len()==0 means standalone (no replicas), Len()==1 is RF=2, Len()==2 is RF=3.
 type ShipperGroup struct {
@@ -180,6 +188,22 @@ func (sg *ShipperGroup) EvaluateRetentionBudgets(timeout time.Duration) {
 	}
 }
 
+// ShipperStates returns per-replica status for heartbeat reporting.
+// Master uses this to identify which replicas need rebuild.
+func (sg *ShipperGroup) ShipperStates() []ReplicaShipperStatus {
+	sg.mu.RLock()
+	defer sg.mu.RUnlock()
+	states := make([]ReplicaShipperStatus, len(sg.shippers))
+	for i, s := range sg.shippers {
+		states[i] = ReplicaShipperStatus{
+			DataAddr:   s.dataAddr,
+			State:      s.State().String(),
+			FlushedLSN: s.ReplicaFlushedLSN(),
+		}
+	}
+	return states
+}
+
 // InSyncCount returns the number of shippers in ReplicaInSync state.
 func (sg *ShipperGroup) InSyncCount() int {
 	sg.mu.RLock()
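The new slice subsumes the existing `ReplicaDegraded` boolean: the coarse flag can be derived from the per-replica view. A hypothetical helper, assuming the state strings documented on `ReplicaShipperStatus`:

```go
// anyNotInSync (hypothetical) derives a ReplicaDegraded-style signal from
// the per-replica states, assuming the state strings documented on
// ReplicaShipperStatus ("in_sync", "needs_rebuild", ...).
func anyNotInSync(states []ReplicaShipperStatus) bool {
	for _, s := range states {
		if s.State != "in_sync" {
			return true
		}
	}
	return false
}
```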