Browse Source

Add RisingWave catalog tests (#8308)

* Add RisingWave catalog tests for S3 tables

* Add RisingWave catalog integration tests to CI workflow

* Refactor RisingWave catalog tests based on PR feedback

* Address PR feedback: optimize checks, cleanup logs

* fix tests

* consistent
pull/8323/head
Chris Lu 1 week ago
committed by GitHub
parent
commit
b8ef48c8f1
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 68
      .github/workflows/s3-tables-tests.yml
  2. 79
      test/s3tables/catalog_risingwave/risingwave_catalog_test.go
  3. 481
      test/s3tables/catalog_risingwave/setup_test.go

68
.github/workflows/s3-tables-tests.yml

@ -260,6 +260,74 @@ jobs:
path: test/s3tables/catalog_spark/test-output.log
retention-days: 3
# CI job that runs the Go integration tests in test/s3tables/catalog_risingwave.
risingwave-catalog-tests:
name: RisingWave Catalog Integration Tests
runs-on: ubuntu-22.04
timeout-minutes: 30
steps:
- name: Check out code
uses: actions/checkout@v6
- name: Set up Go
uses: actions/setup-go@v6
with:
go-version-file: 'go.mod'
id: go
- name: Set up Docker
uses: docker/setup-buildx-action@v3
# Pulls both images ahead of the test run: RisingWave and the postgres image (two pulls despite the step name).
- name: Pre-pull RisingWave image
run: |
docker pull risingwavelabs/risingwave:v2.5.0
docker pull postgres:16-alpine
- name: Run go mod tidy
run: go mod tidy
# Builds and installs the weed binary from ./weed; the tests invoke `weed` by name.
- name: Install SeaweedFS
run: |
go install -buildvcs=false ./weed
- name: Run RisingWave Catalog Integration Tests
# Timeouts are nested: go test (20m) < this step (25m) < the job (30m).
timeout-minutes: 25
working-directory: test/s3tables/catalog_risingwave
run: |
set -x
set -o pipefail
echo "=== System Information ==="
uname -a
free -h
df -h
echo "=== Starting RisingWave Catalog Tests ==="
# Run RisingWave catalog integration tests
go test -v -timeout 20m . 2>&1 | tee test-output.log || {
echo "RisingWave catalog integration tests failed"
exit 1
}
- name: Show test output on failure
# Diagnostics only; runs when an earlier step failed.
if: failure()
working-directory: test/s3tables/catalog_risingwave
run: |
echo "=== Test Output ==="
if [ -f test-output.log ]; then
tail -200 test-output.log
fi
echo "=== Process information ==="
ps aux | grep -E "(weed|test|docker)" || true
- name: Upload test logs on failure
# Keeps the tee'd test-output.log as an artifact for 3 days when the job failed.
if: failure()
uses: actions/upload-artifact@v6
with:
name: risingwave-catalog-test-logs
path: test/s3tables/catalog_risingwave/test-output.log
retention-days: 3
s3-tables-build-verification:
name: S3 Tables Build Verification
runs-on: ubuntu-22.04

79
test/s3tables/catalog_risingwave/risingwave_catalog_test.go

@ -0,0 +1,79 @@
package catalog_risingwave
import (
"fmt"
"strings"
"testing"
)
// TestRisingWaveIcebergCatalog starts SeaweedFS and a RisingWave container,
// creates a table bucket, namespace and Iceberg table, and then checks that
// RisingWave can register the table as a source, describe it and select from it.
// The test is skipped in short mode and when Docker is not available.
func TestRisingWaveIcebergCatalog(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	env := NewTestEnvironment(t)
	defer env.Cleanup(t)

	if !env.dockerAvailable {
		t.Skip("Docker not available, skipping RisingWave integration test")
	}

	const (
		tableBucket = "iceberg-tables"
		namespace   = "default"
	)

	t.Log(">>> Starting SeaweedFS...")
	env.StartSeaweedFS(t)
	t.Log(">>> SeaweedFS started.")

	t.Logf(">>> Creating table bucket: %s", tableBucket)
	createTableBucket(t, env, tableBucket)

	t.Log(">>> Starting RisingWave...")
	env.StartRisingWave(t)
	t.Log(">>> RisingWave started.")

	createIcebergNamespace(t, env, namespace)

	// RisingWave runs inside Docker, so it needs the host.docker.internal
	// endpoints rather than the loopback ones.
	icebergURI := env.dockerIcebergEndpoint()
	s3Endpoint := env.dockerS3Endpoint()

	tableName := "test_table_" + randomString(6)
	createIcebergTable(t, env, tableBucket, namespace, tableName)

	sourceName := "test_source_" + randomString(6)
	createSourceSQL := fmt.Sprintf(`
CREATE SOURCE %s WITH (
connector = 'iceberg',
catalog.type = 'rest',
catalog.uri = '%s',
catalog.name = 'default',
database.name = 'default',
table.name = '%s',
warehouse.path = 's3://%s',
s3.endpoint = '%s',
s3.region = 'us-east-1',
s3.access.key = '%s',
s3.secret.key = '%s',
s3.path.style.access = 'true',
catalog.rest.sigv4_enabled = 'true',
catalog.rest.signing_region = 'us-east-1',
catalog.rest.signing_name = 's3'
);`, sourceName, icebergURI, tableName, tableBucket, s3Endpoint, env.accessKey, env.secretKey)

	t.Logf(">>> Creating source %s...", sourceName)
	runRisingWaveSQL(t, env.postgresSidecar, createSourceSQL)

	sources := runRisingWaveSQL(t, env.postgresSidecar, "SHOW SOURCES;")
	if !strings.Contains(sources, sourceName) {
		t.Fatalf("Expected source %s in SHOW SOURCES output:\n%s", sourceName, sources)
	}

	// NOTE(review): these are plain substring checks; "id" and "name" may also
	// match unrelated text in the DESCRIBE output — confirm this is strict enough.
	description := runRisingWaveSQL(t, env.postgresSidecar, fmt.Sprintf("DESCRIBE %s;", sourceName))
	if !(strings.Contains(description, "id") && strings.Contains(description, "name")) {
		t.Fatalf("Expected id/name columns in DESCRIBE output:\n%s", description)
	}

	// LIMIT 0 returns no rows; runRisingWaveSQL fails the test if the query errors.
	runRisingWaveSQL(t, env.postgresSidecar, fmt.Sprintf("SELECT * FROM %s LIMIT 0;", sourceName))

	t.Log(">>> RisingWave Iceberg Catalog test passed!")
}

481
test/s3tables/catalog_risingwave/setup_test.go

@ -0,0 +1,481 @@
package catalog_risingwave
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"math/rand"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"testing"
"time"
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/seaweedfs/seaweedfs/test/s3tables/testutil"
)
// lastMiniProcess records the most recently registered `weed mini` command so
// that a later test can stop it before starting its own. miniProcessMu guards it.
var (
	miniProcessMu   sync.Mutex
	lastMiniProcess *exec.Cmd
)

