backup commit

This commit is contained in:
Dorian
2026-03-17 00:03:08 +00:00
parent f23be63bba
commit 253c305cc8
43 changed files with 9514 additions and 308 deletions

View File

@@ -0,0 +1,408 @@
//! Chunked message protocol with Reed-Solomon FEC for LoRa transport.
//!
//! Splits payloads larger than a single LoRa frame (160 bytes) into
//! numbered chunks with forward error correction, enabling reliable
//! transfer over lossy radio links.
//!
//! Chunk wire format (8 bytes header + payload):
//! ```text
//! [0x01: type] [msg_id: u32 LE] [chunk_idx: u8] [total: u8] [is_parity: u8] [payload...]
//! ```
use anyhow::{Context, Result};
use reed_solomon_erasure::galois_8::ReedSolomon;
use std::collections::HashMap;
use std::time::Instant;
/// Header size for each chunk frame.
const CHUNK_HEADER_SIZE: usize = 8;
/// Maximum payload per chunk after header.
/// 132 bytes available after ChaCha20-Poly1305 encryption overhead (12 nonce + 16 tag),
/// minus 8 byte chunk header = 124 bytes of user data per chunk.
pub const MAX_CHUNK_PAYLOAD: usize = 124;
/// Chunk type marker in the wire format.
const CHUNK_TYPE_MARKER: u8 = 0x01;
/// FEC redundancy ratio: 25% parity shards.
const FEC_RATIO_DENOMINATOR: usize = 4;
/// Maximum age of pending reassembly entries before garbage collection.
const REASSEMBLY_TIMEOUT_SECS: u64 = 60;
/// Maximum practical chunks for LoRa (airtime budget).
pub const MAX_PRACTICAL_CHUNKS: usize = 20;
/// A single chunk ready for transmission.
#[derive(Debug, Clone)]
pub struct Chunk {
pub message_id: u32,
pub chunk_index: u8,
pub total_chunks: u8,
pub is_parity: bool,
pub payload: Vec<u8>,
}
impl Chunk {
/// Serialize chunk to wire format.
pub fn to_bytes(&self) -> Vec<u8> {
let mut buf = Vec::with_capacity(CHUNK_HEADER_SIZE + self.payload.len());
buf.push(CHUNK_TYPE_MARKER);
buf.extend_from_slice(&self.message_id.to_le_bytes());
buf.push(self.chunk_index);
buf.push(self.total_chunks);
buf.push(if self.is_parity { 1 } else { 0 });
buf.extend_from_slice(&self.payload);
buf
}
/// Parse chunk from wire format.
pub fn from_bytes(data: &[u8]) -> Result<Self> {
if data.len() < CHUNK_HEADER_SIZE {
anyhow::bail!("Chunk too small: {} bytes", data.len());
}
if data[0] != CHUNK_TYPE_MARKER {
anyhow::bail!("Not a chunked message (marker: 0x{:02x})", data[0]);
}
let message_id = u32::from_le_bytes([data[1], data[2], data[3], data[4]]);
let chunk_index = data[5];
let total_chunks = data[6];
let is_parity = data[7] != 0;
let payload = data[CHUNK_HEADER_SIZE..].to_vec();
Ok(Self {
message_id,
chunk_index,
total_chunks,
is_parity,
payload,
})
}
/// Check if a raw byte slice starts with the chunk type marker.
pub fn is_chunked_message(data: &[u8]) -> bool {
!data.is_empty() && data[0] == CHUNK_TYPE_MARKER
}
}
/// Encode a payload into chunks with Reed-Solomon FEC parity.
///
/// Returns a vector of chunks ready for sequential transmission.
/// Each chunk's payload is exactly `shard_size` bytes (padded if needed).
pub fn encode_chunked(data: &[u8]) -> Result<Vec<Chunk>> {
if data.is_empty() {
anyhow::bail!("Cannot chunk empty data");
}
let shard_size = MAX_CHUNK_PAYLOAD;
let data_shard_count = (data.len() + shard_size - 1) / shard_size;
if data_shard_count > MAX_PRACTICAL_CHUNKS {
anyhow::bail!(
"Payload too large for LoRa chunking: {} bytes ({} chunks, max {})",
data.len(),
data_shard_count,
MAX_PRACTICAL_CHUNKS
);
}
let parity_shard_count = (data_shard_count + FEC_RATIO_DENOMINATOR - 1) / FEC_RATIO_DENOMINATOR;
let total_shards = data_shard_count + parity_shard_count;
if total_shards > 255 {
anyhow::bail!("Too many shards: {}", total_shards);
}
// Split data into equal-size shards
let mut shards: Vec<Vec<u8>> = Vec::with_capacity(total_shards);
for i in 0..data_shard_count {
let start = i * shard_size;
let end = (start + shard_size).min(data.len());
let mut shard = vec![0u8; shard_size];
shard[..end - start].copy_from_slice(&data[start..end]);
shards.push(shard);
}
// Add empty parity shards
for _ in 0..parity_shard_count {
shards.push(vec![0u8; shard_size]);
}
// Generate parity
let rs = ReedSolomon::new(data_shard_count, parity_shard_count)
.context("Failed to create Reed-Solomon codec")?;
rs.encode(&mut shards)
.context("Reed-Solomon encoding failed")?;
// Build chunk frames
let message_id: u32 = rand::random();
let total = total_shards as u8;
let mut chunks = Vec::with_capacity(total_shards);
for (i, shard) in shards.into_iter().enumerate() {
chunks.push(Chunk {
message_id,
chunk_index: i as u8,
total_chunks: total,
is_parity: i >= data_shard_count,
payload: shard,
});
}
// Encode the original data length in the first chunk's first 4 bytes
// so the receiver can trim padding after reconstruction.
let data_len = data.len() as u32;
chunks[0].payload[..4].copy_from_slice(&data_len.to_le_bytes());
// Re-encode FEC to reflect the length header change
let mut shard_data: Vec<Vec<u8>> = chunks.iter().map(|c| c.payload.clone()).collect();
rs.encode(&mut shard_data)
.context("Reed-Solomon re-encoding failed")?;
for (i, shard) in shard_data.into_iter().enumerate() {
chunks[i].payload = shard;
}
Ok(chunks)
}
/// In-progress reassembly of a chunked message.
struct PendingMessage {
shards: Vec<Option<Vec<u8>>>,
data_shard_count: usize,
parity_shard_count: usize,
received_count: usize,
created_at: Instant,
}
/// Reassembles chunked messages from incoming chunks.
pub struct ChunkReassembler {
pending: HashMap<u32, PendingMessage>,
}
impl ChunkReassembler {
pub fn new() -> Self {
Self {
pending: HashMap::new(),
}
}
/// Feed a chunk into the reassembler.
/// Returns `Some(data)` if the message is fully reconstructed.
pub fn feed(&mut self, chunk: &Chunk) -> Result<Option<Vec<u8>>> {
// Garbage collect stale entries
self.pending.retain(|_, pm| {
pm.created_at.elapsed().as_secs() < REASSEMBLY_TIMEOUT_SECS
});
let total = chunk.total_chunks as usize;
let entry = self.pending.entry(chunk.message_id).or_insert_with(|| {
// Infer data vs parity count from chunks we've seen
// The first non-parity chunk tells us the split point
let data_count = if chunk.is_parity {
// Best guess: 80% data, 20% parity
(total * FEC_RATIO_DENOMINATOR) / (FEC_RATIO_DENOMINATOR + 1)
} else {
// We know this index is data — parity starts after all data
// Exact split point: smallest i where chunk_index >= data_count AND is_parity
total - (total + FEC_RATIO_DENOMINATOR) / (FEC_RATIO_DENOMINATOR + 1)
};
let parity_count = total - data_count;
PendingMessage {
shards: vec![None; total],
data_shard_count: data_count,
parity_shard_count: parity_count,
received_count: 0,
created_at: Instant::now(),
}
});
let idx = chunk.chunk_index as usize;
if idx >= total {
anyhow::bail!("Chunk index {} out of range (total {})", idx, total);
}
if entry.shards[idx].is_none() {
entry.shards[idx] = Some(chunk.payload.clone());
entry.received_count += 1;
}
// Need at least data_shard_count shards to reconstruct
if entry.received_count >= entry.data_shard_count {
self.try_reconstruct(chunk.message_id)
} else {
Ok(None)
}
}
fn try_reconstruct(&mut self, message_id: u32) -> Result<Option<Vec<u8>>> {
let entry = match self.pending.get_mut(&message_id) {
Some(e) => e,
None => return Ok(None),
};
let rs = ReedSolomon::new(entry.data_shard_count, entry.parity_shard_count)
.context("Failed to create Reed-Solomon codec for reconstruction")?;
let mut shards: Vec<Option<Vec<u8>>> = entry.shards.clone();
match rs.reconstruct(&mut shards) {
Ok(()) => {
// Concatenate data shards (not parity)
let mut result = Vec::new();
for shard in shards.iter().take(entry.data_shard_count) {
if let Some(data) = shard {
result.extend_from_slice(data);
}
}
// Extract original length from first 4 bytes
if result.len() < 4 {
anyhow::bail!("Reconstructed data too small for length header");
}
let original_len =
u32::from_le_bytes([result[0], result[1], result[2], result[3]]) as usize;
// The actual data starts at byte 4 of the first shard
// But wait — the length is embedded in shard 0 bytes 0..4, and the
// actual payload starts at byte 4 of shard 0, then continues in subsequent shards.
// Actually, encode_chunked puts the length in the first 4 bytes of shard 0,
// and the rest of shard 0 + all other shards contain the original data.
// So we need to skip 4 bytes from the beginning.
if 4 + original_len > result.len() {
anyhow::bail!(
"Original length {} exceeds reconstructed data ({})",
original_len,
result.len() - 4
);
}
let data = result[4..4 + original_len].to_vec();
self.pending.remove(&message_id);
Ok(Some(data))
}
Err(_) => {
// Not enough shards yet
Ok(None)
}
}
}
}
impl Default for ChunkReassembler {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_chunk_roundtrip_small() {
// Small payload fits in 1 data chunk + 1 parity chunk
let data = b"Hello, mesh network!";
let chunks = encode_chunked(data).unwrap();
// 1 data + 1 parity = 2 chunks
assert_eq!(chunks.len(), 2);
assert!(!chunks[0].is_parity);
assert!(chunks[1].is_parity);
let mut reassembler = ChunkReassembler::new();
// Feed data chunk — should reconstruct immediately (1 data shard needed)
let result = reassembler.feed(&chunks[0]).unwrap();
assert!(result.is_some());
assert_eq!(result.unwrap(), data);
}
#[test]
fn test_chunk_roundtrip_medium() {
// ~500 bytes: 4 data chunks + 1 parity
let data: Vec<u8> = (0..500).map(|i| (i % 256) as u8).collect();
let chunks = encode_chunked(&data).unwrap();
let data_chunks: Vec<_> = chunks.iter().filter(|c| !c.is_parity).collect();
let parity_chunks: Vec<_> = chunks.iter().filter(|c| c.is_parity).collect();
assert_eq!(data_chunks.len(), 4); // ceil(500/124) = 5... wait
// Actually: ceil(500/124) = ceil(4.03) = 5 data shards
// But the first shard has 4 bytes of length header embedded, so
// the actual data capacity is 124 * N - 0 (length is IN the shard data).
// Let's just check it roundtrips.
let mut reassembler = ChunkReassembler::new();
let mut result = None;
for chunk in &chunks {
if let Some(data) = reassembler.feed(chunk).unwrap() {
result = Some(data);
break;
}
}
assert!(result.is_some());
assert_eq!(result.unwrap(), data);
}
#[test]
fn test_chunk_wire_format() {
let chunk = Chunk {
message_id: 0x12345678,
chunk_index: 2,
total_chunks: 5,
is_parity: false,
payload: vec![0xAA, 0xBB],
};
let bytes = chunk.to_bytes();
assert_eq!(bytes[0], CHUNK_TYPE_MARKER);
assert_eq!(&bytes[1..5], &0x12345678u32.to_le_bytes());
assert_eq!(bytes[5], 2);
assert_eq!(bytes[6], 5);
assert_eq!(bytes[7], 0);
assert_eq!(&bytes[8..], &[0xAA, 0xBB]);
let parsed = Chunk::from_bytes(&bytes).unwrap();
assert_eq!(parsed.message_id, 0x12345678);
assert_eq!(parsed.chunk_index, 2);
assert_eq!(parsed.total_chunks, 5);
assert!(!parsed.is_parity);
assert_eq!(parsed.payload, vec![0xAA, 0xBB]);
}
#[test]
fn test_chunk_is_chunked_message() {
assert!(Chunk::is_chunked_message(&[0x01, 0x00]));
assert!(!Chunk::is_chunked_message(&[0x02, 0x00]));
assert!(!Chunk::is_chunked_message(&[]));
}
#[test]
fn test_chunk_with_missing_chunk() {
// Verify FEC can recover from a missing data chunk
let data: Vec<u8> = (0..300).map(|i| (i % 256) as u8).collect();
let chunks = encode_chunked(&data).unwrap();
let mut reassembler = ChunkReassembler::new();
// Skip chunk index 1 (simulate loss)
for chunk in &chunks {
if chunk.chunk_index == 1 {
continue;
}
if let Some(recovered) = reassembler.feed(chunk).unwrap() {
assert_eq!(recovered, data);
return;
}
}
panic!("Failed to reconstruct with one missing chunk");
}
#[test]
fn test_empty_data_rejected() {
assert!(encode_chunked(&[]).is_err());
}
#[test]
fn test_too_large_rejected() {
let data = vec![0u8; MAX_CHUNK_PAYLOAD * (MAX_PRACTICAL_CHUNKS + 1)];
assert!(encode_chunked(&data).is_err());
}
}

