Browse Source

ci: refactor Spark workflow for DRY and robustness

1. Add explicit permissions (least privilege):
   - contents: read
   - checks: write (for test reports)
   - pull-requests: write (for PR comments)

2. Extract duplicate build steps into shared 'build-deps' job:
   - Eliminates duplication between spark-tests and spark-example
   - Build artifacts are uploaded and reused by dependent jobs
   - Reduces CI time and ensures consistency

3. Fix spark-example service startup verification:
   - Match robust approach from spark-tests job
   - Add explicit timeout and failure handling
   - Verify all services (master, volume, filer)
   - Include diagnostic logging on failure
   - Prevents silent failures and obscure errors

These changes improve maintainability, security, and reliability
of the Spark integration test workflow.
pull/7526/head
chrislu 7 days ago
parent
commit
786f5de7bb
  1. 100
      .github/workflows/spark-integration-tests.yml
  2. 109
      test/java/spark/src/test/java/seaweed/spark/SparkReadWriteTest.java

100
.github/workflows/spark-integration-tests.yml

@@ -19,11 +19,16 @@ on:
- '.github/workflows/spark-integration-tests.yml'
workflow_dispatch:
permissions:
contents: read
checks: write
pull-requests: write
jobs:
spark-tests:
name: Spark Integration Tests
build-deps:
name: Build SeaweedFS Dependencies
runs-on: ubuntu-latest
timeout-minutes: 30
timeout-minutes: 15
steps:
- name: Checkout code
@@ -70,6 +75,37 @@ jobs:
mvn clean install -DskipTests -Dgpg.skip=true
echo "✓ HDFS3 client built"
- name: Upload build artifacts
uses: actions/upload-artifact@v4
with:
name: seaweedfs-build
path: |
docker/weed
~/.m2/repository/com/seaweedfs
retention-days: 1
spark-tests:
name: Spark Integration Tests
runs-on: ubuntu-latest
needs: build-deps
timeout-minutes: 30
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up JDK 11
uses: actions/setup-java@v4
with:
java-version: '11'
distribution: 'temurin'
- name: Download build artifacts
uses: actions/download-artifact@v4
with:
name: seaweedfs-build
path: .
- name: Start SeaweedFS services
working-directory: test/java/spark
run: |
@@ -157,7 +193,7 @@ jobs:
spark-example:
name: Run Spark Example Application
runs-on: ubuntu-latest
needs: spark-tests
needs: [build-deps, spark-tests]
timeout-minutes: 20
if: github.event_name == 'push' || github.event_name == 'workflow_dispatch'
@@ -170,38 +206,12 @@ jobs:
with:
java-version: '11'
distribution: 'temurin'
cache: maven
- name: Set up Go
uses: actions/setup-go@v5
- name: Download build artifacts
uses: actions/download-artifact@v4
with:
go-version: '1.24'
- name: Build SeaweedFS binary
run: |
echo "Building SeaweedFS binary..."
cd weed
go build -o ../docker/weed
cd ../docker
ls -la weed filer.toml entrypoint.sh
echo "✓ SeaweedFS binary built and ready for Docker build"
- name: Build SeaweedFS Java dependencies
run: |
echo "Building Java client..."
cd other/java/client
mvn clean install -DskipTests -Dgpg.skip=true
cd ../../..
echo "Building HDFS2 client..."
cd other/java/hdfs2
mvn clean install -DskipTests -Dgpg.skip=true
cd ../../..
echo "Building HDFS3 client..."
cd other/java/hdfs3
mvn clean install -DskipTests -Dgpg.skip=true
echo "✓ All Java dependencies built"
name: seaweedfs-build
path: .
- name: Cache Apache Spark
id: cache-spark
@@ -226,17 +236,29 @@ jobs:
- name: Start SeaweedFS services
working-directory: test/java/spark
run: |
docker compose up -d
sleep 10
echo "Starting SeaweedFS with Docker Compose..."
docker compose up -d seaweedfs-master seaweedfs-volume seaweedfs-filer
# Wait for filer to be ready
for i in {1..20}; do
echo "Waiting for SeaweedFS filer to be ready..."
for i in {1..30}; do
if curl -f http://localhost:8888/ > /dev/null 2>&1; then
echo "✓ SeaweedFS is ready"
echo "✓ SeaweedFS filer is ready!"
break
fi
if [ $i -eq 30 ]; then
echo "✗ SeaweedFS failed to start after 60 seconds"
docker compose logs
exit 1
fi
echo "Waiting... ($i/30)"
sleep 2
done
# Verify all services
echo "Verifying SeaweedFS services..."
curl -f http://localhost:9333/cluster/status || exit 1
curl -f http://localhost:8888/ || exit 1
echo "✓ All SeaweedFS services are healthy"
- name: Build project
working-directory: test/java/spark

109
test/java/spark/src/test/java/seaweed/spark/SparkReadWriteTest.java

