Browse Source

iceberg: refine commit retry and statistics patching

pull/8277/head
Chris Lu 1 day ago
parent
commit
fb3dc6a695
  1. 47
      weed/s3api/iceberg/iceberg.go
  2. 8
      weed/s3api/iceberg/iceberg_commit_updates_test.go

47
weed/s3api/iceberg/iceberg.go

@ -292,6 +292,8 @@ type statisticsUpdate struct {
remove *int64
}
var ErrIncompleteSetStatistics = errors.New("set-statistics requires snapshot-id, statistics-path, file-size-in-bytes, and file-footer-size-in-bytes")
type commitAction struct {
Action string `json:"action"`
}
@ -312,7 +314,7 @@ func (u *setStatisticsUpdate) asStatisticsFile() (*table.StatisticsFile, error)
return u.Statistics, nil
}
if u.SnapshotID == nil || u.StatisticsPath == "" || u.FileSizeInBytes == nil || u.FileFooterSizeInBytes == nil {
return nil, errors.New("set-statistics requires snapshot-id, statistics-path, file-size-in-bytes, and file-footer-size-in-bytes")
return nil, ErrIncompleteSetStatistics
}
stats := &table.StatisticsFile{
@ -400,27 +402,38 @@ func applyStatisticsUpdates(metadataBytes []byte, updates []statisticsUpdate) ([
}
}
statisticsBySnapshot := make(map[int64]table.StatisticsFile, len(statistics))
orderedSnapshotIDs := make([]int64, 0, len(statistics))
inOrder := make(map[int64]bool, len(statistics))
for _, stat := range statistics {
statisticsBySnapshot[stat.SnapshotID] = stat
if !inOrder[stat.SnapshotID] {
orderedSnapshotIDs = append(orderedSnapshotIDs, stat.SnapshotID)
inOrder[stat.SnapshotID] = true
}
}
for _, update := range updates {
if update.set != nil {
next := make([]table.StatisticsFile, 0, len(statistics)+1)
for _, existing := range statistics {
if existing.SnapshotID != update.set.SnapshotID {
next = append(next, existing)
}
statisticsBySnapshot[update.set.SnapshotID] = *update.set
if !inOrder[update.set.SnapshotID] {
orderedSnapshotIDs = append(orderedSnapshotIDs, update.set.SnapshotID)
inOrder[update.set.SnapshotID] = true
}
next = append(next, *update.set)
statistics = next
continue
}
if update.remove != nil {
next := make([]table.StatisticsFile, 0, len(statistics))
for _, existing := range statistics {
if existing.SnapshotID != *update.remove {
next = append(next, existing)
}
}
statistics = next
delete(statisticsBySnapshot, *update.remove)
}
}
statistics = make([]table.StatisticsFile, 0, len(statisticsBySnapshot))
for _, snapshotID := range orderedSnapshotIDs {
stat, ok := statisticsBySnapshot[snapshotID]
if !ok {
continue
}
statistics = append(statistics, stat)
}
if len(statistics) == 0 {
@ -1249,6 +1262,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
}
maxCommitAttempts := 3
generatedLegacyUUID := uuid.New()
for attempt := 1; attempt <= maxCommitAttempts; attempt++ {
getReq := &s3tables.GetTableRequest{
TableBucketARN: bucketARN,
@ -1282,7 +1296,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
}
}
if tableUUID == uuid.Nil {
tableUUID = uuid.New()
tableUUID = generatedLegacyUUID
}
var currentMetadata table.Metadata
@ -1390,6 +1404,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
}
if attempt < maxCommitAttempts {
glog.V(1).Infof("Iceberg: CommitTable conflict for %s (attempt %d/%d), retrying", tableName, attempt, maxCommitAttempts)
time.Sleep(time.Duration(50*attempt) * time.Millisecond)
continue
}
writeError(w, http.StatusConflict, "CommitFailedException", "Version token mismatch")

8
weed/s3api/iceberg/iceberg_commit_updates_test.go

@ -39,10 +39,13 @@ func TestParseCommitUpdatesRejectsIncompleteSetStatistics(t *testing.T) {
if err == nil {
t.Fatalf("parseCommitUpdates() expected error")
}
if !errors.Is(err, ErrIncompleteSetStatistics) {
t.Fatalf("parseCommitUpdates() error = %v, want ErrIncompleteSetStatistics", err)
}
}
func TestApplyStatisticsUpdatesUpsertAndRemove(t *testing.T) {
metadata := []byte(`{"format-version":2,"statistics":[{"snapshot-id":1,"statistics-path":"s3://bucket/stats-1.puffin","file-size-in-bytes":10,"file-footer-size-in-bytes":1,"blob-metadata":[]}]} `)
metadata := []byte(`{"format-version":2,"statistics":[{"snapshot-id":1,"statistics-path":"s3://bucket/stats-1.puffin","file-size-in-bytes":10,"file-footer-size-in-bytes":1,"blob-metadata":[]},{"snapshot-id":2,"statistics-path":"s3://bucket/stats-2.puffin","file-size-in-bytes":20,"file-footer-size-in-bytes":2,"blob-metadata":[]}]} `)
snapshotID := int64(2)
setUpdates := []statisticsUpdate{
@ -74,6 +77,9 @@ func TestApplyStatisticsUpdatesUpsertAndRemove(t *testing.T) {
if got := int64(stats[0]["snapshot-id"].(float64)); got != 1 {
t.Fatalf("remaining snapshot-id = %d, want 1", got)
}
if got := int64(stats[0]["file-size-in-bytes"].(float64)); got != 11 {
t.Fatalf("remaining file-size-in-bytes = %d, want 11", got)
}
}
var statisticsFileForTest = table.StatisticsFile{

Loading…
Cancel
Save