You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

431 lines
13 KiB

package iceberg
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"math"
	"path"
	"strings"
	"time"

	"github.com/apache/iceberg-go"
	"github.com/apache/iceberg-go/table"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
	"github.com/seaweedfs/seaweedfs/weed/util/wildcard"
)
// tableInfo captures metadata about a table for detection/execution.
type tableInfo struct {
	// BucketName is the table bucket the table lives in.
	BucketName string
	// Namespace is the namespace directory within the bucket.
	Namespace string
	// TableName is the table directory within the namespace.
	TableName string
	TablePath string // namespace/tableName
	// MetadataFileName is the base name of the current metadata file,
	// derived from the internal metadata location; may be empty.
	MetadataFileName string
	// Metadata is the parsed Iceberg table metadata.
	Metadata table.Metadata
}
// scanTablesForMaintenance enumerates table buckets and their tables,
// evaluating which ones need maintenance based on metadata thresholds.
// When limit > 0 the scan stops after collecting limit+1 results so the
// caller can determine whether more tables remain (HasMore).
func (h *Handler) scanTablesForMaintenance(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	config Config,
	bucketFilter, namespaceFilter, tableFilter string,
	limit int,
) ([]tableInfo, error) {
	var tables []tableInfo
	ops, err := parseOperations(config.Operations)
	if err != nil {
		return nil, fmt.Errorf("parse operations: %w", err)
	}
	// Compile wildcard matchers once (nil = match all)
	bucketMatchers := wildcard.CompileWildcardMatchers(bucketFilter)
	nsMatchers := wildcard.CompileWildcardMatchers(namespaceFilter)
	tableMatchers := wildcard.CompileWildcardMatchers(tableFilter)
	bucketsPath := s3tables.TablesPath
	bucketEntries, err := listFilerEntries(ctx, filerClient, bucketsPath, "")
	if err != nil {
		return nil, fmt.Errorf("list buckets: %w", err)
	}
	for _, bucketEntry := range bucketEntries {
		// Non-blocking cancellation check between buckets; returns the
		// partial result collected so far along with ctx.Err().
		select {
		case <-ctx.Done():
			return tables, ctx.Err()
		default:
		}
		if !bucketEntry.IsDirectory || !s3tables.IsTableBucketEntry(bucketEntry) {
			continue
		}
		bucketName := bucketEntry.Name
		if !wildcard.MatchesAnyWildcard(bucketMatchers, bucketName) {
			continue
		}
		// List namespaces within the bucket
		bucketPath := path.Join(bucketsPath, bucketName)
		nsEntries, err := listFilerEntries(ctx, filerClient, bucketPath, "")
		if err != nil {
			// A failing bucket is skipped, not fatal — continue scanning others.
			glog.Warningf("iceberg maintenance: failed to list namespaces in bucket %s: %v", bucketName, err)
			continue
		}
		for _, nsEntry := range nsEntries {
			// Same cancellation check between namespaces.
			select {
			case <-ctx.Done():
				return tables, ctx.Err()
			default:
			}
			if !nsEntry.IsDirectory {
				continue
			}
			nsName := nsEntry.Name
			if !wildcard.MatchesAnyWildcard(nsMatchers, nsName) {
				continue
			}
			// Skip internal directories
			if strings.HasPrefix(nsName, ".") {
				continue
			}
			// List tables within the namespace
			nsPath := path.Join(bucketPath, nsName)
			tableEntries, err := listFilerEntries(ctx, filerClient, nsPath, "")
			if err != nil {
				glog.Warningf("iceberg maintenance: failed to list tables in %s/%s: %v", bucketName, nsName, err)
				continue
			}
			// NOTE(review): unlike the outer loops, this innermost loop has no
			// ctx cancellation check — confirm whether that is intentional.
			for _, tableEntry := range tableEntries {
				if !tableEntry.IsDirectory {
					continue
				}
				tblName := tableEntry.Name
				if !wildcard.MatchesAnyWildcard(tableMatchers, tblName) {
					continue
				}
				// Check if this entry has table metadata
				metadataBytes, ok := tableEntry.Extended[s3tables.ExtendedKeyMetadata]
				if !ok || len(metadataBytes) == 0 {
					continue
				}
				// Parse the internal metadata to get FullMetadata
				var internalMeta struct {
					MetadataLocation string `json:"metadataLocation,omitempty"`
					Metadata         *struct {
						FullMetadata json.RawMessage `json:"fullMetadata,omitempty"`
					} `json:"metadata,omitempty"`
				}
				if err := json.Unmarshal(metadataBytes, &internalMeta); err != nil {
					glog.V(2).Infof("iceberg maintenance: skipping %s/%s/%s: cannot parse metadata: %v", bucketName, nsName, tblName, err)
					continue
				}
				if internalMeta.Metadata == nil || len(internalMeta.Metadata.FullMetadata) == 0 {
					continue
				}
				icebergMeta, err := table.ParseMetadataBytes(internalMeta.Metadata.FullMetadata)
				if err != nil {
					glog.V(2).Infof("iceberg maintenance: skipping %s/%s/%s: cannot parse iceberg metadata: %v", bucketName, nsName, tblName, err)
					continue
				}
				tablePath := path.Join(nsName, tblName)
				metadataFileName := metadataFileNameFromLocation(internalMeta.MetadataLocation, bucketName, tablePath)
				needsWork, err := h.tableNeedsMaintenance(ctx, filerClient, bucketName, tablePath, icebergMeta, metadataFileName, config, ops)
				if err != nil {
					glog.V(2).Infof("iceberg maintenance: skipping %s/%s/%s: cannot evaluate maintenance need: %v", bucketName, nsName, tblName, err)
					continue
				}
				if needsWork {
					tables = append(tables, tableInfo{
						BucketName:       bucketName,
						Namespace:        nsName,
						TableName:        tblName,
						TablePath:        tablePath,
						MetadataFileName: metadataFileName,
						Metadata:         icebergMeta,
					})
					// Deliberately collects limit+1 entries before stopping so
					// the caller can detect that more tables remain (HasMore).
					if limit > 0 && len(tables) > limit {
						return tables, nil
					}
				}
			}
		}
	}
	return tables, nil
}
// normalizeDetectionConfig returns a copy of config in which every
// detection-related threshold that is unset or below its allowed minimum
// has been replaced with its default. The caller's config is not mutated.
func normalizeDetectionConfig(config Config) Config {
	c := config
	if c.OrphanOlderThanHours <= 0 {
		c.OrphanOlderThanHours = defaultOrphanOlderThanHours
	}
	if c.MinManifestsToRewrite < minManifestsToRewrite {
		c.MinManifestsToRewrite = minManifestsToRewrite
	}
	if c.MinInputFiles < 2 {
		c.MinInputFiles = defaultMinInputFiles
	}
	if c.TargetFileSizeBytes <= 0 {
		c.TargetFileSizeBytes = defaultTargetFileSizeMB * 1024 * 1024
	}
	return c
}
// tableNeedsMaintenance reports whether any of the requested operations
// would have work to do on the given table. Returns (true, nil) on the
// first operation found to need work; per-operation evaluation errors are
// accumulated and only surfaced when no operation qualified.
func (h *Handler) tableNeedsMaintenance(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	bucketName, tablePath string,
	meta table.Metadata,
	metadataFileName string,
	config Config,
	ops []string,
) (bool, error) {
	config = normalizeDetectionConfig(config)
	// Evaluate the metadata-only expiration check first so large tables do not
	// pay for manifest reads when snapshot expiry already makes them eligible.
	for _, op := range ops {
		if op == "expire_snapshots" && needsMaintenance(meta, config) {
			return true, nil
		}
	}
	loadManifests := func() ([]iceberg.ManifestFile, error) {
		return loadCurrentManifests(ctx, filerClient, bucketName, tablePath, meta)
	}
	// Lazy, load-at-most-once cache of the current snapshot's manifests,
	// shared by the compact and rewrite_manifests checks below. Both the
	// result and any error are cached.
	var currentManifests []iceberg.ManifestFile
	var manifestsErr error
	manifestsLoaded := false
	getCurrentManifests := func() ([]iceberg.ManifestFile, error) {
		if manifestsLoaded {
			return currentManifests, manifestsErr
		}
		currentManifests, manifestsErr = loadManifests()
		manifestsLoaded = true
		return currentManifests, manifestsErr
	}
	// Errors here do not abort the loop: a later operation may still
	// establish that maintenance is needed.
	var opEvalErrors []string
	for _, op := range ops {
		switch op {
		case "expire_snapshots":
			// Handled by the metadata-only check above.
			continue
		case "compact":
			manifests, err := getCurrentManifests()
			if err != nil {
				opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
				continue
			}
			eligible, err := hasEligibleCompaction(ctx, filerClient, bucketName, tablePath, manifests, config)
			if err != nil {
				opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
				continue
			}
			if eligible {
				return true, nil
			}
		case "rewrite_manifests":
			manifests, err := getCurrentManifests()
			if err != nil {
				opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
				continue
			}
			if countDataManifests(manifests) >= config.MinManifestsToRewrite {
				return true, nil
			}
		case "remove_orphans":
			// Fall back to reading the current metadata file name from the
			// filer when the caller could not derive it from the location.
			if metadataFileName == "" {
				_, currentMetadataFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath)
				if err != nil {
					opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
					continue
				}
				metadataFileName = currentMetadataFileName
			}
			orphanCandidates, err := collectOrphanCandidates(ctx, filerClient, bucketName, tablePath, meta, metadataFileName, config.OrphanOlderThanHours)
			if err != nil {
				opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
				continue
			}
			if len(orphanCandidates) > 0 {
				return true, nil
			}
		}
	}
	if len(opEvalErrors) > 0 {
		return false, fmt.Errorf("evaluate maintenance operations: %s", strings.Join(opEvalErrors, "; "))
	}
	return false, nil
}
// metadataFileNameFromLocation derives the bare metadata file name from a
// (possibly fully qualified) Iceberg metadata location. An empty location
// yields an empty name.
func metadataFileNameFromLocation(location, bucketName, tablePath string) string {
	if location == "" {
		return ""
	}
	normalized := normalizeIcebergPath(location, bucketName, tablePath)
	return path.Base(normalized)
}
// countDataManifests returns the number of manifest files whose content
// type is data (as opposed to deletes).
func countDataManifests(manifests []iceberg.ManifestFile) int64 {
	var n int64
	for _, manifest := range manifests {
		if manifest.ManifestContent() != iceberg.ManifestContentData {
			continue
		}
		n++
	}
	return n
}
// loadCurrentManifests fetches and parses the manifest list referenced by
// the table's current snapshot. It returns (nil, nil) when the table has no
// current snapshot or the snapshot carries no manifest list.
func loadCurrentManifests(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	bucketName, tablePath string,
	meta table.Metadata,
) ([]iceberg.ManifestFile, error) {
	snap := meta.CurrentSnapshot()
	if snap == nil || snap.ManifestList == "" {
		return nil, nil
	}
	listBytes, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, snap.ManifestList)
	if err != nil {
		return nil, fmt.Errorf("read manifest list: %w", err)
	}
	parsed, err := iceberg.ReadManifestList(bytes.NewReader(listBytes))
	if err != nil {
		return nil, fmt.Errorf("parse manifest list: %w", err)
	}
	return parsed, nil
}
// hasEligibleCompaction reports whether the table's current data files can
// be bin-packed into at least one compaction group, given the configured
// target file size and minimum input-file count.
func hasEligibleCompaction(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	bucketName, tablePath string,
	manifests []iceberg.ManifestFile,
	config Config,
) (bool, error) {
	if len(manifests) == 0 {
		return false, nil
	}
	minInputFiles, err := compactionMinInputFiles(config.MinInputFiles)
	if err != nil {
		return false, err
	}
	// Collect data manifests and the distinct partition spec IDs they use.
	var dataManifests []iceberg.ManifestFile
	specIDs := make(map[int32]struct{})
	for _, mf := range manifests {
		if mf.ManifestContent() != iceberg.ManifestContentData {
			continue
		}
		dataManifests = append(dataManifests, mf)
		specIDs[mf.PartitionSpecID()] = struct{}{}
	}
	if len(dataManifests) == 0 {
		return false, nil
	}
	// Manifests spanning multiple partition specs are not considered
	// eligible — compaction here only handles a single spec.
	if len(specIDs) > 1 {
		return false, nil
	}
	// Read every data manifest to gather all of its file entries.
	var allEntries []iceberg.ManifestEntry
	for _, mf := range dataManifests {
		manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath())
		if err != nil {
			return false, fmt.Errorf("read manifest %s: %w", mf.FilePath(), err)
		}
		entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true)
		if err != nil {
			return false, fmt.Errorf("parse manifest %s: %w", mf.FilePath(), err)
		}
		allEntries = append(allEntries, entries...)
	}
	// Eligible when bin packing yields at least one compaction bin.
	bins := buildCompactionBins(allEntries, config.TargetFileSizeBytes, minInputFiles)
	return len(bins) > 0, nil
}
// compactionMinInputFiles validates the configured minimum number of input
// files for compaction and narrows it to the platform int type.
// It returns an error when the value is non-positive, or when it would not
// fit in int (relevant on 32-bit platforms where int is narrower than int64).
func compactionMinInputFiles(minInputFiles int64) (int, error) {
	if minInputFiles <= 0 {
		return 0, fmt.Errorf("min input files must be positive, got %d", minInputFiles)
	}
	// math.MaxInt (Go 1.17+) replaces the hand-rolled ^uint(0)>>1 idiom.
	if minInputFiles > math.MaxInt {
		return 0, fmt.Errorf("min input files %d exceeds platform int size", minInputFiles)
	}
	return int(minInputFiles), nil
}
// needsMaintenance reports whether snapshot-expiration work is warranted
// based only on metadata thresholds: either the table holds more snapshots
// than MaxSnapshotsToKeep, or at least one snapshot is older than the
// configured retention window.
func needsMaintenance(meta table.Metadata, config Config) bool {
	snapshots := meta.Snapshots()
	if len(snapshots) == 0 {
		return false
	}
	// Too many snapshots retained?
	if int64(len(snapshots)) > config.MaxSnapshotsToKeep {
		return true
	}
	// Any snapshot older than the retention cutoff?
	cutoffMs := time.Now().UnixMilli() - config.SnapshotRetentionHours*3600*1000
	for _, snap := range snapshots {
		if snap.TimestampMs < cutoffMs {
			return true
		}
	}
	return false
}
// buildMaintenanceProposal creates a JobProposal for a table needing
// maintenance, embedding the table's identity and the filer address as job
// parameters and labels.
func (h *Handler) buildMaintenanceProposal(t tableInfo, filerAddress string) *plugin_pb.JobProposal {
	// strValue wraps a string into the protobuf ConfigValue oneof.
	strValue := func(s string) *plugin_pb.ConfigValue {
		return &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_StringValue{StringValue: s}}
	}
	numSnapshots := len(t.Metadata.Snapshots())
	return &plugin_pb.JobProposal{
		ProposalId: fmt.Sprintf("iceberg-%s-%s-%s-%d", t.BucketName, t.Namespace, t.TableName, time.Now().UnixMilli()),
		DedupeKey:  fmt.Sprintf("iceberg_maintenance:%s/%s/%s", t.BucketName, t.Namespace, t.TableName),
		JobType:    jobType,
		Priority:   plugin_pb.JobPriority_JOB_PRIORITY_NORMAL,
		Summary:    fmt.Sprintf("Maintain %s/%s/%s (%d snapshots)", t.BucketName, t.Namespace, t.TableName, numSnapshots),
		Parameters: map[string]*plugin_pb.ConfigValue{
			"bucket_name":   strValue(t.BucketName),
			"namespace":     strValue(t.Namespace),
			"table_name":    strValue(t.TableName),
			"table_path":    strValue(t.TablePath),
			"filer_address": strValue(filerAddress),
		},
		Labels: map[string]string{
			"bucket":    t.BucketName,
			"namespace": t.Namespace,
			"table":     t.TableName,
		},
	}
}