You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

520 lines
17 KiB

package weed_server
import (
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
)
// ---------------------------------------------------------------------------
// QA adversarial tests for CP4b-3: Assignment Processing
// Targets: ProcessBlockVolumeAssignments, processAssignments, safe wrappers
//
// Bugs found:
// BUG-CP4B3-1 [Medium]: TOCTOU in ProcessBlockVolumeAssignments -- volume
// can be removed between GetBlockVolume and HandleAssignment.
// BUG-CP4B3-2 [Low]: Data race on callback fields -- fixed with cbMu +
// SetXxx methods.
// ---------------------------------------------------------------------------
// TestQABlockAssignmentProcessing is the table-driven entry point for the
// assignment-processing QA suite. Every table entry pairs a subtest name with
// the helper implementing it; the entries are executed one after another, in
// table order, as subtests of this test.
func TestQABlockAssignmentProcessing(t *testing.T) {
	type qaCase struct {
		name string
		run  func(t *testing.T)
	}

	cases := []qaCase{
		// Source / callback panic recovery.
		{"source_panic_recovery", testSourcePanicRecovery},
		{"callback_panic_recovery", testAssignCallbackPanicRecovery},

		// Source edge cases.
		{"source_returns_nil", testSourceReturnsNil},
		{"source_returns_empty", testSourceReturnsEmpty},
		{"slow_source_blocks_tick", testSlowSourceBlocksTick},
		{"source_set_after_run", testAssignSourceSetAfterRun},

		// Batch processing.
		{"same_volume_batch_ordering", testSameVolumeBatchOrdering},
		{"unknown_role_in_assignment", testUnknownRoleInAssignment},
		{"large_batch_100_assignments", testLargeBatch100},
		{"batch_all_unknown_volumes", testBatchAllUnknownVolumes},

		// Integration: collector + assignments.
		{"stop_during_slow_assignment", testStopDuringSlowAssignment},
		{"assignment_and_status_both_fire", testAssignmentAndStatusBothFire},
		{"assignment_callback_receives_errs", testAssignmentCallbackReceivesErrs},
	}

	for i := range cases {
		tc := cases[i]
		// The helper already has the subtest signature, so no wrapper closure
		// is needed.
		t.Run(tc.name, tc.run)
	}
}
// ---------------------------------------------------------------------------
// Source / callback panic recovery
// ---------------------------------------------------------------------------
// testSourcePanicRecovery installs an AssignmentSource that panics on its
// first invocation and returns nil afterwards. The test passes only if the
// status callback still reaches three invocations within 500ms and the source
// ends up being invoked at least twice, i.e. the collector kept ticking after
// the panic.
func testSourcePanicRecovery(t *testing.T) {
	bs := newTestBlockService(t)

	var (
		statusTicks atomic.Int64
		sourceHits  atomic.Int64
	)

	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
	collector.SetStatusCallback(func(_ []blockvol.BlockVolumeInfoMessage) {
		statusTicks.Add(1)
	})
	collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment {
		// Only the very first invocation panics; later ones report no work.
		if sourceHits.Add(1) == 1 {
			panic("test: source panic")
		}
		return nil
	})

	go collector.Run()
	defer collector.Stop()

	// Poll every 5ms until three status callbacks were seen or the overall
	// 500ms budget is used up.
	timeout := time.After(500 * time.Millisecond)
	for {
		if statusTicks.Load() >= 3 {
			break
		}
		select {
		case <-timeout:
			t.Fatalf("collector died after source panic: only %d status callbacks", statusTicks.Load())
		case <-time.After(5 * time.Millisecond):
		}
	}

	if got := sourceHits.Load(); got < 2 {
		t.Errorf("expected AssignmentSource called again after panic, got %d calls", got)
	}
}
// testAssignCallbackPanicRecovery installs an AssignmentCallback that panics
// on its first invocation. The test passes only if the status callback still
// reaches three invocations within 500ms and the assignment callback is
// invoked at least twice, i.e. the collector survived the callback panic.
func testAssignCallbackPanicRecovery(t *testing.T) {
	bs := newTestBlockService(t)
	path := testBlockVolPath(t, bs)

	var (
		statusTicks  atomic.Int64
		callbackHits atomic.Int64
	)

	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
	collector.SetStatusCallback(func(_ []blockvol.BlockVolumeInfoMessage) {
		statusTicks.Add(1)
	})
	collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment {
		// The same promote-to-Primary assignment is handed out on every tick.
		// Whether repeated application succeeds or errors is irrelevant here:
		// only the callback panic is under test.
		promote := blockvol.BlockVolumeAssignment{
			Path:       path,
			Epoch:      1,
			Role:       uint32(blockvol.RolePrimary),
			LeaseTtlMs: 30000,
		}
		return []blockvol.BlockVolumeAssignment{promote}
	})
	collector.SetAssignmentCallback(func(_ []blockvol.BlockVolumeAssignment, _ []error) {
		// Only the first invocation panics.
		if callbackHits.Add(1) == 1 {
			panic("test: callback panic")
		}
	})

	go collector.Run()
	defer collector.Stop()

	// Poll every 5ms until three status callbacks were seen or the overall
	// 500ms budget is used up.
	timeout := time.After(500 * time.Millisecond)
	for {
		if statusTicks.Load() >= 3 {
			break
		}
		select {
		case <-timeout:
			t.Fatalf("collector died after callback panic: only %d status callbacks", statusTicks.Load())
		case <-time.After(5 * time.Millisecond):
		}
	}

	if got := callbackHits.Load(); got < 2 {
		t.Errorf("expected AssignmentCallback called again after panic, got %d calls", got)
	}
}
// ---------------------------------------------------------------------------
// Source edge cases
// ---------------------------------------------------------------------------
// testSourceReturnsNil runs the collector for 50ms with an AssignmentSource
// that always returns nil and asserts that the AssignmentCallback is never
// invoked. The callback is the observable signal used here: if it fires, the
// collector did process a nil batch instead of skipping it.
func testSourceReturnsNil(t *testing.T) {
	bs := newTestBlockService(t)
	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)

	collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment { return nil })

	var callbackFired atomic.Bool
	collector.SetAssignmentCallback(func(_ []blockvol.BlockVolumeAssignment, _ []error) {
		callbackFired.Store(true)
	})

	// Let several 10ms ticks elapse, then shut the collector down before
	// inspecting the flag.
	go collector.Run()
	time.Sleep(50 * time.Millisecond)
	collector.Stop()

	if callbackFired.Load() {
		t.Error("AssignmentCallback called when source returned nil -- should short-circuit")
	}
}
// testSourceReturnsEmpty is the non-nil counterpart of the nil-source case:
// the AssignmentSource returns an allocated but empty slice on every call, and
// the test asserts that the AssignmentCallback is never invoked during a 50ms
// run.
func testSourceReturnsEmpty(t *testing.T) {
	bs := newTestBlockService(t)
	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)

	collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment {
		// Deliberately non-nil with length zero.
		return make([]blockvol.BlockVolumeAssignment, 0)
	})

	var callbackFired atomic.Bool
	collector.SetAssignmentCallback(func(_ []blockvol.BlockVolumeAssignment, _ []error) {
		callbackFired.Store(true)
	})

	// Let several 10ms ticks elapse, then shut the collector down before
	// inspecting the flag.
	go collector.Run()
	time.Sleep(50 * time.Millisecond)
	collector.Stop()

	if callbackFired.Load() {
		t.Error("AssignmentCallback called when source returned empty -- should short-circuit")
	}
}
// testSlowSourceBlocksTick checks that a slow AssignmentSource throttles the
// whole collector loop. The collector ticks every 5ms while the source sleeps
// 50ms per call; over a 200ms run the status callback must fire at least once
// but no more than 10 times, far below the ~40 that an unthrottled 5ms ticker
// would allow.
func testSlowSourceBlocksTick(t *testing.T) {
	bs := newTestBlockService(t)

	var statusTicks atomic.Int64

	collector := NewBlockVolumeHeartbeatCollector(bs, 5*time.Millisecond)
	collector.SetStatusCallback(func(_ []blockvol.BlockVolumeInfoMessage) {
		statusTicks.Add(1)
	})
	collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment {
		// Ten tick intervals' worth of delay on every call.
		time.Sleep(50 * time.Millisecond)
		return nil
	})

	go collector.Run()
	time.Sleep(200 * time.Millisecond)
	collector.Stop()

	// Roughly 200ms / 50ms = 4 callbacks are expected; the bounds are loose
	// on purpose.
	seen := statusTicks.Load()
	switch {
	case seen > 10:
		t.Errorf("expected slow source to throttle ticks, got %d status callbacks", seen)
	case seen < 1:
		t.Error("expected at least 1 status callback")
	}
	t.Logf("slow source: %d status callbacks in 200ms (5ms interval, 50ms source)", seen)
}
// testAssignSourceSetAfterRun starts the collector without an
// AssignmentSource, waits 30ms, and only then installs a source and an
// AssignmentCallback. The test passes if the callback fires within 200ms of
// that late registration.
func testAssignSourceSetAfterRun(t *testing.T) {
	bs := newTestBlockService(t)
	path := testBlockVolPath(t, bs)

	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
	collector.SetStatusCallback(func(_ []blockvol.BlockVolumeInfoMessage) {})

	// No AssignmentSource yet: the first ticks run without one.
	go collector.Run()
	defer collector.Stop()
	time.Sleep(30 * time.Millisecond)

	// Late registration of both the source and the callback.
	var callbackFired atomic.Bool
	collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment {
		promote := blockvol.BlockVolumeAssignment{
			Path:       path,
			Epoch:      1,
			Role:       uint32(blockvol.RolePrimary),
			LeaseTtlMs: 30000,
		}
		return []blockvol.BlockVolumeAssignment{promote}
	})
	collector.SetAssignmentCallback(func(_ []blockvol.BlockVolumeAssignment, _ []error) {
		callbackFired.Store(true)
	})

	// Poll every 5ms until the callback was observed or 200ms have passed.
	timeout := time.After(200 * time.Millisecond)
	for {
		if callbackFired.Load() {
			break
		}
		select {
		case <-timeout:
			t.Fatal("AssignmentCallback never fired after setting source post-Run")
		case <-time.After(5 * time.Millisecond):
		}
	}
}
// ---------------------------------------------------------------------------
// Batch processing
// ---------------------------------------------------------------------------
// testSameVolumeBatchOrdering feeds ProcessBlockVolumeAssignments a batch in
// which the same volume path appears twice: first a promotion to Primary at
// epoch 1, then a demotion to Stale at epoch 2. Both entries must succeed and
// the volume must end up Stale, which is consistent with the batch being
// applied sequentially in slice order.
//
// Per the original author's note, the demotion goes Primary->Draining->Stale
// inside HandleAssignment; that transition is not visible from this file.
func testSameVolumeBatchOrdering(t *testing.T) {
	bs := newTestBlockService(t)
	path := testBlockVolPath(t, bs)

	// Order matters: the promotion has to be applied before the demotion.
	assignments := []blockvol.BlockVolumeAssignment{
		{Path: path, Epoch: 1, Role: uint32(blockvol.RolePrimary), LeaseTtlMs: 30000},
		{Path: path, Epoch: 2, Role: uint32(blockvol.RoleStale)},
	}

	errs := bs.Store().ProcessBlockVolumeAssignments(assignments)
	// Guard the indexing below so that a short result fails the test with a
	// clear message instead of panicking with an index-out-of-range.
	if len(errs) != len(assignments) {
		t.Fatalf("expected %d error slots, got %d", len(assignments), len(errs))
	}
	if errs[0] != nil {
		t.Fatalf("assignment 0 (promote): %v", errs[0])
	}
	if errs[1] != nil {
		t.Fatalf("assignment 1 (demote): %v", errs[1])
	}

	// Both assignments just succeeded against this path, so the lookup is
	// expected to hit; its second result is intentionally ignored.
	vol, _ := bs.Store().GetBlockVolume(path)
	if vol.Role() != blockvol.RoleStale {
		t.Errorf("expected Stale after sequential batch, got %s", vol.Role())
	}
}
// testUnknownRoleInAssignment sends a single assignment whose wire role is
// 255, a value outside the known role set, at epoch 0. The assignment must be
// accepted without error and the volume must still report RoleNone afterwards.
//
// Per the original author's note, RoleFromWire maps out-of-range values to
// RoleNone, making this a same-role no-op on a fresh volume; that mapping is
// not visible from this file.
func testUnknownRoleInAssignment(t *testing.T) {
	bs := newTestBlockService(t)
	path := testBlockVolPath(t, bs)

	assignments := []blockvol.BlockVolumeAssignment{{
		Path:  path,
		Epoch: 0,
		Role:  255, // unknown wire value; expected to be treated as RoleNone
	}}

	errs := bs.Store().ProcessBlockVolumeAssignments(assignments)
	// Guard the indexing below so that an empty result fails the test with a
	// clear message instead of panicking with an index-out-of-range.
	if len(errs) != len(assignments) {
		t.Fatalf("expected %d error slots, got %d", len(assignments), len(errs))
	}
	if errs[0] != nil {
		t.Fatalf("expected nil error for unknown role mapped to RoleNone, got: %v", errs[0])
	}

	// The assignment above succeeded against this path, so the lookup is
	// expected to hit; its second result is intentionally ignored.
	vol, _ := bs.Store().GetBlockVolume(path)
	if vol.Role() != blockvol.RoleNone {
		t.Errorf("expected RoleNone, got %s", vol.Role())
	}
}
// testLargeBatch100 submits a single batch of 100 identical promote-to-Primary
// assignments (epoch 1) for the same valid volume path. The result must hold
// exactly 100 error slots, all nil: a failure of the first entry (the actual
// promotion) aborts the test, failures of the remaining 99 repeats are
// reported individually.
func testLargeBatch100(t *testing.T) {
	bs := newTestBlockService(t)
	path := testBlockVolPath(t, bs)

	// Every slot carries the same assignment.
	batch := make([]blockvol.BlockVolumeAssignment, 100)
	for i := range batch {
		batch[i] = blockvol.BlockVolumeAssignment{
			Path:       path,
			Epoch:      1,
			Role:       uint32(blockvol.RolePrimary),
			LeaseTtlMs: 30000,
		}
	}

	errs := bs.Store().ProcessBlockVolumeAssignments(batch)
	if len(errs) != 100 {
		t.Fatalf("expected 100 error slots, got %d", len(errs))
	}

	for i, err := range errs {
		if err == nil {
			continue
		}
		// Index 0 is the promotion itself; without it the rest is meaningless.
		if i == 0 {
			t.Fatalf("assignment 0: %v", err)
		}
		t.Errorf("assignment %d: unexpected error: %v", i, err)
	}
}
// testBatchAllUnknownVolumes submits five assignments that all target a path
// with no registered volume. The call must not panic, must return one error
// slot per assignment, and every slot must hold an error whose text contains
// "not found".
func testBatchAllUnknownVolumes(t *testing.T) {
	bs := newTestBlockService(t)

	assignments := make([]blockvol.BlockVolumeAssignment, 5)
	for i := range assignments {
		assignments[i] = blockvol.BlockVolumeAssignment{
			Path:  "/nonexistent/vol.blk",
			Epoch: 1,
			Role:  uint32(blockvol.RolePrimary),
		}
	}

	errs := bs.Store().ProcessBlockVolumeAssignments(assignments)
	// Without this check an empty or short result would skip the loop below
	// and let the test pass without verifying anything.
	if len(errs) != len(assignments) {
		t.Fatalf("expected %d error slots, got %d", len(assignments), len(errs))
	}
	for i, err := range errs {
		if err == nil {
			t.Errorf("assignment %d: expected error for unknown volume", i)
		} else if !strings.Contains(err.Error(), "not found") {
			t.Errorf("assignment %d: expected 'not found', got: %v", i, err)
		}
	}
}
// ---------------------------------------------------------------------------
// Integration: collector + assignments
// ---------------------------------------------------------------------------
// testStopDuringSlowAssignment asserts that Stop() does not return while an
// AssignmentSource call is still in flight. The source records when it is
// entered, sleeps 80ms, and records when it finishes; the test calls Stop() as
// soon as the source has been entered and then requires the "finished" flag to
// be set.
func testStopDuringSlowAssignment(t *testing.T) {
	bs := newTestBlockService(t)

	var sourceEntered atomic.Bool
	var sourceFinished atomic.Bool

	collector := NewBlockVolumeHeartbeatCollector(bs, 5*time.Millisecond)
	collector.SetStatusCallback(func(msgs []blockvol.BlockVolumeInfoMessage) {})
	collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment {
		sourceEntered.Store(true)
		time.Sleep(80 * time.Millisecond)
		sourceFinished.Store(true)
		return nil
	})

	go collector.Run()

	// Wait (up to 2s, polling every 1ms) for the source to be entered.
	deadline := time.After(2 * time.Second)
	for !sourceEntered.Load() {
		select {
		case <-deadline:
			// Shut the collector down before aborting. t.Fatal does not
			// return, so nothing may follow it in this branch.
			collector.Stop()
			t.Fatal("source never entered")
		case <-time.After(time.Millisecond):
		}
	}

	// Stop is expected to block until the in-flight source call has finished.
	collector.Stop()
	if !sourceFinished.Load() {
		t.Error("Stop() returned before slow AssignmentSource finished")
	}
}
// testAssignmentAndStatusBothFire runs the collector with a StatusCallback and
// an AssignmentSource/AssignmentCallback pair installed at the same time, and
// requires each of the two callbacks to have fired at least three times within
// 500ms.
func testAssignmentAndStatusBothFire(t *testing.T) {
	bs := newTestBlockService(t)
	path := testBlockVolPath(t, bs)

	var (
		statusTicks atomic.Int64
		assignTicks atomic.Int64
	)

	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
	collector.SetStatusCallback(func(_ []blockvol.BlockVolumeInfoMessage) {
		statusTicks.Add(1)
	})
	collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment {
		promote := blockvol.BlockVolumeAssignment{
			Path:       path,
			Epoch:      1,
			Role:       uint32(blockvol.RolePrimary),
			LeaseTtlMs: 30000,
		}
		return []blockvol.BlockVolumeAssignment{promote}
	})
	collector.SetAssignmentCallback(func(_ []blockvol.BlockVolumeAssignment, _ []error) {
		assignTicks.Add(1)
	})

	go collector.Run()
	defer collector.Stop()

	// Poll every 5ms until both counters reached three or 500ms have passed.
	timeout := time.After(500 * time.Millisecond)
	for {
		if statusTicks.Load() >= 3 && assignTicks.Load() >= 3 {
			break
		}
		select {
		case <-timeout:
			t.Fatalf("expected >=3 each, got status=%d assign=%d",
				statusTicks.Load(), assignTicks.Load())
		case <-time.After(5 * time.Millisecond):
		}
	}
}
// testAssignmentCallbackReceivesErrs checks what the AssignmentCallback is
// handed for a mixed batch. The source returns one assignment for a valid
// volume and one for a nonexistent path; the first callback invocation must
// receive both assignments together with a two-element error slice whose
// first entry is nil and whose second entry mentions "not found".
func testAssignmentCallbackReceivesErrs(t *testing.T) {
	bs := newTestBlockService(t)
	path := testBlockVolPath(t, bs)

	// Written by the collector goroutine, read by the test goroutine.
	var (
		mu           sync.Mutex
		capturedAsgn []blockvol.BlockVolumeAssignment
		capturedErrs []error
	)
	fired := make(chan struct{}, 1)

	collector := NewBlockVolumeHeartbeatCollector(bs, 10*time.Millisecond)
	collector.SetStatusCallback(func(_ []blockvol.BlockVolumeInfoMessage) {})
	collector.SetAssignmentSource(func() []blockvol.BlockVolumeAssignment {
		// One known volume followed by one unknown path.
		known := blockvol.BlockVolumeAssignment{Path: path, Epoch: 1, Role: uint32(blockvol.RolePrimary), LeaseTtlMs: 30000}
		unknown := blockvol.BlockVolumeAssignment{Path: "/nonexistent/vol.blk", Epoch: 1, Role: uint32(blockvol.RolePrimary)}
		return []blockvol.BlockVolumeAssignment{known, unknown}
	})
	collector.SetAssignmentCallback(func(a []blockvol.BlockVolumeAssignment, errs []error) {
		// Keep only what the first invocation delivered.
		mu.Lock()
		if capturedAsgn == nil {
			capturedAsgn, capturedErrs = a, errs
		}
		mu.Unlock()

		// Non-blocking notification; later invocations find the buffer full.
		select {
		case fired <- struct{}{}:
		default:
		}
	})

	go collector.Run()
	defer collector.Stop()

	select {
	case <-time.After(500 * time.Millisecond):
		t.Fatal("AssignmentCallback never fired")
	case <-fired:
	}

	mu.Lock()
	defer mu.Unlock()

	if n := len(capturedAsgn); n != 2 {
		t.Fatalf("expected 2 assignments, got %d", n)
	}
	if n := len(capturedErrs); n != 2 {
		t.Fatalf("expected 2 errors, got %d", n)
	}
	if first := capturedErrs[0]; first != nil {
		t.Errorf("assignment 0: expected nil error, got %v", first)
	}
	switch second := capturedErrs[1]; {
	case second == nil:
		t.Error("assignment 1: expected error for unknown volume")
	case !strings.Contains(second.Error(), "not found"):
		t.Errorf("assignment 1: expected 'not found', got: %v", second)
	}
}