From 6c822192400c678ebc96e3a730d346bc2be8e4a0 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 9 Feb 2026 23:09:12 -0800
Subject: [PATCH] iceberg: implement stage-create and create-on-commit finalize

---
 weed/s3api/iceberg/iceberg.go                 | 337 +++++++++++++++++-
 .../iceberg_stage_create_helpers_test.go      |  37 ++
 2 files changed, 360 insertions(+), 14 deletions(-)
 create mode 100644 weed/s3api/iceberg/iceberg_stage_create_helpers_test.go

diff --git a/weed/s3api/iceberg/iceberg.go b/weed/s3api/iceberg/iceberg.go
index b96e12be5..f673fe3b8 100644
--- a/weed/s3api/iceberg/iceberg.go
+++ b/weed/s3api/iceberg/iceberg.go
@@ -288,6 +288,38 @@ func (s *Server) deleteMetadataFile(ctx context.Context, bucketName, tablePath,
 	})
 }
 
+// loadMetadataFile looks up metadataFileName in the table's "metadata" directory
+// under s3tables.TablesPath and returns a copy of the entry's Content bytes.
+// The lookup runs under a 30-second timeout derived from ctx. Errors are returned
+// without additional wrapping by this function.
+func (s *Server) loadMetadataFile(ctx context.Context, bucketName, tablePath, metadataFileName string) ([]byte, error) {
+	opCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
+	defer cancel()
+
+	metadataDir := path.Join(s3tables.TablesPath, bucketName)
+	if tablePath != "" {
+		metadataDir = path.Join(metadataDir, tablePath)
+	}
+	metadataDir = path.Join(metadataDir, "metadata")
+
+	var content []byte
+	err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+		resp, err := filer_pb.LookupEntry(opCtx, client, &filer_pb.LookupDirectoryEntryRequest{
+			Directory: metadataDir,
+			Name:      metadataFileName,
+		})
+		if err != nil {
+			return err
+		}
+		content = append([]byte(nil), resp.Entry.Content...)
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+	return content, nil
+}
+
 type statisticsUpdate struct {
 	set    *table.StatisticsFile
 	remove *int64
@@ -464,6 +496,63 @@ func isS3TablesConflict(err error) bool {
 	return errors.As(err, &tableErr) && tableErr.Type == s3tables.ErrCodeConflict
 }
 
+// isS3TablesNotFound reports whether err looks like a missing-resource error:
+// its text contains "not found" (case-insensitive), or it is an S3TablesError
+// typed ErrCodeNoSuchTable / ErrCodeNoSuchNamespace or whose Message says so.
+func isS3TablesNotFound(err error) bool {
+	if err == nil {
+		return false
+	}
+	if strings.Contains(strings.ToLower(err.Error()), "not found") {
+		return true
+	}
+	var tableErr *s3tables.S3TablesError
+	return errors.As(err, &tableErr) &&
+		(tableErr.Type == s3tables.ErrCodeNoSuchTable || tableErr.Type == s3tables.ErrCodeNoSuchNamespace || strings.Contains(strings.ToLower(tableErr.Message), "not found"))
+}
+
+// hasAssertCreateRequirement reports whether any of the given requirements has
+// the requirementAssertCreate type.
+func hasAssertCreateRequirement(requirements table.Requirements) bool {
+	for _, requirement := range requirements {
+		if requirement.GetType() == requirementAssertCreate {
+			return true
+		}
+	}
+	return false
+}
+
+// isS3TablesAlreadyExists reports whether err looks like a duplicate-resource
+// error: its text contains "already exists" (case-insensitive), or it is an
+// S3TablesError typed ErrCodeTableAlreadyExists / ErrCodeNamespaceAlreadyExists.
+func isS3TablesAlreadyExists(err error) bool {
+	if err == nil {
+		return false
+	}
+	if strings.Contains(strings.ToLower(err.Error()), "already exists") {
+		return true
+	}
+	var tableErr *s3tables.S3TablesError
+	return errors.As(err, &tableErr) &&
+		(tableErr.Type == s3tables.ErrCodeTableAlreadyExists || tableErr.Type == s3tables.ErrCodeNamespaceAlreadyExists || strings.Contains(strings.ToLower(tableErr.Message), "already exists"))
+}
+
+// parseMetadataVersionFromLocation extracts N from a location whose base name
+// is "vN.metadata.json". It returns 0 when the name does not have that form or
+// N is not a positive integer.
+func parseMetadataVersionFromLocation(metadataLocation string) int {
+	base := path.Base(metadataLocation)
+	if !strings.HasPrefix(base, "v") || !strings.HasSuffix(base, ".metadata.json") {
+		return 0
+	}
+	rawVersion := strings.TrimPrefix(strings.TrimSuffix(base, ".metadata.json"), "v")
+	version, err := strconv.Atoi(rawVersion)
+	if err != nil || version <= 0 {
+		return 0
+	}
+	return version
+}
+
 // parseNamespace parses the namespace from path parameter.
 // Iceberg uses unit separator (0x1F) for multi-level namespaces.
 // Note: mux already decodes URL-encoded path parameters, so we only split by unit separator.
@@ -562,8 +636,9 @@ func buildTableBucketARN(bucketName string) string {
 }
 
 const (
-	defaultListPageSize = 1000
-	maxListPageSize     = 1000
+	defaultListPageSize     = 1000
+	maxListPageSize         = 1000
+	requirementAssertCreate = "assert-create"
 )
 
 func getPaginationQueryParam(r *http.Request, primary, fallback string) string {
@@ -965,17 +1040,25 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) {
 	tableName := req.Name
 	metadataFileName := "v1.metadata.json" // Initial version is always 1
 	metadataLocation := fmt.Sprintf("%s/metadata/%s", location, metadataFileName)
-	if !req.StageCreate {
-		// Save metadata file to filer for immediate table creation.
-		metadataBucket, metadataPath, err := parseS3Location(location)
-		if err != nil {
-			writeError(w, http.StatusInternalServerError, "InternalServerError", "Invalid table location: "+err.Error())
-			return
-		}
-		if err := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); err != nil {
-			writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+err.Error())
-			return
+	metadataBucket, metadataPath, err := parseS3Location(location)
+	if err != nil {
+		writeError(w, http.StatusInternalServerError, "InternalServerError", "Invalid table location: "+err.Error())
+		return
+	}
+	if err := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); err != nil {
+		writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+err.Error())
+		return
+	}
+
+	// Stage-create writes the initial metadata file but does not register table state in S3Tables.
+	if req.StageCreate {
+		result := LoadTableResult{
+			MetadataLocation: metadataLocation,
+			Metadata:         metadata,
+			Config:           make(iceberg.Properties),
 		}
+		writeJSON(w, http.StatusOK, result)
+		return
 	}
 
 	// Use S3 Tables manager to create table
