package weed_server
import (
"fmt"
"sync"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
)
// TestRegistry_RegisterLookup verifies that a registered entry can be
// found via Lookup and that its fields round-trip intact.
func TestRegistry_RegisterLookup(t *testing.T) {
	reg := NewBlockVolumeRegistry()
	want := &BlockVolumeEntry{
		Name:         "vol1",
		VolumeServer: "server1:9333",
		Path:         "/data/vol1.blk",
		IQN:          "iqn.2024.com.seaweedfs:vol1",
		ISCSIAddr:    "10.0.0.1:3260",
		SizeBytes:    1 << 30,
		Epoch:        1,
		Role:         1,
		Status:       StatusPending,
	}
	if err := reg.Register(want); err != nil {
		t.Fatalf("Register: %v", err)
	}
	got, ok := reg.Lookup("vol1")
	if !ok {
		t.Fatal("Lookup: not found")
	}
	if got.Name != "vol1" || got.VolumeServer != "server1:9333" || got.Path != "/data/vol1.blk" {
		t.Fatalf("Lookup: unexpected entry: %+v", got)
	}
	if got.Status != StatusPending {
		t.Fatalf("Status: got %d, want %d", got.Status, StatusPending)
	}
}
// TestRegistry_Unregister verifies that Unregister returns the removed
// entry, the entry disappears from Lookup, and a repeated Unregister of
// the same name is a nil no-op.
func TestRegistry_Unregister(t *testing.T) {
	r := NewBlockVolumeRegistry()
	// Check the setup error so a failed precondition doesn't surface as
	// a confusing assertion failure below.
	if err := r.Register(&BlockVolumeEntry{Name: "vol1", VolumeServer: "s1", Path: "/vol1.blk"}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	removed := r.Unregister("vol1")
	if removed == nil {
		t.Fatal("Unregister returned nil")
	}
	if _, ok := r.Lookup("vol1"); ok {
		t.Fatal("vol1 should not be found after Unregister")
	}
	// Double unregister returns nil.
	if r.Unregister("vol1") != nil {
		t.Fatal("double Unregister should return nil")
	}
}
// TestRegistry_DuplicateRegister verifies that registering the same
// volume name twice is rejected.
func TestRegistry_DuplicateRegister(t *testing.T) {
	r := NewBlockVolumeRegistry()
	// The first Register must succeed for the duplicate check to be meaningful.
	if err := r.Register(&BlockVolumeEntry{Name: "vol1", VolumeServer: "s1", Path: "/vol1.blk"}); err != nil {
		t.Fatalf("first Register: %v", err)
	}
	err := r.Register(&BlockVolumeEntry{Name: "vol1", VolumeServer: "s2", Path: "/vol1.blk"})
	if err == nil {
		t.Fatal("duplicate Register should return error")
	}
}
// TestRegistry_ListByServer verifies that volumes are indexed by their
// hosting server, including the empty result for an unknown server.
func TestRegistry_ListByServer(t *testing.T) {
	r := NewBlockVolumeRegistry()
	for _, e := range []*BlockVolumeEntry{
		{Name: "vol1", VolumeServer: "s1", Path: "/v1.blk"},
		{Name: "vol2", VolumeServer: "s1", Path: "/v2.blk"},
		{Name: "vol3", VolumeServer: "s2", Path: "/v3.blk"},
	} {
		if err := r.Register(e); err != nil {
			t.Fatalf("Register(%s): %v", e.Name, err)
		}
	}
	// Table-driven: expected volume count per server.
	for _, tc := range []struct {
		server string
		want   int
	}{{"s1", 2}, {"s2", 1}, {"s3", 0}} {
		if got := len(r.ListByServer(tc.server)); got != tc.want {
			t.Fatalf("ListByServer(%s): got %d, want %d", tc.server, got, tc.want)
		}
	}
}
// TestRegistry_UpdateFullHeartbeat verifies that a full heartbeat
// activates reported volumes and removes entries the server no longer
// reports (stale entries).
func TestRegistry_UpdateFullHeartbeat(t *testing.T) {
	r := NewBlockVolumeRegistry()
	// Register two volumes on server s1.
	for _, e := range []*BlockVolumeEntry{
		{Name: "vol1", VolumeServer: "s1", Path: "/v1.blk", Status: StatusPending},
		{Name: "vol2", VolumeServer: "s1", Path: "/v2.blk", Status: StatusPending},
	} {
		if err := r.Register(e); err != nil {
			t.Fatalf("Register(%s): %v", e.Name, err)
		}
	}
	// Full heartbeat reports only vol1 (vol2 is stale).
	r.UpdateFullHeartbeat("s1", []*master_pb.BlockVolumeInfoMessage{
		{Path: "/v1.blk", Epoch: 5, Role: 1},
	})
	// vol1 should be Active.
	e1, ok := r.Lookup("vol1")
	if !ok {
		t.Fatal("vol1 should exist after full heartbeat")
	}
	if e1.Status != StatusActive {
		t.Fatalf("vol1 status: got %d, want %d", e1.Status, StatusActive)
	}
	if e1.Epoch != 5 {
		t.Fatalf("vol1 epoch: got %d, want 5", e1.Epoch)
	}
	// vol2 should be removed (stale).
	if _, ok := r.Lookup("vol2"); ok {
		t.Fatal("vol2 should have been removed as stale")
	}
}
// TestRegistry_UpdateDeltaHeartbeat verifies that a delta heartbeat
// activates newly-reported volumes and removes deleted ones.
func TestRegistry_UpdateDeltaHeartbeat(t *testing.T) {
	r := NewBlockVolumeRegistry()
	for _, e := range []*BlockVolumeEntry{
		{Name: "vol1", VolumeServer: "s1", Path: "/v1.blk", Status: StatusPending},
		{Name: "vol2", VolumeServer: "s1", Path: "/v2.blk", Status: StatusActive},
	} {
		if err := r.Register(e); err != nil {
			t.Fatalf("Register(%s): %v", e.Name, err)
		}
	}
	// Delta: vol1 newly appeared, vol2 deleted.
	r.UpdateDeltaHeartbeat("s1",
		[]*master_pb.BlockVolumeShortInfoMessage{{Path: "/v1.blk"}},
		[]*master_pb.BlockVolumeShortInfoMessage{{Path: "/v2.blk"}},
	)
	// vol1 should be Active.
	e1, ok := r.Lookup("vol1")
	if !ok {
		t.Fatal("vol1 should exist")
	}
	if e1.Status != StatusActive {
		t.Fatalf("vol1 status: got %d, want Active", e1.Status)
	}
	// vol2 should be removed.
	if _, ok := r.Lookup("vol2"); ok {
		t.Fatal("vol2 should have been removed by delta")
	}
}
// TestRegistry_PendingToActive verifies that a pending volume becomes
// Active once a full heartbeat confirms it.
func TestRegistry_PendingToActive(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{
		Name: "vol1", VolumeServer: "s1", Path: "/v1.blk",
		Status: StatusPending, Epoch: 1,
	}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	// Full heartbeat confirms the volume.
	r.UpdateFullHeartbeat("s1", []*master_pb.BlockVolumeInfoMessage{
		{Path: "/v1.blk", Epoch: 1, Role: 1},
	})
	e, _ := r.Lookup("vol1")
	if e.Status != StatusActive {
		t.Fatalf("expected Active after heartbeat, got %d", e.Status)
	}
}
// TestRegistry_PickServer verifies that PickServer chooses the
// candidate hosting the fewest volumes.
func TestRegistry_PickServer(t *testing.T) {
	r := NewBlockVolumeRegistry()
	// s1 has 2 volumes, s2 has 1, s3 has 0.
	for _, e := range []*BlockVolumeEntry{
		{Name: "v1", VolumeServer: "s1", Path: "/v1.blk"},
		{Name: "v2", VolumeServer: "s1", Path: "/v2.blk"},
		{Name: "v3", VolumeServer: "s2", Path: "/v3.blk"},
	} {
		if err := r.Register(e); err != nil {
			t.Fatalf("Register(%s): %v", e.Name, err)
		}
	}
	got, err := r.PickServer([]string{"s1", "s2", "s3"})
	if err != nil {
		t.Fatalf("PickServer: %v", err)
	}
	if got != "s3" {
		t.Fatalf("PickServer: got %q, want s3 (fewest volumes)", got)
	}
}
// TestRegistry_PickServerEmpty verifies that PickServer fails when the
// candidate list is empty.
func TestRegistry_PickServerEmpty(t *testing.T) {
	reg := NewBlockVolumeRegistry()
	if _, err := reg.PickServer(nil); err == nil {
		t.Fatal("PickServer with no servers should return error")
	}
}
// TestRegistry_InflightLock verifies per-name inflight mutual exclusion:
// only one holder per name at a time, and a released name can be
// re-acquired.
func TestRegistry_InflightLock(t *testing.T) {
	reg := NewBlockVolumeRegistry()
	// First acquire succeeds.
	if ok := reg.AcquireInflight("vol1"); !ok {
		t.Fatal("first AcquireInflight should succeed")
	}
	// Second acquire for same name fails.
	if ok := reg.AcquireInflight("vol1"); ok {
		t.Fatal("second AcquireInflight for same name should fail")
	}
	// Different name succeeds.
	if ok := reg.AcquireInflight("vol2"); !ok {
		t.Fatal("AcquireInflight for different name should succeed")
	}
	// Release and re-acquire.
	reg.ReleaseInflight("vol1")
	if ok := reg.AcquireInflight("vol1"); !ok {
		t.Fatal("AcquireInflight after release should succeed")
	}
	reg.ReleaseInflight("vol1")
	reg.ReleaseInflight("vol2")
}
// TestRegistry_UnmarkDeadServer verifies that unmarking a block-capable
// server removes it from the capable-server set.
func TestRegistry_UnmarkDeadServer(t *testing.T) {
	reg := NewBlockVolumeRegistry()
	reg.MarkBlockCapable("s1")
	reg.MarkBlockCapable("s2")
	if got := reg.BlockCapableServers(); len(got) != 2 {
		t.Fatalf("expected 2 servers, got %d", len(got))
	}
	// Simulate s1 disconnect.
	reg.UnmarkBlockCapable("s1")
	remaining := reg.BlockCapableServers()
	if len(remaining) != 1 {
		t.Fatalf("expected 1 server after unmark, got %d", len(remaining))
	}
	if remaining[0] != "s2" {
		t.Fatalf("expected s2, got %s", remaining[0])
	}
}
// TestRegistry_FullHeartbeatUpdatesSizeBytes verifies that a heartbeat
// reporting a larger VolumeSize (online resize) updates SizeBytes.
func TestRegistry_FullHeartbeatUpdatesSizeBytes(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{
		Name: "vol1", VolumeServer: "s1", Path: "/v1.blk",
		SizeBytes: 1 << 30, Status: StatusPending,
	}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	// Heartbeat with updated size (online resize).
	r.UpdateFullHeartbeat("s1", []*master_pb.BlockVolumeInfoMessage{
		{Path: "/v1.blk", VolumeSize: 2 << 30, Epoch: 1, Role: 1},
	})
	e, _ := r.Lookup("vol1")
	if e.SizeBytes != 2<<30 {
		t.Fatalf("SizeBytes: got %d, want %d", e.SizeBytes, 2<<30)
	}
}
// TestRegistry_ConcurrentAccess exercises Register/Lookup/Unregister
// from many goroutines to surface data races (run with -race).
func TestRegistry_ConcurrentAccess(t *testing.T) {
	r := NewBlockVolumeRegistry()
	var wg sync.WaitGroup
	n := 50
	// Concurrent register.
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			name := fmt.Sprintf("vol%d", i)
			// t.Errorf (not Fatalf) is safe to call from a non-test goroutine.
			if err := r.Register(&BlockVolumeEntry{
				Name: name, VolumeServer: "s1",
				Path: fmt.Sprintf("/v%d.blk", i),
			}); err != nil {
				t.Errorf("Register(%s): %v", name, err)
			}
		}(i)
	}
	wg.Wait()
	// All should be findable.
	for i := 0; i < n; i++ {
		name := fmt.Sprintf("vol%d", i)
		if _, ok := r.Lookup(name); !ok {
			t.Fatalf("vol%d not found after concurrent register", i)
		}
	}
	// Concurrent unregister.
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			r.Unregister(fmt.Sprintf("vol%d", i))
		}(i)
	}
	wg.Wait()
	// All should be gone.
	for i := 0; i < n; i++ {
		if _, ok := r.Lookup(fmt.Sprintf("vol%d", i)); ok {
			t.Fatalf("vol%d found after concurrent unregister", i)
		}
	}
}
// TestRegistry_SetReplica verifies that SetReplica populates all replica
// fields and indexes the replica server in byServer.
func TestRegistry_SetReplica(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{Name: "vol1", VolumeServer: "s1", Path: "/v1.blk"}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	err := r.SetReplica("vol1", "s2", "/replica/v1.blk", "10.0.0.2:3260", "iqn.2024.test:vol1-replica")
	if err != nil {
		t.Fatalf("SetReplica: %v", err)
	}
	e, _ := r.Lookup("vol1")
	if e.ReplicaServer != "s2" {
		t.Fatalf("ReplicaServer: got %q, want s2", e.ReplicaServer)
	}
	if e.ReplicaPath != "/replica/v1.blk" {
		t.Fatalf("ReplicaPath: got %q", e.ReplicaPath)
	}
	if e.ReplicaISCSIAddr != "10.0.0.2:3260" {
		t.Fatalf("ReplicaISCSIAddr: got %q", e.ReplicaISCSIAddr)
	}
	if e.ReplicaIQN != "iqn.2024.test:vol1-replica" {
		t.Fatalf("ReplicaIQN: got %q", e.ReplicaIQN)
	}
	// Replica server should appear in byServer index.
	s2Vols := r.ListByServer("s2")
	if len(s2Vols) != 1 || s2Vols[0].Name != "vol1" {
		t.Fatalf("ListByServer(s2): got %v, want [vol1]", s2Vols)
	}
}
// TestRegistry_ClearReplica verifies that ClearReplica empties all
// replica fields and removes the replica server from byServer.
func TestRegistry_ClearReplica(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{Name: "vol1", VolumeServer: "s1", Path: "/v1.blk"}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	if err := r.SetReplica("vol1", "s2", "/replica/v1.blk", "10.0.0.2:3260", "iqn.2024.test:vol1-replica"); err != nil {
		t.Fatalf("SetReplica: %v", err)
	}
	err := r.ClearReplica("vol1")
	if err != nil {
		t.Fatalf("ClearReplica: %v", err)
	}
	e, _ := r.Lookup("vol1")
	if e.ReplicaServer != "" {
		t.Fatalf("ReplicaServer should be empty, got %q", e.ReplicaServer)
	}
	if e.ReplicaPath != "" || e.ReplicaISCSIAddr != "" || e.ReplicaIQN != "" {
		t.Fatal("replica fields should be empty after ClearReplica")
	}
	// Replica server should be gone from byServer index.
	s2Vols := r.ListByServer("s2")
	if len(s2Vols) != 0 {
		t.Fatalf("ListByServer(s2) after clear: got %d, want 0", len(s2Vols))
	}
}
// TestRegistry_SetReplicaNotFound verifies that SetReplica rejects a
// volume name that was never registered.
func TestRegistry_SetReplicaNotFound(t *testing.T) {
	reg := NewBlockVolumeRegistry()
	if err := reg.SetReplica("nonexistent", "s2", "/r.blk", "addr", "iqn"); err == nil {
		t.Fatal("SetReplica on nonexistent volume should return error")
	}
}
// TestRegistry_SwapPrimaryReplica verifies that swapping exchanges
// primary and replica identities and bumps the epoch.
func TestRegistry_SwapPrimaryReplica(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{
		Name:             "vol1",
		VolumeServer:     "s1",
		Path:             "/v1.blk",
		IQN:              "iqn:vol1-primary",
		ISCSIAddr:        "10.0.0.1:3260",
		ReplicaServer:    "s2",
		ReplicaPath:      "/replica/v1.blk",
		ReplicaIQN:       "iqn:vol1-replica",
		ReplicaISCSIAddr: "10.0.0.2:3260",
		Epoch:            3,
		Role:             1,
	}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	newEpoch, err := r.SwapPrimaryReplica("vol1")
	if err != nil {
		t.Fatalf("SwapPrimaryReplica: %v", err)
	}
	if newEpoch != 4 {
		t.Fatalf("newEpoch: got %d, want 4", newEpoch)
	}
	e, _ := r.Lookup("vol1")
	// New primary should be the old replica.
	if e.VolumeServer != "s2" {
		t.Fatalf("VolumeServer after swap: got %q, want s2", e.VolumeServer)
	}
	if e.Path != "/replica/v1.blk" {
		t.Fatalf("Path after swap: got %q", e.Path)
	}
	if e.Epoch != 4 {
		t.Fatalf("Epoch after swap: got %d, want 4", e.Epoch)
	}
	// Old primary should become replica.
	if e.ReplicaServer != "s1" {
		t.Fatalf("ReplicaServer after swap: got %q, want s1", e.ReplicaServer)
	}
	if e.ReplicaPath != "/v1.blk" {
		t.Fatalf("ReplicaPath after swap: got %q", e.ReplicaPath)
	}
}
// TestFullHeartbeat_UpdatesReplicaAddrs verifies that replica data/ctrl
// addresses carried in a full heartbeat are stored on the entry.
func TestFullHeartbeat_UpdatesReplicaAddrs(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{
		Name:         "vol1",
		VolumeServer: "server1",
		Path:         "/data/vol1.blk",
		SizeBytes:    1 << 30,
		Status:       StatusPending,
	}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	// Full heartbeat includes replica addresses.
	r.UpdateFullHeartbeat("server1", []*master_pb.BlockVolumeInfoMessage{
		{
			Path:            "/data/vol1.blk",
			VolumeSize:      1 << 30,
			Epoch:           5,
			Role:            1,
			ReplicaDataAddr: "10.0.0.2:14260",
			ReplicaCtrlAddr: "10.0.0.2:14261",
		},
	})
	entry, ok := r.Lookup("vol1")
	if !ok {
		t.Fatal("vol1 not found after heartbeat")
	}
	if entry.Status != StatusActive {
		t.Fatalf("expected Active, got %v", entry.Status)
	}
	if entry.ReplicaDataAddr != "10.0.0.2:14260" {
		t.Fatalf("ReplicaDataAddr: got %q, want 10.0.0.2:14260", entry.ReplicaDataAddr)
	}
	if entry.ReplicaCtrlAddr != "10.0.0.2:14261" {
		t.Fatalf("ReplicaCtrlAddr: got %q, want 10.0.0.2:14261", entry.ReplicaCtrlAddr)
	}
}
// --- CP8-2 new tests ---

