Browse Source

feat: CP11B-1 provisioning presets + review fixes

Preset system: ResolvePolicy resolves named presets (database, general,
throughput) with per-field overrides into concrete volume parameters.
Create path now uses resolved policy instead of ad-hoc validation.
New /block/volume/resolve diagnostic endpoint for dry-run resolution.

Review fix 1 (MED): HasNVMeCapableServer now derives NVMe capability
from server-level heartbeat attribute (block_nvme_addr proto field)
instead of scanning volume entries. Fixes false "no NVMe" warning on
fresh clusters with NVMe-capable servers but no volumes yet.

Review fix 2 (LOW): /block/volume/resolve no longer proxied to leader —
read-only diagnostic endpoint can be served by any master.

Engine fix: ReadLBA retry loop closes stale dirty-map race when WAL
entry is recycled between lookup and read.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
Ping Qiu 1 week ago
parent
commit
683969086c
  1. 2
      weed/pb/master.proto
  2. 18
      weed/pb/master_pb/master.pb.go
  3. 46
      weed/server/master_block_registry.go
  4. 16
      weed/server/master_block_registry_test.go
  5. 2
      weed/server/master_grpc_server.go
  6. 4
      weed/server/master_grpc_server_block_test.go
  7. 1
      weed/server/master_server.go
  8. 65
      weed/server/master_server_handlers_block.go
  9. 590
      weed/server/qa_block_cp11b1_adversarial_test.go
  10. 6
      weed/server/qa_block_cp62_test.go
  11. 2
      weed/server/qa_block_cp63_test.go
  12. 22
      weed/server/qa_block_cp82_adversarial_test.go
  13. 8
      weed/server/qa_block_cp831_adversarial_test.go
  14. 4
      weed/server/qa_block_durability_test.go
  15. 16
      weed/server/qa_block_expand_adversarial_test.go
  16. 22
      weed/server/qa_block_nvme_publication_test.go
  17. 1
      weed/server/volume_grpc_client_to_master.go
  18. 21
      weed/storage/blockvol/blockapi/client.go
  19. 22
      weed/storage/blockvol/blockapi/types.go
  20. 29
      weed/storage/blockvol/blockvol.go
  21. 174
      weed/storage/blockvol/preset.go
  22. 185
      weed/storage/blockvol/preset_test.go

2
weed/pb/master.proto

@ -110,6 +110,8 @@ message Heartbeat {
repeated BlockVolumeShortInfoMessage new_block_volumes = 25;
repeated BlockVolumeShortInfoMessage deleted_block_volumes = 26;
bool has_no_block_volumes = 27;
// server-level NVMe/TCP target address (empty if NVMe disabled on this VS)
string block_nvme_addr = 28;
}
message HeartbeatResponse {

18
weed/pb/master_pb/master.pb.go

@ -54,8 +54,10 @@ type Heartbeat struct {
NewBlockVolumes []*BlockVolumeShortInfoMessage `protobuf:"bytes,25,rep,name=new_block_volumes,json=newBlockVolumes,proto3" json:"new_block_volumes,omitempty"`
DeletedBlockVolumes []*BlockVolumeShortInfoMessage `protobuf:"bytes,26,rep,name=deleted_block_volumes,json=deletedBlockVolumes,proto3" json:"deleted_block_volumes,omitempty"`
HasNoBlockVolumes bool `protobuf:"varint,27,opt,name=has_no_block_volumes,json=hasNoBlockVolumes,proto3" json:"has_no_block_volumes,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
// server-level NVMe/TCP target address (empty if NVMe disabled on this VS)
BlockNvmeAddr string `protobuf:"bytes,28,opt,name=block_nvme_addr,json=blockNvmeAddr,proto3" json:"block_nvme_addr,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *Heartbeat) Reset() {
@ -256,6 +258,13 @@ func (x *Heartbeat) GetHasNoBlockVolumes() bool {
return false
}
// GetBlockNvmeAddr returns the server-level NVMe/TCP target address carried
// in the heartbeat (proto field 28), or "" when the receiver is nil or the
// volume server has NVMe disabled. Nil-receiver safety follows the standard
// protobuf-generated accessor convention.
func (x *Heartbeat) GetBlockNvmeAddr() string {
	if x != nil {
		return x.BlockNvmeAddr
	}
	return ""
}
type HeartbeatResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
VolumeSizeLimit uint64 `protobuf:"varint,1,opt,name=volume_size_limit,json=volumeSizeLimit,proto3" json:"volume_size_limit,omitempty"`
@ -5478,7 +5487,7 @@ var File_master_proto protoreflect.FileDescriptor
const file_master_proto_rawDesc = "" +
"\n" +
"\fmaster.proto\x12\tmaster_pb\x1a\x13volume_server.proto\"\xbd\n" +
"\fmaster.proto\x12\tmaster_pb\x1a\x13volume_server.proto\"\xe5\n" +
"\n" +
"\tHeartbeat\x12\x0e\n" +
"\x02ip\x18\x01 \x01(\tR\x02ip\x12\x12\n" +
@ -5510,7 +5519,8 @@ const file_master_proto_rawDesc = "" +
"\x12block_volume_infos\x18\x18 \x03(\v2!.master_pb.BlockVolumeInfoMessageR\x10blockVolumeInfos\x12R\n" +
"\x11new_block_volumes\x18\x19 \x03(\v2&.master_pb.BlockVolumeShortInfoMessageR\x0fnewBlockVolumes\x12Z\n" +
"\x15deleted_block_volumes\x18\x1a \x03(\v2&.master_pb.BlockVolumeShortInfoMessageR\x13deletedBlockVolumes\x12/\n" +
"\x14has_no_block_volumes\x18\x1b \x01(\bR\x11hasNoBlockVolumes\x1aB\n" +
"\x14has_no_block_volumes\x18\x1b \x01(\bR\x11hasNoBlockVolumes\x12&\n" +
"\x0fblock_nvme_addr\x18\x1c \x01(\tR\rblockNvmeAddr\x1aB\n" +
"\x14MaxVolumeCountsEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
"\x05value\x18\x02 \x01(\rR\x05value:\x028\x01\"\xa9\x03\n" +

46
weed/server/master_block_registry.go

@ -79,6 +79,9 @@ type BlockVolumeEntry struct {
// CP8-3-1: Durability mode.
DurabilityMode string // "best_effort", "sync_all", "sync_quorum"
// CP11B-1: Provisioning preset (control-plane metadata only).
Preset string // "database", "general", "throughput", or ""
// Lease tracking for failover (CP6-3 F2).
LastLeaseGrant time.Time
LeaseTTL time.Duration
@ -137,7 +140,7 @@ type BlockVolumeRegistry struct {
mu sync.RWMutex
volumes map[string]*BlockVolumeEntry // keyed by name
byServer map[string]map[string]bool // server -> set of volume names
blockServers map[string]bool // servers known to support block volumes
blockServers map[string]*blockServerInfo // servers known to support block volumes
// Promotion eligibility: max WAL LSN lag for replica to be promotable.
promotionLSNTolerance uint64
@ -153,12 +156,18 @@ type BlockVolumeRegistry struct {
type inflightEntry struct{}
// blockServerInfo tracks server-level capabilities reported via heartbeat.
// An entry's mere presence in BlockVolumeRegistry.blockServers marks the
// server as alive and block-capable; the fields carry extra capabilities.
type blockServerInfo struct {
	NvmeAddr string // NVMe/TCP listen address; empty if NVMe disabled
}
// NewBlockVolumeRegistry creates an empty registry with the default
// promotion-LSN tolerance applied.
func NewBlockVolumeRegistry() *BlockVolumeRegistry {
	reg := &BlockVolumeRegistry{}
	reg.volumes = make(map[string]*BlockVolumeEntry)
	reg.byServer = make(map[string]map[string]bool)
	reg.blockServers = make(map[string]*blockServerInfo)
	reg.promotionLSNTolerance = DefaultPromotionLSNTolerance
	return reg
}
@ -302,12 +311,12 @@ func (r *BlockVolumeRegistry) ListByServer(server string) []*BlockVolumeEntry {
// Called on the first heartbeat from a volume server.
// Marks reported volumes as Active, removes entries for this server
// that are not reported (stale).
func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master_pb.BlockVolumeInfoMessage) {
func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master_pb.BlockVolumeInfoMessage, nvmeAddr string) {
r.mu.Lock()
defer r.mu.Unlock()
// Mark server as block-capable since it sent block volume info.
r.blockServers[server] = true
// Mark server as block-capable and record server-level NVMe capability.
r.blockServers[server] = &blockServerInfo{NvmeAddr: nvmeAddr}
// Build set of reported paths.
reported := make(map[string]*master_pb.BlockVolumeInfoMessage, len(infos))
@ -922,7 +931,7 @@ func (r *BlockVolumeRegistry) evaluatePromotionLocked(entry *BlockVolumeEntry) P
continue
}
// Gate 4: server must be alive (in blockServers set) — B-12 fix.
if !r.blockServers[ri.Server] {
if r.blockServers[ri.Server] == nil {
result.Rejections = append(result.Rejections, PromotionRejection{
Server: ri.Server,
Reason: "server_dead",
@ -1073,7 +1082,7 @@ func (r *BlockVolumeRegistry) evaluateManualPromotionLocked(entry *BlockVolumeEn
}
// Primary-alive gate (soft — skipped when force=true).
if !force && r.blockServers[entry.VolumeServer] {
if !force && r.blockServers[entry.VolumeServer] != nil {
result.Reason = "primary_alive"
return result
}
@ -1152,7 +1161,7 @@ func (r *BlockVolumeRegistry) evaluateManualPromotionLocked(entry *BlockVolumeEn
}
// Hard gate: server must be alive (in blockServers set).
if !r.blockServers[ri.Server] {
if r.blockServers[ri.Server] == nil {
result.Rejections = append(result.Rejections, PromotionRejection{
Server: ri.Server,
Reason: "server_dead",
@ -1227,7 +1236,9 @@ func (r *BlockVolumeRegistry) ManualPromote(name, targetServer string, force boo
// MarkBlockCapable records that the given server supports block volumes.
// An already-known server is left untouched so that capability details set
// by a heartbeat (e.g. the NVMe address) are never clobbered by this call.
func (r *BlockVolumeRegistry) MarkBlockCapable(server string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, known := r.blockServers[server]; known {
		return
	}
	r.blockServers[server] = &blockServerInfo{}
}
@ -1330,7 +1341,7 @@ func (r *BlockVolumeRegistry) ServerSummaries() []BlockServerSummary {
// IsBlockCapable reports whether the given server is currently known to
// support block volumes (i.e. has a live entry in the registry).
func (r *BlockVolumeRegistry) IsBlockCapable(server string) bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	info := r.blockServers[server]
	return info != nil
}
// VolumesWithDeadPrimary returns names of volumes where the given server is a replica
@ -1354,13 +1365,26 @@ func (r *BlockVolumeRegistry) VolumesWithDeadPrimary(replicaServer string) []str
continue
}
// Check if the primary server is dead.
if !r.blockServers[entry.VolumeServer] {
if r.blockServers[entry.VolumeServer] == nil {
orphaned = append(orphaned, name)
}
}
return orphaned
}
// HasNVMeCapableServer returns true if any registered block-capable server
// has reported a non-empty NVMe address via heartbeat.
func (r *BlockVolumeRegistry) HasNVMeCapableServer() bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	for _, srv := range r.blockServers {
		if srv == nil {
			continue
		}
		if srv.NvmeAddr != "" {
			return true
		}
	}
	return false
}
// BlockCapableServers returns the list of servers known to support block volumes.
func (r *BlockVolumeRegistry) BlockCapableServers() []string {
r.mu.RLock()

16
weed/server/master_block_registry_test.go

@ -93,7 +93,7 @@ func TestRegistry_UpdateFullHeartbeat(t *testing.T) {
// Full heartbeat reports only vol1 (vol2 is stale).
r.UpdateFullHeartbeat("s1", []*master_pb.BlockVolumeInfoMessage{
{Path: "/v1.blk", Epoch: 5, Role: 1},
})
}, "")
// vol1 should be Active.
e1, ok := r.Lookup("vol1")
@ -149,7 +149,7 @@ func TestRegistry_PendingToActive(t *testing.T) {
// Full heartbeat confirms the volume.
r.UpdateFullHeartbeat("s1", []*master_pb.BlockVolumeInfoMessage{
{Path: "/v1.blk", Epoch: 1, Role: 1},
})
}, "")
e, _ := r.Lookup("vol1")
if e.Status != StatusActive {
@ -241,7 +241,7 @@ func TestRegistry_FullHeartbeatUpdatesSizeBytes(t *testing.T) {
// Heartbeat with updated size (online resize).
r.UpdateFullHeartbeat("s1", []*master_pb.BlockVolumeInfoMessage{
{Path: "/v1.blk", VolumeSize: 2 << 30, Epoch: 1, Role: 1},
})
}, "")
e, _ := r.Lookup("vol1")
if e.SizeBytes != 2<<30 {
@ -421,7 +421,7 @@ func TestFullHeartbeat_UpdatesReplicaAddrs(t *testing.T) {
ReplicaDataAddr: "10.0.0.2:14260",
ReplicaCtrlAddr: "10.0.0.2:14261",
},
})
}, "")
entry, ok := r.Lookup("vol1")
if !ok {
@ -730,7 +730,7 @@ func TestRegistry_FullHeartbeat_UpdatesHealthScore(t *testing.T) {
ScrubErrors: 2,
WalHeadLsn: 500,
},
})
}, "")
e, _ := r.Lookup("vol1")
if e.HealthScore != 0.85 {
@ -757,7 +757,7 @@ func TestRegistry_ReplicaHeartbeat_DoesNotDeleteVolume(t *testing.T) {
// Replica sends heartbeat reporting its path.
r.UpdateFullHeartbeat("replica1", []*master_pb.BlockVolumeInfoMessage{
{Path: "/data/vol1.blk", Epoch: 1, Role: 2},
})
}, "")
// Volume must still exist with primary intact.
e, ok := r.Lookup("vol1")
@ -784,7 +784,7 @@ func TestRegistry_ReplicaHeartbeat_StaleReplicaRemoved(t *testing.T) {
})
// replica1 heartbeat does NOT report vol1 path → stale replica.
r.UpdateFullHeartbeat("replica1", []*master_pb.BlockVolumeInfoMessage{})
r.UpdateFullHeartbeat("replica1", []*master_pb.BlockVolumeInfoMessage{}, "")
// Volume still exists, but replica1 removed.
e, ok := r.Lookup("vol1")
@ -813,7 +813,7 @@ func TestRegistry_ReplicaHeartbeat_ReconstructsAfterRestart(t *testing.T) {
// Replica heartbeat arrives — vol1 exists but has no record of this server.
r.UpdateFullHeartbeat("replica1", []*master_pb.BlockVolumeInfoMessage{
{Path: "/data/vol1.blk", Epoch: 1, Role: 2, HealthScore: 0.95, WalHeadLsn: 42},
})
}, "")
// vol1 should now have replica1 in Replicas[].
e, ok := r.Lookup("vol1")

2
weed/server/master_grpc_server.go

@ -277,7 +277,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
// (BlockVolumeInfos on first heartbeat) or deltas (NewBlockVolumes/DeletedBlockVolumes
// on subsequent heartbeats), never both in the same message.
if len(heartbeat.BlockVolumeInfos) > 0 || heartbeat.HasNoBlockVolumes {
ms.blockRegistry.UpdateFullHeartbeat(dn.Url(), heartbeat.BlockVolumeInfos)
ms.blockRegistry.UpdateFullHeartbeat(dn.Url(), heartbeat.BlockVolumeInfos, heartbeat.BlockNvmeAddr)
// T2 (B-06): After updating registry from heartbeat, check if this server
// is a replica for any volume whose primary is dead. If so, promote.
ms.reevaluateOrphanedPrimaries(dn.Url())

4
weed/server/master_grpc_server_block_test.go

@ -1542,7 +1542,7 @@ func TestMaster_ExpandCoordinated_HeartbeatSuppressedAfterPartialCommit(t *testi
Epoch: 1,
Role: 1,
},
})
}, "")
// Registry size must still be the OLD size — heartbeat must not leak the new size.
entry, _ = ms.blockRegistry.Lookup("hb-vol")
@ -1778,7 +1778,7 @@ func TestMaster_ExpandCoordinated_B10_HeartbeatDoesNotDeleteDuringExpand(t *test
// the entry from the registry.
ms.blockRegistry.UpdateFullHeartbeat(primaryServer, []*master_pb.BlockVolumeInfoMessage{
// Empty: primary restarted and hasn't loaded this volume yet.
})
}, "")
// Entry must still exist — expand is in progress.
_, ok := ms.blockRegistry.Lookup("b10-vol")

