Browse Source

feat: CP11A-1 storage profile type, superblock persistence, and validation

Add StorageProfile enum (single=0, striped=1 reserved) persisted at
superblock offset 105. Existing volumes auto-map to single via zero-pad
backward compatibility. CreateBlockVol rejects striped and invalid
profile values before file creation. ParseStorageProfile is
case-insensitive and whitespace-tolerant.

13 tests: enum string/parse, superblock persistence, backward compat,
create/open/reopen, striped rejection, invalid profile rejection.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feature/sw-block
Ping Qiu 2 days ago
parent
commit
74e8a4ce68
  1. 12
      weed/storage/blockvol/blockvol.go
  2. 48
      weed/storage/blockvol/storage_profile.go
  3. 235
      weed/storage/blockvol/storage_profile_test.go
  4. 11
      weed/storage/blockvol/superblock.go

12
weed/storage/blockvol/blockvol.go

@ -25,6 +25,7 @@ type CreateOptions struct {
WALSize uint64 // default 64MB
Replication string // default "000"
DurabilityMode DurabilityMode // CP8-3-1: default best_effort (0)
StorageProfile StorageProfile // CP11A-1: default single (0)
}
// ErrVolumeClosed is returned when an operation is attempted on a closed BlockVol.
@ -92,6 +93,12 @@ func CreateBlockVol(path string, opts CreateOptions, cfgs ...BlockVolConfig) (*B
if opts.VolumeSize == 0 {
return nil, ErrInvalidVolumeSize
}
if opts.StorageProfile == ProfileStriped {
return nil, ErrStripedNotImplemented
}
if opts.StorageProfile > ProfileStriped {
return nil, fmt.Errorf("%w: %d", ErrInvalidStorageProfile, opts.StorageProfile)
}
sb, err := NewSuperblock(opts.VolumeSize, opts)
if err != nil {
@ -620,6 +627,11 @@ func (v *BlockVol) DurabilityMode() DurabilityMode {
return DurabilityMode(v.super.DurabilityMode)
}
// Profile returns the volume's storage profile from the superblock.
func (v *BlockVol) Profile() StorageProfile {
return StorageProfile(v.super.StorageProfile)
}
// WALUsedFraction returns the fraction of WAL space currently in use (0.0 to 1.0).
func (v *BlockVol) WALUsedFraction() float64 {
if v.wal == nil {

48
weed/storage/blockvol/storage_profile.go

@ -0,0 +1,48 @@
package blockvol
import (
"errors"
"fmt"
"strings"
)
// StorageProfile controls the data layout strategy for a block volume.
// - ProfileSingle (0): single-server, one file (default, backward-compat)
// - ProfileStriped (1): multi-server striping (reserved, not yet implemented)
type StorageProfile uint8
const (
ProfileSingle StorageProfile = 0 // zero-value = backward compat
ProfileStriped StorageProfile = 1 // reserved — rejected at creation time
)
var (
ErrInvalidStorageProfile = errors.New("blockvol: invalid storage profile")
ErrStripedNotImplemented = errors.New("blockvol: striped profile is not yet implemented")
)
// ParseStorageProfile converts a string to StorageProfile.
// Empty string is treated as "single" for backward compatibility.
// Parsing is case-insensitive.
func ParseStorageProfile(s string) (StorageProfile, error) {
switch strings.ToLower(strings.TrimSpace(s)) {
case "", "single":
return ProfileSingle, nil
case "striped":
return ProfileStriped, nil
default:
return 0, fmt.Errorf("%w: %q", ErrInvalidStorageProfile, s)
}
}
// String returns the canonical string representation.
func (p StorageProfile) String() string {
switch p {
case ProfileSingle:
return "single"
case ProfileStriped:
return "striped"
default:
return fmt.Sprintf("unknown(%d)", p)
}
}

235
weed/storage/blockvol/storage_profile_test.go

@ -0,0 +1,235 @@
package blockvol
import (
"bytes"
"errors"
"os"
"path/filepath"
"testing"
)
func TestStorageProfile_String(t *testing.T) {
tests := []struct {
p StorageProfile
want string
}{
{ProfileSingle, "single"},
{ProfileStriped, "striped"},
{StorageProfile(99), "unknown(99)"},
}
for _, tt := range tests {
got := tt.p.String()
if got != tt.want {
t.Errorf("StorageProfile(%d).String() = %q, want %q", tt.p, got, tt.want)
}
}
}
func TestParseStorageProfile_Valid(t *testing.T) {
tests := []struct {
input string
want StorageProfile
}{
{"single", ProfileSingle},
{"striped", ProfileStriped},
{"", ProfileSingle}, // empty = backward compat
{"Single", ProfileSingle}, // case-insensitive
{"STRIPED", ProfileStriped},
{" single ", ProfileSingle}, // whitespace-tolerant
}
for _, tt := range tests {
got, err := ParseStorageProfile(tt.input)
if err != nil {
t.Errorf("ParseStorageProfile(%q) error: %v", tt.input, err)
}
if got != tt.want {
t.Errorf("ParseStorageProfile(%q) = %v, want %v", tt.input, got, tt.want)
}
}
}
func TestParseStorageProfile_Invalid(t *testing.T) {
_, err := ParseStorageProfile("mirrored")
if !errors.Is(err, ErrInvalidStorageProfile) {
t.Errorf("ParseStorageProfile(mirrored) error = %v, want ErrInvalidStorageProfile", err)
}
}
func TestStorageProfile_StringRoundTrip(t *testing.T) {
for _, p := range []StorageProfile{ProfileSingle, ProfileStriped} {
s := p.String()
got, err := ParseStorageProfile(s)
if err != nil {
t.Errorf("round-trip %v -> %q: parse error: %v", p, s, err)
}
if got != p {
t.Errorf("round-trip %v -> %q -> %v", p, s, got)
}
}
}
func TestSuperblock_ProfilePersistence(t *testing.T) {
// ProfileSingle persists and reads back correctly.
sb, err := NewSuperblock(1*1024*1024*1024, CreateOptions{
StorageProfile: ProfileSingle,
})
if err != nil {
t.Fatalf("NewSuperblock: %v", err)
}
if sb.StorageProfile != uint8(ProfileSingle) {
t.Fatalf("StorageProfile = %d, want %d", sb.StorageProfile, ProfileSingle)
}
var buf bytes.Buffer
sb.WriteTo(&buf)
got, err := ReadSuperblock(&buf)
if err != nil {
t.Fatalf("ReadSuperblock: %v", err)
}
if got.StorageProfile != uint8(ProfileSingle) {
t.Errorf("StorageProfile = %d, want %d", got.StorageProfile, ProfileSingle)
}
}
func TestSuperblock_BackwardCompat_ProfileZero(t *testing.T) {
// Existing volumes have 0 at profile offset → ProfileSingle.
sb, _ := NewSuperblock(1*1024*1024*1024, CreateOptions{})
var buf bytes.Buffer
sb.WriteTo(&buf)
got, err := ReadSuperblock(&buf)
if err != nil {
t.Fatalf("ReadSuperblock: %v", err)
}
if StorageProfile(got.StorageProfile) != ProfileSingle {
t.Errorf("backward compat: StorageProfile = %d, want %d (single)", got.StorageProfile, ProfileSingle)
}
}
func TestSuperblock_InvalidProfileRejected(t *testing.T) {
sb, _ := NewSuperblock(1*1024*1024*1024, CreateOptions{})
sb.StorageProfile = 99
err := sb.Validate()
if err == nil {
t.Error("Validate should reject StorageProfile=99")
}
if !errors.Is(err, ErrInvalidSuperblock) {
t.Errorf("Validate error = %v, want ErrInvalidSuperblock", err)
}
}
func TestCreate_WithProfile(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "test.blk")
vol, err := CreateBlockVol(path, CreateOptions{
VolumeSize: 4 * 1024 * 1024,
StorageProfile: ProfileSingle,
})
if err != nil {
t.Fatalf("Create: %v", err)
}
defer vol.Close()
if vol.Profile() != ProfileSingle {
t.Errorf("Profile() = %v, want single", vol.Profile())
}
}
func TestCreate_DefaultProfile(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "test.blk")
vol, err := CreateBlockVol(path, CreateOptions{VolumeSize: 4 * 1024 * 1024})
if err != nil {
t.Fatalf("Create: %v", err)
}
defer vol.Close()
if vol.Profile() != ProfileSingle {
t.Errorf("default Profile() = %v, want single", vol.Profile())
}
}
func TestOpen_ProfileSurvivesReopen(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "test.blk")
vol, err := CreateBlockVol(path, CreateOptions{
VolumeSize: 4 * 1024 * 1024,
StorageProfile: ProfileSingle,
})
if err != nil {
t.Fatalf("Create: %v", err)
}
vol.Close()
vol2, err := OpenBlockVol(path)
if err != nil {
t.Fatalf("Open: %v", err)
}
defer vol2.Close()
if vol2.Profile() != ProfileSingle {
t.Errorf("Profile() after reopen = %v, want single", vol2.Profile())
}
}
func TestCreate_StripedRejected(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "test.blk")
_, err := CreateBlockVol(path, CreateOptions{
VolumeSize: 4 * 1024 * 1024,
StorageProfile: ProfileStriped,
})
if !errors.Is(err, ErrStripedNotImplemented) {
t.Errorf("CreateBlockVol(striped) error = %v, want ErrStripedNotImplemented", err)
}
// File should not exist.
if _, statErr := os.Stat(path); !os.IsNotExist(statErr) {
t.Error("CreateBlockVol(striped) should not leave a file behind")
}
}
func TestCreate_InvalidProfileRejected(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "test.blk")
_, err := CreateBlockVol(path, CreateOptions{
VolumeSize: 4 * 1024 * 1024,
StorageProfile: StorageProfile(99),
})
if !errors.Is(err, ErrInvalidStorageProfile) {
t.Errorf("CreateBlockVol(profile=99) error = %v, want ErrInvalidStorageProfile", err)
}
// File should not exist.
if _, statErr := os.Stat(path); !os.IsNotExist(statErr) {
t.Error("CreateBlockVol(invalid profile) should not leave a file behind")
}
}
func TestOpen_InvalidProfileOnDisk(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "test.blk")
vol, err := CreateBlockVol(path, CreateOptions{VolumeSize: 4 * 1024 * 1024})
if err != nil {
t.Fatalf("Create: %v", err)
}
vol.Close()
// Corrupt the StorageProfile byte on disk (offset 105).
f, err := os.OpenFile(path, os.O_RDWR, 0644)
if err != nil {
t.Fatalf("open: %v", err)
}
if _, err := f.WriteAt([]byte{99}, 105); err != nil {
t.Fatalf("write corrupt byte: %v", err)
}
f.Close()
_, err = OpenBlockVol(path)
if err == nil {
t.Fatal("OpenBlockVol should fail with invalid StorageProfile")
}
}

