diff --git a/weed/pb/master.proto b/weed/pb/master.proto index 7b32a73f1..99dd97ef0 100644 --- a/weed/pb/master.proto +++ b/weed/pb/master.proto @@ -110,6 +110,8 @@ message Heartbeat { repeated BlockVolumeShortInfoMessage new_block_volumes = 25; repeated BlockVolumeShortInfoMessage deleted_block_volumes = 26; bool has_no_block_volumes = 27; + // server-level NVMe/TCP target address (empty if NVMe disabled on this VS) + string block_nvme_addr = 28; } message HeartbeatResponse { diff --git a/weed/pb/master_pb/master.pb.go b/weed/pb/master_pb/master.pb.go index 6ceaf2ebb..6c330a8b5 100644 --- a/weed/pb/master_pb/master.pb.go +++ b/weed/pb/master_pb/master.pb.go @@ -54,8 +54,10 @@ type Heartbeat struct { NewBlockVolumes []*BlockVolumeShortInfoMessage `protobuf:"bytes,25,rep,name=new_block_volumes,json=newBlockVolumes,proto3" json:"new_block_volumes,omitempty"` DeletedBlockVolumes []*BlockVolumeShortInfoMessage `protobuf:"bytes,26,rep,name=deleted_block_volumes,json=deletedBlockVolumes,proto3" json:"deleted_block_volumes,omitempty"` HasNoBlockVolumes bool `protobuf:"varint,27,opt,name=has_no_block_volumes,json=hasNoBlockVolumes,proto3" json:"has_no_block_volumes,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // server-level NVMe/TCP target address (empty if NVMe disabled on this VS) + BlockNvmeAddr string `protobuf:"bytes,28,opt,name=block_nvme_addr,json=blockNvmeAddr,proto3" json:"block_nvme_addr,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *Heartbeat) Reset() { @@ -256,6 +258,13 @@ func (x *Heartbeat) GetHasNoBlockVolumes() bool { return false } +func (x *Heartbeat) GetBlockNvmeAddr() string { + if x != nil { + return x.BlockNvmeAddr + } + return "" +} + type HeartbeatResponse struct { state protoimpl.MessageState `protogen:"open.v1"` VolumeSizeLimit uint64 `protobuf:"varint,1,opt,name=volume_size_limit,json=volumeSizeLimit,proto3" json:"volume_size_limit,omitempty"` @@ -5478,7 
+5487,7 @@ var File_master_proto protoreflect.FileDescriptor const file_master_proto_rawDesc = "" + "\n" + - "\fmaster.proto\x12\tmaster_pb\x1a\x13volume_server.proto\"\xbd\n" + + "\fmaster.proto\x12\tmaster_pb\x1a\x13volume_server.proto\"\xe5\n" + "\n" + "\tHeartbeat\x12\x0e\n" + "\x02ip\x18\x01 \x01(\tR\x02ip\x12\x12\n" + @@ -5510,7 +5519,8 @@ const file_master_proto_rawDesc = "" + "\x12block_volume_infos\x18\x18 \x03(\v2!.master_pb.BlockVolumeInfoMessageR\x10blockVolumeInfos\x12R\n" + "\x11new_block_volumes\x18\x19 \x03(\v2&.master_pb.BlockVolumeShortInfoMessageR\x0fnewBlockVolumes\x12Z\n" + "\x15deleted_block_volumes\x18\x1a \x03(\v2&.master_pb.BlockVolumeShortInfoMessageR\x13deletedBlockVolumes\x12/\n" + - "\x14has_no_block_volumes\x18\x1b \x01(\bR\x11hasNoBlockVolumes\x1aB\n" + + "\x14has_no_block_volumes\x18\x1b \x01(\bR\x11hasNoBlockVolumes\x12&\n" + + "\x0fblock_nvme_addr\x18\x1c \x01(\tR\rblockNvmeAddr\x1aB\n" + "\x14MaxVolumeCountsEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + "\x05value\x18\x02 \x01(\rR\x05value:\x028\x01\"\xa9\x03\n" + diff --git a/weed/server/master_block_registry.go b/weed/server/master_block_registry.go index 9155e26a6..7910d0cf1 100644 --- a/weed/server/master_block_registry.go +++ b/weed/server/master_block_registry.go @@ -79,6 +79,9 @@ type BlockVolumeEntry struct { // CP8-3-1: Durability mode. DurabilityMode string // "best_effort", "sync_all", "sync_quorum" + // CP11B-1: Provisioning preset (control-plane metadata only). + Preset string // "database", "general", "throughput", or "" + // Lease tracking for failover (CP6-3 F2). 
LastLeaseGrant time.Time LeaseTTL time.Duration @@ -137,7 +140,7 @@ type BlockVolumeRegistry struct { mu sync.RWMutex volumes map[string]*BlockVolumeEntry // keyed by name byServer map[string]map[string]bool // server -> set of volume names - blockServers map[string]bool // servers known to support block volumes + blockServers map[string]*blockServerInfo // servers known to support block volumes // Promotion eligibility: max WAL LSN lag for replica to be promotable. promotionLSNTolerance uint64 @@ -153,12 +156,18 @@ type BlockVolumeRegistry struct { type inflightEntry struct{} +// blockServerInfo tracks server-level capabilities reported via heartbeat. +type blockServerInfo struct { + NvmeAddr string // NVMe/TCP listen address; empty if NVMe disabled +} + + // NewBlockVolumeRegistry creates an empty registry. func NewBlockVolumeRegistry() *BlockVolumeRegistry { return &BlockVolumeRegistry{ volumes: make(map[string]*BlockVolumeEntry), byServer: make(map[string]map[string]bool), - blockServers: make(map[string]bool), + blockServers: make(map[string]*blockServerInfo), promotionLSNTolerance: DefaultPromotionLSNTolerance, } } @@ -302,12 +311,12 @@ func (r *BlockVolumeRegistry) ListByServer(server string) []*BlockVolumeEntry { // Called on the first heartbeat from a volume server. // Marks reported volumes as Active, removes entries for this server // that are not reported (stale). -func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master_pb.BlockVolumeInfoMessage) { +func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master_pb.BlockVolumeInfoMessage, nvmeAddr string) { r.mu.Lock() defer r.mu.Unlock() - // Mark server as block-capable since it sent block volume info. - r.blockServers[server] = true + // Mark server as block-capable and record server-level NVMe capability. + r.blockServers[server] = &blockServerInfo{NvmeAddr: nvmeAddr} // Build set of reported paths. 
reported := make(map[string]*master_pb.BlockVolumeInfoMessage, len(infos)) @@ -922,7 +931,7 @@ func (r *BlockVolumeRegistry) evaluatePromotionLocked(entry *BlockVolumeEntry) P continue } // Gate 4: server must be alive (in blockServers set) — B-12 fix. - if !r.blockServers[ri.Server] { + if r.blockServers[ri.Server] == nil { result.Rejections = append(result.Rejections, PromotionRejection{ Server: ri.Server, Reason: "server_dead", @@ -1073,7 +1082,7 @@ func (r *BlockVolumeRegistry) evaluateManualPromotionLocked(entry *BlockVolumeEn } // Primary-alive gate (soft — skipped when force=true). - if !force && r.blockServers[entry.VolumeServer] { + if !force && r.blockServers[entry.VolumeServer] != nil { result.Reason = "primary_alive" return result } @@ -1152,7 +1161,7 @@ func (r *BlockVolumeRegistry) evaluateManualPromotionLocked(entry *BlockVolumeEn } // Hard gate: server must be alive (in blockServers set). - if !r.blockServers[ri.Server] { + if r.blockServers[ri.Server] == nil { result.Rejections = append(result.Rejections, PromotionRejection{ Server: ri.Server, Reason: "server_dead", @@ -1227,7 +1236,9 @@ func (r *BlockVolumeRegistry) ManualPromote(name, targetServer string, force boo // MarkBlockCapable records that the given server supports block volumes. func (r *BlockVolumeRegistry) MarkBlockCapable(server string) { r.mu.Lock() - r.blockServers[server] = true + if r.blockServers[server] == nil { + r.blockServers[server] = &blockServerInfo{} + } r.mu.Unlock() } @@ -1330,7 +1341,7 @@ func (r *BlockVolumeRegistry) ServerSummaries() []BlockServerSummary { func (r *BlockVolumeRegistry) IsBlockCapable(server string) bool { r.mu.RLock() defer r.mu.RUnlock() - return r.blockServers[server] + return r.blockServers[server] != nil } // VolumesWithDeadPrimary returns names of volumes where the given server is a replica @@ -1354,13 +1365,26 @@ func (r *BlockVolumeRegistry) VolumesWithDeadPrimary(replicaServer string) []str continue } // Check if the primary server is dead. 
- if !r.blockServers[entry.VolumeServer] { + if r.blockServers[entry.VolumeServer] == nil { orphaned = append(orphaned, name) } } return orphaned } +// HasNVMeCapableServer returns true if any registered block-capable server +// has reported a non-empty NVMe address via heartbeat. +func (r *BlockVolumeRegistry) HasNVMeCapableServer() bool { + r.mu.RLock() + defer r.mu.RUnlock() + for _, info := range r.blockServers { + if info != nil && info.NvmeAddr != "" { + return true + } + } + return false +} + // BlockCapableServers returns the list of servers known to support block volumes. func (r *BlockVolumeRegistry) BlockCapableServers() []string { r.mu.RLock() diff --git a/weed/server/master_block_registry_test.go b/weed/server/master_block_registry_test.go index bea8061b1..489fbf30c 100644 --- a/weed/server/master_block_registry_test.go +++ b/weed/server/master_block_registry_test.go @@ -93,7 +93,7 @@ func TestRegistry_UpdateFullHeartbeat(t *testing.T) { // Full heartbeat reports only vol1 (vol2 is stale). r.UpdateFullHeartbeat("s1", []*master_pb.BlockVolumeInfoMessage{ {Path: "/v1.blk", Epoch: 5, Role: 1}, - }) + }, "") // vol1 should be Active. e1, ok := r.Lookup("vol1") @@ -149,7 +149,7 @@ func TestRegistry_PendingToActive(t *testing.T) { // Full heartbeat confirms the volume. r.UpdateFullHeartbeat("s1", []*master_pb.BlockVolumeInfoMessage{ {Path: "/v1.blk", Epoch: 1, Role: 1}, - }) + }, "") e, _ := r.Lookup("vol1") if e.Status != StatusActive { @@ -241,7 +241,7 @@ func TestRegistry_FullHeartbeatUpdatesSizeBytes(t *testing.T) { // Heartbeat with updated size (online resize). 
r.UpdateFullHeartbeat("s1", []*master_pb.BlockVolumeInfoMessage{ {Path: "/v1.blk", VolumeSize: 2 << 30, Epoch: 1, Role: 1}, - }) + }, "") e, _ := r.Lookup("vol1") if e.SizeBytes != 2<<30 { @@ -421,7 +421,7 @@ func TestFullHeartbeat_UpdatesReplicaAddrs(t *testing.T) { ReplicaDataAddr: "10.0.0.2:14260", ReplicaCtrlAddr: "10.0.0.2:14261", }, - }) + }, "") entry, ok := r.Lookup("vol1") if !ok { @@ -730,7 +730,7 @@ func TestRegistry_FullHeartbeat_UpdatesHealthScore(t *testing.T) { ScrubErrors: 2, WalHeadLsn: 500, }, - }) + }, "") e, _ := r.Lookup("vol1") if e.HealthScore != 0.85 { @@ -757,7 +757,7 @@ func TestRegistry_ReplicaHeartbeat_DoesNotDeleteVolume(t *testing.T) { // Replica sends heartbeat reporting its path. r.UpdateFullHeartbeat("replica1", []*master_pb.BlockVolumeInfoMessage{ {Path: "/data/vol1.blk", Epoch: 1, Role: 2}, - }) + }, "") // Volume must still exist with primary intact. e, ok := r.Lookup("vol1") @@ -784,7 +784,7 @@ func TestRegistry_ReplicaHeartbeat_StaleReplicaRemoved(t *testing.T) { }) // replica1 heartbeat does NOT report vol1 path → stale replica. - r.UpdateFullHeartbeat("replica1", []*master_pb.BlockVolumeInfoMessage{}) + r.UpdateFullHeartbeat("replica1", []*master_pb.BlockVolumeInfoMessage{}, "") // Volume still exists, but replica1 removed. e, ok := r.Lookup("vol1") @@ -813,7 +813,7 @@ func TestRegistry_ReplicaHeartbeat_ReconstructsAfterRestart(t *testing.T) { // Replica heartbeat arrives — vol1 exists but has no record of this server. r.UpdateFullHeartbeat("replica1", []*master_pb.BlockVolumeInfoMessage{ {Path: "/data/vol1.blk", Epoch: 1, Role: 2, HealthScore: 0.95, WalHeadLsn: 42}, - }) + }, "") // vol1 should now have replica1 in Replicas[]. 
e, ok := r.Lookup("vol1") diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 59f5a9aa8..34c1142e2 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -277,7 +277,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ // (BlockVolumeInfos on first heartbeat) or deltas (NewBlockVolumes/DeletedBlockVolumes // on subsequent heartbeats), never both in the same message. if len(heartbeat.BlockVolumeInfos) > 0 || heartbeat.HasNoBlockVolumes { - ms.blockRegistry.UpdateFullHeartbeat(dn.Url(), heartbeat.BlockVolumeInfos) + ms.blockRegistry.UpdateFullHeartbeat(dn.Url(), heartbeat.BlockVolumeInfos, heartbeat.BlockNvmeAddr) // T2 (B-06): After updating registry from heartbeat, check if this server // is a replica for any volume whose primary is dead. If so, promote. ms.reevaluateOrphanedPrimaries(dn.Url()) diff --git a/weed/server/master_grpc_server_block_test.go b/weed/server/master_grpc_server_block_test.go index f82f9a818..ed1abf80f 100644 --- a/weed/server/master_grpc_server_block_test.go +++ b/weed/server/master_grpc_server_block_test.go @@ -1542,7 +1542,7 @@ func TestMaster_ExpandCoordinated_HeartbeatSuppressedAfterPartialCommit(t *testi Epoch: 1, Role: 1, }, - }) + }, "") // Registry size must still be the OLD size — heartbeat must not leak the new size. entry, _ = ms.blockRegistry.Lookup("hb-vol") @@ -1778,7 +1778,7 @@ func TestMaster_ExpandCoordinated_B10_HeartbeatDoesNotDeleteDuringExpand(t *test // the entry from the registry. ms.blockRegistry.UpdateFullHeartbeat(primaryServer, []*master_pb.BlockVolumeInfoMessage{ // Empty: primary restarted and hasn't loaded this volume yet. - }) + }, "") // Entry must still exist — expand is in progress. 
_, ok := ms.blockRegistry.Lookup("b10-vol") diff --git a/weed/server/master_server.go b/weed/server/master_server.go index ac57ae1bf..8b8fe9006 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -227,6 +227,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se r.HandleFunc("/block/volume/{name}", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeDeleteHandler)))).Methods("DELETE") r.HandleFunc("/block/volume/{name}", ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeLookupHandler))).Methods("GET") r.HandleFunc("/block/volumes", ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeListHandler))).Methods("GET") + r.HandleFunc("/block/volume/resolve", ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeResolveHandler))).Methods("POST") r.HandleFunc("/block/volume/{name}/expand", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeExpandHandler)))).Methods("POST") r.HandleFunc("/block/volume/{name}/preflight", ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumePreflightHandler))).Methods("GET") r.HandleFunc("/block/volume/{name}/promote", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumePromoteHandler)))).Methods("POST") diff --git a/weed/server/master_server_handlers_block.go b/weed/server/master_server_handlers_block.go index fde6181d6..8d7325418 100644 --- a/weed/server/master_server_handlers_block.go +++ b/weed/server/master_server_handlers_block.go @@ -13,6 +13,16 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/blockvol/blockapi" ) +// buildEnvironmentInfo constructs a blockvol.EnvironmentInfo from registry state. 
+func (ms *MasterServer) buildEnvironmentInfo() blockvol.EnvironmentInfo { + return blockvol.EnvironmentInfo{ + NVMeAvailable: ms.blockRegistry.HasNVMeCapableServer(), + ServerCount: len(ms.blockRegistry.BlockCapableServers()), + WALSizeDefault: 64 << 20, // engine default + BlockSizeDefault: 4096, // engine default + } +} + // blockVolumeCreateHandler handles POST /block/volume. func (ms *MasterServer) blockVolumeCreateHandler(w http.ResponseWriter, r *http.Request) { var req blockapi.CreateVolumeRequest @@ -27,29 +37,32 @@ func (ms *MasterServer) blockVolumeCreateHandler(w http.ResponseWriter, r *http. replicaPlacement = "000" } - // Pre-validate durability_mode (cosmetic — real validation is in gRPC handler). - if req.DurabilityMode != "" { - if _, perr := blockvol.ParseDurabilityMode(req.DurabilityMode); perr != nil { - writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("invalid durability_mode: %w", perr)) - return - } + // Resolve preset + overrides. + env := ms.buildEnvironmentInfo() + resolved := blockvol.ResolvePolicy(blockvol.PresetName(req.Preset), + req.DurabilityMode, req.ReplicaFactor, req.DiskType, env) + if len(resolved.Errors) > 0 { + writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("%s", resolved.Errors[0])) + return } + // Use resolved values for the gRPC call. resp, err := ms.CreateBlockVolume(r.Context(), &master_pb.CreateBlockVolumeRequest{ Name: req.Name, SizeBytes: req.SizeBytes, - DiskType: req.DiskType, - DurabilityMode: req.DurabilityMode, - ReplicaFactor: uint32(req.ReplicaFactor), + DiskType: resolved.Policy.DiskType, + DurabilityMode: resolved.Policy.DurabilityMode, + ReplicaFactor: uint32(resolved.Policy.ReplicaFactor), }) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) return } - // Store replica_placement on the registry entry. + // Store replica_placement and preset on the registry entry. 
if entry, ok := ms.blockRegistry.Lookup(resp.VolumeId); ok { entry.ReplicaPlacement = replicaPlacement + entry.Preset = req.Preset } // Look up the full entry to populate all fields. @@ -67,6 +80,37 @@ func (ms *MasterServer) blockVolumeCreateHandler(w http.ResponseWriter, r *http. writeJsonQuiet(w, r, http.StatusOK, info) } +// blockVolumeResolveHandler handles POST /block/volume/resolve. +// Diagnostic endpoint: always returns 200, even with errors[]. +func (ms *MasterServer) blockVolumeResolveHandler(w http.ResponseWriter, r *http.Request) { + var req blockapi.CreateVolumeRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("invalid request body: %w", err)) + return + } + + env := ms.buildEnvironmentInfo() + resolved := blockvol.ResolvePolicy(blockvol.PresetName(req.Preset), + req.DurabilityMode, req.ReplicaFactor, req.DiskType, env) + + resp := blockapi.ResolvedPolicyResponse{ + Policy: blockapi.ResolvedPolicyView{ + Preset: string(resolved.Policy.Preset), + DurabilityMode: resolved.Policy.DurabilityMode, + ReplicaFactor: resolved.Policy.ReplicaFactor, + DiskType: resolved.Policy.DiskType, + TransportPreference: resolved.Policy.TransportPref, + WorkloadHint: resolved.Policy.WorkloadHint, + WALSizeRecommended: resolved.Policy.WALSizeRecommended, + StorageProfile: resolved.Policy.StorageProfile, + }, + Overrides: resolved.Overrides, + Warnings: resolved.Warnings, + Errors: resolved.Errors, + } + writeJsonQuiet(w, r, http.StatusOK, resp) +} + // blockVolumeDeleteHandler handles DELETE /block/volume/{name}. 
func (ms *MasterServer) blockVolumeDeleteHandler(w http.ResponseWriter, r *http.Request) { name := mux.Vars(r)["name"] @@ -333,6 +377,7 @@ func entryToVolumeInfo(e *BlockVolumeEntry) blockapi.VolumeInfo { HealthScore: e.HealthScore, ReplicaDegraded: e.ReplicaDegraded, DurabilityMode: durMode, + Preset: e.Preset, NvmeAddr: e.NvmeAddr, NQN: e.NQN, } diff --git a/weed/server/qa_block_cp11b1_adversarial_test.go b/weed/server/qa_block_cp11b1_adversarial_test.go new file mode 100644 index 000000000..d49cf318c --- /dev/null +++ b/weed/server/qa_block_cp11b1_adversarial_test.go @@ -0,0 +1,590 @@ +package weed_server + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" + "github.com/seaweedfs/seaweedfs/weed/storage/blockvol/blockapi" +) + +// ============================================================ +// CP11B-1 QA Adversarial Tests +// +// Provisioning Presets + Resolved Policy View +// ============================================================ + +// qaPresetMaster creates a MasterServer with stub allocators for preset tests. 
+func qaPresetMaster(t *testing.T) *MasterServer { + t.Helper() + ms := &MasterServer{ + blockRegistry: NewBlockVolumeRegistry(), + blockAssignmentQueue: NewBlockAssignmentQueue(), + blockFailover: newBlockFailoverState(), + } + ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) { + return &blockAllocResult{ + Path: fmt.Sprintf("/data/%s.blk", name), + IQN: fmt.Sprintf("iqn.2024.test:%s", name), + ISCSIAddr: server + ":3260", + ReplicaDataAddr: server + ":14260", + ReplicaCtrlAddr: server + ":14261", + RebuildListenAddr: server + ":15000", + }, nil + } + ms.blockVSDelete = func(ctx context.Context, server string, name string) error { + return nil + } + ms.blockVSExpand = func(ctx context.Context, server string, name string, newSize uint64) (uint64, error) { + return newSize, nil + } + ms.blockVSPrepareExpand = func(ctx context.Context, server string, name string, newSize, expandEpoch uint64) error { + return nil + } + ms.blockVSCommitExpand = func(ctx context.Context, server string, name string, expandEpoch uint64) (uint64, error) { + return 2 << 30, nil + } + ms.blockVSCancelExpand = func(ctx context.Context, server string, name string, expandEpoch uint64) error { + return nil + } + ms.blockRegistry.MarkBlockCapable("vs1:9333") + ms.blockRegistry.MarkBlockCapable("vs2:9333") + return ms +} + +// --- QA-CP11B1-1: Database preset produces correct defaults --- +func TestQA_CP11B1_DatabasePreset_Defaults(t *testing.T) { + r := blockvol.ResolvePolicy(blockvol.PresetDatabase, "", 0, "", blockvol.EnvironmentInfo{ + NVMeAvailable: true, ServerCount: 3, WALSizeDefault: 128 << 20, BlockSizeDefault: 4096, + }) + if len(r.Errors) > 0 { + t.Fatalf("unexpected errors: %v", r.Errors) + } + p := r.Policy + if p.DurabilityMode != "sync_all" { + t.Errorf("durability_mode = %q, want sync_all", p.DurabilityMode) + } + if p.ReplicaFactor != 2 { + t.Errorf("replica_factor = %d, 
want 2", p.ReplicaFactor) + } + if p.DiskType != "ssd" { + t.Errorf("disk_type = %q, want ssd", p.DiskType) + } + if p.TransportPref != "nvme" { + t.Errorf("transport = %q, want nvme", p.TransportPref) + } + if p.WALSizeRecommended != 128<<20 { + t.Errorf("wal_rec = %d, want %d", p.WALSizeRecommended, 128<<20) + } + if p.StorageProfile != "single" { + t.Errorf("storage_profile = %q, want single", p.StorageProfile) + } + if len(r.Overrides) != 0 { + t.Errorf("overrides = %v, want empty", r.Overrides) + } +} + +// --- QA-CP11B1-2: Override precedence — durability wins over preset --- +func TestQA_CP11B1_OverridePrecedence_DurabilityWins(t *testing.T) { + r := blockvol.ResolvePolicy(blockvol.PresetDatabase, "best_effort", 0, "", blockvol.EnvironmentInfo{ + NVMeAvailable: true, ServerCount: 2, WALSizeDefault: 128 << 20, BlockSizeDefault: 4096, + }) + if len(r.Errors) > 0 { + t.Fatalf("unexpected errors: %v", r.Errors) + } + if r.Policy.DurabilityMode != "best_effort" { + t.Errorf("durability_mode = %q, want best_effort (override)", r.Policy.DurabilityMode) + } + found := false + for _, o := range r.Overrides { + if o == "durability_mode" { + found = true + } + } + if !found { + t.Errorf("overrides = %v, want durability_mode present", r.Overrides) + } +} + +// --- QA-CP11B1-3: Invalid preset rejected --- +func TestQA_CP11B1_InvalidPreset_Rejected(t *testing.T) { + r := blockvol.ResolvePolicy("nosuch", "", 0, "", blockvol.EnvironmentInfo{}) + if len(r.Errors) == 0 { + t.Fatal("expected error for unknown preset") + } + if !strings.Contains(r.Errors[0], "unknown preset") { + t.Errorf("error = %q, want to contain 'unknown preset'", r.Errors[0]) + } +} + +// --- QA-CP11B1-4: sync_quorum + RF=2 rejected --- +func TestQA_CP11B1_SyncQuorum_RF2_Rejected(t *testing.T) { + r := blockvol.ResolvePolicy("", "sync_quorum", 2, "", blockvol.EnvironmentInfo{ + ServerCount: 3, WALSizeDefault: 64 << 20, BlockSizeDefault: 4096, + }) + if len(r.Errors) == 0 { + t.Fatal("expected error for 
sync_quorum + RF=2") + } + if !strings.Contains(r.Errors[0], "replica_factor >= 3") { + t.Errorf("error = %q, want sync_quorum RF constraint", r.Errors[0]) + } +} + +// --- QA-CP11B1-5: NVMe preferred but unavailable → warning --- +func TestQA_CP11B1_NVMePref_NoNVMe_Warning(t *testing.T) { + r := blockvol.ResolvePolicy(blockvol.PresetDatabase, "", 0, "", blockvol.EnvironmentInfo{ + NVMeAvailable: false, ServerCount: 2, WALSizeDefault: 128 << 20, BlockSizeDefault: 4096, + }) + if len(r.Errors) > 0 { + t.Fatalf("unexpected errors: %v", r.Errors) + } + found := false + for _, w := range r.Warnings { + if strings.Contains(w, "NVMe") { + found = true + } + } + if !found { + t.Errorf("expected NVMe warning, got: %v", r.Warnings) + } +} + +// --- QA-CP11B1-6: No preset, explicit fields → backward compat --- +func TestQA_CP11B1_NoPreset_BackwardCompat(t *testing.T) { + r := blockvol.ResolvePolicy("", "sync_all", 2, "hdd", blockvol.EnvironmentInfo{ + ServerCount: 2, WALSizeDefault: 64 << 20, BlockSizeDefault: 4096, + }) + if len(r.Errors) > 0 { + t.Fatalf("unexpected errors: %v", r.Errors) + } + if r.Policy.Preset != "" { + t.Errorf("preset = %q, want empty", r.Policy.Preset) + } + if r.Policy.DurabilityMode != "sync_all" { + t.Errorf("durability_mode = %q, want sync_all", r.Policy.DurabilityMode) + } + if r.Policy.ReplicaFactor != 2 { + t.Errorf("replica_factor = %d, want 2", r.Policy.ReplicaFactor) + } + if r.Policy.DiskType != "hdd" { + t.Errorf("disk_type = %q, want hdd", r.Policy.DiskType) + } +} + +// --- QA-CP11B1-7: Resolve endpoint returns 200 with correct fields --- +func TestQA_CP11B1_ResolveHandler_HTTP(t *testing.T) { + ms := qaPresetMaster(t) + + body, _ := json.Marshal(blockapi.CreateVolumeRequest{ + Preset: "database", + }) + req := httptest.NewRequest(http.MethodPost, "/block/volume/resolve", bytes.NewReader(body)) + w := httptest.NewRecorder() + ms.blockVolumeResolveHandler(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200; body: 
%s", w.Code, w.Body.String()) + } + + var resp blockapi.ResolvedPolicyResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("decode: %v", err) + } + if resp.Policy.DurabilityMode != "sync_all" { + t.Errorf("durability_mode = %q, want sync_all", resp.Policy.DurabilityMode) + } + if resp.Policy.ReplicaFactor != 2 { + t.Errorf("replica_factor = %d, want 2", resp.Policy.ReplicaFactor) + } + if resp.Policy.StorageProfile != "single" { + t.Errorf("storage_profile = %q, want single", resp.Policy.StorageProfile) + } + if resp.Policy.TransportPreference != "nvme" { + t.Errorf("transport = %q, want nvme", resp.Policy.TransportPreference) + } +} + +// --- QA-CP11B1-8: Create with preset stores preset on registry entry --- +func TestQA_CP11B1_CreateWithPreset_StoresPreset(t *testing.T) { + ms := qaPresetMaster(t) + ctx := context.Background() + + _, err := ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{ + Name: "preset-vol", + SizeBytes: 1 << 30, + DurabilityMode: "best_effort", + ReplicaFactor: 2, + }) + if err != nil { + t.Fatalf("create: %v", err) + } + + // Simulate what the HTTP handler does after create. + if entry, ok := ms.blockRegistry.Lookup("preset-vol"); ok { + entry.Preset = "general" + } + + entry, ok := ms.blockRegistry.Lookup("preset-vol") + if !ok { + t.Fatal("volume not found in registry") + } + if entry.Preset != "general" { + t.Errorf("preset = %q, want general", entry.Preset) + } + + // Verify entryToVolumeInfo propagates preset. 
+ info := entryToVolumeInfo(entry) + if info.Preset != "general" { + t.Errorf("VolumeInfo.Preset = %q, want general", info.Preset) + } +} + +// --- QA-CP11B1-9: RF exceeds servers → warning --- +func TestQA_CP11B1_RF_ExceedsServers_Warning(t *testing.T) { + r := blockvol.ResolvePolicy(blockvol.PresetGeneral, "", 3, "", blockvol.EnvironmentInfo{ + ServerCount: 2, WALSizeDefault: 64 << 20, BlockSizeDefault: 4096, + }) + if len(r.Errors) > 0 { + t.Fatalf("unexpected errors: %v", r.Errors) + } + found := false + for _, w := range r.Warnings { + if strings.Contains(w, "exceeds available servers") { + found = true + } + } + if !found { + t.Errorf("expected RF>servers warning, got: %v", r.Warnings) + } +} + +// --- QA-CP11B1-10: All response fields present and typed correctly --- +func TestQA_CP11B1_StableOutputFields(t *testing.T) { + r := blockvol.ResolvePolicy(blockvol.PresetDatabase, "", 0, "", blockvol.EnvironmentInfo{ + NVMeAvailable: true, ServerCount: 3, WALSizeDefault: 128 << 20, BlockSizeDefault: 4096, + }) + // Verify every field in VolumePolicy is populated. + p := r.Policy + if p.Preset != blockvol.PresetDatabase { + t.Errorf("preset = %q", p.Preset) + } + if p.DurabilityMode == "" { + t.Error("durability_mode is empty") + } + if p.ReplicaFactor == 0 { + t.Error("replica_factor is 0") + } + if p.TransportPref == "" { + t.Error("transport_preference is empty") + } + if p.WorkloadHint == "" { + t.Error("workload_hint is empty") + } + if p.WALSizeRecommended == 0 { + t.Error("wal_size_recommended is 0") + } + if p.StorageProfile == "" { + t.Error("storage_profile is empty") + } + + // Verify JSON round-trip preserves all fields. 
+ data, err := json.Marshal(r) + if err != nil { + t.Fatalf("marshal: %v", err) + } + var r2 blockvol.ResolvedPolicy + if err := json.Unmarshal(data, &r2); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if r2.Policy.DurabilityMode != p.DurabilityMode { + t.Errorf("round-trip durability = %q, want %q", r2.Policy.DurabilityMode, p.DurabilityMode) + } + if r2.Policy.ReplicaFactor != p.ReplicaFactor { + t.Errorf("round-trip RF = %d, want %d", r2.Policy.ReplicaFactor, p.ReplicaFactor) + } +} + +// --- QA-CP11B1-11: Create + Resolve parity --- +// Create with preset + overrides produces the same effective values +// as /resolve for the same request. Catches drift between resolve and create. +func TestQA_CP11B1_CreateResolve_Parity(t *testing.T) { + ms := qaPresetMaster(t) + + // Resolve first. + preset := blockvol.PresetDatabase + durOverride := "best_effort" + rfOverride := 0 + diskOverride := "" + + env := ms.buildEnvironmentInfo() + resolved := blockvol.ResolvePolicy(preset, durOverride, rfOverride, diskOverride, env) + if len(resolved.Errors) > 0 { + t.Fatalf("resolve errors: %v", resolved.Errors) + } + + // Create using the same resolved values (simulating what the handler does). + ctx := context.Background() + _, err := ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{ + Name: "parity-vol", + SizeBytes: 1 << 30, + DiskType: resolved.Policy.DiskType, + DurabilityMode: resolved.Policy.DurabilityMode, + ReplicaFactor: uint32(resolved.Policy.ReplicaFactor), + }) + if err != nil { + t.Fatalf("create: %v", err) + } + + entry, ok := ms.blockRegistry.Lookup("parity-vol") + if !ok { + t.Fatal("volume not found") + } + + // Check parity: the created volume should have the resolved durability + RF. 
+ if entry.DurabilityMode != resolved.Policy.DurabilityMode { + t.Errorf("durability: entry=%q, resolved=%q", entry.DurabilityMode, resolved.Policy.DurabilityMode) + } + if entry.ReplicaFactor != resolved.Policy.ReplicaFactor { + t.Errorf("RF: entry=%d, resolved=%d", entry.ReplicaFactor, resolved.Policy.ReplicaFactor) + } +} + +// ============================================================ +// CP11B-1 Review Round: Additional Adversarial Tests +// ============================================================ + +// QA-CP11B1-12: NVMe capability from heartbeat, not volumes. +// On a fresh cluster with NVMe-capable servers but no volumes, +// database preset should NOT warn about missing NVMe. +func TestQA_CP11B1_NVMeFromHeartbeat_FreshCluster(t *testing.T) { + r := NewBlockVolumeRegistry() + // Simulate heartbeat from NVMe-capable server (no volumes created yet). + r.UpdateFullHeartbeat("vs1:9333", nil, "192.168.1.10:4420") + + if !r.HasNVMeCapableServer() { + t.Fatal("HasNVMeCapableServer should be true after heartbeat with NVMe addr") + } + + // Resolve database preset — should NOT warn about NVMe. + env := blockvol.EnvironmentInfo{ + NVMeAvailable: r.HasNVMeCapableServer(), + ServerCount: 1, + WALSizeDefault: 128 << 20, + } + resolved := blockvol.ResolvePolicy(blockvol.PresetDatabase, "", 0, "", env) + for _, w := range resolved.Warnings { + if strings.Contains(w, "NVMe") { + t.Fatalf("should NOT warn about NVMe on fresh cluster with NVMe server: %s", w) + } + } +} + +// QA-CP11B1-13: NVMe capability lost after server unmark. +// When NVMe server disconnects, HasNVMeCapableServer should return false. +func TestQA_CP11B1_NVMeLostAfterUnmark(t *testing.T) { + r := NewBlockVolumeRegistry() + r.UpdateFullHeartbeat("nvme-vs:9333", nil, "10.0.0.1:4420") + r.UpdateFullHeartbeat("plain-vs:9333", nil, "") + + if !r.HasNVMeCapableServer() { + t.Fatal("should have NVMe before unmark") + } + + // NVMe server disconnects. 
+ r.UnmarkBlockCapable("nvme-vs:9333") + + if r.HasNVMeCapableServer() { + t.Fatal("should NOT have NVMe after NVMe server disconnected") + } +} + +// QA-CP11B1-14: MarkBlockCapable does NOT overwrite NVMe addr set by heartbeat. +func TestQA_CP11B1_MarkBlockCapable_PreservesNVMe(t *testing.T) { + r := NewBlockVolumeRegistry() + // Heartbeat sets NVMe addr. + r.UpdateFullHeartbeat("vs1:9333", nil, "10.0.0.1:4420") + + // MarkBlockCapable is called again (e.g. from another code path). + r.MarkBlockCapable("vs1:9333") + + // NVMe addr should NOT be cleared. + if !r.HasNVMeCapableServer() { + t.Fatal("MarkBlockCapable should not clear NVMe addr set by heartbeat") + } +} + +// QA-CP11B1-15: Resolve with invalid durability_mode string returns error. +func TestQA_CP11B1_InvalidDurabilityString_Rejected(t *testing.T) { + r := blockvol.ResolvePolicy(blockvol.PresetGeneral, "turbo_sync", 0, "", blockvol.EnvironmentInfo{ + ServerCount: 2, WALSizeDefault: 64 << 20, + }) + if len(r.Errors) == 0 { + t.Fatal("expected error for invalid durability_mode string") + } + if !strings.Contains(r.Errors[0], "invalid durability_mode") { + t.Errorf("error = %q, want 'invalid durability_mode'", r.Errors[0]) + } +} + +// QA-CP11B1-16: Override disk_type on database preset (ssd → hdd). +func TestQA_CP11B1_OverrideDiskType(t *testing.T) { + r := blockvol.ResolvePolicy(blockvol.PresetDatabase, "", 0, "hdd", blockvol.EnvironmentInfo{ + NVMeAvailable: true, ServerCount: 2, WALSizeDefault: 128 << 20, + }) + if len(r.Errors) > 0 { + t.Fatalf("unexpected errors: %v", r.Errors) + } + if r.Policy.DiskType != "hdd" { + t.Errorf("disk_type = %q, want hdd (override)", r.Policy.DiskType) + } + if !containsStr(r.Overrides, "disk_type") { + t.Errorf("overrides = %v, want disk_type present", r.Overrides) + } + // Preset default was "ssd" — verify it was overridden, not merged. 
+ if r.Policy.Preset != blockvol.PresetDatabase { + t.Errorf("preset = %q, want database", r.Policy.Preset) + } +} + +// QA-CP11B1-17: All three overrides at once. +func TestQA_CP11B1_AllOverridesAtOnce(t *testing.T) { + r := blockvol.ResolvePolicy(blockvol.PresetDatabase, "best_effort", 3, "hdd", blockvol.EnvironmentInfo{ + NVMeAvailable: true, ServerCount: 3, WALSizeDefault: 128 << 20, + }) + if len(r.Errors) > 0 { + t.Fatalf("unexpected errors: %v", r.Errors) + } + if r.Policy.DurabilityMode != "best_effort" { + t.Errorf("durability = %q, want best_effort", r.Policy.DurabilityMode) + } + if r.Policy.ReplicaFactor != 3 { + t.Errorf("RF = %d, want 3", r.Policy.ReplicaFactor) + } + if r.Policy.DiskType != "hdd" { + t.Errorf("disk_type = %q, want hdd", r.Policy.DiskType) + } + if len(r.Overrides) != 3 { + t.Errorf("overrides count = %d, want 3: %v", len(r.Overrides), r.Overrides) + } + // Transport and workload_hint should still come from preset. + if r.Policy.TransportPref != "nvme" { + t.Errorf("transport = %q, want nvme (from preset, not overridable)", r.Policy.TransportPref) + } +} + +// QA-CP11B1-18: Preset override that creates an incompatible combo. +// database preset (sync_all) + override RF=1 → warning (not error). +func TestQA_CP11B1_PresetOverride_SyncAll_RF1_Warning(t *testing.T) { + r := blockvol.ResolvePolicy(blockvol.PresetDatabase, "", 1, "", blockvol.EnvironmentInfo{ + NVMeAvailable: true, ServerCount: 2, WALSizeDefault: 128 << 20, + }) + // sync_all + RF=1 is valid (no error) but should warn. + if len(r.Errors) > 0 { + t.Fatalf("unexpected errors: %v", r.Errors) + } + found := false + for _, w := range r.Warnings { + if strings.Contains(w, "no replication benefit") { + found = true + } + } + if !found { + t.Errorf("expected 'no replication benefit' warning, got: %v", r.Warnings) + } +} + +// QA-CP11B1-19: Zero ServerCount → RF warning suppressed (unknown cluster state). 
+func TestQA_CP11B1_ZeroServerCount_NoRFWarning(t *testing.T) { + r := blockvol.ResolvePolicy(blockvol.PresetGeneral, "", 3, "", blockvol.EnvironmentInfo{ + ServerCount: 0, WALSizeDefault: 64 << 20, + }) + if len(r.Errors) > 0 { + t.Fatalf("unexpected errors: %v", r.Errors) + } + for _, w := range r.Warnings { + if strings.Contains(w, "exceeds available servers") { + t.Fatalf("should NOT warn about RF vs servers when ServerCount=0: %s", w) + } + } +} + +// QA-CP11B1-20: Resolve endpoint returns errors[] for invalid preset (not HTTP error). +func TestQA_CP11B1_ResolveEndpoint_InvalidPreset_Returns200WithErrors(t *testing.T) { + ms := qaPresetMaster(t) + + body, _ := json.Marshal(blockapi.CreateVolumeRequest{Preset: "bogus"}) + req := httptest.NewRequest(http.MethodPost, "/block/volume/resolve", bytes.NewReader(body)) + w := httptest.NewRecorder() + ms.blockVolumeResolveHandler(w, req) + + // Resolve always returns 200, even with errors. + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200", w.Code) + } + var resp blockapi.ResolvedPolicyResponse + json.NewDecoder(w.Body).Decode(&resp) + if len(resp.Errors) == 0 { + t.Fatal("expected errors[] for invalid preset") + } + if !strings.Contains(resp.Errors[0], "unknown preset") { + t.Errorf("error = %q, want 'unknown preset'", resp.Errors[0]) + } +} + +// QA-CP11B1-21: Create handler rejects invalid preset with 400 (not 200). +func TestQA_CP11B1_CreateHandler_InvalidPreset_Returns400(t *testing.T) { + ms := qaPresetMaster(t) + + body, _ := json.Marshal(blockapi.CreateVolumeRequest{ + Name: "bad-preset-vol", SizeBytes: 1 << 30, Preset: "bogus", + }) + req := httptest.NewRequest(http.MethodPost, "/block/volume", bytes.NewReader(body)) + w := httptest.NewRecorder() + ms.blockVolumeCreateHandler(w, req) + + if w.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want 400; body: %s", w.Code, w.Body.String()) + } +} + +// QA-CP11B1-22: Concurrent resolve calls don't panic. 
+func TestQA_CP11B1_ConcurrentResolve_NoPanic(t *testing.T) { + ms := qaPresetMaster(t) + // Simulate heartbeats to populate server info. + ms.blockRegistry.UpdateFullHeartbeat("vs1:9333", nil, "10.0.0.1:4420") + ms.blockRegistry.UpdateFullHeartbeat("vs2:9333", nil, "") + + var wg sync.WaitGroup + for i := 0; i < 20; i++ { + wg.Add(1) + preset := []string{"database", "general", "throughput", "bogus", ""}[i%5] + go func(p string) { + defer wg.Done() + body, _ := json.Marshal(blockapi.CreateVolumeRequest{Preset: p}) + req := httptest.NewRequest(http.MethodPost, "/block/volume/resolve", bytes.NewReader(body)) + w := httptest.NewRecorder() + ms.blockVolumeResolveHandler(w, req) + }(preset) + } + wg.Wait() + // No panic = pass. +} + +// containsStr is a helper (may already exist in the file from preset_test.go, +// but it's in a different package — blockvol vs weed_server). +func containsStr(ss []string, target string) bool { + for _, s := range ss { + if s == target { + return true + } + } + return false +} diff --git a/weed/server/qa_block_cp62_test.go b/weed/server/qa_block_cp62_test.go index 27259bfb0..6e291c725 100644 --- a/weed/server/qa_block_cp62_test.go +++ b/weed/server/qa_block_cp62_test.go @@ -34,7 +34,7 @@ func TestQA_Reg_FullHeartbeatCrossTalk(t *testing.T) { // Full heartbeat from s1 reports vol1 — should NOT affect s2's volumes. r.UpdateFullHeartbeat("s1", []*master_pb.BlockVolumeInfoMessage{ {Path: "/v1.blk", Epoch: 1}, - }) + }, "") // vol2 on s2 should still exist. if _, ok := r.Lookup("vol2"); !ok { @@ -49,7 +49,7 @@ func TestQA_Reg_FullHeartbeatEmptyServer(t *testing.T) { r.Register(&BlockVolumeEntry{Name: "vol2", VolumeServer: "s1", Path: "/v2.blk", Status: StatusActive}) // Empty heartbeat from s1 (HasNoBlockVolumes=true, zero infos). 
- r.UpdateFullHeartbeat("s1", nil) + r.UpdateFullHeartbeat("s1", nil, "") if _, ok := r.Lookup("vol1"); ok { t.Error("BUG: vol1 should be removed after empty full heartbeat") @@ -78,7 +78,7 @@ func TestQA_Reg_ConcurrentHeartbeatAndRegister(t *testing.T) { }() r.UpdateFullHeartbeat("s1", []*master_pb.BlockVolumeInfoMessage{ {Path: fmt.Sprintf("/v%d.blk", i), Epoch: uint64(i)}, - }) + }, "") }(i) go func(i int) { defer wg.Done() diff --git a/weed/server/qa_block_cp63_test.go b/weed/server/qa_block_cp63_test.go index e7115cd52..b16b38b10 100644 --- a/weed/server/qa_block_cp63_test.go +++ b/weed/server/qa_block_cp63_test.go @@ -331,7 +331,7 @@ func TestQA_Reg_FullHeartbeatDoesNotClobberReplicaServer(t *testing.T) { // Full heartbeat from vs1 — should NOT clear replica info. r.UpdateFullHeartbeat("vs1", []*master_pb.BlockVolumeInfoMessage{ {Path: "/data/vol1.blk", Epoch: 1, Role: blockvol.RoleToWire(blockvol.RolePrimary), VolumeSize: 1 << 30}, - }) + }, "") e, _ := r.Lookup("vol1") if e.ReplicaServer != "vs2" { diff --git a/weed/server/qa_block_cp82_adversarial_test.go b/weed/server/qa_block_cp82_adversarial_test.go index 1be417498..761dab5a6 100644 --- a/weed/server/qa_block_cp82_adversarial_test.go +++ b/weed/server/qa_block_cp82_adversarial_test.go @@ -197,7 +197,7 @@ func TestQA_CP82_ReplicaHeartbeatSpoof_DoesNotDeletePrimary(t *testing.T) { Epoch: 1, Role: blockvol.RoleToWire(blockvol.RoleReplica), }, - }) + }, "") entry, ok := ms.blockRegistry.Lookup("vol-spoof") if !ok { @@ -209,7 +209,7 @@ func TestQA_CP82_ReplicaHeartbeatSpoof_DoesNotDeletePrimary(t *testing.T) { // Now simulate a full heartbeat from vs2 with NO volumes at all. // This should remove vs2 as replica but NOT delete the volume. 
- ms.blockRegistry.UpdateFullHeartbeat("vs2:9333", []*master_pb.BlockVolumeInfoMessage{}) + ms.blockRegistry.UpdateFullHeartbeat("vs2:9333", []*master_pb.BlockVolumeInfoMessage{}, "") entry, ok = ms.blockRegistry.Lookup("vol-spoof") if !ok { @@ -381,7 +381,7 @@ func TestQA_CP82_MasterRestart_ReconstructReplicas_ThenFailover(t *testing.T) { Role: blockvol.RoleToWire(blockvol.RolePrimary), WalHeadLsn: 500, }, - }) + }, "") entry, ok := ms.blockRegistry.Lookup("vol-restart") if !ok { @@ -400,7 +400,7 @@ func TestQA_CP82_MasterRestart_ReconstructReplicas_ThenFailover(t *testing.T) { Role: blockvol.RoleToWire(blockvol.RoleReplica), WalHeadLsn: 498, }, - }) + }, "") entry, _ = ms.blockRegistry.Lookup("vol-restart") if len(entry.Replicas) == 0 { @@ -473,10 +473,10 @@ func TestQA_CP82_RF3_OneReplicaFlaps_UnderWriteLoad(t *testing.T) { Role: blockvol.RoleToWire(blockvol.RoleReplica), WalHeadLsn: 100, }, - }) + }, "") } else { // Replica reports no volumes (simulates disconnect). - ms.blockRegistry.UpdateFullHeartbeat("vs3:9333", []*master_pb.BlockVolumeInfoMessage{}) + ms.blockRegistry.UpdateFullHeartbeat("vs3:9333", []*master_pb.BlockVolumeInfoMessage{}, "") } } }() @@ -494,7 +494,7 @@ func TestQA_CP82_RF3_OneReplicaFlaps_UnderWriteLoad(t *testing.T) { Role: blockvol.RoleToWire(blockvol.RolePrimary), WalHeadLsn: uint64(100 + i), }, - }) + }, "") } }() @@ -511,7 +511,7 @@ func TestQA_CP82_RF3_OneReplicaFlaps_UnderWriteLoad(t *testing.T) { Role: blockvol.RoleToWire(blockvol.RoleReplica), WalHeadLsn: uint64(100 + i), }, - }) + }, "") } }() @@ -626,7 +626,7 @@ func TestQA_CP82_ScrubConcurrentWrites_NoFalseCorruption(t *testing.T) { WalHeadLsn: uint64(500 + i), HealthScore: 1.0, // Clean — no false positives }, - }) + }, "") } }() @@ -644,7 +644,7 @@ func TestQA_CP82_ScrubConcurrentWrites_NoFalseCorruption(t *testing.T) { WalHeadLsn: uint64(500 + i), HealthScore: 1.0, }, - }) + }, "") } }() @@ -690,7 +690,7 @@ func 
TestQA_CP82_ScrubDetectsCorruption_HealthDrops_PromotionAvoids(t *testing.T HealthScore: 0.3, ScrubErrors: 5, }, - }) + }, "") // Trigger failover — vs3 (healthy) should be promoted, not vs2. newEpoch, err := ms.blockRegistry.PromoteBestReplica("vol-corrupt") diff --git a/weed/server/qa_block_cp831_adversarial_test.go b/weed/server/qa_block_cp831_adversarial_test.go index 7d13231d1..610c9b37a 100644 --- a/weed/server/qa_block_cp831_adversarial_test.go +++ b/weed/server/qa_block_cp831_adversarial_test.go @@ -179,7 +179,7 @@ func TestQA_CP831_HeartbeatEmptyMode_DoesNotOverwriteStrict(t *testing.T) { VolumeSize: 1 << 30, DurabilityMode: "", // empty — must not overwrite }, - }) + }, "") } entry, ok := ms.blockRegistry.Lookup("hb-strict") @@ -220,7 +220,7 @@ func TestQA_CP831_HeartbeatNonEmptyMode_DoesUpdate(t *testing.T) { VolumeSize: 1 << 30, DurabilityMode: "sync_all", }, - }) + }, "") entry, ok := ms.blockRegistry.Lookup("hb-update") if !ok { @@ -718,7 +718,7 @@ func TestQA_CP831_MasterRestart_AutoRegister_PreservesDurabilityMode(t *testing. VolumeSize: 1 << 30, DurabilityMode: "sync_all", }, - }) + }, "") entry, ok := ms.blockRegistry.Lookup("strict-vol") if !ok { @@ -736,7 +736,7 @@ func TestQA_CP831_MasterRestart_AutoRegister_PreservesDurabilityMode(t *testing. 
VolumeSize: 1 << 30, // DurabilityMode omitted }, - }) + }, "") entry2, ok := ms2.blockRegistry.Lookup("legacy-vol") if !ok { diff --git a/weed/server/qa_block_durability_test.go b/weed/server/qa_block_durability_test.go index 8e2812880..15f5aef7d 100644 --- a/weed/server/qa_block_durability_test.go +++ b/weed/server/qa_block_durability_test.go @@ -316,7 +316,7 @@ func TestDurability_Heartbeat_ReportsMode(t *testing.T) { VolumeSize: 1 << 30, DurabilityMode: "", }, - }) + }, "") entry, ok = ms.blockRegistry.Lookup("hb-vol") if !ok { t.Fatal("volume not in registry after heartbeat") @@ -332,7 +332,7 @@ func TestDurability_Heartbeat_ReportsMode(t *testing.T) { VolumeSize: 1 << 30, DurabilityMode: "sync_all", }, - }) + }, "") entry, ok = ms.blockRegistry.Lookup("hb-vol") if !ok { t.Fatal("volume not in registry after second heartbeat") diff --git a/weed/server/qa_block_expand_adversarial_test.go b/weed/server/qa_block_expand_adversarial_test.go index a14b7e285..e87ce67a6 100644 --- a/weed/server/qa_block_expand_adversarial_test.go +++ b/weed/server/qa_block_expand_adversarial_test.go @@ -291,7 +291,7 @@ func TestQA_B10_RepeatedEmptyHeartbeats_DuringExpand(t *testing.T) { // 10 empty heartbeats from the primary — each one would delete // the entry without the B-10 guard. for i := 0; i < 10; i++ { - ms.blockRegistry.UpdateFullHeartbeat(primary, []*master_pb.BlockVolumeInfoMessage{}) + ms.blockRegistry.UpdateFullHeartbeat(primary, []*master_pb.BlockVolumeInfoMessage{}, "") } _, ok := ms.blockRegistry.Lookup("multi-hb") @@ -322,7 +322,7 @@ func TestQA_B10_ExpandFailed_HeartbeatStillProtected(t *testing.T) { ms.blockRegistry.MarkExpandFailed("fail-hb") // Empty heartbeat should not delete — ExpandFailed keeps ExpandInProgress=true. 
- ms.blockRegistry.UpdateFullHeartbeat(primary, []*master_pb.BlockVolumeInfoMessage{}) + ms.blockRegistry.UpdateFullHeartbeat(primary, []*master_pb.BlockVolumeInfoMessage{}, "") e, ok := ms.blockRegistry.Lookup("fail-hb") if !ok { @@ -337,7 +337,7 @@ func TestQA_B10_ExpandFailed_HeartbeatStillProtected(t *testing.T) { // After ClearExpandFailed, empty heartbeat should delete normally. ms.blockRegistry.ClearExpandFailed("fail-hb") - ms.blockRegistry.UpdateFullHeartbeat(primary, []*master_pb.BlockVolumeInfoMessage{}) + ms.blockRegistry.UpdateFullHeartbeat(primary, []*master_pb.BlockVolumeInfoMessage{}, "") _, ok = ms.blockRegistry.Lookup("fail-hb") if ok { @@ -372,7 +372,7 @@ func TestQA_B10_HeartbeatSizeSuppress_DuringExpand(t *testing.T) { Epoch: 1, Role: blockvol.RoleToWire(blockvol.RolePrimary), }, - }) + }, "") entry, _ = ms.blockRegistry.Lookup("size-suppress") if entry.SizeBytes != origSize { @@ -388,7 +388,7 @@ func TestQA_B10_HeartbeatSizeSuppress_DuringExpand(t *testing.T) { Epoch: 1, Role: blockvol.RoleToWire(blockvol.RolePrimary), }, - }) + }, "") entry, _ = ms.blockRegistry.Lookup("size-suppress") if entry.SizeBytes != origSize { @@ -441,7 +441,7 @@ func TestQA_B10_ConcurrentHeartbeatsAndExpand(t *testing.T) { for i := 0; i < rounds; i++ { if i%5 == 0 { // Every 5th: empty heartbeat (simulates brief restart). 
- ms.blockRegistry.UpdateFullHeartbeat(primary, []*master_pb.BlockVolumeInfoMessage{}) + ms.blockRegistry.UpdateFullHeartbeat(primary, []*master_pb.BlockVolumeInfoMessage{}, "") } else { ms.blockRegistry.UpdateFullHeartbeat(primary, []*master_pb.BlockVolumeInfoMessage{ { @@ -451,7 +451,7 @@ func TestQA_B10_ConcurrentHeartbeatsAndExpand(t *testing.T) { Role: blockvol.RoleToWire(blockvol.RolePrimary), WalHeadLsn: uint64(100 + i), }, - }) + }, "") } } }() @@ -470,7 +470,7 @@ func TestQA_B10_ConcurrentHeartbeatsAndExpand(t *testing.T) { Role: blockvol.RoleToWire(blockvol.RoleReplica), WalHeadLsn: uint64(99 + i), }, - }) + }, "") } }() } diff --git a/weed/server/qa_block_nvme_publication_test.go b/weed/server/qa_block_nvme_publication_test.go index ddf09e48f..2cfbadbaf 100644 --- a/weed/server/qa_block_nvme_publication_test.go +++ b/weed/server/qa_block_nvme_publication_test.go @@ -111,7 +111,7 @@ func TestQA_NVMe_HeartbeatSetsNvmeFields(t *testing.T) { NvmeAddr: "10.0.0.1:4420", Nqn: "nqn.2024-01.com.seaweedfs:vol1", }, - }) + }, "") entry, ok := r.Lookup("vol1") if !ok { @@ -152,7 +152,7 @@ func TestQA_NVMe_HeartbeatClearsStaleNvme(t *testing.T) { Role: 1, // NvmeAddr and Nqn intentionally empty. }, - }) + }, "") entry, _ := r.Lookup("vol1") // After heartbeat with empty NVMe fields, stale NVMe info should be cleared. @@ -370,7 +370,7 @@ func TestQA_NVMe_FullHeartbeat_MasterRestart(t *testing.T) { NvmeAddr: "10.0.0.1:4420", Nqn: "nqn.2024-01.com.seaweedfs:vol1", }, - }) + }, "") // After heartbeat, volume should be reconstructed with NVMe fields. // Currently the registry uses nameFromPath() to find/create entries. @@ -596,7 +596,7 @@ func TestIntegration_NVMe_FailoverUpdatesNvmeAddr(t *testing.T) { NvmeAddr: newPrimaryHost + ":4420", Nqn: fmt.Sprintf("nqn.2024-01.com.seaweedfs:vol.pvc-failover-nvme"), }, - }) + }, "") // CSI re-publishes after failover: Lookup must return new NVMe address. 
lookupResp, err := ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "pvc-failover-nvme"}) @@ -651,7 +651,7 @@ func TestIntegration_NVMe_HeartbeatReconstructionAfterMasterRestart(t *testing.T NvmeAddr: primaryHost + ":4420", Nqn: "nqn.2024-01.com.seaweedfs:vol.pvc-restart-1", }, - }) + }, "") // Step 4: CSI calls Lookup — must find NVMe details. lookupResp, err := ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "pvc-restart-1"}) @@ -895,7 +895,7 @@ func TestIntegration_NVMe_FullLifecycle_K8s(t *testing.T) { NvmeAddr: newPrimaryHost + ":4420", Nqn: "nqn.2024-01.com.seaweedfs:vol.pvc-k8s-data", }, - }) + }, "") // ── Step 7: CSI re-publishes → node plugin reconnects via NVMe ── lookupResp, err = ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "pvc-k8s-data"}) @@ -977,7 +977,7 @@ func TestQA_NVMe_ToggleNvmeOnRunningVS(t *testing.T) { NvmeAddr: "10.0.0.1:4420", Nqn: "nqn.2024-01.com.seaweedfs:toggle-vol", }, - }) + }, "") entry, _ = r.Lookup("toggle-vol") if entry.NvmeAddr != "10.0.0.1:4420" { @@ -997,7 +997,7 @@ func TestQA_NVMe_ToggleNvmeOnRunningVS(t *testing.T) { Role: 1, // NvmeAddr and Nqn intentionally empty — NVMe disabled. }, - }) + }, "") entry, _ = r.Lookup("toggle-vol") if entry.NvmeAddr != "" { @@ -1080,7 +1080,7 @@ func TestQA_NVMe_ToggleNvmeOnRunningVS_ReplicaSide(t *testing.T) { NvmeAddr: "10.0.0.2:4420", Nqn: "nqn.2024-01.com.seaweedfs:toggle-replica-vol", }, - }) + }, "") entry, _ = r.Lookup("toggle-replica-vol") if entry.Replicas[0].NvmeAddr != "10.0.0.2:4420" { @@ -1101,7 +1101,7 @@ func TestQA_NVMe_ToggleNvmeOnRunningVS_ReplicaSide(t *testing.T) { WalHeadLsn: 100, // NvmeAddr and Nqn intentionally empty — NVMe disabled. 
}, - }) + }, "") entry, _ = r.Lookup("toggle-replica-vol") if entry.Replicas[0].NvmeAddr != "" { @@ -1325,7 +1325,7 @@ func TestQA_NVMe_PromotionThenImmediateLookup(t *testing.T) { NvmeAddr: "10.0.0.4:4420", Nqn: "nqn.2024-01.com.seaweedfs:promo-fix-vol", }, - }) + }, "") // Now Lookup should return the NvmeAddr. entry, ok := r.Lookup("promo-fix-vol") diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 7b3b4efd9..10be5b1b7 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -371,5 +371,6 @@ func (vs *VolumeServer) collectBlockVolumeHeartbeat(ip string, port uint32, dc, Rack: rack, BlockVolumeInfos: blockvol.InfoMessagesToProto(msgs), HasNoBlockVolumes: len(msgs) == 0, + BlockNvmeAddr: vs.blockService.NvmeListenAddr(), } } diff --git a/weed/storage/blockvol/blockapi/client.go b/weed/storage/blockvol/blockapi/client.go index 7916f20ef..fbbec1a9a 100644 --- a/weed/storage/blockvol/blockapi/client.go +++ b/weed/storage/blockvol/blockapi/client.go @@ -191,6 +191,27 @@ func (c *Client) Preflight(ctx context.Context, name string) (*PreflightResponse return &out, nil } +// ResolvePolicy resolves a preset + overrides without creating a volume. +func (c *Client) ResolvePolicy(ctx context.Context, req CreateVolumeRequest) (*ResolvedPolicyResponse, error) { + body, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("marshal request: %w", err) + } + resp, err := c.doRequest(ctx, http.MethodPost, "/block/volume/resolve", bytes.NewReader(body)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if err := checkStatus(resp, http.StatusOK); err != nil { + return nil, err + } + var out ResolvedPolicyResponse + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("decode response: %w", err) + } + return &out, nil +} + // ListServers lists all block-capable volume servers. 
func (c *Client) ListServers(ctx context.Context) ([]ServerInfo, error) { resp, err := c.doRequest(ctx, http.MethodGet, "/block/servers", nil) diff --git a/weed/storage/blockvol/blockapi/types.go b/weed/storage/blockvol/blockapi/types.go index d381eb2b2..f41d8b7a9 100644 --- a/weed/storage/blockvol/blockapi/types.go +++ b/weed/storage/blockvol/blockapi/types.go @@ -14,6 +14,7 @@ type CreateVolumeRequest struct { DiskType string `json:"disk_type"` // e.g. "ssd", "hdd" DurabilityMode string `json:"durability_mode,omitempty"` // "best_effort", "sync_all", "sync_quorum" ReplicaFactor int `json:"replica_factor,omitempty"` // 1, 2, or 3 (default: 2) + Preset string `json:"preset,omitempty"` // "database", "general", "throughput", or "" } // VolumeInfo describes a block volume. @@ -38,10 +39,31 @@ type VolumeInfo struct { HealthScore float64 `json:"health_score"` ReplicaDegraded bool `json:"replica_degraded,omitempty"` DurabilityMode string `json:"durability_mode"` // CP8-3-1 + Preset string `json:"preset,omitempty"` // CP11B-1: preset used at creation NvmeAddr string `json:"nvme_addr,omitempty"` NQN string `json:"nqn,omitempty"` } +// ResolvedPolicyResponse is the response for POST /block/volume/resolve. +type ResolvedPolicyResponse struct { + Policy ResolvedPolicyView `json:"policy"` + Overrides []string `json:"overrides,omitempty"` + Warnings []string `json:"warnings,omitempty"` + Errors []string `json:"errors,omitempty"` +} + +// ResolvedPolicyView is the fully resolved policy shown to the user. 
+type ResolvedPolicyView struct { + Preset string `json:"preset,omitempty"` + DurabilityMode string `json:"durability_mode"` + ReplicaFactor int `json:"replica_factor"` + DiskType string `json:"disk_type,omitempty"` + TransportPreference string `json:"transport_preference"` + WorkloadHint string `json:"workload_hint"` + WALSizeRecommended uint64 `json:"wal_size_recommended"` + StorageProfile string `json:"storage_profile"` +} + // ReplicaDetail describes one replica in the API response. type ReplicaDetail struct { Server string `json:"server"` diff --git a/weed/storage/blockvol/blockvol.go b/weed/storage/blockvol/blockvol.go index be1efac3b..a486e097b 100644 --- a/weed/storage/blockvol/blockvol.go +++ b/weed/storage/blockvol/blockvol.go @@ -464,20 +464,35 @@ func (v *BlockVol) ReadLBA(lba uint64, length uint32) ([]byte, error) { // readOneBlock reads a single block, checking dirty map first, then extent. func (v *BlockVol) readOneBlock(lba uint64) ([]byte, error) { - walOff, lsn, _, ok := v.dirtyMap.Get(lba) - if ok { + for { + walOff, lsn, _, ok := v.dirtyMap.Get(lba) + if !ok { + return v.readBlockFromExtent(lba) + } data, stale, err := v.readBlockFromWAL(walOff, lba, lsn) if err != nil { return nil, err } - if !stale { + if stale { + // WAL slot was reused. Extent may not have the latest: a newer write + // could have updated dirtyMap after we got our old entry. Re-check + // dirtyMap before trusting extent; if a newer entry exists, retry. + _, currentLSN, _, stillOk := v.dirtyMap.Get(lba) + if stillOk && currentLSN != lsn { + continue + } + return v.readBlockFromExtent(lba) + } + // Verify no newer write overwrote this LBA while we were reading. + // A concurrent WriteLBA could have Put(lba, walOff_new, lsn_new) after + // our Get; we would have read old data at walOff. Re-check and retry. 
+ _, currentLSN, _, stillOk := v.dirtyMap.Get(lba) + if stillOk && currentLSN == lsn { return data, nil } - // WAL slot was reused (flusher reclaimed it between our - // dirty map read and WAL read). The data is already flushed - // to the extent region, so fall through to extent read. + // LSN changed (newer write) or flusher removed: retry with fresh dirtyMap state. + continue } - return v.readBlockFromExtent(lba) } // maxWALEntryDataLen caps the data length we trust from a WAL entry header. diff --git a/weed/storage/blockvol/preset.go b/weed/storage/blockvol/preset.go new file mode 100644 index 000000000..14f48249a --- /dev/null +++ b/weed/storage/blockvol/preset.go @@ -0,0 +1,174 @@ +package blockvol + +import "fmt" + +// PresetName identifies a named provisioning preset. +type PresetName string + +const ( + PresetDatabase PresetName = "database" + PresetGeneral PresetName = "general" + PresetThroughput PresetName = "throughput" +) + +// VolumePolicy is the fully resolved configuration for a block volume. +type VolumePolicy struct { + Preset PresetName `json:"preset,omitempty"` + DurabilityMode string `json:"durability_mode"` + ReplicaFactor int `json:"replica_factor"` + DiskType string `json:"disk_type,omitempty"` + TransportPref string `json:"transport_preference"` + WorkloadHint string `json:"workload_hint"` + WALSizeRecommended uint64 `json:"wal_size_recommended"` + StorageProfile string `json:"storage_profile"` +} + +// ResolvedPolicy is the result of resolving a preset + user overrides. +type ResolvedPolicy struct { + Policy VolumePolicy `json:"policy"` + Overrides []string `json:"overrides,omitempty"` + Warnings []string `json:"warnings,omitempty"` + Errors []string `json:"errors,omitempty"` +} + +// EnvironmentInfo provides cluster state to the resolver. +type EnvironmentInfo struct { + NVMeAvailable bool + ServerCount int + WALSizeDefault uint64 + BlockSizeDefault uint32 +} + +// presetDefaults holds the default policy for each named preset. 
+type presetDefaults struct { + DurabilityMode string + ReplicaFactor int + DiskType string + TransportPref string + WorkloadHint string + WALSizeRec uint64 +} + +var presets = map[PresetName]presetDefaults{ + PresetDatabase: { + DurabilityMode: "sync_all", + ReplicaFactor: 2, + DiskType: "ssd", + TransportPref: "nvme", + WorkloadHint: WorkloadDatabase, + WALSizeRec: 128 << 20, + }, + PresetGeneral: { + DurabilityMode: "best_effort", + ReplicaFactor: 2, + DiskType: "", + TransportPref: "iscsi", + WorkloadHint: WorkloadGeneral, + WALSizeRec: 64 << 20, + }, + PresetThroughput: { + DurabilityMode: "best_effort", + ReplicaFactor: 2, + DiskType: "", + TransportPref: "iscsi", + WorkloadHint: WorkloadThroughput, + WALSizeRec: 128 << 20, + }, +} + +// system defaults when no preset is specified +var systemDefaults = presetDefaults{ + DurabilityMode: "best_effort", + ReplicaFactor: 2, + DiskType: "", + TransportPref: "iscsi", + WorkloadHint: WorkloadGeneral, + WALSizeRec: 64 << 20, +} + +// ResolvePolicy resolves a preset + explicit request fields into a final policy. +// Pure function — no side effects, no server dependencies. +func ResolvePolicy(preset PresetName, durabilityMode string, replicaFactor int, + diskType string, env EnvironmentInfo) ResolvedPolicy { + + var result ResolvedPolicy + + // Step 1: Look up preset defaults. + defaults := systemDefaults + if preset != "" { + pd, ok := presets[preset] + if !ok { + result.Errors = append(result.Errors, fmt.Sprintf("unknown preset %q", preset)) + return result + } + defaults = pd + } + + // Start with defaults. + policy := VolumePolicy{ + Preset: preset, + DurabilityMode: defaults.DurabilityMode, + ReplicaFactor: defaults.ReplicaFactor, + DiskType: defaults.DiskType, + TransportPref: defaults.TransportPref, + WorkloadHint: defaults.WorkloadHint, + WALSizeRecommended: defaults.WALSizeRec, + StorageProfile: "single", + } + + // Step 2: Apply overrides. 
+ if durabilityMode != "" { + policy.DurabilityMode = durabilityMode + result.Overrides = append(result.Overrides, "durability_mode") + } + if replicaFactor != 0 { + policy.ReplicaFactor = replicaFactor + result.Overrides = append(result.Overrides, "replica_factor") + } + if diskType != "" { + policy.DiskType = diskType + result.Overrides = append(result.Overrides, "disk_type") + } + + // Step 3: Normalize + validate durability_mode. + durMode, err := ParseDurabilityMode(policy.DurabilityMode) + if err != nil { + result.Errors = append(result.Errors, fmt.Sprintf("invalid durability_mode: %s", err)) + result.Policy = policy + return result + } + + // Step 4: Cross-validate durability vs RF. + if err := durMode.Validate(policy.ReplicaFactor); err != nil { + result.Errors = append(result.Errors, err.Error()) + result.Policy = policy + return result + } + + // Step 5: Advisory warnings. + if policy.ReplicaFactor == 1 && durMode == DurabilitySyncAll { + result.Warnings = append(result.Warnings, + "sync_all with replica_factor=1 provides no replication benefit") + } + if env.ServerCount > 0 && policy.ReplicaFactor > env.ServerCount { + result.Warnings = append(result.Warnings, + fmt.Sprintf("replica_factor=%d exceeds available servers (%d)", policy.ReplicaFactor, env.ServerCount)) + } + + // Step 6: Transport advisory. + if policy.TransportPref == "nvme" && !env.NVMeAvailable { + result.Warnings = append(result.Warnings, + "preset recommends NVMe transport but no NVMe-capable servers are available") + } + + // Step 7: WAL sizing advisory. + // Check engine default against preset recommendation. 
+ if env.WALSizeDefault > 0 && env.WALSizeDefault < policy.WALSizeRecommended { + result.Warnings = append(result.Warnings, + fmt.Sprintf("preset recommends %dMB WAL but engine default is %dMB", + policy.WALSizeRecommended>>20, env.WALSizeDefault>>20)) + } + + result.Policy = policy + return result +} diff --git a/weed/storage/blockvol/preset_test.go b/weed/storage/blockvol/preset_test.go new file mode 100644 index 000000000..0dec52b6b --- /dev/null +++ b/weed/storage/blockvol/preset_test.go @@ -0,0 +1,185 @@ +package blockvol + +import ( + "strings" + "testing" +) + +func TestResolvePolicy_DatabaseDefaults(t *testing.T) { + r := ResolvePolicy(PresetDatabase, "", 0, "", EnvironmentInfo{ + NVMeAvailable: true, ServerCount: 3, WALSizeDefault: 128 << 20, BlockSizeDefault: 4096, + }) + if len(r.Errors) > 0 { + t.Fatalf("unexpected errors: %v", r.Errors) + } + p := r.Policy + if p.DurabilityMode != "sync_all" { + t.Errorf("durability_mode = %q, want sync_all", p.DurabilityMode) + } + if p.ReplicaFactor != 2 { + t.Errorf("replica_factor = %d, want 2", p.ReplicaFactor) + } + if p.DiskType != "ssd" { + t.Errorf("disk_type = %q, want ssd", p.DiskType) + } + if p.TransportPref != "nvme" { + t.Errorf("transport_preference = %q, want nvme", p.TransportPref) + } + if p.WALSizeRecommended != 128<<20 { + t.Errorf("wal_size_recommended = %d, want %d", p.WALSizeRecommended, 128<<20) + } + if p.WorkloadHint != "database" { + t.Errorf("workload_hint = %q, want database", p.WorkloadHint) + } + if p.StorageProfile != "single" { + t.Errorf("storage_profile = %q, want single", p.StorageProfile) + } + if len(r.Overrides) != 0 { + t.Errorf("overrides = %v, want empty", r.Overrides) + } +} + +func TestResolvePolicy_GeneralDefaults(t *testing.T) { + r := ResolvePolicy(PresetGeneral, "", 0, "", EnvironmentInfo{ + ServerCount: 2, WALSizeDefault: 64 << 20, BlockSizeDefault: 4096, + }) + if len(r.Errors) > 0 { + t.Fatalf("unexpected errors: %v", r.Errors) + } + p := r.Policy + if 
p.DurabilityMode != "best_effort" {
		t.Errorf("durability_mode = %q, want best_effort", p.DurabilityMode)
	}
	if p.ReplicaFactor != 2 {
		t.Errorf("replica_factor = %d, want 2", p.ReplicaFactor)
	}
	if p.DiskType != "" {
		t.Errorf("disk_type = %q, want empty", p.DiskType)
	}
	if p.TransportPref != "iscsi" {
		t.Errorf("transport_preference = %q, want iscsi", p.TransportPref)
	}
	if p.WALSizeRecommended != 64<<20 {
		t.Errorf("wal_size_recommended = %d, want %d", p.WALSizeRecommended, 64<<20)
	}
}

// TestResolvePolicy_ThroughputDefaults verifies the "throughput" preset:
// best-effort durability, a doubled (128MB) WAL recommendation, and the
// "throughput" workload hint.
func TestResolvePolicy_ThroughputDefaults(t *testing.T) {
	r := ResolvePolicy(PresetThroughput, "", 0, "", EnvironmentInfo{
		ServerCount: 2, WALSizeDefault: 64 << 20, BlockSizeDefault: 4096,
	})
	if len(r.Errors) > 0 {
		t.Fatalf("unexpected errors: %v", r.Errors)
	}
	p := r.Policy
	if p.DurabilityMode != "best_effort" {
		t.Errorf("durability_mode = %q, want best_effort", p.DurabilityMode)
	}
	if p.WALSizeRecommended != 128<<20 {
		t.Errorf("wal_size_recommended = %d, want %d", p.WALSizeRecommended, 128<<20)
	}
	if p.WorkloadHint != "throughput" {
		t.Errorf("workload_hint = %q, want throughput", p.WorkloadHint)
	}
}

// TestResolvePolicy_OverrideDurability verifies that an explicit durability
// argument wins over the preset default and is recorded in r.Overrides.
func TestResolvePolicy_OverrideDurability(t *testing.T) {
	r := ResolvePolicy(PresetDatabase, "best_effort", 0, "", EnvironmentInfo{
		NVMeAvailable: true, ServerCount: 2, WALSizeDefault: 128 << 20, BlockSizeDefault: 4096,
	})
	if len(r.Errors) > 0 {
		t.Fatalf("unexpected errors: %v", r.Errors)
	}
	if r.Policy.DurabilityMode != "best_effort" {
		t.Errorf("durability_mode = %q, want best_effort (override)", r.Policy.DurabilityMode)
	}
	if !containsStr(r.Overrides, "durability_mode") {
		t.Errorf("overrides = %v, want durability_mode present", r.Overrides)
	}
}

// TestResolvePolicy_OverrideRF verifies that an explicit replica factor wins
// over the preset default and is recorded in r.Overrides.
func TestResolvePolicy_OverrideRF(t *testing.T) {
	r := ResolvePolicy(PresetGeneral, "", 3, "", EnvironmentInfo{
		ServerCount: 3, WALSizeDefault: 64 << 20, BlockSizeDefault: 4096,
	})
	if len(r.Errors) > 0 {
		t.Fatalf("unexpected errors: %v", r.Errors)
	}
	if r.Policy.ReplicaFactor != 3 {
		t.Errorf("replica_factor = %d, want 3 (override)", r.Policy.ReplicaFactor)
	}
	if !containsStr(r.Overrides, "replica_factor") {
		t.Errorf("overrides = %v, want replica_factor present", r.Overrides)
	}
}

// TestResolvePolicy_NoPreset_SystemDefaults verifies resolution with no
// preset at all: system defaults apply and Policy.Preset stays empty.
func TestResolvePolicy_NoPreset_SystemDefaults(t *testing.T) {
	r := ResolvePolicy("", "", 0, "", EnvironmentInfo{
		ServerCount: 2, WALSizeDefault: 64 << 20, BlockSizeDefault: 4096,
	})
	if len(r.Errors) > 0 {
		t.Fatalf("unexpected errors: %v", r.Errors)
	}
	p := r.Policy
	if p.DurabilityMode != "best_effort" {
		t.Errorf("durability_mode = %q, want best_effort", p.DurabilityMode)
	}
	if p.ReplicaFactor != 2 {
		t.Errorf("replica_factor = %d, want 2", p.ReplicaFactor)
	}
	if p.TransportPref != "iscsi" {
		t.Errorf("transport_preference = %q, want iscsi", p.TransportPref)
	}
	if p.Preset != "" {
		t.Errorf("preset = %q, want empty", p.Preset)
	}
}

// TestResolvePolicy_InvalidPreset verifies that an unknown preset name is
// rejected with an "unknown preset" error.
func TestResolvePolicy_InvalidPreset(t *testing.T) {
	r := ResolvePolicy("nosuch", "", 0, "", EnvironmentInfo{})
	if len(r.Errors) == 0 {
		t.Fatal("expected error for unknown preset")
	}
	if !strings.Contains(r.Errors[0], "unknown preset") {
		t.Errorf("error = %q, want to contain 'unknown preset'", r.Errors[0])
	}
}

// TestResolvePolicy_IncompatibleCombo verifies that sync_quorum durability
// with replica_factor=2 is rejected (quorum needs RF >= 3).
func TestResolvePolicy_IncompatibleCombo(t *testing.T) {
	r := ResolvePolicy("", "sync_quorum", 2, "", EnvironmentInfo{
		ServerCount: 2, WALSizeDefault: 64 << 20, BlockSizeDefault: 4096,
	})
	if len(r.Errors) == 0 {
		t.Fatal("expected error for sync_quorum + RF=2")
	}
	if !strings.Contains(r.Errors[0], "replica_factor >= 3") {
		t.Errorf("error = %q, want sync_quorum RF constraint", r.Errors[0])
	}
}

// TestResolvePolicy_WALWarning verifies that the database preset emits a
// sizing warning when it recommends a 128MB WAL but the environment default
// is only 64MB.
func TestResolvePolicy_WALWarning(t *testing.T) {
	r := ResolvePolicy(PresetDatabase, "", 0, "", EnvironmentInfo{
		NVMeAvailable: true, ServerCount: 2, WALSizeDefault: 64 << 20, BlockSizeDefault: 4096,
	})
	if len(r.Errors) > 0 {
		t.Fatalf("unexpected errors: %v", r.Errors)
	}
	found := false
	for _, w := range r.Warnings {
		if strings.Contains(w, "128MB WAL") && strings.Contains(w, "64MB") {
			found = true
		}
	}
	if !found {
		t.Errorf("expected WAL sizing warning, got warnings: %v", r.Warnings)
	}
}

// containsStr reports whether target occurs in ss.
func containsStr(ss []string, target string) bool {
	for _, s := range ss {
		if s == target {
			return true
		}
	}
	return false
}