From 9e7ed48688cbf0143db05a9dbd44b529ada265a7 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 23 Nov 2025 22:01:29 -0800 Subject: [PATCH] fix: Override FSDataOutputStream.getPos() to use SeaweedOutputStream position CRITICAL FIX for Parquet 78-byte EOF error! Root Cause Analysis: - Hadoop's FSDataOutputStream tracks position with an internal counter - It does NOT call SeaweedOutputStream.getPos() by default - When Parquet writes data and calls getPos() to record column chunk offsets, it gets FSDataOutputStream's counter, not SeaweedOutputStream's actual position - This creates a 78-byte mismatch between recorded offsets and actual file size - Result: EOFException when reading (tries to read beyond file end) The Fix: - Override getPos() in the anonymous FSDataOutputStream subclass - Delegate to SeaweedOutputStream.getPos() which returns 'position + buffer.position()' - This ensures Parquet gets the correct position when recording metadata - Column chunk offsets in footer will now match actual data positions This should fix the consistent 78-byte discrepancy we've been seeing across all Parquet file writes (regardless of file size: 684, 693, 1275 bytes, etc.) --- .../java/seaweed/hdfs/SeaweedFileSystem.java | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 78cf5a2fc..10115ac55 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -13,7 +13,6 @@ import seaweedfs.client.FilerProto; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.OutputStream; import java.net.URI; import java.util.EnumSet; import java.util.List; @@ -112,9 +111,16 @@ public class SeaweedFileSystem extends FileSystem { replicaPlacement = String.format("%03d", replication - 1); } int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); - OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, + SeaweedHadoopOutputStream outputStream = (SeaweedHadoopOutputStream) seaweedFileSystemStore.createFile(path, overwrite, permission, seaweedBufferSize, replicaPlacement); - return new FSDataOutputStream(outputStream, statistics); + // Use custom FSDataOutputStream that delegates getPos() to our stream + return new FSDataOutputStream(outputStream, statistics) { + @Override + public long getPos() { + // Delegate to SeaweedOutputStream's position tracking + return outputStream.getPos(); + } + }; } catch (Exception ex) { LOG.error("Failed to create file: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex); throw new IOException("Failed to create file: " + path, ex); @@ -156,8 +162,15 @@ public class SeaweedFileSystem extends FileSystem { path = qualify(path); try { int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); - OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, ""); - return new FSDataOutputStream(outputStream, statistics); + SeaweedHadoopOutputStream outputStream = (SeaweedHadoopOutputStream) seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, ""); + // Use custom FSDataOutputStream that delegates getPos() to our stream + return new FSDataOutputStream(outputStream, statistics) { + @Override + public long getPos() { + // Delegate to SeaweedOutputStream's position tracking + return outputStream.getPos(); + } + }; } catch (Exception ex) { LOG.error("Failed to append to file: {} bufferSize:{}", path, bufferSize, ex); throw new IOException("Failed to append to file: " + path, ex);