feat: Phase 4 — off-grid Bitcoin relay, block headers, dead man's switch

- Typed message dispatch in listener (BlockHeader, TxRelay, LightningRelay, Alert, TxConfirmation)
- Base64 encoding for binary payloads over LoRa (fixes NUL byte truncation)
- Compact block header announcements (88 bytes, fits 160-byte LoRa limit)
- Block header announcer: internet nodes auto-announce new blocks to Archy peers
- TX relay: mesh-only nodes can broadcast transactions via internet-connected peers
- Confirmation tracking: relay node monitors 1/3, 2/3, 3/3 confirmations, sends updates back
- Dead man's switch background task with configurable interval and signed alert broadcast
- 6 new RPC endpoints: relay-tx, block-headers, relay-lightning, deadman-status/configure/checkin
- lnd.create-raw-tx: create signed TX without broadcasting (for mesh relay)
- Web5 wallet: offline detection + "Send via mesh?" prompt with auto relay + confirmation polling
- Mesh.vue: Off-Grid Bitcoin tab, Dead Man tab, Send Bitcoin/Lightning buttons
- TX/Lightning relay sends only to Archy peers (not broadcast to all devices)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dorian
2026-03-17 15:51:56 +00:00
parent 4b7c765cd1
commit d1ac098edb
13 changed files with 2091 additions and 126 deletions

View File

