diff --git a/weed/storage/blockvol/iscsi/cmd/iscsi-target/admin.go b/weed/storage/blockvol/iscsi/cmd/iscsi-target/admin.go index fdc3a9cdc..286a90cf9 100644 --- a/weed/storage/blockvol/iscsi/cmd/iscsi-target/admin.go +++ b/weed/storage/blockvol/iscsi/cmd/iscsi-target/admin.go @@ -111,6 +111,10 @@ func (a *adminServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { a.handleResize(w, r) case "/metrics": a.handleMetrics(w, r) + case "/export": + a.handleExport(w, r) + case "/import": + a.handleImport(w, r) default: http.NotFound(w, r) } @@ -352,6 +356,8 @@ func startAdminServer(addr string, srv *adminServer) (net.Listener, error) { mux.Handle("/snapshot", srv) mux.Handle("/resize", srv) mux.Handle("/metrics", srv) + mux.Handle("/export", srv) + mux.Handle("/import", srv) // pprof handlers registered on DefaultServeMux by net/http/pprof import. mux.Handle("/debug/pprof/", http.DefaultServeMux) go func() { diff --git a/weed/storage/blockvol/iscsi/cmd/iscsi-target/snapshot_s3.go b/weed/storage/blockvol/iscsi/cmd/iscsi-target/snapshot_s3.go new file mode 100644 index 000000000..927b10343 --- /dev/null +++ b/weed/storage/blockvol/iscsi/cmd/iscsi-target/snapshot_s3.go @@ -0,0 +1,241 @@ +package main + +import ( + "bytes" + "encoding/json" + "io" + "log" + "net/http" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" +) + +// exportRequest is the JSON body for POST /export. +type exportRequest struct { + Bucket string `json:"bucket"` + KeyPrefix string `json:"key_prefix"` + S3Endpoint string `json:"s3_endpoint"` + S3AccessKey string `json:"s3_access_key"` + S3SecretKey string `json:"s3_secret_key"` + S3Region string `json:"s3_region"` + SnapshotID uint32 `json:"snapshot_id"` // 0 = auto-create temp snapshot +} + +// importRequest is the JSON body for POST /import. +type importRequest struct { + Bucket string `json:"bucket"` + ManifestKey string `json:"manifest_key"` + S3Endpoint string `json:"s3_endpoint"` + S3AccessKey string `json:"s3_access_key"` + S3SecretKey string `json:"s3_secret_key"` + S3Region string `json:"s3_region"` + AllowOverwrite bool `json:"allow_overwrite"` +} + +// exportResponse is the JSON response for POST /export. +type exportResponse struct { + OK bool `json:"ok"` + ManifestKey string `json:"manifest_key"` + DataKey string `json:"data_key"` + SizeBytes uint64 `json:"size_bytes"` + SHA256 string `json:"sha256"` +} + +// importResponse is the JSON response for POST /import. +type importResponse struct { + OK bool `json:"ok"` + SizeBytes uint64 `json:"size_bytes"` + SHA256 string `json:"sha256"` +} + +func newS3Session(endpoint, accessKey, secretKey, region string) (*session.Session, error) { + if region == "" { + region = "us-east-1" + } + cfg := &aws.Config{ + Region: aws.String(region), + Endpoint: aws.String(endpoint), + S3ForcePathStyle: aws.Bool(true), + DisableSSL: aws.Bool(true), + } + if accessKey != "" && secretKey != "" { + cfg.Credentials = credentials.NewStaticCredentials(accessKey, secretKey, "") + } + return session.NewSession(cfg) +} + +func (a *adminServer) handleExport(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, `{"error":"method not allowed"}`, http.StatusMethodNotAllowed) + return + } + var req exportRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + jsonError(w, "bad json: "+err.Error(), http.StatusBadRequest) + return + } + if req.Bucket == "" || req.S3Endpoint == "" { + jsonError(w, "bucket and s3_endpoint are required", http.StatusBadRequest) + return + } + + sess, err := newS3Session(req.S3Endpoint, req.S3AccessKey, req.S3SecretKey, req.S3Region) + if err != nil { + jsonError(w, "s3 session: "+err.Error(), http.StatusInternalServerError) + return + } + + dataKey := req.KeyPrefix + "data.raw" + manifestKey := req.KeyPrefix + "manifest.json" + + // Pipe: export writes → S3 uploader reads. + pr, pw := io.Pipe() + uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) { + u.PartSize = 8 * 1024 * 1024 // 8MB parts + u.Concurrency = 1 + }) + + var manifest *blockvol.SnapshotArtifactManifest + var exportErr error + exportDone := make(chan struct{}) + + go func() { + defer close(exportDone) + defer pw.Close() + manifest, exportErr = a.vol.ExportSnapshot(r.Context(), pw, blockvol.ExportOptions{ + DataObjectKey: dataKey, + SnapshotID: req.SnapshotID, + }) + if exportErr != nil { + pw.CloseWithError(exportErr) + } + }() + + // Upload data object. + _, uploadErr := uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(req.Bucket), + Key: aws.String(dataKey), + Body: pr, + }) + <-exportDone + + if exportErr != nil { + jsonError(w, "export: "+exportErr.Error(), http.StatusInternalServerError) + return + } + if uploadErr != nil { + jsonError(w, "s3 upload data: "+uploadErr.Error(), http.StatusInternalServerError) + return + } + + // Upload manifest. + manifestData, err := blockvol.MarshalManifest(manifest) + if err != nil { + jsonError(w, "marshal manifest: "+err.Error(), http.StatusInternalServerError) + return + } + _, err = uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(req.Bucket), + Key: aws.String(manifestKey), + Body: bytes.NewReader(manifestData), + }) + if err != nil { + jsonError(w, "s3 upload manifest: "+err.Error(), http.StatusInternalServerError) + return + } + + a.logger.Printf("admin: exported snapshot to s3://%s/%s (%d bytes, sha256=%s)", + req.Bucket, dataKey, manifest.DataSizeBytes, manifest.SHA256) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(exportResponse{ + OK: true, + ManifestKey: manifestKey, + DataKey: dataKey, + SizeBytes: manifest.DataSizeBytes, + SHA256: manifest.SHA256, + }) +} + +func (a *adminServer) handleImport(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, `{"error":"method not allowed"}`, http.StatusMethodNotAllowed) + return + } + var req importRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + jsonError(w, "bad json: "+err.Error(), http.StatusBadRequest) + return + } + if req.Bucket == "" || req.ManifestKey == "" || req.S3Endpoint == "" { + jsonError(w, "bucket, manifest_key, and s3_endpoint are required", http.StatusBadRequest) + return + } + + sess, err := newS3Session(req.S3Endpoint, req.S3AccessKey, req.S3SecretKey, req.S3Region) + if err != nil { + jsonError(w, "s3 session: "+err.Error(), http.StatusInternalServerError) + return + } + s3Client := s3.New(sess) + + // Download manifest. + manifestResp, err := s3Client.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(req.Bucket), + Key: aws.String(req.ManifestKey), + }) + if err != nil { + jsonError(w, "s3 get manifest: "+err.Error(), http.StatusInternalServerError) + return + } + manifestData, err := io.ReadAll(manifestResp.Body) + manifestResp.Body.Close() + if err != nil { + jsonError(w, "read manifest: "+err.Error(), http.StatusInternalServerError) + return + } + + manifest, err := blockvol.UnmarshalManifest(manifestData) + if err != nil { + jsonError(w, "invalid manifest: "+err.Error(), http.StatusBadRequest) + return + } + + // Download data object. + dataResp, err := s3Client.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(req.Bucket), + Key: aws.String(manifest.DataObjectKey), + }) + if err != nil { + jsonError(w, "s3 get data: "+err.Error(), http.StatusInternalServerError) + return + } + defer dataResp.Body.Close() + + // Import. + err = a.vol.ImportSnapshot(r.Context(), manifest, dataResp.Body, blockvol.ImportOptions{ + AllowOverwrite: req.AllowOverwrite, + }) + if err != nil { + jsonError(w, "import: "+err.Error(), http.StatusConflict) + return + } + + a.logger.Printf("admin: imported snapshot from s3://%s/%s (%d bytes, sha256=%s)", + req.Bucket, req.ManifestKey, manifest.DataSizeBytes, manifest.SHA256) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(importResponse{ + OK: true, + SizeBytes: manifest.DataSizeBytes, + SHA256: manifest.SHA256, + }) +} + +// Ensure log is used (for future debugging). +var _ = log.Printf diff --git a/weed/storage/blockvol/qa_snapshot_export_test.go b/weed/storage/blockvol/qa_snapshot_export_test.go new file mode 100644 index 000000000..87d1107cb --- /dev/null +++ b/weed/storage/blockvol/qa_snapshot_export_test.go @@ -0,0 +1,261 @@ +package blockvol + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "io" + "testing" +) + +// TestQA_SnapshotExport_TruncatedData verifies that import rejects +// a data stream that is shorter than the manifest declares. +func TestQA_SnapshotExport_TruncatedData(t *testing.T) { + srcVol := createExportTestVol(t, 64*1024) + if err := srcVol.WriteLBA(0, make([]byte, 4096)); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + + var buf bytes.Buffer + manifest, err := srcVol.ExportSnapshot(context.Background(), &buf, ExportOptions{}) + if err != nil { + t.Fatalf("ExportSnapshot: %v", err) + } + + // Truncate to half the data. + truncated := buf.Bytes()[:buf.Len()/2] + + dstVol := createExportTestVol(t, 64*1024) + err = dstVol.ImportSnapshot(context.Background(), manifest, bytes.NewReader(truncated), ImportOptions{}) + if err == nil { + t.Fatal("expected error for truncated data") + } +} + +// TestQA_SnapshotExport_WrongChecksum verifies that import rejects +// data whose checksum doesn't match the manifest. +func TestQA_SnapshotExport_WrongChecksum(t *testing.T) { + srcVol := createExportTestVol(t, 64*1024) + if err := srcVol.WriteLBA(0, make([]byte, 4096)); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + + var buf bytes.Buffer + manifest, err := srcVol.ExportSnapshot(context.Background(), &buf, ExportOptions{}) + if err != nil { + t.Fatalf("ExportSnapshot: %v", err) + } + + // Tamper with manifest checksum. + manifest.SHA256 = "0000000000000000000000000000000000000000000000000000000000000000" + + dstVol := createExportTestVol(t, 64*1024) + err = dstVol.ImportSnapshot(context.Background(), manifest, &buf, ImportOptions{}) + if err == nil { + t.Fatal("expected checksum mismatch error") + } +} + +// TestQA_SnapshotExport_CorruptedManifest verifies that corrupted +// manifest JSON is rejected. +func TestQA_SnapshotExport_CorruptedManifest(t *testing.T) { + _, err := UnmarshalManifest([]byte(`{this is not valid json`)) + if err == nil { + t.Fatal("expected error for corrupted manifest JSON") + } + + // Valid JSON but missing required fields. + _, err = UnmarshalManifest([]byte(`{"format_version": 1}`)) + if err == nil { + t.Fatal("expected error for manifest with missing fields") + } +} + +// TestQA_SnapshotExport_DataSizeMismatch verifies that import detects +// when the manifest declares a different data size than what's provided. +func TestQA_SnapshotExport_DataSizeMismatch(t *testing.T) { + srcVol := createExportTestVol(t, 64*1024) + + var buf bytes.Buffer + manifest, err := srcVol.ExportSnapshot(context.Background(), &buf, ExportOptions{}) + if err != nil { + t.Fatalf("ExportSnapshot: %v", err) + } + + // Inflate manifest data size to claim more data than exists. + manifest.DataSizeBytes = manifest.DataSizeBytes * 2 + + dstVol := createExportTestVol(t, 64*1024) + err = dstVol.ImportSnapshot(context.Background(), manifest, &buf, ImportOptions{}) + if err == nil { + t.Fatal("expected error for data size mismatch") + } +} + +// TestQA_SnapshotExport_RoundTripIntegrity writes a known multi-block pattern, +// exports, imports into a new volume, and verifies every byte matches. +func TestQA_SnapshotExport_RoundTripIntegrity(t *testing.T) { + volSize := uint64(64 * 1024) // 64KB = 16 blocks at 4KB + srcVol := createExportTestVol(t, volSize) + + // Write distinct patterns to multiple blocks. + for lba := uint64(0); lba < volSize/4096; lba++ { + block := make([]byte, 4096) + for i := range block { + block[i] = byte((lba*251 + uint64(i)*37) % 256) // deterministic pattern + } + if err := srcVol.WriteLBA(lba, block); err != nil { + t.Fatalf("WriteLBA %d: %v", lba, err) + } + } + + // Export. + var buf bytes.Buffer + manifest, err := srcVol.ExportSnapshot(context.Background(), &buf, ExportOptions{ + DataObjectKey: "integrity/data.raw", + }) + if err != nil { + t.Fatalf("ExportSnapshot: %v", err) + } + + // Verify manifest is valid. + if err := ValidateManifest(manifest); err != nil { + t.Fatalf("manifest invalid: %v", err) + } + + // Independently verify checksum of exported data. + h := sha256.Sum256(buf.Bytes()) + if manifest.SHA256 != hex.EncodeToString(h[:]) { + t.Fatal("manifest SHA256 doesn't match exported data") + } + + // Import into new volume. + dstVol := createExportTestVol(t, volSize) + err = dstVol.ImportSnapshot(context.Background(), manifest, bytes.NewReader(buf.Bytes()), ImportOptions{}) + if err != nil { + t.Fatalf("ImportSnapshot: %v", err) + } + + // Verify every block in target matches source pattern. + for lba := uint64(0); lba < volSize/4096; lba++ { + got, err := dstVol.ReadLBA(lba, 4096) + if err != nil { + t.Fatalf("ReadLBA %d: %v", lba, err) + } + expected := make([]byte, 4096) + for i := range expected { + expected[i] = byte((lba*251 + uint64(i)*37) % 256) + } + if !bytes.Equal(got, expected) { + t.Fatalf("block %d mismatch after import", lba) + } + } +} + +// TestQA_SnapshotExport_ContextCancellation verifies that export +// respects context cancellation. +func TestQA_SnapshotExport_ContextCancellation(t *testing.T) { + vol := createExportTestVol(t, 64*1024) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel immediately + + var buf bytes.Buffer + _, err := vol.ExportSnapshot(ctx, &buf, ExportOptions{}) + if err == nil { + t.Fatal("expected context cancellation error") + } +} + +// TestQA_SnapshotExport_ManifestSerializationStable verifies that +// marshal → unmarshal produces the same manifest. +func TestQA_SnapshotExport_ManifestSerializationStable(t *testing.T) { + srcVol := createExportTestVol(t, 64*1024) + + var buf bytes.Buffer + manifest, err := srcVol.ExportSnapshot(context.Background(), &buf, ExportOptions{ + DataObjectKey: "stable/data.raw", + }) + if err != nil { + t.Fatalf("ExportSnapshot: %v", err) + } + + data, err := MarshalManifest(manifest) + if err != nil { + t.Fatalf("MarshalManifest: %v", err) + } + + got, err := UnmarshalManifest(data) + if err != nil { + t.Fatalf("UnmarshalManifest: %v", err) + } + + if got.SHA256 != manifest.SHA256 { + t.Errorf("SHA256 changed after round-trip: %q → %q", manifest.SHA256, got.SHA256) + } + if got.DataSizeBytes != manifest.DataSizeBytes { + t.Errorf("DataSizeBytes changed: %d → %d", manifest.DataSizeBytes, got.DataSizeBytes) + } +} + +// TestQA_SnapshotExport_ExportDuringLiveIO verifies export is safe +// while the volume is receiving writes. +func TestQA_SnapshotExport_ExportDuringLiveIO(t *testing.T) { + vol := createExportTestVol(t, 64*1024) + + // Write initial data. + data := make([]byte, 4096) + data[0] = 0x11 + if err := vol.WriteLBA(0, data); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + + // Start export (creates snapshot internally). + var buf bytes.Buffer + manifest, err := vol.ExportSnapshot(context.Background(), &buf, ExportOptions{}) + if err != nil { + t.Fatalf("ExportSnapshot: %v", err) + } + + // Verify export captured the 0x11 data. + if buf.Bytes()[0] != 0x11 { + t.Errorf("exported first byte = 0x%02X, want 0x11", buf.Bytes()[0]) + } + + // Write new data after export snapshot was taken but before we check. + data[0] = 0x22 + if err := vol.WriteLBA(0, data); err != nil { + t.Fatalf("WriteLBA after export: %v", err) + } + + // Import the exported data into a new volume — should have 0x11, not 0x22. + dstVol := createExportTestVol(t, 64*1024) + err = dstVol.ImportSnapshot(context.Background(), manifest, bytes.NewReader(buf.Bytes()), ImportOptions{}) + if err != nil { + t.Fatalf("ImportSnapshot: %v", err) + } + + got, err := dstVol.ReadLBA(0, 4096) + if err != nil { + t.Fatalf("ReadLBA: %v", err) + } + if got[0] != 0x11 { + t.Errorf("imported first byte = 0x%02X, want 0x11 (snapshot-time value)", got[0]) + } +} + +// TestQA_SnapshotExport_NonexistentSnapshotReject verifies export from +// a snapshot ID that doesn't exist. +func TestQA_SnapshotExport_NonexistentSnapshotReject(t *testing.T) { + vol := createExportTestVol(t, 64*1024) + + var buf bytes.Buffer + _, err := vol.ExportSnapshot(context.Background(), &buf, ExportOptions{SnapshotID: 999}) + if err == nil { + t.Fatal("expected error for nonexistent snapshot ID") + } +} + +// Ensure io package is used (for io.ReadFull reference in snapshot_export.go). +var _ = io.EOF diff --git a/weed/storage/blockvol/snapshot_export.go b/weed/storage/blockvol/snapshot_export.go new file mode 100644 index 000000000..fcd4b3d89 --- /dev/null +++ b/weed/storage/blockvol/snapshot_export.go @@ -0,0 +1,281 @@ +package blockvol + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "time" +) + +// Snapshot artifact format constants. +const ( + SnapshotArtifactFormatV1 = 1 + ArtifactLayoutSingleFile = "single-file" + ExportToolVersion = "sw-block-cp11a4" + exportChunkBlocks = 256 // read 256 blocks per chunk (1MB at 4KB block size) + exportTempSnapID = uint32(0xFFFFFFFE) +) + +// SnapshotArtifactManifest describes a snapshot export artifact. +type SnapshotArtifactManifest struct { + FormatVersion int `json:"format_version"` + SourceVolume string `json:"source_volume"` + SourceSizeBytes uint64 `json:"source_size_bytes"` + SourceBlockSize uint32 `json:"source_block_size"` + StorageProfile string `json:"storage_profile"` + CreatedAt string `json:"created_at"` + ArtifactLayout string `json:"artifact_layout"` + DataObjectKey string `json:"data_object_key"` + DataSizeBytes uint64 `json:"data_size_bytes"` + SHA256 string `json:"sha256"` + Compression string `json:"compression"` + ExportToolVersion string `json:"export_tool_version"` +} + +var ( + ErrUnsupportedArtifactVersion = errors.New("blockvol: unsupported artifact format version") + ErrUnsupportedArtifactLayout = errors.New("blockvol: unsupported artifact layout") + ErrUnsupportedProfileExport = errors.New("blockvol: only single profile supported for export/import") + ErrManifestMissingField = errors.New("blockvol: manifest missing required field") + ErrChecksumMismatch = errors.New("blockvol: data checksum mismatch") + ErrImportSizeMismatch = errors.New("blockvol: target volume size does not match manifest") + ErrImportBlockSizeMismatch = errors.New("blockvol: target block size does not match manifest") + ErrImportTargetNotEmpty = errors.New("blockvol: target volume is not empty (use AllowOverwrite)") + ErrImportDataShort = errors.New("blockvol: import data shorter than manifest declares") +) + +// MarshalManifest encodes a manifest as indented JSON. +func MarshalManifest(m *SnapshotArtifactManifest) ([]byte, error) { + return json.MarshalIndent(m, "", " ") +} + +// UnmarshalManifest decodes and validates a manifest from JSON. +func UnmarshalManifest(data []byte) (*SnapshotArtifactManifest, error) { + var m SnapshotArtifactManifest + if err := json.Unmarshal(data, &m); err != nil { + return nil, fmt.Errorf("blockvol: unmarshal manifest: %w", err) + } + if err := ValidateManifest(&m); err != nil { + return nil, err + } + return &m, nil +} + +// ValidateManifest checks that all required fields are present and supported. +func ValidateManifest(m *SnapshotArtifactManifest) error { + if m.FormatVersion != SnapshotArtifactFormatV1 { + return fmt.Errorf("%w: %d", ErrUnsupportedArtifactVersion, m.FormatVersion) + } + if m.ArtifactLayout != ArtifactLayoutSingleFile { + return fmt.Errorf("%w: %s", ErrUnsupportedArtifactLayout, m.ArtifactLayout) + } + if m.StorageProfile != "single" { + return fmt.Errorf("%w: %s", ErrUnsupportedProfileExport, m.StorageProfile) + } + if m.SourceSizeBytes == 0 { + return fmt.Errorf("%w: source_size_bytes", ErrManifestMissingField) + } + if m.SourceBlockSize == 0 { + return fmt.Errorf("%w: source_block_size", ErrManifestMissingField) + } + if m.SHA256 == "" { + return fmt.Errorf("%w: sha256", ErrManifestMissingField) + } + if m.DataSizeBytes == 0 { + return fmt.Errorf("%w: data_size_bytes", ErrManifestMissingField) + } + return nil +} + +// ExportOptions configures a snapshot export. +type ExportOptions struct { + DataObjectKey string // S3 object key for the data object (stored in manifest) + SnapshotID uint32 // if > 0, export from existing snapshot; if 0, create+delete temp +} + +// ExportSnapshot exports a crash-consistent snapshot of the volume to w. +// If opts.SnapshotID is 0, a temporary snapshot is created and deleted after export. +// If opts.SnapshotID is > 0, the existing snapshot is used (not deleted). +// The full logical volume image is streamed to w with SHA-256 computed inline. +func (v *BlockVol) ExportSnapshot(ctx context.Context, w io.Writer, opts ExportOptions) (*SnapshotArtifactManifest, error) { + if v.Profile() != ProfileSingle { + return nil, ErrUnsupportedProfileExport + } + + info := v.Info() + snapID := opts.SnapshotID + deleteSnap := false + + if snapID == 0 { + snapID = exportTempSnapID + if err := v.CreateSnapshot(snapID); err != nil { + return nil, fmt.Errorf("blockvol: export create temp snapshot: %w", err) + } + deleteSnap = true + defer func() { + if deleteSnap { + v.DeleteSnapshot(snapID) + } + }() + } else { + // Verify snapshot exists. + v.snapMu.RLock() + _, ok := v.snapshots[snapID] + v.snapMu.RUnlock() + if !ok { + return nil, ErrSnapshotNotFound + } + } + + h := sha256.New() + mw := io.MultiWriter(w, h) + + totalBlocks := info.VolumeSize / uint64(info.BlockSize) + chunkBlocks := uint64(exportChunkBlocks) + var totalWritten uint64 + + for startBlock := uint64(0); startBlock < totalBlocks; startBlock += chunkBlocks { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + remaining := totalBlocks - startBlock + n := chunkBlocks + if n > remaining { + n = remaining + } + readBytes := uint32(n * uint64(info.BlockSize)) + + data, err := v.ReadSnapshot(snapID, startBlock, readBytes) + if err != nil { + return nil, fmt.Errorf("blockvol: export read block %d: %w", startBlock, err) + } + + if _, err := mw.Write(data); err != nil { + return nil, fmt.Errorf("blockvol: export write: %w", err) + } + totalWritten += uint64(len(data)) + } + + manifest := &SnapshotArtifactManifest{ + FormatVersion: SnapshotArtifactFormatV1, + SourceVolume: v.Path(), + SourceSizeBytes: info.VolumeSize, + SourceBlockSize: info.BlockSize, + StorageProfile: "single", + CreatedAt: time.Now().UTC().Format(time.RFC3339), + ArtifactLayout: ArtifactLayoutSingleFile, + DataObjectKey: opts.DataObjectKey, + DataSizeBytes: totalWritten, + SHA256: hex.EncodeToString(h.Sum(nil)), + Compression: "none", + ExportToolVersion: ExportToolVersion, + } + + return manifest, nil +} + +// ImportOptions configures a snapshot import. +type ImportOptions struct { + AllowOverwrite bool // if true, allow import into a non-empty volume +} + +// ImportSnapshot imports a snapshot artifact into this volume. +// The volume must match the manifest's size and block size. +// Data is read from r and written directly to the extent region, bypassing the WAL. +// SHA-256 is verified against the manifest after the full read. +func (v *BlockVol) ImportSnapshot(ctx context.Context, manifest *SnapshotArtifactManifest, r io.Reader, opts ImportOptions) error { + if err := ValidateManifest(manifest); err != nil { + return err + } + + info := v.Info() + if info.VolumeSize != manifest.SourceSizeBytes { + return fmt.Errorf("%w: target=%d manifest=%d", ErrImportSizeMismatch, info.VolumeSize, manifest.SourceSizeBytes) + } + if info.BlockSize != manifest.SourceBlockSize { + return fmt.Errorf("%w: target=%d manifest=%d", ErrImportBlockSizeMismatch, info.BlockSize, manifest.SourceBlockSize) + } + + // Empty check: nextLSN > 1 means the volume has been written to. + if !opts.AllowOverwrite && v.nextLSN.Load() > 1 { + return ErrImportTargetNotEmpty + } + + // Pause flusher — we write directly to extent. + if err := v.flusher.PauseAndFlush(); err != nil { + v.flusher.Resume() + return fmt.Errorf("blockvol: import flush: %w", err) + } + defer v.flusher.Resume() + + // Stream data to extent, computing SHA-256. + h := sha256.New() + tr := io.TeeReader(r, h) + + extentStart := v.super.WALOffset + v.super.WALSize + chunkSize := uint64(exportChunkBlocks) * uint64(info.BlockSize) + buf := make([]byte, chunkSize) + var totalRead uint64 + + for totalRead < manifest.DataSizeBytes { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + remaining := manifest.DataSizeBytes - totalRead + readSize := chunkSize + if readSize > remaining { + readSize = remaining + } + + n, err := io.ReadFull(tr, buf[:readSize]) + if n > 0 { + writeOff := int64(extentStart) + int64(totalRead) + if _, werr := v.fd.WriteAt(buf[:n], writeOff); werr != nil { + return fmt.Errorf("blockvol: import write at offset %d: %w", writeOff, werr) + } + totalRead += uint64(n) + } + if err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } + return fmt.Errorf("blockvol: import read: %w", err) + } + } + + if totalRead != manifest.DataSizeBytes { + return fmt.Errorf("%w: read %d bytes, manifest declares %d", ErrImportDataShort, totalRead, manifest.DataSizeBytes) + } + + // Verify checksum. + got := hex.EncodeToString(h.Sum(nil)) + if got != manifest.SHA256 { + return fmt.Errorf("%w: got %s, want %s", ErrChecksumMismatch, got, manifest.SHA256) + } + + // Fsync extent. + if err := v.fd.Sync(); err != nil { + return fmt.Errorf("blockvol: import sync: %w", err) + } + + // Reset WAL and dirty map for clean state. + v.dirtyMap.Clear() + v.wal.Reset() + v.super.WALHead = 0 + v.super.WALTail = 0 + if err := v.persistSuperblock(); err != nil { + return fmt.Errorf("blockvol: import persist superblock: %w", err) + } + + return nil +} diff --git a/weed/storage/blockvol/snapshot_export_test.go b/weed/storage/blockvol/snapshot_export_test.go new file mode 100644 index 000000000..3dfb446c7 --- /dev/null +++ b/weed/storage/blockvol/snapshot_export_test.go @@ -0,0 +1,368 @@ +package blockvol + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "path/filepath" + "testing" +) + +func TestManifest_RoundTrip(t *testing.T) { + m := &SnapshotArtifactManifest{ + FormatVersion: SnapshotArtifactFormatV1, + SourceVolume: "/data/vol1", + SourceSizeBytes: 1 << 20, + SourceBlockSize: 4096, + StorageProfile: "single", + CreatedAt: "2026-03-13T00:00:00Z", + ArtifactLayout: ArtifactLayoutSingleFile, + DataObjectKey: "backup/vol1/data.raw", + DataSizeBytes: 1 << 20, + SHA256: "abcd1234", + Compression: "none", + ExportToolVersion: ExportToolVersion, + } + + data, err := MarshalManifest(m) + if err != nil { + t.Fatalf("MarshalManifest: %v", err) + } + + got, err := UnmarshalManifest(data) + if err != nil { + t.Fatalf("UnmarshalManifest: %v", err) + } + + if got.FormatVersion != m.FormatVersion { + t.Errorf("FormatVersion = %d, want %d", got.FormatVersion, m.FormatVersion) + } + if got.SourceSizeBytes != m.SourceSizeBytes { + t.Errorf("SourceSizeBytes = %d, want %d", got.SourceSizeBytes, m.SourceSizeBytes) + } + if got.SHA256 != m.SHA256 { + t.Errorf("SHA256 = %q, want %q", got.SHA256, m.SHA256) + } + if got.DataObjectKey != m.DataObjectKey { + t.Errorf("DataObjectKey = %q, want %q", got.DataObjectKey, m.DataObjectKey) + } +} + +func TestManifest_Validate_BadVersion(t *testing.T) { + m := validManifest() + m.FormatVersion = 99 + if err := ValidateManifest(m); err == nil { + t.Fatal("expected error for bad version") + } +} + +func TestManifest_Validate_BadProfile(t *testing.T) { + m := validManifest() + m.StorageProfile = "striped" + if err := ValidateManifest(m); err == nil { + t.Fatal("expected error for bad profile") + } +} + +func TestManifest_Validate_BadLayout(t *testing.T) { + m := validManifest() + m.ArtifactLayout = "multi-part" + if err := ValidateManifest(m); err == nil { + t.Fatal("expected error for bad layout") + } +} + +func TestManifest_Validate_MissingFields(t *testing.T) { + cases := []struct { + name string + mutate func(*SnapshotArtifactManifest) + }{ + {"no size", func(m *SnapshotArtifactManifest) { m.SourceSizeBytes = 0 }}, + {"no block size", func(m *SnapshotArtifactManifest) { m.SourceBlockSize = 0 }}, + {"no sha256", func(m *SnapshotArtifactManifest) { m.SHA256 = "" }}, + {"no data size", func(m *SnapshotArtifactManifest) { m.DataSizeBytes = 0 }}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + m := validManifest() + tc.mutate(m) + if err := ValidateManifest(m); err == nil { + t.Fatal("expected error") + } + }) + } +} + +func TestExportSnapshot_Basic(t *testing.T) { + vol := createExportTestVol(t, 64*1024) // 64KB volume + + // Write known pattern. + data := make([]byte, 4096) + for i := range data { + data[i] = 0xAA + } + if err := vol.WriteLBA(0, data); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + + var buf bytes.Buffer + manifest, err := vol.ExportSnapshot(context.Background(), &buf, ExportOptions{ + DataObjectKey: "test/data.raw", + }) + if err != nil { + t.Fatalf("ExportSnapshot: %v", err) + } + + if manifest.DataSizeBytes != 64*1024 { + t.Errorf("DataSizeBytes = %d, want %d", manifest.DataSizeBytes, 64*1024) + } + if manifest.SourceSizeBytes != 64*1024 { + t.Errorf("SourceSizeBytes = %d, want %d", manifest.SourceSizeBytes, 64*1024) + } + if manifest.SHA256 == "" { + t.Error("SHA256 is empty") + } + if manifest.StorageProfile != "single" { + t.Errorf("StorageProfile = %q, want single", manifest.StorageProfile) + } + if uint64(buf.Len()) != manifest.DataSizeBytes { + t.Errorf("buffer len = %d, want %d", buf.Len(), manifest.DataSizeBytes) + } +} + +func TestExportSnapshot_ChecksumCorrect(t *testing.T) { + vol := createExportTestVol(t, 64*1024) + + data := make([]byte, 4096) + for i := range data { + data[i] = 0xBB + } + if err := vol.WriteLBA(0, data); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + + var buf bytes.Buffer + manifest, err := vol.ExportSnapshot(context.Background(), &buf, ExportOptions{}) + if err != nil { + t.Fatalf("ExportSnapshot: %v", err) + } + + // Independently compute SHA-256 of the exported data. + h := sha256.Sum256(buf.Bytes()) + want := hex.EncodeToString(h[:]) + if manifest.SHA256 != want { + t.Errorf("manifest SHA256 = %q, independently computed = %q", manifest.SHA256, want) + } +} + +func TestExportSnapshot_ExistingSnapshot(t *testing.T) { + vol := createExportTestVol(t, 64*1024) + + data := make([]byte, 4096) + data[0] = 0xCC + if err := vol.WriteLBA(0, data); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + + // Create snapshot manually. + if err := vol.CreateSnapshot(42); err != nil { + t.Fatalf("CreateSnapshot: %v", err) + } + + // Write more data after snapshot. + data[0] = 0xDD + if err := vol.WriteLBA(0, data); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + + // Export from existing snapshot — should get pre-write data. + var buf bytes.Buffer + _, err := vol.ExportSnapshot(context.Background(), &buf, ExportOptions{SnapshotID: 42}) + if err != nil { + t.Fatalf("ExportSnapshot: %v", err) + } + + // First byte should be 0xCC (snapshot-time), not 0xDD (current). + if buf.Bytes()[0] != 0xCC { + t.Errorf("exported first byte = 0x%02X, want 0xCC", buf.Bytes()[0]) + } + + vol.DeleteSnapshot(42) +} + +func TestExportSnapshot_ProfileReject(t *testing.T) { + vol := createExportTestVol(t, 64*1024) + // Force profile to non-single for test. + vol.super.StorageProfile = uint8(ProfileStriped) + + var buf bytes.Buffer + _, err := vol.ExportSnapshot(context.Background(), &buf, ExportOptions{}) + if err == nil { + t.Fatal("expected error for non-single profile") + } +} + +func TestImportSnapshot_Basic(t *testing.T) { + // Create source, write pattern, export. + srcVol := createExportTestVol(t, 64*1024) + pattern := make([]byte, 4096) + for i := range pattern { + pattern[i] = byte(i % 251) // prime-mod pattern for uniqueness + } + if err := srcVol.WriteLBA(0, pattern); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + + var buf bytes.Buffer + manifest, err := srcVol.ExportSnapshot(context.Background(), &buf, ExportOptions{ + DataObjectKey: "test/data.raw", + }) + if err != nil { + t.Fatalf("ExportSnapshot: %v", err) + } + + // Create target, import. + dstVol := createExportTestVol(t, 64*1024) + err = dstVol.ImportSnapshot(context.Background(), manifest, &buf, ImportOptions{}) + if err != nil { + t.Fatalf("ImportSnapshot: %v", err) + } + + // Read back and verify pattern. + got, err := dstVol.ReadLBA(0, 4096) + if err != nil { + t.Fatalf("ReadLBA: %v", err) + } + if !bytes.Equal(got, pattern) { + t.Error("imported data does not match source pattern") + } +} + +func TestImportSnapshot_SizeMismatch(t *testing.T) { + srcVol := createExportTestVol(t, 64*1024) + var buf bytes.Buffer + manifest, err := srcVol.ExportSnapshot(context.Background(), &buf, ExportOptions{}) + if err != nil { + t.Fatalf("ExportSnapshot: %v", err) + } + + // Create target with different size. + dstVol := createExportTestVol(t, 128*1024) + err = dstVol.ImportSnapshot(context.Background(), manifest, &buf, ImportOptions{}) + if err == nil { + t.Fatal("expected size mismatch error") + } +} + +func TestImportSnapshot_NonEmptyReject(t *testing.T) { + srcVol := createExportTestVol(t, 64*1024) + var buf bytes.Buffer + manifest, err := srcVol.ExportSnapshot(context.Background(), &buf, ExportOptions{}) + if err != nil { + t.Fatalf("ExportSnapshot: %v", err) + } + + // Create target and write to it (makes it non-empty). + dstVol := createExportTestVol(t, 64*1024) + if err := dstVol.WriteLBA(0, make([]byte, 4096)); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + + err = dstVol.ImportSnapshot(context.Background(), manifest, &buf, ImportOptions{}) + if err == nil { + t.Fatal("expected non-empty target error") + } +} + +func TestImportSnapshot_AllowOverwrite(t *testing.T) { + // Source with known pattern. + srcVol := createExportTestVol(t, 64*1024) + pattern := make([]byte, 4096) + pattern[0] = 0xEE + if err := srcVol.WriteLBA(0, pattern); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + + var buf bytes.Buffer + manifest, err := srcVol.ExportSnapshot(context.Background(), &buf, ExportOptions{}) + if err != nil { + t.Fatalf("ExportSnapshot: %v", err) + } + + // Target with existing data. + dstVol := createExportTestVol(t, 64*1024) + if err := dstVol.WriteLBA(0, make([]byte, 4096)); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + + err = dstVol.ImportSnapshot(context.Background(), manifest, &buf, ImportOptions{AllowOverwrite: true}) + if err != nil { + t.Fatalf("ImportSnapshot with AllowOverwrite: %v", err) + } + + got, err := dstVol.ReadLBA(0, 4096) + if err != nil { + t.Fatalf("ReadLBA: %v", err) + } + if got[0] != 0xEE { + t.Errorf("overwritten data first byte = 0x%02X, want 0xEE", got[0]) + } +} + +func TestImportSnapshot_ChecksumMismatch(t *testing.T) { + srcVol := createExportTestVol(t, 64*1024) + if err := srcVol.WriteLBA(0, make([]byte, 4096)); err != nil { + t.Fatalf("WriteLBA: %v", err) + } + + var buf bytes.Buffer + manifest, err := srcVol.ExportSnapshot(context.Background(), &buf, ExportOptions{}) + if err != nil { + t.Fatalf("ExportSnapshot: %v", err) + } + + // Corrupt the data. + exported := buf.Bytes() + exported[0] ^= 0xFF + + dstVol := createExportTestVol(t, 64*1024) + err = dstVol.ImportSnapshot(context.Background(), manifest, bytes.NewReader(exported), ImportOptions{}) + if err == nil { + t.Fatal("expected checksum mismatch error") + } +} + +// createExportTestVol creates a temporary BlockVol with a specified size for export tests. +func createExportTestVol(t *testing.T, volumeSize uint64) *BlockVol { + t.Helper() + dir := t.TempDir() + path := filepath.Join(dir, "test.vol") + vol, err := CreateBlockVol(path, CreateOptions{ + VolumeSize: volumeSize, + WALSize: 32 << 10, // 32KB WAL + }) + if err != nil { + t.Fatalf("CreateBlockVol: %v", err) + } + t.Cleanup(func() { vol.Close() }) + return vol +} + +// validManifest returns a valid manifest for test mutation. +func validManifest() *SnapshotArtifactManifest { + return &SnapshotArtifactManifest{ + FormatVersion: SnapshotArtifactFormatV1, + SourceVolume: "/data/vol1", + SourceSizeBytes: 1 << 20, + SourceBlockSize: 4096, + StorageProfile: "single", + CreatedAt: "2026-03-13T00:00:00Z", + ArtifactLayout: ArtifactLayoutSingleFile, + DataObjectKey: "backup/data.raw", + DataSizeBytes: 1 << 20, + SHA256: "abc123", + Compression: "none", + ExportToolVersion: ExportToolVersion, + } +}