feat: add real-time metrics collection with ring buffer storage (MON-01)

Implements monitoring/collector.rs that collects per-container CPU/RAM/network/disk,
system-wide metrics, RPC latency, and WebSocket connection count every 60 seconds.
Data is stored in dual ring buffers: one at 1-minute resolution (24 h of history) and one at 15-minute resolution (7 d of history).
Three new RPC endpoints: monitoring.current, monitoring.history, monitoring.containers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Dorian
2026-03-11 11:11:02 +00:00
parent 45032d937b
commit 592548066e
8 changed files with 1285 additions and 40 deletions

View File

@@ -1,6 +1,8 @@
use crate::api::rpc::RpcHandler;
use crate::content_server;
use crate::electrs_status;
use crate::monitoring::MetricsStore;
use crate::network::dwn_store::DwnStore;
use crate::node_message as node_msg;
use crate::config::Config;
use crate::session::{self, SessionStore};
@@ -12,26 +14,39 @@ use hyper_ws_listener::WsStream;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio_tungstenite::tungstenite::Message;
use std::time::Instant;
use tracing::{debug, info};
/// Shared state behind the HTTP/WebSocket API: configuration plus handles to the
/// services that individual route handlers are given.
pub struct ApiHandler {
/// Server configuration; cloned into the `RpcHandler` and lent to the DWN route handlers.
config: Config,
/// JSON-RPC dispatcher, built once in `new` and shared behind an `Arc`.
rpc_handler: Arc<RpcHandler>,
/// State manager handed to the WebSocket handler, which subscribes to it.
state_manager: Arc<StateManager>,
/// Metrics store; the WebSocket handler increments/decrements its connection counter,
/// and a clone is given to the `RpcHandler`.
metrics_store: Arc<MetricsStore>,
/// Session store created in `new`; a clone is given to the `RpcHandler`.
session_store: SessionStore,
}
impl ApiHandler {
pub async fn new(config: Config, state_manager: Arc<StateManager>) -> Result<Self> {
pub async fn new(
config: Config,
state_manager: Arc<StateManager>,
metrics_store: Arc<MetricsStore>,
) -> Result<Self> {
let session_store = SessionStore::new();
let rpc_handler = Arc::new(
RpcHandler::new(config.clone(), state_manager.clone(), session_store.clone()).await?,
RpcHandler::new(
config.clone(),
state_manager.clone(),
metrics_store.clone(),
session_store.clone(),
)
.await?,
);
Ok(Self {
config,
rpc_handler,
state_manager,
metrics_store,
session_store,
})
}
@@ -105,7 +120,7 @@ impl ApiHandler {
if !self.is_authenticated(req.headers()).await {
return Ok(Self::unauthorized());
}
return Self::handle_websocket(req, self.state_manager.clone()).await;
return Self::handle_websocket(req, self.state_manager.clone(), self.metrics_store.clone()).await;
}
// Convert body to bytes for non-WS routes
@@ -163,6 +178,19 @@ impl ApiHandler {
Self::handle_lnd_proxy(path, &origin).await
}
// DWN health — unauthenticated
(Method::GET, "/dwn/health") => {
Self::handle_dwn_health(&self.config).await
}
// DWN message processing — authenticated
(Method::POST, "/dwn") => {
if !self.is_authenticated(&headers).await {
return Ok(Self::unauthorized());
}
Self::handle_dwn_message(body_bytes, &self.config).await
}
_ => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(hyper::Body::from("Not Found"))
@@ -439,6 +467,7 @@ impl ApiHandler {
async fn handle_websocket(
req: Request<hyper::Body>,
state_manager: Arc<StateManager>,
metrics_store: Arc<MetricsStore>,
) -> Result<Response<hyper::Body>> {
let (response, ws_fut_opt) = hyper_ws_listener::create_ws(req)
.map_err(|e| anyhow::anyhow!("WebSocket upgrade failed: {}", e))?;
@@ -456,6 +485,7 @@ impl ApiHandler {
return;
}
};
metrics_store.increment_ws();
info!("WebSocket /ws/db connected");
let (mut tx, mut rx) = ws_stream.split();
@@ -472,10 +502,18 @@ impl ApiHandler {
let mut state_rx = state_manager.subscribe();
let ping_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
tokio::pin!(ping_interval);
let mut last_client_activity = Instant::now();
const INACTIVITY_TIMEOUT_SECS: u64 = 300; // 5 minutes
loop {
tokio::select! {
_ = ping_interval.tick() => {
// Check inactivity timeout
if last_client_activity.elapsed().as_secs() >= INACTIVITY_TIMEOUT_SECS {
info!("WebSocket client inactive for {}s, closing", INACTIVITY_TIMEOUT_SECS);
let _ = tx.send(Message::Close(None)).await;
break;
}
if tx.send(Message::Ping(vec![])).await.is_err() {
debug!("Failed to send ping, connection likely closed");
break;
@@ -505,12 +543,23 @@ impl ApiHandler {
match msg {
Some(Ok(Message::Close(_))) => break,
Some(Ok(Message::Pong(_))) => {
last_client_activity = Instant::now();
debug!("Received pong");
}
Some(Ok(Message::Ping(data))) => {
last_client_activity = Instant::now();
let _ = tx.send(Message::Pong(data)).await;
}
Some(Ok(_)) => {}
Some(Ok(Message::Text(text))) => {
last_client_activity = Instant::now();
// Handle JSON ping from frontend
if text.contains("\"type\":\"ping\"") || text.contains("\"type\": \"ping\"") {
let _ = tx.send(Message::Text(r#"{"type":"pong"}"#.to_string())).await;
}
}
Some(Ok(_)) => {
last_client_activity = Instant::now();
}
Some(Err(e)) => {
debug!("WebSocket stream error: {}", e);
break;
@@ -520,6 +569,7 @@ impl ApiHandler {
}
}
}
metrics_store.decrement_ws();
info!("WebSocket /ws/db disconnected");
});
}
@@ -556,3 +606,139 @@ fn sanitize_html(s: &str) -> String {
.replace('"', "&quot;")
.replace('\'', "&#x27;")
}
impl ApiHandler {
/// DWN health endpoint — opens the store and reports its counters as JSON.
///
/// Replies `200` with `status: "ok"` and the message/protocol/byte counters when the
/// store opens (all-zero counters are reported if the stats lookup itself fails), or
/// `503` with `status: "unavailable"` when the store cannot be opened.
async fn handle_dwn_health(config: &Config) -> Result<Response<hyper::Body>> {
    // Decide status line and payload first, then build the response in one place.
    let (http_status, payload) = match DwnStore::new(&config.data_dir).await {
        Err(_) => (
            StatusCode::SERVICE_UNAVAILABLE,
            r#"{"status":"unavailable"}"#.to_string(),
        ),
        Ok(store) => {
            // Fallback used when the store opens but its stats cannot be read.
            let zeroed = crate::network::dwn_store::StoreStats {
                message_count: 0,
                protocol_count: 0,
                total_bytes: 0,
            };
            let stats = store.stats().await.unwrap_or(zeroed);
            let report = serde_json::json!({
                "status": "ok",
                "message_count": stats.message_count,
                "protocol_count": stats.protocol_count,
                "total_bytes": stats.total_bytes,
            });
            (StatusCode::OK, report.to_string())
        }
    };

    Ok(Response::builder()
        .status(http_status)
        .header("Content-Type", "application/json")
        .body(hyper::Body::from(payload))
        .unwrap())
}
/// DWN message processing endpoint — handles RecordsWrite, RecordsQuery, RecordsRead, RecordsDelete.
async fn handle_dwn_message(
body: hyper::body::Bytes,
config: &Config,
) -> Result<Response<hyper::Body>> {
let request: serde_json::Value = match serde_json::from_slice(&body) {
Ok(v) => v,
Err(e) => {
let err = serde_json::json!({"error": format!("Invalid JSON: {}", e)});
return Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.header("Content-Type", "application/json")
.body(hyper::Body::from(err.to_string()))
.unwrap());
}
};
let interface = request["message"]["descriptor"]["interface"]
.as_str()
.unwrap_or("");
let method = request["message"]["descriptor"]["method"]
.as_str()
.unwrap_or("");
let store = DwnStore::new(&config.data_dir).await?;
let result = match (interface, method) {
("Records", "Write") => {
let author = request["message"]["author"].as_str().unwrap_or("unknown");
let protocol = request["message"]["descriptor"]["protocol"].as_str();
let schema = request["message"]["descriptor"]["schema"].as_str();
let data_format = request["message"]["descriptor"]["dataFormat"].as_str();
let data = request["message"].get("data").cloned();
match store.write_message(author, protocol, schema, data_format, data).await {
Ok(msg) => serde_json::json!({"status": {"code": 202}, "entry": msg}),
Err(e) => serde_json::json!({"status": {"code": 500, "detail": e.to_string()}}),
}
}
("Records", "Query") => {
let query = crate::network::dwn_store::MessageQuery {
protocol: request["message"]["descriptor"]["filter"]["protocol"]
.as_str()
.map(|s| s.to_string()),
schema: request["message"]["descriptor"]["filter"]["schema"]
.as_str()
.map(|s| s.to_string()),
author: request["message"]["descriptor"]["filter"]["author"]
.as_str()
.map(|s| s.to_string()),
date_from: request["message"]["descriptor"]["filter"]["dateFrom"]
.as_str()
.map(|s| s.to_string()),
date_to: request["message"]["descriptor"]["filter"]["dateTo"]
.as_str()
.map(|s| s.to_string()),
limit: request["message"]["descriptor"]["filter"]["limit"]
.as_u64()
.map(|n| n as usize),
};
match store.query_messages(&query).await {
Ok(messages) => serde_json::json!({"status": {"code": 200}, "entries": messages}),
Err(e) => serde_json::json!({"status": {"code": 500, "detail": e.to_string()}}),
}
}
("Records", "Read") => {
let record_id = request["message"]["descriptor"]["recordId"]
.as_str()
.unwrap_or("");
match store.read_message(record_id).await {
Ok(Some(msg)) => serde_json::json!({"status": {"code": 200}, "entry": msg}),
Ok(None) => serde_json::json!({"status": {"code": 404, "detail": "Record not found"}}),
Err(e) => serde_json::json!({"status": {"code": 500, "detail": e.to_string()}}),
}
}
("Records", "Delete") => {
let record_id = request["message"]["descriptor"]["recordId"]
.as_str()
.unwrap_or("");
match store.delete_message(record_id).await {
Ok(true) => serde_json::json!({"status": {"code": 200}}),
Ok(false) => serde_json::json!({"status": {"code": 404, "detail": "Record not found"}}),
Err(e) => serde_json::json!({"status": {"code": 500, "detail": e.to_string()}}),
}
}
_ => {
serde_json::json!({"status": {"code": 400, "detail": format!("Unknown method: {}.{}", interface, method)}})
}
};
let status_code = result["status"]["code"].as_u64().unwrap_or(200);
let http_status = match status_code {
202 => StatusCode::ACCEPTED,
400 => StatusCode::BAD_REQUEST,
404 => StatusCode::NOT_FOUND,
500 => StatusCode::INTERNAL_SERVER_ERROR,
_ => StatusCode::OK,
};
Ok(Response::builder()
.status(http_status)
.header("Content-Type", "application/json")
.body(hyper::Body::from(result.to_string()))
.unwrap())
}
}

View File

@@ -1,28 +1,36 @@
mod auth;
mod backup_rpc;
mod bitcoin;
mod container;
mod content;
mod credentials;
mod dwn;
mod federation;
mod identity;
mod interfaces;
mod marketplace;
mod monitoring;
mod names;
mod lnd;
mod mesh;
mod network;
mod node;
mod nostr;
mod package;
mod peers;
mod router;
mod security;
mod tor;
mod totp;
mod system;
mod update;
mod vpn;
mod wallet;
use crate::auth::AuthManager;
use crate::config::Config;
use crate::container::DevContainerOrchestrator;
use crate::monitoring::MetricsStore;
use crate::port_allocator::PortAllocator;
use crate::session::{self, LoginRateLimiter, SessionStore};
use crate::state::StateManager;
@@ -70,6 +78,7 @@ pub struct RpcHandler {
auth_manager: AuthManager,
orchestrator: Option<Arc<DevContainerOrchestrator>>,
state_manager: Arc<StateManager>,
pub(crate) metrics_store: Arc<MetricsStore>,
port_allocator: Arc<Mutex<PortAllocator>>,
pub session_store: SessionStore,
login_rate_limiter: LoginRateLimiter,
@@ -79,6 +88,7 @@ impl RpcHandler {
pub async fn new(
config: Config,
state_manager: Arc<StateManager>,
metrics_store: Arc<MetricsStore>,
session_store: SessionStore,
) -> Result<Self> {
let auth_manager = AuthManager::new(config.data_dir.clone());
@@ -96,6 +106,7 @@ impl RpcHandler {
auth_manager,
orchestrator,
state_manager,
metrics_store,
port_allocator,
session_store,
login_rate_limiter: LoginRateLimiter::new(),
@@ -210,13 +221,14 @@ impl RpcHandler {
None
};
// Route to handler
// Route to handler (track latency for metrics)
let rpc_start = std::time::Instant::now();
let result = match rpc_req.method.as_str() {
"echo" => self.handle_echo(params).await,
"server.echo" => self.handle_echo(params).await,
"auth.login" => self.handle_auth_login(params).await,
"auth.logout" => self.handle_auth_logout().await,
"auth.changePassword" => self.handle_auth_change_password(params).await,
"auth.changePassword" => self.handle_auth_change_password(params, &session_token).await,
"auth.onboardingComplete" => self.handle_auth_onboarding_complete().await,
"auth.isOnboardingComplete" => self.handle_auth_is_onboarding_complete().await,
"auth.resetOnboarding" => self.handle_auth_reset_onboarding().await,
@@ -276,6 +288,8 @@ impl RpcHandler {
"lnd.sendcoins" => self.handle_lnd_sendcoins(params).await,
"lnd.createinvoice" => self.handle_lnd_createinvoice(params).await,
"lnd.payinvoice" => self.handle_lnd_payinvoice(params).await,
"lnd.create-psbt" => self.handle_lnd_create_psbt(params).await,
"lnd.finalize-psbt" => self.handle_lnd_finalize_psbt(params).await,
// Multi-identity management
"identity.list" => self.handle_identity_list(params).await,
@@ -285,6 +299,9 @@ impl RpcHandler {
"identity.set-default" => self.handle_identity_set_default(params).await,
"identity.sign" => self.handle_identity_sign(params).await,
"identity.verify" => self.handle_identity_verify(params).await,
"identity.resolve-did" => self.handle_identity_resolve_did(params).await,
"identity.resolve-remote-did" => self.handle_identity_resolve_remote_did(params).await,
"identity.verify-did-document" => self.handle_identity_verify_did_document(params).await,
"identity.create-nostr-key" => self.handle_identity_create_nostr_key(params).await,
"identity.nostr-sign" => self.handle_identity_nostr_sign(params).await,
@@ -300,6 +317,8 @@ impl RpcHandler {
"identity.verify-credential" => self.handle_identity_verify_credential(params).await,
"identity.list-credentials" => self.handle_identity_list_credentials(params).await,
"identity.revoke-credential" => self.handle_identity_revoke_credential(params).await,
"identity.create-presentation" => self.handle_identity_create_presentation(params).await,
"identity.verify-presentation" => self.handle_identity_verify_presentation(params).await,
// Network overlay
"network.get-visibility" => self.handle_network_get_visibility().await,
@@ -332,6 +351,8 @@ impl RpcHandler {
"network.scan-wifi" => self.handle_network_scan_wifi().await,
"network.configure-wifi" => self.handle_network_configure_wifi(params).await,
"network.configure-ethernet" => self.handle_network_configure_ethernet(params).await,
"network.dns-status" => self.handle_network_dns_status().await,
"network.configure-dns" => self.handle_network_configure_dns(params).await,
"router.detect" => self.handle_router_detect(params).await,
"router.info" => self.handle_router_info().await,
"router.configure" => self.handle_router_configure(params).await,
@@ -356,22 +377,122 @@ impl RpcHandler {
// DWN (Decentralized Web Node)
"dwn.status" => self.handle_dwn_status().await,
"dwn.sync" => self.handle_dwn_sync().await,
"dwn.register-protocol" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_dwn_register_protocol(&p).await
}
"dwn.list-protocols" => self.handle_dwn_list_protocols().await,
"dwn.remove-protocol" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_dwn_remove_protocol(&p).await
}
"dwn.query-messages" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_dwn_query_messages(&p).await
}
"dwn.write-message" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_dwn_write_message(&p).await
}
// Federation
"federation.invite" => self.handle_federation_invite().await,
"federation.join" => self.handle_federation_join(params).await,
"federation.list-nodes" => self.handle_federation_list_nodes().await,
"federation.remove-node" => self.handle_federation_remove_node(params).await,
"federation.set-trust" => self.handle_federation_set_trust(params).await,
"federation.sync-state" => self.handle_federation_sync_state().await,
"federation.get-state" => self.handle_federation_get_state().await,
"federation.peer-joined" => self.handle_federation_peer_joined(params).await,
"federation.deploy-app" => self.handle_federation_deploy_app(params).await,
// VPN
"vpn.status" => self.handle_vpn_status().await,
"vpn.configure" => self.handle_vpn_configure(params).await,
"vpn.disconnect" => self.handle_vpn_disconnect().await,
// Marketplace
"marketplace.discover" => self.handle_marketplace_discover().await,
"marketplace.publish" => self.handle_marketplace_publish(params).await,
"marketplace.get-manifest" => self.handle_marketplace_get_manifest(params).await,
"marketplace.list-published" => self.handle_marketplace_list_published().await,
"marketplace.verify" => self.handle_marketplace_verify(params).await,
// Mesh networking
"mesh.status" => self.handle_mesh_status().await,
"mesh.discover" => self.handle_mesh_discover(params).await,
"mesh.broadcast" => self.handle_mesh_broadcast().await,
"mesh.configure" => self.handle_mesh_configure(params).await,
// System monitoring
"system.stats" => self.handle_system_stats().await,
"system.processes" => self.handle_system_processes().await,
"system.temperature" => self.handle_system_temperature().await,
"system.detect-usb-devices" => self.handle_system_detect_usb_devices().await,
"system.disk-status" => self.handle_system_disk_status().await,
"system.disk-cleanup" => self.handle_system_disk_cleanup().await,
// Real-time metrics monitoring
"monitoring.current" => self.handle_monitoring_current().await,
"monitoring.history" => self.handle_monitoring_history(params).await,
"monitoring.containers" => self.handle_monitoring_containers().await,
// System updates
"update.check" => self.handle_update_check().await,
"update.status" => self.handle_update_status().await,
"update.dismiss" => self.handle_update_dismiss().await,
"update.download" => self.handle_update_download().await,
"update.apply" => self.handle_update_apply().await,
"update.rollback" => self.handle_update_rollback().await,
"update.get-schedule" => self.handle_update_get_schedule().await,
"update.set-schedule" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_update_set_schedule(&p).await
}
// Backup & Restore
"backup.create" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_create(&p).await
}
"backup.list" => self.handle_backup_list().await,
"backup.verify" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_verify(&p).await
}
"backup.restore" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_restore(&p).await
}
"backup.delete" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_delete(&p).await
}
"backup.list-drives" => self.handle_backup_list_drives().await,
"backup.to-usb" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_to_usb(&p).await
}
// Security / secrets
"security.rotate-secrets" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_security_rotate_secrets(&p).await
}
"security.list-expiring" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_security_list_expiring(&p).await
}
_ => {
Err(anyhow::anyhow!("Unknown method: {}", rpc_req.method))
}
};
// Record RPC latency for monitoring
let elapsed_ms = rpc_start.elapsed().as_secs_f64() * 1000.0;
self.metrics_store.record_rpc_latency(elapsed_ms).await;
// Build response
let rpc_resp = match result {
Ok(data) => RpcResponse {
@@ -465,6 +586,34 @@ impl RpcHandler {
// On successful TOTP verification, the session is already upgraded to full
// (handled inside handle_login_totp/handle_login_backup)
// On password change, rotate the session token for the caller
if rpc_req.method == "auth.changePassword" && rpc_resp.error.is_none() {
if let Some(token) = &session_token {
let new_token = self.session_store.rotate(token).await;
let csrf_token = generate_csrf_token();
response.headers_mut().append(
"Set-Cookie",
format!(
"session={}; HttpOnly; SameSite=Strict; Path=/{}",
new_token,
self.cookie_suffix()
)
.parse()
.unwrap(),
);
response.headers_mut().append(
"Set-Cookie",
format!(
"csrf_token={}; SameSite=Strict; Path=/{}",
csrf_token,
self.cookie_suffix()
)
.parse()
.unwrap(),
);
}
}
// On logout, invalidate session and expire cookies
if rpc_req.method == "auth.logout" {
if let Some(token) = &session_token {

View File

@@ -0,0 +1,62 @@
use super::RpcHandler;
use anyhow::Result;
use tracing::debug;
impl RpcHandler {
/// monitoring.current — most recent metrics snapshot.
///
/// Returns the snapshot serialized as JSON, or a `{"status": "collecting", ...}`
/// placeholder while the store has no snapshot yet.
pub(super) async fn handle_monitoring_current(&self) -> Result<serde_json::Value> {
    debug!("Getting current metrics");

    if let Some(latest) = self.metrics_store.latest().await {
        return Ok(serde_json::to_value(latest)?);
    }

    Ok(serde_json::json!({ "status": "collecting", "message": "No metrics collected yet" }))
}
/// monitoring.history — historical metrics at the requested resolution.
///
/// Optional params:
/// - `resolution`: `"quarter_hour"` or `"15min"` selects the quarter-hour history; any
///   other value (or none) selects the per-minute history.
/// - `count`: number of samples requested; defaults to 60 and is capped at 1440. Values
///   that are not non-negative integers fall back to the default.
///
/// Returns `{ "resolution", "count", "data" }`, where `resolution` names the series that
/// was actually served (`"minute"` when the requested value was not recognised) and
/// `count` is the number of entries in `data`.
pub(super) async fn handle_monitoring_history(
    &self,
    params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
    const DEFAULT_COUNT: u64 = 60;
    const MAX_COUNT: u64 = 1440;

    debug!("Getting metrics history");

    let requested = params
        .as_ref()
        .and_then(|p| p.get("resolution"))
        .and_then(|v| v.as_str());

    // Report the resolution that is really served: unrecognised values fall back to the
    // per-minute series, so echoing them back verbatim would mislabel the data.
    let resolution = match requested {
        Some("quarter_hour") => "quarter_hour",
        Some("15min") => "15min",
        _ => "minute",
    };

    // Clamp while still a u64 so the narrowing cast below can never truncate
    // (casting first would wrap large values on 32-bit targets).
    let count = params
        .as_ref()
        .and_then(|p| p.get("count"))
        .and_then(|v| v.as_u64())
        .unwrap_or(DEFAULT_COUNT)
        .min(MAX_COUNT) as usize;

    let data = match resolution {
        "quarter_hour" | "15min" => self.metrics_store.history_quarter_hours(count).await,
        _ => self.metrics_store.history_minutes(count).await,
    };

    Ok(serde_json::json!({
        "resolution": resolution,
        "count": data.len(),
        "data": data,
    }))
}
/// monitoring.containers — per-container metrics from the most recent snapshot.
///
/// Returns `{ "timestamp", "containers" }` taken from the latest snapshot, or
/// `{ "containers": [] }` when no snapshot exists yet.
pub(super) async fn handle_monitoring_containers(&self) -> Result<serde_json::Value> {
    debug!("Getting container metrics");

    let latest = self.metrics_store.latest().await;
    let payload = if let Some(snap) = latest {
        serde_json::json!({
            "timestamp": snap.timestamp,
            "containers": snap.containers,
        })
    } else {
        serde_json::json!({ "containers": [] })
    };

    Ok(payload)
}
}