docs: comprehensive analysis of Parquet EOF root cause and fix strategies

Documented complete technical analysis including:

ROOT CAUSE:
- Parquet writes footer metadata AFTER last getPos() call
- 8 bytes written without getPos() being called
- Footer records stale offsets (1252 instead of 1260)
- Results in metadata mismatch → EOF exception on read

FIX OPTIONS (4 approaches analyzed):
1. Flush on getPos() - simple but slow
2. Track virtual position - RECOMMENDED
3. Defer footer metadata - complex
4. Force flush before close - workaround

RECOMMENDED: Option 2 (Virtual Position)
- Add virtualPosition field
- getPos() returns virtualPosition (not position)
- Aligns with Hadoop FSDataOutputStream semantics
- No performance impact

Ready to implement the fix.

pull/7526/head
1 changed file with 204 additions and 0 deletions
@@ -0,0 +1,204 @@

# Parquet EOF Exception: Root Cause and Fix Strategy

## Executive Summary

**Problem**: `EOFException: Still have: 78 bytes left` when reading Parquet files written to SeaweedFS via Spark.

**Root Cause**: Parquet footer metadata contains stale offsets due to writes occurring AFTER the last `getPos()` call.

**Impact**: All Parquet files written via Spark are unreadable.

---

## Technical Details

### The Write Sequence (from debug logs)

```
Write Phase:
- writeCalls 1-465: Parquet data (column chunks, dictionaries, etc.)
- Last getPos(): returns 1252 (flushedPosition=0 + bufferPosition=1252)
  ↓
Footer Phase:
- writeCalls 466-470: Footer metadata (8 bytes)
- NO getPos() called during this phase!
  ↓
Close Phase:
- buffer.position() = 1260 bytes
- All 1260 bytes flushed to disk
- File size set to 1260 bytes
```
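
This sequence can be replayed as a minimal sketch against any buffered Hadoop output stream. Everything below is illustrative: the class and method names are invented for this document, and the byte counts simply mirror the debug log rather than Parquet's real write pattern.

```java
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;

/** Illustrative only: replays the write → footer → close sequence from the debug log. */
final class WriteSequenceSketch {
    // 'out' can be any buffered Hadoop stream, e.g. one wrapping SeaweedOutputStream.
    static long replay(FSDataOutputStream out) throws IOException {
        byte[] columnData = new byte[1252];   // stands in for writeCalls 1-465
        byte[] footerBytes = new byte[8];     // stands in for writeCalls 466-470

        out.write(columnData);
        long lastRecordedOffset = out.getPos(); // 1252 -- footer offsets derive from this

        out.write(footerBytes);                 // 8 more bytes; getPos() is never called again
        out.close();                            // flush: all 1260 bytes reach the server

        return lastRecordedOffset;              // trails the real file size (1260) by 8
    }
}
```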

### The Mismatch

| What | Value | Notes |
|--------------------------|-------|-------|
| Last `getPos()` returned | 1252 | Parquet records this in footer |
| Actual bytes written | 1260 | What's flushed to disk |
| **Gap** | **8** | **Unaccounted footer bytes** |

### Why Reads Fail

1. Parquet footer says: "Column chunk data ends at offset 1252"
2. Actual file structure: Column chunk data ends at offset 1260
3. When reading, Parquet seeks to offset 1252
4. Parquet expects to find data there, but it's 8 bytes off
5. Result: `EOFException: Still have: 78 bytes left`

> The "78 bytes" is Parquet's calculation of how much data it expected vs. what it got, based on incorrect offsets.

---

## Why This Happens

Parquet's footer writing is **asynchronous** with respect to `getPos()`:

```java
// Parquet's internal logic (simplified):
// 1. Write column chunk                        → call getPos() → record offset
// 2. Write more chunks                         → call getPos() → record offset
// 3. Write footer metadata (magic bytes, etc.) → NO getPos()!
// 4. Close stream
```

The footer metadata bytes (step 3) are written AFTER Parquet has recorded all offsets.

---

## Why Unit Tests Pass but Spark Fails

**Unit tests**:
- Simple write patterns
- Direct, synchronous writes
- `getPos()` called immediately after relevant writes

**Spark/Parquet**:
- Complex write patterns with buffering
- Asynchronous footer writing
- `getPos()` NOT called after final footer writes

---

## Fix Options

### Option 1: Flush on getPos() (Simple, but has performance impact)

```java
public synchronized long getPos() {
    if (buffer.position() > 0) {
        writeCurrentBufferToService(); // Force flush
    }
    return position;
}
```

**Pros**:
- Ensures `position` is always accurate
- Simple to implement

**Cons**:
- Performance hit (many small flushes)
- Changes buffering semantics

### Option 2: Track Virtual Position Separately (Recommended)

Keep `position` (flushed) separate from `getPos()` (virtual):

```java
private long position = 0;        // Flushed bytes
private long virtualPosition = 0; // Total bytes written

@Override
public synchronized void write(byte[] data, int off, int length) {
    // ... existing write logic ...
    virtualPosition += length;
}

public synchronized long getPos() {
    return virtualPosition; // Always accurate, no flush needed
}
```

**Pros**:
- No performance impact
- Clean separation of concerns
- `getPos()` always reflects total bytes written

**Cons**:
- Need to track `virtualPosition` across every `write()` overload (see the sketch below)
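
For instance, the single-byte overload would need the same bookkeeping. This is only a sketch: it assumes `SeaweedOutputStream` also overrides the standard `OutputStream.write(int)`, and it elides whatever buffering logic the real method contains.

```java
@Override
public synchronized void write(int b) throws IOException {
    // ... existing single-byte write logic (buffering, flush checks, etc.) ...
    virtualPosition += 1; // every write path must keep the virtual position in sync
}
```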

### Option 3: Defer Footer Metadata Update (Complex)

Modify `flushWrittenBytesToServiceInternal()` to account for buffered data:

```java
protected void flushWrittenBytesToServiceInternal(final long offset) {
    long actualOffset = offset + buffer.position(); // Include buffered data
    entry.getAttributes().setFileSize(actualOffset);
    // ...
}
```

**Pros**:
- Minimal code changes

**Cons**:
- Doesn't solve the root cause
- May break other use cases

### Option 4: Force Flush Before Close (Workaround)

Override `close()` to flush before calling super:

```java
@Override
public synchronized void close() throws IOException {
    if (buffer.position() > 0) {
        writeCurrentBufferToService(); // Ensure everything flushed
    }
    super.close();
}
```

**Pros**:
- Simple
- Ensures file size is correct

**Cons**:
- Doesn't fix the `getPos()` staleness issue
- Still has metadata timing problems

---

## Recommended Solution

**Option 2: Track Virtual Position Separately**

This aligns with Hadoop's semantics where `getPos()` should return the total number of bytes written to the stream, regardless of buffering.

### Implementation Plan

1. Add `virtualPosition` field to `SeaweedOutputStream`
2. Update all `write()` methods to increment `virtualPosition`
3. Change `getPos()` to return `virtualPosition`
4. Keep `position` for internal flush tracking
5. Add unit tests to verify `getPos()` accuracy with buffering (see the test sketch below)
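
A test for step 5 could look roughly like the sketch below. It is only an outline: `newTestStream()` is a placeholder for however the existing SeaweedFS client tests construct a `SeaweedOutputStream`, the package/import for that class is elided, and JUnit 4 style is assumed.

```java
import static org.junit.Assert.assertEquals;
import org.junit.Test;

public class SeaweedOutputStreamGetPosTest {

    @Test
    public void getPosCountsBufferedBytes() throws Exception {
        SeaweedOutputStream out = newTestStream();

        out.write(new byte[1252]);
        assertEquals(1252, out.getPos()); // must include buffered, unflushed bytes

        out.write(new byte[8]);           // mimic the footer writes
        assertEquals(1260, out.getPos()); // accurate without forcing a flush

        out.close();
    }

    private SeaweedOutputStream newTestStream() {
        // Placeholder: wire this up the same way the existing unit tests
        // construct a SeaweedOutputStream (test filer, entry, buffer size, etc.).
        throw new UnsupportedOperationException("test fixture goes here");
    }
}
```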

---

## Next Steps

1. Implement Option 2 (Virtual Position)
2. Test with local Spark reproduction (e.g., the snippet below)
3. Verify unit tests still pass
4. Run full Spark integration tests in CI
5. Compare behavior with HDFS/S3 implementations
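
For step 2, a local reproduction can be as small as the sketch below. Assumptions, to be adjusted to the actual test setup: the SeaweedFS Hadoop client jar is on the Spark classpath, a local filer is reachable at `localhost:8888`, and `seaweed.hdfs.SeaweedFileSystem` is the filesystem implementation class for the `seaweedfs://` scheme.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ParquetEofRepro {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("parquet-eof-repro")
                .master("local[2]")
                // Filesystem binding; verify the class name against the client version in use.
                .config("spark.hadoop.fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem")
                .getOrCreate();

        String path = "seaweedfs://localhost:8888/buckets/test/parquet-eof-repro";

        // Write a tiny Parquet file, then read it back. Before the fix, the read
        // fails with "EOFException: Still have: N bytes left".
        spark.range(0, 1000).toDF("id").write().mode("overwrite").parquet(path);
        Dataset<Row> back = spark.read().parquet(path);
        back.show(5);

        spark.stop();
    }
}
```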

---

## References

- Parquet specification: https://parquet.apache.org/docs/file-format/
- Hadoop `FSDataOutputStream` contract: `getPos()` should return total bytes written
- Related issues: SeaweedFS Spark integration tests failing with EOF exceptions