|
|
@ -169,12 +169,35 @@ public class SeaweedOutputStream extends OutputStream { |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
buffer.flip(); |
|
|
|
int bytesLength = buffer.limit() - buffer.position(); |
|
|
|
SeaweedWrite.writeData(entry, replication, filerGrpcClient, position, buffer.array(), buffer.position(), buffer.limit()); |
|
|
|
// System.out.println(path + " saved [" + (position) + "," + ((position) + bytesLength) + ")"); |
|
|
|
position += bytesLength; |
|
|
|
buffer.clear(); |
|
|
|
position += submitWriteBufferToService(buffer, position); |
|
|
|
|
|
|
|
buffer = ByteBufferPool.request(bufferSize); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException { |
|
|
|
|
|
|
|
bufferToWrite.flip(); |
|
|
|
int bytesLength = bufferToWrite.limit() - bufferToWrite.position(); |
|
|
|
|
|
|
|
if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { |
|
|
|
waitForTaskToComplete(); |
|
|
|
} |
|
|
|
final Future<Void> job = completionService.submit(() -> { |
|
|
|
System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); |
|
|
|
SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit()); |
|
|
|
System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); |
|
|
|
bufferToWrite.clear(); |
|
|
|
ByteBufferPool.release(bufferToWrite); |
|
|
|
return null; |
|
|
|
}); |
|
|
|
|
|
|
|
writeOperations.add(new WriteOperation(job, writePosition, bytesLength)); |
|
|
|
|
|
|
|
// Try to shrink the queue |
|
|
|
shrinkWriteOperationQueue(); |
|
|
|
|
|
|
|
return bytesLength; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|