
feat: extract chunk IDs from write log and download from volume

ULTIMATE SOLUTION: bypass the filer entirely and download chunks directly from the volume server!

The problem: filer metadata is deleted immediately after the write
- Directory listings return empty
- The filer HTTP API can't find the file
- Even temporary paths are cleaned up

The breakthrough: Get chunk IDs from the WRITE operation itself!

Changes:
1. SeaweedOutputStream: Log chunk IDs in write message
   Format: 'CHUNKS: [id1,id2,...]'

2. Workflow: Extract chunk IDs from log, download from volume
   - Parse 'CHUNKS: [...]' from write log
   - Download directly: http://localhost:8080/CHUNK_ID
   - The volume server keeps chunks even after the filer metadata is deleted (see the sketch below)
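
A minimal sketch of the download step in change 2, assuming the volume server the CI compose file exposes on localhost:8080; the chunk ID below is a placeholder, not a real SeaweedFS file ID:

    CHUNK_ID="id1"                                   # placeholder; in CI this comes from the CHUNKS: [...] log line
    curl -fsS -o test.parquet "http://localhost:8080/$CHUNK_ID"
    [ -s test.parquet ] && echo "chunk downloaded: $(stat --format=%s test.parquet) bytes"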

Why this MUST work:
- Chunk IDs logged at write time (not dependent on reads)
- Volume server persistence (chunks aren't deleted immediately)
- Bypasses filer entirely (no metadata lookups)
- Direct data access (raw chunk bytes)

Timeline:
  Write → Log chunk ID → Extract ID → Download chunk → Success! 
commit a1fa949221 (pull/7526/head), by chrislu, 1 week ago
2 changed files:
  1. .github/workflows/spark-integration-tests.yml (67 changes)
  2. other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java (12 changes)

.github/workflows/spark-integration-tests.yml (67 changes)

@@ -135,50 +135,43 @@ jobs:
 if docker compose logs spark-tests 2>&1 | grep -q "PARQUET FILE WRITTEN TO EMPLOYEES"; then
   if [ "$DOWNLOADED" = "false" ]; then
     echo ""
-    echo "=== EMPLOYEES FILE WRITTEN! Extracting from logs and downloading ==="
-    # Extract the EXACT filename from the write log
+    echo "=== EMPLOYEES FILE WRITTEN! Extracting chunk IDs and downloading from volume ==="
+    # Extract chunk IDs directly from the write log (bypasses filer entirely!)
     FULL_LOG=$(docker compose logs spark-tests 2>&1)
-    WRITTEN_FILE=$(echo "$FULL_LOG" | grep "PARQUET FILE WRITTEN TO EMPLOYEES" | tail -1 | grep -oP 'part-[a-f0-9-]+-c000\.snappy\.parquet')
-    echo "Filename from log: $WRITTEN_FILE"
-    if [ -z "$WRITTEN_FILE" ]; then
-      echo "ERROR: Could not extract filename from write log"
-      echo "Showing write log lines:"
-      echo "$FULL_LOG" | grep "PARQUET FILE WRITTEN" | tail -5
+    WRITE_LOG=$(echo "$FULL_LOG" | grep "PARQUET FILE WRITTEN TO EMPLOYEES" | tail -1)
+    echo "Write log: $WRITE_LOG"
+    # Extract chunk IDs from CHUNKS: [id1,id2,...] in the log
+    CHUNK_IDS=$(echo "$WRITE_LOG" | grep -oP 'CHUNKS: \[\K[^\]]+')
+    echo "Chunk IDs: $CHUNK_IDS"
+    if [ -z "$CHUNK_IDS" ]; then
+      echo "ERROR: No chunk IDs in write log - using old format?"
+      # Fallback: try to find any chunk ID near the write log
+      CHUNK_IDS=$(echo "$FULL_LOG" | grep -B 20 "PARQUET FILE WRITTEN TO EMPLOYEES" | grep 'file_id: "' | tail -1 | grep -oP '"\K[^"]+')
+      echo "Fallback chunk ID: $CHUNK_IDS"
     fi
-    # Try multiple locations: final, temporary, _temporary subdirs
-    for BASE_DIR in "employees" "employees/_temporary" "employees/_temporary/0/_temporary"; do
-      echo "Trying: /test-spark/$BASE_DIR/"
-      # List what's actually there
-      LISTING=$(curl -s "http://localhost:8888/test-spark/$BASE_DIR/" 2>/dev/null)
-      FILES=$(echo "$LISTING" | grep -oP 'part-[a-f0-9-]+-c000\.snappy\.parquet' || true)
-      if [ -n "$FILES" ]; then
-        echo "Found files in $BASE_DIR:"
-        echo "$FILES"
-        for FILE in $FILES; do
-          echo "Downloading: $FILE from $BASE_DIR"
-          curl -o "${FILE}" "http://localhost:8888/test-spark/$BASE_DIR/$FILE"
-          if [ -f "$FILE" ] && [ -s "$FILE" ]; then
-            FILE_SIZE=$(stat --format=%s "$FILE" 2>/dev/null || stat -f%z "$FILE" 2>/dev/null)
-            echo "SUCCESS: Downloaded $FILE_SIZE bytes from $BASE_DIR"
-            cp "$FILE" test.parquet
-            DOWNLOADED=true
-            break 2 # Break out of both loops
-          fi
-        done
-      fi
-    done
-    if [ "$DOWNLOADED" = "false" ]; then
-      echo "WARNING: File not found in any location"
-      echo "Trying recursive search..."
-      curl -s "http://localhost:8888/test-spark/employees/?recursive=true" | grep parquet || true
+    if [ -n "$CHUNK_IDS" ]; then
+      # Download each chunk (usually just one for small files)
+      IFS=',' read -ra CHUNKS <<< "$CHUNK_IDS"
+      for CHUNK_ID in "${CHUNKS[@]}"; do
+        echo "Downloading chunk from volume server: http://localhost:8080/$CHUNK_ID"
+        curl -o "test.parquet" "http://localhost:8080/$CHUNK_ID"
+        if [ -f test.parquet ] && [ -s test.parquet ]; then
+          FILE_SIZE=$(stat --format=%s test.parquet 2>/dev/null || stat -f%z test.parquet 2>/dev/null)
+          echo "SUCCESS: Downloaded $FILE_SIZE bytes from volume server!"
+          DOWNLOADED=true
+          break
+        else
+          echo "FAILED: Chunk $CHUNK_ID returned 404 or empty"
+        fi
+      done
+    else
+      echo "ERROR: Could not extract chunk IDs"
     fi
   fi
 fi
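
A note on the extraction step above: grep -oP uses PCRE (GNU grep), and \K drops the literal 'CHUNKS: [' prefix from the reported match, so only the bracket contents survive; IFS then splits them on commas. A minimal illustration with placeholder IDs (id1 and id2 are not real chunk IDs):

    echo 'CHUNKS: [id1,id2]' | grep -oP 'CHUNKS: \[\K[^\]]+'    # prints: id1,id2
    IFS=',' read -ra CHUNKS <<< "id1,id2"                       # CHUNKS=(id1 id2)
    printf '%s\n' "${CHUNKS[@]}"                                # one ID per line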

other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java (12 changes)

@@ -249,11 +249,19 @@ public class SeaweedOutputStream extends OutputStream {
         LOG.info(
                 "[DEBUG-2024] close END: path={} finalPosition={} totalBytesWritten={} writeCalls={} (buffer had {} bytes)",
                 path, position, totalBytesWritten, writeCallCount, bufferPosBeforeFlush);
         // Special logging for employees directory files (to help CI download timing)
         if (path.contains("/test-spark/employees/") && path.endsWith(".parquet")) {
             String filename = path.substring(path.lastIndexOf('/') + 1);
-            LOG.warn("=== PARQUET FILE WRITTEN TO EMPLOYEES: {} ({} bytes) ===", filename, position);
+            // Log filename, size, AND chunk IDs for direct volume download
+            StringBuilder chunkInfo = new StringBuilder();
+            for (int i = 0; i < entry.getChunksCount(); i++) {
+                FilerProto.FileChunk chunk = entry.getChunks(i);
+                if (i > 0) chunkInfo.append(",");
+                chunkInfo.append(chunk.getFileId());
+            }
+            LOG.warn("=== PARQUET FILE WRITTEN TO EMPLOYEES: {} ({} bytes) CHUNKS: [{}] ===",
+                    filename, position, chunkInfo.toString());
         }
     } finally {
         lastError = new IOException("Stream is closed!");
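
The CI step above greps for the WARN line this change emits. With placeholder values (the file name, byte count, and chunk IDs are illustrative, not taken from a real run), the emitted line has the shape:

    === PARQUET FILE WRITTEN TO EMPLOYEES: part-00000-c000.snappy.parquet (842 bytes) CHUNKS: [id1,id2] ===

Each bracketed entry is whatever chunk.getFileId() returns for that chunk; the workflow treats it as an opaque token appended to the volume-server URL.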
