feat: add real-time metrics collection with ring buffer storage (MON-01)

Implements monitoring/collector.rs that collects per-container CPU/RAM/network/disk,
system-wide metrics, RPC latency, and WebSocket connection count every 60 seconds.
Data stored in dual ring buffers: 1-min resolution (24h) and 15-min resolution (7d).
Three new RPC endpoints: monitoring.current, monitoring.history, monitoring.containers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Dorian
2026-03-11 11:11:02 +00:00
parent 47c783ceac
commit baeeb72f27
8 changed files with 1285 additions and 40 deletions

View File

@@ -0,0 +1,384 @@
use super::{ContainerMetrics, MetricSnapshot, SystemMetrics};
use anyhow::{Context, Result};
/// Gather one complete [`MetricSnapshot`] of the host and its containers.
///
/// The five system probes (CPU, memory, disk, network, load average) run
/// concurrently; container stats are fetched afterwards. Every probe is
/// best-effort: a failing probe contributes zeroes (or an empty container
/// list) rather than failing the whole snapshot, so this function currently
/// always returns `Ok`.
///
/// `rpc_latency_ms` and `ws_connections` are left at zero here; they are
/// filled in by `MetricsStore::push`.
pub async fn collect_snapshot() -> Result<MetricSnapshot> {
    // Stamp the snapshot before probing so the CPU sampling delay is not included.
    let timestamp = chrono::Utc::now().timestamp();

    let (cpu_res, mem_res, disk_res, net_res, load_res) = tokio::join!(
        read_cpu_usage(),
        read_meminfo(),
        read_disk_usage(),
        read_network_totals(),
        read_loadavg(),
    );

    let cpu_percent = cpu_res.unwrap_or(0.0);
    let (mem_used_bytes, mem_total_bytes) = mem_res.unwrap_or((0, 0));
    let (disk_used_bytes, disk_total_bytes) = disk_res.unwrap_or((0, 0));
    let (net_rx_bytes, net_tx_bytes) = net_res.unwrap_or((0, 0));
    let (load_avg_1, load_avg_5, load_avg_15) = load_res.unwrap_or((0.0, 0.0, 0.0));

    let system = SystemMetrics {
        cpu_percent,
        mem_used_bytes,
        mem_total_bytes,
        disk_used_bytes,
        disk_total_bytes,
        net_rx_bytes,
        net_tx_bytes,
        load_avg_1,
        load_avg_5,
        load_avg_15,
    };

    let containers = read_container_stats().await.unwrap_or_default();

    Ok(MetricSnapshot {
        timestamp,
        system,
        containers,
        rpc_latency_ms: 0.0, // filled in by MetricsStore::push
        ws_connections: 0,   // filled in by MetricsStore::push
    })
}
/// Estimate system-wide CPU utilisation as a percentage.
///
/// Takes two `/proc/stat` samples 250 ms apart and reports the non-idle share
/// of the jiffies that elapsed between them, rounded to one decimal place.
/// Returns `0.0` when no jiffies elapsed between the samples.
async fn read_cpu_usage() -> Result<f64> {
    const SAMPLE_GAP: std::time::Duration = std::time::Duration::from_millis(250);

    let (total_before, idle_before) = read_cpu_jiffies().await?;
    tokio::time::sleep(SAMPLE_GAP).await;
    let (total_after, idle_after) = read_cpu_jiffies().await?;

    let elapsed = total_after.saturating_sub(total_before);
    let idle = idle_after.saturating_sub(idle_before);
    if elapsed == 0 {
        // Nothing to divide by; report an idle machine.
        return Ok(0.0);
    }

    let percent = 100.0 * (1.0 - (idle as f64 / elapsed as f64));
    // Keep a single decimal place.
    Ok((percent * 10.0).round() / 10.0)
}
/// Returns `(total_jiffies, idle_jiffies)` from the aggregate `cpu` line of
/// `/proc/stat` (the first line of the file).
///
/// Only the first eight counters (user, nice, system, idle, iowait, irq,
/// softirq, steal) are summed into the total. The trailing `guest` and
/// `guest_nice` counters are already accounted for inside `user` and `nice`,
/// so adding them again would inflate the total on hosts that run VMs.
///
/// # Errors
///
/// Fails if the file cannot be read, is empty, contains a non-numeric counter
/// among the fields read, or has fewer than four counters.
async fn read_cpu_jiffies() -> Result<(u64, u64)> {
    /// Number of leading counters that make up the CPU time total.
    const ACCOUNTED_FIELDS: usize = 8;

    let content = tokio::fs::read_to_string("/proc/stat")
        .await
        .context("Failed to read /proc/stat")?;
    let cpu_line = content
        .lines()
        .next()
        .ok_or_else(|| anyhow::anyhow!("Empty /proc/stat"))?;

    // Parse strictly: silently dropping an unparsable token would shift every
    // later counter by one position and mislabel the idle column.
    let vals = cpu_line
        .split_whitespace()
        .skip(1) // the leading "cpu" label
        .take(ACCOUNTED_FIELDS)
        .map(|v| v.parse::<u64>())
        .collect::<std::result::Result<Vec<u64>, _>>()
        .context("Non-numeric field in /proc/stat cpu line")?;
    if vals.len() < 4 {
        anyhow::bail!("Not enough fields in /proc/stat cpu line");
    }

    let idle = vals[3];
    let total: u64 = vals.iter().sum();
    Ok((total, idle))
}
/// Report `(used_bytes, total_bytes)` of system memory from `/proc/meminfo`.
///
/// "Used" is `MemTotal - MemAvailable` (saturating at zero). A field that is
/// absent from the file is treated as 0 kB; a field that is present but not
/// parsable is an error.
async fn read_meminfo() -> Result<(u64, u64)> {
    let raw = tokio::fs::read_to_string("/proc/meminfo")
        .await
        .context("Failed to read /proc/meminfo")?;

    let (mut mem_total_kb, mut mem_available_kb) = (0u64, 0u64);
    for entry in raw.lines() {
        match (
            entry.strip_prefix("MemTotal:"),
            entry.strip_prefix("MemAvailable:"),
        ) {
            (Some(rest), _) => mem_total_kb = parse_kb(rest)?,
            (None, Some(rest)) => mem_available_kb = parse_kb(rest)?,
            (None, None) => {}
        }
    }

    // /proc/meminfo reports kB; convert to bytes.
    let used_bytes = mem_total_kb.saturating_sub(mem_available_kb) * 1024;
    let total_bytes = mem_total_kb * 1024;
    Ok((used_bytes, total_bytes))
}
/// Parse the value part of a `/proc/meminfo` line (e.g. `"   16318420 kB"`)
/// into a number of kilobytes.
///
/// Surrounding whitespace and a trailing `kB` unit are stripped before the
/// remaining text is parsed as an unsigned integer.
fn parse_kb(val: &str) -> Result<u64> {
    let digits = val.trim().trim_end_matches("kB").trim();
    digits.parse::<u64>().context("parse meminfo kB value")
}
/// Report `(used_bytes, total_bytes)` for the root filesystem.
///
/// Runs `df --block-size=1 --output=used,size /` and parses the two columns
/// of the first data row (the line after the header).
///
/// # Errors
///
/// Fails if `df` cannot be started, exits unsuccessfully, prints non-UTF-8
/// output, or prints a data row that is missing or not numeric.
async fn read_disk_usage() -> Result<(u64, u64)> {
    let df = tokio::process::Command::new("df")
        .args(["--block-size=1", "--output=used,size", "/"])
        .output()
        .await
        .context("Failed to run df")?;
    if !df.status.success() {
        anyhow::bail!("df failed: {}", String::from_utf8_lossy(&df.stderr));
    }

    let text = String::from_utf8(df.stdout).context("df output not utf8")?;
    // Line 0 is the column header; line 1 carries the numbers.
    let mut columns = match text.lines().nth(1) {
        Some(row) => row.split_whitespace(),
        None => anyhow::bail!("No data line from df"),
    };

    let used = match columns.next() {
        Some(token) => token.parse::<u64>().context("parse df used")?,
        None => anyhow::bail!("Missing used"),
    };
    let total = match columns.next() {
        Some(token) => token.parse::<u64>().context("parse df total")?,
        None => anyhow::bail!("Missing total"),
    };

    Ok((used, total))
}
/// Read load averages from /proc/loadavg.
async fn read_loadavg() -> Result<(f64, f64, f64)> {
let content = tokio::fs::read_to_string("/proc/loadavg")
.await
.context("Failed to read /proc/loadavg")?;
let mut parts = content.split_whitespace();
let l1: f64 = parts
.next()
.ok_or_else(|| anyhow::anyhow!("Missing load1"))?
.parse()
.context("parse load1")?;
let l5: f64 = parts
.next()
.ok_or_else(|| anyhow::anyhow!("Missing load5"))?
.parse()
.context("parse load5")?;
let l15: f64 = parts
.next()
.ok_or_else(|| anyhow::anyhow!("Missing load15"))?
.parse()
.context("parse load15")?;
Ok((l1, l5, l15))
}
/// Sum the received/transmitted byte counters of every interface listed in
/// `/proc/net/dev`, except the loopback device `lo`.
///
/// Returns `(rx_bytes, tx_bytes)`. Rows that lack a `name:` prefix or have
/// fewer than ten counters are skipped, as are individual counters that do
/// not parse as integers.
async fn read_network_totals() -> Result<(u64, u64)> {
    let raw = tokio::fs::read_to_string("/proc/net/dev")
        .await
        .context("Failed to read /proc/net/dev")?;

    let mut received: u64 = 0;
    let mut transmitted: u64 = 0;

    // The first two lines are column headers.
    let rows = raw
        .lines()
        .skip(2)
        .map(str::trim)
        .filter(|row| !row.is_empty());

    for row in rows {
        // "<iface>: <counters...>"
        let (name, counters) = match row.split_once(':') {
            Some((name, counters)) => (name.trim(), counters),
            None => continue,
        };
        if name == "lo" {
            continue;
        }

        // Layout: 8 receive counters (bytes first) followed by the transmit
        // counters (bytes first again, i.e. index 8).
        let fields: Vec<&str> = counters.split_whitespace().collect();
        if fields.len() < 10 {
            continue;
        }
        if let Ok(rx) = fields[0].parse::<u64>() {
            received += rx;
        }
        if let Ok(tx) = fields[8].parse::<u64>() {
            transmitted += tx;
        }
    }

    Ok((received, transmitted))
}
/// Get per-container resource stats via `podman stats --no-stream --format json`.
///
/// Each JSON entry is mapped to a [`ContainerMetrics`]; entries without a
/// string `name`/`Name` are dropped. Every metric is looked up under a
/// snake_case key first and a CamelCase key second.
///
/// The podman CLI reports some metrics only as combined strings
/// (`mem_usage` = "used / limit", `net_io` = "rx / tx",
/// `block_io` = "read / write"). When the dedicated key for a metric is
/// absent, the matching half of the combined string is used, so memory limit,
/// network and block I/O are no longer reported as 0 for that output shape.
/// Metrics that cannot be found or parsed at all still default to 0.
///
/// # Errors
///
/// Fails if the command cannot be started, exits unsuccessfully, or prints
/// output that is not a JSON array. Blank output yields an empty list.
async fn read_container_stats() -> Result<Vec<ContainerMetrics>> {
    let output = tokio::process::Command::new("sudo")
        .args(["podman", "stats", "--no-stream", "--format", "json"])
        .output()
        .await
        .context("Failed to run podman stats")?;
    if !output.status.success() {
        anyhow::bail!(
            "podman stats failed: {}",
            String::from_utf8_lossy(&output.stderr)
        );
    }

    let stdout = String::from_utf8_lossy(&output.stdout);
    if stdout.trim().is_empty() {
        // Nothing printed: treat as "no containers" rather than a parse error.
        return Ok(Vec::new());
    }

    // Surface malformed output instead of silently reporting zero containers.
    let entries: Vec<serde_json::Value> =
        serde_json::from_str(&stdout).context("Failed to parse podman stats JSON")?;

    Ok(entries.iter().filter_map(container_metrics_from_json).collect())
}

