Browse Source

feat: CP11A-4 snapshot export/import to S3 — artifact format, engine, and transport

Add crash-consistent snapshot export/import for single-profile block volumes.
Export creates a temp snapshot, streams the full volume image with inline
SHA-256, and uploads to S3. Import validates manifest + checksum and writes
directly to extent region. Admin HTTP endpoints /export and /import added
to the standalone iscsi-target binary.

Engine: snapshot_export.go (manifest types, ExportSnapshot, ImportSnapshot)
S3: snapshot_s3.go (AWS SDK v1 transport, pipe-based streaming upload)
Tests: 14 engine + 9 QA adversarial = 23 new tests, all passing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feature/sw-block
Ping Qiu 20 hours ago
parent
commit
7cc6467d09
  1. 6
      weed/storage/blockvol/iscsi/cmd/iscsi-target/admin.go
  2. 241
      weed/storage/blockvol/iscsi/cmd/iscsi-target/snapshot_s3.go
  3. 261
      weed/storage/blockvol/qa_snapshot_export_test.go
  4. 281
      weed/storage/blockvol/snapshot_export.go
  5. 368
      weed/storage/blockvol/snapshot_export_test.go

6
weed/storage/blockvol/iscsi/cmd/iscsi-target/admin.go

@ -111,6 +111,10 @@ func (a *adminServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
a.handleResize(w, r)
case "/metrics":
a.handleMetrics(w, r)
case "/export":
a.handleExport(w, r)
case "/import":
a.handleImport(w, r)
default:
http.NotFound(w, r)
}
@ -352,6 +356,8 @@ func startAdminServer(addr string, srv *adminServer) (net.Listener, error) {
mux.Handle("/snapshot", srv)
mux.Handle("/resize", srv)
mux.Handle("/metrics", srv)
mux.Handle("/export", srv)
mux.Handle("/import", srv)
// pprof handlers registered on DefaultServeMux by net/http/pprof import.
mux.Handle("/debug/pprof/", http.DefaultServeMux)
go func() {

241
weed/storage/blockvol/iscsi/cmd/iscsi-target/snapshot_s3.go

@ -0,0 +1,241 @@
package main
import (
"bytes"
"encoding/json"
"io"
"log"
"net/http"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
)
// exportRequest is the JSON body for POST /export.
type exportRequest struct {
Bucket string `json:"bucket"`
KeyPrefix string `json:"key_prefix"`
S3Endpoint string `json:"s3_endpoint"`
S3AccessKey string `json:"s3_access_key"`
S3SecretKey string `json:"s3_secret_key"`
S3Region string `json:"s3_region"`
SnapshotID uint32 `json:"snapshot_id"` // 0 = auto-create temp snapshot
}
// importRequest is the JSON body for POST /import.
type importRequest struct {
Bucket string `json:"bucket"`
ManifestKey string `json:"manifest_key"`
S3Endpoint string `json:"s3_endpoint"`
S3AccessKey string `json:"s3_access_key"`
S3SecretKey string `json:"s3_secret_key"`
S3Region string `json:"s3_region"`
AllowOverwrite bool `json:"allow_overwrite"`
}
// exportResponse is the JSON response for POST /export.
type exportResponse struct {
OK bool `json:"ok"`
ManifestKey string `json:"manifest_key"`
DataKey string `json:"data_key"`
SizeBytes uint64 `json:"size_bytes"`
SHA256 string `json:"sha256"`
}
// importResponse is the JSON response for POST /import.
type importResponse struct {
OK bool `json:"ok"`
SizeBytes uint64 `json:"size_bytes"`
SHA256 string `json:"sha256"`
}
func newS3Session(endpoint, accessKey, secretKey, region string) (*session.Session, error) {
if region == "" {
region = "us-east-1"
}
cfg := &aws.Config{
Region: aws.String(region),
Endpoint: aws.String(endpoint),
S3ForcePathStyle: aws.Bool(true),
DisableSSL: aws.Bool(true),
}
if accessKey != "" && secretKey != "" {
cfg.Credentials = credentials.NewStaticCredentials(accessKey, secretKey, "")
}
return session.NewSession(cfg)
}
func (a *adminServer) handleExport(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, `{"error":"method not allowed"}`, http.StatusMethodNotAllowed)
return
}
var req exportRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
jsonError(w, "bad json: "+err.Error(), http.StatusBadRequest)
return
}
if req.Bucket == "" || req.S3Endpoint == "" {
jsonError(w, "bucket and s3_endpoint are required", http.StatusBadRequest)
return
}
sess, err := newS3Session(req.S3Endpoint, req.S3AccessKey, req.S3SecretKey, req.S3Region)
if err != nil {
jsonError(w, "s3 session: "+err.Error(), http.StatusInternalServerError)
return
}
dataKey := req.KeyPrefix + "data.raw"
manifestKey := req.KeyPrefix + "manifest.json"
// Pipe: export writes → S3 uploader reads.
pr, pw := io.Pipe()
uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
u.PartSize = 8 * 1024 * 1024 // 8MB parts
u.Concurrency = 1
})
var manifest *blockvol.SnapshotArtifactManifest
var exportErr error
exportDone := make(chan struct{})
go func() {
defer close(exportDone)
defer pw.Close()
manifest, exportErr = a.vol.ExportSnapshot(r.Context(), pw, blockvol.ExportOptions{
DataObjectKey: dataKey,
SnapshotID: req.SnapshotID,
})
if exportErr != nil {
pw.CloseWithError(exportErr)
}
}()
// Upload data object.
_, uploadErr := uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(req.Bucket),
Key: aws.String(dataKey),
Body: pr,
})
<-exportDone
if exportErr != nil {
jsonError(w, "export: "+exportErr.Error(), http.StatusInternalServerError)
return
}
if uploadErr != nil {
jsonError(w, "s3 upload data: "+uploadErr.Error(), http.StatusInternalServerError)
return
}
// Upload manifest.
manifestData, err := blockvol.MarshalManifest(manifest)
if err != nil {
jsonError(w, "marshal manifest: "+err.Error(), http.StatusInternalServerError)
return
}
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(req.Bucket),
Key: aws.String(manifestKey),
Body: bytes.NewReader(manifestData),
})
if err != nil {
jsonError(w, "s3 upload manifest: "+err.Error(), http.StatusInternalServerError)
return
}
a.logger.Printf("admin: exported snapshot to s3://%s/%s (%d bytes, sha256=%s)",
req.Bucket, dataKey, manifest.DataSizeBytes, manifest.SHA256)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(exportResponse{
OK: true,
ManifestKey: manifestKey,
DataKey: dataKey,
SizeBytes: manifest.DataSizeBytes,
SHA256: manifest.SHA256,
})
}
func (a *adminServer) handleImport(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, `{"error":"method not allowed"}`, http.StatusMethodNotAllowed)
return
}
var req importRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
jsonError(w, "bad json: "+err.Error(), http.StatusBadRequest)
return
}
if req.Bucket == "" || req.ManifestKey == "" || req.S3Endpoint == "" {
jsonError(w, "bucket, manifest_key, and s3_endpoint are required", http.StatusBadRequest)
return
}
sess, err := newS3Session(req.S3Endpoint, req.S3AccessKey, req.S3SecretKey, req.S3Region)
if err != nil {
jsonError(w, "s3 session: "+err.Error(), http.StatusInternalServerError)
return
}
s3Client := s3.New(sess)
// Download manifest.
manifestResp, err := s3Client.GetObject(&s3.GetObjectInput{
Bucket: aws.String(req.Bucket),
Key: aws.String(req.ManifestKey),
})
if err != nil {
jsonError(w, "s3 get manifest: "+err.Error(), http.StatusInternalServerError)
return
}
manifestData, err := io.ReadAll(manifestResp.Body)
manifestResp.Body.Close()
if err != nil {
jsonError(w, "read manifest: "+err.Error(), http.StatusInternalServerError)
return
}
manifest, err := blockvol.UnmarshalManifest(manifestData)
if err != nil {
jsonError(w, "invalid manifest: "+err.Error(), http.StatusBadRequest)
return
}
// Download data object.
dataResp, err := s3Client.GetObject(&s3.GetObjectInput{
Bucket: aws.String(req.Bucket),
Key: aws.String(manifest.DataObjectKey),
})
if err != nil {
jsonError(w, "s3 get data: "+err.Error(), http.StatusInternalServerError)
return
}
defer dataResp.Body.Close()
// Import.
err = a.vol.ImportSnapshot(r.Context(), manifest, dataResp.Body, blockvol.ImportOptions{
AllowOverwrite: req.AllowOverwrite,
})
if err != nil {
jsonError(w, "import: "+err.Error(), http.StatusConflict)
return
}
a.logger.Printf("admin: imported snapshot from s3://%s/%s (%d bytes, sha256=%s)",
req.Bucket, req.ManifestKey, manifest.DataSizeBytes, manifest.SHA256)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(importResponse{
OK: true,
SizeBytes: manifest.DataSizeBytes,
SHA256: manifest.SHA256,
})
}
// Ensure log is used (for future debugging).
var _ = log.Printf

