@@ -49,8 +49,7 @@ public class SeaweedOutputStream extends OutputStream {

    public SeaweedOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
                               final long position, final int bufferSize, final String replication) {
        LOG.warn("[DEBUG-2024] SeaweedOutputStream BASE constructor called: path={} position={} bufferSize={}",
                path, position, bufferSize);

        this.filerClient = filerClient;
        this.replication = replication;
        this.path = path;
@@ -110,9 +109,7 @@ public class SeaweedOutputStream extends OutputStream {
        long virtualPos = position + buffer.position();
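        // Logical stream position: bytes already flushed to the filer (position) plus bytes still held in the local write buffer.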

        if (path.contains("parquet")) {
            LOG.warn(
                    "[DEBUG-2024] getPos() #{}: returning virtualPos={} (flushed={} + buffered={}) totalBytesWritten={} writeCalls={}",
                    getPosCallCount, virtualPos, position, buffer.position(), totalBytesWritten, writeCallCount);
        }

        return virtualPos;
@@ -144,8 +141,7 @@ public class SeaweedOutputStream extends OutputStream {

    private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
        try {
            LOG.info("[DEBUG-2024] flushWrittenBytesToServiceInternal: path={} offset={} #chunks={}",
                    path, offset, entry.getChunksCount());

            // Set the file size in attributes based on our position
            // This ensures Parquet footer metadata matches what we actually wrote
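            // A possible shape for that update, assuming the generated protobuf builder API (the actual statement sits outside this hunk):
            // entry.getAttributesBuilder().setFileSize(offset);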
@@ -199,10 +195,7 @@ public class SeaweedOutputStream extends OutputStream {

        // Always log writes to see the complete pattern
        if (length >= 20 || writeCallCount >= 220 || writeCallCount % 50 == 0) {
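            // i.e. log writes of 20 bytes or more, every call from #220 onward, and every 50th call in between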
            LOG.warn(
                    "[DEBUG-2024] WRITE #{}: {} bytes | virtualPos={} (flushed={} + buffered={}) | totalWritten={} | file={}",
                    writeCallCount, length, virtualPosition, position, beforeBufferPos,
                    totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
        }
    }
@@ -229,9 +222,7 @@ public class SeaweedOutputStream extends OutputStream {
        currentOffset += writableBytes;
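        // The buffer just absorbed writableBytes of the caller's data; advance the source offset before the flush below.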

        if (path.contains("parquet")) {
            LOG.warn(
                    "[DEBUG-2024] Buffer FLUSH: posBeforeFlush={} flushingBufferSize={} newPositionAfterFlush={} totalWritten={}",
                    posBeforeWrite, bufferSize, position + bufferSize, totalBytesWritten);
        }

        writeCurrentBufferToService();
@@ -250,10 +241,7 @@ public class SeaweedOutputStream extends OutputStream {
    @Override
    public void flush() throws IOException {
        if (path.contains("parquet") || path.contains("employees")) {
            LOG.warn(
                    "[DEBUG-2024] flush() CALLED: supportFlush={} virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}",
                    supportFlush, virtualPosition, position, buffer.position(), totalBytesWritten,
                    path.substring(path.lastIndexOf('/') + 1));
        }

        if (supportFlush) {
@@ -276,9 +264,7 @@ public class SeaweedOutputStream extends OutputStream {
        }

        int bufferPosBeforeFlush = buffer.position();
        LOG.info(
                "[DEBUG-2024] close START: path={} virtualPos={} flushedPos={} buffer.position()={} totalBytesWritten={} writeCalls={}",
                path, virtualPosition, position, bufferPosBeforeFlush, totalBytesWritten, writeCallCount);

        try {
            flushInternal();
            threadExecutor.shutdown();
@@ -288,9 +274,7 @@ public class SeaweedOutputStream extends OutputStream {
        // The real issue is in Spark's file commit/rename process, not here.
        // See BREAKTHROUGH_IO_COMPARISON.md for details.

        LOG.info(
                "[DEBUG-2024] close END: path={} virtualPos={} flushedPos={} totalBytesWritten={} writeCalls={} (buffer had {} bytes)",
                path, virtualPosition, position, totalBytesWritten, writeCallCount, bufferPosBeforeFlush);

        // Special logging for employees directory files (to help CI download timing)
        if (path.contains("/test-spark/employees/") && path.endsWith(".parquet")) {
@@ -337,22 +321,22 @@ public class SeaweedOutputStream extends OutputStream {
     */
    private void ensureMetadataVisible() throws IOException {
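        // Best-effort check run during close(): confirm the filer already reports the final file size.
        // Failures are only logged; they are never allowed to fail the close() itself (see the outer catch).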
        try {
            LOG.warn("[DEBUG-2024] ensureMetadataVisible() CALLED for path={} size={}", path, position);

            String parentDir = getParentDirectory(path);
            String fileName = getFileName(path);

            LOG.warn("[DEBUG-2024] Looking up: parentDir={} fileName={}", parentDir, fileName);

            int maxRetries = 5;
            long retryDelayMs = 10; // Start with 10ms
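            // Re-check the filer entry until its reported size matches what was written, pausing retryDelayMs between attempts.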

            for (int attempt = 0; attempt < maxRetries; attempt++) {
                try {
                    LOG.warn("[DEBUG-2024] Attempt {} to lookup metadata", attempt + 1);

                    // Lookup the entry to verify metadata is visible
                    FilerProto.Entry lookedUpEntry = filerClient.lookupEntry(parentDir, fileName);
                    LOG.warn("[DEBUG-2024] Lookup returned: {}", (lookedUpEntry != null ? "entry found" : "null"));

                    if (lookedUpEntry != null) {
                        long lookedUpSize = lookedUpEntry.getAttributes().getFileSize();
@@ -360,14 +344,12 @@ public class SeaweedOutputStream extends OutputStream {
                        if (lookedUpSize == position) {
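                            // The looked-up size must equal the flushed byte count exactly; any other value is treated as stale metadata.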
                            // Metadata is correct and visible
                            if (attempt > 0) {
                                LOG.info("[DEBUG-2024] Metadata visible after {} retries: path={} size={}",
                                        attempt, path, position);
                            }
                            return;
                        } else {
                            // Metadata is stale
                            LOG.warn("[DEBUG-2024] Metadata stale on attempt {}: path={} expected={} actual={}",
                                    attempt + 1, path, position, lookedUpSize);
                        }
                    }
@@ -381,17 +363,15 @@ public class SeaweedOutputStream extends OutputStream {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while waiting for metadata visibility", e);
                } catch (Exception e) {
                    LOG.warn("[DEBUG-2024] Error checking metadata visibility on attempt {}: {}",
                            attempt + 1, e.getMessage());
                    // Continue to next retry
                }
            }

            // If we get here, metadata may still not be visible, but we've done our best
            LOG.warn("[DEBUG-2024] Metadata may not be immediately visible after {} retries: path={} size={}",
                    maxRetries, path, position);

        } catch (Exception e) {
            LOG.error("[DEBUG-2024] EXCEPTION in ensureMetadataVisible(): {}", e.getMessage(), e);
            // Don't throw - we don't want to fail the close() operation
        }
    }
@@ -401,14 +381,12 @@ public class SeaweedOutputStream extends OutputStream {
        long positionBefore = position;

        if (path.contains("parquet") || path.contains("employees")) {
            LOG.warn(
                    "[DEBUG-2024] writeCurrentBufferToService START: buffer.position()={} currentFlushedPosition={} totalWritten={} path={}",
                    bufferPos, position, totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
        }

        if (bufferPos == 0) {
            if (path.contains("parquet") || path.contains("employees")) {
                LOG.warn("[DEBUG-2024] -> Skipping: buffer is empty");
            }
            return;
        }
@@ -417,9 +395,7 @@ public class SeaweedOutputStream extends OutputStream {
        position += written;
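        // Advance the flushed position by the number of bytes just handed off from the buffer.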

        if (path.contains("parquet") || path.contains("employees")) {
            LOG.warn(
                    "[DEBUG-2024] writeCurrentBufferToService END: submitted {} bytes, flushedPosition {} -> {}, totalWritten={} path={}",
                    written, positionBefore, position, totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
        }

        buffer = ByteBufferPool.request(bufferSize);
@@ -496,10 +472,7 @@ public class SeaweedOutputStream extends OutputStream {

    protected synchronized void flushInternal() throws IOException {
        if (path.contains("parquet") || path.contains("employees")) {
            LOG.warn(
                    "[DEBUG-2024] flushInternal() START: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}",
                    virtualPosition, position, buffer.position(), totalBytesWritten,
                    path.substring(path.lastIndexOf('/') + 1));
        }

        maybeThrowLastError();
@@ -507,19 +480,13 @@ public class SeaweedOutputStream extends OutputStream {
        flushWrittenBytesToService();

        if (path.contains("parquet") || path.contains("employees")) {
            LOG.warn(
                    "[DEBUG-2024] flushInternal() END: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}",
                    virtualPosition, position, buffer.position(), totalBytesWritten,
                    path.substring(path.lastIndexOf('/') + 1));
        }
    }

    protected synchronized void flushInternalAsync() throws IOException {
        if (path.contains("parquet") || path.contains("employees")) {
            LOG.warn(
                    "[DEBUG-2024] flushInternalAsync() START: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}",
                    virtualPosition, position, buffer.position(), totalBytesWritten,
                    path.substring(path.lastIndexOf('/') + 1));
        }

        maybeThrowLastError();
@@ -527,10 +494,7 @@ public class SeaweedOutputStream extends OutputStream {
        flushWrittenBytesToServiceAsync();

        if (path.contains("parquet") || path.contains("employees")) {
            LOG.warn(
                    "[DEBUG-2024] flushInternalAsync() END: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}",
                    virtualPosition, position, buffer.position(), totalBytesWritten,
                    path.substring(path.lastIndexOf('/') + 1));
        }
    }