/// Convert one `podman stats` JSON entry into [`ContainerMetrics`]; `None` if it has no name.
fn container_metrics_from_json(e: &serde_json::Value) -> Option<ContainerMetrics> {
    let name = e
        .get("name")
        .or_else(|| e.get("Name"))
        .and_then(|v| v.as_str())?
        .to_string();

    // Combined "a / b" fields, used only as a fallback for missing dedicated keys.
    let mem_pair = parse_bytes_pair(e, "mem_usage").or_else(|| parse_bytes_pair(e, "MemUsage"));
    let net_pair = parse_bytes_pair(e, "net_io").or_else(|| parse_bytes_pair(e, "NetIO"));
    let block_pair = parse_bytes_pair(e, "block_io").or_else(|| parse_bytes_pair(e, "BlockIO"));

    Some(ContainerMetrics {
        name,
        cpu_percent: parse_percent_field(e, "cpu_percent")
            .or_else(|| parse_percent_field(e, "CPUPerc"))
            .unwrap_or(0.0),
        // `parse_bytes_field` already takes the first half of "used / limit".
        mem_used_bytes: parse_bytes_field(e, "mem_usage")
            .or_else(|| parse_bytes_field(e, "MemUsage"))
            .unwrap_or(0),
        mem_limit_bytes: parse_bytes_field(e, "mem_limit")
            .or_else(|| parse_bytes_field(e, "MemLimit"))
            .or_else(|| mem_pair.map(|(_, limit)| limit))
            .unwrap_or(0),
        net_rx_bytes: parse_bytes_field(e, "net_input")
            .or_else(|| parse_bytes_field(e, "NetInput"))
            .or_else(|| net_pair.map(|(rx, _)| rx))
            .unwrap_or(0),
        net_tx_bytes: parse_bytes_field(e, "net_output")
            .or_else(|| parse_bytes_field(e, "NetOutput"))
            .or_else(|| net_pair.map(|(_, tx)| tx))
            .unwrap_or(0),
        block_read_bytes: parse_bytes_field(e, "block_input")
            .or_else(|| parse_bytes_field(e, "BlockInput"))
            .or_else(|| block_pair.map(|(read, _)| read))
            .unwrap_or(0),
        block_write_bytes: parse_bytes_field(e, "block_output")
            .or_else(|| parse_bytes_field(e, "BlockOutput"))
            .or_else(|| block_pair.map(|(_, written)| written))
            .unwrap_or(0),
    })
}

