diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 78cf5a2fc..10115ac55 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -13,7 +13,6 @@ import seaweedfs.client.FilerProto; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.OutputStream; import java.net.URI; import java.util.EnumSet; import java.util.List; @@ -112,9 +111,16 @@ public class SeaweedFileSystem extends FileSystem { replicaPlacement = String.format("%03d", replication - 1); } int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); - OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, + SeaweedHadoopOutputStream outputStream = (SeaweedHadoopOutputStream) seaweedFileSystemStore.createFile(path, overwrite, permission, seaweedBufferSize, replicaPlacement); - return new FSDataOutputStream(outputStream, statistics); + // Use custom FSDataOutputStream that delegates getPos() to our stream + return new FSDataOutputStream(outputStream, statistics) { + @Override + public long getPos() { + // Delegate to SeaweedOutputStream's position tracking + return outputStream.getPos(); + } + }; } catch (Exception ex) { LOG.error("Failed to create file: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex); throw new IOException("Failed to create file: " + path, ex); @@ -156,8 +162,15 @@ public class SeaweedFileSystem extends FileSystem { path = qualify(path); try { int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); - OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, ""); - return new FSDataOutputStream(outputStream, statistics); + SeaweedHadoopOutputStream outputStream = (SeaweedHadoopOutputStream) seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, ""); + // Use custom FSDataOutputStream that delegates getPos() to our stream + return new FSDataOutputStream(outputStream, statistics) { + @Override + public long getPos() { + // Delegate to SeaweedOutputStream's position tracking + return outputStream.getPos(); + } + }; } catch (Exception ex) { LOG.error("Failed to append to file: {} bufferSize:{}", path, bufferSize, ex); throw new IOException("Failed to append to file: " + path, ex);