Browse Source

Persist EC expireAtSec in vif metadata

rust-volume-server
Chris Lu 3 days ago
parent
commit
a4690c6906
  1. 139
      seaweed-volume/src/server/grpc_server.rs

139
seaweed-volume/src/server/grpc_server.rs

@@ -1711,7 +1711,7 @@ impl VolumeServer for VolumeGrpcService {
let collection = &req.collection;
// Find the volume's directory and validate collection
let (dir, idx_dir, vol_version, dat_file_size) = {
let (dir, idx_dir, vol_version, dat_file_size, expire_at_sec) = {
let store = self.state.store.read().unwrap();
let (loc_idx, vol) = store
.find_volume(vid)
@@ -1724,11 +1724,24 @@ impl VolumeServer for VolumeGrpcService {
}
let version = vol.version().0 as u32;
let dat_size = vol.dat_file_size().unwrap_or(0) as i64;
let expire_at_sec = {
let ttl_seconds = vol.super_block.ttl.to_seconds();
if ttl_seconds > 0 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
+ ttl_seconds
} else {
0
}
};
(
store.locations[loc_idx].directory.clone(),
store.locations[loc_idx].idx_directory.clone(),
version,
dat_size,
expire_at_sec,
)
};
@@ -1755,6 +1768,7 @@ impl VolumeServer for VolumeGrpcService {
let vif = crate::storage::volume::VifVolumeInfo {
version: vol_version,
dat_file_size,
expire_at_sec,
ec_shard_config: Some(crate::storage::volume::VifEcShardConfig {
data_shards: data_shards,
parity_shards: parity_shards,
@@ -3906,6 +3920,101 @@ mod tests {
(VolumeGrpcService { state }, tmp, shutdown_tx, dat_bytes, super_block_size)
}
fn make_local_service_with_volume(
    collection: &str,
    ttl: Option<crate::storage::needle::ttl::TTL>,
) -> (VolumeGrpcService, TempDir) {
    // Test fixture: a VolumeGrpcService backed by a single local volume
    // (id 1) living in a fresh temp directory. The volume is created with
    // the given collection/TTL and pre-populated with one needle so that
    // EC shard generation has data to encode. The TempDir is returned so
    // the caller keeps the backing files alive for the test's duration.
    let tmp = TempDir::new().unwrap();
    let dir = tmp.path().to_str().unwrap();

    let mut store = Store::new(NeedleMapKind::InMemory);
    store
        .add_location(
            dir,
            dir,
            10,
            DiskType::HardDrive,
            MinFreeSpace::Percent(1.0),
            Vec::new(),
        )
        .unwrap();
    store
        .add_volume(
            VolumeId(1),
            collection,
            None,
            ttl,
            0,
            DiskType::HardDrive,
            Version::current(),
        )
        .unwrap();

    // Seed the volume with a single needle and flush it to disk.
    {
        let (_, volume) = store.find_volume_mut(VolumeId(1)).unwrap();
        let payload = b"ec-generate";
        let mut seeded = Needle {
            id: NeedleId(11),
            cookie: Cookie(0x3344),
            data: payload.to_vec(),
            data_size: payload.len() as u32,
            ..Needle::default()
        };
        volume.write_needle(&mut seeded, true).unwrap();
        volume.sync_to_disk().unwrap();
    }

    // Everything else is inert defaults: no master, no limits, local reads.
    let state = Arc::new(VolumeServerState {
        store: RwLock::new(store),
        guard: RwLock::new(Guard::new(
            &[],
            SigningKey(vec![]),
            0,
            SigningKey(vec![]),
            0,
        )),
        is_stopping: RwLock::new(false),
        maintenance: std::sync::atomic::AtomicBool::new(false),
        state_version: std::sync::atomic::AtomicU32::new(0),
        concurrent_upload_limit: 0,
        concurrent_download_limit: 0,
        inflight_upload_data_timeout: std::time::Duration::from_secs(60),
        inflight_download_data_timeout: std::time::Duration::from_secs(60),
        inflight_upload_bytes: std::sync::atomic::AtomicI64::new(0),
        inflight_download_bytes: std::sync::atomic::AtomicI64::new(0),
        upload_notify: tokio::sync::Notify::new(),
        download_notify: tokio::sync::Notify::new(),
        data_center: String::new(),
        rack: String::new(),
        file_size_limit_bytes: 0,
        maintenance_byte_per_second: 0,
        is_heartbeating: std::sync::atomic::AtomicBool::new(true),
        has_master: false,
        pre_stop_seconds: 0,
        volume_state_notify: tokio::sync::Notify::new(),
        write_queue: std::sync::OnceLock::new(),
        s3_tier_registry: std::sync::RwLock::new(
            crate::remote_storage::s3_tier::S3TierRegistry::new(),
        ),
        read_mode: crate::config::ReadMode::Local,
        master_url: String::new(),
        master_urls: Vec::new(),
        self_url: String::new(),
        http_client: reqwest::Client::new(),
        outgoing_http_scheme: "http".to_string(),
        outgoing_grpc_tls: None,
        metrics_runtime: std::sync::RwLock::new(
            crate::server::volume_server::RuntimeMetricsConfig::default(),
        ),
        metrics_notify: tokio::sync::Notify::new(),
        fix_jpg_orientation: false,
        has_slow_read: false,
        read_buffer_size_bytes: 1024 * 1024,
        security_file: String::new(),
        cli_white_list: vec![],
    });

    (VolumeGrpcService { state }, tmp)
}
#[tokio::test]
async fn test_volume_incremental_copy_streams_remote_only_volume_data() {
let (service, _tmp, shutdown_tx, dat_bytes, super_block_size) = make_remote_only_service();
@@ -3931,4 +4040,32 @@ mod tests {
let _ = shutdown_tx.send(());
global_s3_tier_registry().write().unwrap().clear();
}
#[tokio::test]
async fn test_volume_ec_shards_generate_persists_expire_at_sec() {
    // EC shard generation for a TTL'd volume must persist the absolute
    // expiry time (now + ttl seconds) into the volume's .vif metadata.
    let ttl = crate::storage::needle::ttl::TTL::read("3m").unwrap();
    let ttl_secs = ttl.to_seconds();
    let (service, tmp) = make_local_service_with_volume("ttl", Some(ttl));

    // Wall-clock lower bound captured just before the RPC runs.
    let started_at = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs();

    let request = volume_server_pb::VolumeEcShardsGenerateRequest {
        volume_id: 1,
        collection: "ttl".to_string(),
    };
    service
        .volume_ec_shards_generate(Request::new(request))
        .await
        .unwrap();

    let vif_json = std::fs::read_to_string(tmp.path().join("ttl_1.vif")).unwrap();
    let vif: crate::storage::volume::VifVolumeInfo =
        serde_json::from_str(&vif_json).unwrap();

    // The server stamps the expiry with its own "now"; allow a small
    // window (5s) for clock movement between our capture and the stamp.
    assert!(vif.expire_at_sec >= started_at + ttl_secs);
    assert!(vif.expire_at_sec <= started_at + ttl_secs + 5);
}
}
Loading…
Cancel
Save