You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1086 lines
34 KiB
1086 lines
34 KiB
package iceberg
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/rand"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"path"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/apache/iceberg-go"
|
|
"github.com/apache/iceberg-go/table"
|
|
"github.com/parquet-go/parquet-go"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
// compactionBin groups small data files from the same partition and spec for merging.
type compactionBin struct {
	// PartitionKey is the string form of Partition produced by partitionKey;
	// it is used for grouping and for ordering bins.
	PartitionKey string
	// Partition is the partition tuple (keyed by field ID) shared by every
	// entry in the bin, as reported by the data files themselves.
	Partition map[int]any
	// SpecID is the partition spec ID shared by every entry in the bin.
	SpecID int32
	// Entries are the manifest entries of the data files to merge.
	Entries []iceberg.ManifestEntry
	// TotalSize is the sum of FileSizeBytes over Entries.
	TotalSize int64
}
|
|
|
|
// compactDataFiles reads manifests to find small Parquet data files, groups
|
|
// them by partition, reads and merges them using parquet-go, and commits new
|
|
// manifest entries.
|
|
func (h *Handler) compactDataFiles(
|
|
ctx context.Context,
|
|
filerClient filer_pb.SeaweedFilerClient,
|
|
bucketName, tablePath string,
|
|
config Config,
|
|
onProgress func(binIdx, totalBins int),
|
|
) (string, map[string]int64, error) {
|
|
start := time.Now()
|
|
meta, metadataFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath)
|
|
if err != nil {
|
|
return "", nil, fmt.Errorf("load metadata: %w", err)
|
|
}
|
|
|
|
currentSnap := meta.CurrentSnapshot()
|
|
if currentSnap == nil || currentSnap.ManifestList == "" {
|
|
return "no current snapshot", nil, nil
|
|
}
|
|
|
|
// Read manifest list
|
|
manifestListData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, currentSnap.ManifestList)
|
|
if err != nil {
|
|
return "", nil, fmt.Errorf("read manifest list: %w", err)
|
|
}
|
|
manifests, err := iceberg.ReadManifestList(bytes.NewReader(manifestListData))
|
|
if err != nil {
|
|
return "", nil, fmt.Errorf("parse manifest list: %w", err)
|
|
}
|
|
|
|
// Separate data manifests from delete manifests.
|
|
var dataManifests, deleteManifests []iceberg.ManifestFile
|
|
for _, mf := range manifests {
|
|
if mf.ManifestContent() == iceberg.ManifestContentData {
|
|
dataManifests = append(dataManifests, mf)
|
|
} else {
|
|
deleteManifests = append(deleteManifests, mf)
|
|
}
|
|
}
|
|
|
|
// If delete manifests exist and apply_deletes is disabled (or not yet
|
|
// implemented for this code path), skip compaction to avoid producing
|
|
// incorrect results by dropping deletes.
|
|
if len(deleteManifests) > 0 && !config.ApplyDeletes {
|
|
return "compaction skipped: delete manifests present and apply_deletes is disabled", nil, nil
|
|
}
|
|
|
|
// Collect data file entries from data manifests
|
|
var allEntries []iceberg.ManifestEntry
|
|
for _, mf := range dataManifests {
|
|
manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath())
|
|
if err != nil {
|
|
return "", nil, fmt.Errorf("read manifest %s: %w", mf.FilePath(), err)
|
|
}
|
|
entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true)
|
|
if err != nil {
|
|
return "", nil, fmt.Errorf("parse manifest %s: %w", mf.FilePath(), err)
|
|
}
|
|
allEntries = append(allEntries, entries...)
|
|
}
|
|
|
|
// Collect delete entries if we need to apply deletes
|
|
var positionDeletes map[string][]int64
|
|
var eqDeleteGroups []equalityDeleteGroup
|
|
if config.ApplyDeletes && len(deleteManifests) > 0 {
|
|
var allDeleteEntries []iceberg.ManifestEntry
|
|
for _, mf := range deleteManifests {
|
|
manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath())
|
|
if err != nil {
|
|
return "", nil, fmt.Errorf("read delete manifest %s: %w", mf.FilePath(), err)
|
|
}
|
|
entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true)
|
|
if err != nil {
|
|
return "", nil, fmt.Errorf("parse delete manifest %s: %w", mf.FilePath(), err)
|
|
}
|
|
allDeleteEntries = append(allDeleteEntries, entries...)
|
|
}
|
|
|
|
// Separate position and equality deletes
|
|
var posDeleteEntries, eqDeleteEntries []iceberg.ManifestEntry
|
|
for _, entry := range allDeleteEntries {
|
|
switch entry.DataFile().ContentType() {
|
|
case iceberg.EntryContentPosDeletes:
|
|
posDeleteEntries = append(posDeleteEntries, entry)
|
|
case iceberg.EntryContentEqDeletes:
|
|
eqDeleteEntries = append(eqDeleteEntries, entry)
|
|
}
|
|
}
|
|
|
|
if len(posDeleteEntries) > 0 {
|
|
positionDeletes, err = collectPositionDeletes(ctx, filerClient, bucketName, tablePath, posDeleteEntries)
|
|
if err != nil {
|
|
return "", nil, fmt.Errorf("collect position deletes: %w", err)
|
|
}
|
|
}
|
|
|
|
if len(eqDeleteEntries) > 0 {
|
|
eqDeleteGroups, err = collectEqualityDeletes(ctx, filerClient, bucketName, tablePath, eqDeleteEntries, meta.CurrentSchema())
|
|
if err != nil {
|
|
return "", nil, fmt.Errorf("collect equality deletes: %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Build compaction bins: group small files by partition
|
|
// MinInputFiles is clamped by ParseConfig to [2, ...] so int conversion is safe.
|
|
bins := buildCompactionBins(allEntries, config.TargetFileSizeBytes, int(config.MinInputFiles))
|
|
if len(bins) == 0 {
|
|
return "no files eligible for compaction", nil, nil
|
|
}
|
|
|
|
// Build a lookup from spec ID to PartitionSpec for per-bin manifest writing.
|
|
specByID := make(map[int]iceberg.PartitionSpec)
|
|
for _, ps := range meta.PartitionSpecs() {
|
|
specByID[ps.ID()] = ps
|
|
}
|
|
|
|
schema := meta.CurrentSchema()
|
|
version := meta.Version()
|
|
snapshotID := currentSnap.SnapshotID
|
|
|
|
// Compute the snapshot ID for the commit up front so all manifest entries
|
|
// reference the same snapshot that will actually be committed.
|
|
newSnapID := time.Now().UnixMilli()
|
|
// Random suffix for artifact filenames to avoid collisions between
|
|
// concurrent compaction runs on different tables sharing a timestamp.
|
|
artifactSuffix := compactRandomSuffix()
|
|
|
|
// Process each bin: read source Parquet files, merge, write output
|
|
var newManifestEntries []iceberg.ManifestEntry
|
|
var deletedManifestEntries []iceberg.ManifestEntry
|
|
totalMerged := 0
|
|
|
|
entrySeqNum := func(entry iceberg.ManifestEntry) *int64 {
|
|
seqNum := entry.SequenceNum()
|
|
if seqNum < 0 {
|
|
return nil
|
|
}
|
|
return &seqNum
|
|
}
|
|
|
|
entryFileSeqNum := func(entry iceberg.ManifestEntry) *int64 {
|
|
if fileSeqNum := entry.FileSequenceNum(); fileSeqNum != nil {
|
|
value := *fileSeqNum
|
|
return &value
|
|
}
|
|
return entrySeqNum(entry)
|
|
}
|
|
|
|
metaDir := path.Join(s3tables.TablesPath, bucketName, tablePath, "metadata")
|
|
dataDir := path.Join(s3tables.TablesPath, bucketName, tablePath, "data")
|
|
|
|
// Track written artifacts so we can clean them up if the commit fails.
|
|
type artifact struct {
|
|
dir, fileName string
|
|
}
|
|
var writtenArtifacts []artifact
|
|
committed := false
|
|
|
|
defer func() {
|
|
if committed || len(writtenArtifacts) == 0 {
|
|
return
|
|
}
|
|
// Use a detached context so cleanup completes even if ctx was canceled.
|
|
cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
for _, a := range writtenArtifacts {
|
|
if err := deleteFilerFile(cleanupCtx, filerClient, a.dir, a.fileName); err != nil {
|
|
glog.Warningf("iceberg compact: failed to clean up artifact %s/%s: %v", a.dir, a.fileName, err)
|
|
}
|
|
}
|
|
}()
|
|
|
|
for binIdx, bin := range bins {
|
|
select {
|
|
case <-ctx.Done():
|
|
return "", nil, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
mergedFileName := fmt.Sprintf("compact-%d-%d-%s-%d.parquet", snapshotID, newSnapID, artifactSuffix, binIdx)
|
|
mergedFilePath := path.Join("data", mergedFileName)
|
|
|
|
mergedData, recordCount, err := mergeParquetFiles(ctx, filerClient, bucketName, tablePath, bin.Entries, positionDeletes, eqDeleteGroups, schema)
|
|
if err != nil {
|
|
glog.Warningf("iceberg compact: failed to merge bin %d (%d files): %v", binIdx, len(bin.Entries), err)
|
|
goto binDone
|
|
}
|
|
|
|
// Write merged file to filer
|
|
if err := ensureFilerDir(ctx, filerClient, dataDir); err != nil {
|
|
return "", nil, fmt.Errorf("ensure data dir: %w", err)
|
|
}
|
|
if err := saveFilerFile(ctx, filerClient, dataDir, mergedFileName, mergedData); err != nil {
|
|
return "", nil, fmt.Errorf("save merged file: %w", err)
|
|
}
|
|
|
|
// Use the partition spec matching this bin's spec ID
|
|
{
|
|
binSpec, ok := specByID[int(bin.SpecID)]
|
|
if !ok {
|
|
glog.Warningf("iceberg compact: spec %d not found for bin %d, skipping", bin.SpecID, binIdx)
|
|
_ = deleteFilerFile(ctx, filerClient, dataDir, mergedFileName)
|
|
goto binDone
|
|
}
|
|
|
|
// Create new DataFile entry for the merged file
|
|
dfBuilder, err := iceberg.NewDataFileBuilder(
|
|
binSpec,
|
|
iceberg.EntryContentData,
|
|
mergedFilePath,
|
|
iceberg.ParquetFile,
|
|
bin.Partition,
|
|
nil, nil,
|
|
recordCount,
|
|
int64(len(mergedData)),
|
|
)
|
|
if err != nil {
|
|
glog.Warningf("iceberg compact: failed to build data file entry for bin %d: %v", binIdx, err)
|
|
_ = deleteFilerFile(ctx, filerClient, dataDir, mergedFileName)
|
|
goto binDone
|
|
}
|
|
writtenArtifacts = append(writtenArtifacts, artifact{dir: dataDir, fileName: mergedFileName})
|
|
|
|
newEntry := iceberg.NewManifestEntry(
|
|
iceberg.EntryStatusADDED,
|
|
&newSnapID,
|
|
nil, nil,
|
|
dfBuilder.Build(),
|
|
)
|
|
newManifestEntries = append(newManifestEntries, newEntry)
|
|
|
|
// Mark original entries as deleted
|
|
for _, entry := range bin.Entries {
|
|
delEntry := iceberg.NewManifestEntry(
|
|
iceberg.EntryStatusDELETED,
|
|
&newSnapID,
|
|
entrySeqNum(entry), entryFileSeqNum(entry),
|
|
entry.DataFile(),
|
|
)
|
|
deletedManifestEntries = append(deletedManifestEntries, delEntry)
|
|
}
|
|
|
|
totalMerged += len(bin.Entries)
|
|
}
|
|
|
|
binDone:
|
|
if onProgress != nil {
|
|
onProgress(binIdx, len(bins))
|
|
}
|
|
}
|
|
|
|
if len(newManifestEntries) == 0 {
|
|
return "no bins successfully compacted", nil, nil
|
|
}
|
|
|
|
// Build entries for the new manifests:
|
|
// - ADDED entries for merged files
|
|
// - DELETED entries for original files
|
|
// - EXISTING entries for files that weren't compacted
|
|
compactedPaths := make(map[string]struct{})
|
|
for _, entry := range deletedManifestEntries {
|
|
compactedPaths[entry.DataFile().FilePath()] = struct{}{}
|
|
}
|
|
|
|
// Group all manifest entries by spec ID for per-spec manifest writing.
|
|
type specEntries struct {
|
|
specID int32
|
|
entries []iceberg.ManifestEntry
|
|
}
|
|
specEntriesMap := make(map[int32]*specEntries)
|
|
|
|
addToSpec := func(specID int32, entry iceberg.ManifestEntry) {
|
|
se, ok := specEntriesMap[specID]
|
|
if !ok {
|
|
se = &specEntries{specID: specID}
|
|
specEntriesMap[specID] = se
|
|
}
|
|
se.entries = append(se.entries, entry)
|
|
}
|
|
|
|
// New and deleted entries carry the spec ID from their bin
|
|
for _, entry := range newManifestEntries {
|
|
addToSpec(entry.DataFile().SpecID(), entry)
|
|
}
|
|
for _, entry := range deletedManifestEntries {
|
|
addToSpec(entry.DataFile().SpecID(), entry)
|
|
}
|
|
|
|
// Existing entries that weren't compacted
|
|
for _, entry := range allEntries {
|
|
if _, compacted := compactedPaths[entry.DataFile().FilePath()]; !compacted {
|
|
existingEntry := iceberg.NewManifestEntry(
|
|
iceberg.EntryStatusEXISTING,
|
|
func() *int64 { id := entry.SnapshotID(); return &id }(),
|
|
entrySeqNum(entry), entryFileSeqNum(entry),
|
|
entry.DataFile(),
|
|
)
|
|
addToSpec(entry.DataFile().SpecID(), existingEntry)
|
|
}
|
|
}
|
|
|
|
// Write one manifest per spec ID, iterating in sorted order for
|
|
// deterministic manifest list construction.
|
|
sortedSpecIDs := make([]int32, 0, len(specEntriesMap))
|
|
for sid := range specEntriesMap {
|
|
sortedSpecIDs = append(sortedSpecIDs, sid)
|
|
}
|
|
sort.Slice(sortedSpecIDs, func(i, j int) bool { return sortedSpecIDs[i] < sortedSpecIDs[j] })
|
|
|
|
var allManifests []iceberg.ManifestFile
|
|
for _, sid := range sortedSpecIDs {
|
|
se := specEntriesMap[sid]
|
|
ps, ok := specByID[int(se.specID)]
|
|
if !ok {
|
|
return "", nil, fmt.Errorf("partition spec %d not found in table metadata", se.specID)
|
|
}
|
|
|
|
var manifestBuf bytes.Buffer
|
|
manifestFileName := fmt.Sprintf("compact-%d-%s-spec%d.avro", newSnapID, artifactSuffix, se.specID)
|
|
newManifest, err := iceberg.WriteManifest(
|
|
path.Join("metadata", manifestFileName),
|
|
&manifestBuf,
|
|
version,
|
|
ps,
|
|
schema,
|
|
newSnapID,
|
|
se.entries,
|
|
)
|
|
if err != nil {
|
|
return "", nil, fmt.Errorf("write compact manifest for spec %d: %w", se.specID, err)
|
|
}
|
|
|
|
if err := saveFilerFile(ctx, filerClient, metaDir, manifestFileName, manifestBuf.Bytes()); err != nil {
|
|
return "", nil, fmt.Errorf("save compact manifest for spec %d: %w", se.specID, err)
|
|
}
|
|
writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestFileName})
|
|
allManifests = append(allManifests, newManifest)
|
|
}
|
|
|
|
// Carry forward delete manifests only if deletes were NOT applied.
|
|
// When deletes were applied, they've been consumed during the merge.
|
|
// Position deletes reference specific data files — if all those files
|
|
// were compacted, the deletes are fully consumed. Equality deletes
|
|
// apply broadly, so they're only consumed if all data files were compacted.
|
|
if !config.ApplyDeletes || (len(positionDeletes) == 0 && len(eqDeleteGroups) == 0) {
|
|
for _, mf := range deleteManifests {
|
|
allManifests = append(allManifests, mf)
|
|
}
|
|
} else {
|
|
// Check if any non-compacted data files remain
|
|
hasUncompactedFiles := false
|
|
for _, entry := range allEntries {
|
|
if _, compacted := compactedPaths[entry.DataFile().FilePath()]; !compacted {
|
|
hasUncompactedFiles = true
|
|
break
|
|
}
|
|
}
|
|
if hasUncompactedFiles {
|
|
// Some files weren't compacted — carry forward delete manifests
|
|
// since deletes may still apply to those files.
|
|
for _, mf := range deleteManifests {
|
|
allManifests = append(allManifests, mf)
|
|
}
|
|
}
|
|
// If all files were compacted, deletes are fully consumed — don't carry forward.
|
|
}
|
|
|
|
// Write new manifest list
|
|
var manifestListBuf bytes.Buffer
|
|
seqNum := currentSnap.SequenceNumber + 1
|
|
err = iceberg.WriteManifestList(version, &manifestListBuf, newSnapID, &snapshotID, &seqNum, 0, allManifests)
|
|
if err != nil {
|
|
return "", nil, fmt.Errorf("write compact manifest list: %w", err)
|
|
}
|
|
|
|
manifestListFileName := fmt.Sprintf("snap-%d-%s.avro", newSnapID, artifactSuffix)
|
|
if err := saveFilerFile(ctx, filerClient, metaDir, manifestListFileName, manifestListBuf.Bytes()); err != nil {
|
|
return "", nil, fmt.Errorf("save compact manifest list: %w", err)
|
|
}
|
|
writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestListFileName})
|
|
|
|
// Commit: add new snapshot and update main branch ref
|
|
manifestListLocation := path.Join("metadata", manifestListFileName)
|
|
err = h.commitWithRetry(ctx, filerClient, bucketName, tablePath, metadataFileName, config, func(currentMeta table.Metadata, builder *table.MetadataBuilder) error {
|
|
// Guard: verify table head hasn't advanced since we planned.
|
|
cs := currentMeta.CurrentSnapshot()
|
|
if cs == nil || cs.SnapshotID != snapshotID {
|
|
return errStalePlan
|
|
}
|
|
|
|
newSnapshot := &table.Snapshot{
|
|
SnapshotID: newSnapID,
|
|
ParentSnapshotID: &snapshotID,
|
|
SequenceNumber: seqNum,
|
|
TimestampMs: newSnapID,
|
|
ManifestList: manifestListLocation,
|
|
Summary: &table.Summary{
|
|
Operation: table.OpReplace,
|
|
Properties: map[string]string{
|
|
"maintenance": "compact_data_files",
|
|
"merged-files": fmt.Sprintf("%d", totalMerged),
|
|
"new-files": fmt.Sprintf("%d", len(newManifestEntries)),
|
|
"compaction-bins": fmt.Sprintf("%d", len(bins)),
|
|
},
|
|
},
|
|
SchemaID: func() *int {
|
|
id := schema.ID
|
|
return &id
|
|
}(),
|
|
}
|
|
if err := builder.AddSnapshot(newSnapshot); err != nil {
|
|
return err
|
|
}
|
|
return builder.SetSnapshotRef(table.MainBranch, newSnapID, table.BranchRef)
|
|
})
|
|
if err != nil {
|
|
return "", nil, fmt.Errorf("commit compaction: %w", err)
|
|
}
|
|
|
|
committed = true
|
|
metrics := map[string]int64{
|
|
MetricFilesMerged: int64(totalMerged),
|
|
MetricFilesWritten: int64(len(newManifestEntries)),
|
|
MetricBins: int64(len(bins)),
|
|
MetricDurationMs: time.Since(start).Milliseconds(),
|
|
}
|
|
return fmt.Sprintf("compacted %d files into %d (across %d bins)", totalMerged, len(newManifestEntries), len(bins)), metrics, nil
|
|
}
|
|
|
|
// buildCompactionBins groups small data files by partition for bin-packing.
|
|
// A file is "small" if it's below targetSize. A bin must have at least
|
|
// minFiles entries to be worth compacting.
|
|
func buildCompactionBins(entries []iceberg.ManifestEntry, targetSize int64, minFiles int) []compactionBin {
|
|
if minFiles < 2 {
|
|
minFiles = 2
|
|
}
|
|
|
|
// Group entries by spec ID + partition key so that files from different
|
|
// partition specs are never mixed in the same compaction bin.
|
|
groups := make(map[string]*compactionBin)
|
|
for _, entry := range entries {
|
|
df := entry.DataFile()
|
|
if df.FileFormat() != iceberg.ParquetFile {
|
|
continue
|
|
}
|
|
if df.FileSizeBytes() >= targetSize {
|
|
continue
|
|
}
|
|
|
|
partKey := partitionKey(df.Partition())
|
|
groupKey := fmt.Sprintf("spec%d\x00%s", df.SpecID(), partKey)
|
|
bin, ok := groups[groupKey]
|
|
if !ok {
|
|
bin = &compactionBin{
|
|
PartitionKey: partKey,
|
|
Partition: df.Partition(),
|
|
SpecID: df.SpecID(),
|
|
}
|
|
groups[groupKey] = bin
|
|
}
|
|
bin.Entries = append(bin.Entries, entry)
|
|
bin.TotalSize += df.FileSizeBytes()
|
|
}
|
|
|
|
// Filter to bins with enough files, splitting oversized bins
|
|
var result []compactionBin
|
|
for _, bin := range groups {
|
|
if len(bin.Entries) < minFiles {
|
|
continue
|
|
}
|
|
if bin.TotalSize <= targetSize {
|
|
result = append(result, *bin)
|
|
} else {
|
|
result = append(result, splitOversizedBin(*bin, targetSize, minFiles)...)
|
|
}
|
|
}
|
|
|
|
// Sort by spec ID then partition key for deterministic order
|
|
sort.Slice(result, func(i, j int) bool {
|
|
if result[i].SpecID != result[j].SpecID {
|
|
return result[i].SpecID < result[j].SpecID
|
|
}
|
|
return result[i].PartitionKey < result[j].PartitionKey
|
|
})
|
|
|
|
return result
|
|
}
|
|
|
|
// splitOversizedBin splits a bin whose total size exceeds targetSize into
|
|
// sub-bins that stay under targetSize. Bins that cannot reach minFiles
|
|
// without violating targetSize are left uncompacted rather than merged into
|
|
// oversized bins.
|
|
func splitOversizedBin(bin compactionBin, targetSize int64, minFiles int) []compactionBin {
|
|
// Sort largest-first for better packing.
|
|
sorted := make([]iceberg.ManifestEntry, len(bin.Entries))
|
|
copy(sorted, bin.Entries)
|
|
sort.Slice(sorted, func(i, j int) bool {
|
|
return sorted[i].DataFile().FileSizeBytes() > sorted[j].DataFile().FileSizeBytes()
|
|
})
|
|
|
|
var bins []compactionBin
|
|
current := compactionBin{
|
|
PartitionKey: bin.PartitionKey,
|
|
Partition: bin.Partition,
|
|
SpecID: bin.SpecID,
|
|
}
|
|
for _, entry := range sorted {
|
|
if current.TotalSize > 0 && current.TotalSize+entry.DataFile().FileSizeBytes() > targetSize {
|
|
bins = append(bins, current)
|
|
current = compactionBin{
|
|
PartitionKey: bin.PartitionKey,
|
|
Partition: bin.Partition,
|
|
SpecID: bin.SpecID,
|
|
}
|
|
}
|
|
current.Entries = append(current.Entries, entry)
|
|
current.TotalSize += entry.DataFile().FileSizeBytes()
|
|
}
|
|
if len(current.Entries) > 0 {
|
|
bins = append(bins, current)
|
|
}
|
|
|
|
var valid []compactionBin
|
|
var pending []compactionBin
|
|
for _, candidate := range bins {
|
|
if len(candidate.Entries) >= minFiles {
|
|
valid = append(valid, candidate)
|
|
continue
|
|
}
|
|
pending = append(pending, candidate)
|
|
}
|
|
|
|
// Try to fold entries from underfilled bins into valid bins when they fit.
|
|
for _, runt := range pending {
|
|
for _, entry := range runt.Entries {
|
|
bestIdx := -1
|
|
bestRemaining := int64(-1)
|
|
entrySize := entry.DataFile().FileSizeBytes()
|
|
for i := range valid {
|
|
remaining := targetSize - valid[i].TotalSize - entrySize
|
|
if remaining < 0 {
|
|
continue
|
|
}
|
|
if bestIdx == -1 || remaining < bestRemaining {
|
|
bestIdx = i
|
|
bestRemaining = remaining
|
|
}
|
|
}
|
|
if bestIdx >= 0 {
|
|
valid[bestIdx].Entries = append(valid[bestIdx].Entries, entry)
|
|
valid[bestIdx].TotalSize += entrySize
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(valid) == 0 {
|
|
return nil
|
|
}
|
|
return valid
|
|
}
|
|
|
|
// partitionKey renders a partition tuple as a deterministic string suitable
// for use as a grouping key.
//
// An empty or nil partition yields "__unpartitioned__". Otherwise the fields
// are emitted in ascending field-ID order as "<id>=<value>" joined by NUL
// bytes. Values are JSON-encoded so that commas, '=' or control characters
// inside a value cannot make two different partitions look alike; a value
// that cannot be JSON-encoded falls back to the hex encoding of its "%v"
// representation.
func partitionKey(partition map[int]any) string {
	if len(partition) == 0 {
		return "__unpartitioned__"
	}

	// Map iteration order is random, so fix the order by field ID.
	fieldIDs := make([]int, 0, len(partition))
	for fieldID := range partition {
		fieldIDs = append(fieldIDs, fieldID)
	}
	sort.Ints(fieldIDs)

	var key strings.Builder
	for i, fieldID := range fieldIDs {
		if i > 0 {
			key.WriteByte(0)
		}
		value := partition[fieldID]
		encoded, err := json.Marshal(value)
		if err != nil {
			encoded = []byte(fmt.Sprintf("%x", fmt.Sprintf("%v", value)))
		}
		fmt.Fprintf(&key, "%d=%s", fieldID, encoded)
	}
	return key.String()
}
|
|
|
|
// collectPositionDeletes reads position delete Parquet files and returns a map
|
|
// from normalized data file path to sorted row positions that should be deleted.
|
|
// Paths are normalized so that absolute S3 URLs and relative paths match.
|
|
func collectPositionDeletes(
|
|
ctx context.Context,
|
|
filerClient filer_pb.SeaweedFilerClient,
|
|
bucketName, tablePath string,
|
|
deleteEntries []iceberg.ManifestEntry,
|
|
) (map[string][]int64, error) {
|
|
result := make(map[string][]int64)
|
|
for _, entry := range deleteEntries {
|
|
if entry.DataFile().ContentType() != iceberg.EntryContentPosDeletes {
|
|
continue
|
|
}
|
|
fileDeletes, err := readPositionDeleteFile(ctx, filerClient, bucketName, tablePath, entry.DataFile().FilePath())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("read position delete file %s: %w", entry.DataFile().FilePath(), err)
|
|
}
|
|
for filePath, positions := range fileDeletes {
|
|
normalized := normalizeIcebergPath(filePath, bucketName, tablePath)
|
|
result[normalized] = append(result[normalized], positions...)
|
|
}
|
|
}
|
|
// Sort positions for each file (binary search during filtering)
|
|
for filePath := range result {
|
|
sort.Slice(result[filePath], func(i, j int) bool {
|
|
return result[filePath][i] < result[filePath][j]
|
|
})
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// readPositionDeleteFile reads a position delete Parquet file and returns a map
|
|
// from data file path to row positions. The file must have columns "file_path"
|
|
// (string) and "pos" (int32 or int64).
|
|
func readPositionDeleteFile(
|
|
ctx context.Context,
|
|
filerClient filer_pb.SeaweedFilerClient,
|
|
bucketName, tablePath, filePath string,
|
|
) (map[string][]int64, error) {
|
|
data, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, filePath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
reader := parquet.NewReader(bytes.NewReader(data))
|
|
defer reader.Close()
|
|
|
|
pqSchema := reader.Schema()
|
|
filePathIdx := -1
|
|
posIdx := -1
|
|
for i, col := range pqSchema.Columns() {
|
|
name := strings.Join(col, ".")
|
|
switch name {
|
|
case "file_path":
|
|
filePathIdx = i
|
|
case "pos":
|
|
posIdx = i
|
|
}
|
|
}
|
|
if filePathIdx < 0 || posIdx < 0 {
|
|
return nil, fmt.Errorf("position delete file %s missing required columns (file_path=%d, pos=%d)", filePath, filePathIdx, posIdx)
|
|
}
|
|
|
|
result := make(map[string][]int64)
|
|
rows := make([]parquet.Row, 256)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
default:
|
|
}
|
|
n, readErr := reader.ReadRows(rows)
|
|
for i := 0; i < n; i++ {
|
|
row := rows[i]
|
|
fp := row[filePathIdx].String()
|
|
pos := row[posIdx].Int64()
|
|
result[fp] = append(result[fp], pos)
|
|
}
|
|
if readErr != nil {
|
|
if readErr == io.EOF {
|
|
break
|
|
}
|
|
return nil, readErr
|
|
}
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// equalityDeleteGroup holds a set of delete keys for a specific set of equality field IDs.
// Different equality delete files may use different field IDs, so deletes are grouped.
type equalityDeleteGroup struct {
	// FieldIDs are the Iceberg field IDs, in the order reported by the delete
	// file, whose column values make up each key.
	FieldIDs []int
	// Keys is the set of composite keys (as produced by buildEqualityKey)
	// identifying rows to delete.
	Keys map[string]struct{}
}
|
|
|
|
// collectEqualityDeletes reads equality delete Parquet files and returns groups
|
|
// of delete keys, one per distinct set of equality field IDs. This correctly
|
|
// handles the case where different delete files use different equality columns.
|
|
func collectEqualityDeletes(
|
|
ctx context.Context,
|
|
filerClient filer_pb.SeaweedFilerClient,
|
|
bucketName, tablePath string,
|
|
deleteEntries []iceberg.ManifestEntry,
|
|
schema *iceberg.Schema,
|
|
) ([]equalityDeleteGroup, error) {
|
|
type groupState struct {
|
|
fieldIDs []int
|
|
keys map[string]struct{}
|
|
}
|
|
groups := make(map[string]*groupState)
|
|
|
|
for _, entry := range deleteEntries {
|
|
if entry.DataFile().ContentType() != iceberg.EntryContentEqDeletes {
|
|
continue
|
|
}
|
|
eqFieldIDs := entry.DataFile().EqualityFieldIDs()
|
|
if len(eqFieldIDs) == 0 {
|
|
continue
|
|
}
|
|
|
|
groupKey := fmt.Sprint(eqFieldIDs)
|
|
gs, ok := groups[groupKey]
|
|
if !ok {
|
|
gs = &groupState{fieldIDs: eqFieldIDs, keys: make(map[string]struct{})}
|
|
groups[groupKey] = gs
|
|
}
|
|
|
|
keys, err := readEqualityDeleteFile(ctx, filerClient, bucketName, tablePath, entry.DataFile().FilePath(), eqFieldIDs, schema)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("read equality delete file %s: %w", entry.DataFile().FilePath(), err)
|
|
}
|
|
for k := range keys {
|
|
gs.keys[k] = struct{}{}
|
|
}
|
|
}
|
|
|
|
result := make([]equalityDeleteGroup, 0, len(groups))
|
|
for _, gs := range groups {
|
|
result = append(result, equalityDeleteGroup{FieldIDs: gs.fieldIDs, Keys: gs.keys})
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// readEqualityDeleteFile reads an equality delete Parquet file and returns a set
|
|
// of composite keys built from the specified field IDs. The Iceberg schema is used
|
|
// to map field IDs to column names, which are then looked up in the Parquet schema.
|
|
func readEqualityDeleteFile(
|
|
ctx context.Context,
|
|
filerClient filer_pb.SeaweedFilerClient,
|
|
bucketName, tablePath, filePath string,
|
|
fieldIDs []int,
|
|
icebergSchema *iceberg.Schema,
|
|
) (map[string]struct{}, error) {
|
|
data, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, filePath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
reader := parquet.NewReader(bytes.NewReader(data))
|
|
defer reader.Close()
|
|
|
|
colIndices, err := resolveEqualityColIndices(reader.Schema(), fieldIDs, icebergSchema)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("resolve columns in %s: %w", filePath, err)
|
|
}
|
|
|
|
result := make(map[string]struct{})
|
|
rows := make([]parquet.Row, 256)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
default:
|
|
}
|
|
n, readErr := reader.ReadRows(rows)
|
|
for i := 0; i < n; i++ {
|
|
key := buildEqualityKey(rows[i], colIndices)
|
|
result[key] = struct{}{}
|
|
}
|
|
if readErr != nil {
|
|
if readErr == io.EOF {
|
|
break
|
|
}
|
|
return nil, readErr
|
|
}
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// buildEqualityKey builds a composite string key from specific column values
|
|
// in a row. Each value is serialized as "kind:length:value" to avoid ambiguity
|
|
// between types (e.g., int 123 vs string "123") and to prevent collisions from
|
|
// values containing separator characters.
|
|
func buildEqualityKey(row parquet.Row, colIndices []int) string {
|
|
if len(colIndices) == 1 {
|
|
v := row[colIndices[0]]
|
|
s := v.String()
|
|
return fmt.Sprintf("%d:%d:%s", v.Kind(), len(s), s)
|
|
}
|
|
var b strings.Builder
|
|
for _, idx := range colIndices {
|
|
v := row[idx]
|
|
s := v.String()
|
|
fmt.Fprintf(&b, "%d:%d:%s", v.Kind(), len(s), s)
|
|
}
|
|
return b.String()
|
|
}
|
|
|
|
// resolveEqualityColIndices maps Iceberg field IDs to Parquet column indices.
|
|
func resolveEqualityColIndices(pqSchema *parquet.Schema, fieldIDs []int, icebergSchema *iceberg.Schema) ([]int, error) {
|
|
if len(fieldIDs) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
colNameToIdx := make(map[string]int)
|
|
for i, col := range pqSchema.Columns() {
|
|
colNameToIdx[strings.Join(col, ".")] = i
|
|
}
|
|
|
|
indices := make([]int, len(fieldIDs))
|
|
for i, fid := range fieldIDs {
|
|
field, ok := icebergSchema.FindFieldByID(fid)
|
|
if !ok {
|
|
return nil, fmt.Errorf("field ID %d not found in iceberg schema", fid)
|
|
}
|
|
idx, ok := colNameToIdx[field.Name]
|
|
if !ok {
|
|
return nil, fmt.Errorf("column %q (field ID %d) not found in parquet schema", field.Name, fid)
|
|
}
|
|
indices[i] = idx
|
|
}
|
|
return indices, nil
|
|
}
|
|
|
|
// mergeParquetFiles reads multiple small Parquet files and merges them into
|
|
// a single Parquet file, optionally filtering out rows matching position or
|
|
// equality deletes. Files are processed one at a time to keep memory usage
|
|
// proportional to a single input file plus the output buffer.
|
|
func mergeParquetFiles(
|
|
ctx context.Context,
|
|
filerClient filer_pb.SeaweedFilerClient,
|
|
bucketName, tablePath string,
|
|
entries []iceberg.ManifestEntry,
|
|
positionDeletes map[string][]int64,
|
|
eqDeleteGroups []equalityDeleteGroup,
|
|
icebergSchema *iceberg.Schema,
|
|
) ([]byte, int64, error) {
|
|
if len(entries) == 0 {
|
|
return nil, 0, fmt.Errorf("no entries to merge")
|
|
}
|
|
|
|
// Load the first file to obtain the schema for the writer.
|
|
firstData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, entries[0].DataFile().FilePath())
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("read parquet file %s: %w", entries[0].DataFile().FilePath(), err)
|
|
}
|
|
firstReader := parquet.NewReader(bytes.NewReader(firstData))
|
|
parquetSchema := firstReader.Schema()
|
|
if parquetSchema == nil {
|
|
firstReader.Close()
|
|
return nil, 0, fmt.Errorf("no parquet schema found in %s", entries[0].DataFile().FilePath())
|
|
}
|
|
|
|
// Resolve equality delete column indices for each group.
|
|
type resolvedEqGroup struct {
|
|
colIndices []int
|
|
keys map[string]struct{}
|
|
}
|
|
var resolvedEqGroups []resolvedEqGroup
|
|
if len(eqDeleteGroups) > 0 && icebergSchema != nil {
|
|
for _, g := range eqDeleteGroups {
|
|
indices, resolveErr := resolveEqualityColIndices(parquetSchema, g.FieldIDs, icebergSchema)
|
|
if resolveErr != nil {
|
|
firstReader.Close()
|
|
return nil, 0, fmt.Errorf("resolve equality columns: %w", resolveErr)
|
|
}
|
|
resolvedEqGroups = append(resolvedEqGroups, resolvedEqGroup{colIndices: indices, keys: g.Keys})
|
|
}
|
|
}
|
|
|
|
var outputBuf bytes.Buffer
|
|
writer := parquet.NewWriter(&outputBuf, parquetSchema)
|
|
|
|
var totalRows int64
|
|
rows := make([]parquet.Row, 256)
|
|
hasEqDeletes := len(resolvedEqGroups) > 0
|
|
|
|
// drainReader streams rows from reader into writer, filtering out deleted
|
|
// rows. source is the data file path (used for error messages and
|
|
// position delete lookups).
|
|
drainReader := func(reader *parquet.Reader, source string) error {
|
|
defer reader.Close()
|
|
|
|
// Normalize source path so it matches the normalized keys in positionDeletes.
|
|
normalizedSource := normalizeIcebergPath(source, bucketName, tablePath)
|
|
posDeletes := positionDeletes[normalizedSource]
|
|
posDeleteIdx := 0
|
|
var absolutePos int64
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
n, readErr := reader.ReadRows(rows)
|
|
if n > 0 {
|
|
// Filter rows if we have any deletes
|
|
if len(posDeletes) > 0 || hasEqDeletes {
|
|
writeIdx := 0
|
|
for i := 0; i < n; i++ {
|
|
rowPos := absolutePos + int64(i)
|
|
|
|
// Check position deletes (sorted, so advance index)
|
|
if len(posDeletes) > 0 {
|
|
for posDeleteIdx < len(posDeletes) && posDeletes[posDeleteIdx] < rowPos {
|
|
posDeleteIdx++
|
|
}
|
|
if posDeleteIdx < len(posDeletes) && posDeletes[posDeleteIdx] == rowPos {
|
|
posDeleteIdx++
|
|
continue // skip this row
|
|
}
|
|
}
|
|
|
|
// Check equality deletes — each group independently
|
|
deleted := false
|
|
for _, g := range resolvedEqGroups {
|
|
key := buildEqualityKey(rows[i], g.colIndices)
|
|
if _, ok := g.keys[key]; ok {
|
|
deleted = true
|
|
break
|
|
}
|
|
}
|
|
if deleted {
|
|
continue // skip this row
|
|
}
|
|
|
|
rows[writeIdx] = rows[i]
|
|
writeIdx++
|
|
}
|
|
absolutePos += int64(n)
|
|
if writeIdx > 0 {
|
|
if _, writeErr := writer.WriteRows(rows[:writeIdx]); writeErr != nil {
|
|
return fmt.Errorf("write rows from %s: %w", source, writeErr)
|
|
}
|
|
totalRows += int64(writeIdx)
|
|
}
|
|
} else {
|
|
if _, writeErr := writer.WriteRows(rows[:n]); writeErr != nil {
|
|
return fmt.Errorf("write rows from %s: %w", source, writeErr)
|
|
}
|
|
totalRows += int64(n)
|
|
}
|
|
}
|
|
if readErr != nil {
|
|
if readErr == io.EOF {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("read rows from %s: %w", source, readErr)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Drain the first file.
|
|
firstSource := entries[0].DataFile().FilePath()
|
|
if err := drainReader(firstReader, firstSource); err != nil {
|
|
writer.Close()
|
|
return nil, 0, err
|
|
}
|
|
firstData = nil // allow GC
|
|
|
|
// Process remaining files one at a time.
|
|
for _, entry := range entries[1:] {
|
|
select {
|
|
case <-ctx.Done():
|
|
writer.Close()
|
|
return nil, 0, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
data, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, entry.DataFile().FilePath())
|
|
if err != nil {
|
|
writer.Close()
|
|
return nil, 0, fmt.Errorf("read parquet file %s: %w", entry.DataFile().FilePath(), err)
|
|
}
|
|
|
|
reader := parquet.NewReader(bytes.NewReader(data))
|
|
if !schemasEqual(parquetSchema, reader.Schema()) {
|
|
reader.Close()
|
|
writer.Close()
|
|
return nil, 0, fmt.Errorf("schema mismatch in %s: cannot merge files with different schemas", entry.DataFile().FilePath())
|
|
}
|
|
|
|
if err := drainReader(reader, entry.DataFile().FilePath()); err != nil {
|
|
writer.Close()
|
|
return nil, 0, err
|
|
}
|
|
// data goes out of scope here, eligible for GC before next iteration.
|
|
}
|
|
|
|
if err := writer.Close(); err != nil {
|
|
return nil, 0, fmt.Errorf("close writer: %w", err)
|
|
}
|
|
|
|
return outputBuf.Bytes(), totalRows, nil
|
|
}
|
|
|
|
// compactRandomSuffix returns an 8-character lowercase hex string for use in
// artifact filenames to prevent collisions between concurrent runs.
//
// The suffix is normally derived from 4 bytes read from crypto/rand. If the
// random source reports an error, the low 32 bits of the current Unix
// nanosecond timestamp are used instead; that value is zero-padded so the
// suffix has the same fixed width on both paths.
func compactRandomSuffix() string {
	var b [4]byte
	if _, err := rand.Read(b[:]); err != nil {
		// %08x (not %x) keeps the fallback at 8 characters even when the
		// masked timestamp has leading zero nibbles.
		return fmt.Sprintf("%08x", time.Now().UnixNano()&0xFFFFFFFF)
	}
	return hex.EncodeToString(b[:])
}
|
|
|
|
// schemasEqual compares two parquet schemas structurally.
|
|
func schemasEqual(a, b *parquet.Schema) bool {
|
|
if a == b {
|
|
return true
|
|
}
|
|
if a == nil || b == nil {
|
|
return false
|
|
}
|
|
return parquet.EqualNodes(a, b)
|
|
}
|
|
|
|
// ensureFilerDir ensures a directory exists in the filer.
|
|
func ensureFilerDir(ctx context.Context, client filer_pb.SeaweedFilerClient, dirPath string) error {
|
|
parentDir := path.Dir(dirPath)
|
|
dirName := path.Base(dirPath)
|
|
|
|
_, err := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{
|
|
Directory: parentDir,
|
|
Name: dirName,
|
|
})
|
|
if err == nil {
|
|
return nil // already exists
|
|
}
|
|
if !errors.Is(err, filer_pb.ErrNotFound) && status.Code(err) != codes.NotFound {
|
|
return fmt.Errorf("lookup dir %s: %w", dirPath, err)
|
|
}
|
|
|
|
resp, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
|
|
Directory: parentDir,
|
|
Entry: &filer_pb.Entry{
|
|
Name: dirName,
|
|
IsDirectory: true,
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
Mtime: time.Now().Unix(),
|
|
Crtime: time.Now().Unix(),
|
|
FileMode: uint32(0755),
|
|
},
|
|
},
|
|
})
|
|
if createErr != nil {
|
|
return createErr
|
|
}
|
|
if resp.Error != "" && !strings.Contains(resp.Error, "exist") {
|
|
return fmt.Errorf("create dir %s: %s", dirPath, resp.Error)
|
|
}
|
|
return nil
|
|
}
|