261
weed/storage/blockvol/qa_snapshot_export_test.go

@ -0,0 +1,261 @@
package blockvol
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"io"
"testing"
)
// TestQA_SnapshotExport_TruncatedData verifies that import rejects
// a data stream that is shorter than the manifest declares.
func TestQA_SnapshotExport_TruncatedData(t *testing.T) {
srcVol := createExportTestVol(t, 64*1024)
if err := srcVol.WriteLBA(0, make([]byte, 4096)); err != nil {
t.Fatalf("WriteLBA: %v", err)
}
var buf bytes.Buffer
manifest, err := srcVol.ExportSnapshot(context.Background(), &buf, ExportOptions{})
if err != nil {
t.Fatalf("ExportSnapshot: %v", err)
}
// Truncate to half the data.
truncated := buf.Bytes()[:buf.Len()/2]
dstVol := createExportTestVol(t, 64*1024)
err = dstVol.ImportSnapshot(context.Background(), manifest, bytes.NewReader(truncated), ImportOptions{})
if err == nil {
t.Fatal("expected error for truncated data")
}
}
// TestQA_SnapshotExport_WrongChecksum verifies that import rejects
// data whose checksum doesn't match the manifest.
func TestQA_SnapshotExport_WrongChecksum(t *testing.T) {
srcVol := createExportTestVol(t, 64*1024)
if err := srcVol.WriteLBA(0, make([]byte, 4096)); err != nil {
t.Fatalf("WriteLBA: %v", err)
}
var buf bytes.Buffer
manifest, err := srcVol.ExportSnapshot(context.Background(), &buf, ExportOptions{})
if err != nil {
t.Fatalf("ExportSnapshot: %v", err)
}
// Tamper with manifest checksum.
manifest.SHA256 = "0000000000000000000000000000000000000000000000000000000000000000"
dstVol := createExportTestVol(t, 64*1024)
err = dstVol.ImportSnapshot(context.Background(), manifest, &buf, ImportOptions{})
if err == nil {
t.Fatal("expected checksum mismatch error")
}
}
// TestQA_SnapshotExport_CorruptedManifest verifies that corrupted
// manifest JSON is rejected.
func TestQA_SnapshotExport_CorruptedManifest(t *testing.T) {
_, err := UnmarshalManifest([]byte(`{this is not valid json`))
if err == nil {
t.Fatal("expected error for corrupted manifest JSON")
}
// Valid JSON but missing required fields.
_, err = UnmarshalManifest([]byte(`{"format_version": 1}`))
if err == nil {
t.Fatal("expected error for manifest with missing fields")
}
}
// TestQA_SnapshotExport_DataSizeMismatch verifies that import detects
// when the manifest declares a different data size than what's provided.
func TestQA_SnapshotExport_DataSizeMismatch(t *testing.T) {
srcVol := createExportTestVol(t, 64*1024)
var buf bytes.Buffer
manifest, err := srcVol.ExportSnapshot(context.Background(), &buf, ExportOptions{})
if err != nil {
t.Fatalf("ExportSnapshot: %v", err)
}
// Inflate manifest data size to claim more data than exists.
manifest.DataSizeBytes = manifest.DataSizeBytes * 2
dstVol := createExportTestVol(t, 64*1024)
err = dstVol.ImportSnapshot(context.Background(), manifest, &buf, ImportOptions{})
if err == nil {
t.Fatal("expected error for data size mismatch")
}
}
// TestQA_SnapshotExport_RoundTripIntegrity writes a known multi-block pattern,
// exports, imports into a new volume, and verifies every byte matches.
func TestQA_SnapshotExport_RoundTripIntegrity(t *testing.T) {
volSize := uint64(64 * 1024) // 64KB = 16 blocks at 4KB
srcVol := createExportTestVol(t, volSize)
// Write distinct patterns to multiple blocks.
for lba := uint64(0); lba < volSize/4096; lba++ {
block := make([]byte, 4096)
for i := range block {
block[i] = byte((lba*251 + uint64(i)*37) % 256) // deterministic pattern
}
if err := srcVol.WriteLBA(lba, block); err != nil {
t.Fatalf("WriteLBA %d: %v", lba, err)
}
}
// Export.
var buf bytes.Buffer
manifest, err := srcVol.ExportSnapshot(context.Background(), &buf, ExportOptions{
DataObjectKey: "integrity/data.raw",
})
if err != nil {
t.Fatalf("ExportSnapshot: %v", err)
}
// Verify manifest is valid.
if err := ValidateManifest(manifest); err != nil {
t.Fatalf("manifest invalid: %v", err)
}
// Independently verify checksum of exported data.
h := sha256.Sum256(buf.Bytes())
if manifest.SHA256 != hex.EncodeToString(h[:]) {
t.Fatal("manifest SHA256 doesn't match exported data")
}
// Import into new volume.
dstVol := createExportTestVol(t, volSize)
err = dstVol.ImportSnapshot(context.Background(), manifest, bytes.NewReader(buf.Bytes()), ImportOptions{})
if err != nil {
t.Fatalf("ImportSnapshot: %v", err)
}
// Verify every block in target matches source pattern.
for lba := uint64(0); lba < volSize/4096; lba++ {
got, err := dstVol.ReadLBA(lba, 4096)
if err != nil {
t.Fatalf("ReadLBA %d: %v", lba, err)
}
expected := make([]byte, 4096)
for i := range expected {
expected[i] = byte((lba*251 + uint64(i)*37) % 256)
}
if !bytes.Equal(got, expected) {
t.Fatalf("block %d mismatch after import", lba)
}
}
}
// TestQA_SnapshotExport_ContextCancellation verifies that export
// respects context cancellation.
func TestQA_SnapshotExport_ContextCancellation(t *testing.T) {
vol := createExportTestVol(t, 64*1024)
ctx, cancel := context.WithCancel(context.Background())
cancel() // cancel immediately
var buf bytes.Buffer
_, err := vol.ExportSnapshot(ctx, &buf, ExportOptions{})
if err == nil {
t.Fatal("expected context cancellation error")
}
}
// TestQA_SnapshotExport_ManifestSerializationStable verifies that
// marshal → unmarshal produces the same manifest.
func TestQA_SnapshotExport_ManifestSerializationStable(t *testing.T) {
srcVol := createExportTestVol(t, 64*1024)
var buf bytes.Buffer
manifest, err := srcVol.ExportSnapshot(context.Background(), &buf, ExportOptions{
DataObjectKey: "stable/data.raw",
})
if err != nil {
t.Fatalf("ExportSnapshot: %v", err)
}
data, err := MarshalManifest(manifest)
if err != nil {
t.Fatalf("MarshalManifest: %v", err)
}
got, err := UnmarshalManifest(data)
if err != nil {
t.Fatalf("UnmarshalManifest: %v", err)
}
if got.SHA256 != manifest.SHA256 {
t.Errorf("SHA256 changed after round-trip: %q → %q", manifest.SHA256, got.SHA256)
}
if got.DataSizeBytes != manifest.DataSizeBytes {
t.Errorf("DataSizeBytes changed: %d → %d", manifest.DataSizeBytes, got.DataSizeBytes)
}
}
// TestQA_SnapshotExport_ExportDuringLiveIO verifies export is safe
// while the volume is receiving writes.
func TestQA_SnapshotExport_ExportDuringLiveIO(t *testing.T) {
vol := createExportTestVol(t, 64*1024)
// Write initial data.
data := make([]byte, 4096)
data[0] = 0x11
if err := vol.WriteLBA(0, data); err != nil {
t.Fatalf("WriteLBA: %v", err)
}
// Start export (creates snapshot internally).
var buf bytes.Buffer
manifest, err := vol.ExportSnapshot(context.Background(), &buf, ExportOptions{})
if err != nil {
t.Fatalf("ExportSnapshot: %v", err)
}
// Verify export captured the 0x11 data.
if buf.Bytes()[0] != 0x11 {
t.Errorf("exported first byte = 0x%02X, want 0x11", buf.Bytes()[0])
}
// Write new data after export snapshot was taken but before we check.
data[0] = 0x22
if err := vol.WriteLBA(0, data); err != nil {
t.Fatalf("WriteLBA after export: %v", err)
}
// Import the exported data into a new volume — should have 0x11, not 0x22.
dstVol := createExportTestVol(t, 64*1024)
err = dstVol.ImportSnapshot(context.Background(), manifest, bytes.NewReader(buf.Bytes()), ImportOptions{})
if err != nil {
t.Fatalf("ImportSnapshot: %v", err)
}
got, err := dstVol.ReadLBA(0, 4096)
if err != nil {
t.Fatalf("ReadLBA: %v", err)
}
if got[0] != 0x11 {
t.Errorf("imported first byte = 0x%02X, want 0x11 (snapshot-time value)", got[0])
}
}
// TestQA_SnapshotExport_NonexistentSnapshotReject verifies export from
// a snapshot ID that doesn't exist.
func TestQA_SnapshotExport_NonexistentSnapshotReject(t *testing.T) {
vol := createExportTestVol(t, 64*1024)
var buf bytes.Buffer
_, err := vol.ExportSnapshot(context.Background(), &buf, ExportOptions{SnapshotID: 999})
if err == nil {
t.Fatal("expected error for nonexistent snapshot ID")
}
}
// Ensure io package is used (for io.ReadFull reference in snapshot_export.go).
var _ = io.EOF

