From 833bcde9f362058295606340ca2c7497741f8de8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 5 Feb 2026 16:10:31 -0800 Subject: [PATCH] test: add Trino Iceberg catalog integration test - Create test/s3/catalog_trino/trino_catalog_test.go with TestTrinoIcebergCatalog - Tests integration between Trino SQL engine and SeaweedFS Iceberg REST catalog - Starts weed mini with all services and Trino in Docker container - Validates Iceberg catalog schema creation and listing operations - Uses native S3 filesystem support in Trino with path-style access - Add workflow job to s3-tables-tests.yml for CI execution --- .github/workflows/s3-tables-tests.yml | 60 +++ test/s3/catalog_trino/trino_catalog_test.go | 413 ++++++++++++++++++++ 2 files changed, 473 insertions(+) create mode 100644 test/s3/catalog_trino/trino_catalog_test.go diff --git a/.github/workflows/s3-tables-tests.yml b/.github/workflows/s3-tables-tests.yml index 8b445b85b..6dff93e29 100644 --- a/.github/workflows/s3-tables-tests.yml +++ b/.github/workflows/s3-tables-tests.yml @@ -122,6 +122,66 @@ jobs: path: test/s3tables/catalog/test-output.log retention-days: 3 + trino-iceberg-catalog-tests: + name: Trino Iceberg Catalog Integration Tests + runs-on: ubuntu-22.04 + timeout-minutes: 30 + + steps: + - name: Check out code + uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' + id: go + + - name: Set up Docker + uses: docker/setup-buildx-action@v3 + + - name: Install SeaweedFS + run: | + go install -buildvcs=false ./weed + + - name: Run Trino Iceberg Catalog Integration Tests + timeout-minutes: 25 + working-directory: test/s3/catalog_trino + run: | + set -x + set -o pipefail + echo "=== System Information ===" + uname -a + free -h + df -h + echo "=== Starting Trino Iceberg Catalog Tests ===" + + # Run Trino + Iceberg catalog integration tests + go test -v -timeout 20m . 2>&1 | tee test-output.log || { + echo "Trino Iceberg catalog integration tests failed" + exit 1 + } + + - name: Show test output on failure + if: failure() + working-directory: test/s3/catalog_trino + run: | + echo "=== Test Output ===" + if [ -f test-output.log ]; then + tail -200 test-output.log + fi + + echo "=== Process information ===" + ps aux | grep -E "(weed|test|docker)" || true + + - name: Upload test logs on failure + if: failure() + uses: actions/upload-artifact@v6 + with: + name: trino-iceberg-catalog-test-logs + path: test/s3/catalog_trino/test-output.log + retention-days: 3 + s3-tables-build-verification: name: S3 Tables Build Verification runs-on: ubuntu-22.04 diff --git a/test/s3/catalog_trino/trino_catalog_test.go b/test/s3/catalog_trino/trino_catalog_test.go new file mode 100644 index 000000000..cd5a72711 --- /dev/null +++ b/test/s3/catalog_trino/trino_catalog_test.go @@ -0,0 +1,413 @@ +package catalog_trino + +import ( + "context" + "crypto/rand" + "fmt" + "io" + "net" + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" +) + +type TestEnvironment struct { + seaweedDir string + weedBinary string + dataDir string + bindIP string + s3Port int + s3GrpcPort int + icebergPort int + masterPort int + masterGrpcPort int + filerPort int + filerGrpcPort int + volumePort int + volumeGrpcPort int + weedProcess *exec.Cmd + weedCancel context.CancelFunc + trinoContainer string + dockerAvailable bool +} + +func TestTrinoIcebergCatalog(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + env := NewTestEnvironment(t) + defer env.Cleanup(t) + + if !env.dockerAvailable { + t.Skip("Docker not available, skipping Trino integration test") + } + + env.StartSeaweedFS(t) + + catalogBucket := "default" + createTableBucket(t, env, catalogBucket) + createObjectBucket(t, env, catalogBucket) + + configDir := env.writeTrinoConfig(t, catalogBucket) + env.startTrinoContainer(t, configDir) + waitForTrino(t, env.trinoContainer, 60*time.Second) + + schemaName := "trino_" + randomString(6) + + runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS iceberg.%s", schemaName)) + output := runTrinoSQL(t, env.trinoContainer, "SHOW SCHEMAS FROM iceberg") + if !strings.Contains(output, schemaName) { + t.Fatalf("Expected schema %s in output:\n%s", schemaName, output) + } + runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SHOW TABLES FROM iceberg.%s", schemaName)) +} + +func NewTestEnvironment(t *testing.T) *TestEnvironment { + t.Helper() + + wd, err := os.Getwd() + if err != nil { + t.Fatalf("Failed to get working directory: %v", err) + } + + seaweedDir := wd + for i := 0; i < 6; i++ { + if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil { + break + } + seaweedDir = filepath.Dir(seaweedDir) + } + + weedBinary := filepath.Join(seaweedDir, "weed", "weed") + if _, err := os.Stat(weedBinary); os.IsNotExist(err) { + weedBinary = "weed" + if _, err := exec.LookPath(weedBinary); err != nil { + t.Skip("weed binary not found, skipping integration test") + } + } + + dataDir, err := os.MkdirTemp("", "seaweed-trino-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + + bindIP := findBindIP() + + masterPort, masterGrpcPort := mustFreePortPair(t, "Master") + volumePort, volumeGrpcPort := mustFreePortPair(t, "Volume") + filerPort, filerGrpcPort := mustFreePortPair(t, "Filer") + s3Port, s3GrpcPort := mustFreePortPair(t, "S3") + icebergPort := mustFreePort(t, "Iceberg") + + return &TestEnvironment{ + seaweedDir: seaweedDir, + weedBinary: weedBinary, + dataDir: dataDir, + bindIP: bindIP, + s3Port: s3Port, + s3GrpcPort: s3GrpcPort, + icebergPort: icebergPort, + masterPort: masterPort, + masterGrpcPort: masterGrpcPort, + filerPort: filerPort, + filerGrpcPort: filerGrpcPort, + volumePort: volumePort, + volumeGrpcPort: volumeGrpcPort, + dockerAvailable: hasDocker(), + } +} + +func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { + t.Helper() + + securityToml := filepath.Join(env.dataDir, "security.toml") + if err := os.WriteFile(securityToml, []byte("# Empty security config for testing\n"), 0644); err != nil { + t.Fatalf("Failed to create security.toml: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + env.weedCancel = cancel + + cmd := exec.CommandContext(ctx, env.weedBinary, "mini", + "-master.port", fmt.Sprintf("%d", env.masterPort), + "-master.port.grpc", fmt.Sprintf("%d", env.masterGrpcPort), + "-volume.port", fmt.Sprintf("%d", env.volumePort), + "-volume.port.grpc", fmt.Sprintf("%d", env.volumeGrpcPort), + "-filer.port", fmt.Sprintf("%d", env.filerPort), + "-filer.port.grpc", fmt.Sprintf("%d", env.filerGrpcPort), + "-s3.port", fmt.Sprintf("%d", env.s3Port), + "-s3.port.grpc", fmt.Sprintf("%d", env.s3GrpcPort), + "-s3.port.iceberg", fmt.Sprintf("%d", env.icebergPort), + "-s3.iam.readOnly=false", + "-ip", env.bindIP, + "-ip.bind", env.bindIP, + "-dir", env.dataDir, + ) + cmd.Dir = env.dataDir + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + if err := cmd.Start(); err != nil { + t.Fatalf("Failed to start SeaweedFS: %v", err) + } + env.weedProcess = cmd + + if !env.waitForService(fmt.Sprintf("http://%s:%d/v1/config", env.bindIP, env.icebergPort), 30*time.Second) { + t.Fatalf("Iceberg REST API did not become ready") + } +} + +func (env *TestEnvironment) Cleanup(t *testing.T) { + t.Helper() + + if env.trinoContainer != "" { + _ = exec.Command("docker", "rm", "-f", env.trinoContainer).Run() + } + + if env.weedCancel != nil { + env.weedCancel() + } + + if env.weedProcess != nil { + time.Sleep(2 * time.Second) + _ = env.weedProcess.Wait() + } + + if env.dataDir != "" { + _ = os.RemoveAll(env.dataDir) + } +} + +func (env *TestEnvironment) waitForService(url string, timeout time.Duration) bool { + client := &http.Client{Timeout: 2 * time.Second} + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + resp, err := client.Get(url) + if err == nil { + resp.Body.Close() + if resp.StatusCode == http.StatusOK { + return true + } + } + time.Sleep(500 * time.Millisecond) + } + return false +} + +func (env *TestEnvironment) writeTrinoConfig(t *testing.T, warehouseBucket string) string { + t.Helper() + + configDir := filepath.Join(env.dataDir, "trino") + if err := os.MkdirAll(configDir, 0755); err != nil { + t.Fatalf("Failed to create Trino config dir: %v", err) + } + + config := fmt.Sprintf(`connector.name=iceberg +iceberg.catalog.type=rest +iceberg.rest-catalog.uri=http://%s:%d +iceberg.rest-catalog.warehouse=s3://%s/ +iceberg.file-format=PARQUET +fs.native-s3.enabled=true +s3.endpoint=http://%s:%d +s3.path-style-access=true +s3.aws-access-key=test +s3.aws-secret-key=test +s3.region=us-west-2 +`, env.bindIP, env.icebergPort, warehouseBucket, env.bindIP, env.s3Port) + + if err := os.WriteFile(filepath.Join(configDir, "iceberg.properties"), []byte(config), 0644); err != nil { + t.Fatalf("Failed to write Trino config: %v", err) + } + + return configDir +} + +func (env *TestEnvironment) startTrinoContainer(t *testing.T, configDir string) { + t.Helper() + + containerName := "seaweed-trino-" + randomString(8) + env.trinoContainer = containerName + + cmd := exec.Command("docker", "run", "-d", + "--name", containerName, + "--add-host", "host.docker.internal:host-gateway", + "-v", fmt.Sprintf("%s:/etc/trino/catalog", configDir), + "-v", fmt.Sprintf("%s:/test", env.dataDir), + "-e", "AWS_ACCESS_KEY_ID=test", + "-e", "AWS_SECRET_ACCESS_KEY=test", + "-e", "AWS_REGION=us-west-2", + "trinodb/trino", + ) + if output, err := cmd.CombinedOutput(); err != nil { + t.Fatalf("Failed to start Trino container: %v\n%s", err, string(output)) + } +} + +func waitForTrino(t *testing.T, containerName string, timeout time.Duration) { + t.Helper() + + deadline := time.Now().Add(timeout) + var lastOutput []byte + for time.Now().Before(deadline) { + cmd := exec.Command("docker", "exec", containerName, + "trino", "--catalog", "system", "--schema", "runtime", + "--execute", "SELECT 1", + ) + if output, err := cmd.CombinedOutput(); err == nil { + return + } else { + lastOutput = output + outputStr := string(output) + if strings.Contains(outputStr, "No such container") || + strings.Contains(outputStr, "is not running") { + break + } + } + time.Sleep(1 * time.Second) + } + logs, _ := exec.Command("docker", "logs", containerName).CombinedOutput() + t.Fatalf("Timed out waiting for Trino to be ready\nLast output:\n%s\nTrino logs:\n%s", string(lastOutput), string(logs)) +} + +func runTrinoSQL(t *testing.T, containerName, sql string) string { + t.Helper() + + cmd := exec.Command("docker", "exec", containerName, + "trino", "--catalog", "system", "--schema", "runtime", + "--output-format", "CSV", + "--execute", sql, + ) + output, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("Trino command failed: %v\nSQL: %s\nOutput:\n%s", err, sql, string(output)) + } + return string(output) +} + +func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) { + t.Helper() + + endpoint := fmt.Sprintf("http://%s:%d/buckets", env.bindIP, env.s3Port) + reqBody := fmt.Sprintf(`{"name":"%s"}`, bucketName) + req, err := http.NewRequest(http.MethodPut, endpoint, strings.NewReader(reqBody)) + if err != nil { + t.Fatalf("Failed to create request: %v", err) + } + req.Header.Set("Content-Type", "application/x-amz-json-1.1") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("Failed to create table bucket %s: %v", bucketName, err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusConflict { + body, _ := io.ReadAll(resp.Body) + t.Fatalf("Failed to create table bucket %s, status %d: %s", bucketName, resp.StatusCode, body) + } +} + +func createObjectBucket(t *testing.T, env *TestEnvironment, bucketName string) { + t.Helper() + + endpoint := fmt.Sprintf("http://%s:%d/%s", env.bindIP, env.s3Port, bucketName) + req, err := http.NewRequest(http.MethodPut, endpoint, nil) + if err != nil { + t.Fatalf("Failed to create S3 bucket request: %v", err) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("Failed to create S3 bucket %s: %v", bucketName, err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusConflict { + body, _ := io.ReadAll(resp.Body) + t.Fatalf("Failed to create S3 bucket %s, status %d: %s", bucketName, resp.StatusCode, body) + } +} + +func hasDocker() bool { + cmd := exec.Command("docker", "version") + return cmd.Run() == nil +} + +func mustFreePort(t *testing.T, name string) int { + t.Helper() + + port, err := getFreePort() + if err != nil { + t.Fatalf("Failed to get free port for %s: %v", name, err) + } + return port +} + +func mustFreePortPair(t *testing.T, name string) (int, int) { + t.Helper() + + httpPort, grpcPort, err := findAvailablePortPair() + if err != nil { + t.Fatalf("Failed to get free port pair for %s: %v", name, err) + } + return httpPort, grpcPort +} + +func findAvailablePortPair() (int, int, error) { + httpPort, err := getFreePort() + if err != nil { + return 0, 0, err + } + grpcPort, err := getFreePort() + if err != nil { + return 0, 0, err + } + return httpPort, grpcPort, nil +} + +func getFreePort() (int, error) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return 0, err + } + defer listener.Close() + + addr := listener.Addr().(*net.TCPAddr) + return addr.Port, nil +} + +func findBindIP() string { + addrs, err := net.InterfaceAddrs() + if err != nil { + return "127.0.0.1" + } + for _, addr := range addrs { + ipNet, ok := addr.(*net.IPNet) + if !ok || ipNet.IP == nil { + continue + } + ip := ipNet.IP.To4() + if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() { + continue + } + return ip.String() + } + return "127.0.0.1" +} + +func randomString(length int) string { + const charset = "abcdefghijklmnopqrstuvwxyz0123456789" + b := make([]byte, length) + if _, err := rand.Read(b); err != nil { + panic("failed to generate random string: " + err.Error()) + } + for i := range b { + b[i] = charset[int(b[i])%len(charset)] + } + return string(b) +}