From 0082c47e049b5de2e44e9d22eda100fa524360ef Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 12 Feb 2026 14:24:04 -0800 Subject: [PATCH] Test: Add RisingWave DML verification test (#8317) * Test: Verify RisingWave DML operations (INSERT, UPDATE, DELETE) support * Test: Refine RisingWave DML test (remove sleeps, use polling) --- .../catalog_risingwave/risingwave_dml_test.go | 219 ++++++++++++++++++ 1 file changed, 219 insertions(+) create mode 100644 test/s3tables/catalog_risingwave/risingwave_dml_test.go diff --git a/test/s3tables/catalog_risingwave/risingwave_dml_test.go b/test/s3tables/catalog_risingwave/risingwave_dml_test.go new file mode 100644 index 000000000..bf1566075 --- /dev/null +++ b/test/s3tables/catalog_risingwave/risingwave_dml_test.go @@ -0,0 +1,219 @@ +package catalog_risingwave + +import ( + "fmt" + "strings" + "testing" + "time" +) + +func TestRisingWaveIcebergDML(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + env := NewTestEnvironment(t) + defer env.Cleanup(t) + + if !env.dockerAvailable { + t.Skip("Docker not available, skipping RisingWave integration test") + } + + t.Log(">>> Starting SeaweedFS...") + env.StartSeaweedFS(t) + t.Log(">>> SeaweedFS started.") + + tableBucket := "iceberg-tables" + t.Logf(">>> Creating table bucket: %s", tableBucket) + createTableBucket(t, env, tableBucket) + + t.Log(">>> Starting RisingWave...") + env.StartRisingWave(t) + t.Log(">>> RisingWave started.") + + // Create Iceberg namespace + createIcebergNamespace(t, env, "default") + + icebergUri := env.dockerIcebergEndpoint() + s3Endpoint := env.dockerS3Endpoint() + + // 1. Test INSERT (Append-only) + t.Run("TestInsert", func(t *testing.T) { + tableName := "test_insert_" + randomString(6) + createIcebergTable(t, env, tableBucket, "default", tableName) + + rwTableName := "rw_insert_" + randomString(6) + runRisingWaveSQL(t, env.postgresSidecar, fmt.Sprintf("CREATE TABLE %s (id int, name varchar);", rwTableName)) + + sinkName := "test_sink_insert_" + randomString(6) + createSinkSql := fmt.Sprintf(` +CREATE SINK %s FROM %s +WITH ( + connector = 'iceberg', + catalog.type = 'rest', + catalog.uri = '%s', + catalog.name = 'default', + database.name = 'default', + table.name = '%s', + warehouse.path = 's3://%s', + s3.endpoint = '%s', + s3.region = 'us-east-1', + s3.access.key = '%s', + s3.secret.key = '%s', + s3.path.style.access = 'true', + catalog.rest.sigv4_enabled = 'true', + catalog.rest.signing_region = 'us-east-1', + catalog.rest.signing_name = 's3', + type = 'append-only', + force_append_only = 'true' +);`, sinkName, rwTableName, icebergUri, tableName, tableBucket, s3Endpoint, env.accessKey, env.secretKey) + + t.Logf(">>> Creating sink %s...", sinkName) + runRisingWaveSQL(t, env.postgresSidecar, createSinkSql) + + t.Log(">>> Inserting into RisingWave table...") + runRisingWaveSQL(t, env.postgresSidecar, fmt.Sprintf("INSERT INTO %s VALUES (1, 'Alice'), (2, 'Bob');", rwTableName)) + runRisingWaveSQL(t, env.postgresSidecar, "FLUSH;") + + // Verify with Source + sourceName := "test_source_insert_" + randomString(6) + createSourceSql := fmt.Sprintf(` +CREATE SOURCE %s WITH ( + connector = 'iceberg', + catalog.type = 'rest', + catalog.uri = '%s', + catalog.name = 'default', + database.name = 'default', + table.name = '%s', + warehouse.path = 's3://%s', + s3.endpoint = '%s', + s3.region = 'us-east-1', + s3.access.key = '%s', + s3.secret.key = '%s', + s3.path.style.access = 'true', + catalog.rest.sigv4_enabled = 'true', + catalog.rest.signing_region = 'us-east-1', + catalog.rest.signing_name = 's3' +);`, sourceName, icebergUri, tableName, tableBucket, s3Endpoint, env.accessKey, env.secretKey) + + runRisingWaveSQL(t, env.postgresSidecar, createSourceSql) + + t.Log(">>> Selecting from source to verify INSERT...") + verifyQuery(t, env, sourceName, "1 | Alice", "2 | Bob") + }) + + // 2. Test UPSERT (Update/Delete) + t.Run("TestUpsert", func(t *testing.T) { + tableName := "test_upsert_" + randomString(6) + // We need a table with PK for upsert to work effectively in RW logic, + // effectively maps to Iceberg v2 table. + createIcebergTable(t, env, tableBucket, "default", tableName) + + rwTableName := "rw_upsert_" + randomString(6) + runRisingWaveSQL(t, env.postgresSidecar, fmt.Sprintf("CREATE TABLE %s (id int PRIMARY KEY, name varchar);", rwTableName)) + + sinkName := "test_sink_upsert_" + randomString(6) + createSinkSql := fmt.Sprintf(` +CREATE SINK %s FROM %s +WITH ( + connector = 'iceberg', + catalog.type = 'rest', + catalog.uri = '%s', + catalog.name = 'default', + database.name = 'default', + table.name = '%s', + warehouse.path = 's3://%s', + s3.endpoint = '%s', + s3.region = 'us-east-1', + s3.access.key = '%s', + s3.secret.key = '%s', + s3.path.style.access = 'true', + catalog.rest.sigv4_enabled = 'true', + catalog.rest.signing_region = 'us-east-1', + catalog.rest.signing_name = 's3', + type = 'upsert', -- Upsert mode + primary_key = 'id' +);`, sinkName, rwTableName, icebergUri, tableName, tableBucket, s3Endpoint, env.accessKey, env.secretKey) + + t.Logf(">>> Creating upsert sink %s...", sinkName) + runRisingWaveSQL(t, env.postgresSidecar, createSinkSql) + + t.Log(">>> Inserting initial data...") + runRisingWaveSQL(t, env.postgresSidecar, fmt.Sprintf("INSERT INTO %s VALUES (1, 'Charlie'), (2, 'Dave');", rwTableName)) + runRisingWaveSQL(t, env.postgresSidecar, "FLUSH;") + + // Update 1, Delete 2 + t.Log(">>> Updating and Deleting data...") + runRisingWaveSQL(t, env.postgresSidecar, fmt.Sprintf("UPDATE %s SET name = 'Charles' WHERE id = 1;", rwTableName)) + runRisingWaveSQL(t, env.postgresSidecar, fmt.Sprintf("DELETE FROM %s WHERE id = 2;", rwTableName)) + runRisingWaveSQL(t, env.postgresSidecar, "FLUSH;") + + // Verify with Source + sourceName := "test_source_upsert_" + randomString(6) + createSourceSql := fmt.Sprintf(` +CREATE SOURCE %s WITH ( + connector = 'iceberg', + catalog.type = 'rest', + catalog.uri = '%s', + catalog.name = 'default', + database.name = 'default', + table.name = '%s', + warehouse.path = 's3://%s', + s3.endpoint = '%s', + s3.region = 'us-east-1', + s3.access.key = '%s', + s3.secret.key = '%s', + s3.path.style.access = 'true', + catalog.rest.sigv4_enabled = 'true', + catalog.rest.signing_region = 'us-east-1', + catalog.rest.signing_name = 's3' +);`, sourceName, icebergUri, tableName, tableBucket, s3Endpoint, env.accessKey, env.secretKey) + + runRisingWaveSQL(t, env.postgresSidecar, createSourceSql) + + t.Log(">>> Selecting from source to verify UPSERT...") + // Should see (1, 'Charles') and NOT (2, 'Dave') + verifyQuery(t, env, sourceName, "1 | Charles") + verifyQueryAbsence(t, env, sourceName, "2 | Dave") + }) +} + +func verifyQuery(t *testing.T, env *TestEnvironment, sourceName string, expectedSubstrings ...string) { + t.Helper() + var output string + for i := 0; i < 15; i++ { + output = runRisingWaveSQL(t, env.postgresSidecar, fmt.Sprintf("SELECT * FROM %s ORDER BY id;", sourceName)) + allFound := true + for _, s := range expectedSubstrings { + if !strings.Contains(output, s) { + allFound = false + break + } + } + if allFound { + return + } + time.Sleep(2 * time.Second) + } + t.Fatalf("Failed to find expected data %v in output:\n%s", expectedSubstrings, output) +} + +func verifyQueryAbsence(t *testing.T, env *TestEnvironment, sourceName string, unexpectedSubstrings ...string) { + t.Helper() + var output string + for i := 0; i < 15; i++ { + output = runRisingWaveSQL(t, env.postgresSidecar, fmt.Sprintf("SELECT * FROM %s ORDER BY id;", sourceName)) + noneFound := true + for _, s := range unexpectedSubstrings { + if strings.Contains(output, s) { + noneFound = false + break + } + } + if noneFound { + return + } + time.Sleep(2 * time.Second) + } + t.Fatalf("Found unexpected data %v in output:\n%s", unexpectedSubstrings, output) +}