diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index 59933b511..d5ab76aa3 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -103,34 +103,27 @@ public class SeaweedOutputStream extends OutputStream { * @return current position (flushed + buffered bytes) */ public synchronized long getPos() throws IOException { - // EXPERIMENT: NO flushes during getPos() - only flush on close() - // Testing: 17 chunks=78 bytes, 10 chunks=78 bytes, now trying 1 chunk getPosCallCount++; - // DO NOT FLUSH - just track for logging - if (path.contains("parquet") && buffer.position() > 0) { - LOG.warn("[DEBUG-2024] getPos() #{} SKIPPING FLUSH (buffered={} bytes, will create single chunk on close)", - getPosCallCount, buffer.position()); + // CRITICAL FIX: Flush buffer before returning position! + // Parquet records offsets from getPos() and expects them to match actual file layout. + // If we return virtualPosition (flushed + buffered) without flushing, the offsets + // will be wrong after the buffer is finally flushed on close(). + if (buffer.position() > 0) { + if (path.contains("parquet")) { + LOG.warn("[DEBUG-2024] getPos() #{} FLUSHING {} buffered bytes before returning position", + getPosCallCount, buffer.position()); + } + writeCurrentBufferToService(); } - // Return virtual position (flushed + buffered) to account for unflushed data - long returnPos = position + buffer.position(); - if (path.contains("parquet")) { - StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); - String caller = "unknown"; - if (stackTrace.length > 2) { - StackTraceElement callerElement = stackTrace[2]; - caller = callerElement.getClassName() + "." + callerElement.getMethodName() + ":" - + callerElement.getLineNumber(); - } - - LOG.warn( - "[DEBUG-2024] getPos() #{} called by {}: returning virtualPos={} (flushed={} buffered={}) totalBytesWritten={} writeCalls={} path={}", - getPosCallCount, caller, returnPos, position, buffer.position(), totalBytesWritten, writeCallCount, - path.substring(Math.max(0, path.length() - 80))); + LOG.warn("[DEBUG-2024] getPos() #{}: returning position={} (flushed, buffer now empty) totalBytesWritten={} writeCalls={}", + getPosCallCount, position, totalBytesWritten, writeCallCount); } - return returnPos; // Return virtual position (includes buffered data) + + // Return actual flushed position (buffer is now empty) + return position; } public static String getParentDirectory(String path) {