/// Parse a combined string field of the form `"<bytes> / <bytes>"` into both halves.
///
/// Returns `None` when the key is missing, is not a string, has no `/`
/// separator, or either half is not a recognisable byte quantity.
fn parse_bytes_pair(obj: &serde_json::Value, key: &str) -> Option<(u64, u64)> {
    let (first, second) = obj.get(key)?.as_str()?.split_once('/')?;
    Some((parse_human_bytes(first)?, parse_human_bytes(second)?))
}
/// Read `key` from a JSON object as a percentage.
///
/// Accepts either a JSON number (returned as-is) or a string such as
/// `"12.5%"`, from which trailing `%` signs and surrounding whitespace are
/// stripped. Returns `None` when the key is missing, has another JSON type,
/// or the string is not a valid number.
fn parse_percent_field(obj: &serde_json::Value, key: &str) -> Option<f64> {
    let field = obj.get(key)?;
    field.as_f64().or_else(|| {
        let text = field.as_str()?;
        text.trim_end_matches('%').trim().parse::<f64>().ok()
    })
}
/// Read `key` from a JSON object as a byte count.
///
/// Accepts either an unsigned JSON integer (returned as-is) or a
/// human-readable string that `parse_human_bytes` understands. Returns `None`
/// when the key is missing, has another JSON type, or the string cannot be
/// parsed.
fn parse_bytes_field(obj: &serde_json::Value, key: &str) -> Option<u64> {
    let field = obj.get(key)?;
    match field.as_u64() {
        Some(bytes) => Some(bytes),
        None => parse_human_bytes(field.as_str()?),
    }
}
/// Parse human-readable byte strings like "1.5GiB", "256MiB", "100MiB / 16GiB".
///
/// * Binary suffixes (`KiB`, `MiB`, `GiB`, `TiB`, `PiB`) are powers of 1024;
///   decimal suffixes (`kB`/`KB`, `MB`, `GB`, `TB`, `PB`) are powers of 1000;
///   `B` or no suffix means plain bytes.
/// * For the "X / Y" form only the part before the slash is parsed.
/// * An empty string or the placeholder `--` (also as the first half of
///   "-- / --") yields `Some(0)`.
/// * Fractional results are truncated towards zero.
///
/// Returns `None` when the numeric part is not a valid, finite, non-negative
/// number.
fn parse_human_bytes(s: &str) -> Option<u64> {
    // Longer suffixes come first so "GiB" is not mistaken for a bare "B".
    const UNITS: [(&str, u64); 12] = [
        ("PiB", 1 << 50),
        ("TiB", 1 << 40),
        ("GiB", 1 << 30),
        ("MiB", 1 << 20),
        ("KiB", 1 << 10),
        ("PB", 1_000_000_000_000_000),
        ("TB", 1_000_000_000_000),
        ("GB", 1_000_000_000),
        ("MB", 1_000_000),
        ("kB", 1_000),
        ("KB", 1_000),
        ("B", 1),
    ];

    // Handle "X / Y" format — take only the first part.
    let s = s.split('/').next()?.trim();
    if s == "--" || s.is_empty() {
        return Some(0);
    }

    let (num_str, multiplier) = UNITS
        .iter()
        .find_map(|&(suffix, factor)| s.strip_suffix(suffix).map(|rest| (rest, factor)))
        .unwrap_or((s, 1));

    let num: f64 = num_str.trim().parse().ok()?;
    // `str::parse::<f64>` accepts "inf"/"nan" and negatives; none is a byte count.
    if !num.is_finite() || num < 0.0 {
        return None;
    }
    Some((num * multiplier as f64) as u64)
}
#[cfg(test)]
mod tests {
    use super::*;

