diff --git a/weed/server/integration_block_test.go b/weed/server/integration_block_test.go index 6f8cf4894..2517cd68d 100644 --- a/weed/server/integration_block_test.go +++ b/weed/server/integration_block_test.go @@ -84,14 +84,15 @@ func TestIntegration_FailoverCSIPublish(t *testing.T) { } // Step 3: Expire lease so failover is immediate. - entry, _ := ms.blockRegistry.Lookup("pvc-data-1") - entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + ms.blockRegistry.UpdateEntry("pvc-data-1", func(e *BlockVolumeEntry) { + e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + }) // Step 4: Primary VS dies — triggers failover. ms.failoverBlockVolumes(primaryVS) // Step 5: Verify registry swap. - entry, _ = ms.blockRegistry.Lookup("pvc-data-1") + entry, _ := ms.blockRegistry.Lookup("pvc-data-1") if entry.VolumeServer != replicaVS { t.Fatalf("after failover: primary should be %q, got %q", replicaVS, entry.VolumeServer) } @@ -148,8 +149,9 @@ func TestIntegration_RebuildOnRecovery(t *testing.T) { replicaVS := createResp.ReplicaServer // Step 2: Expire lease for immediate failover. - entry, _ := ms.blockRegistry.Lookup("pvc-db-1") - entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + ms.blockRegistry.UpdateEntry("pvc-db-1", func(e *BlockVolumeEntry) { + e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + }) // Step 3: Primary dies → replica promoted. ms.failoverBlockVolumes(primaryVS) @@ -203,7 +205,7 @@ func TestIntegration_RebuildOnRecovery(t *testing.T) { } // Step 8: Registry shows old primary as new replica. - entry, _ = ms.blockRegistry.Lookup("pvc-db-1") + entry, _ := ms.blockRegistry.Lookup("pvc-db-1") if entry.ReplicaServer != primaryVS { t.Fatalf("after recovery: replica should be %q (old primary), got %q", primaryVS, entry.ReplicaServer) } @@ -343,9 +345,10 @@ func TestIntegration_LeaseAwarePromotion(t *testing.T) { primaryVS := resp.VolumeServer // Set a short but non-zero lease TTL (lease just granted → not yet expired). - entry, _ := ms.blockRegistry.Lookup("pvc-lease-1") - entry.LeaseTTL = 300 * time.Millisecond - entry.LastLeaseGrant = time.Now() + ms.blockRegistry.UpdateEntry("pvc-lease-1", func(e *BlockVolumeEntry) { + e.LeaseTTL = 300 * time.Millisecond + e.LastLeaseGrant = time.Now() + }) // Primary dies. ms.failoverBlockVolumes(primaryVS) @@ -418,8 +421,9 @@ func TestIntegration_ReplicaFailureSingleCopy(t *testing.T) { } // No failover possible without replica. - entry, _ := ms.blockRegistry.Lookup("pvc-single-1") - entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + ms.blockRegistry.UpdateEntry("pvc-single-1", func(e *BlockVolumeEntry) { + e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + }) ms.failoverBlockVolumes(primaryVS) e, _ := ms.blockRegistry.Lookup("pvc-single-1") @@ -449,9 +453,10 @@ func TestIntegration_TransientDisconnectNoSplitBrain(t *testing.T) { replicaVS := resp.ReplicaServer // Set lease with long TTL (not expired). - entry, _ := ms.blockRegistry.Lookup("pvc-transient-1") - entry.LeaseTTL = 1 * time.Second - entry.LastLeaseGrant = time.Now() + ms.blockRegistry.UpdateEntry("pvc-transient-1", func(e *BlockVolumeEntry) { + e.LeaseTTL = 1 * time.Second + e.LastLeaseGrant = time.Now() + }) // Primary disconnects → deferred promotion timer set. ms.failoverBlockVolumes(primaryVS) @@ -537,7 +542,9 @@ func TestIntegration_FullLifecycle(t *testing.T) { } // --- Phase 4: Expire lease + kill primary --- - entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + ms.blockRegistry.UpdateEntry("pvc-lifecycle-1", func(e *BlockVolumeEntry) { + e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + }) ms.failoverBlockVolumes(primaryVS) // --- Phase 5: Verify failover --- @@ -629,8 +636,9 @@ func TestIntegration_DoubleFailover(t *testing.T) { vs2 := resp.ReplicaServer // First failover: vs1 dies → vs2 promoted. - entry, _ := ms.blockRegistry.Lookup("pvc-double-1") - entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + ms.blockRegistry.UpdateEntry("pvc-double-1", func(e *BlockVolumeEntry) { + e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + }) ms.failoverBlockVolumes(vs1) e1, _ := ms.blockRegistry.Lookup("pvc-double-1") @@ -648,19 +656,21 @@ func TestIntegration_DoubleFailover(t *testing.T) { // Simulate heartbeat from vs1 that restores iSCSI addr, health score, // role, and heartbeat timestamp (in production this happens when the // VS re-registers after reconnect and completes rebuild). - e1, _ = ms.blockRegistry.Lookup("pvc-double-1") - for i := range e1.Replicas { - if e1.Replicas[i].Server == vs1 { - e1.Replicas[i].ISCSIAddr = vs1 + ":3260" - e1.Replicas[i].HealthScore = 1.0 - e1.Replicas[i].Role = blockvol.RoleToWire(blockvol.RoleReplica) - e1.Replicas[i].LastHeartbeat = time.Now() + ms.blockRegistry.UpdateEntry("pvc-double-1", func(e *BlockVolumeEntry) { + for i := range e.Replicas { + if e.Replicas[i].Server == vs1 { + e.Replicas[i].ISCSIAddr = vs1 + ":3260" + e.Replicas[i].HealthScore = 1.0 + e.Replicas[i].Role = blockvol.RoleToWire(blockvol.RoleReplica) + e.Replicas[i].LastHeartbeat = time.Now() + } } - } + }) // Now vs1 is back as replica. Second failover: vs2 dies → vs1 promoted. - e1, _ = ms.blockRegistry.Lookup("pvc-double-1") - e1.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + ms.blockRegistry.UpdateEntry("pvc-double-1", func(e *BlockVolumeEntry) { + e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + }) ms.failoverBlockVolumes(vs2) e2, _ := ms.blockRegistry.Lookup("pvc-double-1") @@ -705,10 +715,13 @@ func TestIntegration_MultiVolumeFailoverRebuild(t *testing.T) { // Find which server is primary for each volume. primaryCounts := map[string]int{} for i := 1; i <= 3; i++ { - e, _ := ms.blockRegistry.Lookup(fmt.Sprintf("pvc-multi-%d", i)) + name := fmt.Sprintf("pvc-multi-%d", i) + e, _ := ms.blockRegistry.Lookup(name) primaryCounts[e.VolumeServer]++ // Expire lease. - e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + ms.blockRegistry.UpdateEntry(name, func(entry *BlockVolumeEntry) { + entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + }) } // Kill the server with the most primaries. diff --git a/weed/server/master_block_failover_test.go b/weed/server/master_block_failover_test.go index afe604a43..e89456182 100644 --- a/weed/server/master_block_failover_test.go +++ b/weed/server/master_block_failover_test.go @@ -533,13 +533,14 @@ func TestLifecycle_CreateFailoverRebuild(t *testing.T) { } // Update lease so it's expired (simulate time passage). - entry, _ := ms.blockRegistry.Lookup("vol1") - entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + ms.blockRegistry.UpdateEntry("vol1", func(e *BlockVolumeEntry) { + e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + }) // Primary dies. ms.failoverBlockVolumes(primary) - entry, _ = ms.blockRegistry.Lookup("vol1") + entry, _ := ms.blockRegistry.Lookup("vol1") if entry.VolumeServer != replica { t.Fatalf("after failover: primary=%q, want %q", entry.VolumeServer, replica) } @@ -623,13 +624,14 @@ func TestRF3_PrimaryDies_BestReplicaPromoted(t *testing.T) { registerVolumeRF3(t, ms, "vol1", "vs1", "vs2", "vs3", 1, 5*time.Second) // Give vs3 a higher health score so it should be promoted. - entry, _ := ms.blockRegistry.Lookup("vol1") - entry.Replicas[0].HealthScore = 0.8 // vs2 - entry.Replicas[1].HealthScore = 1.0 // vs3 + ms.blockRegistry.UpdateEntry("vol1", func(e *BlockVolumeEntry) { + e.Replicas[0].HealthScore = 0.8 // vs2 + e.Replicas[1].HealthScore = 1.0 // vs3 + }) ms.failoverBlockVolumes("vs1") - entry, _ = ms.blockRegistry.Lookup("vol1") + entry, _ := ms.blockRegistry.Lookup("vol1") if entry.VolumeServer != "vs3" { t.Fatalf("primary should be vs3 (highest health), got %q", entry.VolumeServer) } @@ -1124,14 +1126,15 @@ func TestT3_DeferredTimer_EpochChanged_NoPromotion(t *testing.T) { ms.failoverBlockVolumes("vs1") // Before timer fires, manually bump the epoch (simulating another event). - e, _ := ms.blockRegistry.Lookup("vol1") - e.Epoch = 99 + ms.blockRegistry.UpdateEntry("vol1", func(e *BlockVolumeEntry) { + e.Epoch = 99 + }) // Wait for timer to fire. time.Sleep(350 * time.Millisecond) // Timer should have been rejected (epoch mismatch). Epoch stays at 99. - e, _ = ms.blockRegistry.Lookup("vol1") + e, _ := ms.blockRegistry.Lookup("vol1") if e.Epoch != 99 { t.Fatalf("epoch should remain 99 (timer rejected), got %d", e.Epoch) } diff --git a/weed/server/master_block_registry.go b/weed/server/master_block_registry.go index df12fcd0f..17c4634c7 100644 --- a/weed/server/master_block_registry.go +++ b/weed/server/master_block_registry.go @@ -295,26 +295,55 @@ func (r *BlockVolumeRegistry) UpdateSize(name string, newSizeBytes uint64) error return nil } -// Lookup returns the entry for the given name. -func (r *BlockVolumeRegistry) Lookup(name string) (*BlockVolumeEntry, bool) { +// clone returns a deep copy of the entry. The Replicas slice is copied +// so the caller cannot mutate registry state through the returned value. +func (e *BlockVolumeEntry) clone() BlockVolumeEntry { + c := *e + if len(e.Replicas) > 0 { + c.Replicas = make([]ReplicaInfo, len(e.Replicas)) + copy(c.Replicas, e.Replicas) + } + return c +} + +// Lookup returns a copy of the entry for the given name. +// The returned value is safe to read without holding any lock. +// To mutate registry state, use UpdateEntry instead. +func (r *BlockVolumeRegistry) Lookup(name string) (BlockVolumeEntry, bool) { r.mu.RLock() defer r.mu.RUnlock() e, ok := r.volumes[name] - return e, ok + if !ok { + return BlockVolumeEntry{}, false + } + return e.clone(), ok +} + +// UpdateEntry calls fn with the internal entry under write lock. +// Use this for any mutation that must be visible to the registry. +func (r *BlockVolumeRegistry) UpdateEntry(name string, fn func(*BlockVolumeEntry)) error { + r.mu.Lock() + defer r.mu.Unlock() + e, ok := r.volumes[name] + if !ok { + return fmt.Errorf("block volume %q not found", name) + } + fn(e) + return nil } -// ListByServer returns all entries hosted on the given server. -func (r *BlockVolumeRegistry) ListByServer(server string) []*BlockVolumeEntry { +// ListByServer returns copies of all entries hosted on the given server. +func (r *BlockVolumeRegistry) ListByServer(server string) []BlockVolumeEntry { r.mu.RLock() defer r.mu.RUnlock() names, ok := r.byServer[server] if !ok { return nil } - entries := make([]*BlockVolumeEntry, 0, len(names)) + entries := make([]BlockVolumeEntry, 0, len(names)) for name := range names { if e, ok := r.volumes[name]; ok { - entries = append(entries, e) + entries = append(entries, e.clone()) } } return entries @@ -1312,12 +1341,12 @@ func (r *BlockVolumeRegistry) LeaseGrants(server string, pendingPaths map[string } // ListAll returns all registered block volume entries, sorted by name. -func (r *BlockVolumeRegistry) ListAll() []*BlockVolumeEntry { +func (r *BlockVolumeRegistry) ListAll() []BlockVolumeEntry { r.mu.RLock() defer r.mu.RUnlock() - entries := make([]*BlockVolumeEntry, 0, len(r.volumes)) + entries := make([]BlockVolumeEntry, 0, len(r.volumes)) for _, e := range r.volumes { - entries = append(entries, e) + entries = append(entries, e.clone()) } sort.Slice(entries, func(i, j int) bool { return entries[i].Name < entries[j].Name }) return entries diff --git a/weed/server/master_block_registry_test.go b/weed/server/master_block_registry_test.go index 489fbf30c..ce239fb90 100644 --- a/weed/server/master_block_registry_test.go +++ b/weed/server/master_block_registry_test.go @@ -1617,3 +1617,102 @@ func TestRegistry_ManualPromote_Force_StillRejectsDeadServer(t *testing.T) { t.Fatalf("expected server_dead rejection, got %+v", pf.Rejections) } } + +// --- Copy semantics tests (pointer escape fix) --- + +func TestLookup_ReturnsCopy(t *testing.T) { + r := NewBlockVolumeRegistry() + r.Register(&BlockVolumeEntry{ + Name: "vol1", + VolumeServer: "vs1:9333", + Path: "/data/vol1.blk", + Epoch: 1, + Role: blockvol.RoleToWire(blockvol.RolePrimary), + Status: StatusActive, + }) + + // Get a copy via Lookup. + entry, ok := r.Lookup("vol1") + if !ok { + t.Fatal("vol1 not found") + } + + // Mutate the copy. + entry.Epoch = 999 + entry.VolumeServer = "mutated:9333" + + // Registry must be unaffected. + original, _ := r.Lookup("vol1") + if original.Epoch != 1 { + t.Fatalf("Lookup copy mutation leaked: Epoch=%d, want 1", original.Epoch) + } + if original.VolumeServer != "vs1:9333" { + t.Fatalf("Lookup copy mutation leaked: VolumeServer=%q", original.VolumeServer) + } +} + +func TestLookup_ReplicaSliceCopy(t *testing.T) { + r := NewBlockVolumeRegistry() + r.Register(&BlockVolumeEntry{ + Name: "vol1", + VolumeServer: "vs1:9333", + Path: "/data/vol1.blk", + Status: StatusActive, + Replicas: []ReplicaInfo{{Server: "vs2:9333", HealthScore: 1.0}}, + }) + + entry, _ := r.Lookup("vol1") + // Mutate replica slice on the copy. + entry.Replicas[0].HealthScore = 0.0 + entry.Replicas = append(entry.Replicas, ReplicaInfo{Server: "vs3:9333"}) + + // Registry must be unaffected. + original, _ := r.Lookup("vol1") + if len(original.Replicas) != 1 { + t.Fatalf("Replica slice mutation leaked: len=%d, want 1", len(original.Replicas)) + } + if original.Replicas[0].HealthScore != 1.0 { + t.Fatalf("Replica HealthScore mutation leaked: %f", original.Replicas[0].HealthScore) + } +} + +func TestListAll_ReturnsCopies(t *testing.T) { + r := NewBlockVolumeRegistry() + r.Register(&BlockVolumeEntry{ + Name: "vol1", VolumeServer: "vs1:9333", Path: "/data/vol1.blk", Status: StatusActive, + }) + + entries := r.ListAll() + entries[0].Epoch = 999 + + original, _ := r.Lookup("vol1") + if original.Epoch != 0 { + t.Fatalf("ListAll copy mutation leaked: Epoch=%d", original.Epoch) + } +} + +func TestUpdateEntry_MutatesRegistry(t *testing.T) { + r := NewBlockVolumeRegistry() + r.Register(&BlockVolumeEntry{ + Name: "vol1", VolumeServer: "vs1:9333", Path: "/data/vol1.blk", Status: StatusActive, + }) + + r.UpdateEntry("vol1", func(e *BlockVolumeEntry) { + e.Preset = "database" + }) + + entry, _ := r.Lookup("vol1") + if entry.Preset != "database" { + t.Fatalf("UpdateEntry did not mutate: Preset=%q", entry.Preset) + } +} + +func TestUpdateEntry_NotFound(t *testing.T) { + r := NewBlockVolumeRegistry() + err := r.UpdateEntry("nonexistent", func(e *BlockVolumeEntry) { + e.Epoch = 99 + }) + if err == nil { + t.Fatal("expected error for nonexistent volume") + } +} diff --git a/weed/server/master_grpc_server_block.go b/weed/server/master_grpc_server_block.go index 58a5c5f71..75fb78948 100644 --- a/weed/server/master_grpc_server_block.go +++ b/weed/server/master_grpc_server_block.go @@ -54,10 +54,10 @@ func (ms *MasterServer) CreateBlockVolume(ctx context.Context, req *master_pb.Cr // Idempotent: if already registered, return existing entry (validate size + mode + RF). if entry, ok := ms.blockRegistry.Lookup(req.Name); ok { - if err := ms.validateIdempotentCreate(entry, req, durMode, replicaFactor); err != nil { + if err := ms.validateIdempotentCreate(&entry, req, durMode, replicaFactor); err != nil { return nil, err } - return ms.createBlockVolumeResponseFromEntry(entry), nil + return ms.createBlockVolumeResponseFromEntry(&entry), nil } // Per-name inflight lock prevents concurrent creates for the same name. @@ -68,10 +68,10 @@ func (ms *MasterServer) CreateBlockVolume(ctx context.Context, req *master_pb.Cr // Double-check after acquiring lock (another goroutine may have finished). if entry, ok := ms.blockRegistry.Lookup(req.Name); ok { - if err := ms.validateIdempotentCreate(entry, req, durMode, replicaFactor); err != nil { + if err := ms.validateIdempotentCreate(&entry, req, durMode, replicaFactor); err != nil { return nil, err } - return ms.createBlockVolumeResponseFromEntry(entry), nil + return ms.createBlockVolumeResponseFromEntry(&entry), nil } // Evaluate placement using the shared planner (parity with /block/volume/plan). @@ -153,7 +153,7 @@ func (ms *MasterServer) CreateBlockVolume(ctx context.Context, req *master_pb.Cr if err := ms.blockRegistry.Register(entry); err != nil { // Already registered (race condition) — return the existing entry. if existing, ok := ms.blockRegistry.Lookup(req.Name); ok { - return ms.createBlockVolumeResponseFromEntry(existing), nil + return ms.createBlockVolumeResponseFromEntry(&existing), nil } return nil, fmt.Errorf("register block volume: %w", err) } @@ -247,7 +247,7 @@ func (ms *MasterServer) LookupBlockVolume(ctx context.Context, req *master_pb.Lo return nil, fmt.Errorf("block volume %q not found", req.Name) } - replicaServers := replicaServerList(entry) + replicaServers := replicaServerList(&entry) rf := entry.ReplicaFactor if rf == 0 { rf = 2 // default for pre-CP8-2 entries diff --git a/weed/server/master_server_handlers_block.go b/weed/server/master_server_handlers_block.go index 5dbf9881b..02fdad76f 100644 --- a/weed/server/master_server_handlers_block.go +++ b/weed/server/master_server_handlers_block.go @@ -59,11 +59,11 @@ func (ms *MasterServer) blockVolumeCreateHandler(w http.ResponseWriter, r *http. return } - // Store replica_placement and preset on the registry entry. - if entry, ok := ms.blockRegistry.Lookup(resp.VolumeId); ok { - entry.ReplicaPlacement = replicaPlacement - entry.Preset = req.Preset - } + // Store replica_placement and preset on the registry entry (locked mutation). + ms.blockRegistry.UpdateEntry(resp.VolumeId, func(e *BlockVolumeEntry) { + e.ReplicaPlacement = replicaPlacement + e.Preset = req.Preset + }) // Look up the full entry to populate all fields. info := blockapi.VolumeInfo{ @@ -75,7 +75,7 @@ func (ms *MasterServer) blockVolumeCreateHandler(w http.ResponseWriter, r *http. IQN: resp.Iqn, } if entry, ok := ms.blockRegistry.Lookup(resp.VolumeId); ok { - info = entryToVolumeInfo(entry, ms.blockRegistry.IsBlockCapable(entry.VolumeServer)) + info = entryToVolumeInfo(&entry, ms.blockRegistry.IsBlockCapable(entry.VolumeServer)) } writeJsonQuiet(w, r, http.StatusOK, info) } @@ -155,15 +155,15 @@ func (ms *MasterServer) blockVolumeLookupHandler(w http.ResponseWriter, r *http. writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("block volume %q not found", name)) return } - writeJsonQuiet(w, r, http.StatusOK, entryToVolumeInfo(entry, ms.blockRegistry.IsBlockCapable(entry.VolumeServer))) + writeJsonQuiet(w, r, http.StatusOK, entryToVolumeInfo(&entry, ms.blockRegistry.IsBlockCapable(entry.VolumeServer))) } // blockVolumeListHandler handles GET /block/volumes. func (ms *MasterServer) blockVolumeListHandler(w http.ResponseWriter, r *http.Request) { entries := ms.blockRegistry.ListAll() infos := make([]blockapi.VolumeInfo, len(entries)) - for i, e := range entries { - infos[i] = entryToVolumeInfo(e, ms.blockRegistry.IsBlockCapable(e.VolumeServer)) + for i := range entries { + infos[i] = entryToVolumeInfo(&entries[i], ms.blockRegistry.IsBlockCapable(entries[i].VolumeServer)) } writeJsonQuiet(w, r, http.StatusOK, infos) } diff --git a/weed/server/master_server_handlers_block_ui.go b/weed/server/master_server_handlers_block_ui.go index f638f11fc..5b2cb6d2b 100644 --- a/weed/server/master_server_handlers_block_ui.go +++ b/weed/server/master_server_handlers_block_ui.go @@ -40,7 +40,8 @@ func (ms *MasterServer) buildBlockUIData(tab string) blockUIData { volumes := make([]blockUIVolume, len(entries)) var totalSizeMB uint64 var activeCount, pendingCount int - for i, e := range entries { + for i := range entries { + e := &entries[i] info := entryToVolumeInfo(e, ms.blockRegistry.IsBlockCapable(e.VolumeServer)) mb := info.SizeBytes / (1024 * 1024) var maxLag uint64 diff --git a/weed/server/qa_block_cp11b1_adversarial_test.go b/weed/server/qa_block_cp11b1_adversarial_test.go index 2e343882b..94c480143 100644 --- a/weed/server/qa_block_cp11b1_adversarial_test.go +++ b/weed/server/qa_block_cp11b1_adversarial_test.go @@ -228,9 +228,9 @@ func TestQA_CP11B1_CreateWithPreset_StoresPreset(t *testing.T) { } // Simulate what the HTTP handler does after create. - if entry, ok := ms.blockRegistry.Lookup("preset-vol"); ok { - entry.Preset = "general" - } + ms.blockRegistry.UpdateEntry("preset-vol", func(e *BlockVolumeEntry) { + e.Preset = "general" + }) entry, ok := ms.blockRegistry.Lookup("preset-vol") if !ok { @@ -241,7 +241,7 @@ func TestQA_CP11B1_CreateWithPreset_StoresPreset(t *testing.T) { } // Verify entryToVolumeInfo propagates preset. - info := entryToVolumeInfo(entry, true) + info := entryToVolumeInfo(&entry, true) if info.Preset != "general" { t.Errorf("VolumeInfo.Preset = %q, want general", info.Preset) } diff --git a/weed/server/qa_block_cp11b3_adversarial_test.go b/weed/server/qa_block_cp11b3_adversarial_test.go index e999d6146..f2377aa4f 100644 --- a/weed/server/qa_block_cp11b3_adversarial_test.go +++ b/weed/server/qa_block_cp11b3_adversarial_test.go @@ -76,8 +76,9 @@ func TestQA_T1_WALLag_ExactBoundary(t *testing.T) { } // Now set replica at LSN 149 → lag = 51 > tolerance → ineligible. - e, _ := r.Lookup("vol1") - e.Replicas[0].WALHeadLSN = 149 + r.UpdateEntry("vol1", func(e *BlockVolumeEntry) { + e.Replicas[0].WALHeadLSN = 149 + }) pf, _ = r.EvaluatePromotion("vol1") if pf.Promotable { @@ -714,9 +715,10 @@ func TestQA_T2_RF3_OrphanedPrimary_BestReplicaPromoted(t *testing.T) { registerVolumeRF3(t, ms, "vol1", "vs1", "vs2", "vs3", 1, 5*time.Second) // Give vs3 higher health. - entry, _ := ms.blockRegistry.Lookup("vol1") - entry.Replicas[0].HealthScore = 0.7 // vs2 - entry.Replicas[1].HealthScore = 1.0 // vs3 + ms.blockRegistry.UpdateEntry("vol1", func(entry *BlockVolumeEntry) { + entry.Replicas[0].HealthScore = 0.7 // vs2 + entry.Replicas[1].HealthScore = 1.0 // vs3 + }) // Kill primary without calling failoverBlockVolumes (simulates missed failover). ms.blockRegistry.UnmarkBlockCapable("vs1") @@ -724,7 +726,7 @@ func TestQA_T2_RF3_OrphanedPrimary_BestReplicaPromoted(t *testing.T) { // vs2 reconnects → orphan detected → best replica (vs3) promoted. ms.reevaluateOrphanedPrimaries("vs2") - entry, _ = ms.blockRegistry.Lookup("vol1") + entry, _ := ms.blockRegistry.Lookup("vol1") if entry.VolumeServer != "vs3" { t.Fatalf("expected vs3 promoted (highest health), got %q", entry.VolumeServer) } @@ -898,12 +900,13 @@ func TestQA_T3_OrphanDeferredTimer_EpochChanged_NoPromotion(t *testing.T) { ms.reevaluateOrphanedPrimaries("vs2") // Before timer fires, bump epoch (simulates admin intervention). - e, _ := ms.blockRegistry.Lookup("vol1") - e.Epoch = 42 + ms.blockRegistry.UpdateEntry("vol1", func(e *BlockVolumeEntry) { + e.Epoch = 42 + }) time.Sleep(350 * time.Millisecond) - e, _ = ms.blockRegistry.Lookup("vol1") + e, _ := ms.blockRegistry.Lookup("vol1") if e.Epoch != 42 { t.Fatalf("epoch should remain 42 (timer rejected), got %d", e.Epoch) } @@ -928,7 +931,9 @@ func TestQA_T4_RebuildAddr_UpdatedByHeartbeat(t *testing.T) { } // New primary (vs2) heartbeats with RebuildListenAddr. - entry.RebuildListenAddr = "vs2:15000" + ms.blockRegistry.UpdateEntry("vol1", func(e *BlockVolumeEntry) { + e.RebuildListenAddr = "vs2:15000" + }) // vs1 reconnects → rebuild should use the updated addr. ms.recoverBlockVolumes("vs1") diff --git a/weed/server/qa_block_cp63_test.go b/weed/server/qa_block_cp63_test.go index b16b38b10..7ae247a22 100644 --- a/weed/server/qa_block_cp63_test.go +++ b/weed/server/qa_block_cp63_test.go @@ -406,18 +406,19 @@ func TestQA_Failover_PromoteIdempotent_NoReplicaAfterFirstSwap(t *testing.T) { ms.recoverBlockVolumes("vs1") // Simulate rebuild completion: mark vs1 as a healthy replica. - e, _ := ms.blockRegistry.Lookup("vol1") - for i := range e.Replicas { - if e.Replicas[i].Server == "vs1" { - e.Replicas[i].Role = blockvol.RoleToWire(blockvol.RoleReplica) - e.Replicas[i].LastHeartbeat = time.Now() - e.Replicas[i].HealthScore = 1.0 + ms.blockRegistry.UpdateEntry("vol1", func(e *BlockVolumeEntry) { + for i := range e.Replicas { + if e.Replicas[i].Server == "vs1" { + e.Replicas[i].Role = blockvol.RoleToWire(blockvol.RoleReplica) + e.Replicas[i].LastHeartbeat = time.Now() + e.Replicas[i].HealthScore = 1.0 + } } - } - e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) // expire the new lease + e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) // expire the new lease + }) ms.failoverBlockVolumes("vs2") - e, _ = ms.blockRegistry.Lookup("vol1") + e, _ := ms.blockRegistry.Lookup("vol1") // After double failover: should swap back to vs1 as primary. if e.VolumeServer != "vs1" { t.Fatalf("double failover: primary=%s, want vs1", e.VolumeServer) @@ -662,14 +663,15 @@ func TestQA_Rebuild_FullCycle_CreateFailoverRecoverRebuild(t *testing.T) { } // Expire lease. - entry, _ := ms.blockRegistry.Lookup("vol1") - entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + ms.blockRegistry.UpdateEntry("vol1", func(entry *BlockVolumeEntry) { + entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + }) // Primary disconnects. ms.failoverBlockVolumes(primary) // Verify promotion. - entry, _ = ms.blockRegistry.Lookup("vol1") + entry, _ := ms.blockRegistry.Lookup("vol1") if entry.VolumeServer != replica { t.Fatalf("expected promotion to %s, got %s", replica, entry.VolumeServer) } diff --git a/weed/server/qa_block_cp82_adversarial_test.go b/weed/server/qa_block_cp82_adversarial_test.go index 761dab5a6..91a11acfc 100644 --- a/weed/server/qa_block_cp82_adversarial_test.go +++ b/weed/server/qa_block_cp82_adversarial_test.go @@ -411,11 +411,13 @@ func TestQA_CP82_MasterRestart_ReconstructReplicas_ThenFailover(t *testing.T) { } // Phase 3: Set lease expired and trigger failover. - entry.LeaseTTL = 5 * time.Second - entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute) - // Ensure replica is eligible for promotion. - entry.Replicas[0].LastHeartbeat = time.Now() - entry.Replicas[0].Role = blockvol.RoleToWire(blockvol.RoleReplica) + ms.blockRegistry.UpdateEntry("vol-restart", func(e *BlockVolumeEntry) { + e.LeaseTTL = 5 * time.Second + e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + // Ensure replica is eligible for promotion. + e.Replicas[0].LastHeartbeat = time.Now() + e.Replicas[0].Role = blockvol.RoleToWire(blockvol.RoleReplica) + }) ms.failoverBlockVolumes("vs1:9333") diff --git a/weed/server/qa_block_cp831_adversarial_test.go b/weed/server/qa_block_cp831_adversarial_test.go index 610c9b37a..9a1706a22 100644 --- a/weed/server/qa_block_cp831_adversarial_test.go +++ b/weed/server/qa_block_cp831_adversarial_test.go @@ -475,19 +475,21 @@ func TestQA_CP831_FailoverPreservesDurabilityMode(t *testing.T) { t.Fatalf("create: %v", err) } - entry, _ := ms.blockRegistry.Lookup("fo-mode") - primary := entry.VolumeServer + readEntry, _ := ms.blockRegistry.Lookup("fo-mode") + primary := readEntry.VolumeServer // Expire the lease so failover will actually promote. - entry.LeaseTTL = 5 * time.Second - entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute) - - // Ensure replica has fresh heartbeat for promotion eligibility. - if len(entry.Replicas) > 0 { - entry.Replicas[0].LastHeartbeat = time.Now() - entry.Replicas[0].Role = blockvol.RoleToWire(blockvol.RoleReplica) - entry.Replicas[0].WALHeadLSN = entry.WALHeadLSN - } + ms.blockRegistry.UpdateEntry("fo-mode", func(entry *BlockVolumeEntry) { + entry.LeaseTTL = 5 * time.Second + entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + + // Ensure replica has fresh heartbeat for promotion eligibility. + if len(entry.Replicas) > 0 { + entry.Replicas[0].LastHeartbeat = time.Now() + entry.Replicas[0].Role = blockvol.RoleToWire(blockvol.RoleReplica) + entry.Replicas[0].WALHeadLSN = entry.WALHeadLSN + } + }) ms.failoverBlockVolumes(primary) diff --git a/weed/server/qa_block_nvme_publication_test.go b/weed/server/qa_block_nvme_publication_test.go index 2cfbadbaf..de4e3ac8a 100644 --- a/weed/server/qa_block_nvme_publication_test.go +++ b/weed/server/qa_block_nvme_publication_test.go @@ -572,14 +572,15 @@ func TestIntegration_NVMe_FailoverUpdatesNvmeAddr(t *testing.T) { originalNvmeAddr := createResp.NvmeAddr // Expire lease for immediate failover. - entry, _ := ms.blockRegistry.Lookup("pvc-failover-nvme") - entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + ms.blockRegistry.UpdateEntry("pvc-failover-nvme", func(entry *BlockVolumeEntry) { + entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + }) // Primary dies → replica promoted. ms.failoverBlockVolumes(primaryVS) // Verify new primary is different. - entry, _ = ms.blockRegistry.Lookup("pvc-failover-nvme") + entry, _ := ms.blockRegistry.Lookup("pvc-failover-nvme") if entry.VolumeServer == primaryVS { t.Fatal("failover didn't promote replica") } @@ -874,7 +875,9 @@ func TestIntegration_NVMe_FullLifecycle_K8s(t *testing.T) { }) // ── Step 5: Primary VS dies ── - entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + ms.blockRegistry.UpdateEntry("pvc-k8s-data", func(e *BlockVolumeEntry) { + e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + }) ms.failoverBlockVolumes(primaryVS) entry, _ = ms.blockRegistry.Lookup("pvc-k8s-data") diff --git a/weed/server/qa_block_rf3_test.go b/weed/server/qa_block_rf3_test.go index a055c4ea1..960af4b8b 100644 --- a/weed/server/qa_block_rf3_test.go +++ b/weed/server/qa_block_rf3_test.go @@ -137,17 +137,19 @@ func TestQA_RF3_PrimaryDies_BestReplicaPromoted(t *testing.T) { entry, _ := ms.blockRegistry.Lookup("pvc-rf3-promo") primary := resp.VolumeServer + expectedPromoted := entry.Replicas[1].Server // higher health score // Set different health scores on replicas. - for i := range entry.Replicas { - if i == 0 { - entry.Replicas[i].HealthScore = 0.7 - } else { - entry.Replicas[i].HealthScore = 1.0 + ms.blockRegistry.UpdateEntry("pvc-rf3-promo", func(e *BlockVolumeEntry) { + for i := range e.Replicas { + if i == 0 { + e.Replicas[i].HealthScore = 0.7 + } else { + e.Replicas[i].HealthScore = 1.0 + } } - } - expectedPromoted := entry.Replicas[1].Server // higher health score - entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute) // expired + e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) // expired + }) // Kill primary. ms.failoverBlockVolumes(primary) @@ -275,14 +277,16 @@ func TestQA_RF3_HealthScore_FailoverPreference(t *testing.T) { entry, _ := ms.blockRegistry.Lookup("pvc-rf3-health") primary := entry.VolumeServer + expectedWinner := entry.Replicas[1].Server // Set health scores: replica[0] = 0.3 (low), replica[1] = 0.9 (high). - entry.Replicas[0].HealthScore = 0.3 - entry.Replicas[0].WALHeadLSN = 100 - entry.Replicas[1].HealthScore = 0.9 - entry.Replicas[1].WALHeadLSN = 100 - expectedWinner := entry.Replicas[1].Server - entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute) // expired + ms.blockRegistry.UpdateEntry("pvc-rf3-health", func(e *BlockVolumeEntry) { + e.Replicas[0].HealthScore = 0.3 + e.Replicas[0].WALHeadLSN = 100 + e.Replicas[1].HealthScore = 0.9 + e.Replicas[1].WALHeadLSN = 100 + e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) // expired + }) ms.failoverBlockVolumes(primary) @@ -327,7 +331,9 @@ func TestQA_RF3_BackwardCompat_RF2_Unchanged(t *testing.T) { // Failover should work identically. primary := resp.VolumeServer - entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + ms.blockRegistry.UpdateEntry("pvc-rf2-compat", func(e *BlockVolumeEntry) { + e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + }) ms.failoverBlockVolumes(primary) entry, _ = ms.blockRegistry.Lookup("pvc-rf2-compat") @@ -365,7 +371,9 @@ func TestQA_RF3_FullLifecycle(t *testing.T) { vs1 := resp.VolumeServer vs2 := entry.Replicas[0].Server vs3 := entry.Replicas[1].Server - entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + ms.blockRegistry.UpdateEntry("pvc-rf3-lifecycle", func(e *BlockVolumeEntry) { + e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + }) // Step 1: Kill vs1 (primary). One of vs2/vs3 promoted. ms.failoverBlockVolumes(vs1) @@ -387,15 +395,17 @@ func TestQA_RF3_FullLifecycle(t *testing.T) { } // Step 3: Kill step1Primary. The surviving replica (step1Replica or vs1) should be promoted. - entry.LastLeaseGrant = time.Now().Add(-1 * time.Minute) - // Set health scores: vs1 low (just rebuilt), step1Replica high. - for i := range entry.Replicas { - if entry.Replicas[i].Server == vs1 { - entry.Replicas[i].HealthScore = 0.5 - } else { - entry.Replicas[i].HealthScore = 1.0 + ms.blockRegistry.UpdateEntry("pvc-rf3-lifecycle", func(e *BlockVolumeEntry) { + e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + // Set health scores: vs1 low (just rebuilt), step1Replica high. + for i := range e.Replicas { + if e.Replicas[i].Server == vs1 { + e.Replicas[i].HealthScore = 0.5 + } else { + e.Replicas[i].HealthScore = 1.0 + } } - } + }) ms.failoverBlockVolumes(step1Primary) entry, _ = ms.blockRegistry.Lookup("pvc-rf3-lifecycle")