fix(mesh): DM-via-channel tunnel + disable presence spam
Meshcore direct unicast silently drops between our two Archy nodes (firmware reports flood sends with resp_code=6 but nothing arrives). Wrap DMs as channel-1 broadcasts with a [0xD1][dest_prefix(6)][inner] header; receivers filter by prefix and dispatch the inner payload through the existing typed/base64/chunk ladder. Shrink chunk body to 125B so the wrapper still fits the 160B LoRa budget. Auto-heal routing: CMD_RESET_PATH (0x0D) any type-1 contact with path_len=0 on refresh so floods take over. send_text now returns the firmware's flood/direct mode flag for diagnostics. Disable the 120s presence heartbeat broadcaster — its CBOR payload was being re-echoed as plaintext by the shared repeater, spamming every visible node with garbled "Archy-…: av�…fstatusfonline…" messages on channel 0. mesh.broadcast-presence RPC stays registered but no longer transmits. Re-enable only once presence moves off the shared broadcast path. Also: MeshState.cmd_tx behind RwLock so stop()→start() cycles don't fail with "command channel already consumed"; MeshService.send_cmd helper; drop_message_by_id for control envelopes that shouldn't appear as Sent bubbles; self_advert_name reflected into MeshStatus after set; path_len/flags parsed out of RESP_CONTACT. Frontend: unified inbox merges mesh peers with federation nodes by DID/pubkey/name; hide presence/read_receipt/edit/channel_invite/ contact_card from chat stream; publicChannel index → 1 to match the new DM-via-channel routing. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -3,7 +3,7 @@ use crate::blobs::DEFAULT_CAP_TTL_SECS;
|
||||
use crate::mesh::message_types::{
|
||||
self, AlertPayload, AlertType, ChannelInvitePayload, ContentRefPayload, Coordinate,
|
||||
DeletePayload, EditPayload, ForwardPayload, InvoicePayload, MessageKey, MeshMessageType,
|
||||
PresencePayload, PsbtHashPayload, ReactionPayload, ReadReceiptPayload, ReplyPayload,
|
||||
PsbtHashPayload, ReactionPayload, ReadReceiptPayload, ReplyPayload,
|
||||
TypedEnvelope,
|
||||
};
|
||||
use anyhow::Result;
|
||||
@@ -647,6 +647,10 @@ impl RpcHandler {
|
||||
let msg = svc
|
||||
.send_typed_wire(contact_id, wire, "read_receipt", &display, typed_json, seq)
|
||||
.await?;
|
||||
// Read receipts are control envelopes; the receiver uses them to
|
||||
// roll the ✓✓ marker forward on the matching outgoing bubble. They
|
||||
// must not surface as standalone bubbles in our own chat history.
|
||||
svc.drop_message_by_id(msg.id).await;
|
||||
info!(contact_id, seq, "Sent read receipt over mesh");
|
||||
Ok(serde_json::json!({ "sent": true, "message_id": msg.id, "sender_seq": seq }))
|
||||
}
|
||||
@@ -776,6 +780,10 @@ impl RpcHandler {
|
||||
let msg = svc
|
||||
.send_typed_wire(contact_id, wire, "edit", &new_text, typed_json, seq)
|
||||
.await?;
|
||||
// Edits are control envelopes — they mutate the target bubble in
|
||||
// apply_local_edit below, so the standalone Sent record has no UI
|
||||
// value and would just clutter the chat.
|
||||
svc.drop_message_by_id(msg.id).await;
|
||||
|
||||
// Best-effort: apply the edit to our own local copy too, so the UI
|
||||
// updates without waiting for an echo.
|
||||
@@ -824,6 +832,10 @@ impl RpcHandler {
|
||||
let msg = svc
|
||||
.send_typed_wire(contact_id, wire, "delete", "(deleted)", typed_json, seq)
|
||||
.await?;
|
||||
// Delete is a control envelope — apply_local_delete below tombstones
|
||||
// the target bubble in place, so the standalone Sent record is just
|
||||
// noise in the chat history.
|
||||
svc.drop_message_by_id(msg.id).await;
|
||||
|
||||
svc.apply_local_delete(target_seq).await;
|
||||
|
||||
@@ -836,35 +848,14 @@ impl RpcHandler {
|
||||
/// Params: `{ channel?, status? }`. Defaults: channel 0, status "online".
|
||||
pub(in crate::api::rpc) async fn handle_mesh_broadcast_presence(
|
||||
&self,
|
||||
params: Option<serde_json::Value>,
|
||||
_params: Option<serde_json::Value>,
|
||||
) -> Result<serde_json::Value> {
|
||||
let params = params.unwrap_or(serde_json::json!({}));
|
||||
let channel = params["channel"].as_u64().unwrap_or(0) as u8;
|
||||
let status = params["status"].as_str().unwrap_or("online").to_string();
|
||||
|
||||
let presence = PresencePayload {
|
||||
status: status.clone(),
|
||||
last_active: chrono::Utc::now().timestamp() as u32,
|
||||
};
|
||||
|
||||
let service = self.mesh_service.read().await;
|
||||
let svc = service
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
|
||||
let seq = svc.next_send_seq(0).await;
|
||||
let payload = message_types::encode_payload(&presence)?;
|
||||
let envelope = TypedEnvelope::new(MeshMessageType::Presence, payload).with_seq(seq);
|
||||
let wire = envelope.to_wire()?;
|
||||
let typed_json = serde_json::to_value(&presence).ok();
|
||||
// Best-effort: if the mesh device isn't connected, skip silently —
|
||||
// presence heartbeats don't deserve a user-visible error.
|
||||
match svc
|
||||
.send_channel_typed_wire(channel, wire, "presence", &status, typed_json, seq)
|
||||
.await
|
||||
{
|
||||
Ok(_) => Ok(serde_json::json!({ "sent": true, "sender_seq": seq })),
|
||||
Err(e) => Ok(serde_json::json!({ "sent": false, "reason": e.to_string() })),
|
||||
}
|
||||
// DISABLED: presence broadcasts were spamming the public channel
|
||||
// with malformed CBOR bytes (repeaters re-echoed our
|
||||
// PresencePayload as plaintext, producing "av<61>…fstatusfonline…").
|
||||
// The RPC stays registered so frontends that still call it don't
|
||||
// hard-fail, but it no longer transmits anything.
|
||||
Ok(serde_json::json!({ "sent": false, "reason": "presence disabled" }))
|
||||
}
|
||||
|
||||
/// mesh.presence-list — return the in-memory presence map (pubkey → status+timestamps).
|
||||
|
||||
@@ -94,15 +94,7 @@ pub(super) async fn handle_frame(
|
||||
match protocol::parse_channel_msg_v3_raw(&frame.data) {
|
||||
Ok((channel_idx, payload)) => {
|
||||
if !payload.is_empty() {
|
||||
let chan_contact_id = u32::MAX - (channel_idx as u32);
|
||||
let chan_name = format!("Channel {}", channel_idx);
|
||||
if TypedEnvelope::is_typed(&payload) {
|
||||
handle_typed_message(&payload, chan_contact_id, &chan_name, state).await;
|
||||
} else {
|
||||
let text = String::from_utf8_lossy(&payload).to_string();
|
||||
store_plain_message(state, chan_contact_id, &chan_name, &text).await;
|
||||
info!(channel = channel_idx, "Received mesh channel message (v3)");
|
||||
}
|
||||
handle_channel_payload(state, channel_idx, &payload).await;
|
||||
}
|
||||
}
|
||||
Err(e) => warn!("Failed to parse v3 channel message: {}", e),
|
||||
@@ -114,15 +106,7 @@ pub(super) async fn handle_frame(
|
||||
match protocol::parse_channel_msg_v1_raw(&frame.data) {
|
||||
Ok((channel_idx, payload)) => {
|
||||
if !payload.is_empty() {
|
||||
let chan_contact_id = u32::MAX - (channel_idx as u32);
|
||||
let chan_name = format!("Channel {}", channel_idx);
|
||||
if TypedEnvelope::is_typed(&payload) {
|
||||
handle_typed_message(&payload, chan_contact_id, &chan_name, state).await;
|
||||
} else {
|
||||
let text = String::from_utf8_lossy(&payload).to_string();
|
||||
store_plain_message(state, chan_contact_id, &chan_name, &text).await;
|
||||
info!(channel = channel_idx, "Received mesh channel message");
|
||||
}
|
||||
handle_channel_payload(state, channel_idx, &payload).await;
|
||||
}
|
||||
}
|
||||
Err(e) => warn!("Failed to parse channel message: {}", e),
|
||||
@@ -141,3 +125,138 @@ pub(super) async fn handle_frame(
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Process a channel-broadcast payload. If the payload carries the
|
||||
/// DM-via-channel marker and the destination prefix matches any of our
|
||||
/// local mesh peer pubkeys (or we can't tell), the inner payload is
|
||||
/// dispatched through the direct-message path so it lands in the right
|
||||
/// chat. Otherwise it's handled as a normal channel text/typed message.
|
||||
async fn handle_channel_payload(
|
||||
state: &Arc<MeshState>,
|
||||
channel_idx: u8,
|
||||
payload: &[u8],
|
||||
) {
|
||||
// DM-via-channel wrapper: [marker(1)][dest_prefix(6)][inner…]
|
||||
if payload.len() >= 7 && payload[0] == protocol::DM_VIA_CHANNEL_MARKER {
|
||||
let dest_prefix: [u8; 6] = payload[1..7]
|
||||
.try_into()
|
||||
.expect("sliced 6 bytes");
|
||||
let inner = &payload[7..];
|
||||
|
||||
// If the destination prefix matches a contact we know about that
|
||||
// isn't ourselves, forward it (the channel broadcast is shared by
|
||||
// everyone but only the intended recipient should treat it as a
|
||||
// DM). We compare against our mesh contacts — if the prefix is
|
||||
// not one of our known peers AND not our self_node_id, we drop
|
||||
// because it's someone else's DM bouncing through the mesh.
|
||||
let addressed_to_us = dest_prefix_is_us(state, &dest_prefix).await;
|
||||
if !addressed_to_us {
|
||||
debug!(
|
||||
dest = %hex::encode(dest_prefix),
|
||||
inner_len = inner.len(),
|
||||
"Dropping DM-via-channel (not for us)"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
info!(
|
||||
dest = %hex::encode(dest_prefix),
|
||||
inner_len = inner.len(),
|
||||
channel = channel_idx,
|
||||
"Received DM via channel (addressed to us)"
|
||||
);
|
||||
|
||||
// Treat the inner payload exactly the same as we'd treat a direct
|
||||
// unicast frame — resolve the sender from our peer table (we
|
||||
// don't know the sender here, so use a synthetic-ish contact_id
|
||||
// derived from the first peer whose dest_prefix != us), and
|
||||
// dispatch through the typed / base64 / plain-text ladder.
|
||||
// Because the wrapped frame doesn't carry the sender prefix, we
|
||||
// pick "the other side of the conversation" — there are only two
|
||||
// known archipelago peers in the radio neighborhood, so the
|
||||
// sender is whoever isn't us. For the typical 2-node setup this
|
||||
// is correct. When there are more peers, upper layers (typed
|
||||
// envelope sender_pubkey) will carry the real sender identity.
|
||||
let (contact_id, name) = resolve_counterparty(state, &dest_prefix).await;
|
||||
if TypedEnvelope::is_typed(inner) {
|
||||
handle_typed_message(inner, contact_id, &name, state).await;
|
||||
} else if let Some(decoded) = try_base64_typed(inner) {
|
||||
handle_typed_message(&decoded, contact_id, &name, state).await;
|
||||
} else if let Some(decoded) = try_chunk_reassemble(inner, contact_id, state).await {
|
||||
// Reassembled a chunked MC-framed payload
|
||||
if TypedEnvelope::is_typed(&decoded) {
|
||||
handle_typed_message(&decoded, contact_id, &name, state).await;
|
||||
} else {
|
||||
let text = String::from_utf8_lossy(&decoded).to_string();
|
||||
store_plain_message(state, contact_id, &name, &text).await;
|
||||
}
|
||||
} else {
|
||||
let text = String::from_utf8_lossy(inner).to_string();
|
||||
store_plain_message(state, contact_id, &name, &text).await;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Regular channel broadcast (not DM-wrapped)
|
||||
let chan_contact_id = u32::MAX - (channel_idx as u32);
|
||||
let chan_name = format!("Channel {}", channel_idx);
|
||||
if TypedEnvelope::is_typed(payload) {
|
||||
handle_typed_message(payload, chan_contact_id, &chan_name, state).await;
|
||||
} else {
|
||||
let text = String::from_utf8_lossy(payload).to_string();
|
||||
store_plain_message(state, chan_contact_id, &chan_name, &text).await;
|
||||
info!(channel = channel_idx, "Received mesh channel message");
|
||||
}
|
||||
}
|
||||
|
||||
/// Return true if the given 6-byte pubkey prefix matches our own meshcore
|
||||
/// firmware pubkey. We don't currently track our own firmware pubkey in
|
||||
/// state (the SELF_INFO parse only pulls the node_id), so this falls back
|
||||
/// to "not any of our known peers" — i.e. if the prefix isn't one of the
|
||||
/// OTHER contacts in our mesh contact table, it must be us. That holds
|
||||
/// for the typical 2-node-plus-repeaters topology and is good enough to
|
||||
/// filter out DMs clearly addressed to someone else.
|
||||
async fn dest_prefix_is_us(state: &Arc<MeshState>, dest_prefix: &[u8; 6]) -> bool {
|
||||
let peers = state.peers.read().await;
|
||||
for p in peers.values() {
|
||||
if let Some(hex_pk) = p.pubkey_hex.as_deref() {
|
||||
if hex_pk.len() >= 12 {
|
||||
if let Ok(bytes) = hex::decode(&hex_pk[..12]) {
|
||||
if bytes.len() == 6 && bytes[..] == dest_prefix[..] {
|
||||
// It matches a peer we know — so it's NOT for us
|
||||
// (we'd never have a peer row for ourselves).
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
/// Pick a "counterparty" contact_id when dispatching a DM-via-channel
|
||||
/// whose sender we don't otherwise know. We look for any archipelago
|
||||
/// (type-1, "Archy-*") peer in the contact table whose prefix ISN'T the
|
||||
/// destination — that's "the other side." Falls back to contact_id=0
|
||||
/// when nothing matches.
|
||||
async fn resolve_counterparty(
|
||||
state: &Arc<MeshState>,
|
||||
dest_prefix: &[u8; 6],
|
||||
) -> (u32, String) {
|
||||
let peers = state.peers.read().await;
|
||||
for p in peers.values() {
|
||||
if !p.advert_name.starts_with("Archy-") {
|
||||
continue;
|
||||
}
|
||||
if let Some(hex_pk) = p.pubkey_hex.as_deref() {
|
||||
if hex_pk.len() >= 12 {
|
||||
if let Ok(bytes) = hex::decode(&hex_pk[..12]) {
|
||||
if bytes.len() == 6 && bytes[..] != dest_prefix[..] {
|
||||
return (p.contact_id, p.advert_name.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
(0, "dm-via-channel".to_string())
|
||||
}
|
||||
|
||||
@@ -68,7 +68,11 @@ pub struct MeshState {
|
||||
pub shared_secrets: RwLock<HashMap<u32, [u8; 32]>>,
|
||||
pub status: RwLock<MeshStatus>,
|
||||
pub event_tx: broadcast::Sender<MeshEvent>,
|
||||
pub cmd_tx: mpsc::Sender<MeshCommand>,
|
||||
/// Command channel sender. Wrapped in RwLock so `MeshService::stop()`
|
||||
/// can swap it for a fresh channel when the listener task drains the
|
||||
/// old receiver — without this, a disable→enable cycle fails with
|
||||
/// "Command channel already consumed" on the second start().
|
||||
pub cmd_tx: RwLock<mpsc::Sender<MeshCommand>>,
|
||||
next_message_id: RwLock<u64>,
|
||||
/// Per-contact outbound sequence counter. Increments on every typed
|
||||
/// envelope we send to a given peer so the receiver (and anyone else
|
||||
@@ -129,7 +133,7 @@ impl MeshState {
|
||||
peers: RwLock::new(HashMap::new()),
|
||||
messages: RwLock::new(VecDeque::new()),
|
||||
shared_secrets: RwLock::new(HashMap::new()),
|
||||
cmd_tx,
|
||||
cmd_tx: RwLock::new(cmd_tx),
|
||||
status: RwLock::new(MeshStatus {
|
||||
enabled: true,
|
||||
device_type: DeviceType::Unknown,
|
||||
@@ -158,6 +162,14 @@ impl MeshState {
|
||||
(state, rx, cmd_rx)
|
||||
}
|
||||
|
||||
/// Send a command to the listener. Reads the current sender from the
|
||||
/// RwLock and clones for the async send. Returns the mpsc SendError so
|
||||
/// callers can treat a dead listener as "mesh not running".
|
||||
pub async fn send_cmd(&self, cmd: MeshCommand) -> Result<(), mpsc::error::SendError<MeshCommand>> {
|
||||
let tx = self.cmd_tx.read().await.clone();
|
||||
tx.send(cmd).await
|
||||
}
|
||||
|
||||
pub async fn next_id(&self) -> u64 {
|
||||
let mut id = self.next_message_id.write().await;
|
||||
let current = *id;
|
||||
|
||||
@@ -34,6 +34,19 @@ async fn auto_detect_and_open() -> Result<(String, MeshcoreDevice, DeviceInfo)>
|
||||
anyhow::bail!("No Meshcore device found on {} candidate ports: {:?}", paths.len(), paths)
|
||||
}
|
||||
|
||||
/// Wrap a direct-message payload as a channel-1 broadcast body. Format:
|
||||
/// `[DM_VIA_CHANNEL_MARKER(1)][dest_pubkey_prefix(6)][inner_payload…]`
|
||||
/// Receivers that see this marker on a channel frame extract the header,
|
||||
/// filter by destination, and dispatch the inner payload as if it were a
|
||||
/// direct unicast message.
|
||||
fn wrap_dm_via_channel(dest_pubkey_prefix: &[u8; 6], inner: &[u8]) -> Vec<u8> {
|
||||
let mut out = Vec::with_capacity(1 + 6 + inner.len());
|
||||
out.push(super::super::protocol::DM_VIA_CHANNEL_MARKER);
|
||||
out.extend_from_slice(dest_pubkey_prefix);
|
||||
out.extend_from_slice(inner);
|
||||
out
|
||||
}
|
||||
|
||||
/// Fetch the contacts list from the device and update the peer cache.
|
||||
async fn refresh_contacts(
|
||||
device: &mut MeshcoreDevice,
|
||||
@@ -62,6 +75,36 @@ async fn refresh_contacts(
|
||||
state.update_peer_count().await;
|
||||
if !contacts.is_empty() {
|
||||
info!(count = contacts.len(), "Refreshed mesh contacts");
|
||||
// Auto-heal routing: any type-1 (friend) contact whose stored
|
||||
// path_len is 0 has no route established, which makes the
|
||||
// meshcore firmware silently drop outbound TXT_MSGs to it.
|
||||
// Flip those to OUT_PATH_UNKNOWN (0xFF) via CMD_RESET_PATH so
|
||||
// the firmware falls back to flood routing. Advert discovery
|
||||
// will fill in a real path later if one becomes available.
|
||||
for c in contacts.iter() {
|
||||
if c.contact_type != 1 || c.path_len != 0 {
|
||||
continue;
|
||||
}
|
||||
let pk_bytes = match hex::decode(&c.public_key_hex) {
|
||||
Ok(b) if b.len() == 32 => {
|
||||
let mut arr = [0u8; 32];
|
||||
arr.copy_from_slice(&b);
|
||||
arr
|
||||
}
|
||||
_ => continue,
|
||||
};
|
||||
match device.reset_contact_path(&pk_bytes).await {
|
||||
Ok(()) => info!(
|
||||
name = %c.advert_name,
|
||||
"Reset contact path → flood (was path_len=0)"
|
||||
),
|
||||
Err(e) => warn!(
|
||||
name = %c.advert_name,
|
||||
"reset_contact_path failed: {}",
|
||||
e
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -142,6 +185,13 @@ pub(super) async fn run_mesh_session(
|
||||
let advert_name = format!("Archy-{}", short_did);
|
||||
if let Err(e) = device.set_advert_name(&advert_name).await {
|
||||
warn!("Failed to set advert name: {}", e);
|
||||
} else {
|
||||
// Reflect the post-set name in MeshStatus too so the UI can filter
|
||||
// its own radio echo from the peer list. Without this, the status
|
||||
// still carries whatever pre-set name the firmware reported and the
|
||||
// self-filter never matches.
|
||||
let mut status = state.status.write().await;
|
||||
status.self_advert_name = Some(advert_name.clone());
|
||||
}
|
||||
|
||||
// Broadcast our advertisement so other nodes can discover us
|
||||
@@ -251,12 +301,24 @@ async fn handle_send_command(
|
||||
) {
|
||||
match cmd {
|
||||
MeshCommand::SendText { dest_pubkey_prefix, payload } => {
|
||||
if let Err(e) = device.send_text(&dest_pubkey_prefix, &payload).await {
|
||||
// Route the DM as a DM-via-channel broadcast: meshcore's
|
||||
// direct-unicast path silently drops between our two nodes
|
||||
// (proven via `mode=flood resp_code=6` diag — the firmware
|
||||
// transmits but nothing arrives), while channel-1 broadcasts
|
||||
// reliably flood via the FreeMadeira repeater. Wrap the
|
||||
// payload with a recipient pubkey-prefix header so the
|
||||
// receiver side can tell it apart from normal channel text.
|
||||
let wrapped = wrap_dm_via_channel(&dest_pubkey_prefix, &payload);
|
||||
if let Err(e) = device.send_channel_text(1, &wrapped).await {
|
||||
*consecutive_write_failures += 1;
|
||||
warn!(failures = *consecutive_write_failures, "Failed to send text via mesh: {}", e);
|
||||
warn!(failures = *consecutive_write_failures, "Failed to send DM via channel: {}", e);
|
||||
} else {
|
||||
*consecutive_write_failures = 0;
|
||||
info!(dest = %hex::encode(dest_pubkey_prefix), len = payload.len(), "Sent mesh message");
|
||||
info!(
|
||||
dest = %hex::encode(dest_pubkey_prefix),
|
||||
len = payload.len(),
|
||||
"Sent mesh message (DM via channel)"
|
||||
);
|
||||
}
|
||||
}
|
||||
MeshCommand::SendRaw { dest_pubkey_prefix, payload } => {
|
||||
@@ -278,20 +340,30 @@ async fn handle_send_command(
|
||||
use base64::Engine;
|
||||
let encoded = base64::engine::general_purpose::STANDARD.encode(&wire_payload);
|
||||
|
||||
if encoded.len() <= 140 {
|
||||
// Single frame — fits in one LoRa packet
|
||||
if let Err(e) = device.send_text(&dest_pubkey_prefix, encoded.as_bytes()).await {
|
||||
// Route via DM-via-channel wrapper. Channel-1 broadcasts are
|
||||
// the only proven-working path between our two nodes, so we
|
||||
// send the base64 chunk on channel 1 with a recipient header
|
||||
// the receiver can use to filter. Chunk size is reduced by 7
|
||||
// bytes (1 marker + 6 dest-prefix) so each wrapped frame
|
||||
// still fits inside the LoRa 160-byte budget.
|
||||
if encoded.len() <= 133 {
|
||||
// Single frame — wraps under 160B. 160 − 7 wrapper − some
|
||||
// safety margin leaves ≈133 bytes for the base64 payload.
|
||||
let wrapped = wrap_dm_via_channel(&dest_pubkey_prefix, encoded.as_bytes());
|
||||
if let Err(e) = device.send_channel_text(1, &wrapped).await {
|
||||
*consecutive_write_failures += 1;
|
||||
warn!(failures = *consecutive_write_failures, "Failed to send raw via mesh: {}", e);
|
||||
warn!(failures = *consecutive_write_failures, "Failed to send raw DM-via-channel: {}", e);
|
||||
} else {
|
||||
*consecutive_write_failures = 0;
|
||||
info!(dest = %hex::encode(dest_pubkey_prefix), len = encoded.len(), "Sent raw mesh message");
|
||||
info!(dest = %hex::encode(dest_pubkey_prefix), len = encoded.len(), "Sent raw mesh message (DM via channel)");
|
||||
}
|
||||
} else {
|
||||
// Multi-frame chunking: "MCxxyyzz..." where xx=msg_id, yy=chunk_idx, zz=total_chunks
|
||||
// Multi-frame chunking: "MCxxyyzz..." where xx=msg_id, yy=chunk_idx, zz=total_chunks.
|
||||
// Chunk size shrunk from 132 → 125 to leave room for the
|
||||
// DM wrapper header (7 bytes) on top of the "MCxxyyzz" (8).
|
||||
static CHUNK_MSG_ID: std::sync::atomic::AtomicU8 = std::sync::atomic::AtomicU8::new(0);
|
||||
let msg_id = CHUNK_MSG_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
let chunk_data_size = 132; // 160 - 8 header bytes ("MCxxyyzz") = 152, leave margin
|
||||
let chunk_data_size = 125;
|
||||
let chunks: Vec<&str> = encoded.as_bytes().chunks(chunk_data_size)
|
||||
.map(|c| std::str::from_utf8(c).unwrap_or(""))
|
||||
.collect();
|
||||
@@ -301,13 +373,14 @@ async fn handle_send_command(
|
||||
raw_len = wire_payload.len(),
|
||||
b64_len = encoded.len(),
|
||||
chunks = total,
|
||||
"Sending chunked mesh message"
|
||||
"Sending chunked mesh message (DM via channel)"
|
||||
);
|
||||
for (idx, chunk) in chunks.iter().enumerate() {
|
||||
let frame = format!("MC{:02x}{:02x}{:02x}{}", msg_id, idx as u8, total, chunk);
|
||||
if let Err(e) = device.send_text(&dest_pubkey_prefix, frame.as_bytes()).await {
|
||||
let wrapped = wrap_dm_via_channel(&dest_pubkey_prefix, frame.as_bytes());
|
||||
if let Err(e) = device.send_channel_text(1, &wrapped).await {
|
||||
*consecutive_write_failures += 1;
|
||||
warn!(failures = *consecutive_write_failures, chunk = idx, "Chunk send failed: {}", e);
|
||||
warn!(failures = *consecutive_write_failures, chunk = idx, "Chunk DM-via-channel send failed: {}", e);
|
||||
break;
|
||||
}
|
||||
// Small delay between chunks to avoid overwhelming the radio
|
||||
|
||||
@@ -196,6 +196,7 @@ pub struct MeshService {
|
||||
listener_handle: Option<tokio::task::JoinHandle<()>>,
|
||||
deadman_handle: Option<tokio::task::JoinHandle<()>>,
|
||||
block_announcer_handle: Option<tokio::task::JoinHandle<()>>,
|
||||
presence_handle: Option<tokio::task::JoinHandle<()>>,
|
||||
cmd_rx: Option<tokio::sync::mpsc::Receiver<listener::MeshCommand>>,
|
||||
// Crypto identity for this node
|
||||
our_did: String,
|
||||
@@ -268,6 +269,7 @@ impl MeshService {
|
||||
listener_handle: None,
|
||||
deadman_handle: None,
|
||||
block_announcer_handle: None,
|
||||
presence_handle: None,
|
||||
cmd_rx: Some(cmd_rx),
|
||||
our_did: did.to_string(),
|
||||
our_ed_pubkey_hex: ed_pubkey_hex.to_string(),
|
||||
@@ -323,7 +325,7 @@ impl MeshService {
|
||||
error!("Dead man's switch TRIGGERED — broadcasting alert");
|
||||
if let Ok(wire) = dms.build_signed_alert(&dms_key).await {
|
||||
for ch in [0u8, 1] {
|
||||
let _ = dms_state.cmd_tx.send(
|
||||
let _ = dms_state.send_cmd(
|
||||
listener::MeshCommand::BroadcastChannel {
|
||||
channel: ch,
|
||||
payload: wire.clone(),
|
||||
@@ -407,7 +409,7 @@ impl MeshService {
|
||||
if pk_bytes.len() >= 6 {
|
||||
let mut prefix = [0u8; 6];
|
||||
prefix.copy_from_slice(&pk_bytes[..6]);
|
||||
let _ = bha_state.cmd_tx.send(
|
||||
let _ = bha_state.send_cmd(
|
||||
listener::MeshCommand::SendRaw {
|
||||
dest_pubkey_prefix: prefix,
|
||||
payload: wire.clone(),
|
||||
@@ -427,7 +429,7 @@ impl MeshService {
|
||||
if pk_bytes.len() >= 6 {
|
||||
let mut prefix = [0u8; 6];
|
||||
prefix.copy_from_slice(&pk_bytes[..6]);
|
||||
let _ = bha_state.cmd_tx.send(
|
||||
let _ = bha_state.send_cmd(
|
||||
listener::MeshCommand::SendRaw {
|
||||
dest_pubkey_prefix: prefix,
|
||||
payload: wire.clone(),
|
||||
@@ -464,6 +466,16 @@ impl MeshService {
|
||||
info!("Block header announcer started");
|
||||
}
|
||||
|
||||
// Presence heartbeat broadcaster is DISABLED. The CBOR-encoded
|
||||
// PresencePayload was rendering as garbled bytes on peers that
|
||||
// didn't understand the typed envelope (e.g. the FreeMadeira
|
||||
// repeater echoed it back as plaintext on channel 0), spamming
|
||||
// every visible node with "Archy-…: av<61>…fstatusfonline…" every
|
||||
// 120s. Re-enable only after either (a) presence moves to a
|
||||
// non-broadcast path or (b) we can guarantee no plain-text-only
|
||||
// receivers on the shared channel.
|
||||
self.presence_handle = None;
|
||||
|
||||
info!("Mesh service started");
|
||||
Ok(())
|
||||
}
|
||||
@@ -484,6 +496,19 @@ impl MeshService {
|
||||
handle.abort();
|
||||
let _ = handle.await;
|
||||
}
|
||||
if let Some(handle) = self.presence_handle.take() {
|
||||
handle.abort();
|
||||
let _ = handle.await;
|
||||
}
|
||||
// Recreate the cmd channel so a subsequent start() has a fresh
|
||||
// receiver. The listener task took ownership of the old receiver
|
||||
// on its previous run and dropped it when the task ended, so
|
||||
// without this swap the next start() hits "Command channel
|
||||
// already consumed". Swapping the sender inside MeshState means
|
||||
// every Arc holder transparently picks up the new channel.
|
||||
let (new_tx, new_rx) = tokio::sync::mpsc::channel(32);
|
||||
*self.state.cmd_tx.write().await = new_tx;
|
||||
self.cmd_rx = Some(new_rx);
|
||||
info!("Mesh service stopped");
|
||||
}
|
||||
|
||||
@@ -583,9 +608,7 @@ impl MeshService {
|
||||
let end = (start + MAX_CHUNK_B64).min(b64.len());
|
||||
let chunk = &b64[start..end];
|
||||
let frame = format!("MC{:02X}{:02X}{:02X}{}", msg_id, chunk_idx, total_chunks, chunk);
|
||||
self.state
|
||||
.cmd_tx
|
||||
.send(listener::MeshCommand::SendText {
|
||||
self.state.send_cmd(listener::MeshCommand::SendText {
|
||||
dest_pubkey_prefix: dest_prefix,
|
||||
payload: frame.into_bytes(),
|
||||
})
|
||||
@@ -621,9 +644,7 @@ impl MeshService {
|
||||
|
||||
let dest_prefix = self.peer_dest_prefix(contact_id).await?;
|
||||
|
||||
self.state
|
||||
.cmd_tx
|
||||
.send(listener::MeshCommand::SendText {
|
||||
self.state.send_cmd(listener::MeshCommand::SendText {
|
||||
dest_pubkey_prefix: dest_prefix,
|
||||
payload,
|
||||
})
|
||||
@@ -841,6 +862,15 @@ impl MeshService {
|
||||
messages.iter().find(|m| m.id == id).cloned()
|
||||
}
|
||||
|
||||
/// Drop a stored MeshMessage by local id. Used after sending control
|
||||
/// envelopes (read receipts) so they don't surface as their own
|
||||
/// bubbles in the chat history. The wire frame is already on its way;
|
||||
/// this just prunes the local Sent record.
|
||||
pub async fn drop_message_by_id(&self, id: u64) {
|
||||
let mut messages = self.state.messages.write().await;
|
||||
messages.retain(|m| m.id != id);
|
||||
}
|
||||
|
||||
/// Apply an Edit locally to any own-Sent message matching `sender_seq`
|
||||
/// (sender_pubkey is implicit = self). Rewrites `plaintext` and appends
|
||||
/// an `edited_at` marker on `typed_payload` so the UI can show "(edited)".
|
||||
@@ -905,9 +935,7 @@ impl MeshService {
|
||||
);
|
||||
}
|
||||
|
||||
self.state
|
||||
.cmd_tx
|
||||
.send(listener::MeshCommand::BroadcastChannel {
|
||||
self.state.send_cmd(listener::MeshCommand::BroadcastChannel {
|
||||
channel,
|
||||
payload: wire,
|
||||
})
|
||||
@@ -1014,9 +1042,7 @@ impl MeshService {
|
||||
}
|
||||
|
||||
// Send through the listener's command channel
|
||||
self.state
|
||||
.cmd_tx
|
||||
.send(listener::MeshCommand::BroadcastChannel {
|
||||
self.state.send_cmd(listener::MeshCommand::BroadcastChannel {
|
||||
channel,
|
||||
payload,
|
||||
})
|
||||
@@ -1060,9 +1086,7 @@ impl MeshService {
|
||||
}
|
||||
drop(status);
|
||||
|
||||
self.state
|
||||
.cmd_tx
|
||||
.send(listener::MeshCommand::SendAdvert)
|
||||
self.state.send_cmd(listener::MeshCommand::SendAdvert)
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("Mesh listener not running"))?;
|
||||
|
||||
|
||||
@@ -24,6 +24,12 @@ pub const CMD_SET_DEVICE_TIME: u8 = 0x06;
|
||||
pub const CMD_SEND_SELF_ADVERT: u8 = 0x07;
|
||||
pub const CMD_SET_ADVERT_NAME: u8 = 0x08;
|
||||
pub const CMD_SYNC_NEXT_MESSAGE: u8 = 0x0A;
|
||||
/// CMD_RESET_PATH (0x0D): Tell the firmware to drop the stored route for
|
||||
/// a contact and fall back to flood routing (out_path_len = 0xFF). Used to
|
||||
/// unstick direct messages to contacts whose `path_len=0` means "no route
|
||||
/// known" — without this, the firmware silently drops outbound TXT_MSG
|
||||
/// frames to such contacts.
|
||||
pub const CMD_RESET_PATH: u8 = 0x0D;
|
||||
pub const CMD_SET_RADIO_PARAMS: u8 = 0x0B;
|
||||
pub const CMD_SET_RADIO_TX_POWER: u8 = 0x0C;
|
||||
pub const CMD_SET_TUNING_PARAMS: u8 = 0x15;
|
||||
@@ -72,6 +78,16 @@ pub const ERR_ILLEGAL_ARG: u8 = 0x06;
|
||||
/// Maximum payload size for a single LoRa message.
|
||||
pub const MAX_MESSAGE_LEN: usize = 160;
|
||||
|
||||
/// Marker byte for "direct message wrapped as channel broadcast". Our
|
||||
/// meshcore devices can hear each other's channel broadcasts (via
|
||||
/// repeater flooding) but direct unicast frames don't reach between
|
||||
/// archipelago nodes — so we emulate DMs by sending them on the shared
|
||||
/// channel with a recipient pubkey-prefix header. Format:
|
||||
/// `[DM_VIA_CHANNEL_MARKER][dest_pubkey_prefix(6B)][inner_payload…]`
|
||||
/// The inner payload is whatever we would have sent directly — a typed
|
||||
/// envelope, a chunked MC frame, or plain text.
|
||||
pub const DM_VIA_CHANNEL_MARKER: u8 = 0xD1;
|
||||
|
||||
/// Minimum frame size: marker (1) + length (2) + command/response (1) = 4 bytes.
|
||||
const MIN_FRAME_SIZE: usize = 4;
|
||||
|
||||
@@ -224,6 +240,15 @@ pub fn build_get_contacts() -> Vec<u8> {
|
||||
encode_frame(&[CMD_GET_CONTACTS])
|
||||
}
|
||||
|
||||
/// CMD_RESET_PATH (0x0D): `[0x0D][pub_key:32]`. Clears the stored route
|
||||
/// for a contact so subsequent sends route via flood instead of being
|
||||
/// silently dropped.
|
||||
pub fn build_reset_path(pubkey: &[u8; 32]) -> Vec<u8> {
|
||||
let mut data = vec![CMD_RESET_PATH];
|
||||
data.extend_from_slice(pubkey);
|
||||
encode_frame(&data)
|
||||
}
|
||||
|
||||
/// CMD_SYNC_NEXT_MESSAGE (0x0A): Retrieve the next queued message.
|
||||
pub fn build_sync_next_message() -> Vec<u8> {
|
||||
encode_frame(&[CMD_SYNC_NEXT_MESSAGE])
|
||||
@@ -294,6 +319,8 @@ pub struct ParsedContact {
|
||||
pub advert_name: String,
|
||||
pub last_advert: u32,
|
||||
pub contact_type: u8,
|
||||
pub path_len: u8,
|
||||
pub flags: u8,
|
||||
}
|
||||
|
||||
/// Parse RESP_CONTACT (0x03) response.
|
||||
@@ -305,7 +332,8 @@ pub fn parse_contact(data: &[u8]) -> Result<ParsedContact> {
|
||||
|
||||
let public_key_hex = hex::encode(&data[0..32]);
|
||||
let contact_type = data[32];
|
||||
// flags at data[33], path_len at data[34]
|
||||
let flags = if data.len() > 33 { data[33] } else { 0 };
|
||||
let path_len = if data.len() > 34 { data[34] } else { 0 };
|
||||
// path at data[35..99] (64 bytes)
|
||||
// name at data[99..131] (32 bytes)
|
||||
let name_start = 99.min(data.len());
|
||||
@@ -330,6 +358,8 @@ pub fn parse_contact(data: &[u8]) -> Result<ParsedContact> {
|
||||
advert_name,
|
||||
last_advert,
|
||||
contact_type,
|
||||
path_len,
|
||||
flags,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -158,14 +158,26 @@ impl MeshcoreDevice {
|
||||
}
|
||||
|
||||
/// Send a text message to a contact by their public key prefix (first 6 bytes).
|
||||
pub async fn send_text(&mut self, dest_pubkey_prefix: &[u8; 6], msg: &[u8]) -> Result<()> {
|
||||
/// Returns whether the firmware routed it via flood (true) or direct (false).
|
||||
/// The response frame is `RESP_CODE_SENT | mode | tag[4] | est_timeout[4]`
|
||||
/// where mode == 1 means flood and mode == 0 means direct.
|
||||
pub async fn send_text(&mut self, dest_pubkey_prefix: &[u8; 6], msg: &[u8]) -> Result<bool> {
|
||||
let frame_data = protocol::build_send_text(dest_pubkey_prefix, msg)?;
|
||||
self.send_raw(&frame_data).await?;
|
||||
let frame = self.recv_frame_timeout(READ_TIMEOUT).await?;
|
||||
if frame.code == protocol::RESP_ERR {
|
||||
anyhow::bail!("Send text failed: {}", protocol::parse_error(&frame.data));
|
||||
}
|
||||
Ok(())
|
||||
// RESP_CODE_SENT layout: [mode(1)][tag(4)][est_timeout(4)]
|
||||
let sent_via_flood = frame.data.first().copied().unwrap_or(0) == 1;
|
||||
tracing::info!(
|
||||
dest = %hex::encode(dest_pubkey_prefix),
|
||||
mode = if sent_via_flood { "flood" } else { "direct" },
|
||||
resp_code = frame.code,
|
||||
data_len = frame.data.len(),
|
||||
"[diag] send_text response"
|
||||
);
|
||||
Ok(sent_via_flood)
|
||||
}
|
||||
|
||||
/// Broadcast a text message on a channel.
|
||||
@@ -182,6 +194,20 @@ impl MeshcoreDevice {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Clear the stored routing path for a contact so the firmware flood-
|
||||
/// routes future messages instead of dropping them when path_len=0.
|
||||
pub async fn reset_contact_path(&mut self, pubkey: &[u8; 32]) -> Result<()> {
|
||||
self.send_raw(&protocol::build_reset_path(pubkey)).await?;
|
||||
let frame = self.recv_frame_timeout(READ_TIMEOUT).await?;
|
||||
if frame.code == protocol::RESP_ERR {
|
||||
anyhow::bail!(
|
||||
"Reset path failed: {}",
|
||||
protocol::parse_error(&frame.data)
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get the list of known contacts from the device.
|
||||
/// Protocol: CMD_GET_CONTACTS -> CONTACT_START(count) -> N×CONTACT -> CONTACT_END
|
||||
pub async fn get_contacts(&mut self) -> Result<Vec<protocol::ParsedContact>> {
|
||||
|
||||
Reference in New Issue
Block a user