Browse Source

Load remote-only tiered volumes

rust-volume-server
Chris Lu 6 days ago
parent
commit
ca24736288
  1. 51
      seaweed-volume/src/remote_storage/s3_tier.rs
  2. 1
      seaweed-volume/src/server/handlers.rs
  3. 20
      seaweed-volume/src/storage/disk_location.rs
  4. 18
      seaweed-volume/src/storage/store.rs
  5. 486
      seaweed-volume/src/storage/volume.rs

51
seaweed-volume/src/remote_storage/s3_tier.rs

@ -367,6 +367,27 @@ impl S3TierBackend {
Ok(file_size) Ok(file_size)
} }
/// Read `size` bytes starting at `offset` from the object `key` using an
/// HTTP range request.
///
/// # Errors
/// Returns a formatted error string if the GET request or the body
/// collection fails.
///
/// A zero-length read returns an empty buffer without issuing a request:
/// HTTP ranges are inclusive, so `bytes=offset-(offset-1)` cannot express
/// an empty range (the previous code would have fetched one byte instead).
pub async fn read_range(&self, key: &str, offset: u64, size: usize) -> Result<Vec<u8>, String> {
    if size == 0 {
        return Ok(Vec::new());
    }
    // Inclusive range: [offset, offset + size - 1]. size >= 1 here, so the
    // subtraction cannot underflow.
    let end = offset + (size as u64 - 1);
    let range = format!("bytes={}-{}", offset, end);
    let resp = self
        .client
        .get_object()
        .bucket(&self.bucket)
        .key(key)
        .range(&range)
        .send()
        .await
        .map_err(|e| format!("failed to get object {} range {}: {}", key, range, e))?;
    // Drain the streaming body into a single contiguous buffer.
    let body = resp
        .body
        .collect()
        .await
        .map_err(|e| format!("failed to read object {} body: {}", key, e))?;
    Ok(body.into_bytes().to_vec())
}
/// Delete a file from S3. /// Delete a file from S3.
pub async fn delete_file(&self, key: &str) -> Result<(), String> { pub async fn delete_file(&self, key: &str) -> Result<(), String> {
self.client self.client
@ -394,6 +415,36 @@ impl S3TierBackend {
Ok(()) Ok(())
}) })
} }
/// Blocking wrapper around a ranged S3 GET, for callers that are not in an
/// async context (e.g. the synchronous volume read path).
///
/// Clones the client, bucket, and key so the future is `'static` and can be
/// driven by `block_on_tier_future`.
///
/// # Errors
/// Returns a formatted error string if the GET request or the body
/// collection fails.
///
/// A zero-length read short-circuits to an empty buffer: HTTP ranges are
/// inclusive, so an empty range cannot be expressed (the previous code
/// would have requested — and returned — one byte).
pub fn read_range_blocking(
    &self,
    key: &str,
    offset: u64,
    size: usize,
) -> Result<Vec<u8>, String> {
    if size == 0 {
        return Ok(Vec::new());
    }
    let client = self.client.clone();
    let bucket = self.bucket.clone();
    let key = key.to_string();
    block_on_tier_future(async move {
        // Inclusive range: [offset, offset + size - 1]. size >= 1 here, so
        // the subtraction cannot underflow.
        let end = offset + (size as u64 - 1);
        let range = format!("bytes={}-{}", offset, end);
        let resp = client
            .get_object()
            .bucket(&bucket)
            .key(&key)
            .range(&range)
            .send()
            .await
            .map_err(|e| format!("failed to get object {} range {}: {}", key, range, e))?;
        let body = resp
            .body
            .collect()
            .await
            .map_err(|e| format!("failed to read object {} body: {}", key, e))?;
        Ok(body.into_bytes().to_vec())
    })
}
} }
/// Parse a backend name like "s3" or "s3.default" into (backend_type, backend_id). /// Parse a backend name like "s3" or "s3.default" into (backend_type, backend_id).

1
seaweed-volume/src/server/handlers.rs

