You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

271 lines
8.0 KiB

package iceberg
import (
"context"
"encoding/json"
"fmt"
"path"
"time"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/table"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type planningIndex struct {
SnapshotID int64 `json:"snapshotId"`
ManifestList string `json:"manifestList,omitempty"`
UpdatedAtMs int64 `json:"updatedAtMs"`
DataManifestCount int64 `json:"dataManifestCount,omitempty"`
Compaction *planningIndexCompaction `json:"compaction,omitempty"`
RewriteManifests *planningIndexRewriteManifests `json:"rewriteManifests,omitempty"`
}
type planningIndexCompaction struct {
ConfigHash string `json:"configHash"`
Eligible bool `json:"eligible"`
}
type planningIndexRewriteManifests struct {
Threshold int64 `json:"threshold"`
Eligible bool `json:"eligible"`
}
type tableMetadataEnvelope struct {
MetadataVersion int `json:"metadataVersion"`
MetadataLocation string `json:"metadataLocation,omitempty"`
Metadata *struct {
FullMetadata json.RawMessage `json:"fullMetadata,omitempty"`
} `json:"metadata,omitempty"`
PlanningIndex json.RawMessage `json:"planningIndex,omitempty"`
}
func parseTableMetadataEnvelope(metadataBytes []byte) (table.Metadata, string, *planningIndex, error) {
var envelope tableMetadataEnvelope
if err := json.Unmarshal(metadataBytes, &envelope); err != nil {
return nil, "", nil, fmt.Errorf("parse metadata xattr: %w", err)
}
if envelope.Metadata == nil || len(envelope.Metadata.FullMetadata) == 0 {
return nil, "", nil, fmt.Errorf("no fullMetadata in table xattr")
}
meta, err := table.ParseMetadataBytes(envelope.Metadata.FullMetadata)
if err != nil {
return nil, "", nil, fmt.Errorf("parse iceberg metadata: %w", err)
}
var index *planningIndex
if len(envelope.PlanningIndex) > 0 {
if err := json.Unmarshal(envelope.PlanningIndex, &index); err != nil {
glog.V(2).Infof("iceberg maintenance: ignoring invalid planning index cache: %v", err)
index = nil
}
}
metadataFileName := metadataFileNameFromLocation(envelope.MetadataLocation, "", "")
if metadataFileName == "" {
metadataFileName = fmt.Sprintf("v%d.metadata.json", envelope.MetadataVersion)
}
return meta, metadataFileName, index, nil
}
func (idx *planningIndex) matchesSnapshot(meta table.Metadata) bool {
if idx == nil {
return false
}
currentSnap := meta.CurrentSnapshot()
if currentSnap == nil || currentSnap.ManifestList == "" {
return false
}
return idx.SnapshotID == currentSnap.SnapshotID && idx.ManifestList == currentSnap.ManifestList
}
func (idx *planningIndex) compactionEligible(config Config) (bool, bool) {
if idx == nil || idx.Compaction == nil {
return false, false
}
if idx.Compaction.ConfigHash != compactionPlanningConfigHash(config) {
return false, false
}
return idx.Compaction.Eligible, true
}
func (idx *planningIndex) rewriteManifestsEligible(config Config) (bool, bool) {
if idx == nil || idx.RewriteManifests == nil {
return false, false
}
if idx.RewriteManifests.Threshold != config.MinManifestsToRewrite {
return false, false
}
return idx.RewriteManifests.Eligible, true
}
func compactionPlanningConfigHash(config Config) string {
return fmt.Sprintf("target=%d|min=%d|strategy=%s|sortcap=%d",
config.TargetFileSizeBytes, config.MinInputFiles,
config.RewriteStrategy, config.SortMaxInputBytes)
}
func operationRequested(ops []string, wanted string) bool {
for _, op := range ops {
if op == wanted {
return true
}
}
return false
}
func mergePlanningIndexSections(index, existing *planningIndex) *planningIndex {
if index == nil || existing == nil {
return index
}
if index.SnapshotID != existing.SnapshotID || index.ManifestList != existing.ManifestList {
return index
}
if index.Compaction == nil && existing.Compaction != nil {
compactionCopy := *existing.Compaction
index.Compaction = &compactionCopy
}
if index.RewriteManifests == nil && existing.RewriteManifests != nil {
rewriteCopy := *existing.RewriteManifests
index.RewriteManifests = &rewriteCopy
}
return index
}
func buildPlanningIndex(
ctx context.Context,
filerClient filer_pb.SeaweedFilerClient,
bucketName, tablePath string,
meta table.Metadata,
config Config,
ops []string,
) (*planningIndex, error) {
currentSnap := meta.CurrentSnapshot()
if currentSnap == nil || currentSnap.ManifestList == "" {
return nil, nil
}
manifests, err := loadCurrentManifests(ctx, filerClient, bucketName, tablePath, meta)
if err != nil {
return nil, err
}
return buildPlanningIndexFromManifests(ctx, filerClient, bucketName, tablePath, meta, config, ops, manifests)
}
func buildPlanningIndexFromManifests(
ctx context.Context,
filerClient filer_pb.SeaweedFilerClient,
bucketName, tablePath string,
meta table.Metadata,
config Config,
ops []string,
manifests []iceberg.ManifestFile,
) (*planningIndex, error) {
currentSnap := meta.CurrentSnapshot()
if currentSnap == nil || currentSnap.ManifestList == "" {
return nil, nil
}
index := &planningIndex{
SnapshotID: currentSnap.SnapshotID,
ManifestList: currentSnap.ManifestList,
UpdatedAtMs: time.Now().UnixMilli(),
DataManifestCount: countDataManifests(manifests),
}
if operationRequested(ops, "compact") {
eligible, err := hasEligibleCompaction(ctx, filerClient, bucketName, tablePath, manifests, config, meta, nil)
if err != nil {
return nil, err
}
index.Compaction = &planningIndexCompaction{
ConfigHash: compactionPlanningConfigHash(config),
Eligible: eligible,
}
}
if operationRequested(ops, "rewrite_manifests") {
index.RewriteManifests = &planningIndexRewriteManifests{
Threshold: config.MinManifestsToRewrite,
Eligible: index.DataManifestCount >= config.MinManifestsToRewrite,
}
}
return index, nil
}
func persistPlanningIndex(
ctx context.Context,
client filer_pb.SeaweedFilerClient,
bucketName, tablePath string,
index *planningIndex,
) error {
if index == nil {
return nil
}
tableDir := path.Join(s3tables.TablesPath, bucketName, tablePath)
tableName := path.Base(tableDir)
parentDir := path.Dir(tableDir)
resp, err := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{
Directory: parentDir,
Name: tableName,
})
if err != nil {
return fmt.Errorf("lookup table entry: %w", err)
}
if resp == nil || resp.Entry == nil {
return fmt.Errorf("table entry not found")
}
existingXattr, ok := resp.Entry.Extended[s3tables.ExtendedKeyMetadata]
if !ok || len(existingXattr) == 0 {
return fmt.Errorf("no metadata xattr on table entry")
}
var internalMeta map[string]json.RawMessage
if err := json.Unmarshal(existingXattr, &internalMeta); err != nil {
return fmt.Errorf("unmarshal metadata xattr: %w", err)
}
if _, _, existingIndex, err := parseTableMetadataEnvelope(existingXattr); err == nil {
index = mergePlanningIndexSections(index, existingIndex)
}
indexJSON, err := json.Marshal(index)
if err != nil {
return fmt.Errorf("marshal planning index: %w", err)
}
internalMeta["planningIndex"] = indexJSON
updatedXattr, err := json.Marshal(internalMeta)
if err != nil {
return fmt.Errorf("marshal updated metadata xattr: %w", err)
}
expectedExtended := map[string][]byte{
s3tables.ExtendedKeyMetadata: existingXattr,
}
if expectedVersionXattr, ok := resp.Entry.Extended[s3tables.ExtendedKeyMetadataVersion]; ok && len(expectedVersionXattr) > 0 {
expectedExtended[s3tables.ExtendedKeyMetadataVersion] = expectedVersionXattr
}
resp.Entry.Extended[s3tables.ExtendedKeyMetadata] = updatedXattr
_, err = client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
Directory: parentDir,
Entry: resp.Entry,
ExpectedExtended: expectedExtended,
})
if err != nil {
if status.Code(err) == codes.FailedPrecondition {
return nil
}
return fmt.Errorf("update table entry: %w", err)
}
return nil
}