diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index 8764780ba..a30c52834 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -34,6 +34,7 @@ public class SeaweedOutputStream extends OutputStream { private long outputIndex; private String replication = ""; private String collection = ""; + private long totalBytesWritten = 0; // Track total bytes for debugging public SeaweedOutputStream(FilerClient filerClient, final String fullpath) { this(filerClient, fullpath, ""); @@ -115,17 +116,17 @@ public class SeaweedOutputStream extends OutputStream { private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { try { - LOG.info("[DEBUG-2024] ⚠️ flushWrittenBytesToServiceInternal: path={} offset={} #chunks={}", + LOG.info("[DEBUG-2024] ⚠️ flushWrittenBytesToServiceInternal: path={} offset={} #chunks={}", path, offset, entry.getChunksCount()); - + // Set the file size in attributes based on our position // This ensures Parquet footer metadata matches what we actually wrote FilerProto.FuseAttributes.Builder attrBuilder = entry.getAttributes().toBuilder(); attrBuilder.setFileSize(offset); entry.setAttributes(attrBuilder); - + LOG.info("[DEBUG-2024] → Set entry.attributes.fileSize = {} bytes before writeMeta", offset); - + SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry); } catch (Exception ex) { throw new IOException(ex); @@ -151,6 +152,8 @@ public class SeaweedOutputStream extends OutputStream { throw new IndexOutOfBoundsException(); } + totalBytesWritten += length; + // System.out.println(path + " write [" + (outputIndex + off) + "," + // ((outputIndex + off) + length) + ")"); @@ -204,12 +207,13 @@ public class SeaweedOutputStream extends OutputStream { } int bufferPosBeforeFlush = buffer.position(); - LOG.info("[DEBUG-2024] close START: path={} position={} buffer.position()={}", path, position, bufferPosBeforeFlush); + LOG.info("[DEBUG-2024] 🔒 close START: path={} position={} buffer.position()={} totalBytesWritten={}", + path, position, bufferPosBeforeFlush, totalBytesWritten); try { flushInternal(); threadExecutor.shutdown(); - LOG.info("[DEBUG-2024] close END: path={} finalPosition={} (buffer had {} bytes that were flushed)", - path, position, bufferPosBeforeFlush); + LOG.info("[DEBUG-2024] ✅ close END: path={} finalPosition={} totalBytesWritten={} (buffer had {} bytes)", + path, position, totalBytesWritten, bufferPosBeforeFlush); } finally { lastError = new IOException("Stream is closed!"); ByteBufferPool.release(buffer); diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java index 1740312fe..672184b04 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java @@ -4,6 +4,8 @@ package seaweed.hdfs; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import seaweedfs.client.FilerClient; import seaweedfs.client.FilerProto; import seaweedfs.client.SeaweedOutputStream; @@ -13,9 +15,13 @@ import java.util.Locale; public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Syncable, StreamCapabilities { + private static final Logger LOG = LoggerFactory.getLogger(SeaweedHadoopOutputStream.class); + public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry, final long position, final int bufferSize, final String replication) { super(filerClient, path, entry, position, bufferSize, replication); + LOG.info("[DEBUG-2024] 🔧 SeaweedHadoopOutputStream created: path={} position={} bufferSize={} replication={}", + path, position, bufferSize, replication); } /** diff --git a/test/java/spark/CI_SETUP.md b/test/java/spark/CI_SETUP.md index 962f5a9fe..35b488ede 100644 --- a/test/java/spark/CI_SETUP.md +++ b/test/java/spark/CI_SETUP.md @@ -272,3 +272,4 @@ For persistent issues: - Note if it passes locally + diff --git a/test/java/spark/quick-start.sh b/test/java/spark/quick-start.sh index d674977f1..974363311 100755 --- a/test/java/spark/quick-start.sh +++ b/test/java/spark/quick-start.sh @@ -146,3 +146,4 @@ trap cleanup INT main + diff --git a/test/java/spark/run-tests.sh b/test/java/spark/run-tests.sh index 420141bba..f637c8c59 100755 --- a/test/java/spark/run-tests.sh +++ b/test/java/spark/run-tests.sh @@ -43,3 +43,4 @@ echo "✓ Test run completed" echo "View detailed reports in: target/surefire-reports/" + diff --git a/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java b/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java index be0bb395e..8919bfbdf 100644 --- a/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java +++ b/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java @@ -142,3 +142,4 @@ public class SparkSeaweedFSExample { } + diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java index 2cfd64a2d..d0c01736a 100644 --- a/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java +++ b/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java @@ -235,3 +235,4 @@ public class SparkSQLTest extends SparkTestBase { } +