diff --git a/weed/server/volume_server_block.go b/weed/server/volume_server_block.go
index 0de28350e..273b5928d 100644
--- a/weed/server/volume_server_block.go
+++ b/weed/server/volume_server_block.go
@@ -55,7 +55,7 @@ func StartBlockService(listenAddr, blockDir, iqnPrefix string) *BlockService {
 			continue
 		}
 		path := filepath.Join(blockDir, entry.Name())
-		vol, err := bs.blockStore.AddBlockVolume(path)
+		vol, err := bs.blockStore.AddBlockVolume(path, "")
 		if err != nil {
 			// Auto-initialize raw files (e.g. created via truncate).
 			info, serr := entry.Info()
@@ -70,7 +70,7 @@ func StartBlockService(listenAddr, blockDir, iqnPrefix string) *BlockService {
 					continue
 				}
 				created.Close()
-				vol, err = bs.blockStore.AddBlockVolume(path)
+				vol, err = bs.blockStore.AddBlockVolume(path, "")
 				if err != nil {
 					glog.Warningf("block service: skip %s after auto-create: %v", path, err)
 					continue
diff --git a/weed/storage/blockvol/block_heartbeat.go b/weed/storage/blockvol/block_heartbeat.go
new file mode 100644
index 000000000..5ff9e5740
--- /dev/null
+++ b/weed/storage/blockvol/block_heartbeat.go
@@ -0,0 +1,105 @@
+package blockvol
+
+import (
+	"math"
+	"time"
+)
+
+// BlockVolumeInfoMessage is the full per-volume status record carried in a heartbeat.
+// It is a hand-written mirror of the proto message that will be generated from master.proto.
+type BlockVolumeInfoMessage struct {
+	Path          string // volume file path (unique ID on this server)
+	VolumeSize    uint64 // logical size in bytes
+	BlockSize     uint32 // block size in bytes
+	Epoch         uint64 // current fencing epoch
+	Role          uint32 // blockvol.Role encoded as uint32 (see RoleToWire)
+	WalHeadLsn    uint64 // WAL head LSN
+	CheckpointLsn uint64 // checkpoint LSN (last flushed)
+	HasLease      bool   // whether the volume currently holds a valid lease
+	DiskType      string // caller-supplied label, e.g. "ssd", "hdd"
+}
+
+// BlockVolumeShortInfoMessage is the reduced record used in delta heartbeats
+// to announce new or deleted block volumes.
+type BlockVolumeShortInfoMessage struct {
+	Path       string // volume file path
+	VolumeSize uint64 // logical size in bytes
+	BlockSize  uint32 // block size in bytes
+	DiskType   string // caller-supplied label, e.g. "ssd", "hdd"
+}
+
+// BlockVolumeAssignment carries a role/epoch/lease assignment
+// sent from the master to a volume server for one block volume.
+type BlockVolumeAssignment struct {
+	Path       string // which block volume
+	Epoch      uint64 // new epoch
+	Role       uint32 // target role (blockvol.Role as uint32, decode with RoleFromWire)
+	LeaseTtlMs uint32 // lease TTL in milliseconds (0 = no lease)
+}
+
+// ToBlockVolumeInfoMessage builds the full heartbeat record for vol from its
+// Info() and Status() results. path and diskType are caller-supplied metadata
+// (e.g. "ssd", "hdd") since the volume itself does not track disk type.
+func ToBlockVolumeInfoMessage(path, diskType string, vol *BlockVol) BlockVolumeInfoMessage {
+	info := vol.Info()
+	status := vol.Status()
+	return BlockVolumeInfoMessage{
+		Path:          path,
+		VolumeSize:    info.VolumeSize,
+		BlockSize:     info.BlockSize,
+		Epoch:         status.Epoch,
+		Role:          RoleToWire(status.Role),
+		WalHeadLsn:    status.WALHeadLSN,
+		CheckpointLsn: status.CheckpointLSN,
+		HasLease:      status.HasLease,
+		DiskType:      diskType,
+	}
+}
+
+// ToBlockVolumeShortInfoMessage builds the reduced record used in delta
+// heartbeats from vol.Info(). path and diskType are caller-supplied metadata.
+func ToBlockVolumeShortInfoMessage(path, diskType string, vol *BlockVol) BlockVolumeShortInfoMessage {
+	info := vol.Info()
+	return BlockVolumeShortInfoMessage{
+		Path:       path,
+		VolumeSize: info.VolumeSize,
+		BlockSize:  info.BlockSize,
+		DiskType:   diskType,
+	}
+}
+
+// maxValidRole is the largest wire value RoleFromWire accepts; assumes RoleDraining is the highest Role constant.
+const maxValidRole = uint32(RoleDraining)
+
+// RoleFromWire converts a uint32 wire role to blockvol.Role.
+// Values above maxValidRole are mapped to RoleNone so that unknown
+// roles cannot propagate through the system.
+func RoleFromWire(r uint32) Role {
+	if r > maxValidRole {
+		return RoleNone
+	}
+	return Role(r)
+}
+
+// RoleToWire converts a blockvol.Role to its uint32 wire representation.
+func RoleToWire(r Role) uint32 {
+	return uint32(r)
+}
+
+// LeaseTTLFromWire converts a wire TTL in milliseconds to a time.Duration.
+func LeaseTTLFromWire(ms uint32) time.Duration {
+	return time.Duration(ms) * time.Millisecond
+}
+
+// LeaseTTLToWire converts d to whole milliseconds (any sub-millisecond remainder is dropped).
+// Values above math.MaxUint32 ms (~49.7 days) clamp to math.MaxUint32; negative durations clamp to 0.
+func LeaseTTLToWire(d time.Duration) uint32 {
+	ms := d.Milliseconds()
+	if ms > math.MaxUint32 {
+		return math.MaxUint32
+	}
+	if ms < 0 {
+		return 0
+	}
+	return uint32(ms)
+}
diff --git a/weed/storage/blockvol/block_heartbeat_test.go b/weed/storage/blockvol/block_heartbeat_test.go
new file mode 100644
index 000000000..b640f89f9
--- /dev/null
+++ b/weed/storage/blockvol/block_heartbeat_test.go
@@ -0,0 +1,271 @@
+package blockvol
+
+import (
+	"math"
+	"path/filepath"
+	"testing"
+	"time"
+)
+
+func TestBlockHeartbeat(t *testing.T) {
+	tests := []struct {
+		name string
+		run  func(t *testing.T)
+	}{
+		{name: "to_info_message_fields", run: testToInfoMessageFields},
+		{name: "to_short_info_message", run: testToShortInfoMessage},
+		{name: "role_wire_roundtrip", run: testRoleWireRoundtrip},
+		{name: "role_from_wire_unknown_maps_to_none", run: testRoleFromWireUnknown},
+		{name: "lease_ttl_wire_roundtrip", run: testLeaseTTLWireRoundtrip},
+		{name: "lease_ttl_overflow_clamps", run: testLeaseTTLOverflowClamps},
+		{name: "disk_type_propagates", run: testDiskTypePropagates},
+		{name: "collect_heartbeat_empty", run: testCollectHeartbeatEmpty},
+		{name: "collect_heartbeat_multiple", run: testCollectHeartbeatMultiple},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, tt.run)
+	}
+}
+
+// testToInfoMessageFields creates a vol, writes one block, and verifies Path, VolumeSize, BlockSize, WalHeadLsn, Role and DiskType.
+func testToInfoMessageFields(t *testing.T) {
+	dir := t.TempDir()
+	path := filepath.Join(dir, "hb.blk")
+
+	vol, err := CreateBlockVol(path, CreateOptions{
+		VolumeSize: 1 << 20, // 1 MiB
+		BlockSize:  4096,
+	})
+	if err != nil {
+		t.Fatalf("create: %v", err)
+	}
+	defer vol.Close()
+
+	// Write a single 4096-byte block at LBA 0 so the WAL head LSN advances.
+	data := make([]byte, 4096)
+	data[0] = 0xAB
+	if err := vol.WriteLBA(0, data); err != nil {
+		t.Fatalf("write: %v", err)
+	}
+
+	msg := ToBlockVolumeInfoMessage(path, "ssd", vol)
+
+	if msg.Path != path {
+		t.Errorf("Path = %q, want %q", msg.Path, path)
+	}
+	if msg.VolumeSize != 1<<20 {
+		t.Errorf("VolumeSize = %d, want %d", msg.VolumeSize, 1<<20)
+	}
+	if msg.BlockSize != 4096 {
+		t.Errorf("BlockSize = %d, want 4096", msg.BlockSize)
+	}
+	if msg.WalHeadLsn == 0 {
+		t.Error("WalHeadLsn should be > 0 after a write")
+	}
+	// A freshly created volume is expected to report RoleNone.
+	if msg.Role != RoleToWire(RoleNone) {
+		t.Errorf("Role = %d, want %d", msg.Role, RoleToWire(RoleNone))
+	}
+	if msg.DiskType != "ssd" {
+		t.Errorf("DiskType = %q, want %q", msg.DiskType, "ssd")
+	}
+}
+
+// testToShortInfoMessage verifies the short message carries Path, VolumeSize, BlockSize and DiskType.
+func testToShortInfoMessage(t *testing.T) {
+	dir := t.TempDir()
+	path := filepath.Join(dir, "hb_short.blk")
+
+	vol, err := CreateBlockVol(path, CreateOptions{
+		VolumeSize: 1 << 20,
+		BlockSize:  4096,
+	})
+	if err != nil {
+		t.Fatalf("create: %v", err)
+	}
+	defer vol.Close()
+
+	msg := ToBlockVolumeShortInfoMessage(path, "hdd", vol)
+
+	if msg.Path != path {
+		t.Errorf("Path = %q, want %q", msg.Path, path)
+	}
+	if msg.VolumeSize != 1<<20 {
+		t.Errorf("VolumeSize = %d, want %d", msg.VolumeSize, 1<<20)
+	}
+	if msg.BlockSize != 4096 {
+		t.Errorf("BlockSize = %d, want 4096", msg.BlockSize)
+	}
+	if msg.DiskType != "hdd" {
+		t.Errorf("DiskType = %q, want %q", msg.DiskType, "hdd")
+	}
+}
+
+// testRoleWireRoundtrip verifies every listed Role value survives RoleToWire followed by RoleFromWire.
+func testRoleWireRoundtrip(t *testing.T) {
+	roles := []Role{RoleNone, RolePrimary, RoleReplica, RoleStale, RoleRebuilding, RoleDraining}
+	for _, r := range roles {
+		wire := RoleToWire(r)
+		back := RoleFromWire(wire)
+		if back != r {
+			t.Errorf("RoleFromWire(RoleToWire(%s)) = %s, want %s", r, back, r)
+		}
+	}
+}
+
+// testRoleFromWireUnknown verifies wire values outside the defined Role range map to RoleNone.
+func testRoleFromWireUnknown(t *testing.T) {
+	unknowns := []uint32{100, 255, math.MaxUint32}
+	for _, u := range unknowns {
+		got := RoleFromWire(u)
+		if got != RoleNone {
+			t.Errorf("RoleFromWire(%d) = %s, want %s", u, got, RoleNone)
+		}
+	}
+}
+
+// testLeaseTTLWireRoundtrip verifies whole-millisecond durations survive a ToWire/FromWire roundtrip.
+func testLeaseTTLWireRoundtrip(t *testing.T) {
+	cases := []time.Duration{
+		0,
+		1 * time.Millisecond,
+		500 * time.Millisecond,
+		5 * time.Second,
+		30 * time.Second,
+		10 * time.Minute,
+	}
+	for _, d := range cases {
+		wire := LeaseTTLToWire(d)
+		back := LeaseTTLFromWire(wire)
+		if back != d {
+			t.Errorf("LeaseTTLFromWire(LeaseTTLToWire(%v)) = %v, want %v", d, back, d)
+		}
+	}
+}
+
+// testLeaseTTLOverflowClamps verifies out-of-range durations clamp: too large to MaxUint32, negative to 0.
+func testLeaseTTLOverflowClamps(t *testing.T) {
+	huge := 50 * 24 * time.Hour // 50 days > ~49.7 day uint32 ms limit
+	wire := LeaseTTLToWire(huge)
+	if wire != math.MaxUint32 {
+		t.Errorf("LeaseTTLToWire(50 days) = %d, want %d", wire, uint32(math.MaxUint32))
+	}
+
+	// A negative duration must clamp to 0.
+	wire = LeaseTTLToWire(-1 * time.Second)
+	if wire != 0 {
+		t.Errorf("LeaseTTLToWire(-1s) = %d, want 0", wire)
+	}
+}
+
+// testDiskTypePropagates verifies the caller-supplied diskType is copied into both message types.
+func testDiskTypePropagates(t *testing.T) {
+	dir := t.TempDir()
+	path := filepath.Join(dir, "dt.blk")
+
+	vol, err := CreateBlockVol(path, CreateOptions{
+		VolumeSize: 1 << 20,
+		BlockSize:  4096,
+	})
+	if err != nil {
+		t.Fatalf("create: %v", err)
+	}
+	defer vol.Close()
+
+	info := ToBlockVolumeInfoMessage(path, "nvme", vol)
+	if info.DiskType != "nvme" {
+		t.Errorf("info DiskType = %q, want %q", info.DiskType, "nvme")
+	}
+
+	short := ToBlockVolumeShortInfoMessage(path, "nvme", vol)
+	if short.DiskType != "nvme" {
+		t.Errorf("short DiskType = %q, want %q", short.DiskType, "nvme")
+	}
+
+	// An empty diskType (disk type unknown) must pass through unchanged.
+	info2 := ToBlockVolumeInfoMessage(path, "", vol)
+	if info2.DiskType != "" {
+		t.Errorf("info DiskType = %q, want empty", info2.DiskType)
+	}
+}
+
+// testCollectHeartbeatEmpty verifies a store with no volumes yields a zero-length slice.
+func testCollectHeartbeatEmpty(t *testing.T) {
+	store := &testBlockVolumeStore{
+		volumes:   make(map[string]*BlockVol),
+		diskTypes: make(map[string]string),
+	}
+	msgs := collectBlockVolumeHeartbeat(store)
+	if len(msgs) != 0 {
+		t.Errorf("expected empty slice, got %d messages", len(msgs))
+	}
+}
+
+// testCollectHeartbeatMultiple verifies a store with 3 vols yields one message per vol with matching fields.
+func testCollectHeartbeatMultiple(t *testing.T) {
+	dir := t.TempDir()
+	store := &testBlockVolumeStore{
+		volumes:   make(map[string]*BlockVol),
+		diskTypes: make(map[string]string),
+	}
+
+	paths := []string{"a.blk", "b.blk", "c.blk"}
+	dtypes := []string{"ssd", "hdd", "nvme"}
+	for i, name := range paths {
+		p := filepath.Join(dir, name)
+		vol, err := CreateBlockVol(p, CreateOptions{
+			VolumeSize: 1 << 20,
+			BlockSize:  4096,
+		})
+		if err != nil {
+			t.Fatalf("create %s: %v", name, err)
+		}
+		defer vol.Close() // runs when the test function returns, so every volume stays open during collection
+		store.volumes[p] = vol
+		store.diskTypes[p] = dtypes[i]
+	}
+
+	msgs := collectBlockVolumeHeartbeat(store)
+	if len(msgs) != 3 {
+		t.Fatalf("expected 3 messages, got %d", len(msgs))
+	}
+
+	// Index messages by Path (the helper ranges over a map, so order is unspecified) and check the common fields.
+	seen := make(map[string]BlockVolumeInfoMessage)
+	for _, m := range msgs {
+		seen[m.Path] = m
+		if m.VolumeSize != 1<<20 {
+			t.Errorf("msg %s: VolumeSize = %d, want %d", m.Path, m.VolumeSize, 1<<20)
+		}
+		if m.BlockSize != 4096 {
+			t.Errorf("msg %s: BlockSize = %d, want 4096", m.Path, m.BlockSize)
+		}
+	}
+	for i, name := range paths {
+		p := filepath.Join(dir, name)
+		m, ok := seen[p]
+		if !ok {
+			t.Errorf("missing message for %s", p)
+			continue
+		}
+		if m.DiskType != dtypes[i] {
+			t.Errorf("msg %s: DiskType = %q, want %q", p, m.DiskType, dtypes[i])
+		}
+	}
+}
+
+// testBlockVolumeStore is a minimal test double that avoids importing the storage package.
+type testBlockVolumeStore struct {
+	volumes   map[string]*BlockVol // keyed by volume file path
+	diskTypes map[string]string    // path -> disk type
+}
+
+// collectBlockVolumeHeartbeat mirrors BlockVolumeStore.CollectBlockVolumeHeartbeat
+// on the test double (no locking), exercising the same ToBlockVolumeInfoMessage logic.
+func collectBlockVolumeHeartbeat(store *testBlockVolumeStore) []BlockVolumeInfoMessage {
+	msgs := make([]BlockVolumeInfoMessage, 0, len(store.volumes))
+	for path, vol := range store.volumes { // map iteration: message order is unspecified
+		msgs = append(msgs, ToBlockVolumeInfoMessage(path, store.diskTypes[path], vol))
+	}
+	return msgs
+}
diff --git a/weed/storage/store_blockvol.go b/weed/storage/store_blockvol.go
index 98a89779b..49b97e99e 100644
--- a/weed/storage/store_blockvol.go
+++ b/weed/storage/store_blockvol.go
@@ -12,19 +12,22 @@ import (
 // It is a standalone component held by VolumeServer, not embedded into Store,
 // to keep the existing Store codebase unchanged.
 type BlockVolumeStore struct {
-	mu      sync.RWMutex
-	volumes map[string]*blockvol.BlockVol // keyed by volume file path
+	mu        sync.RWMutex
+	volumes   map[string]*blockvol.BlockVol // keyed by volume file path
+	diskTypes map[string]string             // volume file path -> disk type (e.g. "ssd")
 }
 
 // NewBlockVolumeStore creates a new block volume manager.
 func NewBlockVolumeStore() *BlockVolumeStore {
 	return &BlockVolumeStore{
-		volumes: make(map[string]*blockvol.BlockVol),
+		volumes:   make(map[string]*blockvol.BlockVol),
+		diskTypes: make(map[string]string),
 	}
 }
 
 // AddBlockVolume opens and registers a block volume.
-func (bs *BlockVolumeStore) AddBlockVolume(path string, cfgs ...blockvol.BlockVolConfig) (*blockvol.BlockVol, error) {
+// diskType is stored alongside the volume and reported in heartbeats (e.g. "ssd", "hdd").
+func (bs *BlockVolumeStore) AddBlockVolume(path, diskType string, cfgs ...blockvol.BlockVolConfig) (*blockvol.BlockVol, error) {
 	bs.mu.Lock()
 	defer bs.mu.Unlock()
 
@@ -38,7 +41,8 @@ func (bs *BlockVolumeStore) AddBlockVolume(path string, cfgs ...blockvol.BlockVo
 	}
 
 	bs.volumes[path] = vol
-	glog.V(0).Infof("block volume registered: %s", path)
+	bs.diskTypes[path] = diskType
+	glog.V(0).Infof("block volume registered: %s (disk=%s)", path, diskType)
 	return vol, nil
 }
 
@@ -56,6 +60,7 @@ func (bs *BlockVolumeStore) RemoveBlockVolume(path string) error {
 		glog.Warningf("error closing block volume %s: %v", path, err)
 	}
 	delete(bs.volumes, path)
+	delete(bs.diskTypes, path)
 	glog.V(0).Infof("block volume removed: %s", path)
 	return nil
 }
@@ -79,6 +84,18 @@ func (bs *BlockVolumeStore) ListBlockVolumes() []string {
 	return paths
 }
 
+// CollectBlockVolumeHeartbeat builds one info message per registered block volume
+// while holding the read lock; messages are returned in unspecified (map iteration) order.
+func (bs *BlockVolumeStore) CollectBlockVolumeHeartbeat() []blockvol.BlockVolumeInfoMessage {
+	bs.mu.RLock()
+	defer bs.mu.RUnlock()
+	msgs := make([]blockvol.BlockVolumeInfoMessage, 0, len(bs.volumes))
+	for path, vol := range bs.volumes {
+		msgs = append(msgs, blockvol.ToBlockVolumeInfoMessage(path, bs.diskTypes[path], vol))
+	}
+	return msgs
+}
+
 // Close closes all block volumes.
 func (bs *BlockVolumeStore) Close() {
 	bs.mu.Lock()
diff --git a/weed/storage/store_blockvol_test.go b/weed/storage/store_blockvol_test.go
index 1f8fc8f49..e1858fca7 100644
--- a/weed/storage/store_blockvol_test.go
+++ b/weed/storage/store_blockvol_test.go
@@ -30,7 +30,7 @@ func TestStoreAddBlockVolume(t *testing.T) {
 	bs := NewBlockVolumeStore()
 	defer bs.Close()
 
-	vol, err := bs.AddBlockVolume(path)
+	vol, err := bs.AddBlockVolume(path, "")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -39,7 +39,7 @@ func TestStoreAddBlockVolume(t *testing.T) {
 	}
 
 	// Duplicate add should fail.
- _, err = bs.AddBlockVolume(path) + _, err = bs.AddBlockVolume(path, "") if err == nil { t.Fatal("expected error on duplicate add") } @@ -64,7 +64,7 @@ func TestStoreRemoveBlockVolume(t *testing.T) { bs := NewBlockVolumeStore() defer bs.Close() - if _, err := bs.AddBlockVolume(path); err != nil { + if _, err := bs.AddBlockVolume(path, ""); err != nil { t.Fatal(err) } @@ -90,10 +90,10 @@ func TestStoreCloseAllBlockVolumes(t *testing.T) { bs := NewBlockVolumeStore() - if _, err := bs.AddBlockVolume(path1); err != nil { + if _, err := bs.AddBlockVolume(path1, ""); err != nil { t.Fatal(err) } - if _, err := bs.AddBlockVolume(path2); err != nil { + if _, err := bs.AddBlockVolume(path2, ""); err != nil { t.Fatal(err) }