281
weed/storage/blockvol/snapshot_export.go

@ -0,0 +1,281 @@
package blockvol
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"time"
)
// Snapshot artifact format constants.
const (
SnapshotArtifactFormatV1 = 1
ArtifactLayoutSingleFile = "single-file"
ExportToolVersion = "sw-block-cp11a4"
exportChunkBlocks = 256 // read 256 blocks per chunk (1MB at 4KB block size)
exportTempSnapID = uint32(0xFFFFFFFE)
)
// SnapshotArtifactManifest describes a snapshot export artifact.
type SnapshotArtifactManifest struct {
FormatVersion int `json:"format_version"`
SourceVolume string `json:"source_volume"`
SourceSizeBytes uint64 `json:"source_size_bytes"`
SourceBlockSize uint32 `json:"source_block_size"`
StorageProfile string `json:"storage_profile"`
CreatedAt string `json:"created_at"`
ArtifactLayout string `json:"artifact_layout"`
DataObjectKey string `json:"data_object_key"`
DataSizeBytes uint64 `json:"data_size_bytes"`
SHA256 string `json:"sha256"`
Compression string `json:"compression"`
ExportToolVersion string `json:"export_tool_version"`
}
var (
ErrUnsupportedArtifactVersion = errors.New("blockvol: unsupported artifact format version")
ErrUnsupportedArtifactLayout = errors.New("blockvol: unsupported artifact layout")
ErrUnsupportedProfileExport = errors.New("blockvol: only single profile supported for export/import")
ErrManifestMissingField = errors.New("blockvol: manifest missing required field")
ErrChecksumMismatch = errors.New("blockvol: data checksum mismatch")
ErrImportSizeMismatch = errors.New("blockvol: target volume size does not match manifest")
ErrImportBlockSizeMismatch = errors.New("blockvol: target block size does not match manifest")
ErrImportTargetNotEmpty = errors.New("blockvol: target volume is not empty (use AllowOverwrite)")
ErrImportDataShort = errors.New("blockvol: import data shorter than manifest declares")
)
// MarshalManifest encodes a manifest as indented JSON.
func MarshalManifest(m *SnapshotArtifactManifest) ([]byte, error) {
return json.MarshalIndent(m, "", " ")
}
// UnmarshalManifest decodes and validates a manifest from JSON.
func UnmarshalManifest(data []byte) (*SnapshotArtifactManifest, error) {
var m SnapshotArtifactManifest
if err := json.Unmarshal(data, &m); err != nil {
return nil, fmt.Errorf("blockvol: unmarshal manifest: %w", err)
}
if err := ValidateManifest(&m); err != nil {
return nil, err
}
return &m, nil
}
// ValidateManifest checks that all required fields are present and supported.
func ValidateManifest(m *SnapshotArtifactManifest) error {
if m.FormatVersion != SnapshotArtifactFormatV1 {
return fmt.Errorf("%w: %d", ErrUnsupportedArtifactVersion, m.FormatVersion)
}
if m.ArtifactLayout != ArtifactLayoutSingleFile {
return fmt.Errorf("%w: %s", ErrUnsupportedArtifactLayout, m.ArtifactLayout)
}
if m.StorageProfile != "single" {
return fmt.Errorf("%w: %s", ErrUnsupportedProfileExport, m.StorageProfile)
}
if m.SourceSizeBytes == 0 {
return fmt.Errorf("%w: source_size_bytes", ErrManifestMissingField)
}
if m.SourceBlockSize == 0 {
return fmt.Errorf("%w: source_block_size", ErrManifestMissingField)
}
if m.SHA256 == "" {
return fmt.Errorf("%w: sha256", ErrManifestMissingField)
}
if m.DataSizeBytes == 0 {
return fmt.Errorf("%w: data_size_bytes", ErrManifestMissingField)
}
return nil
}
// ExportOptions configures a snapshot export.
type ExportOptions struct {
DataObjectKey string // S3 object key for the data object (stored in manifest)
SnapshotID uint32 // if > 0, export from existing snapshot; if 0, create+delete temp
}
// ExportSnapshot exports a crash-consistent snapshot of the volume to w.
// If opts.SnapshotID is 0, a temporary snapshot is created and deleted after export.
// If opts.SnapshotID is > 0, the existing snapshot is used (not deleted).
// The full logical volume image is streamed to w with SHA-256 computed inline.
func (v *BlockVol) ExportSnapshot(ctx context.Context, w io.Writer, opts ExportOptions) (*SnapshotArtifactManifest, error) {
if v.Profile() != ProfileSingle {
return nil, ErrUnsupportedProfileExport
}
info := v.Info()
snapID := opts.SnapshotID
deleteSnap := false
if snapID == 0 {
snapID = exportTempSnapID
if err := v.CreateSnapshot(snapID); err != nil {
return nil, fmt.Errorf("blockvol: export create temp snapshot: %w", err)
}
deleteSnap = true
defer func() {
if deleteSnap {
v.DeleteSnapshot(snapID)
}
}()
} else {
// Verify snapshot exists.
v.snapMu.RLock()
_, ok := v.snapshots[snapID]
v.snapMu.RUnlock()
if !ok {
return nil, ErrSnapshotNotFound
}
}
h := sha256.New()
mw := io.MultiWriter(w, h)
totalBlocks := info.VolumeSize / uint64(info.BlockSize)
chunkBlocks := uint64(exportChunkBlocks)
var totalWritten uint64
for startBlock := uint64(0); startBlock < totalBlocks; startBlock += chunkBlocks {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
remaining := totalBlocks - startBlock
n := chunkBlocks
if n > remaining {
n = remaining
}
readBytes := uint32(n * uint64(info.BlockSize))
data, err := v.ReadSnapshot(snapID, startBlock, readBytes)
if err != nil {
return nil, fmt.Errorf("blockvol: export read block %d: %w", startBlock, err)
}
if _, err := mw.Write(data); err != nil {
return nil, fmt.Errorf("blockvol: export write: %w", err)
}
totalWritten += uint64(len(data))
}
manifest := &SnapshotArtifactManifest{
FormatVersion: SnapshotArtifactFormatV1,
SourceVolume: v.Path(),
SourceSizeBytes: info.VolumeSize,
SourceBlockSize: info.BlockSize,
StorageProfile: "single",
CreatedAt: time.Now().UTC().Format(time.RFC3339),
ArtifactLayout: ArtifactLayoutSingleFile,
DataObjectKey: opts.DataObjectKey,
DataSizeBytes: totalWritten,
SHA256: hex.EncodeToString(h.Sum(nil)),
Compression: "none",
ExportToolVersion: ExportToolVersion,
}
return manifest, nil
}
// ImportOptions configures a snapshot import.
type ImportOptions struct {
AllowOverwrite bool // if true, allow import into a non-empty volume
}
// ImportSnapshot imports a snapshot artifact into this volume.
// The volume must match the manifest's size and block size.
// Data is read from r and written directly to the extent region, bypassing the WAL.
// SHA-256 is verified against the manifest after the full read.
func (v *BlockVol) ImportSnapshot(ctx context.Context, manifest *SnapshotArtifactManifest, r io.Reader, opts ImportOptions) error {
if err := ValidateManifest(manifest); err != nil {
return err
}
info := v.Info()
if info.VolumeSize != manifest.SourceSizeBytes {
return fmt.Errorf("%w: target=%d manifest=%d", ErrImportSizeMismatch, info.VolumeSize, manifest.SourceSizeBytes)
}
if info.BlockSize != manifest.SourceBlockSize {
return fmt.Errorf("%w: target=%d manifest=%d", ErrImportBlockSizeMismatch, info.BlockSize, manifest.SourceBlockSize)
}
// Empty check: nextLSN > 1 means the volume has been written to.
if !opts.AllowOverwrite && v.nextLSN.Load() > 1 {
return ErrImportTargetNotEmpty
}
// Pause flusher — we write directly to extent.
if err := v.flusher.PauseAndFlush(); err != nil {
v.flusher.Resume()
return fmt.Errorf("blockvol: import flush: %w", err)
}
defer v.flusher.Resume()
// Stream data to extent, computing SHA-256.
h := sha256.New()
tr := io.TeeReader(r, h)
extentStart := v.super.WALOffset + v.super.WALSize
chunkSize := uint64(exportChunkBlocks) * uint64(info.BlockSize)
buf := make([]byte, chunkSize)
var totalRead uint64
for totalRead < manifest.DataSizeBytes {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
remaining := manifest.DataSizeBytes - totalRead
readSize := chunkSize
if readSize > remaining {
readSize = remaining
}
n, err := io.ReadFull(tr, buf[:readSize])
if n > 0 {
writeOff := int64(extentStart) + int64(totalRead)
if _, werr := v.fd.WriteAt(buf[:n], writeOff); werr != nil {
return fmt.Errorf("blockvol: import write at offset %d: %w", writeOff, werr)
}
totalRead += uint64(n)
}
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
}
return fmt.Errorf("blockvol: import read: %w", err)
}
}
if totalRead != manifest.DataSizeBytes {
return fmt.Errorf("%w: read %d bytes, manifest declares %d", ErrImportDataShort, totalRead, manifest.DataSizeBytes)
}
// Verify checksum.
got := hex.EncodeToString(h.Sum(nil))
if got != manifest.SHA256 {
return fmt.Errorf("%w: got %s, want %s", ErrChecksumMismatch, got, manifest.SHA256)
}
// Fsync extent.
if err := v.fd.Sync(); err != nil {
return fmt.Errorf("blockvol: import sync: %w", err)
}
// Reset WAL and dirty map for clean state.
v.dirtyMap.Clear()
v.wal.Reset()
v.super.WALHead = 0
v.super.WALTail = 0
if err := v.persistSuperblock(); err != nil {
return fmt.Errorf("blockvol: import persist superblock: %w", err)
}
return nil
}

