You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

294 lines
10 KiB

package iceberg
import (
"fmt"
"strconv"
"strings"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
)
const (
jobType = "iceberg_maintenance"
defaultSnapshotRetentionHours = 168 // 7 days
defaultMaxSnapshotsToKeep = 5
defaultOrphanOlderThanHours = 72
defaultMaxCommitRetries = 5
defaultTargetFileSizeMB = 256
defaultMinInputFiles = 5
defaultDeleteTargetFileSizeMB = 64
defaultDeleteMinInputFiles = 2
defaultDeleteMaxGroupSizeMB = 256
defaultDeleteMaxOutputFiles = 8
defaultRewriteStrategy = "binpack"
defaultMinManifestsToRewrite = 5
minManifestsToRewrite = 2
defaultOperations = "all"
// Metric keys returned by maintenance operations.
MetricFilesMerged = "files_merged"
MetricFilesWritten = "files_written"
MetricBins = "bins"
MetricSnapshotsExpired = "snapshots_expired"
MetricFilesDeleted = "files_deleted"
MetricOrphansRemoved = "orphans_removed"
MetricManifestsRewritten = "manifests_rewritten"
MetricDeleteFilesRewritten = "delete_files_rewritten"
MetricDeleteFilesWritten = "delete_files_written"
MetricDeleteBytesRewritten = "delete_bytes_rewritten"
MetricDeleteGroupsPlanned = "delete_groups_planned"
MetricDeleteGroupsSkipped = "delete_groups_skipped"
MetricEntriesTotal = "entries_total"
MetricDurationMs = "duration_ms"
)
const bytesPerMB int64 = 1024 * 1024
// Config holds parsed worker config values.
type Config struct {
SnapshotRetentionHours int64
MaxSnapshotsToKeep int64
OrphanOlderThanHours int64
MaxCommitRetries int64
TargetFileSizeBytes int64
MinInputFiles int64
DeleteTargetFileSizeBytes int64
DeleteMinInputFiles int64
DeleteMaxFileGroupSizeBytes int64
DeleteMaxOutputFiles int64
MinManifestsToRewrite int64
Operations string
ApplyDeletes bool
Where string
RewriteStrategy string
SortMaxInputBytes int64
}
// ParseConfig extracts an iceberg maintenance Config from plugin config values.
// Values are clamped to safe minimums to prevent misconfiguration.
func ParseConfig(values map[string]*plugin_pb.ConfigValue) Config {
cfg := Config{
SnapshotRetentionHours: readInt64Config(values, "snapshot_retention_hours", defaultSnapshotRetentionHours),
MaxSnapshotsToKeep: readInt64Config(values, "max_snapshots_to_keep", defaultMaxSnapshotsToKeep),
OrphanOlderThanHours: readInt64Config(values, "orphan_older_than_hours", defaultOrphanOlderThanHours),
MaxCommitRetries: readInt64Config(values, "max_commit_retries", defaultMaxCommitRetries),
TargetFileSizeBytes: readSizeMBConfig(values, "target_file_size_mb", defaultTargetFileSizeMB),
MinInputFiles: readInt64Config(values, "min_input_files", defaultMinInputFiles),
DeleteTargetFileSizeBytes: readSizeMBConfig(values, "delete_target_file_size_mb", defaultDeleteTargetFileSizeMB),
DeleteMinInputFiles: readInt64Config(values, "delete_min_input_files", defaultDeleteMinInputFiles),
DeleteMaxFileGroupSizeBytes: readSizeMBConfig(values, "delete_max_file_group_size_mb", defaultDeleteMaxGroupSizeMB),
DeleteMaxOutputFiles: readInt64Config(values, "delete_max_output_files", defaultDeleteMaxOutputFiles),
MinManifestsToRewrite: readInt64Config(values, "min_manifests_to_rewrite", defaultMinManifestsToRewrite),
Operations: readStringConfig(values, "operations", defaultOperations),
ApplyDeletes: readBoolConfig(values, "apply_deletes", true),
Where: strings.TrimSpace(readStringConfig(values, "where", "")),
RewriteStrategy: strings.TrimSpace(strings.ToLower(readStringConfig(values, "rewrite_strategy", defaultRewriteStrategy))),
SortMaxInputBytes: readSizeMBConfig(values, "sort_max_input_mb", 0),
}
// Clamp the fields that are always defaulted by worker config parsing.
if cfg.SnapshotRetentionHours <= 0 {
cfg.SnapshotRetentionHours = defaultSnapshotRetentionHours
}
if cfg.MaxSnapshotsToKeep <= 0 {
cfg.MaxSnapshotsToKeep = defaultMaxSnapshotsToKeep
}
if cfg.MaxCommitRetries <= 0 {
cfg.MaxCommitRetries = defaultMaxCommitRetries
}
cfg = applyThresholdDefaults(cfg)
return cfg
}
func applyThresholdDefaults(cfg Config) Config {
if cfg.OrphanOlderThanHours <= 0 {
cfg.OrphanOlderThanHours = defaultOrphanOlderThanHours
}
if cfg.TargetFileSizeBytes <= 0 {
cfg.TargetFileSizeBytes = defaultTargetFileSizeMB * 1024 * 1024
}
if cfg.MinInputFiles < 2 {
cfg.MinInputFiles = defaultMinInputFiles
}
if cfg.DeleteTargetFileSizeBytes <= 0 {
cfg.DeleteTargetFileSizeBytes = defaultDeleteTargetFileSizeMB * 1024 * 1024
}
if cfg.DeleteMinInputFiles < 2 {
cfg.DeleteMinInputFiles = defaultDeleteMinInputFiles
}
if cfg.DeleteMaxFileGroupSizeBytes <= 0 {
cfg.DeleteMaxFileGroupSizeBytes = defaultDeleteMaxGroupSizeMB * 1024 * 1024
}
if cfg.DeleteMaxOutputFiles <= 0 {
cfg.DeleteMaxOutputFiles = defaultDeleteMaxOutputFiles
}
if cfg.RewriteStrategy == "" {
cfg.RewriteStrategy = defaultRewriteStrategy
}
if cfg.RewriteStrategy != "binpack" && cfg.RewriteStrategy != "sort" {
cfg.RewriteStrategy = defaultRewriteStrategy
}
if cfg.SortMaxInputBytes < 0 {
cfg.SortMaxInputBytes = 0
}
if cfg.MinManifestsToRewrite < minManifestsToRewrite {
cfg.MinManifestsToRewrite = minManifestsToRewrite
}
return cfg
}
func readSizeMBConfig(values map[string]*plugin_pb.ConfigValue, field string, fallbackMB int64) int64 {
mb := readInt64Config(values, field, fallbackMB)
if mb <= 0 {
return 0
}
maxMB := int64(^uint64(0)>>1) / bytesPerMB
if mb > maxMB {
glog.V(1).Infof("readSizeMBConfig: clamping %q from %d MB to %d MB", field, mb, maxMB)
mb = maxMB
}
return mb * bytesPerMB
}
// parseOperations returns the ordered list of maintenance operations to execute.
// Order follows Iceberg best practices: compact → rewrite_position_delete_files
// → expire_snapshots → remove_orphans → rewrite_manifests.
// Returns an error if any unknown operation is specified or the result would be empty.
func parseOperations(ops string) ([]string, error) {
ops = strings.TrimSpace(strings.ToLower(ops))
if ops == "" || ops == "all" {
return []string{"compact", "rewrite_position_delete_files", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, nil
}
validOps := map[string]struct{}{
"compact": {},
"rewrite_position_delete_files": {},
"expire_snapshots": {},
"remove_orphans": {},
"rewrite_manifests": {},
}
requested := make(map[string]struct{})
for _, op := range strings.Split(ops, ",") {
op = strings.TrimSpace(op)
if op == "" {
continue
}
if _, ok := validOps[op]; !ok {
return nil, fmt.Errorf("unknown maintenance operation %q (valid: compact, rewrite_position_delete_files, expire_snapshots, remove_orphans, rewrite_manifests)", op)
}
requested[op] = struct{}{}
}
// Return in canonical order: compact → rewrite_position_delete_files →
// expire_snapshots → remove_orphans → rewrite_manifests
canonicalOrder := []string{"compact", "rewrite_position_delete_files", "expire_snapshots", "remove_orphans", "rewrite_manifests"}
var result []string
for _, op := range canonicalOrder {
if _, ok := requested[op]; ok {
result = append(result, op)
}
}
if len(result) == 0 {
return nil, fmt.Errorf("no valid maintenance operations specified")
}
return result, nil
}
func extractMetadataVersion(metadataFileName string) int {
// Parse "v3.metadata.json" or "v3-{nonce}.metadata.json" → 3
name := strings.TrimPrefix(metadataFileName, "v")
name = strings.TrimSuffix(name, ".metadata.json")
// Strip any nonce suffix (e.g. "3-1709766000" → "3")
if dashIdx := strings.Index(name, "-"); dashIdx > 0 {
name = name[:dashIdx]
}
version, _ := strconv.Atoi(name)
return version
}
// readStringConfig reads a string value from plugin config, with fallback.
func readStringConfig(values map[string]*plugin_pb.ConfigValue, field string, fallback string) string {
if values == nil {
return fallback
}
value := values[field]
if value == nil {
return fallback
}
switch kind := value.Kind.(type) {
case *plugin_pb.ConfigValue_StringValue:
return kind.StringValue
case *plugin_pb.ConfigValue_Int64Value:
return strconv.FormatInt(kind.Int64Value, 10)
case *plugin_pb.ConfigValue_DoubleValue:
return strconv.FormatFloat(kind.DoubleValue, 'f', -1, 64)
case *plugin_pb.ConfigValue_BoolValue:
return strconv.FormatBool(kind.BoolValue)
default:
glog.V(1).Infof("readStringConfig: unexpected config value type %T for field %q, using fallback", value.Kind, field)
}
return fallback
}
// readBoolConfig reads a bool value from plugin config, with fallback.
func readBoolConfig(values map[string]*plugin_pb.ConfigValue, field string, fallback bool) bool {
if values == nil {
return fallback
}
value := values[field]
if value == nil {
return fallback
}
switch kind := value.Kind.(type) {
case *plugin_pb.ConfigValue_BoolValue:
return kind.BoolValue
case *plugin_pb.ConfigValue_StringValue:
s := strings.TrimSpace(strings.ToLower(kind.StringValue))
if s == "true" || s == "1" || s == "yes" {
return true
}
if s == "false" || s == "0" || s == "no" {
return false
}
glog.V(1).Infof("readBoolConfig: unrecognized string value %q for field %q, using fallback %v", kind.StringValue, field, fallback)
case *plugin_pb.ConfigValue_Int64Value:
return kind.Int64Value != 0
default:
glog.V(1).Infof("readBoolConfig: unexpected config value type %T for field %q, using fallback", value.Kind, field)
}
return fallback
}
// readInt64Config reads an int64 value from plugin config, with fallback.
func readInt64Config(values map[string]*plugin_pb.ConfigValue, field string, fallback int64) int64 {
if values == nil {
return fallback
}
value := values[field]
if value == nil {
return fallback
}
switch kind := value.Kind.(type) {
case *plugin_pb.ConfigValue_Int64Value:
return kind.Int64Value
case *plugin_pb.ConfigValue_DoubleValue:
return int64(kind.DoubleValue)
case *plugin_pb.ConfigValue_StringValue:
parsed, err := strconv.ParseInt(strings.TrimSpace(kind.StringValue), 10, 64)
if err == nil {
return parsed
}
case *plugin_pb.ConfigValue_BoolValue:
if kind.BoolValue {
return 1
}
return 0
default:
glog.V(1).Infof("readInt64Config: unexpected config value type %T for field %q, using fallback", value.Kind, field)
}
return fallback
}