diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java index cdc16a9bf..0e016ad6e 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java @@ -23,7 +23,7 @@ public class SeaweedInputStream extends InputStream { private final long contentLength; private FilerProto.Entry entry; - private long position = 0; // cursor of the file + private long position = 0; // cursor of the file private boolean closed = false; @@ -44,7 +44,7 @@ public class SeaweedInputStream extends InputStream { } this.contentLength = SeaweedRead.fileSize(entry); - LOG.warn("[DEBUG-2024] SeaweedInputStream created (from fullpath): path={} contentLength={} #chunks={}", + LOG.warn("[DEBUG-2024] SeaweedInputStream created (from fullpath): path={} contentLength={} #chunks={}", fullpath, this.contentLength, entry.getChunksCount()); this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList()); @@ -66,7 +66,7 @@ public class SeaweedInputStream extends InputStream { } this.contentLength = SeaweedRead.fileSize(entry); - LOG.warn("[DEBUG-2024] SeaweedInputStream created (from entry): path={} contentLength={} #chunks={}", + LOG.warn("[DEBUG-2024] SeaweedInputStream created (from entry): path={} contentLength={} #chunks={}", path, this.contentLength, entry.getChunksCount()); this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList()); @@ -103,7 +103,8 @@ public class SeaweedInputStream extends InputStream { throw new IllegalArgumentException("requested read length is less than zero"); } if (len > (b.length - off)) { - throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); + throw new IllegalArgumentException( + "requested read length is more than will fit after requested offset in buffer"); } ByteBuffer buf = ByteBuffer.wrap(b, off, len); @@ -118,21 +119,26 @@ public class SeaweedInputStream extends InputStream { throw new IllegalArgumentException("attempting to read from negative offset"); } if (position >= contentLength) { - return -1; // Hadoop prefers -1 to EOFException + return -1; // Hadoop prefers -1 to EOFException } long bytesRead = 0; int len = buf.remaining(); - if (this.position< Integer.MAX_VALUE && (this.position + len )<= entry.getContent().size()) { - entry.getContent().substring((int)this.position, (int)(this.position + len)).copyTo(buf); + if (this.position < Integer.MAX_VALUE && (this.position + len) <= entry.getContent().size()) { + entry.getContent().substring((int) this.position, (int) (this.position + len)).copyTo(buf); + bytesRead = len; // FIX: Update bytesRead after inline copy } else { - bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry)); + bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, + SeaweedRead.fileSize(entry)); } if (bytesRead > Integer.MAX_VALUE) { throw new IOException("Unexpected Content-Length"); } + LOG.warn("[DEBUG-2024] SeaweedInputStream.read(): path={} position={} len={} bytesRead={} newPosition={}", + path, position, len, bytesRead, position + bytesRead); + if (bytesRead > 0) { this.position += bytesRead; } @@ -192,12 +198,15 @@ public class SeaweedInputStream extends InputStream { } final long remaining = this.contentLength - this.position; return remaining <= Integer.MAX_VALUE - ? (int) remaining : Integer.MAX_VALUE; + ? (int) remaining + : Integer.MAX_VALUE; } /** - * Returns the length of the file that this stream refers to. Note that the length returned is the length - * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file, + * Returns the length of the file that this stream refers to. Note that the + * length returned is the length + * as of the time the Stream was opened. Specifically, if there have been + * subsequent appends to the file, * they wont be reflected in the returned length. * * @return length of the file. diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java index f384e059f..16872a7e6 100644 --- a/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java +++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java @@ -413,5 +413,107 @@ public class SeaweedStreamIntegrationTest { assertEquals("Content should match", testContent, new String(buffer, 0, bytesRead, StandardCharsets.UTF_8)); } + + /** + * Tests range reads similar to how Parquet reads column chunks. + * This simulates: + * 1. Seeking to specific offsets + * 2. Reading specific byte ranges + * 3. Verifying each read() call returns the correct number of bytes + * + * This test specifically addresses the bug where read() was returning 0 + * for inline content or -1 prematurely for chunked reads. + */ + @Test + public void testRangeReads() throws IOException { + if (!TESTS_ENABLED) { + System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); + return; + } + + String testPath = TEST_ROOT + "/rangereads.dat"; + + // Create a 1275-byte file (similar to the Parquet file size that was failing) + byte[] testData = new byte[1275]; + Random random = new Random(42); // Fixed seed for reproducibility + random.nextBytes(testData); + + // Write file + SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); + outputStream.write(testData); + outputStream.close(); + + // Read file entry + FilerProto.Entry entry = filerClient.lookupEntry( + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath) + ); + + // Test 1: Read last 8 bytes (like reading Parquet footer length) + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); + inputStream.seek(1267); + byte[] buffer = new byte[8]; + int bytesRead = inputStream.read(buffer, 0, 8); + assertEquals("Should read 8 bytes at offset 1267", 8, bytesRead); + assertArrayEquals("Content at offset 1267 should match", + Arrays.copyOfRange(testData, 1267, 1275), buffer); + inputStream.close(); + + // Test 2: Read large chunk in middle (like reading column data) + inputStream = new SeaweedInputStream(filerClient, testPath, entry); + inputStream.seek(383); + buffer = new byte[884]; // Read bytes 383-1267 + bytesRead = inputStream.read(buffer, 0, 884); + assertEquals("Should read 884 bytes at offset 383", 884, bytesRead); + assertArrayEquals("Content at offset 383 should match", + Arrays.copyOfRange(testData, 383, 1267), buffer); + inputStream.close(); + + // Test 3: Read from beginning (like reading Parquet magic bytes) + inputStream = new SeaweedInputStream(filerClient, testPath, entry); + buffer = new byte[4]; + bytesRead = inputStream.read(buffer, 0, 4); + assertEquals("Should read 4 bytes at offset 0", 4, bytesRead); + assertArrayEquals("Content at offset 0 should match", + Arrays.copyOfRange(testData, 0, 4), buffer); + inputStream.close(); + + // Test 4: Multiple sequential reads without seeking (like H2SeekableInputStream.readFully) + // This is the critical test case that was failing! + inputStream = new SeaweedInputStream(filerClient, testPath, entry); + inputStream.seek(1197); // Position where EOF was being returned prematurely + + byte[] fullBuffer = new byte[78]; // Try to read the "missing" 78 bytes + int totalRead = 0; + int offset = 0; + int remaining = 78; + + // Simulate Parquet's H2SeekableInputStream.readFully() loop + while (remaining > 0) { + int read = inputStream.read(fullBuffer, offset, remaining); + if (read == -1) { + fail(String.format( + "Got EOF after reading %d bytes, but expected to read %d more bytes (total requested: 78)", + totalRead, remaining)); + } + assertTrue("Each read() should return positive bytes", read > 0); + totalRead += read; + offset += read; + remaining -= read; + } + + assertEquals("Should read all 78 bytes in readFully loop", 78, totalRead); + assertArrayEquals("Content at offset 1197 should match", + Arrays.copyOfRange(testData, 1197, 1275), fullBuffer); + inputStream.close(); + + // Test 5: Read entire file in one go + inputStream = new SeaweedInputStream(filerClient, testPath, entry); + byte[] allData = new byte[1275]; + bytesRead = inputStream.read(allData, 0, 1275); + assertEquals("Should read entire 1275 bytes", 1275, bytesRead); + assertArrayEquals("Entire content should match", testData, allData); + inputStream.close(); + } }