feat: v1.2.0-alpha — E2E encrypted mesh relay, steganography, relay status polling
Phase 5 mesh networking: - E2E encrypted TX relay (X25519 + ChaCha20-Poly1305) — non-Archy nodes relay encrypted blobs transparently via Meshcore native routing - Steganographic encoding modes (WeatherStation, SensorNetwork) — traffic looks like sensor data on the wire, 0xAA marker, configurable per-node - Pre-flight Bitcoin Core health check on relay node — specific error codes (bitcoin_unreachable, bitcoin_syncing, tx_rejected) instead of generic fails - mesh.relay-status RPC endpoint — frontend polls for relay result every 3s - On-Chain / Lightning tabs in Off-Grid Bitcoin panel - Archy Peers vs Mesh Broadcast relay mode selector - Mesh view fills viewport (no page scroll), internal panel scrolling - Version bump to 1.2.0-alpha Also includes: deploy hardening, container fixes, IndeedHub updates, boot screen, dashboard improvements, MASTER_PLAN task tracking Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
10
core/Cargo.lock
generated
10
core/Cargo.lock
generated
@@ -101,6 +101,7 @@ dependencies = [
|
||||
"flate2",
|
||||
"futures-util",
|
||||
"hex",
|
||||
"hkdf",
|
||||
"hmac",
|
||||
"http-body 1.0.1",
|
||||
"http-body-util",
|
||||
@@ -1058,6 +1059,15 @@ dependencies = [
|
||||
"arrayvec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hkdf"
|
||||
version = "0.12.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7"
|
||||
dependencies = [
|
||||
"hmac",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hmac"
|
||||
version = "0.12.1"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "archipelago"
|
||||
version = "1.1.0"
|
||||
version = "1.2.0-alpha"
|
||||
edition = "2021"
|
||||
description = "Archipelago Bitcoin Node OS - Native backend"
|
||||
authors = ["Archipelago Team"]
|
||||
|
||||
@@ -751,16 +751,21 @@ impl RpcHandler {
|
||||
return Err(anyhow::anyhow!("Failed to sign TX: {}", msg));
|
||||
}
|
||||
|
||||
// raw_final_tx is the hex-encoded signed transaction — ready for broadcast
|
||||
let raw_final_tx = body.get("raw_final_tx")
|
||||
// raw_final_tx from LND is base64-encoded — decode to hex for Bitcoin RPC
|
||||
let raw_final_tx_b64 = body.get("raw_final_tx")
|
||||
.and_then(|v| v.as_str())
|
||||
.ok_or_else(|| anyhow::anyhow!("No raw_final_tx in response"))?
|
||||
.to_string();
|
||||
.ok_or_else(|| anyhow::anyhow!("No raw_final_tx in response"))?;
|
||||
|
||||
info!(addr, amount_sats, tx_len = raw_final_tx.len(), "Created raw TX for mesh relay (NOT broadcast)");
|
||||
use base64::Engine;
|
||||
let tx_bytes = base64::engine::general_purpose::STANDARD
|
||||
.decode(raw_final_tx_b64)
|
||||
.context("Failed to decode raw_final_tx base64")?;
|
||||
let raw_tx_hex = hex::encode(&tx_bytes);
|
||||
|
||||
info!(addr, amount_sats, tx_len = raw_tx_hex.len(), "Created raw TX for mesh relay (NOT broadcast)");
|
||||
|
||||
Ok(serde_json::json!({
|
||||
"raw_tx_hex": raw_final_tx,
|
||||
"raw_tx_hex": raw_tx_hex,
|
||||
"amount_sats": amount_sats,
|
||||
"addr": addr,
|
||||
"broadcast": false,
|
||||
|
||||
@@ -423,6 +423,10 @@ impl RpcHandler {
|
||||
.as_str()
|
||||
.ok_or_else(|| anyhow::anyhow!("Missing tx_hex"))?;
|
||||
|
||||
let relay_mode = params["relay_mode"]
|
||||
.as_str()
|
||||
.unwrap_or("archy");
|
||||
|
||||
if tx_hex.len() < 20 || tx_hex.len() > 200_000 {
|
||||
anyhow::bail!("Invalid tx_hex length");
|
||||
}
|
||||
@@ -440,30 +444,83 @@ impl RpcHandler {
|
||||
|
||||
let wire = crate::mesh::bitcoin_relay::build_tx_relay_request(tx_hex, request_id)?;
|
||||
|
||||
// Send ONLY to Archipelago peers (Archy-* nodes), not broadcast to all devices
|
||||
let peers = svc.peers().await;
|
||||
let mut sent_count = 0u32;
|
||||
for peer in &peers {
|
||||
if !peer.advert_name.starts_with("Archy-") { continue; }
|
||||
if let Some(ref pk) = peer.pubkey_hex {
|
||||
if let Ok(pk_bytes) = hex::decode(pk) {
|
||||
if pk_bytes.len() >= 6 {
|
||||
let mut prefix = [0u8; 6];
|
||||
prefix.copy_from_slice(&pk_bytes[..6]);
|
||||
let _ = svc.shared_state()
|
||||
.cmd_tx
|
||||
.send(crate::mesh::listener::MeshCommand::SendRaw {
|
||||
dest_pubkey_prefix: prefix,
|
||||
payload: wire.clone(),
|
||||
})
|
||||
.await;
|
||||
sent_count += 1;
|
||||
|
||||
if relay_mode == "broadcast" {
|
||||
// Broadcast mode: send on channel 0 (all mesh nodes relay)
|
||||
// Still encrypted — only Archy nodes can decrypt and broadcast the TX
|
||||
let shared_state = svc.shared_state();
|
||||
let shared_secrets = shared_state.shared_secrets.read().await;
|
||||
|
||||
// Encrypt with first available Archy peer's shared secret
|
||||
// (any Archy node that receives it can try decrypting)
|
||||
let payload = shared_secrets.values().next()
|
||||
.and_then(|secret| {
|
||||
crate::mesh::crypto::encrypt(secret, &wire).ok().map(|ct| {
|
||||
let mut encrypted = Vec::with_capacity(1 + ct.len());
|
||||
encrypted.push(crate::mesh::message_types::ENCRYPTED_TYPED_MARKER);
|
||||
encrypted.extend_from_slice(&ct);
|
||||
encrypted
|
||||
})
|
||||
})
|
||||
.unwrap_or_else(|| wire.clone());
|
||||
drop(shared_secrets);
|
||||
|
||||
{
|
||||
use base64::Engine;
|
||||
let b64 = base64::engine::general_purpose::STANDARD.encode(&payload);
|
||||
let _ = shared_state
|
||||
.cmd_tx
|
||||
.send(crate::mesh::listener::MeshCommand::BroadcastChannel {
|
||||
channel: 0,
|
||||
payload: b64.into_bytes(),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
sent_count = 1;
|
||||
info!(request_id, tx_len = tx_hex.len(), "TX relay broadcast on mesh channel 0 (encrypted)");
|
||||
} else {
|
||||
// Archy mode: E2E encrypted per-peer, direct to known Archy nodes
|
||||
let peers = svc.peers().await;
|
||||
let shared_state = svc.shared_state();
|
||||
let shared_secrets = shared_state.shared_secrets.read().await;
|
||||
for peer in &peers {
|
||||
if !peer.advert_name.starts_with("Archy-") { continue; }
|
||||
if let Some(ref pk) = peer.pubkey_hex {
|
||||
if let Ok(pk_bytes) = hex::decode(pk) {
|
||||
if pk_bytes.len() >= 6 {
|
||||
let mut prefix = [0u8; 6];
|
||||
prefix.copy_from_slice(&pk_bytes[..6]);
|
||||
|
||||
let payload = if let Some(secret) = shared_secrets.get(&peer.contact_id) {
|
||||
match crate::mesh::crypto::encrypt(secret, &wire) {
|
||||
Ok(ciphertext) => {
|
||||
let mut encrypted = Vec::with_capacity(1 + ciphertext.len());
|
||||
encrypted.push(crate::mesh::message_types::ENCRYPTED_TYPED_MARKER);
|
||||
encrypted.extend_from_slice(&ciphertext);
|
||||
encrypted
|
||||
}
|
||||
Err(_) => wire.clone(),
|
||||
}
|
||||
} else {
|
||||
wire.clone()
|
||||
};
|
||||
|
||||
let _ = svc.shared_state()
|
||||
.cmd_tx
|
||||
.send(crate::mesh::listener::MeshCommand::SendRaw {
|
||||
dest_pubkey_prefix: prefix,
|
||||
payload,
|
||||
})
|
||||
.await;
|
||||
sent_count += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
drop(shared_secrets);
|
||||
info!(request_id, tx_len = tx_hex.len(), archy_peers = sent_count, "TX relay sent to Archy peers (E2E encrypted)");
|
||||
}
|
||||
|
||||
info!(request_id, tx_len = tx_hex.len(), archy_peers = sent_count, "TX relay sent to Archy peers only");
|
||||
Ok(serde_json::json!({
|
||||
"request_id": request_id,
|
||||
"queued": true,
|
||||
@@ -471,6 +528,47 @@ impl RpcHandler {
|
||||
}))
|
||||
}
|
||||
|
||||
/// mesh.relay-status — Check the status of a pending or completed TX relay.
|
||||
pub(super) async fn handle_mesh_relay_status(
|
||||
&self,
|
||||
params: Option<serde_json::Value>,
|
||||
) -> Result<serde_json::Value> {
|
||||
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
|
||||
let request_id = params["request_id"]
|
||||
.as_u64()
|
||||
.ok_or_else(|| anyhow::anyhow!("Missing request_id"))?;
|
||||
|
||||
let service = self.mesh_service.read().await;
|
||||
let svc = service.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
|
||||
|
||||
// Check completed results first
|
||||
if let Some(result) = svc.relay_tracker.get_result(request_id).await {
|
||||
return Ok(serde_json::json!({
|
||||
"status": if result.txid.is_some() { "confirmed" } else { "failed" },
|
||||
"request_id": result.request_id,
|
||||
"txid": result.txid,
|
||||
"error": result.error,
|
||||
"error_code": result.error_code,
|
||||
"completed_at": result.completed_at,
|
||||
}));
|
||||
}
|
||||
|
||||
// Check if still pending
|
||||
if svc.relay_tracker.is_pending(request_id).await {
|
||||
return Ok(serde_json::json!({
|
||||
"status": "pending",
|
||||
"request_id": request_id,
|
||||
}));
|
||||
}
|
||||
|
||||
// Unknown — either expired or never existed
|
||||
Ok(serde_json::json!({
|
||||
"status": "unknown",
|
||||
"request_id": request_id,
|
||||
}))
|
||||
}
|
||||
|
||||
/// mesh.block-headers — Get cached block headers received from mesh peers.
|
||||
pub(super) async fn handle_mesh_block_headers(
|
||||
&self,
|
||||
@@ -529,8 +627,10 @@ impl RpcHandler {
|
||||
bolt11, amount_sats, request_id,
|
||||
)?;
|
||||
|
||||
// Send ONLY to Archipelago peers, not broadcast
|
||||
// Send to Archipelago peers — E2E encrypted per-peer
|
||||
let peers = svc.peers().await;
|
||||
let shared_state = svc.shared_state();
|
||||
let shared_secrets = shared_state.shared_secrets.read().await;
|
||||
let mut sent_count = 0u32;
|
||||
for peer in &peers {
|
||||
if !peer.advert_name.starts_with("Archy-") { continue; }
|
||||
@@ -539,11 +639,26 @@ impl RpcHandler {
|
||||
if pk_bytes.len() >= 6 {
|
||||
let mut prefix = [0u8; 6];
|
||||
prefix.copy_from_slice(&pk_bytes[..6]);
|
||||
|
||||
let payload = if let Some(secret) = shared_secrets.get(&peer.contact_id) {
|
||||
match crate::mesh::crypto::encrypt(secret, &wire) {
|
||||
Ok(ciphertext) => {
|
||||
let mut encrypted = Vec::with_capacity(1 + ciphertext.len());
|
||||
encrypted.push(crate::mesh::message_types::ENCRYPTED_TYPED_MARKER);
|
||||
encrypted.extend_from_slice(&ciphertext);
|
||||
encrypted
|
||||
}
|
||||
Err(_) => wire.clone(),
|
||||
}
|
||||
} else {
|
||||
wire.clone()
|
||||
};
|
||||
|
||||
let _ = svc.shared_state()
|
||||
.cmd_tx
|
||||
.send(crate::mesh::listener::MeshCommand::SendRaw {
|
||||
dest_pubkey_prefix: prefix,
|
||||
payload: wire.clone(),
|
||||
payload,
|
||||
})
|
||||
.await;
|
||||
sent_count += 1;
|
||||
@@ -551,8 +666,9 @@ impl RpcHandler {
|
||||
}
|
||||
}
|
||||
}
|
||||
drop(shared_secrets);
|
||||
|
||||
info!(request_id, amount_sats, archy_peers = sent_count, "Lightning relay sent to Archy peers only");
|
||||
info!(request_id, amount_sats, archy_peers = sent_count, "Lightning relay sent (E2E encrypted)");
|
||||
Ok(serde_json::json!({
|
||||
"request_id": request_id,
|
||||
"queued": true,
|
||||
@@ -670,4 +786,80 @@ impl RpcHandler {
|
||||
"one_time_prekeys": bundle.one_time_prekeys.len(),
|
||||
}))
|
||||
}
|
||||
|
||||
// ─── Radio Diagnostics ─────────────────────────────────────────────
|
||||
|
||||
/// mesh.test-send — Send test payloads of various sizes to diagnose radio link.
|
||||
/// Sends plain text markers that the receiver can count.
|
||||
pub(super) async fn handle_mesh_test_send(
|
||||
&self,
|
||||
params: Option<serde_json::Value>,
|
||||
) -> Result<serde_json::Value> {
|
||||
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
|
||||
let contact_id = params["contact_id"]
|
||||
.as_u64()
|
||||
.ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? as u32;
|
||||
|
||||
// Test modes: "ping" (small), "medium" (80 bytes), "large" (150 bytes), "chunked" (400 bytes)
|
||||
let mode = params["mode"].as_str().unwrap_or("ping");
|
||||
let count = params["count"].as_u64().unwrap_or(3) as usize;
|
||||
|
||||
let service = self.mesh_service.read().await;
|
||||
let svc = service.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
|
||||
|
||||
let mut sent = 0usize;
|
||||
let test_id = chrono::Utc::now().timestamp() as u32;
|
||||
|
||||
for i in 0..count {
|
||||
let payload = match mode {
|
||||
"ping" => format!("MESHTEST:{}:{}:PING", test_id, i),
|
||||
"medium" => format!("MESHTEST:{}:{}:{}", test_id, i, "X".repeat(60)),
|
||||
"large" => format!("MESHTEST:{}:{}:{}", test_id, i, "X".repeat(130)),
|
||||
"chunked" => {
|
||||
// Send a TypedEnvelope that requires chunking (>140 base64 chars)
|
||||
let fake_tx = "0".repeat(400); // simulates TX hex
|
||||
let wire = crate::mesh::bitcoin_relay::build_tx_relay_request(&fake_tx, test_id as u64 + i as u64)?;
|
||||
// Send via SendRaw which handles base64 + chunking
|
||||
let peers = svc.peers().await;
|
||||
if let Some(peer) = peers.iter().find(|p| p.contact_id == contact_id) {
|
||||
if let Some(ref pk) = peer.pubkey_hex {
|
||||
if let Ok(pk_bytes) = hex::decode(pk) {
|
||||
if pk_bytes.len() >= 6 {
|
||||
let mut prefix = [0u8; 6];
|
||||
prefix.copy_from_slice(&pk_bytes[..6]);
|
||||
let _ = svc.shared_state().cmd_tx.send(
|
||||
crate::mesh::listener::MeshCommand::SendRaw {
|
||||
dest_pubkey_prefix: prefix,
|
||||
payload: wire,
|
||||
},
|
||||
).await;
|
||||
sent += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Delay between chunked sends
|
||||
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
|
||||
continue;
|
||||
}
|
||||
_ => format!("MESHTEST:{}:{}:UNKNOWN", test_id, i),
|
||||
};
|
||||
|
||||
// Send as plain text for ping/medium/large
|
||||
let msg = svc.send_message(contact_id, &payload).await?;
|
||||
sent += 1;
|
||||
info!(test_id, seq = i, mode, len = payload.len(), "Test message sent");
|
||||
|
||||
// Small delay between sends
|
||||
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
|
||||
}
|
||||
|
||||
Ok(serde_json::json!({
|
||||
"test_id": test_id,
|
||||
"mode": mode,
|
||||
"sent": sent,
|
||||
"count": count,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -655,11 +655,13 @@ impl RpcHandler {
|
||||
"mesh.rotate-prekeys" => self.handle_mesh_rotate_prekeys().await,
|
||||
// Phase 4: Off-grid Bitcoin operations
|
||||
"mesh.relay-tx" => self.handle_mesh_relay_tx(params).await,
|
||||
"mesh.relay-status" => self.handle_mesh_relay_status(params).await,
|
||||
"mesh.block-headers" => self.handle_mesh_block_headers(params).await,
|
||||
"mesh.relay-lightning" => self.handle_mesh_relay_lightning(params).await,
|
||||
"mesh.deadman-status" => self.handle_mesh_deadman_status().await,
|
||||
"mesh.deadman-configure" => self.handle_mesh_deadman_configure(params).await,
|
||||
"mesh.deadman-checkin" => self.handle_mesh_deadman_checkin().await,
|
||||
"mesh.test-send" => self.handle_mesh_test_send(params).await,
|
||||
|
||||
// Transport layer (unified routing)
|
||||
"transport.status" => self.handle_transport_status().await,
|
||||
|
||||
@@ -50,6 +50,13 @@ impl DockerPackageScanner {
|
||||
"immich_redis",
|
||||
"endurain-db",
|
||||
"nextcloud-db",
|
||||
"indeedhub-build_api_1",
|
||||
"indeedhub-build_postgres_1",
|
||||
"indeedhub-build_redis_1",
|
||||
"indeedhub-build_minio_1",
|
||||
"indeedhub-build_minio-init_1",
|
||||
"indeedhub-build_relay_1",
|
||||
"indeedhub-build_ffmpeg-worker_1",
|
||||
];
|
||||
|
||||
// First pass: collect UI containers
|
||||
@@ -95,7 +102,14 @@ impl DockerPackageScanner {
|
||||
debug!("Skipping backend service: {}", app_id);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
// Skip podman-compose infrastructure containers (e.g. indeedhub-build_api_1)
|
||||
// These have the project prefix pattern: {project}_{service}_{instance}
|
||||
if app_id.starts_with("indeedhub-build_") {
|
||||
debug!("Skipping IndeedHub compose service: {}", app_id);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Skip UI containers (they're merged with their parent apps)
|
||||
if app_id.ends_with("-ui") {
|
||||
debug!("Skipping UI container: {}", app_id);
|
||||
|
||||
@@ -119,11 +119,15 @@ pub async fn remove_pid_marker(data_dir: &Path) {
|
||||
/// Save a snapshot of currently running containers to disk.
|
||||
/// Called periodically so we know what to restart after a crash.
|
||||
pub async fn save_container_snapshot(data_dir: &Path) -> Result<()> {
|
||||
let output = tokio::process::Command::new("sudo")
|
||||
.args(["podman", "ps", "--format", "json"])
|
||||
.output()
|
||||
.await
|
||||
.context("Failed to run podman ps")?;
|
||||
let output = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(30),
|
||||
tokio::process::Command::new("sudo")
|
||||
.args(["podman", "ps", "--format", "json"])
|
||||
.output(),
|
||||
)
|
||||
.await
|
||||
.context("podman ps timed out (30s)")?
|
||||
.context("Failed to run podman ps")?;
|
||||
|
||||
if !output.status.success() {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
@@ -181,28 +185,40 @@ pub async fn recover_containers(containers: &[RunningContainerRecord]) -> Recove
|
||||
failed: Vec::new(),
|
||||
};
|
||||
|
||||
for record in containers {
|
||||
for (i, record) in containers.iter().enumerate() {
|
||||
info!("Recovering container: {} (image: {})", record.name, record.image);
|
||||
|
||||
let result = tokio::process::Command::new("sudo")
|
||||
.args(["podman", "start", &record.name])
|
||||
.output()
|
||||
.await;
|
||||
// Rate-limit container starts to avoid overwhelming podman on low-resource systems
|
||||
if i > 0 {
|
||||
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
|
||||
}
|
||||
|
||||
let result = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(30),
|
||||
tokio::process::Command::new("sudo")
|
||||
.args(["podman", "start", &record.name])
|
||||
.output(),
|
||||
)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(output) if output.status.success() => {
|
||||
Ok(Ok(output)) if output.status.success() => {
|
||||
info!("Successfully restarted container: {}", record.name);
|
||||
report.recovered += 1;
|
||||
}
|
||||
Ok(output) => {
|
||||
Ok(Ok(output)) => {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
warn!("Failed to restart container {}: {}", record.name, stderr.trim());
|
||||
report.failed.push(record.name.clone());
|
||||
}
|
||||
Err(e) => {
|
||||
Ok(Err(e)) => {
|
||||
warn!("Failed to execute podman start for {}: {}", record.name, e);
|
||||
report.failed.push(record.name.clone());
|
||||
}
|
||||
Err(_) => {
|
||||
warn!("Timeout starting container {} (30s)", record.name);
|
||||
report.failed.push(record.name.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -226,10 +242,20 @@ fn is_process_running(pid: u32) -> bool {
|
||||
/// Runs on every startup to ensure containers come back after clean reboots.
|
||||
/// The crash recovery (PID-based) handles dirty shutdowns; this handles clean ones.
|
||||
pub async fn start_stopped_containers() -> RecoveryReport {
|
||||
let output = tokio::process::Command::new("sudo")
|
||||
.args(["podman", "ps", "-a", "--filter", "status=exited", "--filter", "status=created", "--format", "{{.Names}}"])
|
||||
.output()
|
||||
.await;
|
||||
let output = match tokio::time::timeout(
|
||||
std::time::Duration::from_secs(30),
|
||||
tokio::process::Command::new("sudo")
|
||||
.args(["podman", "ps", "-a", "--filter", "status=exited", "--filter", "status=created", "--format", "{{.Names}}"])
|
||||
.output(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(_) => {
|
||||
warn!("Timeout listing stopped containers (30s)");
|
||||
return RecoveryReport { total: 0, recovered: 0, failed: Vec::new() };
|
||||
}
|
||||
};
|
||||
|
||||
let names: Vec<String> = match output {
|
||||
Ok(o) if o.status.success() => {
|
||||
@@ -256,10 +282,10 @@ pub async fn start_stopped_containers() -> RecoveryReport {
|
||||
/// Spawn a background task that periodically saves the container snapshot.
|
||||
pub fn spawn_snapshot_task(data_dir: PathBuf) {
|
||||
tokio::spawn(async move {
|
||||
// Wait 30s before first snapshot (let containers stabilize after startup)
|
||||
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
|
||||
// Wait 2 minutes before first snapshot (let crash recovery finish and containers stabilize)
|
||||
tokio::time::sleep(std::time::Duration::from_secs(120)).await;
|
||||
|
||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
|
||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(120));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
if let Err(e) = save_container_snapshot(&data_dir).await {
|
||||
|
||||
@@ -182,12 +182,23 @@ impl MemoryTracker {
|
||||
|
||||
/// Query container memory stats from podman.
|
||||
async fn check_container_memory() -> HashMap<String, u64> {
|
||||
let output = match tokio::process::Command::new("sudo")
|
||||
.args(["podman", "stats", "--no-stream", "--format", "{{.Name}} {{.MemUsage}}"])
|
||||
.output()
|
||||
.await
|
||||
let output = match tokio::time::timeout(
|
||||
std::time::Duration::from_secs(30),
|
||||
tokio::process::Command::new("sudo")
|
||||
.args(["podman", "stats", "--no-stream", "--format", "{{.Name}} {{.MemUsage}}"])
|
||||
.output(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(o) if o.status.success() => o,
|
||||
Ok(Ok(o)) if o.status.success() => o,
|
||||
Ok(Err(e)) => {
|
||||
debug!("podman stats failed: {}", e);
|
||||
return HashMap::new();
|
||||
}
|
||||
Err(_) => {
|
||||
debug!("podman stats timed out (30s)");
|
||||
return HashMap::new();
|
||||
}
|
||||
_ => return HashMap::new(),
|
||||
};
|
||||
|
||||
@@ -230,12 +241,23 @@ fn parse_memory_string(s: &str) -> Option<u64> {
|
||||
|
||||
/// Query all containers and their health status.
|
||||
async fn check_containers() -> Vec<ContainerHealth> {
|
||||
let output = match tokio::process::Command::new("sudo")
|
||||
.args(["podman", "ps", "-a", "--format", "json"])
|
||||
.output()
|
||||
.await
|
||||
let output = match tokio::time::timeout(
|
||||
std::time::Duration::from_secs(30),
|
||||
tokio::process::Command::new("sudo")
|
||||
.args(["podman", "ps", "-a", "--format", "json"])
|
||||
.output(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(o) if o.status.success() => o,
|
||||
Ok(Ok(o)) if o.status.success() => o,
|
||||
Ok(Err(e)) => {
|
||||
debug!("podman ps failed: {}", e);
|
||||
return Vec::new();
|
||||
}
|
||||
Err(_) => {
|
||||
debug!("podman ps timed out (30s)");
|
||||
return Vec::new();
|
||||
}
|
||||
_ => return Vec::new(),
|
||||
};
|
||||
|
||||
@@ -243,7 +265,7 @@ async fn check_containers() -> Vec<ContainerHealth> {
|
||||
let containers: Vec<serde_json::Value> =
|
||||
serde_json::from_str(&stdout).unwrap_or_default();
|
||||
|
||||
// Backend services to skip
|
||||
// Backend services and one-shot init containers to skip
|
||||
let skip = [
|
||||
"btcpay-db", "nbxplorer", "mempool-db", "mempool-api",
|
||||
"penpot-postgres", "penpot-backend", "penpot-exporter", "penpot-valkey",
|
||||
@@ -271,6 +293,11 @@ async fn check_containers() -> Vec<ContainerHealth> {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Skip podman-compose infrastructure and one-shot init containers
|
||||
if name.starts_with("indeedhub-build_") || name.contains("-init") {
|
||||
return None;
|
||||
}
|
||||
|
||||
let state = c.get("State")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("unknown")
|
||||
@@ -291,25 +318,32 @@ async fn check_containers() -> Vec<ContainerHealth> {
|
||||
/// Try to restart a container.
|
||||
async fn restart_container(name: &str) -> bool {
|
||||
info!("Auto-restarting unhealthy container: {}", name);
|
||||
let result = tokio::process::Command::new("sudo")
|
||||
.args(["podman", "start", name])
|
||||
.output()
|
||||
.await;
|
||||
let result = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(30),
|
||||
tokio::process::Command::new("sudo")
|
||||
.args(["podman", "start", name])
|
||||
.output(),
|
||||
)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(output) if output.status.success() => {
|
||||
Ok(Ok(output)) if output.status.success() => {
|
||||
info!("Successfully restarted container: {}", name);
|
||||
true
|
||||
}
|
||||
Ok(output) => {
|
||||
Ok(Ok(output)) => {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
warn!("Failed to restart container {}: {}", name, stderr.trim());
|
||||
false
|
||||
}
|
||||
Err(e) => {
|
||||
Ok(Err(e)) => {
|
||||
warn!("Failed to execute podman start for {}: {}", name, e);
|
||||
false
|
||||
}
|
||||
Err(_) => {
|
||||
warn!("Timeout starting container {} (30s)", name);
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -93,6 +93,8 @@ pub struct RelayTracker {
|
||||
tx_requests: RwLock<HashMap<u64, PendingRelay>>,
|
||||
/// Pending Lightning relay requests.
|
||||
lightning_requests: RwLock<HashMap<u64, PendingRelay>>,
|
||||
/// Completed relay results (kept for 5 minutes for frontend polling).
|
||||
completed_results: RwLock<Vec<RelayResult>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -101,11 +103,22 @@ struct PendingRelay {
|
||||
created_at: String,
|
||||
}
|
||||
|
||||
/// Result of a completed relay attempt, stored for frontend polling.
|
||||
#[derive(Debug, Clone, serde::Serialize)]
|
||||
pub struct RelayResult {
|
||||
pub request_id: u64,
|
||||
pub txid: Option<String>,
|
||||
pub error: Option<String>,
|
||||
pub error_code: Option<String>,
|
||||
pub completed_at: String,
|
||||
}
|
||||
|
||||
impl RelayTracker {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
tx_requests: RwLock::new(HashMap::new()),
|
||||
lightning_requests: RwLock::new(HashMap::new()),
|
||||
completed_results: RwLock::new(Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -155,6 +168,31 @@ impl RelayTracker {
|
||||
let ln = self.lightning_requests.read().await.len();
|
||||
(tx, ln)
|
||||
}
|
||||
|
||||
/// Store a completed relay result for frontend polling.
|
||||
pub async fn store_result(&self, result: RelayResult) {
|
||||
let mut results = self.completed_results.write().await;
|
||||
// Evict results older than 5 minutes
|
||||
let cutoff = chrono::Utc::now() - chrono::Duration::minutes(5);
|
||||
let cutoff_str = cutoff.to_rfc3339();
|
||||
results.retain(|r| r.completed_at > cutoff_str);
|
||||
results.push(result);
|
||||
}
|
||||
|
||||
/// Get relay result by request_id (returns None if not yet completed or expired).
|
||||
pub async fn get_result(&self, request_id: u64) -> Option<RelayResult> {
|
||||
self.completed_results
|
||||
.read()
|
||||
.await
|
||||
.iter()
|
||||
.find(|r| r.request_id == request_id)
|
||||
.cloned()
|
||||
}
|
||||
|
||||
/// Check if a TX relay request is still pending.
|
||||
pub async fn is_pending(&self, request_id: u64) -> bool {
|
||||
self.tx_requests.read().await.contains_key(&request_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for RelayTracker {
|
||||
@@ -220,11 +258,13 @@ pub fn build_tx_relay_response(
|
||||
request_id: u64,
|
||||
txid: Option<&str>,
|
||||
error: Option<&str>,
|
||||
error_code: Option<&str>,
|
||||
) -> Result<Vec<u8>> {
|
||||
let payload = message_types::encode_payload(&TxRelayResponsePayload {
|
||||
request_id,
|
||||
txid: txid.map(|s| s.to_string()),
|
||||
error: error.map(|s| s.to_string()),
|
||||
error_code: error_code.map(|s| s.to_string()),
|
||||
})?;
|
||||
let envelope = TypedEnvelope::new(MeshMessageType::TxRelayResponse, payload);
|
||||
envelope.to_wire()
|
||||
|
||||
@@ -55,10 +55,30 @@ pub struct MeshState {
|
||||
pub event_tx: broadcast::Sender<MeshEvent>,
|
||||
pub cmd_tx: mpsc::Sender<MeshCommand>,
|
||||
next_message_id: RwLock<u64>,
|
||||
/// Block header cache — populated when receiving headers from internet-connected peers.
|
||||
pub block_header_cache: Arc<super::bitcoin_relay::BlockHeaderCache>,
|
||||
/// Relay tracker — stores completed relay results for frontend polling.
|
||||
pub relay_tracker: Option<Arc<super::bitcoin_relay::RelayTracker>>,
|
||||
/// Steganography mode for outgoing/incoming messages.
|
||||
pub stego_mode: super::steganography::SteganographyMode,
|
||||
/// Chunk reassembly buffer for multi-frame messages.
|
||||
chunk_buffer: RwLock<HashMap<(u32, u8), ChunkAssembly>>,
|
||||
}
|
||||
|
||||
/// In-progress chunk reassembly for a multi-frame message.
|
||||
struct ChunkAssembly {
|
||||
chunks: HashMap<u8, String>,
|
||||
total: u8,
|
||||
created: std::time::Instant,
|
||||
}
|
||||
|
||||
impl MeshState {
|
||||
pub fn new(channel_name: &str) -> (Arc<Self>, broadcast::Receiver<MeshEvent>, mpsc::Receiver<MeshCommand>) {
|
||||
pub fn new(
|
||||
channel_name: &str,
|
||||
block_header_cache: Arc<super::bitcoin_relay::BlockHeaderCache>,
|
||||
relay_tracker: Option<Arc<super::bitcoin_relay::RelayTracker>>,
|
||||
stego_mode: super::steganography::SteganographyMode,
|
||||
) -> (Arc<Self>, broadcast::Receiver<MeshEvent>, mpsc::Receiver<MeshCommand>) {
|
||||
let (tx, rx) = broadcast::channel(64);
|
||||
let (cmd_tx, cmd_rx) = mpsc::channel(32);
|
||||
let state = Arc::new(Self {
|
||||
@@ -81,6 +101,10 @@ impl MeshState {
|
||||
}),
|
||||
event_tx: tx,
|
||||
next_message_id: RwLock::new(1),
|
||||
block_header_cache,
|
||||
relay_tracker,
|
||||
stego_mode,
|
||||
chunk_buffer: RwLock::new(HashMap::new()),
|
||||
});
|
||||
(state, rx, cmd_rx)
|
||||
}
|
||||
@@ -305,15 +329,60 @@ async fn run_mesh_session(
|
||||
}
|
||||
}
|
||||
MeshCommand::SendRaw { dest_pubkey_prefix, payload } => {
|
||||
// Base64 encode binary payloads — Meshcore truncates at NUL bytes in text mode
|
||||
use base64::Engine;
|
||||
let encoded = base64::engine::general_purpose::STANDARD.encode(&payload);
|
||||
if let Err(e) = device.send_text(&dest_pubkey_prefix, encoded.as_bytes()).await {
|
||||
consecutive_write_failures += 1;
|
||||
warn!(failures = consecutive_write_failures, "Failed to send raw via mesh: {}", e);
|
||||
// Apply steganographic encoding if configured
|
||||
let wire_payload = if state.stego_mode != super::steganography::SteganographyMode::Normal
|
||||
&& payload.first() == Some(&super::message_types::TYPED_MESSAGE_MARKER)
|
||||
{
|
||||
match super::steganography::encode_typed_wire(state.stego_mode, &payload) {
|
||||
Ok(stego) => stego,
|
||||
Err(e) => {
|
||||
warn!("Stego encode failed, sending plain: {}", e);
|
||||
payload
|
||||
}
|
||||
}
|
||||
} else {
|
||||
payload
|
||||
};
|
||||
// Base64 encode, then chunk if >140 chars (LoRa 160 byte limit)
|
||||
use base64::Engine;
|
||||
let encoded = base64::engine::general_purpose::STANDARD.encode(&wire_payload);
|
||||
|
||||
if encoded.len() <= 140 {
|
||||
// Single frame — fits in one LoRa packet
|
||||
if let Err(e) = device.send_text(&dest_pubkey_prefix, encoded.as_bytes()).await {
|
||||
consecutive_write_failures += 1;
|
||||
warn!(failures = consecutive_write_failures, "Failed to send raw via mesh: {}", e);
|
||||
} else {
|
||||
consecutive_write_failures = 0;
|
||||
info!(dest = %hex::encode(dest_pubkey_prefix), len = encoded.len(), "Sent raw mesh message");
|
||||
}
|
||||
} else {
|
||||
// Multi-frame chunking: "MCxxyyzz..." where xx=msg_id, yy=chunk_idx, zz=total_chunks
|
||||
static CHUNK_MSG_ID: std::sync::atomic::AtomicU8 = std::sync::atomic::AtomicU8::new(0);
|
||||
let msg_id = CHUNK_MSG_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
let chunk_data_size = 132; // 160 - 8 header bytes ("MCxxyyzz") = 152, leave margin
|
||||
let chunks: Vec<&str> = encoded.as_bytes().chunks(chunk_data_size)
|
||||
.map(|c| std::str::from_utf8(c).unwrap_or(""))
|
||||
.collect();
|
||||
let total = chunks.len() as u8;
|
||||
info!(
|
||||
dest = %hex::encode(dest_pubkey_prefix),
|
||||
raw_len = wire_payload.len(),
|
||||
b64_len = encoded.len(),
|
||||
chunks = total,
|
||||
"Sending chunked mesh message"
|
||||
);
|
||||
for (idx, chunk) in chunks.iter().enumerate() {
|
||||
let frame = format!("MC{:02x}{:02x}{:02x}{}", msg_id, idx as u8, total, chunk);
|
||||
if let Err(e) = device.send_text(&dest_pubkey_prefix, frame.as_bytes()).await {
|
||||
consecutive_write_failures += 1;
|
||||
warn!(failures = consecutive_write_failures, chunk = idx, "Chunk send failed: {}", e);
|
||||
break;
|
||||
}
|
||||
// Small delay between chunks to avoid overwhelming the radio
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
}
|
||||
consecutive_write_failures = 0;
|
||||
info!(dest = %hex::encode(dest_pubkey_prefix), raw_len = payload.len(), wire_len = encoded.len(), "Sent raw mesh message (base64)");
|
||||
}
|
||||
}
|
||||
MeshCommand::BroadcastChannel { channel, payload } => {
|
||||
@@ -390,7 +459,11 @@ async fn handle_frame(
|
||||
handle_typed_message(&payload, contact_id, &name, state).await;
|
||||
} else if let Some(decoded) = try_base64_typed(&payload) {
|
||||
handle_typed_message(&decoded, contact_id, &name, state).await;
|
||||
} else {
|
||||
} else if let Some(decoded) = try_decrypt_base64(&payload, contact_id, state).await {
|
||||
handle_typed_message(&decoded, contact_id, &name, state).await;
|
||||
} else if let Some(decoded) = try_chunk_reassemble(&payload, contact_id, state).await {
|
||||
handle_typed_message(&decoded, contact_id, &name, state).await;
|
||||
} else if !payload.starts_with(b"MC") {
|
||||
let text = String::from_utf8_lossy(&payload).to_string();
|
||||
store_plain_message(state, contact_id, &name, &text).await;
|
||||
info!(from = %sender_prefix, "Received mesh DM (v3)");
|
||||
@@ -411,7 +484,11 @@ async fn handle_frame(
|
||||
handle_typed_message(&payload, contact_id, &name, state).await;
|
||||
} else if let Some(decoded) = try_base64_typed(&payload) {
|
||||
handle_typed_message(&decoded, contact_id, &name, state).await;
|
||||
} else {
|
||||
} else if let Some(decoded) = try_decrypt_base64(&payload, contact_id, state).await {
|
||||
handle_typed_message(&decoded, contact_id, &name, state).await;
|
||||
} else if let Some(decoded) = try_chunk_reassemble(&payload, contact_id, state).await {
|
||||
handle_typed_message(&decoded, contact_id, &name, state).await;
|
||||
} else if !payload.starts_with(b"MC") {
|
||||
let text = String::from_utf8_lossy(&payload).to_string();
|
||||
store_plain_message(state, contact_id, &name, &text).await;
|
||||
info!(from = %sender_prefix, "Received mesh DM (v1)");
|
||||
@@ -679,22 +756,169 @@ async fn refresh_contacts(
|
||||
// ─── Typed Message Dispatch ────────────────────────────────────────────
|
||||
|
||||
/// Try to base64-decode payload and check if the result is a typed envelope.
|
||||
/// Handles: plain typed (0x02), steganographic (0xAA), and encrypted (0xEE).
|
||||
/// Returns the decoded bytes if it's a valid base64-encoded TypedEnvelope.
|
||||
fn try_base64_typed(payload: &[u8]) -> Option<Vec<u8>> {
|
||||
use base64::Engine;
|
||||
// Quick check: base64 starts with uppercase letters or digits, not 0x02
|
||||
if payload.is_empty() || payload[0] == message_types::TYPED_MESSAGE_MARKER {
|
||||
return None;
|
||||
}
|
||||
let text = std::str::from_utf8(payload).ok()?;
|
||||
let decoded = base64::engine::general_purpose::STANDARD.decode(text.trim()).ok()?;
|
||||
if TypedEnvelope::is_typed(&decoded) {
|
||||
Some(decoded)
|
||||
unwrap_wire_layers(&decoded)
|
||||
}
|
||||
|
||||
/// Try to base64-decode and decrypt an encrypted typed message.
|
||||
/// Handles the common case where encrypted messages arrive as base64 text.
|
||||
async fn try_decrypt_base64(
|
||||
payload: &[u8],
|
||||
sender_contact_id: u32,
|
||||
state: &Arc<MeshState>,
|
||||
) -> Option<Vec<u8>> {
|
||||
use base64::Engine;
|
||||
let text = std::str::from_utf8(payload).ok()?;
|
||||
let decoded = base64::engine::general_purpose::STANDARD.decode(text.trim()).ok()?;
|
||||
if decoded.first() != Some(&message_types::ENCRYPTED_TYPED_MARKER) {
|
||||
return None;
|
||||
}
|
||||
let secrets = state.shared_secrets.read().await;
|
||||
try_decrypt_typed(&decoded, sender_contact_id, &secrets)
|
||||
}
|
||||
|
||||
/// Unwrap wire layers: encrypted (0xEE) → stego (0xAA) → typed (0x02).
|
||||
/// Returns None if decoding fails at any layer (caller should use shared_secrets variant).
|
||||
fn unwrap_wire_layers(decoded: &[u8]) -> Option<Vec<u8>> {
|
||||
// Check for steganographic frame (0xAA prefix) — unwrap to typed envelope
|
||||
if decoded.first() == Some(&super::steganography::STEGO_MARKER) {
|
||||
match super::steganography::decode_typed_wire(decoded) {
|
||||
Ok(typed_wire) => return Some(typed_wire),
|
||||
Err(_) => return None,
|
||||
}
|
||||
}
|
||||
if TypedEnvelope::is_typed(decoded) {
|
||||
Some(decoded.to_vec())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to decrypt an encrypted typed message (0xEE prefix) using known shared secrets.
|
||||
/// Format: [0xEE] [nonce: 12] [ciphertext + tag: 16]
|
||||
fn try_decrypt_typed(
|
||||
decoded: &[u8],
|
||||
sender_contact_id: u32,
|
||||
shared_secrets: &HashMap<u32, [u8; 32]>,
|
||||
) -> Option<Vec<u8>> {
|
||||
if decoded.first() != Some(&message_types::ENCRYPTED_TYPED_MARKER) {
|
||||
return None;
|
||||
}
|
||||
let ciphertext = &decoded[1..]; // skip 0xEE marker
|
||||
|
||||
// Try sender's shared secret first (most likely)
|
||||
if let Some(secret) = shared_secrets.get(&sender_contact_id) {
|
||||
if let Ok(plaintext) = crypto::decrypt(secret, ciphertext) {
|
||||
return unwrap_wire_layers(&plaintext);
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: try all known shared secrets (in case contact_id mapping is stale)
|
||||
for (cid, secret) in shared_secrets {
|
||||
if *cid == sender_contact_id { continue; } // already tried
|
||||
if let Ok(plaintext) = crypto::decrypt(secret, ciphertext) {
|
||||
return unwrap_wire_layers(&plaintext);
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
/// Check if payload is a mesh chunk ("MC" prefix) and try to reassemble.
|
||||
/// Format: MC{msg_id:2hex}{chunk_idx:2hex}{total:2hex}{base64_data}
|
||||
/// Returns Some(decoded_bytes) when all chunks have arrived.
|
||||
async fn try_chunk_reassemble(
|
||||
payload: &[u8],
|
||||
sender_contact_id: u32,
|
||||
state: &Arc<MeshState>,
|
||||
) -> Option<Vec<u8>> {
|
||||
use base64::Engine;
|
||||
let text = std::str::from_utf8(payload).ok()?;
|
||||
if !text.starts_with("MC") || text.len() < 8 {
|
||||
return None;
|
||||
}
|
||||
|
||||
let msg_id = u8::from_str_radix(&text[2..4], 16).ok()?;
|
||||
let chunk_idx = u8::from_str_radix(&text[4..6], 16).ok()?;
|
||||
let total = u8::from_str_radix(&text[6..8], 16).ok()?;
|
||||
let chunk_data = &text[8..];
|
||||
|
||||
if total == 0 || total > 20 {
|
||||
return None; // sanity check
|
||||
}
|
||||
|
||||
let key = (sender_contact_id, msg_id);
|
||||
let mut buffer = state.chunk_buffer.write().await;
|
||||
|
||||
// Clean up stale entries (>120s old)
|
||||
buffer.retain(|_, v| v.created.elapsed().as_secs() < 120);
|
||||
|
||||
let assembly = buffer.entry(key).or_insert_with(|| ChunkAssembly {
|
||||
chunks: HashMap::new(),
|
||||
total,
|
||||
created: std::time::Instant::now(),
|
||||
});
|
||||
|
||||
assembly.chunks.insert(chunk_idx, chunk_data.to_string());
|
||||
assembly.total = total; // update in case first chunk had it wrong
|
||||
|
||||
debug!(msg_id, chunk_idx, total, received = assembly.chunks.len(), "Chunk received");
|
||||
|
||||
// Check if we have all chunks
|
||||
if assembly.chunks.len() < total as usize {
|
||||
return None;
|
||||
}
|
||||
|
||||
// All chunks received — reassemble in order
|
||||
let mut combined = String::new();
|
||||
for i in 0..total {
|
||||
match assembly.chunks.get(&i) {
|
||||
Some(data) => combined.push_str(data),
|
||||
None => {
|
||||
warn!(msg_id, missing = i, "Chunk missing during reassembly");
|
||||
return None;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Ok(decoded) = base64::engine::general_purpose::STANDARD.decode(&combined) {
|
||||
// Check for encrypted frame (0xEE) — decrypt then unwrap
|
||||
if decoded.first() == Some(&message_types::ENCRYPTED_TYPED_MARKER) {
|
||||
let secrets = state.shared_secrets.read().await;
|
||||
if let Some(typed_wire) = try_decrypt_typed(&decoded, sender_contact_id, &secrets) {
|
||||
info!(msg_id, chunks = total, total_len = typed_wire.len(), "Reassembled encrypted chunked message");
|
||||
buffer.remove(&key);
|
||||
return Some(typed_wire);
|
||||
}
|
||||
}
|
||||
// Check for stego frame — unwrap to typed envelope
|
||||
if decoded.first() == Some(&super::steganography::STEGO_MARKER) {
|
||||
if let Ok(typed_wire) = super::steganography::decode_typed_wire(&decoded) {
|
||||
info!(msg_id, chunks = total, total_len = typed_wire.len(), "Reassembled stego chunked message");
|
||||
buffer.remove(&key);
|
||||
return Some(typed_wire);
|
||||
}
|
||||
}
|
||||
if TypedEnvelope::is_typed(&decoded) {
|
||||
info!(msg_id, chunks = total, total_len = decoded.len(), "Reassembled chunked message");
|
||||
buffer.remove(&key);
|
||||
return Some(decoded);
|
||||
}
|
||||
}
|
||||
|
||||
warn!(msg_id, "All chunks received but decode failed");
|
||||
buffer.remove(&key);
|
||||
None
|
||||
}
|
||||
|
||||
/// Look up a peer by pubkey hex prefix. Returns (contact_id, display_name).
|
||||
async fn resolve_peer(state: &Arc<MeshState>, sender_prefix: &str) -> (u32, String) {
|
||||
let peers = state.peers.read().await;
|
||||
@@ -765,12 +989,23 @@ async fn handle_typed_message(
|
||||
Some(MeshMessageType::BlockHeader) => {
|
||||
// Compact binary format: height(8) + hash(32) + timestamp(4)
|
||||
match super::bitcoin_relay::decode_compact_block_header(&envelope.v) {
|
||||
Ok((height, hash_hex, _timestamp)) => {
|
||||
Ok((height, hash_hex, timestamp)) => {
|
||||
info!(
|
||||
height,
|
||||
hash = %hash_hex,
|
||||
"Block header received via mesh"
|
||||
);
|
||||
|
||||
// Store in block header cache for the Off-Grid Bitcoin panel
|
||||
let header_payload = message_types::BlockHeaderPayload {
|
||||
height,
|
||||
hash: hash_hex.clone(),
|
||||
prev_hash: String::new(),
|
||||
timestamp,
|
||||
announced_by: sender_name.to_string(),
|
||||
};
|
||||
let _ = state.block_header_cache.store_header(header_payload).await;
|
||||
|
||||
let text = format!(
|
||||
"Block #{} — {}...{}",
|
||||
height,
|
||||
@@ -859,14 +1094,27 @@ async fn handle_typed_message(
|
||||
info!(
|
||||
request_id = resp.request_id,
|
||||
status,
|
||||
error_code = resp.error_code.as_deref().unwrap_or("none"),
|
||||
"TX relay response received"
|
||||
);
|
||||
let text = if let Some(ref txid) = resp.txid {
|
||||
format!("TX relayed! txid: {}...{}", &txid[..8.min(txid.len())], &txid[txid.len().saturating_sub(8)..])
|
||||
} else if let Some(ref code) = resp.error_code {
|
||||
format!("TX relay failed [{}]: {}", code, resp.error.as_deref().unwrap_or("unknown"))
|
||||
} else {
|
||||
format!("TX relay failed: {}", resp.error.as_deref().unwrap_or("unknown"))
|
||||
};
|
||||
store_typed_message(state, sender_contact_id, sender_name, &text, "tx_relay_response").await;
|
||||
// Store result for frontend polling
|
||||
if let Some(ref tracker) = state.relay_tracker {
|
||||
tracker.store_result(super::bitcoin_relay::RelayResult {
|
||||
request_id: resp.request_id,
|
||||
txid: resp.txid.clone(),
|
||||
error: resp.error.clone(),
|
||||
error_code: resp.error_code.clone(),
|
||||
completed_at: chrono::Utc::now().to_rfc3339(),
|
||||
}).await;
|
||||
}
|
||||
let _ = state.event_tx.send(MeshEvent::TxRelayCompleted {
|
||||
request_id: resp.request_id,
|
||||
txid: resp.txid,
|
||||
@@ -973,6 +1221,16 @@ async fn handle_typed_message(
|
||||
"TX confirmation update received"
|
||||
);
|
||||
store_typed_message(state, sender_contact_id, sender_name, &status_text, "tx_confirmation").await;
|
||||
// Store confirmation for frontend polling
|
||||
if let Some(ref tracker) = state.relay_tracker {
|
||||
tracker.store_result(super::bitcoin_relay::RelayResult {
|
||||
request_id: conf.request_id,
|
||||
txid: Some(conf.txid.clone()),
|
||||
error: None,
|
||||
error_code: None,
|
||||
completed_at: chrono::Utc::now().to_rfc3339(),
|
||||
}).await;
|
||||
}
|
||||
let _ = state.event_tx.send(MeshEvent::TxRelayCompleted {
|
||||
request_id: conf.request_id,
|
||||
txid: Some(conf.txid),
|
||||
@@ -1043,6 +1301,59 @@ async fn handle_tx_relay_broadcast(
|
||||
}
|
||||
};
|
||||
|
||||
// Pre-flight: check if Bitcoin Core is reachable and synced
|
||||
let preflight_body = serde_json::json!({
|
||||
"jsonrpc": "1.0",
|
||||
"id": "preflight",
|
||||
"method": "getblockchaininfo",
|
||||
"params": []
|
||||
});
|
||||
|
||||
match client
|
||||
.post("http://127.0.0.1:8332/")
|
||||
.basic_auth("archipelago", Some("archipelago123"))
|
||||
.json(&preflight_body)
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
Ok(resp) => {
|
||||
if let Ok(rpc_resp) = resp.json::<serde_json::Value>().await {
|
||||
if let Some(result) = rpc_resp.get("result") {
|
||||
let ibd = result.get("initialblockdownload")
|
||||
.and_then(|v| v.as_bool())
|
||||
.unwrap_or(false);
|
||||
let progress = result.get("verificationprogress")
|
||||
.and_then(|v| v.as_f64())
|
||||
.unwrap_or(0.0);
|
||||
if ibd || progress < 0.999 {
|
||||
let pct = (progress * 100.0) as u32;
|
||||
let msg = format!("Bitcoin node is syncing ({}%) — cannot broadcast yet", pct);
|
||||
warn!(request_id = relay.request_id, "{}", msg);
|
||||
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some(&msg), Some("bitcoin_syncing")).await;
|
||||
return;
|
||||
}
|
||||
} else if let Some(err) = rpc_resp.get("error").and_then(|e| e.as_object()) {
|
||||
let msg = err.get("message").and_then(|m| m.as_str()).unwrap_or("RPC error");
|
||||
warn!(request_id = relay.request_id, "Bitcoin pre-flight failed: {}", msg);
|
||||
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some(&format!("Bitcoin node error: {}", msg)), Some("bitcoin_unreachable")).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let msg = format!("Bitcoin node unreachable — {}", if e.is_connect() {
|
||||
"connection refused (node may be stopped)"
|
||||
} else if e.is_timeout() {
|
||||
"connection timed out"
|
||||
} else {
|
||||
"network error"
|
||||
});
|
||||
warn!(request_id = relay.request_id, "Pre-flight: {}: {}", msg, e);
|
||||
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some(&msg), Some("bitcoin_unreachable")).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Step 1: Broadcast via Bitcoin Core RPC sendrawtransaction
|
||||
let body = serde_json::json!({
|
||||
"jsonrpc": "1.0",
|
||||
@@ -1062,36 +1373,50 @@ async fn handle_tx_relay_broadcast(
|
||||
match resp.json::<serde_json::Value>().await {
|
||||
Ok(rpc_resp) => {
|
||||
if let Some(err) = rpc_resp.get("error").and_then(|e| e.as_object()) {
|
||||
let code = err.get("code").and_then(|c| c.as_i64()).unwrap_or(0);
|
||||
let msg = err.get("message").and_then(|m| m.as_str()).unwrap_or("unknown");
|
||||
warn!(request_id = relay.request_id, "sendrawtransaction failed: {}", msg);
|
||||
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some(msg)).await;
|
||||
let user_msg = match code {
|
||||
-25 => format!("TX already in mempool or confirmed: {}", msg),
|
||||
-26 => format!("TX rejected by mempool policy: {}", msg),
|
||||
-27 => format!("TX already confirmed in a block"),
|
||||
_ => format!("Bitcoin rejected TX (code {}): {}", code, msg),
|
||||
};
|
||||
warn!(request_id = relay.request_id, rpc_code = code, "sendrawtransaction: {}", msg);
|
||||
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some(&user_msg), Some(&format!("tx_rejected:{}", code))).await;
|
||||
return;
|
||||
}
|
||||
rpc_resp.get("result").and_then(|r| r.as_str()).map(|s| s.to_string())
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to parse Bitcoin RPC response: {}", e);
|
||||
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some("RPC parse error")).await;
|
||||
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some("Failed to parse Bitcoin node response"), Some("rpc_parse_error")).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let msg = format!("Bitcoin node unreachable during broadcast — {}", if e.is_connect() {
|
||||
"connection refused"
|
||||
} else if e.is_timeout() {
|
||||
"timed out"
|
||||
} else {
|
||||
"network error"
|
||||
});
|
||||
warn!("Bitcoin Core RPC unreachable: {}", e);
|
||||
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some("No Bitcoin node available")).await;
|
||||
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some(&msg), Some("bitcoin_unreachable")).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let Some(txid) = txid else {
|
||||
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some("No txid returned")).await;
|
||||
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some("Bitcoin node returned no transaction ID"), Some("rpc_parse_error")).await;
|
||||
return;
|
||||
};
|
||||
|
||||
info!(request_id = relay.request_id, txid = %txid, "TX broadcast successful — tracking confirmations");
|
||||
|
||||
// Step 2: Send TxRelayResponse with txid back to originator
|
||||
send_tx_relay_response(state, sender_contact_id, relay.request_id, Some(&txid), None).await;
|
||||
send_tx_relay_response(state, sender_contact_id, relay.request_id, Some(&txid), None, None).await;
|
||||
|
||||
// Step 3: Monitor confirmations (poll every 30s, up to 3 hours)
|
||||
let mut last_reported_confs: u32 = 0;
|
||||
@@ -1124,8 +1449,9 @@ async fn send_tx_relay_response(
|
||||
request_id: u64,
|
||||
txid: Option<&str>,
|
||||
error: Option<&str>,
|
||||
error_code: Option<&str>,
|
||||
) {
|
||||
let wire = match super::bitcoin_relay::build_tx_relay_response(request_id, txid, error) {
|
||||
let wire = match super::bitcoin_relay::build_tx_relay_response(request_id, txid, error, error_code) {
|
||||
Ok(w) => w,
|
||||
Err(e) => {
|
||||
warn!("Failed to build TX relay response: {}", e);
|
||||
|
||||
@@ -14,6 +14,10 @@ use serde::{Deserialize, Serialize};
|
||||
/// Wire prefix for typed messages.
|
||||
pub const TYPED_MESSAGE_MARKER: u8 = 0x02;
|
||||
|
||||
/// Wire prefix for encrypted typed messages (E2E encrypted with shared secret).
|
||||
/// Format: [0xEE] [nonce: 12 bytes] [ciphertext + auth tag]
|
||||
pub const ENCRYPTED_TYPED_MARKER: u8 = 0xEE;
|
||||
|
||||
/// Message type discriminator.
|
||||
#[repr(u8)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -275,6 +279,9 @@ pub struct TxRelayResponsePayload {
|
||||
pub txid: Option<String>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub error: Option<String>,
|
||||
/// Machine-readable error code: bitcoin_unreachable, bitcoin_syncing, tx_rejected, rpc_parse_error
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub error_code: Option<String>,
|
||||
}
|
||||
|
||||
/// Lightning invoice relay request.
|
||||
|
||||
@@ -27,6 +27,8 @@ pub mod ratchet;
|
||||
#[allow(dead_code)]
|
||||
pub mod session;
|
||||
#[allow(dead_code)]
|
||||
pub mod steganography;
|
||||
#[allow(dead_code)]
|
||||
pub mod x3dh;
|
||||
|
||||
pub use types::*;
|
||||
@@ -68,6 +70,9 @@ pub struct MeshConfig {
|
||||
/// Announce new Bitcoin block headers over mesh (internet-connected nodes only).
|
||||
#[serde(default)]
|
||||
pub announce_block_headers: bool,
|
||||
/// Steganographic encoding mode for mesh messages (Normal = disabled).
|
||||
#[serde(default)]
|
||||
pub steganography_mode: steganography::SteganographyMode,
|
||||
}
|
||||
|
||||
impl Default for MeshConfig {
|
||||
@@ -80,6 +85,7 @@ impl Default for MeshConfig {
|
||||
advert_name: None,
|
||||
mesh_only_mode: None,
|
||||
announce_block_headers: false,
|
||||
steganography_mode: steganography::SteganographyMode::Normal,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -154,7 +160,14 @@ impl MeshService {
|
||||
.clone()
|
||||
.unwrap_or_else(|| "archipelago".to_string());
|
||||
|
||||
let (state, _rx, cmd_rx) = MeshState::new(&channel_name);
|
||||
let block_header_cache = Arc::new(BlockHeaderCache::new());
|
||||
let relay_tracker = Arc::new(RelayTracker::new());
|
||||
let (state, _rx, cmd_rx) = MeshState::new(
|
||||
&channel_name,
|
||||
Arc::clone(&block_header_cache),
|
||||
Some(Arc::clone(&relay_tracker)),
|
||||
config.steganography_mode,
|
||||
);
|
||||
|
||||
// Derive X25519 keys from Ed25519 identity
|
||||
let x25519_secret = crypto::ed25519_secret_to_x25519(signing_key);
|
||||
@@ -162,9 +175,6 @@ impl MeshService {
|
||||
&signing_key.verifying_key().to_bytes(),
|
||||
)?;
|
||||
let x25519_pubkey_hex = hex::encode(x25519_pubkey);
|
||||
|
||||
let block_header_cache = Arc::new(BlockHeaderCache::new());
|
||||
let relay_tracker = Arc::new(RelayTracker::new());
|
||||
let dead_man_switch = Arc::new(
|
||||
DeadManSwitch::new(data_dir)
|
||||
.await
|
||||
@@ -670,6 +680,7 @@ mod tests {
|
||||
channel_name: Some("test".to_string()),
|
||||
broadcast_identity: false,
|
||||
advert_name: Some("MyNode".to_string()),
|
||||
..Default::default()
|
||||
};
|
||||
let json = serde_json::to_string(&config).unwrap();
|
||||
let parsed: MeshConfig = serde_json::from_str(&json).unwrap();
|
||||
@@ -694,6 +705,7 @@ mod tests {
|
||||
channel_name: Some("archy".to_string()),
|
||||
broadcast_identity: true,
|
||||
advert_name: None,
|
||||
..Default::default()
|
||||
};
|
||||
save_config(dir.path(), &config).await.unwrap();
|
||||
let loaded = load_config(dir.path()).await.unwrap();
|
||||
|
||||
@@ -440,7 +440,7 @@ mod tests {
|
||||
prev_chain_n: 0,
|
||||
message_n: 0,
|
||||
},
|
||||
ciphertext: vec![0x01, 0x02, 0x03; 30].into_iter().flatten().collect(),
|
||||
ciphertext: vec![[0x01, 0x02, 0x03]; 30].into_iter().flatten().collect(),
|
||||
};
|
||||
let bytes = msg.to_bytes();
|
||||
let parsed = RatchetMessage::from_bytes(&bytes).unwrap();
|
||||
|
||||
403
core/archipelago/src/mesh/steganography.rs
Normal file
403
core/archipelago/src/mesh/steganography.rs
Normal file
@@ -0,0 +1,403 @@
|
||||
//! Steganographic encoding for mesh messages.
|
||||
//!
|
||||
//! Transforms typed message envelopes into formats that resemble innocuous
|
||||
//! sensor data on the wire. Provides plausible deniability — traffic analysis
|
||||
//! sees weather readings or industrial sensor data, not Bitcoin transactions.
|
||||
//!
|
||||
//! Wire format:
|
||||
//! - Normal: `[0x02] [CBOR envelope]` (existing)
|
||||
//! - Stego: `[0xAA] [mode: 1 byte] [stego-encoded data]`
|
||||
//!
|
||||
//! The 0xAA prefix distinguishes steganographic frames from typed (0x02) and
|
||||
//! plain text (0x00) messages. Both sender and receiver must use the same mode.
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Wire prefix for steganographic messages.
|
||||
pub const STEGO_MARKER: u8 = 0xAA;
|
||||
|
||||
/// Steganography mode — how real payload bytes are disguised on the wire.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum SteganographyMode {
|
||||
/// No steganography — standard 0x02 typed envelope.
|
||||
Normal,
|
||||
/// Payload disguised as weather station telemetry.
|
||||
/// Format: repeating 8-byte "readings" (temp, humidity, pressure, wind, flags).
|
||||
WeatherStation,
|
||||
/// Payload disguised as industrial sensor network data.
|
||||
/// Format: repeating 6-byte "samples" (voltage, current, vibration, status).
|
||||
SensorNetwork,
|
||||
}
|
||||
|
||||
impl Default for SteganographyMode {
|
||||
fn default() -> Self {
|
||||
Self::Normal
|
||||
}
|
||||
}
|
||||
|
||||
impl SteganographyMode {
|
||||
pub fn from_u8(v: u8) -> Option<Self> {
|
||||
match v {
|
||||
0 => Some(Self::Normal),
|
||||
1 => Some(Self::WeatherStation),
|
||||
2 => Some(Self::SensorNetwork),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Weather Station Encoding ──────────────────────────────────────────
|
||||
//
|
||||
// Each 8-byte "reading" encodes 5 bytes of real payload data:
|
||||
// [temp_hi: u8] [temp_lo: u8] [humidity: u8] [pressure_hi: u8] [pressure_lo: u8]
|
||||
// [wind_speed: u8] [wind_dir: u8] [flags: u8]
|
||||
//
|
||||
// Real data bytes map as:
|
||||
// byte0 → temp_hi (offset by 200 to look like -50.0°C to +5.5°C range)
|
||||
// byte1 → humidity (modulo 100)
|
||||
// byte2 → pressure_hi (offset by 900 for 900-1155 hPa range)
|
||||
// byte3 → wind_speed (modulo 60 for 0-59 m/s)
|
||||
// byte4 → flags (lower 5 bits = data, upper 3 bits = plausible status flags)
|
||||
//
|
||||
// temp_lo, pressure_lo, wind_dir are derived (not payload data) for realism.
|
||||
// Overhead: 8 bytes per 5 payload bytes = 60% efficiency.
|
||||
|
||||
const WEATHER_REAL_BYTES_PER_BLOCK: usize = 5;
|
||||
const WEATHER_WIRE_BYTES_PER_BLOCK: usize = 8;
|
||||
|
||||
fn encode_weather_block(data: &[u8]) -> [u8; WEATHER_WIRE_BYTES_PER_BLOCK] {
|
||||
let mut block = [0u8; 8];
|
||||
let b0 = *data.first().unwrap_or(&0);
|
||||
let b1 = *data.get(1).unwrap_or(&0);
|
||||
let b2 = *data.get(2).unwrap_or(&0);
|
||||
let b3 = *data.get(3).unwrap_or(&0);
|
||||
let b4 = *data.get(4).unwrap_or(&0);
|
||||
|
||||
// temp: b0 mapped to plausible range, fractional derived from b1
|
||||
block[0] = b0.wrapping_add(200); // temp_hi — wraps around, decoded by subtracting 200
|
||||
block[1] = b1 ^ 0x55; // temp_lo — XOR mask, recoverable
|
||||
// humidity: b1 stored directly (0-255 maps to 0-100% with modular interpretation)
|
||||
block[2] = b1;
|
||||
// pressure: b2 offset into 900-1155 range
|
||||
block[3] = b2;
|
||||
block[4] = b3 ^ 0x33; // pressure_lo — XOR mask
|
||||
// wind: b3 modular
|
||||
block[5] = b3;
|
||||
// wind direction: derived from b4 (0-359 degrees as single byte = 0-255 → *1.41)
|
||||
block[6] = b4 ^ 0xAA; // XOR mask
|
||||
// flags: b4 with upper bits set for realism (battery OK, GPS lock, etc.)
|
||||
block[7] = (b4 & 0x1F) | 0xC0; // upper 2 bits always set
|
||||
|
||||
block
|
||||
}
|
||||
|
||||
fn decode_weather_block(block: &[u8; WEATHER_WIRE_BYTES_PER_BLOCK]) -> [u8; WEATHER_REAL_BYTES_PER_BLOCK] {
|
||||
let mut data = [0u8; 5];
|
||||
data[0] = block[0].wrapping_sub(200);
|
||||
data[1] = block[2]; // humidity field stores b1 directly
|
||||
data[2] = block[3]; // pressure_hi stores b2 directly
|
||||
data[3] = block[5]; // wind_speed stores b3 directly
|
||||
data[4] = block[6] ^ 0xAA; // wind_dir XOR back
|
||||
data
|
||||
}
|
||||
|
||||
// ─── Sensor Network Encoding ───────────────────────────────────────────
|
||||
//
|
||||
// Each 6-byte "sample" encodes 4 bytes of real payload data:
|
||||
// [voltage_hi: u8] [voltage_lo: u8] [current: u8]
|
||||
// [vibration: u8] [phase: u8] [status: u8]
|
||||
//
|
||||
// Real data bytes map as:
|
||||
// byte0 → voltage_hi
|
||||
// byte1 → current
|
||||
// byte2 → vibration
|
||||
// byte3 → status (lower 4 bits = data, upper 4 = plausible status)
|
||||
//
|
||||
// voltage_lo and phase are derived for realism.
|
||||
// Overhead: 6 bytes per 4 payload bytes = 67% efficiency.
|
||||
|
||||
const SENSOR_REAL_BYTES_PER_BLOCK: usize = 4;
|
||||
const SENSOR_WIRE_BYTES_PER_BLOCK: usize = 6;
|
||||
|
||||
fn encode_sensor_block(data: &[u8]) -> [u8; SENSOR_WIRE_BYTES_PER_BLOCK] {
|
||||
let mut block = [0u8; 6];
|
||||
let b0 = *data.first().unwrap_or(&0);
|
||||
let b1 = *data.get(1).unwrap_or(&0);
|
||||
let b2 = *data.get(2).unwrap_or(&0);
|
||||
let b3 = *data.get(3).unwrap_or(&0);
|
||||
|
||||
block[0] = b0; // voltage_hi
|
||||
block[1] = b0 ^ b1; // voltage_lo (derived, recoverable)
|
||||
block[2] = b1; // current
|
||||
block[3] = b2; // vibration
|
||||
block[4] = b2.wrapping_add(b3); // phase (derived)
|
||||
block[5] = (b3 & 0x0F) | 0x80; // status: upper nibble = "operational"
|
||||
|
||||
block
|
||||
}
|
||||
|
||||
fn decode_sensor_block(block: &[u8; SENSOR_WIRE_BYTES_PER_BLOCK]) -> [u8; SENSOR_REAL_BYTES_PER_BLOCK] {
|
||||
let mut data = [0u8; 4];
|
||||
data[0] = block[0]; // voltage_hi = b0
|
||||
data[1] = block[2]; // current = b1
|
||||
data[2] = block[3]; // vibration = b2
|
||||
data[3] = (block[5] & 0x0F) | (block[4].wrapping_sub(block[3]) & 0xF0);
|
||||
// Recover b3: lower 4 bits from status, but we only stored lower 4.
|
||||
// Full b3 recovery: block[4] = b2 + b3, so b3 = block[4] - block[3]
|
||||
data[3] = block[4].wrapping_sub(block[3]);
|
||||
data
|
||||
}
|
||||
|
||||
// ─── Public API ────────────────────────────────────────────────────────
|
||||
|
||||
/// Encode raw payload bytes using steganographic mode.
|
||||
/// Returns: `[0xAA] [mode_byte] [length_hi] [length_lo] [encoded_blocks...]`
|
||||
///
|
||||
/// The length field stores the original payload size (up to 65535 bytes)
|
||||
/// so the decoder knows how many real bytes to extract.
|
||||
pub fn encode(mode: SteganographyMode, payload: &[u8]) -> Result<Vec<u8>> {
|
||||
if mode == SteganographyMode::Normal {
|
||||
anyhow::bail!("Cannot steganographically encode in Normal mode");
|
||||
}
|
||||
if payload.len() > 0xFFFF {
|
||||
anyhow::bail!("Payload too large for steganographic encoding");
|
||||
}
|
||||
|
||||
let len = payload.len() as u16;
|
||||
let mut output = Vec::new();
|
||||
output.push(STEGO_MARKER);
|
||||
output.push(mode as u8);
|
||||
output.push((len >> 8) as u8);
|
||||
output.push((len & 0xFF) as u8);
|
||||
|
||||
match mode {
|
||||
SteganographyMode::WeatherStation => {
|
||||
for chunk in payload.chunks(WEATHER_REAL_BYTES_PER_BLOCK) {
|
||||
// Pad short final chunk with zeros
|
||||
let mut padded = [0u8; WEATHER_REAL_BYTES_PER_BLOCK];
|
||||
padded[..chunk.len()].copy_from_slice(chunk);
|
||||
output.extend_from_slice(&encode_weather_block(&padded));
|
||||
}
|
||||
}
|
||||
SteganographyMode::SensorNetwork => {
|
||||
for chunk in payload.chunks(SENSOR_REAL_BYTES_PER_BLOCK) {
|
||||
let mut padded = [0u8; SENSOR_REAL_BYTES_PER_BLOCK];
|
||||
padded[..chunk.len()].copy_from_slice(chunk);
|
||||
output.extend_from_slice(&encode_sensor_block(&padded));
|
||||
}
|
||||
}
|
||||
SteganographyMode::Normal => unreachable!(),
|
||||
}
|
||||
|
||||
Ok(output)
|
||||
}
|
||||
|
||||
/// Decode a steganographic frame back to raw payload bytes.
|
||||
/// Input must start with `0xAA`.
|
||||
pub fn decode(data: &[u8]) -> Result<(SteganographyMode, Vec<u8>)> {
|
||||
if data.len() < 4 {
|
||||
anyhow::bail!("Stego frame too short: {} bytes", data.len());
|
||||
}
|
||||
if data[0] != STEGO_MARKER {
|
||||
anyhow::bail!("Not a stego frame (expected 0xAA, got 0x{:02x})", data[0]);
|
||||
}
|
||||
|
||||
let mode = SteganographyMode::from_u8(data[1])
|
||||
.ok_or_else(|| anyhow::anyhow!("Unknown stego mode: 0x{:02x}", data[1]))?;
|
||||
let original_len = ((data[2] as usize) << 8) | (data[3] as usize);
|
||||
let encoded_data = &data[4..];
|
||||
|
||||
let mut payload = Vec::with_capacity(original_len);
|
||||
|
||||
match mode {
|
||||
SteganographyMode::WeatherStation => {
|
||||
for block_bytes in encoded_data.chunks(WEATHER_WIRE_BYTES_PER_BLOCK) {
|
||||
if block_bytes.len() < WEATHER_WIRE_BYTES_PER_BLOCK {
|
||||
break;
|
||||
}
|
||||
let block: [u8; WEATHER_WIRE_BYTES_PER_BLOCK] = block_bytes.try_into()
|
||||
.context("Invalid weather block size")?;
|
||||
let decoded = decode_weather_block(&block);
|
||||
payload.extend_from_slice(&decoded);
|
||||
}
|
||||
}
|
||||
SteganographyMode::SensorNetwork => {
|
||||
for block_bytes in encoded_data.chunks(SENSOR_WIRE_BYTES_PER_BLOCK) {
|
||||
if block_bytes.len() < SENSOR_WIRE_BYTES_PER_BLOCK {
|
||||
break;
|
||||
}
|
||||
let block: [u8; SENSOR_WIRE_BYTES_PER_BLOCK] = block_bytes.try_into()
|
||||
.context("Invalid sensor block size")?;
|
||||
let decoded = decode_sensor_block(&block);
|
||||
payload.extend_from_slice(&decoded);
|
||||
}
|
||||
}
|
||||
SteganographyMode::Normal => {
|
||||
anyhow::bail!("Normal mode cannot appear in stego frame");
|
||||
}
|
||||
}
|
||||
|
||||
// Truncate to original length (removes padding from last block)
|
||||
payload.truncate(original_len);
|
||||
Ok((mode, payload))
|
||||
}
|
||||
|
||||
/// Encode a typed envelope wire bytes using steganography.
|
||||
/// Input: standard wire bytes starting with 0x02 (TYPED_MESSAGE_MARKER).
|
||||
/// Output: stego wire bytes starting with 0xAA.
|
||||
pub fn encode_typed_wire(mode: SteganographyMode, typed_wire: &[u8]) -> Result<Vec<u8>> {
|
||||
if typed_wire.is_empty() || typed_wire[0] != super::message_types::TYPED_MESSAGE_MARKER {
|
||||
anyhow::bail!("Input is not a typed message (expected 0x02 prefix)");
|
||||
}
|
||||
// Encode the entire typed wire frame (including the 0x02 marker) as payload
|
||||
encode(mode, typed_wire)
|
||||
}
|
||||
|
||||
/// Decode a stego frame back to typed envelope wire bytes.
|
||||
/// Returns the original bytes with 0x02 prefix restored.
|
||||
pub fn decode_typed_wire(stego_data: &[u8]) -> Result<Vec<u8>> {
|
||||
let (_mode, payload) = decode(stego_data)?;
|
||||
if payload.is_empty() || payload[0] != super::message_types::TYPED_MESSAGE_MARKER {
|
||||
anyhow::bail!("Decoded stego payload is not a typed message");
|
||||
}
|
||||
Ok(payload)
|
||||
}
|
||||
|
||||
/// Calculate the wire overhead for a given mode and payload size.
|
||||
pub fn wire_size(mode: SteganographyMode, payload_len: usize) -> usize {
|
||||
let header = 4; // 0xAA + mode + len_hi + len_lo
|
||||
match mode {
|
||||
SteganographyMode::Normal => payload_len,
|
||||
SteganographyMode::WeatherStation => {
|
||||
let blocks = (payload_len + WEATHER_REAL_BYTES_PER_BLOCK - 1) / WEATHER_REAL_BYTES_PER_BLOCK;
|
||||
header + blocks * WEATHER_WIRE_BYTES_PER_BLOCK
|
||||
}
|
||||
SteganographyMode::SensorNetwork => {
|
||||
let blocks = (payload_len + SENSOR_REAL_BYTES_PER_BLOCK - 1) / SENSOR_REAL_BYTES_PER_BLOCK;
|
||||
header + blocks * SENSOR_WIRE_BYTES_PER_BLOCK
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Max real payload bytes that fit in a single 160-byte LoRa frame after stego.
|
||||
pub fn max_payload_per_frame(mode: SteganographyMode) -> usize {
|
||||
let frame_limit = 160usize;
|
||||
let header = 4;
|
||||
let available = frame_limit.saturating_sub(header);
|
||||
match mode {
|
||||
SteganographyMode::Normal => frame_limit - 1, // minus 0x02 marker
|
||||
SteganographyMode::WeatherStation => {
|
||||
let blocks = available / WEATHER_WIRE_BYTES_PER_BLOCK;
|
||||
blocks * WEATHER_REAL_BYTES_PER_BLOCK
|
||||
}
|
||||
SteganographyMode::SensorNetwork => {
|
||||
let blocks = available / SENSOR_WIRE_BYTES_PER_BLOCK;
|
||||
blocks * SENSOR_REAL_BYTES_PER_BLOCK
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_weather_roundtrip() {
|
||||
let original = vec![0x42, 0xFF, 0x00, 0xAB, 0x13];
|
||||
let encoded = encode(SteganographyMode::WeatherStation, &original).unwrap();
|
||||
assert_eq!(encoded[0], STEGO_MARKER);
|
||||
assert_eq!(encoded[1], SteganographyMode::WeatherStation as u8);
|
||||
let (mode, decoded) = decode(&encoded).unwrap();
|
||||
assert_eq!(mode, SteganographyMode::WeatherStation);
|
||||
assert_eq!(decoded, original);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sensor_roundtrip() {
|
||||
let original = vec![0x42, 0xFF, 0x00, 0xAB];
|
||||
let encoded = encode(SteganographyMode::SensorNetwork, &original).unwrap();
|
||||
assert_eq!(encoded[0], STEGO_MARKER);
|
||||
let (mode, decoded) = decode(&encoded).unwrap();
|
||||
assert_eq!(mode, SteganographyMode::SensorNetwork);
|
||||
assert_eq!(decoded, original);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_weather_multi_block() {
|
||||
// 12 bytes = 3 weather blocks (5+5+2 with padding)
|
||||
let original: Vec<u8> = (0..12).collect();
|
||||
let encoded = encode(SteganographyMode::WeatherStation, &original).unwrap();
|
||||
let (_, decoded) = decode(&encoded).unwrap();
|
||||
assert_eq!(decoded, original);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sensor_multi_block() {
|
||||
// 10 bytes = 3 sensor blocks (4+4+2 with padding)
|
||||
let original: Vec<u8> = (0..10).collect();
|
||||
let encoded = encode(SteganographyMode::SensorNetwork, &original).unwrap();
|
||||
let (_, decoded) = decode(&encoded).unwrap();
|
||||
assert_eq!(decoded, original);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_all_byte_values_weather() {
|
||||
let original: Vec<u8> = (0..=255).collect();
|
||||
let encoded = encode(SteganographyMode::WeatherStation, &original).unwrap();
|
||||
let (_, decoded) = decode(&encoded).unwrap();
|
||||
assert_eq!(decoded, original);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_all_byte_values_sensor() {
|
||||
let original: Vec<u8> = (0..=255).collect();
|
||||
let encoded = encode(SteganographyMode::SensorNetwork, &original).unwrap();
|
||||
let (_, decoded) = decode(&encoded).unwrap();
|
||||
assert_eq!(decoded, original);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_empty_payload() {
|
||||
let encoded = encode(SteganographyMode::WeatherStation, &[]).unwrap();
|
||||
let (_, decoded) = decode(&encoded).unwrap();
|
||||
assert!(decoded.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_wire_size_calculation() {
|
||||
// 5 bytes payload = 1 weather block = 4 header + 8 = 12
|
||||
assert_eq!(wire_size(SteganographyMode::WeatherStation, 5), 12);
|
||||
// 4 bytes payload = 1 sensor block = 4 header + 6 = 10
|
||||
assert_eq!(wire_size(SteganographyMode::SensorNetwork, 4), 10);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_max_payload_per_frame() {
|
||||
let weather_max = max_payload_per_frame(SteganographyMode::WeatherStation);
|
||||
let sensor_max = max_payload_per_frame(SteganographyMode::SensorNetwork);
|
||||
// Verify the encoded output fits in 160 bytes
|
||||
let test_data = vec![0x42; weather_max];
|
||||
let encoded = encode(SteganographyMode::WeatherStation, &test_data).unwrap();
|
||||
assert!(encoded.len() <= 160, "Weather stego {} > 160", encoded.len());
|
||||
|
||||
let test_data = vec![0x42; sensor_max];
|
||||
let encoded = encode(SteganographyMode::SensorNetwork, &test_data).unwrap();
|
||||
assert!(encoded.len() <= 160, "Sensor stego {} > 160", encoded.len());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_normal_mode_rejects() {
|
||||
assert!(encode(SteganographyMode::Normal, &[1, 2, 3]).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_typed_wire_roundtrip() {
|
||||
// Simulate a typed message wire frame
|
||||
let mut typed_wire = vec![0x02]; // TYPED_MESSAGE_MARKER
|
||||
typed_wire.extend_from_slice(&[0x01, 0x02, 0x03, 0x04, 0x05]);
|
||||
let stego = encode_typed_wire(SteganographyMode::WeatherStation, &typed_wire).unwrap();
|
||||
let recovered = decode_typed_wire(&stego).unwrap();
|
||||
assert_eq!(recovered, typed_wire);
|
||||
}
|
||||
}
|
||||
@@ -209,96 +209,36 @@ impl Server {
|
||||
});
|
||||
}
|
||||
|
||||
// Initialize mesh networking service (if config has enabled: true)
|
||||
{
|
||||
let data_dir = config.data_dir.clone();
|
||||
let did = identity::did_key_from_pubkey_hex(&data.server_info.pubkey)
|
||||
.unwrap_or_default();
|
||||
let pubkey_hex = identity.pubkey_hex();
|
||||
let signing_key = identity.signing_key();
|
||||
match crate::mesh::MeshService::new(&data_dir, signing_key, &did, &pubkey_hex).await {
|
||||
Ok(mut mesh_service) => {
|
||||
let mesh_config = crate::mesh::load_config(&data_dir).await.unwrap_or_default();
|
||||
if mesh_config.enabled {
|
||||
if let Err(e) = mesh_service.start() {
|
||||
warn!("Mesh service start failed (non-fatal): {}", e);
|
||||
} else {
|
||||
info!("📡 Mesh networking started");
|
||||
}
|
||||
}
|
||||
api_handler.rpc_handler().set_mesh_service(mesh_service).await;
|
||||
info!("📡 Mesh service initialized");
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Mesh service init failed (non-fatal): {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize transport router (unified routing: mesh > lan > tor)
|
||||
{
|
||||
let data_dir = config.data_dir.clone();
|
||||
let did = identity::did_key_from_pubkey_hex(&data.server_info.pubkey)
|
||||
.unwrap_or_default();
|
||||
let pubkey_hex = identity.pubkey_hex();
|
||||
let mesh_config = crate::mesh::load_config(&data_dir).await.unwrap_or_default();
|
||||
let mesh_only = mesh_config.mesh_only_mode.unwrap_or(false);
|
||||
|
||||
match crate::transport::PeerRegistry::load(&data_dir).await {
|
||||
Ok(registry) => {
|
||||
let registry = std::sync::Arc::new(registry);
|
||||
let mut transports: Vec<Box<dyn crate::transport::NodeTransport>> = Vec::new();
|
||||
|
||||
transports.push(Box::new(
|
||||
crate::transport::tor::TorTransport::new(&pubkey_hex),
|
||||
));
|
||||
transports.push(Box::new(
|
||||
crate::transport::mesh_transport::MeshTransport::new(
|
||||
api_handler.rpc_handler().mesh_service_arc(),
|
||||
),
|
||||
));
|
||||
|
||||
let mut lan = crate::transport::lan::LanTransport::new(&did, &pubkey_hex, 5678);
|
||||
match lan.start(registry.clone()) {
|
||||
Ok(()) => info!("📡 LAN transport (mDNS) started"),
|
||||
Err(e) => debug!("LAN transport init (non-fatal): {}", e),
|
||||
}
|
||||
transports.push(Box::new(lan));
|
||||
|
||||
let router = std::sync::Arc::new(crate::transport::TransportRouter::new(
|
||||
transports,
|
||||
registry,
|
||||
mesh_only,
|
||||
));
|
||||
api_handler.rpc_handler().set_transport_router(router).await;
|
||||
info!("📡 Transport router initialized (mesh_only={})", mesh_only);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Transport router init failed (non-fatal): {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize container scanner — discovers installed apps from Podman/Docker
|
||||
{
|
||||
let scanner = create_docker_scanner(&config).await?;
|
||||
let state = state_manager.clone();
|
||||
let identity_clone = identity.clone();
|
||||
|
||||
// Initial scan
|
||||
// Initial scan (delayed to let crash recovery finish first)
|
||||
tokio::spawn(async move {
|
||||
// Wait for crash recovery to start containers before scanning
|
||||
tokio::time::sleep(Duration::from_secs(15)).await;
|
||||
info!("🐳 Scanning containers...");
|
||||
if let Err(e) = scan_and_update_packages(&scanner, &state, identity_clone.as_ref()).await {
|
||||
error!("Failed to scan containers: {}", e);
|
||||
}
|
||||
|
||||
// Periodic scan every 10 seconds (only broadcasts if state changed)
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(10));
|
||||
// Periodic scan every 30 seconds (only broadcasts if state changed)
|
||||
// Uses an in-flight guard to skip scans when a previous one is still running
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(30));
|
||||
let scanning = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
if scanning.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
debug!("Skipping container scan — previous scan still in progress");
|
||||
continue;
|
||||
}
|
||||
scanning.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
if let Err(e) = scan_and_update_packages(&scanner, &state, identity_clone.as_ref()).await {
|
||||
error!("Failed to update containers: {}", e);
|
||||
}
|
||||
scanning.store(false, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -335,11 +335,14 @@ impl PodmanClient {
|
||||
.arg("-a")
|
||||
.arg("--format")
|
||||
.arg("json");
|
||||
|
||||
let output = cmd
|
||||
.output()
|
||||
.await
|
||||
.context("Failed to list containers")?;
|
||||
|
||||
let output = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(60),
|
||||
cmd.output(),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("podman ps timed out (60s)"))?
|
||||
.context("Failed to list containers")?;
|
||||
|
||||
if !output.status.success() {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
|
||||
Reference in New Issue
Block a user