Browse Source

feat: Phase 4A CP4b-3 -- assignment processing, 2 bug fixes, 20 new tests (7 dev + 13 QA)

Add ProcessBlockVolumeAssignments to BlockVolumeStore and wire
AssignmentSource/AssignmentCallback into the heartbeat collector's
Run() loop. Assignments are fetched and applied each tick after
status collection.

Bug fixes:
- BUG-CP4B3-1: TOCTOU between GetBlockVolume and HandleAssignment.
  Added withVolume() helper that holds RLock across lookup+operation,
  preventing RemoveBlockVolume from closing the volume mid-assignment.
- BUG-CP4B3-2: Data race on callback fields read by Run() goroutine.
  Made StatusCallback/AssignmentSource/AssignmentCallback private,
  added cbMu mutex and SetXxx() setter methods. Lock held only for
  load/store, not during callback execution.

7 dev tests + 13 QA adversarial tests = 20 new tests.
972 total unit tests, all passing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feature/sw-block
Ping Qiu 1 week ago
parent
commit
7c07d9c95a
  1. 108
      weed/server/block_heartbeat_loop.go
  2. 218
      weed/server/block_heartbeat_loop_test.go
  3. 520
      weed/server/qa_block_assign_test.go
  4. 34
      weed/server/qa_block_heartbeat_loop_test.go
  5. 35
      weed/storage/store_blockvol.go

108
weed/server/block_heartbeat_loop.go

