diff --git a/seaweed-volume/src/remote_storage/s3_tier.rs b/seaweed-volume/src/remote_storage/s3_tier.rs index f1b7a69f4..be88adcf8 100644 --- a/seaweed-volume/src/remote_storage/s3_tier.rs +++ b/seaweed-volume/src/remote_storage/s3_tier.rs @@ -367,6 +367,27 @@ impl S3TierBackend { Ok(file_size) } + pub async fn read_range(&self, key: &str, offset: u64, size: usize) -> Result<Vec<u8>, String> { + let end = offset + (size as u64).saturating_sub(1); + let range = format!("bytes={}-{}", offset, end); + let resp = self + .client + .get_object() + .bucket(&self.bucket) + .key(key) + .range(&range) + .send() + .await + .map_err(|e| format!("failed to get object {} range {}: {}", key, range, e))?; + + let body = resp + .body + .collect() + .await + .map_err(|e| format!("failed to read object {} body: {}", key, e))?; + Ok(body.into_bytes().to_vec()) + } + /// Delete a file from S3. pub async fn delete_file(&self, key: &str) -> Result<(), String> { self.client @@ -394,6 +415,36 @@ impl S3TierBackend { Ok(()) }) } + + pub fn read_range_blocking( + &self, + key: &str, + offset: u64, + size: usize, + ) -> Result<Vec<u8>, String> { + let client = self.client.clone(); + let bucket = self.bucket.clone(); + let key = key.to_string(); + block_on_tier_future(async move { + let end = offset + (size as u64).saturating_sub(1); + let range = format!("bytes={}-{}", offset, end); + let resp = client + .get_object() + .bucket(&bucket) + .key(&key) + .range(&range) + .send() + .await + .map_err(|e| format!("failed to get object {} range {}: {}", key, range, e))?; + + let body = resp + .body + .collect() + .await + .map_err(|e| format!("failed to read object {} body: {}", key, e))?; + Ok(body.into_bytes().to_vec()) + }) + } } /// Parse a backend name like "s3" or "s3.default" into (backend_type, backend_id). 
diff --git a/seaweed-volume/src/server/handlers.rs b/seaweed-volume/src/server/handlers.rs index 25836306e..4830dca48 100644 --- a/seaweed-volume/src/server/handlers.rs +++ b/seaweed-volume/src/server/handlers.rs @@ -804,6 +804,7 @@ async fn get_or_head_handler_inner( let stream_info = store.read_volume_needle_stream_info(vid, &mut n, read_deleted); let stream_info = match stream_info { Ok(info) => Some(info), + Err(crate::storage::volume::VolumeError::StreamingUnsupported) => None, Err(crate::storage::volume::VolumeError::NotFound) => { metrics::HANDLER_COUNTER .with_label_values(&[metrics::ERROR_GET_NOT_FOUND]) diff --git a/seaweed-volume/src/storage/disk_location.rs b/seaweed-volume/src/storage/disk_location.rs index 7e3a700d5..3714cea4d 100644 --- a/seaweed-volume/src/storage/disk_location.rs +++ b/seaweed-volume/src/storage/disk_location.rs @@ -4,7 +4,7 @@ //! A Store contains one or more DiskLocations (one per configured directory). //! Matches Go's storage/disk_location.go. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fs; use std::io; use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, Ordering}; @@ -106,12 +106,13 @@ impl DiskLocation { // Scan for .dat files let entries = fs::read_dir(&self.directory)?; let mut dat_files: Vec<(String, VolumeId)> = Vec::new(); + let mut seen = HashSet::new(); for entry in entries { let entry = entry?; let name = entry.file_name().into_string().unwrap_or_default(); - if name.ends_with(".dat") { - if let Some((collection, vid)) = parse_volume_filename(&name) { + if let Some((collection, vid)) = parse_volume_filename(&name) { + if seen.insert((collection.clone(), vid)) { dat_files.push((collection, vid)); } } @@ -628,7 +629,10 @@ fn calculate_expected_shard_size(dat_file_size: i64) -> i64 { /// Parse a volume filename like "collection_42.dat" or "42.dat" into (collection, VolumeId). 
fn parse_volume_filename(filename: &str) -> Option<(String, VolumeId)> { - let stem = filename.strip_suffix(".dat")?; + let stem = filename + .strip_suffix(".dat") + .or_else(|| filename.strip_suffix(".vif")) + .or_else(|| filename.strip_suffix(".idx"))?; if let Some(pos) = stem.rfind('_') { let collection = &stem[..pos]; let id_str = &stem[pos + 1..]; @@ -659,6 +663,14 @@ mod tests { parse_volume_filename("pics_7.dat"), Some(("pics".to_string(), VolumeId(7))) ); + assert_eq!( + parse_volume_filename("42.vif"), + Some(("".to_string(), VolumeId(42))) + ); + assert_eq!( + parse_volume_filename("pics_7.idx"), + Some(("pics".to_string(), VolumeId(7))) + ); assert_eq!(parse_volume_filename("notadat.idx"), None); assert_eq!(parse_volume_filename("bad.dat"), None); } diff --git a/seaweed-volume/src/storage/store.rs b/seaweed-volume/src/storage/store.rs index 3d4aedfe4..f93fc7a0a 100644 --- a/seaweed-volume/src/storage/store.rs +++ b/seaweed-volume/src/storage/store.rs @@ -216,7 +216,9 @@ impl Store { } let base = crate::storage::volume::volume_file_name(&loc.directory, collection, vid); let dat_path = format!("{}.dat", base); - if std::path::Path::new(&dat_path).exists() { + let vif_path = format!("{}.vif", base); + if std::path::Path::new(&dat_path).exists() || std::path::Path::new(&vif_path).exists() + { return loc.create_volume( vid, collection, @@ -264,12 +266,9 @@ impl Store { for entry in entries.flatten() { let name = entry.file_name(); let name = name.to_string_lossy(); - if !name.ends_with(".dat") { - continue; - } if let Some((collection, file_vid)) = parse_volume_filename(&name) { if file_vid == vid { - let base = name.trim_end_matches(".dat"); + let base = strip_volume_suffix(&name)?; let base_path = format!("{}/{}", loc.directory, base); return Some((loc_idx, base_path, collection)); } @@ -716,7 +715,7 @@ impl Store { /// Parse a volume filename like "collection_42.dat" or "42.dat" into (collection, VolumeId). 
fn parse_volume_filename(filename: &str) -> Option<(String, VolumeId)> { - let stem = filename.strip_suffix(".dat")?; + let stem = strip_volume_suffix(filename)?; if let Some(pos) = stem.rfind('_') { let collection = &stem[..pos]; let id_str = &stem[pos + 1..]; @@ -728,6 +727,13 @@ fn parse_volume_filename(filename: &str) -> Option<(String, VolumeId)> { } } +fn strip_volume_suffix(filename: &str) -> Option<&str> { + filename + .strip_suffix(".dat") + .or_else(|| filename.strip_suffix(".vif")) + .or_else(|| filename.strip_suffix(".idx")) +} + fn load_vif_volume_info(path: &str) -> Result { let content = match std::fs::read_to_string(path) { Ok(c) => c, diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 66be54b93..11ddb55ab 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -67,6 +67,9 @@ pub enum VolumeError { #[error("IO error: {0}")] Io(#[from] io::Error), + + #[error("streaming from remote-backed volume requires buffered fallback")] + StreamingUnsupported, } // ============================================================================ @@ -344,6 +347,36 @@ pub struct NeedleStreamInfo { pub compaction_revision: u16, } +#[derive(Clone)] +struct RemoteDatFile { + backend: Arc, + key: String, + file_size: u64, + modified_time: u64, +} + +impl RemoteDatFile { + fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> { + let data = self + .backend + .read_range_blocking(&self.key, offset, buf.len()) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + if data.len() != buf.len() { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + format!( + "remote read short read at offset {}: got {}, expected {}", + offset, + data.len(), + buf.len() + ), + )); + } + buf.copy_from_slice(&data); + Ok(()) + } +} + // ============================================================================ // Volume // 
============================================================================ @@ -355,6 +388,7 @@ pub struct Volume { pub collection: String, dat_file: Option<File>, + remote_dat_file: Option<RemoteDatFile>, nm: Option, needle_map_kind: NeedleMapKind, data_file_access_control: Arc<DataFileAccessControl>, @@ -422,6 +456,7 @@ impl Volume { dir_idx: dir_idx.to_string(), collection: collection.to_string(), dat_file: None, + remote_dat_file: None, nm: None, needle_map_kind, data_file_access_control: Arc::new(DataFileAccessControl::default()), @@ -489,7 +524,28 @@ impl Volume { let dat_path = self.file_name(".dat"); let mut already_has_super_block = false; - if Path::new(&dat_path).exists() { + self.load_vif()?; + + if self.volume_info.read_only && !self.has_remote_file { + self.no_write_or_delete = true; + } + + if self.has_remote_file { + self.load_remote_dat_file()?; + if let Some(remote_file) = self.volume_info.files.first() { + if remote_file.modified_time > 0 { + self.last_modified_ts_seconds = remote_file.modified_time; + } else if let Ok(metadata) = fs::metadata(self.vif_path()) { + self.last_modified_ts_seconds = metadata + .modified() + .unwrap_or(SystemTime::UNIX_EPOCH) + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + } + } + already_has_super_block = true; + } else if Path::new(&dat_path).exists() { let metadata = fs::metadata(&dat_path)?; // Try to open read-write; fall back to read-only @@ -536,9 +592,20 @@ impl Volume { } if already_has_super_block { - self.read_super_block()?; - if !self.super_block.version.is_supported() { - return Err(VolumeError::UnsupportedVersion(self.super_block.version.0)); + match self.read_super_block() { + Ok(()) => { + if !self.super_block.version.is_supported() { + return Err(VolumeError::UnsupportedVersion(self.super_block.version.0)); + } + } + Err(e) if self.has_remote_file => { + warn!( + volume_id = self.id.0, + error = %e, + "failed to read remote super block during load" + ); + } + Err(e) => return Err(e), } } else { 
self.maybe_write_super_block(version)?; @@ -548,8 +615,6 @@ impl Volume { self.load_index()?; } - self.load_vif()?; - Ok(()) } @@ -670,16 +735,89 @@ impl Volume { Ok(()) } + fn load_remote_dat_file(&mut self) -> Result<(), VolumeError> { + let (storage_name, storage_key) = self.remote_storage_name_key(); + let backend = crate::remote_storage::s3_tier::global_s3_tier_registry() + .read() + .unwrap() + .get(&storage_name) + .ok_or_else(|| { + VolumeError::Io(io::Error::new( + io::ErrorKind::NotFound, + format!("remote tier backend {} not found", storage_name), + )) + })?; + + let remote_file = self.volume_info.files.first().ok_or_else(|| { + VolumeError::Io(io::Error::new( + io::ErrorKind::NotFound, + "remote volume has no remote file entries", + )) + })?; + + let file_size = if remote_file.file_size > 0 { + remote_file.file_size + } else if self.volume_info.dat_file_size > 0 { + self.volume_info.dat_file_size as u64 + } else { + return Err(VolumeError::Io(io::Error::new( + io::ErrorKind::InvalidData, + format!("remote volume {} is missing file size metadata", self.id.0), + ))); + }; + + self.dat_file = None; + self.remote_dat_file = Some(RemoteDatFile { + backend, + key: storage_key, + file_size, + modified_time: remote_file.modified_time, + }); + Ok(()) + } + + fn read_exact_at_backend(&self, buf: &mut [u8], offset: u64) -> Result<(), VolumeError> { + if let Some(dat_file) = self.dat_file.as_ref() { + #[cfg(unix)] + { + use std::os::unix::fs::FileExt; + dat_file.read_exact_at(buf, offset)?; + } + #[cfg(windows)] + { + read_exact_at(dat_file, buf, offset)?; + } + #[cfg(not(any(unix, windows)))] + { + compile_error!("Platform not supported: only unix and windows are supported"); + } + Ok(()) + } else if let Some(remote_dat_file) = self.remote_dat_file.as_ref() { + remote_dat_file.read_exact_at(buf, offset)?; + Ok(()) + } else { + Err(VolumeError::Io(io::Error::new( + io::ErrorKind::Other, + "dat file not open", + ))) + } + } + + fn current_dat_file_size(&self) -> 
io::Result<u64> { + if let Some(ref f) = self.dat_file { + Ok(f.metadata()?.len()) + } else if let Some(ref remote_dat_file) = self.remote_dat_file { + Ok(remote_dat_file.file_size) + } else { + Ok(0) + } + } + // ---- SuperBlock I/O ---- fn read_super_block(&mut self) -> Result<(), VolumeError> { - let dat_file = self.dat_file.as_mut().ok_or_else(|| { - VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open")) - })?; - - dat_file.seek(SeekFrom::Start(0))?; let mut header = [0u8; SUPER_BLOCK_SIZE]; - dat_file.read_exact(&mut header)?; + self.read_exact_at_backend(&mut header, 0)?; let extra_size = u16::from_be_bytes([header[6], header[7]]); let total_size = SUPER_BLOCK_SIZE + extra_size as usize; @@ -687,7 +825,7 @@ impl Volume { let mut full_buf = vec![0u8; total_size]; full_buf[..SUPER_BLOCK_SIZE].copy_from_slice(&header); if extra_size > 0 { - dat_file.read_exact(&mut full_buf[SUPER_BLOCK_SIZE..])?; + self.read_exact_at_backend(&mut full_buf[SUPER_BLOCK_SIZE..], SUPER_BLOCK_SIZE as u64)?; } self.super_block = SuperBlock::from_bytes(&full_buf)?; @@ -816,28 +954,11 @@ impl Volume { offset: i64, size: Size, ) -> Result<(), VolumeError> { - let dat_file = self.dat_file.as_ref().ok_or_else(|| { - VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open")) - })?; - let version = self.version(); let actual_size = get_actual_size(size, version); - // Use pread (read_at) to avoid seeking with shared reference let mut buf = vec![0u8; actual_size as usize]; - #[cfg(unix)] - { - use std::os::unix::fs::FileExt; - dat_file.read_exact_at(&mut buf, offset as u64)?; - } - #[cfg(windows)] - { - read_exact_at(dat_file, &mut buf, offset as u64)?; - } - #[cfg(not(any(unix, windows)))] - { - compile_error!("Platform not supported: only unix and windows are supported"); - } + self.read_exact_at_backend(&mut buf, offset as u64)?; n.read_bytes(&mut buf, offset, size, version)?; Ok(()) @@ -850,23 +971,10 @@ impl Volume { } fn read_needle_blob_unlocked(&self, 
offset: i64, size: Size) -> Result<Vec<u8>, VolumeError> { - let dat_file = self.dat_file.as_ref().ok_or_else(|| { - VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open")) - })?; - let version = self.version(); let actual_size = get_actual_size(size, version); let mut buf = vec![0u8; actual_size as usize]; - - #[cfg(unix)] - { - use std::os::unix::fs::FileExt; - dat_file.read_exact_at(&mut buf, offset as u64)?; - } - #[cfg(windows)] - { - read_exact_at(dat_file, &mut buf, offset as u64)?; - } + self.read_exact_at_backend(&mut buf, offset as u64)?; Ok(buf) } @@ -900,10 +1008,6 @@ impl Volume { return Err(VolumeError::NotFound); } - let dat_file = self.dat_file.as_ref().ok_or_else(|| { - VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open")) - })?; - #[cfg_attr(feature = "5bytes", allow(unused_mut))] let mut offset = nv.offset.to_actual_offset(); let version = self.version(); @@ -914,15 +1018,7 @@ impl Volume { #[cfg_attr(feature = "5bytes", allow(unused_mut))] let mut read_and_parse = |off: i64| -> Result<(), VolumeError> { let mut buf = vec![0u8; actual_size as usize]; - #[cfg(unix)] - { - use std::os::unix::fs::FileExt; - dat_file.read_exact_at(&mut buf, off as u64)?; - } - #[cfg(windows)] - { - read_exact_at(dat_file, &mut buf, off as u64)?; - } + self.read_exact_at_backend(&mut buf, off as u64)?; n.read_bytes_meta_only(&mut buf, off, read_size, version)?; Ok(()) }; @@ -964,7 +1060,10 @@ impl Volume { offset as u64 + NEEDLE_HEADER_SIZE as u64 + 4 // skip DataSize (4 bytes) }; - let cloned_file = dat_file.try_clone().map_err(VolumeError::Io)?; + let cloned_file = match self.dat_file.as_ref() { + Some(dat_file) => dat_file.try_clone().map_err(VolumeError::Io)?, + None => return Err(VolumeError::StreamingUnsupported), + }; Ok(NeedleStreamInfo { dat_file: cloned_file, @@ -1100,20 +1199,8 @@ impl Volume { } fn read_needle_header_unlocked(&self, n: &mut Needle, offset: i64) -> Result<(), VolumeError> { - let dat_file = 
self.dat_file.as_ref().ok_or_else(|| { - VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open")) - })?; - let mut header = [0u8; NEEDLE_HEADER_SIZE]; - #[cfg(unix)] - { - use std::os::unix::fs::FileExt; - dat_file.read_exact_at(&mut header, offset as u64)?; - } - #[cfg(windows)] - { - read_exact_at(dat_file, &mut header, offset as u64)?; - } + self.read_exact_at_backend(&mut header, offset as u64)?; n.read_header(&header); Ok(()) @@ -1291,7 +1378,9 @@ impl Volume { /// Returns (files_checked, broken_needles) tuple. /// Each needle is read from disk and its CRC checksum is verified. pub fn scrub(&self) -> Result<(u64, Vec), VolumeError> { - let _dat_file = self.dat_file.as_ref().ok_or(VolumeError::NotFound)?; + if self.dat_file.is_none() && self.remote_dat_file.is_none() { + return Err(VolumeError::NotFound); + } let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?; let dat_size = self.dat_file_size().map_err(|e| VolumeError::Io(e))?; @@ -1338,20 +1427,17 @@ impl Volume { &self, from_offset: u64, ) -> Result, Vec, u64)>, VolumeError> { - let dat_file = self.dat_file.as_ref().ok_or(VolumeError::NotFound)?; let version = self.super_block.version; - let dat_size = dat_file.metadata()?.len(); + let dat_size = self.current_dat_file_size()?; let mut entries = Vec::new(); let mut offset = from_offset; - let mut dat = dat_file.try_clone()?; while offset < dat_size { // Read needle header (16 bytes) let mut header = [0u8; NEEDLE_HEADER_SIZE]; - dat.seek(SeekFrom::Start(offset))?; - match dat.read_exact(&mut header) { + match self.read_exact_at_backend(&mut header, offset) { Ok(()) => {} - Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break, + Err(VolumeError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => break, Err(e) => return Err(e.into()), } @@ -1370,10 +1456,9 @@ impl Volume { // Read body bytes let mut body = vec![0u8; body_length as usize]; - dat.seek(SeekFrom::Start(offset + NEEDLE_HEADER_SIZE as u64))?; - match 
dat.read_exact(&mut body) { + match self.read_exact_at_backend(&mut body, offset + NEEDLE_HEADER_SIZE as u64) { Ok(()) => {} - Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break, + Err(VolumeError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => break, Err(e) => return Err(e.into()), } @@ -1582,22 +1667,11 @@ impl Volume { /// Read the append_at_ns timestamp from a needle at the given offset in the .dat file. fn read_append_at_ns(&self, offset: Offset) -> Result<u64, VolumeError> { - let dat_file = self.dat_file.as_ref().ok_or_else(|| { - VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open")) - })?; let actual_offset = offset.to_actual_offset() as u64; let version = self.version(); let mut header_buf = [0u8; NEEDLE_HEADER_SIZE]; - #[cfg(unix)] - { - use std::os::unix::fs::FileExt; - dat_file.read_exact_at(&mut header_buf, actual_offset)?; - } - #[cfg(not(unix))] - { - read_exact_at(dat_file, &mut header_buf, actual_offset)?; - } + self.read_exact_at_backend(&mut header_buf, actual_offset)?; let (_cookie, _id, size) = Needle::parse_header(&header_buf); if size.0 <= 0 { @@ -1606,15 +1680,7 @@ impl Volume { let actual_size = get_actual_size(size, version); let mut buf = vec![0u8; actual_size as usize]; - #[cfg(unix)] - { - use std::os::unix::fs::FileExt; - dat_file.read_exact_at(&mut buf, actual_offset)?; - } - #[cfg(not(unix))] - { - read_exact_at(dat_file, &mut buf, actual_offset)?; - } + self.read_exact_at_backend(&mut buf, actual_offset)?; let mut n = Needle::default(); n.read_bytes_meta_only(&mut buf, offset.to_actual_offset(), size, version)?; @@ -1783,22 +1849,25 @@ impl Volume { } pub fn dat_file_size(&self) -> io::Result<u64> { - if let Some(ref f) = self.dat_file { - Ok(f.metadata()?.len()) - } else { - Ok(0) - } + self.current_dat_file_size() } /// Get the modification time of the .dat file as Unix seconds. 
pub fn dat_file_mod_time(&self) -> u64 { - self.dat_file - .as_ref() - .and_then(|f| f.metadata().ok()) - .and_then(|m| m.modified().ok()) - .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) - .map(|d| d.as_secs()) - .unwrap_or(0) + if let Some(dat_file) = self.dat_file.as_ref() { + dat_file + .metadata() + .ok() + .and_then(|m| m.modified().ok()) + .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) + .map(|d| d.as_secs()) + .unwrap_or(0) + } else { + self.remote_dat_file + .as_ref() + .map(|remote_dat_file| remote_dat_file.modified_time) + .unwrap_or(0) + } } pub fn idx_file_size(&self) -> u64 { @@ -1947,6 +2016,7 @@ impl Volume { let _ = dat_file.sync_all(); } self.dat_file = None; + self.remote_dat_file = None; let cpd_path = self.file_name(".cpd"); let cpx_path = self.file_name(".cpx"); @@ -2152,6 +2222,7 @@ impl Volume { let _ = dat_file.sync_all(); } self.dat_file = None; + self.remote_dat_file = None; if let Some(ref nm) = self.nm { let _ = nm.sync(); } @@ -2351,6 +2422,82 @@ mod tests { use crate::storage::needle::crc::CRC; use tempfile::TempDir; + fn spawn_fake_s3_server(body: Vec<u8>) -> (String, tokio::sync::oneshot::Sender<()>) { + use axum::http::{header, HeaderMap, HeaderValue, StatusCode}; + use axum::routing::any; + use axum::Router; + + let body = Arc::new(body); + let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + listener.set_nonblocking(true).unwrap(); + let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + + std::thread::spawn(move || { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + runtime.block_on(async move { + let app = Router::new().fallback(any(move |headers: HeaderMap| { + let body = body.clone(); + async move { + let bytes = body.as_ref(); + if let Some(range) = headers + .get(header::RANGE) + .and_then(|value| value.to_str().ok()) + { + if let Some(spec) = 
range.strip_prefix("bytes=") { + let (start, end) = spec.split_once('-').unwrap(); + let start = start.parse::<usize>().unwrap(); + let end = if end.is_empty() { + bytes.len().saturating_sub(1) + } else { + end.parse::<usize>().unwrap() + } + .min(bytes.len().saturating_sub(1)); + let chunk = bytes[start..=end].to_vec(); + let mut response_headers = HeaderMap::new(); + response_headers.insert( + header::CONTENT_LENGTH, + HeaderValue::from_str(&chunk.len().to_string()).unwrap(), + ); + response_headers.insert( + header::CONTENT_RANGE, + HeaderValue::from_str(&format!( + "bytes {}-{}/{}", + start, + end, + bytes.len() + )) + .unwrap(), + ); + return (StatusCode::PARTIAL_CONTENT, response_headers, chunk); + } + } + + let mut response_headers = HeaderMap::new(); + response_headers.insert( + header::CONTENT_LENGTH, + HeaderValue::from_str(&bytes.len().to_string()).unwrap(), + ); + (StatusCode::OK, response_headers, bytes.to_vec()) + } + })); + + let listener = tokio::net::TcpListener::from_std(listener).unwrap(); + axum::serve(listener, app) + .with_graceful_shutdown(async move { + let _ = shutdown_rx.await; + }) + .await + .unwrap(); + }); + }); + + (format!("http://{}", addr), shutdown_tx) + } + fn make_test_volume(dir: &str) -> Volume { Volume::new( dir, @@ -3046,6 +3193,115 @@ mod tests { } } + #[test] + fn test_remote_only_volume_load_reads_from_tier_backend() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + + let dat_bytes = { + let mut v = make_test_volume(dir); + let mut n = Needle { + id: NeedleId(7), + cookie: Cookie(0x7788), + data: b"remote-only".to_vec(), + data_size: 11, + ..Needle::default() + }; + v.write_needle(&mut n, true).unwrap(); + v.sync_to_disk().unwrap(); + std::fs::read(v.file_name(".dat")).unwrap() + }; + + let dat_path = format!("{}/1.dat", dir); + std::fs::remove_file(&dat_path).unwrap(); + + let (endpoint, shutdown_tx) = spawn_fake_s3_server(dat_bytes.clone()); + crate::remote_storage::s3_tier::global_s3_tier_registry() 
+ .write() + .unwrap() + .clear(); + let tier_config = crate::remote_storage::s3_tier::S3TierConfig { + access_key: "access".to_string(), + secret_key: "secret".to_string(), + region: "us-east-1".to_string(), + bucket: "bucket-a".to_string(), + endpoint, + storage_class: "STANDARD".to_string(), + force_path_style: true, + }; + { + let mut registry = crate::remote_storage::s3_tier::global_s3_tier_registry() + .write() + .unwrap(); + registry.register( + "s3.default".to_string(), + crate::remote_storage::s3_tier::S3TierBackend::new(&tier_config), + ); + registry.register( + "s3".to_string(), + crate::remote_storage::s3_tier::S3TierBackend::new(&tier_config), + ); + } + + let vif = VifVolumeInfo { + files: vec![VifRemoteFile { + backend_type: "s3".to_string(), + backend_id: "default".to_string(), + key: "remote-key".to_string(), + offset: 0, + file_size: dat_bytes.len() as u64, + modified_time: 123, + extension: ".dat".to_string(), + }], + version: Version::current().0 as u32, + bytes_offset: OFFSET_SIZE as u32, + dat_file_size: dat_bytes.len() as i64, + ..VifVolumeInfo::default() + }; + std::fs::write( + format!("{}/1.vif", dir), + serde_json::to_string_pretty(&vif).unwrap(), + ) + .unwrap(); + + let v = Volume::new( + dir, + dir, + "", + VolumeId(1), + NeedleMapKind::InMemory, + None, + None, + 0, + Version::current(), + ) + .unwrap(); + + assert!(v.has_remote_file); + assert!(v.dat_file.is_none()); + assert!(v.remote_dat_file.is_some()); + + let mut n = Needle { + id: NeedleId(7), + ..Needle::default() + }; + let size = v.read_needle(&mut n).unwrap(); + assert_eq!(size, 11); + assert_eq!(n.data, b"remote-only"); + + let mut meta = Needle { + id: NeedleId(7), + ..Needle::default() + }; + match v.read_needle_stream_info(&mut meta, false) { + Ok(_) => panic!("expected remote-backed volume stream info to require fallback"), + Err(err) => assert!(matches!(err, VolumeError::StreamingUnsupported)), + } + assert_eq!(meta.data_size, 11); + + let _ = shutdown_tx.send(()); + 
} + /// Volume destroy removes .vif alongside the primary data files. #[test] fn test_destroy_removes_vif() {