diff --git a/core/archipelago/src/fips/dial.rs b/core/archipelago/src/fips/dial.rs new file mode 100644 index 00000000..d99465ab --- /dev/null +++ b/core/archipelago/src/fips/dial.rs @@ -0,0 +1,289 @@ +//! Dial peers over the FIPS mesh. +//! +//! The FIPS daemon exposes a local DNS resolver on `127.0.0.1:5354` that +//! answers AAAA queries for `.fips` with the peer's ULA address on +//! the `fips0` TUN. Once resolved we speak plain HTTP to the peer on +//! [`PEER_PORT`] — the same port `127.0.0.1:5678` where the archipelago +//! backend serves the existing signed peer-to-peer endpoints +//! (`/rpc/v1`, `/archipelago/node-message`, `/content/{id}`, …). The +//! server-side binding to the `fips0` address is handled in `server.rs`. +//! +//! The module is deliberately dependency-free for DNS — one packet in, +//! one packet out, standard RFC 1035 wire format — to avoid pulling +//! hickory-resolver's transitive tree for a single AAAA query. +//! +//! On any failure (daemon down, peer not in the identity cache, TUN +//! unreachable) callers fall back to the Tor transport. +//! +//! # Examples +//! ```ignore +//! let base = crate::fips::dial::peer_base_url("npub1…").await?; +//! // base = "http://[fd9d:…]:5678" +//! let client = crate::fips::dial::client(); +//! let resp = client.get(format!("{}/content/abc", base)).send().await?; +//! ``` +#![allow(dead_code)] + +use anyhow::{Context, Result}; +use std::net::{IpAddr, Ipv6Addr}; +use std::time::Duration; +use tokio::net::UdpSocket; + +/// Port the archipelago backend listens on for FIPS peer-to-peer traffic. +/// Separate from the localhost-only internal port (5678) so the per-listener +/// path filter can restrict the exposed surface. +pub const PEER_PORT: u16 = 5679; + +/// DNS suffix appended to a peer's bech32 npub. +pub const FIPS_DNS_SUFFIX: &str = "fips"; + +/// FIPS daemon's local DNS resolver. +pub const FIPS_DNS_ADDR: &str = "127.0.0.1:5354"; + +/// Short DNS query timeout — FIPS DNS is a local process; a slow answer +/// almost certainly means the daemon is gone. +const DNS_TIMEOUT: Duration = Duration::from_secs(2); + +/// DNS AAAA query type. +const QTYPE_AAAA: u16 = 28; + +/// DNS IN class. +const QCLASS_IN: u16 = 1; + +/// Resolve a peer's bech32 npub to their `fips0` ULA address via the local +/// FIPS DNS resolver. +pub async fn resolve(npub: &str) -> Result { + let sock = UdpSocket::bind("127.0.0.1:0") + .await + .context("bind UDP socket for FIPS DNS")?; + sock.connect(FIPS_DNS_ADDR) + .await + .context("connect to FIPS DNS")?; + + let id: u16 = rand::random(); + let query = encode_query(id, npub)?; + tokio::time::timeout(DNS_TIMEOUT, sock.send(&query)) + .await + .context("FIPS DNS query timed out on send")? + .context("FIPS DNS send")?; + + let mut buf = [0u8; 512]; + let n = tokio::time::timeout(DNS_TIMEOUT, sock.recv(&mut buf)) + .await + .context("FIPS DNS query timed out on recv")? + .context("FIPS DNS recv")?; + + decode_response(id, &buf[..n], npub) +} + +/// Return a peer's base URL on the FIPS overlay, e.g. `http://[fd9d:…]:5678`. +pub async fn peer_base_url(npub: &str) -> Result { + let ip = resolve(npub).await?; + Ok(format!("http://[{}]:{}", ip, PEER_PORT)) +} + +/// Build an HTTP client tuned for FIPS peer-to-peer dialing. No proxy, +/// short timeout — fall back to Tor on failure. +pub fn client() -> reqwest::Client { + reqwest::Client::builder() + .timeout(Duration::from_secs(20)) + .connect_timeout(Duration::from_secs(5)) + .user_agent("archipelago-fips/1") + .build() + .expect("static reqwest client config") +} + +// ── DNS wire-format helpers ───────────────────────────────────────────── + +fn encode_query(id: u16, npub: &str) -> Result> { + let mut out = Vec::with_capacity(64 + npub.len()); + // Header + out.extend_from_slice(&id.to_be_bytes()); + out.extend_from_slice(&0x0100u16.to_be_bytes()); // RD=1, std query + out.extend_from_slice(&1u16.to_be_bytes()); // QDCOUNT + out.extend_from_slice(&0u16.to_be_bytes()); // ANCOUNT + out.extend_from_slice(&0u16.to_be_bytes()); // NSCOUNT + out.extend_from_slice(&0u16.to_be_bytes()); // ARCOUNT + + // QNAME — two labels: "" and "fips". + encode_label(&mut out, npub)?; + encode_label(&mut out, FIPS_DNS_SUFFIX)?; + out.push(0); // root + // QTYPE + QCLASS + out.extend_from_slice(&QTYPE_AAAA.to_be_bytes()); + out.extend_from_slice(&QCLASS_IN.to_be_bytes()); + Ok(out) +} + +fn encode_label(out: &mut Vec, label: &str) -> Result<()> { + if label.is_empty() || label.len() > 63 { + anyhow::bail!("invalid DNS label length: {}", label.len()); + } + out.push(label.len() as u8); + out.extend_from_slice(label.as_bytes()); + Ok(()) +} + +fn decode_response(expected_id: u16, buf: &[u8], npub: &str) -> Result { + if buf.len() < 12 { + anyhow::bail!("DNS response too short"); + } + let id = u16::from_be_bytes([buf[0], buf[1]]); + if id != expected_id { + anyhow::bail!("DNS response id mismatch"); + } + let rcode = buf[3] & 0x0F; + if rcode != 0 { + anyhow::bail!("DNS rcode {} resolving {}.fips", rcode, npub); + } + let qdcount = u16::from_be_bytes([buf[4], buf[5]]) as usize; + let ancount = u16::from_be_bytes([buf[6], buf[7]]) as usize; + if ancount == 0 { + anyhow::bail!("no AAAA record for {}.fips", npub); + } + + let mut pos = 12; + // Skip question section(s) + for _ in 0..qdcount { + pos = skip_name(buf, pos)?; + pos = pos + .checked_add(4) + .ok_or_else(|| anyhow::anyhow!("qsection overflow"))?; + if pos > buf.len() { + anyhow::bail!("qsection past end"); + } + } + + // Walk answers; return the first valid AAAA rdata. + for _ in 0..ancount { + pos = skip_name(buf, pos)?; + if pos + 10 > buf.len() { + anyhow::bail!("answer RR past end"); + } + let rtype = u16::from_be_bytes([buf[pos], buf[pos + 1]]); + let rclass = u16::from_be_bytes([buf[pos + 2], buf[pos + 3]]); + let rdlength = u16::from_be_bytes([buf[pos + 8], buf[pos + 9]]) as usize; + pos += 10; + if pos + rdlength > buf.len() { + anyhow::bail!("rdata past end"); + } + if rtype == QTYPE_AAAA && rclass == QCLASS_IN && rdlength == 16 { + let mut octets = [0u8; 16]; + octets.copy_from_slice(&buf[pos..pos + 16]); + return Ok(Ipv6Addr::from(octets)); + } + pos += rdlength; + } + anyhow::bail!("no AAAA answer for {}.fips", npub) +} + +/// Advance past a DNS name (handles compressed pointers). Returns the +/// position immediately after the name. +fn skip_name(buf: &[u8], mut pos: usize) -> Result { + loop { + if pos >= buf.len() { + anyhow::bail!("name past end"); + } + let len = buf[pos]; + if len == 0 { + return Ok(pos + 1); + } + if len & 0xC0 == 0xC0 { + // Compressed pointer — 2 bytes total, no further labels. + if pos + 2 > buf.len() { + anyhow::bail!("pointer past end"); + } + return Ok(pos + 2); + } + if len & 0xC0 != 0 { + anyhow::bail!("reserved label type"); + } + pos = pos + .checked_add(1 + len as usize) + .ok_or_else(|| anyhow::anyhow!("name overflow"))?; + } +} + +/// Treat `IpAddr::V6` as the raw address for ergonomic callers. +pub fn as_ip_addr(v6: Ipv6Addr) -> IpAddr { + IpAddr::V6(v6) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn encode_query_round_trip_header_is_correct() { + let q = encode_query(0x1234, "npub1abc").unwrap(); + assert_eq!(&q[0..2], &[0x12, 0x34]); + assert_eq!(&q[2..4], &[0x01, 0x00]); // flags RD=1 + assert_eq!(&q[4..6], &[0x00, 0x01]); // QDCOUNT=1 + // Tail: QTYPE=28, QCLASS=1 + assert_eq!(&q[q.len() - 4..], &[0x00, 0x1C, 0x00, 0x01]); + } + + #[test] + fn encode_query_includes_both_labels() { + let q = encode_query(0, "npub1xyz").unwrap(); + assert!(q.windows(9).any(|w| w == b"\x08npub1xyz")); + assert!(q.windows(5).any(|w| w == b"\x04fips")); + } + + #[test] + fn decode_response_returns_aaaa_rdata() { + // Minimal crafted response: header + qsection + one AAAA answer. + let id = 0xBEEFu16; + let mut r = Vec::new(); + r.extend_from_slice(&id.to_be_bytes()); + r.extend_from_slice(&0x8180u16.to_be_bytes()); // QR=1, RD=1, RA=1, rcode=0 + r.extend_from_slice(&1u16.to_be_bytes()); // QDCOUNT + r.extend_from_slice(&1u16.to_be_bytes()); // ANCOUNT + r.extend_from_slice(&0u16.to_be_bytes()); // NSCOUNT + r.extend_from_slice(&0u16.to_be_bytes()); // ARCOUNT + // Question: 1 label "a" + "fips" + r.extend_from_slice(b"\x01a\x04fips\x00"); + r.extend_from_slice(&QTYPE_AAAA.to_be_bytes()); + r.extend_from_slice(&QCLASS_IN.to_be_bytes()); + // Answer: compressed name pointing at question offset 12 + r.extend_from_slice(&[0xC0, 0x0C]); + r.extend_from_slice(&QTYPE_AAAA.to_be_bytes()); + r.extend_from_slice(&QCLASS_IN.to_be_bytes()); + r.extend_from_slice(&300u32.to_be_bytes()); // TTL + r.extend_from_slice(&16u16.to_be_bytes()); // RDLENGTH + let ip: Ipv6Addr = "fd9d:1192:e800:bad0:eed3:4b0e:b273:8e0e".parse().unwrap(); + r.extend_from_slice(&ip.octets()); + let got = decode_response(id, &r, "a").unwrap(); + assert_eq!(got, ip); + } + + #[test] + fn decode_rejects_id_mismatch() { + let r = vec![0u8; 12]; + let err = decode_response(0x1234, &r, "x").unwrap_err(); + assert!(err.to_string().contains("id mismatch")); + } + + #[test] + fn decode_rejects_rcode() { + let mut r = vec![0u8; 12]; + r[0] = 0xAA; + r[1] = 0xBB; + r[3] = 3; // NXDOMAIN + let err = decode_response(0xAABB, &r, "x").unwrap_err(); + assert!(err.to_string().contains("rcode 3")); + } + + #[test] + fn decode_rejects_empty_answer_section() { + let mut r = vec![0u8; 12]; + r[0] = 0xAA; + r[1] = 0xBB; + r[4] = 0; + r[5] = 0; // QDCOUNT=0 + r[6] = 0; + r[7] = 0; // ANCOUNT=0 + let err = decode_response(0xAABB, &r, "x").unwrap_err(); + assert!(err.to_string().contains("no AAAA")); + } +} diff --git a/core/archipelago/src/fips/iface.rs b/core/archipelago/src/fips/iface.rs new file mode 100644 index 00000000..f1e29fd4 --- /dev/null +++ b/core/archipelago/src/fips/iface.rs @@ -0,0 +1,111 @@ +//! Detect the `fips0` TUN interface's ULA (fd00::/8) IPv6 address. +//! +//! The `fips` daemon configures the TUN device with an address derived +//! from the node's identity key. We need that address to bind a +//! peer-facing listener that is only reachable from the FIPS overlay — +//! WAN IPv6 addresses never carry ULA prefixes, so binding specifically +//! to the fips0 address keeps the peer surface off the public internet. +//! +//! We read `/proc/net/if_inet6` rather than shelling out to `ip` so +//! this can run under the `archipelago` service user without extra +//! capabilities. +#![allow(dead_code)] + +use std::net::Ipv6Addr; + +/// Interface name the FIPS daemon creates (matches upstream default in +/// `/etc/fips/fips.yaml: tun.name`). +pub const FIPS_IFACE: &str = "fips0"; + +/// Return the first ULA (fd00::/8) address assigned to `fips0`, if any. +/// +/// - `None` if the interface is missing, has no address, or only has +/// link-local addresses. +/// - Link-local (`fe80::/10`) and non-ULA addresses are ignored — we +/// only want the mesh-routable ULA that `.fips` DNS resolves to. +pub fn fips0_ula() -> Option { + addresses_on(FIPS_IFACE) + .into_iter() + .find(|a| is_ula(a)) +} + +/// List every IPv6 address bound to a given interface from +/// `/proc/net/if_inet6`. Returns empty on any parse failure. +pub fn addresses_on(iface: &str) -> Vec { + let contents = match std::fs::read_to_string("/proc/net/if_inet6") { + Ok(s) => s, + Err(_) => return Vec::new(), + }; + contents + .lines() + .filter_map(|line| parse_line(line, iface)) + .collect() +} + +/// `fd00::/8` test — covers the full ULA range. +pub fn is_ula(addr: &Ipv6Addr) -> bool { + (addr.octets()[0] & 0xFE) == 0xFC +} + +fn parse_line(line: &str, iface: &str) -> Option { + // /proc/net/if_inet6 format (whitespace-separated): + // <32 hex chars addr> + // e.g. "fdd8...cd85 6f 80 00 80 fips0" + let mut parts = line.split_whitespace(); + let hex = parts.next()?; + let _idx = parts.next()?; + let _prefix = parts.next()?; + let _scope = parts.next()?; + let _flags = parts.next()?; + let name = parts.next()?; + if name != iface { + return None; + } + if hex.len() != 32 { + return None; + } + let mut octets = [0u8; 16]; + for i in 0..16 { + octets[i] = u8::from_str_radix(&hex[i * 2..i * 2 + 2], 16).ok()?; + } + Some(Ipv6Addr::from(octets)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_line_extracts_address() { + let line = "fdd83d5aabe08c0ee67f75fcf0d4cd85 6f 80 00 80 fips0"; + let addr = parse_line(line, "fips0").unwrap(); + assert_eq!( + addr, + "fdd8:3d5a:abe0:8c0e:e67f:75fc:f0d4:cd85" + .parse::() + .unwrap() + ); + } + + #[test] + fn parse_line_rejects_other_iface() { + let line = "fdd83d5aabe08c0ee67f75fcf0d4cd85 6f 80 00 80 eth0"; + assert!(parse_line(line, "fips0").is_none()); + } + + #[test] + fn parse_line_ignores_malformed() { + assert!(parse_line("garbage", "fips0").is_none()); + assert!(parse_line("shorthex 6f 80 00 80 fips0", "fips0").is_none()); + } + + #[test] + fn ula_classifier_matches_fd_range() { + assert!(is_ula(&"fd00::1".parse().unwrap())); + assert!(is_ula(&"fdff::".parse().unwrap())); + assert!(is_ula(&"fc00::1".parse().unwrap())); + assert!(!is_ula(&"fe80::1".parse().unwrap())); // link-local + assert!(!is_ula(&"2001:db8::1".parse().unwrap())); // global + assert!(!is_ula(&"::1".parse().unwrap())); // loopback + } +} diff --git a/core/archipelago/src/fips/mod.rs b/core/archipelago/src/fips/mod.rs index e853bf9e..d5c5a616 100644 --- a/core/archipelago/src/fips/mod.rs +++ b/core/archipelago/src/fips/mod.rs @@ -26,6 +26,8 @@ #![allow(dead_code)] pub mod config; +pub mod dial; +pub mod iface; pub mod service; pub mod update; diff --git a/core/archipelago/src/main.rs b/core/archipelago/src/main.rs index d7468d20..8ef4bff9 100644 --- a/core/archipelago/src/main.rs +++ b/core/archipelago/src/main.rs @@ -149,6 +149,19 @@ async fn main() -> Result<()> { .parse() .context("Invalid bind address")?; + // If the FIPS daemon has brought up `fips0` with a ULA address, bind a + // second listener there for peer-to-peer traffic. The peer listener + // applies a path whitelist (see server::is_peer_allowed_path) so FIPS + // peers can only reach signed peer endpoints, not internal surfaces. + // No address → no peer listener (fresh install pre-onboarding, fips + // service down, etc.); peers fall through to Tor until next restart. + let peer_addr: Option = fips::iface::fips0_ula().map(|ip| { + SocketAddr::new(std::net::IpAddr::V6(ip), fips::dial::PEER_PORT) + }); + if let Some(pa) = peer_addr { + info!("FIPS peer listener will bind {}", pa); + } + // Spawn background update scheduler let update_data_dir = config.data_dir.clone(); tokio::spawn(async move { @@ -199,7 +212,7 @@ async fn main() -> Result<()> { } }; - server.serve_with_shutdown(addr, shutdown).await?; + server.serve_with_shutdown(addr, peer_addr, shutdown).await?; // Clean shutdown: remove PID marker so next startup doesn't trigger recovery crash_recovery::remove_pid_marker(&config.data_dir).await; diff --git a/core/archipelago/src/server.rs b/core/archipelago/src/server.rs index 6f7f2a43..30f0d489 100644 --- a/core/archipelago/src/server.rs +++ b/core/archipelago/src/server.rs @@ -405,65 +405,154 @@ impl Server { } /// Serve with a graceful shutdown signal. - /// When the shutdown future completes, stop accepting new connections and drain in-flight requests. + /// + /// `main_addr` is the primary listener (historically `127.0.0.1:5678`). + /// `peer_addr` is an optional second listener bound to the `fips0` ULA + /// — when present, connections on that listener are subjected to the + /// peer path whitelist ([`is_peer_allowed_path`]) so FIPS peers can + /// reach only the signed peer-to-peer endpoints, not internal surfaces. + /// + /// When `shutdown` completes, both listeners stop accepting and drain + /// in-flight requests (bounded by `DRAIN_TIMEOUT`). pub async fn serve_with_shutdown( &self, - addr: SocketAddr, + main_addr: SocketAddr, + peer_addr: Option, shutdown: impl std::future::Future, ) -> Result<()> { - let listener = TcpListener::bind(addr).await?; let active_connections = Arc::new(tokio::sync::Semaphore::new(1024)); + let (tx, rx_main) = tokio::sync::watch::channel(false); - tokio::pin!(shutdown); + let main_task = tokio::spawn(accept_loop( + self.api_handler.clone(), + TcpListener::bind(main_addr).await?, + active_connections.clone(), + false, // main listener: no path filter + rx_main, + main_addr, + )); - loop { - tokio::select! { - result = listener.accept() => { - let (stream, peer_addr) = match result { - Ok(conn) => conn, - Err(e) => { - error!("Failed to accept connection: {}", e); - continue; - } - }; + let peer_task = if let Some(addr) = peer_addr { + let listener = match TcpListener::bind(addr).await { + Ok(l) => l, + Err(e) => { + warn!("FIPS peer listener bind to {} failed: {} — peers unreachable over FIPS until restart", addr, e); + let _ = tx.send(true); + main_task.await.ok(); + return Err(e.into()); + } + }; + info!("FIPS peer listener bound to {}", addr); + Some(tokio::spawn(accept_loop( + self.api_handler.clone(), + listener, + active_connections.clone(), + true, // peer listener: apply path filter + tx.subscribe(), + addr, + ))) + } else { + None + }; - let handler = self.api_handler.clone(); - let permit = active_connections.clone().acquire_owned().await; + shutdown.await; + info!("Shutdown signal received, draining connections..."); + let _ = tx.send(true); - tokio::spawn(async move { - let _permit = permit; - let service = service_fn(move |req| { - let handler = handler.clone(); - async move { - handler.handle_request(req).await - .map_err(|e| std::io::Error::other(format!("{}", e))) + // Wait up to 5s for in-flight requests. + let drain_start = std::time::Instant::now(); + let drain_timeout = std::time::Duration::from_secs(5); + while active_connections.available_permits() < 1024 { + if drain_start.elapsed() > drain_timeout { + warn!("Drain timeout reached, forcing shutdown"); + break; + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + + let _ = main_task.await; + if let Some(t) = peer_task { + let _ = t.await; + } + + info!("Shutdown complete"); + Ok(()) + } +} + +/// Whitelist of HTTP paths reachable via the peer-facing (FIPS) listener. +/// Every entry is an endpoint already protected by cryptographic auth +/// (ed25519 signature verification inside the handler, federation DID +/// headers checked by the content server, or JSON-RPC methods whose +/// handlers verify per-message signatures). +/// +/// Anything not on this list returns 404 on the peer listener. +pub fn is_peer_allowed_path(path: &str) -> bool { + // Exact matches + matches!( + path, + "/health" + | "/rpc/v1" + | "/archipelago/node-message" + | "/archipelago/mesh-typed" + | "/dwn" + | "/transport/inbox" + ) + // Prefix-matched content endpoints (peer file browse + fetch) + || path.starts_with("/content/") +} + +async fn accept_loop( + handler: Arc, + listener: TcpListener, + active_connections: Arc, + peer_only: bool, + mut shutdown_rx: tokio::sync::watch::Receiver, + local_addr: SocketAddr, +) { + loop { + tokio::select! { + result = listener.accept() => { + let (stream, peer_addr) = match result { + Ok(c) => c, + Err(e) => { + error!("{} accept error: {}", local_addr, e); + continue; + } + }; + let handler = handler.clone(); + let permit = active_connections.clone().acquire_owned().await; + tokio::spawn(async move { + let _permit = permit; + let service = service_fn(move |req: hyper::Request| { + let handler = handler.clone(); + async move { + if peer_only && !is_peer_allowed_path(req.uri().path()) { + let resp = hyper::Response::builder() + .status(hyper::StatusCode::NOT_FOUND) + .body(hyper::Body::empty()) + .expect("static response builds"); + return Ok::<_, std::io::Error>(resp); } - }); - - if let Err(e) = Http::new() - .http1_keep_alive(false) - .serve_connection(stream, service) - .with_upgrades() - .await - { - error!("Error serving connection from {}: {}", peer_addr, e); + handler + .handle_request(req) + .await + .map_err(|e| std::io::Error::other(format!("{}", e))) } }); - } - _ = &mut shutdown => { - info!("Shutdown signal received, draining connections..."); - // Wait up to 5 seconds for in-flight requests to complete - let drain_start = std::time::Instant::now(); - let drain_timeout = std::time::Duration::from_secs(5); - while active_connections.available_permits() < 1024 { - if drain_start.elapsed() > drain_timeout { - warn!("Drain timeout reached, forcing shutdown"); - break; - } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + if let Err(e) = Http::new() + .http1_keep_alive(false) + .serve_connection(stream, service) + .with_upgrades() + .await + { + error!("Error serving connection from {}: {}", peer_addr, e); } - info!("Shutdown complete"); - return Ok(()); + }); + } + _ = shutdown_rx.changed() => { + if *shutdown_rx.borrow() { + return; } } } diff --git a/core/archipelago/src/transport/fips.rs b/core/archipelago/src/transport/fips.rs index 6e4faba9..15a28620 100644 --- a/core/archipelago/src/transport/fips.rs +++ b/core/archipelago/src/transport/fips.rs @@ -2,32 +2,62 @@ //! //! Delegates the actual wire protocol to the `fips` system daemon //! (github.com/jmcorgan/fips), which archipelago supervises via the -//! `archipelago-fips.service` unit. This module is the in-process +//! `archipelago-fips.service` unit (or respects the upstream +//! `fips.service` on legacy nodes). This module is the in-process //! `NodeTransport` adapter: it checks daemon liveness, maps a peer's -//! FIPS npub to a `fd00::/8` IPv6 TUN address, and POSTs the -//! `TransportMessage` payload over it. +//! FIPS npub to a `fd00::/8` IPv6 address via the daemon's local DNS +//! resolver, and POSTs the `TransportMessage` payload over plain HTTP +//! to the peer's `/transport/inbox` endpoint. //! //! Sits at priority 3 between LAN and Tor — preferred over Tor for //! federation and peer traffic but yielding to direct LAN. -//! -//! Currently a stub: `is_available()` returns false until the FIPS -//! daemon integration in `crate::fips` lands and the key at -//! `/data/identity/fips_key` is materialised via onboarding. use super::{NodeTransport, TransportKind, TransportMessage}; -use anyhow::Result; +use anyhow::{Context, Result}; use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +/// How long a successful `is_available()` probe is cached — the hot path +/// may poll this per-send, and `systemctl is-active` takes ~50ms. A short +/// TTL keeps the result responsive to daemon flaps without pounding DBus. +const AVAILABILITY_CACHE_TTL: Duration = Duration::from_secs(10); pub struct FipsTransport { identity_dir: PathBuf, + available_cached: AtomicBool, + available_cached_at_ms: AtomicU64, } impl FipsTransport { pub fn new(identity_dir: &Path) -> Self { Self { identity_dir: identity_dir.to_path_buf(), + available_cached: AtomicBool::new(false), + available_cached_at_ms: AtomicU64::new(0), } } + + fn probe_daemon_active() -> bool { + // Cheap blocking probe: spawn `systemctl is-active` synchronously. + // Short-circuit if either the archipelago-managed unit or the + // upstream fips.service is active — legacy/dev nodes run only the + // upstream unit. + for unit in [ + crate::fips::SERVICE_UNIT, + crate::fips::UPSTREAM_SERVICE_UNIT, + ] { + let out = std::process::Command::new("systemctl") + .args(["is-active", unit]) + .output(); + if let Ok(o) = out { + if String::from_utf8_lossy(&o.stdout).trim() == "active" { + return true; + } + } + } + false + } } impl NodeTransport for FipsTransport { @@ -36,21 +66,43 @@ impl NodeTransport for FipsTransport { } fn is_available(&self) -> bool { - // Readiness gate: key must be on disk AND daemon wiring must exist. - // The daemon-liveness check is added alongside `crate::fips` — until - // then we deliberately report unavailable so the router falls through - // to Tor and no traffic is misrouted onto a missing TUN. - let _key_present = crate::identity::fips_key_exists(&self.identity_dir); - false + let now_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0); + let cached_at = self.available_cached_at_ms.load(Ordering::Relaxed); + if now_ms.saturating_sub(cached_at) < AVAILABILITY_CACHE_TTL.as_millis() as u64 { + return self.available_cached.load(Ordering::Relaxed); + } + let val = Self::probe_daemon_active(); + self.available_cached.store(val, Ordering::Relaxed); + self.available_cached_at_ms.store(now_ms, Ordering::Relaxed); + val } fn send<'a>( &'a self, - _address: &'a str, - _message: &'a TransportMessage, + address: &'a str, + message: &'a TransportMessage, ) -> std::pin::Pin> + Send + 'a>> { Box::pin(async move { - anyhow::bail!("FIPS transport not yet wired; daemon integration pending") + let base = crate::fips::dial::peer_base_url(address) + .await + .with_context(|| format!("resolve {}.fips", address))?; + let url = format!("{}/transport/inbox", base); + let client = crate::fips::dial::client(); + let body = serde_json::to_vec(message).context("serialize TransportMessage")?; + let resp = client + .post(&url) + .header("Content-Type", "application/json") + .body(body) + .send() + .await + .with_context(|| format!("POST {}", url))?; + if !resp.status().is_success() { + anyhow::bail!("peer FIPS inbox returned {}", resp.status()); + } + Ok(()) }) } } @@ -66,10 +118,12 @@ mod tests { } #[test] - fn test_reports_unavailable_pre_wiring() { - let dir = tempfile::tempdir().unwrap(); - let t = FipsTransport::new(dir.path()); - // Stub: always unavailable until daemon integration lands. - assert!(!t.is_available()); + fn is_available_caches_negative_result() { + // No fips.service in the test env → probe returns false. + // Two rapid calls must both be false without relying on a live daemon. + let t = FipsTransport::new(std::path::Path::new("/tmp")); + let a = t.is_available(); + let b = t.is_available(); + assert_eq!(a, b); } }