1
weed/server/master_server.go

@ -227,6 +227,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
r.HandleFunc("/block/volume/{name}", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeDeleteHandler)))).Methods("DELETE")
r.HandleFunc("/block/volume/{name}", ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeLookupHandler))).Methods("GET")
r.HandleFunc("/block/volumes", ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeListHandler))).Methods("GET")
r.HandleFunc("/block/volume/resolve", ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeResolveHandler))).Methods("POST")
r.HandleFunc("/block/volume/{name}/expand", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeExpandHandler)))).Methods("POST")
r.HandleFunc("/block/volume/{name}/preflight", ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumePreflightHandler))).Methods("GET")
r.HandleFunc("/block/volume/{name}/promote", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumePromoteHandler)))).Methods("POST")

65
weed/server/master_server_handlers_block.go

@ -13,6 +13,16 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol/blockapi"
)
// buildEnvironmentInfo constructs a blockvol.EnvironmentInfo from registry state.
// NVMe availability and server count come from heartbeat-derived registry data;
// WAL and block sizes are the engine defaults used when no preset overrides them.
func (ms *MasterServer) buildEnvironmentInfo() blockvol.EnvironmentInfo {
	var env blockvol.EnvironmentInfo
	env.NVMeAvailable = ms.blockRegistry.HasNVMeCapableServer()
	env.ServerCount = len(ms.blockRegistry.BlockCapableServers())
	env.WALSizeDefault = 64 << 20 // engine default
	env.BlockSizeDefault = 4096   // engine default
	return env
}
// blockVolumeCreateHandler handles POST /block/volume.
func (ms *MasterServer) blockVolumeCreateHandler(w http.ResponseWriter, r *http.Request) {
var req blockapi.CreateVolumeRequest
@ -27,29 +37,32 @@ func (ms *MasterServer) blockVolumeCreateHandler(w http.ResponseWriter, r *http.
replicaPlacement = "000"
}
// Pre-validate durability_mode (cosmetic — real validation is in gRPC handler).
if req.DurabilityMode != "" {
if _, perr := blockvol.ParseDurabilityMode(req.DurabilityMode); perr != nil {
writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("invalid durability_mode: %w", perr))
return
}
// Resolve preset + overrides.
env := ms.buildEnvironmentInfo()
resolved := blockvol.ResolvePolicy(blockvol.PresetName(req.Preset),
req.DurabilityMode, req.ReplicaFactor, req.DiskType, env)
if len(resolved.Errors) > 0 {
writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("%s", resolved.Errors[0]))
return
}
// Use resolved values for the gRPC call.
resp, err := ms.CreateBlockVolume(r.Context(), &master_pb.CreateBlockVolumeRequest{
Name: req.Name,
SizeBytes: req.SizeBytes,
DiskType: req.DiskType,
DurabilityMode: req.DurabilityMode,
ReplicaFactor: uint32(req.ReplicaFactor),
DiskType: resolved.Policy.DiskType,
DurabilityMode: resolved.Policy.DurabilityMode,
ReplicaFactor: uint32(resolved.Policy.ReplicaFactor),
})
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
// Store replica_placement on the registry entry.
// Store replica_placement and preset on the registry entry.
if entry, ok := ms.blockRegistry.Lookup(resp.VolumeId); ok {
entry.ReplicaPlacement = replicaPlacement
entry.Preset = req.Preset
}
// Look up the full entry to populate all fields.
@ -67,6 +80,37 @@ func (ms *MasterServer) blockVolumeCreateHandler(w http.ResponseWriter, r *http.
writeJsonQuiet(w, r, http.StatusOK, info)
}
// blockVolumeResolveHandler handles POST /block/volume/resolve.
// Diagnostic dry-run endpoint: resolves a preset plus per-field overrides into
// a concrete policy without creating anything. Always returns 200, even when
// errors[] is non-empty, so callers can inspect the full resolution outcome.
// Served by any master (not proxied to the leader) — it is read-only.
func (ms *MasterServer) blockVolumeResolveHandler(w http.ResponseWriter, r *http.Request) {
	// Fix: bound the request body. The payload is a small JSON document;
	// without a cap an oversized body would be read without limit.
	r.Body = http.MaxBytesReader(w, r.Body, 1<<20)
	var req blockapi.CreateVolumeRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("invalid request body: %w", err))
		return
	}
	env := ms.buildEnvironmentInfo()
	resolved := blockvol.ResolvePolicy(blockvol.PresetName(req.Preset),
		req.DurabilityMode, req.ReplicaFactor, req.DiskType, env)
	resp := blockapi.ResolvedPolicyResponse{
		Policy: blockapi.ResolvedPolicyView{
			Preset:              string(resolved.Policy.Preset),
			DurabilityMode:      resolved.Policy.DurabilityMode,
			ReplicaFactor:       resolved.Policy.ReplicaFactor,
			DiskType:            resolved.Policy.DiskType,
			TransportPreference: resolved.Policy.TransportPref,
			WorkloadHint:        resolved.Policy.WorkloadHint,
			WALSizeRecommended:  resolved.Policy.WALSizeRecommended,
			StorageProfile:      resolved.Policy.StorageProfile,
		},
		Overrides: resolved.Overrides,
		Warnings:  resolved.Warnings,
		Errors:    resolved.Errors,
	}
	writeJsonQuiet(w, r, http.StatusOK, resp)
}
// blockVolumeDeleteHandler handles DELETE /block/volume/{name}.
func (ms *MasterServer) blockVolumeDeleteHandler(w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]
@ -333,6 +377,7 @@ func entryToVolumeInfo(e *BlockVolumeEntry) blockapi.VolumeInfo {
HealthScore: e.HealthScore,
ReplicaDegraded: e.ReplicaDegraded,
DurabilityMode: durMode,
Preset: e.Preset,
NvmeAddr: e.NvmeAddr,
NQN: e.NQN,
}