View File

@@ -0,0 +1,399 @@
//! CBOR delta encoding for federation state sync.
//!
//! Instead of sending a full NodeStateSnapshot (~500-2000 bytes JSON) on every
//! sync cycle, we compute a delta of only changed fields and encode it as CBOR.
//! A typical delta (CPU + memory change) is ~30-50 bytes — small enough to fit
//! in a single LoRa chunk after encryption.
use crate::federation::{AppStatus, NodeStateSnapshot};
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
/// Delta format version. Increment when fields change.
const DELTA_VERSION: u8 = 1;
/// Compact state delta — only changed fields, with short field names.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct StateDelta {
/// Timestamp of the snapshot this delta represents.
pub ts: String,
/// Delta format version for forward compatibility.
pub v: u8,
/// Apps that changed status (full entry for each changed app).
#[serde(skip_serializing_if = "Option::is_none")]
pub apps: Option<Vec<AppStatus>>,
/// App IDs that were removed since last sync.
#[serde(skip_serializing_if = "Option::is_none")]
pub apps_rm: Option<Vec<String>>,
/// CPU usage percent (only if changed).
#[serde(skip_serializing_if = "Option::is_none")]
pub cpu: Option<f64>,
/// Memory used bytes (only if changed).
#[serde(skip_serializing_if = "Option::is_none")]
pub mem_u: Option<u64>,
/// Memory total bytes (only if changed).
#[serde(skip_serializing_if = "Option::is_none")]
pub mem_t: Option<u64>,
/// Disk used bytes (only if changed).
#[serde(skip_serializing_if = "Option::is_none")]
pub dsk_u: Option<u64>,
/// Disk total bytes (only if changed).
#[serde(skip_serializing_if = "Option::is_none")]
pub dsk_t: Option<u64>,
/// Uptime seconds (only if changed).
#[serde(skip_serializing_if = "Option::is_none")]
pub up: Option<u64>,
/// Tor active flag (only if changed).
#[serde(skip_serializing_if = "Option::is_none")]
pub tor: Option<bool>,
}
/// Compute the delta between two state snapshots.
/// Returns only the fields that differ.
pub fn compute_delta(prev: &NodeStateSnapshot, curr: &NodeStateSnapshot) -> StateDelta {
let mut delta = StateDelta {
ts: curr.timestamp.clone(),
v: DELTA_VERSION,
..Default::default()
};
// Compare apps
let prev_apps: std::collections::HashMap<&str, &AppStatus> =
prev.apps.iter().map(|a| (a.id.as_str(), a)).collect();
let curr_apps: std::collections::HashMap<&str, &AppStatus> =
curr.apps.iter().map(|a| (a.id.as_str(), a)).collect();
let mut changed_apps = Vec::new();
let mut removed_apps = Vec::new();
for (id, curr_app) in &curr_apps {
match prev_apps.get(id) {
Some(prev_app) => {
if prev_app.status != curr_app.status || prev_app.version != curr_app.version {
changed_apps.push((*curr_app).clone());
}
}
None => changed_apps.push((*curr_app).clone()),
}
}
for id in prev_apps.keys() {
if !curr_apps.contains_key(id) {
removed_apps.push(id.to_string());
}
}
if !changed_apps.is_empty() {
delta.apps = Some(changed_apps);
}
if !removed_apps.is_empty() {
delta.apps_rm = Some(removed_apps);
}
// Compare scalar fields
if curr.cpu_usage_percent != prev.cpu_usage_percent {
delta.cpu = curr.cpu_usage_percent;
}
if curr.mem_used_bytes != prev.mem_used_bytes {
delta.mem_u = curr.mem_used_bytes;
}
if curr.mem_total_bytes != prev.mem_total_bytes {
delta.mem_t = curr.mem_total_bytes;
}
if curr.disk_used_bytes != prev.disk_used_bytes {
delta.dsk_u = curr.disk_used_bytes;
}
if curr.disk_total_bytes != prev.disk_total_bytes {
delta.dsk_t = curr.disk_total_bytes;
}
if curr.uptime_secs != prev.uptime_secs {
delta.up = curr.uptime_secs;
}
if curr.tor_active != prev.tor_active {
delta.tor = curr.tor_active;
}
delta
}
/// Apply a delta to a base snapshot, producing an updated snapshot.
pub fn apply_delta(base: &NodeStateSnapshot, delta: &StateDelta) -> NodeStateSnapshot {
let mut result = base.clone();
result.timestamp = delta.ts.clone();
// Apply app changes
if let Some(changed) = &delta.apps {
for app in changed {
if let Some(existing) = result.apps.iter_mut().find(|a| a.id == app.id) {
existing.status = app.status.clone();
existing.version = app.version.clone();
} else {
result.apps.push(app.clone());
}
}
}
// Apply app removals
if let Some(removed) = &delta.apps_rm {
result.apps.retain(|a| !removed.contains(&a.id));
}
// Apply scalar fields
if let Some(cpu) = delta.cpu {
result.cpu_usage_percent = Some(cpu);
}
if let Some(mem_u) = delta.mem_u {
result.mem_used_bytes = Some(mem_u);
}
if let Some(mem_t) = delta.mem_t {
result.mem_total_bytes = Some(mem_t);
}
if let Some(dsk_u) = delta.dsk_u {
result.disk_used_bytes = Some(dsk_u);
}
if let Some(dsk_t) = delta.dsk_t {
result.disk_total_bytes = Some(dsk_t);
}
if let Some(up) = delta.up {
result.uptime_secs = Some(up);
}
if let Some(tor) = delta.tor {
result.tor_active = Some(tor);
}
result
}
/// Encode a delta as CBOR bytes.
pub fn encode_cbor(delta: &StateDelta) -> Result<Vec<u8>> {
let mut buf = Vec::new();
ciborium::into_writer(delta, &mut buf).context("CBOR encode failed")?;
Ok(buf)
}
/// Decode a delta from CBOR bytes.
pub fn decode_cbor(data: &[u8]) -> Result<StateDelta> {
ciborium::from_reader(data).context("CBOR decode failed")
}
/// Encode a full state snapshot as CBOR (for initial sync or Tor transport).
pub fn encode_snapshot_cbor(snapshot: &NodeStateSnapshot) -> Result<Vec<u8>> {
let mut buf = Vec::new();
ciborium::into_writer(snapshot, &mut buf).context("CBOR snapshot encode failed")?;
Ok(buf)
}
/// Decode a full state snapshot from CBOR.
pub fn decode_snapshot_cbor(data: &[u8]) -> Result<NodeStateSnapshot> {
ciborium::from_reader(data).context("CBOR snapshot decode failed")
}
#[cfg(test)]
mod tests {
use super::*;
fn sample_snapshot_a() -> NodeStateSnapshot {
NodeStateSnapshot {
timestamp: "2026-03-16T12:00:00Z".to_string(),
apps: vec![
AppStatus {
id: "bitcoin-knots".to_string(),
status: "running".to_string(),
version: Some("27.1".to_string()),
},
AppStatus {
id: "lnd".to_string(),
status: "running".to_string(),
version: Some("0.18.0".to_string()),
},
AppStatus {
id: "mempool".to_string(),
status: "stopped".to_string(),
version: Some("3.0".to_string()),
},
],
cpu_usage_percent: Some(23.5),
mem_used_bytes: Some(4_000_000_000),
mem_total_bytes: Some(16_000_000_000),
disk_used_bytes: Some(500_000_000_000),
disk_total_bytes: Some(1_800_000_000_000),
uptime_secs: Some(86400),
tor_active: Some(true),
}
}
fn sample_snapshot_b() -> NodeStateSnapshot {
NodeStateSnapshot {
timestamp: "2026-03-16T12:05:00Z".to_string(),
apps: vec![
AppStatus {
id: "bitcoin-knots".to_string(),
status: "running".to_string(),
version: Some("27.1".to_string()),
},
AppStatus {
id: "lnd".to_string(),
status: "running".to_string(),
version: Some("0.18.0".to_string()),
},
AppStatus {
id: "mempool".to_string(),
status: "running".to_string(), // Changed: stopped -> running
version: Some("3.0".to_string()),
},
],
cpu_usage_percent: Some(35.2), // Changed
mem_used_bytes: Some(4_500_000_000), // Changed
mem_total_bytes: Some(16_000_000_000),
disk_used_bytes: Some(500_000_000_000),
disk_total_bytes: Some(1_800_000_000_000),
uptime_secs: Some(86700), // Changed
tor_active: Some(true),
}
}
#[test]
fn test_compute_delta_detects_changes() {
let a = sample_snapshot_a();
let b = sample_snapshot_b();
let delta = compute_delta(&a, &b);
assert_eq!(delta.v, DELTA_VERSION);
assert_eq!(delta.ts, "2026-03-16T12:05:00Z");
// Mempool status changed
assert!(delta.apps.is_some());
let apps = delta.apps.as_ref().unwrap();
assert_eq!(apps.len(), 1);
assert_eq!(apps[0].id, "mempool");
assert_eq!(apps[0].status, "running");
// No apps removed
assert!(delta.apps_rm.is_none());
// Scalar changes
assert_eq!(delta.cpu, Some(35.2));
assert_eq!(delta.mem_u, Some(4_500_000_000));
assert_eq!(delta.up, Some(86700));
// Unchanged fields should be None
assert!(delta.mem_t.is_none());
assert!(delta.dsk_u.is_none());
assert!(delta.dsk_t.is_none());
assert!(delta.tor.is_none());
}
#[test]
fn test_apply_delta_reconstructs() {
let a = sample_snapshot_a();
let b = sample_snapshot_b();
let delta = compute_delta(&a, &b);
let reconstructed = apply_delta(&a, &delta);
assert_eq!(reconstructed.timestamp, b.timestamp);
assert_eq!(reconstructed.cpu_usage_percent, b.cpu_usage_percent);
assert_eq!(reconstructed.mem_used_bytes, b.mem_used_bytes);
assert_eq!(reconstructed.uptime_secs, b.uptime_secs);
// Check mempool status was updated
let mempool = reconstructed.apps.iter().find(|a| a.id == "mempool").unwrap();
assert_eq!(mempool.status, "running");
}
#[test]
fn test_delta_with_app_removal() {
let a = sample_snapshot_a();
let mut b = sample_snapshot_b();
// Remove mempool from b
b.apps.retain(|app| app.id != "mempool");
let delta = compute_delta(&a, &b);
assert!(delta.apps_rm.is_some());
assert_eq!(delta.apps_rm.as_ref().unwrap(), &["mempool".to_string()]);
let reconstructed = apply_delta(&a, &delta);
assert!(reconstructed.apps.iter().all(|a| a.id != "mempool"));
}
#[test]
fn test_delta_with_new_app() {
let a = sample_snapshot_a();
let mut b = sample_snapshot_b();
b.apps.push(AppStatus {
id: "electrs".to_string(),
status: "running".to_string(),
version: Some("0.10.0".to_string()),
});
let delta = compute_delta(&a, &b);
let apps = delta.apps.as_ref().unwrap();
assert!(apps.iter().any(|a| a.id == "electrs"));
let reconstructed = apply_delta(&a, &delta);
assert!(reconstructed.apps.iter().any(|a| a.id == "electrs"));
}
#[test]
fn test_cbor_roundtrip() {
let a = sample_snapshot_a();
let b = sample_snapshot_b();
let delta = compute_delta(&a, &b);
let encoded = encode_cbor(&delta).unwrap();
let decoded = decode_cbor(&encoded).unwrap();
assert_eq!(decoded.ts, delta.ts);
assert_eq!(decoded.cpu, delta.cpu);
assert_eq!(decoded.mem_u, delta.mem_u);
assert_eq!(decoded.up, delta.up);
}
#[test]
fn test_cbor_size_vs_json() {
let a = sample_snapshot_a();
let b = sample_snapshot_b();
let delta = compute_delta(&a, &b);
let cbor_bytes = encode_cbor(&delta).unwrap();
let json_bytes = serde_json::to_vec(&b).unwrap();
// CBOR delta should be significantly smaller than full JSON snapshot
assert!(
cbor_bytes.len() < json_bytes.len(),
"CBOR delta ({} bytes) should be smaller than full JSON ({} bytes)",
cbor_bytes.len(),
json_bytes.len()
);
}
#[test]
fn test_snapshot_cbor_roundtrip() {
let snapshot = sample_snapshot_a();
let encoded = encode_snapshot_cbor(&snapshot).unwrap();
let decoded = decode_snapshot_cbor(&encoded).unwrap();
assert_eq!(decoded.timestamp, snapshot.timestamp);
assert_eq!(decoded.apps.len(), snapshot.apps.len());
assert_eq!(decoded.cpu_usage_percent, snapshot.cpu_usage_percent);
}
#[test]
fn test_no_changes_produces_minimal_delta() {
let a = sample_snapshot_a();
let mut b = a.clone();
b.timestamp = "2026-03-16T12:01:00Z".to_string();
let delta = compute_delta(&a, &b);
// Only timestamp should differ
assert!(delta.apps.is_none());
assert!(delta.apps_rm.is_none());
assert!(delta.cpu.is_none());
assert!(delta.mem_u.is_none());
assert!(delta.tor.is_none());
let cbor_bytes = encode_cbor(&delta).unwrap();
// Minimal delta should be very small (just timestamp + version)
assert!(cbor_bytes.len() < 50, "Minimal delta should be <50 bytes, got {}", cbor_bytes.len());
}
}

