diff --git a/test/s3tables/catalog/iceberg_catalog_test.go b/test/s3tables/catalog/iceberg_catalog_test.go index 025a8bac7..5210bed95 100644 --- a/test/s3tables/catalog/iceberg_catalog_test.go +++ b/test/s3tables/catalog/iceberg_catalog_test.go @@ -304,6 +304,15 @@ func TestStageCreateMissingNameReturnsBadRequest(t *testing.T) { env.StartSeaweedFS(t) createTableBucket(t, env, "warehouse") + status, _, err := doIcebergJSONRequest(env, http.MethodPost, "/v1/namespaces", map[string]any{ + "namespace": []string{"ns1"}, + }) + if err != nil { + t.Fatalf("Create namespace request failed: %v", err) + } + if status != http.StatusOK && status != http.StatusConflict { + t.Fatalf("Create namespace status = %d, want 200 or 409", status) + } reqBody := `{"stage-create": true}` url := fmt.Sprintf("%s/v1/namespaces/%s/tables", env.IcebergURL(), "ns1") @@ -324,13 +333,13 @@ func TestStageCreateMissingNameReturnsBadRequest(t *testing.T) { t.Fatalf("Expected status 400, got %d: %s", resp.StatusCode, body) } - var decoded map[string]map[string]any + var decoded map[string]any if err := json.NewDecoder(resp.Body).Decode(&decoded); err != nil { t.Fatalf("Failed to decode error response: %v", err) } - errorObj, ok := decoded["error"] + errorObj, ok := decoded["error"].(map[string]any) if !ok { - t.Fatalf("Response missing error object: %#v", decoded) + t.Fatalf("Response missing or invalid error object: %#v", decoded) } if got := errorObj["type"]; got != "BadRequestException" { t.Fatalf("error.type = %v, want BadRequestException", got) @@ -456,6 +465,7 @@ func TestCommitMissingTableWithoutAssertCreate(t *testing.T) { } } +// doIcebergJSONRequest decodes JSON object responses used by catalog tests. func doIcebergJSONRequest(env *TestEnvironment, method, path string, payload any) (int, map[string]any, error) { url := env.IcebergURL() + path diff --git a/weed/s3api/iceberg/iceberg.go b/weed/s3api/iceberg/iceberg.go index c10d08784..eab2b89fa 100644 --- a/weed/s3api/iceberg/iceberg.go +++ b/weed/s3api/iceberg/iceberg.go @@ -309,6 +309,9 @@ func (s *Server) loadMetadataFile(ctx context.Context, bucketName, tablePath, me if err != nil { return err } + if resp == nil || resp.Entry == nil { + return fmt.Errorf("lookup returned nil entry for %s/%s", metadataDir, metadataFileName) + } content = append([]byte(nil), resp.Entry.Content...) return nil }) @@ -502,6 +505,30 @@ type stageCreateMarker struct { ExpiresAt string `json:"expires_at"` } +type icebergRequestError struct { + status int + errType string + message string +} + +func (e *icebergRequestError) Error() string { + return e.message +} + +type createOnCommitInput struct { + bucketARN string + namespace []string + tableName string + identityName string + location string + tableUUID uuid.UUID + baseMetadata table.Metadata + baseMetadataLoc string + baseMetadataVer int + updates table.Updates + statisticsUpdates []statisticsUpdate +} + type commitAction struct { Action string `json:"action"` } @@ -717,6 +744,129 @@ func parseMetadataVersionFromLocation(metadataLocation string) int { return version } +func (s *Server) finalizeCreateOnCommit(ctx context.Context, input createOnCommitInput) (*CommitTableResponse, *icebergRequestError) { + builder, err := table.MetadataBuilderFromBase(input.baseMetadata, input.baseMetadataLoc) + if err != nil { + return nil, &icebergRequestError{ + status: http.StatusInternalServerError, + errType: "InternalServerError", + message: "Failed to create metadata builder: " + err.Error(), + } + } + for _, update := range input.updates { + if err := update.Apply(builder); err != nil { + return nil, &icebergRequestError{ + status: http.StatusBadRequest, + errType: "BadRequestException", + message: "Failed to apply update: " + err.Error(), + } + } + } + + newMetadata, err := builder.Build() + if err != nil { + return nil, &icebergRequestError{ + status: http.StatusBadRequest, + errType: "BadRequestException", + message: "Failed to build new metadata: " + err.Error(), + } + } + + metadataVersion := input.baseMetadataVer + 1 + if metadataVersion <= 0 { + metadataVersion = 1 + } + metadataFileName := fmt.Sprintf("v%d.metadata.json", metadataVersion) + newMetadataLocation := fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(input.location, "/"), metadataFileName) + + metadataBytes, err := json.Marshal(newMetadata) + if err != nil { + return nil, &icebergRequestError{ + status: http.StatusInternalServerError, + errType: "InternalServerError", + message: "Failed to serialize metadata: " + err.Error(), + } + } + metadataBytes, err = applyStatisticsUpdates(metadataBytes, input.statisticsUpdates) + if err != nil { + return nil, &icebergRequestError{ + status: http.StatusBadRequest, + errType: "BadRequestException", + message: "Failed to apply statistics updates: " + err.Error(), + } + } + newMetadata, err = table.ParseMetadataBytes(metadataBytes) + if err != nil { + return nil, &icebergRequestError{ + status: http.StatusInternalServerError, + errType: "InternalServerError", + message: "Failed to parse committed metadata: " + err.Error(), + } + } + + metadataBucket, metadataPath, err := parseS3Location(input.location) + if err != nil { + return nil, &icebergRequestError{ + status: http.StatusInternalServerError, + errType: "InternalServerError", + message: "Invalid table location: " + err.Error(), + } + } + if err := s.saveMetadataFile(ctx, metadataBucket, metadataPath, metadataFileName, metadataBytes); err != nil { + return nil, &icebergRequestError{ + status: http.StatusInternalServerError, + errType: "InternalServerError", + message: "Failed to save metadata file: " + err.Error(), + } + } + + createReq := &s3tables.CreateTableRequest{ + TableBucketARN: input.bucketARN, + Namespace: input.namespace, + Name: input.tableName, + Format: "ICEBERG", + Metadata: &s3tables.TableMetadata{ + Iceberg: &s3tables.IcebergMetadata{ + TableUUID: input.tableUUID.String(), + }, + FullMetadata: metadataBytes, + }, + MetadataVersion: metadataVersion, + MetadataLocation: newMetadataLocation, + } + createErr := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + mgrClient := s3tables.NewManagerClient(client) + return s.tablesManager.Execute(ctx, mgrClient, "CreateTable", createReq, nil, input.identityName) + }) + if createErr != nil { + if cleanupErr := s.deleteMetadataFile(ctx, metadataBucket, metadataPath, metadataFileName); cleanupErr != nil { + glog.V(1).Infof("Iceberg: failed to cleanup metadata file %s after create-on-commit failure: %v", newMetadataLocation, cleanupErr) + } + if isS3TablesConflict(createErr) || isS3TablesAlreadyExists(createErr) { + return nil, &icebergRequestError{ + status: http.StatusConflict, + errType: "CommitFailedException", + message: "Table was created concurrently", + } + } + glog.Errorf("Iceberg: CommitTable CreateTable error: %v", createErr) + return nil, &icebergRequestError{ + status: http.StatusInternalServerError, + errType: "InternalServerError", + message: "Failed to commit table creation: " + createErr.Error(), + } + } + + if markerErr := s.deleteStageCreateMarkers(ctx, metadataBucket, input.namespace, input.tableName); markerErr != nil { + glog.V(1).Infof("Iceberg: failed to cleanup stage-create markers for %s.%s after finalize: %v", encodeNamespace(input.namespace), input.tableName, markerErr) + } + + return &CommitTableResponse{ + MetadataLocation: newMetadataLocation, + Metadata: newMetadata, + }, nil +} + func validateCreateTableRequest(req CreateTableRequest) error { if req.Name == "" { return errTableNameRequired @@ -1590,14 +1740,14 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { tableUUID := generatedLegacyUUID baseMetadataVersion := 0 baseMetadataLocation := "" + var baseMetadata table.Metadata + stagedFileName := "v1.metadata.json" metadataBucket, metadataPath, parseLocationErr := parseS3Location(location) if parseLocationErr != nil { writeError(w, http.StatusInternalServerError, "InternalServerError", "Invalid table location: "+parseLocationErr.Error()) return } - - stagedFileName := "v1.metadata.json" stagedMetadataBytes, loadErr := s.loadMetadataFile(r.Context(), metadataBucket, metadataPath, stagedFileName) if loadErr == nil && len(stagedMetadataBytes) > 0 { stagedMetadata, parseErr := table.ParseMetadataBytes(stagedMetadataBytes) @@ -1605,191 +1755,43 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse staged metadata: "+parseErr.Error()) return } + baseMetadata = stagedMetadata baseMetadataLocation = fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(location, "/"), stagedFileName) baseMetadataVersion = parseMetadataVersionFromLocation(baseMetadataLocation) if stagedMetadata.TableUUID() != uuid.Nil { tableUUID = stagedMetadata.TableUUID() } - - builder, builderErr := table.MetadataBuilderFromBase(stagedMetadata, baseMetadataLocation) - if builderErr != nil { - writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to create metadata builder: "+builderErr.Error()) - return - } - for _, update := range req.Updates { - if applyErr := update.Apply(builder); applyErr != nil { - writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply update: "+applyErr.Error()) - return - } - } - - newMetadata, buildErr := builder.Build() - if buildErr != nil { - writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to build new metadata: "+buildErr.Error()) - return - } - - metadataVersion := baseMetadataVersion + 1 - if metadataVersion <= 0 { - metadataVersion = 1 - } - metadataFileName := fmt.Sprintf("v%d.metadata.json", metadataVersion) - newMetadataLocation := fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(location, "/"), metadataFileName) - - metadataBytes, marshalErr := json.Marshal(newMetadata) - if marshalErr != nil { - writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to serialize metadata: "+marshalErr.Error()) - return - } - metadataBytes, marshalErr = applyStatisticsUpdates(metadataBytes, statisticsUpdates) - if marshalErr != nil { - writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply statistics updates: "+marshalErr.Error()) - return - } - newMetadata, marshalErr = table.ParseMetadataBytes(metadataBytes) - if marshalErr != nil { - writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse committed metadata: "+marshalErr.Error()) - return - } - - if saveErr := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); saveErr != nil { - writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+saveErr.Error()) - return - } - - createReq := &s3tables.CreateTableRequest{ - TableBucketARN: bucketARN, - Namespace: namespace, - Name: tableName, - Format: "ICEBERG", - Metadata: &s3tables.TableMetadata{ - Iceberg: &s3tables.IcebergMetadata{ - TableUUID: tableUUID.String(), - }, - FullMetadata: metadataBytes, - }, - MetadataVersion: metadataVersion, - MetadataLocation: newMetadataLocation, - } - - createErr := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "CreateTable", createReq, nil, identityName) - }) - if createErr != nil { - if cleanupErr := s.deleteMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName); cleanupErr != nil { - glog.V(1).Infof("Iceberg: failed to cleanup metadata file %s after create-on-commit failure: %v", newMetadataLocation, cleanupErr) - } - if isS3TablesConflict(createErr) || isS3TablesAlreadyExists(createErr) { - writeError(w, http.StatusConflict, "CommitFailedException", "Table was created concurrently") - return - } - glog.Errorf("Iceberg: CommitTable CreateTable error: %v", createErr) - writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to commit table creation: "+createErr.Error()) - return - } - if markerErr := s.deleteStageCreateMarkers(r.Context(), metadataBucket, namespace, tableName); markerErr != nil { - glog.V(1).Infof("Iceberg: failed to cleanup stage-create markers for %s.%s after finalize: %v", encodeNamespace(namespace), tableName, markerErr) - } - - result := CommitTableResponse{ - MetadataLocation: newMetadataLocation, - Metadata: newMetadata, - } - writeJSON(w, http.StatusOK, result) - return } if loadErr != nil && !errors.Is(loadErr, filer_pb.ErrNotFound) { writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to load staged metadata: "+loadErr.Error()) return } - currentMetadata := newTableMetadata(tableUUID, location, nil, nil, nil, nil) - if currentMetadata == nil { - writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to build current metadata") - return - } - - builder, builderErr := table.MetadataBuilderFromBase(currentMetadata, baseMetadataLocation) - if builderErr != nil { - writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to create metadata builder: "+builderErr.Error()) - return - } - for _, update := range req.Updates { - if applyErr := update.Apply(builder); applyErr != nil { - writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply update: "+applyErr.Error()) + if baseMetadata == nil { + baseMetadata = newTableMetadata(tableUUID, location, nil, nil, nil, nil) + if baseMetadata == nil { + writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to build current metadata") return } } - newMetadata, buildErr := builder.Build() - if buildErr != nil { - writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to build new metadata: "+buildErr.Error()) - return - } - metadataVersion := 1 - metadataFileName := fmt.Sprintf("v%d.metadata.json", metadataVersion) - newMetadataLocation := fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(location, "/"), metadataFileName) - - metadataBytes, marshalErr := json.Marshal(newMetadata) - if marshalErr != nil { - writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to serialize metadata: "+marshalErr.Error()) - return - } - metadataBytes, marshalErr = applyStatisticsUpdates(metadataBytes, statisticsUpdates) - if marshalErr != nil { - writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply statistics updates: "+marshalErr.Error()) - return - } - newMetadata, marshalErr = table.ParseMetadataBytes(metadataBytes) - if marshalErr != nil { - writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse committed metadata: "+marshalErr.Error()) - return - } - - if saveErr := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); saveErr != nil { - writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+saveErr.Error()) - return - } - - createReq := &s3tables.CreateTableRequest{ - TableBucketARN: bucketARN, - Namespace: namespace, - Name: tableName, - Format: "ICEBERG", - Metadata: &s3tables.TableMetadata{ - Iceberg: &s3tables.IcebergMetadata{ - TableUUID: tableUUID.String(), - }, - FullMetadata: metadataBytes, - }, - MetadataVersion: metadataVersion, - MetadataLocation: newMetadataLocation, - } - createErr := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "CreateTable", createReq, nil, identityName) + result, reqErr := s.finalizeCreateOnCommit(r.Context(), createOnCommitInput{ + bucketARN: bucketARN, + namespace: namespace, + tableName: tableName, + identityName: identityName, + location: location, + tableUUID: tableUUID, + baseMetadata: baseMetadata, + baseMetadataLoc: baseMetadataLocation, + baseMetadataVer: baseMetadataVersion, + updates: req.Updates, + statisticsUpdates: statisticsUpdates, }) - if createErr != nil { - if cleanupErr := s.deleteMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName); cleanupErr != nil { - glog.V(1).Infof("Iceberg: failed to cleanup metadata file %s after create-on-commit failure: %v", newMetadataLocation, cleanupErr) - } - if isS3TablesConflict(createErr) || isS3TablesAlreadyExists(createErr) { - writeError(w, http.StatusConflict, "CommitFailedException", "Table was created concurrently") - return - } - glog.Errorf("Iceberg: CommitTable CreateTable error: %v", createErr) - writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to commit table creation: "+createErr.Error()) + if reqErr != nil { + writeError(w, reqErr.status, reqErr.errType, reqErr.message) return } - if markerErr := s.deleteStageCreateMarkers(r.Context(), metadataBucket, namespace, tableName); markerErr != nil { - glog.V(1).Infof("Iceberg: failed to cleanup stage-create markers for %s.%s after finalize: %v", encodeNamespace(namespace), tableName, markerErr) - } - - result := CommitTableResponse{ - MetadataLocation: newMetadataLocation, - Metadata: newMetadata, - } writeJSON(w, http.StatusOK, result) return } diff --git a/weed/s3api/iceberg/iceberg_stage_create_helpers_test.go b/weed/s3api/iceberg/iceberg_stage_create_helpers_test.go index d44910fe3..bbf2d7897 100644 --- a/weed/s3api/iceberg/iceberg_stage_create_helpers_test.go +++ b/weed/s3api/iceberg/iceberg_stage_create_helpers_test.go @@ -31,9 +31,11 @@ func TestParseMetadataVersionFromLocation(t *testing.T) { } for _, tc := range testCases { - if got := parseMetadataVersionFromLocation(tc.location); got != tc.version { - t.Fatalf("parseMetadataVersionFromLocation(%q) = %d, want %d", tc.location, got, tc.version) - } + t.Run(tc.location, func(t *testing.T) { + if got := parseMetadataVersionFromLocation(tc.location); got != tc.version { + t.Errorf("parseMetadataVersionFromLocation(%q) = %d, want %d", tc.location, got, tc.version) + } + }) } }