590
weed/server/qa_block_cp11b1_adversarial_test.go

@ -0,0 +1,590 @@
package weed_server
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol/blockapi"
)
// ============================================================
// CP11B-1 QA Adversarial Tests
//
// Provisioning Presets + Resolved Policy View
// ============================================================
// qaPresetMaster creates a MasterServer with stub allocators for preset tests.
func qaPresetMaster(t *testing.T) *MasterServer {
t.Helper()
ms := &MasterServer{
blockRegistry: NewBlockVolumeRegistry(),
blockAssignmentQueue: NewBlockAssignmentQueue(),
blockFailover: newBlockFailoverState(),
}
ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) {
return &blockAllocResult{
Path: fmt.Sprintf("/data/%s.blk", name),
IQN: fmt.Sprintf("iqn.2024.test:%s", name),
ISCSIAddr: server + ":3260",
ReplicaDataAddr: server + ":14260",
ReplicaCtrlAddr: server + ":14261",
RebuildListenAddr: server + ":15000",
}, nil
}
ms.blockVSDelete = func(ctx context.Context, server string, name string) error {
return nil
}
ms.blockVSExpand = func(ctx context.Context, server string, name string, newSize uint64) (uint64, error) {
return newSize, nil
}
ms.blockVSPrepareExpand = func(ctx context.Context, server string, name string, newSize, expandEpoch uint64) error {
return nil
}
ms.blockVSCommitExpand = func(ctx context.Context, server string, name string, expandEpoch uint64) (uint64, error) {
return 2 << 30, nil
}
ms.blockVSCancelExpand = func(ctx context.Context, server string, name string, expandEpoch uint64) error {
return nil
}
ms.blockRegistry.MarkBlockCapable("vs1:9333")
ms.blockRegistry.MarkBlockCapable("vs2:9333")
return ms
}
// --- QA-CP11B1-1: Database preset produces correct defaults ---
func TestQA_CP11B1_DatabasePreset_Defaults(t *testing.T) {
r := blockvol.ResolvePolicy(blockvol.PresetDatabase, "", 0, "", blockvol.EnvironmentInfo{
NVMeAvailable: true, ServerCount: 3, WALSizeDefault: 128 << 20, BlockSizeDefault: 4096,
})
if len(r.Errors) > 0 {
t.Fatalf("unexpected errors: %v", r.Errors)
}
p := r.Policy
if p.DurabilityMode != "sync_all" {
t.Errorf("durability_mode = %q, want sync_all", p.DurabilityMode)
}
if p.ReplicaFactor != 2 {
t.Errorf("replica_factor = %d, want 2", p.ReplicaFactor)
}
if p.DiskType != "ssd" {
t.Errorf("disk_type = %q, want ssd", p.DiskType)
}
if p.TransportPref != "nvme" {
t.Errorf("transport = %q, want nvme", p.TransportPref)
}
if p.WALSizeRecommended != 128<<20 {
t.Errorf("wal_rec = %d, want %d", p.WALSizeRecommended, 128<<20)
}
if p.StorageProfile != "single" {
t.Errorf("storage_profile = %q, want single", p.StorageProfile)
}
if len(r.Overrides) != 0 {
t.Errorf("overrides = %v, want empty", r.Overrides)
}
}
// --- QA-CP11B1-2: Override precedence — durability wins over preset ---
func TestQA_CP11B1_OverridePrecedence_DurabilityWins(t *testing.T) {
r := blockvol.ResolvePolicy(blockvol.PresetDatabase, "best_effort", 0, "", blockvol.EnvironmentInfo{
NVMeAvailable: true, ServerCount: 2, WALSizeDefault: 128 << 20, BlockSizeDefault: 4096,
})
if len(r.Errors) > 0 {
t.Fatalf("unexpected errors: %v", r.Errors)
}
if r.Policy.DurabilityMode != "best_effort" {
t.Errorf("durability_mode = %q, want best_effort (override)", r.Policy.DurabilityMode)
}
found := false
for _, o := range r.Overrides {
if o == "durability_mode" {
found = true
}
}
if !found {
t.Errorf("overrides = %v, want durability_mode present", r.Overrides)
}
}
// --- QA-CP11B1-3: Invalid preset rejected ---
func TestQA_CP11B1_InvalidPreset_Rejected(t *testing.T) {
r := blockvol.ResolvePolicy("nosuch", "", 0, "", blockvol.EnvironmentInfo{})
if len(r.Errors) == 0 {
t.Fatal("expected error for unknown preset")
}
if !strings.Contains(r.Errors[0], "unknown preset") {
t.Errorf("error = %q, want to contain 'unknown preset'", r.Errors[0])
}
}
// --- QA-CP11B1-4: sync_quorum + RF=2 rejected ---
func TestQA_CP11B1_SyncQuorum_RF2_Rejected(t *testing.T) {
r := blockvol.ResolvePolicy("", "sync_quorum", 2, "", blockvol.EnvironmentInfo{
ServerCount: 3, WALSizeDefault: 64 << 20, BlockSizeDefault: 4096,
})
if len(r.Errors) == 0 {
t.Fatal("expected error for sync_quorum + RF=2")
}
if !strings.Contains(r.Errors[0], "replica_factor >= 3") {
t.Errorf("error = %q, want sync_quorum RF constraint", r.Errors[0])
}
}
// --- QA-CP11B1-5: NVMe preferred but unavailable → warning ---
func TestQA_CP11B1_NVMePref_NoNVMe_Warning(t *testing.T) {
r := blockvol.ResolvePolicy(blockvol.PresetDatabase, "", 0, "", blockvol.EnvironmentInfo{
NVMeAvailable: false, ServerCount: 2, WALSizeDefault: 128 << 20, BlockSizeDefault: 4096,
})
if len(r.Errors) > 0 {
t.Fatalf("unexpected errors: %v", r.Errors)
}
found := false
for _, w := range r.Warnings {
if strings.Contains(w, "NVMe") {
found = true
}
}
if !found {
t.Errorf("expected NVMe warning, got: %v", r.Warnings)
}
}
// --- QA-CP11B1-6: No preset, explicit fields → backward compat ---
func TestQA_CP11B1_NoPreset_BackwardCompat(t *testing.T) {
r := blockvol.ResolvePolicy("", "sync_all", 2, "hdd", blockvol.EnvironmentInfo{
ServerCount: 2, WALSizeDefault: 64 << 20, BlockSizeDefault: 4096,
})
if len(r.Errors) > 0 {
t.Fatalf("unexpected errors: %v", r.Errors)
}
if r.Policy.Preset != "" {
t.Errorf("preset = %q, want empty", r.Policy.Preset)
}
if r.Policy.DurabilityMode != "sync_all" {
t.Errorf("durability_mode = %q, want sync_all", r.Policy.DurabilityMode)
}
if r.Policy.ReplicaFactor != 2 {
t.Errorf("replica_factor = %d, want 2", r.Policy.ReplicaFactor)
}
if r.Policy.DiskType != "hdd" {
t.Errorf("disk_type = %q, want hdd", r.Policy.DiskType)
}
}
// --- QA-CP11B1-7: Resolve endpoint returns 200 with correct fields ---
func TestQA_CP11B1_ResolveHandler_HTTP(t *testing.T) {
ms := qaPresetMaster(t)
body, _ := json.Marshal(blockapi.CreateVolumeRequest{
Preset: "database",
})
req := httptest.NewRequest(http.MethodPost, "/block/volume/resolve", bytes.NewReader(body))
w := httptest.NewRecorder()
ms.blockVolumeResolveHandler(w, req)
if w.Code != http.StatusOK {
t.Fatalf("status = %d, want 200; body: %s", w.Code, w.Body.String())
}
var resp blockapi.ResolvedPolicyResponse
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
t.Fatalf("decode: %v", err)
}
if resp.Policy.DurabilityMode != "sync_all" {
t.Errorf("durability_mode = %q, want sync_all", resp.Policy.DurabilityMode)
}
if resp.Policy.ReplicaFactor != 2 {
t.Errorf("replica_factor = %d, want 2", resp.Policy.ReplicaFactor)
}
if resp.Policy.StorageProfile != "single" {
t.Errorf("storage_profile = %q, want single", resp.Policy.StorageProfile)
}
if resp.Policy.TransportPreference != "nvme" {
t.Errorf("transport = %q, want nvme", resp.Policy.TransportPreference)
}
}
// --- QA-CP11B1-8: Create with preset stores preset on registry entry ---
func TestQA_CP11B1_CreateWithPreset_StoresPreset(t *testing.T) {
ms := qaPresetMaster(t)
ctx := context.Background()
_, err := ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
Name: "preset-vol",
SizeBytes: 1 << 30,
DurabilityMode: "best_effort",
ReplicaFactor: 2,
})
if err != nil {
t.Fatalf("create: %v", err)
}
// Simulate what the HTTP handler does after create.
if entry, ok := ms.blockRegistry.Lookup("preset-vol"); ok {
entry.Preset = "general"
}
entry, ok := ms.blockRegistry.Lookup("preset-vol")
if !ok {
t.Fatal("volume not found in registry")
}
if entry.Preset != "general" {
t.Errorf("preset = %q, want general", entry.Preset)
}
// Verify entryToVolumeInfo propagates preset.
info := entryToVolumeInfo(entry)
if info.Preset != "general" {
t.Errorf("VolumeInfo.Preset = %q, want general", info.Preset)
}
}
// --- QA-CP11B1-9: RF exceeds servers → warning ---
func TestQA_CP11B1_RF_ExceedsServers_Warning(t *testing.T) {
r := blockvol.ResolvePolicy(blockvol.PresetGeneral, "", 3, "", blockvol.EnvironmentInfo{
ServerCount: 2, WALSizeDefault: 64 << 20, BlockSizeDefault: 4096,
})
if len(r.Errors) > 0 {
t.Fatalf("unexpected errors: %v", r.Errors)
}
found := false
for _, w := range r.Warnings {
if strings.Contains(w, "exceeds available servers") {
found = true
}
}
if !found {
t.Errorf("expected RF>servers warning, got: %v", r.Warnings)
}
}
// --- QA-CP11B1-10: All response fields present and typed correctly ---
func TestQA_CP11B1_StableOutputFields(t *testing.T) {
r := blockvol.ResolvePolicy(blockvol.PresetDatabase, "", 0, "", blockvol.EnvironmentInfo{
NVMeAvailable: true, ServerCount: 3, WALSizeDefault: 128 << 20, BlockSizeDefault: 4096,
})
// Verify every field in VolumePolicy is populated.
p := r.Policy
if p.Preset != blockvol.PresetDatabase {
t.Errorf("preset = %q", p.Preset)
}
if p.DurabilityMode == "" {
t.Error("durability_mode is empty")
}
if p.ReplicaFactor == 0 {
t.Error("replica_factor is 0")
}
if p.TransportPref == "" {
t.Error("transport_preference is empty")
}
if p.WorkloadHint == "" {
t.Error("workload_hint is empty")
}
if p.WALSizeRecommended == 0 {
t.Error("wal_size_recommended is 0")
}
if p.StorageProfile == "" {
t.Error("storage_profile is empty")
}
// Verify JSON round-trip preserves all fields.
data, err := json.Marshal(r)
if err != nil {
t.Fatalf("marshal: %v", err)
}
var r2 blockvol.ResolvedPolicy
if err := json.Unmarshal(data, &r2); err != nil {
t.Fatalf("unmarshal: %v", err)
}
if r2.Policy.DurabilityMode != p.DurabilityMode {
t.Errorf("round-trip durability = %q, want %q", r2.Policy.DurabilityMode, p.DurabilityMode)
}
if r2.Policy.ReplicaFactor != p.ReplicaFactor {
t.Errorf("round-trip RF = %d, want %d", r2.Policy.ReplicaFactor, p.ReplicaFactor)
}
}
// --- QA-CP11B1-11: Create + Resolve parity ---
// Create with preset + overrides produces the same effective values
// as /resolve for the same request. Catches drift between resolve and create.
// TestQA_CP11B1_CreateResolve_Parity verifies that create-with-preset and
// /resolve agree: creating a volume from the resolved values yields a registry
// entry whose effective durability and RF match the resolution. Catches drift
// between the resolve and create code paths.
func TestQA_CP11B1_CreateResolve_Parity(t *testing.T) {
	ms := qaPresetMaster(t)
	// Resolve first, using the same preset + overrides a client would send.
	env := ms.buildEnvironmentInfo()
	resolved := blockvol.ResolvePolicy(blockvol.PresetDatabase, "best_effort", 0, "", env)
	if len(resolved.Errors) > 0 {
		t.Fatalf("resolve errors: %v", resolved.Errors)
	}
	// Create using the same resolved values (simulating what the handler does).
	if _, err := ms.CreateBlockVolume(context.Background(), &master_pb.CreateBlockVolumeRequest{
		Name:           "parity-vol",
		SizeBytes:      1 << 30,
		DiskType:       resolved.Policy.DiskType,
		DurabilityMode: resolved.Policy.DurabilityMode,
		ReplicaFactor:  uint32(resolved.Policy.ReplicaFactor),
	}); err != nil {
		t.Fatalf("create: %v", err)
	}
	entry, ok := ms.blockRegistry.Lookup("parity-vol")
	if !ok {
		t.Fatal("volume not found")
	}
	// Check parity: the created volume should have the resolved durability + RF.
	if entry.DurabilityMode != resolved.Policy.DurabilityMode {
		t.Errorf("durability: entry=%q, resolved=%q", entry.DurabilityMode, resolved.Policy.DurabilityMode)
	}
	if entry.ReplicaFactor != resolved.Policy.ReplicaFactor {
		t.Errorf("RF: entry=%d, resolved=%d", entry.ReplicaFactor, resolved.Policy.ReplicaFactor)
	}
}
// ============================================================
// CP11B-1 Review Round: Additional Adversarial Tests
// ============================================================
// QA-CP11B1-12: NVMe capability from heartbeat, not volumes.
// On a fresh cluster with NVMe-capable servers but no volumes,
// database preset should NOT warn about missing NVMe.
// TestQA_CP11B1_NVMeFromHeartbeat_FreshCluster: NVMe capability comes from
// the heartbeat, not from volume entries. On a fresh cluster with an
// NVMe-capable server and zero volumes, the database preset must not warn
// about missing NVMe.
func TestQA_CP11B1_NVMeFromHeartbeat_FreshCluster(t *testing.T) {
	reg := NewBlockVolumeRegistry()
	// Simulate heartbeat from NVMe-capable server (no volumes created yet).
	reg.UpdateFullHeartbeat("vs1:9333", nil, "192.168.1.10:4420")
	if !reg.HasNVMeCapableServer() {
		t.Fatal("HasNVMeCapableServer should be true after heartbeat with NVMe addr")
	}
	// Resolve database preset — should NOT warn about NVMe.
	resolved := blockvol.ResolvePolicy(blockvol.PresetDatabase, "", 0, "", blockvol.EnvironmentInfo{
		NVMeAvailable:  reg.HasNVMeCapableServer(),
		ServerCount:    1,
		WALSizeDefault: 128 << 20,
	})
	for _, warning := range resolved.Warnings {
		if strings.Contains(warning, "NVMe") {
			t.Fatalf("should NOT warn about NVMe on fresh cluster with NVMe server: %s", warning)
		}
	}
}
// QA-CP11B1-13: NVMe capability lost after server unmark.
// When NVMe server disconnects, HasNVMeCapableServer should return false.
// TestQA_CP11B1_NVMeLostAfterUnmark: once the only NVMe-capable server is
// unmarked (disconnected), HasNVMeCapableServer must flip to false even
// though other block-capable servers remain.
func TestQA_CP11B1_NVMeLostAfterUnmark(t *testing.T) {
	reg := NewBlockVolumeRegistry()
	reg.UpdateFullHeartbeat("nvme-vs:9333", nil, "10.0.0.1:4420")
	reg.UpdateFullHeartbeat("plain-vs:9333", nil, "")
	if !reg.HasNVMeCapableServer() {
		t.Fatal("should have NVMe before unmark")
	}
	// NVMe server disconnects.
	reg.UnmarkBlockCapable("nvme-vs:9333")
	if reg.HasNVMeCapableServer() {
		t.Fatal("should NOT have NVMe after NVMe server disconnected")
	}
}
// QA-CP11B1-14: MarkBlockCapable does NOT overwrite NVMe addr set by heartbeat.
func TestQA_CP11B1_MarkBlockCapable_PreservesNVMe(t *testing.T) {
r := NewBlockVolumeRegistry()
// Heartbeat sets NVMe addr.
r.UpdateFullHeartbeat("vs1:9333", nil, "10.0.0.1:4420")
// MarkBlockCapable is called again (e.g. from another code path).
r.MarkBlockCapable("vs1:9333")
// NVMe addr should NOT be cleared.
if !r.HasNVMeCapableServer() {
t.Fatal("MarkBlockCapable should not clear NVMe addr set by heartbeat")
}
}
// QA-CP11B1-15: an unrecognized durability_mode override must be reported
// in Errors, and the message must identify the bad field.
func TestQA_CP11B1_InvalidDurabilityString_Rejected(t *testing.T) {
	res := blockvol.ResolvePolicy(blockvol.PresetGeneral, "turbo_sync", 0, "", blockvol.EnvironmentInfo{
		ServerCount: 2, WALSizeDefault: 64 << 20,
	})
	switch {
	case len(res.Errors) == 0:
		t.Fatal("expected error for invalid durability_mode string")
	case !strings.Contains(res.Errors[0], "invalid durability_mode"):
		t.Errorf("error = %q, want 'invalid durability_mode'", res.Errors[0])
	}
}
// QA-CP11B1-16: overriding disk_type on the database preset (ssd → hdd)
// must replace the preset value and be recorded as an override.
func TestQA_CP11B1_OverrideDiskType(t *testing.T) {
	res := blockvol.ResolvePolicy(blockvol.PresetDatabase, "", 0, "hdd", blockvol.EnvironmentInfo{
		NVMeAvailable: true, ServerCount: 2, WALSizeDefault: 128 << 20,
	})
	if len(res.Errors) > 0 {
		t.Fatalf("unexpected errors: %v", res.Errors)
	}
	if got := res.Policy.DiskType; got != "hdd" {
		t.Errorf("disk_type = %q, want hdd (override)", got)
	}
	if !containsStr(res.Overrides, "disk_type") {
		t.Errorf("overrides = %v, want disk_type present", res.Overrides)
	}
	// The preset label must survive the override (replaced field, not a merge).
	if got := res.Policy.Preset; got != blockvol.PresetDatabase {
		t.Errorf("preset = %q, want database", got)
	}
}
// QA-CP11B1-17: durability_mode, replica_factor and disk_type overridden
// together — all three apply, all three are recorded, and non-overridable
// fields still come from the preset.
func TestQA_CP11B1_AllOverridesAtOnce(t *testing.T) {
	res := blockvol.ResolvePolicy(blockvol.PresetDatabase, "best_effort", 3, "hdd", blockvol.EnvironmentInfo{
		NVMeAvailable: true, ServerCount: 3, WALSizeDefault: 128 << 20,
	})
	if len(res.Errors) > 0 {
		t.Fatalf("unexpected errors: %v", res.Errors)
	}
	pol := res.Policy
	if pol.DurabilityMode != "best_effort" {
		t.Errorf("durability = %q, want best_effort", pol.DurabilityMode)
	}
	if pol.ReplicaFactor != 3 {
		t.Errorf("RF = %d, want 3", pol.ReplicaFactor)
	}
	if pol.DiskType != "hdd" {
		t.Errorf("disk_type = %q, want hdd", pol.DiskType)
	}
	if got := len(res.Overrides); got != 3 {
		t.Errorf("overrides count = %d, want 3: %v", got, res.Overrides)
	}
	// Transport has no override knob — must still be the preset's choice.
	if pol.TransportPref != "nvme" {
		t.Errorf("transport = %q, want nvme (from preset, not overridable)", pol.TransportPref)
	}
}
// QA-CP11B1-18: database preset (sync_all) combined with an RF=1 override
// is a legal but pointless pairing — it must warn, never error.
func TestQA_CP11B1_PresetOverride_SyncAll_RF1_Warning(t *testing.T) {
	res := blockvol.ResolvePolicy(blockvol.PresetDatabase, "", 1, "", blockvol.EnvironmentInfo{
		NVMeAvailable: true, ServerCount: 2, WALSizeDefault: 128 << 20,
	})
	if len(res.Errors) > 0 {
		t.Fatalf("unexpected errors: %v", res.Errors)
	}
	warned := false
	for _, w := range res.Warnings {
		if strings.Contains(w, "no replication benefit") {
			warned = true
			break
		}
	}
	if !warned {
		t.Errorf("expected 'no replication benefit' warning, got: %v", res.Warnings)
	}
}
// QA-CP11B1-19: ServerCount=0 means "cluster size unknown", so the
// replica-factor-vs-servers warning must be suppressed.
func TestQA_CP11B1_ZeroServerCount_NoRFWarning(t *testing.T) {
	res := blockvol.ResolvePolicy(blockvol.PresetGeneral, "", 3, "", blockvol.EnvironmentInfo{
		ServerCount: 0, WALSizeDefault: 64 << 20,
	})
	if len(res.Errors) > 0 {
		t.Fatalf("unexpected errors: %v", res.Errors)
	}
	for _, warning := range res.Warnings {
		if strings.Contains(warning, "exceeds available servers") {
			t.Fatalf("should NOT warn about RF vs servers when ServerCount=0: %s", warning)
		}
	}
}
// QA-CP11B1-20: Resolve endpoint returns errors[] for invalid preset (not HTTP error).
// The diagnostic endpoint is a dry run: even an unknown preset resolves to a
// 200 response whose body carries the error list.
func TestQA_CP11B1_ResolveEndpoint_InvalidPreset_Returns200WithErrors(t *testing.T) {
	ms := qaPresetMaster(t)
	body, err := json.Marshal(blockapi.CreateVolumeRequest{Preset: "bogus"})
	if err != nil {
		t.Fatalf("marshal request: %v", err)
	}
	req := httptest.NewRequest(http.MethodPost, "/block/volume/resolve", bytes.NewReader(body))
	w := httptest.NewRecorder()
	ms.blockVolumeResolveHandler(w, req)
	// Resolve always returns 200, even with errors.
	if w.Code != http.StatusOK {
		t.Fatalf("status = %d, want 200", w.Code)
	}
	var resp blockapi.ResolvedPolicyResponse
	// Fix: the decode error was previously discarded — a malformed response
	// body would have failed later with a misleading "expected errors[]" message.
	if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
		t.Fatalf("decode response: %v", err)
	}
	if len(resp.Errors) == 0 {
		t.Fatal("expected errors[] for invalid preset")
	}
	if !strings.Contains(resp.Errors[0], "unknown preset") {
		t.Errorf("error = %q, want 'unknown preset'", resp.Errors[0])
	}
}
// QA-CP11B1-21: unlike the resolve diagnostic endpoint, the create handler
// must reject an unknown preset outright with HTTP 400.
func TestQA_CP11B1_CreateHandler_InvalidPreset_Returns400(t *testing.T) {
	ms := qaPresetMaster(t)
	payload := blockapi.CreateVolumeRequest{
		Name: "bad-preset-vol", SizeBytes: 1 << 30, Preset: "bogus",
	}
	body, _ := json.Marshal(payload)
	rec := httptest.NewRecorder()
	ms.blockVolumeCreateHandler(rec, httptest.NewRequest(http.MethodPost, "/block/volume", bytes.NewReader(body)))
	if rec.Code != http.StatusBadRequest {
		t.Fatalf("status = %d, want 400; body: %s", rec.Code, rec.Body.String())
	}
}
// QA-CP11B1-22: hammer the resolve handler from 20 goroutines, cycling
// through valid, invalid, and empty presets — the only assertion is that
// nothing panics (run under -race for full value).
func TestQA_CP11B1_ConcurrentResolve_NoPanic(t *testing.T) {
	ms := qaPresetMaster(t)
	// Populate server info via simulated heartbeats (one NVMe, one plain).
	ms.blockRegistry.UpdateFullHeartbeat("vs1:9333", nil, "10.0.0.1:4420")
	ms.blockRegistry.UpdateFullHeartbeat("vs2:9333", nil, "")
	presetCycle := []string{"database", "general", "throughput", "bogus", ""}
	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			body, _ := json.Marshal(blockapi.CreateVolumeRequest{Preset: p})
			rec := httptest.NewRecorder()
			ms.blockVolumeResolveHandler(rec, httptest.NewRequest(http.MethodPost, "/block/volume/resolve", bytes.NewReader(body)))
		}(presetCycle[i%5])
	}
	wg.Wait()
	// No panic = pass.
}
// containsStr reports whether target occurs in ss. A local copy is kept
// because the identical helper in preset_test.go lives in a different
// package (blockvol vs weed_server).
func containsStr(ss []string, target string) bool {
	for i := range ss {
		if ss[i] == target {
			return true
		}
	}
	return false
}

