10 changed files with 1 addition and 841 deletions
- 226  .github/workflows/spark-integration-tests.yml
-  39  test/java/spark/ReadParquetMeta.java
-  38  test/java/spark/TEST_ALL_THREE_MODES.sh
-  50  test/java/spark/capture-parquet.sh
- 180  test/java/spark/download_and_test.sh
-  34  test/java/spark/patch-parquet.sh
-  40  test/java/spark/test-one.sh
-  55  test/java/spark/test_parquet_external_read.sh
-  60  test/java/spark/test_parquet_readability.sh
- 120  test/java/spark/test_with_readers.sh
test/java/spark/ReadParquetMeta.java
@@ -1,39 +0,0 @@
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class ReadParquetMeta {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]);
        HadoopInputFile inputFile = HadoopInputFile.fromPath(path, conf);

        try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
            ParquetMetadata meta = reader.getFooter();

            System.out.println("=== Parquet File Metadata ===");
            System.out.println("Blocks (row groups): " + meta.getBlocks().size());
            System.out.println("File size from footer: " + inputFile.getLength());
            System.out.println("");

            meta.getBlocks().forEach(block -> {
                System.out.println("Row Group:");
                System.out.println(" Rows: " + block.getRowCount());
                System.out.println(" Total byte size: " + block.getTotalByteSize());
                System.out.println(" Columns: " + block.getColumns().size());
                System.out.println("");

                block.getColumns().forEach(col -> {
                    System.out.println(" Column: " + col.getPath());
                    System.out.println(" First data page offset: " + col.getFirstDataPageOffset());
                    System.out.println(" Dictionary page offset: " + col.getDictionaryPageOffset());
                    System.out.println(" Total size: " + col.getTotalSize());
                    System.out.println(" Total uncompressed size: " + col.getTotalUncompressedSize());
                    System.out.println("");
                });
            });
        }
    }
}

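For reference, the inspector above can be compiled and run on its own against a downloaded file. A rough sketch, assuming the parquet-hadoop and Hadoop client jars are already present in the local Maven repository; the classpath construction here is illustrative and not part of the test project:

# illustrative only: build a classpath from whatever matching jars ~/.m2 already holds
CP=$(find ~/.m2/repository \( -name 'parquet-*.jar' -o -name 'hadoop-*.jar' \) | tr '\n' ':')
javac -cp "$CP" ReadParquetMeta.java
java -cp ".:$CP" ReadParquetMeta /tmp/test.parquet
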
test/java/spark/TEST_ALL_THREE_MODES.sh
@@ -1,38 +0,0 @@
#!/bin/bash
set -e

echo "=========================================="
echo "Testing All Three Debug Modes"
echo "=========================================="
echo ""

cd /Users/chrislu/go/src/github.com/seaweedfs/seaweedfs/test/java/spark

# Mode 1: SEAWEED_ONLY (default)
echo "=== MODE 1: SEAWEED_ONLY ==="
docker compose run --rm -e SEAWEEDFS_TEST_ENABLED=true \
  spark-tests bash -c 'cd /workspace && mvn test -Dtest=SparkSQLTest#testCreateTableAndQuery 2>&1' \
  | grep -E "Tests run|BUILD SUCCESS|BUILD FAILURE|EOFException" | tail -5
echo ""

# Mode 2: LOCAL_ONLY
echo "=== MODE 2: LOCAL_ONLY ==="
docker compose run --rm -e SEAWEEDFS_TEST_ENABLED=true \
  -e SEAWEEDFS_DEBUG_MODE=LOCAL_ONLY \
  -e SEAWEEDFS_DEBUG_DIR=/workspace/target/debug-local \
  spark-tests bash -c 'mkdir -p /workspace/target/debug-local && cd /workspace && mvn test -Dtest=SparkSQLTest#testCreateTableAndQuery 2>&1' \
  | grep -E "Tests run|BUILD SUCCESS|BUILD FAILURE|EOFException|length is too low" | tail -5
echo ""

# Mode 3: DUAL_COMPARE
echo "=== MODE 3: DUAL_COMPARE ==="
docker compose run --rm -e SEAWEEDFS_TEST_ENABLED=true \
  -e SEAWEEDFS_DEBUG_MODE=DUAL_COMPARE \
  -e SEAWEEDFS_DEBUG_DIR=/workspace/target/debug-dual \
  spark-tests bash -c 'mkdir -p /workspace/target/debug-dual && cd /workspace && mvn test -Dtest=SparkSQLTest#testCreateTableAndQuery 2>&1' \
  | grep -E "Tests run|BUILD SUCCESS|BUILD FAILURE|EOFException" | tail -5
echo ""

echo "=========================================="
echo "Test Summary"
echo "=========================================="

test/java/spark/capture-parquet.sh
@@ -1,50 +0,0 @@
#!/bin/bash
# Run Spark test and capture the Parquet file before cleanup

echo "Starting SeaweedFS services..."
docker compose up -d seaweedfs-master seaweedfs-volume seaweedfs-filer
sleep 10

echo "Running Spark test in background..."
docker compose run --rm -e SEAWEEDFS_TEST_ENABLED=true spark-tests bash -c "mvn test -Dtest=SparkSQLTest#testCreateTableAndQuery 2>&1" > /tmp/spark-test-capture.log &
TEST_PID=$!

echo "Monitoring for Parquet file creation..."
while kill -0 $TEST_PID 2>/dev/null; do
  # Check if employees directory exists
  FILES=$(curl -s http://localhost:8888/test-spark/employees/ 2>/dev/null | grep -o 'part-[^"]*\.parquet' || echo "")
  if [ -n "$FILES" ]; then
    echo "Found Parquet file(s)!"
    for FILE in $FILES; do
      echo "Downloading: $FILE"
      curl -s "http://localhost:8888/test-spark/employees/$FILE" > "/tmp/$FILE"
      FILE_SIZE=$(stat -f%z "/tmp/$FILE" 2>/dev/null || stat --format=%s "/tmp/$FILE" 2>/dev/null)
      echo "Downloaded $FILE: $FILE_SIZE bytes"

      if [ -f "/tmp/$FILE" ] && [ $FILE_SIZE -gt 0 ]; then
        echo "SUCCESS: Captured $FILE"
        echo "Installing parquet-tools..."
        pip3 install -q parquet-tools 2>/dev/null || echo "parquet-tools might already be installed"

        echo ""
        echo "=== Parquet File Metadata ==="
        python3 -m parquet_tools meta "/tmp/$FILE" || echo "parquet-tools failed"

        echo ""
        echo "=== File Header (first 100 bytes) ==="
        hexdump -C "/tmp/$FILE" | head -10

        echo ""
        echo "=== File Footer (last 100 bytes) ==="
        tail -c 100 "/tmp/$FILE" | hexdump -C

        kill $TEST_PID 2>/dev/null
        exit 0
      fi
    done
  fi
  sleep 0.5
done

echo "Test completed, checking logs..."
tail -50 /tmp/spark-test-capture.log

test/java/spark/download_and_test.sh
@@ -1,180 +0,0 @@
#!/bin/bash
set -e

echo "=== Downloading Parquet file and testing with multiple readers ==="
echo ""

# Start services if not running
docker compose up -d seaweedfs-master seaweedfs-volume seaweedfs-filer 2>&1 | grep -v "Running"
sleep 3

# Write a file using Spark
echo "1. Writing Parquet file with Spark..."
docker compose run --rm -e SEAWEEDFS_TEST_ENABLED=true spark-tests bash -c '
cd /workspace
# Run the test that writes a file
mvn test -Dtest=SparkSQLTest#testCreateTableAndQuery 2>&1 | tail -20
' > /tmp/spark_write.log 2>&1 &
WRITE_PID=$!

# Wait a bit for file to be written
sleep 8

# Find and download the file from the temporary directory
echo "2. Finding Parquet file in temporary directory..."
TEMP_FILE=$(docker compose exec -T seaweedfs-filer sh -c '
find /data -name "*.parquet" -type f 2>/dev/null | grep -v "_SUCCESS" | head -1
' 2>&1 | tr -d '\r')

if [ -z "$TEMP_FILE" ]; then
  echo "Waiting for file to be written..."
  sleep 5
  TEMP_FILE=$(docker compose exec -T seaweedfs-filer sh -c '
find /data -name "*.parquet" -type f 2>/dev/null | grep -v "_SUCCESS" | head -1
' 2>&1 | tr -d '\r')
fi

if [ -z "$TEMP_FILE" ]; then
  echo "ERROR: No Parquet file found!"
  echo "Checking what files exist..."
  docker compose exec -T seaweedfs-filer sh -c 'find /data -type f 2>/dev/null | head -20'
  wait $WRITE_PID
  exit 1
fi

echo "Found: $TEMP_FILE"

# Copy file from container
echo "3. Copying file from container..."
docker compose cp seaweedfs-filer:$TEMP_FILE /tmp/spark_written.parquet 2>&1 | grep -v "Successfully"

# Also try to get it via HTTP
echo "4. Also downloading via HTTP API..."
# Get the file path relative to /data
REL_PATH=$(echo $TEMP_FILE | sed 's|/data||')
curl -s "http://localhost:8888${REL_PATH}" -o /tmp/spark_written_http.parquet 2>&1

# Use whichever file is larger/valid
if [ -f /tmp/spark_written.parquet ] && [ -s /tmp/spark_written.parquet ]; then
  cp /tmp/spark_written.parquet /tmp/test.parquet
  echo "Using file copied from container"
elif [ -f /tmp/spark_written_http.parquet ] && [ -s /tmp/spark_written_http.parquet ]; then
  cp /tmp/spark_written_http.parquet /tmp/test.parquet
  echo "Using file downloaded via HTTP"
else
  echo "ERROR: Failed to get file!"
  exit 1
fi

FILE_SIZE=$(stat -f%z /tmp/test.parquet 2>/dev/null || stat --format=%s /tmp/test.parquet 2>/dev/null)
echo "Got file: $FILE_SIZE bytes"
echo ""

# Kill the write process
kill $WRITE_PID 2>/dev/null || true
wait $WRITE_PID 2>/dev/null || true

# Now test with various readers
echo "=== Testing with Multiple Parquet Readers ==="
echo ""

# 1. Check magic bytes
echo "1. Magic Bytes Check:"
echo -n " First 4 bytes: "
head -c 4 /tmp/test.parquet | xxd -p
echo -n " Last 4 bytes: "
tail -c 4 /tmp/test.parquet | xxd -p

FIRST=$(head -c 4 /tmp/test.parquet | xxd -p)
LAST=$(tail -c 4 /tmp/test.parquet | xxd -p)
if [ "$FIRST" = "50415231" ] && [ "$LAST" = "50415231" ]; then
  echo " ✅ Valid PAR1 magic bytes"
else
  echo " ❌ Invalid magic bytes!"
fi
echo ""

# 2. Python pyarrow
echo "2. Testing with Python pyarrow:"
python3 << 'PYEOF'
try:
    import pyarrow.parquet as pq
    table = pq.read_table('/tmp/test.parquet')
    print(f" ✅ SUCCESS: Read {table.num_rows} rows, {table.num_columns} columns")
    print(f" Schema: {table.schema}")
    print(f" First row: {table.to_pandas().iloc[0].to_dict()}")
except Exception as e:
    print(f" ❌ FAILED: {e}")
PYEOF
echo ""

# 3. DuckDB
echo "3. Testing with DuckDB:"
python3 << 'PYEOF'
try:
    import duckdb
    conn = duckdb.connect(':memory:')
    result = conn.execute("SELECT * FROM '/tmp/test.parquet'").fetchall()
    print(f" ✅ SUCCESS: Read {len(result)} rows")
    print(f" Data: {result}")
except Exception as e:
    print(f" ❌ FAILED: {e}")
PYEOF
echo ""

# 4. Pandas
echo "4. Testing with Pandas:"
python3 << 'PYEOF'
try:
    import pandas as pd
    df = pd.read_parquet('/tmp/test.parquet')
    print(f" ✅ SUCCESS: Read {len(df)} rows, {len(df.columns)} columns")
    print(f" Columns: {list(df.columns)}")
    print(f" Data:\n{df}")
except Exception as e:
    print(f" ❌ FAILED: {e}")
PYEOF
echo ""

# 5. Java ParquetReader (using our test container)
echo "5. Testing with Java ParquetReader:"
docker compose run --rm spark-tests bash -c '
cat > /tmp/ReadParquet.java << "JAVAEOF"
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.example.data.Group;

public class ReadParquet {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("/tmp/test.parquet");

        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path)
                .withConf(conf).build()) {
            Group group;
            int count = 0;
            while ((group = reader.read()) != null && count < 5) {
                System.out.println(" Row " + count + ": " + group);
                count++;
            }
            System.out.println(" ✅ SUCCESS: Read " + count + " rows");
        } catch (Exception e) {
            System.out.println(" ❌ FAILED: " + e.getMessage());
            e.printStackTrace();
        }
    }
}
JAVAEOF

# Copy the file into container
cat > /tmp/test.parquet
' < /tmp/test.parquet 2>&1 | head -1

echo ""
echo "=== Summary ==="
echo "File size: $FILE_SIZE bytes"
echo "If all readers succeeded, the file is VALID."
echo "If readers failed, the footer metadata is corrupted."

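The summary above separates "file is valid" from "footer metadata is corrupted". One more quick check in the same spirit, not part of the original script: the 4 bytes immediately before the trailing PAR1 magic store the footer (Thrift metadata) length as a little-endian integer, which is what "length is too low" style errors refer to. A minimal bash sketch; the path and variable names are illustrative:

FILE=/tmp/test.parquet
SIZE=$(stat -f%z "$FILE" 2>/dev/null || stat --format=%s "$FILE" 2>/dev/null)
# last 8 bytes = 4-byte little-endian footer length + "PAR1"
LEN_HEX=$(tail -c 8 "$FILE" | head -c 4 | xxd -p)
FOOTER_LEN=$((16#$(echo "$LEN_HEX" | sed 's/../& /g' | awk '{print $4$3$2$1}')))
echo "File size: $SIZE, footer length: $FOOTER_LEN"
# the footer must fit between the leading magic (4 bytes) and the trailing 8 bytes
if [ "$FOOTER_LEN" -gt 0 ] && [ $((FOOTER_LEN + 12)) -le "$SIZE" ]; then
  echo " ✅ Footer length is consistent with file size"
else
  echo " ❌ Footer length is inconsistent with file size (possible footer corruption)"
fi
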
test/java/spark/patch-parquet.sh
@@ -1,34 +0,0 @@
#!/bin/bash
# This script patches the Parquet JAR to use LinkedHashSet instead of HashSet

JAR_PATH="$HOME/.m2/repository/org/apache/parquet/parquet-hadoop/1.14.4/parquet-hadoop-1.14.4.jar"
BACKUP_PATH="$HOME/.m2/repository/org/apache/parquet/parquet-hadoop/1.14.4/parquet-hadoop-1.14.4.jar.backup"

echo "Patching Parquet JAR at: $JAR_PATH"

# Backup original JAR
if [ ! -f "$BACKUP_PATH" ]; then
  cp "$JAR_PATH" "$BACKUP_PATH"
  echo "Created backup at: $BACKUP_PATH"
fi

# Extract the JAR
TEMP_DIR=$(mktemp -d)
cd "$TEMP_DIR"
jar xf "$JAR_PATH"

# Find and patch the class file
# We need to modify the bytecode to change HashSet to LinkedHashSet
# This is complex, so let's document what needs to be done

echo "JAR extracted to: $TEMP_DIR"
echo "To patch, we need to:"
echo "1. Decompile ParquetFileWriter.class"
echo "2. Change HashSet to LinkedHashSet"
echo "3. Recompile"
echo "4. Repackage JAR"
echo ""
echo "This requires javap, javac with all dependencies, and jar"
echo "Simpler approach: Use the patched source to rebuild the module"

rm -rf "$TEMP_DIR"

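The "simpler approach" the script points to, rebuilding parquet-hadoop from patched source instead of editing bytecode, could look roughly like the following. This is a sketch only; the repository URL, tag name, and source path are assumptions about the upstream Parquet project rather than anything set up by this repository:

# assumed workflow, not part of this repository
git clone https://github.com/apache/parquet-java.git
cd parquet-java
git checkout apache-parquet-1.14.4   # tag name assumed for the 1.14.4 release
# change HashSet to LinkedHashSet in
#   parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
# then rebuild and install just that module into the local Maven repository
mvn -pl parquet-hadoop -am install -DskipTests
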
test/java/spark/test-one.sh
@@ -1,40 +0,0 @@
#!/bin/bash

# Run a single test method for quick iteration

set -e

if [ $# -eq 0 ]; then
  echo "Usage: ./test-one.sh <TestClass>#<methodName>"
  echo ""
  echo "Examples:"
  echo "  ./test-one.sh SparkReadWriteTest#testWriteAndReadParquet"
  echo "  ./test-one.sh SparkSQLTest#testCreateTableAndQuery"
  echo ""
  exit 1
fi

# Check if SeaweedFS is running
if ! curl -f http://localhost:8888/ > /dev/null 2>&1; then
  echo "✗ SeaweedFS filer is not accessible at http://localhost:8888"
  echo ""
  echo "Please start SeaweedFS first:"
  echo "  docker-compose up -d"
  echo ""
  exit 1
fi

echo "✓ SeaweedFS filer is accessible"
echo ""
echo "Running test: $1"
echo ""

# Set environment variables
export SEAWEEDFS_TEST_ENABLED=true
export SEAWEEDFS_FILER_HOST=localhost
export SEAWEEDFS_FILER_PORT=8888
export SEAWEEDFS_FILER_GRPC_PORT=18888

# Run the specific test
mvn test -Dtest="$1"

test/java/spark/test_parquet_external_read.sh
@@ -1,55 +0,0 @@
#!/bin/bash
set -e

echo "=== Testing if Parquet file can be read by external tools ==="

# Use our working ParquetMemoryComparisonTest to write a file
echo "1. Writing Parquet file with ParquetWriter (known to work)..."
docker compose run --rm -e SEAWEEDFS_TEST_ENABLED=true spark-tests bash -c '
cd /workspace
mvn test -Dtest=ParquetMemoryComparisonTest#testCompareMemoryVsSeaweedFSParquet -q 2>&1 | tail -10
' > /tmp/write_test.log 2>&1

# The test writes to: /test-spark/comparison-test.parquet
echo "2. Downloading file from SeaweedFS..."
curl -s "http://localhost:8888/test-spark/comparison-test.parquet" -o /tmp/test.parquet

if [ ! -f /tmp/test.parquet ] || [ ! -s /tmp/test.parquet ]; then
  echo "ERROR: Failed to download file!"
  echo "Checking if file exists..."
  curl -s "http://localhost:8888/test-spark/?pretty=y"
  exit 1
fi

FILE_SIZE=$(stat -f%z /tmp/test.parquet 2>/dev/null || stat --format=%s /tmp/test.parquet 2>/dev/null)
echo "Downloaded $FILE_SIZE bytes"

# Install parquet-tools if needed
pip3 install -q parquet-tools 2>&1 | grep -v "Requirement already satisfied" || true

echo ""
echo "=== File Header (first 100 bytes) ==="
hexdump -C /tmp/test.parquet | head -10

echo ""
echo "=== File Footer (last 100 bytes) ==="
tail -c 100 /tmp/test.parquet | hexdump -C

echo ""
echo "=== Parquet Metadata ==="
parquet-tools inspect /tmp/test.parquet 2>&1 || echo "FAILED to inspect"

echo ""
echo "=== Try to read data ==="
parquet-tools show /tmp/test.parquet 2>&1 | head -20 || echo "FAILED to read data"

echo ""
echo "=== Conclusion ==="
if parquet-tools show /tmp/test.parquet > /dev/null 2>&1; then
  echo "✅ SUCCESS: File written to SeaweedFS can be read by parquet-tools!"
  echo "This proves the file format is valid."
else
  echo "❌ FAILED: File cannot be read by parquet-tools"
  echo "The file may be corrupted."
fi

test/java/spark/test_parquet_readability.sh
@@ -1,60 +0,0 @@
#!/bin/bash
set -e

echo "=== Testing if Parquet file written by Spark can be read by parquet-tools ==="

# Run the test to write a Parquet file
echo "1. Writing Parquet file with Spark..."
docker compose run --rm -e SEAWEEDFS_TEST_ENABLED=true spark-tests bash -c '
cd /workspace
mvn test -Dtest=SparkSQLTest#testCreateTableAndQuery -q 2>&1 | tail -5
' > /tmp/write_test.log 2>&1 || true

# Find the Parquet file that was written
echo "2. Finding Parquet file..."
PARQUET_FILE=$(docker compose run --rm spark-tests bash -c '
curl -s "http://seaweedfs-filer:8888/test-spark/employees/?pretty=y" | grep -oP "\"name\":\s*\"\K[^\"]+\.parquet" | head -1
' 2>&1 | grep -v "Creating" | grep "\.parquet" | head -1)

if [ -z "$PARQUET_FILE" ]; then
  echo "ERROR: No Parquet file found!"
  exit 1
fi

echo "Found file: $PARQUET_FILE"

# Download the file
echo "3. Downloading file from SeaweedFS..."
curl -s "http://localhost:8888/test-spark/employees/$PARQUET_FILE" -o /tmp/test.parquet

if [ ! -f /tmp/test.parquet ] || [ ! -s /tmp/test.parquet ]; then
  echo "ERROR: Failed to download file!"
  exit 1
fi

FILE_SIZE=$(stat -f%z /tmp/test.parquet 2>/dev/null || stat --format=%s /tmp/test.parquet 2>/dev/null)
echo "Downloaded $FILE_SIZE bytes"

# Try to read with parquet-tools
echo "4. Reading with parquet-tools..."
pip3 install -q parquet-tools 2>&1 | grep -v "Requirement already satisfied" || true

echo ""
echo "=== Parquet Metadata ==="
parquet-tools inspect /tmp/test.parquet 2>&1 || echo "FAILED to inspect"

echo ""
echo "=== Try to read data ==="
parquet-tools show /tmp/test.parquet 2>&1 || echo "FAILED to read data"

echo ""
echo "=== Conclusion ==="
if parquet-tools show /tmp/test.parquet > /dev/null 2>&1; then
  echo "✅ SUCCESS: File can be read by parquet-tools!"
  echo "The file itself is VALID Parquet format."
  echo "The issue is specific to how Spark reads it back."
else
  echo "❌ FAILED: File cannot be read by parquet-tools"
  echo "The file is CORRUPTED or has invalid Parquet format."
fi

test/java/spark/test_with_readers.sh
@@ -1,120 +0,0 @@
#!/bin/bash
set -e

echo "=== Testing Parquet file with multiple readers ==="
echo ""

# Start services
docker compose up -d 2>&1 | grep -v "Running"
sleep 2

# Run test and capture chunk ID
echo "1. Writing Parquet file and capturing chunk ID..."
docker compose run --rm -e SEAWEEDFS_TEST_ENABLED=true spark-tests bash -c '
cd /workspace
mvn test -Dtest=SparkSQLTest#testCreateTableAndQuery 2>&1
' 2>&1 | tee /tmp/test_output.log | tail -20 &
TEST_PID=$!

# Wait for the file to be written
echo "2. Waiting for file write..."
sleep 10

# Extract chunk ID from logs
CHUNK_ID=$(grep "PARQUET FILE WRITTEN TO EMPLOYEES" /tmp/test_output.log | grep -oP 'CHUNKS: \[\K[^\]]+' | head -1)

if [ -z "$CHUNK_ID" ]; then
  echo "Waiting more..."
  sleep 5
  CHUNK_ID=$(grep "PARQUET FILE WRITTEN TO EMPLOYEES" /tmp/test_output.log | grep -oP 'CHUNKS: \[\K[^\]]+' | head -1)
fi

if [ -z "$CHUNK_ID" ]; then
  echo "ERROR: Could not find chunk ID in logs"
  echo "Log excerpt:"
  grep -E "PARQUET|CHUNKS|employees" /tmp/test_output.log | tail -20
  kill $TEST_PID 2>/dev/null || true
  exit 1
fi

echo "Found chunk ID: $CHUNK_ID"

# Download directly from volume server
echo "3. Downloading from volume server..."
curl -s "http://localhost:8080/$CHUNK_ID" -o /tmp/test.parquet

if [ ! -f /tmp/test.parquet ] || [ ! -s /tmp/test.parquet ]; then
  echo "ERROR: Download failed!"
  exit 1
fi

FILE_SIZE=$(stat -f%z /tmp/test.parquet 2>/dev/null || stat --format=%s /tmp/test.parquet 2>/dev/null)
echo "Downloaded: $FILE_SIZE bytes"
echo ""

# Kill test process
kill $TEST_PID 2>/dev/null || true
wait $TEST_PID 2>/dev/null || true

# Test with readers
echo "=== Testing with Multiple Parquet Readers ==="
echo ""

# Check magic bytes
echo "1. Magic Bytes:"
FIRST=$(head -c 4 /tmp/test.parquet | xxd -p)
LAST=$(tail -c 4 /tmp/test.parquet | xxd -p)
echo " First 4 bytes: $FIRST"
echo " Last 4 bytes: $LAST"
if [ "$FIRST" = "50415231" ] && [ "$LAST" = "50415231" ]; then
  echo " ✅ Valid PAR1 magic"
else
  echo " ❌ Invalid magic!"
fi
echo ""

# Python pyarrow
echo "2. Python pyarrow:"
python3 -c "
import pyarrow.parquet as pq
try:
    table = pq.read_table('/tmp/test.parquet')
    print(f' ✅ Read {table.num_rows} rows, {table.num_columns} columns')
    print(f' Data: {table.to_pandas().to_dict(\"records\")}')
except Exception as e:
    print(f' ❌ FAILED: {e}')
" 2>&1
echo ""

# Pandas
echo "3. Pandas:"
python3 -c "
import pandas as pd
try:
    df = pd.read_parquet('/tmp/test.parquet')
    print(f' ✅ Read {len(df)} rows')
    print(f' Data:\n{df}')
except Exception as e:
    print(f' ❌ FAILED: {e}')
" 2>&1
echo ""

# DuckDB
echo "4. DuckDB:"
python3 -c "
import duckdb
try:
    conn = duckdb.connect(':memory:')
    result = conn.execute('SELECT * FROM \"/tmp/test.parquet\"').fetchall()
    print(f' ✅ Read {len(result)} rows')
    print(f' Data: {result}')
except Exception as e:
    print(f' ❌ FAILED: {e}')
" 2>&1
echo ""

echo "=== Summary ==="
echo "File: $FILE_SIZE bytes"
echo "If readers succeeded: File is VALID ✅"
echo "If readers failed: Footer metadata is corrupted ❌"

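A possible follow-up to the summary above, not part of the original script: when the file consists of a single chunk, the bytes served by the filer (port 8888) and by the volume server (port 8080) can be compared directly to localize where any corruption is introduced. $PARQUET_FILE and $CHUNK_ID below are placeholders reusing the naming from the scripts above:

# hypothetical check; $PARQUET_FILE and $CHUNK_ID are placeholders
curl -s "http://localhost:8888/test-spark/employees/$PARQUET_FILE" -o /tmp/via_filer.parquet
curl -s "http://localhost:8080/$CHUNK_ID" -o /tmp/via_volume.parquet
cmp /tmp/via_filer.parquet /tmp/via_volume.parquet \
  && echo "filer and volume server return identical bytes" \
  || echo "filer and volume server bytes differ"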