diff --git a/seaweed-volume/src/server/handlers.rs b/seaweed-volume/src/server/handlers.rs index 4830dca48..da94d464d 100644 --- a/seaweed-volume/src/server/handlers.rs +++ b/seaweed-volume/src/server/handlers.rs @@ -88,7 +88,7 @@ const DEFAULT_STREAMING_CHUNK_SIZE: usize = 64 * 1024; // 64 KB /// A body that streams needle data from the dat file in chunks using pread, /// avoiding loading the entire payload into memory at once. struct StreamingBody { - dat_file: std::fs::File, + source: crate::storage::volume::NeedleStreamSource, data_offset: u64, data_size: u32, pos: usize, @@ -185,8 +185,8 @@ impl http_body::Body for StreamingBody { let chunk_len = std::cmp::min(self.chunk_size, total - self.pos); let file_offset = self.data_offset + self.pos as u64; - let file_clone = match self.dat_file.try_clone() { - Ok(f) => f, + let source_clone = match self.source.clone_for_read() { + Ok(source) => source, Err(e) => return std::task::Poll::Ready(Some(Err(e))), }; let data_file_access_control = self.data_file_access_control.clone(); @@ -199,16 +199,7 @@ impl http_body::Body for StreamingBody { Some(data_file_access_control.read_lock()) }; let mut buf = vec![0u8; chunk_len]; - #[cfg(unix)] - { - use std::os::unix::fs::FileExt; - file_clone.read_exact_at(&mut buf, file_offset)?; - } - #[cfg(windows)] - { - use std::os::windows::fs::FileExt; - file_clone.seek_read(&mut buf, file_offset)?; - } + source_clone.read_exact_at(&mut buf, file_offset)?; Ok::(bytes::Bytes::from(buf)) }); @@ -1086,7 +1077,7 @@ async fn get_or_head_handler_inner( }; let streaming = StreamingBody { - dat_file: info.dat_file, + source: info.source, data_offset: info.data_file_offset, data_size: info.data_size, pos: 0, diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index bc04da499..6af5c725a 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -328,9 +328,45 @@ impl Drop for DataFileWriteLease { /// Information needed to stream needle data directly from the dat file /// without loading the entire payload into memory. +pub(crate) enum NeedleStreamSource { + Local(File), + Remote(RemoteDatFile), +} + +impl NeedleStreamSource { + pub(crate) fn clone_for_read(&self) -> io::Result { + match self { + NeedleStreamSource::Local(file) => Ok(NeedleStreamSource::Local(file.try_clone()?)), + NeedleStreamSource::Remote(remote) => Ok(NeedleStreamSource::Remote(remote.clone())), + } + } + + pub(crate) fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> { + match self { + NeedleStreamSource::Local(file) => { + #[cfg(unix)] + { + use std::os::unix::fs::FileExt; + file.read_exact_at(buf, offset)?; + } + #[cfg(windows)] + { + read_exact_at(file, buf, offset)?; + } + #[cfg(not(any(unix, windows)))] + { + compile_error!("Platform not supported: only unix and windows are supported"); + } + Ok(()) + } + NeedleStreamSource::Remote(remote) => remote.read_exact_at(buf, offset), + } + } +} + pub struct NeedleStreamInfo { - /// Cloned file handle for the dat file. - pub dat_file: File, + /// Stream source for the dat file, local or remote. + pub(crate) source: NeedleStreamSource, /// Absolute byte offset within the dat file where needle data starts. pub data_file_offset: u64, /// Size of the data payload in bytes. @@ -348,7 +384,7 @@ pub struct NeedleStreamInfo { } #[derive(Clone)] -struct RemoteDatFile { +pub(crate) struct RemoteDatFile { backend: Arc, key: String, file_size: u64, @@ -356,7 +392,7 @@ struct RemoteDatFile { } impl RemoteDatFile { - fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> { + pub(crate) fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> { let data = self .backend .read_range_blocking(&self.key, offset, buf.len()) @@ -1077,13 +1113,16 @@ impl Volume { offset as u64 + NEEDLE_HEADER_SIZE as u64 + 4 // skip DataSize (4 bytes) }; - let cloned_file = match self.dat_file.as_ref() { - Some(dat_file) => dat_file.try_clone().map_err(VolumeError::Io)?, - None => return Err(VolumeError::StreamingUnsupported), + let source = match (self.dat_file.as_ref(), self.remote_dat_file.as_ref()) { + (Some(dat_file), _) => NeedleStreamSource::Local( + dat_file.try_clone().map_err(VolumeError::Io)?, + ), + (None, Some(remote_dat_file)) => NeedleStreamSource::Remote(remote_dat_file.clone()), + (None, None) => return Err(VolumeError::StreamingUnsupported), }; Ok(NeedleStreamInfo { - dat_file: cloned_file, + source, data_file_offset, data_size: n.data_size, data_file_access_control: self.data_file_access_control.clone(), @@ -3310,10 +3349,13 @@ mod tests { id: NeedleId(7), ..Needle::default() }; - match v.read_needle_stream_info(&mut meta, false) { - Ok(_) => panic!("expected remote-backed volume stream info to require fallback"), - Err(err) => assert!(matches!(err, VolumeError::StreamingUnsupported)), - } + let info = v.read_needle_stream_info(&mut meta, false).unwrap(); + assert!(matches!(info.source, NeedleStreamSource::Remote(_))); + let mut streamed = vec![0u8; info.data_size as usize]; + info.source + .read_exact_at(&mut streamed, info.data_file_offset) + .unwrap(); + assert_eq!(streamed, b"remote-only"); assert_eq!(meta.data_size, 11); let _ = shutdown_tx.send(());