
debug: track stream lifecycle and total bytes written

Added comprehensive logging to identify why Parquet files fail with
'EOFException: Still have: 78 bytes left'.

Key additions:
1. SeaweedHadoopOutputStream constructor logging with 🔧 marker
   - Shows when output streams are created
   - Logs path, position, bufferSize, replication

2. totalBytesWritten counter in SeaweedOutputStream
   - Tracks cumulative bytes handed to write() calls
   - Helps identify if Parquet wrote 762 bytes but only 684 reached chunks
   - (pattern sketched in the snippet after this list)

3. Enhanced close() logging with 🔒 marker
   - Shows totalBytesWritten vs position vs buffer.position()
   - If totalBytesWritten=762 but position=684, write submission failed
   - If buffer.position()=78 at close, the buffer wasn't flushed
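
A minimal sketch of the pattern in items 2 and 3. This is a simplified stand-in, not the real SeaweedOutputStream (which stages bytes in a ByteBuffer and uploads chunks asynchronously); only the counting idea carries over:

    // Hedged stand-in for the real class: totalBytesWritten counts what
    // callers hand to write(); close() reports it so it can be compared
    // with the bytes actually submitted for upload.
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    public class CountingOutputStream extends ByteArrayOutputStream {
        private long totalBytesWritten = 0;

        @Override
        public synchronized void write(byte[] b, int off, int len) {
            totalBytesWritten += len;   // counted before buffering, as in the patch
            super.write(b, off, len);
        }

        @Override
        public void close() throws IOException {
            super.close();
            // the patch logs this next to position and buffer.position()
            System.out.println("close END: totalBytesWritten=" + totalBytesWritten);
        }
    }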

Expected scenarios in next run:
A) Stream never created → No 🔧 log for .parquet files
B) Write failed → totalBytesWritten=762 but position=684
C) Buffer not flushed → buffer.position()=78 at close
D) All correct → totalBytesWritten=position=684, but Parquet expects 762
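
A throwaway classifier (illustrative only, not part of the patch; the 762/684/78 values come from the failing run) makes the mapping mechanical. Scenario A is the absence of any log line, so it can't be classified from numbers:

    // Maps the numbers from a "close END" log line onto scenarios B-D.
    public class CloseLogDiagnosis {
        static String diagnose(long totalBytesWritten, long position,
                               long bufferPosition, long expectedBytes) {
            if (bufferPosition > 0) {
                return "C) buffer not flushed: " + bufferPosition + " bytes stranded at close";
            }
            if (totalBytesWritten > position) {
                return "B) write submission failed: "
                        + (totalBytesWritten - position) + " bytes never reached chunks";
            }
            if (position < expectedBytes) {
                return "D) counters consistent, but Parquet expected " + expectedBytes + " bytes";
            }
            return "all counters consistent with the expected size";
        }

        public static void main(String[] args) {
            System.out.println(diagnose(762, 684, 0, 762));   // scenario B
            System.out.println(diagnose(762, 684, 78, 762));  // scenario C
            System.out.println(diagnose(684, 684, 0, 762));   // scenario D
        }
    }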

This will pinpoint whether the issue is in:
- Stream creation/lifecycle
- Write submission
- Buffer flushing
- Or Parquet's internal state
Branch: pull/7526/head
Author: chrislu, 4 weeks ago
Commit: a3cf4eb843
Changed files:
  18  other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
   6  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
   1  test/java/spark/CI_SETUP.md
   1  test/java/spark/quick-start.sh
   1  test/java/spark/run-tests.sh
   1  test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java
   1  test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java

other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java (18 changes)

@@ -34,6 +34,7 @@ public class SeaweedOutputStream extends OutputStream {
     private long outputIndex;
     private String replication = "";
     private String collection = "";
+    private long totalBytesWritten = 0; // Track total bytes for debugging

     public SeaweedOutputStream(FilerClient filerClient, final String fullpath) {
         this(filerClient, fullpath, "");
@@ -115,17 +116,17 @@ public class SeaweedOutputStream extends OutputStream {
     private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
         try {
-            LOG.info("[DEBUG-2024] flushWrittenBytesToServiceInternal: path={} offset={} #chunks={}",
+            LOG.info("[DEBUG-2024] ⚠️ flushWrittenBytesToServiceInternal: path={} offset={} #chunks={}",
                     path, offset, entry.getChunksCount());

             // Set the file size in attributes based on our position
             // This ensures Parquet footer metadata matches what we actually wrote
             FilerProto.FuseAttributes.Builder attrBuilder = entry.getAttributes().toBuilder();
             attrBuilder.setFileSize(offset);
             entry.setAttributes(attrBuilder);
             LOG.info("[DEBUG-2024] → Set entry.attributes.fileSize = {} bytes before writeMeta", offset);

             SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry);
         } catch (Exception ex) {
             throw new IOException(ex);
@@ -151,6 +152,8 @@
             throw new IndexOutOfBoundsException();
         }

+        totalBytesWritten += length;
+
         // System.out.println(path + " write [" + (outputIndex + off) + "," +
         // ((outputIndex + off) + length) + ")");
@@ -204,12 +207,13 @@
         }

         int bufferPosBeforeFlush = buffer.position();
-        LOG.info("[DEBUG-2024] close START: path={} position={} buffer.position()={}", path, position, bufferPosBeforeFlush);
+        LOG.info("[DEBUG-2024] 🔒 close START: path={} position={} buffer.position()={} totalBytesWritten={}",
+                path, position, bufferPosBeforeFlush, totalBytesWritten);
         try {
             flushInternal();
             threadExecutor.shutdown();
-            LOG.info("[DEBUG-2024] close END: path={} finalPosition={} (buffer had {} bytes that were flushed)",
-                    path, position, bufferPosBeforeFlush);
+            LOG.info("[DEBUG-2024] close END: path={} finalPosition={} totalBytesWritten={} (buffer had {} bytes)",
+                    path, position, totalBytesWritten, bufferPosBeforeFlush);
         } finally {
             lastError = new IOException("Stream is closed!");
             ByteBufferPool.release(buffer);
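
Together, these counters separate write-path losses from flush-path losses. A self-contained toy (no SeaweedFS dependencies; the byte counts mirror the failing run) reproduces the scenario-C signature the new close() log would show:

    // Shows how 78 bytes can strand in a staging buffer when close()
    // skips the final flush, which is exactly what scenario C indicates.
    import java.nio.ByteBuffer;

    public class StrandedBufferDemo {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(8 * 1024 * 1024);
            long position = 0;            // bytes "submitted" as chunks
            long totalBytesWritten = 0;   // bytes handed to write()

            // Parquet-style pattern: 684 bytes of pages, then a 78-byte footer.
            totalBytesWritten += 684;
            buffer.put(new byte[684]);
            position += buffer.position(); // simulated chunk submission
            buffer.clear();

            totalBytesWritten += 78;       // footer lands in the buffer...
            buffer.put(new byte[78]);
            // ...and close() returns without flushing it:
            System.out.printf("close: totalBytesWritten=%d position=%d buffer.position()=%d%n",
                    totalBytesWritten, position, buffer.position());
            // -> close: totalBytesWritten=762 position=684 buffer.position()=78
        }
    }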

other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java (6 changes)

@@ -4,6 +4,8 @@ package seaweed.hdfs;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.Syncable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import seaweedfs.client.FilerClient;
 import seaweedfs.client.FilerProto;
 import seaweedfs.client.SeaweedOutputStream;
@@ -13,9 +15,13 @@ import java.util.Locale;
 public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Syncable, StreamCapabilities {

+    private static final Logger LOG = LoggerFactory.getLogger(SeaweedHadoopOutputStream.class);
+
     public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
                                      final long position, final int bufferSize, final String replication) {
         super(filerClient, path, entry, position, bufferSize, replication);
+        LOG.info("[DEBUG-2024] 🔧 SeaweedHadoopOutputStream created: path={} position={} bufferSize={} replication={}",
+                path, position, bufferSize, replication);
     }

     /**
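
A small probe sketch that should trigger the new 🔧 and 🔒 log lines end to end. Hypothetical setup: the seaweedfs:// URI, filer address, and fs.seaweedfs.impl binding are assumptions about the local test environment, not part of this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CreateParquetProbe {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "seaweedfs://localhost:8888");           // assumed filer address
            conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");  // assumed impl binding
            try (FileSystem fs = FileSystem.get(conf);
                 FSDataOutputStream out = fs.create(new Path("/probe/test.parquet"))) {
                out.write(new byte[762]); // same byte count Parquet reported
            }
            // Expected in the logs: one 🔧 "created" line, then 🔒 close START
            // and close END with totalBytesWritten=762.
        }
    }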

test/java/spark/CI_SETUP.md (1 change)

@@ -272,3 +272,4 @@ For persistent issues:
 - Note if it passes locally
+

test/java/spark/quick-start.sh (1 change)

@@ -146,3 +146,4 @@ trap cleanup INT
 main
+

test/java/spark/run-tests.sh (1 change)

@@ -43,3 +43,4 @@ echo "✓ Test run completed"
 echo "View detailed reports in: target/surefire-reports/"
+

test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java (1 change)

@@ -142,3 +142,4 @@ public class SparkSeaweedFSExample {
 }
+

test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java (1 change)

@@ -235,3 +235,4 @@ public class SparkSQLTest extends SparkTestBase {
 }
+