Browse Source

feat: add explicit logging when employees Parquet file is written

PRECISION TRIGGER: Log exactly when the file we need is written!

Changes:
1. SeaweedOutputStream.close(): Add WARN log for /test-spark/employees/*.parquet
   - Format: '=== PARQUET FILE WRITTEN TO EMPLOYEES: filename (size bytes) ==='
   - Uses WARN level so it stands out in logs

2. Workflow: Trigger download on this exact log message
   - Instead of 'Running seaweed.spark.SparkSQLTest' (too early)
   - Now triggers on 'PARQUET FILE WRITTEN TO EMPLOYEES' (exact moment!)

Timeline:
  File write starts
    ↓
  close() called → LOG APPEARS
    ↓
  Workflow detects log → DOWNLOAD NOW! ← We're here instantly!
    ↓
  Spark reads file → EOF error
    ↓
  Analyze downloaded file 

This gives us the EXACT moment to download, with near-zero latency!
pull/7526/head
chrislu 1 week ago
parent
commit
7b9b04cd59
  1. 6
      .github/workflows/spark-integration-tests.yml
  2. 6
      other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java

6
.github/workflows/spark-integration-tests.yml

@ -131,11 +131,11 @@ jobs:
(
DOWNLOADED=false
while docker ps | grep -q seaweedfs-spark-tests; do
# Check if we've reached the SQL test (where employees files are created)
if docker compose logs spark-tests 2>&1 | grep -q "Running seaweed.spark.SparkSQLTest"; then
# Check if an employees Parquet file has been written (we log this explicitly)
if docker compose logs spark-tests 2>&1 | grep -q "PARQUET FILE WRITTEN TO EMPLOYEES"; then
if [ "$DOWNLOADED" = "false" ]; then
echo ""
echo "=== SparkSQLTest started! Polling for employees file creation ==="
echo "=== EMPLOYEES FILE WRITTEN! Downloading immediately ==="
# Poll for files to appear (max 30 seconds)
for i in {1..30}; do

6
other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java

@ -249,6 +249,12 @@ public class SeaweedOutputStream extends OutputStream {
LOG.info(
"[DEBUG-2024] close END: path={} finalPosition={} totalBytesWritten={} writeCalls={} (buffer had {} bytes)",
path, position, totalBytesWritten, writeCallCount, bufferPosBeforeFlush);
// Special logging for employees directory files (to help CI download timing)
if (path.contains("/test-spark/employees/") && path.endsWith(".parquet")) {
String filename = path.substring(path.lastIndexOf('/') + 1);
LOG.warn("=== PARQUET FILE WRITTEN TO EMPLOYEES: {} ({} bytes) ===", filename, position);
}
} finally {
lastError = new IOException("Stream is closed!");
ByteBufferPool.release(buffer);

Loading…
Cancel
Save