diff --git a/test/s3tables/maintenance/maintenance_integration_test.go b/test/s3tables/maintenance/maintenance_integration_test.go index 4a3857fff..ae1a166c6 100644 --- a/test/s3tables/maintenance/maintenance_integration_test.go +++ b/test/s3tables/maintenance/maintenance_integration_test.go @@ -625,7 +625,7 @@ func testExpireSnapshots(t *testing.T) { MaxCommitRetries: 3, } - result, err := handler.ExpireSnapshots(context.Background(), client, bucket, path.Join(ns, tbl), config) + result, _, err := handler.ExpireSnapshots(context.Background(), client, bucket, path.Join(ns, tbl), config) require.NoError(t, err) assert.Contains(t, result, "expired") t.Logf("ExpireSnapshots result: %s", result) @@ -733,10 +733,15 @@ func testCompactDataFiles(t *testing.T) { MaxCommitRetries: 3, } - result, err := handler.CompactDataFiles(ctx, client, bucket, tablePath, config) + result, metrics, err := handler.CompactDataFiles(ctx, client, bucket, tablePath, config) require.NoError(t, err) assert.Contains(t, result, "compacted") - t.Logf("CompactDataFiles result: %s", result) + require.NotNil(t, metrics, "expected non-nil metrics from CompactDataFiles") + assert.Contains(t, metrics, icebergHandler.MetricFilesMerged) + assert.Contains(t, metrics, icebergHandler.MetricFilesWritten) + assert.Contains(t, metrics, icebergHandler.MetricBins) + assert.Contains(t, metrics, icebergHandler.MetricDurationMs) + t.Logf("CompactDataFiles result: %s, metrics: %v", result, metrics) var compacted *filer_pb.Entry listErr := filer_pb.SeaweedList(ctx, client, dataDir, "", func(entry *filer_pb.Entry, isLast bool) error { @@ -866,7 +871,7 @@ func testRemoveOrphans(t *testing.T) { MaxCommitRetries: 3, } - result, err := handler.RemoveOrphans(ctx, client, bucket, tablePath, config) + result, _, err := handler.RemoveOrphans(ctx, client, bucket, tablePath, config) require.NoError(t, err) assert.Contains(t, result, "removed") t.Logf("RemoveOrphans result: %s", result) @@ -902,7 +907,7 @@ func testRewriteManifests(t *testing.T) { } tablePath := path.Join(ns, tbl) - result, err := handler.RewriteManifests(context.Background(), client, bucket, tablePath, config) + result, _, err := handler.RewriteManifests(context.Background(), client, bucket, tablePath, config) require.NoError(t, err) assert.Contains(t, result, "below threshold") t.Logf("RewriteManifests result: %s", result) @@ -951,7 +956,7 @@ func testFullMaintenanceCycle(t *testing.T) { MaxSnapshotsToKeep: 1, MaxCommitRetries: 3, } - result, err := handler.ExpireSnapshots(ctx, client, bucket, tablePath, expireConfig) + result, _, err := handler.ExpireSnapshots(ctx, client, bucket, tablePath, expireConfig) require.NoError(t, err) assert.Contains(t, result, "expired") t.Logf("Step 1 (expire): %s", result) @@ -961,7 +966,7 @@ func testFullMaintenanceCycle(t *testing.T) { OrphanOlderThanHours: 72, MaxCommitRetries: 3, } - result, err = handler.RemoveOrphans(ctx, client, bucket, tablePath, orphanConfig) + result, _, err = handler.RemoveOrphans(ctx, client, bucket, tablePath, orphanConfig) require.NoError(t, err) t.Logf("Step 2 (orphans): %s", result) // The orphan and the unreferenced files from expired snapshots should be gone diff --git a/weed/plugin/worker/iceberg/compact.go b/weed/plugin/worker/iceberg/compact.go index d41d8c986..cf1bc440b 100644 --- a/weed/plugin/worker/iceberg/compact.go +++ b/weed/plugin/worker/iceberg/compact.go @@ -3,6 +3,8 @@ package iceberg import ( "bytes" "context" + "crypto/rand" + "encoding/hex" "encoding/json" "errors" "fmt" @@ -22,10 +24,11 @@ import ( 
"google.golang.org/grpc/status" ) -// compactionBin groups small data files from the same partition for merging. +// compactionBin groups small data files from the same partition and spec for merging. type compactionBin struct { PartitionKey string Partition map[int]any + SpecID int32 Entries []iceberg.ManifestEntry TotalSize int64 } @@ -38,64 +41,116 @@ func (h *Handler) compactDataFiles( filerClient filer_pb.SeaweedFilerClient, bucketName, tablePath string, config Config, -) (string, error) { + onProgress func(binIdx, totalBins int), +) (string, map[string]int64, error) { + start := time.Now() meta, metadataFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath) if err != nil { - return "", fmt.Errorf("load metadata: %w", err) + return "", nil, fmt.Errorf("load metadata: %w", err) } currentSnap := meta.CurrentSnapshot() if currentSnap == nil || currentSnap.ManifestList == "" { - return "no current snapshot", nil + return "no current snapshot", nil, nil } // Read manifest list manifestListData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, currentSnap.ManifestList) if err != nil { - return "", fmt.Errorf("read manifest list: %w", err) + return "", nil, fmt.Errorf("read manifest list: %w", err) } manifests, err := iceberg.ReadManifestList(bytes.NewReader(manifestListData)) if err != nil { - return "", fmt.Errorf("parse manifest list: %w", err) + return "", nil, fmt.Errorf("parse manifest list: %w", err) } - // Abort if delete manifests exist — the compactor does not apply deletes, - // so carrying them through could produce incorrect results. - // Also detect multiple partition specs — the compactor writes a single - // manifest under the current spec which is invalid for spec-evolved tables. - specIDs := make(map[int32]struct{}) + // Separate data manifests from delete manifests. + var dataManifests, deleteManifests []iceberg.ManifestFile for _, mf := range manifests { - if mf.ManifestContent() != iceberg.ManifestContentData { - return "compaction skipped: delete manifests present (not yet supported)", nil + if mf.ManifestContent() == iceberg.ManifestContentData { + dataManifests = append(dataManifests, mf) + } else { + deleteManifests = append(deleteManifests, mf) } - specIDs[mf.PartitionSpecID()] = struct{}{} } - if len(specIDs) > 1 { - return "compaction skipped: multiple partition specs present (not yet supported)", nil + + // If delete manifests exist and apply_deletes is disabled (or not yet + // implemented for this code path), skip compaction to avoid producing + // incorrect results by dropping deletes. + if len(deleteManifests) > 0 && !config.ApplyDeletes { + return "compaction skipped: delete manifests present and apply_deletes is disabled", nil, nil } // Collect data file entries from data manifests var allEntries []iceberg.ManifestEntry - for _, mf := range manifests { + for _, mf := range dataManifests { manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath()) if err != nil { - return "", fmt.Errorf("read manifest %s: %w", mf.FilePath(), err) + return "", nil, fmt.Errorf("read manifest %s: %w", mf.FilePath(), err) } entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true) if err != nil { - return "", fmt.Errorf("parse manifest %s: %w", mf.FilePath(), err) + return "", nil, fmt.Errorf("parse manifest %s: %w", mf.FilePath(), err) } allEntries = append(allEntries, entries...) 
} + // Collect delete entries if we need to apply deletes + var positionDeletes map[string][]int64 + var eqDeleteGroups []equalityDeleteGroup + if config.ApplyDeletes && len(deleteManifests) > 0 { + var allDeleteEntries []iceberg.ManifestEntry + for _, mf := range deleteManifests { + manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath()) + if err != nil { + return "", nil, fmt.Errorf("read delete manifest %s: %w", mf.FilePath(), err) + } + entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true) + if err != nil { + return "", nil, fmt.Errorf("parse delete manifest %s: %w", mf.FilePath(), err) + } + allDeleteEntries = append(allDeleteEntries, entries...) + } + + // Separate position and equality deletes + var posDeleteEntries, eqDeleteEntries []iceberg.ManifestEntry + for _, entry := range allDeleteEntries { + switch entry.DataFile().ContentType() { + case iceberg.EntryContentPosDeletes: + posDeleteEntries = append(posDeleteEntries, entry) + case iceberg.EntryContentEqDeletes: + eqDeleteEntries = append(eqDeleteEntries, entry) + } + } + + if len(posDeleteEntries) > 0 { + positionDeletes, err = collectPositionDeletes(ctx, filerClient, bucketName, tablePath, posDeleteEntries) + if err != nil { + return "", nil, fmt.Errorf("collect position deletes: %w", err) + } + } + + if len(eqDeleteEntries) > 0 { + eqDeleteGroups, err = collectEqualityDeletes(ctx, filerClient, bucketName, tablePath, eqDeleteEntries, meta.CurrentSchema()) + if err != nil { + return "", nil, fmt.Errorf("collect equality deletes: %w", err) + } + } + } + // Build compaction bins: group small files by partition // MinInputFiles is clamped by ParseConfig to [2, ...] so int conversion is safe. bins := buildCompactionBins(allEntries, config.TargetFileSizeBytes, int(config.MinInputFiles)) if len(bins) == 0 { - return "no files eligible for compaction", nil + return "no files eligible for compaction", nil, nil + } + + // Build a lookup from spec ID to PartitionSpec for per-bin manifest writing. + specByID := make(map[int]iceberg.PartitionSpec) + for _, ps := range meta.PartitionSpecs() { + specByID[ps.ID()] = ps } - spec := meta.PartitionSpec() schema := meta.CurrentSchema() version := meta.Version() snapshotID := currentSnap.SnapshotID @@ -103,6 +158,9 @@ func (h *Handler) compactDataFiles( // Compute the snapshot ID for the commit up front so all manifest entries // reference the same snapshot that will actually be committed. newSnapID := time.Now().UnixMilli() + // Random suffix for artifact filenames to avoid collisions between + // concurrent compaction runs on different tables sharing a timestamp. 
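+	// The suffix is embedded in the merged data file names, the per-spec
+	// manifest names, and the manifest list name written below.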
+ artifactSuffix := compactRandomSuffix() // Process each bin: read source Parquet files, merge, write output var newManifestEntries []iceberg.ManifestEntry @@ -152,73 +210,87 @@ func (h *Handler) compactDataFiles( for binIdx, bin := range bins { select { case <-ctx.Done(): - return "", ctx.Err() + return "", nil, ctx.Err() default: } - mergedFileName := fmt.Sprintf("compact-%d-%d-%d.parquet", snapshotID, newSnapID, binIdx) + mergedFileName := fmt.Sprintf("compact-%d-%d-%s-%d.parquet", snapshotID, newSnapID, artifactSuffix, binIdx) mergedFilePath := path.Join("data", mergedFileName) - mergedData, recordCount, err := mergeParquetFiles(ctx, filerClient, bucketName, tablePath, bin.Entries) + mergedData, recordCount, err := mergeParquetFiles(ctx, filerClient, bucketName, tablePath, bin.Entries, positionDeletes, eqDeleteGroups, schema) if err != nil { glog.Warningf("iceberg compact: failed to merge bin %d (%d files): %v", binIdx, len(bin.Entries), err) - continue + goto binDone } // Write merged file to filer if err := ensureFilerDir(ctx, filerClient, dataDir); err != nil { - return "", fmt.Errorf("ensure data dir: %w", err) + return "", nil, fmt.Errorf("ensure data dir: %w", err) } if err := saveFilerFile(ctx, filerClient, dataDir, mergedFileName, mergedData); err != nil { - return "", fmt.Errorf("save merged file: %w", err) - } - - // Create new DataFile entry for the merged file - dfBuilder, err := iceberg.NewDataFileBuilder( - spec, - iceberg.EntryContentData, - mergedFilePath, - iceberg.ParquetFile, - bin.Partition, - nil, nil, - recordCount, - int64(len(mergedData)), - ) - if err != nil { - glog.Warningf("iceberg compact: failed to build data file entry for bin %d: %v", binIdx, err) - // Clean up the written file - _ = deleteFilerFile(ctx, filerClient, dataDir, mergedFileName) - continue + return "", nil, fmt.Errorf("save merged file: %w", err) } - writtenArtifacts = append(writtenArtifacts, artifact{dir: dataDir, fileName: mergedFileName}) - newEntry := iceberg.NewManifestEntry( - iceberg.EntryStatusADDED, - &newSnapID, - nil, nil, - dfBuilder.Build(), - ) - newManifestEntries = append(newManifestEntries, newEntry) + // Use the partition spec matching this bin's spec ID + { + binSpec, ok := specByID[int(bin.SpecID)] + if !ok { + glog.Warningf("iceberg compact: spec %d not found for bin %d, skipping", bin.SpecID, binIdx) + _ = deleteFilerFile(ctx, filerClient, dataDir, mergedFileName) + goto binDone + } + + // Create new DataFile entry for the merged file + dfBuilder, err := iceberg.NewDataFileBuilder( + binSpec, + iceberg.EntryContentData, + mergedFilePath, + iceberg.ParquetFile, + bin.Partition, + nil, nil, + recordCount, + int64(len(mergedData)), + ) + if err != nil { + glog.Warningf("iceberg compact: failed to build data file entry for bin %d: %v", binIdx, err) + _ = deleteFilerFile(ctx, filerClient, dataDir, mergedFileName) + goto binDone + } + writtenArtifacts = append(writtenArtifacts, artifact{dir: dataDir, fileName: mergedFileName}) - // Mark original entries as deleted - for _, entry := range bin.Entries { - delEntry := iceberg.NewManifestEntry( - iceberg.EntryStatusDELETED, + newEntry := iceberg.NewManifestEntry( + iceberg.EntryStatusADDED, &newSnapID, - entrySeqNum(entry), entryFileSeqNum(entry), - entry.DataFile(), + nil, nil, + dfBuilder.Build(), ) - deletedManifestEntries = append(deletedManifestEntries, delEntry) + newManifestEntries = append(newManifestEntries, newEntry) + + // Mark original entries as deleted + for _, entry := range bin.Entries { + delEntry := 
iceberg.NewManifestEntry( + iceberg.EntryStatusDELETED, + &newSnapID, + entrySeqNum(entry), entryFileSeqNum(entry), + entry.DataFile(), + ) + deletedManifestEntries = append(deletedManifestEntries, delEntry) + } + + totalMerged += len(bin.Entries) } - totalMerged += len(bin.Entries) + binDone: + if onProgress != nil { + onProgress(binIdx, len(bins)) + } } if len(newManifestEntries) == 0 { - return "no bins successfully compacted", nil + return "no bins successfully compacted", nil, nil } - // Build entries for the new manifest: + // Build entries for the new manifests: // - ADDED entries for merged files // - DELETED entries for original files // - EXISTING entries for files that weren't compacted @@ -227,11 +299,31 @@ func (h *Handler) compactDataFiles( compactedPaths[entry.DataFile().FilePath()] = struct{}{} } - var manifestEntries []iceberg.ManifestEntry - manifestEntries = append(manifestEntries, newManifestEntries...) - manifestEntries = append(manifestEntries, deletedManifestEntries...) + // Group all manifest entries by spec ID for per-spec manifest writing. + type specEntries struct { + specID int32 + entries []iceberg.ManifestEntry + } + specEntriesMap := make(map[int32]*specEntries) + + addToSpec := func(specID int32, entry iceberg.ManifestEntry) { + se, ok := specEntriesMap[specID] + if !ok { + se = &specEntries{specID: specID} + specEntriesMap[specID] = se + } + se.entries = append(se.entries, entry) + } - // Keep existing entries that weren't compacted + // New and deleted entries carry the spec ID from their bin + for _, entry := range newManifestEntries { + addToSpec(entry.DataFile().SpecID(), entry) + } + for _, entry := range deletedManifestEntries { + addToSpec(entry.DataFile().SpecID(), entry) + } + + // Existing entries that weren't compacted for _, entry := range allEntries { if _, compacted := compactedPaths[entry.DataFile().FilePath()]; !compacted { existingEntry := iceberg.NewManifestEntry( @@ -240,46 +332,87 @@ func (h *Handler) compactDataFiles( entrySeqNum(entry), entryFileSeqNum(entry), entry.DataFile(), ) - manifestEntries = append(manifestEntries, existingEntry) - } - } - - // Write new manifest - var manifestBuf bytes.Buffer - manifestFileName := fmt.Sprintf("compact-%d.avro", newSnapID) - newManifest, err := iceberg.WriteManifest( - path.Join("metadata", manifestFileName), - &manifestBuf, - version, - spec, - schema, - newSnapID, - manifestEntries, - ) - if err != nil { - return "", fmt.Errorf("write compact manifest: %w", err) + addToSpec(entry.DataFile().SpecID(), existingEntry) + } + } + + // Write one manifest per spec ID, iterating in sorted order for + // deterministic manifest list construction. 
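+	// Each per-spec manifest carries only the entries whose data files were
+	// written under that partition spec.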
+ sortedSpecIDs := make([]int32, 0, len(specEntriesMap)) + for sid := range specEntriesMap { + sortedSpecIDs = append(sortedSpecIDs, sid) } + sort.Slice(sortedSpecIDs, func(i, j int) bool { return sortedSpecIDs[i] < sortedSpecIDs[j] }) + + var allManifests []iceberg.ManifestFile + for _, sid := range sortedSpecIDs { + se := specEntriesMap[sid] + ps, ok := specByID[int(se.specID)] + if !ok { + return "", nil, fmt.Errorf("partition spec %d not found in table metadata", se.specID) + } + + var manifestBuf bytes.Buffer + manifestFileName := fmt.Sprintf("compact-%d-%s-spec%d.avro", newSnapID, artifactSuffix, se.specID) + newManifest, err := iceberg.WriteManifest( + path.Join("metadata", manifestFileName), + &manifestBuf, + version, + ps, + schema, + newSnapID, + se.entries, + ) + if err != nil { + return "", nil, fmt.Errorf("write compact manifest for spec %d: %w", se.specID, err) + } - if err := saveFilerFile(ctx, filerClient, metaDir, manifestFileName, manifestBuf.Bytes()); err != nil { - return "", fmt.Errorf("save compact manifest: %w", err) + if err := saveFilerFile(ctx, filerClient, metaDir, manifestFileName, manifestBuf.Bytes()); err != nil { + return "", nil, fmt.Errorf("save compact manifest for spec %d: %w", se.specID, err) + } + writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestFileName}) + allManifests = append(allManifests, newManifest) } - writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestFileName}) - // Build manifest list with only the new manifest (the early abort at the - // top of this function guarantees no delete manifests are present). - allManifests := []iceberg.ManifestFile{newManifest} + // Carry forward delete manifests only if deletes were NOT applied. + // When deletes were applied, they've been consumed during the merge. + // Position deletes reference specific data files — if all those files + // were compacted, the deletes are fully consumed. Equality deletes + // apply broadly, so they're only consumed if all data files were compacted. + if !config.ApplyDeletes || (len(positionDeletes) == 0 && len(eqDeleteGroups) == 0) { + for _, mf := range deleteManifests { + allManifests = append(allManifests, mf) + } + } else { + // Check if any non-compacted data files remain + hasUncompactedFiles := false + for _, entry := range allEntries { + if _, compacted := compactedPaths[entry.DataFile().FilePath()]; !compacted { + hasUncompactedFiles = true + break + } + } + if hasUncompactedFiles { + // Some files weren't compacted — carry forward delete manifests + // since deletes may still apply to those files. + for _, mf := range deleteManifests { + allManifests = append(allManifests, mf) + } + } + // If all files were compacted, deletes are fully consumed — don't carry forward. 
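+		// Position deletes that targeted now-compacted files are carried
+		// forward too, but they reference paths absent from the new snapshot
+		// and therefore match nothing.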
+ } // Write new manifest list var manifestListBuf bytes.Buffer seqNum := currentSnap.SequenceNumber + 1 err = iceberg.WriteManifestList(version, &manifestListBuf, newSnapID, &snapshotID, &seqNum, 0, allManifests) if err != nil { - return "", fmt.Errorf("write compact manifest list: %w", err) + return "", nil, fmt.Errorf("write compact manifest list: %w", err) } - manifestListFileName := fmt.Sprintf("snap-%d.avro", newSnapID) + manifestListFileName := fmt.Sprintf("snap-%d-%s.avro", newSnapID, artifactSuffix) if err := saveFilerFile(ctx, filerClient, metaDir, manifestListFileName, manifestListBuf.Bytes()); err != nil { - return "", fmt.Errorf("save compact manifest list: %w", err) + return "", nil, fmt.Errorf("save compact manifest list: %w", err) } writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestListFileName}) @@ -318,11 +451,17 @@ func (h *Handler) compactDataFiles( return builder.SetSnapshotRef(table.MainBranch, newSnapID, table.BranchRef) }) if err != nil { - return "", fmt.Errorf("commit compaction: %w", err) + return "", nil, fmt.Errorf("commit compaction: %w", err) } committed = true - return fmt.Sprintf("compacted %d files into %d (across %d bins)", totalMerged, len(newManifestEntries), len(bins)), nil + metrics := map[string]int64{ + MetricFilesMerged: int64(totalMerged), + MetricFilesWritten: int64(len(newManifestEntries)), + MetricBins: int64(len(bins)), + MetricDurationMs: time.Since(start).Milliseconds(), + } + return fmt.Sprintf("compacted %d files into %d (across %d bins)", totalMerged, len(newManifestEntries), len(bins)), metrics, nil } // buildCompactionBins groups small data files by partition for bin-packing. @@ -333,7 +472,8 @@ func buildCompactionBins(entries []iceberg.ManifestEntry, targetSize int64, minF minFiles = 2 } - // Group entries by partition key + // Group entries by spec ID + partition key so that files from different + // partition specs are never mixed in the same compaction bin. 
groups := make(map[string]*compactionBin) for _, entry := range entries { df := entry.DataFile() @@ -345,13 +485,15 @@ func buildCompactionBins(entries []iceberg.ManifestEntry, targetSize int64, minF } partKey := partitionKey(df.Partition()) - bin, ok := groups[partKey] + groupKey := fmt.Sprintf("spec%d\x00%s", df.SpecID(), partKey) + bin, ok := groups[groupKey] if !ok { bin = &compactionBin{ PartitionKey: partKey, Partition: df.Partition(), + SpecID: df.SpecID(), } - groups[partKey] = bin + groups[groupKey] = bin } bin.Entries = append(bin.Entries, entry) bin.TotalSize += df.FileSizeBytes() @@ -370,8 +512,11 @@ func buildCompactionBins(entries []iceberg.ManifestEntry, targetSize int64, minF } } - // Sort by partition key for deterministic order + // Sort by spec ID then partition key for deterministic order sort.Slice(result, func(i, j int) bool { + if result[i].SpecID != result[j].SpecID { + return result[i].SpecID < result[j].SpecID + } return result[i].PartitionKey < result[j].PartitionKey }) @@ -394,6 +539,7 @@ func splitOversizedBin(bin compactionBin, targetSize int64, minFiles int) []comp current := compactionBin{ PartitionKey: bin.PartitionKey, Partition: bin.Partition, + SpecID: bin.SpecID, } for _, entry := range sorted { if current.TotalSize > 0 && current.TotalSize+entry.DataFile().FileSizeBytes() > targetSize { @@ -401,6 +547,7 @@ func splitOversizedBin(bin compactionBin, targetSize int64, minFiles int) []comp current = compactionBin{ PartitionKey: bin.PartitionKey, Partition: bin.Partition, + SpecID: bin.SpecID, } } current.Entries = append(current.Entries, entry) @@ -474,17 +621,251 @@ func partitionKey(partition map[int]any) string { return strings.Join(parts, "\x00") } +// collectPositionDeletes reads position delete Parquet files and returns a map +// from normalized data file path to sorted row positions that should be deleted. +// Paths are normalized so that absolute S3 URLs and relative paths match. +func collectPositionDeletes( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath string, + deleteEntries []iceberg.ManifestEntry, +) (map[string][]int64, error) { + result := make(map[string][]int64) + for _, entry := range deleteEntries { + if entry.DataFile().ContentType() != iceberg.EntryContentPosDeletes { + continue + } + fileDeletes, err := readPositionDeleteFile(ctx, filerClient, bucketName, tablePath, entry.DataFile().FilePath()) + if err != nil { + return nil, fmt.Errorf("read position delete file %s: %w", entry.DataFile().FilePath(), err) + } + for filePath, positions := range fileDeletes { + normalized := normalizeIcebergPath(filePath, bucketName, tablePath) + result[normalized] = append(result[normalized], positions...) + } + } + // Sort positions for each file (binary search during filtering) + for filePath := range result { + sort.Slice(result[filePath], func(i, j int) bool { + return result[filePath][i] < result[filePath][j] + }) + } + return result, nil +} + +// readPositionDeleteFile reads a position delete Parquet file and returns a map +// from data file path to row positions. The file must have columns "file_path" +// (string) and "pos" (int32 or int64). 
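+// For example, a row {file_path: "data/d1.parquet", pos: 1} marks row 1
+// (0-based) of that data file as deleted.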
+func readPositionDeleteFile( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath, filePath string, +) (map[string][]int64, error) { + data, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, filePath) + if err != nil { + return nil, err + } + reader := parquet.NewReader(bytes.NewReader(data)) + defer reader.Close() + + pqSchema := reader.Schema() + filePathIdx := -1 + posIdx := -1 + for i, col := range pqSchema.Columns() { + name := strings.Join(col, ".") + switch name { + case "file_path": + filePathIdx = i + case "pos": + posIdx = i + } + } + if filePathIdx < 0 || posIdx < 0 { + return nil, fmt.Errorf("position delete file %s missing required columns (file_path=%d, pos=%d)", filePath, filePathIdx, posIdx) + } + + result := make(map[string][]int64) + rows := make([]parquet.Row, 256) + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + n, readErr := reader.ReadRows(rows) + for i := 0; i < n; i++ { + row := rows[i] + fp := row[filePathIdx].String() + pos := row[posIdx].Int64() + result[fp] = append(result[fp], pos) + } + if readErr != nil { + if readErr == io.EOF { + break + } + return nil, readErr + } + } + return result, nil +} + +// equalityDeleteGroup holds a set of delete keys for a specific set of equality field IDs. +// Different equality delete files may use different field IDs, so deletes are grouped. +type equalityDeleteGroup struct { + FieldIDs []int + Keys map[string]struct{} +} + +// collectEqualityDeletes reads equality delete Parquet files and returns groups +// of delete keys, one per distinct set of equality field IDs. This correctly +// handles the case where different delete files use different equality columns. +func collectEqualityDeletes( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath string, + deleteEntries []iceberg.ManifestEntry, + schema *iceberg.Schema, +) ([]equalityDeleteGroup, error) { + type groupState struct { + fieldIDs []int + keys map[string]struct{} + } + groups := make(map[string]*groupState) + + for _, entry := range deleteEntries { + if entry.DataFile().ContentType() != iceberg.EntryContentEqDeletes { + continue + } + eqFieldIDs := entry.DataFile().EqualityFieldIDs() + if len(eqFieldIDs) == 0 { + continue + } + + groupKey := fmt.Sprint(eqFieldIDs) + gs, ok := groups[groupKey] + if !ok { + gs = &groupState{fieldIDs: eqFieldIDs, keys: make(map[string]struct{})} + groups[groupKey] = gs + } + + keys, err := readEqualityDeleteFile(ctx, filerClient, bucketName, tablePath, entry.DataFile().FilePath(), eqFieldIDs, schema) + if err != nil { + return nil, fmt.Errorf("read equality delete file %s: %w", entry.DataFile().FilePath(), err) + } + for k := range keys { + gs.keys[k] = struct{}{} + } + } + + result := make([]equalityDeleteGroup, 0, len(groups)) + for _, gs := range groups { + result = append(result, equalityDeleteGroup{FieldIDs: gs.fieldIDs, Keys: gs.keys}) + } + return result, nil +} + +// readEqualityDeleteFile reads an equality delete Parquet file and returns a set +// of composite keys built from the specified field IDs. The Iceberg schema is used +// to map field IDs to column names, which are then looked up in the Parquet schema. 
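+// For example, field IDs [2] resolving to column "name" cause every data row
+// whose "name" value appears in the delete file to be dropped during merge.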
+func readEqualityDeleteFile( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath, filePath string, + fieldIDs []int, + icebergSchema *iceberg.Schema, +) (map[string]struct{}, error) { + data, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, filePath) + if err != nil { + return nil, err + } + reader := parquet.NewReader(bytes.NewReader(data)) + defer reader.Close() + + colIndices, err := resolveEqualityColIndices(reader.Schema(), fieldIDs, icebergSchema) + if err != nil { + return nil, fmt.Errorf("resolve columns in %s: %w", filePath, err) + } + + result := make(map[string]struct{}) + rows := make([]parquet.Row, 256) + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + n, readErr := reader.ReadRows(rows) + for i := 0; i < n; i++ { + key := buildEqualityKey(rows[i], colIndices) + result[key] = struct{}{} + } + if readErr != nil { + if readErr == io.EOF { + break + } + return nil, readErr + } + } + return result, nil +} + +// buildEqualityKey builds a composite string key from specific column values +// in a row. Each value is serialized as "kind:length:value" to avoid ambiguity +// between types (e.g., int 123 vs string "123") and to prevent collisions from +// values containing separator characters. +func buildEqualityKey(row parquet.Row, colIndices []int) string { + if len(colIndices) == 1 { + v := row[colIndices[0]] + s := v.String() + return fmt.Sprintf("%d:%d:%s", v.Kind(), len(s), s) + } + var b strings.Builder + for _, idx := range colIndices { + v := row[idx] + s := v.String() + fmt.Fprintf(&b, "%d:%d:%s", v.Kind(), len(s), s) + } + return b.String() +} + +// resolveEqualityColIndices maps Iceberg field IDs to Parquet column indices. +func resolveEqualityColIndices(pqSchema *parquet.Schema, fieldIDs []int, icebergSchema *iceberg.Schema) ([]int, error) { + if len(fieldIDs) == 0 { + return nil, nil + } + + colNameToIdx := make(map[string]int) + for i, col := range pqSchema.Columns() { + colNameToIdx[strings.Join(col, ".")] = i + } + + indices := make([]int, len(fieldIDs)) + for i, fid := range fieldIDs { + field, ok := icebergSchema.FindFieldByID(fid) + if !ok { + return nil, fmt.Errorf("field ID %d not found in iceberg schema", fid) + } + idx, ok := colNameToIdx[field.Name] + if !ok { + return nil, fmt.Errorf("column %q (field ID %d) not found in parquet schema", field.Name, fid) + } + indices[i] = idx + } + return indices, nil +} + // mergeParquetFiles reads multiple small Parquet files and merges them into -// a single Parquet file. Files are processed one at a time: each source file -// is loaded, its rows are streamed into the output writer, and then its data -// is released before the next file is loaded. This keeps peak memory -// proportional to the size of a single input file plus the output buffer, -// rather than the sum of all inputs. +// a single Parquet file, optionally filtering out rows matching position or +// equality deletes. Files are processed one at a time to keep memory usage +// proportional to a single input file plus the output buffer. 
func mergeParquetFiles( ctx context.Context, filerClient filer_pb.SeaweedFilerClient, bucketName, tablePath string, entries []iceberg.ManifestEntry, + positionDeletes map[string][]int64, + eqDeleteGroups []equalityDeleteGroup, + icebergSchema *iceberg.Schema, ) ([]byte, int64, error) { if len(entries) == 0 { return nil, 0, fmt.Errorf("no entries to merge") @@ -502,15 +883,42 @@ func mergeParquetFiles( return nil, 0, fmt.Errorf("no parquet schema found in %s", entries[0].DataFile().FilePath()) } + // Resolve equality delete column indices for each group. + type resolvedEqGroup struct { + colIndices []int + keys map[string]struct{} + } + var resolvedEqGroups []resolvedEqGroup + if len(eqDeleteGroups) > 0 && icebergSchema != nil { + for _, g := range eqDeleteGroups { + indices, resolveErr := resolveEqualityColIndices(parquetSchema, g.FieldIDs, icebergSchema) + if resolveErr != nil { + firstReader.Close() + return nil, 0, fmt.Errorf("resolve equality columns: %w", resolveErr) + } + resolvedEqGroups = append(resolvedEqGroups, resolvedEqGroup{colIndices: indices, keys: g.Keys}) + } + } + var outputBuf bytes.Buffer writer := parquet.NewWriter(&outputBuf, parquetSchema) - // drainReader streams all rows from reader into writer, then closes reader. - // source identifies the input file for error messages. var totalRows int64 rows := make([]parquet.Row, 256) + hasEqDeletes := len(resolvedEqGroups) > 0 + + // drainReader streams rows from reader into writer, filtering out deleted + // rows. source is the data file path (used for error messages and + // position delete lookups). drainReader := func(reader *parquet.Reader, source string) error { defer reader.Close() + + // Normalize source path so it matches the normalized keys in positionDeletes. + normalizedSource := normalizeIcebergPath(source, bucketName, tablePath) + posDeletes := positionDeletes[normalizedSource] + posDeleteIdx := 0 + var absolutePos int64 + for { select { case <-ctx.Done(): @@ -519,10 +927,52 @@ func mergeParquetFiles( } n, readErr := reader.ReadRows(rows) if n > 0 { - if _, writeErr := writer.WriteRows(rows[:n]); writeErr != nil { - return fmt.Errorf("write rows from %s: %w", source, writeErr) + // Filter rows if we have any deletes + if len(posDeletes) > 0 || hasEqDeletes { + writeIdx := 0 + for i := 0; i < n; i++ { + rowPos := absolutePos + int64(i) + + // Check position deletes (sorted, so advance index) + if len(posDeletes) > 0 { + for posDeleteIdx < len(posDeletes) && posDeletes[posDeleteIdx] < rowPos { + posDeleteIdx++ + } + if posDeleteIdx < len(posDeletes) && posDeletes[posDeleteIdx] == rowPos { + posDeleteIdx++ + continue // skip this row + } + } + + // Check equality deletes — each group independently + deleted := false + for _, g := range resolvedEqGroups { + key := buildEqualityKey(rows[i], g.colIndices) + if _, ok := g.keys[key]; ok { + deleted = true + break + } + } + if deleted { + continue // skip this row + } + + rows[writeIdx] = rows[i] + writeIdx++ + } + absolutePos += int64(n) + if writeIdx > 0 { + if _, writeErr := writer.WriteRows(rows[:writeIdx]); writeErr != nil { + return fmt.Errorf("write rows from %s: %w", source, writeErr) + } + totalRows += int64(writeIdx) + } + } else { + if _, writeErr := writer.WriteRows(rows[:n]); writeErr != nil { + return fmt.Errorf("write rows from %s: %w", source, writeErr) + } + totalRows += int64(n) } - totalRows += int64(n) } if readErr != nil { if readErr == io.EOF { @@ -577,6 +1027,16 @@ func mergeParquetFiles( return outputBuf.Bytes(), totalRows, nil } +// 
compactRandomSuffix returns a short random hex string for use in artifact +// filenames to prevent collisions between concurrent runs. +func compactRandomSuffix() string { + b := make([]byte, 4) + if _, err := rand.Read(b); err != nil { + return fmt.Sprintf("%x", time.Now().UnixNano()&0xFFFFFFFF) + } + return hex.EncodeToString(b) +} + // schemasEqual compares two parquet schemas structurally. func schemasEqual(a, b *parquet.Schema) bool { if a == b { diff --git a/weed/plugin/worker/iceberg/config.go b/weed/plugin/worker/iceberg/config.go index bf9c1b06e..45ab867ff 100644 --- a/weed/plugin/worker/iceberg/config.go +++ b/weed/plugin/worker/iceberg/config.go @@ -20,6 +20,17 @@ const ( defaultMinInputFiles = 5 defaultMinManifestsToRewrite = 5 defaultOperations = "all" + + // Metric keys returned by maintenance operations. + MetricFilesMerged = "files_merged" + MetricFilesWritten = "files_written" + MetricBins = "bins" + MetricSnapshotsExpired = "snapshots_expired" + MetricFilesDeleted = "files_deleted" + MetricOrphansRemoved = "orphans_removed" + MetricManifestsRewritten = "manifests_rewritten" + MetricEntriesTotal = "entries_total" + MetricDurationMs = "duration_ms" ) // Config holds parsed worker config values. @@ -32,6 +43,7 @@ type Config struct { MinInputFiles int64 MinManifestsToRewrite int64 Operations string + ApplyDeletes bool } // ParseConfig extracts an iceberg maintenance Config from plugin config values. @@ -46,6 +58,7 @@ func ParseConfig(values map[string]*plugin_pb.ConfigValue) Config { MinInputFiles: readInt64Config(values, "min_input_files", defaultMinInputFiles), MinManifestsToRewrite: readInt64Config(values, "min_manifests_to_rewrite", defaultMinManifestsToRewrite), Operations: readStringConfig(values, "operations", defaultOperations), + ApplyDeletes: readBoolConfig(values, "apply_deletes", true), } // Clamp to safe minimums using the default constants @@ -153,6 +166,35 @@ func readStringConfig(values map[string]*plugin_pb.ConfigValue, field string, fa return fallback } +// readBoolConfig reads a bool value from plugin config, with fallback. +func readBoolConfig(values map[string]*plugin_pb.ConfigValue, field string, fallback bool) bool { + if values == nil { + return fallback + } + value := values[field] + if value == nil { + return fallback + } + switch kind := value.Kind.(type) { + case *plugin_pb.ConfigValue_BoolValue: + return kind.BoolValue + case *plugin_pb.ConfigValue_StringValue: + s := strings.TrimSpace(strings.ToLower(kind.StringValue)) + if s == "true" || s == "1" || s == "yes" { + return true + } + if s == "false" || s == "0" || s == "no" { + return false + } + glog.V(1).Infof("readBoolConfig: unrecognized string value %q for field %q, using fallback %v", kind.StringValue, field, fallback) + case *plugin_pb.ConfigValue_Int64Value: + return kind.Int64Value != 0 + default: + glog.V(1).Infof("readBoolConfig: unexpected config value type %T for field %q, using fallback", value.Kind, field) + } + return fallback +} + // readInt64Config reads an int64 value from plugin config, with fallback. 
func readInt64Config(values map[string]*plugin_pb.ConfigValue, field string, fallback int64) int64 { if values == nil { diff --git a/weed/plugin/worker/iceberg/exec_test.go b/weed/plugin/worker/iceberg/exec_test.go index 963ac5248..c59edea46 100644 --- a/weed/plugin/worker/iceberg/exec_test.go +++ b/weed/plugin/worker/iceberg/exec_test.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "io" "net" "path" "sort" @@ -15,6 +16,7 @@ import ( "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/table" + "github.com/parquet-go/parquet-go" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/s3tables" @@ -420,7 +422,7 @@ func TestExpireSnapshotsExecution(t *testing.T) { Operations: "expire_snapshots", } - result, err := handler.expireSnapshots(context.Background(), client, setup.BucketName, setup.tablePath(), config) + result, _, err := handler.expireSnapshots(context.Background(), client, setup.BucketName, setup.tablePath(), config) if err != nil { t.Fatalf("expireSnapshots failed: %v", err) } @@ -460,7 +462,7 @@ func TestExpireSnapshotsNothingToExpire(t *testing.T) { MaxCommitRetries: 3, } - result, err := handler.expireSnapshots(context.Background(), client, setup.BucketName, setup.tablePath(), config) + result, _, err := handler.expireSnapshots(context.Background(), client, setup.BucketName, setup.tablePath(), config) if err != nil { t.Fatalf("expireSnapshots failed: %v", err) } @@ -508,7 +510,7 @@ func TestRemoveOrphansExecution(t *testing.T) { MaxCommitRetries: 3, } - result, err := handler.removeOrphans(context.Background(), client, setup.BucketName, setup.tablePath(), config) + result, _, err := handler.removeOrphans(context.Background(), client, setup.BucketName, setup.tablePath(), config) if err != nil { t.Fatalf("removeOrphans failed: %v", err) } @@ -550,7 +552,7 @@ func TestRemoveOrphansPreservesReferencedFiles(t *testing.T) { MaxCommitRetries: 3, } - result, err := handler.removeOrphans(context.Background(), client, setup.BucketName, setup.tablePath(), config) + result, _, err := handler.removeOrphans(context.Background(), client, setup.BucketName, setup.tablePath(), config) if err != nil { t.Fatalf("removeOrphans failed: %v", err) } @@ -642,7 +644,7 @@ func TestRewriteManifestsExecution(t *testing.T) { MaxCommitRetries: 3, } - result, err := handler.rewriteManifests(context.Background(), client, setup.BucketName, setup.tablePath(), config) + result, _, err := handler.rewriteManifests(context.Background(), client, setup.BucketName, setup.tablePath(), config) if err != nil { t.Fatalf("rewriteManifests failed: %v", err) } @@ -688,7 +690,7 @@ func TestRewriteManifestsBelowThreshold(t *testing.T) { MaxCommitRetries: 3, } - result, err := handler.rewriteManifests(context.Background(), client, setup.BucketName, setup.tablePath(), config) + result, _, err := handler.rewriteManifests(context.Background(), client, setup.BucketName, setup.tablePath(), config) if err != nil { t.Fatalf("rewriteManifests failed: %v", err) } @@ -762,11 +764,11 @@ func TestFullExecuteFlow(t *testing.T) { var opErr error switch op { case "expire_snapshots": - opResult, opErr = handler.expireSnapshots(context.Background(), client, setup.BucketName, setup.tablePath(), workerConfig) + opResult, _, opErr = handler.expireSnapshots(context.Background(), client, setup.BucketName, setup.tablePath(), workerConfig) case "remove_orphans": - opResult, opErr = handler.removeOrphans(context.Background(), client, 
setup.BucketName, setup.tablePath(), workerConfig) + opResult, _, opErr = handler.removeOrphans(context.Background(), client, setup.BucketName, setup.tablePath(), workerConfig) case "rewrite_manifests": - opResult, opErr = handler.rewriteManifests(context.Background(), client, setup.BucketName, setup.tablePath(), workerConfig) + opResult, _, opErr = handler.rewriteManifests(context.Background(), client, setup.BucketName, setup.tablePath(), workerConfig) } if opErr != nil { t.Fatalf("operation %s failed: %v", op, opErr) @@ -1015,3 +1017,791 @@ func TestMetadataVersionCAS(t *testing.T) { t.Errorf("expected version 2 after update, got %d", version) } } + +// --------------------------------------------------------------------------- +// Avro manifest content patching for tests +// --------------------------------------------------------------------------- + +// patchManifestContentToDeletes performs a binary patch on an Avro manifest +// file to change the "content" metadata value from "data" to "deletes". +// This workaround is needed because iceberg-go's WriteManifest API always +// sets content="data" and provides no way to create delete manifests. +// The function validates the pattern was found (bytes.Equal check) and fails +// fast if not, so breakage from encoding changes is caught immediately. +// +// In Avro OCF encoding, strings are stored as zigzag-encoded length + bytes. +// "content" (7 chars) = \x0e + "content", "data" (4 chars) = \x08 + "data", +// "deletes" (7 chars) = \x0e + "deletes". +func patchManifestContentToDeletes(t *testing.T, manifestBytes []byte) []byte { + t.Helper() + + // Pattern: zigzag(7)="content" zigzag(4)="data" + old := append([]byte{0x0e}, []byte("content")...) + old = append(old, 0x08) + old = append(old, []byte("data")...) + + // Replacement: zigzag(7)="content" zigzag(7)="deletes" + new := append([]byte{0x0e}, []byte("content")...) + new = append(new, 0x0e) + new = append(new, []byte("deletes")...) + + result := bytes.Replace(manifestBytes, old, new, 1) + if bytes.Equal(result, manifestBytes) { + t.Fatal("patchManifestContentToDeletes: pattern not found in manifest bytes") + } + return result +} + +// --------------------------------------------------------------------------- +// End-to-end compaction tests with deletes +// --------------------------------------------------------------------------- + +// writeTestParquetFile creates a Parquet file with id/name columns in the fake filer. +func writeTestParquetFile(t *testing.T, fs *fakeFilerServer, dir, name string, rows []struct { + ID int64 + Name string +}) []byte { + t.Helper() + type dataRow struct { + ID int64 `parquet:"id"` + Name string `parquet:"name"` + } + var buf bytes.Buffer + w := parquet.NewWriter(&buf, parquet.SchemaOf(new(dataRow))) + for _, r := range rows { + if err := w.Write(&dataRow{r.ID, r.Name}); err != nil { + t.Fatalf("write parquet row: %v", err) + } + } + if err := w.Close(); err != nil { + t.Fatalf("close parquet writer: %v", err) + } + data := buf.Bytes() + fs.putEntry(dir, name, &filer_pb.Entry{ + Name: name, + Content: data, + Attributes: &filer_pb.FuseAttributes{Mtime: time.Now().Unix(), FileSize: uint64(len(data))}, + }) + return data +} + +// populateTableWithDeleteFiles sets up a table with data files and delete manifest(s) +// for compaction testing. Returns the table metadata. 
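+// Pass nil for posDeleteFiles or eqDeleteFiles to omit that kind of delete.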
+func populateTableWithDeleteFiles( + t *testing.T, + fs *fakeFilerServer, + setup tableSetup, + dataFiles []struct { + Name string + Rows []struct { + ID int64 + Name string + } + }, + posDeleteFiles []struct { + Name string + Rows []struct { + FilePath string + Pos int64 + } + }, + eqDeleteFiles []struct { + Name string + FieldIDs []int + Rows []struct { + ID int64 + Name string + } + }, +) table.Metadata { + t.Helper() + + schema := newTestSchema() + spec := *iceberg.UnpartitionedSpec + + meta, err := table.NewMetadata(schema, &spec, table.UnsortedSortOrder, "s3://"+setup.BucketName+"/"+setup.tablePath(), nil) + if err != nil { + t.Fatalf("create metadata: %v", err) + } + + bucketsPath := s3tables.TablesPath + bucketPath := path.Join(bucketsPath, setup.BucketName) + nsPath := path.Join(bucketPath, setup.Namespace) + tableFilerPath := path.Join(nsPath, setup.TableName) + metaDir := path.Join(tableFilerPath, "metadata") + dataDir := path.Join(tableFilerPath, "data") + + version := meta.Version() + + // Write data files + var dataManifestEntries []iceberg.ManifestEntry + for _, df := range dataFiles { + data := writeTestParquetFile(t, fs, dataDir, df.Name, df.Rows) + dfb, err := iceberg.NewDataFileBuilder(spec, iceberg.EntryContentData, "data/"+df.Name, iceberg.ParquetFile, map[int]any{}, nil, nil, int64(len(df.Rows)), int64(len(data))) + if err != nil { + t.Fatalf("build data file %s: %v", df.Name, err) + } + snapID := int64(1) + dataManifestEntries = append(dataManifestEntries, iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, dfb.Build())) + } + + // Write data manifest + var dataManifestBuf bytes.Buffer + dataManifestName := "data-manifest-1.avro" + dataMf, err := iceberg.WriteManifest(path.Join("metadata", dataManifestName), &dataManifestBuf, version, spec, schema, 1, dataManifestEntries) + if err != nil { + t.Fatalf("write data manifest: %v", err) + } + fs.putEntry(metaDir, dataManifestName, &filer_pb.Entry{ + Name: dataManifestName, Content: dataManifestBuf.Bytes(), + Attributes: &filer_pb.FuseAttributes{Mtime: time.Now().Unix(), FileSize: uint64(dataManifestBuf.Len())}, + }) + + allManifests := []iceberg.ManifestFile{dataMf} + + // Write position delete files and manifests + if len(posDeleteFiles) > 0 { + var posDeleteEntries []iceberg.ManifestEntry + for _, pdf := range posDeleteFiles { + type posRow struct { + FilePath string `parquet:"file_path"` + Pos int64 `parquet:"pos"` + } + var buf bytes.Buffer + w := parquet.NewWriter(&buf, parquet.SchemaOf(new(posRow))) + for _, r := range pdf.Rows { + if err := w.Write(&posRow{r.FilePath, r.Pos}); err != nil { + t.Fatalf("write pos delete: %v", err) + } + } + if err := w.Close(); err != nil { + t.Fatalf("close pos delete: %v", err) + } + fs.putEntry(dataDir, pdf.Name, &filer_pb.Entry{ + Name: pdf.Name, Content: buf.Bytes(), + Attributes: &filer_pb.FuseAttributes{Mtime: time.Now().Unix(), FileSize: uint64(buf.Len())}, + }) + dfb, err := iceberg.NewDataFileBuilder(spec, iceberg.EntryContentPosDeletes, "data/"+pdf.Name, iceberg.ParquetFile, map[int]any{}, nil, nil, int64(len(pdf.Rows)), int64(buf.Len())) + if err != nil { + t.Fatalf("build pos delete file: %v", err) + } + snapID := int64(1) + posDeleteEntries = append(posDeleteEntries, iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, dfb.Build())) + } + + // WriteManifest always sets content="data", so we patch the Avro + // metadata to "deletes" and build a ManifestFile with the right content type. 
+ var posManifestBuf bytes.Buffer + posManifestName := "pos-delete-manifest-1.avro" + posManifestPath := path.Join("metadata", posManifestName) + _, err := iceberg.WriteManifest(posManifestPath, &posManifestBuf, version, spec, schema, 1, posDeleteEntries) + if err != nil { + t.Fatalf("write pos delete manifest: %v", err) + } + patchedBytes := patchManifestContentToDeletes(t, posManifestBuf.Bytes()) + fs.putEntry(metaDir, posManifestName, &filer_pb.Entry{ + Name: posManifestName, Content: patchedBytes, + Attributes: &filer_pb.FuseAttributes{Mtime: time.Now().Unix(), FileSize: uint64(len(patchedBytes))}, + }) + posMf := iceberg.NewManifestFile(version, posManifestPath, int64(len(patchedBytes)), int32(spec.ID()), 1). + Content(iceberg.ManifestContentDeletes). + AddedFiles(int32(len(posDeleteEntries))). + AddedRows(int64(len(posDeleteFiles[0].Rows))). + Build() + allManifests = append(allManifests, posMf) + } + + // Write equality delete files and manifests + if len(eqDeleteFiles) > 0 { + var eqDeleteEntries []iceberg.ManifestEntry + for _, edf := range eqDeleteFiles { + type eqRow struct { + ID int64 `parquet:"id"` + Name string `parquet:"name"` + } + var buf bytes.Buffer + w := parquet.NewWriter(&buf, parquet.SchemaOf(new(eqRow))) + for _, r := range edf.Rows { + if err := w.Write(&eqRow{r.ID, r.Name}); err != nil { + t.Fatalf("write eq delete: %v", err) + } + } + if err := w.Close(); err != nil { + t.Fatalf("close eq delete: %v", err) + } + fs.putEntry(dataDir, edf.Name, &filer_pb.Entry{ + Name: edf.Name, Content: buf.Bytes(), + Attributes: &filer_pb.FuseAttributes{Mtime: time.Now().Unix(), FileSize: uint64(buf.Len())}, + }) + dfb, err := iceberg.NewDataFileBuilder(spec, iceberg.EntryContentEqDeletes, "data/"+edf.Name, iceberg.ParquetFile, map[int]any{}, nil, nil, int64(len(edf.Rows)), int64(buf.Len())) + if err != nil { + t.Fatalf("build eq delete file: %v", err) + } + dfb.EqualityFieldIDs(edf.FieldIDs) + snapID := int64(1) + eqDeleteEntries = append(eqDeleteEntries, iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, dfb.Build())) + } + + var eqManifestBuf bytes.Buffer + eqManifestName := "eq-delete-manifest-1.avro" + eqManifestPath := path.Join("metadata", eqManifestName) + _, err := iceberg.WriteManifest(eqManifestPath, &eqManifestBuf, version, spec, schema, 1, eqDeleteEntries) + if err != nil { + t.Fatalf("write eq delete manifest: %v", err) + } + patchedBytes := patchManifestContentToDeletes(t, eqManifestBuf.Bytes()) + fs.putEntry(metaDir, eqManifestName, &filer_pb.Entry{ + Name: eqManifestName, Content: patchedBytes, + Attributes: &filer_pb.FuseAttributes{Mtime: time.Now().Unix(), FileSize: uint64(len(patchedBytes))}, + }) + eqMf := iceberg.NewManifestFile(version, eqManifestPath, int64(len(patchedBytes)), int32(spec.ID()), 1). + Content(iceberg.ManifestContentDeletes). + AddedFiles(int32(len(eqDeleteEntries))). + AddedRows(int64(len(eqDeleteFiles[0].Rows))). 
+		Build()
+		allManifests = append(allManifests, eqMf)
+	}
+
+	// Write manifest list
+	var mlBuf bytes.Buffer
+	seqNum := int64(1)
+	if err := iceberg.WriteManifestList(version, &mlBuf, 1, nil, &seqNum, 0, allManifests); err != nil {
+		t.Fatalf("write manifest list: %v", err)
+	}
+	fs.putEntry(metaDir, "snap-1.avro", &filer_pb.Entry{
+		Name: "snap-1.avro", Content: mlBuf.Bytes(),
+		Attributes: &filer_pb.FuseAttributes{Mtime: time.Now().Unix(), FileSize: uint64(mlBuf.Len())},
+	})
+
+	// Build final metadata with snapshot
+	now := time.Now().UnixMilli()
+	snap := table.Snapshot{SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"}
+	meta = buildTestMetadata(t, []table.Snapshot{snap})
+
+	// Register table structure
+	fullMetadataJSON, _ := json.Marshal(meta)
+	internalMeta := map[string]interface{}{
+		"metadataVersion": 1,
+		"metadata":        map[string]interface{}{"fullMetadata": json.RawMessage(fullMetadataJSON)},
+	}
+	xattr, _ := json.Marshal(internalMeta)
+
+	fs.putEntry(bucketsPath, setup.BucketName, &filer_pb.Entry{
+		Name: setup.BucketName, IsDirectory: true,
+		Extended: map[string][]byte{s3tables.ExtendedKeyTableBucket: []byte("true")},
+	})
+	fs.putEntry(bucketPath, setup.Namespace, &filer_pb.Entry{Name: setup.Namespace, IsDirectory: true})
+	fs.putEntry(nsPath, setup.TableName, &filer_pb.Entry{
+		Name: setup.TableName, IsDirectory: true,
+		Extended: map[string][]byte{s3tables.ExtendedKeyMetadata: xattr},
+	})
+
+	return meta
+}
+
+func TestCompactDataFilesMetrics(t *testing.T) {
+	fs, client := startFakeFiler(t)
+
+	setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"}
+	populateTableWithDeleteFiles(t, fs, setup,
+		[]struct {
+			Name string
+			Rows []struct {
+				ID   int64
+				Name string
+			}
+		}{
+			{"d1.parquet", []struct {
+				ID   int64
+				Name string
+			}{{1, "a"}, {2, "b"}}},
+			{"d2.parquet", []struct {
+				ID   int64
+				Name string
+			}{{3, "c"}}},
+		},
+		nil, nil,
+	)
+
+	handler := NewHandler(nil)
+	config := Config{
+		TargetFileSizeBytes: 256 * 1024 * 1024,
+		MinInputFiles:       2,
+		MaxCommitRetries:    3,
+		ApplyDeletes:        true,
+	}
+
+	// Track progress callbacks
+	var progressCalls []int
+	result, metrics, err := handler.compactDataFiles(context.Background(), client, setup.BucketName, setup.tablePath(), config, func(binIdx, totalBins int) {
+		progressCalls = append(progressCalls, binIdx)
+	})
+	if err != nil {
+		t.Fatalf("compactDataFiles: %v", err)
+	}
+
+	if !strings.Contains(result, "compacted") {
+		t.Errorf("expected compaction result, got %q", result)
+	}
+
+	// Verify metrics
+	if metrics == nil {
+		t.Fatal("expected non-nil metrics")
+	}
+	if metrics[MetricFilesMerged] != 2 {
+		t.Errorf("expected files_merged=2, got %d", metrics[MetricFilesMerged])
+	}
+	if metrics[MetricFilesWritten] != 1 {
+		t.Errorf("expected files_written=1, got %d", metrics[MetricFilesWritten])
+	}
+	if metrics[MetricBins] != 1 {
+		t.Errorf("expected bins=1, got %d", metrics[MetricBins])
+	}
+	if metrics[MetricDurationMs] < 0 {
+		t.Errorf("expected non-negative duration_ms, got %d", metrics[MetricDurationMs])
+	}
+
+	// Verify progress callback was invoked
+	if len(progressCalls) != 1 {
+		t.Errorf("expected 1 progress call, got %d", len(progressCalls))
+	}
+}
+
+func TestExpireSnapshotsMetrics(t *testing.T) {
+	fs, client := startFakeFiler(t)
+
+	now := time.Now().UnixMilli()
+	setup := tableSetup{
+		BucketName: "test-bucket",
+		Namespace:  "ns",
+		TableName:  "tbl",
+		Snapshots: []table.Snapshot{
+			{SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"},
+			{SnapshotID: 2, TimestampMs: now + 1, ManifestList: "metadata/snap-2.avro"},
+			{SnapshotID: 3, TimestampMs: now + 2, ManifestList: "metadata/snap-3.avro"},
+		},
+	}
+	populateTable(t, fs, setup)
+
+	handler := NewHandler(nil)
+	config := Config{
+		SnapshotRetentionHours: 0,
+		MaxSnapshotsToKeep:     1,
+		MaxCommitRetries:       3,
+	}
+
+	_, metrics, err := handler.expireSnapshots(context.Background(), client, setup.BucketName, setup.tablePath(), config)
+	if err != nil {
+		t.Fatalf("expireSnapshots: %v", err)
+	}
+
+	if metrics == nil {
+		t.Fatal("expected non-nil metrics")
+	}
+	if metrics[MetricSnapshotsExpired] == 0 {
+		t.Error("expected snapshots_expired > 0")
+	}
+	if metrics[MetricDurationMs] < 0 {
+		t.Errorf("expected non-negative duration_ms, got %d", metrics[MetricDurationMs])
+	}
+}
+
+func TestExecuteCompletionOutputValues(t *testing.T) {
+	fs, client := startFakeFiler(t)
+
+	now := time.Now().UnixMilli()
+	setup := tableSetup{
+		BucketName: "test-bucket",
+		Namespace:  "ns",
+		TableName:  "tbl",
+		Snapshots: []table.Snapshot{
+			{SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"},
+			{SnapshotID: 2, TimestampMs: now + 1, ManifestList: "metadata/snap-2.avro"},
+			{SnapshotID: 3, TimestampMs: now + 2, ManifestList: "metadata/snap-3.avro"},
+		},
+	}
+	populateTable(t, fs, setup)
+
+	handler := NewHandler(nil)
+	config := Config{
+		SnapshotRetentionHours: 0,
+		MaxSnapshotsToKeep:     1,
+		MaxCommitRetries:       3,
+	}
+
+	_, metrics, err := handler.expireSnapshots(context.Background(), client, setup.BucketName, setup.tablePath(), config)
+	if err != nil {
+		t.Fatalf("expireSnapshots: %v", err)
+	}
+
+	// Verify metrics have the expected keys
+	if _, ok := metrics[MetricSnapshotsExpired]; !ok {
+		t.Error("expected 'snapshots_expired' key in metrics")
+	}
+	if _, ok := metrics[MetricFilesDeleted]; !ok {
+		t.Error("expected 'files_deleted' key in metrics")
+	}
+	if _, ok := metrics[MetricDurationMs]; !ok {
+		t.Error("expected 'duration_ms' key in metrics")
+	}
+}
+
+func TestCompactDataFilesWithPositionDeletes(t *testing.T) {
+	fs, client := startFakeFiler(t)
+
+	setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"}
+	populateTableWithDeleteFiles(t, fs, setup,
+		// 2 small data files (to meet min_input_files=2)
+		[]struct {
+			Name string
+			Rows []struct {
+				ID   int64
+				Name string
+			}
+		}{
+			{"d1.parquet", []struct {
+				ID   int64
+				Name string
+			}{{1, "alice"}, {2, "bob"}, {3, "charlie"}}},
+			{"d2.parquet", []struct {
+				ID   int64
+				Name string
+			}{{4, "dave"}, {5, "eve"}}},
+		},
+		// Position deletes: delete row 1 (bob) from d1
+		[]struct {
+			Name string
+			Rows []struct {
+				FilePath string
+				Pos      int64
+			}
+		}{
+			{"pd1.parquet", []struct {
+				FilePath string
+				Pos      int64
+			}{{"data/d1.parquet", 1}}},
+		},
+		nil, // no equality deletes
+	)
+
+	handler := NewHandler(nil)
+	config := Config{
+		TargetFileSizeBytes: 256 * 1024 * 1024,
+		MinInputFiles:       2,
+		MaxCommitRetries:    3,
+		ApplyDeletes:        true,
+	}
+
+	result, _, err := handler.compactDataFiles(context.Background(), client, setup.BucketName, setup.tablePath(), config, nil)
+	if err != nil {
+		t.Fatalf("compactDataFiles: %v", err)
+	}
+
+	if !strings.Contains(result, "compacted") {
+		t.Errorf("expected compaction result, got %q", result)
+	}
+	t.Logf("result: %s", result)
+
+	// Verify: read the merged output and count rows.
+	// The merged file should have 4 rows (5 total - 1 position delete).
+	dataDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "data")
+	entries := fs.listDir(dataDir)
+	var mergedContent []byte
+	for _, e := range entries {
+		if strings.HasPrefix(e.Name, "compact-") {
+			mergedContent = e.Content
+			break
+		}
+	}
+	if mergedContent == nil {
+		t.Fatal("no merged file found")
+	}
+
+	reader := parquet.NewReader(bytes.NewReader(mergedContent))
+	defer reader.Close()
+	type row struct {
+		ID   int64  `parquet:"id"`
+		Name string `parquet:"name"`
+	}
+	var outputRows []row
+	for {
+		var r row
+		if err := reader.Read(&r); err != nil {
+			if err == io.EOF {
+				break
+			}
+			t.Fatalf("read: %v", err)
+		}
+		outputRows = append(outputRows, r)
+	}
+
+	if len(outputRows) != 4 {
+		t.Errorf("expected 4 rows (5 - 1 pos delete), got %d", len(outputRows))
+	}
+	for _, r := range outputRows {
+		if r.Name == "bob" {
+			t.Error("bob should have been deleted by position delete")
+		}
+	}
+}
+
+func TestCompactDataFilesWithEqualityDeletes(t *testing.T) {
+	fs, client := startFakeFiler(t)
+
+	setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"}
+	populateTableWithDeleteFiles(t, fs, setup,
+		[]struct {
+			Name string
+			Rows []struct {
+				ID   int64
+				Name string
+			}
+		}{
+			{"d1.parquet", []struct {
+				ID   int64
+				Name string
+			}{{1, "alice"}, {2, "bob"}}},
+			{"d2.parquet", []struct {
+				ID   int64
+				Name string
+			}{{3, "charlie"}, {4, "dave"}}},
+		},
+		nil, // no position deletes
+		// Equality deletes: delete rows where name="bob" or name="dave"
+		[]struct {
+			Name     string
+			FieldIDs []int
+			Rows     []struct {
+				ID   int64
+				Name string
+			}
+		}{
+			{"ed1.parquet", []int{2}, []struct {
+				ID   int64
+				Name string
+			}{{0, "bob"}, {0, "dave"}}},
+		},
+	)
+
+	handler := NewHandler(nil)
+	config := Config{
+		TargetFileSizeBytes: 256 * 1024 * 1024,
+		MinInputFiles:       2,
+		MaxCommitRetries:    3,
+		ApplyDeletes:        true,
+	}
+
+	result, _, err := handler.compactDataFiles(context.Background(), client, setup.BucketName, setup.tablePath(), config, nil)
+	if err != nil {
+		t.Fatalf("compactDataFiles: %v", err)
+	}
+
+	if !strings.Contains(result, "compacted") {
+		t.Errorf("expected compaction result, got %q", result)
+	}
+	t.Logf("result: %s", result)
+
+	// Verify merged output
+	dataDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "data")
+	entries := fs.listDir(dataDir)
+	var mergedContent []byte
+	for _, e := range entries {
+		if strings.HasPrefix(e.Name, "compact-") {
+			mergedContent = e.Content
+			break
+		}
+	}
+	if mergedContent == nil {
+		t.Fatal("no merged file found")
+	}
+
+	reader := parquet.NewReader(bytes.NewReader(mergedContent))
+	defer reader.Close()
+	type row struct {
+		ID   int64  `parquet:"id"`
+		Name string `parquet:"name"`
+	}
+	var outputRows []row
+	for {
+		var r row
+		if err := reader.Read(&r); err != nil {
+			if err == io.EOF {
+				break
+			}
+			t.Fatalf("read: %v", err)
+		}
+		outputRows = append(outputRows, r)
+	}
+
+	if len(outputRows) != 2 {
+		t.Errorf("expected 2 rows (4 - 2 eq deletes), got %d", len(outputRows))
+	}
+	for _, r := range outputRows {
+		if r.Name == "bob" || r.Name == "dave" {
+			t.Errorf("row %q should have been deleted by equality delete", r.Name)
+		}
+	}
+}
+
+func TestCompactDataFilesApplyDeletesDisabled(t *testing.T) {
+	fs, client := startFakeFiler(t)
+
+	setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"}
+	populateTableWithDeleteFiles(t, fs, setup,
+		[]struct {
+			Name string
+			Rows []struct {
+				ID   int64
+				Name string
+			}
+		}{
+			{"d1.parquet", []struct {
+				ID   int64
+				Name string
+			}{{1, "a"}, {2, "b"}}},
+			{"d2.parquet", []struct {
+				ID   int64
+				Name string
+			}{{3, "c"}}},
+		},
+		[]struct {
+			Name string
+			Rows []struct {
+				FilePath string
+				Pos      int64
+			}
+		}{
+			{"pd1.parquet", []struct {
+				FilePath string
+				Pos      int64
+			}{{"data/d1.parquet", 0}}},
+		},
+		nil,
+	)
+
+	handler := NewHandler(nil)
+	config := Config{
+		TargetFileSizeBytes: 256 * 1024 * 1024,
+		MinInputFiles:       2,
+		MaxCommitRetries:    3,
+		ApplyDeletes:        false, // disabled
+	}
+
+	result, _, err := handler.compactDataFiles(context.Background(), client, setup.BucketName, setup.tablePath(), config, nil)
+	if err != nil {
+		t.Fatalf("compactDataFiles: %v", err)
+	}
+
+	if !strings.Contains(result, "skipped") {
+		t.Errorf("expected skip when apply_deletes=false, got %q", result)
+	}
+}
+
+func TestCompactDataFilesWithMixedDeletes(t *testing.T) {
+	fs, client := startFakeFiler(t)
+
+	setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"}
+	populateTableWithDeleteFiles(t, fs, setup,
+		[]struct {
+			Name string
+			Rows []struct {
+				ID   int64
+				Name string
+			}
+		}{
+			{"d1.parquet", []struct {
+				ID   int64
+				Name string
+			}{{1, "alice"}, {2, "bob"}, {3, "charlie"}}},
+			{"d2.parquet", []struct {
+				ID   int64
+				Name string
+			}{{4, "dave"}, {5, "eve"}}},
+		},
+		// Position delete: row 0 (alice) from d1
+		[]struct {
+			Name string
+			Rows []struct {
+				FilePath string
+				Pos      int64
+			}
+		}{
+			{"pd1.parquet", []struct {
+				FilePath string
+				Pos      int64
+			}{{"data/d1.parquet", 0}}},
+		},
+		// Equality delete: name="eve"
+		[]struct {
+			Name     string
+			FieldIDs []int
+			Rows     []struct {
+				ID   int64
+				Name string
+			}
+		}{
+			{"ed1.parquet", []int{2}, []struct {
+				ID   int64
+				Name string
+			}{{0, "eve"}}},
+		},
+	)
+
+	handler := NewHandler(nil)
+	config := Config{
+		TargetFileSizeBytes: 256 * 1024 * 1024,
+		MinInputFiles:       2,
+		MaxCommitRetries:    3,
+		ApplyDeletes:        true,
+	}
+
+	result, _, err := handler.compactDataFiles(context.Background(), client, setup.BucketName, setup.tablePath(), config, nil)
+	if err != nil {
+		t.Fatalf("compactDataFiles: %v", err)
+	}
+
+	if !strings.Contains(result, "compacted") {
+		t.Errorf("expected compaction, got %q", result)
+	}
+
+	// Verify: 5 total - 1 pos delete (alice) - 1 eq delete (eve) = 3 rows
+	dataDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "data")
+	entries := fs.listDir(dataDir)
+	var mergedContent []byte
+	for _, e := range entries {
+		if strings.HasPrefix(e.Name, "compact-") {
+			mergedContent = e.Content
+			break
+		}
+	}
+	if mergedContent == nil {
+		t.Fatal("no merged file found")
+	}
+
+	reader := parquet.NewReader(bytes.NewReader(mergedContent))
+	defer reader.Close()
+	type row struct {
+		ID   int64  `parquet:"id"`
+		Name string `parquet:"name"`
+	}
+	var outputRows []row
+	for {
+		var r row
+		if err := reader.Read(&r); err != nil {
+			if err == io.EOF {
+				break
+			}
+			t.Fatalf("read: %v", err)
+		}
+		outputRows = append(outputRows, r)
+	}
+
+	if len(outputRows) != 3 {
+		t.Errorf("expected 3 rows (5 - 1 pos - 1 eq), got %d", len(outputRows))
+	}
+	for _, r := range outputRows {
+		if r.Name == "alice" || r.Name == "eve" {
+			t.Errorf("%q should have been deleted", r.Name)
+		}
+	}
+}
diff --git a/weed/plugin/worker/iceberg/handler.go b/weed/plugin/worker/iceberg/handler.go
index 37cbf6860..5ef03c185 100644
--- a/weed/plugin/worker/iceberg/handler.go
+++ b/weed/plugin/worker/iceberg/handler.go
@@ -152,6 +152,13 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor {
 				Widget:   plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
 				MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 2}},
 			},
+			{
+				Name:        "apply_deletes",
+				Label:       "Apply Deletes",
+				Description: "When true, compaction applies position and equality deletes to data files. When false, tables with delete manifests are skipped.",
+				FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_BOOL,
+				Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_TOGGLE,
+			},
 		},
 	},
 	{
@@ -217,6 +224,7 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor {
 			"orphan_older_than_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultOrphanOlderThanHours}},
 			"max_commit_retries":      {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxCommitRetries}},
 			"operations":              {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: defaultOperations}},
+			"apply_deletes":           {Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: true}},
 		},
 	},
 	AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{
@@ -238,6 +246,7 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor {
 		"orphan_older_than_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultOrphanOlderThanHours}},
 		"max_commit_retries":      {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxCommitRetries}},
 		"operations":              {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: defaultOperations}},
+		"apply_deletes":           {Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: true}},
 	},
 	}
 }
@@ -397,6 +406,7 @@ func (h *Handler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequ
 	var lastErr error
 	totalOps := len(ops)
 	completedOps := 0
+	allMetrics := make(map[string]int64)
 
 	// Execute operations in correct Iceberg maintenance order:
 	// expire_snapshots → remove_orphans → rewrite_manifests
@@ -424,21 +434,37 @@ func (h *Handler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequ
 		var opResult string
 		var opErr error
+		var opMetrics map[string]int64
 		switch op {
 		case "compact":
-			opResult, opErr = h.compactDataFiles(ctx, filerClient, bucketName, tablePath, workerConfig)
+			opResult, opMetrics, opErr = h.compactDataFiles(ctx, filerClient, bucketName, tablePath, workerConfig, func(binIdx, totalBins int) {
+				binProgress := progress + float64(binIdx+1)/float64(totalBins)*(100.0/float64(totalOps))
+				_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
+					JobId:           request.Job.JobId,
+					JobType:         canonicalJobType,
+					State:           plugin_pb.JobState_JOB_STATE_RUNNING,
+					ProgressPercent: binProgress,
+					Stage:           fmt.Sprintf("compact bin %d/%d", binIdx+1, totalBins),
+					Message:         fmt.Sprintf("compacting bin %d of %d", binIdx+1, totalBins),
+				})
+			})
 		case "expire_snapshots":
-			opResult, opErr = h.expireSnapshots(ctx, filerClient, bucketName, tablePath, workerConfig)
+			opResult, opMetrics, opErr = h.expireSnapshots(ctx, filerClient, bucketName, tablePath, workerConfig)
 		case "remove_orphans":
-			opResult, opErr = h.removeOrphans(ctx, filerClient, bucketName, tablePath, workerConfig)
+			opResult, opMetrics, opErr = h.removeOrphans(ctx, filerClient, bucketName, tablePath, workerConfig)
 		case "rewrite_manifests":
-			opResult, opErr = h.rewriteManifests(ctx, filerClient, bucketName, tablePath, workerConfig)
+			opResult, opMetrics, opErr = h.rewriteManifests(ctx, filerClient, bucketName, tablePath, workerConfig)
 		default:
 			glog.Warningf("unknown maintenance operation: %s", op)
 			continue
 		}
 
+		// Accumulate per-operation metrics under keys namespaced by the
+		// operation name, e.g. "compact.files_merged".
+		for k, v := range opMetrics {
+			allMetrics[op+"."+k] = v
+		}
+
 		completedOps++
 		if opErr != nil {
 			glog.Warningf("iceberg maintenance %s failed for %s/%s/%s: %v", op, bucketName, namespace, tableName, opErr)
@@ -452,6 +478,16 @@ func (h *Handler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequ
 	resultSummary := strings.Join(results, "; ")
 	success := lastErr == nil
+
+	// Build OutputValues with base table info + per-operation metrics
+	outputValues := map[string]*plugin_pb.ConfigValue{
+		"bucket":    {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: bucketName}},
+		"namespace": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: namespace}},
+		"table":     {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: tableName}},
+	}
+	for k, v := range allMetrics {
+		outputValues[k] = &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: v}}
+	}
+
 	return sender.SendCompleted(&plugin_pb.JobCompleted{
 		JobId:   request.Job.JobId,
 		JobType: canonicalJobType,
@@ -463,12 +499,8 @@ func (h *Handler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequ
 			return ""
 		}(),
 		Result: &plugin_pb.JobResult{
-			Summary: resultSummary,
-			OutputValues: map[string]*plugin_pb.ConfigValue{
-				"bucket":    {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: bucketName}},
-				"namespace": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: namespace}},
-				"table":     {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: tableName}},
-			},
+			Summary:      resultSummary,
+			OutputValues: outputValues,
 		},
 		Activities: []*plugin_pb.ActivityEvent{
 			pluginworker.BuildExecutorActivity("completed", resultSummary),
diff --git a/weed/plugin/worker/iceberg/handler_test.go b/weed/plugin/worker/iceberg/handler_test.go
index db7f5aa07..83e9a3fa0 100644
--- a/weed/plugin/worker/iceberg/handler_test.go
+++ b/weed/plugin/worker/iceberg/handler_test.go
@@ -2,13 +2,18 @@ package iceberg
 
 import (
 	"bytes"
+	"context"
 	"fmt"
+	"io"
 	"path"
 	"testing"
 	"time"
 
 	"github.com/apache/iceberg-go"
 	"github.com/apache/iceberg-go/table"
+	"github.com/parquet-go/parquet-go"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
 )
 
 func TestParseConfig(t *testing.T) {
@@ -572,14 +577,19 @@ type testEntrySpec struct {
 	path      string
 	size      int64
 	partition map[int]any
+	specID    int32 // partition spec ID; 0 uses UnpartitionedSpec
 }
 
+// makeTestEntries creates manifest entries using UnpartitionedSpec (spec ID 0).
+// The specID field in testEntrySpec is ignored here; for multi-spec testing,
+// use makeTestEntriesWithSpec instead.
 func makeTestEntries(t *testing.T, specs []testEntrySpec) []iceberg.ManifestEntry {
 	t.Helper()
 	entries := make([]iceberg.ManifestEntry, 0, len(specs))
 	for _, spec := range specs {
+		partSpec := *iceberg.UnpartitionedSpec
 		dfBuilder, err := iceberg.NewDataFileBuilder(
-			*iceberg.UnpartitionedSpec,
+			partSpec,
 			iceberg.EntryContentData,
 			spec.path,
 			iceberg.ParquetFile,
@@ -598,6 +608,471 @@ func makeTestEntries(t *testing.T, specs []testEntrySpec) []iceberg.ManifestEntr
 	return entries
 }
 
+// makeTestEntriesWithSpec creates manifest entries using specific partition specs.
+// Each spec in the specs slice can specify a specID; the entry is built using
+// a PartitionSpec with that ID.
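+// An illustrative call, mirroring TestBuildCompactionBinsMultipleSpecs below
+// (the partSpecs map must cover every specID referenced by the entries):
+//
+//	partSpecs := map[int32]iceberg.PartitionSpec{0: iceberg.NewPartitionSpecID(0)}
+//	entries := makeTestEntriesWithSpec(t, []testEntrySpec{
+//		{path: "data/f1.parquet", size: 1024, partition: map[int]any{}, specID: 0},
+//	}, partSpecs)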
+func makeTestEntriesWithSpec(t *testing.T, specs []testEntrySpec, partSpecs map[int32]iceberg.PartitionSpec) []iceberg.ManifestEntry {
+	t.Helper()
+	entries := make([]iceberg.ManifestEntry, 0, len(specs))
+	for _, s := range specs {
+		ps, ok := partSpecs[s.specID]
+		if !ok {
+			t.Fatalf("spec ID %d not found in partSpecs map", s.specID)
+		}
+		dfBuilder, err := iceberg.NewDataFileBuilder(
+			ps,
+			iceberg.EntryContentData,
+			s.path,
+			iceberg.ParquetFile,
+			s.partition,
+			nil, nil,
+			1,
+			s.size,
+		)
+		if err != nil {
+			t.Fatalf("failed to build data file %s: %v", s.path, err)
+		}
+		snapID := int64(1)
+		entry := iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, dfBuilder.Build())
+		entries = append(entries, entry)
+	}
+	return entries
+}
+
+func TestBuildCompactionBinsMultipleSpecs(t *testing.T) {
+	targetSize := int64(256 * 1024 * 1024)
+	minFiles := 2
+
+	// Two partition specs with different IDs
+	spec0 := iceberg.NewPartitionSpecID(0)
+	spec1 := iceberg.NewPartitionSpecID(1)
+	partSpecs := map[int32]iceberg.PartitionSpec{
+		0: spec0,
+		1: spec1,
+	}
+
+	// Entries from two different specs with the same partition values should
+	// go to separate bins.
+	entries := makeTestEntriesWithSpec(t, []testEntrySpec{
+		{path: "data/s0-f1.parquet", size: 1024, partition: map[int]any{}, specID: 0},
+		{path: "data/s0-f2.parquet", size: 2048, partition: map[int]any{}, specID: 0},
+		{path: "data/s1-f1.parquet", size: 1024, partition: map[int]any{}, specID: 1},
+		{path: "data/s1-f2.parquet", size: 2048, partition: map[int]any{}, specID: 1},
+	}, partSpecs)
+
+	bins := buildCompactionBins(entries, targetSize, minFiles)
+	if len(bins) != 2 {
+		t.Fatalf("expected 2 bins (one per spec), got %d", len(bins))
+	}
+
+	// Verify each bin has entries from only one spec
+	specsSeen := make(map[int32]bool)
+	for _, bin := range bins {
+		specsSeen[bin.SpecID] = true
+		for _, entry := range bin.Entries {
+			if entry.DataFile().SpecID() != bin.SpecID {
+				t.Errorf("bin specID=%d contains entry with specID=%d", bin.SpecID, entry.DataFile().SpecID())
+			}
+		}
+	}
+	if !specsSeen[0] || !specsSeen[1] {
+		t.Errorf("expected bins for spec 0 and 1, got specs %v", specsSeen)
+	}
+}
+
+func TestBuildCompactionBinsSingleSpec(t *testing.T) {
+	// Verify existing behavior: all entries with spec 0 still group correctly.
+	targetSize := int64(256 * 1024 * 1024)
+	minFiles := 2
+
+	spec0 := iceberg.NewPartitionSpecID(0)
+	partSpecs := map[int32]iceberg.PartitionSpec{0: spec0}
+
+	entries := makeTestEntriesWithSpec(t, []testEntrySpec{
+		{path: "data/f1.parquet", size: 1024, partition: map[int]any{}, specID: 0},
+		{path: "data/f2.parquet", size: 2048, partition: map[int]any{}, specID: 0},
+		{path: "data/f3.parquet", size: 4096, partition: map[int]any{}, specID: 0},
+	}, partSpecs)
+
+	bins := buildCompactionBins(entries, targetSize, minFiles)
+	if len(bins) != 1 {
+		t.Fatalf("expected 1 bin, got %d", len(bins))
+	}
+	if len(bins[0].Entries) != 3 {
+		t.Errorf("expected 3 entries in bin, got %d", len(bins[0].Entries))
+	}
+	if bins[0].SpecID != 0 {
+		t.Errorf("expected specID=0, got %d", bins[0].SpecID)
+	}
+}
+
+func TestParseConfigApplyDeletes(t *testing.T) {
+	// Default: true
+	config := ParseConfig(nil)
+	if !config.ApplyDeletes {
+		t.Error("expected ApplyDeletes=true by default")
+	}
+
+	// Explicit false
+	config = ParseConfig(map[string]*plugin_pb.ConfigValue{
+		"apply_deletes": {Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: false}},
+	})
+	if config.ApplyDeletes {
+		t.Error("expected ApplyDeletes=false when explicitly set")
+	}
+
+	// String "false"
+	config = ParseConfig(map[string]*plugin_pb.ConfigValue{
+		"apply_deletes": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "false"}},
+	})
+	if config.ApplyDeletes {
+		t.Error("expected ApplyDeletes=false when set via string 'false'")
+	}
+}
+
+func TestCollectPositionDeletes(t *testing.T) {
+	fs, client := startFakeFiler(t)
+
+	// Create a position delete Parquet file with file_path + pos columns
+	type posDeleteRow struct {
+		FilePath string `parquet:"file_path"`
+		Pos      int64  `parquet:"pos"`
+	}
+	var buf bytes.Buffer
+	writer := parquet.NewWriter(&buf, parquet.SchemaOf(new(posDeleteRow)))
+	rows := []posDeleteRow{
+		{"data/file1.parquet", 0},
+		{"data/file1.parquet", 5},
+		{"data/file1.parquet", 10},
+		{"data/file2.parquet", 3},
+	}
+	for _, r := range rows {
+		if err := writer.Write(&r); err != nil {
+			t.Fatalf("write pos delete row: %v", err)
+		}
+	}
+	if err := writer.Close(); err != nil {
+		t.Fatalf("close pos delete writer: %v", err)
+	}
+
+	// Store in fake filer
+	dataDir := "/buckets/test-bucket/ns/tbl/data"
+	fs.putEntry(dataDir, "pos-delete-1.parquet", &filer_pb.Entry{
+		Name:    "pos-delete-1.parquet",
+		Content: buf.Bytes(),
+	})
+
+	// Create a manifest entry for the position delete file
+	spec := *iceberg.UnpartitionedSpec
+	dfBuilder, err := iceberg.NewDataFileBuilder(
+		spec,
+		iceberg.EntryContentPosDeletes,
+		"data/pos-delete-1.parquet",
+		iceberg.ParquetFile,
+		map[int]any{},
+		nil, nil,
+		int64(len(rows)),
+		int64(buf.Len()),
+	)
+	if err != nil {
+		t.Fatalf("build pos delete data file: %v", err)
+	}
+	snapID := int64(1)
+	entry := iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, dfBuilder.Build())
+
+	result, err := collectPositionDeletes(context.Background(), client, "test-bucket", "ns/tbl", []iceberg.ManifestEntry{entry})
+	if err != nil {
+		t.Fatalf("collectPositionDeletes: %v", err)
+	}
+
+	// Verify results
+	if len(result["data/file1.parquet"]) != 3 {
+		t.Errorf("expected 3 positions for file1, got %d", len(result["data/file1.parquet"]))
+	}
+	if len(result["data/file2.parquet"]) != 1 {
+		t.Errorf("expected 1 position for file2, got %d", len(result["data/file2.parquet"]))
+	}
+
+	// Verify sorted
+	positions := result["data/file1.parquet"]
+	for i := 1; i < len(positions); i++ {
+		if positions[i] <= positions[i-1] {
+			t.Errorf("positions not sorted: %v", positions)
+			break
+		}
+	}
+}
+
+func TestCollectEqualityDeletes(t *testing.T) {
+	fs, client := startFakeFiler(t)
+
+	// Create an equality delete Parquet file with id + name columns
+	type eqDeleteRow struct {
+		ID   int64  `parquet:"id"`
+		Name string `parquet:"name"`
+	}
+	var buf bytes.Buffer
+	writer := parquet.NewWriter(&buf, parquet.SchemaOf(new(eqDeleteRow)))
+	deleteRows := []eqDeleteRow{
+		{1, "alice"},
+		{2, "bob"},
+		{3, "charlie"},
+	}
+	for _, r := range deleteRows {
+		if err := writer.Write(&r); err != nil {
+			t.Fatalf("write eq delete row: %v", err)
+		}
+	}
+	if err := writer.Close(); err != nil {
+		t.Fatalf("close eq delete writer: %v", err)
+	}
+
+	dataDir := "/buckets/test-bucket/ns/tbl/data"
+	fs.putEntry(dataDir, "eq-delete-1.parquet", &filer_pb.Entry{
+		Name:    "eq-delete-1.parquet",
+		Content: buf.Bytes(),
+	})
+
+	// Create manifest entry with equality field IDs
+	schema := newTestSchema() // has field 1=id, 2=name
+	spec := *iceberg.UnpartitionedSpec
+	dfBuilder, err := iceberg.NewDataFileBuilder(
+		spec,
+		iceberg.EntryContentEqDeletes,
+		"data/eq-delete-1.parquet",
+		iceberg.ParquetFile,
+		map[int]any{},
+		nil, nil,
+		int64(len(deleteRows)),
+		int64(buf.Len()),
+	)
+	if err != nil {
+		t.Fatalf("build eq delete data file: %v", err)
+	}
+	dfBuilder.EqualityFieldIDs([]int{1, 2}) // id + name
+	snapID := int64(1)
+	entry := iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, dfBuilder.Build())
+
+	groups, err := collectEqualityDeletes(context.Background(), client, "test-bucket", "ns/tbl", []iceberg.ManifestEntry{entry}, schema)
+	if err != nil {
+		t.Fatalf("collectEqualityDeletes: %v", err)
+	}
+
+	if len(groups) != 1 {
+		t.Fatalf("expected 1 equality delete group, got %d", len(groups))
+	}
+	if len(groups[0].Keys) != 3 {
+		t.Errorf("expected 3 equality delete keys, got %d", len(groups[0].Keys))
+	}
+	if len(groups[0].FieldIDs) != 2 {
+		t.Errorf("expected 2 field IDs, got %d", len(groups[0].FieldIDs))
+	}
+}
+
+func TestMergeParquetFilesWithPositionDeletes(t *testing.T) {
+	fs, client := startFakeFiler(t)
+
+	// Create two data files with known rows
+	type dataRow struct {
+		ID   int64  `parquet:"id"`
+		Name string `parquet:"name"`
+	}
+
+	writeDataFile := func(name string, rows []dataRow) {
+		var buf bytes.Buffer
+		w := parquet.NewWriter(&buf, parquet.SchemaOf(new(dataRow)))
+		for _, r := range rows {
+			if err := w.Write(&r); err != nil {
+				t.Fatalf("write row: %v", err)
+			}
+		}
+		if err := w.Close(); err != nil {
+			t.Fatalf("close: %v", err)
+		}
+		dataDir := "/buckets/test-bucket/ns/tbl/data"
+		fs.putEntry(dataDir, name, &filer_pb.Entry{
+			Name:    name,
+			Content: buf.Bytes(),
+		})
+	}
+
+	writeDataFile("file1.parquet", []dataRow{
+		{1, "alice"}, {2, "bob"}, {3, "charlie"}, {4, "dave"}, {5, "eve"},
+	})
+	writeDataFile("file2.parquet", []dataRow{
+		{6, "frank"}, {7, "grace"},
+	})
+
+	spec := *iceberg.UnpartitionedSpec
+	makeEntry := func(path string, size int64) iceberg.ManifestEntry {
+		dfb, err := iceberg.NewDataFileBuilder(spec, iceberg.EntryContentData, path, iceberg.ParquetFile, map[int]any{}, nil, nil, 1, size)
+		if err != nil {
+			t.Fatalf("build: %v", err)
+		}
+		snapID := int64(1)
+		return iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, dfb.Build())
+	}
+
+	entries := []iceberg.ManifestEntry{
+		makeEntry("data/file1.parquet", 1024),
+		makeEntry("data/file2.parquet", 512),
+	}
+
+	// Delete rows 1 (bob) and 3 (dave) from file1
+	posDeletes := map[string][]int64{
+		"data/file1.parquet": {1, 3},
+	}
+
+	merged, count, err := mergeParquetFiles(
+		context.Background(), client, "test-bucket", "ns/tbl",
+		entries, posDeletes, nil, nil,
+	)
+	if err != nil {
+		t.Fatalf("mergeParquetFiles: %v", err)
+	}
+
+	// Original: 5 + 2 = 7 rows, deleted 2 = 5 rows
+	if count != 5 {
+		t.Errorf("expected 5 rows after position deletes, got %d", count)
+	}
+
+	// Verify merged output is valid Parquet
+	reader := parquet.NewReader(bytes.NewReader(merged))
+	defer reader.Close()
+	var outputRows []dataRow
+	for {
+		var r dataRow
+		err := reader.Read(&r)
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			t.Fatalf("read merged row: %v", err)
+		}
+		outputRows = append(outputRows, r)
+	}
+
+	if len(outputRows) != 5 {
+		t.Fatalf("expected 5 output rows, got %d", len(outputRows))
+	}
+
+	// Verify bob (id=2) and dave (id=4) are NOT in output
+	for _, r := range outputRows {
+		if r.ID == 2 || r.ID == 4 {
+			t.Errorf("row with id=%d should have been deleted", r.ID)
+		}
+	}
+}
+
+func TestMergeParquetFilesWithEqualityDeletes(t *testing.T) {
+	fs, client := startFakeFiler(t)
+
+	type dataRow struct {
+		ID   int64  `parquet:"id"`
+		Name string `parquet:"name"`
+	}
+
+	var buf bytes.Buffer
+	w := parquet.NewWriter(&buf, parquet.SchemaOf(new(dataRow)))
+	dataRows := []dataRow{
+		{1, "alice"}, {2, "bob"}, {3, "charlie"}, {4, "dave"},
+	}
+	for _, r := range dataRows {
+		if err := w.Write(&r); err != nil {
+			t.Fatalf("write: %v", err)
+		}
+	}
+	if err := w.Close(); err != nil {
+		t.Fatalf("close: %v", err)
+	}
+
+	dataDir := "/buckets/test-bucket/ns/tbl/data"
+	fs.putEntry(dataDir, "data1.parquet", &filer_pb.Entry{
+		Name:    "data1.parquet",
+		Content: buf.Bytes(),
+	})
+
+	spec := *iceberg.UnpartitionedSpec
+	dfb, err := iceberg.NewDataFileBuilder(spec, iceberg.EntryContentData, "data/data1.parquet", iceberg.ParquetFile, map[int]any{}, nil, nil, 4, int64(buf.Len()))
+	if err != nil {
+		t.Fatalf("build: %v", err)
+	}
+	snapID := int64(1)
+	entries := []iceberg.ManifestEntry{
+		iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, dfb.Build()),
+	}
+
+	// Build equality deletes via the collect pipeline so keys match the
+	// encoding used by mergeParquetFiles.
+	schema := newTestSchema() // field 1=id, field 2=name
+
+	type eqDeleteRow struct {
+		ID   int64  `parquet:"id"`
+		Name string `parquet:"name"`
+	}
+	var eqBuf bytes.Buffer
+	eqWriter := parquet.NewWriter(&eqBuf, parquet.SchemaOf(new(eqDeleteRow)))
+	for _, r := range []eqDeleteRow{{0, "bob"}, {0, "dave"}} {
+		if err := eqWriter.Write(&r); err != nil {
+			t.Fatalf("write eq delete: %v", err)
+		}
+	}
+	if err := eqWriter.Close(); err != nil {
+		t.Fatalf("close eq writer: %v", err)
+	}
+	eqDataDir := "/buckets/test-bucket/ns/tbl/data"
+	fs.putEntry(eqDataDir, "eq-del.parquet", &filer_pb.Entry{
+		Name: "eq-del.parquet", Content: eqBuf.Bytes(),
+	})
+
+	eqDfb, err := iceberg.NewDataFileBuilder(spec, iceberg.EntryContentEqDeletes,
+		"data/eq-del.parquet", iceberg.ParquetFile, map[int]any{}, nil, nil, 2, int64(eqBuf.Len()))
+	if err != nil {
+		t.Fatalf("build eq delete: %v", err)
+	}
+	eqDfb.EqualityFieldIDs([]int{2}) // field 2 = name
+	eqEntry := iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, eqDfb.Build())
+
+	eqGroups, err := collectEqualityDeletes(context.Background(), client, "test-bucket", "ns/tbl",
+		[]iceberg.ManifestEntry{eqEntry}, schema)
+	if err != nil {
+		t.Fatalf("collectEqualityDeletes: %v", err)
+	}
+
+	merged, count, err := mergeParquetFiles(
+		context.Background(), client, "test-bucket", "ns/tbl",
+		entries, nil, eqGroups, schema,
+	)
+	if err != nil {
+		t.Fatalf("mergeParquetFiles: %v", err)
+	}
+
+	if count != 2 {
+		t.Errorf("expected 2 rows after equality deletes, got %d", count)
+	}
+
+	reader := parquet.NewReader(bytes.NewReader(merged))
+	defer reader.Close()
+	var outputRows []dataRow
+	for {
+		var r dataRow
+		err := reader.Read(&r)
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			t.Fatalf("read: %v", err)
+		}
+		outputRows = append(outputRows, r)
+	}
+
+	for _, r := range outputRows {
+		if r.Name == "bob" || r.Name == "dave" {
+			t.Errorf("row %q should have been deleted", r.Name)
+		}
+	}
+}
+
 func TestDetectNilRequest(t *testing.T) {
 	handler := NewHandler(nil)
 	err := handler.Detect(nil, nil, nil)
diff --git a/weed/plugin/worker/iceberg/operations.go b/weed/plugin/worker/iceberg/operations.go
index c091b0d4b..d97dd9465 100644
--- a/weed/plugin/worker/iceberg/operations.go
+++ b/weed/plugin/worker/iceberg/operations.go
@@ -38,16 +38,17 @@ func (h *Handler) expireSnapshots(
 	filerClient filer_pb.SeaweedFilerClient,
 	bucketName, tablePath string,
 	config Config,
-) (string, error) {
+) (string, map[string]int64, error) {
+	start := time.Now()
 	// Load current metadata
 	meta, metadataFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath)
 	if err != nil {
-		return "", fmt.Errorf("load metadata: %w", err)
+		return "", nil, fmt.Errorf("load metadata: %w", err)
 	}
 
 	snapshots := meta.Snapshots()
 	if len(snapshots) == 0 {
-		return "no snapshots", nil
+		return "no snapshots", nil, nil
 	}
 
 	// Determine which snapshots to expire
@@ -92,7 +93,7 @@ func (h *Handler) expireSnapshots(
 	}
 
 	if len(toExpire) == 0 {
-		return "no snapshots expired", nil
+		return "no snapshots expired", nil, nil
 	}
 
 	// Split snapshots into expired and kept sets
@@ -113,11 +114,11 @@
 	// This lets us determine which files become unreferenced.
 	expiredFiles, err := collectSnapshotFiles(ctx, filerClient, bucketName, tablePath, expiredSnaps)
 	if err != nil {
-		return "", fmt.Errorf("collect expired snapshot files: %w", err)
+		return "", nil, fmt.Errorf("collect expired snapshot files: %w", err)
 	}
 	keptFiles, err := collectSnapshotFiles(ctx, filerClient, bucketName, tablePath, keptSnaps)
 	if err != nil {
-		return "", fmt.Errorf("collect kept snapshot files: %w", err)
+		return "", nil, fmt.Errorf("collect kept snapshot files: %w", err)
 	}
 
 	// Normalize kept file paths for consistent comparison
@@ -136,7 +137,7 @@ func (h *Handler) expireSnapshots(
 		return builder.RemoveSnapshots(toExpire)
 	})
 	if err != nil {
-		return "", fmt.Errorf("commit snapshot expiration: %w", err)
+		return "", nil, fmt.Errorf("commit snapshot expiration: %w", err)
 	}
 
 	// Delete files exclusively referenced by expired snapshots (best-effort)
@@ -156,7 +157,12 @@ func (h *Handler) expireSnapshots(
 		}
 	}
 
-	return fmt.Sprintf("expired %d snapshot(s), deleted %d unreferenced file(s)", len(toExpire), deletedCount), nil
+	metrics := map[string]int64{
+		MetricSnapshotsExpired: int64(len(toExpire)),
+		MetricFilesDeleted:     int64(deletedCount),
+		MetricDurationMs:       time.Since(start).Milliseconds(),
+	}
+	return fmt.Sprintf("expired %d snapshot(s), deleted %d unreferenced file(s)", len(toExpire), deletedCount), metrics, nil
 }
 
 // collectSnapshotFiles returns all file paths (manifest lists, manifest files,
@@ -215,17 +221,18 @@ func (h *Handler) removeOrphans(
 	filerClient filer_pb.SeaweedFilerClient,
 	bucketName, tablePath string,
 	config Config,
-) (string, error) {
+) (string, map[string]int64, error) {
+	start := time.Now()
 	// Load current metadata
 	meta, metadataFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath)
 	if err != nil {
-		return "", fmt.Errorf("load metadata: %w", err)
+		return "", nil, fmt.Errorf("load metadata: %w", err)
 	}
 
 	// Collect all referenced files from all snapshots
 	referencedFiles, err := collectSnapshotFiles(ctx, filerClient, bucketName, tablePath, meta.Snapshots())
 	if err != nil {
-		return "", fmt.Errorf("collect referenced files: %w", err)
+		return "", nil, fmt.Errorf("collect referenced files: %w", err)
 	}
 
 	// Reference the active metadata file so it is not treated as orphan
@@ -286,7 +293,11 @@ func (h *Handler) removeOrphans(
 		}
 	}
 
-	return fmt.Sprintf("removed %d orphan file(s)", orphanCount), nil
+	metrics := map[string]int64{
+		MetricOrphansRemoved: int64(orphanCount),
+		MetricDurationMs:     time.Since(start).Milliseconds(),
+	}
+	return fmt.Sprintf("removed %d orphan file(s)", orphanCount), metrics, nil
 }
 
 // ---------------------------------------------------------------------------
@@ -299,31 +310,41 @@ func (h *Handler) rewriteManifests(
 	filerClient filer_pb.SeaweedFilerClient,
 	bucketName, tablePath string,
 	config Config,
-) (string, error) {
+) (string, map[string]int64, error) {
+	start := time.Now()
 	// Load current metadata
 	meta, metadataFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath)
 	if err != nil {
-		return "", fmt.Errorf("load metadata: %w", err)
+		return "", nil, fmt.Errorf("load metadata: %w", err)
 	}
 
 	currentSnap := meta.CurrentSnapshot()
 	if currentSnap == nil || currentSnap.ManifestList == "" {
-		return "no current snapshot", nil
+		return "no current snapshot", nil, nil
 	}
 
 	// Read manifest list
 	manifestListData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, currentSnap.ManifestList)
 	if err != nil {
-		return "", fmt.Errorf("read manifest list: %w", err)
+		return "", nil, fmt.Errorf("read manifest list: %w", err)
 	}
 	manifests, err := iceberg.ReadManifestList(bytes.NewReader(manifestListData))
 	if err != nil {
-		return "", fmt.Errorf("parse manifest list: %w", err)
+		return "", nil, fmt.Errorf("parse manifest list: %w", err)
+	}
+
+	// Separate data manifests from delete manifests. Only data manifests
+	// are candidates for rewriting; delete manifests are carried forward.
+	var dataManifests []iceberg.ManifestFile
+	for _, mf := range manifests {
+		if mf.ManifestContent() == iceberg.ManifestContentData {
+			dataManifests = append(dataManifests, mf)
+		}
 	}
 
-	if int64(len(manifests)) < config.MinManifestsToRewrite {
-		return fmt.Sprintf("only %d manifests, below threshold of %d", len(manifests), config.MinManifestsToRewrite), nil
+	if int64(len(dataManifests)) < config.MinManifestsToRewrite {
+		return fmt.Sprintf("only %d data manifests, below threshold of %d", len(dataManifests), config.MinManifestsToRewrite), nil, nil
 	}
 
 	// Collect all entries from data manifests, grouped by partition spec ID
@@ -341,17 +362,14 @@ func (h *Handler) rewriteManifests(
 		specByID[ps.ID()] = ps
 	}
 
-	for _, mf := range manifests {
-		if mf.ManifestContent() != iceberg.ManifestContentData {
-			continue
-		}
+	for _, mf := range dataManifests {
 		manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath())
 		if err != nil {
-			return "", fmt.Errorf("read manifest %s: %w", mf.FilePath(), err)
+			return "", nil, fmt.Errorf("read manifest %s: %w", mf.FilePath(), err)
 		}
 		entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true)
 		if err != nil {
-			return "", fmt.Errorf("parse manifest %s: %w", mf.FilePath(), err)
+			return "", nil, fmt.Errorf("parse manifest %s: %w", mf.FilePath(), err)
 		}
 
 		sid := mf.PartitionSpecID()
@@ -359,7 +377,7 @@
 		if !ok {
 			ps, found := specByID[int(sid)]
 			if !found {
-				return "", fmt.Errorf("partition spec %d not found in table metadata", sid)
+				return "", nil, fmt.Errorf("partition spec %d not found in table metadata", sid)
 			}
 			se = &specEntries{specID: sid, spec: ps}
 			specMap[sid] = se
@@ -368,7 +386,7 @@
 	}
 
 	if len(specMap) == 0 {
-		return "no data entries to rewrite", nil
+		return "no data entries to rewrite", nil, nil
 	}
 
 	schema := meta.CurrentSchema()
@@ -376,6 +394,7 @@
 	snapshotID := currentSnap.SnapshotID
 	newSnapshotID := time.Now().UnixMilli()
 	newSeqNum := currentSnap.SequenceNumber + 1
+	artifactSuffix := compactRandomSuffix()
 	metaDir := path.Join(s3tables.TablesPath, bucketName, tablePath, "metadata")
 
 	// Track written artifacts so we can clean them up if the commit fails.
@@ -403,7 +422,7 @@ func (h *Handler) rewriteManifests(
 	totalEntries := 0
 	for _, se := range specMap {
 		totalEntries += len(se.entries)
-		manifestFileName := fmt.Sprintf("merged-%d-spec%d-%d.avro", newSnapshotID, se.specID, time.Now().UnixMilli())
+		manifestFileName := fmt.Sprintf("merged-%d-%s-spec%d.avro", newSnapshotID, artifactSuffix, se.specID)
 		manifestPath := path.Join("metadata", manifestFileName)
 
 		var manifestBuf bytes.Buffer
@@ -417,11 +436,11 @@ func (h *Handler) rewriteManifests(
 			se.entries,
 		)
 		if err != nil {
-			return "", fmt.Errorf("write merged manifest for spec %d: %w", se.specID, err)
+			return "", nil, fmt.Errorf("write merged manifest for spec %d: %w", se.specID, err)
 		}
 
 		if err := saveFilerFile(ctx, filerClient, metaDir, manifestFileName, manifestBuf.Bytes()); err != nil {
-			return "", fmt.Errorf("save merged manifest for spec %d: %w", se.specID, err)
+			return "", nil, fmt.Errorf("save merged manifest for spec %d: %w", se.specID, err)
 		}
 		writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestFileName})
 		newManifests = append(newManifests, mergedManifest)
@@ -437,13 +456,13 @@ func (h *Handler) rewriteManifests(
 	var manifestListBuf bytes.Buffer
 	err = iceberg.WriteManifestList(version, &manifestListBuf, newSnapshotID, &snapshotID, &newSeqNum, 0, newManifests)
 	if err != nil {
-		return "", fmt.Errorf("write manifest list: %w", err)
+		return "", nil, fmt.Errorf("write manifest list: %w", err)
 	}
 
 	// Save new manifest list
-	manifestListFileName := fmt.Sprintf("snap-%d-%d.avro", newSnapshotID, time.Now().UnixMilli())
+	manifestListFileName := fmt.Sprintf("snap-%d-%s.avro", newSnapshotID, artifactSuffix)
 	if err := saveFilerFile(ctx, filerClient, metaDir, manifestListFileName, manifestListBuf.Bytes()); err != nil {
-		return "", fmt.Errorf("save manifest list: %w", err)
+		return "", nil, fmt.Errorf("save manifest list: %w", err)
 	}
 	writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestListFileName})
 
@@ -484,11 +503,16 @@ func (h *Handler) rewriteManifests(
 		)
 	})
 	if err != nil {
-		return "", fmt.Errorf("commit manifest rewrite: %w", err)
+		return "", nil, fmt.Errorf("commit manifest rewrite: %w", err)
 	}
 	committed = true
 
-	return fmt.Sprintf("rewrote %d manifests into %d (%d entries)", len(manifests), len(specMap), totalEntries), nil
+	metrics := map[string]int64{
+		MetricManifestsRewritten: int64(len(dataManifests)),
+		MetricEntriesTotal:       int64(totalEntries),
+		MetricDurationMs:         time.Since(start).Milliseconds(),
+	}
+	return fmt.Sprintf("rewrote %d data manifests into %d (%d entries)", len(dataManifests), len(specMap), totalEntries), metrics, nil
 }
 
 // ---------------------------------------------------------------------------
diff --git a/weed/plugin/worker/iceberg/testing_api.go b/weed/plugin/worker/iceberg/testing_api.go
index c0fdda5ff..ecb1a40a6 100644
--- a/weed/plugin/worker/iceberg/testing_api.go
+++ b/weed/plugin/worker/iceberg/testing_api.go
@@ -9,18 +9,18 @@ import (
 // The following methods export the internal maintenance operations for use
 // by integration tests. They are intentionally thin wrappers.
-func (h *Handler) ExpireSnapshots(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketName, tablePath string, config Config) (string, error) {
+func (h *Handler) ExpireSnapshots(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketName, tablePath string, config Config) (string, map[string]int64, error) {
 	return h.expireSnapshots(ctx, client, bucketName, tablePath, config)
 }
 
-func (h *Handler) RemoveOrphans(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketName, tablePath string, config Config) (string, error) {
+func (h *Handler) RemoveOrphans(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketName, tablePath string, config Config) (string, map[string]int64, error) {
 	return h.removeOrphans(ctx, client, bucketName, tablePath, config)
 }
 
-func (h *Handler) RewriteManifests(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketName, tablePath string, config Config) (string, error) {
+func (h *Handler) RewriteManifests(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketName, tablePath string, config Config) (string, map[string]int64, error) {
 	return h.rewriteManifests(ctx, client, bucketName, tablePath, config)
 }
 
-func (h *Handler) CompactDataFiles(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketName, tablePath string, config Config) (string, error) {
-	return h.compactDataFiles(ctx, client, bucketName, tablePath, config)
+func (h *Handler) CompactDataFiles(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketName, tablePath string, config Config) (string, map[string]int64, error) {
+	return h.compactDataFiles(ctx, client, bucketName, tablePath, config, nil)
 }
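For reference, a minimal sketch of consuming the new three-value return through the exported wrappers, using the Metric* key constants exercised by the tests above. Note that the operations return a nil metrics map on no-op paths (e.g. "no current snapshot"); reading from a nil map in Go safely yields zero values, so the log line below is valid either way:

	result, metrics, err := handler.CompactDataFiles(ctx, client, bucket, tablePath, config)
	if err != nil {
		return err
	}
	log.Printf("%s (merged=%d written=%d bins=%d duration=%dms)", result,
		metrics[MetricFilesMerged], metrics[MetricFilesWritten],
		metrics[MetricBins], metrics[MetricDurationMs])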