package iceberg

import (
	"fmt"
	"strconv"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
)

const (
	jobType = "iceberg_maintenance"

	// Defaults used by ParseConfig when a config value is absent, and
	// re-applied when a supplied value falls below its allowed minimum.
	defaultSnapshotRetentionHours = 168 // 7 days
	defaultMaxSnapshotsToKeep     = 5
	defaultOrphanOlderThanHours   = 72
	defaultMaxCommitRetries       = 5
	defaultTargetFileSizeMB       = 256
	defaultMinInputFiles          = 5
	defaultDeleteTargetFileSizeMB = 64
	defaultDeleteMinInputFiles    = 2
	defaultDeleteMaxGroupSizeMB   = 256
	defaultDeleteMaxOutputFiles   = 8
	defaultRewriteStrategy        = "binpack"
	defaultMinManifestsToRewrite  = 5
	// minManifestsToRewrite is the floor that Config.MinManifestsToRewrite
	// is raised to; unlike the other thresholds it is not reset to its default.
	minManifestsToRewrite = 2
	defaultOperations     = "all"

	// Metric keys returned by maintenance operations.
	MetricFilesMerged          = "files_merged"
	MetricFilesWritten         = "files_written"
	MetricBins                 = "bins"
	MetricSnapshotsExpired     = "snapshots_expired"
	MetricFilesDeleted         = "files_deleted"
	MetricOrphansRemoved       = "orphans_removed"
	MetricManifestsRewritten   = "manifests_rewritten"
	MetricDeleteFilesRewritten = "delete_files_rewritten"
	MetricDeleteFilesWritten   = "delete_files_written"
	MetricDeleteBytesRewritten = "delete_bytes_rewritten"
	MetricDeleteGroupsPlanned  = "delete_groups_planned"
	MetricDeleteGroupsSkipped  = "delete_groups_skipped"
	MetricEntriesTotal         = "entries_total"
	MetricDurationMs           = "duration_ms"
)

// bytesPerMB converts the megabyte-denominated config values into bytes.
const bytesPerMB int64 = 1024 * 1024

// Config holds parsed worker config values.
//
// Fields whose name ends in Bytes are configured in megabytes (config keys
// ending in "_mb") and stored here in bytes.
type Config struct {
	// SnapshotRetentionHours is read from "snapshot_retention_hours".
	SnapshotRetentionHours int64
	// MaxSnapshotsToKeep is read from "max_snapshots_to_keep".
	MaxSnapshotsToKeep int64
	// OrphanOlderThanHours is read from "orphan_older_than_hours".
	OrphanOlderThanHours int64
	// MaxCommitRetries is read from "max_commit_retries".
	MaxCommitRetries int64
	// TargetFileSizeBytes is read from "target_file_size_mb".
	TargetFileSizeBytes int64
	// MinInputFiles is read from "min_input_files"; values below 2 are
	// replaced by the default.
	MinInputFiles int64
	// DeleteTargetFileSizeBytes is read from "delete_target_file_size_mb".
	DeleteTargetFileSizeBytes int64
	// DeleteMinInputFiles is read from "delete_min_input_files"; values
	// below 2 are replaced by the default.
	DeleteMinInputFiles int64
	// DeleteMaxFileGroupSizeBytes is read from "delete_max_file_group_size_mb".
	DeleteMaxFileGroupSizeBytes int64
	// DeleteMaxOutputFiles is read from "delete_max_output_files".
	DeleteMaxOutputFiles int64
	// MinManifestsToRewrite is read from "min_manifests_to_rewrite" and is
	// never lower than minManifestsToRewrite.
	MinManifestsToRewrite int64
	// Operations is the raw "operations" value (default "all"); it is not
	// validated by ParseConfig.
	Operations string
	// ApplyDeletes is read from "apply_deletes" and defaults to true.
	ApplyDeletes bool
	// Where is the whitespace-trimmed "where" value; empty when unset.
	Where string
	// RewriteStrategy is the lower-cased, trimmed "rewrite_strategy" value;
	// anything other than "binpack" or "sort" becomes the default.
	RewriteStrategy string
	// SortMaxInputBytes is read from "sort_max_input_mb"; it defaults to 0
	// and negative values are reset to 0.
	// NOTE(review): 0 presumably means "no limit" — confirm against the consumer.
	SortMaxInputBytes int64
}

