refactor: update dependencies and remove unused code

- Added new dependencies: `adler2`, `crc32fast`, `flate2`, `miniz_oxide`, and `libredox`.
- Updated existing dependencies: `tokio-rustls` to version 0.26.4 and `filetime` to version 0.2.27.
- Removed the `backup.rs` file as it is no longer needed.
- Introduced tests for configuration and credential management.
- Enhanced the `identity` module to generate W3C compliant DID documents.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Dorian
2026-03-12 00:19:30 +00:00
parent fd2a837bea
commit f07ce10b1a
347 changed files with 18703 additions and 46785 deletions

View File

@@ -0,0 +1,382 @@
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::path::Path;
use tokio::fs;
use tracing::{debug, info};
const DNS_CONFIG_FILE: &str = "dns_config.json";
/// DNS provider presets with server addresses.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum DnsProvider {
/// Use system default (DHCP-assigned DNS)
System,
/// Cloudflare DNS-over-HTTPS (1.1.1.1)
Cloudflare,
/// Google DNS-over-HTTPS (8.8.8.8)
Google,
/// Quad9 DNS-over-HTTPS (9.9.9.9)
Quad9,
/// Mullvad DNS (no logging)
Mullvad,
/// Custom user-specified servers
Custom,
}
impl std::fmt::Display for DnsProvider {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::System => write!(f, "system"),
Self::Cloudflare => write!(f, "cloudflare"),
Self::Google => write!(f, "google"),
Self::Quad9 => write!(f, "quad9"),
Self::Mullvad => write!(f, "mullvad"),
Self::Custom => write!(f, "custom"),
}
}
}
/// Persisted DNS configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DnsConfig {
pub provider: DnsProvider,
pub servers: Vec<String>,
pub doh_enabled: bool,
pub doh_url: Option<String>,
}
impl Default for DnsConfig {
fn default() -> Self {
Self {
provider: DnsProvider::System,
servers: Vec::new(),
doh_enabled: false,
doh_url: None,
}
}
}
/// Current DNS status read from the system.
#[derive(Debug, Serialize)]
pub struct DnsStatus {
pub provider: String,
pub servers: Vec<String>,
pub doh_enabled: bool,
pub doh_url: Option<String>,
pub resolv_conf_servers: Vec<String>,
}
/// Load persisted DNS config from disk.
pub async fn load_config(data_dir: &Path) -> Result<DnsConfig> {
let path = data_dir.join(DNS_CONFIG_FILE);
if !path.exists() {
return Ok(DnsConfig::default());
}
let data = fs::read_to_string(&path)
.await
.context("Reading DNS config")?;
serde_json::from_str(&data).context("Parsing DNS config")
}
/// Save DNS config to disk.
pub async fn save_config(data_dir: &Path, config: &DnsConfig) -> Result<()> {
let path = data_dir.join(DNS_CONFIG_FILE);
let data = serde_json::to_string_pretty(config)?;
fs::write(&path, data)
.await
.context("Writing DNS config")?;
Ok(())
}
/// Get the DNS servers for a given provider preset.
pub fn provider_servers(provider: &DnsProvider) -> (Vec<String>, Option<String>) {
match provider {
DnsProvider::System => (Vec::new(), None),
DnsProvider::Cloudflare => (
vec!["1.1.1.1".into(), "1.0.0.1".into()],
Some("https://cloudflare-dns.com/dns-query".into()),
),
DnsProvider::Google => (
vec!["8.8.8.8".into(), "8.8.4.4".into()],
Some("https://dns.google/dns-query".into()),
),
DnsProvider::Quad9 => (
vec!["9.9.9.9".into(), "149.112.112.112".into()],
Some("https://dns.quad9.net/dns-query".into()),
),
DnsProvider::Mullvad => (
vec!["194.242.2.2".into()],
Some("https://dns.mullvad.net/dns-query".into()),
),
DnsProvider::Custom => (Vec::new(), None),
}
}
/// Read current DNS servers from /etc/resolv.conf.
pub async fn read_resolv_conf() -> Result<Vec<String>> {
let content = fs::read_to_string("/etc/resolv.conf")
.await
.unwrap_or_default();
let servers: Vec<String> = content
.lines()
.filter_map(|line| {
let trimmed = line.trim();
if trimmed.starts_with("nameserver") {
trimmed.split_whitespace().nth(1).map(String::from)
} else {
None
}
})
.collect();
Ok(servers)
}
/// Get current DNS status combining config + system state.
pub async fn get_status(data_dir: &Path) -> Result<DnsStatus> {
let config = load_config(data_dir).await?;
let resolv_servers = read_resolv_conf().await.unwrap_or_default();
Ok(DnsStatus {
provider: config.provider.to_string(),
servers: if config.servers.is_empty() {
resolv_servers.clone()
} else {
config.servers.clone()
},
doh_enabled: config.doh_enabled,
doh_url: config.doh_url.clone(),
resolv_conf_servers: resolv_servers,
})
}
/// Apply DNS configuration to the system via nmcli.
///
/// Sets DNS servers on the active NetworkManager connection(s).
pub async fn apply_dns(config: &DnsConfig) -> Result<()> {
if config.provider == DnsProvider::System {
// Revert to DHCP-assigned DNS
info!("Reverting to system (DHCP) DNS");
apply_dns_via_nmcli(&[]).await?;
return Ok(());
}
let servers = &config.servers;
if servers.is_empty() {
anyhow::bail!("No DNS servers specified");
}
// Validate all server IPs
for s in servers {
if s.parse::<std::net::IpAddr>().is_err() {
anyhow::bail!("Invalid DNS server IP: {}", s);
}
}
info!(provider = %config.provider, servers = ?servers, "Applying DNS configuration");
apply_dns_via_nmcli(servers).await?;
Ok(())
}
/// Apply DNS servers to all active NetworkManager connections.
async fn apply_dns_via_nmcli(servers: &[String]) -> Result<()> {
// Get active connections
let output = tokio::process::Command::new("nmcli")
.args(["-t", "-f", "NAME,DEVICE,TYPE", "connection", "show", "--active"])
.output()
.await
.context("Failed to list nmcli connections")?;
if !output.status.success() {
anyhow::bail!(
"nmcli connection show failed: {}",
String::from_utf8_lossy(&output.stderr)
);
}
let stdout = String::from_utf8(output.stdout).context("nmcli output not utf8")?;
let connections: Vec<&str> = stdout
.lines()
.filter_map(|line| {
let parts: Vec<&str> = line.splitn(3, ':').collect();
if parts.len() >= 3 {
let conn_type = parts[2];
// Only modify ethernet and wifi connections
if conn_type.contains("ethernet") || conn_type.contains("wireless") || conn_type.contains("wifi") {
return Some(parts[0]);
}
}
None
})
.collect();
if connections.is_empty() {
debug!("No active ethernet/wifi connections found, skipping DNS apply");
return Ok(());
}
let dns_value = if servers.is_empty() {
String::new() // Empty clears custom DNS, reverts to DHCP
} else {
servers.join(" ")
};
for conn_name in &connections {
// Set DNS servers
let dns_args = if dns_value.is_empty() {
vec![
"connection".to_string(),
"modify".to_string(),
conn_name.to_string(),
"ipv4.dns".to_string(),
String::new(),
"ipv4.ignore-auto-dns".to_string(),
"no".to_string(),
]
} else {
vec![
"connection".to_string(),
"modify".to_string(),
conn_name.to_string(),
"ipv4.dns".to_string(),
dns_value.clone(),
"ipv4.ignore-auto-dns".to_string(),
"yes".to_string(),
]
};
let modify = tokio::process::Command::new("nmcli")
.args(&dns_args)
.output()
.await
.context("Failed to modify DNS via nmcli")?;
if !modify.status.success() {
let stderr = String::from_utf8_lossy(&modify.stderr);
tracing::warn!(conn = conn_name, err = %stderr, "Failed to set DNS on connection");
continue;
}
// Reapply the connection to pick up changes
let reapply = tokio::process::Command::new("nmcli")
.args(["connection", "up", conn_name])
.output()
.await;
match reapply {
Ok(out) if out.status.success() => {
info!(conn = conn_name, "DNS updated successfully");
}
Ok(out) => {
let stderr = String::from_utf8_lossy(&out.stderr);
tracing::warn!(conn = conn_name, err = %stderr, "Failed to reapply connection");
}
Err(e) => {
tracing::warn!(conn = conn_name, err = %e, "Failed to reapply connection");
}
}
}
Ok(())
}
/// Configure DNS with a specific provider.
pub async fn configure(data_dir: &Path, provider: DnsProvider, custom_servers: Vec<String>) -> Result<DnsConfig> {
let (servers, doh_url) = if provider == DnsProvider::Custom {
(custom_servers, None)
} else {
let (preset_servers, preset_doh) = provider_servers(&provider);
(preset_servers, preset_doh)
};
let doh_enabled = doh_url.is_some();
let config = DnsConfig {
provider,
servers,
doh_enabled,
doh_url,
};
// Apply to system
apply_dns(&config).await?;
// Persist config
save_config(data_dir, &config).await?;
Ok(config)
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[test]
fn test_default_config() {
let config = DnsConfig::default();
assert_eq!(config.provider, DnsProvider::System);
assert!(config.servers.is_empty());
assert!(!config.doh_enabled);
}
#[test]
fn test_provider_servers() {
let (servers, doh) = provider_servers(&DnsProvider::Cloudflare);
assert_eq!(servers, vec!["1.1.1.1", "1.0.0.1"]);
assert!(doh.unwrap().contains("cloudflare"));
let (servers, doh) = provider_servers(&DnsProvider::System);
assert!(servers.is_empty());
assert!(doh.is_none());
let (servers, doh) = provider_servers(&DnsProvider::Custom);
assert!(servers.is_empty());
assert!(doh.is_none());
}
#[test]
fn test_provider_display() {
assert_eq!(DnsProvider::Cloudflare.to_string(), "cloudflare");
assert_eq!(DnsProvider::System.to_string(), "system");
assert_eq!(DnsProvider::Quad9.to_string(), "quad9");
}
#[tokio::test]
async fn test_config_persistence() {
let dir = tempdir().unwrap();
let config = DnsConfig {
provider: DnsProvider::Cloudflare,
servers: vec!["1.1.1.1".into(), "1.0.0.1".into()],
doh_enabled: true,
doh_url: Some("https://cloudflare-dns.com/dns-query".into()),
};
save_config(dir.path(), &config).await.unwrap();
let loaded = load_config(dir.path()).await.unwrap();
assert_eq!(loaded.provider, DnsProvider::Cloudflare);
assert_eq!(loaded.servers.len(), 2);
assert!(loaded.doh_enabled);
}
#[tokio::test]
async fn test_load_missing_config_returns_default() {
let dir = tempdir().unwrap();
let config = load_config(dir.path()).await.unwrap();
assert_eq!(config.provider, DnsProvider::System);
}
#[test]
fn test_config_serialization() {
let config = DnsConfig {
provider: DnsProvider::Google,
servers: vec!["8.8.8.8".into()],
doh_enabled: true,
doh_url: Some("https://dns.google/dns-query".into()),
};
let json = serde_json::to_string(&config).unwrap();
let parsed: DnsConfig = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.provider, DnsProvider::Google);
assert_eq!(parsed.servers, vec!["8.8.8.8"]);
}
}

