Browse Source

feat: CP11B-2 explainable placement / plan API

New POST /block/volume/plan endpoint returns full placement preview:
resolved policy, ordered candidate list, selected primary/replicas,
and per-server rejection reasons with stable string constants.

Core design: evaluateBlockPlacement() is a pure function with no
registry/topology dependency. gatherPlacementCandidates() is the
single topology bridge point. Plan and create share the same planner —
parity contract is same ordered candidate list for same cluster state.

Create path refactored: uses evaluateBlockPlacement() instead of
PickServer(), iterates all candidates (no 3-retry cap), recomputes
replica order after primary fallback. rf_not_satisfiable severity
is durability-mode-aware (warning for best_effort, error for strict).

15 unit tests + 20 QA adversarial tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
Ping Qiu 1 week ago
parent
commit
f501c63009
  1. 190
      weed/server/master_block_plan.go
  2. 382
      weed/server/master_block_plan_test.go
  3. 41
      weed/server/master_block_registry.go
  4. 37
      weed/server/master_grpc_server_block.go
  5. 1
      weed/server/master_server.go
  6. 622
      weed/server/qa_block_cp11b2_adversarial_test.go
  7. 21
      weed/storage/blockvol/blockapi/client.go
  8. 32
      weed/storage/blockvol/blockapi/types.go

190
weed/server/master_block_plan.go

@@ -0,0 +1,190 @@
package weed_server
import (
"sort"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol/blockapi"
)
// Stable placement rejection reason constants.
// These are product-grade strings suitable for UI/automation consumption;
// treat them as part of the public API surface and do not rename in place.
const (
	// ReasonDiskTypeMismatch: candidate's reported disk type conflicts with the requested one.
	ReasonDiskTypeMismatch = "disk_type_mismatch"
	// ReasonInsufficientSpace: candidate reported less available space than the requested size.
	ReasonInsufficientSpace = "insufficient_space"
	// ReasonAlreadySelected: candidate was already chosen earlier in the same plan.
	ReasonAlreadySelected = "already_selected"
)

// Plan-level error reasons (not per-server).
const (
	// ReasonNoViablePrimary: no candidate survived filtering, so no primary can be chosen.
	ReasonNoViablePrimary = "no_viable_primary"
	// ReasonRFNotSatisfiable: fewer replica targets exist than the replica factor requires.
	ReasonRFNotSatisfiable = "rf_not_satisfiable"
)
// PlacementRejection records one server rejection with stable reason.
type PlacementRejection struct {
	Server string // address of the rejected candidate server
	Reason string // one of the Reason* rejection constants
}
// PlacementResult is the full output of the placement planner.
// Candidates is the ordered eligible list — Primary is Candidates[0],
// Replicas is Candidates[1:RF]. All derived from the same sequence,
// which is the parity contract shared by the plan API and the create path.
type PlacementResult struct {
	Candidates []string             // full ordered eligible list (sorted by load, then address)
	Primary    string               // Candidates[0], or "" when no candidate survived filtering
	Replicas   []string             // up to RF-1 entries following the primary
	Rejections []PlacementRejection // per-server rejections with stable reasons
	Warnings   []string             // non-fatal conditions (e.g. rf_not_satisfiable under best_effort)
	Errors     []string             // fatal conditions (e.g. no_viable_primary)
}
// evaluateBlockPlacement takes candidates and request parameters,
// applies filters, scores deterministically, and returns a placement plan.
// Pure function: no side effects, no registry/topology dependency.
//
// Filtering: a candidate is rejected with a stable reason when its disk type
// conflicts with the requested one, or when its known available space is
// smaller than the requested size. An empty disk type or zero AvailableBytes
// means "unknown" and never causes rejection.
// Ordering: survivors are sorted by volume count ascending, then address
// ascending, so the result is deterministic for a given cluster state.
// Selection: Primary is Candidates[0]; Replicas are the next replicaFactor-1
// entries. Note Replicas aliases the Candidates backing array.
func evaluateBlockPlacement(
	candidates []PlacementCandidateInfo,
	replicaFactor int,
	diskType string,
	sizeBytes uint64,
	durabilityMode blockvol.DurabilityMode,
) PlacementResult {
	var result PlacementResult
	// Filter phase: reject ineligible candidates, recording stable reasons.
	type eligible struct {
		address     string
		volumeCount int
	}
	// Pre-size: at most every candidate survives filtering.
	kept := make([]eligible, 0, len(candidates))
	for _, c := range candidates {
		// Disk type filter: skip when either side is empty (unknown/any).
		if diskType != "" && c.DiskType != "" && c.DiskType != diskType {
			result.Rejections = append(result.Rejections, PlacementRejection{
				Server: c.Address,
				Reason: ReasonDiskTypeMismatch,
			})
			continue
		}
		// Capacity filter: skip when AvailableBytes is 0 (unknown).
		if sizeBytes > 0 && c.AvailableBytes > 0 && c.AvailableBytes < sizeBytes {
			result.Rejections = append(result.Rejections, PlacementRejection{
				Server: c.Address,
				Reason: ReasonInsufficientSpace,
			})
			continue
		}
		kept = append(kept, eligible{address: c.Address, volumeCount: c.VolumeCount})
	}
	// Sort phase: volume count ascending, then address ascending (deterministic:
	// the tiebreak yields a total order, so sort.Slice instability cannot show).
	sort.Slice(kept, func(i, j int) bool {
		if kept[i].volumeCount != kept[j].volumeCount {
			return kept[i].volumeCount < kept[j].volumeCount
		}
		return kept[i].address < kept[j].address
	})
	// Build ordered candidate list.
	result.Candidates = make([]string, len(kept))
	for i, k := range kept {
		result.Candidates[i] = k.address
	}
	// Select phase.
	if len(result.Candidates) == 0 {
		result.Errors = append(result.Errors, ReasonNoViablePrimary)
		return result
	}
	result.Primary = result.Candidates[0]
	// Replicas: RF means total copies including primary.
	replicasNeeded := replicaFactor - 1
	if replicasNeeded > 0 {
		available := len(result.Candidates) - 1 // exclude primary
		if available >= replicasNeeded {
			result.Replicas = result.Candidates[1 : 1+replicasNeeded]
		} else {
			// Partial or zero replicas available.
			if available > 0 {
				result.Replicas = result.Candidates[1:]
			}
			// Severity depends on durability mode: strict modes error, best_effort warns.
			requiredReplicas := durabilityMode.RequiredReplicas(replicaFactor)
			if available < requiredReplicas {
				result.Errors = append(result.Errors, ReasonRFNotSatisfiable)
			} else {
				result.Warnings = append(result.Warnings, ReasonRFNotSatisfiable)
			}
		}
	}
	return result
}
// gatherPlacementCandidates reads candidate data from the block registry.
// This is the topology bridge point: today it reads from the registry,
// long-term it would read from weed/topology. Keeping this as the single
// seam is what lets evaluateBlockPlacement stay a pure function.
func (ms *MasterServer) gatherPlacementCandidates() []PlacementCandidateInfo {
	return ms.blockRegistry.PlacementCandidates()
}
// PlanBlockVolume is the top-level planning function behind POST /block/volume/plan.
// It resolves the effective policy, gathers placement candidates, evaluates the
// placement plan, and assembles the API response. Policy-resolution errors
// short-circuit placement evaluation; placement warnings/errors are merged in.
func (ms *MasterServer) PlanBlockVolume(req *blockapi.CreateVolumeRequest) *blockapi.VolumePlanResponse {
	env := ms.buildEnvironmentInfo()
	resolved := blockvol.ResolvePolicy(blockvol.PresetName(req.Preset),
		req.DurabilityMode, req.ReplicaFactor, req.DiskType, env)
	out := &blockapi.VolumePlanResponse{
		ResolvedPolicy: blockapi.ResolvedPolicyView{
			Preset:              string(resolved.Policy.Preset),
			DurabilityMode:      resolved.Policy.DurabilityMode,
			ReplicaFactor:       resolved.Policy.ReplicaFactor,
			DiskType:            resolved.Policy.DiskType,
			TransportPreference: resolved.Policy.TransportPref,
			WorkloadHint:        resolved.Policy.WorkloadHint,
			WALSizeRecommended:  resolved.Policy.WALSizeRecommended,
			StorageProfile:      resolved.Policy.StorageProfile,
		},
		Warnings: resolved.Warnings,
		Errors:   resolved.Errors,
		Plan:     blockapi.VolumePlanView{Candidates: []string{}}, // never nil
	}
	// Resolution failed: return the policy view without evaluating placement.
	if len(resolved.Errors) > 0 {
		return out
	}
	durMode, _ := blockvol.ParseDurabilityMode(resolved.Policy.DurabilityMode)
	placement := evaluateBlockPlacement(ms.gatherPlacementCandidates(),
		resolved.Policy.ReplicaFactor, resolved.Policy.DiskType, req.SizeBytes, durMode)
	planView := blockapi.VolumePlanView{
		Primary:    placement.Primary,
		Replicas:   placement.Replicas,
		Candidates: placement.Candidates,
	}
	// Stable response shape: Candidates must never serialize as null.
	if planView.Candidates == nil {
		planView.Candidates = []string{}
	}
	// Convert internal rejections to the API type.
	for _, rej := range placement.Rejections {
		planView.Rejections = append(planView.Rejections, blockapi.VolumePlanRejection{
			Server: rej.Server,
			Reason: rej.Reason,
		})
	}
	out.Plan = planView
	// Merge placement warnings and errors into the response.
	out.Warnings = append(out.Warnings, placement.Warnings...)
	out.Errors = append(out.Errors, placement.Errors...)
	return out
}