    // Binary unit sizes, spelled out so the expectations read as arithmetic.
    const KIB: u64 = 1024;
    const MIB: u64 = 1024 * KIB;
    const GIB: u64 = 1024 * MIB;

    /// Whole and fractional GiB values.
    #[test]
    fn test_parse_human_bytes_gib() {
        let whole = parse_human_bytes("1GiB");
        let fractional = parse_human_bytes("1.5GiB");
        assert_eq!(whole, Some(GIB));
        assert_eq!(fractional, Some(GIB + GIB / 2));
    }

    /// MiB suffix.
    #[test]
    fn test_parse_human_bytes_mib() {
        let parsed = parse_human_bytes("256MiB");
        assert_eq!(parsed, Some(256 * MIB));
    }

    /// KiB suffix.
    #[test]
    fn test_parse_human_bytes_kib() {
        let parsed = parse_human_bytes("1024KiB");
        assert_eq!(parsed, Some(1024 * KIB));
    }

    /// Decimal (SI) suffixes and plain bytes.
    #[test]
    fn test_parse_human_bytes_si() {
        let megabytes = parse_human_bytes("1000MB");
        let bytes = parse_human_bytes("100B");
        assert_eq!(megabytes, Some(1_000_000_000));
        assert_eq!(bytes, Some(100));
    }