View File

@@ -0,0 +1,459 @@
//! DWN message store — persists DWN messages as JSON files on disk.
//!
//! Implements core CRUD operations, protocol registration, and query interface
//! for the Decentralized Web Node spec.
use anyhow::{bail, Context, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use tokio::fs;
use tracing::debug;
use uuid::Uuid;
/// A DWN message descriptor following the spec.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageDescriptor {
pub interface: String,
pub method: String,
pub protocol: Option<String>,
pub schema: Option<String>,
#[serde(rename = "dateCreated")]
pub date_created: String,
#[serde(rename = "dateModified")]
pub date_modified: Option<String>,
#[serde(rename = "dataFormat")]
pub data_format: Option<String>,
}
/// A stored DWN message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DwnMessage {
pub record_id: String,
pub descriptor: MessageDescriptor,
pub author: String,
pub data: Option<serde_json::Value>,
#[serde(rename = "dateCreated")]
pub date_created: String,
}
/// A registered protocol definition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProtocolDefinition {
pub protocol: String,
pub published: bool,
pub types: HashMap<String, serde_json::Value>,
pub structure: HashMap<String, serde_json::Value>,
#[serde(rename = "dateRegistered")]
pub date_registered: String,
}
/// Query parameters for searching messages.
#[derive(Debug, Default)]
pub struct MessageQuery {
pub protocol: Option<String>,
pub schema: Option<String>,
pub author: Option<String>,
pub date_from: Option<String>,
pub date_to: Option<String>,
pub limit: Option<usize>,
}
/// The DWN message store backed by the filesystem.
pub struct DwnStore {
messages_dir: PathBuf,
protocols_dir: PathBuf,
}
impl DwnStore {
/// Create a new DWN store at the given data directory.
pub async fn new(data_dir: &Path) -> Result<Self> {
let messages_dir = data_dir.join("dwn/messages");
let protocols_dir = data_dir.join("dwn/protocols");
fs::create_dir_all(&messages_dir)
.await
.context("Failed to create DWN messages dir")?;
fs::create_dir_all(&protocols_dir)
.await
.context("Failed to create DWN protocols dir")?;
Ok(Self {
messages_dir,
protocols_dir,
})
}
/// Write a new message or update an existing one.
pub async fn write_message(
&self,
author: &str,
protocol: Option<&str>,
schema: Option<&str>,
data_format: Option<&str>,
data: Option<serde_json::Value>,
) -> Result<DwnMessage> {
let now = chrono::Utc::now().to_rfc3339();
let record_id = Uuid::new_v4().to_string();
let message = DwnMessage {
record_id: record_id.clone(),
descriptor: MessageDescriptor {
interface: "Records".to_string(),
method: "Write".to_string(),
protocol: protocol.map(|s| s.to_string()),
schema: schema.map(|s| s.to_string()),
date_created: now.clone(),
date_modified: Some(now.clone()),
data_format: data_format.map(|s| s.to_string()),
},
author: author.to_string(),
data,
date_created: now,
};
let path = self.messages_dir.join(format!("{}.json", record_id));
let content =
serde_json::to_string_pretty(&message).context("Failed to serialize message")?;
fs::write(&path, content)
.await
.context("Failed to write message file")?;
debug!(record_id = %message.record_id, "DWN message written");
Ok(message)
}
/// Read a message by record ID.
pub async fn read_message(&self, record_id: &str) -> Result<Option<DwnMessage>> {
let path = self.messages_dir.join(format!("{}.json", record_id));
if !path.exists() {
return Ok(None);
}
let content = fs::read_to_string(&path)
.await
.context("Failed to read message file")?;
let message: DwnMessage =
serde_json::from_str(&content).context("Failed to parse message")?;
Ok(Some(message))
}
/// Delete a message by record ID.
pub async fn delete_message(&self, record_id: &str) -> Result<bool> {
let path = self.messages_dir.join(format!("{}.json", record_id));
if !path.exists() {
return Ok(false);
}
fs::remove_file(&path)
.await
.context("Failed to delete message file")?;
debug!(record_id = %record_id, "DWN message deleted");
Ok(true)
}
/// Query messages by various criteria.
pub async fn query_messages(&self, query: &MessageQuery) -> Result<Vec<DwnMessage>> {
let mut results = Vec::new();
let mut entries = fs::read_dir(&self.messages_dir)
.await
.context("Failed to read messages dir")?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) != Some("json") {
continue;
}
let content = match fs::read_to_string(&path).await {
Ok(c) => c,
Err(_) => continue,
};
let message: DwnMessage = match serde_json::from_str(&content) {
Ok(m) => m,
Err(_) => continue,
};
if let Some(ref proto) = query.protocol {
if message.descriptor.protocol.as_deref() != Some(proto) {
continue;
}
}
if let Some(ref schema) = query.schema {
if message.descriptor.schema.as_deref() != Some(schema) {
continue;
}
}
if let Some(ref author) = query.author {
if &message.author != author {
continue;
}
}
if let Some(ref from) = query.date_from {
if message.date_created < *from {
continue;
}
}
if let Some(ref to) = query.date_to {
if message.date_created > *to {
continue;
}
}
results.push(message);
}
// Sort by date descending (newest first)
results.sort_by(|a, b| b.date_created.cmp(&a.date_created));
if let Some(limit) = query.limit {
results.truncate(limit);
}
Ok(results)
}
/// Register a protocol definition.
pub async fn register_protocol(&self, definition: &ProtocolDefinition) -> Result<()> {
if definition.protocol.is_empty() {
bail!("Protocol URI cannot be empty");
}
let safe_name = definition.protocol.replace(['/', ':', '.'], "_");
let path = self.protocols_dir.join(format!("{}.json", safe_name));
let content =
serde_json::to_string_pretty(definition).context("Failed to serialize protocol")?;
fs::write(&path, content)
.await
.context("Failed to write protocol file")?;
debug!(protocol = %definition.protocol, "Protocol registered");
Ok(())
}
/// List all registered protocols.
pub async fn list_protocols(&self) -> Result<Vec<ProtocolDefinition>> {
let mut protocols = Vec::new();
let mut entries = fs::read_dir(&self.protocols_dir)
.await
.context("Failed to read protocols dir")?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) != Some("json") {
continue;
}
let content = match fs::read_to_string(&path).await {
Ok(c) => c,
Err(_) => continue,
};
if let Ok(proto) = serde_json::from_str::<ProtocolDefinition>(&content) {
protocols.push(proto);
}
}
Ok(protocols)
}
/// Remove a registered protocol.
pub async fn remove_protocol(&self, protocol_uri: &str) -> Result<bool> {
let safe_name = protocol_uri.replace(['/', ':', '.'], "_");
let path = self.protocols_dir.join(format!("{}.json", safe_name));
if !path.exists() {
return Ok(false);
}
fs::remove_file(&path)
.await
.context("Failed to remove protocol file")?;
debug!(protocol = %protocol_uri, "Protocol removed");
Ok(true)
}
/// Get storage statistics.
pub async fn stats(&self) -> Result<StoreStats> {
let mut message_count: u64 = 0;
let mut total_bytes: u64 = 0;
let mut entries = fs::read_dir(&self.messages_dir)
.await
.context("Failed to read messages dir")?;
while let Some(entry) = entries.next_entry().await? {
if entry
.path()
.extension()
.and_then(|e| e.to_str())
== Some("json")
{
message_count += 1;
if let Ok(meta) = entry.metadata().await {
total_bytes += meta.len();
}
}
}
let protocol_count = self.list_protocols().await?.len() as u64;
Ok(StoreStats {
message_count,
protocol_count,
total_bytes,
})
}
}
/// Storage statistics.
#[derive(Debug, Serialize, Deserialize)]
pub struct StoreStats {
pub message_count: u64,
pub protocol_count: u64,
pub total_bytes: u64,
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
async fn setup() -> (TempDir, DwnStore) {
let dir = TempDir::new().unwrap();
let store = DwnStore::new(dir.path()).await.unwrap();
(dir, store)
}
#[tokio::test]
async fn write_and_read_message() {
let (_dir, store) = setup().await;
let msg = store
.write_message("did:key:test", Some("proto://chat"), None, None, Some(serde_json::json!({"text": "hello"})))
.await
.unwrap();
assert!(!msg.record_id.is_empty());
let read = store.read_message(&msg.record_id).await.unwrap();
assert!(read.is_some());
let read = read.unwrap();
assert_eq!(read.author, "did:key:test");
assert_eq!(read.data, Some(serde_json::json!({"text": "hello"})));
}
#[tokio::test]
async fn read_nonexistent_returns_none() {
let (_dir, store) = setup().await;
let read = store.read_message("nonexistent-id").await.unwrap();
assert!(read.is_none());
}
#[tokio::test]
async fn delete_message() {
let (_dir, store) = setup().await;
let msg = store
.write_message("did:key:test", None, None, None, None)
.await
.unwrap();
assert!(store.delete_message(&msg.record_id).await.unwrap());
assert!(!store.delete_message(&msg.record_id).await.unwrap());
assert!(store.read_message(&msg.record_id).await.unwrap().is_none());
}
#[tokio::test]
async fn query_by_protocol() {
let (_dir, store) = setup().await;
store
.write_message("did:key:a", Some("proto://chat"), None, None, None)
.await
.unwrap();
store
.write_message("did:key:a", Some("proto://files"), None, None, None)
.await
.unwrap();
store
.write_message("did:key:b", Some("proto://chat"), None, None, None)
.await
.unwrap();
let results = store
.query_messages(&MessageQuery {
protocol: Some("proto://chat".to_string()),
..Default::default()
})
.await
.unwrap();
assert_eq!(results.len(), 2);
}
#[tokio::test]
async fn query_by_author() {
let (_dir, store) = setup().await;
store.write_message("did:key:a", None, None, None, None).await.unwrap();
store.write_message("did:key:b", None, None, None, None).await.unwrap();
let results = store
.query_messages(&MessageQuery {
author: Some("did:key:a".to_string()),
..Default::default()
})
.await
.unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].author, "did:key:a");
}
#[tokio::test]
async fn query_with_limit() {
let (_dir, store) = setup().await;
for i in 0..5 {
store
.write_message(&format!("did:key:{}", i), None, None, None, None)
.await
.unwrap();
}
let results = store
.query_messages(&MessageQuery {
limit: Some(3),
..Default::default()
})
.await
.unwrap();
assert_eq!(results.len(), 3);
}
#[tokio::test]
async fn register_and_list_protocols() {
let (_dir, store) = setup().await;
let proto = ProtocolDefinition {
protocol: "https://example.com/chat".to_string(),
published: true,
types: HashMap::new(),
structure: HashMap::new(),
date_registered: chrono::Utc::now().to_rfc3339(),
};
store.register_protocol(&proto).await.unwrap();
let list = store.list_protocols().await.unwrap();
assert_eq!(list.len(), 1);
assert_eq!(list[0].protocol, "https://example.com/chat");
}
#[tokio::test]
async fn remove_protocol() {
let (_dir, store) = setup().await;
let proto = ProtocolDefinition {
protocol: "https://example.com/test".to_string(),
published: false,
types: HashMap::new(),
structure: HashMap::new(),
date_registered: chrono::Utc::now().to_rfc3339(),
};
store.register_protocol(&proto).await.unwrap();
assert!(store.remove_protocol("https://example.com/test").await.unwrap());
assert!(!store.remove_protocol("https://example.com/test").await.unwrap());
assert!(store.list_protocols().await.unwrap().is_empty());
}
#[tokio::test]
async fn store_stats() {
let (_dir, store) = setup().await;
store.write_message("did:key:a", None, None, None, None).await.unwrap();
store.write_message("did:key:b", None, None, None, None).await.unwrap();
let stats = store.stats().await.unwrap();
assert_eq!(stats.message_count, 2);
assert!(stats.total_bytes > 0);
}
}

