diff --git a/.github/workflows/spark-integration-tests.yml b/.github/workflows/spark-integration-tests.yml index 54bf8ebb4..d036d546a 100644 --- a/.github/workflows/spark-integration-tests.yml +++ b/.github/workflows/spark-integration-tests.yml @@ -19,11 +19,16 @@ on: - '.github/workflows/spark-integration-tests.yml' workflow_dispatch: +permissions: + contents: read + checks: write + pull-requests: write + jobs: - spark-tests: - name: Spark Integration Tests + build-deps: + name: Build SeaweedFS Dependencies runs-on: ubuntu-latest - timeout-minutes: 30 + timeout-minutes: 15 steps: - name: Checkout code @@ -70,6 +75,37 @@ jobs: mvn clean install -DskipTests -Dgpg.skip=true echo "✓ HDFS3 client built" + - name: Upload build artifacts + uses: actions/upload-artifact@v4 + with: + name: seaweedfs-build + path: | + docker/weed + ~/.m2/repository/com/seaweedfs + retention-days: 1 + + spark-tests: + name: Spark Integration Tests + runs-on: ubuntu-latest + needs: build-deps + timeout-minutes: 30 + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up JDK 11 + uses: actions/setup-java@v4 + with: + java-version: '11' + distribution: 'temurin' + + - name: Download build artifacts + uses: actions/download-artifact@v4 + with: + name: seaweedfs-build + path: . + - name: Start SeaweedFS services working-directory: test/java/spark run: | @@ -157,7 +193,7 @@ jobs: spark-example: name: Run Spark Example Application runs-on: ubuntu-latest - needs: spark-tests + needs: [build-deps, spark-tests] timeout-minutes: 20 if: github.event_name == 'push' || github.event_name == 'workflow_dispatch' @@ -170,38 +206,12 @@ jobs: with: java-version: '11' distribution: 'temurin' - cache: maven - - name: Set up Go - uses: actions/setup-go@v5 + - name: Download build artifacts + uses: actions/download-artifact@v4 with: - go-version: '1.24' - - - name: Build SeaweedFS binary - run: | - echo "Building SeaweedFS binary..." - cd weed - go build -o ../docker/weed - cd ../docker - ls -la weed filer.toml entrypoint.sh - echo "✓ SeaweedFS binary built and ready for Docker build" - - - name: Build SeaweedFS Java dependencies - run: | - echo "Building Java client..." - cd other/java/client - mvn clean install -DskipTests -Dgpg.skip=true - cd ../../.. - - echo "Building HDFS2 client..." - cd other/java/hdfs2 - mvn clean install -DskipTests -Dgpg.skip=true - cd ../../.. - - echo "Building HDFS3 client..." - cd other/java/hdfs3 - mvn clean install -DskipTests -Dgpg.skip=true - echo "✓ All Java dependencies built" + name: seaweedfs-build + path: . - name: Cache Apache Spark id: cache-spark @@ -226,17 +236,29 @@ jobs: - name: Start SeaweedFS services working-directory: test/java/spark run: | - docker compose up -d - sleep 10 + echo "Starting SeaweedFS with Docker Compose..." + docker compose up -d seaweedfs-master seaweedfs-volume seaweedfs-filer - # Wait for filer to be ready - for i in {1..20}; do + echo "Waiting for SeaweedFS filer to be ready..." + for i in {1..30}; do if curl -f http://localhost:8888/ > /dev/null 2>&1; then - echo "✓ SeaweedFS is ready" + echo "✓ SeaweedFS filer is ready!" break fi + if [ $i -eq 30 ]; then + echo "✗ SeaweedFS failed to start after 60 seconds" + docker compose logs + exit 1 + fi + echo "Waiting... ($i/30)" sleep 2 done + + # Verify all services + echo "Verifying SeaweedFS services..." + curl -f http://localhost:9333/cluster/status || exit 1 + curl -f http://localhost:8888/ || exit 1 + echo "✓ All SeaweedFS services are healthy" - name: Build project working-directory: test/java/spark diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkReadWriteTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkReadWriteTest.java index e69b3038a..10ea1cd3a 100644 --- a/test/java/spark/src/test/java/seaweed/spark/SparkReadWriteTest.java +++ b/test/java/spark/src/test/java/seaweed/spark/SparkReadWriteTest.java @@ -21,10 +21,9 @@ public class SparkReadWriteTest extends SparkTestBase { // Create test data List people = Arrays.asList( - new Person("Alice", 30), - new Person("Bob", 25), - new Person("Charlie", 35) - ); + new Person("Alice", 30), + new Person("Bob", 25), + new Person("Charlie", 35)); Dataset df = spark.createDataFrame(people, Person.class); @@ -38,11 +37,11 @@ public class SparkReadWriteTest extends SparkTestBase { // Verify assertEquals(3, readDf.count()); assertEquals(2, readDf.columns().length); - + List results = readDf.collectAsList(); - assertTrue(results.stream().anyMatch(r -> "Alice".equals(r.getAs("name")) && (Integer)r.getAs("age") == 30)); - assertTrue(results.stream().anyMatch(r -> "Bob".equals(r.getAs("name")) && (Integer)r.getAs("age") == 25)); - assertTrue(results.stream().anyMatch(r -> "Charlie".equals(r.getAs("name")) && (Integer)r.getAs("age") == 35)); + assertTrue(results.stream().anyMatch(r -> "Alice".equals(r.getAs("name")) && (Integer) r.getAs("age") == 30)); + assertTrue(results.stream().anyMatch(r -> "Bob".equals(r.getAs("name")) && (Integer) r.getAs("age") == 25)); + assertTrue(results.stream().anyMatch(r -> "Charlie".equals(r.getAs("name")) && (Integer) r.getAs("age") == 35)); } @Test @@ -51,9 +50,8 @@ public class SparkReadWriteTest extends SparkTestBase { // Create test data List people = Arrays.asList( - new Person("Alice", 30), - new Person("Bob", 25) - ); + new Person("Alice", 30), + new Person("Bob", 25)); Dataset df = spark.createDataFrame(people, Person.class); @@ -75,10 +73,9 @@ public class SparkReadWriteTest extends SparkTestBase { // Create test data List people = Arrays.asList( - new Person("Alice", 30), - new Person("Bob", 25), - new Person("Charlie", 35) - ); + new Person("Alice", 30), + new Person("Bob", 25), + new Person("Charlie", 35)); Dataset df = spark.createDataFrame(people, Person.class); @@ -100,11 +97,10 @@ public class SparkReadWriteTest extends SparkTestBase { // Create test data with multiple years List people = Arrays.asList( - new PersonWithYear("Alice", 30, 2020), - new PersonWithYear("Bob", 25, 2021), - new PersonWithYear("Charlie", 35, 2020), - new PersonWithYear("David", 28, 2021) - ); + new PersonWithYear("Alice", 30, 2020), + new PersonWithYear("Bob", 25, 2021), + new PersonWithYear("Charlie", 35, 2020), + new PersonWithYear("David", 28, 2021)); Dataset df = spark.createDataFrame(people, PersonWithYear.class); @@ -117,11 +113,11 @@ public class SparkReadWriteTest extends SparkTestBase { // Verify assertEquals(4, readDf.count()); - + // Verify partition filtering works Dataset filtered2020 = readDf.filter("year = 2020"); assertEquals(2, filtered2020.count()); - + Dataset filtered2021 = readDf.filter("year = 2021"); assertEquals(2, filtered2021.count()); } @@ -134,17 +130,15 @@ public class SparkReadWriteTest extends SparkTestBase { // Write first batch List batch1 = Arrays.asList( - new Person("Alice", 30), - new Person("Bob", 25) - ); + new Person("Alice", 30), + new Person("Bob", 25)); Dataset df1 = spark.createDataFrame(batch1, Person.class); df1.write().mode(SaveMode.Overwrite).parquet(outputPath); // Append second batch List batch2 = Arrays.asList( - new Person("Charlie", 35), - new Person("David", 28) - ); + new Person("Charlie", 35), + new Person("David", 28)); Dataset df2 = spark.createDataFrame(batch2, Person.class); df2.write().mode(SaveMode.Append).parquet(outputPath); @@ -159,7 +153,7 @@ public class SparkReadWriteTest extends SparkTestBase { // Create a larger dataset Dataset largeDf = spark.range(0, 10000) - .selectExpr("id as value", "id * 2 as doubled"); + .selectExpr("id as value", "id * 2 as doubled"); String outputPath = getTestPath("large_dataset.parquet"); largeDf.write().mode(SaveMode.Overwrite).parquet(outputPath); @@ -167,7 +161,7 @@ public class SparkReadWriteTest extends SparkTestBase { // Read back and verify Dataset readDf = spark.read().parquet(outputPath); assertEquals(10000, readDf.count()); - + // Verify some data (sort to ensure deterministic order) Row firstRow = readDf.orderBy("value").first(); assertEquals(0L, firstRow.getLong(0)); @@ -179,17 +173,29 @@ public class SparkReadWriteTest extends SparkTestBase { private String name; private int age; - public Person() {} + public Person() { + } public Person(String name, int age) { this.name = name; this.age = age; } - public String getName() { return name; } - public void setName(String name) { this.name = name; } - public int getAge() { return age; } - public void setAge(int age) { this.age = age; } + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } } public static class PersonWithYear implements java.io.Serializable { @@ -197,7 +203,8 @@ public class SparkReadWriteTest extends SparkTestBase { private int age; private int year; - public PersonWithYear() {} + public PersonWithYear() { + } public PersonWithYear(String name, int age, int year) { this.name = name; @@ -205,12 +212,28 @@ public class SparkReadWriteTest extends SparkTestBase { this.year = year; } - public String getName() { return name; } - public void setName(String name) { this.name = name; } - public int getAge() { return age; } - public void setAge(int age) { this.age = age; } - public int getYear() { return year; } - public void setYear(int year) { this.year = year; } + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } + + public int getYear() { + return year; + } + + public void setYear(int year) { + this.year = year; + } } } -