    /// Placeholder and empty inputs count as zero bytes.
    #[test]
    fn test_parse_human_bytes_empty() {
        let placeholder = parse_human_bytes("--");
        let blank = parse_human_bytes("");
        assert_eq!(placeholder, Some(0));
        assert_eq!(blank, Some(0));
    }

    /// Only the part before the slash is parsed.
    #[test]
    fn test_parse_human_bytes_slash_format() {
        let parsed = parse_human_bytes("100MiB / 16GiB");
        assert_eq!(parsed, Some(100 * MIB));
    }

    /// Percentages given as strings with a trailing `%`.
    #[test]
    fn test_parse_percent_string() {
        let entry = serde_json::json!({ "cpu": "12.5%" });
        let parsed = parse_percent_field(&entry, "cpu");
        assert_eq!(parsed, Some(12.5));
    }

    /// Percentages given as JSON numbers.
    #[test]
    fn test_parse_percent_number() {
        let entry = serde_json::json!({ "cpu": 12.5 });
        let parsed = parse_percent_field(&entry, "cpu");
        assert_eq!(parsed, Some(12.5));
    }

    /// A missing key yields `None`.
    #[test]
    fn test_parse_percent_missing() {
        let entry = serde_json::json!({ "other": 1 });
        let parsed = parse_percent_field(&entry, "cpu");
        assert_eq!(parsed, None);
    }

    /// Byte counts given as JSON integers.
    #[test]
    fn test_parse_bytes_field_number() {
        let entry = serde_json::json!({ "mem": 1048576 });
        let parsed = parse_bytes_field(&entry, "mem");
        assert_eq!(parsed, Some(MIB));
    }

    /// Byte counts given as human-readable strings.
    #[test]
    fn test_parse_bytes_field_string() {
        let entry = serde_json::json!({ "mem": "256MiB" });
        let parsed = parse_bytes_field(&entry, "mem");
        assert_eq!(parsed, Some(256 * MIB));
    }
}

View File

