
Add Spark Iceberg catalog integration tests

Implement integration tests for Spark against the SeaweedFS Iceberg REST catalog:
- Basic CRUD operations (Create, Read, Update, Delete) on Iceberg tables
- Namespace (database) management
- Data insertion, querying, and deletion
- Time travel capabilities via snapshot versioning
- Compatible with SeaweedFS S3 and Iceberg REST endpoints

The tests mirror the structure of the existing Trino integration tests, but drive
Spark's SQL API through PySpark.
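
In outline, each test exercises the new helpers in setup_test.go roughly as follows
(a condensed sketch; the function name is illustrative, not a third test file):

func TestSketch(t *testing.T) {
    env := NewTestEnvironment(t) // records whether Docker is available
    if testing.Short() || !env.dockerAvailable {
        t.Skip("integration test requires Docker")
    }
    env.StartSeaweedFS(t) // master, volume, filer, S3 gateway, Iceberg REST
    defer env.Cleanup(t)
    createTableBucket(t, env, "warehouse")
    configDir := env.writeSparkConfig(t, "warehouse")
    env.startSparkContainer(t, configDir) // apache/spark via testcontainers-go
    out := runSparkPySQL(t, env.sparkContainer, `print(spark.version)`)
    t.Logf("spark version: %s", out)
}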
Branch: feature/s3tables-improvements-and-spark-tests
Chris Lu · 4 days ago · commit d9fc3355f1

Changed files:
  test/s3tables/catalog_spark/setup_test.go             (+279 lines)
  test/s3tables/catalog_spark/spark_operations_test.go   (+258 lines)

test/s3tables/catalog_spark/setup_test.go
@@ -0,0 +1,279 @@
package catalog_spark

import (
    "bytes"
    "context"
    "fmt"
    "math/rand"
    "os"
    "os/exec"
    "path/filepath"
    "testing"
    "time"

    "github.com/testcontainers/testcontainers-go"
    "github.com/testcontainers/testcontainers-go/wait"
)

// TestEnvironment holds the SeaweedFS processes and the Spark container used by a test.
type TestEnvironment struct {
    t                  *testing.T
    dockerAvailable    bool
    seaweedfsDataDir   string
    masterPort         int
    filerPort          int
    s3Port             int
    icebergRestPort    int
    sparkContainer     testcontainers.Container
    masterProcess      *exec.Cmd
    filerProcess       *exec.Cmd
    volumeProcess      *exec.Cmd
    s3Process          *exec.Cmd
    icebergRestProcess *exec.Cmd
}

func NewTestEnvironment(t *testing.T) *TestEnvironment {
    env := &TestEnvironment{
        t: t,
    }
    // Check if Docker is available
    cmd := exec.Command("docker", "version")
    env.dockerAvailable = cmd.Run() == nil
    return env
}

// StartSeaweedFS launches local weed master, volume, filer, S3, and Iceberg REST
// processes on randomized ports.
func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
    t.Helper()
    var err error
    env.seaweedfsDataDir, err = os.MkdirTemp("", "seaweed-spark-test-")
    if err != nil {
        t.Fatalf("failed to create temp directory: %v", err)
    }

    // Start Master
    env.masterPort = 19000 + rand.Intn(100)
    env.masterProcess = exec.Command(
        "weed", "master",
        "-port", fmt.Sprintf("%d", env.masterPort),
        "-mdir", env.seaweedfsDataDir,
    )
    if err := env.masterProcess.Start(); err != nil {
        t.Fatalf("failed to start master: %v", err)
    }
    time.Sleep(500 * time.Millisecond)

    // Start Volume
    volumePort := 19001 + rand.Intn(100)
    env.volumeProcess = exec.Command(
        "weed", "volume",
        "-port", fmt.Sprintf("%d", volumePort),
        "-master", fmt.Sprintf("localhost:%d", env.masterPort),
        "-dir", env.seaweedfsDataDir,
    )
    if err := env.volumeProcess.Start(); err != nil {
        t.Fatalf("failed to start volume: %v", err)
    }
    time.Sleep(500 * time.Millisecond)

    // Start Filer
    env.filerPort = 19002 + rand.Intn(100)
    env.filerProcess = exec.Command(
        "weed", "filer",
        "-port", fmt.Sprintf("%d", env.filerPort),
        "-master", fmt.Sprintf("localhost:%d", env.masterPort),
    )
    if err := env.filerProcess.Start(); err != nil {
        t.Fatalf("failed to start filer: %v", err)
    }
    time.Sleep(500 * time.Millisecond)

    // Start S3 gateway (tracked on env so Cleanup can kill it)
    env.s3Port = 19003 + rand.Intn(100)
    env.s3Process = exec.Command(
        "weed", "s3",
        "-port", fmt.Sprintf("%d", env.s3Port),
        "-filer", fmt.Sprintf("localhost:%d", env.filerPort),
    )
    if err := env.s3Process.Start(); err != nil {
        t.Fatalf("failed to start s3: %v", err)
    }
    time.Sleep(500 * time.Millisecond)

    // Start Iceberg REST Catalog
    env.icebergRestPort = 19004 + rand.Intn(100)
    env.icebergRestProcess = exec.Command(
        "weed", "server",
        "-ip=localhost",
        "-port", fmt.Sprintf("%d", env.icebergRestPort),
        "-filer", fmt.Sprintf("localhost:%d", env.filerPort),
    )
    env.icebergRestProcess.Env = append(
        os.Environ(),
        "SEAWEEDFS_S3_PORT="+fmt.Sprintf("%d", env.s3Port),
    )
    if err := env.icebergRestProcess.Start(); err != nil {
        t.Fatalf("failed to start iceberg rest: %v", err)
    }
    time.Sleep(1 * time.Second)
}

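The fixed sleeps above are the simplest way to sequence startup; a readiness poll would be
less flaky. A possible sketch, assuming the polled service answers plain HTTP GET on its port
(for the weed master, something like /cluster/status); this helper is not part of the commit
and would need "net/http" added to the imports:

func waitForHTTP(t *testing.T, url string, timeout time.Duration) {
    t.Helper()
    deadline := time.Now().Add(timeout)
    for time.Now().Before(deadline) {
        if resp, err := http.Get(url); err == nil {
            resp.Body.Close()
            return
        }
        time.Sleep(100 * time.Millisecond)
    }
    t.Fatalf("service at %s not ready within %v", url, timeout)
}

// e.g. waitForHTTP(t, fmt.Sprintf("http://localhost:%d/cluster/status", env.masterPort), 10*time.Second)
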
func (env *TestEnvironment) writeSparkConfig(t *testing.T, catalogBucket string) string {
    t.Helper()
    configDir, err := os.MkdirTemp("", "spark-config-")
    if err != nil {
        t.Fatalf("failed to create config directory: %v", err)
    }
    s3Endpoint := fmt.Sprintf("http://localhost:%d", env.s3Port)
    catalogEndpoint := fmt.Sprintf("http://localhost:%d", env.icebergRestPort)
    sparkConfig := fmt.Sprintf(`
[spark]
master = "local"
app.name = "SeaweedFS Iceberg Test"

[storage]
s3.endpoint = "%s"
s3.access-key = "test"
s3.secret-key = "test"
s3.path-style-access = "true"
s3.bucket = "%s"

[iceberg]
catalog.type = "rest"
catalog.uri = "%s"
catalog.s3.endpoint = "%s"
catalog.s3.access-key = "test"
catalog.s3.secret-key = "test"
catalog.s3.path-style-access = "true"
`, s3Endpoint, catalogBucket, catalogEndpoint, s3Endpoint)
    configPath := filepath.Join(configDir, "spark-config.ini")
    if err := os.WriteFile(configPath, []byte(sparkConfig), 0644); err != nil {
        t.Fatalf("failed to write spark config: %v", err)
    }
    return configDir
}

func (env *TestEnvironment) startSparkContainer(t *testing.T, configDir string) {
    t.Helper()
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    req := testcontainers.ContainerRequest{
        Image:        "apache/spark:latest",
        ExposedPorts: []string{"4040/tcp"},
        Mounts: testcontainers.Mounts(
            testcontainers.BindMount(configDir, "/config"),
        ),
        Env: map[string]string{
            "SPARK_LOCAL_IP": "localhost",
        },
        WaitingFor: wait.ForLog("Ready to accept connections").
            WithStartupTimeout(30 * time.Second),
    }
    container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
        ContainerRequest: req,
        Started:          true,
    })
    if err != nil {
        t.Fatalf("failed to start spark container: %v", err)
    }
    env.sparkContainer = container
}

// Cleanup terminates the Spark container, kills the SeaweedFS processes, and removes temp data.
func (env *TestEnvironment) Cleanup(t *testing.T) {
    t.Helper()
    if env.sparkContainer != nil {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
        env.sparkContainer.Terminate(ctx)
    }
    if env.icebergRestProcess != nil {
        env.icebergRestProcess.Process.Kill()
    }
    if env.s3Process != nil {
        env.s3Process.Process.Kill()
    }
    if env.masterProcess != nil {
        env.masterProcess.Process.Kill()
    }
    if env.filerProcess != nil {
        env.filerProcess.Process.Kill()
    }
    if env.volumeProcess != nil {
        env.volumeProcess.Process.Kill()
    }
    if env.seaweedfsDataDir != "" {
        os.RemoveAll(env.seaweedfsDataDir)
    }
}

func randomString(n int) string {
    const letters = "abcdefghijklmnopqrstuvwxyz0123456789"
    b := make([]byte, n)
    for i := range b {
        b[i] = letters[rand.Intn(len(letters))]
    }
    return string(b)
}

// runSparkPySQL creates a SparkSession configured for the Iceberg REST catalog inside the
// container and then executes the caller-supplied Python snippet against it. The catalog
// and S3 endpoints below refer to the services as seen from inside the container.
func runSparkPySQL(t *testing.T, container testcontainers.Container, script string) string {
    t.Helper()
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    pythonScript := fmt.Sprintf(`
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SeaweedFS Iceberg Test") \
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg.type", "rest") \
    .config("spark.sql.catalog.iceberg.uri", "http://localhost:8181") \
    .config("spark.sql.catalog.iceberg.s3.endpoint", "http://localhost:8080") \
    .getOrCreate()
%s
`, script)
    code, out, err := container.Exec(ctx, []string{"python3", "-c", pythonScript})
    if err != nil {
        t.Logf("Spark Python execution error: %v", err)
        return ""
    }
    var buf bytes.Buffer
    buf.ReadFrom(out)
    if code != 0 {
        t.Logf("Spark Python execution failed with code %d: %s", code, buf.String())
        return ""
    }
    return buf.String()
}

// createTableBucket creates a bucket on the local SeaweedFS S3 endpoint via the AWS CLI.
func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) {
    t.Helper()
    cmd := exec.Command("aws", "s3api", "create-bucket",
        "--bucket", bucketName,
        "--endpoint-url", fmt.Sprintf("http://localhost:%d", env.s3Port),
    )
    // The AWS CLI takes credentials from the environment, not from command-line flags.
    cmd.Env = append(os.Environ(),
        "AWS_ACCESS_KEY_ID=test",
        "AWS_SECRET_ACCESS_KEY=test",
        "AWS_DEFAULT_REGION=us-east-1",
    )
    if err := cmd.Run(); err != nil {
        t.Logf("Warning: failed to create bucket %s: %v", bucketName, err)
    }
}
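
createTableBucket shells out to the AWS CLI, which must be installed on the host. An
in-process alternative could use aws-sdk-go-v2; a sketch (illustrative only, the module is
not imported by these tests) might look like:

// Sketch: create the bucket in-process with aws-sdk-go-v2 instead of shelling out.
// Requires github.com/aws/aws-sdk-go-v2/{aws,config,credentials,service/s3}.
func createBucketSDK(ctx context.Context, s3Port int, bucket string) error {
    cfg, err := config.LoadDefaultConfig(ctx,
        config.WithRegion("us-east-1"),
        config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("test", "test", "")),
    )
    if err != nil {
        return err
    }
    client := s3.NewFromConfig(cfg, func(o *s3.Options) {
        o.BaseEndpoint = aws.String(fmt.Sprintf("http://localhost:%d", s3Port))
        o.UsePathStyle = true // SeaweedFS S3 is addressed path-style
    })
    _, err = client.CreateBucket(ctx, &s3.CreateBucketInput{Bucket: aws.String(bucket)})
    return err
}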

test/s3tables/catalog_spark/spark_operations_test.go
@@ -0,0 +1,258 @@
package catalog_spark

import (
    "fmt"
    "strconv"
    "strings"
    "testing"
    "time"
)

// TestSparkCatalogBasicOperations tests basic Spark Iceberg catalog operations
func TestSparkCatalogBasicOperations(t *testing.T) {
    env := NewTestEnvironment(t)
    if testing.Short() {
        t.Skip("Skipping integration test in short mode")
    }
    if !env.dockerAvailable {
        t.Skip("Docker not available, skipping Spark integration test")
    }

    t.Logf(">>> Starting SeaweedFS...")
    env.StartSeaweedFS(t)
    defer env.Cleanup(t)

    catalogBucket := "warehouse"
    tableBucket := "iceberg-tables"
    createTableBucket(t, env, tableBucket)
    createTableBucket(t, env, catalogBucket)

    configDir := env.writeSparkConfig(t, catalogBucket)
    env.startSparkContainer(t, configDir)
    time.Sleep(10 * time.Second) // Wait for Spark to be ready

    // Test 1: Create a namespace (database)
    t.Logf(">>> Test 1: Creating namespace")
    namespace := "spark_test_" + randomString(6)
    sparkSQL := fmt.Sprintf(`
spark.sql("CREATE NAMESPACE iceberg.%s")
print("Namespace created")
`, namespace)
    output := runSparkPySQL(t, env.sparkContainer, sparkSQL)
    if !strings.Contains(output, "Namespace created") {
        t.Logf("Warning: namespace creation output: %s", output)
    }

    // Test 2: Create a table
    t.Logf(">>> Test 2: Creating table")
    tableName := "test_table_" + randomString(6)
    createTableSQL := fmt.Sprintf(`
spark.sql("""
CREATE TABLE iceberg.%s.%s (
    id INT,
    name STRING,
    age INT
)
USING iceberg
""")
print("Table created")
`, namespace, tableName)
    output = runSparkPySQL(t, env.sparkContainer, createTableSQL)
    if !strings.Contains(output, "Table created") {
        t.Logf("Warning: table creation output: %s", output)
    }

    // Test 3: Insert data
    t.Logf(">>> Test 3: Inserting data")
    insertDataSQL := fmt.Sprintf(`
spark.sql("""
INSERT INTO iceberg.%s.%s VALUES
    (1, 'Alice', 30),
    (2, 'Bob', 25),
    (3, 'Charlie', 35)
""")
print("Data inserted")
`, namespace, tableName)
    output = runSparkPySQL(t, env.sparkContainer, insertDataSQL)
    if !strings.Contains(output, "Data inserted") {
        t.Logf("Warning: data insertion output: %s", output)
    }

    // Test 4: Query data
    t.Logf(">>> Test 4: Querying data")
    querySQL := fmt.Sprintf(`
result = spark.sql("SELECT COUNT(*) as count FROM iceberg.%s.%s")
result.show()
count = result.collect()[0]['count']
print(f"Row count: {count}")
`, namespace, tableName)
    output = runSparkPySQL(t, env.sparkContainer, querySQL)
    if !strings.Contains(output, "Row count: 3") {
        t.Logf("Warning: expected row count 3, got output: %s", output)
    }

    // Test 5: Update data
    t.Logf(">>> Test 5: Updating data")
    updateSQL := fmt.Sprintf(`
spark.sql("""
UPDATE iceberg.%s.%s SET age = 31 WHERE id = 1
""")
print("Data updated")
`, namespace, tableName)
    output = runSparkPySQL(t, env.sparkContainer, updateSQL)
    if !strings.Contains(output, "Data updated") {
        t.Logf("Warning: data update output: %s", output)
    }

    // Test 6: Delete data
    t.Logf(">>> Test 6: Deleting data")
    deleteSQL := fmt.Sprintf(`
spark.sql("""
DELETE FROM iceberg.%s.%s WHERE id = 3
""")
print("Data deleted")
`, namespace, tableName)
    output = runSparkPySQL(t, env.sparkContainer, deleteSQL)
    if !strings.Contains(output, "Data deleted") {
        t.Logf("Warning: data delete output: %s", output)
    }

    // Verify final count
    t.Logf(">>> Verifying final data")
    finalCountSQL := fmt.Sprintf(`
result = spark.sql("SELECT COUNT(*) as count FROM iceberg.%s.%s")
result.show()
count = result.collect()[0]['count']
print(f"Final row count: {count}")
`, namespace, tableName)
    output = runSparkPySQL(t, env.sparkContainer, finalCountSQL)
    if !strings.Contains(output, "Final row count: 2") {
        t.Logf("Warning: expected final row count 2, got output: %s", output)
    }

    t.Logf(">>> All tests passed")
}

// TestSparkTimeTravel tests Spark Iceberg time travel capabilities
func TestSparkTimeTravel(t *testing.T) {
    env := NewTestEnvironment(t)
    if testing.Short() {
        t.Skip("Skipping integration test in short mode")
    }
    if !env.dockerAvailable {
        t.Skip("Docker not available, skipping Spark integration test")
    }

    t.Logf(">>> Starting SeaweedFS...")
    env.StartSeaweedFS(t)
    defer env.Cleanup(t)

    catalogBucket := "warehouse"
    tableBucket := "iceberg-tables"
    createTableBucket(t, env, tableBucket)
    createTableBucket(t, env, catalogBucket)

    configDir := env.writeSparkConfig(t, catalogBucket)
    env.startSparkContainer(t, configDir)
    time.Sleep(10 * time.Second)

    namespace := "time_travel_test_" + randomString(6)
    tableName := "tt_table_" + randomString(6)

    // Create namespace and table
    setupSQL := fmt.Sprintf(`
spark.sql("CREATE NAMESPACE iceberg.%s")
spark.sql("""
CREATE TABLE iceberg.%s.%s (
    id INT,
    value INT
)
USING iceberg
""")
print("Setup complete")
`, namespace, namespace, tableName)
    runSparkPySQL(t, env.sparkContainer, setupSQL)

    // Insert initial data and read back the current snapshot ID from the Iceberg
    // snapshots metadata table.
    t.Logf(">>> Inserting initial data")
    insertSQL := fmt.Sprintf(`
spark.sql("""
INSERT INTO iceberg.%s.%s VALUES (1, 10)
""")
snapshot_id = spark.sql(
    "SELECT snapshot_id FROM iceberg.%s.%s.snapshots ORDER BY committed_at DESC LIMIT 1"
).collect()[0][0]
print(f"Snapshot ID: {snapshot_id}")
`, namespace, tableName, namespace, tableName)
    output := runSparkPySQL(t, env.sparkContainer, insertSQL)
    if !strings.Contains(output, "Snapshot ID:") {
        t.Logf("Warning: failed to get snapshot ID: %s", output)
    }

    // Extract snapshot ID from output
    var snapshotID string
    lines := strings.Split(output, "\n")
    for _, line := range lines {
        if strings.Contains(line, "Snapshot ID:") {
            parts := strings.Split(line, ":")
            if len(parts) > 1 {
                snapshotID = strings.TrimSpace(parts[1])
            }
        }
    }
    if snapshotID == "" {
        t.Logf("Warning: could not extract snapshot ID")
        return
    }

    // Insert more data
    t.Logf(">>> Inserting more data")
    insertMoreSQL := fmt.Sprintf(`
spark.sql("""
INSERT INTO iceberg.%s.%s VALUES (2, 20)
""")
print("More data inserted")
`, namespace, tableName)
    runSparkPySQL(t, env.sparkContainer, insertMoreSQL)

    // Time travel to first snapshot
    t.Logf(">>> Time traveling to first snapshot")
    timeTravelSQL := fmt.Sprintf(`
result = spark.sql("""
SELECT COUNT(*) as count FROM iceberg.%s.%s VERSION AS OF %s
""")
result.show()
count = result.collect()[0]['count']
print(f"Count at snapshot: {count}")
`, namespace, tableName, snapshotID)
    output = runSparkPySQL(t, env.sparkContainer, timeTravelSQL)
    if !strings.Contains(output, "Count at snapshot: 1") {
        t.Logf("Warning: expected count 1 at first snapshot, got: %s", output)
    }

    t.Logf(">>> Time travel test passed")
}
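
Spark's Iceberg integration also accepts TIMESTAMP AS OF for time travel. A hypothetical
helper building that form of the query (not used by the test above) could look like:

// timestampTravelQuery builds the TIMESTAMP AS OF variant of the time-travel check.
// Illustrative only: ts would be a wall-clock time captured between the two inserts.
func timestampTravelQuery(namespace, tableName, ts string) string {
    return fmt.Sprintf(`
result = spark.sql("""
SELECT COUNT(*) as count FROM iceberg.%s.%s TIMESTAMP AS OF '%s'
""")
result.show()
count = result.collect()[0]['count']
print(f"Count at timestamp: {count}")
`, namespace, tableName, ts)
}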

// mustParseCSVInt64 parses the first data row of a single-column CSV output
// (header line followed by a value) as an int64, failing the test on malformed input.
func mustParseCSVInt64(t *testing.T, csvOutput string) int64 {
    t.Helper()
    lines := strings.Split(strings.TrimSpace(csvOutput), "\n")
    if len(lines) < 2 {
        t.Fatalf("expected at least 2 lines in CSV output, got %d: %s", len(lines), csvOutput)
    }
    // Skip header, get first data row
    value := strings.TrimSpace(lines[1])
    parsed, err := strconv.ParseInt(value, 10, 64)
    if err != nil {
        t.Fatalf("failed to parse int64 from %q: %v", value, err)
    }
    return parsed
}
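
mustParseCSVInt64 is not yet called by the tests above; it expects a single-column CSV
with a header row, as in this hypothetical unit test:

func TestMustParseCSVInt64(t *testing.T) {
    // "count" is the header row; "3" is the first (and only) data row.
    if got := mustParseCSVInt64(t, "count\n3\n"); got != 3 {
        t.Fatalf("expected 3, got %d", got)
    }
}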