From 52926ac02660a481c926ca5076795c6e998322f0 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 7 Mar 2026 05:47:49 -0800 Subject: [PATCH] wire data_center/rack into VolumeServerStatus, add disk usage stats, dat file timestamps, and improve FetchAndWriteNeedle error parity - VolumeServerStatus now returns real data_center and rack from config - DiskStatus uses sysinfo crate for actual disk total/free/used/percent - ReadVolumeFileStatus returns dat file modification timestamps - FetchAndWriteNeedle produces Go-matching error messages for unknown remote storage types --- seaweed-volume/src/main.rs | 2 + seaweed-volume/src/server/grpc_server.rs | 79 ++++++++++++++++------ seaweed-volume/src/server/volume_server.rs | 4 ++ seaweed-volume/src/storage/volume.rs | 10 +++ 4 files changed, 73 insertions(+), 22 deletions(-) diff --git a/seaweed-volume/src/main.rs b/seaweed-volume/src/main.rs index b46c900ec..f4426d2e5 100644 --- a/seaweed-volume/src/main.rs +++ b/seaweed-volume/src/main.rs @@ -88,6 +88,8 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box 0 { (free as f32 / all as f32) * 100.0 } else { 0.0 }; + let percent_used = if all > 0 { (used as f32 / all as f32) * 100.0 } else { 0.0 }; disk_statuses.push(volume_server_pb::DiskStatus { dir: loc.directory.clone(), - all: 0, - used: 0, + all, + used, free, - percent_free: 0.0, - percent_used: 0.0, + percent_free, + percent_used, disk_type: loc.disk_type.to_string(), }); } @@ -1766,8 +1768,8 @@ impl VolumeServer for VolumeGrpcService { stack: 0, }), version: env!("CARGO_PKG_VERSION").to_string(), - data_center: String::new(), - rack: String::new(), + data_center: self.state.data_center.clone(), + rack: self.state.rack.clone(), state: Some(volume_server_pb::VolumeServerState { maintenance: self.state.maintenance.load(Ordering::Relaxed), version: self.state.state_version.load(Ordering::Relaxed), @@ -1792,19 +1794,31 @@ impl VolumeServer for VolumeGrpcService { let vid = VolumeId(req.volume_id); // Check volume exists - let store = self.state.store.read().unwrap(); - store.find_volume(vid) - .ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?; - drop(store); - - // Remote storage is not supported — fail with appropriate error - if req.remote_conf.is_some() { - return Err(Status::internal(format!( - "get remote client: remote storage type not supported" - ))); + { + let store = self.state.store.read().unwrap(); + store.find_volume(vid) + .ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?; } - Err(Status::unimplemented("fetch_and_write_needle: remote storage not configured")) + // Get remote storage configuration + let remote_conf = req.remote_conf.as_ref().ok_or_else(|| { + Status::invalid_argument("remote storage configuration is required") + })?; + + // No remote storage backends are compiled in — fail with the same error + // chain as Go: GetRemoteStorage → makeRemoteStorageClient → type not found + let remote_type = &remote_conf.r#type; + return Err(Status::internal(format!( + "get remote client: make remote storage client {}: remote storage type {} not found", + remote_conf.name, remote_type, + ))); + + // If remote storage were available, the flow would be: + // 1. client.read_file(remote_location, offset, size) → data + // 2. Build needle: id, cookie, data, checksum, last_modified + // 3. store.write_volume_needle(vid, &mut needle) + // 4. For each replica: HTTP POST data to replica URL with ?type=replicate + // 5. Return FetchAndWriteNeedleResponse { e_tag: needle.etag() } } async fn scrub_volume( @@ -2326,3 +2340,24 @@ fn find_last_append_at_ns(idx_path: &str, dat_path: &str) -> Option { let ts = u64::from_be_bytes([tail[4], tail[5], tail[6], tail[7], tail[8], tail[9], tail[10], tail[11]]); if ts > 0 { Some(ts) } else { None } } + +/// Get disk usage (total, free) in bytes for the given path. +fn get_disk_usage(path: &str) -> (u64, u64) { + use sysinfo::Disks; + let disks = Disks::new_with_refreshed_list(); + let path = std::path::Path::new(path); + // Find the disk that contains this path (longest mount point prefix match) + let mut best: Option<&sysinfo::Disk> = None; + let mut best_len = 0; + for disk in disks.list() { + let mount = disk.mount_point(); + if path.starts_with(mount) && mount.as_os_str().len() > best_len { + best_len = mount.as_os_str().len(); + best = Some(disk); + } + } + match best { + Some(disk) => (disk.total_space(), disk.available_space()), + None => (0, 0), + } +} diff --git a/seaweed-volume/src/server/volume_server.rs b/seaweed-volume/src/server/volume_server.rs index 1bf32e206..d9173da15 100644 --- a/seaweed-volume/src/server/volume_server.rs +++ b/seaweed-volume/src/server/volume_server.rs @@ -46,6 +46,10 @@ pub struct VolumeServerState { /// Notify waiters when inflight bytes decrease. pub upload_notify: tokio::sync::Notify, pub download_notify: tokio::sync::Notify, + /// Data center name from config. + pub data_center: String, + /// Rack name from config. + pub rack: String, } impl VolumeServerState { diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 2c39f399a..dd1d18d67 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -811,6 +811,16 @@ impl Volume { } } + /// Get the modification time of the .dat file as Unix seconds. + pub fn dat_file_mod_time(&self) -> u64 { + self.dat_file.as_ref() + .and_then(|f| f.metadata().ok()) + .and_then(|m| m.modified().ok()) + .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) + .map(|d| d.as_secs()) + .unwrap_or(0) + } + pub fn idx_file_size(&self) -> u64 { self.nm.as_ref().map_or(0, |nm| nm.index_file_size()) }