|
|
@ -0,0 +1,134 @@ |
|
|
|
|
|
# BREAKTHROUGH: Found the Bug! |
|
|
|
|
|
|
|
|
|
|
|
## Local Spark Test Reproduced ✅ |
|
|
|
|
|
|
|
|
|
|
|
Successfully ran Spark test locally and captured detailed logs showing the exact problem! |
|
|
|
|
|
|
|
|
|
|
|
## The Smoking Gun 🔥 |
|
|
|
|
|
|
|
|
|
|
|
### Write Phase |
|
|
|
|
|
|
|
|
|
|
|
Throughout the ENTIRE write process: |
|
|
|
|
|
``` |
|
|
|
|
|
getPos(): flushedPosition=0 bufferPosition=4 returning=4 |
|
|
|
|
|
getPos(): flushedPosition=0 bufferPosition=22 returning=22 |
|
|
|
|
|
getPos(): flushedPosition=0 bufferPosition=48 returning=48 |
|
|
|
|
|
... |
|
|
|
|
|
getPos(): flushedPosition=0 bufferPosition=1252 returning=1252 ← Parquet's last call |
|
|
|
|
|
``` |
|
|
|
|
|
|
|
|
|
|
|
**`flushedPosition=0` THE ENTIRE TIME!** Nothing is ever flushed to storage during writes! |
|
|
|
|
|
|
|
|
|
|
|
### Close Phase |
|
|
|
|
|
|
|
|
|
|
|
``` |
|
|
|
|
|
Last getPos(): bufferPosition=1252 returning=1252 ← Parquet records footer with this |
|
|
|
|
|
close START: buffer.position()=1260 ← Parquet wrote 8 MORE bytes! |
|
|
|
|
|
close END: finalPosition=1260 ← Actual file size |
|
|
|
|
|
``` |
|
|
|
|
|
|
|
|
|
|
|
## The Bug |
|
|
|
|
|
|
|
|
|
|
|
1. **Parquet writes column data** → calls `getPos()` → gets 1252 |
|
|
|
|
|
2. **Parquet writes MORE data** → 8 more bytes (footer?) |
|
|
|
|
|
3. **Parquet closes stream** → flushes buffer → file is 1260 bytes |
|
|
|
|
|
4. **Parquet footer metadata** → says last data is at position 1252 |
|
|
|
|
|
5. **When reading**, Parquet calculates: "Next chunk should be at 1260 (1252 + 8)" |
|
|
|
|
|
6. **Tries to read 78 bytes** from position 1260 |
|
|
|
|
|
7. **But file ends at 1260** → EOF! |
|
|
|
|
|
|
|
|
|
|
|
## The Root Cause |
|
|
|
|
|
|
|
|
|
|
|
**`SeaweedOutputStream.getPos()` returns `position + buffer.position()`** |
|
|
|
|
|
|
|
|
|
|
|
Where: |
|
|
|
|
|
- `position` = flushed position (always 0 in this case!) |
|
|
|
|
|
- `buffer.position()` = buffered data position |
|
|
|
|
|
|
|
|
|
|
|
This works fine IF: |
|
|
|
|
|
- Data is flushed regularly, OR |
|
|
|
|
|
- The entire file fits in buffer AND no more writes happen after last `getPos()` |
|
|
|
|
|
|
|
|
|
|
|
**But Parquet does this:** |
|
|
|
|
|
1. Calls `getPos()` to record column chunk positions |
|
|
|
|
|
2. Writes ADDITIONAL data (footer metadata) |
|
|
|
|
|
3. Closes the stream (which flushes everything) |
|
|
|
|
|
|
|
|
|
|
|
**Result**: Footer has positions that are STALE by however many bytes Parquet wrote after the last `getPos()` call! |
|
|
|
|
|
|
|
|
|
|
|
## Why Unit Tests Pass |
|
|
|
|
|
|
|
|
|
|
|
Our unit tests: |
|
|
|
|
|
1. Write data |
|
|
|
|
|
2. Call `getPos()` |
|
|
|
|
|
3. **DON'T write more data** |
|
|
|
|
|
4. Close |
|
|
|
|
|
|
|
|
|
|
|
Spark/Parquet: |
|
|
|
|
|
1. Write column chunks, calling `getPos()` after each |
|
|
|
|
|
2. Write footer metadata → **WRITES MORE DATA without calling getPos()!** |
|
|
|
|
|
3. Close |
|
|
|
|
|
|
|
|
|
|
|
## The Fix |
|
|
|
|
|
|
|
|
|
|
|
We need to ensure `getPos()` always reflects the CURRENT write position, including any unflushed data. |
|
|
|
|
|
|
|
|
|
|
|
Current implementation is CORRECT for this! `position + buffer.position()` IS the current position. |
|
|
|
|
|
|
|
|
|
|
|
**The problem is Parquet writes data AFTER calling `getPos()` but BEFORE close!** |
|
|
|
|
|
|
|
|
|
|
|
### Solution Options |
|
|
|
|
|
|
|
|
|
|
|
**Option A: Make getPos() trigger a flush (NOT RECOMMENDED)** |
|
|
|
|
|
```java |
|
|
|
|
|
public synchronized long getPos() { |
|
|
|
|
|
flush(); // Force flush |
|
|
|
|
|
return position; // buffer is now empty |
|
|
|
|
|
} |
|
|
|
|
|
``` |
|
|
|
|
|
❌ **BAD**: Defeats the purpose of buffering, kills performance |
|
|
|
|
|
|
|
|
|
|
|
**Option B: Track "virtual position" separately** |
|
|
|
|
|
Already done! We return `position + buffer.position()`. This IS correct! |
|
|
|
|
|
|
|
|
|
|
|
**Option C: The REAL issue - Parquet footer size calculation** |
|
|
|
|
|
|
|
|
|
|
|
Wait... let me re-examine. If `getPos()` returns 1252, and then 8 more bytes are written, the buffer position becomes 1260. When Parquet closes the stream, it should flush, and the file should be 1260 bytes. |
|
|
|
|
|
|
|
|
|
|
|
BUT, Parquet's footer says data ends at 1252, so when reading, it tries to read from 1260 (next expected position based on chunk sizes), which doesn't exist! |
|
|
|
|
|
|
|
|
|
|
|
**The issue**: Parquet calculates column chunk sizes based on `getPos()` deltas, but doesn't account for data written AFTER the last `getPos()` call (the footer itself!). |
|
|
|
|
|
|
|
|
|
|
|
## Actually... The Real Problem Might Be Different |
|
|
|
|
|
|
|
|
|
|
|
Let me reconsider. If: |
|
|
|
|
|
- Last `getPos()` = 1252 |
|
|
|
|
|
- Close writes buffer of 1260 bytes |
|
|
|
|
|
- File size = 1260 |
|
|
|
|
|
|
|
|
|
|
|
Then Parquet footer is written as part of that 1260 bytes. The footer should say: |
|
|
|
|
|
- Row group/column chunks end at position 1252 |
|
|
|
|
|
- Footer starts at 1252 |
|
|
|
|
|
- File size = 1260 |
|
|
|
|
|
|
|
|
|
|
|
When reading: |
|
|
|
|
|
- Read column chunks [0, 1252) |
|
|
|
|
|
- Read footer at [1252, 1260) |
|
|
|
|
|
- Should work! |
|
|
|
|
|
|
|
|
|
|
|
**But the error says trying to read 78 bytes past EOF!** |
|
|
|
|
|
|
|
|
|
|
|
This means Parquet thinks there's data at position 1260-1338, which doesn't exist. |
|
|
|
|
|
|
|
|
|
|
|
The "78 bytes" must be something Parquet calculated incorrectly in the footer metadata! |
|
|
|
|
|
|
|
|
|
|
|
## Next Step |
|
|
|
|
|
|
|
|
|
|
|
We need to: |
|
|
|
|
|
1. Download the actual Parquet file |
|
|
|
|
|
2. Examine its footer with `parquet-tools meta` |
|
|
|
|
|
3. See what offsets/sizes are recorded |
|
|
|
|
|
4. Compare with actual file layout |
|
|
|
|
|
|
|
|
|
|
|
The footer metadata is WRONG, and we need to see exactly HOW it's wrong. |
|
|
|
|
|
|