test: US-08 DWN sync tests pass 50/50 — fix sync performance

- Make dwn.sync endpoint non-blocking: it spawns the sync as a background task and returns immediately
- Add 90s overall timeout to sync_with_peers via tokio::time::timeout
- Deduplicate peer onion addresses before syncing
- Batch message pushes (50 per request) instead of one-at-a-time over Tor
- Add 15s connect_timeout to Tor SOCKS5 client
- Cap local message query to 200 messages per sync
- Fix DWN HTTP handler to process ALL messages in a batch (previously only the first message was processed)
- Add recordId deduplication in the handler to prevent duplicate imports
- Update test script to poll dwn.status for sync completion

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dorian
2026-03-14 01:35:56 +00:00
parent a64d1b2d12
commit 65b5d5db8e
5 changed files with 371 additions and 116 deletions

View File

@@ -652,6 +652,7 @@ impl ApiHandler {
}
/// DWN message processing endpoint — handles RecordsWrite, RecordsQuery, RecordsRead, RecordsDelete.
/// Supports batch processing: all messages in the array are processed.
async fn handle_dwn_message(
body: hyper::body::Bytes,
config: &Config,
@@ -668,100 +669,145 @@ impl ApiHandler {
}
};
// Support both formats: {"message": {...}} and {"messages": [{...}]}
let message = if request.get("message").is_some() {
request["message"].clone()
// Collect all messages to process
let messages: Vec<serde_json::Value> = if request.get("message").is_some() {
vec![request["message"].clone()]
} else if let Some(msgs) = request["messages"].as_array() {
msgs.first().cloned().unwrap_or_default()
msgs.clone()
} else {
serde_json::Value::Null
vec![serde_json::Value::Null]
};
let interface = message["descriptor"]["interface"]
.as_str()
.unwrap_or("");
let method = message["descriptor"]["method"]
.as_str()
.unwrap_or("");
let store = DwnStore::new(&config.data_dir).await?;
let mut results = Vec::new();
let result = match (interface, method) {
("Records", "Write") => {
let author = message["author"].as_str().unwrap_or("unknown");
let protocol = message["descriptor"]["protocol"].as_str();
let schema = message["descriptor"]["schema"].as_str();
let data_format = message["descriptor"]["dataFormat"].as_str();
let data = message.get("data").cloned();
match store.write_message(author, protocol, schema, data_format, data).await {
Ok(msg) => serde_json::json!({"status": {"code": 202}, "entry": msg}),
Err(e) => serde_json::json!({"status": {"code": 500, "detail": e.to_string()}}),
}
}
("Records", "Query") => {
let query = crate::network::dwn_store::MessageQuery {
protocol: message["descriptor"]["filter"]["protocol"]
.as_str()
.map(|s| s.to_string()),
schema: message["descriptor"]["filter"]["schema"]
.as_str()
.map(|s| s.to_string()),
author: message["descriptor"]["filter"]["author"]
.as_str()
.map(|s| s.to_string()),
date_from: message["descriptor"]["filter"]["dateFrom"]
.as_str()
.map(|s| s.to_string()),
date_to: message["descriptor"]["filter"]["dateTo"]
.as_str()
.map(|s| s.to_string()),
limit: message["descriptor"]["filter"]["limit"]
.as_u64()
.map(|n| n as usize),
};
match store.query_messages(&query).await {
Ok(messages) => serde_json::json!({"status": {"code": 200}, "entries": messages}),
Err(e) => serde_json::json!({"status": {"code": 500, "detail": e.to_string()}}),
}
}
("Records", "Read") => {
let record_id = message["descriptor"]["recordId"]
.as_str()
.unwrap_or("");
match store.read_message(record_id).await {
Ok(Some(msg)) => serde_json::json!({"status": {"code": 200}, "entry": msg}),
Ok(None) => serde_json::json!({"status": {"code": 404, "detail": "Record not found"}}),
Err(e) => serde_json::json!({"status": {"code": 500, "detail": e.to_string()}}),
}
}
("Records", "Delete") => {
let record_id = message["descriptor"]["recordId"]
.as_str()
.unwrap_or("");
match store.delete_message(record_id).await {
Ok(true) => serde_json::json!({"status": {"code": 200}}),
Ok(false) => serde_json::json!({"status": {"code": 404, "detail": "Record not found"}}),
Err(e) => serde_json::json!({"status": {"code": 500, "detail": e.to_string()}}),
}
}
_ => {
serde_json::json!({"status": {"code": 400, "detail": format!("Unknown method: {}.{}", interface, method)}})
}
};
for message in &messages {
let interface = message["descriptor"]["interface"]
.as_str()
.unwrap_or("");
let method = message["descriptor"]["method"]
.as_str()
.unwrap_or("");
let status_code = result["status"]["code"].as_u64().unwrap_or(200);
let http_status = match status_code {
202 => StatusCode::ACCEPTED,
400 => StatusCode::BAD_REQUEST,
404 => StatusCode::NOT_FOUND,
500 => StatusCode::INTERNAL_SERVER_ERROR,
_ => StatusCode::OK,
let result = match (interface, method) {
("Records", "Write") => {
let author = message["author"].as_str().unwrap_or("unknown");
let protocol = message["descriptor"]["protocol"].as_str();
let schema = message["descriptor"]["schema"].as_str();
let data_format = message["descriptor"]["dataFormat"].as_str();
let data = message.get("data").cloned();
// Deduplicate: check if recordId already exists
if let Some(record_id) = message["recordId"].as_str() {
if store.read_message(record_id).await.ok().flatten().is_some() {
serde_json::json!({"status": {"code": 200, "detail": "Already exists"}})
} else {
match store
.write_message(author, protocol, schema, data_format, data)
.await
{
Ok(msg) => {
serde_json::json!({"status": {"code": 202}, "entry": msg})
}
Err(e) => serde_json::json!({"status": {"code": 500, "detail": e.to_string()}}),
}
}
} else {
match store
.write_message(author, protocol, schema, data_format, data)
.await
{
Ok(msg) => serde_json::json!({"status": {"code": 202}, "entry": msg}),
Err(e) => serde_json::json!({"status": {"code": 500, "detail": e.to_string()}}),
}
}
}
("Records", "Query") => {
let query = crate::network::dwn_store::MessageQuery {
protocol: message["descriptor"]["filter"]["protocol"]
.as_str()
.map(|s| s.to_string()),
schema: message["descriptor"]["filter"]["schema"]
.as_str()
.map(|s| s.to_string()),
author: message["descriptor"]["filter"]["author"]
.as_str()
.map(|s| s.to_string()),
date_from: message["descriptor"]["filter"]["dateFrom"]
.as_str()
.map(|s| s.to_string()),
date_to: message["descriptor"]["filter"]["dateTo"]
.as_str()
.map(|s| s.to_string()),
limit: message["descriptor"]["filter"]["limit"]
.as_u64()
.map(|n| n as usize),
};
match store.query_messages(&query).await {
Ok(messages) => {
serde_json::json!({"status": {"code": 200}, "entries": messages})
}
Err(e) => {
serde_json::json!({"status": {"code": 500, "detail": e.to_string()}})
}
}
}
("Records", "Read") => {
let record_id = message["descriptor"]["recordId"]
.as_str()
.unwrap_or("");
match store.read_message(record_id).await {
Ok(Some(msg)) => {
serde_json::json!({"status": {"code": 200}, "entry": msg})
}
Ok(None) => serde_json::json!({"status": {"code": 404, "detail": "Record not found"}}),
Err(e) => {
serde_json::json!({"status": {"code": 500, "detail": e.to_string()}})
}
}
}
("Records", "Delete") => {
let record_id = message["descriptor"]["recordId"]
.as_str()
.unwrap_or("");
match store.delete_message(record_id).await {
Ok(true) => serde_json::json!({"status": {"code": 200}}),
Ok(false) => serde_json::json!({"status": {"code": 404, "detail": "Record not found"}}),
Err(e) => {
serde_json::json!({"status": {"code": 500, "detail": e.to_string()}})
}
}
}
_ => {
serde_json::json!({"status": {"code": 400, "detail": format!("Unknown method: {}.{}", interface, method)}})
}
};
results.push(result);
}
// Return single result for single message, array for batch
let (response_body, http_status) = if results.len() == 1 {
let result = &results[0];
let status_code = result["status"]["code"].as_u64().unwrap_or(200);
let http_status = match status_code {
202 => StatusCode::ACCEPTED,
400 => StatusCode::BAD_REQUEST,
404 => StatusCode::NOT_FOUND,
500 => StatusCode::INTERNAL_SERVER_ERROR,
_ => StatusCode::OK,
};
(result.to_string(), http_status)
} else {
(
serde_json::json!({"replies": results}).to_string(),
StatusCode::OK,
)
};
Ok(Response::builder()
.status(http_status)
.header("Content-Type", "application/json")
.body(hyper::Body::from(result.to_string()))
.body(hyper::Body::from(response_body))
.unwrap())
}
}

