You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

572 lines
17 KiB

package iceberg
import (
"bytes"
"context"
"fmt"
"path"
"strings"
"time"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/table"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
"github.com/seaweedfs/seaweedfs/weed/util/wildcard"
)
// tableInfo captures metadata about a table for detection/execution.
// scanTablesForMaintenance builds one of these for every table whose
// metadata indicates that at least one configured maintenance operation
// has work to do.
type tableInfo struct {
	// BucketName is the table bucket's directory name under s3tables.TablesPath.
	BucketName string
	// Namespace is the namespace directory name within the bucket.
	Namespace string
	// TableName is the table directory name within the namespace.
	TableName string
	// TablePath is the bucket-relative table path.
	TablePath string // namespace/tableName
	// MetadataFileName is the metadata file name decoded from the table
	// entry's extended attributes; it may be empty.
	MetadataFileName string
	// Metadata is the parsed Iceberg table metadata.
	Metadata table.Metadata
}
// scanTablesForMaintenance enumerates table buckets and their tables,
// evaluating which ones need maintenance based on metadata thresholds.
// When limit > 0 the scan stops after collecting limit+1 results so the
// caller can determine whether more tables remain (HasMore).
//
// bucketFilter, namespaceFilter and tableFilter are wildcard patterns; a
// filter that compiles to no matchers matches everything. Failures listing a
// single bucket or namespace, parsing a table's metadata, or evaluating a
// table are logged and that item is skipped, so one bad directory does not
// abort the whole scan. If ctx is cancelled, the tables collected so far are
// returned together with ctx.Err().
func (h *Handler) scanTablesForMaintenance(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	config Config,
	bucketFilter, namespaceFilter, tableFilter string,
	limit int,
) ([]tableInfo, error) {
	var tables []tableInfo

	ops, err := parseOperations(config.Operations)
	if err != nil {
		return nil, fmt.Errorf("parse operations: %w", err)
	}

	// Compile wildcard matchers once (nil = match all)
	bucketMatchers := wildcard.CompileWildcardMatchers(bucketFilter)
	nsMatchers := wildcard.CompileWildcardMatchers(namespaceFilter)
	tableMatchers := wildcard.CompileWildcardMatchers(tableFilter)

	bucketsPath := s3tables.TablesPath
	bucketEntries, err := listFilerEntries(ctx, filerClient, bucketsPath, "")
	if err != nil {
		return nil, fmt.Errorf("list buckets: %w", err)
	}

	for _, bucketEntry := range bucketEntries {
		select {
		case <-ctx.Done():
			return tables, ctx.Err()
		default:
		}

		if !bucketEntry.IsDirectory || !s3tables.IsTableBucketEntry(bucketEntry) {
			continue
		}
		bucketName := bucketEntry.Name
		if !wildcard.MatchesAnyWildcard(bucketMatchers, bucketName) {
			continue
		}

		// List namespaces within the bucket
		bucketPath := path.Join(bucketsPath, bucketName)
		nsEntries, err := listFilerEntries(ctx, filerClient, bucketPath, "")
		if err != nil {
			glog.Warningf("iceberg maintenance: failed to list namespaces in bucket %s: %v", bucketName, err)
			continue
		}

		for _, nsEntry := range nsEntries {
			select {
			case <-ctx.Done():
				return tables, ctx.Err()
			default:
			}

			if !nsEntry.IsDirectory {
				continue
			}
			nsName := nsEntry.Name
			if !wildcard.MatchesAnyWildcard(nsMatchers, nsName) {
				continue
			}
			// Skip internal directories
			if strings.HasPrefix(nsName, ".") {
				continue
			}

			// List tables within the namespace
			nsPath := path.Join(bucketPath, nsName)
			tableEntries, err := listFilerEntries(ctx, filerClient, nsPath, "")
			if err != nil {
				glog.Warningf("iceberg maintenance: failed to list tables in %s/%s: %v", bucketName, nsName, err)
				continue
			}

			for _, tableEntry := range tableEntries {
				// Evaluating a table can read manifest lists, manifests and
				// metadata files, so honor cancellation per table as well,
				// not only between buckets and namespaces.
				select {
				case <-ctx.Done():
					return tables, ctx.Err()
				default:
				}

				if !tableEntry.IsDirectory {
					continue
				}
				tblName := tableEntry.Name
				if !wildcard.MatchesAnyWildcard(tableMatchers, tblName) {
					continue
				}

				// Check if this entry has table metadata
				metadataBytes, ok := tableEntry.Extended[s3tables.ExtendedKeyMetadata]
				if !ok || len(metadataBytes) == 0 {
					continue
				}

				icebergMeta, metadataFileName, planningIndex, err := parseTableMetadataEnvelope(metadataBytes)
				if err != nil {
					glog.V(2).Infof("iceberg maintenance: skipping %s/%s/%s: cannot parse iceberg metadata: %v", bucketName, nsName, tblName, err)
					continue
				}

				tablePath := path.Join(nsName, tblName)
				needsWork, err := h.tableNeedsMaintenance(ctx, filerClient, bucketName, tablePath, icebergMeta, metadataFileName, planningIndex, config, ops)
				if err != nil {
					glog.V(2).Infof("iceberg maintenance: skipping %s/%s/%s: cannot evaluate maintenance need: %v", bucketName, nsName, tblName, err)
					continue
				}
				if !needsWork {
					continue
				}

				tables = append(tables, tableInfo{
					BucketName:       bucketName,
					Namespace:        nsName,
					TableName:        tblName,
					TablePath:        tablePath,
					MetadataFileName: metadataFileName,
					Metadata:         icebergMeta,
				})
				// Collect one extra result beyond limit so the caller can
				// report that more tables remain.
				if limit > 0 && len(tables) > limit {
					return tables, nil
				}
			}
		}
	}

	return tables, nil
}
// normalizeDetectionConfig returns config with detection defaults filled in.
// Threshold defaults are applied by applyThresholdDefaults. Then the snapshot
// count limit and the snapshot retention window each fall back to their
// package defaults when they are not positive.
func normalizeDetectionConfig(config Config) Config {
	normalized := applyThresholdDefaults(config)

	if normalized.MaxSnapshotsToKeep <= 0 {
		normalized.MaxSnapshotsToKeep = defaultMaxSnapshotsToKeep
	}
	if normalized.SnapshotRetentionHours <= 0 {
		normalized.SnapshotRetentionHours = defaultSnapshotRetentionHours
	}

	return normalized
}
// tableNeedsMaintenance reports whether any of the requested maintenance
// operations (ops) would do work on the given table.
//
// The metadata-only snapshot-expiration check runs first. It comes before the
// Where predicate is parsed and before any manifest is read. As a result, a
// table already eligible for expire_snapshots is reported without extra I/O.
// It is also not rejected because of a Where clause that only applies to
// other operations.
//
// The remaining operations share one lazily loaded manifest list. They also
// use a lazily built planning index per operation, which is persisted on a
// best-effort basis: persistence failures are only logged. cachedPlanningIndex
// is consulted first when it is non-nil and matches the current snapshot.
//
// The function returns true as soon as one operation is eligible. If none is
// eligible, any per-operation evaluation errors are joined into the returned
// error. An unparsable Where predicate that is needed by the requested
// operations is returned directly.
func (h *Handler) tableNeedsMaintenance(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	bucketName, tablePath string,
	meta table.Metadata,
	metadataFileName string,
	cachedPlanningIndex *planningIndex,
	config Config,
	ops []string,
) (bool, error) {
	config = normalizeDetectionConfig(config)

	// Evaluate the metadata-only expiration check first so large tables do not
	// pay for manifest reads (and are not blocked by predicate errors) when
	// snapshot expiry already makes them eligible.
	for _, op := range ops {
		if op == "expire_snapshots" && needsMaintenance(meta, config) {
			return true, nil
		}
	}

	// The predicate is consumed by rewrite_position_delete_files below; the
	// planning index handles compact/rewrite_manifests, but an invalid Where
	// clause for those operations is still reported here.
	var predicate *partitionPredicate
	if strings.TrimSpace(config.Where) != "" {
		needsPredicate := false
		for _, op := range ops {
			if op == "compact" || op == "rewrite_position_delete_files" || op == "rewrite_manifests" {
				needsPredicate = true
				break
			}
		}
		if needsPredicate {
			var err error
			predicate, err = parsePartitionPredicate(config.Where, meta)
			if err != nil {
				return false, err
			}
		}
	}

	// Lazily load the current snapshot's manifest list once and share it.
	var currentManifests []iceberg.ManifestFile
	var manifestsErr error
	var manifestsLoaded bool
	getCurrentManifests := func() ([]iceberg.ManifestFile, error) {
		if manifestsLoaded {
			return currentManifests, manifestsErr
		}
		currentManifests, manifestsErr = loadCurrentManifests(ctx, filerClient, bucketName, tablePath, meta)
		manifestsLoaded = true
		return currentManifests, manifestsErr
	}

	// Build (at most once per operation) a planning index from the current
	// manifests and persist it best-effort for later scans.
	computedPlanningIndexes := make(map[string]*planningIndex)
	planningIndexLoaded := make(map[string]bool)
	planningIndexErrs := make(map[string]error)
	getPlanningIndex := func(op string) (*planningIndex, error) {
		if planningIndexLoaded[op] {
			return computedPlanningIndexes[op], planningIndexErrs[op]
		}
		planningIndexLoaded[op] = true
		manifests, err := getCurrentManifests()
		if err != nil {
			planningIndexErrs[op] = err
			return nil, err
		}
		index, err := buildPlanningIndexFromManifests(ctx, filerClient, bucketName, tablePath, meta, config, []string{op}, manifests)
		if err != nil {
			planningIndexErrs[op] = err
			return nil, err
		}
		computedPlanningIndexes[op] = index
		if index != nil {
			if err := persistPlanningIndex(ctx, filerClient, bucketName, tablePath, index); err != nil {
				glog.V(2).Infof("iceberg maintenance: unable to persist planning index for %s/%s: %v", bucketName, tablePath, err)
			}
		}
		return index, nil
	}

	// Prefer a cached index that matches the current snapshot and can answer
	// definitively (ok == true); otherwise fall back to a freshly built one.
	checkPlanningIndex := func(op string, eligibleFn func(*planningIndex, Config) (bool, bool)) (bool, error) {
		if cachedPlanningIndex != nil && cachedPlanningIndex.matchesSnapshot(meta) {
			if eligible, ok := eligibleFn(cachedPlanningIndex, config); ok {
				return eligible, nil
			}
		}
		index, err := getPlanningIndex(op)
		if err != nil {
			return false, err
		}
		if index == nil {
			return false, nil
		}
		eligible, _ := eligibleFn(index, config)
		return eligible, nil
	}

	var opEvalErrors []string
	// compact and rewrite_manifests typically fail for the same underlying
	// reason (manifest loading), so only report the first planning-index error.
	planningIndexErrorReported := false
	for _, op := range ops {
		switch op {
		case "expire_snapshots":
			// Handled by the metadata-only check above.
			continue
		case "compact":
			eligible, err := checkPlanningIndex(op, (*planningIndex).compactionEligible)
			if err != nil {
				if !planningIndexErrorReported {
					opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
					planningIndexErrorReported = true
				}
				continue
			}
			if eligible {
				return true, nil
			}
		case "rewrite_position_delete_files":
			manifests, err := getCurrentManifests()
			if err != nil {
				opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
				continue
			}
			eligible, err := hasEligibleDeleteRewrite(ctx, filerClient, bucketName, tablePath, manifests, config, meta, predicate)
			if err != nil {
				opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
				continue
			}
			if eligible {
				return true, nil
			}
		case "rewrite_manifests":
			eligible, err := checkPlanningIndex(op, (*planningIndex).rewriteManifestsEligible)
			if err != nil {
				if !planningIndexErrorReported {
					opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
					planningIndexErrorReported = true
				}
				continue
			}
			if eligible {
				return true, nil
			}
		case "remove_orphans":
			// Orphan detection needs the current metadata file name so that it
			// is not itself treated as an orphan; resolve it if unknown.
			if metadataFileName == "" {
				_, currentMetadataFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath)
				if err != nil {
					opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
					continue
				}
				metadataFileName = currentMetadataFileName
			}
			orphanCandidates, err := collectOrphanCandidates(ctx, filerClient, bucketName, tablePath, meta, metadataFileName, config.OrphanOlderThanHours)
			if err != nil {
				opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
				continue
			}
			if len(orphanCandidates) > 0 {
				return true, nil
			}
		}
	}

	if len(opEvalErrors) > 0 {
		return false, fmt.Errorf("evaluate maintenance operations: %s", strings.Join(opEvalErrors, "; "))
	}
	return false, nil
}
// metadataFileNameFromLocation returns the last path element of an Iceberg
// metadata location. The location is first normalized with
// normalizeIcebergPath for the given bucket and table. An empty location
// yields an empty name.
func metadataFileNameFromLocation(location, bucketName, tablePath string) string {
	if location != "" {
		normalized := normalizeIcebergPath(location, bucketName, tablePath)
		return path.Base(normalized)
	}
	return ""
}
// countDataManifests returns how many of the given manifests have content
// type iceberg.ManifestContentData. Other manifest content types are ignored.
func countDataManifests(manifests []iceberg.ManifestFile) int64 {
	var n int64
	for i := range manifests {
		if manifests[i].ManifestContent() != iceberg.ManifestContentData {
			continue
		}
		n++
	}
	return n
}
// loadCurrentManifests reads and decodes the manifest list of the table's
// current snapshot. It returns (nil, nil) when the table has no current
// snapshot or that snapshot has no manifest list. Read and decode failures
// are wrapped with "read manifest list" / "parse manifest list" context.
func loadCurrentManifests(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	bucketName, tablePath string,
	meta table.Metadata,
) ([]iceberg.ManifestFile, error) {
	snap := meta.CurrentSnapshot()
	if snap == nil {
		return nil, nil
	}
	listLocation := snap.ManifestList
	if listLocation == "" {
		return nil, nil
	}

	raw, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, listLocation)
	if err != nil {
		return nil, fmt.Errorf("read manifest list: %w", err)
	}

	manifests, err := iceberg.ReadManifestList(bytes.NewReader(raw))
	if err != nil {
		return nil, fmt.Errorf("parse manifest list: %w", err)
	}
	return manifests, nil
}
// hasEligibleCompaction reports whether the data files referenced by the
// given manifests would form at least one compaction bin under config.
//
// It returns false without reading anything when there are no manifests or
// no data manifests. It also returns false when the data manifests span more
// than one partition spec. Otherwise every data manifest is read. When
// predicate is non-nil, only entries whose spec is known to meta and whose
// partition matches the predicate are considered. The candidates are then
// binned per the resolved rewrite plan. Errors come from the min-input-files
// validation, manifest reads/parses, predicate evaluation, or rewrite-plan
// resolution.
func hasEligibleCompaction(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	bucketName, tablePath string,
	manifests []iceberg.ManifestFile,
	config Config,
	meta table.Metadata,
	predicate *partitionPredicate,
) (bool, error) {
	if len(manifests) == 0 {
		return false, nil
	}
	minInputFiles, err := compactionMinInputFiles(config.MinInputFiles)
	if err != nil {
		return false, err
	}

	// Keep only data manifests and note whether they disagree on spec ID.
	var (
		dataManifests []iceberg.ManifestFile
		firstSpecID   int32
		mixedSpecs    bool
	)
	for _, mf := range manifests {
		if mf.ManifestContent() != iceberg.ManifestContentData {
			continue
		}
		if len(dataManifests) == 0 {
			firstSpecID = mf.PartitionSpecID()
		} else if mf.PartitionSpecID() != firstSpecID {
			mixedSpecs = true
		}
		dataManifests = append(dataManifests, mf)
	}
	switch {
	case len(dataManifests) == 0:
		return false, nil
	case mixedSpecs:
		return false, nil
	}

	var entries []iceberg.ManifestEntry
	for _, mf := range dataManifests {
		manifestPath := mf.FilePath()
		raw, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, manifestPath)
		if err != nil {
			return false, fmt.Errorf("read manifest %s: %w", manifestPath, err)
		}
		parsed, err := iceberg.ReadManifest(mf, bytes.NewReader(raw), true)
		if err != nil {
			return false, fmt.Errorf("parse manifest %s: %w", manifestPath, err)
		}
		entries = append(entries, parsed...)
	}

	if predicate != nil {
		specsByID := specByID(meta)
		matched := make([]iceberg.ManifestEntry, 0, len(entries))
		for _, entry := range entries {
			df := entry.DataFile()
			spec, known := specsByID[int(df.SpecID())]
			if !known {
				continue
			}
			isMatch, err := predicate.Matches(spec, df.Partition())
			if err != nil {
				return false, err
			}
			if isMatch {
				matched = append(matched, entry)
			}
		}
		entries = matched
	}

	rewritePlan, err := resolveCompactionRewritePlan(config, meta)
	if err != nil {
		return false, fmt.Errorf("resolve rewrite strategy: %w", err)
	}
	bins := filterCompactionBinsByPlan(
		buildCompactionBins(entries, compactionTargetSizeForPlan(config, rewritePlan), minInputFiles),
		config,
		rewritePlan,
	)
	return len(bins) > 0, nil
}
// countDataManifestsForRewrite counts the data manifests that a
// rewrite_manifests run restricted by predicate would consider.
//
// With a nil predicate every data manifest is counted and nothing is read.
// With a predicate, a data manifest counts only when all of the following
// hold:
//   - its partition spec is known to meta;
//   - it contains at least one entry;
//   - every entry's partition matches the predicate.
//
// The spec is resolved before the manifest is fetched. A manifest with an
// unknown spec can never be counted, so it is skipped without I/O, and a read
// failure on such a manifest no longer fails the whole count.
func countDataManifestsForRewrite(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	bucketName, tablePath string,
	manifests []iceberg.ManifestFile,
	meta table.Metadata,
	predicate *partitionPredicate,
) (int64, error) {
	if predicate == nil {
		return countDataManifests(manifests), nil
	}

	specsByID := specByID(meta)
	var count int64
	for _, mf := range manifests {
		if mf.ManifestContent() != iceberg.ManifestContentData {
			continue
		}
		spec, ok := specsByID[int(mf.PartitionSpecID())]
		if !ok {
			continue
		}

		manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath())
		if err != nil {
			return 0, fmt.Errorf("read manifest %s: %w", mf.FilePath(), err)
		}
		entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true)
		if err != nil {
			return 0, fmt.Errorf("parse manifest %s: %w", mf.FilePath(), err)
		}
		if len(entries) == 0 {
			continue
		}

		allMatch := true
		for _, entry := range entries {
			match, err := predicate.Matches(spec, entry.DataFile().Partition())
			if err != nil {
				return 0, err
			}
			if !match {
				allMatch = false
				break
			}
		}
		if allMatch {
			count++
		}
	}
	return count, nil
}
// compactionMinInputFiles validates the configured minimum number of input
// files for a compaction bin and converts it to an int. The value must be
// strictly positive and must fit in the platform's int type; otherwise an
// error describing the violation is returned.
func compactionMinInputFiles(minInputFiles int64) (int, error) {
	// Largest value representable by the platform's int (32- or 64-bit).
	const maxPlatformInt = int64(^uint(0) >> 1)

	switch {
	case minInputFiles <= 0:
		return 0, fmt.Errorf("min input files must be positive, got %d", minInputFiles)
	case minInputFiles > maxPlatformInt:
		return 0, fmt.Errorf("min input files %d exceeds platform int size", minInputFiles)
	default:
		return int(minInputFiles), nil
	}
}
// needsMaintenance checks whether snapshot expiration work is needed based on
// metadata-only thresholds. A table with no snapshots never needs it.
// Otherwise it does when more snapshots exist than config.MaxSnapshotsToKeep,
// or when at least one snapshot is older than config.SnapshotRetentionHours
// (measured against the current wall-clock time in milliseconds).
func needsMaintenance(meta table.Metadata, config Config) bool {
	snapshots := meta.Snapshots()
	switch {
	case len(snapshots) == 0:
		return false
	case int64(len(snapshots)) > config.MaxSnapshotsToKeep:
		return true
	}

	nowMs := time.Now().UnixMilli()
	retentionMs := config.SnapshotRetentionHours * 3600 * 1000
	tooOld := func(timestampMs int64) bool {
		return nowMs-timestampMs > retentionMs
	}
	for i := range snapshots {
		if tooOld(snapshots[i].TimestampMs) {
			return true
		}
	}
	return false
}
// buildMaintenanceProposal creates a JobProposal for a table needing maintenance.
// The proposal is deduplicated per bucket/namespace/table. It carries the
// table's location and the filer address as string parameters, and it is
// labelled with bucket, namespace and table. When resourceGroup is non-empty,
// it is added both as a parameter and as a label. The proposal ID embeds the
// current time in milliseconds.
func (h *Handler) buildMaintenanceProposal(t tableInfo, filerAddress, resourceGroup string) *plugin_pb.JobProposal {
	stringValue := func(v string) *plugin_pb.ConfigValue {
		return &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_StringValue{StringValue: v}}
	}

	params := map[string]*plugin_pb.ConfigValue{
		"bucket_name":   stringValue(t.BucketName),
		"namespace":     stringValue(t.Namespace),
		"table_name":    stringValue(t.TableName),
		"table_path":    stringValue(t.TablePath),
		"filer_address": stringValue(filerAddress),
	}
	labels := map[string]string{
		"bucket":    t.BucketName,
		"namespace": t.Namespace,
		"table":     t.TableName,
	}
	if resourceGroup != "" {
		params["resource_group"] = stringValue(resourceGroup)
		labels["resource_group"] = resourceGroup
	}

	return &plugin_pb.JobProposal{
		ProposalId: fmt.Sprintf("iceberg-%s-%s-%s-%d", t.BucketName, t.Namespace, t.TableName, time.Now().UnixMilli()),
		DedupeKey:  fmt.Sprintf("iceberg_maintenance:%s/%s/%s", t.BucketName, t.Namespace, t.TableName),
		JobType:    jobType,
		Priority:   plugin_pb.JobPriority_JOB_PRIORITY_NORMAL,
		Summary: fmt.Sprintf("Maintain %s/%s/%s (%d snapshots)",
			t.BucketName, t.Namespace, t.TableName, len(t.Metadata.Snapshots())),
		Parameters: params,
		Labels:     labels,
	}
}