368
weed/storage/blockvol/snapshot_export_test.go

@ -0,0 +1,368 @@
package blockvol
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"path/filepath"
"testing"
)
func TestManifest_RoundTrip(t *testing.T) {
m := &SnapshotArtifactManifest{
FormatVersion: SnapshotArtifactFormatV1,
SourceVolume: "/data/vol1",
SourceSizeBytes: 1 << 20,
SourceBlockSize: 4096,
StorageProfile: "single",
CreatedAt: "2026-03-13T00:00:00Z",
ArtifactLayout: ArtifactLayoutSingleFile,
DataObjectKey: "backup/vol1/data.raw",
DataSizeBytes: 1 << 20,
SHA256: "abcd1234",
Compression: "none",
ExportToolVersion: ExportToolVersion,
}
data, err := MarshalManifest(m)
if err != nil {
t.Fatalf("MarshalManifest: %v", err)
}
got, err := UnmarshalManifest(data)
if err != nil {
t.Fatalf("UnmarshalManifest: %v", err)
}
if got.FormatVersion != m.FormatVersion {
t.Errorf("FormatVersion = %d, want %d", got.FormatVersion, m.FormatVersion)
}
if got.SourceSizeBytes != m.SourceSizeBytes {
t.Errorf("SourceSizeBytes = %d, want %d", got.SourceSizeBytes, m.SourceSizeBytes)
}
if got.SHA256 != m.SHA256 {
t.Errorf("SHA256 = %q, want %q", got.SHA256, m.SHA256)
}
if got.DataObjectKey != m.DataObjectKey {
t.Errorf("DataObjectKey = %q, want %q", got.DataObjectKey, m.DataObjectKey)
}
}
func TestManifest_Validate_BadVersion(t *testing.T) {
m := validManifest()
m.FormatVersion = 99
if err := ValidateManifest(m); err == nil {
t.Fatal("expected error for bad version")
}
}
func TestManifest_Validate_BadProfile(t *testing.T) {
m := validManifest()
m.StorageProfile = "striped"
if err := ValidateManifest(m); err == nil {
t.Fatal("expected error for bad profile")
}
}
func TestManifest_Validate_BadLayout(t *testing.T) {
m := validManifest()
m.ArtifactLayout = "multi-part"
if err := ValidateManifest(m); err == nil {
t.Fatal("expected error for bad layout")
}
}
func TestManifest_Validate_MissingFields(t *testing.T) {
cases := []struct {
name string
mutate func(*SnapshotArtifactManifest)
}{
{"no size", func(m *SnapshotArtifactManifest) { m.SourceSizeBytes = 0 }},
{"no block size", func(m *SnapshotArtifactManifest) { m.SourceBlockSize = 0 }},
{"no sha256", func(m *SnapshotArtifactManifest) { m.SHA256 = "" }},
{"no data size", func(m *SnapshotArtifactManifest) { m.DataSizeBytes = 0 }},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
m := validManifest()
tc.mutate(m)
if err := ValidateManifest(m); err == nil {
t.Fatal("expected error")
}
})
}
}
func TestExportSnapshot_Basic(t *testing.T) {
vol := createExportTestVol(t, 64*1024) // 64KB volume
// Write known pattern.
data := make([]byte, 4096)
for i := range data {
data[i] = 0xAA
}
if err := vol.WriteLBA(0, data); err != nil {
t.Fatalf("WriteLBA: %v", err)
}
var buf bytes.Buffer
manifest, err := vol.ExportSnapshot(context.Background(), &buf, ExportOptions{
DataObjectKey: "test/data.raw",
})
if err != nil {
t.Fatalf("ExportSnapshot: %v", err)
}
if manifest.DataSizeBytes != 64*1024 {
t.Errorf("DataSizeBytes = %d, want %d", manifest.DataSizeBytes, 64*1024)
}
if manifest.SourceSizeBytes != 64*1024 {
t.Errorf("SourceSizeBytes = %d, want %d", manifest.SourceSizeBytes, 64*1024)
}
if manifest.SHA256 == "" {
t.Error("SHA256 is empty")
}
if manifest.StorageProfile != "single" {
t.Errorf("StorageProfile = %q, want single", manifest.StorageProfile)
}
if uint64(buf.Len()) != manifest.DataSizeBytes {
t.Errorf("buffer len = %d, want %d", buf.Len(), manifest.DataSizeBytes)
}
}
func TestExportSnapshot_ChecksumCorrect(t *testing.T) {
vol := createExportTestVol(t, 64*1024)
data := make([]byte, 4096)
for i := range data {
data[i] = 0xBB
}
if err := vol.WriteLBA(0, data); err != nil {
t.Fatalf("WriteLBA: %v", err)
}
var buf bytes.Buffer
manifest, err := vol.ExportSnapshot(context.Background(), &buf, ExportOptions{})
if err != nil {
t.Fatalf("ExportSnapshot: %v", err)
}
// Independently compute SHA-256 of the exported data.
h := sha256.Sum256(buf.Bytes())
want := hex.EncodeToString(h[:])
if manifest.SHA256 != want {
t.Errorf("manifest SHA256 = %q, independently computed = %q", manifest.SHA256, want)
}
}
func TestExportSnapshot_ExistingSnapshot(t *testing.T) {
vol := createExportTestVol(t, 64*1024)
data := make([]byte, 4096)
data[0] = 0xCC
if err := vol.WriteLBA(0, data); err != nil {
t.Fatalf("WriteLBA: %v", err)
}
// Create snapshot manually.
if err := vol.CreateSnapshot(42); err != nil {
t.Fatalf("CreateSnapshot: %v", err)
}
// Write more data after snapshot.
data[0] = 0xDD
if err := vol.WriteLBA(0, data); err != nil {
t.Fatalf("WriteLBA: %v", err)
}
// Export from existing snapshot — should get pre-write data.
var buf bytes.Buffer
_, err := vol.ExportSnapshot(context.Background(), &buf, ExportOptions{SnapshotID: 42})
if err != nil {
t.Fatalf("ExportSnapshot: %v", err)
}
// First byte should be 0xCC (snapshot-time), not 0xDD (current).
if buf.Bytes()[0] != 0xCC {
t.Errorf("exported first byte = 0x%02X, want 0xCC", buf.Bytes()[0])
}
vol.DeleteSnapshot(42)
}
func TestExportSnapshot_ProfileReject(t *testing.T) {
vol := createExportTestVol(t, 64*1024)
// Force profile to non-single for test.
vol.super.StorageProfile = uint8(ProfileStriped)
var buf bytes.Buffer
_, err := vol.ExportSnapshot(context.Background(), &buf, ExportOptions{})
if err == nil {
t.Fatal("expected error for non-single profile")
}
}
func TestImportSnapshot_Basic(t *testing.T) {
// Create source, write pattern, export.
srcVol := createExportTestVol(t, 64*1024)
pattern := make([]byte, 4096)
for i := range pattern {
pattern[i] = byte(i % 251) // prime-mod pattern for uniqueness
}
if err := srcVol.WriteLBA(0, pattern); err != nil {
t.Fatalf("WriteLBA: %v", err)
}
var buf bytes.Buffer
manifest, err := srcVol.ExportSnapshot(context.Background(), &buf, ExportOptions{
DataObjectKey: "test/data.raw",
})
if err != nil {
t.Fatalf("ExportSnapshot: %v", err)
}
// Create target, import.
dstVol := createExportTestVol(t, 64*1024)
err = dstVol.ImportSnapshot(context.Background(), manifest, &buf, ImportOptions{})
if err != nil {
t.Fatalf("ImportSnapshot: %v", err)
}
// Read back and verify pattern.
got, err := dstVol.ReadLBA(0, 4096)
if err != nil {
t.Fatalf("ReadLBA: %v", err)
}
if !bytes.Equal(got, pattern) {
t.Error("imported data does not match source pattern")
}
}
func TestImportSnapshot_SizeMismatch(t *testing.T) {
srcVol := createExportTestVol(t, 64*1024)
var buf bytes.Buffer
manifest, err := srcVol.ExportSnapshot(context.Background(), &buf, ExportOptions{})
if err != nil {
t.Fatalf("ExportSnapshot: %v", err)
}
// Create target with different size.
dstVol := createExportTestVol(t, 128*1024)
err = dstVol.ImportSnapshot(context.Background(), manifest, &buf, ImportOptions{})
if err == nil {
t.Fatal("expected size mismatch error")
}
}
func TestImportSnapshot_NonEmptyReject(t *testing.T) {
srcVol := createExportTestVol(t, 64*1024)
var buf bytes.Buffer
manifest, err := srcVol.ExportSnapshot(context.Background(), &buf, ExportOptions{})
if err != nil {
t.Fatalf("ExportSnapshot: %v", err)
}
// Create target and write to it (makes it non-empty).
dstVol := createExportTestVol(t, 64*1024)
if err := dstVol.WriteLBA(0, make([]byte, 4096)); err != nil {
t.Fatalf("WriteLBA: %v", err)
}
err = dstVol.ImportSnapshot(context.Background(), manifest, &buf, ImportOptions{})
if err == nil {
t.Fatal("expected non-empty target error")
}
}
func TestImportSnapshot_AllowOverwrite(t *testing.T) {
// Source with known pattern.
srcVol := createExportTestVol(t, 64*1024)
pattern := make([]byte, 4096)
pattern[0] = 0xEE
if err := srcVol.WriteLBA(0, pattern); err != nil {
t.Fatalf("WriteLBA: %v", err)
}
var buf bytes.Buffer
manifest, err := srcVol.ExportSnapshot(context.Background(), &buf, ExportOptions{})
if err != nil {
t.Fatalf("ExportSnapshot: %v", err)
}
// Target with existing data.
dstVol := createExportTestVol(t, 64*1024)
if err := dstVol.WriteLBA(0, make([]byte, 4096)); err != nil {
t.Fatalf("WriteLBA: %v", err)
}
err = dstVol.ImportSnapshot(context.Background(), manifest, &buf, ImportOptions{AllowOverwrite: true})
if err != nil {
t.Fatalf("ImportSnapshot with AllowOverwrite: %v", err)
}
got, err := dstVol.ReadLBA(0, 4096)
if err != nil {
t.Fatalf("ReadLBA: %v", err)
}
if got[0] != 0xEE {
t.Errorf("overwritten data first byte = 0x%02X, want 0xEE", got[0])
}
}
func TestImportSnapshot_ChecksumMismatch(t *testing.T) {
srcVol := createExportTestVol(t, 64*1024)
if err := srcVol.WriteLBA(0, make([]byte, 4096)); err != nil {
t.Fatalf("WriteLBA: %v", err)
}
var buf bytes.Buffer
manifest, err := srcVol.ExportSnapshot(context.Background(), &buf, ExportOptions{})
if err != nil {
t.Fatalf("ExportSnapshot: %v", err)
}
// Corrupt the data.
exported := buf.Bytes()
exported[0] ^= 0xFF
dstVol := createExportTestVol(t, 64*1024)
err = dstVol.ImportSnapshot(context.Background(), manifest, bytes.NewReader(exported), ImportOptions{})
if err == nil {
t.Fatal("expected checksum mismatch error")
}
}
// createExportTestVol creates a temporary BlockVol with a specified size for export tests.
func createExportTestVol(t *testing.T, volumeSize uint64) *BlockVol {
t.Helper()
dir := t.TempDir()
path := filepath.Join(dir, "test.vol")
vol, err := CreateBlockVol(path, CreateOptions{
VolumeSize: volumeSize,
WALSize: 32 << 10, // 32KB WAL
})
if err != nil {
t.Fatalf("CreateBlockVol: %v", err)
}
t.Cleanup(func() { vol.Close() })
return vol
}
// validManifest returns a valid manifest for test mutation.
func validManifest() *SnapshotArtifactManifest {
return &SnapshotArtifactManifest{
FormatVersion: SnapshotArtifactFormatV1,
SourceVolume: "/data/vol1",
SourceSizeBytes: 1 << 20,
SourceBlockSize: 4096,
StorageProfile: "single",
CreatedAt: "2026-03-13T00:00:00Z",
ArtifactLayout: ArtifactLayoutSingleFile,
DataObjectKey: "backup/data.raw",
DataSizeBytes: 1 << 20,
SHA256: "abc123",
Compression: "none",
ExportToolVersion: ExportToolVersion,
}
}
Loading…
Cancel
Save