diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index e44f59b7e..1164f39e1 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -154,7 +154,10 @@ public class SeaweedOutputStream extends OutputStream { attrBuilder.setFileSize(offset); entry.setAttributes(attrBuilder); - LOG.info("[DEBUG-2024] Set entry.attributes.fileSize = {} bytes before writeMeta", offset); + if (path.contains("parquet") || path.contains("employees")) { + LOG.warn("[DEBUG-2024] METADATA UPDATE: setting entry.attributes.fileSize = {} bytes | #chunks={} | path={}", + offset, entry.getChunksCount(), path.substring(path.lastIndexOf('/') + 1)); + } SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry); } catch (Exception ex) { @@ -183,19 +186,18 @@ public class SeaweedOutputStream extends OutputStream { totalBytesWritten += length; writeCallCount++; - // Log significant writes AND writes near the end (potential footer) - if (path.contains("parquet")) { - if (length >= 20) { - LOG.info( - "[DEBUG-2024] write({} bytes): totalSoFar={} writeCalls={} position={} bufferPos={}, file={}", - length, totalBytesWritten, writeCallCount, position, buffer.position(), - path.substring(path.lastIndexOf('/') + 1)); - } else if (writeCallCount >= 220) { - // Log all small writes after call 220 (likely footer writes) - LOG.info( - "[DEBUG-2024] write({} bytes): totalSoFar={} writeCalls={} position={} bufferPos={} [FOOTER?], file={}", - length, totalBytesWritten, writeCallCount, position, buffer.position(), - path.substring(path.lastIndexOf('/') + 1)); + + // Enhanced debug logging for ALL writes to track the exact sequence + if (path.contains("parquet") || path.contains("employees")) { + long beforeBufferPos = buffer.position(); + long currentVirtualPos = position + 
beforeBufferPos; + + // Always log writes to see the complete pattern + if (length >= 20 || writeCallCount >= 220 || writeCallCount % 50 == 0) { + LOG.warn( + "[DEBUG-2024] WRITE #{}: {} bytes | virtualPos={} (flushed={} + buffer={}) | totalWritten={} | file={}", + writeCallCount, length, currentVirtualPos, position, beforeBufferPos, + totalBytesWritten, path.substring(path.lastIndexOf('/') + 1)); } } @@ -242,6 +244,11 @@ public class SeaweedOutputStream extends OutputStream { */ @Override public void flush() throws IOException { + if (path.contains("parquet") || path.contains("employees")) { + LOG.warn("[DEBUG-2024] flush() CALLED: supportFlush={} position={} buffer.position()={} totalWritten={} path={}", + supportFlush, position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1)); + } + if (supportFlush) { flushInternalAsync(); } @@ -301,16 +308,27 @@ public class SeaweedOutputStream extends OutputStream { private synchronized void writeCurrentBufferToService() throws IOException { int bufferPos = buffer.position(); - LOG.info("[DEBUG-2024] writeCurrentBufferToService: path={} buffer.position()={} totalPosition={}", path, - bufferPos, position); + long positionBefore = position; + + if (path.contains("parquet") || path.contains("employees")) { + LOG.warn("[DEBUG-2024] writeCurrentBufferToService START: buffer.position()={} currentFlushedPosition={} totalWritten={} path={}", + bufferPos, position, totalBytesWritten, path.substring(path.lastIndexOf('/') + 1)); + } + if (bufferPos == 0) { - LOG.info(" Skipping write, buffer is empty"); + if (path.contains("parquet") || path.contains("employees")) { + LOG.warn("[DEBUG-2024] -> Skipping: buffer is empty"); + } return; } int written = submitWriteBufferToService(buffer, position); position += written; - LOG.info(" Submitted {} bytes for write, new position={}", written, position); + + if (path.contains("parquet") || path.contains("employees")) { + LOG.warn("[DEBUG-2024] 
writeCurrentBufferToService END: submitted {} bytes, flushedPosition {} -> {}, totalWritten={} path={}", + written, positionBefore, position, totalBytesWritten, path.substring(path.lastIndexOf('/') + 1)); + } buffer = ByteBufferPool.request(bufferSize); @@ -385,15 +403,35 @@ public class SeaweedOutputStream extends OutputStream { } protected synchronized void flushInternal() throws IOException { + if (path.contains("parquet") || path.contains("employees")) { + LOG.warn("[DEBUG-2024] flushInternal() START: position={} buffer.position()={} totalWritten={} path={}", + position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1)); + } + maybeThrowLastError(); writeCurrentBufferToService(); flushWrittenBytesToService(); + + if (path.contains("parquet") || path.contains("employees")) { + LOG.warn("[DEBUG-2024] flushInternal() END: position={} buffer.position()={} totalWritten={} path={}", + position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1)); + } } protected synchronized void flushInternalAsync() throws IOException { + if (path.contains("parquet") || path.contains("employees")) { + LOG.warn("[DEBUG-2024] flushInternalAsync() START: position={} buffer.position()={} totalWritten={} path={}", + position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1)); + } + maybeThrowLastError(); writeCurrentBufferToService(); flushWrittenBytesToServiceAsync(); + + if (path.contains("parquet") || path.contains("employees")) { + LOG.warn("[DEBUG-2024] flushInternalAsync() END: position={} buffer.position()={} totalWritten={} path={}", + position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1)); + } } private synchronized void flushWrittenBytesToService() throws IOException { diff --git a/test/java/spark/DEBUG_BREAKTHROUGH.md b/test/java/spark/DEBUG_BREAKTHROUGH.md new file mode 100644 index 000000000..36e9be753 --- /dev/null +++ 
b/test/java/spark/DEBUG_BREAKTHROUGH.md @@ -0,0 +1,82 @@ +# Debug Breakthrough: Root Cause Identified + +## Complete Event Sequence + +### 1. Write Pattern +``` +- writeCalls 1-465: Writing Parquet data +- Last getPos() call: writeCalls=465, returns 1252 + → flushedPosition=0 + bufferPosition=1252 = 1252 + +- writeCalls 466-470: 5 more writes (8 bytes total) + → These are footer metadata bytes + → Parquet does NOT call getPos() after these writes + +- close() called: + → buffer.position()=1260 (1252 + 8) + → All 1260 bytes flushed to disk + → File size set to 1260 bytes +``` + +### 2. The Problem + +**Parquet's write sequence:** +1. Write column chunk data, calling `getPos()` after each write → records offsets +2. **Last `getPos()` returns 1252** +3. Write footer metadata (8 bytes) → **NO getPos() call!** +4. Close file → flushes all 1260 bytes + +**Result**: Parquet footer says data ends at **1252**, but file actually has **1260** bytes. + +### 3. The Discrepancy + +``` +Last getPos(): 1252 bytes (what Parquet recorded in footer) +Actual file: 1260 bytes (what was flushed) +Missing: 8 bytes (footer metadata written without getPos()) +``` + +### 4. Why It Fails on Read + +When Parquet tries to read the file: +- Footer says column chunks end at offset 1252 +- Parquet tries to read from 1252, expecting more data +- But the actual data structure is offset by 8 bytes +- Results in: `EOFException: Still have: 78 bytes left` + +### 5. 
Key Insight: The "78 bytes"
+
+The **78 bytes** are NOT missing data — they reflect a **metadata mismatch**:
+- The Parquet footer contains incorrect offsets
+- These offsets are off by 8 bytes (the final footer writes)
+- When reading, Parquet calculates it needs 78 more bytes based on the wrong offsets
+
+## Root Cause
+
+**Parquet assumes `getPos()` reflects ALL bytes written, even buffered ones.**
+
+Our implementation is correct:
+```java
+public long getPos() {
+    return position + buffer.position(); // ✅ Includes buffered data
+}
+```
+
+BUT: Parquet writes footer metadata AFTER the last `getPos()` call, so those 8 bytes
+are not accounted for in the footer's offset calculations.
+
+## Why Unit Tests Pass but Spark Fails
+
+**Unit tests**: Direct writes → immediate getPos() → correct offsets
+**Spark/Parquet**: Complex write sequence → footer written AFTER last getPos() → stale offsets
+
+## The Fix
+
+We need to ensure that when Parquet writes its footer, ALL bytes (including those 8 footer bytes)
+are accounted for in the file position. Options:
+
+1. **Force flush on getPos()** - ensures position is up-to-date
+2. **Override FSDataOutputStream more deeply** - intercept all write operations
+3. **Investigate Parquet's footer writing logic** - understand why it doesn't call getPos()
+
+Next: Examine how HDFS/S3 FileSystem implementations handle this.