From 7b9b04cd591867a1bd2ff9e71e629f7187efba68 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Sun, 23 Nov 2025 21:14:25 -0800
Subject: [PATCH] feat: add explicit logging when employees Parquet file is written
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

PRECISION TRIGGER: Log exactly when the file we need is written!

Changes:

1. SeaweedOutputStream.close(): add a WARN log for /test-spark/employees/*.parquet
   - Format: '=== PARQUET FILE WRITTEN TO EMPLOYEES: filename (size bytes) ==='
   - Uses WARN level so it stands out in the logs
2. Workflow: trigger the download on this exact log message
   - Instead of 'Running seaweed.spark.SparkSQLTest' (too early)
   - Now triggers on 'PARQUET FILE WRITTEN TO EMPLOYEES' (the exact moment!)

Timeline:

  File write starts
  ↓
  close() called → LOG APPEARS
  ↓
  Workflow detects log → DOWNLOAD NOW! ← We're here instantly!
  ↓
  Spark reads file → EOF error
  ↓
  Analyze downloaded file ✅

This gives us the EXACT moment to download, with near-zero latency!
---
 .github/workflows/spark-integration-tests.yml                | 6 +++---
 .../src/main/java/seaweedfs/client/SeaweedOutputStream.java  | 6 ++++++
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/spark-integration-tests.yml b/.github/workflows/spark-integration-tests.yml
index ca23cc2eb..fe489e506 100644
--- a/.github/workflows/spark-integration-tests.yml
+++ b/.github/workflows/spark-integration-tests.yml
@@ -131,11 +131,11 @@ jobs:
           (
             DOWNLOADED=false
             while docker ps | grep -q seaweedfs-spark-tests; do
-              # Check if we've reached the SQL test (where employees files are created)
-              if docker compose logs spark-tests 2>&1 | grep -q "Running seaweed.spark.SparkSQLTest"; then
+              # Check if an employees Parquet file has been written (we log this explicitly)
+              if docker compose logs spark-tests 2>&1 | grep -q "PARQUET FILE WRITTEN TO EMPLOYEES"; then
                 if [ "$DOWNLOADED" = "false" ]; then
                   echo ""
-                  echo "=== SparkSQLTest started! Polling for employees file creation ==="
+                  echo "=== EMPLOYEES FILE WRITTEN! Downloading immediately ==="
 
                   # Poll for files to appear (max 30 seconds)
                   for i in {1..30}; do
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
index 222e7d9dc..3dd5106b1 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
@@ -249,6 +249,12 @@ public class SeaweedOutputStream extends OutputStream {
             LOG.info(
                     "[DEBUG-2024] close END: path={} finalPosition={} totalBytesWritten={} writeCalls={} (buffer had {} bytes)",
                     path, position, totalBytesWritten, writeCallCount, bufferPosBeforeFlush);
+
+            // Special logging for employees directory files (to help CI download timing)
+            if (path.contains("/test-spark/employees/") && path.endsWith(".parquet")) {
+                String filename = path.substring(path.lastIndexOf('/') + 1);
+                LOG.warn("=== PARQUET FILE WRITTEN TO EMPLOYEES: {} ({} bytes) ===", filename, position);
+            }
         } finally {
             lastError = new IOException("Stream is closed!");
             ByteBufferPool.release(buffer);
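
Note (not part of the patch): below is a minimal standalone sketch that mirrors the path filter added in close(), useful for checking that the emitted message matches the "PARQUET FILE WRITTEN TO EMPLOYEES" pattern the workflow greps for. The class name and sample paths are hypothetical; it uses only java.lang and prints to stdout instead of going through SLF4J.

    // Hypothetical, self-contained check of the trigger condition added in SeaweedOutputStream.close().
    // Not part of the patch; compiles and runs on its own.
    public class EmployeesLogFilterCheck {

        // Mirrors the condition in close(): only employees Parquet files produce the WARN message.
        static String triggerMessage(String path, long position) {
            if (path.contains("/test-spark/employees/") && path.endsWith(".parquet")) {
                String filename = path.substring(path.lastIndexOf('/') + 1);
                return "=== PARQUET FILE WRITTEN TO EMPLOYEES: " + filename + " (" + position + " bytes) ===";
            }
            return null; // no trigger line for other paths
        }

        public static void main(String[] args) {
            // Matches: produces the line the workflow's grep looks for.
            System.out.println(triggerMessage("/test-spark/employees/part-00000.parquet", 1234));
            // No match: staging files without a .parquet suffix stay quiet.
            System.out.println(triggerMessage("/test-spark/employees/_temporary/0/part-00000", 1234));
            // No match: files outside the employees directory stay quiet.
            System.out.println(triggerMessage("/test-spark/other/part-00000.parquet", 1234));
        }
    }

The same two-part filter (directory substring plus .parquet suffix) is what keeps the WARN log from firing on temporary or unrelated writes, so the workflow's grep only triggers once the real employees output file is closed.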