Browse Source

Stream remote volume reads through HTTP

rust-volume-server
Chris Lu 4 days ago
parent
commit
6d270c73f7
  1. 19
      seaweed-volume/src/server/handlers.rs
  2. 66
      seaweed-volume/src/storage/volume.rs

19
seaweed-volume/src/server/handlers.rs

@ -88,7 +88,7 @@ const DEFAULT_STREAMING_CHUNK_SIZE: usize = 64 * 1024; // 64 KB
/// A body that streams needle data from the dat file in chunks using pread, /// A body that streams needle data from the dat file in chunks using pread,
/// avoiding loading the entire payload into memory at once. /// avoiding loading the entire payload into memory at once.
struct StreamingBody { struct StreamingBody {
dat_file: std::fs::File,
source: crate::storage::volume::NeedleStreamSource,
data_offset: u64, data_offset: u64,
data_size: u32, data_size: u32,
pos: usize, pos: usize,
@ -185,8 +185,8 @@ impl http_body::Body for StreamingBody {
let chunk_len = std::cmp::min(self.chunk_size, total - self.pos); let chunk_len = std::cmp::min(self.chunk_size, total - self.pos);
let file_offset = self.data_offset + self.pos as u64; let file_offset = self.data_offset + self.pos as u64;
let file_clone = match self.dat_file.try_clone() {
Ok(f) => f,
let source_clone = match self.source.clone_for_read() {
Ok(source) => source,
Err(e) => return std::task::Poll::Ready(Some(Err(e))), Err(e) => return std::task::Poll::Ready(Some(Err(e))),
}; };
let data_file_access_control = self.data_file_access_control.clone(); let data_file_access_control = self.data_file_access_control.clone();
@ -199,16 +199,7 @@ impl http_body::Body for StreamingBody {
Some(data_file_access_control.read_lock()) Some(data_file_access_control.read_lock())
}; };
let mut buf = vec![0u8; chunk_len]; let mut buf = vec![0u8; chunk_len];
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
file_clone.read_exact_at(&mut buf, file_offset)?;
}
#[cfg(windows)]
{
use std::os::windows::fs::FileExt;
file_clone.seek_read(&mut buf, file_offset)?;
}
source_clone.read_exact_at(&mut buf, file_offset)?;
Ok::<bytes::Bytes, std::io::Error>(bytes::Bytes::from(buf)) Ok::<bytes::Bytes, std::io::Error>(bytes::Bytes::from(buf))
}); });
@ -1086,7 +1077,7 @@ async fn get_or_head_handler_inner(
}; };
let streaming = StreamingBody { let streaming = StreamingBody {
dat_file: info.dat_file,
source: info.source,
data_offset: info.data_file_offset, data_offset: info.data_file_offset,
data_size: info.data_size, data_size: info.data_size,
pos: 0, pos: 0,

66
seaweed-volume/src/storage/volume.rs

@ -328,9 +328,45 @@ impl Drop for DataFileWriteLease {
/// Information needed to stream needle data directly from the dat file /// Information needed to stream needle data directly from the dat file
/// without loading the entire payload into memory. /// without loading the entire payload into memory.
pub(crate) enum NeedleStreamSource {
Local(File),
Remote(RemoteDatFile),
}
impl NeedleStreamSource {
pub(crate) fn clone_for_read(&self) -> io::Result<Self> {
match self {
NeedleStreamSource::Local(file) => Ok(NeedleStreamSource::Local(file.try_clone()?)),
NeedleStreamSource::Remote(remote) => Ok(NeedleStreamSource::Remote(remote.clone())),
}
}
pub(crate) fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
match self {
NeedleStreamSource::Local(file) => {
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
file.read_exact_at(buf, offset)?;
}
#[cfg(windows)]
{
read_exact_at(file, buf, offset)?;
}
#[cfg(not(any(unix, windows)))]
{
compile_error!("Platform not supported: only unix and windows are supported");
}
Ok(())
}
NeedleStreamSource::Remote(remote) => remote.read_exact_at(buf, offset),
}
}
}
pub struct NeedleStreamInfo { pub struct NeedleStreamInfo {
/// Cloned file handle for the dat file.
pub dat_file: File,
/// Stream source for the dat file, local or remote.
pub(crate) source: NeedleStreamSource,
/// Absolute byte offset within the dat file where needle data starts. /// Absolute byte offset within the dat file where needle data starts.
pub data_file_offset: u64, pub data_file_offset: u64,
/// Size of the data payload in bytes. /// Size of the data payload in bytes.
@ -348,7 +384,7 @@ pub struct NeedleStreamInfo {
} }
#[derive(Clone)] #[derive(Clone)]
struct RemoteDatFile {
pub(crate) struct RemoteDatFile {
backend: Arc<crate::remote_storage::s3_tier::S3TierBackend>, backend: Arc<crate::remote_storage::s3_tier::S3TierBackend>,
key: String, key: String,
file_size: u64, file_size: u64,
@ -356,7 +392,7 @@ struct RemoteDatFile {
} }
impl RemoteDatFile { impl RemoteDatFile {
fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
pub(crate) fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
let data = self let data = self
.backend .backend
.read_range_blocking(&self.key, offset, buf.len()) .read_range_blocking(&self.key, offset, buf.len())
@ -1077,13 +1113,16 @@ impl Volume {
offset as u64 + NEEDLE_HEADER_SIZE as u64 + 4 // skip DataSize (4 bytes) offset as u64 + NEEDLE_HEADER_SIZE as u64 + 4 // skip DataSize (4 bytes)
}; };
let cloned_file = match self.dat_file.as_ref() {
Some(dat_file) => dat_file.try_clone().map_err(VolumeError::Io)?,
None => return Err(VolumeError::StreamingUnsupported),
let source = match (self.dat_file.as_ref(), self.remote_dat_file.as_ref()) {
(Some(dat_file), _) => NeedleStreamSource::Local(
dat_file.try_clone().map_err(VolumeError::Io)?,
),
(None, Some(remote_dat_file)) => NeedleStreamSource::Remote(remote_dat_file.clone()),
(None, None) => return Err(VolumeError::StreamingUnsupported),
}; };
Ok(NeedleStreamInfo { Ok(NeedleStreamInfo {
dat_file: cloned_file,
source,
data_file_offset, data_file_offset,
data_size: n.data_size, data_size: n.data_size,
data_file_access_control: self.data_file_access_control.clone(), data_file_access_control: self.data_file_access_control.clone(),
@ -3310,10 +3349,13 @@ mod tests {
id: NeedleId(7), id: NeedleId(7),
..Needle::default() ..Needle::default()
}; };
match v.read_needle_stream_info(&mut meta, false) {
Ok(_) => panic!("expected remote-backed volume stream info to require fallback"),
Err(err) => assert!(matches!(err, VolumeError::StreamingUnsupported)),
}
let info = v.read_needle_stream_info(&mut meta, false).unwrap();
assert!(matches!(info.source, NeedleStreamSource::Remote(_)));
let mut streamed = vec![0u8; info.data_size as usize];
info.source
.read_exact_at(&mut streamed, info.data_file_offset)
.unwrap();
assert_eq!(streamed, b"remote-only");
assert_eq!(meta.data_size, 11); assert_eq!(meta.data_size, 11);
let _ = shutdown_tx.send(()); let _ = shutdown_tx.send(());

Loading…
Cancel
Save