
tests not needed now

pull/7526/head
chrislu 1 week ago
commit 32118a82bc
  1. test/java/spark/src/test/java/seaweed/spark/DirectFileReadTest.java (72 lines removed)
  2. test/java/spark/src/test/java/seaweed/spark/ParquetMemoryComparisonTest.java (299 lines removed)
  3. test/java/spark/src/test/java/seaweed/spark/ShadowVsLocalOnlyComparisonTest.java (214 lines removed)
  4. test/java/spark/src/test/java/seaweed/spark/SparkSQLReadDifferenceTest.java (264 lines removed)
  5. test/java/spark/src/test/java/seaweed/spark/SparkShadowComparisonTest.java (306 lines removed)
  6. test/java/spark/src/test/java/seaweed/spark/SparkShadowReadComparisonTest.java (343 lines removed)

72
test/java/spark/src/test/java/seaweed/spark/DirectFileReadTest.java

@@ -1,72 +0,0 @@
package seaweed.spark;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* Test reading LOCAL_ONLY files directly via file:// protocol
* to verify the files themselves are valid.
*/
public class DirectFileReadTest extends SparkTestBase {
@Test
public void testReadLocalOnlyFileDirectly() {
skipIfTestsDisabled();
// First write using LOCAL_ONLY mode (through SeaweedFS path)
java.util.List<SparkSQLTest.Employee> employees = java.util.Arrays.asList(
new SparkSQLTest.Employee(1, "Alice", "Engineering", 100000),
new SparkSQLTest.Employee(2, "Bob", "Sales", 80000),
new SparkSQLTest.Employee(3, "Charlie", "Engineering", 120000),
new SparkSQLTest.Employee(4, "David", "Sales", 75000));
Dataset<Row> df = spark.createDataFrame(employees, SparkSQLTest.Employee.class);
String tablePath = getTestPath("employees_direct_test");
df.write().mode(org.apache.spark.sql.SaveMode.Overwrite).parquet(tablePath);
System.out.println("✅ Write completed to: " + tablePath);
// Now try to read the LOCAL_ONLY .debug file directly using file:// protocol
// This bypasses LocalOnlyInputStream and uses native file system
String debugFilePath = "file:///workspace/target/debug-local/";
try {
// List files in debug directory
java.io.File debugDir = new java.io.File("/workspace/target/debug-local/");
java.io.File[] files = debugDir.listFiles((dir, name) -> name.endsWith(".parquet.debug"));
if (files != null && files.length > 0) {
String localFile = "file://" + files[0].getAbsolutePath();
System.out.println("📁 Found LOCAL_ONLY file: " + localFile);
System.out.println("📏 File size: " + files[0].length() + " bytes");
// Try to read it directly
Dataset<Row> directRead = spark.read().parquet(localFile);
long count = directRead.count();
System.out.println("✅ Direct read successful! Row count: " + count);
// Try SQL query on it
directRead.createOrReplaceTempView("employees_direct");
Dataset<Row> filtered = spark.sql(
"SELECT name, salary FROM employees_direct WHERE department = 'Engineering'");
long engineeringCount = filtered.count();
System.out.println("✅ SQL query successful! Engineering employees: " + engineeringCount);
assertEquals("Should have 2 engineering employees", 2, engineeringCount);
} else {
fail("No .debug files found in /workspace/target/debug-local/");
}
} catch (Exception e) {
System.err.println("❌ Direct read failed: " + e.getMessage());
e.printStackTrace();
throw new RuntimeException("Direct file read failed", e);
}
}
}
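
The check above goes through Spark. A lighter way to confirm that a LOCAL_ONLY .debug file is structurally valid Parquet is to open it with parquet-mr directly, mirroring the countParquetRows helper used in ParquetMemoryComparisonTest below. This is only a sketch, not part of the deleted test; the class name and file path are illustrative.

package seaweed.spark;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

// Hypothetical standalone validator: parsing the footer and iterating row groups
// exercises the same length/EOF handling that fails under LOCAL_ONLY in Spark.
public class ParquetDebugFileValidator {
    public static long countRows(String localParquetFile) throws Exception {
        Configuration conf = new Configuration();
        long rows = 0;
        try (ParquetFileReader reader =
                ParquetFileReader.open(HadoopInputFile.fromPath(new Path(localParquetFile), conf))) {
            PageReadStore pages;
            while ((pages = reader.readNextRowGroup()) != null) {
                rows += pages.getRowCount();
            }
        }
        return rows;
    }
}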

299
test/java/spark/src/test/java/seaweed/spark/ParquetMemoryComparisonTest.java

@@ -1,299 +0,0 @@
package seaweed.spark;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import static org.junit.Assert.*;
/**
* Test to compare in-memory Parquet file with SeaweedFS-stored Parquet file
* to identify what metadata differences cause the 78-byte EOF error.
*/
public class ParquetMemoryComparisonTest extends SparkTestBase {
private static final String SCHEMA_STRING =
"message Employee { " +
" required int32 id; " +
" required binary name (UTF8); " +
" required int32 age; " +
"}";
private static final MessageType SCHEMA = MessageTypeParser.parseMessageType(SCHEMA_STRING);
private FileSystem localFs;
private FileSystem seaweedFs;
@Before
public void setUp() throws Exception {
if (!TESTS_ENABLED) {
return;
}
Configuration conf = new Configuration();
// Local filesystem
localFs = FileSystem.getLocal(conf);
// SeaweedFS
conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
conf.set("fs.seaweed.filer.host", SEAWEEDFS_HOST);
conf.set("fs.seaweed.filer.port", SEAWEEDFS_PORT);
seaweedFs = FileSystem.get(java.net.URI.create("seaweedfs://" + SEAWEEDFS_HOST + ":" + SEAWEEDFS_PORT), conf);
System.out.println("=== Test Setup Complete ===");
System.out.println("Local FS: " + localFs.getClass().getName());
System.out.println("SeaweedFS: " + seaweedFs.getClass().getName());
}
@After
public void tearDown() throws Exception {
if (localFs != null) {
localFs.close();
}
if (seaweedFs != null) {
seaweedFs.close();
}
}
@Test
public void testCompareMemoryVsSeaweedFSParquet() throws Exception {
if (!TESTS_ENABLED) {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
System.out.println("\n=== PARQUET MEMORY vs SEAWEEDFS COMPARISON TEST ===\n");
// 1. Write identical Parquet file to local temp and SeaweedFS
Path localPath = new Path("/tmp/test-local-" + System.currentTimeMillis() + ".parquet");
Path seaweedPath = new Path("seaweedfs://" + SEAWEEDFS_HOST + ":" + SEAWEEDFS_PORT +
"/test-spark/comparison-test.parquet");
System.out.println("Writing to local: " + localPath);
System.out.println("Writing to SeaweedFS: " + seaweedPath);
// Write same data to both locations
writeTestParquetFile(localFs, localPath);
writeTestParquetFile(seaweedFs, seaweedPath);
System.out.println("\n=== Files Written Successfully ===\n");
// 2. Read raw bytes from both files
byte[] localBytes = readAllBytes(localFs, localPath);
byte[] seaweedBytes = readAllBytes(seaweedFs, seaweedPath);
System.out.println("Local file size: " + localBytes.length + " bytes");
System.out.println("SeaweedFS file size: " + seaweedBytes.length + " bytes");
// 3. Compare byte-by-byte
if (localBytes.length != seaweedBytes.length) {
System.out.println("\nโŒ SIZE MISMATCH!");
System.out.println("Difference: " + Math.abs(localBytes.length - seaweedBytes.length) + " bytes");
} else {
System.out.println("\nโœ… Sizes match!");
}
// Find first difference
int firstDiff = -1;
int minLen = Math.min(localBytes.length, seaweedBytes.length);
for (int i = 0; i < minLen; i++) {
if (localBytes[i] != seaweedBytes[i]) {
firstDiff = i;
break;
}
}
if (firstDiff >= 0) {
System.out.println("\nโŒ CONTENT DIFFERS at byte offset: " + firstDiff);
System.out.println("Context (20 bytes before and after):");
printByteContext(localBytes, seaweedBytes, firstDiff, 20);
} else if (localBytes.length == seaweedBytes.length) {
System.out.println("\nโœ… Files are IDENTICAL!");
}
// 4. Parse Parquet metadata from both
System.out.println("\n=== Parquet Metadata Comparison ===\n");
ParquetMetadata localMeta = readParquetMetadata(localFs, localPath);
ParquetMetadata seaweedMeta = readParquetMetadata(seaweedFs, seaweedPath);
System.out.println("Local metadata:");
printParquetMetadata(localMeta);
System.out.println("\nSeaweedFS metadata:");
printParquetMetadata(seaweedMeta);
// 5. Try reading both files with Parquet reader
System.out.println("\n=== Reading Files with ParquetFileReader ===\n");
try {
System.out.println("Reading local file...");
int localRows = countParquetRows(localFs, localPath);
System.out.println("✅ Local file: " + localRows + " rows read successfully");
} catch (Exception e) {
System.out.println("❌ Local file read failed: " + e.getMessage());
e.printStackTrace();
}
try {
System.out.println("\nReading SeaweedFS file...");
int seaweedRows = countParquetRows(seaweedFs, seaweedPath);
System.out.println("✅ SeaweedFS file: " + seaweedRows + " rows read successfully");
} catch (Exception e) {
System.out.println("❌ SeaweedFS file read failed: " + e.getMessage());
System.out.println("Error type: " + e.getClass().getName());
if (e.getMessage() != null && e.getMessage().contains("bytes left")) {
System.out.println("🎯 THIS IS THE 78-BYTE EOF ERROR!");
}
e.printStackTrace();
}
// Cleanup
localFs.delete(localPath, false);
seaweedFs.delete(seaweedPath, false);
System.out.println("\n=== Test Complete ===\n");
}
private void writeTestParquetFile(FileSystem fs, Path path) throws IOException {
Configuration conf = fs.getConf();
GroupWriteSupport.setSchema(SCHEMA, conf);
try (ParquetWriter<Group> writer = org.apache.parquet.hadoop.example.ExampleParquetWriter.builder(path)
.withConf(conf)
.withWriteMode(org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE)
.build()) {
SimpleGroupFactory factory = new SimpleGroupFactory(SCHEMA);
// Write 3 rows (same as Spark test)
Group group1 = factory.newGroup()
.append("id", 1)
.append("name", "Alice")
.append("age", 30);
writer.write(group1);
Group group2 = factory.newGroup()
.append("id", 2)
.append("name", "Bob")
.append("age", 25);
writer.write(group2);
Group group3 = factory.newGroup()
.append("id", 3)
.append("name", "Charlie")
.append("age", 35);
writer.write(group3);
}
}
private byte[] readAllBytes(FileSystem fs, Path path) throws IOException {
try (FSDataInputStream in = fs.open(path)) {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
byte[] chunk = new byte[8192];
int bytesRead;
while ((bytesRead = in.read(chunk)) != -1) {
buffer.write(chunk, 0, bytesRead);
}
return buffer.toByteArray();
}
}
private ParquetMetadata readParquetMetadata(FileSystem fs, Path path) throws IOException {
Configuration conf = fs.getConf();
try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
return reader.getFooter();
}
}
private void printParquetMetadata(ParquetMetadata meta) {
System.out.println(" Blocks: " + meta.getBlocks().size());
meta.getBlocks().forEach(block -> {
System.out.println(" Block rows: " + block.getRowCount());
System.out.println(" Block total size: " + block.getTotalByteSize());
System.out.println(" Block compressed size: " + block.getCompressedSize());
System.out.println(" Columns: " + block.getColumns().size());
block.getColumns().forEach(col -> {
System.out.println(" Column: " + col.getPath());
System.out.println(" Starting pos: " + col.getStartingPos());
System.out.println(" Total size: " + col.getTotalSize());
System.out.println(" Total uncompressed: " + col.getTotalUncompressedSize());
});
});
}
private int countParquetRows(FileSystem fs, Path path) throws IOException {
Configuration conf = fs.getConf();
int rowCount = 0;
try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
org.apache.parquet.column.page.PageReadStore pages;
while ((pages = reader.readNextRowGroup()) != null) {
rowCount += pages.getRowCount();
}
}
return rowCount;
}
private void printByteContext(byte[] local, byte[] seaweed, int offset, int context) {
int start = Math.max(0, offset - context);
int endLocal = Math.min(local.length, offset + context);
int endSeaweed = Math.min(seaweed.length, offset + context);
System.out.println("\nLocal bytes [" + start + " to " + endLocal + "]:");
printHexDump(local, start, endLocal, offset);
System.out.println("\nSeaweedFS bytes [" + start + " to " + endSeaweed + "]:");
printHexDump(seaweed, start, endSeaweed, offset);
}
private void printHexDump(byte[] bytes, int start, int end, int highlight) {
StringBuilder hex = new StringBuilder();
StringBuilder ascii = new StringBuilder();
for (int i = start; i < end; i++) {
if (i > start && i % 16 == 0) {
System.out.printf("%04x: %-48s %s\n", i - 16, hex.toString(), ascii.toString());
hex.setLength(0);
ascii.setLength(0);
}
byte b = bytes[i];
String hexStr = String.format("%02x ", b & 0xFF);
if (i == highlight) {
hexStr = "[" + hexStr.trim() + "] ";
}
hex.append(hexStr);
char c = (b >= 32 && b < 127) ? (char) b : '.';
if (i == highlight) {
ascii.append('[').append(c).append(']');
} else {
ascii.append(c);
}
}
if (hex.length() > 0) {
System.out.printf("%04x: %-48s %s\n", (end / 16) * 16, hex.toString(), ascii.toString());
}
}
}
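
For context on the "bytes left" message this test hunts for: a Parquet file ends with the serialized footer, then a 4-byte little-endian footer length, then the 4-byte magic PAR1. If the file system under-reports the length, the reader runs out of bytes while decoding the footer. The probe below is a sketch using plain java.io, independent of the deleted test suite.

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Prints the trailing magic, the footer length, and where the footer should start.
public class ParquetFooterProbe {
    public static void main(String[] args) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(args[0], "r")) {
            long fileLen = raf.length();
            byte[] trailer = new byte[8];          // 4-byte footer length + "PAR1"
            raf.seek(fileLen - 8);
            raf.readFully(trailer);
            int footerLen = ByteBuffer.wrap(trailer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
            String magic = new String(trailer, 4, 4, StandardCharsets.US_ASCII);
            System.out.println("file length      : " + fileLen);
            System.out.println("trailing magic   : " + magic + " (expected PAR1)");
            System.out.println("footer length    : " + footerLen);
            System.out.println("footer starts at : " + (fileLen - 8 - footerLen));
        }
    }
}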

214
test/java/spark/src/test/java/seaweed/spark/ShadowVsLocalOnlyComparisonTest.java

@@ -1,214 +0,0 @@
package seaweed.spark;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.*;
/**
* CRITICAL TEST: Compare shadow file (reference) with LOCAL_ONLY mode output.
*
* This test:
* 1. Writes with SHADOW mode enabled โ†’ produces reference file
* 2. Writes with LOCAL_ONLY mode โ†’ produces local-only file
* 3. Compares the two files byte-by-byte
* 4. Attempts to read both with Spark SQL
*/
public class ShadowVsLocalOnlyComparisonTest extends SparkTestBase {
private String shadowDir;
private String localOnlyDir;
@Before
public void setUp() throws Exception {
super.setUpSpark();
shadowDir = "/workspace/target/shadow-comparison";
localOnlyDir = "/workspace/target/local-only-comparison";
// Clean up previous runs
deleteDirectory(new File(shadowDir));
deleteDirectory(new File(localOnlyDir));
new File(shadowDir).mkdirs();
new File(localOnlyDir).mkdirs();
}
@After
public void tearDown() throws Exception {
super.tearDownSpark();
}
@Test
public void testShadowVsLocalOnlyComparison() throws IOException {
skipIfTestsDisabled();
System.out.println("\nโ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—");
System.out.println("โ•‘ CRITICAL: Shadow vs LOCAL_ONLY Comparison โ•‘");
System.out.println("โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•");
List<Employee> employees = Arrays.asList(
new Employee(1, "Alice", "Engineering", 100000),
new Employee(2, "Bob", "Sales", 80000),
new Employee(3, "Charlie", "Engineering", 120000),
new Employee(4, "David", "Sales", 75000));
Dataset<Row> df = spark.createDataFrame(employees, Employee.class);
// PHASE 1: Write with SHADOW mode
System.out.println("\n=== PHASE 1: Write with SHADOW mode (creates reference) ===");
System.setProperty("SEAWEEDFS_SHADOW_MODE", "true");
System.setProperty("SEAWEEDFS_DEBUG_MODE", "SEAWEED_ONLY");
spark.conf().set("fs.seaweedfs.shadow.dir", shadowDir);
String shadowOutputPath = "seaweedfs://seaweedfs-filer:8888/test-spark/shadow-test/employees";
df.write().mode(SaveMode.Overwrite).parquet(shadowOutputPath);
File[] shadowFiles = new File(shadowDir).listFiles((dir, name) -> name.endsWith(".shadow"));
assertNotNull("Shadow files should exist", shadowFiles);
assertTrue("Should have at least one shadow file", shadowFiles.length > 0);
File shadowFile = shadowFiles[0];
System.out.println("Shadow file: " + shadowFile.getName() + " (" + shadowFile.length() + " bytes)");
// PHASE 2: Write with LOCAL_ONLY mode
System.out.println("\n=== PHASE 2: Write with LOCAL_ONLY mode ===");
System.setProperty("SEAWEEDFS_SHADOW_MODE", "false");
System.setProperty("SEAWEEDFS_DEBUG_MODE", "LOCAL_ONLY");
spark.conf().set("fs.seaweedfs.debug.dir", localOnlyDir);
String localOnlyOutputPath = "seaweedfs://seaweedfs-filer:8888/test-spark/local-only-test/employees";
df.write().mode(SaveMode.Overwrite).parquet(localOnlyOutputPath);
File[] localOnlyFiles = new File(localOnlyDir).listFiles((dir, name) -> name.endsWith(".debug"));
assertNotNull("LOCAL_ONLY files should exist", localOnlyFiles);
assertTrue("Should have at least one LOCAL_ONLY file", localOnlyFiles.length > 0);
File localOnlyFile = localOnlyFiles[0];
System.out.println("LOCAL_ONLY file: " + localOnlyFile.getName() + " (" + localOnlyFile.length() + " bytes)");
// PHASE 3: Compare files byte-by-byte
System.out.println("\n=== PHASE 3: Compare files byte-by-byte ===");
assertEquals("File sizes should match", shadowFile.length(), localOnlyFile.length());
byte[] shadowBytes = Files.readAllBytes(shadowFile.toPath());
byte[] localOnlyBytes = Files.readAllBytes(localOnlyFile.toPath());
System.out.println("Comparing " + shadowBytes.length + " bytes...");
// Compare byte-by-byte and report first difference
boolean identical = true;
for (int i = 0; i < shadowBytes.length; i++) {
if (shadowBytes[i] != localOnlyBytes[i]) {
identical = false;
System.err.println("❌ FIRST DIFFERENCE at byte " + i + ":");
System.err.println(" Shadow: 0x" + String.format("%02x", shadowBytes[i] & 0xFF));
System.err.println(" LOCAL_ONLY: 0x" + String.format("%02x", localOnlyBytes[i] & 0xFF));
// Show context
int contextStart = Math.max(0, i - 10);
int contextEnd = Math.min(shadowBytes.length, i + 10);
System.err.println(" Context (shadow):");
for (int j = contextStart; j < contextEnd; j++) {
System.err.print(String.format("%02x ", shadowBytes[j] & 0xFF));
}
System.err.println();
System.err.println(" Context (local_only):");
for (int j = contextStart; j < contextEnd; j++) {
System.err.print(String.format("%02x ", localOnlyBytes[j] & 0xFF));
}
System.err.println();
break;
}
}
if (identical) {
System.out.println("✅ Files are IDENTICAL!");
} else {
fail("Files are NOT identical");
}
// PHASE 4: Try reading shadow file with Spark
System.out.println("\n=== PHASE 4: Try reading shadow file with Spark ===");
try {
// Copy shadow file to a location Spark can read
String testPath = "file://" + shadowDir + "/test.parquet";
Files.copy(shadowFile.toPath(), new File(shadowDir + "/test.parquet").toPath());
Dataset<Row> shadowDf = spark.read().parquet(testPath);
shadowDf.createOrReplaceTempView("shadow_test");
Dataset<Row> shadowResult = spark.sql("SELECT * FROM shadow_test WHERE department = 'Engineering'");
System.out.println("✅ Shadow file SQL query: " + shadowResult.count() + " rows");
} catch (Exception e) {
System.err.println("❌ Shadow file SQL query FAILED: " + e.getMessage());
e.printStackTrace();
}
// PHASE 5: Try reading LOCAL_ONLY file with Spark
System.out.println("\n=== PHASE 5: Try reading LOCAL_ONLY file with Spark ===");
try {
Dataset<Row> localOnlyDf = spark.read().parquet(localOnlyOutputPath);
localOnlyDf.createOrReplaceTempView("local_only_test");
Dataset<Row> localOnlyResult = spark.sql("SELECT * FROM local_only_test WHERE department = 'Engineering'");
System.out.println("✅ LOCAL_ONLY SQL query: " + localOnlyResult.count() + " rows");
} catch (Exception e) {
System.err.println("❌ LOCAL_ONLY SQL query FAILED: " + e.getMessage());
assertTrue("Expected 78-byte EOF error", e.getMessage().contains("78 bytes left"));
}
System.out.println("\nโ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—");
System.out.println("โ•‘ Comparison complete. See logs for details. โ•‘");
System.out.println("โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•");
}
private void deleteDirectory(File dir) {
if (dir.exists()) {
File[] files = dir.listFiles();
if (files != null) {
for (File file : files) {
if (file.isDirectory()) {
deleteDirectory(file);
} else {
file.delete();
}
}
}
dir.delete();
}
}
public static class Employee implements java.io.Serializable {
private int id;
private String name;
private String department;
private int salary;
public Employee() {}
public Employee(int id, String name, String department, int salary) {
this.id = id;
this.name = name;
this.department = department;
this.salary = salary;
}
public int getId() { return id; }
public void setId(int id) { this.id = id; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getDepartment() { return department; }
public void setDepartment(String department) { this.department = department; }
public int getSalary() { return salary; }
public void setSalary(int salary) { this.salary = salary; }
}
}
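
On Java 12 or newer, the manual PHASE 3 comparison loop above can be collapsed into java.nio.file.Files.mismatch, which returns the offset of the first differing byte, or -1 when the files are identical. A sketch only; the two paths are placeholders for the .shadow and .debug files.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Reports the first byte offset at which two files differ.
public class FileMismatchCheck {
    public static void main(String[] args) throws IOException {
        Path shadow = Path.of(args[0]);       // reference .shadow file
        Path localOnly = Path.of(args[1]);    // LOCAL_ONLY .debug file
        long firstDiff = Files.mismatch(shadow, localOnly);
        if (firstDiff < 0) {
            System.out.println("Files are identical (" + Files.size(shadow) + " bytes)");
        } else {
            System.out.println("First difference at byte offset " + firstDiff);
        }
    }
}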

264
test/java/spark/src/test/java/seaweed/spark/SparkSQLReadDifferenceTest.java

@@ -1,264 +0,0 @@
package seaweed.spark;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.*;
/**
* CRITICAL DIAGNOSTIC TEST: Compare the exact sequence of FileSystem operations
* between RawLocalFS (works) and LOCAL_ONLY (fails) during SQL query execution.
*
* This test will help us understand what's different about how Spark SQL
* interacts with SeaweedFileSystem vs RawLocalFileSystem.
*/
public class SparkSQLReadDifferenceTest extends SparkTestBase {
private String rawLocalDir;
private String localOnlyDir;
private FileSystem rawLocalFs;
@Before
public void setUp() throws Exception {
// Enable detailed logging
System.setProperty("seaweedfs.detailed.logging", "true");
super.setUpSpark();
// Set up RawLocalFileSystem directory
rawLocalDir = "/tmp/spark-sql-diff-rawlocal-" + System.currentTimeMillis();
new File(rawLocalDir).mkdirs();
Configuration conf = spark.sparkContext().hadoopConfiguration();
rawLocalFs = new RawLocalFileSystem();
rawLocalFs.initialize(new URI("file:///"), conf);
rawLocalFs.delete(new Path(rawLocalDir), true);
rawLocalFs.mkdirs(new Path(rawLocalDir));
// Set up LOCAL_ONLY directory
localOnlyDir = "/workspace/target/debug-sql-diff";
new File(localOnlyDir).mkdirs();
for (File f : new File(localOnlyDir).listFiles()) {
f.delete();
}
System.out.println("\nโ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—");
System.out.println("โ•‘ SQL READ DIFFERENCE TEST: RawLocalFS vs LOCAL_ONLY โ•‘");
System.out.println("โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•");
}
@After
public void tearDown() throws Exception {
if (rawLocalFs != null) {
rawLocalFs.delete(new Path(rawLocalDir), true);
rawLocalFs.close();
}
super.tearDownSpark();
}
@Test
public void testSQLReadDifference() throws IOException {
// Create test data
List<Employee> employees = Arrays.asList(
new Employee(1, "Alice", "Engineering", 100000),
new Employee(2, "Bob", "Sales", 80000),
new Employee(3, "Charlie", "Engineering", 120000),
new Employee(4, "David", "Sales", 75000));
Dataset<Row> df = spark.createDataFrame(employees, Employee.class);
// ========================================================================
// PART 1: RawLocalFS - SQL Query (WORKS)
// ========================================================================
System.out.println("\n" + "=".repeat(70));
System.out.println("PART 1: RawLocalFS - SQL Query (Expected to WORK)");
System.out.println("=".repeat(70));
String rawLocalPath = "file://" + rawLocalDir + "/employees";
System.out.println("Writing to: " + rawLocalPath);
df.write().mode(SaveMode.Overwrite).parquet(rawLocalPath);
System.out.println("✅ Write completed\n");
System.out.println("--- Executing SQL Query on RawLocalFS ---");
try {
Dataset<Row> rawDf = spark.read().parquet(rawLocalPath);
System.out.println("✅ Initial read successful");
rawDf.createOrReplaceTempView("employees_raw");
System.out.println("✅ Temp view created");
System.out.println("\nExecuting: SELECT name, salary FROM employees_raw WHERE department = 'Engineering'");
Dataset<Row> rawResult = spark.sql("SELECT name, salary FROM employees_raw WHERE department = 'Engineering'");
System.out.println("Triggering execution with count()...");
long rawCount = rawResult.count();
System.out.println("✅ RawLocalFS SQL query SUCCESSFUL! Row count: " + rawCount);
assertEquals("Should have 2 engineering employees", 2, rawCount);
System.out.println("\n✅✅✅ RawLocalFS: ALL OPERATIONS SUCCESSFUL ✅✅✅\n");
} catch (Exception e) {
System.err.println("❌ RawLocalFS SQL query FAILED (unexpected!): " + e.getMessage());
e.printStackTrace();
fail("RawLocalFS should not fail!");
}
// ========================================================================
// PART 2: LOCAL_ONLY - SQL Query (FAILS)
// ========================================================================
System.out.println("\n" + "=".repeat(70));
System.out.println("PART 2: LOCAL_ONLY - SQL Query (Expected to FAIL with 78-byte error)");
System.out.println("=".repeat(70));
// Enable LOCAL_ONLY mode
System.setProperty("SEAWEEDFS_DEBUG_MODE", "LOCAL_ONLY");
spark.sparkContext().hadoopConfiguration().set("fs.seaweedfs.debug.dir", localOnlyDir);
String localOnlyPath = getTestPath("employees_localonly");
System.out.println("Writing to: " + localOnlyPath);
df.write().mode(SaveMode.Overwrite).parquet(localOnlyPath);
System.out.println("✅ Write completed\n");
System.out.println("--- Executing SQL Query on LOCAL_ONLY ---");
try {
Dataset<Row> localDf = spark.read().parquet(localOnlyPath);
System.out.println("✅ Initial read successful");
localDf.createOrReplaceTempView("employees_local");
System.out.println("✅ Temp view created");
System.out.println("\nExecuting: SELECT name, salary FROM employees_local WHERE department = 'Engineering'");
Dataset<Row> localResult = spark.sql("SELECT name, salary FROM employees_local WHERE department = 'Engineering'");
System.out.println("Triggering execution with count()...");
long localCount = localResult.count();
System.out.println("✅ LOCAL_ONLY SQL query SUCCESSFUL! Row count: " + localCount);
assertEquals("Should have 2 engineering employees", 2, localCount);
System.out.println("\n✅✅✅ LOCAL_ONLY: ALL OPERATIONS SUCCESSFUL ✅✅✅\n");
} catch (Exception e) {
System.err.println("\n❌❌❌ LOCAL_ONLY SQL query FAILED ❌❌❌");
System.err.println("Error: " + e.getMessage());
if (e.getMessage() != null && e.getMessage().contains("78 bytes")) {
System.err.println("\n🔍 CONFIRMED: 78-byte EOF error!");
System.err.println("This error occurs during SQL query execution on LOCAL_ONLY mode.");
}
System.err.println("\nFull stack trace:");
e.printStackTrace();
System.err.println("\n" + "=".repeat(70));
System.err.println("ANALYSIS: Comparing RawLocalFS (works) vs LOCAL_ONLY (fails)");
System.err.println("=".repeat(70));
System.err.println();
System.err.println("Both tests:");
System.err.println(" - Write identical data (same DataFrame)");
System.err.println(" - Execute identical SQL query");
System.err.println(" - Use identical Spark configuration");
System.err.println();
System.err.println("Key differences:");
System.err.println(" 1. Path scheme:");
System.err.println(" - RawLocalFS: file:///tmp/...");
System.err.println(" - LOCAL_ONLY: seaweedfs://seaweedfs-filer:8888/...");
System.err.println();
System.err.println(" 2. FileSystem implementation:");
System.err.println(" - RawLocalFS: Hadoop's native RawLocalFileSystem");
System.err.println(" - LOCAL_ONLY: SeaweedFileSystem (but writes to local disk)");
System.err.println();
System.err.println(" 3. InputStream type:");
System.err.println(" - RawLocalFS: LocalFSFileInputStream");
System.err.println(" - LOCAL_ONLY: SeaweedHadoopInputStream -> LocalOnlyInputStream");
System.err.println();
System.err.println("The 78-byte error suggests that:");
System.err.println(" - Spark SQL expects to read 78 more bytes");
System.err.println(" - But the InputStream reports EOF");
System.err.println(" - This happens even though the file is correct (1260 bytes)");
System.err.println();
System.err.println("Possible causes:");
System.err.println(" 1. getFileStatus() returns wrong file size");
System.err.println(" 2. InputStream.available() returns wrong value");
System.err.println(" 3. Seek operations don't work correctly");
System.err.println(" 4. Multiple InputStreams interfere with each other");
System.err.println(" 5. Metadata is cached incorrectly between operations");
System.err.println();
// Don't fail the test - we want to see the full output
// fail("LOCAL_ONLY failed as expected");
}
// ========================================================================
// PART 3: Compare Files
// ========================================================================
System.out.println("\n" + "=".repeat(70));
System.out.println("PART 3: File Comparison");
System.out.println("=".repeat(70));
File rawLocalParquetDir = new File(rawLocalDir + "/employees");
File[] rawLocalFiles = rawLocalParquetDir.listFiles((dir, name) -> name.endsWith(".parquet"));
File[] localOnlyFiles = new File(localOnlyDir).listFiles((dir, name) -> name.endsWith(".parquet.debug"));
if (rawLocalFiles != null && rawLocalFiles.length > 0 &&
localOnlyFiles != null && localOnlyFiles.length > 0) {
File rawFile = rawLocalFiles[0];
File localFile = localOnlyFiles[0];
System.out.println("\nRawLocalFS file: " + rawFile.getName() + " (" + rawFile.length() + " bytes)");
System.out.println("LOCAL_ONLY file: " + localFile.getName() + " (" + localFile.length() + " bytes)");
if (rawFile.length() == localFile.length()) {
System.out.println("✅ File sizes match!");
} else {
System.out.println("❌ File size mismatch: " + (rawFile.length() - localFile.length()) + " bytes");
}
}
System.out.println("\nโ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—");
System.out.println("โ•‘ TEST COMPLETE - Check logs above for differences โ•‘");
System.out.println("โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•");
}
// Employee class for Spark DataFrame
public static class Employee implements java.io.Serializable {
private int id;
private String name;
private String department;
private int salary;
public Employee() {} // Required for Spark
public Employee(int id, String name, String department, int salary) {
this.id = id;
this.name = name;
this.department = department;
this.salary = salary;
}
// Getters and Setters (required for Spark)
public int getId() { return id; }
public void setId(int id) { this.id = id; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getDepartment() { return department; }
public void setDepartment(String department) { this.department = department; }
public int getSalary() { return salary; }
public void setSalary(int salary) { this.salary = salary; }
}
}
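
The first two causes listed in the analysis output above (a wrong getFileStatus() length, or a stream that hits EOF early) can be checked directly against the Hadoop FileSystem API without involving Spark. The following is a sketch, assuming the fs.seaweedfs.impl binding configured elsewhere in this test module; the file path argument is illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Compares the length reported by getFileStatus() with the bytes actually readable to EOF.
public class ReportedLengthProbe {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
        Path file = new Path(args[0]);   // e.g. a seaweedfs:// parquet part file
        FileSystem fs = file.getFileSystem(conf);
        long reported = fs.getFileStatus(file).getLen();
        long readable = 0;
        try (FSDataInputStream in = fs.open(file)) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                readable += n;
            }
        }
        System.out.println("getFileStatus().getLen() : " + reported);
        System.out.println("bytes readable to EOF    : " + readable);
        System.out.println(reported == readable ? "lengths agree" : "MISMATCH: " + (reported - readable) + " bytes");
    }
}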

306
test/java/spark/src/test/java/seaweed/spark/SparkShadowComparisonTest.java

@@ -1,306 +0,0 @@
package seaweed.spark;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.*;
/**
* CRITICAL COMPARISON TEST: Use RawLocalFileSystem as a "shadow" to compare
* all I/O operations with LOCAL_ONLY mode.
*
* This test writes the same data to both:
* 1. RawLocalFileSystem (file://) - Known to work
* 2. SeaweedFS LOCAL_ONLY mode (seaweedfs://) - Has 78-byte error
*
* Then compares the resulting files byte-by-byte to find the exact difference.
*/
public class SparkShadowComparisonTest extends SparkTestBase {
private String rawLocalDir;
private String localOnlyDir;
private FileSystem rawLocalFs;
@Before
public void setUp() throws Exception {
super.setUpSpark();
// Set up RawLocalFileSystem directory
rawLocalDir = "/tmp/spark-shadow-rawlocal-" + System.currentTimeMillis();
new File(rawLocalDir).mkdirs();
Configuration conf = spark.sparkContext().hadoopConfiguration();
rawLocalFs = new RawLocalFileSystem();
rawLocalFs.initialize(new URI("file:///"), conf);
rawLocalFs.delete(new Path(rawLocalDir), true);
rawLocalFs.mkdirs(new Path(rawLocalDir));
// Set up LOCAL_ONLY directory (will be in debug dir)
localOnlyDir = "/workspace/target/debug-shadow";
new File(localOnlyDir).mkdirs();
// Clean up previous runs
for (File f : new File(localOnlyDir).listFiles()) {
f.delete();
}
System.out.println("\nโ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—");
System.out.println("โ•‘ SHADOW COMPARISON: RawLocalFS vs LOCAL_ONLY โ•‘");
System.out.println("โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•");
System.out.println("RawLocalFS directory: " + rawLocalDir);
System.out.println("LOCAL_ONLY directory: " + localOnlyDir);
}
@After
public void tearDown() throws Exception {
if (rawLocalFs != null) {
rawLocalFs.delete(new Path(rawLocalDir), true);
rawLocalFs.close();
}
super.tearDownSpark();
}
@Test
public void testShadowComparison() throws IOException {
System.out.println("\n=== PHASE 1: Write to RawLocalFileSystem ===");
// Create test data
List<Employee> employees = Arrays.asList(
new Employee(1, "Alice", "Engineering", 100000),
new Employee(2, "Bob", "Sales", 80000),
new Employee(3, "Charlie", "Engineering", 120000),
new Employee(4, "David", "Sales", 75000));
Dataset<Row> df = spark.createDataFrame(employees, Employee.class);
// Write to RawLocalFileSystem
String rawLocalPath = "file://" + rawLocalDir + "/employees";
System.out.println("Writing to RawLocalFS: " + rawLocalPath);
try {
df.write().mode(SaveMode.Overwrite).parquet(rawLocalPath);
System.out.println("✅ RawLocalFS write completed successfully!");
} catch (Exception e) {
System.err.println("❌ RawLocalFS write FAILED: " + e.getMessage());
e.printStackTrace();
fail("RawLocalFS write should not fail!");
}
// List files written by RawLocalFS
File rawLocalParquetDir = new File(rawLocalDir + "/employees");
File[] rawLocalFiles = rawLocalParquetDir.listFiles((dir, name) -> name.endsWith(".parquet"));
assertNotNull("RawLocalFS should have written files", rawLocalFiles);
assertTrue("RawLocalFS should have at least one parquet file", rawLocalFiles.length > 0);
System.out.println("RawLocalFS wrote " + rawLocalFiles.length + " parquet file(s):");
for (File f : rawLocalFiles) {
System.out.println(" - " + f.getName() + " (" + f.length() + " bytes)");
}
System.out.println("\n=== PHASE 2: Write to LOCAL_ONLY mode ===");
// Set environment for LOCAL_ONLY mode
System.setProperty("SEAWEEDFS_DEBUG_MODE", "LOCAL_ONLY");
spark.sparkContext().hadoopConfiguration().set("fs.seaweedfs.debug.dir", localOnlyDir);
// Write to LOCAL_ONLY
String localOnlyPath = getTestPath("employees_localonly");
System.out.println("Writing to LOCAL_ONLY: " + localOnlyPath);
boolean localOnlyWriteSucceeded = false;
try {
df.write().mode(SaveMode.Overwrite).parquet(localOnlyPath);
System.out.println("✅ LOCAL_ONLY write completed successfully!");
localOnlyWriteSucceeded = true;
} catch (Exception e) {
System.err.println("⚠️ LOCAL_ONLY write completed but may have issues: " + e.getMessage());
// Don't fail here - we want to compare files even if write "succeeded"
}
// List files written by LOCAL_ONLY
File[] localOnlyFiles = new File(localOnlyDir).listFiles((dir, name) -> name.endsWith(".debug"));
if (localOnlyFiles == null || localOnlyFiles.length == 0) {
System.err.println("❌ LOCAL_ONLY did not write any .debug files!");
fail("LOCAL_ONLY should have written .debug files");
}
System.out.println("LOCAL_ONLY wrote " + localOnlyFiles.length + " .debug file(s):");
for (File f : localOnlyFiles) {
System.out.println(" - " + f.getName() + " (" + f.length() + " bytes)");
}
System.out.println("\n=== PHASE 3: Compare Files Byte-by-Byte ===");
// Match files by pattern (both should have part-00000-*.snappy.parquet)
File rawFile = rawLocalFiles[0]; // Should only be one file
File localOnlyFile = null;
// Find the .debug file that looks like a parquet file
for (File f : localOnlyFiles) {
if (f.getName().contains("part-") && f.getName().endsWith(".parquet.debug")) {
localOnlyFile = f;
break;
}
}
if (localOnlyFile == null) {
System.out.println("โŒ Could not find LOCAL_ONLY parquet file!");
System.out.println("Available .debug files:");
for (File f : localOnlyFiles) {
System.out.println(" - " + f.getName());
}
fail("LOCAL_ONLY should have written a parquet .debug file");
}
System.out.println("\nComparing:");
System.out.println(" RawLocalFS: " + rawFile.getName() + " (" + rawFile.length() + " bytes)");
System.out.println(" LOCAL_ONLY: " + localOnlyFile.getName() + " (" + localOnlyFile.length() + " bytes)");
// Compare file sizes
long sizeDiff = rawFile.length() - localOnlyFile.length();
if (sizeDiff != 0) {
System.out.println(" ⚠️ SIZE DIFFERENCE: " + sizeDiff + " bytes");
System.out.println(" RawLocalFS is " + (sizeDiff > 0 ? "LARGER" : "SMALLER") + " by " + Math.abs(sizeDiff) + " bytes");
if (Math.abs(sizeDiff) == 78) {
System.out.println(" 🔍 THIS IS THE 78-BYTE DIFFERENCE!");
}
} else {
System.out.println(" ✅ File sizes match!");
}
// Compare file contents byte-by-byte
byte[] rawBytes = Files.readAllBytes(rawFile.toPath());
byte[] localOnlyBytes = Files.readAllBytes(localOnlyFile.toPath());
int minLen = Math.min(rawBytes.length, localOnlyBytes.length);
int firstDiffIndex = -1;
for (int i = 0; i < minLen; i++) {
if (rawBytes[i] != localOnlyBytes[i]) {
firstDiffIndex = i;
break;
}
}
if (firstDiffIndex >= 0) {
System.out.println(" ⚠️ CONTENT DIFFERS at byte offset: " + firstDiffIndex);
System.out.println(" Showing 32 bytes around difference:");
int start = Math.max(0, firstDiffIndex - 16);
int end = Math.min(minLen, firstDiffIndex + 16);
System.out.print(" RawLocalFS: ");
for (int i = start; i < end; i++) {
System.out.printf("%02X ", rawBytes[i]);
if (i == firstDiffIndex) System.out.print("| ");
}
System.out.println();
System.out.print(" LOCAL_ONLY: ");
for (int i = start; i < end; i++) {
System.out.printf("%02X ", localOnlyBytes[i]);
if (i == firstDiffIndex) System.out.print("| ");
}
System.out.println();
} else if (rawBytes.length == localOnlyBytes.length) {
System.out.println(" ✅ File contents are IDENTICAL!");
} else {
System.out.println(" ⚠️ Files match up to " + minLen + " bytes, but differ in length");
// Show the extra bytes
if (rawBytes.length > localOnlyBytes.length) {
System.out.println(" RawLocalFS has " + (rawBytes.length - minLen) + " extra bytes at end:");
System.out.print(" ");
for (int i = minLen; i < Math.min(rawBytes.length, minLen + 32); i++) {
System.out.printf("%02X ", rawBytes[i]);
}
System.out.println();
} else {
System.out.println(" LOCAL_ONLY has " + (localOnlyBytes.length - minLen) + " extra bytes at end:");
System.out.print(" ");
for (int i = minLen; i < Math.min(localOnlyBytes.length, minLen + 32); i++) {
System.out.printf("%02X ", localOnlyBytes[i]);
}
System.out.println();
}
}
System.out.println("\n=== PHASE 4: Try Reading Both Files ===");
// Try reading RawLocalFS file
System.out.println("\nReading from RawLocalFS:");
try {
Dataset<Row> rawDf = spark.read().parquet(rawLocalPath);
long rawCount = rawDf.count();
System.out.println("✅ RawLocalFS read successful! Row count: " + rawCount);
assertEquals("Should have 4 employees", 4, rawCount);
} catch (Exception e) {
System.err.println("❌ RawLocalFS read FAILED: " + e.getMessage());
e.printStackTrace();
fail("RawLocalFS read should not fail!");
}
// Try reading LOCAL_ONLY file
System.out.println("\nReading from LOCAL_ONLY:");
try {
Dataset<Row> localOnlyDf = spark.read().parquet(localOnlyPath);
long localOnlyCount = localOnlyDf.count();
System.out.println("✅ LOCAL_ONLY read successful! Row count: " + localOnlyCount);
assertEquals("Should have 4 employees", 4, localOnlyCount);
} catch (Exception e) {
System.err.println("❌ LOCAL_ONLY read FAILED: " + e.getMessage());
if (e.getMessage() != null && e.getMessage().contains("78 bytes")) {
System.err.println("🔍 CONFIRMED: 78-byte error occurs during READ, not WRITE!");
}
// Don't fail - we expect this to fail
}
System.out.println("\nโ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—");
System.out.println("โ•‘ SHADOW COMPARISON COMPLETE โ•‘");
System.out.println("โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•");
}
// Employee class for Spark DataFrame
public static class Employee implements java.io.Serializable {
private int id;
private String name;
private String department;
private int salary;
public Employee() {} // Required for Spark
public Employee(int id, String name, String department, int salary) {
this.id = id;
this.name = name;
this.department = department;
this.salary = salary;
}
// Getters and Setters (required for Spark)
public int getId() { return id; }
public void setId(int id) { this.id = id; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getDepartment() { return department; }
public void setDepartment(String department) { this.department = department; }
public int getSalary() { return salary; }
public void setSalary(int salary) { this.salary = salary; }
}
}
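
When the size difference really is 78 bytes, the quickest way to see where those bytes went is to hex-dump the tail of both files, since the Parquet footer and magic live at the end of the file. A throwaway helper along these lines is sketched below; it is not part of the deleted test, and the fixed tail size is just the suspected gap.

import java.io.IOException;
import java.io.RandomAccessFile;

// Dumps the last N bytes of a file in hex, 16 bytes per row.
public class TailHexDump {
    public static void main(String[] args) throws IOException {
        int tail = 78;   // size of the suspected gap
        try (RandomAccessFile raf = new RandomAccessFile(args[0], "r")) {
            long start = Math.max(0, raf.length() - tail);
            byte[] buf = new byte[(int) (raf.length() - start)];
            raf.seek(start);
            raf.readFully(buf);
            for (int i = 0; i < buf.length; i++) {
                System.out.printf("%02X ", buf[i]);
                if ((i + 1) % 16 == 0) {
                    System.out.println();
                }
            }
            System.out.println();
        }
    }
}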

343
test/java/spark/src/test/java/seaweed/spark/SparkShadowReadComparisonTest.java

@@ -1,343 +0,0 @@
package seaweed.spark;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.*;
/**
* CRITICAL READ COMPARISON TEST: Compare all read operations between RawLocalFileSystem
* and SeaweedFS LOCAL_ONLY mode.
*
* This test:
* 1. Writes identical data to both RawLocalFS and LOCAL_ONLY
* 2. Performs the same read operations on both
* 3. Compares the results of each read operation
* 4. Identifies where the divergence happens
*/
public class SparkShadowReadComparisonTest extends SparkTestBase {
private String rawLocalDir;
private String localOnlyDir;
private FileSystem rawLocalFs;
private FileSystem seaweedFs;
private String rawLocalParquetFile;
private String localOnlyParquetFile;
@Before
public void setUp() throws Exception {
super.setUpSpark();
// Set up RawLocalFileSystem directory
rawLocalDir = "/tmp/spark-shadow-read-rawlocal-" + System.currentTimeMillis();
new File(rawLocalDir).mkdirs();
Configuration conf = spark.sparkContext().hadoopConfiguration();
rawLocalFs = new RawLocalFileSystem();
rawLocalFs.initialize(new URI("file:///"), conf);
rawLocalFs.delete(new Path(rawLocalDir), true);
rawLocalFs.mkdirs(new Path(rawLocalDir));
// Set up LOCAL_ONLY directory
localOnlyDir = "/workspace/target/debug-shadow-read";
new File(localOnlyDir).mkdirs();
for (File f : new File(localOnlyDir).listFiles()) {
f.delete();
}
// Get SeaweedFS instance
seaweedFs = FileSystem.get(URI.create("seaweedfs://seaweedfs-filer:8888"), conf);
System.out.println("\nโ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—");
System.out.println("โ•‘ SHADOW READ COMPARISON: RawLocalFS vs LOCAL_ONLY โ•‘");
System.out.println("โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•");
System.out.println("RawLocalFS directory: " + rawLocalDir);
System.out.println("LOCAL_ONLY directory: " + localOnlyDir);
}
@After
public void tearDown() throws Exception {
if (rawLocalFs != null) {
rawLocalFs.delete(new Path(rawLocalDir), true);
rawLocalFs.close();
}
super.tearDownSpark();
}
@Test
public void testShadowReadComparison() throws IOException {
System.out.println("\n=== PHASE 1: Write Identical Data to Both FileSystems ===");
// Create test data
List<Employee> employees = Arrays.asList(
new Employee(1, "Alice", "Engineering", 100000),
new Employee(2, "Bob", "Sales", 80000),
new Employee(3, "Charlie", "Engineering", 120000),
new Employee(4, "David", "Sales", 75000));
Dataset<Row> df = spark.createDataFrame(employees, Employee.class);
// Write to RawLocalFileSystem
String rawLocalPath = "file://" + rawLocalDir + "/employees";
System.out.println("Writing to RawLocalFS: " + rawLocalPath);
df.write().mode(SaveMode.Overwrite).parquet(rawLocalPath);
System.out.println("✅ RawLocalFS write completed");
// Set environment for LOCAL_ONLY mode
System.setProperty("SEAWEEDFS_DEBUG_MODE", "LOCAL_ONLY");
spark.sparkContext().hadoopConfiguration().set("fs.seaweedfs.debug.dir", localOnlyDir);
// Write to LOCAL_ONLY
String localOnlyPath = getTestPath("employees_read_test");
System.out.println("Writing to LOCAL_ONLY: " + localOnlyPath);
df.write().mode(SaveMode.Overwrite).parquet(localOnlyPath);
System.out.println("✅ LOCAL_ONLY write completed");
// Find the parquet files
File rawLocalParquetDir = new File(rawLocalDir + "/employees");
File[] rawLocalFiles = rawLocalParquetDir.listFiles((dir, name) -> name.endsWith(".parquet"));
assertNotNull("RawLocalFS should have written files", rawLocalFiles);
assertTrue("RawLocalFS should have at least one parquet file", rawLocalFiles.length > 0);
rawLocalParquetFile = rawLocalFiles[0].getAbsolutePath();
File[] localOnlyFiles = new File(localOnlyDir).listFiles((dir, name) -> name.endsWith(".parquet.debug"));
assertNotNull("LOCAL_ONLY should have written files", localOnlyFiles);
assertTrue("LOCAL_ONLY should have at least one parquet file", localOnlyFiles.length > 0);
localOnlyParquetFile = localOnlyFiles[0].getAbsolutePath();
System.out.println("RawLocalFS file: " + rawLocalParquetFile);
System.out.println("LOCAL_ONLY file: " + localOnlyParquetFile);
System.out.println("\n=== PHASE 2: Compare Low-Level Read Operations ===");
// Open both files for reading
FSDataInputStream rawStream = rawLocalFs.open(new Path(rawLocalParquetFile));
// For LOCAL_ONLY, we need to read the .debug file directly using RawLocalFS
// because it's just a local file
FSDataInputStream localOnlyStream = rawLocalFs.open(new Path(localOnlyParquetFile));
try {
// Test 1: Read file length
System.out.println("\n--- Test 1: File Length ---");
long rawLength = rawLocalFs.getFileStatus(new Path(rawLocalParquetFile)).getLen();
long localOnlyLength = rawLocalFs.getFileStatus(new Path(localOnlyParquetFile)).getLen();
System.out.println("RawLocalFS length: " + rawLength);
System.out.println("LOCAL_ONLY length: " + localOnlyLength);
if (rawLength == localOnlyLength) {
System.out.println("✅ Lengths match!");
} else {
System.out.println("❌ Length mismatch: " + (rawLength - localOnlyLength) + " bytes");
}
assertEquals("File lengths should match", rawLength, localOnlyLength);
// Test 2: Read first 100 bytes
System.out.println("\n--- Test 2: Read First 100 Bytes ---");
byte[] rawBuffer1 = new byte[100];
byte[] localOnlyBuffer1 = new byte[100];
rawStream.readFully(0, rawBuffer1);
localOnlyStream.readFully(0, localOnlyBuffer1);
boolean firstBytesMatch = Arrays.equals(rawBuffer1, localOnlyBuffer1);
System.out.println("First 100 bytes match: " + (firstBytesMatch ? "โœ…" : "โŒ"));
if (!firstBytesMatch) {
System.out.println("First difference at byte: " + findFirstDifference(rawBuffer1, localOnlyBuffer1));
}
assertTrue("First 100 bytes should match", firstBytesMatch);
// Test 3: Read last 100 bytes (Parquet footer)
System.out.println("\n--- Test 3: Read Last 100 Bytes (Parquet Footer) ---");
byte[] rawBuffer2 = new byte[100];
byte[] localOnlyBuffer2 = new byte[100];
rawStream.readFully(rawLength - 100, rawBuffer2);
localOnlyStream.readFully(localOnlyLength - 100, localOnlyBuffer2);
boolean lastBytesMatch = Arrays.equals(rawBuffer2, localOnlyBuffer2);
System.out.println("Last 100 bytes match: " + (lastBytesMatch ? "โœ…" : "โŒ"));
if (!lastBytesMatch) {
System.out.println("First difference at byte: " + findFirstDifference(rawBuffer2, localOnlyBuffer2));
System.out.println("RawLocalFS last 20 bytes:");
printHex(rawBuffer2, 80, 100);
System.out.println("LOCAL_ONLY last 20 bytes:");
printHex(localOnlyBuffer2, 80, 100);
}
assertTrue("Last 100 bytes should match", lastBytesMatch);
// Test 4: Read entire file
System.out.println("\n--- Test 4: Read Entire File ---");
byte[] rawFull = new byte[(int) rawLength];
byte[] localOnlyFull = new byte[(int) localOnlyLength];
rawStream.readFully(0, rawFull);
localOnlyStream.readFully(0, localOnlyFull);
boolean fullMatch = Arrays.equals(rawFull, localOnlyFull);
System.out.println("Full file match: " + (fullMatch ? "โœ…" : "โŒ"));
if (!fullMatch) {
int firstDiff = findFirstDifference(rawFull, localOnlyFull);
System.out.println("First difference at byte: " + firstDiff);
}
assertTrue("Full file should match", fullMatch);
// Test 5: Sequential reads
System.out.println("\n--- Test 5: Sequential Reads (10 bytes at a time) ---");
rawStream.seek(0);
localOnlyStream.seek(0);
boolean sequentialMatch = true;
int chunkSize = 10;
int chunksRead = 0;
while (rawStream.getPos() < rawLength && localOnlyStream.getPos() < localOnlyLength) {
byte[] rawChunk = new byte[chunkSize];
byte[] localOnlyChunk = new byte[chunkSize];
int rawRead = rawStream.read(rawChunk);
int localOnlyRead = localOnlyStream.read(localOnlyChunk);
if (rawRead != localOnlyRead) {
System.out.println("❌ Read size mismatch at chunk " + chunksRead + ": raw=" + rawRead + " localOnly=" + localOnlyRead);
sequentialMatch = false;
break;
}
if (!Arrays.equals(rawChunk, localOnlyChunk)) {
System.out.println("❌ Content mismatch at chunk " + chunksRead + " (byte offset " + (chunksRead * chunkSize) + ")");
sequentialMatch = false;
break;
}
chunksRead++;
}
System.out.println("Sequential reads (" + chunksRead + " chunks): " + (sequentialMatch ? "โœ…" : "โŒ"));
assertTrue("Sequential reads should match", sequentialMatch);
} finally {
rawStream.close();
localOnlyStream.close();
}
System.out.println("\n=== PHASE 3: Compare Spark Read Operations ===");
// Test 6: Spark read from RawLocalFS
System.out.println("\n--- Test 6: Spark Read from RawLocalFS ---");
try {
Dataset<Row> rawDf = spark.read().parquet(rawLocalPath);
long rawCount = rawDf.count();
System.out.println("✅ RawLocalFS Spark read successful! Row count: " + rawCount);
assertEquals("Should have 4 employees", 4, rawCount);
} catch (Exception e) {
System.err.println("❌ RawLocalFS Spark read FAILED: " + e.getMessage());
e.printStackTrace();
fail("RawLocalFS Spark read should not fail!");
}
// Test 7: Spark read from LOCAL_ONLY
System.out.println("\n--- Test 7: Spark Read from LOCAL_ONLY ---");
try {
Dataset<Row> localOnlyDf = spark.read().parquet(localOnlyPath);
long localOnlyCount = localOnlyDf.count();
System.out.println("✅ LOCAL_ONLY Spark read successful! Row count: " + localOnlyCount);
assertEquals("Should have 4 employees", 4, localOnlyCount);
} catch (Exception e) {
System.err.println("❌ LOCAL_ONLY Spark read FAILED: " + e.getMessage());
if (e.getMessage() != null && e.getMessage().contains("78 bytes")) {
System.err.println("🔍 FOUND IT! 78-byte error occurs during Spark read!");
System.err.println("But low-level reads worked, so the issue is in Spark's Parquet reader!");
}
e.printStackTrace();
// Don't fail - we want to see the full output
}
// Test 8: SQL query on RawLocalFS
System.out.println("\n--- Test 8: SQL Query on RawLocalFS ---");
try {
Dataset<Row> rawDf = spark.read().parquet(rawLocalPath);
rawDf.createOrReplaceTempView("employees_raw");
Dataset<Row> rawResult = spark.sql("SELECT name, salary FROM employees_raw WHERE department = 'Engineering'");
long rawResultCount = rawResult.count();
System.out.println("✅ RawLocalFS SQL query successful! Row count: " + rawResultCount);
assertEquals("Should have 2 engineering employees", 2, rawResultCount);
} catch (Exception e) {
System.err.println("❌ RawLocalFS SQL query FAILED: " + e.getMessage());
e.printStackTrace();
fail("RawLocalFS SQL query should not fail!");
}
// Test 9: SQL query on LOCAL_ONLY
System.out.println("\n--- Test 9: SQL Query on LOCAL_ONLY ---");
try {
Dataset<Row> localOnlyDf = spark.read().parquet(localOnlyPath);
localOnlyDf.createOrReplaceTempView("employees_localonly");
Dataset<Row> localOnlyResult = spark.sql("SELECT name, salary FROM employees_localonly WHERE department = 'Engineering'");
long localOnlyResultCount = localOnlyResult.count();
System.out.println("✅ LOCAL_ONLY SQL query successful! Row count: " + localOnlyResultCount);
assertEquals("Should have 2 engineering employees", 2, localOnlyResultCount);
} catch (Exception e) {
System.err.println("❌ LOCAL_ONLY SQL query FAILED: " + e.getMessage());
if (e.getMessage() != null && e.getMessage().contains("78 bytes")) {
System.err.println("🔍 78-byte error in SQL query!");
}
e.printStackTrace();
// Don't fail - we want to see the full output
}
System.out.println("\nโ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—");
System.out.println("โ•‘ SHADOW READ COMPARISON COMPLETE โ•‘");
System.out.println("โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•");
}
private int findFirstDifference(byte[] a, byte[] b) {
int minLen = Math.min(a.length, b.length);
for (int i = 0; i < minLen; i++) {
if (a[i] != b[i]) {
return i;
}
}
return minLen;
}
private void printHex(byte[] data, int start, int end) {
System.out.print(" ");
for (int i = start; i < end && i < data.length; i++) {
System.out.printf("%02X ", data[i]);
}
System.out.println();
}
// Employee class for Spark DataFrame
public static class Employee implements java.io.Serializable {
private int id;
private String name;
private String department;
private int salary;
public Employee() {} // Required for Spark
public Employee(int id, String name, String department, int salary) {
this.id = id;
this.name = name;
this.department = department;
this.salary = salary;
}
// Getters and Setters (required for Spark)
public int getId() { return id; }
public void setId(int id) { this.id = id; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getDepartment() { return department; }
public void setDepartment(String department) { this.department = department; }
public int getSalary() { return salary; }
public void setSalary(int salary) { this.salary = salary; }
}
}
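
Tests 7 and 9 fail only once Spark's own Parquet path is involved, even though the raw byte reads in PHASE 2 match. One way to narrow that down is to rerun the read with Spark's vectorized Parquet reader disabled (spark.sql.parquet.enableVectorizedReader is a standard Spark SQL option), so the scan falls back to the parquet-mr record reader. The following is only a sketch, with an illustrative path argument.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Re-runs the failing read with the vectorized Parquet reader turned off.
public class VectorizedReaderToggleCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("vectorized-reader-toggle-check")
                .master("local[1]")
                .getOrCreate();
        spark.conf().set("spark.sql.parquet.enableVectorizedReader", "false");
        Dataset<Row> df = spark.read().parquet(args[0]);   // e.g. the LOCAL_ONLY seaweedfs:// path
        System.out.println("row count with vectorized reader disabled: " + df.count());
        spark.stop();
    }
}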