feat: Discover view, Fleet dashboard, MeshMap, type fixes
- New Discover.vue (app store redesign) - Fleet.vue dashboard for .228 - MeshMap.vue component - Fixed Discover.vue type errors (unused var, type predicate) - Various UI updates (Apps, Dashboard, Marketplace, Mesh, Web5) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -4,8 +4,8 @@
|
||||
//! Data stays local until explicitly shared via future relay mechanism.
|
||||
|
||||
use super::RpcHandler;
|
||||
use anyhow::Result;
|
||||
use tracing::info;
|
||||
use anyhow::{Context, Result};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
const ANALYTICS_FILE: &str = "analytics-config.json";
|
||||
|
||||
@@ -202,4 +202,237 @@ impl RpcHandler {
|
||||
|
||||
Ok(report)
|
||||
}
|
||||
|
||||
// ── Fleet telemetry collector endpoints ──────────────────────────────
|
||||
|
||||
/// Receive a telemetry report from a fleet node.
|
||||
/// Stores it in telemetry-fleet/ directory, indexed by node_id.
|
||||
/// Does NOT require auth — called by remote nodes posting reports.
|
||||
pub(super) async fn handle_telemetry_ingest(&self, params: Option<serde_json::Value>) -> Result<serde_json::Value> {
|
||||
let report = params.context("Missing telemetry report payload")?;
|
||||
|
||||
// Validate required fields
|
||||
let node_id = report.get("node_id")
|
||||
.and_then(|v| v.as_str())
|
||||
.context("Missing required field: node_id")?;
|
||||
if node_id.is_empty() || node_id.len() > 64 {
|
||||
anyhow::bail!("Invalid node_id: must be 1-64 characters");
|
||||
}
|
||||
// Sanitize node_id to prevent path traversal
|
||||
if node_id.contains('/') || node_id.contains('\\') || node_id.contains("..") {
|
||||
anyhow::bail!("Invalid node_id: contains disallowed characters");
|
||||
}
|
||||
let _version = report.get("version")
|
||||
.and_then(|v| v.as_str())
|
||||
.context("Missing required field: version")?;
|
||||
let _reported_at = report.get("reported_at")
|
||||
.and_then(|v| v.as_str())
|
||||
.context("Missing required field: reported_at")?;
|
||||
|
||||
let fleet_dir = self.config.data_dir.join("telemetry-fleet");
|
||||
tokio::fs::create_dir_all(&fleet_dir).await
|
||||
.context("Failed to create telemetry-fleet directory")?;
|
||||
|
||||
// Write latest report (overwrites previous)
|
||||
let latest_path = fleet_dir.join(format!("{}.json", node_id));
|
||||
let report_json = serde_json::to_string_pretty(&report)
|
||||
.context("Failed to serialize report")?;
|
||||
tokio::fs::write(&latest_path, &report_json).await
|
||||
.context("Failed to write latest fleet report")?;
|
||||
|
||||
// Append to history file (cap at 200 entries)
|
||||
let history_path = fleet_dir.join(format!("{}-history.json", node_id));
|
||||
let mut history: Vec<serde_json::Value> = match tokio::fs::read_to_string(&history_path).await {
|
||||
Ok(data) => serde_json::from_str(&data).unwrap_or_default(),
|
||||
Err(_) => Vec::new(),
|
||||
};
|
||||
history.push(report.clone());
|
||||
// Keep only the last 200 entries
|
||||
if history.len() > 200 {
|
||||
let start = history.len() - 200;
|
||||
history = history.split_off(start);
|
||||
}
|
||||
let history_json = serde_json::to_string_pretty(&history)
|
||||
.context("Failed to serialize history")?;
|
||||
tokio::fs::write(&history_path, &history_json).await
|
||||
.context("Failed to write fleet history")?;
|
||||
|
||||
debug!(node_id = %node_id, "Ingested fleet telemetry report");
|
||||
|
||||
Ok(serde_json::json!({
|
||||
"status": "ok",
|
||||
"node_id": node_id,
|
||||
}))
|
||||
}
|
||||
|
||||
/// Get all fleet nodes' latest reports.
|
||||
/// Reads all {node_id}.json files from telemetry-fleet/ (excluding *-history.json).
|
||||
pub(super) async fn handle_telemetry_fleet_status(&self) -> Result<serde_json::Value> {
|
||||
let fleet_dir = self.config.data_dir.join("telemetry-fleet");
|
||||
if !fleet_dir.exists() {
|
||||
return Ok(serde_json::json!({ "nodes": [] }));
|
||||
}
|
||||
|
||||
let mut nodes: Vec<serde_json::Value> = Vec::new();
|
||||
let mut entries = tokio::fs::read_dir(&fleet_dir).await
|
||||
.context("Failed to read telemetry-fleet directory")?;
|
||||
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let file_name = entry.file_name();
|
||||
let name = file_name.to_string_lossy();
|
||||
// Skip history files and non-JSON files
|
||||
if name.ends_with("-history.json") || !name.ends_with(".json") {
|
||||
continue;
|
||||
}
|
||||
|
||||
match tokio::fs::read_to_string(entry.path()).await {
|
||||
Ok(data) => {
|
||||
match serde_json::from_str::<serde_json::Value>(&data) {
|
||||
Ok(mut report) => {
|
||||
// Compute online/offline status from reported_at
|
||||
let is_online = report.get("reported_at")
|
||||
.and_then(|v| v.as_str())
|
||||
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
|
||||
.map(|dt| {
|
||||
let age = chrono::Utc::now().signed_duration_since(dt);
|
||||
age.num_minutes() < 30
|
||||
})
|
||||
.unwrap_or(false);
|
||||
|
||||
// Compute human-readable last_seen
|
||||
let last_seen = report.get("reported_at")
|
||||
.and_then(|v| v.as_str())
|
||||
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
|
||||
.map(|dt| {
|
||||
let age = chrono::Utc::now().signed_duration_since(dt);
|
||||
let mins = age.num_minutes();
|
||||
if mins < 1 {
|
||||
"just now".to_string()
|
||||
} else if mins < 60 {
|
||||
format!("{}m ago", mins)
|
||||
} else if mins < 1440 {
|
||||
format!("{}h ago", mins / 60)
|
||||
} else {
|
||||
format!("{}d ago", mins / 1440)
|
||||
}
|
||||
})
|
||||
.unwrap_or_else(|| "unknown".to_string());
|
||||
|
||||
if let Some(obj) = report.as_object_mut() {
|
||||
obj.insert("online".to_string(), serde_json::json!(is_online));
|
||||
obj.insert("last_seen".to_string(), serde_json::json!(last_seen));
|
||||
}
|
||||
nodes.push(report);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(file = %name, error = %e, "Skipping corrupt fleet report");
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(file = %name, error = %e, "Failed to read fleet report");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by node_id for stable ordering
|
||||
nodes.sort_by(|a, b| {
|
||||
let a_id = a.get("node_id").and_then(|v| v.as_str()).unwrap_or("");
|
||||
let b_id = b.get("node_id").and_then(|v| v.as_str()).unwrap_or("");
|
||||
a_id.cmp(b_id)
|
||||
});
|
||||
|
||||
info!(count = nodes.len(), "Fleet status query");
|
||||
|
||||
Ok(serde_json::json!({ "nodes": nodes }))
|
||||
}
|
||||
|
||||
/// Get history for a specific fleet node.
|
||||
/// Reads telemetry-fleet/{node_id}-history.json.
|
||||
pub(super) async fn handle_telemetry_fleet_node_history(&self, params: Option<serde_json::Value>) -> Result<serde_json::Value> {
|
||||
let p = params.context("Missing params")?;
|
||||
let node_id = p.get("node_id")
|
||||
.and_then(|v| v.as_str())
|
||||
.context("Missing required field: node_id")?;
|
||||
|
||||
// Sanitize node_id
|
||||
if node_id.is_empty() || node_id.len() > 64
|
||||
|| node_id.contains('/') || node_id.contains('\\') || node_id.contains("..")
|
||||
{
|
||||
anyhow::bail!("Invalid node_id");
|
||||
}
|
||||
|
||||
let history_path = self.config.data_dir
|
||||
.join("telemetry-fleet")
|
||||
.join(format!("{}-history.json", node_id));
|
||||
|
||||
let history: Vec<serde_json::Value> = match tokio::fs::read_to_string(&history_path).await {
|
||||
Ok(data) => serde_json::from_str(&data).unwrap_or_default(),
|
||||
Err(_) => Vec::new(),
|
||||
};
|
||||
|
||||
Ok(serde_json::json!({
|
||||
"node_id": node_id,
|
||||
"entries": history,
|
||||
"count": history.len(),
|
||||
}))
|
||||
}
|
||||
|
||||
/// Get aggregated fleet alerts across all nodes.
|
||||
/// Reads all fleet reports, collects recent_alerts, sorts by timestamp descending.
|
||||
pub(super) async fn handle_telemetry_fleet_alerts(&self) -> Result<serde_json::Value> {
|
||||
let fleet_dir = self.config.data_dir.join("telemetry-fleet");
|
||||
if !fleet_dir.exists() {
|
||||
return Ok(serde_json::json!({ "alerts": [] }));
|
||||
}
|
||||
|
||||
let mut all_alerts: Vec<serde_json::Value> = Vec::new();
|
||||
let mut entries = tokio::fs::read_dir(&fleet_dir).await
|
||||
.context("Failed to read telemetry-fleet directory")?;
|
||||
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let file_name = entry.file_name();
|
||||
let name = file_name.to_string_lossy();
|
||||
// Only read latest reports, skip history files
|
||||
if name.ends_with("-history.json") || !name.ends_with(".json") {
|
||||
continue;
|
||||
}
|
||||
|
||||
let data = match tokio::fs::read_to_string(entry.path()).await {
|
||||
Ok(d) => d,
|
||||
Err(_) => continue,
|
||||
};
|
||||
let report: serde_json::Value = match serde_json::from_str(&data) {
|
||||
Ok(r) => r,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
let node_id = report.get("node_id")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("unknown")
|
||||
.to_string();
|
||||
|
||||
if let Some(alerts) = report.get("recent_alerts").and_then(|v| v.as_array()) {
|
||||
for alert in alerts {
|
||||
let mut enriched = alert.clone();
|
||||
if let Some(obj) = enriched.as_object_mut() {
|
||||
obj.insert("node_id".to_string(), serde_json::json!(node_id));
|
||||
}
|
||||
all_alerts.push(enriched);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by timestamp descending (most recent first)
|
||||
all_alerts.sort_by(|a, b| {
|
||||
let a_ts = a.get("timestamp").and_then(|v| v.as_i64()).unwrap_or(0);
|
||||
let b_ts = b.get("timestamp").and_then(|v| v.as_i64()).unwrap_or(0);
|
||||
b_ts.cmp(&a_ts)
|
||||
});
|
||||
|
||||
Ok(serde_json::json!({
|
||||
"alerts": all_alerts,
|
||||
"count": all_alerts.len(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user