diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 97882ba9b..5f2f2451d 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -544,7 +544,6 @@ impl VolumeServer for VolumeGrpcService { .ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?; let dat_size = v.dat_file_size().unwrap_or(0); - let dat_path = v.file_name(".dat"); let super_block_size = v.super_block.block_size() as u64; // If since_ns is very large (after all data), return empty @@ -579,37 +578,28 @@ impl VolumeServer for VolumeGrpcService { } } }; - drop(store); - - // Read the .dat file - let file = std::fs::File::open(&dat_path).map_err(|e| Status::internal(e.to_string()))?; - - let mut reader = std::io::BufReader::new(file); - use std::io::{Read, Seek, SeekFrom}; - reader - .seek(SeekFrom::Start(start_offset)) - .map_err(|e| Status::internal(e.to_string()))?; - let mut results = Vec::new(); let mut bytes_to_read = (dat_size - start_offset) as i64; let buffer_size = 2 * 1024 * 1024; + let mut offset = start_offset; while bytes_to_read > 0 { let chunk = std::cmp::min(bytes_to_read as usize, buffer_size); - let mut buf = vec![0u8; chunk]; - match reader.read(&mut buf) { - Ok(0) => break, - Ok(n) => { - buf.truncate(n); + match v.read_dat_slice(offset, chunk) { + Ok(buf) if buf.is_empty() => break, + Ok(buf) => { + let read_len = buf.len() as i64; results.push(Ok(volume_server_pb::VolumeIncrementalCopyResponse { file_content: buf, })); - bytes_to_read -= n as i64; + bytes_to_read -= read_len; + offset += read_len as u64; } Err(e) => return Err(Status::internal(e.to_string())), } } + drop(store); let stream = tokio_stream::iter(results); Ok(Response::new(Box::pin(stream))) } @@ -3640,6 +3630,14 @@ fn get_disk_usage(path: &str) -> (u64, u64) { #[cfg(test)] mod tests { use super::*; + use crate::config::MinFreeSpace; + use crate::remote_storage::s3_tier::{global_s3_tier_registry, 
S3TierBackend, S3TierConfig}; + use crate::security::{Guard, SigningKey}; + use crate::storage::needle_map::NeedleMapKind; + use crate::storage::store::Store; + use std::sync::RwLock; + use tempfile::TempDir; + use tokio_stream::StreamExt; #[test] fn test_parse_grpc_address_with_explicit_grpc_port() { @@ -3690,4 +3688,258 @@ mod tests { assert!(volume_is_remote_only(dat_path.to_str().unwrap(), true)); assert!(!volume_is_remote_only(dat_path.to_str().unwrap(), false)); } + + fn spawn_fake_s3_server(body: Vec<u8>) -> (String, tokio::sync::oneshot::Sender<()>) { + use axum::http::{header, HeaderMap, HeaderValue, StatusCode}; + use axum::routing::any; + use axum::Router; + + let body = Arc::new(body); + let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + listener.set_nonblocking(true).unwrap(); + let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + + std::thread::spawn(move || { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + runtime.block_on(async move { + let app = Router::new().fallback(any(move |headers: HeaderMap| { + let body = body.clone(); + async move { + let bytes = body.as_ref(); + if let Some(range) = headers + .get(header::RANGE) + .and_then(|value| value.to_str().ok()) + { + if let Some(range_value) = range.strip_prefix("bytes=") { + let mut parts = range_value.splitn(2, '-'); + let start = parts + .next() + .and_then(|value| value.parse::<usize>().ok()) + .unwrap_or(0); + let end = parts + .next() + .and_then(|value| value.parse::<usize>().ok()) + .unwrap_or_else(|| bytes.len().saturating_sub(1)); + let start = start.min(bytes.len()); + let end = end.min(bytes.len().saturating_sub(1)); + let payload = if start > end || start >= bytes.len() { + Vec::new() + } else { + bytes[start..=end].to_vec() + }; + let mut response_headers = HeaderMap::new(); + response_headers.insert( + header::CONTENT_RANGE, + HeaderValue::from_str(&format!( 
"bytes {}-{}/{}", + start, + end, + bytes.len() + )) + .unwrap(), + ); + response_headers.insert( + header::CONTENT_LENGTH, + HeaderValue::from_str(&payload.len().to_string()).unwrap(), + ); + return (StatusCode::PARTIAL_CONTENT, response_headers, payload); + } + } + + let mut response_headers = HeaderMap::new(); + response_headers.insert( + header::CONTENT_LENGTH, + HeaderValue::from_str(&bytes.len().to_string()).unwrap(), + ); + (StatusCode::OK, response_headers, bytes.to_vec()) + } + })); + + let listener = tokio::net::TcpListener::from_std(listener).unwrap(); + axum::serve(listener, app) + .with_graceful_shutdown(async move { + let _ = shutdown_rx.await; + }) + .await + .unwrap(); + }); + }); + + (format!("http://{}", addr), shutdown_tx) + } + + fn make_remote_only_service() + -> ( + VolumeGrpcService, + TempDir, + tokio::sync::oneshot::Sender<()>, + Vec<u8>, + u64, + ) { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + + let (dat_bytes, super_block_size) = { + let mut volume = crate::storage::volume::Volume::new( + dir, + dir, + "", + VolumeId(1), + NeedleMapKind::InMemory, + None, + None, + 0, + Version::current(), + ) + .unwrap(); + let mut needle = Needle { + id: NeedleId(7), + cookie: Cookie(0x7788), + data: b"remote-incremental-copy".to_vec(), + data_size: "remote-incremental-copy".len() as u32, + ..Needle::default() + }; + volume.write_needle(&mut needle, true).unwrap(); + volume.sync_to_disk().unwrap(); + ( + std::fs::read(volume.file_name(".dat")).unwrap(), + volume.super_block.block_size() as u64, + ) + }; + + let dat_path = format!("{}/1.dat", dir); + std::fs::remove_file(&dat_path).unwrap(); + + let (endpoint, shutdown_tx) = spawn_fake_s3_server(dat_bytes.clone()); + global_s3_tier_registry().write().unwrap().clear(); + let tier_config = S3TierConfig { + access_key: "access".to_string(), + secret_key: "secret".to_string(), + region: "us-east-1".to_string(), + bucket: "bucket-a".to_string(), + endpoint, + storage_class: 
"STANDARD".to_string(), + force_path_style: true, + }; + { + let mut registry = global_s3_tier_registry().write().unwrap(); + registry.register("s3.default".to_string(), S3TierBackend::new(&tier_config)); + registry.register("s3".to_string(), S3TierBackend::new(&tier_config)); + } + + let vif = crate::storage::volume::VifVolumeInfo { + files: vec![crate::storage::volume::VifRemoteFile { + backend_type: "s3".to_string(), + backend_id: "default".to_string(), + key: "remote-key".to_string(), + offset: 0, + file_size: dat_bytes.len() as u64, + modified_time: 123, + extension: ".dat".to_string(), + }], + version: Version::current().0 as u32, + bytes_offset: crate::storage::types::OFFSET_SIZE as u32, + dat_file_size: dat_bytes.len() as i64, + ..Default::default() + }; + std::fs::write( + format!("{}/1.vif", dir), + serde_json::to_string_pretty(&vif).unwrap(), + ) + .unwrap(); + + let mut store = Store::new(NeedleMapKind::InMemory); + store + .add_location( + dir, + dir, + 10, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); + + let state = Arc::new(VolumeServerState { + store: RwLock::new(store), + guard: RwLock::new(Guard::new( + &[], + SigningKey(vec![]), + 0, + SigningKey(vec![]), + 0, + )), + is_stopping: RwLock::new(false), + maintenance: std::sync::atomic::AtomicBool::new(false), + state_version: std::sync::atomic::AtomicU32::new(0), + concurrent_upload_limit: 0, + concurrent_download_limit: 0, + inflight_upload_data_timeout: std::time::Duration::from_secs(60), + inflight_download_data_timeout: std::time::Duration::from_secs(60), + inflight_upload_bytes: std::sync::atomic::AtomicI64::new(0), + inflight_download_bytes: std::sync::atomic::AtomicI64::new(0), + upload_notify: tokio::sync::Notify::new(), + download_notify: tokio::sync::Notify::new(), + data_center: String::new(), + rack: String::new(), + file_size_limit_bytes: 0, + maintenance_byte_per_second: 0, + is_heartbeating: std::sync::atomic::AtomicBool::new(true), + 
has_master: false, + pre_stop_seconds: 0, + volume_state_notify: tokio::sync::Notify::new(), + write_queue: std::sync::OnceLock::new(), + s3_tier_registry: std::sync::RwLock::new( + crate::remote_storage::s3_tier::S3TierRegistry::new(), + ), + read_mode: crate::config::ReadMode::Local, + master_url: String::new(), + master_urls: Vec::new(), + self_url: String::new(), + http_client: reqwest::Client::new(), + outgoing_http_scheme: "http".to_string(), + outgoing_grpc_tls: None, + metrics_runtime: std::sync::RwLock::new( + crate::server::volume_server::RuntimeMetricsConfig::default(), + ), + metrics_notify: tokio::sync::Notify::new(), + fix_jpg_orientation: false, + has_slow_read: false, + read_buffer_size_bytes: 1024 * 1024, + security_file: String::new(), + cli_white_list: vec![], + }); + + (VolumeGrpcService { state }, tmp, shutdown_tx, dat_bytes, super_block_size) + } + + #[tokio::test] + async fn test_volume_incremental_copy_streams_remote_only_volume_data() { + let (service, _tmp, shutdown_tx, dat_bytes, super_block_size) = make_remote_only_service(); + + let response = service + .volume_incremental_copy(Request::new( + volume_server_pb::VolumeIncrementalCopyRequest { + volume_id: 1, + since_ns: 0, + }, + )) + .await + .unwrap(); + + let mut stream = response.into_inner(); + let mut copied = Vec::new(); + while let Some(message) = stream.next().await { + copied.extend_from_slice(&message.unwrap().file_content); + } + + assert_eq!(copied, dat_bytes[super_block_size as usize..]); + + let _ = shutdown_tx.send(()); + global_s3_tier_registry().write().unwrap().clear(); + } } diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 11ddb55ab..bc04da499 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -813,6 +813,23 @@ impl Volume { } } + /// Read a raw byte range from the current .dat backend. 
+ /// + /// This matches Go paths that stream directly from `DataBackend`, including + /// remote-only tiered volumes whose `.dat` is no longer present locally. + pub fn read_dat_slice(&self, offset: u64, size: usize) -> Result<Vec<u8>, VolumeError> { + let _guard = self.data_file_access_control.read_lock(); + let dat_size = self.current_dat_file_size()?; + if size == 0 || offset >= dat_size { + return Ok(Vec::new()); + } + + let read_len = std::cmp::min(size as u64, dat_size - offset) as usize; + let mut buf = vec![0u8; read_len]; + self.read_exact_at_backend(&mut buf, offset)?; + Ok(buf) + } + + // ---- SuperBlock I/O ---- fn read_super_block(&mut self) -> Result<(), VolumeError> {