|
|
|
@ -8,6 +8,7 @@ import ( |
|
|
|
"encoding/json" |
|
|
|
"errors" |
|
|
|
"fmt" |
|
|
|
"math/rand/v2" |
|
|
|
"net/http" |
|
|
|
"os" |
|
|
|
"path" |
|
|
|
@ -311,6 +312,9 @@ type setStatisticsUpdate struct { |
|
|
|
|
|
|
|
func (u *setStatisticsUpdate) asStatisticsFile() (*table.StatisticsFile, error) { |
|
|
|
if u.Statistics != nil { |
|
|
|
if u.Statistics.BlobMetadata == nil { |
|
|
|
u.Statistics.BlobMetadata = []table.BlobMetadata{} |
|
|
|
} |
|
|
|
return u.Statistics, nil |
|
|
|
} |
|
|
|
if u.SnapshotID == nil || u.StatisticsPath == "" || u.FileSizeInBytes == nil || u.FileFooterSizeInBytes == nil { |
|
|
|
@ -1349,6 +1353,8 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { |
|
|
|
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to serialize metadata: "+err.Error()) |
|
|
|
return |
|
|
|
} |
|
|
|
// iceberg-go does not currently support set/remove-statistics updates in MetadataBuilder.
|
|
|
|
// Patch the encoded metadata JSON and parse it back to keep the response object consistent.
|
|
|
|
metadataBytes, err = applyStatisticsUpdates(metadataBytes, statisticsUpdates) |
|
|
|
if err != nil { |
|
|
|
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply statistics updates: "+err.Error()) |
|
|
|
@ -1404,13 +1410,17 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { |
|
|
|
} |
|
|
|
if attempt < maxCommitAttempts { |
|
|
|
glog.V(1).Infof("Iceberg: CommitTable conflict for %s (attempt %d/%d), retrying", tableName, attempt, maxCommitAttempts) |
|
|
|
time.Sleep(time.Duration(50*attempt) * time.Millisecond) |
|
|
|
jitter := time.Duration(rand.Int64N(int64(25 * time.Millisecond))) |
|
|
|
time.Sleep(time.Duration(50*attempt)*time.Millisecond + jitter) |
|
|
|
continue |
|
|
|
} |
|
|
|
writeError(w, http.StatusConflict, "CommitFailedException", "Version token mismatch") |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
if cleanupErr := s.deleteMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName); cleanupErr != nil { |
|
|
|
glog.V(1).Infof("Iceberg: failed to cleanup metadata file %s after update failure: %v", newMetadataLocation, cleanupErr) |
|
|
|
} |
|
|
|
glog.Errorf("Iceberg: CommitTable UpdateTable error: %v", err) |
|
|
|
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to commit table update: "+err.Error()) |
|
|
|
return |
|
|
|
|