diff --git a/weed/server/block_heartbeat_loop.go b/weed/server/block_heartbeat_loop.go
new file mode 100644
index 000000000..ff5ab31e2
--- /dev/null
+++ b/weed/server/block_heartbeat_loop.go
@@ -0,0 +1,116 @@
+package weed_server
+
+import (
+	"log"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
+)
+
+// minHeartbeatInterval is the minimum allowed heartbeat interval.
+// Prevents time.NewTicker panics from zero/negative values.
+const minHeartbeatInterval = time.Millisecond
+
+// BlockVolumeHeartbeatCollector periodically collects block volume status
+// and delivers it via StatusCallback. Standalone — does not touch the
+// existing gRPC heartbeat stream. When proto is updated, the callback
+// will be wired to stream.Send().
+//
+// Lifecycle: construct with NewBlockVolumeHeartbeatCollector, optionally set
+// StatusCallback, run Run() in its own goroutine, then call Stop(). Stop()
+// closes stopCh permanently, so a stopped collector cannot be restarted.
+type BlockVolumeHeartbeatCollector struct {
+	blockService *BlockService
+	interval     time.Duration
+	stopCh       chan struct{} // closed exactly once by Stop() via stopOnce
+	done         chan struct{} // closed by Run() when its loop exits
+	stopOnce     sync.Once
+	started      atomic.Bool // BUG-CP4B2-1: tracks whether Run() was called
+
+	// StatusCallback is called each tick with collected status.
+	// Run() reads this field without synchronization, so assign it before
+	// starting Run(); writing it while Run() is active is a data race.
+	StatusCallback func([]blockvol.BlockVolumeInfoMessage)
+}
+
+// NewBlockVolumeHeartbeatCollector creates a collector that calls
+// StatusCallback every interval with the current block volume status.
+// Intervals ≤ 0 are clamped to minHeartbeatInterval (BUG-CP4B2-2).
+func NewBlockVolumeHeartbeatCollector(
+	bs *BlockService, interval time.Duration,
+) *BlockVolumeHeartbeatCollector {
+	if interval <= 0 {
+		interval = minHeartbeatInterval
+	}
+	return &BlockVolumeHeartbeatCollector{
+		blockService: bs,
+		interval:     interval,
+		stopCh:       make(chan struct{}),
+		done:         make(chan struct{}),
+	}
+}
+
+// Run blocks until Stop() is called. Collects status on each tick.
+// Only the first call runs the loop; later calls return immediately.
+func (c *BlockVolumeHeartbeatCollector) Run() {
+	// A second Run() would start a competing loop and panic when its
+	// deferred close(c.done) hits the already-closed channel.
+	if !c.started.CompareAndSwap(false, true) {
+		return
+	}
+	defer close(c.done)
+
+	ticker := time.NewTicker(c.interval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-c.stopCh:
+			return
+		case <-ticker.C:
+			// select chooses randomly when both channels are ready, so a
+			// tick buffered during a slow callback could win over a pending
+			// Stop(). Re-check stopCh so no new collection starts after it.
+			select {
+			case <-c.stopCh:
+				return
+			default:
+			}
+			msgs := c.blockService.Store().CollectBlockVolumeHeartbeat()
+			c.safeCallback(msgs)
+		}
+	}
+}
+
+// safeCallback invokes StatusCallback with panic recovery (BUG-CP4B2-3).
+// A nil StatusCallback makes this a no-op.
+func (c *BlockVolumeHeartbeatCollector) safeCallback(msgs []blockvol.BlockVolumeInfoMessage) {
+	// Read the field once so the nil check and the call use the same value.
+	cb := c.StatusCallback
+	if cb == nil {
+		return
+	}
+	defer func() {
+		if r := recover(); r != nil {
+			log.Printf("block heartbeat: callback panic: %v", r)
+		}
+	}()
+	cb(msgs)
+}
+
+// Stop signals Run() to exit and waits for it to finish, including any
+// callback that is still executing. It may be called repeatedly and from
+// several goroutines.
+// Safe to call even if Run() was never started (BUG-CP4B2-1): it then
+// returns immediately, and a Run() started later exits without collecting.
+// NOTE: a Stop() that races with the start of the Run() goroutine can still
+// observe started == false and return without waiting for Run() to exit.
+func (c *BlockVolumeHeartbeatCollector) Stop() {
+	c.stopOnce.Do(func() { close(c.stopCh) })
+	if !c.started.Load() {
+		return // Run() never started — nothing to wait for.
+	}
+	<-c.done
+}
diff --git a/weed/server/block_heartbeat_loop_test.go b/weed/server/block_heartbeat_loop_test.go
new file mode 100644
index 000000000..2e37c1d40
--- /dev/null
+++ b/weed/server/block_heartbeat_loop_test.go
@@ -0,0 +1,353 @@
+package weed_server
+
+import (
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
+)
+
+// ─── QA adversarial tests (CP4b-2) ───────────────────────────────────────────
+// 9 tests exercising edge cases and concurrency in BlockVolumeHeartbeatCollector.
+// Bugs found: BUG-CP4B2-1 (Stop before Run), BUG-CP4B2-2 (zero interval),
+// BUG-CP4B2-3 (callback panic). All fixed.
+
+// --- Stop lifecycle (3 tests) ---
+
+// TestBlockQA_StopBeforeRun_NoPanic verifies Stop() returns promptly
+// when Run() was never called (BUG-CP4B2-1 regression).
+func TestBlockQA_StopBeforeRun_NoPanic(t *testing.T) {
+	bs := newTestBlockService(t)
+	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
+	// Never call Run(). Stop must not deadlock.
+	done := make(chan struct{})
+	go func() {
+		collector.Stop()
+		close(done)
+	}()
+	select {
+	case <-done:
+		// ok — Stop returned without deadlock
+	case <-time.After(2 * time.Second):
+		t.Fatal("BUG-CP4B2-1: Stop() before Run() deadlocked (blocked >2s on <-c.done)")
+	}
+}
+
+// TestBlockQA_DoubleStop verifies calling Stop() twice doesn't panic
+// or deadlock (sync.Once + closed done channel).
+func TestBlockQA_DoubleStop(t *testing.T) {
+	bs := newTestBlockService(t)
+	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
+	go collector.Run()
+	time.Sleep(30 * time.Millisecond)
+	collector.Stop()
+	// Second Stop must return immediately (done already closed).
+	done := make(chan struct{})
+	go func() {
+		collector.Stop()
+		close(done)
+	}()
+	select {
+	case <-done:
+	case <-time.After(2 * time.Second):
+		t.Fatal("second Stop() deadlocked")
+	}
+}
+
+// TestBlockQA_StopDuringCallback verifies Stop() waits for an in-flight
+// callback to finish before returning.
+func TestBlockQA_StopDuringCallback(t *testing.T) {
+	bs := newTestBlockService(t)
+	var entered atomic.Bool
+	var finished atomic.Bool
+
+	collector := NewBlockVolumeHeartbeatCollector(bs, 5*time.Millisecond)
+	collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
+		entered.Store(true)
+		time.Sleep(50 * time.Millisecond) // slow callback
+		finished.Store(true)
+	}
+	go collector.Run()
+
+	// Wait for callback to enter.
+	deadline := time.After(2 * time.Second)
+	for !entered.Load() {
+		select {
+		case <-deadline:
+			t.Fatal("callback never entered")
+		case <-time.After(time.Millisecond):
+		}
+	}
+
+	collector.Stop()
+	// After Stop returns, the callback must have finished.
+	if !finished.Load() {
+		t.Fatal("Stop() returned before callback finished")
+	}
+}
+
+// --- Interval edge cases (2 tests) ---
+
+// TestBlockQA_ZeroInterval_Clamped verifies zero interval doesn't panic
+// (BUG-CP4B2-2 regression) and the collector still ticks.
+func TestBlockQA_ZeroInterval_Clamped(t *testing.T) {
+	bs := newTestBlockService(t)
+	var count atomic.Int64
+	collector := NewBlockVolumeHeartbeatCollector(bs, 0)
+	collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
+		count.Add(1)
+	}
+	go collector.Run()
+	defer collector.Stop()
+	time.Sleep(50 * time.Millisecond)
+	if count.Load() < 1 {
+		t.Fatal("expected at least 1 callback with clamped zero interval")
+	}
+}
+
+// TestBlockQA_NegativeInterval_Clamped verifies negative interval doesn't
+// panic and is clamped to minimum.
+func TestBlockQA_NegativeInterval_Clamped(t *testing.T) {
+	bs := newTestBlockService(t)
+	var count atomic.Int64
+	collector := NewBlockVolumeHeartbeatCollector(bs, -5*time.Second)
+	collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
+		count.Add(1)
+	}
+	go collector.Run()
+	defer collector.Stop()
+	time.Sleep(50 * time.Millisecond)
+	if count.Load() < 1 {
+		t.Fatal("expected at least 1 callback with clamped negative interval")
+	}
+}
+
+// --- Callback edge cases (3 tests) ---
+
+// TestBlockQA_CallbackPanic_Survives verifies a panicking callback doesn't
+// kill the collector goroutine (BUG-CP4B2-3 regression).
+func TestBlockQA_CallbackPanic_Survives(t *testing.T) {
+	bs := newTestBlockService(t)
+	var panicked atomic.Bool
+	var afterPanic atomic.Int64
+
+	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
+	collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
+		if !panicked.Load() {
+			panicked.Store(true)
+			panic("test panic in callback")
+		}
+		afterPanic.Add(1)
+	}
+	go collector.Run()
+	defer collector.Stop()
+
+	// Wait for post-panic callbacks.
+	deadline := time.After(500 * time.Millisecond)
+	for afterPanic.Load() < 1 {
+		select {
+		case <-deadline:
+			t.Fatal("BUG-CP4B2-3: collector goroutine died after callback panic")
+		case <-time.After(5 * time.Millisecond):
+		}
+	}
+}
+
+// TestBlockQA_SlowCallback_NoAccumulation verifies that a slow callback
+// doesn't cause tick accumulation — ticks are dropped while callback runs.
+func TestBlockQA_SlowCallback_NoAccumulation(t *testing.T) {
+	bs := newTestBlockService(t)
+	var count atomic.Int64
+
+	collector := NewBlockVolumeHeartbeatCollector(bs, 5*time.Millisecond)
+	collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
+		count.Add(1)
+		time.Sleep(50 * time.Millisecond) // 10× the tick interval
+	}
+	go collector.Run()
+
+	time.Sleep(200 * time.Millisecond)
+	collector.Stop()
+	// With 50ms sleep per callback over 200ms, expect ~4 callbacks, not 40.
+	n := count.Load()
+	if n > 10 {
+		t.Fatalf("expected ≤10 callbacks (slow callback), got %d — tick accumulation?", n)
+	}
+	if n < 1 {
+		t.Fatal("expected at least 1 callback")
+	}
+}
+
+// TestBlockQA_CallbackSetAfterRun verifies setting StatusCallback after
+// Run() has started still works on the next tick.
+func TestBlockQA_CallbackSetAfterRun(t *testing.T) {
+	bs := newTestBlockService(t)
+
+	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
+	// StatusCallback left nil initially.
+	go collector.Run()
+	defer collector.Stop()
+
+	// Let a few nil-callback ticks fire.
+	time.Sleep(30 * time.Millisecond)
+
+	// Now set the callback (unsynchronized write while Run() reads the field).
+	var called atomic.Bool
+	collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
+		called.Store(true)
+	}
+
+	deadline := time.After(200 * time.Millisecond)
+	for !called.Load() {
+		select {
+		case <-deadline:
+			t.Fatal("callback set after Run() was never called")
+		case <-time.After(5 * time.Millisecond):
+		}
+	}
+}
+
+// --- Concurrency (1 test) ---
+
+// TestBlockQA_ConcurrentStop verifies multiple goroutines calling Stop()
+// simultaneously all return cleanly without panic or deadlock.
+func TestBlockQA_ConcurrentStop(t *testing.T) {
+	bs := newTestBlockService(t)
+	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
+	go collector.Run()
+	time.Sleep(30 * time.Millisecond)
+
+	const n = 10
+	var wg sync.WaitGroup
+	wg.Add(n)
+	for i := 0; i < n; i++ {
+		go func() {
+			defer wg.Done()
+			collector.Stop()
+		}()
+	}
+
+	done := make(chan struct{})
+	go func() {
+		wg.Wait()
+		close(done)
+	}()
+	select {
+	case <-done:
+	case <-time.After(5 * time.Second):
+		t.Fatal("concurrent Stop() deadlocked")
+	}
+}
+
+// newTestBlockService creates a BlockService with one block volume for testing.
+func newTestBlockService(t *testing.T) *BlockService {
+	t.Helper()
+	dir := t.TempDir()
+	createTestBlockVolFile(t, dir, "hb-test.blk")
+	bs := StartBlockService("127.0.0.1:0", dir, "iqn.2024-01.com.test:vol.")
+	if bs == nil {
+		t.Fatal("expected non-nil BlockService")
+	}
+	t.Cleanup(bs.Shutdown)
+	return bs
+}
+
+func TestBlockCollectorPeriodicTick(t *testing.T) {
+	bs := newTestBlockService(t)
+
+	var mu sync.Mutex
+	var calls [][]blockvol.BlockVolumeInfoMessage
+
+	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
+	collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
+		mu.Lock()
+		calls = append(calls, msgs)
+		mu.Unlock()
+	}
+
+	go collector.Run()
+	defer collector.Stop()
+
+	// Wait up to 200ms (20× the interval) for ≥2 callbacks.
+	deadline := time.After(200 * time.Millisecond)
+	for {
+		mu.Lock()
+		n := len(calls)
+		mu.Unlock()
+		if n >= 2 {
+			break
+		}
+		select {
+		case <-deadline:
+			mu.Lock()
+			n = len(calls)
+			mu.Unlock()
+			t.Fatalf("expected ≥2 callbacks, got %d", n)
+		case <-time.After(5 * time.Millisecond):
+		}
+	}
+
+	// Verify the first callback delivered info for the single test volume.
+	mu.Lock()
+	first := calls[0]
+	mu.Unlock()
+	if len(first) != 1 {
+		t.Fatalf("expected 1 volume info, got %d", len(first))
+	}
+	if first[0].Path == "" {
+		t.Fatal("expected non-empty path in volume info")
+	}
+}
+
+func TestBlockCollectorStopNoLeak(t *testing.T) {
+	bs := newTestBlockService(t)
+
+	var count atomic.Int64
+
+	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
+	collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
+		count.Add(1)
+	}
+
+	go collector.Run()
+
+	// Let it tick a few times.
+	time.Sleep(50 * time.Millisecond)
+
+	// Stop must return promptly (Run goroutine exits).
+	done := make(chan struct{})
+	go func() {
+		collector.Stop()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		// ok
+	case <-time.After(2 * time.Second):
+		t.Fatal("Stop() did not return within 2s — goroutine leak")
+	}
+
+	// After stop, no more callbacks should fire.
+	snapshot := count.Load()
+	time.Sleep(30 * time.Millisecond)
+	if count.Load() != snapshot {
+		t.Fatal("callback fired after Stop()")
+	}
+}
+
+func TestBlockCollectorNilCallback(t *testing.T) {
+	bs := newTestBlockService(t)
+
+	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
+	// StatusCallback intentionally left nil.
+
+	go collector.Run()
+
+	// Let it tick — should not panic.
+	time.Sleep(50 * time.Millisecond)
+
+	collector.Stop()
+}
diff --git a/weed/server/qa_block_heartbeat_loop_test.go b/weed/server/qa_block_heartbeat_loop_test.go
new file mode 100644
index 000000000..927173cc0
--- /dev/null
+++ b/weed/server/qa_block_heartbeat_loop_test.go
@@ -0,0 +1,307 @@
+package weed_server
+
+import (
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
+)
+
+func TestQABlockHeartbeatCollector(t *testing.T) {
+	tests := []struct {
+		name string
+		run  func(t *testing.T)
+	}{
+		// --- Stop lifecycle ---
+		{name: "stop_before_run_deadlocks", run: testStopBeforeRunDeadlocks},
+		{name: "double_stop_no_panic", run: testDoubleStopNoPanic},
+		{name: "stop_during_callback", run: testStopDuringCallback},
+
+		// --- Interval edge cases ---
+		{name: "zero_interval_panics", run: testZeroIntervalPanics},
+		{name: "very_short_interval", run: testVeryShortInterval},
+
+		// --- Callback edge cases ---
+		{name: "callback_panic_crashes_goroutine", run: testCallbackPanicCrashesGoroutine},
+		{name: "callback_slow_blocks_next_tick", run: testCallbackSlowBlocksNextTick},
+		{name: "callback_set_after_run", run: testCallbackSetAfterRun},
+
+		// --- Concurrency ---
+		{name: "concurrent_stop_calls", run: testConcurrentStopCalls},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tt.run(t)
+		})
+	}
+}
+
+// ---------------------------------------------------------------------------
+// Stop lifecycle
+// ---------------------------------------------------------------------------
+
+func testStopBeforeRunDeadlocks(t *testing.T) {
+	// BUG-CP4B2-1 (fixed): Stop() before Run() used to block forever on
+	// <-c.done, which is only closed by Run()'s defer. Stop() now returns
+	// early when Run() was never started; this is the regression check.
+	bs := newTestBlockService(t)
+	collector := NewBlockVolumeHeartbeatCollector(bs, 100*time.Millisecond)
+
+	stopped := make(chan struct{})
+	go func() {
+		collector.Stop()
+		close(stopped)
+	}()
+
+	select {
+	case <-stopped:
+		// Good: Stop() returned. Bug is fixed.
+	case <-time.After(2 * time.Second):
+		t.Error("BUG-CP4B2-1: Stop() before Run() deadlocked (blocked >2s on <-c.done)")
+		// We can't recover from this -- the goroutine is leaked.
+		// Start Run() to unblock.
+		go collector.Run()
+		<-stopped
+	}
+}
+
+func testDoubleStopNoPanic(t *testing.T) {
+	bs := newTestBlockService(t)
+	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
+	collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {}
+
+	go collector.Run()
+	time.Sleep(30 * time.Millisecond)
+
+	// First stop.
+	collector.Stop()
+
+	// Second stop should not panic (sync.Once + closed done channel).
+	done := make(chan struct{})
+	go func() {
+		defer func() {
+			if r := recover(); r != nil {
+				t.Errorf("double Stop() panicked: %v", r)
+			}
+			close(done)
+		}()
+		collector.Stop()
+	}()
+
+	select {
+	case <-done:
+	case <-time.After(2 * time.Second):
+		t.Fatal("double Stop() blocked >2s")
+	}
+}
+
+func testStopDuringCallback(t *testing.T) {
+	bs := newTestBlockService(t)
+	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
+
+	// Buffered so the callback's non-blocking send is not dropped when this
+	// goroutine has not reached the receive below yet.
+	callbackStarted := make(chan struct{}, 1)
+	callbackRelease := make(chan struct{})
+
+	collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
+		select {
+		case callbackStarted <- struct{}{}:
+		default:
+		}
+		<-callbackRelease
+	}
+
+	go collector.Run()
+
+	// Wait for callback to start.
+	select {
+	case <-callbackStarted:
+	case <-time.After(2 * time.Second):
+		close(callbackRelease)
+		t.Fatal("callback never started")
+	}
+
+	// Stop while callback is blocked. Stop should block until Run() exits.
+	stopDone := make(chan struct{})
+	go func() {
+		collector.Stop()
+		close(stopDone)
+	}()
+
+	// Stop should be blocked because callback is still running.
+	select {
+	case <-stopDone:
+		close(callbackRelease)
+		// Run() is still inside the blocked callback, so Stop() must not
+		// have been able to return yet.
+		t.Error("Stop() returned while callback was still blocked")
+	case <-time.After(100 * time.Millisecond):
+		// Expected: Stop is blocked because Run() is in the callback.
+		// Run()'s select only observes stopCh between callbacks, never
+		// mid-callback, so the callback must complete first.
+		close(callbackRelease)
+		<-stopDone
+		t.Log("Stop() waited for callback to finish before returning (correct)")
+	}
+}
+
+// ---------------------------------------------------------------------------
+// Interval edge cases
+// ---------------------------------------------------------------------------
+
+func testZeroIntervalPanics(t *testing.T) {
+	// BUG-CP4B2-2 (fixed): zero interval is clamped to minHeartbeatInterval.
+	// Verify: no panic, collector runs normally, callbacks fire.
+	bs := newTestBlockService(t)
+	collector := NewBlockVolumeHeartbeatCollector(bs, 0)
+
+	var count atomic.Int64
+	collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
+		count.Add(1)
+	}
+
+	go collector.Run()
+	time.Sleep(30 * time.Millisecond)
+	collector.Stop()
+
+	n := count.Load()
+	if n < 1 {
+		t.Errorf("expected at least 1 callback with clamped interval, got %d", n)
+	}
+	t.Logf("zero interval (clamped): %d callbacks in 30ms", n)
+}
+
+func testVeryShortInterval(t *testing.T) {
+	bs := newTestBlockService(t)
+	var count atomic.Int64
+
+	collector := NewBlockVolumeHeartbeatCollector(bs, 1*time.Millisecond)
+	collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
+		count.Add(1)
+	}
+
+	go collector.Run()
+	time.Sleep(50 * time.Millisecond)
+	collector.Stop()
+
+	n := count.Load()
+	if n < 5 {
+		t.Errorf("expected >= 5 callbacks at 1ms interval over 50ms, got %d", n)
+	}
+	t.Logf("1ms interval: %d callbacks in 50ms", n)
+}
+
+// ---------------------------------------------------------------------------
+// Callback edge cases
+// ---------------------------------------------------------------------------
+
+func testCallbackPanicCrashesGoroutine(t *testing.T) {
+	// BUG-CP4B2-3 (fixed): safeCallback recovers panics. Run() continues.
+	// Verify: panic is logged, collector keeps running, subsequent callbacks fire.
+	bs := newTestBlockService(t)
+	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
+
+	var callCount atomic.Int64
+	collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
+		n := callCount.Add(1)
+		if n == 1 {
+			panic("deliberate test panic in callback")
+		}
+	}
+
+	go collector.Run()
+
+	// Wait for multiple callbacks (first panics, subsequent should still fire).
+	time.Sleep(100 * time.Millisecond)
+	collector.Stop()
+
+	n := callCount.Load()
+	if n < 2 {
+		t.Errorf("expected >= 2 callbacks (first panics, rest recover), got %d", n)
+	}
+	t.Logf("callback panic recovery: %d callbacks total (first panicked, rest recovered)", n)
+}
+
+func testCallbackSlowBlocksNextTick(t *testing.T) {
+	bs := newTestBlockService(t)
+	var count atomic.Int64
+
+	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
+	collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
+		count.Add(1)
+		time.Sleep(50 * time.Millisecond) // 5x the interval
+	}
+
+	go collector.Run()
+	time.Sleep(200 * time.Millisecond)
+	collector.Stop()
+
+	n := count.Load()
+	// With 50ms callback sleep and 10ms interval, we should get ~4 callbacks
+	// (200ms / 50ms), not 20 (200ms / 10ms). Slow callback blocks the loop.
+	if n > 8 {
+		t.Errorf("expected slow callback to throttle ticks, got %d callbacks", n)
+	}
+	t.Logf("slow callback: %d callbacks in 200ms (10ms interval, 50ms callback)", n)
+}
+
+func testCallbackSetAfterRun(t *testing.T) {
+	// Setting StatusCallback after Run() starts -- data race?
+	// The field is read without sync in Run(). This documents the behavior.
+	bs := newTestBlockService(t)
+	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
+	// Start with nil callback.
+
+	go collector.Run()
+
+	// Set callback after Run started. Under -race, this would flag a data race.
+	time.Sleep(5 * time.Millisecond)
+	var called atomic.Bool
+	collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
+		called.Store(true)
+	}
+
+	time.Sleep(50 * time.Millisecond)
+	collector.Stop()
+
+	t.Logf("callback set after Run: called=%v", called.Load())
+}
+
+// ---------------------------------------------------------------------------
+// Concurrency
+// ---------------------------------------------------------------------------
+
+func testConcurrentStopCalls(t *testing.T) {
+	bs := newTestBlockService(t)
+	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
+	collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {}
+
+	go collector.Run()
+	time.Sleep(30 * time.Millisecond)
+
+	// 10 goroutines all calling Stop concurrently.
+	var wg sync.WaitGroup
+	done := make(chan struct{})
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			collector.Stop()
+		}()
+	}
+
+	go func() {
+		wg.Wait()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		// All returned.
+	case <-time.After(5 * time.Second):
+		t.Fatal("concurrent Stop() calls blocked >5s")
+	}
+}
diff --git a/weed/server/volume_server_block.go b/weed/server/volume_server_block.go
index 273b5928d..bb8c2554b 100644
--- a/weed/server/volume_server_block.go
+++ b/weed/server/volume_server_block.go
@@ -100,6 +100,11 @@ func StartBlockService(listenAddr, blockDir, iqnPrefix string) *BlockService {
 	return bs
 }
 
+// Store returns the underlying BlockVolumeStore.
+func (bs *BlockService) Store() *storage.BlockVolumeStore {
+	return bs.blockStore
+}
+
 // Shutdown gracefully stops the iSCSI target and closes all block volumes.
 func (bs *BlockService) Shutdown() {
 	if bs == nil {