fix(mesh): dedup across transports + persistent radio-contact blocklist
Two mesh fixes bundled so the deploy lands them together: Doubled messages (radio + federation): dedup at store_message now runs a third cross-transport check keyed on (sender_seq, plaintext, 120s). The existing (sender_pubkey, sender_seq) match missed the common case where the same envelope arrives via LoRa radio (sender_pubkey looked up from the firmware key) and again via Tor federation (sender_pubkey = archipelago ed25519), because the two lookups disagree. The new cross-transport match closes that gap without loosening legacy paths. Stale contacts after clear-all: meshcore's on-device contact table is persistent and reads back into peers on the next refresh_contacts, so the previous "nuclear" clear wiped app state for a few seconds before the old rows reappeared. New persistent `radio_contact_blocklist` (mesh-ignored-radio-contacts.json) captures the pubkeys present at clear-time; `refresh_contacts` filters them on read and the filter survives restart. Federation-synthetic peers are excluded from the snapshot so the list rebuilds normally on the next gossip. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -233,6 +233,45 @@ impl RpcHandler {
|
|||||||
let service = self.mesh_service.read().await;
|
let service = self.mesh_service.read().await;
|
||||||
if let Some(svc) = service.as_ref() {
|
if let Some(svc) = service.as_ref() {
|
||||||
let state = svc.state();
|
let state = svc.state();
|
||||||
|
|
||||||
|
// Snapshot the firmware pubkeys we currently know about, then
|
||||||
|
// add them to the radio-contact blocklist. MeshCore's on-device
|
||||||
|
// contact table is persistent and reads back stale rows on the
|
||||||
|
// next refresh_contacts, so without this step `clear-all` only
|
||||||
|
// wipes the app view for a few seconds before the old entries
|
||||||
|
// reappear. The blocklist is also saved to disk so the filter
|
||||||
|
// survives a restart.
|
||||||
|
let firmware_pubkeys: Vec<String> = state
|
||||||
|
.peers
|
||||||
|
.read()
|
||||||
|
.await
|
||||||
|
.values()
|
||||||
|
.filter_map(|p| {
|
||||||
|
// Federation-synthetic peers have their contact_id in the
|
||||||
|
// high half of u32 and carry the archipelago key — those
|
||||||
|
// aren't firmware contacts and must not go on the list.
|
||||||
|
if p.contact_id & 0x8000_0000 != 0 {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
p.pubkey_hex.clone()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
{
|
||||||
|
let mut set = state.radio_contact_blocklist.write().await;
|
||||||
|
for pk in &firmware_pubkeys {
|
||||||
|
set.insert(pk.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let persisted: Vec<String> = state
|
||||||
|
.radio_contact_blocklist
|
||||||
|
.read()
|
||||||
|
.await
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.collect();
|
||||||
|
let _ = crate::mesh::save_ignored_radio_contacts(&data_dir, &persisted).await;
|
||||||
|
|
||||||
state.peers.write().await.clear();
|
state.peers.write().await.clear();
|
||||||
state.messages.write().await.clear();
|
state.messages.write().await.clear();
|
||||||
state.contacts.write().await.clear();
|
state.contacts.write().await.clear();
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ mod session;
|
|||||||
|
|
||||||
use super::types::*;
|
use super::types::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::collections::{HashMap, VecDeque};
|
use std::collections::{HashMap, HashSet, VecDeque};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::sync::{broadcast, mpsc, RwLock};
|
use tokio::sync::{broadcast, mpsc, RwLock};
|
||||||
@@ -106,6 +106,12 @@ pub struct MeshState {
|
|||||||
/// by `RpcHandler` after startup so the mesh listener can persist inline
|
/// by `RpcHandler` after startup so the mesh listener can persist inline
|
||||||
/// file bytes into the same store the HTTP layer serves.
|
/// file bytes into the same store the HTTP layer serves.
|
||||||
pub blob_store: RwLock<Option<Arc<crate::blobs::BlobStore>>>,
|
pub blob_store: RwLock<Option<Arc<crate::blobs::BlobStore>>>,
|
||||||
|
/// Firmware-pubkey-hex of radio contacts the user has chosen to ignore
|
||||||
|
/// (via mesh.clear-all). `refresh_contacts` skips any device contact
|
||||||
|
/// whose pubkey is in this set, preventing the meshcore firmware's
|
||||||
|
/// persistent contact table from regenerating rows the user just
|
||||||
|
/// wiped. Persisted to `mesh-ignored-radio-contacts.json`.
|
||||||
|
pub radio_contact_blocklist: RwLock<HashSet<String>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Contact metadata kept alongside MeshState.peers. Pinned contacts sort to
|
/// Contact metadata kept alongside MeshState.peers. Pinned contacts sort to
|
||||||
@@ -172,6 +178,7 @@ impl MeshState {
|
|||||||
contacts: RwLock::new(HashMap::new()),
|
contacts: RwLock::new(HashMap::new()),
|
||||||
our_ed_pubkey_hex,
|
our_ed_pubkey_hex,
|
||||||
blob_store: RwLock::new(None),
|
blob_store: RwLock::new(None),
|
||||||
|
radio_contact_blocklist: RwLock::new(HashSet::new()),
|
||||||
});
|
});
|
||||||
(state, rx, cmd_rx)
|
(state, rx, cmd_rx)
|
||||||
}
|
}
|
||||||
@@ -208,26 +215,39 @@ impl MeshState {
|
|||||||
// collides with an earlier one (e.g. two 👍 reactions to different
|
// collides with an earlier one (e.g. two 👍 reactions to different
|
||||||
// targets, or "ok" reply twice in a row).
|
// targets, or "ok" reply twice in a row).
|
||||||
//
|
//
|
||||||
// For received messages, prefer MessageKey (sender_pubkey, sender_seq)
|
// Dedup runs THREE checks, any match drops the incoming message:
|
||||||
// as the dedup identity — it's exact and cross-transport-safe. Fall
|
// (a) (sender_pubkey, sender_seq) — exact MessageKey match
|
||||||
// back to (peer, plaintext, 30s window) only for legacy plain-text
|
// (b) (sender_seq, plaintext, 120s) — cross-transport match when
|
||||||
// frames that arrive without a sender_seq.
|
// the same envelope arrives via radio and federation: radio
|
||||||
|
// populates sender_pubkey from the firmware key, federation
|
||||||
|
// populates it from the archipelago ed25519 key, so (a) misses
|
||||||
|
// but the seq+text still uniquely identifies the envelope
|
||||||
|
// (c) (peer_contact_id, plaintext, 30s) — legacy plain-text frames
|
||||||
|
// without a sender_seq at all
|
||||||
if matches!(msg.direction, MessageDirection::Received) {
|
if matches!(msg.direction, MessageDirection::Received) {
|
||||||
let dominated = if msg.sender_pubkey.is_some() && msg.sender_seq.is_some() {
|
let has_seq = msg.sender_seq.is_some();
|
||||||
messages.iter().rev().take(40).any(|m| {
|
let key_match = has_seq
|
||||||
|
&& msg.sender_pubkey.is_some()
|
||||||
|
&& messages.iter().rev().take(40).any(|m| {
|
||||||
matches!(m.direction, MessageDirection::Received)
|
matches!(m.direction, MessageDirection::Received)
|
||||||
&& m.sender_pubkey == msg.sender_pubkey
|
&& m.sender_pubkey == msg.sender_pubkey
|
||||||
&& m.sender_seq == msg.sender_seq
|
&& m.sender_seq == msg.sender_seq
|
||||||
})
|
});
|
||||||
} else {
|
let cross_transport_match = has_seq
|
||||||
messages.iter().rev().take(20).any(|m| {
|
&& messages.iter().rev().take(40).any(|m| {
|
||||||
|
matches!(m.direction, MessageDirection::Received)
|
||||||
|
&& m.sender_seq == msg.sender_seq
|
||||||
|
&& m.plaintext == msg.plaintext
|
||||||
|
&& within_seconds_iso(&m.timestamp, &msg.timestamp, 120)
|
||||||
|
});
|
||||||
|
let legacy_match = !has_seq
|
||||||
|
&& messages.iter().rev().take(20).any(|m| {
|
||||||
matches!(m.direction, MessageDirection::Received)
|
matches!(m.direction, MessageDirection::Received)
|
||||||
&& m.peer_contact_id == msg.peer_contact_id
|
&& m.peer_contact_id == msg.peer_contact_id
|
||||||
&& m.plaintext == msg.plaintext
|
&& m.plaintext == msg.plaintext
|
||||||
&& within_seconds_iso(&m.timestamp, &msg.timestamp, 30)
|
&& within_seconds_iso(&m.timestamp, &msg.timestamp, 30)
|
||||||
})
|
});
|
||||||
};
|
if key_match || cross_transport_match || legacy_match {
|
||||||
if dominated {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -158,8 +158,16 @@ async fn refresh_contacts(
|
|||||||
) {
|
) {
|
||||||
match device.get_contacts().await {
|
match device.get_contacts().await {
|
||||||
Ok(contacts) => {
|
Ok(contacts) => {
|
||||||
|
// Skip firmware contacts the user has explicitly wiped via
|
||||||
|
// mesh.clear-all. MeshCore keeps its own persistent contact
|
||||||
|
// table the app can't remove from, so we filter on read to
|
||||||
|
// keep cleared entries out of the chat list.
|
||||||
|
let blocklist = state.radio_contact_blocklist.read().await.clone();
|
||||||
let mut peers = state.peers.write().await;
|
let mut peers = state.peers.write().await;
|
||||||
for (idx, contact) in contacts.iter().enumerate() {
|
for (idx, contact) in contacts.iter().enumerate() {
|
||||||
|
if blocklist.contains(&contact.public_key_hex) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
let contact_id = idx as u32;
|
let contact_id = idx as u32;
|
||||||
let existing = peers.get(&contact_id);
|
let existing = peers.get(&contact_id);
|
||||||
let peer = super::super::types::MeshPeer {
|
let peer = super::super::types::MeshPeer {
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ use tokio::sync::watch;
|
|||||||
use tracing::{error, info, warn};
|
use tracing::{error, info, warn};
|
||||||
|
|
||||||
const MESH_CONFIG_FILE: &str = "mesh-config.json";
|
const MESH_CONFIG_FILE: &str = "mesh-config.json";
|
||||||
|
const MESH_IGNORED_RADIO_FILE: &str = "mesh-ignored-radio-contacts.json";
|
||||||
|
|
||||||
/// Derive a stable synthetic `contact_id` for a federation peer from its
|
/// Derive a stable synthetic `contact_id` for a federation peer from its
|
||||||
/// archipelago ed25519 pubkey. Mesh LoRa contacts use meshcore firmware's
|
/// archipelago ed25519 pubkey. Mesh LoRa contacts use meshcore firmware's
|
||||||
@@ -178,6 +179,27 @@ pub async fn save_config(data_dir: &Path, config: &MeshConfig) -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn load_ignored_radio_contacts(data_dir: &Path) -> Vec<String> {
|
||||||
|
let path = data_dir.join(MESH_IGNORED_RADIO_FILE);
|
||||||
|
if !path.exists() {
|
||||||
|
return Vec::new();
|
||||||
|
}
|
||||||
|
match fs::read_to_string(&path).await {
|
||||||
|
Ok(s) => serde_json::from_str::<Vec<String>>(&s).unwrap_or_default(),
|
||||||
|
Err(_) => Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn save_ignored_radio_contacts(data_dir: &Path, pubkeys: &[String]) -> Result<()> {
|
||||||
|
fs::create_dir_all(data_dir).await.ok();
|
||||||
|
let content = serde_json::to_string_pretty(pubkeys)
|
||||||
|
.context("Failed to serialize ignored-radio list")?;
|
||||||
|
fs::write(data_dir.join(MESH_IGNORED_RADIO_FILE), content)
|
||||||
|
.await
|
||||||
|
.context("Failed to write ignored-radio list")?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Detect serial devices that could be mesh radios.
|
/// Detect serial devices that could be mesh radios.
|
||||||
/// Checks both Meshcore (via probe) and legacy Meshtastic paths.
|
/// Checks both Meshcore (via probe) and legacy Meshtastic paths.
|
||||||
pub async fn detect_devices() -> Vec<String> {
|
pub async fn detect_devices() -> Vec<String> {
|
||||||
@@ -264,6 +286,19 @@ impl MeshService {
|
|||||||
// radio — which never happens for nodes that only share Tor.
|
// radio — which never happens for nodes that only share Tor.
|
||||||
seed_federation_peers_into_mesh(&state, data_dir).await;
|
seed_federation_peers_into_mesh(&state, data_dir).await;
|
||||||
|
|
||||||
|
// Load the radio-contact blocklist so previously-wiped firmware
|
||||||
|
// contacts stay hidden after restart. Without this, meshcore's
|
||||||
|
// persistent on-device contact table regenerates the rows on the
|
||||||
|
// next refresh_contacts cycle and the user sees stale entries
|
||||||
|
// they already cleared.
|
||||||
|
{
|
||||||
|
let ignored = load_ignored_radio_contacts(data_dir).await;
|
||||||
|
let mut set = state.radio_contact_blocklist.write().await;
|
||||||
|
for pk in ignored {
|
||||||
|
set.insert(pk);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
state,
|
state,
|
||||||
config,
|
config,
|
||||||
|
|||||||
Reference in New Issue
Block a user