@ -14,7 +14,7 @@ import (
const minHeartbeatInterval = time.Millisecond
// BlockVolumeHeartbeatCollector periodically collects block volume status
// and delivers it via StatusCallback. Standalone does not touch the
// and delivers it via StatusCallback. Standalone -- does not touch the
// existing gRPC heartbeat stream. When proto is updated, the callback
// will be wired to stream.Send().
type BlockVolumeHeartbeatCollector struct {
@ -25,16 +25,23 @@ type BlockVolumeHeartbeatCollector struct {
stopOnce sync.Once
started atomic.Bool // BUG-CP4B2-1: tracks whether Run() was called
// StatusCallback is called each tick with collected status.
StatusCallback func([]blockvol.BlockVolumeInfoMessage)
// cbMu protects the callback/source fields against concurrent
// read (Run goroutine) and write (SetXxx callers). BUG-CP4B3-2.
cbMu sync.Mutex
statusCallback func([]blockvol.BlockVolumeInfoMessage)
assignmentSource func() []blockvol.BlockVolumeAssignment
assignmentCallback func([]blockvol.BlockVolumeAssignment, []error)
}
// NewBlockVolumeHeartbeatCollector creates a collector that calls
// StatusCallback every interval with the current block volume status.
// Intervals 0 are clamped to minHeartbeatInterval (BUG-CP4B2-2).
// the status callback every interval with the current block volume status.
// Intervals <= 0 are clamped to minHeartbeatInterval (BUG-CP4B2-2).
func NewBlockVolumeHeartbeatCollector(
bs *BlockService, interval time.Duration,
) *BlockVolumeHeartbeatCollector {
if bs == nil {
panic("block heartbeat: nil BlockService")
}
if interval <= 0 {
interval = minHeartbeatInterval
}
@ -46,7 +53,35 @@ func NewBlockVolumeHeartbeatCollector(
}
}
// Run blocks until Stop() is called. Collects status on each tick.
// SetStatusCallback installs fn as the function invoked on every tick with
// the collected block volume status.
// Safe to call before or after Run() (BUG-CP4B3-2).
func (c *BlockVolumeHeartbeatCollector) SetStatusCallback(fn func([]blockvol.BlockVolumeInfoMessage)) {
	c.cbMu.Lock()
	defer c.cbMu.Unlock()
	c.statusCallback = fn
}
// SetAssignmentSource installs fn as the per-tick provider of pending
// assignments. A nil source disables assignment processing entirely.
// Safe to call before or after Run() (BUG-CP4B3-2).
func (c *BlockVolumeHeartbeatCollector) SetAssignmentSource(fn func() []blockvol.BlockVolumeAssignment) {
	c.cbMu.Lock()
	defer c.cbMu.Unlock()
	c.assignmentSource = fn
}
// SetAssignmentCallback installs fn to receive the per-assignment error
// slice after each processed batch. A nil callback drops the errors
// (they are already logged by ProcessBlockVolumeAssignments).
// Safe to call before or after Run() (BUG-CP4B3-2).
func (c *BlockVolumeHeartbeatCollector) SetAssignmentCallback(fn func([]blockvol.BlockVolumeAssignment, []error)) {
	c.cbMu.Lock()
	defer c.cbMu.Unlock()
	c.assignmentCallback = fn
}
// Run blocks until Stop() is called. Collects status on each tick,
// then processes any pending assignments.
func (c *BlockVolumeHeartbeatCollector) Run() {
c.started.Store(true)
defer close(c.done)
@ -57,17 +92,45 @@ func (c *BlockVolumeHeartbeatCollector) Run() {
for {
select {
case <-ticker.C:
// Outbound: collect and report status.
msgs := c.blockService.Store().CollectBlockVolumeHeartbeat()
c.safeCallback(msgs)
// Inbound: process any pending assignments.
c.processAssignments()
case <-c.stopCh:
return
}
}
}
// safeCallback invokes StatusCallback with panic recovery (BUG-CP4B2-3).
// processAssignments pulls pending assignments from the configured source
// and applies them to the store, reporting the per-assignment error slice
// to the assignment callback when one is installed. The mutex is held only
// while loading the function pointers, never during their execution.
func (c *BlockVolumeHeartbeatCollector) processAssignments() {
	c.cbMu.Lock()
	source := c.assignmentSource
	c.cbMu.Unlock()
	if source == nil {
		return
	}
	pending := c.safeFuncCall(source)
	if len(pending) == 0 {
		// Nothing to apply; skip the store call and the callback.
		return
	}
	results := c.blockService.Store().ProcessBlockVolumeAssignments(pending)
	// Re-read the callback after processing so one installed mid-batch
	// still receives this batch's results (matches original ordering).
	c.cbMu.Lock()
	report := c.assignmentCallback
	c.cbMu.Unlock()
	if report == nil {
		return
	}
	c.safeAssignmentCallback(report, pending, results)
}
// safeCallback loads and invokes the status callback with panic recovery
// (BUG-CP4B2-3). Lock is held only for the load, not during the call.
func (c *BlockVolumeHeartbeatCollector) safeCallback(msgs []blockvol.BlockVolumeInfoMessage) {
if c.StatusCallback == nil {
c.cbMu.Lock()
fn := c.statusCallback
c.cbMu.Unlock()
if fn == nil {
return
}
defer func() {
@ -75,7 +138,32 @@ func (c *BlockVolumeHeartbeatCollector) safeCallback(msgs []blockvol.BlockVolume
log.Printf("block heartbeat: callback panic: %v", r)
}
}()
c.StatusCallback(msgs)
fn(msgs)
}
// safeFuncCall runs the given assignment-source function, recovering from
// any panic it raises so the collector goroutine survives; a panic yields
// a nil result.
func (c *BlockVolumeHeartbeatCollector) safeFuncCall(
	fn func() []blockvol.BlockVolumeAssignment,
) (result []blockvol.BlockVolumeAssignment) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("block heartbeat: assignment source panic: %v", r)
		}
	}()
	result = fn()
	return result
}
// safeAssignmentCallback runs fn(assignments, errs), swallowing and logging
// any panic so a misbehaving callback cannot kill the collector goroutine.
func (c *BlockVolumeHeartbeatCollector) safeAssignmentCallback(
	fn func([]blockvol.BlockVolumeAssignment, []error),
	assignments []blockvol.BlockVolumeAssignment, errs []error,
) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		log.Printf("block heartbeat: assignment callback panic: %v", r)
	}()
	fn(assignments, errs)
}
// Stop signals Run() to exit and waits for it to finish.
@ -83,7 +171,7 @@ func (c *BlockVolumeHeartbeatCollector) safeCallback(msgs []blockvol.BlockVolume
func (c *BlockVolumeHeartbeatCollector) Stop() {
c.stopOnce.Do(func() { close(c.stopCh) })
if !c.started.Load() {
return // Run() never started nothing to wait for.
return // Run() never started -- nothing to wait for.
}
<-c.done
}

218
weed/server/block_heartbeat_loop_test.go

@ -1,6 +1,7 @@
package weed_server
import (
"strings"
"sync"
"sync/atomic"
"testing"
@ -9,7 +10,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
)
// ─── QA adversarial tests (CP4b-2) ───────────────────────────────────────────
// --- QA adversarial tests (CP4b-2) ----------------------------------------
// 9 tests exercising edge cases and concurrency in BlockVolumeHeartbeatCollector.
// Bugs found: BUG-CP4B2-1 (Stop before Run), BUG-CP4B2-2 (zero interval),
// BUG-CP4B2-3 (callback panic). All fixed.
@ -29,7 +30,7 @@ func TestBlockQA_StopBeforeRun_NoPanic(t *testing.T) {
}()
select {
case <-done:
// ok Stop returned without deadlock
// ok -- Stop returned without deadlock
case <-time.After(2 * time.Second):
t.Fatal("BUG-CP4B2-1: Stop() before Run() deadlocked (blocked >2s on <-c.done)")
}
@ -64,11 +65,11 @@ func TestBlockQA_StopDuringCallback(t *testing.T) {
var finished atomic.Bool
collector := NewBlockVolumeHeartbeatCollector(bs, 5*time.Millisecond)
collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {
entered.Store(true)
time.Sleep(50 * time.Millisecond) // slow callback
finished.Store(true)
}
})
go collector.Run()
// Wait for callback to enter.
@ -96,9 +97,9 @@ func TestBlockQA_ZeroInterval_Clamped(t *testing.T) {
bs := newTestBlockService(t)
var count atomic.Int64
collector := NewBlockVolumeHeartbeatCollector(bs, 0)
collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {
count.Add(1)
}
})
go collector.Run()
defer collector.Stop()
time.Sleep(50 * time.Millisecond)
@ -113,9 +114,9 @@ func TestBlockQA_NegativeInterval_Clamped(t *testing.T) {
bs := newTestBlockService(t)
var count atomic.Int64
collector := NewBlockVolumeHeartbeatCollector(bs, -5*time.Second)
collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {
count.Add(1)
}
})
go collector.Run()
defer collector.Stop()
time.Sleep(50 * time.Millisecond)
@ -134,13 +135,13 @@ func TestBlockQA_CallbackPanic_Survives(t *testing.T) {
var afterPanic atomic.Int64
collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {
if !panicked.Load() {
panicked.Store(true)
panic("test panic in callback")
}
afterPanic.Add(1)
}
})
go collector.Run()
defer collector.Stop()
@ -156,16 +157,16 @@ func TestBlockQA_CallbackPanic_Survives(t *testing.T) {
}
// TestBlockQA_SlowCallback_NoAccumulation verifies that a slow callback
// doesn't cause tick accumulation ticks are dropped while callback runs.
// doesn't cause tick accumulation -- ticks are dropped while callback runs.
func TestBlockQA_SlowCallback_NoAccumulation(t *testing.T) {
bs := newTestBlockService(t)
var count atomic.Int64
collector := NewBlockVolumeHeartbeatCollector(bs, 5*time.Millisecond)
collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {
count.Add(1)
time.Sleep(50 * time.Millisecond) // 10× the tick interval
}
time.Sleep(50 * time.Millisecond) // 10x the tick interval
})
go collector.Run()
time.Sleep(200 * time.Millisecond)
@ -173,14 +174,14 @@ func TestBlockQA_SlowCallback_NoAccumulation(t *testing.T) {
// With 50ms sleep per callback over 200ms, expect ~4 callbacks, not 40.
n := count.Load()
if n > 10 {
t.Fatalf("expected ≤10 callbacks (slow callback), got %d — tick accumulation?", n)
t.Fatalf("expected <=10 callbacks (slow callback), got %d -- tick accumulation?", n)
}
if n < 1 {
t.Fatal("expected at least 1 callback")
}
}
// TestBlockQA_CallbackSetAfterRun verifies setting StatusCallback after
// TestBlockQA_CallbackSetAfterRun verifies setting SetStatusCallback after
// Run() has started still works on the next tick.
func TestBlockQA_CallbackSetAfterRun(t *testing.T) {
bs := newTestBlockService(t)
@ -195,9 +196,9 @@ func TestBlockQA_CallbackSetAfterRun(t *testing.T) {
// Now set the callback.
var called atomic.Bool
collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {
called.Store(true)
}
})
deadline := time.After(200 * time.Millisecond)
for !called.Load() {
@ -261,16 +262,16 @@ func TestBlockCollectorPeriodicTick(t *testing.T) {
var calls [][]blockvol.BlockVolumeInfoMessage
collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {
mu.Lock()
calls = append(calls, msgs)
mu.Unlock()
}
})
go collector.Run()
defer collector.Stop()
// Wait up to 3× interval for ≥2 callbacks.
// Wait up to 3x interval for >=2 callbacks.
deadline := time.After(200 * time.Millisecond)
for {
mu.Lock()
@ -284,7 +285,7 @@ func TestBlockCollectorPeriodicTick(t *testing.T) {
mu.Lock()
n = len(calls)
mu.Unlock()
t.Fatalf("expected 2 callbacks, got %d", n)
t.Fatalf("expected >=2 callbacks, got %d", n)
case <-time.After(5 * time.Millisecond):
}
}
@ -307,9 +308,9 @@ func TestBlockCollectorStopNoLeak(t *testing.T) {
var count atomic.Int64
collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {
count.Add(1)
}
})
go collector.Run()
@ -327,7 +328,7 @@ func TestBlockCollectorStopNoLeak(t *testing.T) {
case <-done:
// ok
case <-time.After(2 * time.Second):
t.Fatal("Stop() did not return within 2s goroutine leak")
t.Fatal("Stop() did not return within 2s -- goroutine leak")
}
// After stop, no more callbacks should fire.
@ -346,8 +347,173 @@ func TestBlockCollectorNilCallback(t *testing.T) {
go collector.Run()
// Let it tick should not panic.
// Let it tick -- should not panic.
time.Sleep(50 * time.Millisecond)
collector.Stop()
}
// --- Dev tests: Assignment Processing (CP4b-3) ----------------------------
// 7 tests verifying ProcessBlockVolumeAssignments and AssignmentSource wiring.
// testBlockVolPath returns the path of the single block volume registered
// in the test service, failing the test if there is not exactly one.
func testBlockVolPath(t *testing.T, bs *BlockService) string {
	t.Helper()
	vols := bs.Store().ListBlockVolumes()
	if got := len(vols); got != 1 {
		t.Fatalf("expected 1 volume, got %d", got)
	}
	return vols[0]
}
// TestBlockAssign_Success verifies a valid assignment (RoleNone -> Primary)
// changes the volume's role.
func TestBlockAssign_Success(t *testing.T) {
	bs := newTestBlockService(t)
	path := testBlockVolPath(t, bs)
	batch := []blockvol.BlockVolumeAssignment{{
		Path:       path,
		Epoch:      1,
		Role:       uint32(blockvol.RolePrimary),
		LeaseTtlMs: 30000,
	}}
	if errs := bs.Store().ProcessBlockVolumeAssignments(batch); errs[0] != nil {
		t.Fatalf("expected nil error, got %v", errs[0])
	}
	// The promote must be observable on the volume itself.
	if vol, _ := bs.Store().GetBlockVolume(path); vol.Role() != blockvol.RolePrimary {
		t.Fatalf("expected Primary role, got %s", vol.Role())
	}
}
// TestBlockAssign_UnknownVolume verifies an assignment for a non-existent
// volume returns an error without stopping processing.
func TestBlockAssign_UnknownVolume(t *testing.T) {
	bs := newTestBlockService(t)
	batch := []blockvol.BlockVolumeAssignment{{
		Path:  "/nonexistent/vol.blk",
		Epoch: 1,
		Role:  uint32(blockvol.RolePrimary),
	}}
	errs := bs.Store().ProcessBlockVolumeAssignments(batch)
	switch {
	case errs[0] == nil:
		t.Fatal("expected error for unknown volume")
	case !strings.Contains(errs[0].Error(), "not found"):
		t.Fatalf("expected 'not found' error, got: %v", errs[0])
	}
}
// TestBlockAssign_InvalidTransition verifies an invalid role transition
// (RoleNone -> Stale) returns an error.
func TestBlockAssign_InvalidTransition(t *testing.T) {
	bs := newTestBlockService(t)
	batch := []blockvol.BlockVolumeAssignment{{
		Path:  testBlockVolPath(t, bs),
		Epoch: 1,
		Role:  uint32(blockvol.RoleStale),
	}}
	if errs := bs.Store().ProcessBlockVolumeAssignments(batch); errs[0] == nil {
		t.Fatal("expected error for invalid transition")
	}
}
// TestBlockAssign_EmptyAssignments verifies that both a nil slice and an
// empty slice produce an empty error slice.
func TestBlockAssign_EmptyAssignments(t *testing.T) {
	bs := newTestBlockService(t)
	if errs := bs.Store().ProcessBlockVolumeAssignments(nil); len(errs) != 0 {
		t.Fatalf("expected 0 errors for nil input, got %d", len(errs))
	}
	if errs := bs.Store().ProcessBlockVolumeAssignments([]blockvol.BlockVolumeAssignment{}); len(errs) != 0 {
		t.Fatalf("expected 0 errors for empty input, got %d", len(errs))
	}
}
// TestBlockAssign_NilSource verifies that a nil AssignmentSource doesn't
// cause panics and status collection still works.
func TestBlockAssign_NilSource(t *testing.T) {
	bs := newTestBlockService(t)
	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
	// Assignment source deliberately left unset (nil).
	var statusCalled atomic.Bool
	collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {
		statusCalled.Store(true)
	})
	go collector.Run()
	defer collector.Stop()
	timeout := time.After(200 * time.Millisecond)
	for !statusCalled.Load() {
		select {
		case <-timeout:
			t.Fatal("status callback never fired with nil AssignmentSource")
		case <-time.After(5 * time.Millisecond):
		}
	}
}
// TestBlockAssign_MixedBatch verifies a batch with 1 success, 1 unknown volume,
// and 1 invalid transition returns parallel errors correctly.
func TestBlockAssign_MixedBatch(t *testing.T) {
	bs := newTestBlockService(t)
	path := testBlockVolPath(t, bs)
	batch := []blockvol.BlockVolumeAssignment{
		{Path: path, Epoch: 1, Role: uint32(blockvol.RolePrimary), LeaseTtlMs: 30000},
		{Path: "/nonexistent/vol.blk", Epoch: 1, Role: uint32(blockvol.RolePrimary)},
		{Path: path, Epoch: 2, Role: uint32(blockvol.RoleReplica)}, // Primary->Replica is invalid
	}
	errs := bs.Store().ProcessBlockVolumeAssignments(batch)
	if len(errs) != 3 {
		t.Fatalf("expected 3 errors, got %d", len(errs))
	}
	if errs[0] != nil {
		t.Fatalf("assignment 0: expected nil error, got %v", errs[0])
	}
	if errs[1] == nil {
		t.Fatal("assignment 1: expected error for unknown volume")
	}
	if errs[2] == nil {
		t.Fatal("assignment 2: expected error for invalid transition")
	}
	// Assignment 0 already promoted the volume; the two failures must not
	// have rolled that back.
	if vol, _ := bs.Store().GetBlockVolume(path); vol.Role() != blockvol.RolePrimary {
		t.Fatalf("expected Primary role after mixed batch, got %s", vol.Role())
	}
}
// TestBlockAssign_RoleNoneIgnored verifies a RoleNone (0) assignment is a
// no-op same-role refresh -- no crash, no write gate opened.
func TestBlockAssign_RoleNoneIgnored(t *testing.T) {
	bs := newTestBlockService(t)
	path := testBlockVolPath(t, bs)
	batch := []blockvol.BlockVolumeAssignment{{
		Path:  path,
		Epoch: 0,
		Role:  uint32(blockvol.RoleNone),
	}}
	if errs := bs.Store().ProcessBlockVolumeAssignments(batch); errs[0] != nil {
		t.Fatalf("expected nil error for RoleNone assignment, got %v", errs[0])
	}
	// The role must be unchanged (still RoleNone).
	if vol, _ := bs.Store().GetBlockVolume(path); vol.Role() != blockvol.RoleNone {
		t.Fatalf("expected RoleNone, got %s", vol.Role())
	}
}