@@ -0,0 +1,364 @@
pub mod collector;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, warn};
/// Capacity of the 1-minute-resolution ring buffer:
/// 24 hours x 60 minutes = 1440 entries. Once full, the oldest entry is
/// evicted for each new one.
const MAX_1MIN_ENTRIES: usize = 1440;
/// Capacity of the 15-minute-resolution ring buffer:
/// 7 days x 24 hours x 4 quarter-hours = 672 entries. Once full, the oldest
/// entry is evicted for each new one.
const MAX_15MIN_ENTRIES: usize = 672;
/// A single metrics snapshot collected at a point in time.
///
/// Produced by `collector::collect_snapshot` and completed by
/// `MetricsStore::push`, which fills in the RPC latency and WebSocket fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricSnapshot {
/// Collection time; the collector sets it from `chrono::Utc::now().timestamp()`.
pub timestamp: i64,
/// Host-wide resource readings.
pub system: SystemMetrics,
/// One entry per container reported by `podman stats`; empty if that probe failed.
pub containers: Vec<ContainerMetrics>,
/// Mean of the RPC latency samples (ms) recorded since the previous push,
/// rounded to one decimal place. Left as supplied when no samples were recorded.
pub rpc_latency_ms: f64,
/// WebSocket connection count read from the store's counter at push time.
pub ws_connections: u32,
}
/// System-wide resource metrics.
///
/// The collector substitutes zeroes for any probe that fails, so a zero may
/// mean "unavailable" rather than a real reading.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemMetrics {
/// Non-idle share of CPU time, in percent, measured from two `/proc/stat`
/// samples and rounded to one decimal place.
pub cpu_percent: f64,
/// `MemTotal - MemAvailable` from `/proc/meminfo`, in bytes.
pub mem_used_bytes: u64,
/// `MemTotal` from `/proc/meminfo`, in bytes.
pub mem_total_bytes: u64,
/// Used space on the root filesystem (`/`) as reported by `df`, in bytes.
pub disk_used_bytes: u64,
/// Size of the root filesystem (`/`) as reported by `df`, in bytes.
pub disk_total_bytes: u64,
/// Sum of the receive byte counters of all non-loopback interfaces in
/// `/proc/net/dev`. This is the raw counter total, not a per-interval rate.
pub net_rx_bytes: u64,
/// Sum of the transmit byte counters of all non-loopback interfaces in
/// `/proc/net/dev`. This is the raw counter total, not a per-interval rate.
pub net_tx_bytes: u64,
/// First field of `/proc/loadavg` (1-minute load average).
pub load_avg_1: f64,
/// Second field of `/proc/loadavg` (5-minute load average).
pub load_avg_5: f64,
/// Third field of `/proc/loadavg` (15-minute load average).
pub load_avg_15: f64,
}
/// Per-container resource metrics.
///
/// Populated by the collector from `podman stats --no-stream --format json`.
/// Any numeric field that is missing or unparsable in that output is stored
/// as zero.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContainerMetrics {
/// Container name (the entry's `name` or `Name` key).
pub name: String,
/// CPU usage in percent as reported by podman.
pub cpu_percent: f64,
/// Memory currently used by the container, in bytes.
pub mem_used_bytes: u64,
/// Memory limit of the container, in bytes.
pub mem_limit_bytes: u64,
/// Network bytes received, as reported by podman.
pub net_rx_bytes: u64,
/// Network bytes transmitted, as reported by podman.
pub net_tx_bytes: u64,
/// Bytes read from block devices, as reported by podman.
pub block_read_bytes: u64,
/// Bytes written to block devices, as reported by podman.
pub block_write_bytes: u64,
}
/// Thread-safe metrics store with ring buffers at two resolutions.
///
/// Every pushed snapshot goes into the 1-minute buffer; every 15th push is
/// additionally copied into the quarter-hour buffer. Both buffers evict their
/// oldest entry once they reach capacity.
pub struct MetricsStore {
/// Snapshots at push resolution, capped at `MAX_1MIN_ENTRIES`.
minute_data: RwLock<VecDeque<MetricSnapshot>>,
/// Every 15th snapshot, capped at `MAX_15MIN_ENTRIES`.
quarter_hour_data: RwLock<VecDeque<MetricSnapshot>>,
/// Pushes since the last quarter-hour sample; reset to 0 when it reaches 15.
minute_count: RwLock<u32>,
/// Accumulated RPC latency as `(sum in ms, sample count)`; reset by `push`
/// once the average has been written into a snapshot.
rpc_latency: RwLock<(f64, u64)>,
/// Current number of open WebSocket connections.
ws_connections: AtomicU32,
}
impl MetricsStore {
    /// Create an empty store with both ring buffers pre-allocated to capacity.
    pub fn new() -> Self {
        let minute_data = RwLock::new(VecDeque::with_capacity(MAX_1MIN_ENTRIES));
        let quarter_hour_data = RwLock::new(VecDeque::with_capacity(MAX_15MIN_ENTRIES));
        Self {
            minute_data,
            quarter_hour_data,
            minute_count: RwLock::new(0),
            rpc_latency: RwLock::new((0.0, 0)),
            ws_connections: AtomicU32::new(0),
        }
    }