382
weed/server/master_block_plan_test.go

@@ -0,0 +1,382 @@
package weed_server
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol/blockapi"
)
// --- evaluateBlockPlacement unit tests ---
// A lone candidate with RF=1 becomes the primary with no replicas,
// no rejections, and a one-element candidate list.
func TestEvaluateBlockPlacement_SingleCandidate_RF1(t *testing.T) {
	input := []PlacementCandidateInfo{{Address: "vs1:9333", VolumeCount: 0}}
	res := evaluateBlockPlacement(input, 1, "", 0, blockvol.DurabilityBestEffort)
	if res.Primary != "vs1:9333" {
		t.Fatalf("expected primary vs1:9333, got %q", res.Primary)
	}
	if n := len(res.Replicas); n != 0 {
		t.Fatalf("expected 0 replicas, got %d", n)
	}
	if n := len(res.Rejections); n != 0 {
		t.Fatalf("expected 0 rejections, got %d", n)
	}
	if len(res.Candidates) != 1 || res.Candidates[0] != "vs1:9333" {
		t.Fatalf("expected candidates [vs1:9333], got %v", res.Candidates)
	}
}
// The least-loaded server wins the primary slot, and the candidate list
// is ordered by ascending volume count.
func TestEvaluateBlockPlacement_LeastLoaded(t *testing.T) {
	input := []PlacementCandidateInfo{
		{Address: "vs1:9333", VolumeCount: 5},
		{Address: "vs2:9333", VolumeCount: 2},
		{Address: "vs3:9333", VolumeCount: 8},
	}
	res := evaluateBlockPlacement(input, 1, "", 0, blockvol.DurabilityBestEffort)
	if res.Primary != "vs2:9333" {
		t.Fatalf("expected least-loaded vs2:9333 as primary, got %q", res.Primary)
	}
	if len(res.Candidates) != 3 {
		t.Fatalf("expected 3 candidates, got %d", len(res.Candidates))
	}
	// Sorted by volume count: vs2 (2), vs1 (5), vs3 (8).
	for i, want := range []string{"vs2:9333", "vs1:9333", "vs3:9333"} {
		if res.Candidates[i] != want {
			t.Fatalf("candidates[%d]: expected %q, got %q", i, want, res.Candidates[i])
		}
	}
}
// Equal volume counts fall back to lexical address order, making the
// plan fully deterministic.
func TestEvaluateBlockPlacement_DeterministicTiebreak(t *testing.T) {
	input := []PlacementCandidateInfo{
		{Address: "vs3:9333", VolumeCount: 0},
		{Address: "vs1:9333", VolumeCount: 0},
		{Address: "vs2:9333", VolumeCount: 0},
	}
	res := evaluateBlockPlacement(input, 1, "", 0, blockvol.DurabilityBestEffort)
	if res.Primary != "vs1:9333" {
		t.Fatalf("expected vs1:9333 (lowest address), got %q", res.Primary)
	}
	for i, want := range []string{"vs1:9333", "vs2:9333", "vs3:9333"} {
		if res.Candidates[i] != want {
			t.Fatalf("candidates[%d]: expected %q, got %q", i, want, res.Candidates[i])
		}
	}
}
// RF=2 selects the least-loaded server as primary and the next one as
// the single replica, with no errors.
func TestEvaluateBlockPlacement_RF2_PrimaryAndReplica(t *testing.T) {
	input := []PlacementCandidateInfo{
		{Address: "vs1:9333", VolumeCount: 3},
		{Address: "vs2:9333", VolumeCount: 1},
		{Address: "vs3:9333", VolumeCount: 5},
	}
	res := evaluateBlockPlacement(input, 2, "", 0, blockvol.DurabilityBestEffort)
	if res.Primary != "vs2:9333" {
		t.Fatalf("expected primary vs2:9333, got %q", res.Primary)
	}
	if len(res.Replicas) != 1 || res.Replicas[0] != "vs1:9333" {
		t.Fatalf("expected replicas [vs1:9333], got %v", res.Replicas)
	}
	if len(res.Errors) != 0 {
		t.Fatalf("unexpected errors: %v", res.Errors)
	}
}
// RF=3 across exactly three servers uses every server: one primary plus
// two replicas, no errors.
func TestEvaluateBlockPlacement_RF3_AllSelected(t *testing.T) {
	input := []PlacementCandidateInfo{
		{Address: "vs1:9333", VolumeCount: 0},
		{Address: "vs2:9333", VolumeCount: 0},
		{Address: "vs3:9333", VolumeCount: 0},
	}
	res := evaluateBlockPlacement(input, 3, "", 0, blockvol.DurabilityBestEffort)
	if res.Primary != "vs1:9333" {
		t.Fatalf("expected primary vs1:9333, got %q", res.Primary)
	}
	if n := len(res.Replicas); n != 2 {
		t.Fatalf("expected 2 replicas, got %d", n)
	}
	if len(res.Errors) != 0 {
		t.Fatalf("unexpected errors: %v", res.Errors)
	}
}
// RF larger than the cluster still yields a primary and partial replicas,
// plus an rf_not_satisfiable warning under best_effort.
func TestEvaluateBlockPlacement_RF_ExceedsServers(t *testing.T) {
	input := []PlacementCandidateInfo{
		{Address: "vs1:9333", VolumeCount: 0},
		{Address: "vs2:9333", VolumeCount: 0},
	}
	res := evaluateBlockPlacement(input, 3, "", 0, blockvol.DurabilityBestEffort)
	if res.Primary != "vs1:9333" {
		t.Fatalf("expected primary vs1:9333, got %q", res.Primary)
	}
	if n := len(res.Replicas); n != 1 {
		t.Fatalf("expected 1 partial replica, got %d", n)
	}
	warned := false
	for _, w := range res.Warnings {
		if w == ReasonRFNotSatisfiable {
			warned = true
			break
		}
	}
	if !warned {
		t.Fatalf("expected warning %q, got %v", ReasonRFNotSatisfiable, res.Warnings)
	}
}
// TestEvaluateBlockPlacement_SingleServer_BestEffort_RF2: one server, RF=2,
// best_effort durability. The plan must warn rather than error, mirroring the
// create path, which succeeds as a single-copy volume in this situation.
func TestEvaluateBlockPlacement_SingleServer_BestEffort_RF2(t *testing.T) {
	input := []PlacementCandidateInfo{{Address: "vs1:9333", VolumeCount: 0}}
	res := evaluateBlockPlacement(input, 2, "", 0, blockvol.DurabilityBestEffort)
	if res.Primary != "vs1:9333" {
		t.Fatalf("expected primary vs1:9333, got %q", res.Primary)
	}
	if len(res.Errors) != 0 {
		t.Fatalf("best_effort should not produce errors, got %v", res.Errors)
	}
	warned := false
	for _, w := range res.Warnings {
		if w == ReasonRFNotSatisfiable {
			warned = true
			break
		}
	}
	if !warned {
		t.Fatalf("expected warning %q, got %v", ReasonRFNotSatisfiable, res.Warnings)
	}
}
// TestEvaluateBlockPlacement_SingleServer_SyncAll_RF2: one server, RF=2,
// sync_all durability. The plan must error, because sync_all cannot operate
// without at least one replica.
func TestEvaluateBlockPlacement_SingleServer_SyncAll_RF2(t *testing.T) {
	input := []PlacementCandidateInfo{{Address: "vs1:9333", VolumeCount: 0}}
	res := evaluateBlockPlacement(input, 2, "", 0, blockvol.DurabilitySyncAll)
	if res.Primary != "vs1:9333" {
		t.Fatalf("expected primary vs1:9333, got %q", res.Primary)
	}
	errored := false
	for _, e := range res.Errors {
		if e == ReasonRFNotSatisfiable {
			errored = true
			break
		}
	}
	if !errored {
		t.Fatalf("expected error %q for sync_all, got errors=%v warnings=%v",
			ReasonRFNotSatisfiable, res.Errors, res.Warnings)
	}
}
// An empty cluster yields no primary, an empty candidate list, and the
// no_viable_primary plan-level error.
func TestEvaluateBlockPlacement_NoServers(t *testing.T) {
	res := evaluateBlockPlacement(nil, 1, "", 0, blockvol.DurabilityBestEffort)
	if res.Primary != "" {
		t.Fatalf("expected empty primary, got %q", res.Primary)
	}
	sawErr := false
	for _, e := range res.Errors {
		if e == ReasonNoViablePrimary {
			sawErr = true
			break
		}
	}
	if !sawErr {
		t.Fatalf("expected error %q, got %v", ReasonNoViablePrimary, res.Errors)
	}
	if len(res.Candidates) != 0 {
		t.Fatalf("expected empty candidates, got %v", res.Candidates)
	}
}
// Requesting ssd rejects the hdd candidate with a stable reason and keeps
// both ssd candidates eligible.
func TestEvaluateBlockPlacement_DiskTypeMismatch(t *testing.T) {
	input := []PlacementCandidateInfo{
		{Address: "vs1:9333", VolumeCount: 0, DiskType: "ssd"},
		{Address: "vs2:9333", VolumeCount: 0, DiskType: "hdd"},
		{Address: "vs3:9333", VolumeCount: 0, DiskType: "ssd"},
	}
	res := evaluateBlockPlacement(input, 1, "ssd", 0, blockvol.DurabilityBestEffort)
	if res.Primary != "vs1:9333" {
		t.Fatalf("expected primary vs1:9333, got %q", res.Primary)
	}
	if len(res.Rejections) != 1 || res.Rejections[0].Server != "vs2:9333" {
		t.Fatalf("expected vs2:9333 rejected, got %v", res.Rejections)
	}
	if got := res.Rejections[0].Reason; got != ReasonDiskTypeMismatch {
		t.Fatalf("expected reason %q, got %q", ReasonDiskTypeMismatch, got)
	}
	if n := len(res.Candidates); n != 2 {
		t.Fatalf("expected 2 eligible candidates, got %d", n)
	}
}
// A candidate with known capacity below the requested size is rejected
// with insufficient_space; the larger server becomes primary.
func TestEvaluateBlockPlacement_InsufficientSpace(t *testing.T) {
	input := []PlacementCandidateInfo{
		{Address: "vs1:9333", VolumeCount: 0, AvailableBytes: 100 << 30}, // 100 GiB free
		{Address: "vs2:9333", VolumeCount: 0, AvailableBytes: 5 << 30},   // 5 GiB free
	}
	// Ask for a 10 GiB volume: only vs1 can hold it.
	res := evaluateBlockPlacement(input, 1, "", 10<<30, blockvol.DurabilityBestEffort)
	if res.Primary != "vs1:9333" {
		t.Fatalf("expected primary vs1:9333, got %q", res.Primary)
	}
	if len(res.Rejections) != 1 || res.Rejections[0].Server != "vs2:9333" {
		t.Fatalf("expected vs2:9333 rejected, got %v", res.Rejections)
	}
	if got := res.Rejections[0].Reason; got != ReasonInsufficientSpace {
		t.Fatalf("expected reason %q, got %q", ReasonInsufficientSpace, got)
	}
}
// AvailableBytes == 0 means "capacity unknown" and must never cause a
// rejection, even when a size is requested.
func TestEvaluateBlockPlacement_UnknownCapacity_Allowed(t *testing.T) {
	input := []PlacementCandidateInfo{
		{Address: "vs1:9333", VolumeCount: 0, AvailableBytes: 0},
		{Address: "vs2:9333", VolumeCount: 0, AvailableBytes: 0},
	}
	res := evaluateBlockPlacement(input, 1, "", 10<<30, blockvol.DurabilityBestEffort)
	if res.Primary != "vs1:9333" {
		t.Fatalf("expected primary vs1:9333, got %q", res.Primary)
	}
	if len(res.Rejections) != 0 {
		t.Fatalf("expected no rejections for unknown capacity, got %v", res.Rejections)
	}
}
// --- HTTP handler tests ---
// qaPlanMaster builds a MasterServer with three block-capable servers and
// stubbed allocate/delete RPCs, suitable for plan/create handler tests.
func qaPlanMaster(t *testing.T) *MasterServer {
	t.Helper()
	ms := &MasterServer{
		blockRegistry:        NewBlockVolumeRegistry(),
		blockAssignmentQueue: NewBlockAssignmentQueue(),
		blockFailover:        newBlockFailoverState(),
	}
	// Allocation stub: derive all addresses from the chosen server, never fail.
	ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) {
		res := &blockAllocResult{
			Path:              fmt.Sprintf("/data/%s.blk", name),
			IQN:               fmt.Sprintf("iqn.2024.test:%s", name),
			ISCSIAddr:         server + ":3260",
			ReplicaDataAddr:   server + ":14260",
			ReplicaCtrlAddr:   server + ":14261",
			RebuildListenAddr: server + ":15000",
		}
		return res, nil
	}
	// Deletion stub: always succeeds.
	ms.blockVSDelete = func(ctx context.Context, server string, name string) error {
		return nil
	}
	for _, addr := range []string{"vs1:9333", "vs2:9333", "vs3:9333"} {
		ms.blockRegistry.MarkBlockCapable(addr)
	}
	return ms
}
// Happy path: a plan request against a healthy three-server cluster returns
// 200 with a primary, a non-nil candidate list, one replica for RF=2, and no errors.
func TestBlockVolumePlanHandler_HappyPath(t *testing.T) {
	ms := qaPlanMaster(t)
	payload := `{"name":"test-vol","size_bytes":1073741824,"replica_factor":2}`
	req := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(payload))
	req.Header.Set("Content-Type", "application/json")
	rec := httptest.NewRecorder()
	ms.blockVolumePlanHandler(rec, req)
	if rec.Code != http.StatusOK {
		t.Fatalf("expected 200, got %d: %s", rec.Code, rec.Body.String())
	}
	var resp blockapi.VolumePlanResponse
	if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil {
		t.Fatalf("decode: %v", err)
	}
	if resp.Plan.Primary == "" {
		t.Fatal("expected non-empty primary")
	}
	if resp.Plan.Candidates == nil {
		t.Fatal("candidates must never be nil")
	}
	if len(resp.Plan.Candidates) == 0 {
		t.Fatal("expected non-empty candidates")
	}
	if n := len(resp.Plan.Replicas); n != 1 {
		t.Fatalf("expected 1 replica for RF=2, got %d", n)
	}
	if len(resp.Errors) != 0 {
		t.Fatalf("unexpected errors: %v", resp.Errors)
	}
}
// The database preset must surface in the resolved policy (including the
// sync_all durability it implies) and still produce a primary.
func TestBlockVolumePlanHandler_WithPreset(t *testing.T) {
	ms := qaPlanMaster(t)
	payload := `{"name":"db-vol","size_bytes":1073741824,"preset":"database"}`
	req := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(payload))
	req.Header.Set("Content-Type", "application/json")
	rec := httptest.NewRecorder()
	ms.blockVolumePlanHandler(rec, req)
	if rec.Code != http.StatusOK {
		t.Fatalf("expected 200, got %d", rec.Code)
	}
	var resp blockapi.VolumePlanResponse
	if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil {
		t.Fatalf("decode: %v", err)
	}
	if got := resp.ResolvedPolicy.Preset; got != "database" {
		t.Fatalf("expected preset database, got %q", got)
	}
	if got := resp.ResolvedPolicy.DurabilityMode; got != "sync_all" {
		t.Fatalf("expected sync_all, got %q", got)
	}
	if resp.Plan.Primary == "" {
		t.Fatal("expected non-empty primary")
	}
}
// With zero registered servers the handler still returns 200 (plan is
// advisory), carrying no_viable_primary in errors and a non-nil candidate list.
func TestBlockVolumePlanHandler_NoServers(t *testing.T) {
	ms := &MasterServer{
		blockRegistry:        NewBlockVolumeRegistry(),
		blockAssignmentQueue: NewBlockAssignmentQueue(),
	}
	payload := `{"name":"test-vol","size_bytes":1073741824}`
	req := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(payload))
	req.Header.Set("Content-Type", "application/json")
	rec := httptest.NewRecorder()
	ms.blockVolumePlanHandler(rec, req)
	if rec.Code != http.StatusOK {
		t.Fatalf("expected 200 even with errors, got %d", rec.Code)
	}
	var resp blockapi.VolumePlanResponse
	if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil {
		t.Fatalf("decode: %v", err)
	}
	if len(resp.Errors) == 0 {
		t.Fatal("expected errors for no servers")
	}
	sawErr := false
	for _, e := range resp.Errors {
		if e == ReasonNoViablePrimary {
			sawErr = true
			break
		}
	}
	if !sawErr {
		t.Fatalf("expected %q in errors, got %v", ReasonNoViablePrimary, resp.Errors)
	}
	if resp.Plan.Candidates == nil {
		t.Fatal("candidates must never be nil, even on error")
	}
}