520
weed/server/qa_block_assign_test.go

@ -0,0 +1,520 @@
package weed_server
import (
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
)
// ---------------------------------------------------------------------------
// QA adversarial tests for CP4b-3: Assignment Processing
// Targets: ProcessBlockVolumeAssignments, processAssignments, safe wrappers
//
// Bugs found:
// BUG-CP4B3-1 [Medium]: TOCTOU in ProcessBlockVolumeAssignments -- volume
// can be removed between GetBlockVolume and HandleAssignment.
// BUG-CP4B3-2 [Low]: Data race on callback fields -- fixed with cbMu +
// SetXxx methods.
// ---------------------------------------------------------------------------
// TestQABlockAssignmentProcessing drives the 13 QA adversarial subtests for
// CP4b-3 assignment processing. Each entry is run as a named subtest.
func TestQABlockAssignmentProcessing(t *testing.T) {
	cases := []struct {
		label string
		fn    func(t *testing.T)
	}{
		// --- Source / callback panic recovery ---
		{"source_panic_recovery", testSourcePanicRecovery},
		{"callback_panic_recovery", testAssignCallbackPanicRecovery},
		// --- Source edge cases ---
		{"source_returns_nil", testSourceReturnsNil},
		{"source_returns_empty", testSourceReturnsEmpty},
		{"slow_source_blocks_tick", testSlowSourceBlocksTick},
		{"source_set_after_run", testAssignSourceSetAfterRun},
		// --- Batch processing ---
		{"same_volume_batch_ordering", testSameVolumeBatchOrdering},
		{"unknown_role_in_assignment", testUnknownRoleInAssignment},
		{"large_batch_100_assignments", testLargeBatch100},
		{"batch_all_unknown_volumes", testBatchAllUnknownVolumes},
		// --- Integration: collector + assignments ---
		{"stop_during_slow_assignment", testStopDuringSlowAssignment},
		{"assignment_and_status_both_fire", testAssignmentAndStatusBothFire},
		{"assignment_callback_receives_errs", testAssignmentCallbackReceivesErrs},
	}
	for _, tc := range cases {
		// tc.fn already has the func(*testing.T) shape t.Run expects.
		t.Run(tc.label, tc.fn)
	}
}
// ---------------------------------------------------------------------------
// Source / callback panic recovery
// ---------------------------------------------------------------------------
// testSourcePanicRecovery verifies that a panicking AssignmentSource doesn't
// kill the collector. Status callbacks should continue after the panic.
func testSourcePanicRecovery(t *testing.T) {
	bs := newTestBlockService(t)
	var statusCount atomic.Int64
	var sourceCalls atomic.Int64
	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
	collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {
		statusCount.Add(1)
	})
	collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment {
		// Panic exactly once, on the first call; later calls yield nothing.
		if sourceCalls.Add(1) == 1 {
			panic("test: source panic")
		}
		return nil
	})
	go collector.Run()
	defer collector.Stop()
	// The loop must survive the panic: wait for several more status ticks.
	timeout := time.After(500 * time.Millisecond)
	for statusCount.Load() < 3 {
		select {
		case <-timeout:
			t.Fatalf("collector died after source panic: only %d status callbacks", statusCount.Load())
		case <-time.After(5 * time.Millisecond):
		}
	}
	if sourceCalls.Load() < 2 {
		t.Errorf("expected AssignmentSource called again after panic, got %d calls", sourceCalls.Load())
	}
}
// testAssignCallbackPanicRecovery verifies that a panicking AssignmentCallback
// doesn't kill the collector.
func testAssignCallbackPanicRecovery(t *testing.T) {
	bs := newTestBlockService(t)
	path := testBlockVolPath(t, bs)
	var statusCount atomic.Int64
	var cbCalls atomic.Int64
	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
	collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {
		statusCount.Add(1)
	})
	// Always hand back a valid promote-to-Primary assignment. After the
	// first tick HandleAssignment sees a same-role refresh, which is fine:
	// the subject here is the callback panic, not the assignment result.
	collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment {
		return []blockvol.BlockVolumeAssignment{{
			Path:       path,
			Epoch:      1,
			Role:       uint32(blockvol.RolePrimary),
			LeaseTtlMs: 30000,
		}}
	})
	collector.SetAssignmentCallback(func(a []blockvol.BlockVolumeAssignment, errs []error) {
		if cbCalls.Add(1) == 1 {
			panic("test: callback panic")
		}
	})
	go collector.Run()
	defer collector.Stop()
	timeout := time.After(500 * time.Millisecond)
	for statusCount.Load() < 3 {
		select {
		case <-timeout:
			t.Fatalf("collector died after callback panic: only %d status callbacks", statusCount.Load())
		case <-time.After(5 * time.Millisecond):
		}
	}
	if cbCalls.Load() < 2 {
		t.Errorf("expected AssignmentCallback called again after panic, got %d calls", cbCalls.Load())
	}
}
// ---------------------------------------------------------------------------
// Source edge cases
// ---------------------------------------------------------------------------
// testSourceReturnsNil verifies that AssignmentSource returning nil
// doesn't cause ProcessBlockVolumeAssignments to be called.
func testSourceReturnsNil(t *testing.T) {
	bs := newTestBlockService(t)
	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
	collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment {
		return nil
	})
	// The collector short-circuits on len==0 before calling
	// ProcessBlockVolumeAssignments, so the assignment callback must never
	// fire when the source yields nil. That is what we observe here.
	var fired atomic.Bool
	collector.SetAssignmentCallback(func(a []blockvol.BlockVolumeAssignment, errs []error) {
		fired.Store(true)
	})
	go collector.Run()
	time.Sleep(50 * time.Millisecond)
	collector.Stop()
	if fired.Load() {
		t.Error("AssignmentCallback called when source returned nil -- should short-circuit")
	}
}
// testSourceReturnsEmpty verifies that an empty slice from AssignmentSource
// is handled the same as nil (no processing).
func testSourceReturnsEmpty(t *testing.T) {
	bs := newTestBlockService(t)
	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
	collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment {
		return []blockvol.BlockVolumeAssignment{}
	})
	var fired atomic.Bool
	collector.SetAssignmentCallback(func(a []blockvol.BlockVolumeAssignment, errs []error) {
		fired.Store(true)
	})
	go collector.Run()
	time.Sleep(50 * time.Millisecond)
	collector.Stop()
	if fired.Load() {
		t.Error("AssignmentCallback called when source returned empty -- should short-circuit")
	}
}
// testSlowSourceBlocksTick verifies that a slow AssignmentSource blocks the
// entire collector tick (status + assignment are sequential in the same tick).
func testSlowSourceBlocksTick(t *testing.T) {
	bs := newTestBlockService(t)
	var statusCount atomic.Int64
	collector := NewBlockVolumeHeartbeatCollector(bs, 5*time.Millisecond)
	collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {
		statusCount.Add(1)
	})
	collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment {
		time.Sleep(50 * time.Millisecond) // 10x the tick interval
		return nil
	})
	go collector.Run()
	time.Sleep(200 * time.Millisecond)
	collector.Stop()
	// A 50ms source per tick over 200ms should allow ~4 ticks, not 40.
	n := statusCount.Load()
	switch {
	case n > 10:
		t.Errorf("expected slow source to throttle ticks, got %d status callbacks", n)
	case n < 1:
		t.Error("expected at least 1 status callback")
	}
	t.Logf("slow source: %d status callbacks in 200ms (5ms interval, 50ms source)", n)
}
// testAssignSourceSetAfterRun verifies that setting AssignmentSource after
// Run() started still picks up assignments on subsequent ticks.
func testAssignSourceSetAfterRun(t *testing.T) {
	bs := newTestBlockService(t)
	path := testBlockVolPath(t, bs)
	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
	collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {})
	go collector.Run() // started with the assignment source still nil
	defer collector.Stop()
	// Allow a few ticks to fire with no source installed.
	time.Sleep(30 * time.Millisecond)
	// Now install the source and callback mid-flight.
	var fired atomic.Bool
	collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment {
		return []blockvol.BlockVolumeAssignment{{
			Path:       path,
			Epoch:      1,
			Role:       uint32(blockvol.RolePrimary),
			LeaseTtlMs: 30000,
		}}
	})
	collector.SetAssignmentCallback(func(a []blockvol.BlockVolumeAssignment, errs []error) {
		fired.Store(true)
	})
	timeout := time.After(200 * time.Millisecond)
	for !fired.Load() {
		select {
		case <-timeout:
			t.Fatal("AssignmentCallback never fired after setting source post-Run")
		case <-time.After(5 * time.Millisecond):
		}
	}
}
// ---------------------------------------------------------------------------
// Batch processing
// ---------------------------------------------------------------------------
// testSameVolumeBatchOrdering verifies that when the same volume appears
// multiple times in a batch, assignments are applied sequentially in order.
// HandleAssignment: Primary->Stale (demote does Primary->Draining->Stale internally).
func testSameVolumeBatchOrdering(t *testing.T) {
	bs := newTestBlockService(t)
	path := testBlockVolPath(t, bs)
	// The promote (epoch 1) must be applied before the demote (epoch 2).
	batch := []blockvol.BlockVolumeAssignment{
		{Path: path, Epoch: 1, Role: uint32(blockvol.RolePrimary), LeaseTtlMs: 30000},
		{Path: path, Epoch: 2, Role: uint32(blockvol.RoleStale)},
	}
	errs := bs.Store().ProcessBlockVolumeAssignments(batch)
	if errs[0] != nil {
		t.Fatalf("assignment 0 (promote): %v", errs[0])
	}
	if errs[1] != nil {
		t.Fatalf("assignment 1 (demote): %v", errs[1])
	}
	if vol, _ := bs.Store().GetBlockVolume(path); vol.Role() != blockvol.RoleStale {
		t.Errorf("expected Stale after sequential batch, got %s", vol.Role())
	}
}
// testUnknownRoleInAssignment verifies that a wire role value > maxValidRole
// is mapped to RoleNone by RoleFromWire. Since the volume starts at RoleNone,
// the resulting assignment is a same-role refresh (no-op).
func testUnknownRoleInAssignment(t *testing.T) {
	bs := newTestBlockService(t)
	path := testBlockVolPath(t, bs)
	batch := []blockvol.BlockVolumeAssignment{{
		Path:  path,
		Epoch: 0,
		Role:  255, // unknown -- RoleFromWire maps to RoleNone
	}}
	errs := bs.Store().ProcessBlockVolumeAssignments(batch)
	// RoleNone -> RoleNone is a no-op, should succeed.
	if errs[0] != nil {
		t.Fatalf("expected nil error for unknown role mapped to RoleNone, got: %v", errs[0])
	}
	if vol, _ := bs.Store().GetBlockVolume(path); vol.Role() != blockvol.RoleNone {
		t.Errorf("expected RoleNone, got %s", vol.Role())
	}
}
// testLargeBatch100 verifies that 100 assignments in a single batch all
// target the same (valid) volume path. The first promotes to Primary,
// and subsequent 99 are same-role refreshes (same epoch).
func testLargeBatch100(t *testing.T) {
	bs := newTestBlockService(t)
	path := testBlockVolPath(t, bs)
	// All 100 entries are identical: entry 0 performs the promote, the
	// remaining 99 are same-epoch, same-role refreshes.
	batch := make([]blockvol.BlockVolumeAssignment, 100)
	for i := range batch {
		batch[i] = blockvol.BlockVolumeAssignment{
			Path: path, Epoch: 1, Role: uint32(blockvol.RolePrimary), LeaseTtlMs: 30000,
		}
	}
	errs := bs.Store().ProcessBlockVolumeAssignments(batch)
	if len(errs) != 100 {
		t.Fatalf("expected 100 error slots, got %d", len(errs))
	}
	// First should succeed (promote).
	if errs[0] != nil {
		t.Fatalf("assignment 0: %v", errs[0])
	}
	// Remaining are same-role refreshes -- should all succeed.
	for i := 1; i < 100; i++ {
		if errs[i] != nil {
			t.Errorf("assignment %d: unexpected error: %v", i, errs[i])
		}
	}
}
// testBatchAllUnknownVolumes verifies that a batch where every assignment
// targets a nonexistent volume returns all errors but doesn't panic.
func testBatchAllUnknownVolumes(t *testing.T) {
	bs := newTestBlockService(t)
	batch := make([]blockvol.BlockVolumeAssignment, 5)
	for i := range batch {
		batch[i] = blockvol.BlockVolumeAssignment{
			Path:  "/nonexistent/vol.blk",
			Epoch: 1,
			Role:  uint32(blockvol.RolePrimary),
		}
	}
	for i, err := range bs.Store().ProcessBlockVolumeAssignments(batch) {
		switch {
		case err == nil:
			t.Errorf("assignment %d: expected error for unknown volume", i)
		case !strings.Contains(err.Error(), "not found"):
			t.Errorf("assignment %d: expected 'not found', got: %v", i, err)
		}
	}
}
// ---------------------------------------------------------------------------
// Integration: collector + assignments
// ---------------------------------------------------------------------------
// testStopDuringSlowAssignment verifies that Stop() waits for a slow
// assignment source to complete before returning.
func testStopDuringSlowAssignment(t *testing.T) {
	bs := newTestBlockService(t)
	var entered atomic.Bool
	var finished atomic.Bool
	collector := NewBlockVolumeHeartbeatCollector(bs, 5*time.Millisecond)
	collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {})
	collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment {
		entered.Store(true)
		time.Sleep(80 * time.Millisecond)
		finished.Store(true)
		return nil
	})
	go collector.Run()
	// Poll until the slow source is actually running.
	timeout := time.After(2 * time.Second)
	for !entered.Load() {
		select {
		case <-timeout:
			collector.Stop()
			t.Fatal("source never entered")
			return
		case <-time.After(time.Millisecond):
		}
	}
	// Stop must block until the in-flight tick (slow source included) finishes.
	collector.Stop()
	if !finished.Load() {
		t.Error("Stop() returned before slow AssignmentSource finished")
	}
}
// testAssignmentAndStatusBothFire verifies that both StatusCallback and
// AssignmentCallback fire on the same tick.
func testAssignmentAndStatusBothFire(t *testing.T) {
	bs := newTestBlockService(t)
	path := testBlockVolPath(t, bs)
	var statusTicks atomic.Int64
	var assignTicks atomic.Int64
	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
	collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {
		statusTicks.Add(1)
	})
	collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment {
		return []blockvol.BlockVolumeAssignment{{
			Path:       path,
			Epoch:      1,
			Role:       uint32(blockvol.RolePrimary),
			LeaseTtlMs: 30000,
		}}
	})
	collector.SetAssignmentCallback(func(a []blockvol.BlockVolumeAssignment, errs []error) {
		assignTicks.Add(1)
	})
	go collector.Run()
	defer collector.Stop()
	// Both counters must advance together across several ticks.
	timeout := time.After(500 * time.Millisecond)
	for statusTicks.Load() < 3 || assignTicks.Load() < 3 {
		select {
		case <-timeout:
			t.Fatalf("expected >=3 each, got status=%d assign=%d",
				statusTicks.Load(), assignTicks.Load())
		case <-time.After(5 * time.Millisecond):
		}
	}
}
// testAssignmentCallbackReceivesErrs verifies that the AssignmentCallback
// receives the correct parallel error slice from ProcessBlockVolumeAssignments.
func testAssignmentCallbackReceivesErrs(t *testing.T) {
	bs := newTestBlockService(t)
	volPath := testBlockVolPath(t, bs)

	var (
		mu        sync.Mutex
		firstAs   []blockvol.BlockVolumeAssignment
		firstErrs []error
	)
	fired := make(chan struct{}, 1)

	c := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
	c.SetStatusCallback(func([]blockvol.BlockVolumeInfoMessage) {})
	c.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment {
		// Mixed batch: 1 valid + 1 unknown
		return []blockvol.BlockVolumeAssignment{
			{Path: volPath, Epoch: 1, Role: uint32(blockvol.RolePrimary), LeaseTtlMs: 30000},
			{Path: "/nonexistent/vol.blk", Epoch: 1, Role: uint32(blockvol.RolePrimary)},
		}
	})
	c.SetAssignmentCallback(func(a []blockvol.BlockVolumeAssignment, errs []error) {
		mu.Lock()
		// Capture only the first delivery; later ticks are ignored.
		if firstAs == nil {
			firstAs, firstErrs = a, errs
		}
		mu.Unlock()
		select {
		case fired <- struct{}{}:
		default:
		}
	})
	go c.Run()
	defer c.Stop()

	select {
	case <-fired:
	case <-time.After(500 * time.Millisecond):
		t.Fatal("AssignmentCallback never fired")
	}

	mu.Lock()
	defer mu.Unlock()
	if got := len(firstAs); got != 2 {
		t.Fatalf("expected 2 assignments, got %d", got)
	}
	if got := len(firstErrs); got != 2 {
		t.Fatalf("expected 2 errors, got %d", got)
	}
	if firstErrs[0] != nil {
		t.Errorf("assignment 0: expected nil error, got %v", firstErrs[0])
	}
	if firstErrs[1] == nil {
		t.Error("assignment 1: expected error for unknown volume")
	} else if !strings.Contains(firstErrs[1].Error(), "not found") {
		t.Errorf("assignment 1: expected 'not found', got: %v", firstErrs[1])
	}
}

