
Stream remote dat in incremental copy

VolumeIncrementalCopy previously opened the local .dat file directly, which fails for remote-only tiered volumes whose .dat has been uploaded to S3 and removed locally. Route the reads through Volume::read_dat_slice, which serves the byte range from whichever backend currently holds the .dat, and keep the store lock held until all chunks have been collected. Adds a test that tiers a volume to a fake S3 server and verifies the streamed bytes match the remote .dat contents after the superblock.

Branch: rust-volume-server
Chris Lu committed 3 days ago
Parent commit: 9a7eb93b8a
2 changed files:
  288  seaweed-volume/src/server/grpc_server.rs
   17  seaweed-volume/src/storage/volume.rs

seaweed-volume/src/server/grpc_server.rs (288 lines changed)

@@ -544,7 +544,6 @@ impl VolumeServer for VolumeGrpcService {
             .ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
         let dat_size = v.dat_file_size().unwrap_or(0);
-        let dat_path = v.file_name(".dat");
         let super_block_size = v.super_block.block_size() as u64;
         // If since_ns is very large (after all data), return empty
@@ -579,37 +578,28 @@ impl VolumeServer for VolumeGrpcService {
                 }
             }
         };
-        drop(store);
-        // Read the .dat file
-        let file = std::fs::File::open(&dat_path).map_err(|e| Status::internal(e.to_string()))?;
-        let mut reader = std::io::BufReader::new(file);
-        use std::io::{Read, Seek, SeekFrom};
-        reader
-            .seek(SeekFrom::Start(start_offset))
-            .map_err(|e| Status::internal(e.to_string()))?;
         let mut results = Vec::new();
         let mut bytes_to_read = (dat_size - start_offset) as i64;
         let buffer_size = 2 * 1024 * 1024;
+        let mut offset = start_offset;
         while bytes_to_read > 0 {
             let chunk = std::cmp::min(bytes_to_read as usize, buffer_size);
-            let mut buf = vec![0u8; chunk];
-            match reader.read(&mut buf) {
-                Ok(0) => break,
-                Ok(n) => {
-                    buf.truncate(n);
+            match v.read_dat_slice(offset, chunk) {
+                Ok(buf) if buf.is_empty() => break,
+                Ok(buf) => {
+                    let read_len = buf.len() as i64;
                     results.push(Ok(volume_server_pb::VolumeIncrementalCopyResponse {
                         file_content: buf,
                     }));
-                    bytes_to_read -= n as i64;
+                    bytes_to_read -= read_len;
+                    offset += read_len as u64;
                 }
                 Err(e) => return Err(Status::internal(e.to_string())),
             }
         }
+        drop(store);
         let stream = tokio_stream::iter(results);
         Ok(Response::new(Box::pin(stream)))
     }
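Note: v is borrowed from store, so drop(store) now has to come after the read loop; all chunks are buffered into results before the response stream is built, so the lock is not held while the client drains the stream. A minimal standalone sketch of the same chunking strategy, assuming a hypothetical read_slice(offset, size) helper that clamps at EOF the way Volume::read_dat_slice does:

    fn collect_chunks(
        read_slice: impl Fn(u64, usize) -> std::io::Result<Vec<u8>>,
        start: u64,
        total: u64,
    ) -> std::io::Result<Vec<Vec<u8>>> {
        let mut chunks = Vec::new();
        let mut offset = start;
        let mut remaining = total.saturating_sub(start) as i64;
        let buffer_size: usize = 2 * 1024 * 1024; // same 2 MiB chunk size as the handler
        while remaining > 0 {
            let want = std::cmp::min(remaining as usize, buffer_size);
            let buf = read_slice(offset, want)?; // clamped read; may return less
            if buf.is_empty() {
                break; // EOF reached early, e.g. the file shrank under us
            }
            remaining -= buf.len() as i64;
            offset += buf.len() as u64;
            chunks.push(buf);
        }
        Ok(chunks)
    }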
@@ -3640,6 +3630,14 @@ fn get_disk_usage(path: &str) -> (u64, u64) {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::config::MinFreeSpace;
+    use crate::remote_storage::s3_tier::{global_s3_tier_registry, S3TierBackend, S3TierConfig};
+    use crate::security::{Guard, SigningKey};
+    use crate::storage::needle_map::NeedleMapKind;
+    use crate::storage::store::Store;
+    use std::sync::RwLock;
+    use tempfile::TempDir;
+    use tokio_stream::StreamExt;

     #[test]
     fn test_parse_grpc_address_with_explicit_grpc_port() {
@@ -3690,4 +3688,258 @@ mod tests {
         assert!(volume_is_remote_only(dat_path.to_str().unwrap(), true));
         assert!(!volume_is_remote_only(dat_path.to_str().unwrap(), false));
     }
+
+    fn spawn_fake_s3_server(body: Vec<u8>) -> (String, tokio::sync::oneshot::Sender<()>) {
+        use axum::http::{header, HeaderMap, HeaderValue, StatusCode};
+        use axum::routing::any;
+        use axum::Router;
+
+        let body = Arc::new(body);
+        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
+        let addr = listener.local_addr().unwrap();
+        listener.set_nonblocking(true).unwrap();
+        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
+        std::thread::spawn(move || {
+            let runtime = tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .build()
+                .unwrap();
+            runtime.block_on(async move {
+                let app = Router::new().fallback(any(move |headers: HeaderMap| {
+                    let body = body.clone();
+                    async move {
+                        let bytes = body.as_ref();
+                        if let Some(range) = headers
+                            .get(header::RANGE)
+                            .and_then(|value| value.to_str().ok())
+                        {
+                            if let Some(range_value) = range.strip_prefix("bytes=") {
+                                let mut parts = range_value.splitn(2, '-');
+                                let start = parts
+                                    .next()
+                                    .and_then(|value| value.parse::<usize>().ok())
+                                    .unwrap_or(0);
+                                let end = parts
+                                    .next()
+                                    .and_then(|value| value.parse::<usize>().ok())
+                                    .unwrap_or_else(|| bytes.len().saturating_sub(1));
+                                let start = start.min(bytes.len());
+                                let end = end.min(bytes.len().saturating_sub(1));
+                                let payload = if start > end || start >= bytes.len() {
+                                    Vec::new()
+                                } else {
+                                    bytes[start..=end].to_vec()
+                                };
+                                let mut response_headers = HeaderMap::new();
+                                response_headers.insert(
+                                    header::CONTENT_RANGE,
+                                    HeaderValue::from_str(&format!(
+                                        "bytes {}-{}/{}",
+                                        start,
+                                        end,
+                                        bytes.len()
+                                    ))
+                                    .unwrap(),
+                                );
+                                response_headers.insert(
+                                    header::CONTENT_LENGTH,
+                                    HeaderValue::from_str(&payload.len().to_string()).unwrap(),
+                                );
+                                return (StatusCode::PARTIAL_CONTENT, response_headers, payload);
+                            }
+                        }
+                        let mut response_headers = HeaderMap::new();
+                        response_headers.insert(
+                            header::CONTENT_LENGTH,
+                            HeaderValue::from_str(&bytes.len().to_string()).unwrap(),
+                        );
+                        (StatusCode::OK, response_headers, bytes.to_vec())
+                    }
+                }));
+                let listener = tokio::net::TcpListener::from_std(listener).unwrap();
+                axum::serve(listener, app)
+                    .with_graceful_shutdown(async move {
+                        let _ = shutdown_rx.await;
+                    })
+                    .await
+                    .unwrap();
+            });
+        });
+        (format!("http://{}", addr), shutdown_tx)
+    }
+
+    fn make_remote_only_service() -> (
+        VolumeGrpcService,
+        TempDir,
+        tokio::sync::oneshot::Sender<()>,
+        Vec<u8>,
+        u64,
+    ) {
+        let tmp = TempDir::new().unwrap();
+        let dir = tmp.path().to_str().unwrap();
+        let (dat_bytes, super_block_size) = {
+            let mut volume = crate::storage::volume::Volume::new(
+                dir,
+                dir,
+                "",
+                VolumeId(1),
+                NeedleMapKind::InMemory,
+                None,
+                None,
+                0,
+                Version::current(),
+            )
+            .unwrap();
+            let mut needle = Needle {
+                id: NeedleId(7),
+                cookie: Cookie(0x7788),
+                data: b"remote-incremental-copy".to_vec(),
+                data_size: "remote-incremental-copy".len() as u32,
+                ..Needle::default()
+            };
+            volume.write_needle(&mut needle, true).unwrap();
+            volume.sync_to_disk().unwrap();
+            (
+                std::fs::read(volume.file_name(".dat")).unwrap(),
+                volume.super_block.block_size() as u64,
+            )
+        };
+        let dat_path = format!("{}/1.dat", dir);
+        std::fs::remove_file(&dat_path).unwrap();
+        let (endpoint, shutdown_tx) = spawn_fake_s3_server(dat_bytes.clone());
+        global_s3_tier_registry().write().unwrap().clear();
+        let tier_config = S3TierConfig {
+            access_key: "access".to_string(),
+            secret_key: "secret".to_string(),
+            region: "us-east-1".to_string(),
+            bucket: "bucket-a".to_string(),
+            endpoint,
+            storage_class: "STANDARD".to_string(),
+            force_path_style: true,
+        };
+        {
+            let mut registry = global_s3_tier_registry().write().unwrap();
+            registry.register("s3.default".to_string(), S3TierBackend::new(&tier_config));
+            registry.register("s3".to_string(), S3TierBackend::new(&tier_config));
+        }
+        let vif = crate::storage::volume::VifVolumeInfo {
+            files: vec![crate::storage::volume::VifRemoteFile {
+                backend_type: "s3".to_string(),
+                backend_id: "default".to_string(),
+                key: "remote-key".to_string(),
+                offset: 0,
+                file_size: dat_bytes.len() as u64,
+                modified_time: 123,
+                extension: ".dat".to_string(),
+            }],
+            version: Version::current().0 as u32,
+            bytes_offset: crate::storage::types::OFFSET_SIZE as u32,
+            dat_file_size: dat_bytes.len() as i64,
+            ..Default::default()
+        };
+        std::fs::write(
+            format!("{}/1.vif", dir),
+            serde_json::to_string_pretty(&vif).unwrap(),
+        )
+        .unwrap();
+        let mut store = Store::new(NeedleMapKind::InMemory);
+        store
+            .add_location(
+                dir,
+                dir,
+                10,
+                DiskType::HardDrive,
+                MinFreeSpace::Percent(1.0),
+                Vec::new(),
+            )
+            .unwrap();
+        let state = Arc::new(VolumeServerState {
+            store: RwLock::new(store),
+            guard: RwLock::new(Guard::new(
+                &[],
+                SigningKey(vec![]),
+                0,
+                SigningKey(vec![]),
+                0,
+            )),
+            is_stopping: RwLock::new(false),
+            maintenance: std::sync::atomic::AtomicBool::new(false),
+            state_version: std::sync::atomic::AtomicU32::new(0),
+            concurrent_upload_limit: 0,
+            concurrent_download_limit: 0,
+            inflight_upload_data_timeout: std::time::Duration::from_secs(60),
+            inflight_download_data_timeout: std::time::Duration::from_secs(60),
+            inflight_upload_bytes: std::sync::atomic::AtomicI64::new(0),
+            inflight_download_bytes: std::sync::atomic::AtomicI64::new(0),
+            upload_notify: tokio::sync::Notify::new(),
+            download_notify: tokio::sync::Notify::new(),
+            data_center: String::new(),
+            rack: String::new(),
+            file_size_limit_bytes: 0,
+            maintenance_byte_per_second: 0,
+            is_heartbeating: std::sync::atomic::AtomicBool::new(true),
+            has_master: false,
+            pre_stop_seconds: 0,
+            volume_state_notify: tokio::sync::Notify::new(),
+            write_queue: std::sync::OnceLock::new(),
+            s3_tier_registry: std::sync::RwLock::new(
+                crate::remote_storage::s3_tier::S3TierRegistry::new(),
+            ),
+            read_mode: crate::config::ReadMode::Local,
+            master_url: String::new(),
+            master_urls: Vec::new(),
+            self_url: String::new(),
+            http_client: reqwest::Client::new(),
+            outgoing_http_scheme: "http".to_string(),
+            outgoing_grpc_tls: None,
+            metrics_runtime: std::sync::RwLock::new(
+                crate::server::volume_server::RuntimeMetricsConfig::default(),
+            ),
+            metrics_notify: tokio::sync::Notify::new(),
+            fix_jpg_orientation: false,
+            has_slow_read: false,
+            read_buffer_size_bytes: 1024 * 1024,
+            security_file: String::new(),
+            cli_white_list: vec![],
+        });
+        (VolumeGrpcService { state }, tmp, shutdown_tx, dat_bytes, super_block_size)
+    }
+
+    #[tokio::test]
+    async fn test_volume_incremental_copy_streams_remote_only_volume_data() {
+        let (service, _tmp, shutdown_tx, dat_bytes, super_block_size) = make_remote_only_service();
+        let response = service
+            .volume_incremental_copy(Request::new(
+                volume_server_pb::VolumeIncrementalCopyRequest {
+                    volume_id: 1,
+                    since_ns: 0,
+                },
+            ))
+            .await
+            .unwrap();
+        let mut stream = response.into_inner();
+        let mut copied = Vec::new();
+        while let Some(message) = stream.next().await {
+            copied.extend_from_slice(&message.unwrap().file_content);
+        }
+        assert_eq!(copied, dat_bytes[super_block_size as usize..]);
+        let _ = shutdown_tx.send(());
+        global_s3_tier_registry().write().unwrap().clear();
+    }
 }
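The fake S3 server above implements only the happy-path subset of HTTP range semantics (a single explicit bytes=start-end range), which is all the ranged GETs in this test exercise. A hedged sketch of that contract, with an illustrative path (the real reads go through S3TierBackend, not reqwest):

    // Illustrative only: fetch one range from the fake server and expect 206.
    async fn fetch_range(endpoint: &str, start: u64, end: u64) -> Vec<u8> {
        let client = reqwest::Client::new();
        let response = client
            .get(format!("{}/bucket-a/remote-key", endpoint))
            .header(reqwest::header::RANGE, format!("bytes={}-{}", start, end))
            .send()
            .await
            .unwrap();
        assert_eq!(response.status(), reqwest::StatusCode::PARTIAL_CONTENT);
        response.bytes().await.unwrap().to_vec()
    }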

seaweed-volume/src/storage/volume.rs (17 lines changed)

@@ -813,6 +813,23 @@ impl Volume {
         }
     }

+    /// Read a raw byte range from the current .dat backend.
+    ///
+    /// This matches Go paths that stream directly from `DataBackend`, including
+    /// remote-only tiered volumes whose `.dat` is no longer present locally.
+    pub fn read_dat_slice(&self, offset: u64, size: usize) -> Result<Vec<u8>, VolumeError> {
+        let _guard = self.data_file_access_control.read_lock();
+        let dat_size = self.current_dat_file_size()?;
+        if size == 0 || offset >= dat_size {
+            return Ok(Vec::new());
+        }
+        let read_len = std::cmp::min(size as u64, dat_size - offset) as usize;
+        let mut buf = vec![0u8; read_len];
+        self.read_exact_at_backend(&mut buf, offset)?;
+        Ok(buf)
+    }
+
     // ---- SuperBlock I/O ----

     fn read_super_block(&mut self) -> Result<(), VolumeError> {
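A hedged usage sketch of the new helper (assumes a volume: &Volume in scope and reuses the 2 MiB chunk size from the gRPC handler above); because read_dat_slice clamps at EOF, an empty result is the loop's termination signal:

    // Stream the .dat payload that follows the superblock, chunk by chunk.
    let mut offset = volume.super_block.block_size() as u64;
    let mut data = Vec::new();
    loop {
        let chunk = volume.read_dat_slice(offset, 2 * 1024 * 1024)?;
        if chunk.is_empty() {
            break; // clamped read returned nothing: end of the .dat
        }
        offset += chunk.len() as u64;
        data.extend_from_slice(&chunk);
    }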
