diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
index 73b3ad983..42925cec2 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
@@ -292,6 +292,12 @@ public class SeaweedOutputStream extends OutputStream {
         try {
             flushInternal();
             threadExecutor.shutdown();
+
+            // CRITICAL FIX: ensure metadata is immediately visible after a write.
+            // This prevents Spark from reading stale metadata when it reads a file
+            // immediately after writing it (which causes the 78-byte EOF error).
+            ensureMetadataVisible();
+
             LOG.info(
                     "[DEBUG-2024] close END: path={} virtualPos={} flushedPos={} totalBytesWritten={} writeCalls={} (buffer had {} bytes)",
                     path, virtualPosition, position, totalBytesWritten, writeCallCount, bufferPosBeforeFlush);
@@ -322,6 +328,76 @@ public class SeaweedOutputStream extends OutputStream {
         }
     }
+
+    /**
+     * Ensures that metadata written by this stream is immediately visible to readers.
+     * This is critical for preventing the "78 bytes left" EOF error that occurs when
+     * Spark reads a file immediately after writing it.
+     *
+     * The issue: when Spark writes a Parquet file and immediately reads it back,
+     * the reader may see stale metadata (the old file size) if the metadata has not
+     * fully propagated through the filer. This causes Parquet to calculate an
+     * incorrect expected file size, leading to EOF errors.
+     *
+     * The fix: after flushing all data and metadata, perform a lookup to verify
+     * that the metadata is visible. If the lookup returns stale data, retry with
+     * exponential backoff.
+     */
+    private void ensureMetadataVisible() throws IOException {
+        LOG.warn("[DEBUG-2024] ensureMetadataVisible() CALLED for path={} size={}", path, position);
+
+        String parentDir = getParentDirectory(path);
+        String fileName = getFileName(path);
+
+        LOG.warn("[DEBUG-2024] Looking up: parentDir={} fileName={}", parentDir, fileName);
+
+        int maxRetries = 5;
+        long retryDelayMs = 10; // start with 10 ms
+
+        for (int attempt = 0; attempt < maxRetries; attempt++) {
+            try {
+                LOG.warn("[DEBUG-2024] Attempt {} to look up metadata", attempt + 1);
+                // Look up the entry to verify that the metadata is visible
+                FilerProto.Entry lookedUpEntry = filerClient.lookupEntry(parentDir, fileName);
+                LOG.warn("[DEBUG-2024] Lookup returned: {}", (lookedUpEntry != null ? "entry found" : "null"));
+
+                if (lookedUpEntry != null) {
+                    long lookedUpSize = lookedUpEntry.getAttributes().getFileSize();
+
+                    if (lookedUpSize == position) {
+                        // Metadata is correct and visible
+                        if (attempt > 0) {
+                            LOG.info("[DEBUG-2024] Metadata visible after {} retries: path={} size={}",
+                                    attempt, path, position);
+                        }
+                        return;
+                    } else {
+                        // Metadata is stale
+                        LOG.warn("[DEBUG-2024] Metadata stale on attempt {}: path={} expected={} actual={}",
+                                attempt + 1, path, position, lookedUpSize);
+                    }
+                }
+
+                // Metadata not yet visible, or still stale: retry
+                if (attempt < maxRetries - 1) {
+                    Thread.sleep(retryDelayMs);
+                    retryDelayMs *= 2; // exponential backoff
+                }
+
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted while waiting for metadata visibility", e);
+            } catch (Exception e) {
+                LOG.warn("[DEBUG-2024] Error checking metadata visibility on attempt {}: {}",
+                        attempt + 1, e.getMessage());
+                // Continue to the next retry
+            }
+        }
+
+        // If we get here, the metadata may still not be visible, but we've done our best
+        LOG.warn("[DEBUG-2024] Metadata may not be immediately visible after {} retries: path={} size={}",
+                maxRetries, path, position);
+    }
 
     private synchronized void writeCurrentBufferToService() throws IOException {
         int bufferPos = buffer.position();