|
|
|
@ -24,6 +24,10 @@ use super::volume_server::VolumeServerState; |
|
|
|
|
|
|
|
type BoxStream<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send + 'static>>;
|
|
|
|
|
|
|
|
fn volume_is_remote_only(dat_path: &str, has_remote_file: bool) -> bool {
|
|
|
|
has_remote_file && !std::path::Path::new(dat_path).exists()
|
|
|
|
}
|
|
|
|
|
|
|
|
struct WriteThrottler {
|
|
|
|
bytes_per_second: i64,
|
|
|
|
last_size_counter: i64,
|
|
|
|
@ -2401,8 +2405,11 @@ impl VolumeServer for VolumeGrpcService { |
|
|
|
)));
|
|
|
|
}
|
|
|
|
|
|
|
|
// Check if already on remote
|
|
|
|
if vol.has_remote_file {
|
|
|
|
let dat_path = vol.dat_path();
|
|
|
|
|
|
|
|
// Match Go's DiskFile check: if the .dat file is still local, we can
|
|
|
|
// keep tiering it even when remote file entries already exist.
|
|
|
|
if volume_is_remote_only(&dat_path, vol.has_remote_file) {
|
|
|
|
// Already on remote -- return empty stream (matches Go: returns nil)
|
|
|
|
let stream = tokio_stream::empty();
|
|
|
|
return Ok(Response::new(
|
|
|
|
@ -2424,7 +2431,7 @@ impl VolumeServer for VolumeGrpcService { |
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
vol.dat_path()
|
|
|
|
dat_path
|
|
|
|
};
|
|
|
|
|
|
|
|
// Look up the S3 tier backend
|
|
|
|
@ -3668,4 +3675,19 @@ mod tests { |
|
|
|
fn test_parse_grpc_address_invalid() {
|
|
|
|
assert!(parse_grpc_address("no-colon").is_err());
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn test_volume_is_remote_only_requires_missing_local_dat_file() {
|
|
|
|
let temp_dir = tempfile::tempdir().unwrap();
|
|
|
|
let dat_path = temp_dir.path().join("1.dat");
|
|
|
|
std::fs::write(&dat_path, b"dat").unwrap();
|
|
|
|
|
|
|
|
assert!(!volume_is_remote_only(dat_path.to_str().unwrap(), true));
|
|
|
|
assert!(!volume_is_remote_only(dat_path.to_str().unwrap(), false));
|
|
|
|
|
|
|
|
std::fs::remove_file(&dat_path).unwrap();
|
|
|
|
|
|
|
|
assert!(volume_is_remote_only(dat_path.to_str().unwrap(), true));
|
|
|
|
assert!(!volume_is_remote_only(dat_path.to_str().unwrap(), false));
|
|
|
|
}
|
|
|
|
}
|