From 274ed008fe7e78a0bb5761c43f8ee64619524576 Mon Sep 17 00:00:00 2001 From: Dorian Date: Sun, 19 Apr 2026 01:12:39 -0400 Subject: [PATCH] feat(fips): peer dialing + dedicated fips0 listener with path whitelist MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires the FIPS transport end-to-end so peer-to-peer calls can reach other nodes over the mesh without going through Tor: - fips::dial — raw RFC 1035 DNS client (zero new deps) that queries the FIPS daemon's local resolver at 127.0.0.1:5354 for `.fips` AAAA records. Exposes peer_base_url(npub) → "http://[fd9d:…]:5679" plus a reqwest client factory for call-site migrations. - fips::iface — parses /proc/net/if_inet6 to find the ULA address on `fips0`. Runs under the archipelago service user without extra caps. - FipsTransport::is_available() — live probe of archipelago-fips and upstream fips.service via `systemctl is-active`, cached 10s so the send hot path doesn't thrash DBus. - FipsTransport::send() — resolve npub, POST TransportMessage JSON to the peer's /transport/inbox. Today /transport/inbox isn't wired on the receive side, so call-site migrations use dial::peer_base_url directly against the already-signed endpoints (/rpc/v1, /archipelago/node-message, /content/*). The inbox handler lands as part of the Settings/transport work. - server::serve_with_shutdown — takes an optional peer_addr and spawns a second listener bound specifically to the fips0 ULA on port 5679. The peer listener applies is_peer_allowed_path() — a whitelist of endpoints that already do per-request signature auth — and returns 404 for everything else. Shutdown cascades to both listeners via a watch channel; 5s drain window preserved. - main.rs — if fips0 has a ULA at startup, pass the peer SocketAddr to serve_with_shutdown; otherwise run the main listener only. Security: the peer listener is bound to the fips0 ULA directly, not wildcard, so it's unreachable from WAN IPv6. The path whitelist limits exposure to endpoints whose handlers verify ed25519 signatures or federation DID headers server-side. Co-Authored-By: Claude Opus 4.7 (1M context) --- core/archipelago/src/fips/dial.rs | 289 +++++++++++++++++++++++++ core/archipelago/src/fips/iface.rs | 111 ++++++++++ core/archipelago/src/fips/mod.rs | 2 + core/archipelago/src/main.rs | 15 +- core/archipelago/src/server.rs | 181 ++++++++++++---- core/archipelago/src/transport/fips.rs | 98 +++++++-- 6 files changed, 627 insertions(+), 69 deletions(-) create mode 100644 core/archipelago/src/fips/dial.rs create mode 100644 core/archipelago/src/fips/iface.rs diff --git a/core/archipelago/src/fips/dial.rs b/core/archipelago/src/fips/dial.rs new file mode 100644 index 00000000..d99465ab --- /dev/null +++ b/core/archipelago/src/fips/dial.rs @@ -0,0 +1,289 @@ +//! Dial peers over the FIPS mesh. +//! +//! The FIPS daemon exposes a local DNS resolver on `127.0.0.1:5354` that +//! answers AAAA queries for `.fips` with the peer's ULA address on +//! the `fips0` TUN. Once resolved we speak plain HTTP to the peer on +//! [`PEER_PORT`] — the same port `127.0.0.1:5678` where the archipelago +//! backend serves the existing signed peer-to-peer endpoints +//! (`/rpc/v1`, `/archipelago/node-message`, `/content/{id}`, …). The +//! server-side binding to the `fips0` address is handled in `server.rs`. +//! +//! The module is deliberately dependency-free for DNS — one packet in, +//! one packet out, standard RFC 1035 wire format — to avoid pulling +//! hickory-resolver's transitive tree for a single AAAA query. +//! +//! On any failure (daemon down, peer not in the identity cache, TUN +//! unreachable) callers fall back to the Tor transport. +//! +//! # Examples +//! ```ignore +//! let base = crate::fips::dial::peer_base_url("npub1…").await?; +//! // base = "http://[fd9d:…]:5678" +//! let client = crate::fips::dial::client(); +//! let resp = client.get(format!("{}/content/abc", base)).send().await?; +//! ``` +#![allow(dead_code)] + +use anyhow::{Context, Result}; +use std::net::{IpAddr, Ipv6Addr}; +use std::time::Duration; +use tokio::net::UdpSocket; + +/// Port the archipelago backend listens on for FIPS peer-to-peer traffic. +/// Separate from the localhost-only internal port (5678) so the per-listener +/// path filter can restrict the exposed surface. +pub const PEER_PORT: u16 = 5679; + +/// DNS suffix appended to a peer's bech32 npub. +pub const FIPS_DNS_SUFFIX: &str = "fips"; + +/// FIPS daemon's local DNS resolver. +pub const FIPS_DNS_ADDR: &str = "127.0.0.1:5354"; + +/// Short DNS query timeout — FIPS DNS is a local process; a slow answer +/// almost certainly means the daemon is gone. +const DNS_TIMEOUT: Duration = Duration::from_secs(2); + +/// DNS AAAA query type. +const QTYPE_AAAA: u16 = 28; + +/// DNS IN class. +const QCLASS_IN: u16 = 1; + +/// Resolve a peer's bech32 npub to their `fips0` ULA address via the local +/// FIPS DNS resolver. +pub async fn resolve(npub: &str) -> Result { + let sock = UdpSocket::bind("127.0.0.1:0") + .await + .context("bind UDP socket for FIPS DNS")?; + sock.connect(FIPS_DNS_ADDR) + .await + .context("connect to FIPS DNS")?; + + let id: u16 = rand::random(); + let query = encode_query(id, npub)?; + tokio::time::timeout(DNS_TIMEOUT, sock.send(&query)) + .await + .context("FIPS DNS query timed out on send")? + .context("FIPS DNS send")?; + + let mut buf = [0u8; 512]; + let n = tokio::time::timeout(DNS_TIMEOUT, sock.recv(&mut buf)) + .await + .context("FIPS DNS query timed out on recv")? + .context("FIPS DNS recv")?; + + decode_response(id, &buf[..n], npub) +} + +/// Return a peer's base URL on the FIPS overlay, e.g. `http://[fd9d:…]:5678`. +pub async fn peer_base_url(npub: &str) -> Result { + let ip = resolve(npub).await?; + Ok(format!("http://[{}]:{}", ip, PEER_PORT)) +} + +/// Build an HTTP client tuned for FIPS peer-to-peer dialing. No proxy, +/// short timeout — fall back to Tor on failure. +pub fn client() -> reqwest::Client { + reqwest::Client::builder() + .timeout(Duration::from_secs(20)) + .connect_timeout(Duration::from_secs(5)) + .user_agent("archipelago-fips/1") + .build() + .expect("static reqwest client config") +} + +// ── DNS wire-format helpers ───────────────────────────────────────────── + +fn encode_query(id: u16, npub: &str) -> Result> { + let mut out = Vec::with_capacity(64 + npub.len()); + // Header + out.extend_from_slice(&id.to_be_bytes()); + out.extend_from_slice(&0x0100u16.to_be_bytes()); // RD=1, std query + out.extend_from_slice(&1u16.to_be_bytes()); // QDCOUNT + out.extend_from_slice(&0u16.to_be_bytes()); // ANCOUNT + out.extend_from_slice(&0u16.to_be_bytes()); // NSCOUNT + out.extend_from_slice(&0u16.to_be_bytes()); // ARCOUNT + + // QNAME — two labels: "" and "fips". + encode_label(&mut out, npub)?; + encode_label(&mut out, FIPS_DNS_SUFFIX)?; + out.push(0); // root + // QTYPE + QCLASS + out.extend_from_slice(&QTYPE_AAAA.to_be_bytes()); + out.extend_from_slice(&QCLASS_IN.to_be_bytes()); + Ok(out) +} + +fn encode_label(out: &mut Vec, label: &str) -> Result<()> { + if label.is_empty() || label.len() > 63 { + anyhow::bail!("invalid DNS label length: {}", label.len()); + } + out.push(label.len() as u8); + out.extend_from_slice(label.as_bytes()); + Ok(()) +} + +fn decode_response(expected_id: u16, buf: &[u8], npub: &str) -> Result { + if buf.len() < 12 { + anyhow::bail!("DNS response too short"); + } + let id = u16::from_be_bytes([buf[0], buf[1]]); + if id != expected_id { + anyhow::bail!("DNS response id mismatch"); + } + let rcode = buf[3] & 0x0F; + if rcode != 0 { + anyhow::bail!("DNS rcode {} resolving {}.fips", rcode, npub); + } + let qdcount = u16::from_be_bytes([buf[4], buf[5]]) as usize; + let ancount = u16::from_be_bytes([buf[6], buf[7]]) as usize; + if ancount == 0 { + anyhow::bail!("no AAAA record for {}.fips", npub); + } + + let mut pos = 12; + // Skip question section(s) + for _ in 0..qdcount { + pos = skip_name(buf, pos)?; + pos = pos + .checked_add(4) + .ok_or_else(|| anyhow::anyhow!("qsection overflow"))?; + if pos > buf.len() { + anyhow::bail!("qsection past end"); + } + } + + // Walk answers; return the first valid AAAA rdata. + for _ in 0..ancount { + pos = skip_name(buf, pos)?; + if pos + 10 > buf.len() { + anyhow::bail!("answer RR past end"); + } + let rtype = u16::from_be_bytes([buf[pos], buf[pos + 1]]); + let rclass = u16::from_be_bytes([buf[pos + 2], buf[pos + 3]]); + let rdlength = u16::from_be_bytes([buf[pos + 8], buf[pos + 9]]) as usize; + pos += 10; + if pos + rdlength > buf.len() { + anyhow::bail!("rdata past end"); + } + if rtype == QTYPE_AAAA && rclass == QCLASS_IN && rdlength == 16 { + let mut octets = [0u8; 16]; + octets.copy_from_slice(&buf[pos..pos + 16]); + return Ok(Ipv6Addr::from(octets)); + } + pos += rdlength; + } + anyhow::bail!("no AAAA answer for {}.fips", npub) +} + +/// Advance past a DNS name (handles compressed pointers). Returns the +/// position immediately after the name. +fn skip_name(buf: &[u8], mut pos: usize) -> Result { + loop { + if pos >= buf.len() { + anyhow::bail!("name past end"); + } + let len = buf[pos]; + if len == 0 { + return Ok(pos + 1); + } + if len & 0xC0 == 0xC0 { + // Compressed pointer — 2 bytes total, no further labels. + if pos + 2 > buf.len() { + anyhow::bail!("pointer past end"); + } + return Ok(pos + 2); + } + if len & 0xC0 != 0 { + anyhow::bail!("reserved label type"); + } + pos = pos + .checked_add(1 + len as usize) + .ok_or_else(|| anyhow::anyhow!("name overflow"))?; + } +} + +/// Treat `IpAddr::V6` as the raw address for ergonomic callers. +pub fn as_ip_addr(v6: Ipv6Addr) -> IpAddr { + IpAddr::V6(v6) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn encode_query_round_trip_header_is_correct() { + let q = encode_query(0x1234, "npub1abc").unwrap(); + assert_eq!(&q[0..2], &[0x12, 0x34]); + assert_eq!(&q[2..4], &[0x01, 0x00]); // flags RD=1 + assert_eq!(&q[4..6], &[0x00, 0x01]); // QDCOUNT=1 + // Tail: QTYPE=28, QCLASS=1 + assert_eq!(&q[q.len() - 4..], &[0x00, 0x1C, 0x00, 0x01]); + } + + #[test] + fn encode_query_includes_both_labels() { + let q = encode_query(0, "npub1xyz").unwrap(); + assert!(q.windows(9).any(|w| w == b"\x08npub1xyz")); + assert!(q.windows(5).any(|w| w == b"\x04fips")); + } + + #[test] + fn decode_response_returns_aaaa_rdata() { + // Minimal crafted response: header + qsection + one AAAA answer. + let id = 0xBEEFu16; + let mut r = Vec::new(); + r.extend_from_slice(&id.to_be_bytes()); + r.extend_from_slice(&0x8180u16.to_be_bytes()); // QR=1, RD=1, RA=1, rcode=0 + r.extend_from_slice(&1u16.to_be_bytes()); // QDCOUNT + r.extend_from_slice(&1u16.to_be_bytes()); // ANCOUNT + r.extend_from_slice(&0u16.to_be_bytes()); // NSCOUNT + r.extend_from_slice(&0u16.to_be_bytes()); // ARCOUNT + // Question: 1 label "a" + "fips" + r.extend_from_slice(b"\x01a\x04fips\x00"); + r.extend_from_slice(&QTYPE_AAAA.to_be_bytes()); + r.extend_from_slice(&QCLASS_IN.to_be_bytes()); + // Answer: compressed name pointing at question offset 12 + r.extend_from_slice(&[0xC0, 0x0C]); + r.extend_from_slice(&QTYPE_AAAA.to_be_bytes()); + r.extend_from_slice(&QCLASS_IN.to_be_bytes()); + r.extend_from_slice(&300u32.to_be_bytes()); // TTL + r.extend_from_slice(&16u16.to_be_bytes()); // RDLENGTH + let ip: Ipv6Addr = "fd9d:1192:e800:bad0:eed3:4b0e:b273:8e0e".parse().unwrap(); + r.extend_from_slice(&ip.octets()); + let got = decode_response(id, &r, "a").unwrap(); + assert_eq!(got, ip); + } + + #[test] + fn decode_rejects_id_mismatch() { + let r = vec![0u8; 12]; + let err = decode_response(0x1234, &r, "x").unwrap_err(); + assert!(err.to_string().contains("id mismatch")); + } + + #[test] + fn decode_rejects_rcode() { + let mut r = vec![0u8; 12]; + r[0] = 0xAA; + r[1] = 0xBB; + r[3] = 3; // NXDOMAIN + let err = decode_response(0xAABB, &r, "x").unwrap_err(); + assert!(err.to_string().contains("rcode 3")); + } + + #[test] + fn decode_rejects_empty_answer_section() { + let mut r = vec![0u8; 12]; + r[0] = 0xAA; + r[1] = 0xBB; + r[4] = 0; + r[5] = 0; // QDCOUNT=0 + r[6] = 0; + r[7] = 0; // ANCOUNT=0 + let err = decode_response(0xAABB, &r, "x").unwrap_err(); + assert!(err.to_string().contains("no AAAA")); + } +} diff --git a/core/archipelago/src/fips/iface.rs b/core/archipelago/src/fips/iface.rs new file mode 100644 index 00000000..f1e29fd4 --- /dev/null +++ b/core/archipelago/src/fips/iface.rs @@ -0,0 +1,111 @@ +//! Detect the `fips0` TUN interface's ULA (fd00::/8) IPv6 address. +//! +//! The `fips` daemon configures the TUN device with an address derived +//! from the node's identity key. We need that address to bind a +//! peer-facing listener that is only reachable from the FIPS overlay — +//! WAN IPv6 addresses never carry ULA prefixes, so binding specifically +//! to the fips0 address keeps the peer surface off the public internet. +//! +//! We read `/proc/net/if_inet6` rather than shelling out to `ip` so +//! this can run under the `archipelago` service user without extra +//! capabilities. +#![allow(dead_code)] + +use std::net::Ipv6Addr; + +/// Interface name the FIPS daemon creates (matches upstream default in +/// `/etc/fips/fips.yaml: tun.name`). +pub const FIPS_IFACE: &str = "fips0"; + +/// Return the first ULA (fd00::/8) address assigned to `fips0`, if any. +/// +/// - `None` if the interface is missing, has no address, or only has +/// link-local addresses. +/// - Link-local (`fe80::/10`) and non-ULA addresses are ignored — we +/// only want the mesh-routable ULA that `.fips` DNS resolves to. +pub fn fips0_ula() -> Option { + addresses_on(FIPS_IFACE) + .into_iter() + .find(|a| is_ula(a)) +} + +/// List every IPv6 address bound to a given interface from +/// `/proc/net/if_inet6`. Returns empty on any parse failure. +pub fn addresses_on(iface: &str) -> Vec { + let contents = match std::fs::read_to_string("/proc/net/if_inet6") { + Ok(s) => s, + Err(_) => return Vec::new(), + }; + contents + .lines() + .filter_map(|line| parse_line(line, iface)) + .collect() +} + +/// `fd00::/8` test — covers the full ULA range. +pub fn is_ula(addr: &Ipv6Addr) -> bool { + (addr.octets()[0] & 0xFE) == 0xFC +} + +fn parse_line(line: &str, iface: &str) -> Option { + // /proc/net/if_inet6 format (whitespace-separated): + // <32 hex chars addr> + // e.g. "fdd8...cd85 6f 80 00 80 fips0" + let mut parts = line.split_whitespace(); + let hex = parts.next()?; + let _idx = parts.next()?; + let _prefix = parts.next()?; + let _scope = parts.next()?; + let _flags = parts.next()?; + let name = parts.next()?; + if name != iface { + return None; + } + if hex.len() != 32 { + return None; + } + let mut octets = [0u8; 16]; + for i in 0..16 { + octets[i] = u8::from_str_radix(&hex[i * 2..i * 2 + 2], 16).ok()?; + } + Some(Ipv6Addr::from(octets)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_line_extracts_address() { + let line = "fdd83d5aabe08c0ee67f75fcf0d4cd85 6f 80 00 80 fips0"; + let addr = parse_line(line, "fips0").unwrap(); + assert_eq!( + addr, + "fdd8:3d5a:abe0:8c0e:e67f:75fc:f0d4:cd85" + .parse::() + .unwrap() + ); + } + + #[test] + fn parse_line_rejects_other_iface() { + let line = "fdd83d5aabe08c0ee67f75fcf0d4cd85 6f 80 00 80 eth0"; + assert!(parse_line(line, "fips0").is_none()); + } + + #[test] + fn parse_line_ignores_malformed() { + assert!(parse_line("garbage", "fips0").is_none()); + assert!(parse_line("shorthex 6f 80 00 80 fips0", "fips0").is_none()); + } + + #[test] + fn ula_classifier_matches_fd_range() { + assert!(is_ula(&"fd00::1".parse().unwrap())); + assert!(is_ula(&"fdff::".parse().unwrap())); + assert!(is_ula(&"fc00::1".parse().unwrap())); + assert!(!is_ula(&"fe80::1".parse().unwrap())); // link-local + assert!(!is_ula(&"2001:db8::1".parse().unwrap())); // global + assert!(!is_ula(&"::1".parse().unwrap())); // loopback + } +} diff --git a/core/archipelago/src/fips/mod.rs b/core/archipelago/src/fips/mod.rs index e853bf9e..d5c5a616 100644 --- a/core/archipelago/src/fips/mod.rs +++ b/core/archipelago/src/fips/mod.rs @@ -26,6 +26,8 @@ #![allow(dead_code)] pub mod config; +pub mod dial; +pub mod iface; pub mod service; pub mod update; diff --git a/core/archipelago/src/main.rs b/core/archipelago/src/main.rs index d7468d20..8ef4bff9 100644 --- a/core/archipelago/src/main.rs +++ b/core/archipelago/src/main.rs @@ -149,6 +149,19 @@ async fn main() -> Result<()> { .parse() .context("Invalid bind address")?; + // If the FIPS daemon has brought up `fips0` with a ULA address, bind a + // second listener there for peer-to-peer traffic. The peer listener + // applies a path whitelist (see server::is_peer_allowed_path) so FIPS + // peers can only reach signed peer endpoints, not internal surfaces. + // No address → no peer listener (fresh install pre-onboarding, fips + // service down, etc.); peers fall through to Tor until next restart. + let peer_addr: Option = fips::iface::fips0_ula().map(|ip| { + SocketAddr::new(std::net::IpAddr::V6(ip), fips::dial::PEER_PORT) + }); + if let Some(pa) = peer_addr { + info!("FIPS peer listener will bind {}", pa); + } + // Spawn background update scheduler let update_data_dir = config.data_dir.clone(); tokio::spawn(async move { @@ -199,7 +212,7 @@ async fn main() -> Result<()> { } }; - server.serve_with_shutdown(addr, shutdown).await?; + server.serve_with_shutdown(addr, peer_addr, shutdown).await?; // Clean shutdown: remove PID marker so next startup doesn't trigger recovery crash_recovery::remove_pid_marker(&config.data_dir).await; diff --git a/core/archipelago/src/server.rs b/core/archipelago/src/server.rs index 6f7f2a43..30f0d489 100644 --- a/core/archipelago/src/server.rs +++ b/core/archipelago/src/server.rs @@ -405,65 +405,154 @@ impl Server { } /// Serve with a graceful shutdown signal. - /// When the shutdown future completes, stop accepting new connections and drain in-flight requests. + /// + /// `main_addr` is the primary listener (historically `127.0.0.1:5678`). + /// `peer_addr` is an optional second listener bound to the `fips0` ULA + /// — when present, connections on that listener are subjected to the + /// peer path whitelist ([`is_peer_allowed_path`]) so FIPS peers can + /// reach only the signed peer-to-peer endpoints, not internal surfaces. + /// + /// When `shutdown` completes, both listeners stop accepting and drain + /// in-flight requests (bounded by `DRAIN_TIMEOUT`). pub async fn serve_with_shutdown( &self, - addr: SocketAddr, + main_addr: SocketAddr, + peer_addr: Option, shutdown: impl std::future::Future, ) -> Result<()> { - let listener = TcpListener::bind(addr).await?; let active_connections = Arc::new(tokio::sync::Semaphore::new(1024)); + let (tx, rx_main) = tokio::sync::watch::channel(false); - tokio::pin!(shutdown); + let main_task = tokio::spawn(accept_loop( + self.api_handler.clone(), + TcpListener::bind(main_addr).await?, + active_connections.clone(), + false, // main listener: no path filter + rx_main, + main_addr, + )); - loop { - tokio::select! { - result = listener.accept() => { - let (stream, peer_addr) = match result { - Ok(conn) => conn, - Err(e) => { - error!("Failed to accept connection: {}", e); - continue; - } - }; + let peer_task = if let Some(addr) = peer_addr { + let listener = match TcpListener::bind(addr).await { + Ok(l) => l, + Err(e) => { + warn!("FIPS peer listener bind to {} failed: {} — peers unreachable over FIPS until restart", addr, e); + let _ = tx.send(true); + main_task.await.ok(); + return Err(e.into()); + } + }; + info!("FIPS peer listener bound to {}", addr); + Some(tokio::spawn(accept_loop( + self.api_handler.clone(), + listener, + active_connections.clone(), + true, // peer listener: apply path filter + tx.subscribe(), + addr, + ))) + } else { + None + }; - let handler = self.api_handler.clone(); - let permit = active_connections.clone().acquire_owned().await; + shutdown.await; + info!("Shutdown signal received, draining connections..."); + let _ = tx.send(true); - tokio::spawn(async move { - let _permit = permit; - let service = service_fn(move |req| { - let handler = handler.clone(); - async move { - handler.handle_request(req).await - .map_err(|e| std::io::Error::other(format!("{}", e))) + // Wait up to 5s for in-flight requests. + let drain_start = std::time::Instant::now(); + let drain_timeout = std::time::Duration::from_secs(5); + while active_connections.available_permits() < 1024 { + if drain_start.elapsed() > drain_timeout { + warn!("Drain timeout reached, forcing shutdown"); + break; + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + + let _ = main_task.await; + if let Some(t) = peer_task { + let _ = t.await; + } + + info!("Shutdown complete"); + Ok(()) + } +} + +/// Whitelist of HTTP paths reachable via the peer-facing (FIPS) listener. +/// Every entry is an endpoint already protected by cryptographic auth +/// (ed25519 signature verification inside the handler, federation DID +/// headers checked by the content server, or JSON-RPC methods whose +/// handlers verify per-message signatures). +/// +/// Anything not on this list returns 404 on the peer listener. +pub fn is_peer_allowed_path(path: &str) -> bool { + // Exact matches + matches!( + path, + "/health" + | "/rpc/v1" + | "/archipelago/node-message" + | "/archipelago/mesh-typed" + | "/dwn" + | "/transport/inbox" + ) + // Prefix-matched content endpoints (peer file browse + fetch) + || path.starts_with("/content/") +} + +async fn accept_loop( + handler: Arc, + listener: TcpListener, + active_connections: Arc, + peer_only: bool, + mut shutdown_rx: tokio::sync::watch::Receiver, + local_addr: SocketAddr, +) { + loop { + tokio::select! { + result = listener.accept() => { + let (stream, peer_addr) = match result { + Ok(c) => c, + Err(e) => { + error!("{} accept error: {}", local_addr, e); + continue; + } + }; + let handler = handler.clone(); + let permit = active_connections.clone().acquire_owned().await; + tokio::spawn(async move { + let _permit = permit; + let service = service_fn(move |req: hyper::Request| { + let handler = handler.clone(); + async move { + if peer_only && !is_peer_allowed_path(req.uri().path()) { + let resp = hyper::Response::builder() + .status(hyper::StatusCode::NOT_FOUND) + .body(hyper::Body::empty()) + .expect("static response builds"); + return Ok::<_, std::io::Error>(resp); } - }); - - if let Err(e) = Http::new() - .http1_keep_alive(false) - .serve_connection(stream, service) - .with_upgrades() - .await - { - error!("Error serving connection from {}: {}", peer_addr, e); + handler + .handle_request(req) + .await + .map_err(|e| std::io::Error::other(format!("{}", e))) } }); - } - _ = &mut shutdown => { - info!("Shutdown signal received, draining connections..."); - // Wait up to 5 seconds for in-flight requests to complete - let drain_start = std::time::Instant::now(); - let drain_timeout = std::time::Duration::from_secs(5); - while active_connections.available_permits() < 1024 { - if drain_start.elapsed() > drain_timeout { - warn!("Drain timeout reached, forcing shutdown"); - break; - } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + if let Err(e) = Http::new() + .http1_keep_alive(false) + .serve_connection(stream, service) + .with_upgrades() + .await + { + error!("Error serving connection from {}: {}", peer_addr, e); } - info!("Shutdown complete"); - return Ok(()); + }); + } + _ = shutdown_rx.changed() => { + if *shutdown_rx.borrow() { + return; } } } diff --git a/core/archipelago/src/transport/fips.rs b/core/archipelago/src/transport/fips.rs index 6e4faba9..15a28620 100644 --- a/core/archipelago/src/transport/fips.rs +++ b/core/archipelago/src/transport/fips.rs @@ -2,32 +2,62 @@ //! //! Delegates the actual wire protocol to the `fips` system daemon //! (github.com/jmcorgan/fips), which archipelago supervises via the -//! `archipelago-fips.service` unit. This module is the in-process +//! `archipelago-fips.service` unit (or respects the upstream +//! `fips.service` on legacy nodes). This module is the in-process //! `NodeTransport` adapter: it checks daemon liveness, maps a peer's -//! FIPS npub to a `fd00::/8` IPv6 TUN address, and POSTs the -//! `TransportMessage` payload over it. +//! FIPS npub to a `fd00::/8` IPv6 address via the daemon's local DNS +//! resolver, and POSTs the `TransportMessage` payload over plain HTTP +//! to the peer's `/transport/inbox` endpoint. //! //! Sits at priority 3 between LAN and Tor — preferred over Tor for //! federation and peer traffic but yielding to direct LAN. -//! -//! Currently a stub: `is_available()` returns false until the FIPS -//! daemon integration in `crate::fips` lands and the key at -//! `/data/identity/fips_key` is materialised via onboarding. use super::{NodeTransport, TransportKind, TransportMessage}; -use anyhow::Result; +use anyhow::{Context, Result}; use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +/// How long a successful `is_available()` probe is cached — the hot path +/// may poll this per-send, and `systemctl is-active` takes ~50ms. A short +/// TTL keeps the result responsive to daemon flaps without pounding DBus. +const AVAILABILITY_CACHE_TTL: Duration = Duration::from_secs(10); pub struct FipsTransport { identity_dir: PathBuf, + available_cached: AtomicBool, + available_cached_at_ms: AtomicU64, } impl FipsTransport { pub fn new(identity_dir: &Path) -> Self { Self { identity_dir: identity_dir.to_path_buf(), + available_cached: AtomicBool::new(false), + available_cached_at_ms: AtomicU64::new(0), } } + + fn probe_daemon_active() -> bool { + // Cheap blocking probe: spawn `systemctl is-active` synchronously. + // Short-circuit if either the archipelago-managed unit or the + // upstream fips.service is active — legacy/dev nodes run only the + // upstream unit. + for unit in [ + crate::fips::SERVICE_UNIT, + crate::fips::UPSTREAM_SERVICE_UNIT, + ] { + let out = std::process::Command::new("systemctl") + .args(["is-active", unit]) + .output(); + if let Ok(o) = out { + if String::from_utf8_lossy(&o.stdout).trim() == "active" { + return true; + } + } + } + false + } } impl NodeTransport for FipsTransport { @@ -36,21 +66,43 @@ impl NodeTransport for FipsTransport { } fn is_available(&self) -> bool { - // Readiness gate: key must be on disk AND daemon wiring must exist. - // The daemon-liveness check is added alongside `crate::fips` — until - // then we deliberately report unavailable so the router falls through - // to Tor and no traffic is misrouted onto a missing TUN. - let _key_present = crate::identity::fips_key_exists(&self.identity_dir); - false + let now_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0); + let cached_at = self.available_cached_at_ms.load(Ordering::Relaxed); + if now_ms.saturating_sub(cached_at) < AVAILABILITY_CACHE_TTL.as_millis() as u64 { + return self.available_cached.load(Ordering::Relaxed); + } + let val = Self::probe_daemon_active(); + self.available_cached.store(val, Ordering::Relaxed); + self.available_cached_at_ms.store(now_ms, Ordering::Relaxed); + val } fn send<'a>( &'a self, - _address: &'a str, - _message: &'a TransportMessage, + address: &'a str, + message: &'a TransportMessage, ) -> std::pin::Pin> + Send + 'a>> { Box::pin(async move { - anyhow::bail!("FIPS transport not yet wired; daemon integration pending") + let base = crate::fips::dial::peer_base_url(address) + .await + .with_context(|| format!("resolve {}.fips", address))?; + let url = format!("{}/transport/inbox", base); + let client = crate::fips::dial::client(); + let body = serde_json::to_vec(message).context("serialize TransportMessage")?; + let resp = client + .post(&url) + .header("Content-Type", "application/json") + .body(body) + .send() + .await + .with_context(|| format!("POST {}", url))?; + if !resp.status().is_success() { + anyhow::bail!("peer FIPS inbox returned {}", resp.status()); + } + Ok(()) }) } } @@ -66,10 +118,12 @@ mod tests { } #[test] - fn test_reports_unavailable_pre_wiring() { - let dir = tempfile::tempdir().unwrap(); - let t = FipsTransport::new(dir.path()); - // Stub: always unavailable until daemon integration lands. - assert!(!t.is_available()); + fn is_available_caches_negative_result() { + // No fips.service in the test env → probe returns false. + // Two rapid calls must both be false without relying on a live daemon. + let t = FipsTransport::new(std::path::Path::new("/tmp")); + let a = t.is_available(); + let b = t.is_available(); + assert_eq!(a, b); } }