34
weed/server/qa_block_heartbeat_loop_test.go

@ -69,7 +69,7 @@ func testStopBeforeRunDeadlocks(t *testing.T) {
func testDoubleStopNoPanic(t *testing.T) {
bs := newTestBlockService(t)
collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {}
collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {})
go collector.Run()
time.Sleep(30 * time.Millisecond)
@ -103,13 +103,13 @@ func testStopDuringCallback(t *testing.T) {
callbackStarted := make(chan struct{})
callbackRelease := make(chan struct{})
collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {
select {
case callbackStarted <- struct{}{}:
default:
}
<-callbackRelease
}
})
go collector.Run()
@ -157,9 +157,9 @@ func testZeroIntervalPanics(t *testing.T) {
collector := NewBlockVolumeHeartbeatCollector(bs, 0)
var count atomic.Int64
collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {
count.Add(1)
}
})
go collector.Run()
time.Sleep(30 * time.Millisecond)
@ -177,9 +177,9 @@ func testVeryShortInterval(t *testing.T) {
var count atomic.Int64
collector := NewBlockVolumeHeartbeatCollector(bs, 1*time.Millisecond)
collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {
count.Add(1)
}
})
go collector.Run()
time.Sleep(50 * time.Millisecond)
@ -203,12 +203,12 @@ func testCallbackPanicCrashesGoroutine(t *testing.T) {
collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
var callCount atomic.Int64
collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {
n := callCount.Add(1)
if n == 1 {
panic("deliberate test panic in callback")
}
}
})
go collector.Run()
@ -228,10 +228,10 @@ func testCallbackSlowBlocksNextTick(t *testing.T) {
var count atomic.Int64
collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {
count.Add(1)
time.Sleep(50 * time.Millisecond) // 5x the interval
}
})
go collector.Run()
time.Sleep(200 * time.Millisecond)
@ -247,20 +247,20 @@ func testCallbackSlowBlocksNextTick(t *testing.T) {
}
func testCallbackSetAfterRun(t *testing.T) {
// Setting StatusCallback after Run() starts -- data race?
// The field is read without sync in Run(). This documents the behavior.
// Setting SetStatusCallback after Run() starts -- now safe with cbMu
// (BUG-CP4B3-2 fix).
bs := newTestBlockService(t)
collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
// Start with nil callback.
go collector.Run()
// Set callback after Run started. Under -race, this would flag a data race.
// Set callback after Run started. With cbMu, this is race-free.
time.Sleep(5 * time.Millisecond)
var called atomic.Bool
collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {
collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {
called.Store(true)
}
})
time.Sleep(50 * time.Millisecond)
collector.Stop()
@ -275,7 +275,7 @@ func testCallbackSetAfterRun(t *testing.T) {
func testConcurrentStopCalls(t *testing.T) {
bs := newTestBlockService(t)
collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
collector.StatusCallback = func(msgs []blockvol.BlockVolumeInfoMessage) {}
collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {})
go collector.Run()
time.Sleep(30 * time.Millisecond)

35
weed/storage/store_blockvol.go

@ -96,6 +96,41 @@ func (bs *BlockVolumeStore) CollectBlockVolumeHeartbeat() []blockvol.BlockVolume
return msgs
}
// withVolume looks up a volume by path and calls fn while holding RLock.
// Holding the read lock for the full lookup+operation prevents
// RemoveBlockVolume from closing the volume while fn runs
// (BUG-CP4B3-1: TOCTOU between GetBlockVolume and HandleAssignment).
// Returns a "not found" error when no volume is registered at path.
func (bs *BlockVolumeStore) withVolume(path string, fn func(*blockvol.BlockVol) error) error {
	bs.mu.RLock()
	defer bs.mu.RUnlock()
	if v, ok := bs.volumes[path]; ok {
		return fn(v)
	}
	return fmt.Errorf("block volume not found: %s", path)
}
// ProcessBlockVolumeAssignments applies a batch of assignments from master.
// Returns a slice of errors parallel to the input (nil = success).
// Unknown volumes and invalid transitions are logged and returned as errors,
// but do not stop processing of remaining assignments.
func (bs *BlockVolumeStore) ProcessBlockVolumeAssignments(
	assignments []blockvol.BlockVolumeAssignment,
) []error {
	results := make([]error, len(assignments))
	for i := range assignments {
		a := &assignments[i]
		// Decode wire-format role and lease TTL before touching the volume.
		role := blockvol.RoleFromWire(a.Role)
		ttl := blockvol.LeaseTTLFromWire(a.LeaseTtlMs)
		apply := func(vol *blockvol.BlockVol) error {
			return vol.HandleAssignment(a.Epoch, role, ttl)
		}
		// withVolume holds the store RLock across lookup+apply (BUG-CP4B3-1).
		if err := bs.withVolume(a.Path, apply); err != nil {
			results[i] = err
			glog.Warningf("assignment: volume %s epoch=%d role=%s: %v",
				a.Path, a.Epoch, role, err)
		}
	}
	return results
}
// Close closes all block volumes.
func (bs *BlockVolumeStore) Close() {
bs.mu.Lock()

Loading…
Cancel
Save