diff --git a/.github/workflows/s3-tables-tests.yml b/.github/workflows/s3-tables-tests.yml index 1b1d0763d..5f6ee7b91 100644 --- a/.github/workflows/s3-tables-tests.yml +++ b/.github/workflows/s3-tables-tests.yml @@ -27,6 +27,9 @@ jobs: go-version-file: 'go.mod' id: go + - name: Run go mod tidy + run: go mod tidy + - name: Install SeaweedFS run: | go install -buildvcs=false ./weed @@ -84,6 +87,9 @@ jobs: go-version-file: 'go.mod' id: go + - name: Run go mod tidy + run: go mod tidy + - name: Run Iceberg Catalog Integration Tests timeout-minutes: 25 working-directory: test/s3tables/catalog @@ -143,6 +149,9 @@ jobs: - name: Pre-pull Trino image run: docker pull trinodb/trino:479 + - name: Run go mod tidy + run: go mod tidy + - name: Install SeaweedFS run: | go install -buildvcs=false ./weed @@ -185,6 +194,72 @@ jobs: path: test/s3tables/catalog_trino/test-output.log retention-days: 3 + spark-iceberg-catalog-tests: + name: Spark Iceberg Catalog Integration Tests + runs-on: ubuntu-22.04 + timeout-minutes: 30 + + steps: + - name: Check out code + uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' + id: go + + - name: Set up Docker + uses: docker/setup-buildx-action@v3 + + - name: Pre-pull Spark image + run: docker pull apache/spark:3.5.1 + + - name: Run go mod tidy + run: go mod tidy + + - name: Install SeaweedFS + run: | + go install -buildvcs=false ./weed + + - name: Run Spark Iceberg Catalog Integration Tests + timeout-minutes: 25 + working-directory: test/s3tables/catalog_spark + run: | + set -x + set -o pipefail + echo "=== System Information ===" + uname -a + free -h + df -h + echo "=== Starting Spark Iceberg Catalog Tests ===" + + # Run Spark + Iceberg catalog integration tests + go test -v -timeout 20m . 
2>&1 | tee test-output.log || { + echo "Spark Iceberg catalog integration tests failed" + exit 1 + } + + - name: Show test output on failure + if: failure() + working-directory: test/s3tables/catalog_spark + run: | + echo "=== Test Output ===" + if [ -f test-output.log ]; then + tail -200 test-output.log + fi + + echo "=== Process information ===" + ps aux | grep -E "(weed|test|docker)" || true + + - name: Upload test logs on failure + if: failure() + uses: actions/upload-artifact@v6 + with: + name: spark-iceberg-catalog-test-logs + path: test/s3tables/catalog_spark/test-output.log + retention-days: 3 + s3-tables-build-verification: name: S3 Tables Build Verification runs-on: ubuntu-22.04 diff --git a/go.mod b/go.mod index a244a069e..bf6a0ca37 100644 --- a/go.mod +++ b/go.mod @@ -155,6 +155,7 @@ require ( github.com/seaweedfs/go-fuse/v2 v2.9.1 github.com/shirou/gopsutil/v4 v4.26.1 github.com/tarantool/go-tarantool/v2 v2.4.1 + github.com/testcontainers/testcontainers-go v0.39.0 github.com/tikv/client-go/v2 v2.0.7 github.com/xeipuuv/gojsonschema v1.2.0 github.com/ydb-platform/ydb-go-sdk-auth-environ v0.5.1 @@ -174,7 +175,9 @@ require ( atomicgo.dev/schedule v0.1.0 // indirect cloud.google.com/go/longrunning v0.7.0 // indirect cloud.google.com/go/pubsub/v2 v2.2.1 // indirect + dario.cat/mergo v1.0.2 // indirect github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.7.1 // indirect + github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect github.com/Azure/go-autorest v14.2.0+incompatible // indirect github.com/Azure/go-autorest/autorest/to v0.4.1 // indirect github.com/a1ex3/zstd-seekable-format-go/pkg v0.10.0 // indirect @@ -199,8 +202,17 @@ require ( github.com/cockroachdb/redact v1.1.5 // indirect github.com/cockroachdb/version v0.0.0-20250314144055-3860cd14adf2 // indirect github.com/containerd/console v1.0.5 // indirect + github.com/containerd/errdefs v1.0.0 // indirect + github.com/containerd/errdefs/pkg v0.3.0 // indirect + github.com/containerd/log v0.1.0 // indirect + github.com/containerd/platforms v1.0.0-rc.1 // indirect + github.com/cpuguy83/dockercfg v0.3.2 // indirect github.com/dave/dst v0.27.2 // indirect github.com/diskfs/go-diskfs v1.7.0 // indirect + github.com/distribution/reference v0.6.0 // indirect + github.com/docker/docker v28.5.0+incompatible // indirect + github.com/docker/go-connections v0.6.0 // indirect + github.com/docker/go-units v0.5.0 // indirect github.com/go-asn1-ber/asn1-ber v1.5.8-0.20250403174932-29230038a667 // indirect github.com/go-git/go-billy/v5 v5.6.2 // indirect github.com/goccy/go-yaml v1.18.0 // indirect @@ -225,8 +237,19 @@ require ( github.com/kr/text v0.2.0 // indirect github.com/lithammer/fuzzysearch v1.1.8 // indirect github.com/lithammer/shortuuid/v3 v3.0.7 // indirect + github.com/magiconair/properties v1.8.10 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect + github.com/moby/docker-image-spec v1.3.1 // indirect + github.com/moby/go-archive v0.1.0 // indirect + github.com/moby/patternmatcher v0.6.0 // indirect + github.com/moby/sys/sequential v0.6.0 // indirect + github.com/moby/sys/user v0.4.0 // indirect + github.com/moby/sys/userns v0.1.0 // indirect + github.com/moby/term v0.5.2 // indirect + github.com/morikuni/aec v1.0.0 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/opencontainers/image-spec v1.1.1 // indirect github.com/openzipkin/zipkin-go v0.4.3 // indirect 
github.com/parquet-go/bitpack v1.0.0 // indirect github.com/parquet-go/jsonlite v1.0.0 // indirect diff --git a/go.sum b/go.sum index 7e3d19b08..3d090234f 100644 --- a/go.sum +++ b/go.sum @@ -904,6 +904,8 @@ github.com/coreos/go-systemd/v22 v22.6.0/go.mod h1:iG+pp635Fo7ZmV/j14KUcmEyWF+0X github.com/cpuguy83/dockercfg v0.3.2 h1:DlJTyZGBDlXqUZ2Dk2Q3xHs/FtnooJJVaad2S9GKorA= github.com/cpuguy83/dockercfg v0.3.2/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= +github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/creasty/defaults v1.8.0 h1:z27FJxCAa0JKt3utc0sCImAEb+spPucmKoOdLHvHYKk= github.com/creasty/defaults v1.8.0/go.mod h1:iGzKe6pbEHnpMPtfDXZEr0NVxWnPTjb1bbDy08fPzYM= github.com/cronokirby/saferith v0.33.0 h1:TgoQlfsD4LIwx71+ChfRcIpjkw+RPOapDEVxa+LhwLo= @@ -2966,6 +2968,8 @@ gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= +gotest.tools/v3 v3.5.2 h1:7koQfIKdy+I8UTetycgUqXWSDwpgv193Ka+qRsmBY8Q= +gotest.tools/v3 v3.5.2/go.mod h1:LtdLGcnqToBH83WByAAi/wiwSFCArdFIUV/xxN4pcjA= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/test/s3tables/catalog_spark/setup_test.go b/test/s3tables/catalog_spark/setup_test.go new file mode 100644 index 000000000..9e889b53e --- /dev/null +++ b/test/s3tables/catalog_spark/setup_test.go @@ -0,0 +1,395 @@ +package catalog_spark + +import ( + "context" + "fmt" + "io" + "math/rand" + "net" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/seaweedfs/seaweedfs/test/s3tables/testutil" + "github.com/testcontainers/testcontainers-go" +) + +var ( + miniProcessMu sync.Mutex + lastMiniProcess *exec.Cmd +) + +func stopPreviousMini() { + miniProcessMu.Lock() + defer miniProcessMu.Unlock() + + if lastMiniProcess != nil && lastMiniProcess.Process != nil { + _ = lastMiniProcess.Process.Kill() + _ = lastMiniProcess.Wait() + } + lastMiniProcess = nil +} + +func registerMiniProcess(cmd *exec.Cmd) { + miniProcessMu.Lock() + lastMiniProcess = cmd + miniProcessMu.Unlock() +} + +func clearMiniProcess(cmd *exec.Cmd) { + miniProcessMu.Lock() + if lastMiniProcess == cmd { + lastMiniProcess = nil + } + miniProcessMu.Unlock() +} + +type TestEnvironment struct { + t *testing.T + dockerAvailable bool + seaweedfsDataDir string + sparkConfigDir string + masterPort int + filerPort int + s3Port int + icebergRestPort int + accessKey string + secretKey string + sparkContainer testcontainers.Container + masterProcess *exec.Cmd +} + +func NewTestEnvironment(t *testing.T) *TestEnvironment { + env := &TestEnvironment{ + t: t, + accessKey: "test", + secretKey: "test", + } + + // Check if Docker is available + cmd := exec.Command("docker", "version") + env.dockerAvailable = cmd.Run() == nil + 
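+	// dockerAvailable is consulted by setupSparkTestEnv, which skips the Spark
+	// integration tests when Docker is not installed; StartSeaweedFS and Cleanup
+	// are invoked separately by the caller.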
+ return env +} + +func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { + t.Helper() + + stopPreviousMini() + + var err error + env.seaweedfsDataDir, err = os.MkdirTemp("", "seaweed-spark-test-") + if err != nil { + t.Fatalf("failed to create temp directory: %v", err) + } + + env.masterPort = mustFreePort(t, "Master") + env.filerPort = mustFreePort(t, "Filer") + env.s3Port = mustFreePort(t, "S3") + env.icebergRestPort = mustFreePort(t, "Iceberg") + + bindIP := testutil.FindBindIP() + + iamConfigPath, err := testutil.WriteIAMConfig(env.seaweedfsDataDir, env.accessKey, env.secretKey) + if err != nil { + t.Fatalf("failed to create IAM config: %v", err) + } + + // Start SeaweedFS using weed mini (all-in-one including Iceberg REST) + env.masterProcess = exec.Command( + "weed", "mini", + "-ip", bindIP, + "-ip.bind", "0.0.0.0", + "-master.port", fmt.Sprintf("%d", env.masterPort), + "-filer.port", fmt.Sprintf("%d", env.filerPort), + "-s3.port", fmt.Sprintf("%d", env.s3Port), + "-s3.port.iceberg", fmt.Sprintf("%d", env.icebergRestPort), + "-s3.config", iamConfigPath, + "-dir", env.seaweedfsDataDir, + ) + env.masterProcess.Env = append(os.Environ(), + "AWS_ACCESS_KEY_ID="+env.accessKey, + "AWS_SECRET_ACCESS_KEY="+env.secretKey, + "ICEBERG_WAREHOUSE=s3://iceberg-tables", + "S3TABLES_DEFAULT_BUCKET=iceberg-tables", + ) + if err := env.masterProcess.Start(); err != nil { + t.Fatalf("failed to start weed mini: %v", err) + } + registerMiniProcess(env.masterProcess) + + // Wait for all services to be ready + if !waitForPort(env.masterPort, 15*time.Second) { + t.Fatalf("weed mini failed to start - master port %d not listening", env.masterPort) + } + if !waitForPort(env.filerPort, 15*time.Second) { + t.Fatalf("weed mini failed to start - filer port %d not listening", env.filerPort) + } + if !waitForPort(env.s3Port, 15*time.Second) { + t.Fatalf("weed mini failed to start - s3 port %d not listening", env.s3Port) + } + if !waitForPort(env.icebergRestPort, 15*time.Second) { + t.Fatalf("weed mini failed to start - iceberg rest port %d not listening", env.icebergRestPort) + } +} + +func mustFreePort(t *testing.T, name string) int { + t.Helper() + + for i := 0; i < 200; i++ { + port := 20000 + rand.Intn(30000) + listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port)) + if err != nil { + continue + } + listener.Close() + grpcPort := port + 10000 + if grpcPort > 65535 { + continue + } + grpcListener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", grpcPort)) + if err != nil { + continue + } + grpcListener.Close() + return port + } + t.Fatalf("failed to get free port for %s", name) + return 0 +} + +func waitForPort(port int, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", port), 500*time.Millisecond) + if err == nil { + conn.Close() + return true + } + time.Sleep(100 * time.Millisecond) + } + return false +} + +func (env *TestEnvironment) writeSparkConfig(t *testing.T, catalogBucket string) string { + t.Helper() + + configDir, err := os.MkdirTemp("", "spark-config-") + if err != nil { + t.Fatalf("failed to create config directory: %v", err) + } + + // Store for cleanup + env.sparkConfigDir = configDir + + s3Endpoint := fmt.Sprintf("http://host.docker.internal:%d", env.s3Port) + catalogEndpoint := fmt.Sprintf("http://host.docker.internal:%d", env.icebergRestPort) + + sparkConfig := fmt.Sprintf(` +[spark] +master = "local" +app.name = "SeaweedFS Iceberg Test" + +[storage] 
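+# Endpoints below use host.docker.internal so the Spark container can reach the
+# SeaweedFS S3 and Iceberg REST services running on the host.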
+s3.endpoint = "%s" +s3.access-key = "test" +s3.secret-key = "test" +s3.path-style-access = "true" +s3.bucket = "%s" + +[iceberg] +catalog.type = "rest" +catalog.uri = "%s" +catalog.s3.endpoint = "%s" +catalog.s3.access-key = "test" +catalog.s3.secret-key = "test" +catalog.s3.path-style-access = "true" +`, s3Endpoint, catalogBucket, catalogEndpoint, s3Endpoint) + + configPath := filepath.Join(configDir, "spark-config.ini") + if err := os.WriteFile(configPath, []byte(sparkConfig), 0644); err != nil { + t.Fatalf("failed to write spark config: %v", err) + } + + return configDir +} + +func (env *TestEnvironment) startSparkContainer(t *testing.T, configDir string) { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + req := testcontainers.ContainerRequest{ + Image: "apache/spark:3.5.1", + ExposedPorts: []string{"4040/tcp"}, + Mounts: testcontainers.Mounts( + testcontainers.BindMount(configDir, "/config"), + ), + Env: map[string]string{ + "SPARK_LOCAL_IP": "localhost", + }, + ExtraHosts: []string{"host.docker.internal:host-gateway"}, + Cmd: []string{"/bin/sh", "-c", "sleep 3600"}, + } + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + if err != nil { + t.Fatalf("failed to start spark container: %v", err) + } + + env.sparkContainer = container +} + +func (env *TestEnvironment) Cleanup(t *testing.T) { + t.Helper() + + // Kill weed mini process + if env.masterProcess != nil && env.masterProcess.Process != nil { + env.masterProcess.Process.Kill() + env.masterProcess.Wait() + } + clearMiniProcess(env.masterProcess) + + // Stop Spark container + if env.sparkContainer != nil { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + env.sparkContainer.Terminate(ctx) + } + + // Remove temporary directories after processes are stopped + if env.seaweedfsDataDir != "" { + os.RemoveAll(env.seaweedfsDataDir) + } + if env.sparkConfigDir != "" { + os.RemoveAll(env.sparkConfigDir) + } +} + +func randomString(n int) string { + const letters = "abcdefghijklmnopqrstuvwxyz0123456789" + b := make([]byte, n) + for i := range b { + b[i] = letters[rand.Intn(len(letters))] + } + return string(b) +} + +func runSparkPySQL(t *testing.T, container testcontainers.Container, sql string, icebergPort int, s3Port int) string { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + pythonScript := fmt.Sprintf(` +import glob +import os +import sys + +spark_home = os.environ.get("SPARK_HOME", "/opt/spark") +python_path = os.path.join(spark_home, "python") +py4j_glob = glob.glob(os.path.join(python_path, "lib", "py4j-*.zip")) +ivy_dir = "/tmp/ivy" +os.makedirs(ivy_dir, exist_ok=True) +os.environ["AWS_REGION"] = "us-west-2" +os.environ["AWS_DEFAULT_REGION"] = "us-west-2" +os.environ["AWS_ACCESS_KEY_ID"] = "test" +os.environ["AWS_SECRET_ACCESS_KEY"] = "test" +if python_path not in sys.path: + sys.path.insert(0, python_path) +if py4j_glob and py4j_glob[0] not in sys.path: + sys.path.insert(0, py4j_glob[0]) + +from pyspark.sql import SparkSession + +spark = (SparkSession.builder + .appName("SeaweedFS Iceberg Test") + .config("spark.jars.ivy", ivy_dir) + .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.apache.iceberg:iceberg-aws-bundle:1.5.2") + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + 
.config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog.iceberg.type", "rest") + .config("spark.sql.catalog.iceberg.metrics-reporter-impl", "org.apache.iceberg.metrics.LoggingMetricsReporter") + .config("spark.sql.catalog.iceberg.uri", "http://host.docker.internal:%d") + .config("spark.sql.catalog.iceberg.rest.auth.type", "sigv4") + .config("spark.sql.catalog.iceberg.rest.auth.sigv4.delegate-auth-type", "none") + .config("spark.sql.catalog.iceberg.rest.sigv4-enabled", "true") + .config("spark.sql.catalog.iceberg.rest.signing-region", "us-west-2") + .config("spark.sql.catalog.iceberg.rest.signing-name", "s3") + .config("spark.sql.catalog.iceberg.rest.access-key-id", "test") + .config("spark.sql.catalog.iceberg.rest.secret-access-key", "test") + .config("spark.sql.catalog.iceberg.s3.endpoint", "http://host.docker.internal:%d") + .config("spark.sql.catalog.iceberg.s3.region", "us-west-2") + .config("spark.sql.catalog.iceberg.s3.access-key", "test") + .config("spark.sql.catalog.iceberg.s3.secret-key", "test") + .config("spark.sql.catalog.iceberg.s3.path-style-access", "true") + .config("spark.sql.catalog.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") + .getOrCreate()) + +%s +`, icebergPort, s3Port, sql) + + code, out, err := container.Exec(ctx, []string{"python3", "-c", pythonScript}) + var output string + if out != nil { + outputBytes, readErr := io.ReadAll(out) + if readErr != nil { + t.Logf("failed to read output: %v", readErr) + } else { + output = string(outputBytes) + } + } + if code != 0 { + t.Logf("Spark Python execution failed with code %d: %v, output: %s", code, err, output) + return output + } + + return output +} + +func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) { + t.Helper() + + masterGrpcPort := env.masterPort + 10000 + cmd := exec.Command("weed", "shell", + fmt.Sprintf("-master=localhost:%d.%d", env.masterPort, masterGrpcPort), + ) + cmd.Stdin = strings.NewReader(fmt.Sprintf("s3tables.bucket -create -name %s -account 000000000000\nexit\n", bucketName)) + output, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("failed to create table bucket %s via weed shell: %v\nOutput: %s", bucketName, err, string(output)) + } +} + +func createObjectBucket(t *testing.T, env *TestEnvironment, bucketName string) { + t.Helper() + + cfg := aws.Config{ + Region: "us-east-1", + Credentials: aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(env.accessKey, env.secretKey, "")), + BaseEndpoint: aws.String(fmt.Sprintf("http://localhost:%d", env.s3Port)), + } + + client := s3.NewFromConfig(cfg, func(o *s3.Options) { + o.UsePathStyle = true + }) + + _, err := client.CreateBucket(context.Background(), &s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + }) + if err != nil { + t.Fatalf("failed to create object bucket %s: %v", bucketName, err) + } +} diff --git a/test/s3tables/catalog_spark/spark_operations_test.go b/test/s3tables/catalog_spark/spark_operations_test.go new file mode 100644 index 000000000..55437c8e7 --- /dev/null +++ b/test/s3tables/catalog_spark/spark_operations_test.go @@ -0,0 +1,279 @@ +package catalog_spark + +import ( + "fmt" + "regexp" + "strings" + "testing" + "time" + + "github.com/testcontainers/testcontainers-go" +) + +// waitForSparkReady polls Spark to verify it's ready by executing a simple query +func waitForSparkReady(t *testing.T, container testcontainers.Container, icebergPort int, s3Port int, timeout time.Duration) { + t.Helper() + + deadline := 
time.Now().Add(timeout) + for time.Now().Before(deadline) { + output := runSparkPySQL(t, container, ` +spark.sql("SELECT 1 as test") +print("Spark ready") +`, icebergPort, s3Port) + if strings.Contains(output, "Spark ready") { + return + } + time.Sleep(5 * time.Second) + } + t.Fatalf("Spark did not become ready within %v", timeout) +} + +// setupSparkTestEnv initializes a test environment with SeaweedFS and Spark containers +func setupSparkTestEnv(t *testing.T) (*TestEnvironment, string, string) { + t.Helper() + + env := NewTestEnvironment(t) + + if !env.dockerAvailable { + t.Skip("Docker not available, skipping Spark integration test") + } + + t.Logf(">>> Starting SeaweedFS...") + env.StartSeaweedFS(t) + t.Cleanup(func() { env.Cleanup(t) }) + + tableBucket := "iceberg-tables" + catalogBucket := tableBucket + createTableBucket(t, env, tableBucket) + + configDir := env.writeSparkConfig(t, catalogBucket) + env.startSparkContainer(t, configDir) + + // Poll for Spark readiness instead of fixed sleep + waitForSparkReady(t, env.sparkContainer, env.icebergRestPort, env.s3Port, 10*time.Minute) + + return env, catalogBucket, tableBucket +} + +// TestSparkCatalogBasicOperations tests basic Spark Iceberg catalog operations +func TestSparkCatalogBasicOperations(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + env, _, _ := setupSparkTestEnv(t) + + // Test 1: Create a namespace (database) + t.Logf(">>> Test 1: Creating namespace") + namespace := "spark_test_" + randomString(6) + sparkSQL := fmt.Sprintf(` +spark.sql("CREATE NAMESPACE iceberg.%s") +print("Namespace created") +`, namespace) + output := runSparkPySQL(t, env.sparkContainer, sparkSQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Namespace created") { + t.Fatalf("namespace creation failed, output: %s", output) + } + + // Test 2: Create a table + t.Logf(">>> Test 2: Creating table") + tableName := "test_table_" + randomString(6) + createTableSQL := fmt.Sprintf(` +spark.sql(""" +CREATE TABLE iceberg.%s.%s ( + id INT, + name STRING, + age INT +) +USING iceberg +""") +print("Table created") +`, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, createTableSQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Table created") { + t.Fatalf("table creation failed, output: %s", output) + } + + // Test 3: Insert data + t.Logf(">>> Test 3: Inserting data") + insertDataSQL := fmt.Sprintf(` +spark.sql(""" +INSERT INTO iceberg.%s.%s VALUES + (1, 'Alice', 30), + (2, 'Bob', 25), + (3, 'Charlie', 35) +""") +print("Data inserted") +`, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, insertDataSQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Data inserted") { + t.Fatalf("data insertion failed, output: %s", output) + } + + // Test 4: Query data + t.Logf(">>> Test 4: Querying data") + querySQL := fmt.Sprintf(` +result = spark.sql("SELECT COUNT(*) as count FROM iceberg.%s.%s") +result.show() +count = result.collect()[0]['count'] +print(f"Row count: {count}") +`, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, querySQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Row count: 3") { + t.Errorf("expected row count 3, got output: %s", output) + } + + // Test 5: Update data + t.Logf(">>> Test 5: Updating data") + updateSQL := fmt.Sprintf(` +spark.sql(""" +UPDATE iceberg.%s.%s SET age = 31 WHERE id = 1 +""") +print("Data updated") +`, namespace, tableName) + output = runSparkPySQL(t, 
env.sparkContainer, updateSQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Data updated") { + t.Errorf("data update failed, output: %s", output) + } + + // Test 6: Delete data + t.Logf(">>> Test 6: Deleting data") + deleteSQL := fmt.Sprintf(` +spark.sql(""" +DELETE FROM iceberg.%s.%s WHERE id = 3 +""") +print("Data deleted") +`, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, deleteSQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Data deleted") { + t.Errorf("data delete failed, output: %s", output) + } + + // Verify final count + t.Logf(">>> Verifying final data") + finalCountSQL := fmt.Sprintf(` +result = spark.sql("SELECT COUNT(*) as count FROM iceberg.%s.%s") +result.show() +count = result.collect()[0]['count'] +print(f"Final row count: {count}") +`, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, finalCountSQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Final row count: 2") { + t.Errorf("expected final row count 2, got output: %s", output) + } + + t.Logf(">>> All tests passed") +} + +// TestSparkTimeTravel tests Spark Iceberg time travel capabilities +func TestSparkTimeTravel(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + env, _, _ := setupSparkTestEnv(t) + + namespace := "time_travel_test_" + randomString(6) + tableName := "tt_table_" + randomString(6) + + // Create namespace and table + setupSQL := fmt.Sprintf(` +spark.sql("CREATE NAMESPACE iceberg.%s") +spark.sql(""" +CREATE TABLE iceberg.%s.%s ( + id INT, + value INT +) +USING iceberg +""") +print("Setup complete") +`, namespace, namespace, tableName) + output := runSparkPySQL(t, env.sparkContainer, setupSQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Setup complete") { + t.Fatalf("setup failed for namespace %s and table %s, output: %s", namespace, tableName, output) + } + + // Insert initial data + t.Logf(">>> Inserting initial data") + insertSQL := fmt.Sprintf(` +import time +from datetime import timedelta +spark.sql(""" +INSERT INTO iceberg.%s.%s VALUES (1, 10) +""") +ts = None +for _ in range(10): + try: + ts = spark.sql("SELECT committed_at FROM iceberg.%s.%s.snapshots ORDER BY committed_at DESC LIMIT 1").collect()[0]["committed_at"] + if ts is not None: + break + except Exception as e: + print(f"Snapshot query failed: {e}") + time.sleep(1) +if ts is None: + raise RuntimeError("Failed to read snapshot committed_at") +ts_for_time_travel = ts + timedelta(seconds=1) +print(f"Snapshot timestamp: {ts_for_time_travel.strftime('%%Y-%%m-%%d %%H:%%M:%%S')}") +`, namespace, tableName, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, insertSQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Snapshot timestamp:") { + t.Fatalf("failed to get snapshot timestamp: %s", output) + } + + // Extract snapshot timestamp from output - look specifically for the "Snapshot timestamp:" line + var snapshotTS string + tsRe := regexp.MustCompile(`Snapshot timestamp:\s*(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})`) + matches := tsRe.FindStringSubmatch(output) + if len(matches) > 1 { + snapshotTS = matches[1] + } + + if snapshotTS == "" { + t.Fatalf("could not extract snapshot timestamp from output: %s", output) + } + + // Wait to ensure the next insert gets a distinct snapshot timestamp + time.Sleep(2 * time.Second) + + // Insert more data + t.Logf(">>> Inserting more data") + insertMoreSQL := fmt.Sprintf(` +spark.sql(""" +INSERT INTO 
iceberg.%s.%s VALUES (2, 20) +""") +print("More data inserted") +`, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, insertMoreSQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "More data inserted") { + t.Fatalf("failed to insert more data, output: %s", output) + } + + // Verify count increased to 2 + t.Logf(">>> Verifying row count after second insert") + verifySQL := fmt.Sprintf(` +result = spark.sql("SELECT COUNT(*) as count FROM iceberg.%s.%s") +count = result.collect()[0]['count'] +print(f"Current row count: {count}") +`, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, verifySQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Current row count: 2") { + t.Fatalf("expected current row count 2 after second insert, got output: %s", output) + } + + // Time travel to first snapshot + t.Logf(">>> Time traveling to first snapshot") + timeTravelSQL := fmt.Sprintf(` +result = spark.sql(""" +SELECT COUNT(*) as count FROM iceberg.%s.%s TIMESTAMP AS OF '%s' +""") +result.show() +count = result.collect()[0]['count'] +print(f"Count at snapshot: {count}") +`, namespace, tableName, snapshotTS) + output = runSparkPySQL(t, env.sparkContainer, timeTravelSQL, env.icebergRestPort, env.s3Port) + if !strings.Contains(output, "Count at snapshot: 1") { + t.Errorf("expected count 1 at first snapshot, got: %s", output) + } + + t.Logf(">>> Time travel test passed") +} diff --git a/test/s3tables/catalog_trino/trino_blog_operations_test.go b/test/s3tables/catalog_trino/trino_blog_operations_test.go index 6603507d5..54ad9b7bb 100644 --- a/test/s3tables/catalog_trino/trino_blog_operations_test.go +++ b/test/s3tables/catalog_trino/trino_blog_operations_test.go @@ -2,6 +2,7 @@ package catalog_trino import ( "fmt" + "os/exec" "strconv" "strings" "testing" @@ -15,11 +16,16 @@ func TestTrinoBlogOperations(t *testing.T) { schemaName := "blog_ns_" + randomString(6) customersTable := "customers_" + randomString(6) trinoCustomersTable := "trino_customers_" + randomString(6) + warehouseBucket := "iceberg-tables" + customersLocation := fmt.Sprintf("s3://%s/%s/%s_%s", warehouseBucket, schemaName, customersTable, randomString(6)) + trinoCustomersLocation := fmt.Sprintf("s3://%s/%s/%s_%s", warehouseBucket, schemaName, trinoCustomersTable, randomString(6)) runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS iceberg.%s", schemaName)) - defer runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("DROP SCHEMA IF EXISTS iceberg.%s", schemaName)) + defer runTrinoSQLAllowNamespaceNotEmpty(t, env.trinoContainer, fmt.Sprintf("DROP SCHEMA IF EXISTS iceberg.%s CASCADE", schemaName)) defer runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("DROP TABLE IF EXISTS iceberg.%s.%s", schemaName, trinoCustomersTable)) defer runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("DROP TABLE IF EXISTS iceberg.%s.%s", schemaName, customersTable)) + runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("DROP TABLE IF EXISTS iceberg.%s.%s", schemaName, trinoCustomersTable)) + runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("DROP TABLE IF EXISTS iceberg.%s.%s", schemaName, customersTable)) createCustomersSQL := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS iceberg.%s.%s ( customer_sk INT, @@ -35,9 +41,10 @@ func TestTrinoBlogOperations(t *testing.T) { login VARCHAR ) WITH ( format = 'PARQUET', - sorted_by = ARRAY['customer_id'] -)`, schemaName, customersTable) - runTrinoSQL(t, env.trinoContainer, createCustomersSQL) + sorted_by = ARRAY['customer_id'], + location = '%s' +)`, schemaName, 
customersTable, customersLocation) + runTrinoSQLAllowExists(t, env.trinoContainer, createCustomersSQL) insertCustomersSQL := fmt.Sprintf(`INSERT INTO iceberg.%s.%s VALUES (1, 'AAAAA', 'Mrs', 'Amanda', 'Olson', 'Y', 8, 4, 1984, 'US', 'aolson'), @@ -64,10 +71,13 @@ func TestTrinoBlogOperations(t *testing.T) { ctasSQL := fmt.Sprintf(`CREATE TABLE iceberg.%s.%s WITH ( - format = 'PARQUET' + format = 'PARQUET', + location = '%s' ) -AS SELECT * FROM iceberg.%s.%s`, schemaName, trinoCustomersTable, schemaName, customersTable) - runTrinoSQL(t, env.trinoContainer, ctasSQL) +AS SELECT * FROM iceberg.%s.%s`, schemaName, trinoCustomersTable, trinoCustomersLocation, schemaName, customersTable) + ctasInsertSQL := fmt.Sprintf("INSERT INTO iceberg.%s.%s SELECT * FROM iceberg.%s.%s", schemaName, trinoCustomersTable, schemaName, customersTable) + ctasDeleteSQL := fmt.Sprintf("DELETE FROM iceberg.%s.%s", schemaName, trinoCustomersTable) + runTrinoCTAS(t, env.trinoContainer, ctasSQL, ctasDeleteSQL, ctasInsertSQL) countOutput = runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SELECT count(*) FROM iceberg.%s.%s", schemaName, trinoCustomersTable)) rowCount = mustParseCSVInt64(t, countOutput) @@ -130,6 +140,72 @@ AS SELECT * FROM iceberg.%s.%s`, schemaName, trinoCustomersTable, schemaName, cu } } +func runTrinoSQLAllowExists(t *testing.T, containerName, sql string) string { + t.Helper() + + cmd := exec.Command("docker", "exec", containerName, + "trino", "--catalog", "iceberg", + "--output-format", "CSV", + "--execute", sql, + ) + output, err := cmd.CombinedOutput() + if err != nil { + outputStr := string(output) + if strings.Contains(outputStr, "already exists") { + return sanitizeTrinoOutput(outputStr) + } + t.Fatalf("Trino command failed: %v\nSQL: %s\nOutput:\n%s", err, sql, outputStr) + } + return sanitizeTrinoOutput(string(output)) +} + +func runTrinoSQLAllowNamespaceNotEmpty(t *testing.T, containerName, sql string) string { + t.Helper() + + var output []byte + var err error + for attempt := 0; attempt < 3; attempt++ { + cmd := exec.Command("docker", "exec", containerName, + "trino", "--catalog", "iceberg", + "--output-format", "CSV", + "--execute", sql, + ) + output, err = cmd.CombinedOutput() + if err == nil { + return sanitizeTrinoOutput(string(output)) + } + outputStr := string(output) + if !strings.Contains(outputStr, "Namespace is not empty") { + t.Fatalf("Trino command failed: %v\nSQL: %s\nOutput:\n%s", err, sql, outputStr) + } + time.Sleep(500 * time.Millisecond) + } + t.Logf("Ignoring cleanup error for SQL %s: %s", sql, sanitizeTrinoOutput(string(output))) + return sanitizeTrinoOutput(string(output)) +} + +func runTrinoCTAS(t *testing.T, containerName, createSQL, deleteSQL, insertSQL string) { + t.Helper() + + cmd := exec.Command("docker", "exec", containerName, + "trino", "--catalog", "iceberg", + "--output-format", "CSV", + "--execute", createSQL, + ) + output, err := cmd.CombinedOutput() + if err != nil { + outputStr := string(output) + if strings.Contains(outputStr, "already exists") { + if deleteSQL != "" { + runTrinoSQL(t, containerName, deleteSQL) + } + runTrinoSQL(t, containerName, insertSQL) + return + } + t.Fatalf("Trino command failed: %v\nSQL: %s\nOutput:\n%s", err, createSQL, outputStr) + } +} + func hasCSVDataRow(output string) bool { lines := strings.Split(strings.TrimSpace(output), "\n") if len(lines) == 0 { diff --git a/test/s3tables/catalog_trino/trino_catalog_test.go b/test/s3tables/catalog_trino/trino_catalog_test.go index 44beea80c..1a30791f0 100644 --- 
a/test/s3tables/catalog_trino/trino_catalog_test.go +++ b/test/s3tables/catalog_trino/trino_catalog_test.go @@ -17,6 +17,7 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/seaweedfs/seaweedfs/test/s3tables/testutil" ) type TestEnvironment struct { @@ -57,12 +58,10 @@ func TestTrinoIcebergCatalog(t *testing.T) { env.StartSeaweedFS(t) fmt.Printf(">>> SeaweedFS started.\n") - catalogBucket := "warehouse" tableBucket := "iceberg-tables" + catalogBucket := tableBucket fmt.Printf(">>> Creating table bucket: %s\n", tableBucket) createTableBucket(t, env, tableBucket) - fmt.Printf(">>> Creating table bucket: %s\n", catalogBucket) - createTableBucket(t, env, catalogBucket) fmt.Printf(">>> All buckets created.\n") // Test Iceberg REST API directly @@ -117,7 +116,7 @@ func NewTestEnvironment(t *testing.T) *TestEnvironment { t.Fatalf("Failed to create temp dir: %v", err) } - bindIP := findBindIP() + bindIP := testutil.FindBindIP() masterPort, masterGrpcPort := mustFreePortPair(t, "Master") volumePort, volumeGrpcPort := mustFreePortPair(t, "Volume") @@ -149,29 +148,8 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { t.Helper() // Create IAM config file - iamConfigPath := filepath.Join(env.dataDir, "iam_config.json") - iamConfig := fmt.Sprintf(`{ - "identities": [ - { - "name": "admin", - "credentials": [ - { - "accessKey": "%s", - "secretKey": "%s" - } - ], - "actions": [ - "Admin", - "Read", - "List", - "Tagging", - "Write" - ] - } - ] -}`, env.accessKey, env.secretKey) - - if err := os.WriteFile(iamConfigPath, []byte(iamConfig), 0644); err != nil { + iamConfigPath, err := testutil.WriteIAMConfig(env.dataDir, env.accessKey, env.secretKey) + if err != nil { t.Fatalf("Failed to create IAM config: %v", err) } @@ -206,6 +184,8 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { cmd.Env = append(os.Environ(), "AWS_ACCESS_KEY_ID="+env.accessKey, "AWS_SECRET_ACCESS_KEY="+env.secretKey, + "ICEBERG_WAREHOUSE=s3://iceberg-tables", + "S3TABLES_DEFAULT_BUCKET=iceberg-tables", ) if err := cmd.Start(); err != nil { @@ -282,12 +262,13 @@ func testIcebergRestAPI(t *testing.T, env *TestEnvironment) { fmt.Printf(">>> Testing Iceberg REST API directly...\n") // First, verify the service is listening - conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", env.bindIP, env.icebergPort)) + addr := net.JoinHostPort(env.bindIP, fmt.Sprintf("%d", env.icebergPort)) + conn, err := net.Dial("tcp", addr) if err != nil { - t.Fatalf("Cannot connect to Iceberg service at %s:%d: %v", env.bindIP, env.icebergPort, err) + t.Fatalf("Cannot connect to Iceberg service at %s: %v", addr, err) } conn.Close() - t.Logf("Successfully connected to Iceberg service at %s:%d", env.bindIP, env.icebergPort) + t.Logf("Successfully connected to Iceberg service at %s", addr) // Test /v1/config endpoint url := fmt.Sprintf("http://%s:%d/v1/config", env.bindIP, env.icebergPort) @@ -319,7 +300,7 @@ func (env *TestEnvironment) writeTrinoConfig(t *testing.T, warehouseBucket strin config := fmt.Sprintf(`connector.name=iceberg iceberg.catalog.type=rest iceberg.rest-catalog.uri=http://host.docker.internal:%d -iceberg.rest-catalog.warehouse=s3tablescatalog/%s +iceberg.rest-catalog.warehouse=s3://%s iceberg.file-format=PARQUET iceberg.unique-table-location=true @@ -399,8 +380,7 @@ func waitForTrino(t *testing.T, containerName string, timeout time.Duration) { return } - logs, _ := exec.Command("docker", "logs", containerName).CombinedOutput() - 
t.Fatalf("Timed out waiting for Trino to be ready\nLast output:\n%s\nTrino logs:\n%s", string(lastOutput), string(logs)) + t.Fatalf("Timed out waiting for Trino to be ready\nLast output:\n%s", string(lastOutput)) } func runTrinoSQL(t *testing.T, containerName, sql string) string { @@ -413,8 +393,7 @@ func runTrinoSQL(t *testing.T, containerName, sql string) string { ) output, err := cmd.CombinedOutput() if err != nil { - logs, _ := exec.Command("docker", "logs", containerName).CombinedOutput() - t.Fatalf("Trino command failed: %v\nSQL: %s\nOutput:\n%s\nTrino logs:\n%s", err, sql, string(output), string(logs)) + t.Fatalf("Trino command failed: %v\nSQL: %s\nOutput:\n%s", err, sql, string(output)) } return sanitizeTrinoOutput(string(output)) } @@ -535,25 +514,6 @@ func getFreePort() (int, error) { return addr.Port, nil } -func findBindIP() string { - addrs, err := net.InterfaceAddrs() - if err != nil { - return "127.0.0.1" - } - for _, addr := range addrs { - ipNet, ok := addr.(*net.IPNet) - if !ok || ipNet.IP == nil { - continue - } - ip := ipNet.IP.To4() - if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() { - continue - } - return ip.String() - } - return "127.0.0.1" -} - func randomString(length int) string { const charset = "abcdefghijklmnopqrstuvwxyz0123456789" b := make([]byte, length) diff --git a/test/s3tables/catalog_trino/trino_crud_operations_test.go b/test/s3tables/catalog_trino/trino_crud_operations_test.go index cd78c08ae..e33d03e1c 100644 --- a/test/s3tables/catalog_trino/trino_crud_operations_test.go +++ b/test/s3tables/catalog_trino/trino_crud_operations_test.go @@ -24,10 +24,9 @@ func setupTrinoTest(t *testing.T) *TestEnvironment { t.Logf(">>> Starting SeaweedFS...") env.StartSeaweedFS(t) - catalogBucket := "warehouse" tableBucket := "iceberg-tables" + catalogBucket := tableBucket createTableBucket(t, env, tableBucket) - createTableBucket(t, env, catalogBucket) configDir := env.writeTrinoConfig(t, catalogBucket) env.startTrinoContainer(t, configDir) diff --git a/test/s3tables/testutil/weed_mini.go b/test/s3tables/testutil/weed_mini.go new file mode 100644 index 000000000..ff8260ef0 --- /dev/null +++ b/test/s3tables/testutil/weed_mini.go @@ -0,0 +1,56 @@ +package testutil + +import ( + "fmt" + "net" + "os" + "path/filepath" +) + +func FindBindIP() string { + addrs, err := net.InterfaceAddrs() + if err != nil { + return "127.0.0.1" + } + for _, addr := range addrs { + ipNet, ok := addr.(*net.IPNet) + if !ok || ipNet.IP == nil { + continue + } + ip := ipNet.IP.To4() + if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() { + continue + } + return ip.String() + } + return "127.0.0.1" +} + +func WriteIAMConfig(dir, accessKey, secretKey string) (string, error) { + iamConfigPath := filepath.Join(dir, "iam_config.json") + iamConfig := fmt.Sprintf(`{ + "identities": [ + { + "name": "admin", + "credentials": [ + { + "accessKey": "%s", + "secretKey": "%s" + } + ], + "actions": [ + "Admin", + "Read", + "List", + "Tagging", + "Write" + ] + } + ] +}`, accessKey, secretKey) + + if err := os.WriteFile(iamConfigPath, []byte(iamConfig), 0644); err != nil { + return "", err + } + return iamConfigPath, nil +} diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 1b6cbe769..9f58d3f9d 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -276,8 +276,10 @@ func (f *Filer) ensureParentDirectoryEntry(ctx context.Context, entry *Entry, di // dirParts[0] == "" and dirParts[1] == "buckets" isUnderBuckets := len(dirParts) >= 3 && dirParts[1] == "buckets" if isUnderBuckets { 
- if err := s3bucket.VerifyS3BucketName(dirParts[2]); err != nil { - return fmt.Errorf("invalid bucket name %s: %v", dirParts[2], err) + if !strings.HasPrefix(dirParts[2], ".") { + if err := s3bucket.VerifyS3BucketName(dirParts[2]); err != nil { + return fmt.Errorf("invalid bucket name %s: %v", dirParts[2], err) + } } } diff --git a/weed/s3api/bucket_paths.go b/weed/s3api/bucket_paths.go index 5c7b9520a..de3adee0d 100644 --- a/weed/s3api/bucket_paths.go +++ b/weed/s3api/bucket_paths.go @@ -89,9 +89,6 @@ func (s3a *S3ApiServer) bucketDir(bucket string) string { if tablePath, ok := s3a.tableLocationDir(bucket); ok { return tablePath } - if s3a.isTableBucket(bucket) { - return s3tables.GetTableObjectBucketPath(bucket) - } return path.Join(s3a.bucketRoot(bucket), bucket) } diff --git a/weed/s3api/iceberg/iceberg.go b/weed/s3api/iceberg/iceberg.go index 96a6230bf..d96957c41 100644 --- a/weed/s3api/iceberg/iceberg.go +++ b/weed/s3api/iceberg/iceberg.go @@ -217,13 +217,14 @@ func (s *Server) saveMetadataFile(ctx context.Context, bucketName, tablePath, me return nil } + bucketDir := path.Join(bucketsPath, bucketName) // 1. Ensure bucket directory exists: / if err := ensureDir(bucketsPath, bucketName, "bucket directory"); err != nil { return err } - // 2. Ensure table path exists: // - tableDir := path.Join(bucketsPath, bucketName) + // 2. Ensure table path exists under the bucket directory + tableDir := bucketDir if tablePath != "" { segments := strings.Split(tablePath, "/") for _, segment := range segments { @@ -354,6 +355,9 @@ func getBucketFromPrefix(r *http.Request) string { if prefix := vars["prefix"]; prefix != "" { return prefix } + if bucket := os.Getenv("S3TABLES_DEFAULT_BUCKET"); bucket != "" { + return bucket + } // Default bucket if no prefix - use "warehouse" for Iceberg return "warehouse" } @@ -680,24 +684,32 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) { // Generate UUID for the new table tableUUID := uuid.New() - location := strings.TrimSuffix(req.Location, "/") tablePath := path.Join(encodeNamespace(namespace), req.Name) - storageBucket := bucketName - tableLocationBucket := "" - if location != "" { + location := strings.TrimSuffix(req.Location, "/") + if location == "" { + if req.Properties != nil { + if warehouse := strings.TrimSuffix(req.Properties["warehouse"], "/"); warehouse != "" { + location = fmt.Sprintf("%s/%s", warehouse, tablePath) + } + } + if location == "" { + if warehouse := strings.TrimSuffix(os.Getenv("ICEBERG_WAREHOUSE"), "/"); warehouse != "" { + location = fmt.Sprintf("%s/%s", warehouse, tablePath) + } + } + if location == "" { + location = fmt.Sprintf("s3://%s/%s", bucketName, tablePath) + } + } else { parsedBucket, parsedPath, err := parseS3Location(location) if err != nil { writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid table location: "+err.Error()) return } - if strings.HasSuffix(parsedBucket, "--table-s3") && parsedPath == "" { - tableLocationBucket = parsedBucket + if parsedPath == "" { + location = fmt.Sprintf("s3://%s/%s", parsedBucket, tablePath) } } - if tableLocationBucket == "" { - tableLocationBucket = fmt.Sprintf("%s--table-s3", tableUUID.String()) - } - location = fmt.Sprintf("s3://%s", tableLocationBucket) // Build proper Iceberg table metadata using iceberg-go types metadata := newTableMetadata(tableUUID, location, req.Schema, req.PartitionSpec, req.WriteOrder, req.Properties) @@ -713,15 +725,21 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) { return } - // 
1. Save metadata file to filer tableName := req.Name metadataFileName := "v1.metadata.json" // Initial version is always 1 - if err := s.saveMetadataFile(r.Context(), storageBucket, tablePath, metadataFileName, metadataBytes); err != nil { - writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+err.Error()) - return - } - metadataLocation := fmt.Sprintf("%s/metadata/%s", location, metadataFileName) + if !req.StageCreate { + // Save metadata file to filer for immediate table creation. + metadataBucket, metadataPath, err := parseS3Location(location) + if err != nil { + writeError(w, http.StatusInternalServerError, "InternalServerError", "Invalid table location: "+err.Error()) + return + } + if err := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); err != nil { + writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+err.Error()) + return + } + } // Use S3 Tables manager to create table createReq := &s3tables.CreateTableRequest{ @@ -746,8 +764,42 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) { }) if err != nil { + if tableErr, ok := err.(*s3tables.S3TablesError); ok && tableErr.Type == s3tables.ErrCodeTableAlreadyExists { + getReq := &s3tables.GetTableRequest{ + TableBucketARN: bucketARN, + Namespace: namespace, + Name: tableName, + } + var getResp s3tables.GetTableResponse + getErr := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + mgrClient := s3tables.NewManagerClient(client) + return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName) + }) + if getErr != nil { + writeError(w, http.StatusConflict, "AlreadyExistsException", err.Error()) + return + } + result := buildLoadTableResult(getResp, bucketName, namespace, tableName) + writeJSON(w, http.StatusOK, result) + return + } if strings.Contains(err.Error(), "already exists") { - writeError(w, http.StatusConflict, "AlreadyExistsException", err.Error()) + getReq := &s3tables.GetTableRequest{ + TableBucketARN: bucketARN, + Namespace: namespace, + Name: tableName, + } + var getResp s3tables.GetTableResponse + getErr := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + mgrClient := s3tables.NewManagerClient(client) + return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName) + }) + if getErr != nil { + writeError(w, http.StatusConflict, "AlreadyExistsException", err.Error()) + return + } + result := buildLoadTableResult(getResp, bucketName, namespace, tableName) + writeJSON(w, http.StatusOK, result) return } glog.V(1).Infof("Iceberg: CreateTable error: %v", err) @@ -809,7 +861,11 @@ func (s *Server) handleLoadTable(w http.ResponseWriter, r *http.Request) { return } - // Build table metadata using iceberg-go types + result := buildLoadTableResult(getResp, bucketName, namespace, tableName) + writeJSON(w, http.StatusOK, result) +} + +func buildLoadTableResult(getResp s3tables.GetTableResponse, bucketName string, namespace []string, tableName string) LoadTableResult { location := tableLocationFromMetadataLocation(getResp.MetadataLocation) if location == "" { location = fmt.Sprintf("s3://%s/%s/%s", bucketName, encodeNamespace(namespace), tableName) @@ -840,12 +896,11 @@ func (s *Server) handleLoadTable(w http.ResponseWriter, r *http.Request) { metadata = newTableMetadata(tableUUID, location, nil, nil, nil, nil) } - result := 
LoadTableResult{ + return LoadTableResult{ MetadataLocation: getResp.MetadataLocation, Metadata: metadata, Config: make(iceberg.Properties), } - writeJSON(w, http.StatusOK, result) } // handleTableExists checks if a table exists. @@ -943,13 +998,53 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { // Extract identity from context identityName := s3_constants.GetIdentityNameFromContext(r) - // Parse the commit request - var req CommitTableRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + // Parse the commit request, skipping update actions not supported by iceberg-go. + var raw struct { + Identifier *TableIdentifier `json:"identifier,omitempty"` + Requirements json.RawMessage `json:"requirements"` + Updates []json.RawMessage `json:"updates"` + } + if err := json.NewDecoder(r.Body).Decode(&raw); err != nil { writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid request body: "+err.Error()) return } + var req CommitTableRequest + req.Identifier = raw.Identifier + if len(raw.Requirements) > 0 { + if err := json.Unmarshal(raw.Requirements, &req.Requirements); err != nil { + writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid requirements: "+err.Error()) + return + } + } + if len(raw.Updates) > 0 { + filtered := make([]json.RawMessage, 0, len(raw.Updates)) + for _, update := range raw.Updates { + var action struct { + Action string `json:"action"` + } + if err := json.Unmarshal(update, &action); err != nil { + writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid update: "+err.Error()) + return + } + if action.Action == "set-statistics" { + continue + } + filtered = append(filtered, update) + } + if len(filtered) > 0 { + updatesBytes, err := json.Marshal(filtered) + if err != nil { + writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse updates: "+err.Error()) + return + } + if err := json.Unmarshal(updatesBytes, &req.Updates); err != nil { + writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid updates: "+err.Error()) + return + } + } + } + // First, load current table metadata getReq := &s3tables.GetTableRequest{ TableBucketARN: bucketARN, @@ -1049,8 +1144,12 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { } // 1. 
Save metadata file to filer - tablePath := path.Join(encodeNamespace(namespace), tableName) - if err := s.saveMetadataFile(r.Context(), bucketName, tablePath, metadataFileName, metadataBytes); err != nil { + metadataBucket, metadataPath, err := parseS3Location(location) + if err != nil { + writeError(w, http.StatusInternalServerError, "InternalServerError", "Invalid table location: "+err.Error()) + return + } + if err := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); err != nil { writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+err.Error()) return } diff --git a/weed/s3api/s3api_object_handlers_list.go b/weed/s3api/s3api_object_handlers_list.go index 1a4cf84dd..731972f75 100644 --- a/weed/s3api/s3api_object_handlers_list.go +++ b/weed/s3api/s3api_object_handlers_list.go @@ -524,7 +524,6 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d stream, listErr := client.ListEntries(ctx, request) if listErr != nil { if errors.Is(listErr, filer_pb.ErrNotFound) { - err = filer_pb.ErrNotFound return } err = fmt.Errorf("list entries %+v: %w", request, listErr) diff --git a/weed/s3api/s3tables/handler_bucket_create.go b/weed/s3api/s3tables/handler_bucket_create.go index 3b4cac7e4..44971dbb9 100644 --- a/weed/s3api/s3tables/handler_bucket_create.go +++ b/weed/s3api/s3tables/handler_bucket_create.go @@ -105,14 +105,6 @@ func (h *S3TablesHandler) handleCreateTableBucket(w http.ResponseWriter, r *http } } - // Ensure object root directory exists for table bucket S3 operations - if err := h.ensureDirectory(r.Context(), client, GetTableObjectRootDir()); err != nil { - return fmt.Errorf("failed to create table object root directory: %w", err) - } - if err := h.ensureDirectory(r.Context(), client, GetTableObjectBucketPath(req.Name)); err != nil { - return fmt.Errorf("failed to create table object bucket directory: %w", err) - } - // Create bucket directory if err := h.createDirectory(r.Context(), client, bucketPath); err != nil { return err diff --git a/weed/s3api/s3tables/handler_namespace.go b/weed/s3api/s3tables/handler_namespace.go index 492a53241..b6a18d826 100644 --- a/weed/s3api/s3tables/handler_namespace.go +++ b/weed/s3api/s3tables/handler_namespace.go @@ -50,12 +50,38 @@ func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.R var bucketMetadata tableBucketMetadata var bucketPolicy string var bucketTags map[string]string + ownerAccountID := h.getAccountID(r) err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { data, err := h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyMetadata) if err != nil { - return err - } - if err := json.Unmarshal(data, &bucketMetadata); err != nil { + if errors.Is(err, ErrAttributeNotFound) { + dir, name := splitPath(bucketPath) + entryResp, lookupErr := filer_pb.LookupEntry(r.Context(), client, &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: name, + }) + if lookupErr != nil { + return lookupErr + } + if entryResp.Entry == nil || !IsTableBucketEntry(entryResp.Entry) { + return filer_pb.ErrNotFound + } + bucketMetadata = tableBucketMetadata{ + Name: bucketName, + CreatedAt: time.Now(), + OwnerAccountID: ownerAccountID, + } + metadataBytes, err := json.Marshal(&bucketMetadata) + if err != nil { + return fmt.Errorf("failed to marshal bucket metadata: %w", err) + } + if err := h.setExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyMetadata, 
metadataBytes); err != nil { + return err + } + } else { + return err + } + } else if err := json.Unmarshal(data, &bucketMetadata); err != nil { return fmt.Errorf("failed to unmarshal bucket metadata: %w", err) } diff --git a/weed/s3api/s3tables/handler_table.go b/weed/s3api/s3tables/handler_table.go index b1ae0f8e1..77e6691fd 100644 --- a/weed/s3api/s3tables/handler_table.go +++ b/weed/s3api/s3tables/handler_table.go @@ -164,14 +164,26 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque tablePath := GetTablePath(bucketName, namespaceName, tableName) // Check if table already exists + var existingMetadata tableMetadataInternal err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - _, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata) - return err + data, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata) + if err != nil { + return err + } + if unmarshalErr := json.Unmarshal(data, &existingMetadata); unmarshalErr != nil { + return fmt.Errorf("failed to parse existing table metadata: %w", unmarshalErr) + } + return nil }) if err == nil { - h.writeError(w, http.StatusConflict, ErrCodeTableAlreadyExists, fmt.Sprintf("table %s already exists", tableName)) - return fmt.Errorf("table already exists") + tableARN := h.generateTableARN(existingMetadata.OwnerAccountID, bucketName, namespaceName+"/"+tableName) + h.writeJSON(w, http.StatusOK, &CreateTableResponse{ + TableARN: tableARN, + VersionToken: existingMetadata.VersionToken, + MetadataLocation: existingMetadata.MetadataLocation, + }) + return nil } else if !errors.Is(err, filer_pb.ErrNotFound) && !errors.Is(err, ErrAttributeNotFound) { h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to check table: %v", err)) return err @@ -201,14 +213,14 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque } err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - // Create table directory - if err := h.createDirectory(r.Context(), client, tablePath); err != nil { + // Ensure table directory exists (may already be created by object storage clients) + if err := h.ensureDirectory(r.Context(), client, tablePath); err != nil { return err } // Create data subdirectory for Iceberg files dataPath := tablePath + "/data" - if err := h.createDirectory(r.Context(), client, dataPath); err != nil { + if err := h.ensureDirectory(r.Context(), client, dataPath); err != nil { return err }