From 6b2b44245048f4e94f9a066053fac656131efbd7 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 15 Mar 2026 20:21:50 -0700 Subject: [PATCH] iceberg: detect maintenance work per operation (#8639) * iceberg: detect maintenance work per operation * iceberg: ignore delete manifests during detection * iceberg: clean up detection maintenance planning * iceberg: tighten detection manifest heuristics * Potential fix for code scanning alert no. 330: Incorrect conversion between integer types Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> * iceberg: tolerate per-operation detection errors * iceberg: fix fake metadata location versioning * iceberg: check snapshot expiry before manifest loads * iceberg: make expire-snapshots switch case explicit --------- Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> --- weed/plugin/worker/iceberg/config.go | 5 +- weed/plugin/worker/iceberg/detection.go | 253 ++++++++++- weed/plugin/worker/iceberg/exec_test.go | 470 ++++++++++++++++++++- weed/plugin/worker/iceberg/handler_test.go | 89 ++-- weed/plugin/worker/iceberg/operations.go | 66 +-- 5 files changed, 806 insertions(+), 77 deletions(-) diff --git a/weed/plugin/worker/iceberg/config.go b/weed/plugin/worker/iceberg/config.go index 45ab867ff..8da2ce268 100644 --- a/weed/plugin/worker/iceberg/config.go +++ b/weed/plugin/worker/iceberg/config.go @@ -19,6 +19,7 @@ const ( defaultTargetFileSizeMB = 256 defaultMinInputFiles = 5 defaultMinManifestsToRewrite = 5 + minManifestsToRewrite = 2 defaultOperations = "all" // Metric keys returned by maintenance operations. @@ -80,8 +81,8 @@ func ParseConfig(values map[string]*plugin_pb.ConfigValue) Config { if cfg.MinInputFiles < 2 { cfg.MinInputFiles = defaultMinInputFiles } - if cfg.MinManifestsToRewrite < 2 { - cfg.MinManifestsToRewrite = 2 + if cfg.MinManifestsToRewrite < minManifestsToRewrite { + cfg.MinManifestsToRewrite = minManifestsToRewrite } return cfg diff --git a/weed/plugin/worker/iceberg/detection.go b/weed/plugin/worker/iceberg/detection.go index 405e37017..4523af10c 100644 --- a/weed/plugin/worker/iceberg/detection.go +++ b/weed/plugin/worker/iceberg/detection.go @@ -1,6 +1,7 @@ package iceberg import ( + "bytes" "context" "encoding/json" "fmt" @@ -8,6 +9,7 @@ import ( "strings" "time" + "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/table" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -18,11 +20,12 @@ import ( // tableInfo captures metadata about a table for detection/execution. type tableInfo struct { - BucketName string - Namespace string - TableName string - TablePath string // namespace/tableName - Metadata table.Metadata + BucketName string + Namespace string + TableName string + TablePath string // namespace/tableName + MetadataFileName string + Metadata table.Metadata } // scanTablesForMaintenance enumerates table buckets and their tables, @@ -37,13 +40,16 @@ func (h *Handler) scanTablesForMaintenance( limit int, ) ([]tableInfo, error) { var tables []tableInfo + ops, err := parseOperations(config.Operations) + if err != nil { + return nil, fmt.Errorf("parse operations: %w", err) + } // Compile wildcard matchers once (nil = match all) bucketMatchers := wildcard.CompileWildcardMatchers(bucketFilter) nsMatchers := wildcard.CompileWildcardMatchers(namespaceFilter) tableMatchers := wildcard.CompileWildcardMatchers(tableFilter) - // List entries under /buckets to find table buckets bucketsPath := s3tables.TablesPath bucketEntries, err := listFilerEntries(ctx, filerClient, bucketsPath, "") if err != nil { @@ -117,7 +123,8 @@ func (h *Handler) scanTablesForMaintenance( // Parse the internal metadata to get FullMetadata var internalMeta struct { - Metadata *struct { + MetadataLocation string `json:"metadataLocation,omitempty"` + Metadata *struct { FullMetadata json.RawMessage `json:"fullMetadata,omitempty"` } `json:"metadata,omitempty"` } @@ -135,13 +142,21 @@ func (h *Handler) scanTablesForMaintenance( continue } - if needsMaintenance(icebergMeta, config) { + tablePath := path.Join(nsName, tblName) + metadataFileName := metadataFileNameFromLocation(internalMeta.MetadataLocation, bucketName, tablePath) + needsWork, err := h.tableNeedsMaintenance(ctx, filerClient, bucketName, tablePath, icebergMeta, metadataFileName, config, ops) + if err != nil { + glog.V(2).Infof("iceberg maintenance: skipping %s/%s/%s: cannot evaluate maintenance need: %v", bucketName, nsName, tblName, err) + continue + } + if needsWork { tables = append(tables, tableInfo{ - BucketName: bucketName, - Namespace: nsName, - TableName: tblName, - TablePath: path.Join(nsName, tblName), - Metadata: icebergMeta, + BucketName: bucketName, + Namespace: nsName, + TableName: tblName, + TablePath: tablePath, + MetadataFileName: metadataFileName, + Metadata: icebergMeta, }) if limit > 0 && len(tables) > limit { return tables, nil @@ -154,8 +169,216 @@ func (h *Handler) scanTablesForMaintenance( return tables, nil } -// needsMaintenance checks if a table needs any maintenance based on -// metadata-only thresholds (no manifest reading). +func normalizeDetectionConfig(config Config) Config { + normalized := config + if normalized.TargetFileSizeBytes <= 0 { + normalized.TargetFileSizeBytes = defaultTargetFileSizeMB * 1024 * 1024 + } + if normalized.MinInputFiles < 2 { + normalized.MinInputFiles = defaultMinInputFiles + } + if normalized.MinManifestsToRewrite < minManifestsToRewrite { + normalized.MinManifestsToRewrite = minManifestsToRewrite + } + if normalized.OrphanOlderThanHours <= 0 { + normalized.OrphanOlderThanHours = defaultOrphanOlderThanHours + } + return normalized +} + +func (h *Handler) tableNeedsMaintenance( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath string, + meta table.Metadata, + metadataFileName string, + config Config, + ops []string, +) (bool, error) { + config = normalizeDetectionConfig(config) + + // Evaluate the metadata-only expiration check first so large tables do not + // pay for manifest reads when snapshot expiry already makes them eligible. + for _, op := range ops { + if op == "expire_snapshots" && needsMaintenance(meta, config) { + return true, nil + } + } + + loadManifests := func() ([]iceberg.ManifestFile, error) { + return loadCurrentManifests(ctx, filerClient, bucketName, tablePath, meta) + } + + var currentManifests []iceberg.ManifestFile + var manifestsErr error + manifestsLoaded := false + getCurrentManifests := func() ([]iceberg.ManifestFile, error) { + if manifestsLoaded { + return currentManifests, manifestsErr + } + currentManifests, manifestsErr = loadManifests() + manifestsLoaded = true + return currentManifests, manifestsErr + } + var opEvalErrors []string + + for _, op := range ops { + switch op { + case "expire_snapshots": + // Handled by the metadata-only check above. + continue + case "compact": + manifests, err := getCurrentManifests() + if err != nil { + opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err)) + continue + } + eligible, err := hasEligibleCompaction(ctx, filerClient, bucketName, tablePath, manifests, config) + if err != nil { + opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err)) + continue + } + if eligible { + return true, nil + } + case "rewrite_manifests": + manifests, err := getCurrentManifests() + if err != nil { + opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err)) + continue + } + if countDataManifests(manifests) >= config.MinManifestsToRewrite { + return true, nil + } + case "remove_orphans": + if metadataFileName == "" { + _, currentMetadataFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath) + if err != nil { + opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err)) + continue + } + metadataFileName = currentMetadataFileName + } + orphanCandidates, err := collectOrphanCandidates(ctx, filerClient, bucketName, tablePath, meta, metadataFileName, config.OrphanOlderThanHours) + if err != nil { + opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err)) + continue + } + if len(orphanCandidates) > 0 { + return true, nil + } + } + } + + if len(opEvalErrors) > 0 { + return false, fmt.Errorf("evaluate maintenance operations: %s", strings.Join(opEvalErrors, "; ")) + } + + return false, nil +} + +func metadataFileNameFromLocation(location, bucketName, tablePath string) string { + if location == "" { + return "" + } + return path.Base(normalizeIcebergPath(location, bucketName, tablePath)) +} + +func countDataManifests(manifests []iceberg.ManifestFile) int64 { + var count int64 + for _, mf := range manifests { + if mf.ManifestContent() == iceberg.ManifestContentData { + count++ + } + } + return count +} + +func loadCurrentManifests( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath string, + meta table.Metadata, +) ([]iceberg.ManifestFile, error) { + currentSnap := meta.CurrentSnapshot() + if currentSnap == nil || currentSnap.ManifestList == "" { + return nil, nil + } + + manifestListData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, currentSnap.ManifestList) + if err != nil { + return nil, fmt.Errorf("read manifest list: %w", err) + } + manifests, err := iceberg.ReadManifestList(bytes.NewReader(manifestListData)) + if err != nil { + return nil, fmt.Errorf("parse manifest list: %w", err) + } + return manifests, nil +} + +func hasEligibleCompaction( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath string, + manifests []iceberg.ManifestFile, + config Config, +) (bool, error) { + if len(manifests) == 0 { + return false, nil + } + + minInputFiles, err := compactionMinInputFiles(config.MinInputFiles) + if err != nil { + return false, err + } + + var dataManifests []iceberg.ManifestFile + specIDs := make(map[int32]struct{}) + for _, mf := range manifests { + if mf.ManifestContent() != iceberg.ManifestContentData { + continue + } + dataManifests = append(dataManifests, mf) + specIDs[mf.PartitionSpecID()] = struct{}{} + } + if len(dataManifests) == 0 { + return false, nil + } + if len(specIDs) > 1 { + return false, nil + } + + var allEntries []iceberg.ManifestEntry + for _, mf := range dataManifests { + manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath()) + if err != nil { + return false, fmt.Errorf("read manifest %s: %w", mf.FilePath(), err) + } + entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true) + if err != nil { + return false, fmt.Errorf("parse manifest %s: %w", mf.FilePath(), err) + } + allEntries = append(allEntries, entries...) + } + + bins := buildCompactionBins(allEntries, config.TargetFileSizeBytes, minInputFiles) + return len(bins) > 0, nil +} + +func compactionMinInputFiles(minInputFiles int64) (int, error) { + // Ensure the configured value is positive and fits into the platform's int type + if minInputFiles <= 0 { + return 0, fmt.Errorf("min input files must be positive, got %d", minInputFiles) + } + maxInt := int64(^uint(0) >> 1) + if minInputFiles > maxInt { + return 0, fmt.Errorf("min input files %d exceeds platform int size", minInputFiles) + } + return int(minInputFiles), nil +} + +// needsMaintenance checks whether snapshot expiration work is needed based on +// metadata-only thresholds. func needsMaintenance(meta table.Metadata, config Config) bool { snapshots := meta.Snapshots() if len(snapshots) == 0 { diff --git a/weed/plugin/worker/iceberg/exec_test.go b/weed/plugin/worker/iceberg/exec_test.go index c59edea46..25c3cc276 100644 --- a/weed/plugin/worker/iceberg/exec_test.go +++ b/weed/plugin/worker/iceberg/exec_test.go @@ -243,8 +243,10 @@ func populateTable(t *testing.T, fs *fakeFilerServer, setup tableSetup) table.Me } // Build internal metadata xattr + const metadataVersion = 1 internalMeta := map[string]interface{}{ - "metadataVersion": 1, + "metadataVersion": metadataVersion, + "metadataLocation": path.Join("metadata", fmt.Sprintf("v%d.metadata.json", metadataVersion)), "metadata": map[string]interface{}{ "fullMetadata": json.RawMessage(fullMetadataJSON), }, @@ -370,6 +372,66 @@ func populateTable(t *testing.T, fs *fakeFilerServer, setup tableSetup) table.Me return meta } +func writeCurrentSnapshotManifests(t *testing.T, fs *fakeFilerServer, setup tableSetup, meta table.Metadata, manifestEntries [][]iceberg.ManifestEntry) { + t.Helper() + + currentSnap := meta.CurrentSnapshot() + if currentSnap == nil { + t.Fatal("current snapshot is required") + } + + metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata") + version := meta.Version() + schema := meta.CurrentSchema() + spec := meta.PartitionSpec() + + var manifests []iceberg.ManifestFile + for i, entries := range manifestEntries { + manifestName := fmt.Sprintf("detect-manifest-%d.avro", i+1) + var manifestBuf bytes.Buffer + mf, err := iceberg.WriteManifest( + path.Join("metadata", manifestName), + &manifestBuf, + version, + spec, + schema, + currentSnap.SnapshotID, + entries, + ) + if err != nil { + t.Fatalf("write manifest %d: %v", i+1, err) + } + fs.putEntry(metaDir, manifestName, &filer_pb.Entry{ + Name: manifestName, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: uint64(manifestBuf.Len()), + }, + Content: manifestBuf.Bytes(), + }) + manifests = append(manifests, mf) + } + + var manifestListBuf bytes.Buffer + seqNum := currentSnap.SequenceNumber + if err := iceberg.WriteManifestList(version, &manifestListBuf, currentSnap.SnapshotID, currentSnap.ParentSnapshotID, &seqNum, 0, manifests); err != nil { + t.Fatalf("write current manifest list: %v", err) + } + fs.putEntry(metaDir, path.Base(currentSnap.ManifestList), &filer_pb.Entry{ + Name: path.Base(currentSnap.ManifestList), + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: uint64(manifestListBuf.Len()), + }, + Content: manifestListBuf.Bytes(), + }) +} + +func makeManifestEntries(t *testing.T, specs []testEntrySpec, snapshotID int64) []iceberg.ManifestEntry { + t.Helper() + return makeManifestEntriesWithSnapshot(t, specs, snapshotID, iceberg.EntryStatusADDED) +} + // --------------------------------------------------------------------------- // Recording senders for Execute tests // --------------------------------------------------------------------------- @@ -934,6 +996,412 @@ func TestConnectToFilerFailsWhenAllAddressesAreUnreachable(t *testing.T) { } } +func TestDetectSchedulesCompactionWithoutSnapshotPressure(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "analytics", + TableName: "events", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1}, + }, + } + meta := populateTable(t, fs, setup) + writeCurrentSnapshotManifests(t, fs, setup, meta, [][]iceberg.ManifestEntry{ + makeManifestEntries(t, []testEntrySpec{ + {path: "data/small-1.parquet", size: 1024, partition: map[int]any{}}, + {path: "data/small-2.parquet", size: 1024, partition: map[int]any{}}, + {path: "data/small-3.parquet", size: 1024, partition: map[int]any{}}, + }, 1), + }) + + handler := NewHandler(nil) + config := Config{ + SnapshotRetentionHours: 24 * 365, + MaxSnapshotsToKeep: 10, + TargetFileSizeBytes: 4096, + MinInputFiles: 2, + Operations: "compact", + } + + tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0) + if err != nil { + t.Fatalf("scanTablesForMaintenance failed: %v", err) + } + if len(tables) != 1 { + t.Fatalf("expected 1 compaction candidate, got %d", len(tables)) + } +} + +func TestDetectSchedulesCompactionWithDeleteManifestPresent(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "analytics", + TableName: "events", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1}, + }, + } + meta := populateTable(t, fs, setup) + + currentSnap := meta.CurrentSnapshot() + if currentSnap == nil { + t.Fatal("current snapshot is required") + } + + metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata") + version := meta.Version() + schema := meta.CurrentSchema() + spec := meta.PartitionSpec() + + dataEntries := makeManifestEntries(t, []testEntrySpec{ + {path: "data/small-1.parquet", size: 1024, partition: map[int]any{}}, + {path: "data/small-2.parquet", size: 1024, partition: map[int]any{}}, + {path: "data/small-3.parquet", size: 1024, partition: map[int]any{}}, + }, currentSnap.SnapshotID) + + var dataManifestBuf bytes.Buffer + dataManifestName := "detect-manifest-1.avro" + dataManifest, err := iceberg.WriteManifest( + path.Join("metadata", dataManifestName), + &dataManifestBuf, + version, + spec, + schema, + currentSnap.SnapshotID, + dataEntries, + ) + if err != nil { + t.Fatalf("write data manifest: %v", err) + } + fs.putEntry(metaDir, dataManifestName, &filer_pb.Entry{ + Name: dataManifestName, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: uint64(dataManifestBuf.Len()), + }, + Content: dataManifestBuf.Bytes(), + }) + + deleteManifest := iceberg.NewManifestFile( + version, + path.Join("metadata", "detect-delete-manifest.avro"), + 0, + int32(spec.ID()), + currentSnap.SnapshotID, + ).Content(iceberg.ManifestContentDeletes). + SequenceNum(currentSnap.SequenceNumber, currentSnap.SequenceNumber). + DeletedFiles(1). + DeletedRows(1). + Build() + + var manifestListBuf bytes.Buffer + seqNum := currentSnap.SequenceNumber + if err := iceberg.WriteManifestList( + version, + &manifestListBuf, + currentSnap.SnapshotID, + currentSnap.ParentSnapshotID, + &seqNum, + 0, + []iceberg.ManifestFile{dataManifest, deleteManifest}, + ); err != nil { + t.Fatalf("write manifest list: %v", err) + } + fs.putEntry(metaDir, path.Base(currentSnap.ManifestList), &filer_pb.Entry{ + Name: path.Base(currentSnap.ManifestList), + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: uint64(manifestListBuf.Len()), + }, + Content: manifestListBuf.Bytes(), + }) + + handler := NewHandler(nil) + config := Config{ + SnapshotRetentionHours: 24 * 365, + MaxSnapshotsToKeep: 10, + TargetFileSizeBytes: 4096, + MinInputFiles: 2, + Operations: "compact", + } + + tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0) + if err != nil { + t.Fatalf("scanTablesForMaintenance failed: %v", err) + } + if len(tables) != 1 { + t.Fatalf("expected 1 compaction candidate with delete manifest present, got %d", len(tables)) + } +} + +func TestDetectSchedulesSnapshotExpiryDespiteCompactionEvaluationError(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "analytics", + TableName: "events", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now - 1, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1}, + }, + } + populateTable(t, fs, setup) + + metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata") + manifestListName := path.Base(setup.Snapshots[0].ManifestList) + fs.putEntry(metaDir, manifestListName, &filer_pb.Entry{ + Name: manifestListName, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: uint64(len("not-a-manifest-list")), + }, + Content: []byte("not-a-manifest-list"), + }) + + handler := NewHandler(nil) + config := Config{ + SnapshotRetentionHours: 0, + MaxSnapshotsToKeep: 10, + Operations: "compact,expire_snapshots", + } + + tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0) + if err != nil { + t.Fatalf("scanTablesForMaintenance failed: %v", err) + } + if len(tables) != 1 { + t.Fatalf("expected snapshot expiration candidate despite compaction evaluation error, got %d", len(tables)) + } +} + +func TestDetectSchedulesManifestRewriteWithoutSnapshotPressure(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "analytics", + TableName: "events", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1}, + }, + } + meta := populateTable(t, fs, setup) + + manifestEntries := make([][]iceberg.ManifestEntry, 0, 5) + for i := 0; i < 5; i++ { + manifestEntries = append(manifestEntries, makeManifestEntries(t, []testEntrySpec{ + {path: fmt.Sprintf("data/rewrite-%d.parquet", i), size: 1024, partition: map[int]any{}}, + }, 1)) + } + writeCurrentSnapshotManifests(t, fs, setup, meta, manifestEntries) + + handler := NewHandler(nil) + config := Config{ + SnapshotRetentionHours: 24 * 365, + MaxSnapshotsToKeep: 10, + MinManifestsToRewrite: 5, + Operations: "rewrite_manifests", + } + + tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0) + if err != nil { + t.Fatalf("scanTablesForMaintenance failed: %v", err) + } + if len(tables) != 1 { + t.Fatalf("expected 1 manifest rewrite candidate, got %d", len(tables)) + } +} + +func TestDetectDoesNotScheduleManifestRewriteFromDeleteManifestsOnly(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "analytics", + TableName: "events", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1}, + }, + } + meta := populateTable(t, fs, setup) + + currentSnap := meta.CurrentSnapshot() + if currentSnap == nil { + t.Fatal("current snapshot is required") + } + + metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata") + version := meta.Version() + schema := meta.CurrentSchema() + spec := meta.PartitionSpec() + + dataEntries := makeManifestEntries(t, []testEntrySpec{ + {path: "data/rewrite-0.parquet", size: 1024, partition: map[int]any{}}, + }, currentSnap.SnapshotID) + + var dataManifestBuf bytes.Buffer + dataManifestName := "detect-rewrite-data.avro" + dataManifest, err := iceberg.WriteManifest( + path.Join("metadata", dataManifestName), + &dataManifestBuf, + version, + spec, + schema, + currentSnap.SnapshotID, + dataEntries, + ) + if err != nil { + t.Fatalf("write data manifest: %v", err) + } + fs.putEntry(metaDir, dataManifestName, &filer_pb.Entry{ + Name: dataManifestName, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: uint64(dataManifestBuf.Len()), + }, + Content: dataManifestBuf.Bytes(), + }) + + manifests := []iceberg.ManifestFile{dataManifest} + for i := 0; i < 4; i++ { + deleteManifest := iceberg.NewManifestFile( + version, + path.Join("metadata", fmt.Sprintf("detect-delete-%d.avro", i)), + 0, + int32(spec.ID()), + currentSnap.SnapshotID, + ).Content(iceberg.ManifestContentDeletes). + SequenceNum(currentSnap.SequenceNumber, currentSnap.SequenceNumber). + DeletedFiles(1). + DeletedRows(1). + Build() + manifests = append(manifests, deleteManifest) + } + + var manifestListBuf bytes.Buffer + seqNum := currentSnap.SequenceNumber + if err := iceberg.WriteManifestList( + version, + &manifestListBuf, + currentSnap.SnapshotID, + currentSnap.ParentSnapshotID, + &seqNum, + 0, + manifests, + ); err != nil { + t.Fatalf("write manifest list: %v", err) + } + fs.putEntry(metaDir, path.Base(currentSnap.ManifestList), &filer_pb.Entry{ + Name: path.Base(currentSnap.ManifestList), + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: uint64(manifestListBuf.Len()), + }, + Content: manifestListBuf.Bytes(), + }) + + handler := NewHandler(nil) + config := Config{ + SnapshotRetentionHours: 24 * 365, + MaxSnapshotsToKeep: 10, + MinManifestsToRewrite: 2, + Operations: "rewrite_manifests", + } + + tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0) + if err != nil { + t.Fatalf("scanTablesForMaintenance failed: %v", err) + } + if len(tables) != 0 { + t.Fatalf("expected no manifest rewrite candidate when only one data manifest exists, got %d", len(tables)) + } +} + +func TestDetectSchedulesOrphanCleanupWithoutSnapshotPressure(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "analytics", + TableName: "events", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1}, + }, + } + populateTable(t, fs, setup) + + dataDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "data") + fs.putEntry(dataDir, "stale-orphan.parquet", &filer_pb.Entry{ + Name: "stale-orphan.parquet", + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Add(-200 * time.Hour).Unix(), + FileSize: 100, + }, + Content: []byte("orphan"), + }) + + handler := NewHandler(nil) + config := Config{ + SnapshotRetentionHours: 24 * 365, + MaxSnapshotsToKeep: 10, + OrphanOlderThanHours: 72, + Operations: "remove_orphans", + } + + tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0) + if err != nil { + t.Fatalf("scanTablesForMaintenance failed: %v", err) + } + if len(tables) != 1 { + t.Fatalf("expected 1 orphan cleanup candidate, got %d", len(tables)) + } +} + +func TestDetectSchedulesOrphanCleanupWithoutSnapshots(t *testing.T) { + fs, client := startFakeFiler(t) + + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "analytics", + TableName: "events", + } + populateTable(t, fs, setup) + + dataDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "data") + fs.putEntry(dataDir, "stale-orphan.parquet", &filer_pb.Entry{ + Name: "stale-orphan.parquet", + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Add(-200 * time.Hour).Unix(), + FileSize: 100, + }, + Content: []byte("orphan"), + }) + + handler := NewHandler(nil) + config := Config{ + OrphanOlderThanHours: 72, + Operations: "remove_orphans", + } + + tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0) + if err != nil { + t.Fatalf("scanTablesForMaintenance failed: %v", err) + } + if len(tables) != 1 { + t.Fatalf("expected 1 orphan cleanup candidate without snapshots, got %d", len(tables)) + } +} + func TestStalePlanGuard(t *testing.T) { fs, client := startFakeFiler(t) diff --git a/weed/plugin/worker/iceberg/handler_test.go b/weed/plugin/worker/iceberg/handler_test.go index 83e9a3fa0..42a9c8b4c 100644 --- a/weed/plugin/worker/iceberg/handler_test.go +++ b/weed/plugin/worker/iceberg/handler_test.go @@ -505,13 +505,19 @@ func TestBuildCompactionBinsMultiplePartitions(t *testing.T) { partA := map[int]any{1: "us-east"} partB := map[int]any{1: "eu-west"} + partitionSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{ + SourceID: 1, + FieldID: 1000, + Name: "region", + Transform: iceberg.IdentityTransform{}, + }) entries := makeTestEntries(t, []testEntrySpec{ - {path: "data/a1.parquet", size: 1024, partition: partA}, - {path: "data/a2.parquet", size: 2048, partition: partA}, - {path: "data/b1.parquet", size: 1024, partition: partB}, - {path: "data/b2.parquet", size: 2048, partition: partB}, - {path: "data/b3.parquet", size: 4096, partition: partB}, + {path: "data/a1.parquet", size: 1024, partition: partA, partitionSpec: &partitionSpec}, + {path: "data/a2.parquet", size: 2048, partition: partA, partitionSpec: &partitionSpec}, + {path: "data/b1.parquet", size: 1024, partition: partB, partitionSpec: &partitionSpec}, + {path: "data/b2.parquet", size: 2048, partition: partB, partitionSpec: &partitionSpec}, + {path: "data/b3.parquet", size: 4096, partition: partB, partitionSpec: &partitionSpec}, }) bins := buildCompactionBins(entries, targetSize, minFiles) @@ -574,40 +580,65 @@ func TestSplitOversizedBinDropsImpossibleRunts(t *testing.T) { } type testEntrySpec struct { - path string - size int64 - partition map[int]any - specID int32 // partition spec ID; 0 uses UnpartitionedSpec + path string + size int64 + partition map[int]any + partitionSpec *iceberg.PartitionSpec + specID int32 // partition spec ID; 0 uses UnpartitionedSpec } -// makeTestEntries creates manifest entries using UnpartitionedSpec (spec ID 0). -// The specID field in testEntrySpec is ignored here; for multi-spec testing, -// use makeTestEntriesWithSpec instead. -func makeTestEntries(t *testing.T, specs []testEntrySpec) []iceberg.ManifestEntry { +func buildTestDataFile(t *testing.T, spec testEntrySpec) iceberg.DataFile { + t.Helper() + + partitionSpec := iceberg.UnpartitionedSpec + if spec.partitionSpec != nil { + partitionSpec = spec.partitionSpec + } else if len(spec.partition) > 0 { + t.Fatalf("partition spec is required for partitioned test entry %s", spec.path) + } + dfBuilder, err := iceberg.NewDataFileBuilder( + *partitionSpec, + iceberg.EntryContentData, + spec.path, + iceberg.ParquetFile, + spec.partition, + nil, nil, + 1, // recordCount (must be > 0) + spec.size, + ) + if err != nil { + t.Fatalf("failed to build data file %s: %v", spec.path, err) + } + return dfBuilder.Build() +} + +func makeManifestEntriesWithSnapshot( + t *testing.T, + specs []testEntrySpec, + snapshotID int64, + status iceberg.ManifestEntryStatus, +) []iceberg.ManifestEntry { t.Helper() + entries := make([]iceberg.ManifestEntry, 0, len(specs)) for _, spec := range specs { - partSpec := *iceberg.UnpartitionedSpec - dfBuilder, err := iceberg.NewDataFileBuilder( - partSpec, - iceberg.EntryContentData, - spec.path, - iceberg.ParquetFile, - spec.partition, + entries = append(entries, iceberg.NewManifestEntry( + status, + &snapshotID, nil, nil, - 1, // recordCount (must be > 0) - spec.size, - ) - if err != nil { - t.Fatalf("failed to build data file %s: %v", spec.path, err) - } - snapID := int64(1) - entry := iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, dfBuilder.Build()) - entries = append(entries, entry) + buildTestDataFile(t, spec), + )) } return entries } +// makeTestEntries creates manifest entries using the default unpartitioned +// spec. For multi-spec testing, use makeTestEntriesWithSpec instead. +func makeTestEntries(t *testing.T, specs []testEntrySpec) []iceberg.ManifestEntry { + t.Helper() + return makeManifestEntriesWithSnapshot(t, specs, 1, iceberg.EntryStatusADDED) +} + // makeTestEntriesWithSpec creates manifest entries using specific partition specs. // Each spec in the specs slice can specify a specID; the entry is built using // a PartitionSpec with that ID. diff --git a/weed/plugin/worker/iceberg/operations.go b/weed/plugin/worker/iceberg/operations.go index d97dd9465..fa8b89af0 100644 --- a/weed/plugin/worker/iceberg/operations.go +++ b/weed/plugin/worker/iceberg/operations.go @@ -229,31 +229,54 @@ func (h *Handler) removeOrphans( return "", nil, fmt.Errorf("load metadata: %w", err) } - // Collect all referenced files from all snapshots + orphanCandidates, err := collectOrphanCandidates(ctx, filerClient, bucketName, tablePath, meta, metadataFileName, config.OrphanOlderThanHours) + if err != nil { + return "", nil, fmt.Errorf("collect orphan candidates: %w", err) + } + + orphanCount := 0 + for _, candidate := range orphanCandidates { + if delErr := deleteFilerFile(ctx, filerClient, candidate.Dir, candidate.Entry.Name); delErr != nil { + glog.Warningf("iceberg maintenance: failed to delete orphan %s/%s: %v", candidate.Dir, candidate.Entry.Name, delErr) + } else { + orphanCount++ + } + } + + metrics := map[string]int64{ + MetricOrphansRemoved: int64(orphanCount), + MetricDurationMs: time.Since(start).Milliseconds(), + } + return fmt.Sprintf("removed %d orphan file(s)", orphanCount), metrics, nil +} + +func collectOrphanCandidates( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath string, + meta table.Metadata, + metadataFileName string, + orphanOlderThanHours int64, +) ([]filerFileEntry, error) { referencedFiles, err := collectSnapshotFiles(ctx, filerClient, bucketName, tablePath, meta.Snapshots()) if err != nil { - return "", nil, fmt.Errorf("collect referenced files: %w", err) + return nil, fmt.Errorf("collect referenced files: %w", err) } - // Reference the active metadata file so it is not treated as orphan referencedFiles[path.Join("metadata", metadataFileName)] = struct{}{} - - // Also reference the current metadata files for mle := range meta.PreviousFiles() { referencedFiles[mle.MetadataFile] = struct{}{} } - // Precompute a normalized lookup set so orphan checks are O(1) per file. normalizedRefs := make(map[string]struct{}, len(referencedFiles)) for ref := range referencedFiles { normalizedRefs[ref] = struct{}{} normalizedRefs[normalizeIcebergPath(ref, bucketName, tablePath)] = struct{}{} } - // List actual files on filer in metadata/ and data/ directories tableBasePath := path.Join(s3tables.TablesPath, bucketName, tablePath) - safetyThreshold := time.Now().Add(-time.Duration(config.OrphanOlderThanHours) * time.Hour) - orphanCount := 0 + safetyThreshold := time.Now().Add(-time.Duration(orphanOlderThanHours) * time.Hour) + var candidates []filerFileEntry for _, subdir := range []string{"metadata", "data"} { dirPath := path.Join(tableBasePath, subdir) @@ -265,39 +288,22 @@ func (h *Handler) removeOrphans( for _, fe := range fileEntries { entry := fe.Entry - // Build relative path from the table base (e.g. "data/region=us/file.parquet") fullPath := path.Join(fe.Dir, entry.Name) relPath := strings.TrimPrefix(fullPath, tableBasePath+"/") - - _, isReferenced := normalizedRefs[relPath] - - if isReferenced { + if _, isReferenced := normalizedRefs[relPath]; isReferenced { continue } - - // Check safety window — skip entries with unknown age if entry.Attributes == nil { continue } - mtime := time.Unix(entry.Attributes.Mtime, 0) - if mtime.After(safetyThreshold) { + if time.Unix(entry.Attributes.Mtime, 0).After(safetyThreshold) { continue } - - // Delete orphan - if delErr := deleteFilerFile(ctx, filerClient, fe.Dir, entry.Name); delErr != nil { - glog.Warningf("iceberg maintenance: failed to delete orphan %s/%s: %v", fe.Dir, entry.Name, delErr) - } else { - orphanCount++ - } + candidates = append(candidates, fe) } } - metrics := map[string]int64{ - MetricOrphansRemoved: int64(orphanCount), - MetricDurationMs: time.Since(start).Milliseconds(), - } - return fmt.Sprintf("removed %d orphan file(s)", orphanCount), metrics, nil + return candidates, nil } // ---------------------------------------------------------------------------