Browse Source

iceberg: detect maintenance work per operation (#8639)

* iceberg: detect maintenance work per operation

* iceberg: ignore delete manifests during detection

* iceberg: clean up detection maintenance planning

* iceberg: tighten detection manifest heuristics

* Potential fix for code scanning alert no. 330: Incorrect conversion between integer types

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>

* iceberg: tolerate per-operation detection errors

* iceberg: fix fake metadata location versioning

* iceberg: check snapshot expiry before manifest loads

* iceberg: make expire-snapshots switch case explicit

---------

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
pull/8436/merge
Chris Lu 21 hours ago
committed by GitHub
parent
commit
6b2b442450
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 5
      weed/plugin/worker/iceberg/config.go
  2. 253
      weed/plugin/worker/iceberg/detection.go
  3. 470
      weed/plugin/worker/iceberg/exec_test.go
  4. 89
      weed/plugin/worker/iceberg/handler_test.go
  5. 66
      weed/plugin/worker/iceberg/operations.go

5
weed/plugin/worker/iceberg/config.go

@ -19,6 +19,7 @@ const (
defaultTargetFileSizeMB = 256
defaultMinInputFiles = 5
defaultMinManifestsToRewrite = 5
minManifestsToRewrite = 2
defaultOperations = "all"
// Metric keys returned by maintenance operations.
@ -80,8 +81,8 @@ func ParseConfig(values map[string]*plugin_pb.ConfigValue) Config {
if cfg.MinInputFiles < 2 {
cfg.MinInputFiles = defaultMinInputFiles
}
if cfg.MinManifestsToRewrite < 2 {
cfg.MinManifestsToRewrite = 2
if cfg.MinManifestsToRewrite < minManifestsToRewrite {
cfg.MinManifestsToRewrite = minManifestsToRewrite
}
return cfg

253
weed/plugin/worker/iceberg/detection.go

@ -1,6 +1,7 @@
package iceberg
import (
"bytes"
"context"
"encoding/json"
"fmt"
@ -8,6 +9,7 @@ import (
"strings"
"time"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/table"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@ -18,11 +20,12 @@ import (
// tableInfo captures metadata about a table for detection/execution.
type tableInfo struct {
BucketName string
Namespace string
TableName string
TablePath string // namespace/tableName
Metadata table.Metadata
BucketName string
Namespace string
TableName string
TablePath string // namespace/tableName
MetadataFileName string
Metadata table.Metadata
}
// scanTablesForMaintenance enumerates table buckets and their tables,
@ -37,13 +40,16 @@ func (h *Handler) scanTablesForMaintenance(
limit int,
) ([]tableInfo, error) {
var tables []tableInfo
ops, err := parseOperations(config.Operations)
if err != nil {
return nil, fmt.Errorf("parse operations: %w", err)
}
// Compile wildcard matchers once (nil = match all)
bucketMatchers := wildcard.CompileWildcardMatchers(bucketFilter)
nsMatchers := wildcard.CompileWildcardMatchers(namespaceFilter)
tableMatchers := wildcard.CompileWildcardMatchers(tableFilter)
// List entries under /buckets to find table buckets
bucketsPath := s3tables.TablesPath
bucketEntries, err := listFilerEntries(ctx, filerClient, bucketsPath, "")
if err != nil {
@ -117,7 +123,8 @@ func (h *Handler) scanTablesForMaintenance(
// Parse the internal metadata to get FullMetadata
var internalMeta struct {
Metadata *struct {
MetadataLocation string `json:"metadataLocation,omitempty"`
Metadata *struct {
FullMetadata json.RawMessage `json:"fullMetadata,omitempty"`
} `json:"metadata,omitempty"`
}
@ -135,13 +142,21 @@ func (h *Handler) scanTablesForMaintenance(
continue
}
if needsMaintenance(icebergMeta, config) {
tablePath := path.Join(nsName, tblName)
metadataFileName := metadataFileNameFromLocation(internalMeta.MetadataLocation, bucketName, tablePath)
needsWork, err := h.tableNeedsMaintenance(ctx, filerClient, bucketName, tablePath, icebergMeta, metadataFileName, config, ops)
if err != nil {
glog.V(2).Infof("iceberg maintenance: skipping %s/%s/%s: cannot evaluate maintenance need: %v", bucketName, nsName, tblName, err)
continue
}
if needsWork {
tables = append(tables, tableInfo{
BucketName: bucketName,
Namespace: nsName,
TableName: tblName,
TablePath: path.Join(nsName, tblName),
Metadata: icebergMeta,
BucketName: bucketName,
Namespace: nsName,
TableName: tblName,
TablePath: tablePath,
MetadataFileName: metadataFileName,
Metadata: icebergMeta,
})
if limit > 0 && len(tables) > limit {
return tables, nil
@ -154,8 +169,216 @@ func (h *Handler) scanTablesForMaintenance(
return tables, nil
}
// needsMaintenance checks if a table needs any maintenance based on
// metadata-only thresholds (no manifest reading).
// normalizeDetectionConfig returns a copy of config with every
// detection-related threshold clamped to a sane default, so the
// per-operation checks below never observe zero or out-of-range values.
func normalizeDetectionConfig(config Config) Config {
	out := config
	if out.TargetFileSizeBytes <= 0 {
		out.TargetFileSizeBytes = defaultTargetFileSizeMB * 1024 * 1024
	}
	if out.MinInputFiles < 2 {
		out.MinInputFiles = defaultMinInputFiles
	}
	if out.MinManifestsToRewrite < minManifestsToRewrite {
		out.MinManifestsToRewrite = minManifestsToRewrite
	}
	if out.OrphanOlderThanHours <= 0 {
		out.OrphanOlderThanHours = defaultOrphanOlderThanHours
	}
	return out
}
// tableNeedsMaintenance reports whether any of the requested operations has
// actionable work for the given table. It returns true as soon as one
// operation qualifies. Per-operation evaluation errors are accumulated and
// only surfaced when no operation found work, so a failure in one check
// cannot mask work detected by another.
func (h *Handler) tableNeedsMaintenance(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	bucketName, tablePath string,
	meta table.Metadata,
	metadataFileName string,
	config Config,
	ops []string,
) (bool, error) {
	config = normalizeDetectionConfig(config)
	// Evaluate the metadata-only expiration check first so large tables do not
	// pay for manifest reads when snapshot expiry already makes them eligible.
	for _, op := range ops {
		if op == "expire_snapshots" && needsMaintenance(meta, config) {
			return true, nil
		}
	}
	loadManifests := func() ([]iceberg.ManifestFile, error) {
		return loadCurrentManifests(ctx, filerClient, bucketName, tablePath, meta)
	}
	// Lazily load the current snapshot's manifest list at most once, sharing
	// both the result and any load error between the operations below.
	var currentManifests []iceberg.ManifestFile
	var manifestsErr error
	manifestsLoaded := false
	getCurrentManifests := func() ([]iceberg.ManifestFile, error) {
		if manifestsLoaded {
			return currentManifests, manifestsErr
		}
		currentManifests, manifestsErr = loadManifests()
		manifestsLoaded = true
		return currentManifests, manifestsErr
	}
	var opEvalErrors []string
	for _, op := range ops {
		switch op {
		case "expire_snapshots":
			// Handled by the metadata-only check above.
			continue
		case "compact":
			manifests, err := getCurrentManifests()
			if err != nil {
				opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
				continue
			}
			eligible, err := hasEligibleCompaction(ctx, filerClient, bucketName, tablePath, manifests, config)
			if err != nil {
				opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
				continue
			}
			if eligible {
				return true, nil
			}
		case "rewrite_manifests":
			manifests, err := getCurrentManifests()
			if err != nil {
				opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
				continue
			}
			// Only data manifests count toward the rewrite threshold; delete
			// manifests are ignored by countDataManifests.
			if countDataManifests(manifests) >= config.MinManifestsToRewrite {
				return true, nil
			}
		case "remove_orphans":
			// Resolve the active metadata file name lazily when the caller did
			// not supply it during the table scan.
			if metadataFileName == "" {
				_, currentMetadataFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath)
				if err != nil {
					opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
					continue
				}
				metadataFileName = currentMetadataFileName
			}
			orphanCandidates, err := collectOrphanCandidates(ctx, filerClient, bucketName, tablePath, meta, metadataFileName, config.OrphanOlderThanHours)
			if err != nil {
				opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
				continue
			}
			if len(orphanCandidates) > 0 {
				return true, nil
			}
		}
	}
	if len(opEvalErrors) > 0 {
		return false, fmt.Errorf("evaluate maintenance operations: %s", strings.Join(opEvalErrors, "; "))
	}
	return false, nil
}
// metadataFileNameFromLocation extracts the bare metadata file name from a
// (possibly fully qualified) Iceberg metadata location. An empty location
// yields an empty name so callers can resolve it lazily later.
func metadataFileNameFromLocation(location, bucketName, tablePath string) string {
	if location == "" {
		return ""
	}
	normalized := normalizeIcebergPath(location, bucketName, tablePath)
	return path.Base(normalized)
}
// countDataManifests reports how many manifests in the list carry data
// content; delete manifests are excluded from the count.
func countDataManifests(manifests []iceberg.ManifestFile) int64 {
	count := int64(0)
	for _, manifest := range manifests {
		if manifest.ManifestContent() != iceberg.ManifestContentData {
			continue
		}
		count++
	}
	return count
}
// loadCurrentManifests reads and parses the manifest list of the table's
// current snapshot. It returns (nil, nil) when the table has no current
// snapshot or the snapshot has no manifest list.
func loadCurrentManifests(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	bucketName, tablePath string,
	meta table.Metadata,
) ([]iceberg.ManifestFile, error) {
	snap := meta.CurrentSnapshot()
	if snap == nil || snap.ManifestList == "" {
		// Nothing to load; the caller treats an empty result as "no work".
		return nil, nil
	}
	raw, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, snap.ManifestList)
	if err != nil {
		return nil, fmt.Errorf("read manifest list: %w", err)
	}
	manifests, err := iceberg.ReadManifestList(bytes.NewReader(raw))
	if err != nil {
		return nil, fmt.Errorf("parse manifest list: %w", err)
	}
	return manifests, nil
}
// hasEligibleCompaction reports whether the current snapshot's data files can
// be bin-packed into at least one compaction group under the configured
// target size and minimum input-file count. Delete manifests are skipped,
// and tables whose data manifests span multiple partition specs are
// conservatively treated as ineligible.
func hasEligibleCompaction(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	bucketName, tablePath string,
	manifests []iceberg.ManifestFile,
	config Config,
) (bool, error) {
	if len(manifests) == 0 {
		return false, nil
	}
	minInputFiles, err := compactionMinInputFiles(config.MinInputFiles)
	if err != nil {
		return false, err
	}
	// Only data manifests participate in compaction.
	dataManifests := make([]iceberg.ManifestFile, 0, len(manifests))
	specIDs := map[int32]struct{}{}
	for _, manifest := range manifests {
		if manifest.ManifestContent() == iceberg.ManifestContentData {
			dataManifests = append(dataManifests, manifest)
			specIDs[manifest.PartitionSpecID()] = struct{}{}
		}
	}
	switch {
	case len(dataManifests) == 0:
		return false, nil
	case len(specIDs) > 1:
		// Mixed partition specs would make bin-packing ambiguous; bail out.
		return false, nil
	}
	var entries []iceberg.ManifestEntry
	for _, manifest := range dataManifests {
		raw, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, manifest.FilePath())
		if err != nil {
			return false, fmt.Errorf("read manifest %s: %w", manifest.FilePath(), err)
		}
		parsed, err := iceberg.ReadManifest(manifest, bytes.NewReader(raw), true)
		if err != nil {
			return false, fmt.Errorf("parse manifest %s: %w", manifest.FilePath(), err)
		}
		entries = append(entries, parsed...)
	}
	bins := buildCompactionBins(entries, config.TargetFileSizeBytes, minInputFiles)
	return len(bins) > 0, nil
}
// compactionMinInputFiles validates the configured minimum input-file count
// and converts it to the platform's int type. It rejects non-positive values
// and values that would overflow int (relevant on 32-bit builds).
func compactionMinInputFiles(minInputFiles int64) (int, error) {
	if minInputFiles <= 0 {
		return 0, fmt.Errorf("min input files must be positive, got %d", minInputFiles)
	}
	// Largest value representable by int on this platform.
	const maxInt = int64(^uint(0) >> 1)
	if minInputFiles > maxInt {
		return 0, fmt.Errorf("min input files %d exceeds platform int size", minInputFiles)
	}
	return int(minInputFiles), nil
}
// needsMaintenance checks whether snapshot expiration work is needed based on
// metadata-only thresholds.
func needsMaintenance(meta table.Metadata, config Config) bool {
snapshots := meta.Snapshots()
if len(snapshots) == 0 {

470
weed/plugin/worker/iceberg/exec_test.go

@ -243,8 +243,10 @@ func populateTable(t *testing.T, fs *fakeFilerServer, setup tableSetup) table.Me
}
// Build internal metadata xattr
const metadataVersion = 1
internalMeta := map[string]interface{}{
"metadataVersion": 1,
"metadataVersion": metadataVersion,
"metadataLocation": path.Join("metadata", fmt.Sprintf("v%d.metadata.json", metadataVersion)),
"metadata": map[string]interface{}{
"fullMetadata": json.RawMessage(fullMetadataJSON),
},
@ -370,6 +372,66 @@ func populateTable(t *testing.T, fs *fakeFilerServer, setup tableSetup) table.Me
return meta
}
// writeCurrentSnapshotManifests writes one Avro manifest per entry group into
// the fake filer's metadata directory, then writes the manifest list for the
// table's current snapshot referencing them. Tests use it to make the current
// snapshot's file inventory readable by the detection code.
func writeCurrentSnapshotManifests(t *testing.T, fs *fakeFilerServer, setup tableSetup, meta table.Metadata, manifestEntries [][]iceberg.ManifestEntry) {
	t.Helper()
	currentSnap := meta.CurrentSnapshot()
	if currentSnap == nil {
		t.Fatal("current snapshot is required")
	}
	metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata")
	version := meta.Version()
	schema := meta.CurrentSchema()
	spec := meta.PartitionSpec()
	var manifests []iceberg.ManifestFile
	// Write one manifest file per group of entries.
	for i, entries := range manifestEntries {
		manifestName := fmt.Sprintf("detect-manifest-%d.avro", i+1)
		var manifestBuf bytes.Buffer
		mf, err := iceberg.WriteManifest(
			path.Join("metadata", manifestName),
			&manifestBuf,
			version,
			spec,
			schema,
			currentSnap.SnapshotID,
			entries,
		)
		if err != nil {
			t.Fatalf("write manifest %d: %v", i+1, err)
		}
		fs.putEntry(metaDir, manifestName, &filer_pb.Entry{
			Name: manifestName,
			Attributes: &filer_pb.FuseAttributes{
				Mtime:    time.Now().Unix(),
				FileSize: uint64(manifestBuf.Len()),
			},
			Content: manifestBuf.Bytes(),
		})
		manifests = append(manifests, mf)
	}
	// Write the manifest list the current snapshot points at, so detection can
	// resolve currentSnap.ManifestList to the manifests created above.
	var manifestListBuf bytes.Buffer
	seqNum := currentSnap.SequenceNumber
	if err := iceberg.WriteManifestList(version, &manifestListBuf, currentSnap.SnapshotID, currentSnap.ParentSnapshotID, &seqNum, 0, manifests); err != nil {
		t.Fatalf("write current manifest list: %v", err)
	}
	fs.putEntry(metaDir, path.Base(currentSnap.ManifestList), &filer_pb.Entry{
		Name: path.Base(currentSnap.ManifestList),
		Attributes: &filer_pb.FuseAttributes{
			Mtime:    time.Now().Unix(),
			FileSize: uint64(manifestListBuf.Len()),
		},
		Content: manifestListBuf.Bytes(),
	})
}
// makeManifestEntries builds ADDED-status manifest entries attributed to the
// given snapshot. It is a convenience wrapper around
// makeManifestEntriesWithSnapshot.
func makeManifestEntries(t *testing.T, entrySpecs []testEntrySpec, snapshotID int64) []iceberg.ManifestEntry {
	t.Helper()
	return makeManifestEntriesWithSnapshot(t, entrySpecs, snapshotID, iceberg.EntryStatusADDED)
}
// ---------------------------------------------------------------------------
// Recording senders for Execute tests
// ---------------------------------------------------------------------------
@ -934,6 +996,412 @@ func TestConnectToFilerFailsWhenAllAddressesAreUnreachable(t *testing.T) {
}
}
// TestDetectSchedulesCompactionWithoutSnapshotPressure verifies that a table
// with a single fresh snapshot (no snapshot-expiry pressure) is still picked
// up by detection when its small data files form a compaction bin.
func TestDetectSchedulesCompactionWithoutSnapshotPressure(t *testing.T) {
	fs, client := startFakeFiler(t)
	now := time.Now().UnixMilli()
	setup := tableSetup{
		BucketName: "test-bucket",
		Namespace:  "analytics",
		TableName:  "events",
		Snapshots: []table.Snapshot{
			{SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1},
		},
	}
	meta := populateTable(t, fs, setup)
	// Three 1 KiB files against the 4 KiB target below: all fit one bin and
	// the file count satisfies MinInputFiles.
	writeCurrentSnapshotManifests(t, fs, setup, meta, [][]iceberg.ManifestEntry{
		makeManifestEntries(t, []testEntrySpec{
			{path: "data/small-1.parquet", size: 1024, partition: map[int]any{}},
			{path: "data/small-2.parquet", size: 1024, partition: map[int]any{}},
			{path: "data/small-3.parquet", size: 1024, partition: map[int]any{}},
		}, 1),
	})
	handler := NewHandler(nil)
	// Generous retention keeps snapshot expiry out of the picture; only the
	// compact operation is enabled.
	config := Config{
		SnapshotRetentionHours: 24 * 365,
		MaxSnapshotsToKeep:     10,
		TargetFileSizeBytes:    4096,
		MinInputFiles:          2,
		Operations:             "compact",
	}
	tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0)
	if err != nil {
		t.Fatalf("scanTablesForMaintenance failed: %v", err)
	}
	if len(tables) != 1 {
		t.Fatalf("expected 1 compaction candidate, got %d", len(tables))
	}
}
// TestDetectSchedulesCompactionWithDeleteManifestPresent verifies that a
// delete manifest in the current snapshot's manifest list does not prevent
// detection from scheduling compaction for the small data files.
func TestDetectSchedulesCompactionWithDeleteManifestPresent(t *testing.T) {
	fs, client := startFakeFiler(t)
	now := time.Now().UnixMilli()
	setup := tableSetup{
		BucketName: "test-bucket",
		Namespace:  "analytics",
		TableName:  "events",
		Snapshots: []table.Snapshot{
			{SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1},
		},
	}
	meta := populateTable(t, fs, setup)
	currentSnap := meta.CurrentSnapshot()
	if currentSnap == nil {
		t.Fatal("current snapshot is required")
	}
	metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata")
	version := meta.Version()
	schema := meta.CurrentSchema()
	spec := meta.PartitionSpec()
	// Build a data manifest holding three small files eligible for compaction.
	dataEntries := makeManifestEntries(t, []testEntrySpec{
		{path: "data/small-1.parquet", size: 1024, partition: map[int]any{}},
		{path: "data/small-2.parquet", size: 1024, partition: map[int]any{}},
		{path: "data/small-3.parquet", size: 1024, partition: map[int]any{}},
	}, currentSnap.SnapshotID)
	var dataManifestBuf bytes.Buffer
	dataManifestName := "detect-manifest-1.avro"
	dataManifest, err := iceberg.WriteManifest(
		path.Join("metadata", dataManifestName),
		&dataManifestBuf,
		version,
		spec,
		schema,
		currentSnap.SnapshotID,
		dataEntries,
	)
	if err != nil {
		t.Fatalf("write data manifest: %v", err)
	}
	fs.putEntry(metaDir, dataManifestName, &filer_pb.Entry{
		Name: dataManifestName,
		Attributes: &filer_pb.FuseAttributes{
			Mtime:    time.Now().Unix(),
			FileSize: uint64(dataManifestBuf.Len()),
		},
		Content: dataManifestBuf.Bytes(),
	})
	// Declare a delete manifest (never written to disk; only listed) so the
	// manifest list mixes data and delete content.
	deleteManifest := iceberg.NewManifestFile(
		version,
		path.Join("metadata", "detect-delete-manifest.avro"),
		0,
		int32(spec.ID()),
		currentSnap.SnapshotID,
	).Content(iceberg.ManifestContentDeletes).
		SequenceNum(currentSnap.SequenceNumber, currentSnap.SequenceNumber).
		DeletedFiles(1).
		DeletedRows(1).
		Build()
	var manifestListBuf bytes.Buffer
	seqNum := currentSnap.SequenceNumber
	if err := iceberg.WriteManifestList(
		version,
		&manifestListBuf,
		currentSnap.SnapshotID,
		currentSnap.ParentSnapshotID,
		&seqNum,
		0,
		[]iceberg.ManifestFile{dataManifest, deleteManifest},
	); err != nil {
		t.Fatalf("write manifest list: %v", err)
	}
	fs.putEntry(metaDir, path.Base(currentSnap.ManifestList), &filer_pb.Entry{
		Name: path.Base(currentSnap.ManifestList),
		Attributes: &filer_pb.FuseAttributes{
			Mtime:    time.Now().Unix(),
			FileSize: uint64(manifestListBuf.Len()),
		},
		Content: manifestListBuf.Bytes(),
	})
	handler := NewHandler(nil)
	// Only compaction is enabled; retention is generous so snapshot expiry
	// cannot be the reason the table is selected.
	config := Config{
		SnapshotRetentionHours: 24 * 365,
		MaxSnapshotsToKeep:     10,
		TargetFileSizeBytes:    4096,
		MinInputFiles:          2,
		Operations:             "compact",
	}
	tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0)
	if err != nil {
		t.Fatalf("scanTablesForMaintenance failed: %v", err)
	}
	if len(tables) != 1 {
		t.Fatalf("expected 1 compaction candidate with delete manifest present, got %d", len(tables))
	}
}
// TestDetectSchedulesSnapshotExpiryDespiteCompactionEvaluationError verifies
// that a failure while evaluating the compact operation (here: an unparseable
// manifest list) does not suppress snapshot-expiry detection — the table must
// still be scheduled.
func TestDetectSchedulesSnapshotExpiryDespiteCompactionEvaluationError(t *testing.T) {
	fs, client := startFakeFiler(t)
	now := time.Now().UnixMilli()
	setup := tableSetup{
		BucketName: "test-bucket",
		Namespace:  "analytics",
		TableName:  "events",
		Snapshots: []table.Snapshot{
			{SnapshotID: 1, TimestampMs: now - 1, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1},
		},
	}
	populateTable(t, fs, setup)
	// Overwrite the snapshot's manifest list with garbage so any manifest read
	// during compaction evaluation fails to parse.
	metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata")
	manifestListName := path.Base(setup.Snapshots[0].ManifestList)
	fs.putEntry(metaDir, manifestListName, &filer_pb.Entry{
		Name: manifestListName,
		Attributes: &filer_pb.FuseAttributes{
			Mtime:    time.Now().Unix(),
			FileSize: uint64(len("not-a-manifest-list")),
		},
		Content: []byte("not-a-manifest-list"),
	})
	handler := NewHandler(nil)
	// Zero retention makes the (already past) snapshot immediately expirable.
	config := Config{
		SnapshotRetentionHours: 0,
		MaxSnapshotsToKeep:     10,
		Operations:             "compact,expire_snapshots",
	}
	tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0)
	if err != nil {
		t.Fatalf("scanTablesForMaintenance failed: %v", err)
	}
	if len(tables) != 1 {
		t.Fatalf("expected snapshot expiration candidate despite compaction evaluation error, got %d", len(tables))
	}
}
// TestDetectSchedulesManifestRewriteWithoutSnapshotPressure verifies that a
// table whose current snapshot has enough data manifests is scheduled for
// manifest rewrite even when snapshot expiry is not applicable.
func TestDetectSchedulesManifestRewriteWithoutSnapshotPressure(t *testing.T) {
	fs, client := startFakeFiler(t)
	now := time.Now().UnixMilli()
	setup := tableSetup{
		BucketName: "test-bucket",
		Namespace:  "analytics",
		TableName:  "events",
		Snapshots: []table.Snapshot{
			{SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1},
		},
	}
	meta := populateTable(t, fs, setup)
	// Five single-file data manifests, exactly matching MinManifestsToRewrite
	// below.
	manifestEntries := make([][]iceberg.ManifestEntry, 0, 5)
	for i := 0; i < 5; i++ {
		manifestEntries = append(manifestEntries, makeManifestEntries(t, []testEntrySpec{
			{path: fmt.Sprintf("data/rewrite-%d.parquet", i), size: 1024, partition: map[int]any{}},
		}, 1))
	}
	writeCurrentSnapshotManifests(t, fs, setup, meta, manifestEntries)
	handler := NewHandler(nil)
	config := Config{
		SnapshotRetentionHours: 24 * 365,
		MaxSnapshotsToKeep:     10,
		MinManifestsToRewrite:  5,
		Operations:             "rewrite_manifests",
	}
	tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0)
	if err != nil {
		t.Fatalf("scanTablesForMaintenance failed: %v", err)
	}
	if len(tables) != 1 {
		t.Fatalf("expected 1 manifest rewrite candidate, got %d", len(tables))
	}
}
// TestDetectDoesNotScheduleManifestRewriteFromDeleteManifestsOnly verifies
// that delete manifests are excluded from the rewrite-threshold count: one
// data manifest plus four delete manifests must NOT trigger a manifest
// rewrite when the threshold is two data manifests.
func TestDetectDoesNotScheduleManifestRewriteFromDeleteManifestsOnly(t *testing.T) {
	fs, client := startFakeFiler(t)
	now := time.Now().UnixMilli()
	setup := tableSetup{
		BucketName: "test-bucket",
		Namespace:  "analytics",
		TableName:  "events",
		Snapshots: []table.Snapshot{
			{SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1},
		},
	}
	meta := populateTable(t, fs, setup)
	currentSnap := meta.CurrentSnapshot()
	if currentSnap == nil {
		t.Fatal("current snapshot is required")
	}
	metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata")
	version := meta.Version()
	schema := meta.CurrentSchema()
	spec := meta.PartitionSpec()
	// Exactly one data manifest with a single file.
	dataEntries := makeManifestEntries(t, []testEntrySpec{
		{path: "data/rewrite-0.parquet", size: 1024, partition: map[int]any{}},
	}, currentSnap.SnapshotID)
	var dataManifestBuf bytes.Buffer
	dataManifestName := "detect-rewrite-data.avro"
	dataManifest, err := iceberg.WriteManifest(
		path.Join("metadata", dataManifestName),
		&dataManifestBuf,
		version,
		spec,
		schema,
		currentSnap.SnapshotID,
		dataEntries,
	)
	if err != nil {
		t.Fatalf("write data manifest: %v", err)
	}
	fs.putEntry(metaDir, dataManifestName, &filer_pb.Entry{
		Name: dataManifestName,
		Attributes: &filer_pb.FuseAttributes{
			Mtime:    time.Now().Unix(),
			FileSize: uint64(dataManifestBuf.Len()),
		},
		Content: dataManifestBuf.Bytes(),
	})
	// Pad the manifest list with four delete manifests; these must not count
	// toward MinManifestsToRewrite.
	manifests := []iceberg.ManifestFile{dataManifest}
	for i := 0; i < 4; i++ {
		deleteManifest := iceberg.NewManifestFile(
			version,
			path.Join("metadata", fmt.Sprintf("detect-delete-%d.avro", i)),
			0,
			int32(spec.ID()),
			currentSnap.SnapshotID,
		).Content(iceberg.ManifestContentDeletes).
			SequenceNum(currentSnap.SequenceNumber, currentSnap.SequenceNumber).
			DeletedFiles(1).
			DeletedRows(1).
			Build()
		manifests = append(manifests, deleteManifest)
	}
	var manifestListBuf bytes.Buffer
	seqNum := currentSnap.SequenceNumber
	if err := iceberg.WriteManifestList(
		version,
		&manifestListBuf,
		currentSnap.SnapshotID,
		currentSnap.ParentSnapshotID,
		&seqNum,
		0,
		manifests,
	); err != nil {
		t.Fatalf("write manifest list: %v", err)
	}
	fs.putEntry(metaDir, path.Base(currentSnap.ManifestList), &filer_pb.Entry{
		Name: path.Base(currentSnap.ManifestList),
		Attributes: &filer_pb.FuseAttributes{
			Mtime:    time.Now().Unix(),
			FileSize: uint64(manifestListBuf.Len()),
		},
		Content: manifestListBuf.Bytes(),
	})
	handler := NewHandler(nil)
	config := Config{
		SnapshotRetentionHours: 24 * 365,
		MaxSnapshotsToKeep:     10,
		MinManifestsToRewrite:  2,
		Operations:             "rewrite_manifests",
	}
	tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0)
	if err != nil {
		t.Fatalf("scanTablesForMaintenance failed: %v", err)
	}
	if len(tables) != 0 {
		t.Fatalf("expected no manifest rewrite candidate when only one data manifest exists, got %d", len(tables))
	}
}
// TestDetectSchedulesOrphanCleanupWithoutSnapshotPressure verifies that an
// unreferenced data file older than the orphan safety window gets the table
// scheduled for orphan cleanup even when snapshot expiry does not apply.
func TestDetectSchedulesOrphanCleanupWithoutSnapshotPressure(t *testing.T) {
	fs, client := startFakeFiler(t)
	now := time.Now().UnixMilli()
	setup := tableSetup{
		BucketName: "test-bucket",
		Namespace:  "analytics",
		TableName:  "events",
		Snapshots: []table.Snapshot{
			{SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1},
		},
	}
	populateTable(t, fs, setup)
	// Plant a stale unreferenced file: 200h old, well past the 72h window set
	// in the config below.
	dataDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "data")
	fs.putEntry(dataDir, "stale-orphan.parquet", &filer_pb.Entry{
		Name: "stale-orphan.parquet",
		Attributes: &filer_pb.FuseAttributes{
			Mtime:    time.Now().Add(-200 * time.Hour).Unix(),
			FileSize: 100,
		},
		Content: []byte("orphan"),
	})
	handler := NewHandler(nil)
	config := Config{
		SnapshotRetentionHours: 24 * 365,
		MaxSnapshotsToKeep:     10,
		OrphanOlderThanHours:   72,
		Operations:             "remove_orphans",
	}
	tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0)
	if err != nil {
		t.Fatalf("scanTablesForMaintenance failed: %v", err)
	}
	if len(tables) != 1 {
		t.Fatalf("expected 1 orphan cleanup candidate, got %d", len(tables))
	}
}
// TestDetectSchedulesOrphanCleanupWithoutSnapshots verifies that orphan
// detection also works on a table with no snapshots at all — a stale
// unreferenced file alone must schedule the table.
func TestDetectSchedulesOrphanCleanupWithoutSnapshots(t *testing.T) {
	fs, client := startFakeFiler(t)
	setup := tableSetup{
		BucketName: "test-bucket",
		Namespace:  "analytics",
		TableName:  "events",
	}
	populateTable(t, fs, setup)
	// A 200h-old unreferenced file against the 72h window below.
	dataDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "data")
	fs.putEntry(dataDir, "stale-orphan.parquet", &filer_pb.Entry{
		Name: "stale-orphan.parquet",
		Attributes: &filer_pb.FuseAttributes{
			Mtime:    time.Now().Add(-200 * time.Hour).Unix(),
			FileSize: 100,
		},
		Content: []byte("orphan"),
	})
	handler := NewHandler(nil)
	config := Config{
		OrphanOlderThanHours: 72,
		Operations:           "remove_orphans",
	}
	tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0)
	if err != nil {
		t.Fatalf("scanTablesForMaintenance failed: %v", err)
	}
	if len(tables) != 1 {
		t.Fatalf("expected 1 orphan cleanup candidate without snapshots, got %d", len(tables))
	}
}
func TestStalePlanGuard(t *testing.T) {
fs, client := startFakeFiler(t)

89
weed/plugin/worker/iceberg/handler_test.go

@ -505,13 +505,19 @@ func TestBuildCompactionBinsMultiplePartitions(t *testing.T) {
partA := map[int]any{1: "us-east"}
partB := map[int]any{1: "eu-west"}
partitionSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{
SourceID: 1,
FieldID: 1000,
Name: "region",
Transform: iceberg.IdentityTransform{},
})
entries := makeTestEntries(t, []testEntrySpec{
{path: "data/a1.parquet", size: 1024, partition: partA},
{path: "data/a2.parquet", size: 2048, partition: partA},
{path: "data/b1.parquet", size: 1024, partition: partB},
{path: "data/b2.parquet", size: 2048, partition: partB},
{path: "data/b3.parquet", size: 4096, partition: partB},
{path: "data/a1.parquet", size: 1024, partition: partA, partitionSpec: &partitionSpec},
{path: "data/a2.parquet", size: 2048, partition: partA, partitionSpec: &partitionSpec},
{path: "data/b1.parquet", size: 1024, partition: partB, partitionSpec: &partitionSpec},
{path: "data/b2.parquet", size: 2048, partition: partB, partitionSpec: &partitionSpec},
{path: "data/b3.parquet", size: 4096, partition: partB, partitionSpec: &partitionSpec},
})
bins := buildCompactionBins(entries, targetSize, minFiles)
@ -574,40 +580,65 @@ func TestSplitOversizedBinDropsImpossibleRunts(t *testing.T) {
}
type testEntrySpec struct {
path string
size int64
partition map[int]any
specID int32 // partition spec ID; 0 uses UnpartitionedSpec
path string
size int64
partition map[int]any
partitionSpec *iceberg.PartitionSpec
specID int32 // partition spec ID; 0 uses UnpartitionedSpec
}
// makeTestEntries creates manifest entries using UnpartitionedSpec (spec ID 0).
// The specID field in testEntrySpec is ignored here; for multi-spec testing,
// use makeTestEntriesWithSpec instead.
func makeTestEntries(t *testing.T, specs []testEntrySpec) []iceberg.ManifestEntry {
// buildTestDataFile constructs a parquet DataFile for a single test entry
// spec. Unpartitioned entries may omit partitionSpec; partitioned entries
// must supply one or the test fails immediately.
func buildTestDataFile(t *testing.T, spec testEntrySpec) iceberg.DataFile {
	t.Helper()
	chosenSpec := iceberg.UnpartitionedSpec
	switch {
	case spec.partitionSpec != nil:
		chosenSpec = spec.partitionSpec
	case len(spec.partition) > 0:
		t.Fatalf("partition spec is required for partitioned test entry %s", spec.path)
	}
	builder, err := iceberg.NewDataFileBuilder(
		*chosenSpec,
		iceberg.EntryContentData,
		spec.path,
		iceberg.ParquetFile,
		spec.partition,
		nil, nil,
		1, // recordCount (must be > 0)
		spec.size,
	)
	if err != nil {
		t.Fatalf("failed to build data file %s: %v", spec.path, err)
	}
	return builder.Build()
}
func makeManifestEntriesWithSnapshot(
t *testing.T,
specs []testEntrySpec,
snapshotID int64,
status iceberg.ManifestEntryStatus,
) []iceberg.ManifestEntry {
t.Helper()
entries := make([]iceberg.ManifestEntry, 0, len(specs))
for _, spec := range specs {
partSpec := *iceberg.UnpartitionedSpec
dfBuilder, err := iceberg.NewDataFileBuilder(
partSpec,
iceberg.EntryContentData,
spec.path,
iceberg.ParquetFile,
spec.partition,
entries = append(entries, iceberg.NewManifestEntry(
status,
&snapshotID,
nil, nil,
1, // recordCount (must be > 0)
spec.size,
)
if err != nil {
t.Fatalf("failed to build data file %s: %v", spec.path, err)
}
snapID := int64(1)
entry := iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, dfBuilder.Build())
entries = append(entries, entry)
buildTestDataFile(t, spec),
))
}
return entries
}
// makeTestEntries creates ADDED manifest entries under the default
// unpartitioned spec with snapshot ID 1. For multi-spec testing, use
// makeTestEntriesWithSpec instead.
func makeTestEntries(t *testing.T, entrySpecs []testEntrySpec) []iceberg.ManifestEntry {
	t.Helper()
	const defaultSnapshotID = int64(1)
	return makeManifestEntriesWithSnapshot(t, entrySpecs, defaultSnapshotID, iceberg.EntryStatusADDED)
}
// makeTestEntriesWithSpec creates manifest entries using specific partition specs.
// Each spec in the specs slice can specify a specID; the entry is built using
// a PartitionSpec with that ID.

66
weed/plugin/worker/iceberg/operations.go

@ -229,31 +229,54 @@ func (h *Handler) removeOrphans(
return "", nil, fmt.Errorf("load metadata: %w", err)
}
// Collect all referenced files from all snapshots
orphanCandidates, err := collectOrphanCandidates(ctx, filerClient, bucketName, tablePath, meta, metadataFileName, config.OrphanOlderThanHours)
if err != nil {
return "", nil, fmt.Errorf("collect orphan candidates: %w", err)
}
orphanCount := 0
for _, candidate := range orphanCandidates {
if delErr := deleteFilerFile(ctx, filerClient, candidate.Dir, candidate.Entry.Name); delErr != nil {
glog.Warningf("iceberg maintenance: failed to delete orphan %s/%s: %v", candidate.Dir, candidate.Entry.Name, delErr)
} else {
orphanCount++
}
}
metrics := map[string]int64{
MetricOrphansRemoved: int64(orphanCount),
MetricDurationMs: time.Since(start).Milliseconds(),
}
return fmt.Sprintf("removed %d orphan file(s)", orphanCount), metrics, nil
}
func collectOrphanCandidates(
ctx context.Context,
filerClient filer_pb.SeaweedFilerClient,
bucketName, tablePath string,
meta table.Metadata,
metadataFileName string,
orphanOlderThanHours int64,
) ([]filerFileEntry, error) {
referencedFiles, err := collectSnapshotFiles(ctx, filerClient, bucketName, tablePath, meta.Snapshots())
if err != nil {
return "", nil, fmt.Errorf("collect referenced files: %w", err)
return nil, fmt.Errorf("collect referenced files: %w", err)
}
// Reference the active metadata file so it is not treated as orphan
referencedFiles[path.Join("metadata", metadataFileName)] = struct{}{}
// Also reference the current metadata files
for mle := range meta.PreviousFiles() {
referencedFiles[mle.MetadataFile] = struct{}{}
}
// Precompute a normalized lookup set so orphan checks are O(1) per file.
normalizedRefs := make(map[string]struct{}, len(referencedFiles))
for ref := range referencedFiles {
normalizedRefs[ref] = struct{}{}
normalizedRefs[normalizeIcebergPath(ref, bucketName, tablePath)] = struct{}{}
}
// List actual files on filer in metadata/ and data/ directories
tableBasePath := path.Join(s3tables.TablesPath, bucketName, tablePath)
safetyThreshold := time.Now().Add(-time.Duration(config.OrphanOlderThanHours) * time.Hour)
orphanCount := 0
safetyThreshold := time.Now().Add(-time.Duration(orphanOlderThanHours) * time.Hour)
var candidates []filerFileEntry
for _, subdir := range []string{"metadata", "data"} {
dirPath := path.Join(tableBasePath, subdir)
@ -265,39 +288,22 @@ func (h *Handler) removeOrphans(
for _, fe := range fileEntries {
entry := fe.Entry
// Build relative path from the table base (e.g. "data/region=us/file.parquet")
fullPath := path.Join(fe.Dir, entry.Name)
relPath := strings.TrimPrefix(fullPath, tableBasePath+"/")
_, isReferenced := normalizedRefs[relPath]
if isReferenced {
if _, isReferenced := normalizedRefs[relPath]; isReferenced {
continue
}
// Check safety window — skip entries with unknown age
if entry.Attributes == nil {
continue
}
mtime := time.Unix(entry.Attributes.Mtime, 0)
if mtime.After(safetyThreshold) {
if time.Unix(entry.Attributes.Mtime, 0).After(safetyThreshold) {
continue
}
// Delete orphan
if delErr := deleteFilerFile(ctx, filerClient, fe.Dir, entry.Name); delErr != nil {
glog.Warningf("iceberg maintenance: failed to delete orphan %s/%s: %v", fe.Dir, entry.Name, delErr)
} else {
orphanCount++
}
candidates = append(candidates, fe)
}
}
metrics := map[string]int64{
MetricOrphansRemoved: int64(orphanCount),
MetricDurationMs: time.Since(start).Milliseconds(),
}
return fmt.Sprintf("removed %d orphan file(s)", orphanCount), metrics, nil
return candidates, nil
}
// ---------------------------------------------------------------------------

Loading…
Cancel
Save