6
weed/server/qa_block_cp62_test.go

@ -34,7 +34,7 @@ func TestQA_Reg_FullHeartbeatCrossTalk(t *testing.T) {
// Full heartbeat from s1 reports vol1 — should NOT affect s2's volumes.
r.UpdateFullHeartbeat("s1", []*master_pb.BlockVolumeInfoMessage{
{Path: "/v1.blk", Epoch: 1},
})
}, "")
// vol2 on s2 should still exist.
if _, ok := r.Lookup("vol2"); !ok {
@ -49,7 +49,7 @@ func TestQA_Reg_FullHeartbeatEmptyServer(t *testing.T) {
r.Register(&BlockVolumeEntry{Name: "vol2", VolumeServer: "s1", Path: "/v2.blk", Status: StatusActive})
// Empty heartbeat from s1 (HasNoBlockVolumes=true, zero infos).
r.UpdateFullHeartbeat("s1", nil)
r.UpdateFullHeartbeat("s1", nil, "")
if _, ok := r.Lookup("vol1"); ok {
t.Error("BUG: vol1 should be removed after empty full heartbeat")
@ -78,7 +78,7 @@ func TestQA_Reg_ConcurrentHeartbeatAndRegister(t *testing.T) {
}()
r.UpdateFullHeartbeat("s1", []*master_pb.BlockVolumeInfoMessage{
{Path: fmt.Sprintf("/v%d.blk", i), Epoch: uint64(i)},
})
}, "")
}(i)
go func(i int) {
defer wg.Done()

2
weed/server/qa_block_cp63_test.go

@ -331,7 +331,7 @@ func TestQA_Reg_FullHeartbeatDoesNotClobberReplicaServer(t *testing.T) {
// Full heartbeat from vs1 — should NOT clear replica info.
r.UpdateFullHeartbeat("vs1", []*master_pb.BlockVolumeInfoMessage{
{Path: "/data/vol1.blk", Epoch: 1, Role: blockvol.RoleToWire(blockvol.RolePrimary), VolumeSize: 1 << 30},
})
}, "")
e, _ := r.Lookup("vol1")
if e.ReplicaServer != "vs2" {

22
weed/server/qa_block_cp82_adversarial_test.go

@ -197,7 +197,7 @@ func TestQA_CP82_ReplicaHeartbeatSpoof_DoesNotDeletePrimary(t *testing.T) {
Epoch: 1,
Role: blockvol.RoleToWire(blockvol.RoleReplica),
},
})
}, "")
entry, ok := ms.blockRegistry.Lookup("vol-spoof")
if !ok {
@ -209,7 +209,7 @@ func TestQA_CP82_ReplicaHeartbeatSpoof_DoesNotDeletePrimary(t *testing.T) {
// Now simulate a full heartbeat from vs2 with NO volumes at all.
// This should remove vs2 as replica but NOT delete the volume.
ms.blockRegistry.UpdateFullHeartbeat("vs2:9333", []*master_pb.BlockVolumeInfoMessage{})
ms.blockRegistry.UpdateFullHeartbeat("vs2:9333", []*master_pb.BlockVolumeInfoMessage{}, "")
entry, ok = ms.blockRegistry.Lookup("vol-spoof")
if !ok {
@ -381,7 +381,7 @@ func TestQA_CP82_MasterRestart_ReconstructReplicas_ThenFailover(t *testing.T) {
Role: blockvol.RoleToWire(blockvol.RolePrimary),
WalHeadLsn: 500,
},
})
}, "")
entry, ok := ms.blockRegistry.Lookup("vol-restart")
if !ok {
@ -400,7 +400,7 @@ func TestQA_CP82_MasterRestart_ReconstructReplicas_ThenFailover(t *testing.T) {
Role: blockvol.RoleToWire(blockvol.RoleReplica),
WalHeadLsn: 498,
},
})
}, "")
entry, _ = ms.blockRegistry.Lookup("vol-restart")
if len(entry.Replicas) == 0 {
@ -473,10 +473,10 @@ func TestQA_CP82_RF3_OneReplicaFlaps_UnderWriteLoad(t *testing.T) {
Role: blockvol.RoleToWire(blockvol.RoleReplica),
WalHeadLsn: 100,
},
})
}, "")
} else {
// Replica reports no volumes (simulates disconnect).
ms.blockRegistry.UpdateFullHeartbeat("vs3:9333", []*master_pb.BlockVolumeInfoMessage{})
ms.blockRegistry.UpdateFullHeartbeat("vs3:9333", []*master_pb.BlockVolumeInfoMessage{}, "")
}
}
}()
@ -494,7 +494,7 @@ func TestQA_CP82_RF3_OneReplicaFlaps_UnderWriteLoad(t *testing.T) {
Role: blockvol.RoleToWire(blockvol.RolePrimary),
WalHeadLsn: uint64(100 + i),
},
})
}, "")
}
}()
@ -511,7 +511,7 @@ func TestQA_CP82_RF3_OneReplicaFlaps_UnderWriteLoad(t *testing.T) {
Role: blockvol.RoleToWire(blockvol.RoleReplica),
WalHeadLsn: uint64(100 + i),
},
})
}, "")
}
}()
@ -626,7 +626,7 @@ func TestQA_CP82_ScrubConcurrentWrites_NoFalseCorruption(t *testing.T) {
WalHeadLsn: uint64(500 + i),
HealthScore: 1.0, // Clean — no false positives
},
})
}, "")
}
}()
@ -644,7 +644,7 @@ func TestQA_CP82_ScrubConcurrentWrites_NoFalseCorruption(t *testing.T) {
WalHeadLsn: uint64(500 + i),
HealthScore: 1.0,
},
})
}, "")
}
}()
@ -690,7 +690,7 @@ func TestQA_CP82_ScrubDetectsCorruption_HealthDrops_PromotionAvoids(t *testing.T
HealthScore: 0.3,
ScrubErrors: 5,
},
})
}, "")
// Trigger failover — vs3 (healthy) should be promoted, not vs2.
newEpoch, err := ms.blockRegistry.PromoteBestReplica("vol-corrupt")

