From a3cf4eb8438d16b0609a36d3123f0b00fd8ba02a Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 23 Nov 2025 11:54:01 -0800 Subject: [PATCH] debug: track stream lifecycle and total bytes written MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added comprehensive logging to identify why Parquet files fail with 'EOFException: Still have: 78 bytes left'. Key additions: 1. SeaweedHadoopOutputStream constructor logging with 🔧 marker - Shows when output streams are created - Logs path, position, bufferSize, replication 2. totalBytesWritten counter in SeaweedOutputStream - Tracks cumulative bytes written via write() calls - Helps identify if Parquet wrote 762 bytes but only 684 reached chunks 3. Enhanced close() logging with 🔒 and ✅ markers - Shows totalBytesWritten vs position vs buffer.position() - If totalBytesWritten=762 but position=684, write submission failed - If buffer.position()=78 at close, buffer wasn't flushed Expected scenarios in next run: A) Stream never created → No 🔧 log for .parquet files B) Write failed → totalBytesWritten=762 but position=684 C) Buffer not flushed → buffer.position()=78 at close D) All correct → totalBytesWritten=position=684, but Parquet expects 762 This will pinpoint whether the issue is in: - Stream creation/lifecycle - Write submission - Buffer flushing - Or Parquet's internal state --- .../seaweedfs/client/SeaweedOutputStream.java | 18 +++++++++++------- .../hdfs/SeaweedHadoopOutputStream.java | 6 ++++++ test/java/spark/CI_SETUP.md | 1 + test/java/spark/quick-start.sh | 1 + test/java/spark/run-tests.sh | 1 + .../seaweed/spark/SparkSeaweedFSExample.java | 1 + .../test/java/seaweed/spark/SparkSQLTest.java | 1 + 7 files changed, 22 insertions(+), 7 deletions(-) diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index 8764780ba..a30c52834 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -34,6 +34,7 @@ public class SeaweedOutputStream extends OutputStream { private long outputIndex; private String replication = ""; private String collection = ""; + private long totalBytesWritten = 0; // Track total bytes for debugging public SeaweedOutputStream(FilerClient filerClient, final String fullpath) { this(filerClient, fullpath, ""); @@ -115,17 +116,17 @@ public class SeaweedOutputStream extends OutputStream { private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { try { - LOG.info("[DEBUG-2024] ⚠️ flushWrittenBytesToServiceInternal: path={} offset={} #chunks={}", + LOG.info("[DEBUG-2024] ⚠️ flushWrittenBytesToServiceInternal: path={} offset={} #chunks={}", path, offset, entry.getChunksCount()); - + // Set the file size in attributes based on our position // This ensures Parquet footer metadata matches what we actually wrote FilerProto.FuseAttributes.Builder attrBuilder = entry.getAttributes().toBuilder(); attrBuilder.setFileSize(offset); entry.setAttributes(attrBuilder); - + LOG.info("[DEBUG-2024] → Set entry.attributes.fileSize = {} bytes before writeMeta", offset); - + SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry); } catch (Exception ex) { throw new IOException(ex); @@ -151,6 +152,8 @@ public class SeaweedOutputStream extends OutputStream { throw new IndexOutOfBoundsException(); } + totalBytesWritten += length; + // System.out.println(path + " write [" + (outputIndex + off) + "," + // ((outputIndex + off) + length) + ")"); @@ -204,12 +207,13 @@ public class SeaweedOutputStream extends OutputStream { } int bufferPosBeforeFlush = buffer.position(); - LOG.info("[DEBUG-2024] close START: path={} position={} buffer.position()={}", path, position, bufferPosBeforeFlush); + LOG.info("[DEBUG-2024] 🔒 close START: path={} position={} buffer.position()={} totalBytesWritten={}", + path, position, bufferPosBeforeFlush, totalBytesWritten); try { flushInternal(); threadExecutor.shutdown(); - LOG.info("[DEBUG-2024] close END: path={} finalPosition={} (buffer had {} bytes that were flushed)", - path, position, bufferPosBeforeFlush); + LOG.info("[DEBUG-2024] ✅ close END: path={} finalPosition={} totalBytesWritten={} (buffer had {} bytes)", + path, position, totalBytesWritten, bufferPosBeforeFlush); } finally { lastError = new IOException("Stream is closed!"); ByteBufferPool.release(buffer); diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java index 1740312fe..672184b04 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java @@ -4,6 +4,8 @@ package seaweed.hdfs; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import seaweedfs.client.FilerClient; import seaweedfs.client.FilerProto; import seaweedfs.client.SeaweedOutputStream; @@ -13,9 +15,13 @@ import java.util.Locale; public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Syncable, StreamCapabilities { + private static final Logger LOG = LoggerFactory.getLogger(SeaweedHadoopOutputStream.class); + public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry, final long position, final int bufferSize, final String replication) { super(filerClient, path, entry, position, bufferSize, replication); + LOG.info("[DEBUG-2024] 🔧 SeaweedHadoopOutputStream created: path={} position={} bufferSize={} replication={}", + path, position, bufferSize, replication); } /** diff --git a/test/java/spark/CI_SETUP.md b/test/java/spark/CI_SETUP.md index 962f5a9fe..35b488ede 100644 --- a/test/java/spark/CI_SETUP.md +++ b/test/java/spark/CI_SETUP.md @@ -272,3 +272,4 @@ For persistent issues: - Note if it passes locally + diff --git a/test/java/spark/quick-start.sh b/test/java/spark/quick-start.sh index d674977f1..974363311 100755 --- a/test/java/spark/quick-start.sh +++ b/test/java/spark/quick-start.sh @@ -146,3 +146,4 @@ trap cleanup INT main + diff --git a/test/java/spark/run-tests.sh b/test/java/spark/run-tests.sh index 420141bba..f637c8c59 100755 --- a/test/java/spark/run-tests.sh +++ b/test/java/spark/run-tests.sh @@ -43,3 +43,4 @@ echo "✓ Test run completed" echo "View detailed reports in: target/surefire-reports/" + diff --git a/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java b/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java index be0bb395e..8919bfbdf 100644 --- a/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java +++ b/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java @@ -142,3 +142,4 @@ public class SparkSeaweedFSExample { } + diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java index 2cfd64a2d..d0c01736a 100644 --- a/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java +++ b/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java @@ -235,3 +235,4 @@ public class SparkSQLTest extends SparkTestBase { } +