From 0da72322d1c0f50eed2e6f8527a59aadbc14bad4 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Mon, 24 Nov 2025 23:30:00 -0800
Subject: [PATCH] clean up

---
 .../seaweedfs/client/SeaweedOutputStream.java | 167 +-----------------
 .../hdfs/SeaweedHadoopOutputStream.java       |  11 +-
 2 files changed, 4 insertions(+), 174 deletions(-)

diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
index faa9daeee..610454789 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
@@ -23,21 +23,15 @@ public class SeaweedOutputStream extends OutputStream {
     private final ThreadPoolExecutor threadExecutor;
     private final ExecutorCompletionService completionService;
     private final ConcurrentLinkedDeque writeOperations;
-    private final boolean shouldSaveMetadata = false;
     private FilerProto.Entry.Builder entry;
     private long position; // Flushed bytes (committed to service)
-    private long virtualPosition; // Total bytes written (including buffered), for getPos()
     private boolean closed;
     private volatile IOException lastError;
     private long lastFlushOffset;
     private long lastTotalAppendOffset = 0;
     private ByteBuffer buffer;
-    private long outputIndex;
     private String replication = "";
     private String collection = "";
-    private long totalBytesWritten = 0; // Track total bytes for debugging
-    private long writeCallCount = 0; // Track number of write() calls
-    private int getPosCallCount = 0; // Track getPos() calls for throttling flushes
 
     public SeaweedOutputStream(FilerClient filerClient, final String fullpath) {
         this(filerClient, fullpath, "");
@@ -49,12 +43,11 @@ public class SeaweedOutputStream extends OutputStream {
 
     public SeaweedOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
                                final long position, final int bufferSize, final String replication) {
-
+
         this.filerClient = filerClient;
         this.replication = replication;
         this.path = path;
         this.position = position;
-        this.virtualPosition = position; // Initialize to match position
         this.closed = false;
         this.lastError = null;
         this.lastFlushOffset = 0;
@@ -102,8 +95,6 @@ public class SeaweedOutputStream extends OutputStream {
      * @return current position (flushed + buffered bytes)
      */
     public synchronized long getPos() throws IOException {
-        getPosCallCount++;
-
         // Guard against NPE if called after close()
         if (buffer == null) {
             return position;
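
[Note, not part of the patch] The getPos() contract above is what position-tracking
formats such as Parquet rely on: it reports flushed plus buffered bytes, so offsets
recorded mid-write stay correct without forcing a flush. A minimal usage sketch —
host, port, path, and serializeBlock() are illustrative placeholders:

    FilerClient filerClient = new FilerClient("localhost", 18888);
    SeaweedOutputStream out = new SeaweedOutputStream(filerClient, "/test/data.bin");
    long blockStart = out.getPos();   // flushed + buffered bytes so far
    out.write(serializeBlock());      // hypothetical payload
    long blockEnd = out.getPos();     // blockStart + payload length; no flush required
    out.close();
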
@@ -144,7 +135,6 @@ public class SeaweedOutputStream extends OutputStream {
 
     private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
         try {
-
             // Set the file size in attributes based on our position
             // This ensures Parquet footer metadata matches what we actually wrote
@@ -177,30 +167,10 @@ public class SeaweedOutputStream extends OutputStream {
             throw new IndexOutOfBoundsException();
         }
 
-        totalBytesWritten += length;
-        writeCallCount++;
-        virtualPosition += length; // Update virtual position for getPos()
-
-        // Enhanced debug logging for ALL writes to track the exact sequence
-        if (path.contains("parquet") || path.contains("employees")) {
-            long beforeBufferPos = buffer.position();
-
-            // Always log writes to see the complete pattern
-            if (length >= 20 || writeCallCount >= 220 || writeCallCount % 50 == 0) {
-
-            }
-        }
-
-        // System.out.println(path + " write [" + (outputIndex + off) + "," +
-        // ((outputIndex + off) + length) + ")");
         int currentOffset = off;
         int writableBytes = bufferSize - buffer.position();
         int numberOfBytesToWrite = length;
 
-        // Track position before write
-        long posBeforeWrite = position + buffer.position();
-
         while (numberOfBytesToWrite > 0) {
 
             if (numberOfBytesToWrite < writableBytes) {
@@ -213,10 +183,6 @@ public class SeaweedOutputStream extends OutputStream {
                 buffer.put(data, currentOffset, writableBytes);
                 currentOffset += writableBytes;
 
-                if (path.contains("parquet")) {
-
-                }
-
                 writeCurrentBufferToService();
                 numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
                 writableBytes = bufferSize - buffer.position();
@@ -232,10 +198,6 @@ public class SeaweedOutputStream extends OutputStream {
      */
     @Override
     public void flush() throws IOException {
-        if (path.contains("parquet") || path.contains("employees")) {
-
-        }
-
         if (supportFlush) {
             flushInternalAsync();
         }
@@ -255,33 +217,9 @@ public class SeaweedOutputStream extends OutputStream {
             return;
         }
 
-        int bufferPosBeforeFlush = buffer.position();
-
         try {
             flushInternal();
             threadExecutor.shutdown();
-
-            // NOTE: Metadata visibility check was attempted here but caused hangs
-            // because lookupEntry() blocks when called from within close().
-            // The real issue is in Spark's file commit/rename process, not here.
-            // See BREAKTHROUGH_IO_COMPARISON.md for details.
-
-
-
-            // Special logging for employees directory files (to help CI download timing)
-            if (path.contains("/test-spark/employees/") && path.endsWith(".parquet")) {
-                String filename = path.substring(path.lastIndexOf('/') + 1);
-                // Log filename, size, AND chunk IDs for direct volume download
-                StringBuilder chunkInfo = new StringBuilder();
-                for (int i = 0; i < entry.getChunksCount(); i++) {
-                    FilerProto.FileChunk chunk = entry.getChunks(i);
-                    if (i > 0)
-                        chunkInfo.append(",");
-                    chunkInfo.append(chunk.getFileId());
-                }
-                LOG.warn("=== PARQUET FILE WRITTEN TO EMPLOYEES: {} ({} bytes) CHUNKS: [{}] ===",
-                        filename, position, chunkInfo.toString());
-            }
         } finally {
             lastError = new IOException("Stream is closed!");
             ByteBufferPool.release(buffer);
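
[Note, not part of the patch] The next hunk deletes ensureMetadataVisible() outright;
per the NOTE above, lookupEntry() blocks when called from within close(), and the
stale-metadata symptom lives in Spark's commit/rename path. For reference, a distilled
sketch of the pattern the deleted method implemented — parentDir, fileName, and
expectedSize stand in for the deleted locals:

    // Poll the filer until the looked-up size matches what was written,
    // backing off exponentially: 10 -> 20 -> 40 -> 80 ms, five attempts.
    static void awaitVisible(FilerClient filerClient, String parentDir,
                             String fileName, long expectedSize)
            throws InterruptedException {
        long delayMs = 10;
        for (int attempt = 0; attempt < 5; attempt++) {
            FilerProto.Entry entry = filerClient.lookupEntry(parentDir, fileName);
            if (entry != null && entry.getAttributes().getFileSize() == expectedSize) {
                return; // metadata is visible and correct
            }
            Thread.sleep(delayMs);
            delayMs *= 2;
        }
    }
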
@@ -295,101 +233,16 @@ public class SeaweedOutputStream extends OutputStream {
 
     }
 
-    /**
-     * Ensures that metadata written by this stream is immediately visible to
-     * readers.
-     * This is critical for preventing the "78 bytes left" EOF error that occurs
-     * when
-     * Spark reads a file immediately after writing it.
-     *
-     * The issue: When Spark writes a Parquet file and immediately reads it back,
-     * the reader may see stale metadata (old file size) if the metadata hasn't
-     * fully propagated through the filer. This causes Parquet to calculate an
-     * incorrect expected file size, leading to EOF errors.
-     *
-     * The fix: After flushing all data and metadata, we perform a lookup to verify
-     * the metadata is visible. If the lookup returns stale data, we retry with
-     * exponential backoff.
-     */
-    private void ensureMetadataVisible() throws IOException {
-        try {
-
-
-            String parentDir = getParentDirectory(path);
-            String fileName = getFileName(path);
-
-
-
-            int maxRetries = 5;
-            long retryDelayMs = 10; // Start with 10ms
-
-            for (int attempt = 0; attempt < maxRetries; attempt++) {
-                try {
-
-                    // Lookup the entry to verify metadata is visible
-                    FilerProto.Entry lookedUpEntry = filerClient.lookupEntry(parentDir, fileName);
-
-
-                    if (lookedUpEntry != null) {
-                        long lookedUpSize = lookedUpEntry.getAttributes().getFileSize();
-
-                        if (lookedUpSize == position) {
-                            // Metadata is correct and visible
-                            if (attempt > 0) {
-
-                            }
-                            return;
-                        } else {
-                            // Metadata is stale
-
-                        }
-                    }
-
-                    // Metadata not yet visible or stale, retry
-                    if (attempt < maxRetries - 1) {
-                        Thread.sleep(retryDelayMs);
-                        retryDelayMs *= 2; // Exponential backoff
-                    }
-
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    throw new IOException("Interrupted while waiting for metadata visibility", e);
-                } catch (Exception e) {
-
-                    // Continue to next retry
-                }
-            }
-
-            // If we get here, metadata may still not be visible, but we've done our best
-
-        } catch (Exception e) {
-
-            // Don't throw - we don't want to fail the close() operation
-        }
-    }
-
     private synchronized void writeCurrentBufferToService() throws IOException {
         int bufferPos = buffer.position();
-        long positionBefore = position;
-
-        if (path.contains("parquet") || path.contains("employees")) {
-
-        }
 
         if (bufferPos == 0) {
-            if (path.contains("parquet") || path.contains("employees")) {
-
-            }
             return;
         }
 
         int written = submitWriteBufferToService(buffer, position);
         position += written;
 
-        if (path.contains("parquet") || path.contains("employees")) {
-
-        }
-
         buffer = ByteBufferPool.request(bufferSize);
     }
@@ -463,31 +316,15 @@ public class SeaweedOutputStream extends OutputStream {
     }
 
     protected synchronized void flushInternal() throws IOException {
-        if (path.contains("parquet") || path.contains("employees")) {
-
-        }
-
         maybeThrowLastError();
         writeCurrentBufferToService();
         flushWrittenBytesToService();
-
-        if (path.contains("parquet") || path.contains("employees")) {
-
-        }
     }
 
     protected synchronized void flushInternalAsync() throws IOException {
-        if (path.contains("parquet") || path.contains("employees")) {
-
-        }
-
         maybeThrowLastError();
         writeCurrentBufferToService();
         flushWrittenBytesToServiceAsync();
-
-        if (path.contains("parquet") || path.contains("employees")) {
-
-        }
     }
 
     private synchronized void flushWrittenBytesToService() throws IOException {
@@ -513,12 +350,10 @@ public class SeaweedOutputStream extends OutputStream {
 
     private static class WriteOperation {
         private final Future task;
-        private final long startOffset;
         private final long length;
 
         WriteOperation(final Future task, final long startOffset, final long length) {
             this.task = task;
-            this.startOffset = startOffset;
             this.length = length;
         }
     }
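
[Note, not part of the patch] The surviving core of writeCurrentBufferToService(),
restated here without the deleted debug branches; the comments are added for
orientation and are not in the source:

    int bufferPos = buffer.position();
    if (bufferPos == 0) {
        return;                                      // nothing staged
    }
    int written = submitWriteBufferToService(buffer, position); // async upload of the staged buffer
    position += written;                             // 'position' counts flushed bytes only
    buffer = ByteBufferPool.request(bufferSize);     // fresh buffer; the old one is owned by the task
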
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
index f6c9655dc..a1a43820c 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
@@ -20,7 +20,7 @@ public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Sy
     public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
                                      final long position, final int bufferSize, final String replication) {
         super(filerClient, path, entry, position, bufferSize, replication);
-
+
     }
 
     /**
@@ -31,7 +31,7 @@ public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Sy
      */
     @Override
     public void hsync() throws IOException {
-
+
         if (supportFlush) {
             flushInternal();
         }
@@ -45,17 +45,12 @@ public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Sy
      */
     @Override
     public void hflush() throws IOException {
-
+
         if (supportFlush) {
             flushInternal();
         }
     }
 
-    private String getPath() {
-        // Access the path field from parent class for logging
-        return this.toString().contains("parquet") ? "parquet file" : "file";
-    }
-
     /**
     * Query the stream for a specific capability.
     *
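
[Note, not part of the patch] After this change hsync() and hflush() share one code
path: both call flushInternal() when supportFlush is set. A sketch of how a Hadoop
client exercises them — the FileSystem wiring and the header/body byte arrays are
placeholders, and streams are normally obtained via FileSystem.create() rather than
constructed directly:

    FSDataOutputStream out = fs.create(new Path("/test-spark/employees/part-0.parquet"));
    out.write(header);
    out.hflush();  // buffered bytes become visible to new readers
    out.write(body);
    out.hsync();   // same effect here: both delegate to flushInternal()
    out.close();
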