41
weed/server/master_block_registry.go

@@ -158,7 +158,20 @@ type inflightEntry struct{}
// blockServerInfo tracks server-level capabilities reported via heartbeat.
// NOTE: the rendered diff showed the NvmeAddr line twice; a Go struct cannot
// declare the same field name twice, so the duplicate is removed here.
type blockServerInfo struct {
	NvmeAddr       string // NVMe/TCP listen address; empty if NVMe disabled
	DiskType       string // reported via heartbeat (future)
	AvailableBytes uint64 // reported via heartbeat (future)
}
// PlacementCandidateInfo is the registry's view of a placement candidate.
// Used by the placement planner — the single bridge point between registry
// and the pure evaluateBlockPlacement() function.
type PlacementCandidateInfo struct {
	Address        string // volume server address (host:port)
	VolumeCount    int    // number of block volumes currently hosted; lower sorts first
	DiskType       string // empty = unknown/any; compared against the requested disk type
	AvailableBytes uint64 // 0 = unknown; candidates with unknown capacity are never rejected for space
	NvmeCapable    bool   // true when the server reported an NVMe/TCP listen address
}
@@ -1337,6 +1350,32 @@ func (r *BlockVolumeRegistry) ServerSummaries() []BlockServerSummary {
return summaries
}
// PlacementCandidates returns enriched candidate information for placement planning.
// This is the bridge point between the registry and the placement planner.
// Long-term, this would be replaced by topology-backed candidate gathering.
func (r *BlockVolumeRegistry) PlacementCandidates() []PlacementCandidateInfo {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]PlacementCandidateInfo, 0, len(r.blockServers))
	for addr, info := range r.blockServers {
		// len() of a missing byServer entry is 0, so no existence check is needed.
		cand := PlacementCandidateInfo{
			Address:     addr,
			VolumeCount: len(r.byServer[addr]),
		}
		// Heartbeat-derived fields are only available when server info exists.
		if info != nil {
			cand.DiskType = info.DiskType
			cand.AvailableBytes = info.AvailableBytes
			cand.NvmeCapable = info.NvmeAddr != ""
		}
		out = append(out, cand)
	}
	return out
}
// IsBlockCapable returns true if the given server is in the block-capable set (alive).
func (r *BlockVolumeRegistry) IsBlockCapable(server string) bool {
r.mu.RLock()

37
weed/server/master_grpc_server_block.go

@@ -74,30 +74,22 @@ func (ms *MasterServer) CreateBlockVolume(ctx context.Context, req *master_pb.Cr
return ms.createBlockVolumeResponseFromEntry(entry), nil
}
// Get candidate servers.
servers := ms.blockRegistry.BlockCapableServers()
if len(servers) == 0 {
// Evaluate placement using the shared planner (parity with /block/volume/plan).
candidates := ms.gatherPlacementCandidates()
placement := evaluateBlockPlacement(candidates, replicaFactor, req.DiskType, req.SizeBytes, durMode)
if len(placement.Candidates) == 0 {
return nil, fmt.Errorf("no block volume servers available")
}
// Try up to 3 servers (or all available, whichever is smaller).
maxRetries := 3
if len(servers) < maxRetries {
maxRetries = len(servers)
}
// Try all candidates in planner order; fall back to next on RPC failure.
var lastErr error
for attempt := 0; attempt < maxRetries; attempt++ {
server, err := ms.blockRegistry.PickServer(servers)
if err != nil {
return nil, err
}
for attempt := 0; attempt < len(placement.Candidates); attempt++ {
server := placement.Candidates[attempt]
result, err := ms.blockVSAllocate(ctx, server, req.Name, req.SizeBytes, req.DiskType, req.DurabilityMode)
if err != nil {
lastErr = fmt.Errorf("server %s: %w", server, err)
glog.V(0).Infof("[reqID=%s] CreateBlockVolume %q: attempt %d on %s failed: %v", blockReqID(ctx), req.Name, attempt+1, server, err)
servers = removeServer(servers, server)
continue
}
@@ -119,12 +111,19 @@ func (ms *MasterServer) CreateBlockVolume(ctx context.Context, req *master_pb.Cr
LastLeaseGrant: time.Now(), // R2-F1: set BEFORE Register to avoid stale-lease race
}
// Recompute replica attempt order from Candidates with successful primary removed.
replicaCandidates := make([]string, 0, len(placement.Candidates)-1)
for _, c := range placement.Candidates {
if c != server {
replicaCandidates = append(replicaCandidates, c)
}
}
// Create replicaFactor-1 replicas on different servers (F4: partial create OK).
remaining := removeServer(servers, server)
for i := 0; i < replicaFactor-1 && len(remaining) > 0; i++ {
replicaServer := ms.tryCreateOneReplica(ctx, req, entry, result, remaining)
for i := 0; i < replicaFactor-1 && len(replicaCandidates) > 0; i++ {
replicaServer := ms.tryCreateOneReplica(ctx, req, entry, result, replicaCandidates)
if replicaServer != "" {
remaining = removeServer(remaining, replicaServer)
replicaCandidates = removeServer(replicaCandidates, replicaServer)
}
}
if len(entry.Replicas) == 0 && replicaFactor > 1 {

1
weed/server/master_server.go

@@ -228,6 +228,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
r.HandleFunc("/block/volume/{name}", ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeLookupHandler))).Methods("GET")
r.HandleFunc("/block/volumes", ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeListHandler))).Methods("GET")
r.HandleFunc("/block/volume/resolve", ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeResolveHandler))).Methods("POST")
r.HandleFunc("/block/volume/plan", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumePlanHandler)))).Methods("POST")
r.HandleFunc("/block/volume/{name}/expand", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeExpandHandler)))).Methods("POST")
r.HandleFunc("/block/volume/{name}/preflight", ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumePreflightHandler))).Methods("GET")
r.HandleFunc("/block/volume/{name}/promote", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumePromoteHandler)))).Methods("POST")

