Browse Source

fix: search temporary directories for Parquet files

The issue: Files written to employees/ but immediately moved/deleted by Spark

Spark's file commit process:
1. Write to: employees/_temporary/0/_temporary/attempt_xxx/part-xxx.parquet
2. Commit/rename to: employees/part-xxx.parquet
3. Read and delete (on failure)

By the time we check employees/, the file is already gone!

Solution: Search multiple locations
- employees/ (final location)
- employees/_temporary/ (intermediate)
- employees/_temporary/0/_temporary/ (write location)
- Recursive search as fallback

Also:
- Extract exact filename from write log
- Try all locations until we find the file
- Show directory listings for debugging

This should catch files in their temporary location before Spark moves them!
pull/7526/head
chrislu 1 week ago
parent
commit
c774b807e1
  1. 52
      .github/workflows/spark-integration-tests.yml

52
.github/workflows/spark-integration-tests.yml

@@ -135,36 +135,50 @@ jobs:
# Once Spark logs that the employees parquet file was written, grab it before
# Spark's commit protocol moves/deletes it. Strategy:
#   1. Extract the exact part-file name from the Spark test log.
#   2. Probe the final dir and the _temporary write dirs, downloading the
#      first non-empty match as test.parquet.
#   3. Fall back to a recursive listing for debugging if nothing is found.
# NOTE(review): reconstructed from a markerless diff — old and new hunk lines
# were interleaved in the source; verify against the committed workflow.
if docker compose logs spark-tests 2>&1 | grep -q "PARQUET FILE WRITTEN TO EMPLOYEES"; then
  if [ "$DOWNLOADED" = "false" ]; then
    echo ""
    echo "=== EMPLOYEES FILE WRITTEN! Extracting from logs and downloading ==="
    # Poll for files to appear (max 30 seconds)
    for i in {1..30}; do
      # Extract the EXACT filename from the write log (last write wins)
      FULL_LOG=$(docker compose logs spark-tests 2>&1)
      WRITTEN_FILE=$(echo "$FULL_LOG" | grep "PARQUET FILE WRITTEN TO EMPLOYEES" | tail -1 | grep -oP 'part-[a-f0-9-]+-c000\.snappy\.parquet')
      echo "Filename from log: $WRITTEN_FILE"
      if [ -z "$WRITTEN_FILE" ]; then
        echo "ERROR: Could not extract filename from write log"
        echo "Showing write log lines:"
        echo "$FULL_LOG" | grep "PARQUET FILE WRITTEN" | tail -5
      fi
      # Try multiple locations: final, temporary, _temporary subdirs
      for BASE_DIR in "employees" "employees/_temporary" "employees/_temporary/0/_temporary"; do
        echo "Trying: /test-spark/$BASE_DIR/"
        # List what's actually there; '|| true' keeps set -e / pipefail from
        # aborting when grep finds no match.
        LISTING=$(curl -s "http://localhost:8888/test-spark/$BASE_DIR/" 2>/dev/null)
        FILES=$(echo "$LISTING" | grep -oP 'part-[a-f0-9-]+-c000\.snappy\.parquet' || true)
        if [ -n "$FILES" ]; then
          echo "Found files in $BASE_DIR:"
          echo "$FILES"
          for FILE in $FILES; do
            echo "Downloading: $FILE from $BASE_DIR"
            curl -o "${FILE}" "http://localhost:8888/test-spark/$BASE_DIR/$FILE"
            # Accept only a non-empty download (curl writes a file even on 404s).
            if [ -f "$FILE" ] && [ -s "$FILE" ]; then
              # GNU stat first, BSD stat fallback (works on both runner flavors)
              FILE_SIZE=$(stat --format=%s "$FILE" 2>/dev/null || stat -f%z "$FILE" 2>/dev/null)
              echo "SUCCESS: Downloaded $FILE_SIZE bytes from $BASE_DIR"
              cp "$FILE" test.parquet
              DOWNLOADED=true
              break 2 # Break out of the FILE and BASE_DIR loops
            fi
          done
          # Files were listed in this location; no point probing the others.
          break
        fi
      done
      # Leave the polling loop as soon as a file has been secured.
      if [ "$DOWNLOADED" = "true" ]; then
        break
      fi
      if [ $((i % 5)) -eq 0 ]; then
        echo "Still waiting for files... ($i/30)"
      fi
      sleep 1
    done
    if [ "$DOWNLOADED" = "false" ]; then
      echo "WARNING: File not found in any location"
      echo "Trying recursive search..."
      curl -s "http://localhost:8888/test-spark/employees/?recursive=true" | grep parquet || true
    fi
  fi
fi

Loading…
Cancel
Save