From fb3dc6a695889c0fc3eabfa7cde65976cb13f7a3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 9 Feb 2026 22:32:53 -0800 Subject: [PATCH] iceberg: refine commit retry and statistics patching --- weed/s3api/iceberg/iceberg.go | 47 ++++++++++++------- .../iceberg/iceberg_commit_updates_test.go | 8 +++- 2 files changed, 38 insertions(+), 17 deletions(-) diff --git a/weed/s3api/iceberg/iceberg.go b/weed/s3api/iceberg/iceberg.go index 10c0f4b14..f0b4703f8 100644 --- a/weed/s3api/iceberg/iceberg.go +++ b/weed/s3api/iceberg/iceberg.go @@ -292,6 +292,8 @@ type statisticsUpdate struct { remove *int64 } +var ErrIncompleteSetStatistics = errors.New("set-statistics requires snapshot-id, statistics-path, file-size-in-bytes, and file-footer-size-in-bytes") + type commitAction struct { Action string `json:"action"` } @@ -312,7 +314,7 @@ func (u *setStatisticsUpdate) asStatisticsFile() (*table.StatisticsFile, error) return u.Statistics, nil } if u.SnapshotID == nil || u.StatisticsPath == "" || u.FileSizeInBytes == nil || u.FileFooterSizeInBytes == nil { - return nil, errors.New("set-statistics requires snapshot-id, statistics-path, file-size-in-bytes, and file-footer-size-in-bytes") + return nil, ErrIncompleteSetStatistics } stats := &table.StatisticsFile{ @@ -400,27 +402,38 @@ func applyStatisticsUpdates(metadataBytes []byte, updates []statisticsUpdate) ([ } } + statisticsBySnapshot := make(map[int64]table.StatisticsFile, len(statistics)) + orderedSnapshotIDs := make([]int64, 0, len(statistics)) + inOrder := make(map[int64]bool, len(statistics)) + for _, stat := range statistics { + statisticsBySnapshot[stat.SnapshotID] = stat + if !inOrder[stat.SnapshotID] { + orderedSnapshotIDs = append(orderedSnapshotIDs, stat.SnapshotID) + inOrder[stat.SnapshotID] = true + } + } + for _, update := range updates { if update.set != nil { - next := make([]table.StatisticsFile, 0, len(statistics)+1) - for _, existing := range statistics { - if existing.SnapshotID != update.set.SnapshotID { - next = append(next, existing) - } + statisticsBySnapshot[update.set.SnapshotID] = *update.set + if !inOrder[update.set.SnapshotID] { + orderedSnapshotIDs = append(orderedSnapshotIDs, update.set.SnapshotID) + inOrder[update.set.SnapshotID] = true } - next = append(next, *update.set) - statistics = next continue } if update.remove != nil { - next := make([]table.StatisticsFile, 0, len(statistics)) - for _, existing := range statistics { - if existing.SnapshotID != *update.remove { - next = append(next, existing) - } - } - statistics = next + delete(statisticsBySnapshot, *update.remove) + } + } + + statistics = make([]table.StatisticsFile, 0, len(statisticsBySnapshot)) + for _, snapshotID := range orderedSnapshotIDs { + stat, ok := statisticsBySnapshot[snapshotID] + if !ok { + continue } + statistics = append(statistics, stat) } if len(statistics) == 0 { @@ -1249,6 +1262,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { } maxCommitAttempts := 3 + generatedLegacyUUID := uuid.New() for attempt := 1; attempt <= maxCommitAttempts; attempt++ { getReq := &s3tables.GetTableRequest{ TableBucketARN: bucketARN, @@ -1282,7 +1296,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { } } if tableUUID == uuid.Nil { - tableUUID = uuid.New() + tableUUID = generatedLegacyUUID } var currentMetadata table.Metadata @@ -1390,6 +1404,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { } if attempt < maxCommitAttempts { glog.V(1).Infof("Iceberg: CommitTable conflict for %s (attempt %d/%d), retrying", tableName, attempt, maxCommitAttempts) + time.Sleep(time.Duration(50*attempt) * time.Millisecond) continue } writeError(w, http.StatusConflict, "CommitFailedException", "Version token mismatch") diff --git a/weed/s3api/iceberg/iceberg_commit_updates_test.go b/weed/s3api/iceberg/iceberg_commit_updates_test.go index e583d4216..96ed7e402 100644 --- a/weed/s3api/iceberg/iceberg_commit_updates_test.go +++ b/weed/s3api/iceberg/iceberg_commit_updates_test.go @@ -39,10 +39,13 @@ func TestParseCommitUpdatesRejectsIncompleteSetStatistics(t *testing.T) { if err == nil { t.Fatalf("parseCommitUpdates() expected error") } + if !errors.Is(err, ErrIncompleteSetStatistics) { + t.Fatalf("parseCommitUpdates() error = %v, want ErrIncompleteSetStatistics", err) + } } func TestApplyStatisticsUpdatesUpsertAndRemove(t *testing.T) { - metadata := []byte(`{"format-version":2,"statistics":[{"snapshot-id":1,"statistics-path":"s3://bucket/stats-1.puffin","file-size-in-bytes":10,"file-footer-size-in-bytes":1,"blob-metadata":[]}]} `) + metadata := []byte(`{"format-version":2,"statistics":[{"snapshot-id":1,"statistics-path":"s3://bucket/stats-1.puffin","file-size-in-bytes":10,"file-footer-size-in-bytes":1,"blob-metadata":[]},{"snapshot-id":2,"statistics-path":"s3://bucket/stats-2.puffin","file-size-in-bytes":20,"file-footer-size-in-bytes":2,"blob-metadata":[]}]} `) snapshotID := int64(2) setUpdates := []statisticsUpdate{ @@ -74,6 +77,9 @@ func TestApplyStatisticsUpdatesUpsertAndRemove(t *testing.T) { if got := int64(stats[0]["snapshot-id"].(float64)); got != 1 { t.Fatalf("remaining snapshot-id = %d, want 1", got) } + if got := int64(stats[0]["file-size-in-bytes"].(float64)); got != 11 { + t.Fatalf("remaining file-size-in-bytes = %d, want 11", got) + } } var statisticsFileForTest = table.StatisticsFile{