
fix: add file sync and cache settings to prevent EOF on read

Issue: Files written successfully but truncated when read back
Error: 'EOFException: Reached the end of stream. Still have: 78 bytes left'

Root cause: Potential race condition between write completion and read
- File metadata updated before all chunks fully flushed
- Spark immediately reads after write without ensuring sync
- Parquet reader gets incomplete file

Solutions applied:
1. Disable filesystem cache to avoid stale file handles
   - spark.hadoop.fs.seaweedfs.impl.disable.cache=true

2. Enable explicit flush/sync on write (if supported by client)
   - spark.hadoop.fs.seaweed.write.flush.sync=true

3. Add SPARK_SUBMIT_OPTS for cache disabling

These settings ensure:
- Files are fully flushed before close() returns
- No cached file handles with stale metadata
- Fresh reads always get current file state

Note: If the issue persists, we may need to add an explicit delay between
write and read, or investigate the seaweedfs-hadoop3-client flush behavior.
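The two Spark-side settings from the list above can be expressed in `spark-defaults.conf` form. This is a sketch of the configuration, not a verified fix: `fs.seaweed.write.flush.sync` is only effective if the seaweedfs-hadoop3-client honors it, per the note above, while `fs.seaweedfs.impl.disable.cache` uses Hadoop's standard `fs.<scheme>.impl.disable.cache` mechanism to bypass the `FileSystem` object cache.

```
# spark-defaults.conf sketch of the settings above
# (assumes the SeaweedFS Hadoop client is on the classpath)

# Bypass Hadoop's FileSystem object cache for the seaweedfs:// scheme,
# so reads never go through a cached handle with stale metadata
spark.hadoop.fs.seaweedfs.impl.disable.cache   true

# Ask the client to flush/sync chunks before close() returns
# (only takes effect if the client supports this property)
spark.hadoop.fs.seaweed.write.flush.sync       true
```

Item 3 of the list corresponds to exporting the same cache-disable flag in the driver environment, which is what the docker-compose change in this commit does via `SPARK_SUBMIT_OPTS`.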
Branch: pull/7526/head
Author: chrislu, 6 days ago
Commit: 780a1fd059
Changed files:
1. test/java/spark/docker-compose.yml (2 changes)
2. test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java (6 changes)

test/java/spark/docker-compose.yml (+2)
```diff
@@ -81,6 +81,8 @@ services:
       - HADOOP_HOME=/tmp
       # Disable Java DNS caching to ensure fresh DNS lookups
       - MAVEN_OPTS=-Dsun.net.inetaddr.ttl=0 -Dnetworkaddress.cache.ttl=0
+      # Force fsync on close to ensure data is flushed before file is considered written
+      - SPARK_SUBMIT_OPTS=-Dfs.seaweedfs.impl.disable.cache=true
     command: sh -c "sleep 30 && ping -c 1 seaweedfs-volume && ping -c 1 seaweedfs-filer && mvn test"
     depends_on:
       seaweedfs-filer:
```

test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java (+5 -1)

```diff
@@ -59,7 +59,11 @@ public abstract class SparkTestBase {
                 .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
                 .set("spark.sql.sources.commitProtocolClass", "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
                 // Disable speculative execution to reduce load
-                .set("spark.speculation", "false");
+                .set("spark.speculation", "false")
+                // Ensure files are fully synced before close
+                .set("spark.hadoop.fs.seaweed.write.flush.sync", "true")
+                // Disable filesystem cache to avoid stale reads
+                .set("spark.hadoop.fs.seaweedfs.impl.disable.cache", "true");
         spark = SparkSession.builder()
                 .config(sparkConf)
```
