Browse Source

feat: Phase 4A CP4b-1 -- wire types, conversion helpers, heartbeat collection

Add BlockVolumeInfoMessage, BlockVolumeShortInfoMessage, BlockVolumeAssignment
wire-type structs (proto-shaped Go structs). Add conversion helpers with
DiskType plumbing, overflow-safe LeaseTTLToWire, validated RoleFromWire.
Add CollectBlockVolumeHeartbeat on BlockVolumeStore. 9 new tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feature/sw-block
Ping Qiu 1 week ago
parent
commit
ffdde15bcd
  1. 4
      weed/server/volume_server_block.go
  2. 105
      weed/storage/blockvol/block_heartbeat.go
  3. 271
      weed/storage/blockvol/block_heartbeat_test.go
  4. 27
      weed/storage/store_blockvol.go
  5. 10
      weed/storage/store_blockvol_test.go

4
weed/server/volume_server_block.go

@ -55,7 +55,7 @@ func StartBlockService(listenAddr, blockDir, iqnPrefix string) *BlockService {
continue
}
path := filepath.Join(blockDir, entry.Name())
vol, err := bs.blockStore.AddBlockVolume(path)
vol, err := bs.blockStore.AddBlockVolume(path, "")
if err != nil {
// Auto-initialize raw files (e.g. created via truncate).
info, serr := entry.Info()
@ -70,7 +70,7 @@ func StartBlockService(listenAddr, blockDir, iqnPrefix string) *BlockService {
continue
}
created.Close()
vol, err = bs.blockStore.AddBlockVolume(path)
vol, err = bs.blockStore.AddBlockVolume(path, "")
if err != nil {
glog.Warningf("block service: skip %s after auto-create: %v", path, err)
continue

105
weed/storage/blockvol/block_heartbeat.go

@ -0,0 +1,105 @@
package blockvol
import (
"math"
"time"
)
// BlockVolumeInfoMessage is the heartbeat status for one block volume.
// Mirrors the proto message that will be generated from master.proto.
type BlockVolumeInfoMessage struct {
Path string // volume file path (unique ID on this server)
VolumeSize uint64 // logical size in bytes
BlockSize uint32 // block size in bytes
Epoch uint64 // current fencing epoch
Role uint32 // blockvol.Role as uint32 for wire compat
WalHeadLsn uint64 // WAL head LSN
CheckpointLsn uint64 // last flushed LSN
HasLease bool // whether volume holds a valid lease
DiskType string // e.g., "ssd", "hdd"
}
// BlockVolumeShortInfoMessage is used for delta heartbeats
// (new/deleted block volumes).
type BlockVolumeShortInfoMessage struct {
Path string
VolumeSize uint64
BlockSize uint32
DiskType string
}
// BlockVolumeAssignment carries a role/epoch/lease assignment
// from master to volume server for one block volume.
type BlockVolumeAssignment struct {
Path string // which block volume
Epoch uint64 // new epoch
Role uint32 // target role (blockvol.Role as uint32)
LeaseTtlMs uint32 // lease TTL in milliseconds (0 = no lease)
}
// ToBlockVolumeInfoMessage converts a BlockVol's current state
// to a heartbeat info message. diskType is caller-supplied metadata
// (e.g. "ssd", "hdd") since the volume itself does not track disk type.
func ToBlockVolumeInfoMessage(path, diskType string, vol *BlockVol) BlockVolumeInfoMessage {
info := vol.Info()
status := vol.Status()
return BlockVolumeInfoMessage{
Path: path,
VolumeSize: info.VolumeSize,
BlockSize: info.BlockSize,
Epoch: status.Epoch,
Role: RoleToWire(status.Role),
WalHeadLsn: status.WALHeadLSN,
CheckpointLsn: status.CheckpointLSN,
HasLease: status.HasLease,
DiskType: diskType,
}
}
// ToBlockVolumeShortInfoMessage returns a short info message
// for delta heartbeats. diskType is caller-supplied metadata.
func ToBlockVolumeShortInfoMessage(path, diskType string, vol *BlockVol) BlockVolumeShortInfoMessage {
info := vol.Info()
return BlockVolumeShortInfoMessage{
Path: path,
VolumeSize: info.VolumeSize,
BlockSize: info.BlockSize,
DiskType: diskType,
}
}
// maxValidRole is the highest defined Role value.
const maxValidRole = uint32(RoleDraining)
// RoleFromWire converts a uint32 wire role to blockvol.Role.
// Unknown values are mapped to RoleNone to prevent invalid roles
// from propagating through the system.
func RoleFromWire(r uint32) Role {
if r > maxValidRole {
return RoleNone
}
return Role(r)
}
// RoleToWire converts a blockvol.Role to uint32 for wire.
func RoleToWire(r Role) uint32 {
return uint32(r)
}
// LeaseTTLFromWire converts milliseconds to time.Duration.
func LeaseTTLFromWire(ms uint32) time.Duration {
return time.Duration(ms) * time.Millisecond
}
// LeaseTTLToWire converts time.Duration to milliseconds.
// Durations exceeding ~49.7 days are clamped to math.MaxUint32.
func LeaseTTLToWire(d time.Duration) uint32 {
ms := d.Milliseconds()
if ms > math.MaxUint32 {
return math.MaxUint32
}
if ms < 0 {
return 0
}
return uint32(ms)
}

271
weed/storage/blockvol/block_heartbeat_test.go

@ -0,0 +1,271 @@
package blockvol
import (
"math"
"path/filepath"
"testing"
"time"
)
func TestBlockHeartbeat(t *testing.T) {
tests := []struct {
name string
run func(t *testing.T)
}{
{name: "to_info_message_fields", run: testToInfoMessageFields},
{name: "to_short_info_message", run: testToShortInfoMessage},
{name: "role_wire_roundtrip", run: testRoleWireRoundtrip},
{name: "role_from_wire_unknown_maps_to_none", run: testRoleFromWireUnknown},
{name: "lease_ttl_wire_roundtrip", run: testLeaseTTLWireRoundtrip},
{name: "lease_ttl_overflow_clamps", run: testLeaseTTLOverflowClamps},
{name: "disk_type_propagates", run: testDiskTypePropagates},
{name: "collect_heartbeat_empty", run: testCollectHeartbeatEmpty},
{name: "collect_heartbeat_multiple", run: testCollectHeartbeatMultiple},
}
for _, tt := range tests {
t.Run(tt.name, tt.run)
}
}
// testToInfoMessageFields creates a vol, writes data, and verifies all fields.
func testToInfoMessageFields(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "hb.blk")
vol, err := CreateBlockVol(path, CreateOptions{
VolumeSize: 1 << 20, // 1MB
BlockSize: 4096,
})
if err != nil {
t.Fatalf("create: %v", err)
}
defer vol.Close()
// Write one block so WAL head LSN advances.
data := make([]byte, 4096)
data[0] = 0xAB
if err := vol.WriteLBA(0, data); err != nil {
t.Fatalf("write: %v", err)
}
msg := ToBlockVolumeInfoMessage(path, "ssd", vol)
if msg.Path != path {
t.Errorf("Path = %q, want %q", msg.Path, path)
}
if msg.VolumeSize != 1<<20 {
t.Errorf("VolumeSize = %d, want %d", msg.VolumeSize, 1<<20)
}
if msg.BlockSize != 4096 {
t.Errorf("BlockSize = %d, want 4096", msg.BlockSize)
}
if msg.WalHeadLsn == 0 {
t.Error("WalHeadLsn should be > 0 after a write")
}
// Role should be RoleNone (0) for a fresh volume.
if msg.Role != RoleToWire(RoleNone) {
t.Errorf("Role = %d, want %d", msg.Role, RoleToWire(RoleNone))
}
if msg.DiskType != "ssd" {
t.Errorf("DiskType = %q, want %q", msg.DiskType, "ssd")
}
}
// testToShortInfoMessage verifies short message has path/size/blockSize/diskType.
func testToShortInfoMessage(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "hb_short.blk")
vol, err := CreateBlockVol(path, CreateOptions{
VolumeSize: 1 << 20,
BlockSize: 4096,
})
if err != nil {
t.Fatalf("create: %v", err)
}
defer vol.Close()
msg := ToBlockVolumeShortInfoMessage(path, "hdd", vol)
if msg.Path != path {
t.Errorf("Path = %q, want %q", msg.Path, path)
}
if msg.VolumeSize != 1<<20 {
t.Errorf("VolumeSize = %d, want %d", msg.VolumeSize, 1<<20)
}
if msg.BlockSize != 4096 {
t.Errorf("BlockSize = %d, want 4096", msg.BlockSize)
}
if msg.DiskType != "hdd" {
t.Errorf("DiskType = %q, want %q", msg.DiskType, "hdd")
}
}
// testRoleWireRoundtrip verifies all Role values roundtrip through wire conversion.
func testRoleWireRoundtrip(t *testing.T) {
roles := []Role{RoleNone, RolePrimary, RoleReplica, RoleStale, RoleRebuilding, RoleDraining}
for _, r := range roles {
wire := RoleToWire(r)
back := RoleFromWire(wire)
if back != r {
t.Errorf("RoleFromWire(RoleToWire(%s)) = %s, want %s", r, back, r)
}
}
}
// testRoleFromWireUnknown verifies unknown wire values map to RoleNone.
func testRoleFromWireUnknown(t *testing.T) {
unknowns := []uint32{100, 255, math.MaxUint32}
for _, u := range unknowns {
got := RoleFromWire(u)
if got != RoleNone {
t.Errorf("RoleFromWire(%d) = %s, want %s", u, got, RoleNone)
}
}
}
// testLeaseTTLWireRoundtrip verifies various durations roundtrip correctly.
func testLeaseTTLWireRoundtrip(t *testing.T) {
cases := []time.Duration{
0,
1 * time.Millisecond,
500 * time.Millisecond,
5 * time.Second,
30 * time.Second,
10 * time.Minute,
}
for _, d := range cases {
wire := LeaseTTLToWire(d)
back := LeaseTTLFromWire(wire)
if back != d {
t.Errorf("LeaseTTLFromWire(LeaseTTLToWire(%v)) = %v, want %v", d, back, d)
}
}
}
// testLeaseTTLOverflowClamps verifies large durations clamp to MaxUint32.
func testLeaseTTLOverflowClamps(t *testing.T) {
huge := 50 * 24 * time.Hour // 50 days > ~49.7 day uint32 ms limit
wire := LeaseTTLToWire(huge)
if wire != math.MaxUint32 {
t.Errorf("LeaseTTLToWire(50 days) = %d, want %d", wire, uint32(math.MaxUint32))
}
// Negative duration should clamp to 0.
wire = LeaseTTLToWire(-1 * time.Second)
if wire != 0 {
t.Errorf("LeaseTTLToWire(-1s) = %d, want 0", wire)
}
}
// testDiskTypePropagates verifies DiskType flows through both message types.
func testDiskTypePropagates(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "dt.blk")
vol, err := CreateBlockVol(path, CreateOptions{
VolumeSize: 1 << 20,
BlockSize: 4096,
})
if err != nil {
t.Fatalf("create: %v", err)
}
defer vol.Close()
info := ToBlockVolumeInfoMessage(path, "nvme", vol)
if info.DiskType != "nvme" {
t.Errorf("info DiskType = %q, want %q", info.DiskType, "nvme")
}
short := ToBlockVolumeShortInfoMessage(path, "nvme", vol)
if short.DiskType != "nvme" {
t.Errorf("short DiskType = %q, want %q", short.DiskType, "nvme")
}
// Empty diskType should be allowed (unknown disk).
info2 := ToBlockVolumeInfoMessage(path, "", vol)
if info2.DiskType != "" {
t.Errorf("info DiskType = %q, want empty", info2.DiskType)
}
}
// testCollectHeartbeatEmpty verifies empty store returns empty slice.
func testCollectHeartbeatEmpty(t *testing.T) {
store := &testBlockVolumeStore{
volumes: make(map[string]*BlockVol),
diskTypes: make(map[string]string),
}
msgs := collectBlockVolumeHeartbeat(store)
if len(msgs) != 0 {
t.Errorf("expected empty slice, got %d messages", len(msgs))
}
}
// testCollectHeartbeatMultiple verifies store with 3 vols returns 3 messages.
func testCollectHeartbeatMultiple(t *testing.T) {
dir := t.TempDir()
store := &testBlockVolumeStore{
volumes: make(map[string]*BlockVol),
diskTypes: make(map[string]string),
}
paths := []string{"a.blk", "b.blk", "c.blk"}
dtypes := []string{"ssd", "hdd", "nvme"}
for i, name := range paths {
p := filepath.Join(dir, name)
vol, err := CreateBlockVol(p, CreateOptions{
VolumeSize: 1 << 20,
BlockSize: 4096,
})
if err != nil {
t.Fatalf("create %s: %v", name, err)
}
defer vol.Close()
store.volumes[p] = vol
store.diskTypes[p] = dtypes[i]
}
msgs := collectBlockVolumeHeartbeat(store)
if len(msgs) != 3 {
t.Fatalf("expected 3 messages, got %d", len(msgs))
}
// Verify each message has correct fields.
seen := make(map[string]BlockVolumeInfoMessage)
for _, m := range msgs {
seen[m.Path] = m
if m.VolumeSize != 1<<20 {
t.Errorf("msg %s: VolumeSize = %d, want %d", m.Path, m.VolumeSize, 1<<20)
}
if m.BlockSize != 4096 {
t.Errorf("msg %s: BlockSize = %d, want 4096", m.Path, m.BlockSize)
}
}
for i, name := range paths {
p := filepath.Join(dir, name)
m, ok := seen[p]
if !ok {
t.Errorf("missing message for %s", p)
continue
}
if m.DiskType != dtypes[i] {
t.Errorf("msg %s: DiskType = %q, want %q", p, m.DiskType, dtypes[i])
}
}
}
// testBlockVolumeStore is a minimal test double to avoid importing the storage package.
type testBlockVolumeStore struct {
volumes map[string]*BlockVol
diskTypes map[string]string
}
// collectBlockVolumeHeartbeat mirrors BlockVolumeStore.CollectBlockVolumeHeartbeat
// using the test double, exercising the same ToBlockVolumeInfoMessage logic.
func collectBlockVolumeHeartbeat(store *testBlockVolumeStore) []BlockVolumeInfoMessage {
msgs := make([]BlockVolumeInfoMessage, 0, len(store.volumes))
for path, vol := range store.volumes {
msgs = append(msgs, ToBlockVolumeInfoMessage(path, store.diskTypes[path], vol))
}
return msgs
}