// ParseConfig extracts an iceberg maintenance Config from plugin config values.
// Values are clamped to safe minimums to prevent misconfiguration.
func ParseConfig(values map[string]*plugin_pb.ConfigValue) Config { cfg := Config{ SnapshotRetentionHours: readInt64Config(values, "snapshot_retention_hours", defaultSnapshotRetentionHours), MaxSnapshotsToKeep: readInt64Config(values, "max_snapshots_to_keep", defaultMaxSnapshotsToKeep), OrphanOlderThanHours: readInt64Config(values, "orphan_older_than_hours", defaultOrphanOlderThanHours), MaxCommitRetries: readInt64Config(values, "max_commit_retries", defaultMaxCommitRetries), TargetFileSizeBytes: readSizeMBConfig(values, "target_file_size_mb", defaultTargetFileSizeMB), MinInputFiles: readInt64Config(values, "min_input_files", defaultMinInputFiles), DeleteTargetFileSizeBytes: readSizeMBConfig(values, "delete_target_file_size_mb", defaultDeleteTargetFileSizeMB), DeleteMinInputFiles: readInt64Config(values, "delete_min_input_files", defaultDeleteMinInputFiles), DeleteMaxFileGroupSizeBytes: readSizeMBConfig(values, "delete_max_file_group_size_mb", defaultDeleteMaxGroupSizeMB), DeleteMaxOutputFiles: readInt64Config(values, "delete_max_output_files", defaultDeleteMaxOutputFiles), MinManifestsToRewrite: readInt64Config(values, "min_manifests_to_rewrite", defaultMinManifestsToRewrite), Operations: readStringConfig(values, "operations", defaultOperations), ApplyDeletes: readBoolConfig(values, "apply_deletes", true), Where: strings.TrimSpace(readStringConfig(values, "where", "")), RewriteStrategy: strings.TrimSpace(strings.ToLower(readStringConfig(values, "rewrite_strategy", defaultRewriteStrategy))), SortMaxInputBytes: readSizeMBConfig(values, "sort_max_input_mb", 0), } // Clamp the fields that are always defaulted by worker config parsing. 
if cfg.SnapshotRetentionHours <= 0 { cfg.SnapshotRetentionHours = defaultSnapshotRetentionHours } if cfg.MaxSnapshotsToKeep <= 0 { cfg.MaxSnapshotsToKeep = defaultMaxSnapshotsToKeep } if cfg.MaxCommitRetries <= 0 { cfg.MaxCommitRetries = defaultMaxCommitRetries } cfg = applyThresholdDefaults(cfg) return cfg } func applyThresholdDefaults(cfg Config) Config { if cfg.OrphanOlderThanHours <= 0 { cfg.OrphanOlderThanHours = defaultOrphanOlderThanHours } if cfg.TargetFileSizeBytes <= 0 { cfg.TargetFileSizeBytes = defaultTargetFileSizeMB * 1024 * 1024 } if cfg.MinInputFiles < 2 { cfg.MinInputFiles = defaultMinInputFiles } if cfg.DeleteTargetFileSizeBytes <= 0 { cfg.DeleteTargetFileSizeBytes = defaultDeleteTargetFileSizeMB * 1024 * 1024 } if cfg.DeleteMinInputFiles < 2 { cfg.DeleteMinInputFiles = defaultDeleteMinInputFiles } if cfg.DeleteMaxFileGroupSizeBytes <= 0 { cfg.DeleteMaxFileGroupSizeBytes = defaultDeleteMaxGroupSizeMB * 1024 * 1024 } if cfg.DeleteMaxOutputFiles <= 0 { cfg.DeleteMaxOutputFiles = defaultDeleteMaxOutputFiles } if cfg.RewriteStrategy == "" { cfg.RewriteStrategy = defaultRewriteStrategy } if cfg.RewriteStrategy != "binpack" && cfg.RewriteStrategy != "sort" { cfg.RewriteStrategy = defaultRewriteStrategy } if cfg.SortMaxInputBytes < 0 { cfg.SortMaxInputBytes = 0 } if cfg.MinManifestsToRewrite < minManifestsToRewrite { cfg.MinManifestsToRewrite = minManifestsToRewrite } return cfg } func readSizeMBConfig(values map[string]*plugin_pb.ConfigValue, field string, fallbackMB int64) int64 { mb := readInt64Config(values, field, fallbackMB) if mb <= 0 { return 0 } maxMB := int64(^uint64(0)>>1) / bytesPerMB if mb > maxMB { glog.V(1).Infof("readSizeMBConfig: clamping %q from %d MB to %d MB", field, mb, maxMB) mb = maxMB } return mb * bytesPerMB } // parseOperations returns the ordered list of maintenance operations to execute. // Order follows Iceberg best practices: compact → rewrite_position_delete_files // → expire_snapshots → remove_orphans → rewrite_manifests. 
// Returns an error if any unknown operation is specified or the result would be empty.
func parseOperations(ops string) ([]string, error) {
	// canonical lists every supported operation in execution order.
	canonical := []string{
		"compact",
		"rewrite_position_delete_files",
		"expire_snapshots",
		"remove_orphans",
		"rewrite_manifests",
	}

	normalized := strings.TrimSpace(strings.ToLower(ops))
	switch normalized {
	case "", "all":
		return canonical, nil
	}

	// Collect the requested names; duplicates and empty entries are ignored.
	selected := make(map[string]bool, len(canonical))
	for _, token := range strings.Split(normalized, ",") {
		name := strings.TrimSpace(token)
		switch name {
		case "":
			// Tolerate stray commas such as "compact,,remove_orphans".
		case "compact", "rewrite_position_delete_files", "expire_snapshots", "remove_orphans", "rewrite_manifests":
			selected[name] = true
		default:
			return nil, fmt.Errorf("unknown maintenance operation %q (valid: compact, rewrite_position_delete_files, expire_snapshots, remove_orphans, rewrite_manifests)", name)
		}
	}

	// Emit in canonical order regardless of the order they were requested in.
	var ordered []string
	for _, name := range canonical {
		if selected[name] {
			ordered = append(ordered, name)
		}
	}
	if len(ordered) == 0 {
		return nil, fmt.Errorf("no valid maintenance operations specified")
	}
	return ordered, nil
}

