
feat: add comprehensive debug logging to track Parquet write sequence

Added extensive WARN-level debug messages to trace the exact sequence of:
- Every write() operation with position tracking
- All getPos() calls with caller stack traces
- flush() and flushInternal() operations
- Buffer flushes and position updates
- Metadata updates

BREAKTHROUGH FINDING:
- Last getPos() call: returns 1252 bytes (at writeCall #465)
- 5 more writes happen: add 8 bytes → buffer.position()=1260
- close() flushes all 1260 bytes to disk
- But Parquet footer records offsets based on 1252!

Result: 8-byte offset mismatch in Parquet footer metadata
→ Causes EOFException: 'Still have: 78 bytes left'

The 78 bytes is NOT missing data - it's a metadata calculation error
due to Parquet footer offsets being stale by 8 bytes.
Ref: pull/7526/head
Author: chrislu, 1 week ago
Commit: 3e754792a5
Changed files:
  1. other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java (74 lines changed)
  2. test/java/spark/DEBUG_BREAKTHROUGH.md (82 lines changed)

other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java (74 lines changed)

@@ -154,7 +154,10 @@ public class SeaweedOutputStream extends OutputStream {
attrBuilder.setFileSize(offset);
entry.setAttributes(attrBuilder);
LOG.info("[DEBUG-2024] Set entry.attributes.fileSize = {} bytes before writeMeta", offset);
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] METADATA UPDATE: setting entry.attributes.fileSize = {} bytes | #chunks={} | path={}",
offset, entry.getChunksCount(), path.substring(path.lastIndexOf('/') + 1));
}
SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry);
} catch (Exception ex) {
@@ -183,19 +186,18 @@
totalBytesWritten += length;
writeCallCount++;
// Log significant writes AND writes near the end (potential footer)
if (path.contains("parquet")) {
if (length >= 20) {
LOG.info(
"[DEBUG-2024] write({} bytes): totalSoFar={} writeCalls={} position={} bufferPos={}, file={}",
length, totalBytesWritten, writeCallCount, position, buffer.position(),
path.substring(path.lastIndexOf('/') + 1));
} else if (writeCallCount >= 220) {
// Log all small writes after call 220 (likely footer writes)
LOG.info(
"[DEBUG-2024] write({} bytes): totalSoFar={} writeCalls={} position={} bufferPos={} [FOOTER?], file={}",
length, totalBytesWritten, writeCallCount, position, buffer.position(),
path.substring(path.lastIndexOf('/') + 1));
// Enhanced debug logging for ALL writes to track the exact sequence
if (path.contains("parquet") || path.contains("employees")) {
long beforeBufferPos = buffer.position();
long currentVirtualPos = position + beforeBufferPos;
// Always log writes to see the complete pattern
if (length >= 20 || writeCallCount >= 220 || writeCallCount % 50 == 0) {
LOG.warn(
"[DEBUG-2024] WRITE #{}: {} bytes | virtualPos={} (flushed={} + buffer={}) | totalWritten={} | file={}",
writeCallCount, length, currentVirtualPos, position, beforeBufferPos,
totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
}
}
@@ -242,6 +244,11 @@
*/
@Override
public void flush() throws IOException {
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] flush() CALLED: supportFlush={} position={} buffer.position()={} totalWritten={} path={}",
supportFlush, position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
}
if (supportFlush) {
flushInternalAsync();
}
@@ -301,16 +308,27 @@
private synchronized void writeCurrentBufferToService() throws IOException {
int bufferPos = buffer.position();
LOG.info("[DEBUG-2024] writeCurrentBufferToService: path={} buffer.position()={} totalPosition={}", path,
bufferPos, position);
long positionBefore = position;
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] writeCurrentBufferToService START: buffer.position()={} currentFlushedPosition={} totalWritten={} path={}",
bufferPos, position, totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
}
if (bufferPos == 0) {
LOG.info(" Skipping write, buffer is empty");
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] -> Skipping: buffer is empty");
}
return;
}
int written = submitWriteBufferToService(buffer, position);
position += written;
LOG.info(" Submitted {} bytes for write, new position={}", written, position);
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] writeCurrentBufferToService END: submitted {} bytes, flushedPosition {} -> {}, totalWritten={} path={}",
written, positionBefore, position, totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
}
buffer = ByteBufferPool.request(bufferSize);
@@ -385,15 +403,35 @@
}
protected synchronized void flushInternal() throws IOException {
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] flushInternal() START: position={} buffer.position()={} totalWritten={} path={}",
position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
}
maybeThrowLastError();
writeCurrentBufferToService();
flushWrittenBytesToService();
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] flushInternal() END: position={} buffer.position()={} totalWritten={} path={}",
position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
}
}
protected synchronized void flushInternalAsync() throws IOException {
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] flushInternalAsync() START: position={} buffer.position()={} totalWritten={} path={}",
position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
}
maybeThrowLastError();
writeCurrentBufferToService();
flushWrittenBytesToServiceAsync();
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] flushInternalAsync() END: position={} buffer.position()={} totalWritten={} path={}",
position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
}
}
private synchronized void flushWrittenBytesToService() throws IOException {

test/java/spark/DEBUG_BREAKTHROUGH.md (82 lines changed)

@@ -0,0 +1,82 @@
# Debug Breakthrough: Root Cause Identified
## Complete Event Sequence
### 1. Write Pattern
```
- writeCalls 1-465: Writing Parquet data
- Last getPos() call: writeCalls=465, returns 1252
→ flushedPosition=0 + bufferPosition=1252 = 1252
- writeCalls 466-470: 5 more writes (8 bytes total)
→ These are footer metadata bytes
→ Parquet does NOT call getPos() after these writes
- close() called:
→ buffer.position()=1260 (1252 + 8)
→ All 1260 bytes flushed to disk
→ File size set to 1260 bytes
```
### 2. The Problem
**Parquet's write sequence:**
1. Write column chunk data, calling `getPos()` after each write → records offsets
2. **Last `getPos()` returns 1252**
3. Write footer metadata (8 bytes) → **NO getPos() call!**
4. Close file → flushes all 1260 bytes
**Result**: Parquet footer says data ends at **1252**, but file actually has **1260** bytes.
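To make the sequence concrete, here is an illustrative sketch. It is hypothetical writer code, not actual Parquet internals; the path and byte counts simply mirror the debug trace above.
```java
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical illustration of the write sequence described above.
class FooterAfterLastGetPos {
    static void writeLikeParquet(FileSystem fs) throws IOException {
        try (FSDataOutputStream out = fs.create(new Path("/spark/employees.parquet"))) {
            byte[] parquetData = new byte[1252];
            out.write(parquetData);           // writeCalls 1-465: Parquet data
            long recordedEnd = out.getPos();  // 1252 = flushedPosition(0) + bufferPosition(1252)
            // recordedEnd is what the footer's column-chunk offsets are derived from

            byte[] footerTail = new byte[8];
            out.write(footerTail);            // writeCalls 466-470: trailing footer bytes, no getPos() afterwards
        }                                     // close() flushes all 1260 bytes; the footer still reflects 1252
    }
}
```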
### 3. The Discrepancy
```
Last getPos(): 1252 bytes (what Parquet recorded in footer)
Actual file: 1260 bytes (what was flushed)
Unaccounted:   8 bytes (footer metadata written after the last getPos())
```
### 4. Why It Fails on Read
When Parquet tries to read the file:
- Footer says column chunks end at offset 1252
- Parquet tries to read from 1252, expecting more data
- But the actual data structure is offset by 8 bytes
- Results in: `EOFException: Still have: 78 bytes left`
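For intuition, a readFully-style helper of the kind that produces this error is sketched below. This is illustrative only, an assumed shape rather than Parquet's or SeaweedFS's actual reader code: when metadata asks for more bytes than the stream can supply from a given offset, the loop hits end-of-stream early and reports how many bytes it still wanted.
```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Illustrative only: not the actual reader code. Shows how an EOFException
// carrying a "bytes left" count arises when offsets point past the real data.
class ReadFullySketch {
    static void readFully(InputStream in, byte[] buf) throws IOException {
        int off = 0;
        while (off < buf.length) {
            int n = in.read(buf, off, buf.length - off);
            if (n < 0) {
                throw new EOFException("Still have: " + (buf.length - off) + " bytes left");
            }
            off += n;
        }
    }
}
```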
### 5. Key Insight: The "78 bytes"
The **78 bytes** is NOT missing data — it's a **metadata mismatch**:
- Parquet footer contains incorrect offsets
- These offsets are off by 8 bytes (the final footer writes)
- When reading, Parquet calculates it needs 78 more bytes based on wrong offsets
## Root Cause
**Parquet assumes `getPos()` reflects ALL bytes written, even buffered ones.**
Our implementation is correct:
```java
public long getPos() {
return position + buffer.position(); // ✅ Includes buffered data
}
```
BUT: Parquet writes footer metadata AFTER the last `getPos()` call, so those 8 bytes
are not accounted for in the footer's offset calculations.
## Why Unit Tests Pass but Spark Fails
**Unit tests**: Direct writes → immediate getPos() → correct offsets
**Spark/Parquet**: Complex write sequence → footer written AFTER last getPos() → stale offsets
## The Fix
We need to ensure that when Parquet writes its footer, ALL bytes (including those 8 footer bytes)
are accounted for in the file position. Options:
1. **Force flush on getPos()** - ensures position is up-to-date
2. **Override FSDataOutputStream more deeply** - intercept all write operations
3. **Investigate Parquet's footer writing logic** - understand why it doesn't call getPos()
Next: Examine how HDFS/S3 FileSystem implementations handle this.
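A minimal sketch of option 1, assuming the fields shown in the SeaweedOutputStream diff above (`position`, `buffer`, `flushInternal()`); this is one possible shape of the change, not the chosen fix:
```java
// Option 1 sketch (hypothetical): flush buffered bytes inside getPos() so the
// returned offset always corresponds to bytes already submitted for write.
// Trade-off: more, smaller chunk flushes whenever Parquet polls the position.
public synchronized long getPos() throws IOException {
    flushInternal();   // pushes buffer contents and advances 'position'
    return position;   // buffer is now empty, so 'position' alone is the file length
}
```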