diff --git a/test/java/spark/ROOT_CAUSE_CONFIRMED.md b/test/java/spark/ROOT_CAUSE_CONFIRMED.md
new file mode 100644
index 000000000..8e2c4c026
--- /dev/null
+++ b/test/java/spark/ROOT_CAUSE_CONFIRMED.md
@@ -0,0 +1,111 @@
+# Root Cause Confirmed: Parquet Footer Metadata Issue
+
+## The Bug (CONFIRMED)
+
+Parquet is trying to **read 78 bytes from position 1275**, but the file ends at position 1275!
+
+```
+[DEBUG-2024] SeaweedInputStream.read() returning EOF:
+  path=.../employees/part-00000-....snappy.parquet
+  position=1275
+  contentLength=1275
+  bufRemaining=78
+```
+
+## What This Means
+
+The Parquet footer metadata says there's a column chunk or row group at byte offset **1275** that is **78 bytes** long. But the file is only 1275 bytes total!
+
+## Evidence
+
+### During Write
+- `getPos()` returned: 0, 4, 59, 92, 139, 172, 190, 231, 262, 285, 310, 333, 346, 357, 372, 383, 1267
+- Last data position: **1267**
+- Final file size: **1275** (1267 + 8-byte footer)
+
+### During Read
+- ✅ Read [383, 1267) → 884 bytes ✅
+- ✅ Read [1267, 1275) → 8 bytes ✅
+- ✅ Read [4, 1275) → 1271 bytes ✅
+- ❌ **Read [1275, 1353) → TRIED to read 78 bytes → EOF!** ❌
+
+## Why The Downloaded File Works
+
+When you download the file and use `parquet-tools`, it reads correctly because:
+- The file IS valid and complete
+- parquet-tools can interpret the footer correctly
+- **But Spark/Parquet at runtime interprets the footer DIFFERENTLY**
+
+## Possible Causes
+
+### 1. Parquet Version Mismatch ⚠️
+- pom.xml declares Parquet 1.16.0
+- But Spark 3.5.0 might bundle a different Parquet version
+- Runtime version conflict → footer interpretation mismatch
+
+### 2. Buffer Position vs. Flushed Position
+- `getPos()` returns `position + buffer.position()`
+- If Parquet calls `getPos()` before the buffer is flushed, offsets could be wrong
+- But our logs show `getPos()` values that seem correct...
+
+### 3. Parquet 1.16.0 Footer Format Change
+- Parquet 1.16.0 might have changed the footer layout
+- Writing with the 1.16.0 format but reading with different logic
+- The "78 bytes" might be a footer size constant that changed
+
+## The 78-Byte Constant
+
+**Interesting pattern**: the number of missing bytes is ALWAYS 78. This suggests:
+- It's not random data corruption
+- It's a systematic offset calculation error
+- 78 bytes might be related to:
+  - Footer metadata size
+  - Column statistics size
+  - Row group index size
+  - Magic bytes + length fields
+
+## Next Steps
+
+### Option A: Downgrade Parquet
+Try Parquet 1.13.1 (the version Spark 3.5.0 normally bundles), e.g. assuming the version is controlled by a `parquet.version` property in pom.xml:
+
+```xml
+<!-- property name assumed; match whatever property pom.xml actually uses -->
+<parquet.version>1.13.1</parquet.version>
+```
+
+### Option B: Check Runtime Parquet Version
+Add logging to see which Parquet version is actually loaded:
+
+```java
+LOG.info("Parquet version: {}", ParquetFileReader.class.getPackage().getImplementationVersion());
+```
+
+### Option C: Force Buffer Flush Before getPos()
+Override `getPos()` to force a flush:
+
+```java
+public synchronized long getPos() {
+    flush(); // Ensure all buffered data is written before reporting the position
+    return position + buffer.position();
+}
+```
+
+### Option D: Analyze Footer Hex Dump
+Download the file and examine the last 100 bytes to see the footer structure; the last 8 bytes should be a 4-byte little-endian footer length followed by the magic bytes `PAR1`:
+
+```bash
+hexdump -C test.parquet | tail -20
+```
+
+## Test Plan
+
+1. Try downgrading to Parquet 1.13.1
+2. If that works, it confirms the version incompatibility
+3. If not, analyze the footer structure with a hex dump
+4. Check if Spark's bundled Parquet overrides our dependency (see the snippet below)
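+
+For step 4, one quick way to see which jar actually provides Parquet at runtime is to print the code source of `ParquetFileReader`. This is a minimal sketch (the `ParquetJarCheck` class name is just for illustration); run it in the same JVM as the Spark job or the test:
+
+```java
+import org.apache.parquet.hadoop.ParquetFileReader;
+
+public class ParquetJarCheck {
+    public static void main(String[] args) {
+        // Prints the jar the Parquet classes were loaded from, e.g. a Spark-bundled
+        // parquet-hadoop jar instead of the version declared in pom.xml.
+        System.out.println(ParquetFileReader.class
+                .getProtectionDomain().getCodeSource().getLocation());
+        // Prints the implementation version recorded in that jar's manifest (may be null).
+        System.out.println(ParquetFileReader.class.getPackage().getImplementationVersion());
+    }
+}
+```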
+
+## Files Modified
+
+- `SeaweedInputStream.java` - Added EOF logging
+- Root cause: Parquet footer has offset 1275 for 78-byte chunk that doesn't exist
+
diff --git a/test/java/spark/pom.xml b/test/java/spark/pom.xml
index 223cb18c7..bfbd0c3f3 100644
--- a/test/java/spark/pom.xml
+++ b/test/java/spark/pom.xml
@@ -23,7 +23,7 @@
 3.80.1-SNAPSHOT
 2.15.3
 4.1.125.Final
- 1.16.0
+ 1.13.1
 2.12.0
 -Xmx2g
diff --git a/test/java/spark/src/test/java/seaweed/spark/ParquetReproducerTest.java b/test/java/spark/src/test/java/seaweed/spark/ParquetReproducerTest.java
new file mode 100644
index 000000000..28eda6993
--- /dev/null
+++ b/test/java/spark/src/test/java/seaweed/spark/ParquetReproducerTest.java
@@ -0,0 +1,238 @@
+package seaweed.spark;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
+
+/**
+ * Minimal reproducer for the Parquet EOF issue.
+ *
+ * This test writes a simple Parquet file to SeaweedFS and then tries to read it back.
+ * It should reproduce the "EOFException: Still have: 78 bytes left" error.
+ */
+public class ParquetReproducerTest {
+
+    private static final boolean TESTS_ENABLED =
+            "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED"));
+    private static final String FILER_HOST = System.getenv().getOrDefault("SEAWEEDFS_FILER_HOST", "localhost");
+    private static final String FILER_PORT = System.getenv().getOrDefault("SEAWEEDFS_FILER_PORT", "8888");
+    private static final String FILER_GRPC_PORT = System.getenv().getOrDefault("SEAWEEDFS_FILER_GRPC_PORT", "18888");
+    private static final String TEST_FILE = "/test-parquet-reproducer/employees.parquet";
+
+    private Configuration conf;
+    private org.apache.hadoop.fs.FileSystem fs;
+
+    @Before
+    public void setUp() throws Exception {
+        if (!TESTS_ENABLED) {
+            return;
+        }
+
+        System.out.println("\n=== Parquet EOF Reproducer Test ===");
+
+        // Create configuration
+        conf = new Configuration();
+        conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
+        conf.set("fs.seaweed.filer.host", FILER_HOST);
+        conf.set("fs.seaweed.filer.port", FILER_PORT);
+        conf.set("fs.seaweed.filer.port.grpc", FILER_GRPC_PORT);
+        conf.set("fs.seaweed.buffer.size", "1048576");
+
+        // Get filesystem
+        Path testPath = new Path("seaweedfs://" + FILER_HOST + ":" + FILER_PORT + TEST_FILE);
+        fs = testPath.getFileSystem(conf);
+
+        // Clean up any existing test file
+        if (fs.exists(testPath)) {
+            fs.delete(testPath, true);
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (!TESTS_ENABLED) {
+            return;
+        }
+
+        // Clean up test file (guard against setUp having failed before fs was assigned)
+        Path testPath = new Path("seaweedfs://" + FILER_HOST + ":" + FILER_PORT + TEST_FILE);
+        if (fs != null) {
+            if (fs.exists(testPath)) {
+                fs.delete(testPath, true);
+            }
+            fs.close();
+        }
+    }
+
+    @Test
+    public void testParquetWriteAndRead() throws IOException {
+        if (!TESTS_ENABLED) {
+            return;
+        }
+
+        System.out.println("\n1. Writing Parquet file to SeaweedFS...");
+        writeParquetFile();
+
+        System.out.println("\n2. Reading Parquet file metadata...");
+        readParquetMetadata();
+
+        System.out.println("\n3. Attempting to read row groups...");
+        readParquetData();
+
+        System.out.println("\n4. Test completed successfully!");
+    }
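+
+    /**
+     * Extra check (a sketch, not part of the original reproducer): read the footer and
+     * verify that every column chunk actually fits inside the file. If the problem described
+     * in ROOT_CAUSE_CONFIRMED.md is present, a chunk will claim bytes past the end of the
+     * file (e.g. 78 bytes starting at offset 1275 in a 1275-byte file).
+     */
+    @Test
+    public void testFooterOffsetsFitInFile() throws IOException {
+        if (!TESTS_ENABLED) {
+            return;
+        }
+
+        writeParquetFile();
+
+        Path path = new Path("seaweedfs://" + FILER_HOST + ":" + FILER_PORT + TEST_FILE);
+        long fileLength = fs.getFileStatus(path).getLen();
+
+        try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
+            reader.getFooter().getBlocks().forEach(block ->
+                    block.getColumns().forEach(column -> {
+                        long end = column.getStartingPos() + column.getTotalSize();
+                        System.out.println("Column " + column.getPath()
+                                + " occupies [" + column.getStartingPos() + ", " + end + ")"
+                                + " in a " + fileLength + "-byte file");
+                        if (end > fileLength) {
+                            throw new IllegalStateException("Column chunk extends "
+                                    + (end - fileLength) + " bytes past the end of the file");
+                        }
+                    }));
+        }
+    }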
+
+    private void writeParquetFile() throws IOException {
+        // Define schema: same as Spark SQL test (id, name, department, salary)
+        MessageType schema = Types.buildMessage()
+                .required(INT32).named("id")
+                .optional(BINARY).as(org.apache.parquet.schema.LogicalTypeAnnotation.stringType()).named("name")
+                .optional(BINARY).as(org.apache.parquet.schema.LogicalTypeAnnotation.stringType()).named("department")
+                .required(INT32).named("salary")
+                .named("employee");
+
+        Path path = new Path("seaweedfs://" + FILER_HOST + ":" + FILER_PORT + TEST_FILE);
+
+        Configuration writerConf = new Configuration(conf);
+        GroupWriteSupport.setSchema(schema, writerConf);
+
+        System.out.println("Creating ParquetWriter...");
+        System.out.println("Path: " + path);
+
+        try (ParquetWriter<Group> writer = org.apache.parquet.hadoop.example.ExampleParquetWriter.builder(path)
+                .withConf(writerConf)
+                .withType(schema)
+                .withWriteMode(ParquetFileWriter.Mode.CREATE)
+                .build()) {
+
+            SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
+
+            // Write 4 employees (same data as Spark test)
+            Group group1 = groupFactory.newGroup()
+                    .append("id", 1)
+                    .append("name", "Alice")
+                    .append("department", "Engineering")
+                    .append("salary", 100000);
+            writer.write(group1);
+            System.out.println("Wrote employee 1: Alice");
+
+            Group group2 = groupFactory.newGroup()
+                    .append("id", 2)
+                    .append("name", "Bob")
+                    .append("department", "Sales")
+                    .append("salary", 80000);
+            writer.write(group2);
+            System.out.println("Wrote employee 2: Bob");
+
+            Group group3 = groupFactory.newGroup()
+                    .append("id", 3)
+                    .append("name", "Charlie")
+                    .append("department", "Engineering")
+                    .append("salary", 120000);
+            writer.write(group3);
+            System.out.println("Wrote employee 3: Charlie");
+
+            Group group4 = groupFactory.newGroup()
+                    .append("id", 4)
+                    .append("name", "David")
+                    .append("department", "Sales")
+                    .append("salary", 75000);
+            writer.write(group4);
+            System.out.println("Wrote employee 4: David");
+        }
+
+        System.out.println("ParquetWriter closed successfully");
+
+        // Verify file exists and get size
+        long fileSize = fs.getFileStatus(new Path("seaweedfs://" + FILER_HOST + ":" + FILER_PORT + TEST_FILE)).getLen();
+        System.out.println("File written successfully. Size: " + fileSize + " bytes");
+    }
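+
+    /**
+     * Optional helper (a sketch mirroring "Option D" in ROOT_CAUSE_CONFIRMED.md, not part of
+     * the original reproducer): print the 8-byte Parquet trailer, which should be a 4-byte
+     * little-endian footer length followed by the magic bytes "PAR1". Call it after
+     * writeParquetFile() to compare the footer length against the offsets in the metadata.
+     */
+    private void printParquetTrailer(Path path) throws IOException {
+        long fileLength = fs.getFileStatus(path).getLen();
+        byte[] trailer = new byte[8];
+        try (org.apache.hadoop.fs.FSDataInputStream in = fs.open(path)) {
+            in.seek(fileLength - 8);
+            in.readFully(trailer);
+        }
+        // Footer length is stored little-endian just before the trailing magic.
+        int footerLength = (trailer[0] & 0xFF)
+                | ((trailer[1] & 0xFF) << 8)
+                | ((trailer[2] & 0xFF) << 16)
+                | ((trailer[3] & 0xFF) << 24);
+        String magic = new String(trailer, 4, 4, java.nio.charset.StandardCharsets.US_ASCII);
+        System.out.println("Footer length: " + footerLength + ", magic: " + magic
+                + ", file length: " + fileLength);
+    }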
Size: " + fileSize + " bytes"); + } + + private void readParquetMetadata() throws IOException { + Path path = new Path("seaweedfs://" + FILER_HOST + ":" + FILER_PORT + TEST_FILE); + + System.out.println("Opening file for metadata read..."); + HadoopInputFile inputFile = HadoopInputFile.fromPath(path, conf); + + System.out.println("Creating ParquetFileReader..."); + try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) { + ParquetMetadata footer = reader.getFooter(); + + System.out.println("\n=== Parquet Footer Metadata ==="); + System.out.println("Blocks (row groups): " + footer.getBlocks().size()); + System.out.println("Schema: " + footer.getFileMetaData().getSchema()); + + footer.getBlocks().forEach(block -> { + System.out.println("\nRow Group:"); + System.out.println(" Row count: " + block.getRowCount()); + System.out.println(" Total byte size: " + block.getTotalByteSize()); + System.out.println(" Columns: " + block.getColumns().size()); + + block.getColumns().forEach(column -> { + System.out.println(" Column: " + column.getPath()); + System.out.println(" Data page offset: " + column.getFirstDataPageOffset()); + System.out.println(" Dictionary page offset: " + column.getDictionaryPageOffset()); + System.out.println(" Total size: " + column.getTotalSize()); + System.out.println(" Total uncompressed size: " + column.getTotalUncompressedSize()); + }); + }); + + System.out.println("\nMetadata read completed successfully"); + } catch (Exception e) { + System.err.println("\n!!! EXCEPTION in readParquetMetadata !!!"); + System.err.println("Exception type: " + e.getClass().getName()); + System.err.println("Message: " + e.getMessage()); + e.printStackTrace(); + throw e; + } + } + + private void readParquetData() throws IOException { + Path path = new Path("seaweedfs://" + FILER_HOST + ":" + FILER_PORT + TEST_FILE); + + System.out.println("Opening file for data read..."); + HadoopInputFile inputFile = HadoopInputFile.fromPath(path, conf); + + System.out.println("Creating ParquetFileReader..."); + try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) { + System.out.println("Attempting to read row group..."); + org.apache.parquet.hadoop.metadata.BlockMetaData block = reader.readNextRowGroup(); + + if (block != null) { + System.out.println("SUCCESS: Read row group with " + block.getRowCount() + " rows"); + } else { + System.out.println("WARNING: No row groups found"); + } + + } catch (Exception e) { + System.err.println("\n!!! EXCEPTION in readParquetData !!!"); + System.err.println("Exception type: " + e.getClass().getName()); + System.err.println("Message: " + e.getMessage()); + e.printStackTrace(); + throw e; + } + + System.out.println("ParquetFileReader closed successfully"); + } +} +