20 changed files with 1114 additions and 120 deletions

- 75   .github/workflows/s3-tables-tests.yml
- 23   go.mod
- 4    go.sum
- 395  test/s3tables/catalog_spark/setup_test.go
- 279  test/s3tables/catalog_spark/spark_operations_test.go
- 90   test/s3tables/catalog_trino/trino_blog_operations_test.go
- 68   test/s3tables/catalog_trino/trino_catalog_test.go
- 3    test/s3tables/catalog_trino/trino_crud_operations_test.go
- 56   test/s3tables/testutil/weed_mini.go
- 4    weed/admin/view/app/maintenance_workers.templ
- 2    weed/admin/view/app/maintenance_workers_templ.go
- 6    weed/filer/filer.go
- 3    weed/s3api/bucket_paths.go
- 153  weed/s3api/iceberg/iceberg.go
- 1    weed/s3api/s3api_object_handlers_list.go
- 4    weed/s3api/s3api_server.go
- 8    weed/s3api/s3tables/handler_bucket_create.go
- 32   weed/s3api/s3tables/handler_namespace.go
- 26   weed/s3api/s3tables/handler_table.go
- 2    weed/storage/volume_read.go

test/s3tables/catalog_spark/setup_test.go
@@ -0,0 +1,395 @@
package catalog_spark

import (
	"context"
	"fmt"
	"io"
	"math/rand"
	"net"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/seaweedfs/seaweedfs/test/s3tables/testutil"
	"github.com/testcontainers/testcontainers-go"
)

var (
	miniProcessMu   sync.Mutex
	lastMiniProcess *exec.Cmd
)

func stopPreviousMini() {
	miniProcessMu.Lock()
	defer miniProcessMu.Unlock()

	if lastMiniProcess != nil && lastMiniProcess.Process != nil {
		_ = lastMiniProcess.Process.Kill()
		_ = lastMiniProcess.Wait()
	}
	lastMiniProcess = nil
}

func registerMiniProcess(cmd *exec.Cmd) {
	miniProcessMu.Lock()
	lastMiniProcess = cmd
	miniProcessMu.Unlock()
}

func clearMiniProcess(cmd *exec.Cmd) {
	miniProcessMu.Lock()
	if lastMiniProcess == cmd {
		lastMiniProcess = nil
	}
	miniProcessMu.Unlock()
}

type TestEnvironment struct {
	t                *testing.T
	dockerAvailable  bool
	seaweedfsDataDir string
	sparkConfigDir   string
	masterPort       int
	filerPort        int
	s3Port           int
	icebergRestPort  int
	accessKey        string
	secretKey        string
	sparkContainer   testcontainers.Container
	masterProcess    *exec.Cmd
}

func NewTestEnvironment(t *testing.T) *TestEnvironment {
	env := &TestEnvironment{
		t:         t,
		accessKey: "test",
		secretKey: "test",
	}

	// Check if Docker is available
	cmd := exec.Command("docker", "version")
	env.dockerAvailable = cmd.Run() == nil

	return env
}

func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
	t.Helper()

	stopPreviousMini()

	var err error
	env.seaweedfsDataDir, err = os.MkdirTemp("", "seaweed-spark-test-")
	if err != nil {
		t.Fatalf("failed to create temp directory: %v", err)
	}

	env.masterPort = mustFreePort(t, "Master")
	env.filerPort = mustFreePort(t, "Filer")
	env.s3Port = mustFreePort(t, "S3")
	env.icebergRestPort = mustFreePort(t, "Iceberg")

	bindIP := testutil.FindBindIP()

	iamConfigPath, err := testutil.WriteIAMConfig(env.seaweedfsDataDir, env.accessKey, env.secretKey)
	if err != nil {
		t.Fatalf("failed to create IAM config: %v", err)
	}

	// Start SeaweedFS using weed mini (all-in-one including Iceberg REST)
	env.masterProcess = exec.Command(
		"weed", "mini",
		"-ip", bindIP,
		"-ip.bind", "0.0.0.0",
		"-master.port", fmt.Sprintf("%d", env.masterPort),
		"-filer.port", fmt.Sprintf("%d", env.filerPort),
		"-s3.port", fmt.Sprintf("%d", env.s3Port),
		"-s3.port.iceberg", fmt.Sprintf("%d", env.icebergRestPort),
		"-s3.config", iamConfigPath,
		"-dir", env.seaweedfsDataDir,
	)
	env.masterProcess.Env = append(os.Environ(),
		"AWS_ACCESS_KEY_ID="+env.accessKey,
		"AWS_SECRET_ACCESS_KEY="+env.secretKey,
		"ICEBERG_WAREHOUSE=s3://iceberg-tables",
		"S3TABLES_DEFAULT_BUCKET=iceberg-tables",
	)
	if err := env.masterProcess.Start(); err != nil {
		t.Fatalf("failed to start weed mini: %v", err)
	}
	registerMiniProcess(env.masterProcess)

	// Wait for all services to be ready
	if !waitForPort(env.masterPort, 15*time.Second) {
		t.Fatalf("weed mini failed to start - master port %d not listening", env.masterPort)
	}
	if !waitForPort(env.filerPort, 15*time.Second) {
		t.Fatalf("weed mini failed to start - filer port %d not listening", env.filerPort)
	}
	if !waitForPort(env.s3Port, 15*time.Second) {
		t.Fatalf("weed mini failed to start - s3 port %d not listening", env.s3Port)
	}
	if !waitForPort(env.icebergRestPort, 15*time.Second) {
		t.Fatalf("weed mini failed to start - iceberg rest port %d not listening", env.icebergRestPort)
	}
}
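
// waitForAllPorts is a hypothetical table-driven variant of the four
// readiness checks in StartSeaweedFS above; a sketch only, not part of this
// change, but it shows how the repeated waitForPort pattern could collapse.
func waitForAllPorts(t *testing.T, ports map[string]int, timeout time.Duration) {
	t.Helper()
	for name, port := range ports {
		if !waitForPort(port, timeout) {
			t.Fatalf("weed mini failed to start - %s port %d not listening", name, port)
		}
	}
}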

func mustFreePort(t *testing.T, name string) int {
	t.Helper()

	for i := 0; i < 200; i++ {
		port := 20000 + rand.Intn(30000)
		listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port))
		if err != nil {
			continue
		}
		listener.Close()
		grpcPort := port + 10000
		if grpcPort > 65535 {
			continue
		}
		grpcListener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", grpcPort))
		if err != nil {
			continue
		}
		grpcListener.Close()
		return port
	}
	t.Fatalf("failed to get free port for %s", name)
	return 0
}

func waitForPort(port int, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", port), 500*time.Millisecond)
		if err == nil {
			conn.Close()
			return true
		}
		time.Sleep(100 * time.Millisecond)
	}
	return false
}
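
// mustFreePortPair is a hypothetical helper (not in this diff) that makes the
// port pairing explicit: weed derives each service's gRPC port as the HTTP
// port plus 10000, which is why mustFreePort probes both before returning.
func mustFreePortPair(t *testing.T, name string) (httpPort, grpcPort int) {
	t.Helper()
	httpPort = mustFreePort(t, name)
	return httpPort, httpPort + 10000
}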

func (env *TestEnvironment) writeSparkConfig(t *testing.T, catalogBucket string) string {
	t.Helper()

	configDir, err := os.MkdirTemp("", "spark-config-")
	if err != nil {
		t.Fatalf("failed to create config directory: %v", err)
	}

	// Store for cleanup
	env.sparkConfigDir = configDir

	s3Endpoint := fmt.Sprintf("http://host.docker.internal:%d", env.s3Port)
	catalogEndpoint := fmt.Sprintf("http://host.docker.internal:%d", env.icebergRestPort)

	sparkConfig := fmt.Sprintf(`
[spark]
master = "local"
app.name = "SeaweedFS Iceberg Test"

[storage]
s3.endpoint = "%s"
s3.access-key = "test"
s3.secret-key = "test"
s3.path-style-access = "true"
s3.bucket = "%s"

[iceberg]
catalog.type = "rest"
catalog.uri = "%s"
catalog.s3.endpoint = "%s"
catalog.s3.access-key = "test"
catalog.s3.secret-key = "test"
catalog.s3.path-style-access = "true"
`, s3Endpoint, catalogBucket, catalogEndpoint, s3Endpoint)

	configPath := filepath.Join(configDir, "spark-config.ini")
	if err := os.WriteFile(configPath, []byte(sparkConfig), 0644); err != nil {
		t.Fatalf("failed to write spark config: %v", err)
	}

	return configDir
}

func (env *TestEnvironment) startSparkContainer(t *testing.T, configDir string) {
	t.Helper()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	req := testcontainers.ContainerRequest{
		Image:        "apache/spark:3.5.1",
		ExposedPorts: []string{"4040/tcp"},
		Mounts: testcontainers.Mounts(
			testcontainers.BindMount(configDir, "/config"),
		),
		Env: map[string]string{
			"SPARK_LOCAL_IP": "localhost",
		},
		ExtraHosts: []string{"host.docker.internal:host-gateway"},
		Cmd:        []string{"/bin/sh", "-c", "sleep 3600"},
	}

	container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
		ContainerRequest: req,
		Started:          true,
	})
	if err != nil {
		t.Fatalf("failed to start spark container: %v", err)
	}

	env.sparkContainer = container
}

func (env *TestEnvironment) Cleanup(t *testing.T) {
	t.Helper()

	// Kill weed mini process
	if env.masterProcess != nil && env.masterProcess.Process != nil {
		env.masterProcess.Process.Kill()
		env.masterProcess.Wait()
	}
	clearMiniProcess(env.masterProcess)

	// Stop Spark container
	if env.sparkContainer != nil {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		env.sparkContainer.Terminate(ctx)
	}

	// Remove temporary directories after processes are stopped
	if env.seaweedfsDataDir != "" {
		os.RemoveAll(env.seaweedfsDataDir)
	}
	if env.sparkConfigDir != "" {
		os.RemoveAll(env.sparkConfigDir)
	}
}

func randomString(n int) string {
	const letters = "abcdefghijklmnopqrstuvwxyz0123456789"
	b := make([]byte, n)
	for i := range b {
		b[i] = letters[rand.Intn(len(letters))]
	}
	return string(b)
}

func runSparkPySQL(t *testing.T, container testcontainers.Container, sql string, icebergPort int, s3Port int) string {
	t.Helper()

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancel()

	pythonScript := fmt.Sprintf(`
import glob
import os
import sys

spark_home = os.environ.get("SPARK_HOME", "/opt/spark")
python_path = os.path.join(spark_home, "python")
py4j_glob = glob.glob(os.path.join(python_path, "lib", "py4j-*.zip"))
ivy_dir = "/tmp/ivy"
os.makedirs(ivy_dir, exist_ok=True)
os.environ["AWS_REGION"] = "us-west-2"
os.environ["AWS_DEFAULT_REGION"] = "us-west-2"
os.environ["AWS_ACCESS_KEY_ID"] = "test"
os.environ["AWS_SECRET_ACCESS_KEY"] = "test"
if python_path not in sys.path:
    sys.path.insert(0, python_path)
if py4j_glob and py4j_glob[0] not in sys.path:
    sys.path.insert(0, py4j_glob[0])

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("SeaweedFS Iceberg Test")
    .config("spark.jars.ivy", ivy_dir)
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.apache.iceberg:iceberg-aws-bundle:1.5.2")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.iceberg.type", "rest")
    .config("spark.sql.catalog.iceberg.metrics-reporter-impl", "org.apache.iceberg.metrics.LoggingMetricsReporter")
    .config("spark.sql.catalog.iceberg.uri", "http://host.docker.internal:%d")
    .config("spark.sql.catalog.iceberg.rest.auth.type", "sigv4")
    .config("spark.sql.catalog.iceberg.rest.auth.sigv4.delegate-auth-type", "none")
    .config("spark.sql.catalog.iceberg.rest.sigv4-enabled", "true")
    .config("spark.sql.catalog.iceberg.rest.signing-region", "us-west-2")
    .config("spark.sql.catalog.iceberg.rest.signing-name", "s3")
    .config("spark.sql.catalog.iceberg.rest.access-key-id", "test")
    .config("spark.sql.catalog.iceberg.rest.secret-access-key", "test")
    .config("spark.sql.catalog.iceberg.s3.endpoint", "http://host.docker.internal:%d")
    .config("spark.sql.catalog.iceberg.s3.region", "us-west-2")
    .config("spark.sql.catalog.iceberg.s3.access-key", "test")
    .config("spark.sql.catalog.iceberg.s3.secret-key", "test")
    .config("spark.sql.catalog.iceberg.s3.path-style-access", "true")
    .config("spark.sql.catalog.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .getOrCreate())

%s
`, icebergPort, s3Port, sql)

	code, out, err := container.Exec(ctx, []string{"python3", "-c", pythonScript})
	var output string
	if out != nil {
		outputBytes, readErr := io.ReadAll(out)
		if readErr != nil {
			t.Logf("failed to read output: %v", readErr)
		} else {
			output = string(outputBytes)
		}
	}
	if code != 0 {
		t.Logf("Spark Python execution failed with code %d: %v, output: %s", code, err, output)
		return output
	}

	return output
}

func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) {
	t.Helper()

	// The "host:port.grpcPort" form embeds the gRPC port in the master
	// address; it is the HTTP port plus 10000, matching mustFreePort's
	// reservation.
	masterGrpcPort := env.masterPort + 10000
	cmd := exec.Command("weed", "shell",
		fmt.Sprintf("-master=localhost:%d.%d", env.masterPort, masterGrpcPort),
	)
	cmd.Stdin = strings.NewReader(fmt.Sprintf("s3tables.bucket -create -name %s -account 000000000000\nexit\n", bucketName))
	output, err := cmd.CombinedOutput()
	if err != nil {
		t.Fatalf("failed to create table bucket %s via weed shell: %v\nOutput: %s", bucketName, err, string(output))
	}
}

func createObjectBucket(t *testing.T, env *TestEnvironment, bucketName string) {
	t.Helper()

	cfg := aws.Config{
		Region:       "us-east-1",
		Credentials:  aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(env.accessKey, env.secretKey, "")),
		BaseEndpoint: aws.String(fmt.Sprintf("http://localhost:%d", env.s3Port)),
	}

	client := s3.NewFromConfig(cfg, func(o *s3.Options) {
		o.UsePathStyle = true
	})

	_, err := client.CreateBucket(context.Background(), &s3.CreateBucketInput{
		Bucket: aws.String(bucketName),
	})
	if err != nil {
		t.Fatalf("failed to create object bucket %s: %v", bucketName, err)
	}
}
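
createObjectBucket has no caller in the files shown in this diff; a minimal smoke test along these lines (hypothetical, built only from the helpers above) would exercise it:

// Hypothetical smoke test, not part of this change.
func TestObjectBucketSmoke(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}
	env := NewTestEnvironment(t)
	env.StartSeaweedFS(t)
	t.Cleanup(func() { env.Cleanup(t) })

	// Create a plain S3 bucket against the gateway via aws-sdk-go-v2.
	createObjectBucket(t, env, "smoke-"+randomString(6))
}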

test/s3tables/catalog_spark/spark_operations_test.go
@@ -0,0 +1,279 @@
package catalog_spark

import (
	"fmt"
	"regexp"
	"strings"
	"testing"
	"time"

	"github.com/testcontainers/testcontainers-go"
)

// waitForSparkReady polls Spark to verify it's ready by executing a simple query
func waitForSparkReady(t *testing.T, container testcontainers.Container, icebergPort int, s3Port int, timeout time.Duration) {
	t.Helper()

	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		output := runSparkPySQL(t, container, `
spark.sql("SELECT 1 as test")
print("Spark ready")
`, icebergPort, s3Port)
		if strings.Contains(output, "Spark ready") {
			return
		}
		time.Sleep(5 * time.Second)
	}
	t.Fatalf("Spark did not become ready within %v", timeout)
}

// setupSparkTestEnv initializes a test environment with SeaweedFS and Spark containers
func setupSparkTestEnv(t *testing.T) (*TestEnvironment, string, string) {
	t.Helper()

	env := NewTestEnvironment(t)

	if !env.dockerAvailable {
		t.Skip("Docker not available, skipping Spark integration test")
	}

	t.Logf(">>> Starting SeaweedFS...")
	env.StartSeaweedFS(t)
	t.Cleanup(func() { env.Cleanup(t) })

	tableBucket := "iceberg-tables"
	catalogBucket := tableBucket
	createTableBucket(t, env, tableBucket)

	configDir := env.writeSparkConfig(t, catalogBucket)
	env.startSparkContainer(t, configDir)

	// Poll for Spark readiness instead of fixed sleep
	waitForSparkReady(t, env.sparkContainer, env.icebergRestPort, env.s3Port, 10*time.Minute)

	return env, catalogBucket, tableBucket
}

// TestSparkCatalogBasicOperations tests basic Spark Iceberg catalog operations
func TestSparkCatalogBasicOperations(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	env, _, _ := setupSparkTestEnv(t)

	// Test 1: Create a namespace (database)
	t.Logf(">>> Test 1: Creating namespace")
	namespace := "spark_test_" + randomString(6)
	sparkSQL := fmt.Sprintf(`
spark.sql("CREATE NAMESPACE iceberg.%s")
print("Namespace created")
`, namespace)
	output := runSparkPySQL(t, env.sparkContainer, sparkSQL, env.icebergRestPort, env.s3Port)
	if !strings.Contains(output, "Namespace created") {
		t.Fatalf("namespace creation failed, output: %s", output)
	}

	// Test 2: Create a table
	t.Logf(">>> Test 2: Creating table")
	tableName := "test_table_" + randomString(6)
	createTableSQL := fmt.Sprintf(`
spark.sql("""
CREATE TABLE iceberg.%s.%s (
    id INT,
    name STRING,
    age INT
)
USING iceberg
""")
print("Table created")
`, namespace, tableName)
	output = runSparkPySQL(t, env.sparkContainer, createTableSQL, env.icebergRestPort, env.s3Port)
	if !strings.Contains(output, "Table created") {
		t.Fatalf("table creation failed, output: %s", output)
	}

	// Test 3: Insert data
	t.Logf(">>> Test 3: Inserting data")
	insertDataSQL := fmt.Sprintf(`
spark.sql("""
INSERT INTO iceberg.%s.%s VALUES
(1, 'Alice', 30),
(2, 'Bob', 25),
(3, 'Charlie', 35)
""")
print("Data inserted")
`, namespace, tableName)
	output = runSparkPySQL(t, env.sparkContainer, insertDataSQL, env.icebergRestPort, env.s3Port)
	if !strings.Contains(output, "Data inserted") {
		t.Fatalf("data insertion failed, output: %s", output)
	}

	// Test 4: Query data
	t.Logf(">>> Test 4: Querying data")
	querySQL := fmt.Sprintf(`
result = spark.sql("SELECT COUNT(*) as count FROM iceberg.%s.%s")
result.show()
count = result.collect()[0]['count']
print(f"Row count: {count}")
`, namespace, tableName)
	output = runSparkPySQL(t, env.sparkContainer, querySQL, env.icebergRestPort, env.s3Port)
	if !strings.Contains(output, "Row count: 3") {
		t.Errorf("expected row count 3, got output: %s", output)
	}

	// Test 5: Update data
	t.Logf(">>> Test 5: Updating data")
	updateSQL := fmt.Sprintf(`
spark.sql("""
UPDATE iceberg.%s.%s SET age = 31 WHERE id = 1
""")
print("Data updated")
`, namespace, tableName)
	output = runSparkPySQL(t, env.sparkContainer, updateSQL, env.icebergRestPort, env.s3Port)
	if !strings.Contains(output, "Data updated") {
		t.Errorf("data update failed, output: %s", output)
	}

	// Test 6: Delete data
	t.Logf(">>> Test 6: Deleting data")
	deleteSQL := fmt.Sprintf(`
spark.sql("""
DELETE FROM iceberg.%s.%s WHERE id = 3
""")
print("Data deleted")
`, namespace, tableName)
	output = runSparkPySQL(t, env.sparkContainer, deleteSQL, env.icebergRestPort, env.s3Port)
	if !strings.Contains(output, "Data deleted") {
		t.Errorf("data delete failed, output: %s", output)
	}

	// Verify final count
	t.Logf(">>> Verifying final data")
	finalCountSQL := fmt.Sprintf(`
result = spark.sql("SELECT COUNT(*) as count FROM iceberg.%s.%s")
result.show()
count = result.collect()[0]['count']
print(f"Final row count: {count}")
`, namespace, tableName)
	output = runSparkPySQL(t, env.sparkContainer, finalCountSQL, env.icebergRestPort, env.s3Port)
	if !strings.Contains(output, "Final row count: 2") {
		t.Errorf("expected final row count 2, got output: %s", output)
	}

	t.Logf(">>> All tests passed")
}

// TestSparkTimeTravel tests Spark Iceberg time travel capabilities
func TestSparkTimeTravel(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	env, _, _ := setupSparkTestEnv(t)

	namespace := "time_travel_test_" + randomString(6)
	tableName := "tt_table_" + randomString(6)

	// Create namespace and table
	setupSQL := fmt.Sprintf(`
spark.sql("CREATE NAMESPACE iceberg.%s")
spark.sql("""
CREATE TABLE iceberg.%s.%s (
    id INT,
    value INT
)
USING iceberg
""")
print("Setup complete")
`, namespace, namespace, tableName)
	output := runSparkPySQL(t, env.sparkContainer, setupSQL, env.icebergRestPort, env.s3Port)
	if !strings.Contains(output, "Setup complete") {
		t.Fatalf("setup failed for namespace %s and table %s, output: %s", namespace, tableName, output)
	}

	// Insert initial data
	t.Logf(">>> Inserting initial data")
	insertSQL := fmt.Sprintf(`
import time
from datetime import timedelta
spark.sql("""
INSERT INTO iceberg.%s.%s VALUES (1, 10)
""")
ts = None
for _ in range(10):
    try:
        ts = spark.sql("SELECT committed_at FROM iceberg.%s.%s.snapshots ORDER BY committed_at DESC LIMIT 1").collect()[0]["committed_at"]
        if ts is not None:
            break
    except Exception as e:
        print(f"Snapshot query failed: {e}")
    time.sleep(1)
if ts is None:
    raise RuntimeError("Failed to read snapshot committed_at")
ts_for_time_travel = ts + timedelta(seconds=1)
print(f"Snapshot timestamp: {ts_for_time_travel.strftime('%%Y-%%m-%%d %%H:%%M:%%S')}")
`, namespace, tableName, namespace, tableName)
	output = runSparkPySQL(t, env.sparkContainer, insertSQL, env.icebergRestPort, env.s3Port)
	if !strings.Contains(output, "Snapshot timestamp:") {
		t.Fatalf("failed to get snapshot timestamp: %s", output)
	}

	// Extract snapshot timestamp from output - look specifically for the "Snapshot timestamp:" line
	var snapshotTS string
	tsRe := regexp.MustCompile(`Snapshot timestamp:\s*(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})`)
	matches := tsRe.FindStringSubmatch(output)
	if len(matches) > 1 {
		snapshotTS = matches[1]
	}

	if snapshotTS == "" {
		t.Fatalf("could not extract snapshot timestamp from output: %s", output)
	}

	// Wait to ensure the next insert gets a distinct snapshot timestamp
	time.Sleep(2 * time.Second)

	// Insert more data
	t.Logf(">>> Inserting more data")
	insertMoreSQL := fmt.Sprintf(`
spark.sql("""
INSERT INTO iceberg.%s.%s VALUES (2, 20)
""")
print("More data inserted")
`, namespace, tableName)
	output = runSparkPySQL(t, env.sparkContainer, insertMoreSQL, env.icebergRestPort, env.s3Port)
	if !strings.Contains(output, "More data inserted") {
		t.Fatalf("failed to insert more data, output: %s", output)
	}

	// Verify count increased to 2
	t.Logf(">>> Verifying row count after second insert")
	verifySQL := fmt.Sprintf(`
result = spark.sql("SELECT COUNT(*) as count FROM iceberg.%s.%s")
count = result.collect()[0]['count']
print(f"Current row count: {count}")
`, namespace, tableName)
	output = runSparkPySQL(t, env.sparkContainer, verifySQL, env.icebergRestPort, env.s3Port)
	if !strings.Contains(output, "Current row count: 2") {
		t.Fatalf("expected current row count 2 after second insert, got output: %s", output)
	}

	// Time travel to first snapshot
	t.Logf(">>> Time traveling to first snapshot")
	timeTravelSQL := fmt.Sprintf(`
result = spark.sql("""
SELECT COUNT(*) as count FROM iceberg.%s.%s TIMESTAMP AS OF '%s'
""")
result.show()
count = result.collect()[0]['count']
print(f"Count at snapshot: {count}")
`, namespace, tableName, snapshotTS)
	output = runSparkPySQL(t, env.sparkContainer, timeTravelSQL, env.icebergRestPort, env.s3Port)
	if !strings.Contains(output, "Count at snapshot: 1") {
		t.Errorf("expected count 1 at first snapshot, got: %s", output)
	}

	t.Logf(">>> Time travel test passed")
}
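
Both tests follow one recipe: format a Python snippet, run it through runSparkPySQL, and assert on a printed marker. A hedged sketch of how a schema-evolution check would slot into the same pattern (hypothetical, not part of this diff; it assumes only Iceberg's standard ALTER TABLE ... ADD COLUMN support in Spark):

// Hypothetical follow-on test, not part of this change.
func TestSparkSchemaEvolutionSketch(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	env, _, _ := setupSparkTestEnv(t)

	namespace := "schema_test_" + randomString(6)
	tableName := "se_table_" + randomString(6)
	sql := fmt.Sprintf(`
spark.sql("CREATE NAMESPACE iceberg.%s")
spark.sql("CREATE TABLE iceberg.%s.%s (id INT) USING iceberg")
spark.sql("ALTER TABLE iceberg.%s.%s ADD COLUMN note STRING")
print("Schema evolved")
`, namespace, namespace, tableName, namespace, tableName)
	output := runSparkPySQL(t, env.sparkContainer, sql, env.icebergRestPort, env.s3Port)
	if !strings.Contains(output, "Schema evolved") {
		t.Fatalf("schema evolution failed, output: %s", output)
	}
}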

test/s3tables/testutil/weed_mini.go
@@ -0,0 +1,56 @@
package testutil

import (
	"fmt"
	"net"
	"os"
	"path/filepath"
)

func FindBindIP() string {
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return "127.0.0.1"
	}
	for _, addr := range addrs {
		ipNet, ok := addr.(*net.IPNet)
		if !ok || ipNet.IP == nil {
			continue
		}
		ip := ipNet.IP.To4()
		if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() {
			continue
		}
		return ip.String()
	}
	return "127.0.0.1"
}

func WriteIAMConfig(dir, accessKey, secretKey string) (string, error) {
	iamConfigPath := filepath.Join(dir, "iam_config.json")
	iamConfig := fmt.Sprintf(`{
  "identities": [
    {
      "name": "admin",
      "credentials": [
        {
          "accessKey": "%s",
          "secretKey": "%s"
        }
      ],
      "actions": [
        "Admin",
        "Read",
        "List",
        "Tagging",
        "Write"
      ]
    }
  ]
}`, accessKey, secretKey)

	if err := os.WriteFile(iamConfigPath, []byte(iamConfig), 0644); err != nil {
		return "", err
	}
	return iamConfigPath, nil
}
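
For reference, a minimal standalone sketch (hypothetical, not part of this diff) of how these two helpers are consumed, mirroring StartSeaweedFS in setup_test.go:

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/seaweedfs/seaweedfs/test/s3tables/testutil"
)

func main() {
	// Pick a routable non-loopback IPv4 bind address (falls back to 127.0.0.1).
	bindIP := testutil.FindBindIP()

	// Write the static IAM config that weed's S3 gateway loads via -s3.config.
	iamPath, err := testutil.WriteIAMConfig(os.TempDir(), "test", "test")
	if err != nil {
		log.Fatalf("write IAM config: %v", err)
	}
	fmt.Println(bindIP, iamPath)
}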