View File

@@ -31,7 +31,18 @@ impl RpcHandler {
}
/// Trigger DWN sync with connected peers.
/// Spawns sync as a background task and returns immediately.
pub(super) async fn handle_dwn_sync(&self) -> Result<serde_json::Value> {
// Check if already syncing
let current_state = dwn_sync::load_sync_state(&self.config.data_dir).await?;
if matches!(current_state.status, dwn_sync::SyncStatus::Syncing) {
return Ok(serde_json::json!({
"sync_status": "syncing",
"last_sync": current_state.last_sync,
"messages_synced": current_state.messages_synced,
}));
}
let nodes = federation::load_nodes(&self.config.data_dir).await?;
let onions: Vec<String> = nodes
.iter()
@@ -39,12 +50,19 @@ impl RpcHandler {
.map(|n| n.onion.clone())
.collect();
let state = dwn_sync::sync_with_peers(&self.config.data_dir, &onions).await?;
// Spawn sync in background so we don't block the RPC response
let data_dir = self.config.data_dir.clone();
tokio::spawn(async move {
if let Err(e) = dwn_sync::sync_with_peers(&data_dir, &onions).await {
tracing::warn!(error = %e, "DWN background sync failed");
}
});
// Return immediately with "syncing" status
Ok(serde_json::json!({
"sync_status": state.status,
"last_sync": state.last_sync,
"messages_synced": state.messages_synced,
"sync_status": "syncing",
"last_sync": current_state.last_sync,
"messages_synced": current_state.messages_synced,
}))
}

View File

@@ -102,6 +102,7 @@ pub struct DwnStatusResponse {
/// and push our local messages, deduplicating by record_id.
pub async fn sync_with_peers(data_dir: &Path, peer_onions: &[String]) -> Result<DwnSyncState> {
use crate::network::dwn_store::{DwnStore, MessageQuery};
use std::collections::HashSet;
let mut state = load_sync_state(data_dir).await?;
state.status = SyncStatus::Syncing;
@@ -112,6 +113,7 @@ pub async fn sync_with_peers(data_dir: &Path, peer_onions: &[String]) -> Result<
let client = reqwest::Client::builder()
.proxy(socks_proxy)
.connect_timeout(std::time::Duration::from_secs(15))
.timeout(std::time::Duration::from_secs(30))
.build()
.context("Failed to build Tor HTTP client")?;
@@ -119,24 +121,47 @@ pub async fn sync_with_peers(data_dir: &Path, peer_onions: &[String]) -> Result<
let store = DwnStore::new(data_dir).await?;
let mut synced_count = 0u64;
// Get local messages since last sync (or all if first sync)
// Get local messages since last sync (or all if first sync, capped at 200)
let local_messages = store
.query_messages(&MessageQuery {
date_from: state.last_sync.clone(),
limit: Some(200),
..Default::default()
})
.await?;
for onion in peer_onions {
match sync_single_peer(&client, &store, onion, &local_messages, &state.last_sync).await {
Ok(count) => {
debug!(peer = %onion, messages = count, "Peer sync complete");
synced_count += count;
}
Err(e) => {
debug!(peer = %onion, error = %e, "Peer sync failed");
// Deduplicate peer onion addresses
let mut seen = HashSet::new();
let unique_onions: Vec<&String> = peer_onions
.iter()
.filter(|o| !o.is_empty() && seen.insert(o.as_str().to_string()))
.collect();
debug!(peers = unique_onions.len(), local_msgs = local_messages.len(), "Starting DWN sync");
// Overall sync timeout: 90 seconds
let sync_future = async {
for onion in &unique_onions {
match sync_single_peer(&client, &store, onion, &local_messages, &state.last_sync).await
{
Ok(count) => {
debug!(peer = %onion, messages = count, "Peer sync complete");
synced_count += count;
}
Err(e) => {
debug!(peer = %onion, error = %e, "Peer sync failed");
}
}
}
};
match tokio::time::timeout(std::time::Duration::from_secs(90), sync_future).await {
Ok(()) => {
debug!(count = synced_count, "DWN sync complete");
}
Err(_) => {
debug!("DWN sync timed out after 90s");
}
}
state.status = SyncStatus::Synced;
@@ -144,7 +169,6 @@ pub async fn sync_with_peers(data_dir: &Path, peer_onions: &[String]) -> Result<
state.messages_synced += synced_count;
save_sync_state(data_dir, &state).await?;
debug!(count = synced_count, "DWN sync complete");
Ok(state)
}
@@ -220,26 +244,37 @@ async fn sync_single_peer(
}
}
// Step 3: Push — send our local messages to the peer
for msg in local_messages {
let push_body = serde_json::json!({
"messages": [{
"descriptor": {
"interface": "Records",
"method": "Write",
"protocol": msg.descriptor.protocol,
"schema": msg.descriptor.schema,
"dataFormat": msg.descriptor.data_format,
},
"recordId": msg.record_id,
"author": msg.author,
"data": msg.data,
}]
});
// Step 3: Push — send local messages to peer in batches
let batch_size = 50;
for chunk in local_messages.chunks(batch_size) {
let messages: Vec<serde_json::Value> = chunk
.iter()
.map(|msg| {
serde_json::json!({
"descriptor": {
"interface": "Records",
"method": "Write",
"protocol": msg.descriptor.protocol,
"schema": msg.descriptor.schema,
"dataFormat": msg.descriptor.data_format,
},
"recordId": msg.record_id,
"author": msg.author,
"data": msg.data,
})
})
.collect();
// Best-effort push — don't fail the whole sync if one push fails
if let Err(e) = client.post(&dwn_url).json(&push_body).send().await {
debug!(record_id = %msg.record_id, error = %e, "Failed to push message to peer");
let push_body = serde_json::json!({ "messages": messages });
// Best-effort push — don't fail the whole sync if a batch fails
match client.post(&dwn_url).json(&push_body).send().await {
Ok(_) => {
debug!(count = chunk.len(), "Pushed message batch to peer");
}
Err(e) => {
debug!(error = %e, "Failed to push message batch to peer");
}
}
}