Virtual Position Fix: Status and Findings

Implementation Complete

Changes Made

  1. Added virtualPosition field to SeaweedOutputStream

    • Tracks total bytes written (including buffered)
    • Initialized to match position in constructor
    • Incremented on every write() call
  2. Updated getPos() to return virtualPosition (see the sketch after this list)

    • Always returns accurate total bytes written
    • No longer depends on position + buffer.position()
    • Aligns with Hadoop FSDataOutputStream semantics
  3. Enhanced debug logging

    • All logs now show both virtualPos and flushedPos
    • Clear separation between virtual and physical positions
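
To make the change concrete, here is a minimal, self-contained sketch of the idea (illustration only, not the real SeaweedOutputStream; buffering is reduced to an in-memory ByteArrayOutputStream and flushToService() is a stand-in for the real flush path):

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;

// Illustration only: a "virtual" position counts every byte written, including
// bytes still sitting in the buffer, separate from the flushed position.
class VirtualPositionSketch extends OutputStream {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private long position = 0;          // bytes already flushed to the service
    private long virtualPosition = 0;   // total bytes written, including buffered

    @Override
    public void write(int b) {
        buffer.write(b);
        virtualPosition++;              // incremented on every write() call
    }

    public long getPos() {
        return virtualPosition;         // no longer position + buffer.position()
    }

    void flushToService() {
        position += buffer.size();      // pretend the buffered bytes were committed
        buffer.reset();
    }
}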

Test Results

What's Working

  1. Virtual position tracking is accurate:

    Last getPos() call: returns 1252 (writeCall #465)
    Final writes: writeCalls 466-470 (8 bytes)
    close(): virtualPos=1260 ✓
    File written: 1260 bytes ✓
    Metadata: fileSize=1260 ✓
    
  2. No more position discrepancy:

    • Before: getPos() returned position + buffer.position() = 1252
    • After: getPos() returns virtualPosition = 1260
    • File size matches virtualPosition

What's Still Failing

EOF Exception persists: EOFException: Still have: 78 bytes left

Root Cause Analysis

The virtual position fix ensures getPos() always returns the correct total, but it doesn't solve the fundamental timing issue:

  1. The Parquet Write Sequence:

    1. Parquet writes column chunk data
    2. Parquet calls getPos() → gets 1252
    3. Parquet STORES this value: columnChunkOffset = 1252
    4. Parquet writes footer metadata (8 bytes)
    5. Parquet writes the footer with columnChunkOffset = 1252
    6. Close → flushes all 1260 bytes
    
  2. The Problem:

    • Parquet uses the getPos() value immediately when it's returned
    • It stores columnChunkOffset = 1252 in memory
    • Then writes more bytes (footer metadata)
    • Then writes the footer containing columnChunkOffset = 1252
    • But by then, those 8 footer bytes have shifted everything!
  3. Why Virtual Position Doesn't Fix It:

    • Even though getPos() now correctly returns 1260 at close time
    • Parquet has ALREADY recorded offset = 1252 in its internal state
    • Those stale offsets get written into the Parquet footer
    • When reading, Parquet footer says "seek to 1252" but data is elsewhere

The Real Issue

The problem is NOT that getPos() returns the wrong value. The problem is that Parquet's write sequence is incompatible with buffered streams:

  • Parquet assumes: getPos() returns the position where the NEXT byte will be written
  • But with buffering: Bytes are written to buffer first, then flushed later
  • Parquet records offsets based on getPos(), then writes more data
  • Those "more data" bytes invalidate the recorded offsets

Why This Works in HDFS/S3

HDFS and S3 implementations likely:

  1. Flush on every getPos() call - ensures position is always up-to-date
  2. Use unbuffered streams for Parquet - no offset drift
  3. Have different buffering semantics - data committed immediately

Next Steps: True Fix Options

Option A: Flush on getPos() (Performance Hit)

public synchronized long getPos() throws IOException {
    if (buffer.position() > 0) {
        writeCurrentBufferToService();  // Force flush
    }
    return position;  // Now accurate
}

Pros: Guarantees correct offsets
Cons: Many small flushes, poor performance

Option B: Detect Parquet and Flush (Targeted)

public synchronized long getPos() throws IOException {
    if (path.endsWith(".parquet") && buffer.position() > 0) {
        writeCurrentBufferToService();  // Flush for Parquet
    }
    return virtualPosition;
}

Pros: Only affects Parquet files
Cons: Hacky, file extension detection is brittle

Option C: Implement Hadoop's Syncable (Proper)

Make SeaweedOutputStream implement Syncable.hflush():

@Override
public void hflush() throws IOException {
    writeCurrentBufferToService();  // Flush to service
    flushWrittenBytesToService();   // Wait for completion
}

Let Parquet call hflush() when it needs guaranteed positions.

Pros: Clean, follows Hadoop contract
Cons: Requires Parquet/Spark to use hflush()
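
Note that Hadoop's Syncable interface also declares hsync(); a minimal sketch of the full contract, with stub flush helpers standing in for SeaweedOutputStream's real ones, could look like this:

import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.fs.Syncable;

// Sketch only: the private helpers are stubs for the real methods that push the
// buffer to the volume server and wait for acknowledgement.
class SyncableSketch extends OutputStream implements Syncable {
    @Override
    public void write(int b) throws IOException {
        // buffering omitted in this sketch
    }

    private void writeCurrentBufferToService() throws IOException {
        // stand-in: push buffered bytes to the service
    }

    private void flushWrittenBytesToService() throws IOException {
        // stand-in: wait until the pushed bytes are acknowledged
    }

    @Override
    public void hflush() throws IOException {
        writeCurrentBufferToService();
        flushWrittenBytesToService();
    }

    @Override
    public void hsync() throws IOException {
        hflush();   // this sketch treats visibility and durability the same
    }
}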

Option D: Buffer Size = 0 for Parquet (Workaround)

Detect Parquet writes and disable buffering:

if (path.endsWith(".parquet")) {
    this.bufferSize = 0;  // No buffering for Parquet
}

Pros: Simple, no offset issues
Cons: Terrible performance for Parquet

Recommended Fix

  1. Implement Syncable.hflush() properly (Option C)
  2. Make getPos() flush if the buffer is not empty (Option A), as in the combined sketch below
  3. Together, these ensure:
    • Correct offsets for Parquet
    • Correct behavior for any client that calls getPos()
    • Conformance with Hadoop semantics
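
A combined sketch of the recommended behavior, in the same illustrative style as the earlier sketches (names and the in-memory "service" are placeholders, not the final implementation):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Combined illustration of Options A + C: flush before reporting a position,
// and expose hflush() for callers that need an explicit flush.
class RecommendedBehaviorSketch extends OutputStream {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final ByteArrayOutputStream service = new ByteArrayOutputStream(); // stand-in backing store
    private long virtualPosition = 0;

    @Override
    public void write(int b) {
        buffer.write(b);
        virtualPosition++;                  // count every byte, buffered or not
    }

    public long getPos() throws IOException {
        if (buffer.size() > 0) {
            flushBufferToService();         // Option A: flush before reporting a position
        }
        return virtualPosition;             // now equal to the flushed position
    }

    public void hflush() throws IOException {
        flushBufferToService();             // Option C: explicit flush on request
    }

    private void flushBufferToService() throws IOException {
        buffer.writeTo(service);            // commit buffered bytes to the stand-in service
        buffer.reset();
    }
}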

Status

  • ✓ Virtual position tracking implemented
  • ✓ getPos() returns accurate total
  • ✓ File size metadata correct
  • ✗ Parquet EOF exception persists
  • ⏭️ Need to implement flush-on-getPos() or hflush()

Files Modified

  • other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
    • Added virtualPosition field
    • Updated getPos() to return virtualPosition
    • Enhanced debug logging

Next Action

Implement flush-on-getPos() to guarantee correct offsets for Parquet.