From 221252d34e724a1fee179d41572e5f3a59d4b0c1 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 23 Nov 2025 12:43:08 -0800 Subject: [PATCH] fmt --- .../seaweedfs/client/SeaweedOutputStream.java | 4 +- .../java/seaweedfs/client/SeaweedRead.java | 28 +-- test/java/spark/EOF_EXCEPTION_ANALYSIS.md | 177 +++++++++++++++++ .../seaweed/spark/SparkSeaweedFSExample.java | 89 ++++----- .../test/java/seaweed/spark/SparkSQLTest.java | 180 +++++++++++------- .../java/seaweed/spark/SparkTestBase.java | 81 ++++---- weed/pb/grpc_client_server.go | 6 +- 7 files changed, 388 insertions(+), 177 deletions(-) create mode 100644 test/java/spark/EOF_EXCEPTION_ANALYSIS.md diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index c0c5863e5..8b51555b5 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -168,8 +168,8 @@ public class SeaweedOutputStream extends OutputStream { totalBytesWritten += length; if (path.contains("parquet")) { - LOG.info("[DEBUG-2024] ✍️ write({} bytes): totalSoFar={} position={} bufferPos={}, file={}", - length, totalBytesWritten, position, buffer.position(), + LOG.info("[DEBUG-2024] ✍️ write({} bytes): totalSoFar={} position={} bufferPos={}, file={}", + length, totalBytesWritten, position, buffer.position(), path.substring(path.lastIndexOf('/') + 1)); } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index dbade3408..e446891eb 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -24,7 +24,7 @@ public class SeaweedRead { // returns bytesRead public static long read(FilerClient filerClient, List visibleIntervals, - final long position, final ByteBuffer buf, final long fileSize) throws IOException { + final long position, final ByteBuffer buf, final long fileSize) throws IOException { List chunkViews = viewFromVisibles(visibleIntervals, position, buf.remaining()); @@ -51,7 +51,7 @@ public class SeaweedRead { } } - //TODO parallel this + // TODO parallel this long readCount = 0; long startOffset = position; for (ChunkView chunkView : chunkViews) { @@ -59,7 +59,7 @@ public class SeaweedRead { if (startOffset < chunkView.logicOffset) { long gap = chunkView.logicOffset - startOffset; LOG.debug("zero [{},{})", startOffset, startOffset + gap); - buf.position(buf.position()+ (int)gap); + buf.position(buf.position() + (int) gap); readCount += gap; startOffset += gap; } @@ -86,7 +86,7 @@ public class SeaweedRead { if (startOffset < limit) { long gap = limit - startOffset; LOG.debug("zero2 [{},{})", startOffset, startOffset + gap); - buf.position(buf.position()+ (int)gap); + buf.position(buf.position() + (int) gap); readCount += gap; startOffset += gap; } @@ -94,7 +94,8 @@ public class SeaweedRead { return readCount; } - private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView, + FilerProto.Locations locations) throws IOException { byte[] chunkData = chunkCache.getChunk(chunkView.fileId); @@ -105,13 +106,15 @@ public class SeaweedRead { int len = (int) chunkView.size - (int) (startOffset - chunkView.logicOffset); LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) startOffset:{}", - chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, startOffset); + chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, + chunkView.logicOffset + chunkView.size, startOffset); buf.put(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), len); return len; } - public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView, + FilerProto.Locations locations) throws IOException { byte[] data = null; IOException lastException = null; @@ -214,8 +217,7 @@ public class SeaweedRead { chunkStart, isFullChunk, chunk.cipherKey, - chunk.isCompressed - )); + chunk.isCompressed)); } } return views; @@ -242,7 +244,7 @@ public class SeaweedRead { long chunksSize = totalSize(entry.getChunksList()); long attrSize = entry.getAttributes().getFileSize(); long finalSize = Math.max(chunksSize, attrSize); - LOG.info("Calculated file size: {} (chunks: {}, attr: {}, #chunks: {})", + LOG.info("Calculated file size: {} (chunks: {}, attr: {}, #chunks: {})", finalSize, chunksSize, attrSize, entry.getChunksCount()); return finalSize; } @@ -268,7 +270,8 @@ public class SeaweedRead { public final byte[] cipherKey; public final boolean isCompressed; - public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) { + public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, + boolean isFullChunk, byte[] cipherKey, boolean isCompressed) { this.start = start; this.stop = stop; this.modifiedTime = modifiedTime; @@ -302,7 +305,8 @@ public class SeaweedRead { public final byte[] cipherKey; public final boolean isCompressed; - public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) { + public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, + boolean isCompressed) { this.fileId = fileId; this.offset = offset; this.size = size; diff --git a/test/java/spark/EOF_EXCEPTION_ANALYSIS.md b/test/java/spark/EOF_EXCEPTION_ANALYSIS.md new file mode 100644 index 000000000..a244b796c --- /dev/null +++ b/test/java/spark/EOF_EXCEPTION_ANALYSIS.md @@ -0,0 +1,177 @@ +# EOFException Analysis: "Still have: 78 bytes left" + +## Problem Summary + +Spark Parquet writes succeed, but subsequent reads fail with: +``` +java.io.EOFException: Reached the end of stream. Still have: 78 bytes left +``` + +## What the Logs Tell Us + +### Write Phase ✅ (Everything looks correct) + +**year=2020 file:** +``` +🔧 Created stream: position=0 bufferSize=1048576 +🔒 close START: position=0 buffer.position()=696 totalBytesWritten=696 +→ Submitted 696 bytes, new position=696 +✅ close END: finalPosition=696 totalBytesWritten=696 +Calculated file size: 696 (chunks: 696, attr: 696, #chunks: 1) +``` + +**year=2021 file:** +``` +🔧 Created stream: position=0 bufferSize=1048576 +🔒 close START: position=0 buffer.position()=684 totalBytesWritten=684 +→ Submitted 684 bytes, new position=684 +✅ close END: finalPosition=684 totalBytesWritten=684 +Calculated file size: 684 (chunks: 684, attr: 684, #chunks: 1) +``` + +**Key observations:** +- ✅ `totalBytesWritten == position == buffer == chunks == attr` +- ✅ All bytes received through `write()` are flushed and stored +- ✅ File metadata is consistent +- ✅ No bytes lost in SeaweedFS layer + +### Read Phase ❌ (Parquet expects more bytes) + +**Consistent pattern:** +- year=2020: wrote 696 bytes, **expects 774 bytes** → missing 78 +- year=2021: wrote 684 bytes, **expects 762 bytes** → missing 78 + +The **78-byte discrepancy is constant across both files**, suggesting it's not random data loss. + +## Hypotheses + +### H1: Parquet Footer Not Fully Written +Parquet file structure: +``` +[Magic "PAR1" 4B] [Data pages] [Footer] [Footer length 4B] [Magic "PAR1" 4B] +``` + +**Possible scenario:** +1. Parquet writes 684 bytes of data pages +2. Parquet **intends** to write 78 bytes of footer metadata +3. Our `SeaweedOutputStream.close()` is called +4. Only data pages (684 bytes) make it to the file +5. Footer (78 bytes) is lost or never written + +**Evidence for:** +- 78 bytes is a reasonable size for a Parquet footer with minimal metadata +- Files say "snappy.parquet" → compressed, so footer would be small +- Consistent 78-byte loss across files + +**Evidence against:** +- Our `close()` logs show all bytes received via `write()` were processed +- If Parquet wrote footer to stream, we'd see `totalBytesWritten=762` + +### H2: FSDataOutputStream Position Tracking Mismatch +Hadoop wraps our stream: +```java +new FSDataOutputStream(seaweedOutputStream, statistics) +``` + +**Possible scenario:** +1. Parquet writes 684 bytes → `FSDataOutputStream` increments position to 684 +2. Parquet writes 78-byte footer → `FSDataOutputStream` increments position to 762 +3. **BUT** only 684 bytes reach our `SeaweedOutputStream.write()` +4. Parquet queries `FSDataOutputStream.getPos()` → returns 762 +5. Parquet writes "file size: 762" in its footer +6. Actual file only has 684 bytes + +**Evidence for:** +- Would explain why our logs show 684 but Parquet expects 762 +- FSDataOutputStream might have its own buffering + +**Evidence against:** +- FSDataOutputStream is well-tested Hadoop core component +- Unlikely to lose bytes + +### H3: Race Condition During File Rename +Files are written to `_temporary/` then renamed to final location. + +**Possible scenario:** +1. Write completes successfully (684 bytes) +2. `close()` flushes and updates metadata +3. File is renamed while metadata is propagating +4. Read happens before metadata sync completes +5. Reader gets stale file size or incomplete footer + +**Evidence for:** +- Distributed systems often have eventual consistency issues +- Rename might not sync metadata immediately + +**Evidence against:** +- We added `fs.seaweed.write.flush.sync=true` to force sync +- Error is consistent, not intermittent + +### H4: Compression-Related Size Confusion +Files use Snappy compression (`*.snappy.parquet`). + +**Possible scenario:** +1. Parquet tracks uncompressed size internally +2. Writes compressed data to stream +3. Size mismatch between compressed file and uncompressed metadata + +**Evidence against:** +- Parquet handles compression internally and consistently +- Would affect all Parquet users, not just SeaweedFS + +## Next Debugging Steps + +### Added: getPos() Logging +```java +public synchronized long getPos() { + long currentPos = position + buffer.position(); + LOG.info("[DEBUG-2024] 📍 getPos() called: flushedPosition={} bufferPosition={} returning={}", + position, buffer.position(), currentPos); + return currentPos; +} +``` + +**Will reveal:** +- If/when Parquet queries position +- What value is returned vs what was actually written +- If FSDataOutputStream bypasses our position tracking + +### Next Steps if getPos() is NOT called: +→ Parquet is not using position tracking +→ Focus on footer write completion + +### Next Steps if getPos() returns 762 but we only wrote 684: +→ FSDataOutputStream has buffering issue or byte loss +→ Need to investigate Hadoop wrapper behavior + +### Next Steps if getPos() returns 684 (correct): +→ Issue is in footer metadata or read path +→ Need to examine Parquet footer contents + +## Parquet File Format Context + +Typical small Parquet file (~700 bytes): +``` +Offset Content +0-3 Magic "PAR1" +4-650 Row group data (compressed) +651-728 Footer metadata (schema, row group pointers) +729-732 Footer length (4 bytes, value: 78) +733-736 Magic "PAR1" +Total: 737 bytes +``` + +If footer length field says "78" but only data exists: +- File ends at byte 650 +- Footer starts at byte 651 (but doesn't exist) +- Reader tries to read 78 bytes, gets EOFException + +This matches our error pattern perfectly. + +## Recommended Fix Directions + +1. **Ensure footer is fully written before close returns** +2. **Add explicit fsync/hsync before metadata write** +3. **Verify FSDataOutputStream doesn't buffer separately** +4. **Check if Parquet needs special OutputStreamAdapter** + diff --git a/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java b/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java index 8919bfbdf..8a40f6071 100644 --- a/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java +++ b/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java @@ -12,14 +12,14 @@ import org.apache.spark.sql.SparkSession; * * Example usage: * spark-submit \ - * --class seaweed.spark.SparkSeaweedFSExample \ - * --master local[2] \ - * --conf spark.hadoop.fs.seaweedfs.impl=seaweed.hdfs.SeaweedFileSystem \ - * --conf spark.hadoop.fs.seaweed.filer.host=localhost \ - * --conf spark.hadoop.fs.seaweed.filer.port=8888 \ - * --conf spark.hadoop.fs.seaweed.filer.port.grpc=18888 \ - * target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar \ - * seaweedfs://localhost:8888/output + * --class seaweed.spark.SparkSeaweedFSExample \ + * --master local[2] \ + * --conf spark.hadoop.fs.seaweedfs.impl=seaweed.hdfs.SeaweedFileSystem \ + * --conf spark.hadoop.fs.seaweed.filer.host=localhost \ + * --conf spark.hadoop.fs.seaweed.filer.port=8888 \ + * --conf spark.hadoop.fs.seaweed.filer.port.grpc=18888 \ + * target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar \ + * seaweedfs://localhost:8888/output */ public class SparkSeaweedFSExample { @@ -34,8 +34,8 @@ public class SparkSeaweedFSExample { // Create Spark session SparkSession spark = SparkSession.builder() - .appName("SeaweedFS Spark Example") - .getOrCreate(); + .appName("SeaweedFS Spark Example") + .getOrCreate(); try { System.out.println("=== SeaweedFS Spark Integration Example ===\n"); @@ -43,11 +43,10 @@ public class SparkSeaweedFSExample { // Example 1: Generate data and write to SeaweedFS System.out.println("1. Generating sample data..."); Dataset data = spark.range(0, 1000) - .selectExpr( - "id", - "id * 2 as doubled", - "CAST(rand() * 100 AS INT) as random_value" - ); + .selectExpr( + "id", + "id * 2 as doubled", + "CAST(rand() * 100 AS INT) as random_value"); System.out.println(" Generated " + data.count() + " rows"); data.show(5); @@ -56,10 +55,10 @@ public class SparkSeaweedFSExample { String parquetPath = outputPath + "/data.parquet"; System.out.println("\n2. Writing data to SeaweedFS as Parquet..."); System.out.println(" Path: " + parquetPath); - + data.write() - .mode(SaveMode.Overwrite) - .parquet(parquetPath); + .mode(SaveMode.Overwrite) + .parquet(parquetPath); System.out.println(" ✓ Write completed"); @@ -67,63 +66,60 @@ public class SparkSeaweedFSExample { System.out.println("\n3. Reading data back from SeaweedFS..."); Dataset readData = spark.read().parquet(parquetPath); System.out.println(" Read " + readData.count() + " rows"); - + // Perform aggregation System.out.println("\n4. Performing aggregation..."); Dataset stats = readData.selectExpr( - "COUNT(*) as count", - "AVG(random_value) as avg_random", - "MAX(doubled) as max_doubled" - ); - + "COUNT(*) as count", + "AVG(random_value) as avg_random", + "MAX(doubled) as max_doubled"); + stats.show(); // Write aggregation results String statsPath = outputPath + "/stats.parquet"; System.out.println("5. Writing stats to: " + statsPath); stats.write() - .mode(SaveMode.Overwrite) - .parquet(statsPath); + .mode(SaveMode.Overwrite) + .parquet(statsPath); // Create a partitioned dataset System.out.println("\n6. Creating partitioned dataset..."); Dataset partitionedData = data.selectExpr( - "*", - "CAST(id % 10 AS INT) as partition_key" - ); + "*", + "CAST(id % 10 AS INT) as partition_key"); String partitionedPath = outputPath + "/partitioned.parquet"; System.out.println(" Path: " + partitionedPath); - + partitionedData.write() - .mode(SaveMode.Overwrite) - .partitionBy("partition_key") - .parquet(partitionedPath); + .mode(SaveMode.Overwrite) + .partitionBy("partition_key") + .parquet(partitionedPath); System.out.println(" ✓ Partitioned write completed"); // Read specific partition System.out.println("\n7. Reading specific partition (partition_key=0)..."); Dataset partition0 = spark.read() - .parquet(partitionedPath) - .filter("partition_key = 0"); - + .parquet(partitionedPath) + .filter("partition_key = 0"); + System.out.println(" Partition 0 contains " + partition0.count() + " rows"); partition0.show(5); // SQL example System.out.println("\n8. Using Spark SQL..."); readData.createOrReplaceTempView("seaweedfs_data"); - + Dataset sqlResult = spark.sql( - "SELECT " + - " CAST(id / 100 AS INT) as bucket, " + - " COUNT(*) as count, " + - " AVG(random_value) as avg_random " + - "FROM seaweedfs_data " + - "GROUP BY CAST(id / 100 AS INT) " + - "ORDER BY bucket" - ); + "SELECT " + + " CAST(id / 100 AS INT) as bucket, " + + " COUNT(*) as count, " + + " AVG(random_value) as avg_random " + + "FROM seaweedfs_data " + + "GROUP BY CAST(id / 100 AS INT) " + + "ORDER BY bucket"); System.out.println(" Bucketed statistics:"); sqlResult.show(); @@ -140,6 +136,3 @@ public class SparkSeaweedFSExample { } } } - - - diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java index d0c01736a..231952023 100644 --- a/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java +++ b/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java @@ -21,11 +21,10 @@ public class SparkSQLTest extends SparkTestBase { // Create test data List employees = Arrays.asList( - new Employee(1, "Alice", "Engineering", 100000), - new Employee(2, "Bob", "Sales", 80000), - new Employee(3, "Charlie", "Engineering", 120000), - new Employee(4, "David", "Sales", 75000) - ); + new Employee(1, "Alice", "Engineering", 100000), + new Employee(2, "Bob", "Sales", 80000), + new Employee(3, "Charlie", "Engineering", 120000), + new Employee(4, "David", "Sales", 75000)); Dataset df = spark.createDataFrame(employees, Employee.class); @@ -39,15 +38,13 @@ public class SparkSQLTest extends SparkTestBase { // Run SQL queries Dataset engineeringEmployees = spark.sql( - "SELECT name, salary FROM employees WHERE department = 'Engineering'" - ); - + "SELECT name, salary FROM employees WHERE department = 'Engineering'"); + assertEquals(2, engineeringEmployees.count()); Dataset highPaidEmployees = spark.sql( - "SELECT name, salary FROM employees WHERE salary > 90000" - ); - + "SELECT name, salary FROM employees WHERE salary > 90000"); + assertEquals(2, highPaidEmployees.count()); } @@ -57,12 +54,11 @@ public class SparkSQLTest extends SparkTestBase { // Create sales data List sales = Arrays.asList( - new Sale("2024-01", "Product A", 100), - new Sale("2024-01", "Product B", 150), - new Sale("2024-02", "Product A", 120), - new Sale("2024-02", "Product B", 180), - new Sale("2024-03", "Product A", 110) - ); + new Sale("2024-01", "Product A", 100), + new Sale("2024-01", "Product B", 150), + new Sale("2024-02", "Product A", 120), + new Sale("2024-02", "Product B", 180), + new Sale("2024-03", "Product A", 110)); Dataset df = spark.createDataFrame(sales, Sale.class); @@ -76,8 +72,7 @@ public class SparkSQLTest extends SparkTestBase { // Aggregate query Dataset monthlySales = spark.sql( - "SELECT month, SUM(amount) as total FROM sales GROUP BY month ORDER BY month" - ); + "SELECT month, SUM(amount) as total FROM sales GROUP BY month ORDER BY month"); List results = monthlySales.collectAsList(); assertEquals(3, results.size()); @@ -91,15 +86,13 @@ public class SparkSQLTest extends SparkTestBase { // Create employee data List employees = Arrays.asList( - new Employee(1, "Alice", "Engineering", 100000), - new Employee(2, "Bob", "Sales", 80000) - ); + new Employee(1, "Alice", "Engineering", 100000), + new Employee(2, "Bob", "Sales", 80000)); // Create department data List departments = Arrays.asList( - new Department("Engineering", "Building Products"), - new Department("Sales", "Selling Products") - ); + new Department("Engineering", "Building Products"), + new Department("Sales", "Selling Products")); Dataset empDf = spark.createDataFrame(employees, Employee.class); Dataset deptDf = spark.createDataFrame(departments, Department.class); @@ -107,7 +100,7 @@ public class SparkSQLTest extends SparkTestBase { // Write to SeaweedFS String empPath = getTestPath("employees_join"); String deptPath = getTestPath("departments_join"); - + empDf.write().mode(SaveMode.Overwrite).parquet(empPath); deptDf.write().mode(SaveMode.Overwrite).parquet(deptPath); @@ -117,16 +110,14 @@ public class SparkSQLTest extends SparkTestBase { // Join query Dataset joined = spark.sql( - "SELECT e.name, e.salary, d.description " + - "FROM emp e JOIN dept d ON e.department = d.name" - ); + "SELECT e.name, e.salary, d.description " + + "FROM emp e JOIN dept d ON e.department = d.name"); assertEquals(2, joined.count()); - + List results = joined.collectAsList(); - assertTrue(results.stream().anyMatch(r -> - "Alice".equals(r.getString(0)) && "Building Products".equals(r.getString(2)) - )); + assertTrue(results.stream() + .anyMatch(r -> "Alice".equals(r.getString(0)) && "Building Products".equals(r.getString(2)))); } @Test @@ -135,11 +126,10 @@ public class SparkSQLTest extends SparkTestBase { // Create employee data with salaries List employees = Arrays.asList( - new Employee(1, "Alice", "Engineering", 100000), - new Employee(2, "Bob", "Engineering", 120000), - new Employee(3, "Charlie", "Sales", 80000), - new Employee(4, "David", "Sales", 90000) - ); + new Employee(1, "Alice", "Engineering", 100000), + new Employee(2, "Bob", "Engineering", 120000), + new Employee(3, "Charlie", "Sales", 80000), + new Employee(4, "David", "Sales", 90000)); Dataset df = spark.createDataFrame(employees, Employee.class); @@ -151,20 +141,19 @@ public class SparkSQLTest extends SparkTestBase { // Window function query - rank employees by salary within department Dataset ranked = spark.sql( - "SELECT name, department, salary, " + - "RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank " + - "FROM employees_ranked" - ); + "SELECT name, department, salary, " + + "RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank " + + "FROM employees_ranked"); assertEquals(4, ranked.count()); - + // Verify Bob has rank 1 in Engineering (highest salary) List results = ranked.collectAsList(); Row bobRow = results.stream() - .filter(r -> "Bob".equals(r.getString(0))) - .findFirst() - .orElse(null); - + .filter(r -> "Bob".equals(r.getString(0))) + .findFirst() + .orElse(null); + assertNotNull(bobRow); assertEquals(1, bobRow.getInt(3)); } @@ -176,7 +165,8 @@ public class SparkSQLTest extends SparkTestBase { private String department; private int salary; - public Employee() {} + public Employee() { + } public Employee(int id, String name, String department, int salary) { this.id = id; @@ -185,14 +175,37 @@ public class SparkSQLTest extends SparkTestBase { this.salary = salary; } - public int getId() { return id; } - public void setId(int id) { this.id = id; } - public String getName() { return name; } - public void setName(String name) { this.name = name; } - public String getDepartment() { return department; } - public void setDepartment(String department) { this.department = department; } - public int getSalary() { return salary; } - public void setSalary(int salary) { this.salary = salary; } + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getDepartment() { + return department; + } + + public void setDepartment(String department) { + this.department = department; + } + + public int getSalary() { + return salary; + } + + public void setSalary(int salary) { + this.salary = salary; + } } public static class Sale implements java.io.Serializable { @@ -200,7 +213,8 @@ public class SparkSQLTest extends SparkTestBase { private String product; private int amount; - public Sale() {} + public Sale() { + } public Sale(String month, String product, int amount) { this.month = month; @@ -208,31 +222,57 @@ public class SparkSQLTest extends SparkTestBase { this.amount = amount; } - public String getMonth() { return month; } - public void setMonth(String month) { this.month = month; } - public String getProduct() { return product; } - public void setProduct(String product) { this.product = product; } - public int getAmount() { return amount; } - public void setAmount(int amount) { this.amount = amount; } + public String getMonth() { + return month; + } + + public void setMonth(String month) { + this.month = month; + } + + public String getProduct() { + return product; + } + + public void setProduct(String product) { + this.product = product; + } + + public int getAmount() { + return amount; + } + + public void setAmount(int amount) { + this.amount = amount; + } } public static class Department implements java.io.Serializable { private String name; private String description; - public Department() {} + public Department() { + } public Department(String name, String description) { this.name = name; this.description = description; } - public String getName() { return name; } - public void setName(String name) { this.name = name; } - public String getDescription() { return description; } - public void setDescription(String description) { this.description = description; } - } -} + public String getName() { + return name; + } + public void setName(String name) { + this.name = name; + } + public String getDescription() { + return description; + } + public void setDescription(String description) { + this.description = description; + } + } +} diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java b/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java index fbc858c10..5b17e6f2d 100644 --- a/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java +++ b/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java @@ -18,16 +18,13 @@ public abstract class SparkTestBase { protected SparkSession spark; protected static final String TEST_ROOT = "/test-spark"; - protected static final boolean TESTS_ENABLED = - "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED")); - + protected static final boolean TESTS_ENABLED = "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED")); + // SeaweedFS connection settings - protected static final String SEAWEEDFS_HOST = - System.getenv().getOrDefault("SEAWEEDFS_FILER_HOST", "localhost"); - protected static final String SEAWEEDFS_PORT = - System.getenv().getOrDefault("SEAWEEDFS_FILER_PORT", "8888"); - protected static final String SEAWEEDFS_GRPC_PORT = - System.getenv().getOrDefault("SEAWEEDFS_FILER_GRPC_PORT", "18888"); + protected static final String SEAWEEDFS_HOST = System.getenv().getOrDefault("SEAWEEDFS_FILER_HOST", "localhost"); + protected static final String SEAWEEDFS_PORT = System.getenv().getOrDefault("SEAWEEDFS_FILER_PORT", "8888"); + protected static final String SEAWEEDFS_GRPC_PORT = System.getenv().getOrDefault("SEAWEEDFS_FILER_GRPC_PORT", + "18888"); @Before public void setUpSpark() throws IOException { @@ -36,39 +33,40 @@ public abstract class SparkTestBase { } SparkConf sparkConf = new SparkConf() - .setAppName("SeaweedFS Integration Test") - .setMaster("local[1]") // Single thread to avoid concurrent gRPC issues - .set("spark.driver.host", "localhost") - .set("spark.sql.warehouse.dir", getSeaweedFSPath("/spark-warehouse")) - // SeaweedFS configuration - .set("spark.hadoop.fs.defaultFS", String.format("seaweedfs://%s:%s", SEAWEEDFS_HOST, SEAWEEDFS_PORT)) - .set("spark.hadoop.fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem") - .set("spark.hadoop.fs.seaweed.impl", "seaweed.hdfs.SeaweedFileSystem") - .set("spark.hadoop.fs.seaweed.filer.host", SEAWEEDFS_HOST) - .set("spark.hadoop.fs.seaweed.filer.port", SEAWEEDFS_PORT) - .set("spark.hadoop.fs.seaweed.filer.port.grpc", SEAWEEDFS_GRPC_PORT) - .set("spark.hadoop.fs.AbstractFileSystem.seaweedfs.impl", "seaweed.hdfs.SeaweedAbstractFileSystem") - // Set replication to empty string to use filer default - .set("spark.hadoop.fs.seaweed.replication", "") - // Smaller buffer to reduce load - .set("spark.hadoop.fs.seaweed.buffer.size", "1048576") // 1MB - // Reduce parallelism - .set("spark.default.parallelism", "1") - .set("spark.sql.shuffle.partitions", "1") - // Simpler output committer - .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") - .set("spark.sql.sources.commitProtocolClass", "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol") - // Disable speculative execution to reduce load - .set("spark.speculation", "false") - // Increase task retry to handle transient consistency issues - .set("spark.task.maxFailures", "4") - // Wait longer before retrying failed tasks - .set("spark.task.reaper.enabled", "true") - .set("spark.task.reaper.pollingInterval", "1s"); + .setAppName("SeaweedFS Integration Test") + .setMaster("local[1]") // Single thread to avoid concurrent gRPC issues + .set("spark.driver.host", "localhost") + .set("spark.sql.warehouse.dir", getSeaweedFSPath("/spark-warehouse")) + // SeaweedFS configuration + .set("spark.hadoop.fs.defaultFS", String.format("seaweedfs://%s:%s", SEAWEEDFS_HOST, SEAWEEDFS_PORT)) + .set("spark.hadoop.fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem") + .set("spark.hadoop.fs.seaweed.impl", "seaweed.hdfs.SeaweedFileSystem") + .set("spark.hadoop.fs.seaweed.filer.host", SEAWEEDFS_HOST) + .set("spark.hadoop.fs.seaweed.filer.port", SEAWEEDFS_PORT) + .set("spark.hadoop.fs.seaweed.filer.port.grpc", SEAWEEDFS_GRPC_PORT) + .set("spark.hadoop.fs.AbstractFileSystem.seaweedfs.impl", "seaweed.hdfs.SeaweedAbstractFileSystem") + // Set replication to empty string to use filer default + .set("spark.hadoop.fs.seaweed.replication", "") + // Smaller buffer to reduce load + .set("spark.hadoop.fs.seaweed.buffer.size", "1048576") // 1MB + // Reduce parallelism + .set("spark.default.parallelism", "1") + .set("spark.sql.shuffle.partitions", "1") + // Simpler output committer + .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") + .set("spark.sql.sources.commitProtocolClass", + "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol") + // Disable speculative execution to reduce load + .set("spark.speculation", "false") + // Increase task retry to handle transient consistency issues + .set("spark.task.maxFailures", "4") + // Wait longer before retrying failed tasks + .set("spark.task.reaper.enabled", "true") + .set("spark.task.reaper.pollingInterval", "1s"); spark = SparkSession.builder() - .config(sparkConf) - .getOrCreate(); + .config(sparkConf) + .getOrCreate(); // Clean up test directory cleanupTestDirectory(); @@ -108,7 +106,7 @@ public abstract class SparkTestBase { try { Configuration conf = spark.sparkContext().hadoopConfiguration(); org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get( - java.net.URI.create(getSeaweedFSPath("/")), conf); + java.net.URI.create(getSeaweedFSPath("/")), conf); org.apache.hadoop.fs.Path testPath = new org.apache.hadoop.fs.Path(TEST_ROOT); if (fs.exists(testPath)) { fs.delete(testPath, true); @@ -128,4 +126,3 @@ public abstract class SparkTestBase { } } } - diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index a9b721730..9caf1f511 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -62,10 +62,10 @@ func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server { }), grpc.MaxRecvMsgSize(Max_Message_Size), grpc.MaxSendMsgSize(Max_Message_Size), - grpc.MaxConcurrentStreams(1000), // Allow more concurrent streams - grpc.InitialWindowSize(16*1024*1024), // 16MB initial window + grpc.MaxConcurrentStreams(1000), // Allow more concurrent streams + grpc.InitialWindowSize(16*1024*1024), // 16MB initial window grpc.InitialConnWindowSize(16*1024*1024), // 16MB connection window - grpc.MaxHeaderListSize(8*1024*1024), // 8MB header list limit + grpc.MaxHeaderListSize(8*1024*1024), // 8MB header list limit grpc.UnaryInterceptor(requestIDUnaryInterceptor()), ) for _, opt := range opts {