    /// Store a freshly collected snapshot (the collector calls this once a minute).
    ///
    /// Before storing, the snapshot is completed with the mean RPC latency of
    /// the samples recorded since the previous push (the accumulator is then
    /// reset) and with the current WebSocket connection count. Every 15th
    /// push is also appended to the quarter-hour buffer.
    pub async fn push(&self, mut snapshot: MetricSnapshot) {
        // Drain the latency accumulator into the snapshot, if it has samples.
        {
            let mut accumulator = self.rpc_latency.write().await;
            let (sum_ms, samples) = *accumulator;
            if samples > 0 {
                snapshot.rpc_latency_ms = (sum_ms / samples as f64 * 10.0).round() / 10.0;
                *accumulator = (0.0, 0);
            }
        }

        snapshot.ws_connections = self.ws_connections.load(Ordering::Relaxed);

        // 1-minute ring buffer: evict the oldest entry when full.
        {
            let mut ring = self.minute_data.write().await;
            if ring.len() >= MAX_1MIN_ENTRIES {
                ring.pop_front();
            }
            ring.push_back(snapshot.clone());
        }

        // Quarter-hour ring buffer: only every 15th push lands here.
        let mut pushes = self.minute_count.write().await;
        *pushes += 1;
        if *pushes < 15 {
            return;
        }
        *pushes = 0;

        let mut ring = self.quarter_hour_data.write().await;
        if ring.len() >= MAX_15MIN_ENTRIES {
            ring.pop_front();
        }
        ring.push_back(snapshot);
    }

    /// Add one RPC latency sample, in milliseconds, to the running accumulator.
    pub async fn record_rpc_latency(&self, latency_ms: f64) {
        let mut accumulator = self.rpc_latency.write().await;
        let (sum_ms, samples) = &mut *accumulator;
        *sum_ms += latency_ms;
        *samples += 1;
    }

    /// Note a newly opened WebSocket connection.
    pub fn increment_ws(&self) {
        self.ws_connections.fetch_add(1, Ordering::Relaxed);
    }

    /// Note a closed WebSocket connection; the counter never drops below zero.
    pub fn decrement_ws(&self) {
        // The closure always returns `Some`, so the update cannot fail.
        let _ = self
            .ws_connections
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |open| {
                Some(open.saturating_sub(1))
            });
    }

    /// Return a copy of the most recently pushed snapshot, if any.
    pub async fn latest(&self) -> Option<MetricSnapshot> {
        let ring = self.minute_data.read().await;
        ring.back().cloned()
    }

    /// Return copies of the newest `last_n` minute-resolution snapshots, oldest first.
    pub async fn history_minutes(&self, last_n: usize) -> Vec<MetricSnapshot> {
        let ring = self.minute_data.read().await;
        Self::tail(&ring, last_n)
    }

    /// Return copies of the newest `last_n` quarter-hour snapshots, oldest first.
    pub async fn history_quarter_hours(&self, last_n: usize) -> Vec<MetricSnapshot> {
        let ring = self.quarter_hour_data.read().await;
        Self::tail(&ring, last_n)
    }

    /// Clone the last `last_n` entries of `ring` (or all of them if it holds fewer).
    fn tail(ring: &VecDeque<MetricSnapshot>, last_n: usize) -> Vec<MetricSnapshot> {
        let skipped = ring.len().saturating_sub(last_n);
        ring.iter().skip(skipped).cloned().collect()
    }
}
/// Spawn the background metrics collector (runs every 60 seconds).
///
/// The task first waits 30 seconds, then collects a snapshot on every tick of
/// a 60-second interval and pushes it into `store`. The task is detached and
/// runs for as long as the runtime does.
///
/// The interval uses `MissedTickBehavior::Delay`. With tokio's default
/// (`Burst`), a collection that overruns the period (for example a hung
/// `podman stats`) would be followed by back-to-back catch-up ticks. Those
/// would insert several near-simultaneous snapshots into buffers that assume
/// one entry per minute and would advance the 15-push quarter-hour counter
/// too quickly.
pub fn spawn_metrics_collector(store: Arc<MetricsStore>) {
    tokio::spawn(async move {
        // Wait 30s for system to stabilize after boot
        tokio::time::sleep(std::time::Duration::from_secs(30)).await;

        let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
        // Never fire catch-up ticks: keep at least one full period between samples.
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        loop {
            // The first tick completes immediately; later ones pace the loop.
            interval.tick().await;
            match collector::collect_snapshot().await {
                Ok(snapshot) => {
                    store.push(snapshot).await;
                    debug!("Metrics snapshot collected");
                }
                Err(e) => {
                    warn!("Failed to collect metrics: {}", e);
                }
            }
        }
    });
}
#[cfg(test)]
mod tests {
    use super::*;

    /// System metrics that are all zero except for the CPU reading.
    fn idle_system(cpu_percent: f64) -> SystemMetrics {
        SystemMetrics {
            cpu_percent,
            mem_used_bytes: 0,
            mem_total_bytes: 0,
            disk_used_bytes: 0,
            disk_total_bytes: 0,
            net_rx_bytes: 0,
            net_tx_bytes: 0,
            load_avg_1: 0.0,
            load_avg_5: 0.0,
            load_avg_15: 0.0,
        }
    }

