You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

748 lines
25 KiB

package weed_server
import (
"context"
"fmt"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
)
// ============================================================
// Integration Tests: Cross-component flows for CP6-3
//
// These tests simulate the full lifecycle spanning multiple
// components (master registry, assignment queue, failover state,
// CSI publish) without real gRPC or iSCSI infrastructure.
// ============================================================
// integrationMaster creates a MasterServer wired with registry, queue, and
// failover state, plus two block-capable servers with deterministic mock
// allocate/delete callbacks. Suitable for end-to-end control-plane tests.
func integrationMaster(t *testing.T) *MasterServer {
t.Helper()
ms := &MasterServer{
blockRegistry: NewBlockVolumeRegistry(),
blockAssignmentQueue: NewBlockAssignmentQueue(),
blockFailover: newBlockFailoverState(),
}
ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) {
return &blockAllocResult{
Path: fmt.Sprintf("/data/%s.blk", name),
IQN: fmt.Sprintf("iqn.2024.test:%s", name),
ISCSIAddr: server + ":3260",
ReplicaDataAddr: server + ":14260",
ReplicaCtrlAddr: server + ":14261",
RebuildListenAddr: server + ":15000",
}, nil
}
ms.blockVSDelete = func(ctx context.Context, server string, name string) error {
return nil
}
ms.blockRegistry.MarkBlockCapable("vs1:9333")
ms.blockRegistry.MarkBlockCapable("vs2:9333")
return ms
}
// ============================================================
// Required #1: Failover + CSI Publish
//
// Goal: after primary dies, replica is promoted and
// LookupBlockVolume (used by ControllerPublishVolume) returns
// the new iSCSI address.
// ============================================================
func TestIntegration_FailoverCSIPublish(t *testing.T) {
ms := integrationMaster(t)
ctx := context.Background()
// Step 1: Create replicated volume.
createResp, err := ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
Name: "pvc-data-1",
SizeBytes: 1 << 30,
})
if err != nil {
t.Fatalf("CreateBlockVolume: %v", err)
}
if createResp.ReplicaServer == "" {
t.Fatal("expected replica server")
}
primaryVS := createResp.VolumeServer
replicaVS := createResp.ReplicaServer
// Step 2: Verify initial CSI publish returns primary's address.
lookupResp, err := ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "pvc-data-1"})
if err != nil {
t.Fatalf("initial Lookup: %v", err)
}
if lookupResp.IscsiAddr != primaryVS+":3260" {
t.Fatalf("initial publish should return primary iSCSI addr %q, got %q",
primaryVS+":3260", lookupResp.IscsiAddr)
}
// Step 3: Expire lease so failover is immediate.
entry, _ := ms.blockRegistry.Lookup("pvc-data-1")
entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute)
// Step 4: Primary VS dies — triggers failover.
ms.failoverBlockVolumes(primaryVS)
// Step 5: Verify registry swap.
entry, _ = ms.blockRegistry.Lookup("pvc-data-1")
if entry.VolumeServer != replicaVS {
t.Fatalf("after failover: primary should be %q, got %q", replicaVS, entry.VolumeServer)
}
if entry.Epoch != 2 {
t.Fatalf("epoch should be bumped to 2, got %d", entry.Epoch)
}
// Step 6: CSI ControllerPublishVolume (simulated via Lookup) returns NEW address.
lookupResp, err = ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "pvc-data-1"})
if err != nil {
t.Fatalf("post-failover Lookup: %v", err)
}
if lookupResp.IscsiAddr == primaryVS+":3260" {
t.Fatalf("post-failover publish should NOT return dead primary's addr %q", lookupResp.IscsiAddr)
}
if lookupResp.IscsiAddr != replicaVS+":3260" {
t.Fatalf("post-failover publish should return promoted replica's addr %q, got %q",
replicaVS+":3260", lookupResp.IscsiAddr)
}
// Step 7: Verify new primary assignment was enqueued for the promoted server.
assignments := ms.blockAssignmentQueue.Peek(replicaVS)
foundPrimary := false
for _, a := range assignments {
if blockvol.RoleFromWire(a.Role) == blockvol.RolePrimary && a.Epoch == 2 {
foundPrimary = true
}
}
if !foundPrimary {
t.Fatal("new primary assignment (epoch=2) should be queued for promoted server")
}
}
// ============================================================
// Required #2: Rebuild on Recovery
//
// Goal: old primary comes back, gets Rebuilding assignment,
// and WAL catch-up + extent rebuild are wired correctly.
// ============================================================
func TestIntegration_RebuildOnRecovery(t *testing.T) {
ms := integrationMaster(t)
ctx := context.Background()
// Step 1: Create replicated volume.
createResp, err := ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
Name: "pvc-db-1",
SizeBytes: 10 << 30,
})
if err != nil {
t.Fatalf("CreateBlockVolume: %v", err)
}
primaryVS := createResp.VolumeServer
replicaVS := createResp.ReplicaServer
// Step 2: Expire lease for immediate failover.
entry, _ := ms.blockRegistry.Lookup("pvc-db-1")
entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute)
// Step 3: Primary dies → replica promoted.
ms.failoverBlockVolumes(primaryVS)
entryAfterFailover, _ := ms.blockRegistry.Lookup("pvc-db-1")
if entryAfterFailover.VolumeServer != replicaVS {
t.Fatalf("failover: primary should be %q, got %q", replicaVS, entryAfterFailover.VolumeServer)
}
newEpoch := entryAfterFailover.Epoch
// Step 4: Verify pending rebuild recorded for dead primary.
ms.blockFailover.mu.Lock()
rebuilds := ms.blockFailover.pendingRebuilds[primaryVS]
ms.blockFailover.mu.Unlock()
if len(rebuilds) != 1 {
t.Fatalf("expected 1 pending rebuild for %s, got %d", primaryVS, len(rebuilds))
}
if rebuilds[0].VolumeName != "pvc-db-1" {
t.Fatalf("pending rebuild volume: got %q, want pvc-db-1", rebuilds[0].VolumeName)
}
// Step 5: Old primary reconnects.
ms.recoverBlockVolumes(primaryVS)
// Step 6: Pending rebuilds drained.
ms.blockFailover.mu.Lock()
remainingRebuilds := ms.blockFailover.pendingRebuilds[primaryVS]
ms.blockFailover.mu.Unlock()
if len(remainingRebuilds) != 0 {
t.Fatalf("pending rebuilds should be drained after recovery, got %d", len(remainingRebuilds))
}
// Step 7: Rebuilding assignment enqueued for old primary.
assignments := ms.blockAssignmentQueue.Peek(primaryVS)
var rebuildAssignment *blockvol.BlockVolumeAssignment
for i, a := range assignments {
if blockvol.RoleFromWire(a.Role) == blockvol.RoleRebuilding {
rebuildAssignment = &assignments[i]
break
}
}
if rebuildAssignment == nil {
t.Fatal("expected Rebuilding assignment for reconnected server")
}
if rebuildAssignment.Epoch != newEpoch {
t.Fatalf("rebuild epoch: got %d, want %d (matches promoted primary)", rebuildAssignment.Epoch, newEpoch)
}
if rebuildAssignment.RebuildAddr == "" {
// RebuildListenAddr is set on the entry by tryCreateReplica
t.Log("NOTE: RebuildAddr empty (allocate mock doesn't propagate to entry.RebuildListenAddr after swap)")
}
// Step 8: Registry shows old primary as new replica.
entry, _ = ms.blockRegistry.Lookup("pvc-db-1")
if entry.ReplicaServer != primaryVS {
t.Fatalf("after recovery: replica should be %q (old primary), got %q", primaryVS, entry.ReplicaServer)
}
// Step 9: Simulate VS heartbeat confirming rebuild complete.
// VS reports volume with matching epoch = rebuild confirmed.
ms.blockAssignmentQueue.ConfirmFromHeartbeat(primaryVS, []blockvol.BlockVolumeInfoMessage{
{
Path: rebuildAssignment.Path,
Epoch: rebuildAssignment.Epoch,
Role: blockvol.RoleToWire(blockvol.RoleReplica), // after rebuild → replica
},
})
if ms.blockAssignmentQueue.Pending(primaryVS) != 0 {
t.Fatalf("rebuild assignment should be confirmed by heartbeat, got %d pending",
ms.blockAssignmentQueue.Pending(primaryVS))
}
}
// ============================================================
// Required #3: Assignment Delivery + Confirmation Loop
//
// Goal: assignment queue is drained only after heartbeat
// confirms — assignments remain pending until VS reports
// matching (path, epoch).
// ============================================================
func TestIntegration_AssignmentDeliveryConfirmation(t *testing.T) {
ms := integrationMaster(t)
ctx := context.Background()
// Step 1: Create replicated volume → assignments enqueued.
resp, err := ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
Name: "pvc-logs-1",
SizeBytes: 5 << 30,
})
if err != nil {
t.Fatalf("CreateBlockVolume: %v", err)
}
primaryVS := resp.VolumeServer
replicaVS := resp.ReplicaServer
if replicaVS == "" {
t.Fatal("expected replica server")
}
// Step 2: Both servers have 1 pending assignment each.
if n := ms.blockAssignmentQueue.Pending(primaryVS); n != 1 {
t.Fatalf("primary pending: got %d, want 1", n)
}
if n := ms.blockAssignmentQueue.Pending(replicaVS); n != 1 {
t.Fatalf("replica pending: got %d, want 1", n)
}
// Step 3: Simulate heartbeat delivery — Peek returns pending assignments.
primaryAssignments := ms.blockAssignmentQueue.Peek(primaryVS)
if len(primaryAssignments) != 1 {
t.Fatalf("Peek primary: got %d, want 1", len(primaryAssignments))
}
if blockvol.RoleFromWire(primaryAssignments[0].Role) != blockvol.RolePrimary {
t.Fatalf("primary assignment role: got %d, want Primary", primaryAssignments[0].Role)
}
if primaryAssignments[0].Epoch != 1 {
t.Fatalf("primary assignment epoch: got %d, want 1", primaryAssignments[0].Epoch)
}
replicaAssignments := ms.blockAssignmentQueue.Peek(replicaVS)
if len(replicaAssignments) != 1 {
t.Fatalf("Peek replica: got %d, want 1", len(replicaAssignments))
}
if blockvol.RoleFromWire(replicaAssignments[0].Role) != blockvol.RoleReplica {
t.Fatalf("replica assignment role: got %d, want Replica", replicaAssignments[0].Role)
}
// Step 4: Peek again — assignments still pending (not consumed by Peek).
if n := ms.blockAssignmentQueue.Pending(primaryVS); n != 1 {
t.Fatalf("after Peek, primary still pending: got %d, want 1", n)
}
// Step 5: Simulate heartbeat from PRIMARY with wrong epoch — no confirmation.
ms.blockAssignmentQueue.ConfirmFromHeartbeat(primaryVS, []blockvol.BlockVolumeInfoMessage{
{
Path: primaryAssignments[0].Path,
Epoch: 999, // wrong epoch
},
})
if n := ms.blockAssignmentQueue.Pending(primaryVS); n != 1 {
t.Fatalf("wrong epoch should NOT confirm: primary pending %d, want 1", n)
}
// Step 6: Simulate heartbeat from PRIMARY with correct (path, epoch) — confirmed.
ms.blockAssignmentQueue.ConfirmFromHeartbeat(primaryVS, []blockvol.BlockVolumeInfoMessage{
{
Path: primaryAssignments[0].Path,
Epoch: primaryAssignments[0].Epoch,
},
})
if n := ms.blockAssignmentQueue.Pending(primaryVS); n != 0 {
t.Fatalf("correct heartbeat should confirm: primary pending %d, want 0", n)
}
// Step 7: Replica still pending (independent confirmation).
if n := ms.blockAssignmentQueue.Pending(replicaVS); n != 1 {
t.Fatalf("replica should still be pending: got %d, want 1", n)
}
// Step 8: Confirm replica.
ms.blockAssignmentQueue.ConfirmFromHeartbeat(replicaVS, []blockvol.BlockVolumeInfoMessage{
{
Path: replicaAssignments[0].Path,
Epoch: replicaAssignments[0].Epoch,
},
})
if n := ms.blockAssignmentQueue.Pending(replicaVS); n != 0 {
t.Fatalf("replica should be confirmed: got %d, want 0", n)
}
}
// ============================================================
// Nice-to-have #1: Lease-aware promotion timing
//
// Ensures promotion happens only after TTL expires.
// ============================================================
func TestIntegration_LeaseAwarePromotion(t *testing.T) {
ms := integrationMaster(t)
ctx := context.Background()
// Create with replica.
resp, err := ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
Name: "pvc-lease-1",
SizeBytes: 1 << 30,
})
if err != nil {
t.Fatalf("create: %v", err)
}
primaryVS := resp.VolumeServer
// Set a short but non-zero lease TTL (lease just granted → not yet expired).
entry, _ := ms.blockRegistry.Lookup("pvc-lease-1")
entry.LeaseTTL = 300 * time.Millisecond
entry.LastLeaseGrant = time.Now()
// Primary dies.
ms.failoverBlockVolumes(primaryVS)
// Immediately: primary should NOT be swapped (lease still valid).
e, _ := ms.blockRegistry.Lookup("pvc-lease-1")
if e.VolumeServer != primaryVS {
t.Fatalf("should NOT promote before lease expires, got primary=%q", e.VolumeServer)
}
// Wait for lease to expire + timer to fire.
time.Sleep(500 * time.Millisecond)
// Now promotion should have happened.
e, _ = ms.blockRegistry.Lookup("pvc-lease-1")
if e.VolumeServer == primaryVS {
t.Fatalf("should promote after lease expires, still %q", e.VolumeServer)
}
if e.Epoch != 2 {
t.Fatalf("epoch should be 2 after deferred promotion, got %d", e.Epoch)
}
}
// ============================================================
// Nice-to-have #2: Replica create failure → single-copy mode
//
// Primary alone works; no replica assignments sent.
// ============================================================
func TestIntegration_ReplicaFailureSingleCopy(t *testing.T) {
ms := integrationMaster(t)
ctx := context.Background()
// Make replica allocation always fail.
callCount := 0
origAllocate := ms.blockVSAllocate
ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) {
callCount++
if callCount > 1 {
// Second call (replica) fails.
return nil, fmt.Errorf("disk full on replica")
}
return origAllocate(ctx, server, name, sizeBytes, diskType, durabilityMode)
}
resp, err := ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
Name: "pvc-single-1",
SizeBytes: 1 << 30,
})
if err != nil {
t.Fatalf("should succeed in single-copy mode: %v", err)
}
if resp.ReplicaServer != "" {
t.Fatalf("should have no replica, got %q", resp.ReplicaServer)
}
primaryVS := resp.VolumeServer
// Only primary assignment should be enqueued.
if n := ms.blockAssignmentQueue.Pending(primaryVS); n != 1 {
t.Fatalf("primary pending: got %d, want 1", n)
}
// Check there's only a Primary assignment (no Replica assignment anywhere).
assignments := ms.blockAssignmentQueue.Peek(primaryVS)
for _, a := range assignments {
if blockvol.RoleFromWire(a.Role) == blockvol.RoleReplica {
t.Fatal("should not have Replica assignment in single-copy mode")
}
}
// No failover possible without replica.
entry, _ := ms.blockRegistry.Lookup("pvc-single-1")
entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute)
ms.failoverBlockVolumes(primaryVS)
e, _ := ms.blockRegistry.Lookup("pvc-single-1")
if e.VolumeServer != primaryVS {
t.Fatalf("single-copy volume should not failover, got %q", e.VolumeServer)
}
}
// ============================================================
// Nice-to-have #3: Lease-deferred timer cancelled on reconnect
//
// VS reconnects during lease window → no promotion (no split-brain).
// ============================================================
func TestIntegration_TransientDisconnectNoSplitBrain(t *testing.T) {
ms := integrationMaster(t)
ctx := context.Background()
resp, err := ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
Name: "pvc-transient-1",
SizeBytes: 1 << 30,
})
if err != nil {
t.Fatalf("create: %v", err)
}
primaryVS := resp.VolumeServer
replicaVS := resp.ReplicaServer
// Set lease with long TTL (not expired).
entry, _ := ms.blockRegistry.Lookup("pvc-transient-1")
entry.LeaseTTL = 1 * time.Second
entry.LastLeaseGrant = time.Now()
// Primary disconnects → deferred promotion timer set.
ms.failoverBlockVolumes(primaryVS)
// Primary should NOT be swapped yet.
e, _ := ms.blockRegistry.Lookup("pvc-transient-1")
if e.VolumeServer != primaryVS {
t.Fatal("should not promote during lease window")
}
// VS reconnects (before lease expires) → deferred timers cancelled.
ms.recoverBlockVolumes(primaryVS)
// Wait well past the original lease TTL.
time.Sleep(1500 * time.Millisecond)
// Primary should STILL be the same (timer was cancelled).
e, _ = ms.blockRegistry.Lookup("pvc-transient-1")
if e.VolumeServer != primaryVS {
t.Fatalf("reconnected primary should remain primary, got %q", e.VolumeServer)
}
// No failover happened, so no pending rebuilds.
ms.blockFailover.mu.Lock()
rebuilds := ms.blockFailover.pendingRebuilds[primaryVS]
ms.blockFailover.mu.Unlock()
if len(rebuilds) != 0 {
t.Fatalf("no pending rebuilds for reconnected server, got %d", len(rebuilds))
}
// CSI publish should still return original primary.
lookupResp, err := ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "pvc-transient-1"})
if err != nil {
t.Fatalf("Lookup after reconnect: %v", err)
}
if lookupResp.IscsiAddr != primaryVS+":3260" {
t.Fatalf("iSCSI addr should be original primary %q, got %q",
primaryVS+":3260", lookupResp.IscsiAddr)
}
_ = replicaVS // used implicitly via CreateBlockVolume
}
// ============================================================
// Full lifecycle: Create → Publish → Failover → Re-publish →
// Recover → Rebuild confirm → Verify registry health
// ============================================================
func TestIntegration_FullLifecycle(t *testing.T) {
ms := integrationMaster(t)
ctx := context.Background()
// --- Phase 1: Create ---
resp, err := ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
Name: "pvc-lifecycle-1",
SizeBytes: 1 << 30,
})
if err != nil {
t.Fatalf("create: %v", err)
}
primaryVS := resp.VolumeServer
replicaVS := resp.ReplicaServer
if replicaVS == "" {
t.Fatal("expected replica")
}
// --- Phase 2: Initial publish ---
lookupResp, err := ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "pvc-lifecycle-1"})
if err != nil {
t.Fatalf("initial lookup: %v", err)
}
initialAddr := lookupResp.IscsiAddr
// --- Phase 3: Confirm initial assignments ---
entry, _ := ms.blockRegistry.Lookup("pvc-lifecycle-1")
ms.blockAssignmentQueue.ConfirmFromHeartbeat(primaryVS, []blockvol.BlockVolumeInfoMessage{
{Path: entry.Path, Epoch: 1},
})
ms.blockAssignmentQueue.ConfirmFromHeartbeat(replicaVS, []blockvol.BlockVolumeInfoMessage{
{Path: entry.ReplicaPath, Epoch: 1},
})
if ms.blockAssignmentQueue.Pending(primaryVS) != 0 || ms.blockAssignmentQueue.Pending(replicaVS) != 0 {
t.Fatal("assignments should be confirmed")
}
// --- Phase 4: Expire lease + kill primary ---
entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute)
ms.failoverBlockVolumes(primaryVS)
// --- Phase 5: Verify failover ---
entry, _ = ms.blockRegistry.Lookup("pvc-lifecycle-1")
if entry.VolumeServer != replicaVS {
t.Fatalf("after failover: primary should be %q", replicaVS)
}
if entry.Epoch != 2 {
t.Fatalf("epoch should be 2, got %d", entry.Epoch)
}
// --- Phase 6: Re-publish → new address ---
lookupResp, err = ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "pvc-lifecycle-1"})
if err != nil {
t.Fatalf("post-failover lookup: %v", err)
}
if lookupResp.IscsiAddr == initialAddr {
t.Fatal("post-failover addr should differ from initial")
}
// --- Phase 7: Confirm failover assignment for new primary ---
ms.blockAssignmentQueue.ConfirmFromHeartbeat(replicaVS, []blockvol.BlockVolumeInfoMessage{
{Path: entry.Path, Epoch: 2},
})
// --- Phase 8: Old primary reconnects → rebuild ---
ms.recoverBlockVolumes(primaryVS)
rebuildAssignments := ms.blockAssignmentQueue.Peek(primaryVS)
var rebuildPath string
var rebuildEpoch uint64
for _, a := range rebuildAssignments {
if blockvol.RoleFromWire(a.Role) == blockvol.RoleRebuilding {
rebuildPath = a.Path
rebuildEpoch = a.Epoch
}
}
if rebuildPath == "" {
t.Fatal("expected rebuild assignment")
}
// --- Phase 9: Old primary confirms rebuild via heartbeat ---
ms.blockAssignmentQueue.ConfirmFromHeartbeat(primaryVS, []blockvol.BlockVolumeInfoMessage{
{Path: rebuildPath, Epoch: rebuildEpoch, Role: blockvol.RoleToWire(blockvol.RoleReplica)},
})
if ms.blockAssignmentQueue.Pending(primaryVS) != 0 {
t.Fatalf("rebuild should be confirmed, got %d pending", ms.blockAssignmentQueue.Pending(primaryVS))
}
// --- Phase 10: Final registry state ---
final, _ := ms.blockRegistry.Lookup("pvc-lifecycle-1")
if final.VolumeServer != replicaVS {
t.Fatalf("final primary: got %q, want %q", final.VolumeServer, replicaVS)
}
if final.ReplicaServer != primaryVS {
t.Fatalf("final replica: got %q, want %q", final.ReplicaServer, primaryVS)
}
if final.Epoch != 2 {
t.Fatalf("final epoch: got %d, want 2", final.Epoch)
}
// --- Phase 11: Delete ---
_, err = ms.DeleteBlockVolume(ctx, &master_pb.DeleteBlockVolumeRequest{Name: "pvc-lifecycle-1"})
if err != nil {
t.Fatalf("delete: %v", err)
}
if _, ok := ms.blockRegistry.Lookup("pvc-lifecycle-1"); ok {
t.Fatal("volume should be deleted")
}
}
// ============================================================
// Double failover: primary dies, promoted replica dies, then
// the original server comes back — verify correct state.
// ============================================================
func TestIntegration_DoubleFailover(t *testing.T) {
ms := integrationMaster(t)
ctx := context.Background()
resp, err := ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
Name: "pvc-double-1",
SizeBytes: 1 << 30,
})
if err != nil {
t.Fatalf("create: %v", err)
}
vs1 := resp.VolumeServer
vs2 := resp.ReplicaServer
// First failover: vs1 dies → vs2 promoted.
entry, _ := ms.blockRegistry.Lookup("pvc-double-1")
entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute)
ms.failoverBlockVolumes(vs1)
e1, _ := ms.blockRegistry.Lookup("pvc-double-1")
if e1.VolumeServer != vs2 {
t.Fatalf("first failover: primary should be %q, got %q", vs2, e1.VolumeServer)
}
if e1.Epoch != 2 {
t.Fatalf("first failover epoch: got %d, want 2", e1.Epoch)
}
// CP8-2: PromoteBestReplica does NOT add old primary back as replica.
// Reconnect vs1 first so it becomes a replica (via recoverBlockVolumes).
ms.recoverBlockVolumes(vs1)
// Simulate heartbeat from vs1 that restores iSCSI addr and health score
// (in production this happens when the VS re-registers after reconnect).
e1, _ = ms.blockRegistry.Lookup("pvc-double-1")
for i := range e1.Replicas {
if e1.Replicas[i].Server == vs1 {
e1.Replicas[i].ISCSIAddr = vs1 + ":3260"
e1.Replicas[i].HealthScore = 1.0
}
}
// Now vs1 is back as replica. Second failover: vs2 dies → vs1 promoted.
e1, _ = ms.blockRegistry.Lookup("pvc-double-1")
e1.LastLeaseGrant = time.Now().Add(-1 * time.Minute)
ms.failoverBlockVolumes(vs2)
e2, _ := ms.blockRegistry.Lookup("pvc-double-1")
if e2.VolumeServer != vs1 {
t.Fatalf("second failover: primary should be %q, got %q", vs1, e2.VolumeServer)
}
if e2.Epoch != 3 {
t.Fatalf("second failover epoch: got %d, want 3", e2.Epoch)
}
// Verify CSI publish returns vs1.
lookupResp, err := ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "pvc-double-1"})
if err != nil {
t.Fatalf("lookup: %v", err)
}
if lookupResp.IscsiAddr != vs1+":3260" {
t.Fatalf("after double failover: iSCSI addr should be %q, got %q",
vs1+":3260", lookupResp.IscsiAddr)
}
}
// ============================================================
// Multiple volumes: failover + rebuild affects all volumes on
// the dead server, not just one.
// ============================================================
func TestIntegration_MultiVolumeFailoverRebuild(t *testing.T) {
ms := integrationMaster(t)
ctx := context.Background()
// Create 3 volumes — all will land on vs1+vs2.
for i := 1; i <= 3; i++ {
_, err := ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
Name: fmt.Sprintf("pvc-multi-%d", i),
SizeBytes: 1 << 30,
})
if err != nil {
t.Fatalf("create pvc-multi-%d: %v", i, err)
}
}
// Find which server is primary for each volume.
primaryCounts := map[string]int{}
for i := 1; i <= 3; i++ {
e, _ := ms.blockRegistry.Lookup(fmt.Sprintf("pvc-multi-%d", i))
primaryCounts[e.VolumeServer]++
// Expire lease.
e.LastLeaseGrant = time.Now().Add(-1 * time.Minute)
}
// Kill the server with the most primaries.
deadServer := "vs1:9333"
if primaryCounts["vs2:9333"] > primaryCounts["vs1:9333"] {
deadServer = "vs2:9333"
}
otherServer := "vs2:9333"
if deadServer == "vs2:9333" {
otherServer = "vs1:9333"
}
ms.failoverBlockVolumes(deadServer)
// All volumes should now have the other server as primary.
for i := 1; i <= 3; i++ {
name := fmt.Sprintf("pvc-multi-%d", i)
e, _ := ms.blockRegistry.Lookup(name)
if e.VolumeServer == deadServer {
t.Fatalf("%s: primary should not be dead server %q", name, deadServer)
}
}
// Reconnect dead server → rebuild assignments.
ms.recoverBlockVolumes(deadServer)
rebuildCount := 0
for _, a := range ms.blockAssignmentQueue.Peek(deadServer) {
if blockvol.RoleFromWire(a.Role) == blockvol.RoleRebuilding {
rebuildCount++
}
}
_ = otherServer
// CP8-2: With 2 servers and 3 volumes, deadServer hosts all 3 volumes
// (as primary for some, replica for others). All need rebuild on reconnect.
if rebuildCount != 3 {
t.Fatalf("expected 3 rebuild assignments for %s (all volumes), got %d",
deadServer, rebuildCount)
}
}