From 32118a82bc4bf6872b4668a807e2671702f30c81 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Mon, 24 Nov 2025 21:24:57 -0800
Subject: [PATCH] tests not needed now

---
 .../seaweed/spark/DirectFileReadTest.java     |  72 ----
 .../spark/ParquetMemoryComparisonTest.java    | 299 ---------------
 .../ShadowVsLocalOnlyComparisonTest.java      | 214 -----------
 .../spark/SparkSQLReadDifferenceTest.java     | 264 --------------
 .../spark/SparkShadowComparisonTest.java      | 306 ----------------
 .../spark/SparkShadowReadComparisonTest.java  | 343 ------------------
 6 files changed, 1498 deletions(-)
 delete mode 100644 test/java/spark/src/test/java/seaweed/spark/DirectFileReadTest.java
 delete mode 100644 test/java/spark/src/test/java/seaweed/spark/ParquetMemoryComparisonTest.java
 delete mode 100644 test/java/spark/src/test/java/seaweed/spark/ShadowVsLocalOnlyComparisonTest.java
 delete mode 100644 test/java/spark/src/test/java/seaweed/spark/SparkSQLReadDifferenceTest.java
 delete mode 100644 test/java/spark/src/test/java/seaweed/spark/SparkShadowComparisonTest.java
 delete mode 100644 test/java/spark/src/test/java/seaweed/spark/SparkShadowReadComparisonTest.java