622
weed/server/qa_block_cp11b2_adversarial_test.go

@@ -0,0 +1,622 @@
package weed_server
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol/blockapi"
)
// --- QA adversarial tests for CP11B-2: Explainable Placement / Plan API ---
// TestQA_CP11B2_ConcurrentPlanCalls fires 100 plan requests in parallel
// (mixing presets) and requires every one to return 200 without panics or
// data races (run under -race).
func TestQA_CP11B2_ConcurrentPlanCalls(t *testing.T) {
	ms := qaPlanMaster(t)
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			var preset string
			switch n % 3 {
			case 0:
				preset = "database"
			case 1:
				preset = "general"
			}
			body := fmt.Sprintf(`{"name":"vol-%d","size_bytes":1073741824,"preset":"%s"}`, n, preset)
			req := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(body))
			rec := httptest.NewRecorder()
			ms.blockVolumePlanHandler(rec, req)
			if rec.Code != http.StatusOK {
				t.Errorf("goroutine %d: expected 200, got %d", n, rec.Code)
			}
		}(i)
	}
	wg.Wait()
}
// TestQA_CP11B2_NoBlockCapableServers: planning against an empty cluster
// must produce a structured error response (200 + errors), never a panic.
func TestQA_CP11B2_NoBlockCapableServers(t *testing.T) {
	ms := &MasterServer{
		blockRegistry:        NewBlockVolumeRegistry(),
		blockAssignmentQueue: NewBlockAssignmentQueue(),
		blockFailover:        newBlockFailoverState(),
	}
	payload := `{"name":"test-vol","size_bytes":1073741824,"replica_factor":2}`
	req := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(payload))
	rec := httptest.NewRecorder()
	ms.blockVolumePlanHandler(rec, req)
	if rec.Code != http.StatusOK {
		t.Fatalf("expected 200, got %d", rec.Code)
	}
	var resp blockapi.VolumePlanResponse
	if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil {
		t.Fatalf("decode: %v", err)
	}
	if len(resp.Errors) == 0 {
		t.Fatal("expected errors for no servers")
	}
	if resp.Plan.Primary != "" {
		t.Fatalf("expected empty primary, got %q", resp.Plan.Primary)
	}
}
// TestQA_CP11B2_RF_ExceedsAvailable: RF=3 on a two-server cluster must
// surface rf_not_satisfiable as a warning (partial replication is possible).
func TestQA_CP11B2_RF_ExceedsAvailable(t *testing.T) {
	ms := &MasterServer{
		blockRegistry:        NewBlockVolumeRegistry(),
		blockAssignmentQueue: NewBlockAssignmentQueue(),
		blockFailover:        newBlockFailoverState(),
	}
	for _, addr := range []string{"vs1:9333", "vs2:9333"} {
		ms.blockRegistry.MarkBlockCapable(addr)
	}
	payload := `{"name":"test-vol","size_bytes":1073741824,"replica_factor":3}`
	req := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(payload))
	rec := httptest.NewRecorder()
	ms.blockVolumePlanHandler(rec, req)
	var resp blockapi.VolumePlanResponse
	if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil {
		t.Fatalf("decode: %v", err)
	}
	warned := false
	for _, w := range resp.Warnings {
		if w == ReasonRFNotSatisfiable {
			warned = true
			break
		}
	}
	if !warned {
		t.Fatalf("expected warning %q, got warnings=%v, errors=%v", ReasonRFNotSatisfiable, resp.Warnings, resp.Errors)
	}
}
// TestQA_CP11B2_PlanThenCreate_PolicyConsistency verifies that /resolve, /plan,
// and create all agree on the resolved policy for the same request.
// Fix: the two Decode error returns were silently ignored; a malformed response
// would previously have produced confusing zero-value mismatch failures.
func TestQA_CP11B2_PlanThenCreate_PolicyConsistency(t *testing.T) {
	ms := qaPlanMaster(t)
	reqBody := `{"name":"policy-test","size_bytes":1073741824,"preset":"database"}`
	// Call /resolve
	resolveReq := httptest.NewRequest(http.MethodPost, "/block/volume/resolve", strings.NewReader(reqBody))
	resolveW := httptest.NewRecorder()
	ms.blockVolumeResolveHandler(resolveW, resolveReq)
	var resolveResp blockapi.ResolvedPolicyResponse
	if err := json.NewDecoder(resolveW.Body).Decode(&resolveResp); err != nil {
		t.Fatalf("decode resolve response: %v", err)
	}
	// Call /plan
	planReq := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(reqBody))
	planW := httptest.NewRecorder()
	ms.blockVolumePlanHandler(planW, planReq)
	var planResp blockapi.VolumePlanResponse
	if err := json.NewDecoder(planW.Body).Decode(&planResp); err != nil {
		t.Fatalf("decode plan response: %v", err)
	}
	// Compare resolved policy fields
	if resolveResp.Policy.DurabilityMode != planResp.ResolvedPolicy.DurabilityMode {
		t.Fatalf("durability_mode mismatch: resolve=%q plan=%q",
			resolveResp.Policy.DurabilityMode, planResp.ResolvedPolicy.DurabilityMode)
	}
	if resolveResp.Policy.ReplicaFactor != planResp.ResolvedPolicy.ReplicaFactor {
		t.Fatalf("replica_factor mismatch: resolve=%d plan=%d",
			resolveResp.Policy.ReplicaFactor, planResp.ResolvedPolicy.ReplicaFactor)
	}
	if resolveResp.Policy.DiskType != planResp.ResolvedPolicy.DiskType {
		t.Fatalf("disk_type mismatch: resolve=%q plan=%q",
			resolveResp.Policy.DiskType, planResp.ResolvedPolicy.DiskType)
	}
	if resolveResp.Policy.Preset != planResp.ResolvedPolicy.Preset {
		t.Fatalf("preset mismatch: resolve=%q plan=%q",
			resolveResp.Policy.Preset, planResp.ResolvedPolicy.Preset)
	}
}
// TestQA_CP11B2_PlanThenCreate_OrderedCandidateParity verifies that plan and create
// derive the same ordered candidate list from the same cluster state.
// Fix: the Decode error return was silently ignored; decoding failures now fail fast.
func TestQA_CP11B2_PlanThenCreate_OrderedCandidateParity(t *testing.T) {
	ms := qaPlanMaster(t)
	// Record which servers create tries, in order.
	var createAttempts []string
	ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) {
		createAttempts = append(createAttempts, server)
		return &blockAllocResult{
			Path:              fmt.Sprintf("/data/%s.blk", name),
			IQN:               fmt.Sprintf("iqn.2024.test:%s", name),
			ISCSIAddr:         server + ":3260",
			ReplicaDataAddr:   server + ":14260",
			ReplicaCtrlAddr:   server + ":14261",
			RebuildListenAddr: server + ":15000",
		}, nil
	}
	// Get plan
	body := `{"name":"parity-test","size_bytes":1073741824,"replica_factor":2}`
	planReq := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(body))
	planW := httptest.NewRecorder()
	ms.blockVolumePlanHandler(planW, planReq)
	var planResp blockapi.VolumePlanResponse
	if err := json.NewDecoder(planW.Body).Decode(&planResp); err != nil {
		t.Fatalf("decode plan response: %v", err)
	}
	// Create volume — allocate will record attempt order
	createReq := &master_pb.CreateBlockVolumeRequest{
		Name:      "parity-test",
		SizeBytes: 1073741824,
	}
	_, err := ms.CreateBlockVolume(context.Background(), createReq)
	if err != nil {
		t.Fatalf("create failed: %v", err)
	}
	// createAttempts[0] = primary attempt, createAttempts[1] = replica attempt
	if len(createAttempts) < 2 {
		t.Fatalf("expected at least 2 allocations (primary + replica), got %d", len(createAttempts))
	}
	// Plan's candidate order should match create's attempt order.
	// Primary is Candidates[0], replica is from remaining Candidates.
	if planResp.Plan.Primary != createAttempts[0] {
		t.Fatalf("primary mismatch: plan=%q create=%q", planResp.Plan.Primary, createAttempts[0])
	}
}
// TestQA_CP11B2_PlanThenCreate_ReplicaOrderParity verifies that the plan's replica
// ordering matches the create path's replica attempt ordering.
func TestQA_CP11B2_PlanThenCreate_ReplicaOrderParity(t *testing.T) {
	ms := qaPlanMaster(t)
	var allocOrder []string
	ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) {
		allocOrder = append(allocOrder, server)
		return &blockAllocResult{
			Path:              fmt.Sprintf("/data/%s.blk", name),
			IQN:               fmt.Sprintf("iqn.2024.test:%s", name),
			ISCSIAddr:         server + ":3260",
			ReplicaDataAddr:   server + ":14260",
			ReplicaCtrlAddr:   server + ":14261",
			RebuildListenAddr: server + ":15000",
		}, nil
	}
	// Plan with RF=3 (all 3 servers). Check status and decode errors so a
	// handler regression fails here rather than as a confusing mismatch below.
	body := `{"name":"replica-order","size_bytes":1073741824,"replica_factor":3}`
	planReq := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(body))
	planW := httptest.NewRecorder()
	ms.blockVolumePlanHandler(planW, planReq)
	if planW.Code != http.StatusOK {
		t.Fatalf("plan: expected 200, got %d", planW.Code)
	}
	var planResp blockapi.VolumePlanResponse
	if err := json.NewDecoder(planW.Body).Decode(&planResp); err != nil {
		t.Fatalf("plan: decode response: %v", err)
	}
	// Create
	createReq := &master_pb.CreateBlockVolumeRequest{
		Name:          "replica-order",
		SizeBytes:     1073741824,
		ReplicaFactor: 3,
	}
	_, err := ms.CreateBlockVolume(context.Background(), createReq)
	if err != nil {
		t.Fatalf("create failed: %v", err)
	}
	// allocOrder: [primary, replica1, replica2]
	if len(allocOrder) != 3 {
		t.Fatalf("expected 3 allocations, got %d", len(allocOrder))
	}
	// Plan candidates should match allocation order
	if len(planResp.Plan.Candidates) != 3 {
		t.Fatalf("expected 3 plan candidates, got %d", len(planResp.Plan.Candidates))
	}
	for i := 0; i < 3; i++ {
		if planResp.Plan.Candidates[i] != allocOrder[i] {
			t.Fatalf("candidate[%d] mismatch: plan=%q create=%q",
				i, planResp.Plan.Candidates[i], allocOrder[i])
		}
	}
}
// TestQA_CP11B2_Create_FallbackOnRPCFailure verifies that when the first candidate
// fails RPC, create uses the next candidate from the same ordered list.
func TestQA_CP11B2_Create_FallbackOnRPCFailure(t *testing.T) {
	ms := qaPlanMaster(t)
	callCount := 0
	ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) {
		callCount++
		// The very first allocation attempt fails; all later attempts succeed.
		if callCount == 1 {
			return nil, fmt.Errorf("simulated RPC failure")
		}
		return &blockAllocResult{
			Path:              fmt.Sprintf("/data/%s.blk", name),
			IQN:               fmt.Sprintf("iqn.2024.test:%s", name),
			ISCSIAddr:         server + ":3260",
			ReplicaDataAddr:   server + ":14260",
			ReplicaCtrlAddr:   server + ":14261",
			RebuildListenAddr: server + ":15000",
		}, nil
	}
	// Get plan to know expected order; fail fast on HTTP/decode problems.
	body := `{"name":"fallback-test","size_bytes":1073741824,"replica_factor":1}`
	planReq := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(body))
	planW := httptest.NewRecorder()
	ms.blockVolumePlanHandler(planW, planReq)
	if planW.Code != http.StatusOK {
		t.Fatalf("plan: expected 200, got %d", planW.Code)
	}
	var planResp blockapi.VolumePlanResponse
	if err := json.NewDecoder(planW.Body).Decode(&planResp); err != nil {
		t.Fatalf("plan: decode response: %v", err)
	}
	if len(planResp.Plan.Candidates) < 2 {
		t.Fatalf("need at least 2 candidates for fallback test, got %d", len(planResp.Plan.Candidates))
	}
	expectedFallback := planResp.Plan.Candidates[1]
	// Create — first attempt fails, should fall back to second candidate.
	// Reset the counter in case the handler above touched the allocator.
	callCount = 0
	createReq := &master_pb.CreateBlockVolumeRequest{
		Name:      "fallback-test",
		SizeBytes: 1073741824,
	}
	resp, err := ms.CreateBlockVolume(context.Background(), createReq)
	if err != nil {
		t.Fatalf("create failed: %v", err)
	}
	// A successful create must return a populated response, not just nil error.
	if resp == nil {
		t.Fatal("create returned nil response on success")
	}
	// The volume should be on the second candidate (fallback)
	entry, ok := ms.blockRegistry.Lookup("fallback-test")
	if !ok {
		t.Fatal("volume not in registry")
	}
	if entry.VolumeServer != expectedFallback {
		t.Fatalf("expected fallback to %q, got %q", expectedFallback, entry.VolumeServer)
	}
}
// TestQA_CP11B2_PlanIsReadOnly verifies that plan does not register volumes
// or enqueue assignments.
func TestQA_CP11B2_PlanIsReadOnly(t *testing.T) {
	ms := qaPlanMaster(t)
	// Capture registry and queue state before issuing the plan request.
	_, hadEntry := ms.blockRegistry.Lookup("readonly-test")
	pendingBefore := ms.blockAssignmentQueue.TotalPending()

	payload := `{"name":"readonly-test","size_bytes":1073741824,"replica_factor":2}`
	rec := httptest.NewRecorder()
	ms.blockVolumePlanHandler(rec, httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(payload)))
	if rec.Code != http.StatusOK {
		t.Fatalf("expected 200, got %d", rec.Code)
	}

	// Plan must leave both the registry and the assignment queue untouched.
	_, hasEntry := ms.blockRegistry.Lookup("readonly-test")
	pendingAfter := ms.blockAssignmentQueue.TotalPending()
	if hadEntry || hasEntry {
		t.Fatal("plan should not register volume")
	}
	if pendingAfter != pendingBefore {
		t.Fatalf("plan should not enqueue assignments: before=%d after=%d", pendingBefore, pendingAfter)
	}
}
// TestQA_CP11B2_RejectionReasonStability verifies that rejection reason strings
// match the defined constants — no typos, no ad-hoc strings.
func TestQA_CP11B2_RejectionReasonStability(t *testing.T) {
	validReasons := map[string]bool{
		ReasonDiskTypeMismatch:  true,
		ReasonInsufficientSpace: true,
		ReasonAlreadySelected:   true,
		ReasonNoViablePrimary:   true,
		ReasonRFNotSatisfiable:  true,
	}
	// Create a scenario that produces rejections: vs2 mismatches the requested
	// disk type and vs3 is too small for the 1 GiB request.
	candidates := []PlacementCandidateInfo{
		{Address: "vs1:9333", VolumeCount: 0, DiskType: "ssd"},
		{Address: "vs2:9333", VolumeCount: 0, DiskType: "hdd"},
		{Address: "vs3:9333", VolumeCount: 0, AvailableBytes: 100}, // too small
	}
	result := evaluateBlockPlacement(candidates, 1, "ssd", 1<<30, blockvol.DurabilityBestEffort)
	// Guard against a vacuous pass: this scenario is built to reject servers,
	// so an empty rejection list means the loops below check nothing.
	if len(result.Rejections) == 0 {
		t.Fatal("scenario produced no rejections; reason-stability check is vacuous")
	}
	for _, r := range result.Rejections {
		if !validReasons[r.Reason] {
			t.Fatalf("rejection reason %q is not a known constant", r.Reason)
		}
	}
	for _, e := range result.Errors {
		if !validReasons[e] {
			t.Fatalf("error reason %q is not a known constant", e)
		}
	}
}
// TestQA_CP11B2_DeterministicOrder_MultipleInvocations verifies that calling
// plan 10 times with the same state produces identical results each time.
func TestQA_CP11B2_DeterministicOrder_MultipleInvocations(t *testing.T) {
	ms := qaPlanMaster(t)
	body := `{"name":"determ-test","size_bytes":1073741824,"replica_factor":2}`
	var firstResp blockapi.VolumePlanResponse
	for i := 0; i < 10; i++ {
		req := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(body))
		w := httptest.NewRecorder()
		ms.blockVolumePlanHandler(w, req)
		// Fail fast on HTTP/decode problems: a zero-value response on
		// invocation 0 would otherwise make every later comparison vacuous.
		if w.Code != http.StatusOK {
			t.Fatalf("invocation %d: expected 200, got %d", i, w.Code)
		}
		var resp blockapi.VolumePlanResponse
		if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
			t.Fatalf("invocation %d: decode response: %v", i, err)
		}
		if i == 0 {
			firstResp = resp
			continue
		}
		// Compare with first response
		if resp.Plan.Primary != firstResp.Plan.Primary {
			t.Fatalf("invocation %d: primary %q != first %q", i, resp.Plan.Primary, firstResp.Plan.Primary)
		}
		if len(resp.Plan.Candidates) != len(firstResp.Plan.Candidates) {
			t.Fatalf("invocation %d: candidate count %d != first %d",
				i, len(resp.Plan.Candidates), len(firstResp.Plan.Candidates))
		}
		for j := range resp.Plan.Candidates {
			if resp.Plan.Candidates[j] != firstResp.Plan.Candidates[j] {
				t.Fatalf("invocation %d: candidates[%d] %q != first %q",
					i, j, resp.Plan.Candidates[j], firstResp.Plan.Candidates[j])
			}
		}
	}
}
// ============================================================
// CP11B-2 Review Round: Additional Adversarial Tests
// ============================================================
// QA-CP11B2-11: RF=0 treated as RF=1 (primary only, no replicas).
func TestQA_CP11B2_RF0_BehavesAsRF1(t *testing.T) {
	servers := []PlacementCandidateInfo{
		{Address: "vs1:9333", VolumeCount: 0},
		{Address: "vs2:9333", VolumeCount: 0},
	}
	// RF=0 must degrade to a primary-only plan with no replicas and no errors.
	got := evaluateBlockPlacement(servers, 0, "", 0, blockvol.DurabilityBestEffort)
	switch {
	case got.Primary != "vs1:9333":
		t.Fatalf("primary: got %q, want vs1:9333", got.Primary)
	case len(got.Replicas) != 0:
		t.Fatalf("replicas: got %d, want 0 for RF=0", len(got.Replicas))
	case len(got.Errors) != 0:
		t.Fatalf("unexpected errors for RF=0: %v", got.Errors)
	}
}
// QA-CP11B2-12: RF=1 with sync_all — no replica needed, no warning.
func TestQA_CP11B2_RF1_NoReplicaNeeded(t *testing.T) {
	only := []PlacementCandidateInfo{
		{Address: "vs1:9333", VolumeCount: 0},
	}
	res := evaluateBlockPlacement(only, 1, "", 0, blockvol.DurabilitySyncAll)
	// A single copy satisfies RF=1 outright: no warnings, no errors expected.
	if primary := res.Primary; primary != "vs1:9333" {
		t.Fatalf("primary: got %q", primary)
	}
	if n := len(res.Warnings); n != 0 {
		t.Fatalf("RF=1 should not warn about replicas: %v", res.Warnings)
	}
	if n := len(res.Errors); n != 0 {
		t.Fatalf("RF=1 should not error: %v", res.Errors)
	}
}
// QA-CP11B2-13: All candidates rejected by disk type → no_viable_primary.
func TestQA_CP11B2_AllRejected_DiskType(t *testing.T) {
	hddOnly := []PlacementCandidateInfo{
		{Address: "vs1:9333", VolumeCount: 0, DiskType: "hdd"},
		{Address: "vs2:9333", VolumeCount: 0, DiskType: "hdd"},
	}
	// Requesting ssd against an all-hdd pool leaves nothing selectable.
	res := evaluateBlockPlacement(hddOnly, 1, "ssd", 0, blockvol.DurabilityBestEffort)
	if res.Primary != "" {
		t.Fatalf("expected no primary, got %q", res.Primary)
	}
	if len(res.Rejections) != 2 {
		t.Fatalf("expected 2 rejections, got %d", len(res.Rejections))
	}
	sawNoViable := false
	for _, reason := range res.Errors {
		if reason == ReasonNoViablePrimary {
			sawNoViable = true
			break
		}
	}
	if !sawNoViable {
		t.Fatalf("expected %q, got %v", ReasonNoViablePrimary, res.Errors)
	}
}
// QA-CP11B2-14: All candidates rejected by capacity → no_viable_primary.
func TestQA_CP11B2_AllRejected_Capacity(t *testing.T) {
	candidates := []PlacementCandidateInfo{
		{Address: "vs1:9333", VolumeCount: 0, AvailableBytes: 1 << 20},
		{Address: "vs2:9333", VolumeCount: 0, AvailableBytes: 2 << 20},
	}
	// Request 100 MiB against servers exposing only 1-2 MiB each.
	result := evaluateBlockPlacement(candidates, 1, "", 100<<20, blockvol.DurabilityBestEffort)
	if result.Primary != "" {
		t.Fatalf("expected no primary, got %q", result.Primary)
	}
	if len(result.Rejections) != 2 {
		t.Fatalf("expected 2 rejections, got %d", len(result.Rejections))
	}
	// The test's stated contract is that full rejection surfaces
	// no_viable_primary; assert it (mirrors the disk-type variant).
	found := false
	for _, e := range result.Errors {
		if e == ReasonNoViablePrimary {
			found = true
			break
		}
	}
	if !found {
		t.Fatalf("expected %q in errors, got %v", ReasonNoViablePrimary, result.Errors)
	}
}
// QA-CP11B2-15: Mixed rejections — disk + capacity + eligible.
func TestQA_CP11B2_MixedRejections(t *testing.T) {
	cluster := []PlacementCandidateInfo{
		{Address: "vs1:9333", VolumeCount: 0, DiskType: "hdd", AvailableBytes: 100 << 30}, // wrong disk type
		{Address: "vs2:9333", VolumeCount: 0, DiskType: "ssd", AvailableBytes: 1 << 20},   // too small for 50 GiB
		{Address: "vs3:9333", VolumeCount: 0, DiskType: "ssd", AvailableBytes: 100 << 30}, // eligible
		{Address: "vs4:9333", VolumeCount: 5, DiskType: "ssd", AvailableBytes: 100 << 30}, // eligible, higher volume count
	}
	res := evaluateBlockPlacement(cluster, 1, "ssd", 50<<30, blockvol.DurabilityBestEffort)
	if res.Primary != "vs3:9333" {
		t.Fatalf("primary: got %q, want vs3:9333", res.Primary)
	}
	if len(res.Rejections) != 2 {
		t.Fatalf("expected 2 rejections, got %d", len(res.Rejections))
	}
	byServer := make(map[string]string, len(res.Rejections))
	for _, rej := range res.Rejections {
		byServer[rej.Server] = rej.Reason
	}
	if byServer["vs1:9333"] != ReasonDiskTypeMismatch {
		t.Fatalf("vs1: got %q", byServer["vs1:9333"])
	}
	if byServer["vs2:9333"] != ReasonInsufficientSpace {
		t.Fatalf("vs2: got %q", byServer["vs2:9333"])
	}
}
// QA-CP11B2-16: sync_quorum RF=3 filtered to 2 — quorum met, warn not error.
func TestQA_CP11B2_SyncQuorum_RF3_FilteredTo2(t *testing.T) {
	pool := []PlacementCandidateInfo{
		{Address: "vs1:9333", VolumeCount: 0, DiskType: "ssd"},
		{Address: "vs2:9333", VolumeCount: 0, DiskType: "ssd"},
		{Address: "vs3:9333", VolumeCount: 0, DiskType: "hdd"}, // filtered out by disk type
	}
	res := evaluateBlockPlacement(pool, 3, "ssd", 0, blockvol.DurabilitySyncQuorum)
	// Two of three requested copies must degrade to a warning, not an error.
	if len(res.Errors) != 0 {
		t.Fatalf("quorum met, should not error: %v", res.Errors)
	}
	warned := false
	for _, warning := range res.Warnings {
		if warning == ReasonRFNotSatisfiable {
			warned = true
			break
		}
	}
	if !warned {
		t.Fatalf("expected rf_not_satisfiable warning, got %v", res.Warnings)
	}
}
// QA-CP11B2-17: Unknown DiskType passes any filter.
func TestQA_CP11B2_UnknownDiskType_PassesFilter(t *testing.T) {
	mixed := []PlacementCandidateInfo{
		{Address: "vs1:9333", VolumeCount: 0, DiskType: ""},    // unknown — must not be filtered
		{Address: "vs2:9333", VolumeCount: 0, DiskType: "ssd"}, // matches request
		{Address: "vs3:9333", VolumeCount: 0, DiskType: "hdd"}, // mismatches request
	}
	res := evaluateBlockPlacement(mixed, 1, "ssd", 0, blockvol.DurabilityBestEffort)
	if n := len(res.Candidates); n != 2 {
		t.Fatalf("expected 2 eligible, got %d: %v", n, res.Candidates)
	}
	if res.Primary != "vs1:9333" {
		t.Fatalf("primary: got %q, want vs1:9333 (unknown passes)", res.Primary)
	}
}
// QA-CP11B2-18: 50-server list — deterministic ordering.
func TestQA_CP11B2_LargeCandidateList(t *testing.T) {
	candidates := make([]PlacementCandidateInfo, 50)
	for i := range candidates {
		candidates[i] = PlacementCandidateInfo{
			Address:     fmt.Sprintf("vs%02d:9333", i),
			VolumeCount: i % 5, // spread load so ordering is non-trivial
		}
	}
	result := evaluateBlockPlacement(candidates, 3, "", 0, blockvol.DurabilityBestEffort)
	if result.Primary != "vs00:9333" {
		t.Fatalf("primary: got %q, want vs00:9333", result.Primary)
	}
	// Guard before indexing: a planner regression that returns no replicas
	// should fail with a message, not panic the test binary.
	if len(result.Replicas) == 0 {
		t.Fatal("expected at least 1 replica, got none")
	}
	if result.Replicas[0] != "vs05:9333" {
		t.Fatalf("replica[0]: got %q, want vs05:9333", result.Replicas[0])
	}
	if len(result.Candidates) != 50 {
		t.Fatalf("candidates: got %d, want 50", len(result.Candidates))
	}
	// Re-evaluating identical input must yield the same plan.
	result2 := evaluateBlockPlacement(candidates, 3, "", 0, blockvol.DurabilityBestEffort)
	if result.Primary != result2.Primary {
		t.Fatalf("not deterministic")
	}
}
// QA-CP11B2-19: Failed primary still tried as replica.
func TestQA_CP11B2_FailedPrimary_TriedAsReplica(t *testing.T) {
	ms := qaPlanMaster(t)
	var allocLog []string
	ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) {
		allocLog = append(allocLog, server)
		// First attempt (the planned primary) fails; everything after succeeds.
		// len(allocLog) doubles as the call counter.
		if len(allocLog) == 1 {
			return nil, fmt.Errorf("simulated primary failure")
		}
		return &blockAllocResult{
			Path: fmt.Sprintf("/data/%s.blk", name), IQN: fmt.Sprintf("iqn.test:%s", name),
			ISCSIAddr: server + ":3260", ReplicaDataAddr: server + ":14260",
			ReplicaCtrlAddr: server + ":14261", RebuildListenAddr: server + ":15000",
		}, nil
	}
	_, err := ms.CreateBlockVolume(context.Background(), &master_pb.CreateBlockVolumeRequest{
		Name: "fallback-replica", SizeBytes: 1 << 30, ReplicaFactor: 2,
	})
	if err != nil {
		t.Fatalf("create failed: %v", err)
	}
	entry, ok := ms.blockRegistry.Lookup("fallback-replica")
	if !ok {
		t.Fatal("not in registry")
	}
	// Guard before indexing allocLog: if no RPC was ever attempted the test
	// should fail cleanly rather than panic on allocLog[0].
	if len(allocLog) == 0 {
		t.Fatal("expected at least one allocation attempt")
	}
	if entry.VolumeServer == allocLog[0] {
		t.Fatalf("primary should not be the failed server %q", allocLog[0])
	}
}
// QA-CP11B2-20: Plan with invalid preset — errors, not panic.
func TestQA_CP11B2_PlanWithInvalidPreset(t *testing.T) {
	ms := qaPlanMaster(t)
	body := `{"name":"bad","size_bytes":1073741824,"preset":"nonexistent"}`
	req := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(body))
	w := httptest.NewRecorder()
	ms.blockVolumePlanHandler(w, req)
	// Policy problems are reported in-band via the errors field, not HTTP status.
	if w.Code != http.StatusOK {
		t.Fatalf("expected 200, got %d", w.Code)
	}
	var resp blockapi.VolumePlanResponse
	if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
		t.Fatalf("decode response: %v", err)
	}
	if len(resp.Errors) == 0 {
		t.Fatal("expected errors for invalid preset")
	}
	// Contract: candidates is always a JSON array, never null.
	if resp.Plan.Candidates == nil {
		t.Fatal("candidates must never be nil")
	}
}