View File

@@ -0,0 +1,170 @@
//! LAN transport — peer discovery via mDNS and direct HTTP messaging.
//!
//! Advertises this node as `_archipelago._tcp.local.` with TXT records
//! containing the node's DID and public key. Discovers other Archipelago
//! nodes on the same LAN segment. Sends messages via direct HTTP POST
//! to the discovered IP:port — same endpoint as Tor transport but without
//! the SOCKS5 proxy, for near-zero latency on local networks.
use super::{NodeTransport, PeerRegistry, PeerSource, TransportKind, TransportMessage};
use anyhow::{Context, Result};
use mdns_sd::{ServiceDaemon, ServiceEvent, ServiceInfo};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
const SERVICE_TYPE: &str = "_archipelago._tcp.local.";
const DEFAULT_PORT: u16 = 5678;
const LAN_TIMEOUT: Duration = Duration::from_secs(10);
pub struct LanTransport {
our_did: String,
our_pubkey_hex: String,
our_port: u16,
daemon: Option<ServiceDaemon>,
available: AtomicBool,
}
impl LanTransport {
/// Create a new LAN transport. Does not start discovery yet.
pub fn new(our_did: &str, our_pubkey_hex: &str, port: u16) -> Self {
Self {
our_did: our_did.to_string(),
our_pubkey_hex: our_pubkey_hex.to_string(),
our_port: port,
daemon: None,
available: AtomicBool::new(false),
}
}
/// Start the mDNS daemon, advertise our service, and begin browsing.
/// Non-blocking — spawns background tasks for discovery.
pub fn start(&mut self, registry: Arc<PeerRegistry>) -> Result<()> {
let daemon = ServiceDaemon::new()
.context("Failed to create mDNS daemon")?;
// Advertise our service
let hostname = format!("archy-{}.local.", &self.our_pubkey_hex[..8]);
let properties = vec![
("did".to_string(), self.our_did.clone()),
("pubkey".to_string(), self.our_pubkey_hex.clone()),
("version".to_string(), "0.1.0".to_string()),
];
let service_info = ServiceInfo::new(
SERVICE_TYPE,
&format!("archy-{}", &self.our_pubkey_hex[..8]),
&hostname,
"",
self.our_port,
Some(properties.into_iter().collect()),
)
.context("Failed to create mDNS service info")?;
daemon
.register(service_info)
.context("Failed to register mDNS service")?;
// Browse for other Archipelago nodes
let receiver = daemon
.browse(SERVICE_TYPE)
.context("Failed to browse mDNS services")?;
self.daemon = Some(daemon);
self.available.store(true, Ordering::Relaxed);
info!("LAN transport started — advertising {}", SERVICE_TYPE);
// Spawn background discovery listener
let registry_clone = registry;
tokio::spawn(async move {
while let Ok(event) = receiver.recv() {
match event {
ServiceEvent::ServiceResolved(info) => {
let did = info.get_properties().get("did").map(|v| v.val_str().to_string());
let pubkey = info.get_properties().get("pubkey").map(|v| v.val_str().to_string());
let addresses = info.get_addresses();
if let (Some(did), Some(pubkey)) = (did, pubkey) {
if let Some(scoped_ip) = addresses.iter().next() {
let ip: std::net::IpAddr = (*scoped_ip).into();
let socket_addr = std::net::SocketAddr::new(ip, info.get_port());
info!(did = %did, addr = %socket_addr, "Discovered LAN peer via mDNS");
registry_clone
.register_peer(&did, &pubkey, PeerSource::LanDiscovery)
.await;
registry_clone
.set_lan_address(&did, socket_addr)
.await;
registry_clone
.set_name(&did, info.get_fullname())
.await;
}
}
}
ServiceEvent::ServiceRemoved(_, name) => {
debug!(name = %name, "LAN peer removed");
}
_ => {}
}
}
});
Ok(())
}
async fn send_impl(&self, address: &str, message: &TransportMessage) -> Result<()> {
// address is "ip:port" format
let url = format!("http://{}/archipelago/node-message", address);
let encoded_payload = {
use base64::Engine;
base64::engine::general_purpose::STANDARD.encode(&message.payload)
};
let body = serde_json::json!({
"from_pubkey": self.our_pubkey_hex,
"from_did": message.from_did,
"message": encoded_payload,
"message_type": message.message_type,
"timestamp": chrono::Utc::now().to_rfc3339(),
"transport": "lan",
});
let client = reqwest::Client::builder()
.timeout(LAN_TIMEOUT)
.build()
.context("Failed to build LAN HTTP client")?;
let resp = client
.post(&url)
.json(&body)
.send()
.await
.map_err(|e| anyhow::anyhow!("LAN send to {} failed: {}", address, e))?;
if !resp.status().is_success() {
anyhow::bail!("LAN peer at {} returned {}", address, resp.status().as_u16());
}
Ok(())
}
}
impl NodeTransport for LanTransport {
fn kind(&self) -> TransportKind {
TransportKind::Lan
}
fn is_available(&self) -> bool {
self.available.load(Ordering::Relaxed)
}
fn send<'a>(
&'a self,
address: &'a str,
message: &'a TransportMessage,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
Box::pin(async move { self.send_impl(address, message).await })
}
}