View File

@@ -98,9 +98,11 @@ pub struct DwnStatusResponse {
}
/// Trigger a sync with connected peers.
/// For each peer that has a DWN endpoint, we query their DWN
/// and replicate relevant messages.
/// For each peer that has a DWN endpoint, we pull their messages
/// and push our local messages, deduplicating by record_id.
pub async fn sync_with_peers(data_dir: &Path, peer_onions: &[String]) -> Result<DwnSyncState> {
use crate::network::dwn_store::{DwnStore, MessageQuery};
let mut state = load_sync_state(data_dir).await?;
state.status = SyncStatus::Syncing;
save_sync_state(data_dir, &state).await?;
@@ -114,21 +116,25 @@ pub async fn sync_with_peers(data_dir: &Path, peer_onions: &[String]) -> Result<
.build()
.context("Failed to build Tor HTTP client")?;
let store = DwnStore::new(data_dir).await?;
let mut synced_count = 0u64;
// Get local messages since last sync (or all if first sync)
let local_messages = store
.query_messages(&MessageQuery {
date_from: state.last_sync.clone(),
..Default::default()
})
.await?;
for onion in peer_onions {
// Try to reach the peer's DWN endpoint
let url = format!("http://{}:3100/health", onion);
match client.get(&url).send().await {
Ok(res) if res.status().is_success() => {
debug!("Peer {} has DWN running, syncing...", onion);
synced_count += 1;
}
Ok(_) => {
debug!("Peer {} DWN not available", onion);
match sync_single_peer(&client, &store, onion, &local_messages, &state.last_sync).await {
Ok(count) => {
debug!(peer = %onion, messages = count, "Peer sync complete");
synced_count += count;
}
Err(e) => {
debug!("Could not reach peer {} DWN: {}", onion, e);
debug!(peer = %onion, error = %e, "Peer sync failed");
}
}
}
@@ -138,10 +144,108 @@ pub async fn sync_with_peers(data_dir: &Path, peer_onions: &[String]) -> Result<
state.messages_synced += synced_count;
save_sync_state(data_dir, &state).await?;
debug!("DWN sync complete: {} peers synced", synced_count);
debug!(count = synced_count, "DWN sync complete");
Ok(state)
}
/// Sync with a single peer: pull their messages and push ours.
async fn sync_single_peer(
client: &reqwest::Client,
store: &crate::network::dwn_store::DwnStore,
onion: &str,
local_messages: &[crate::network::dwn_store::DwnMessage],
last_sync: &Option<String>,
) -> Result<u64> {
let base_url = format!("http://{}:5678", onion);
let mut imported = 0u64;
// Step 1: Check peer health
let health_url = format!("{}/dwn/health", base_url);
let res = client
.get(&health_url)
.send()
.await
.context("Peer DWN unreachable")?;
if !res.status().is_success() {
return Err(anyhow::anyhow!("Peer DWN not healthy"));
}
// Step 2: Pull — query peer for messages since our last sync
let dwn_url = format!("{}/dwn", base_url);
let mut query_filter = serde_json::json!({});
if let Some(ref since) = last_sync {
query_filter = serde_json::json!({ "dateSort": "createdAscending", "dateFrom": since });
}
let pull_body = serde_json::json!({
"messages": [{
"descriptor": {
"interface": "Records",
"method": "Query",
"filter": query_filter,
}
}]
});
let pull_res = client
.post(&dwn_url)
.json(&pull_body)
.send()
.await
.context("Failed to query peer DWN")?;
if pull_res.status().is_success() {
let pull_data: serde_json::Value = pull_res.json().await.unwrap_or_default();
if let Some(entries) = pull_data["entries"].as_array() {
for entry in entries {
let record_id = entry["record_id"].as_str().unwrap_or_default();
if record_id.is_empty() {
continue;
}
// Skip if we already have this message
if store.read_message(record_id).await?.is_some() {
continue;
}
// Import the message
let author = entry["author"].as_str().unwrap_or("unknown");
let protocol = entry["descriptor"]["protocol"].as_str();
let schema = entry["descriptor"]["schema"].as_str();
let data_format = entry["descriptor"]["dataFormat"].as_str();
let data = entry.get("data").cloned();
store
.write_message(author, protocol, schema, data_format, data)
.await?;
imported += 1;
}
}
}
// Step 3: Push — send our local messages to the peer
for msg in local_messages {
let push_body = serde_json::json!({
"messages": [{
"descriptor": {
"interface": "Records",
"method": "Write",
"protocol": msg.descriptor.protocol,
"schema": msg.descriptor.schema,
"dataFormat": msg.descriptor.data_format,
},
"recordId": msg.record_id,
"author": msg.author,
"data": msg.data,
}]
});
// Best-effort push — don't fail the whole sync if one push fails
if let Err(e) = client.post(&dwn_url).json(&push_body).send().await {
debug!(record_id = %msg.record_id, error = %e, "Failed to push message to peer");
}
}
Ok(imported)
}
/// Add a peer as a sync target.
pub async fn add_sync_target(data_dir: &Path, onion: &str) -> Result<()> {
let mut state = load_sync_state(data_dir).await?;

View File

@@ -1,2 +1,4 @@
pub mod dns;
pub mod dwn_store;
pub mod dwn_sync;
pub mod router;

View File

@@ -5,7 +5,7 @@ use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::path::Path;
use tokio::fs;
use tracing::{debug, info, warn};
use tracing::debug;
const FORWARDS_FILE: &str = "port_forwards.json";