diff --git a/test/java/spark/PARQUET_ROOT_CAUSE_AND_FIX.md b/test/java/spark/PARQUET_ROOT_CAUSE_AND_FIX.md
new file mode 100644
index 000000000..4fe028774
--- /dev/null
+++ b/test/java/spark/PARQUET_ROOT_CAUSE_AND_FIX.md
@@ -0,0 +1,204 @@
+# Parquet EOF Exception: Root Cause and Fix Strategy
+
+## Executive Summary
+
+**Problem**: `EOFException: Still have: 78 bytes left` when reading Parquet files written to SeaweedFS via Spark.
+
+**Root Cause**: The Parquet footer metadata contains stale offsets because additional bytes are written AFTER the last `getPos()` call.
+
+**Impact**: All Parquet files written via Spark are unreadable.
+
+---
+
+## Technical Details
+
+### The Write Sequence (from debug logs)
+
+```
+Write Phase:
+- writeCalls 1-465: Parquet data (column chunks, dictionaries, etc.)
+- Last getPos(): returns 1252 (flushedPosition=0 + bufferPosition=1252)
+  ↓
+Footer Phase:
+- writeCalls 466-470: footer metadata (8 bytes)
+- NO getPos() is called during this phase!
+  ↓
+Close Phase:
+- buffer.position() = 1260 bytes
+- All 1260 bytes are flushed to disk
+- File size is set to 1260 bytes
+```
+
+### The Mismatch
+
+| What                      | Value | Notes                              |
+|---------------------------|-------|------------------------------------|
+| Last `getPos()` returned  | 1252  | Parquet records this in the footer |
+| Actual bytes written      | 1260  | What is flushed to disk            |
+| **Gap**                   | **8** | **Unaccounted footer bytes**       |
+
+### Why Reads Fail
+
+1. The Parquet footer says: "column chunk data ends at offset 1252."
+2. In the actual file, the column chunk data ends at offset 1260.
+3. When reading, Parquet seeks to offset 1252.
+4. Parquet expects to find data there, but everything is 8 bytes off.
+5. Result: `EOFException: Still have: 78 bytes left`
+
+> The "78 bytes" is Parquet's calculation of how much data it expected versus what it actually got, based on the incorrect offsets.
+
+---
+
+## Why This Happens
+
+Parquet's footer writing is decoupled from `getPos()`:
+
+```
+// Parquet's internal logic (simplified):
+1. Write column chunk → call getPos() → record offset
+2. Write more chunks → call getPos() → record offset
+3. Write footer metadata (magic bytes, etc.) → NO getPos()!
+4. Close stream
+```
+
+The footer metadata bytes (step 3) are written AFTER Parquet has recorded all offsets.
+
+---
+
+## Why Unit Tests Pass but Spark Fails
+
+**Unit tests**:
+- Simple write patterns
+- Direct, synchronous writes
+- `getPos()` called immediately after the relevant writes
+
+**Spark/Parquet**:
+- Complex write patterns with buffering
+- Footer writes decoupled from position tracking
+- `getPos()` NOT called after the final footer writes
+
+---
+
+## Fix Options
+
+### Option 1: Flush on getPos() (simple, but hurts performance)
+
+```java
+public synchronized long getPos() {
+    if (buffer.position() > 0) {
+        writeCurrentBufferToService(); // force flush
+    }
+    return position;
+}
+```
+
+**Pros**:
+- Ensures `position` is always accurate
+- Simple to implement
+
+**Cons**:
+- Performance hit (many small flushes)
+- Changes the buffering semantics
+
+### Option 2: Track Virtual Position Separately (Recommended)
+
+Keep `position` (flushed bytes) separate from what `getPos()` reports (total bytes written):
+
+```java
+private long position = 0;        // Flushed bytes
+private long virtualPosition = 0; // Total bytes written
+
+@Override
+public synchronized void write(byte[] data, int off, int length) {
+    // ... existing write logic ...
+    virtualPosition += length;
+}
+
+public synchronized long getPos() {
+    return virtualPosition; // always accurate, no flush needed
+}
+```
+
+**Pros**:
+- No performance impact
+- Clean separation of concerns
+- `getPos()` always reflects the total bytes written
+
+**Cons**:
+- `virtualPosition` must be tracked across every write method
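+
+As a concrete illustration of Option 2, the sketch below shows the two counters side by side in a minimal, self-contained class. This is a simplified stand-in, not the actual `SeaweedOutputStream` code: the real class flushes to the SeaweedFS filer service, whereas the stand-in flushes to a `ByteArrayOutputStream`, and for brevity it assumes each write fits in the buffer.
+
+```java
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+// Simplified stand-in for SeaweedOutputStream (illustrative only).
+class BufferedPositionStream extends OutputStream {
+    private final ByteArrayOutputStream service = new ByteArrayOutputStream();
+    private final ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);
+    private long position = 0;        // flushed bytes (internal flush tracking)
+    private long virtualPosition = 0; // total bytes written (backs getPos())
+
+    @Override
+    public synchronized void write(int b) throws IOException {
+        write(new byte[] { (byte) b }, 0, 1);
+    }
+
+    @Override
+    public synchronized void write(byte[] data, int off, int length) throws IOException {
+        if (buffer.remaining() < length) {
+            writeCurrentBufferToService(); // assumes length <= buffer capacity
+        }
+        buffer.put(data, off, length);
+        virtualPosition += length; // advances on every write, flushed or not
+    }
+
+    public synchronized long getPos() {
+        return virtualPosition; // accurate even while bytes sit in the buffer
+    }
+
+    private void writeCurrentBufferToService() {
+        buffer.flip();
+        byte[] chunk = new byte[buffer.remaining()];
+        buffer.get(chunk);
+        service.write(chunk, 0, chunk.length);
+        position += chunk.length; // the flushed counter catches up only here
+        buffer.clear();
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+        writeCurrentBufferToService();
+        // after close: position == virtualPosition == service.size()
+    }
+}
+```
+
+With this split, bytes written after the last `getPos()` call no longer go unaccounted: any `getPos()` call, whenever Parquet makes it, reflects every byte written so far.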
+
+### Option 3: Defer Footer Metadata Update (Complex)
+
+Modify `flushWrittenBytesToServiceInternal()` to account for buffered data:
+
+```java
+protected void flushWrittenBytesToServiceInternal(final long offset) {
+    long actualOffset = offset + buffer.position(); // include buffered data
+    entry.getAttributes().setFileSize(actualOffset);
+    // ...
+}
+```
+
+**Pros**:
+- Minimal code changes
+
+**Cons**:
+- Does not address the root cause
+- May break other use cases
+
+### Option 4: Force Flush Before Close (Workaround)
+
+Override `close()` to flush before calling super:
+
+```java
+@Override
+public synchronized void close() throws IOException {
+    if (buffer.position() > 0) {
+        writeCurrentBufferToService(); // ensure everything is flushed
+    }
+    super.close();
+}
+```
+
+**Pros**:
+- Simple
+- Ensures the file size is correct
+
+**Cons**:
+- Does not fix the `getPos()` staleness issue
+- Still has metadata timing problems
+
+---
+
+## Recommended Solution
+
+**Option 2: Track Virtual Position Separately**
+
+This aligns with Hadoop's semantics: `getPos()` should return the total number of bytes written to the stream, regardless of buffering.
+
+### Implementation Plan
+
+1. Add a `virtualPosition` field to `SeaweedOutputStream`
+2. Update all `write()` methods to increment `virtualPosition`
+3. Change `getPos()` to return `virtualPosition`
+4. Keep `position` for internal flush tracking
+5. Add unit tests to verify `getPos()` accuracy under buffering (see the sketch in the appendix)
+
+---
+
+## Next Steps
+
+1. Implement Option 2 (virtual position)
+2. Test with the local Spark reproduction
+3. Verify that the existing unit tests still pass
+4. Run the full Spark integration tests in CI
+5. Compare the behavior with the HDFS/S3 implementations
+
+---
+
+## References
+
+- Parquet file format specification: https://parquet.apache.org/docs/file-format/
+- Hadoop `FSDataOutputStream` contract: `getPos()` should return the total bytes written
+- Related issue: SeaweedFS Spark integration tests failing with EOF exceptions
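+
+---
+
+## Appendix: getPos() Regression Test Sketch
+
+A regression test for implementation step 5 could look roughly like the JUnit 4 sketch below. It is written against the simplified `BufferedPositionStream` stand-in from the Option 2 illustration above; the real test would exercise `SeaweedOutputStream` against a running filer.
+
+```java
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class GetPosAccuracyTest {
+
+    @Test
+    public void getPosReflectsBufferedBytes() throws Exception {
+        BufferedPositionStream out = new BufferedPositionStream();
+
+        out.write(new byte[1252], 0, 1252); // "column chunk" data, still buffered
+        assertEquals(1252, out.getPos());   // accurate before any flush
+
+        out.write(new byte[8], 0, 8);       // "footer" bytes, still no flush
+        assertEquals(1260, out.getPos());   // still accurate
+
+        out.close();
+        assertEquals(1260, out.getPos());   // unchanged by the final flush
+    }
+}
+```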