From a4690c69060fdaa93a002078250604a82d38c1f6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 16 Mar 2026 16:34:15 -0700 Subject: [PATCH] Persist EC expireAtSec in vif metadata --- seaweed-volume/src/server/grpc_server.rs | 139 ++++++++++++++++++++++- 1 file changed, 138 insertions(+), 1 deletion(-) diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index fdfa9fcc9..3f4ad8760 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -1711,7 +1711,7 @@ impl VolumeServer for VolumeGrpcService { let collection = &req.collection; // Find the volume's directory and validate collection - let (dir, idx_dir, vol_version, dat_file_size) = { + let (dir, idx_dir, vol_version, dat_file_size, expire_at_sec) = { let store = self.state.store.read().unwrap(); let (loc_idx, vol) = store .find_volume(vid) @@ -1724,11 +1724,24 @@ impl VolumeServer for VolumeGrpcService { } let version = vol.version().0 as u32; let dat_size = vol.dat_file_size().unwrap_or(0) as i64; + let expire_at_sec = { + let ttl_seconds = vol.super_block.ttl.to_seconds(); + if ttl_seconds > 0 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + + ttl_seconds + } else { + 0 + } + }; ( store.locations[loc_idx].directory.clone(), store.locations[loc_idx].idx_directory.clone(), version, dat_size, + expire_at_sec, ) }; @@ -1755,6 +1768,7 @@ impl VolumeServer for VolumeGrpcService { let vif = crate::storage::volume::VifVolumeInfo { version: vol_version, dat_file_size, + expire_at_sec, ec_shard_config: Some(crate::storage::volume::VifEcShardConfig { data_shards: data_shards, parity_shards: parity_shards, @@ -3906,6 +3920,101 @@ mod tests { (VolumeGrpcService { state }, tmp, shutdown_tx, dat_bytes, super_block_size) } + fn make_local_service_with_volume( + collection: &str, + ttl: Option, + ) -> (VolumeGrpcService, TempDir) { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + + let mut store = Store::new(NeedleMapKind::InMemory); + store + .add_location( + dir, + dir, + 10, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); + store + .add_volume( + VolumeId(1), + collection, + None, + ttl, + 0, + DiskType::HardDrive, + Version::current(), + ) + .unwrap(); + { + let (_, volume) = store.find_volume_mut(VolumeId(1)).unwrap(); + let mut needle = Needle { + id: NeedleId(11), + cookie: Cookie(0x3344), + data: b"ec-generate".to_vec(), + data_size: b"ec-generate".len() as u32, + ..Needle::default() + }; + volume.write_needle(&mut needle, true).unwrap(); + volume.sync_to_disk().unwrap(); + } + + let state = Arc::new(VolumeServerState { + store: RwLock::new(store), + guard: RwLock::new(Guard::new( + &[], + SigningKey(vec![]), + 0, + SigningKey(vec![]), + 0, + )), + is_stopping: RwLock::new(false), + maintenance: std::sync::atomic::AtomicBool::new(false), + state_version: std::sync::atomic::AtomicU32::new(0), + concurrent_upload_limit: 0, + concurrent_download_limit: 0, + inflight_upload_data_timeout: std::time::Duration::from_secs(60), + inflight_download_data_timeout: std::time::Duration::from_secs(60), + inflight_upload_bytes: std::sync::atomic::AtomicI64::new(0), + inflight_download_bytes: std::sync::atomic::AtomicI64::new(0), + upload_notify: tokio::sync::Notify::new(), + download_notify: tokio::sync::Notify::new(), + data_center: String::new(), + rack: String::new(), + file_size_limit_bytes: 0, + maintenance_byte_per_second: 0, + is_heartbeating: std::sync::atomic::AtomicBool::new(true), + has_master: false, + pre_stop_seconds: 0, + volume_state_notify: tokio::sync::Notify::new(), + write_queue: std::sync::OnceLock::new(), + s3_tier_registry: std::sync::RwLock::new( + crate::remote_storage::s3_tier::S3TierRegistry::new(), + ), + read_mode: crate::config::ReadMode::Local, + master_url: String::new(), + master_urls: Vec::new(), + self_url: String::new(), + http_client: reqwest::Client::new(), + outgoing_http_scheme: "http".to_string(), + outgoing_grpc_tls: None, + metrics_runtime: std::sync::RwLock::new( + crate::server::volume_server::RuntimeMetricsConfig::default(), + ), + metrics_notify: tokio::sync::Notify::new(), + fix_jpg_orientation: false, + has_slow_read: false, + read_buffer_size_bytes: 1024 * 1024, + security_file: String::new(), + cli_white_list: vec![], + }); + + (VolumeGrpcService { state }, tmp) + } + #[tokio::test] async fn test_volume_incremental_copy_streams_remote_only_volume_data() { let (service, _tmp, shutdown_tx, dat_bytes, super_block_size) = make_remote_only_service(); @@ -3931,4 +4040,32 @@ mod tests { let _ = shutdown_tx.send(()); global_s3_tier_registry().write().unwrap().clear(); } + + #[tokio::test] + async fn test_volume_ec_shards_generate_persists_expire_at_sec() { + let ttl = crate::storage::needle::ttl::TTL::read("3m").unwrap(); + let (service, tmp) = make_local_service_with_volume("ttl", Some(ttl)); + let before = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + + service + .volume_ec_shards_generate(Request::new( + volume_server_pb::VolumeEcShardsGenerateRequest { + volume_id: 1, + collection: "ttl".to_string(), + }, + )) + .await + .unwrap(); + + let vif_path = tmp.path().join("ttl_1.vif"); + let vif: crate::storage::volume::VifVolumeInfo = serde_json::from_str( + &std::fs::read_to_string(vif_path).unwrap(), + ) + .unwrap(); + assert!(vif.expire_at_sec >= before + ttl.to_seconds()); + assert!(vif.expire_at_sec <= before + ttl.to_seconds() + 5); + } }