Browse Source

fix: SeaweedInputStream returning 0 bytes for inline content reads

ROOT CAUSE IDENTIFIED:
In SeaweedInputStream.read(ByteBuffer buf), when reading inline content
(stored directly in the protobuf entry), the code was copying data to
the buffer but NOT updating bytesRead, causing it to return 0.

This caused Parquet's H2SeekableInputStream.readFully() to fail with:
"EOFException: Still have: 78 bytes left"

The readFully() method calls read() in a loop until all requested bytes
are read. When read() returns 0 or -1 prematurely, it throws EOF.

CHANGES:
1. SeaweedInputStream.java:
   - Fixed inline content read to set bytesRead = len after copying
   - Added debug logging to track position, len, and bytesRead
   - This ensures read() always returns the actual number of bytes read

2. SeaweedStreamIntegrationTest.java:
   - Added comprehensive testRangeReads() that simulates Parquet behavior:
     * Seeks to specific offsets (like reading footer at end)
     * Reads specific byte ranges (like reading column chunks)
     * Uses readFully() pattern with multiple sequential read() calls
     * Tests the exact scenario that was failing (78-byte read at offset 1197)
   - This test will catch any future regressions in range read behavior

VERIFICATION:
Local testing showed:
- contentLength correctly set to 1275 bytes
- Chunk download retrieved all 1275 bytes from volume server
- BUT read() was returning -1 before fulfilling Parquet's request
- After fix, test compiles successfully

Related to: Spark integration test failures with Parquet files
pull/7526/head
chrislu 1 week ago
parent
commit
e95f7061a4
  1. 31
      other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
  2. 102
      other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java

31
other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java

