feat: Phase 3 Week 3 — typed messages + store-and-forward outbox
- Create mesh/message_types.rs: typed message envelope system
- MeshMessageType enum: Text, Alert, Invoice, PsbtHash, Coordinate,
PrekeyBundle, SessionInit, BlockHeader, TxRelay, LightningRelay
- TypedEnvelope: CBOR wire format with 0x02 prefix, optional Ed25519 sig
- Payload types: AlertPayload (with AlertType enum), InvoicePayload
(sats as u64), Coordinate (integer microdegrees, no float),
PsbtHashPayload, BlockHeaderPayload, TxRelayPayload, LightningRelayPayload
- Signed envelope creation + verification for alerts/block headers
- 8 unit tests
- Create mesh/outbox.rs: store-and-forward message queue
- PendingMessage with TTL (24h default), retry count, relay hops (max 3)
- MeshOutbox: persistent VecDeque, max 200 messages, expiry, relay support
- Disk persistence to mesh-outbox.json
- 6 unit tests: enqueue, deliver, expire, persistence, max size, relay hops
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
423
core/archipelago/src/mesh/message_types.rs
Normal file
423
core/archipelago/src/mesh/message_types.rs
Normal file
@@ -0,0 +1,423 @@
|
||||
//! Typed message envelope for mesh communication.
|
||||
//!
|
||||
//! Wraps all mesh messages in a CBOR envelope with type discrimination,
|
||||
//! enabling different message kinds (TEXT, ALERT, INVOICE, COORDINATE, etc.)
|
||||
//! over the same encrypted channel.
|
||||
//!
|
||||
//! Wire format: `[0x02: typed_marker] [CBOR envelope]`
|
||||
//! The 0x02 prefix distinguishes typed messages from plain text (0x00)
|
||||
//! and identity broadcasts (0x01 / ARCHY:2/3).
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Wire prefix for typed messages.
|
||||
pub const TYPED_MESSAGE_MARKER: u8 = 0x02;
|
||||
|
||||
/// Message type discriminator.
|
||||
#[repr(u8)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum MeshMessageType {
|
||||
Text = 0,
|
||||
Alert = 1,
|
||||
Invoice = 2,
|
||||
PsbtHash = 3,
|
||||
Coordinate = 4,
|
||||
PrekeyBundle = 5,
|
||||
SessionInit = 6,
|
||||
BlockHeader = 7,
|
||||
TxRelay = 8,
|
||||
TxRelayResponse = 9,
|
||||
LightningRelay = 10,
|
||||
LightningRelayResponse = 11,
|
||||
}
|
||||
|
||||
impl MeshMessageType {
|
||||
pub fn from_u8(v: u8) -> Option<Self> {
|
||||
match v {
|
||||
0 => Some(Self::Text),
|
||||
1 => Some(Self::Alert),
|
||||
2 => Some(Self::Invoice),
|
||||
3 => Some(Self::PsbtHash),
|
||||
4 => Some(Self::Coordinate),
|
||||
5 => Some(Self::PrekeyBundle),
|
||||
6 => Some(Self::SessionInit),
|
||||
7 => Some(Self::BlockHeader),
|
||||
8 => Some(Self::TxRelay),
|
||||
9 => Some(Self::TxRelayResponse),
|
||||
10 => Some(Self::LightningRelay),
|
||||
11 => Some(Self::LightningRelayResponse),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn label(&self) -> &'static str {
|
||||
match self {
|
||||
Self::Text => "text",
|
||||
Self::Alert => "alert",
|
||||
Self::Invoice => "invoice",
|
||||
Self::PsbtHash => "psbt_hash",
|
||||
Self::Coordinate => "coordinate",
|
||||
Self::PrekeyBundle => "prekey_bundle",
|
||||
Self::SessionInit => "session_init",
|
||||
Self::BlockHeader => "block_header",
|
||||
Self::TxRelay => "tx_relay",
|
||||
Self::TxRelayResponse => "tx_relay_response",
|
||||
Self::LightningRelay => "lightning_relay",
|
||||
Self::LightningRelayResponse => "lightning_relay_response",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Wire Envelope ──────────────────────────────────────────────────────
|
||||
|
||||
/// CBOR wire envelope wrapping any typed message.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct TypedEnvelope {
|
||||
/// Message type.
|
||||
pub t: u8,
|
||||
/// Payload bytes (type-specific CBOR or raw data).
|
||||
pub v: Vec<u8>,
|
||||
/// Unix timestamp (seconds since epoch).
|
||||
pub ts: u32,
|
||||
/// Optional Ed25519 signature of (t || v || ts_bytes) — for signed messages.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub sig: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl TypedEnvelope {
|
||||
/// Create an unsigned envelope.
|
||||
pub fn new(msg_type: MeshMessageType, payload: Vec<u8>) -> Self {
|
||||
let ts = chrono::Utc::now().timestamp() as u32;
|
||||
Self {
|
||||
t: msg_type as u8,
|
||||
v: payload,
|
||||
ts,
|
||||
sig: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a signed envelope (for ALERT, BlockHeader).
|
||||
pub fn new_signed(
|
||||
msg_type: MeshMessageType,
|
||||
payload: Vec<u8>,
|
||||
signing_key: &ed25519_dalek::SigningKey,
|
||||
) -> Self {
|
||||
use ed25519_dalek::Signer;
|
||||
let ts = chrono::Utc::now().timestamp() as u32;
|
||||
|
||||
// Sign: type byte || payload || timestamp (4 bytes LE)
|
||||
let mut sign_data = Vec::with_capacity(1 + payload.len() + 4);
|
||||
sign_data.push(msg_type as u8);
|
||||
sign_data.extend_from_slice(&payload);
|
||||
sign_data.extend_from_slice(&ts.to_le_bytes());
|
||||
let signature = signing_key.sign(&sign_data);
|
||||
|
||||
Self {
|
||||
t: msg_type as u8,
|
||||
v: payload,
|
||||
ts,
|
||||
sig: Some(signature.to_bytes().to_vec()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Verify signature if present.
|
||||
pub fn verify_signature(&self, verifying_key: &ed25519_dalek::VerifyingKey) -> Result<bool> {
|
||||
let Some(sig_bytes) = &self.sig else { return Ok(false) };
|
||||
let signature = ed25519_dalek::Signature::from_slice(sig_bytes)
|
||||
.context("Invalid signature bytes")?;
|
||||
|
||||
let mut sign_data = Vec::with_capacity(1 + self.v.len() + 4);
|
||||
sign_data.push(self.t);
|
||||
sign_data.extend_from_slice(&self.v);
|
||||
sign_data.extend_from_slice(&self.ts.to_le_bytes());
|
||||
|
||||
verifying_key
|
||||
.verify_strict(&sign_data, &signature)
|
||||
.context("Signature verification failed")?;
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Get the message type.
|
||||
pub fn message_type(&self) -> Option<MeshMessageType> {
|
||||
MeshMessageType::from_u8(self.t)
|
||||
}
|
||||
|
||||
/// Encode to wire format: [0x02] [CBOR envelope].
|
||||
pub fn to_wire(&self) -> Result<Vec<u8>> {
|
||||
let mut buf = Vec::new();
|
||||
buf.push(TYPED_MESSAGE_MARKER);
|
||||
ciborium::into_writer(self, &mut buf).context("CBOR encode failed")?;
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
/// Decode from wire format.
|
||||
pub fn from_wire(data: &[u8]) -> Result<Self> {
|
||||
if data.is_empty() || data[0] != TYPED_MESSAGE_MARKER {
|
||||
anyhow::bail!("Not a typed message (expected 0x02 prefix)");
|
||||
}
|
||||
ciborium::from_reader(&data[1..]).context("CBOR decode failed")
|
||||
}
|
||||
|
||||
/// Check if raw bytes are a typed message.
|
||||
pub fn is_typed(data: &[u8]) -> bool {
|
||||
!data.is_empty() && data[0] == TYPED_MESSAGE_MARKER
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Payload Types ──────────────────────────────────────────────────────
|
||||
|
||||
/// Alert severity / type.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum AlertType {
|
||||
Emergency,
|
||||
Status,
|
||||
DeadMan,
|
||||
BlockHeader,
|
||||
}
|
||||
|
||||
/// GPS coordinate stored as integer microdegrees (no floating point).
|
||||
/// 1 microdegree ≈ 0.11 meters at the equator.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Coordinate {
|
||||
/// Latitude in microdegrees (degrees × 1_000_000).
|
||||
pub lat: i32,
|
||||
/// Longitude in microdegrees (degrees × 1_000_000).
|
||||
pub lng: i32,
|
||||
/// Optional human-readable label.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub label: Option<String>,
|
||||
}
|
||||
|
||||
impl Coordinate {
|
||||
/// Create from floating-point degrees (convenience for UI layer).
|
||||
pub fn from_degrees(lat: f64, lng: f64, label: Option<String>) -> Self {
|
||||
Self {
|
||||
lat: (lat * 1_000_000.0) as i32,
|
||||
lng: (lng * 1_000_000.0) as i32,
|
||||
label,
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert to floating-point degrees (for display only).
|
||||
pub fn lat_degrees(&self) -> f64 {
|
||||
self.lat as f64 / 1_000_000.0
|
||||
}
|
||||
|
||||
pub fn lng_degrees(&self) -> f64 {
|
||||
self.lng as f64 / 1_000_000.0
|
||||
}
|
||||
}
|
||||
|
||||
/// Alert payload.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AlertPayload {
|
||||
pub alert_type: AlertType,
|
||||
pub message: String,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub coordinate: Option<Coordinate>,
|
||||
}
|
||||
|
||||
/// Lightning invoice payload.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct InvoicePayload {
|
||||
pub bolt11: String,
|
||||
/// Amount in satoshis — always u64, never float.
|
||||
pub amount_sats: u64,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub memo: Option<String>,
|
||||
/// Payment hash hex (for tracking).
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub payment_hash: Option<String>,
|
||||
}
|
||||
|
||||
/// PSBT coordination payload.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PsbtHashPayload {
|
||||
/// SHA-256 hash of the PSBT (hex).
|
||||
pub psbt_hash: String,
|
||||
pub description: String,
|
||||
/// Amount in satoshis.
|
||||
pub amount_sats: u64,
|
||||
}
|
||||
|
||||
/// Block header announcement (from internet-connected to mesh-only peers).
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct BlockHeaderPayload {
|
||||
pub height: u64,
|
||||
/// Block hash (64 hex chars).
|
||||
pub hash: String,
|
||||
/// Previous block hash for chain continuity.
|
||||
pub prev_hash: String,
|
||||
/// Block timestamp.
|
||||
pub timestamp: u32,
|
||||
/// Announced by DID.
|
||||
pub announced_by: String,
|
||||
}
|
||||
|
||||
/// Transaction relay request (mesh-only → internet peer).
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct TxRelayPayload {
|
||||
pub tx_hex: String,
|
||||
pub request_id: u64,
|
||||
}
|
||||
|
||||
/// Transaction relay response (internet peer → mesh-only).
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct TxRelayResponsePayload {
|
||||
pub request_id: u64,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub txid: Option<String>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub error: Option<String>,
|
||||
}
|
||||
|
||||
/// Lightning invoice relay request.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct LightningRelayPayload {
|
||||
pub bolt11: String,
|
||||
pub amount_sats: u64,
|
||||
pub request_id: u64,
|
||||
}
|
||||
|
||||
/// Lightning relay response (proof of payment).
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct LightningRelayResponsePayload {
|
||||
pub request_id: u64,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub payment_hash: Option<String>,
|
||||
/// Preimage as proof of payment.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub preimage: Option<String>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub error: Option<String>,
|
||||
}
|
||||
|
||||
// ─── Helpers ────────────────────────────────────────────────────────────
|
||||
|
||||
/// Encode a payload type to CBOR bytes.
|
||||
pub fn encode_payload<T: Serialize>(payload: &T) -> Result<Vec<u8>> {
|
||||
let mut buf = Vec::new();
|
||||
ciborium::into_writer(payload, &mut buf).context("CBOR payload encode failed")?;
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
/// Decode a payload type from CBOR bytes.
|
||||
pub fn decode_payload<T: for<'a> Deserialize<'a>>(data: &[u8]) -> Result<T> {
|
||||
ciborium::from_reader(data).context("CBOR payload decode failed")
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_typed_envelope_wire_roundtrip() {
|
||||
let envelope = TypedEnvelope::new(
|
||||
MeshMessageType::Text,
|
||||
b"hello mesh".to_vec(),
|
||||
);
|
||||
let wire = envelope.to_wire().unwrap();
|
||||
assert_eq!(wire[0], TYPED_MESSAGE_MARKER);
|
||||
|
||||
let decoded = TypedEnvelope::from_wire(&wire).unwrap();
|
||||
assert_eq!(decoded.t, MeshMessageType::Text as u8);
|
||||
assert_eq!(decoded.v, b"hello mesh");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_signed_envelope() {
|
||||
use ed25519_dalek::SigningKey;
|
||||
use rand::rngs::OsRng;
|
||||
|
||||
let key = SigningKey::generate(&mut OsRng);
|
||||
let envelope = TypedEnvelope::new_signed(
|
||||
MeshMessageType::Alert,
|
||||
b"emergency broadcast".to_vec(),
|
||||
&key,
|
||||
);
|
||||
|
||||
assert!(envelope.sig.is_some());
|
||||
assert!(envelope.verify_signature(&key.verifying_key()).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_tampered_signature_fails() {
|
||||
use ed25519_dalek::SigningKey;
|
||||
use rand::rngs::OsRng;
|
||||
|
||||
let key = SigningKey::generate(&mut OsRng);
|
||||
let mut envelope = TypedEnvelope::new_signed(
|
||||
MeshMessageType::Alert,
|
||||
b"test".to_vec(),
|
||||
&key,
|
||||
);
|
||||
// Tamper with payload
|
||||
envelope.v = b"tampered".to_vec();
|
||||
assert!(envelope.verify_signature(&key.verifying_key()).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_invoice_payload_roundtrip() {
|
||||
let invoice = InvoicePayload {
|
||||
bolt11: "lnbc500n1pjtest...".to_string(),
|
||||
amount_sats: 50000,
|
||||
memo: Some("Pizza money".to_string()),
|
||||
payment_hash: None,
|
||||
};
|
||||
let encoded = encode_payload(&invoice).unwrap();
|
||||
let decoded: InvoicePayload = decode_payload(&encoded).unwrap();
|
||||
assert_eq!(decoded.amount_sats, 50000);
|
||||
assert_eq!(decoded.memo, Some("Pizza money".to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_coordinate_microdegrees() {
|
||||
let coord = Coordinate::from_degrees(51.5074, -0.1278, Some("London".to_string()));
|
||||
assert_eq!(coord.lat, 51507400);
|
||||
assert_eq!(coord.lng, -127800);
|
||||
assert!((coord.lat_degrees() - 51.5074).abs() < 0.001);
|
||||
assert!((coord.lng_degrees() - (-0.1278)).abs() < 0.001);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_alert_payload_roundtrip() {
|
||||
let alert = AlertPayload {
|
||||
alert_type: AlertType::DeadMan,
|
||||
message: "Node unresponsive for 6h".to_string(),
|
||||
coordinate: Some(Coordinate::from_degrees(30.2672, -97.7431, Some("Austin".to_string()))),
|
||||
};
|
||||
let encoded = encode_payload(&alert).unwrap();
|
||||
let decoded: AlertPayload = decode_payload(&encoded).unwrap();
|
||||
assert_eq!(decoded.alert_type, AlertType::DeadMan);
|
||||
assert!(decoded.coordinate.is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_is_typed() {
|
||||
assert!(TypedEnvelope::is_typed(&[0x02, 0x01]));
|
||||
assert!(!TypedEnvelope::is_typed(&[0x00, 0x01]));
|
||||
assert!(!TypedEnvelope::is_typed(&[]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_message_type_label() {
|
||||
assert_eq!(MeshMessageType::Invoice.label(), "invoice");
|
||||
assert_eq!(MeshMessageType::Alert.label(), "alert");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_block_header_payload() {
|
||||
let header = BlockHeaderPayload {
|
||||
height: 890412,
|
||||
hash: "00000000000000000001abc".to_string(),
|
||||
prev_hash: "00000000000000000001aab".to_string(),
|
||||
timestamp: 1710633600,
|
||||
announced_by: "did:key:z6MkTest".to_string(),
|
||||
};
|
||||
let encoded = encode_payload(&header).unwrap();
|
||||
let decoded: BlockHeaderPayload = decode_payload(&encoded).unwrap();
|
||||
assert_eq!(decoded.height, 890412);
|
||||
}
|
||||
}
|
||||
@@ -15,6 +15,10 @@ pub mod serial;
|
||||
#[allow(dead_code)]
|
||||
pub mod types;
|
||||
#[allow(dead_code)]
|
||||
pub mod message_types;
|
||||
#[allow(dead_code)]
|
||||
pub mod outbox;
|
||||
#[allow(dead_code)]
|
||||
pub mod ratchet;
|
||||
#[allow(dead_code)]
|
||||
pub mod session;
|
||||
|
||||
333
core/archipelago/src/mesh/outbox.rs
Normal file
333
core/archipelago/src/mesh/outbox.rs
Normal file
@@ -0,0 +1,333 @@
|
||||
//! Store-and-forward message queue for mesh networking.
|
||||
//!
|
||||
//! When a destination peer is offline or unreachable, messages are queued
|
||||
//! in the outbox and retried periodically. Messages expire after TTL (24h default).
|
||||
//! Intermediate nodes can relay messages for peers up to 3 hops away.
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::VecDeque;
|
||||
use std::path::{Path, PathBuf};
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
/// Default time-to-live for queued messages (24 hours).
|
||||
const DEFAULT_TTL_SECS: u64 = 86400;
|
||||
|
||||
/// Maximum relay hops for store-and-forward.
|
||||
const MAX_RELAY_HOPS: u8 = 3;
|
||||
|
||||
/// Maximum queued messages to prevent unbounded memory use.
|
||||
const MAX_QUEUE_SIZE: usize = 200;
|
||||
|
||||
const OUTBOX_FILE: &str = "mesh-outbox.json";
|
||||
|
||||
/// A message waiting to be delivered.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PendingMessage {
|
||||
pub id: u64,
|
||||
/// Destination peer DID.
|
||||
pub dest_did: String,
|
||||
/// Encrypted payload bytes (already ratchet-encrypted or static-encrypted).
|
||||
#[serde(with = "base64_bytes")]
|
||||
pub encrypted_payload: Vec<u8>,
|
||||
/// When this message was created (RFC 3339).
|
||||
pub created_at: String,
|
||||
/// Time-to-live in seconds.
|
||||
pub ttl_secs: u64,
|
||||
/// Number of times we've attempted delivery.
|
||||
pub retry_count: u32,
|
||||
/// How many relay hops this message has traversed.
|
||||
pub relay_hops: u8,
|
||||
/// Original sender DID (for relayed messages).
|
||||
pub from_did: String,
|
||||
}
|
||||
|
||||
impl PendingMessage {
|
||||
/// Check if this message has expired.
|
||||
pub fn is_expired(&self) -> bool {
|
||||
let Ok(created) = chrono::DateTime::parse_from_rfc3339(&self.created_at) else {
|
||||
return true; // Can't parse = treat as expired
|
||||
};
|
||||
let age = chrono::Utc::now().signed_duration_since(created);
|
||||
age.num_seconds() as u64 > self.ttl_secs
|
||||
}
|
||||
|
||||
/// Check if this message can be relayed further.
|
||||
pub fn can_relay(&self) -> bool {
|
||||
self.relay_hops < MAX_RELAY_HOPS
|
||||
}
|
||||
}
|
||||
|
||||
/// Persistent store-and-forward queue.
|
||||
pub struct MeshOutbox {
|
||||
queue: RwLock<VecDeque<PendingMessage>>,
|
||||
data_dir: PathBuf,
|
||||
next_id: RwLock<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Serialize, Deserialize)]
|
||||
struct OutboxFile {
|
||||
messages: Vec<PendingMessage>,
|
||||
next_id: u64,
|
||||
}
|
||||
|
||||
impl MeshOutbox {
|
||||
/// Load outbox from disk or create empty.
|
||||
pub async fn load(data_dir: &Path) -> Result<Self> {
|
||||
let path = data_dir.join(OUTBOX_FILE);
|
||||
let (messages, next_id) = if path.exists() {
|
||||
let content = tokio::fs::read_to_string(&path)
|
||||
.await
|
||||
.context("Failed to read mesh outbox")?;
|
||||
let file: OutboxFile = serde_json::from_str(&content).unwrap_or_default();
|
||||
(VecDeque::from(file.messages), file.next_id)
|
||||
} else {
|
||||
(VecDeque::new(), 1)
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
queue: RwLock::new(messages),
|
||||
data_dir: data_dir.to_path_buf(),
|
||||
next_id: RwLock::new(next_id),
|
||||
})
|
||||
}
|
||||
|
||||
/// Persist queue to disk.
|
||||
pub async fn save(&self) -> Result<()> {
|
||||
let queue = self.queue.read().await;
|
||||
let next_id = *self.next_id.read().await;
|
||||
let file = OutboxFile {
|
||||
messages: queue.iter().cloned().collect(),
|
||||
next_id,
|
||||
};
|
||||
let content = serde_json::to_string_pretty(&file)
|
||||
.context("Failed to serialize outbox")?;
|
||||
tokio::fs::write(self.data_dir.join(OUTBOX_FILE), content)
|
||||
.await
|
||||
.context("Failed to write outbox")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Enqueue a message for delivery.
|
||||
pub async fn enqueue(
|
||||
&self,
|
||||
dest_did: &str,
|
||||
from_did: &str,
|
||||
encrypted_payload: Vec<u8>,
|
||||
ttl_secs: Option<u64>,
|
||||
) -> Result<u64> {
|
||||
let mut next_id = self.next_id.write().await;
|
||||
let id = *next_id;
|
||||
*next_id += 1;
|
||||
|
||||
let msg = PendingMessage {
|
||||
id,
|
||||
dest_did: dest_did.to_string(),
|
||||
encrypted_payload,
|
||||
created_at: chrono::Utc::now().to_rfc3339(),
|
||||
ttl_secs: ttl_secs.unwrap_or(DEFAULT_TTL_SECS),
|
||||
retry_count: 0,
|
||||
relay_hops: 0,
|
||||
from_did: from_did.to_string(),
|
||||
};
|
||||
|
||||
let mut queue = self.queue.write().await;
|
||||
// Evict oldest if over limit
|
||||
while queue.len() >= MAX_QUEUE_SIZE {
|
||||
queue.pop_front();
|
||||
}
|
||||
queue.push_back(msg);
|
||||
|
||||
info!(id = id, dest = %dest_did, "Message queued for delivery");
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
/// Enqueue a relayed message (from another peer, not originated by us).
|
||||
pub async fn enqueue_relay(&self, mut msg: PendingMessage) -> Result<()> {
|
||||
if !msg.can_relay() {
|
||||
anyhow::bail!("Message exceeded max relay hops ({})", MAX_RELAY_HOPS);
|
||||
}
|
||||
msg.relay_hops += 1;
|
||||
|
||||
let mut queue = self.queue.write().await;
|
||||
while queue.len() >= MAX_QUEUE_SIZE {
|
||||
queue.pop_front();
|
||||
}
|
||||
queue.push_back(msg);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Remove expired messages from the queue.
|
||||
pub async fn expire_stale(&self) -> usize {
|
||||
let mut queue = self.queue.write().await;
|
||||
let before = queue.len();
|
||||
queue.retain(|msg| !msg.is_expired());
|
||||
let expired = before - queue.len();
|
||||
if expired > 0 {
|
||||
debug!(expired = expired, "Expired stale outbox messages");
|
||||
}
|
||||
expired
|
||||
}
|
||||
|
||||
/// Get messages pending for a specific peer.
|
||||
pub async fn messages_for_peer(&self, did: &str) -> Vec<PendingMessage> {
|
||||
self.queue
|
||||
.read()
|
||||
.await
|
||||
.iter()
|
||||
.filter(|m| m.dest_did == did)
|
||||
.cloned()
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Mark a message as delivered (remove from queue).
|
||||
pub async fn mark_delivered(&self, id: u64) -> bool {
|
||||
let mut queue = self.queue.write().await;
|
||||
let before = queue.len();
|
||||
queue.retain(|m| m.id != id);
|
||||
queue.len() < before
|
||||
}
|
||||
|
||||
/// Increment retry count for a message.
|
||||
pub async fn increment_retry(&self, id: u64) {
|
||||
let mut queue = self.queue.write().await;
|
||||
if let Some(msg) = queue.iter_mut().find(|m| m.id == id) {
|
||||
msg.retry_count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
/// Get all pending messages (for RPC status).
|
||||
pub async fn list(&self, limit: Option<usize>) -> Vec<PendingMessage> {
|
||||
let queue = self.queue.read().await;
|
||||
let limit = limit.unwrap_or(50);
|
||||
queue.iter().take(limit).cloned().collect()
|
||||
}
|
||||
|
||||
/// Count of pending messages.
|
||||
pub async fn count(&self) -> usize {
|
||||
self.queue.read().await.len()
|
||||
}
|
||||
}
|
||||
|
||||
// ─── base64 serde for encrypted payloads ────────────────────────────────
|
||||
|
||||
mod base64_bytes {
|
||||
use base64::Engine;
|
||||
use serde::{Deserialize, Deserializer, Serializer};
|
||||
|
||||
pub fn serialize<S: Serializer>(bytes: &Vec<u8>, s: S) -> Result<S::Ok, S::Error> {
|
||||
let encoded = base64::engine::general_purpose::STANDARD.encode(bytes);
|
||||
s.serialize_str(&encoded)
|
||||
}
|
||||
|
||||
pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Vec<u8>, D::Error> {
|
||||
let s = String::deserialize(d)?;
|
||||
base64::engine::general_purpose::STANDARD
|
||||
.decode(&s)
|
||||
.map_err(serde::de::Error::custom)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_enqueue_and_list() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let outbox = MeshOutbox::load(dir.path()).await.unwrap();
|
||||
|
||||
let id = outbox
|
||||
.enqueue("did:key:z6MkDest", "did:key:z6MkSelf", vec![1, 2, 3], None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(outbox.count().await, 1);
|
||||
let msgs = outbox.list(None).await;
|
||||
assert_eq!(msgs[0].id, id);
|
||||
assert_eq!(msgs[0].dest_did, "did:key:z6MkDest");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_mark_delivered() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let outbox = MeshOutbox::load(dir.path()).await.unwrap();
|
||||
|
||||
let id = outbox
|
||||
.enqueue("did:key:z6MkDest", "did:key:z6MkSelf", vec![1], None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(outbox.mark_delivered(id).await);
|
||||
assert_eq!(outbox.count().await, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_expire_stale() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let outbox = MeshOutbox::load(dir.path()).await.unwrap();
|
||||
|
||||
// Enqueue with 0 TTL (immediately expired)
|
||||
outbox
|
||||
.enqueue("did:key:z6MkDest", "did:key:z6MkSelf", vec![1], Some(0))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let expired = outbox.expire_stale().await;
|
||||
assert_eq!(expired, 1);
|
||||
assert_eq!(outbox.count().await, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_persistence_roundtrip() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let outbox = MeshOutbox::load(dir.path()).await.unwrap();
|
||||
|
||||
outbox
|
||||
.enqueue("did:key:z6MkDest", "did:key:z6MkSelf", vec![42, 43, 44], None)
|
||||
.await
|
||||
.unwrap();
|
||||
outbox.save().await.unwrap();
|
||||
|
||||
// Reload
|
||||
let outbox2 = MeshOutbox::load(dir.path()).await.unwrap();
|
||||
assert_eq!(outbox2.count().await, 1);
|
||||
let msgs = outbox2.list(None).await;
|
||||
assert_eq!(msgs[0].encrypted_payload, vec![42, 43, 44]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_max_queue_size() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let outbox = MeshOutbox::load(dir.path()).await.unwrap();
|
||||
|
||||
for i in 0..210 {
|
||||
outbox
|
||||
.enqueue("did:key:z6MkDest", "did:key:z6MkSelf", vec![i as u8], None)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// Should cap at MAX_QUEUE_SIZE
|
||||
assert!(outbox.count().await <= 200);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_relay_hops() {
|
||||
let msg = PendingMessage {
|
||||
id: 1,
|
||||
dest_did: "did:key:test".to_string(),
|
||||
encrypted_payload: vec![],
|
||||
created_at: chrono::Utc::now().to_rfc3339(),
|
||||
ttl_secs: 86400,
|
||||
retry_count: 0,
|
||||
relay_hops: 2,
|
||||
from_did: "did:key:sender".to_string(),
|
||||
};
|
||||
assert!(msg.can_relay()); // 2 < 3
|
||||
|
||||
let msg2 = PendingMessage { relay_hops: 3, ..msg };
|
||||
assert!(!msg2.can_relay()); // 3 >= 3
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user