// Adversarial QA tests for master-server block-volume failover, registry,
// and assignment-queue behavior.
package weed_server
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
|
|
)
|
|
|
|
// ============================================================
|
|
// QA helpers
|
|
// ============================================================
|
|
|
|
// testMSForQA creates a MasterServer with full failover support for adversarial tests.
|
|
func testMSForQA(t *testing.T) *MasterServer {
|
|
t.Helper()
|
|
ms := &MasterServer{
|
|
blockRegistry: NewBlockVolumeRegistry(),
|
|
blockAssignmentQueue: NewBlockAssignmentQueue(),
|
|
blockFailover: newBlockFailoverState(),
|
|
}
|
|
ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) {
|
|
return &blockAllocResult{
|
|
Path: fmt.Sprintf("/data/%s.blk", name),
|
|
IQN: fmt.Sprintf("iqn.2024.test:%s", name),
|
|
ISCSIAddr: server + ":3260",
|
|
}, nil
|
|
}
|
|
ms.blockVSDelete = func(ctx context.Context, server string, name string) error {
|
|
return nil
|
|
}
|
|
return ms
|
|
}
|
|
|
|
// registerQAVolume creates a volume entry with optional replica, configurable lease state.
|
|
func registerQAVolume(t *testing.T, ms *MasterServer, name, primary, replica string, epoch uint64, leaseTTL time.Duration, leaseExpired bool) {
|
|
t.Helper()
|
|
entry := &BlockVolumeEntry{
|
|
Name: name,
|
|
VolumeServer: primary,
|
|
Path: fmt.Sprintf("/data/%s.blk", name),
|
|
IQN: fmt.Sprintf("iqn.2024.test:%s", name),
|
|
ISCSIAddr: primary + ":3260",
|
|
SizeBytes: 1 << 30,
|
|
Epoch: epoch,
|
|
Role: blockvol.RoleToWire(blockvol.RolePrimary),
|
|
Status: StatusActive,
|
|
LeaseTTL: leaseTTL,
|
|
}
|
|
if leaseExpired {
|
|
entry.LastLeaseGrant = time.Now().Add(-2 * leaseTTL)
|
|
} else {
|
|
entry.LastLeaseGrant = time.Now()
|
|
}
|
|
if replica != "" {
|
|
entry.ReplicaServer = replica
|
|
entry.ReplicaPath = fmt.Sprintf("/data/%s.blk", name)
|
|
entry.ReplicaIQN = fmt.Sprintf("iqn.2024.test:%s-r", name)
|
|
entry.ReplicaISCSIAddr = replica + ":3260"
|
|
// CP8-2: also populate Replicas[].
|
|
entry.Replicas = []ReplicaInfo{
|
|
{
|
|
Server: replica,
|
|
Path: fmt.Sprintf("/data/%s.blk", name),
|
|
IQN: fmt.Sprintf("iqn.2024.test:%s-r", name),
|
|
ISCSIAddr: replica + ":3260",
|
|
HealthScore: 1.0,
|
|
},
|
|
}
|
|
}
|
|
if err := ms.blockRegistry.Register(entry); err != nil {
|
|
t.Fatalf("register %s: %v", name, err)
|
|
}
|
|
}
|
|
|
|
// ============================================================
|
|
// A. Assignment Queue Adversarial
|
|
// ============================================================
|
|
|
|
func TestQA_Queue_ConfirmWrongEpoch(t *testing.T) {
|
|
q := NewBlockAssignmentQueue()
|
|
q.Enqueue("s1", mkAssign("/a.blk", 5, 1))
|
|
|
|
// Confirm with wrong epoch should NOT remove.
|
|
q.Confirm("s1", "/a.blk", 4)
|
|
if q.Pending("s1") != 1 {
|
|
t.Fatal("wrong-epoch confirm should not remove")
|
|
}
|
|
q.Confirm("s1", "/a.blk", 6)
|
|
if q.Pending("s1") != 1 {
|
|
t.Fatal("higher-epoch confirm should not remove")
|
|
}
|
|
// Correct epoch should remove.
|
|
q.Confirm("s1", "/a.blk", 5)
|
|
if q.Pending("s1") != 0 {
|
|
t.Fatal("exact-epoch confirm should remove")
|
|
}
|
|
}
|
|
|
|
func TestQA_Queue_HeartbeatPartialConfirm(t *testing.T) {
|
|
q := NewBlockAssignmentQueue()
|
|
q.Enqueue("s1", mkAssign("/a.blk", 5, 1))
|
|
q.Enqueue("s1", mkAssign("/b.blk", 3, 2))
|
|
|
|
// Heartbeat confirms only /a.blk@5, not /b.blk.
|
|
q.ConfirmFromHeartbeat("s1", []blockvol.BlockVolumeInfoMessage{
|
|
{Path: "/a.blk", Epoch: 5},
|
|
{Path: "/c.blk", Epoch: 99}, // unknown path, no effect
|
|
})
|
|
if q.Pending("s1") != 1 {
|
|
t.Fatalf("expected 1 remaining, got %d", q.Pending("s1"))
|
|
}
|
|
got := q.Peek("s1")
|
|
if got[0].Path != "/b.blk" {
|
|
t.Fatalf("wrong remaining: %v", got)
|
|
}
|
|
}
|
|
|
|
func TestQA_Queue_HeartbeatWrongEpochNoConfirm(t *testing.T) {
|
|
q := NewBlockAssignmentQueue()
|
|
q.Enqueue("s1", mkAssign("/a.blk", 5, 1))
|
|
|
|
// Heartbeat with same path but different epoch: should NOT confirm.
|
|
q.ConfirmFromHeartbeat("s1", []blockvol.BlockVolumeInfoMessage{
|
|
{Path: "/a.blk", Epoch: 4},
|
|
})
|
|
if q.Pending("s1") != 1 {
|
|
t.Fatal("wrong-epoch heartbeat should not confirm")
|
|
}
|
|
}
|
|
|
|
func TestQA_Queue_SamePathSameEpochDifferentRoles(t *testing.T) {
|
|
q := NewBlockAssignmentQueue()
|
|
// Edge case: same path+epoch but different roles (shouldn't happen in practice).
|
|
q.Enqueue("s1", blockvol.BlockVolumeAssignment{Path: "/a.blk", Epoch: 1, Role: blockvol.RoleToWire(blockvol.RolePrimary)})
|
|
q.Enqueue("s1", blockvol.BlockVolumeAssignment{Path: "/a.blk", Epoch: 1, Role: blockvol.RoleToWire(blockvol.RoleReplica)})
|
|
|
|
// Peek should NOT prune either (same epoch).
|
|
got := q.Peek("s1")
|
|
if len(got) != 2 {
|
|
t.Fatalf("expected 2 (same epoch, different roles), got %d", len(got))
|
|
}
|
|
}
|
|
|
|
func TestQA_Queue_ConfirmOnUnknownServer(t *testing.T) {
|
|
q := NewBlockAssignmentQueue()
|
|
// Confirm on a server with no queue should not panic.
|
|
q.Confirm("unknown", "/a.blk", 1)
|
|
q.ConfirmFromHeartbeat("unknown", []blockvol.BlockVolumeInfoMessage{{Path: "/a.blk", Epoch: 1}})
|
|
}
|
|
|
|
func TestQA_Queue_PeekReturnsCopy(t *testing.T) {
|
|
q := NewBlockAssignmentQueue()
|
|
q.Enqueue("s1", mkAssign("/a.blk", 1, 1))
|
|
|
|
got := q.Peek("s1")
|
|
// Mutate the returned copy.
|
|
got[0].Path = "/MUTATED"
|
|
|
|
// Original should be unchanged.
|
|
got2 := q.Peek("s1")
|
|
if got2[0].Path == "/MUTATED" {
|
|
t.Fatal("Peek should return a copy, not a reference to internal state")
|
|
}
|
|
}
|
|
|
|
func TestQA_Queue_ConcurrentEnqueueConfirmPeek(t *testing.T) {
|
|
q := NewBlockAssignmentQueue()
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < 50; i++ {
|
|
wg.Add(3)
|
|
go func(i int) {
|
|
defer wg.Done()
|
|
q.Enqueue("s1", mkAssign(fmt.Sprintf("/v%d.blk", i), uint64(i+1), 1))
|
|
}(i)
|
|
go func(i int) {
|
|
defer wg.Done()
|
|
q.Confirm("s1", fmt.Sprintf("/v%d.blk", i), uint64(i+1))
|
|
}(i)
|
|
go func() {
|
|
defer wg.Done()
|
|
q.Peek("s1")
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
// No panics, no races.
|
|
}
|
|
|
|
// ============================================================
|
|
// B. Registry Adversarial
|
|
// ============================================================
|
|
|
|
func TestQA_Reg_DoubleSwap(t *testing.T) {
|
|
r := NewBlockVolumeRegistry()
|
|
r.Register(&BlockVolumeEntry{
|
|
Name: "vol1", VolumeServer: "vs1", Path: "/data/vol1.blk",
|
|
IQN: "iqn:vol1", ISCSIAddr: "vs1:3260", SizeBytes: 1 << 30,
|
|
Epoch: 1, Role: blockvol.RoleToWire(blockvol.RolePrimary),
|
|
ReplicaServer: "vs2", ReplicaPath: "/data/vol1.blk",
|
|
ReplicaIQN: "iqn:vol1-r", ReplicaISCSIAddr: "vs2:3260",
|
|
})
|
|
|
|
// First swap: vs1->vs2, epoch 2.
|
|
ep1, err := r.SwapPrimaryReplica("vol1")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if ep1 != 2 {
|
|
t.Fatalf("first swap epoch: got %d, want 2", ep1)
|
|
}
|
|
|
|
e, _ := r.Lookup("vol1")
|
|
if e.VolumeServer != "vs2" || e.ReplicaServer != "vs1" {
|
|
t.Fatalf("after first swap: primary=%s replica=%s", e.VolumeServer, e.ReplicaServer)
|
|
}
|
|
|
|
// Second swap: vs2->vs1, epoch 3.
|
|
ep2, err := r.SwapPrimaryReplica("vol1")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if ep2 != 3 {
|
|
t.Fatalf("second swap epoch: got %d, want 3", ep2)
|
|
}
|
|
|
|
e, _ = r.Lookup("vol1")
|
|
if e.VolumeServer != "vs1" || e.ReplicaServer != "vs2" {
|
|
t.Fatalf("after double swap: primary=%s replica=%s (should be back to original)", e.VolumeServer, e.ReplicaServer)
|
|
}
|
|
}
|
|
|
|
func TestQA_Reg_SwapNoReplica(t *testing.T) {
|
|
r := NewBlockVolumeRegistry()
|
|
r.Register(&BlockVolumeEntry{
|
|
Name: "vol1", VolumeServer: "vs1", Path: "/data/vol1.blk",
|
|
Epoch: 1, Role: blockvol.RoleToWire(blockvol.RolePrimary),
|
|
})
|
|
|
|
_, err := r.SwapPrimaryReplica("vol1")
|
|
if err == nil {
|
|
t.Fatal("swap with no replica should error")
|
|
}
|
|
}
|
|
|
|
func TestQA_Reg_SwapNotFound(t *testing.T) {
|
|
r := NewBlockVolumeRegistry()
|
|
_, err := r.SwapPrimaryReplica("nonexistent")
|
|
if err == nil {
|
|
t.Fatal("swap nonexistent should error")
|
|
}
|
|
}
|
|
|
|
func TestQA_Reg_ConcurrentSwapAndLookup(t *testing.T) {
|
|
r := NewBlockVolumeRegistry()
|
|
r.Register(&BlockVolumeEntry{
|
|
Name: "vol1", VolumeServer: "vs1", Path: "/data/vol1.blk",
|
|
IQN: "iqn:vol1", ISCSIAddr: "vs1:3260", Epoch: 1,
|
|
Role: blockvol.RoleToWire(blockvol.RolePrimary),
|
|
ReplicaServer: "vs2", ReplicaPath: "/data/vol1.blk",
|
|
ReplicaIQN: "iqn:vol1-r", ReplicaISCSIAddr: "vs2:3260",
|
|
})
|
|
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < 50; i++ {
|
|
wg.Add(2)
|
|
go func() {
|
|
defer wg.Done()
|
|
r.SwapPrimaryReplica("vol1")
|
|
}()
|
|
go func() {
|
|
defer wg.Done()
|
|
r.Lookup("vol1")
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
// No panics or races.
|
|
}
|
|
|
|
func TestQA_Reg_SetReplicaTwice_ReplacesOld(t *testing.T) {
|
|
r := NewBlockVolumeRegistry()
|
|
r.Register(&BlockVolumeEntry{
|
|
Name: "vol1", VolumeServer: "vs1", Path: "/data/vol1.blk",
|
|
Epoch: 1, Role: blockvol.RoleToWire(blockvol.RolePrimary),
|
|
})
|
|
|
|
// Set replica to vs2.
|
|
r.SetReplica("vol1", "vs2", "/data/vol1.blk", "vs2:3260", "iqn:vol1-r")
|
|
// Replace with vs3.
|
|
r.SetReplica("vol1", "vs3", "/data/vol1.blk", "vs3:3260", "iqn:vol1-r2")
|
|
|
|
e, _ := r.Lookup("vol1")
|
|
if e.ReplicaServer != "vs3" {
|
|
t.Fatalf("replica should be vs3, got %s", e.ReplicaServer)
|
|
}
|
|
|
|
// vs3 should be in byServer index.
|
|
entries := r.ListByServer("vs3")
|
|
if len(entries) != 1 {
|
|
t.Fatalf("vs3 should have 1 entry, got %d", len(entries))
|
|
}
|
|
|
|
// BUG CHECK: vs2 should be removed from byServer when replaced.
|
|
// SetReplica doesn't remove the old replica server from byServer.
|
|
entries2 := r.ListByServer("vs2")
|
|
if len(entries2) != 0 {
|
|
t.Fatalf("BUG: vs2 still in byServer after replica replaced (got %d entries)", len(entries2))
|
|
}
|
|
}
|
|
|
|
func TestQA_Reg_FullHeartbeatDoesNotClobberReplicaServer(t *testing.T) {
|
|
r := NewBlockVolumeRegistry()
|
|
r.Register(&BlockVolumeEntry{
|
|
Name: "vol1", VolumeServer: "vs1", Path: "/data/vol1.blk",
|
|
Epoch: 1, Role: blockvol.RoleToWire(blockvol.RolePrimary),
|
|
Status: StatusPending,
|
|
ReplicaServer: "vs2", ReplicaPath: "/data/vol1.blk",
|
|
})
|
|
|
|
// Full heartbeat from vs1 — should NOT clear replica info.
|
|
r.UpdateFullHeartbeat("vs1", []*master_pb.BlockVolumeInfoMessage{
|
|
{Path: "/data/vol1.blk", Epoch: 1, Role: blockvol.RoleToWire(blockvol.RolePrimary), VolumeSize: 1 << 30},
|
|
})
|
|
|
|
e, _ := r.Lookup("vol1")
|
|
if e.ReplicaServer != "vs2" {
|
|
t.Fatalf("full heartbeat clobbered ReplicaServer: got %q, want vs2", e.ReplicaServer)
|
|
}
|
|
}
|
|
|
|
func TestQA_Reg_ListByServerIncludesBothPrimaryAndReplica(t *testing.T) {
|
|
r := NewBlockVolumeRegistry()
|
|
r.Register(&BlockVolumeEntry{
|
|
Name: "vol1", VolumeServer: "vs1", Path: "/data/vol1.blk",
|
|
Epoch: 1, Role: blockvol.RoleToWire(blockvol.RolePrimary),
|
|
})
|
|
r.SetReplica("vol1", "vs2", "/data/vol1.blk", "", "")
|
|
|
|
// ListByServer should return vol1 for BOTH vs1 and vs2.
|
|
for _, server := range []string{"vs1", "vs2"} {
|
|
entries := r.ListByServer(server)
|
|
if len(entries) != 1 || entries[0].Name != "vol1" {
|
|
t.Fatalf("ListByServer(%q) should return vol1, got %d entries", server, len(entries))
|
|
}
|
|
}
|
|
}
|
|
|
|
// ============================================================
|
|
// C. Failover Adversarial
|
|
// ============================================================
|
|
|
|
// TestQA_Failover_DeferredCancelledOnReconnect: a primary disconnect while the
// lease is still live schedules a deferred promotion; reconnecting before the
// timer fires must cancel it, so the replica is never promoted.
// NOTE(review): this test sleeps past the 500ms lease TTL, so it is wall-clock
// sensitive — a heavily loaded CI host could in principle perturb it.
func TestQA_Failover_DeferredCancelledOnReconnect(t *testing.T) {
	ms := testMSForQA(t)
	registerQAVolume(t, ms, "vol1", "vs1", "vs2", 1, 500*time.Millisecond, false) // lease NOT expired

	// Disconnect vs1 — deferred promotion scheduled.
	ms.failoverBlockVolumes("vs1")

	// vs1 should still be primary (lease not expired).
	e, _ := ms.blockRegistry.Lookup("vol1")
	if e.VolumeServer != "vs1" {
		t.Fatalf("premature promotion: primary=%s", e.VolumeServer)
	}

	// vs1 reconnects before timer fires.
	ms.recoverBlockVolumes("vs1")

	// Wait well past the original lease expiry.
	time.Sleep(800 * time.Millisecond)

	// Promotion should NOT have happened (timer was cancelled).
	e, _ = ms.blockRegistry.Lookup("vol1")
	if e.VolumeServer != "vs1" {
		t.Fatalf("BUG: promotion happened after reconnect (primary=%s, want vs1)", e.VolumeServer)
	}
}
|
|
|
|
func TestQA_Failover_DoubleDisconnect_NoPanic(t *testing.T) {
|
|
ms := testMSForQA(t)
|
|
registerQAVolume(t, ms, "vol1", "vs1", "vs2", 1, 5*time.Second, true)
|
|
|
|
ms.failoverBlockVolumes("vs1")
|
|
// Second failover for same server after promotion — should not panic.
|
|
ms.failoverBlockVolumes("vs1")
|
|
}
|
|
|
|
// TestQA_Failover_PromoteIdempotent_NoReplicaAfterFirstSwap: after the first
// promotion the old primary is NOT re-added as a replica (CP8-2), so it must
// reconnect first before a second failover can swap leadership back to it.
func TestQA_Failover_PromoteIdempotent_NoReplicaAfterFirstSwap(t *testing.T) {
	ms := testMSForQA(t)
	registerQAVolume(t, ms, "vol1", "vs1", "vs2", 1, 5*time.Second, true)

	ms.failoverBlockVolumes("vs1") // promotes vs2

	// CP8-2: PromoteBestReplica does NOT add old primary back as replica.
	// Reconnect vs1 first so it becomes a replica.
	ms.recoverBlockVolumes("vs1")

	// NOTE(review): this mutation assumes Lookup returns a pointer into the
	// registry's live entry (the same pattern is used elsewhere in this file) —
	// confirm against the registry implementation.
	e, _ := ms.blockRegistry.Lookup("vol1")
	e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) // expire the new lease
	ms.failoverBlockVolumes("vs2")

	e, _ = ms.blockRegistry.Lookup("vol1")
	// After double failover: should swap back to vs1 as primary.
	if e.VolumeServer != "vs1" {
		t.Fatalf("double failover: primary=%s, want vs1", e.VolumeServer)
	}
	if e.Epoch != 3 {
		t.Fatalf("double failover: epoch=%d, want 3", e.Epoch)
	}
}
|
|
|
|
// TestQA_Failover_MixedLeaseStates: one disconnect covering two volumes in
// different lease states — the expired lease promotes immediately, while the
// live lease promotes only after its deferred timer fires.
func TestQA_Failover_MixedLeaseStates(t *testing.T) {
	ms := testMSForQA(t)
	// vol1: lease expired (immediate promotion).
	registerQAVolume(t, ms, "vol1", "vs1", "vs2", 1, 5*time.Second, true)
	// vol2: lease NOT expired (deferred).
	registerQAVolume(t, ms, "vol2", "vs1", "vs3", 2, 500*time.Millisecond, false)

	ms.failoverBlockVolumes("vs1")

	// vol1: immediately promoted.
	e1, _ := ms.blockRegistry.Lookup("vol1")
	if e1.VolumeServer != "vs2" {
		t.Fatalf("vol1: expected immediate promotion, got primary=%s", e1.VolumeServer)
	}

	// vol2: NOT yet promoted.
	e2, _ := ms.blockRegistry.Lookup("vol2")
	if e2.VolumeServer != "vs1" {
		t.Fatalf("vol2: premature promotion, got primary=%s", e2.VolumeServer)
	}

	// Wait for vol2's deferred timer (700ms > 500ms TTL; wall-clock sensitive).
	time.Sleep(700 * time.Millisecond)
	e2, _ = ms.blockRegistry.Lookup("vol2")
	if e2.VolumeServer != "vs3" {
		t.Fatalf("vol2: deferred promotion failed, got primary=%s", e2.VolumeServer)
	}
}
|
|
|
|
func TestQA_Failover_NoRegistryNoPanic(t *testing.T) {
|
|
ms := &MasterServer{} // no registry
|
|
ms.failoverBlockVolumes("vs1")
|
|
// Should not panic.
|
|
}
|
|
|
|
func TestQA_Failover_VolumeDeletedDuringDeferredTimer(t *testing.T) {
|
|
ms := testMSForQA(t)
|
|
registerQAVolume(t, ms, "vol1", "vs1", "vs2", 1, 200*time.Millisecond, false)
|
|
|
|
ms.failoverBlockVolumes("vs1")
|
|
|
|
// Delete the volume while timer is pending.
|
|
ms.blockRegistry.Unregister("vol1")
|
|
|
|
// Wait for timer to fire.
|
|
time.Sleep(400 * time.Millisecond)
|
|
|
|
// promoteReplica should gracefully handle missing volume (no panic).
|
|
_, ok := ms.blockRegistry.Lookup("vol1")
|
|
if ok {
|
|
t.Fatal("volume should have been deleted")
|
|
}
|
|
}
|
|
|
|
func TestQA_Failover_ConcurrentFailoverDifferentServers(t *testing.T) {
|
|
ms := testMSForQA(t)
|
|
// vol1: primary=vs1, replica=vs2
|
|
registerQAVolume(t, ms, "vol1", "vs1", "vs2", 1, 5*time.Second, true)
|
|
// vol2: primary=vs3, replica=vs4
|
|
registerQAVolume(t, ms, "vol2", "vs3", "vs4", 1, 5*time.Second, true)
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(2)
|
|
go func() { defer wg.Done(); ms.failoverBlockVolumes("vs1") }()
|
|
go func() { defer wg.Done(); ms.failoverBlockVolumes("vs3") }()
|
|
wg.Wait()
|
|
|
|
e1, _ := ms.blockRegistry.Lookup("vol1")
|
|
if e1.VolumeServer != "vs2" {
|
|
t.Fatalf("vol1: primary=%s, want vs2", e1.VolumeServer)
|
|
}
|
|
e2, _ := ms.blockRegistry.Lookup("vol2")
|
|
if e2.VolumeServer != "vs4" {
|
|
t.Fatalf("vol2: primary=%s, want vs4", e2.VolumeServer)
|
|
}
|
|
}
|
|
|
|
// ============================================================
|
|
// D. CreateBlockVolume + Failover Adversarial
|
|
// ============================================================
|
|
|
|
func TestQA_Create_LeaseNonZero_ImmediateFailoverSafe(t *testing.T) {
|
|
ms := testMSForQA(t)
|
|
ms.blockFailover = newBlockFailoverState()
|
|
ms.blockRegistry.MarkBlockCapable("vs1")
|
|
ms.blockRegistry.MarkBlockCapable("vs2")
|
|
|
|
// Create volume.
|
|
resp, err := ms.CreateBlockVolume(context.Background(), &master_pb.CreateBlockVolumeRequest{
|
|
Name: "vol1", SizeBytes: 1 << 30,
|
|
})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Immediately failover the primary.
|
|
entry, _ := ms.blockRegistry.Lookup("vol1")
|
|
if entry.LastLeaseGrant.IsZero() {
|
|
t.Fatal("BUG: LastLeaseGrant is zero after Create (F1 regression)")
|
|
}
|
|
|
|
// Verify that lease is recent (within last second).
|
|
if time.Since(entry.LastLeaseGrant) > 1*time.Second {
|
|
t.Fatalf("LastLeaseGrant too old: %v", entry.LastLeaseGrant)
|
|
}
|
|
|
|
_ = resp
|
|
}
|
|
|
|
func TestQA_Create_ReplicaDeleteOnVolDelete(t *testing.T) {
|
|
ms := testMSForQA(t)
|
|
ms.blockFailover = newBlockFailoverState()
|
|
ms.blockRegistry.MarkBlockCapable("vs1")
|
|
ms.blockRegistry.MarkBlockCapable("vs2")
|
|
|
|
var deleteCalls sync.Map // server -> count
|
|
|
|
ms.blockVSDelete = func(ctx context.Context, server string, name string) error {
|
|
v, _ := deleteCalls.LoadOrStore(server, new(atomic.Int32))
|
|
v.(*atomic.Int32).Add(1)
|
|
return nil
|
|
}
|
|
|
|
ms.CreateBlockVolume(context.Background(), &master_pb.CreateBlockVolumeRequest{
|
|
Name: "vol1", SizeBytes: 1 << 30,
|
|
})
|
|
|
|
entry, _ := ms.blockRegistry.Lookup("vol1")
|
|
hasReplica := entry.ReplicaServer != ""
|
|
|
|
// Delete volume.
|
|
ms.DeleteBlockVolume(context.Background(), &master_pb.DeleteBlockVolumeRequest{Name: "vol1"})
|
|
|
|
// Verify primary delete was called.
|
|
v, ok := deleteCalls.Load(entry.VolumeServer)
|
|
if !ok || v.(*atomic.Int32).Load() != 1 {
|
|
t.Fatal("primary delete not called")
|
|
}
|
|
|
|
// If replica existed, verify replica delete was also called (F4 regression).
|
|
if hasReplica {
|
|
v, ok := deleteCalls.Load(entry.ReplicaServer)
|
|
if !ok || v.(*atomic.Int32).Load() != 1 {
|
|
t.Fatal("BUG: replica delete not called (F4 regression)")
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestQA_Create_ReplicaDeleteFailure_PrimaryStillDeleted(t *testing.T) {
|
|
ms := testMSForQA(t)
|
|
ms.blockFailover = newBlockFailoverState()
|
|
ms.blockRegistry.MarkBlockCapable("vs1")
|
|
ms.blockRegistry.MarkBlockCapable("vs2")
|
|
|
|
ms.CreateBlockVolume(context.Background(), &master_pb.CreateBlockVolumeRequest{
|
|
Name: "vol1", SizeBytes: 1 << 30,
|
|
})
|
|
|
|
// Find the replica server and make its delete fail.
|
|
entry, _ := ms.blockRegistry.Lookup("vol1")
|
|
replicaServer := entry.ReplicaServer
|
|
ms.blockVSDelete = func(ctx context.Context, server string, name string) error {
|
|
if server == replicaServer {
|
|
return fmt.Errorf("replica down")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Delete should succeed even if replica delete fails (best-effort).
|
|
_, err := ms.DeleteBlockVolume(context.Background(), &master_pb.DeleteBlockVolumeRequest{Name: "vol1"})
|
|
if err != nil {
|
|
t.Fatalf("delete should succeed despite replica failure: %v", err)
|
|
}
|
|
|
|
// Volume should be unregistered.
|
|
_, ok := ms.blockRegistry.Lookup("vol1")
|
|
if ok {
|
|
t.Fatal("volume should be unregistered after delete")
|
|
}
|
|
}
|
|
|
|
// ============================================================
|
|
// E. Rebuild Adversarial
|
|
// ============================================================
|
|
|
|
func TestQA_Rebuild_DoubleReconnect_NoDuplicateAssignments(t *testing.T) {
|
|
ms := testMSForQA(t)
|
|
registerQAVolume(t, ms, "vol1", "vs1", "vs2", 1, 5*time.Second, true)
|
|
|
|
ms.failoverBlockVolumes("vs1")
|
|
|
|
// First reconnect.
|
|
ms.recoverBlockVolumes("vs1")
|
|
pending1 := ms.blockAssignmentQueue.Pending("vs1")
|
|
|
|
// Second reconnect — should NOT add duplicate rebuild assignments.
|
|
ms.recoverBlockVolumes("vs1")
|
|
pending2 := ms.blockAssignmentQueue.Pending("vs1")
|
|
|
|
if pending2 != pending1 {
|
|
t.Fatalf("double reconnect added duplicate assignments: %d -> %d", pending1, pending2)
|
|
}
|
|
}
|
|
|
|
func TestQA_Rebuild_RecoverNilFailoverState(t *testing.T) {
|
|
ms := &MasterServer{
|
|
blockRegistry: NewBlockVolumeRegistry(),
|
|
blockAssignmentQueue: NewBlockAssignmentQueue(),
|
|
blockFailover: nil, // nil
|
|
}
|
|
// Should not panic.
|
|
ms.recoverBlockVolumes("vs1")
|
|
ms.drainPendingRebuilds("vs1")
|
|
ms.recordPendingRebuild("vs1", pendingRebuild{})
|
|
}
|
|
|
|
func TestQA_Rebuild_FullCycle_CreateFailoverRecoverRebuild(t *testing.T) {
|
|
ms := testMSForQA(t)
|
|
ms.blockRegistry.MarkBlockCapable("vs1")
|
|
ms.blockRegistry.MarkBlockCapable("vs2")
|
|
|
|
// Create volume.
|
|
resp, err := ms.CreateBlockVolume(context.Background(), &master_pb.CreateBlockVolumeRequest{
|
|
Name: "vol1", SizeBytes: 1 << 30,
|
|
})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
primary := resp.VolumeServer
|
|
replica := resp.ReplicaServer
|
|
if replica == "" {
|
|
t.Skip("no replica created (single server)")
|
|
}
|
|
|
|
// Expire lease.
|
|
entry, _ := ms.blockRegistry.Lookup("vol1")
|
|
entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute)
|
|
|
|
// Primary disconnects.
|
|
ms.failoverBlockVolumes(primary)
|
|
|
|
// Verify promotion.
|
|
entry, _ = ms.blockRegistry.Lookup("vol1")
|
|
if entry.VolumeServer != replica {
|
|
t.Fatalf("expected promotion to %s, got %s", replica, entry.VolumeServer)
|
|
}
|
|
if entry.Epoch != 2 {
|
|
t.Fatalf("expected epoch 2, got %d", entry.Epoch)
|
|
}
|
|
|
|
// Old primary reconnects.
|
|
ms.recoverBlockVolumes(primary)
|
|
|
|
// Verify rebuild assignment for old primary.
|
|
assignments := ms.blockAssignmentQueue.Peek(primary)
|
|
foundRebuild := false
|
|
for _, a := range assignments {
|
|
if blockvol.RoleFromWire(a.Role) == blockvol.RoleRebuilding {
|
|
foundRebuild = true
|
|
if a.Epoch != entry.Epoch {
|
|
t.Fatalf("rebuild epoch: got %d, want %d", a.Epoch, entry.Epoch)
|
|
}
|
|
}
|
|
}
|
|
if !foundRebuild {
|
|
t.Fatal("no rebuild assignment found for reconnected server")
|
|
}
|
|
|
|
// Verify registry: old primary is now the replica.
|
|
entry, _ = ms.blockRegistry.Lookup("vol1")
|
|
if entry.ReplicaServer != primary {
|
|
t.Fatalf("old primary should be replica, got %s", entry.ReplicaServer)
|
|
}
|
|
}
|
|
|
|
// ============================================================
|
|
// F. Queue + Failover Integration
|
|
// ============================================================
|
|
|
|
func TestQA_FailoverEnqueuesNewPrimaryAssignment(t *testing.T) {
|
|
ms := testMSForQA(t)
|
|
registerQAVolume(t, ms, "vol1", "vs1", "vs2", 5, 5*time.Second, true)
|
|
|
|
ms.failoverBlockVolumes("vs1")
|
|
|
|
// vs2 (new primary) should have an assignment with epoch=6, role=Primary.
|
|
assignments := ms.blockAssignmentQueue.Peek("vs2")
|
|
found := false
|
|
for _, a := range assignments {
|
|
if a.Epoch == 6 && blockvol.RoleFromWire(a.Role) == blockvol.RolePrimary {
|
|
found = true
|
|
if a.LeaseTtlMs == 0 {
|
|
t.Fatal("assignment should have non-zero LeaseTtlMs")
|
|
}
|
|
}
|
|
}
|
|
if !found {
|
|
t.Fatalf("expected Primary assignment with epoch=6 for vs2, got: %+v", assignments)
|
|
}
|
|
}
|
|
|
|
func TestQA_HeartbeatConfirmsFailoverAssignment(t *testing.T) {
|
|
ms := testMSForQA(t)
|
|
registerQAVolume(t, ms, "vol1", "vs1", "vs2", 1, 5*time.Second, true)
|
|
|
|
ms.failoverBlockVolumes("vs1")
|
|
|
|
// Simulate vs2 heartbeat confirming the promotion.
|
|
entry, _ := ms.blockRegistry.Lookup("vol1")
|
|
ms.blockAssignmentQueue.ConfirmFromHeartbeat("vs2", []blockvol.BlockVolumeInfoMessage{
|
|
{Path: entry.Path, Epoch: entry.Epoch},
|
|
})
|
|
|
|
if ms.blockAssignmentQueue.Pending("vs2") != 0 {
|
|
t.Fatal("heartbeat should have confirmed the failover assignment")
|
|
}
|
|
}
|
|
|
|
// ============================================================
|
|
// G. Edge Cases
|
|
// ============================================================
|
|
|
|
func TestQA_SwapEpochMonotonicallyIncreasing(t *testing.T) {
|
|
r := NewBlockVolumeRegistry()
|
|
r.Register(&BlockVolumeEntry{
|
|
Name: "vol1", VolumeServer: "vs1", Path: "/p1", IQN: "iqn1", ISCSIAddr: "vs1:3260",
|
|
Epoch: 100, Role: blockvol.RoleToWire(blockvol.RolePrimary),
|
|
ReplicaServer: "vs2", ReplicaPath: "/p2", ReplicaIQN: "iqn2", ReplicaISCSIAddr: "vs2:3260",
|
|
})
|
|
|
|
var prevEpoch uint64 = 100
|
|
for i := 0; i < 10; i++ {
|
|
ep, err := r.SwapPrimaryReplica("vol1")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if ep <= prevEpoch {
|
|
t.Fatalf("swap %d: epoch %d not > previous %d", i, ep, prevEpoch)
|
|
}
|
|
prevEpoch = ep
|
|
}
|
|
}
|
|
|
|
func TestQA_CancelDeferredTimers_NoPendingRebuilds(t *testing.T) {
|
|
ms := testMSForQA(t)
|
|
// Cancel with no timers — should not panic.
|
|
ms.cancelDeferredTimers("vs1")
|
|
}
|
|
|
|
func TestQA_Failover_ReplicaServerDies_PrimaryUntouched(t *testing.T) {
|
|
ms := testMSForQA(t)
|
|
registerQAVolume(t, ms, "vol1", "vs1", "vs2", 1, 5*time.Second, true)
|
|
|
|
// vs2 is the REPLICA, not primary. Failover should not promote.
|
|
ms.failoverBlockVolumes("vs2")
|
|
|
|
e, _ := ms.blockRegistry.Lookup("vol1")
|
|
if e.VolumeServer != "vs1" {
|
|
t.Fatalf("primary should remain vs1, got %s", e.VolumeServer)
|
|
}
|
|
if e.Epoch != 1 {
|
|
t.Fatalf("epoch should remain 1, got %d", e.Epoch)
|
|
}
|
|
}
|
|
|
|
func TestQA_Queue_EnqueueBatchEmpty(t *testing.T) {
|
|
q := NewBlockAssignmentQueue()
|
|
q.EnqueueBatch("s1", nil)
|
|
q.EnqueueBatch("s1", []blockvol.BlockVolumeAssignment{})
|
|
if q.Pending("s1") != 0 {
|
|
t.Fatal("empty batch should not add anything")
|
|
}
|
|
}
|