Browse Source

fix: Override FSDataOutputStream.getPos() to use SeaweedOutputStream position

CRITICAL FIX for Parquet 78-byte EOF error!

Root Cause Analysis:
- Hadoop's FSDataOutputStream tracks position with an internal counter
- It does NOT call SeaweedOutputStream.getPos() by default
- When Parquet writes data and calls getPos() to record column chunk offsets,
  it gets FSDataOutputStream's counter, not SeaweedOutputStream's actual position
- This creates a 78-byte mismatch between the recorded offsets and the actual file size
- Result: EOFException on read (the reader tries to read past the end of the file)

The Fix:
- Override getPos() in the anonymous FSDataOutputStream subclass
- Delegate to SeaweedOutputStream.getPos(), which returns 'position + buffer.position()'
- This ensures Parquet gets the correct position when recording metadata
- Column chunk offsets in footer will now match actual data positions

This should fix the consistent 78-byte discrepancy we've been seeing across
all Parquet file writes (regardless of file size: 684, 693, 1275 bytes, etc.)
pull/7526/head
chrislu 1 week ago
parent
commit
9e7ed48688
  1. 23
      other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java

23
other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java

@ -13,7 +13,6 @@ import seaweedfs.client.FilerProto;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.EnumSet;
import java.util.List;
@ -112,9 +111,16 @@ public class SeaweedFileSystem extends FileSystem {
replicaPlacement = String.format("%03d", replication - 1);
}
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission,
SeaweedHadoopOutputStream outputStream = (SeaweedHadoopOutputStream) seaweedFileSystemStore.createFile(path, overwrite, permission,
seaweedBufferSize, replicaPlacement);
return new FSDataOutputStream(outputStream, statistics);
// Use custom FSDataOutputStream that delegates getPos() to our stream
return new FSDataOutputStream(outputStream, statistics) {
@Override
public long getPos() {
// Delegate to SeaweedOutputStream's position tracking
return outputStream.getPos();
}
};
} catch (Exception ex) {
LOG.error("Failed to create file: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex);
throw new IOException("Failed to create file: " + path, ex);
@ -156,8 +162,15 @@ public class SeaweedFileSystem extends FileSystem {
path = qualify(path);
try {
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, "");
return new FSDataOutputStream(outputStream, statistics);
SeaweedHadoopOutputStream outputStream = (SeaweedHadoopOutputStream) seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, "");
// Use custom FSDataOutputStream that delegates getPos() to our stream
return new FSDataOutputStream(outputStream, statistics) {
@Override
public long getPos() {
// Delegate to SeaweedOutputStream's position tracking
return outputStream.getPos();
}
};
} catch (Exception ex) {
LOG.error("Failed to append to file: {} bufferSize:{}", path, bufferSize, ex);
throw new IOException("Failed to append to file: " + path, ex);

Loading…
Cancel
Save