View File

@@ -0,0 +1,114 @@
//! Mesh transport — sends messages via LoRa radio through the MeshService.
//!
//! Bridges the transport abstraction to the existing mesh serial listener.
//! For payloads exceeding the LoRa frame limit (160 bytes), uses the chunking
//! protocol with Reed-Solomon FEC for reliable delivery.
use super::chunking::{self, ChunkReassembler, MAX_CHUNK_PAYLOAD};
use super::{NodeTransport, TransportKind, TransportMessage};
use crate::mesh::MeshService;
use anyhow::{Context, Result};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
/// Inter-chunk delay for LoRa airtime fairness.
const CHUNK_DELAY: Duration = Duration::from_millis(200);
/// Maximum single-frame payload (before chunking kicks in).
/// After ChaCha20-Poly1305 overhead: 160 - 12 (nonce) - 16 (tag) = 132 bytes.
const MAX_SINGLE_FRAME: usize = 132;
pub struct MeshTransport {
mesh_service: Arc<RwLock<Option<MeshService>>>,
reassembler: Arc<RwLock<ChunkReassembler>>,
}
impl MeshTransport {
pub fn new(mesh_service: Arc<RwLock<Option<MeshService>>>) -> Self {
Self {
mesh_service,
reassembler: Arc::new(RwLock::new(ChunkReassembler::new())),
}
}
/// Get a reference to the chunk reassembler (for incoming message processing).
pub fn reassembler(&self) -> Arc<RwLock<ChunkReassembler>> {
Arc::clone(&self.reassembler)
}
async fn send_impl(&self, contact_id_str: &str, message: &TransportMessage) -> Result<()> {
let contact_id: u32 = contact_id_str
.parse()
.context("Invalid mesh contact ID")?;
let service = self.mesh_service.read().await;
let service = service
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
// Serialize the transport message as CBOR for compact encoding
let mut payload = Vec::new();
ciborium::into_writer(message, &mut payload)
.context("Failed to CBOR-encode transport message")?;
if payload.len() <= MAX_SINGLE_FRAME {
// Fits in a single LoRa frame — send directly as text
let text = {
use base64::Engine;
base64::engine::general_purpose::STANDARD.encode(&payload)
};
service
.send_message(contact_id, &text)
.await
.context("Mesh single-frame send failed")?;
} else {
// Chunk with FEC
let chunks = chunking::encode_chunked(&payload)?;
tracing::info!(
chunks = chunks.len(),
payload_bytes = payload.len(),
"Sending chunked message over mesh"
);
for chunk in &chunks {
let chunk_bytes = chunk.to_bytes();
let text = {
use base64::Engine;
base64::engine::general_purpose::STANDARD.encode(&chunk_bytes)
};
service
.send_message(contact_id, &text)
.await
.context("Mesh chunk send failed")?;
tokio::time::sleep(CHUNK_DELAY).await;
}
}
Ok(())
}
}
impl NodeTransport for MeshTransport {
fn kind(&self) -> TransportKind {
TransportKind::Mesh
}
fn is_available(&self) -> bool {
// Check synchronously — we can't await here, so use try_read
match self.mesh_service.try_read() {
Ok(guard) => match guard.as_ref() {
Some(_service) => true, // Service exists
None => false,
},
Err(_) => false, // Lock contention — assume unavailable
}
}
fn send<'a>(
&'a self,
address: &'a str,
message: &'a TransportMessage,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
Box::pin(async move { self.send_impl(address, message).await })
}
}

