diff --git a/weed/s3api/iceberg/iceberg.go b/weed/s3api/iceberg/iceberg.go index 65b0e39f7..10c0f4b14 100644 --- a/weed/s3api/iceberg/iceberg.go +++ b/weed/s3api/iceberg/iceberg.go @@ -6,6 +6,7 @@ package iceberg import ( "context" "encoding/json" + "errors" "fmt" "net/http" "os" @@ -272,6 +273,180 @@ func (s *Server) saveMetadataFile(ctx context.Context, bucketName, tablePath, me }) } +func (s *Server) deleteMetadataFile(ctx context.Context, bucketName, tablePath, metadataFileName string) error { + opCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + metadataDir := path.Join(s3tables.TablesPath, bucketName) + if tablePath != "" { + metadataDir = path.Join(metadataDir, tablePath) + } + metadataDir = path.Join(metadataDir, "metadata") + return s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return filer_pb.DoRemove(opCtx, client, metadataDir, metadataFileName, true, false, true, false, nil) + }) +} + +type statisticsUpdate struct { + set *table.StatisticsFile + remove *int64 +} + +type commitAction struct { + Action string `json:"action"` +} + +type setStatisticsUpdate struct { + Action string `json:"action"` + SnapshotID *int64 `json:"snapshot-id,omitempty"` + StatisticsPath string `json:"statistics-path,omitempty"` + FileSizeInBytes *int64 `json:"file-size-in-bytes,omitempty"` + FileFooterSizeInBytes *int64 `json:"file-footer-size-in-bytes,omitempty"` + KeyMetadata *string `json:"key-metadata,omitempty"` + BlobMetadata []table.BlobMetadata `json:"blob-metadata,omitempty"` + Statistics *table.StatisticsFile `json:"statistics,omitempty"` +} + +func (u *setStatisticsUpdate) asStatisticsFile() (*table.StatisticsFile, error) { + if u.Statistics != nil { + return u.Statistics, nil + } + if u.SnapshotID == nil || u.StatisticsPath == "" || u.FileSizeInBytes == nil || u.FileFooterSizeInBytes == nil { + return nil, errors.New("set-statistics requires snapshot-id, statistics-path, file-size-in-bytes, and file-footer-size-in-bytes") + } + + stats := &table.StatisticsFile{ + SnapshotID: *u.SnapshotID, + StatisticsPath: u.StatisticsPath, + FileSizeInBytes: *u.FileSizeInBytes, + FileFooterSizeInBytes: *u.FileFooterSizeInBytes, + KeyMetadata: u.KeyMetadata, + BlobMetadata: u.BlobMetadata, + } + if stats.BlobMetadata == nil { + stats.BlobMetadata = []table.BlobMetadata{} + } + return stats, nil +} + +type removeStatisticsUpdate struct { + Action string `json:"action"` + SnapshotID int64 `json:"snapshot-id"` +} + +func parseCommitUpdates(rawUpdates []json.RawMessage) (table.Updates, []statisticsUpdate, error) { + filtered := make([]json.RawMessage, 0, len(rawUpdates)) + statisticsUpdates := make([]statisticsUpdate, 0) + + for _, raw := range rawUpdates { + var action commitAction + if err := json.Unmarshal(raw, &action); err != nil { + return nil, nil, err + } + + switch action.Action { + case "set-statistics": + var setUpdate setStatisticsUpdate + if err := json.Unmarshal(raw, &setUpdate); err != nil { + return nil, nil, err + } + stats, err := setUpdate.asStatisticsFile() + if err != nil { + return nil, nil, err + } + statisticsUpdates = append(statisticsUpdates, statisticsUpdate{set: stats}) + case "remove-statistics": + var removeUpdate removeStatisticsUpdate + if err := json.Unmarshal(raw, &removeUpdate); err != nil { + return nil, nil, err + } + snapshotID := removeUpdate.SnapshotID + statisticsUpdates = append(statisticsUpdates, statisticsUpdate{remove: &snapshotID}) + default: + filtered = append(filtered, raw) + } + } + + if len(filtered) == 0 { + return nil, statisticsUpdates, nil + } + + data, err := json.Marshal(filtered) + if err != nil { + return nil, nil, err + } + var updates table.Updates + if err := json.Unmarshal(data, &updates); err != nil { + return nil, nil, err + } + + return updates, statisticsUpdates, nil +} + +func applyStatisticsUpdates(metadataBytes []byte, updates []statisticsUpdate) ([]byte, error) { + if len(updates) == 0 { + return metadataBytes, nil + } + + var metadata map[string]json.RawMessage + if err := json.Unmarshal(metadataBytes, &metadata); err != nil { + return nil, err + } + + var statistics []table.StatisticsFile + if rawStatistics, ok := metadata["statistics"]; ok && len(rawStatistics) > 0 { + if err := json.Unmarshal(rawStatistics, &statistics); err != nil { + return nil, err + } + } + + for _, update := range updates { + if update.set != nil { + next := make([]table.StatisticsFile, 0, len(statistics)+1) + for _, existing := range statistics { + if existing.SnapshotID != update.set.SnapshotID { + next = append(next, existing) + } + } + next = append(next, *update.set) + statistics = next + continue + } + if update.remove != nil { + next := make([]table.StatisticsFile, 0, len(statistics)) + for _, existing := range statistics { + if existing.SnapshotID != *update.remove { + next = append(next, existing) + } + } + statistics = next + } + } + + if len(statistics) == 0 { + delete(metadata, "statistics") + } else { + data, err := json.Marshal(statistics) + if err != nil { + return nil, err + } + metadata["statistics"] = data + } + + return json.Marshal(metadata) +} + +func isS3TablesConflict(err error) bool { + if err == nil { + return false + } + if errors.Is(err, s3tables.ErrVersionTokenMismatch) { + return true + } + var tableErr *s3tables.S3TablesError + return errors.As(err, &tableErr) && tableErr.Type == s3tables.ErrCodeConflict +} + // parseNamespace parses the namespace from path parameter. // Iceberg uses unit separator (0x1F) for multi-level namespaces. // Note: mux already decodes URL-encoded path parameters, so we only split by unit separator. @@ -1043,7 +1218,8 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { // Extract identity from context identityName := s3_constants.GetIdentityNameFromContext(r) - // Parse the commit request, skipping update actions not supported by iceberg-go. + // Parse commit request and keep statistics updates separate because iceberg-go v0.4.0 + // does not decode set/remove-statistics update actions yet. var raw struct { Identifier *TableIdentifier `json:"identifier,omitempty"` Requirements json.RawMessage `json:"requirements"` @@ -1056,6 +1232,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { var req CommitTableRequest req.Identifier = raw.Identifier + var statisticsUpdates []statisticsUpdate if len(raw.Requirements) > 0 { if err := json.Unmarshal(raw.Requirements, &req.Requirements); err != nil { writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid requirements: "+err.Error()) @@ -1063,178 +1240,166 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { } } if len(raw.Updates) > 0 { - filtered := make([]json.RawMessage, 0, len(raw.Updates)) - for _, update := range raw.Updates { - var action struct { - Action string `json:"action"` - } - if err := json.Unmarshal(update, &action); err != nil { - writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid update: "+err.Error()) + var err error + req.Updates, statisticsUpdates, err = parseCommitUpdates(raw.Updates) + if err != nil { + writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid updates: "+err.Error()) + return + } + } + + maxCommitAttempts := 3 + for attempt := 1; attempt <= maxCommitAttempts; attempt++ { + getReq := &s3tables.GetTableRequest{ + TableBucketARN: bucketARN, + Namespace: namespace, + Name: tableName, + } + var getResp s3tables.GetTableResponse + + err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + mgrClient := s3tables.NewManagerClient(client) + return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName) + }) + if err != nil { + if strings.Contains(err.Error(), "not found") { + writeError(w, http.StatusNotFound, "NoSuchTableException", fmt.Sprintf("Table does not exist: %s", tableName)) return } - if action.Action == "set-statistics" { - continue + glog.V(1).Infof("Iceberg: CommitTable GetTable error: %v", err) + writeError(w, http.StatusInternalServerError, "InternalServerError", err.Error()) + return + } + + location := tableLocationFromMetadataLocation(getResp.MetadataLocation) + if location == "" { + location = fmt.Sprintf("s3://%s/%s/%s", bucketName, encodeNamespace(namespace), tableName) + } + tableUUID := uuid.Nil + if getResp.Metadata != nil && getResp.Metadata.Iceberg != nil && getResp.Metadata.Iceberg.TableUUID != "" { + if parsed, parseErr := uuid.Parse(getResp.Metadata.Iceberg.TableUUID); parseErr == nil { + tableUUID = parsed } - filtered = append(filtered, update) } - if len(filtered) > 0 { - updatesBytes, err := json.Marshal(filtered) + if tableUUID == uuid.Nil { + tableUUID = uuid.New() + } + + var currentMetadata table.Metadata + if getResp.Metadata != nil && len(getResp.Metadata.FullMetadata) > 0 { + currentMetadata, err = table.ParseMetadataBytes(getResp.Metadata.FullMetadata) if err != nil { - writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse updates: "+err.Error()) + glog.Errorf("Iceberg: Failed to parse current metadata for %s: %v", tableName, err) + writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse current metadata") return } - if err := json.Unmarshal(updatesBytes, &req.Updates); err != nil { - writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid updates: "+err.Error()) + } else { + currentMetadata = newTableMetadata(tableUUID, location, nil, nil, nil, nil) + } + if currentMetadata == nil { + writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to build current metadata") + return + } + + for _, requirement := range req.Requirements { + if err := requirement.Validate(currentMetadata); err != nil { + writeError(w, http.StatusConflict, "CommitFailedException", "Requirement failed: "+err.Error()) return } } - } - - // First, load current table metadata - getReq := &s3tables.GetTableRequest{ - TableBucketARN: bucketARN, - Namespace: namespace, - Name: tableName, - } - var getResp s3tables.GetTableResponse - err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName) - }) - - if err != nil { - if strings.Contains(err.Error(), "not found") { - writeError(w, http.StatusNotFound, "NoSuchTableException", fmt.Sprintf("Table does not exist: %s", tableName)) + builder, err := table.MetadataBuilderFromBase(currentMetadata, getResp.MetadataLocation) + if err != nil { + writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to create metadata builder: "+err.Error()) return } - glog.V(1).Infof("Iceberg: CommitTable GetTable error: %v", err) - writeError(w, http.StatusInternalServerError, "InternalServerError", err.Error()) - return - } - - // Build the current metadata - location := tableLocationFromMetadataLocation(getResp.MetadataLocation) - if location == "" { - location = fmt.Sprintf("s3://%s/%s/%s", bucketName, encodeNamespace(namespace), tableName) - } - tableUUID := uuid.Nil - if getResp.Metadata != nil && getResp.Metadata.Iceberg != nil && getResp.Metadata.Iceberg.TableUUID != "" { - if parsed, err := uuid.Parse(getResp.Metadata.Iceberg.TableUUID); err == nil { - tableUUID = parsed + for _, update := range req.Updates { + if err := update.Apply(builder); err != nil { + writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply update: "+err.Error()) + return + } } - } - if tableUUID == uuid.Nil { - tableUUID = uuid.New() - } - var currentMetadata table.Metadata - if getResp.Metadata != nil && len(getResp.Metadata.FullMetadata) > 0 { - var err error - currentMetadata, err = table.ParseMetadataBytes(getResp.Metadata.FullMetadata) + newMetadata, err := builder.Build() if err != nil { - glog.Errorf("Iceberg: Failed to parse current metadata for %s: %v", tableName, err) - writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse current metadata") + writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to build new metadata: "+err.Error()) return } - } else { - // Fallback for tables without persisted full metadata (legacy or error state) - currentMetadata = newTableMetadata(tableUUID, location, nil, nil, nil, nil) - } - if currentMetadata == nil { - writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to build current metadata") - return - } + metadataVersion := getResp.MetadataVersion + 1 + metadataFileName := fmt.Sprintf("v%d.metadata.json", metadataVersion) + newMetadataLocation := fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(location, "/"), metadataFileName) - // Validate all requirements against current metadata - for _, requirement := range req.Requirements { - if err := requirement.Validate(currentMetadata); err != nil { - writeError(w, http.StatusConflict, "CommitFailedException", "Requirement failed: "+err.Error()) + metadataBytes, err := json.Marshal(newMetadata) + if err != nil { + writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to serialize metadata: "+err.Error()) return } - } - - // Apply updates using MetadataBuilder - builder, err := table.MetadataBuilderFromBase(currentMetadata, getResp.MetadataLocation) - if err != nil { - writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to create metadata builder: "+err.Error()) - return - } - - for _, update := range req.Updates { - if err := update.Apply(builder); err != nil { - writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply update: "+err.Error()) + metadataBytes, err = applyStatisticsUpdates(metadataBytes, statisticsUpdates) + if err != nil { + writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply statistics updates: "+err.Error()) + return + } + newMetadata, err = table.ParseMetadataBytes(metadataBytes) + if err != nil { + writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse committed metadata: "+err.Error()) return } - } - - // Build the new metadata - newMetadata, err := builder.Build() - if err != nil { - writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to build new metadata: "+err.Error()) - return - } - - // Determine next metadata version - metadataVersion := getResp.MetadataVersion + 1 - metadataFileName := fmt.Sprintf("v%d.metadata.json", metadataVersion) - newMetadataLocation := fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(location, "/"), metadataFileName) - - // Serialize metadata to JSON - metadataBytes, err := json.Marshal(newMetadata) - if err != nil { - writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to serialize metadata: "+err.Error()) - return - } - // 1. Save metadata file to filer - metadataBucket, metadataPath, err := parseS3Location(location) - if err != nil { - writeError(w, http.StatusInternalServerError, "InternalServerError", "Invalid table location: "+err.Error()) - return - } - if err := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); err != nil { - writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+err.Error()) - return - } + metadataBucket, metadataPath, err := parseS3Location(location) + if err != nil { + writeError(w, http.StatusInternalServerError, "InternalServerError", "Invalid table location: "+err.Error()) + return + } + if err := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); err != nil { + writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+err.Error()) + return + } - // Persist the new metadata and update the table reference - updateReq := &s3tables.UpdateTableRequest{ - TableBucketARN: bucketARN, - Namespace: namespace, - Name: tableName, - VersionToken: getResp.VersionToken, - Metadata: &s3tables.TableMetadata{ - Iceberg: &s3tables.IcebergMetadata{ - TableUUID: tableUUID.String(), + updateReq := &s3tables.UpdateTableRequest{ + TableBucketARN: bucketARN, + Namespace: namespace, + Name: tableName, + VersionToken: getResp.VersionToken, + Metadata: &s3tables.TableMetadata{ + Iceberg: &s3tables.IcebergMetadata{ + TableUUID: tableUUID.String(), + }, + FullMetadata: metadataBytes, }, - FullMetadata: metadataBytes, - }, - MetadataVersion: metadataVersion, - MetadataLocation: newMetadataLocation, - } + MetadataVersion: metadataVersion, + MetadataLocation: newMetadataLocation, + } - err = s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - mgrClient := s3tables.NewManagerClient(client) - // 1. Write metadata file (this would normally be an S3 PutObject, - // but s3tables manager handles the metadata storage logic) - // For now, we assume s3tables.UpdateTable handles the reference update. - return s.tablesManager.Execute(r.Context(), mgrClient, "UpdateTable", updateReq, nil, identityName) - }) + err = s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + mgrClient := s3tables.NewManagerClient(client) + return s.tablesManager.Execute(r.Context(), mgrClient, "UpdateTable", updateReq, nil, identityName) + }) + if err == nil { + result := CommitTableResponse{ + MetadataLocation: newMetadataLocation, + Metadata: newMetadata, + } + writeJSON(w, http.StatusOK, result) + return + } + + if isS3TablesConflict(err) { + if cleanupErr := s.deleteMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName); cleanupErr != nil { + glog.V(1).Infof("Iceberg: failed to cleanup metadata file %s on conflict: %v", newMetadataLocation, cleanupErr) + } + if attempt < maxCommitAttempts { + glog.V(1).Infof("Iceberg: CommitTable conflict for %s (attempt %d/%d), retrying", tableName, attempt, maxCommitAttempts) + continue + } + writeError(w, http.StatusConflict, "CommitFailedException", "Version token mismatch") + return + } - if err != nil { glog.Errorf("Iceberg: CommitTable UpdateTable error: %v", err) writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to commit table update: "+err.Error()) return } - - // Return the new metadata - result := CommitTableResponse{ - MetadataLocation: newMetadataLocation, - Metadata: newMetadata, - } - writeJSON(w, http.StatusOK, result) } // newTableMetadata creates a new table.Metadata object with the given parameters. diff --git a/weed/s3api/iceberg/iceberg_commit_updates_test.go b/weed/s3api/iceberg/iceberg_commit_updates_test.go new file mode 100644 index 000000000..e583d4216 --- /dev/null +++ b/weed/s3api/iceberg/iceberg_commit_updates_test.go @@ -0,0 +1,97 @@ +package iceberg + +import ( + "encoding/json" + "errors" + "testing" + + "github.com/apache/iceberg-go/table" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3tables" +) + +func TestParseCommitUpdatesSeparatesStatistics(t *testing.T) { + raw := []json.RawMessage{ + json.RawMessage(`{"action":"set-statistics","snapshot-id":10,"statistics-path":"s3://bucket/table/metadata/stats.puffin","file-size-in-bytes":100,"file-footer-size-in-bytes":20,"blob-metadata":[]}`), + json.RawMessage(`{"action":"set-properties","updates":{"k":"v"}}`), + } + + updates, stats, err := parseCommitUpdates(raw) + if err != nil { + t.Fatalf("parseCommitUpdates() error = %v", err) + } + if len(stats) != 1 { + t.Fatalf("statistics updates = %d, want 1", len(stats)) + } + if stats[0].set == nil || stats[0].set.SnapshotID != 10 { + t.Fatalf("unexpected statistics update: %#v", stats[0]) + } + if len(updates) != 1 { + t.Fatalf("decoded updates = %d, want 1", len(updates)) + } +} + +func TestParseCommitUpdatesRejectsIncompleteSetStatistics(t *testing.T) { + raw := []json.RawMessage{ + json.RawMessage(`{"action":"set-statistics","snapshot-id":10}`), + } + + _, _, err := parseCommitUpdates(raw) + if err == nil { + t.Fatalf("parseCommitUpdates() expected error") + } +} + +func TestApplyStatisticsUpdatesUpsertAndRemove(t *testing.T) { + metadata := []byte(`{"format-version":2,"statistics":[{"snapshot-id":1,"statistics-path":"s3://bucket/stats-1.puffin","file-size-in-bytes":10,"file-footer-size-in-bytes":1,"blob-metadata":[]}]} `) + + snapshotID := int64(2) + setUpdates := []statisticsUpdate{ + { + set: &statisticsFileForTest, + }, + { + remove: &snapshotID, + }, + } + + updated, err := applyStatisticsUpdates(metadata, setUpdates) + if err != nil { + t.Fatalf("applyStatisticsUpdates() error = %v", err) + } + + var decoded map[string]json.RawMessage + if err := json.Unmarshal(updated, &decoded); err != nil { + t.Fatalf("json.Unmarshal(updated) error = %v", err) + } + + var stats []map[string]any + if err := json.Unmarshal(decoded["statistics"], &stats); err != nil { + t.Fatalf("json.Unmarshal(statistics) error = %v", err) + } + if len(stats) != 1 { + t.Fatalf("statistics length = %d, want 1", len(stats)) + } + if got := int64(stats[0]["snapshot-id"].(float64)); got != 1 { + t.Fatalf("remaining snapshot-id = %d, want 1", got) + } +} + +var statisticsFileForTest = table.StatisticsFile{ + SnapshotID: 1, + StatisticsPath: "s3://bucket/stats-1.puffin", + FileSizeInBytes: 11, + FileFooterSizeInBytes: 2, + BlobMetadata: []table.BlobMetadata{}, +} + +func TestIsS3TablesConflict(t *testing.T) { + if !isS3TablesConflict(s3tables.ErrVersionTokenMismatch) { + t.Fatalf("expected ErrVersionTokenMismatch to be conflict") + } + if !isS3TablesConflict(&s3tables.S3TablesError{Type: s3tables.ErrCodeConflict, Message: "Version token mismatch"}) { + t.Fatalf("expected S3Tables conflict error to be conflict") + } + if isS3TablesConflict(errors.New("other")) { + t.Fatalf("unexpected conflict for non-conflict error") + } +}