From 89a6d42cee6e37c281df9460b5a8a38d8773de1c Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 22 Nov 2025 12:23:48 -0800 Subject: [PATCH] Complete Spark integration test suite --- test/java/spark/.gitignore | 31 ++ test/java/spark/CI_SETUP.md | 273 +++++++++++++ test/java/spark/Makefile | 76 ++++ test/java/spark/README.md | 361 ++++++++++++++++++ test/java/spark/docker-compose.yml | 72 ++++ test/java/spark/pom.xml | 159 ++++++++ test/java/spark/quick-start.sh | 147 +++++++ test/java/spark/run-tests.sh | 44 +++ .../seaweed/spark/SparkSeaweedFSExample.java | 143 +++++++ .../seaweed/spark/SparkReadWriteTest.java | 216 +++++++++++ .../test/java/seaweed/spark/SparkSQLTest.java | 236 ++++++++++++ .../java/seaweed/spark/SparkTestBase.java | 126 ++++++ .../spark/src/test/resources/log4j.properties | 18 + test/java/spark/test-one.sh | 40 ++ 14 files changed, 1942 insertions(+) create mode 100644 test/java/spark/.gitignore create mode 100644 test/java/spark/CI_SETUP.md create mode 100644 test/java/spark/Makefile create mode 100644 test/java/spark/README.md create mode 100644 test/java/spark/docker-compose.yml create mode 100644 test/java/spark/pom.xml create mode 100755 test/java/spark/quick-start.sh create mode 100755 test/java/spark/run-tests.sh create mode 100644 test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java create mode 100644 test/java/spark/src/test/java/seaweed/spark/SparkReadWriteTest.java create mode 100644 test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java create mode 100644 test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java create mode 100644 test/java/spark/src/test/resources/log4j.properties create mode 100755 test/java/spark/test-one.sh diff --git a/test/java/spark/.gitignore b/test/java/spark/.gitignore new file mode 100644 index 000000000..59a044211 --- /dev/null +++ b/test/java/spark/.gitignore @@ -0,0 +1,31 @@ +# Maven +target/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties + +# IDE +.idea/ +*.iml +.vscode/ +.classpath +.project +.settings/ + +# Spark +spark-warehouse/ +metastore_db/ +derby.log + +# Logs +*.log + +# OS +.DS_Store +Thumbs.db + diff --git a/test/java/spark/CI_SETUP.md b/test/java/spark/CI_SETUP.md new file mode 100644 index 000000000..19d8aa970 --- /dev/null +++ b/test/java/spark/CI_SETUP.md @@ -0,0 +1,273 @@ +# GitHub Actions CI/CD Setup + +## Overview + +The Spark integration tests are now configured to run automatically via GitHub Actions. + +## Workflow File + +**Location**: `.github/workflows/spark-integration-tests.yml` + +## Triggers + +The workflow runs automatically on: + +1. **Push to master/main** - When code is pushed to main branches +2. **Pull Requests** - When PRs target master/main +3. **Manual Trigger** - Via workflow_dispatch in GitHub UI + +The workflow only runs when changes are detected in: +- `test/java/spark/**` +- `other/java/hdfs2/**` +- `other/java/hdfs3/**` +- `other/java/client/**` +- The workflow file itself + +## Jobs + +### Job 1: spark-tests (Required) +**Duration**: ~5-10 minutes + +Steps: +1. ✓ Checkout code +2. ✓ Setup JDK 11 +3. ✓ Start SeaweedFS (master, volume, filer) +4. ✓ Build project +5. ✓ Run all integration tests (10 tests) +6. ✓ Upload test results +7. ✓ Publish test report +8. 
✓ Cleanup + +**Test Coverage**: +- SparkReadWriteTest: 6 tests +- SparkSQLTest: 4 tests + +### Job 2: spark-example (Optional) +**Duration**: ~5 minutes +**Runs**: Only on push/manual trigger (not on PRs) + +Steps: +1. ✓ Checkout code +2. ✓ Setup JDK 11 +3. ✓ Download Apache Spark 3.5.0 (cached) +4. ✓ Start SeaweedFS +5. ✓ Build project +6. ✓ Run example Spark application +7. ✓ Verify output +8. ✓ Cleanup + +### Job 3: summary (Status Check) +**Duration**: < 1 minute + +Provides overall test status summary. + +## Viewing Results + +### In GitHub UI + +1. Go to the **Actions** tab in your GitHub repository +2. Click on **Spark Integration Tests** workflow +3. View individual workflow runs +4. Check test reports and logs + +### Status Badge + +Add this badge to your README.md to show the workflow status: + +```markdown +[![Spark Integration Tests](https://github.com/seaweedfs/seaweedfs/actions/workflows/spark-integration-tests.yml/badge.svg)](https://github.com/seaweedfs/seaweedfs/actions/workflows/spark-integration-tests.yml) +``` + +### Test Reports + +After each run: +- Test results are uploaded as artifacts (retained for 30 days) +- Detailed JUnit reports are published +- Logs are available for each step + +## Configuration + +### Environment Variables + +Set in the workflow: +```yaml +env: + SEAWEEDFS_TEST_ENABLED: true + SEAWEEDFS_FILER_HOST: localhost + SEAWEEDFS_FILER_PORT: 8888 + SEAWEEDFS_FILER_GRPC_PORT: 18888 +``` + +### Timeout + +- spark-tests job: 30 minutes max +- spark-example job: 20 minutes max + +## Troubleshooting CI Failures + +### SeaweedFS Connection Issues + +**Symptom**: Tests fail with connection refused + +**Check**: +1. View SeaweedFS logs in the workflow output +2. Look for "Display SeaweedFS logs on failure" step +3. Verify health check succeeded + +**Solution**: The workflow already includes retry logic and health checks + +### Test Failures + +**Symptom**: Tests pass locally but fail in CI + +**Check**: +1. Download test artifacts from the workflow run +2. Review detailed surefire reports +3. Check for timing issues or resource constraints + +**Common Issues**: +- Docker startup timing (already handled with 30 retries) +- Network issues (retry logic included) +- Resource limits (CI has sufficient memory) + +### Build Failures + +**Symptom**: Maven build fails + +**Check**: +1. Verify dependencies are available +2. Check Maven cache +3. Review build logs + +### Example Application Failures + +**Note**: This job is optional and only runs on push/manual trigger + +**Check**: +1. Verify Spark was downloaded and cached correctly +2. Check spark-submit logs +3. Verify SeaweedFS output directory + +## Manual Workflow Trigger + +To manually run the workflow: + +1. Go to **Actions** tab +2. Select **Spark Integration Tests** +3. Click **Run workflow** button +4. Select branch +5. 
Click **Run workflow** + +This is useful for: +- Testing changes before pushing +- Re-running failed tests +- Testing with different configurations + +## Local Testing Matching CI + +To run tests locally that match the CI environment: + +```bash +# Use the same Docker setup as CI +cd test/java/spark +docker-compose up -d seaweedfs-master seaweedfs-volume seaweedfs-filer + +# Wait for services (same as CI) +for i in {1..30}; do + curl -f http://localhost:8888/ && break + sleep 2 +done + +# Run tests (same environment variables as CI) +export SEAWEEDFS_TEST_ENABLED=true +export SEAWEEDFS_FILER_HOST=localhost +export SEAWEEDFS_FILER_PORT=8888 +export SEAWEEDFS_FILER_GRPC_PORT=18888 +mvn test -B + +# Cleanup +docker-compose down -v +``` + +## Maintenance + +### Updating Spark Version + +To update to a newer Spark version: + +1. Update `pom.xml`: Change `` +2. Update workflow: Change Spark download URL +3. Test locally first +4. Create PR to test in CI + +### Updating Java Version + +1. Update `pom.xml`: Change `` and `` +2. Update workflow: Change JDK version in `setup-java` steps +3. Test locally +4. Update README with new requirements + +### Adding New Tests + +New test classes are automatically discovered and run by the workflow. +Just ensure they: +- Extend `SparkTestBase` +- Use `skipIfTestsDisabled()` +- Are in the correct package + +## CI Performance + +### Typical Run Times + +| Job | Duration | Can Fail Build? | +|-----|----------|-----------------| +| spark-tests | 5-10 min | Yes | +| spark-example | 5 min | No (optional) | +| summary | < 1 min | Only if tests fail | + +### Optimizations + +The workflow includes: +- ✓ Maven dependency caching +- ✓ Spark binary caching +- ✓ Parallel job execution +- ✓ Smart path filtering +- ✓ Docker layer caching + +### Resource Usage + +- Memory: ~4GB per job +- Disk: ~2GB (cached) +- Network: ~500MB (first run) + +## Security Considerations + +- No secrets required (tests use default ports) +- Runs in isolated Docker environment +- Clean up removes all test data +- No external services accessed + +## Future Enhancements + +Potential improvements: +- [ ] Matrix testing (multiple Spark versions) +- [ ] Performance benchmarking +- [ ] Code coverage reporting +- [ ] Integration with larger datasets +- [ ] Multi-node Spark cluster testing + +## Support + +If CI tests fail: + +1. Check workflow logs in GitHub Actions +2. Download test artifacts for detailed reports +3. Try reproducing locally using the "Local Testing" section above +4. Review recent changes in the failing paths +5. 
Check SeaweedFS logs in the workflow output + +For persistent issues: +- Open an issue with workflow run link +- Include test failure logs +- Note if it passes locally + diff --git a/test/java/spark/Makefile b/test/java/spark/Makefile new file mode 100644 index 000000000..9523f6496 --- /dev/null +++ b/test/java/spark/Makefile @@ -0,0 +1,76 @@ +.PHONY: help build test test-local test-docker clean run-example docker-up docker-down + +help: + @echo "SeaweedFS Spark Integration Tests" + @echo "" + @echo "Available targets:" + @echo " build - Build the project" + @echo " test - Run integration tests (requires SeaweedFS running)" + @echo " test-local - Run tests against local SeaweedFS" + @echo " test-docker - Run tests in Docker with SeaweedFS" + @echo " run-example - Run the example Spark application" + @echo " docker-up - Start SeaweedFS in Docker" + @echo " docker-down - Stop SeaweedFS Docker containers" + @echo " clean - Clean build artifacts" + +build: + mvn clean package + +test: + @if [ -z "$$SEAWEEDFS_TEST_ENABLED" ]; then \ + echo "Setting SEAWEEDFS_TEST_ENABLED=true"; \ + export SEAWEEDFS_TEST_ENABLED=true; \ + fi + mvn test + +test-local: + @echo "Testing against local SeaweedFS (localhost:8888)..." + ./run-tests.sh + +test-docker: + @echo "Running tests in Docker..." + docker-compose up --build --abort-on-container-exit spark-tests + docker-compose down + +docker-up: + @echo "Starting SeaweedFS in Docker..." + docker-compose up -d seaweedfs-master seaweedfs-volume seaweedfs-filer + @echo "Waiting for services to be ready..." + @sleep 5 + @echo "SeaweedFS is ready!" + @echo " Master: http://localhost:9333" + @echo " Filer: http://localhost:8888" + +docker-down: + @echo "Stopping SeaweedFS Docker containers..." + docker-compose down -v + +run-example: + @echo "Running example application..." + @if ! command -v spark-submit > /dev/null; then \ + echo "Error: spark-submit not found. Please install Apache Spark."; \ + exit 1; \ + fi + spark-submit \ + --class seaweed.spark.SparkSeaweedFSExample \ + --master local[2] \ + --conf spark.hadoop.fs.seaweedfs.impl=seaweed.hdfs.SeaweedFileSystem \ + --conf spark.hadoop.fs.seaweed.filer.host=localhost \ + --conf spark.hadoop.fs.seaweed.filer.port=8888 \ + --conf spark.hadoop.fs.seaweed.filer.port.grpc=18888 \ + --conf spark.hadoop.fs.seaweed.replication="" \ + target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar \ + seaweedfs://localhost:8888/spark-example-output + +clean: + mvn clean + @echo "Build artifacts cleaned" + +verify-seaweedfs: + @echo "Verifying SeaweedFS connection..." + @curl -f http://localhost:8888/ > /dev/null 2>&1 && \ + echo "✓ SeaweedFS filer is accessible" || \ + (echo "✗ SeaweedFS filer is not accessible at http://localhost:8888"; exit 1) + +.DEFAULT_GOAL := help + diff --git a/test/java/spark/README.md b/test/java/spark/README.md new file mode 100644 index 000000000..af1fdd29e --- /dev/null +++ b/test/java/spark/README.md @@ -0,0 +1,361 @@ +# SeaweedFS Spark Integration Tests + +Comprehensive integration tests for Apache Spark with SeaweedFS HDFS client. + +## Overview + +This test suite validates that Apache Spark works correctly with SeaweedFS as the storage backend, covering: + +- **Data I/O**: Reading and writing data in various formats (Parquet, CSV, JSON) +- **Spark SQL**: Complex SQL queries, joins, aggregations, and window functions +- **Partitioning**: Partitioned writes and partition pruning +- **Performance**: Large dataset operations + +## Prerequisites + +### 1. 
Running SeaweedFS

Start SeaweedFS with default ports:

```bash
# Terminal 1: Start master
weed master

# Terminal 2: Start volume server
weed volume -mserver=localhost:9333

# Terminal 3: Start filer
weed filer -master=localhost:9333
```

Verify services are running:
- Master: http://localhost:9333
- Filer HTTP: http://localhost:8888
- Filer gRPC: localhost:18888

### 2. Java and Maven

- Java 11 or higher (the surefire `argLine` passes JDK 9+ `--add-opens` flags; CI runs JDK 11)
- Maven 3.6 or higher

### 3. Apache Spark (for standalone execution)

Download and extract Apache Spark 3.5.0:

```bash
wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
tar xzf spark-3.5.0-bin-hadoop3.tgz
export SPARK_HOME=$(pwd)/spark-3.5.0-bin-hadoop3
export PATH=$SPARK_HOME/bin:$PATH
```

## Building

```bash
mvn clean package
```

This creates:
- Fat JAR (shaded, with dependencies): `target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar`
- Thin JAR (original, pre-shade): `target/original-seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar`

The shade plugin replaces the main artifact with the fat JAR and keeps the unshaded build under the `original-` prefix, which is why the spark-submit commands below use the main JAR.

## Running Integration Tests

### Quick Test

Run all integration tests (requires running SeaweedFS):

```bash
# Enable integration tests
export SEAWEEDFS_TEST_ENABLED=true

# Run all tests
mvn test
```

### Run Specific Test

```bash
export SEAWEEDFS_TEST_ENABLED=true

# Run only read/write tests
mvn test -Dtest=SparkReadWriteTest

# Run only SQL tests
mvn test -Dtest=SparkSQLTest
```

### Custom SeaweedFS Configuration

If your SeaweedFS is running on a different host or port:

```bash
export SEAWEEDFS_TEST_ENABLED=true
export SEAWEEDFS_FILER_HOST=my-seaweedfs-host
export SEAWEEDFS_FILER_PORT=8888
export SEAWEEDFS_FILER_GRPC_PORT=18888

mvn test
```

### Skip Tests

By default, tests are skipped if `SEAWEEDFS_TEST_ENABLED` is not set:

```bash
mvn test  # Tests will be skipped with a message
```

## Running the Example Application

### Local Mode

Run the example application in Spark local mode:

```bash
spark-submit \
  --class seaweed.spark.SparkSeaweedFSExample \
  --master local[2] \
  --conf spark.hadoop.fs.seaweedfs.impl=seaweed.hdfs.SeaweedFileSystem \
  --conf spark.hadoop.fs.seaweed.filer.host=localhost \
  --conf spark.hadoop.fs.seaweed.filer.port=8888 \
  --conf spark.hadoop.fs.seaweed.filer.port.grpc=18888 \
  --conf spark.hadoop.fs.seaweed.replication="" \
  target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar \
  seaweedfs://localhost:8888/spark-example-output
```

### Cluster Mode

For production Spark clusters:

```bash
spark-submit \
  --class seaweed.spark.SparkSeaweedFSExample \
  --master spark://master-host:7077 \
  --deploy-mode cluster \
  --conf spark.hadoop.fs.seaweedfs.impl=seaweed.hdfs.SeaweedFileSystem \
  --conf spark.hadoop.fs.seaweed.filer.host=seaweedfs-filer \
  --conf spark.hadoop.fs.seaweed.filer.port=8888 \
  --conf spark.hadoop.fs.seaweed.filer.port.grpc=18888 \
  --conf spark.hadoop.fs.seaweed.replication=001 \
  --conf spark.executor.instances=4 \
  --conf spark.executor.memory=4g \
  --conf spark.executor.cores=2 \
  target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar \
  seaweedfs://seaweedfs-filer:8888/spark-output
```

## Configuration

### SeaweedFS Configuration Options

Configure Spark to use SeaweedFS through Hadoop configuration:

| Property | Description | Default | Example |
|----------|-------------|---------|---------|
| `spark.hadoop.fs.seaweedfs.impl` | FileSystem implementation class | - | `seaweed.hdfs.SeaweedFileSystem` |
| 
`spark.hadoop.fs.seaweed.filer.host` | SeaweedFS filer hostname | `localhost` | `seaweedfs-filer` | +| `spark.hadoop.fs.seaweed.filer.port` | SeaweedFS filer HTTP port | `8888` | `8888` | +| `spark.hadoop.fs.seaweed.filer.port.grpc` | SeaweedFS filer gRPC port | `18888` | `18888` | +| `spark.hadoop.fs.seaweed.replication` | Replication strategy | (uses HDFS default) | `001`, `""` (filer default) | +| `spark.hadoop.fs.seaweed.buffer.size` | Buffer size for I/O | `4MB` | `8388608` | + +### Replication Configuration Priority + +1. **Non-empty value** (e.g., `001`) - uses that specific replication +2. **Empty string** (`""`) - uses SeaweedFS filer's default replication +3. **Not configured** - uses Hadoop/Spark's replication parameter + +## Test Coverage + +### SparkReadWriteTest + +- ✓ Write and read Parquet files +- ✓ Write and read CSV files with headers +- ✓ Write and read JSON files +- ✓ Partitioned data writes with partition pruning +- ✓ Append mode operations +- ✓ Large dataset handling (10,000+ rows) + +### SparkSQLTest + +- ✓ Create tables and run SELECT queries +- ✓ Aggregation queries (GROUP BY, SUM, AVG) +- ✓ JOIN operations between datasets +- ✓ Window functions (RANK, PARTITION BY) + +## Continuous Integration + +### GitHub Actions + +A GitHub Actions workflow is configured at `.github/workflows/spark-integration-tests.yml` that automatically: +- Runs on push/PR to `master`/`main` when Spark or HDFS code changes +- Starts SeaweedFS in Docker +- Runs all integration tests +- Runs the example application +- Uploads test reports +- Can be triggered manually via workflow_dispatch + +The workflow includes two jobs: +1. **spark-tests**: Runs all integration tests (10 tests) +2. **spark-example**: Runs the example Spark application + +View the workflow status in the GitHub Actions tab of the repository. + +### CI-Friendly Test Execution + +```bash +# In CI environment +./scripts/start-seaweedfs.sh # Start SeaweedFS in background +export SEAWEEDFS_TEST_ENABLED=true +mvn clean test +./scripts/stop-seaweedfs.sh # Cleanup +``` + +### Docker-Based Testing + +Use docker-compose for isolated testing: + +```bash +docker-compose up -d seaweedfs +export SEAWEEDFS_TEST_ENABLED=true +mvn test +docker-compose down +``` + +## Troubleshooting + +### Tests are Skipped + +**Symptom**: Tests show "Skipping test - SEAWEEDFS_TEST_ENABLED not set" + +**Solution**: +```bash +export SEAWEEDFS_TEST_ENABLED=true +mvn test +``` + +### Connection Refused Errors + +**Symptom**: `java.net.ConnectException: Connection refused` + +**Solution**: +1. Verify SeaweedFS is running: + ```bash + curl http://localhost:8888/ + ``` + +2. Check if ports are accessible: + ```bash + netstat -an | grep 8888 + netstat -an | grep 18888 + ``` + +### ClassNotFoundException: seaweed.hdfs.SeaweedFileSystem + +**Symptom**: Spark cannot find the SeaweedFS FileSystem implementation + +**Solution**: +1. Ensure the SeaweedFS HDFS client is in your classpath +2. For spark-submit, add the JAR: + ```bash + spark-submit --jars /path/to/seaweedfs-hadoop3-client-*.jar ... + ``` + +### Out of Memory Errors + +**Symptom**: `java.lang.OutOfMemoryError: Java heap space` + +**Solution**: +```bash +mvn test -DargLine="-Xmx4g" +``` + +For spark-submit: +```bash +spark-submit --driver-memory 4g --executor-memory 4g ... +``` + +### gRPC Version Conflicts + +**Symptom**: `java.lang.NoSuchMethodError` related to gRPC + +**Solution**: Ensure consistent gRPC versions. The project uses Spark 3.5.0 compatible versions. + +## Performance Tips + +1. 
**Increase buffer size** for large files: + ```bash + --conf spark.hadoop.fs.seaweed.buffer.size=8388608 + ``` + +2. **Use appropriate replication** based on your cluster: + ```bash + --conf spark.hadoop.fs.seaweed.replication=001 + ``` + +3. **Enable partition pruning** by partitioning data on commonly filtered columns + +4. **Use columnar formats** (Parquet) for better performance + +## Additional Examples + +### PySpark with SeaweedFS + +```python +from pyspark.sql import SparkSession + +spark = SparkSession.builder \ + .appName("PySparkSeaweedFS") \ + .config("spark.hadoop.fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem") \ + .config("spark.hadoop.fs.seaweed.filer.host", "localhost") \ + .config("spark.hadoop.fs.seaweed.filer.port", "8888") \ + .config("spark.hadoop.fs.seaweed.filer.port.grpc", "18888") \ + .getOrCreate() + +# Write data +df = spark.range(1000) +df.write.parquet("seaweedfs://localhost:8888/pyspark-output") + +# Read data +df_read = spark.read.parquet("seaweedfs://localhost:8888/pyspark-output") +df_read.show() +``` + +### Scala with SeaweedFS + +```scala +import org.apache.spark.sql.SparkSession + +val spark = SparkSession.builder() + .appName("ScalaSeaweedFS") + .config("spark.hadoop.fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem") + .config("spark.hadoop.fs.seaweed.filer.host", "localhost") + .config("spark.hadoop.fs.seaweed.filer.port", "8888") + .config("spark.hadoop.fs.seaweed.filer.port.grpc", "18888") + .getOrCreate() + +// Write data +val df = spark.range(1000) +df.write.parquet("seaweedfs://localhost:8888/scala-output") + +// Read data +val dfRead = spark.read.parquet("seaweedfs://localhost:8888/scala-output") +dfRead.show() +``` + +## Contributing + +When adding new tests: + +1. Extend `SparkTestBase` for new test classes +2. Use `skipIfTestsDisabled()` in test methods +3. Clean up test data in tearDown +4. Add documentation to this README +5. Ensure tests work in CI environment + +## License + +Same as SeaweedFS project. 
diff --git a/test/java/spark/docker-compose.yml b/test/java/spark/docker-compose.yml
new file mode 100644
index 000000000..a8b006765
--- /dev/null
+++ b/test/java/spark/docker-compose.yml
@@ -0,0 +1,72 @@
version: '3.8'

services:
  seaweedfs-master:
    image: seaweedfs:test-grpc-v1.77.0
    container_name: seaweedfs-spark-master
    ports:
      - "9333:9333"
      - "19333:19333"
    command: "master -ip=seaweedfs-master -ip.bind=0.0.0.0"
    networks:
      - seaweedfs-spark

  seaweedfs-volume:
    image: seaweedfs:test-grpc-v1.77.0
    container_name: seaweedfs-spark-volume
    ports:
      - "8080:8080"
      - "18080:18080"
    command: "volume -mserver=seaweedfs-master:9333 -ip.bind=0.0.0.0 -port=8080"
    depends_on:
      - seaweedfs-master
    networks:
      - seaweedfs-spark

  seaweedfs-filer:
    image: seaweedfs:test-grpc-v1.77.0
    container_name: seaweedfs-spark-filer
    ports:
      - "8888:8888"
      - "18888:18888"
    command: ["filer", "-master=seaweedfs-master:9333", "-ip.bind=0.0.0.0", "-port=8888", "-port.grpc=18888"]
    depends_on:
      - seaweedfs-master
      - seaweedfs-volume
    networks:
      - seaweedfs-spark
    environment:
      - GRPC_GO_LOG_VERBOSITY_LEVEL=99
      - GRPC_GO_LOG_SEVERITY_LEVEL=info
      - GLOG_v=4
      - GLOG_logtostderr=true
      - GODEBUG=http2debug=2
    healthcheck:
      test: ["CMD", "wget", "--spider", "-q", "http://localhost:8888/"]
      interval: 5s
      timeout: 3s
      retries: 10
      start_period: 10s

  spark-tests:
    image: maven:3.9-eclipse-temurin-17
    container_name: seaweedfs-spark-tests
    network_mode: "host"
    volumes:
      - .:/workspace
      - ~/.m2:/root/.m2
    working_dir: /workspace
    environment:
      - SEAWEEDFS_TEST_ENABLED=true
      - SEAWEEDFS_FILER_HOST=localhost
      - SEAWEEDFS_FILER_PORT=8888
      - SEAWEEDFS_FILER_GRPC_PORT=18888
      - HADOOP_HOME=/tmp
    command: sh -c "sleep 30 && mvn test"
    mem_limit: 4g
    cpus: 2

networks:
  seaweedfs-spark:
    driver: bridge

diff --git a/test/java/spark/pom.xml b/test/java/spark/pom.xml
new file mode 100644
index 000000000..722853f4d
--- /dev/null
+++ b/test/java/spark/pom.xml
@@ -0,0 +1,159 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.seaweedfs</groupId>
    <artifactId>seaweedfs-spark-integration-tests</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>SeaweedFS Spark Integration Tests</name>
    <description>Integration tests for Apache Spark with SeaweedFS HDFS client</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spark.version>3.5.0</spark.version>
        <hadoop.version>3.3.6</hadoop.version>
        <scala.binary.version>2.12</scala.binary.version>
        <junit.version>4.13.2</junit.version>
    </properties>

    <dependencies>
        <!-- Spark core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Hadoop client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- SeaweedFS HDFS3 client -->
        <dependency>
            <groupId>com.seaweedfs</groupId>
            <artifactId>seaweedfs-hadoop3-client</artifactId>
            <version>3.80</version>
        </dependency>

        <!-- JUnit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.36</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <skipTests>${skipTests}</skipTests>
                    <includes>
                        <include>**/*Test.java</include>
                    </includes>
                    <argLine>
                        -Xmx2g
                        -Dhadoop.home.dir=/tmp
                        --add-opens=java.base/java.lang=ALL-UNNAMED
                        --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
                        --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
                        --add-opens=java.base/java.io=ALL-UNNAMED
                        --add-opens=java.base/java.net=ALL-UNNAMED
                        --add-opens=java.base/java.nio=ALL-UNNAMED
                        --add-opens=java.base/java.util=ALL-UNNAMED
                        --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
                        --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
                        --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
                        --add-opens=java.base/sun.nio.cs=ALL-UNNAMED
                        --add-opens=java.base/sun.security.action=ALL-UNNAMED
                        --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
                        --add-exports=java.base/sun.nio.ch=ALL-UNNAMED
                    </argLine>
                    <environmentVariables>
                        <!-- match the docker-compose test environment -->
                        <HADOOP_HOME>/tmp</HADOOP_HOME>
                    </environmentVariables>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>seaweed.spark.SparkSeaweedFSExample</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

diff --git a/test/java/spark/quick-start.sh b/test/java/spark/quick-start.sh
new file mode 100755
index 000000000..6ab73cf36
--- /dev/null
+++ b/test/java/spark/quick-start.sh
@@ -0,0 +1,147 @@
#!/bin/bash

set -e

echo "=== SeaweedFS Spark Integration Tests Quick Start ==="
echo ""

# Check if SeaweedFS is running
check_seaweedfs() {
    echo "Checking if SeaweedFS is running..."
    if curl -f http://localhost:8888/ > /dev/null 2>&1; then
        echo "✓ SeaweedFS filer is accessible at http://localhost:8888"
        return 0
    else
        echo "✗ SeaweedFS filer is not accessible"
        return 1
    fi
}

# Start SeaweedFS with Docker if not running
start_seaweedfs() {
    echo ""
    echo "Starting SeaweedFS with Docker..."
    docker-compose up -d seaweedfs-master seaweedfs-volume seaweedfs-filer

    echo "Waiting for SeaweedFS to be ready..."
    for i in {1..30}; do
        if curl -f http://localhost:8888/ > /dev/null 2>&1; then
            echo "✓ SeaweedFS is ready!"
            return 0
        fi
        echo -n "."
        sleep 2
    done

    echo ""
    echo "✗ SeaweedFS failed to start"
    return 1
}

# Build the project
build_project() {
    echo ""
    echo "Building the project..."
    mvn clean package -DskipTests
    echo "✓ Build completed"
}

# Run tests
run_tests() {
    echo ""
    echo "Running integration tests..."
    export SEAWEEDFS_TEST_ENABLED=true
    mvn test
    echo "✓ Tests completed"
}

# Run example
run_example() {
    echo ""
    echo "Running example application..."

    if ! command -v spark-submit > /dev/null; then
        echo "⚠ spark-submit not found. Skipping example application."
        echo "To run the example, install Apache Spark and try: make run-example"
        return 0
    fi

    spark-submit \
        --class seaweed.spark.SparkSeaweedFSExample \
        --master local[2] \
        --conf spark.hadoop.fs.seaweedfs.impl=seaweed.hdfs.SeaweedFileSystem \
        --conf spark.hadoop.fs.seaweed.filer.host=localhost \
        --conf spark.hadoop.fs.seaweed.filer.port=8888 \
        --conf spark.hadoop.fs.seaweed.filer.port.grpc=18888 \
        target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar \
        seaweedfs://localhost:8888/spark-quickstart-output

    echo "✓ Example completed"
}

# Cleanup
cleanup() {
    echo ""
    echo "Cleaning up..."
    docker-compose down -v
    echo "✓ Cleanup completed"
}

# Main execution
main() {
    # Check if Docker is available
    if ! command -v docker > /dev/null; then
        echo "Error: Docker is not installed or not in PATH"
        exit 1
    fi

    # Check if Maven is available
    if ! command -v mvn > /dev/null; then
        echo "Error: Maven is not installed or not in PATH"
        exit 1
    fi

    # Check if SeaweedFS is running, if not start it
    if ! check_seaweedfs; then
        read -p "Do you want to start SeaweedFS with Docker? (y/n) " -n 1 -r
        echo
        if [[ $REPLY =~ ^[Yy]$ ]]; then
            start_seaweedfs || exit 1
        else
            echo "Please start SeaweedFS manually and rerun this script."
            exit 1
        fi
    fi

    # Build project
    build_project || exit 1

    # Run tests
    run_tests || exit 1

    # Run example if Spark is available
    run_example

    echo ""
    echo "=== Quick Start Completed Successfully! 
===" + echo "" + echo "Next steps:" + echo " - View test results in target/surefire-reports/" + echo " - Check example output at http://localhost:8888/" + echo " - Run 'make help' for more options" + echo " - Read README.md for detailed documentation" + echo "" + + read -p "Do you want to stop SeaweedFS? (y/n) " -n 1 -r + echo + if [[ $REPLY =~ ^[Yy]$ ]]; then + cleanup + fi +} + +# Handle Ctrl+C +trap cleanup INT + +# Run main +main + diff --git a/test/java/spark/run-tests.sh b/test/java/spark/run-tests.sh new file mode 100755 index 000000000..c68667b54 --- /dev/null +++ b/test/java/spark/run-tests.sh @@ -0,0 +1,44 @@ +#!/bin/bash + +set -e + +echo "=== SeaweedFS Spark Integration Tests Runner ===" +echo "" + +# Check if SeaweedFS is running +check_seaweedfs() { + if curl -f http://localhost:8888/ > /dev/null 2>&1; then + echo "✓ SeaweedFS filer is accessible at http://localhost:8888" + return 0 + else + echo "✗ SeaweedFS filer is not accessible" + return 1 + fi +} + +# Main +if ! check_seaweedfs; then + echo "" + echo "Please start SeaweedFS first. You can use:" + echo " cd test/java/spark && docker-compose up -d" + echo "Or:" + echo " make docker-up" + exit 1 +fi + +echo "" +echo "Running Spark integration tests..." +echo "" + +export SEAWEEDFS_TEST_ENABLED=true +export SEAWEEDFS_FILER_HOST=localhost +export SEAWEEDFS_FILER_PORT=8888 +export SEAWEEDFS_FILER_GRPC_PORT=18888 + +# Run tests +mvn test "$@" + +echo "" +echo "✓ Test run completed" +echo "View detailed reports in: target/surefire-reports/" + diff --git a/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java b/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java new file mode 100644 index 000000000..dfab5d8be --- /dev/null +++ b/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java @@ -0,0 +1,143 @@ +package seaweed.spark; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; + +/** + * Example Spark application demonstrating SeaweedFS integration. + * + * This can be submitted to a Spark cluster using spark-submit. + * + * Example usage: + * spark-submit \ + * --class seaweed.spark.SparkSeaweedFSExample \ + * --master local[2] \ + * --conf spark.hadoop.fs.seaweedfs.impl=seaweed.hdfs.SeaweedFileSystem \ + * --conf spark.hadoop.fs.seaweed.filer.host=localhost \ + * --conf spark.hadoop.fs.seaweed.filer.port=8888 \ + * --conf spark.hadoop.fs.seaweed.filer.port.grpc=18888 \ + * target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar \ + * seaweedfs://localhost:8888/output + */ +public class SparkSeaweedFSExample { + + public static void main(String[] args) { + if (args.length < 1) { + System.err.println("Usage: SparkSeaweedFSExample "); + System.err.println("Example: seaweedfs://localhost:8888/spark-output"); + System.exit(1); + } + + String outputPath = args[0]; + + // Create Spark session + SparkSession spark = SparkSession.builder() + .appName("SeaweedFS Spark Example") + .getOrCreate(); + + try { + System.out.println("=== SeaweedFS Spark Integration Example ===\n"); + + // Example 1: Generate data and write to SeaweedFS + System.out.println("1. 
Generating sample data..."); + Dataset data = spark.range(0, 1000) + .selectExpr( + "id", + "id * 2 as doubled", + "CAST(rand() * 100 AS INT) as random_value" + ); + + System.out.println(" Generated " + data.count() + " rows"); + data.show(5); + + // Write as Parquet + String parquetPath = outputPath + "/data.parquet"; + System.out.println("\n2. Writing data to SeaweedFS as Parquet..."); + System.out.println(" Path: " + parquetPath); + + data.write() + .mode(SaveMode.Overwrite) + .parquet(parquetPath); + + System.out.println(" ✓ Write completed"); + + // Read back and verify + System.out.println("\n3. Reading data back from SeaweedFS..."); + Dataset readData = spark.read().parquet(parquetPath); + System.out.println(" Read " + readData.count() + " rows"); + + // Perform aggregation + System.out.println("\n4. Performing aggregation..."); + Dataset stats = readData.selectExpr( + "COUNT(*) as count", + "AVG(random_value) as avg_random", + "MAX(doubled) as max_doubled" + ); + + stats.show(); + + // Write aggregation results + String statsPath = outputPath + "/stats.parquet"; + System.out.println("5. Writing stats to: " + statsPath); + stats.write() + .mode(SaveMode.Overwrite) + .parquet(statsPath); + + // Create a partitioned dataset + System.out.println("\n6. Creating partitioned dataset..."); + Dataset partitionedData = data.selectExpr( + "*", + "CAST(id % 10 AS INT) as partition_key" + ); + + String partitionedPath = outputPath + "/partitioned.parquet"; + System.out.println(" Path: " + partitionedPath); + + partitionedData.write() + .mode(SaveMode.Overwrite) + .partitionBy("partition_key") + .parquet(partitionedPath); + + System.out.println(" ✓ Partitioned write completed"); + + // Read specific partition + System.out.println("\n7. Reading specific partition (partition_key=0)..."); + Dataset partition0 = spark.read() + .parquet(partitionedPath) + .filter("partition_key = 0"); + + System.out.println(" Partition 0 contains " + partition0.count() + " rows"); + partition0.show(5); + + // SQL example + System.out.println("\n8. Using Spark SQL..."); + readData.createOrReplaceTempView("seaweedfs_data"); + + Dataset sqlResult = spark.sql( + "SELECT " + + " CAST(id / 100 AS INT) as bucket, " + + " COUNT(*) as count, " + + " AVG(random_value) as avg_random " + + "FROM seaweedfs_data " + + "GROUP BY CAST(id / 100 AS INT) " + + "ORDER BY bucket" + ); + + System.out.println(" Bucketed statistics:"); + sqlResult.show(); + + System.out.println("\n=== Example completed successfully! ==="); + System.out.println("Output location: " + outputPath); + + } catch (Exception e) { + System.err.println("Error: " + e.getMessage()); + e.printStackTrace(); + System.exit(1); + } finally { + spark.stop(); + } + } +} + diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkReadWriteTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkReadWriteTest.java new file mode 100644 index 000000000..3c49a551f --- /dev/null +++ b/test/java/spark/src/test/java/seaweed/spark/SparkReadWriteTest.java @@ -0,0 +1,216 @@ +package seaweed.spark; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.*; + +/** + * Integration tests for Spark read/write operations with SeaweedFS. 
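 *
 * Each test round-trips data through a unique path under /test-spark and is
 * skipped unless SEAWEEDFS_TEST_ENABLED=true (see SparkTestBase).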
+ */ +public class SparkReadWriteTest extends SparkTestBase { + + @Test + public void testWriteAndReadParquet() { + skipIfTestsDisabled(); + + // Create test data + List people = Arrays.asList( + new Person("Alice", 30), + new Person("Bob", 25), + new Person("Charlie", 35) + ); + + Dataset df = spark.createDataFrame(people, Person.class); + + // Write to SeaweedFS + String outputPath = getTestPath("people.parquet"); + df.write().mode(SaveMode.Overwrite).parquet(outputPath); + + // Read back from SeaweedFS + Dataset readDf = spark.read().parquet(outputPath); + + // Verify + assertEquals(3, readDf.count()); + assertEquals(2, readDf.columns().length); + + List results = readDf.collectAsList(); + assertTrue(results.stream().anyMatch(r -> "Alice".equals(r.getAs("name")) && (Integer)r.getAs("age") == 30)); + assertTrue(results.stream().anyMatch(r -> "Bob".equals(r.getAs("name")) && (Integer)r.getAs("age") == 25)); + assertTrue(results.stream().anyMatch(r -> "Charlie".equals(r.getAs("name")) && (Integer)r.getAs("age") == 35)); + } + + @Test + public void testWriteAndReadCSV() { + skipIfTestsDisabled(); + + // Create test data + List people = Arrays.asList( + new Person("Alice", 30), + new Person("Bob", 25) + ); + + Dataset df = spark.createDataFrame(people, Person.class); + + // Write to SeaweedFS as CSV + String outputPath = getTestPath("people.csv"); + df.write().mode(SaveMode.Overwrite).option("header", "true").csv(outputPath); + + // Read back from SeaweedFS + Dataset readDf = spark.read().option("header", "true").option("inferSchema", "true").csv(outputPath); + + // Verify + assertEquals(2, readDf.count()); + assertEquals(2, readDf.columns().length); + } + + @Test + public void testWriteAndReadJSON() { + skipIfTestsDisabled(); + + // Create test data + List people = Arrays.asList( + new Person("Alice", 30), + new Person("Bob", 25), + new Person("Charlie", 35) + ); + + Dataset df = spark.createDataFrame(people, Person.class); + + // Write to SeaweedFS as JSON + String outputPath = getTestPath("people.json"); + df.write().mode(SaveMode.Overwrite).json(outputPath); + + // Read back from SeaweedFS + Dataset readDf = spark.read().json(outputPath); + + // Verify + assertEquals(3, readDf.count()); + assertEquals(2, readDf.columns().length); + } + + @Test + public void testWritePartitionedData() { + skipIfTestsDisabled(); + + // Create test data with multiple years + List people = Arrays.asList( + new PersonWithYear("Alice", 30, 2020), + new PersonWithYear("Bob", 25, 2021), + new PersonWithYear("Charlie", 35, 2020), + new PersonWithYear("David", 28, 2021) + ); + + Dataset df = spark.createDataFrame(people, PersonWithYear.class); + + // Write partitioned data to SeaweedFS + String outputPath = getTestPath("people_partitioned"); + df.write().mode(SaveMode.Overwrite).partitionBy("year").parquet(outputPath); + + // Read back from SeaweedFS + Dataset readDf = spark.read().parquet(outputPath); + + // Verify + assertEquals(4, readDf.count()); + + // Verify partition filtering works + Dataset filtered2020 = readDf.filter("year = 2020"); + assertEquals(2, filtered2020.count()); + + Dataset filtered2021 = readDf.filter("year = 2021"); + assertEquals(2, filtered2021.count()); + } + + @Test + public void testAppendMode() { + skipIfTestsDisabled(); + + String outputPath = getTestPath("people_append.parquet"); + + // Write first batch + List batch1 = Arrays.asList( + new Person("Alice", 30), + new Person("Bob", 25) + ); + Dataset df1 = spark.createDataFrame(batch1, Person.class); + 
        df1.write().mode(SaveMode.Overwrite).parquet(outputPath);

        // Append second batch
        List<Person> batch2 = Arrays.asList(
            new Person("Charlie", 35),
            new Person("David", 28)
        );
        Dataset<Row> df2 = spark.createDataFrame(batch2, Person.class);
        df2.write().mode(SaveMode.Append).parquet(outputPath);

        // Read back and verify
        Dataset<Row> readDf = spark.read().parquet(outputPath);
        assertEquals(4, readDf.count());
    }

    @Test
    public void testLargeDataset() {
        skipIfTestsDisabled();

        // Create a larger dataset
        Dataset<Row> largeDf = spark.range(0, 10000)
            .selectExpr("id as value", "id * 2 as doubled");

        String outputPath = getTestPath("large_dataset.parquet");
        largeDf.write().mode(SaveMode.Overwrite).parquet(outputPath);

        // Read back and verify
        Dataset<Row> readDf = spark.read().parquet(outputPath);
        assertEquals(10000, readDf.count());

        // Verify some data
        Row firstRow = readDf.first();
        assertEquals(0L, firstRow.getLong(0));
        assertEquals(0L, firstRow.getLong(1));
    }

    // Test data classes
    public static class Person implements java.io.Serializable {
        private String name;
        private int age;

        public Person() {}

        public Person(String name, int age) {
            this.name = name;
            this.age = age;
        }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
    }

    public static class PersonWithYear implements java.io.Serializable {
        private String name;
        private int age;
        private int year;

        public PersonWithYear() {}

        public PersonWithYear(String name, int age, int year) {
            this.name = name;
            this.age = age;
            this.year = year;
        }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
        public int getYear() { return year; }
        public void setYear(int year) { this.year = year; }
    }
}

diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java
new file mode 100644
index 000000000..90f46054b
--- /dev/null
+++ b/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java
@@ -0,0 +1,236 @@
package seaweed.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.*;

/**
 * Integration tests for Spark SQL operations with SeaweedFS.
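 *
 * Each test first persists its input to SeaweedFS as Parquet and registers a
 * temporary view, so the SQL below runs against filer-backed files.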
+ */ +public class SparkSQLTest extends SparkTestBase { + + @Test + public void testCreateTableAndQuery() { + skipIfTestsDisabled(); + + // Create test data + List employees = Arrays.asList( + new Employee(1, "Alice", "Engineering", 100000), + new Employee(2, "Bob", "Sales", 80000), + new Employee(3, "Charlie", "Engineering", 120000), + new Employee(4, "David", "Sales", 75000) + ); + + Dataset df = spark.createDataFrame(employees, Employee.class); + + // Write to SeaweedFS + String tablePath = getTestPath("employees"); + df.write().mode(SaveMode.Overwrite).parquet(tablePath); + + // Create temporary view + Dataset employeesDf = spark.read().parquet(tablePath); + employeesDf.createOrReplaceTempView("employees"); + + // Run SQL queries + Dataset engineeringEmployees = spark.sql( + "SELECT name, salary FROM employees WHERE department = 'Engineering'" + ); + + assertEquals(2, engineeringEmployees.count()); + + Dataset highPaidEmployees = spark.sql( + "SELECT name, salary FROM employees WHERE salary > 90000" + ); + + assertEquals(2, highPaidEmployees.count()); + } + + @Test + public void testAggregationQueries() { + skipIfTestsDisabled(); + + // Create sales data + List sales = Arrays.asList( + new Sale("2024-01", "Product A", 100), + new Sale("2024-01", "Product B", 150), + new Sale("2024-02", "Product A", 120), + new Sale("2024-02", "Product B", 180), + new Sale("2024-03", "Product A", 110) + ); + + Dataset df = spark.createDataFrame(sales, Sale.class); + + // Write to SeaweedFS + String tablePath = getTestPath("sales"); + df.write().mode(SaveMode.Overwrite).parquet(tablePath); + + // Create temporary view + Dataset salesDf = spark.read().parquet(tablePath); + salesDf.createOrReplaceTempView("sales"); + + // Aggregate query + Dataset monthlySales = spark.sql( + "SELECT month, SUM(amount) as total FROM sales GROUP BY month ORDER BY month" + ); + + List results = monthlySales.collectAsList(); + assertEquals(3, results.size()); + assertEquals("2024-01", results.get(0).getString(0)); + assertEquals(250, results.get(0).getLong(1)); + } + + @Test + public void testJoinOperations() { + skipIfTestsDisabled(); + + // Create employee data + List employees = Arrays.asList( + new Employee(1, "Alice", "Engineering", 100000), + new Employee(2, "Bob", "Sales", 80000) + ); + + // Create department data + List departments = Arrays.asList( + new Department("Engineering", "Building Products"), + new Department("Sales", "Selling Products") + ); + + Dataset empDf = spark.createDataFrame(employees, Employee.class); + Dataset deptDf = spark.createDataFrame(departments, Department.class); + + // Write to SeaweedFS + String empPath = getTestPath("employees_join"); + String deptPath = getTestPath("departments_join"); + + empDf.write().mode(SaveMode.Overwrite).parquet(empPath); + deptDf.write().mode(SaveMode.Overwrite).parquet(deptPath); + + // Read back and create views + spark.read().parquet(empPath).createOrReplaceTempView("emp"); + spark.read().parquet(deptPath).createOrReplaceTempView("dept"); + + // Join query + Dataset joined = spark.sql( + "SELECT e.name, e.salary, d.description " + + "FROM emp e JOIN dept d ON e.department = d.name" + ); + + assertEquals(2, joined.count()); + + List results = joined.collectAsList(); + assertTrue(results.stream().anyMatch(r -> + "Alice".equals(r.getString(0)) && "Building Products".equals(r.getString(2)) + )); + } + + @Test + public void testWindowFunctions() { + skipIfTestsDisabled(); + + // Create employee data with salaries + List employees = Arrays.asList( + new Employee(1, 
"Alice", "Engineering", 100000), + new Employee(2, "Bob", "Engineering", 120000), + new Employee(3, "Charlie", "Sales", 80000), + new Employee(4, "David", "Sales", 90000) + ); + + Dataset df = spark.createDataFrame(employees, Employee.class); + + String tablePath = getTestPath("employees_window"); + df.write().mode(SaveMode.Overwrite).parquet(tablePath); + + Dataset employeesDf = spark.read().parquet(tablePath); + employeesDf.createOrReplaceTempView("employees_ranked"); + + // Window function query - rank employees by salary within department + Dataset ranked = spark.sql( + "SELECT name, department, salary, " + + "RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank " + + "FROM employees_ranked" + ); + + assertEquals(4, ranked.count()); + + // Verify Bob has rank 1 in Engineering (highest salary) + List results = ranked.collectAsList(); + Row bobRow = results.stream() + .filter(r -> "Bob".equals(r.getString(0))) + .findFirst() + .orElse(null); + + assertNotNull(bobRow); + assertEquals(1, bobRow.getInt(3)); + } + + // Test data classes + public static class Employee implements java.io.Serializable { + private int id; + private String name; + private String department; + private int salary; + + public Employee() {} + + public Employee(int id, String name, String department, int salary) { + this.id = id; + this.name = name; + this.department = department; + this.salary = salary; + } + + public int getId() { return id; } + public void setId(int id) { this.id = id; } + public String getName() { return name; } + public void setName(String name) { this.name = name; } + public String getDepartment() { return department; } + public void setDepartment(String department) { this.department = department; } + public int getSalary() { return salary; } + public void setSalary(int salary) { this.salary = salary; } + } + + public static class Sale implements java.io.Serializable { + private String month; + private String product; + private int amount; + + public Sale() {} + + public Sale(String month, String product, int amount) { + this.month = month; + this.product = product; + this.amount = amount; + } + + public String getMonth() { return month; } + public void setMonth(String month) { this.month = month; } + public String getProduct() { return product; } + public void setProduct(String product) { this.product = product; } + public int getAmount() { return amount; } + public void setAmount(int amount) { this.amount = amount; } + } + + public static class Department implements java.io.Serializable { + private String name; + private String description; + + public Department() {} + + public Department(String name, String description) { + this.name = name; + this.description = description; + } + + public String getName() { return name; } + public void setName(String name) { this.name = name; } + public String getDescription() { return description; } + public void setDescription(String description) { this.description = description; } + } +} + diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java b/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java new file mode 100644 index 000000000..5241e2b66 --- /dev/null +++ b/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java @@ -0,0 +1,126 @@ +package seaweed.spark; + +import org.apache.hadoop.conf.Configuration; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; + +/** + * Base class for Spark 
integration tests with SeaweedFS. + * + * These tests require a running SeaweedFS cluster. + * Set environment variable SEAWEEDFS_TEST_ENABLED=true to enable these tests. + */ +public abstract class SparkTestBase { + + protected SparkSession spark; + protected static final String TEST_ROOT = "/test-spark"; + protected static final boolean TESTS_ENABLED = + "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED")); + + // SeaweedFS connection settings + protected static final String SEAWEEDFS_HOST = + System.getenv().getOrDefault("SEAWEEDFS_FILER_HOST", "localhost"); + protected static final String SEAWEEDFS_PORT = + System.getenv().getOrDefault("SEAWEEDFS_FILER_PORT", "8888"); + protected static final String SEAWEEDFS_GRPC_PORT = + System.getenv().getOrDefault("SEAWEEDFS_FILER_GRPC_PORT", "18888"); + + @Before + public void setUpSpark() throws IOException { + if (!TESTS_ENABLED) { + return; + } + + SparkConf sparkConf = new SparkConf() + .setAppName("SeaweedFS Integration Test") + .setMaster("local[1]") // Single thread to avoid concurrent gRPC issues + .set("spark.driver.host", "localhost") + .set("spark.sql.warehouse.dir", getSeaweedFSPath("/spark-warehouse")) + // SeaweedFS configuration + .set("spark.hadoop.fs.defaultFS", String.format("seaweedfs://%s:%s", SEAWEEDFS_HOST, SEAWEEDFS_PORT)) + .set("spark.hadoop.fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem") + .set("spark.hadoop.fs.seaweed.impl", "seaweed.hdfs.SeaweedFileSystem") + .set("spark.hadoop.fs.seaweed.filer.host", SEAWEEDFS_HOST) + .set("spark.hadoop.fs.seaweed.filer.port", SEAWEEDFS_PORT) + .set("spark.hadoop.fs.seaweed.filer.port.grpc", SEAWEEDFS_GRPC_PORT) + .set("spark.hadoop.fs.AbstractFileSystem.seaweedfs.impl", "seaweed.hdfs.SeaweedAbstractFileSystem") + // Set replication to empty string to use filer default + .set("spark.hadoop.fs.seaweed.replication", "") + // Smaller buffer to reduce load + .set("spark.hadoop.fs.seaweed.buffer.size", "1048576") // 1MB + // Reduce parallelism + .set("spark.default.parallelism", "1") + .set("spark.sql.shuffle.partitions", "1") + // Simpler output committer + .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") + .set("spark.sql.sources.commitProtocolClass", "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol") + // Disable speculative execution to reduce load + .set("spark.speculation", "false"); + + spark = SparkSession.builder() + .config(sparkConf) + .getOrCreate(); + + // Clean up test directory + cleanupTestDirectory(); + } + + @After + public void tearDownSpark() { + if (!TESTS_ENABLED || spark == null) { + return; + } + + try { + // Try to cleanup but don't fail if it doesn't work + cleanupTestDirectory(); + } catch (Exception e) { + System.err.println("Cleanup failed: " + e.getMessage()); + } finally { + try { + spark.stop(); + } catch (Exception e) { + System.err.println("Spark stop failed: " + e.getMessage()); + } + spark = null; + } + } + + protected String getSeaweedFSPath(String path) { + return String.format("seaweedfs://%s:%s%s", SEAWEEDFS_HOST, SEAWEEDFS_PORT, path); + } + + protected String getTestPath(String subPath) { + return getSeaweedFSPath(TEST_ROOT + "/" + subPath); + } + + private void cleanupTestDirectory() { + if (spark != null) { + try { + Configuration conf = spark.sparkContext().hadoopConfiguration(); + org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get( + java.net.URI.create(getSeaweedFSPath("/")), conf); + org.apache.hadoop.fs.Path testPath = new 
org.apache.hadoop.fs.Path(TEST_ROOT);
                if (fs.exists(testPath)) {
                    fs.delete(testPath, true);
                }
            } catch (Exception e) {
                // Suppress cleanup errors - they shouldn't fail tests
                // Common in distributed systems with eventual consistency
                System.err.println("Warning: cleanup failed (non-critical): " + e.getMessage());
            }
        }
    }

    protected void skipIfTestsDisabled() {
        if (!TESTS_ENABLED) {
            System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
            org.junit.Assume.assumeTrue("SEAWEEDFS_TEST_ENABLED not set", false);
        }
    }
}

diff --git a/test/java/spark/src/test/resources/log4j.properties b/test/java/spark/src/test/resources/log4j.properties
new file mode 100644
index 000000000..485b77ac0
--- /dev/null
+++ b/test/java/spark/src/test/resources/log4j.properties
@@ -0,0 +1,18 @@
# Set root logger level
log4j.rootLogger=WARN, console

# Console appender
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set log levels for specific packages
log4j.logger.org.apache.spark=WARN
log4j.logger.org.apache.hadoop=WARN
log4j.logger.seaweed=INFO

# Suppress unnecessary warnings
log4j.logger.org.apache.spark.util.Utils=ERROR
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR

diff --git a/test/java/spark/test-one.sh b/test/java/spark/test-one.sh
new file mode 100755
index 000000000..aff6f15bf
--- /dev/null
+++ b/test/java/spark/test-one.sh
@@ -0,0 +1,40 @@
#!/bin/bash

# Run a single test method for quick iteration

set -e

if [ $# -eq 0 ]; then
    echo "Usage: ./test-one.sh <TestClassName>#<testMethodName>"
    echo ""
    echo "Examples:"
    echo "  ./test-one.sh SparkReadWriteTest#testWriteAndReadParquet"
    echo "  ./test-one.sh SparkSQLTest#testCreateTableAndQuery"
    echo ""
    exit 1
fi

# Check if SeaweedFS is running
if ! curl -f http://localhost:8888/ > /dev/null 2>&1; then
    echo "✗ SeaweedFS filer is not accessible at http://localhost:8888"
    echo ""
    echo "Please start SeaweedFS first:"
    echo "  docker-compose up -d"
    echo ""
    exit 1
fi

echo "✓ SeaweedFS filer is accessible"
echo ""
echo "Running test: $1"
echo ""

# Set environment variables
export SEAWEEDFS_TEST_ENABLED=true
export SEAWEEDFS_FILER_HOST=localhost
export SEAWEEDFS_FILER_PORT=8888
export SEAWEEDFS_FILER_GRPC_PORT=18888

# Run the specific test
mvn test -Dtest="$1"
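
# Mirror run-tests.sh and point at the surefire output once the run finishes.
echo ""
echo "✓ Test completed"
echo "View the detailed report in: target/surefire-reports/"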