
feat: implement flush-on-getPos() to ensure accurate offsets

IMPLEMENTATION:
- Added buffer flush in getPos() before returning position
- Every getPos() call now flushes buffered data
- Updated FSDataOutputStream wrappers to handle IOException
- Extensive debug logging added

RESULT:
- Flushing is working ✓ (logs confirm)
- File size is correct (1260 bytes) ✓
- EOF exception STILL PERSISTS ✗

DEEPER ROOT CAUSE DISCOVERED:
Parquet records offsets when getPos() is called, THEN writes more data,
THEN writes footer with those recorded (now stale) offsets.

Example:
1. Write data → getPos() returns 100 → Parquet stores '100'
2. Write dictionary (no getPos())
3. Write footer containing '100' (but actual offset is now 110)

Flush-on-getPos() doesn't help because Parquet uses the RETURNED VALUE,
not the current position when writing footer.

NEXT: Need to investigate Parquet's footer writing or disable buffering entirely.
pull/7526/head
chrislu committed 1 week ago, commit 9eb71466d8
Changed files (5):
  1. other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java (50 changes)
  2. other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java (28 changes)
  3. test/java/spark/FLUSH_ON_GETPOS_STATUS.md (139 changes)
  4. test/java/spark/ReadParquetMeta.java (39 changes)
  5. test/java/spark/capture-parquet.sh (50 changes)

other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java (50 changes)

@@ -101,7 +101,18 @@ public class SeaweedOutputStream extends OutputStream {
*
* @return current position (flushed + buffered bytes)
*/
public synchronized long getPos() {
public synchronized long getPos() throws IOException {
// CRITICAL FIX: Flush buffer before returning position
// This ensures Parquet (and other clients) get accurate offsets based on committed data
// Without this, Parquet records stale offsets and fails with EOF exceptions
if (buffer.position() > 0) {
if (path.contains("parquet")) {
LOG.warn("[DEBUG-2024] getPos() FLUSHING buffer ({} bytes) before returning position, path={}",
buffer.position(), path.substring(path.lastIndexOf('/') + 1));
}
writeCurrentBufferToService();
}
if (path.contains("parquet")) {
// Get caller info for debugging
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
@@ -113,11 +124,11 @@ public class SeaweedOutputStream extends OutputStream {
}
LOG.warn(
"[DEBUG-2024] getPos() called by {}: returning VIRTUAL position={} (flushed={} buffered={}) totalBytesWritten={} writeCalls={} path={}",
caller, virtualPosition, position, buffer.position(), totalBytesWritten, writeCallCount,
"[DEBUG-2024] getPos() called by {}: returning position={} (all data flushed) totalBytesWritten={} writeCalls={} path={}",
caller, position, totalBytesWritten, writeCallCount,
path.substring(Math.max(0, path.length() - 80))); // Last 80 chars of path
}
return virtualPosition; // Return virtual position (always accurate)
return position; // Return position (now guaranteed to be accurate after flush)
}
public static String getParentDirectory(String path) {
@@ -189,16 +200,16 @@ public class SeaweedOutputStream extends OutputStream {
totalBytesWritten += length;
writeCallCount++;
virtualPosition += length; // Update virtual position for getPos()
// Enhanced debug logging for ALL writes to track the exact sequence
if (path.contains("parquet") || path.contains("employees")) {
long beforeBufferPos = buffer.position();
// Always log writes to see the complete pattern
if (length >= 20 || writeCallCount >= 220 || writeCallCount % 50 == 0) {
LOG.warn(
"[DEBUG-2024] WRITE #{}: {} bytes | virtualPos={} (flushed={} + buffered={}) | totalWritten={} | file={}",
writeCallCount, length, virtualPosition, position, beforeBufferPos,
writeCallCount, length, virtualPosition, position, beforeBufferPos,
totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
}
}
@@ -410,17 +421,21 @@ public class SeaweedOutputStream extends OutputStream {
protected synchronized void flushInternal() throws IOException {
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] flushInternal() START: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}",
virtualPosition, position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
LOG.warn(
"[DEBUG-2024] flushInternal() START: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}",
virtualPosition, position, buffer.position(), totalBytesWritten,
path.substring(path.lastIndexOf('/') + 1));
}
maybeThrowLastError();
writeCurrentBufferToService();
flushWrittenBytesToService();
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] flushInternal() END: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}",
virtualPosition, position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
LOG.warn(
"[DEBUG-2024] flushInternal() END: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}",
virtualPosition, position, buffer.position(), totalBytesWritten,
path.substring(path.lastIndexOf('/') + 1));
}
}
@@ -428,7 +443,8 @@ public class SeaweedOutputStream extends OutputStream {
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn(
"[DEBUG-2024] flushInternalAsync() START: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}",
virtualPosition, position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
virtualPosition, position, buffer.position(), totalBytesWritten,
path.substring(path.lastIndexOf('/') + 1));
}
maybeThrowLastError();
@@ -436,8 +452,10 @@ public class SeaweedOutputStream extends OutputStream {
flushWrittenBytesToServiceAsync();
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] flushInternalAsync() END: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}",
virtualPosition, position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
LOG.warn(
"[DEBUG-2024] flushInternalAsync() END: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}",
virtualPosition, position, buffer.position(), totalBytesWritten,
path.substring(path.lastIndexOf('/') + 1));
}
}

other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java (28 changes)

@@ -120,10 +120,15 @@ public class SeaweedFileSystem extends FileSystem {
return new FSDataOutputStream(outputStream, statistics) {
@Override
public long getPos() {
long pos = outputStream.getPos();
LOG.warn("[DEBUG-2024] FSDataOutputStream.getPos() override called! Returning: {} for path: {}",
pos, finalPath);
return pos;
try {
long pos = outputStream.getPos();
LOG.warn("[DEBUG-2024] FSDataOutputStream.getPos() override called! Returning: {} for path: {}",
pos, finalPath);
return pos;
} catch (IOException e) {
LOG.error("[DEBUG-2024] IOException in getPos(), wrapping as RuntimeException", e);
throw new RuntimeException("Failed to get position", e);
}
}
};
} catch (Exception ex) {
@@ -176,11 +181,16 @@ public class SeaweedFileSystem extends FileSystem {
return new FSDataOutputStream(outputStream, statistics) {
@Override
public long getPos() {
long pos = outputStream.getPos();
LOG.warn(
"[DEBUG-2024] FSDataOutputStream.getPos() override called (append)! Returning: {} for path: {}",
pos, finalPath);
return pos;
try {
long pos = outputStream.getPos();
LOG.warn(
"[DEBUG-2024] FSDataOutputStream.getPos() override called (append)! Returning: {} for path: {}",
pos, finalPath);
return pos;
} catch (IOException e) {
LOG.error("[DEBUG-2024] IOException in getPos() (append), wrapping as RuntimeException", e);
throw new RuntimeException("Failed to get position", e);
}
}
};
} catch (Exception ex) {

test/java/spark/FLUSH_ON_GETPOS_STATUS.md (139 changes)

@@ -0,0 +1,139 @@
# Flush-on-getPos() Implementation: Status
## Implementation
Added flush-on-getPos() logic to `SeaweedOutputStream`:
```java
public synchronized long getPos() throws IOException {
// Flush buffer before returning position
if (buffer.position() > 0) {
writeCurrentBufferToService();
}
return position; // Now accurate after flush
}
```
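Because `getPos()` now declares `throws IOException`, the `FSDataOutputStream` wrapper on the HDFS side (whose `getPos()` override cannot throw a checked exception) has to catch it. Abridged from the corresponding change to `SeaweedFileSystem` in this change set:
```java
return new FSDataOutputStream(outputStream, statistics) {
    @Override
    public long getPos() {
        try {
            return outputStream.getPos(); // may now flush the buffer, hence the checked exception
        } catch (IOException e) {
            // cannot propagate a checked exception from this override, so rethrow unchecked
            throw new RuntimeException("Failed to get position", e);
        }
    }
};
```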
## Test Results
### ✅ What Works
1. **Flushing is happening**: Logs show "FLUSHING buffer (X bytes)" before each getPos() call
2. **Many small flushes**: Each getPos() call flushes whatever is in the buffer
3. **File size is correct**: FileStatus shows length=1260 bytes ✓
4. **File is written successfully**: The parquet file exists and has the correct size
### ❌ What Still Fails
**EOF Exception PERSISTS**: `EOFException: Reached the end of stream. Still have: 78 bytes left`
## Root Cause: Deeper Than Expected
The problem is NOT just about getPos() returning stale values. Even with flush-on-getPos():
1. **Parquet writes column chunks** → calls getPos() → **gets flushed position**
2. **Parquet internally records these offsets** in memory
3. **Parquet writes more data** (dictionary, headers, etc.)
4. **Parquet writes footer** containing the RECORDED offsets (from step 2)
5. **Problem**: The recorded offsets are relative to when they were captured, but subsequent writes shift everything
## The Real Issue: Relative vs. Absolute Offsets
Parquet's write pattern:
```
Write A (100 bytes) → getPos() returns 100 → Parquet records "A is at offset 100"
Write B (50 bytes) → getPos() returns 150 → Parquet records "B is at offset 150"
Write dictionary → No getPos()!
Write footer → Contains: "A at 100, B at 150"
But the actual file structure is:
[A: 0-100] [B: 100-150] [dict: 150-160] [footer: 160-end]
When reading:
Parquet seeks to offset 100 (expecting A) → But that's where B is!
Result: EOF exception
```
## Why Flush-on-getPos() Doesn't Help
Even though we flush on getPos(), Parquet:
1. Records the offset VALUE (e.g., "100")
2. Writes more data AFTER recording but BEFORE writing footer
3. Footer contains the recorded values (which are now stale); see the sketch below
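In code, the sequence just described looks roughly like this. It is an illustrative sketch of the pattern this document attributes to Parquet, not actual Parquet source; the helper methods are hypothetical placeholders:
```java
// "out" wraps SeaweedOutputStream; writeColumnChunk/writeDictionary/writeFooter are hypothetical.
void writeAsDescribedAbove(FSDataOutputStream out) throws IOException {
    long recordedOffset = out.getPos(); // flush-on-getPos() makes this value accurate right now
    writeColumnChunk(out);              // more bytes are written after the value was recorded
    writeDictionary(out);               // no getPos() call here
    writeFooter(out, recordedOffset);   // the footer persists the value recorded earlier,
                                        // not whatever getPos() would return at this point
}
```
Flushing inside `getPos()` only guarantees the value is accurate at the moment it is returned; it cannot change the copy that Parquet has already stored in its in-memory metadata.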
## The Fundamental Problem
**Parquet assumes an unbuffered stream where:**
- `getPos()` returns the EXACT byte offset in the final file
- No data will be written between when `getPos()` is called and when the footer is written
**SeaweedFS uses a buffered stream where:**
- Data is written to buffer first, then flushed
- Multiple operations can happen between getPos() calls
- Footer metadata itself gets written AFTER Parquet records all offsets
## Why This Works in HDFS/S3
They likely use one of these approaches:
1. **Completely unbuffered for Parquet** - Every write goes directly to disk
2. **Syncable.hflush() contract** - Parquet calls hflush() at key points
3. **Different file format handling** - Special case for Parquet writes
## Next Steps: Possible Solutions
### Option A: Disable Buffering for Parquet
```java
if (path.endsWith(".parquet")) {
this.bufferSize = 1; // Effectively unbuffered
}
```
**Pros**: Guaranteed correct offsets
**Cons**: Terrible performance
### Option B: Implement Syncable.hflush()
Make Parquet call `hflush()` instead of just `flush()`:
```java
@Override
public void hflush() throws IOException {
writeCurrentBufferToService();
flushWrittenBytesToService();
}
```
**Pros**: Clean, follows Hadoop contract
**Cons**: Requires Parquet/Spark to use hflush() (they might not)
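A detail relevant to the "Cons" above: Hadoop's `FSDataOutputStream.hflush()` only delegates to a custom `hflush()` when the wrapped stream implements `org.apache.hadoop.fs.Syncable`; otherwise it falls back to a plain `flush()`. So Option B also needs the stream handed to `FSDataOutputStream` to advertise `Syncable`. A minimal sketch of one way to do that with a decorator; the class name is hypothetical, and it assumes `SeaweedOutputStream.flush()` drains the buffer to the filer (otherwise Option B's `hflush()` body above would need to be exposed as a public method and called here instead):
```java
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.fs.Syncable;
import seaweedfs.client.SeaweedOutputStream;

// Hypothetical hdfs3-side decorator: implementing Syncable is what lets
// FSDataOutputStream.hflush() reach SeaweedFS instead of degrading to flush().
class SyncableSeaweedStream extends OutputStream implements Syncable {
    private final SeaweedOutputStream delegate;

    SyncableSeaweedStream(SeaweedOutputStream delegate) {
        this.delegate = delegate;
    }

    @Override public void write(int b) throws IOException { delegate.write(b); }
    @Override public void write(byte[] b, int off, int len) throws IOException { delegate.write(b, off, len); }
    @Override public void flush() throws IOException { delegate.flush(); }
    @Override public void close() throws IOException { delegate.close(); }

    @Override
    public void hflush() throws IOException {
        delegate.flush(); // assumption: this pushes the buffer and commits written bytes
    }

    @Override
    public void hsync() throws IOException {
        hflush(); // no stronger durability barrier available here; treat hsync like hflush
    }
}
```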
### Option C: Post-Process Parquet Files
After writing, re-read and fix the footer offsets:
```java
// After close, update footer with correct offsets
```
**Pros**: No performance impact during write
**Cons**: Complex, fragile
### Option D: Investigate Parquet Footer Writing
Look at Parquet source code to understand WHEN it writes the footer relative to getPos() calls.
Maybe we can intercept at the right moment.
## Recommendation
**Check if Parquet/Spark uses Syncable.hflush()**:
1. Look at Parquet writer source code
2. Check if it calls `hflush()` or just `flush()` (the instrumentation sketch below is one way to find out)
3. If it uses `hflush()`, implement it properly
4. If not, we may need Option A (disable buffering)
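One low-effort way to answer step 2 from the SeaweedFS side is to instrument the existing anonymous wrapper in `SeaweedFileSystem.create()` with the same `[DEBUG-2024]` logging used elsewhere in this change and re-run the Spark test; if the message never appears, Parquet/Spark is not calling `hflush()` on this stream. A sketch, showing only the added override:
```java
return new FSDataOutputStream(outputStream, statistics) {
    @Override
    public void hflush() throws IOException {
        LOG.warn("[DEBUG-2024] hflush() called for path: {}", finalPath);
        super.hflush(); // default behavior: delegate to the wrapped stream if Syncable, else flush()
    }
};
```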
## Files Modified
- `other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java`
  - Added flush in `getPos()`
  - Changed return to `position` (after flush)
- `other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java`
  - Updated FSDataOutputStream wrappers to handle IOException
## Status
- ✅ Flush-on-getPos() implemented
- ✅ Flushing is working (logs confirm)
- ❌ EOF exception persists
- ⏭️ Need to investigate Parquet's footer writing mechanism
The fix is not complete. The problem is more fundamental than we initially thought.

test/java/spark/ReadParquetMeta.java (39 changes)

@@ -0,0 +1,39 @@
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
public class ReadParquetMeta {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Path path = new Path(args[0]);
HadoopInputFile inputFile = HadoopInputFile.fromPath(path, conf);
try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
ParquetMetadata meta = reader.getFooter();
System.out.println("=== Parquet File Metadata ===");
System.out.println("Blocks (row groups): " + meta.getBlocks().size());
System.out.println("File size from footer: " + inputFile.getLength());
System.out.println("");
meta.getBlocks().forEach(block -> {
System.out.println("Row Group:");
System.out.println(" Rows: " + block.getRowCount());
System.out.println(" Total byte size: " + block.getTotalByteSize());
System.out.println(" Columns: " + block.getColumns().size());
System.out.println("");
block.getColumns().forEach(col -> {
System.out.println(" Column: " + col.getPath());
System.out.println(" First data page offset: " + col.getFirstDataPageOffset());
System.out.println(" Dictionary page offset: " + col.getDictionaryPageOffset());
System.out.println(" Total size: " + col.getTotalSize());
System.out.println(" Total uncompressed size: " + col.getTotalUncompressedSize());
System.out.println("");
});
});
}
}
}

test/java/spark/capture-parquet.sh (50 changes)

@@ -0,0 +1,50 @@
#!/bin/bash
# Run Spark test and capture the Parquet file before cleanup
echo "Starting SeaweedFS services..."
docker compose up -d seaweedfs-master seaweedfs-volume seaweedfs-filer
sleep 10
echo "Running Spark test in background..."
docker compose run --rm -e SEAWEEDFS_TEST_ENABLED=true spark-tests bash -c "mvn test -Dtest=SparkSQLTest#testCreateTableAndQuery 2>&1" > /tmp/spark-test-capture.log &
TEST_PID=$!
echo "Monitoring for Parquet file creation..."
while kill -0 $TEST_PID 2>/dev/null; do
# Check if employees directory exists
FILES=$(curl -s http://localhost:8888/test-spark/employees/ 2>/dev/null | grep -o 'part-[^"]*\.parquet' || echo "")
if [ -n "$FILES" ]; then
echo "Found Parquet file(s)!"
for FILE in $FILES; do
echo "Downloading: $FILE"
curl -s "http://localhost:8888/test-spark/employees/$FILE" > "/tmp/$FILE"
FILE_SIZE=$(stat -f%z "/tmp/$FILE" 2>/dev/null || stat --format=%s "/tmp/$FILE" 2>/dev/null)
echo "Downloaded $FILE: $FILE_SIZE bytes"
if [ -f "/tmp/$FILE" ] && [ $FILE_SIZE -gt 0 ]; then
echo "SUCCESS: Captured $FILE"
echo "Installing parquet-tools..."
pip3 install -q parquet-tools 2>/dev/null || echo "parquet-tools might already be installed"
echo ""
echo "=== Parquet File Metadata ==="
python3 -m parquet_tools meta "/tmp/$FILE" || echo "parquet-tools failed"
echo ""
echo "=== File Header (first 100 bytes) ==="
hexdump -C "/tmp/$FILE" | head -10
echo ""
echo "=== File Footer (last 100 bytes) ==="
tail -c 100 "/tmp/$FILE" | hexdump -C
kill $TEST_PID 2>/dev/null
exit 0
fi
done
fi
sleep 0.5
done
echo "Test completed, checking logs..."
tail -50 /tmp/spark-test-capture.log