From 403592bb9fa63cc6abcf7e865e809494ff4965dd Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 8 Feb 2026 10:03:53 -0800 Subject: [PATCH] Add Spark Iceberg catalog integration tests and CI support (#8242) * Add Spark Iceberg catalog integration tests and CI support Implement comprehensive integration tests for Spark with SeaweedFS Iceberg REST catalog: - Basic CRUD operations (Create, Read, Update, Delete) on Iceberg tables - Namespace (database) management - Data insertion, querying, and deletion - Time travel capabilities via snapshot versioning - Compatible with SeaweedFS S3 and Iceberg REST endpoints Tests mirror the structure of existing Trino integration tests but use Spark's Python SQL API and PySpark for testing. Add GitHub Actions CI job for spark-iceberg-catalog-tests in s3-tables-tests.yml to automatically run Spark integration tests on pull requests. * fmt * Fix Spark integration tests - code review feedback * go mod tidy * Add go mod tidy step to integration test jobs Add 'go mod tidy' step before test runs for all integration test jobs: - s3-tables-tests - iceberg-catalog-tests - trino-iceberg-catalog-tests - spark-iceberg-catalog-tests This ensures dependencies are clean before running tests. * Fix remaining Spark operations test issues Address final code review comments: Setup & Initialization: - Add waitForSparkReady() helper function that polls Spark readiness with backoff instead of hardcoded 10-second sleep - Extract setupSparkTestEnv() helper to reduce boilerplate duplication between TestSparkCatalogBasicOperations and TestSparkTimeTravel - Both tests now use helpers for consistent, reliable setup Assertions & Validation: - Make setup-critical operations (namespace, table creation, initial insert) use t.Fatalf instead of t.Errorf to fail fast - Validate setupSQL output in TestSparkTimeTravel and fail if not 'Setup complete' - Add validation after second INSERT in TestSparkTimeTravel: verify row count increased to 2 before time travel test - Add context to error messages with namespace and tableName params Code Quality: - Remove code duplication between test functions - All critical paths now properly validated - Consistent error handling throughout * Fix go vet errors in S3 Tables tests Fixes: 1. setup_test.go (Spark): - Add missing import: github.com/testcontainers/testcontainers-go/wait - Use wait.ForLog instead of undefined testcontainers.NewLogStrategy - Remove unused strings import 2. trino_catalog_test.go: - Use net.JoinHostPort instead of fmt.Sprintf for address formatting - Properly handles IPv6 addresses by wrapping them in brackets * Use weed mini for simpler SeaweedFS startup Replace complex multi-process startup (master, volume, filer, s3) with single 'weed mini' command that starts all services together. Benefits: - Simpler, more reliable startup - Single weed mini process vs 4 separate processes - Automatic coordination between components - Better port management with no manual coordination Changes: - Remove separate master, volume, filer process startup - Use weed mini with -master.port, -filer.port, -s3.port flags - Keep Iceberg REST as separate service (still needed) - Increase timeout to 15s for port readiness (weed mini startup) - Remove volumePort and filerProcess fields from TestEnvironment - Simplify cleanup to only handle two processes (mini, iceberg rest) * Clean up dead code and temp directory leaks Fixes: 1. 
Remove dead s3Process field and cleanup: - weed mini bundles the S3 gateway, so no separate process is needed - Removed s3Process field from TestEnvironment - Removed unnecessary s3Process cleanup code 2. Fix temp config directory leak: - Add sparkConfigDir field to TestEnvironment - Store returned configDir in writeSparkConfig - Clean up sparkConfigDir in Cleanup() with os.RemoveAll - Prevents accumulation of temp directories in test runs 3. Simplify Cleanup: - Now handles only necessary processes (weed mini, iceberg rest) - Removes both seaweedfsDataDir and sparkConfigDir - Cleaner shutdown sequence * Use weed mini's built-in Iceberg REST and fix python binary Changes: - Add -s3.port.iceberg flag to weed mini for built-in Iceberg REST Catalog - Remove separate 'weed server' process for Iceberg REST - Remove icebergRestProcess field from TestEnvironment - Simplify Cleanup() to only manage weed mini + Spark - Add port readiness check for iceberg REST from weed mini - Set Spark container Cmd to '/bin/sh -c sleep 3600' to keep it running - Change python to python3 in container.Exec calls This simplifies to truly one all-in-one weed mini process (master, filer, s3, iceberg-rest) plus just the Spark container. * go fmt * clean up * Bind on a non-loopback IP for container access, align Iceberg metadata saves and locations with table locations, and rework Spark time travel to use TIMESTAMP AS OF with safe timestamp extraction. * shared mini start * Fixed internal directory creation under /buckets so .objects paths can auto-create without failing bucket-name validation, which restores table bucket object writes * fix path: Updated table bucket objects to write under `/buckets/` and saved Iceberg metadata there, adjusting the Spark time-travel timestamp to committed_at +1s. Rebuilt the weed binary (`go install ./weed`) and confirmed passing tests for Spark and Trino with focused test commands. * Updated table bucket creation to stop creating /buckets/.objects and switched Trino REST warehouse to s3:// to match Iceberg layout. 
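As an illustration of the reworked time travel described above, here is a minimal PySpark sketch (assuming an already-configured SparkSession `spark` with the REST catalog registered as `iceberg`, and a hypothetical table `iceberg.db.tbl`; the tests in this patch generate random namespace and table names): read the newest snapshot's committed_at, shift it forward by one second, and query the table TIMESTAMP AS OF that instant.

    # Hedged sketch, not the exact test code: extract the latest snapshot
    # timestamp from the Iceberg snapshots metadata table, then time travel.
    from datetime import timedelta

    ts = spark.sql(
        "SELECT committed_at FROM iceberg.db.tbl.snapshots "
        "ORDER BY committed_at DESC LIMIT 1"
    ).collect()[0]["committed_at"]

    # +1s so the timestamp falls after the first commit but before the next insert.
    as_of = (ts + timedelta(seconds=1)).strftime("%Y-%m-%d %H:%M:%S")
    count = spark.sql(
        f"SELECT COUNT(*) AS c FROM iceberg.db.tbl TIMESTAMP AS OF '{as_of}'"
    ).collect()[0]["c"]
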
* Stabilize S3Tables integration tests * Fix timestamp extraction and remove dead code in bucketDir * Use table bucket as warehouse in s3tables tests * Update trino_blog_operations_test.go * adds the CASCADE option to handle any remaining table metadata/files in the schema directory * skip namespace not empty --- .github/workflows/s3-tables-tests.yml | 75 ++++ go.mod | 23 + go.sum | 4 + test/s3tables/catalog_spark/setup_test.go | 395 ++++++++++++++++++ .../catalog_spark/spark_operations_test.go | 279 +++++++++++++ .../trino_blog_operations_test.go | 90 +++- .../catalog_trino/trino_catalog_test.go | 68 +-- .../trino_crud_operations_test.go | 3 +- test/s3tables/testutil/weed_mini.go | 56 +++ weed/filer/filer.go | 6 +- weed/s3api/bucket_paths.go | 3 - weed/s3api/iceberg/iceberg.go | 153 +++++-- weed/s3api/s3api_object_handlers_list.go | 1 - weed/s3api/s3tables/handler_bucket_create.go | 8 - weed/s3api/s3tables/handler_namespace.go | 32 +- weed/s3api/s3tables/handler_table.go | 26 +- 16 files changed, 1108 insertions(+), 114 deletions(-) create mode 100644 test/s3tables/catalog_spark/setup_test.go create mode 100644 test/s3tables/catalog_spark/spark_operations_test.go create mode 100644 test/s3tables/testutil/weed_mini.go diff --git a/.github/workflows/s3-tables-tests.yml b/.github/workflows/s3-tables-tests.yml index 1b1d0763d..5f6ee7b91 100644 --- a/.github/workflows/s3-tables-tests.yml +++ b/.github/workflows/s3-tables-tests.yml @@ -27,6 +27,9 @@ jobs: go-version-file: 'go.mod' id: go + - name: Run go mod tidy + run: go mod tidy + - name: Install SeaweedFS run: | go install -buildvcs=false ./weed @@ -84,6 +87,9 @@ jobs: go-version-file: 'go.mod' id: go + - name: Run go mod tidy + run: go mod tidy + - name: Run Iceberg Catalog Integration Tests timeout-minutes: 25 working-directory: test/s3tables/catalog @@ -143,6 +149,9 @@ jobs: - name: Pre-pull Trino image run: docker pull trinodb/trino:479 + - name: Run go mod tidy + run: go mod tidy + - name: Install SeaweedFS run: | go install -buildvcs=false ./weed @@ -185,6 +194,72 @@ jobs: path: test/s3tables/catalog_trino/test-output.log retention-days: 3 + spark-iceberg-catalog-tests: + name: Spark Iceberg Catalog Integration Tests + runs-on: ubuntu-22.04 + timeout-minutes: 30 + + steps: + - name: Check out code + uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' + id: go + + - name: Set up Docker + uses: docker/setup-buildx-action@v3 + + - name: Pre-pull Spark image + run: docker pull apache/spark:3.5.1 + + - name: Run go mod tidy + run: go mod tidy + + - name: Install SeaweedFS + run: | + go install -buildvcs=false ./weed + + - name: Run Spark Iceberg Catalog Integration Tests + timeout-minutes: 25 + working-directory: test/s3tables/catalog_spark + run: | + set -x + set -o pipefail + echo "=== System Information ===" + uname -a + free -h + df -h + echo "=== Starting Spark Iceberg Catalog Tests ===" + + # Run Spark + Iceberg catalog integration tests + go test -v -timeout 20m . 
2>&1 | tee test-output.log || { + echo "Spark Iceberg catalog integration tests failed" + exit 1 + } + + - name: Show test output on failure + if: failure() + working-directory: test/s3tables/catalog_spark + run: | + echo "=== Test Output ===" + if [ -f test-output.log ]; then + tail -200 test-output.log + fi + + echo "=== Process information ===" + ps aux | grep -E "(weed|test|docker)" || true + + - name: Upload test logs on failure + if: failure() + uses: actions/upload-artifact@v6 + with: + name: spark-iceberg-catalog-test-logs + path: test/s3tables/catalog_spark/test-output.log + retention-days: 3 + s3-tables-build-verification: name: S3 Tables Build Verification runs-on: ubuntu-22.04 diff --git a/go.mod b/go.mod index a244a069e..bf6a0ca37 100644 --- a/go.mod +++ b/go.mod @@ -155,6 +155,7 @@ require ( github.com/seaweedfs/go-fuse/v2 v2.9.1 github.com/shirou/gopsutil/v4 v4.26.1 github.com/tarantool/go-tarantool/v2 v2.4.1 + github.com/testcontainers/testcontainers-go v0.39.0 github.com/tikv/client-go/v2 v2.0.7 github.com/xeipuuv/gojsonschema v1.2.0 github.com/ydb-platform/ydb-go-sdk-auth-environ v0.5.1 @@ -174,7 +175,9 @@ require ( atomicgo.dev/schedule v0.1.0 // indirect cloud.google.com/go/longrunning v0.7.0 // indirect cloud.google.com/go/pubsub/v2 v2.2.1 // indirect + dario.cat/mergo v1.0.2 // indirect github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.7.1 // indirect + github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect github.com/Azure/go-autorest v14.2.0+incompatible // indirect github.com/Azure/go-autorest/autorest/to v0.4.1 // indirect github.com/a1ex3/zstd-seekable-format-go/pkg v0.10.0 // indirect @@ -199,8 +202,17 @@ require ( github.com/cockroachdb/redact v1.1.5 // indirect github.com/cockroachdb/version v0.0.0-20250314144055-3860cd14adf2 // indirect github.com/containerd/console v1.0.5 // indirect + github.com/containerd/errdefs v1.0.0 // indirect + github.com/containerd/errdefs/pkg v0.3.0 // indirect + github.com/containerd/log v0.1.0 // indirect + github.com/containerd/platforms v1.0.0-rc.1 // indirect + github.com/cpuguy83/dockercfg v0.3.2 // indirect github.com/dave/dst v0.27.2 // indirect github.com/diskfs/go-diskfs v1.7.0 // indirect + github.com/distribution/reference v0.6.0 // indirect + github.com/docker/docker v28.5.0+incompatible // indirect + github.com/docker/go-connections v0.6.0 // indirect + github.com/docker/go-units v0.5.0 // indirect github.com/go-asn1-ber/asn1-ber v1.5.8-0.20250403174932-29230038a667 // indirect github.com/go-git/go-billy/v5 v5.6.2 // indirect github.com/goccy/go-yaml v1.18.0 // indirect @@ -225,8 +237,19 @@ require ( github.com/kr/text v0.2.0 // indirect github.com/lithammer/fuzzysearch v1.1.8 // indirect github.com/lithammer/shortuuid/v3 v3.0.7 // indirect + github.com/magiconair/properties v1.8.10 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect + github.com/moby/docker-image-spec v1.3.1 // indirect + github.com/moby/go-archive v0.1.0 // indirect + github.com/moby/patternmatcher v0.6.0 // indirect + github.com/moby/sys/sequential v0.6.0 // indirect + github.com/moby/sys/user v0.4.0 // indirect + github.com/moby/sys/userns v0.1.0 // indirect + github.com/moby/term v0.5.2 // indirect + github.com/morikuni/aec v1.0.0 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/opencontainers/image-spec v1.1.1 // indirect github.com/openzipkin/zipkin-go v0.4.3 // indirect 
github.com/parquet-go/bitpack v1.0.0 // indirect github.com/parquet-go/jsonlite v1.0.0 // indirect diff --git a/go.sum b/go.sum index 7e3d19b08..3d090234f 100644 --- a/go.sum +++ b/go.sum @@ -904,6 +904,8 @@ github.com/coreos/go-systemd/v22 v22.6.0/go.mod h1:iG+pp635Fo7ZmV/j14KUcmEyWF+0X github.com/cpuguy83/dockercfg v0.3.2 h1:DlJTyZGBDlXqUZ2Dk2Q3xHs/FtnooJJVaad2S9GKorA= github.com/cpuguy83/dockercfg v0.3.2/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= +github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/creasty/defaults v1.8.0 h1:z27FJxCAa0JKt3utc0sCImAEb+spPucmKoOdLHvHYKk= github.com/creasty/defaults v1.8.0/go.mod h1:iGzKe6pbEHnpMPtfDXZEr0NVxWnPTjb1bbDy08fPzYM= github.com/cronokirby/saferith v0.33.0 h1:TgoQlfsD4LIwx71+ChfRcIpjkw+RPOapDEVxa+LhwLo= @@ -2966,6 +2968,8 @@ gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= +gotest.tools/v3 v3.5.2 h1:7koQfIKdy+I8UTetycgUqXWSDwpgv193Ka+qRsmBY8Q= +gotest.tools/v3 v3.5.2/go.mod h1:LtdLGcnqToBH83WByAAi/wiwSFCArdFIUV/xxN4pcjA= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/test/s3tables/catalog_spark/setup_test.go b/test/s3tables/catalog_spark/setup_test.go new file mode 100644 index 000000000..9e889b53e --- /dev/null +++ b/test/s3tables/catalog_spark/setup_test.go @@ -0,0 +1,395 @@ +package catalog_spark + +import ( + "context" + "fmt" + "io" + "math/rand" + "net" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/seaweedfs/seaweedfs/test/s3tables/testutil" + "github.com/testcontainers/testcontainers-go" +) + +var ( + miniProcessMu sync.Mutex + lastMiniProcess *exec.Cmd +) + +func stopPreviousMini() { + miniProcessMu.Lock() + defer miniProcessMu.Unlock() + + if lastMiniProcess != nil && lastMiniProcess.Process != nil { + _ = lastMiniProcess.Process.Kill() + _ = lastMiniProcess.Wait() + } + lastMiniProcess = nil +} + +func registerMiniProcess(cmd *exec.Cmd) { + miniProcessMu.Lock() + lastMiniProcess = cmd + miniProcessMu.Unlock() +} + +func clearMiniProcess(cmd *exec.Cmd) { + miniProcessMu.Lock() + if lastMiniProcess == cmd { + lastMiniProcess = nil + } + miniProcessMu.Unlock() +} + +type TestEnvironment struct { + t *testing.T + dockerAvailable bool + seaweedfsDataDir string + sparkConfigDir string + masterPort int + filerPort int + s3Port int + icebergRestPort int + accessKey string + secretKey string + sparkContainer testcontainers.Container + masterProcess *exec.Cmd +} + +func NewTestEnvironment(t *testing.T) *TestEnvironment { + env := &TestEnvironment{ + t: t, + accessKey: "test", + secretKey: "test", + } + + // Check if Docker is available + cmd := exec.Command("docker", "version") + env.dockerAvailable = cmd.Run() == nil + 
+ return env +} + +func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { + t.Helper() + + stopPreviousMini() + + var err error + env.seaweedfsDataDir, err = os.MkdirTemp("", "seaweed-spark-test-") + if err != nil { + t.Fatalf("failed to create temp directory: %v", err) + } + + env.masterPort = mustFreePort(t, "Master") + env.filerPort = mustFreePort(t, "Filer") + env.s3Port = mustFreePort(t, "S3") + env.icebergRestPort = mustFreePort(t, "Iceberg") + + bindIP := testutil.FindBindIP() + + iamConfigPath, err := testutil.WriteIAMConfig(env.seaweedfsDataDir, env.accessKey, env.secretKey) + if err != nil { + t.Fatalf("failed to create IAM config: %v", err) + } + + // Start SeaweedFS using weed mini (all-in-one including Iceberg REST) + env.masterProcess = exec.Command( + "weed", "mini", + "-ip", bindIP, + "-ip.bind", "0.0.0.0", + "-master.port", fmt.Sprintf("%d", env.masterPort), + "-filer.port", fmt.Sprintf("%d", env.filerPort), + "-s3.port", fmt.Sprintf("%d", env.s3Port), + "-s3.port.iceberg", fmt.Sprintf("%d", env.icebergRestPort), + "-s3.config", iamConfigPath, + "-dir", env.seaweedfsDataDir, + ) + env.masterProcess.Env = append(os.Environ(), + "AWS_ACCESS_KEY_ID="+env.accessKey, + "AWS_SECRET_ACCESS_KEY="+env.secretKey, + "ICEBERG_WAREHOUSE=s3://iceberg-tables", + "S3TABLES_DEFAULT_BUCKET=iceberg-tables", + ) + if err := env.masterProcess.Start(); err != nil { + t.Fatalf("failed to start weed mini: %v", err) + } + registerMiniProcess(env.masterProcess) + + // Wait for all services to be ready + if !waitForPort(env.masterPort, 15*time.Second) { + t.Fatalf("weed mini failed to start - master port %d not listening", env.masterPort) + } + if !waitForPort(env.filerPort, 15*time.Second) { + t.Fatalf("weed mini failed to start - filer port %d not listening", env.filerPort) + } + if !waitForPort(env.s3Port, 15*time.Second) { + t.Fatalf("weed mini failed to start - s3 port %d not listening", env.s3Port) + } + if !waitForPort(env.icebergRestPort, 15*time.Second) { + t.Fatalf("weed mini failed to start - iceberg rest port %d not listening", env.icebergRestPort) + } +} + +func mustFreePort(t *testing.T, name string) int { + t.Helper() + + for i := 0; i < 200; i++ { + port := 20000 + rand.Intn(30000) + listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port)) + if err != nil { + continue + } + listener.Close() + grpcPort := port + 10000 + if grpcPort > 65535 { + continue + } + grpcListener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", grpcPort)) + if err != nil { + continue + } + grpcListener.Close() + return port + } + t.Fatalf("failed to get free port for %s", name) + return 0 +} + +func waitForPort(port int, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", port), 500*time.Millisecond) + if err == nil { + conn.Close() + return true + } + time.Sleep(100 * time.Millisecond) + } + return false +} + +func (env *TestEnvironment) writeSparkConfig(t *testing.T, catalogBucket string) string { + t.Helper() + + configDir, err := os.MkdirTemp("", "spark-config-") + if err != nil { + t.Fatalf("failed to create config directory: %v", err) + } + + // Store for cleanup + env.sparkConfigDir = configDir + + s3Endpoint := fmt.Sprintf("http://host.docker.internal:%d", env.s3Port) + catalogEndpoint := fmt.Sprintf("http://host.docker.internal:%d", env.icebergRestPort) + + sparkConfig := fmt.Sprintf(` +[spark] +master = "local" +app.name = "SeaweedFS Iceberg Test" + +[storage] 
+s3.endpoint = "%s" +s3.access-key = "test" +s3.secret-key = "test" +s3.path-style-access = "true" +s3.bucket = "%s" + +[iceberg] +catalog.type = "rest" +catalog.uri = "%s" +catalog.s3.endpoint = "%s" +catalog.s3.access-key = "test" +catalog.s3.secret-key = "test" +catalog.s3.path-style-access = "true" +`, s3Endpoint, catalogBucket, catalogEndpoint, s3Endpoint) + + configPath := filepath.Join(configDir, "spark-config.ini") + if err := os.WriteFile(configPath, []byte(sparkConfig), 0644); err != nil { + t.Fatalf("failed to write spark config: %v", err) + } + + return configDir +} + +func (env *TestEnvironment) startSparkContainer(t *testing.T, configDir string) { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + req := testcontainers.ContainerRequest{ + Image: "apache/spark:3.5.1", + ExposedPorts: []string{"4040/tcp"}, + Mounts: testcontainers.Mounts( + testcontainers.BindMount(configDir, "/config"), + ), + Env: map[string]string{ + "SPARK_LOCAL_IP": "localhost", + }, + ExtraHosts: []string{"host.docker.internal:host-gateway"}, + Cmd: []string{"/bin/sh", "-c", "sleep 3600"}, + } + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + if err != nil { + t.Fatalf("failed to start spark container: %v", err) + } + + env.sparkContainer = container +} + +func (env *TestEnvironment) Cleanup(t *testing.T) { + t.Helper() + + // Kill weed mini process + if env.masterProcess != nil && env.masterProcess.Process != nil { + env.masterProcess.Process.Kill() + env.masterProcess.Wait() + } + clearMiniProcess(env.masterProcess) + + // Stop Spark container + if env.sparkContainer != nil { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + env.sparkContainer.Terminate(ctx) + } + + // Remove temporary directories after processes are stopped + if env.seaweedfsDataDir != "" { + os.RemoveAll(env.seaweedfsDataDir) + } + if env.sparkConfigDir != "" { + os.RemoveAll(env.sparkConfigDir) + } +} + +func randomString(n int) string { + const letters = "abcdefghijklmnopqrstuvwxyz0123456789" + b := make([]byte, n) + for i := range b { + b[i] = letters[rand.Intn(len(letters))] + } + return string(b) +} + +func runSparkPySQL(t *testing.T, container testcontainers.Container, sql string, icebergPort int, s3Port int) string { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + pythonScript := fmt.Sprintf(` +import glob +import os +import sys + +spark_home = os.environ.get("SPARK_HOME", "/opt/spark") +python_path = os.path.join(spark_home, "python") +py4j_glob = glob.glob(os.path.join(python_path, "lib", "py4j-*.zip")) +ivy_dir = "/tmp/ivy" +os.makedirs(ivy_dir, exist_ok=True) +os.environ["AWS_REGION"] = "us-west-2" +os.environ["AWS_DEFAULT_REGION"] = "us-west-2" +os.environ["AWS_ACCESS_KEY_ID"] = "test" +os.environ["AWS_SECRET_ACCESS_KEY"] = "test" +if python_path not in sys.path: + sys.path.insert(0, python_path) +if py4j_glob and py4j_glob[0] not in sys.path: + sys.path.insert(0, py4j_glob[0]) + +from pyspark.sql import SparkSession + +spark = (SparkSession.builder + .appName("SeaweedFS Iceberg Test") + .config("spark.jars.ivy", ivy_dir) + .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.apache.iceberg:iceberg-aws-bundle:1.5.2") + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + 
.config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog.iceberg.type", "rest") + .config("spark.sql.catalog.iceberg.metrics-reporter-impl", "org.apache.iceberg.metrics.LoggingMetricsReporter") + .config("spark.sql.catalog.iceberg.uri", "http://host.docker.internal:%d") + .config("spark.sql.catalog.iceberg.rest.auth.type", "sigv4") + .config("spark.sql.catalog.iceberg.rest.auth.sigv4.delegate-auth-type", "none") + .config("spark.sql.catalog.iceberg.rest.sigv4-enabled", "true") + .config("spark.sql.catalog.iceberg.rest.signing-region", "us-west-2") + .config("spark.sql.catalog.iceberg.rest.signing-name", "s3") + .config("spark.sql.catalog.iceberg.rest.access-key-id", "test") + .config("spark.sql.catalog.iceberg.rest.secret-access-key", "test") + .config("spark.sql.catalog.iceberg.s3.endpoint", "http://host.docker.internal:%d") + .config("spark.sql.catalog.iceberg.s3.region", "us-west-2") + .config("spark.sql.catalog.iceberg.s3.access-key", "test") + .config("spark.sql.catalog.iceberg.s3.secret-key", "test") + .config("spark.sql.catalog.iceberg.s3.path-style-access", "true") + .config("spark.sql.catalog.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") + .getOrCreate()) + +%s +`, icebergPort, s3Port, sql) + + code, out, err := container.Exec(ctx, []string{"python3", "-c", pythonScript}) + var output string + if out != nil { + outputBytes, readErr := io.ReadAll(out) + if readErr != nil { + t.Logf("failed to read output: %v", readErr) + } else { + output = string(outputBytes) + } + } + if code != 0 { + t.Logf("Spark Python execution failed with code %d: %v, output: %s", code, err, output) + return output + } + + return output +} + +func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) { + t.Helper() + + masterGrpcPort := env.masterPort + 10000 + cmd := exec.Command("weed", "shell", + fmt.Sprintf("-master=localhost:%d.%d", env.masterPort, masterGrpcPort), + ) + cmd.Stdin = strings.NewReader(fmt.Sprintf("s3tables.bucket -create -name %s -account 000000000000\nexit\n", bucketName)) + output, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("failed to create table bucket %s via weed shell: %v\nOutput: %s", bucketName, err, string(output)) + } +} + +func createObjectBucket(t *testing.T, env *TestEnvironment, bucketName string) { + t.Helper() + + cfg := aws.Config{ + Region: "us-east-1", + Credentials: aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(env.accessKey, env.secretKey, "")), + BaseEndpoint: aws.String(fmt.Sprintf("http://localhost:%d", env.s3Port)), + } + + client := s3.NewFromConfig(cfg, func(o *s3.Options) { + o.UsePathStyle = true + }) + + _, err := client.CreateBucket(context.Background(), &s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + }) + if err != nil { + t.Fatalf("failed to create object bucket %s: %v", bucketName, err) + } +} diff --git a/test/s3tables/catalog_spark/spark_operations_test.go b/test/s3tables/catalog_spark/spark_operations_test.go new file mode 100644 index 000000000..55437c8e7 --- /dev/null +++ b/test/s3tables/catalog_spark/spark_operations_test.go @@ -0,0 +1,279 @@ +package catalog_spark + +import ( + "fmt" + "regexp" + "strings" + "testing" + "time" + + "github.com/testcontainers/testcontainers-go" +) + +// waitForSparkReady polls Spark to verify it's ready by executing a simple query +func waitForSparkReady(t *testing.T, container testcontainers.Container, icebergPort int, s3Port int, timeout time.Duration) { + t.Helper() + + deadline := 
time.Now().Add(timeout) + for time.Now().Before(deadline) { + output := runSparkPySQL(t, container, ` +spark.sql("SELECT 1 as test") +print("Spark ready") +`, icebergPort, s3Port) + if strings.Contains(output, "Spark ready") { + return + } + time.Sleep(5 * time.Second) + } + t.Fatalf("Spark did not become ready within %v", timeout) +} + +// setupSparkTestEnv initializes a test environment with SeaweedFS and Spark containers +func setupSparkTestEnv(t *testing.T) (*TestEnvironment, string, string) { + t.Helper() + + env := NewTestEnvironment(t) + + if !env.dockerAvailable { + t.Skip("Docker not available, skipping Spark integration test") + } + + t.Logf(">>> Starting SeaweedFS...") + env.StartSeaweedFS(t) + t.Cleanup(func() { env.Cleanup(t) }) + + tableBucket := "iceberg-tables" + catalogBucket := tableBucket + createTableBucket(t, env, tableBucket) + + configDir := env.writeSparkConfig(t, catalogBucket) + env.startSparkContainer(t, configDir) + + // Poll for Spark readiness instead of fixed sleep + waitForSparkReady(t, env.sparkContainer, env.icebergRestPort, env.s3Port, 10*time.Minute) + + return env, catalogBucket, tableBucket +} + +// TestSparkCatalogBasicOperations tests basic Spark Iceberg catalog operations +func TestSparkCatalogBasicOperations(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + env, _, _ := setupSparkTestEnv(t) + + // Test 1: Create a namespace (database) + t.Logf(">>> Test 1: Creating namespace") + namespace := "spark_test_" + randomString(6) + sparkSQL := fmt.Sprintf(` +spark.sql("CREATE NAMESPACE iceberg.%s") +print("Namespace created") +`, namespace) + output := runSparkPySQL(t, env.sparkContainer, sparkSQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Namespace created") { + t.Fatalf("namespace creation failed, output: %s", output) + } + + // Test 2: Create a table + t.Logf(">>> Test 2: Creating table") + tableName := "test_table_" + randomString(6) + createTableSQL := fmt.Sprintf(` +spark.sql(""" +CREATE TABLE iceberg.%s.%s ( + id INT, + name STRING, + age INT +) +USING iceberg +""") +print("Table created") +`, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, createTableSQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Table created") { + t.Fatalf("table creation failed, output: %s", output) + } + + // Test 3: Insert data + t.Logf(">>> Test 3: Inserting data") + insertDataSQL := fmt.Sprintf(` +spark.sql(""" +INSERT INTO iceberg.%s.%s VALUES + (1, 'Alice', 30), + (2, 'Bob', 25), + (3, 'Charlie', 35) +""") +print("Data inserted") +`, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, insertDataSQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Data inserted") { + t.Fatalf("data insertion failed, output: %s", output) + } + + // Test 4: Query data + t.Logf(">>> Test 4: Querying data") + querySQL := fmt.Sprintf(` +result = spark.sql("SELECT COUNT(*) as count FROM iceberg.%s.%s") +result.show() +count = result.collect()[0]['count'] +print(f"Row count: {count}") +`, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, querySQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Row count: 3") { + t.Errorf("expected row count 3, got output: %s", output) + } + + // Test 5: Update data + t.Logf(">>> Test 5: Updating data") + updateSQL := fmt.Sprintf(` +spark.sql(""" +UPDATE iceberg.%s.%s SET age = 31 WHERE id = 1 +""") +print("Data updated") +`, namespace, tableName) + output = runSparkPySQL(t, 
env.sparkContainer, updateSQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Data updated") { + t.Errorf("data update failed, output: %s", output) + } + + // Test 6: Delete data + t.Logf(">>> Test 6: Deleting data") + deleteSQL := fmt.Sprintf(` +spark.sql(""" +DELETE FROM iceberg.%s.%s WHERE id = 3 +""") +print("Data deleted") +`, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, deleteSQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Data deleted") { + t.Errorf("data delete failed, output: %s", output) + } + + // Verify final count + t.Logf(">>> Verifying final data") + finalCountSQL := fmt.Sprintf(` +result = spark.sql("SELECT COUNT(*) as count FROM iceberg.%s.%s") +result.show() +count = result.collect()[0]['count'] +print(f"Final row count: {count}") +`, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, finalCountSQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Final row count: 2") { + t.Errorf("expected final row count 2, got output: %s", output) + } + + t.Logf(">>> All tests passed") +} + +// TestSparkTimeTravel tests Spark Iceberg time travel capabilities +func TestSparkTimeTravel(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + env, _, _ := setupSparkTestEnv(t) + + namespace := "time_travel_test_" + randomString(6) + tableName := "tt_table_" + randomString(6) + + // Create namespace and table + setupSQL := fmt.Sprintf(` +spark.sql("CREATE NAMESPACE iceberg.%s") +spark.sql(""" +CREATE TABLE iceberg.%s.%s ( + id INT, + value INT +) +USING iceberg +""") +print("Setup complete") +`, namespace, namespace, tableName) + output := runSparkPySQL(t, env.sparkContainer, setupSQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Setup complete") { + t.Fatalf("setup failed for namespace %s and table %s, output: %s", namespace, tableName, output) + } + + // Insert initial data + t.Logf(">>> Inserting initial data") + insertSQL := fmt.Sprintf(` +import time +from datetime import timedelta +spark.sql(""" +INSERT INTO iceberg.%s.%s VALUES (1, 10) +""") +ts = None +for _ in range(10): + try: + ts = spark.sql("SELECT committed_at FROM iceberg.%s.%s.snapshots ORDER BY committed_at DESC LIMIT 1").collect()[0]["committed_at"] + if ts is not None: + break + except Exception as e: + print(f"Snapshot query failed: {e}") + time.sleep(1) +if ts is None: + raise RuntimeError("Failed to read snapshot committed_at") +ts_for_time_travel = ts + timedelta(seconds=1) +print(f"Snapshot timestamp: {ts_for_time_travel.strftime('%%Y-%%m-%%d %%H:%%M:%%S')}") +`, namespace, tableName, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, insertSQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Snapshot timestamp:") { + t.Fatalf("failed to get snapshot timestamp: %s", output) + } + + // Extract snapshot timestamp from output - look specifically for the "Snapshot timestamp:" line + var snapshotTS string + tsRe := regexp.MustCompile(`Snapshot timestamp:\s*(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})`) + matches := tsRe.FindStringSubmatch(output) + if len(matches) > 1 { + snapshotTS = matches[1] + } + + if snapshotTS == "" { + t.Fatalf("could not extract snapshot timestamp from output: %s", output) + } + + // Wait to ensure the next insert gets a distinct snapshot timestamp + time.Sleep(2 * time.Second) + + // Insert more data + t.Logf(">>> Inserting more data") + insertMoreSQL := fmt.Sprintf(` +spark.sql(""" +INSERT INTO 
iceberg.%s.%s VALUES (2, 20) +""") +print("More data inserted") +`, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, insertMoreSQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "More data inserted") { + t.Fatalf("failed to insert more data, output: %s", output) + } + + // Verify count increased to 2 + t.Logf(">>> Verifying row count after second insert") + verifySQL := fmt.Sprintf(` +result = spark.sql("SELECT COUNT(*) as count FROM iceberg.%s.%s") +count = result.collect()[0]['count'] +print(f"Current row count: {count}") +`, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, verifySQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Current row count: 2") { + t.Fatalf("expected current row count 2 after second insert, got output: %s", output) + } + + // Time travel to first snapshot + t.Logf(">>> Time traveling to first snapshot") + timeTravelSQL := fmt.Sprintf(` +result = spark.sql(""" +SELECT COUNT(*) as count FROM iceberg.%s.%s TIMESTAMP AS OF '%s' +""") +result.show() +count = result.collect()[0]['count'] +print(f"Count at snapshot: {count}") +`, namespace, tableName, snapshotTS) + output = runSparkPySQL(t, env.sparkContainer, timeTravelSQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Count at snapshot: 1") { + t.Errorf("expected count 1 at first snapshot, got: %s", output) + } + + t.Logf(">>> Time travel test passed") +} diff --git a/test/s3tables/catalog_trino/trino_blog_operations_test.go b/test/s3tables/catalog_trino/trino_blog_operations_test.go index 6603507d5..54ad9b7bb 100644 --- a/test/s3tables/catalog_trino/trino_blog_operations_test.go +++ b/test/s3tables/catalog_trino/trino_blog_operations_test.go @@ -2,6 +2,7 @@ package catalog_trino import ( "fmt" + "os/exec" "strconv" "strings" "testing" @@ -15,11 +16,16 @@ func TestTrinoBlogOperations(t *testing.T) { schemaName := "blog_ns_" + randomString(6) customersTable := "customers_" + randomString(6) trinoCustomersTable := "trino_customers_" + randomString(6) + warehouseBucket := "iceberg-tables" + customersLocation := fmt.Sprintf("s3://%s/%s/%s_%s", warehouseBucket, schemaName, customersTable, randomString(6)) + trinoCustomersLocation := fmt.Sprintf("s3://%s/%s/%s_%s", warehouseBucket, schemaName, trinoCustomersTable, randomString(6)) runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS iceberg.%s", schemaName)) - defer runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("DROP SCHEMA IF EXISTS iceberg.%s", schemaName)) + defer runTrinoSQLAllowNamespaceNotEmpty(t, env.trinoContainer, fmt.Sprintf("DROP SCHEMA IF EXISTS iceberg.%s CASCADE", schemaName)) defer runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("DROP TABLE IF EXISTS iceberg.%s.%s", schemaName, trinoCustomersTable)) defer runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("DROP TABLE IF EXISTS iceberg.%s.%s", schemaName, customersTable)) + runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("DROP TABLE IF EXISTS iceberg.%s.%s", schemaName, trinoCustomersTable)) + runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("DROP TABLE IF EXISTS iceberg.%s.%s", schemaName, customersTable)) createCustomersSQL := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS iceberg.%s.%s ( customer_sk INT, @@ -35,9 +41,10 @@ func TestTrinoBlogOperations(t *testing.T) { login VARCHAR ) WITH ( format = 'PARQUET', - sorted_by = ARRAY['customer_id'] -)`, schemaName, customersTable) - runTrinoSQL(t, env.trinoContainer, createCustomersSQL) + sorted_by = ARRAY['customer_id'], + location = '%s' +)`, schemaName, 
customersTable, customersLocation) + runTrinoSQLAllowExists(t, env.trinoContainer, createCustomersSQL) insertCustomersSQL := fmt.Sprintf(`INSERT INTO iceberg.%s.%s VALUES (1, 'AAAAA', 'Mrs', 'Amanda', 'Olson', 'Y', 8, 4, 1984, 'US', 'aolson'), @@ -64,10 +71,13 @@ func TestTrinoBlogOperations(t *testing.T) { ctasSQL := fmt.Sprintf(`CREATE TABLE iceberg.%s.%s WITH ( - format = 'PARQUET' + format = 'PARQUET', + location = '%s' ) -AS SELECT * FROM iceberg.%s.%s`, schemaName, trinoCustomersTable, schemaName, customersTable) - runTrinoSQL(t, env.trinoContainer, ctasSQL) +AS SELECT * FROM iceberg.%s.%s`, schemaName, trinoCustomersTable, trinoCustomersLocation, schemaName, customersTable) + ctasInsertSQL := fmt.Sprintf("INSERT INTO iceberg.%s.%s SELECT * FROM iceberg.%s.%s", schemaName, trinoCustomersTable, schemaName, customersTable) + ctasDeleteSQL := fmt.Sprintf("DELETE FROM iceberg.%s.%s", schemaName, trinoCustomersTable) + runTrinoCTAS(t, env.trinoContainer, ctasSQL, ctasDeleteSQL, ctasInsertSQL) countOutput = runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SELECT count(*) FROM iceberg.%s.%s", schemaName, trinoCustomersTable)) rowCount = mustParseCSVInt64(t, countOutput) @@ -130,6 +140,72 @@ AS SELECT * FROM iceberg.%s.%s`, schemaName, trinoCustomersTable, schemaName, cu } } +func runTrinoSQLAllowExists(t *testing.T, containerName, sql string) string { + t.Helper() + + cmd := exec.Command("docker", "exec", containerName, + "trino", "--catalog", "iceberg", + "--output-format", "CSV", + "--execute", sql, + ) + output, err := cmd.CombinedOutput() + if err != nil { + outputStr := string(output) + if strings.Contains(outputStr, "already exists") { + return sanitizeTrinoOutput(outputStr) + } + t.Fatalf("Trino command failed: %v\nSQL: %s\nOutput:\n%s", err, sql, outputStr) + } + return sanitizeTrinoOutput(string(output)) +} + +func runTrinoSQLAllowNamespaceNotEmpty(t *testing.T, containerName, sql string) string { + t.Helper() + + var output []byte + var err error + for attempt := 0; attempt < 3; attempt++ { + cmd := exec.Command("docker", "exec", containerName, + "trino", "--catalog", "iceberg", + "--output-format", "CSV", + "--execute", sql, + ) + output, err = cmd.CombinedOutput() + if err == nil { + return sanitizeTrinoOutput(string(output)) + } + outputStr := string(output) + if !strings.Contains(outputStr, "Namespace is not empty") { + t.Fatalf("Trino command failed: %v\nSQL: %s\nOutput:\n%s", err, sql, outputStr) + } + time.Sleep(500 * time.Millisecond) + } + t.Logf("Ignoring cleanup error for SQL %s: %s", sql, sanitizeTrinoOutput(string(output))) + return sanitizeTrinoOutput(string(output)) +} + +func runTrinoCTAS(t *testing.T, containerName, createSQL, deleteSQL, insertSQL string) { + t.Helper() + + cmd := exec.Command("docker", "exec", containerName, + "trino", "--catalog", "iceberg", + "--output-format", "CSV", + "--execute", createSQL, + ) + output, err := cmd.CombinedOutput() + if err != nil { + outputStr := string(output) + if strings.Contains(outputStr, "already exists") { + if deleteSQL != "" { + runTrinoSQL(t, containerName, deleteSQL) + } + runTrinoSQL(t, containerName, insertSQL) + return + } + t.Fatalf("Trino command failed: %v\nSQL: %s\nOutput:\n%s", err, createSQL, outputStr) + } +} + func hasCSVDataRow(output string) bool { lines := strings.Split(strings.TrimSpace(output), "\n") if len(lines) == 0 { diff --git a/test/s3tables/catalog_trino/trino_catalog_test.go b/test/s3tables/catalog_trino/trino_catalog_test.go index 44beea80c..1a30791f0 100644 --- 
a/test/s3tables/catalog_trino/trino_catalog_test.go +++ b/test/s3tables/catalog_trino/trino_catalog_test.go @@ -17,6 +17,7 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/seaweedfs/seaweedfs/test/s3tables/testutil" ) type TestEnvironment struct { @@ -57,12 +58,10 @@ func TestTrinoIcebergCatalog(t *testing.T) { env.StartSeaweedFS(t) fmt.Printf(">>> SeaweedFS started.\n") - catalogBucket := "warehouse" tableBucket := "iceberg-tables" + catalogBucket := tableBucket fmt.Printf(">>> Creating table bucket: %s\n", tableBucket) createTableBucket(t, env, tableBucket) - fmt.Printf(">>> Creating table bucket: %s\n", catalogBucket) - createTableBucket(t, env, catalogBucket) fmt.Printf(">>> All buckets created.\n") // Test Iceberg REST API directly @@ -117,7 +116,7 @@ func NewTestEnvironment(t *testing.T) *TestEnvironment { t.Fatalf("Failed to create temp dir: %v", err) } - bindIP := findBindIP() + bindIP := testutil.FindBindIP() masterPort, masterGrpcPort := mustFreePortPair(t, "Master") volumePort, volumeGrpcPort := mustFreePortPair(t, "Volume") @@ -149,29 +148,8 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { t.Helper() // Create IAM config file - iamConfigPath := filepath.Join(env.dataDir, "iam_config.json") - iamConfig := fmt.Sprintf(`{ - "identities": [ - { - "name": "admin", - "credentials": [ - { - "accessKey": "%s", - "secretKey": "%s" - } - ], - "actions": [ - "Admin", - "Read", - "List", - "Tagging", - "Write" - ] - } - ] -}`, env.accessKey, env.secretKey) - - if err := os.WriteFile(iamConfigPath, []byte(iamConfig), 0644); err != nil { + iamConfigPath, err := testutil.WriteIAMConfig(env.dataDir, env.accessKey, env.secretKey) + if err != nil { t.Fatalf("Failed to create IAM config: %v", err) } @@ -206,6 +184,8 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { cmd.Env = append(os.Environ(), "AWS_ACCESS_KEY_ID="+env.accessKey, "AWS_SECRET_ACCESS_KEY="+env.secretKey, + "ICEBERG_WAREHOUSE=s3://iceberg-tables", + "S3TABLES_DEFAULT_BUCKET=iceberg-tables", ) if err := cmd.Start(); err != nil { @@ -282,12 +262,13 @@ func testIcebergRestAPI(t *testing.T, env *TestEnvironment) { fmt.Printf(">>> Testing Iceberg REST API directly...\n") // First, verify the service is listening - conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", env.bindIP, env.icebergPort)) + addr := net.JoinHostPort(env.bindIP, fmt.Sprintf("%d", env.icebergPort)) + conn, err := net.Dial("tcp", addr) if err != nil { - t.Fatalf("Cannot connect to Iceberg service at %s:%d: %v", env.bindIP, env.icebergPort, err) + t.Fatalf("Cannot connect to Iceberg service at %s: %v", addr, err) } conn.Close() - t.Logf("Successfully connected to Iceberg service at %s:%d", env.bindIP, env.icebergPort) + t.Logf("Successfully connected to Iceberg service at %s", addr) // Test /v1/config endpoint url := fmt.Sprintf("http://%s:%d/v1/config", env.bindIP, env.icebergPort) @@ -319,7 +300,7 @@ func (env *TestEnvironment) writeTrinoConfig(t *testing.T, warehouseBucket strin config := fmt.Sprintf(`connector.name=iceberg iceberg.catalog.type=rest iceberg.rest-catalog.uri=http://host.docker.internal:%d -iceberg.rest-catalog.warehouse=s3tablescatalog/%s +iceberg.rest-catalog.warehouse=s3://%s iceberg.file-format=PARQUET iceberg.unique-table-location=true @@ -399,8 +380,7 @@ func waitForTrino(t *testing.T, containerName string, timeout time.Duration) { return } - logs, _ := exec.Command("docker", "logs", containerName).CombinedOutput() - 
t.Fatalf("Timed out waiting for Trino to be ready\nLast output:\n%s\nTrino logs:\n%s", string(lastOutput), string(logs)) + t.Fatalf("Timed out waiting for Trino to be ready\nLast output:\n%s", string(lastOutput)) } func runTrinoSQL(t *testing.T, containerName, sql string) string { @@ -413,8 +393,7 @@ func runTrinoSQL(t *testing.T, containerName, sql string) string { ) output, err := cmd.CombinedOutput() if err != nil { - logs, _ := exec.Command("docker", "logs", containerName).CombinedOutput() - t.Fatalf("Trino command failed: %v\nSQL: %s\nOutput:\n%s\nTrino logs:\n%s", err, sql, string(output), string(logs)) + t.Fatalf("Trino command failed: %v\nSQL: %s\nOutput:\n%s", err, sql, string(output)) } return sanitizeTrinoOutput(string(output)) } @@ -535,25 +514,6 @@ func getFreePort() (int, error) { return addr.Port, nil } -func findBindIP() string { - addrs, err := net.InterfaceAddrs() - if err != nil { - return "127.0.0.1" - } - for _, addr := range addrs { - ipNet, ok := addr.(*net.IPNet) - if !ok || ipNet.IP == nil { - continue - } - ip := ipNet.IP.To4() - if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() { - continue - } - return ip.String() - } - return "127.0.0.1" -} - func randomString(length int) string { const charset = "abcdefghijklmnopqrstuvwxyz0123456789" b := make([]byte, length) diff --git a/test/s3tables/catalog_trino/trino_crud_operations_test.go b/test/s3tables/catalog_trino/trino_crud_operations_test.go index cd78c08ae..e33d03e1c 100644 --- a/test/s3tables/catalog_trino/trino_crud_operations_test.go +++ b/test/s3tables/catalog_trino/trino_crud_operations_test.go @@ -24,10 +24,9 @@ func setupTrinoTest(t *testing.T) *TestEnvironment { t.Logf(">>> Starting SeaweedFS...") env.StartSeaweedFS(t) - catalogBucket := "warehouse" tableBucket := "iceberg-tables" + catalogBucket := tableBucket createTableBucket(t, env, tableBucket) - createTableBucket(t, env, catalogBucket) configDir := env.writeTrinoConfig(t, catalogBucket) env.startTrinoContainer(t, configDir) diff --git a/test/s3tables/testutil/weed_mini.go b/test/s3tables/testutil/weed_mini.go new file mode 100644 index 000000000..ff8260ef0 --- /dev/null +++ b/test/s3tables/testutil/weed_mini.go @@ -0,0 +1,56 @@ +package testutil + +import ( + "fmt" + "net" + "os" + "path/filepath" +) + +func FindBindIP() string { + addrs, err := net.InterfaceAddrs() + if err != nil { + return "127.0.0.1" + } + for _, addr := range addrs { + ipNet, ok := addr.(*net.IPNet) + if !ok || ipNet.IP == nil { + continue + } + ip := ipNet.IP.To4() + if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() { + continue + } + return ip.String() + } + return "127.0.0.1" +} + +func WriteIAMConfig(dir, accessKey, secretKey string) (string, error) { + iamConfigPath := filepath.Join(dir, "iam_config.json") + iamConfig := fmt.Sprintf(`{ + "identities": [ + { + "name": "admin", + "credentials": [ + { + "accessKey": "%s", + "secretKey": "%s" + } + ], + "actions": [ + "Admin", + "Read", + "List", + "Tagging", + "Write" + ] + } + ] +}`, accessKey, secretKey) + + if err := os.WriteFile(iamConfigPath, []byte(iamConfig), 0644); err != nil { + return "", err + } + return iamConfigPath, nil +} diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 1b6cbe769..9f58d3f9d 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -276,8 +276,10 @@ func (f *Filer) ensureParentDirectoryEntry(ctx context.Context, entry *Entry, di // dirParts[0] == "" and dirParts[1] == "buckets" isUnderBuckets := len(dirParts) >= 3 && dirParts[1] == "buckets" if isUnderBuckets { 
- if err := s3bucket.VerifyS3BucketName(dirParts[2]); err != nil { - return fmt.Errorf("invalid bucket name %s: %v", dirParts[2], err) + if !strings.HasPrefix(dirParts[2], ".") { + if err := s3bucket.VerifyS3BucketName(dirParts[2]); err != nil { + return fmt.Errorf("invalid bucket name %s: %v", dirParts[2], err) + } } } diff --git a/weed/s3api/bucket_paths.go b/weed/s3api/bucket_paths.go index 5c7b9520a..de3adee0d 100644 --- a/weed/s3api/bucket_paths.go +++ b/weed/s3api/bucket_paths.go @@ -89,9 +89,6 @@ func (s3a *S3ApiServer) bucketDir(bucket string) string { if tablePath, ok := s3a.tableLocationDir(bucket); ok { return tablePath } - if s3a.isTableBucket(bucket) { - return s3tables.GetTableObjectBucketPath(bucket) - } return path.Join(s3a.bucketRoot(bucket), bucket) } diff --git a/weed/s3api/iceberg/iceberg.go b/weed/s3api/iceberg/iceberg.go index 96a6230bf..d96957c41 100644 --- a/weed/s3api/iceberg/iceberg.go +++ b/weed/s3api/iceberg/iceberg.go @@ -217,13 +217,14 @@ func (s *Server) saveMetadataFile(ctx context.Context, bucketName, tablePath, me return nil } + bucketDir := path.Join(bucketsPath, bucketName) // 1. Ensure bucket directory exists: / if err := ensureDir(bucketsPath, bucketName, "bucket directory"); err != nil { return err } - // 2. Ensure table path exists: // - tableDir := path.Join(bucketsPath, bucketName) + // 2. Ensure table path exists under the bucket directory + tableDir := bucketDir if tablePath != "" { segments := strings.Split(tablePath, "/") for _, segment := range segments { @@ -354,6 +355,9 @@ func getBucketFromPrefix(r *http.Request) string { if prefix := vars["prefix"]; prefix != "" { return prefix } + if bucket := os.Getenv("S3TABLES_DEFAULT_BUCKET"); bucket != "" { + return bucket + } // Default bucket if no prefix - use "warehouse" for Iceberg return "warehouse" } @@ -680,24 +684,32 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) { // Generate UUID for the new table tableUUID := uuid.New() - location := strings.TrimSuffix(req.Location, "/") tablePath := path.Join(encodeNamespace(namespace), req.Name) - storageBucket := bucketName - tableLocationBucket := "" - if location != "" { + location := strings.TrimSuffix(req.Location, "/") + if location == "" { + if req.Properties != nil { + if warehouse := strings.TrimSuffix(req.Properties["warehouse"], "/"); warehouse != "" { + location = fmt.Sprintf("%s/%s", warehouse, tablePath) + } + } + if location == "" { + if warehouse := strings.TrimSuffix(os.Getenv("ICEBERG_WAREHOUSE"), "/"); warehouse != "" { + location = fmt.Sprintf("%s/%s", warehouse, tablePath) + } + } + if location == "" { + location = fmt.Sprintf("s3://%s/%s", bucketName, tablePath) + } + } else { parsedBucket, parsedPath, err := parseS3Location(location) if err != nil { writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid table location: "+err.Error()) return } - if strings.HasSuffix(parsedBucket, "--table-s3") && parsedPath == "" { - tableLocationBucket = parsedBucket + if parsedPath == "" { + location = fmt.Sprintf("s3://%s/%s", parsedBucket, tablePath) } } - if tableLocationBucket == "" { - tableLocationBucket = fmt.Sprintf("%s--table-s3", tableUUID.String()) - } - location = fmt.Sprintf("s3://%s", tableLocationBucket) // Build proper Iceberg table metadata using iceberg-go types metadata := newTableMetadata(tableUUID, location, req.Schema, req.PartitionSpec, req.WriteOrder, req.Properties) @@ -713,15 +725,21 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) { return } - // 
1. Save metadata file to filer tableName := req.Name metadataFileName := "v1.metadata.json" // Initial version is always 1 - if err := s.saveMetadataFile(r.Context(), storageBucket, tablePath, metadataFileName, metadataBytes); err != nil { - writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+err.Error()) - return - } - metadataLocation := fmt.Sprintf("%s/metadata/%s", location, metadataFileName) + if !req.StageCreate { + // Save metadata file to filer for immediate table creation. + metadataBucket, metadataPath, err := parseS3Location(location) + if err != nil { + writeError(w, http.StatusInternalServerError, "InternalServerError", "Invalid table location: "+err.Error()) + return + } + if err := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); err != nil { + writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+err.Error()) + return + } + } // Use S3 Tables manager to create table createReq := &s3tables.CreateTableRequest{ @@ -746,8 +764,42 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) { }) if err != nil { + if tableErr, ok := err.(*s3tables.S3TablesError); ok && tableErr.Type == s3tables.ErrCodeTableAlreadyExists { + getReq := &s3tables.GetTableRequest{ + TableBucketARN: bucketARN, + Namespace: namespace, + Name: tableName, + } + var getResp s3tables.GetTableResponse + getErr := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + mgrClient := s3tables.NewManagerClient(client) + return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName) + }) + if getErr != nil { + writeError(w, http.StatusConflict, "AlreadyExistsException", err.Error()) + return + } + result := buildLoadTableResult(getResp, bucketName, namespace, tableName) + writeJSON(w, http.StatusOK, result) + return + } if strings.Contains(err.Error(), "already exists") { - writeError(w, http.StatusConflict, "AlreadyExistsException", err.Error()) + getReq := &s3tables.GetTableRequest{ + TableBucketARN: bucketARN, + Namespace: namespace, + Name: tableName, + } + var getResp s3tables.GetTableResponse + getErr := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + mgrClient := s3tables.NewManagerClient(client) + return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName) + }) + if getErr != nil { + writeError(w, http.StatusConflict, "AlreadyExistsException", err.Error()) + return + } + result := buildLoadTableResult(getResp, bucketName, namespace, tableName) + writeJSON(w, http.StatusOK, result) return } glog.V(1).Infof("Iceberg: CreateTable error: %v", err) @@ -809,7 +861,11 @@ func (s *Server) handleLoadTable(w http.ResponseWriter, r *http.Request) { return } - // Build table metadata using iceberg-go types + result := buildLoadTableResult(getResp, bucketName, namespace, tableName) + writeJSON(w, http.StatusOK, result) +} + +func buildLoadTableResult(getResp s3tables.GetTableResponse, bucketName string, namespace []string, tableName string) LoadTableResult { location := tableLocationFromMetadataLocation(getResp.MetadataLocation) if location == "" { location = fmt.Sprintf("s3://%s/%s/%s", bucketName, encodeNamespace(namespace), tableName) @@ -840,12 +896,11 @@ func (s *Server) handleLoadTable(w http.ResponseWriter, r *http.Request) { metadata = newTableMetadata(tableUUID, location, nil, nil, nil, nil) } - result := 
LoadTableResult{ + return LoadTableResult{ MetadataLocation: getResp.MetadataLocation, Metadata: metadata, Config: make(iceberg.Properties), } - writeJSON(w, http.StatusOK, result) } // handleTableExists checks if a table exists. @@ -943,13 +998,53 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { // Extract identity from context identityName := s3_constants.GetIdentityNameFromContext(r) - // Parse the commit request - var req CommitTableRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + // Parse the commit request, skipping update actions not supported by iceberg-go. + var raw struct { + Identifier *TableIdentifier `json:"identifier,omitempty"` + Requirements json.RawMessage `json:"requirements"` + Updates []json.RawMessage `json:"updates"` + } + if err := json.NewDecoder(r.Body).Decode(&raw); err != nil { writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid request body: "+err.Error()) return } + var req CommitTableRequest + req.Identifier = raw.Identifier + if len(raw.Requirements) > 0 { + if err := json.Unmarshal(raw.Requirements, &req.Requirements); err != nil { + writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid requirements: "+err.Error()) + return + } + } + if len(raw.Updates) > 0 { + filtered := make([]json.RawMessage, 0, len(raw.Updates)) + for _, update := range raw.Updates { + var action struct { + Action string `json:"action"` + } + if err := json.Unmarshal(update, &action); err != nil { + writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid update: "+err.Error()) + return + } + if action.Action == "set-statistics" { + continue + } + filtered = append(filtered, update) + } + if len(filtered) > 0 { + updatesBytes, err := json.Marshal(filtered) + if err != nil { + writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse updates: "+err.Error()) + return + } + if err := json.Unmarshal(updatesBytes, &req.Updates); err != nil { + writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid updates: "+err.Error()) + return + } + } + } + // First, load current table metadata getReq := &s3tables.GetTableRequest{ TableBucketARN: bucketARN, @@ -1049,8 +1144,12 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { } // 1. 
Save metadata file to filer - tablePath := path.Join(encodeNamespace(namespace), tableName) - if err := s.saveMetadataFile(r.Context(), bucketName, tablePath, metadataFileName, metadataBytes); err != nil { + metadataBucket, metadataPath, err := parseS3Location(location) + if err != nil { + writeError(w, http.StatusInternalServerError, "InternalServerError", "Invalid table location: "+err.Error()) + return + } + if err := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); err != nil { writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+err.Error()) return } diff --git a/weed/s3api/s3api_object_handlers_list.go b/weed/s3api/s3api_object_handlers_list.go index 1a4cf84dd..731972f75 100644 --- a/weed/s3api/s3api_object_handlers_list.go +++ b/weed/s3api/s3api_object_handlers_list.go @@ -524,7 +524,6 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d stream, listErr := client.ListEntries(ctx, request) if listErr != nil { if errors.Is(listErr, filer_pb.ErrNotFound) { - err = filer_pb.ErrNotFound return } err = fmt.Errorf("list entries %+v: %w", request, listErr) diff --git a/weed/s3api/s3tables/handler_bucket_create.go b/weed/s3api/s3tables/handler_bucket_create.go index 3b4cac7e4..44971dbb9 100644 --- a/weed/s3api/s3tables/handler_bucket_create.go +++ b/weed/s3api/s3tables/handler_bucket_create.go @@ -105,14 +105,6 @@ func (h *S3TablesHandler) handleCreateTableBucket(w http.ResponseWriter, r *http } } - // Ensure object root directory exists for table bucket S3 operations - if err := h.ensureDirectory(r.Context(), client, GetTableObjectRootDir()); err != nil { - return fmt.Errorf("failed to create table object root directory: %w", err) - } - if err := h.ensureDirectory(r.Context(), client, GetTableObjectBucketPath(req.Name)); err != nil { - return fmt.Errorf("failed to create table object bucket directory: %w", err) - } - // Create bucket directory if err := h.createDirectory(r.Context(), client, bucketPath); err != nil { return err diff --git a/weed/s3api/s3tables/handler_namespace.go b/weed/s3api/s3tables/handler_namespace.go index 492a53241..b6a18d826 100644 --- a/weed/s3api/s3tables/handler_namespace.go +++ b/weed/s3api/s3tables/handler_namespace.go @@ -50,12 +50,38 @@ func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.R var bucketMetadata tableBucketMetadata var bucketPolicy string var bucketTags map[string]string + ownerAccountID := h.getAccountID(r) err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { data, err := h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyMetadata) if err != nil { - return err - } - if err := json.Unmarshal(data, &bucketMetadata); err != nil { + if errors.Is(err, ErrAttributeNotFound) { + dir, name := splitPath(bucketPath) + entryResp, lookupErr := filer_pb.LookupEntry(r.Context(), client, &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: name, + }) + if lookupErr != nil { + return lookupErr + } + if entryResp.Entry == nil || !IsTableBucketEntry(entryResp.Entry) { + return filer_pb.ErrNotFound + } + bucketMetadata = tableBucketMetadata{ + Name: bucketName, + CreatedAt: time.Now(), + OwnerAccountID: ownerAccountID, + } + metadataBytes, err := json.Marshal(&bucketMetadata) + if err != nil { + return fmt.Errorf("failed to marshal bucket metadata: %w", err) + } + if err := h.setExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyMetadata, 
metadataBytes); err != nil { + return err + } + } else { + return err + } + } else if err := json.Unmarshal(data, &bucketMetadata); err != nil { return fmt.Errorf("failed to unmarshal bucket metadata: %w", err) } diff --git a/weed/s3api/s3tables/handler_table.go b/weed/s3api/s3tables/handler_table.go index b1ae0f8e1..77e6691fd 100644 --- a/weed/s3api/s3tables/handler_table.go +++ b/weed/s3api/s3tables/handler_table.go @@ -164,14 +164,26 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque tablePath := GetTablePath(bucketName, namespaceName, tableName) // Check if table already exists + var existingMetadata tableMetadataInternal err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - _, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata) - return err + data, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata) + if err != nil { + return err + } + if unmarshalErr := json.Unmarshal(data, &existingMetadata); unmarshalErr != nil { + return fmt.Errorf("failed to parse existing table metadata: %w", unmarshalErr) + } + return nil }) if err == nil { - h.writeError(w, http.StatusConflict, ErrCodeTableAlreadyExists, fmt.Sprintf("table %s already exists", tableName)) - return fmt.Errorf("table already exists") + tableARN := h.generateTableARN(existingMetadata.OwnerAccountID, bucketName, namespaceName+"/"+tableName) + h.writeJSON(w, http.StatusOK, &CreateTableResponse{ + TableARN: tableARN, + VersionToken: existingMetadata.VersionToken, + MetadataLocation: existingMetadata.MetadataLocation, + }) + return nil } else if !errors.Is(err, filer_pb.ErrNotFound) && !errors.Is(err, ErrAttributeNotFound) { h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to check table: %v", err)) return err @@ -201,14 +213,14 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque } err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - // Create table directory - if err := h.createDirectory(r.Context(), client, tablePath); err != nil { + // Ensure table directory exists (may already be created by object storage clients) + if err := h.ensureDirectory(r.Context(), client, tablePath); err != nil { return err } // Create data subdirectory for Iceberg files dataPath := tablePath + "/data" - if err := h.createDirectory(r.Context(), client, dataPath); err != nil { + if err := h.ensureDirectory(r.Context(), client, dataPath); err != nil { return err }