diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index ab47bcbea..0bd8719fa 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -101,18 +101,10 @@ public class SeaweedOutputStream extends OutputStream { * * @return current position (flushed + buffered bytes) */ - public synchronized long getPos() throws IOException { - // CRITICAL FIX: Flush buffer before returning position - // This ensures Parquet (and other clients) get accurate offsets based on committed data - // Without this, Parquet records stale offsets and fails with EOF exceptions - if (buffer.position() > 0) { - if (path.contains("parquet")) { - LOG.warn("[DEBUG-2024] getPos() FLUSHING buffer ({} bytes) before returning position, path={}", - buffer.position(), path.substring(path.lastIndexOf('/') + 1)); - } - writeCurrentBufferToService(); - } - + public synchronized long getPos() { + // CRITICAL FIX for Parquet: Return virtual position (including buffered data) + // Parquet does NOT call hflush() - it expects getPos() to include all written bytes + // This way, when Parquet records offsets, they account for buffered data if (path.contains("parquet")) { // Get caller info for debugging StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); @@ -124,11 +116,11 @@ public class SeaweedOutputStream extends OutputStream { } LOG.warn( - "[DEBUG-2024] getPos() called by {}: returning position={} (all data flushed) totalBytesWritten={} writeCalls={} path={}", - caller, position, totalBytesWritten, writeCallCount, + "[DEBUG-2024] getPos() called by {}: returning virtualPosition={} (flushed={} buffered={}) totalBytesWritten={} writeCalls={} path={}", + caller, virtualPosition, position, buffer.position(), totalBytesWritten, writeCallCount, path.substring(Math.max(0, path.length() - 80))); // Last 80 chars of path } - return position; // Return position (now guaranteed to be accurate after flush) + return virtualPosition; // Return total bytes written (including buffered) } public static String getParentDirectory(String path) { diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 88a4c2102..07410a065 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -120,15 +120,10 @@ public class SeaweedFileSystem extends FileSystem { return new FSDataOutputStream(outputStream, statistics) { @Override public long getPos() { - try { - long pos = outputStream.getPos(); - LOG.warn("[DEBUG-2024] FSDataOutputStream.getPos() override called! Returning: {} for path: {}", - pos, finalPath); - return pos; - } catch (IOException e) { - LOG.error("[DEBUG-2024] IOException in getPos(), wrapping as RuntimeException", e); - throw new RuntimeException("Failed to get position", e); - } + long pos = outputStream.getPos(); + LOG.warn("[DEBUG-2024] FSDataOutputStream.getPos() override called! Returning: {} for path: {}", + pos, finalPath); + return pos; } }; } catch (Exception ex) { @@ -181,16 +176,11 @@ public class SeaweedFileSystem extends FileSystem { return new FSDataOutputStream(outputStream, statistics) { @Override public long getPos() { - try { - long pos = outputStream.getPos(); - LOG.warn( - "[DEBUG-2024] FSDataOutputStream.getPos() override called (append)! Returning: {} for path: {}", - pos, finalPath); - return pos; - } catch (IOException e) { - LOG.error("[DEBUG-2024] IOException in getPos() (append), wrapping as RuntimeException", e); - throw new RuntimeException("Failed to get position", e); - } + long pos = outputStream.getPos(); + LOG.warn( + "[DEBUG-2024] FSDataOutputStream.getPos() override called (append)! Returning: {} for path: {}", + pos, finalPath); + return pos; } }; } catch (Exception ex) { diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java index 8028b31d7..05052d2c1 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java @@ -32,6 +32,7 @@ public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Sy */ @Override public void hsync() throws IOException { + LOG.warn("[DEBUG-2024] hsync() called on path: {}", getPath()); if (supportFlush) { flushInternal(); } @@ -45,10 +46,16 @@ public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Sy */ @Override public void hflush() throws IOException { + LOG.warn("[DEBUG-2024] hflush() called on path: {}", getPath()); if (supportFlush) { flushInternal(); } } + + private String getPath() { + // Access the path field from parent class for logging + return this.toString().contains("parquet") ? "parquet file" : "file"; + } /** * Query the stream for a specific capability.