    /// A container-less snapshot with the given timestamp and system metrics.
    fn snapshot_with(timestamp: i64, system: SystemMetrics) -> MetricSnapshot {
        MetricSnapshot {
            timestamp,
            system,
            containers: vec![],
            rpc_latency_ms: 0.0,
            ws_connections: 0,
        }
    }

    /// Read the store's WebSocket counter directly.
    fn ws_count(store: &MetricsStore) -> u32 {
        store.ws_connections.load(Ordering::Relaxed)
    }

    /// A new store starts with no WebSocket connections.
    #[test]
    fn test_metrics_store_new() {
        let store = MetricsStore::new();
        assert_eq!(ws_count(&store), 0);
    }

    /// Increments and decrements are tracked, and the counter floors at zero.
    #[test]
    fn test_ws_connection_tracking() {
        let store = MetricsStore::new();

        store.increment_ws();
        store.increment_ws();
        assert_eq!(ws_count(&store), 2);

        store.decrement_ws();
        assert_eq!(ws_count(&store), 1);

        store.decrement_ws();
        assert_eq!(ws_count(&store), 0);

        // Decrement below zero should stay at 0
        store.decrement_ws();
        assert_eq!(ws_count(&store), 0);
    }

    /// `latest` is empty at first and returns the pushed snapshot afterwards.
    #[tokio::test]
    async fn test_push_and_latest() {
        let store = MetricsStore::new();
        assert!(store.latest().await.is_none());

        let system = SystemMetrics {
            cpu_percent: 25.0,
            mem_used_bytes: 1_000_000,
            mem_total_bytes: 4_000_000,
            disk_used_bytes: 500_000,
            disk_total_bytes: 1_000_000,
            net_rx_bytes: 100,
            net_tx_bytes: 200,
            load_avg_1: 1.0,
            load_avg_5: 0.5,
            load_avg_15: 0.3,
        };
        store.push(snapshot_with(1000, system)).await;

        let latest = store.latest().await.unwrap();
        assert_eq!(latest.timestamp, 1000);
        assert_eq!(latest.system.cpu_percent, 25.0);
    }

    /// Recorded latency samples are averaged into the next pushed snapshot.
    #[tokio::test]
    async fn test_rpc_latency_recording() {
        let store = MetricsStore::new();
        for sample_ms in [10.0, 20.0, 30.0] {
            store.record_rpc_latency(sample_ms).await;
        }

        store.push(snapshot_with(2000, idle_system(0.0))).await;

        let latest = store.latest().await.unwrap();
        assert_eq!(latest.rpc_latency_ms, 20.0); // average of 10+20+30 = 20
    }

    /// `history_minutes(n)` returns the newest `n` snapshots, oldest first.
    #[tokio::test]
    async fn test_history_minutes() {
        let store = MetricsStore::new();
        for minute in 0..5i64 {
            store
                .push(snapshot_with(minute * 60, idle_system(minute as f64)))
                .await;
        }

        let history = store.history_minutes(3).await;
        assert_eq!(history.len(), 3);
        assert_eq!(history[0].timestamp, 120);
        assert_eq!(history[2].timestamp, 240);
    }

    /// The minute buffer never grows past its capacity and drops the oldest entries.
    #[tokio::test]
    async fn test_ring_buffer_eviction() {
        let store = MetricsStore::new();
        // Push more than MAX_1MIN_ENTRIES
        let pushes = MAX_1MIN_ENTRIES + 10;
        for index in 0..pushes {
            store
                .push(snapshot_with(index as i64, idle_system(0.0)))
                .await;
        }

        let all = store.history_minutes(MAX_1MIN_ENTRIES + 100).await;
        assert_eq!(all.len(), MAX_1MIN_ENTRIES);
        // Oldest entry should be 10 (first 10 were evicted)
        assert_eq!(all[0].timestamp, 10);
    }

    /// Exactly 15 pushes produce one quarter-hour sample: the 15th snapshot.
    #[tokio::test]
    async fn test_quarter_hour_downsampling() {
        let store = MetricsStore::new();
        // Push exactly 15 entries to trigger one quarter-hour sample
        for minute in 0..15i64 {
            store
                .push(snapshot_with(minute * 60, idle_system(50.0)))
                .await;
        }

        let quarter_hours = store.history_quarter_hours(10).await;
        assert_eq!(quarter_hours.len(), 1);
        assert_eq!(quarter_hours[0].timestamp, 14 * 60); // The 15th entry (index 14)
    }

    /// Buffer capacities match the documented retention windows.
    #[test]
    fn test_constants() {
        assert_eq!(MAX_1MIN_ENTRIES, 1440);
        assert_eq!(MAX_15MIN_ENTRIES, 672);
    }
}