21
weed/storage/blockvol/blockapi/client.go

@ -212,6 +212,27 @@ func (c *Client) ResolvePolicy(ctx context.Context, req CreateVolumeRequest) (*R
return &out, nil
}
// PlanVolume requests a placement plan without creating a volume.
// It POSTs req as JSON to /block/volume/plan and decodes the placement
// preview on HTTP 200; any other status is converted to an error by
// checkStatus. The call has no side effects on the caller's state here —
// TODO confirm the server endpoint is read-only as documented.
func (c *Client) PlanVolume(ctx context.Context, req CreateVolumeRequest) (*VolumePlanResponse, error) {
	body, err := json.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("marshal request: %w", err)
	}
	resp, err := c.doRequest(ctx, http.MethodPost, "/block/volume/plan", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	// Reject anything other than 200 before attempting to decode.
	if err := checkStatus(resp, http.StatusOK); err != nil {
		return nil, err
	}
	var out VolumePlanResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, fmt.Errorf("decode response: %w", err)
	}
	return &out, nil
}
// ListServers lists all block-capable volume servers.
func (c *Client) ListServers(ctx context.Context) ([]ServerInfo, error) {
resp, err := c.doRequest(ctx, http.MethodGet, "/block/servers", nil)

32
weed/storage/blockvol/blockapi/types.go

@ -42,6 +42,8 @@ type VolumeInfo struct {
Preset string `json:"preset,omitempty"` // CP11B-1: preset used at creation
NvmeAddr string `json:"nvme_addr,omitempty"`
NQN string `json:"nqn,omitempty"`
// CP11B-4: Operator-facing health state.
HealthState string `json:"health_state"` // "healthy", "degraded", "rebuilding", "unsafe"
}
// ResolvedPolicyResponse is the response for POST /block/volume/resolve.
@ -123,6 +125,12 @@ type BlockStatusResponse struct {
FailoversTotal int64 `json:"failovers_total"`
RebuildsTotal int64 `json:"rebuilds_total"`
AssignmentQueueDepth int `json:"assignment_queue_depth"`
// CP11B-4: Operator summary fields.
HealthyCount int `json:"healthy_count"`
DegradedCount int `json:"degraded_count"`
RebuildingCount int `json:"rebuilding_count"`
UnsafeCount int `json:"unsafe_count"`
NvmeCapableServers int `json:"nvme_capable_servers"`
}
// PreflightRejection describes why a specific replica was rejected for promotion.
@ -144,6 +152,30 @@ type PreflightResponse struct {
PrimaryAlive bool `json:"primary_alive"`
}
// VolumePlanResponse is the response for POST /block/volume/plan.
type VolumePlanResponse struct {
	// ResolvedPolicy echoes the policy the master resolved for this request.
	ResolvedPolicy ResolvedPolicyView `json:"resolved_policy"`
	// Plan is the placement preview: primary, replicas, candidates, rejections.
	Plan VolumePlanView `json:"plan"`
	// Warnings and Errors carry plan-level reason strings; presumably the
	// stable constants (e.g. rf_not_satisfiable) — verify against the master.
	Warnings []string `json:"warnings,omitempty"`
	Errors   []string `json:"errors,omitempty"`
}
// VolumePlanView describes the placement plan.
// Candidates is the full ordered eligible list and is always present (empty slice, never omitted).
// ReplicaFactor means total copies including primary: RF=2 → 1 primary + 1 replica.
type VolumePlanView struct {
	// Primary is the selected primary server address ("" when none is viable).
	Primary string `json:"primary"`
	// Replicas lists the selected replica server addresses, in plan order.
	Replicas []string `json:"replicas,omitempty"`
	// Candidates is the ordered eligible server list (always a JSON array).
	Candidates []string `json:"candidates"`
	// Rejections explains, per server, why a candidate was excluded.
	Rejections []VolumePlanRejection `json:"rejections,omitempty"`
}
// VolumePlanRejection explains why a candidate server was not selected.
type VolumePlanRejection struct {
	// Server is the rejected server's address.
	Server string `json:"server"`
	// Reason is a stable rejection-reason string constant.
	Reason string `json:"reason"`
}
// RoleFromString converts a role string to its uint32 wire value.
// Returns 0 (RoleNone) for unrecognized strings.
func RoleFromString(s string) uint32 {

Loading…
Cancel
Save