// extractMetadataVersion parses the version number out of a metadata file
// name of the form "v3.metadata.json" or "v3-{nonce}.metadata.json" (→ 3).
// A name with no parsable number yields 0.
func extractMetadataVersion(metadataFileName string) int {
	stem := strings.TrimSuffix(strings.TrimPrefix(metadataFileName, "v"), ".metadata.json")

	// Drop a nonce suffix ("3-1709766000" → "3"). A dash in first position
	// is left alone so it is parsed as a sign.
	if cut := strings.IndexByte(stem, '-'); cut > 0 {
		stem = stem[:cut]
	}

	// The error is deliberately discarded: callers get whatever Atoi
	// returns alongside it (0 for a syntax error).
	number, _ := strconv.Atoi(stem)
	return number
}

// readStringConfig reads a string value from plugin config, with fallback.
func readStringConfig(values map[string]*plugin_pb.ConfigValue, field string, fallback string) string { if values == nil { return fallback } value := values[field] if value == nil { return fallback } switch kind := value.Kind.(type) { case *plugin_pb.ConfigValue_StringValue: return kind.StringValue case *plugin_pb.ConfigValue_Int64Value: return strconv.FormatInt(kind.Int64Value, 10) case *plugin_pb.ConfigValue_DoubleValue: return strconv.FormatFloat(kind.DoubleValue, 'f', -1, 64) case *plugin_pb.ConfigValue_BoolValue: return strconv.FormatBool(kind.BoolValue) default: glog.V(1).Infof("readStringConfig: unexpected config value type %T for field %q, using fallback", value.Kind, field) } return fallback } // readBoolConfig reads a bool value from plugin config, with fallback. func readBoolConfig(values map[string]*plugin_pb.ConfigValue, field string, fallback bool) bool { if values == nil { return fallback } value := values[field] if value == nil { return fallback } switch kind := value.Kind.(type) { case *plugin_pb.ConfigValue_BoolValue: return kind.BoolValue case *plugin_pb.ConfigValue_StringValue: s := strings.TrimSpace(strings.ToLower(kind.StringValue)) if s == "true" || s == "1" || s == "yes" { return true } if s == "false" || s == "0" || s == "no" { return false } glog.V(1).Infof("readBoolConfig: unrecognized string value %q for field %q, using fallback %v", kind.StringValue, field, fallback) case *plugin_pb.ConfigValue_Int64Value: return kind.Int64Value != 0 default: glog.V(1).Infof("readBoolConfig: unexpected config value type %T for field %q, using fallback", value.Kind, field) } return fallback } // readInt64Config reads an int64 value from plugin config, with fallback. 
func readInt64Config(values map[string]*plugin_pb.ConfigValue, field string, fallback int64) int64 { if values == nil { return fallback } value := values[field] if value == nil { return fallback } switch kind := value.Kind.(type) { case *plugin_pb.ConfigValue_Int64Value: return kind.Int64Value case *plugin_pb.ConfigValue_DoubleValue: return int64(kind.DoubleValue) case *plugin_pb.ConfigValue_StringValue: parsed, err := strconv.ParseInt(strings.TrimSpace(kind.StringValue), 10, 64) if err == nil { return parsed } case *plugin_pb.ConfigValue_BoolValue: if kind.BoolValue { return 1 } return 0 default: glog.V(1).Infof("readInt64Config: unexpected config value type %T for field %q, using fallback", value.Kind, field) } return fallback }