@ -58,8 +58,7 @@ public class SeaweedOutputStream extends OutputStream {
this . maxConcurrentRequestCount = Runtime . getRuntime ( ) . availableProcessors ( ) ;
this . maxConcurrentRequestCount = Runtime . getRuntime ( ) . availableProcessors ( ) ;
this . threadExecutor
= new ThreadPoolExecutor ( maxConcurrentRequestCount ,
this . threadExecutor = new ThreadPoolExecutor ( maxConcurrentRequestCount ,
maxConcurrentRequestCount ,
maxConcurrentRequestCount ,
120L ,
120L ,
TimeUnit . SECONDS ,
TimeUnit . SECONDS ,
@ -77,8 +76,7 @@ public class SeaweedOutputStream extends OutputStream {
. setFileMode ( 0755 )
. setFileMode ( 0755 )
. setCrtime ( now )
. setCrtime ( now )
. setMtime ( now )
. setMtime ( now )
. clearGroupName ( )
) ;
. clearGroupName ( ) ) ;
}
}
}
}
@ -86,6 +84,7 @@ public class SeaweedOutputStream extends OutputStream {
public void setReplication ( String replication ) {
public void setReplication ( String replication ) {
this . replication = replication ;
this . replication = replication ;
}
}
public void setCollection ( String collection ) {
public void setCollection ( String collection ) {
this . collection = collection ;
this . collection = collection ;
}
}
@ -93,7 +92,7 @@ public class SeaweedOutputStream extends OutputStream {
public static String getParentDirectory ( String path ) {
public static String getParentDirectory ( String path ) {
int protoIndex = path . indexOf ( "://" ) ;
int protoIndex = path . indexOf ( "://" ) ;
if ( protoIndex > = 0 ) {
if ( protoIndex > = 0 ) {
int pathStart = path . indexOf ( "/" , protoIndex + 3 ) ;
int pathStart = path . indexOf ( "/" , protoIndex + 3 ) ;
path = path . substring ( pathStart ) ;
path = path . substring ( pathStart ) ;
}
}
if ( path . equals ( "/" ) ) {
if ( path . equals ( "/" ) ) {
@ -116,6 +115,17 @@ public class SeaweedOutputStream extends OutputStream {
private synchronized void flushWrittenBytesToServiceInternal ( final long offset ) throws IOException {
private synchronized void flushWrittenBytesToServiceInternal ( final long offset ) throws IOException {
try {
try {
LOG . info ( "[DEBUG-2024] ⚠️ flushWrittenBytesToServiceInternal: path={} offset={} #chunks={}" ,
path , offset , entry . getChunksCount ( ) ) ;
/ / Set the file size in attributes based on our position
/ / This ensures Parquet footer metadata matches what we actually wrote
FilerProto . FuseAttributes . Builder attrBuilder = entry . getAttributes ( ) . toBuilder ( ) ;
attrBuilder . setFileSize ( offset ) ;
entry . setAttributes ( attrBuilder ) ;
LOG . info ( "[DEBUG-2024] → Set entry.attributes.fileSize = {} bytes before writeMeta" , offset ) ;
SeaweedWrite . writeMeta ( filerClient , getParentDirectory ( path ) , entry ) ;
SeaweedWrite . writeMeta ( filerClient , getParentDirectory ( path ) , entry ) ;
} catch ( Exception ex ) {
} catch ( Exception ex ) {
throw new IOException ( ex ) ;
throw new IOException ( ex ) ;
@ -125,7 +135,7 @@ public class SeaweedOutputStream extends OutputStream {
@Override
@Override
public void write ( final int byteVal ) throws IOException {
public void write ( final int byteVal ) throws IOException {
write ( new byte [ ] { ( byte ) ( byteVal & 0xFF ) } ) ;
write ( new byte [ ] { ( byte ) ( byteVal & 0xFF ) } ) ;
}
}
@Override
@Override
@ -141,7 +151,8 @@ public class SeaweedOutputStream extends OutputStream {
throw new IndexOutOfBoundsException ( ) ;
throw new IndexOutOfBoundsException ( ) ;
}
}
/ / System . out . println ( path + " write [" + ( outputIndex + off ) + "," + ( ( outputIndex + off ) + length ) + ")" ) ;
/ / System . out . println ( path + " write [" + ( outputIndex + off ) + "," +
/ / ( ( outputIndex + off ) + length ) + ")" ) ;
int currentOffset = off ;
int currentOffset = off ;
int writableBytes = bufferSize - buffer . position ( ) ;
int writableBytes = bufferSize - buffer . position ( ) ;
@ -154,7 +165,8 @@ public class SeaweedOutputStream extends OutputStream {
break ;
break ;
}
}
/ / System . out . println ( path + " [" + ( outputIndex + currentOffset ) + "," + ( ( outputIndex + currentOffset ) + writableBytes ) + ") " + buffer . capacity ( ) ) ;
/ / System . out . println ( path + " [" + ( outputIndex + currentOffset ) + "," +
/ / ( ( outputIndex + currentOffset ) + writableBytes ) + ") " + buffer . capacity ( ) ) ;
buffer . put ( data , currentOffset , writableBytes ) ;
buffer . put ( data , currentOffset , writableBytes ) ;
currentOffset + = writableBytes ;
currentOffset + = writableBytes ;
writeCurrentBufferToService ( ) ;
writeCurrentBufferToService ( ) ;
@ -191,11 +203,13 @@ public class SeaweedOutputStream extends OutputStream {
return ;
return ;
}
}
LOG . info ( "[DEBUG-2024] close: path={} totalPosition={} buffer.position()={}" , path , position , buffer . position ( ) ) ;
int bufferPosBeforeFlush = buffer . position ( ) ;
LOG . info ( "[DEBUG-2024] close START: path={} position={} buffer.position()={}" , path , position , bufferPosBeforeFlush ) ;
try {
try {
flushInternal ( ) ;
flushInternal ( ) ;
threadExecutor . shutdown ( ) ;
threadExecutor . shutdown ( ) ;
LOG . info ( "close completed: path={} finalPosition={}" , path , position ) ;
LOG . info ( "[DEBUG-2024] close END: path={} finalPosition={} (buffer had {} bytes that were flushed)" ,
path , position , bufferPosBeforeFlush ) ;
} finally {
} finally {
lastError = new IOException ( "Stream is closed!" ) ;
lastError = new IOException ( "Stream is closed!" ) ;
ByteBufferPool . release ( buffer ) ;
ByteBufferPool . release ( buffer ) ;
@ -211,7 +225,8 @@ public class SeaweedOutputStream extends OutputStream {
private synchronized void writeCurrentBufferToService ( ) throws IOException {
private synchronized void writeCurrentBufferToService ( ) throws IOException {
int bufferPos = buffer . position ( ) ;
int bufferPos = buffer . position ( ) ;
LOG . info ( "[DEBUG-2024] writeCurrentBufferToService: path={} buffer.position()={} totalPosition={}" , path , bufferPos , position ) ;
LOG . info ( "[DEBUG-2024] writeCurrentBufferToService: path={} buffer.position()={} totalPosition={}" , path ,
bufferPos , position ) ;
if ( bufferPos = = 0 ) {
if ( bufferPos = = 0 ) {
LOG . info ( " → Skipping write, buffer is empty" ) ;
LOG . info ( " → Skipping write, buffer is empty" ) ;
return ;
return ;
@ -225,18 +240,22 @@ public class SeaweedOutputStream extends OutputStream {
}
}
private synchronized int submitWriteBufferToService ( final ByteBuffer bufferToWrite , final long writePosition ) throws IOException {
private synchronized int submitWriteBufferToService ( final ByteBuffer bufferToWrite , final long writePosition )
throws IOException {
( ( Buffer ) bufferToWrite ) . flip ( ) ;
( ( Buffer ) bufferToWrite ) . flip ( ) ;
int bytesLength = bufferToWrite . limit ( ) - bufferToWrite . position ( ) ;
int bytesLength = bufferToWrite . limit ( ) - bufferToWrite . position ( ) ;
if ( threadExecutor . getQueue ( ) . size ( ) > = maxConcurrentRequestCount ) {
if ( threadExecutor . getQueue ( ) . size ( ) > = maxConcurrentRequestCount ) {
waitForTaskToComplete ( ) ;
waitForTaskToComplete ( ) ;
}
}
final Future < Void > job = completionService . submit ( ( ) - > {
final Future < Void > job = completionService . submit ( ( ) - > {
/ / System . out . println ( path + " is going to save [" + ( writePosition ) + "," + ( ( writePosition ) + bytesLength ) + ")" ) ;
SeaweedWrite . writeData ( entry , replication , collection , filerClient , writePosition , bufferToWrite . array ( ) , bufferToWrite . position ( ) , bufferToWrite . limit ( ) , path ) ;
/ / System . out . println ( path + " saved [" + ( writePosition ) + "," + ( ( writePosition ) + bytesLength ) + ")" ) ;
/ / System . out . println ( path + " is going to save [" + ( writePosition ) + "," +
/ / ( ( writePosition ) + bytesLength ) + ")" ) ;
SeaweedWrite . writeData ( entry , replication , collection , filerClient , writePosition , bufferToWrite . array ( ) ,
bufferToWrite . position ( ) , bufferToWrite . limit ( ) , path ) ;
/ / System . out . println ( path + " saved [" + ( writePosition ) + "," +
/ / ( ( writePosition ) + bytesLength ) + ")" ) ;
ByteBufferPool . release ( bufferToWrite ) ;
ByteBufferPool . release ( bufferToWrite ) ;
return null ;
return null ;
} ) ;
} ) ;