11
weed/storage/blockvol/superblock.go

@ -41,6 +41,7 @@ type Superblock struct {
SnapshotCount uint32
Epoch uint64 // fencing epoch (0 = no fencing, Phase 3 compat)
DurabilityMode uint8 // CP8-3-1: 0=best_effort, 1=sync_all, 2=sync_quorum
StorageProfile uint8 // CP11A-1: 0=single, 1=striped (reserved)
}
// superblockOnDisk is the fixed-size on-disk layout (binary.Write/Read target).
@ -63,6 +64,7 @@ type superblockOnDisk struct {
SnapshotCount uint32
Epoch uint64
DurabilityMode uint8
StorageProfile uint8
}
// NewSuperblock creates a superblock with defaults and a fresh UUID.
@ -101,6 +103,7 @@ func NewSuperblock(volumeSize uint64, opts CreateOptions) (Superblock, error) {
WALOffset: SuperblockSize,
WALSize: walSize,
DurabilityMode: uint8(opts.DurabilityMode),
StorageProfile: uint8(opts.StorageProfile),
}
copy(sb.Magic[:], MagicSWBK)
sb.UUID = id
@ -131,6 +134,7 @@ func (sb *Superblock) WriteTo(w io.Writer) (int64, error) {
SnapshotCount: sb.SnapshotCount,
Epoch: sb.Epoch,
DurabilityMode: sb.DurabilityMode,
StorageProfile: sb.StorageProfile,
}
// Encode into beginning of buf; rest stays zero (padding).
@ -166,6 +170,8 @@ func (sb *Superblock) WriteTo(w io.Writer) (int64, error) {
endian.PutUint64(buf[off:], d.Epoch)
off += 8
buf[off] = d.DurabilityMode
off++
buf[off] = d.StorageProfile
n, err := w.Write(buf)
return int64(n), err
@ -228,6 +234,8 @@ func ReadSuperblock(r io.Reader) (Superblock, error) {
sb.Epoch = endian.Uint64(buf[off:])
off += 8
sb.DurabilityMode = buf[off]
off++
sb.StorageProfile = buf[off]
return sb, nil
}
@ -263,5 +271,8 @@ func (sb *Superblock) Validate() error {
if sb.DurabilityMode > 2 {
return fmt.Errorf("%w: invalid DurabilityMode %d", ErrInvalidSuperblock, sb.DurabilityMode)
}
if sb.StorageProfile > 1 {
return fmt.Errorf("%w: invalid StorageProfile %d", ErrInvalidSuperblock, sb.StorageProfile)
}
return nil
}
Loading…
Cancel
Save