diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index 0fd528362..8764780ba 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -44,7 +44,7 @@ public class SeaweedOutputStream extends OutputStream { } public SeaweedOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry, - final long position, final int bufferSize, final String replication) { + final long position, final int bufferSize, final String replication) { this.filerClient = filerClient; this.replication = replication; this.path = path; @@ -58,8 +58,7 @@ public class SeaweedOutputStream extends OutputStream { this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors(); - this.threadExecutor - = new ThreadPoolExecutor(maxConcurrentRequestCount, + this.threadExecutor = new ThreadPoolExecutor(maxConcurrentRequestCount, maxConcurrentRequestCount, 120L, TimeUnit.SECONDS, @@ -77,8 +76,7 @@ public class SeaweedOutputStream extends OutputStream { .setFileMode(0755) .setCrtime(now) .setMtime(now) - .clearGroupName() - ); + .clearGroupName()); } } @@ -86,6 +84,7 @@ public class SeaweedOutputStream extends OutputStream { public void setReplication(String replication) { this.replication = replication; } + public void setCollection(String collection) { this.collection = collection; } @@ -93,7 +92,7 @@ public class SeaweedOutputStream extends OutputStream { public static String getParentDirectory(String path) { int protoIndex = path.indexOf("://"); if (protoIndex >= 0) { - int pathStart = path.indexOf("/", protoIndex+3); + int pathStart = path.indexOf("/", protoIndex + 3); path = path.substring(pathStart); } if (path.equals("/")) { @@ -116,6 +115,17 @@ public class SeaweedOutputStream extends OutputStream { private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { try { + LOG.info("[DEBUG-2024] ⚠️ flushWrittenBytesToServiceInternal: path={} offset={} #chunks={}", + path, offset, entry.getChunksCount()); + + // Set the file size in attributes based on our position + // This ensures Parquet footer metadata matches what we actually wrote + FilerProto.FuseAttributes.Builder attrBuilder = entry.getAttributes().toBuilder(); + attrBuilder.setFileSize(offset); + entry.setAttributes(attrBuilder); + + LOG.info("[DEBUG-2024] → Set entry.attributes.fileSize = {} bytes before writeMeta", offset); + SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry); } catch (Exception ex) { throw new IOException(ex); @@ -125,7 +135,7 @@ public class SeaweedOutputStream extends OutputStream { @Override public void write(final int byteVal) throws IOException { - write(new byte[]{(byte) (byteVal & 0xFF)}); + write(new byte[] { (byte) (byteVal & 0xFF) }); } @Override @@ -141,7 +151,8 @@ public class SeaweedOutputStream extends OutputStream { throw new IndexOutOfBoundsException(); } - // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")"); + // System.out.println(path + " write [" + (outputIndex + off) + "," + + // ((outputIndex + off) + length) + ")"); int currentOffset = off; int writableBytes = bufferSize - buffer.position(); @@ -154,7 +165,8 @@ public class SeaweedOutputStream extends OutputStream { break; } - // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity()); + // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + + // ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity()); buffer.put(data, currentOffset, writableBytes); currentOffset += writableBytes; writeCurrentBufferToService(); @@ -191,11 +203,13 @@ public class SeaweedOutputStream extends OutputStream { return; } - LOG.info("[DEBUG-2024] close: path={} totalPosition={} buffer.position()={}", path, position, buffer.position()); + int bufferPosBeforeFlush = buffer.position(); + LOG.info("[DEBUG-2024] close START: path={} position={} buffer.position()={}", path, position, bufferPosBeforeFlush); try { flushInternal(); threadExecutor.shutdown(); - LOG.info("close completed: path={} finalPosition={}", path, position); + LOG.info("[DEBUG-2024] close END: path={} finalPosition={} (buffer had {} bytes that were flushed)", + path, position, bufferPosBeforeFlush); } finally { lastError = new IOException("Stream is closed!"); ByteBufferPool.release(buffer); @@ -211,7 +225,8 @@ public class SeaweedOutputStream extends OutputStream { private synchronized void writeCurrentBufferToService() throws IOException { int bufferPos = buffer.position(); - LOG.info("[DEBUG-2024] writeCurrentBufferToService: path={} buffer.position()={} totalPosition={}", path, bufferPos, position); + LOG.info("[DEBUG-2024] writeCurrentBufferToService: path={} buffer.position()={} totalPosition={}", path, + bufferPos, position); if (bufferPos == 0) { LOG.info(" → Skipping write, buffer is empty"); return; @@ -225,18 +240,22 @@ public class SeaweedOutputStream extends OutputStream { } - private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException { + private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) + throws IOException { - ((Buffer)bufferToWrite).flip(); + ((Buffer) bufferToWrite).flip(); int bytesLength = bufferToWrite.limit() - bufferToWrite.position(); if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) { waitForTaskToComplete(); } final Future job = completionService.submit(() -> { - // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); - SeaweedWrite.writeData(entry, replication, collection, filerClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path); - // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); + // System.out.println(path + " is going to save [" + (writePosition) + "," + + // ((writePosition) + bytesLength) + ")"); + SeaweedWrite.writeData(entry, replication, collection, filerClient, writePosition, bufferToWrite.array(), + bufferToWrite.position(), bufferToWrite.limit(), path); + // System.out.println(path + " saved [" + (writePosition) + "," + + // ((writePosition) + bytesLength) + ")"); ByteBufferPool.release(bufferToWrite); return null; }); diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 8037435fd..2f62827cd 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -125,7 +125,8 @@ public class SeaweedWrite { String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey); - LOG.info("[DEBUG-2024] ✓ Wrote chunk to {} at offset {} size {} bytes, etag: {}", targetUrl, offset, bytesLength, etag); + LOG.info("[DEBUG-2024] ✓ Wrote chunk to {} at offset {} size {} bytes, etag: {}", targetUrl, offset, + bytesLength, etag); return FilerProto.FileChunk.newBuilder() .setFileId(fileId) @@ -147,8 +148,17 @@ public class SeaweedWrite { for (FilerProto.FileChunk chunk : chunks) { totalSize = Math.max(totalSize, chunk.getOffset() + chunk.getSize()); } - LOG.info("✓ Writing metadata to {} with {} chunks, total size: {} bytes", - parentDirectory + "/" + entry.getName(), chunks.size(), totalSize); + + // Check if there's a size mismatch with attributes + long attrFileSize = entry.getAttributes().getFileSize(); + LOG.info( + "[DEBUG-2024] ✓ Writing metadata to {} with {} chunks, totalSize from chunks: {} bytes, attr.fileSize: {} bytes{}", + parentDirectory + "/" + entry.getName(), + chunks.size(), + totalSize, + attrFileSize, + (totalSize != attrFileSize) ? " ⚠️ MISMATCH!" : ""); + entry.clearChunks(); entry.addAllChunks(chunks); filerClient.getBlockingStub().createEntry( diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml index d3c2751a5..c6579c3fb 100644 --- a/other/java/hdfs3/dependency-reduced-pom.xml +++ b/other/java/hdfs3/dependency-reduced-pom.xml @@ -572,7 +572,7 @@ - 3.80 + 3.80.1-SNAPSHOT 3.4.0