diff --git a/.github/workflows/spark-integration-tests.yml b/.github/workflows/spark-integration-tests.yml
index bbf4c1d5e..88624799d 100644
--- a/.github/workflows/spark-integration-tests.yml
+++ b/.github/workflows/spark-integration-tests.yml
@@ -135,50 +135,43 @@ jobs:
           if docker compose logs spark-tests 2>&1 | grep -q "PARQUET FILE WRITTEN TO EMPLOYEES"; then
             if [ "$DOWNLOADED" = "false" ]; then
               echo ""
-              echo "=== EMPLOYEES FILE WRITTEN! Extracting from logs and downloading ==="
+              echo "=== EMPLOYEES FILE WRITTEN! Extracting chunk IDs and downloading from volume ==="
 
-              # Extract the EXACT filename from the write log
+              # Extract chunk IDs directly from the write log (bypasses the filer entirely!)
               FULL_LOG=$(docker compose logs spark-tests 2>&1)
-              WRITTEN_FILE=$(echo "$FULL_LOG" | grep "PARQUET FILE WRITTEN TO EMPLOYEES" | tail -1 | grep -oP 'part-[a-f0-9-]+-c000\.snappy\.parquet')
+              WRITE_LOG=$(echo "$FULL_LOG" | grep "PARQUET FILE WRITTEN TO EMPLOYEES" | tail -1)
 
-              echo "Filename from log: $WRITTEN_FILE"
+              echo "Write log: $WRITE_LOG"
 
-              if [ -z "$WRITTEN_FILE" ]; then
-                echo "ERROR: Could not extract filename from write log"
-                echo "Showing write log lines:"
-                echo "$FULL_LOG" | grep "PARQUET FILE WRITTEN" | tail -5
+              # Extract chunk IDs from CHUNKS: [id1;id2;...] in the log
+              CHUNK_IDS=$(echo "$WRITE_LOG" | grep -oP 'CHUNKS: \[\K[^\]]+')
+              echo "Chunk IDs: $CHUNK_IDS"
+
+              if [ -z "$CHUNK_IDS" ]; then
+                echo "ERROR: No chunk IDs in write log - using old format?"
+                # Fallback: try to find any chunk ID near the write log
+                CHUNK_IDS=$(echo "$FULL_LOG" | grep -B 20 "PARQUET FILE WRITTEN TO EMPLOYEES" | grep 'file_id: "' | tail -1 | grep -oP '"\K[^"]+')
+                echo "Fallback chunk ID: $CHUNK_IDS"
               fi
 
-              # Try multiple locations: final, temporary, _temporary subdirs
-              for BASE_DIR in "employees" "employees/_temporary" "employees/_temporary/0/_temporary"; do
-                echo "Trying: /test-spark/$BASE_DIR/"
-
-                # List what's actually there
-                LISTING=$(curl -s "http://localhost:8888/test-spark/$BASE_DIR/" 2>/dev/null)
-                FILES=$(echo "$LISTING" | grep -oP 'part-[a-f0-9-]+-c000\.snappy\.parquet' || true)
-
-                if [ -n "$FILES" ]; then
-                  echo "Found files in $BASE_DIR:"
-                  echo "$FILES"
+              if [ -n "$CHUNK_IDS" ]; then
+                # Download each chunk (usually just one); split on ';' since file IDs themselves contain commas
+                IFS=';' read -ra CHUNKS <<< "$CHUNK_IDS"
+                for CHUNK_ID in "${CHUNKS[@]}"; do
+                  echo "Downloading chunk from volume server: http://localhost:8080/$CHUNK_ID"
+                  curl -o "test.parquet" "http://localhost:8080/$CHUNK_ID"
 
-                  for FILE in $FILES; do
-                    echo "Downloading: $FILE from $BASE_DIR"
-                    curl -o "${FILE}" "http://localhost:8888/test-spark/$BASE_DIR/$FILE"
-                    if [ -f "$FILE" ] && [ -s "$FILE" ]; then
-                      FILE_SIZE=$(stat --format=%s "$FILE" 2>/dev/null || stat -f%z "$FILE" 2>/dev/null)
-                      echo "SUCCESS: Downloaded $FILE_SIZE bytes from $BASE_DIR"
-                      cp "$FILE" test.parquet
-                      DOWNLOADED=true
-                      break 2 # Break out of both loops
-                    fi
-                  done
-                fi
-              done
-
-              if [ "$DOWNLOADED" = "false" ]; then
-                echo "WARNING: File not found in any location"
-                echo "Trying recursive search..."
-                curl -s "http://localhost:8888/test-spark/employees/?recursive=true" | grep parquet || true
+                  if [ -f test.parquet ] && [ -s test.parquet ]; then
+                    FILE_SIZE=$(stat --format=%s test.parquet 2>/dev/null || stat -f%z test.parquet 2>/dev/null)
+                    echo "SUCCESS: Downloaded $FILE_SIZE bytes from volume server!"
+                    DOWNLOADED=true
+                    break
+                  else
+                    echo "FAILED: Chunk $CHUNK_ID returned 404 or empty"
+                  fi
+                done
+              else
+                echo "ERROR: Could not extract chunk IDs"
              fi
            fi
          fi
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
index 3dd5106b1..a40ea64cb 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
@@ -249,11 +249,19 @@ public class SeaweedOutputStream extends OutputStream {
             LOG.info(
                     "[DEBUG-2024] close END: path={} finalPosition={} totalBytesWritten={} writeCalls={} (buffer had {} bytes)",
                     path, position, totalBytesWritten, writeCallCount, bufferPosBeforeFlush);
-
+
             // Special logging for employees directory files (to help CI download timing)
             if (path.contains("/test-spark/employees/") && path.endsWith(".parquet")) {
                 String filename = path.substring(path.lastIndexOf('/') + 1);
-                LOG.warn("=== PARQUET FILE WRITTEN TO EMPLOYEES: {} ({} bytes) ===", filename, position);
+                // Log filename, size, AND chunk IDs for direct volume download
+                StringBuilder chunkInfo = new StringBuilder();
+                for (int i = 0; i < entry.getChunksCount(); i++) {
+                    FilerProto.FileChunk chunk = entry.getChunks(i);
+                    if (i > 0) chunkInfo.append(";"); // file IDs look like "3,01637037d6", so ',' would split them
+                    chunkInfo.append(chunk.getFileId());
+                }
+                LOG.warn("=== PARQUET FILE WRITTEN TO EMPLOYEES: {} ({} bytes) CHUNKS: [{}] ===",
+                        filename, position, chunkInfo.toString());
             }
         } finally {
             lastError = new IOException("Stream is closed!");
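
A minimal standalone sketch of the log-parsing step the workflow now relies on, runnable outside CI. The sample log line and chunk IDs below are hypothetical (real ones come from the SeaweedOutputStream log), and GNU grep is assumed for the -P/PCRE flags; the volume-server URL form http://localhost:8080/<fileId> matches what the workflow uses above. SeaweedFS file IDs have the form "volumeId,fileKey" (e.g. "3,01637037d6"), which is why the IDs are joined and split with ';' rather than ','.

    #!/usr/bin/env bash
    # Hypothetical sample of the line emitted by SeaweedOutputStream.close()
    LINE='=== PARQUET FILE WRITTEN TO EMPLOYEES: part-0-c000.snappy.parquet (4096 bytes) CHUNKS: [3,01637037d6;7,02575e7f42] ==='

    # \K discards everything matched so far, so only the bracket contents are printed
    CHUNK_IDS=$(echo "$LINE" | grep -oP 'CHUNKS: \[\K[^\]]+')
    echo "Chunk IDs: $CHUNK_IDS"   # -> 3,01637037d6;7,02575e7f42

    # Split the ';'-separated list into an array; each element is one complete file ID
    IFS=';' read -ra CHUNKS <<< "$CHUNK_IDS"
    for CHUNK_ID in "${CHUNKS[@]}"; do
        # A volume server serves a chunk's bytes directly at /<fileId>
        echo "Would fetch: http://localhost:8080/$CHUNK_ID"
    done

This prints one URL per chunk; swapping the echo for the curl call used in the workflow performs the actual download.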