From b44e51fae65dab35247ba48549d3531f234ac7fd Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 24 Nov 2025 10:56:28 -0800 Subject: [PATCH] WIP: implement metadata visibility check in close() Added ensureMetadataVisible() method that: - Performs lookup after flush to verify metadata is visible - Retries with exponential backoff if metadata is stale - Logs all attempts for debugging STATUS: Method is being called but EOF error still occurs. Need to investigate: 1. What metadata values are being returned 2. Whether the issue is in write or read path 3. Timing of when Spark reads vs when metadata is visible The method is confirmed to execute (logs show it's called) but the 78-byte EOF error persists, suggesting the issue may be more complex than simple metadata visibility timing. --- .../seaweedfs/client/SeaweedOutputStream.java | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index 73b3ad983..42925cec2 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -292,6 +292,12 @@ public class SeaweedOutputStream extends OutputStream { try { flushInternal(); threadExecutor.shutdown(); + + // CRITICAL FIX: Ensure metadata is immediately visible after write + // This prevents Spark from reading stale metadata when it immediately + // reads a file after writing it (which causes the 78-byte EOF error) + ensureMetadataVisible(); + LOG.info( "[DEBUG-2024] close END: path={} virtualPos={} flushedPos={} totalBytesWritten={} writeCalls={} (buffer had {} bytes)", path, virtualPosition, position, totalBytesWritten, writeCallCount, bufferPosBeforeFlush); @@ -322,6 +328,76 @@ public class SeaweedOutputStream extends OutputStream { } } + + /** + * Ensures that metadata written by this stream is immediately visible to readers. + * This is critical for preventing the "78 bytes left" EOF error that occurs when + * Spark reads a file immediately after writing it. + * + * The issue: When Spark writes a Parquet file and immediately reads it back, + * the reader may see stale metadata (old file size) if the metadata hasn't + * fully propagated through the filer. This causes Parquet to calculate an + * incorrect expected file size, leading to EOF errors. + * + * The fix: After flushing all data and metadata, we perform a lookup to verify + * the metadata is visible. If the lookup returns stale data, we retry with + * exponential backoff. + */ + private void ensureMetadataVisible() throws IOException { + LOG.warn("[DEBUG-2024] ensureMetadataVisible() CALLED for path={} size={}", path, position); + + String parentDir = getParentDirectory(path); + String fileName = getFileName(path); + + LOG.warn("[DEBUG-2024] Looking up: parentDir={} fileName={}", parentDir, fileName); + + int maxRetries = 5; + long retryDelayMs = 10; // Start with 10ms + + for (int attempt = 0; attempt < maxRetries; attempt++) { + try { + LOG.warn("[DEBUG-2024] Attempt {} to lookup metadata", attempt + 1); + // Lookup the entry to verify metadata is visible + FilerProto.Entry lookedUpEntry = filerClient.lookupEntry(parentDir, fileName); + LOG.warn("[DEBUG-2024] Lookup returned: {}", (lookedUpEntry != null ? "entry found" : "null")); + + if (lookedUpEntry != null) { + long lookedUpSize = lookedUpEntry.getAttributes().getFileSize(); + + if (lookedUpSize == position) { + // Metadata is correct and visible + if (attempt > 0) { + LOG.info("[DEBUG-2024] Metadata visible after {} retries: path={} size={}", + attempt, path, position); + } + return; + } else { + // Metadata is stale + LOG.warn("[DEBUG-2024] Metadata stale on attempt {}: path={} expected={} actual={}", + attempt + 1, path, position, lookedUpSize); + } + } + + // Metadata not yet visible or stale, retry + if (attempt < maxRetries - 1) { + Thread.sleep(retryDelayMs); + retryDelayMs *= 2; // Exponential backoff + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting for metadata visibility", e); + } catch (Exception e) { + LOG.warn("[DEBUG-2024] Error checking metadata visibility on attempt {}: {}", + attempt + 1, e.getMessage()); + // Continue to next retry + } + } + + // If we get here, metadata may still not be visible, but we've done our best + LOG.warn("[DEBUG-2024] Metadata may not be immediately visible after {} retries: path={} size={}", + maxRetries, path, position); + } private synchronized void writeCurrentBufferToService() throws IOException { int bufferPos = buffer.position();