Browse Source

wire data_center/rack into VolumeServerStatus, add disk usage stats, dat file timestamps, and improve FetchAndWriteNeedle error parity

- VolumeServerStatus now returns real data_center and rack from config
- DiskStatus uses sysinfo crate for actual disk total/free/used/percent
- ReadVolumeFileStatus returns the dat file's modification timestamp (currently used for both the idx and dat timestamp fields)
- FetchAndWriteNeedle produces Go-matching error messages for unknown remote storage types
rust-volume-server
Chris Lu 3 days ago
parent
commit
52926ac026
  1. 2
      seaweed-volume/src/main.rs
  2. 79
      seaweed-volume/src/server/grpc_server.rs
  3. 4
      seaweed-volume/src/server/volume_server.rs
  4. 10
      seaweed-volume/src/storage/volume.rs

2
seaweed-volume/src/main.rs

@ -88,6 +88,8 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
inflight_download_bytes: std::sync::atomic::AtomicI64::new(0),
upload_notify: tokio::sync::Notify::new(),
download_notify: tokio::sync::Notify::new(),
data_center: config.data_center.clone(),
rack: config.rack.clone(),
});
// Build HTTP routers

79
seaweed-volume/src/server/grpc_server.rs

@ -704,11 +704,12 @@ impl VolumeServer for VolumeGrpcService {
let (_, vol) = store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
let mod_time = vol.dat_file_mod_time();
Ok(Response::new(volume_server_pb::ReadVolumeFileStatusResponse {
volume_id: vid.0,
idx_file_timestamp_seconds: 0,
idx_file_timestamp_seconds: mod_time,
idx_file_size: vol.idx_file_size(),
dat_file_timestamp_seconds: 0,
dat_file_timestamp_seconds: mod_time,
dat_file_size: vol.dat_file_size().unwrap_or(0),
file_count: vol.file_count() as u64,
compaction_revision: vol.super_block.compaction_revision as u32,
@ -1740,16 +1741,17 @@ impl VolumeServer for VolumeGrpcService {
let mut disk_statuses = Vec::new();
for loc in &store.locations {
let free = loc.available_space.load(std::sync::atomic::Ordering::Relaxed);
// TODO: DiskLocation does not yet track total disk size.
// Once implemented, compute all/used/percent from real values.
let (all, free) = get_disk_usage(&loc.directory);
let used = all.saturating_sub(free);
let percent_free = if all > 0 { (free as f32 / all as f32) * 100.0 } else { 0.0 };
let percent_used = if all > 0 { (used as f32 / all as f32) * 100.0 } else { 0.0 };
disk_statuses.push(volume_server_pb::DiskStatus {
dir: loc.directory.clone(),
all: 0,
used: 0,
all,
used,
free,
percent_free: 0.0,
percent_used: 0.0,
percent_free,
percent_used,
disk_type: loc.disk_type.to_string(),
});
}
@ -1766,8 +1768,8 @@ impl VolumeServer for VolumeGrpcService {
stack: 0,
}),
version: env!("CARGO_PKG_VERSION").to_string(),
data_center: String::new(),
rack: String::new(),
data_center: self.state.data_center.clone(),
rack: self.state.rack.clone(),
state: Some(volume_server_pb::VolumeServerState {
maintenance: self.state.maintenance.load(Ordering::Relaxed),
version: self.state.state_version.load(Ordering::Relaxed),
@ -1792,19 +1794,31 @@ impl VolumeServer for VolumeGrpcService {
let vid = VolumeId(req.volume_id);
// Check volume exists
let store = self.state.store.read().unwrap();
store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
drop(store);
// Remote storage is not supported — fail with appropriate error
if req.remote_conf.is_some() {
return Err(Status::internal(format!(
"get remote client: remote storage type not supported"
)));
{
let store = self.state.store.read().unwrap();
store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
}
Err(Status::unimplemented("fetch_and_write_needle: remote storage not configured"))
// Get remote storage configuration
let remote_conf = req.remote_conf.as_ref().ok_or_else(|| {
Status::invalid_argument("remote storage configuration is required")
})?;
// No remote storage backends are compiled in — fail with the same error
// chain as Go: GetRemoteStorage → makeRemoteStorageClient → type not found
let remote_type = &remote_conf.r#type;
return Err(Status::internal(format!(
"get remote client: make remote storage client {}: remote storage type {} not found",
remote_conf.name, remote_type,
)));
// If remote storage were available, the flow would be:
// 1. client.read_file(remote_location, offset, size) → data
// 2. Build needle: id, cookie, data, checksum, last_modified
// 3. store.write_volume_needle(vid, &mut needle)
// 4. For each replica: HTTP POST data to replica URL with ?type=replicate
// 5. Return FetchAndWriteNeedleResponse { e_tag: needle.etag() }
}
async fn scrub_volume(
@ -2326,3 +2340,24 @@ fn find_last_append_at_ns(idx_path: &str, dat_path: &str) -> Option<u64> {
let ts = u64::from_be_bytes([tail[4], tail[5], tail[6], tail[7], tail[8], tail[9], tail[10], tail[11]]);
if ts > 0 { Some(ts) } else { None }
}
/// Get disk usage (total, free) in bytes for the filesystem containing `path`.
///
/// Returns `(0, 0)` when no mount point covers the path — e.g. when the path
/// cannot be resolved and matches no known disk.
fn get_disk_usage(path: &str) -> (u64, u64) {
    use sysinfo::Disks;
    let disks = Disks::new_with_refreshed_list();
    // Canonicalize first so relative directories (e.g. "data") and symlinked
    // paths still match an absolute mount point; fall back to the raw path if
    // resolution fails (the directory may not exist yet).
    let raw = std::path::Path::new(path);
    let resolved = std::fs::canonicalize(raw).unwrap_or_else(|_| raw.to_path_buf());
    // Pick the disk whose mount point is the longest component-wise prefix of
    // the path. `Path::starts_with` matches whole components, so a "/data"
    // mount does not falsely claim "/datafoo".
    let mut best: Option<&sysinfo::Disk> = None;
    let mut best_len = 0;
    for disk in disks.list() {
        let mount = disk.mount_point();
        if resolved.starts_with(mount) && mount.as_os_str().len() > best_len {
            best_len = mount.as_os_str().len();
            best = Some(disk);
        }
    }
    best.map_or((0, 0), |disk| (disk.total_space(), disk.available_space()))
}

4
seaweed-volume/src/server/volume_server.rs

@ -46,6 +46,10 @@ pub struct VolumeServerState {
/// Notify waiters when inflight bytes decrease.
pub upload_notify: tokio::sync::Notify,
pub download_notify: tokio::sync::Notify,
/// Data center name from config.
pub data_center: String,
/// Rack name from config.
pub rack: String,
}
impl VolumeServerState {

10
seaweed-volume/src/storage/volume.rs

@ -811,6 +811,16 @@ impl Volume {
}
}
/// Modification time of the backing .dat file, as seconds since the Unix epoch.
/// Returns 0 when there is no open dat file, or when its metadata or
/// modification timestamp cannot be read (e.g. a pre-epoch mtime).
pub fn dat_file_mod_time(&self) -> u64 {
    let Some(file) = self.dat_file.as_ref() else { return 0 };
    let Ok(meta) = file.metadata() else { return 0 };
    let Ok(modified) = meta.modified() else { return 0 };
    match modified.duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
/// Size in bytes of the volume's index (.idx) file; 0 when no needle map is loaded.
pub fn idx_file_size(&self) -> u64 {
    match self.nm.as_ref() {
        Some(needle_map) => needle_map.index_file_size(),
        None => 0,
    }
}

Loading…
Cancel
Save