Browse Source

clean up

pull/7526/head
chrislu 1 week ago
parent
commit
0da72322d1
  1. 167
      other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
  2. 11
      other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java

167
other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java

@ -23,21 +23,15 @@ public class SeaweedOutputStream extends OutputStream {
private final ThreadPoolExecutor threadExecutor; private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService; private final ExecutorCompletionService<Void> completionService;
private final ConcurrentLinkedDeque<WriteOperation> writeOperations; private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
private final boolean shouldSaveMetadata = false;
private FilerProto.Entry.Builder entry; private FilerProto.Entry.Builder entry;
private long position; // Flushed bytes (committed to service) private long position; // Flushed bytes (committed to service)
private long virtualPosition; // Total bytes written (including buffered), for getPos()
private boolean closed; private boolean closed;
private volatile IOException lastError; private volatile IOException lastError;
private long lastFlushOffset; private long lastFlushOffset;
private long lastTotalAppendOffset = 0; private long lastTotalAppendOffset = 0;
private ByteBuffer buffer; private ByteBuffer buffer;
private long outputIndex;
private String replication = ""; private String replication = "";
private String collection = ""; private String collection = "";
private long totalBytesWritten = 0; // Track total bytes for debugging
private long writeCallCount = 0; // Track number of write() calls
private int getPosCallCount = 0; // Track getPos() calls for throttling flushes
public SeaweedOutputStream(FilerClient filerClient, final String fullpath) { public SeaweedOutputStream(FilerClient filerClient, final String fullpath) {
this(filerClient, fullpath, ""); this(filerClient, fullpath, "");
@ -49,12 +43,11 @@ public class SeaweedOutputStream extends OutputStream {
public SeaweedOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry, public SeaweedOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
final long position, final int bufferSize, final String replication) { final long position, final int bufferSize, final String replication) {
this.filerClient = filerClient; this.filerClient = filerClient;
this.replication = replication; this.replication = replication;
this.path = path; this.path = path;
this.position = position; this.position = position;
this.virtualPosition = position; // Initialize to match position
this.closed = false; this.closed = false;
this.lastError = null; this.lastError = null;
this.lastFlushOffset = 0; this.lastFlushOffset = 0;
@ -102,8 +95,6 @@ public class SeaweedOutputStream extends OutputStream {
* @return current position (flushed + buffered bytes) * @return current position (flushed + buffered bytes)
*/ */
public synchronized long getPos() throws IOException { public synchronized long getPos() throws IOException {
getPosCallCount++;
// Guard against NPE if called after close() // Guard against NPE if called after close()
if (buffer == null) { if (buffer == null) {
return position; return position;
@ -144,7 +135,6 @@ public class SeaweedOutputStream extends OutputStream {
private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
try { try {
// Set the file size in attributes based on our position // Set the file size in attributes based on our position
// This ensures Parquet footer metadata matches what we actually wrote // This ensures Parquet footer metadata matches what we actually wrote
@ -177,30 +167,10 @@ public class SeaweedOutputStream extends OutputStream {
throw new IndexOutOfBoundsException(); throw new IndexOutOfBoundsException();
} }
totalBytesWritten += length;
writeCallCount++;
virtualPosition += length; // Update virtual position for getPos()
// Enhanced debug logging for ALL writes to track the exact sequence
if (path.contains("parquet") || path.contains("employees")) {
long beforeBufferPos = buffer.position();
// Always log writes to see the complete pattern
if (length >= 20 || writeCallCount >= 220 || writeCallCount % 50 == 0) {
}
}
// System.out.println(path + " write [" + (outputIndex + off) + "," +
// ((outputIndex + off) + length) + ")");
int currentOffset = off; int currentOffset = off;
int writableBytes = bufferSize - buffer.position(); int writableBytes = bufferSize - buffer.position();
int numberOfBytesToWrite = length; int numberOfBytesToWrite = length;
// Track position before write
long posBeforeWrite = position + buffer.position();
while (numberOfBytesToWrite > 0) { while (numberOfBytesToWrite > 0) {
if (numberOfBytesToWrite < writableBytes) { if (numberOfBytesToWrite < writableBytes) {
@ -213,10 +183,6 @@ public class SeaweedOutputStream extends OutputStream {
buffer.put(data, currentOffset, writableBytes); buffer.put(data, currentOffset, writableBytes);
currentOffset += writableBytes; currentOffset += writableBytes;
if (path.contains("parquet")) {
}
writeCurrentBufferToService(); writeCurrentBufferToService();
numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
writableBytes = bufferSize - buffer.position(); writableBytes = bufferSize - buffer.position();
@ -232,10 +198,6 @@ public class SeaweedOutputStream extends OutputStream {
*/ */
@Override @Override
public void flush() throws IOException { public void flush() throws IOException {
if (path.contains("parquet") || path.contains("employees")) {
}
if (supportFlush) { if (supportFlush) {
flushInternalAsync(); flushInternalAsync();
} }
@ -255,33 +217,9 @@ public class SeaweedOutputStream extends OutputStream {
return; return;
} }
int bufferPosBeforeFlush = buffer.position();
try { try {
flushInternal(); flushInternal();
threadExecutor.shutdown(); threadExecutor.shutdown();
// NOTE: Metadata visibility check was attempted here but caused hangs
// because lookupEntry() blocks when called from within close().
// The real issue is in Spark's file commit/rename process, not here.
// See BREAKTHROUGH_IO_COMPARISON.md for details.
// Special logging for employees directory files (to help CI download timing)
if (path.contains("/test-spark/employees/") && path.endsWith(".parquet")) {
String filename = path.substring(path.lastIndexOf('/') + 1);
// Log filename, size, AND chunk IDs for direct volume download
StringBuilder chunkInfo = new StringBuilder();
for (int i = 0; i < entry.getChunksCount(); i++) {
FilerProto.FileChunk chunk = entry.getChunks(i);
if (i > 0)
chunkInfo.append(",");
chunkInfo.append(chunk.getFileId());
}
LOG.warn("=== PARQUET FILE WRITTEN TO EMPLOYEES: {} ({} bytes) CHUNKS: [{}] ===",
filename, position, chunkInfo.toString());
}
} finally { } finally {
lastError = new IOException("Stream is closed!"); lastError = new IOException("Stream is closed!");
ByteBufferPool.release(buffer); ByteBufferPool.release(buffer);
@ -295,101 +233,16 @@ public class SeaweedOutputStream extends OutputStream {
} }
/**
* Ensures that metadata written by this stream is immediately visible to
* readers.
* This is critical for preventing the "78 bytes left" EOF error that occurs
* when
* Spark reads a file immediately after writing it.
*
* The issue: When Spark writes a Parquet file and immediately reads it back,
* the reader may see stale metadata (old file size) if the metadata hasn't
* fully propagated through the filer. This causes Parquet to calculate an
* incorrect expected file size, leading to EOF errors.
*
* The fix: After flushing all data and metadata, we perform a lookup to verify
* the metadata is visible. If the lookup returns stale data, we retry with
* exponential backoff.
*/
private void ensureMetadataVisible() throws IOException {
try {
String parentDir = getParentDirectory(path);
String fileName = getFileName(path);
int maxRetries = 5;
long retryDelayMs = 10; // Start with 10ms
for (int attempt = 0; attempt < maxRetries; attempt++) {
try {
// Lookup the entry to verify metadata is visible
FilerProto.Entry lookedUpEntry = filerClient.lookupEntry(parentDir, fileName);
if (lookedUpEntry != null) {
long lookedUpSize = lookedUpEntry.getAttributes().getFileSize();
if (lookedUpSize == position) {
// Metadata is correct and visible
if (attempt > 0) {
}
return;
} else {
// Metadata is stale
}
}
// Metadata not yet visible or stale, retry
if (attempt < maxRetries - 1) {
Thread.sleep(retryDelayMs);
retryDelayMs *= 2; // Exponential backoff
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting for metadata visibility", e);
} catch (Exception e) {
// Continue to next retry
}
}
// If we get here, metadata may still not be visible, but we've done our best
} catch (Exception e) {
// Don't throw - we don't want to fail the close() operation
}
}
private synchronized void writeCurrentBufferToService() throws IOException { private synchronized void writeCurrentBufferToService() throws IOException {
int bufferPos = buffer.position(); int bufferPos = buffer.position();
long positionBefore = position;
if (path.contains("parquet") || path.contains("employees")) {
}
if (bufferPos == 0) { if (bufferPos == 0) {
if (path.contains("parquet") || path.contains("employees")) {
}
return; return;
} }
int written = submitWriteBufferToService(buffer, position); int written = submitWriteBufferToService(buffer, position);
position += written; position += written;
if (path.contains("parquet") || path.contains("employees")) {
}
buffer = ByteBufferPool.request(bufferSize); buffer = ByteBufferPool.request(bufferSize);
} }
@ -463,31 +316,15 @@ public class SeaweedOutputStream extends OutputStream {
} }
protected synchronized void flushInternal() throws IOException { protected synchronized void flushInternal() throws IOException {
if (path.contains("parquet") || path.contains("employees")) {
}
maybeThrowLastError(); maybeThrowLastError();
writeCurrentBufferToService(); writeCurrentBufferToService();
flushWrittenBytesToService(); flushWrittenBytesToService();
if (path.contains("parquet") || path.contains("employees")) {
}
} }
protected synchronized void flushInternalAsync() throws IOException { protected synchronized void flushInternalAsync() throws IOException {
if (path.contains("parquet") || path.contains("employees")) {
}
maybeThrowLastError(); maybeThrowLastError();
writeCurrentBufferToService(); writeCurrentBufferToService();
flushWrittenBytesToServiceAsync(); flushWrittenBytesToServiceAsync();
if (path.contains("parquet") || path.contains("employees")) {
}
} }
private synchronized void flushWrittenBytesToService() throws IOException { private synchronized void flushWrittenBytesToService() throws IOException {
@ -513,12 +350,10 @@ public class SeaweedOutputStream extends OutputStream {
private static class WriteOperation { private static class WriteOperation {
private final Future<Void> task; private final Future<Void> task;
private final long startOffset;
private final long length; private final long length;
WriteOperation(final Future<Void> task, final long startOffset, final long length) { WriteOperation(final Future<Void> task, final long startOffset, final long length) {
this.task = task; this.task = task;
this.startOffset = startOffset;
this.length = length; this.length = length;
} }
} }

11
other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java

@ -20,7 +20,7 @@ public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Sy
public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry, public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
final long position, final int bufferSize, final String replication) { final long position, final int bufferSize, final String replication) {
super(filerClient, path, entry, position, bufferSize, replication); super(filerClient, path, entry, position, bufferSize, replication);
} }
/** /**
@ -31,7 +31,7 @@ public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Sy
*/ */
@Override @Override
public void hsync() throws IOException { public void hsync() throws IOException {
if (supportFlush) { if (supportFlush) {
flushInternal(); flushInternal();
} }
@ -45,17 +45,12 @@ public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Sy
*/ */
@Override @Override
public void hflush() throws IOException { public void hflush() throws IOException {
if (supportFlush) { if (supportFlush) {
flushInternal(); flushInternal();
} }
} }
private String getPath() {
// Access the path field from parent class for logging
return this.toString().contains("parquet") ? "parquet file" : "file";
}
/** /**
* Query the stream for a specific capability. * Query the stream for a specific capability.
* *

Loading…
Cancel
Save