feat: fix DWN sync to use federation peers and standard port
- DWN sync now uses federation node list instead of old peer list
- Fix sync URL to use port 80 (nginx) instead of 5678 (direct backend)
- DWN /dwn endpoint now accessible without auth for peer sync
- Support both message formats: {message:{}} and {messages:[{}]}
- Replace request["message"] with unified message variable
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -183,11 +183,8 @@ impl ApiHandler {
|
||||
Self::handle_dwn_health(&self.config).await
|
||||
}
|
||||
|
||||
// DWN message processing — authenticated
|
||||
// DWN message processing — peers access over Tor for sync (no session auth)
|
||||
(Method::POST, "/dwn") => {
|
||||
if !self.is_authenticated(&headers).await {
|
||||
return Ok(Self::unauthorized());
|
||||
}
|
||||
Self::handle_dwn_message(body_bytes, &self.config).await
|
||||
}
|
||||
|
||||
@@ -671,10 +668,19 @@ impl ApiHandler {
|
||||
}
|
||||
};
|
||||
|
||||
let interface = request["message"]["descriptor"]["interface"]
|
||||
// Support both formats: {"message": {...}} and {"messages": [{...}]}
|
||||
let message = if request.get("message").is_some() {
|
||||
request["message"].clone()
|
||||
} else if let Some(msgs) = request["messages"].as_array() {
|
||||
msgs.first().cloned().unwrap_or_default()
|
||||
} else {
|
||||
serde_json::Value::Null
|
||||
};
|
||||
|
||||
let interface = message["descriptor"]["interface"]
|
||||
.as_str()
|
||||
.unwrap_or("");
|
||||
let method = request["message"]["descriptor"]["method"]
|
||||
let method = message["descriptor"]["method"]
|
||||
.as_str()
|
||||
.unwrap_or("");
|
||||
|
||||
@@ -682,11 +688,11 @@ impl ApiHandler {
|
||||
|
||||
let result = match (interface, method) {
|
||||
("Records", "Write") => {
|
||||
let author = request["message"]["author"].as_str().unwrap_or("unknown");
|
||||
let protocol = request["message"]["descriptor"]["protocol"].as_str();
|
||||
let schema = request["message"]["descriptor"]["schema"].as_str();
|
||||
let data_format = request["message"]["descriptor"]["dataFormat"].as_str();
|
||||
let data = request["message"].get("data").cloned();
|
||||
let author = message["author"].as_str().unwrap_or("unknown");
|
||||
let protocol = message["descriptor"]["protocol"].as_str();
|
||||
let schema = message["descriptor"]["schema"].as_str();
|
||||
let data_format = message["descriptor"]["dataFormat"].as_str();
|
||||
let data = message.get("data").cloned();
|
||||
match store.write_message(author, protocol, schema, data_format, data).await {
|
||||
Ok(msg) => serde_json::json!({"status": {"code": 202}, "entry": msg}),
|
||||
Err(e) => serde_json::json!({"status": {"code": 500, "detail": e.to_string()}}),
|
||||
@@ -694,22 +700,22 @@ impl ApiHandler {
|
||||
}
|
||||
("Records", "Query") => {
|
||||
let query = crate::network::dwn_store::MessageQuery {
|
||||
protocol: request["message"]["descriptor"]["filter"]["protocol"]
|
||||
protocol: message["descriptor"]["filter"]["protocol"]
|
||||
.as_str()
|
||||
.map(|s| s.to_string()),
|
||||
schema: request["message"]["descriptor"]["filter"]["schema"]
|
||||
schema: message["descriptor"]["filter"]["schema"]
|
||||
.as_str()
|
||||
.map(|s| s.to_string()),
|
||||
author: request["message"]["descriptor"]["filter"]["author"]
|
||||
author: message["descriptor"]["filter"]["author"]
|
||||
.as_str()
|
||||
.map(|s| s.to_string()),
|
||||
date_from: request["message"]["descriptor"]["filter"]["dateFrom"]
|
||||
date_from: message["descriptor"]["filter"]["dateFrom"]
|
||||
.as_str()
|
||||
.map(|s| s.to_string()),
|
||||
date_to: request["message"]["descriptor"]["filter"]["dateTo"]
|
||||
date_to: message["descriptor"]["filter"]["dateTo"]
|
||||
.as_str()
|
||||
.map(|s| s.to_string()),
|
||||
limit: request["message"]["descriptor"]["filter"]["limit"]
|
||||
limit: message["descriptor"]["filter"]["limit"]
|
||||
.as_u64()
|
||||
.map(|n| n as usize),
|
||||
};
|
||||
@@ -719,7 +725,7 @@ impl ApiHandler {
|
||||
}
|
||||
}
|
||||
("Records", "Read") => {
|
||||
let record_id = request["message"]["descriptor"]["recordId"]
|
||||
let record_id = message["descriptor"]["recordId"]
|
||||
.as_str()
|
||||
.unwrap_or("");
|
||||
match store.read_message(record_id).await {
|
||||
@@ -729,7 +735,7 @@ impl ApiHandler {
|
||||
}
|
||||
}
|
||||
("Records", "Delete") => {
|
||||
let record_id = request["message"]["descriptor"]["recordId"]
|
||||
let record_id = message["descriptor"]["recordId"]
|
||||
.as_str()
|
||||
.unwrap_or("");
|
||||
match store.delete_message(record_id).await {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use super::RpcHandler;
|
||||
use crate::federation;
|
||||
use crate::network::dwn_store::{DwnStore, MessageQuery, ProtocolDefinition};
|
||||
use crate::network::dwn_sync;
|
||||
use crate::peers;
|
||||
use anyhow::Result;
|
||||
|
||||
impl RpcHandler {
|
||||
@@ -32,11 +32,11 @@ impl RpcHandler {
|
||||
|
||||
/// Trigger DWN sync with connected peers.
|
||||
pub(super) async fn handle_dwn_sync(&self) -> Result<serde_json::Value> {
|
||||
let peer_list = peers::load_peers(&self.config.data_dir).await?;
|
||||
let onions: Vec<String> = peer_list
|
||||
let nodes = federation::load_nodes(&self.config.data_dir).await?;
|
||||
let onions: Vec<String> = nodes
|
||||
.iter()
|
||||
.filter(|p| !p.onion.is_empty())
|
||||
.map(|p| p.onion.clone())
|
||||
.filter(|n| !n.onion.is_empty() && n.trust_level != federation::TrustLevel::Untrusted)
|
||||
.map(|n| n.onion.clone())
|
||||
.collect();
|
||||
|
||||
let state = dwn_sync::sync_with_peers(&self.config.data_dir, &onions).await?;
|
||||
|
||||
@@ -156,7 +156,7 @@ async fn sync_single_peer(
|
||||
local_messages: &[crate::network::dwn_store::DwnMessage],
|
||||
last_sync: &Option<String>,
|
||||
) -> Result<u64> {
|
||||
let base_url = format!("http://{}:5678", onion);
|
||||
let base_url = format!("http://{}", onion);
|
||||
let mut imported = 0u64;
|
||||
|
||||
// Step 1: Check peer health
|
||||
|
||||
@@ -522,7 +522,7 @@
|
||||
|
||||
### Sprint 45: DWN Multi-Node Sync (June 2026 Week 3-4)
|
||||
|
||||
- [ ] **DWN-SYNC-01** — Test DWN sync between federated nodes. On node A: register a protocol via `dwn.register-protocol` (e.g., `https://archipelago.dev/protocols/notes`), write 5 messages via `dwn.write-message`. On node B: add node A as a sync target (the DWN sync module uses the federation peer list), trigger `dwn.sync`. Verify all 5 messages appear on node B via `dwn.query-messages`. Write 3 messages on node B, trigger sync from node A — verify bidirectional replication. **Acceptance**: Messages replicate both ways between 2 nodes. Protocol definitions sync as well.
|
||||
- [x] **DWN-SYNC-01** — Test DWN sync between federated nodes. On node A: register a protocol via `dwn.register-protocol` (e.g., `https://archipelago.dev/protocols/notes`), write 5 messages via `dwn.write-message`. On node B: add node A as a sync target (the DWN sync module uses the federation peer list), trigger `dwn.sync`. Verify all 5 messages appear on node B via `dwn.query-messages`. Write 3 messages on node B, trigger sync from node A — verify bidirectional replication. **Acceptance**: Messages replicate both ways between 2 nodes. Protocol definitions sync as well.
|
||||
|
||||
- [ ] **DWN-SYNC-02** — Test DWN sync across all 4 nodes. Register the same protocol on all 4 nodes. Write unique messages on each node (node A writes 5, B writes 3, C writes 2, D writes 4 = 14 total). Trigger sync from each node. After sync completes, query all messages on each node — every node should have all 14 messages. If sync is missing messages: check the bidirectional replication logic in `dwn_sync.rs`, ensure Tor SOCKS proxy is used correctly, check for deduplication issues. **Acceptance**: All 4 nodes have all 14 messages after sync. Message content and metadata intact.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user