From e5c088947302a75ef42f9ee25fb340407991c5a3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 16 Mar 2026 21:11:09 -0700 Subject: [PATCH] iceberg: add delete file rewrite maintenance (#8664) * iceberg: add delete file rewrite maintenance * iceberg: preserve untouched delete files during rewrites * iceberg: share detection threshold defaults * iceberg: add partition-scoped maintenance filters (#8665) * iceberg: add partition-scoped maintenance filters * iceberg: tighten where-filter partition matching --- weed/plugin/worker/iceberg/compact.go | 59 +- weed/plugin/worker/iceberg/config.go | 139 ++-- weed/plugin/worker/iceberg/delete_rewrite.go | 595 ++++++++++++++++++ weed/plugin/worker/iceberg/detection.go | 126 +++- weed/plugin/worker/iceberg/exec_test.go | 512 ++++++++++++++- weed/plugin/worker/iceberg/handler.go | 144 ++++- weed/plugin/worker/iceberg/handler_test.go | 35 +- weed/plugin/worker/iceberg/operations.go | 47 +- weed/plugin/worker/iceberg/planning_index.go | 2 +- weed/plugin/worker/iceberg/where_filter.go | 311 +++++++++ .../worker/iceberg/where_filter_test.go | 287 +++++++++ 11 files changed, 2140 insertions(+), 117 deletions(-) create mode 100644 weed/plugin/worker/iceberg/delete_rewrite.go create mode 100644 weed/plugin/worker/iceberg/where_filter.go create mode 100644 weed/plugin/worker/iceberg/where_filter_test.go diff --git a/weed/plugin/worker/iceberg/compact.go b/weed/plugin/worker/iceberg/compact.go index cf1bc440b..a07bce9cc 100644 --- a/weed/plugin/worker/iceberg/compact.go +++ b/weed/plugin/worker/iceberg/compact.go @@ -48,6 +48,10 @@ func (h *Handler) compactDataFiles( if err != nil { return "", nil, fmt.Errorf("load metadata: %w", err) } + predicate, err := parsePartitionPredicate(config.Where, meta) + if err != nil { + return "", nil, err + } currentSnap := meta.CurrentSnapshot() if currentSnap == nil || currentSnap.ManifestList == "" { @@ -95,6 +99,8 @@ func (h *Handler) compactDataFiles( allEntries = append(allEntries, entries...) } + specsByID := specByID(meta) + // Collect delete entries if we need to apply deletes var positionDeletes map[string][]int64 var eqDeleteGroups []equalityDeleteGroup @@ -112,9 +118,23 @@ func (h *Handler) compactDataFiles( allDeleteEntries = append(allDeleteEntries, entries...) } - // Separate position and equality deletes + // Separate position and equality deletes, filtering by partition + // predicate so out-of-scope deletes don't affect the merge. var posDeleteEntries, eqDeleteEntries []iceberg.ManifestEntry for _, entry := range allDeleteEntries { + if predicate != nil { + spec, ok := specsByID[int(entry.DataFile().SpecID())] + if !ok { + continue + } + match, err := predicate.Matches(spec, entry.DataFile().Partition()) + if err != nil { + return "", nil, err + } + if !match { + continue + } + } switch entry.DataFile().ContentType() { case iceberg.EntryContentPosDeletes: posDeleteEntries = append(posDeleteEntries, entry) @@ -138,18 +158,37 @@ func (h *Handler) compactDataFiles( } } - // Build compaction bins: group small files by partition - // MinInputFiles is clamped by ParseConfig to [2, ...] so int conversion is safe. - bins := buildCompactionBins(allEntries, config.TargetFileSizeBytes, int(config.MinInputFiles)) + candidateEntries := allEntries + if predicate != nil { + candidateEntries = make([]iceberg.ManifestEntry, 0, len(allEntries)) + for _, entry := range allEntries { + spec, ok := specsByID[int(entry.DataFile().SpecID())] + if !ok { + continue + } + match, err := predicate.Matches(spec, entry.DataFile().Partition()) + if err != nil { + return "", nil, err + } + if match { + candidateEntries = append(candidateEntries, entry) + } + } + } + + minInputFiles, err := compactionMinInputFiles(config.MinInputFiles) + if err != nil { + return "", nil, err + } + + // Build compaction bins: group small data files by partition. + bins := buildCompactionBins(candidateEntries, config.TargetFileSizeBytes, minInputFiles) if len(bins) == 0 { return "no files eligible for compaction", nil, nil } // Build a lookup from spec ID to PartitionSpec for per-bin manifest writing. - specByID := make(map[int]iceberg.PartitionSpec) - for _, ps := range meta.PartitionSpecs() { - specByID[ps.ID()] = ps - } + specLookup := specsByID schema := meta.CurrentSchema() version := meta.Version() @@ -233,7 +272,7 @@ func (h *Handler) compactDataFiles( // Use the partition spec matching this bin's spec ID { - binSpec, ok := specByID[int(bin.SpecID)] + binSpec, ok := specLookup[int(bin.SpecID)] if !ok { glog.Warningf("iceberg compact: spec %d not found for bin %d, skipping", bin.SpecID, binIdx) _ = deleteFilerFile(ctx, filerClient, dataDir, mergedFileName) @@ -347,7 +386,7 @@ func (h *Handler) compactDataFiles( var allManifests []iceberg.ManifestFile for _, sid := range sortedSpecIDs { se := specEntriesMap[sid] - ps, ok := specByID[int(se.specID)] + ps, ok := specLookup[int(se.specID)] if !ok { return "", nil, fmt.Errorf("partition spec %d not found in table metadata", se.specID) } diff --git a/weed/plugin/worker/iceberg/config.go b/weed/plugin/worker/iceberg/config.go index 8da2ce268..e9c9996d0 100644 --- a/weed/plugin/worker/iceberg/config.go +++ b/weed/plugin/worker/iceberg/config.go @@ -18,90 +18,142 @@ const ( defaultMaxCommitRetries = 5 defaultTargetFileSizeMB = 256 defaultMinInputFiles = 5 + defaultDeleteTargetFileSizeMB = 64 + defaultDeleteMinInputFiles = 2 + defaultDeleteMaxGroupSizeMB = 256 + defaultDeleteMaxOutputFiles = 8 defaultMinManifestsToRewrite = 5 minManifestsToRewrite = 2 defaultOperations = "all" // Metric keys returned by maintenance operations. - MetricFilesMerged = "files_merged" - MetricFilesWritten = "files_written" - MetricBins = "bins" - MetricSnapshotsExpired = "snapshots_expired" - MetricFilesDeleted = "files_deleted" - MetricOrphansRemoved = "orphans_removed" - MetricManifestsRewritten = "manifests_rewritten" - MetricEntriesTotal = "entries_total" - MetricDurationMs = "duration_ms" + MetricFilesMerged = "files_merged" + MetricFilesWritten = "files_written" + MetricBins = "bins" + MetricSnapshotsExpired = "snapshots_expired" + MetricFilesDeleted = "files_deleted" + MetricOrphansRemoved = "orphans_removed" + MetricManifestsRewritten = "manifests_rewritten" + MetricDeleteFilesRewritten = "delete_files_rewritten" + MetricDeleteFilesWritten = "delete_files_written" + MetricDeleteBytesRewritten = "delete_bytes_rewritten" + MetricDeleteGroupsPlanned = "delete_groups_planned" + MetricDeleteGroupsSkipped = "delete_groups_skipped" + MetricEntriesTotal = "entries_total" + MetricDurationMs = "duration_ms" ) // Config holds parsed worker config values. type Config struct { - SnapshotRetentionHours int64 - MaxSnapshotsToKeep int64 - OrphanOlderThanHours int64 - MaxCommitRetries int64 - TargetFileSizeBytes int64 - MinInputFiles int64 - MinManifestsToRewrite int64 - Operations string - ApplyDeletes bool + SnapshotRetentionHours int64 + MaxSnapshotsToKeep int64 + OrphanOlderThanHours int64 + MaxCommitRetries int64 + TargetFileSizeBytes int64 + MinInputFiles int64 + DeleteTargetFileSizeBytes int64 + DeleteMinInputFiles int64 + DeleteMaxFileGroupSizeBytes int64 + DeleteMaxOutputFiles int64 + MinManifestsToRewrite int64 + Operations string + ApplyDeletes bool + Where string + RewriteStrategy string + SortFields string + SortMaxInputBytes int64 } // ParseConfig extracts an iceberg maintenance Config from plugin config values. // Values are clamped to safe minimums to prevent misconfiguration. func ParseConfig(values map[string]*plugin_pb.ConfigValue) Config { cfg := Config{ - SnapshotRetentionHours: readInt64Config(values, "snapshot_retention_hours", defaultSnapshotRetentionHours), - MaxSnapshotsToKeep: readInt64Config(values, "max_snapshots_to_keep", defaultMaxSnapshotsToKeep), - OrphanOlderThanHours: readInt64Config(values, "orphan_older_than_hours", defaultOrphanOlderThanHours), - MaxCommitRetries: readInt64Config(values, "max_commit_retries", defaultMaxCommitRetries), - TargetFileSizeBytes: readInt64Config(values, "target_file_size_mb", defaultTargetFileSizeMB) * 1024 * 1024, - MinInputFiles: readInt64Config(values, "min_input_files", defaultMinInputFiles), - MinManifestsToRewrite: readInt64Config(values, "min_manifests_to_rewrite", defaultMinManifestsToRewrite), - Operations: readStringConfig(values, "operations", defaultOperations), - ApplyDeletes: readBoolConfig(values, "apply_deletes", true), - } - - // Clamp to safe minimums using the default constants + SnapshotRetentionHours: readInt64Config(values, "snapshot_retention_hours", defaultSnapshotRetentionHours), + MaxSnapshotsToKeep: readInt64Config(values, "max_snapshots_to_keep", defaultMaxSnapshotsToKeep), + OrphanOlderThanHours: readInt64Config(values, "orphan_older_than_hours", defaultOrphanOlderThanHours), + MaxCommitRetries: readInt64Config(values, "max_commit_retries", defaultMaxCommitRetries), + TargetFileSizeBytes: readInt64Config(values, "target_file_size_mb", defaultTargetFileSizeMB) * 1024 * 1024, + MinInputFiles: readInt64Config(values, "min_input_files", defaultMinInputFiles), + DeleteTargetFileSizeBytes: readInt64Config(values, "delete_target_file_size_mb", defaultDeleteTargetFileSizeMB) * 1024 * 1024, + DeleteMinInputFiles: readInt64Config(values, "delete_min_input_files", defaultDeleteMinInputFiles), + DeleteMaxFileGroupSizeBytes: readInt64Config(values, "delete_max_file_group_size_mb", defaultDeleteMaxGroupSizeMB) * 1024 * 1024, + DeleteMaxOutputFiles: readInt64Config(values, "delete_max_output_files", defaultDeleteMaxOutputFiles), + MinManifestsToRewrite: readInt64Config(values, "min_manifests_to_rewrite", defaultMinManifestsToRewrite), + Operations: readStringConfig(values, "operations", defaultOperations), + ApplyDeletes: readBoolConfig(values, "apply_deletes", true), + Where: strings.TrimSpace(readStringConfig(values, "where", "")), + RewriteStrategy: strings.TrimSpace(strings.ToLower(readStringConfig(values, "rewrite_strategy", "binpack"))), + SortFields: strings.TrimSpace(readStringConfig(values, "sort_fields", "")), + SortMaxInputBytes: readInt64Config(values, "sort_max_input_mb", 0) * 1024 * 1024, + } + + // Clamp the fields that are always defaulted by worker config parsing. if cfg.SnapshotRetentionHours <= 0 { cfg.SnapshotRetentionHours = defaultSnapshotRetentionHours } if cfg.MaxSnapshotsToKeep <= 0 { cfg.MaxSnapshotsToKeep = defaultMaxSnapshotsToKeep } - if cfg.OrphanOlderThanHours <= 0 { - cfg.OrphanOlderThanHours = defaultOrphanOlderThanHours - } if cfg.MaxCommitRetries <= 0 { cfg.MaxCommitRetries = defaultMaxCommitRetries } + cfg = applyThresholdDefaults(cfg) + if cfg.RewriteStrategy == "" { + cfg.RewriteStrategy = "binpack" + } + if cfg.RewriteStrategy != "binpack" && cfg.RewriteStrategy != "sort" { + cfg.RewriteStrategy = "binpack" + } + if cfg.SortMaxInputBytes < 0 { + cfg.SortMaxInputBytes = 0 + } + return cfg +} + +func applyThresholdDefaults(cfg Config) Config { + if cfg.OrphanOlderThanHours <= 0 { + cfg.OrphanOlderThanHours = defaultOrphanOlderThanHours + } if cfg.TargetFileSizeBytes <= 0 { cfg.TargetFileSizeBytes = defaultTargetFileSizeMB * 1024 * 1024 } if cfg.MinInputFiles < 2 { cfg.MinInputFiles = defaultMinInputFiles } + if cfg.DeleteTargetFileSizeBytes <= 0 { + cfg.DeleteTargetFileSizeBytes = defaultDeleteTargetFileSizeMB * 1024 * 1024 + } + if cfg.DeleteMinInputFiles < 2 { + cfg.DeleteMinInputFiles = defaultDeleteMinInputFiles + } + if cfg.DeleteMaxFileGroupSizeBytes <= 0 { + cfg.DeleteMaxFileGroupSizeBytes = defaultDeleteMaxGroupSizeMB * 1024 * 1024 + } + if cfg.DeleteMaxOutputFiles <= 0 { + cfg.DeleteMaxOutputFiles = defaultDeleteMaxOutputFiles + } if cfg.MinManifestsToRewrite < minManifestsToRewrite { cfg.MinManifestsToRewrite = minManifestsToRewrite } - return cfg } // parseOperations returns the ordered list of maintenance operations to execute. -// Order follows Iceberg best practices: compact → expire_snapshots → remove_orphans → rewrite_manifests. +// Order follows Iceberg best practices: compact → rewrite_position_delete_files +// → expire_snapshots → remove_orphans → rewrite_manifests. // Returns an error if any unknown operation is specified or the result would be empty. func parseOperations(ops string) ([]string, error) { ops = strings.TrimSpace(strings.ToLower(ops)) if ops == "" || ops == "all" { - return []string{"compact", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, nil + return []string{"compact", "rewrite_position_delete_files", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, nil } validOps := map[string]struct{}{ - "compact": {}, - "expire_snapshots": {}, - "remove_orphans": {}, - "rewrite_manifests": {}, + "compact": {}, + "rewrite_position_delete_files": {}, + "expire_snapshots": {}, + "remove_orphans": {}, + "rewrite_manifests": {}, } requested := make(map[string]struct{}) @@ -111,13 +163,14 @@ func parseOperations(ops string) ([]string, error) { continue } if _, ok := validOps[op]; !ok { - return nil, fmt.Errorf("unknown maintenance operation %q (valid: compact, expire_snapshots, remove_orphans, rewrite_manifests)", op) + return nil, fmt.Errorf("unknown maintenance operation %q (valid: compact, rewrite_position_delete_files, expire_snapshots, remove_orphans, rewrite_manifests)", op) } requested[op] = struct{}{} } - // Return in canonical order: compact → expire_snapshots → remove_orphans → rewrite_manifests - canonicalOrder := []string{"compact", "expire_snapshots", "remove_orphans", "rewrite_manifests"} + // Return in canonical order: compact → rewrite_position_delete_files → + // expire_snapshots → remove_orphans → rewrite_manifests + canonicalOrder := []string{"compact", "rewrite_position_delete_files", "expire_snapshots", "remove_orphans", "rewrite_manifests"} var result []string for _, op := range canonicalOrder { if _, ok := requested[op]; ok { diff --git a/weed/plugin/worker/iceberg/delete_rewrite.go b/weed/plugin/worker/iceberg/delete_rewrite.go new file mode 100644 index 000000000..8e0c4f167 --- /dev/null +++ b/weed/plugin/worker/iceberg/delete_rewrite.go @@ -0,0 +1,595 @@ +package iceberg + +import ( + "bytes" + "context" + "fmt" + "math" + "path" + "sort" + "time" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/table" + "github.com/parquet-go/parquet-go" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3tables" +) + +type deleteRewriteInput struct { + Entry iceberg.ManifestEntry + ReferencedPath string + Positions []int64 +} + +type deleteRewriteGroup struct { + SpecID int32 + Partition map[int]any + PartitionKey string + ReferencedPath string + Inputs []deleteRewriteInput + TotalSize int64 +} + +type positionDeleteRow struct { + FilePath string `parquet:"file_path"` + Pos int64 `parquet:"pos"` +} + +func hasEligibleDeleteRewrite( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath string, + manifests []iceberg.ManifestFile, + config Config, + meta table.Metadata, + predicate *partitionPredicate, +) (bool, error) { + groups, _, err := collectDeleteRewriteGroups(ctx, filerClient, bucketName, tablePath, manifests) + if err != nil { + return false, err + } + for _, group := range groups { + if predicate != nil { + spec, ok := specByID(meta)[int(group.SpecID)] + if !ok { + continue + } + match, err := predicate.Matches(spec, group.Partition) + if err != nil { + return false, err + } + if !match { + continue + } + } + if groupEligibleForRewrite(group, config) { + return true, nil + } + } + return false, nil +} + +func collectDeleteRewriteGroups( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath string, + manifests []iceberg.ManifestFile, +) (map[string]*deleteRewriteGroup, []iceberg.ManifestEntry, error) { + groups := make(map[string]*deleteRewriteGroup) + var allPositionEntries []iceberg.ManifestEntry + + for _, mf := range manifests { + if mf.ManifestContent() != iceberg.ManifestContentDeletes { + continue + } + + manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath()) + if err != nil { + return nil, nil, fmt.Errorf("read delete manifest %s: %w", mf.FilePath(), err) + } + entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true) + if err != nil { + return nil, nil, fmt.Errorf("parse delete manifest %s: %w", mf.FilePath(), err) + } + + for _, entry := range entries { + if entry.DataFile().ContentType() != iceberg.EntryContentPosDeletes { + continue + } + + allPositionEntries = append(allPositionEntries, entry) + + fileDeletes, err := readPositionDeleteFile(ctx, filerClient, bucketName, tablePath, entry.DataFile().FilePath()) + if err != nil { + return nil, nil, fmt.Errorf("read position delete file %s: %w", entry.DataFile().FilePath(), err) + } + if len(fileDeletes) != 1 { + // Phase 1 only rewrites files that target a single data file. + continue + } + + var referencedPath string + var positions []int64 + for fp, pos := range fileDeletes { + referencedPath = normalizeIcebergPath(fp, bucketName, tablePath) + positions = append(positions, pos...) + } + sort.Slice(positions, func(i, j int) bool { return positions[i] < positions[j] }) + + partKey := partitionKey(entry.DataFile().Partition()) + groupKey := fmt.Sprintf("spec%d\x00%s\x00%s", entry.DataFile().SpecID(), partKey, referencedPath) + group, ok := groups[groupKey] + if !ok { + group = &deleteRewriteGroup{ + SpecID: entry.DataFile().SpecID(), + Partition: entry.DataFile().Partition(), + PartitionKey: partKey, + ReferencedPath: referencedPath, + } + groups[groupKey] = group + } + group.Inputs = append(group.Inputs, deleteRewriteInput{ + Entry: entry, + ReferencedPath: referencedPath, + Positions: positions, + }) + group.TotalSize += entry.DataFile().FileSizeBytes() + } + } + + return groups, allPositionEntries, nil +} + +func groupEligibleForRewrite(group *deleteRewriteGroup, config Config) bool { + if group == nil { + return false + } + if len(group.Inputs) < 2 { + return false + } + if group.TotalSize > config.DeleteMaxFileGroupSizeBytes { + return false + } + target := config.DeleteTargetFileSizeBytes + if target <= 0 { + target = defaultDeleteTargetFileSizeMB * 1024 * 1024 + } + outputFiles := int64(estimatedDeleteOutputFiles(group.TotalSize, target)) + if config.DeleteMaxOutputFiles > 0 && outputFiles > config.DeleteMaxOutputFiles { + return false + } + return int64(len(group.Inputs)) >= config.DeleteMinInputFiles +} + +func estimatedDeleteOutputFiles(totalSize, targetSize int64) int { + if totalSize <= 0 || targetSize <= 0 { + return 1 + } + count := int(math.Ceil(float64(totalSize) / float64(targetSize))) + if count < 1 { + return 1 + } + return count +} + +func manifestEntrySeqNum(entry iceberg.ManifestEntry) *int64 { + seqNum := entry.SequenceNum() + if seqNum < 0 { + return nil + } + return &seqNum +} + +func manifestEntryFileSeqNum(entry iceberg.ManifestEntry) *int64 { + if fileSeqNum := entry.FileSequenceNum(); fileSeqNum != nil { + value := *fileSeqNum + return &value + } + return manifestEntrySeqNum(entry) +} + +func writeManifestWithContent( + filename string, + version int, + spec iceberg.PartitionSpec, + schema *iceberg.Schema, + snapshotID int64, + entries []iceberg.ManifestEntry, + content iceberg.ManifestContent, +) (iceberg.ManifestFile, []byte, error) { + var manifestBuf bytes.Buffer + mf, err := iceberg.WriteManifest(filename, &manifestBuf, version, spec, schema, snapshotID, entries) + if err != nil { + return nil, nil, err + } + + manifestBytes := manifestBuf.Bytes() + if content == iceberg.ManifestContentDeletes { + manifestBytes, err = patchManifestContentBytesToDeletes(manifestBytes) + if err != nil { + return nil, nil, err + } + } + + rebuilt := iceberg.NewManifestFile(version, filename, int64(len(manifestBytes)), int32(spec.ID()), snapshotID). + Content(content). + AddedFiles(mf.AddedDataFiles()). + ExistingFiles(mf.ExistingDataFiles()). + DeletedFiles(mf.DeletedDataFiles()). + AddedRows(mf.AddedRows()). + ExistingRows(mf.ExistingRows()). + DeletedRows(mf.DeletedRows()). + Partitions(mf.Partitions()). + Build() + return rebuilt, manifestBytes, nil +} + +func patchManifestContentBytesToDeletes(manifestBytes []byte) ([]byte, error) { + old := append([]byte{0x0e}, []byte("content")...) + old = append(old, 0x08) + old = append(old, []byte("data")...) + + new := append([]byte{0x0e}, []byte("content")...) + new = append(new, 0x0e) + new = append(new, []byte("deletes")...) + + result := bytes.Replace(manifestBytes, old, new, 1) + if bytes.Equal(result, manifestBytes) { + return nil, fmt.Errorf("delete manifest content patch failed") + } + return result, nil +} + +func writePositionDeleteFile(rows []positionDeleteRow) ([]byte, error) { + var buf bytes.Buffer + writer := parquet.NewWriter(&buf, parquet.SchemaOf(new(positionDeleteRow))) + for _, row := range rows { + if err := writer.Write(&row); err != nil { + return nil, fmt.Errorf("write position delete row: %w", err) + } + } + if err := writer.Close(); err != nil { + return nil, fmt.Errorf("close position delete file: %w", err) + } + return buf.Bytes(), nil +} + +func (h *Handler) rewritePositionDeleteFiles( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath string, + config Config, +) (string, map[string]int64, error) { + start := time.Now() + meta, metadataFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath) + if err != nil { + return "", nil, fmt.Errorf("load metadata: %w", err) + } + + currentSnap := meta.CurrentSnapshot() + if currentSnap == nil || currentSnap.ManifestList == "" { + return "no current snapshot", nil, nil + } + + manifestListData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, currentSnap.ManifestList) + if err != nil { + return "", nil, fmt.Errorf("read manifest list: %w", err) + } + manifests, err := iceberg.ReadManifestList(bytes.NewReader(manifestListData)) + if err != nil { + return "", nil, fmt.Errorf("parse manifest list: %w", err) + } + + var dataManifests []iceberg.ManifestFile + var allEqualityEntries []iceberg.ManifestEntry + for _, mf := range manifests { + switch mf.ManifestContent() { + case iceberg.ManifestContentData: + dataManifests = append(dataManifests, mf) + case iceberg.ManifestContentDeletes: + manifestData, readErr := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath()) + if readErr != nil { + return "", nil, fmt.Errorf("read delete manifest %s: %w", mf.FilePath(), readErr) + } + entries, parseErr := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true) + if parseErr != nil { + return "", nil, fmt.Errorf("parse delete manifest %s: %w", mf.FilePath(), parseErr) + } + for _, entry := range entries { + if entry.DataFile().ContentType() == iceberg.EntryContentEqDeletes { + allEqualityEntries = append(allEqualityEntries, entry) + } + } + } + } + + groupMap, allPositionEntries, err := collectDeleteRewriteGroups(ctx, filerClient, bucketName, tablePath, manifests) + if err != nil { + return "", nil, err + } + if len(groupMap) == 0 { + return "no position delete files eligible for rewrite", nil, nil + } + + type artifact struct { + dir, fileName string + } + var writtenArtifacts []artifact + committed := false + defer func() { + if committed || len(writtenArtifacts) == 0 { + return + } + cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + for _, a := range writtenArtifacts { + if err := deleteFilerFile(cleanupCtx, filerClient, a.dir, a.fileName); err != nil { + glog.Warningf("iceberg delete rewrite: failed to clean up artifact %s/%s: %v", a.dir, a.fileName, err) + } + } + }() + + specByID := specByID(meta) + predicate, err := parsePartitionPredicate(config.Where, meta) + if err != nil { + return "", nil, err + } + + type specEntries struct { + specID int32 + entries []iceberg.ManifestEntry + } + specEntriesMap := make(map[int32]*specEntries) + addToSpec := func(specID int32, entry iceberg.ManifestEntry) { + se, ok := specEntriesMap[specID] + if !ok { + se = &specEntries{specID: specID} + specEntriesMap[specID] = se + } + se.entries = append(se.entries, entry) + } + + newSnapID := time.Now().UnixMilli() + version := meta.Version() + snapshotID := currentSnap.SnapshotID + seqNum := currentSnap.SequenceNumber + 1 + metaDir := path.Join(s3tables.TablesPath, bucketName, tablePath, "metadata") + dataDir := path.Join(s3tables.TablesPath, bucketName, tablePath, "data") + artifactSuffix := compactRandomSuffix() + + replacedPaths := make(map[string]struct{}) + var rewrittenGroups int64 + var skippedGroups int64 + var deleteFilesRewritten int64 + var deleteFilesWritten int64 + var deleteBytesRewritten int64 + + sortedKeys := make([]string, 0, len(groupMap)) + for key := range groupMap { + sortedKeys = append(sortedKeys, key) + } + sort.Strings(sortedKeys) + + for _, key := range sortedKeys { + group := groupMap[key] + if predicate != nil { + spec, ok := specByID[int(group.SpecID)] + if !ok { + continue + } + match, err := predicate.Matches(spec, group.Partition) + if err != nil { + return "", nil, err + } + if !match { + skippedGroups++ + continue + } + } + if !groupEligibleForRewrite(group, config) { + skippedGroups++ + continue + } + rows := make([]positionDeleteRow, 0) + for _, input := range group.Inputs { + for _, pos := range input.Positions { + rows = append(rows, positionDeleteRow{FilePath: input.ReferencedPath, Pos: pos}) + } + replacedPaths[input.Entry.DataFile().FilePath()] = struct{}{} + deleteFilesRewritten++ + deleteBytesRewritten += input.Entry.DataFile().FileSizeBytes() + } + sort.Slice(rows, func(i, j int) bool { + if rows[i].FilePath != rows[j].FilePath { + return rows[i].FilePath < rows[j].FilePath + } + return rows[i].Pos < rows[j].Pos + }) + + outputFiles := estimatedDeleteOutputFiles(group.TotalSize, config.DeleteTargetFileSizeBytes) + rowsPerFile := (len(rows) + outputFiles - 1) / outputFiles + if rowsPerFile < 1 { + rowsPerFile = len(rows) + } + + for startIdx, fileIdx := 0, 0; startIdx < len(rows); startIdx, fileIdx = startIdx+rowsPerFile, fileIdx+1 { + endIdx := startIdx + rowsPerFile + if endIdx > len(rows) { + endIdx = len(rows) + } + outputRows := rows[startIdx:endIdx] + deleteBytes, err := writePositionDeleteFile(outputRows) + if err != nil { + return "", nil, err + } + fileName := fmt.Sprintf("rewrite-delete-%d-%s-%d.parquet", newSnapID, artifactSuffix, deleteFilesWritten) + if err := ensureFilerDir(ctx, filerClient, dataDir); err != nil { + return "", nil, fmt.Errorf("ensure data dir: %w", err) + } + if err := saveFilerFile(ctx, filerClient, dataDir, fileName, deleteBytes); err != nil { + return "", nil, fmt.Errorf("save rewritten delete file: %w", err) + } + writtenArtifacts = append(writtenArtifacts, artifact{dir: dataDir, fileName: fileName}) + + spec, ok := specByID[int(group.SpecID)] + if !ok { + return "", nil, fmt.Errorf("partition spec %d not found", group.SpecID) + } + dfBuilder, err := iceberg.NewDataFileBuilder( + spec, + iceberg.EntryContentPosDeletes, + path.Join("data", fileName), + iceberg.ParquetFile, + group.Partition, + nil, nil, + int64(len(outputRows)), + int64(len(deleteBytes)), + ) + if err != nil { + return "", nil, fmt.Errorf("build rewritten delete file: %w", err) + } + entry := iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &newSnapID, nil, nil, dfBuilder.Build()) + addToSpec(group.SpecID, entry) + deleteFilesWritten++ + } + + for _, input := range group.Inputs { + delEntry := iceberg.NewManifestEntry( + iceberg.EntryStatusDELETED, + &newSnapID, + manifestEntrySeqNum(input.Entry), + manifestEntryFileSeqNum(input.Entry), + input.Entry.DataFile(), + ) + addToSpec(group.SpecID, delEntry) + } + rewrittenGroups++ + } + + if rewrittenGroups == 0 { + return "no position delete files eligible for rewrite", nil, nil + } + + for _, entry := range allEqualityEntries { + existingEntry := iceberg.NewManifestEntry( + iceberg.EntryStatusEXISTING, + func() *int64 { id := entry.SnapshotID(); return &id }(), + manifestEntrySeqNum(entry), + manifestEntryFileSeqNum(entry), + entry.DataFile(), + ) + addToSpec(entry.DataFile().SpecID(), existingEntry) + } + + for _, entry := range allPositionEntries { + if _, replaced := replacedPaths[entry.DataFile().FilePath()]; replaced { + continue + } + existingEntry := iceberg.NewManifestEntry( + iceberg.EntryStatusEXISTING, + func() *int64 { id := entry.SnapshotID(); return &id }(), + manifestEntrySeqNum(entry), + manifestEntryFileSeqNum(entry), + entry.DataFile(), + ) + addToSpec(entry.DataFile().SpecID(), existingEntry) + } + + sortedSpecIDs := make([]int32, 0, len(specEntriesMap)) + for specID := range specEntriesMap { + sortedSpecIDs = append(sortedSpecIDs, specID) + } + sort.Slice(sortedSpecIDs, func(i, j int) bool { return sortedSpecIDs[i] < sortedSpecIDs[j] }) + + allManifests := make([]iceberg.ManifestFile, 0, len(dataManifests)+len(sortedSpecIDs)) + allManifests = append(allManifests, dataManifests...) + + for _, specID := range sortedSpecIDs { + spec, ok := specByID[int(specID)] + if !ok { + return "", nil, fmt.Errorf("partition spec %d not found", specID) + } + manifestName := fmt.Sprintf("rewrite-delete-%d-%s-spec%d.avro", newSnapID, artifactSuffix, specID) + manifestPath := path.Join("metadata", manifestName) + mf, manifestBytes, err := writeManifestWithContent( + manifestPath, + version, + spec, + meta.CurrentSchema(), + newSnapID, + specEntriesMap[specID].entries, + iceberg.ManifestContentDeletes, + ) + if err != nil { + return "", nil, fmt.Errorf("write delete manifest for spec %d: %w", specID, err) + } + if err := saveFilerFile(ctx, filerClient, metaDir, manifestName, manifestBytes); err != nil { + return "", nil, fmt.Errorf("save delete manifest for spec %d: %w", specID, err) + } + writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestName}) + allManifests = append(allManifests, mf) + } + + var manifestListBuf bytes.Buffer + if err := iceberg.WriteManifestList(version, &manifestListBuf, newSnapID, &snapshotID, &seqNum, 0, allManifests); err != nil { + return "", nil, fmt.Errorf("write delete manifest list: %w", err) + } + manifestListName := fmt.Sprintf("snap-%d-%s.avro", newSnapID, artifactSuffix) + if err := saveFilerFile(ctx, filerClient, metaDir, manifestListName, manifestListBuf.Bytes()); err != nil { + return "", nil, fmt.Errorf("save delete manifest list: %w", err) + } + writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestListName}) + + manifestListLocation := path.Join("metadata", manifestListName) + err = h.commitWithRetry(ctx, filerClient, bucketName, tablePath, metadataFileName, config, func(currentMeta table.Metadata, builder *table.MetadataBuilder) error { + cs := currentMeta.CurrentSnapshot() + if cs == nil || cs.SnapshotID != snapshotID { + return errStalePlan + } + newSnapshot := &table.Snapshot{ + SnapshotID: newSnapID, + ParentSnapshotID: &snapshotID, + SequenceNumber: seqNum, + TimestampMs: newSnapID, + ManifestList: manifestListLocation, + Summary: &table.Summary{ + Operation: table.OpReplace, + Properties: map[string]string{ + "maintenance": "rewrite_position_delete_files", + "delete-files-rewritten": fmt.Sprintf("%d", deleteFilesRewritten), + "delete-files-written": fmt.Sprintf("%d", deleteFilesWritten), + "delete-groups": fmt.Sprintf("%d", rewrittenGroups), + }, + }, + SchemaID: func() *int { + id := meta.CurrentSchema().ID + return &id + }(), + } + if err := builder.AddSnapshot(newSnapshot); err != nil { + return err + } + return builder.SetSnapshotRef(table.MainBranch, newSnapID, table.BranchRef) + }) + if err != nil { + return "", nil, fmt.Errorf("commit delete rewrite: %w", err) + } + + committed = true + metrics := map[string]int64{ + MetricDeleteFilesRewritten: deleteFilesRewritten, + MetricDeleteFilesWritten: deleteFilesWritten, + MetricDeleteBytesRewritten: deleteBytesRewritten, + MetricDeleteGroupsPlanned: rewrittenGroups, + MetricDeleteGroupsSkipped: skippedGroups, + MetricDurationMs: time.Since(start).Milliseconds(), + } + return fmt.Sprintf( + "rewrote %d position delete files into %d across %d group(s)", + deleteFilesRewritten, + deleteFilesWritten, + rewrittenGroups, + ), metrics, nil +} diff --git a/weed/plugin/worker/iceberg/detection.go b/weed/plugin/worker/iceberg/detection.go index 0ffff0c61..5892cba3c 100644 --- a/weed/plugin/worker/iceberg/detection.go +++ b/weed/plugin/worker/iceberg/detection.go @@ -153,20 +153,14 @@ func (h *Handler) scanTablesForMaintenance( } func normalizeDetectionConfig(config Config) Config { - normalized := config - if normalized.TargetFileSizeBytes <= 0 { - normalized.TargetFileSizeBytes = defaultTargetFileSizeMB * 1024 * 1024 + config = applyThresholdDefaults(config) + if config.SnapshotRetentionHours <= 0 { + config.SnapshotRetentionHours = defaultSnapshotRetentionHours } - if normalized.MinInputFiles < 2 { - normalized.MinInputFiles = defaultMinInputFiles + if config.MaxSnapshotsToKeep <= 0 { + config.MaxSnapshotsToKeep = defaultMaxSnapshotsToKeep } - if normalized.MinManifestsToRewrite < minManifestsToRewrite { - normalized.MinManifestsToRewrite = minManifestsToRewrite - } - if normalized.OrphanOlderThanHours <= 0 { - normalized.OrphanOlderThanHours = defaultOrphanOlderThanHours - } - return normalized + return config } func (h *Handler) tableNeedsMaintenance( @@ -181,6 +175,25 @@ func (h *Handler) tableNeedsMaintenance( ) (bool, error) { config = normalizeDetectionConfig(config) + var predicate *partitionPredicate + if strings.TrimSpace(config.Where) != "" { + needsPredicate := false + for _, op := range ops { + if op == "compact" || op == "rewrite_position_delete_files" || op == "rewrite_manifests" { + needsPredicate = true + break + } + } + if needsPredicate { + var err error + predicate, err = parsePartitionPredicate(config.Where, meta) + if err != nil { + return false, err + } + } + } + _ = predicate // used by rewrite_position_delete_files; planning index handles compact/rewrite_manifests + // Evaluate the metadata-only expiration check first so large tables do not // pay for manifest reads when snapshot expiry already makes them eligible. for _, op := range ops { @@ -267,6 +280,20 @@ func (h *Handler) tableNeedsMaintenance( if eligible { return true, nil } + case "rewrite_position_delete_files": + manifests, err := getCurrentManifests() + if err != nil { + opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err)) + continue + } + eligible, err := hasEligibleDeleteRewrite(ctx, filerClient, bucketName, tablePath, manifests, config, meta, predicate) + if err != nil { + opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err)) + continue + } + if eligible { + return true, nil + } case "rewrite_manifests": eligible, err := checkPlanningIndex(op, (*planningIndex).rewriteManifestsEligible) if err != nil { @@ -351,6 +378,8 @@ func hasEligibleCompaction( bucketName, tablePath string, manifests []iceberg.ManifestFile, config Config, + meta table.Metadata, + predicate *partitionPredicate, ) (bool, error) { if len(manifests) == 0 { return false, nil @@ -390,10 +419,81 @@ func hasEligibleCompaction( allEntries = append(allEntries, entries...) } - bins := buildCompactionBins(allEntries, config.TargetFileSizeBytes, minInputFiles) + candidateEntries := allEntries + if predicate != nil { + specsByID := specByID(meta) + candidateEntries = make([]iceberg.ManifestEntry, 0, len(allEntries)) + for _, entry := range allEntries { + spec, ok := specsByID[int(entry.DataFile().SpecID())] + if !ok { + continue + } + match, err := predicate.Matches(spec, entry.DataFile().Partition()) + if err != nil { + return false, err + } + if match { + candidateEntries = append(candidateEntries, entry) + } + } + } + + bins := buildCompactionBins(candidateEntries, config.TargetFileSizeBytes, minInputFiles) return len(bins) > 0, nil } +func countDataManifestsForRewrite( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath string, + manifests []iceberg.ManifestFile, + meta table.Metadata, + predicate *partitionPredicate, +) (int64, error) { + if predicate == nil { + return countDataManifests(manifests), nil + } + + specsByID := specByID(meta) + + var count int64 + for _, mf := range manifests { + if mf.ManifestContent() != iceberg.ManifestContentData { + continue + } + manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath()) + if err != nil { + return 0, fmt.Errorf("read manifest %s: %w", mf.FilePath(), err) + } + entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true) + if err != nil { + return 0, fmt.Errorf("parse manifest %s: %w", mf.FilePath(), err) + } + if len(entries) == 0 { + continue + } + spec, ok := specsByID[int(mf.PartitionSpecID())] + if !ok { + continue + } + allMatch := len(entries) > 0 + for _, entry := range entries { + match, err := predicate.Matches(spec, entry.DataFile().Partition()) + if err != nil { + return 0, err + } + if !match { + allMatch = false + break + } + } + if allMatch { + count++ + } + } + return count, nil +} + func compactionMinInputFiles(minInputFiles int64) (int, error) { // Ensure the configured value is positive and fits into the platform's int type if minInputFiles <= 0 { diff --git a/weed/plugin/worker/iceberg/exec_test.go b/weed/plugin/worker/iceberg/exec_test.go index 376e9e195..946bc51df 100644 --- a/weed/plugin/worker/iceberg/exec_test.go +++ b/weed/plugin/worker/iceberg/exec_test.go @@ -498,7 +498,7 @@ func (r *recordingExecutionSender) SendCompleted(c *plugin_pb.JobCompleted) erro func TestExpireSnapshotsExecution(t *testing.T) { fs, client := startFakeFiler(t) - now := time.Now().UnixMilli() + now := time.Now().Add(-10 * time.Second).UnixMilli() setup := tableSetup{ BucketName: "test-bucket", Namespace: "analytics", @@ -541,7 +541,7 @@ func TestExpireSnapshotsExecution(t *testing.T) { func TestExpireSnapshotsNothingToExpire(t *testing.T) { fs, client := startFakeFiler(t) - now := time.Now().UnixMilli() + now := time.Now().Add(-10 * time.Second).UnixMilli() setup := tableSetup{ BucketName: "test-bucket", Namespace: "ns", @@ -1184,26 +1184,30 @@ func TestDetectSchedulesSnapshotExpiryDespiteCompactionEvaluationError(t *testin Namespace: "analytics", TableName: "events", Snapshots: []table.Snapshot{ - {SnapshotID: 1, TimestampMs: now - 1, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1}, + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1}, + {SnapshotID: 2, TimestampMs: now + 1, ManifestList: "metadata/snap-2.avro", SequenceNumber: 2}, }, } populateTable(t, fs, setup) + // Corrupt manifest lists so compaction evaluation fails. metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata") - manifestListName := path.Base(setup.Snapshots[0].ManifestList) - fs.putEntry(metaDir, manifestListName, &filer_pb.Entry{ - Name: manifestListName, - Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Unix(), - FileSize: uint64(len("not-a-manifest-list")), - }, - Content: []byte("not-a-manifest-list"), - }) + for _, snap := range setup.Snapshots { + manifestListName := path.Base(snap.ManifestList) + fs.putEntry(metaDir, manifestListName, &filer_pb.Entry{ + Name: manifestListName, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: uint64(len("not-a-manifest-list")), + }, + Content: []byte("not-a-manifest-list"), + }) + } handler := NewHandler(nil) config := Config{ - SnapshotRetentionHours: 0, - MaxSnapshotsToKeep: 10, + SnapshotRetentionHours: 24 * 365, // very long retention so age doesn't trigger + MaxSnapshotsToKeep: 1, // 2 snapshots > 1 triggers expiry Operations: "compact,expire_snapshots", } @@ -2207,6 +2211,120 @@ func populateTableWithDeleteFiles( return meta } +func loadLiveDeleteFilePaths( + t *testing.T, + client filer_pb.SeaweedFilerClient, + bucketName, tablePath string, +) (posPaths, eqPaths []string) { + t.Helper() + + meta, _, err := loadCurrentMetadata(context.Background(), client, bucketName, tablePath) + if err != nil { + t.Fatalf("loadCurrentMetadata: %v", err) + } + manifests, err := loadCurrentManifests(context.Background(), client, bucketName, tablePath, meta) + if err != nil { + t.Fatalf("loadCurrentManifests: %v", err) + } + + for _, mf := range manifests { + if mf.ManifestContent() != iceberg.ManifestContentDeletes { + continue + } + manifestData, err := loadFileByIcebergPath(context.Background(), client, bucketName, tablePath, mf.FilePath()) + if err != nil { + t.Fatalf("load delete manifest: %v", err) + } + entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true) + if err != nil { + t.Fatalf("read delete manifest: %v", err) + } + for _, entry := range entries { + switch entry.DataFile().ContentType() { + case iceberg.EntryContentPosDeletes: + posPaths = append(posPaths, entry.DataFile().FilePath()) + case iceberg.EntryContentEqDeletes: + eqPaths = append(eqPaths, entry.DataFile().FilePath()) + } + } + } + + sort.Strings(posPaths) + sort.Strings(eqPaths) + return posPaths, eqPaths +} + +func rewriteDeleteManifestsAsMixed( + t *testing.T, + fs *fakeFilerServer, + client filer_pb.SeaweedFilerClient, + setup tableSetup, +) { + t.Helper() + + meta, _, err := loadCurrentMetadata(context.Background(), client, setup.BucketName, setup.tablePath()) + if err != nil { + t.Fatalf("loadCurrentMetadata: %v", err) + } + manifests, err := loadCurrentManifests(context.Background(), client, setup.BucketName, setup.tablePath(), meta) + if err != nil { + t.Fatalf("loadCurrentManifests: %v", err) + } + + var dataManifests []iceberg.ManifestFile + var deleteEntries []iceberg.ManifestEntry + for _, mf := range manifests { + if mf.ManifestContent() == iceberg.ManifestContentData { + dataManifests = append(dataManifests, mf) + continue + } + manifestData, err := loadFileByIcebergPath(context.Background(), client, setup.BucketName, setup.tablePath(), mf.FilePath()) + if err != nil { + t.Fatalf("load delete manifest: %v", err) + } + entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true) + if err != nil { + t.Fatalf("read delete manifest: %v", err) + } + for _, entry := range entries { + deleteEntries = append(deleteEntries, entry) + } + } + + spec := *iceberg.UnpartitionedSpec + version := meta.Version() + metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata") + manifestName := "mixed-delete-manifest-1.avro" + manifestPath := path.Join("metadata", manifestName) + + var manifestBuf bytes.Buffer + _, err = iceberg.WriteManifest(manifestPath, &manifestBuf, version, spec, meta.CurrentSchema(), 1, deleteEntries) + if err != nil { + t.Fatalf("write mixed delete manifest: %v", err) + } + mixedBytes := patchManifestContentToDeletes(t, manifestBuf.Bytes()) + fs.putEntry(metaDir, manifestName, &filer_pb.Entry{ + Name: manifestName, Content: mixedBytes, + Attributes: &filer_pb.FuseAttributes{Mtime: time.Now().Unix(), FileSize: uint64(len(mixedBytes))}, + }) + + mixedManifest := iceberg.NewManifestFile(version, manifestPath, int64(len(mixedBytes)), int32(spec.ID()), 1). + Content(iceberg.ManifestContentDeletes). + AddedFiles(int32(len(deleteEntries))). + Build() + + var manifestListBuf bytes.Buffer + seqNum := int64(1) + allManifests := append(dataManifests, mixedManifest) + if err := iceberg.WriteManifestList(version, &manifestListBuf, 1, nil, &seqNum, 0, allManifests); err != nil { + t.Fatalf("write mixed manifest list: %v", err) + } + fs.putEntry(metaDir, "snap-1.avro", &filer_pb.Entry{ + Name: "snap-1.avro", Content: manifestListBuf.Bytes(), + Attributes: &filer_pb.FuseAttributes{Mtime: time.Now().Unix(), FileSize: uint64(manifestListBuf.Len())}, + }) +} + func TestCompactDataFilesMetrics(t *testing.T) { fs, client := startFakeFiler(t) @@ -2713,3 +2831,369 @@ func TestCompactDataFilesWithMixedDeletes(t *testing.T) { } } } + +func TestRewritePositionDeleteFilesExecution(t *testing.T) { + fs, client := startFakeFiler(t) + + setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"} + populateTableWithDeleteFiles(t, fs, setup, + []struct { + Name string + Rows []struct { + ID int64 + Name string + } + }{ + {"d1.parquet", []struct { + ID int64 + Name string + }{{1, "alice"}, {2, "bob"}, {3, "charlie"}}}, + }, + []struct { + Name string + Rows []struct { + FilePath string + Pos int64 + } + }{ + {"pd1.parquet", []struct { + FilePath string + Pos int64 + }{{"data/d1.parquet", 0}, {"data/d1.parquet", 2}}}, + {"pd2.parquet", []struct { + FilePath string + Pos int64 + }{{"data/d1.parquet", 1}}}, + }, + nil, + ) + + handler := NewHandler(nil) + config := Config{ + DeleteTargetFileSizeBytes: 64 * 1024 * 1024, + DeleteMinInputFiles: 2, + DeleteMaxFileGroupSizeBytes: 128 * 1024 * 1024, + DeleteMaxOutputFiles: 4, + MaxCommitRetries: 3, + } + + result, metrics, err := handler.rewritePositionDeleteFiles(context.Background(), client, setup.BucketName, setup.tablePath(), config) + if err != nil { + t.Fatalf("rewritePositionDeleteFiles: %v", err) + } + if !strings.Contains(result, "rewrote 2 position delete files into 1") { + t.Fatalf("unexpected result: %q", result) + } + if metrics[MetricDeleteFilesRewritten] != 2 { + t.Fatalf("expected 2 rewritten files, got %d", metrics[MetricDeleteFilesRewritten]) + } + if metrics[MetricDeleteFilesWritten] != 1 { + t.Fatalf("expected 1 written file, got %d", metrics[MetricDeleteFilesWritten]) + } + + liveDeletePaths, _ := loadLiveDeleteFilePaths(t, client, setup.BucketName, setup.tablePath()) + if len(liveDeletePaths) != 1 { + t.Fatalf("expected 1 live rewritten delete file, got %v", liveDeletePaths) + } + if !strings.HasPrefix(liveDeletePaths[0], "data/rewrite-delete-") { + t.Fatalf("expected rewritten delete file path, got %q", liveDeletePaths[0]) + } +} + +func TestRewritePositionDeleteFilesDetection(t *testing.T) { + fs, client := startFakeFiler(t) + + setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"} + populateTableWithDeleteFiles(t, fs, setup, + []struct { + Name string + Rows []struct { + ID int64 + Name string + } + }{ + {"d1.parquet", []struct { + ID int64 + Name string + }{{1, "alice"}, {2, "bob"}}}, + }, + []struct { + Name string + Rows []struct { + FilePath string + Pos int64 + } + }{ + {"pd1.parquet", []struct { + FilePath string + Pos int64 + }{{"data/d1.parquet", 0}}}, + {"pd2.parquet", []struct { + FilePath string + Pos int64 + }{{"data/d1.parquet", 1}}}, + }, + nil, + ) + + handler := NewHandler(nil) + config := Config{ + Operations: "rewrite_position_delete_files", + DeleteTargetFileSizeBytes: 64 * 1024 * 1024, + DeleteMinInputFiles: 2, + DeleteMaxFileGroupSizeBytes: 128 * 1024 * 1024, + DeleteMaxOutputFiles: 4, + } + + tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0) + if err != nil { + t.Fatalf("scanTablesForMaintenance: %v", err) + } + if len(tables) != 1 { + t.Fatalf("expected 1 table needing delete rewrite, got %d", len(tables)) + } +} + +func TestRewritePositionDeleteFilesSkipsSingleFile(t *testing.T) { + fs, client := startFakeFiler(t) + + setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"} + populateTableWithDeleteFiles(t, fs, setup, + []struct { + Name string + Rows []struct { + ID int64 + Name string + } + }{ + {"d1.parquet", []struct { + ID int64 + Name string + }{{1, "alice"}, {2, "bob"}}}, + }, + []struct { + Name string + Rows []struct { + FilePath string + Pos int64 + } + }{ + {"pd1.parquet", []struct { + FilePath string + Pos int64 + }{{"data/d1.parquet", 0}}}, + }, + nil, + ) + + handler := NewHandler(nil) + config := Config{ + DeleteTargetFileSizeBytes: 64 * 1024 * 1024, + DeleteMinInputFiles: 2, + DeleteMaxFileGroupSizeBytes: 128 * 1024 * 1024, + DeleteMaxOutputFiles: 4, + MaxCommitRetries: 3, + } + + result, _, err := handler.rewritePositionDeleteFiles(context.Background(), client, setup.BucketName, setup.tablePath(), config) + if err != nil { + t.Fatalf("rewritePositionDeleteFiles: %v", err) + } + if !strings.Contains(result, "no position delete files eligible") { + t.Fatalf("unexpected result: %q", result) + } +} + +func TestRewritePositionDeleteFilesRespectsMinInputFiles(t *testing.T) { + fs, client := startFakeFiler(t) + + setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"} + populateTableWithDeleteFiles(t, fs, setup, + []struct { + Name string + Rows []struct { + ID int64 + Name string + } + }{ + {"d1.parquet", []struct { + ID int64 + Name string + }{{1, "alice"}, {2, "bob"}}}, + }, + []struct { + Name string + Rows []struct { + FilePath string + Pos int64 + } + }{ + {"pd1.parquet", []struct { + FilePath string + Pos int64 + }{{"data/d1.parquet", 0}}}, + {"pd2.parquet", []struct { + FilePath string + Pos int64 + }{{"data/d1.parquet", 1}}}, + }, + nil, + ) + + handler := NewHandler(nil) + config := Config{ + DeleteTargetFileSizeBytes: 64 * 1024 * 1024, + DeleteMinInputFiles: 3, + DeleteMaxFileGroupSizeBytes: 128 * 1024 * 1024, + DeleteMaxOutputFiles: 4, + MaxCommitRetries: 3, + } + + result, _, err := handler.rewritePositionDeleteFiles(context.Background(), client, setup.BucketName, setup.tablePath(), config) + if err != nil { + t.Fatalf("rewritePositionDeleteFiles: %v", err) + } + if !strings.Contains(result, "no position delete files eligible") { + t.Fatalf("unexpected result: %q", result) + } +} + +func TestRewritePositionDeleteFilesPreservesUnsupportedMultiTargetDeletes(t *testing.T) { + fs, client := startFakeFiler(t) + + setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"} + populateTableWithDeleteFiles(t, fs, setup, + []struct { + Name string + Rows []struct { + ID int64 + Name string + } + }{ + {"d1.parquet", []struct { + ID int64 + Name string + }{{1, "alice"}, {2, "bob"}, {3, "charlie"}}}, + {"d2.parquet", []struct { + ID int64 + Name string + }{{4, "diana"}, {5, "eve"}}}, + }, + []struct { + Name string + Rows []struct { + FilePath string + Pos int64 + } + }{ + {"pd1.parquet", []struct { + FilePath string + Pos int64 + }{{"data/d1.parquet", 0}}}, + {"pd2.parquet", []struct { + FilePath string + Pos int64 + }{{"data/d1.parquet", 1}}}, + {"pd3.parquet", []struct { + FilePath string + Pos int64 + }{{"data/d1.parquet", 2}, {"data/d2.parquet", 0}}}, + }, + nil, + ) + + handler := NewHandler(nil) + config := Config{ + DeleteTargetFileSizeBytes: 64 * 1024 * 1024, + DeleteMinInputFiles: 2, + DeleteMaxFileGroupSizeBytes: 128 * 1024 * 1024, + DeleteMaxOutputFiles: 4, + MaxCommitRetries: 3, + } + + if _, _, err := handler.rewritePositionDeleteFiles(context.Background(), client, setup.BucketName, setup.tablePath(), config); err != nil { + t.Fatalf("rewritePositionDeleteFiles: %v", err) + } + + posPaths, _ := loadLiveDeleteFilePaths(t, client, setup.BucketName, setup.tablePath()) + if len(posPaths) != 2 { + t.Fatalf("expected rewritten file plus untouched multi-target file, got %v", posPaths) + } + if posPaths[0] != "data/pd3.parquet" && posPaths[1] != "data/pd3.parquet" { + t.Fatalf("expected multi-target delete file to be preserved, got %v", posPaths) + } + if !strings.HasPrefix(posPaths[0], "data/rewrite-delete-") && !strings.HasPrefix(posPaths[1], "data/rewrite-delete-") { + t.Fatalf("expected rewritten delete file to remain live, got %v", posPaths) + } +} + +func TestRewritePositionDeleteFilesRebuildsMixedDeleteManifests(t *testing.T) { + fs, client := startFakeFiler(t) + + setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"} + populateTableWithDeleteFiles(t, fs, setup, + []struct { + Name string + Rows []struct { + ID int64 + Name string + } + }{ + {"d1.parquet", []struct { + ID int64 + Name string + }{{1, "alice"}, {2, "bob"}, {3, "charlie"}}}, + }, + []struct { + Name string + Rows []struct { + FilePath string + Pos int64 + } + }{ + {"pd1.parquet", []struct { + FilePath string + Pos int64 + }{{"data/d1.parquet", 0}}}, + {"pd2.parquet", []struct { + FilePath string + Pos int64 + }{{"data/d1.parquet", 1}}}, + }, + []struct { + Name string + FieldIDs []int + Rows []struct { + ID int64 + Name string + } + }{ + {"eq1.parquet", []int{1}, []struct { + ID int64 + Name string + }{{3, "charlie"}}}, + }, + ) + rewriteDeleteManifestsAsMixed(t, fs, client, setup) + + handler := NewHandler(nil) + config := Config{ + DeleteTargetFileSizeBytes: 64 * 1024 * 1024, + DeleteMinInputFiles: 2, + DeleteMaxFileGroupSizeBytes: 128 * 1024 * 1024, + DeleteMaxOutputFiles: 4, + MaxCommitRetries: 3, + } + + if _, _, err := handler.rewritePositionDeleteFiles(context.Background(), client, setup.BucketName, setup.tablePath(), config); err != nil { + t.Fatalf("rewritePositionDeleteFiles: %v", err) + } + + posPaths, eqPaths := loadLiveDeleteFilePaths(t, client, setup.BucketName, setup.tablePath()) + if len(posPaths) != 1 || !strings.HasPrefix(posPaths[0], "data/rewrite-delete-") { + t.Fatalf("expected only the rewritten position delete file to remain live, got %v", posPaths) + } + if len(eqPaths) != 1 || eqPaths[0] != "data/eq1.parquet" { + t.Fatalf("expected equality delete file to be preserved, got %v", eqPaths) + } +} diff --git a/weed/plugin/worker/iceberg/handler.go b/weed/plugin/worker/iceberg/handler.go index bf03ab311..09d6158b6 100644 --- a/weed/plugin/worker/iceberg/handler.go +++ b/weed/plugin/worker/iceberg/handler.go @@ -48,7 +48,7 @@ func (h *Handler) Capability() *plugin_pb.JobTypeCapability { MaxDetectionConcurrency: 1, MaxExecutionConcurrency: 4, DisplayName: "Iceberg Maintenance", - Description: "Compacts, expires snapshots, removes orphans, and rewrites manifests for Iceberg tables in S3 table buckets", + Description: "Compacts data, rewrites delete files, expires snapshots, removes orphans, and rewrites manifests for Iceberg tables in S3 table buckets", Weight: 50, } } @@ -57,7 +57,7 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor { return &plugin_pb.JobTypeDescriptor{ JobType: jobType, DisplayName: "Iceberg Maintenance", - Description: "Automated maintenance for Iceberg tables: snapshot expiration, orphan removal, manifest rewriting", + Description: "Automated maintenance for Iceberg tables: data compaction, delete-file rewrite, snapshot expiration, orphan removal, and manifest rewriting", Icon: "fas fa-snowflake", DescriptorVersion: 1, AdminConfigForm: &plugin_pb.ConfigForm{ @@ -159,7 +159,7 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor { { SectionId: "compaction", Title: "Data Compaction", - Description: "Controls for bin-packing small Parquet data files.", + Description: "Controls for bin-packing or sorting small Parquet data files.", Fields: []*plugin_pb.ConfigField{ { Name: "target_file_size_mb", @@ -184,6 +184,69 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor { FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_BOOL, Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TOGGLE, }, + { + Name: "rewrite_strategy", + Label: "Rewrite Strategy", + Description: "binpack keeps the current row order; sort rewrites each compaction bin using sort_fields or the table sort order.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, + Placeholder: "binpack or sort", + }, + { + Name: "sort_fields", + Label: "Sort Fields", + Description: "Comma-separated field names for rewrite_strategy=sort. Blank uses the table sort order when present.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, + Placeholder: "id, created_at", + }, + { + Name: "sort_max_input_mb", + Label: "Sort Max Input (MB)", + Description: "Optional hard cap for the total bytes in a sorted compaction bin. Zero = no extra cap beyond binning.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}}, + }, + }, + }, + { + SectionId: "delete_rewrite", + Title: "Delete Rewrite", + Description: "Controls for rewriting small position-delete files into fewer larger files.", + Fields: []*plugin_pb.ConfigField{ + { + Name: "delete_target_file_size_mb", + Label: "Delete Target File Size (MB)", + Description: "Target size for rewritten position-delete files.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}}, + }, + { + Name: "delete_min_input_files", + Label: "Delete Min Input Files", + Description: "Minimum number of position-delete files in a group before rewrite is triggered.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 2}}, + }, + { + Name: "delete_max_file_group_size_mb", + Label: "Delete Max Group Size (MB)", + Description: "Skip rewriting delete groups larger than this bound.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}}, + }, + { + Name: "delete_max_output_files", + Label: "Delete Max Output Files", + Description: "Maximum number of rewritten delete files a single group may produce.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}}, + }, }, }, { @@ -233,23 +296,39 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor { { Name: "operations", Label: "Operations", - Description: "Comma-separated list of operations to run: compact, expire_snapshots, remove_orphans, rewrite_manifests, or 'all'.", + Description: "Comma-separated list of operations to run: compact, rewrite_position_delete_files, expire_snapshots, remove_orphans, rewrite_manifests, or 'all'.", FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, }, + { + Name: "where", + Label: "Where Filter", + Description: "Optional partition filter for compact, rewrite_position_delete_files, and rewrite_manifests. Supports field = literal, field IN (...), and AND.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, + Placeholder: "region = 'us' AND dt IN ('2026-03-15')", + }, }, }, }, DefaultValues: map[string]*plugin_pb.ConfigValue{ - "target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeMB}}, - "min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinInputFiles}}, - "min_manifests_to_rewrite": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinManifestsToRewrite}}, - "snapshot_retention_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultSnapshotRetentionHours}}, - "max_snapshots_to_keep": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxSnapshotsToKeep}}, - "orphan_older_than_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultOrphanOlderThanHours}}, - "max_commit_retries": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxCommitRetries}}, - "operations": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: defaultOperations}}, - "apply_deletes": {Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: true}}, + "target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeMB}}, + "min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinInputFiles}}, + "delete_target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultDeleteTargetFileSizeMB}}, + "delete_min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultDeleteMinInputFiles}}, + "delete_max_file_group_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultDeleteMaxGroupSizeMB}}, + "delete_max_output_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultDeleteMaxOutputFiles}}, + "min_manifests_to_rewrite": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinManifestsToRewrite}}, + "snapshot_retention_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultSnapshotRetentionHours}}, + "max_snapshots_to_keep": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxSnapshotsToKeep}}, + "orphan_older_than_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultOrphanOlderThanHours}}, + "max_commit_retries": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxCommitRetries}}, + "operations": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: defaultOperations}}, + "apply_deletes": {Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: true}}, + "rewrite_strategy": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "binpack"}}, + "sort_fields": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, + "sort_max_input_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}}, + "where": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, }, }, AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{ @@ -264,14 +343,22 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor { JobTypeMaxRuntimeSeconds: 3600, // 1 hour max }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{ - "target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeMB}}, - "min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinInputFiles}}, - "snapshot_retention_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultSnapshotRetentionHours}}, - "max_snapshots_to_keep": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxSnapshotsToKeep}}, - "orphan_older_than_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultOrphanOlderThanHours}}, - "max_commit_retries": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxCommitRetries}}, - "operations": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: defaultOperations}}, - "apply_deletes": {Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: true}}, + "target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeMB}}, + "min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinInputFiles}}, + "delete_target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultDeleteTargetFileSizeMB}}, + "delete_min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultDeleteMinInputFiles}}, + "delete_max_file_group_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultDeleteMaxGroupSizeMB}}, + "delete_max_output_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultDeleteMaxOutputFiles}}, + "snapshot_retention_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultSnapshotRetentionHours}}, + "max_snapshots_to_keep": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxSnapshotsToKeep}}, + "orphan_older_than_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultOrphanOlderThanHours}}, + "max_commit_retries": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxCommitRetries}}, + "operations": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: defaultOperations}}, + "apply_deletes": {Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: true}}, + "rewrite_strategy": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "binpack"}}, + "sort_fields": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, + "sort_max_input_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}}, + "where": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, }, } } @@ -288,9 +375,13 @@ func (h *Handler) Detect(ctx context.Context, request *plugin_pb.RunDetectionReq } workerConfig := ParseConfig(request.GetWorkerConfigValues()) - if _, err := parseOperations(workerConfig.Operations); err != nil { + ops, err := parseOperations(workerConfig.Operations) + if err != nil { return fmt.Errorf("invalid operations config: %w", err) } + if err := validateWhereOperations(workerConfig.Where, ops); err != nil { + return fmt.Errorf("invalid where config: %w", err) + } // Detection interval is managed by the scheduler via AdminRuntimeDefaults.DetectionIntervalSeconds. @@ -407,6 +498,9 @@ func (h *Handler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequ if opsErr != nil { return fmt.Errorf("invalid operations config: %w", opsErr) } + if err := validateWhereOperations(workerConfig.Where, ops); err != nil { + return fmt.Errorf("invalid where config: %w", err) + } // Send initial progress if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{ @@ -437,8 +531,8 @@ func (h *Handler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequ completedOps := 0 allMetrics := make(map[string]int64) - // Execute operations in correct Iceberg maintenance order: - // expire_snapshots → remove_orphans → rewrite_manifests + // Execute operations in canonical maintenance order as defined by + // parseOperations. for _, op := range ops { select { case <-ctx.Done(): @@ -478,6 +572,8 @@ func (h *Handler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequ Message: fmt.Sprintf("compacting bin %d of %d", binIdx+1, totalBins), }) }) + case "rewrite_position_delete_files": + opResult, opMetrics, opErr = h.rewritePositionDeleteFiles(ctx, filerClient, bucketName, tablePath, workerConfig) case "expire_snapshots": opResult, opMetrics, opErr = h.expireSnapshots(ctx, filerClient, bucketName, tablePath, workerConfig) case "remove_orphans": diff --git a/weed/plugin/worker/iceberg/handler_test.go b/weed/plugin/worker/iceberg/handler_test.go index 1ee73efb3..3d989880a 100644 --- a/weed/plugin/worker/iceberg/handler_test.go +++ b/weed/plugin/worker/iceberg/handler_test.go @@ -42,15 +42,17 @@ func TestParseOperations(t *testing.T) { expected []string wantErr bool }{ - {"all", []string{"compact", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, false}, - {"", []string{"compact", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, false}, + {"all", []string{"compact", "rewrite_position_delete_files", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, false}, + {"", []string{"compact", "rewrite_position_delete_files", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, false}, {"expire_snapshots", []string{"expire_snapshots"}, false}, {"compact", []string{"compact"}, false}, + {"rewrite_position_delete_files", []string{"rewrite_position_delete_files"}, false}, {"rewrite_manifests,expire_snapshots", []string{"expire_snapshots", "rewrite_manifests"}, false}, {"compact,expire_snapshots", []string{"compact", "expire_snapshots"}, false}, {"remove_orphans, rewrite_manifests", []string{"remove_orphans", "rewrite_manifests"}, false}, {"expire_snapshots,remove_orphans,rewrite_manifests", []string{"expire_snapshots", "remove_orphans", "rewrite_manifests"}, false}, {"compact,expire_snapshots,remove_orphans,rewrite_manifests", []string{"compact", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, false}, + {"compact,rewrite_position_delete_files,rewrite_manifests", []string{"compact", "rewrite_position_delete_files", "rewrite_manifests"}, false}, {"unknown_op", nil, true}, {"expire_snapshots,bad_op", nil, true}, } @@ -848,6 +850,35 @@ func TestParseConfigApplyDeletes(t *testing.T) { } } +func TestNormalizeDetectionConfigUsesSharedDefaults(t *testing.T) { + config := normalizeDetectionConfig(Config{}) + + if config.TargetFileSizeBytes != defaultTargetFileSizeMB*1024*1024 { + t.Fatalf("expected TargetFileSizeBytes default, got %d", config.TargetFileSizeBytes) + } + if config.DeleteTargetFileSizeBytes != defaultDeleteTargetFileSizeMB*1024*1024 { + t.Fatalf("expected DeleteTargetFileSizeBytes default, got %d", config.DeleteTargetFileSizeBytes) + } + if config.DeleteMinInputFiles != defaultDeleteMinInputFiles { + t.Fatalf("expected DeleteMinInputFiles default, got %d", config.DeleteMinInputFiles) + } + if config.DeleteMaxFileGroupSizeBytes != defaultDeleteMaxGroupSizeMB*1024*1024 { + t.Fatalf("expected DeleteMaxFileGroupSizeBytes default, got %d", config.DeleteMaxFileGroupSizeBytes) + } + if config.DeleteMaxOutputFiles != defaultDeleteMaxOutputFiles { + t.Fatalf("expected DeleteMaxOutputFiles default, got %d", config.DeleteMaxOutputFiles) + } + if config.OrphanOlderThanHours != defaultOrphanOlderThanHours { + t.Fatalf("expected OrphanOlderThanHours default, got %d", config.OrphanOlderThanHours) + } + if config.SnapshotRetentionHours != defaultSnapshotRetentionHours { + t.Fatalf("expected SnapshotRetentionHours default, got %d", config.SnapshotRetentionHours) + } + if config.MaxSnapshotsToKeep != defaultMaxSnapshotsToKeep { + t.Fatalf("expected MaxSnapshotsToKeep default, got %d", config.MaxSnapshotsToKeep) + } +} + func TestCollectPositionDeletes(t *testing.T) { fs, client := startFakeFiler(t) diff --git a/weed/plugin/worker/iceberg/operations.go b/weed/plugin/worker/iceberg/operations.go index fa8b89af0..41fe8ae19 100644 --- a/weed/plugin/worker/iceberg/operations.go +++ b/weed/plugin/worker/iceberg/operations.go @@ -323,6 +323,10 @@ func (h *Handler) rewriteManifests( if err != nil { return "", nil, fmt.Errorf("load metadata: %w", err) } + predicate, err := parsePartitionPredicate(config.Where, meta) + if err != nil { + return "", nil, err + } currentSnap := meta.CurrentSnapshot() if currentSnap == nil || currentSnap.ManifestList == "" { @@ -349,10 +353,6 @@ func (h *Handler) rewriteManifests( } } - if int64(len(dataManifests)) < config.MinManifestsToRewrite { - return fmt.Sprintf("only %d data manifests, below threshold of %d", len(dataManifests), config.MinManifestsToRewrite), nil, nil - } - // Collect all entries from data manifests, grouped by partition spec ID // so we write one merged manifest per spec (required for spec-evolved tables). type specEntries struct { @@ -363,10 +363,9 @@ func (h *Handler) rewriteManifests( specMap := make(map[int32]*specEntries) // Build a lookup from spec ID to PartitionSpec - specByID := make(map[int]iceberg.PartitionSpec) - for _, ps := range meta.PartitionSpecs() { - specByID[ps.ID()] = ps - } + specByID := specByID(meta) + var carriedDataManifests []iceberg.ManifestFile + var manifestsRewritten int64 for _, mf := range dataManifests { manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath()) @@ -378,6 +377,28 @@ func (h *Handler) rewriteManifests( return "", nil, fmt.Errorf("parse manifest %s: %w", mf.FilePath(), err) } + if predicate != nil { + spec, found := specByID[int(mf.PartitionSpecID())] + if !found { + return "", nil, fmt.Errorf("partition spec %d not found in table metadata", mf.PartitionSpecID()) + } + allMatch := len(entries) > 0 + for _, entry := range entries { + match, err := predicate.Matches(spec, entry.DataFile().Partition()) + if err != nil { + return "", nil, err + } + if !match { + allMatch = false + break + } + } + if !allMatch { + carriedDataManifests = append(carriedDataManifests, mf) + continue + } + } + sid := mf.PartitionSpecID() se, ok := specMap[sid] if !ok { @@ -389,6 +410,11 @@ func (h *Handler) rewriteManifests( specMap[sid] = se } se.entries = append(se.entries, entries...) + manifestsRewritten++ + } + + if manifestsRewritten < config.MinManifestsToRewrite { + return fmt.Sprintf("only %d data manifests, below threshold of %d", manifestsRewritten, config.MinManifestsToRewrite), nil, nil } if len(specMap) == 0 { @@ -425,6 +451,7 @@ func (h *Handler) rewriteManifests( // Write one merged manifest per partition spec var newManifests []iceberg.ManifestFile + newManifests = append(newManifests, carriedDataManifests...) totalEntries := 0 for _, se := range specMap { totalEntries += len(se.entries) @@ -514,11 +541,11 @@ func (h *Handler) rewriteManifests( committed = true metrics := map[string]int64{ - MetricManifestsRewritten: int64(len(dataManifests)), + MetricManifestsRewritten: manifestsRewritten, MetricEntriesTotal: int64(totalEntries), MetricDurationMs: time.Since(start).Milliseconds(), } - return fmt.Sprintf("rewrote %d manifests into %d (%d entries)", len(dataManifests), len(specMap), totalEntries), metrics, nil + return fmt.Sprintf("rewrote %d manifests into %d (%d entries)", manifestsRewritten, len(specMap), totalEntries), metrics, nil } // --------------------------------------------------------------------------- diff --git a/weed/plugin/worker/iceberg/planning_index.go b/weed/plugin/worker/iceberg/planning_index.go index 43cc44a48..0015012ab 100644 --- a/weed/plugin/worker/iceberg/planning_index.go +++ b/weed/plugin/worker/iceberg/planning_index.go @@ -177,7 +177,7 @@ func buildPlanningIndexFromManifests( } if operationRequested(ops, "compact") { - eligible, err := hasEligibleCompaction(ctx, filerClient, bucketName, tablePath, manifests, config) + eligible, err := hasEligibleCompaction(ctx, filerClient, bucketName, tablePath, manifests, config, meta, nil) if err != nil { return nil, err } diff --git a/weed/plugin/worker/iceberg/where_filter.go b/weed/plugin/worker/iceberg/where_filter.go new file mode 100644 index 000000000..69f60fb71 --- /dev/null +++ b/weed/plugin/worker/iceberg/where_filter.go @@ -0,0 +1,311 @@ +package iceberg + +import ( + "fmt" + "regexp" + "strconv" + "strings" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/table" +) + +var ( + whereEqualsPattern = regexp.MustCompile(`^([A-Za-z_][A-Za-z0-9_]*)\s*=\s*(.+)$`) + whereInPattern = regexp.MustCompile(`^(?i)([A-Za-z_][A-Za-z0-9_]*)\s+IN\s*\((.*)\)$`) +) + +type whereClause struct { + Field string + Literals []string +} + +type partitionPredicate struct { + Clauses []whereClause +} + +func validateWhereOperations(where string, ops []string) error { + if strings.TrimSpace(where) == "" { + return nil + } + for _, op := range ops { + switch op { + case "compact", "rewrite_manifests", "rewrite_position_delete_files": + continue + default: + return fmt.Errorf("where filter is only supported for compact, rewrite_position_delete_files, and rewrite_manifests") + } + } + return nil +} + +func parsePartitionPredicate(where string, meta table.Metadata) (*partitionPredicate, error) { + where = strings.TrimSpace(where) + if where == "" { + return nil, nil + } + if meta == nil { + return nil, fmt.Errorf("where filter requires table metadata") + } + + specs := meta.PartitionSpecs() + if len(specs) == 0 || meta.PartitionSpec().IsUnpartitioned() { + return nil, fmt.Errorf("where filter is not supported for unpartitioned tables") + } + + rawClauses := splitWhereConjunction(where) + clauses := make([]whereClause, 0, len(rawClauses)) + for _, raw := range rawClauses { + clause, err := parseWhereClause(raw) + if err != nil { + return nil, err + } + clauses = append(clauses, clause) + } + + // Validate against the current partition spec only. Historical specs may + // lack fields added during schema evolution; per-entry matching in Matches() + // handles those gracefully. + currentSpec := meta.PartitionSpec() + for _, clause := range clauses { + if !specHasFieldByName(currentSpec, clause.Field) { + return nil, fmt.Errorf("where field %q is not present in current partition spec %d", clause.Field, currentSpec.ID()) + } + } + + return &partitionPredicate{Clauses: clauses}, nil +} + +func splitWhereConjunction(where string) []string { + // Quote-aware split: only split on AND that appears outside quotes. + var parts []string + var current strings.Builder + var quote rune + runes := []rune(where) + for i := 0; i < len(runes); i++ { + r := runes[i] + if quote != 0 { + current.WriteRune(r) + if r == quote { + quote = 0 + } + continue + } + if r == '\'' || r == '"' { + quote = r + current.WriteRune(r) + continue + } + // Check for case-insensitive AND surrounded by whitespace. + if (r == 'A' || r == 'a') && i+3 < len(runes) { + candidate := string(runes[i : i+3]) + if strings.EqualFold(candidate, "AND") { + before := i > 0 && isWhitespace(runes[i-1]) + after := i+3 < len(runes) && isWhitespace(runes[i+3]) + if before && after { + part := strings.TrimSpace(current.String()) + if part != "" { + parts = append(parts, part) + } + current.Reset() + i += 3 // skip "AND" + the after-space will be consumed next iteration + continue + } + } + } + current.WriteRune(r) + } + if part := strings.TrimSpace(current.String()); part != "" { + parts = append(parts, part) + } + return parts +} + +func isWhitespace(r rune) bool { + return r == ' ' || r == '\t' || r == '\n' || r == '\r' +} + +func parseWhereClause(raw string) (whereClause, error) { + raw = strings.TrimSpace(raw) + if raw == "" { + return whereClause{}, fmt.Errorf("empty where clause") + } + if matches := whereInPattern.FindStringSubmatch(raw); matches != nil { + literals, err := splitLiteralList(matches[2]) + if err != nil { + return whereClause{}, err + } + if len(literals) == 0 { + return whereClause{}, fmt.Errorf("empty IN list in where clause %q", raw) + } + return whereClause{Field: matches[1], Literals: literals}, nil + } + if matches := whereEqualsPattern.FindStringSubmatch(raw); matches != nil { + return whereClause{Field: matches[1], Literals: []string{strings.TrimSpace(matches[2])}}, nil + } + return whereClause{}, fmt.Errorf("unsupported where clause %q", raw) +} + +func splitLiteralList(raw string) ([]string, error) { + raw = strings.TrimSpace(raw) + if raw == "" { + return nil, nil + } + var ( + literals []string + current strings.Builder + quote rune + ) + for _, r := range raw { + switch { + case quote != 0: + current.WriteRune(r) + if r == quote { + quote = 0 + } + case r == '\'' || r == '"': + quote = r + current.WriteRune(r) + case r == ',': + literal := strings.TrimSpace(current.String()) + if literal != "" { + literals = append(literals, literal) + } + current.Reset() + default: + current.WriteRune(r) + } + } + if quote != 0 { + return nil, fmt.Errorf("unterminated quoted literal in IN list") + } + if literal := strings.TrimSpace(current.String()); literal != "" { + literals = append(literals, literal) + } + return literals, nil +} + +func specHasFieldByName(spec iceberg.PartitionSpec, fieldName string) bool { + for field := range spec.Fields() { + if field.Name == fieldName { + return true + } + } + return false +} + +func specByID(meta table.Metadata) map[int]iceberg.PartitionSpec { + result := make(map[int]iceberg.PartitionSpec) + if meta == nil { + return result + } + for _, spec := range meta.PartitionSpecs() { + result[spec.ID()] = spec + } + return result +} + +func (p *partitionPredicate) Matches(spec iceberg.PartitionSpec, partition map[int]any) (bool, error) { + if p == nil { + return true, nil + } + + valuesByName := make(map[string]any) + for field := range spec.Fields() { + if value, ok := partition[field.FieldID]; ok { + valuesByName[field.Name] = value + } + } + + for _, clause := range p.Clauses { + actual, ok := valuesByName[clause.Field] + if !ok { + // Field not present in this spec (e.g. older spec before schema + // evolution). Skip this entry rather than erroring. + return false, nil + } + matched := false + for _, literal := range clause.Literals { + ok, err := literalMatchesActual(literal, actual) + if err != nil { + return false, fmt.Errorf("where field %q: %w", clause.Field, err) + } + if ok { + matched = true + break + } + } + if !matched { + return false, nil + } + } + return true, nil +} + +func literalMatchesActual(raw string, actual any) (bool, error) { + raw = strings.TrimSpace(raw) + if raw == "" { + return false, fmt.Errorf("empty literal") + } + + switch v := actual.(type) { + case string: + value, err := unquoteLiteral(raw) + if err != nil { + return false, err + } + return v == value, nil + case bool: + value, err := strconv.ParseBool(strings.ToLower(strings.TrimSpace(raw))) + if err != nil { + return false, fmt.Errorf("parse bool literal %q: %w", raw, err) + } + return v == value, nil + case int: + value, err := strconv.ParseInt(raw, 10, 64) + if err != nil { + return false, fmt.Errorf("parse int literal %q: %w", raw, err) + } + return int64(v) == value, nil + case int32: + value, err := strconv.ParseInt(raw, 10, 32) + if err != nil { + return false, fmt.Errorf("parse int32 literal %q: %w", raw, err) + } + return v == int32(value), nil + case int64: + value, err := strconv.ParseInt(raw, 10, 64) + if err != nil { + return false, fmt.Errorf("parse int64 literal %q: %w", raw, err) + } + return v == value, nil + case float32: + value, err := strconv.ParseFloat(raw, 32) + if err != nil { + return false, fmt.Errorf("parse float32 literal %q: %w", raw, err) + } + return v == float32(value), nil + case float64: + value, err := strconv.ParseFloat(raw, 64) + if err != nil { + return false, fmt.Errorf("parse float64 literal %q: %w", raw, err) + } + return v == value, nil + default: + value, err := unquoteLiteral(raw) + if err != nil { + return false, err + } + return fmt.Sprint(actual) == value, nil + } +} + +func unquoteLiteral(raw string) (string, error) { + raw = strings.TrimSpace(raw) + if len(raw) >= 2 { + if (raw[0] == '\'' && raw[len(raw)-1] == '\'') || (raw[0] == '"' && raw[len(raw)-1] == '"') { + return raw[1 : len(raw)-1], nil + } + } + return raw, nil +} diff --git a/weed/plugin/worker/iceberg/where_filter_test.go b/weed/plugin/worker/iceberg/where_filter_test.go new file mode 100644 index 000000000..66ce51a73 --- /dev/null +++ b/weed/plugin/worker/iceberg/where_filter_test.go @@ -0,0 +1,287 @@ +package iceberg + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "path" + "strings" + "testing" + "time" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/table" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3tables" +) + +type partitionedTestFile struct { + Name string + Partition map[int]any + Rows []struct { + ID int64 + Name string + } +} + +func populatePartitionedDataTable( + t *testing.T, + fs *fakeFilerServer, + setup tableSetup, + partitionSpec iceberg.PartitionSpec, + manifestGroups [][]partitionedTestFile, +) table.Metadata { + t.Helper() + + schema := newTestSchema() + meta, err := table.NewMetadata(schema, &partitionSpec, table.UnsortedSortOrder, "s3://"+setup.BucketName+"/"+setup.tablePath(), nil) + if err != nil { + t.Fatalf("create metadata: %v", err) + } + + bucketsPath := s3tables.TablesPath + bucketPath := path.Join(bucketsPath, setup.BucketName) + nsPath := path.Join(bucketPath, setup.Namespace) + tablePath := path.Join(nsPath, setup.TableName) + metaDir := path.Join(tablePath, "metadata") + dataDir := path.Join(tablePath, "data") + + version := meta.Version() + var manifestFiles []iceberg.ManifestFile + for idx, group := range manifestGroups { + entries := make([]iceberg.ManifestEntry, 0, len(group)) + for _, file := range group { + data := writeTestParquetFile(t, fs, dataDir, file.Name, file.Rows) + dfBuilder, err := iceberg.NewDataFileBuilder( + partitionSpec, + iceberg.EntryContentData, + path.Join("data", file.Name), + iceberg.ParquetFile, + file.Partition, + nil, nil, + int64(len(file.Rows)), + int64(len(data)), + ) + if err != nil { + t.Fatalf("build data file %s: %v", file.Name, err) + } + snapID := int64(1) + entries = append(entries, iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, dfBuilder.Build())) + } + + manifestName := fmt.Sprintf("where-manifest-%d.avro", idx+1) + var manifestBuf bytes.Buffer + mf, err := iceberg.WriteManifest(path.Join("metadata", manifestName), &manifestBuf, version, partitionSpec, schema, 1, entries) + if err != nil { + t.Fatalf("write manifest %d: %v", idx+1, err) + } + fs.putEntry(metaDir, manifestName, &filer_pb.Entry{ + Name: manifestName, + Content: manifestBuf.Bytes(), + Attributes: &filer_pb.FuseAttributes{Mtime: time.Now().Unix(), FileSize: uint64(manifestBuf.Len())}, + }) + manifestFiles = append(manifestFiles, mf) + } + + var manifestListBuf bytes.Buffer + seqNum := int64(1) + if err := iceberg.WriteManifestList(version, &manifestListBuf, 1, nil, &seqNum, 0, manifestFiles); err != nil { + t.Fatalf("write manifest list: %v", err) + } + fs.putEntry(metaDir, "snap-1.avro", &filer_pb.Entry{ + Name: "snap-1.avro", + Content: manifestListBuf.Bytes(), + Attributes: &filer_pb.FuseAttributes{Mtime: time.Now().Unix(), FileSize: uint64(manifestListBuf.Len())}, + }) + + builder, err := table.MetadataBuilderFromBase(meta, "s3://"+setup.BucketName+"/"+setup.tablePath()) + if err != nil { + t.Fatalf("metadata builder: %v", err) + } + snapshot := table.Snapshot{SnapshotID: 1, TimestampMs: time.Now().UnixMilli(), ManifestList: "metadata/snap-1.avro", SequenceNumber: 1} + if err := builder.AddSnapshot(&snapshot); err != nil { + t.Fatalf("add snapshot: %v", err) + } + if err := builder.SetSnapshotRef(table.MainBranch, 1, table.BranchRef); err != nil { + t.Fatalf("set snapshot ref: %v", err) + } + meta, err = builder.Build() + if err != nil { + t.Fatalf("build metadata: %v", err) + } + + fullMetadataJSON, _ := json.Marshal(meta) + internalMeta := map[string]interface{}{ + "metadataVersion": 1, + "metadataLocation": "metadata/v1.metadata.json", + "metadata": map[string]interface{}{"fullMetadata": json.RawMessage(fullMetadataJSON)}, + } + xattr, _ := json.Marshal(internalMeta) + + fs.putEntry(bucketsPath, setup.BucketName, &filer_pb.Entry{ + Name: setup.BucketName, + IsDirectory: true, + Extended: map[string][]byte{s3tables.ExtendedKeyTableBucket: []byte("true")}, + }) + fs.putEntry(bucketPath, setup.Namespace, &filer_pb.Entry{Name: setup.Namespace, IsDirectory: true}) + fs.putEntry(nsPath, setup.TableName, &filer_pb.Entry{ + Name: setup.TableName, + IsDirectory: true, + Extended: map[string][]byte{ + s3tables.ExtendedKeyMetadata: xattr, + s3tables.ExtendedKeyMetadataVersion: metadataVersionXattr(1), + }, + }) + + return meta +} + +func TestValidateWhereOperations(t *testing.T) { + if err := validateWhereOperations("name = 'us'", []string{"compact", "rewrite_manifests"}); err != nil { + t.Fatalf("unexpected validation error: %v", err) + } + if err := validateWhereOperations("name = 'us'", []string{"expire_snapshots"}); err == nil { + t.Fatal("expected where validation to reject expire_snapshots") + } +} + +func TestSplitWhereConjunctionQuoteAware(t *testing.T) { + cases := []struct { + input string + expected []string + }{ + {"a = 1 AND b = 2", []string{"a = 1", "b = 2"}}, + {"a = 'research AND dev'", []string{"a = 'research AND dev'"}}, + {"a IN ('sales AND marketing', 'eng') AND b = 2", []string{"a IN ('sales AND marketing', 'eng')", "b = 2"}}, + {"a = 1 and b = 2", []string{"a = 1", "b = 2"}}, + {"a = 'x' AND b = \"y AND z\"", []string{"a = 'x'", "b = \"y AND z\""}}, + } + for _, tc := range cases { + got := splitWhereConjunction(tc.input) + if len(got) != len(tc.expected) { + t.Errorf("splitWhereConjunction(%q) = %v, want %v", tc.input, got, tc.expected) + continue + } + for i := range got { + if got[i] != tc.expected[i] { + t.Errorf("splitWhereConjunction(%q)[%d] = %q, want %q", tc.input, i, got[i], tc.expected[i]) + } + } + } +} + +func TestPartitionPredicateMatchesUsesPartitionFieldIDs(t *testing.T) { + spec := iceberg.NewPartitionSpec(iceberg.PartitionField{ + SourceID: 2, + FieldID: 1000, + Name: "name", + Transform: iceberg.IdentityTransform{}, + }) + predicate := &partitionPredicate{Clauses: []whereClause{{Field: "name", Literals: []string{"'us'"}}}} + + match, err := predicate.Matches(spec, map[int]any{2: "us"}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if match { + t.Fatal("expected source-column key to not match partition predicate") + } +} + +func TestCompactDataFilesWhereFilter(t *testing.T) { + fs, client := startFakeFiler(t) + + partitionSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{ + SourceID: 2, + FieldID: 1000, + Name: "name", + Transform: iceberg.IdentityTransform{}, + }) + + setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"} + populatePartitionedDataTable(t, fs, setup, partitionSpec, [][]partitionedTestFile{ + { + {Name: "us-1.parquet", Partition: map[int]any{1000: "us"}, Rows: []struct { + ID int64 + Name string + }{{1, "us"}}}, + }, + { + {Name: "us-2.parquet", Partition: map[int]any{1000: "us"}, Rows: []struct { + ID int64 + Name string + }{{2, "us"}}}, + }, + { + {Name: "eu-1.parquet", Partition: map[int]any{1000: "eu"}, Rows: []struct { + ID int64 + Name string + }{{3, "eu"}}}, + {Name: "eu-2.parquet", Partition: map[int]any{1000: "eu"}, Rows: []struct { + ID int64 + Name string + }{{4, "eu"}}}, + }, + }) + + handler := NewHandler(nil) + config := Config{ + TargetFileSizeBytes: 256 * 1024 * 1024, + MinInputFiles: 2, + MaxCommitRetries: 3, + Where: "name = 'us'", + } + + result, _, err := handler.compactDataFiles(context.Background(), client, setup.BucketName, setup.tablePath(), config, nil) + if err != nil { + t.Fatalf("compactDataFiles: %v", err) + } + if !strings.Contains(result, "compacted 2 files into 1") { + t.Fatalf("unexpected result: %q", result) + } + + meta, _, err := loadCurrentMetadata(context.Background(), client, setup.BucketName, setup.tablePath()) + if err != nil { + t.Fatalf("loadCurrentMetadata: %v", err) + } + manifests, err := loadCurrentManifests(context.Background(), client, setup.BucketName, setup.tablePath(), meta) + if err != nil { + t.Fatalf("loadCurrentManifests: %v", err) + } + + var liveDataPaths []string + for _, mf := range manifests { + if mf.ManifestContent() != iceberg.ManifestContentData { + continue + } + manifestData, err := loadFileByIcebergPath(context.Background(), client, setup.BucketName, setup.tablePath(), mf.FilePath()) + if err != nil { + t.Fatalf("load data manifest: %v", err) + } + entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true) + if err != nil { + t.Fatalf("read data manifest: %v", err) + } + for _, entry := range entries { + liveDataPaths = append(liveDataPaths, entry.DataFile().FilePath()) + } + } + + if len(liveDataPaths) != 3 { + t.Fatalf("expected 3 live data files after filtered compaction, got %v", liveDataPaths) + } + var compactedCount int + for _, p := range liveDataPaths { + switch { + case strings.HasPrefix(p, "data/compact-"): + compactedCount++ + case p == "data/eu-1.parquet", p == "data/eu-2.parquet": + default: + t.Fatalf("unexpected live data file %q", p) + } + } + if compactedCount != 1 { + t.Fatalf("expected exactly one compacted file, got %d in %v", compactedCount, liveDataPaths) + } +}