// stopPreviousMini clears the recorded process and, if it was actually
// started, kills it and waits for it to exit.
func stopPreviousMini() {
	miniProcessMu.Lock()
	defer miniProcessMu.Unlock()

	previous := lastMiniProcess
	lastMiniProcess = nil
	if previous == nil || previous.Process == nil {
		return
	}
	// Best effort: the process may already have exited.
	_ = previous.Process.Kill()
	_ = previous.Wait()
}

// registerMiniProcess records cmd as the current mini process.
func registerMiniProcess(cmd *exec.Cmd) {
	miniProcessMu.Lock()
	defer miniProcessMu.Unlock()
	lastMiniProcess = cmd
}

// clearMiniProcess forgets the recorded process, but only when it is cmd;
// a different recorded process is left untouched.
func clearMiniProcess(cmd *exec.Cmd) {
	miniProcessMu.Lock()
	defer miniProcessMu.Unlock()
	if lastMiniProcess != cmd {
		return
	}
	lastMiniProcess = nil
}
// TestEnvironment holds the state of one integration test run: the SeaweedFS
// process and its ports, the Docker containers, and the credentials shared
// between them.
type TestEnvironment struct {
	t               *testing.T // test used for logging from helpers
	dockerAvailable bool       // true when `docker version` succeeded

	// SeaweedFS (`weed mini`) state.
	seaweedfsDataDir string // temp directory passed as -dir; also holds seaweedfs.log

	masterPort, filerPort, s3Port, icebergRestPort, risingwavePort int // ports chosen by mustFreePort

	bindIP string // value passed to weed mini as -ip

	// Credentials written to the IAM config and handed to RisingWave.
	accessKey, secretKey string

	// Docker container names; empty until the container has been requested.
	risingwaveContainer string
	postgresSidecar     string // container used to run psql against RisingWave

	masterProcess *exec.Cmd // the weed mini process
	logFile       *os.File  // receives weed mini's stdout and stderr
}
// NewTestEnvironment returns an environment with fixed example credentials and
// records whether Docker is usable. Nothing is started here.
func NewTestEnvironment(t *testing.T) *TestEnvironment {
	// Docker counts as available when `docker version` exits successfully.
	dockerErr := exec.Command("docker", "version").Run()

	return &TestEnvironment{
		t:               t,
		accessKey:       "AKIAIOSFODNN7EXAMPLE",
		secretKey:       "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
		dockerAvailable: dockerErr == nil,
	}
}
// hostMasterAddress returns the master's loopback host:port, for use from the test host.
func (env *TestEnvironment) hostMasterAddress() string {
	port := env.masterPort
	return fmt.Sprintf("127.0.0.1:%d", port)
}
// hostS3Endpoint returns the S3 endpoint URL on loopback, for use from the test host.
func (env *TestEnvironment) hostS3Endpoint() string {
	port := env.s3Port
	return fmt.Sprintf("http://127.0.0.1:%d", port)
}
// hostIcebergEndpoint returns the Iceberg REST endpoint URL on loopback, for use from the test host.
func (env *TestEnvironment) hostIcebergEndpoint() string {
	port := env.icebergRestPort
	return fmt.Sprintf("http://127.0.0.1:%d", port)
}
// dockerS3Endpoint returns the S3 endpoint URL addressed via host.docker.internal,
// the host alias added to the RisingWave container.
func (env *TestEnvironment) dockerS3Endpoint() string {
	port := env.s3Port
	return fmt.Sprintf("http://host.docker.internal:%d", port)
}
// dockerIcebergEndpoint returns the Iceberg REST endpoint URL addressed via
// host.docker.internal, the host alias added to the RisingWave container.
func (env *TestEnvironment) dockerIcebergEndpoint() string {
	port := env.icebergRestPort
	return fmt.Sprintf("http://host.docker.internal:%d", port)
}
// StartSeaweedFS launches `weed mini` in a fresh temp directory on freshly
// picked ports and blocks until the master, filer, S3 and Iceberg REST ports
// accept connections. Any failure aborts the test via t.Fatalf.
func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
	t.Helper()

	// Make sure a mini process left behind by an earlier test is gone first.
	stopPreviousMini()

	dataDir, err := os.MkdirTemp("", "seaweed-risingwave-test-")
	env.seaweedfsDataDir = dataDir
	if err != nil {
		t.Fatalf("failed to create temp directory: %v", err)
	}

	env.masterPort = mustFreePort(t, "Master")
	env.filerPort = mustFreePort(t, "Filer")
	env.s3Port = mustFreePort(t, "S3")
	env.icebergRestPort = mustFreePort(t, "Iceberg")
	env.risingwavePort = mustFreePort(t, "RisingWave")

	env.bindIP = testutil.FindBindIP()

	iamConfigPath, err := testutil.WriteIAMConfig(env.seaweedfsDataDir, env.accessKey, env.secretKey)
	if err != nil {
		t.Fatalf("failed to create IAM config: %v", err)
	}

	// weed mini's stdout and stderr both go to this file; Cleanup prints it on failure.
	logFile, err := os.Create(filepath.Join(env.seaweedfsDataDir, "seaweedfs.log"))
	if err != nil {
		t.Fatalf("failed to create log file: %v", err)
	}
	env.logFile = logFile

	// weed mini is the all-in-one mode, including the Iceberg REST endpoint.
	mini := exec.Command(
		"weed", "mini",
		"-ip", env.bindIP,
		"-ip.bind", "0.0.0.0",
		"-master.port", fmt.Sprint(env.masterPort),
		"-filer.port", fmt.Sprint(env.filerPort),
		"-s3.port", fmt.Sprint(env.s3Port),
		"-s3.port.iceberg", fmt.Sprint(env.icebergRestPort),
		"-s3.config", iamConfigPath,
		"-dir", env.seaweedfsDataDir,
	)
	mini.Stdout = logFile
	mini.Stderr = logFile
	mini.Env = append(os.Environ(),
		"AWS_ACCESS_KEY_ID="+env.accessKey,
		"AWS_SECRET_ACCESS_KEY="+env.secretKey,
		"ICEBERG_WAREHOUSE=s3://iceberg-tables",
		"S3TABLES_DEFAULT_BUCKET=iceberg-tables",
	)
	env.masterProcess = mini

	if err := mini.Start(); err != nil {
		t.Fatalf("failed to start weed mini: %v", err)
	}
	registerMiniProcess(mini)

	// Each service gets its own 15s budget, checked in this order.
	readiness := []struct {
		port    int
		failFmt string
	}{
		{env.masterPort, "weed mini failed to start - master port %d not listening"},
		{env.filerPort, "weed mini failed to start - filer port %d not listening"},
		{env.s3Port, "weed mini failed to start - s3 port %d not listening"},
		{env.icebergRestPort, "weed mini failed to start - iceberg rest port %d not listening"},
	}
	for _, svc := range readiness {
		if !waitForPort(svc.port, 15*time.Second) {
			t.Fatalf(svc.failFmt, svc.port)
		}
	}
}
// mustFreePort picks a random port in [10000, 55000) for which both the port
// itself and port+10000 can currently be bound on 127.0.0.1. It fails the test
// if no such port is found in 1000 attempts. name is only used in the failure
// message. The listeners are closed before returning, so the port is not reserved.
func mustFreePort(t *testing.T, name string) int {
	t.Helper()

	const (
		lowest      = 10000
		highest     = 55000 // keeps candidate+grpcOffset below 65535
		grpcOffset  = 10000 // weed mini uses port+10000 for gRPC
		maxAttempts = 1000
	)

	// bindable reports whether a loopback TCP listener can be opened on p.
	bindable := func(p int) bool {
		l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", p))
		if err != nil {
			return false
		}
		l.Close()
		return true
	}

	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	for attempt := 0; attempt < maxAttempts; attempt++ {
		candidate := lowest + rng.Intn(highest-lowest)
		if bindable(candidate) && bindable(candidate+grpcOffset) {
			return candidate
		}
	}

	t.Fatalf("failed to find a free port < %d for %s after 1000 attempts", highest, name)
	return 0
}
// waitForPort polls localhost:port until a TCP connection succeeds or timeout
// has elapsed. It returns true on the first successful dial and false otherwise.
// Failed attempts are retried after a 100ms pause.
func waitForPort(port int, timeout time.Duration) bool {
	addr := fmt.Sprintf("localhost:%d", port)
	for deadline := time.Now().Add(timeout); time.Now().Before(deadline); time.Sleep(100 * time.Millisecond) {
		conn, err := net.DialTimeout("tcp", addr, 500*time.Millisecond)
		if err != nil {
			continue
		}
		conn.Close()
		return true
	}
	return false
}
// StartRisingWave runs the RisingWave playground container and a postgres
// sidecar that shares its network namespace, then waits until RisingWave
// accepts connections and answers a query through psql. Any failure aborts
// the test via t.Fatalf.
func (env *TestEnvironment) StartRisingWave(t *testing.T) {
	t.Helper()

	containerName := "seaweed-risingwave-" + randomString(8)
	env.risingwaveContainer = containerName

	risingwaveArgs := []string{
		"run", "-d",
		"--name", containerName,
		"-p", fmt.Sprintf("%d:4566", env.risingwavePort),
		// Lets the container reach SeaweedFS running on the host.
		"--add-host", "host.docker.internal:host-gateway",
		"-e", "AWS_ACCESS_KEY_ID=" + env.accessKey,
		"-e", "AWS_SECRET_ACCESS_KEY=" + env.secretKey,
		"-e", "AWS_REGION=us-east-1",
		"-e", "AWS_S3_PATH_STYLE_ACCESS=true",
		"-e", "AWS_S3_FORCE_PATH_STYLE=true",
		"risingwavelabs/risingwave:v2.5.0",
		"playground",
	}
	if out, err := exec.Command("docker", risingwaveArgs...).CombinedOutput(); err != nil {
		t.Fatalf("failed to start RisingWave container: %v\n%s", err, string(out))
	}

	// The sidecar only sleeps; it exists so psql can be run via `docker exec`.
	sidecarName := "seaweed-risingwave-sidecar-" + randomString(8)
	env.postgresSidecar = sidecarName

	sidecarArgs := []string{
		"run", "-d", "--rm",
		"--name", sidecarName,
		"--network", fmt.Sprintf("container:%s", containerName),
		"postgres:16-alpine",
		"sleep", "infinity",
	}
	if out, err := exec.Command("docker", sidecarArgs...).CombinedOutput(); err != nil {
		t.Fatalf("failed to start postgres sidecar: %v\n%s", err, string(out))
	}

	// First the published port on the host...
	if !waitForPort(env.risingwavePort, 120*time.Second) {
		t.Fatalf("timed out waiting for RisingWave port %d to be open", env.risingwavePort)
	}

	// ...then an actual query, since an open port does not mean SQL is served yet.
	if !env.waitForRisingWave(120 * time.Second) {
		t.Fatalf("timed out waiting for RisingWave to be ready via psql")
	}
}
// waitForRisingWave runs "SELECT 1;" through the sidecar every 5 seconds until
// it succeeds (returns true) or timeout has elapsed (returns false). Each
// failed attempt is logged with its error and output.
func (env *TestEnvironment) waitForRisingWave(timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	env.t.Logf(">>> Waiting for RisingWave to be ready (timeout %v)...\n", timeout)

	for time.Now().Before(deadline) {
		out, err := runPostgresClientSQL(env.postgresSidecar, "SELECT 1;")
		if err == nil {
			env.t.Logf(">>> RisingWave is ready.\n")
			return true
		}
		env.t.Logf(">>> RisingWave not ready yet: %v (Output: %s)\n", err, string(out))
		time.Sleep(5 * time.Second)
	}
	return false
}
// runPostgresClientSQL executes sql with psql inside containerName via
// `docker exec`, connecting to 127.0.0.1:4566 as user root on database dev.
// It returns psql's combined stdout and stderr. The command is cancelled
// after 10 seconds.
func runPostgresClientSQL(containerName, sql string) ([]byte, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	args := []string{
		"exec", containerName,
		"psql",
		"-h", "127.0.0.1",
		"-p", "4566",
		"-U", "root",
		"-d", "dev",
		// Make psql exit non-zero on the first SQL error.
		"-v", "ON_ERROR_STOP=1",
		"-c", sql,
	}
	return exec.CommandContext(ctx, "docker", args...).CombinedOutput()
}
// Cleanup tears the environment down: it removes both containers, stops the
// weed mini process and deletes the data directory. When the test has failed
// it first logs the RisingWave container logs, the SeaweedFS log file and a
// recursive filer listing. Steps whose resource was never created are skipped.
func (env *TestEnvironment) Cleanup(t *testing.T) {
	t.Helper()

	if name := env.risingwaveContainer; name != "" {
		if t.Failed() {
			logs, err := exec.Command("docker", "logs", name).CombinedOutput()
			if err != nil {
				env.t.Logf(">>> Failed to get RisingWave logs: %v\n", err)
			} else {
				env.t.Logf(">>> RisingWave Logs:\n%s\n", string(logs))
			}
		}
		// Best effort: the container may not exist if `docker run` failed.
		_ = exec.Command("docker", "rm", "-f", name).Run()
	}

	if name := env.postgresSidecar; name != "" {
		_ = exec.Command("docker", "rm", "-f", name).Run()
	}

	// Diagnostics are gathered before the process is killed so the filer
	// listing can still reach it.
	if t.Failed() && env.seaweedfsDataDir != "" {
		logPath := filepath.Join(env.seaweedfsDataDir, "seaweedfs.log")
		content, err := os.ReadFile(logPath)
		if err == nil {
			env.t.Logf(">>> SeaweedFS Logs:\n%s\n", string(content))
		}
		env.t.Logf(">>> Filer Contents:\n")
		listFilerContents(t, env, "/")
	}

	if proc := env.masterProcess; proc != nil && proc.Process != nil {
		_ = proc.Process.Kill()
		_ = proc.Wait()
	}
	clearMiniProcess(env.masterProcess)

	if env.seaweedfsDataDir == "" {
		return
	}
	if env.logFile != nil {
		env.logFile.Close()
	}
	_ = os.RemoveAll(env.seaweedfsDataDir)
}
// runRisingWaveSQL runs sql through psql in containerName and returns its
// combined output. A failing command aborts the test, reporting the error,
// the SQL and the output.
func runRisingWaveSQL(t *testing.T, containerName, sql string) string {
	t.Helper()

	out, err := runPostgresClientSQL(containerName, sql)
	if err == nil {
		return string(out)
	}
	t.Fatalf("RisingWave command failed: %v\nSQL: %s\nOutput:\n%s", err, sql, string(out))
	return string(out)
}
// createTableBucket creates the S3 Tables bucket bucketName by feeding an
// `s3tables.bucket -create` command to `weed shell` on stdin. The shell is
// given 30 seconds; any failure aborts the test.
func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) {
	t.Helper()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	script := fmt.Sprintf("s3tables.bucket -create -name %s -account 000000000000\nexit\n", bucketName)
	masterFlag := fmt.Sprintf("-master=%s", env.hostMasterAddress())

	shell := exec.CommandContext(ctx, "weed", "shell", masterFlag)
	shell.Stdin = strings.NewReader(script)

	if out, err := shell.CombinedOutput(); err != nil {
		t.Fatalf("failed to create table bucket %s via weed shell: %v\nOutput: %s", bucketName, err, string(out))
	}
}
// doIcebergSignedJSONRequest sends a SigV4-signed request to the Iceberg REST
// endpoint on the host and returns the HTTP status code and response body.
//
// method and path select the operation; path is appended to the host Iceberg
// endpoint. When payload is non-nil it is JSON-encoded and sent as the body
// with Content-Type application/json. The request is signed for service "s3"
// in region "us-east-1" with an unsigned payload. The whole exchange,
// including reading the body, is limited to 30 seconds so that an
// unresponsive server cannot hang the test.
//
// A non-nil error means the request could not be built, signed, sent or read;
// HTTP error statuses are returned as a status code with a nil error.
func doIcebergSignedJSONRequest(env *TestEnvironment, method, path string, payload any) (int, string, error) {
	// The body is never hashed: the signature covers headers only.
	const payloadHash = "UNSIGNED-PAYLOAD"

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	var body io.Reader
	if payload != nil {
		data, err := json.Marshal(payload)
		if err != nil {
			return 0, "", fmt.Errorf("failed to marshal payload: %w", err)
		}
		body = bytes.NewReader(data)
	}

	endpoint := env.hostIcebergEndpoint() + path
	req, err := http.NewRequestWithContext(ctx, method, endpoint, body)
	if err != nil {
		return 0, "", fmt.Errorf("failed to create request: %w", err)
	}
	if payload != nil {
		req.Header.Set("Content-Type", "application/json")
	}
	// Must be set before signing so the header is part of the signature.
	req.Header.Set("X-Amz-Content-Sha256", payloadHash)

	credsProvider := credentials.NewStaticCredentialsProvider(env.accessKey, env.secretKey, "")
	creds, err := credsProvider.Retrieve(ctx)
	if err != nil {
		return 0, "", fmt.Errorf("failed to retrieve credentials: %w", err)
	}
	if err := v4.NewSigner().SignHTTP(ctx, creds, req, payloadHash, "s3", "us-east-1", time.Now()); err != nil {
		return 0, "", fmt.Errorf("failed to sign request: %w", err)
	}

	// The shared default client is reused; the deadline comes from ctx.
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, "", fmt.Errorf("request failed: %w", err)
	}
	defer resp.Body.Close()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return 0, "", fmt.Errorf("failed to read response body: %w", err)
	}
	return resp.StatusCode, string(respBody), nil
}
// createIcebergNamespace creates namespace through the Iceberg REST API.
// Status 200 and 409 are both accepted; any other status or a transport
// error aborts the test.
func createIcebergNamespace(t *testing.T, env *TestEnvironment, namespace string) {
	t.Helper()

	request := map[string]any{
		"namespace": []string{namespace},
	}
	status, raw, err := doIcebergSignedJSONRequest(env, "POST", "/v1/namespaces", request)
	if err != nil {
		t.Fatalf("failed to create Iceberg namespace %s: %v", namespace, err)
	}

	switch status {
	case http.StatusOK, http.StatusConflict:
		// Created, or reported as a conflict — both are accepted.
	default:
		t.Fatalf("failed to create Iceberg namespace %s: status %d body: %s", namespace, status, raw)
	}
}
// createIcebergTable creates tableName in namespace through the Iceberg REST
// API, located at s3://bucketName/namespace/tableName, with two optional
// columns: id (int) and name (string). Status 200 and 409 are both accepted;
// any other status or a transport error aborts the test.
func createIcebergTable(t *testing.T, env *TestEnvironment, bucketName, namespace, tableName string) {
	t.Helper()

	schema := map[string]any{
		"type": "struct",
		"fields": []map[string]any{
			{"id": 1, "name": "id", "required": false, "type": "int"},
			{"id": 2, "name": "name", "required": false, "type": "string"},
		},
	}
	request := map[string]any{
		"name":     tableName,
		"location": fmt.Sprintf("s3://%s/%s/%s", bucketName, namespace, tableName),
		"schema":   schema,
	}

	endpoint := fmt.Sprintf("/v1/namespaces/%s/tables", namespace)
	status, raw, err := doIcebergSignedJSONRequest(env, "POST", endpoint, request)
	if err != nil {
		t.Fatalf("failed to create Iceberg table %s.%s in bucket %s: %v", namespace, tableName, bucketName, err)
	}

	switch status {
	case http.StatusOK, http.StatusConflict:
		// Created, or reported as a conflict — both are accepted.
	default:
		t.Fatalf("failed to create Iceberg table %s.%s in bucket %s: status %d body: %s", namespace, tableName, bucketName, status, raw)
	}
}
// listFilerContents logs a recursive listing of path on the filer, obtained
// by feeding `fs.ls -R` to `weed shell` on stdin. It is diagnostic only: a
// failure is logged as a warning and never fails the test.
//
// The shell is limited to 30 seconds. This helper runs during cleanup of a
// failed test, when the master may be unreachable, so without a deadline it
// could block teardown indefinitely.
func listFilerContents(t *testing.T, env *TestEnvironment, path string) {
	t.Helper()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	cmd := exec.CommandContext(ctx, "weed", "shell",
		fmt.Sprintf("-master=%s", env.hostMasterAddress()),
	)
	cmd.Stdin = strings.NewReader(fmt.Sprintf("fs.ls -R %s\nexit\n", path))

	output, err := cmd.CombinedOutput()
	if err != nil {
		env.t.Logf(">>> Warning: failed to list filer contents: %v\nOutput: %s\n", err, string(output))
		return
	}
	env.t.Logf("%s\n", string(output))
}
// randomString returns n characters drawn from lowercase ASCII letters and
// digits using the package-level math/rand source. It is meant for unique
// test resource names, not for secrets.
func randomString(n int) string {
	const letters = "abcdefghijklmnopqrstuvwxyz0123456789"

	out := make([]byte, n)
	for i := 0; i < n; i++ {
		pick := rand.Intn(len(letters))
		out[i] = letters[pick]
	}
	return string(out)
}
Loading…
Cancel
Save