|
|
@ -3,6 +3,7 @@ |
|
|
package catalog |
|
|
package catalog |
|
|
|
|
|
|
|
|
import ( |
|
|
import ( |
|
|
|
|
|
"bytes" |
|
|
"context" |
|
|
"context" |
|
|
"encoding/json" |
|
|
"encoding/json" |
|
|
"fmt" |
|
|
"fmt" |
|
|
@ -340,6 +341,163 @@ func TestStageCreateMissingNameReturnsBadRequest(t *testing.T) { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// TestStageCreateAndFinalizeFlow verifies staged create remains invisible until assert-create commit finalizes table creation.
|
|
|
|
|
|
func TestStageCreateAndFinalizeFlow(t *testing.T) { |
|
|
|
|
|
if testing.Short() { |
|
|
|
|
|
t.Skip("Skipping integration test in short mode") |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
env := NewTestEnvironment(t) |
|
|
|
|
|
defer env.Cleanup(t) |
|
|
|
|
|
|
|
|
|
|
|
env.StartSeaweedFS(t) |
|
|
|
|
|
createTableBucket(t, env, "warehouse") |
|
|
|
|
|
|
|
|
|
|
|
namespace := "stage_ns" |
|
|
|
|
|
tableName := "orders" |
|
|
|
|
|
|
|
|
|
|
|
status, _, err := doIcebergJSONRequest(env, http.MethodPost, "/v1/namespaces", map[string]any{ |
|
|
|
|
|
"namespace": []string{namespace}, |
|
|
|
|
|
}) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
t.Fatalf("Create namespace request failed: %v", err) |
|
|
|
|
|
} |
|
|
|
|
|
if status != http.StatusOK && status != http.StatusConflict { |
|
|
|
|
|
t.Fatalf("Create namespace status = %d, want 200 or 409", status) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
status, stageResp, err := doIcebergJSONRequest(env, http.MethodPost, fmt.Sprintf("/v1/namespaces/%s/tables", namespace), map[string]any{ |
|
|
|
|
|
"name": tableName, |
|
|
|
|
|
"stage-create": true, |
|
|
|
|
|
}) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
t.Fatalf("Stage create request failed: %v", err) |
|
|
|
|
|
} |
|
|
|
|
|
if status != http.StatusOK { |
|
|
|
|
|
t.Fatalf("Stage create status = %d, want 200", status) |
|
|
|
|
|
} |
|
|
|
|
|
stageLocation, _ := stageResp["metadata-location"].(string) |
|
|
|
|
|
if !strings.HasSuffix(stageLocation, "/metadata/v1.metadata.json") { |
|
|
|
|
|
t.Fatalf("stage metadata-location = %q, want suffix /metadata/v1.metadata.json", stageLocation) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
status, _, err = doIcebergJSONRequest(env, http.MethodGet, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName), nil) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
t.Fatalf("Load staged table request failed: %v", err) |
|
|
|
|
|
} |
|
|
|
|
|
if status != http.StatusNotFound { |
|
|
|
|
|
t.Fatalf("Load staged table status = %d, want 404", status) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
status, commitResp, err := doIcebergJSONRequest(env, http.MethodPost, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName), map[string]any{ |
|
|
|
|
|
"requirements": []map[string]any{ |
|
|
|
|
|
{"type": "assert-create"}, |
|
|
|
|
|
}, |
|
|
|
|
|
"updates": []any{}, |
|
|
|
|
|
}) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
t.Fatalf("Finalize commit request failed: %v", err) |
|
|
|
|
|
} |
|
|
|
|
|
if status != http.StatusOK { |
|
|
|
|
|
t.Fatalf("Finalize commit status = %d, want 200", status) |
|
|
|
|
|
} |
|
|
|
|
|
commitLocation, _ := commitResp["metadata-location"].(string) |
|
|
|
|
|
if !strings.HasSuffix(commitLocation, "/metadata/v2.metadata.json") { |
|
|
|
|
|
t.Fatalf("final metadata-location = %q, want suffix /metadata/v2.metadata.json", commitLocation) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
status, loadResp, err := doIcebergJSONRequest(env, http.MethodGet, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName), nil) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
t.Fatalf("Load finalized table request failed: %v", err) |
|
|
|
|
|
} |
|
|
|
|
|
if status != http.StatusOK { |
|
|
|
|
|
t.Fatalf("Load finalized table status = %d, want 200", status) |
|
|
|
|
|
} |
|
|
|
|
|
loadLocation, _ := loadResp["metadata-location"].(string) |
|
|
|
|
|
if loadLocation != commitLocation { |
|
|
|
|
|
t.Fatalf("loaded metadata-location = %q, want %q", loadLocation, commitLocation) |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// TestCommitMissingTableWithoutAssertCreate ensures missing-table commits still require assert-create for creation.
|
|
|
|
|
|
func TestCommitMissingTableWithoutAssertCreate(t *testing.T) { |
|
|
|
|
|
if testing.Short() { |
|
|
|
|
|
t.Skip("Skipping integration test in short mode") |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
env := NewTestEnvironment(t) |
|
|
|
|
|
defer env.Cleanup(t) |
|
|
|
|
|
|
|
|
|
|
|
env.StartSeaweedFS(t) |
|
|
|
|
|
createTableBucket(t, env, "warehouse") |
|
|
|
|
|
|
|
|
|
|
|
namespace := "stage_missing_assert_ns" |
|
|
|
|
|
tableName := "missing_table" |
|
|
|
|
|
|
|
|
|
|
|
status, _, err := doIcebergJSONRequest(env, http.MethodPost, "/v1/namespaces", map[string]any{ |
|
|
|
|
|
"namespace": []string{namespace}, |
|
|
|
|
|
}) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
t.Fatalf("Create namespace request failed: %v", err) |
|
|
|
|
|
} |
|
|
|
|
|
if status != http.StatusOK && status != http.StatusConflict { |
|
|
|
|
|
t.Fatalf("Create namespace status = %d, want 200 or 409", status) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
status, _, err = doIcebergJSONRequest(env, http.MethodPost, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName), map[string]any{ |
|
|
|
|
|
"requirements": []any{}, |
|
|
|
|
|
"updates": []any{}, |
|
|
|
|
|
}) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
t.Fatalf("Commit missing table request failed: %v", err) |
|
|
|
|
|
} |
|
|
|
|
|
if status != http.StatusNotFound { |
|
|
|
|
|
t.Fatalf("Commit missing table status = %d, want 404", status) |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func doIcebergJSONRequest(env *TestEnvironment, method, path string, payload any) (int, map[string]any, error) { |
|
|
|
|
|
url := env.IcebergURL() + path |
|
|
|
|
|
|
|
|
|
|
|
var bodyReader io.Reader |
|
|
|
|
|
if payload != nil { |
|
|
|
|
|
data, err := json.Marshal(payload) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return 0, nil, err |
|
|
|
|
|
} |
|
|
|
|
|
bodyReader = bytes.NewReader(data) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
req, err := http.NewRequest(method, url, bodyReader) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return 0, nil, err |
|
|
|
|
|
} |
|
|
|
|
|
if payload != nil { |
|
|
|
|
|
req.Header.Set("Content-Type", "application/json") |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
resp, err := http.DefaultClient.Do(req) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return 0, nil, err |
|
|
|
|
|
} |
|
|
|
|
|
defer resp.Body.Close() |
|
|
|
|
|
|
|
|
|
|
|
data, err := io.ReadAll(resp.Body) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return resp.StatusCode, nil, err |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if len(data) == 0 { |
|
|
|
|
|
return resp.StatusCode, nil, nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
var decoded map[string]any |
|
|
|
|
|
if err := json.Unmarshal(data, &decoded); err != nil { |
|
|
|
|
|
return resp.StatusCode, nil, fmt.Errorf("failed to decode %s %s response: %w body=%s", method, path, err, string(data)) |
|
|
|
|
|
} |
|
|
|
|
|
return resp.StatusCode, decoded, nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// createTableBucket creates a table bucket via the S3Tables REST API
|
|
|
// createTableBucket creates a table bucket via the S3Tables REST API
|
|
|
func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) { |
|
|
func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) { |
|
|
t.Helper() |
|
|
t.Helper() |
|
|
|