feat(mesh): MessageKey foundation and debug-dump RPC

Adds sender_pubkey + sender_seq fields to MeshMessage so received
messages carry a stable cross-transport identity: (sender_pubkey,
sender_seq) pair. This is the foundation for the upcoming reply,
reaction, edit, and read-receipt variants — they need to target a
message by an ID that is meaningful on every node, not just locally.

Receive-side population lives in dispatch.rs::store_typed_message,
which now looks up the peer's pubkey_hex and copies envelope.seq from
the decoded TypedEnvelope. Sent-side population will land when we
plumb a per-node monotonic seq counter through the RPC layer.

Also adds mesh.debug-dump: a full in-memory state snapshot returning
peers, messages, status, shared-secret peer ids, encrypt_relay flag,
and stego mode — intended for smoke tests and bug investigation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dorian
2026-04-13 08:18:01 -04:00
parent ca7119df8c
commit 390ceaa75d
6 changed files with 79 additions and 5 deletions

View File

@@ -287,6 +287,7 @@ impl RpcHandler {
"mesh.status" => self.handle_mesh_status().await,
"mesh.peers" => self.handle_mesh_peers().await,
"mesh.messages" => self.handle_mesh_messages(params).await,
"mesh.debug-dump" => self.handle_mesh_debug_dump().await,
"mesh.send" => self.handle_mesh_send(params).await,
"mesh.send-channel" => self.handle_mesh_send_channel(params).await,
"mesh.broadcast" => self.handle_mesh_broadcast().await,

View File

@@ -70,6 +70,18 @@ impl RpcHandler {
}
}
/// mesh.debug-dump — Full in-memory state snapshot for debugging.
/// Returns peers, all messages, status, shared-secret peer ids, encrypt_relay
/// flag, and stego mode. Intended for smoke tests and bug investigation.
pub(in crate::api::rpc) async fn handle_mesh_debug_dump(&self) -> Result<serde_json::Value> {
let service = self.mesh_service.read().await;
if let Some(svc) = service.as_ref() {
Ok(svc.debug_dump().await)
} else {
Ok(serde_json::json!({ "running": false }))
}
}
/// mesh.session-status — Get ratchet session info for a peer.
pub(in crate::api::rpc) async fn handle_mesh_session_status(
&self,

View File

@@ -278,6 +278,8 @@ pub(super) async fn store_plain_message(
encrypted: false,
message_type: "text".to_string(),
typed_payload: None,
sender_pubkey: None,
sender_seq: None,
};
state.store_message(msg.clone()).await;
state.status.write().await.messages_received += 1;
@@ -442,6 +444,8 @@ pub(super) async fn handle_received_message(
encrypted,
message_type: "text".to_string(),
typed_payload: None,
sender_pubkey: None,
sender_seq: None,
};
state.store_message(msg.clone()).await;

View File

@@ -16,8 +16,17 @@ async fn store_typed_message(
display_text: &str,
type_label: &str,
typed_payload: Option<serde_json::Value>,
sender_seq: Option<u64>,
) {
let msg_id = state.next_id().await;
// Populate stable MessageKey components from the peer record so the UI
// can dedupe and target replies/reactions across transports.
let sender_pubkey = state
.peers
.read()
.await
.get(&contact_id)
.and_then(|p| p.pubkey_hex.clone());
let msg = MeshMessage {
id: msg_id,
direction: MessageDirection::Received,
@@ -29,6 +38,8 @@ async fn store_typed_message(
encrypted: false,
message_type: type_label.to_string(),
typed_payload,
sender_pubkey,
sender_seq,
};
state.store_message(msg.clone()).await;
state.status.write().await.messages_received += 1;
@@ -122,6 +133,7 @@ pub(super) async fn handle_typed_message(
&alert.message,
"alert",
json,
Some(envelope.seq),
)
.await;
let _ = state.event_tx.send(MeshEvent::AlertReceived {
@@ -155,6 +167,7 @@ pub(super) async fn handle_typed_message(
&format!("TX relay request #{} ({} hex chars)", relay.request_id, relay.tx_hex.len()),
"tx_relay",
json,
Some(envelope.seq),
)
.await;
@@ -191,6 +204,7 @@ pub(super) async fn handle_typed_message(
&format!("Lightning relay: {} sats", relay.amount_sats),
"lightning_relay",
json,
Some(envelope.seq),
)
.await;
// Will be wired to LND in Week 9
@@ -217,7 +231,7 @@ pub(super) async fn handle_typed_message(
format!("Lightning failed: {}", resp.error.as_deref().unwrap_or("unknown"))
};
let json = payload_to_json(&resp);
store_typed_message(state, sender_contact_id, sender_name, &text, "lightning_relay_response", json).await;
store_typed_message(state, sender_contact_id, sender_name, &text, "lightning_relay_response", json, Some(envelope.seq)).await;
let _ = state.event_tx.send(MeshEvent::LightningRelayCompleted {
request_id: resp.request_id,
payment_hash: resp.payment_hash,
@@ -237,7 +251,7 @@ pub(super) async fn handle_typed_message(
invoice.memo.as_ref().map(|m| format!("{}", m)).unwrap_or_default()
);
let json = payload_to_json(&invoice);
store_typed_message(state, sender_contact_id, sender_name, &text, "invoice", json).await;
store_typed_message(state, sender_contact_id, sender_name, &text, "invoice", json, Some(envelope.seq)).await;
}
Err(e) => warn!("Failed to decode invoice payload: {}", e),
}
@@ -253,7 +267,7 @@ pub(super) async fn handle_typed_message(
coord.label.as_ref().map(|l| format!(" ({})", l)).unwrap_or_default()
);
let json = payload_to_json(&coord);
store_typed_message(state, sender_contact_id, sender_name, &text, "coordinate", json).await;
store_typed_message(state, sender_contact_id, sender_name, &text, "coordinate", json, Some(envelope.seq)).await;
}
Err(e) => warn!("Failed to decode coordinate payload: {}", e),
}
@@ -325,6 +339,7 @@ async fn dispatch_block_header(
&text,
"block_header",
json,
Some(envelope.seq),
)
.await;
let _ = state.event_tx.send(MeshEvent::BlockHeaderReceived {
@@ -360,7 +375,7 @@ async fn dispatch_tx_relay_response(
format!("TX relay failed: {}", resp.error.as_deref().unwrap_or("unknown"))
};
let json = payload_to_json(&resp);
store_typed_message(state, sender_contact_id, sender_name, &text, "tx_relay_response", json).await;
store_typed_message(state, sender_contact_id, sender_name, &text, "tx_relay_response", json, Some(envelope.seq)).await;
// Store result for frontend polling
if let Some(ref tracker) = state.relay_tracker {
tracker.store_result(super::super::bitcoin_relay::RelayResult {
@@ -402,7 +417,7 @@ async fn dispatch_tx_confirmation(
"TX confirmation update received"
);
let json = payload_to_json(&conf);
store_typed_message(state, sender_contact_id, sender_name, &status_text, "tx_confirmation", json).await;
store_typed_message(state, sender_contact_id, sender_name, &status_text, "tx_confirmation", json, Some(envelope.seq)).await;
// Store confirmation for frontend polling
if let Some(ref tracker) = state.relay_tracker {
tracker.store_result(super::super::bitcoin_relay::RelayResult {

View File

@@ -432,6 +432,28 @@ impl MeshService {
messages.iter().skip(skip).cloned().collect()
}
/// Full in-memory state dump for debugging. Returns peers, all messages,
/// status, shared-secret peer ids (not the secrets), encrypt_relay flag,
/// and stego mode. Intended for development/smoke-test use only — don't
/// call this on a hot path.
pub async fn debug_dump(&self) -> serde_json::Value {
let status = self.state.status.read().await.clone();
let peers: Vec<_> = self.state.peers.read().await.values().cloned().collect();
let messages: Vec<_> = self.state.messages.read().await.iter().cloned().collect();
let secret_peer_ids: Vec<u32> =
self.state.shared_secrets.read().await.keys().copied().collect();
serde_json::json!({
"status": status,
"peers": peers,
"peer_count": peers.len(),
"messages": messages,
"message_count": messages.len(),
"secret_peer_ids": secret_peer_ids,
"encrypt_relay": self.state.encrypt_relay,
"stego_mode": format!("{:?}", self.state.stego_mode),
})
}
/// Resolve a peer's 6-byte public-key prefix for mesh addressing.
async fn peer_dest_prefix(&self, contact_id: u32) -> Result<[u8; 6]> {
let peers = self.state.peers.read().await;
@@ -547,6 +569,8 @@ impl MeshService {
encrypted: false,
message_type: type_label.to_string(),
typed_payload,
sender_pubkey: None,
sender_seq: None,
};
self.state.store_message(msg.clone()).await;
{
@@ -584,6 +608,8 @@ impl MeshService {
encrypted,
message_type: "text".to_string(),
typed_payload: None,
sender_pubkey: None,
sender_seq: None,
};
self.state.store_message(msg.clone()).await;
@@ -625,6 +651,8 @@ impl MeshService {
encrypted: false,
message_type: type_label.to_string(),
typed_payload,
sender_pubkey: None,
sender_seq: None,
};
self.state.store_message(msg.clone()).await;
{
@@ -678,6 +706,8 @@ impl MeshService {
encrypted: false,
message_type: "text".to_string(),
typed_payload: None,
sender_pubkey: None,
sender_seq: None,
};
self.state.store_message(msg.clone()).await;

View File

@@ -78,6 +78,18 @@ pub struct MeshMessage {
/// Structured payload as JSON — populated for non-text typed messages.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub typed_payload: Option<serde_json::Value>,
/// Hex-encoded sender pubkey. On Received: the peer's mesh public key
/// (or first 6 bytes / full key as available). On Sent: None today,
/// populated later when we own the key. Combined with `sender_seq`
/// this forms a stable cross-transport MessageKey that reactions,
/// replies, edits, and read-receipts can reference without relying
/// on the local `id` field (which is only meaningful to one node).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub sender_pubkey: Option<String>,
/// Per-sender monotonic sequence from the typed envelope. Paired with
/// `sender_pubkey` to form the stable MessageKey.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub sender_seq: Option<u64>,
}
fn default_message_type() -> String {