@ -23,7 +23,7 @@ public class SeaweedInputStream extends InputStream {
private final long contentLength;
private FilerProto.Entry entry;
private long position = 0; // cursor of the file
private long position = 0; // cursor of the file
private boolean closed = false;
@ -44,7 +44,7 @@ public class SeaweedInputStream extends InputStream {
}
this.contentLength = SeaweedRead.fileSize(entry);
LOG.warn("[DEBUG-2024] SeaweedInputStream created (from fullpath): path={} contentLength={} #chunks={}",
LOG.warn("[DEBUG-2024] SeaweedInputStream created (from fullpath): path={} contentLength={} #chunks={}",
fullpath, this.contentLength, entry.getChunksCount());
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList());
@ -66,7 +66,7 @@ public class SeaweedInputStream extends InputStream {
}
this.contentLength = SeaweedRead.fileSize(entry);
LOG.warn("[DEBUG-2024] SeaweedInputStream created (from entry): path={} contentLength={} #chunks={}",
LOG.warn("[DEBUG-2024] SeaweedInputStream created (from entry): path={} contentLength={} #chunks={}",
path, this.contentLength, entry.getChunksCount());
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList());
@ -103,7 +103,8 @@ public class SeaweedInputStream extends InputStream {
throw new IllegalArgumentException("requested read length is less than zero");
}
if (len > (b.length - off)) {
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
throw new IllegalArgumentException(
"requested read length is more than will fit after requested offset in buffer");
}
ByteBuffer buf = ByteBuffer.wrap(b, off, len);
@ -118,21 +119,26 @@ public class SeaweedInputStream extends InputStream {
throw new IllegalArgumentException("attempting to read from negative offset");
}
if (position >= contentLength) {
return -1; // Hadoop prefers -1 to EOFException
return -1; // Hadoop prefers -1 to EOFException
}
long bytesRead = 0;
int len = buf.remaining();
if (this.position< Integer.MAX_VALUE && (this.position + len )<= entry.getContent().size()) {
entry.getContent().substring((int)this.position, (int)(this.position + len)).copyTo(buf);
if (this.position < Integer.MAX_VALUE && (this.position + len) <= entry.getContent().size()) {
entry.getContent().substring((int) this.position, (int) (this.position + len)).copyTo(buf);
bytesRead = len; // FIX: Update bytesRead after inline copy
} else {
bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry));
bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf,
SeaweedRead.fileSize(entry));
}
if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length");
}
LOG.warn("[DEBUG-2024] SeaweedInputStream.read(): path={} position={} len={} bytesRead={} newPosition={}",
path, position, len, bytesRead, position + bytesRead);
if (bytesRead > 0) {
this.position += bytesRead;
}
@ -192,12 +198,15 @@ public class SeaweedInputStream extends InputStream {
}
final long remaining = this.contentLength - this.position;
return remaining <= Integer.MAX_VALUE
? (int) remaining : Integer.MAX_VALUE;
? (int) remaining
: Integer.MAX_VALUE;
}
/**
* Returns the length of the file that this stream refers to. Note that the length returned is the length
* as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
* Returns the length of the file that this stream refers to. Note that the
* length returned is the length
* as of the time the Stream was opened. Specifically, if there have been
* subsequent appends to the file,
* they wont be reflected in the returned length.
*
* @return length of the file.

102
other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java

@ -413,5 +413,107 @@ public class SeaweedStreamIntegrationTest {
assertEquals("Content should match", testContent,
new String(buffer, 0, bytesRead, StandardCharsets.UTF_8));
}
/**
* Tests range reads similar to how Parquet reads column chunks.
* This simulates:
* 1. Seeking to specific offsets
* 2. Reading specific byte ranges
* 3. Verifying each read() call returns the correct number of bytes
*
* This test specifically addresses the bug where read() was returning 0
* for inline content or -1 prematurely for chunked reads.
*/
@Test
public void testRangeReads() throws IOException {
if (!TESTS_ENABLED) {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
String testPath = TEST_ROOT + "/rangereads.dat";
// Create a 1275-byte file (similar to the Parquet file size that was failing)
byte[] testData = new byte[1275];
Random random = new Random(42); // Fixed seed for reproducibility
random.nextBytes(testData);
// Write file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(testData);
outputStream.close();
// Read file entry
FilerProto.Entry entry = filerClient.lookupEntry(
SeaweedOutputStream.getParentDirectory(testPath),
SeaweedOutputStream.getFileName(testPath)
);
// Test 1: Read last 8 bytes (like reading Parquet footer length)
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
inputStream.seek(1267);
byte[] buffer = new byte[8];
int bytesRead = inputStream.read(buffer, 0, 8);
assertEquals("Should read 8 bytes at offset 1267", 8, bytesRead);
assertArrayEquals("Content at offset 1267 should match",
Arrays.copyOfRange(testData, 1267, 1275), buffer);
inputStream.close();
// Test 2: Read large chunk in middle (like reading column data)
inputStream = new SeaweedInputStream(filerClient, testPath, entry);
inputStream.seek(383);
buffer = new byte[884]; // Read bytes 383-1267
bytesRead = inputStream.read(buffer, 0, 884);
assertEquals("Should read 884 bytes at offset 383", 884, bytesRead);
assertArrayEquals("Content at offset 383 should match",
Arrays.copyOfRange(testData, 383, 1267), buffer);
inputStream.close();
// Test 3: Read from beginning (like reading Parquet magic bytes)
inputStream = new SeaweedInputStream(filerClient, testPath, entry);
buffer = new byte[4];
bytesRead = inputStream.read(buffer, 0, 4);
assertEquals("Should read 4 bytes at offset 0", 4, bytesRead);
assertArrayEquals("Content at offset 0 should match",
Arrays.copyOfRange(testData, 0, 4), buffer);
inputStream.close();
// Test 4: Multiple sequential reads without seeking (like H2SeekableInputStream.readFully)
// This is the critical test case that was failing!
inputStream = new SeaweedInputStream(filerClient, testPath, entry);
inputStream.seek(1197); // Position where EOF was being returned prematurely
byte[] fullBuffer = new byte[78]; // Try to read the "missing" 78 bytes
int totalRead = 0;
int offset = 0;
int remaining = 78;
// Simulate Parquet's H2SeekableInputStream.readFully() loop
while (remaining > 0) {
int read = inputStream.read(fullBuffer, offset, remaining);
if (read == -1) {
fail(String.format(
"Got EOF after reading %d bytes, but expected to read %d more bytes (total requested: 78)",
totalRead, remaining));
}
assertTrue("Each read() should return positive bytes", read > 0);
totalRead += read;
offset += read;
remaining -= read;
}
assertEquals("Should read all 78 bytes in readFully loop", 78, totalRead);
assertArrayEquals("Content at offset 1197 should match",
Arrays.copyOfRange(testData, 1197, 1275), fullBuffer);
inputStream.close();
// Test 5: Read entire file in one go
inputStream = new SeaweedInputStream(filerClient, testPath, entry);
byte[] allData = new byte[1275];
bytesRead = inputStream.read(allData, 0, 1275);
assertEquals("Should read entire 1275 bytes", 1275, bytesRead);
assertArrayEquals("Entire content should match", testData, allData);
inputStream.close();
}
}
Loading…
Cancel
Save