27
weed/storage/store_blockvol.go

@ -12,19 +12,22 @@ import (
// It is a standalone component held by VolumeServer, not embedded into Store,
// to keep the existing Store codebase unchanged.
type BlockVolumeStore struct {
mu sync.RWMutex
volumes map[string]*blockvol.BlockVol // keyed by volume file path
mu sync.RWMutex
volumes map[string]*blockvol.BlockVol // keyed by volume file path
diskTypes map[string]string // path -> disk type (e.g. "ssd")
}
// NewBlockVolumeStore creates a new block volume manager.
func NewBlockVolumeStore() *BlockVolumeStore {
return &BlockVolumeStore{
volumes: make(map[string]*blockvol.BlockVol),
volumes: make(map[string]*blockvol.BlockVol),
diskTypes: make(map[string]string),
}
}
// AddBlockVolume opens and registers a block volume.
func (bs *BlockVolumeStore) AddBlockVolume(path string, cfgs ...blockvol.BlockVolConfig) (*blockvol.BlockVol, error) {
// diskType is metadata for heartbeat reporting (e.g. "ssd", "hdd").
func (bs *BlockVolumeStore) AddBlockVolume(path, diskType string, cfgs ...blockvol.BlockVolConfig) (*blockvol.BlockVol, error) {
bs.mu.Lock()
defer bs.mu.Unlock()
@ -38,7 +41,8 @@ func (bs *BlockVolumeStore) AddBlockVolume(path string, cfgs ...blockvol.BlockVo
}
bs.volumes[path] = vol
glog.V(0).Infof("block volume registered: %s", path)
bs.diskTypes[path] = diskType
glog.V(0).Infof("block volume registered: %s (disk=%s)", path, diskType)
return vol, nil
}
@ -56,6 +60,7 @@ func (bs *BlockVolumeStore) RemoveBlockVolume(path string) error {
glog.Warningf("error closing block volume %s: %v", path, err)
}
delete(bs.volumes, path)
delete(bs.diskTypes, path)
glog.V(0).Infof("block volume removed: %s", path)
return nil
}
@ -79,6 +84,18 @@ func (bs *BlockVolumeStore) ListBlockVolumes() []string {
return paths
}
// CollectBlockVolumeHeartbeat returns status for all registered
// block volumes, suitable for inclusion in a heartbeat message.
func (bs *BlockVolumeStore) CollectBlockVolumeHeartbeat() []blockvol.BlockVolumeInfoMessage {
bs.mu.RLock()
defer bs.mu.RUnlock()
msgs := make([]blockvol.BlockVolumeInfoMessage, 0, len(bs.volumes))
for path, vol := range bs.volumes {
msgs = append(msgs, blockvol.ToBlockVolumeInfoMessage(path, bs.diskTypes[path], vol))
}
return msgs
}
// Close closes all block volumes.
func (bs *BlockVolumeStore) Close() {
bs.mu.Lock()

10
weed/storage/store_blockvol_test.go

@ -30,7 +30,7 @@ func TestStoreAddBlockVolume(t *testing.T) {
bs := NewBlockVolumeStore()
defer bs.Close()
vol, err := bs.AddBlockVolume(path)
vol, err := bs.AddBlockVolume(path, "")
if err != nil {
t.Fatal(err)
}
@ -39,7 +39,7 @@ func TestStoreAddBlockVolume(t *testing.T) {
}
// Duplicate add should fail.
_, err = bs.AddBlockVolume(path)
_, err = bs.AddBlockVolume(path, "")
if err == nil {
t.Fatal("expected error on duplicate add")
}
@ -64,7 +64,7 @@ func TestStoreRemoveBlockVolume(t *testing.T) {
bs := NewBlockVolumeStore()
defer bs.Close()
if _, err := bs.AddBlockVolume(path); err != nil {
if _, err := bs.AddBlockVolume(path, ""); err != nil {
t.Fatal(err)
}
@ -90,10 +90,10 @@ func TestStoreCloseAllBlockVolumes(t *testing.T) {
bs := NewBlockVolumeStore()
if _, err := bs.AddBlockVolume(path1); err != nil {
if _, err := bs.AddBlockVolume(path1, ""); err != nil {
t.Fatal(err)
}
if _, err := bs.AddBlockVolume(path2); err != nil {
if _, err := bs.AddBlockVolume(path2, ""); err != nil {
t.Fatal(err)
}

Loading…
Cancel
Save