
fmt

pull/7526/head
chrislu, 6 days ago
commit 221252d34e
1. other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java (24 lines changed)
2. test/java/spark/EOF_EXCEPTION_ANALYSIS.md (177 lines changed)
3. test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java (15 lines changed)
4. test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java (130 lines changed)
5. test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java (17 lines changed)

other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java (24 lines changed)

```diff
@@ -51,7 +51,7 @@ public class SeaweedRead {
             }
         }

-        //TODO parallel this
+        // TODO parallel this
         long readCount = 0;
         long startOffset = position;
         for (ChunkView chunkView : chunkViews) {
@@ -59,7 +59,7 @@ public class SeaweedRead {
             if (startOffset < chunkView.logicOffset) {
                 long gap = chunkView.logicOffset - startOffset;
                 LOG.debug("zero [{},{})", startOffset, startOffset + gap);
-                buf.position(buf.position()+ (int)gap);
+                buf.position(buf.position() + (int) gap);
                 readCount += gap;
                 startOffset += gap;
             }
@@ -86,7 +86,7 @@ public class SeaweedRead {
         if (startOffset < limit) {
             long gap = limit - startOffset;
             LOG.debug("zero2 [{},{})", startOffset, startOffset + gap);
-            buf.position(buf.position()+ (int)gap);
+            buf.position(buf.position() + (int) gap);
             readCount += gap;
             startOffset += gap;
         }
@@ -94,7 +94,8 @@ public class SeaweedRead {
         return readCount;
     }

-    private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+    private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView,
+            FilerProto.Locations locations) throws IOException {

         byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
@@ -105,13 +106,15 @@ public class SeaweedRead {
         int len = (int) chunkView.size - (int) (startOffset - chunkView.logicOffset);
         LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) startOffset:{}",
-                chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, startOffset);
+                chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset,
+                chunkView.logicOffset + chunkView.size, startOffset);
         buf.put(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), len);

         return len;
     }

-    public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+    public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView,
+            FilerProto.Locations locations) throws IOException {

         byte[] data = null;
         IOException lastException = null;
@@ -214,8 +217,7 @@ public class SeaweedRead {
                         chunkStart,
                         isFullChunk,
                         chunk.cipherKey,
-                        chunk.isCompressed
-                ));
+                        chunk.isCompressed));
             }
         }
         return views;
@@ -268,7 +270,8 @@ public class SeaweedRead {
         public final byte[] cipherKey;
         public final boolean isCompressed;

-        public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
+        public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset,
+                boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
            this.start = start;
            this.stop = stop;
            this.modifiedTime = modifiedTime;
@@ -302,7 +305,8 @@ public class SeaweedRead {
        public final byte[] cipherKey;
        public final boolean isCompressed;

-        public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
+        public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey,
+                boolean isCompressed) {
            this.fileId = fileId;
            this.offset = offset;
            this.size = size;
```

test/java/spark/EOF_EXCEPTION_ANALYSIS.md (new file, 177 lines added)

# EOFException Analysis: "Still have: 78 bytes left"
## Problem Summary
Spark Parquet writes succeed, but subsequent reads fail with:
```
java.io.EOFException: Reached the end of stream. Still have: 78 bytes left
```
## What the Logs Tell Us
### Write Phase ✅ (Everything looks correct)
**year=2020 file:**
```
🔧 Created stream: position=0 bufferSize=1048576
🔒 close START: position=0 buffer.position()=696 totalBytesWritten=696
→ Submitted 696 bytes, new position=696
✅ close END: finalPosition=696 totalBytesWritten=696
Calculated file size: 696 (chunks: 696, attr: 696, #chunks: 1)
```
**year=2021 file:**
```
🔧 Created stream: position=0 bufferSize=1048576
🔒 close START: position=0 buffer.position()=684 totalBytesWritten=684
→ Submitted 684 bytes, new position=684
✅ close END: finalPosition=684 totalBytesWritten=684
Calculated file size: 684 (chunks: 684, attr: 684, #chunks: 1)
```
**Key observations:**
- ✅ `totalBytesWritten == position == buffer == chunks == attr`
- ✅ All bytes received through `write()` are flushed and stored
- ✅ File metadata is consistent
- ✅ No bytes lost in SeaweedFS layer
### Read Phase ❌ (Parquet expects more bytes)
**Consistent pattern:**
- year=2020: wrote 696 bytes, **expects 774 bytes** → missing 78
- year=2021: wrote 684 bytes, **expects 762 bytes** → missing 78
The **78-byte discrepancy is constant across both files**, suggesting it's not random data loss.
## Hypotheses
### H1: Parquet Footer Not Fully Written
Parquet file structure:
```
[Magic "PAR1" 4B] [Data pages] [Footer] [Footer length 4B] [Magic "PAR1" 4B]
```
**Possible scenario:**
1. Parquet writes 684 bytes of data pages
2. Parquet **intends** to write 78 bytes of footer metadata
3. Our `SeaweedOutputStream.close()` is called
4. Only data pages (684 bytes) make it to the file
5. Footer (78 bytes) is lost or never written
**Evidence for:**
- 78 bytes is a reasonable size for a Parquet footer with minimal metadata
- Files say "snappy.parquet" → compressed, so footer would be small
- Consistent 78-byte loss across files
**Evidence against:**
- Our `close()` logs show all bytes received via `write()` were processed
- If Parquet wrote footer to stream, we'd see `totalBytesWritten=762`
### H2: FSDataOutputStream Position Tracking Mismatch
Hadoop wraps our stream:
```java
new FSDataOutputStream(seaweedOutputStream, statistics)
```
**Possible scenario:**
1. Parquet writes 684 bytes → `FSDataOutputStream` increments position to 684
2. Parquet writes 78-byte footer → `FSDataOutputStream` increments position to 762
3. **BUT** only 684 bytes reach our `SeaweedOutputStream.write()`
4. Parquet queries `FSDataOutputStream.getPos()` → returns 762
5. Parquet writes "file size: 762" in its footer
6. Actual file only has 684 bytes
**Evidence for:**
- Would explain why our logs show 684 but Parquet expects 762
- FSDataOutputStream might have its own buffering
**Evidence against:**
- FSDataOutputStream is well-tested Hadoop core component
- Unlikely to lose bytes (a quick way to check this is sketched below)
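One way to test H2 directly (a debugging sketch, not existing code; `ByteCountingOutputStream` and the wiring shown in the comments are hypothetical) is to interpose a byte-counting wrapper between `FSDataOutputStream` and `SeaweedOutputStream`, then compare its count with `FSDataOutputStream.getPos()` once the write finishes:
```java
// Hypothetical debugging aid (not existing SeaweedFS code): counts every byte
// that actually reaches the wrapped stream, so it can be compared with what
// FSDataOutputStream reports after Parquet finishes writing.
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicLong;

public class ByteCountingOutputStream extends FilterOutputStream {
    private final AtomicLong delivered = new AtomicLong();

    public ByteCountingOutputStream(OutputStream out) {
        super(out);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        delivered.incrementAndGet();
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len); // bypass FilterOutputStream's byte-at-a-time default
        delivered.addAndGet(len);
    }

    public long getDelivered() {
        return delivered.get();
    }
}

// Usage sketch inside the FileSystem create() path (names are placeholders):
//   ByteCountingOutputStream counting = new ByteCountingOutputStream(seaweedOutputStream);
//   FSDataOutputStream fsOut = new FSDataOutputStream(counting, statistics);
//   ...after close: if fsOut.getPos() != counting.getDelivered(), bytes were lost
//   between the Hadoop wrapper and SeaweedOutputStream (supports H2); if they match
//   but are still 78 short of what Parquet expects, the gap is upstream of write().
```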
### H3: Race Condition During File Rename
Files are written to `_temporary/` then renamed to final location.
**Possible scenario:**
1. Write completes successfully (684 bytes)
2. `close()` flushes and updates metadata
3. File is renamed while metadata is propagating
4. Read happens before metadata sync completes
5. Reader gets stale file size or incomplete footer
**Evidence for:**
- Distributed systems often have eventual consistency issues
- Rename might not sync metadata immediately
**Evidence against:**
- We added `fs.seaweed.write.flush.sync=true` to force sync
- Error is consistent, not intermittent
### H4: Compression-Related Size Confusion
Files use Snappy compression (`*.snappy.parquet`).
**Possible scenario:**
1. Parquet tracks uncompressed size internally
2. Writes compressed data to stream
3. Size mismatch between compressed file and uncompressed metadata
**Evidence against:**
- Parquet handles compression internally and consistently
- Would affect all Parquet users, not just SeaweedFS
## Next Debugging Steps
### Added: getPos() Logging
```java
public synchronized long getPos() {
long currentPos = position + buffer.position();
LOG.info("[DEBUG-2024] 📍 getPos() called: flushedPosition={} bufferPosition={} returning={}",
position, buffer.position(), currentPos);
return currentPos;
}
```
**Will reveal:**
- If/when Parquet queries position
- What value is returned vs what was actually written
- If FSDataOutputStream bypasses our position tracking
### Next Steps if getPos() is NOT called:
→ Parquet is not using position tracking
→ Focus on footer write completion
### Next Steps if getPos() returns 762 but we only wrote 684:
→ FSDataOutputStream has buffering issue or byte loss
→ Need to investigate Hadoop wrapper behavior
### Next Steps if getPos() returns 684 (correct):
→ Issue is in footer metadata or read path
→ Need to examine Parquet footer contents
## Parquet File Format Context
Typical small Parquet file (~700 bytes):
```
Offset Content
0-3 Magic "PAR1"
4-650 Row group data (compressed)
651-728 Footer metadata (schema, row group pointers)
729-732 Footer length (4 bytes, value: 78)
733-736 Magic "PAR1"
Total: 737 bytes
```
If the footer length field says "78" but only the data pages exist:
- File ends at byte 650
- Footer starts at byte 651 (but doesn't exist)
- Reader tries to read 78 bytes, gets EOFException
This matches our error pattern perfectly.
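To make the failure mode concrete, here is a rough sketch of how a reader locates the footer from the file tail (illustrative only; `inspectParquetTail` is not a Parquet or SeaweedFS API). If fewer bytes were actually stored than the lengths recorded during the write imply, the final `readFully` comes up short, which is how this kind of EOFException surfaces:
```java
// Illustrative only: mimics how a Parquet reader finds its footer.
// Tail layout of a valid file: [footer bytes][4-byte little-endian footer length]["PAR1"]
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class ParquetTailCheck {
    public static void inspectParquetTail(String path) throws IOException {
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            long fileLen = file.length();

            // Read the last 8 bytes: footer length + trailing magic.
            byte[] tail = new byte[8];
            file.seek(fileLen - 8);
            file.readFully(tail);

            String magic = new String(tail, 4, 4, StandardCharsets.US_ASCII);
            int footerLen = ByteBuffer.wrap(tail, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
            System.out.printf("fileLen=%d magic=%s footerLen=%d%n", fileLen, magic, footerLen);

            // The footer should start here. If the writer recorded offsets/sizes assuming
            // a longer file (e.g. 774 bytes) while only 696 were stored, reads based on
            // those recorded lengths run out of bytes, and the reader reports something
            // like "Reached the end of stream. Still have: N bytes left".
            long footerStart = fileLen - 8 - footerLen;
            byte[] footer = new byte[footerLen];
            file.seek(footerStart);
            file.readFully(footer); // EOFException here if the file is truncated
        }
    }
}
```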
## Recommended Fix Directions
1. **Ensure footer is fully written before close returns**
2. **Add explicit fsync/hsync before metadata write** (a sketch follows this list)
3. **Verify FSDataOutputStream doesn't buffer separately**
4. **Check if Parquet needs special OutputStreamAdapter**
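A minimal sketch of directions 1 and 2, assuming the underlying Hadoop stream may implement `Syncable`; the helper class and the `MetadataWriter` callback are placeholders, not the current `SeaweedOutputStream` code:
```java
// Sketch only -- not the existing SeaweedOutputStream implementation. The idea:
// close() must drain the write buffer and reach a durability barrier BEFORE the
// filer metadata (file size, chunk list) is finalized, so the recorded size can
// never be smaller than what the writer (here, Parquet) believes it wrote.
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.fs.Syncable;

final class FlushBeforeMetadata {

    /** Flush, optionally hsync, then run the metadata finalization step. */
    static void closeInOrder(OutputStream out, MetadataWriter metadataWriter) throws IOException {
        out.flush();                      // 1. push any buffered footer bytes down
        if (out instanceof Syncable) {
            ((Syncable) out).hsync();     // 2. durability barrier (fix direction 2)
        }
        metadataWriter.write();           // 3. only now record size/chunks on the filer
        out.close();
    }

    /** Placeholder for whatever finalizes the entry on the filer. */
    interface MetadataWriter {
        void write() throws IOException;
    }
}
```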

test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java (15 lines changed)

```diff
@@ -46,8 +46,7 @@ public class SparkSeaweedFSExample {
                 .selectExpr(
                         "id",
                         "id * 2 as doubled",
-                        "CAST(rand() * 100 AS INT) as random_value"
-                );
+                        "CAST(rand() * 100 AS INT) as random_value");

         System.out.println("   Generated " + data.count() + " rows");
         data.show(5);
@@ -73,8 +72,7 @@ public class SparkSeaweedFSExample {
         Dataset<Row> stats = readData.selectExpr(
                 "COUNT(*) as count",
                 "AVG(random_value) as avg_random",
-                "MAX(doubled) as max_doubled"
-        );
+                "MAX(doubled) as max_doubled");

         stats.show();
@@ -89,8 +87,7 @@ public class SparkSeaweedFSExample {
         System.out.println("\n6. Creating partitioned dataset...");
         Dataset<Row> partitionedData = data.selectExpr(
                 "*",
-                "CAST(id % 10 AS INT) as partition_key"
-        );
+                "CAST(id % 10 AS INT) as partition_key");

         String partitionedPath = outputPath + "/partitioned.parquet";
         System.out.println("   Path: " + partitionedPath);
@@ -122,8 +119,7 @@ public class SparkSeaweedFSExample {
                         "  AVG(random_value) as avg_random " +
                         "FROM seaweedfs_data " +
                         "GROUP BY CAST(id / 100 AS INT) " +
-                        "ORDER BY bucket"
-        );
+                        "ORDER BY bucket");

         System.out.println("   Bucketed statistics:");
         sqlResult.show();
@@ -140,6 +136,3 @@ public class SparkSeaweedFSExample {
         }
     }
 }
-
-
-
```

test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java (130 lines changed)

```diff
@@ -24,8 +24,7 @@ public class SparkSQLTest extends SparkTestBase {
                 new Employee(1, "Alice", "Engineering", 100000),
                 new Employee(2, "Bob", "Sales", 80000),
                 new Employee(3, "Charlie", "Engineering", 120000),
-                new Employee(4, "David", "Sales", 75000)
-        );
+                new Employee(4, "David", "Sales", 75000));

         Dataset<Row> df = spark.createDataFrame(employees, Employee.class);
@@ -39,14 +38,12 @@ public class SparkSQLTest extends SparkTestBase {
         // Run SQL queries
         Dataset<Row> engineeringEmployees = spark.sql(
-                "SELECT name, salary FROM employees WHERE department = 'Engineering'"
-        );
+                "SELECT name, salary FROM employees WHERE department = 'Engineering'");
         assertEquals(2, engineeringEmployees.count());

         Dataset<Row> highPaidEmployees = spark.sql(
-                "SELECT name, salary FROM employees WHERE salary > 90000"
-        );
+                "SELECT name, salary FROM employees WHERE salary > 90000");
         assertEquals(2, highPaidEmployees.count());
     }
@@ -61,8 +58,7 @@ public class SparkSQLTest extends SparkTestBase {
                 new Sale("2024-01", "Product B", 150),
                 new Sale("2024-02", "Product A", 120),
                 new Sale("2024-02", "Product B", 180),
-                new Sale("2024-03", "Product A", 110)
-        );
+                new Sale("2024-03", "Product A", 110));

         Dataset<Row> df = spark.createDataFrame(sales, Sale.class);
@@ -76,8 +72,7 @@ public class SparkSQLTest extends SparkTestBase {
         // Aggregate query
         Dataset<Row> monthlySales = spark.sql(
-                "SELECT month, SUM(amount) as total FROM sales GROUP BY month ORDER BY month"
-        );
+                "SELECT month, SUM(amount) as total FROM sales GROUP BY month ORDER BY month");

         List<Row> results = monthlySales.collectAsList();
         assertEquals(3, results.size());
@@ -92,14 +87,12 @@ public class SparkSQLTest extends SparkTestBase {
         // Create employee data
         List<Employee> employees = Arrays.asList(
                 new Employee(1, "Alice", "Engineering", 100000),
-                new Employee(2, "Bob", "Sales", 80000)
-        );
+                new Employee(2, "Bob", "Sales", 80000));

         // Create department data
         List<Department> departments = Arrays.asList(
                 new Department("Engineering", "Building Products"),
-                new Department("Sales", "Selling Products")
-        );
+                new Department("Sales", "Selling Products"));

         Dataset<Row> empDf = spark.createDataFrame(employees, Employee.class);
         Dataset<Row> deptDf = spark.createDataFrame(departments, Department.class);
@@ -118,15 +111,13 @@ public class SparkSQLTest extends SparkTestBase {
         // Join query
         Dataset<Row> joined = spark.sql(
                 "SELECT e.name, e.salary, d.description " +
-                "FROM emp e JOIN dept d ON e.department = d.name"
-        );
+                        "FROM emp e JOIN dept d ON e.department = d.name");
         assertEquals(2, joined.count());

         List<Row> results = joined.collectAsList();
-        assertTrue(results.stream().anyMatch(r ->
-                "Alice".equals(r.getString(0)) && "Building Products".equals(r.getString(2))
-        ));
+        assertTrue(results.stream()
+                .anyMatch(r -> "Alice".equals(r.getString(0)) && "Building Products".equals(r.getString(2))));
     }

     @Test
@@ -138,8 +129,7 @@ public class SparkSQLTest extends SparkTestBase {
                 new Employee(1, "Alice", "Engineering", 100000),
                 new Employee(2, "Bob", "Engineering", 120000),
                 new Employee(3, "Charlie", "Sales", 80000),
-                new Employee(4, "David", "Sales", 90000)
-        );
+                new Employee(4, "David", "Sales", 90000));

         Dataset<Row> df = spark.createDataFrame(employees, Employee.class);
@@ -153,8 +143,7 @@ public class SparkSQLTest extends SparkTestBase {
         Dataset<Row> ranked = spark.sql(
                 "SELECT name, department, salary, " +
                         "RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank " +
-                        "FROM employees_ranked"
-        );
+                        "FROM employees_ranked");

         assertEquals(4, ranked.count());
@@ -176,7 +165,8 @@ public class SparkSQLTest extends SparkTestBase {
         private String department;
         private int salary;

-        public Employee() {}
+        public Employee() {
+        }

         public Employee(int id, String name, String department, int salary) {
             this.id = id;
@@ -185,14 +175,37 @@ public class SparkSQLTest extends SparkTestBase {
             this.salary = salary;
         }

-        public int getId() { return id; }
-        public void setId(int id) { this.id = id; }
-        public String getName() { return name; }
-        public void setName(String name) { this.name = name; }
-        public String getDepartment() { return department; }
-        public void setDepartment(String department) { this.department = department; }
-        public int getSalary() { return salary; }
-        public void setSalary(int salary) { this.salary = salary; }
+        public int getId() {
+            return id;
+        }
+
+        public void setId(int id) {
+            this.id = id;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        public String getDepartment() {
+            return department;
+        }
+
+        public void setDepartment(String department) {
+            this.department = department;
+        }
+
+        public int getSalary() {
+            return salary;
+        }
+
+        public void setSalary(int salary) {
+            this.salary = salary;
+        }
     }

     public static class Sale implements java.io.Serializable {
@@ -200,7 +213,8 @@ public class SparkSQLTest extends SparkTestBase {
         private String product;
         private int amount;

-        public Sale() {}
+        public Sale() {
+        }

         public Sale(String month, String product, int amount) {
             this.month = month;
@@ -208,31 +222,57 @@ public class SparkSQLTest extends SparkTestBase {
             this.amount = amount;
         }

-        public String getMonth() { return month; }
-        public void setMonth(String month) { this.month = month; }
-        public String getProduct() { return product; }
-        public void setProduct(String product) { this.product = product; }
-        public int getAmount() { return amount; }
-        public void setAmount(int amount) { this.amount = amount; }
+        public String getMonth() {
+            return month;
+        }
+
+        public void setMonth(String month) {
+            this.month = month;
+        }
+
+        public String getProduct() {
+            return product;
+        }
+
+        public void setProduct(String product) {
+            this.product = product;
+        }
+
+        public int getAmount() {
+            return amount;
+        }
+
+        public void setAmount(int amount) {
+            this.amount = amount;
+        }
     }

     public static class Department implements java.io.Serializable {
         private String name;
         private String description;

-        public Department() {}
+        public Department() {
+        }

         public Department(String name, String description) {
             this.name = name;
             this.description = description;
         }

-        public String getName() { return name; }
-        public void setName(String name) { this.name = name; }
-        public String getDescription() { return description; }
-        public void setDescription(String description) { this.description = description; }
+        public String getName() {
+            return name;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        public String getDescription() {
+            return description;
+        }
+
+        public void setDescription(String description) {
+            this.description = description;
+        }
     }
 }
```

test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java (17 lines changed)

```diff
@@ -18,16 +18,13 @@ public abstract class SparkTestBase {
     protected SparkSession spark;
     protected static final String TEST_ROOT = "/test-spark";

-    protected static final boolean TESTS_ENABLED =
-            "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED"));
+    protected static final boolean TESTS_ENABLED = "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED"));

     // SeaweedFS connection settings
-    protected static final String SEAWEEDFS_HOST =
-            System.getenv().getOrDefault("SEAWEEDFS_FILER_HOST", "localhost");
-    protected static final String SEAWEEDFS_PORT =
-            System.getenv().getOrDefault("SEAWEEDFS_FILER_PORT", "8888");
-    protected static final String SEAWEEDFS_GRPC_PORT =
-            System.getenv().getOrDefault("SEAWEEDFS_FILER_GRPC_PORT", "18888");
+    protected static final String SEAWEEDFS_HOST = System.getenv().getOrDefault("SEAWEEDFS_FILER_HOST", "localhost");
+    protected static final String SEAWEEDFS_PORT = System.getenv().getOrDefault("SEAWEEDFS_FILER_PORT", "8888");
+    protected static final String SEAWEEDFS_GRPC_PORT = System.getenv().getOrDefault("SEAWEEDFS_FILER_GRPC_PORT",
+            "18888");

     @Before
     public void setUpSpark() throws IOException {
@@ -57,7 +54,8 @@ public abstract class SparkTestBase {
                 .set("spark.sql.shuffle.partitions", "1")
                 // Simpler output committer
                 .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
-                .set("spark.sql.sources.commitProtocolClass", "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
+                .set("spark.sql.sources.commitProtocolClass",
+                        "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
                 // Disable speculative execution to reduce load
                 .set("spark.speculation", "false")
                 // Increase task retry to handle transient consistency issues
@@ -128,4 +126,3 @@ public abstract class SparkTestBase {
         }
     }
 }
-
```