8
weed/server/qa_block_cp831_adversarial_test.go

@ -179,7 +179,7 @@ func TestQA_CP831_HeartbeatEmptyMode_DoesNotOverwriteStrict(t *testing.T) {
VolumeSize: 1 << 30,
DurabilityMode: "", // empty — must not overwrite
},
})
}, "")
}
entry, ok := ms.blockRegistry.Lookup("hb-strict")
@ -220,7 +220,7 @@ func TestQA_CP831_HeartbeatNonEmptyMode_DoesUpdate(t *testing.T) {
VolumeSize: 1 << 30,
DurabilityMode: "sync_all",
},
})
}, "")
entry, ok := ms.blockRegistry.Lookup("hb-update")
if !ok {
@ -718,7 +718,7 @@ func TestQA_CP831_MasterRestart_AutoRegister_PreservesDurabilityMode(t *testing.
VolumeSize: 1 << 30,
DurabilityMode: "sync_all",
},
})
}, "")
entry, ok := ms.blockRegistry.Lookup("strict-vol")
if !ok {
@ -736,7 +736,7 @@ func TestQA_CP831_MasterRestart_AutoRegister_PreservesDurabilityMode(t *testing.
VolumeSize: 1 << 30,
// DurabilityMode omitted
},
})
}, "")
entry2, ok := ms2.blockRegistry.Lookup("legacy-vol")
if !ok {

4
weed/server/qa_block_durability_test.go

@ -316,7 +316,7 @@ func TestDurability_Heartbeat_ReportsMode(t *testing.T) {
VolumeSize: 1 << 30,
DurabilityMode: "",
},
})
}, "")
entry, ok = ms.blockRegistry.Lookup("hb-vol")
if !ok {
t.Fatal("volume not in registry after heartbeat")
@ -332,7 +332,7 @@ func TestDurability_Heartbeat_ReportsMode(t *testing.T) {
VolumeSize: 1 << 30,
DurabilityMode: "sync_all",
},
})
}, "")
entry, ok = ms.blockRegistry.Lookup("hb-vol")
if !ok {
t.Fatal("volume not in registry after second heartbeat")

16
weed/server/qa_block_expand_adversarial_test.go

@ -291,7 +291,7 @@ func TestQA_B10_RepeatedEmptyHeartbeats_DuringExpand(t *testing.T) {
// 10 empty heartbeats from the primary — each one would delete
// the entry without the B-10 guard.
for i := 0; i < 10; i++ {
ms.blockRegistry.UpdateFullHeartbeat(primary, []*master_pb.BlockVolumeInfoMessage{})
ms.blockRegistry.UpdateFullHeartbeat(primary, []*master_pb.BlockVolumeInfoMessage{}, "")
}
_, ok := ms.blockRegistry.Lookup("multi-hb")
@ -322,7 +322,7 @@ func TestQA_B10_ExpandFailed_HeartbeatStillProtected(t *testing.T) {
ms.blockRegistry.MarkExpandFailed("fail-hb")
// Empty heartbeat should not delete — ExpandFailed keeps ExpandInProgress=true.
ms.blockRegistry.UpdateFullHeartbeat(primary, []*master_pb.BlockVolumeInfoMessage{})
ms.blockRegistry.UpdateFullHeartbeat(primary, []*master_pb.BlockVolumeInfoMessage{}, "")
e, ok := ms.blockRegistry.Lookup("fail-hb")
if !ok {
@ -337,7 +337,7 @@ func TestQA_B10_ExpandFailed_HeartbeatStillProtected(t *testing.T) {
// After ClearExpandFailed, empty heartbeat should delete normally.
ms.blockRegistry.ClearExpandFailed("fail-hb")
ms.blockRegistry.UpdateFullHeartbeat(primary, []*master_pb.BlockVolumeInfoMessage{})
ms.blockRegistry.UpdateFullHeartbeat(primary, []*master_pb.BlockVolumeInfoMessage{}, "")
_, ok = ms.blockRegistry.Lookup("fail-hb")
if ok {
@ -372,7 +372,7 @@ func TestQA_B10_HeartbeatSizeSuppress_DuringExpand(t *testing.T) {
Epoch: 1,
Role: blockvol.RoleToWire(blockvol.RolePrimary),
},
})
}, "")
entry, _ = ms.blockRegistry.Lookup("size-suppress")
if entry.SizeBytes != origSize {
@ -388,7 +388,7 @@ func TestQA_B10_HeartbeatSizeSuppress_DuringExpand(t *testing.T) {
Epoch: 1,
Role: blockvol.RoleToWire(blockvol.RolePrimary),
},
})
}, "")
entry, _ = ms.blockRegistry.Lookup("size-suppress")
if entry.SizeBytes != origSize {
@ -441,7 +441,7 @@ func TestQA_B10_ConcurrentHeartbeatsAndExpand(t *testing.T) {
for i := 0; i < rounds; i++ {
if i%5 == 0 {
// Every 5th: empty heartbeat (simulates brief restart).
ms.blockRegistry.UpdateFullHeartbeat(primary, []*master_pb.BlockVolumeInfoMessage{})
ms.blockRegistry.UpdateFullHeartbeat(primary, []*master_pb.BlockVolumeInfoMessage{}, "")
} else {
ms.blockRegistry.UpdateFullHeartbeat(primary, []*master_pb.BlockVolumeInfoMessage{
{
@ -451,7 +451,7 @@ func TestQA_B10_ConcurrentHeartbeatsAndExpand(t *testing.T) {
Role: blockvol.RoleToWire(blockvol.RolePrimary),
WalHeadLsn: uint64(100 + i),
},
})
}, "")
}
}
}()
@ -470,7 +470,7 @@ func TestQA_B10_ConcurrentHeartbeatsAndExpand(t *testing.T) {
Role: blockvol.RoleToWire(blockvol.RoleReplica),
WalHeadLsn: uint64(99 + i),
},
})
}, "")
}
}()
}

