From f501c630096d17eb46b4f36e264aa3f8e4be6228 Mon Sep 17 00:00:00 2001 From: Ping Qiu Date: Mon, 23 Mar 2026 02:12:25 -0700 Subject: [PATCH] feat: CP11B-2 explainable placement / plan API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New POST /block/volume/plan endpoint returns full placement preview: resolved policy, ordered candidate list, selected primary/replicas, and per-server rejection reasons with stable string constants. Core design: evaluateBlockPlacement() is a pure function with no registry/topology dependency. gatherPlacementCandidates() is the single topology bridge point. Plan and create share the same planner — parity contract is same ordered candidate list for same cluster state. Create path refactored: uses evaluateBlockPlacement() instead of PickServer(), iterates all candidates (no 3-retry cap), recomputes replica order after primary fallback. rf_not_satisfiable severity is durability-mode-aware (warning for best_effort, error for strict). 15 unit tests + 20 QA adversarial tests. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- weed/server/master_block_plan.go | 190 ++++++ weed/server/master_block_plan_test.go | 382 +++++++++++ weed/server/master_block_registry.go | 41 +- weed/server/master_grpc_server_block.go | 37 +- weed/server/master_server.go | 1 + .../qa_block_cp11b2_adversarial_test.go | 622 ++++++++++++++++++ weed/storage/blockvol/blockapi/client.go | 21 + weed/storage/blockvol/blockapi/types.go | 32 + 8 files changed, 1306 insertions(+), 20 deletions(-) create mode 100644 weed/server/master_block_plan.go create mode 100644 weed/server/master_block_plan_test.go create mode 100644 weed/server/qa_block_cp11b2_adversarial_test.go diff --git a/weed/server/master_block_plan.go b/weed/server/master_block_plan.go new file mode 100644 index 000000000..8b409c7ea --- /dev/null +++ b/weed/server/master_block_plan.go @@ -0,0 +1,190 @@ +package weed_server + +import ( + "sort" + + "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" + "github.com/seaweedfs/seaweedfs/weed/storage/blockvol/blockapi" +) + +// Stable placement rejection reason constants. +// These are product-grade strings suitable for UI/automation consumption. +const ( + ReasonDiskTypeMismatch = "disk_type_mismatch" + ReasonInsufficientSpace = "insufficient_space" + ReasonAlreadySelected = "already_selected" +) + +// Plan-level error reasons (not per-server). +const ( + ReasonNoViablePrimary = "no_viable_primary" + ReasonRFNotSatisfiable = "rf_not_satisfiable" +) + +// PlacementRejection records one server rejection with stable reason. +type PlacementRejection struct { + Server string + Reason string +} + +// PlacementResult is the full output of the placement planner. +// Candidates is the ordered eligible list — Primary is Candidates[0], +// Replicas is Candidates[1:RF]. All derived from the same sequence. 
+type PlacementResult struct { + Candidates []string // full ordered eligible list + Primary string + Replicas []string + Rejections []PlacementRejection + Warnings []string + Errors []string +} + +// evaluateBlockPlacement takes candidates and request parameters, +// applies filters, scores deterministically, and returns a placement plan. +// Pure function: no side effects, no registry/topology dependency. +func evaluateBlockPlacement( + candidates []PlacementCandidateInfo, + replicaFactor int, + diskType string, + sizeBytes uint64, + durabilityMode blockvol.DurabilityMode, +) PlacementResult { + var result PlacementResult + + // Filter phase: reject ineligible candidates. + type eligible struct { + address string + volumeCount int + } + var kept []eligible + + for _, c := range candidates { + // Disk type filter: skip when either side is empty (unknown/any). + if diskType != "" && c.DiskType != "" && c.DiskType != diskType { + result.Rejections = append(result.Rejections, PlacementRejection{ + Server: c.Address, + Reason: ReasonDiskTypeMismatch, + }) + continue + } + // Capacity filter: skip when AvailableBytes is 0 (unknown). + if sizeBytes > 0 && c.AvailableBytes > 0 && c.AvailableBytes < sizeBytes { + result.Rejections = append(result.Rejections, PlacementRejection{ + Server: c.Address, + Reason: ReasonInsufficientSpace, + }) + continue + } + kept = append(kept, eligible{address: c.Address, volumeCount: c.VolumeCount}) + } + + // Sort phase: volume count ascending, then address ascending (deterministic). + sort.Slice(kept, func(i, j int) bool { + if kept[i].volumeCount != kept[j].volumeCount { + return kept[i].volumeCount < kept[j].volumeCount + } + return kept[i].address < kept[j].address + }) + + // Build ordered candidate list. + result.Candidates = make([]string, len(kept)) + for i, k := range kept { + result.Candidates[i] = k.address + } + + // Select phase. 
+ if len(result.Candidates) == 0 { + result.Errors = append(result.Errors, ReasonNoViablePrimary) + return result + } + + result.Primary = result.Candidates[0] + + // Replicas: RF means total copies including primary. + replicasNeeded := replicaFactor - 1 + if replicasNeeded > 0 { + available := len(result.Candidates) - 1 // exclude primary + if available >= replicasNeeded { + result.Replicas = result.Candidates[1 : 1+replicasNeeded] + } else { + // Partial or zero replicas available. + if available > 0 { + result.Replicas = result.Candidates[1:] + } + // Severity depends on durability mode: strict modes error, best_effort warns. + requiredReplicas := durabilityMode.RequiredReplicas(replicaFactor) + if available < requiredReplicas { + result.Errors = append(result.Errors, ReasonRFNotSatisfiable) + } else { + result.Warnings = append(result.Warnings, ReasonRFNotSatisfiable) + } + } + } + + return result +} + +// gatherPlacementCandidates reads candidate data from the block registry. +// This is the topology bridge point: today it reads from the registry, +// long-term it would read from weed/topology. +func (ms *MasterServer) gatherPlacementCandidates() []PlacementCandidateInfo { + return ms.blockRegistry.PlacementCandidates() +} + +// PlanBlockVolume is the top-level planning function. +// Resolves policy, gathers candidates, evaluates placement, builds response. 
+func (ms *MasterServer) PlanBlockVolume(req *blockapi.CreateVolumeRequest) *blockapi.VolumePlanResponse { + env := ms.buildEnvironmentInfo() + resolved := blockvol.ResolvePolicy(blockvol.PresetName(req.Preset), + req.DurabilityMode, req.ReplicaFactor, req.DiskType, env) + + resp := &blockapi.VolumePlanResponse{ + ResolvedPolicy: blockapi.ResolvedPolicyView{ + Preset: string(resolved.Policy.Preset), + DurabilityMode: resolved.Policy.DurabilityMode, + ReplicaFactor: resolved.Policy.ReplicaFactor, + DiskType: resolved.Policy.DiskType, + TransportPreference: resolved.Policy.TransportPref, + WorkloadHint: resolved.Policy.WorkloadHint, + WALSizeRecommended: resolved.Policy.WALSizeRecommended, + StorageProfile: resolved.Policy.StorageProfile, + }, + Warnings: resolved.Warnings, + Errors: resolved.Errors, + Plan: blockapi.VolumePlanView{Candidates: []string{}}, // never nil + } + + // If resolve has errors, return without placement evaluation. + if len(resolved.Errors) > 0 { + return resp + } + + durMode, _ := blockvol.ParseDurabilityMode(resolved.Policy.DurabilityMode) + candidates := ms.gatherPlacementCandidates() + placement := evaluateBlockPlacement(candidates, resolved.Policy.ReplicaFactor, + resolved.Policy.DiskType, req.SizeBytes, durMode) + + resp.Plan = blockapi.VolumePlanView{ + Primary: placement.Primary, + Replicas: placement.Replicas, + Candidates: placement.Candidates, + } + // Ensure Candidates is never nil (stable response shape). + if resp.Plan.Candidates == nil { + resp.Plan.Candidates = []string{} + } + + // Convert internal rejections to API type. + for _, r := range placement.Rejections { + resp.Plan.Rejections = append(resp.Plan.Rejections, blockapi.VolumePlanRejection{ + Server: r.Server, + Reason: r.Reason, + }) + } + + // Merge placement warnings and errors. + resp.Warnings = append(resp.Warnings, placement.Warnings...) + resp.Errors = append(resp.Errors, placement.Errors...) 
+ + return resp +} diff --git a/weed/server/master_block_plan_test.go b/weed/server/master_block_plan_test.go new file mode 100644 index 000000000..3183f3875 --- /dev/null +++ b/weed/server/master_block_plan_test.go @@ -0,0 +1,382 @@ +package weed_server + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" + + "github.com/seaweedfs/seaweedfs/weed/storage/blockvol/blockapi" +) + +// --- evaluateBlockPlacement unit tests --- + +func TestEvaluateBlockPlacement_SingleCandidate_RF1(t *testing.T) { + candidates := []PlacementCandidateInfo{ + {Address: "vs1:9333", VolumeCount: 0}, + } + result := evaluateBlockPlacement(candidates, 1, "", 0, blockvol.DurabilityBestEffort) + if result.Primary != "vs1:9333" { + t.Fatalf("expected primary vs1:9333, got %q", result.Primary) + } + if len(result.Replicas) != 0 { + t.Fatalf("expected 0 replicas, got %d", len(result.Replicas)) + } + if len(result.Rejections) != 0 { + t.Fatalf("expected 0 rejections, got %d", len(result.Rejections)) + } + if len(result.Candidates) != 1 || result.Candidates[0] != "vs1:9333" { + t.Fatalf("expected candidates [vs1:9333], got %v", result.Candidates) + } +} + +func TestEvaluateBlockPlacement_LeastLoaded(t *testing.T) { + candidates := []PlacementCandidateInfo{ + {Address: "vs1:9333", VolumeCount: 5}, + {Address: "vs2:9333", VolumeCount: 2}, + {Address: "vs3:9333", VolumeCount: 8}, + } + result := evaluateBlockPlacement(candidates, 1, "", 0, blockvol.DurabilityBestEffort) + if result.Primary != "vs2:9333" { + t.Fatalf("expected least-loaded vs2:9333 as primary, got %q", result.Primary) + } + // Candidates should be sorted: vs2 (2), vs1 (5), vs3 (8) + expected := []string{"vs2:9333", "vs1:9333", "vs3:9333"} + if len(result.Candidates) != 3 { + t.Fatalf("expected 3 candidates, got %d", len(result.Candidates)) + } + for i, e := range expected { + if result.Candidates[i] != e { + 
t.Fatalf("candidates[%d]: expected %q, got %q", i, e, result.Candidates[i]) + } + } +} + +func TestEvaluateBlockPlacement_DeterministicTiebreak(t *testing.T) { + candidates := []PlacementCandidateInfo{ + {Address: "vs3:9333", VolumeCount: 0}, + {Address: "vs1:9333", VolumeCount: 0}, + {Address: "vs2:9333", VolumeCount: 0}, + } + result := evaluateBlockPlacement(candidates, 1, "", 0, blockvol.DurabilityBestEffort) + // All same count — address tiebreaker: vs1, vs2, vs3 + if result.Primary != "vs1:9333" { + t.Fatalf("expected vs1:9333 (lowest address), got %q", result.Primary) + } + expected := []string{"vs1:9333", "vs2:9333", "vs3:9333"} + for i, e := range expected { + if result.Candidates[i] != e { + t.Fatalf("candidates[%d]: expected %q, got %q", i, e, result.Candidates[i]) + } + } +} + +func TestEvaluateBlockPlacement_RF2_PrimaryAndReplica(t *testing.T) { + candidates := []PlacementCandidateInfo{ + {Address: "vs1:9333", VolumeCount: 3}, + {Address: "vs2:9333", VolumeCount: 1}, + {Address: "vs3:9333", VolumeCount: 5}, + } + result := evaluateBlockPlacement(candidates, 2, "", 0, blockvol.DurabilityBestEffort) + if result.Primary != "vs2:9333" { + t.Fatalf("expected primary vs2:9333, got %q", result.Primary) + } + if len(result.Replicas) != 1 || result.Replicas[0] != "vs1:9333" { + t.Fatalf("expected replicas [vs1:9333], got %v", result.Replicas) + } + if len(result.Errors) != 0 { + t.Fatalf("unexpected errors: %v", result.Errors) + } +} + +func TestEvaluateBlockPlacement_RF3_AllSelected(t *testing.T) { + candidates := []PlacementCandidateInfo{ + {Address: "vs1:9333", VolumeCount: 0}, + {Address: "vs2:9333", VolumeCount: 0}, + {Address: "vs3:9333", VolumeCount: 0}, + } + result := evaluateBlockPlacement(candidates, 3, "", 0, blockvol.DurabilityBestEffort) + if result.Primary != "vs1:9333" { + t.Fatalf("expected primary vs1:9333, got %q", result.Primary) + } + if len(result.Replicas) != 2 { + t.Fatalf("expected 2 replicas, got %d", len(result.Replicas)) + } + if 
len(result.Errors) != 0 { + t.Fatalf("unexpected errors: %v", result.Errors) + } +} + +func TestEvaluateBlockPlacement_RF_ExceedsServers(t *testing.T) { + candidates := []PlacementCandidateInfo{ + {Address: "vs1:9333", VolumeCount: 0}, + {Address: "vs2:9333", VolumeCount: 0}, + } + result := evaluateBlockPlacement(candidates, 3, "", 0, blockvol.DurabilityBestEffort) + // Primary should be selected, but only 1 replica possible out of 2 needed + if result.Primary != "vs1:9333" { + t.Fatalf("expected primary vs1:9333, got %q", result.Primary) + } + if len(result.Replicas) != 1 { + t.Fatalf("expected 1 partial replica, got %d", len(result.Replicas)) + } + // Should have rf_not_satisfiable warning + found := false + for _, w := range result.Warnings { + if w == ReasonRFNotSatisfiable { + found = true + } + } + if !found { + t.Fatalf("expected warning %q, got %v", ReasonRFNotSatisfiable, result.Warnings) + } +} + +// TestEvaluateBlockPlacement_SingleServer_BestEffort_RF2 verifies that with one server, +// RF=2, best_effort: plan warns (not errors), matching create behavior which succeeds as single-copy. 
+func TestEvaluateBlockPlacement_SingleServer_BestEffort_RF2(t *testing.T) { + candidates := []PlacementCandidateInfo{ + {Address: "vs1:9333", VolumeCount: 0}, + } + result := evaluateBlockPlacement(candidates, 2, "", 0, blockvol.DurabilityBestEffort) + if result.Primary != "vs1:9333" { + t.Fatalf("expected primary vs1:9333, got %q", result.Primary) + } + // best_effort: zero replicas available should be a warning, not error + if len(result.Errors) != 0 { + t.Fatalf("best_effort should not produce errors, got %v", result.Errors) + } + found := false + for _, w := range result.Warnings { + if w == ReasonRFNotSatisfiable { + found = true + } + } + if !found { + t.Fatalf("expected warning %q, got %v", ReasonRFNotSatisfiable, result.Warnings) + } +} + +// TestEvaluateBlockPlacement_SingleServer_SyncAll_RF2 verifies that with one server, +// RF=2, sync_all: plan errors because sync_all requires replicas. +func TestEvaluateBlockPlacement_SingleServer_SyncAll_RF2(t *testing.T) { + candidates := []PlacementCandidateInfo{ + {Address: "vs1:9333", VolumeCount: 0}, + } + result := evaluateBlockPlacement(candidates, 2, "", 0, blockvol.DurabilitySyncAll) + if result.Primary != "vs1:9333" { + t.Fatalf("expected primary vs1:9333, got %q", result.Primary) + } + // sync_all: zero replicas available should be an error + found := false + for _, e := range result.Errors { + if e == ReasonRFNotSatisfiable { + found = true + } + } + if !found { + t.Fatalf("expected error %q for sync_all, got errors=%v warnings=%v", + ReasonRFNotSatisfiable, result.Errors, result.Warnings) + } +} + +func TestEvaluateBlockPlacement_NoServers(t *testing.T) { + result := evaluateBlockPlacement(nil, 1, "", 0, blockvol.DurabilityBestEffort) + if result.Primary != "" { + t.Fatalf("expected empty primary, got %q", result.Primary) + } + found := false + for _, e := range result.Errors { + if e == ReasonNoViablePrimary { + found = true + } + } + if !found { + t.Fatalf("expected error %q, got %v", 
ReasonNoViablePrimary, result.Errors) + } + if len(result.Candidates) != 0 { + t.Fatalf("expected empty candidates, got %v", result.Candidates) + } +} + +func TestEvaluateBlockPlacement_DiskTypeMismatch(t *testing.T) { + candidates := []PlacementCandidateInfo{ + {Address: "vs1:9333", VolumeCount: 0, DiskType: "ssd"}, + {Address: "vs2:9333", VolumeCount: 0, DiskType: "hdd"}, + {Address: "vs3:9333", VolumeCount: 0, DiskType: "ssd"}, + } + result := evaluateBlockPlacement(candidates, 1, "ssd", 0, blockvol.DurabilityBestEffort) + if result.Primary != "vs1:9333" { + t.Fatalf("expected primary vs1:9333, got %q", result.Primary) + } + // vs2 should be rejected + if len(result.Rejections) != 1 || result.Rejections[0].Server != "vs2:9333" { + t.Fatalf("expected vs2:9333 rejected, got %v", result.Rejections) + } + if result.Rejections[0].Reason != ReasonDiskTypeMismatch { + t.Fatalf("expected reason %q, got %q", ReasonDiskTypeMismatch, result.Rejections[0].Reason) + } + if len(result.Candidates) != 2 { + t.Fatalf("expected 2 eligible candidates, got %d", len(result.Candidates)) + } +} + +func TestEvaluateBlockPlacement_InsufficientSpace(t *testing.T) { + candidates := []PlacementCandidateInfo{ + {Address: "vs1:9333", VolumeCount: 0, AvailableBytes: 100 << 30}, // 100GB + {Address: "vs2:9333", VolumeCount: 0, AvailableBytes: 5 << 30}, // 5GB + } + result := evaluateBlockPlacement(candidates, 1, "", 10<<30, blockvol.DurabilityBestEffort) // request 10GB + if result.Primary != "vs1:9333" { + t.Fatalf("expected primary vs1:9333, got %q", result.Primary) + } + if len(result.Rejections) != 1 || result.Rejections[0].Server != "vs2:9333" { + t.Fatalf("expected vs2:9333 rejected, got %v", result.Rejections) + } + if result.Rejections[0].Reason != ReasonInsufficientSpace { + t.Fatalf("expected reason %q, got %q", ReasonInsufficientSpace, result.Rejections[0].Reason) + } +} + +func TestEvaluateBlockPlacement_UnknownCapacity_Allowed(t *testing.T) { + candidates := 
[]PlacementCandidateInfo{ + {Address: "vs1:9333", VolumeCount: 0, AvailableBytes: 0}, // unknown + {Address: "vs2:9333", VolumeCount: 0, AvailableBytes: 0}, // unknown + } + result := evaluateBlockPlacement(candidates, 1, "", 10<<30, blockvol.DurabilityBestEffort) + if result.Primary != "vs1:9333" { + t.Fatalf("expected primary vs1:9333, got %q", result.Primary) + } + if len(result.Rejections) != 0 { + t.Fatalf("expected no rejections for unknown capacity, got %v", result.Rejections) + } +} + +// --- HTTP handler tests --- + +func qaPlanMaster(t *testing.T) *MasterServer { + t.Helper() + ms := &MasterServer{ + blockRegistry: NewBlockVolumeRegistry(), + blockAssignmentQueue: NewBlockAssignmentQueue(), + blockFailover: newBlockFailoverState(), + } + ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) { + return &blockAllocResult{ + Path: fmt.Sprintf("/data/%s.blk", name), + IQN: fmt.Sprintf("iqn.2024.test:%s", name), + ISCSIAddr: server + ":3260", + ReplicaDataAddr: server + ":14260", + ReplicaCtrlAddr: server + ":14261", + RebuildListenAddr: server + ":15000", + }, nil + } + ms.blockVSDelete = func(ctx context.Context, server string, name string) error { + return nil + } + ms.blockRegistry.MarkBlockCapable("vs1:9333") + ms.blockRegistry.MarkBlockCapable("vs2:9333") + ms.blockRegistry.MarkBlockCapable("vs3:9333") + return ms +} + +func TestBlockVolumePlanHandler_HappyPath(t *testing.T) { + ms := qaPlanMaster(t) + body := `{"name":"test-vol","size_bytes":1073741824,"replica_factor":2}` + req := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + ms.blockVolumePlanHandler(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp blockapi.VolumePlanResponse + if err := 
json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("decode: %v", err) + } + if resp.Plan.Primary == "" { + t.Fatal("expected non-empty primary") + } + if len(resp.Plan.Candidates) == 0 { + t.Fatal("expected non-empty candidates") + } + if resp.Plan.Candidates == nil { + t.Fatal("candidates must never be nil") + } + if len(resp.Plan.Replicas) != 1 { + t.Fatalf("expected 1 replica for RF=2, got %d", len(resp.Plan.Replicas)) + } + if len(resp.Errors) != 0 { + t.Fatalf("unexpected errors: %v", resp.Errors) + } +} + +func TestBlockVolumePlanHandler_WithPreset(t *testing.T) { + ms := qaPlanMaster(t) + body := `{"name":"db-vol","size_bytes":1073741824,"preset":"database"}` + req := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + ms.blockVolumePlanHandler(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } + + var resp blockapi.VolumePlanResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("decode: %v", err) + } + if resp.ResolvedPolicy.Preset != "database" { + t.Fatalf("expected preset database, got %q", resp.ResolvedPolicy.Preset) + } + if resp.ResolvedPolicy.DurabilityMode != "sync_all" { + t.Fatalf("expected sync_all, got %q", resp.ResolvedPolicy.DurabilityMode) + } + if resp.Plan.Primary == "" { + t.Fatal("expected non-empty primary") + } +} + +func TestBlockVolumePlanHandler_NoServers(t *testing.T) { + ms := &MasterServer{ + blockRegistry: NewBlockVolumeRegistry(), + blockAssignmentQueue: NewBlockAssignmentQueue(), + } + body := `{"name":"test-vol","size_bytes":1073741824}` + req := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + ms.blockVolumePlanHandler(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200 even with errors, got %d", 
w.Code) + } + + var resp blockapi.VolumePlanResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("decode: %v", err) + } + if len(resp.Errors) == 0 { + t.Fatal("expected errors for no servers") + } + found := false + for _, e := range resp.Errors { + if e == ReasonNoViablePrimary { + found = true + } + } + if !found { + t.Fatalf("expected %q in errors, got %v", ReasonNoViablePrimary, resp.Errors) + } + if resp.Plan.Candidates == nil { + t.Fatal("candidates must never be nil, even on error") + } +} diff --git a/weed/server/master_block_registry.go b/weed/server/master_block_registry.go index 7910d0cf1..df12fcd0f 100644 --- a/weed/server/master_block_registry.go +++ b/weed/server/master_block_registry.go @@ -158,7 +158,20 @@ type inflightEntry struct{} // blockServerInfo tracks server-level capabilities reported via heartbeat. type blockServerInfo struct { - NvmeAddr string // NVMe/TCP listen address; empty if NVMe disabled + NvmeAddr string // NVMe/TCP listen address; empty if NVMe disabled + DiskType string // reported via heartbeat (future) + AvailableBytes uint64 // reported via heartbeat (future) +} + +// PlacementCandidateInfo is the registry's view of a placement candidate. +// Used by the placement planner — the single bridge point between registry +// and the pure evaluateBlockPlacement() function. +type PlacementCandidateInfo struct { + Address string + VolumeCount int + DiskType string // empty = unknown/any + AvailableBytes uint64 // 0 = unknown + NvmeCapable bool } @@ -1337,6 +1350,32 @@ func (r *BlockVolumeRegistry) ServerSummaries() []BlockServerSummary { return summaries } +// PlacementCandidates returns enriched candidate information for placement planning. +// This is the bridge point between the registry and the placement planner. +// Long-term, this would be replaced by topology-backed candidate gathering. 
+func (r *BlockVolumeRegistry) PlacementCandidates() []PlacementCandidateInfo { + r.mu.RLock() + defer r.mu.RUnlock() + candidates := make([]PlacementCandidateInfo, 0, len(r.blockServers)) + for addr, info := range r.blockServers { + count := 0 + if names, ok := r.byServer[addr]; ok { + count = len(names) + } + c := PlacementCandidateInfo{ + Address: addr, + VolumeCount: count, + } + if info != nil { + c.NvmeCapable = info.NvmeAddr != "" + c.DiskType = info.DiskType + c.AvailableBytes = info.AvailableBytes + } + candidates = append(candidates, c) + } + return candidates +} + // IsBlockCapable returns true if the given server is in the block-capable set (alive). func (r *BlockVolumeRegistry) IsBlockCapable(server string) bool { r.mu.RLock() diff --git a/weed/server/master_grpc_server_block.go b/weed/server/master_grpc_server_block.go index 9c70dfad8..58a5c5f71 100644 --- a/weed/server/master_grpc_server_block.go +++ b/weed/server/master_grpc_server_block.go @@ -74,30 +74,22 @@ func (ms *MasterServer) CreateBlockVolume(ctx context.Context, req *master_pb.Cr return ms.createBlockVolumeResponseFromEntry(entry), nil } - // Get candidate servers. - servers := ms.blockRegistry.BlockCapableServers() - if len(servers) == 0 { + // Evaluate placement using the shared planner (parity with /block/volume/plan). + candidates := ms.gatherPlacementCandidates() + placement := evaluateBlockPlacement(candidates, replicaFactor, req.DiskType, req.SizeBytes, durMode) + if len(placement.Candidates) == 0 { return nil, fmt.Errorf("no block volume servers available") } - // Try up to 3 servers (or all available, whichever is smaller). - maxRetries := 3 - if len(servers) < maxRetries { - maxRetries = len(servers) - } - + // Try all candidates in planner order; fall back to next on RPC failure. 
var lastErr error - for attempt := 0; attempt < maxRetries; attempt++ { - server, err := ms.blockRegistry.PickServer(servers) - if err != nil { - return nil, err - } + for attempt := 0; attempt < len(placement.Candidates); attempt++ { + server := placement.Candidates[attempt] result, err := ms.blockVSAllocate(ctx, server, req.Name, req.SizeBytes, req.DiskType, req.DurabilityMode) if err != nil { lastErr = fmt.Errorf("server %s: %w", server, err) glog.V(0).Infof("[reqID=%s] CreateBlockVolume %q: attempt %d on %s failed: %v", blockReqID(ctx), req.Name, attempt+1, server, err) - servers = removeServer(servers, server) continue } @@ -119,12 +111,19 @@ func (ms *MasterServer) CreateBlockVolume(ctx context.Context, req *master_pb.Cr LastLeaseGrant: time.Now(), // R2-F1: set BEFORE Register to avoid stale-lease race } + // Recompute replica attempt order from Candidates with successful primary removed. + replicaCandidates := make([]string, 0, len(placement.Candidates)-1) + for _, c := range placement.Candidates { + if c != server { + replicaCandidates = append(replicaCandidates, c) + } + } + // Create replicaFactor-1 replicas on different servers (F4: partial create OK). 
- remaining := removeServer(servers, server) - for i := 0; i < replicaFactor-1 && len(remaining) > 0; i++ { - replicaServer := ms.tryCreateOneReplica(ctx, req, entry, result, remaining) + for i := 0; i < replicaFactor-1 && len(replicaCandidates) > 0; i++ { + replicaServer := ms.tryCreateOneReplica(ctx, req, entry, result, replicaCandidates) if replicaServer != "" { - remaining = removeServer(remaining, replicaServer) + replicaCandidates = removeServer(replicaCandidates, replicaServer) } } if len(entry.Replicas) == 0 && replicaFactor > 1 { diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 8b8fe9006..69bb7f01f 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -228,6 +228,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se r.HandleFunc("/block/volume/{name}", ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeLookupHandler))).Methods("GET") r.HandleFunc("/block/volumes", ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeListHandler))).Methods("GET") r.HandleFunc("/block/volume/resolve", ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeResolveHandler))).Methods("POST") + r.HandleFunc("/block/volume/plan", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumePlanHandler)))).Methods("POST") r.HandleFunc("/block/volume/{name}/expand", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeExpandHandler)))).Methods("POST") r.HandleFunc("/block/volume/{name}/preflight", ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumePreflightHandler))).Methods("GET") r.HandleFunc("/block/volume/{name}/promote", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumePromoteHandler)))).Methods("POST") diff --git a/weed/server/qa_block_cp11b2_adversarial_test.go b/weed/server/qa_block_cp11b2_adversarial_test.go new file mode 100644 index 000000000..17097b669 --- /dev/null +++ b/weed/server/qa_block_cp11b2_adversarial_test.go @@ -0,0 +1,622 
@@ +package weed_server + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" + "github.com/seaweedfs/seaweedfs/weed/storage/blockvol/blockapi" +) + +// --- QA adversarial tests for CP11B-2: Explainable Placement / Plan API --- + +// TestQA_CP11B2_ConcurrentPlanCalls verifies that 100 concurrent plan calls +// complete without panic or data race. +func TestQA_CP11B2_ConcurrentPlanCalls(t *testing.T) { + ms := qaPlanMaster(t) + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + preset := "" + if n%3 == 0 { + preset = "database" + } else if n%3 == 1 { + preset = "general" + } + body := fmt.Sprintf(`{"name":"vol-%d","size_bytes":1073741824,"preset":"%s"}`, n, preset) + req := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(body)) + w := httptest.NewRecorder() + ms.blockVolumePlanHandler(w, req) + if w.Code != http.StatusOK { + t.Errorf("goroutine %d: expected 200, got %d", n, w.Code) + } + }(i) + } + wg.Wait() +} + +// TestQA_CP11B2_NoBlockCapableServers verifies that plan returns a structured +// error when no servers are available, not a panic. 
+func TestQA_CP11B2_NoBlockCapableServers(t *testing.T) { + ms := &MasterServer{ + blockRegistry: NewBlockVolumeRegistry(), + blockAssignmentQueue: NewBlockAssignmentQueue(), + blockFailover: newBlockFailoverState(), + } + body := `{"name":"test-vol","size_bytes":1073741824,"replica_factor":2}` + req := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(body)) + w := httptest.NewRecorder() + ms.blockVolumePlanHandler(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } + var resp blockapi.VolumePlanResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("decode: %v", err) + } + if len(resp.Errors) == 0 { + t.Fatal("expected errors for no servers") + } + if resp.Plan.Primary != "" { + t.Fatalf("expected empty primary, got %q", resp.Plan.Primary) + } +} + +// TestQA_CP11B2_RF_ExceedsAvailable verifies clear warning when RF exceeds servers. +func TestQA_CP11B2_RF_ExceedsAvailable(t *testing.T) { + ms := &MasterServer{ + blockRegistry: NewBlockVolumeRegistry(), + blockAssignmentQueue: NewBlockAssignmentQueue(), + blockFailover: newBlockFailoverState(), + } + ms.blockRegistry.MarkBlockCapable("vs1:9333") + ms.blockRegistry.MarkBlockCapable("vs2:9333") + + body := `{"name":"test-vol","size_bytes":1073741824,"replica_factor":3}` + req := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(body)) + w := httptest.NewRecorder() + ms.blockVolumePlanHandler(w, req) + + var resp blockapi.VolumePlanResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("decode: %v", err) + } + // Should have rf_not_satisfiable in warnings (partial replica possible) + found := false + for _, warning := range resp.Warnings { + if warning == ReasonRFNotSatisfiable { + found = true + } + } + if !found { + t.Fatalf("expected warning %q, got warnings=%v, errors=%v", ReasonRFNotSatisfiable, resp.Warnings, resp.Errors) + } +} + +// 
TestQA_CP11B2_PlanThenCreate_PolicyConsistency verifies that /resolve, /plan, +// and create all agree on the resolved policy for the same request. +func TestQA_CP11B2_PlanThenCreate_PolicyConsistency(t *testing.T) { + ms := qaPlanMaster(t) + + reqBody := `{"name":"policy-test","size_bytes":1073741824,"preset":"database"}` + + // Call /resolve + resolveReq := httptest.NewRequest(http.MethodPost, "/block/volume/resolve", strings.NewReader(reqBody)) + resolveW := httptest.NewRecorder() + ms.blockVolumeResolveHandler(resolveW, resolveReq) + var resolveResp blockapi.ResolvedPolicyResponse + json.NewDecoder(resolveW.Body).Decode(&resolveResp) + + // Call /plan + planReq := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(reqBody)) + planW := httptest.NewRecorder() + ms.blockVolumePlanHandler(planW, planReq) + var planResp blockapi.VolumePlanResponse + json.NewDecoder(planW.Body).Decode(&planResp) + + // Compare resolved policy fields + if resolveResp.Policy.DurabilityMode != planResp.ResolvedPolicy.DurabilityMode { + t.Fatalf("durability_mode mismatch: resolve=%q plan=%q", + resolveResp.Policy.DurabilityMode, planResp.ResolvedPolicy.DurabilityMode) + } + if resolveResp.Policy.ReplicaFactor != planResp.ResolvedPolicy.ReplicaFactor { + t.Fatalf("replica_factor mismatch: resolve=%d plan=%d", + resolveResp.Policy.ReplicaFactor, planResp.ResolvedPolicy.ReplicaFactor) + } + if resolveResp.Policy.DiskType != planResp.ResolvedPolicy.DiskType { + t.Fatalf("disk_type mismatch: resolve=%q plan=%q", + resolveResp.Policy.DiskType, planResp.ResolvedPolicy.DiskType) + } + if resolveResp.Policy.Preset != planResp.ResolvedPolicy.Preset { + t.Fatalf("preset mismatch: resolve=%q plan=%q", + resolveResp.Policy.Preset, planResp.ResolvedPolicy.Preset) + } +} + +// TestQA_CP11B2_PlanThenCreate_OrderedCandidateParity verifies that plan and create +// derive the same ordered candidate list from the same cluster state. 
+func TestQA_CP11B2_PlanThenCreate_OrderedCandidateParity(t *testing.T) { + ms := qaPlanMaster(t) + + // Record which servers create tries, in order. + var createAttempts []string + ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) { + createAttempts = append(createAttempts, server) + return &blockAllocResult{ + Path: fmt.Sprintf("/data/%s.blk", name), + IQN: fmt.Sprintf("iqn.2024.test:%s", name), + ISCSIAddr: server + ":3260", + ReplicaDataAddr: server + ":14260", + ReplicaCtrlAddr: server + ":14261", + RebuildListenAddr: server + ":15000", + }, nil + } + + // Get plan + body := `{"name":"parity-test","size_bytes":1073741824,"replica_factor":2}` + planReq := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(body)) + planW := httptest.NewRecorder() + ms.blockVolumePlanHandler(planW, planReq) + var planResp blockapi.VolumePlanResponse + json.NewDecoder(planW.Body).Decode(&planResp) + + // Create volume — allocate will record attempt order + createReq := &master_pb.CreateBlockVolumeRequest{ + Name: "parity-test", + SizeBytes: 1073741824, + } + _, err := ms.CreateBlockVolume(context.Background(), createReq) + if err != nil { + t.Fatalf("create failed: %v", err) + } + + // createAttempts[0] = primary attempt, createAttempts[1] = replica attempt + if len(createAttempts) < 2 { + t.Fatalf("expected at least 2 allocations (primary + replica), got %d", len(createAttempts)) + } + + // Plan's candidate order should match create's attempt order. + // Primary is Candidates[0], replica is from remaining Candidates. + if planResp.Plan.Primary != createAttempts[0] { + t.Fatalf("primary mismatch: plan=%q create=%q", planResp.Plan.Primary, createAttempts[0]) + } +} + +// TestQA_CP11B2_PlanThenCreate_ReplicaOrderParity verifies that the plan's replica +// ordering matches the create path's replica attempt ordering. 
+func TestQA_CP11B2_PlanThenCreate_ReplicaOrderParity(t *testing.T) { + ms := qaPlanMaster(t) + + var allocOrder []string + ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) { + allocOrder = append(allocOrder, server) + return &blockAllocResult{ + Path: fmt.Sprintf("/data/%s.blk", name), + IQN: fmt.Sprintf("iqn.2024.test:%s", name), + ISCSIAddr: server + ":3260", + ReplicaDataAddr: server + ":14260", + ReplicaCtrlAddr: server + ":14261", + RebuildListenAddr: server + ":15000", + }, nil + } + + // Plan with RF=3 (all 3 servers) + body := `{"name":"replica-order","size_bytes":1073741824,"replica_factor":3}` + planReq := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(body)) + planW := httptest.NewRecorder() + ms.blockVolumePlanHandler(planW, planReq) + var planResp blockapi.VolumePlanResponse + json.NewDecoder(planW.Body).Decode(&planResp) + + // Create + createReq := &master_pb.CreateBlockVolumeRequest{ + Name: "replica-order", + SizeBytes: 1073741824, + ReplicaFactor: 3, + } + _, err := ms.CreateBlockVolume(context.Background(), createReq) + if err != nil { + t.Fatalf("create failed: %v", err) + } + + // allocOrder: [primary, replica1, replica2] + if len(allocOrder) != 3 { + t.Fatalf("expected 3 allocations, got %d", len(allocOrder)) + } + + // Plan candidates should match allocation order + if len(planResp.Plan.Candidates) != 3 { + t.Fatalf("expected 3 plan candidates, got %d", len(planResp.Plan.Candidates)) + } + for i := 0; i < 3; i++ { + if planResp.Plan.Candidates[i] != allocOrder[i] { + t.Fatalf("candidate[%d] mismatch: plan=%q create=%q", + i, planResp.Plan.Candidates[i], allocOrder[i]) + } + } +} + +// TestQA_CP11B2_Create_FallbackOnRPCFailure verifies that when the first candidate +// fails RPC, create uses the next candidate from the same ordered list. 
+func TestQA_CP11B2_Create_FallbackOnRPCFailure(t *testing.T) { + ms := qaPlanMaster(t) + + callCount := 0 + ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) { + callCount++ + if callCount == 1 { + return nil, fmt.Errorf("simulated RPC failure") + } + return &blockAllocResult{ + Path: fmt.Sprintf("/data/%s.blk", name), + IQN: fmt.Sprintf("iqn.2024.test:%s", name), + ISCSIAddr: server + ":3260", + ReplicaDataAddr: server + ":14260", + ReplicaCtrlAddr: server + ":14261", + RebuildListenAddr: server + ":15000", + }, nil + } + + // Get plan to know expected order + body := `{"name":"fallback-test","size_bytes":1073741824,"replica_factor":1}` + planReq := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(body)) + planW := httptest.NewRecorder() + ms.blockVolumePlanHandler(planW, planReq) + var planResp blockapi.VolumePlanResponse + json.NewDecoder(planW.Body).Decode(&planResp) + + if len(planResp.Plan.Candidates) < 2 { + t.Fatalf("need at least 2 candidates for fallback test, got %d", len(planResp.Plan.Candidates)) + } + expectedFallback := planResp.Plan.Candidates[1] + + // Create — first attempt fails, should fall back to second candidate + callCount = 0 + createReq := &master_pb.CreateBlockVolumeRequest{ + Name: "fallback-test", + SizeBytes: 1073741824, + } + resp, err := ms.CreateBlockVolume(context.Background(), createReq) + if err != nil { + t.Fatalf("create failed: %v", err) + } + + // The volume should be on the second candidate (fallback) + entry, ok := ms.blockRegistry.Lookup("fallback-test") + if !ok { + t.Fatal("volume not in registry") + } + if entry.VolumeServer != expectedFallback { + t.Fatalf("expected fallback to %q, got %q", expectedFallback, entry.VolumeServer) + } + _ = resp +} + +// TestQA_CP11B2_PlanIsReadOnly verifies that plan does not register volumes +// or enqueue assignments. 
+func TestQA_CP11B2_PlanIsReadOnly(t *testing.T) { + ms := qaPlanMaster(t) + + // Snapshot state before + _, existsBefore := ms.blockRegistry.Lookup("readonly-test") + queueBefore := ms.blockAssignmentQueue.TotalPending() + + body := `{"name":"readonly-test","size_bytes":1073741824,"replica_factor":2}` + req := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(body)) + w := httptest.NewRecorder() + ms.blockVolumePlanHandler(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } + + // Verify no side effects + _, existsAfter := ms.blockRegistry.Lookup("readonly-test") + queueAfter := ms.blockAssignmentQueue.TotalPending() + + if existsBefore || existsAfter { + t.Fatal("plan should not register volume") + } + if queueAfter != queueBefore { + t.Fatalf("plan should not enqueue assignments: before=%d after=%d", queueBefore, queueAfter) + } +} + +// TestQA_CP11B2_RejectionReasonStability verifies that rejection reason strings +// match the defined constants — no typos, no ad-hoc strings. 
+func TestQA_CP11B2_RejectionReasonStability(t *testing.T) { + validReasons := map[string]bool{ + ReasonDiskTypeMismatch: true, + ReasonInsufficientSpace: true, + ReasonAlreadySelected: true, + ReasonNoViablePrimary: true, + ReasonRFNotSatisfiable: true, + } + + // Create a scenario that produces rejections + candidates := []PlacementCandidateInfo{ + {Address: "vs1:9333", VolumeCount: 0, DiskType: "ssd"}, + {Address: "vs2:9333", VolumeCount: 0, DiskType: "hdd"}, + {Address: "vs3:9333", VolumeCount: 0, AvailableBytes: 100}, // too small + } + result := evaluateBlockPlacement(candidates, 1, "ssd", 1<<30, blockvol.DurabilityBestEffort) + + for _, r := range result.Rejections { + if !validReasons[r.Reason] { + t.Fatalf("rejection reason %q is not a known constant", r.Reason) + } + } + for _, e := range result.Errors { + if !validReasons[e] { + t.Fatalf("error reason %q is not a known constant", e) + } + } +} + +// TestQA_CP11B2_DeterministicOrder_MultipleInvocations verifies that calling +// plan 10 times with the same state produces identical results each time. 
+func TestQA_CP11B2_DeterministicOrder_MultipleInvocations(t *testing.T) { + ms := qaPlanMaster(t) + + body := `{"name":"determ-test","size_bytes":1073741824,"replica_factor":2}` + + var firstResp blockapi.VolumePlanResponse + for i := 0; i < 10; i++ { + req := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(body)) + w := httptest.NewRecorder() + ms.blockVolumePlanHandler(w, req) + + var resp blockapi.VolumePlanResponse + json.NewDecoder(w.Body).Decode(&resp) + + if i == 0 { + firstResp = resp + continue + } + + // Compare with first response + if resp.Plan.Primary != firstResp.Plan.Primary { + t.Fatalf("invocation %d: primary %q != first %q", i, resp.Plan.Primary, firstResp.Plan.Primary) + } + if len(resp.Plan.Candidates) != len(firstResp.Plan.Candidates) { + t.Fatalf("invocation %d: candidate count %d != first %d", + i, len(resp.Plan.Candidates), len(firstResp.Plan.Candidates)) + } + for j := range resp.Plan.Candidates { + if resp.Plan.Candidates[j] != firstResp.Plan.Candidates[j] { + t.Fatalf("invocation %d: candidates[%d] %q != first %q", + i, j, resp.Plan.Candidates[j], firstResp.Plan.Candidates[j]) + } + } + } +} + +// ============================================================ +// CP11B-2 Review Round: Additional Adversarial Tests +// ============================================================ + +// QA-CP11B2-11: RF=0 treated as RF=1 (primary only, no replicas). 
+func TestQA_CP11B2_RF0_BehavesAsRF1(t *testing.T) { + candidates := []PlacementCandidateInfo{ + {Address: "vs1:9333", VolumeCount: 0}, + {Address: "vs2:9333", VolumeCount: 0}, + } + result := evaluateBlockPlacement(candidates, 0, "", 0, blockvol.DurabilityBestEffort) + if result.Primary != "vs1:9333" { + t.Fatalf("primary: got %q, want vs1:9333", result.Primary) + } + if len(result.Replicas) != 0 { + t.Fatalf("replicas: got %d, want 0 for RF=0", len(result.Replicas)) + } + if len(result.Errors) != 0 { + t.Fatalf("unexpected errors for RF=0: %v", result.Errors) + } +} + +// QA-CP11B2-12: RF=1 with sync_all — no replica needed, no warning. +func TestQA_CP11B2_RF1_NoReplicaNeeded(t *testing.T) { + candidates := []PlacementCandidateInfo{ + {Address: "vs1:9333", VolumeCount: 0}, + } + result := evaluateBlockPlacement(candidates, 1, "", 0, blockvol.DurabilitySyncAll) + if result.Primary != "vs1:9333" { + t.Fatalf("primary: got %q", result.Primary) + } + if len(result.Warnings) != 0 { + t.Fatalf("RF=1 should not warn about replicas: %v", result.Warnings) + } + if len(result.Errors) != 0 { + t.Fatalf("RF=1 should not error: %v", result.Errors) + } +} + +// QA-CP11B2-13: All candidates rejected by disk type → no_viable_primary. 
+func TestQA_CP11B2_AllRejected_DiskType(t *testing.T) { + candidates := []PlacementCandidateInfo{ + {Address: "vs1:9333", VolumeCount: 0, DiskType: "hdd"}, + {Address: "vs2:9333", VolumeCount: 0, DiskType: "hdd"}, + } + result := evaluateBlockPlacement(candidates, 1, "ssd", 0, blockvol.DurabilityBestEffort) + if result.Primary != "" { + t.Fatalf("expected no primary, got %q", result.Primary) + } + if len(result.Rejections) != 2 { + t.Fatalf("expected 2 rejections, got %d", len(result.Rejections)) + } + foundErr := false + for _, e := range result.Errors { + if e == ReasonNoViablePrimary { + foundErr = true + } + } + if !foundErr { + t.Fatalf("expected %q, got %v", ReasonNoViablePrimary, result.Errors) + } +} + +// QA-CP11B2-14: All candidates rejected by capacity → no_viable_primary. +func TestQA_CP11B2_AllRejected_Capacity(t *testing.T) { + candidates := []PlacementCandidateInfo{ + {Address: "vs1:9333", VolumeCount: 0, AvailableBytes: 1 << 20}, + {Address: "vs2:9333", VolumeCount: 0, AvailableBytes: 2 << 20}, + } + result := evaluateBlockPlacement(candidates, 1, "", 100<<20, blockvol.DurabilityBestEffort) + if result.Primary != "" { + t.Fatalf("expected no primary, got %q", result.Primary) + } + if len(result.Rejections) != 2 { + t.Fatalf("expected 2 rejections, got %d", len(result.Rejections)) + } +} + +// QA-CP11B2-15: Mixed rejections — disk + capacity + eligible. 
+func TestQA_CP11B2_MixedRejections(t *testing.T) { + candidates := []PlacementCandidateInfo{ + {Address: "vs1:9333", VolumeCount: 0, DiskType: "hdd", AvailableBytes: 100 << 30}, + {Address: "vs2:9333", VolumeCount: 0, DiskType: "ssd", AvailableBytes: 1 << 20}, + {Address: "vs3:9333", VolumeCount: 0, DiskType: "ssd", AvailableBytes: 100 << 30}, + {Address: "vs4:9333", VolumeCount: 5, DiskType: "ssd", AvailableBytes: 100 << 30}, + } + result := evaluateBlockPlacement(candidates, 1, "ssd", 50<<30, blockvol.DurabilityBestEffort) + if result.Primary != "vs3:9333" { + t.Fatalf("primary: got %q, want vs3:9333", result.Primary) + } + if len(result.Rejections) != 2 { + t.Fatalf("expected 2 rejections, got %d", len(result.Rejections)) + } + reasons := map[string]string{} + for _, r := range result.Rejections { + reasons[r.Server] = r.Reason + } + if reasons["vs1:9333"] != ReasonDiskTypeMismatch { + t.Fatalf("vs1: got %q", reasons["vs1:9333"]) + } + if reasons["vs2:9333"] != ReasonInsufficientSpace { + t.Fatalf("vs2: got %q", reasons["vs2:9333"]) + } +} + +// QA-CP11B2-16: sync_quorum RF=3 filtered to 2 — quorum met, warn not error. +func TestQA_CP11B2_SyncQuorum_RF3_FilteredTo2(t *testing.T) { + candidates := []PlacementCandidateInfo{ + {Address: "vs1:9333", VolumeCount: 0, DiskType: "ssd"}, + {Address: "vs2:9333", VolumeCount: 0, DiskType: "ssd"}, + {Address: "vs3:9333", VolumeCount: 0, DiskType: "hdd"}, + } + result := evaluateBlockPlacement(candidates, 3, "ssd", 0, blockvol.DurabilitySyncQuorum) + if len(result.Errors) != 0 { + t.Fatalf("quorum met, should not error: %v", result.Errors) + } + foundWarn := false + for _, w := range result.Warnings { + if w == ReasonRFNotSatisfiable { + foundWarn = true + } + } + if !foundWarn { + t.Fatalf("expected rf_not_satisfiable warning, got %v", result.Warnings) + } +} + +// QA-CP11B2-17: Unknown DiskType passes any filter. 
+func TestQA_CP11B2_UnknownDiskType_PassesFilter(t *testing.T) { + candidates := []PlacementCandidateInfo{ + {Address: "vs1:9333", VolumeCount: 0, DiskType: ""}, + {Address: "vs2:9333", VolumeCount: 0, DiskType: "ssd"}, + {Address: "vs3:9333", VolumeCount: 0, DiskType: "hdd"}, + } + result := evaluateBlockPlacement(candidates, 1, "ssd", 0, blockvol.DurabilityBestEffort) + if len(result.Candidates) != 2 { + t.Fatalf("expected 2 eligible, got %d: %v", len(result.Candidates), result.Candidates) + } + if result.Primary != "vs1:9333" { + t.Fatalf("primary: got %q, want vs1:9333 (unknown passes)", result.Primary) + } +} + +// QA-CP11B2-18: 50-server list — deterministic ordering. +func TestQA_CP11B2_LargeCandidateList(t *testing.T) { + candidates := make([]PlacementCandidateInfo, 50) + for i := range candidates { + candidates[i] = PlacementCandidateInfo{ + Address: fmt.Sprintf("vs%02d:9333", i), + VolumeCount: i % 5, + } + } + result := evaluateBlockPlacement(candidates, 3, "", 0, blockvol.DurabilityBestEffort) + if result.Primary != "vs00:9333" { + t.Fatalf("primary: got %q, want vs00:9333", result.Primary) + } + if result.Replicas[0] != "vs05:9333" { + t.Fatalf("replica[0]: got %q, want vs05:9333", result.Replicas[0]) + } + if len(result.Candidates) != 50 { + t.Fatalf("candidates: got %d, want 50", len(result.Candidates)) + } + result2 := evaluateBlockPlacement(candidates, 3, "", 0, blockvol.DurabilityBestEffort) + if result.Primary != result2.Primary { + t.Fatalf("not deterministic") + } +} + +// QA-CP11B2-19: Failed primary still tried as replica. 
+func TestQA_CP11B2_FailedPrimary_TriedAsReplica(t *testing.T) { + ms := qaPlanMaster(t) + var allocLog []string + callCount := 0 + ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) { + allocLog = append(allocLog, server) + callCount++ + if callCount == 1 { + return nil, fmt.Errorf("simulated primary failure") + } + return &blockAllocResult{ + Path: fmt.Sprintf("/data/%s.blk", name), IQN: fmt.Sprintf("iqn.test:%s", name), + ISCSIAddr: server + ":3260", ReplicaDataAddr: server + ":14260", + ReplicaCtrlAddr: server + ":14261", RebuildListenAddr: server + ":15000", + }, nil + } + _, err := ms.CreateBlockVolume(context.Background(), &master_pb.CreateBlockVolumeRequest{ + Name: "fallback-replica", SizeBytes: 1 << 30, ReplicaFactor: 2, + }) + if err != nil { + t.Fatalf("create failed: %v", err) + } + entry, ok := ms.blockRegistry.Lookup("fallback-replica") + if !ok { + t.Fatal("not in registry") + } + if entry.VolumeServer == allocLog[0] { + t.Fatalf("primary should not be the failed server %q", allocLog[0]) + } +} + +// QA-CP11B2-20: Plan with invalid preset — errors, not panic. 
+func TestQA_CP11B2_PlanWithInvalidPreset(t *testing.T) { + ms := qaPlanMaster(t) + body := `{"name":"bad","size_bytes":1073741824,"preset":"nonexistent"}` + req := httptest.NewRequest(http.MethodPost, "/block/volume/plan", strings.NewReader(body)) + w := httptest.NewRecorder() + ms.blockVolumePlanHandler(w, req) + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } + var resp blockapi.VolumePlanResponse + json.NewDecoder(w.Body).Decode(&resp) + if len(resp.Errors) == 0 { + t.Fatal("expected errors for invalid preset") + } + if resp.Plan.Candidates == nil { + t.Fatal("candidates must never be nil") + } +} diff --git a/weed/storage/blockvol/blockapi/client.go b/weed/storage/blockvol/blockapi/client.go index fbbec1a9a..dc2ca1460 100644 --- a/weed/storage/blockvol/blockapi/client.go +++ b/weed/storage/blockvol/blockapi/client.go @@ -212,6 +212,27 @@ func (c *Client) ResolvePolicy(ctx context.Context, req CreateVolumeRequest) (*R return &out, nil } +// PlanVolume requests a placement plan without creating a volume. +func (c *Client) PlanVolume(ctx context.Context, req CreateVolumeRequest) (*VolumePlanResponse, error) { + body, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("marshal request: %w", err) + } + resp, err := c.doRequest(ctx, http.MethodPost, "/block/volume/plan", bytes.NewReader(body)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if err := checkStatus(resp, http.StatusOK); err != nil { + return nil, err + } + var out VolumePlanResponse + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("decode response: %w", err) + } + return &out, nil +} + // ListServers lists all block-capable volume servers. 
func (c *Client) ListServers(ctx context.Context) ([]ServerInfo, error) { resp, err := c.doRequest(ctx, http.MethodGet, "/block/servers", nil) diff --git a/weed/storage/blockvol/blockapi/types.go b/weed/storage/blockvol/blockapi/types.go index f41d8b7a9..04aed511a 100644 --- a/weed/storage/blockvol/blockapi/types.go +++ b/weed/storage/blockvol/blockapi/types.go @@ -42,6 +42,8 @@ type VolumeInfo struct { Preset string `json:"preset,omitempty"` // CP11B-1: preset used at creation NvmeAddr string `json:"nvme_addr,omitempty"` NQN string `json:"nqn,omitempty"` + // CP11B-4: Operator-facing health state. + HealthState string `json:"health_state"` // "healthy", "degraded", "rebuilding", "unsafe" } // ResolvedPolicyResponse is the response for POST /block/volume/resolve. @@ -123,6 +125,12 @@ type BlockStatusResponse struct { FailoversTotal int64 `json:"failovers_total"` RebuildsTotal int64 `json:"rebuilds_total"` AssignmentQueueDepth int `json:"assignment_queue_depth"` + // CP11B-4: Operator summary fields. + HealthyCount int `json:"healthy_count"` + DegradedCount int `json:"degraded_count"` + RebuildingCount int `json:"rebuilding_count"` + UnsafeCount int `json:"unsafe_count"` + NvmeCapableServers int `json:"nvme_capable_servers"` } // PreflightRejection describes why a specific replica was rejected for promotion. @@ -144,6 +152,30 @@ type PreflightResponse struct { PrimaryAlive bool `json:"primary_alive"` } +// VolumePlanResponse is the response for POST /block/volume/plan. +type VolumePlanResponse struct { + ResolvedPolicy ResolvedPolicyView `json:"resolved_policy"` + Plan VolumePlanView `json:"plan"` + Warnings []string `json:"warnings,omitempty"` + Errors []string `json:"errors,omitempty"` +} + +// VolumePlanView describes the placement plan. +// Candidates is the full ordered eligible list and is always present (empty slice, never omitted). +// ReplicaFactor means total copies including primary: RF=2 → 1 primary + 1 replica. 
type VolumePlanView struct {
	Primary    string                `json:"primary"`              // selected primary; "" when no viable primary exists
	Replicas   []string              `json:"replicas,omitempty"`   // selected replicas, Candidates[1:RF]
	Candidates []string              `json:"candidates"`           // full ordered eligible list; always present, never omitted
	Rejections []VolumePlanRejection `json:"rejections,omitempty"` // per-server rejection reasons (stable constants)
}

// VolumePlanRejection explains why a candidate server was not selected.
type VolumePlanRejection struct {
	Server string `json:"server"` // server address, e.g. "vs1:9333"
	Reason string `json:"reason"` // stable reason constant suitable for UI/automation
}

// RoleFromString converts a role string to its uint32 wire value.
// Returns 0 (RoleNone) for unrecognized strings.
func RoleFromString(s string) uint32 {