Browse Source

Match Go memory status payloads

rust-volume-server
Chris Lu 4 days ago
parent
commit
1257d68d0f
  1. 53
      seaweed-volume/src/server/grpc_server.rs
  2. 31
      seaweed-volume/src/server/handlers.rs
  3. 102
      seaweed-volume/src/server/memory_status.rs
  4. 1
      seaweed-volume/src/server/mod.rs

53
seaweed-volume/src/server/grpc_server.rs

@ -2680,7 +2680,7 @@ impl VolumeServer for VolumeGrpcService {
Ok(Response::new(
volume_server_pb::VolumeServerStatusResponse {
disk_statuses,
memory_status: Some(get_mem_status()),
memory_status: Some(super::memory_status::collect_mem_status()),
version: crate::version::full_version().to_string(),
data_center: self.state.data_center.clone(),
rack: self.state.rack.clone(),
@ -3602,57 +3602,6 @@ fn get_disk_usage(path: &str) -> (u64, u64) {
}
}
/// Build memory status info similar to Go's stats.MemStat().
fn get_mem_status() -> volume_server_pb::MemStatus {
#[allow(unused_mut)]
let mut mem = volume_server_pb::MemStatus {
goroutines: 1,
..Default::default()
};
#[cfg(target_os = "linux")]
{
if let Some((all, free)) = get_system_memory_linux() {
mem.all = all;
mem.free = free;
mem.used = all.saturating_sub(free);
}
if let Some(rss) = get_process_rss_linux() {
mem.self_ = rss;
mem.heap = rss;
}
}
mem
}
#[cfg(target_os = "linux")]
fn get_system_memory_linux() -> Option<(u64, u64)> {
unsafe {
let mut info: libc::sysinfo = std::mem::zeroed();
if libc::sysinfo(&mut info) == 0 {
let unit = info.mem_unit as u64;
let total = info.totalram as u64 * unit;
let free = info.freeram as u64 * unit;
return Some((total, free));
}
}
None
}
#[cfg(target_os = "linux")]
fn get_process_rss_linux() -> Option<u64> {
let statm = std::fs::read_to_string("/proc/self/statm").ok()?;
let mut parts = statm.split_whitespace();
let _size = parts.next()?;
let resident = parts.next()?.parse::<u64>().ok()?;
let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) } as i64;
if page_size <= 0 {
return None;
}
Some(resident * page_size as u64)
}
#[cfg(test)]
mod tests {
use super::*;

31
seaweed-volume/src/server/handlers.rs

@ -2430,16 +2430,17 @@ pub async fn stats_counter_handler() -> Response {
}
pub async fn stats_memory_handler() -> Response {
// Basic memory stats - Rust doesn't have GC stats like Go
let mem = super::memory_status::collect_mem_status();
let info = serde_json::json!({
"Version": crate::version::full_version(),
"Memory": {
"Mallocs": 0,
"Frees": 0,
"HeapSys": 0,
"HeapAlloc": 0,
"HeapIdle": 0,
"HeapReleased": 0,
"goroutines": mem.goroutines,
"all": mem.all,
"used": mem.used,
"free": mem.free,
"self": mem.self_,
"heap": mem.heap,
"stack": mem.stack,
},
});
(
@ -2894,6 +2895,22 @@ mod tests {
assert_eq!(resp.status(), StatusCode::RANGE_NOT_SATISFIABLE);
}
#[tokio::test]
async fn test_stats_memory_handler_matches_go_memstatus_shape() {
let response = stats_memory_handler().await;
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let payload: serde_json::Value = serde_json::from_slice(&body).unwrap();
let memory = payload.get("Memory").unwrap();
for key in ["goroutines", "all", "used", "free", "self", "heap", "stack"] {
assert!(memory.get(key).is_some(), "missing key {}", key);
}
}
#[test]
fn test_is_compressible_file_type() {
// Text types

102
seaweed-volume/src/server/memory_status.rs

@ -0,0 +1,102 @@
use crate::pb::volume_server_pb;
pub fn collect_mem_status() -> volume_server_pb::MemStatus {
#[allow(unused_mut)]
let mut mem = volume_server_pb::MemStatus {
goroutines: 1,
..Default::default()
};
#[cfg(target_os = "linux")]
{
if let Some((all, free)) = get_system_memory_linux() {
mem.all = all;
mem.free = free;
mem.used = all.saturating_sub(free);
}
if let Some(status) = read_process_status_linux() {
if status.threads > 0 {
mem.goroutines = status.threads as i32;
}
if let Some(rss) = status.rss {
mem.self_ = rss;
}
if let Some(heap) = status.data.or(status.rss) {
mem.heap = heap;
}
if let Some(stack) = status.stack {
mem.stack = stack;
}
}
}
mem
}
#[cfg(target_os = "linux")]
fn get_system_memory_linux() -> Option<(u64, u64)> {
unsafe {
let mut info: libc::sysinfo = std::mem::zeroed();
if libc::sysinfo(&mut info) == 0 {
let unit = info.mem_unit as u64;
let total = info.totalram as u64 * unit;
let free = info.freeram as u64 * unit;
return Some((total, free));
}
}
None
}
#[cfg(target_os = "linux")]
#[derive(Default)]
struct ProcessStatus {
threads: u64,
rss: Option<u64>,
data: Option<u64>,
stack: Option<u64>,
}
#[cfg(target_os = "linux")]
fn read_process_status_linux() -> Option<ProcessStatus> {
let status = std::fs::read_to_string("/proc/self/status").ok()?;
let mut out = ProcessStatus::default();
for line in status.lines() {
if let Some(value) = line.strip_prefix("Threads:") {
out.threads = value.trim().parse().ok()?;
continue;
}
if let Some(value) = parse_proc_status_kib_field(line, "VmRSS:") {
out.rss = Some(value);
continue;
}
if let Some(value) = parse_proc_status_kib_field(line, "VmData:") {
out.data = Some(value);
continue;
}
if let Some(value) = parse_proc_status_kib_field(line, "VmStk:") {
out.stack = Some(value);
}
}
Some(out)
}
#[cfg(target_os = "linux")]
fn parse_proc_status_kib_field(line: &str, prefix: &str) -> Option<u64> {
let raw = line.strip_prefix(prefix)?.trim();
let value = raw.strip_suffix(" kB").unwrap_or(raw).trim();
value.parse::<u64>().ok().map(|kib| kib * 1024)
}
#[cfg(test)]
mod tests {
use super::collect_mem_status;
#[test]
fn test_collect_mem_status_reports_live_process_state() {
let mem = collect_mem_status();
assert!(mem.goroutines > 0);
}
}

1
seaweed-volume/src/server/mod.rs

@ -3,6 +3,7 @@ pub mod grpc_client;
pub mod grpc_server;
pub mod handlers;
pub mod heartbeat;
pub mod memory_status;
pub mod profiling;
pub mod volume_server;
pub mod write_queue;
Loading…
Cancel
Save