feat: add alerting system with configurable rules and UI (MON-02, MON-03)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -436,6 +436,10 @@ impl RpcHandler {
|
||||
"monitoring.current" => self.handle_monitoring_current().await,
|
||||
"monitoring.history" => self.handle_monitoring_history(params).await,
|
||||
"monitoring.containers" => self.handle_monitoring_containers().await,
|
||||
"monitoring.alerts" => self.handle_monitoring_alerts(params).await,
|
||||
"monitoring.alert-rules" => self.handle_monitoring_alert_rules().await,
|
||||
"monitoring.configure-alert" => self.handle_monitoring_configure_alert(params).await,
|
||||
"monitoring.acknowledge-alert" => self.handle_monitoring_acknowledge_alert(params).await,
|
||||
|
||||
// System updates
|
||||
"update.check" => self.handle_update_check().await,
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::RpcHandler;
|
||||
use crate::monitoring::AlertRuleKind;
|
||||
use anyhow::Result;
|
||||
use tracing::debug;
|
||||
|
||||
@@ -59,4 +60,76 @@ impl RpcHandler {
|
||||
None => Ok(serde_json::json!({ "containers": [] })),
|
||||
}
|
||||
}
|
||||
|
||||
/// monitoring.alerts — get fired alert history
|
||||
pub(super) async fn handle_monitoring_alerts(
|
||||
&self,
|
||||
params: Option<serde_json::Value>,
|
||||
) -> Result<serde_json::Value> {
|
||||
debug!("Getting alert history");
|
||||
|
||||
let count = params
|
||||
.as_ref()
|
||||
.and_then(|p| p.get("count"))
|
||||
.and_then(|v| v.as_u64())
|
||||
.unwrap_or(50) as usize;
|
||||
|
||||
let alerts = self.metrics_store.get_fired_alerts(count).await;
|
||||
|
||||
Ok(serde_json::json!({
|
||||
"count": alerts.len(),
|
||||
"alerts": alerts,
|
||||
}))
|
||||
}
|
||||
|
||||
/// monitoring.alert-rules — get current alert rules
|
||||
pub(super) async fn handle_monitoring_alert_rules(&self) -> Result<serde_json::Value> {
|
||||
debug!("Getting alert rules");
|
||||
|
||||
let rules = self.metrics_store.get_alert_rules().await;
|
||||
Ok(serde_json::json!({ "rules": rules }))
|
||||
}
|
||||
|
||||
/// monitoring.configure-alert — update an alert rule
|
||||
pub(super) async fn handle_monitoring_configure_alert(
|
||||
&self,
|
||||
params: Option<serde_json::Value>,
|
||||
) -> Result<serde_json::Value> {
|
||||
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
|
||||
|
||||
let kind_str = params
|
||||
.get("kind")
|
||||
.and_then(|v| v.as_str())
|
||||
.ok_or_else(|| anyhow::anyhow!("Missing 'kind' parameter"))?;
|
||||
|
||||
let kind: AlertRuleKind = serde_json::from_value(serde_json::json!(kind_str))
|
||||
.map_err(|_| anyhow::anyhow!("Invalid alert kind: {}", kind_str))?;
|
||||
|
||||
let enabled = params.get("enabled").and_then(|v| v.as_bool());
|
||||
let threshold = params.get("threshold").and_then(|v| v.as_f64());
|
||||
|
||||
self.metrics_store
|
||||
.update_alert_rule(&kind, enabled, threshold)
|
||||
.await;
|
||||
|
||||
debug!("Updated alert rule: {:?}", kind);
|
||||
Ok(serde_json::json!({ "updated": true, "kind": kind_str }))
|
||||
}
|
||||
|
||||
/// monitoring.acknowledge-alert — acknowledge a fired alert
|
||||
pub(super) async fn handle_monitoring_acknowledge_alert(
|
||||
&self,
|
||||
params: Option<serde_json::Value>,
|
||||
) -> Result<serde_json::Value> {
|
||||
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
|
||||
|
||||
let alert_id = params
|
||||
.get("id")
|
||||
.and_then(|v| v.as_str())
|
||||
.ok_or_else(|| anyhow::anyhow!("Missing 'id' parameter"))?;
|
||||
|
||||
let found = self.metrics_store.acknowledge_alert(alert_id).await;
|
||||
|
||||
Ok(serde_json::json!({ "acknowledged": found, "id": alert_id }))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ use std::collections::VecDeque;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{debug, warn};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
/// Maximum entries at 1-minute resolution (24 hours = 1440 minutes)
|
||||
const MAX_1MIN_ENTRIES: usize = 1440;
|
||||
@@ -51,6 +51,78 @@ pub struct ContainerMetrics {
|
||||
pub block_write_bytes: u64,
|
||||
}
|
||||
|
||||
/// Maximum number of fired alerts to keep in history.
|
||||
const MAX_ALERT_HISTORY: usize = 100;
|
||||
|
||||
/// Types of alert rules the system can evaluate.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum AlertRuleKind {
|
||||
DiskUsage,
|
||||
RamUsage,
|
||||
ContainerCrash,
|
||||
BackendErrorSpike,
|
||||
SslCertExpiry,
|
||||
}
|
||||
|
||||
/// A configured alert rule with threshold and enabled state.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AlertRule {
|
||||
pub kind: AlertRuleKind,
|
||||
pub threshold: f64,
|
||||
pub enabled: bool,
|
||||
pub description: String,
|
||||
}
|
||||
|
||||
/// A fired alert instance.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct FiredAlert {
|
||||
pub id: String,
|
||||
pub kind: AlertRuleKind,
|
||||
pub message: String,
|
||||
pub value: f64,
|
||||
pub threshold: f64,
|
||||
pub timestamp: i64,
|
||||
pub acknowledged: bool,
|
||||
}
|
||||
|
||||
impl AlertRule {
|
||||
fn default_rules() -> Vec<AlertRule> {
|
||||
vec![
|
||||
AlertRule {
|
||||
kind: AlertRuleKind::DiskUsage,
|
||||
threshold: 90.0,
|
||||
enabled: true,
|
||||
description: "Disk usage exceeds threshold".to_string(),
|
||||
},
|
||||
AlertRule {
|
||||
kind: AlertRuleKind::RamUsage,
|
||||
threshold: 90.0,
|
||||
enabled: true,
|
||||
description: "RAM usage exceeds threshold".to_string(),
|
||||
},
|
||||
AlertRule {
|
||||
kind: AlertRuleKind::ContainerCrash,
|
||||
threshold: 1.0,
|
||||
enabled: true,
|
||||
description: "Container stopped unexpectedly".to_string(),
|
||||
},
|
||||
AlertRule {
|
||||
kind: AlertRuleKind::BackendErrorSpike,
|
||||
threshold: 500.0,
|
||||
enabled: true,
|
||||
description: "RPC latency exceeds threshold (ms)".to_string(),
|
||||
},
|
||||
AlertRule {
|
||||
kind: AlertRuleKind::SslCertExpiry,
|
||||
threshold: 30.0,
|
||||
enabled: true,
|
||||
description: "SSL certificate expires within N days".to_string(),
|
||||
},
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
/// Thread-safe metrics store with ring buffers at two resolutions.
|
||||
pub struct MetricsStore {
|
||||
minute_data: RwLock<VecDeque<MetricSnapshot>>,
|
||||
@@ -58,6 +130,8 @@ pub struct MetricsStore {
|
||||
minute_count: RwLock<u32>,
|
||||
rpc_latency: RwLock<(f64, u64)>,
|
||||
ws_connections: AtomicU32,
|
||||
alert_rules: RwLock<Vec<AlertRule>>,
|
||||
fired_alerts: RwLock<VecDeque<FiredAlert>>,
|
||||
}
|
||||
|
||||
impl MetricsStore {
|
||||
@@ -68,6 +142,8 @@ impl MetricsStore {
|
||||
minute_count: RwLock::new(0),
|
||||
rpc_latency: RwLock::new((0.0, 0)),
|
||||
ws_connections: AtomicU32::new(0),
|
||||
alert_rules: RwLock::new(AlertRule::default_rules()),
|
||||
fired_alerts: RwLock::new(VecDeque::with_capacity(MAX_ALERT_HISTORY)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,10 +223,198 @@ impl MetricsStore {
|
||||
let start = buf.len().saturating_sub(last_n);
|
||||
buf.iter().skip(start).cloned().collect()
|
||||
}
|
||||
|
||||
/// Get the current alert rules.
|
||||
pub async fn get_alert_rules(&self) -> Vec<AlertRule> {
|
||||
self.alert_rules.read().await.clone()
|
||||
}
|
||||
|
||||
/// Update an alert rule by kind.
|
||||
pub async fn update_alert_rule(&self, kind: &AlertRuleKind, enabled: Option<bool>, threshold: Option<f64>) {
|
||||
let mut rules = self.alert_rules.write().await;
|
||||
if let Some(rule) = rules.iter_mut().find(|r| &r.kind == kind) {
|
||||
if let Some(e) = enabled {
|
||||
rule.enabled = e;
|
||||
}
|
||||
if let Some(t) = threshold {
|
||||
rule.threshold = t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get fired alert history.
|
||||
pub async fn get_fired_alerts(&self, last_n: usize) -> Vec<FiredAlert> {
|
||||
let buf = self.fired_alerts.read().await;
|
||||
let start = buf.len().saturating_sub(last_n);
|
||||
buf.iter().skip(start).cloned().collect()
|
||||
}
|
||||
|
||||
/// Acknowledge a fired alert by id.
|
||||
pub async fn acknowledge_alert(&self, alert_id: &str) -> bool {
|
||||
let mut buf = self.fired_alerts.write().await;
|
||||
if let Some(alert) = buf.iter_mut().find(|a| a.id == alert_id) {
|
||||
alert.acknowledged = true;
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Evaluate alert rules against a snapshot and return any new alerts.
|
||||
pub async fn check_alerts(&self, snapshot: &MetricSnapshot) -> Vec<FiredAlert> {
|
||||
let rules = self.alert_rules.read().await;
|
||||
let mut new_alerts = Vec::new();
|
||||
let ts = snapshot.timestamp;
|
||||
|
||||
for rule in rules.iter() {
|
||||
if !rule.enabled {
|
||||
continue;
|
||||
}
|
||||
|
||||
match rule.kind {
|
||||
AlertRuleKind::DiskUsage => {
|
||||
if snapshot.system.disk_total_bytes > 0 {
|
||||
let pct = (snapshot.system.disk_used_bytes as f64
|
||||
/ snapshot.system.disk_total_bytes as f64)
|
||||
* 100.0;
|
||||
if pct > rule.threshold {
|
||||
new_alerts.push(FiredAlert {
|
||||
id: format!("disk-{}", ts),
|
||||
kind: AlertRuleKind::DiskUsage,
|
||||
message: format!("Disk usage at {:.1}% (threshold: {:.0}%)", pct, rule.threshold),
|
||||
value: pct,
|
||||
threshold: rule.threshold,
|
||||
timestamp: ts,
|
||||
acknowledged: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
AlertRuleKind::RamUsage => {
|
||||
if snapshot.system.mem_total_bytes > 0 {
|
||||
let pct = (snapshot.system.mem_used_bytes as f64
|
||||
/ snapshot.system.mem_total_bytes as f64)
|
||||
* 100.0;
|
||||
if pct > rule.threshold {
|
||||
new_alerts.push(FiredAlert {
|
||||
id: format!("ram-{}", ts),
|
||||
kind: AlertRuleKind::RamUsage,
|
||||
message: format!("RAM usage at {:.1}% (threshold: {:.0}%)", pct, rule.threshold),
|
||||
value: pct,
|
||||
threshold: rule.threshold,
|
||||
timestamp: ts,
|
||||
acknowledged: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
AlertRuleKind::BackendErrorSpike => {
|
||||
if snapshot.rpc_latency_ms > rule.threshold {
|
||||
new_alerts.push(FiredAlert {
|
||||
id: format!("latency-{}", ts),
|
||||
kind: AlertRuleKind::BackendErrorSpike,
|
||||
message: format!(
|
||||
"RPC latency at {:.0}ms (threshold: {:.0}ms)",
|
||||
snapshot.rpc_latency_ms, rule.threshold
|
||||
),
|
||||
value: snapshot.rpc_latency_ms,
|
||||
threshold: rule.threshold,
|
||||
timestamp: ts,
|
||||
acknowledged: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
// ContainerCrash and SslCertExpiry are checked via dedicated paths
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
// Store fired alerts
|
||||
if !new_alerts.is_empty() {
|
||||
let mut buf = self.fired_alerts.write().await;
|
||||
for alert in &new_alerts {
|
||||
if buf.len() >= MAX_ALERT_HISTORY {
|
||||
buf.pop_front();
|
||||
}
|
||||
buf.push_back(alert.clone());
|
||||
}
|
||||
}
|
||||
|
||||
new_alerts
|
||||
}
|
||||
|
||||
/// Fire a container crash alert (called by health monitor).
|
||||
pub async fn fire_container_alert(&self, container_name: &str, state: &str) {
|
||||
let rules = self.alert_rules.read().await;
|
||||
let enabled = rules
|
||||
.iter()
|
||||
.any(|r| r.kind == AlertRuleKind::ContainerCrash && r.enabled);
|
||||
drop(rules);
|
||||
|
||||
if !enabled {
|
||||
return;
|
||||
}
|
||||
|
||||
let ts = chrono::Utc::now().timestamp();
|
||||
let alert = FiredAlert {
|
||||
id: format!("container-{}-{}", container_name, ts),
|
||||
kind: AlertRuleKind::ContainerCrash,
|
||||
message: format!("Container '{}' is {} — may need attention", container_name, state),
|
||||
value: 1.0,
|
||||
threshold: 1.0,
|
||||
timestamp: ts,
|
||||
acknowledged: false,
|
||||
};
|
||||
|
||||
let mut buf = self.fired_alerts.write().await;
|
||||
if buf.len() >= MAX_ALERT_HISTORY {
|
||||
buf.pop_front();
|
||||
}
|
||||
buf.push_back(alert);
|
||||
}
|
||||
|
||||
/// Fire an SSL cert expiry alert.
|
||||
pub async fn fire_ssl_alert(&self, days_remaining: f64) {
|
||||
let rules = self.alert_rules.read().await;
|
||||
let threshold = rules
|
||||
.iter()
|
||||
.find(|r| r.kind == AlertRuleKind::SslCertExpiry && r.enabled)
|
||||
.map(|r| r.threshold);
|
||||
drop(rules);
|
||||
|
||||
let threshold = match threshold {
|
||||
Some(t) if days_remaining < t => t,
|
||||
_ => return,
|
||||
};
|
||||
|
||||
let ts = chrono::Utc::now().timestamp();
|
||||
let alert = FiredAlert {
|
||||
id: format!("ssl-{}", ts),
|
||||
kind: AlertRuleKind::SslCertExpiry,
|
||||
message: format!(
|
||||
"SSL certificate expires in {:.0} days (threshold: {:.0} days)",
|
||||
days_remaining, threshold
|
||||
),
|
||||
value: days_remaining,
|
||||
threshold,
|
||||
timestamp: ts,
|
||||
acknowledged: false,
|
||||
};
|
||||
|
||||
let mut buf = self.fired_alerts.write().await;
|
||||
if buf.len() >= MAX_ALERT_HISTORY {
|
||||
buf.pop_front();
|
||||
}
|
||||
buf.push_back(alert);
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn the background metrics collector (runs every 60 seconds).
|
||||
pub fn spawn_metrics_collector(store: Arc<MetricsStore>) {
|
||||
/// Also evaluates alert rules on each snapshot and pushes notifications.
|
||||
pub fn spawn_metrics_collector(
|
||||
store: Arc<MetricsStore>,
|
||||
state: Option<Arc<crate::state::StateManager>>,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
// Wait 30s for system to stabilize after boot
|
||||
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
|
||||
@@ -162,8 +426,48 @@ pub fn spawn_metrics_collector(store: Arc<MetricsStore>) {
|
||||
|
||||
match collector::collect_snapshot().await {
|
||||
Ok(snapshot) => {
|
||||
// Check alert rules before pushing (needs snapshot data)
|
||||
let alerts = store.check_alerts(&snapshot).await;
|
||||
|
||||
store.push(snapshot).await;
|
||||
debug!("Metrics snapshot collected");
|
||||
|
||||
// Push alert notifications via WebSocket
|
||||
if !alerts.is_empty() {
|
||||
if let Some(ref state_mgr) = state {
|
||||
let (mut data, _rev) = state_mgr.get_snapshot().await;
|
||||
for alert in &alerts {
|
||||
let level = match alert.kind {
|
||||
AlertRuleKind::DiskUsage | AlertRuleKind::RamUsage => {
|
||||
if alert.value > 95.0 {
|
||||
crate::data_model::NotificationLevel::Error
|
||||
} else {
|
||||
crate::data_model::NotificationLevel::Warning
|
||||
}
|
||||
}
|
||||
AlertRuleKind::ContainerCrash => {
|
||||
crate::data_model::NotificationLevel::Error
|
||||
}
|
||||
_ => crate::data_model::NotificationLevel::Warning,
|
||||
};
|
||||
let notification = crate::data_model::Notification {
|
||||
id: alert.id.clone(),
|
||||
level,
|
||||
title: format!("{:?} Alert", alert.kind),
|
||||
message: alert.message.clone(),
|
||||
timestamp: chrono::Utc::now().to_rfc3339(),
|
||||
app_id: None,
|
||||
};
|
||||
data.notifications.push(notification);
|
||||
}
|
||||
// Keep max 20 notifications
|
||||
while data.notifications.len() > 20 {
|
||||
data.notifications.remove(0);
|
||||
}
|
||||
state_mgr.update_data(data).await;
|
||||
info!("Fired {} alert(s)", alerts.len());
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to collect metrics: {}", e);
|
||||
|
||||
@@ -78,7 +78,7 @@ impl Server {
|
||||
|
||||
// Create metrics store and spawn background collector
|
||||
let metrics_store = Arc::new(MetricsStore::new());
|
||||
crate::monitoring::spawn_metrics_collector(metrics_store.clone());
|
||||
crate::monitoring::spawn_metrics_collector(metrics_store.clone(), Some(state_manager.clone()));
|
||||
|
||||
let api_handler = Arc::new(
|
||||
ApiHandler::new(config.clone(), state_manager.clone(), metrics_store).await?,
|
||||
|
||||
Reference in New Issue
Block a user