You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
595 lines
18 KiB
595 lines
18 KiB
package iceberg
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"path"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/apache/iceberg-go"
|
|
"github.com/apache/iceberg-go/table"
|
|
"github.com/parquet-go/parquet-go"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
|
|
)
|
|
|
|
// deleteRewriteInput is one existing position-delete file that will be folded
// into a rewritten output, along with the delete positions it carries.
type deleteRewriteInput struct {
	// Entry is the manifest entry of the original position-delete file.
	Entry iceberg.ManifestEntry
	// ReferencedPath is the normalized path of the single data file this
	// delete file targets.
	ReferencedPath string
	// Positions are the row positions deleted in ReferencedPath, sorted
	// ascending by the collector.
	Positions []int64
}
|
|
|
|
// deleteRewriteGroup collects all position-delete files that share the same
// partition spec, partition tuple, and referenced data file. All inputs in a
// group can be merged into fewer, larger delete files.
type deleteRewriteGroup struct {
	// SpecID is the partition spec ID shared by every input in the group.
	SpecID int32
	// Partition is the partition tuple (field position -> value).
	Partition map[int]any
	// PartitionKey is the string form of Partition used for group keying.
	PartitionKey string
	// ReferencedPath is the single data file all inputs apply deletes to.
	ReferencedPath string
	// Inputs are the delete files to be merged.
	Inputs []deleteRewriteInput
	// TotalSize is the sum of the input files' sizes in bytes.
	TotalSize int64
}
|
|
|
|
// positionDeleteRow is one row of an Iceberg position-delete Parquet file:
// the target data file path plus the deleted row's ordinal position.
// The parquet tags fix the column names required by the Iceberg spec.
type positionDeleteRow struct {
	FilePath string `parquet:"file_path"`
	Pos      int64  `parquet:"pos"`
}
|
|
|
|
func hasEligibleDeleteRewrite(
|
|
ctx context.Context,
|
|
filerClient filer_pb.SeaweedFilerClient,
|
|
bucketName, tablePath string,
|
|
manifests []iceberg.ManifestFile,
|
|
config Config,
|
|
meta table.Metadata,
|
|
predicate *partitionPredicate,
|
|
) (bool, error) {
|
|
groups, _, err := collectDeleteRewriteGroups(ctx, filerClient, bucketName, tablePath, manifests)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
for _, group := range groups {
|
|
if predicate != nil {
|
|
spec, ok := specByID(meta)[int(group.SpecID)]
|
|
if !ok {
|
|
continue
|
|
}
|
|
match, err := predicate.Matches(spec, group.Partition)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if !match {
|
|
continue
|
|
}
|
|
}
|
|
if groupEligibleForRewrite(group, config) {
|
|
return true, nil
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
// collectDeleteRewriteGroups scans every delete manifest and buckets position
// delete entries into rewrite groups keyed by (spec ID, partition, referenced
// data file path).
//
// It returns:
//   - the group map, containing only delete files that reference exactly one
//     data file (Phase 1 restriction — multi-target files are not grouped);
//   - the flat list of ALL position-delete entries seen, including ungrouped
//     ones, which the caller must carry forward as EXISTING entries;
//   - an error if any manifest or delete file cannot be read or parsed.
func collectDeleteRewriteGroups(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	bucketName, tablePath string,
	manifests []iceberg.ManifestFile,
) (map[string]*deleteRewriteGroup, []iceberg.ManifestEntry, error) {
	groups := make(map[string]*deleteRewriteGroup)
	var allPositionEntries []iceberg.ManifestEntry

	for _, mf := range manifests {
		// Only delete manifests can contain position deletes.
		if mf.ManifestContent() != iceberg.ManifestContentDeletes {
			continue
		}

		manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath())
		if err != nil {
			return nil, nil, fmt.Errorf("read delete manifest %s: %w", mf.FilePath(), err)
		}
		entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true)
		if err != nil {
			return nil, nil, fmt.Errorf("parse delete manifest %s: %w", mf.FilePath(), err)
		}

		for _, entry := range entries {
			// Skip equality deletes; only position deletes are rewritten here.
			if entry.DataFile().ContentType() != iceberg.EntryContentPosDeletes {
				continue
			}

			// Record every position-delete entry, even ones skipped below,
			// so the caller can preserve them in the new snapshot.
			allPositionEntries = append(allPositionEntries, entry)

			// Read the delete file to learn which data file(s) it targets.
			fileDeletes, err := readPositionDeleteFile(ctx, filerClient, bucketName, tablePath, entry.DataFile().FilePath())
			if err != nil {
				return nil, nil, fmt.Errorf("read position delete file %s: %w", entry.DataFile().FilePath(), err)
			}
			if len(fileDeletes) != 1 {
				// Phase 1 only rewrites files that target a single data file.
				continue
			}

			// Single-entry map: extract the one path and its positions.
			var referencedPath string
			var positions []int64
			for fp, pos := range fileDeletes {
				referencedPath = normalizeIcebergPath(fp, bucketName, tablePath)
				positions = append(positions, pos...)
			}
			sort.Slice(positions, func(i, j int) bool { return positions[i] < positions[j] })

			// Group key: spec ID + partition + referenced path, separated by
			// NUL bytes to avoid accidental collisions between components.
			partKey := partitionKey(entry.DataFile().Partition())
			groupKey := fmt.Sprintf("spec%d\x00%s\x00%s", entry.DataFile().SpecID(), partKey, referencedPath)
			group, ok := groups[groupKey]
			if !ok {
				group = &deleteRewriteGroup{
					SpecID:         entry.DataFile().SpecID(),
					Partition:      entry.DataFile().Partition(),
					PartitionKey:   partKey,
					ReferencedPath: referencedPath,
				}
				groups[groupKey] = group
			}
			group.Inputs = append(group.Inputs, deleteRewriteInput{
				Entry:          entry,
				ReferencedPath: referencedPath,
				Positions:      positions,
			})
			group.TotalSize += entry.DataFile().FileSizeBytes()
		}
	}

	return groups, allPositionEntries, nil
}
|
|
|
|
func groupEligibleForRewrite(group *deleteRewriteGroup, config Config) bool {
|
|
if group == nil {
|
|
return false
|
|
}
|
|
if len(group.Inputs) < 2 {
|
|
return false
|
|
}
|
|
if group.TotalSize > config.DeleteMaxFileGroupSizeBytes {
|
|
return false
|
|
}
|
|
target := config.DeleteTargetFileSizeBytes
|
|
if target <= 0 {
|
|
target = defaultDeleteTargetFileSizeMB * 1024 * 1024
|
|
}
|
|
outputFiles := int64(estimatedDeleteOutputFiles(group.TotalSize, target))
|
|
if config.DeleteMaxOutputFiles > 0 && outputFiles > config.DeleteMaxOutputFiles {
|
|
return false
|
|
}
|
|
return int64(len(group.Inputs)) >= config.DeleteMinInputFiles
|
|
}
|
|
|
|
// estimatedDeleteOutputFiles returns how many output files a rewrite is
// expected to produce for totalSize bytes at targetSize bytes per file,
// i.e. ceil(totalSize/targetSize), clamped to a minimum of 1. Non-positive
// sizes also yield 1.
func estimatedDeleteOutputFiles(totalSize, targetSize int64) int {
	if totalSize <= 0 || targetSize <= 0 {
		return 1
	}
	estimate := int(math.Ceil(float64(totalSize) / float64(targetSize)))
	if estimate >= 1 {
		return estimate
	}
	return 1
}
|
|
|
|
func manifestEntrySeqNum(entry iceberg.ManifestEntry) *int64 {
|
|
seqNum := entry.SequenceNum()
|
|
if seqNum < 0 {
|
|
return nil
|
|
}
|
|
return &seqNum
|
|
}
|
|
|
|
func manifestEntryFileSeqNum(entry iceberg.ManifestEntry) *int64 {
|
|
if fileSeqNum := entry.FileSequenceNum(); fileSeqNum != nil {
|
|
value := *fileSeqNum
|
|
return &value
|
|
}
|
|
return manifestEntrySeqNum(entry)
|
|
}
|
|
|
|
// writeManifestWithContent serializes manifest entries into an Avro manifest
// file and returns both a ManifestFile descriptor and the raw bytes to store.
//
// iceberg.WriteManifest has no content-type parameter, so when a deletes
// manifest is requested the serialized bytes are patched in place (see
// patchManifestContentBytesToDeletes) and the descriptor is rebuilt so its
// content type and byte length match the patched output.
func writeManifestWithContent(
	filename string,
	version int,
	spec iceberg.PartitionSpec,
	schema *iceberg.Schema,
	snapshotID int64,
	entries []iceberg.ManifestEntry,
	content iceberg.ManifestContent,
) (iceberg.ManifestFile, []byte, error) {
	var manifestBuf bytes.Buffer
	mf, err := iceberg.WriteManifest(filename, &manifestBuf, version, spec, schema, snapshotID, entries)
	if err != nil {
		return nil, nil, err
	}

	manifestBytes := manifestBuf.Bytes()
	if content == iceberg.ManifestContentDeletes {
		// Rewrite the Avro metadata marker from "data" to "deletes".
		manifestBytes, err = patchManifestContentBytesToDeletes(manifestBytes)
		if err != nil {
			return nil, nil, err
		}
	}

	// Rebuild the descriptor with the requested content type and the final
	// (possibly patched) length, copying over the row/file counters and
	// partition summaries computed by WriteManifest.
	rebuilt := iceberg.NewManifestFile(version, filename, int64(len(manifestBytes)), int32(spec.ID()), snapshotID).
		Content(content).
		AddedFiles(mf.AddedDataFiles()).
		ExistingFiles(mf.ExistingDataFiles()).
		DeletedFiles(mf.DeletedDataFiles()).
		AddedRows(mf.AddedRows()).
		ExistingRows(mf.ExistingRows()).
		DeletedRows(mf.DeletedRows()).
		Partitions(mf.Partitions()).
		Build()
	return rebuilt, manifestBytes, nil
}
|
|
|
|
// patchManifestContentBytesToDeletes rewrites the Avro metadata marker
// "content" -> "data" to "content" -> "deletes" in a serialized manifest.
//
// The markers are Avro-encoded strings: a zigzag-varint length prefix
// followed by the bytes (0x0e = len 7 for "content"/"deletes", 0x08 = len 4
// for "data"). Only the first occurrence is replaced; an error is returned
// when the marker is absent so a silent no-op cannot be committed.
//
// (The original used a local variable named `new`, shadowing the builtin,
// and detected a failed replacement with an O(n) bytes.Equal after the fact;
// this version checks presence up front with bytes.Contains.)
func patchManifestContentBytesToDeletes(manifestBytes []byte) ([]byte, error) {
	oldMarker := []byte("\x0econtent\x08data")
	newMarker := []byte("\x0econtent\x0edeletes")

	if !bytes.Contains(manifestBytes, oldMarker) {
		return nil, fmt.Errorf("delete manifest content patch failed")
	}
	return bytes.Replace(manifestBytes, oldMarker, newMarker, 1), nil
}
|
|
|
|
func writePositionDeleteFile(rows []positionDeleteRow) ([]byte, error) {
|
|
var buf bytes.Buffer
|
|
writer := parquet.NewWriter(&buf, parquet.SchemaOf(new(positionDeleteRow)))
|
|
for _, row := range rows {
|
|
if err := writer.Write(&row); err != nil {
|
|
return nil, fmt.Errorf("write position delete row: %w", err)
|
|
}
|
|
}
|
|
if err := writer.Close(); err != nil {
|
|
return nil, fmt.Errorf("close position delete file: %w", err)
|
|
}
|
|
return buf.Bytes(), nil
|
|
}
|
|
|
|
// rewritePositionDeleteFiles compacts small position-delete files: eligible
// groups (same spec/partition/target data file) are merged into fewer, larger
// delete files, then a new REPLACE snapshot is committed that swaps the old
// delete entries for the new ones while preserving all data manifests,
// equality-delete entries, and untouched position-delete entries.
//
// It returns a human-readable summary, a metrics map, and an error. All
// written artifacts (delete files, manifests, manifest list) are cleaned up
// by a deferred handler if the final commit does not happen.
func (h *Handler) rewritePositionDeleteFiles(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	bucketName, tablePath string,
	config Config,
) (string, map[string]int64, error) {
	start := time.Now()
	meta, metadataFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath)
	if err != nil {
		return "", nil, fmt.Errorf("load metadata: %w", err)
	}

	// Nothing to rewrite without a current snapshot to base the plan on.
	currentSnap := meta.CurrentSnapshot()
	if currentSnap == nil || currentSnap.ManifestList == "" {
		return "no current snapshot", nil, nil
	}

	manifestListData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, currentSnap.ManifestList)
	if err != nil {
		return "", nil, fmt.Errorf("read manifest list: %w", err)
	}
	manifests, err := iceberg.ReadManifestList(bytes.NewReader(manifestListData))
	if err != nil {
		return "", nil, fmt.Errorf("parse manifest list: %w", err)
	}

	// Partition the current manifests: data manifests are carried over as-is;
	// equality-delete entries from delete manifests must be re-emitted as
	// EXISTING in the rewritten delete manifests.
	var dataManifests []iceberg.ManifestFile
	var allEqualityEntries []iceberg.ManifestEntry
	for _, mf := range manifests {
		switch mf.ManifestContent() {
		case iceberg.ManifestContentData:
			dataManifests = append(dataManifests, mf)
		case iceberg.ManifestContentDeletes:
			manifestData, readErr := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath())
			if readErr != nil {
				return "", nil, fmt.Errorf("read delete manifest %s: %w", mf.FilePath(), readErr)
			}
			entries, parseErr := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true)
			if parseErr != nil {
				return "", nil, fmt.Errorf("parse delete manifest %s: %w", mf.FilePath(), parseErr)
			}
			for _, entry := range entries {
				if entry.DataFile().ContentType() == iceberg.EntryContentEqDeletes {
					allEqualityEntries = append(allEqualityEntries, entry)
				}
			}
		}
	}

	groupMap, allPositionEntries, err := collectDeleteRewriteGroups(ctx, filerClient, bucketName, tablePath, manifests)
	if err != nil {
		return "", nil, err
	}
	if len(groupMap) == 0 {
		return "no position delete files eligible for rewrite", nil, nil
	}

	// Track every file written so the deferred cleanup can remove them if the
	// commit below never happens (error or early return after writes start).
	type artifact struct {
		dir, fileName string
	}
	var writtenArtifacts []artifact
	committed := false
	defer func() {
		if committed || len(writtenArtifacts) == 0 {
			return
		}
		// Use a fresh context: the request ctx may already be canceled.
		cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		for _, a := range writtenArtifacts {
			if err := deleteFilerFile(cleanupCtx, filerClient, a.dir, a.fileName); err != nil {
				glog.Warningf("iceberg delete rewrite: failed to clean up artifact %s/%s: %v", a.dir, a.fileName, err)
			}
		}
	}()

	// Note: this local shadows the specByID helper for the rest of the func.
	specByID := specByID(meta)
	predicate, err := parsePartitionPredicate(config.Where, meta)
	if err != nil {
		return "", nil, err
	}

	// New manifest entries are bucketed per partition spec, since each output
	// manifest file is written against a single spec.
	type specEntries struct {
		specID  int32
		entries []iceberg.ManifestEntry
	}
	specEntriesMap := make(map[int32]*specEntries)
	addToSpec := func(specID int32, entry iceberg.ManifestEntry) {
		se, ok := specEntriesMap[specID]
		if !ok {
			se = &specEntries{specID: specID}
			specEntriesMap[specID] = se
		}
		se.entries = append(se.entries, entry)
	}

	// Snapshot identity and output locations for the planned commit.
	newSnapID := time.Now().UnixMilli()
	version := meta.Version()
	snapshotID := currentSnap.SnapshotID
	seqNum := currentSnap.SequenceNumber + 1
	metaDir := path.Join(s3tables.TablesPath, bucketName, tablePath, "metadata")
	dataDir := path.Join(s3tables.TablesPath, bucketName, tablePath, "data")
	artifactSuffix := compactRandomSuffix()

	replacedPaths := make(map[string]struct{})
	var rewrittenGroups int64
	var skippedGroups int64
	var deleteFilesRewritten int64
	var deleteFilesWritten int64
	var deleteBytesRewritten int64

	// Iterate groups in sorted key order for deterministic output naming.
	sortedKeys := make([]string, 0, len(groupMap))
	for key := range groupMap {
		sortedKeys = append(sortedKeys, key)
	}
	sort.Strings(sortedKeys)

	for _, key := range sortedKeys {
		group := groupMap[key]
		if predicate != nil {
			spec, ok := specByID[int(group.SpecID)]
			if !ok {
				// NOTE(review): unknown spec skips the group without bumping
				// skippedGroups, unlike the !match branch below — confirm
				// whether that asymmetry in the metric is intentional.
				continue
			}
			match, err := predicate.Matches(spec, group.Partition)
			if err != nil {
				return "", nil, err
			}
			if !match {
				skippedGroups++
				continue
			}
		}
		if !groupEligibleForRewrite(group, config) {
			skippedGroups++
			continue
		}
		// Merge all input files' positions into one row set; every merged
		// input is recorded in replacedPaths so it is NOT re-emitted as
		// EXISTING further down.
		rows := make([]positionDeleteRow, 0)
		for _, input := range group.Inputs {
			for _, pos := range input.Positions {
				rows = append(rows, positionDeleteRow{FilePath: input.ReferencedPath, Pos: pos})
			}
			replacedPaths[input.Entry.DataFile().FilePath()] = struct{}{}
			deleteFilesRewritten++
			deleteBytesRewritten += input.Entry.DataFile().FileSizeBytes()
		}
		// Sort by (file_path, pos) as required for position-delete files.
		sort.Slice(rows, func(i, j int) bool {
			if rows[i].FilePath != rows[j].FilePath {
				return rows[i].FilePath < rows[j].FilePath
			}
			return rows[i].Pos < rows[j].Pos
		})

		// Split the merged rows evenly across the estimated output count.
		// NOTE(review): this passes the raw configured target size, without
		// the <=0 default fallback applied in groupEligibleForRewrite —
		// confirm whether the fallback should apply here too.
		outputFiles := estimatedDeleteOutputFiles(group.TotalSize, config.DeleteTargetFileSizeBytes)
		rowsPerFile := (len(rows) + outputFiles - 1) / outputFiles
		if rowsPerFile < 1 {
			rowsPerFile = len(rows)
		}

		for startIdx, fileIdx := 0, 0; startIdx < len(rows); startIdx, fileIdx = startIdx+rowsPerFile, fileIdx+1 {
			endIdx := startIdx + rowsPerFile
			if endIdx > len(rows) {
				endIdx = len(rows)
			}
			outputRows := rows[startIdx:endIdx]
			deleteBytes, err := writePositionDeleteFile(outputRows)
			if err != nil {
				return "", nil, err
			}
			fileName := fmt.Sprintf("rewrite-delete-%d-%s-%d.parquet", newSnapID, artifactSuffix, deleteFilesWritten)
			if err := ensureFilerDir(ctx, filerClient, dataDir); err != nil {
				return "", nil, fmt.Errorf("ensure data dir: %w", err)
			}
			if err := saveFilerFile(ctx, filerClient, dataDir, fileName, deleteBytes); err != nil {
				return "", nil, fmt.Errorf("save rewritten delete file: %w", err)
			}
			writtenArtifacts = append(writtenArtifacts, artifact{dir: dataDir, fileName: fileName})

			spec, ok := specByID[int(group.SpecID)]
			if !ok {
				return "", nil, fmt.Errorf("partition spec %d not found", group.SpecID)
			}
			// Register the new delete file as an ADDED manifest entry.
			dfBuilder, err := iceberg.NewDataFileBuilder(
				spec,
				iceberg.EntryContentPosDeletes,
				path.Join("data", fileName),
				iceberg.ParquetFile,
				group.Partition,
				nil, nil,
				int64(len(outputRows)),
				int64(len(deleteBytes)),
			)
			if err != nil {
				return "", nil, fmt.Errorf("build rewritten delete file: %w", err)
			}
			entry := iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &newSnapID, nil, nil, dfBuilder.Build())
			addToSpec(group.SpecID, entry)
			deleteFilesWritten++
		}

		// Mark every merged input file as DELETED in the new snapshot,
		// preserving its original sequence numbers.
		for _, input := range group.Inputs {
			delEntry := iceberg.NewManifestEntry(
				iceberg.EntryStatusDELETED,
				&newSnapID,
				manifestEntrySeqNum(input.Entry),
				manifestEntryFileSeqNum(input.Entry),
				input.Entry.DataFile(),
			)
			addToSpec(group.SpecID, delEntry)
		}
		rewrittenGroups++
	}

	// If every group was filtered or ineligible, abort before writing
	// manifests; the deferred cleanup removes any files already written.
	if rewrittenGroups == 0 {
		return "no position delete files eligible for rewrite", nil, nil
	}

	// Carry all equality-delete entries forward unchanged as EXISTING.
	for _, entry := range allEqualityEntries {
		existingEntry := iceberg.NewManifestEntry(
			iceberg.EntryStatusEXISTING,
			func() *int64 { id := entry.SnapshotID(); return &id }(),
			manifestEntrySeqNum(entry),
			manifestEntryFileSeqNum(entry),
			entry.DataFile(),
		)
		addToSpec(entry.DataFile().SpecID(), existingEntry)
	}

	// Carry forward position-delete entries that were NOT merged above.
	for _, entry := range allPositionEntries {
		if _, replaced := replacedPaths[entry.DataFile().FilePath()]; replaced {
			continue
		}
		existingEntry := iceberg.NewManifestEntry(
			iceberg.EntryStatusEXISTING,
			func() *int64 { id := entry.SnapshotID(); return &id }(),
			manifestEntrySeqNum(entry),
			manifestEntryFileSeqNum(entry),
			entry.DataFile(),
		)
		addToSpec(entry.DataFile().SpecID(), existingEntry)
	}

	// Write one delete manifest per spec, in sorted spec order for
	// deterministic manifest-list contents.
	sortedSpecIDs := make([]int32, 0, len(specEntriesMap))
	for specID := range specEntriesMap {
		sortedSpecIDs = append(sortedSpecIDs, specID)
	}
	sort.Slice(sortedSpecIDs, func(i, j int) bool { return sortedSpecIDs[i] < sortedSpecIDs[j] })

	allManifests := make([]iceberg.ManifestFile, 0, len(dataManifests)+len(sortedSpecIDs))
	allManifests = append(allManifests, dataManifests...)

	for _, specID := range sortedSpecIDs {
		spec, ok := specByID[int(specID)]
		if !ok {
			return "", nil, fmt.Errorf("partition spec %d not found", specID)
		}
		manifestName := fmt.Sprintf("rewrite-delete-%d-%s-spec%d.avro", newSnapID, artifactSuffix, specID)
		manifestPath := path.Join("metadata", manifestName)
		mf, manifestBytes, err := writeManifestWithContent(
			manifestPath,
			version,
			spec,
			meta.CurrentSchema(),
			newSnapID,
			specEntriesMap[specID].entries,
			iceberg.ManifestContentDeletes,
		)
		if err != nil {
			return "", nil, fmt.Errorf("write delete manifest for spec %d: %w", specID, err)
		}
		if err := saveFilerFile(ctx, filerClient, metaDir, manifestName, manifestBytes); err != nil {
			return "", nil, fmt.Errorf("save delete manifest for spec %d: %w", specID, err)
		}
		writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestName})
		allManifests = append(allManifests, mf)
	}

	// Write the new manifest list (data manifests + rewritten delete ones).
	var manifestListBuf bytes.Buffer
	if err := iceberg.WriteManifestList(version, &manifestListBuf, newSnapID, &snapshotID, &seqNum, 0, allManifests); err != nil {
		return "", nil, fmt.Errorf("write delete manifest list: %w", err)
	}
	manifestListName := fmt.Sprintf("snap-%d-%s.avro", newSnapID, artifactSuffix)
	if err := saveFilerFile(ctx, filerClient, metaDir, manifestListName, manifestListBuf.Bytes()); err != nil {
		return "", nil, fmt.Errorf("save delete manifest list: %w", err)
	}
	writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestListName})

	// Commit: abort with errStalePlan if another writer advanced the table
	// since the plan was made (optimistic concurrency check).
	manifestListLocation := path.Join("metadata", manifestListName)
	err = h.commitWithRetry(ctx, filerClient, bucketName, tablePath, metadataFileName, config, func(currentMeta table.Metadata, builder *table.MetadataBuilder) error {
		cs := currentMeta.CurrentSnapshot()
		if cs == nil || cs.SnapshotID != snapshotID {
			return errStalePlan
		}
		newSnapshot := &table.Snapshot{
			SnapshotID:       newSnapID,
			ParentSnapshotID: &snapshotID,
			SequenceNumber:   seqNum,
			TimestampMs:      newSnapID,
			ManifestList:     manifestListLocation,
			Summary: &table.Summary{
				Operation: table.OpReplace,
				Properties: map[string]string{
					"maintenance":            "rewrite_position_delete_files",
					"delete-files-rewritten": fmt.Sprintf("%d", deleteFilesRewritten),
					"delete-files-written":   fmt.Sprintf("%d", deleteFilesWritten),
					"delete-groups":          fmt.Sprintf("%d", rewrittenGroups),
				},
			},
			SchemaID: func() *int {
				id := meta.CurrentSchema().ID
				return &id
			}(),
		}
		if err := builder.AddSnapshot(newSnapshot); err != nil {
			return err
		}
		return builder.SetSnapshotRef(table.MainBranch, newSnapID, table.BranchRef)
	})
	if err != nil {
		return "", nil, fmt.Errorf("commit delete rewrite: %w", err)
	}

	// Commit succeeded: disarm the deferred artifact cleanup.
	committed = true
	metrics := map[string]int64{
		MetricDeleteFilesRewritten: deleteFilesRewritten,
		MetricDeleteFilesWritten:   deleteFilesWritten,
		MetricDeleteBytesRewritten: deleteBytesRewritten,
		MetricDeleteGroupsPlanned:  rewrittenGroups,
		MetricDeleteGroupsSkipped:  skippedGroups,
		MetricDurationMs:           time.Since(start).Milliseconds(),
	}
	return fmt.Sprintf(
		"rewrote %d position delete files into %d across %d group(s)",
		deleteFilesRewritten,
		deleteFilesWritten,
		rewrittenGroups,
	), metrics, nil
}
|