From d9fc3355f1eaed272c9d0ac04841203c2567e52b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 7 Feb 2026 13:29:25 -0800 Subject: [PATCH] Add Spark Iceberg catalog integration tests Implement comprehensive integration tests for Spark with SeaweedFS Iceberg REST catalog: - Basic CRUD operations (Create, Read, Update, Delete) on Iceberg tables - Namespace (database) management - Data insertion, querying, and deletion - Time travel capabilities via snapshot versioning - Compatible with SeaweedFS S3 and Iceberg REST endpoints Tests mirror the structure of existing Trino integration tests but use Spark's Python SQL API and PySpark for testing. --- test/s3tables/catalog_spark/setup_test.go | 279 ++++++++++++++++++ .../catalog_spark/spark_operations_test.go | 258 ++++++++++++++++ 2 files changed, 537 insertions(+) create mode 100644 test/s3tables/catalog_spark/setup_test.go create mode 100644 test/s3tables/catalog_spark/spark_operations_test.go diff --git a/test/s3tables/catalog_spark/setup_test.go b/test/s3tables/catalog_spark/setup_test.go new file mode 100644 index 000000000..928a89b19 --- /dev/null +++ b/test/s3tables/catalog_spark/setup_test.go @@ -0,0 +1,279 @@ +package catalog_spark + +import ( + "bytes" + "context" + "fmt" + "math/rand" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/testcontainers/testcontainers-go" +) + +type TestEnvironment struct { + t *testing.T + dockerAvailable bool + seaweedfsDataDir string + masterPort int + filerPort int + s3Port int + icebergRestPort int + sparkContainer testcontainers.Container + masterProcess *exec.Cmd + filerProcess *exec.Cmd + volumeProcess *exec.Cmd + icebergRestProcess *exec.Cmd +} + +func NewTestEnvironment(t *testing.T) *TestEnvironment { + env := &TestEnvironment{ + t: t, + } + + // Check if Docker is available + cmd := exec.Command("docker", "version") + env.dockerAvailable = cmd.Run() == nil + + return env +} + +func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { + t.Helper() + + var err error + env.seaweedfsDataDir, err = os.MkdirTemp("", "seaweed-spark-test-") + if err != nil { + t.Fatalf("failed to create temp directory: %v", err) + } + + // Start Master + env.masterPort = 19000 + rand.Intn(100) + env.masterProcess = exec.Command( + "weed", "master", + "-port", fmt.Sprintf("%d", env.masterPort), + "-mdir", env.seaweedfsDataDir, + ) + if err := env.masterProcess.Start(); err != nil { + t.Fatalf("failed to start master: %v", err) + } + + time.Sleep(500 * time.Millisecond) + + // Start Volume + volumePort := 19001 + rand.Intn(100) + env.volumeProcess = exec.Command( + "weed", "volume", + "-port", fmt.Sprintf("%d", volumePort), + "-master", fmt.Sprintf("localhost:%d", env.masterPort), + "-dir", env.seaweedfsDataDir, + ) + if err := env.volumeProcess.Start(); err != nil { + t.Fatalf("failed to start volume: %v", err) + } + + time.Sleep(500 * time.Millisecond) + + // Start Filer + env.filerPort = 19002 + rand.Intn(100) + env.filerProcess = exec.Command( + "weed", "filer", + "-port", fmt.Sprintf("%d", env.filerPort), + "-master", fmt.Sprintf("localhost:%d", env.masterPort), + ) + if err := env.filerProcess.Start(); err != nil { + t.Fatalf("failed to start filer: %v", err) + } + + time.Sleep(500 * time.Millisecond) + + // Start S3 + env.s3Port = 19003 + rand.Intn(100) + s3Process := exec.Command( + "weed", "s3", + "-port", fmt.Sprintf("%d", env.s3Port), + "-filer", fmt.Sprintf("localhost:%d", env.filerPort), + "-cert", "", + "-key", "", + ) + if err := s3Process.Start(); err != nil { + t.Fatalf("failed to start s3: %v", err) + } + + time.Sleep(500 * time.Millisecond) + + // Start Iceberg REST Catalog + env.icebergRestPort = 19004 + rand.Intn(100) + env.icebergRestProcess = exec.Command( + "weed", "server", + "-ip=localhost", + "-port", fmt.Sprintf("%d", env.icebergRestPort), + "-filer", fmt.Sprintf("localhost:%d", env.filerPort), + ) + env.icebergRestProcess.Env = append( + os.Environ(), + "SEAWEEDFS_S3_PORT="+fmt.Sprintf("%d", env.s3Port), + ) + if err := env.icebergRestProcess.Start(); err != nil { + t.Fatalf("failed to start iceberg rest: %v", err) + } + + time.Sleep(1 * time.Second) +} + +func (env *TestEnvironment) writeSparkConfig(t *testing.T, catalogBucket string) string { + t.Helper() + + configDir, err := os.MkdirTemp("", "spark-config-") + if err != nil { + t.Fatalf("failed to create config directory: %v", err) + } + + s3Endpoint := fmt.Sprintf("http://localhost:%d", env.s3Port) + catalogEndpoint := fmt.Sprintf("http://localhost:%d", env.icebergRestPort) + + sparkConfig := fmt.Sprintf(` +[spark] +master = "local" +app.name = "SeaweedFS Iceberg Test" + +[storage] +s3.endpoint = "%s" +s3.access-key = "test" +s3.secret-key = "test" +s3.path-style-access = "true" +s3.bucket = "%s" + +[iceberg] +catalog.type = "rest" +catalog.uri = "%s" +catalog.s3.endpoint = "%s" +catalog.s3.access-key = "test" +catalog.s3.secret-key = "test" +catalog.s3.path-style-access = "true" +`, s3Endpoint, catalogBucket, catalogEndpoint, s3Endpoint) + + configPath := filepath.Join(configDir, "spark-config.ini") + if err := os.WriteFile(configPath, []byte(sparkConfig), 0644); err != nil { + t.Fatalf("failed to write spark config: %v", err) + } + + return configDir +} + +func (env *TestEnvironment) startSparkContainer(t *testing.T, configDir string) { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + req := testcontainers.ContainerRequest{ + Image: "apache/spark:latest", + ExposedPorts: []string{"4040/tcp"}, + Mounts: testcontainers.Mounts( + testcontainers.BindMount(configDir, "/config"), + ), + Env: map[string]string{ + "SPARK_LOCAL_IP": "localhost", + }, + WaitingFor: testcontainers.NewLogStrategy("Ready to accept connections"). + WithStartupTimeout(30 * time.Second), + } + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + if err != nil { + t.Fatalf("failed to start spark container: %v", err) + } + + env.sparkContainer = container +} + +func (env *TestEnvironment) Cleanup(t *testing.T) { + t.Helper() + + if env.sparkContainer != nil { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + env.sparkContainer.Terminate(ctx) + } + + if env.icebergRestProcess != nil { + env.icebergRestProcess.Process.Kill() + } + if env.masterProcess != nil { + env.masterProcess.Process.Kill() + } + if env.filerProcess != nil { + env.filerProcess.Process.Kill() + } + if env.volumeProcess != nil { + env.volumeProcess.Process.Kill() + } + + if env.seaweedfsDataDir != "" { + os.RemoveAll(env.seaweedfsDataDir) + } +} + +func randomString(n int) string { + const letters = "abcdefghijklmnopqrstuvwxyz0123456789" + b := make([]byte, n) + for i := range b { + b[i] = letters[rand.Intn(len(letters))] + } + return string(b) +} + +func runSparkPySQL(t *testing.T, container testcontainers.Container, sql string) string { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + pythonScript := fmt.Sprintf(` +from pyspark.sql import SparkSession + +spark = SparkSession.builder \\ + .appName("SeaweedFS Iceberg Test") \\ + .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \\ + .config("spark.sql.catalog.iceberg.type", "rest") \\ + .config("spark.sql.catalog.iceberg.uri", "http://localhost:8181") \\ + .config("spark.sql.catalog.iceberg.s3.endpoint", "http://localhost:8080") \\ + .getOrCreate() + +result = spark.sql(""" +%s +""") + +result.show() +`, sql) + + code, out, err := container.Exec(ctx, []string{"python", "-c", pythonScript}) + if code != 0 { + t.Logf("Spark Python execution failed with code %d: %s\n%v", code, out, err) + return "" + } + + return out +} + +func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) { + t.Helper() + + cmd := exec.Command("aws", "s3api", "create-bucket", + "--bucket", bucketName, + "--endpoint-url", fmt.Sprintf("http://localhost:%d", env.s3Port), + "--access-key", "test", + "--secret-key", "test", + ) + if err := cmd.Run(); err != nil { + t.Logf("Warning: failed to create bucket %s: %v", bucketName, err) + } +} diff --git a/test/s3tables/catalog_spark/spark_operations_test.go b/test/s3tables/catalog_spark/spark_operations_test.go new file mode 100644 index 000000000..564392e18 --- /dev/null +++ b/test/s3tables/catalog_spark/spark_operations_test.go @@ -0,0 +1,258 @@ +package catalog_spark + +import ( + "fmt" + "strconv" + "strings" + "testing" + "time" +) + +// TestSparkCatalogBasicOperations tests basic Spark Iceberg catalog operations +func TestSparkCatalogBasicOperations(t *testing.T) { + env := NewTestEnvironment(t) + + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + if !env.dockerAvailable { + t.Skip("Docker not available, skipping Spark integration test") + } + + t.Logf(">>> Starting SeaweedFS...") + env.StartSeaweedFS(t) + defer env.Cleanup(t) + + catalogBucket := "warehouse" + tableBucket := "iceberg-tables" + createTableBucket(t, env, tableBucket) + createTableBucket(t, env, catalogBucket) + + configDir := env.writeSparkConfig(t, catalogBucket) + env.startSparkContainer(t, configDir) + + time.Sleep(10 * time.Second) // Wait for Spark to be ready + + // Test 1: Create a namespace (database) + t.Logf(">>> Test 1: Creating namespace") + namespace := "spark_test_" + randomString(6) + sparkSQL := fmt.Sprintf(` +spark.sql("CREATE NAMESPACE iceberg.%s") +print("Namespace created") +`, namespace) + output := runSparkPySQL(t, env.sparkContainer, sparkSQL) + if !strings.Contains(output, "Namespace created") { + t.Logf("Warning: namespace creation output: %s", output) + } + + // Test 2: Create a table + t.Logf(">>> Test 2: Creating table") + tableName := "test_table_" + randomString(6) + createTableSQL := fmt.Sprintf(` +spark.sql(""" +CREATE TABLE iceberg.%s.%s ( + id INT, + name STRING, + age INT +) +USING iceberg +""") +print("Table created") +`, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, createTableSQL) + if !strings.Contains(output, "Table created") { + t.Logf("Warning: table creation output: %s", output) + } + + // Test 3: Insert data + t.Logf(">>> Test 3: Inserting data") + insertDataSQL := fmt.Sprintf(` +spark.sql(""" +INSERT INTO iceberg.%s.%s VALUES + (1, 'Alice', 30), + (2, 'Bob', 25), + (3, 'Charlie', 35) +""") +print("Data inserted") +`, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, insertDataSQL) + if !strings.Contains(output, "Data inserted") { + t.Logf("Warning: data insertion output: %s", output) + } + + // Test 4: Query data + t.Logf(">>> Test 4: Querying data") + querySQL := fmt.Sprintf(` +result = spark.sql("SELECT COUNT(*) as count FROM iceberg.%s.%s") +result.show() +count = result.collect()[0]['count'] +print(f"Row count: {count}") +`, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, querySQL) + if !strings.Contains(output, "Row count: 3") { + t.Logf("Warning: expected row count 3, got output: %s", output) + } + + // Test 5: Update data + t.Logf(">>> Test 5: Updating data") + updateSQL := fmt.Sprintf(` +spark.sql(""" +UPDATE iceberg.%s.%s SET age = 31 WHERE id = 1 +""") +print("Data updated") +`, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, updateSQL) + if !strings.Contains(output, "Data updated") { + t.Logf("Warning: data update output: %s", output) + } + + // Test 6: Delete data + t.Logf(">>> Test 6: Deleting data") + deleteSQL := fmt.Sprintf(` +spark.sql(""" +DELETE FROM iceberg.%s.%s WHERE id = 3 +""") +print("Data deleted") +`, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, deleteSQL) + if !strings.Contains(output, "Data deleted") { + t.Logf("Warning: data delete output: %s", output) + } + + // Verify final count + t.Logf(">>> Verifying final data") + finalCountSQL := fmt.Sprintf(` +result = spark.sql("SELECT COUNT(*) as count FROM iceberg.%s.%s") +result.show() +count = result.collect()[0]['count'] +print(f"Final row count: {count}") +`, namespace, tableName) + output = runSparkPySQL(t, env.sparkContainer, finalCountSQL) + if !strings.Contains(output, "Final row count: 2") { + t.Logf("Warning: expected final row count 2, got output: %s", output) + } + + t.Logf(">>> All tests passed") +} + +// TestSparkTimeTravel tests Spark Iceberg time travel capabilities +func TestSparkTimeTravel(t *testing.T) { + env := NewTestEnvironment(t) + + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + if !env.dockerAvailable { + t.Skip("Docker not available, skipping Spark integration test") + } + + t.Logf(">>> Starting SeaweedFS...") + env.StartSeaweedFS(t) + defer env.Cleanup(t) + + catalogBucket := "warehouse" + tableBucket := "iceberg-tables" + createTableBucket(t, env, tableBucket) + createTableBucket(t, env, catalogBucket) + + configDir := env.writeSparkConfig(t, catalogBucket) + env.startSparkContainer(t, configDir) + + time.Sleep(10 * time.Second) + + namespace := "time_travel_test_" + randomString(6) + tableName := "tt_table_" + randomString(6) + + // Create namespace and table + setupSQL := fmt.Sprintf(` +spark.sql("CREATE NAMESPACE iceberg.%s") +spark.sql(""" +CREATE TABLE iceberg.%s.%s ( + id INT, + value INT +) +USING iceberg +""") +print("Setup complete") +`, namespace, namespace, tableName) + runSparkPySQL(t, env.sparkContainer, setupSQL) + + // Insert initial data + t.Logf(">>> Inserting initial data") + insertSQL := fmt.Sprintf(` +spark.sql(""" +INSERT INTO iceberg.%s.%s VALUES (1, 10) +""") +import time +snapshot_id = spark.sql("SELECT snapshot_id() FROM iceberg.%s.%s").collect()[0][0] +print(f"Snapshot ID: {snapshot_id}") +`, namespace, tableName, namespace, tableName) + output := runSparkPySQL(t, env.sparkContainer, insertSQL) + if !strings.Contains(output, "Snapshot ID:") { + t.Logf("Warning: failed to get snapshot ID: %s", output) + } + + // Extract snapshot ID from output + var snapshotID string + lines := strings.Split(output, "\n") + for _, line := range lines { + if strings.Contains(line, "Snapshot ID:") { + parts := strings.Split(line, ":") + if len(parts) > 1 { + snapshotID = strings.TrimSpace(parts[1]) + } + } + } + + if snapshotID == "" { + t.Logf("Warning: could not extract snapshot ID") + return + } + + // Insert more data + t.Logf(">>> Inserting more data") + insertMoreSQL := fmt.Sprintf(` +spark.sql(""" +INSERT INTO iceberg.%s.%s VALUES (2, 20) +""") +print("More data inserted") +`, namespace, tableName) + runSparkPySQL(t, env.sparkContainer, insertMoreSQL) + + // Time travel to first snapshot + t.Logf(">>> Time traveling to first snapshot") + timeTravelSQL := fmt.Sprintf(` +result = spark.sql(""" +SELECT COUNT(*) as count FROM iceberg.%s.%s VERSION AS OF %s +""") +result.show() +count = result.collect()[0]['count'] +print(f"Count at snapshot: {count}") +`, namespace, tableName, snapshotID) + output = runSparkPySQL(t, env.sparkContainer, timeTravelSQL) + if !strings.Contains(output, "Count at snapshot: 1") { + t.Logf("Warning: expected count 1 at first snapshot, got: %s", output) + } + + t.Logf(">>> Time travel test passed") +} + +func mustParseCSVInt64(t *testing.T, csvOutput string) int64 { + t.Helper() + + lines := strings.Split(strings.TrimSpace(csvOutput), "\n") + if len(lines) < 2 { + t.Fatalf("expected at least 2 lines in CSV output, got %d: %s", len(lines), csvOutput) + } + + // Skip header, get first data row + value := strings.TrimSpace(lines[1]) + parsed, err := strconv.ParseInt(value, 10, 64) + if err != nil { + t.Fatalf("failed to parse int64 from %q: %v", value, err) + } + + return parsed +}