Complete Spark integration test suite

pull/7526/head
chrislu · 7 days ago
parent commit 89a6d42cee
  1. test/java/spark/.gitignore (+31)
  2. test/java/spark/CI_SETUP.md (+273)
  3. test/java/spark/Makefile (+76)
  4. test/java/spark/README.md (+361)
  5. test/java/spark/docker-compose.yml (+72)
  6. test/java/spark/pom.xml (+159)
  7. test/java/spark/quick-start.sh (+147)
  8. test/java/spark/run-tests.sh (+44)
  9. test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java (+143)
  10. test/java/spark/src/test/java/seaweed/spark/SparkReadWriteTest.java (+216)
  11. test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java (+236)
  12. test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java (+126)
  13. test/java/spark/src/test/resources/log4j.properties (+18)
  14. test/java/spark/test-one.sh (+40)

31
test/java/spark/.gitignore

@@ -0,0 +1,31 @@
# Maven
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
pom.xml.next
release.properties
dependency-reduced-pom.xml
buildNumber.properties
.mvn/timing.properties
# IDE
.idea/
*.iml
.vscode/
.classpath
.project
.settings/
# Spark
spark-warehouse/
metastore_db/
derby.log
# Logs
*.log
# OS
.DS_Store
Thumbs.db

273
test/java/spark/CI_SETUP.md

@@ -0,0 +1,273 @@
# GitHub Actions CI/CD Setup
## Overview
The Spark integration tests are now configured to run automatically via GitHub Actions.
## Workflow File
**Location**: `.github/workflows/spark-integration-tests.yml`
## Triggers
The workflow runs automatically on:
1. **Push to master/main** - When code is pushed to main branches
2. **Pull Requests** - When PRs target master/main
3. **Manual Trigger** - Via workflow_dispatch in GitHub UI
The workflow only runs when changes are detected in:
- `test/java/spark/**`
- `other/java/hdfs2/**`
- `other/java/hdfs3/**`
- `other/java/client/**`
- The workflow file itself
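Put together, the triggers and path filters above correspond to a workflow header along these lines (a sketch only; the actual `spark-integration-tests.yml` may differ in detail):
```yaml
name: Spark Integration Tests
on:
  push:
    branches: [ master, main ]
    paths:
      - 'test/java/spark/**'
      - 'other/java/hdfs2/**'
      - 'other/java/hdfs3/**'
      - 'other/java/client/**'
      - '.github/workflows/spark-integration-tests.yml'
  pull_request:
    branches: [ master, main ]
    paths:
      - 'test/java/spark/**'
      - 'other/java/hdfs2/**'
      - 'other/java/hdfs3/**'
      - 'other/java/client/**'
      - '.github/workflows/spark-integration-tests.yml'
  workflow_dispatch:
```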
## Jobs
### Job 1: spark-tests (Required)
**Duration**: ~5-10 minutes
Steps:
1. ✓ Checkout code
2. ✓ Setup JDK 11
3. ✓ Start SeaweedFS (master, volume, filer)
4. ✓ Build project
5. ✓ Run all integration tests (10 tests)
6. ✓ Upload test results
7. ✓ Publish test report
8. ✓ Cleanup
**Test Coverage**:
- SparkReadWriteTest: 6 tests
- SparkSQLTest: 4 tests
### Job 2: spark-example (Optional)
**Duration**: ~5 minutes
**Runs**: Only on push/manual trigger (not on PRs)
Steps:
1. ✓ Checkout code
2. ✓ Setup JDK 11
3. ✓ Download Apache Spark 3.5.0 (cached)
4. ✓ Start SeaweedFS
5. ✓ Build project
6. ✓ Run example Spark application
7. ✓ Verify output
8. ✓ Cleanup
### Job 3: summary (Status Check)
**Duration**: < 1 minute
Provides overall test status summary.
## Viewing Results
### In GitHub UI
1. Go to the **Actions** tab in your GitHub repository
2. Click on **Spark Integration Tests** workflow
3. View individual workflow runs
4. Check test reports and logs
### Status Badge
Add this badge to your README.md to show the workflow status:
```markdown
[![Spark Integration Tests](https://github.com/seaweedfs/seaweedfs/actions/workflows/spark-integration-tests.yml/badge.svg)](https://github.com/seaweedfs/seaweedfs/actions/workflows/spark-integration-tests.yml)
```
### Test Reports
After each run:
- Test results are uploaded as artifacts (retained for 30 days)
- Detailed JUnit reports are published
- Logs are available for each step
## Configuration
### Environment Variables
Set in the workflow:
```yaml
env:
  SEAWEEDFS_TEST_ENABLED: true
  SEAWEEDFS_FILER_HOST: localhost
  SEAWEEDFS_FILER_PORT: 8888
  SEAWEEDFS_FILER_GRPC_PORT: 18888
```
### Timeout
- spark-tests job: 30 minutes max
- spark-example job: 20 minutes max
## Troubleshooting CI Failures
### SeaweedFS Connection Issues
**Symptom**: Tests fail with connection refused
**Check**:
1. View SeaweedFS logs in the workflow output
2. Look for "Display SeaweedFS logs on failure" step
3. Verify health check succeeded
**Solution**: The workflow already includes retry logic and health checks
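The retry pattern the workflow relies on can be sketched as a small shell helper (hypothetical; the workflow's actual step names and commands may differ):
```bash
#!/bin/sh
# wait_for RETRIES DELAY CMD... — run CMD until it succeeds or retries run out.
wait_for() {
  retries=$1; delay=$2; shift 2
  i=1
  while [ "$i" -le "$retries" ]; do
    if "$@" > /dev/null 2>&1; then
      echo "ready after $i attempt(s)"
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  echo "gave up after $retries attempts"
  return 1
}
```
In CI this corresponds to `wait_for 30 2 curl -f http://localhost:8888/` before `mvn test`.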
### Test Failures
**Symptom**: Tests pass locally but fail in CI
**Check**:
1. Download test artifacts from the workflow run
2. Review detailed surefire reports
3. Check for timing issues or resource constraints
**Common Issues**:
- Docker startup timing (already handled with 30 retries)
- Network issues (retry logic included)
- Resource limits (CI has sufficient memory)
### Build Failures
**Symptom**: Maven build fails
**Check**:
1. Verify dependencies are available
2. Check Maven cache
3. Review build logs
### Example Application Failures
**Note**: This job is optional and only runs on push/manual trigger
**Check**:
1. Verify Spark was downloaded and cached correctly
2. Check spark-submit logs
3. Verify SeaweedFS output directory
## Manual Workflow Trigger
To manually run the workflow:
1. Go to **Actions** tab
2. Select **Spark Integration Tests**
3. Click **Run workflow** button
4. Select branch
5. Click **Run workflow**
This is useful for:
- Testing changes before pushing
- Re-running failed tests
- Testing with different configurations
## Local Testing Matching CI
To run tests locally that match the CI environment:
```bash
# Use the same Docker setup as CI
cd test/java/spark
docker-compose up -d seaweedfs-master seaweedfs-volume seaweedfs-filer
# Wait for services (same as CI)
for i in {1..30}; do
  curl -f http://localhost:8888/ && break
  sleep 2
done
# Run tests (same environment variables as CI)
export SEAWEEDFS_TEST_ENABLED=true
export SEAWEEDFS_FILER_HOST=localhost
export SEAWEEDFS_FILER_PORT=8888
export SEAWEEDFS_FILER_GRPC_PORT=18888
mvn test -B
# Cleanup
docker-compose down -v
```
## Maintenance
### Updating Spark Version
To update to a newer Spark version:
1. Update `pom.xml`: Change `<spark.version>`
2. Update workflow: Change Spark download URL
3. Test locally first
4. Create PR to test in CI
### Updating Java Version
1. Update `pom.xml`: Change `<maven.compiler.source>` and `<target>`
2. Update workflow: Change JDK version in `setup-java` steps
3. Test locally
4. Update README with new requirements
### Adding New Tests
New test classes are automatically discovered and run by the workflow.
Just ensure they:
- Extend `SparkTestBase`
- Use `skipIfTestsDisabled()`
- Are in the correct package
## CI Performance
### Typical Run Times
| Job | Duration | Can Fail Build? |
|-----|----------|-----------------|
| spark-tests | 5-10 min | Yes |
| spark-example | 5 min | No (optional) |
| summary | < 1 min | Only if tests fail |
### Optimizations
The workflow includes:
- ✓ Maven dependency caching
- ✓ Spark binary caching
- ✓ Parallel job execution
- ✓ Smart path filtering
- ✓ Docker layer caching
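Maven dependency caching is typically done through `setup-java`'s built-in cache support (a sketch; the workflow's actual step may differ):
```yaml
- name: Set up JDK 11
  uses: actions/setup-java@v4
  with:
    distribution: temurin
    java-version: '11'
    cache: maven
```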
### Resource Usage
- Memory: ~4GB per job
- Disk: ~2GB (cached)
- Network: ~500MB (first run)
## Security Considerations
- No secrets required (tests use default ports)
- Runs in isolated Docker environment
- Clean up removes all test data
- No external services accessed
## Future Enhancements
Potential improvements:
- [ ] Matrix testing (multiple Spark versions)
- [ ] Performance benchmarking
- [ ] Code coverage reporting
- [ ] Integration with larger datasets
- [ ] Multi-node Spark cluster testing
## Support
If CI tests fail:
1. Check workflow logs in GitHub Actions
2. Download test artifacts for detailed reports
3. Try reproducing locally using the "Local Testing" section above
4. Review recent changes in the failing paths
5. Check SeaweedFS logs in the workflow output
For persistent issues:
- Open an issue with workflow run link
- Include test failure logs
- Note if it passes locally

76
test/java/spark/Makefile

@@ -0,0 +1,76 @@
.PHONY: help build test test-local test-docker clean run-example docker-up docker-down verify-seaweedfs

help:
	@echo "SeaweedFS Spark Integration Tests"
	@echo ""
	@echo "Available targets:"
	@echo "  build        - Build the project"
	@echo "  test         - Run integration tests (requires SeaweedFS running)"
	@echo "  test-local   - Run tests against local SeaweedFS"
	@echo "  test-docker  - Run tests in Docker with SeaweedFS"
	@echo "  run-example  - Run the example Spark application"
	@echo "  docker-up    - Start SeaweedFS in Docker"
	@echo "  docker-down  - Stop SeaweedFS Docker containers"
	@echo "  clean        - Clean build artifacts"

build:
	mvn clean package

test:
	@echo "Running tests (SEAWEEDFS_TEST_ENABLED defaults to true)"
	SEAWEEDFS_TEST_ENABLED=$${SEAWEEDFS_TEST_ENABLED:-true} mvn test

test-local:
	@echo "Testing against local SeaweedFS (localhost:8888)..."
	./run-tests.sh

test-docker:
	@echo "Running tests in Docker..."
	docker-compose up --build --abort-on-container-exit spark-tests
	docker-compose down

docker-up:
	@echo "Starting SeaweedFS in Docker..."
	docker-compose up -d seaweedfs-master seaweedfs-volume seaweedfs-filer
	@echo "Waiting for services to be ready..."
	@sleep 5
	@echo "SeaweedFS is ready!"
	@echo "  Master: http://localhost:9333"
	@echo "  Filer:  http://localhost:8888"

docker-down:
	@echo "Stopping SeaweedFS Docker containers..."
	docker-compose down -v

run-example:
	@echo "Running example application..."
	@if ! command -v spark-submit > /dev/null; then \
		echo "Error: spark-submit not found. Please install Apache Spark."; \
		exit 1; \
	fi
	spark-submit \
		--class seaweed.spark.SparkSeaweedFSExample \
		--master local[2] \
		--conf spark.hadoop.fs.seaweedfs.impl=seaweed.hdfs.SeaweedFileSystem \
		--conf spark.hadoop.fs.seaweed.filer.host=localhost \
		--conf spark.hadoop.fs.seaweed.filer.port=8888 \
		--conf spark.hadoop.fs.seaweed.filer.port.grpc=18888 \
		--conf spark.hadoop.fs.seaweed.replication="" \
		target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar \
		seaweedfs://localhost:8888/spark-example-output

clean:
	mvn clean
	@echo "Build artifacts cleaned"

verify-seaweedfs:
	@echo "Verifying SeaweedFS connection..."
	@curl -f http://localhost:8888/ > /dev/null 2>&1 && \
		echo "✓ SeaweedFS filer is accessible" || \
		(echo "✗ SeaweedFS filer is not accessible at http://localhost:8888"; exit 1)

.DEFAULT_GOAL := help

361
test/java/spark/README.md

@@ -0,0 +1,361 @@
# SeaweedFS Spark Integration Tests
Comprehensive integration tests for Apache Spark with SeaweedFS HDFS client.
## Overview
This test suite validates that Apache Spark works correctly with SeaweedFS as the storage backend, covering:
- **Data I/O**: Reading and writing data in various formats (Parquet, CSV, JSON)
- **Spark SQL**: Complex SQL queries, joins, aggregations, and window functions
- **Partitioning**: Partitioned writes and partition pruning
- **Performance**: Large dataset operations
## Prerequisites
### 1. Running SeaweedFS
Start SeaweedFS with default ports:
```bash
# Terminal 1: Start master
weed master
# Terminal 2: Start volume server
weed volume -mserver=localhost:9333
# Terminal 3: Start filer
weed filer -master=localhost:9333
```
Verify services are running:
- Master: http://localhost:9333
- Filer HTTP: http://localhost:8888
- Filer gRPC: localhost:18888
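A quick way to verify the HTTP endpoints from the shell (a hedged sketch; adjust hosts and ports to your deployment):
```bash
#!/bin/sh
# check SERVICE URL — report whether an HTTP endpoint answers.
check() {
  if curl -fsS --max-time 2 "$2" > /dev/null 2>&1; then
    echo "up: $1"
  else
    echo "down: $1"
    return 1
  fi
}

check master http://localhost:9333/ || true
check filer http://localhost:8888/ || true
# The filer gRPC port (18888) is not HTTP; probe it with plain TCP instead,
# e.g. nc -z localhost 18888
```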
### 2. Java and Maven
- Java 8 or higher
- Maven 3.6 or higher
### 3. Apache Spark (for standalone execution)
Download and extract Apache Spark 3.5.0:
```bash
wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
tar xzf spark-3.5.0-bin-hadoop3.tgz
export SPARK_HOME=$(pwd)/spark-3.5.0-bin-hadoop3
export PATH=$SPARK_HOME/bin:$PATH
```
## Building
```bash
mvn clean package
```
This creates:
- Fat JAR (shaded, with dependencies): `target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar`
- Original thin JAR (without dependencies): `target/original-seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar`
## Running Integration Tests
### Quick Test
Run all integration tests (requires running SeaweedFS):
```bash
# Enable integration tests
export SEAWEEDFS_TEST_ENABLED=true
# Run all tests
mvn test
```
### Run Specific Test
```bash
export SEAWEEDFS_TEST_ENABLED=true
# Run only read/write tests
mvn test -Dtest=SparkReadWriteTest
# Run only SQL tests
mvn test -Dtest=SparkSQLTest
```
### Custom SeaweedFS Configuration
If your SeaweedFS is running on a different host or port:
```bash
export SEAWEEDFS_TEST_ENABLED=true
export SEAWEEDFS_FILER_HOST=my-seaweedfs-host
export SEAWEEDFS_FILER_PORT=8888
export SEAWEEDFS_FILER_GRPC_PORT=18888
mvn test
```
### Skip Tests
By default, tests are skipped if `SEAWEEDFS_TEST_ENABLED` is not set:
```bash
mvn test # Tests will be skipped with message
```
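The guard behaves like this sketch (a hypothetical shell equivalent of the `skipIfTestsDisabled()` check in `SparkTestBase`, not the actual Java code):
```bash
#!/bin/sh
# Run integration tests only when explicitly enabled via environment.
tests_enabled() {
  [ "${SEAWEEDFS_TEST_ENABLED:-}" = "true" ]
}

if tests_enabled; then
  echo "running integration tests"
else
  echo "Skipping test - SEAWEEDFS_TEST_ENABLED not set"
fi
```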
## Running the Example Application
### Local Mode
Run the example application in Spark local mode:
```bash
spark-submit \
  --class seaweed.spark.SparkSeaweedFSExample \
  --master local[2] \
  --conf spark.hadoop.fs.seaweedfs.impl=seaweed.hdfs.SeaweedFileSystem \
  --conf spark.hadoop.fs.seaweed.filer.host=localhost \
  --conf spark.hadoop.fs.seaweed.filer.port=8888 \
  --conf spark.hadoop.fs.seaweed.filer.port.grpc=18888 \
  --conf spark.hadoop.fs.seaweed.replication="" \
  target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar \
  seaweedfs://localhost:8888/spark-example-output
```
### Cluster Mode
For production Spark clusters:
```bash
spark-submit \
  --class seaweed.spark.SparkSeaweedFSExample \
  --master spark://master-host:7077 \
  --deploy-mode cluster \
  --conf spark.hadoop.fs.seaweedfs.impl=seaweed.hdfs.SeaweedFileSystem \
  --conf spark.hadoop.fs.seaweed.filer.host=seaweedfs-filer \
  --conf spark.hadoop.fs.seaweed.filer.port=8888 \
  --conf spark.hadoop.fs.seaweed.filer.port.grpc=18888 \
  --conf spark.hadoop.fs.seaweed.replication=001 \
  --conf spark.executor.instances=4 \
  --conf spark.executor.memory=4g \
  --conf spark.executor.cores=2 \
  target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar \
  seaweedfs://seaweedfs-filer:8888/spark-output
```
## Configuration
### SeaweedFS Configuration Options
Configure Spark to use SeaweedFS through Hadoop configuration:
| Property | Description | Default | Example |
|----------|-------------|---------|---------|
| `spark.hadoop.fs.seaweedfs.impl` | FileSystem implementation class | - | `seaweed.hdfs.SeaweedFileSystem` |
| `spark.hadoop.fs.seaweed.filer.host` | SeaweedFS filer hostname | `localhost` | `seaweedfs-filer` |
| `spark.hadoop.fs.seaweed.filer.port` | SeaweedFS filer HTTP port | `8888` | `8888` |
| `spark.hadoop.fs.seaweed.filer.port.grpc` | SeaweedFS filer gRPC port | `18888` | `18888` |
| `spark.hadoop.fs.seaweed.replication` | Replication strategy | (uses HDFS default) | `001`, `""` (filer default) |
| `spark.hadoop.fs.seaweed.buffer.size` | Buffer size for I/O | `4MB` | `8388608` |
### Replication Configuration Priority
1. **Non-empty value** (e.g., `001`) - uses that specific replication
2. **Empty string** (`""`) - uses SeaweedFS filer's default replication
3. **Not configured** - uses Hadoop/Spark's replication parameter
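The precedence can be expressed as a tiny helper (a hypothetical sketch of the selection logic, not the client's actual code; the literal `UNSET` stands in for "property absent"):
```bash
#!/bin/sh
# resolve_replication CONFIGURED HADOOP_DEFAULT
# CONFIGURED is the fs.seaweed.replication value, or UNSET when absent.
resolve_replication() {
  conf=$1; hadoop_default=$2
  if [ "$conf" = "UNSET" ]; then
    echo "$hadoop_default"   # 3. not configured: fall back to Hadoop/Spark
  elif [ -z "$conf" ]; then
    echo "filer-default"     # 2. empty string: let the filer decide
  else
    echo "$conf"             # 1. explicit value wins
  fi
}
```
For example, `resolve_replication 001 000` yields `001`, while `resolve_replication "" 000` defers to the filer.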
## Test Coverage
### SparkReadWriteTest
- ✓ Write and read Parquet files
- ✓ Write and read CSV files with headers
- ✓ Write and read JSON files
- ✓ Partitioned data writes with partition pruning
- ✓ Append mode operations
- ✓ Large dataset handling (10,000+ rows)
### SparkSQLTest
- ✓ Create tables and run SELECT queries
- ✓ Aggregation queries (GROUP BY, SUM, AVG)
- ✓ JOIN operations between datasets
- ✓ Window functions (RANK, PARTITION BY)
## Continuous Integration
### GitHub Actions
A GitHub Actions workflow is configured at `.github/workflows/spark-integration-tests.yml` that automatically:
- Runs on push/PR to `master`/`main` when Spark or HDFS code changes
- Starts SeaweedFS in Docker
- Runs all integration tests
- Runs the example application
- Uploads test reports
- Can be triggered manually via workflow_dispatch
The workflow includes two jobs:
1. **spark-tests**: Runs all integration tests (10 tests)
2. **spark-example**: Runs the example Spark application
View the workflow status in the GitHub Actions tab of the repository.
### CI-Friendly Test Execution
```bash
# In CI environment
./scripts/start-seaweedfs.sh # Start SeaweedFS in background
export SEAWEEDFS_TEST_ENABLED=true
mvn clean test
./scripts/stop-seaweedfs.sh # Cleanup
```
### Docker-Based Testing
Use docker-compose for isolated testing:
```bash
docker-compose up -d seaweedfs-master seaweedfs-volume seaweedfs-filer
export SEAWEEDFS_TEST_ENABLED=true
mvn test
docker-compose down
```
## Troubleshooting
### Tests are Skipped
**Symptom**: Tests show "Skipping test - SEAWEEDFS_TEST_ENABLED not set"
**Solution**:
```bash
export SEAWEEDFS_TEST_ENABLED=true
mvn test
```
### Connection Refused Errors
**Symptom**: `java.net.ConnectException: Connection refused`
**Solution**:
1. Verify SeaweedFS is running:
```bash
curl http://localhost:8888/
```
2. Check if ports are accessible:
```bash
netstat -an | grep 8888
netstat -an | grep 18888
```
### ClassNotFoundException: seaweed.hdfs.SeaweedFileSystem
**Symptom**: Spark cannot find the SeaweedFS FileSystem implementation
**Solution**:
1. Ensure the SeaweedFS HDFS client is in your classpath
2. For spark-submit, add the JAR:
```bash
spark-submit --jars /path/to/seaweedfs-hadoop3-client-*.jar ...
```
### Out of Memory Errors
**Symptom**: `java.lang.OutOfMemoryError: Java heap space`
**Solution**:
```bash
mvn test -DargLine="-Xmx4g"
```
For spark-submit:
```bash
spark-submit --driver-memory 4g --executor-memory 4g ...
```
### gRPC Version Conflicts
**Symptom**: `java.lang.NoSuchMethodError` related to gRPC
**Solution**: Ensure consistent gRPC versions. The project uses Spark 3.5.0 compatible versions.
## Performance Tips
1. **Increase buffer size** for large files:
```bash
--conf spark.hadoop.fs.seaweed.buffer.size=8388608
```
2. **Use appropriate replication** based on your cluster:
```bash
--conf spark.hadoop.fs.seaweed.replication=001
```
3. **Enable partition pruning** by partitioning data on commonly filtered columns
4. **Use columnar formats** (Parquet) for better performance
## Additional Examples
### PySpark with SeaweedFS
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("PySparkSeaweedFS") \
    .config("spark.hadoop.fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem") \
    .config("spark.hadoop.fs.seaweed.filer.host", "localhost") \
    .config("spark.hadoop.fs.seaweed.filer.port", "8888") \
    .config("spark.hadoop.fs.seaweed.filer.port.grpc", "18888") \
    .getOrCreate()
# Write data
df = spark.range(1000)
df.write.parquet("seaweedfs://localhost:8888/pyspark-output")
# Read data
df_read = spark.read.parquet("seaweedfs://localhost:8888/pyspark-output")
df_read.show()
```
### Scala with SeaweedFS
```scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("ScalaSeaweedFS")
  .config("spark.hadoop.fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem")
  .config("spark.hadoop.fs.seaweed.filer.host", "localhost")
  .config("spark.hadoop.fs.seaweed.filer.port", "8888")
  .config("spark.hadoop.fs.seaweed.filer.port.grpc", "18888")
  .getOrCreate()
// Write data
val df = spark.range(1000)
df.write.parquet("seaweedfs://localhost:8888/scala-output")
// Read data
val dfRead = spark.read.parquet("seaweedfs://localhost:8888/scala-output")
dfRead.show()
```
## Contributing
When adding new tests:
1. Extend `SparkTestBase` for new test classes
2. Use `skipIfTestsDisabled()` in test methods
3. Clean up test data in tearDown
4. Add documentation to this README
5. Ensure tests work in CI environment
## License
Same as SeaweedFS project.

72
test/java/spark/docker-compose.yml

@@ -0,0 +1,72 @@
version: '3.8'

services:
  seaweedfs-master:
    image: seaweedfs:test-grpc-v1.77.0
    container_name: seaweedfs-spark-master
    ports:
      - "9333:9333"
      - "19333:19333"
    command: "master -ip=seaweedfs-master -ip.bind=0.0.0.0"
    networks:
      - seaweedfs-spark

  seaweedfs-volume:
    image: seaweedfs:test-grpc-v1.77.0
    container_name: seaweedfs-spark-volume
    ports:
      - "8080:8080"
      - "18080:18080"
    command: "volume -mserver=seaweedfs-master:9333 -ip.bind=0.0.0.0 -port=8080"
    depends_on:
      - seaweedfs-master
    networks:
      - seaweedfs-spark

  seaweedfs-filer:
    image: seaweedfs:test-grpc-v1.77.0
    container_name: seaweedfs-spark-filer
    ports:
      - "8888:8888"
      - "18888:18888"
    command: ["filer", "-master=seaweedfs-master:9333", "-ip.bind=0.0.0.0", "-port=8888", "-port.grpc=18888"]
    depends_on:
      - seaweedfs-master
      - seaweedfs-volume
    networks:
      - seaweedfs-spark
    environment:
      - GRPC_GO_LOG_VERBOSITY_LEVEL=99
      - GRPC_GO_LOG_SEVERITY_LEVEL=info
      - GLOG_v=4
      - GLOG_logtostderr=true
      - GODEBUG=http2debug=2
    healthcheck:
      test: ["CMD", "wget", "--spider", "-q", "http://localhost:8888/"]
      interval: 5s
      timeout: 3s
      retries: 10
      start_period: 10s

  spark-tests:
    image: maven:3.9-eclipse-temurin-17
    container_name: seaweedfs-spark-tests
    network_mode: "host"
    volumes:
      - .:/workspace
      - ~/.m2:/root/.m2
    working_dir: /workspace
    environment:
      - SEAWEEDFS_TEST_ENABLED=true
      - SEAWEEDFS_FILER_HOST=localhost
      - SEAWEEDFS_FILER_PORT=8888
      - SEAWEEDFS_FILER_GRPC_PORT=18888
      - HADOOP_HOME=/tmp
    command: sh -c "sleep 30 && mvn test"
    mem_limit: 4g
    cpus: 2

networks:
  seaweedfs-spark:
    driver: bridge

159
test/java/spark/pom.xml

@@ -0,0 +1,159 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.seaweedfs</groupId>
<artifactId>seaweedfs-spark-integration-tests</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>SeaweedFS Spark Integration Tests</name>
<description>Integration tests for Apache Spark with SeaweedFS HDFS client</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<spark.version>3.5.0</spark.version>
<hadoop.version>3.3.6</hadoop.version>
<scala.binary.version>2.12</scala.binary.version>
<junit.version>4.13.2</junit.version>
</properties>
<dependencies>
<!-- Spark Core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<!-- Spark SQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<!-- Hadoop Client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<!-- SeaweedFS Hadoop3 Client -->
<dependency>
<groupId>com.seaweedfs</groupId>
<artifactId>seaweedfs-hadoop3-client</artifactId>
<version>3.80</version>
</dependency>
<!-- Testing -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.36</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<skipTests>${skipTests}</skipTests>
<includes>
<include>**/*Test.java</include>
</includes>
<argLine>
-Xmx2g
-Dhadoop.home.dir=/tmp
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.net=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED
--add-opens=java.base/sun.security.action=ALL-UNNAMED
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED
--add-exports=java.base/sun.nio.ch=ALL-UNNAMED
</argLine>
<environmentVariables>
<HADOOP_HOME>/tmp</HADOOP_HOME>
</environmentVariables>
</configuration>
</plugin>
<!-- Shade plugin to create fat jar for Spark submit -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>seaweed.spark.SparkSeaweedFSExample</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

147
test/java/spark/quick-start.sh

@@ -0,0 +1,147 @@
#!/bin/bash
set -e
echo "=== SeaweedFS Spark Integration Tests Quick Start ==="
echo ""
# Check if SeaweedFS is running
check_seaweedfs() {
echo "Checking if SeaweedFS is running..."
if curl -f http://localhost:8888/ > /dev/null 2>&1; then
echo "✓ SeaweedFS filer is accessible at http://localhost:8888"
return 0
else
echo "✗ SeaweedFS filer is not accessible"
return 1
fi
}
# Start SeaweedFS with Docker if not running
start_seaweedfs() {
echo ""
echo "Starting SeaweedFS with Docker..."
docker-compose up -d seaweedfs-master seaweedfs-volume seaweedfs-filer
echo "Waiting for SeaweedFS to be ready..."
for i in {1..30}; do
if curl -f http://localhost:8888/ > /dev/null 2>&1; then
echo "✓ SeaweedFS is ready!"
return 0
fi
echo -n "."
sleep 2
done
echo ""
echo "✗ SeaweedFS failed to start"
return 1
}
# Build the project
build_project() {
echo ""
echo "Building the project..."
mvn clean package -DskipTests
echo "✓ Build completed"
}
# Run tests
run_tests() {
echo ""
echo "Running integration tests..."
export SEAWEEDFS_TEST_ENABLED=true
mvn test
echo "✓ Tests completed"
}
# Run example
run_example() {
echo ""
echo "Running example application..."
if ! command -v spark-submit > /dev/null; then
echo "⚠ spark-submit not found. Skipping example application."
echo "To run the example, install Apache Spark and try: make run-example"
return 0
fi
spark-submit \
--class seaweed.spark.SparkSeaweedFSExample \
--master local[2] \
--conf spark.hadoop.fs.seaweedfs.impl=seaweed.hdfs.SeaweedFileSystem \
--conf spark.hadoop.fs.seaweed.filer.host=localhost \
--conf spark.hadoop.fs.seaweed.filer.port=8888 \
--conf spark.hadoop.fs.seaweed.filer.port.grpc=18888 \
target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar \
seaweedfs://localhost:8888/spark-quickstart-output
echo "✓ Example completed"
}
# Cleanup
cleanup() {
echo ""
echo "Cleaning up..."
docker-compose down -v
echo "✓ Cleanup completed"
}
# Main execution
main() {
# Check if Docker is available
if ! command -v docker > /dev/null; then
echo "Error: Docker is not installed or not in PATH"
exit 1
fi
# Check if Maven is available
if ! command -v mvn > /dev/null; then
echo "Error: Maven is not installed or not in PATH"
exit 1
fi
# Check if SeaweedFS is running, if not start it
if ! check_seaweedfs; then
read -p "Do you want to start SeaweedFS with Docker? (y/n) " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]; then
start_seaweedfs || exit 1
else
echo "Please start SeaweedFS manually and rerun this script."
exit 1
fi
fi
# Build project
build_project || exit 1
# Run tests
run_tests || exit 1
# Run example if Spark is available
run_example
echo ""
echo "=== Quick Start Completed Successfully! ==="
echo ""
echo "Next steps:"
echo " - View test results in target/surefire-reports/"
echo " - Check example output at http://localhost:8888/"
echo " - Run 'make help' for more options"
echo " - Read README.md for detailed documentation"
echo ""
read -p "Do you want to stop SeaweedFS? (y/n) " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]; then
cleanup
fi
}
# Handle Ctrl+C
trap cleanup INT
# Run main
main

44
test/java/spark/run-tests.sh

@@ -0,0 +1,44 @@
#!/bin/bash
set -e
echo "=== SeaweedFS Spark Integration Tests Runner ==="
echo ""
# Check if SeaweedFS is running
check_seaweedfs() {
  if curl -f http://localhost:8888/ > /dev/null 2>&1; then
    echo "✓ SeaweedFS filer is accessible at http://localhost:8888"
    return 0
  else
    echo "✗ SeaweedFS filer is not accessible"
    return 1
  fi
}

# Main
if ! check_seaweedfs; then
  echo ""
  echo "Please start SeaweedFS first. You can use:"
  echo "  cd test/java/spark && docker-compose up -d"
  echo "Or:"
  echo "  make docker-up"
  exit 1
fi
echo ""
echo "Running Spark integration tests..."
echo ""
export SEAWEEDFS_TEST_ENABLED=true
export SEAWEEDFS_FILER_HOST=localhost
export SEAWEEDFS_FILER_PORT=8888
export SEAWEEDFS_FILER_GRPC_PORT=18888
# Run tests
mvn test "$@"
echo ""
echo "✓ Test run completed"
echo "View detailed reports in: target/surefire-reports/"

143
test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java

@@ -0,0 +1,143 @@
package seaweed.spark;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
/**
* Example Spark application demonstrating SeaweedFS integration.
*
* This can be submitted to a Spark cluster using spark-submit.
*
* Example usage:
* spark-submit \
* --class seaweed.spark.SparkSeaweedFSExample \
* --master local[2] \
* --conf spark.hadoop.fs.seaweedfs.impl=seaweed.hdfs.SeaweedFileSystem \
* --conf spark.hadoop.fs.seaweed.filer.host=localhost \
* --conf spark.hadoop.fs.seaweed.filer.port=8888 \
* --conf spark.hadoop.fs.seaweed.filer.port.grpc=18888 \
* target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar \
* seaweedfs://localhost:8888/output
*/
public class SparkSeaweedFSExample {
public static void main(String[] args) {
if (args.length < 1) {
System.err.println("Usage: SparkSeaweedFSExample <output-path>");
System.err.println("Example: seaweedfs://localhost:8888/spark-output");
System.exit(1);
}
String outputPath = args[0];
// Create Spark session
SparkSession spark = SparkSession.builder()
.appName("SeaweedFS Spark Example")
.getOrCreate();
try {
System.out.println("=== SeaweedFS Spark Integration Example ===\n");
// Example 1: Generate data and write to SeaweedFS
System.out.println("1. Generating sample data...");
Dataset<Row> data = spark.range(0, 1000)
.selectExpr(
"id",
"id * 2 as doubled",
"CAST(rand() * 100 AS INT) as random_value"
);
System.out.println(" Generated " + data.count() + " rows");
data.show(5);
// Write as Parquet
String parquetPath = outputPath + "/data.parquet";
System.out.println("\n2. Writing data to SeaweedFS as Parquet...");
System.out.println(" Path: " + parquetPath);
data.write()
.mode(SaveMode.Overwrite)
.parquet(parquetPath);
System.out.println(" ✓ Write completed");
// Read back and verify
System.out.println("\n3. Reading data back from SeaweedFS...");
Dataset<Row> readData = spark.read().parquet(parquetPath);
System.out.println(" Read " + readData.count() + " rows");
// Perform aggregation
System.out.println("\n4. Performing aggregation...");
Dataset<Row> stats = readData.selectExpr(
"COUNT(*) as count",
"AVG(random_value) as avg_random",
"MAX(doubled) as max_doubled"
);
stats.show();
// Write aggregation results
String statsPath = outputPath + "/stats.parquet";
System.out.println("5. Writing stats to: " + statsPath);
stats.write()
.mode(SaveMode.Overwrite)
.parquet(statsPath);
// Create a partitioned dataset
System.out.println("\n6. Creating partitioned dataset...");
Dataset<Row> partitionedData = data.selectExpr(
"*",
"CAST(id % 10 AS INT) as partition_key"
);
String partitionedPath = outputPath + "/partitioned.parquet";
System.out.println(" Path: " + partitionedPath);
partitionedData.write()
.mode(SaveMode.Overwrite)
.partitionBy("partition_key")
.parquet(partitionedPath);
System.out.println(" ✓ Partitioned write completed");
// Read specific partition
System.out.println("\n7. Reading specific partition (partition_key=0)...");
Dataset<Row> partition0 = spark.read()
.parquet(partitionedPath)
.filter("partition_key = 0");
System.out.println(" Partition 0 contains " + partition0.count() + " rows");
partition0.show(5);
// SQL example
System.out.println("\n8. Using Spark SQL...");
readData.createOrReplaceTempView("seaweedfs_data");
Dataset<Row> sqlResult = spark.sql(
"SELECT " +
" CAST(id / 100 AS INT) as bucket, " +
" COUNT(*) as count, " +
" AVG(random_value) as avg_random " +
"FROM seaweedfs_data " +
"GROUP BY CAST(id / 100 AS INT) " +
"ORDER BY bucket"
);
System.out.println(" Bucketed statistics:");
sqlResult.show();
System.out.println("\n=== Example completed successfully! ===");
System.out.println("Output location: " + outputPath);
} catch (Exception e) {
System.err.println("Error: " + e.getMessage());
e.printStackTrace();
System.exit(1);
} finally {
spark.stop();
}
}
}
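The SQL in step 8 groups the 1000 generated ids into buckets of 100 via `CAST(id / 100 AS INT)`. A plain-Java sketch of that bucketing (hypothetical class, no Spark required) shows why exactly 10 buckets of 100 rows come back:

```java
import java.util.Map;
import java.util.TreeMap;

public class BucketSketch {
    public static void main(String[] args) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int id = 0; id < 1000; id++) {
            // Integer division gives the same result as the SQL CAST for these non-negative ids
            int bucket = id / 100;
            counts.merge(bucket, 1, Integer::sum);
        }
        System.out.println(counts.size() + " buckets, " + counts.get(0) + " rows in bucket 0");
    }
}
```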

216
test/java/spark/src/test/java/seaweed/spark/SparkReadWriteTest.java

@@ -0,0 +1,216 @@
package seaweed.spark;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.*;
/**
* Integration tests for Spark read/write operations with SeaweedFS.
*/
public class SparkReadWriteTest extends SparkTestBase {
@Test
public void testWriteAndReadParquet() {
skipIfTestsDisabled();
// Create test data
List<Person> people = Arrays.asList(
new Person("Alice", 30),
new Person("Bob", 25),
new Person("Charlie", 35)
);
Dataset<Row> df = spark.createDataFrame(people, Person.class);
// Write to SeaweedFS
String outputPath = getTestPath("people.parquet");
df.write().mode(SaveMode.Overwrite).parquet(outputPath);
// Read back from SeaweedFS
Dataset<Row> readDf = spark.read().parquet(outputPath);
// Verify
assertEquals(3, readDf.count());
assertEquals(2, readDf.columns().length);
List<Row> results = readDf.collectAsList();
assertTrue(results.stream().anyMatch(r -> "Alice".equals(r.getAs("name")) && (Integer)r.getAs("age") == 30));
assertTrue(results.stream().anyMatch(r -> "Bob".equals(r.getAs("name")) && (Integer)r.getAs("age") == 25));
assertTrue(results.stream().anyMatch(r -> "Charlie".equals(r.getAs("name")) && (Integer)r.getAs("age") == 35));
}
@Test
public void testWriteAndReadCSV() {
skipIfTestsDisabled();
// Create test data
List<Person> people = Arrays.asList(
new Person("Alice", 30),
new Person("Bob", 25)
);
Dataset<Row> df = spark.createDataFrame(people, Person.class);
// Write to SeaweedFS as CSV
String outputPath = getTestPath("people.csv");
df.write().mode(SaveMode.Overwrite).option("header", "true").csv(outputPath);
// Read back from SeaweedFS
Dataset<Row> readDf = spark.read().option("header", "true").option("inferSchema", "true").csv(outputPath);
// Verify
assertEquals(2, readDf.count());
assertEquals(2, readDf.columns().length);
}
@Test
public void testWriteAndReadJSON() {
skipIfTestsDisabled();
// Create test data
List<Person> people = Arrays.asList(
new Person("Alice", 30),
new Person("Bob", 25),
new Person("Charlie", 35)
);
Dataset<Row> df = spark.createDataFrame(people, Person.class);
// Write to SeaweedFS as JSON
String outputPath = getTestPath("people.json");
df.write().mode(SaveMode.Overwrite).json(outputPath);
// Read back from SeaweedFS
Dataset<Row> readDf = spark.read().json(outputPath);
// Verify
assertEquals(3, readDf.count());
assertEquals(2, readDf.columns().length);
}
@Test
public void testWritePartitionedData() {
skipIfTestsDisabled();
// Create test data with multiple years
List<PersonWithYear> people = Arrays.asList(
new PersonWithYear("Alice", 30, 2020),
new PersonWithYear("Bob", 25, 2021),
new PersonWithYear("Charlie", 35, 2020),
new PersonWithYear("David", 28, 2021)
);
Dataset<Row> df = spark.createDataFrame(people, PersonWithYear.class);
// Write partitioned data to SeaweedFS
String outputPath = getTestPath("people_partitioned");
df.write().mode(SaveMode.Overwrite).partitionBy("year").parquet(outputPath);
// Read back from SeaweedFS
Dataset<Row> readDf = spark.read().parquet(outputPath);
// Verify
assertEquals(4, readDf.count());
// Verify partition filtering works
Dataset<Row> filtered2020 = readDf.filter("year = 2020");
assertEquals(2, filtered2020.count());
Dataset<Row> filtered2021 = readDf.filter("year = 2021");
assertEquals(2, filtered2021.count());
}
@Test
public void testAppendMode() {
skipIfTestsDisabled();
String outputPath = getTestPath("people_append.parquet");
// Write first batch
List<Person> batch1 = Arrays.asList(
new Person("Alice", 30),
new Person("Bob", 25)
);
Dataset<Row> df1 = spark.createDataFrame(batch1, Person.class);
df1.write().mode(SaveMode.Overwrite).parquet(outputPath);
// Append second batch
List<Person> batch2 = Arrays.asList(
new Person("Charlie", 35),
new Person("David", 28)
);
Dataset<Row> df2 = spark.createDataFrame(batch2, Person.class);
df2.write().mode(SaveMode.Append).parquet(outputPath);
// Read back and verify
Dataset<Row> readDf = spark.read().parquet(outputPath);
assertEquals(4, readDf.count());
}
@Test
public void testLargeDataset() {
skipIfTestsDisabled();
// Create a larger dataset
Dataset<Row> largeDf = spark.range(0, 10000)
.selectExpr("id as value", "id * 2 as doubled");
String outputPath = getTestPath("large_dataset.parquet");
largeDf.write().mode(SaveMode.Overwrite).parquet(outputPath);
// Read back and verify
Dataset<Row> readDf = spark.read().parquet(outputPath);
assertEquals(10000, readDf.count());
// Verify some data (sort first: Parquet read order is not guaranteed)
Row firstRow = readDf.orderBy("value").first();
assertEquals(0L, firstRow.getLong(0));
assertEquals(0L, firstRow.getLong(1));
}
// Test data classes
public static class Person implements java.io.Serializable {
private String name;
private int age;
public Person() {}
public Person(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public int getAge() { return age; }
public void setAge(int age) { this.age = age; }
}
public static class PersonWithYear implements java.io.Serializable {
private String name;
private int age;
private int year;
public PersonWithYear() {}
public PersonWithYear(String name, int age, int year) {
this.name = name;
this.age = age;
this.year = year;
}
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public int getAge() { return age; }
public void setAge(int age) { this.age = age; }
public int getYear() { return year; }
public void setYear(int year) { this.year = year; }
}
}
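`testWritePartitionedData` relies on Spark's Hive-style partition layout: `partitionBy("year")` writes each distinct value into its own `year=<value>/` directory, and the column is reconstructed from the path on read. A minimal plain-Java sketch of that encoding (the helper and class are hypothetical; the base path mirrors the test's):

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

public class PartitionPathSketch {
    // Hive-style partition directory: base/column=value
    static String partitionDir(String base, String column, Object value) {
        return base + "/" + column + "=" + value;
    }

    public static void main(String[] args) {
        int[] years = {2020, 2021, 2020, 2021};
        Set<String> dirs = new TreeSet<>();
        for (int y : years) {
            dirs.add(partitionDir("/test-spark/people_partitioned", "year", y));
        }
        // Four rows collapse into two partition directories, one per distinct year
        System.out.println(dirs);
    }
}
```

This is also why the test's `filter("year = 2020")` can prune files: only the matching directory needs to be scanned.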

236
test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java

@@ -0,0 +1,236 @@
package seaweed.spark;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.*;
/**
* Integration tests for Spark SQL operations with SeaweedFS.
*/
public class SparkSQLTest extends SparkTestBase {
@Test
public void testCreateTableAndQuery() {
skipIfTestsDisabled();
// Create test data
List<Employee> employees = Arrays.asList(
new Employee(1, "Alice", "Engineering", 100000),
new Employee(2, "Bob", "Sales", 80000),
new Employee(3, "Charlie", "Engineering", 120000),
new Employee(4, "David", "Sales", 75000)
);
Dataset<Row> df = spark.createDataFrame(employees, Employee.class);
// Write to SeaweedFS
String tablePath = getTestPath("employees");
df.write().mode(SaveMode.Overwrite).parquet(tablePath);
// Create temporary view
Dataset<Row> employeesDf = spark.read().parquet(tablePath);
employeesDf.createOrReplaceTempView("employees");
// Run SQL queries
Dataset<Row> engineeringEmployees = spark.sql(
"SELECT name, salary FROM employees WHERE department = 'Engineering'"
);
assertEquals(2, engineeringEmployees.count());
Dataset<Row> highPaidEmployees = spark.sql(
"SELECT name, salary FROM employees WHERE salary > 90000"
);
assertEquals(2, highPaidEmployees.count());
}
@Test
public void testAggregationQueries() {
skipIfTestsDisabled();
// Create sales data
List<Sale> sales = Arrays.asList(
new Sale("2024-01", "Product A", 100),
new Sale("2024-01", "Product B", 150),
new Sale("2024-02", "Product A", 120),
new Sale("2024-02", "Product B", 180),
new Sale("2024-03", "Product A", 110)
);
Dataset<Row> df = spark.createDataFrame(sales, Sale.class);
// Write to SeaweedFS
String tablePath = getTestPath("sales");
df.write().mode(SaveMode.Overwrite).parquet(tablePath);
// Create temporary view
Dataset<Row> salesDf = spark.read().parquet(tablePath);
salesDf.createOrReplaceTempView("sales");
// Aggregate query
Dataset<Row> monthlySales = spark.sql(
"SELECT month, SUM(amount) as total FROM sales GROUP BY month ORDER BY month"
);
List<Row> results = monthlySales.collectAsList();
assertEquals(3, results.size());
assertEquals("2024-01", results.get(0).getString(0));
assertEquals(250, results.get(0).getLong(1));
}
@Test
public void testJoinOperations() {
skipIfTestsDisabled();
// Create employee data
List<Employee> employees = Arrays.asList(
new Employee(1, "Alice", "Engineering", 100000),
new Employee(2, "Bob", "Sales", 80000)
);
// Create department data
List<Department> departments = Arrays.asList(
new Department("Engineering", "Building Products"),
new Department("Sales", "Selling Products")
);
Dataset<Row> empDf = spark.createDataFrame(employees, Employee.class);
Dataset<Row> deptDf = spark.createDataFrame(departments, Department.class);
// Write to SeaweedFS
String empPath = getTestPath("employees_join");
String deptPath = getTestPath("departments_join");
empDf.write().mode(SaveMode.Overwrite).parquet(empPath);
deptDf.write().mode(SaveMode.Overwrite).parquet(deptPath);
// Read back and create views
spark.read().parquet(empPath).createOrReplaceTempView("emp");
spark.read().parquet(deptPath).createOrReplaceTempView("dept");
// Join query
Dataset<Row> joined = spark.sql(
"SELECT e.name, e.salary, d.description " +
"FROM emp e JOIN dept d ON e.department = d.name"
);
assertEquals(2, joined.count());
List<Row> results = joined.collectAsList();
assertTrue(results.stream().anyMatch(r ->
"Alice".equals(r.getString(0)) && "Building Products".equals(r.getString(2))
));
}
@Test
public void testWindowFunctions() {
skipIfTestsDisabled();
// Create employee data with salaries
List<Employee> employees = Arrays.asList(
new Employee(1, "Alice", "Engineering", 100000),
new Employee(2, "Bob", "Engineering", 120000),
new Employee(3, "Charlie", "Sales", 80000),
new Employee(4, "David", "Sales", 90000)
);
Dataset<Row> df = spark.createDataFrame(employees, Employee.class);
String tablePath = getTestPath("employees_window");
df.write().mode(SaveMode.Overwrite).parquet(tablePath);
Dataset<Row> employeesDf = spark.read().parquet(tablePath);
employeesDf.createOrReplaceTempView("employees_ranked");
// Window function query - rank employees by salary within department
Dataset<Row> ranked = spark.sql(
"SELECT name, department, salary, " +
"RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank " +
"FROM employees_ranked"
);
assertEquals(4, ranked.count());
// Verify Bob has rank 1 in Engineering (highest salary)
List<Row> results = ranked.collectAsList();
Row bobRow = results.stream()
.filter(r -> "Bob".equals(r.getString(0)))
.findFirst()
.orElse(null);
assertNotNull(bobRow);
assertEquals(1, bobRow.getInt(3));
}
// Test data classes
public static class Employee implements java.io.Serializable {
private int id;
private String name;
private String department;
private int salary;
public Employee() {}
public Employee(int id, String name, String department, int salary) {
this.id = id;
this.name = name;
this.department = department;
this.salary = salary;
}
public int getId() { return id; }
public void setId(int id) { this.id = id; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getDepartment() { return department; }
public void setDepartment(String department) { this.department = department; }
public int getSalary() { return salary; }
public void setSalary(int salary) { this.salary = salary; }
}
public static class Sale implements java.io.Serializable {
private String month;
private String product;
private int amount;
public Sale() {}
public Sale(String month, String product, int amount) {
this.month = month;
this.product = product;
this.amount = amount;
}
public String getMonth() { return month; }
public void setMonth(String month) { this.month = month; }
public String getProduct() { return product; }
public void setProduct(String product) { this.product = product; }
public int getAmount() { return amount; }
public void setAmount(int amount) { this.amount = amount; }
}
public static class Department implements java.io.Serializable {
private String name;
private String description;
public Department() {}
public Department(String name, String description) {
this.name = name;
this.description = description;
}
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getDescription() { return description; }
public void setDescription(String description) { this.description = description; }
}
}
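The window-function assertion in `testWindowFunctions` hinges on RANK() semantics: within each department partition, a row's rank is 1 plus the number of rows with a strictly higher salary. A plain-Java sketch (hypothetical class, no Spark) over the same four employees:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RankSketch {
    // Rank of a salary within its partition: 1 + count of strictly higher salaries
    static int rank(List<Integer> salaries, int salary) {
        return 1 + (int) salaries.stream().filter(s -> s > salary).count();
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> byDept = new HashMap<>();
        byDept.put("Engineering", Arrays.asList(100000, 120000)); // Alice, Bob
        byDept.put("Sales", Arrays.asList(80000, 90000));         // Charlie, David
        int bobRank = rank(byDept.get("Engineering"), 120000);
        int aliceRank = rank(byDept.get("Engineering"), 100000);
        System.out.println("Bob=" + bobRank + " Alice=" + aliceRank);
    }
}
```

Bob's 120000 is the highest in Engineering, so the test's `assertEquals(1, bobRow.getInt(3))` holds.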

126
test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java

@@ -0,0 +1,126 @@
package seaweed.spark;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
/**
* Base class for Spark integration tests with SeaweedFS.
*
* These tests require a running SeaweedFS cluster.
* Set environment variable SEAWEEDFS_TEST_ENABLED=true to enable these tests.
*/
public abstract class SparkTestBase {
protected SparkSession spark;
protected static final String TEST_ROOT = "/test-spark";
protected static final boolean TESTS_ENABLED =
"true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED"));
// SeaweedFS connection settings
protected static final String SEAWEEDFS_HOST =
System.getenv().getOrDefault("SEAWEEDFS_FILER_HOST", "localhost");
protected static final String SEAWEEDFS_PORT =
System.getenv().getOrDefault("SEAWEEDFS_FILER_PORT", "8888");
protected static final String SEAWEEDFS_GRPC_PORT =
System.getenv().getOrDefault("SEAWEEDFS_FILER_GRPC_PORT", "18888");
@Before
public void setUpSpark() throws IOException {
if (!TESTS_ENABLED) {
return;
}
SparkConf sparkConf = new SparkConf()
.setAppName("SeaweedFS Integration Test")
.setMaster("local[1]") // Single thread to avoid concurrent gRPC issues
.set("spark.driver.host", "localhost")
.set("spark.sql.warehouse.dir", getSeaweedFSPath("/spark-warehouse"))
// SeaweedFS configuration
.set("spark.hadoop.fs.defaultFS", String.format("seaweedfs://%s:%s", SEAWEEDFS_HOST, SEAWEEDFS_PORT))
.set("spark.hadoop.fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem")
.set("spark.hadoop.fs.seaweed.impl", "seaweed.hdfs.SeaweedFileSystem")
.set("spark.hadoop.fs.seaweed.filer.host", SEAWEEDFS_HOST)
.set("spark.hadoop.fs.seaweed.filer.port", SEAWEEDFS_PORT)
.set("spark.hadoop.fs.seaweed.filer.port.grpc", SEAWEEDFS_GRPC_PORT)
.set("spark.hadoop.fs.AbstractFileSystem.seaweedfs.impl", "seaweed.hdfs.SeaweedAbstractFileSystem")
// Set replication to empty string to use filer default
.set("spark.hadoop.fs.seaweed.replication", "")
// Smaller buffer to reduce load
.set("spark.hadoop.fs.seaweed.buffer.size", "1048576") // 1MB
// Reduce parallelism
.set("spark.default.parallelism", "1")
.set("spark.sql.shuffle.partitions", "1")
// Simpler output committer
.set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
.set("spark.sql.sources.commitProtocolClass", "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
// Disable speculative execution to reduce load
.set("spark.speculation", "false");
spark = SparkSession.builder()
.config(sparkConf)
.getOrCreate();
// Clean up test directory
cleanupTestDirectory();
}
@After
public void tearDownSpark() {
if (!TESTS_ENABLED || spark == null) {
return;
}
try {
// Try to cleanup but don't fail if it doesn't work
cleanupTestDirectory();
} catch (Exception e) {
System.err.println("Cleanup failed: " + e.getMessage());
} finally {
try {
spark.stop();
} catch (Exception e) {
System.err.println("Spark stop failed: " + e.getMessage());
}
spark = null;
}
}
protected String getSeaweedFSPath(String path) {
return String.format("seaweedfs://%s:%s%s", SEAWEEDFS_HOST, SEAWEEDFS_PORT, path);
}
protected String getTestPath(String subPath) {
return getSeaweedFSPath(TEST_ROOT + "/" + subPath);
}
private void cleanupTestDirectory() {
if (spark != null) {
try {
Configuration conf = spark.sparkContext().hadoopConfiguration();
org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(
java.net.URI.create(getSeaweedFSPath("/")), conf);
org.apache.hadoop.fs.Path testPath = new org.apache.hadoop.fs.Path(TEST_ROOT);
if (fs.exists(testPath)) {
fs.delete(testPath, true);
}
} catch (Exception e) {
// Suppress cleanup errors - they shouldn't fail tests
// Common in distributed systems with eventual consistency
System.err.println("Warning: cleanup failed (non-critical): " + e.getMessage());
}
}
}
protected void skipIfTestsDisabled() {
if (!TESTS_ENABLED) {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
org.junit.Assume.assumeTrue("SEAWEEDFS_TEST_ENABLED not set", false);
}
}
}
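Concretely, `getSeaweedFSPath` prefixes every test path with the `seaweedfs://` scheme and the filer's HTTP endpoint, which is also what `fs.defaultFS` is set to. A small sketch (hypothetical class, defaults matching the test base) of the URIs the tests end up reading and writing:

```java
public class PathSketch {
    // Mirrors SparkTestBase.getSeaweedFSPath: scheme + filer host:port + path
    static String seaweedPath(String host, String port, String path) {
        return String.format("seaweedfs://%s:%s%s", host, port, path);
    }

    public static void main(String[] args) {
        // With the default localhost filer on 8888
        System.out.println(seaweedPath("localhost", "8888", "/test-spark/people.parquet"));
    }
}
```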

18
test/java/spark/src/test/resources/log4j.properties

@@ -0,0 +1,18 @@
# Set root logger level
log4j.rootLogger=WARN, console
# Console appender
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set log levels for specific packages
log4j.logger.org.apache.spark=WARN
log4j.logger.org.apache.hadoop=WARN
log4j.logger.seaweed=INFO
# Suppress unnecessary warnings
log4j.logger.org.apache.spark.util.Utils=ERROR
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR

40
test/java/spark/test-one.sh

@@ -0,0 +1,40 @@
#!/bin/bash
# Run a single test method for quick iteration
set -e
if [ $# -eq 0 ]; then
echo "Usage: ./test-one.sh <TestClass>#<methodName>"
echo ""
echo "Examples:"
echo " ./test-one.sh SparkReadWriteTest#testWriteAndReadParquet"
echo " ./test-one.sh SparkSQLTest#testCreateTableAndQuery"
echo ""
exit 1
fi
# Check if SeaweedFS is running
if ! curl -f http://localhost:8888/ > /dev/null 2>&1; then
echo "✗ SeaweedFS filer is not accessible at http://localhost:8888"
echo ""
echo "Please start SeaweedFS first:"
echo " docker-compose up -d"
echo ""
exit 1
fi
echo "✓ SeaweedFS filer is accessible"
echo ""
echo "Running test: $1"
echo ""
# Set environment variables
export SEAWEEDFS_TEST_ENABLED=true
export SEAWEEDFS_FILER_HOST=localhost
export SEAWEEDFS_FILER_PORT=8888
export SEAWEEDFS_FILER_GRPC_PORT=18888
# Run the specific test
mvn test -Dtest="$1"