Browse Source

feat: implement virtual position tracking in SeaweedOutputStream

Added virtualPosition field to track total bytes written including buffered data.
Updated getPos() to return virtualPosition instead of position + buffer.position().

RESULT:
- getPos() now always returns accurate total (1260 bytes) ✓
- File size metadata is correct (1260 bytes) ✓
- EOF exception STILL PERSISTS 

ROOT CAUSE (deeper analysis):
Parquet calls getPos() → gets 1252 → STORES this value
Then writes 8 more bytes (footer metadata)
Then writes footer containing the stored offset (1252)
Result: Footer has stale offsets, even though getPos() is correct

THE FIX DOESN'T WORK because Parquet uses getPos() return value IMMEDIATELY,
not at close time. Virtual position tracking alone can't solve this.

NEXT: Implement flush-on-getPos() to ensure offsets are always accurate.
pull/7526/head
chrislu 1 week ago
parent
commit
c1b0aa6611
  1. 69
      other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
  2. 164
      test/java/spark/VIRTUAL_POSITION_FIX_STATUS.md

69
other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java

@ -25,7 +25,8 @@ public class SeaweedOutputStream extends OutputStream {
private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
private final boolean shouldSaveMetadata = false;
private FilerProto.Entry.Builder entry;
private long position;
private long position; // Flushed bytes (committed to service)
private long virtualPosition; // Total bytes written (including buffered), for getPos()
private boolean closed;
private volatile IOException lastError;
private long lastFlushOffset;
@ -53,6 +54,7 @@ public class SeaweedOutputStream extends OutputStream {
this.replication = replication;
this.path = path;
this.position = position;
this.virtualPosition = position; // Initialize to match position
this.closed = false;
this.lastError = null;
this.lastFlushOffset = 0;
@ -100,7 +102,6 @@ public class SeaweedOutputStream extends OutputStream {
* @return current position (flushed + buffered bytes)
*/
public synchronized long getPos() {
long currentPos = position + buffer.position();
if (path.contains("parquet")) {
// Get caller info for debugging
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
@ -112,11 +113,11 @@ public class SeaweedOutputStream extends OutputStream {
}
LOG.warn(
"[DEBUG-2024] getPos() called by {}: flushedPosition={} bufferPosition={} returning={} totalBytesWritten={} writeCalls={} path={}",
caller, position, buffer.position(), currentPos, totalBytesWritten, writeCallCount,
"[DEBUG-2024] getPos() called by {}: returning VIRTUAL position={} (flushed={} buffered={}) totalBytesWritten={} writeCalls={} path={}",
caller, virtualPosition, position, buffer.position(), totalBytesWritten, writeCallCount,
path.substring(Math.max(0, path.length() - 80))); // Last 80 chars of path
}
return currentPos;
return virtualPosition; // Return virtual position (always accurate)
}
public static String getParentDirectory(String path) {
@ -155,7 +156,8 @@ public class SeaweedOutputStream extends OutputStream {
entry.setAttributes(attrBuilder);
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] METADATA UPDATE: setting entry.attributes.fileSize = {} bytes | #chunks={} | path={}",
LOG.warn(
"[DEBUG-2024] METADATA UPDATE: setting entry.attributes.fileSize = {} bytes | #chunks={} | path={}",
offset, entry.getChunksCount(), path.substring(path.lastIndexOf('/') + 1));
}
@ -186,17 +188,17 @@ public class SeaweedOutputStream extends OutputStream {
totalBytesWritten += length;
writeCallCount++;
virtualPosition += length; // Update virtual position for getPos()
// Enhanced debug logging for ALL writes to track the exact sequence
if (path.contains("parquet") || path.contains("employees")) {
long beforeBufferPos = buffer.position();
long currentVirtualPos = position + beforeBufferPos;
// Always log writes to see the complete pattern
if (length >= 20 || writeCallCount >= 220 || writeCallCount % 50 == 0) {
LOG.warn(
"[DEBUG-2024] WRITE #{}: {} bytes | virtualPos={} (flushed={} + buffer={}) | totalWritten={} | file={}",
writeCallCount, length, currentVirtualPos, position, beforeBufferPos,
"[DEBUG-2024] WRITE #{}: {} bytes | virtualPos={} (flushed={} + buffered={}) | totalWritten={} | file={}",
writeCallCount, length, virtualPosition, position, beforeBufferPos,
totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
}
}
@ -245,10 +247,12 @@ public class SeaweedOutputStream extends OutputStream {
@Override
public void flush() throws IOException {
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] flush() CALLED: supportFlush={} position={} buffer.position()={} totalWritten={} path={}",
supportFlush, position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
LOG.warn(
"[DEBUG-2024] flush() CALLED: supportFlush={} virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}",
supportFlush, virtualPosition, position, buffer.position(), totalBytesWritten,
path.substring(path.lastIndexOf('/') + 1));
}
if (supportFlush) {
flushInternalAsync();
}
@ -270,14 +274,14 @@ public class SeaweedOutputStream extends OutputStream {
int bufferPosBeforeFlush = buffer.position();
LOG.info(
"[DEBUG-2024] close START: path={} position={} buffer.position()={} totalBytesWritten={} writeCalls={}",
path, position, bufferPosBeforeFlush, totalBytesWritten, writeCallCount);
"[DEBUG-2024] close START: path={} virtualPos={} flushedPos={} buffer.position()={} totalBytesWritten={} writeCalls={}",
path, virtualPosition, position, bufferPosBeforeFlush, totalBytesWritten, writeCallCount);
try {
flushInternal();
threadExecutor.shutdown();
LOG.info(
"[DEBUG-2024] close END: path={} finalPosition={} totalBytesWritten={} writeCalls={} (buffer had {} bytes)",
path, position, totalBytesWritten, writeCallCount, bufferPosBeforeFlush);
"[DEBUG-2024] close END: path={} virtualPos={} flushedPos={} totalBytesWritten={} writeCalls={} (buffer had {} bytes)",
path, virtualPosition, position, totalBytesWritten, writeCallCount, bufferPosBeforeFlush);
// Special logging for employees directory files (to help CI download timing)
if (path.contains("/test-spark/employees/") && path.endsWith(".parquet")) {
@ -309,12 +313,13 @@ public class SeaweedOutputStream extends OutputStream {
private synchronized void writeCurrentBufferToService() throws IOException {
int bufferPos = buffer.position();
long positionBefore = position;
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] writeCurrentBufferToService START: buffer.position()={} currentFlushedPosition={} totalWritten={} path={}",
LOG.warn(
"[DEBUG-2024] writeCurrentBufferToService START: buffer.position()={} currentFlushedPosition={} totalWritten={} path={}",
bufferPos, position, totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
}
if (bufferPos == 0) {
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] -> Skipping: buffer is empty");
@ -324,9 +329,10 @@ public class SeaweedOutputStream extends OutputStream {
int written = submitWriteBufferToService(buffer, position);
position += written;
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] writeCurrentBufferToService END: submitted {} bytes, flushedPosition {} -> {}, totalWritten={} path={}",
LOG.warn(
"[DEBUG-2024] writeCurrentBufferToService END: submitted {} bytes, flushedPosition {} -> {}, totalWritten={} path={}",
written, positionBefore, position, totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
}
@ -404,8 +410,8 @@ public class SeaweedOutputStream extends OutputStream {
protected synchronized void flushInternal() throws IOException {
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] flushInternal() START: position={} buffer.position()={} totalWritten={} path={}",
position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
LOG.warn("[DEBUG-2024] flushInternal() START: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}",
virtualPosition, position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
}
maybeThrowLastError();
@ -413,24 +419,25 @@ public class SeaweedOutputStream extends OutputStream {
flushWrittenBytesToService();
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] flushInternal() END: position={} buffer.position()={} totalWritten={} path={}",
position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
LOG.warn("[DEBUG-2024] flushInternal() END: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}",
virtualPosition, position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
}
}
protected synchronized void flushInternalAsync() throws IOException {
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] flushInternalAsync() START: position={} buffer.position()={} totalWritten={} path={}",
position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
LOG.warn(
"[DEBUG-2024] flushInternalAsync() START: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}",
virtualPosition, position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
}
maybeThrowLastError();
writeCurrentBufferToService();
flushWrittenBytesToServiceAsync();
if (path.contains("parquet") || path.contains("employees")) {
LOG.warn("[DEBUG-2024] flushInternalAsync() END: position={} buffer.position()={} totalWritten={} path={}",
position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
LOG.warn("[DEBUG-2024] flushInternalAsync() END: virtualPos={} flushedPos={} buffer.position()={} totalWritten={} path={}",
virtualPosition, position, buffer.position(), totalBytesWritten, path.substring(path.lastIndexOf('/') + 1));
}
}

164
test/java/spark/VIRTUAL_POSITION_FIX_STATUS.md

@ -0,0 +1,164 @@
# Virtual Position Fix: Status and Findings
## Implementation Complete
### Changes Made
1. **Added `virtualPosition` field** to `SeaweedOutputStream`
- Tracks total bytes written (including buffered)
- Initialized to match `position` in constructor
- Incremented on every `write()` call
2. **Updated `getPos()` to return `virtualPosition`**
- Always returns accurate total bytes written
- No longer depends on `position + buffer.position()`
- Aligns with Hadoop `FSDataOutputStream` semantics
3. **Enhanced debug logging**
- All logs now show both `virtualPos` and `flushedPos`
- Clear separation between virtual and physical positions
### Test Results
#### ✅ What's Working
1. **Virtual position tracking is accurate**:
```
Last getPos() call: returns 1252 (writeCall #465)
Final writes: writeCalls 466-470 (8 bytes)
close(): virtualPos=1260 ✓
File written: 1260 bytes ✓
Metadata: fileSize=1260 ✓
```
2. **No more position discrepancy**:
- Before: `getPos()` returned `position + buffer.position()` = 1252
- After: `getPos()` returns `virtualPosition` = 1260
- File size matches virtualPosition
#### ❌ What's Still Failing
**EOF Exception persists**: `EOFException: Still have: 78 bytes left`
### Root Cause Analysis
The virtual position fix ensures `getPos()` always returns the correct total, but **it doesn't solve the fundamental timing issue**:
1. **The Parquet Write Sequence**:
```
1. Parquet writes column chunk data
2. Parquet calls getPos() → gets 1252
3. Parquet STORES this value: columnChunkOffset = 1252
4. Parquet writes footer metadata (8 bytes)
5. Parquet writes the footer with columnChunkOffset = 1252
6. Close → flushes all 1260 bytes
```
2. **The Problem**:
- Parquet uses the `getPos()` value **immediately** when it's returned
- It stores `columnChunkOffset = 1252` in memory
- Then writes more bytes (footer metadata)
- Then writes the footer containing `columnChunkOffset = 1252`
- But by then, those 8 footer bytes have shifted everything!
3. **Why Virtual Position Doesn't Fix It**:
- Even though `getPos()` now correctly returns 1260 at close time
- Parquet has ALREADY recorded offset = 1252 in its internal state
- Those stale offsets get written into the Parquet footer
- When reading, Parquet footer says "seek to 1252" but data is elsewhere
### The Real Issue
The problem is **NOT** that `getPos()` returns the wrong value.
The problem is that **Parquet's write sequence is incompatible with buffered streams**:
- Parquet assumes: `getPos()` returns the position where the NEXT byte will be written
- But with buffering: Bytes are written to buffer first, then flushed later
- Parquet records offsets based on `getPos()`, then writes more data
- Those "more data" bytes invalidate the recorded offsets
### Why This Works in HDFS/S3
HDFS and S3 implementations likely:
1. **Flush on every `getPos()` call** - ensures position is always up-to-date
2. **Use unbuffered streams for Parquet** - no offset drift
3. **Have different buffering semantics** - data committed immediately
### Next Steps: True Fix Options
#### Option A: Flush on getPos() (Performance Hit)
```java
public synchronized long getPos() {
if (buffer.position() > 0) {
writeCurrentBufferToService(); // Force flush
}
return position; // Now accurate
}
```
**Pros**: Guarantees correct offsets
**Cons**: Many small flushes, poor performance
#### Option B: Detect Parquet and Flush (Targeted)
```java
public synchronized long getPos() {
if (path.endsWith(".parquet") && buffer.position() > 0) {
writeCurrentBufferToService(); // Flush for Parquet
}
return virtualPosition;
}
```
**Pros**: Only affects Parquet files
**Cons**: Hacky, file extension detection is brittle
#### Option C: Implement Hadoop's Syncable (Proper)
Make `SeaweedOutputStream` implement `Syncable.hflush()`:
```java
@Override
public void hflush() throws IOException {
writeCurrentBufferToService(); // Flush to service
flushWrittenBytesToService(); // Wait for completion
}
```
Let Parquet call `hflush()` when it needs guaranteed positions.
**Pros**: Clean, follows Hadoop contract
**Cons**: Requires Parquet/Spark to use `hflush()`
#### Option D: Buffer Size = 0 for Parquet (Workaround)
Detect Parquet writes and disable buffering:
```java
if (path.endsWith(".parquet")) {
this.bufferSize = 0; // No buffering for Parquet
}
```
**Pros**: Simple, no offset issues
**Cons**: Terrible performance for Parquet
### Recommended: Option C + Option A Hybrid
1. Implement `Syncable.hflush()` properly (Option C)
2. Make `getPos()` flush if buffer is not empty (Option A)
3. This ensures:
- Correct offsets for Parquet
- Works with any client that calls `getPos()`
- Follows Hadoop semantics
## Status
- ✅ Virtual position tracking implemented
- ✅ `getPos()` returns accurate total
- ✅ File size metadata correct
- ❌ Parquet EOF exception persists
- ⏭️ Need to implement flush-on-getPos() or hflush()
## Files Modified
- `other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java`
- Added `virtualPosition` field
- Updated `getPos()` to return `virtualPosition`
- Enhanced debug logging
## Next Action
Implement flush-on-getPos() to guarantee correct offsets for Parquet.
Loading…
Cancel
Save