// TestRegistry_AddReplica verifies that AddReplica appends to Replicas,
// syncs the deprecated scalar fields, and updates the byServer index.
func TestRegistry_AddReplica(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{Name: "vol1", VolumeServer: "s1", Path: "/v1.blk"}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	err := r.AddReplica("vol1", ReplicaInfo{
		Server:    "s2",
		Path:      "/replica/v1.blk",
		ISCSIAddr: "10.0.0.2:3260",
		IQN:       "iqn:vol1-r1",
		DataAddr:  "s2:14260",
		CtrlAddr:  "s2:14261",
	})
	if err != nil {
		t.Fatalf("AddReplica: %v", err)
	}
	e, _ := r.Lookup("vol1")
	if len(e.Replicas) != 1 {
		t.Fatalf("Replicas len: got %d, want 1", len(e.Replicas))
	}
	if e.Replicas[0].Server != "s2" {
		t.Fatalf("Replicas[0].Server: got %q", e.Replicas[0].Server)
	}
	// Deprecated scalar should be synced.
	if e.ReplicaServer != "s2" {
		t.Fatalf("ReplicaServer (deprecated): got %q", e.ReplicaServer)
	}
	// byServer index should include replica.
	if len(r.ListByServer("s2")) != 1 {
		t.Fatalf("ListByServer(s2): got %d, want 1", len(r.ListByServer("s2")))
	}
}
// TestRegistry_AddReplica_TwoRF3 verifies that an RF=3 volume can hold
// two replicas, both present in Replicas and in byServer.
func TestRegistry_AddReplica_TwoRF3(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{Name: "vol1", VolumeServer: "s1", Path: "/v1.blk", ReplicaFactor: 3}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	if err := r.AddReplica("vol1", ReplicaInfo{Server: "s2", Path: "/r1.blk", IQN: "iqn:r1"}); err != nil {
		t.Fatalf("AddReplica(s2): %v", err)
	}
	if err := r.AddReplica("vol1", ReplicaInfo{Server: "s3", Path: "/r2.blk", IQN: "iqn:r2"}); err != nil {
		t.Fatalf("AddReplica(s3): %v", err)
	}
	e, _ := r.Lookup("vol1")
	if len(e.Replicas) != 2 {
		t.Fatalf("Replicas len: got %d, want 2", len(e.Replicas))
	}
	if e.Replicas[0].Server != "s2" || e.Replicas[1].Server != "s3" {
		t.Fatalf("Replicas: got %+v", e.Replicas)
	}
	// byServer index should include both.
	if len(r.ListByServer("s2")) != 1 || len(r.ListByServer("s3")) != 1 {
		t.Fatal("byServer should include both replica servers")
	}
}
// TestRegistry_AddReplica_Upsert verifies that adding a replica for a
// server that already has one replaces it instead of duplicating.
func TestRegistry_AddReplica_Upsert(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{Name: "vol1", VolumeServer: "s1", Path: "/v1.blk"}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	if err := r.AddReplica("vol1", ReplicaInfo{Server: "s2", Path: "/r1.blk"}); err != nil {
		t.Fatalf("AddReplica: %v", err)
	}
	if err := r.AddReplica("vol1", ReplicaInfo{Server: "s2", Path: "/r1-new.blk"}); err != nil {
		t.Fatalf("AddReplica (upsert): %v", err)
	}
	e, _ := r.Lookup("vol1")
	if len(e.Replicas) != 1 {
		t.Fatalf("Replicas len: got %d, want 1 (upsert, not duplicate)", len(e.Replicas))
	}
	if e.Replicas[0].Path != "/r1-new.blk" {
		t.Fatalf("Replicas[0].Path: got %q, want /r1-new.blk", e.Replicas[0].Path)
	}
}
// TestRegistry_RemoveReplica verifies that removing one replica keeps
// the other, re-syncs the deprecated scalar, and prunes byServer.
func TestRegistry_RemoveReplica(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{Name: "vol1", VolumeServer: "s1", Path: "/v1.blk"}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	if err := r.AddReplica("vol1", ReplicaInfo{Server: "s2", Path: "/r1.blk"}); err != nil {
		t.Fatalf("AddReplica(s2): %v", err)
	}
	if err := r.AddReplica("vol1", ReplicaInfo{Server: "s3", Path: "/r2.blk"}); err != nil {
		t.Fatalf("AddReplica(s3): %v", err)
	}
	err := r.RemoveReplica("vol1", "s2")
	if err != nil {
		t.Fatalf("RemoveReplica: %v", err)
	}
	e, _ := r.Lookup("vol1")
	if len(e.Replicas) != 1 {
		t.Fatalf("Replicas len: got %d, want 1", len(e.Replicas))
	}
	if e.Replicas[0].Server != "s3" {
		t.Fatalf("remaining replica should be s3, got %q", e.Replicas[0].Server)
	}
	// Deprecated scalar should sync to first remaining replica.
	if e.ReplicaServer != "s3" {
		t.Fatalf("ReplicaServer (deprecated): got %q, want s3", e.ReplicaServer)
	}
	// s2 should be removed from byServer.
	if len(r.ListByServer("s2")) != 0 {
		t.Fatalf("ListByServer(s2): got %d, want 0", len(r.ListByServer("s2")))
	}
}
// TestRegistry_PromoteBestReplica_PicksHighest verifies that promotion
// selects the replica with the highest health score and bumps the epoch.
func TestRegistry_PromoteBestReplica_PicksHighest(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{
		Name:         "vol1",
		VolumeServer: "s1",
		Path:         "/v1.blk",
		Epoch:        5,
		Role:         1,
		Replicas: []ReplicaInfo{
			{Server: "s2", Path: "/r1.blk", IQN: "iqn:r1", ISCSIAddr: "s2:3260", HealthScore: 0.8, WALHeadLSN: 100},
			{Server: "s3", Path: "/r2.blk", IQN: "iqn:r2", ISCSIAddr: "s3:3260", HealthScore: 0.95, WALHeadLSN: 90},
		},
	}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	// Add to byServer for s2 and s3.
	r.mu.Lock()
	r.addToServer("s2", "vol1")
	r.addToServer("s3", "vol1")
	r.mu.Unlock()
	newEpoch, err := r.PromoteBestReplica("vol1")
	if err != nil {
		t.Fatalf("PromoteBestReplica: %v", err)
	}
	if newEpoch != 6 {
		t.Fatalf("newEpoch: got %d, want 6", newEpoch)
	}
	e, _ := r.Lookup("vol1")
	// s3 had higher health score → promoted.
	if e.VolumeServer != "s3" {
		t.Fatalf("VolumeServer: got %q, want s3 (higher health)", e.VolumeServer)
	}
	if e.Path != "/r2.blk" {
		t.Fatalf("Path: got %q", e.Path)
	}
	// s2 should remain in Replicas.
	if len(e.Replicas) != 1 {
		t.Fatalf("Replicas len: got %d, want 1 (s2 stays)", len(e.Replicas))
	}
	if e.Replicas[0].Server != "s2" {
		t.Fatalf("remaining replica: got %q, want s2", e.Replicas[0].Server)
	}
}
// TestRegistry_PromoteBestReplica_NoReplica verifies that promotion
// fails when the volume has no replicas.
func TestRegistry_PromoteBestReplica_NoReplica(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{Name: "vol1", VolumeServer: "s1", Path: "/v1.blk"}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	_, err := r.PromoteBestReplica("vol1")
	if err == nil {
		t.Fatal("PromoteBestReplica with no replicas should return error")
	}
}
// TestRegistry_PromoteBestReplica_TiebreakByLSN verifies that equal
// health scores are broken by the higher WAL head LSN.
func TestRegistry_PromoteBestReplica_TiebreakByLSN(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{
		Name:         "vol1",
		VolumeServer: "s1",
		Path:         "/v1.blk",
		Epoch:        3,
		Replicas: []ReplicaInfo{
			{Server: "s2", Path: "/r1.blk", IQN: "iqn:r1", ISCSIAddr: "s2:3260", HealthScore: 0.9, WALHeadLSN: 50},
			{Server: "s3", Path: "/r2.blk", IQN: "iqn:r2", ISCSIAddr: "s3:3260", HealthScore: 0.9, WALHeadLSN: 100},
		},
	}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	r.mu.Lock()
	r.addToServer("s2", "vol1")
	r.addToServer("s3", "vol1")
	r.mu.Unlock()
	newEpoch, err := r.PromoteBestReplica("vol1")
	if err != nil {
		t.Fatalf("PromoteBestReplica: %v", err)
	}
	if newEpoch != 4 {
		t.Fatalf("newEpoch: got %d, want 4", newEpoch)
	}
	e, _ := r.Lookup("vol1")
	// Same health → tie-break by WALHeadLSN → s3 wins.
	if e.VolumeServer != "s3" {
		t.Fatalf("VolumeServer: got %q, want s3 (higher LSN)", e.VolumeServer)
	}
	if len(e.Replicas) != 1 || e.Replicas[0].Server != "s2" {
		t.Fatalf("remaining replica: got %+v, want [s2]", e.Replicas)
	}
}
// TestRegistry_PromoteBestReplica_KeepsOthers verifies that the
// non-promoted replica stays in Replicas after a promotion.
func TestRegistry_PromoteBestReplica_KeepsOthers(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{
		Name:         "vol1",
		VolumeServer: "s1",
		Path:         "/v1.blk",
		Epoch:        1,
		Replicas: []ReplicaInfo{
			{Server: "s2", Path: "/r1.blk", IQN: "iqn:r1", ISCSIAddr: "s2:3260", HealthScore: 1.0, WALHeadLSN: 100},
			{Server: "s3", Path: "/r2.blk", IQN: "iqn:r2", ISCSIAddr: "s3:3260", HealthScore: 0.5, WALHeadLSN: 100},
		},
	}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	r.mu.Lock()
	r.addToServer("s2", "vol1")
	r.addToServer("s3", "vol1")
	r.mu.Unlock()
	// Check the promotion error: if it fails, the assertions below would
	// pass vacuously against the unchanged entry.
	if _, err := r.PromoteBestReplica("vol1"); err != nil {
		t.Fatalf("PromoteBestReplica: %v", err)
	}
	e, _ := r.Lookup("vol1")
	// s2 promoted, s3 stays.
	if e.VolumeServer != "s2" {
		t.Fatalf("VolumeServer: got %q, want s2", e.VolumeServer)
	}
	if len(e.Replicas) != 1 || e.Replicas[0].Server != "s3" {
		t.Fatalf("remaining replicas: got %+v, want [s3]", e.Replicas)
	}
}
// TestRegistry_BackwardCompatAccessors verifies HasReplica,
// FirstReplica, BestReplicaForPromotion, and ReplicaByServer before and
// after a replica is added.
func TestRegistry_BackwardCompatAccessors(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{Name: "vol1", VolumeServer: "s1", Path: "/v1.blk"}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	e, _ := r.Lookup("vol1")
	if e.HasReplica() {
		t.Fatal("HasReplica should be false with no replicas")
	}
	if e.FirstReplica() != nil {
		t.Fatal("FirstReplica should be nil")
	}
	if e.BestReplicaForPromotion() != nil {
		t.Fatal("BestReplicaForPromotion should be nil")
	}
	if err := r.AddReplica("vol1", ReplicaInfo{Server: "s2", Path: "/r.blk", HealthScore: 0.9}); err != nil {
		t.Fatalf("AddReplica: %v", err)
	}
	e, _ = r.Lookup("vol1")
	if !e.HasReplica() {
		t.Fatal("HasReplica should be true after AddReplica")
	}
	if e.FirstReplica() == nil || e.FirstReplica().Server != "s2" {
		t.Fatal("FirstReplica should return s2")
	}
	if e.ReplicaByServer("s2") == nil {
		t.Fatal("ReplicaByServer(s2) should not be nil")
	}
	if e.ReplicaByServer("s3") != nil {
		t.Fatal("ReplicaByServer(s3) should be nil")
	}
}
// TestRegistry_ReplicaFactorDefault verifies the zero-value default for
// ReplicaFactor and that an explicit value is preserved.
func TestRegistry_ReplicaFactorDefault(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{Name: "vol1", VolumeServer: "s1", Path: "/v1.blk"}); err != nil {
		t.Fatalf("Register(vol1): %v", err)
	}
	e, _ := r.Lookup("vol1")
	// ReplicaFactor defaults to 0 (zero value). API handler defaults to 2.
	if e.ReplicaFactor != 0 {
		t.Fatalf("default ReplicaFactor: got %d, want 0", e.ReplicaFactor)
	}
	// Explicit RF=3.
	if err := r.Register(&BlockVolumeEntry{Name: "vol2", VolumeServer: "s1", Path: "/v2.blk", ReplicaFactor: 3}); err != nil {
		t.Fatalf("Register(vol2): %v", err)
	}
	e2, _ := r.Lookup("vol2")
	if e2.ReplicaFactor != 3 {
		t.Fatalf("ReplicaFactor: got %d, want 3", e2.ReplicaFactor)
	}
}
// TestRegistry_FullHeartbeat_UpdatesHealthScore verifies that health
// score and WAL head LSN from a heartbeat are stored on the entry.
func TestRegistry_FullHeartbeat_UpdatesHealthScore(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{
		Name:         "vol1",
		VolumeServer: "s1",
		Path:         "/v1.blk",
		Status:       StatusPending,
	}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	r.UpdateFullHeartbeat("s1", []*master_pb.BlockVolumeInfoMessage{
		{
			Path:        "/v1.blk",
			VolumeSize:  1 << 30,
			Epoch:       1,
			Role:        1,
			HealthScore: 0.85,
			ScrubErrors: 2,
			WalHeadLsn:  500,
		},
	})
	e, _ := r.Lookup("vol1")
	if e.HealthScore != 0.85 {
		t.Fatalf("HealthScore: got %f, want 0.85", e.HealthScore)
	}
	if e.WALHeadLSN != 500 {
		t.Fatalf("WALHeadLSN: got %d, want 500", e.WALHeadLSN)
	}
}
// Fix #1: Replica heartbeat must NOT delete the volume.
func TestRegistry_ReplicaHeartbeat_DoesNotDeleteVolume(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{
		Name:         "vol1",
		VolumeServer: "primary",
		Path:         "/data/vol1.blk",
		Status:       StatusActive,
		Replicas: []ReplicaInfo{
			{Server: "replica1", Path: "/data/vol1.blk"},
		},
	}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	// Replica sends heartbeat reporting its path.
	r.UpdateFullHeartbeat("replica1", []*master_pb.BlockVolumeInfoMessage{
		{Path: "/data/vol1.blk", Epoch: 1, Role: 2},
	})
	// Volume must still exist with primary intact.
	e, ok := r.Lookup("vol1")
	if !ok {
		t.Fatal("vol1 should not be deleted when replica sends heartbeat")
	}
	if e.VolumeServer != "primary" {
		t.Fatalf("primary should remain 'primary', got %q", e.VolumeServer)
	}
}
// Fix #1: Replica path NOT reported → replica removed, volume preserved.
func TestRegistry_ReplicaHeartbeat_StaleReplicaRemoved(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{
		Name:         "vol1",
		VolumeServer: "primary",
		Path:         "/data/vol1.blk",
		Status:       StatusActive,
		Replicas: []ReplicaInfo{
			{Server: "replica1", Path: "/data/vol1.blk"},
			{Server: "replica2", Path: "/data/vol1.blk"},
		},
	}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	// replica1 heartbeat does NOT report vol1 path → stale replica.
	r.UpdateFullHeartbeat("replica1", []*master_pb.BlockVolumeInfoMessage{})
	// Volume still exists, but replica1 removed.
	e, ok := r.Lookup("vol1")
	if !ok {
		t.Fatal("vol1 should exist (only replica removed)")
	}
	if len(e.Replicas) != 1 {
		t.Fatalf("expected 1 replica after stale removal, got %d", len(e.Replicas))
	}
	if e.Replicas[0].Server != "replica2" {
		t.Fatalf("remaining replica should be replica2, got %q", e.Replicas[0].Server)
	}
}
// Fix #3: Replica heartbeat after master restart reconstructs ReplicaInfo.
func TestRegistry_ReplicaHeartbeat_ReconstructsAfterRestart(t *testing.T) {
	r := NewBlockVolumeRegistry()
	// Simulate master restart: primary heartbeat re-created entry without replicas.
	if err := r.Register(&BlockVolumeEntry{
		Name:         "vol1",
		VolumeServer: "primary",
		Path:         "/data/vol1.blk",
		Status:       StatusActive,
	}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	// Replica heartbeat arrives — vol1 exists but has no record of this server.
	r.UpdateFullHeartbeat("replica1", []*master_pb.BlockVolumeInfoMessage{
		{Path: "/data/vol1.blk", Epoch: 1, Role: 2, HealthScore: 0.95, WalHeadLsn: 42},
	})
	// vol1 should now have replica1 in Replicas[].
	e, ok := r.Lookup("vol1")
	if !ok {
		t.Fatal("vol1 should exist")
	}
	if len(e.Replicas) != 1 {
		t.Fatalf("expected 1 replica after reconstruction, got %d", len(e.Replicas))
	}
	ri := e.Replicas[0]
	if ri.Server != "replica1" {
		t.Fatalf("replica server: got %q, want replica1", ri.Server)
	}
	if ri.HealthScore != 0.95 {
		t.Fatalf("replica health: got %f, want 0.95", ri.HealthScore)
	}
	if ri.WALHeadLSN != 42 {
		t.Fatalf("replica WALHeadLSN: got %d, want 42", ri.WALHeadLSN)
	}
	// byServer index should include replica1.
	entries := r.ListByServer("replica1")
	if len(entries) != 1 || entries[0].Name != "vol1" {
		t.Fatalf("ListByServer(replica1) should return vol1, got %+v", entries)
	}
}
// Fix #2: Stale replica (old heartbeat) not eligible for promotion.
func TestRegistry_PromoteBestReplica_StaleHeartbeatIneligible(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{
		Name:         "vol1",
		VolumeServer: "primary",
		Path:         "/data/vol1.blk",
		Epoch:        1,
		LeaseTTL:     5 * time.Second,
		WALHeadLSN:   100,
		Replicas: []ReplicaInfo{
			{
				Server:        "stale-replica",
				Path:          "/data/vol1.blk",
				HealthScore:   1.0,
				WALHeadLSN:    100,
				LastHeartbeat: time.Now().Add(-30 * time.Second), // stale (>2×5s)
			},
		},
	}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	_, err := r.PromoteBestReplica("vol1")
	if err == nil {
		t.Fatal("expected error: stale replica should not be eligible")
	}
}
// Fix #2: Replica with WAL lag too large is not eligible.
func TestRegistry_PromoteBestReplica_WALLagIneligible(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{
		Name:         "vol1",
		VolumeServer: "primary",
		Path:         "/data/vol1.blk",
		Epoch:        1,
		LeaseTTL:     30 * time.Second,
		WALHeadLSN:   1000,
		Replicas: []ReplicaInfo{
			{
				Server:        "lagging",
				Path:          "/data/vol1.blk",
				HealthScore:   1.0,
				WALHeadLSN:    800, // lag=200, tolerance=100
				LastHeartbeat: time.Now(),
			},
		},
	}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	_, err := r.PromoteBestReplica("vol1")
	if err == nil {
		t.Fatal("expected error: lagging replica should not be eligible")
	}
}
// Fix #2: Rebuilding replica is not eligible for promotion.
func TestRegistry_PromoteBestReplica_RebuildingIneligible(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{
		Name:         "vol1",
		VolumeServer: "primary",
		Path:         "/data/vol1.blk",
		Epoch:        1,
		LeaseTTL:     30 * time.Second,
		WALHeadLSN:   100,
		Replicas: []ReplicaInfo{
			{
				Server:        "rebuilding",
				Path:          "/data/vol1.blk",
				HealthScore:   1.0,
				WALHeadLSN:    100,
				LastHeartbeat: time.Now(),
				Role:          blockvol.RoleToWire(blockvol.RoleRebuilding),
			},
		},
	}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	_, err := r.PromoteBestReplica("vol1")
	if err == nil {
		t.Fatal("expected error: rebuilding replica should not be eligible")
	}
}
// Fix #2: Among eligible replicas, best (health+LSN) wins.
func TestRegistry_PromoteBestReplica_EligibilityFiltersCorrectly(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{
		Name:         "vol1",
		VolumeServer: "primary",
		Path:         "/data/vol1.blk",
		Epoch:        1,
		LeaseTTL:     30 * time.Second,
		WALHeadLSN:   100,
		Replicas: []ReplicaInfo{
			{
				Server:        "stale", // ineligible: old heartbeat
				Path:          "/data/vol1.blk",
				HealthScore:   1.0,
				WALHeadLSN:    100,
				LastHeartbeat: time.Now().Add(-2 * time.Minute),
			},
			{
				Server:        "good", // eligible
				Path:          "/data/vol1.blk",
				HealthScore:   0.8,
				WALHeadLSN:    95,
				LastHeartbeat: time.Now(),
			},
		},
	}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	_, err := r.PromoteBestReplica("vol1")
	if err != nil {
		t.Fatalf("expected promotion to succeed: %v", err)
	}
	e, _ := r.Lookup("vol1")
	if e.VolumeServer != "good" {
		t.Fatalf("expected 'good' promoted (only eligible), got %q", e.VolumeServer)
	}
}
// Configurable tolerance: widen tolerance to allow lagging replicas.
func TestRegistry_PromoteBestReplica_ConfigurableTolerance(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{
		Name:         "vol1",
		VolumeServer: "primary",
		Path:         "/data/vol1.blk",
		Epoch:        1,
		LeaseTTL:     30 * time.Second,
		WALHeadLSN:   1000,
		Replicas: []ReplicaInfo{
			{
				Server:        "lagging",
				Path:          "/data/vol1.blk",
				HealthScore:   1.0,
				WALHeadLSN:    800, // lag=200
				LastHeartbeat: time.Now(),
			},
		},
	}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	// Default tolerance (100): lag 200 > tolerance → ineligible.
	_, err := r.PromoteBestReplica("vol1")
	if err == nil {
		t.Fatal("expected error with default tolerance")
	}
	// Widen tolerance to 250: lag 200 < tolerance → eligible.
	r.SetPromotionLSNTolerance(250)
	_, err = r.PromoteBestReplica("vol1")
	if err != nil {
		t.Fatalf("expected success with widened tolerance: %v", err)
	}
	e, _ := r.Lookup("vol1")
	if e.VolumeServer != "lagging" {
		t.Fatalf("expected 'lagging' promoted, got %q", e.VolumeServer)
	}
}
// --- LeaseGrants ---

// TestRegistry_LeaseGrants_PrimaryOnly verifies that only the primary
// volume on a server receives a lease grant; replica- and none-role
// volumes are excluded.
func TestRegistry_LeaseGrants_PrimaryOnly(t *testing.T) {
	r := NewBlockVolumeRegistry()
	// Register a primary volume.
	if err := r.Register(&BlockVolumeEntry{
		Name:         "prim1",
		VolumeServer: "s1:18080",
		Path:         "/data/prim1.blk",
		SizeBytes:    1 << 30,
		Epoch:        5,
		Role:         blockvol.RoleToWire(blockvol.RolePrimary),
		Status:       StatusActive,
		LeaseTTL:     30 * time.Second,
	}); err != nil {
		t.Fatalf("Register(prim1): %v", err)
	}
	// Register a replica volume on the same server.
	if err := r.Register(&BlockVolumeEntry{
		Name:         "repl1",
		VolumeServer: "s2:18080",
		Path:         "/data/repl1.blk",
		SizeBytes:    1 << 30,
		Epoch:        3,
		Role:         blockvol.RoleToWire(blockvol.RoleReplica),
		Status:       StatusActive,
	}); err != nil {
		t.Fatalf("Register(repl1): %v", err)
	}
	if err := r.AddReplica("repl1", ReplicaInfo{Server: "s1:18080", Path: "/data/repl1-replica.blk"}); err != nil {
		t.Fatalf("AddReplica: %v", err)
	}
	// Register a none-role volume.
	if err := r.Register(&BlockVolumeEntry{
		Name:         "none1",
		VolumeServer: "s1:18080",
		Path:         "/data/none1.blk",
		SizeBytes:    1 << 30,
		Epoch:        1,
		Role:         blockvol.RoleToWire(blockvol.RoleNone),
		Status:       StatusActive,
	}); err != nil {
		t.Fatalf("Register(none1): %v", err)
	}
	// LeaseGrants for s1 should only include prim1 (the primary).
	grants := r.LeaseGrants("s1:18080", nil)
	if len(grants) != 1 {
		t.Fatalf("expected 1 grant, got %d: %+v", len(grants), grants)
	}
	if grants[0].Path != "/data/prim1.blk" {
		t.Errorf("expected prim1 path, got %q", grants[0].Path)
	}
	if grants[0].Epoch != 5 {
		t.Errorf("expected epoch 5, got %d", grants[0].Epoch)
	}
	if grants[0].LeaseTtlMs != 30000 {
		t.Errorf("expected 30000ms TTL, got %d", grants[0].LeaseTtlMs)
	}
}
// TestRegistry_LeaseGrants_PendingExcluded verifies that a volume with a
// pending assignment is excluded from lease grants.
func TestRegistry_LeaseGrants_PendingExcluded(t *testing.T) {
	r := NewBlockVolumeRegistry()
	for _, e := range []*BlockVolumeEntry{
		{
			Name:         "vol1",
			VolumeServer: "s1:18080",
			Path:         "/data/vol1.blk",
			SizeBytes:    1 << 30,
			Epoch:        2,
			Role:         blockvol.RoleToWire(blockvol.RolePrimary),
			Status:       StatusActive,
			LeaseTTL:     30 * time.Second,
		},
		{
			Name:         "vol2",
			VolumeServer: "s1:18080",
			Path:         "/data/vol2.blk",
			SizeBytes:    1 << 30,
			Epoch:        1,
			Role:         blockvol.RoleToWire(blockvol.RolePrimary),
			Status:       StatusActive,
			LeaseTTL:     30 * time.Second,
		},
	} {
		if err := r.Register(e); err != nil {
			t.Fatalf("Register(%s): %v", e.Name, err)
		}
	}
	// vol1 has a pending assignment — should be excluded.
	pending := map[string]bool{"/data/vol1.blk": true}
	grants := r.LeaseGrants("s1:18080", pending)
	if len(grants) != 1 {
		t.Fatalf("expected 1 grant (vol2 only), got %d: %+v", len(grants), grants)
	}
	if grants[0].Path != "/data/vol2.blk" {
		t.Errorf("expected vol2 path, got %q", grants[0].Path)
	}
}
// TestRegistry_LeaseGrants_InactiveExcluded verifies that a volume still
// pending heartbeat confirmation receives no lease grant.
func TestRegistry_LeaseGrants_InactiveExcluded(t *testing.T) {
	r := NewBlockVolumeRegistry()
	if err := r.Register(&BlockVolumeEntry{
		Name:         "pending-vol",
		VolumeServer: "s1:18080",
		Path:         "/data/pending.blk",
		SizeBytes:    1 << 30,
		Epoch:        1,
		Role:         blockvol.RoleToWire(blockvol.RolePrimary),
		Status:       StatusPending, // not yet confirmed by heartbeat
		LeaseTTL:     30 * time.Second,
	}); err != nil {
		t.Fatalf("Register: %v", err)
	}
	grants := r.LeaseGrants("s1:18080", nil)
	if len(grants) != 0 {
		t.Fatalf("expected 0 grants for pending volume, got %d", len(grants))
	}
}
// TestRegistry_LeaseGrants_UnknownServer verifies that an unknown server
// yields a nil grant list.
func TestRegistry_LeaseGrants_UnknownServer(t *testing.T) {
	reg := NewBlockVolumeRegistry()
	if grants := reg.LeaseGrants("unknown:18080", nil); grants != nil {
		t.Fatalf("expected nil for unknown server, got %+v", grants)
	}
}