diff --git a/test/java/spark/docker-compose.yml b/test/java/spark/docker-compose.yml index 39d32b7d0..be7cd8289 100644 --- a/test/java/spark/docker-compose.yml +++ b/test/java/spark/docker-compose.yml @@ -81,6 +81,8 @@ services: - HADOOP_HOME=/tmp # Disable Java DNS caching to ensure fresh DNS lookups - MAVEN_OPTS=-Dsun.net.inetaddr.ttl=0 -Dnetworkaddress.cache.ttl=0 + # Disable Hadoop FileSystem instance caching so stale cached filesystem state is not reused + - SPARK_SUBMIT_OPTS=-Dfs.seaweedfs.impl.disable.cache=true command: sh -c "sleep 30 && ping -c 1 seaweedfs-volume && ping -c 1 seaweedfs-filer && mvn test" depends_on: seaweedfs-filer: diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java b/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java index 5241e2b66..4e257131e 100644 --- a/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java +++ b/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java @@ -59,7 +59,11 @@ public abstract class SparkTestBase { .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") .set("spark.sql.sources.commitProtocolClass", "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol") // Disable speculative execution to reduce load - .set("spark.speculation", "false"); + .set("spark.speculation", "false") + // Ensure files are fully synced before close + .set("spark.hadoop.fs.seaweed.write.flush.sync", "true") + // Disable filesystem cache to avoid stale reads + .set("spark.hadoop.fs.seaweedfs.impl.disable.cache", "true"); spark = SparkSession.builder() .config(sparkConf)