Browse Source

feat: Phase 5 CP5-2 -- CoW snapshots, 10 tests

Sparse delta-file snapshots with copy-on-write in the flusher.
Zero write-path overhead when no snapshot is active.

New: snapshot.go (SnapshotBitmap, SnapshotHeader, delta file I/O)
Modified: flusher.go (flushMu, CoW phase in FlushOnce, PauseAndFlush)
Modified: blockvol.go (Create/Read/Delete/Restore/ListSnapshots, recovery)
Modified: wal_writer.go (Reset for snapshot restore)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feature/sw-block
Ping Qiu 1 week ago
parent
commit
d874e21f93
  1. 275
      weed/storage/blockvol/blockvol.go
  2. 128
      weed/storage/blockvol/flusher.go
  3. 296
      weed/storage/blockvol/snapshot.go
  4. 512
      weed/storage/blockvol/snapshot_test.go
  5. 8
      weed/storage/blockvol/wal_writer.go

275
weed/storage/blockvol/blockvol.go

@ -9,6 +9,7 @@ import (
"fmt"
"log"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
@ -58,6 +59,10 @@ type BlockVol struct {
rebuildServer *RebuildServer
assignMu sync.Mutex // serializes HandleAssignment calls
drainTimeout time.Duration // default 10s, for demote drain
// Snapshot fields (Phase 5 CP5-2).
snapMu sync.RWMutex
snapshots map[uint32]*activeSnapshot
}
// CreateBlockVol creates a new block volume file at path.
@ -118,6 +123,7 @@ func CreateBlockVol(path string, opts CreateOptions, cfgs ...BlockVolConfig) (*B
wal: wal,
dirtyMap: dm,
opsDrained: make(chan struct{}, 1),
snapshots: make(map[uint32]*activeSnapshot),
}
v.nextLSN.Store(1)
v.healthy.Store(true)
@ -211,6 +217,28 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) {
Interval: cfg.FlushInterval,
})
go v.flusher.Run()
// Discover and reopen existing snapshots.
v.snapshots = make(map[uint32]*activeSnapshot)
snapFiles, _ := filepath.Glob(path + ".snap.*")
for _, sf := range snapFiles {
snap, err := openDeltaFile(sf)
if err != nil {
log.Printf("blockvol: skipping snapshot file %s: %v", sf, err)
continue
}
if snap.header.ParentUUID != v.super.UUID {
log.Printf("blockvol: skipping snapshot %d: parent UUID mismatch", snap.id)
snap.Close()
continue
}
v.snapshots[snap.id] = snap
v.flusher.AddSnapshot(snap)
}
if len(v.snapshots) > 0 {
log.Printf("blockvol: recovered %d snapshot(s)", len(v.snapshots))
}
return v, nil
}
@ -610,6 +638,244 @@ func (v *BlockVol) Status() BlockVolumeStatus {
}
}
// CreateSnapshot creates a point-in-time snapshot. The snapshot captures the
// exact volume state after all pending writes have been flushed to the extent.
// Only allowed on Primary or None (standalone) roles.
//
// Lock order: flushMu (via PauseAndFlush) is acquired BEFORE v.snapMu. This
// matches RestoreSnapshot, which takes v.snapMu while holding flushMu; the
// previous order here (v.snapMu -> flushMu) could deadlock against a
// concurrent RestoreSnapshot.
func (v *BlockVol) CreateSnapshot(id uint32) error {
	if err := v.beginOp(); err != nil {
		return err
	}
	defer v.endOp()

	// Role check: only Primary or None (standalone).
	r := Role(v.role.Load())
	if r != RoleNone && r != RolePrimary {
		return ErrSnapshotRoleReject
	}

	// SyncCache: ensure all prior WAL writes are durable before flushing.
	if err := v.groupCommit.Submit(); err != nil {
		return fmt.Errorf("blockvol: snapshot sync: %w", err)
	}

	// Pause flusher and flush everything to extent. flushMu is held from
	// here until the deferred Resume, so the extent stays consistent.
	if err := v.flusher.PauseAndFlush(); err != nil {
		v.flusher.Resume()
		return fmt.Errorf("blockvol: snapshot flush: %w", err)
	}
	defer v.flusher.Resume()

	// Duplicate check + insert under snapMu (inside flushMu) so two
	// concurrent CreateSnapshot(id) calls cannot both pass the check.
	v.snapMu.Lock()
	if _, exists := v.snapshots[id]; exists {
		v.snapMu.Unlock()
		return ErrSnapshotExists
	}

	// Flusher is paused -- this LSN marks the consistent extent state.
	baseLSN := v.flusher.CheckpointLSN()

	// Create delta file.
	deltaPath := deltaFilePath(v.path, id)
	snap, err := createDeltaFile(deltaPath, id, v, baseLSN)
	if err != nil {
		v.snapMu.Unlock()
		return err
	}

	// Register snapshot in flusher and volume, and update the superblock
	// snapshot count while we still hold snapMu.
	v.flusher.AddSnapshot(snap)
	v.snapshots[id] = snap
	v.super.SnapshotCount = uint32(len(v.snapshots))
	v.snapMu.Unlock()

	return v.persistSuperblock()
}
// ReadSnapshot reads data from a snapshot at the given LBA and length (bytes).
//
// The read is serialized with the flusher's CoW phase by holding flushMu:
// without it, a block could be CoW'd and the extent overwritten between the
// bitmap check and the extent read, returning post-snapshot data. Holding
// flushMu also makes the bitmap access safe (SnapshotBitmap has no internal
// locking and is otherwise mutated by the flusher).
func (v *BlockVol) ReadSnapshot(id uint32, lba uint64, length uint32) ([]byte, error) {
	if err := v.beginOp(); err != nil {
		return nil, err
	}
	defer v.endOp()
	if err := ValidateWrite(lba, length, v.super.VolumeSize, v.super.BlockSize); err != nil {
		return nil, err
	}
	v.snapMu.RLock()
	snap, ok := v.snapshots[id]
	v.snapMu.RUnlock()
	if !ok {
		return nil, ErrSnapshotNotFound
	}

	// Pause the flusher for the duration of the read (see doc comment).
	// Also prevents DeleteSnapshot from closing snap.fd mid-read, since
	// deletion pauses the flusher before closing the file.
	v.flusher.flushMu.Lock()
	defer v.flusher.flushMu.Unlock()

	blocks := length / v.super.BlockSize
	result := make([]byte, length)
	extentStart := v.super.WALOffset + v.super.WALSize // loop-invariant, hoisted
	for i := uint32(0); i < blocks; i++ {
		blockLBA := lba + uint64(i)
		off := i * v.super.BlockSize
		if snap.bitmap.Get(blockLBA) {
			// CoW'd block: snapshot-time data lives in the delta file.
			deltaOff := int64(snap.dataOffset + blockLBA*uint64(v.super.BlockSize))
			if _, err := snap.fd.ReadAt(result[off:off+v.super.BlockSize], deltaOff); err != nil {
				return nil, fmt.Errorf("blockvol: read snapshot delta LBA %d: %w", blockLBA, err)
			}
		} else {
			// Not CoW'd: the extent still holds the snapshot-time data.
			extentOff := int64(extentStart + blockLBA*uint64(v.super.BlockSize))
			if _, err := v.fd.ReadAt(result[off:off+v.super.BlockSize], extentOff); err != nil {
				return nil, fmt.Errorf("blockvol: read snapshot extent LBA %d: %w", blockLBA, err)
			}
		}
	}
	return result, nil
}
// DeleteSnapshot removes a snapshot and deletes its delta file.
func (v *BlockVol) DeleteSnapshot(id uint32) error {
	if err := v.beginOp(); err != nil {
		return err
	}
	defer v.endOp()

	v.snapMu.Lock()
	snap, ok := v.snapshots[id]
	if !ok {
		v.snapMu.Unlock()
		return ErrSnapshotNotFound
	}
	delete(v.snapshots, id)
	remaining := uint32(len(v.snapshots))
	v.snapMu.Unlock()

	// Pause flusher so no in-flight CoW cycle can use snap.fd after close.
	// PauseAndFlush holds flushMu even when the flush errors, so removal is
	// safe either way; the error must not be silently dropped, but it does
	// not block deletion (the snapshot is going away regardless).
	if err := v.flusher.PauseAndFlush(); err != nil {
		log.Printf("blockvol: flush before deleting snapshot %d: %v", id, err)
	}
	v.flusher.RemoveSnapshot(id)
	v.flusher.Resume()

	// Close and remove delta file. Safe now -- flusher cannot reference snap.
	deltaPath := deltaFilePath(v.path, id)
	snap.Close()
	os.Remove(deltaPath)

	// Update superblock.
	v.super.SnapshotCount = remaining
	return v.persistSuperblock()
}
// RestoreSnapshot reverts the live volume to the state captured by the snapshot.
// This is destructive: all writes after the snapshot are lost. All snapshots are
// removed after restore. The volume must have no active I/O (call after Close
// coordination or on a standalone volume).
func (v *BlockVol) RestoreSnapshot(id uint32) error {
	if err := v.beginOp(); err != nil {
		return err
	}
	defer v.endOp()

	// Pause flusher. Check error -- if flush fails, don't proceed.
	if err := v.flusher.PauseAndFlush(); err != nil {
		v.flusher.Resume()
		return fmt.Errorf("blockvol: restore flush: %w", err)
	}
	defer v.flusher.Resume()

	// Resolve the snapshot only AFTER the flusher is paused. Resolving it
	// before the pause left a window in which a concurrent DeleteSnapshot
	// could close snap.fd and unlink the delta file, making the copy loop
	// below read from a closed descriptor.
	v.snapMu.RLock()
	snap, ok := v.snapshots[id]
	v.snapMu.RUnlock()
	if !ok {
		return ErrSnapshotNotFound
	}

	// Copy CoW'd blocks from delta back to extent.
	extentStart := v.super.WALOffset + v.super.WALSize
	totalBlocks := v.super.VolumeSize / uint64(v.super.BlockSize)
	buf := make([]byte, v.super.BlockSize)
	for lba := uint64(0); lba < totalBlocks; lba++ {
		if !snap.bitmap.Get(lba) {
			continue // never CoW'd -- extent already holds snapshot-time data
		}
		deltaOff := int64(snap.dataOffset + lba*uint64(v.super.BlockSize))
		if _, err := snap.fd.ReadAt(buf, deltaOff); err != nil {
			return fmt.Errorf("blockvol: restore read delta LBA %d: %w", lba, err)
		}
		extentOff := int64(extentStart + lba*uint64(v.super.BlockSize))
		if _, err := v.fd.WriteAt(buf, extentOff); err != nil {
			return fmt.Errorf("blockvol: restore write extent LBA %d: %w", lba, err)
		}
	}

	// Fsync extent before discarding WAL state.
	if err := v.fd.Sync(); err != nil {
		return fmt.Errorf("blockvol: restore fsync: %w", err)
	}

	// Drop all pending-flush state: the WAL contents are post-snapshot.
	v.dirtyMap.Clear()
	v.wal.Reset()
	v.super.WALHead = 0
	v.super.WALTail = 0
	v.super.WALCheckpointLSN = snap.header.BaseLSN
	v.nextLSN.Store(snap.header.BaseLSN + 1)

	// Remove ALL snapshots. Flusher is paused, so no CoW cycle can race.
	v.snapMu.Lock()
	for sid, s := range v.snapshots {
		v.flusher.RemoveSnapshot(sid)
		s.Close()
		os.Remove(deltaFilePath(v.path, sid))
	}
	v.snapshots = make(map[uint32]*activeSnapshot)
	v.snapMu.Unlock()

	// Update superblock.
	v.super.SnapshotCount = 0
	return v.persistSuperblock()
}
// ListSnapshots returns metadata for all active snapshots.
// CoWBlocks is a point-in-time value; the flusher may be CoW'ing
// concurrently, so the count can be slightly stale by the time it is read.
func (v *BlockVol) ListSnapshots() []SnapshotInfo {
	v.snapMu.RLock()
	defer v.snapMu.RUnlock()
	out := make([]SnapshotInfo, 0, len(v.snapshots))
	for _, s := range v.snapshots {
		info := SnapshotInfo{
			ID:        s.id,
			BaseLSN:   s.header.BaseLSN,
			CreatedAt: time.Unix(int64(s.header.CreatedAt), 0),
			CoWBlocks: s.bitmap.CountSet(),
		}
		out = append(out, info)
	}
	return out
}
// persistSuperblock writes the superblock to disk and fsyncs.
// The superblock lives at file offset 0, so the fd is rewound first; the
// final Sync makes the update durable before the caller returns.
// NOTE(review): this moves the shared fd's file offset -- assumed safe
// because all other writers use WriteAt/ReadAt; confirm.
func (v *BlockVol) persistSuperblock() error {
	if _, err := v.fd.Seek(0, 0); err != nil {
		return fmt.Errorf("blockvol: persist superblock seek: %w", err)
	}
	if _, err := v.super.WriteTo(v.fd); err != nil {
		return fmt.Errorf("blockvol: persist superblock write: %w", err)
	}
	if err := v.fd.Sync(); err != nil {
		return fmt.Errorf("blockvol: persist superblock sync: %w", err)
	}
	return nil
}
// Close shuts down the block volume and closes the file.
// Shutdown order: shipper -> replica receiver -> rebuild server -> drain ops -> group committer -> flusher -> final flush -> close fd.
func (v *BlockVol) Close() error {
@ -644,6 +910,15 @@ func (v *BlockVol) Close() error {
v.flusher.Stop() // stop background goroutine first (no concurrent flush)
flushErr = v.flusher.FlushOnce() // then do final flush safely
}
// Close snapshot delta files.
v.snapMu.Lock()
for _, snap := range v.snapshots {
snap.Close()
}
v.snapshots = nil
v.snapMu.Unlock()
closeErr := v.fd.Close()
if flushErr != nil {
return flushErr

128
weed/storage/blockvol/flusher.go

@ -25,6 +25,16 @@ type Flusher struct {
checkpointLSN uint64 // last flushed LSN
checkpointTail uint64 // WAL physical tail after last flush
// flushMu serializes FlushOnce calls and is acquired by CreateSnapshot
// to pause the flusher while the snapshot is being set up.
// Lock order: flushMu -> snapMu (flushMu acquired first).
flushMu sync.Mutex
// snapMu protects the snapshots slice. Acquired under flushMu in
// FlushOnce (RLock) and under flushMu in PauseAndFlush callers.
snapMu sync.RWMutex
snapshots []*activeSnapshot
logger *log.Logger
lastErr bool // true if last FlushOnce returned error
@ -126,24 +136,109 @@ func (f *Flusher) Stop() {
<-f.done
}
// FlushOnce performs a single flush cycle: scan dirty map, copy data to
// extent region, fsync, update checkpoint, advance WAL tail.
func (f *Flusher) FlushOnce() error {
// Snapshot dirty entries. We use a full scan (Range over all possible LBAs
// is impractical), so we collect from the dirty map directly.
type flushEntry struct {
lba uint64
walOff uint64
lsn uint64
length uint32
// AddSnapshot registers snap with the flusher so subsequent flush cycles
// perform copy-on-write for it.
func (f *Flusher) AddSnapshot(snap *activeSnapshot) {
	f.snapMu.Lock()
	defer f.snapMu.Unlock()
	f.snapshots = append(f.snapshots, snap)
}
// RemoveSnapshot drops the snapshot with the given ID from the flusher's
// active list. It is a no-op when the ID is not present.
func (f *Flusher) RemoveSnapshot(id uint32) {
	f.snapMu.Lock()
	defer f.snapMu.Unlock()
	for i := range f.snapshots {
		if f.snapshots[i].id != id {
			continue
		}
		f.snapshots = append(f.snapshots[:i], f.snapshots[i+1:]...)
		return
	}
}
// HasActiveSnapshots reports whether any snapshot currently requires CoW.
func (f *Flusher) HasActiveSnapshots() bool {
	f.snapMu.RLock()
	defer f.snapMu.RUnlock()
	return len(f.snapshots) > 0
}
// PauseAndFlush acquires flushMu (pausing the flusher), then runs FlushOnce.
// The caller must call Resume() when done. Note that flushMu stays held even
// when the flush returns an error, so Resume is required on every path.
func (f *Flusher) PauseAndFlush() error {
	f.flushMu.Lock()
	return f.flushOnceLocked()
}
// Resume releases flushMu, allowing the flusher to resume.
// Must be paired with a prior PauseAndFlush; unpaired calls unlock an
// unlocked mutex and panic.
func (f *Flusher) Resume() {
	f.flushMu.Unlock()
}
// FlushOnce performs a single flush cycle: scan dirty map, CoW for active
// snapshots, copy data to extent region, fsync, update checkpoint, advance WAL tail.
// Serialized against concurrent flushes and PauseAndFlush via flushMu.
func (f *Flusher) FlushOnce() error {
	f.flushMu.Lock()
	defer f.flushMu.Unlock()
	return f.flushOnceLocked()
}
// flushOnceLocked is the inner FlushOnce. Caller must hold flushMu.
func (f *Flusher) flushOnceLocked() error {
entries := f.dirtyMap.Snapshot()
if len(entries) == 0 {
return nil
}
// Find the max LSN and max WAL offset to know where to advance tail.
// --- Phase 1: CoW for active snapshots ---
f.snapMu.RLock()
snaps := make([]*activeSnapshot, len(f.snapshots))
copy(snaps, f.snapshots)
f.snapMu.RUnlock()
if len(snaps) > 0 {
cowDirty := false
for _, e := range entries {
for _, snap := range snaps {
if !snap.bitmap.Get(e.Lba) {
// Read old data from extent (pre-modification state).
oldData := make([]byte, f.blockSize)
extentOff := int64(f.extentStart + e.Lba*uint64(f.blockSize))
if _, err := f.fd.ReadAt(oldData, extentOff); err != nil {
return fmt.Errorf("flusher: CoW read extent LBA %d: %w", e.Lba, err)
}
// Write old data to delta file.
deltaOff := int64(snap.dataOffset + e.Lba*uint64(f.blockSize))
if _, err := snap.fd.WriteAt(oldData, deltaOff); err != nil {
return fmt.Errorf("flusher: CoW write delta LBA %d snap %d: %w", e.Lba, snap.id, err)
}
snap.bitmap.Set(e.Lba)
snap.dirty = true
cowDirty = true
}
}
}
if cowDirty {
// Crash safety: delta data -> fsync -> bitmap persist -> fsync -> extent write.
for _, snap := range snaps {
if snap.dirty {
if err := snap.fd.Sync(); err != nil {
return fmt.Errorf("flusher: fsync delta snap %d: %w", snap.id, err)
}
if err := snap.bitmap.WriteTo(snap.fd, SnapHeaderSize); err != nil {
return fmt.Errorf("flusher: persist bitmap snap %d: %w", snap.id, err)
}
if err := snap.fd.Sync(); err != nil {
return fmt.Errorf("flusher: fsync bitmap snap %d: %w", snap.id, err)
}
snap.dirty = false
}
}
}
}
// --- Phase 2: Extent writes (existing, unchanged) ---
var maxLSN uint64
var maxWALEnd uint64
@ -156,8 +251,6 @@ func (f *Flusher) FlushOnce() error {
}
// WAL reuse guard: validate LSN before trusting the entry.
// Between Snapshot() and this read, the WAL slot could have been
// reused (by a previous flush cycle advancing the tail + new writes).
entryLSN := binary.LittleEndian.Uint64(headerBuf[0:8])
if entryLSN != e.Lsn {
continue // stale --WAL slot reused, skip this entry
@ -180,9 +273,6 @@ func (f *Flusher) FlushOnce() error {
continue // corrupt or partially overwritten --skip
}
// Write only this block's data to extent (not all blocks in the
// WAL entry). Other blocks may have been overwritten by newer
// writes and their dirty map entries point elsewhere.
if e.Lba < entry.LBA {
continue // LBA mismatch --stale entry
}
@ -196,21 +286,17 @@ func (f *Flusher) FlushOnce() error {
}
}
// Track WAL end position for tail advance.
walEnd := e.WalOffset + uint64(entryLen)
if walEnd > maxWALEnd {
maxWALEnd = walEnd
}
} else if entryType == EntryTypeTrim {
// TRIM entries: zero the extent region for this LBA.
// Each dirty map entry represents one trimmed block.
zeroBlock := make([]byte, f.blockSize)
extentOff := int64(f.extentStart + e.Lba*uint64(f.blockSize))
if _, err := f.fd.WriteAt(zeroBlock, extentOff); err != nil {
return fmt.Errorf("flusher: zero extent at LBA %d: %w", e.Lba, err)
}
// TRIM entry has no data payload, just a header.
walEnd := e.WalOffset + uint64(walEntryHeaderSize)
if walEnd > maxWALEnd {
maxWALEnd = walEnd
@ -230,8 +316,6 @@ func (f *Flusher) FlushOnce() error {
// Remove flushed entries from dirty map.
f.mu.Lock()
for _, e := range entries {
// Only remove if the dirty map entry still has the same LSN
// (a newer write may have updated it).
_, currentLSN, _, ok := f.dirtyMap.Get(e.Lba)
if ok && currentLSN == e.Lsn {
f.dirtyMap.Delete(e.Lba)

296
weed/storage/blockvol/snapshot.go

@ -0,0 +1,296 @@
package blockvol
import (
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"math/bits"
	"os"
	"time"
)
// Snapshot delta file constants.
const (
SnapMagic = "SWBS"
SnapVersion = 1
SnapHeaderSize = 4096
)
var (
ErrSnapshotNotFound = errors.New("blockvol: snapshot not found")
ErrSnapshotExists = errors.New("blockvol: snapshot already exists")
ErrSnapshotBadMagic = errors.New("blockvol: snapshot bad magic")
ErrSnapshotBadVersion = errors.New("blockvol: snapshot unsupported version")
ErrSnapshotBadParent = errors.New("blockvol: snapshot parent UUID mismatch")
ErrSnapshotRoleReject = errors.New("blockvol: snapshots only allowed on primary or standalone")
)
// SnapshotBitmap is a dense bitset tracking which blocks have been CoW'd.
// Bit i corresponds to LBA i, packed LSB-first within each byte.
// No internal locking -- callers (flusher, CreateSnapshot) serialize access.
type SnapshotBitmap struct {
	data []byte // packed bits, (bits+7)/8 bytes
	bits uint64 // total number of bits (= VolumeSize/BlockSize)
}

// NewSnapshotBitmap creates a zero-initialized bitmap for totalBlocks blocks.
func NewSnapshotBitmap(totalBlocks uint64) *SnapshotBitmap {
	byteLen := (totalBlocks + 7) / 8 // round up to whole bytes
	return &SnapshotBitmap{
		data: make([]byte, byteLen),
		bits: totalBlocks,
	}
}

// Get returns true if the bit at position lba is set.
// Out-of-range positions report false.
func (b *SnapshotBitmap) Get(lba uint64) bool {
	if lba >= b.bits {
		return false
	}
	return b.data[lba/8]&(1<<(lba%8)) != 0
}

// Set sets the bit at position lba. Out-of-range positions are a no-op.
func (b *SnapshotBitmap) Set(lba uint64) {
	if lba >= b.bits {
		return
	}
	b.data[lba/8] |= 1 << (lba % 8)
}

// ByteSize returns the number of bytes in the bitmap.
func (b *SnapshotBitmap) ByteSize() int {
	return len(b.data)
}

// WriteTo writes the bitmap data to w at the given offset.
func (b *SnapshotBitmap) WriteTo(w io.WriterAt, offset int64) error {
	_, err := w.WriteAt(b.data, offset)
	return err
}

// ReadFrom reads bitmap data from r at the given offset.
func (b *SnapshotBitmap) ReadFrom(r io.ReaderAt, offset int64) error {
	_, err := r.ReadAt(b.data, offset)
	return err
}

// CountSet returns the number of set bits (CoW'd blocks).
func (b *SnapshotBitmap) CountSet() uint64 {
	var count uint64
	for _, v := range b.data {
		count += uint64(bits.OnesCount8(v))
	}
	return count
}

// popcount8 returns the number of set bits in a byte.
// Kept for compatibility; delegates to the standard library (math/bits)
// instead of the previous hand-rolled SWAR sequence.
func popcount8(x byte) int {
	return bits.OnesCount8(x)
}
// SnapshotHeader is the on-disk header for a snapshot delta file.
// It occupies the first SnapHeaderSize (4096) bytes of the file; integer
// fields are serialized little-endian in declaration order (see WriteTo).
type SnapshotHeader struct {
	Magic      [4]byte  // must equal SnapMagic ("SWBS")
	Version    uint16   // format version; must equal SnapVersion
	SnapshotID uint32   // snapshot identifier, unique per volume
	BaseLSN    uint64   // checkpoint LSN at snapshot creation
	VolumeSize uint64   // parent volume size in bytes
	BlockSize  uint32   // parent volume block size in bytes
	BitmapSize uint64   // CoW bitmap length in bytes
	DataOffset uint64 // = SnapHeaderSize + BitmapSize, aligned to BlockSize
	CreatedAt  uint64   // creation time, Unix seconds
	ParentUUID [16]byte // UUID of the parent volume, checked on recovery
}

// WriteTo serializes the header as a SnapHeaderSize-byte block to w.
// Field order and widths define the on-disk format -- do not reorder.
// Trailing bytes of the 4 KiB block are zero.
func (h *SnapshotHeader) WriteTo(w io.Writer) (int64, error) {
	buf := make([]byte, SnapHeaderSize)
	endian := binary.LittleEndian
	off := 0
	off += copy(buf[off:], h.Magic[:])
	endian.PutUint16(buf[off:], h.Version)
	off += 2
	endian.PutUint32(buf[off:], h.SnapshotID)
	off += 4
	endian.PutUint64(buf[off:], h.BaseLSN)
	off += 8
	endian.PutUint64(buf[off:], h.VolumeSize)
	off += 8
	endian.PutUint32(buf[off:], h.BlockSize)
	off += 4
	endian.PutUint64(buf[off:], h.BitmapSize)
	off += 8
	endian.PutUint64(buf[off:], h.DataOffset)
	off += 8
	endian.PutUint64(buf[off:], h.CreatedAt)
	off += 8
	copy(buf[off:], h.ParentUUID[:])
	n, err := w.Write(buf)
	return int64(n), err
}

// ReadSnapshotHeader reads a SnapshotHeader from r.
// It consumes exactly SnapHeaderSize bytes and validates magic and version;
// the remaining fields are trusted as-is.
func ReadSnapshotHeader(r io.Reader) (*SnapshotHeader, error) {
	buf := make([]byte, SnapHeaderSize)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, fmt.Errorf("blockvol: read snapshot header: %w", err)
	}
	endian := binary.LittleEndian
	h := &SnapshotHeader{}
	off := 0
	copy(h.Magic[:], buf[off:off+4])
	off += 4
	if string(h.Magic[:]) != SnapMagic {
		return nil, ErrSnapshotBadMagic
	}
	h.Version = endian.Uint16(buf[off:])
	off += 2
	if h.Version != SnapVersion {
		return nil, fmt.Errorf("%w: got %d, want %d", ErrSnapshotBadVersion, h.Version, SnapVersion)
	}
	h.SnapshotID = endian.Uint32(buf[off:])
	off += 4
	h.BaseLSN = endian.Uint64(buf[off:])
	off += 8
	h.VolumeSize = endian.Uint64(buf[off:])
	off += 8
	h.BlockSize = endian.Uint32(buf[off:])
	off += 4
	h.BitmapSize = endian.Uint64(buf[off:])
	off += 8
	h.DataOffset = endian.Uint64(buf[off:])
	off += 8
	h.CreatedAt = endian.Uint64(buf[off:])
	off += 8
	copy(h.ParentUUID[:], buf[off:off+16])
	return h, nil
}
// activeSnapshot represents an open snapshot delta file with its in-memory bitmap.
// The struct has no locking of its own; the bitmap and dirty flag are mutated
// by the flusher's CoW phase and are intended to be serialized by callers
// (see SnapshotBitmap's doc comment).
type activeSnapshot struct {
	id         uint32
	fd         *os.File        // delta file handle (header + bitmap + data)
	header     SnapshotHeader  // decoded on-disk header
	bitmap     *SnapshotBitmap // which LBAs have been CoW'd into the delta
	dataOffset uint64          // byte offset of the data area (= header.DataOffset)
	dirty      bool // bitmap changed since last persist
}

// Close closes the delta file.
func (s *activeSnapshot) Close() error {
	return s.fd.Close()
}
// deltaFilePath returns the path for a snapshot delta file: the volume path
// with a ".snap.<id>" suffix appended.
func deltaFilePath(volPath string, id uint32) string {
	suffix := fmt.Sprintf(".snap.%d", id)
	return volPath + suffix
}
// createDeltaFile creates a new snapshot delta file and returns an activeSnapshot.
//
// Layout: [header, SnapHeaderSize bytes][bitmap][pad to BlockSize alignment]
// [data area, addressed as DataOffset + lba*BlockSize]. The file is truncated
// to its full logical size up front but remains sparse, so only the header
// and bitmap consume disk until blocks are CoW'd.
func createDeltaFile(path string, id uint32, vol *BlockVol, baseLSN uint64) (*activeSnapshot, error) {
	totalBlocks := vol.super.VolumeSize / uint64(vol.super.BlockSize)
	bitmap := NewSnapshotBitmap(totalBlocks)
	bitmapSize := uint64(bitmap.ByteSize())

	// Align DataOffset up to the next BlockSize boundary.
	dataOffset := uint64(SnapHeaderSize) + bitmapSize
	if rem := dataOffset % uint64(vol.super.BlockSize); rem != 0 {
		dataOffset += uint64(vol.super.BlockSize) - rem
	}

	hdr := SnapshotHeader{
		Version:    SnapVersion,
		SnapshotID: id,
		BaseLSN:    baseLSN,
		VolumeSize: vol.super.VolumeSize,
		BlockSize:  vol.super.BlockSize,
		BitmapSize: bitmapSize,
		DataOffset: dataOffset,
		CreatedAt:  uint64(time.Now().Unix()),
		ParentUUID: vol.super.UUID,
	}
	copy(hdr.Magic[:], SnapMagic)

	// O_EXCL: never clobber an existing delta file for this ID.
	fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0644)
	if err != nil {
		return nil, fmt.Errorf("blockvol: create delta file: %w", err)
	}
	// fail consolidates the error path: close and unlink the partially
	// initialized file so a retry can recreate it cleanly.
	fail := func(err error) (*activeSnapshot, error) {
		fd.Close()
		os.Remove(path)
		return nil, err
	}

	// Truncate to full size (sparse file -- only header+bitmap consume disk).
	totalSize := int64(dataOffset + vol.super.VolumeSize)
	if err := fd.Truncate(totalSize); err != nil {
		return fail(fmt.Errorf("blockvol: truncate delta: %w", err))
	}
	// Write header.
	if _, err := hdr.WriteTo(fd); err != nil {
		return fail(fmt.Errorf("blockvol: write snapshot header: %w", err))
	}
	// Write zero bitmap.
	if err := bitmap.WriteTo(fd, SnapHeaderSize); err != nil {
		return fail(fmt.Errorf("blockvol: write snapshot bitmap: %w", err))
	}
	// Fsync so the snapshot file is durable before it is registered.
	if err := fd.Sync(); err != nil {
		return fail(fmt.Errorf("blockvol: sync delta: %w", err))
	}

	return &activeSnapshot{
		id:         id,
		fd:         fd,
		header:     hdr,
		bitmap:     bitmap,
		dataOffset: dataOffset,
	}, nil
}
// openDeltaFile opens an existing snapshot delta file and loads its header
// and CoW bitmap into memory.
func openDeltaFile(path string) (*activeSnapshot, error) {
	fd, err := os.OpenFile(path, os.O_RDWR, 0644)
	if err != nil {
		return nil, fmt.Errorf("blockvol: open delta file: %w", err)
	}
	hdr, err := ReadSnapshotHeader(fd)
	if err != nil {
		fd.Close()
		return nil, err
	}
	// Rebuild the in-memory bitmap from its persisted form.
	bitmap := NewSnapshotBitmap(hdr.VolumeSize / uint64(hdr.BlockSize))
	if err = bitmap.ReadFrom(fd, SnapHeaderSize); err != nil {
		fd.Close()
		return nil, fmt.Errorf("blockvol: read snapshot bitmap: %w", err)
	}
	snap := &activeSnapshot{
		id:         hdr.SnapshotID,
		fd:         fd,
		header:     *hdr,
		bitmap:     bitmap,
		dataOffset: hdr.DataOffset,
	}
	return snap, nil
}
// SnapshotInfo contains read-only snapshot metadata for listing.
type SnapshotInfo struct {
	ID        uint32    // snapshot identifier
	BaseLSN   uint64    // checkpoint LSN at creation time
	CreatedAt time.Time // creation timestamp (second resolution)
	CoWBlocks uint64 // number of CoW'd blocks (bitmap.CountSet())
}

512
weed/storage/blockvol/snapshot_test.go

@ -0,0 +1,512 @@
package blockvol
import (
"bytes"
"os"
"path/filepath"
"sync"
"testing"
"time"
)
// TestSnapshots is a table-driven umbrella for all CP5-2 snapshot tests,
// running each scenario as a named subtest.
func TestSnapshots(t *testing.T) {
	tests := []struct {
		name string
		run  func(t *testing.T)
	}{
		{name: "bitmap_get_set", run: testSnap_BitmapGetSet},
		{name: "bitmap_persist_reload", run: testSnap_BitmapPersistReload},
		{name: "header_roundtrip", run: testSnap_HeaderRoundtrip},
		{name: "create_and_read", run: testSnap_CreateAndRead},
		{name: "multiple_snapshots", run: testSnap_MultipleSnapshots},
		{name: "delete_removes_file", run: testSnap_DeleteRemovesFile},
		{name: "cow_only_touched_blocks", run: testSnap_CoWOnlyTouchedBlocks},
		{name: "during_active_writes", run: testSnap_DuringActiveWrites},
		{name: "survives_recovery", run: testSnap_SurvivesRecovery},
		{name: "restore_rewinds", run: testSnap_RestoreRewinds},
	}
	for _, tt := range tests {
		t.Run(tt.name, tt.run)
	}
}
func testSnap_BitmapGetSet(t *testing.T) {
bm := NewSnapshotBitmap(1024)
// All bits start as zero.
for i := uint64(0); i < 1024; i++ {
if bm.Get(i) {
t.Fatalf("bit %d should be 0", i)
}
}
// Set some bits.
bm.Set(0)
bm.Set(7)
bm.Set(8)
bm.Set(1023)
if !bm.Get(0) {
t.Fatal("bit 0 not set")
}
if !bm.Get(7) {
t.Fatal("bit 7 not set")
}
if !bm.Get(8) {
t.Fatal("bit 8 not set")
}
if !bm.Get(1023) {
t.Fatal("bit 1023 not set")
}
if bm.Get(1) {
t.Fatal("bit 1 should not be set")
}
if bm.CountSet() != 4 {
t.Fatalf("CountSet = %d, want 4", bm.CountSet())
}
// Out-of-range: Get returns false, Set is a no-op.
if bm.Get(1024) {
t.Fatal("out-of-range Get should return false")
}
bm.Set(1024) // no panic
// ByteSize.
if bm.ByteSize() != 128 { // 1024/8 = 128
t.Fatalf("ByteSize = %d, want 128", bm.ByteSize())
}
}
func testSnap_BitmapPersistReload(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "bitmap.dat")
bm := NewSnapshotBitmap(256)
bm.Set(0)
bm.Set(100)
bm.Set(255)
// Write to file.
fd, err := os.Create(path)
if err != nil {
t.Fatal(err)
}
if err := bm.WriteTo(fd, 0); err != nil {
t.Fatal(err)
}
fd.Close()
// Read back.
fd2, err := os.Open(path)
if err != nil {
t.Fatal(err)
}
defer fd2.Close()
bm2 := NewSnapshotBitmap(256)
if err := bm2.ReadFrom(fd2, 0); err != nil {
t.Fatal(err)
}
if !bm2.Get(0) || !bm2.Get(100) || !bm2.Get(255) {
t.Fatal("reloaded bitmap missing set bits")
}
if bm2.Get(1) || bm2.Get(99) || bm2.Get(254) {
t.Fatal("reloaded bitmap has spurious bits")
}
if bm2.CountSet() != 3 {
t.Fatalf("CountSet = %d, want 3", bm2.CountSet())
}
}
func testSnap_HeaderRoundtrip(t *testing.T) {
hdr := SnapshotHeader{
Version: SnapVersion,
SnapshotID: 42,
BaseLSN: 1000,
VolumeSize: 1 << 30,
BlockSize: 4096,
BitmapSize: 32768,
DataOffset: 36864,
CreatedAt: uint64(time.Now().Unix()),
}
copy(hdr.Magic[:], SnapMagic)
copy(hdr.ParentUUID[:], "0123456789abcdef")
var buf bytes.Buffer
if _, err := hdr.WriteTo(&buf); err != nil {
t.Fatal(err)
}
if buf.Len() != SnapHeaderSize {
t.Fatalf("header size = %d, want %d", buf.Len(), SnapHeaderSize)
}
hdr2, err := ReadSnapshotHeader(&buf)
if err != nil {
t.Fatal(err)
}
if hdr2.SnapshotID != 42 {
t.Fatalf("SnapshotID = %d, want 42", hdr2.SnapshotID)
}
if hdr2.BaseLSN != 1000 {
t.Fatalf("BaseLSN = %d, want 1000", hdr2.BaseLSN)
}
if hdr2.VolumeSize != 1<<30 {
t.Fatalf("VolumeSize = %d, want %d", hdr2.VolumeSize, 1<<30)
}
if hdr2.BlockSize != 4096 {
t.Fatalf("BlockSize = %d, want 4096", hdr2.BlockSize)
}
if hdr2.BitmapSize != 32768 {
t.Fatalf("BitmapSize = %d", hdr2.BitmapSize)
}
if hdr2.DataOffset != 36864 {
t.Fatalf("DataOffset = %d", hdr2.DataOffset)
}
if hdr2.ParentUUID != hdr.ParentUUID {
t.Fatalf("ParentUUID mismatch")
}
}
// testSnap_CreateAndRead: a block overwritten after the snapshot must read
// its new value live and its old value through the snapshot.
func testSnap_CreateAndRead(t *testing.T) {
	v := createTestVol(t)
	defer v.Close()
	// Write 'A' to LBA 0.
	if err := v.WriteLBA(0, makeBlock('A')); err != nil {
		t.Fatal(err)
	}
	if err := v.SyncCache(); err != nil {
		t.Fatal(err)
	}
	// Create snapshot 1.
	if err := v.CreateSnapshot(1); err != nil {
		t.Fatal(err)
	}
	// Write 'B' to LBA 0 (after snapshot).
	if err := v.WriteLBA(0, makeBlock('B')); err != nil {
		t.Fatal(err)
	}
	if err := v.SyncCache(); err != nil {
		t.Fatal(err)
	}
	// Force flush so CoW happens. A flush error must fail the test here;
	// ignoring it (as before) would make the assertions below meaningless.
	if err := v.flusher.FlushOnce(); err != nil {
		t.Fatal(err)
	}
	// Live read should see 'B'.
	live, err := v.ReadLBA(0, 4096)
	if err != nil {
		t.Fatal(err)
	}
	if live[0] != 'B' {
		t.Fatalf("live read: got %c, want B", live[0])
	}
	// Snapshot read should see 'A'.
	snapData, err := v.ReadSnapshot(1, 0, 4096)
	if err != nil {
		t.Fatal(err)
	}
	if snapData[0] != 'A' {
		t.Fatalf("snapshot read: got %c, want A", snapData[0])
	}
}
func testSnap_MultipleSnapshots(t *testing.T) {
v := createTestVol(t)
defer v.Close()
// Write 'A' to LBA 5, snapshot S1.
if err := v.WriteLBA(5, makeBlock('A')); err != nil {
t.Fatal(err)
}
v.SyncCache()
if err := v.CreateSnapshot(1); err != nil {
t.Fatal(err)
}
// Write 'B' to LBA 5, snapshot S2.
if err := v.WriteLBA(5, makeBlock('B')); err != nil {
t.Fatal(err)
}
v.SyncCache()
v.flusher.FlushOnce() // CoW 'A' to S1
if err := v.CreateSnapshot(2); err != nil {
t.Fatal(err)
}
// Write 'C' to LBA 5.
if err := v.WriteLBA(5, makeBlock('C')); err != nil {
t.Fatal(err)
}
v.SyncCache()
v.flusher.FlushOnce() // CoW 'B' to S2, S1 already done
// S1 sees 'A', S2 sees 'B', live sees 'C'.
s1, err := v.ReadSnapshot(1, 5, 4096)
if err != nil {
t.Fatal(err)
}
if s1[0] != 'A' {
t.Fatalf("S1: got %c, want A", s1[0])
}
s2, err := v.ReadSnapshot(2, 5, 4096)
if err != nil {
t.Fatal(err)
}
if s2[0] != 'B' {
t.Fatalf("S2: got %c, want B", s2[0])
}
live, err := v.ReadLBA(5, 4096)
if err != nil {
t.Fatal(err)
}
if live[0] != 'C' {
t.Fatalf("live: got %c, want C", live[0])
}
// List should show 2 snapshots.
infos := v.ListSnapshots()
if len(infos) != 2 {
t.Fatalf("ListSnapshots: got %d, want 2", len(infos))
}
}
// testSnap_DeleteRemovesFile: deleting a snapshot must unlink its delta file
// and make it unreadable/unlisted. Setup errors are now checked instead of
// being silently ignored, so a broken precondition fails loudly.
func testSnap_DeleteRemovesFile(t *testing.T) {
	v := createTestVol(t)
	defer v.Close()
	if err := v.WriteLBA(0, makeBlock('X')); err != nil {
		t.Fatal(err)
	}
	if err := v.SyncCache(); err != nil {
		t.Fatal(err)
	}
	if err := v.CreateSnapshot(1); err != nil {
		t.Fatal(err)
	}
	deltaPath := deltaFilePath(v.path, 1)
	if _, err := os.Stat(deltaPath); err != nil {
		t.Fatalf("delta file should exist: %v", err)
	}
	if err := v.DeleteSnapshot(1); err != nil {
		t.Fatal(err)
	}
	if _, err := os.Stat(deltaPath); !os.IsNotExist(err) {
		t.Fatalf("delta file should be removed, got err: %v", err)
	}
	// Reading deleted snapshot returns error.
	if _, err := v.ReadSnapshot(1, 0, 4096); err != ErrSnapshotNotFound {
		t.Fatalf("expected ErrSnapshotNotFound, got %v", err)
	}
	if len(v.ListSnapshots()) != 0 {
		t.Fatal("ListSnapshots should be empty")
	}
}
func testSnap_CoWOnlyTouchedBlocks(t *testing.T) {
v := createTestVol(t)
defer v.Close()
// Write LBAs 0, 1, 2.
for i := uint64(0); i < 3; i++ {
v.WriteLBA(i, makeBlock(byte('A'+i)))
}
v.SyncCache()
v.CreateSnapshot(1)
// Only modify LBA 1.
v.WriteLBA(1, makeBlock('Z'))
v.SyncCache()
v.flusher.FlushOnce()
// Check bitmap: only LBA 1 should be CoW'd.
v.snapMu.RLock()
snap := v.snapshots[1]
cowCount := snap.bitmap.CountSet()
v.snapMu.RUnlock()
if cowCount != 1 {
t.Fatalf("CoW count = %d, want 1", cowCount)
}
// Snapshot should still see original values.
for i := uint64(0); i < 3; i++ {
data, err := v.ReadSnapshot(1, i, 4096)
if err != nil {
t.Fatal(err)
}
expected := byte('A' + i)
if data[0] != expected {
t.Fatalf("LBA %d: got %c, want %c", i, data[0], expected)
}
}
}
func testSnap_DuringActiveWrites(t *testing.T) {
v := createTestVol(t)
defer v.Close()
// Write initial data.
v.WriteLBA(0, makeBlock('A'))
v.SyncCache()
// Start concurrent writes and create snapshot.
var wg sync.WaitGroup
errCh := make(chan error, 20)
// Writer goroutine: continuously write to LBA 10.
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
if err := v.WriteLBA(10, makeBlock(byte('0'+i))); err != nil {
errCh <- err
return
}
time.Sleep(1 * time.Millisecond)
}
}()
// Snapshot creation mid-writes.
time.Sleep(5 * time.Millisecond)
if err := v.CreateSnapshot(1); err != nil {
t.Fatal(err)
}
wg.Wait()
close(errCh)
for err := range errCh {
t.Fatalf("concurrent write error: %v", err)
}
// Snapshot should exist and be readable.
_, err := v.ReadSnapshot(1, 0, 4096)
if err != nil {
t.Fatal(err)
}
}
func testSnap_SurvivesRecovery(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "test.blockvol")
// Create volume, write, snapshot.
v, err := CreateBlockVol(path, CreateOptions{
VolumeSize: 1 * 1024 * 1024,
BlockSize: 4096,
WALSize: 256 * 1024,
})
if err != nil {
t.Fatal(err)
}
v.WriteLBA(0, makeBlock('A'))
v.SyncCache()
v.CreateSnapshot(1)
// Write new data after snapshot.
v.WriteLBA(0, makeBlock('B'))
v.SyncCache()
v.flusher.FlushOnce()
// Close and reopen.
v.Close()
v2, err := OpenBlockVol(path)
if err != nil {
t.Fatal(err)
}
defer v2.Close()
// Snapshot should survive.
if len(v2.ListSnapshots()) != 1 {
t.Fatalf("expected 1 snapshot after recovery, got %d", len(v2.ListSnapshots()))
}
// Snapshot data should read 'A'.
snapData, err := v2.ReadSnapshot(1, 0, 4096)
if err != nil {
t.Fatal(err)
}
if snapData[0] != 'A' {
t.Fatalf("snapshot after recovery: got %c, want A", snapData[0])
}
// Live data should read 'B'.
live, err := v2.ReadLBA(0, 4096)
if err != nil {
t.Fatal(err)
}
if live[0] != 'B' {
t.Fatalf("live after recovery: got %c, want B", live[0])
}
}
func testSnap_RestoreRewinds(t *testing.T) {
v := createTestVol(t)
defer v.Close()
// Write 'A' to LBA 0 and 'X' to LBA 1.
v.WriteLBA(0, makeBlock('A'))
v.WriteLBA(1, makeBlock('X'))
v.SyncCache()
v.CreateSnapshot(1)
// Write 'B' to LBA 0 (overwrites 'A').
v.WriteLBA(0, makeBlock('B'))
v.SyncCache()
v.flusher.FlushOnce()
// Live should be 'B'.
live, _ := v.ReadLBA(0, 4096)
if live[0] != 'B' {
t.Fatalf("pre-restore live: got %c, want B", live[0])
}
// Restore snapshot 1.
if err := v.RestoreSnapshot(1); err != nil {
t.Fatal(err)
}
// Live should now be 'A' (reverted).
live2, err := v.ReadLBA(0, 4096)
if err != nil {
t.Fatal(err)
}
if live2[0] != 'A' {
t.Fatalf("post-restore LBA 0: got %c, want A", live2[0])
}
// LBA 1 should still be 'X' (unchanged, not CoW'd).
live3, err := v.ReadLBA(1, 4096)
if err != nil {
t.Fatal(err)
}
if live3[0] != 'X' {
t.Fatalf("post-restore LBA 1: got %c, want X", live3[0])
}
// All snapshots should be gone.
if len(v.ListSnapshots()) != 0 {
t.Fatalf("snapshots should be empty after restore, got %d", len(v.ListSnapshots()))
}
// Volume should still be writable.
if err := v.WriteLBA(0, makeBlock('C')); err != nil {
t.Fatalf("write after restore: %v", err)
}
v.SyncCache()
data, err := v.ReadLBA(0, 4096)
if err != nil {
t.Fatal(err)
}
if data[0] != 'C' {
t.Fatalf("read after restore write: got %c, want C", data[0])
}
}

8
weed/storage/blockvol/wal_writer.go

@ -155,6 +155,14 @@ func (w *WALWriter) AdvanceTail(newTail uint64) {
w.mu.Unlock()
}
// Reset resets the WAL to empty state (head=tail=0). Used by snapshot restore.
// Only the logical head/tail counters are cleared; WAL bytes already on disk
// are left in place and will be overwritten by subsequent appends.
// NOTE(review): assumes stale on-disk records are harmless after reset
// (callers also reset superblock WALHead/WALTail and the checkpoint LSN) --
// confirm against the recovery path.
func (w *WALWriter) Reset() {
	w.mu.Lock()
	w.logicalHead = 0
	w.logicalTail = 0
	w.mu.Unlock()
}
// Head returns the current physical head position (relative to WAL start).
func (w *WALWriter) Head() uint64 {
w.mu.Lock()

Loading…
Cancel
Save