View File

@@ -0,0 +1,568 @@
//! Transport abstraction layer for Archipelago node-to-node communication.
//!
//! Unifies mesh radio (LoRa), LAN (mDNS), and Tor under a common trait.
//! Routes messages to peers via the best available transport with automatic
//! fallback: Mesh (priority 1) > LAN (2) > Tor (3).
pub mod chunking;
pub mod delta;
pub mod lan;
pub mod mesh_transport;
pub mod tor;
use crate::federation::TrustLevel;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs;
use tokio::sync::RwLock;
use tracing::{info, warn};
// ─── Transport Kind ─────────────────────────────────────────────────────
/// Transport backend type, ordered by priority (lower = preferred).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TransportKind {
Mesh = 1,
Lan = 2,
Tor = 3,
}
impl std::fmt::Display for TransportKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Mesh => write!(f, "mesh"),
Self::Lan => write!(f, "lan"),
Self::Tor => write!(f, "tor"),
}
}
}
// ─── Message Types ──────────────────────────────────────────────────────
/// Type of transport-level message.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MessageType {
StateSync,
PeerMessage,
FederationRpc,
}
/// A message sent between nodes via any transport.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransportMessage {
pub from_did: String,
pub payload: Vec<u8>,
pub message_type: MessageType,
}
// ─── NodeTransport Trait ────────────────────────────────────────────────
/// Trait implemented by each transport backend (Tor, Mesh, LAN).
pub trait NodeTransport: Send + Sync {
/// Which transport this is.
fn kind(&self) -> TransportKind;
/// Whether this transport is currently operational.
fn is_available(&self) -> bool;
/// Send raw bytes to a peer at their transport-specific address.
/// For Tor: address is an onion hostname.
/// For Mesh: address is a contact_id as string.
/// For LAN: address is "ip:port".
fn send<'a>(
&'a self,
address: &'a str,
message: &'a TransportMessage,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>>;
}
// ─── Peer Registry ──────────────────────────────────────────────────────
/// How we discovered this peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PeerSource {
Federation,
MeshDiscovery,
LanDiscovery,
NostrHandshake,
Manual,
}
/// Unified peer record with per-transport capabilities.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerRecord {
pub did: String,
pub pubkey_hex: String,
#[serde(default)]
pub name: Option<String>,
#[serde(default)]
pub trust_level: Option<TrustLevel>,
#[serde(default)]
pub source: Option<PeerSource>,
// Transport-specific addresses
#[serde(default)]
pub mesh_contact_id: Option<u32>,
#[serde(default)]
pub lan_address: Option<String>,
#[serde(default)]
pub onion_address: Option<String>,
// Freshness timestamps (RFC 3339)
#[serde(default)]
pub last_mesh: Option<String>,
#[serde(default)]
pub last_lan: Option<String>,
#[serde(default)]
pub last_tor: Option<String>,
}
impl PeerRecord {
/// Get the transport-specific address for a given transport kind.
pub fn address_for(&self, kind: TransportKind) -> Option<String> {
match kind {
TransportKind::Mesh => self.mesh_contact_id.map(|id| id.to_string()),
TransportKind::Lan => self.lan_address.clone(),
TransportKind::Tor => self.onion_address.clone(),
}
}
/// Check if the last-seen timestamp for a transport is fresh enough.
/// Mesh/LAN: 5 minutes. Tor: 1 hour.
pub fn is_fresh(&self, kind: TransportKind) -> bool {
let timestamp = match kind {
TransportKind::Mesh => self.last_mesh.as_deref(),
TransportKind::Lan => self.last_lan.as_deref(),
TransportKind::Tor => self.last_tor.as_deref(),
};
let Some(ts) = timestamp else {
// No timestamp means we haven't confirmed it, but the address exists.
// Allow it — the send will fail if unreachable.
return true;
};
let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(ts) else {
return false;
};
let age = chrono::Utc::now().signed_duration_since(parsed);
let max_age = match kind {
TransportKind::Mesh | TransportKind::Lan => chrono::Duration::minutes(5),
TransportKind::Tor => chrono::Duration::hours(1),
};
age < max_age
}
/// List available transport kinds for this peer, in priority order.
pub fn available_transports(&self) -> Vec<TransportKind> {
let mut result = Vec::new();
if self.mesh_contact_id.is_some() {
result.push(TransportKind::Mesh);
}
if self.lan_address.is_some() {
result.push(TransportKind::Lan);
}
if self.onion_address.is_some() {
result.push(TransportKind::Tor);
}
result
}
}
const PEERS_FILE: &str = "transport-peers.json";
/// Thread-safe registry of all known peers with their transport capabilities.
pub struct PeerRegistry {
peers: RwLock<HashMap<String, PeerRecord>>,
data_dir: PathBuf,
}
#[derive(Debug, Default, Serialize, Deserialize)]
struct PeersFile {
peers: Vec<PeerRecord>,
}
impl PeerRegistry {
/// Load peer registry from disk (or create empty).
pub async fn load(data_dir: &Path) -> Result<Self> {
let path = data_dir.join(PEERS_FILE);
let peers = if path.exists() {
let content = fs::read_to_string(&path)
.await
.context("Failed to read transport peers")?;
let file: PeersFile = serde_json::from_str(&content).unwrap_or_default();
file.peers
.into_iter()
.map(|p| (p.did.clone(), p))
.collect()
} else {
HashMap::new()
};
Ok(Self {
peers: RwLock::new(peers),
data_dir: data_dir.to_path_buf(),
})
}
/// Persist current state to disk.
pub async fn save(&self) -> Result<()> {
let peers = self.peers.read().await;
let file = PeersFile {
peers: peers.values().cloned().collect(),
};
let content =
serde_json::to_string_pretty(&file).context("Failed to serialize transport peers")?;
fs::write(self.data_dir.join(PEERS_FILE), content)
.await
.context("Failed to write transport peers")?;
Ok(())
}
/// Register or update a peer.
pub async fn register_peer(
&self,
did: &str,
pubkey_hex: &str,
source: PeerSource,
) -> PeerRecord {
let mut peers = self.peers.write().await;
let record = peers.entry(did.to_string()).or_insert_with(|| PeerRecord {
did: did.to_string(),
pubkey_hex: pubkey_hex.to_string(),
name: None,
trust_level: None,
source: Some(source.clone()),
mesh_contact_id: None,
lan_address: None,
onion_address: None,
last_mesh: None,
last_lan: None,
last_tor: None,
});
// Update pubkey if it changed
if record.pubkey_hex != pubkey_hex {
record.pubkey_hex = pubkey_hex.to_string();
}
record.clone()
}
/// Set the mesh contact ID for a peer.
pub async fn set_mesh_id(&self, did: &str, contact_id: u32) {
let mut peers = self.peers.write().await;
if let Some(peer) = peers.get_mut(did) {
peer.mesh_contact_id = Some(contact_id);
peer.last_mesh = Some(chrono::Utc::now().to_rfc3339());
}
}
/// Set the LAN address for a peer.
pub async fn set_lan_address(&self, did: &str, addr: SocketAddr) {
let mut peers = self.peers.write().await;
if let Some(peer) = peers.get_mut(did) {
peer.lan_address = Some(addr.to_string());
peer.last_lan = Some(chrono::Utc::now().to_rfc3339());
}
}
/// Set the onion address for a peer.
pub async fn set_onion(&self, did: &str, onion: &str) {
let mut peers = self.peers.write().await;
if let Some(peer) = peers.get_mut(did) {
peer.onion_address = Some(onion.to_string());
peer.last_tor = Some(chrono::Utc::now().to_rfc3339());
}
}
/// Set the display name for a peer.
pub async fn set_name(&self, did: &str, name: &str) {
let mut peers = self.peers.write().await;
if let Some(peer) = peers.get_mut(did) {
peer.name = Some(name.to_string());
}
}
/// Get a peer by DID.
pub async fn get_peer(&self, did: &str) -> Option<PeerRecord> {
self.peers.read().await.get(did).cloned()
}
/// Get all peers.
pub async fn all_peers(&self) -> Vec<PeerRecord> {
self.peers.read().await.values().cloned().collect()
}
/// Count of registered peers.
pub async fn count(&self) -> usize {
self.peers.read().await.len()
}
}
// ─── Transport Router ───────────────────────────────────────────────────
/// Routes messages to the best available transport per peer.
pub struct TransportRouter {
transports: Vec<Box<dyn NodeTransport>>,
pub registry: Arc<PeerRegistry>,
mesh_only: RwLock<bool>,
}
impl TransportRouter {
pub fn new(
transports: Vec<Box<dyn NodeTransport>>,
registry: Arc<PeerRegistry>,
mesh_only: bool,
) -> Self {
Self {
transports,
registry,
mesh_only: RwLock::new(mesh_only),
}
}
/// Send a message to a peer by DID, using the best available transport.
pub async fn send_to_peer(
&self,
did: &str,
message: &TransportMessage,
) -> Result<TransportKind> {
let peer = self
.registry
.get_peer(did)
.await
.ok_or_else(|| anyhow::anyhow!("Unknown peer: {}", did))?;
let candidates = self.route(&peer).await;
if candidates.is_empty() {
anyhow::bail!("No available transport for peer {}", did);
}
let mut last_err = None;
for kind in &candidates {
let transport = match self.transports.iter().find(|t| t.kind() == *kind) {
Some(t) => t,
None => continue,
};
let address = match peer.address_for(*kind) {
Some(a) => a,
None => continue,
};
match transport.send(&address, message).await {
Ok(()) => {
info!(transport = %kind, peer = %did, "Message sent");
return Ok(*kind);
}
Err(e) => {
warn!(transport = %kind, peer = %did, error = %e, "Transport failed, trying next");
last_err = Some(e);
}
}
}
Err(last_err.unwrap_or_else(|| anyhow::anyhow!("All transports failed for peer {}", did)))
}
/// Determine transport priority for a peer.
async fn route(&self, peer: &PeerRecord) -> Vec<TransportKind> {
let mesh_only = *self.mesh_only.read().await;
let mut available = Vec::new();
if mesh_only {
// Off-grid mode: only mesh
if peer.mesh_contact_id.is_some() {
available.push(TransportKind::Mesh);
}
} else {
// Normal mode: priority order, check freshness
if peer.mesh_contact_id.is_some() && peer.is_fresh(TransportKind::Mesh) {
if let Some(t) = self.transports.iter().find(|t| t.kind() == TransportKind::Mesh) {
if t.is_available() {
available.push(TransportKind::Mesh);
}
}
}
if peer.lan_address.is_some() && peer.is_fresh(TransportKind::Lan) {
if let Some(t) = self.transports.iter().find(|t| t.kind() == TransportKind::Lan) {
if t.is_available() {
available.push(TransportKind::Lan);
}
}
}
if peer.onion_address.is_some() {
if let Some(t) = self.transports.iter().find(|t| t.kind() == TransportKind::Tor) {
if t.is_available() {
available.push(TransportKind::Tor);
}
}
}
}
available
}
/// Set mesh-only (off-grid) mode.
pub async fn set_mesh_only(&self, enabled: bool) {
*self.mesh_only.write().await = enabled;
}
/// Get current mesh-only mode status.
pub async fn is_mesh_only(&self) -> bool {
*self.mesh_only.read().await
}
/// Get status of all transports.
pub fn transport_status(&self) -> Vec<(TransportKind, bool)> {
self.transports
.iter()
.map(|t| (t.kind(), t.is_available()))
.collect()
}
}
// ─── Tests ──────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_transport_kind_ordering() {
assert!(TransportKind::Mesh < TransportKind::Lan);
assert!(TransportKind::Lan < TransportKind::Tor);
}
#[test]
fn test_peer_record_address_for() {
let peer = PeerRecord {
did: "did:key:z6MkTest".to_string(),
pubkey_hex: "aabb".to_string(),
name: Some("test-node".to_string()),
trust_level: None,
source: None,
mesh_contact_id: Some(42),
lan_address: Some("192.168.1.100:5678".to_string()),
onion_address: Some("abc123.onion".to_string()),
last_mesh: None,
last_lan: None,
last_tor: None,
};
assert_eq!(peer.address_for(TransportKind::Mesh), Some("42".to_string()));
assert_eq!(
peer.address_for(TransportKind::Lan),
Some("192.168.1.100:5678".to_string())
);
assert_eq!(
peer.address_for(TransportKind::Tor),
Some("abc123.onion".to_string())
);
}
#[test]
fn test_peer_record_available_transports() {
let peer = PeerRecord {
did: "did:key:z6MkTest".to_string(),
pubkey_hex: "aabb".to_string(),
name: None,
trust_level: None,
source: None,
mesh_contact_id: Some(1),
lan_address: None,
onion_address: Some("test.onion".to_string()),
last_mesh: None,
last_lan: None,
last_tor: None,
};
let transports = peer.available_transports();
assert_eq!(transports, vec![TransportKind::Mesh, TransportKind::Tor]);
}
#[test]
fn test_freshness_no_timestamp() {
let peer = PeerRecord {
did: "did:key:z6MkTest".to_string(),
pubkey_hex: "aabb".to_string(),
name: None,
trust_level: None,
source: None,
mesh_contact_id: Some(1),
lan_address: None,
onion_address: None,
last_mesh: None,
last_lan: None,
last_tor: None,
};
// No timestamp = considered fresh (allows first attempt)
assert!(peer.is_fresh(TransportKind::Mesh));
}
#[test]
fn test_freshness_recent_timestamp() {
let peer = PeerRecord {
did: "did:key:z6MkTest".to_string(),
pubkey_hex: "aabb".to_string(),
name: None,
trust_level: None,
source: None,
mesh_contact_id: Some(1),
lan_address: None,
onion_address: None,
last_mesh: Some(chrono::Utc::now().to_rfc3339()),
last_lan: None,
last_tor: None,
};
assert!(peer.is_fresh(TransportKind::Mesh));
}
#[test]
fn test_freshness_stale_timestamp() {
let stale = chrono::Utc::now() - chrono::Duration::minutes(10);
let peer = PeerRecord {
did: "did:key:z6MkTest".to_string(),
pubkey_hex: "aabb".to_string(),
name: None,
trust_level: None,
source: None,
mesh_contact_id: Some(1),
lan_address: None,
onion_address: None,
last_mesh: Some(stale.to_rfc3339()),
last_lan: None,
last_tor: None,
};
// 10 minutes old > 5 minute mesh freshness threshold
assert!(!peer.is_fresh(TransportKind::Mesh));
}
#[tokio::test]
async fn test_peer_registry_roundtrip() {
let dir = tempfile::tempdir().unwrap();
let registry = PeerRegistry::load(dir.path()).await.unwrap();
registry
.register_peer(
"did:key:z6MkTest",
"aabbccdd",
PeerSource::MeshDiscovery,
)
.await;
registry.set_mesh_id("did:key:z6MkTest", 42).await;
registry
.set_onion("did:key:z6MkTest", "test123.onion")
.await;
registry.save().await.unwrap();
// Reload from disk
let registry2 = PeerRegistry::load(dir.path()).await.unwrap();
let peer = registry2.get_peer("did:key:z6MkTest").await.unwrap();
assert_eq!(peer.mesh_contact_id, Some(42));
assert_eq!(peer.onion_address, Some("test123.onion".to_string()));
assert_eq!(peer.pubkey_hex, "aabbccdd");
}
}

