
debug: confirmed root cause - Parquet tries to read 78 bytes past EOF

**KEY FINDING:**

Parquet is trying to read 78 bytes starting at position 1275, but the file ends at 1275!

This means:
1. The Parquet footer metadata contains INCORRECT offsets or sizes
2. It thinks there's a column chunk or row group at bytes [1275, 1353)
3. But the actual file is only 1275 bytes

During write, getPos() returned correct values (0, 190, 231, 262, etc., up to 1267).
Final file size: 1275 bytes (1267 data + 8-byte footer).

During read:
- Successfully reads [383, 1267) → 884 bytes 
- Successfully reads [1267, 1275) → 8 bytes 
- Successfully reads [4, 1275) → 1271 bytes 
- FAILS trying to read [1275, 1353) → 78 bytes 

The 78-byte overread is constant across all test runs, indicating a systematic
offset calculation error rather than random corruption.

Files modified:
- SeaweedInputStream.java - Added EOF logging to early return path
- ROOT_CAUSE_CONFIRMED.md - Analysis document
- ParquetReproducerTest.java - Attempted standalone reproducer (incomplete)
- pom.xml - Downgraded Parquet to 1.13.1 (didn't fix issue)

Next: The issue is likely in how getPos() is called during column chunk writes.
The footer records incorrect offsets, making it expect data beyond EOF.
Branch: pull/7526/head
Author: chrislu, 1 week ago
Commit: 8f33f5240d

3 changed files:
  test/java/spark/ROOT_CAUSE_CONFIRMED.md (111 lines)
  test/java/spark/pom.xml (2 lines)
  test/java/spark/src/test/java/seaweed/spark/ParquetReproducerTest.java (238 lines)

test/java/spark/ROOT_CAUSE_CONFIRMED.md (new file, 111 lines)

@@ -0,0 +1,111 @@
# Root Cause Confirmed: Parquet Footer Metadata Issue
## The Bug (CONFIRMED)
Parquet is trying to **read 78 bytes from position 1275**, but the file ends at position 1275!
```
[DEBUG-2024] SeaweedInputStream.read() returning EOF:
path=.../employees/part-00000-....snappy.parquet
position=1275
contentLength=1275
bufRemaining=78
```
## What This Means
The Parquet footer metadata says there's a column chunk or row group at byte offset **1275** that is **78 bytes long**. But the file is only 1275 bytes total!
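A quick way to see which chunk the reader believes runs past EOF is to compare each column chunk's declared range (starting position + total size) against the actual file length. The following is only a diagnostic sketch using the parquet-mr footer API, not part of the test suite; for a `seaweedfs://` path the same `fs.seaweed.*` settings as in the test are needed, and a downloaded local copy works too.
```java
// Diagnostic sketch: print each column chunk's declared byte range from the footer
// and flag any range that extends past the real file length.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class FooterRangeCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]); // e.g. the part-00000-....snappy.parquet file from the log
        long fileLen = path.getFileSystem(conf).getFileStatus(path).getLen();
        try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
            reader.getFooter().getBlocks().forEach(block ->
                    block.getColumns().forEach(col -> {
                        long start = col.getStartingPos();
                        long end = start + col.getTotalSize();
                        System.out.printf("%s: [%d, %d) fileLen=%d%s%n",
                                col.getPath(), start, end, fileLen,
                                end > fileLen ? "  <-- past EOF" : "");
                    }));
        }
    }
}
```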
## Evidence
### During Write
- `getPos()` returned: 0, 4, 59, 92, 139, 172, 190, 231, 262, 285, 310, 333, 346, 357, 372, 383, 1267
- Last data position: **1267**
- Final file size: **1275** (1267 + 8-byte footer)
### During Read
- ✅ Read [383, 1267) → 884 bytes
- ✅ Read [1267, 1275) → 8 bytes
- ✅ Read [4, 1275) → 1271 bytes
- ❌ **Read [1275, 1353) → TRIED to read 78 bytes → EOF!**
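To double-check that the failure is in the positional read itself rather than in Parquet's decoding, the failing request can be replayed directly against the Hadoop stream. A minimal sketch (hypothetical helper, not part of the test suite), using the offsets from the log above:
```java
// Replays the failing positional read [1275, 1353) straight on the FileSystem stream.
// The offsets come from the debug log above; adjust them for other files.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;

public class ReplayFailingRead {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // needs the same fs.seaweed.* settings as the test
        Path path = new Path(args[0]);
        try (FSDataInputStream in = path.getFileSystem(conf).open(path)) {
            byte[] buf = new byte[78];
            // readFully(position, ...) throws EOFException when the range runs past EOF,
            // which is what the Parquet reader hits here.
            in.readFully(1275L, buf, 0, buf.length);
            System.out.println("Read succeeded (unexpected for a 1275-byte file)");
        }
    }
}
```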
## Why The Downloaded File Works
When you download the file and use `parquet-tools`, it reads correctly because:
- The file IS valid and complete
- parquet-tools can interpret the footer correctly
- **But Spark/Parquet at runtime interprets the footer DIFFERENTLY**
## Possible Causes
### 1. Parquet Version Mismatch ⚠️
- pom.xml declares Parquet 1.16.0
- But Spark 3.5.0 might bundle a different Parquet version
- Runtime version conflict → footer interpretation mismatch
### 2. Buffer Position vs. Flushed Position
- `getPos()` returns `position + buffer.position()`
- If Parquet calls `getPos()` before buffer is flushed, offsets could be wrong
- But our logs show getPos() values that seem correct...
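For context, the behaviour described above corresponds roughly to the following shape (a simplified sketch; field names are illustrative, not the actual SeaweedOutputStream source):
```java
import java.nio.ByteBuffer;

// Simplified sketch of the getPos() contract discussed above (not the real implementation).
class OutputStreamPositionSketch {
    private long position;                                          // bytes already flushed to the backend
    private final ByteBuffer buffer = ByteBuffer.allocate(1 << 20); // pending, unflushed bytes

    public synchronized long getPos() {
        // Parquet records this value as a column-chunk offset in the footer.
        // If buffer.position() were stale or double-counted, the footer offsets
        // would drift from the real on-disk layout.
        return position + buffer.position();
    }
}
```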
### 3. Parquet 1.16.0 Footer Format Change
- Parquet 1.16.0 might have changed footer layout
- Writing with 1.16.0 format but reading with different logic
- The "78 bytes" might be a footer size constant that changed
## The 78-Byte Constant
**Interesting pattern**: the missing byte count is ALWAYS 78. This suggests:
- It's not random data corruption
- It's a systematic offset calculation error
- 78 bytes might be related to:
  - Footer metadata size
  - Column statistics size
  - Row group index size
  - Magic bytes + length fields
## Next Steps
### Option A: Downgrade Parquet
Try Parquet 1.13.1 (what Spark 3.5.0 normally uses):
```xml
<parquet.version>1.13.1</parquet.version>
```
### Option B: Check Runtime Parquet Version
Add logging to see what Parquet version is actually loaded:
```java
LOG.info("Parquet version: {}", ParquetFileReader.class.getPackage().getImplementationVersion());
```
### Option C: Force Buffer Flush Before getPos()
Override `getPos()` to force flush:
```java
public synchronized long getPos() throws IOException {
    flush(); // Ensure all data is written
    return position + buffer.position();
}
```
### Option D: Analyze Footer Hex Dump
Download the file and examine the last 100 bytes to see footer structure:
```bash
hexdump -C test.parquet | tail -20
```
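The tail of every Parquet file is `<footer metadata><4-byte little-endian footer length>PAR1`, so the same check can be done programmatically on the downloaded copy (a small Java sketch, equivalent to eyeballing the hexdump):
```java
// Reads the last 8 bytes of a local Parquet file: 4-byte little-endian footer
// length followed by the "PAR1" magic, then prints where the footer must start.
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class FooterTail {
    public static void main(String[] args) throws Exception {
        try (RandomAccessFile f = new RandomAccessFile(args[0], "r")) {
            long len = f.length();
            byte[] tail = new byte[8];
            f.seek(len - 8);
            f.readFully(tail);
            int footerLen = ByteBuffer.wrap(tail, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
            String magic = new String(tail, 4, 4, StandardCharsets.US_ASCII);
            System.out.println("file length   = " + len);
            System.out.println("footer length = " + footerLen);
            System.out.println("magic         = " + magic + " (expect PAR1)");
            System.out.println("footer starts = " + (len - 8 - footerLen));
        }
    }
}
```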
## Test Plan
1. Try downgrading to Parquet 1.13.1
2. If that works, it confirms version incompatibility
3. If not, analyze footer structure with hex dump
4. Check if Spark's bundled Parquet overrides our dependency
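For step 4, a quick check is to ask the JVM which jar the Parquet classes were actually loaded from; if it points at Spark's bundled jars rather than the version declared in pom.xml, the downgrade has no effect. A hedged sketch:
```java
// Prints which jar ParquetFileReader came from at runtime. If the location is a
// Spark assembly/jars directory rather than our declared dependency, Spark's
// bundled Parquet is overriding the pom.xml version.
import org.apache.parquet.hadoop.ParquetFileReader;

public class WhichParquet {
    public static void main(String[] args) {
        Class<?> c = ParquetFileReader.class;
        System.out.println("class       : " + c.getName());
        System.out.println("version     : " + c.getPackage().getImplementationVersion());
        System.out.println("loaded from : " + c.getProtectionDomain().getCodeSource().getLocation());
    }
}
```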
## Files Modified
- `SeaweedInputStream.java` - Added EOF logging
- Root cause: Parquet footer has offset 1275 for 78-byte chunk that doesn't exist

test/java/spark/pom.xml (2 lines changed)

@@ -23,7 +23,7 @@
         <seaweedfs.hadoop3.client.version>3.80.1-SNAPSHOT</seaweedfs.hadoop3.client.version>
         <jackson.version>2.15.3</jackson.version>
         <netty.version>4.1.125.Final</netty.version>
-        <parquet.version>1.16.0</parquet.version>
+        <parquet.version>1.13.1</parquet.version> <!-- Downgraded to match Spark 3.5.0 default -->
         <parquet.format.version>2.12.0</parquet.format.version>
         <surefire.jvm.args>
             -Xmx2g

test/java/spark/src/test/java/seaweed/spark/ParquetReproducerTest.java (new file, 238 lines)

@@ -0,0 +1,238 @@
package seaweed.spark;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;

/**
 * Minimal reproducer for the Parquet EOF issue.
 *
 * This test writes a simple Parquet file to SeaweedFS and then tries to read it back.
 * It should reproduce the "EOFException: Still have: 78 bytes left" error.
 */
public class ParquetReproducerTest {

    private static final boolean TESTS_ENABLED =
            "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED"));
    private static final String FILER_HOST = System.getenv().getOrDefault("SEAWEEDFS_FILER_HOST", "localhost");
    private static final String FILER_PORT = System.getenv().getOrDefault("SEAWEEDFS_FILER_PORT", "8888");
    private static final String FILER_GRPC_PORT = System.getenv().getOrDefault("SEAWEEDFS_FILER_GRPC_PORT", "18888");
    private static final String TEST_FILE = "/test-parquet-reproducer/employees.parquet";

    private Configuration conf;
    private org.apache.hadoop.fs.FileSystem fs;

    @Before
    public void setUp() throws Exception {
        if (!TESTS_ENABLED) {
            return;
        }
        System.out.println("\n=== Parquet EOF Reproducer Test ===");

        // Create configuration
        conf = new Configuration();
        conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
        conf.set("fs.seaweed.filer.host", FILER_HOST);
        conf.set("fs.seaweed.filer.port", FILER_PORT);
        conf.set("fs.seaweed.filer.port.grpc", FILER_GRPC_PORT);
        conf.set("fs.seaweed.buffer.size", "1048576");

        // Get filesystem
        Path testPath = new Path("seaweedfs://" + FILER_HOST + ":" + FILER_PORT + TEST_FILE);
        fs = testPath.getFileSystem(conf);

        // Clean up any existing test file
        if (fs.exists(testPath)) {
            fs.delete(testPath, true);
        }
    }

    @After
    public void tearDown() throws Exception {
        if (!TESTS_ENABLED) {
            return;
        }
        // Clean up the test file (guard against a failed setUp where fs is still null)
        if (fs != null) {
            Path testPath = new Path("seaweedfs://" + FILER_HOST + ":" + FILER_PORT + TEST_FILE);
            if (fs.exists(testPath)) {
                fs.delete(testPath, true);
            }
            fs.close();
        }
    }

    @Test
    public void testParquetWriteAndRead() throws IOException {
        if (!TESTS_ENABLED) {
            return;
        }
        System.out.println("\n1. Writing Parquet file to SeaweedFS...");
        writeParquetFile();

        System.out.println("\n2. Reading Parquet file metadata...");
        readParquetMetadata();

        System.out.println("\n3. Attempting to read row groups...");
        readParquetData();

        System.out.println("\n4. Test completed successfully!");
    }

    private void writeParquetFile() throws IOException {
        // Define schema: same as Spark SQL test (id, name, department, salary)
        MessageType schema = Types.buildMessage()
                .required(INT32).named("id")
                .optional(BINARY).as(org.apache.parquet.schema.LogicalTypeAnnotation.stringType()).named("name")
                .optional(BINARY).as(org.apache.parquet.schema.LogicalTypeAnnotation.stringType()).named("department")
                .required(INT32).named("salary")
                .named("employee");

        Path path = new Path("seaweedfs://" + FILER_HOST + ":" + FILER_PORT + TEST_FILE);
        Configuration writerConf = new Configuration(conf);
        GroupWriteSupport.setSchema(schema, writerConf);

        System.out.println("Creating ParquetWriter...");
        System.out.println("Path: " + path);

        // GroupWriteSupport has no builder of its own; use ExampleParquetWriter,
        // which wires up GroupWriteSupport for the example Group model.
        try (ParquetWriter<Group> writer = org.apache.parquet.hadoop.example.ExampleParquetWriter.builder(path)
                .withConf(writerConf)
                .withType(schema)
                .withWriteMode(ParquetFileWriter.Mode.CREATE)
                .build()) {
            SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);

            // Write 4 employees (same data as Spark test)
            Group group1 = groupFactory.newGroup()
                    .append("id", 1)
                    .append("name", "Alice")
                    .append("department", "Engineering")
                    .append("salary", 100000);
            writer.write(group1);
            System.out.println("Wrote employee 1: Alice");

            Group group2 = groupFactory.newGroup()
                    .append("id", 2)
                    .append("name", "Bob")
                    .append("department", "Sales")
                    .append("salary", 80000);
            writer.write(group2);
            System.out.println("Wrote employee 2: Bob");

            Group group3 = groupFactory.newGroup()
                    .append("id", 3)
                    .append("name", "Charlie")
                    .append("department", "Engineering")
                    .append("salary", 120000);
            writer.write(group3);
            System.out.println("Wrote employee 3: Charlie");

            Group group4 = groupFactory.newGroup()
                    .append("id", 4)
                    .append("name", "David")
                    .append("department", "Sales")
                    .append("salary", 75000);
            writer.write(group4);
            System.out.println("Wrote employee 4: David");
        }
        System.out.println("ParquetWriter closed successfully");

        // Verify file exists and get size
        long fileSize = fs.getFileStatus(new Path("seaweedfs://" + FILER_HOST + ":" + FILER_PORT + TEST_FILE)).getLen();
        System.out.println("File written successfully. Size: " + fileSize + " bytes");
    }

    private void readParquetMetadata() throws IOException {
        Path path = new Path("seaweedfs://" + FILER_HOST + ":" + FILER_PORT + TEST_FILE);
        System.out.println("Opening file for metadata read...");
        HadoopInputFile inputFile = HadoopInputFile.fromPath(path, conf);

        System.out.println("Creating ParquetFileReader...");
        try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
            ParquetMetadata footer = reader.getFooter();
            System.out.println("\n=== Parquet Footer Metadata ===");
            System.out.println("Blocks (row groups): " + footer.getBlocks().size());
            System.out.println("Schema: " + footer.getFileMetaData().getSchema());

            footer.getBlocks().forEach(block -> {
                System.out.println("\nRow Group:");
                System.out.println(" Row count: " + block.getRowCount());
                System.out.println(" Total byte size: " + block.getTotalByteSize());
                System.out.println(" Columns: " + block.getColumns().size());
                block.getColumns().forEach(column -> {
                    System.out.println(" Column: " + column.getPath());
                    System.out.println(" Data page offset: " + column.getFirstDataPageOffset());
                    System.out.println(" Dictionary page offset: " + column.getDictionaryPageOffset());
                    System.out.println(" Total size: " + column.getTotalSize());
                    System.out.println(" Total uncompressed size: " + column.getTotalUncompressedSize());
                });
            });
            System.out.println("\nMetadata read completed successfully");
        } catch (Exception e) {
            System.err.println("\n!!! EXCEPTION in readParquetMetadata !!!");
            System.err.println("Exception type: " + e.getClass().getName());
            System.err.println("Message: " + e.getMessage());
            e.printStackTrace();
            throw e;
        }
    }

    private void readParquetData() throws IOException {
        Path path = new Path("seaweedfs://" + FILER_HOST + ":" + FILER_PORT + TEST_FILE);
        System.out.println("Opening file for data read...");
        HadoopInputFile inputFile = HadoopInputFile.fromPath(path, conf);

        System.out.println("Creating ParquetFileReader...");
        try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
            System.out.println("Attempting to read row group...");
            // readNextRowGroup() returns the pages of the next row group (a PageReadStore,
            // not BlockMetaData); this is the call that hits EOF when footer offsets are wrong.
            org.apache.parquet.column.page.PageReadStore pages = reader.readNextRowGroup();
            if (pages != null) {
                System.out.println("SUCCESS: Read row group with " + pages.getRowCount() + " rows");
            } else {
                System.out.println("WARNING: No row groups found");
            }
        } catch (Exception e) {
            System.err.println("\n!!! EXCEPTION in readParquetData !!!");
            System.err.println("Exception type: " + e.getClass().getName());
            System.err.println("Message: " + e.getMessage());
            e.printStackTrace();
            throw e;
        }
        System.out.println("ParquetFileReader closed successfully");
    }
}