@ -804,6 +804,7 @@ async fn get_or_head_handler_inner(
let stream_info = store.read_volume_needle_stream_info(vid, &mut n, read_deleted); let stream_info = store.read_volume_needle_stream_info(vid, &mut n, read_deleted);
let stream_info = match stream_info { let stream_info = match stream_info {
Ok(info) => Some(info), Ok(info) => Some(info),
Err(crate::storage::volume::VolumeError::StreamingUnsupported) => None,
Err(crate::storage::volume::VolumeError::NotFound) => { Err(crate::storage::volume::VolumeError::NotFound) => {
metrics::HANDLER_COUNTER metrics::HANDLER_COUNTER
.with_label_values(&[metrics::ERROR_GET_NOT_FOUND]) .with_label_values(&[metrics::ERROR_GET_NOT_FOUND])

20
seaweed-volume/src/storage/disk_location.rs

@ -4,7 +4,7 @@
//! A Store contains one or more DiskLocations (one per configured directory). //! A Store contains one or more DiskLocations (one per configured directory).
//! Matches Go's storage/disk_location.go. //! Matches Go's storage/disk_location.go.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fs; use std::fs;
use std::io; use std::io;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, Ordering}; use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, Ordering};
@ -106,12 +106,13 @@ impl DiskLocation {
// Scan for .dat files // Scan for .dat files
let entries = fs::read_dir(&self.directory)?; let entries = fs::read_dir(&self.directory)?;
let mut dat_files: Vec<(String, VolumeId)> = Vec::new(); let mut dat_files: Vec<(String, VolumeId)> = Vec::new();
let mut seen = HashSet::new();
for entry in entries { for entry in entries {
let entry = entry?; let entry = entry?;
let name = entry.file_name().into_string().unwrap_or_default(); let name = entry.file_name().into_string().unwrap_or_default();
if name.ends_with(".dat") {
if let Some((collection, vid)) = parse_volume_filename(&name) {
if let Some((collection, vid)) = parse_volume_filename(&name) {
if seen.insert((collection.clone(), vid)) {
dat_files.push((collection, vid)); dat_files.push((collection, vid));
} }
} }
@ -628,7 +629,10 @@ fn calculate_expected_shard_size(dat_file_size: i64) -> i64 {
/// Parse a volume filename like "collection_42.dat" or "42.dat" into (collection, VolumeId). /// Parse a volume filename like "collection_42.dat" or "42.dat" into (collection, VolumeId).
fn parse_volume_filename(filename: &str) -> Option<(String, VolumeId)> { fn parse_volume_filename(filename: &str) -> Option<(String, VolumeId)> {
let stem = filename.strip_suffix(".dat")?;
let stem = filename
.strip_suffix(".dat")
.or_else(|| filename.strip_suffix(".vif"))
.or_else(|| filename.strip_suffix(".idx"))?;
if let Some(pos) = stem.rfind('_') { if let Some(pos) = stem.rfind('_') {
let collection = &stem[..pos]; let collection = &stem[..pos];
let id_str = &stem[pos + 1..]; let id_str = &stem[pos + 1..];
@ -659,6 +663,14 @@ mod tests {
parse_volume_filename("pics_7.dat"), parse_volume_filename("pics_7.dat"),
Some(("pics".to_string(), VolumeId(7))) Some(("pics".to_string(), VolumeId(7)))
); );
assert_eq!(
parse_volume_filename("42.vif"),
Some(("".to_string(), VolumeId(42)))
);
assert_eq!(
parse_volume_filename("pics_7.idx"),
Some(("pics".to_string(), VolumeId(7)))
);
assert_eq!(parse_volume_filename("notadat.idx"), None); assert_eq!(parse_volume_filename("notadat.idx"), None);
assert_eq!(parse_volume_filename("bad.dat"), None); assert_eq!(parse_volume_filename("bad.dat"), None);
} }

18
seaweed-volume/src/storage/store.rs

@ -216,7 +216,9 @@ impl Store {
} }
let base = crate::storage::volume::volume_file_name(&loc.directory, collection, vid); let base = crate::storage::volume::volume_file_name(&loc.directory, collection, vid);
let dat_path = format!("{}.dat", base); let dat_path = format!("{}.dat", base);
if std::path::Path::new(&dat_path).exists() {
let vif_path = format!("{}.vif", base);
if std::path::Path::new(&dat_path).exists() || std::path::Path::new(&vif_path).exists()
{
return loc.create_volume( return loc.create_volume(
vid, vid,
collection, collection,
@ -264,12 +266,9 @@ impl Store {
for entry in entries.flatten() { for entry in entries.flatten() {
let name = entry.file_name(); let name = entry.file_name();
let name = name.to_string_lossy(); let name = name.to_string_lossy();
if !name.ends_with(".dat") {
continue;
}
if let Some((collection, file_vid)) = parse_volume_filename(&name) { if let Some((collection, file_vid)) = parse_volume_filename(&name) {
if file_vid == vid { if file_vid == vid {
let base = name.trim_end_matches(".dat");
let base = strip_volume_suffix(&name)?;
let base_path = format!("{}/{}", loc.directory, base); let base_path = format!("{}/{}", loc.directory, base);
return Some((loc_idx, base_path, collection)); return Some((loc_idx, base_path, collection));
} }
@ -716,7 +715,7 @@ impl Store {
/// Parse a volume filename like "collection_42.dat" or "42.dat" into (collection, VolumeId). /// Parse a volume filename like "collection_42.dat" or "42.dat" into (collection, VolumeId).
fn parse_volume_filename(filename: &str) -> Option<(String, VolumeId)> { fn parse_volume_filename(filename: &str) -> Option<(String, VolumeId)> {
let stem = filename.strip_suffix(".dat")?;
let stem = strip_volume_suffix(filename)?;
if let Some(pos) = stem.rfind('_') { if let Some(pos) = stem.rfind('_') {
let collection = &stem[..pos]; let collection = &stem[..pos];
let id_str = &stem[pos + 1..]; let id_str = &stem[pos + 1..];
@ -728,6 +727,13 @@ fn parse_volume_filename(filename: &str) -> Option<(String, VolumeId)> {
} }
} }
/// Strip a recognized volume-file extension (`.dat`, `.vif`, or `.idx`)
/// from `filename`, returning the stem, or `None` if the name carries none
/// of these suffixes.
fn strip_volume_suffix(filename: &str) -> Option<&str> {
    const VOLUME_SUFFIXES: [&str; 3] = [".dat", ".vif", ".idx"];
    VOLUME_SUFFIXES
        .iter()
        .find_map(|suffix| filename.strip_suffix(suffix))
}
fn load_vif_volume_info(path: &str) -> Result<VifVolumeInfo, VolumeError> { fn load_vif_volume_info(path: &str) -> Result<VifVolumeInfo, VolumeError> {
let content = match std::fs::read_to_string(path) { let content = match std::fs::read_to_string(path) {
Ok(c) => c, Ok(c) => c,

486
seaweed-volume/src/storage/volume.rs

@ -67,6 +67,9 @@ pub enum VolumeError {
#[error("IO error: {0}")] #[error("IO error: {0}")]
Io(#[from] io::Error), Io(#[from] io::Error),
#[error("streaming from remote-backed volume requires buffered fallback")]
StreamingUnsupported,
} }
// ============================================================================ // ============================================================================
@ -344,6 +347,36 @@ pub struct NeedleStreamInfo {
pub compaction_revision: u16, pub compaction_revision: u16,
} }
/// Handle to a .dat file that exists only in a remote tier backend.
/// Cloneable so read paths can hold a cheap copy; the backend client is
/// shared behind an `Arc`.
#[derive(Clone)]
struct RemoteDatFile {
// Shared S3 tier client used for ranged reads of the remote object.
backend: Arc<crate::remote_storage::s3_tier::S3TierBackend>,
// Object key of the uploaded .dat file in the remote store.
key: String,
// Total size of the remote .dat object in bytes (taken from .vif metadata
// in load_remote_dat_file).
file_size: u64,
// Last-modified time in Unix seconds, as recorded in the .vif file entry.
modified_time: u64,
}
impl RemoteDatFile {
    /// Fill `buf` with bytes at `offset` from the remote object, mirroring
    /// the contract of `FileExt::read_exact_at` on a local file: either the
    /// whole buffer is filled or an error is returned.
    fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
        let wanted = buf.len();
        let data = self
            .backend
            .read_range_blocking(&self.key, offset, wanted)
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
        // Anything other than exactly `wanted` bytes is treated as a short
        // (or over-long) read and surfaced as UnexpectedEof.
        if data.len() == wanted {
            buf.copy_from_slice(&data);
            Ok(())
        } else {
            Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                format!(
                    "remote read short read at offset {}: got {}, expected {}",
                    offset,
                    data.len(),
                    wanted
                ),
            ))
        }
    }
}
// ============================================================================ // ============================================================================
// Volume // Volume
// ============================================================================ // ============================================================================
@ -355,6 +388,7 @@ pub struct Volume {
pub collection: String, pub collection: String,
dat_file: Option<File>, dat_file: Option<File>,
remote_dat_file: Option<RemoteDatFile>,
nm: Option<NeedleMap>, nm: Option<NeedleMap>,
needle_map_kind: NeedleMapKind, needle_map_kind: NeedleMapKind,
data_file_access_control: Arc<DataFileAccessControl>, data_file_access_control: Arc<DataFileAccessControl>,
@ -422,6 +456,7 @@ impl Volume {
dir_idx: dir_idx.to_string(), dir_idx: dir_idx.to_string(),
collection: collection.to_string(), collection: collection.to_string(),
dat_file: None, dat_file: None,
remote_dat_file: None,
nm: None, nm: None,
needle_map_kind, needle_map_kind,
data_file_access_control: Arc::new(DataFileAccessControl::default()), data_file_access_control: Arc::new(DataFileAccessControl::default()),
@ -489,7 +524,28 @@ impl Volume {
let dat_path = self.file_name(".dat"); let dat_path = self.file_name(".dat");
let mut already_has_super_block = false; let mut already_has_super_block = false;
if Path::new(&dat_path).exists() {
self.load_vif()?;
if self.volume_info.read_only && !self.has_remote_file {
self.no_write_or_delete = true;
}
if self.has_remote_file {
self.load_remote_dat_file()?;
if let Some(remote_file) = self.volume_info.files.first() {
if remote_file.modified_time > 0 {
self.last_modified_ts_seconds = remote_file.modified_time;
} else if let Ok(metadata) = fs::metadata(self.vif_path()) {
self.last_modified_ts_seconds = metadata
.modified()
.unwrap_or(SystemTime::UNIX_EPOCH)
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
}
}
already_has_super_block = true;
} else if Path::new(&dat_path).exists() {
let metadata = fs::metadata(&dat_path)?; let metadata = fs::metadata(&dat_path)?;
// Try to open read-write; fall back to read-only // Try to open read-write; fall back to read-only
@ -536,9 +592,20 @@ impl Volume {
} }
if already_has_super_block { if already_has_super_block {
self.read_super_block()?;
if !self.super_block.version.is_supported() {
return Err(VolumeError::UnsupportedVersion(self.super_block.version.0));
match self.read_super_block() {
Ok(()) => {
if !self.super_block.version.is_supported() {
return Err(VolumeError::UnsupportedVersion(self.super_block.version.0));
}
}
Err(e) if self.has_remote_file => {
warn!(
volume_id = self.id.0,
error = %e,
"failed to read remote super block during load"
);
}
Err(e) => return Err(e),
} }
} else { } else {
self.maybe_write_super_block(version)?; self.maybe_write_super_block(version)?;
@ -548,8 +615,6 @@ impl Volume {
self.load_index()?; self.load_index()?;
} }
self.load_vif()?;
Ok(()) Ok(())
} }
@ -670,16 +735,89 @@ impl Volume {
Ok(()) Ok(())
} }
/// Resolve the volume's remote tier backend and attach a `RemoteDatFile`
/// handle in place of a local .dat file.
///
/// Looks up the backend named by the .vif metadata in the global S3 tier
/// registry, takes the first remote file entry, and determines the object
/// size from that entry (falling back to the .vif `dat_file_size`).
///
/// # Errors
/// Returns a `VolumeError::Io` if the backend is not registered, if the
/// .vif has no remote file entries, or if no usable size metadata exists.
fn load_remote_dat_file(&mut self) -> Result<(), VolumeError> {
let (storage_name, storage_key) = self.remote_storage_name_key();
// NOTE(review): .unwrap() on the registry lock will panic if the lock was
// poisoned by a panicking writer.
let backend = crate::remote_storage::s3_tier::global_s3_tier_registry()
.read()
.unwrap()
.get(&storage_name)
.ok_or_else(|| {
VolumeError::Io(io::Error::new(
io::ErrorKind::NotFound,
format!("remote tier backend {} not found", storage_name),
))
})?;
// Only the first remote file entry is consulted; any additional entries
// in the .vif are ignored here.
let remote_file = self.volume_info.files.first().ok_or_else(|| {
VolumeError::Io(io::Error::new(
io::ErrorKind::NotFound,
"remote volume has no remote file entries",
))
})?;
// Prefer the per-file size; fall back to the volume-level dat_file_size.
let file_size = if remote_file.file_size > 0 {
remote_file.file_size
} else if self.volume_info.dat_file_size > 0 {
self.volume_info.dat_file_size as u64
} else {
return Err(VolumeError::Io(io::Error::new(
io::ErrorKind::InvalidData,
format!("remote volume {} is missing file size metadata", self.id.0),
)));
};
// A remote-backed volume has no local .dat handle.
self.dat_file = None;
self.remote_dat_file = Some(RemoteDatFile {
backend,
key: storage_key,
file_size,
modified_time: remote_file.modified_time,
});
Ok(())
}
/// Fill `buf` with bytes at `offset` from whichever data source backs this
/// volume: the local .dat file (positional pread, no seek) when open,
/// otherwise the remote tier object.
///
/// # Errors
/// Returns a `VolumeError::Io` on read failure or when neither a local nor
/// a remote .dat handle is available.
fn read_exact_at_backend(&self, buf: &mut [u8], offset: u64) -> Result<(), VolumeError> {
if let Some(dat_file) = self.dat_file.as_ref() {
#[cfg(unix)]
{
// pread-style read; works through &File without seeking.
use std::os::unix::fs::FileExt;
dat_file.read_exact_at(buf, offset)?;
}
#[cfg(windows)]
{
// Windows fallback helper (no FileExt::read_exact_at on this platform).
read_exact_at(dat_file, buf, offset)?;
}
#[cfg(not(any(unix, windows)))]
{
compile_error!("Platform not supported: only unix and windows are supported");
}
Ok(())
} else if let Some(remote_dat_file) = self.remote_dat_file.as_ref() {
// Remote-only volume: satisfy the read via a ranged GET.
remote_dat_file.read_exact_at(buf, offset)?;
Ok(())
} else {
Err(VolumeError::Io(io::Error::new(
io::ErrorKind::Other,
"dat file not open",
)))
}
}
fn current_dat_file_size(&self) -> io::Result<u64> {
if let Some(ref f) = self.dat_file {
Ok(f.metadata()?.len())
} else if let Some(ref remote_dat_file) = self.remote_dat_file {
Ok(remote_dat_file.file_size)
} else {
Ok(0)
}
}
// ---- SuperBlock I/O ---- // ---- SuperBlock I/O ----
fn read_super_block(&mut self) -> Result<(), VolumeError> { fn read_super_block(&mut self) -> Result<(), VolumeError> {
let dat_file = self.dat_file.as_mut().ok_or_else(|| {
VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
})?;
dat_file.seek(SeekFrom::Start(0))?;
let mut header = [0u8; SUPER_BLOCK_SIZE]; let mut header = [0u8; SUPER_BLOCK_SIZE];
dat_file.read_exact(&mut header)?;
self.read_exact_at_backend(&mut header, 0)?;
let extra_size = u16::from_be_bytes([header[6], header[7]]); let extra_size = u16::from_be_bytes([header[6], header[7]]);
let total_size = SUPER_BLOCK_SIZE + extra_size as usize; let total_size = SUPER_BLOCK_SIZE + extra_size as usize;
@ -687,7 +825,7 @@ impl Volume {
let mut full_buf = vec![0u8; total_size]; let mut full_buf = vec![0u8; total_size];
full_buf[..SUPER_BLOCK_SIZE].copy_from_slice(&header); full_buf[..SUPER_BLOCK_SIZE].copy_from_slice(&header);
if extra_size > 0 { if extra_size > 0 {
dat_file.read_exact(&mut full_buf[SUPER_BLOCK_SIZE..])?;
self.read_exact_at_backend(&mut full_buf[SUPER_BLOCK_SIZE..], SUPER_BLOCK_SIZE as u64)?;
} }
self.super_block = SuperBlock::from_bytes(&full_buf)?; self.super_block = SuperBlock::from_bytes(&full_buf)?;
@ -816,28 +954,11 @@ impl Volume {
offset: i64, offset: i64,
size: Size, size: Size,
) -> Result<(), VolumeError> { ) -> Result<(), VolumeError> {
let dat_file = self.dat_file.as_ref().ok_or_else(|| {
VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
})?;
let version = self.version(); let version = self.version();
let actual_size = get_actual_size(size, version); let actual_size = get_actual_size(size, version);
// Use pread (read_at) to avoid seeking with shared reference
let mut buf = vec![0u8; actual_size as usize]; let mut buf = vec![0u8; actual_size as usize];
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
dat_file.read_exact_at(&mut buf, offset as u64)?;
}
#[cfg(windows)]
{
read_exact_at(dat_file, &mut buf, offset as u64)?;
}
#[cfg(not(any(unix, windows)))]
{
compile_error!("Platform not supported: only unix and windows are supported");
}
self.read_exact_at_backend(&mut buf, offset as u64)?;
n.read_bytes(&mut buf, offset, size, version)?; n.read_bytes(&mut buf, offset, size, version)?;
Ok(()) Ok(())
@ -850,23 +971,10 @@ impl Volume {
} }
fn read_needle_blob_unlocked(&self, offset: i64, size: Size) -> Result<Vec<u8>, VolumeError> { fn read_needle_blob_unlocked(&self, offset: i64, size: Size) -> Result<Vec<u8>, VolumeError> {
let dat_file = self.dat_file.as_ref().ok_or_else(|| {
VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
})?;
let version = self.version(); let version = self.version();
let actual_size = get_actual_size(size, version); let actual_size = get_actual_size(size, version);
let mut buf = vec![0u8; actual_size as usize]; let mut buf = vec![0u8; actual_size as usize];
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
dat_file.read_exact_at(&mut buf, offset as u64)?;
}
#[cfg(windows)]
{
read_exact_at(dat_file, &mut buf, offset as u64)?;
}
self.read_exact_at_backend(&mut buf, offset as u64)?;
Ok(buf) Ok(buf)
} }
@ -900,10 +1008,6 @@ impl Volume {
return Err(VolumeError::NotFound); return Err(VolumeError::NotFound);
} }
let dat_file = self.dat_file.as_ref().ok_or_else(|| {
VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
})?;
#[cfg_attr(feature = "5bytes", allow(unused_mut))] #[cfg_attr(feature = "5bytes", allow(unused_mut))]
let mut offset = nv.offset.to_actual_offset(); let mut offset = nv.offset.to_actual_offset();
let version = self.version(); let version = self.version();
@ -914,15 +1018,7 @@ impl Volume {
#[cfg_attr(feature = "5bytes", allow(unused_mut))] #[cfg_attr(feature = "5bytes", allow(unused_mut))]
let mut read_and_parse = |off: i64| -> Result<(), VolumeError> { let mut read_and_parse = |off: i64| -> Result<(), VolumeError> {
let mut buf = vec![0u8; actual_size as usize]; let mut buf = vec![0u8; actual_size as usize];
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
dat_file.read_exact_at(&mut buf, off as u64)?;
}
#[cfg(windows)]
{
read_exact_at(dat_file, &mut buf, off as u64)?;
}
self.read_exact_at_backend(&mut buf, off as u64)?;
n.read_bytes_meta_only(&mut buf, off, read_size, version)?; n.read_bytes_meta_only(&mut buf, off, read_size, version)?;
Ok(()) Ok(())
}; };
@ -964,7 +1060,10 @@ impl Volume {
offset as u64 + NEEDLE_HEADER_SIZE as u64 + 4 // skip DataSize (4 bytes) offset as u64 + NEEDLE_HEADER_SIZE as u64 + 4 // skip DataSize (4 bytes)
}; };
let cloned_file = dat_file.try_clone().map_err(VolumeError::Io)?;
let cloned_file = match self.dat_file.as_ref() {
Some(dat_file) => dat_file.try_clone().map_err(VolumeError::Io)?,
None => return Err(VolumeError::StreamingUnsupported),
};
Ok(NeedleStreamInfo { Ok(NeedleStreamInfo {
dat_file: cloned_file, dat_file: cloned_file,
@ -1100,20 +1199,8 @@ impl Volume {
} }
fn read_needle_header_unlocked(&self, n: &mut Needle, offset: i64) -> Result<(), VolumeError> { fn read_needle_header_unlocked(&self, n: &mut Needle, offset: i64) -> Result<(), VolumeError> {
let dat_file = self.dat_file.as_ref().ok_or_else(|| {
VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
})?;
let mut header = [0u8; NEEDLE_HEADER_SIZE]; let mut header = [0u8; NEEDLE_HEADER_SIZE];
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
dat_file.read_exact_at(&mut header, offset as u64)?;
}
#[cfg(windows)]
{
read_exact_at(dat_file, &mut header, offset as u64)?;
}
self.read_exact_at_backend(&mut header, offset as u64)?;
n.read_header(&header); n.read_header(&header);
Ok(()) Ok(())
@ -1291,7 +1378,9 @@ impl Volume {
/// Returns (files_checked, broken_needles) tuple. /// Returns (files_checked, broken_needles) tuple.
/// Each needle is read from disk and its CRC checksum is verified. /// Each needle is read from disk and its CRC checksum is verified.
pub fn scrub(&self) -> Result<(u64, Vec<String>), VolumeError> { pub fn scrub(&self) -> Result<(u64, Vec<String>), VolumeError> {
let _dat_file = self.dat_file.as_ref().ok_or(VolumeError::NotFound)?;
if self.dat_file.is_none() && self.remote_dat_file.is_none() {
return Err(VolumeError::NotFound);
}
let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?; let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;
let dat_size = self.dat_file_size().map_err(|e| VolumeError::Io(e))?; let dat_size = self.dat_file_size().map_err(|e| VolumeError::Io(e))?;
@ -1338,20 +1427,17 @@ impl Volume {
&self, &self,
from_offset: u64, from_offset: u64,
) -> Result<Vec<(Vec<u8>, Vec<u8>, u64)>, VolumeError> { ) -> Result<Vec<(Vec<u8>, Vec<u8>, u64)>, VolumeError> {
let dat_file = self.dat_file.as_ref().ok_or(VolumeError::NotFound)?;
let version = self.super_block.version; let version = self.super_block.version;
let dat_size = dat_file.metadata()?.len();
let dat_size = self.current_dat_file_size()?;
let mut entries = Vec::new(); let mut entries = Vec::new();
let mut offset = from_offset; let mut offset = from_offset;
let mut dat = dat_file.try_clone()?;
while offset < dat_size { while offset < dat_size {
// Read needle header (16 bytes) // Read needle header (16 bytes)
let mut header = [0u8; NEEDLE_HEADER_SIZE]; let mut header = [0u8; NEEDLE_HEADER_SIZE];
dat.seek(SeekFrom::Start(offset))?;
match dat.read_exact(&mut header) {
match self.read_exact_at_backend(&mut header, offset) {
Ok(()) => {} Ok(()) => {}
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
Err(VolumeError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => break,
Err(e) => return Err(e.into()), Err(e) => return Err(e.into()),
} }
@ -1370,10 +1456,9 @@ impl Volume {
// Read body bytes // Read body bytes
let mut body = vec![0u8; body_length as usize]; let mut body = vec![0u8; body_length as usize];
dat.seek(SeekFrom::Start(offset + NEEDLE_HEADER_SIZE as u64))?;
match dat.read_exact(&mut body) {
match self.read_exact_at_backend(&mut body, offset + NEEDLE_HEADER_SIZE as u64) {
Ok(()) => {} Ok(()) => {}
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
Err(VolumeError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => break,
Err(e) => return Err(e.into()), Err(e) => return Err(e.into()),
} }
@ -1582,22 +1667,11 @@ impl Volume {
/// Read the append_at_ns timestamp from a needle at the given offset in the .dat file. /// Read the append_at_ns timestamp from a needle at the given offset in the .dat file.
fn read_append_at_ns(&self, offset: Offset) -> Result<u64, VolumeError> { fn read_append_at_ns(&self, offset: Offset) -> Result<u64, VolumeError> {
let dat_file = self.dat_file.as_ref().ok_or_else(|| {
VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
})?;
let actual_offset = offset.to_actual_offset() as u64; let actual_offset = offset.to_actual_offset() as u64;
let version = self.version(); let version = self.version();
let mut header_buf = [0u8; NEEDLE_HEADER_SIZE]; let mut header_buf = [0u8; NEEDLE_HEADER_SIZE];
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
dat_file.read_exact_at(&mut header_buf, actual_offset)?;
}
#[cfg(not(unix))]
{
read_exact_at(dat_file, &mut header_buf, actual_offset)?;
}
self.read_exact_at_backend(&mut header_buf, actual_offset)?;
let (_cookie, _id, size) = Needle::parse_header(&header_buf); let (_cookie, _id, size) = Needle::parse_header(&header_buf);
if size.0 <= 0 { if size.0 <= 0 {
@ -1606,15 +1680,7 @@ impl Volume {
let actual_size = get_actual_size(size, version); let actual_size = get_actual_size(size, version);
let mut buf = vec![0u8; actual_size as usize]; let mut buf = vec![0u8; actual_size as usize];
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
dat_file.read_exact_at(&mut buf, actual_offset)?;
}
#[cfg(not(unix))]
{
read_exact_at(dat_file, &mut buf, actual_offset)?;
}
self.read_exact_at_backend(&mut buf, actual_offset)?;
let mut n = Needle::default(); let mut n = Needle::default();
n.read_bytes_meta_only(&mut buf, offset.to_actual_offset(), size, version)?; n.read_bytes_meta_only(&mut buf, offset.to_actual_offset(), size, version)?;
@ -1783,22 +1849,25 @@ impl Volume {
} }
pub fn dat_file_size(&self) -> io::Result<u64> { pub fn dat_file_size(&self) -> io::Result<u64> {
if let Some(ref f) = self.dat_file {
Ok(f.metadata()?.len())
} else {
Ok(0)
}
self.current_dat_file_size()
} }
/// Get the modification time of the .dat file as Unix seconds. /// Get the modification time of the .dat file as Unix seconds.
pub fn dat_file_mod_time(&self) -> u64 { pub fn dat_file_mod_time(&self) -> u64 {
self.dat_file
.as_ref()
.and_then(|f| f.metadata().ok())
.and_then(|m| m.modified().ok())
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
.map(|d| d.as_secs())
.unwrap_or(0)
if let Some(dat_file) = self.dat_file.as_ref() {
dat_file
.metadata()
.ok()
.and_then(|m| m.modified().ok())
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
.map(|d| d.as_secs())
.unwrap_or(0)
} else {
self.remote_dat_file
.as_ref()
.map(|remote_dat_file| remote_dat_file.modified_time)
.unwrap_or(0)
}
} }
pub fn idx_file_size(&self) -> u64 { pub fn idx_file_size(&self) -> u64 {
@ -1947,6 +2016,7 @@ impl Volume {
let _ = dat_file.sync_all(); let _ = dat_file.sync_all();
} }
self.dat_file = None; self.dat_file = None;
self.remote_dat_file = None;
let cpd_path = self.file_name(".cpd"); let cpd_path = self.file_name(".cpd");
let cpx_path = self.file_name(".cpx"); let cpx_path = self.file_name(".cpx");
@ -2152,6 +2222,7 @@ impl Volume {
let _ = dat_file.sync_all(); let _ = dat_file.sync_all();
} }
self.dat_file = None; self.dat_file = None;
self.remote_dat_file = None;
if let Some(ref nm) = self.nm { if let Some(ref nm) = self.nm {
let _ = nm.sync(); let _ = nm.sync();
} }
@ -2351,6 +2422,82 @@ mod tests {
use crate::storage::needle::crc::CRC; use crate::storage::needle::crc::CRC;
use tempfile::TempDir; use tempfile::TempDir;
/// Spin up a minimal in-process HTTP server that serves `body` as a single
/// object, honoring `Range: bytes=a-b` requests with 206 responses.
///
/// Binds to an ephemeral loopback port on the calling thread (so the
/// address is known before returning), then moves the listener into a
/// dedicated thread running a single-threaded tokio runtime.
///
/// Returns the base URL (`http://127.0.0.1:PORT`) and a oneshot sender
/// that, when fired (or dropped), gracefully shuts the server down.
fn spawn_fake_s3_server(body: Vec<u8>) -> (String, tokio::sync::oneshot::Sender<()>) {
use axum::http::{header, HeaderMap, HeaderValue, StatusCode};
use axum::routing::any;
use axum::Router;
let body = Arc::new(body);
// Bind synchronously so local_addr() is available to the caller.
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
// Required before handing the std listener to tokio.
listener.set_nonblocking(true).unwrap();
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
std::thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(async move {
// Every path/method serves the same object; only Range matters.
let app = Router::new().fallback(any(move |headers: HeaderMap| {
let body = body.clone();
async move {
let bytes = body.as_ref();
if let Some(range) = headers
.get(header::RANGE)
.and_then(|value| value.to_str().ok())
{
if let Some(spec) = range.strip_prefix("bytes=") {
// NOTE(review): test-only parsing — unwraps assume a
// well-formed "start-end" spec and start <= end.
let (start, end) = spec.split_once('-').unwrap();
let start = start.parse::<usize>().unwrap();
// An open-ended range ("start-") means "to end of object";
// clamp any explicit end to the last valid index.
let end = if end.is_empty() {
bytes.len().saturating_sub(1)
} else {
end.parse::<usize>().unwrap()
}
.min(bytes.len().saturating_sub(1));
let chunk = bytes[start..=end].to_vec();
let mut response_headers = HeaderMap::new();
response_headers.insert(
header::CONTENT_LENGTH,
HeaderValue::from_str(&chunk.len().to_string()).unwrap(),
);
response_headers.insert(
header::CONTENT_RANGE,
HeaderValue::from_str(&format!(
"bytes {}-{}/{}",
start,
end,
bytes.len()
))
.unwrap(),
);
// Partial content for any ranged request.
return (StatusCode::PARTIAL_CONTENT, response_headers, chunk);
}
}
// No Range header: serve the whole object with 200.
let mut response_headers = HeaderMap::new();
response_headers.insert(
header::CONTENT_LENGTH,
HeaderValue::from_str(&bytes.len().to_string()).unwrap(),
);
(StatusCode::OK, response_headers, bytes.to_vec())
}
}));
let listener = tokio::net::TcpListener::from_std(listener).unwrap();
axum::serve(listener, app)
.with_graceful_shutdown(async move {
let _ = shutdown_rx.await;
})
.await
.unwrap();
});
});
(format!("http://{}", addr), shutdown_tx)
}
fn make_test_volume(dir: &str) -> Volume { fn make_test_volume(dir: &str) -> Volume {
Volume::new( Volume::new(
dir, dir,
@ -3046,6 +3193,115 @@ mod tests {
} }
} }
/// End-to-end check that a volume whose .dat exists only in a remote tier
/// can be loaded from its .vif metadata and serve needle reads through the
/// tier backend, while streaming reads report StreamingUnsupported.
#[test]
fn test_remote_only_volume_load_reads_from_tier_backend() {
let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap();
// Phase 1: build a real local volume with one needle and capture its raw
// .dat bytes to serve as the "remote" object.
let dat_bytes = {
let mut v = make_test_volume(dir);
let mut n = Needle {
id: NeedleId(7),
cookie: Cookie(0x7788),
data: b"remote-only".to_vec(),
data_size: 11,
..Needle::default()
};
v.write_needle(&mut n, true).unwrap();
v.sync_to_disk().unwrap();
std::fs::read(v.file_name(".dat")).unwrap()
};
// Phase 2: delete the local .dat so only the remote copy exists.
let dat_path = format!("{}/1.dat", dir);
std::fs::remove_file(&dat_path).unwrap();
let (endpoint, shutdown_tx) = spawn_fake_s3_server(dat_bytes.clone());
// Reset the global registry so earlier tests can't leak backends in.
crate::remote_storage::s3_tier::global_s3_tier_registry()
.write()
.unwrap()
.clear();
let tier_config = crate::remote_storage::s3_tier::S3TierConfig {
access_key: "access".to_string(),
secret_key: "secret".to_string(),
region: "us-east-1".to_string(),
bucket: "bucket-a".to_string(),
endpoint,
storage_class: "STANDARD".to_string(),
force_path_style: true,
};
// Register under both names so lookup works regardless of whether the
// .vif resolves to "s3" or "s3.default".
{
let mut registry = crate::remote_storage::s3_tier::global_s3_tier_registry()
.write()
.unwrap();
registry.register(
"s3.default".to_string(),
crate::remote_storage::s3_tier::S3TierBackend::new(&tier_config),
);
registry.register(
"s3".to_string(),
crate::remote_storage::s3_tier::S3TierBackend::new(&tier_config),
);
}
// Phase 3: write a .vif that points the volume at the remote object.
let vif = VifVolumeInfo {
files: vec![VifRemoteFile {
backend_type: "s3".to_string(),
backend_id: "default".to_string(),
key: "remote-key".to_string(),
offset: 0,
file_size: dat_bytes.len() as u64,
modified_time: 123,
extension: ".dat".to_string(),
}],
version: Version::current().0 as u32,
bytes_offset: OFFSET_SIZE as u32,
dat_file_size: dat_bytes.len() as i64,
..VifVolumeInfo::default()
};
std::fs::write(
format!("{}/1.vif", dir),
serde_json::to_string_pretty(&vif).unwrap(),
)
.unwrap();
// Phase 4: load the volume — it must come up remote-backed.
let v = Volume::new(
dir,
dir,
"",
VolumeId(1),
NeedleMapKind::InMemory,
None,
None,
0,
Version::current(),
)
.unwrap();
assert!(v.has_remote_file);
assert!(v.dat_file.is_none());
assert!(v.remote_dat_file.is_some());
// Reads must be served through the fake tier server.
let mut n = Needle {
id: NeedleId(7),
..Needle::default()
};
let size = v.read_needle(&mut n).unwrap();
assert_eq!(size, 11);
assert_eq!(n.data, b"remote-only");
// Streaming requires a local file handle, so remote-only volumes must
// signal the buffered fallback via StreamingUnsupported...
let mut meta = Needle {
id: NeedleId(7),
..Needle::default()
};
match v.read_needle_stream_info(&mut meta, false) {
Ok(_) => panic!("expected remote-backed volume stream info to require fallback"),
Err(err) => assert!(matches!(err, VolumeError::StreamingUnsupported)),
}
// ...but the needle metadata is still populated before the error.
assert_eq!(meta.data_size, 11);
let _ = shutdown_tx.send(());
}
/// Volume destroy removes .vif alongside the primary data files. /// Volume destroy removes .vif alongside the primary data files.
#[test] #[test]
fn test_destroy_removes_vif() { fn test_destroy_removes_vif() {

Loading…
Cancel
Save