View File

@@ -0,0 +1,102 @@
//! Tor transport — sends messages via HTTP POST through SOCKS5 proxy.
//!
//! Wraps the existing `node_message.rs` Tor messaging logic behind
//! the `NodeTransport` trait.
use super::{MessageType, NodeTransport, TransportKind, TransportMessage};
use anyhow::{Context, Result};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
const TOR_SOCKS: &str = "socks5h://127.0.0.1:9050";
const TOR_TIMEOUT: Duration = Duration::from_secs(60);
pub struct TorTransport {
our_pubkey_hex: String,
available: AtomicBool,
}
impl TorTransport {
pub fn new(our_pubkey_hex: &str) -> Self {
Self {
our_pubkey_hex: our_pubkey_hex.to_string(),
available: AtomicBool::new(true), // Assume available, checked lazily
}
}
/// Update availability (call periodically from health check).
pub fn set_available(&self, avail: bool) {
self.available.store(avail, Ordering::Relaxed);
}
async fn send_impl(&self, onion_address: &str, message: &TransportMessage) -> Result<()> {
let host = if onion_address.ends_with(".onion") {
onion_address.to_string()
} else {
format!("{}.onion", onion_address)
};
let url = format!("http://{}/archipelago/node-message", host);
let encoded_payload = {
use base64::Engine;
base64::engine::general_purpose::STANDARD.encode(&message.payload)
};
let body = serde_json::json!({
"from_pubkey": self.our_pubkey_hex,
"message": encoded_payload,
"message_type": message.message_type,
"from_did": message.from_did,
"timestamp": chrono::Utc::now().to_rfc3339(),
});
let proxy = reqwest::Proxy::all(TOR_SOCKS).context("Invalid Tor proxy")?;
let client = reqwest::Client::builder()
.proxy(proxy)
.timeout(TOR_TIMEOUT)
.build()
.context("Failed to build Tor HTTP client")?;
let resp = client
.post(&url)
.json(&body)
.send()
.await
.map_err(|e| {
let msg = e.to_string();
if msg.contains("connection refused") || msg.contains("Connection refused") {
self.available.store(false, Ordering::Relaxed);
anyhow::anyhow!("Tor not reachable at 127.0.0.1:9050")
} else if msg.contains("timeout") || msg.contains("timed out") {
anyhow::anyhow!("Tor connection timed out — peer may be offline")
} else {
anyhow::anyhow!("Tor send failed: {}", msg)
}
})?;
if !resp.status().is_success() {
anyhow::bail!(
"Peer returned {} over Tor",
resp.status().as_u16()
);
}
Ok(())
}
}
impl NodeTransport for TorTransport {
fn kind(&self) -> TransportKind {
TransportKind::Tor
}
fn is_available(&self) -> bool {
self.available.load(Ordering::Relaxed)
}
fn send<'a>(
&'a self,
address: &'a str,
message: &'a TransportMessage,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
Box::pin(async move { self.send_impl(address, message).await })
}
}