From 9eb71466d8bb6782473af2f8b2f76d1fb3403c37 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 24 Nov 2025 01:29:23 -0800 Subject: [PATCH] feat: implement flush-on-getPos() to ensure accurate offsets MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit IMPLEMENTATION: - Added buffer flush in getPos() before returning position - Every getPos() call now flushes buffered data - Updated FSDataOutputStream wrappers to handle IOException - Extensive debug logging added RESULT: - Flushing is working ✓ (logs confirm) - File size is correct (1260 bytes) ✓ - EOF exception STILL PERSISTS ❌ DEEPER ROOT CAUSE DISCOVERED: Parquet records offsets when getPos() is called, THEN writes more data, THEN writes footer with those recorded (now stale) offsets. Example: 1. Write data → getPos() returns 100 → Parquet stores '100' 2. Write dictionary (no getPos()) 3. Write footer containing '100' (but actual offset is now 110) Flush-on-getPos() doesn't help because Parquet uses the RETURNED VALUE, not the current position when writing footer. NEXT: Need to investigate Parquet's footer writing or disable buffering entirely. --- .../seaweedfs/client/SeaweedOutputStream.java | 50 +++++-- .../java/seaweed/hdfs/SeaweedFileSystem.java | 28 ++-- test/java/spark/FLUSH_ON_GETPOS_STATUS.md | 139 ++++++++++++++++++ test/java/spark/ReadParquetMeta.java | 39 +++++ test/java/spark/capture-parquet.sh | 50 +++++++ 5 files changed, 281 insertions(+), 25 deletions(-) create mode 100644 test/java/spark/FLUSH_ON_GETPOS_STATUS.md create mode 100644 test/java/spark/ReadParquetMeta.java create mode 100755 test/java/spark/capture-parquet.sh diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index fc59f3c0d..ab47bcbea 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -101,7 +101,18 @@ public class SeaweedOutputStream extends OutputStream { * * @return current position (flushed + buffered bytes) */ - public synchronized long getPos() { + public synchronized long getPos() throws IOException { + // CRITICAL FIX: Flush buffer before returning position + // This ensures Parquet (and other clients) get accurate offsets based on committed data + // Without this, Parquet records stale offsets and fails with EOF exceptions + if (buffer.position() > 0) { + if (path.contains("parquet")) { + LOG.warn("[DEBUG-2024] getPos() FLUSHING buffer ({} bytes) before returning position, path={}", + buffer.position(), path.substring(path.lastIndexOf('/') + 1)); + } + writeCurrentBufferToService(); + } + if (path.contains("parquet")) { // Get caller info for debugging StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); @@ -113,11 +124,11 @@ public class SeaweedOutputStream extends OutputStream { } LOG.warn( - "[DEBUG-2024] getPos() called by {}: returning VIRTUAL position={} (flushed={} buffered={}) totalBytesWritten={} writeCalls={} path={}", - caller, virtualPosition, position, buffer.position(), totalBytesWritten, writeCallCount, + "[DEBUG-2024] getPos() called by {}: returning position={} (all data flushed) totalBytesWritten={} writeCalls={} path={}", + caller, position, totalBytesWritten, writeCallCount, path.substring(Math.max(0, path.length() - 80))); // Last 80 chars of path } - return virtualPosition; // Return virtual position (always accurate) + return position; // 
Return position (now guaranteed to be accurate after flush) } public static String getParentDirectory(String path) { @@ -189,16 +200,16 @@ public class SeaweedOutputStream extends OutputStream { totalBytesWritten += length; writeCallCount++; virtualPosition += length; // Update virtual position for getPos() - + // Enhanced debug logging for ALL writes to track the exact sequence if (path.contains("parquet") || path.contains("employees")) { long beforeBufferPos = buffer.position(); - + // Always log writes to see the complete pattern if (length >= 20 || writeCallCount >= 220 || writeCallCount % 50 == 0) { LOG.warn( "[DEBUG-2024] WRITE #{}: {} bytes | virtualPos={} (flushed={} + buffered={}) | totalWritten={} | file={}", - writeCallCount, length, virtualPosition, position, beforeBufferPos, + writeCallCount, length, virtualPosition, position, beforeBufferPos, totalBytesWritten, path.substring(path.lastIndexOf('/') + 1)); } } @@ -410,17 +421,21 @@ public class SeaweedOutputStream extends OutputStream { protected synchronized void flushInternal() throws IOException { if (path.contains("parquet") || path.contains("employees")) { - LOG.warn("[DEBUG-2024] flushInternal() START: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}", - virtualPosition, position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1)); + LOG.warn( + "[DEBUG-2024] flushInternal() START: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}", + virtualPosition, position, buffer.position(), totalBytesWritten, + path.substring(path.lastIndexOf('/') + 1)); } - + maybeThrowLastError(); writeCurrentBufferToService(); flushWrittenBytesToService(); - + if (path.contains("parquet") || path.contains("employees")) { - LOG.warn("[DEBUG-2024] flushInternal() END: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}", - virtualPosition, position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1)); + LOG.warn( + "[DEBUG-2024] flushInternal() END: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}", + virtualPosition, position, buffer.position(), totalBytesWritten, + path.substring(path.lastIndexOf('/') + 1)); } } @@ -428,7 +443,8 @@ public class SeaweedOutputStream extends OutputStream { if (path.contains("parquet") || path.contains("employees")) { LOG.warn( "[DEBUG-2024] flushInternalAsync() START: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}", - virtualPosition, position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1)); + virtualPosition, position, buffer.position(), totalBytesWritten, + path.substring(path.lastIndexOf('/') + 1)); } maybeThrowLastError(); @@ -436,8 +452,10 @@ public class SeaweedOutputStream extends OutputStream { flushWrittenBytesToServiceAsync(); if (path.contains("parquet") || path.contains("employees")) { - LOG.warn("[DEBUG-2024] flushInternalAsync() END: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}", - virtualPosition, position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1)); + LOG.warn( + "[DEBUG-2024] flushInternalAsync() END: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}", + virtualPosition, position, buffer.position(), totalBytesWritten, + path.substring(path.lastIndexOf('/') + 1)); } } diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java 
b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 07410a065..88a4c2102 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -120,10 +120,15 @@ public class SeaweedFileSystem extends FileSystem { return new FSDataOutputStream(outputStream, statistics) { @Override public long getPos() { - long pos = outputStream.getPos(); - LOG.warn("[DEBUG-2024] FSDataOutputStream.getPos() override called! Returning: {} for path: {}", - pos, finalPath); - return pos; + try { + long pos = outputStream.getPos(); + LOG.warn("[DEBUG-2024] FSDataOutputStream.getPos() override called! Returning: {} for path: {}", + pos, finalPath); + return pos; + } catch (IOException e) { + LOG.error("[DEBUG-2024] IOException in getPos(), wrapping as RuntimeException", e); + throw new RuntimeException("Failed to get position", e); + } } }; } catch (Exception ex) { @@ -176,11 +181,16 @@ public class SeaweedFileSystem extends FileSystem { return new FSDataOutputStream(outputStream, statistics) { @Override public long getPos() { - long pos = outputStream.getPos(); - LOG.warn( - "[DEBUG-2024] FSDataOutputStream.getPos() override called (append)! Returning: {} for path: {}", - pos, finalPath); - return pos; + try { + long pos = outputStream.getPos(); + LOG.warn( + "[DEBUG-2024] FSDataOutputStream.getPos() override called (append)! Returning: {} for path: {}", + pos, finalPath); + return pos; + } catch (IOException e) { + LOG.error("[DEBUG-2024] IOException in getPos() (append), wrapping as RuntimeException", e); + throw new RuntimeException("Failed to get position", e); + } } }; } catch (Exception ex) { diff --git a/test/java/spark/FLUSH_ON_GETPOS_STATUS.md b/test/java/spark/FLUSH_ON_GETPOS_STATUS.md new file mode 100644 index 000000000..974234410 --- /dev/null +++ b/test/java/spark/FLUSH_ON_GETPOS_STATUS.md @@ -0,0 +1,139 @@ +# Flush-on-getPos() Implementation: Status + +## Implementation + +Added flush-on-getPos() logic to `SeaweedOutputStream`: +```java +public synchronized long getPos() throws IOException { + // Flush buffer before returning position + if (buffer.position() > 0) { + writeCurrentBufferToService(); + } + return position; // Now accurate after flush +} +``` + +## Test Results + +### ✅ What Works +1. **Flushing is happening**: Logs show "FLUSHING buffer (X bytes)" before each getPos() call +2. **Many small flushes**: Each getPos() call flushes whatever is in the buffer +3. **File size is correct**: FileStatus shows length=1260 bytes ✓ +4. **File is written successfully**: The parquet file exists and has the correct size + +### ❌ What Still Fails +**EOF Exception PERSISTS**: `EOFException: Reached the end of stream. Still have: 78 bytes left` + +## Root Cause: Deeper Than Expected + +The problem is NOT just about getPos() returning stale values. Even with flush-on-getPos(): + +1. **Parquet writes column chunks** → calls getPos() → **gets flushed position** +2. **Parquet internally records these offsets** in memory +3. **Parquet writes more data** (dictionary, headers, etc.) +4. **Parquet writes footer** containing the RECORDED offsets (from step 2) +5. **Problem**: The recorded offsets are relative to when they were captured, but subsequent writes shift everything + +## The Real Issue: Relative vs. 
Absolute Offsets
+
+Parquet's write pattern:
+```
+Write A (100 bytes) → getPos() returns 100 → Parquet records "A is at offset 100"
+Write B (50 bytes) → getPos() returns 150 → Parquet records "B is at offset 150"
+Write dictionary → No getPos()!
+Write footer → Contains: "A at 100, B at 150"
+
+But the actual file structure is:
+[A: 0-100] [B: 100-150] [dict: 150-160] [footer: 160-end]
+
+When reading:
+Parquet seeks to offset 100 (expecting A) → But that's where B is!
+Result: EOF exception
+```
+
+## Why Flush-on-getPos() Doesn't Help
+
+Even though we flush on getPos(), Parquet:
+1. Records the offset VALUE (e.g., "100")
+2. Writes more data AFTER recording but BEFORE writing the footer
+3. Writes a footer containing the recorded values (which are now stale)
+
+## The Fundamental Problem
+
+**Parquet assumes an unbuffered stream where:**
+- `getPos()` returns the EXACT byte offset in the final file
+- No data is written between when `getPos()` is called and when the footer is written
+
+**SeaweedFS uses a buffered stream where:**
+- Data is written to a buffer first, then flushed
+- Multiple operations can happen between getPos() calls
+- The footer metadata itself is written AFTER Parquet records all offsets
+
+## Why This Works in HDFS/S3
+
+The HDFS and S3 clients likely rely on one of these approaches:
+1. **Completely unbuffered for Parquet** - every write goes directly to disk
+2. **Syncable.hflush() contract** - Parquet calls hflush() at key points
+3. **Different file format handling** - a special case for Parquet writes
+
+## Next Steps: Possible Solutions
+
+### Option A: Disable Buffering for Parquet
+```java
+if (path.endsWith(".parquet")) {
+    this.bufferSize = 1; // Effectively unbuffered
+}
+```
+**Pros**: Guaranteed correct offsets
+**Cons**: Terrible performance
+
+### Option B: Implement Syncable.hflush()
+Implement the Hadoop `Syncable.hflush()` contract on the stream, so that callers which invoke `hflush()` (rather than plain `flush()`) get their data pushed through:
+```java
+@Override
+public void hflush() throws IOException {
+    writeCurrentBufferToService();
+    flushWrittenBytesToService();
+}
+```
+**Pros**: Clean, follows the Hadoop contract
+**Cons**: Only helps if Parquet/Spark actually call hflush() (they might not)
+
+### Option C: Post-Process Parquet Files
+After writing, re-read the file and fix the footer offsets:
+```java
+// After close, update footer with correct offsets
+```
+**Pros**: No performance impact during the write
+**Cons**: Complex, fragile
+
+### Option D: Investigate Parquet Footer Writing
+Read the Parquet source code to understand WHEN it writes the footer relative to its getPos() calls.
+Maybe we can intercept at the right moment.
+
+## Recommendation
+
+**Check whether Parquet/Spark uses Syncable.hflush()**:
+1. Look at the Parquet writer source code
+2. Check if it calls `hflush()` or just `flush()`
+3. If it uses `hflush()`, implement it properly
+4. If not, we may need Option A (disable buffering)
+
+## Files Modified
+
+- `other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java`
+  - Added flush in `getPos()`
+  - Changed the return value to `position` (accurate after the flush)
+
+- `other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java`
+  - Updated the FSDataOutputStream wrappers to handle IOException
+
+## Status
+
+- ✅ Flush-on-getPos() implemented
+- ✅ Flushing is working (logs confirm)
+- ❌ EOF exception persists
+- ⏭️ Need to investigate Parquet's footer writing mechanism
+
+The fix is not complete. The problem is more fundamental than we initially thought.
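+
+## Appendix: The Recording Pattern in Miniature
+
+To make the failure mode concrete, here is a toy model of the write pattern described above. This is illustrative only: `PositionedStream`, `writeFile()`, and the byte counts are invented for the sketch and are NOT Parquet's actual internals.
+
+```java
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+// Toy model of a footer-writing client over a buffered, position-reporting
+// stream. PositionedStream stands in for SeaweedOutputStream.
+class OffsetCaptureSketch {
+    interface PositionedStream {
+        void write(byte[] data) throws IOException;
+        long getPos() throws IOException; // may flush internally -- it does not matter here
+    }
+
+    static void writeFile(PositionedStream out) throws IOException {
+        List<Long> recordedOffsets = new ArrayList<>();
+
+        out.write(new byte[100]);           // column chunk A
+        recordedOffsets.add(out.getPos());  // captures the long value 100
+
+        out.write(new byte[50]);            // column chunk B
+        recordedOffsets.add(out.getPos());  // captures the long value 150
+
+        out.write(new byte[10]);            // dictionary -- no getPos() afterwards
+
+        // The "footer": serializes the longs captured above. Whatever getPos()
+        // flushes internally, these values were fixed at capture time and can
+        // no longer be corrected by the stream.
+        for (long off : recordedOffsets) {
+            out.write(String.valueOf(off).getBytes());
+        }
+    }
+}
+```
+
+This is why the fix has to happen either before the values are captured (unbuffered writes, Option A) or at the points where the client explicitly synchronizes (hflush(), Option B): the stream alone cannot rewrite longs the client already holds in memory.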
+
diff --git a/test/java/spark/ReadParquetMeta.java b/test/java/spark/ReadParquetMeta.java
new file mode 100644
index 000000000..74641a485
--- /dev/null
+++ b/test/java/spark/ReadParquetMeta.java
@@ -0,0 +1,39 @@
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public class ReadParquetMeta {
+    public static void main(String[] args) throws Exception {
+        Configuration conf = new Configuration();
+        Path path = new Path(args[0]);
+        HadoopInputFile inputFile = HadoopInputFile.fromPath(path, conf);
+
+        try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
+            ParquetMetadata meta = reader.getFooter();
+
+            System.out.println("=== Parquet File Metadata ===");
+            System.out.println("Blocks (row groups): " + meta.getBlocks().size());
+            System.out.println("File length (from the filesystem): " + inputFile.getLength());
+            System.out.println("");
+
+            meta.getBlocks().forEach(block -> {
+                System.out.println("Row Group:");
+                System.out.println("  Rows: " + block.getRowCount());
+                System.out.println("  Total byte size: " + block.getTotalByteSize());
+                System.out.println("  Columns: " + block.getColumns().size());
+                System.out.println("");
+
+                block.getColumns().forEach(col -> {
+                    System.out.println("  Column: " + col.getPath());
+                    System.out.println("    First data page offset: " + col.getFirstDataPageOffset());
+                    System.out.println("    Dictionary page offset: " + col.getDictionaryPageOffset());
+                    System.out.println("    Total size: " + col.getTotalSize());
+                    System.out.println("    Total uncompressed size: " + col.getTotalUncompressedSize());
+                    System.out.println("");
+                });
+            });
+        }
+    }
+}
diff --git a/test/java/spark/capture-parquet.sh b/test/java/spark/capture-parquet.sh
new file mode 100755
index 000000000..18e608fda
--- /dev/null
+++ b/test/java/spark/capture-parquet.sh
@@ -0,0 +1,50 @@
+#!/bin/bash
+# Run Spark test and capture the Parquet file before cleanup
+
+echo "Starting SeaweedFS services..."
+docker compose up -d seaweedfs-master seaweedfs-volume seaweedfs-filer
+sleep 10
+
+echo "Running Spark test in background..."
+docker compose run --rm -e SEAWEEDFS_TEST_ENABLED=true spark-tests bash -c "mvn test -Dtest=SparkSQLTest#testCreateTableAndQuery 2>&1" > /tmp/spark-test-capture.log &
+TEST_PID=$!
+
+echo "Monitoring for Parquet file creation..."
+while kill -0 $TEST_PID 2>/dev/null; do
+    # Check if employees directory exists
+    FILES=$(curl -s http://localhost:8888/test-spark/employees/ 2>/dev/null | grep -o 'part-[^"]*\.parquet' || echo "")
+    if [ -n "$FILES" ]; then
+        echo "Found Parquet file(s)!"
+        for FILE in $FILES; do
+            echo "Downloading: $FILE"
+            curl -s "http://localhost:8888/test-spark/employees/$FILE" > "/tmp/$FILE"
+            FILE_SIZE=$(stat -f%z "/tmp/$FILE" 2>/dev/null || stat --format=%s "/tmp/$FILE" 2>/dev/null)
+            echo "Downloaded $FILE: $FILE_SIZE bytes"
+
+            if [ -f "/tmp/$FILE" ] && [ "${FILE_SIZE:-0}" -gt 0 ]; then
+                echo "SUCCESS: Captured $FILE"
+                echo "Installing parquet-tools..."
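+                # NOTE: this assumes the pip "parquet-tools" package, whose CLI
+                # exposes an "inspect" subcommand for dumping metadata; the Java
+                # parquet-tools jar uses a "meta" subcommand instead.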
+                pip3 install -q parquet-tools 2>/dev/null || echo "parquet-tools might already be installed"
+
+                echo ""
+                echo "=== Parquet File Metadata ==="
+                parquet-tools inspect "/tmp/$FILE" || echo "parquet-tools failed"
+
+                echo ""
+                echo "=== File Header (first 100 bytes) ==="
+                hexdump -C "/tmp/$FILE" | head -10
+
+                echo ""
+                echo "=== File Footer (last 100 bytes) ==="
+                tail -c 100 "/tmp/$FILE" | hexdump -C
+
+                kill $TEST_PID 2>/dev/null
+                exit 0
+            fi
+        done
+    fi
+    sleep 0.5
+done
+
+echo "Test completed, checking logs..."
+tail -50 /tmp/spark-test-capture.log
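+
+# Optional: the ReadParquetMeta helper added alongside this script prints the
+# footer's recorded offsets directly, which is useful for comparing against the
+# hexdump output above. The classpath below is an assumption -- PARQUET_JARS
+# must point at the parquet-hadoop jars for your environment:
+#   javac -cp "$(hadoop classpath):$PARQUET_JARS" ReadParquetMeta.java
+#   java -cp ".:$(hadoop classpath):$PARQUET_JARS" ReadParquetMeta "file:///tmp/<captured-file>.parquet"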