@@ -676,6 +676,97 @@ impl RpcHandler {
}))
}
/// Create a signed raw transaction WITHOUT broadcasting.
/// Used for mesh relay: create the TX locally, then relay the hex to an
/// internet-connected peer who broadcasts it.
pub(super) async fn handle_lnd_create_raw_tx(&self, params: Option<serde_json::Value>) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let addr = params.get("addr")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'addr'"))?;
let amount_sats = params.get("amount_sats")
.and_then(|v| v.as_u64())
.ok_or_else(|| anyhow::anyhow!("Missing 'amount_sats'"))?;
if amount_sats < 546 {
anyhow::bail!("Amount must be at least 546 sats (dust limit)");
}
if amount_sats > 2_100_000_000_000_000 {
anyhow::bail!("Amount exceeds 21M BTC");
}
let (client, macaroon_hex) = self.lnd_client().await?;
// Step 1: Fund a PSBT with the desired output
let fee_rate = params.get("fee_rate").and_then(|v| v.as_u64()).unwrap_or(5);
let fund_body = serde_json::json!({
"raw": {
"outputs": { addr: amount_sats }
},
"sat_per_vbyte": fee_rate,
"spend_unconfirmed": false,
});
let resp = client
.post("https://127.0.0.1:8080/v2/wallet/psbt/fund")
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.json(&fund_body)
.send()
.await
.context("Failed to fund PSBT via LND")?;
let status = resp.status();
let body: serde_json::Value = resp.json().await
.context("Failed to parse fund response")?;
if !status.is_success() {
let msg = body.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error");
return Err(anyhow::anyhow!("Failed to create TX: {}", msg));
}
let funded_psbt = body.get("funded_psbt")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("No funded_psbt in response"))?;
// Step 2: Finalize (LND auto-signs with hot wallet keys)
let finalize_body = serde_json::json!({
"funded_psbt": funded_psbt,
});
let resp = client
.post("https://127.0.0.1:8080/v2/wallet/psbt/finalize")
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.json(&finalize_body)
.send()
.await
.context("Failed to finalize PSBT")?;
let status = resp.status();
let body: serde_json::Value = resp.json().await
.context("Failed to parse finalize response")?;
if !status.is_success() {
let msg = body.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error");
return Err(anyhow::anyhow!("Failed to sign TX: {}", msg));
}
// raw_final_tx is the hex-encoded signed transaction — ready for broadcast
let raw_final_tx = body.get("raw_final_tx")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("No raw_final_tx in response"))?
.to_string();
info!(addr, amount_sats, tx_len = raw_final_tx.len(), "Created raw TX for mesh relay (NOT broadcast)");
Ok(serde_json::json!({
"raw_tx_hex": raw_final_tx,
"amount_sats": amount_sats,
"addr": addr,
"broadcast": false,
}))
}
/// List on-chain transactions from LND.
/// Returns all transactions, with incoming (amount > 0) flagged.
pub(super) async fn handle_lnd_gettransactions(&self) -> Result<serde_json::Value> {

View File

@@ -411,6 +411,231 @@ impl RpcHandler {
}
}
// ─── Phase 4: Off-Grid Bitcoin Operations ────────────────────────────
/// mesh.relay-tx — Send a raw transaction for relay by an internet-connected mesh peer.
pub(super) async fn handle_mesh_relay_tx(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let tx_hex = params["tx_hex"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing tx_hex"))?;
if tx_hex.len() < 20 || tx_hex.len() > 200_000 {
anyhow::bail!("Invalid tx_hex length");
}
// Validate hex
if hex::decode(tx_hex).is_err() {
anyhow::bail!("tx_hex is not valid hexadecimal");
}
let service = self.mesh_service.read().await;
let svc = service.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let request_id = chrono::Utc::now().timestamp() as u64;
svc.relay_tracker.track_tx_relay(request_id, svc.our_did()).await;
let wire = crate::mesh::bitcoin_relay::build_tx_relay_request(tx_hex, request_id)?;
// Send ONLY to Archipelago peers (Archy-* nodes), not broadcast to all devices
let peers = svc.peers().await;
let mut sent_count = 0u32;
for peer in &peers {
if !peer.advert_name.starts_with("Archy-") { continue; }
if let Some(ref pk) = peer.pubkey_hex {
if let Ok(pk_bytes) = hex::decode(pk) {
if pk_bytes.len() >= 6 {
let mut prefix = [0u8; 6];
prefix.copy_from_slice(&pk_bytes[..6]);
let _ = svc.shared_state()
.cmd_tx
.send(crate::mesh::listener::MeshCommand::SendRaw {
dest_pubkey_prefix: prefix,
payload: wire.clone(),
})
.await;
sent_count += 1;
}
}
}
}
info!(request_id, tx_len = tx_hex.len(), archy_peers = sent_count, "TX relay sent to Archy peers only");
Ok(serde_json::json!({
"request_id": request_id,
"queued": true,
"tx_hex_len": tx_hex.len(),
}))
}
/// mesh.block-headers — Get cached block headers received from mesh peers.
pub(super) async fn handle_mesh_block_headers(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let count = params
.as_ref()
.and_then(|p| p["count"].as_u64())
.unwrap_or(10) as usize;
let service = self.mesh_service.read().await;
let svc = service.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let headers = svc.block_header_cache.recent_headers(count).await;
let latest = svc.block_header_cache.latest_height().await;
Ok(serde_json::json!({
"headers": headers.iter().map(|h| serde_json::json!({
"height": h.height,
"hash": h.hash,
"prev_hash": h.prev_hash,
"timestamp": h.timestamp,
"announced_by": h.announced_by,
})).collect::<Vec<_>>(),
"latest_height": latest,
"count": headers.len(),
}))
}
/// mesh.relay-lightning — Send a Lightning invoice for payment by an internet-connected peer.
pub(super) async fn handle_mesh_relay_lightning(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let bolt11 = params["bolt11"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing bolt11"))?;
let amount_sats = params["amount_sats"]
.as_u64()
.ok_or_else(|| anyhow::anyhow!("Missing amount_sats"))?;
if !bolt11.starts_with("lnbc") && !bolt11.starts_with("lntb") {
anyhow::bail!("Invalid bolt11 invoice — must start with lnbc or lntb");
}
let service = self.mesh_service.read().await;
let svc = service.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let request_id = chrono::Utc::now().timestamp() as u64;
svc.relay_tracker.track_lightning_relay(request_id, svc.our_did()).await;
let wire = crate::mesh::bitcoin_relay::build_lightning_relay_request(
bolt11, amount_sats, request_id,
)?;
// Send ONLY to Archipelago peers, not broadcast
let peers = svc.peers().await;
let mut sent_count = 0u32;
for peer in &peers {
if !peer.advert_name.starts_with("Archy-") { continue; }
if let Some(ref pk) = peer.pubkey_hex {
if let Ok(pk_bytes) = hex::decode(pk) {
if pk_bytes.len() >= 6 {
let mut prefix = [0u8; 6];
prefix.copy_from_slice(&pk_bytes[..6]);
let _ = svc.shared_state()
.cmd_tx
.send(crate::mesh::listener::MeshCommand::SendRaw {
dest_pubkey_prefix: prefix,
payload: wire.clone(),
})
.await;
sent_count += 1;
}
}
}
}
info!(request_id, amount_sats, archy_peers = sent_count, "Lightning relay sent to Archy peers only");
Ok(serde_json::json!({
"request_id": request_id,
"queued": true,
"amount_sats": amount_sats,
}))
}
/// mesh.deadman-status — Get dead man's switch status.
pub(super) async fn handle_mesh_deadman_status(&self) -> Result<serde_json::Value> {
let service = self.mesh_service.read().await;
let svc = service.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let status = svc.dead_man_switch.status().await;
Ok(serde_json::to_value(status)?)
}
/// mesh.deadman-configure — Configure the dead man's switch.
pub(super) async fn handle_mesh_deadman_configure(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let service = self.mesh_service.read().await;
let svc = service.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let mut config = svc.dead_man_switch.get_config().await;
if let Some(enabled) = params.get("enabled").and_then(|v| v.as_bool()) {
config.dead_man_enabled = enabled;
}
if let Some(interval) = params.get("interval_secs").and_then(|v| v.as_u64()) {
if interval < 60 {
anyhow::bail!("Interval must be at least 60 seconds");
}
config.dead_man_interval_secs = interval;
}
if let (Some(lat), Some(lng)) = (
params.get("lat").and_then(|v| v.as_f64()),
params.get("lng").and_then(|v| v.as_f64()),
) {
let label = params.get("label").and_then(|v| v.as_str()).map(|s| s.to_string());
config.last_gps = Some(Coordinate::from_degrees(lat, lng, label));
}
if let Some(contacts) = params.get("contacts").and_then(|v| v.as_array()) {
config.emergency_contacts = contacts
.iter()
.filter_map(|c| c.as_str().map(|s| s.to_string()))
.collect();
}
if let Some(msg) = params.get("custom_message").and_then(|v| v.as_str()) {
config.custom_message = Some(msg.to_string());
}
if let Some(auto_gps) = params.get("auto_gps").and_then(|v| v.as_bool()) {
config.auto_include_gps = auto_gps;
}
svc.dead_man_switch.configure(config).await?;
// Reset timer on configure
svc.dead_man_switch.check_in().await;
let status = svc.dead_man_switch.status().await;
info!("Dead man's switch configured");
Ok(serde_json::to_value(status)?)
}
/// mesh.deadman-checkin — Heartbeat to reset the dead man's switch timer.
pub(super) async fn handle_mesh_deadman_checkin(&self) -> Result<serde_json::Value> {
let service = self.mesh_service.read().await;
let svc = service.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
svc.dead_man_check_in().await;
let remaining = svc.dead_man_switch.time_remaining_secs().await;
Ok(serde_json::json!({
"checked_in": true,
"time_remaining_secs": remaining,
}))
}
/// mesh.rotate-prekeys — Force prekey rotation for X3DH.
pub(super) async fn handle_mesh_rotate_prekeys(&self) -> Result<serde_json::Value> {
// Load identity signing key

View File

@@ -490,6 +490,7 @@ impl RpcHandler {
"lnd.payinvoice" => self.handle_lnd_payinvoice(params).await,
"lnd.create-psbt" => self.handle_lnd_create_psbt(params).await,
"lnd.finalize-psbt" => self.handle_lnd_finalize_psbt(params).await,
"lnd.create-raw-tx" => self.handle_lnd_create_raw_tx(params).await,
"lnd.gettransactions" => self.handle_lnd_gettransactions().await,
"lnd.connect-info" => self.handle_lnd_connect_info().await,
@@ -652,6 +653,13 @@ impl RpcHandler {
"mesh.outbox" => self.handle_mesh_outbox(params).await,
"mesh.session-status" => self.handle_mesh_session_status(params).await,
"mesh.rotate-prekeys" => self.handle_mesh_rotate_prekeys().await,
// Phase 4: Off-grid Bitcoin operations
"mesh.relay-tx" => self.handle_mesh_relay_tx(params).await,
"mesh.block-headers" => self.handle_mesh_block_headers(params).await,
"mesh.relay-lightning" => self.handle_mesh_relay_lightning(params).await,
"mesh.deadman-status" => self.handle_mesh_deadman_status().await,
"mesh.deadman-configure" => self.handle_mesh_deadman_configure(params).await,
"mesh.deadman-checkin" => self.handle_mesh_deadman_checkin().await,
// Transport layer (unified routing)
"transport.status" => self.handle_transport_status().await,

View File

@@ -175,6 +175,16 @@ impl DeadManSwitch {
envelope.to_wire()
}
/// Check if the alert has already been sent (prevents re-broadcasting every 60s).
pub async fn triggered_flag(&self) -> tokio::sync::RwLockReadGuard<'_, bool> {
self.triggered.read().await
}
/// Mark the switch as having fired (alert already sent).
pub async fn mark_triggered(&self) {
*self.triggered.write().await = true;
}
/// Get the list of emergency contact DIDs.
pub async fn emergency_contacts(&self) -> Vec<String> {
self.config.read().await.emergency_contacts.clone()

View File

@@ -165,27 +165,46 @@ impl Default for RelayTracker {
// ─── Block Header Announcement Builder ──────────────────────────────────
/// Build a signed block header announcement for mesh broadcast.
/// Build a compact block header announcement for mesh broadcast.
/// Uses raw binary (not CBOR) to fit within the 160-byte LoRa limit:
/// height(8 LE) + hash_raw(32) + timestamp(4 LE) = 44 bytes payload
/// Wrapped in unsigned TypedEnvelope (~25 bytes overhead) = ~69 total.
pub fn build_block_header_announcement(
height: u64,
hash: &str,
prev_hash: &str,
_prev_hash: &str,
timestamp: u32,
our_did: &str,
signing_key: &ed25519_dalek::SigningKey,
_our_did: &str,
_signing_key: &ed25519_dalek::SigningKey,
) -> Result<Vec<u8>> {
let header = BlockHeaderPayload {
height,
hash: hash.to_string(),
prev_hash: prev_hash.to_string(),
timestamp,
announced_by: our_did.to_string(),
};
let payload = message_types::encode_payload(&header)?;
let envelope = TypedEnvelope::new_signed(MeshMessageType::BlockHeader, payload, signing_key);
let hash_bytes = hex::decode(hash).context("Invalid block hash hex")?;
if hash_bytes.len() != 32 {
anyhow::bail!("Block hash must be 32 bytes, got {}", hash_bytes.len());
}
// Compact binary: height(8) + hash(32) + timestamp(4) = 44 bytes
let mut payload = Vec::with_capacity(44);
payload.extend_from_slice(&height.to_le_bytes());
payload.extend_from_slice(&hash_bytes);
payload.extend_from_slice(&timestamp.to_le_bytes());
// Use unsigned envelope to save 64 bytes (no Ed25519 signature)
let envelope = TypedEnvelope::new(MeshMessageType::BlockHeader, payload);
envelope.to_wire()
}
/// Decode a compact block header from raw binary payload.
/// Returns (height, hash_hex, timestamp).
pub fn decode_compact_block_header(payload: &[u8]) -> Result<(u64, String, u32)> {
if payload.len() < 44 {
anyhow::bail!("Compact block header too short: {} bytes", payload.len());
}
let height = u64::from_le_bytes(payload[0..8].try_into().unwrap());
let hash_hex = hex::encode(&payload[8..40]);
let timestamp = u32::from_le_bytes(payload[40..44].try_into().unwrap());
Ok((height, hash_hex, timestamp))
}
/// Build a TX relay request envelope.
pub fn build_tx_relay_request(tx_hex: &str, request_id: u64) -> Result<Vec<u8>> {
let payload = message_types::encode_payload(&TxRelayPayload {

View File

@@ -8,6 +8,7 @@
//! - Manages peer cache and message store
use super::crypto;
use super::message_types::{self, MeshMessageType, TypedEnvelope};
use super::protocol;
use super::serial::MeshcoreDevice;
use super::types::*;
@@ -31,9 +32,17 @@ const MAX_MESSAGES: usize = 100;
/// Delay before reconnection attempt after device disconnect.
const RECONNECT_DELAY: Duration = Duration::from_secs(10);
/// Number of consecutive write failures before we consider the device dead
/// and trigger a reconnection cycle.
const MAX_CONSECUTIVE_WRITE_FAILURES: u32 = 3;
/// Command sent from MeshService to the listener task (which owns the serial port).
pub enum MeshCommand {
SendText { dest_pubkey_prefix: [u8; 6], payload: Vec<u8> },
/// Send pre-encoded binary (TypedEnvelope wire bytes) to a peer.
SendRaw { dest_pubkey_prefix: [u8; 6], payload: Vec<u8> },
/// Broadcast pre-encoded binary on a mesh channel.
BroadcastChannel { channel: u8, payload: Vec<u8> },
SendAdvert,
}
@@ -220,20 +229,33 @@ async fn run_mesh_session(
refresh_contacts(&mut device, state).await;
// Sync any queued messages from before we connected
sync_queued_messages(&mut device, state, our_x25519_secret).await;
let _ = sync_queued_messages(&mut device, state, our_x25519_secret).await;
// Main loop
let mut advert_timer = tokio::time::interval(ADVERT_INTERVAL);
let mut sync_timer = tokio::time::interval(SYNC_INTERVAL);
advert_timer.tick().await; // skip first immediate tick
sync_timer.tick().await;
let mut consecutive_write_failures: u32 = 0;
loop {
// If too many consecutive writes have failed, the serial port is dead —
// bail out so the outer loop can reconnect to a (possibly re-enumerated) device.
if consecutive_write_failures >= MAX_CONSECUTIVE_WRITE_FAILURES {
error!(
failures = consecutive_write_failures,
"Serial port unresponsive — triggering reconnection"
);
anyhow::bail!("Serial port unresponsive after {} consecutive write failures", consecutive_write_failures);
}
tokio::select! {
// Check for incoming frames
frame_result = device.try_recv_frame() => {
match frame_result {
Ok(Some(frame)) => {
// Successful read resets the failure counter
consecutive_write_failures = 0;
let should_action = handle_frame(
&frame,
state,
@@ -242,7 +264,9 @@ async fn run_mesh_session(
if should_action {
// Contact discovery or messages waiting — sync both
refresh_contacts(&mut device, state).await;
sync_queued_messages(&mut device, state, our_x25519_secret).await;
if sync_queued_messages(&mut device, state, our_x25519_secret).await {
consecutive_write_failures += 1;
}
}
}
Ok(None) => {
@@ -260,7 +284,10 @@ async fn run_mesh_session(
_ = advert_timer.tick() => {
debug!("Periodic self-advert broadcast");
if let Err(e) = device.send_self_advert().await {
warn!("Failed to send advert: {}", e);
consecutive_write_failures += 1;
warn!(failures = consecutive_write_failures, "Failed to send advert: {}", e);
} else {
consecutive_write_failures = 0;
}
refresh_contacts(&mut device, state).await;
}
@@ -270,14 +297,40 @@ async fn run_mesh_session(
match cmd {
MeshCommand::SendText { dest_pubkey_prefix, payload } => {
if let Err(e) = device.send_text(&dest_pubkey_prefix, &payload).await {
warn!("Failed to send text via mesh: {}", e);
consecutive_write_failures += 1;
warn!(failures = consecutive_write_failures, "Failed to send text via mesh: {}", e);
} else {
consecutive_write_failures = 0;
info!(dest = %hex::encode(dest_pubkey_prefix), len = payload.len(), "Sent mesh message");
}
}
MeshCommand::SendRaw { dest_pubkey_prefix, payload } => {
// Base64 encode binary payloads — Meshcore truncates at NUL bytes in text mode
use base64::Engine;
let encoded = base64::engine::general_purpose::STANDARD.encode(&payload);
if let Err(e) = device.send_text(&dest_pubkey_prefix, encoded.as_bytes()).await {
consecutive_write_failures += 1;
warn!(failures = consecutive_write_failures, "Failed to send raw via mesh: {}", e);
} else {
consecutive_write_failures = 0;
info!(dest = %hex::encode(dest_pubkey_prefix), raw_len = payload.len(), wire_len = encoded.len(), "Sent raw mesh message (base64)");
}
}
MeshCommand::BroadcastChannel { channel, payload } => {
if let Err(e) = device.send_channel_text(channel, &payload).await {
consecutive_write_failures += 1;
warn!(failures = consecutive_write_failures, "Failed to broadcast on channel {}: {}", channel, e);
} else {
consecutive_write_failures = 0;
info!(channel, len = payload.len(), "Broadcast on mesh channel");
}
}
MeshCommand::SendAdvert => {
if let Err(e) = device.send_self_advert().await {
warn!("Failed to send advert: {}", e);
consecutive_write_failures += 1;
warn!(failures = consecutive_write_failures, "Failed to send advert: {}", e);
} else {
consecutive_write_failures = 0;
}
}
}
@@ -285,7 +338,12 @@ async fn run_mesh_session(
// Periodic message sync
_ = sync_timer.tick() => {
sync_queued_messages(&mut device, state, our_x25519_secret).await;
if sync_queued_messages(&mut device, state, our_x25519_secret).await {
consecutive_write_failures += 1;
debug!(failures = consecutive_write_failures, "Message sync failed");
} else {
consecutive_write_failures = 0;
}
}
// Shutdown signal
@@ -323,33 +381,20 @@ async fn handle_frame(
}
protocol::RESP_CONTACT_MSG_V3 => {
// Direct message received (v3 format)
match protocol::parse_contact_msg_v3(&frame.data) {
Ok((sender_prefix, text, _snr)) => {
if !text.is_empty() {
let peer_name = {
let peers = state.peers.read().await;
peers.values()
.find(|p| p.pubkey_hex.as_ref().map(|k| k.starts_with(&sender_prefix)).unwrap_or(false))
.map(|p| (p.contact_id, p.advert_name.clone()))
};
let (contact_id, name) = peer_name.unwrap_or((0, sender_prefix.clone()));
let msg_id = state.next_id().await;
let msg = MeshMessage {
id: msg_id,
direction: MessageDirection::Received,
peer_contact_id: contact_id,
peer_name: Some(name),
plaintext: text,
timestamp: chrono::Utc::now().to_rfc3339(),
delivered: true,
encrypted: false,
};
state.store_message(msg.clone()).await;
state.status.write().await.messages_received += 1;
info!(from = %sender_prefix, "Received mesh DM (v3)");
let _ = state.event_tx.send(MeshEvent::MessageReceived(msg));
// Direct message received (v3 format) — check for typed envelope first
match protocol::parse_contact_msg_v3_raw(&frame.data) {
Ok((sender_prefix, payload, _snr)) => {
if !payload.is_empty() {
let (contact_id, name) = resolve_peer(state, &sender_prefix).await;
if TypedEnvelope::is_typed(&payload) {
handle_typed_message(&payload, contact_id, &name, state).await;
} else if let Some(decoded) = try_base64_typed(&payload) {
handle_typed_message(&decoded, contact_id, &name, state).await;
} else {
let text = String::from_utf8_lossy(&payload).to_string();
store_plain_message(state, contact_id, &name, &text).await;
info!(from = %sender_prefix, "Received mesh DM (v3)");
}
}
}
Err(e) => warn!("Failed to parse v3 message: {}", e),
@@ -358,32 +403,19 @@ async fn handle_frame(
protocol::RESP_CONTACT_MSG => {
// Direct message received (v1 format)
match protocol::parse_contact_msg_v1(&frame.data) {
Ok((sender_prefix, text)) => {
if !text.is_empty() {
let peer_name = {
let peers = state.peers.read().await;
peers.values()
.find(|p| p.pubkey_hex.as_ref().map(|k| k.starts_with(&sender_prefix)).unwrap_or(false))
.map(|p| (p.contact_id, p.advert_name.clone()))
};
let (contact_id, name) = peer_name.unwrap_or((0, sender_prefix.clone()));
let msg_id = state.next_id().await;
let msg = MeshMessage {
id: msg_id,
direction: MessageDirection::Received,
peer_contact_id: contact_id,
peer_name: Some(name),
plaintext: text,
timestamp: chrono::Utc::now().to_rfc3339(),
delivered: true,
encrypted: false,
};
state.store_message(msg.clone()).await;
state.status.write().await.messages_received += 1;
info!(from = %sender_prefix, "Received mesh DM (v1)");
let _ = state.event_tx.send(MeshEvent::MessageReceived(msg));
match protocol::parse_contact_msg_v1_raw(&frame.data) {
Ok((sender_prefix, payload)) => {
if !payload.is_empty() {
let (contact_id, name) = resolve_peer(state, &sender_prefix).await;
if TypedEnvelope::is_typed(&payload) {
handle_typed_message(&payload, contact_id, &name, state).await;
} else if let Some(decoded) = try_base64_typed(&payload) {
handle_typed_message(&decoded, contact_id, &name, state).await;
} else {
let text = String::from_utf8_lossy(&payload).to_string();
store_plain_message(state, contact_id, &name, &text).await;
info!(from = %sender_prefix, "Received mesh DM (v1)");
}
}
}
Err(e) => warn!("Failed to parse v1 message: {}", e),
@@ -391,26 +423,19 @@ async fn handle_frame(
}
protocol::RESP_CHANNEL_MSG_V3 => {
// Channel broadcast received (v3)
match protocol::parse_channel_msg_v3(&frame.data) {
Ok((channel_idx, text)) => {
if !text.is_empty() {
let msg_id = state.next_id().await;
let chan_contact_id = -((channel_idx as i32) + 1);
let msg = MeshMessage {
id: msg_id,
direction: MessageDirection::Received,
peer_contact_id: chan_contact_id as u32,
peer_name: Some(format!("Channel {}", channel_idx)),
plaintext: text,
timestamp: chrono::Utc::now().to_rfc3339(),
delivered: true,
encrypted: false,
};
state.store_message(msg.clone()).await;
state.status.write().await.messages_received += 1;
info!(channel = channel_idx, "Received mesh channel message (v3)");
let _ = state.event_tx.send(MeshEvent::MessageReceived(msg));
// Channel broadcast received (v3) — check for typed envelope
match protocol::parse_channel_msg_v3_raw(&frame.data) {
Ok((channel_idx, payload)) => {
if !payload.is_empty() {
let chan_contact_id = u32::MAX - (channel_idx as u32);
let chan_name = format!("Channel {}", channel_idx);
if TypedEnvelope::is_typed(&payload) {
handle_typed_message(&payload, chan_contact_id, &chan_name, state).await;
} else {
let text = String::from_utf8_lossy(&payload).to_string();
store_plain_message(state, chan_contact_id, &chan_name, &text).await;
info!(channel = channel_idx, "Received mesh channel message (v3)");
}
}
}
Err(e) => warn!("Failed to parse v3 channel message: {}", e),
@@ -419,25 +444,18 @@ async fn handle_frame(
protocol::RESP_CHANNEL_MSG => {
// Channel broadcast received (v1)
match protocol::parse_channel_msg_v1(&frame.data) {
Ok((channel_idx, text)) => {
if !text.is_empty() {
let msg_id = state.next_id().await;
let chan_contact_id = -((channel_idx as i32) + 1);
let msg = MeshMessage {
id: msg_id,
direction: MessageDirection::Received,
peer_contact_id: chan_contact_id as u32,
peer_name: Some(format!("Channel {}", channel_idx)),
plaintext: text,
timestamp: chrono::Utc::now().to_rfc3339(),
delivered: true,
encrypted: false,
};
state.store_message(msg.clone()).await;
state.status.write().await.messages_received += 1;
info!(channel = channel_idx, "Received mesh channel message");
let _ = state.event_tx.send(MeshEvent::MessageReceived(msg));
match protocol::parse_channel_msg_v1_raw(&frame.data) {
Ok((channel_idx, payload)) => {
if !payload.is_empty() {
let chan_contact_id = u32::MAX - (channel_idx as u32);
let chan_name = format!("Channel {}", channel_idx);
if TypedEnvelope::is_typed(&payload) {
handle_typed_message(&payload, chan_contact_id, &chan_name, state).await;
} else {
let text = String::from_utf8_lossy(&payload).to_string();
store_plain_message(state, chan_contact_id, &chan_name, &text).await;
info!(channel = channel_idx, "Received mesh channel message");
}
}
}
Err(e) => warn!("Failed to parse channel message: {}", e),
@@ -599,11 +617,12 @@ async fn handle_received_message(
}
/// Drain any queued messages from the device.
/// Returns `true` if a write/communication error occurred (for failure tracking).
async fn sync_queued_messages(
device: &mut MeshcoreDevice,
state: &Arc<MeshState>,
our_x25519_secret: &[u8; 32],
) {
) -> bool {
match device.sync_messages().await {
Ok(frames) => {
for frame in &frames {
@@ -612,9 +631,11 @@ async fn sync_queued_messages(
if !frames.is_empty() {
info!(count = frames.len(), "Synced queued mesh messages");
}
false
}
Err(e) => {
debug!("Message sync: {}", e);
true
}
}
}
@@ -654,3 +675,539 @@ async fn refresh_contacts(
}
}
}
// ─── Typed Message Dispatch ────────────────────────────────────────────
/// Try to base64-decode payload and check if the result is a typed envelope.
/// Returns the decoded bytes if it's a valid base64-encoded TypedEnvelope.
fn try_base64_typed(payload: &[u8]) -> Option<Vec<u8>> {
use base64::Engine;
// Quick check: base64 starts with uppercase letters or digits, not 0x02
if payload.is_empty() || payload[0] == message_types::TYPED_MESSAGE_MARKER {
return None;
}
let text = std::str::from_utf8(payload).ok()?;
let decoded = base64::engine::general_purpose::STANDARD.decode(text.trim()).ok()?;
if TypedEnvelope::is_typed(&decoded) {
Some(decoded)
} else {
None
}
}
/// Look up a peer by pubkey hex prefix. Returns (contact_id, display_name).
async fn resolve_peer(state: &Arc<MeshState>, sender_prefix: &str) -> (u32, String) {
let peers = state.peers.read().await;
peers
.values()
.find(|p| {
p.pubkey_hex
.as_ref()
.map(|k| k.starts_with(sender_prefix))
.unwrap_or(false)
})
.map(|p| (p.contact_id, p.advert_name.clone()))
.unwrap_or((0, sender_prefix.to_string()))
}
/// Store a plain-text (non-typed) message and emit an event.
async fn store_plain_message(
state: &Arc<MeshState>,
contact_id: u32,
peer_name: &str,
text: &str,
) {
let msg_id = state.next_id().await;
let msg = MeshMessage {
id: msg_id,
direction: MessageDirection::Received,
peer_contact_id: contact_id,
peer_name: Some(peer_name.to_string()),
plaintext: text.to_string(),
timestamp: chrono::Utc::now().to_rfc3339(),
delivered: true,
encrypted: false,
};
state.store_message(msg.clone()).await;
state.status.write().await.messages_received += 1;
let _ = state.event_tx.send(MeshEvent::MessageReceived(msg));
}
/// Handle a typed message envelope (0x02 prefix).
/// Dispatches to type-specific handlers: BlockHeader, Alert, TxRelay, etc.
async fn handle_typed_message(
payload: &[u8],
sender_contact_id: u32,
sender_name: &str,
state: &Arc<MeshState>,
) {
let envelope = match TypedEnvelope::from_wire(payload) {
Ok(e) => e,
Err(e) => {
warn!(
payload_len = payload.len(),
first_bytes = %hex::encode(&payload[..payload.len().min(16)]),
"Failed to decode typed envelope: {}", e
);
return;
}
};
let msg_type = envelope.message_type();
let type_label = msg_type.map(|t| t.label()).unwrap_or("unknown");
info!(
msg_type = type_label,
from = sender_contact_id,
"Received typed mesh message"
);
match msg_type {
Some(MeshMessageType::BlockHeader) => {
// Compact binary format: height(8) + hash(32) + timestamp(4)
match super::bitcoin_relay::decode_compact_block_header(&envelope.v) {
Ok((height, hash_hex, _timestamp)) => {
info!(
height,
hash = %hash_hex,
"Block header received via mesh"
);
let text = format!(
"Block #{}{}...{}",
height,
&hash_hex[..8.min(hash_hex.len())],
&hash_hex[hash_hex.len().saturating_sub(8)..]
);
store_typed_message(
state,
sender_contact_id,
sender_name,
&text,
"block_header",
)
.await;
let _ = state.event_tx.send(MeshEvent::BlockHeaderReceived {
height,
hash: hash_hex,
});
}
Err(e) => warn!("Failed to decode block header: {}", e),
}
}
Some(MeshMessageType::Alert) => {
match message_types::decode_payload::<message_types::AlertPayload>(&envelope.v) {
Ok(alert) => {
let alert_type_str = format!("{:?}", alert.alert_type).to_lowercase();
info!(
alert_type = %alert_type_str,
from = sender_contact_id,
"Alert received via mesh: {}",
alert.message
);
store_typed_message(
state,
sender_contact_id,
sender_name,
&alert.message,
"alert",
)
.await;
let _ = state.event_tx.send(MeshEvent::AlertReceived {
alert_type: alert_type_str,
message: alert.message,
from_contact_id: sender_contact_id,
});
}
Err(e) => warn!("Failed to decode alert payload: {}", e),
}
}
Some(MeshMessageType::TxRelay) => {
match message_types::decode_payload::<message_types::TxRelayPayload>(&envelope.v) {
Ok(relay) => {
info!(
request_id = relay.request_id,
tx_len = relay.tx_hex.len(),
"TX relay request received — broadcasting to Bitcoin network"
);
store_typed_message(
state,
sender_contact_id,
sender_name,
&format!("TX relay request #{} ({} hex chars)", relay.request_id, relay.tx_hex.len()),
"tx_relay",
)
.await;
// Spawn async task to broadcast via Bitcoin RPC and track confirmations
let relay_state = Arc::clone(state);
let relay_contact = sender_contact_id;
tokio::spawn(async move {
handle_tx_relay_broadcast(relay, relay_contact, &relay_state).await;
});
}
Err(e) => warn!("Failed to decode TX relay payload: {}", e),
}
}
Some(MeshMessageType::TxRelayResponse) => {
match message_types::decode_payload::<message_types::TxRelayResponsePayload>(
&envelope.v,
) {
Ok(resp) => {
let status = if resp.txid.is_some() { "confirmed" } else { "failed" };
info!(
request_id = resp.request_id,
status,
"TX relay response received"
);
let text = if let Some(ref txid) = resp.txid {
format!("TX relayed! txid: {}...{}", &txid[..8.min(txid.len())], &txid[txid.len().saturating_sub(8)..])
} else {
format!("TX relay failed: {}", resp.error.as_deref().unwrap_or("unknown"))
};
store_typed_message(state, sender_contact_id, sender_name, &text, "tx_relay_response").await;
let _ = state.event_tx.send(MeshEvent::TxRelayCompleted {
request_id: resp.request_id,
txid: resp.txid,
error: resp.error,
});
}
Err(e) => warn!("Failed to decode TX relay response: {}", e),
}
}
Some(MeshMessageType::LightningRelay) => {
match message_types::decode_payload::<message_types::LightningRelayPayload>(
&envelope.v,
) {
Ok(relay) => {
info!(
request_id = relay.request_id,
amount_sats = relay.amount_sats,
"Lightning relay request received"
);
store_typed_message(
state,
sender_contact_id,
sender_name,
&format!("Lightning relay: {} sats", relay.amount_sats),
"lightning_relay",
)
.await;
// Will be wired to LND in Week 9
let _ = state.event_tx.send(MeshEvent::LightningRelayCompleted {
request_id: relay.request_id,
payment_hash: None,
error: Some("Lightning relay processing not yet wired".to_string()),
});
}
Err(e) => warn!("Failed to decode Lightning relay payload: {}", e),
}
}
Some(MeshMessageType::LightningRelayResponse) => {
match message_types::decode_payload::<message_types::LightningRelayResponsePayload>(
&envelope.v,
) {
Ok(resp) => {
let status = if resp.payment_hash.is_some() { "paid" } else { "failed" };
info!(request_id = resp.request_id, status, "Lightning relay response");
let text = if let Some(ref hash) = resp.payment_hash {
format!("Lightning paid! hash: {}...", &hash[..16.min(hash.len())])
} else {
format!("Lightning failed: {}", resp.error.as_deref().unwrap_or("unknown"))
};
store_typed_message(state, sender_contact_id, sender_name, &text, "lightning_relay_response").await;
let _ = state.event_tx.send(MeshEvent::LightningRelayCompleted {
request_id: resp.request_id,
payment_hash: resp.payment_hash,
error: resp.error,
});
}
Err(e) => warn!("Failed to decode Lightning relay response: {}", e),
}
}
Some(MeshMessageType::Invoice) => {
match message_types::decode_payload::<message_types::InvoicePayload>(&envelope.v) {
Ok(invoice) => {
let text = format!(
"Invoice: {} sats{}",
invoice.amount_sats,
invoice.memo.as_ref().map(|m| format!("{}", m)).unwrap_or_default()
);
store_typed_message(state, sender_contact_id, sender_name, &text, "invoice").await;
}
Err(e) => warn!("Failed to decode invoice payload: {}", e),
}
}
Some(MeshMessageType::Coordinate) => {
match message_types::decode_payload::<message_types::Coordinate>(&envelope.v) {
Ok(coord) => {
let text = format!(
"Location: {:.6}, {:.6}{}",
coord.lat_degrees(),
coord.lng_degrees(),
coord.label.as_ref().map(|l| format!(" ({})", l)).unwrap_or_default()
);
store_typed_message(state, sender_contact_id, sender_name, &text, "coordinate").await;
}
Err(e) => warn!("Failed to decode coordinate payload: {}", e),
}
}
Some(MeshMessageType::TxConfirmation) => {
match message_types::decode_payload::<message_types::TxConfirmationPayload>(&envelope.v) {
Ok(conf) => {
let status_text = if conf.confirmations >= 3 {
format!("TX {} confirmed ({}/3) at block #{}", &conf.txid[..12.min(conf.txid.len())], conf.confirmations, conf.block_height)
} else {
format!("TX {}{}/3 confirmations (block #{})", &conf.txid[..12.min(conf.txid.len())], conf.confirmations, conf.block_height)
};
info!(
txid = %conf.txid,
confirmations = conf.confirmations,
block_height = conf.block_height,
"TX confirmation update received"
);
store_typed_message(state, sender_contact_id, sender_name, &status_text, "tx_confirmation").await;
let _ = state.event_tx.send(MeshEvent::TxRelayCompleted {
request_id: conf.request_id,
txid: Some(conf.txid),
error: None,
});
}
Err(e) => warn!("Failed to decode TX confirmation: {}", e),
}
}
Some(MeshMessageType::Text) => {
// Typed text message — extract and store as plain text
let text = String::from_utf8_lossy(&envelope.v).to_string();
store_plain_message(state, sender_contact_id, sender_name, &text).await;
}
_ => {
debug!(
msg_type = ?msg_type,
"Unhandled typed message type"
);
}
}
}
/// Store a typed message with a type label for UI rendering.
async fn store_typed_message(
state: &Arc<MeshState>,
contact_id: u32,
peer_name: &str,
text: &str,
type_label: &str,
) {
let msg_id = state.next_id().await;
let msg = MeshMessage {
id: msg_id,
direction: MessageDirection::Received,
peer_contact_id: contact_id,
peer_name: Some(peer_name.to_string()),
plaintext: format!("[{}] {}", type_label, text),
timestamp: chrono::Utc::now().to_rfc3339(),
delivered: true,
encrypted: false,
};
state.store_message(msg.clone()).await;
state.status.write().await.messages_received += 1;
let _ = state.event_tx.send(MeshEvent::MessageReceived(msg));
}
// ─── TX Relay Broadcast + Confirmation Tracking ────────────────────────
/// Called on an internet-connected node when it receives a TxRelay request.
/// Broadcasts the raw TX to Bitcoin via RPC, sends the txid back, then
/// monitors for 3 confirmations and sends updates back via mesh.
async fn handle_tx_relay_broadcast(
relay: message_types::TxRelayPayload,
sender_contact_id: u32,
state: &Arc<MeshState>,
) {
let client = match reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
{
Ok(c) => c,
Err(e) => {
warn!("Failed to create HTTP client for TX relay: {}", e);
return;
}
};
// Step 1: Broadcast via Bitcoin Core RPC sendrawtransaction
let body = serde_json::json!({
"jsonrpc": "1.0",
"id": "mesh-relay",
"method": "sendrawtransaction",
"params": [relay.tx_hex]
});
let txid = match client
.post("http://127.0.0.1:8332/")
.basic_auth("archipelago", Some("archipelago123"))
.json(&body)
.send()
.await
{
Ok(resp) => {
match resp.json::<serde_json::Value>().await {
Ok(rpc_resp) => {
if let Some(err) = rpc_resp.get("error").and_then(|e| e.as_object()) {
let msg = err.get("message").and_then(|m| m.as_str()).unwrap_or("unknown");
warn!(request_id = relay.request_id, "sendrawtransaction failed: {}", msg);
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some(msg)).await;
return;
}
rpc_resp.get("result").and_then(|r| r.as_str()).map(|s| s.to_string())
}
Err(e) => {
warn!("Failed to parse Bitcoin RPC response: {}", e);
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some("RPC parse error")).await;
return;
}
}
}
Err(e) => {
warn!("Bitcoin Core RPC unreachable: {}", e);
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some("No Bitcoin node available")).await;
return;
}
};
let Some(txid) = txid else {
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some("No txid returned")).await;
return;
};
info!(request_id = relay.request_id, txid = %txid, "TX broadcast successful — tracking confirmations");
// Step 2: Send TxRelayResponse with txid back to originator
send_tx_relay_response(state, sender_contact_id, relay.request_id, Some(&txid), None).await;
// Step 3: Monitor confirmations (poll every 30s, up to 3 hours)
let mut last_reported_confs: u32 = 0;
for _ in 0..360 {
tokio::time::sleep(Duration::from_secs(30)).await;
match check_tx_confirmations(&client, &txid).await {
Ok((confs, block_height)) => {
if confs > last_reported_confs && confs <= 3 {
info!(txid = %txid, confirmations = confs, "Sending confirmation update via mesh");
send_confirmation_update(state, sender_contact_id, relay.request_id, &txid, confs, block_height).await;
last_reported_confs = confs;
if confs >= 3 {
info!(txid = %txid, "TX fully confirmed (3/3) — done tracking");
return;
}
}
}
Err(e) => {
debug!(txid = %txid, "Confirmation check: {}", e);
}
}
}
}
/// Send a TxRelayResponse back to the originating peer.
async fn send_tx_relay_response(
state: &Arc<MeshState>,
dest_contact_id: u32,
request_id: u64,
txid: Option<&str>,
error: Option<&str>,
) {
let wire = match super::bitcoin_relay::build_tx_relay_response(request_id, txid, error) {
Ok(w) => w,
Err(e) => {
warn!("Failed to build TX relay response: {}", e);
return;
}
};
send_to_peer(state, dest_contact_id, wire).await;
}
/// Send a TxConfirmation update to the originator.
async fn send_confirmation_update(
state: &Arc<MeshState>,
dest_contact_id: u32,
request_id: u64,
txid: &str,
confirmations: u32,
block_height: u64,
) {
let conf = message_types::TxConfirmationPayload {
request_id,
txid: txid.to_string(),
confirmations,
block_height,
};
if let Ok(payload_bytes) = message_types::encode_payload(&conf) {
let envelope = message_types::TypedEnvelope::new(
message_types::MeshMessageType::TxConfirmation,
payload_bytes,
);
if let Ok(wire) = envelope.to_wire() {
send_to_peer(state, dest_contact_id, wire).await;
}
}
}
/// Send raw wire bytes to a specific peer by contact_id.
/// Falls back to channel 0 broadcast if peer's pubkey is unknown.
async fn send_to_peer(state: &Arc<MeshState>, contact_id: u32, payload: Vec<u8>) {
let peers = state.peers.read().await;
if let Some(peer) = peers.get(&contact_id) {
if let Some(ref pk) = peer.pubkey_hex {
if let Ok(pk_bytes) = hex::decode(pk) {
if pk_bytes.len() >= 6 {
let mut prefix = [0u8; 6];
prefix.copy_from_slice(&pk_bytes[..6]);
drop(peers);
let _ = state.cmd_tx.send(MeshCommand::SendRaw {
dest_pubkey_prefix: prefix,
payload,
}).await;
return;
}
}
}
}
drop(peers);
let _ = state.cmd_tx.send(MeshCommand::BroadcastChannel {
channel: 0,
payload,
}).await;
}
/// Check transaction confirmation count via Bitcoin Core RPC.
async fn check_tx_confirmations(client: &reqwest::Client, txid: &str) -> anyhow::Result<(u32, u64)> {
let body = serde_json::json!({
"jsonrpc": "1.0",
"id": "mesh-conf",
"method": "gettransaction",
"params": [txid]
});
let resp = client
.post("http://127.0.0.1:8332/")
.basic_auth("archipelago", Some("archipelago123"))
.json(&body)
.send()
.await?;
let rpc_resp: serde_json::Value = resp.json().await?;
if let Some(result) = rpc_resp.get("result") {
let confs = result.get("confirmations").and_then(|c| c.as_u64()).unwrap_or(0) as u32;
let block_height = result.get("blockheight").and_then(|h| h.as_u64()).unwrap_or(0);
Ok((confs, block_height))
} else {
anyhow::bail!("gettransaction returned no result")
}
}

View File

@@ -30,6 +30,8 @@ pub enum MeshMessageType {
TxRelayResponse = 9,
LightningRelay = 10,
LightningRelayResponse = 11,
/// Confirmation update for a relayed transaction (1, 2, 3 confs).
TxConfirmation = 12,
}
impl MeshMessageType {
@@ -47,6 +49,7 @@ impl MeshMessageType {
9 => Some(Self::TxRelayResponse),
10 => Some(Self::LightningRelay),
11 => Some(Self::LightningRelayResponse),
12 => Some(Self::TxConfirmation),
_ => None,
}
}
@@ -65,6 +68,7 @@ impl MeshMessageType {
Self::TxRelayResponse => "tx_relay_response",
Self::LightningRelay => "lightning_relay",
Self::LightningRelayResponse => "lightning_relay_response",
Self::TxConfirmation => "tx_confirmation",
}
}
}
@@ -294,6 +298,18 @@ pub struct LightningRelayResponsePayload {
pub error: Option<String>,
}
/// Transaction confirmation update (relay node → originator).
/// Sent after each new confirmation (1, 2, 3) until fully confirmed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TxConfirmationPayload {
pub request_id: u64,
pub txid: String,
/// Number of confirmations (1, 2, 3).
pub confirmations: u32,
/// Block height where the transaction was included.
pub block_height: u64,
}
// ─── Helpers ────────────────────────────────────────────────────────────
/// Encode a payload type to CBOR bytes.

View File

@@ -31,15 +31,18 @@ pub mod x3dh;
pub use types::*;
use alerts::DeadManSwitch;
use anyhow::{Context, Result};
use bitcoin_relay::{BlockHeaderCache, RelayTracker};
use ed25519_dalek::SigningKey;
use listener::MeshState;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::fs;
use tokio::sync::{broadcast, watch};
use tracing::info;
use tracing::{error, info, warn};
const MESH_CONFIG_FILE: &str = "mesh-config.json";
@@ -62,6 +65,9 @@ pub struct MeshConfig {
/// Off-grid mode: disable Tor/internet, route everything via mesh only.
#[serde(default)]
pub mesh_only_mode: Option<bool>,
/// Announce new Bitcoin block headers over mesh (internet-connected nodes only).
#[serde(default)]
pub announce_block_headers: bool,
}
impl Default for MeshConfig {
@@ -73,6 +79,7 @@ impl Default for MeshConfig {
broadcast_identity: true,
advert_name: None,
mesh_only_mode: None,
announce_block_headers: false,
}
}
}
@@ -117,12 +124,19 @@ pub struct MeshService {
data_dir: PathBuf,
shutdown_tx: Option<watch::Sender<bool>>,
listener_handle: Option<tokio::task::JoinHandle<()>>,
deadman_handle: Option<tokio::task::JoinHandle<()>>,
block_announcer_handle: Option<tokio::task::JoinHandle<()>>,
cmd_rx: Option<tokio::sync::mpsc::Receiver<listener::MeshCommand>>,
// Crypto identity for this node
our_did: String,
our_ed_pubkey_hex: String,
our_x25519_secret: [u8; 32],
our_x25519_pubkey_hex: String,
signing_key: SigningKey,
// Phase 4: off-grid Bitcoin operations
pub block_header_cache: Arc<BlockHeaderCache>,
pub relay_tracker: Arc<RelayTracker>,
pub dead_man_switch: Arc<DeadManSwitch>,
}
#[allow(dead_code)]
@@ -149,17 +163,37 @@ impl MeshService {
)?;
let x25519_pubkey_hex = hex::encode(x25519_pubkey);
let block_header_cache = Arc::new(BlockHeaderCache::new());
let relay_tracker = Arc::new(RelayTracker::new());
let dead_man_switch = Arc::new(
DeadManSwitch::new(data_dir)
.await
.unwrap_or_else(|e| {
warn!("Failed to load dead man config (using defaults): {}", e);
// Fallback: create with defaults (won't persist until configured)
tokio::runtime::Handle::current()
.block_on(DeadManSwitch::new(data_dir))
.expect("DeadManSwitch fallback should succeed")
}),
);
Ok(Self {
state,
config,
data_dir: data_dir.to_path_buf(),
shutdown_tx: None,
listener_handle: None,
deadman_handle: None,
block_announcer_handle: None,
cmd_rx: Some(cmd_rx),
our_did: did.to_string(),
our_ed_pubkey_hex: ed_pubkey_hex.to_string(),
our_x25519_secret: x25519_secret,
our_x25519_pubkey_hex: x25519_pubkey_hex,
signing_key: signing_key.clone(),
block_header_cache,
relay_tracker,
dead_man_switch,
})
}
@@ -187,11 +221,169 @@ impl MeshService {
);
self.listener_handle = Some(handle);
// Spawn dead man's switch background checker
let dms = Arc::clone(&self.dead_man_switch);
let dms_state = Arc::clone(&self.state);
let dms_key = self.signing_key.clone();
let dms_shutdown = self.shutdown_tx.as_ref().unwrap().subscribe();
let dms_handle = tokio::spawn(async move {
let mut shutdown = dms_shutdown;
let mut interval = tokio::time::interval(Duration::from_secs(60));
interval.tick().await; // skip first immediate tick
loop {
tokio::select! {
_ = interval.tick() => {
if dms.is_triggered().await {
let was_triggered = *dms.triggered_flag().await;
if !was_triggered {
error!("Dead man's switch TRIGGERED — broadcasting alert");
if let Ok(wire) = dms.build_signed_alert(&dms_key).await {
for ch in [0u8, 1] {
let _ = dms_state.cmd_tx.send(
listener::MeshCommand::BroadcastChannel {
channel: ch,
payload: wire.clone(),
},
).await;
}
}
dms.mark_triggered().await;
}
}
}
_ = shutdown.changed() => {
if *shutdown.borrow() { return; }
}
}
}
});
self.deadman_handle = Some(dms_handle);
// Spawn block header announcer (internet-connected nodes only)
if self.config.announce_block_headers {
let bha_state = Arc::clone(&self.state);
let bha_cache = Arc::clone(&self.block_header_cache);
let bha_key = self.signing_key.clone();
let bha_did = self.our_did.clone();
let bha_shutdown = self.shutdown_tx.as_ref().unwrap().subscribe();
let bha_handle = tokio::spawn(async move {
let mut shutdown = bha_shutdown;
let mut interval = tokio::time::interval(Duration::from_secs(30));
interval.tick().await; // skip first
let mut last_announced_height: u64 = 0;
let client = match reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()
{
Ok(c) => c,
Err(e) => {
error!("Failed to create HTTP client for block announcer: {}", e);
return;
}
};
loop {
tokio::select! {
_ = interval.tick() => {
// Poll Bitcoin Core for latest block
match bitcoin_rpc_getblockcount(&client).await {
Ok(height) if height > last_announced_height => {
if let Ok(header) = bitcoin_rpc_getblockheader_by_height(&client, height).await {
// Store in cache
let payload = message_types::BlockHeaderPayload {
height,
hash: header.hash.clone(),
prev_hash: header.prev_hash.clone(),
timestamp: header.timestamp,
announced_by: bha_did.clone(),
};
let _ = bha_cache.store_header(payload).await;
// Build signed announcement and broadcast
match bitcoin_relay::build_block_header_announcement(
height,
&header.hash,
&header.prev_hash,
header.timestamp,
&bha_did,
&bha_key,
) {
Ok(wire) => {
// Send to peers — prefer Archy nodes, fall back to all (max 5)
let peers = bha_state.peers.read().await;
let mut sent = 0u32;
let max_peers = 5u32;
// First pass: Archy nodes
for peer in peers.values() {
if sent >= max_peers { break; }
if !peer.advert_name.starts_with("Archy-") { continue; }
if let Some(ref pk) = peer.pubkey_hex {
if let Ok(pk_bytes) = hex::decode(pk) {
if pk_bytes.len() >= 6 {
let mut prefix = [0u8; 6];
prefix.copy_from_slice(&pk_bytes[..6]);
let _ = bha_state.cmd_tx.send(
listener::MeshCommand::SendRaw {
dest_pubkey_prefix: prefix,
payload: wire.clone(),
},
).await;
sent += 1;
}
}
}
}
// Second pass: any peer if no Archy nodes found
if sent == 0 {
for peer in peers.values() {
if sent >= max_peers { break; }
if let Some(ref pk) = peer.pubkey_hex {
if let Ok(pk_bytes) = hex::decode(pk) {
if pk_bytes.len() >= 6 {
let mut prefix = [0u8; 6];
prefix.copy_from_slice(&pk_bytes[..6]);
let _ = bha_state.cmd_tx.send(
listener::MeshCommand::SendRaw {
dest_pubkey_prefix: prefix,
payload: wire.clone(),
},
).await;
sent += 1;
}
}
}
}
}
drop(peers);
last_announced_height = height;
info!(height, hash = %header.hash, peers = sent, "Announced block header to Archy peers");
}
Err(e) => warn!("Failed to build block announcement: {}", e),
}
}
}
Ok(_) => {} // No new block
Err(e) => {
// Bitcoin not running or not reachable — that's fine, skip
tracing::debug!("Block poll: {}", e);
}
}
}
_ = shutdown.changed() => {
if *shutdown.borrow() { return; }
}
}
}
});
self.block_announcer_handle = Some(bha_handle);
info!("Block header announcer started");
}
info!("Mesh service started");
Ok(())
}
/// Stop the background listener.
/// Stop the background listener and dead man's switch.
pub async fn stop(&mut self) {
if let Some(tx) = self.shutdown_tx.take() {
let _ = tx.send(true);
@@ -199,6 +391,14 @@ impl MeshService {
if let Some(handle) = self.listener_handle.take() {
let _ = handle.await;
}
if let Some(handle) = self.deadman_handle.take() {
handle.abort();
let _ = handle.await;
}
if let Some(handle) = self.block_announcer_handle.take() {
handle.abort();
let _ = handle.await;
}
info!("Mesh service stopped");
}
@@ -358,10 +558,98 @@ impl MeshService {
pub fn shared_state(&self) -> Arc<MeshState> {
Arc::clone(&self.state)
}
/// Record user activity (resets dead man's switch timer).
pub async fn dead_man_check_in(&self) {
self.dead_man_switch.check_in().await;
}
/// Get the node's signing key (for signed messages).
pub fn signing_key(&self) -> &SigningKey {
&self.signing_key
}
/// Get our DID.
pub fn our_did(&self) -> &str {
&self.our_did
}
}
const MAX_MESSAGES_DEFAULT: usize = 100;
// ─── Bitcoin RPC helpers for block header announcer ────────────────────
#[derive(serde::Deserialize)]
struct BitcoinRpcResponse<T> {
result: Option<T>,
error: Option<serde_json::Value>,
}
struct BlockHeaderInfo {
hash: String,
prev_hash: String,
timestamp: u32,
}
async fn bitcoin_rpc_getblockcount(client: &reqwest::Client) -> Result<u64> {
let body = serde_json::json!({
"jsonrpc": "1.0", "id": "mesh", "method": "getblockcount", "params": []
});
let resp: BitcoinRpcResponse<u64> = client
.post("http://127.0.0.1:8332/")
.basic_auth("archipelago", Some("archipelago123"))
.json(&body)
.send()
.await
.map_err(|e| anyhow::anyhow!("Bitcoin RPC send failed: {}", e))?
.json()
.await
.map_err(|e| anyhow::anyhow!("Bitcoin RPC parse failed: {}", e))?;
if let Some(err) = resp.error {
anyhow::bail!("Bitcoin RPC: {}", err);
}
resp.result.ok_or_else(|| anyhow::anyhow!("Bitcoin RPC null result"))
}
async fn bitcoin_rpc_getblockheader_by_height(
client: &reqwest::Client,
height: u64,
) -> Result<BlockHeaderInfo> {
// First get block hash for this height
let body = serde_json::json!({
"jsonrpc": "1.0", "id": "mesh", "method": "getblockhash", "params": [height]
});
let resp: BitcoinRpcResponse<String> = client
.post("http://127.0.0.1:8332/")
.basic_auth("archipelago", Some("archipelago123"))
.json(&body)
.send()
.await?
.json()
.await?;
let hash = resp.result.ok_or_else(|| anyhow::anyhow!("No block hash"))?;
// Then get full header
let body = serde_json::json!({
"jsonrpc": "1.0", "id": "mesh", "method": "getblockheader", "params": [hash, true]
});
let resp: BitcoinRpcResponse<serde_json::Value> = client
.post("http://127.0.0.1:8332/")
.basic_auth("archipelago", Some("archipelago123"))
.json(&body)
.send()
.await?
.json()
.await?;
let header = resp.result.ok_or_else(|| anyhow::anyhow!("No block header"))?;
Ok(BlockHeaderInfo {
hash: header["hash"].as_str().unwrap_or_default().to_string(),
prev_hash: header["previousblockhash"].as_str().unwrap_or_default().to_string(),
timestamp: header["time"].as_u64().unwrap_or(0) as u32,
})
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -408,6 +408,78 @@ pub fn parse_channel_msg_v1(data: &[u8]) -> Result<(u8, String)> {
Ok((channel_idx, text))
}
// ─── Raw-bytes variants for typed message detection ────────────────────
/// Parse RESP_CONTACT_MSG_V3 returning raw payload bytes (not UTF-8 lossy).
/// Returns (sender_pubkey_prefix_hex, raw_payload_bytes, snr).
pub fn parse_contact_msg_v3_raw(data: &[u8]) -> Result<(String, Vec<u8>, i8)> {
if data.len() < 15 {
anyhow::bail!("Contact message too short: {} bytes", data.len());
}
let snr = data[0] as i8;
let pubkey_prefix = hex::encode(&data[3..9]);
let txt_type = data[10];
let text_start = if txt_type == 2 { 19 } else { 15 };
let payload = if data.len() > text_start {
data[text_start..].to_vec()
} else {
Vec::new()
};
Ok((pubkey_prefix, payload, snr))
}
/// Parse RESP_CONTACT_MSG returning raw payload bytes.
/// Returns (sender_pubkey_prefix_hex, raw_payload_bytes).
pub fn parse_contact_msg_v1_raw(data: &[u8]) -> Result<(String, Vec<u8>)> {
if data.len() < 12 {
anyhow::bail!("Contact message v1 too short: {} bytes", data.len());
}
let pubkey_prefix = hex::encode(&data[0..6]);
let txt_type = data[7];
let text_start = if txt_type == 2 { 16 } else { 12 };
let payload = if data.len() > text_start {
data[text_start..].to_vec()
} else {
Vec::new()
};
Ok((pubkey_prefix, payload))
}
/// Parse RESP_CHANNEL_MSG_V3 returning raw payload bytes.
/// Returns (channel_idx, raw_payload_bytes).
pub fn parse_channel_msg_v3_raw(data: &[u8]) -> Result<(u8, Vec<u8>)> {
if data.len() < 7 {
anyhow::bail!("Channel message too short: {} bytes", data.len());
}
let channel_idx = data[0];
let payload = if data.len() > 7 {
let mut p = data[7..].to_vec();
// Strip trailing NUL bytes
while p.last() == Some(&0) { p.pop(); }
p
} else {
Vec::new()
};
Ok((channel_idx, payload))
}
/// Parse RESP_CHANNEL_MSG returning raw payload bytes.
/// Returns (channel_idx, raw_payload_bytes).
pub fn parse_channel_msg_v1_raw(data: &[u8]) -> Result<(u8, Vec<u8>)> {
if data.len() < 7 {
anyhow::bail!("Channel message v1 too short: {} bytes", data.len());
}
let channel_idx = data[0];
let payload = if data.len() > 7 {
let mut p = data[7..].to_vec();
while p.last() == Some(&0) { p.pop(); }
p
} else {
Vec::new()
};
Ok((channel_idx, payload))
}
/// Parse RESP_ERR (0x01). Returns descriptive error string.
pub fn parse_error(data: &[u8]) -> String {
if data.is_empty() {

View File

@@ -111,4 +111,12 @@ pub enum MeshEvent {
pubkey_hex: String,
x25519_pubkey: [u8; 32],
},
/// Block header received from an internet-connected mesh peer.
BlockHeaderReceived { height: u64, hash: String },
/// Emergency or dead-man alert received from a peer.
AlertReceived { alert_type: String, message: String, from_contact_id: u32 },
/// TX relay completed (response received from internet peer).
TxRelayCompleted { request_id: u64, txid: Option<String>, error: Option<String> },
/// Lightning relay completed (response received from internet peer).
LightningRelayCompleted { request_id: u64, payment_hash: Option<String>, error: Option<String> },
}