From 7c07d9c95a0c89c5a51b320d5e2ee59a95afbd57 Mon Sep 17 00:00:00 2001 From: Ping Qiu Date: Mon, 2 Mar 2026 11:34:06 -0800 Subject: [PATCH] feat: Phase 4A CP4b-3 -- assignment processing, 2 bug fixes, 20 QA tests Add ProcessBlockVolumeAssignments to BlockVolumeStore and wire AssignmentSource/AssignmentCallback into the heartbeat collector's Run() loop. Assignments are fetched and applied each tick after status collection. Bug fixes: - BUG-CP4B3-1: TOCTOU between GetBlockVolume and HandleAssignment. Added withVolume() helper that holds RLock across lookup+operation, preventing RemoveBlockVolume from closing the volume mid-assignment. - BUG-CP4B3-2: Data race on callback fields read by Run() goroutine. Made StatusCallback/AssignmentSource/AssignmentCallback private, added cbMu mutex and SetXxx() setter methods. Lock held only for load/store, not during callback execution. 7 dev tests + 13 QA adversarial tests = 20 new tests. 972 total unit tests, all passing. Co-Authored-By: Claude Opus 4.6 --- weed/server/block_heartbeat_loop.go | 108 +++- weed/server/block_heartbeat_loop_test.go | 218 +++++++- weed/server/qa_block_assign_test.go | 520 ++++++++++++++++++++ weed/server/qa_block_heartbeat_loop_test.go | 34 +- weed/storage/store_blockvol.go | 35 ++ 5 files changed, 862 insertions(+), 53 deletions(-) create mode 100644 weed/server/qa_block_assign_test.go diff --git a/weed/server/block_heartbeat_loop.go b/weed/server/block_heartbeat_loop.go index ff5ab31e2..e2804447b 100644 --- a/weed/server/block_heartbeat_loop.go +++ b/weed/server/block_heartbeat_loop.go @@ -14,7 +14,7 @@ import ( const minHeartbeatInterval = time.Millisecond // BlockVolumeHeartbeatCollector periodically collects block volume status -// and delivers it via StatusCallback. Standalone — does not touch the +// and delivers it via StatusCallback. Standalone -- does not touch the // existing gRPC heartbeat stream. When proto is updated, the callback // will be wired to stream.Send(). 
type BlockVolumeHeartbeatCollector struct { @@ -25,16 +25,23 @@ type BlockVolumeHeartbeatCollector struct { stopOnce sync.Once started atomic.Bool // BUG-CP4B2-1: tracks whether Run() was called - // StatusCallback is called each tick with collected status. - StatusCallback func([]blockvol.BlockVolumeInfoMessage) + // cbMu protects the callback/source fields against concurrent + // read (Run goroutine) and write (SetXxx callers). BUG-CP4B3-2. + cbMu sync.Mutex + statusCallback func([]blockvol.BlockVolumeInfoMessage) + assignmentSource func() []blockvol.BlockVolumeAssignment + assignmentCallback func([]blockvol.BlockVolumeAssignment, []error) } // NewBlockVolumeHeartbeatCollector creates a collector that calls -// StatusCallback every interval with the current block volume status. -// Intervals ≤ 0 are clamped to minHeartbeatInterval (BUG-CP4B2-2). +// the status callback every interval with the current block volume status. +// Intervals <= 0 are clamped to minHeartbeatInterval (BUG-CP4B2-2). func NewBlockVolumeHeartbeatCollector( bs *BlockService, interval time.Duration, ) *BlockVolumeHeartbeatCollector { + if bs == nil { + panic("block heartbeat: nil BlockService") + } if interval <= 0 { interval = minHeartbeatInterval } @@ -46,7 +53,35 @@ func NewBlockVolumeHeartbeatCollector( } } -// Run blocks until Stop() is called. Collects status on each tick. +// SetStatusCallback sets the function called each tick with collected status. +// Safe to call before or after Run() (BUG-CP4B3-2). +func (c *BlockVolumeHeartbeatCollector) SetStatusCallback(fn func([]blockvol.BlockVolumeInfoMessage)) { + c.cbMu.Lock() + c.statusCallback = fn + c.cbMu.Unlock() +} + +// SetAssignmentSource sets the function called each tick to fetch pending +// assignments. If nil, no assignments are processed. +// Safe to call before or after Run() (BUG-CP4B3-2). 
+func (c *BlockVolumeHeartbeatCollector) SetAssignmentSource(fn func() []blockvol.BlockVolumeAssignment) { + c.cbMu.Lock() + c.assignmentSource = fn + c.cbMu.Unlock() +} + +// SetAssignmentCallback sets the function called after processing with +// per-assignment errors. If nil, errors are silently dropped (already +// logged by ProcessBlockVolumeAssignments). +// Safe to call before or after Run() (BUG-CP4B3-2). +func (c *BlockVolumeHeartbeatCollector) SetAssignmentCallback(fn func([]blockvol.BlockVolumeAssignment, []error)) { + c.cbMu.Lock() + c.assignmentCallback = fn + c.cbMu.Unlock() +} + +// Run blocks until Stop() is called. Collects status on each tick, +// then processes any pending assignments. func (c *BlockVolumeHeartbeatCollector) Run() { c.started.Store(true) defer close(c.done) @@ -57,17 +92,45 @@ func (c *BlockVolumeHeartbeatCollector) Run() { for { select { case <-ticker.C: + // Outbound: collect and report status. msgs := c.blockService.Store().CollectBlockVolumeHeartbeat() c.safeCallback(msgs) + // Inbound: process any pending assignments. + c.processAssignments() case <-c.stopCh: return } } } -// safeCallback invokes StatusCallback with panic recovery (BUG-CP4B2-3). +// processAssignments fetches and applies pending assignments from master. +func (c *BlockVolumeHeartbeatCollector) processAssignments() { + c.cbMu.Lock() + src := c.assignmentSource + c.cbMu.Unlock() + if src == nil { + return + } + assignments := c.safeFuncCall(src) + if len(assignments) == 0 { + return + } + errs := c.blockService.Store().ProcessBlockVolumeAssignments(assignments) + c.cbMu.Lock() + cb := c.assignmentCallback + c.cbMu.Unlock() + if cb != nil { + c.safeAssignmentCallback(cb, assignments, errs) + } +} + +// safeCallback loads and invokes the status callback with panic recovery +// (BUG-CP4B2-3). Lock is held only for the load, not during the call. 
func (c *BlockVolumeHeartbeatCollector) safeCallback(msgs []blockvol.BlockVolumeInfoMessage) { - if c.StatusCallback == nil { + c.cbMu.Lock() + fn := c.statusCallback + c.cbMu.Unlock() + if fn == nil { return } defer func() { @@ -75,7 +138,32 @@ func (c *BlockVolumeHeartbeatCollector) safeCallback(msgs []blockvol.BlockVolume log.Printf("block heartbeat: callback panic: %v", r) } }() - c.StatusCallback(msgs) + fn(msgs) +} + +// safeFuncCall invokes an AssignmentSource with panic recovery. +func (c *BlockVolumeHeartbeatCollector) safeFuncCall( + fn func() []blockvol.BlockVolumeAssignment, +) []blockvol.BlockVolumeAssignment { + defer func() { + if r := recover(); r != nil { + log.Printf("block heartbeat: assignment source panic: %v", r) + } + }() + return fn() +} + +// safeAssignmentCallback invokes an AssignmentCallback with panic recovery. +func (c *BlockVolumeHeartbeatCollector) safeAssignmentCallback( + fn func([]blockvol.BlockVolumeAssignment, []error), + assignments []blockvol.BlockVolumeAssignment, errs []error, +) { + defer func() { + if r := recover(); r != nil { + log.Printf("block heartbeat: assignment callback panic: %v", r) + } + }() + fn(assignments, errs) } // Stop signals Run() to exit and waits for it to finish. @@ -83,7 +171,7 @@ func (c *BlockVolumeHeartbeatCollector) safeCallback(msgs []blockvol.BlockVolume func (c *BlockVolumeHeartbeatCollector) Stop() { c.stopOnce.Do(func() { close(c.stopCh) }) if !c.started.Load() { - return // Run() never started — nothing to wait for. + return // Run() never started -- nothing to wait for. 
} <-c.done } diff --git a/weed/server/block_heartbeat_loop_test.go b/weed/server/block_heartbeat_loop_test.go index 2e37c1d40..358d019a1 100644 --- a/weed/server/block_heartbeat_loop_test.go +++ b/weed/server/block_heartbeat_loop_test.go @@ -1,6 +1,7 @@ package weed_server import ( + "strings" "sync" "sync/atomic" "testing" @@ -9,7 +10,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" ) -// ─── QA adversarial tests (CP4b-2) ─────────────────────────────────────────── +// --- QA adversarial tests (CP4b-2) ---------------------------------------- // 9 tests exercising edge cases and concurrency in BlockVolumeHeartbeatCollector. // Bugs found: BUG-CP4B2-1 (Stop before Run), BUG-CP4B2-2 (zero interval), // BUG-CP4B2-3 (callback panic). All fixed. @@ -29,7 +30,7 @@ func TestBlockQA_StopBeforeRun_NoPanic(t *testing.T) { }() select { case <-done: - // ok — Stop returned without deadlock + // ok -- Stop returned without deadlock case <-time.After(2 * time.Second): t.Fatal("BUG-CP4B2-1: Stop() before Run() deadlocked (blocked >2s on <-c.done)") } @@ -64,11 +65,11 @@ func TestBlockQA_StopDuringCallback(t *testing.T) { var finished atomic.Bool collector := NewBlockVolumeHeartbeatCollector(bs, 5*time.Millisecond) - collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) { + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) { entered.Store(true) time.Sleep(50 * time.Millisecond) // slow callback finished.Store(true) - } + }) go collector.Run() // Wait for callback to enter. 
@@ -96,9 +97,9 @@ func TestBlockQA_ZeroInterval_Clamped(t *testing.T) { bs := newTestBlockService(t) var count atomic.Int64 collector := NewBlockVolumeHeartbeatCollector(bs, 0) - collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) { + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) { count.Add(1) - } + }) go collector.Run() defer collector.Stop() time.Sleep(50 * time.Millisecond) @@ -113,9 +114,9 @@ func TestBlockQA_NegativeInterval_Clamped(t *testing.T) { bs := newTestBlockService(t) var count atomic.Int64 collector := NewBlockVolumeHeartbeatCollector(bs, -5*time.Second) - collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) { + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) { count.Add(1) - } + }) go collector.Run() defer collector.Stop() time.Sleep(50 * time.Millisecond) @@ -134,13 +135,13 @@ func TestBlockQA_CallbackPanic_Survives(t *testing.T) { var afterPanic atomic.Int64 collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond) - collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) { + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) { if !panicked.Load() { panicked.Store(true) panic("test panic in callback") } afterPanic.Add(1) - } + }) go collector.Run() defer collector.Stop() @@ -156,16 +157,16 @@ func TestBlockQA_CallbackPanic_Survives(t *testing.T) { } // TestBlockQA_SlowCallback_NoAccumulation verifies that a slow callback -// doesn't cause tick accumulation — ticks are dropped while callback runs. +// doesn't cause tick accumulation -- ticks are dropped while callback runs. 
func TestBlockQA_SlowCallback_NoAccumulation(t *testing.T) { bs := newTestBlockService(t) var count atomic.Int64 collector := NewBlockVolumeHeartbeatCollector(bs, 5*time.Millisecond) - collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) { + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) { count.Add(1) - time.Sleep(50 * time.Millisecond) // 10× the tick interval - } + time.Sleep(50 * time.Millisecond) // 10x the tick interval + }) go collector.Run() time.Sleep(200 * time.Millisecond) @@ -173,14 +174,14 @@ func TestBlockQA_SlowCallback_NoAccumulation(t *testing.T) { // With 50ms sleep per callback over 200ms, expect ~4 callbacks, not 40. n := count.Load() if n > 10 { - t.Fatalf("expected ≤10 callbacks (slow callback), got %d — tick accumulation?", n) + t.Fatalf("expected <=10 callbacks (slow callback), got %d -- tick accumulation?", n) } if n < 1 { t.Fatal("expected at least 1 callback") } } -// TestBlockQA_CallbackSetAfterRun verifies setting StatusCallback after +// TestBlockQA_CallbackSetAfterRun verifies calling SetStatusCallback after // Run() has started still works on the next tick. func TestBlockQA_CallbackSetAfterRun(t *testing.T) { bs := newTestBlockService(t) @@ -195,9 +196,9 @@ func TestBlockQA_CallbackSetAfterRun(t *testing.T) { // Now set the callback. 
var called atomic.Bool - collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) { + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) { called.Store(true) - } + }) deadline := time.After(200 * time.Millisecond) for !called.Load() { @@ -261,16 +262,16 @@ func TestBlockCollectorPeriodicTick(t *testing.T) { var calls [][]blockvol.BlockVolumeInfoMessage collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond) - collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) { + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) { mu.Lock() calls = append(calls, msgs) mu.Unlock() - } + }) go collector.Run() defer collector.Stop() - // Wait up to 3× interval for ≥2 callbacks. + // Wait up to 3x interval for >=2 callbacks. deadline := time.After(200 * time.Millisecond) for { mu.Lock() @@ -284,7 +285,7 @@ func TestBlockCollectorPeriodicTick(t *testing.T) { mu.Lock() n = len(calls) mu.Unlock() - t.Fatalf("expected ≥2 callbacks, got %d", n) + t.Fatalf("expected >=2 callbacks, got %d", n) case <-time.After(5 * time.Millisecond): } } @@ -307,9 +308,9 @@ func TestBlockCollectorStopNoLeak(t *testing.T) { var count atomic.Int64 collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond) - collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) { + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) { count.Add(1) - } + }) go collector.Run() @@ -327,7 +328,7 @@ func TestBlockCollectorStopNoLeak(t *testing.T) { case <-done: // ok case <-time.After(2 * time.Second): - t.Fatal("Stop() did not return within 2s — goroutine leak") + t.Fatal("Stop() did not return within 2s -- goroutine leak") } // After stop, no more callbacks should fire. @@ -346,8 +347,173 @@ func TestBlockCollectorNilCallback(t *testing.T) { go collector.Run() - // Let it tick — should not panic. + // Let it tick -- should not panic. 
time.Sleep(50 * time.Millisecond) collector.Stop() } + +// --- Dev tests: Assignment Processing (CP4b-3) ---------------------------- +// 7 tests verifying ProcessBlockVolumeAssignments and AssignmentSource wiring. + +// testBlockVolPath returns the path of the single block volume in the test service. +func testBlockVolPath(t *testing.T, bs *BlockService) string { + t.Helper() + paths := bs.Store().ListBlockVolumes() + if len(paths) != 1 { + t.Fatalf("expected 1 volume, got %d", len(paths)) + } + return paths[0] +} + +// TestBlockAssign_Success verifies a valid assignment (RoleNone -> Primary) +// changes the volume's role. +func TestBlockAssign_Success(t *testing.T) { + bs := newTestBlockService(t) + path := testBlockVolPath(t, bs) + + assignments := []blockvol.BlockVolumeAssignment{{ + Path: path, + Epoch: 1, + Role: uint32(blockvol.RolePrimary), + LeaseTtlMs: 30000, + }} + errs := bs.Store().ProcessBlockVolumeAssignments(assignments) + if errs[0] != nil { + t.Fatalf("expected nil error, got %v", errs[0]) + } + + vol, _ := bs.Store().GetBlockVolume(path) + if vol.Role() != blockvol.RolePrimary { + t.Fatalf("expected Primary role, got %s", vol.Role()) + } +} + +// TestBlockAssign_UnknownVolume verifies an assignment for a non-existent +// volume returns an error without stopping processing. +func TestBlockAssign_UnknownVolume(t *testing.T) { + bs := newTestBlockService(t) + + assignments := []blockvol.BlockVolumeAssignment{{ + Path: "/nonexistent/vol.blk", + Epoch: 1, + Role: uint32(blockvol.RolePrimary), + }} + errs := bs.Store().ProcessBlockVolumeAssignments(assignments) + if errs[0] == nil { + t.Fatal("expected error for unknown volume") + } + if !strings.Contains(errs[0].Error(), "not found") { + t.Fatalf("expected 'not found' error, got: %v", errs[0]) + } +} + +// TestBlockAssign_InvalidTransition verifies an invalid role transition +// (RoleNone -> Stale) returns an error. 
+func TestBlockAssign_InvalidTransition(t *testing.T) { + bs := newTestBlockService(t) + path := testBlockVolPath(t, bs) + + assignments := []blockvol.BlockVolumeAssignment{{ + Path: path, + Epoch: 1, + Role: uint32(blockvol.RoleStale), + }} + errs := bs.Store().ProcessBlockVolumeAssignments(assignments) + if errs[0] == nil { + t.Fatal("expected error for invalid transition") + } +} + +// TestBlockAssign_EmptyAssignments verifies an empty slice produces no errors. +func TestBlockAssign_EmptyAssignments(t *testing.T) { + bs := newTestBlockService(t) + + errs := bs.Store().ProcessBlockVolumeAssignments(nil) + if len(errs) != 0 { + t.Fatalf("expected 0 errors for nil input, got %d", len(errs)) + } + + errs = bs.Store().ProcessBlockVolumeAssignments([]blockvol.BlockVolumeAssignment{}) + if len(errs) != 0 { + t.Fatalf("expected 0 errors for empty input, got %d", len(errs)) + } +} + +// TestBlockAssign_NilSource verifies that a nil AssignmentSource doesn't +// cause panics and status collection still works. +func TestBlockAssign_NilSource(t *testing.T) { + bs := newTestBlockService(t) + collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond) + // AssignmentSource left nil intentionally. + var statusCalled atomic.Bool + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) { + statusCalled.Store(true) + }) + go collector.Run() + defer collector.Stop() + + deadline := time.After(200 * time.Millisecond) + for !statusCalled.Load() { + select { + case <-deadline: + t.Fatal("status callback never fired with nil AssignmentSource") + case <-time.After(5 * time.Millisecond): + } + } +} + +// TestBlockAssign_MixedBatch verifies a batch with 1 success, 1 unknown volume, +// and 1 invalid transition returns parallel errors correctly. 
+func TestBlockAssign_MixedBatch(t *testing.T) { + bs := newTestBlockService(t) + path := testBlockVolPath(t, bs) + + assignments := []blockvol.BlockVolumeAssignment{ + {Path: path, Epoch: 1, Role: uint32(blockvol.RolePrimary), LeaseTtlMs: 30000}, + {Path: "/nonexistent/vol.blk", Epoch: 1, Role: uint32(blockvol.RolePrimary)}, + {Path: path, Epoch: 2, Role: uint32(blockvol.RoleReplica)}, // Primary->Replica is invalid + } + errs := bs.Store().ProcessBlockVolumeAssignments(assignments) + if len(errs) != 3 { + t.Fatalf("expected 3 errors, got %d", len(errs)) + } + if errs[0] != nil { + t.Fatalf("assignment 0: expected nil error, got %v", errs[0]) + } + if errs[1] == nil { + t.Fatal("assignment 1: expected error for unknown volume") + } + if errs[2] == nil { + t.Fatal("assignment 2: expected error for invalid transition") + } + + // Volume should still be Primary from assignment 0. + vol, _ := bs.Store().GetBlockVolume(path) + if vol.Role() != blockvol.RolePrimary { + t.Fatalf("expected Primary role after mixed batch, got %s", vol.Role()) + } +} + +// TestBlockAssign_RoleNoneIgnored verifies a RoleNone (0) assignment is a +// no-op same-role refresh -- no crash, no write gate opened. +func TestBlockAssign_RoleNoneIgnored(t *testing.T) { + bs := newTestBlockService(t) + path := testBlockVolPath(t, bs) + + assignments := []blockvol.BlockVolumeAssignment{{ + Path: path, + Epoch: 0, + Role: uint32(blockvol.RoleNone), + }} + errs := bs.Store().ProcessBlockVolumeAssignments(assignments) + if errs[0] != nil { + t.Fatalf("expected nil error for RoleNone assignment, got %v", errs[0]) + } + + // Volume should still be RoleNone. 
+ vol, _ := bs.Store().GetBlockVolume(path) + if vol.Role() != blockvol.RoleNone { + t.Fatalf("expected RoleNone, got %s", vol.Role()) + } +} diff --git a/weed/server/qa_block_assign_test.go b/weed/server/qa_block_assign_test.go new file mode 100644 index 000000000..9450dd249 --- /dev/null +++ b/weed/server/qa_block_assign_test.go @@ -0,0 +1,520 @@ +package weed_server + +import ( + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" +) + +// --------------------------------------------------------------------------- +// QA adversarial tests for CP4b-3: Assignment Processing +// Targets: ProcessBlockVolumeAssignments, processAssignments, safe wrappers +// +// Bugs found: +// BUG-CP4B3-1 [Medium]: TOCTOU in ProcessBlockVolumeAssignments -- volume +// can be removed between GetBlockVolume and HandleAssignment. +// BUG-CP4B3-2 [Low]: Data race on callback fields -- fixed with cbMu + +// SetXxx methods. +// --------------------------------------------------------------------------- + +func TestQABlockAssignmentProcessing(t *testing.T) { + tests := []struct { + name string + run func(t *testing.T) + }{ + // --- Source / callback panic recovery --- + {name: "source_panic_recovery", run: testSourcePanicRecovery}, + {name: "callback_panic_recovery", run: testAssignCallbackPanicRecovery}, + + // --- Source edge cases --- + {name: "source_returns_nil", run: testSourceReturnsNil}, + {name: "source_returns_empty", run: testSourceReturnsEmpty}, + {name: "slow_source_blocks_tick", run: testSlowSourceBlocksTick}, + {name: "source_set_after_run", run: testAssignSourceSetAfterRun}, + + // --- Batch processing --- + {name: "same_volume_batch_ordering", run: testSameVolumeBatchOrdering}, + {name: "unknown_role_in_assignment", run: testUnknownRoleInAssignment}, + {name: "large_batch_100_assignments", run: testLargeBatch100}, + {name: "batch_all_unknown_volumes", run: testBatchAllUnknownVolumes}, + + // --- Integration: 
collector + assignments --- + {name: "stop_during_slow_assignment", run: testStopDuringSlowAssignment}, + {name: "assignment_and_status_both_fire", run: testAssignmentAndStatusBothFire}, + {name: "assignment_callback_receives_errs", run: testAssignmentCallbackReceivesErrs}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.run(t) + }) + } +} + +// --------------------------------------------------------------------------- +// Source / callback panic recovery +// --------------------------------------------------------------------------- + +// testSourcePanicRecovery verifies that a panicking AssignmentSource doesn't +// kill the collector. Status callbacks should continue after the panic. +func testSourcePanicRecovery(t *testing.T) { + bs := newTestBlockService(t) + var statusCount atomic.Int64 + var sourceCalls atomic.Int64 + + collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond) + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) { + statusCount.Add(1) + }) + collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment { + n := sourceCalls.Add(1) + if n == 1 { + panic("test: source panic") + } + return nil // subsequent calls return empty + }) + go collector.Run() + defer collector.Stop() + + // Wait for post-panic ticks (status callbacks should keep firing). + deadline := time.After(500 * time.Millisecond) + for statusCount.Load() < 3 { + select { + case <-deadline: + t.Fatalf("collector died after source panic: only %d status callbacks", statusCount.Load()) + case <-time.After(5 * time.Millisecond): + } + } + + if sourceCalls.Load() < 2 { + t.Errorf("expected AssignmentSource called again after panic, got %d calls", sourceCalls.Load()) + } +} + +// testAssignCallbackPanicRecovery verifies that a panicking AssignmentCallback +// doesn't kill the collector. 
+func testAssignCallbackPanicRecovery(t *testing.T) { + bs := newTestBlockService(t) + path := testBlockVolPath(t, bs) + + var statusCount atomic.Int64 + var cbCalls atomic.Int64 + + collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond) + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) { + statusCount.Add(1) + }) + collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment { + // Always return a valid promote-to-Primary assignment. + // After first call, HandleAssignment will return "same role" (no-op or error), + // but that's fine -- we're testing the callback panic, not the assignment. + return []blockvol.BlockVolumeAssignment{{ + Path: path, + Epoch: 1, + Role: uint32(blockvol.RolePrimary), + LeaseTtlMs: 30000, + }} + }) + collector.SetAssignmentCallback(func(a []blockvol.BlockVolumeAssignment, errs []error) { + n := cbCalls.Add(1) + if n == 1 { + panic("test: callback panic") + } + }) + + go collector.Run() + defer collector.Stop() + + deadline := time.After(500 * time.Millisecond) + for statusCount.Load() < 3 { + select { + case <-deadline: + t.Fatalf("collector died after callback panic: only %d status callbacks", statusCount.Load()) + case <-time.After(5 * time.Millisecond): + } + } + + if cbCalls.Load() < 2 { + t.Errorf("expected AssignmentCallback called again after panic, got %d calls", cbCalls.Load()) + } +} + +// --------------------------------------------------------------------------- +// Source edge cases +// --------------------------------------------------------------------------- + +// testSourceReturnsNil verifies that AssignmentSource returning nil +// doesn't cause ProcessBlockVolumeAssignments to be called. 
+func testSourceReturnsNil(t *testing.T) { + bs := newTestBlockService(t) + var processAttempted atomic.Bool + + collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond) + collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment { + return nil + }) + // If ProcessBlockVolumeAssignments were called with nil, + // it returns a nil slice (len 0) -- harmless. But the collector + // short-circuits on len==0 before calling Process. We verify + // by ensuring AssignmentCallback is never called. + collector.SetAssignmentCallback(func(a []blockvol.BlockVolumeAssignment, errs []error) { + processAttempted.Store(true) + }) + + go collector.Run() + time.Sleep(50 * time.Millisecond) + collector.Stop() + + if processAttempted.Load() { + t.Error("AssignmentCallback called when source returned nil -- should short-circuit") + } +} + +// testSourceReturnsEmpty verifies that an empty slice from AssignmentSource +// is handled the same as nil (no processing). +func testSourceReturnsEmpty(t *testing.T) { + bs := newTestBlockService(t) + var processAttempted atomic.Bool + + collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond) + collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment { + return []blockvol.BlockVolumeAssignment{} + }) + collector.SetAssignmentCallback(func(a []blockvol.BlockVolumeAssignment, errs []error) { + processAttempted.Store(true) + }) + + go collector.Run() + time.Sleep(50 * time.Millisecond) + collector.Stop() + + if processAttempted.Load() { + t.Error("AssignmentCallback called when source returned empty -- should short-circuit") + } +} + +// testSlowSourceBlocksTick verifies that a slow AssignmentSource blocks the +// entire collector tick (status + assignment are sequential in the same tick). 
+func testSlowSourceBlocksTick(t *testing.T) { + bs := newTestBlockService(t) + var statusCount atomic.Int64 + + collector := NewBlockVolumeHeartbeatCollector(bs, 5*time.Millisecond) + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) { + statusCount.Add(1) + }) + collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment { + time.Sleep(50 * time.Millisecond) // 10x the tick interval + return nil + }) + + go collector.Run() + time.Sleep(200 * time.Millisecond) + collector.Stop() + + // With 50ms source sleep, expect ~4 status callbacks (200ms/50ms), not 40. + n := statusCount.Load() + if n > 10 { + t.Errorf("expected slow source to throttle ticks, got %d status callbacks", n) + } + if n < 1 { + t.Error("expected at least 1 status callback") + } + t.Logf("slow source: %d status callbacks in 200ms (5ms interval, 50ms source)", n) +} + +// testAssignSourceSetAfterRun verifies that setting AssignmentSource after +// Run() started still picks up assignments on subsequent ticks. +func testAssignSourceSetAfterRun(t *testing.T) { + bs := newTestBlockService(t) + path := testBlockVolPath(t, bs) + + collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond) + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {}) + // AssignmentSource left nil initially. + + go collector.Run() + defer collector.Stop() + + // Let a few nil-source ticks fire. + time.Sleep(30 * time.Millisecond) + + // Now set the source + callback. 
+ var cbCalled atomic.Bool + collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment { + return []blockvol.BlockVolumeAssignment{{ + Path: path, + Epoch: 1, + Role: uint32(blockvol.RolePrimary), + LeaseTtlMs: 30000, + }} + }) + collector.SetAssignmentCallback(func(a []blockvol.BlockVolumeAssignment, errs []error) { + cbCalled.Store(true) + }) + + deadline := time.After(200 * time.Millisecond) + for !cbCalled.Load() { + select { + case <-deadline: + t.Fatal("AssignmentCallback never fired after setting source post-Run") + case <-time.After(5 * time.Millisecond): + } + } +} + +// --------------------------------------------------------------------------- +// Batch processing +// --------------------------------------------------------------------------- + +// testSameVolumeBatchOrdering verifies that when the same volume appears +// multiple times in a batch, assignments are applied sequentially in order. +// HandleAssignment: Primary->Stale (demote does Primary->Draining->Stale internally). +func testSameVolumeBatchOrdering(t *testing.T) { + bs := newTestBlockService(t) + path := testBlockVolPath(t, bs) + + // Batch: promote to Primary (epoch 1), then demote to Stale (epoch 2). + // Order matters -- first must run before second. + assignments := []blockvol.BlockVolumeAssignment{ + {Path: path, Epoch: 1, Role: uint32(blockvol.RolePrimary), LeaseTtlMs: 30000}, + {Path: path, Epoch: 2, Role: uint32(blockvol.RoleStale)}, + } + errs := bs.Store().ProcessBlockVolumeAssignments(assignments) + if errs[0] != nil { + t.Fatalf("assignment 0 (promote): %v", errs[0]) + } + if errs[1] != nil { + t.Fatalf("assignment 1 (demote): %v", errs[1]) + } + + vol, _ := bs.Store().GetBlockVolume(path) + if vol.Role() != blockvol.RoleStale { + t.Errorf("expected Stale after sequential batch, got %s", vol.Role()) + } +} + +// testUnknownRoleInAssignment verifies that a wire role value > maxValidRole +// is mapped to RoleNone by RoleFromWire. 
If the volume is already RoleNone, +// HandleAssignment with RoleNone is a no-op (same-role refresh). +func testUnknownRoleInAssignment(t *testing.T) { + bs := newTestBlockService(t) + path := testBlockVolPath(t, bs) + + assignments := []blockvol.BlockVolumeAssignment{{ + Path: path, + Epoch: 0, + Role: 255, // unknown -- RoleFromWire maps to RoleNone + }} + errs := bs.Store().ProcessBlockVolumeAssignments(assignments) + // RoleNone -> RoleNone is a no-op, should succeed. + if errs[0] != nil { + t.Fatalf("expected nil error for unknown role mapped to RoleNone, got: %v", errs[0]) + } + + vol, _ := bs.Store().GetBlockVolume(path) + if vol.Role() != blockvol.RoleNone { + t.Errorf("expected RoleNone, got %s", vol.Role()) + } +} + +// testLargeBatch100 verifies that 100 assignments in a single batch all +// target the same (valid) volume path. The first promotes to Primary, +// and subsequent 99 are same-role refreshes (same epoch). +func testLargeBatch100(t *testing.T) { + bs := newTestBlockService(t) + path := testBlockVolPath(t, bs) + + assignments := make([]blockvol.BlockVolumeAssignment, 100) + assignments[0] = blockvol.BlockVolumeAssignment{ + Path: path, Epoch: 1, Role: uint32(blockvol.RolePrimary), LeaseTtlMs: 30000, + } + for i := 1; i < 100; i++ { + assignments[i] = blockvol.BlockVolumeAssignment{ + Path: path, Epoch: 1, Role: uint32(blockvol.RolePrimary), LeaseTtlMs: 30000, + } + } + + errs := bs.Store().ProcessBlockVolumeAssignments(assignments) + if len(errs) != 100 { + t.Fatalf("expected 100 error slots, got %d", len(errs)) + } + // First should succeed (promote). + if errs[0] != nil { + t.Fatalf("assignment 0: %v", errs[0]) + } + // Remaining are same-role refreshes -- should all succeed. 
+ for i := 1; i < 100; i++ { + if errs[i] != nil { + t.Errorf("assignment %d: unexpected error: %v", i, errs[i]) + } + } +} + +// testBatchAllUnknownVolumes verifies that a batch where every assignment +// targets a nonexistent volume returns all errors but doesn't panic. +func testBatchAllUnknownVolumes(t *testing.T) { + bs := newTestBlockService(t) + + assignments := make([]blockvol.BlockVolumeAssignment, 5) + for i := range assignments { + assignments[i] = blockvol.BlockVolumeAssignment{ + Path: "/nonexistent/vol.blk", + Epoch: 1, + Role: uint32(blockvol.RolePrimary), + } + } + + errs := bs.Store().ProcessBlockVolumeAssignments(assignments) + for i, err := range errs { + if err == nil { + t.Errorf("assignment %d: expected error for unknown volume", i) + } else if !strings.Contains(err.Error(), "not found") { + t.Errorf("assignment %d: expected 'not found', got: %v", i, err) + } + } +} + +// --------------------------------------------------------------------------- +// Integration: collector + assignments +// --------------------------------------------------------------------------- + +// testStopDuringSlowAssignment verifies that Stop() waits for a slow +// assignment source to complete before returning. +func testStopDuringSlowAssignment(t *testing.T) { + bs := newTestBlockService(t) + var sourceEntered atomic.Bool + var sourceFinished atomic.Bool + + collector := NewBlockVolumeHeartbeatCollector(bs, 5*time.Millisecond) + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {}) + collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment { + sourceEntered.Store(true) + time.Sleep(80 * time.Millisecond) + sourceFinished.Store(true) + return nil + }) + + go collector.Run() + + // Wait for source to enter. 
+ deadline := time.After(2 * time.Second) + for !sourceEntered.Load() { + select { + case <-deadline: + collector.Stop() + t.Fatal("source never entered") + return + case <-time.After(time.Millisecond): + } + } + + // Stop should block until the current tick (including slow source) finishes. + collector.Stop() + if !sourceFinished.Load() { + t.Error("Stop() returned before slow AssignmentSource finished") + } +} + +// testAssignmentAndStatusBothFire verifies that both StatusCallback and +// AssignmentCallback fire on the same tick. +func testAssignmentAndStatusBothFire(t *testing.T) { + bs := newTestBlockService(t) + path := testBlockVolPath(t, bs) + + var statusCount atomic.Int64 + var assignCount atomic.Int64 + + collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond) + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) { + statusCount.Add(1) + }) + collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment { + return []blockvol.BlockVolumeAssignment{{ + Path: path, + Epoch: 1, + Role: uint32(blockvol.RolePrimary), + LeaseTtlMs: 30000, + }} + }) + collector.SetAssignmentCallback(func(a []blockvol.BlockVolumeAssignment, errs []error) { + assignCount.Add(1) + }) + + go collector.Run() + defer collector.Stop() + + deadline := time.After(500 * time.Millisecond) + for statusCount.Load() < 3 || assignCount.Load() < 3 { + select { + case <-deadline: + t.Fatalf("expected >=3 each, got status=%d assign=%d", + statusCount.Load(), assignCount.Load()) + case <-time.After(5 * time.Millisecond): + } + } +} + +// testAssignmentCallbackReceivesErrs verifies that the AssignmentCallback +// receives the correct parallel error slice from ProcessBlockVolumeAssignments. 
+func testAssignmentCallbackReceivesErrs(t *testing.T) { + bs := newTestBlockService(t) + path := testBlockVolPath(t, bs) + + var mu sync.Mutex + var gotAssignments []blockvol.BlockVolumeAssignment + var gotErrs []error + received := make(chan struct{}, 1) + + collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond) + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {}) + collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment { + // Mixed batch: 1 valid + 1 unknown + return []blockvol.BlockVolumeAssignment{ + {Path: path, Epoch: 1, Role: uint32(blockvol.RolePrimary), LeaseTtlMs: 30000}, + {Path: "/nonexistent/vol.blk", Epoch: 1, Role: uint32(blockvol.RolePrimary)}, + } + }) + collector.SetAssignmentCallback(func(a []blockvol.BlockVolumeAssignment, errs []error) { + mu.Lock() + if gotAssignments == nil { + gotAssignments = a + gotErrs = errs + } + mu.Unlock() + select { + case received <- struct{}{}: + default: + } + }) + + go collector.Run() + defer collector.Stop() + + select { + case <-received: + case <-time.After(500 * time.Millisecond): + t.Fatal("AssignmentCallback never fired") + } + + mu.Lock() + defer mu.Unlock() + + if len(gotAssignments) != 2 { + t.Fatalf("expected 2 assignments, got %d", len(gotAssignments)) + } + if len(gotErrs) != 2 { + t.Fatalf("expected 2 errors, got %d", len(gotErrs)) + } + if gotErrs[0] != nil { + t.Errorf("assignment 0: expected nil error, got %v", gotErrs[0]) + } + if gotErrs[1] == nil { + t.Error("assignment 1: expected error for unknown volume") + } else if !strings.Contains(gotErrs[1].Error(), "not found") { + t.Errorf("assignment 1: expected 'not found', got: %v", gotErrs[1]) + } +} diff --git a/weed/server/qa_block_heartbeat_loop_test.go b/weed/server/qa_block_heartbeat_loop_test.go index 927173cc0..cc3dbddc1 100644 --- a/weed/server/qa_block_heartbeat_loop_test.go +++ b/weed/server/qa_block_heartbeat_loop_test.go @@ -69,7 +69,7 @@ func testStopBeforeRunDeadlocks(t 
*testing.T) { func testDoubleStopNoPanic(t *testing.T) { bs := newTestBlockService(t) collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond) - collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {} + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {}) go collector.Run() time.Sleep(30 * time.Millisecond) @@ -103,13 +103,13 @@ func testStopDuringCallback(t *testing.T) { callbackStarted := make(chan struct{}) callbackRelease := make(chan struct{}) - collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) { + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) { select { case callbackStarted <- struct{}{}: default: } <-callbackRelease - } + }) go collector.Run() @@ -157,9 +157,9 @@ func testZeroIntervalPanics(t *testing.T) { collector := NewBlockVolumeHeartbeatCollector(bs, 0) var count atomic.Int64 - collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) { + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) { count.Add(1) - } + }) go collector.Run() time.Sleep(30 * time.Millisecond) @@ -177,9 +177,9 @@ func testVeryShortInterval(t *testing.T) { var count atomic.Int64 collector := NewBlockVolumeHeartbeatCollector(bs, 1*time.Millisecond) - collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) { + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) { count.Add(1) - } + }) go collector.Run() time.Sleep(50 * time.Millisecond) @@ -203,12 +203,12 @@ func testCallbackPanicCrashesGoroutine(t *testing.T) { collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond) var callCount atomic.Int64 - collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) { + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) { n := callCount.Add(1) if n == 1 { panic("deliberate test panic in callback") } - } + }) go collector.Run() @@ -228,10 +228,10 @@ func 
testCallbackSlowBlocksNextTick(t *testing.T) { var count atomic.Int64 collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond) - collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) { + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) { count.Add(1) time.Sleep(50 * time.Millisecond) // 5x the interval - } + }) go collector.Run() time.Sleep(200 * time.Millisecond) @@ -247,20 +247,20 @@ func testCallbackSlowBlocksNextTick(t *testing.T) { } func testCallbackSetAfterRun(t *testing.T) { - // Setting StatusCallback after Run() starts -- data race? - // The field is read without sync in Run(). This documents the behavior. + // Setting SetStatusCallback after Run() starts -- now safe with cbMu + // (BUG-CP4B3-2 fix). bs := newTestBlockService(t) collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond) // Start with nil callback. go collector.Run() - // Set callback after Run started. Under -race, this would flag a data race. + // Set callback after Run started. With cbMu, this is race-free. 
time.Sleep(5 * time.Millisecond) var called atomic.Bool - collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) { + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) { called.Store(true) - } + }) time.Sleep(50 * time.Millisecond) collector.Stop() @@ -275,7 +275,7 @@ func testCallbackSetAfterRun(t *testing.T) { func testConcurrentStopCalls(t *testing.T) { bs := newTestBlockService(t) collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond) - collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {} + collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {}) go collector.Run() time.Sleep(30 * time.Millisecond) diff --git a/weed/storage/store_blockvol.go b/weed/storage/store_blockvol.go index 49b97e99e..5046c3085 100644 --- a/weed/storage/store_blockvol.go +++ b/weed/storage/store_blockvol.go @@ -96,6 +96,41 @@ func (bs *BlockVolumeStore) CollectBlockVolumeHeartbeat() []blockvol.BlockVolume return msgs }
+// withVolume runs fn against the volume registered under path, keeping
+// bs.mu read-locked from the map lookup until fn returns. If no volume is
+// registered for path, fn is not called and a "block volume not found" error
+// is returned; otherwise fn's result is returned unchanged.
+// (BUG-CP4B3-1: closes the TOCTOU window between GetBlockVolume and
+// HandleAssignment in which RemoveBlockVolume could close the volume.)
+// NOTE(review): fn runs under the read lock, so it presumably must not call
+// store methods that take bs.mu for writing -- confirm against callers.
+func (bs *BlockVolumeStore) withVolume(path string, fn func(*blockvol.BlockVol) error) error {
+	bs.mu.RLock()
+	defer bs.mu.RUnlock()
+
+	if vol, found := bs.volumes[path]; found {
+		return fn(vol)
+	}
+	return fmt.Errorf("block volume not found: %s", path)
+}
+ +// ProcessBlockVolumeAssignments applies a batch of assignments from master. +// Returns a slice of errors parallel to the input (nil = success). +// Unknown volumes and invalid transitions are logged and returned as errors, +// but do not stop processing of remaining assignments.
+func (bs *BlockVolumeStore) ProcessBlockVolumeAssignments(
+	assignments []blockvol.BlockVolumeAssignment,
+) []error {
+	// One result slot per input assignment; a slot stays nil on success.
+	results := make([]error, len(assignments))
+	for idx := range assignments {
+		req := assignments[idx]
+		role := blockvol.RoleFromWire(req.Role)
+		ttl := blockvol.LeaseTTLFromWire(req.LeaseTtlMs)
+		// Apply via withVolume so lookup and HandleAssignment share one call.
+		apply := func(vol *blockvol.BlockVol) error {
+			return vol.HandleAssignment(req.Epoch, role, ttl)
+		}
+		err := bs.withVolume(req.Path, apply)
+		if err == nil {
+			continue
+		}
+		// Record and log the failure, then keep going with the rest.
+		results[idx] = err
+		glog.Warningf("assignment: volume %s epoch=%d role=%s: %v",
+			req.Path, req.Epoch, role, err)
+	}
+	return results
+}
+ // Close closes all block volumes. func (bs *BlockVolumeStore) Close() { bs.mu.Lock()