diff --git a/test/s3tables/catalog/iceberg_catalog_test.go b/test/s3tables/catalog/iceberg_catalog_test.go index 42efff29f..025a8bac7 100644 --- a/test/s3tables/catalog/iceberg_catalog_test.go +++ b/test/s3tables/catalog/iceberg_catalog_test.go @@ -3,6 +3,7 @@ package catalog import ( + "bytes" "context" "encoding/json" "fmt" @@ -340,6 +341,163 @@ func TestStageCreateMissingNameReturnsBadRequest(t *testing.T) { } } +// TestStageCreateAndFinalizeFlow verifies staged create remains invisible until assert-create commit finalizes table creation. +func TestStageCreateAndFinalizeFlow(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + env := NewTestEnvironment(t) + defer env.Cleanup(t) + + env.StartSeaweedFS(t) + createTableBucket(t, env, "warehouse") + + namespace := "stage_ns" + tableName := "orders" + + status, _, err := doIcebergJSONRequest(env, http.MethodPost, "/v1/namespaces", map[string]any{ + "namespace": []string{namespace}, + }) + if err != nil { + t.Fatalf("Create namespace request failed: %v", err) + } + if status != http.StatusOK && status != http.StatusConflict { + t.Fatalf("Create namespace status = %d, want 200 or 409", status) + } + + status, stageResp, err := doIcebergJSONRequest(env, http.MethodPost, fmt.Sprintf("/v1/namespaces/%s/tables", namespace), map[string]any{ + "name": tableName, + "stage-create": true, + }) + if err != nil { + t.Fatalf("Stage create request failed: %v", err) + } + if status != http.StatusOK { + t.Fatalf("Stage create status = %d, want 200", status) + } + stageLocation, _ := stageResp["metadata-location"].(string) + if !strings.HasSuffix(stageLocation, "/metadata/v1.metadata.json") { + t.Fatalf("stage metadata-location = %q, want suffix /metadata/v1.metadata.json", stageLocation) + } + + status, _, err = doIcebergJSONRequest(env, http.MethodGet, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName), nil) + if err != nil { + t.Fatalf("Load staged table request failed: %v", err) + } + if status != http.StatusNotFound { + t.Fatalf("Load staged table status = %d, want 404", status) + } + + status, commitResp, err := doIcebergJSONRequest(env, http.MethodPost, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName), map[string]any{ + "requirements": []map[string]any{ + {"type": "assert-create"}, + }, + "updates": []any{}, + }) + if err != nil { + t.Fatalf("Finalize commit request failed: %v", err) + } + if status != http.StatusOK { + t.Fatalf("Finalize commit status = %d, want 200", status) + } + commitLocation, _ := commitResp["metadata-location"].(string) + if !strings.HasSuffix(commitLocation, "/metadata/v2.metadata.json") { + t.Fatalf("final metadata-location = %q, want suffix /metadata/v2.metadata.json", commitLocation) + } + + status, loadResp, err := doIcebergJSONRequest(env, http.MethodGet, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName), nil) + if err != nil { + t.Fatalf("Load finalized table request failed: %v", err) + } + if status != http.StatusOK { + t.Fatalf("Load finalized table status = %d, want 200", status) + } + loadLocation, _ := loadResp["metadata-location"].(string) + if loadLocation != commitLocation { + t.Fatalf("loaded metadata-location = %q, want %q", loadLocation, commitLocation) + } +} + +// TestCommitMissingTableWithoutAssertCreate ensures missing-table commits still require assert-create for creation. +func TestCommitMissingTableWithoutAssertCreate(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + env := NewTestEnvironment(t) + defer env.Cleanup(t) + + env.StartSeaweedFS(t) + createTableBucket(t, env, "warehouse") + + namespace := "stage_missing_assert_ns" + tableName := "missing_table" + + status, _, err := doIcebergJSONRequest(env, http.MethodPost, "/v1/namespaces", map[string]any{ + "namespace": []string{namespace}, + }) + if err != nil { + t.Fatalf("Create namespace request failed: %v", err) + } + if status != http.StatusOK && status != http.StatusConflict { + t.Fatalf("Create namespace status = %d, want 200 or 409", status) + } + + status, _, err = doIcebergJSONRequest(env, http.MethodPost, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName), map[string]any{ + "requirements": []any{}, + "updates": []any{}, + }) + if err != nil { + t.Fatalf("Commit missing table request failed: %v", err) + } + if status != http.StatusNotFound { + t.Fatalf("Commit missing table status = %d, want 404", status) + } +} + +func doIcebergJSONRequest(env *TestEnvironment, method, path string, payload any) (int, map[string]any, error) { + url := env.IcebergURL() + path + + var bodyReader io.Reader + if payload != nil { + data, err := json.Marshal(payload) + if err != nil { + return 0, nil, err + } + bodyReader = bytes.NewReader(data) + } + + req, err := http.NewRequest(method, url, bodyReader) + if err != nil { + return 0, nil, err + } + if payload != nil { + req.Header.Set("Content-Type", "application/json") + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return 0, nil, err + } + defer resp.Body.Close() + + data, err := io.ReadAll(resp.Body) + if err != nil { + return resp.StatusCode, nil, err + } + + if len(data) == 0 { + return resp.StatusCode, nil, nil + } + + var decoded map[string]any + if err := json.Unmarshal(data, &decoded); err != nil { + return resp.StatusCode, nil, fmt.Errorf("failed to decode %s %s response: %w body=%s", method, path, err, string(data)) + } + return resp.StatusCode, decoded, nil +} + // createTableBucket creates a table bucket via the S3Tables REST API func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) { t.Helper()