diff --git a/weed/command/plugin_worker_test.go b/weed/command/plugin_worker_test.go index 9e13142cd..26c250cb5 100644 --- a/weed/command/plugin_worker_test.go +++ b/weed/command/plugin_worker_test.go @@ -6,6 +6,7 @@ import ( "net/http/httptest" "os" "path/filepath" + "slices" "strings" "testing" @@ -138,8 +139,11 @@ func TestPluginWorkerDefaultJobTypes(t *testing.T) { if err != nil { t.Fatalf("parsePluginWorkerJobTypes(default setting) err = %v", err) } - if len(jobTypes) != 4 { - t.Fatalf("expected default job types to include 4 handlers, got %v", jobTypes) + if len(jobTypes) != 5 { + t.Fatalf("expected default job types to include 5 handlers, got %v", jobTypes) + } + if !slices.Contains(jobTypes, "iceberg_maintenance") { + t.Fatalf("expected iceberg_maintenance in default job types, got %v", jobTypes) } } diff --git a/weed/command/worker_runtime.go b/weed/command/worker_runtime.go index 170989503..c4387aea0 100644 --- a/weed/command/worker_runtime.go +++ b/weed/command/worker_runtime.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/seaweedfs/seaweedfs/weed/glog" pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker" + icebergworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker/iceberg" "github.com/seaweedfs/seaweedfs/weed/security" statsCollect "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/util" @@ -23,7 +24,7 @@ import ( "google.golang.org/grpc" ) -const defaultPluginWorkerJobTypes = "vacuum,volume_balance,erasure_coding,admin_script" +const defaultPluginWorkerJobTypes = "vacuum,volume_balance,erasure_coding,admin_script,iceberg_maintenance" type pluginWorkerRunOptions struct { AdminServer string @@ -158,6 +159,8 @@ func buildPluginWorkerHandler(jobType string, dialOption grpc.DialOption, maxExe return pluginworker.NewErasureCodingHandler(dialOption, workingDir), nil case "admin_script": return pluginworker.NewAdminScriptHandler(dialOption), nil + case 
"iceberg_maintenance": + return icebergworker.NewHandler(dialOption), nil default: return nil, fmt.Errorf("unsupported plugin job type %q", canonicalJobType) } @@ -224,6 +227,8 @@ func canonicalPluginWorkerJobType(jobType string) (string, error) { return "erasure_coding", nil case "admin_script", "admin-script", "admin.script", "script", "admin": return "admin_script", nil + case "iceberg_maintenance", "iceberg-maintenance", "iceberg.maintenance", "iceberg": + return "iceberg_maintenance", nil default: return "", fmt.Errorf("unsupported plugin job type %q", jobType) } diff --git a/weed/command/worker_test.go b/weed/command/worker_test.go index 0bde51cc9..d800e20ab 100644 --- a/weed/command/worker_test.go +++ b/weed/command/worker_test.go @@ -7,7 +7,7 @@ func TestWorkerDefaultJobTypes(t *testing.T) { if err != nil { t.Fatalf("parsePluginWorkerJobTypes(default worker flag) err = %v", err) } - if len(jobTypes) != 4 { - t.Fatalf("expected default worker job types to include 4 handlers, got %v", jobTypes) + if len(jobTypes) != 5 { + t.Fatalf("expected default worker job types to include 5 handlers, got %v", jobTypes) } } diff --git a/weed/plugin/worker/admin_script_handler.go b/weed/plugin/worker/admin_script_handler.go index 0a4f7bfc9..348e2c0b8 100644 --- a/weed/plugin/worker/admin_script_handler.go +++ b/weed/plugin/worker/admin_script_handler.go @@ -131,8 +131,8 @@ func (h *AdminScriptHandler) Detect(ctx context.Context, request *plugin_pb.RunD script := normalizeAdminScript(readStringConfig(request.GetAdminConfigValues(), "script", "")) scriptName := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "script_name", "")) runIntervalMinutes := readAdminScriptRunIntervalMinutes(request.GetAdminConfigValues()) - if shouldSkipDetectionByInterval(request.GetLastSuccessfulRun(), runIntervalMinutes*60) { - _ = sender.SendActivity(buildDetectorActivity( + if ShouldSkipDetectionByInterval(request.GetLastSuccessfulRun(), runIntervalMinutes*60) { + _ = 
sender.SendActivity(BuildDetectorActivity( "skipped_by_interval", fmt.Sprintf("ADMIN SCRIPT: Detection skipped due to run interval (%dm)", runIntervalMinutes), map[string]*plugin_pb.ConfigValue{ @@ -158,7 +158,7 @@ func (h *AdminScriptHandler) Detect(ctx context.Context, request *plugin_pb.RunD commands := parseAdminScriptCommands(script) execCount := countExecutableCommands(commands) if execCount == 0 { - _ = sender.SendActivity(buildDetectorActivity( + _ = sender.SendActivity(BuildDetectorActivity( "no_script", "ADMIN SCRIPT: No executable commands configured", map[string]*plugin_pb.ConfigValue{ @@ -251,7 +251,7 @@ func (h *AdminScriptHandler) Execute(ctx context.Context, request *plugin_pb.Exe Stage: "assigned", Message: "admin script job accepted", Activities: []*plugin_pb.ActivityEvent{ - buildExecutorActivity("assigned", "admin script job accepted"), + BuildExecutorActivity("assigned", "admin script job accepted"), }, }); err != nil { return err @@ -287,7 +287,7 @@ func (h *AdminScriptHandler) Execute(ctx context.Context, request *plugin_pb.Exe Stage: "error", Message: msg, Activities: []*plugin_pb.ActivityEvent{ - buildExecutorActivity("error", msg), + BuildExecutorActivity("error", msg), }, }) } @@ -303,7 +303,7 @@ func (h *AdminScriptHandler) Execute(ctx context.Context, request *plugin_pb.Exe Stage: "error", Message: msg, Activities: []*plugin_pb.ActivityEvent{ - buildExecutorActivity("error", msg), + BuildExecutorActivity("error", msg), }, }) } @@ -316,7 +316,7 @@ func (h *AdminScriptHandler) Execute(ctx context.Context, request *plugin_pb.Exe Stage: "running", Message: fmt.Sprintf("executed %d/%d command(s)", executed, len(execCommands)), Activities: []*plugin_pb.ActivityEvent{ - buildExecutorActivity("running", commandLine), + BuildExecutorActivity("running", commandLine), }, }) } @@ -382,7 +382,7 @@ func (h *AdminScriptHandler) Execute(ctx context.Context, request *plugin_pb.Exe OutputValues: outputValues, }, Activities: []*plugin_pb.ActivityEvent{ - 
buildExecutorActivity("completed", resultSummary), + BuildExecutorActivity("completed", resultSummary), }, CompletedAt: timestamppb.Now(), }) diff --git a/weed/plugin/worker/erasure_coding_handler.go b/weed/plugin/worker/erasure_coding_handler.go index 15f9c9a83..df4959f17 100644 --- a/weed/plugin/worker/erasure_coding_handler.go +++ b/weed/plugin/worker/erasure_coding_handler.go @@ -205,9 +205,9 @@ func (h *ErasureCodingHandler) Detect( } workerConfig := deriveErasureCodingWorkerConfig(request.GetWorkerConfigValues()) - if shouldSkipDetectionByInterval(request.GetLastSuccessfulRun(), workerConfig.MinIntervalSeconds) { + if ShouldSkipDetectionByInterval(request.GetLastSuccessfulRun(), workerConfig.MinIntervalSeconds) { minInterval := time.Duration(workerConfig.MinIntervalSeconds) * time.Second - _ = sender.SendActivity(buildDetectorActivity( + _ = sender.SendActivity(BuildDetectorActivity( "skipped_by_interval", fmt.Sprintf("ERASURE CODING: Detection skipped due to min interval (%s)", minInterval), map[string]*plugin_pb.ConfigValue{ @@ -380,7 +380,7 @@ func emitErasureCodingDetectionDecisionTrace( ) } - if err := sender.SendActivity(buildDetectorActivity("decision_summary", summaryMessage, map[string]*plugin_pb.ConfigValue{ + if err := sender.SendActivity(BuildDetectorActivity("decision_summary", summaryMessage, map[string]*plugin_pb.ConfigValue{ "total_volumes": { Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(totalVolumes)}, }, @@ -437,7 +437,7 @@ func emitErasureCodingDetectionDecisionTrace( metric.FullnessRatio*100, taskConfig.FullnessRatio*100, ) - if err := sender.SendActivity(buildDetectorActivity("decision_volume", message, map[string]*plugin_pb.ConfigValue{ + if err := sender.SendActivity(BuildDetectorActivity("decision_volume", message, map[string]*plugin_pb.ConfigValue{ "volume_id": { Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(metric.VolumeID)}, }, @@ -520,7 +520,7 @@ func (h *ErasureCodingHandler) Execute( Stage: stage, 
Message: message, Activities: []*plugin_pb.ActivityEvent{ - buildExecutorActivity(stage, message), + BuildExecutorActivity(stage, message), }, }) }) @@ -533,7 +533,7 @@ func (h *ErasureCodingHandler) Execute( Stage: "assigned", Message: "erasure coding job accepted", Activities: []*plugin_pb.ActivityEvent{ - buildExecutorActivity("assigned", "erasure coding job accepted"), + BuildExecutorActivity("assigned", "erasure coding job accepted"), }, }); err != nil { return err @@ -548,7 +548,7 @@ func (h *ErasureCodingHandler) Execute( Stage: "failed", Message: err.Error(), Activities: []*plugin_pb.ActivityEvent{ - buildExecutorActivity("failed", err.Error()), + BuildExecutorActivity("failed", err.Error()), }, }) return err @@ -576,7 +576,7 @@ func (h *ErasureCodingHandler) Execute( }, }, Activities: []*plugin_pb.ActivityEvent{ - buildExecutorActivity("completed", resultSummary), + BuildExecutorActivity("completed", resultSummary), }, }) } diff --git a/weed/plugin/worker/iceberg/compact.go b/weed/plugin/worker/iceberg/compact.go new file mode 100644 index 000000000..b285404e3 --- /dev/null +++ b/weed/plugin/worker/iceberg/compact.go @@ -0,0 +1,551 @@ +package iceberg + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "path" + "sort" + "strings" + "time" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/table" + "github.com/parquet-go/parquet-go" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3tables" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// compactionBin groups small data files from the same partition for merging. 
+type compactionBin struct { + PartitionKey string + Partition map[int]any + Entries []iceberg.ManifestEntry + TotalSize int64 +} + +// compactDataFiles reads manifests to find small Parquet data files, groups +// them by partition, reads and merges them using parquet-go, and commits new +// manifest entries. +func (h *Handler) compactDataFiles( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath string, + config Config, +) (string, error) { + meta, metadataFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath) + if err != nil { + return "", fmt.Errorf("load metadata: %w", err) + } + + currentSnap := meta.CurrentSnapshot() + if currentSnap == nil || currentSnap.ManifestList == "" { + return "no current snapshot", nil + } + + // Read manifest list + manifestListData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, currentSnap.ManifestList) + if err != nil { + return "", fmt.Errorf("read manifest list: %w", err) + } + manifests, err := iceberg.ReadManifestList(bytes.NewReader(manifestListData)) + if err != nil { + return "", fmt.Errorf("parse manifest list: %w", err) + } + + // Abort if delete manifests exist — the compactor does not apply deletes, + // so carrying them through could produce incorrect results. + // Also detect multiple partition specs — the compactor writes a single + // manifest under the current spec which is invalid for spec-evolved tables. 
+ specIDs := make(map[int32]struct{}) + for _, mf := range manifests { + if mf.ManifestContent() != iceberg.ManifestContentData { + return "compaction skipped: delete manifests present (not yet supported)", nil + } + specIDs[mf.PartitionSpecID()] = struct{}{} + } + if len(specIDs) > 1 { + return "compaction skipped: multiple partition specs present (not yet supported)", nil + } + + // Collect data file entries from data manifests + var allEntries []iceberg.ManifestEntry + for _, mf := range manifests { + manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath()) + if err != nil { + return "", fmt.Errorf("read manifest %s: %w", mf.FilePath(), err) + } + entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true) + if err != nil { + return "", fmt.Errorf("parse manifest %s: %w", mf.FilePath(), err) + } + allEntries = append(allEntries, entries...) + } + + // Build compaction bins: group small files by partition + // MinInputFiles is clamped by ParseConfig to [2, ...] so int conversion is safe. + bins := buildCompactionBins(allEntries, config.TargetFileSizeBytes, int(config.MinInputFiles)) + if len(bins) == 0 { + return "no files eligible for compaction", nil + } + + spec := meta.PartitionSpec() + schema := meta.CurrentSchema() + version := meta.Version() + snapshotID := currentSnap.SnapshotID + + // Compute the snapshot ID for the commit up front so all manifest entries + // reference the same snapshot that will actually be committed. + newSnapID := time.Now().UnixMilli() + + // Process each bin: read source Parquet files, merge, write output + var newManifestEntries []iceberg.ManifestEntry + var deletedManifestEntries []iceberg.ManifestEntry + totalMerged := 0 + + metaDir := path.Join(s3tables.TablesPath, bucketName, tablePath, "metadata") + dataDir := path.Join(s3tables.TablesPath, bucketName, tablePath, "data") + + // Track written artifacts so we can clean them up if the commit fails. 
+ type artifact struct { + dir, fileName string + } + var writtenArtifacts []artifact + committed := false + + defer func() { + if committed || len(writtenArtifacts) == 0 { + return + } + // Use a detached context so cleanup completes even if ctx was canceled. + cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + for _, a := range writtenArtifacts { + if err := deleteFilerFile(cleanupCtx, filerClient, a.dir, a.fileName); err != nil { + glog.Warningf("iceberg compact: failed to clean up artifact %s/%s: %v", a.dir, a.fileName, err) + } + } + }() + + for binIdx, bin := range bins { + select { + case <-ctx.Done(): + return "", ctx.Err() + default: + } + + mergedFileName := fmt.Sprintf("compact-%d-%d-%d.parquet", snapshotID, newSnapID, binIdx) + mergedFilePath := path.Join("data", mergedFileName) + + mergedData, recordCount, err := mergeParquetFiles(ctx, filerClient, bucketName, tablePath, bin.Entries) + if err != nil { + glog.Warningf("iceberg compact: failed to merge bin %d (%d files): %v", binIdx, len(bin.Entries), err) + continue + } + + // Write merged file to filer + if err := ensureFilerDir(ctx, filerClient, dataDir); err != nil { + return "", fmt.Errorf("ensure data dir: %w", err) + } + if err := saveFilerFile(ctx, filerClient, dataDir, mergedFileName, mergedData); err != nil { + return "", fmt.Errorf("save merged file: %w", err) + } + + // Create new DataFile entry for the merged file + dfBuilder, err := iceberg.NewDataFileBuilder( + spec, + iceberg.EntryContentData, + mergedFilePath, + iceberg.ParquetFile, + bin.Partition, + nil, nil, + recordCount, + int64(len(mergedData)), + ) + if err != nil { + glog.Warningf("iceberg compact: failed to build data file entry for bin %d: %v", binIdx, err) + // Clean up the written file + _ = deleteFilerFile(ctx, filerClient, dataDir, mergedFileName) + continue + } + writtenArtifacts = append(writtenArtifacts, artifact{dir: dataDir, fileName: mergedFileName}) + + newEntry := 
iceberg.NewManifestEntry( + iceberg.EntryStatusADDED, + &newSnapID, + nil, nil, + dfBuilder.Build(), + ) + newManifestEntries = append(newManifestEntries, newEntry) + + // Mark original entries as deleted + for _, entry := range bin.Entries { + delEntry := iceberg.NewManifestEntry( + iceberg.EntryStatusDELETED, + &newSnapID, + nil, nil, + entry.DataFile(), + ) + deletedManifestEntries = append(deletedManifestEntries, delEntry) + } + + totalMerged += len(bin.Entries) + } + + if len(newManifestEntries) == 0 { + return "no bins successfully compacted", nil + } + + // Build entries for the new manifest: + // - ADDED entries for merged files + // - DELETED entries for original files + // - EXISTING entries for files that weren't compacted + compactedPaths := make(map[string]struct{}) + for _, entry := range deletedManifestEntries { + compactedPaths[entry.DataFile().FilePath()] = struct{}{} + } + + var manifestEntries []iceberg.ManifestEntry + manifestEntries = append(manifestEntries, newManifestEntries...) + manifestEntries = append(manifestEntries, deletedManifestEntries...) 
+ + // Keep existing entries that weren't compacted + for _, entry := range allEntries { + if _, compacted := compactedPaths[entry.DataFile().FilePath()]; !compacted { + existingEntry := iceberg.NewManifestEntry( + iceberg.EntryStatusEXISTING, + func() *int64 { id := entry.SnapshotID(); return &id }(), + nil, nil, + entry.DataFile(), + ) + manifestEntries = append(manifestEntries, existingEntry) + } + } + + // Write new manifest + var manifestBuf bytes.Buffer + manifestFileName := fmt.Sprintf("compact-%d.avro", newSnapID) + newManifest, err := iceberg.WriteManifest( + path.Join("metadata", manifestFileName), + &manifestBuf, + version, + spec, + schema, + newSnapID, + manifestEntries, + ) + if err != nil { + return "", fmt.Errorf("write compact manifest: %w", err) + } + + if err := saveFilerFile(ctx, filerClient, metaDir, manifestFileName, manifestBuf.Bytes()); err != nil { + return "", fmt.Errorf("save compact manifest: %w", err) + } + writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestFileName}) + + // Build manifest list with only the new manifest (the early abort at the + // top of this function guarantees no delete manifests are present). 
+ allManifests := []iceberg.ManifestFile{newManifest} + + // Write new manifest list + var manifestListBuf bytes.Buffer + seqNum := currentSnap.SequenceNumber + 1 + err = iceberg.WriteManifestList(version, &manifestListBuf, newSnapID, &snapshotID, &seqNum, 0, allManifests) + if err != nil { + return "", fmt.Errorf("write compact manifest list: %w", err) + } + + manifestListFileName := fmt.Sprintf("snap-%d.avro", newSnapID) + if err := saveFilerFile(ctx, filerClient, metaDir, manifestListFileName, manifestListBuf.Bytes()); err != nil { + return "", fmt.Errorf("save compact manifest list: %w", err) + } + writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestListFileName}) + + // Commit: add new snapshot and update main branch ref + manifestListLocation := path.Join("metadata", manifestListFileName) + err = h.commitWithRetry(ctx, filerClient, bucketName, tablePath, metadataFileName, config, func(currentMeta table.Metadata, builder *table.MetadataBuilder) error { + // Guard: verify table head hasn't advanced since we planned. 
+ cs := currentMeta.CurrentSnapshot() + if cs == nil || cs.SnapshotID != snapshotID { + return errStalePlan + } + + newSnapshot := &table.Snapshot{ + SnapshotID: newSnapID, + ParentSnapshotID: &snapshotID, + SequenceNumber: seqNum, + TimestampMs: newSnapID, + ManifestList: manifestListLocation, + Summary: &table.Summary{ + Operation: table.OpReplace, + Properties: map[string]string{ + "maintenance": "compact_data_files", + "merged-files": fmt.Sprintf("%d", totalMerged), + "new-files": fmt.Sprintf("%d", len(newManifestEntries)), + "compaction-bins": fmt.Sprintf("%d", len(bins)), + }, + }, + SchemaID: func() *int { + id := schema.ID + return &id + }(), + } + if err := builder.AddSnapshot(newSnapshot); err != nil { + return err + } + return builder.SetSnapshotRef(table.MainBranch, newSnapID, table.BranchRef) + }) + if err != nil { + return "", fmt.Errorf("commit compaction: %w", err) + } + + committed = true + return fmt.Sprintf("compacted %d files into %d (across %d bins)", totalMerged, len(newManifestEntries), len(bins)), nil +} + +// buildCompactionBins groups small data files by partition for bin-packing. +// A file is "small" if it's below targetSize. A bin must have at least +// minFiles entries to be worth compacting. 
+func buildCompactionBins(entries []iceberg.ManifestEntry, targetSize int64, minFiles int) []compactionBin { + if minFiles < 2 { + minFiles = 2 + } + + // Group entries by partition key + groups := make(map[string]*compactionBin) + for _, entry := range entries { + df := entry.DataFile() + if df.FileFormat() != iceberg.ParquetFile { + continue + } + if df.FileSizeBytes() >= targetSize { + continue + } + + partKey := partitionKey(df.Partition()) + bin, ok := groups[partKey] + if !ok { + bin = &compactionBin{ + PartitionKey: partKey, + Partition: df.Partition(), + } + groups[partKey] = bin + } + bin.Entries = append(bin.Entries, entry) + bin.TotalSize += df.FileSizeBytes() + } + + // Filter to bins with enough files, splitting oversized bins + var result []compactionBin + for _, bin := range groups { + if len(bin.Entries) < minFiles { + continue + } + if bin.TotalSize <= targetSize { + result = append(result, *bin) + } else { + result = append(result, splitOversizedBin(*bin, targetSize, minFiles)...) + } + } + + // Sort by partition key for deterministic order + sort.Slice(result, func(i, j int) bool { + return result[i].PartitionKey < result[j].PartitionKey + }) + + return result +} + +// splitOversizedBin splits a bin whose total size exceeds targetSize into +// sub-bins that each stay under targetSize while meeting minFiles. 
+func splitOversizedBin(bin compactionBin, targetSize int64, minFiles int) []compactionBin { + var bins []compactionBin + current := compactionBin{ + PartitionKey: bin.PartitionKey, + Partition: bin.Partition, + } + for _, entry := range bin.Entries { + if current.TotalSize > 0 && current.TotalSize+entry.DataFile().FileSizeBytes() > targetSize && len(current.Entries) >= minFiles { + bins = append(bins, current) + current = compactionBin{ + PartitionKey: bin.PartitionKey, + Partition: bin.Partition, + } + } + current.Entries = append(current.Entries, entry) + current.TotalSize += entry.DataFile().FileSizeBytes() + } + if len(current.Entries) >= minFiles { + bins = append(bins, current) + } + return bins +} + +// partitionKey creates a string key from a partition map for grouping. +// Values are JSON-encoded to avoid ambiguity when values contain commas or '='. +func partitionKey(partition map[int]any) string { + if len(partition) == 0 { + return "__unpartitioned__" + } + + // Sort field IDs for deterministic key + ids := make([]int, 0, len(partition)) + for id := range partition { + ids = append(ids, id) + } + sort.Ints(ids) + + var parts []string + for _, id := range ids { + v, err := json.Marshal(partition[id]) + if err != nil { + v = []byte(fmt.Sprintf("%x", fmt.Sprintf("%v", partition[id]))) + } + parts = append(parts, fmt.Sprintf("%d=%s", id, v)) + } + return strings.Join(parts, "\x00") +} + +// mergeParquetFiles reads multiple small Parquet files and merges them into +// a single Parquet file. It reads rows from each source and writes them to +// the output using the schema from the first file. +// +// Files are loaded into memory (appropriate for compacting small files). 
+func mergeParquetFiles( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath string, + entries []iceberg.ManifestEntry, +) ([]byte, int64, error) { + if len(entries) == 0 { + return nil, 0, fmt.Errorf("no entries to merge") + } + + // Read all source files and create parquet readers + type sourceFile struct { + reader *parquet.Reader + data []byte + } + var sources []sourceFile + defer func() { + for _, src := range sources { + if src.reader != nil { + src.reader.Close() + } + } + }() + + var parquetSchema *parquet.Schema + for _, entry := range entries { + select { + case <-ctx.Done(): + return nil, 0, ctx.Err() + default: + } + + data, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, entry.DataFile().FilePath()) + if err != nil { + return nil, 0, fmt.Errorf("read parquet file %s: %w", entry.DataFile().FilePath(), err) + } + + reader := parquet.NewReader(bytes.NewReader(data)) + readerSchema := reader.Schema() + if parquetSchema == nil { + parquetSchema = readerSchema + } else if !schemasEqual(parquetSchema, readerSchema) { + return nil, 0, fmt.Errorf("schema mismatch in %s: cannot merge files with different schemas", entry.DataFile().FilePath()) + } + sources = append(sources, sourceFile{reader: reader, data: data}) + } + + if parquetSchema == nil { + return nil, 0, fmt.Errorf("no parquet schema found") + } + + // Write merged output + var outputBuf bytes.Buffer + writer := parquet.NewWriter(&outputBuf, parquetSchema) + + var totalRows int64 + rows := make([]parquet.Row, 256) + + for _, src := range sources { + for { + n, err := src.reader.ReadRows(rows) + if n > 0 { + if _, writeErr := writer.WriteRows(rows[:n]); writeErr != nil { + writer.Close() + return nil, 0, fmt.Errorf("write rows: %w", writeErr) + } + totalRows += int64(n) + } + if err != nil { + if err == io.EOF { + break + } + writer.Close() + return nil, 0, fmt.Errorf("read rows: %w", err) + } + } + } + + if err := writer.Close(); err != nil { 
+ return nil, 0, fmt.Errorf("close writer: %w", err) + } + + return outputBuf.Bytes(), totalRows, nil +} + +// schemasEqual compares two parquet schemas structurally. +func schemasEqual(a, b *parquet.Schema) bool { + if a == b { + return true + } + if a == nil || b == nil { + return false + } + return parquet.EqualNodes(a, b) +} + +// ensureFilerDir ensures a directory exists in the filer. +func ensureFilerDir(ctx context.Context, client filer_pb.SeaweedFilerClient, dirPath string) error { + parentDir := path.Dir(dirPath) + dirName := path.Base(dirPath) + + _, err := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{ + Directory: parentDir, + Name: dirName, + }) + if err == nil { + return nil // already exists + } + if !errors.Is(err, filer_pb.ErrNotFound) && status.Code(err) != codes.NotFound { + return fmt.Errorf("lookup dir %s: %w", dirPath, err) + } + + resp, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{ + Directory: parentDir, + Entry: &filer_pb.Entry{ + Name: dirName, + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(0755), + }, + }, + }) + if createErr != nil { + return createErr + } + if resp.Error != "" && !strings.Contains(resp.Error, "exist") { + return fmt.Errorf("create dir %s: %s", dirPath, resp.Error) + } + return nil +} diff --git a/weed/plugin/worker/iceberg/config.go b/weed/plugin/worker/iceberg/config.go new file mode 100644 index 000000000..329ddca4b --- /dev/null +++ b/weed/plugin/worker/iceberg/config.go @@ -0,0 +1,178 @@ +package iceberg + +import ( + "fmt" + "strconv" + "strings" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" +) + +const ( + jobType = "iceberg_maintenance" + + defaultSnapshotRetentionHours = 168 // 7 days + defaultMaxSnapshotsToKeep = 5 + defaultOrphanOlderThanHours = 72 + defaultMaxCommitRetries = 5 + defaultTargetFileSizeBytes = 256 * 1024 * 
1024 + defaultMinInputFiles = 5 + defaultOperations = "all" +) + +// Config holds parsed worker config values. +type Config struct { + SnapshotRetentionHours int64 + MaxSnapshotsToKeep int64 + OrphanOlderThanHours int64 + MaxCommitRetries int64 + TargetFileSizeBytes int64 + MinInputFiles int64 + Operations string +} + +// ParseConfig extracts an iceberg maintenance Config from plugin config values. +// Values are clamped to safe minimums to prevent misconfiguration. +func ParseConfig(values map[string]*plugin_pb.ConfigValue) Config { + cfg := Config{ + SnapshotRetentionHours: readInt64Config(values, "snapshot_retention_hours", defaultSnapshotRetentionHours), + MaxSnapshotsToKeep: readInt64Config(values, "max_snapshots_to_keep", defaultMaxSnapshotsToKeep), + OrphanOlderThanHours: readInt64Config(values, "orphan_older_than_hours", defaultOrphanOlderThanHours), + MaxCommitRetries: readInt64Config(values, "max_commit_retries", defaultMaxCommitRetries), + TargetFileSizeBytes: readInt64Config(values, "target_file_size_bytes", defaultTargetFileSizeBytes), + MinInputFiles: readInt64Config(values, "min_input_files", defaultMinInputFiles), + Operations: readStringConfig(values, "operations", defaultOperations), + } + + // Clamp to safe minimums using the default constants + if cfg.SnapshotRetentionHours <= 0 { + cfg.SnapshotRetentionHours = defaultSnapshotRetentionHours + } + if cfg.MaxSnapshotsToKeep <= 0 { + cfg.MaxSnapshotsToKeep = defaultMaxSnapshotsToKeep + } + if cfg.OrphanOlderThanHours <= 0 { + cfg.OrphanOlderThanHours = defaultOrphanOlderThanHours + } + if cfg.MaxCommitRetries <= 0 { + cfg.MaxCommitRetries = defaultMaxCommitRetries + } + if cfg.TargetFileSizeBytes <= 0 { + cfg.TargetFileSizeBytes = defaultTargetFileSizeBytes + } + if cfg.MinInputFiles < 2 { + cfg.MinInputFiles = defaultMinInputFiles + } + + return cfg +} + +// parseOperations returns the ordered list of maintenance operations to execute. 
+// Order follows Iceberg best practices: compact → expire_snapshots → remove_orphans → rewrite_manifests. +// Returns an error if any unknown operation is specified or the result would be empty. +func parseOperations(ops string) ([]string, error) { + ops = strings.TrimSpace(strings.ToLower(ops)) + if ops == "" || ops == "all" { + return []string{"compact", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, nil + } + + validOps := map[string]struct{}{ + "compact": {}, + "expire_snapshots": {}, + "remove_orphans": {}, + "rewrite_manifests": {}, + } + + requested := make(map[string]struct{}) + for _, op := range strings.Split(ops, ",") { + op = strings.TrimSpace(op) + if op == "" { + continue + } + if _, ok := validOps[op]; !ok { + return nil, fmt.Errorf("unknown maintenance operation %q (valid: compact, expire_snapshots, remove_orphans, rewrite_manifests)", op) + } + requested[op] = struct{}{} + } + + // Return in canonical order: compact → expire_snapshots → remove_orphans → rewrite_manifests + canonicalOrder := []string{"compact", "expire_snapshots", "remove_orphans", "rewrite_manifests"} + var result []string + for _, op := range canonicalOrder { + if _, ok := requested[op]; ok { + result = append(result, op) + } + } + + if len(result) == 0 { + return nil, fmt.Errorf("no valid maintenance operations specified") + } + return result, nil +} + +func extractMetadataVersion(metadataFileName string) int { + // Parse "v3.metadata.json" or "v3-{nonce}.metadata.json" → 3 + name := strings.TrimPrefix(metadataFileName, "v") + name = strings.TrimSuffix(name, ".metadata.json") + // Strip any nonce suffix (e.g. "3-1709766000" → "3") + if dashIdx := strings.Index(name, "-"); dashIdx > 0 { + name = name[:dashIdx] + } + version, _ := strconv.Atoi(name) + return version +} + +// readStringConfig reads a string value from plugin config, with fallback. 
+func readStringConfig(values map[string]*plugin_pb.ConfigValue, field string, fallback string) string { + if values == nil { + return fallback + } + value := values[field] + if value == nil { + return fallback + } + switch kind := value.Kind.(type) { + case *plugin_pb.ConfigValue_StringValue: + return kind.StringValue + case *plugin_pb.ConfigValue_Int64Value: + return strconv.FormatInt(kind.Int64Value, 10) + case *plugin_pb.ConfigValue_DoubleValue: + return strconv.FormatFloat(kind.DoubleValue, 'f', -1, 64) + case *plugin_pb.ConfigValue_BoolValue: + return strconv.FormatBool(kind.BoolValue) + default: + glog.V(1).Infof("readStringConfig: unexpected config value type %T for field %q, using fallback", value.Kind, field) + } + return fallback +} + +// readInt64Config reads an int64 value from plugin config, with fallback. +func readInt64Config(values map[string]*plugin_pb.ConfigValue, field string, fallback int64) int64 { + if values == nil { + return fallback + } + value := values[field] + if value == nil { + return fallback + } + switch kind := value.Kind.(type) { + case *plugin_pb.ConfigValue_Int64Value: + return kind.Int64Value + case *plugin_pb.ConfigValue_DoubleValue: + return int64(kind.DoubleValue) + case *plugin_pb.ConfigValue_StringValue: + parsed, err := strconv.ParseInt(strings.TrimSpace(kind.StringValue), 10, 64) + if err == nil { + return parsed + } + case *plugin_pb.ConfigValue_BoolValue: + if kind.BoolValue { + return 1 + } + return 0 + default: + glog.V(1).Infof("readInt64Config: unexpected config value type %T for field %q, using fallback", value.Kind, field) + } + return fallback +} diff --git a/weed/plugin/worker/iceberg/detection.go b/weed/plugin/worker/iceberg/detection.go new file mode 100644 index 000000000..405e37017 --- /dev/null +++ b/weed/plugin/worker/iceberg/detection.go @@ -0,0 +1,208 @@ +package iceberg + +import ( + "context" + "encoding/json" + "fmt" + "path" + "strings" + "time" + + "github.com/apache/iceberg-go/table" + 
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
	"github.com/seaweedfs/seaweedfs/weed/util/wildcard"
)

// tableInfo captures metadata about a table for detection/execution.
type tableInfo struct {
	BucketName string
	Namespace  string
	TableName  string
	TablePath  string // namespace/tableName
	Metadata   table.Metadata
}

// scanTablesForMaintenance enumerates table buckets and their tables,
// evaluating which ones need maintenance based on metadata thresholds.
// When limit > 0 the scan stops after collecting limit+1 results so the
// caller can determine whether more tables remain (HasMore).
//
// Listing failures at the bucket/namespace level are logged and skipped
// rather than aborting the whole scan; only the top-level bucket listing
// is fatal. Context cancellation returns the partial result with ctx.Err().
func (h *Handler) scanTablesForMaintenance(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	config Config,
	bucketFilter, namespaceFilter, tableFilter string,
	limit int,
) ([]tableInfo, error) {
	var tables []tableInfo

	// Compile wildcard matchers once (nil = match all)
	bucketMatchers := wildcard.CompileWildcardMatchers(bucketFilter)
	nsMatchers := wildcard.CompileWildcardMatchers(namespaceFilter)
	tableMatchers := wildcard.CompileWildcardMatchers(tableFilter)

	// List entries under /buckets to find table buckets
	bucketsPath := s3tables.TablesPath
	bucketEntries, err := listFilerEntries(ctx, filerClient, bucketsPath, "")
	if err != nil {
		return nil, fmt.Errorf("list buckets: %w", err)
	}

	for _, bucketEntry := range bucketEntries {
		// Non-blocking cancellation check between buckets.
		select {
		case <-ctx.Done():
			return tables, ctx.Err()
		default:
		}

		// Only directories flagged as S3 table buckets are scanned.
		if !bucketEntry.IsDirectory || !s3tables.IsTableBucketEntry(bucketEntry) {
			continue
		}
		bucketName := bucketEntry.Name
		if !wildcard.MatchesAnyWildcard(bucketMatchers, bucketName) {
			continue
		}

		// List namespaces within the bucket
		bucketPath := path.Join(bucketsPath, bucketName)
		nsEntries, err := listFilerEntries(ctx, filerClient, bucketPath, "")
		if err != nil {
			glog.Warningf("iceberg maintenance: failed to list namespaces in bucket %s: %v", bucketName, err)
			continue
		}

		for _, nsEntry := range nsEntries {
			select {
			case <-ctx.Done():
				return tables, ctx.Err()
			default:
			}

			if !nsEntry.IsDirectory {
				continue
			}
			nsName := nsEntry.Name
			if !wildcard.MatchesAnyWildcard(nsMatchers, nsName) {
				continue
			}
			// Skip internal directories
			if strings.HasPrefix(nsName, ".") {
				continue
			}

			// List tables within the namespace
			nsPath := path.Join(bucketPath, nsName)
			tableEntries, err := listFilerEntries(ctx, filerClient, nsPath, "")
			if err != nil {
				glog.Warningf("iceberg maintenance: failed to list tables in %s/%s: %v", bucketName, nsName, err)
				continue
			}

			for _, tableEntry := range tableEntries {
				if !tableEntry.IsDirectory {
					continue
				}
				tblName := tableEntry.Name
				if !wildcard.MatchesAnyWildcard(tableMatchers, tblName) {
					continue
				}

				// Check if this entry has table metadata: tables store their
				// state as JSON in an extended attribute on the directory entry.
				metadataBytes, ok := tableEntry.Extended[s3tables.ExtendedKeyMetadata]
				if !ok || len(metadataBytes) == 0 {
					continue
				}

				// Parse the internal metadata to get FullMetadata; only the
				// nested fullMetadata payload (raw Iceberg JSON) is needed here.
				var internalMeta struct {
					Metadata *struct {
						FullMetadata json.RawMessage `json:"fullMetadata,omitempty"`
					} `json:"metadata,omitempty"`
				}
				if err := json.Unmarshal(metadataBytes, &internalMeta); err != nil {
					glog.V(2).Infof("iceberg maintenance: skipping %s/%s/%s: cannot parse metadata: %v", bucketName, nsName, tblName, err)
					continue
				}
				if internalMeta.Metadata == nil || len(internalMeta.Metadata.FullMetadata) == 0 {
					continue
				}

				icebergMeta, err := table.ParseMetadataBytes(internalMeta.Metadata.FullMetadata)
				if err != nil {
					glog.V(2).Infof("iceberg maintenance: skipping %s/%s/%s: cannot parse iceberg metadata: %v", bucketName, nsName, tblName, err)
					continue
				}

				if needsMaintenance(icebergMeta, config) {
					tables = append(tables, tableInfo{
						BucketName: bucketName,
						Namespace:  nsName,
						TableName:  tblName,
						TablePath:  path.Join(nsName, tblName),
						Metadata:   icebergMeta,
					})
					// Collect one past the limit so callers can set HasMore.
					if limit > 0 && len(tables) > limit {
						return tables, nil
					}
				}
			}
		}
	}

	return tables, nil
}

// needsMaintenance checks if a table needs any maintenance based on
// metadata-only thresholds (no manifest reading).
func needsMaintenance(meta table.Metadata, config Config) bool {
	snapshots := meta.Snapshots()
	if len(snapshots) == 0 {
		return false
	}

	// Check snapshot count
	if int64(len(snapshots)) > config.MaxSnapshotsToKeep {
		return true
	}

	// Check oldest snapshot age (retention configured in hours, snapshot
	// timestamps are epoch milliseconds).
	retentionMs := config.SnapshotRetentionHours * 3600 * 1000
	nowMs := time.Now().UnixMilli()
	for _, snap := range snapshots {
		if nowMs-snap.TimestampMs > retentionMs {
			return true
		}
	}

	return false
}

// buildMaintenanceProposal creates a JobProposal for a table needing maintenance.
// DedupeKey is stable per table so repeated detections coalesce, while
// ProposalId embeds a timestamp and is unique per detection round.
func (h *Handler) buildMaintenanceProposal(t tableInfo, filerAddress string) *plugin_pb.JobProposal {
	dedupeKey := fmt.Sprintf("iceberg_maintenance:%s/%s/%s", t.BucketName, t.Namespace, t.TableName)

	snapshotCount := len(t.Metadata.Snapshots())
	summary := fmt.Sprintf("Maintain %s/%s/%s (%d snapshots)", t.BucketName, t.Namespace, t.TableName, snapshotCount)

	return &plugin_pb.JobProposal{
		ProposalId: fmt.Sprintf("iceberg-%s-%s-%s-%d", t.BucketName, t.Namespace, t.TableName, time.Now().UnixMilli()),
		DedupeKey:  dedupeKey,
		JobType:    jobType,
		Priority:   plugin_pb.JobPriority_JOB_PRIORITY_NORMAL,
		Summary:    summary,
		Parameters: map[string]*plugin_pb.ConfigValue{
			"bucket_name":   {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: t.BucketName}},
			"namespace":     {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: t.Namespace}},
			"table_name":    {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: t.TableName}},
			"table_path":    {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: t.TablePath}},
			"filer_address": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: filerAddress}},
		},
		Labels: map[string]string{
			"bucket":    t.BucketName,
			"namespace": t.Namespace,
			"table":     t.TableName,
		},
	}
}
diff --git a/weed/plugin/worker/iceberg/exec_test.go b/weed/plugin/worker/iceberg/exec_test.go
new file mode 100644
index 000000000..32c45a45d
--- /dev/null
+++ b/weed/plugin/worker/iceberg/exec_test.go
@@ -0,0 +1,944 @@
package iceberg

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net"
	"path"
	"sort"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/apache/iceberg-go"
	"github.com/apache/iceberg-go/table"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/status"
)

// ---------------------------------------------------------------------------
// Fake filer server for execution tests
// ---------------------------------------------------------------------------

// fakeFilerServer is an in-memory filer that implements the gRPC methods used
// by the iceberg maintenance handler.
type fakeFilerServer struct {
	filer_pb.UnimplementedSeaweedFilerServer

	mu      sync.Mutex // guards entries and the call counters below
	entries map[string]map[string]*filer_pb.Entry // dir → name → entry

	// Counters for assertions
	createCalls int
	updateCalls int
	deleteCalls int
}

// newFakeFilerServer returns an empty in-memory fake filer.
func newFakeFilerServer() *fakeFilerServer {
	return &fakeFilerServer{
		entries: make(map[string]map[string]*filer_pb.Entry),
	}
}

// putEntry inserts or overwrites the entry stored under dir/name,
// creating the directory map on first use.
func (f *fakeFilerServer) putEntry(dir, name string, entry *filer_pb.Entry) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if _, ok := f.entries[dir]; !ok {
		f.entries[dir] = make(map[string]*filer_pb.Entry)
	}
	f.entries[dir][name] = entry
}

// getEntry returns the entry stored under dir/name, or nil when absent.
func (f *fakeFilerServer) getEntry(dir, name string) *filer_pb.Entry {
	f.mu.Lock()
	defer f.mu.Unlock()
	if dirEntries, ok := f.entries[dir]; ok {
		return dirEntries[name]
	}
	return nil
}

// listDir returns a name-sorted snapshot of the entries in dir, or nil when
// the directory has never been written (callers treat nil as empty).
func (f *fakeFilerServer) listDir(dir string) []*filer_pb.Entry {
	f.mu.Lock()
	defer f.mu.Unlock()
	dirEntries, ok := f.entries[dir]
	if !ok {
		return nil
	}
	result := make([]*filer_pb.Entry, 0, len(dirEntries))
	for _, e := range dirEntries {
		result = append(result, e)
	}
	// Deterministic order so pagination by name works like the real filer.
	sort.Slice(result, func(i, j int) bool {
		return result[i].Name < result[j].Name
	})
	return result
}

// LookupDirectoryEntry implements the filer lookup RPC; missing entries
// map to gRPC NotFound, matching what callers expect from the real filer.
func (f *fakeFilerServer) LookupDirectoryEntry(_ context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {
	entry := f.getEntry(req.Directory, req.Name)
	if entry == nil {
		return nil, status.Errorf(codes.NotFound, "entry not found: %s/%s", req.Directory, req.Name)
	}
	return &filer_pb.LookupDirectoryEntryResponse{Entry: entry}, nil
}

// ListEntries implements the streaming list RPC with prefix filtering,
// start-from-name pagination (inclusive or exclusive), and a result limit.
func (f *fakeFilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream grpc.ServerStreamingServer[filer_pb.ListEntriesResponse]) error {
	entries := f.listDir(req.Directory)
	if entries == nil {
		return nil // empty directory
	}

	var sent uint32
	for _, entry := range entries {
		if req.Prefix != "" && !strings.HasPrefix(entry.Name, req.Prefix) {
			continue
		}
		if req.StartFromFileName != "" {
			if req.InclusiveStartFrom {
				if entry.Name < req.StartFromFileName {
					continue
				}
			} else {
				if entry.Name <= req.StartFromFileName {
					continue
				}
			}
		}
		if err := stream.Send(&filer_pb.ListEntriesResponse{Entry: entry}); err != nil {
			return err
		}
		sent++
		if req.Limit > 0 && sent >= req.Limit {
			break
		}
	}
	return nil
}

// CreateEntry stores the entry and bumps the create counter for assertions.
func (f *fakeFilerServer) CreateEntry(_ context.Context, req *filer_pb.CreateEntryRequest) (*filer_pb.CreateEntryResponse, error) {
	f.mu.Lock()
	f.createCalls++
	f.mu.Unlock()

	f.putEntry(req.Directory, req.Entry.Name, req.Entry)
	return &filer_pb.CreateEntryResponse{}, nil
}

// UpdateEntry overwrites the entry and bumps the update counter.
func (f *fakeFilerServer) UpdateEntry(_ context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {
	f.mu.Lock()
	f.updateCalls++
	f.mu.Unlock()

	f.putEntry(req.Directory, req.Entry.Name, req.Entry)
	return &filer_pb.UpdateEntryResponse{}, nil
}

// DeleteEntry removes the entry if present. Note: the delete counter is
// incremented even when the entry does not exist.
func (f *fakeFilerServer) DeleteEntry(_ context.Context, req *filer_pb.DeleteEntryRequest) (*filer_pb.DeleteEntryResponse, error) {
	f.mu.Lock()
	f.deleteCalls++

	if dirEntries, ok := f.entries[req.Directory]; ok {
		delete(dirEntries, req.Name)
	}
	f.mu.Unlock()
	return &filer_pb.DeleteEntryResponse{}, nil
}

// startFakeFiler starts a gRPC server and returns a connected client.
func startFakeFiler(t *testing.T) (*fakeFilerServer, filer_pb.SeaweedFilerClient) {
	t.Helper()
	fakeServer := newFakeFilerServer()

	// Port 0 asks the OS for an ephemeral port, so parallel tests don't collide.
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("listen: %v", err)
	}

	server := grpc.NewServer()
	filer_pb.RegisterSeaweedFilerServer(server, fakeServer)

	// Serve in the background; GracefulStop at cleanup ends the goroutine.
	go func() { _ = server.Serve(listener) }()
	t.Cleanup(server.GracefulStop)

	conn, err := grpc.NewClient(listener.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("dial: %v", err)
	}
	t.Cleanup(func() { conn.Close() })

	return fakeServer, filer_pb.NewSeaweedFilerClient(conn)
}

// ---------------------------------------------------------------------------
// Helpers to populate the fake filer with Iceberg table state
// ---------------------------------------------------------------------------

// tableSetup holds the state needed to set up a test table in the fake filer.
type tableSetup struct {
	BucketName string
	Namespace  string
	TableName  string
	Snapshots  []table.Snapshot
}

// tablePath returns the bucket-relative table path (namespace/tableName).
func (ts tableSetup) tablePath() string {
	return path.Join(ts.Namespace, ts.TableName)
}

// populateTable creates the directory hierarchy and metadata entries in the
// fake filer for a table, writes manifest files referenced by snapshots,
// and returns the built metadata.
+func populateTable(t *testing.T, fs *fakeFilerServer, setup tableSetup) table.Metadata { + t.Helper() + + meta := buildTestMetadata(t, setup.Snapshots) + fullMetadataJSON, err := json.Marshal(meta) + if err != nil { + t.Fatalf("marshal metadata: %v", err) + } + + // Build internal metadata xattr + internalMeta := map[string]interface{}{ + "metadataVersion": 1, + "metadata": map[string]interface{}{ + "fullMetadata": json.RawMessage(fullMetadataJSON), + }, + } + xattr, err := json.Marshal(internalMeta) + if err != nil { + t.Fatalf("marshal xattr: %v", err) + } + + bucketsPath := s3tables.TablesPath // "/buckets" + bucketPath := path.Join(bucketsPath, setup.BucketName) + nsPath := path.Join(bucketPath, setup.Namespace) + tableFilerPath := path.Join(nsPath, setup.TableName) + + // Register bucket entry (marked as table bucket) + fs.putEntry(bucketsPath, setup.BucketName, &filer_pb.Entry{ + Name: setup.BucketName, + IsDirectory: true, + Extended: map[string][]byte{ + s3tables.ExtendedKeyTableBucket: []byte("true"), + }, + }) + + // Register namespace entry + fs.putEntry(bucketPath, setup.Namespace, &filer_pb.Entry{ + Name: setup.Namespace, + IsDirectory: true, + }) + + // Register table entry with metadata xattr + fs.putEntry(nsPath, setup.TableName, &filer_pb.Entry{ + Name: setup.TableName, + IsDirectory: true, + Extended: map[string][]byte{ + s3tables.ExtendedKeyMetadata: xattr, + }, + }) + + // Create metadata/ and data/ directory placeholders + metaDir := path.Join(tableFilerPath, "metadata") + dataDir := path.Join(tableFilerPath, "data") + + // Write manifest files for each snapshot that has a ManifestList + schema := meta.CurrentSchema() + spec := meta.PartitionSpec() + version := meta.Version() + + for _, snap := range setup.Snapshots { + if snap.ManifestList == "" { + continue + } + + // Create a minimal manifest with one dummy entry for this snapshot + dfBuilder, err := iceberg.NewDataFileBuilder( + spec, + iceberg.EntryContentData, + 
fmt.Sprintf("data/snap-%d-data.parquet", snap.SnapshotID), + iceberg.ParquetFile, + map[int]any{}, + nil, nil, + 10, // recordCount + 4096, // fileSizeBytes + ) + if err != nil { + t.Fatalf("build data file for snap %d: %v", snap.SnapshotID, err) + } + snapID := snap.SnapshotID + entry := iceberg.NewManifestEntry( + iceberg.EntryStatusADDED, + &snapID, + nil, nil, + dfBuilder.Build(), + ) + + // Write manifest + manifestFileName := fmt.Sprintf("manifest-%d.avro", snap.SnapshotID) + manifestPath := path.Join("metadata", manifestFileName) + var manifestBuf bytes.Buffer + mf, err := iceberg.WriteManifest(manifestPath, &manifestBuf, version, spec, schema, snap.SnapshotID, []iceberg.ManifestEntry{entry}) + if err != nil { + t.Fatalf("write manifest for snap %d: %v", snap.SnapshotID, err) + } + + fs.putEntry(metaDir, manifestFileName, &filer_pb.Entry{ + Name: manifestFileName, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: uint64(manifestBuf.Len()), + }, + Content: manifestBuf.Bytes(), + }) + + // Write manifest list + manifestListFileName := path.Base(snap.ManifestList) + var mlBuf bytes.Buffer + parentSnap := snap.ParentSnapshotID + seqNum := snap.SequenceNumber + if err := iceberg.WriteManifestList(version, &mlBuf, snap.SnapshotID, parentSnap, &seqNum, 0, []iceberg.ManifestFile{mf}); err != nil { + t.Fatalf("write manifest list for snap %d: %v", snap.SnapshotID, err) + } + + fs.putEntry(metaDir, manifestListFileName, &filer_pb.Entry{ + Name: manifestListFileName, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: uint64(mlBuf.Len()), + }, + Content: mlBuf.Bytes(), + }) + + // Write a dummy data file + dataFileName := fmt.Sprintf("snap-%d-data.parquet", snap.SnapshotID) + fs.putEntry(dataDir, dataFileName, &filer_pb.Entry{ + Name: dataFileName, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: 4096, + }, + Content: []byte("fake-parquet-data"), + }) + } + + return meta +} + +// 
--------------------------------------------------------------------------- +// Recording senders for Execute tests +// --------------------------------------------------------------------------- + +type recordingExecutionSender struct { + mu sync.Mutex + progress []*plugin_pb.JobProgressUpdate + completed *plugin_pb.JobCompleted +} + +func (r *recordingExecutionSender) SendProgress(p *plugin_pb.JobProgressUpdate) error { + r.mu.Lock() + defer r.mu.Unlock() + r.progress = append(r.progress, p) + return nil +} + +func (r *recordingExecutionSender) SendCompleted(c *plugin_pb.JobCompleted) error { + r.mu.Lock() + defer r.mu.Unlock() + r.completed = c + return nil +} + +// --------------------------------------------------------------------------- +// Execution tests +// --------------------------------------------------------------------------- + +func TestExpireSnapshotsExecution(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "analytics", + TableName: "events", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"}, + {SnapshotID: 2, TimestampMs: now + 1, ManifestList: "metadata/snap-2.avro"}, + {SnapshotID: 3, TimestampMs: now + 2, ManifestList: "metadata/snap-3.avro"}, + }, + } + populateTable(t, fs, setup) + + handler := NewHandler(nil) + config := Config{ + SnapshotRetentionHours: 0, // expire everything eligible + MaxSnapshotsToKeep: 1, // keep only 1 + MaxCommitRetries: 3, + Operations: "expire_snapshots", + } + + result, err := handler.expireSnapshots(context.Background(), client, setup.BucketName, setup.tablePath(), config) + if err != nil { + t.Fatalf("expireSnapshots failed: %v", err) + } + + if !strings.Contains(result, "expired") { + t.Errorf("expected result to mention expiration, got %q", result) + } + t.Logf("expireSnapshots result: %s", result) + + // Verify the metadata was updated (update calls > 0) 
+ fs.mu.Lock() + updates := fs.updateCalls + fs.mu.Unlock() + if updates == 0 { + t.Error("expected at least one UpdateEntry call for xattr update") + } +} + +func TestExpireSnapshotsNothingToExpire(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "ns", + TableName: "tbl", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"}, + }, + } + populateTable(t, fs, setup) + + handler := NewHandler(nil) + config := Config{ + SnapshotRetentionHours: 24 * 365, // very long retention + MaxSnapshotsToKeep: 10, + MaxCommitRetries: 3, + } + + result, err := handler.expireSnapshots(context.Background(), client, setup.BucketName, setup.tablePath(), config) + if err != nil { + t.Fatalf("expireSnapshots failed: %v", err) + } + if result != "no snapshots expired" { + t.Errorf("expected 'no snapshots expired', got %q", result) + } +} + +func TestRemoveOrphansExecution(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "analytics", + TableName: "events", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"}, + }, + } + populateTable(t, fs, setup) + + // Add orphan files (old enough to be removed) + metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata") + dataDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "data") + oldTime := time.Now().Add(-200 * time.Hour).Unix() + + fs.putEntry(metaDir, "orphan-old.avro", &filer_pb.Entry{ + Name: "orphan-old.avro", + Attributes: &filer_pb.FuseAttributes{Mtime: oldTime}, + }) + fs.putEntry(dataDir, "orphan-data.parquet", &filer_pb.Entry{ + Name: "orphan-data.parquet", + Attributes: &filer_pb.FuseAttributes{Mtime: oldTime}, + }) + // Add a recent orphan that should NOT be removed (within safety 
window) + fs.putEntry(dataDir, "recent-orphan.parquet", &filer_pb.Entry{ + Name: "recent-orphan.parquet", + Attributes: &filer_pb.FuseAttributes{Mtime: time.Now().Unix()}, + }) + + handler := NewHandler(nil) + config := Config{ + OrphanOlderThanHours: 72, + MaxCommitRetries: 3, + } + + result, err := handler.removeOrphans(context.Background(), client, setup.BucketName, setup.tablePath(), config) + if err != nil { + t.Fatalf("removeOrphans failed: %v", err) + } + + if !strings.Contains(result, "removed 2 orphan") { + t.Errorf("expected 2 orphans removed, got %q", result) + } + + // Verify orphan files were deleted + if fs.getEntry(metaDir, "orphan-old.avro") != nil { + t.Error("orphan-old.avro should have been deleted") + } + if fs.getEntry(dataDir, "orphan-data.parquet") != nil { + t.Error("orphan-data.parquet should have been deleted") + } + // Recent orphan should still exist + if fs.getEntry(dataDir, "recent-orphan.parquet") == nil { + t.Error("recent-orphan.parquet should NOT have been deleted (within safety window)") + } +} + +func TestRemoveOrphansPreservesReferencedFiles(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "ns", + TableName: "tbl", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"}, + }, + } + populateTable(t, fs, setup) + + handler := NewHandler(nil) + config := Config{ + OrphanOlderThanHours: 0, // no safety window — remove immediately + MaxCommitRetries: 3, + } + + result, err := handler.removeOrphans(context.Background(), client, setup.BucketName, setup.tablePath(), config) + if err != nil { + t.Fatalf("removeOrphans failed: %v", err) + } + + if !strings.Contains(result, "removed 0 orphan") { + t.Errorf("expected 0 orphans removed (all files are referenced), got %q", result) + } + + // Verify referenced files are still present + metaDir := path.Join(s3tables.TablesPath, setup.BucketName, 
setup.tablePath(), "metadata") + if fs.getEntry(metaDir, "snap-1.avro") == nil { + t.Error("snap-1.avro (referenced manifest list) should not have been deleted") + } + if fs.getEntry(metaDir, "manifest-1.avro") == nil { + t.Error("manifest-1.avro (referenced manifest) should not have been deleted") + } +} + +func TestRewriteManifestsExecution(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + + // Create a table with a single snapshot — we'll add extra small manifests + // to the manifest list so there's something to rewrite. + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "analytics", + TableName: "events", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"}, + }, + } + meta := populateTable(t, fs, setup) + schema := meta.CurrentSchema() + spec := meta.PartitionSpec() + version := meta.Version() + + // Build 5 small manifests and write them + a manifest list pointing to all of them + metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata") + var allManifests []iceberg.ManifestFile + + for i := 1; i <= 5; i++ { + dfBuilder, err := iceberg.NewDataFileBuilder( + spec, + iceberg.EntryContentData, + fmt.Sprintf("data/rewrite-%d.parquet", i), + iceberg.ParquetFile, + map[int]any{}, + nil, nil, + 1, + 1024, + ) + if err != nil { + t.Fatalf("build data file %d: %v", i, err) + } + snapID := int64(1) + entry := iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, dfBuilder.Build()) + + manifestName := fmt.Sprintf("small-manifest-%d.avro", i) + var buf bytes.Buffer + mf, err := iceberg.WriteManifest(path.Join("metadata", manifestName), &buf, version, spec, schema, 1, []iceberg.ManifestEntry{entry}) + if err != nil { + t.Fatalf("write small manifest %d: %v", i, err) + } + fs.putEntry(metaDir, manifestName, &filer_pb.Entry{ + Name: manifestName, + Attributes: &filer_pb.FuseAttributes{Mtime: time.Now().Unix()}, + Content: 
buf.Bytes(), + }) + allManifests = append(allManifests, mf) + } + + // Overwrite the manifest list with all 5 manifests + var mlBuf bytes.Buffer + seqNum := int64(1) + if err := iceberg.WriteManifestList(version, &mlBuf, 1, nil, &seqNum, 0, allManifests); err != nil { + t.Fatalf("write manifest list: %v", err) + } + fs.putEntry(metaDir, "snap-1.avro", &filer_pb.Entry{ + Name: "snap-1.avro", + Attributes: &filer_pb.FuseAttributes{Mtime: time.Now().Unix()}, + Content: mlBuf.Bytes(), + }) + + handler := NewHandler(nil) + config := Config{ + MinInputFiles: 3, // threshold to trigger rewrite (5 >= 3) + MaxCommitRetries: 3, + } + + result, err := handler.rewriteManifests(context.Background(), client, setup.BucketName, setup.tablePath(), config) + if err != nil { + t.Fatalf("rewriteManifests failed: %v", err) + } + + if !strings.Contains(result, "rewrote 5 manifests into 1") { + t.Errorf("expected '5 manifests into 1', got %q", result) + } + t.Logf("rewriteManifests result: %s", result) + + // Verify a new metadata file and merged manifest were written + fs.mu.Lock() + creates := fs.createCalls + updates := fs.updateCalls + fs.mu.Unlock() + + if creates < 3 { + // At minimum: merged manifest, manifest list, new metadata file + t.Errorf("expected at least 3 CreateEntry calls, got %d", creates) + } + if updates == 0 { + t.Error("expected at least one UpdateEntry call for xattr update") + } +} + +func TestRewriteManifestsBelowThreshold(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "ns", + TableName: "tbl", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"}, + }, + } + populateTable(t, fs, setup) + + handler := NewHandler(nil) + config := Config{ + MinInputFiles: 10, // threshold higher than actual count (1) + MaxCommitRetries: 3, + } + + result, err := handler.rewriteManifests(context.Background(), client, 
setup.BucketName, setup.tablePath(), config) + if err != nil { + t.Fatalf("rewriteManifests failed: %v", err) + } + + if !strings.Contains(result, "below threshold") { + t.Errorf("expected 'below threshold', got %q", result) + } +} + +func TestFullExecuteFlow(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "analytics", + TableName: "events", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"}, + {SnapshotID: 2, TimestampMs: now + 1, ManifestList: "metadata/snap-2.avro"}, + {SnapshotID: 3, TimestampMs: now + 2, ManifestList: "metadata/snap-3.avro"}, + }, + } + populateTable(t, fs, setup) + + // Add an orphan + metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata") + fs.putEntry(metaDir, "orphan.avro", &filer_pb.Entry{ + Name: "orphan.avro", + Attributes: &filer_pb.FuseAttributes{Mtime: time.Now().Add(-200 * time.Hour).Unix()}, + }) + + handler := NewHandler(nil) + + // We need to build the request manually since Execute takes gRPC types + // but we're connecting directly + request := &plugin_pb.ExecuteJobRequest{ + Job: &plugin_pb.JobSpec{ + JobId: "test-job-1", + JobType: jobType, + Parameters: map[string]*plugin_pb.ConfigValue{ + "bucket_name": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: setup.BucketName}}, + "namespace": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: setup.Namespace}}, + "table_name": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: setup.TableName}}, + "table_path": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: setup.tablePath()}}, + "filer_address": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "not-used"}}, + }, + }, + WorkerConfigValues: map[string]*plugin_pb.ConfigValue{ + "snapshot_retention_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}}, + "max_snapshots_to_keep": {Kind: 
&plugin_pb.ConfigValue_Int64Value{Int64Value: 1}}, + "orphan_older_than_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 72}}, + "max_commit_retries": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3}}, + "min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 100}}, // high threshold to skip rewrite + "operations": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "expire_snapshots,remove_orphans"}}, + }, + } + + // Execute uses grpc.NewClient internally, but we need to pass our existing + // client. Call operations directly instead of full Execute to avoid the + // grpc.NewClient call which requires a real address. + workerConfig := ParseConfig(request.GetWorkerConfigValues()) + ops, err := parseOperations(workerConfig.Operations) + if err != nil { + t.Fatalf("parseOperations: %v", err) + } + + var results []string + for _, op := range ops { + var opResult string + var opErr error + switch op { + case "expire_snapshots": + opResult, opErr = handler.expireSnapshots(context.Background(), client, setup.BucketName, setup.tablePath(), workerConfig) + case "remove_orphans": + opResult, opErr = handler.removeOrphans(context.Background(), client, setup.BucketName, setup.tablePath(), workerConfig) + case "rewrite_manifests": + opResult, opErr = handler.rewriteManifests(context.Background(), client, setup.BucketName, setup.tablePath(), workerConfig) + } + if opErr != nil { + t.Fatalf("operation %s failed: %v", op, opErr) + } + results = append(results, fmt.Sprintf("%s: %s", op, opResult)) + } + + t.Logf("Full execution results: %s", strings.Join(results, "; ")) + + // Verify snapshots were expired + if !strings.Contains(results[0], "expired") { + t.Errorf("expected snapshot expiration, got %q", results[0]) + } + + // Verify orphan was removed + if !strings.Contains(results[1], "removed") { + t.Errorf("expected orphan removal, got %q", results[1]) + } + if fs.getEntry(metaDir, "orphan.avro") != nil { + t.Error("orphan.avro should have 
been deleted") + } + +} + +func TestDetectWithFakeFiler(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "analytics", + TableName: "events", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"}, + {SnapshotID: 2, TimestampMs: now + 1, ManifestList: "metadata/snap-2.avro"}, + {SnapshotID: 3, TimestampMs: now + 2, ManifestList: "metadata/snap-3.avro"}, + }, + } + populateTable(t, fs, setup) + + handler := NewHandler(nil) + + config := Config{ + SnapshotRetentionHours: 0, // everything is expired + MaxSnapshotsToKeep: 2, // 3 > 2, needs maintenance + MaxCommitRetries: 3, + } + + tables, err := handler.scanTablesForMaintenance( + context.Background(), + client, + config, + "", "", "", // no filters + 0, // no limit + ) + if err != nil { + t.Fatalf("scanTablesForMaintenance failed: %v", err) + } + + if len(tables) != 1 { + t.Fatalf("expected 1 table needing maintenance, got %d", len(tables)) + } + if tables[0].BucketName != setup.BucketName { + t.Errorf("expected bucket %q, got %q", setup.BucketName, tables[0].BucketName) + } + if tables[0].TableName != setup.TableName { + t.Errorf("expected table %q, got %q", setup.TableName, tables[0].TableName) + } +} + +func TestDetectWithFilters(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + // Create two tables in different buckets + setup1 := tableSetup{ + BucketName: "bucket-a", + Namespace: "ns", + TableName: "table1", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"}, + {SnapshotID: 2, TimestampMs: now + 1, ManifestList: "metadata/snap-2.avro"}, + {SnapshotID: 3, TimestampMs: now + 2, ManifestList: "metadata/snap-3.avro"}, + }, + } + setup2 := tableSetup{ + BucketName: "bucket-b", + Namespace: "ns", + TableName: "table2", + Snapshots: []table.Snapshot{ + {SnapshotID: 4, TimestampMs: 
now + 3, ManifestList: "metadata/snap-4.avro"}, + {SnapshotID: 5, TimestampMs: now + 4, ManifestList: "metadata/snap-5.avro"}, + {SnapshotID: 6, TimestampMs: now + 5, ManifestList: "metadata/snap-6.avro"}, + }, + } + populateTable(t, fs, setup1) + populateTable(t, fs, setup2) + + handler := NewHandler(nil) + config := Config{ + SnapshotRetentionHours: 0, + MaxSnapshotsToKeep: 2, + MaxCommitRetries: 3, + } + + // Without filter: should find both + tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0) + if err != nil { + t.Fatalf("scan failed: %v", err) + } + if len(tables) != 2 { + t.Fatalf("expected 2 tables without filter, got %d", len(tables)) + } + + // With bucket filter: should find only one + tables, err = handler.scanTablesForMaintenance(context.Background(), client, config, "bucket-a", "", "", 0) + if err != nil { + t.Fatalf("scan with filter failed: %v", err) + } + if len(tables) != 1 { + t.Fatalf("expected 1 table with bucket filter, got %d", len(tables)) + } + if tables[0].BucketName != "bucket-a" { + t.Errorf("expected bucket-a, got %q", tables[0].BucketName) + } +} + +func TestStalePlanGuard(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "ns", + TableName: "tbl", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"}, + }, + } + populateTable(t, fs, setup) + + handler := NewHandler(nil) + + // Call commitWithRetry with a stale plan that expects a different snapshot + config := Config{MaxCommitRetries: 1} + staleSnapshotID := int64(999) + + err := handler.commitWithRetry(context.Background(), client, setup.BucketName, setup.tablePath(), "v1.metadata.json", config, func(currentMeta table.Metadata, builder *table.MetadataBuilder) error { + cs := currentMeta.CurrentSnapshot() + if cs == nil || cs.SnapshotID != staleSnapshotID { + return errStalePlan + } + 
return nil + }) + + if err == nil { + t.Fatal("expected stale plan error") + } + if !strings.Contains(err.Error(), "stale plan") { + t.Errorf("expected stale plan in error, got %q", err) + } +} + +func TestMetadataVersionCAS(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "ns", + TableName: "tbl", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"}, + }, + } + populateTable(t, fs, setup) + + // The table xattr has metadataVersion=1. Try updating with wrong expected version. + tableDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath()) + err := updateTableMetadataXattr(context.Background(), client, tableDir, 99, []byte(`{}`), "metadata/v100.metadata.json") + if err == nil { + t.Fatal("expected version conflict error") + } + if !strings.Contains(err.Error(), "metadata version conflict") { + t.Errorf("expected version conflict in error, got %q", err) + } + + // Correct expected version should succeed + err = updateTableMetadataXattr(context.Background(), client, tableDir, 1, []byte(`{}`), "metadata/v2.metadata.json") + if err != nil { + t.Fatalf("expected success with correct version, got: %v", err) + } + + // Verify version was incremented to 2 + entry := fs.getEntry(path.Dir(tableDir), path.Base(tableDir)) + if entry == nil { + t.Fatal("table entry not found after update") + } + var internalMeta map[string]json.RawMessage + if err := json.Unmarshal(entry.Extended[s3tables.ExtendedKeyMetadata], &internalMeta); err != nil { + t.Fatalf("unmarshal xattr: %v", err) + } + var version int + if err := json.Unmarshal(internalMeta["metadataVersion"], &version); err != nil { + t.Fatalf("unmarshal version: %v", err) + } + if version != 2 { + t.Errorf("expected version 2 after update, got %d", version) + } +} diff --git a/weed/plugin/worker/iceberg/filer_io.go b/weed/plugin/worker/iceberg/filer_io.go new 
file mode 100644 index 000000000..eddc42b0b --- /dev/null +++ b/weed/plugin/worker/iceberg/filer_io.go @@ -0,0 +1,358 @@ +package iceberg + +import ( + "context" + "crypto/rand" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "path" + "strings" + "time" + + "github.com/apache/iceberg-go/table" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3tables" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// filerFileEntry holds a non-directory entry with its full directory path. +type filerFileEntry struct { + Dir string + Entry *filer_pb.Entry +} + +// listFilerEntries lists all entries in a directory. +func listFilerEntries(ctx context.Context, client filer_pb.SeaweedFilerClient, dir, prefix string) ([]*filer_pb.Entry, error) { + var entries []*filer_pb.Entry + var lastFileName string + limit := uint32(10000) + + for { + resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ + Directory: dir, + Prefix: prefix, + StartFromFileName: lastFileName, + InclusiveStartFrom: lastFileName == "", + Limit: limit, + }) + if err != nil { + // Treat not-found as empty directory; propagate other errors. + if status.Code(err) == codes.NotFound { + return entries, nil + } + return entries, fmt.Errorf("list entries in %s: %w", dir, err) + } + + count := 0 + for { + entry, recvErr := resp.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } + return entries, fmt.Errorf("recv entry in %s: %w", dir, recvErr) + } + if entry.Entry != nil { + entries = append(entries, entry.Entry) + lastFileName = entry.Entry.Name + count++ + } + } + + if count < int(limit) { + break + } + } + + return entries, nil +} + +// walkFilerEntries recursively lists all non-directory entries under dir. 
+func walkFilerEntries(ctx context.Context, client filer_pb.SeaweedFilerClient, dir string) ([]filerFileEntry, error) { + entries, err := listFilerEntries(ctx, client, dir, "") + if err != nil { + return nil, err + } + + var result []filerFileEntry + for _, entry := range entries { + if entry.IsDirectory { + subDir := path.Join(dir, entry.Name) + subEntries, err := walkFilerEntries(ctx, client, subDir) + if err != nil { + glog.V(2).Infof("iceberg maintenance: cannot walk %s: %v", subDir, err) + continue + } + result = append(result, subEntries...) + } else { + result = append(result, filerFileEntry{Dir: dir, Entry: entry}) + } + } + return result, nil +} + +// loadCurrentMetadata loads and parses the current Iceberg metadata from the table entry's xattr. +func loadCurrentMetadata(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketName, tablePath string) (table.Metadata, string, error) { + dir := path.Join(s3tables.TablesPath, bucketName, path.Dir(tablePath)) + name := path.Base(tablePath) + + resp, err := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: name, + }) + if err != nil { + return nil, "", fmt.Errorf("lookup table entry %s/%s: %w", dir, name, err) + } + if resp == nil || resp.Entry == nil { + return nil, "", fmt.Errorf("table entry not found: %s/%s", dir, name) + } + + metadataBytes, ok := resp.Entry.Extended[s3tables.ExtendedKeyMetadata] + if !ok || len(metadataBytes) == 0 { + return nil, "", fmt.Errorf("no metadata xattr on table entry %s/%s", dir, name) + } + + // Parse internal metadata to extract FullMetadata + var internalMeta struct { + MetadataVersion int `json:"metadataVersion"` + MetadataLocation string `json:"metadataLocation,omitempty"` + Metadata *struct { + FullMetadata json.RawMessage `json:"fullMetadata,omitempty"` + } `json:"metadata,omitempty"` + } + if err := json.Unmarshal(metadataBytes, &internalMeta); err != nil { + return nil, "", fmt.Errorf("unmarshal internal metadata: 
%w", err) + } + if internalMeta.Metadata == nil || len(internalMeta.Metadata.FullMetadata) == 0 { + return nil, "", fmt.Errorf("no fullMetadata in table xattr") + } + + meta, err := table.ParseMetadataBytes(internalMeta.Metadata.FullMetadata) + if err != nil { + return nil, "", fmt.Errorf("parse iceberg metadata: %w", err) + } + + // Use metadataLocation from xattr if available (includes nonce suffix), + // otherwise fall back to the canonical name derived from metadataVersion. + metadataFileName := path.Base(internalMeta.MetadataLocation) + if metadataFileName == "" || metadataFileName == "." { + metadataFileName = fmt.Sprintf("v%d.metadata.json", internalMeta.MetadataVersion) + } + return meta, metadataFileName, nil +} + +// loadFileByIcebergPath loads a file from the filer given an Iceberg-style path. +// Paths may be absolute filer paths, relative (metadata/..., data/...), or +// location-based (s3://bucket/ns/table/metadata/...). +// +// The function normalises the path to a relative form under the table root +// (e.g. "metadata/snap-1.avro" or "data/region=us/file.parquet") and splits +// it into the correct filer directory + entry name, so nested sub-directories +// are resolved properly. +func loadFileByIcebergPath(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketName, tablePath, icebergPath string) ([]byte, error) { + relPath := path.Clean(normalizeIcebergPath(icebergPath, bucketName, tablePath)) + relPath = strings.TrimPrefix(relPath, "/") + if relPath == "." 
|| relPath == "" || strings.HasPrefix(relPath, "../") { + return nil, fmt.Errorf("invalid iceberg path %q", icebergPath) + } + + dir := path.Join(s3tables.TablesPath, bucketName, tablePath, path.Dir(relPath)) + fileName := path.Base(relPath) + + resp, err := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: fileName, + }) + if err != nil { + return nil, fmt.Errorf("lookup %s/%s: %w", dir, fileName, err) + } + if resp == nil || resp.Entry == nil { + return nil, fmt.Errorf("file not found: %s/%s", dir, fileName) + } + + // Inline content is available for small files (metadata, manifests, and + // manifest lists written by saveFilerFile). Larger files uploaded via S3 + // are stored as chunks with empty Content — detect this and return a + // clear error rather than silently returning empty data. + if len(resp.Entry.Content) == 0 && len(resp.Entry.Chunks) > 0 { + return nil, fmt.Errorf("file %s/%s is stored in chunks; only inline content is supported", dir, fileName) + } + + return resp.Entry.Content, nil +} + +// normalizeIcebergPath converts an Iceberg path (which may be an S3 URL, an +// absolute filer path, or a plain relative path) into a relative path under the +// table root, e.g. "metadata/snap-1.avro" or "data/region=us/file.parquet". +func normalizeIcebergPath(icebergPath, bucketName, tablePath string) string { + p := icebergPath + + // Strip scheme (e.g. "s3://bucket/ns/table/metadata/file" → "bucket/ns/table/metadata/file") + if idx := strings.Index(p, "://"); idx >= 0 { + p = p[idx+3:] + } + + // Strip any leading slash + p = strings.TrimPrefix(p, "/") + + // Strip bucket+tablePath prefix if present + // e.g. "mybucket/ns/table/metadata/file" → "metadata/file" + tablePrefix := path.Join(bucketName, tablePath) + "/" + if strings.HasPrefix(p, tablePrefix) { + return p[len(tablePrefix):] + } + + // Strip filer TablesPath prefix if present + // e.g. 
"buckets/mybucket/ns/table/metadata/file" → "metadata/file" + filerPrefix := strings.TrimPrefix(s3tables.TablesPath, "/") + fullPrefix := path.Join(filerPrefix, bucketName, tablePath) + "/" + if strings.HasPrefix(p, fullPrefix) { + return p[len(fullPrefix):] + } + + // Already relative (e.g. "metadata/snap-1.avro") + return p +} + +// saveFilerFile saves a file to the filer. +func saveFilerFile(ctx context.Context, client filer_pb.SeaweedFilerClient, dir, fileName string, content []byte) error { + resp, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{ + Directory: dir, + Entry: &filer_pb.Entry{ + Name: fileName, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(0644), + FileSize: uint64(len(content)), + }, + Content: content, + }, + }) + if err != nil { + return fmt.Errorf("create entry %s/%s: %w", dir, fileName, err) + } + if resp.Error != "" { + return fmt.Errorf("create entry %s/%s: %s", dir, fileName, resp.Error) + } + return nil +} + +// deleteFilerFile deletes a file from the filer. +func deleteFilerFile(ctx context.Context, client filer_pb.SeaweedFilerClient, dir, fileName string) error { + return filer_pb.DoRemove(ctx, client, dir, fileName, true, false, true, false, nil) +} + +// updateTableMetadataXattr updates the table entry's metadata xattr with +// the new Iceberg metadata. It performs a compare-and-swap: if the stored +// metadataVersion does not match expectedVersion, it returns +// errMetadataVersionConflict so the caller can retry. +// newMetadataLocation is the table-relative path to the new metadata file +// (e.g. "metadata/v3.metadata.json"). 
+func updateTableMetadataXattr(ctx context.Context, client filer_pb.SeaweedFilerClient, tableDir string, expectedVersion int, newFullMetadata []byte, newMetadataLocation string) error { + tableName := path.Base(tableDir) + parentDir := path.Dir(tableDir) + + resp, err := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{ + Directory: parentDir, + Name: tableName, + }) + if err != nil { + return fmt.Errorf("lookup table entry: %w", err) + } + if resp == nil || resp.Entry == nil { + return fmt.Errorf("table entry not found") + } + + existingXattr, ok := resp.Entry.Extended[s3tables.ExtendedKeyMetadata] + if !ok { + return fmt.Errorf("no metadata xattr on table entry") + } + + // Parse existing xattr, update fullMetadata + var internalMeta map[string]json.RawMessage + if err := json.Unmarshal(existingXattr, &internalMeta); err != nil { + return fmt.Errorf("unmarshal existing xattr: %w", err) + } + + // Compare-and-swap: verify the stored metadataVersion matches what we expect. + // NOTE: This is a client-side CAS — two workers could both read the same + // version, pass this check, and race at UpdateEntry (last-write-wins). + // The proper fix is server-side precondition support on UpdateEntryRequest + // (e.g. expect-version or If-Match semantics). Until then, commitWithRetry + // with exponential backoff mitigates but does not eliminate the race. + // Avoid scheduling concurrent maintenance on the same table. 
+ versionRaw, ok := internalMeta["metadataVersion"] + if !ok { + return fmt.Errorf("%w: metadataVersion field missing from xattr", errMetadataVersionConflict) + } + var storedVersion int + if err := json.Unmarshal(versionRaw, &storedVersion); err != nil { + return fmt.Errorf("%w: cannot parse metadataVersion: %v", errMetadataVersionConflict, err) + } + if storedVersion != expectedVersion { + return fmt.Errorf("%w: expected version %d, found %d", errMetadataVersionConflict, expectedVersion, storedVersion) + } + + // Update the metadata.fullMetadata field + var metadataObj map[string]json.RawMessage + if raw, ok := internalMeta["metadata"]; ok { + if err := json.Unmarshal(raw, &metadataObj); err != nil { + return fmt.Errorf("unmarshal metadata object: %w", err) + } + } else { + metadataObj = make(map[string]json.RawMessage) + } + metadataObj["fullMetadata"] = newFullMetadata + metadataJSON, err := json.Marshal(metadataObj) + if err != nil { + return fmt.Errorf("marshal metadata object: %w", err) + } + internalMeta["metadata"] = metadataJSON + + // Increment version + newVersion := expectedVersion + 1 + versionJSON, _ := json.Marshal(newVersion) + internalMeta["metadataVersion"] = versionJSON + + // Update modifiedAt + modifiedAt, _ := json.Marshal(time.Now().Format(time.RFC3339Nano)) + internalMeta["modifiedAt"] = modifiedAt + + // Update metadataLocation to point to the new metadata file + metaLocJSON, _ := json.Marshal(newMetadataLocation) + internalMeta["metadataLocation"] = metaLocJSON + + // Regenerate versionToken for consistency with the S3 Tables catalog + tokenJSON, _ := json.Marshal(generateIcebergVersionToken()) + internalMeta["versionToken"] = tokenJSON + + updatedXattr, err := json.Marshal(internalMeta) + if err != nil { + return fmt.Errorf("marshal updated xattr: %w", err) + } + + resp.Entry.Extended[s3tables.ExtendedKeyMetadata] = updatedXattr + _, err = client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{ + Directory: parentDir, + Entry: resp.Entry, 
+ }) + if err != nil { + return fmt.Errorf("update table entry: %w", err) + } + return nil +} + +// generateIcebergVersionToken produces a random hex token, mirroring the +// logic in s3tables.generateVersionToken (which is unexported). +func generateIcebergVersionToken() string { + b := make([]byte, 16) + if _, err := rand.Read(b); err != nil { + return fmt.Sprintf("%x", time.Now().UnixNano()) + } + return hex.EncodeToString(b) +} diff --git a/weed/plugin/worker/iceberg/handler.go b/weed/plugin/worker/iceberg/handler.go new file mode 100644 index 000000000..071fe73f8 --- /dev/null +++ b/weed/plugin/worker/iceberg/handler.go @@ -0,0 +1,460 @@ +package iceberg + +import ( + "context" + "fmt" + "path" + "strings" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" + pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/timestamppb" +) + +// Handler implements the JobHandler interface for Iceberg table maintenance: +// snapshot expiration, orphan file removal, and manifest rewriting. +type Handler struct { + grpcDialOption grpc.DialOption +} + +// NewHandler creates a new handler for iceberg table maintenance. 
+func NewHandler(grpcDialOption grpc.DialOption) *Handler { + return &Handler{grpcDialOption: grpcDialOption} +} + +func (h *Handler) Capability() *plugin_pb.JobTypeCapability { + return &plugin_pb.JobTypeCapability{ + JobType: jobType, + CanDetect: true, + CanExecute: true, + MaxDetectionConcurrency: 1, + MaxExecutionConcurrency: 4, + DisplayName: "Iceberg Maintenance", + Description: "Compacts, expires snapshots, removes orphans, and rewrites manifests for Iceberg tables in S3 table buckets", + Weight: 50, + } +} + +func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor { + return &plugin_pb.JobTypeDescriptor{ + JobType: jobType, + DisplayName: "Iceberg Maintenance", + Description: "Automated maintenance for Iceberg tables: snapshot expiration, orphan removal, manifest rewriting", + Icon: "fas fa-snowflake", + DescriptorVersion: 1, + AdminConfigForm: &plugin_pb.ConfigForm{ + FormId: "iceberg-maintenance-admin", + Title: "Iceberg Maintenance Admin Config", + Description: "Admin-side controls for Iceberg table maintenance scope.", + Sections: []*plugin_pb.ConfigSection{ + { + SectionId: "scope", + Title: "Scope", + Description: "Filters to restrict which tables are scanned for maintenance.", + Fields: []*plugin_pb.ConfigField{ + { + Name: "bucket_filter", + Label: "Bucket Filter", + Description: "Comma-separated wildcard patterns for table buckets (* and ? supported). Blank = all.", + Placeholder: "prod-*, staging-*", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, + }, + { + Name: "namespace_filter", + Label: "Namespace Filter", + Description: "Comma-separated wildcard patterns for namespaces (* and ? supported). 
Blank = all.", + Placeholder: "analytics, events-*", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, + }, + { + Name: "table_filter", + Label: "Table Filter", + Description: "Comma-separated wildcard patterns for table names (* and ? supported). Blank = all.", + Placeholder: "clicks, orders-*", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, + }, + }, + }, + }, + DefaultValues: map[string]*plugin_pb.ConfigValue{ + "bucket_filter": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, + "namespace_filter": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, + "table_filter": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, + }, + }, + WorkerConfigForm: &plugin_pb.ConfigForm{ + FormId: "iceberg-maintenance-worker", + Title: "Iceberg Maintenance Worker Config", + Description: "Worker-side thresholds for maintenance operations.", + Sections: []*plugin_pb.ConfigSection{ + { + SectionId: "snapshots", + Title: "Snapshot Expiration", + Description: "Controls for automatic snapshot cleanup.", + Fields: []*plugin_pb.ConfigField{ + { + Name: "snapshot_retention_hours", + Label: "Retention (hours)", + Description: "Expire snapshots older than this many hours.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}}, + }, + { + Name: "max_snapshots_to_keep", + Label: "Max Snapshots", + Description: "Always keep at least this many most recent snapshots.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}}, + }, + }, + }, + { + SectionId: "compaction", + Title: "Data Compaction", + Description: "Controls 
for bin-packing small Parquet data files.", + Fields: []*plugin_pb.ConfigField{ + { + Name: "target_file_size_bytes", + Label: "Target File Size (bytes)", + Description: "Files smaller than this are candidates for compaction.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1024 * 1024}}, + }, + { + Name: "min_input_files", + Label: "Min Input Files", + Description: "Minimum number of small files in a partition to trigger compaction.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 2}}, + }, + }, + }, + { + SectionId: "orphans", + Title: "Orphan Removal", + Description: "Controls for orphan file cleanup.", + Fields: []*plugin_pb.ConfigField{ + { + Name: "orphan_older_than_hours", + Label: "Safety Window (hours)", + Description: "Only remove orphan files older than this many hours.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}}, + }, + }, + }, + { + SectionId: "general", + Title: "General", + Description: "General maintenance settings.", + Fields: []*plugin_pb.ConfigField{ + { + Name: "max_commit_retries", + Label: "Max Commit Retries", + Description: "Maximum number of commit retries on version conflict.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}}, + MaxValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 20}}, + }, + { + Name: "operations", + Label: "Operations", + Description: "Comma-separated 
list of operations to run: compact, expire_snapshots, remove_orphans, rewrite_manifests, or 'all'.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, + }, + }, + }, + }, + DefaultValues: map[string]*plugin_pb.ConfigValue{ + "target_file_size_bytes": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeBytes}}, + "min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinInputFiles}}, + "snapshot_retention_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultSnapshotRetentionHours}}, + "max_snapshots_to_keep": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxSnapshotsToKeep}}, + "orphan_older_than_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultOrphanOlderThanHours}}, + "max_commit_retries": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxCommitRetries}}, + "operations": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: defaultOperations}}, + }, + }, + AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{ + Enabled: false, // disabled by default + DetectionIntervalSeconds: 3600, // 1 hour + DetectionTimeoutSeconds: 300, + MaxJobsPerDetection: 100, + GlobalExecutionConcurrency: 4, + PerWorkerExecutionConcurrency: 2, + RetryLimit: 1, + RetryBackoffSeconds: 60, + JobTypeMaxRuntimeSeconds: 3600, // 1 hour max + }, + WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{ + "target_file_size_bytes": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeBytes}}, + "min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinInputFiles}}, + "snapshot_retention_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultSnapshotRetentionHours}}, + "max_snapshots_to_keep": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxSnapshotsToKeep}}, + "orphan_older_than_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 
defaultOrphanOlderThanHours}}, + "max_commit_retries": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxCommitRetries}}, + "operations": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: defaultOperations}}, + }, + } +} + +func (h *Handler) Detect(ctx context.Context, request *plugin_pb.RunDetectionRequest, sender pluginworker.DetectionSender) error { + if request == nil { + return fmt.Errorf("run detection request is nil") + } + if sender == nil { + return fmt.Errorf("detection sender is nil") + } + if request.JobType != "" && request.JobType != jobType { + return fmt.Errorf("job type %q is not handled by iceberg maintenance handler", request.JobType) + } + + workerConfig := ParseConfig(request.GetWorkerConfigValues()) + if _, err := parseOperations(workerConfig.Operations); err != nil { + return fmt.Errorf("invalid operations config: %w", err) + } + + // Detection interval is managed by the scheduler via AdminRuntimeDefaults.DetectionIntervalSeconds. + + // Get filer addresses from cluster context + filerAddresses := make([]string, 0) + if request.ClusterContext != nil { + filerAddresses = append(filerAddresses, request.ClusterContext.FilerGrpcAddresses...) 
+ } + if len(filerAddresses) == 0 { + _ = sender.SendActivity(pluginworker.BuildDetectorActivity("skipped", "no filer addresses in cluster context", nil)) + return h.sendEmptyDetection(sender) + } + + // Read scope filters + bucketFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "bucket_filter", "")) + namespaceFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "namespace_filter", "")) + tableFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "table_filter", "")) + + // Connect to filer to scan table buckets + filerAddress := filerAddresses[0] + conn, err := grpc.NewClient(filerAddress, h.grpcDialOption) + if err != nil { + return fmt.Errorf("connect to filer %s: %w", filerAddress, err) + } + defer conn.Close() + filerClient := filer_pb.NewSeaweedFilerClient(conn) + + maxResults := int(request.MaxResults) + tables, err := h.scanTablesForMaintenance(ctx, filerClient, workerConfig, bucketFilter, namespaceFilter, tableFilter, maxResults) + if err != nil { + _ = sender.SendActivity(pluginworker.BuildDetectorActivity("scan_error", fmt.Sprintf("error scanning tables: %v", err), nil)) + return fmt.Errorf("scan tables: %w", err) + } + + _ = sender.SendActivity(pluginworker.BuildDetectorActivity("scan_complete", + fmt.Sprintf("found %d table(s) needing maintenance", len(tables)), + map[string]*plugin_pb.ConfigValue{ + "tables_found": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(tables))}}, + })) + + hasMore := false + if maxResults > 0 && len(tables) > maxResults { + hasMore = true + tables = tables[:maxResults] + } + + proposals := make([]*plugin_pb.JobProposal, 0, len(tables)) + for _, t := range tables { + proposal := h.buildMaintenanceProposal(t, filerAddress) + proposals = append(proposals, proposal) + } + + if err := sender.SendProposals(&plugin_pb.DetectionProposals{ + JobType: jobType, + Proposals: proposals, + HasMore: hasMore, + }); err != nil { + return err + } + + 
return sender.SendComplete(&plugin_pb.DetectionComplete{ + JobType: jobType, + Success: true, + TotalProposals: int32(len(proposals)), + }) +} + +func (h *Handler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequest, sender pluginworker.ExecutionSender) error { + if request == nil || request.Job == nil { + return fmt.Errorf("execute request/job is nil") + } + if sender == nil { + return fmt.Errorf("execution sender is nil") + } + if request.Job.JobType != "" && request.Job.JobType != jobType { + return fmt.Errorf("job type %q is not handled by iceberg maintenance handler", request.Job.JobType) + } + canonicalJobType := request.Job.JobType + if canonicalJobType == "" { + canonicalJobType = jobType + } + + params := request.Job.Parameters + bucketName := readStringConfig(params, "bucket_name", "") + namespace := readStringConfig(params, "namespace", "") + tableName := readStringConfig(params, "table_name", "") + tablePath := readStringConfig(params, "table_path", "") + filerAddress := readStringConfig(params, "filer_address", "") + + if bucketName == "" || namespace == "" || tableName == "" || filerAddress == "" { + return fmt.Errorf("missing required parameters: bucket_name=%q, namespace=%q, table_name=%q, filer_address=%q", bucketName, namespace, tableName, filerAddress) + } + if tablePath == "" { + tablePath = path.Join(namespace, tableName) + } + // Sanitize tablePath to prevent directory traversal. 
+ tablePath = path.Clean(tablePath) + expected := path.Join(namespace, tableName) + if tablePath != expected && !strings.HasPrefix(tablePath, expected+"/") { + return fmt.Errorf("invalid table_path %q: must be %q or a subpath", tablePath, expected) + } + + workerConfig := ParseConfig(request.GetWorkerConfigValues()) + ops, opsErr := parseOperations(workerConfig.Operations) + if opsErr != nil { + return fmt.Errorf("invalid operations config: %w", opsErr) + } + + // Send initial progress + if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{ + JobId: request.Job.JobId, + JobType: canonicalJobType, + State: plugin_pb.JobState_JOB_STATE_ASSIGNED, + ProgressPercent: 0, + Stage: "assigned", + Message: fmt.Sprintf("maintenance job accepted for %s/%s/%s", bucketName, namespace, tableName), + Activities: []*plugin_pb.ActivityEvent{ + pluginworker.BuildExecutorActivity("assigned", fmt.Sprintf("maintenance job accepted for %s/%s/%s", bucketName, namespace, tableName)), + }, + }); err != nil { + return err + } + + // Connect to filer + conn, err := grpc.NewClient(filerAddress, h.grpcDialOption) + if err != nil { + return fmt.Errorf("connect to filer %s: %w", filerAddress, err) + } + defer conn.Close() + filerClient := filer_pb.NewSeaweedFilerClient(conn) + + var results []string + var lastErr error + totalOps := len(ops) + completedOps := 0 + + // Execute operations in correct Iceberg maintenance order: + // expire_snapshots → remove_orphans → rewrite_manifests + for _, op := range ops { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + progress := float64(completedOps) / float64(totalOps) * 100 + if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{ + JobId: request.Job.JobId, + JobType: canonicalJobType, + State: plugin_pb.JobState_JOB_STATE_RUNNING, + ProgressPercent: progress, + Stage: op, + Message: fmt.Sprintf("running %s", op), + Activities: []*plugin_pb.ActivityEvent{ + pluginworker.BuildExecutorActivity(op, fmt.Sprintf("starting %s for 
%s/%s/%s", op, bucketName, namespace, tableName)), + }, + }); err != nil { + return err + } + + var opResult string + var opErr error + + switch op { + case "compact": + opResult, opErr = h.compactDataFiles(ctx, filerClient, bucketName, tablePath, workerConfig) + case "expire_snapshots": + opResult, opErr = h.expireSnapshots(ctx, filerClient, bucketName, tablePath, workerConfig) + case "remove_orphans": + opResult, opErr = h.removeOrphans(ctx, filerClient, bucketName, tablePath, workerConfig) + case "rewrite_manifests": + opResult, opErr = h.rewriteManifests(ctx, filerClient, bucketName, tablePath, workerConfig) + default: + glog.Warningf("unknown maintenance operation: %s", op) + continue + } + + completedOps++ + if opErr != nil { + glog.Warningf("iceberg maintenance %s failed for %s/%s/%s: %v", op, bucketName, namespace, tableName, opErr) + results = append(results, fmt.Sprintf("%s: error: %v", op, opErr)) + lastErr = opErr + } else { + results = append(results, fmt.Sprintf("%s: %s", op, opResult)) + } + } + + resultSummary := strings.Join(results, "; ") + success := lastErr == nil + + return sender.SendCompleted(&plugin_pb.JobCompleted{ + JobId: request.Job.JobId, + JobType: canonicalJobType, + Success: success, + ErrorMessage: func() string { + if lastErr != nil { + return lastErr.Error() + } + return "" + }(), + Result: &plugin_pb.JobResult{ + Summary: resultSummary, + OutputValues: map[string]*plugin_pb.ConfigValue{ + "bucket": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: bucketName}}, + "namespace": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: namespace}}, + "table": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: tableName}}, + }, + }, + Activities: []*plugin_pb.ActivityEvent{ + pluginworker.BuildExecutorActivity("completed", resultSummary), + }, + CompletedAt: timestamppb.Now(), + }) +} + +func (h *Handler) sendEmptyDetection(sender pluginworker.DetectionSender) error { + if err := 
sender.SendProposals(&plugin_pb.DetectionProposals{ + JobType: jobType, + Proposals: []*plugin_pb.JobProposal{}, + HasMore: false, + }); err != nil { + return err + } + return sender.SendComplete(&plugin_pb.DetectionComplete{ + JobType: jobType, + Success: true, + TotalProposals: 0, + }) +} + +// Ensure Handler implements JobHandler. +var _ pluginworker.JobHandler = (*Handler)(nil) diff --git a/weed/plugin/worker/iceberg/handler_test.go b/weed/plugin/worker/iceberg/handler_test.go new file mode 100644 index 000000000..8727d26d5 --- /dev/null +++ b/weed/plugin/worker/iceberg/handler_test.go @@ -0,0 +1,613 @@ +package iceberg + +import ( + "bytes" + "fmt" + "path" + "testing" + "time" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/table" +) + +func TestParseConfig(t *testing.T) { + config := ParseConfig(nil) + + if config.SnapshotRetentionHours != defaultSnapshotRetentionHours { + t.Errorf("expected SnapshotRetentionHours=%d, got %d", defaultSnapshotRetentionHours, config.SnapshotRetentionHours) + } + if config.MaxSnapshotsToKeep != defaultMaxSnapshotsToKeep { + t.Errorf("expected MaxSnapshotsToKeep=%d, got %d", defaultMaxSnapshotsToKeep, config.MaxSnapshotsToKeep) + } + if config.OrphanOlderThanHours != defaultOrphanOlderThanHours { + t.Errorf("expected OrphanOlderThanHours=%d, got %d", defaultOrphanOlderThanHours, config.OrphanOlderThanHours) + } + if config.MaxCommitRetries != defaultMaxCommitRetries { + t.Errorf("expected MaxCommitRetries=%d, got %d", defaultMaxCommitRetries, config.MaxCommitRetries) + } + if config.Operations != defaultOperations { + t.Errorf("expected Operations=%q, got %q", defaultOperations, config.Operations) + } +} + +func TestParseOperations(t *testing.T) { + tests := []struct { + input string + expected []string + wantErr bool + }{ + {"all", []string{"compact", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, false}, + {"", []string{"compact", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, 
false}, + {"expire_snapshots", []string{"expire_snapshots"}, false}, + {"compact", []string{"compact"}, false}, + {"rewrite_manifests,expire_snapshots", []string{"expire_snapshots", "rewrite_manifests"}, false}, + {"compact,expire_snapshots", []string{"compact", "expire_snapshots"}, false}, + {"remove_orphans, rewrite_manifests", []string{"remove_orphans", "rewrite_manifests"}, false}, + {"expire_snapshots,remove_orphans,rewrite_manifests", []string{"expire_snapshots", "remove_orphans", "rewrite_manifests"}, false}, + {"compact,expire_snapshots,remove_orphans,rewrite_manifests", []string{"compact", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, false}, + {"unknown_op", nil, true}, + {"expire_snapshots,bad_op", nil, true}, + } + + for _, tc := range tests { + result, err := parseOperations(tc.input) + if tc.wantErr { + if err == nil { + t.Errorf("parseOperations(%q) expected error, got %v", tc.input, result) + } + continue + } + if err != nil { + t.Errorf("parseOperations(%q) unexpected error: %v", tc.input, err) + continue + } + if len(result) != len(tc.expected) { + t.Errorf("parseOperations(%q) = %v, want %v", tc.input, result, tc.expected) + continue + } + for i := range result { + if result[i] != tc.expected[i] { + t.Errorf("parseOperations(%q)[%d] = %q, want %q", tc.input, i, result[i], tc.expected[i]) + } + } + } +} + +func TestExtractMetadataVersion(t *testing.T) { + tests := []struct { + input string + expected int + }{ + {"v1.metadata.json", 1}, + {"v5.metadata.json", 5}, + {"v100.metadata.json", 100}, + {"v0.metadata.json", 0}, + {"invalid.metadata.json", 0}, + {"metadata.json", 0}, + {"", 0}, + {"v.metadata.json", 0}, + {"v7-1709766000.metadata.json", 7}, + {"v42-abc123.metadata.json", 42}, + {"v5-.metadata.json", 5}, + } + + for _, tc := range tests { + result := extractMetadataVersion(tc.input) + if result != tc.expected { + t.Errorf("extractMetadataVersion(%q) = %d, want %d", tc.input, result, tc.expected) + } + } +} + +func 
TestNeedsMaintenanceNoSnapshots(t *testing.T) { + config := Config{ + SnapshotRetentionHours: 24, + MaxSnapshotsToKeep: 2, + } + + meta := buildTestMetadata(t, nil) + if needsMaintenance(meta, config) { + t.Error("expected no maintenance for table with no snapshots") + } +} + +func TestNeedsMaintenanceExceedsMaxSnapshots(t *testing.T) { + config := Config{ + SnapshotRetentionHours: 24 * 365, // very long retention + MaxSnapshotsToKeep: 2, + } + + now := time.Now().UnixMilli() + snapshots := []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"}, + {SnapshotID: 2, TimestampMs: now + 1, ManifestList: "metadata/snap-2.avro"}, + {SnapshotID: 3, TimestampMs: now + 2, ManifestList: "metadata/snap-3.avro"}, + } + meta := buildTestMetadata(t, snapshots) + if !needsMaintenance(meta, config) { + t.Error("expected maintenance for table exceeding max snapshots") + } +} + +func TestNeedsMaintenanceWithinLimits(t *testing.T) { + config := Config{ + SnapshotRetentionHours: 24 * 365, // very long retention + MaxSnapshotsToKeep: 5, + } + + now := time.Now().UnixMilli() + snapshots := []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"}, + } + meta := buildTestMetadata(t, snapshots) + if needsMaintenance(meta, config) { + t.Error("expected no maintenance for table within limits") + } +} + +func TestNeedsMaintenanceOldSnapshot(t *testing.T) { + // Use a retention of 0 hours so that any snapshot is considered "old" + config := Config{ + SnapshotRetentionHours: 0, // instant expiry + MaxSnapshotsToKeep: 10, + } + + now := time.Now().UnixMilli() + snapshots := []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now - 1, ManifestList: "metadata/snap-1.avro"}, + } + meta := buildTestMetadata(t, snapshots) + // With 0 retention, any snapshot with timestamp < now should need maintenance + if !needsMaintenance(meta, config) { + t.Error("expected maintenance for table with expired snapshot") + } +} + +func 
TestCapabilityAndDescriptor(t *testing.T) { + handler := NewHandler(nil) + + cap := handler.Capability() + if cap.JobType != jobType { + t.Errorf("expected job type %q, got %q", jobType, cap.JobType) + } + if !cap.CanDetect { + t.Error("expected CanDetect=true") + } + if !cap.CanExecute { + t.Error("expected CanExecute=true") + } + + desc := handler.Descriptor() + if desc.JobType != jobType { + t.Errorf("expected job type %q, got %q", jobType, desc.JobType) + } + if desc.AdminConfigForm == nil { + t.Error("expected admin config form") + } + if desc.WorkerConfigForm == nil { + t.Error("expected worker config form") + } + if desc.AdminRuntimeDefaults == nil { + t.Error("expected admin runtime defaults") + } + if desc.AdminRuntimeDefaults.Enabled { + t.Error("expected disabled by default") + } +} + +func TestBuildMaintenanceProposal(t *testing.T) { + handler := NewHandler(nil) + + now := time.Now().UnixMilli() + snapshots := []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now}, + {SnapshotID: 2, TimestampMs: now + 1}, + } + meta := buildTestMetadata(t, snapshots) + + info := tableInfo{ + BucketName: "my-bucket", + Namespace: "analytics", + TableName: "events", + TablePath: "analytics/events", + Metadata: meta, + } + + proposal := handler.buildMaintenanceProposal(info, "localhost:8888") + + expectedDedupe := "iceberg_maintenance:my-bucket/analytics/events" + if proposal.DedupeKey != expectedDedupe { + t.Errorf("expected dedupe key %q, got %q", expectedDedupe, proposal.DedupeKey) + } + if proposal.JobType != jobType { + t.Errorf("expected job type %q, got %q", jobType, proposal.JobType) + } + + if readStringConfig(proposal.Parameters, "bucket_name", "") != "my-bucket" { + t.Error("expected bucket_name=my-bucket in parameters") + } + if readStringConfig(proposal.Parameters, "namespace", "") != "analytics" { + t.Error("expected namespace=analytics in parameters") + } + if readStringConfig(proposal.Parameters, "table_name", "") != "events" { + t.Error("expected 
table_name=events in parameters") + } + if readStringConfig(proposal.Parameters, "filer_address", "") != "localhost:8888" { + t.Error("expected filer_address=localhost:8888 in parameters") + } +} + +func TestManifestRewritePathConsistency(t *testing.T) { + // Verify that WriteManifest returns a ManifestFile whose FilePath() + // matches the path we pass in. This ensures the pattern used in + // rewriteManifests (compute filename once, pass to both WriteManifest + // and saveFilerFile) produces consistent references. + schema := newTestSchema() + spec := *iceberg.UnpartitionedSpec + + snapshotID := int64(42) + manifestFileName := fmt.Sprintf("merged-%d-%d.avro", snapshotID, int64(1700000000000)) + manifestPath := "metadata/" + manifestFileName + + // Create a minimal manifest entry to write + dfBuilder, err := iceberg.NewDataFileBuilder( + spec, + iceberg.EntryContentData, + "data/test.parquet", + iceberg.ParquetFile, + map[int]any{}, + nil, nil, + 1, // recordCount + 1024, // fileSizeBytes + ) + if err != nil { + t.Fatalf("failed to build data file: %v", err) + } + entry := iceberg.NewManifestEntry( + iceberg.EntryStatusADDED, + &snapshotID, + nil, nil, + dfBuilder.Build(), + ) + + var buf bytes.Buffer + mf, err := iceberg.WriteManifest( + manifestPath, + &buf, + 2, // version + spec, + schema, + snapshotID, + []iceberg.ManifestEntry{entry}, + ) + if err != nil { + t.Fatalf("WriteManifest failed: %v", err) + } + + if mf.FilePath() != manifestPath { + t.Errorf("manifest FilePath() = %q, want %q", mf.FilePath(), manifestPath) + } + + // Verify the filename we'd use for saveFilerFile matches + if path.Base(mf.FilePath()) != manifestFileName { + t.Errorf("manifest base name = %q, want %q", path.Base(mf.FilePath()), manifestFileName) + } +} + +func TestManifestRewriteNestedPathConsistency(t *testing.T) { + // Verify that WriteManifest with nested paths preserves the full path + // and that loadFileByIcebergPath (via normalizeIcebergPath) would + // resolve them 
correctly. + schema := newTestSchema() + spec := *iceberg.UnpartitionedSpec + snapshotID := int64(42) + + testCases := []struct { + name string + manifestPath string + }{ + {"nested two levels", "metadata/a/b/merged-42-1700000000000.avro"}, + {"nested one level", "metadata/subdir/manifest-42.avro"}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dfBuilder, err := iceberg.NewDataFileBuilder( + spec, + iceberg.EntryContentData, + "data/test.parquet", + iceberg.ParquetFile, + map[int]any{}, + nil, nil, + 1, 1024, + ) + if err != nil { + t.Fatalf("failed to build data file: %v", err) + } + entry := iceberg.NewManifestEntry( + iceberg.EntryStatusADDED, + &snapshotID, + nil, nil, + dfBuilder.Build(), + ) + + var buf bytes.Buffer + mf, err := iceberg.WriteManifest( + tc.manifestPath, &buf, 2, spec, schema, snapshotID, + []iceberg.ManifestEntry{entry}, + ) + if err != nil { + t.Fatalf("WriteManifest failed: %v", err) + } + + if mf.FilePath() != tc.manifestPath { + t.Errorf("FilePath() = %q, want %q", mf.FilePath(), tc.manifestPath) + } + + // normalizeIcebergPath should return the path unchanged when already relative + normalized := normalizeIcebergPath(tc.manifestPath, "bucket", "ns/table") + if normalized != tc.manifestPath { + t.Errorf("normalizeIcebergPath(%q) = %q, want %q", tc.manifestPath, normalized, tc.manifestPath) + } + + // Verify normalization strips S3 scheme prefix correctly + s3Path := "s3://bucket/ns/table/" + tc.manifestPath + normalized = normalizeIcebergPath(s3Path, "bucket", "ns/table") + if normalized != tc.manifestPath { + t.Errorf("normalizeIcebergPath(%q) = %q, want %q", s3Path, normalized, tc.manifestPath) + } + }) + } +} + +func TestNormalizeIcebergPath(t *testing.T) { + tests := []struct { + name string + icebergPath string + bucket string + tablePath string + expected string + }{ + { + "relative metadata path", + "metadata/snap-1.avro", + "mybucket", "ns/table", + "metadata/snap-1.avro", + }, + { + "relative data 
path", + "data/file.parquet", + "mybucket", "ns/table", + "data/file.parquet", + }, + { + "S3 URL", + "s3://mybucket/ns/table/metadata/snap-1.avro", + "mybucket", "ns/table", + "metadata/snap-1.avro", + }, + { + "absolute filer path", + "/buckets/mybucket/ns/table/data/file.parquet", + "mybucket", "ns/table", + "data/file.parquet", + }, + { + "nested data path", + "data/region=us/city=sf/file.parquet", + "mybucket", "ns/table", + "data/region=us/city=sf/file.parquet", + }, + { + "S3 URL nested", + "s3://mybucket/ns/table/data/region=us/file.parquet", + "mybucket", "ns/table", + "data/region=us/file.parquet", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + result := normalizeIcebergPath(tc.icebergPath, tc.bucket, tc.tablePath) + if result != tc.expected { + t.Errorf("normalizeIcebergPath(%q, %q, %q) = %q, want %q", + tc.icebergPath, tc.bucket, tc.tablePath, result, tc.expected) + } + }) + } +} + +func TestPartitionKey(t *testing.T) { + tests := []struct { + name string + partition map[int]any + expected string + }{ + {"empty partition", map[int]any{}, "__unpartitioned__"}, + {"nil partition", nil, "__unpartitioned__"}, + {"single field", map[int]any{1: "us-east"}, "1=\"us-east\""}, + {"multiple fields sorted", map[int]any{3: "2024", 1: "us-east"}, "1=\"us-east\"\x003=\"2024\""}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + result := partitionKey(tc.partition) + if result != tc.expected { + t.Errorf("partitionKey(%v) = %q, want %q", tc.partition, result, tc.expected) + } + }) + } +} + +func TestBuildCompactionBins(t *testing.T) { + targetSize := int64(256 * 1024 * 1024) // 256MB + minFiles := 3 + + // Create test entries: small files in same partition + entries := makeTestEntries(t, []testEntrySpec{ + {path: "data/f1.parquet", size: 1024, partition: map[int]any{}}, + {path: "data/f2.parquet", size: 2048, partition: map[int]any{}}, + {path: "data/f3.parquet", size: 4096, partition: map[int]any{}}, + }) + + 
bins := buildCompactionBins(entries, targetSize, minFiles) + if len(bins) != 1 { + t.Fatalf("expected 1 bin, got %d", len(bins)) + } + if len(bins[0].Entries) != 3 { + t.Errorf("expected 3 entries in bin, got %d", len(bins[0].Entries)) + } +} + +func TestBuildCompactionBinsFiltersLargeFiles(t *testing.T) { + targetSize := int64(4000) + minFiles := 2 + + entries := makeTestEntries(t, []testEntrySpec{ + {path: "data/small1.parquet", size: 1024, partition: map[int]any{}}, + {path: "data/small2.parquet", size: 2048, partition: map[int]any{}}, + {path: "data/large.parquet", size: 5000, partition: map[int]any{}}, + }) + + bins := buildCompactionBins(entries, targetSize, minFiles) + if len(bins) != 1 { + t.Fatalf("expected 1 bin, got %d", len(bins)) + } + if len(bins[0].Entries) != 2 { + t.Errorf("expected 2 entries (large excluded), got %d", len(bins[0].Entries)) + } +} + +func TestBuildCompactionBinsMinFilesThreshold(t *testing.T) { + targetSize := int64(256 * 1024 * 1024) + minFiles := 5 + + entries := makeTestEntries(t, []testEntrySpec{ + {path: "data/f1.parquet", size: 1024, partition: map[int]any{}}, + {path: "data/f2.parquet", size: 2048, partition: map[int]any{}}, + }) + + bins := buildCompactionBins(entries, targetSize, minFiles) + if len(bins) != 0 { + t.Errorf("expected 0 bins (below min threshold), got %d", len(bins)) + } +} + +func TestBuildCompactionBinsMultiplePartitions(t *testing.T) { + targetSize := int64(256 * 1024 * 1024) + minFiles := 2 + + partA := map[int]any{1: "us-east"} + partB := map[int]any{1: "eu-west"} + + entries := makeTestEntries(t, []testEntrySpec{ + {path: "data/a1.parquet", size: 1024, partition: partA}, + {path: "data/a2.parquet", size: 2048, partition: partA}, + {path: "data/b1.parquet", size: 1024, partition: partB}, + {path: "data/b2.parquet", size: 2048, partition: partB}, + {path: "data/b3.parquet", size: 4096, partition: partB}, + }) + + bins := buildCompactionBins(entries, targetSize, minFiles) + if len(bins) != 2 { + 
t.Fatalf("expected 2 bins (one per partition), got %d", len(bins)) + } +} + +type testEntrySpec struct { + path string + size int64 + partition map[int]any +} + +func makeTestEntries(t *testing.T, specs []testEntrySpec) []iceberg.ManifestEntry { + t.Helper() + entries := make([]iceberg.ManifestEntry, 0, len(specs)) + for _, spec := range specs { + dfBuilder, err := iceberg.NewDataFileBuilder( + *iceberg.UnpartitionedSpec, + iceberg.EntryContentData, + spec.path, + iceberg.ParquetFile, + spec.partition, + nil, nil, + 1, // recordCount (must be > 0) + spec.size, + ) + if err != nil { + t.Fatalf("failed to build data file %s: %v", spec.path, err) + } + snapID := int64(1) + entry := iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, dfBuilder.Build()) + entries = append(entries, entry) + } + return entries +} + +func TestDetectNilRequest(t *testing.T) { + handler := NewHandler(nil) + err := handler.Detect(nil, nil, nil) + if err == nil { + t.Error("expected error for nil request") + } +} + +func TestExecuteNilRequest(t *testing.T) { + handler := NewHandler(nil) + err := handler.Execute(nil, nil, nil) + if err == nil { + t.Error("expected error for nil request") + } +} + +// --------------------------------------------------------------------------- +// Test helpers +// --------------------------------------------------------------------------- + +// buildTestMetadata creates a minimal Iceberg metadata for testing. +// When snapshots is nil or empty, the metadata has no snapshots. 
+func buildTestMetadata(t *testing.T, snapshots []table.Snapshot) table.Metadata { + t.Helper() + + schema := newTestSchema() + meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec, table.UnsortedSortOrder, "s3://test-bucket/test-table", nil) + if err != nil { + t.Fatalf("failed to create test metadata: %v", err) + } + + if len(snapshots) == 0 { + return meta + } + + builder, err := table.MetadataBuilderFromBase(meta, "s3://test-bucket/test-table") + if err != nil { + t.Fatalf("failed to create metadata builder: %v", err) + } + + var lastSnapID int64 + for _, snap := range snapshots { + s := snap // copy + if err := builder.AddSnapshot(&s); err != nil { + t.Fatalf("failed to add snapshot %d: %v", snap.SnapshotID, err) + } + lastSnapID = snap.SnapshotID + } + + if err := builder.SetSnapshotRef(table.MainBranch, lastSnapID, table.BranchRef); err != nil { + t.Fatalf("failed to set snapshot ref: %v", err) + } + + result, err := builder.Build() + if err != nil { + t.Fatalf("failed to build metadata: %v", err) + } + return result +} + +func newTestSchema() *iceberg.Schema { + return iceberg.NewSchema(0, + iceberg.NestedField{ID: 1, Type: iceberg.PrimitiveTypes.Int64, Name: "id", Required: true}, + iceberg.NestedField{ID: 2, Type: iceberg.PrimitiveTypes.String, Name: "name", Required: false}, + ) +} diff --git a/weed/plugin/worker/iceberg/operations.go b/weed/plugin/worker/iceberg/operations.go new file mode 100644 index 000000000..607283c21 --- /dev/null +++ b/weed/plugin/worker/iceberg/operations.go @@ -0,0 +1,604 @@ +package iceberg + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "math/rand/v2" + "path" + "sort" + "strings" + "time" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/table" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3tables" +) + +// errStalePlan is returned by a commit mutation when the table head has 
+// advanced since planning. The caller should not retry the same plan. +var errStalePlan = errors.New("stale plan: table head changed since planning") + +// errMetadataVersionConflict is returned when the xattr update detects a +// concurrent metadata version change (compare-and-swap failure). +var errMetadataVersionConflict = errors.New("metadata version conflict") + +// --------------------------------------------------------------------------- +// Operation: Expire Snapshots +// --------------------------------------------------------------------------- + +// expireSnapshots removes old snapshots from the table metadata and cleans up +// their manifest list files. +func (h *Handler) expireSnapshots( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath string, + config Config, +) (string, error) { + // Load current metadata + meta, metadataFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath) + if err != nil { + return "", fmt.Errorf("load metadata: %w", err) + } + + snapshots := meta.Snapshots() + if len(snapshots) == 0 { + return "no snapshots", nil + } + + // Determine which snapshots to expire + currentSnap := meta.CurrentSnapshot() + var currentSnapID int64 + if currentSnap != nil { + currentSnapID = currentSnap.SnapshotID + } + + retentionMs := config.SnapshotRetentionHours * 3600 * 1000 + nowMs := time.Now().UnixMilli() + + // Sort snapshots by timestamp descending (most recent first) so that + // the keep-count logic always preserves the newest snapshots. + sorted := make([]table.Snapshot, len(snapshots)) + copy(sorted, snapshots) + sort.Slice(sorted, func(i, j int) bool { + return sorted[i].TimestampMs > sorted[j].TimestampMs + }) + + // Walk from newest to oldest. The current snapshot is always kept. + // Among the remaining, keep up to MaxSnapshotsToKeep-1 (since current + // counts toward the quota). 
Expire the rest only if they exceed the + // retention window; snapshots within the window are kept regardless. + var toExpire []int64 + var kept int64 + for _, snap := range sorted { + if snap.SnapshotID == currentSnapID { + kept++ + continue + } + age := nowMs - snap.TimestampMs + if kept < config.MaxSnapshotsToKeep { + kept++ + continue + } + if age > retentionMs { + toExpire = append(toExpire, snap.SnapshotID) + } else { + kept++ + } + } + + if len(toExpire) == 0 { + return "no snapshots expired", nil + } + + // Split snapshots into expired and kept sets + expireSet := make(map[int64]struct{}, len(toExpire)) + for _, id := range toExpire { + expireSet[id] = struct{}{} + } + var expiredSnaps, keptSnaps []table.Snapshot + for _, snap := range sorted { + if _, ok := expireSet[snap.SnapshotID]; ok { + expiredSnaps = append(expiredSnaps, snap) + } else { + keptSnaps = append(keptSnaps, snap) + } + } + + // Collect all files referenced by each set before modifying metadata. + // This lets us determine which files become unreferenced. 
+ expiredFiles, err := collectSnapshotFiles(ctx, filerClient, bucketName, tablePath, expiredSnaps) + if err != nil { + return "", fmt.Errorf("collect expired snapshot files: %w", err) + } + keptFiles, err := collectSnapshotFiles(ctx, filerClient, bucketName, tablePath, keptSnaps) + if err != nil { + return "", fmt.Errorf("collect kept snapshot files: %w", err) + } + + // Normalize kept file paths for consistent comparison + normalizedKept := make(map[string]struct{}, len(keptFiles)) + for f := range keptFiles { + normalizedKept[normalizeIcebergPath(f, bucketName, tablePath)] = struct{}{} + } + + // Use MetadataBuilder to remove snapshots and create new metadata + err = h.commitWithRetry(ctx, filerClient, bucketName, tablePath, metadataFileName, config, func(currentMeta table.Metadata, builder *table.MetadataBuilder) error { + // Guard: verify table head hasn't changed since we planned + cs := currentMeta.CurrentSnapshot() + if (cs == nil) != (currentSnapID == 0) || (cs != nil && cs.SnapshotID != currentSnapID) { + return errStalePlan + } + return builder.RemoveSnapshots(toExpire) + }) + if err != nil { + return "", fmt.Errorf("commit snapshot expiration: %w", err) + } + + // Delete files exclusively referenced by expired snapshots (best-effort) + tableBasePath := path.Join(s3tables.TablesPath, bucketName, tablePath) + deletedCount := 0 + for filePath := range expiredFiles { + normalized := normalizeIcebergPath(filePath, bucketName, tablePath) + if _, stillReferenced := normalizedKept[normalized]; stillReferenced { + continue + } + dir := path.Join(tableBasePath, path.Dir(normalized)) + fileName := path.Base(normalized) + if delErr := deleteFilerFile(ctx, filerClient, dir, fileName); delErr != nil { + glog.Warningf("iceberg maintenance: failed to delete unreferenced file %s: %v", filePath, delErr) + } else { + deletedCount++ + } + } + + return fmt.Sprintf("expired %d snapshot(s), deleted %d unreferenced file(s)", len(toExpire), deletedCount), nil +} + +// 
collectSnapshotFiles returns all file paths (manifest lists, manifest files, +// data files) referenced by the given snapshots. It returns an error if any +// manifest list or manifest cannot be read/parsed, to prevent delete decisions +// based on incomplete reference data. +func collectSnapshotFiles( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath string, + snapshots []table.Snapshot, +) (map[string]struct{}, error) { + files := make(map[string]struct{}) + for _, snap := range snapshots { + if snap.ManifestList == "" { + continue + } + files[snap.ManifestList] = struct{}{} + + manifestListData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, snap.ManifestList) + if err != nil { + return nil, fmt.Errorf("read manifest list %s: %w", snap.ManifestList, err) + } + manifests, err := iceberg.ReadManifestList(bytes.NewReader(manifestListData)) + if err != nil { + return nil, fmt.Errorf("parse manifest list %s: %w", snap.ManifestList, err) + } + + for _, mf := range manifests { + files[mf.FilePath()] = struct{}{} + + manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath()) + if err != nil { + return nil, fmt.Errorf("read manifest %s: %w", mf.FilePath(), err) + } + entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), false) + if err != nil { + return nil, fmt.Errorf("parse manifest %s: %w", mf.FilePath(), err) + } + for _, entry := range entries { + files[entry.DataFile().FilePath()] = struct{}{} + } + } + } + return files, nil +} + +// --------------------------------------------------------------------------- +// Operation: Remove Orphans +// --------------------------------------------------------------------------- + +// removeOrphans finds and deletes unreferenced files from the table's +// metadata/ and data/ directories. 
+func (h *Handler) removeOrphans( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath string, + config Config, +) (string, error) { + // Load current metadata + meta, metadataFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath) + if err != nil { + return "", fmt.Errorf("load metadata: %w", err) + } + + // Collect all referenced files from all snapshots + referencedFiles, err := collectSnapshotFiles(ctx, filerClient, bucketName, tablePath, meta.Snapshots()) + if err != nil { + return "", fmt.Errorf("collect referenced files: %w", err) + } + + // Reference the active metadata file so it is not treated as orphan + referencedFiles[path.Join("metadata", metadataFileName)] = struct{}{} + + // Also reference the current metadata files + for mle := range meta.PreviousFiles() { + referencedFiles[mle.MetadataFile] = struct{}{} + } + + // Precompute a normalized lookup set so orphan checks are O(1) per file. + normalizedRefs := make(map[string]struct{}, len(referencedFiles)) + for ref := range referencedFiles { + normalizedRefs[ref] = struct{}{} + normalizedRefs[normalizeIcebergPath(ref, bucketName, tablePath)] = struct{}{} + } + + // List actual files on filer in metadata/ and data/ directories + tableBasePath := path.Join(s3tables.TablesPath, bucketName, tablePath) + safetyThreshold := time.Now().Add(-time.Duration(config.OrphanOlderThanHours) * time.Hour) + orphanCount := 0 + + for _, subdir := range []string{"metadata", "data"} { + dirPath := path.Join(tableBasePath, subdir) + fileEntries, err := walkFilerEntries(ctx, filerClient, dirPath) + if err != nil { + glog.V(2).Infof("iceberg maintenance: cannot walk %s: %v", dirPath, err) + continue + } + + for _, fe := range fileEntries { + entry := fe.Entry + // Build relative path from the table base (e.g. 
"data/region=us/file.parquet") + fullPath := path.Join(fe.Dir, entry.Name) + relPath := strings.TrimPrefix(fullPath, tableBasePath+"/") + + _, isReferenced := normalizedRefs[relPath] + + if isReferenced { + continue + } + + // Check safety window — skip entries with unknown age + if entry.Attributes == nil { + continue + } + mtime := time.Unix(entry.Attributes.Mtime, 0) + if mtime.After(safetyThreshold) { + continue + } + + // Delete orphan + if delErr := deleteFilerFile(ctx, filerClient, fe.Dir, entry.Name); delErr != nil { + glog.Warningf("iceberg maintenance: failed to delete orphan %s/%s: %v", fe.Dir, entry.Name, delErr) + } else { + orphanCount++ + } + } + } + + return fmt.Sprintf("removed %d orphan file(s)", orphanCount), nil +} + +// --------------------------------------------------------------------------- +// Operation: Rewrite Manifests +// --------------------------------------------------------------------------- + +// rewriteManifests merges small manifests into fewer, larger ones. 
+func (h *Handler) rewriteManifests( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath string, + config Config, +) (string, error) { + // Load current metadata + meta, metadataFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath) + if err != nil { + return "", fmt.Errorf("load metadata: %w", err) + } + + currentSnap := meta.CurrentSnapshot() + if currentSnap == nil || currentSnap.ManifestList == "" { + return "no current snapshot", nil + } + + // Read manifest list + manifestListData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, currentSnap.ManifestList) + if err != nil { + return "", fmt.Errorf("read manifest list: %w", err) + } + + manifests, err := iceberg.ReadManifestList(bytes.NewReader(manifestListData)) + if err != nil { + return "", fmt.Errorf("parse manifest list: %w", err) + } + + if int64(len(manifests)) < config.MinInputFiles { + return fmt.Sprintf("only %d manifests, below threshold of %d", len(manifests), config.MinInputFiles), nil + } + + // Collect all entries from data manifests, grouped by partition spec ID + // so we write one merged manifest per spec (required for spec-evolved tables). 
+ type specEntries struct { + specID int32 + spec iceberg.PartitionSpec + entries []iceberg.ManifestEntry + } + specMap := make(map[int32]*specEntries) + + // Build a lookup from spec ID to PartitionSpec + specByID := make(map[int]iceberg.PartitionSpec) + for _, ps := range meta.PartitionSpecs() { + specByID[ps.ID()] = ps + } + + for _, mf := range manifests { + if mf.ManifestContent() != iceberg.ManifestContentData { + continue + } + manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath()) + if err != nil { + return "", fmt.Errorf("read manifest %s: %w", mf.FilePath(), err) + } + entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true) + if err != nil { + return "", fmt.Errorf("parse manifest %s: %w", mf.FilePath(), err) + } + + sid := mf.PartitionSpecID() + se, ok := specMap[sid] + if !ok { + ps, found := specByID[int(sid)] + if !found { + return "", fmt.Errorf("partition spec %d not found in table metadata", sid) + } + se = &specEntries{specID: sid, spec: ps} + specMap[sid] = se + } + se.entries = append(se.entries, entries...) + } + + if len(specMap) == 0 { + return "no data entries to rewrite", nil + } + + schema := meta.CurrentSchema() + version := meta.Version() + snapshotID := currentSnap.SnapshotID + newSnapshotID := time.Now().UnixMilli() + newSeqNum := currentSnap.SequenceNumber + 1 + metaDir := path.Join(s3tables.TablesPath, bucketName, tablePath, "metadata") + + // Track written artifacts so we can clean them up if the commit fails. 
+ type artifact struct { + dir, fileName string + } + var writtenArtifacts []artifact + committed := false + + defer func() { + if committed || len(writtenArtifacts) == 0 { + return + } + cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + for _, a := range writtenArtifacts { + if err := deleteFilerFile(cleanupCtx, filerClient, a.dir, a.fileName); err != nil { + glog.Warningf("iceberg rewrite-manifests: failed to clean up artifact %s/%s: %v", a.dir, a.fileName, err) + } + } + }() + + // Write one merged manifest per partition spec + var newManifests []iceberg.ManifestFile + totalEntries := 0 + for _, se := range specMap { + totalEntries += len(se.entries) + manifestFileName := fmt.Sprintf("merged-%d-spec%d-%d.avro", newSnapshotID, se.specID, time.Now().UnixMilli()) + manifestPath := path.Join("metadata", manifestFileName) + + var manifestBuf bytes.Buffer + mergedManifest, err := iceberg.WriteManifest( + manifestPath, + &manifestBuf, + version, + se.spec, + schema, + newSnapshotID, + se.entries, + ) + if err != nil { + return "", fmt.Errorf("write merged manifest for spec %d: %w", se.specID, err) + } + + if err := saveFilerFile(ctx, filerClient, metaDir, manifestFileName, manifestBuf.Bytes()); err != nil { + return "", fmt.Errorf("save merged manifest for spec %d: %w", se.specID, err) + } + writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestFileName}) + newManifests = append(newManifests, mergedManifest) + } + + // Include any delete manifests that were not rewritten + for _, mf := range manifests { + if mf.ManifestContent() != iceberg.ManifestContentData { + newManifests = append(newManifests, mf) + } + } + + var manifestListBuf bytes.Buffer + err = iceberg.WriteManifestList(version, &manifestListBuf, newSnapshotID, &snapshotID, &newSeqNum, 0, newManifests) + if err != nil { + return "", fmt.Errorf("write manifest list: %w", err) + } + + // Save new manifest list + manifestListFileName 
:= fmt.Sprintf("snap-%d-%d.avro", newSnapshotID, time.Now().UnixMilli()) + if err := saveFilerFile(ctx, filerClient, metaDir, manifestListFileName, manifestListBuf.Bytes()); err != nil { + return "", fmt.Errorf("save manifest list: %w", err) + } + writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestListFileName}) + + // Create new snapshot with the rewritten manifest list + manifestListLocation := path.Join("metadata", manifestListFileName) + + err = h.commitWithRetry(ctx, filerClient, bucketName, tablePath, metadataFileName, config, func(currentMeta table.Metadata, builder *table.MetadataBuilder) error { + // Guard: verify table head hasn't advanced since we planned. + // The merged manifest and manifest list were built against snapshotID; + // if the head moved, they reference stale state. + cs := currentMeta.CurrentSnapshot() + if cs == nil || cs.SnapshotID != snapshotID { + return errStalePlan + } + + newSnapshot := &table.Snapshot{ + SnapshotID: newSnapshotID, + ParentSnapshotID: &snapshotID, + SequenceNumber: cs.SequenceNumber + 1, + TimestampMs: time.Now().UnixMilli(), + ManifestList: manifestListLocation, + Summary: &table.Summary{ + Operation: table.OpReplace, + Properties: map[string]string{"maintenance": "rewrite_manifests"}, + }, + SchemaID: func() *int { + id := schema.ID + return &id + }(), + } + if err := builder.AddSnapshot(newSnapshot); err != nil { + return err + } + return builder.SetSnapshotRef( + table.MainBranch, + newSnapshotID, + table.BranchRef, + ) + }) + if err != nil { + return "", fmt.Errorf("commit manifest rewrite: %w", err) + } + + committed = true + return fmt.Sprintf("rewrote %d manifests into %d (%d entries)", len(manifests), len(specMap), totalEntries), nil +} + +// --------------------------------------------------------------------------- +// Commit Protocol with Retry +// --------------------------------------------------------------------------- + +// commitWithRetry implements optimistic 
concurrency for metadata updates. +// It reads the current metadata, applies the mutation, writes a new metadata +// file, and updates the table entry. On version conflict, it retries. +func (h *Handler) commitWithRetry( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath, currentMetadataFileName string, + config Config, + mutate func(currentMeta table.Metadata, builder *table.MetadataBuilder) error, +) error { + maxRetries := config.MaxCommitRetries + if maxRetries <= 0 || maxRetries > 20 { + maxRetries = defaultMaxCommitRetries + } + + for attempt := int64(0); attempt < maxRetries; attempt++ { + if attempt > 0 { + backoff := time.Duration(50*(1<<(attempt-1))) * time.Millisecond // exponential: 50ms, 100ms, 200ms, ... + const maxBackoff = 5 * time.Second + if backoff > maxBackoff { + backoff = maxBackoff + } + jitter := time.Duration(rand.Int64N(int64(backoff) / 5)) // 0–20% of backoff + timer := time.NewTimer(backoff + jitter) + select { + case <-timer.C: + case <-ctx.Done(): + timer.Stop() + return ctx.Err() + } + } + + // Load current metadata + meta, metaFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath) + if err != nil { + return fmt.Errorf("load metadata (attempt %d): %w", attempt, err) + } + + // Build new metadata — pass the current metadata file path so the + // metadata log correctly records where the previous version lives. 
+ currentMetaFilePath := path.Join("metadata", metaFileName) + builder, err := table.MetadataBuilderFromBase(meta, currentMetaFilePath) + if err != nil { + return fmt.Errorf("create metadata builder (attempt %d): %w", attempt, err) + } + + // Apply the mutation + if err := mutate(meta, builder); err != nil { + return fmt.Errorf("apply mutation (attempt %d): %w", attempt, err) + } + + if !builder.HasChanges() { + return nil // nothing to commit + } + + newMeta, err := builder.Build() + if err != nil { + return fmt.Errorf("build metadata (attempt %d): %w", attempt, err) + } + + // Serialize + metadataBytes, err := json.Marshal(newMeta) + if err != nil { + return fmt.Errorf("marshal metadata (attempt %d): %w", attempt, err) + } + + // Determine new metadata file name. Include a timestamp suffix so + // concurrent writers stage to distinct files instead of clobbering. + currentVersion := extractMetadataVersion(metaFileName) + newVersion := currentVersion + 1 + newMetadataFileName := fmt.Sprintf("v%d-%d.metadata.json", newVersion, time.Now().UnixNano()) + + // Save new metadata file + metaDir := path.Join(s3tables.TablesPath, bucketName, tablePath, "metadata") + if err := saveFilerFile(ctx, filerClient, metaDir, newMetadataFileName, metadataBytes); err != nil { + return fmt.Errorf("save metadata file (attempt %d): %w", attempt, err) + } + + // Update the table entry's xattr with new metadata (CAS on version) + tableDir := path.Join(s3tables.TablesPath, bucketName, tablePath) + newMetadataLocation := path.Join("metadata", newMetadataFileName) + err = updateTableMetadataXattr(ctx, filerClient, tableDir, currentVersion, metadataBytes, newMetadataLocation) + if err != nil { + // Use a detached context for cleanup so staged files are removed + // even if the original context was canceled. 
+ cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 10*time.Second) + if !errors.Is(err, errMetadataVersionConflict) { + // Non-conflict error (permissions, transport, etc.): fail immediately. + _ = deleteFilerFile(cleanupCtx, filerClient, metaDir, newMetadataFileName) + cleanupCancel() + return fmt.Errorf("update table xattr (attempt %d): %w", attempt, err) + } + // Version conflict: clean up the new metadata file and retry + _ = deleteFilerFile(cleanupCtx, filerClient, metaDir, newMetadataFileName) + cleanupCancel() + if attempt < maxRetries-1 { + glog.V(1).Infof("iceberg maintenance: version conflict on %s/%s, retrying (attempt %d)", bucketName, tablePath, attempt) + continue + } + return fmt.Errorf("update table xattr (attempt %d): %w", attempt, err) + } + + return nil + } + + return fmt.Errorf("exceeded max commit retries (%d)", maxRetries) +} diff --git a/weed/plugin/worker/vacuum_handler.go b/weed/plugin/worker/vacuum_handler.go index ad5b7bdf7..3012cc450 100644 --- a/weed/plugin/worker/vacuum_handler.go +++ b/weed/plugin/worker/vacuum_handler.go @@ -180,9 +180,9 @@ func (h *VacuumHandler) Detect(ctx context.Context, request *plugin_pb.RunDetect } workerConfig := deriveVacuumConfig(request.GetWorkerConfigValues()) - if shouldSkipDetectionByInterval(request.GetLastSuccessfulRun(), workerConfig.MinIntervalSeconds) { + if ShouldSkipDetectionByInterval(request.GetLastSuccessfulRun(), workerConfig.MinIntervalSeconds) { minInterval := time.Duration(workerConfig.MinIntervalSeconds) * time.Second - _ = sender.SendActivity(buildDetectorActivity( + _ = sender.SendActivity(BuildDetectorActivity( "skipped_by_interval", fmt.Sprintf("VACUUM: Detection skipped due to min interval (%s)", minInterval), map[string]*plugin_pb.ConfigValue{ @@ -311,7 +311,7 @@ func emitVacuumDetectionDecisionTrace( ) } - if err := sender.SendActivity(buildDetectorActivity(summaryStage, summaryMessage, map[string]*plugin_pb.ConfigValue{ + if err := 
sender.SendActivity(BuildDetectorActivity(summaryStage, summaryMessage, map[string]*plugin_pb.ConfigValue{ "total_volumes": { Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(totalVolumes)}, }, @@ -351,7 +351,7 @@ func emitVacuumDetectionDecisionTrace( metric.Age.Truncate(time.Minute), minVolumeAge.Truncate(time.Minute), ) - if err := sender.SendActivity(buildDetectorActivity("decision_volume", message, map[string]*plugin_pb.ConfigValue{ + if err := sender.SendActivity(BuildDetectorActivity("decision_volume", message, map[string]*plugin_pb.ConfigValue{ "volume_id": { Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(metric.VolumeID)}, }, @@ -429,7 +429,7 @@ func (h *VacuumHandler) Execute(ctx context.Context, request *plugin_pb.ExecuteJ Stage: stage, Message: message, Activities: []*plugin_pb.ActivityEvent{ - buildExecutorActivity(stage, message), + BuildExecutorActivity(stage, message), }, }) }) @@ -442,7 +442,7 @@ func (h *VacuumHandler) Execute(ctx context.Context, request *plugin_pb.ExecuteJ Stage: "assigned", Message: "vacuum job accepted", Activities: []*plugin_pb.ActivityEvent{ - buildExecutorActivity("assigned", "vacuum job accepted"), + BuildExecutorActivity("assigned", "vacuum job accepted"), }, }); err != nil { return err @@ -457,7 +457,7 @@ func (h *VacuumHandler) Execute(ctx context.Context, request *plugin_pb.ExecuteJ Stage: "failed", Message: err.Error(), Activities: []*plugin_pb.ActivityEvent{ - buildExecutorActivity("failed", err.Error()), + BuildExecutorActivity("failed", err.Error()), }, }) return err @@ -480,7 +480,7 @@ func (h *VacuumHandler) Execute(ctx context.Context, request *plugin_pb.ExecuteJ }, }, Activities: []*plugin_pb.ActivityEvent{ - buildExecutorActivity("completed", resultSummary), + BuildExecutorActivity("completed", resultSummary), }, }) } @@ -709,7 +709,9 @@ func mapTaskPriority(priority workertypes.TaskPriority) plugin_pb.JobPriority { } } -func shouldSkipDetectionByInterval(lastSuccessfulRun 
*timestamppb.Timestamp, minIntervalSeconds int) bool { +// ShouldSkipDetectionByInterval returns true when less than minIntervalSeconds +// have elapsed since lastSuccessfulRun. Exported so sub-packages can reuse it. +func ShouldSkipDetectionByInterval(lastSuccessfulRun *timestamppb.Timestamp, minIntervalSeconds int) bool { if lastSuccessfulRun == nil || minIntervalSeconds <= 0 { return false } @@ -720,7 +722,8 @@ func shouldSkipDetectionByInterval(lastSuccessfulRun *timestamppb.Timestamp, min return time.Since(lastRun) < time.Duration(minIntervalSeconds)*time.Second } -func buildExecutorActivity(stage string, message string) *plugin_pb.ActivityEvent { +// BuildExecutorActivity creates an executor activity event. Exported for sub-packages. +func BuildExecutorActivity(stage string, message string) *plugin_pb.ActivityEvent { return &plugin_pb.ActivityEvent{ Source: plugin_pb.ActivitySource_ACTIVITY_SOURCE_EXECUTOR, Stage: stage, @@ -729,7 +732,8 @@ func buildExecutorActivity(stage string, message string) *plugin_pb.ActivityEven } } -func buildDetectorActivity(stage string, message string, details map[string]*plugin_pb.ConfigValue) *plugin_pb.ActivityEvent { +// BuildDetectorActivity creates a detector activity event. Exported for sub-packages. 
+func BuildDetectorActivity(stage string, message string, details map[string]*plugin_pb.ConfigValue) *plugin_pb.ActivityEvent { return &plugin_pb.ActivityEvent{ Source: plugin_pb.ActivitySource_ACTIVITY_SOURCE_DETECTOR, Stage: stage, diff --git a/weed/plugin/worker/vacuum_handler_test.go b/weed/plugin/worker/vacuum_handler_test.go index 05454266d..346b592c0 100644 --- a/weed/plugin/worker/vacuum_handler_test.go +++ b/weed/plugin/worker/vacuum_handler_test.go @@ -122,20 +122,20 @@ func TestMasterAddressCandidates(t *testing.T) { } func TestShouldSkipDetectionByInterval(t *testing.T) { - if shouldSkipDetectionByInterval(nil, 10) { + if ShouldSkipDetectionByInterval(nil, 10) { t.Fatalf("expected false when timestamp is nil") } - if shouldSkipDetectionByInterval(timestamppb.Now(), 0) { + if ShouldSkipDetectionByInterval(timestamppb.Now(), 0) { t.Fatalf("expected false when min interval is zero") } recent := timestamppb.New(time.Now().Add(-5 * time.Second)) - if !shouldSkipDetectionByInterval(recent, 10) { + if !ShouldSkipDetectionByInterval(recent, 10) { t.Fatalf("expected true for recent successful run") } old := timestamppb.New(time.Now().Add(-30 * time.Second)) - if shouldSkipDetectionByInterval(old, 10) { + if ShouldSkipDetectionByInterval(old, 10) { t.Fatalf("expected false for old successful run") } } @@ -182,7 +182,7 @@ func TestVacuumHandlerDetectSkipsByMinInterval(t *testing.T) { } func TestBuildExecutorActivity(t *testing.T) { - activity := buildExecutorActivity("running", "vacuum in progress") + activity := BuildExecutorActivity("running", "vacuum in progress") if activity == nil { t.Fatalf("expected non-nil activity") } diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index d040bbef8..dba90e9d3 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -176,9 +176,9 @@ func (h *VolumeBalanceHandler) Detect( } workerConfig := 
deriveBalanceWorkerConfig(request.GetWorkerConfigValues()) - if shouldSkipDetectionByInterval(request.GetLastSuccessfulRun(), workerConfig.MinIntervalSeconds) { + if ShouldSkipDetectionByInterval(request.GetLastSuccessfulRun(), workerConfig.MinIntervalSeconds) { minInterval := time.Duration(workerConfig.MinIntervalSeconds) * time.Second - _ = sender.SendActivity(buildDetectorActivity( + _ = sender.SendActivity(BuildDetectorActivity( "skipped_by_interval", fmt.Sprintf("VOLUME BALANCE: Detection skipped due to min interval (%s)", minInterval), map[string]*plugin_pb.ConfigValue{ @@ -284,7 +284,7 @@ func emitVolumeBalanceDetectionDecisionTrace( ) } - if err := sender.SendActivity(buildDetectorActivity("decision_summary", summaryMessage, map[string]*plugin_pb.ConfigValue{ + if err := sender.SendActivity(BuildDetectorActivity("decision_summary", summaryMessage, map[string]*plugin_pb.ConfigValue{ "total_volumes": { Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(totalVolumes)}, }, @@ -331,7 +331,7 @@ func emitVolumeBalanceDetectionDecisionTrace( volumeCount, minVolumeCount, ) - if err := sender.SendActivity(buildDetectorActivity("decision_disk_type", message, map[string]*plugin_pb.ConfigValue{ + if err := sender.SendActivity(BuildDetectorActivity("decision_disk_type", message, map[string]*plugin_pb.ConfigValue{ "disk_type": { Kind: &plugin_pb.ConfigValue_StringValue{StringValue: diskType}, }, @@ -362,7 +362,7 @@ func emitVolumeBalanceDetectionDecisionTrace( len(serverVolumeCounts), taskConfig.MinServerCount, ) - if err := sender.SendActivity(buildDetectorActivity("decision_disk_type", message, map[string]*plugin_pb.ConfigValue{ + if err := sender.SendActivity(BuildDetectorActivity("decision_disk_type", message, map[string]*plugin_pb.ConfigValue{ "disk_type": { Kind: &plugin_pb.ConfigValue_StringValue{StringValue: diskType}, }, @@ -433,7 +433,7 @@ func emitVolumeBalanceDetectionDecisionTrace( ) } - if err := sender.SendActivity(buildDetectorActivity(stage, 
message, map[string]*plugin_pb.ConfigValue{ + if err := sender.SendActivity(BuildDetectorActivity(stage, message, map[string]*plugin_pb.ConfigValue{ "disk_type": { Kind: &plugin_pb.ConfigValue_StringValue{StringValue: diskType}, }, @@ -534,7 +534,7 @@ func (h *VolumeBalanceHandler) Execute( Stage: stage, Message: message, Activities: []*plugin_pb.ActivityEvent{ - buildExecutorActivity(stage, message), + BuildExecutorActivity(stage, message), }, }) }) @@ -547,7 +547,7 @@ func (h *VolumeBalanceHandler) Execute( Stage: "assigned", Message: "volume balance job accepted", Activities: []*plugin_pb.ActivityEvent{ - buildExecutorActivity("assigned", "volume balance job accepted"), + BuildExecutorActivity("assigned", "volume balance job accepted"), }, }); err != nil { return err @@ -562,7 +562,7 @@ func (h *VolumeBalanceHandler) Execute( Stage: "failed", Message: err.Error(), Activities: []*plugin_pb.ActivityEvent{ - buildExecutorActivity("failed", err.Error()), + BuildExecutorActivity("failed", err.Error()), }, }) return err @@ -591,7 +591,7 @@ func (h *VolumeBalanceHandler) Execute( }, }, Activities: []*plugin_pb.ActivityEvent{ - buildExecutorActivity("completed", resultSummary), + BuildExecutorActivity("completed", resultSummary), }, }) }