@@ -1267,6 +1350,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
 
 	maxCommitAttempts := 3
 	generatedLegacyUUID := uuid.New()
+	canCreateOnCommit := hasAssertCreateRequirement(req.Requirements)
 	for attempt := 1; attempt <= maxCommitAttempts; attempt++ {
 		getReq := &s3tables.GetTableRequest{
 			TableBucketARN: bucketARN,
@@ -1280,8 +1364,218 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
 			return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName)
 		})
 		if err != nil {
-			if strings.Contains(err.Error(), "not found") {
-				writeError(w, http.StatusNotFound, "NoSuchTableException", fmt.Sprintf("Table does not exist: %s", tableName))
+			if isS3TablesNotFound(err) {
+				if !canCreateOnCommit {
+					writeError(w, http.StatusNotFound, "NoSuchTableException", fmt.Sprintf("Table does not exist: %s", tableName))
+					return
+				}
+
+				for _, requirement := range req.Requirements {
+					if requirementErr := requirement.Validate(nil); requirementErr != nil {
+						writeError(w, http.StatusConflict, "CommitFailedException", "Requirement failed: "+requirementErr.Error())
+						return
+					}
+				}
+
+				location := fmt.Sprintf("s3://%s/%s/%s", bucketName, encodeNamespace(namespace), tableName)
+				tableUUID := generatedLegacyUUID
+				baseMetadataVersion := 0
+				baseMetadataLocation := ""
+
+				metadataBucket, metadataPath, parseLocationErr := parseS3Location(location)
+				if parseLocationErr != nil {
+					writeError(w, http.StatusInternalServerError, "InternalServerError", "Invalid table location: "+parseLocationErr.Error())
+					return
+				}
+
+				stagedFileName := "v1.metadata.json"
+				stagedMetadataBytes, loadErr := s.loadMetadataFile(r.Context(), metadataBucket, metadataPath, stagedFileName)
+				if loadErr == nil && len(stagedMetadataBytes) > 0 {
+					stagedMetadata, parseErr := table.ParseMetadataBytes(stagedMetadataBytes)
+					if parseErr != nil {
+						writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse staged metadata: "+parseErr.Error())
+						return
+					}
+					baseMetadataLocation = fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(location, "/"), stagedFileName)
+					baseMetadataVersion = parseMetadataVersionFromLocation(baseMetadataLocation)
+					if stagedMetadata.TableUUID() != uuid.Nil {
+						tableUUID = stagedMetadata.TableUUID()
+					}
+
+					builder, builderErr := table.MetadataBuilderFromBase(stagedMetadata, baseMetadataLocation)
+					if builderErr != nil {
+						writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to create metadata builder: "+builderErr.Error())
+						return
+					}
+					for _, update := range req.Updates {
+						if applyErr := update.Apply(builder); applyErr != nil {
+							writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply update: "+applyErr.Error())
+							return
+						}
+					}
+
+					newMetadata, buildErr := builder.Build()
+					if buildErr != nil {
+						writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to build new metadata: "+buildErr.Error())
+						return
+					}
+
+					metadataVersion := baseMetadataVersion + 1
+					metadataFileName := fmt.Sprintf("v%d.metadata.json", metadataVersion)
+					newMetadataLocation := fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(location, "/"), metadataFileName)
+
+					metadataBytes, marshalErr := json.Marshal(newMetadata)
+					if marshalErr != nil {
+						writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to serialize metadata: "+marshalErr.Error())
+						return
+					}
+					metadataBytes, marshalErr = applyStatisticsUpdates(metadataBytes, statisticsUpdates)
+					if marshalErr != nil {
+						writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply statistics updates: "+marshalErr.Error())
+						return
+					}
+					newMetadata, marshalErr = table.ParseMetadataBytes(metadataBytes)
+					if marshalErr != nil {
+						writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse committed metadata: "+marshalErr.Error())
+						return
+					}
+
+					if saveErr := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); saveErr != nil {
+						writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+saveErr.Error())
+						return
+					}
+
+					createReq := &s3tables.CreateTableRequest{
+						TableBucketARN: bucketARN,
+						Namespace:      namespace,
+						Name:           tableName,
+						Format:         "ICEBERG",
+						Metadata: &s3tables.TableMetadata{
+							Iceberg: &s3tables.IcebergMetadata{
+								TableUUID: tableUUID.String(),
+							},
+							FullMetadata: metadataBytes,
+						},
+						MetadataVersion:  metadataVersion,
+						MetadataLocation: newMetadataLocation,
+					}
+
+					createErr := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+						mgrClient := s3tables.NewManagerClient(client)
+						return s.tablesManager.Execute(r.Context(), mgrClient, "CreateTable", createReq, nil, identityName)
+					})
+					if createErr != nil {
+						if isS3TablesConflict(createErr) || isS3TablesAlreadyExists(createErr) {
+							// Lost a create race: the winner may have registered this same deterministic
+							// file name, so keep the file instead of deleting it out from under them.
+							writeError(w, http.StatusConflict, "CommitFailedException", "Table was created concurrently")
+							return
+						}
+						if cleanupErr := s.deleteMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName); cleanupErr != nil {
+							glog.V(1).Infof("Iceberg: failed to cleanup metadata file %s after create-on-commit failure: %v", newMetadataLocation, cleanupErr)
+						}
+						glog.Errorf("Iceberg: CommitTable CreateTable error: %v", createErr)
+						writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to commit table creation: "+createErr.Error())
+						return
+					}
+
+					result := CommitTableResponse{
+						MetadataLocation: newMetadataLocation,
+						Metadata:         newMetadata,
+					}
+					writeJSON(w, http.StatusOK, result)
+					return
+				}
+				if loadErr != nil && !errors.Is(loadErr, filer_pb.ErrNotFound) {
+					writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to load staged metadata: "+loadErr.Error())
+					return
+				}
+
+				currentMetadata := newTableMetadata(tableUUID, location, nil, nil, nil, nil)
+				if currentMetadata == nil {
+					writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to build current metadata")
+					return
+				}
+
+				builder, builderErr := table.MetadataBuilderFromBase(currentMetadata, baseMetadataLocation)
+				if builderErr != nil {
+					writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to create metadata builder: "+builderErr.Error())
+					return
+				}
+				for _, update := range req.Updates {
+					if applyErr := update.Apply(builder); applyErr != nil {
+						writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply update: "+applyErr.Error())
+						return
+					}
+				}
+
+				newMetadata, buildErr := builder.Build()
+				if buildErr != nil {
+					writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to build new metadata: "+buildErr.Error())
+					return
+				}
+				metadataVersion := 1
+				metadataFileName := fmt.Sprintf("v%d.metadata.json", metadataVersion)
+				newMetadataLocation := fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(location, "/"), metadataFileName)
+
+				metadataBytes, marshalErr := json.Marshal(newMetadata)
+				if marshalErr != nil {
+					writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to serialize metadata: "+marshalErr.Error())
+					return
+				}
+				metadataBytes, marshalErr = applyStatisticsUpdates(metadataBytes, statisticsUpdates)
+				if marshalErr != nil {
+					writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply statistics updates: "+marshalErr.Error())
+					return
+				}
+				newMetadata, marshalErr = table.ParseMetadataBytes(metadataBytes)
+				if marshalErr != nil {
+					writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse committed metadata: "+marshalErr.Error())
+					return
+				}
+
+				if saveErr := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); saveErr != nil {
+					writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+saveErr.Error())
+					return
+				}
+
+				createReq := &s3tables.CreateTableRequest{
+					TableBucketARN: bucketARN,
+					Namespace:      namespace,
+					Name:           tableName,
+					Format:         "ICEBERG",
+					Metadata: &s3tables.TableMetadata{
+						Iceberg: &s3tables.IcebergMetadata{
+							TableUUID: tableUUID.String(),
+						},
+						FullMetadata: metadataBytes,
+					},
+					MetadataVersion:  metadataVersion,
+					MetadataLocation: newMetadataLocation,
+				}
+				createErr := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+					mgrClient := s3tables.NewManagerClient(client)
+					return s.tablesManager.Execute(r.Context(), mgrClient, "CreateTable", createReq, nil, identityName)
+				})
+				if createErr != nil {
+					if isS3TablesConflict(createErr) || isS3TablesAlreadyExists(createErr) {
+						// Lost a create race: keep the file, the winner's table may point at this same path.
+						writeError(w, http.StatusConflict, "CommitFailedException", "Table was created concurrently")
+						return
+					}
+					if cleanupErr := s.deleteMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName); cleanupErr != nil {
+						glog.V(1).Infof("Iceberg: failed to cleanup metadata file %s after create-on-commit failure: %v", newMetadataLocation, cleanupErr)
+					}
+					glog.Errorf("Iceberg: CommitTable CreateTable error: %v", createErr)
+					writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to commit table creation: "+createErr.Error())
+					return
+				}
+
+				result := CommitTableResponse{
+					MetadataLocation: newMetadataLocation,
+					Metadata:         newMetadata,
+				}
+				writeJSON(w, http.StatusOK, result)
 				return
 			}
 			glog.V(1).Infof("Iceberg: CommitTable GetTable error: %v", err)