22
weed/server/qa_block_nvme_publication_test.go

@ -111,7 +111,7 @@ func TestQA_NVMe_HeartbeatSetsNvmeFields(t *testing.T) {
NvmeAddr: "10.0.0.1:4420",
Nqn: "nqn.2024-01.com.seaweedfs:vol1",
},
})
}, "")
entry, ok := r.Lookup("vol1")
if !ok {
@ -152,7 +152,7 @@ func TestQA_NVMe_HeartbeatClearsStaleNvme(t *testing.T) {
Role: 1,
// NvmeAddr and Nqn intentionally empty.
},
})
}, "")
entry, _ := r.Lookup("vol1")
// After heartbeat with empty NVMe fields, stale NVMe info should be cleared.
@ -370,7 +370,7 @@ func TestQA_NVMe_FullHeartbeat_MasterRestart(t *testing.T) {
NvmeAddr: "10.0.0.1:4420",
Nqn: "nqn.2024-01.com.seaweedfs:vol1",
},
})
}, "")
// After heartbeat, volume should be reconstructed with NVMe fields.
// Currently the registry uses nameFromPath() to find/create entries.
@ -596,7 +596,7 @@ func TestIntegration_NVMe_FailoverUpdatesNvmeAddr(t *testing.T) {
NvmeAddr: newPrimaryHost + ":4420",
Nqn: fmt.Sprintf("nqn.2024-01.com.seaweedfs:vol.pvc-failover-nvme"),
},
})
}, "")
// CSI re-publishes after failover: Lookup must return new NVMe address.
lookupResp, err := ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "pvc-failover-nvme"})
@ -651,7 +651,7 @@ func TestIntegration_NVMe_HeartbeatReconstructionAfterMasterRestart(t *testing.T
NvmeAddr: primaryHost + ":4420",
Nqn: "nqn.2024-01.com.seaweedfs:vol.pvc-restart-1",
},
})
}, "")
// Step 4: CSI calls Lookup — must find NVMe details.
lookupResp, err := ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "pvc-restart-1"})
@ -895,7 +895,7 @@ func TestIntegration_NVMe_FullLifecycle_K8s(t *testing.T) {
NvmeAddr: newPrimaryHost + ":4420",
Nqn: "nqn.2024-01.com.seaweedfs:vol.pvc-k8s-data",
},
})
}, "")
// ── Step 7: CSI re-publishes → node plugin reconnects via NVMe ──
lookupResp, err = ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "pvc-k8s-data"})
@ -977,7 +977,7 @@ func TestQA_NVMe_ToggleNvmeOnRunningVS(t *testing.T) {
NvmeAddr: "10.0.0.1:4420",
Nqn: "nqn.2024-01.com.seaweedfs:toggle-vol",
},
})
}, "")
entry, _ = r.Lookup("toggle-vol")
if entry.NvmeAddr != "10.0.0.1:4420" {
@ -997,7 +997,7 @@ func TestQA_NVMe_ToggleNvmeOnRunningVS(t *testing.T) {
Role: 1,
// NvmeAddr and Nqn intentionally empty — NVMe disabled.
},
})
}, "")
entry, _ = r.Lookup("toggle-vol")
if entry.NvmeAddr != "" {
@ -1080,7 +1080,7 @@ func TestQA_NVMe_ToggleNvmeOnRunningVS_ReplicaSide(t *testing.T) {
NvmeAddr: "10.0.0.2:4420",
Nqn: "nqn.2024-01.com.seaweedfs:toggle-replica-vol",
},
})
}, "")
entry, _ = r.Lookup("toggle-replica-vol")
if entry.Replicas[0].NvmeAddr != "10.0.0.2:4420" {
@ -1101,7 +1101,7 @@ func TestQA_NVMe_ToggleNvmeOnRunningVS_ReplicaSide(t *testing.T) {
WalHeadLsn: 100,
// NvmeAddr and Nqn intentionally empty — NVMe disabled.
},
})
}, "")
entry, _ = r.Lookup("toggle-replica-vol")
if entry.Replicas[0].NvmeAddr != "" {
@ -1325,7 +1325,7 @@ func TestQA_NVMe_PromotionThenImmediateLookup(t *testing.T) {
NvmeAddr: "10.0.0.4:4420",
Nqn: "nqn.2024-01.com.seaweedfs:promo-fix-vol",
},
})
}, "")
// Now Lookup should return the NvmeAddr.
entry, ok := r.Lookup("promo-fix-vol")

1
weed/server/volume_grpc_client_to_master.go

@ -371,5 +371,6 @@ func (vs *VolumeServer) collectBlockVolumeHeartbeat(ip string, port uint32, dc,
Rack: rack,
BlockVolumeInfos: blockvol.InfoMessagesToProto(msgs),
HasNoBlockVolumes: len(msgs) == 0,
BlockNvmeAddr: vs.blockService.NvmeListenAddr(),
}
}

