From d874e21f93c8247b2766fbc91ac9c2a6223db6f2 Mon Sep 17 00:00:00 2001 From: Ping Qiu Date: Tue, 3 Mar 2026 00:53:33 -0800 Subject: [PATCH] feat: Phase 5 CP5-2 -- CoW snapshots, 10 tests Sparse delta-file snapshots with copy-on-write in the flusher. Zero write-path overhead when no snapshot is active. New: snapshot.go (SnapshotBitmap, SnapshotHeader, delta file I/O) Modified: flusher.go (flushMu, CoW phase in FlushOnce, PauseAndFlush) Modified: blockvol.go (Create/Read/Delete/Restore/ListSnapshots, recovery) Modified: wal_writer.go (Reset for snapshot restore) Co-Authored-By: Claude Opus 4.6 --- weed/storage/blockvol/blockvol.go | 275 +++++++++++++ weed/storage/blockvol/flusher.go | 128 +++++-- weed/storage/blockvol/snapshot.go | 296 ++++++++++++++ weed/storage/blockvol/snapshot_test.go | 512 +++++++++++++++++++++++++ weed/storage/blockvol/wal_writer.go | 8 + 5 files changed, 1197 insertions(+), 22 deletions(-) create mode 100644 weed/storage/blockvol/snapshot.go create mode 100644 weed/storage/blockvol/snapshot_test.go diff --git a/weed/storage/blockvol/blockvol.go b/weed/storage/blockvol/blockvol.go index e523fdf32..ebb607349 100644 --- a/weed/storage/blockvol/blockvol.go +++ b/weed/storage/blockvol/blockvol.go @@ -9,6 +9,7 @@ import ( "fmt" "log" "os" + "path/filepath" "sync" "sync/atomic" "time" @@ -58,6 +59,10 @@ type BlockVol struct { rebuildServer *RebuildServer assignMu sync.Mutex // serializes HandleAssignment calls drainTimeout time.Duration // default 10s, for demote drain + + // Snapshot fields (Phase 5 CP5-2). + snapMu sync.RWMutex + snapshots map[uint32]*activeSnapshot } // CreateBlockVol creates a new block volume file at path. 
@@ -118,6 +123,7 @@ func CreateBlockVol(path string, opts CreateOptions, cfgs ...BlockVolConfig) (*B wal: wal, dirtyMap: dm, opsDrained: make(chan struct{}, 1), + snapshots: make(map[uint32]*activeSnapshot), } v.nextLSN.Store(1) v.healthy.Store(true) @@ -211,6 +217,28 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) { Interval: cfg.FlushInterval, }) go v.flusher.Run() + + // Discover and reopen existing snapshots. + v.snapshots = make(map[uint32]*activeSnapshot) + snapFiles, _ := filepath.Glob(path + ".snap.*") + for _, sf := range snapFiles { + snap, err := openDeltaFile(sf) + if err != nil { + log.Printf("blockvol: skipping snapshot file %s: %v", sf, err) + continue + } + if snap.header.ParentUUID != v.super.UUID { + log.Printf("blockvol: skipping snapshot %d: parent UUID mismatch", snap.id) + snap.Close() + continue + } + v.snapshots[snap.id] = snap + v.flusher.AddSnapshot(snap) + } + if len(v.snapshots) > 0 { + log.Printf("blockvol: recovered %d snapshot(s)", len(v.snapshots)) + } + return v, nil } @@ -610,6 +638,244 @@ func (v *BlockVol) Status() BlockVolumeStatus { } } +// CreateSnapshot creates a point-in-time snapshot. The snapshot captures the +// exact volume state after all pending writes have been flushed to the extent. +// Only allowed on Primary or None (standalone) roles. +func (v *BlockVol) CreateSnapshot(id uint32) error { + if err := v.beginOp(); err != nil { + return err + } + defer v.endOp() + + // Role check: only Primary or None (standalone). + r := Role(v.role.Load()) + if r != RoleNone && r != RolePrimary { + return ErrSnapshotRoleReject + } + + // Hold snapMu across duplicate check + insert to prevent two concurrent + // CreateSnapshot(id) from both passing the check. (Fix #3: TOCTOU race.) + v.snapMu.Lock() + if _, exists := v.snapshots[id]; exists { + v.snapMu.Unlock() + return ErrSnapshotExists + } + + // SyncCache: ensure all prior WAL writes are durable. 
+ if err := v.groupCommit.Submit(); err != nil { + v.snapMu.Unlock() + return fmt.Errorf("blockvol: snapshot sync: %w", err) + } + + // Pause flusher and flush everything to extent. + if err := v.flusher.PauseAndFlush(); err != nil { + v.flusher.Resume() + v.snapMu.Unlock() + return fmt.Errorf("blockvol: snapshot flush: %w", err) + } + // flusher is now paused (flushMu held) -- extent is consistent. + + baseLSN := v.flusher.CheckpointLSN() + + // Create delta file. + deltaPath := deltaFilePath(v.path, id) + snap, err := createDeltaFile(deltaPath, id, v, baseLSN) + if err != nil { + v.flusher.Resume() + v.snapMu.Unlock() + return err + } + + // Register snapshot in flusher and volume (still under snapMu + flushMu). + v.flusher.AddSnapshot(snap) + v.snapshots[id] = snap + v.snapMu.Unlock() + v.flusher.Resume() + + // Update superblock snapshot count. + v.snapMu.RLock() + v.super.SnapshotCount = uint32(len(v.snapshots)) + v.snapMu.RUnlock() + return v.persistSuperblock() +} + +// ReadSnapshot reads data from a snapshot at the given LBA and length (bytes). +func (v *BlockVol) ReadSnapshot(id uint32, lba uint64, length uint32) ([]byte, error) { + if err := v.beginOp(); err != nil { + return nil, err + } + defer v.endOp() + + if err := ValidateWrite(lba, length, v.super.VolumeSize, v.super.BlockSize); err != nil { + return nil, err + } + + v.snapMu.RLock() + snap, ok := v.snapshots[id] + v.snapMu.RUnlock() + if !ok { + return nil, ErrSnapshotNotFound + } + + blocks := length / v.super.BlockSize + result := make([]byte, length) + + for i := uint32(0); i < blocks; i++ { + blockLBA := lba + uint64(i) + off := i * v.super.BlockSize + if snap.bitmap.Get(blockLBA) { + // CoW'd block: read from delta file. 
+ deltaOff := int64(snap.dataOffset + blockLBA*uint64(v.super.BlockSize)) + if _, err := snap.fd.ReadAt(result[off:off+v.super.BlockSize], deltaOff); err != nil { + return nil, fmt.Errorf("blockvol: read snapshot delta LBA %d: %w", blockLBA, err) + } + } else { + // Not CoW'd: read from extent (still has snapshot-time data). + extentStart := v.super.WALOffset + v.super.WALSize + extentOff := int64(extentStart + blockLBA*uint64(v.super.BlockSize)) + if _, err := v.fd.ReadAt(result[off:off+v.super.BlockSize], extentOff); err != nil { + return nil, fmt.Errorf("blockvol: read snapshot extent LBA %d: %w", blockLBA, err) + } + } + } + + return result, nil +} + +// DeleteSnapshot removes a snapshot and deletes its delta file. +func (v *BlockVol) DeleteSnapshot(id uint32) error { + if err := v.beginOp(); err != nil { + return err + } + defer v.endOp() + + v.snapMu.Lock() + snap, ok := v.snapshots[id] + if !ok { + v.snapMu.Unlock() + return ErrSnapshotNotFound + } + delete(v.snapshots, id) + remaining := uint32(len(v.snapshots)) + v.snapMu.Unlock() + + // Pause flusher so no in-flight CoW cycle can use snap.fd after close. + // (Fix #1: use-after-close race.) + v.flusher.PauseAndFlush() + v.flusher.RemoveSnapshot(id) + v.flusher.Resume() + + // Close and remove delta file. Safe now -- flusher cannot reference snap. + deltaPath := deltaFilePath(v.path, id) + snap.Close() + os.Remove(deltaPath) + + // Update superblock. + v.super.SnapshotCount = remaining + return v.persistSuperblock() +} + +// RestoreSnapshot reverts the live volume to the state captured by the snapshot. +// This is destructive: all writes after the snapshot are lost. All snapshots are +// removed after restore. The volume must have no active I/O (call after Close +// coordination or on a standalone volume). 
+func (v *BlockVol) RestoreSnapshot(id uint32) error { + if err := v.beginOp(); err != nil { + return err + } + defer v.endOp() + + v.snapMu.RLock() + snap, ok := v.snapshots[id] + v.snapMu.RUnlock() + if !ok { + return ErrSnapshotNotFound + } + + // Pause flusher. Check error -- if flush fails, don't proceed. + if err := v.flusher.PauseAndFlush(); err != nil { + v.flusher.Resume() + return fmt.Errorf("blockvol: restore flush: %w", err) + } + defer v.flusher.Resume() + + // Copy CoW'd blocks from delta back to extent. + extentStart := v.super.WALOffset + v.super.WALSize + totalBlocks := v.super.VolumeSize / uint64(v.super.BlockSize) + buf := make([]byte, v.super.BlockSize) + for lba := uint64(0); lba < totalBlocks; lba++ { + if snap.bitmap.Get(lba) { + deltaOff := int64(snap.dataOffset + lba*uint64(v.super.BlockSize)) + if _, err := snap.fd.ReadAt(buf, deltaOff); err != nil { + return fmt.Errorf("blockvol: restore read delta LBA %d: %w", lba, err) + } + extentOff := int64(extentStart + lba*uint64(v.super.BlockSize)) + if _, err := v.fd.WriteAt(buf, extentOff); err != nil { + return fmt.Errorf("blockvol: restore write extent LBA %d: %w", lba, err) + } + } + } + + // Fsync extent. + if err := v.fd.Sync(); err != nil { + return fmt.Errorf("blockvol: restore fsync: %w", err) + } + + // Clear dirty map. + v.dirtyMap.Clear() + + // Reset WAL. + v.wal.Reset() + v.super.WALHead = 0 + v.super.WALTail = 0 + v.super.WALCheckpointLSN = snap.header.BaseLSN + v.nextLSN.Store(snap.header.BaseLSN + 1) + + // Remove ALL snapshots. Flusher is paused, so no CoW cycle can race. + v.snapMu.Lock() + for sid, s := range v.snapshots { + v.flusher.RemoveSnapshot(sid) + s.Close() + os.Remove(deltaFilePath(v.path, sid)) + } + v.snapshots = make(map[uint32]*activeSnapshot) + v.snapMu.Unlock() + + // Update superblock. + v.super.SnapshotCount = 0 + return v.persistSuperblock() +} + +// ListSnapshots returns metadata for all active snapshots. 
+func (v *BlockVol) ListSnapshots() []SnapshotInfo { + v.snapMu.RLock() + defer v.snapMu.RUnlock() + infos := make([]SnapshotInfo, 0, len(v.snapshots)) + for _, snap := range v.snapshots { + infos = append(infos, SnapshotInfo{ + ID: snap.id, + BaseLSN: snap.header.BaseLSN, + CreatedAt: time.Unix(int64(snap.header.CreatedAt), 0), + CoWBlocks: snap.bitmap.CountSet(), + }) + } + return infos +} + +// persistSuperblock writes the superblock to disk and fsyncs. +func (v *BlockVol) persistSuperblock() error { + if _, err := v.fd.Seek(0, 0); err != nil { + return fmt.Errorf("blockvol: persist superblock seek: %w", err) + } + if _, err := v.super.WriteTo(v.fd); err != nil { + return fmt.Errorf("blockvol: persist superblock write: %w", err) + } + if err := v.fd.Sync(); err != nil { + return fmt.Errorf("blockvol: persist superblock sync: %w", err) + } + return nil +} + // Close shuts down the block volume and closes the file. // Shutdown order: shipper -> replica receiver -> rebuild server -> drain ops -> group committer -> flusher -> final flush -> close fd. func (v *BlockVol) Close() error { @@ -644,6 +910,15 @@ func (v *BlockVol) Close() error { v.flusher.Stop() // stop background goroutine first (no concurrent flush) flushErr = v.flusher.FlushOnce() // then do final flush safely } + + // Close snapshot delta files. + v.snapMu.Lock() + for _, snap := range v.snapshots { + snap.Close() + } + v.snapshots = nil + v.snapMu.Unlock() + closeErr := v.fd.Close() if flushErr != nil { return flushErr diff --git a/weed/storage/blockvol/flusher.go b/weed/storage/blockvol/flusher.go index d7f84cf45..805f83fb1 100644 --- a/weed/storage/blockvol/flusher.go +++ b/weed/storage/blockvol/flusher.go @@ -25,6 +25,16 @@ type Flusher struct { checkpointLSN uint64 // last flushed LSN checkpointTail uint64 // WAL physical tail after last flush + // flushMu serializes FlushOnce calls and is acquired by CreateSnapshot + // to pause the flusher while the snapshot is being set up. 
+ // Lock order: flushMu -> snapMu (flushMu acquired first). + flushMu sync.Mutex + + // snapMu protects the snapshots slice. Acquired under flushMu in + // FlushOnce (RLock) and under flushMu in PauseAndFlush callers. + snapMu sync.RWMutex + snapshots []*activeSnapshot + logger *log.Logger lastErr bool // true if last FlushOnce returned error @@ -126,24 +136,109 @@ func (f *Flusher) Stop() { <-f.done } -// FlushOnce performs a single flush cycle: scan dirty map, copy data to -// extent region, fsync, update checkpoint, advance WAL tail. -func (f *Flusher) FlushOnce() error { - // Snapshot dirty entries. We use a full scan (Range over all possible LBAs - // is impractical), so we collect from the dirty map directly. - type flushEntry struct { - lba uint64 - walOff uint64 - lsn uint64 - length uint32 +// AddSnapshot adds a snapshot to the flusher's active list. +func (f *Flusher) AddSnapshot(snap *activeSnapshot) { + f.snapMu.Lock() + f.snapshots = append(f.snapshots, snap) + f.snapMu.Unlock() +} + +// RemoveSnapshot removes a snapshot from the flusher's active list by ID. +func (f *Flusher) RemoveSnapshot(id uint32) { + f.snapMu.Lock() + for i, s := range f.snapshots { + if s.id == id { + f.snapshots = append(f.snapshots[:i], f.snapshots[i+1:]...) + break + } } + f.snapMu.Unlock() +} + +// HasActiveSnapshots returns true if there are active snapshots needing CoW. +func (f *Flusher) HasActiveSnapshots() bool { + f.snapMu.RLock() + n := len(f.snapshots) + f.snapMu.RUnlock() + return n > 0 +} + +// PauseAndFlush acquires flushMu (pausing the flusher), then runs FlushOnce. +// The caller must call Resume() when done. +func (f *Flusher) PauseAndFlush() error { + f.flushMu.Lock() + return f.flushOnceLocked() +} + +// Resume releases flushMu, allowing the flusher to resume. 
+func (f *Flusher) Resume() { + f.flushMu.Unlock() +} +// FlushOnce performs a single flush cycle: scan dirty map, CoW for active +// snapshots, copy data to extent region, fsync, update checkpoint, advance WAL tail. +func (f *Flusher) FlushOnce() error { + f.flushMu.Lock() + defer f.flushMu.Unlock() + return f.flushOnceLocked() +} + +// flushOnceLocked is the inner FlushOnce. Caller must hold flushMu. +func (f *Flusher) flushOnceLocked() error { entries := f.dirtyMap.Snapshot() if len(entries) == 0 { return nil } - // Find the max LSN and max WAL offset to know where to advance tail. + // --- Phase 1: CoW for active snapshots --- + f.snapMu.RLock() + snaps := make([]*activeSnapshot, len(f.snapshots)) + copy(snaps, f.snapshots) + f.snapMu.RUnlock() + + if len(snaps) > 0 { + cowDirty := false + for _, e := range entries { + for _, snap := range snaps { + if !snap.bitmap.Get(e.Lba) { + // Read old data from extent (pre-modification state). + oldData := make([]byte, f.blockSize) + extentOff := int64(f.extentStart + e.Lba*uint64(f.blockSize)) + if _, err := f.fd.ReadAt(oldData, extentOff); err != nil { + return fmt.Errorf("flusher: CoW read extent LBA %d: %w", e.Lba, err) + } + // Write old data to delta file. + deltaOff := int64(snap.dataOffset + e.Lba*uint64(f.blockSize)) + if _, err := snap.fd.WriteAt(oldData, deltaOff); err != nil { + return fmt.Errorf("flusher: CoW write delta LBA %d snap %d: %w", e.Lba, snap.id, err) + } + snap.bitmap.Set(e.Lba) + snap.dirty = true + cowDirty = true + } + } + } + + if cowDirty { + // Crash safety: delta data -> fsync -> bitmap persist -> fsync -> extent write. 
+ for _, snap := range snaps { + if snap.dirty { + if err := snap.fd.Sync(); err != nil { + return fmt.Errorf("flusher: fsync delta snap %d: %w", snap.id, err) + } + if err := snap.bitmap.WriteTo(snap.fd, SnapHeaderSize); err != nil { + return fmt.Errorf("flusher: persist bitmap snap %d: %w", snap.id, err) + } + if err := snap.fd.Sync(); err != nil { + return fmt.Errorf("flusher: fsync bitmap snap %d: %w", snap.id, err) + } + snap.dirty = false + } + } + } + } + + // --- Phase 2: Extent writes (existing, unchanged) --- var maxLSN uint64 var maxWALEnd uint64 @@ -156,8 +251,6 @@ func (f *Flusher) FlushOnce() error { } // WAL reuse guard: validate LSN before trusting the entry. - // Between Snapshot() and this read, the WAL slot could have been - // reused (by a previous flush cycle advancing the tail + new writes). entryLSN := binary.LittleEndian.Uint64(headerBuf[0:8]) if entryLSN != e.Lsn { continue // stale --WAL slot reused, skip this entry @@ -180,9 +273,6 @@ func (f *Flusher) FlushOnce() error { continue // corrupt or partially overwritten --skip } - // Write only this block's data to extent (not all blocks in the - // WAL entry). Other blocks may have been overwritten by newer - // writes and their dirty map entries point elsewhere. if e.Lba < entry.LBA { continue // LBA mismatch --stale entry } @@ -196,21 +286,17 @@ func (f *Flusher) FlushOnce() error { } } - // Track WAL end position for tail advance. walEnd := e.WalOffset + uint64(entryLen) if walEnd > maxWALEnd { maxWALEnd = walEnd } } else if entryType == EntryTypeTrim { - // TRIM entries: zero the extent region for this LBA. - // Each dirty map entry represents one trimmed block. zeroBlock := make([]byte, f.blockSize) extentOff := int64(f.extentStart + e.Lba*uint64(f.blockSize)) if _, err := f.fd.WriteAt(zeroBlock, extentOff); err != nil { return fmt.Errorf("flusher: zero extent at LBA %d: %w", e.Lba, err) } - // TRIM entry has no data payload, just a header. 
walEnd := e.WalOffset + uint64(walEntryHeaderSize) if walEnd > maxWALEnd { maxWALEnd = walEnd @@ -230,8 +316,6 @@ func (f *Flusher) FlushOnce() error { // Remove flushed entries from dirty map. f.mu.Lock() for _, e := range entries { - // Only remove if the dirty map entry still has the same LSN - // (a newer write may have updated it). _, currentLSN, _, ok := f.dirtyMap.Get(e.Lba) if ok && currentLSN == e.Lsn { f.dirtyMap.Delete(e.Lba) diff --git a/weed/storage/blockvol/snapshot.go b/weed/storage/blockvol/snapshot.go new file mode 100644 index 000000000..3c333d642 --- /dev/null +++ b/weed/storage/blockvol/snapshot.go @@ -0,0 +1,296 @@ +package blockvol + +import ( + "encoding/binary" + "errors" + "fmt" + "io" + "os" + "time" +) + +// Snapshot delta file constants. +const ( + SnapMagic = "SWBS" + SnapVersion = 1 + SnapHeaderSize = 4096 +) + +var ( + ErrSnapshotNotFound = errors.New("blockvol: snapshot not found") + ErrSnapshotExists = errors.New("blockvol: snapshot already exists") + ErrSnapshotBadMagic = errors.New("blockvol: snapshot bad magic") + ErrSnapshotBadVersion = errors.New("blockvol: snapshot unsupported version") + ErrSnapshotBadParent = errors.New("blockvol: snapshot parent UUID mismatch") + ErrSnapshotRoleReject = errors.New("blockvol: snapshots only allowed on primary or standalone") +) + +// SnapshotBitmap is a dense bitset tracking which blocks have been CoW'd. +// No internal locking -- callers (flusher, CreateSnapshot) serialize access. +type SnapshotBitmap struct { + data []byte + bits uint64 // total number of bits (= VolumeSize/BlockSize) +} + +// NewSnapshotBitmap creates a zero-initialized bitmap for totalBlocks blocks. +func NewSnapshotBitmap(totalBlocks uint64) *SnapshotBitmap { + byteLen := (totalBlocks + 7) / 8 + return &SnapshotBitmap{ + data: make([]byte, byteLen), + bits: totalBlocks, + } +} + +// Get returns true if the bit at position lba is set. 
+func (b *SnapshotBitmap) Get(lba uint64) bool { + if lba >= b.bits { + return false + } + return b.data[lba/8]&(1<<(lba%8)) != 0 +} + +// Set sets the bit at position lba. +func (b *SnapshotBitmap) Set(lba uint64) { + if lba >= b.bits { + return + } + b.data[lba/8] |= 1 << (lba % 8) +} + +// ByteSize returns the number of bytes in the bitmap. +func (b *SnapshotBitmap) ByteSize() int { + return len(b.data) +} + +// WriteTo writes the bitmap data to w at the given offset. +func (b *SnapshotBitmap) WriteTo(w io.WriterAt, offset int64) error { + _, err := w.WriteAt(b.data, offset) + return err +} + +// ReadFrom reads bitmap data from r at the given offset. +func (b *SnapshotBitmap) ReadFrom(r io.ReaderAt, offset int64) error { + _, err := r.ReadAt(b.data, offset) + return err +} + +// CountSet returns the number of set bits (CoW'd blocks). +func (b *SnapshotBitmap) CountSet() uint64 { + var count uint64 + for _, v := range b.data { + count += uint64(popcount8(v)) + } + return count +} + +// popcount8 returns the number of set bits in a byte. +func popcount8(x byte) int { + x = x - ((x >> 1) & 0x55) + x = (x & 0x33) + ((x >> 2) & 0x33) + return int((x + (x >> 4)) & 0x0F) +} + +// SnapshotHeader is the on-disk header for a snapshot delta file. +type SnapshotHeader struct { + Magic [4]byte + Version uint16 + SnapshotID uint32 + BaseLSN uint64 + VolumeSize uint64 + BlockSize uint32 + BitmapSize uint64 + DataOffset uint64 // = SnapHeaderSize + BitmapSize, aligned to BlockSize + CreatedAt uint64 + ParentUUID [16]byte +} + +// WriteTo serializes the header as a SnapHeaderSize-byte block to w. 
+func (h *SnapshotHeader) WriteTo(w io.Writer) (int64, error) { + buf := make([]byte, SnapHeaderSize) + endian := binary.LittleEndian + off := 0 + off += copy(buf[off:], h.Magic[:]) + endian.PutUint16(buf[off:], h.Version) + off += 2 + endian.PutUint32(buf[off:], h.SnapshotID) + off += 4 + endian.PutUint64(buf[off:], h.BaseLSN) + off += 8 + endian.PutUint64(buf[off:], h.VolumeSize) + off += 8 + endian.PutUint32(buf[off:], h.BlockSize) + off += 4 + endian.PutUint64(buf[off:], h.BitmapSize) + off += 8 + endian.PutUint64(buf[off:], h.DataOffset) + off += 8 + endian.PutUint64(buf[off:], h.CreatedAt) + off += 8 + copy(buf[off:], h.ParentUUID[:]) + n, err := w.Write(buf) + return int64(n), err +} + +// ReadSnapshotHeader reads a SnapshotHeader from r. +func ReadSnapshotHeader(r io.Reader) (*SnapshotHeader, error) { + buf := make([]byte, SnapHeaderSize) + if _, err := io.ReadFull(r, buf); err != nil { + return nil, fmt.Errorf("blockvol: read snapshot header: %w", err) + } + + endian := binary.LittleEndian + h := &SnapshotHeader{} + off := 0 + copy(h.Magic[:], buf[off:off+4]) + off += 4 + if string(h.Magic[:]) != SnapMagic { + return nil, ErrSnapshotBadMagic + } + h.Version = endian.Uint16(buf[off:]) + off += 2 + if h.Version != SnapVersion { + return nil, fmt.Errorf("%w: got %d, want %d", ErrSnapshotBadVersion, h.Version, SnapVersion) + } + h.SnapshotID = endian.Uint32(buf[off:]) + off += 4 + h.BaseLSN = endian.Uint64(buf[off:]) + off += 8 + h.VolumeSize = endian.Uint64(buf[off:]) + off += 8 + h.BlockSize = endian.Uint32(buf[off:]) + off += 4 + h.BitmapSize = endian.Uint64(buf[off:]) + off += 8 + h.DataOffset = endian.Uint64(buf[off:]) + off += 8 + h.CreatedAt = endian.Uint64(buf[off:]) + off += 8 + copy(h.ParentUUID[:], buf[off:off+16]) + return h, nil +} + +// activeSnapshot represents an open snapshot delta file with its in-memory bitmap. 
+type activeSnapshot struct { + id uint32 + fd *os.File + header SnapshotHeader + bitmap *SnapshotBitmap + dataOffset uint64 + dirty bool // bitmap changed since last persist +} + +// Close closes the delta file. +func (s *activeSnapshot) Close() error { + return s.fd.Close() +} + +// deltaFilePath returns the path for a snapshot delta file. +func deltaFilePath(volPath string, id uint32) string { + return fmt.Sprintf("%s.snap.%d", volPath, id) +} + +// createDeltaFile creates a new snapshot delta file and returns an activeSnapshot. +func createDeltaFile(path string, id uint32, vol *BlockVol, baseLSN uint64) (*activeSnapshot, error) { + totalBlocks := vol.super.VolumeSize / uint64(vol.super.BlockSize) + bitmap := NewSnapshotBitmap(totalBlocks) + bitmapSize := uint64(bitmap.ByteSize()) + + // Align DataOffset to BlockSize. + dataOffset := uint64(SnapHeaderSize) + bitmapSize + rem := dataOffset % uint64(vol.super.BlockSize) + if rem != 0 { + dataOffset += uint64(vol.super.BlockSize) - rem + } + + hdr := SnapshotHeader{ + Version: SnapVersion, + SnapshotID: id, + BaseLSN: baseLSN, + VolumeSize: vol.super.VolumeSize, + BlockSize: vol.super.BlockSize, + BitmapSize: bitmapSize, + DataOffset: dataOffset, + CreatedAt: uint64(time.Now().Unix()), + ParentUUID: vol.super.UUID, + } + copy(hdr.Magic[:], SnapMagic) + + fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0644) + if err != nil { + return nil, fmt.Errorf("blockvol: create delta file: %w", err) + } + + // Truncate to full size (sparse file -- only header+bitmap consume disk). + totalSize := int64(dataOffset + vol.super.VolumeSize) + if err := fd.Truncate(totalSize); err != nil { + fd.Close() + os.Remove(path) + return nil, fmt.Errorf("blockvol: truncate delta: %w", err) + } + + // Write header. + if _, err := hdr.WriteTo(fd); err != nil { + fd.Close() + os.Remove(path) + return nil, fmt.Errorf("blockvol: write snapshot header: %w", err) + } + + // Write zero bitmap. 
+ if err := bitmap.WriteTo(fd, SnapHeaderSize); err != nil { + fd.Close() + os.Remove(path) + return nil, fmt.Errorf("blockvol: write snapshot bitmap: %w", err) + } + + // Fsync delta file. + if err := fd.Sync(); err != nil { + fd.Close() + os.Remove(path) + return nil, fmt.Errorf("blockvol: sync delta: %w", err) + } + + return &activeSnapshot{ + id: id, + fd: fd, + header: hdr, + bitmap: bitmap, + dataOffset: dataOffset, + }, nil +} + +// openDeltaFile opens an existing snapshot delta file, reads its header and bitmap. +func openDeltaFile(path string) (*activeSnapshot, error) { + fd, err := os.OpenFile(path, os.O_RDWR, 0644) + if err != nil { + return nil, fmt.Errorf("blockvol: open delta file: %w", err) + } + + hdr, err := ReadSnapshotHeader(fd) + if err != nil { + fd.Close() + return nil, err + } + + totalBlocks := hdr.VolumeSize / uint64(hdr.BlockSize) + bitmap := NewSnapshotBitmap(totalBlocks) + if err := bitmap.ReadFrom(fd, SnapHeaderSize); err != nil { + fd.Close() + return nil, fmt.Errorf("blockvol: read snapshot bitmap: %w", err) + } + + return &activeSnapshot{ + id: hdr.SnapshotID, + fd: fd, + header: *hdr, + bitmap: bitmap, + dataOffset: hdr.DataOffset, + }, nil +} + +// SnapshotInfo contains read-only snapshot metadata for listing. 
+type SnapshotInfo struct { + ID uint32 + BaseLSN uint64 + CreatedAt time.Time + CoWBlocks uint64 // number of CoW'd blocks (bitmap.CountSet()) +} diff --git a/weed/storage/blockvol/snapshot_test.go b/weed/storage/blockvol/snapshot_test.go new file mode 100644 index 000000000..98f0f370d --- /dev/null +++ b/weed/storage/blockvol/snapshot_test.go @@ -0,0 +1,512 @@ +package blockvol + +import ( + "bytes" + "os" + "path/filepath" + "sync" + "testing" + "time" +) + +func TestSnapshots(t *testing.T) { + tests := []struct { + name string + run func(t *testing.T) + }{ + {name: "bitmap_get_set", run: testSnap_BitmapGetSet}, + {name: "bitmap_persist_reload", run: testSnap_BitmapPersistReload}, + {name: "header_roundtrip", run: testSnap_HeaderRoundtrip}, + {name: "create_and_read", run: testSnap_CreateAndRead}, + {name: "multiple_snapshots", run: testSnap_MultipleSnapshots}, + {name: "delete_removes_file", run: testSnap_DeleteRemovesFile}, + {name: "cow_only_touched_blocks", run: testSnap_CoWOnlyTouchedBlocks}, + {name: "during_active_writes", run: testSnap_DuringActiveWrites}, + {name: "survives_recovery", run: testSnap_SurvivesRecovery}, + {name: "restore_rewinds", run: testSnap_RestoreRewinds}, + } + for _, tt := range tests { + t.Run(tt.name, tt.run) + } +} + +func testSnap_BitmapGetSet(t *testing.T) { + bm := NewSnapshotBitmap(1024) + + // All bits start as zero. + for i := uint64(0); i < 1024; i++ { + if bm.Get(i) { + t.Fatalf("bit %d should be 0", i) + } + } + + // Set some bits. + bm.Set(0) + bm.Set(7) + bm.Set(8) + bm.Set(1023) + + if !bm.Get(0) { + t.Fatal("bit 0 not set") + } + if !bm.Get(7) { + t.Fatal("bit 7 not set") + } + if !bm.Get(8) { + t.Fatal("bit 8 not set") + } + if !bm.Get(1023) { + t.Fatal("bit 1023 not set") + } + if bm.Get(1) { + t.Fatal("bit 1 should not be set") + } + + if bm.CountSet() != 4 { + t.Fatalf("CountSet = %d, want 4", bm.CountSet()) + } + + // Out-of-range: Get returns false, Set is a no-op. 
+ if bm.Get(1024) { + t.Fatal("out-of-range Get should return false") + } + bm.Set(1024) // no panic + + // ByteSize. + if bm.ByteSize() != 128 { // 1024/8 = 128 + t.Fatalf("ByteSize = %d, want 128", bm.ByteSize()) + } +} + +func testSnap_BitmapPersistReload(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "bitmap.dat") + + bm := NewSnapshotBitmap(256) + bm.Set(0) + bm.Set(100) + bm.Set(255) + + // Write to file. + fd, err := os.Create(path) + if err != nil { + t.Fatal(err) + } + if err := bm.WriteTo(fd, 0); err != nil { + t.Fatal(err) + } + fd.Close() + + // Read back. + fd2, err := os.Open(path) + if err != nil { + t.Fatal(err) + } + defer fd2.Close() + + bm2 := NewSnapshotBitmap(256) + if err := bm2.ReadFrom(fd2, 0); err != nil { + t.Fatal(err) + } + + if !bm2.Get(0) || !bm2.Get(100) || !bm2.Get(255) { + t.Fatal("reloaded bitmap missing set bits") + } + if bm2.Get(1) || bm2.Get(99) || bm2.Get(254) { + t.Fatal("reloaded bitmap has spurious bits") + } + if bm2.CountSet() != 3 { + t.Fatalf("CountSet = %d, want 3", bm2.CountSet()) + } +} + +func testSnap_HeaderRoundtrip(t *testing.T) { + hdr := SnapshotHeader{ + Version: SnapVersion, + SnapshotID: 42, + BaseLSN: 1000, + VolumeSize: 1 << 30, + BlockSize: 4096, + BitmapSize: 32768, + DataOffset: 36864, + CreatedAt: uint64(time.Now().Unix()), + } + copy(hdr.Magic[:], SnapMagic) + copy(hdr.ParentUUID[:], "0123456789abcdef") + + var buf bytes.Buffer + if _, err := hdr.WriteTo(&buf); err != nil { + t.Fatal(err) + } + if buf.Len() != SnapHeaderSize { + t.Fatalf("header size = %d, want %d", buf.Len(), SnapHeaderSize) + } + + hdr2, err := ReadSnapshotHeader(&buf) + if err != nil { + t.Fatal(err) + } + + if hdr2.SnapshotID != 42 { + t.Fatalf("SnapshotID = %d, want 42", hdr2.SnapshotID) + } + if hdr2.BaseLSN != 1000 { + t.Fatalf("BaseLSN = %d, want 1000", hdr2.BaseLSN) + } + if hdr2.VolumeSize != 1<<30 { + t.Fatalf("VolumeSize = %d, want %d", hdr2.VolumeSize, 1<<30) + } + if hdr2.BlockSize != 4096 { + 
t.Fatalf("BlockSize = %d, want 4096", hdr2.BlockSize) + } + if hdr2.BitmapSize != 32768 { + t.Fatalf("BitmapSize = %d", hdr2.BitmapSize) + } + if hdr2.DataOffset != 36864 { + t.Fatalf("DataOffset = %d", hdr2.DataOffset) + } + if hdr2.ParentUUID != hdr.ParentUUID { + t.Fatalf("ParentUUID mismatch") + } +} + +func testSnap_CreateAndRead(t *testing.T) { + v := createTestVol(t) + defer v.Close() + + // Write 'A' to LBA 0. + if err := v.WriteLBA(0, makeBlock('A')); err != nil { + t.Fatal(err) + } + if err := v.SyncCache(); err != nil { + t.Fatal(err) + } + + // Create snapshot 1. + if err := v.CreateSnapshot(1); err != nil { + t.Fatal(err) + } + + // Write 'B' to LBA 0 (after snapshot). + if err := v.WriteLBA(0, makeBlock('B')); err != nil { + t.Fatal(err) + } + if err := v.SyncCache(); err != nil { + t.Fatal(err) + } + // Force flush so CoW happens. + v.flusher.FlushOnce() + + // Live read should see 'B'. + live, err := v.ReadLBA(0, 4096) + if err != nil { + t.Fatal(err) + } + if live[0] != 'B' { + t.Fatalf("live read: got %c, want B", live[0]) + } + + // Snapshot read should see 'A'. + snapData, err := v.ReadSnapshot(1, 0, 4096) + if err != nil { + t.Fatal(err) + } + if snapData[0] != 'A' { + t.Fatalf("snapshot read: got %c, want A", snapData[0]) + } +} + +func testSnap_MultipleSnapshots(t *testing.T) { + v := createTestVol(t) + defer v.Close() + + // Write 'A' to LBA 5, snapshot S1. + if err := v.WriteLBA(5, makeBlock('A')); err != nil { + t.Fatal(err) + } + v.SyncCache() + if err := v.CreateSnapshot(1); err != nil { + t.Fatal(err) + } + + // Write 'B' to LBA 5, snapshot S2. + if err := v.WriteLBA(5, makeBlock('B')); err != nil { + t.Fatal(err) + } + v.SyncCache() + v.flusher.FlushOnce() // CoW 'A' to S1 + if err := v.CreateSnapshot(2); err != nil { + t.Fatal(err) + } + + // Write 'C' to LBA 5. 
+ if err := v.WriteLBA(5, makeBlock('C')); err != nil { + t.Fatal(err) + } + v.SyncCache() + v.flusher.FlushOnce() // CoW 'B' to S2, S1 already done + + // S1 sees 'A', S2 sees 'B', live sees 'C'. + s1, err := v.ReadSnapshot(1, 5, 4096) + if err != nil { + t.Fatal(err) + } + if s1[0] != 'A' { + t.Fatalf("S1: got %c, want A", s1[0]) + } + + s2, err := v.ReadSnapshot(2, 5, 4096) + if err != nil { + t.Fatal(err) + } + if s2[0] != 'B' { + t.Fatalf("S2: got %c, want B", s2[0]) + } + + live, err := v.ReadLBA(5, 4096) + if err != nil { + t.Fatal(err) + } + if live[0] != 'C' { + t.Fatalf("live: got %c, want C", live[0]) + } + + // List should show 2 snapshots. + infos := v.ListSnapshots() + if len(infos) != 2 { + t.Fatalf("ListSnapshots: got %d, want 2", len(infos)) + } +} + +func testSnap_DeleteRemovesFile(t *testing.T) { + v := createTestVol(t) + defer v.Close() + + v.WriteLBA(0, makeBlock('X')) + v.SyncCache() + v.CreateSnapshot(1) + + deltaPath := deltaFilePath(v.path, 1) + if _, err := os.Stat(deltaPath); err != nil { + t.Fatalf("delta file should exist: %v", err) + } + + if err := v.DeleteSnapshot(1); err != nil { + t.Fatal(err) + } + + if _, err := os.Stat(deltaPath); !os.IsNotExist(err) { + t.Fatalf("delta file should be removed, got err: %v", err) + } + + // Reading deleted snapshot returns error. + if _, err := v.ReadSnapshot(1, 0, 4096); err != ErrSnapshotNotFound { + t.Fatalf("expected ErrSnapshotNotFound, got %v", err) + } + + if len(v.ListSnapshots()) != 0 { + t.Fatal("ListSnapshots should be empty") + } +} + +func testSnap_CoWOnlyTouchedBlocks(t *testing.T) { + v := createTestVol(t) + defer v.Close() + + // Write LBAs 0, 1, 2. + for i := uint64(0); i < 3; i++ { + v.WriteLBA(i, makeBlock(byte('A'+i))) + } + v.SyncCache() + v.CreateSnapshot(1) + + // Only modify LBA 1. + v.WriteLBA(1, makeBlock('Z')) + v.SyncCache() + v.flusher.FlushOnce() + + // Check bitmap: only LBA 1 should be CoW'd. 
+	v.snapMu.RLock()
+	snap := v.snapshots[1]
+	cowCount := snap.bitmap.CountSet()
+	v.snapMu.RUnlock()
+
+	if cowCount != 1 {
+		t.Fatalf("CoW count = %d, want 1", cowCount)
+	}
+
+	// Snapshot should still see original values.
+	for i := uint64(0); i < 3; i++ {
+		data, err := v.ReadSnapshot(1, i, 4096)
+		if err != nil {
+			t.Fatal(err)
+		}
+		expected := byte('A' + i)
+		if data[0] != expected {
+			t.Fatalf("LBA %d: got %c, want %c", i, data[0], expected)
+		}
+	}
+}
+
+func testSnap_DuringActiveWrites(t *testing.T) {
+	v := createTestVol(t)
+	defer v.Close()
+
+	// Write initial data.
+	v.WriteLBA(0, makeBlock('A'))
+	v.SyncCache()
+
+	// Start concurrent writes and create snapshot.
+	var wg sync.WaitGroup
+	errCh := make(chan error, 20)
+
+	// Writer goroutine: keep overwriting LBA 10 while the snapshot is created.
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 10; i++ {
+			if err := v.WriteLBA(10, makeBlock(byte('0'+i))); err != nil {
+				errCh <- err
+				return
+			}
+			time.Sleep(1 * time.Millisecond)
+		}
+	}()
+
+	// Create the snapshot while the writer goroutine is still issuing writes.
+	time.Sleep(5 * time.Millisecond)
+	if err := v.CreateSnapshot(1); err != nil {
+		t.Fatal(err)
+	}
+
+	wg.Wait()
+	close(errCh)
+	for err := range errCh {
+		t.Fatalf("concurrent write error: %v", err)
+	}
+
+	// Snapshot should exist and be readable.
+	_, err := v.ReadSnapshot(1, 0, 4096)
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func testSnap_SurvivesRecovery(t *testing.T) {
+	dir := t.TempDir()
+	path := filepath.Join(dir, "test.blockvol")
+
+	// Create volume, write, snapshot.
+	v, err := CreateBlockVol(path, CreateOptions{
+		VolumeSize: 1 * 1024 * 1024,
+		BlockSize:  4096,
+		WALSize:    256 * 1024,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	v.WriteLBA(0, makeBlock('A'))
+	v.SyncCache()
+	v.CreateSnapshot(1)
+
+	// Write new data after snapshot.
+	v.WriteLBA(0, makeBlock('B'))
+	v.SyncCache()
+	v.flusher.FlushOnce()
+
+	// Close and reopen to exercise snapshot discovery in OpenBlockVol.
+	v.Close()
+
+	v2, err := OpenBlockVol(path)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer v2.Close()
+
+	// The snapshot must be rediscovered by OpenBlockVol.
+	if len(v2.ListSnapshots()) != 1 {
+		t.Fatalf("expected 1 snapshot after recovery, got %d", len(v2.ListSnapshots()))
+	}
+
+	// Snapshot data should read 'A'.
+	snapData, err := v2.ReadSnapshot(1, 0, 4096)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if snapData[0] != 'A' {
+		t.Fatalf("snapshot after recovery: got %c, want A", snapData[0])
+	}
+
+	// Live data should read 'B'.
+	live, err := v2.ReadLBA(0, 4096)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if live[0] != 'B' {
+		t.Fatalf("live after recovery: got %c, want B", live[0])
+	}
+}
+
+func testSnap_RestoreRewinds(t *testing.T) {
+	v := createTestVol(t)
+	defer v.Close()
+
+	// Write 'A' to LBA 0 and 'X' to LBA 1.
+	v.WriteLBA(0, makeBlock('A'))
+	v.WriteLBA(1, makeBlock('X'))
+	v.SyncCache()
+	v.CreateSnapshot(1)
+
+	// Write 'B' to LBA 0 (overwrites 'A').
+	v.WriteLBA(0, makeBlock('B'))
+	v.SyncCache()
+	v.flusher.FlushOnce()
+
+	// Live should be 'B'.
+	live, _ := v.ReadLBA(0, 4096)
+	if live[0] != 'B' {
+		t.Fatalf("pre-restore live: got %c, want B", live[0])
+	}
+
+	// Restore snapshot 1.
+	if err := v.RestoreSnapshot(1); err != nil {
+		t.Fatal(err)
+	}
+
+	// Live should now be 'A' (reverted).
+	live2, err := v.ReadLBA(0, 4096)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if live2[0] != 'A' {
+		t.Fatalf("post-restore LBA 0: got %c, want A", live2[0])
+	}
+
+	// LBA 1 should still be 'X' (unchanged, not CoW'd).
+	live3, err := v.ReadLBA(1, 4096)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if live3[0] != 'X' {
+		t.Fatalf("post-restore LBA 1: got %c, want X", live3[0])
+	}
+
+	// Restore deletes every snapshot; the list must be empty afterwards.
+	if len(v.ListSnapshots()) != 0 {
+		t.Fatalf("snapshots should be empty after restore, got %d", len(v.ListSnapshots()))
+	}
+
+	// Volume should still be writable.
+	if err := v.WriteLBA(0, makeBlock('C')); err != nil {
+		t.Fatalf("write after restore: %v", err)
+	}
+	v.SyncCache()
+	data, err := v.ReadLBA(0, 4096)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if data[0] != 'C' {
+		t.Fatalf("read after restore write: got %c, want C", data[0])
+	}
+}
diff --git a/weed/storage/blockvol/wal_writer.go b/weed/storage/blockvol/wal_writer.go
index 01eda675e..a42a4f750 100644
--- a/weed/storage/blockvol/wal_writer.go
+++ b/weed/storage/blockvol/wal_writer.go
@@ -155,6 +155,14 @@ func (w *WALWriter) AdvanceTail(newTail uint64) {
 	w.mu.Unlock()
 }
 
+// Reset resets the WAL to the empty state (logicalHead = logicalTail = 0) under w.mu. Used by snapshot restore.
+func (w *WALWriter) Reset() {
+	w.mu.Lock()
+	w.logicalHead = 0
+	w.logicalTail = 0
+	w.mu.Unlock()
+}
+
 // Head returns the current physical head position (relative to WAL start).
 func (w *WALWriter) Head() uint64 {
 	w.mu.Lock()