From b8ef48c8f12e93f14a48d372b590f5e3a9a6e6d7 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 11 Feb 2026 22:00:06 -0800 Subject: [PATCH] Add RisingWave catalog tests (#8308) * Add RisingWave catalog tests for S3 tables * Add RisingWave catalog integration tests to CI workflow * Refactor RisingWave catalog tests based on PR feedback * Address PR feedback: optimize checks, cleanup logs * fix tests * consistent --- .github/workflows/s3-tables-tests.yml | 68 +++ .../risingwave_catalog_test.go | 79 +++ .../s3tables/catalog_risingwave/setup_test.go | 481 ++++++++++++++++++ 3 files changed, 628 insertions(+) create mode 100644 test/s3tables/catalog_risingwave/risingwave_catalog_test.go create mode 100644 test/s3tables/catalog_risingwave/setup_test.go diff --git a/.github/workflows/s3-tables-tests.yml b/.github/workflows/s3-tables-tests.yml index 5f6ee7b91..df365956d 100644 --- a/.github/workflows/s3-tables-tests.yml +++ b/.github/workflows/s3-tables-tests.yml @@ -260,6 +260,74 @@ jobs: path: test/s3tables/catalog_spark/test-output.log retention-days: 3 + risingwave-catalog-tests: + name: RisingWave Catalog Integration Tests + runs-on: ubuntu-22.04 + timeout-minutes: 30 + + steps: + - name: Check out code + uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' + id: go + + - name: Set up Docker + uses: docker/setup-buildx-action@v3 + + - name: Pre-pull RisingWave image + run: | + docker pull risingwavelabs/risingwave:v2.5.0 + docker pull postgres:16-alpine + + - name: Run go mod tidy + run: go mod tidy + + - name: Install SeaweedFS + run: | + go install -buildvcs=false ./weed + + - name: Run RisingWave Catalog Integration Tests + timeout-minutes: 25 + working-directory: test/s3tables/catalog_risingwave + run: | + set -x + set -o pipefail + echo "=== System Information ===" + uname -a + free -h + df -h + echo "=== Starting RisingWave Catalog Tests ===" + + # Run RisingWave catalog integration tests + go test -v -timeout 20m . 2>&1 | tee test-output.log || { + echo "RisingWave catalog integration tests failed" + exit 1 + } + + - name: Show test output on failure + if: failure() + working-directory: test/s3tables/catalog_risingwave + run: | + echo "=== Test Output ===" + if [ -f test-output.log ]; then + tail -200 test-output.log + fi + + echo "=== Process information ===" + ps aux | grep -E "(weed|test|docker)" || true + + - name: Upload test logs on failure + if: failure() + uses: actions/upload-artifact@v6 + with: + name: risingwave-catalog-test-logs + path: test/s3tables/catalog_risingwave/test-output.log + retention-days: 3 + s3-tables-build-verification: name: S3 Tables Build Verification runs-on: ubuntu-22.04 diff --git a/test/s3tables/catalog_risingwave/risingwave_catalog_test.go b/test/s3tables/catalog_risingwave/risingwave_catalog_test.go new file mode 100644 index 000000000..e2d269904 --- /dev/null +++ b/test/s3tables/catalog_risingwave/risingwave_catalog_test.go @@ -0,0 +1,79 @@ +package catalog_risingwave + +import ( + "fmt" + "strings" + "testing" +) + +func TestRisingWaveIcebergCatalog(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + env := NewTestEnvironment(t) + defer env.Cleanup(t) + + if !env.dockerAvailable { + t.Skip("Docker not available, skipping RisingWave integration test") + } + + t.Log(">>> Starting SeaweedFS...") + env.StartSeaweedFS(t) + t.Log(">>> SeaweedFS started.") + + tableBucket := "iceberg-tables" + t.Logf(">>> Creating table bucket: %s", tableBucket) + createTableBucket(t, env, tableBucket) + + t.Log(">>> Starting RisingWave...") + env.StartRisingWave(t) + t.Log(">>> RisingWave started.") + + // Create Iceberg namespace + createIcebergNamespace(t, env, "default") + + // Create a catalog in RisingWave that points to SeaweedFS Iceberg REST API + icebergUri := env.dockerIcebergEndpoint() + s3Endpoint := env.dockerS3Endpoint() + + tableName := "test_table_" + randomString(6) + createIcebergTable(t, env, tableBucket, "default", tableName) + + sourceName := "test_source_" + randomString(6) + createSourceSql := fmt.Sprintf(` +CREATE SOURCE %s WITH ( + connector = 'iceberg', + catalog.type = 'rest', + catalog.uri = '%s', + catalog.name = 'default', + database.name = 'default', + table.name = '%s', + warehouse.path = 's3://%s', + s3.endpoint = '%s', + s3.region = 'us-east-1', + s3.access.key = '%s', + s3.secret.key = '%s', + s3.path.style.access = 'true', + catalog.rest.sigv4_enabled = 'true', + catalog.rest.signing_region = 'us-east-1', + catalog.rest.signing_name = 's3' +);`, sourceName, icebergUri, tableName, tableBucket, s3Endpoint, env.accessKey, env.secretKey) + + t.Logf(">>> Creating source %s...", sourceName) + runRisingWaveSQL(t, env.postgresSidecar, createSourceSql) + + showSourcesOutput := runRisingWaveSQL(t, env.postgresSidecar, "SHOW SOURCES;") + if !strings.Contains(showSourcesOutput, sourceName) { + t.Fatalf("Expected source %s in SHOW SOURCES output:\n%s", sourceName, showSourcesOutput) + } + + describeOutput := runRisingWaveSQL(t, env.postgresSidecar, fmt.Sprintf("DESCRIBE %s;", sourceName)) + if !strings.Contains(describeOutput, "id") || !strings.Contains(describeOutput, "name") { + t.Fatalf("Expected id/name columns in DESCRIBE output:\n%s", describeOutput) + } + + runRisingWaveSQL(t, env.postgresSidecar, fmt.Sprintf("SELECT * FROM %s LIMIT 0;", sourceName)) + + t.Log(">>> RisingWave Iceberg Catalog test passed!") +} diff --git a/test/s3tables/catalog_risingwave/setup_test.go b/test/s3tables/catalog_risingwave/setup_test.go new file mode 100644 index 000000000..47a4bbf89 --- /dev/null +++ b/test/s3tables/catalog_risingwave/setup_test.go @@ -0,0 +1,481 @@ +package catalog_risingwave + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "math/rand" + "net" + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/seaweedfs/seaweedfs/test/s3tables/testutil" +) + +var ( + miniProcessMu sync.Mutex + lastMiniProcess *exec.Cmd +) + +func stopPreviousMini() { + miniProcessMu.Lock() + defer miniProcessMu.Unlock() + + if lastMiniProcess != nil && lastMiniProcess.Process != nil { + _ = lastMiniProcess.Process.Kill() + _ = lastMiniProcess.Wait() + } + lastMiniProcess = nil +} + +func registerMiniProcess(cmd *exec.Cmd) { + miniProcessMu.Lock() + lastMiniProcess = cmd + miniProcessMu.Unlock() +} + +func clearMiniProcess(cmd *exec.Cmd) { + miniProcessMu.Lock() + if lastMiniProcess == cmd { + lastMiniProcess = nil + } + miniProcessMu.Unlock() +} + +type TestEnvironment struct { + t *testing.T + dockerAvailable bool + seaweedfsDataDir string + masterPort int + filerPort int + s3Port int + icebergRestPort int + risingwavePort int + bindIP string + accessKey string + secretKey string + risingwaveContainer string + postgresSidecar string + masterProcess *exec.Cmd + logFile *os.File +} + +func NewTestEnvironment(t *testing.T) *TestEnvironment { + env := &TestEnvironment{ + t: t, + accessKey: "AKIAIOSFODNN7EXAMPLE", + secretKey: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + } + + // Check if Docker is available + cmd := exec.Command("docker", "version") + env.dockerAvailable = cmd.Run() == nil + + return env +} + +func (env *TestEnvironment) hostMasterAddress() string { + return fmt.Sprintf("127.0.0.1:%d", env.masterPort) +} + +func (env *TestEnvironment) hostS3Endpoint() string { + return fmt.Sprintf("http://127.0.0.1:%d", env.s3Port) +} + +func (env *TestEnvironment) hostIcebergEndpoint() string { + return fmt.Sprintf("http://127.0.0.1:%d", env.icebergRestPort) +} + +func (env *TestEnvironment) dockerS3Endpoint() string { + return fmt.Sprintf("http://host.docker.internal:%d", env.s3Port) +} + +func (env *TestEnvironment) dockerIcebergEndpoint() string { + return fmt.Sprintf("http://host.docker.internal:%d", env.icebergRestPort) +} + +func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { + t.Helper() + + stopPreviousMini() + + var err error + env.seaweedfsDataDir, err = os.MkdirTemp("", "seaweed-risingwave-test-") + if err != nil { + t.Fatalf("failed to create temp directory: %v", err) + } + + env.masterPort = mustFreePort(t, "Master") + env.filerPort = mustFreePort(t, "Filer") + env.s3Port = mustFreePort(t, "S3") + env.icebergRestPort = mustFreePort(t, "Iceberg") + env.risingwavePort = mustFreePort(t, "RisingWave") + + env.bindIP = testutil.FindBindIP() + + iamConfigPath, err := testutil.WriteIAMConfig(env.seaweedfsDataDir, env.accessKey, env.secretKey) + if err != nil { + t.Fatalf("failed to create IAM config: %v", err) + } + + // Create log file for SeaweedFS + logFile, err := os.Create(filepath.Join(env.seaweedfsDataDir, "seaweedfs.log")) + if err != nil { + t.Fatalf("failed to create log file: %v", err) + } + env.logFile = logFile + + // Start SeaweedFS using weed mini (all-in-one including Iceberg REST) + env.masterProcess = exec.Command( + "weed", "mini", + "-ip", env.bindIP, + "-ip.bind", "0.0.0.0", + "-master.port", fmt.Sprintf("%d", env.masterPort), + "-filer.port", fmt.Sprintf("%d", env.filerPort), + "-s3.port", fmt.Sprintf("%d", env.s3Port), + "-s3.port.iceberg", fmt.Sprintf("%d", env.icebergRestPort), + "-s3.config", iamConfigPath, + "-dir", env.seaweedfsDataDir, + ) + env.masterProcess.Stdout = logFile + env.masterProcess.Stderr = logFile + env.masterProcess.Env = append(os.Environ(), + "AWS_ACCESS_KEY_ID="+env.accessKey, + "AWS_SECRET_ACCESS_KEY="+env.secretKey, + "ICEBERG_WAREHOUSE=s3://iceberg-tables", + "S3TABLES_DEFAULT_BUCKET=iceberg-tables", + ) + if err := env.masterProcess.Start(); err != nil { + t.Fatalf("failed to start weed mini: %v", err) + } + registerMiniProcess(env.masterProcess) + + // Wait for all services to be ready + if !waitForPort(env.masterPort, 15*time.Second) { + t.Fatalf("weed mini failed to start - master port %d not listening", env.masterPort) + } + if !waitForPort(env.filerPort, 15*time.Second) { + t.Fatalf("weed mini failed to start - filer port %d not listening", env.filerPort) + } + if !waitForPort(env.s3Port, 15*time.Second) { + t.Fatalf("weed mini failed to start - s3 port %d not listening", env.s3Port) + } + if !waitForPort(env.icebergRestPort, 15*time.Second) { + t.Fatalf("weed mini failed to start - iceberg rest port %d not listening", env.icebergRestPort) + } +} + +func mustFreePort(t *testing.T, name string) int { + t.Helper() + minPort := 10000 + maxPort := 55000 // Ensure port+10000 < 65535 + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + for i := 0; i < 1000; i++ { + port := minPort + r.Intn(maxPort-minPort) + + // Check http port + ln, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) + if err != nil { + continue + } + ln.Close() + + // Check grpc port (weed mini uses port+10000) + ln2, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port+10000)) + if err != nil { + continue + } + ln2.Close() + + return port + } + t.Fatalf("failed to find a free port < %d for %s after 1000 attempts", maxPort, name) + return 0 +} + +func waitForPort(port int, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", port), 500*time.Millisecond) + if err == nil { + conn.Close() + return true + } + time.Sleep(100 * time.Millisecond) + } + return false +} + +func (env *TestEnvironment) StartRisingWave(t *testing.T) { + t.Helper() + + containerName := "seaweed-risingwave-" + randomString(8) + env.risingwaveContainer = containerName + + cmd := exec.Command("docker", "run", "-d", + "--name", containerName, + "-p", fmt.Sprintf("%d:4566", env.risingwavePort), + "--add-host", "host.docker.internal:host-gateway", + "-e", "AWS_ACCESS_KEY_ID="+env.accessKey, + "-e", "AWS_SECRET_ACCESS_KEY="+env.secretKey, + "-e", "AWS_REGION=us-east-1", + "-e", "AWS_S3_PATH_STYLE_ACCESS=true", + "-e", "AWS_S3_FORCE_PATH_STYLE=true", + "risingwavelabs/risingwave:v2.5.0", + "playground", + ) + if output, err := cmd.CombinedOutput(); err != nil { + t.Fatalf("failed to start RisingWave container: %v\n%s", err, string(output)) + } + + // Start a sidecar postgres container for running psql commands + sidecarName := "seaweed-risingwave-sidecar-" + randomString(8) + env.postgresSidecar = sidecarName + sidecarCmd := exec.Command("docker", "run", "-d", "--rm", + "--name", sidecarName, + "--network", fmt.Sprintf("container:%s", containerName), + "postgres:16-alpine", + "sleep", "infinity", + ) + if output, err := sidecarCmd.CombinedOutput(); err != nil { + t.Fatalf("failed to start postgres sidecar: %v\n%s", err, string(output)) + } + + // Wait for RisingWave port to be open on host + if !waitForPort(env.risingwavePort, 120*time.Second) { + t.Fatalf("timed out waiting for RisingWave port %d to be open", env.risingwavePort) + } + + // Wait for RisingWave to be truly ready via psql in the sidecar. + if !env.waitForRisingWave(120 * time.Second) { + t.Fatalf("timed out waiting for RisingWave to be ready via psql") + } +} + +func (env *TestEnvironment) waitForRisingWave(timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + env.t.Logf(">>> Waiting for RisingWave to be ready (timeout %v)...\n", timeout) + for time.Now().Before(deadline) { + if output, err := runPostgresClientSQL(env.postgresSidecar, "SELECT 1;"); err == nil { + env.t.Logf(">>> RisingWave is ready.\n") + return true + } else { + env.t.Logf(">>> RisingWave not ready yet: %v (Output: %s)\n", err, string(output)) + } + time.Sleep(5 * time.Second) + } + return false +} + +func runPostgresClientSQL(containerName, sql string) ([]byte, error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + cmd := exec.CommandContext(ctx, "docker", "exec", + containerName, + "psql", + "-h", "127.0.0.1", + "-p", "4566", + "-U", "root", + "-d", "dev", + "-v", "ON_ERROR_STOP=1", + "-c", sql, + ) + return cmd.CombinedOutput() +} + +func (env *TestEnvironment) Cleanup(t *testing.T) { + t.Helper() + + if env.risingwaveContainer != "" { + if t.Failed() { + logs, err := exec.Command("docker", "logs", env.risingwaveContainer).CombinedOutput() + if err == nil { + env.t.Logf(">>> RisingWave Logs:\n%s\n", string(logs)) + } else { + env.t.Logf(">>> Failed to get RisingWave logs: %v\n", err) + } + } + _ = exec.Command("docker", "rm", "-f", env.risingwaveContainer).Run() + } + + if env.postgresSidecar != "" { + _ = exec.Command("docker", "rm", "-f", env.postgresSidecar).Run() + } + + if env.seaweedfsDataDir != "" && t.Failed() { + logPath := filepath.Join(env.seaweedfsDataDir, "seaweedfs.log") + if content, err := os.ReadFile(logPath); err == nil { + env.t.Logf(">>> SeaweedFS Logs:\n%s\n", string(content)) + } + env.t.Logf(">>> Filer Contents:\n") + listFilerContents(t, env, "/") + } + + if env.masterProcess != nil && env.masterProcess.Process != nil { + _ = env.masterProcess.Process.Kill() + _ = env.masterProcess.Wait() + } + clearMiniProcess(env.masterProcess) + + if env.seaweedfsDataDir != "" { + if env.logFile != nil { + env.logFile.Close() + } + _ = os.RemoveAll(env.seaweedfsDataDir) + } +} + +func runRisingWaveSQL(t *testing.T, containerName, sql string) string { + t.Helper() + + output, err := runPostgresClientSQL(containerName, sql) + if err != nil { + t.Fatalf("RisingWave command failed: %v\nSQL: %s\nOutput:\n%s", err, sql, string(output)) + } + return string(output) +} + +func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + cmd := exec.CommandContext(ctx, "weed", "shell", + fmt.Sprintf("-master=%s", env.hostMasterAddress()), + ) + cmd.Stdin = strings.NewReader(fmt.Sprintf("s3tables.bucket -create -name %s -account 000000000000\nexit\n", bucketName)) + output, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("failed to create table bucket %s via weed shell: %v\nOutput: %s", bucketName, err, string(output)) + } +} + +func doIcebergSignedJSONRequest(env *TestEnvironment, method, path string, payload any) (int, string, error) { + url := env.hostIcebergEndpoint() + path + + var body io.Reader + var payloadHash string + + if payload != nil { + data, err := json.Marshal(payload) + if err != nil { + return 0, "", err + } + body = bytes.NewReader(data) + // hash := sha256.Sum256(data) + // payloadHash = hex.EncodeToString(hash[:]) + payloadHash = "UNSIGNED-PAYLOAD" + } else { + payloadHash = "UNSIGNED-PAYLOAD" + } + + req, err := http.NewRequest(method, url, body) + if err != nil { + return 0, "", fmt.Errorf("failed to create request: %w", err) + } + + if payload != nil { + req.Header.Set("Content-Type", "application/json") + } + req.Header.Set("X-Amz-Content-Sha256", payloadHash) + + // Sign the request + credsProvider := credentials.NewStaticCredentialsProvider(env.accessKey, env.secretKey, "") + creds, err := credsProvider.Retrieve(context.Background()) + if err != nil { + return 0, "", fmt.Errorf("failed to retrieve credentials: %w", err) + } + signer := v4.NewSigner() + + if err := signer.SignHTTP(context.Background(), creds, req, payloadHash, "s3", "us-east-1", time.Now()); err != nil { + return 0, "", fmt.Errorf("failed to sign request: %w", err) + } + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return 0, "", fmt.Errorf("request failed: %w", err) + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return 0, "", fmt.Errorf("failed to read response body: %w", err) + } + + return resp.StatusCode, string(respBody), nil +} + +func createIcebergNamespace(t *testing.T, env *TestEnvironment, namespace string) { + t.Helper() + + status, raw, err := doIcebergSignedJSONRequest(env, "POST", "/v1/namespaces", map[string]any{ + "namespace": []string{namespace}, + }) + if err != nil { + t.Fatalf("failed to create Iceberg namespace %s: %v", namespace, err) + } + if status != 200 && status != 409 { + t.Fatalf("failed to create Iceberg namespace %s: status %d body: %s", namespace, status, raw) + } +} + +func createIcebergTable(t *testing.T, env *TestEnvironment, bucketName, namespace, tableName string) { + t.Helper() + + createPath := fmt.Sprintf("/v1/namespaces/%s/tables", namespace) + status, raw, err := doIcebergSignedJSONRequest(env, "POST", createPath, map[string]any{ + "name": tableName, + "location": fmt.Sprintf("s3://%s/%s/%s", bucketName, namespace, tableName), + "schema": map[string]any{ + "type": "struct", + "fields": []map[string]any{ + {"id": 1, "name": "id", "required": false, "type": "int"}, + {"id": 2, "name": "name", "required": false, "type": "string"}, + }, + }, + }) + if err != nil { + t.Fatalf("failed to create Iceberg table %s.%s in bucket %s: %v", namespace, tableName, bucketName, err) + } + if status != 200 && status != 409 { + t.Fatalf("failed to create Iceberg table %s.%s in bucket %s: status %d body: %s", namespace, tableName, bucketName, status, raw) + } +} + +func listFilerContents(t *testing.T, env *TestEnvironment, path string) { + t.Helper() + + cmd := exec.Command("weed", "shell", + fmt.Sprintf("-master=%s", env.hostMasterAddress()), + ) + cmd.Stdin = strings.NewReader(fmt.Sprintf("fs.ls -R %s\nexit\n", path)) + output, err := cmd.CombinedOutput() + if err != nil { + env.t.Logf(">>> Warning: failed to list filer contents: %v\nOutput: %s\n", err, string(output)) + } else { + env.t.Logf("%s\n", string(output)) + } +} + +func randomString(n int) string { + const letters = "abcdefghijklmnopqrstuvwxyz0123456789" + b := make([]byte, n) + for i := range b { + b[i] = letters[rand.Intn(len(letters))] + } + return string(b) +}