21
weed/storage/blockvol/blockapi/client.go

@ -191,6 +191,27 @@ func (c *Client) Preflight(ctx context.Context, name string) (*PreflightResponse
return &out, nil
}
// ResolvePolicy resolves a preset + overrides without creating a volume.
func (c *Client) ResolvePolicy(ctx context.Context, req CreateVolumeRequest) (*ResolvedPolicyResponse, error) {
body, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("marshal request: %w", err)
}
resp, err := c.doRequest(ctx, http.MethodPost, "/block/volume/resolve", bytes.NewReader(body))
if err != nil {
return nil, err
}
defer resp.Body.Close()
if err := checkStatus(resp, http.StatusOK); err != nil {
return nil, err
}
var out ResolvedPolicyResponse
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return nil, fmt.Errorf("decode response: %w", err)
}
return &out, nil
}
// ListServers lists all block-capable volume servers.
func (c *Client) ListServers(ctx context.Context) ([]ServerInfo, error) {
resp, err := c.doRequest(ctx, http.MethodGet, "/block/servers", nil)

22
weed/storage/blockvol/blockapi/types.go

@ -14,6 +14,7 @@ type CreateVolumeRequest struct {
DiskType string `json:"disk_type"` // e.g. "ssd", "hdd"
DurabilityMode string `json:"durability_mode,omitempty"` // "best_effort", "sync_all", "sync_quorum"
ReplicaFactor int `json:"replica_factor,omitempty"` // 1, 2, or 3 (default: 2)
Preset string `json:"preset,omitempty"` // "database", "general", "throughput", or ""
}
// VolumeInfo describes a block volume.
@ -38,10 +39,31 @@ type VolumeInfo struct {
HealthScore float64 `json:"health_score"`
ReplicaDegraded bool `json:"replica_degraded,omitempty"`
DurabilityMode string `json:"durability_mode"` // CP8-3-1
Preset string `json:"preset,omitempty"` // CP11B-1: preset used at creation
NvmeAddr string `json:"nvme_addr,omitempty"`
NQN string `json:"nqn,omitempty"`
}
// ResolvedPolicyResponse is the response for POST /block/volume/resolve.
type ResolvedPolicyResponse struct {
Policy ResolvedPolicyView `json:"policy"`
Overrides []string `json:"overrides,omitempty"`
Warnings []string `json:"warnings,omitempty"`
Errors []string `json:"errors,omitempty"`
}
// ResolvedPolicyView is the fully resolved policy shown to the user.
type ResolvedPolicyView struct {
Preset string `json:"preset,omitempty"`
DurabilityMode string `json:"durability_mode"`
ReplicaFactor int `json:"replica_factor"`
DiskType string `json:"disk_type,omitempty"`
TransportPreference string `json:"transport_preference"`
WorkloadHint string `json:"workload_hint"`
WALSizeRecommended uint64 `json:"wal_size_recommended"`
StorageProfile string `json:"storage_profile"`
}
// ReplicaDetail describes one replica in the API response.
type ReplicaDetail struct {
Server string `json:"server"`

29
weed/storage/blockvol/blockvol.go

@ -464,20 +464,35 @@ func (v *BlockVol) ReadLBA(lba uint64, length uint32) ([]byte, error) {
// readOneBlock reads a single block, checking dirty map first, then extent.
func (v *BlockVol) readOneBlock(lba uint64) ([]byte, error) {
walOff, lsn, _, ok := v.dirtyMap.Get(lba)
if ok {
for {
walOff, lsn, _, ok := v.dirtyMap.Get(lba)
if !ok {
return v.readBlockFromExtent(lba)
}
data, stale, err := v.readBlockFromWAL(walOff, lba, lsn)
if err != nil {
return nil, err
}
if !stale {
if stale {
// WAL slot was reused. Extent may not have the latest: a newer write
// could have updated dirtyMap after we got our old entry. Re-check
// dirtyMap before trusting extent; if a newer entry exists, retry.
_, currentLSN, _, stillOk := v.dirtyMap.Get(lba)
if stillOk && currentLSN != lsn {
continue
}
return v.readBlockFromExtent(lba)
}
// Verify no newer write overwrote this LBA while we were reading.
// A concurrent WriteLBA could have Put(lba, walOff_new, lsn_new) after
// our Get; we would have read old data at walOff. Re-check and retry.
_, currentLSN, _, stillOk := v.dirtyMap.Get(lba)
if stillOk && currentLSN == lsn {
return data, nil
}
// WAL slot was reused (flusher reclaimed it between our
// dirty map read and WAL read). The data is already flushed
// to the extent region, so fall through to extent read.
// LSN changed (newer write) or flusher removed: retry with fresh dirtyMap state.
continue
}
return v.readBlockFromExtent(lba)
}
// maxWALEntryDataLen caps the data length we trust from a WAL entry header.

174
weed/storage/blockvol/preset.go

@ -0,0 +1,174 @@
package blockvol
import "fmt"
// PresetName identifies a named provisioning preset.
type PresetName string

// Built-in presets accepted by ResolvePolicy; any other non-empty
// name resolves to an "unknown preset" error.
const (
	PresetDatabase   PresetName = "database"   // sync_all durability, SSD, NVMe transport
	PresetGeneral    PresetName = "general"    // best_effort, no disk preference, iSCSI
	PresetThroughput PresetName = "throughput" // like general, larger recommended WAL
)
// VolumePolicy is the fully resolved configuration for a block volume.
// It is produced by ResolvePolicy: preset (or system) defaults with any
// per-field request overrides already applied.
type VolumePolicy struct {
	Preset             PresetName `json:"preset,omitempty"`     // named preset used, or "" for system defaults
	DurabilityMode     string     `json:"durability_mode"`      // e.g. "best_effort", "sync_all", "sync_quorum"
	ReplicaFactor      int        `json:"replica_factor"`       // copy count; cross-validated against durability mode
	DiskType           string     `json:"disk_type,omitempty"`  // e.g. "ssd", "hdd"; "" = no preference
	TransportPref      string     `json:"transport_preference"` // advisory: "nvme" or "iscsi"
	WorkloadHint       string     `json:"workload_hint"`        // workload classification carried from the preset
	WALSizeRecommended uint64     `json:"wal_size_recommended"` // advisory WAL size in bytes
	StorageProfile     string     `json:"storage_profile"`      // currently always "single" (set in ResolvePolicy)
}
// ResolvedPolicy is the result of resolving a preset + user overrides.
type ResolvedPolicy struct {
	Policy    VolumePolicy `json:"policy"`              // final resolved parameters (zero value when preset is unknown)
	Overrides []string     `json:"overrides,omitempty"` // field names the request overrode, e.g. "disk_type"
	Warnings  []string     `json:"warnings,omitempty"`  // advisory only; resolution still succeeded
	Errors    []string     `json:"errors,omitempty"`    // non-empty means resolution failed; Policy must not be used
}
// EnvironmentInfo provides cluster state to the resolver. All fields are
// advisory inputs; zero values mean "unknown" and suppress the matching
// warnings rather than triggering them.
type EnvironmentInfo struct {
	NVMeAvailable    bool   // at least one NVMe-capable volume server is known
	ServerCount      int    // block-capable server count; 0 = unknown (suppresses the RF-vs-servers warning)
	WALSizeDefault   uint64 // engine default WAL size in bytes; 0 = unknown (suppresses WAL sizing warning)
	BlockSizeDefault uint32 // engine default block size; not read by ResolvePolicy in this file
}
// presetDefaults holds the default policy for each named preset — the
// baseline values copied into a VolumePolicy before request overrides
// are applied.
type presetDefaults struct {
	DurabilityMode string // baseline durability mode string
	ReplicaFactor  int    // baseline replica count
	DiskType       string // preferred disk type; "" = no preference
	TransportPref  string // "nvme" or "iscsi"
	WorkloadHint   string // one of the Workload* constants
	WALSizeRec     uint64 // recommended WAL size in bytes
}
// presets maps each named preset to its baseline policy values.
var presets = map[PresetName]presetDefaults{
	// database: strict durability on fast media — sync_all, SSD,
	// NVMe transport, and the larger 128MB WAL recommendation.
	PresetDatabase: {
		DurabilityMode: "sync_all",
		ReplicaFactor:  2,
		DiskType:       "ssd",
		TransportPref:  "nvme",
		WorkloadHint:   WorkloadDatabase,
		WALSizeRec:     128 << 20,
	},
	// general: balanced defaults, no disk preference, iSCSI transport.
	PresetGeneral: {
		DurabilityMode: "best_effort",
		ReplicaFactor:  2,
		DiskType:       "",
		TransportPref:  "iscsi",
		WorkloadHint:   WorkloadGeneral,
		WALSizeRec:     64 << 20,
	},
	// throughput: same as general except a larger recommended WAL.
	PresetThroughput: {
		DurabilityMode: "best_effort",
		ReplicaFactor:  2,
		DiskType:       "",
		TransportPref:  "iscsi",
		WorkloadHint:   WorkloadThroughput,
		WALSizeRec:     128 << 20,
	},
}
// systemDefaults is the baseline used when the request names no preset.
// Its values match the "general" preset; only the Preset label differs
// in the resulting policy.
var systemDefaults = presetDefaults{
	DurabilityMode: "best_effort",
	ReplicaFactor:  2,
	DiskType:       "",
	TransportPref:  "iscsi",
	WorkloadHint:   WorkloadGeneral,
	WALSizeRec:     64 << 20,
}
// ResolvePolicy resolves a preset + explicit request fields into a final policy.
// Pure function — no side effects, no server dependencies.
//
// Resolution order: pick baseline (named preset or system defaults), layer
// the non-zero request fields on top while recording them as overrides,
// validate the durability mode and its RF compatibility (fatal on failure),
// then attach purely advisory warnings derived from env.
func ResolvePolicy(preset PresetName, durabilityMode string, replicaFactor int,
	diskType string, env EnvironmentInfo) ResolvedPolicy {
	var out ResolvedPolicy

	// Baseline: the named preset if one was given, otherwise system defaults.
	base := systemDefaults
	if preset != "" {
		named, known := presets[preset]
		if !known {
			out.Errors = append(out.Errors, fmt.Sprintf("unknown preset %q", preset))
			return out
		}
		base = named
	}

	pol := VolumePolicy{
		Preset:             preset,
		DurabilityMode:     base.DurabilityMode,
		ReplicaFactor:      base.ReplicaFactor,
		DiskType:           base.DiskType,
		TransportPref:      base.TransportPref,
		WorkloadHint:       base.WorkloadHint,
		WALSizeRecommended: base.WALSizeRec,
		StorageProfile:     "single",
	}

	// Layer explicit request fields over the baseline; each non-zero field
	// wins outright and is recorded by name in Overrides.
	if durabilityMode != "" {
		pol.DurabilityMode = durabilityMode
		out.Overrides = append(out.Overrides, "durability_mode")
	}
	if replicaFactor != 0 {
		pol.ReplicaFactor = replicaFactor
		out.Overrides = append(out.Overrides, "replica_factor")
	}
	if diskType != "" {
		pol.DiskType = diskType
		out.Overrides = append(out.Overrides, "disk_type")
	}
	out.Policy = pol

	// Fatal checks: the mode string must parse, and the parsed mode must
	// accept the chosen replica factor.
	mode, err := ParseDurabilityMode(pol.DurabilityMode)
	if err != nil {
		out.Errors = append(out.Errors, fmt.Sprintf("invalid durability_mode: %s", err))
		return out
	}
	if err := mode.Validate(pol.ReplicaFactor); err != nil {
		out.Errors = append(out.Errors, err.Error())
		return out
	}

	// Advisory warnings — never fatal, and suppressed when env is unknown.
	if mode == DurabilitySyncAll && pol.ReplicaFactor == 1 {
		out.Warnings = append(out.Warnings,
			"sync_all with replica_factor=1 provides no replication benefit")
	}
	if env.ServerCount > 0 && pol.ReplicaFactor > env.ServerCount {
		out.Warnings = append(out.Warnings,
			fmt.Sprintf("replica_factor=%d exceeds available servers (%d)", pol.ReplicaFactor, env.ServerCount))
	}
	if !env.NVMeAvailable && pol.TransportPref == "nvme" {
		out.Warnings = append(out.Warnings,
			"preset recommends NVMe transport but no NVMe-capable servers are available")
	}
	if env.WALSizeDefault > 0 && env.WALSizeDefault < pol.WALSizeRecommended {
		out.Warnings = append(out.Warnings,
			fmt.Sprintf("preset recommends %dMB WAL but engine default is %dMB",
				pol.WALSizeRecommended>>20, env.WALSizeDefault>>20))
	}
	return out
}

185
weed/storage/blockvol/preset_test.go

@ -0,0 +1,185 @@
package blockvol
import (
"strings"
"testing"
)
// Database preset with no overrides: every field must come straight
// from the preset table, and Overrides must stay empty.
func TestResolvePolicy_DatabaseDefaults(t *testing.T) {
	res := ResolvePolicy(PresetDatabase, "", 0, "", EnvironmentInfo{
		NVMeAvailable: true, ServerCount: 3, WALSizeDefault: 128 << 20, BlockSizeDefault: 4096,
	})
	if len(res.Errors) > 0 {
		t.Fatalf("unexpected errors: %v", res.Errors)
	}
	pol := res.Policy
	if pol.DurabilityMode != "sync_all" {
		t.Errorf("durability_mode = %q, want sync_all", pol.DurabilityMode)
	}
	if pol.ReplicaFactor != 2 {
		t.Errorf("replica_factor = %d, want 2", pol.ReplicaFactor)
	}
	if pol.DiskType != "ssd" {
		t.Errorf("disk_type = %q, want ssd", pol.DiskType)
	}
	if pol.TransportPref != "nvme" {
		t.Errorf("transport_preference = %q, want nvme", pol.TransportPref)
	}
	if pol.WALSizeRecommended != 128<<20 {
		t.Errorf("wal_size_recommended = %d, want %d", pol.WALSizeRecommended, 128<<20)
	}
	if pol.WorkloadHint != "database" {
		t.Errorf("workload_hint = %q, want database", pol.WorkloadHint)
	}
	if pol.StorageProfile != "single" {
		t.Errorf("storage_profile = %q, want single", pol.StorageProfile)
	}
	if len(res.Overrides) != 0 {
		t.Errorf("overrides = %v, want empty", res.Overrides)
	}
}
// General preset defaults: balanced best-effort policy over iSCSI,
// with no disk-type preference.
func TestResolvePolicy_GeneralDefaults(t *testing.T) {
	res := ResolvePolicy(PresetGeneral, "", 0, "", EnvironmentInfo{
		ServerCount: 2, WALSizeDefault: 64 << 20, BlockSizeDefault: 4096,
	})
	if len(res.Errors) > 0 {
		t.Fatalf("unexpected errors: %v", res.Errors)
	}
	pol := res.Policy
	if pol.DurabilityMode != "best_effort" {
		t.Errorf("durability_mode = %q, want best_effort", pol.DurabilityMode)
	}
	if pol.ReplicaFactor != 2 {
		t.Errorf("replica_factor = %d, want 2", pol.ReplicaFactor)
	}
	if pol.DiskType != "" {
		t.Errorf("disk_type = %q, want empty", pol.DiskType)
	}
	if pol.TransportPref != "iscsi" {
		t.Errorf("transport_preference = %q, want iscsi", pol.TransportPref)
	}
	if pol.WALSizeRecommended != 64<<20 {
		t.Errorf("wal_size_recommended = %d, want %d", pol.WALSizeRecommended, 64<<20)
	}
}
// Throughput preset defaults: best-effort like general, but with the
// larger 128MB WAL recommendation and the throughput workload hint.
func TestResolvePolicy_ThroughputDefaults(t *testing.T) {
	res := ResolvePolicy(PresetThroughput, "", 0, "", EnvironmentInfo{
		ServerCount: 2, WALSizeDefault: 64 << 20, BlockSizeDefault: 4096,
	})
	if len(res.Errors) > 0 {
		t.Fatalf("unexpected errors: %v", res.Errors)
	}
	pol := res.Policy
	if pol.DurabilityMode != "best_effort" {
		t.Errorf("durability_mode = %q, want best_effort", pol.DurabilityMode)
	}
	if pol.WALSizeRecommended != 128<<20 {
		t.Errorf("wal_size_recommended = %d, want %d", pol.WALSizeRecommended, 128<<20)
	}
	if pol.WorkloadHint != "throughput" {
		t.Errorf("workload_hint = %q, want throughput", pol.WorkloadHint)
	}
}
// An explicit durability_mode replaces the database preset's sync_all
// and is recorded in Overrides.
func TestResolvePolicy_OverrideDurability(t *testing.T) {
	res := ResolvePolicy(PresetDatabase, "best_effort", 0, "", EnvironmentInfo{
		NVMeAvailable: true, ServerCount: 2, WALSizeDefault: 128 << 20, BlockSizeDefault: 4096,
	})
	if len(res.Errors) > 0 {
		t.Fatalf("unexpected errors: %v", res.Errors)
	}
	if got := res.Policy.DurabilityMode; got != "best_effort" {
		t.Errorf("durability_mode = %q, want best_effort (override)", got)
	}
	if !containsStr(res.Overrides, "durability_mode") {
		t.Errorf("overrides = %v, want durability_mode present", res.Overrides)
	}
}
// An explicit replica_factor replaces the general preset's default RF=2
// and is recorded in Overrides.
func TestResolvePolicy_OverrideRF(t *testing.T) {
	res := ResolvePolicy(PresetGeneral, "", 3, "", EnvironmentInfo{
		ServerCount: 3, WALSizeDefault: 64 << 20, BlockSizeDefault: 4096,
	})
	if len(res.Errors) > 0 {
		t.Fatalf("unexpected errors: %v", res.Errors)
	}
	if got := res.Policy.ReplicaFactor; got != 3 {
		t.Errorf("replica_factor = %d, want 3 (override)", got)
	}
	if !containsStr(res.Overrides, "replica_factor") {
		t.Errorf("overrides = %v, want replica_factor present", res.Overrides)
	}
}
// With no preset named, system defaults apply and the policy's Preset
// field stays empty.
func TestResolvePolicy_NoPreset_SystemDefaults(t *testing.T) {
	res := ResolvePolicy("", "", 0, "", EnvironmentInfo{
		ServerCount: 2, WALSizeDefault: 64 << 20, BlockSizeDefault: 4096,
	})
	if len(res.Errors) > 0 {
		t.Fatalf("unexpected errors: %v", res.Errors)
	}
	pol := res.Policy
	if pol.DurabilityMode != "best_effort" {
		t.Errorf("durability_mode = %q, want best_effort", pol.DurabilityMode)
	}
	if pol.ReplicaFactor != 2 {
		t.Errorf("replica_factor = %d, want 2", pol.ReplicaFactor)
	}
	if pol.TransportPref != "iscsi" {
		t.Errorf("transport_preference = %q, want iscsi", pol.TransportPref)
	}
	if pol.Preset != "" {
		t.Errorf("preset = %q, want empty", pol.Preset)
	}
}
// Unknown preset names must surface as resolver errors, not panics
// or silent fallbacks to defaults.
func TestResolvePolicy_InvalidPreset(t *testing.T) {
	res := ResolvePolicy("nosuch", "", 0, "", EnvironmentInfo{})
	switch {
	case len(res.Errors) == 0:
		t.Fatal("expected error for unknown preset")
	case !strings.Contains(res.Errors[0], "unknown preset"):
		t.Errorf("error = %q, want to contain 'unknown preset'", res.Errors[0])
	}
}
// sync_quorum requires RF >= 3, so sync_quorum + RF=2 must be rejected
// with an error (not downgraded to a warning).
func TestResolvePolicy_IncompatibleCombo(t *testing.T) {
	res := ResolvePolicy("", "sync_quorum", 2, "", EnvironmentInfo{
		ServerCount: 2, WALSizeDefault: 64 << 20, BlockSizeDefault: 4096,
	})
	switch {
	case len(res.Errors) == 0:
		t.Fatal("expected error for sync_quorum + RF=2")
	case !strings.Contains(res.Errors[0], "replica_factor >= 3"):
		t.Errorf("error = %q, want sync_quorum RF constraint", res.Errors[0])
	}
}
// A 64MB engine WAL default against the database preset's 128MB
// recommendation must produce a sizing warning mentioning both values.
func TestResolvePolicy_WALWarning(t *testing.T) {
	res := ResolvePolicy(PresetDatabase, "", 0, "", EnvironmentInfo{
		NVMeAvailable: true, ServerCount: 2, WALSizeDefault: 64 << 20, BlockSizeDefault: 4096,
	})
	if len(res.Errors) > 0 {
		t.Fatalf("unexpected errors: %v", res.Errors)
	}
	warned := false
	for _, w := range res.Warnings {
		if strings.Contains(w, "128MB WAL") && strings.Contains(w, "64MB") {
			warned = true
			break
		}
	}
	if !warned {
		t.Errorf("expected WAL sizing warning, got warnings: %v", res.Warnings)
	}
}
// containsStr reports whether target occurs in ss.
func containsStr(ss []string, target string) bool {
	for i := range ss {
		if ss[i] == target {
			return true
		}
	}
	return false
}
Loading…
Cancel
Save