diff --git a/weed/s3api/iceberg/iceberg_stage_create_helpers_test.go b/weed/s3api/iceberg/iceberg_stage_create_helpers_test.go
new file mode 100644
index 000000000..2fae43f60
--- /dev/null
+++ b/weed/s3api/iceberg/iceberg_stage_create_helpers_test.go
@@ -0,0 +1,37 @@
+package iceberg
+
+import (
+	"testing"
+
+	"github.com/apache/iceberg-go/table"
+)
+
+func TestHasAssertCreateRequirement(t *testing.T) {
+	requirements := table.Requirements{table.AssertCreate()}
+	if !hasAssertCreateRequirement(requirements) {
+		t.Fatalf("hasAssertCreateRequirement() = false, want true")
+	}
+
+	requirements = table.Requirements{table.AssertDefaultSortOrderID(0)}
+	if hasAssertCreateRequirement(requirements) {
+		t.Fatalf("hasAssertCreateRequirement() = true, want false")
+	}
+}
+
+func TestParseMetadataVersionFromLocation(t *testing.T) {
+	testCases := []struct {
+		location string
+		version  int
+	}{
+		{location: "s3://b/ns/t/metadata/v1.metadata.json", version: 1},
+		{location: "s3://b/ns/t/metadata/v25.metadata.json", version: 25},
+		{location: "s3://b/ns/t/metadata/current.json", version: 0},
+		{location: "", version: 0},
+	}
+
+	for _, tc := range testCases {
+		if got := parseMetadataVersionFromLocation(tc.location); got != tc.version {
+			t.Fatalf("parseMetadataVersionFromLocation(%q) = %d, want %d", tc.location, got, tc.version)
+		}
+	}
+}