@@ -21,10 +21,9 @@ public class SparkReadWriteTest extends SparkTestBase {
// Create test data
List<Person> people = Arrays.asList(
new Person("Alice", 30),
new Person("Bob", 25),
new Person("Charlie", 35)
);
new Person("Alice", 30),
new Person("Bob", 25),
new Person("Charlie", 35));
Dataset<Row> df = spark.createDataFrame(people, Person.class);
@@ -38,11 +37,11 @@ public class SparkReadWriteTest extends SparkTestBase {
// Verify
assertEquals(3, readDf.count());
assertEquals(2, readDf.columns().length);
List<Row> results = readDf.collectAsList();
assertTrue(results.stream().anyMatch(r -> "Alice".equals(r.getAs("name")) && (Integer)r.getAs("age") == 30));
assertTrue(results.stream().anyMatch(r -> "Bob".equals(r.getAs("name")) && (Integer)r.getAs("age") == 25));
assertTrue(results.stream().anyMatch(r -> "Charlie".equals(r.getAs("name")) && (Integer)r.getAs("age") == 35));
assertTrue(results.stream().anyMatch(r -> "Alice".equals(r.getAs("name")) && (Integer) r.getAs("age") == 30));
assertTrue(results.stream().anyMatch(r -> "Bob".equals(r.getAs("name")) && (Integer) r.getAs("age") == 25));
assertTrue(results.stream().anyMatch(r -> "Charlie".equals(r.getAs("name")) && (Integer) r.getAs("age") == 35));
}
@Test
@@ -51,9 +50,8 @@ public class SparkReadWriteTest extends SparkTestBase {
// Create test data
List<Person> people = Arrays.asList(
new Person("Alice", 30),
new Person("Bob", 25)
);
new Person("Alice", 30),
new Person("Bob", 25));
Dataset<Row> df = spark.createDataFrame(people, Person.class);
@@ -75,10 +73,9 @@ public class SparkReadWriteTest extends SparkTestBase {
// Create test data
List<Person> people = Arrays.asList(
new Person("Alice", 30),
new Person("Bob", 25),
new Person("Charlie", 35)
);
new Person("Alice", 30),
new Person("Bob", 25),
new Person("Charlie", 35));
Dataset<Row> df = spark.createDataFrame(people, Person.class);
@@ -100,11 +97,10 @@ public class SparkReadWriteTest extends SparkTestBase {
// Create test data with multiple years
List<PersonWithYear> people = Arrays.asList(
new PersonWithYear("Alice", 30, 2020),
new PersonWithYear("Bob", 25, 2021),
new PersonWithYear("Charlie", 35, 2020),
new PersonWithYear("David", 28, 2021)
);
new PersonWithYear("Alice", 30, 2020),
new PersonWithYear("Bob", 25, 2021),
new PersonWithYear("Charlie", 35, 2020),
new PersonWithYear("David", 28, 2021));
Dataset<Row> df = spark.createDataFrame(people, PersonWithYear.class);
@@ -117,11 +113,11 @@ public class SparkReadWriteTest extends SparkTestBase {
// Verify
assertEquals(4, readDf.count());
// Verify partition filtering works
Dataset<Row> filtered2020 = readDf.filter("year = 2020");
assertEquals(2, filtered2020.count());
Dataset<Row> filtered2021 = readDf.filter("year = 2021");
assertEquals(2, filtered2021.count());
}
@@ -134,17 +130,15 @@ public class SparkReadWriteTest extends SparkTestBase {
// Write first batch
List<Person> batch1 = Arrays.asList(
new Person("Alice", 30),
new Person("Bob", 25)
);
new Person("Alice", 30),
new Person("Bob", 25));
Dataset<Row> df1 = spark.createDataFrame(batch1, Person.class);
df1.write().mode(SaveMode.Overwrite).parquet(outputPath);
// Append second batch
List<Person> batch2 = Arrays.asList(
new Person("Charlie", 35),
new Person("David", 28)
);
new Person("Charlie", 35),
new Person("David", 28));
Dataset<Row> df2 = spark.createDataFrame(batch2, Person.class);
df2.write().mode(SaveMode.Append).parquet(outputPath);
@@ -159,7 +153,7 @@ public class SparkReadWriteTest extends SparkTestBase {
// Create a larger dataset
Dataset<Row> largeDf = spark.range(0, 10000)
.selectExpr("id as value", "id * 2 as doubled");
.selectExpr("id as value", "id * 2 as doubled");
String outputPath = getTestPath("large_dataset.parquet");
largeDf.write().mode(SaveMode.Overwrite).parquet(outputPath);
@@ -167,7 +161,7 @@ public class SparkReadWriteTest extends SparkTestBase {
// Read back and verify
Dataset<Row> readDf = spark.read().parquet(outputPath);
assertEquals(10000, readDf.count());
// Verify some data (sort to ensure deterministic order)
Row firstRow = readDf.orderBy("value").first();
assertEquals(0L, firstRow.getLong(0));
@@ -179,17 +173,29 @@ public class SparkReadWriteTest extends SparkTestBase {
private String name;
private int age;
public Person() {}
public Person() {
}
public Person(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public int getAge() { return age; }
public void setAge(int age) { this.age = age; }
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
public static class PersonWithYear implements java.io.Serializable {
@@ -197,7 +203,8 @@ public class SparkReadWriteTest extends SparkTestBase {
private int age;
private int year;
public PersonWithYear() {}
public PersonWithYear() {
}
public PersonWithYear(String name, int age, int year) {
this.name = name;
@@ -205,12 +212,28 @@ public class SparkReadWriteTest extends SparkTestBase {
this.year = year;
}
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public int getAge() { return age; }
public void setAge(int age) { this.age = age; }
public int getYear() { return year; }
public void setYear(int year) { this.year = year; }
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public int getYear() {
return year;
}
public void setYear(int year) {
this.year = year;
}
}
}
Loading…
Cancel
Save