diff --git a/test/java/spark/src/test/java/seaweed/spark/DirectFileReadTest.java b/test/java/spark/src/test/java/seaweed/spark/DirectFileReadTest.java
deleted file mode 100644
index 96c778f05..000000000
--- a/test/java/spark/src/test/java/seaweed/spark/DirectFileReadTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package seaweed.spark;
-
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-/**
- * Test reading LOCAL_ONLY files directly via file:// protocol
- * to verify the files themselves are valid.
- */
-public class DirectFileReadTest extends SparkTestBase {
-
-    @Test
-    public void testReadLocalOnlyFileDirectly() {
-        skipIfTestsDisabled();
-
-        // First write using LOCAL_ONLY mode (through SeaweedFS path)
-        java.util.List<SparkSQLTest.Employee> employees = java.util.Arrays.asList(
-                new SparkSQLTest.Employee(1, "Alice", "Engineering", 100000),
-                new SparkSQLTest.Employee(2, "Bob", "Sales", 80000),
-                new SparkSQLTest.Employee(3, "Charlie", "Engineering", 120000),
-                new SparkSQLTest.Employee(4, "David", "Sales", 75000));
-
-        Dataset<Row> df = spark.createDataFrame(employees, SparkSQLTest.Employee.class);
-
-        String tablePath = getTestPath("employees_direct_test");
-        df.write().mode(org.apache.spark.sql.SaveMode.Overwrite).parquet(tablePath);
-
-        System.out.println("āœ… Write completed to: " + tablePath);
-
-        // Now try to read the LOCAL_ONLY .debug file directly using file:// protocol
-        // This bypasses LocalOnlyInputStream and uses native file system
-        String debugFilePath = "file:///workspace/target/debug-local/";
-
-        try {
-            // List files in debug directory
-            java.io.File debugDir = new java.io.File("/workspace/target/debug-local/");
-            java.io.File[] files = debugDir.listFiles((dir, name) -> name.endsWith(".parquet.debug"));
-
-            if (files != null && files.length > 0) {
-                String localFile = "file://" + files[0].getAbsolutePath();
-                System.out.println("šŸ“ Found LOCAL_ONLY file: " + localFile);
-                System.out.println("šŸ“ File size: " + files[0].length() + " bytes");
-
-                // Try to read it directly
-                Dataset<Row> directRead = spark.read().parquet(localFile);
-                long count = directRead.count();
-                System.out.println("āœ… Direct read successful! 
Row count: " + count); - - // Try SQL query on it - directRead.createOrReplaceTempView("employees_direct"); - Dataset filtered = spark.sql( - "SELECT name, salary FROM employees_direct WHERE department = 'Engineering'"); - long engineeringCount = filtered.count(); - System.out.println("āœ… SQL query successful! Engineering employees: " + engineeringCount); - - assertEquals("Should have 2 engineering employees", 2, engineeringCount); - - } else { - fail("No .debug files found in /workspace/target/debug-local/"); - } - - } catch (Exception e) { - System.err.println("āŒ Direct read failed: " + e.getMessage()); - e.printStackTrace(); - throw new RuntimeException("Direct file read failed", e); - } - } -} - diff --git a/test/java/spark/src/test/java/seaweed/spark/ParquetMemoryComparisonTest.java b/test/java/spark/src/test/java/seaweed/spark/ParquetMemoryComparisonTest.java deleted file mode 100644 index 764f88630..000000000 --- a/test/java/spark/src/test/java/seaweed/spark/ParquetMemoryComparisonTest.java +++ /dev/null @@ -1,299 +0,0 @@ -package seaweed.spark; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.parquet.example.data.Group; -import org.apache.parquet.example.data.simple.SimpleGroupFactory; -import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.parquet.hadoop.example.GroupWriteSupport; -import org.apache.parquet.hadoop.metadata.ParquetMetadata; -import org.apache.parquet.hadoop.util.HadoopInputFile; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.MessageTypeParser; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; - -import static org.junit.Assert.*; - -/** - * Test to compare in-memory Parquet file with SeaweedFS-stored Parquet file - * to identify what metadata differences cause the 78-byte EOF error. 
- */ -public class ParquetMemoryComparisonTest extends SparkTestBase { - - private static final String SCHEMA_STRING = - "message Employee { " + - " required int32 id; " + - " required binary name (UTF8); " + - " required int32 age; " + - "}"; - - private static final MessageType SCHEMA = MessageTypeParser.parseMessageType(SCHEMA_STRING); - - private FileSystem localFs; - private FileSystem seaweedFs; - - @Before - public void setUp() throws Exception { - if (!TESTS_ENABLED) { - return; - } - - Configuration conf = new Configuration(); - - // Local filesystem - localFs = FileSystem.getLocal(conf); - - // SeaweedFS - conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem"); - conf.set("fs.seaweed.filer.host", SEAWEEDFS_HOST); - conf.set("fs.seaweed.filer.port", SEAWEEDFS_PORT); - seaweedFs = FileSystem.get(java.net.URI.create("seaweedfs://" + SEAWEEDFS_HOST + ":" + SEAWEEDFS_PORT), conf); - - System.out.println("=== Test Setup Complete ==="); - System.out.println("Local FS: " + localFs.getClass().getName()); - System.out.println("SeaweedFS: " + seaweedFs.getClass().getName()); - } - - @After - public void tearDown() throws Exception { - if (localFs != null) { - localFs.close(); - } - if (seaweedFs != null) { - seaweedFs.close(); - } - } - - @Test - public void testCompareMemoryVsSeaweedFSParquet() throws Exception { - if (!TESTS_ENABLED) { - System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); - return; - } - - System.out.println("\n=== PARQUET MEMORY vs SEAWEEDFS COMPARISON TEST ===\n"); - - // 1. Write identical Parquet file to local temp and SeaweedFS - Path localPath = new Path("/tmp/test-local-" + System.currentTimeMillis() + ".parquet"); - Path seaweedPath = new Path("seaweedfs://" + SEAWEEDFS_HOST + ":" + SEAWEEDFS_PORT + - "/test-spark/comparison-test.parquet"); - - System.out.println("Writing to local: " + localPath); - System.out.println("Writing to SeaweedFS: " + seaweedPath); - - // Write same data to both locations - writeTestParquetFile(localFs, localPath); - writeTestParquetFile(seaweedFs, seaweedPath); - - System.out.println("\n=== Files Written Successfully ===\n"); - - // 2. Read raw bytes from both files - byte[] localBytes = readAllBytes(localFs, localPath); - byte[] seaweedBytes = readAllBytes(seaweedFs, seaweedPath); - - System.out.println("Local file size: " + localBytes.length + " bytes"); - System.out.println("SeaweedFS file size: " + seaweedBytes.length + " bytes"); - - // 3. Compare byte-by-byte - if (localBytes.length != seaweedBytes.length) { - System.out.println("\nāŒ SIZE MISMATCH!"); - System.out.println("Difference: " + Math.abs(localBytes.length - seaweedBytes.length) + " bytes"); - } else { - System.out.println("\nāœ… Sizes match!"); - } - - // Find first difference - int firstDiff = -1; - int minLen = Math.min(localBytes.length, seaweedBytes.length); - for (int i = 0; i < minLen; i++) { - if (localBytes[i] != seaweedBytes[i]) { - firstDiff = i; - break; - } - } - - if (firstDiff >= 0) { - System.out.println("\nāŒ CONTENT DIFFERS at byte offset: " + firstDiff); - System.out.println("Context (20 bytes before and after):"); - printByteContext(localBytes, seaweedBytes, firstDiff, 20); - } else if (localBytes.length == seaweedBytes.length) { - System.out.println("\nāœ… Files are IDENTICAL!"); - } - - // 4. 
Parse Parquet metadata from both - System.out.println("\n=== Parquet Metadata Comparison ===\n"); - - ParquetMetadata localMeta = readParquetMetadata(localFs, localPath); - ParquetMetadata seaweedMeta = readParquetMetadata(seaweedFs, seaweedPath); - - System.out.println("Local metadata:"); - printParquetMetadata(localMeta); - - System.out.println("\nSeaweedFS metadata:"); - printParquetMetadata(seaweedMeta); - - // 5. Try reading both files with Parquet reader - System.out.println("\n=== Reading Files with ParquetFileReader ===\n"); - - try { - System.out.println("Reading local file..."); - int localRows = countParquetRows(localFs, localPath); - System.out.println("āœ… Local file: " + localRows + " rows read successfully"); - } catch (Exception e) { - System.out.println("āŒ Local file read failed: " + e.getMessage()); - e.printStackTrace(); - } - - try { - System.out.println("\nReading SeaweedFS file..."); - int seaweedRows = countParquetRows(seaweedFs, seaweedPath); - System.out.println("āœ… SeaweedFS file: " + seaweedRows + " rows read successfully"); - } catch (Exception e) { - System.out.println("āŒ SeaweedFS file read failed: " + e.getMessage()); - System.out.println("Error type: " + e.getClass().getName()); - if (e.getMessage() != null && e.getMessage().contains("bytes left")) { - System.out.println("šŸŽÆ THIS IS THE 78-BYTE EOF ERROR!"); - } - e.printStackTrace(); - } - - // Cleanup - localFs.delete(localPath, false); - seaweedFs.delete(seaweedPath, false); - - System.out.println("\n=== Test Complete ===\n"); - } - - private void writeTestParquetFile(FileSystem fs, Path path) throws IOException { - Configuration conf = fs.getConf(); - GroupWriteSupport.setSchema(SCHEMA, conf); - - try (ParquetWriter writer = org.apache.parquet.hadoop.example.ExampleParquetWriter.builder(path) - .withConf(conf) - .withWriteMode(org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE) - .build()) { - - SimpleGroupFactory factory = new SimpleGroupFactory(SCHEMA); - - // Write 3 rows (same as Spark test) - Group group1 = factory.newGroup() - .append("id", 1) - .append("name", "Alice") - .append("age", 30); - writer.write(group1); - - Group group2 = factory.newGroup() - .append("id", 2) - .append("name", "Bob") - .append("age", 25); - writer.write(group2); - - Group group3 = factory.newGroup() - .append("id", 3) - .append("name", "Charlie") - .append("age", 35); - writer.write(group3); - } - } - - private byte[] readAllBytes(FileSystem fs, Path path) throws IOException { - try (FSDataInputStream in = fs.open(path)) { - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - byte[] chunk = new byte[8192]; - int bytesRead; - while ((bytesRead = in.read(chunk)) != -1) { - buffer.write(chunk, 0, bytesRead); - } - return buffer.toByteArray(); - } - } - - private ParquetMetadata readParquetMetadata(FileSystem fs, Path path) throws IOException { - Configuration conf = fs.getConf(); - try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) { - return reader.getFooter(); - } - } - - private void printParquetMetadata(ParquetMetadata meta) { - System.out.println(" Blocks: " + meta.getBlocks().size()); - meta.getBlocks().forEach(block -> { - System.out.println(" Block rows: " + block.getRowCount()); - System.out.println(" Block total size: " + block.getTotalByteSize()); - System.out.println(" Block compressed size: " + block.getCompressedSize()); - System.out.println(" Columns: " + block.getColumns().size()); - block.getColumns().forEach(col -> { - 
System.out.println(" Column: " + col.getPath()); - System.out.println(" Starting pos: " + col.getStartingPos()); - System.out.println(" Total size: " + col.getTotalSize()); - System.out.println(" Total uncompressed: " + col.getTotalUncompressedSize()); - }); - }); - } - - private int countParquetRows(FileSystem fs, Path path) throws IOException { - Configuration conf = fs.getConf(); - int rowCount = 0; - try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) { - org.apache.parquet.column.page.PageReadStore pages; - while ((pages = reader.readNextRowGroup()) != null) { - rowCount += pages.getRowCount(); - } - } - return rowCount; - } - - private void printByteContext(byte[] local, byte[] seaweed, int offset, int context) { - int start = Math.max(0, offset - context); - int endLocal = Math.min(local.length, offset + context); - int endSeaweed = Math.min(seaweed.length, offset + context); - - System.out.println("\nLocal bytes [" + start + " to " + endLocal + "]:"); - printHexDump(local, start, endLocal, offset); - - System.out.println("\nSeaweedFS bytes [" + start + " to " + endSeaweed + "]:"); - printHexDump(seaweed, start, endSeaweed, offset); - } - - private void printHexDump(byte[] bytes, int start, int end, int highlight) { - StringBuilder hex = new StringBuilder(); - StringBuilder ascii = new StringBuilder(); - - for (int i = start; i < end; i++) { - if (i > start && i % 16 == 0) { - System.out.printf("%04x: %-48s %s\n", i - 16, hex.toString(), ascii.toString()); - hex.setLength(0); - ascii.setLength(0); - } - - byte b = bytes[i]; - String hexStr = String.format("%02x ", b & 0xFF); - if (i == highlight) { - hexStr = "[" + hexStr.trim() + "] "; - } - hex.append(hexStr); - - char c = (b >= 32 && b < 127) ? (char) b : '.'; - if (i == highlight) { - ascii.append('[').append(c).append(']'); - } else { - ascii.append(c); - } - } - - if (hex.length() > 0) { - System.out.printf("%04x: %-48s %s\n", (end / 16) * 16, hex.toString(), ascii.toString()); - } - } -} - diff --git a/test/java/spark/src/test/java/seaweed/spark/ShadowVsLocalOnlyComparisonTest.java b/test/java/spark/src/test/java/seaweed/spark/ShadowVsLocalOnlyComparisonTest.java deleted file mode 100644 index 966abbf61..000000000 --- a/test/java/spark/src/test/java/seaweed/spark/ShadowVsLocalOnlyComparisonTest.java +++ /dev/null @@ -1,214 +0,0 @@ -package seaweed.spark; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SaveMode; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.util.Arrays; -import java.util.List; - -import static org.junit.Assert.*; - -/** - * CRITICAL TEST: Compare shadow file (reference) with LOCAL_ONLY mode output. - * - * This test: - * 1. Writes with SHADOW mode enabled → produces reference file - * 2. Writes with LOCAL_ONLY mode → produces local-only file - * 3. Compares the two files byte-by-byte - * 4. 
Attempts to read both with Spark SQL - */ -public class ShadowVsLocalOnlyComparisonTest extends SparkTestBase { - - private String shadowDir; - private String localOnlyDir; - - @Before - public void setUp() throws Exception { - super.setUpSpark(); - shadowDir = "/workspace/target/shadow-comparison"; - localOnlyDir = "/workspace/target/local-only-comparison"; - - // Clean up previous runs - deleteDirectory(new File(shadowDir)); - deleteDirectory(new File(localOnlyDir)); - - new File(shadowDir).mkdirs(); - new File(localOnlyDir).mkdirs(); - } - - @After - public void tearDown() throws Exception { - super.tearDownSpark(); - } - - @Test - public void testShadowVsLocalOnlyComparison() throws IOException { - skipIfTestsDisabled(); - - System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); - System.out.println("ā•‘ CRITICAL: Shadow vs LOCAL_ONLY Comparison ā•‘"); - System.out.println("ā•šā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•"); - - List employees = Arrays.asList( - new Employee(1, "Alice", "Engineering", 100000), - new Employee(2, "Bob", "Sales", 80000), - new Employee(3, "Charlie", "Engineering", 120000), - new Employee(4, "David", "Sales", 75000)); - - Dataset df = spark.createDataFrame(employees, Employee.class); - - // PHASE 1: Write with SHADOW mode - System.out.println("\n=== PHASE 1: Write with SHADOW mode (creates reference) ==="); - System.setProperty("SEAWEEDFS_SHADOW_MODE", "true"); - System.setProperty("SEAWEEDFS_DEBUG_MODE", "SEAWEED_ONLY"); - spark.conf().set("fs.seaweedfs.shadow.dir", shadowDir); - - String shadowOutputPath = "seaweedfs://seaweedfs-filer:8888/test-spark/shadow-test/employees"; - df.write().mode(SaveMode.Overwrite).parquet(shadowOutputPath); - - File[] shadowFiles = new File(shadowDir).listFiles((dir, name) -> name.endsWith(".shadow")); - assertNotNull("Shadow files should exist", shadowFiles); - assertTrue("Should have at least one shadow file", shadowFiles.length > 0); - File shadowFile = shadowFiles[0]; - System.out.println("Shadow file: " + shadowFile.getName() + " (" + shadowFile.length() + " bytes)"); - - // PHASE 2: Write with LOCAL_ONLY mode - System.out.println("\n=== PHASE 2: Write with LOCAL_ONLY mode ==="); - System.setProperty("SEAWEEDFS_SHADOW_MODE", "false"); - System.setProperty("SEAWEEDFS_DEBUG_MODE", "LOCAL_ONLY"); - spark.conf().set("fs.seaweedfs.debug.dir", localOnlyDir); - - String localOnlyOutputPath = "seaweedfs://seaweedfs-filer:8888/test-spark/local-only-test/employees"; - df.write().mode(SaveMode.Overwrite).parquet(localOnlyOutputPath); - - File[] localOnlyFiles = new File(localOnlyDir).listFiles((dir, name) -> name.endsWith(".debug")); - assertNotNull("LOCAL_ONLY files should exist", localOnlyFiles); - assertTrue("Should have at least one LOCAL_ONLY file", localOnlyFiles.length > 0); - File localOnlyFile = localOnlyFiles[0]; - System.out.println("LOCAL_ONLY file: " + localOnlyFile.getName() + " (" + localOnlyFile.length() + " bytes)"); - - // PHASE 3: Compare files byte-by-byte - System.out.println("\n=== PHASE 3: Compare files byte-by-byte ==="); - assertEquals("File sizes should match", shadowFile.length(), localOnlyFile.length()); - - byte[] shadowBytes = Files.readAllBytes(shadowFile.toPath()); - byte[] localOnlyBytes = Files.readAllBytes(localOnlyFile.toPath()); - - System.out.println("Comparing " + shadowBytes.length + " bytes..."); - - // Compare 
byte-by-byte and report first difference - boolean identical = true; - for (int i = 0; i < shadowBytes.length; i++) { - if (shadowBytes[i] != localOnlyBytes[i]) { - identical = false; - System.err.println("āŒ FIRST DIFFERENCE at byte " + i + ":"); - System.err.println(" Shadow: 0x" + String.format("%02x", shadowBytes[i] & 0xFF)); - System.err.println(" LOCAL_ONLY: 0x" + String.format("%02x", localOnlyBytes[i] & 0xFF)); - - // Show context - int contextStart = Math.max(0, i - 10); - int contextEnd = Math.min(shadowBytes.length, i + 10); - System.err.println(" Context (shadow):"); - for (int j = contextStart; j < contextEnd; j++) { - System.err.print(String.format("%02x ", shadowBytes[j] & 0xFF)); - } - System.err.println(); - System.err.println(" Context (local_only):"); - for (int j = contextStart; j < contextEnd; j++) { - System.err.print(String.format("%02x ", localOnlyBytes[j] & 0xFF)); - } - System.err.println(); - break; - } - } - - if (identical) { - System.out.println("āœ… Files are IDENTICAL!"); - } else { - fail("Files are NOT identical"); - } - - // PHASE 4: Try reading shadow file with Spark - System.out.println("\n=== PHASE 4: Try reading shadow file with Spark ==="); - try { - // Copy shadow file to a location Spark can read - String testPath = "file://" + shadowDir + "/test.parquet"; - Files.copy(shadowFile.toPath(), new File(shadowDir + "/test.parquet").toPath()); - - Dataset shadowDf = spark.read().parquet(testPath); - shadowDf.createOrReplaceTempView("shadow_test"); - Dataset shadowResult = spark.sql("SELECT * FROM shadow_test WHERE department = 'Engineering'"); - System.out.println("āœ… Shadow file SQL query: " + shadowResult.count() + " rows"); - } catch (Exception e) { - System.err.println("āŒ Shadow file SQL query FAILED: " + e.getMessage()); - e.printStackTrace(); - } - - // PHASE 5: Try reading LOCAL_ONLY file with Spark - System.out.println("\n=== PHASE 5: Try reading LOCAL_ONLY file with Spark ==="); - try { - Dataset localOnlyDf = spark.read().parquet(localOnlyOutputPath); - localOnlyDf.createOrReplaceTempView("local_only_test"); - Dataset localOnlyResult = spark.sql("SELECT * FROM local_only_test WHERE department = 'Engineering'"); - System.out.println("āœ… LOCAL_ONLY SQL query: " + localOnlyResult.count() + " rows"); - } catch (Exception e) { - System.err.println("āŒ LOCAL_ONLY SQL query FAILED: " + e.getMessage()); - assertTrue("Expected 78-byte EOF error", e.getMessage().contains("78 bytes left")); - } - - System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); - System.out.println("ā•‘ Comparison complete. See logs for details. 
ā•‘"); - System.out.println("ā•šā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•"); - } - - private void deleteDirectory(File dir) { - if (dir.exists()) { - File[] files = dir.listFiles(); - if (files != null) { - for (File file : files) { - if (file.isDirectory()) { - deleteDirectory(file); - } else { - file.delete(); - } - } - } - dir.delete(); - } - } - - public static class Employee implements java.io.Serializable { - private int id; - private String name; - private String department; - private int salary; - - public Employee() {} - - public Employee(int id, String name, String department, int salary) { - this.id = id; - this.name = name; - this.department = department; - this.salary = salary; - } - - public int getId() { return id; } - public void setId(int id) { this.id = id; } - public String getName() { return name; } - public void setName(String name) { this.name = name; } - public String getDepartment() { return department; } - public void setDepartment(String department) { this.department = department; } - public int getSalary() { return salary; } - public void setSalary(int salary) { this.salary = salary; } - } -} - diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkSQLReadDifferenceTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkSQLReadDifferenceTest.java deleted file mode 100644 index f9cc58f38..000000000 --- a/test/java/spark/src/test/java/seaweed/spark/SparkSQLReadDifferenceTest.java +++ /dev/null @@ -1,264 +0,0 @@ -package seaweed.spark; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RawLocalFileSystem; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SaveMode; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.util.Arrays; -import java.util.List; - -import static org.junit.Assert.*; - -/** - * CRITICAL DIAGNOSTIC TEST: Compare the exact sequence of FileSystem operations - * between RawLocalFS (works) and LOCAL_ONLY (fails) during SQL query execution. - * - * This test will help us understand what's different about how Spark SQL - * interacts with SeaweedFileSystem vs RawLocalFileSystem. 
- */ -public class SparkSQLReadDifferenceTest extends SparkTestBase { - - private String rawLocalDir; - private String localOnlyDir; - private FileSystem rawLocalFs; - - @Before - public void setUp() throws Exception { - // Enable detailed logging - System.setProperty("seaweedfs.detailed.logging", "true"); - super.setUpSpark(); - - // Set up RawLocalFileSystem directory - rawLocalDir = "/tmp/spark-sql-diff-rawlocal-" + System.currentTimeMillis(); - new File(rawLocalDir).mkdirs(); - - Configuration conf = spark.sparkContext().hadoopConfiguration(); - rawLocalFs = new RawLocalFileSystem(); - rawLocalFs.initialize(new URI("file:///"), conf); - rawLocalFs.delete(new Path(rawLocalDir), true); - rawLocalFs.mkdirs(new Path(rawLocalDir)); - - // Set up LOCAL_ONLY directory - localOnlyDir = "/workspace/target/debug-sql-diff"; - new File(localOnlyDir).mkdirs(); - for (File f : new File(localOnlyDir).listFiles()) { - f.delete(); - } - - System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); - System.out.println("ā•‘ SQL READ DIFFERENCE TEST: RawLocalFS vs LOCAL_ONLY ā•‘"); - System.out.println("ā•šā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•"); - } - - @After - public void tearDown() throws Exception { - if (rawLocalFs != null) { - rawLocalFs.delete(new Path(rawLocalDir), true); - rawLocalFs.close(); - } - super.tearDownSpark(); - } - - @Test - public void testSQLReadDifference() throws IOException { - // Create test data - List employees = Arrays.asList( - new Employee(1, "Alice", "Engineering", 100000), - new Employee(2, "Bob", "Sales", 80000), - new Employee(3, "Charlie", "Engineering", 120000), - new Employee(4, "David", "Sales", 75000)); - - Dataset df = spark.createDataFrame(employees, Employee.class); - - // ======================================================================== - // PART 1: RawLocalFS - SQL Query (WORKS) - // ======================================================================== - System.out.println("\n" + "=".repeat(70)); - System.out.println("PART 1: RawLocalFS - SQL Query (Expected to WORK)"); - System.out.println("=".repeat(70)); - - String rawLocalPath = "file://" + rawLocalDir + "/employees"; - System.out.println("Writing to: " + rawLocalPath); - df.write().mode(SaveMode.Overwrite).parquet(rawLocalPath); - System.out.println("āœ… Write completed\n"); - - System.out.println("--- Executing SQL Query on RawLocalFS ---"); - try { - Dataset rawDf = spark.read().parquet(rawLocalPath); - System.out.println("āœ… Initial read successful"); - - rawDf.createOrReplaceTempView("employees_raw"); - System.out.println("āœ… Temp view created"); - - System.out.println("\nExecuting: SELECT name, salary FROM employees_raw WHERE department = 'Engineering'"); - Dataset rawResult = spark.sql("SELECT name, salary FROM employees_raw WHERE department = 'Engineering'"); - - System.out.println("Triggering execution with count()..."); - long rawCount = rawResult.count(); - - System.out.println("āœ… RawLocalFS SQL query SUCCESSFUL! 
Row count: " + rawCount); - assertEquals("Should have 2 engineering employees", 2, rawCount); - - System.out.println("\nāœ…āœ…āœ… RawLocalFS: ALL OPERATIONS SUCCESSFUL āœ…āœ…āœ…\n"); - } catch (Exception e) { - System.err.println("āŒ RawLocalFS SQL query FAILED (unexpected!): " + e.getMessage()); - e.printStackTrace(); - fail("RawLocalFS should not fail!"); - } - - // ======================================================================== - // PART 2: LOCAL_ONLY - SQL Query (FAILS) - // ======================================================================== - System.out.println("\n" + "=".repeat(70)); - System.out.println("PART 2: LOCAL_ONLY - SQL Query (Expected to FAIL with 78-byte error)"); - System.out.println("=".repeat(70)); - - // Enable LOCAL_ONLY mode - System.setProperty("SEAWEEDFS_DEBUG_MODE", "LOCAL_ONLY"); - spark.sparkContext().hadoopConfiguration().set("fs.seaweedfs.debug.dir", localOnlyDir); - - String localOnlyPath = getTestPath("employees_localonly"); - System.out.println("Writing to: " + localOnlyPath); - df.write().mode(SaveMode.Overwrite).parquet(localOnlyPath); - System.out.println("āœ… Write completed\n"); - - System.out.println("--- Executing SQL Query on LOCAL_ONLY ---"); - try { - Dataset localDf = spark.read().parquet(localOnlyPath); - System.out.println("āœ… Initial read successful"); - - localDf.createOrReplaceTempView("employees_local"); - System.out.println("āœ… Temp view created"); - - System.out.println("\nExecuting: SELECT name, salary FROM employees_local WHERE department = 'Engineering'"); - Dataset localResult = spark.sql("SELECT name, salary FROM employees_local WHERE department = 'Engineering'"); - - System.out.println("Triggering execution with count()..."); - long localCount = localResult.count(); - - System.out.println("āœ… LOCAL_ONLY SQL query SUCCESSFUL! Row count: " + localCount); - assertEquals("Should have 2 engineering employees", 2, localCount); - - System.out.println("\nāœ…āœ…āœ… LOCAL_ONLY: ALL OPERATIONS SUCCESSFUL āœ…āœ…āœ…\n"); - } catch (Exception e) { - System.err.println("\nāŒāŒāŒ LOCAL_ONLY SQL query FAILED āŒāŒāŒ"); - System.err.println("Error: " + e.getMessage()); - - if (e.getMessage() != null && e.getMessage().contains("78 bytes")) { - System.err.println("\nšŸ” CONFIRMED: 78-byte EOF error!"); - System.err.println("This error occurs during SQL query execution on LOCAL_ONLY mode."); - } - - System.err.println("\nFull stack trace:"); - e.printStackTrace(); - - System.err.println("\n" + "=".repeat(70)); - System.err.println("ANALYSIS: Comparing RawLocalFS (works) vs LOCAL_ONLY (fails)"); - System.err.println("=".repeat(70)); - System.err.println(); - System.err.println("Both tests:"); - System.err.println(" - Write identical data (same DataFrame)"); - System.err.println(" - Execute identical SQL query"); - System.err.println(" - Use identical Spark configuration"); - System.err.println(); - System.err.println("Key differences:"); - System.err.println(" 1. Path scheme:"); - System.err.println(" - RawLocalFS: file:///tmp/..."); - System.err.println(" - LOCAL_ONLY: seaweedfs://seaweedfs-filer:8888/..."); - System.err.println(); - System.err.println(" 2. FileSystem implementation:"); - System.err.println(" - RawLocalFS: Hadoop's native RawLocalFileSystem"); - System.err.println(" - LOCAL_ONLY: SeaweedFileSystem (but writes to local disk)"); - System.err.println(); - System.err.println(" 3. 
InputStream type:"); - System.err.println(" - RawLocalFS: LocalFSFileInputStream"); - System.err.println(" - LOCAL_ONLY: SeaweedHadoopInputStream -> LocalOnlyInputStream"); - System.err.println(); - System.err.println("The 78-byte error suggests that:"); - System.err.println(" - Spark SQL expects to read 78 more bytes"); - System.err.println(" - But the InputStream reports EOF"); - System.err.println(" - This happens even though the file is correct (1260 bytes)"); - System.err.println(); - System.err.println("Possible causes:"); - System.err.println(" 1. getFileStatus() returns wrong file size"); - System.err.println(" 2. InputStream.available() returns wrong value"); - System.err.println(" 3. Seek operations don't work correctly"); - System.err.println(" 4. Multiple InputStreams interfere with each other"); - System.err.println(" 5. Metadata is cached incorrectly between operations"); - System.err.println(); - - // Don't fail the test - we want to see the full output - // fail("LOCAL_ONLY failed as expected"); - } - - // ======================================================================== - // PART 3: Compare Files - // ======================================================================== - System.out.println("\n" + "=".repeat(70)); - System.out.println("PART 3: File Comparison"); - System.out.println("=".repeat(70)); - - File rawLocalParquetDir = new File(rawLocalDir + "/employees"); - File[] rawLocalFiles = rawLocalParquetDir.listFiles((dir, name) -> name.endsWith(".parquet")); - - File[] localOnlyFiles = new File(localOnlyDir).listFiles((dir, name) -> name.endsWith(".parquet.debug")); - - if (rawLocalFiles != null && rawLocalFiles.length > 0 && - localOnlyFiles != null && localOnlyFiles.length > 0) { - - File rawFile = rawLocalFiles[0]; - File localFile = localOnlyFiles[0]; - - System.out.println("\nRawLocalFS file: " + rawFile.getName() + " (" + rawFile.length() + " bytes)"); - System.out.println("LOCAL_ONLY file: " + localFile.getName() + " (" + localFile.length() + " bytes)"); - - if (rawFile.length() == localFile.length()) { - System.out.println("āœ… File sizes match!"); - } else { - System.out.println("āŒ File size mismatch: " + (rawFile.length() - localFile.length()) + " bytes"); - } - } - - System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); - System.out.println("ā•‘ TEST COMPLETE - Check logs above for differences ā•‘"); - System.out.println("ā•šā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•"); - } - - // Employee class for Spark DataFrame - public static class Employee implements java.io.Serializable { - private int id; - private String name; - private String department; - private int salary; - - public Employee() {} // Required for Spark - - public Employee(int id, String name, String department, int salary) { - this.id = id; - this.name = name; - this.department = department; - this.salary = salary; - } - - // Getters and Setters (required for Spark) - public int getId() { return id; } - public void setId(int id) { this.id = id; } - public String getName() { return name; } - public void setName(String name) { this.name = name; } - public String getDepartment() { return department; } - public void setDepartment(String department) { this.department = department; } - public int getSalary() { return salary; } - public void setSalary(int salary) { this.salary = salary; } - } -} - diff --git 
a/test/java/spark/src/test/java/seaweed/spark/SparkShadowComparisonTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkShadowComparisonTest.java deleted file mode 100644 index ddc645abc..000000000 --- a/test/java/spark/src/test/java/seaweed/spark/SparkShadowComparisonTest.java +++ /dev/null @@ -1,306 +0,0 @@ -package seaweed.spark; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RawLocalFileSystem; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SaveMode; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.nio.file.Files; -import java.util.Arrays; -import java.util.List; - -import static org.junit.Assert.*; - -/** - * CRITICAL COMPARISON TEST: Use RawLocalFileSystem as a "shadow" to compare - * all I/O operations with LOCAL_ONLY mode. - * - * This test writes the same data to both: - * 1. RawLocalFileSystem (file://) - Known to work - * 2. SeaweedFS LOCAL_ONLY mode (seaweedfs://) - Has 78-byte error - * - * Then compares the resulting files byte-by-byte to find the exact difference. - */ -public class SparkShadowComparisonTest extends SparkTestBase { - - private String rawLocalDir; - private String localOnlyDir; - private FileSystem rawLocalFs; - - @Before - public void setUp() throws Exception { - super.setUpSpark(); - - // Set up RawLocalFileSystem directory - rawLocalDir = "/tmp/spark-shadow-rawlocal-" + System.currentTimeMillis(); - new File(rawLocalDir).mkdirs(); - - Configuration conf = spark.sparkContext().hadoopConfiguration(); - rawLocalFs = new RawLocalFileSystem(); - rawLocalFs.initialize(new URI("file:///"), conf); - rawLocalFs.delete(new Path(rawLocalDir), true); - rawLocalFs.mkdirs(new Path(rawLocalDir)); - - // Set up LOCAL_ONLY directory (will be in debug dir) - localOnlyDir = "/workspace/target/debug-shadow"; - new File(localOnlyDir).mkdirs(); - - // Clean up previous runs - for (File f : new File(localOnlyDir).listFiles()) { - f.delete(); - } - - System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); - System.out.println("ā•‘ SHADOW COMPARISON: RawLocalFS vs LOCAL_ONLY ā•‘"); - System.out.println("ā•šā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•"); - System.out.println("RawLocalFS directory: " + rawLocalDir); - System.out.println("LOCAL_ONLY directory: " + localOnlyDir); - } - - @After - public void tearDown() throws Exception { - if (rawLocalFs != null) { - rawLocalFs.delete(new Path(rawLocalDir), true); - rawLocalFs.close(); - } - super.tearDownSpark(); - } - - @Test - public void testShadowComparison() throws IOException { - System.out.println("\n=== PHASE 1: Write to RawLocalFileSystem ==="); - - // Create test data - List employees = Arrays.asList( - new Employee(1, "Alice", "Engineering", 100000), - new Employee(2, "Bob", "Sales", 80000), - new Employee(3, "Charlie", "Engineering", 120000), - new Employee(4, "David", "Sales", 75000)); - - Dataset df = spark.createDataFrame(employees, Employee.class); - - // Write to RawLocalFileSystem - String rawLocalPath = "file://" + rawLocalDir + "/employees"; - System.out.println("Writing to RawLocalFS: " + rawLocalPath); - - try { - 
df.write().mode(SaveMode.Overwrite).parquet(rawLocalPath); - System.out.println("āœ… RawLocalFS write completed successfully!"); - } catch (Exception e) { - System.err.println("āŒ RawLocalFS write FAILED: " + e.getMessage()); - e.printStackTrace(); - fail("RawLocalFS write should not fail!"); - } - - // List files written by RawLocalFS - File rawLocalParquetDir = new File(rawLocalDir + "/employees"); - File[] rawLocalFiles = rawLocalParquetDir.listFiles((dir, name) -> name.endsWith(".parquet")); - assertNotNull("RawLocalFS should have written files", rawLocalFiles); - assertTrue("RawLocalFS should have at least one parquet file", rawLocalFiles.length > 0); - - System.out.println("RawLocalFS wrote " + rawLocalFiles.length + " parquet file(s):"); - for (File f : rawLocalFiles) { - System.out.println(" - " + f.getName() + " (" + f.length() + " bytes)"); - } - - System.out.println("\n=== PHASE 2: Write to LOCAL_ONLY mode ==="); - - // Set environment for LOCAL_ONLY mode - System.setProperty("SEAWEEDFS_DEBUG_MODE", "LOCAL_ONLY"); - spark.sparkContext().hadoopConfiguration().set("fs.seaweedfs.debug.dir", localOnlyDir); - - // Write to LOCAL_ONLY - String localOnlyPath = getTestPath("employees_localonly"); - System.out.println("Writing to LOCAL_ONLY: " + localOnlyPath); - - boolean localOnlyWriteSucceeded = false; - try { - df.write().mode(SaveMode.Overwrite).parquet(localOnlyPath); - System.out.println("āœ… LOCAL_ONLY write completed successfully!"); - localOnlyWriteSucceeded = true; - } catch (Exception e) { - System.err.println("āš ļø LOCAL_ONLY write completed but may have issues: " + e.getMessage()); - // Don't fail here - we want to compare files even if write "succeeded" - } - - // List files written by LOCAL_ONLY - File[] localOnlyFiles = new File(localOnlyDir).listFiles((dir, name) -> name.endsWith(".debug")); - if (localOnlyFiles == null || localOnlyFiles.length == 0) { - System.err.println("āŒ LOCAL_ONLY did not write any .debug files!"); - fail("LOCAL_ONLY should have written .debug files"); - } - - System.out.println("LOCAL_ONLY wrote " + localOnlyFiles.length + " .debug file(s):"); - for (File f : localOnlyFiles) { - System.out.println(" - " + f.getName() + " (" + f.length() + " bytes)"); - } - - System.out.println("\n=== PHASE 3: Compare Files Byte-by-Byte ==="); - - // Match files by pattern (both should have part-00000-*.snappy.parquet) - File rawFile = rawLocalFiles[0]; // Should only be one file - File localOnlyFile = null; - - // Find the .debug file that looks like a parquet file - for (File f : localOnlyFiles) { - if (f.getName().contains("part-") && f.getName().endsWith(".parquet.debug")) { - localOnlyFile = f; - break; - } - } - - if (localOnlyFile == null) { - System.out.println("āŒ Could not find LOCAL_ONLY parquet file!"); - System.out.println("Available .debug files:"); - for (File f : localOnlyFiles) { - System.out.println(" - " + f.getName()); - } - fail("LOCAL_ONLY should have written a parquet .debug file"); - } - - System.out.println("\nComparing:"); - System.out.println(" RawLocalFS: " + rawFile.getName() + " (" + rawFile.length() + " bytes)"); - System.out.println(" LOCAL_ONLY: " + localOnlyFile.getName() + " (" + localOnlyFile.length() + " bytes)"); - - // Compare file sizes - long sizeDiff = rawFile.length() - localOnlyFile.length(); - if (sizeDiff != 0) { - System.out.println(" āš ļø SIZE DIFFERENCE: " + sizeDiff + " bytes"); - System.out.println(" RawLocalFS is " + (sizeDiff > 0 ? 
"LARGER" : "SMALLER") + " by " + Math.abs(sizeDiff) + " bytes"); - - if (Math.abs(sizeDiff) == 78) { - System.out.println(" šŸ” THIS IS THE 78-BYTE DIFFERENCE!"); - } - } else { - System.out.println(" āœ… File sizes match!"); - } - - // Compare file contents byte-by-byte - byte[] rawBytes = Files.readAllBytes(rawFile.toPath()); - byte[] localOnlyBytes = Files.readAllBytes(localOnlyFile.toPath()); - - int minLen = Math.min(rawBytes.length, localOnlyBytes.length); - int firstDiffIndex = -1; - - for (int i = 0; i < minLen; i++) { - if (rawBytes[i] != localOnlyBytes[i]) { - firstDiffIndex = i; - break; - } - } - - if (firstDiffIndex >= 0) { - System.out.println(" āš ļø CONTENT DIFFERS at byte offset: " + firstDiffIndex); - System.out.println(" Showing 32 bytes around difference:"); - - int start = Math.max(0, firstDiffIndex - 16); - int end = Math.min(minLen, firstDiffIndex + 16); - - System.out.print(" RawLocalFS: "); - for (int i = start; i < end; i++) { - System.out.printf("%02X ", rawBytes[i]); - if (i == firstDiffIndex) System.out.print("| "); - } - System.out.println(); - - System.out.print(" LOCAL_ONLY: "); - for (int i = start; i < end; i++) { - System.out.printf("%02X ", localOnlyBytes[i]); - if (i == firstDiffIndex) System.out.print("| "); - } - System.out.println(); - } else if (rawBytes.length == localOnlyBytes.length) { - System.out.println(" āœ… File contents are IDENTICAL!"); - } else { - System.out.println(" āš ļø Files match up to " + minLen + " bytes, but differ in length"); - - // Show the extra bytes - if (rawBytes.length > localOnlyBytes.length) { - System.out.println(" RawLocalFS has " + (rawBytes.length - minLen) + " extra bytes at end:"); - System.out.print(" "); - for (int i = minLen; i < Math.min(rawBytes.length, minLen + 32); i++) { - System.out.printf("%02X ", rawBytes[i]); - } - System.out.println(); - } else { - System.out.println(" LOCAL_ONLY has " + (localOnlyBytes.length - minLen) + " extra bytes at end:"); - System.out.print(" "); - for (int i = minLen; i < Math.min(localOnlyBytes.length, minLen + 32); i++) { - System.out.printf("%02X ", localOnlyBytes[i]); - } - System.out.println(); - } - } - - System.out.println("\n=== PHASE 4: Try Reading Both Files ==="); - - // Try reading RawLocalFS file - System.out.println("\nReading from RawLocalFS:"); - try { - Dataset rawDf = spark.read().parquet(rawLocalPath); - long rawCount = rawDf.count(); - System.out.println("āœ… RawLocalFS read successful! Row count: " + rawCount); - assertEquals("Should have 4 employees", 4, rawCount); - } catch (Exception e) { - System.err.println("āŒ RawLocalFS read FAILED: " + e.getMessage()); - e.printStackTrace(); - fail("RawLocalFS read should not fail!"); - } - - // Try reading LOCAL_ONLY file - System.out.println("\nReading from LOCAL_ONLY:"); - try { - Dataset localOnlyDf = spark.read().parquet(localOnlyPath); - long localOnlyCount = localOnlyDf.count(); - System.out.println("āœ… LOCAL_ONLY read successful! 
Row count: " + localOnlyCount); - assertEquals("Should have 4 employees", 4, localOnlyCount); - } catch (Exception e) { - System.err.println("āŒ LOCAL_ONLY read FAILED: " + e.getMessage()); - if (e.getMessage() != null && e.getMessage().contains("78 bytes")) { - System.err.println("šŸ” CONFIRMED: 78-byte error occurs during READ, not WRITE!"); - } - // Don't fail - we expect this to fail - } - - System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); - System.out.println("ā•‘ SHADOW COMPARISON COMPLETE ā•‘"); - System.out.println("ā•šā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•"); - } - - // Employee class for Spark DataFrame - public static class Employee implements java.io.Serializable { - private int id; - private String name; - private String department; - private int salary; - - public Employee() {} // Required for Spark - - public Employee(int id, String name, String department, int salary) { - this.id = id; - this.name = name; - this.department = department; - this.salary = salary; - } - - // Getters and Setters (required for Spark) - public int getId() { return id; } - public void setId(int id) { this.id = id; } - public String getName() { return name; } - public void setName(String name) { this.name = name; } - public String getDepartment() { return department; } - public void setDepartment(String department) { this.department = department; } - public int getSalary() { return salary; } - public void setSalary(int salary) { this.salary = salary; } - } -} - diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkShadowReadComparisonTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkShadowReadComparisonTest.java deleted file mode 100644 index 99cdaaa81..000000000 --- a/test/java/spark/src/test/java/seaweed/spark/SparkShadowReadComparisonTest.java +++ /dev/null @@ -1,343 +0,0 @@ -package seaweed.spark; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RawLocalFileSystem; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SaveMode; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; - -import static org.junit.Assert.*; - -/** - * CRITICAL READ COMPARISON TEST: Compare all read operations between RawLocalFileSystem - * and SeaweedFS LOCAL_ONLY mode. - * - * This test: - * 1. Writes identical data to both RawLocalFS and LOCAL_ONLY - * 2. Performs the same read operations on both - * 3. Compares the results of each read operation - * 4. 
Identifies where the divergence happens - */ -public class SparkShadowReadComparisonTest extends SparkTestBase { - - private String rawLocalDir; - private String localOnlyDir; - private FileSystem rawLocalFs; - private FileSystem seaweedFs; - private String rawLocalParquetFile; - private String localOnlyParquetFile; - - @Before - public void setUp() throws Exception { - super.setUpSpark(); - - // Set up RawLocalFileSystem directory - rawLocalDir = "/tmp/spark-shadow-read-rawlocal-" + System.currentTimeMillis(); - new File(rawLocalDir).mkdirs(); - - Configuration conf = spark.sparkContext().hadoopConfiguration(); - rawLocalFs = new RawLocalFileSystem(); - rawLocalFs.initialize(new URI("file:///"), conf); - rawLocalFs.delete(new Path(rawLocalDir), true); - rawLocalFs.mkdirs(new Path(rawLocalDir)); - - // Set up LOCAL_ONLY directory - localOnlyDir = "/workspace/target/debug-shadow-read"; - new File(localOnlyDir).mkdirs(); - for (File f : new File(localOnlyDir).listFiles()) { - f.delete(); - } - - // Get SeaweedFS instance - seaweedFs = FileSystem.get(URI.create("seaweedfs://seaweedfs-filer:8888"), conf); - - System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); - System.out.println("ā•‘ SHADOW READ COMPARISON: RawLocalFS vs LOCAL_ONLY ā•‘"); - System.out.println("ā•šā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•"); - System.out.println("RawLocalFS directory: " + rawLocalDir); - System.out.println("LOCAL_ONLY directory: " + localOnlyDir); - } - - @After - public void tearDown() throws Exception { - if (rawLocalFs != null) { - rawLocalFs.delete(new Path(rawLocalDir), true); - rawLocalFs.close(); - } - super.tearDownSpark(); - } - - @Test - public void testShadowReadComparison() throws IOException { - System.out.println("\n=== PHASE 1: Write Identical Data to Both FileSystems ==="); - - // Create test data - List employees = Arrays.asList( - new Employee(1, "Alice", "Engineering", 100000), - new Employee(2, "Bob", "Sales", 80000), - new Employee(3, "Charlie", "Engineering", 120000), - new Employee(4, "David", "Sales", 75000)); - - Dataset df = spark.createDataFrame(employees, Employee.class); - - // Write to RawLocalFileSystem - String rawLocalPath = "file://" + rawLocalDir + "/employees"; - System.out.println("Writing to RawLocalFS: " + rawLocalPath); - df.write().mode(SaveMode.Overwrite).parquet(rawLocalPath); - System.out.println("āœ… RawLocalFS write completed"); - - // Set environment for LOCAL_ONLY mode - System.setProperty("SEAWEEDFS_DEBUG_MODE", "LOCAL_ONLY"); - spark.sparkContext().hadoopConfiguration().set("fs.seaweedfs.debug.dir", localOnlyDir); - - // Write to LOCAL_ONLY - String localOnlyPath = getTestPath("employees_read_test"); - System.out.println("Writing to LOCAL_ONLY: " + localOnlyPath); - df.write().mode(SaveMode.Overwrite).parquet(localOnlyPath); - System.out.println("āœ… LOCAL_ONLY write completed"); - - // Find the parquet files - File rawLocalParquetDir = new File(rawLocalDir + "/employees"); - File[] rawLocalFiles = rawLocalParquetDir.listFiles((dir, name) -> name.endsWith(".parquet")); - assertNotNull("RawLocalFS should have written files", rawLocalFiles); - assertTrue("RawLocalFS should have at least one parquet file", rawLocalFiles.length > 0); - rawLocalParquetFile = rawLocalFiles[0].getAbsolutePath(); - - File[] localOnlyFiles = new File(localOnlyDir).listFiles((dir, name) -> 
name.endsWith(".parquet.debug")); - assertNotNull("LOCAL_ONLY should have written files", localOnlyFiles); - assertTrue("LOCAL_ONLY should have at least one parquet file", localOnlyFiles.length > 0); - localOnlyParquetFile = localOnlyFiles[0].getAbsolutePath(); - - System.out.println("RawLocalFS file: " + rawLocalParquetFile); - System.out.println("LOCAL_ONLY file: " + localOnlyParquetFile); - - System.out.println("\n=== PHASE 2: Compare Low-Level Read Operations ==="); - - // Open both files for reading - FSDataInputStream rawStream = rawLocalFs.open(new Path(rawLocalParquetFile)); - - // For LOCAL_ONLY, we need to read the .debug file directly using RawLocalFS - // because it's just a local file - FSDataInputStream localOnlyStream = rawLocalFs.open(new Path(localOnlyParquetFile)); - - try { - // Test 1: Read file length - System.out.println("\n--- Test 1: File Length ---"); - long rawLength = rawLocalFs.getFileStatus(new Path(rawLocalParquetFile)).getLen(); - long localOnlyLength = rawLocalFs.getFileStatus(new Path(localOnlyParquetFile)).getLen(); - System.out.println("RawLocalFS length: " + rawLength); - System.out.println("LOCAL_ONLY length: " + localOnlyLength); - if (rawLength == localOnlyLength) { - System.out.println("āœ… Lengths match!"); - } else { - System.out.println("āŒ Length mismatch: " + (rawLength - localOnlyLength) + " bytes"); - } - assertEquals("File lengths should match", rawLength, localOnlyLength); - - // Test 2: Read first 100 bytes - System.out.println("\n--- Test 2: Read First 100 Bytes ---"); - byte[] rawBuffer1 = new byte[100]; - byte[] localOnlyBuffer1 = new byte[100]; - rawStream.readFully(0, rawBuffer1); - localOnlyStream.readFully(0, localOnlyBuffer1); - boolean firstBytesMatch = Arrays.equals(rawBuffer1, localOnlyBuffer1); - System.out.println("First 100 bytes match: " + (firstBytesMatch ? "āœ…" : "āŒ")); - if (!firstBytesMatch) { - System.out.println("First difference at byte: " + findFirstDifference(rawBuffer1, localOnlyBuffer1)); - } - assertTrue("First 100 bytes should match", firstBytesMatch); - - // Test 3: Read last 100 bytes (Parquet footer) - System.out.println("\n--- Test 3: Read Last 100 Bytes (Parquet Footer) ---"); - byte[] rawBuffer2 = new byte[100]; - byte[] localOnlyBuffer2 = new byte[100]; - rawStream.readFully(rawLength - 100, rawBuffer2); - localOnlyStream.readFully(localOnlyLength - 100, localOnlyBuffer2); - boolean lastBytesMatch = Arrays.equals(rawBuffer2, localOnlyBuffer2); - System.out.println("Last 100 bytes match: " + (lastBytesMatch ? "āœ…" : "āŒ")); - if (!lastBytesMatch) { - System.out.println("First difference at byte: " + findFirstDifference(rawBuffer2, localOnlyBuffer2)); - System.out.println("RawLocalFS last 20 bytes:"); - printHex(rawBuffer2, 80, 100); - System.out.println("LOCAL_ONLY last 20 bytes:"); - printHex(localOnlyBuffer2, 80, 100); - } - assertTrue("Last 100 bytes should match", lastBytesMatch); - - // Test 4: Read entire file - System.out.println("\n--- Test 4: Read Entire File ---"); - byte[] rawFull = new byte[(int) rawLength]; - byte[] localOnlyFull = new byte[(int) localOnlyLength]; - rawStream.readFully(0, rawFull); - localOnlyStream.readFully(0, localOnlyFull); - boolean fullMatch = Arrays.equals(rawFull, localOnlyFull); - System.out.println("Full file match: " + (fullMatch ? 
"āœ…" : "āŒ")); - if (!fullMatch) { - int firstDiff = findFirstDifference(rawFull, localOnlyFull); - System.out.println("First difference at byte: " + firstDiff); - } - assertTrue("Full file should match", fullMatch); - - // Test 5: Sequential reads - System.out.println("\n--- Test 5: Sequential Reads (10 bytes at a time) ---"); - rawStream.seek(0); - localOnlyStream.seek(0); - boolean sequentialMatch = true; - int chunkSize = 10; - int chunksRead = 0; - while (rawStream.getPos() < rawLength && localOnlyStream.getPos() < localOnlyLength) { - byte[] rawChunk = new byte[chunkSize]; - byte[] localOnlyChunk = new byte[chunkSize]; - int rawRead = rawStream.read(rawChunk); - int localOnlyRead = localOnlyStream.read(localOnlyChunk); - - if (rawRead != localOnlyRead) { - System.out.println("āŒ Read size mismatch at chunk " + chunksRead + ": raw=" + rawRead + " localOnly=" + localOnlyRead); - sequentialMatch = false; - break; - } - - if (!Arrays.equals(rawChunk, localOnlyChunk)) { - System.out.println("āŒ Content mismatch at chunk " + chunksRead + " (byte offset " + (chunksRead * chunkSize) + ")"); - sequentialMatch = false; - break; - } - chunksRead++; - } - System.out.println("Sequential reads (" + chunksRead + " chunks): " + (sequentialMatch ? "āœ…" : "āŒ")); - assertTrue("Sequential reads should match", sequentialMatch); - - } finally { - rawStream.close(); - localOnlyStream.close(); - } - - System.out.println("\n=== PHASE 3: Compare Spark Read Operations ==="); - - // Test 6: Spark read from RawLocalFS - System.out.println("\n--- Test 6: Spark Read from RawLocalFS ---"); - try { - Dataset rawDf = spark.read().parquet(rawLocalPath); - long rawCount = rawDf.count(); - System.out.println("āœ… RawLocalFS Spark read successful! Row count: " + rawCount); - assertEquals("Should have 4 employees", 4, rawCount); - } catch (Exception e) { - System.err.println("āŒ RawLocalFS Spark read FAILED: " + e.getMessage()); - e.printStackTrace(); - fail("RawLocalFS Spark read should not fail!"); - } - - // Test 7: Spark read from LOCAL_ONLY - System.out.println("\n--- Test 7: Spark Read from LOCAL_ONLY ---"); - try { - Dataset localOnlyDf = spark.read().parquet(localOnlyPath); - long localOnlyCount = localOnlyDf.count(); - System.out.println("āœ… LOCAL_ONLY Spark read successful! Row count: " + localOnlyCount); - assertEquals("Should have 4 employees", 4, localOnlyCount); - } catch (Exception e) { - System.err.println("āŒ LOCAL_ONLY Spark read FAILED: " + e.getMessage()); - if (e.getMessage() != null && e.getMessage().contains("78 bytes")) { - System.err.println("šŸ” FOUND IT! 78-byte error occurs during Spark read!"); - System.err.println("But low-level reads worked, so the issue is in Spark's Parquet reader!"); - } - e.printStackTrace(); - // Don't fail - we want to see the full output - } - - // Test 8: SQL query on RawLocalFS - System.out.println("\n--- Test 8: SQL Query on RawLocalFS ---"); - try { - Dataset rawDf = spark.read().parquet(rawLocalPath); - rawDf.createOrReplaceTempView("employees_raw"); - Dataset rawResult = spark.sql("SELECT name, salary FROM employees_raw WHERE department = 'Engineering'"); - long rawResultCount = rawResult.count(); - System.out.println("āœ… RawLocalFS SQL query successful! 
Row count: " + rawResultCount); - assertEquals("Should have 2 engineering employees", 2, rawResultCount); - } catch (Exception e) { - System.err.println("āŒ RawLocalFS SQL query FAILED: " + e.getMessage()); - e.printStackTrace(); - fail("RawLocalFS SQL query should not fail!"); - } - - // Test 9: SQL query on LOCAL_ONLY - System.out.println("\n--- Test 9: SQL Query on LOCAL_ONLY ---"); - try { - Dataset localOnlyDf = spark.read().parquet(localOnlyPath); - localOnlyDf.createOrReplaceTempView("employees_localonly"); - Dataset localOnlyResult = spark.sql("SELECT name, salary FROM employees_localonly WHERE department = 'Engineering'"); - long localOnlyResultCount = localOnlyResult.count(); - System.out.println("āœ… LOCAL_ONLY SQL query successful! Row count: " + localOnlyResultCount); - assertEquals("Should have 2 engineering employees", 2, localOnlyResultCount); - } catch (Exception e) { - System.err.println("āŒ LOCAL_ONLY SQL query FAILED: " + e.getMessage()); - if (e.getMessage() != null && e.getMessage().contains("78 bytes")) { - System.err.println("šŸ” 78-byte error in SQL query!"); - } - e.printStackTrace(); - // Don't fail - we want to see the full output - } - - System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); - System.out.println("ā•‘ SHADOW READ COMPARISON COMPLETE ā•‘"); - System.out.println("ā•šā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•"); - } - - private int findFirstDifference(byte[] a, byte[] b) { - int minLen = Math.min(a.length, b.length); - for (int i = 0; i < minLen; i++) { - if (a[i] != b[i]) { - return i; - } - } - return minLen; - } - - private void printHex(byte[] data, int start, int end) { - System.out.print(" "); - for (int i = start; i < end && i < data.length; i++) { - System.out.printf("%02X ", data[i]); - } - System.out.println(); - } - - // Employee class for Spark DataFrame - public static class Employee implements java.io.Serializable { - private int id; - private String name; - private String department; - private int salary; - - public Employee() {} // Required for Spark - - public Employee(int id, String name, String department, int salary) { - this.id = id; - this.name = name; - this.department = department; - this.salary = salary; - } - - // Getters and Setters (required for Spark) - public int getId() { return id; } - public void setId(int id) { this.id = id; } - public String getName() { return name; } - public void setName(String name) { this.name = name; } - public String getDepartment() { return department; } - public void setDepartment(String department) { this.department = department; } - public int getSalary() { return salary; } - public void setSalary(int salary) { this.salary = salary; } - } -} -