feat: streaming ecash payments + media playback overhaul

Cashu ecash protocol (BDHKE blind signatures, cashuA token format,
mint HTTP client) replacing the stub wallet. TollGate-inspired streaming
data payment system with step-based pricing (bytes/time/requests),
session management with incremental top-ups, usage metering, and
Nostr kind 10021 service advertisements.

13 new streaming.* RPC endpoints. Content server now verifies real
Cashu tokens. Profits tracking includes streaming revenue.

Frontend: GlobalAudioPlayer (persistent bottom bar across all pages),
video lightbox with full controls, audio in MediaLightbox, free file
previews (no blur), paid 10% audio/video previews, separated play
vs download buttons in PeerFiles.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dorian
2026-04-11 22:31:28 -04:00
parent 0995aa1033
commit 2c98bdd19d
25 changed files with 4740 additions and 480 deletions

View File

@@ -187,12 +187,29 @@ impl RpcHandler {
// Ecash wallet
"wallet.ecash-balance" => self.handle_wallet_ecash_balance().await,
"wallet.ecash-mint" => self.handle_wallet_ecash_mint(params).await,
"wallet.ecash-mint-claim" => self.handle_wallet_ecash_mint_claim(params).await,
"wallet.ecash-melt" => self.handle_wallet_ecash_melt(params).await,
"wallet.ecash-melt-confirm" => self.handle_wallet_ecash_melt_confirm(params).await,
"wallet.ecash-send" => self.handle_wallet_ecash_send(params).await,
"wallet.ecash-receive" => self.handle_wallet_ecash_receive(params).await,
"wallet.ecash-history" => self.handle_wallet_ecash_history().await,
"wallet.networking-profits" => self.handle_wallet_networking_profits().await,
// Streaming ecash payments
"streaming.list-services" => self.handle_streaming_list_services().await,
"streaming.configure-service" => self.handle_streaming_configure_service(params).await,
"streaming.toggle-service" => self.handle_streaming_toggle_service(params).await,
"streaming.pay" => self.handle_streaming_pay(params).await,
"streaming.discover" => self.handle_streaming_discover().await,
"streaming.usage" => self.handle_streaming_usage(params).await,
"streaming.session" => self.handle_streaming_session(params).await,
"streaming.list-sessions" => self.handle_streaming_list_sessions().await,
"streaming.close-session" => self.handle_streaming_close_session(params).await,
"streaming.advertise" => self.handle_streaming_advertise().await,
"streaming.list-mints" => self.handle_streaming_list_mints().await,
"streaming.configure-mints" => self.handle_streaming_configure_mints(params).await,
"streaming.maintenance" => self.handle_streaming_maintenance().await,
// Content catalog management
"content.list-mine" => self.handle_content_list_mine().await,
"content.add" => self.handle_content_add(params).await,
@@ -201,6 +218,8 @@ impl RpcHandler {
"content.set-availability" => self.handle_content_set_availability(params).await,
"content.browse-peer" => self.handle_content_browse_peer(params).await,
"content.download-peer" => self.handle_content_download_peer(params).await,
"content.download-peer-paid" => self.handle_content_download_peer_paid(params).await,
"content.preview-peer" => self.handle_content_preview_peer(params).await,
// DWN (Decentralized Web Node)
"dwn.status" => self.handle_dwn_status().await,

View File

@@ -26,6 +26,7 @@ mod response;
mod router;
mod seed_rpc;
mod security;
mod streaming;
mod tor;
mod transport;
mod totp;

View File

@@ -0,0 +1,411 @@
//! RPC handlers for streaming ecash payments.
//!
//! Endpoints for managing priced services, processing payments,
//! checking sessions/usage, and publishing service advertisements.
use super::RpcHandler;
use crate::streaming::{advertisement, gate, meter, pricing, session};
use crate::wallet::ecash;
use anyhow::Result;
impl RpcHandler {
// ── Service pricing management ──
/// List all configured streaming services and their pricing.
pub(super) async fn handle_streaming_list_services(&self) -> Result<serde_json::Value> {
let config = pricing::load_pricing(&self.config.data_dir).await?;
Ok(serde_json::json!({
"services": config.services,
}))
}
/// Configure pricing for a streaming service.
pub(super) async fn handle_streaming_configure_service(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let service_id = params
.get("service_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing service_id"))?;
let name = params
.get("name")
.and_then(|v| v.as_str())
.unwrap_or(service_id);
let metric_str = params
.get("metric")
.and_then(|v| v.as_str())
.unwrap_or("requests");
let step_size = params
.get("step_size")
.and_then(|v| v.as_u64())
.unwrap_or(1);
let price_per_step = params
.get("price_per_step")
.and_then(|v| v.as_u64())
.unwrap_or(1);
let min_steps = params
.get("min_steps")
.and_then(|v| v.as_u64())
.unwrap_or(0);
let enabled = params
.get("enabled")
.and_then(|v| v.as_bool())
.unwrap_or(true);
let description = params
.get("description")
.and_then(|v| v.as_str())
.unwrap_or("");
let metric = match metric_str {
"bytes" => pricing::Metric::Bytes,
"milliseconds" | "time" => pricing::Metric::Milliseconds,
"requests" => pricing::Metric::Requests,
_ => return Err(anyhow::anyhow!("Invalid metric: {}", metric_str)),
};
let accepted_mints: Vec<String> = params
.get("accepted_mints")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect()
})
.unwrap_or_default();
let service = pricing::ServicePricing {
service_id: service_id.to_string(),
name: name.to_string(),
metric,
step_size,
price_per_step,
min_steps,
enabled,
description: description.to_string(),
accepted_mints,
};
service.validate()?;
let mut config = pricing::load_pricing(&self.config.data_dir).await?;
// Update existing or add new
if let Some(existing) = config
.services
.iter_mut()
.find(|s| s.service_id == service_id)
{
*existing = service.clone();
} else {
config.services.push(service.clone());
}
pricing::save_pricing(&self.config.data_dir, &config).await?;
Ok(serde_json::json!({
"service": service,
"updated": true,
}))
}
/// Enable or disable a streaming service.
pub(super) async fn handle_streaming_toggle_service(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let service_id = params
.get("service_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing service_id"))?;
let enabled = params
.get("enabled")
.and_then(|v| v.as_bool())
.ok_or_else(|| anyhow::anyhow!("Missing enabled"))?;
let mut config = pricing::load_pricing(&self.config.data_dir).await?;
if let Some(service) = config
.services
.iter_mut()
.find(|s| s.service_id == service_id)
{
service.enabled = enabled;
pricing::save_pricing(&self.config.data_dir, &config).await?;
Ok(serde_json::json!({
"service_id": service_id,
"enabled": enabled,
}))
} else {
Err(anyhow::anyhow!("Service '{}' not found", service_id))
}
}
// ── Payment processing ──
/// Process a streaming payment — submit a Cashu token for a service.
/// Returns session details with allotment on success.
pub(super) async fn handle_streaming_pay(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let service_id = params
.get("service_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing service_id"))?;
let token = params
.get("token")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing token (cashuA token string)"))?;
let peer_id = params
.get("peer_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing peer_id"))?;
if token.is_empty() {
return Err(anyhow::anyhow!("Token cannot be empty"));
}
if peer_id.is_empty() {
return Err(anyhow::anyhow!("Peer ID cannot be empty"));
}
let result =
gate::check_gate(&self.config.data_dir, peer_id, service_id, Some(token), 0).await?;
match result {
gate::GateResult::PaidAndAllowed {
session_id,
allotment,
paid_sats,
} => Ok(serde_json::json!({
"status": "paid",
"session_id": session_id,
"allotment": allotment,
"paid_sats": paid_sats,
})),
gate::GateResult::InsufficientPayment {
provided_sats,
minimum_sats,
} => Ok(serde_json::json!({
"status": "insufficient",
"error": { "code": "insufficient_payment", "message": format!("Need {} sats, got {}", minimum_sats, provided_sats) },
"minimum_sats": minimum_sats,
"provided_sats": provided_sats,
})),
gate::GateResult::PaymentFailed { reason } => Ok(serde_json::json!({
"status": "failed",
"error": { "code": "payment_failed", "message": reason },
})),
gate::GateResult::ServiceUnavailable => {
Err(anyhow::anyhow!("Service '{}' not available", service_id))
}
_ => Err(anyhow::anyhow!("Unexpected gate result")),
}
}
/// Discover available streaming services (pricing info).
/// This is the unauthenticated discovery endpoint.
pub(super) async fn handle_streaming_discover(&self) -> Result<serde_json::Value> {
let config = pricing::load_pricing(&self.config.data_dir).await?;
let accepted_mints = ecash::load_accepted_mints(&self.config.data_dir).await?;
let services: Vec<serde_json::Value> = config
.services
.iter()
.filter(|s| s.enabled)
.map(|s| {
let mints = if s.accepted_mints.is_empty() {
&accepted_mints.mints
} else {
&s.accepted_mints
};
serde_json::json!({
"service_id": s.service_id,
"name": s.name,
"description": s.description,
"metric": s.metric,
"step_size": s.step_size,
"price_per_step": s.price_per_step,
"min_steps": s.min_steps,
"minimum_sats": s.minimum_payment(),
"accepted_mints": mints,
})
})
.collect();
Ok(serde_json::json!({
"services": services,
}))
}
// ── Session management ──
/// Check usage for a peer's active session.
pub(super) async fn handle_streaming_usage(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let peer_id = params
.get("peer_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing peer_id"))?;
let service_id = params
.get("service_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing service_id"))?;
match meter::get_peer_usage(&self.config.data_dir, peer_id, service_id).await? {
Some(usage) => Ok(serde_json::json!({ "usage": usage })),
None => Ok(serde_json::json!({
"usage": null,
"message": "No active session",
})),
}
}
/// Get details of a specific session by ID.
pub(super) async fn handle_streaming_session(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let session_id = params
.get("session_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing session_id"))?;
let store = session::load_sessions(&self.config.data_dir).await?;
match store.get(session_id) {
Some(s) => Ok(serde_json::json!({ "session": s })),
None => Err(anyhow::anyhow!("Session not found")),
}
}
/// List all active streaming sessions (admin view).
pub(super) async fn handle_streaming_list_sessions(&self) -> Result<serde_json::Value> {
let store = session::load_sessions(&self.config.data_dir).await?;
let active = store.active_sessions();
let revenue = store.total_revenue();
let by_service = store.revenue_by_service();
Ok(serde_json::json!({
"sessions": active,
"total_active": active.len(),
"total_revenue_sats": revenue,
"revenue_by_service": by_service,
}))
}
/// Close a specific session.
pub(super) async fn handle_streaming_close_session(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let session_id = params
.get("session_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing session_id"))?;
let mut store = session::load_sessions(&self.config.data_dir).await?;
if let Some(s) = store.get_mut(session_id) {
s.close();
session::save_sessions(&self.config.data_dir, &store).await?;
Ok(serde_json::json!({ "closed": true }))
} else {
Err(anyhow::anyhow!("Session not found"))
}
}
// ── Advertisement ──
/// Publish a streaming service advertisement to Nostr relays.
pub(super) async fn handle_streaming_advertise(&self) -> Result<serde_json::Value> {
let config = pricing::load_pricing(&self.config.data_dir).await?;
let accepted_mints = ecash::load_accepted_mints(&self.config.data_dir).await?;
let enabled_count = config.services.iter().filter(|s| s.enabled).count();
if enabled_count == 0 {
return Err(anyhow::anyhow!(
"No enabled services to advertise"
));
}
// Get node's onion address for the endpoint tag
let onion = crate::container::docker_packages::read_tor_address("archipelago").await;
let tags = advertisement::build_advertisement_tags(
&config,
&accepted_mints.mints,
onion.as_deref(),
);
let content = advertisement::build_advertisement_content(&config);
Ok(serde_json::json!({
"kind": advertisement::KIND_SERVICE_ADVERTISEMENT,
"content": content,
"tags": tags,
"services_count": enabled_count,
"ready_to_publish": true,
}))
}
// ── Accepted mints management ──
/// List accepted mints for streaming payments.
pub(super) async fn handle_streaming_list_mints(&self) -> Result<serde_json::Value> {
let mints = ecash::load_accepted_mints(&self.config.data_dir).await?;
Ok(serde_json::json!({ "mints": mints.mints }))
}
/// Add or remove accepted mints.
pub(super) async fn handle_streaming_configure_mints(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let mints = params
.get("mints")
.and_then(|v| v.as_array())
.ok_or_else(|| anyhow::anyhow!("Missing mints array"))?;
let mint_urls: Vec<String> = mints
.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
if mint_urls.is_empty() {
return Err(anyhow::anyhow!("Must have at least one accepted mint"));
}
// Basic validation
for url in &mint_urls {
if !url.starts_with("http://") && !url.starts_with("https://") {
return Err(anyhow::anyhow!("Invalid mint URL: {}", url));
}
}
let config = ecash::AcceptedMints {
mints: mint_urls.clone(),
};
ecash::save_accepted_mints(&self.config.data_dir, &config).await?;
Ok(serde_json::json!({
"mints": mint_urls,
"updated": true,
}))
}
// ── Maintenance ──
/// Run streaming maintenance (close expired sessions, prune old records).
pub(super) async fn handle_streaming_maintenance(&self) -> Result<serde_json::Value> {
let closed = meter::maintenance(&self.config.data_dir).await?;
Ok(serde_json::json!({
"expired_closed": closed,
}))
}
}

View File

@@ -9,7 +9,8 @@ impl RpcHandler {
let wallet = ecash::load_wallet(&self.config.data_dir).await?;
Ok(serde_json::json!({
"balance_sats": wallet.balance(),
"token_count": wallet.tokens.iter().filter(|t| !t.spent).count(),
"proof_count": wallet.proofs.iter().filter(|p| !p.spent && !p.reserved).count(),
"mint_url": wallet.mint_url,
}))
}
@@ -27,10 +28,36 @@ impl RpcHandler {
return Err(anyhow::anyhow!("Amount must be between 1 and 1,000,000 sats"));
}
let token = ecash::mint_tokens(&self.config.data_dir, amount_sats).await?;
// Step 1: Get a mint quote (returns Lightning invoice)
let quote = ecash::mint_quote(&self.config.data_dir, amount_sats).await?;
Ok(serde_json::json!({
"token_id": token.id,
"amount_sats": token.amount_sats,
"quote_id": quote.quote,
"bolt11": quote.request,
"state": quote.state,
"amount_sats": amount_sats,
"message": "Pay the Lightning invoice, then call wallet.ecash-mint-claim with the quote_id",
}))
}
/// Claim minted tokens after paying the Lightning invoice.
pub(super) async fn handle_wallet_ecash_mint_claim(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let quote_id = params
.get("quote_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing quote_id"))?;
let amount_sats = params
.get("amount_sats")
.and_then(|v| v.as_u64())
.ok_or_else(|| anyhow::anyhow!("Missing amount_sats"))?;
let minted = ecash::mint_tokens(&self.config.data_dir, quote_id, amount_sats).await?;
Ok(serde_json::json!({
"minted_sats": minted,
}))
}
@@ -39,14 +66,41 @@ impl RpcHandler {
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let token_id = params
.get("token_id")
let bolt11 = params
.get("bolt11")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing token_id"))?;
.ok_or_else(|| anyhow::anyhow!("Missing bolt11 (Lightning invoice)"))?;
// Step 1: Get melt quote
let quote = ecash::melt_quote(&self.config.data_dir, bolt11).await?;
let amount = ecash::melt_tokens(&self.config.data_dir, token_id).await?;
Ok(serde_json::json!({
"melted_sats": amount,
"quote_id": quote.quote,
"amount_sats": quote.amount,
"fee_reserve_sats": quote.fee_reserve,
"total_needed_sats": quote.amount + quote.fee_reserve,
"message": "Call wallet.ecash-melt-confirm with quote_id and bolt11 to execute",
}))
}
/// Confirm and execute a melt (pay Lightning invoice with ecash).
pub(super) async fn handle_wallet_ecash_melt_confirm(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let quote_id = params
.get("quote_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing quote_id"))?;
let bolt11 = params
.get("bolt11")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing bolt11"))?;
let melted = ecash::melt_tokens(&self.config.data_dir, quote_id, bolt11).await?;
Ok(serde_json::json!({
"melted_sats": melted,
}))
}
@@ -100,6 +154,7 @@ impl RpcHandler {
"total_sats": summary.total_sats,
"content_sales_sats": summary.content_sales_sats,
"routing_fees_sats": summary.routing_fees_sats,
"streaming_revenue_sats": summary.streaming_revenue_sats,
"recent": summary.recent,
}))
}

View File

@@ -309,23 +309,99 @@ pub async fn serve_content(
Ok(ServeResult::Ok(bytes, item.mime_type.clone()))
}
/// Verify a payment token covers the required amount.
/// Tokens are ecash strings that we validate and mark as spent.
async fn verify_payment_token(data_dir: &Path, token: &str, required_sats: u64) -> bool {
// Parse cashu token format to verify amount
if token.starts_with("cashuSend_") {
let amount = token
.split('_')
.nth(1)
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
if amount >= required_sats {
// Record the payment (receive the token into our wallet)
if let Ok(wallet_mod) = crate::wallet::ecash::receive_token(data_dir, token).await {
debug!("Payment verified: {} sats for {} required", wallet_mod, required_sats);
return true;
/// Result of attempting to serve a preview.
pub enum PreviewResult {
/// Full content (free/peers-only items — redirect to normal serve).
FullContent(Vec<u8>, String),
/// Blurred preview for paid image (full bytes, frontend applies blur).
BlurPreview(Vec<u8>, String),
/// Truncated preview for paid video (first ~2% of bytes).
TruncatedPreview(Vec<u8>, String, u64),
/// Content not found.
NotFound,
}
/// Serve a preview of content by ID. For paid content, returns degraded previews:
/// - Images: full file with X-Content-Preview: blur (frontend applies CSS blur)
/// - Videos: first 2% of file bytes (minimum 512KB for codec headers)
/// - Other: not available
/// For free/peers-only content, returns the full file.
pub async fn serve_content_preview(
data_dir: &Path,
id: &str,
) -> Result<PreviewResult> {
let catalog = load_catalog(data_dir).await?;
let item = match catalog.items.iter().find(|i| i.id == id) {
Some(i) => i,
None => return Ok(PreviewResult::NotFound),
};
// Check availability — don't preview hidden items
if matches!(item.availability, Availability::Nobody) {
return Ok(PreviewResult::NotFound);
}
let file_path = content_file_path(data_dir, item);
if !file_path.exists() {
return Ok(PreviewResult::NotFound);
}
match &item.access {
AccessControl::Paid { .. } => {
let mime = &item.mime_type;
if mime.starts_with("image/") {
// Serve full image — frontend applies CSS blur
let bytes = fs::read(&file_path).await.context("Failed to read preview file")?;
debug!("Serving blur preview for paid image '{}' ({} bytes)", id, bytes.len());
Ok(PreviewResult::BlurPreview(bytes, item.mime_type.clone()))
} else if mime.starts_with("video/") || mime.starts_with("audio/") {
// Serve first 10% of video/audio, minimum 512KB for codec headers
let metadata = fs::metadata(&file_path).await.context("Failed to read file metadata")?;
let total_size = metadata.len();
let preview_bytes = ((total_size * 10) / 100).max(512 * 1024).min(total_size);
use tokio::io::AsyncReadExt;
let mut file = tokio::fs::File::open(&file_path).await.context("Failed to open file")?;
let mut buf = vec![0u8; preview_bytes as usize];
file.read_exact(&mut buf).await.context("Failed to read preview bytes")?;
let kind = if mime.starts_with("video/") { "video" } else { "audio" };
debug!("Serving truncated preview for paid {} '{}' ({}/{} bytes)", kind, id, preview_bytes, total_size);
Ok(PreviewResult::TruncatedPreview(buf, item.mime_type.clone(), total_size))
} else {
// Non-media paid content — no preview available
Ok(PreviewResult::NotFound)
}
}
_ => {
// Free or peers-only — serve full content as preview
let bytes = fs::read(&file_path).await.context("Failed to read content file")?;
Ok(PreviewResult::FullContent(bytes, item.mime_type.clone()))
}
}
}
/// Verify a payment token covers the required amount.
/// Accepts both cashuA tokens (real Cashu) and legacy cashuSend_ format.
/// Swaps proofs at the mint to verify they're unspent before accepting.
async fn verify_payment_token(data_dir: &Path, token: &str, required_sats: u64) -> bool {
match crate::wallet::ecash::verify_and_receive_payment(data_dir, token, required_sats).await {
Ok(received) => {
debug!(
"Payment verified: {} sats received for {} required",
received, required_sats
);
// Record the content sale for profit tracking
if let Err(e) =
crate::wallet::profits::record_content_sale(data_dir, received, "Content download payment").await
{
debug!("Failed to record content sale profit (non-fatal): {}", e);
}
true
}
Err(e) => {
debug!("Payment verification failed: {}", e);
false
}
}
false
}

View File

@@ -36,6 +36,7 @@ mod server;
mod rate_limit;
mod session;
mod state;
mod streaming;
mod totp;
mod wallet;
mod names;

View File

@@ -0,0 +1,354 @@
//! Nostr service advertisements for streaming data payments.
//!
//! Publishes and parses kind 10021 replaceable events (TollGate TIP-01 compatible)
//! that advertise priced services on this node. Peers discover services by
//! querying Nostr relays for these events.
use super::pricing::{PricingConfig, ServicePricing};
use serde::{Deserialize, Serialize};
/// Nostr event kind for service advertisements (TollGate TIP-01).
pub const KIND_SERVICE_ADVERTISEMENT: u16 = 10021;
/// Nostr event kind for session proof (TollGate TIP-01).
pub const KIND_SESSION: u16 = 1022;
/// Nostr event kind for service notice.
pub const KIND_NOTICE: u16 = 21023;
/// A parsed service advertisement from a Nostr event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceAdvertisement {
/// Publisher's Nostr pubkey (hex).
pub pubkey: String,
/// Tor onion address or clearnet address for connecting.
#[serde(default)]
pub endpoint: String,
/// Advertised services with pricing.
pub services: Vec<AdvertisedService>,
/// Supported protocol versions/TIPs.
#[serde(default)]
pub supported_tips: Vec<String>,
/// Timestamp of the advertisement.
pub created_at: String,
}
/// A single service within an advertisement.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdvertisedService {
pub service_id: String,
pub name: String,
pub metric: String,
pub step_size: u64,
pub price_per_step: u64,
pub unit: String,
pub mint_urls: Vec<String>,
pub min_steps: u64,
pub description: String,
}
impl From<&ServicePricing> for AdvertisedService {
fn from(p: &ServicePricing) -> Self {
Self {
service_id: p.service_id.clone(),
name: p.name.clone(),
metric: p.metric.to_string(),
step_size: p.step_size,
price_per_step: p.price_per_step,
unit: "sat".to_string(),
mint_urls: p.accepted_mints.clone(),
min_steps: p.min_steps,
description: p.description.clone(),
}
}
}
/// Build Nostr event tags for a service advertisement (TIP-01/TIP-02 format).
///
/// Returns a vector of tag arrays suitable for inclusion in a Nostr event.
pub fn build_advertisement_tags(
config: &PricingConfig,
accepted_mints: &[String],
onion_address: Option<&str>,
) -> Vec<Vec<String>> {
let mut tags: Vec<Vec<String>> = Vec::new();
// Endpoint tag (if we have a Tor address)
if let Some(onion) = onion_address {
tags.push(vec!["endpoint".to_string(), onion.to_string()]);
}
// Supported TIPs
tags.push(vec![
"tips".to_string(),
"TIP-01".to_string(),
"TIP-02".to_string(),
]);
// One set of tags per enabled service
for service in config.services.iter().filter(|s| s.enabled) {
tags.push(vec![
"service".to_string(),
service.service_id.clone(),
service.name.clone(),
]);
tags.push(vec![
"metric".to_string(),
service.service_id.clone(),
service.metric.to_string(),
]);
tags.push(vec![
"step_size".to_string(),
service.service_id.clone(),
service.step_size.to_string(),
]);
// Price tags — one per accepted mint (TIP-02 format)
let mints = if service.accepted_mints.is_empty() {
accepted_mints.to_vec()
} else {
service.accepted_mints.clone()
};
for mint_url in &mints {
tags.push(vec![
"price_per_step".to_string(),
service.service_id.clone(),
"cashu".to_string(),
service.price_per_step.to_string(),
"sat".to_string(),
mint_url.clone(),
service.min_steps.to_string(),
]);
}
}
tags
}
/// Build the content string for a kind 10021 advertisement event.
pub fn build_advertisement_content(config: &PricingConfig) -> String {
let enabled: Vec<_> = config.services.iter().filter(|s| s.enabled).collect();
if enabled.is_empty() {
return "No streaming services available".to_string();
}
let mut lines = vec!["Streaming data services available:".to_string()];
for service in &enabled {
lines.push(format!(
"- {} ({}: {} sats per {} {})",
service.name,
service.service_id,
service.price_per_step,
service.step_size,
service.metric,
));
}
lines.join("\n")
}
/// Parse a Nostr event's tags into a ServiceAdvertisement.
pub fn parse_advertisement_tags(
pubkey: &str,
tags: &[Vec<String>],
created_at: &str,
) -> ServiceAdvertisement {
let mut ad = ServiceAdvertisement {
pubkey: pubkey.to_string(),
endpoint: String::new(),
services: Vec::new(),
supported_tips: Vec::new(),
created_at: created_at.to_string(),
};
// Collect service IDs first
let mut service_ids: Vec<String> = Vec::new();
let mut service_names: std::collections::HashMap<String, String> =
std::collections::HashMap::new();
let mut service_metrics: std::collections::HashMap<String, String> =
std::collections::HashMap::new();
let mut service_step_sizes: std::collections::HashMap<String, u64> =
std::collections::HashMap::new();
let mut service_prices: std::collections::HashMap<String, u64> =
std::collections::HashMap::new();
let mut service_mints: std::collections::HashMap<String, Vec<String>> =
std::collections::HashMap::new();
let mut service_min_steps: std::collections::HashMap<String, u64> =
std::collections::HashMap::new();
for tag in tags {
if tag.is_empty() {
continue;
}
match tag[0].as_str() {
"endpoint" if tag.len() >= 2 => {
ad.endpoint = tag[1].clone();
}
"tips" => {
ad.supported_tips = tag[1..].to_vec();
}
"service" if tag.len() >= 3 => {
let sid = tag[1].clone();
service_names.insert(sid.clone(), tag[2].clone());
if !service_ids.contains(&sid) {
service_ids.push(sid);
}
}
"metric" if tag.len() >= 3 => {
service_metrics.insert(tag[1].clone(), tag[2].clone());
}
"step_size" if tag.len() >= 3 => {
if let Ok(v) = tag[2].parse() {
service_step_sizes.insert(tag[1].clone(), v);
}
}
"price_per_step" if tag.len() >= 5 => {
// ["price_per_step", service_id, "cashu", price, "sat", mint_url, min_steps]
let sid = tag[1].clone();
if let Ok(price) = tag[3].parse::<u64>() {
service_prices.insert(sid.clone(), price);
}
if tag.len() >= 6 {
service_mints
.entry(sid.clone())
.or_default()
.push(tag[5].clone());
}
if tag.len() >= 7 {
if let Ok(ms) = tag[6].parse::<u64>() {
service_min_steps.insert(sid, ms);
}
}
}
_ => {}
}
}
// Assemble services
for sid in &service_ids {
ad.services.push(AdvertisedService {
service_id: sid.clone(),
name: service_names.get(sid).cloned().unwrap_or_default(),
metric: service_metrics.get(sid).cloned().unwrap_or_default(),
step_size: service_step_sizes.get(sid).copied().unwrap_or(0),
price_per_step: service_prices.get(sid).copied().unwrap_or(0),
unit: "sat".to_string(),
mint_urls: service_mints.get(sid).cloned().unwrap_or_default(),
min_steps: service_min_steps.get(sid).copied().unwrap_or(0),
description: String::new(),
});
}
ad
}
/// Build a kind 1022 session event content (proof of access grant).
pub fn build_session_event_content(
session_id: &str,
peer_pubkey: &str,
service_id: &str,
allotment: u64,
metric: &str,
paid_sats: u64,
) -> serde_json::Value {
serde_json::json!({
"session_id": session_id,
"customer": peer_pubkey,
"service": service_id,
"allotment": allotment,
"metric": metric,
"paid_sats": paid_sats,
"granted_at": chrono::Utc::now().to_rfc3339(),
})
}
#[cfg(test)]
mod tests {
use super::*;
fn test_config() -> PricingConfig {
PricingConfig {
services: vec![
ServicePricing {
service_id: "content-download".into(),
name: "Content Downloads".into(),
metric: Metric::Bytes,
step_size: 1_048_576,
price_per_step: 1,
min_steps: 0,
enabled: true,
description: "test".into(),
accepted_mints: vec![],
},
ServicePricing {
service_id: "disabled-service".into(),
name: "Disabled".into(),
metric: Metric::Requests,
step_size: 1,
price_per_step: 1,
min_steps: 0,
enabled: false,
description: "disabled".into(),
accepted_mints: vec![],
},
],
}
}
#[test]
fn test_build_advertisement_tags() {
let config = test_config();
let mints = vec!["http://mint.example.com".to_string()];
let tags = build_advertisement_tags(&config, &mints, Some("abc123.onion"));
// Should have endpoint, tips, service, metric, step_size, price_per_step
assert!(tags.iter().any(|t| t[0] == "endpoint"));
assert!(tags.iter().any(|t| t[0] == "tips"));
assert!(tags.iter().any(|t| t[0] == "service" && t[1] == "content-download"));
assert!(tags.iter().any(|t| t[0] == "metric" && t[1] == "content-download"));
assert!(tags.iter().any(|t| t[0] == "price_per_step" && t[1] == "content-download"));
// Disabled service should NOT appear
assert!(!tags.iter().any(|t| t.len() > 1 && t[1] == "disabled-service"));
}
#[test]
fn test_build_advertisement_content() {
let config = test_config();
let content = build_advertisement_content(&config);
assert!(content.contains("Content Downloads"));
assert!(!content.contains("Disabled"));
}
#[test]
fn test_parse_advertisement_tags_roundtrip() {
let config = test_config();
let mints = vec!["http://mint.example.com".to_string()];
let tags = build_advertisement_tags(&config, &mints, Some("abc123.onion"));
let ad = parse_advertisement_tags("deadbeef", &tags, "2025-01-01T00:00:00Z");
assert_eq!(ad.pubkey, "deadbeef");
assert_eq!(ad.endpoint, "abc123.onion");
assert_eq!(ad.services.len(), 1);
assert_eq!(ad.services[0].service_id, "content-download");
assert_eq!(ad.services[0].price_per_step, 1);
assert_eq!(ad.services[0].step_size, 1_048_576);
assert_eq!(ad.services[0].metric, "bytes");
}
#[test]
fn test_build_session_event_content() {
let content = build_session_event_content(
"session-123",
"peer-pubkey",
"content-download",
10_485_760,
"bytes",
10,
);
assert_eq!(content["session_id"], "session-123");
assert_eq!(content["paid_sats"], 10);
}
}

View File

@@ -0,0 +1,281 @@
//! Streaming access gate — controls access to metered services.
//!
//! The gate sits between incoming requests and the resource being served.
//! It checks for active sessions, verifies/receives payments, and
//! records usage against allotments.
use super::meter::{self, MeterDecision};
use super::pricing::{self, ServicePricing};
use super::session::{self};
use crate::wallet::ecash;
use anyhow::Result;
use std::path::Path;
use tracing::{debug, warn};
/// Result of a gate check.
#[derive(Debug)]
pub enum GateResult {
/// Access granted — session is active with sufficient allotment.
Allowed {
session_id: String,
remaining: u64,
},
/// Access granted after accepting payment — new or topped-up session.
PaidAndAllowed {
session_id: String,
allotment: u64,
paid_sats: u64,
},
/// Payment required — no active session and no payment token provided.
PaymentRequired {
service_id: String,
minimum_sats: u64,
pricing: PricingInfo,
},
/// Payment insufficient — token was provided but doesn't meet minimum.
InsufficientPayment {
provided_sats: u64,
minimum_sats: u64,
},
/// Payment failed — token was invalid or couldn't be verified at mint.
PaymentFailed {
reason: String,
},
/// Service not found or not enabled.
ServiceUnavailable,
}
/// Pricing information for the payment-required response.
#[derive(Debug, Clone, serde::Serialize)]
pub struct PricingInfo {
pub metric: String,
pub step_size: u64,
pub price_per_step: u64,
pub min_steps: u64,
pub accepted_mints: Vec<String>,
}
impl From<&ServicePricing> for PricingInfo {
fn from(p: &ServicePricing) -> Self {
Self {
metric: p.metric.to_string(),
step_size: p.step_size,
price_per_step: p.price_per_step,
min_steps: p.min_steps,
accepted_mints: p.accepted_mints.clone(),
}
}
}
/// Check the gate for a streaming service request.
///
/// If `payment_token` is provided (cashuA string), it will be verified and
/// accepted to create or top up a session. If no token is provided, checks
/// for an existing active session.
///
/// `usage_cost` is the cost of the current request in the service's metric units
/// (e.g., bytes for download, 1 for a single API request).
pub async fn check_gate(
data_dir: &Path,
peer_id: &str,
service_id: &str,
payment_token: Option<&str>,
usage_cost: u64,
) -> Result<GateResult> {
// Load pricing config
let config = pricing::load_pricing(data_dir).await?;
let service = match config.get_active_service(service_id) {
Some(s) => s,
None => return Ok(GateResult::ServiceUnavailable),
};
// If payment token provided, process it first
if let Some(token_str) = payment_token {
return process_payment(data_dir, peer_id, service, token_str, usage_cost).await;
}
// No payment — check for existing session
let decision = meter::check_access(data_dir, peer_id, service_id, usage_cost).await?;
match decision {
MeterDecision::Allow {
session_id,
remaining,
} => {
// Record usage
let _ = meter::record_and_check(data_dir, peer_id, service_id, usage_cost).await?;
Ok(GateResult::Allowed {
session_id,
remaining: remaining.saturating_sub(usage_cost),
})
}
MeterDecision::Exhausted { .. } | MeterDecision::NoSession => {
let accepted_mints = if service.accepted_mints.is_empty() {
let wallet_mints = ecash::load_accepted_mints(data_dir).await?;
wallet_mints.mints
} else {
service.accepted_mints.clone()
};
let mut pricing_info = PricingInfo::from(service);
pricing_info.accepted_mints = accepted_mints;
Ok(GateResult::PaymentRequired {
service_id: service_id.to_string(),
minimum_sats: service.minimum_payment(),
pricing: pricing_info,
})
}
MeterDecision::NotMetered => Ok(GateResult::Allowed {
session_id: String::new(),
remaining: u64::MAX,
}),
}
}
/// Process a payment token and create/topup a session.
async fn process_payment(
data_dir: &Path,
peer_id: &str,
service: &ServicePricing,
token_str: &str,
usage_cost: u64,
) -> Result<GateResult> {
let minimum = service.minimum_payment();
// Verify and receive the payment
let received_sats = match ecash::verify_and_receive_payment(data_dir, token_str, minimum).await
{
Ok(amount) => amount,
Err(e) => {
let err_str = e.to_string();
if err_str.contains("Insufficient payment") {
// Try to parse what was provided
let provided = extract_token_amount(token_str);
return Ok(GateResult::InsufficientPayment {
provided_sats: provided,
minimum_sats: minimum,
});
}
warn!("Payment verification failed for peer {}: {}", peer_id, e);
return Ok(GateResult::PaymentFailed {
reason: err_str,
});
}
};
// Create or top-up session
let mut store = session::load_sessions(data_dir).await?;
let session = store.create_or_topup(peer_id, &service.service_id, service, received_sats);
let session_id = session.id.clone();
let allotment = session.allotment;
// Record initial usage if applicable
if usage_cost > 0 {
if let Some(s) = store.get_mut(&session_id) {
s.record_usage(usage_cost);
}
}
session::save_sessions(data_dir, &store).await?;
// Record the streaming revenue
let mut wallet = ecash::load_wallet(data_dir).await?;
wallet.record_tx(
ecash::TransactionType::StreamingRevenue,
received_sats,
&format!(
"Streaming payment: {} sats for {} from {}",
received_sats, service.service_id, peer_id
),
&wallet.mint_url.clone(),
peer_id,
);
ecash::save_wallet(data_dir, &wallet).await?;
debug!(
"Gate: accepted {} sats from {} for {}, allotment={}",
received_sats, peer_id, service.service_id, allotment
);
Ok(GateResult::PaidAndAllowed {
session_id,
allotment,
paid_sats: received_sats,
})
}
/// Try to extract the total amount from a token string (best-effort for error messages).
fn extract_token_amount(token_str: &str) -> u64 {
// Try cashuA format
if let Ok(token) = super::super::wallet::cashu::CashuToken::deserialize(token_str) {
return token.total_amount();
}
// Try legacy format
if token_str.starts_with("cashuSend_") {
return token_str
.split('_')
.nth(1)
.and_then(|s| s.parse().ok())
.unwrap_or(0);
}
0
}
/// Quick check: does a peer have an active session for a service?
/// Lighter weight than check_gate — doesn't record usage or process payments.
pub async fn has_active_session(
data_dir: &Path,
peer_id: &str,
service_id: &str,
) -> Result<bool> {
let store = session::load_sessions(data_dir).await?;
Ok(store.find_active(peer_id, service_id).is_some())
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[tokio::test]
async fn test_gate_service_unavailable() {
let tmp = TempDir::new().unwrap();
let result = check_gate(tmp.path(), "peer1", "nonexistent", None, 1)
.await
.unwrap();
assert!(matches!(result, GateResult::ServiceUnavailable));
}
#[tokio::test]
async fn test_gate_payment_required_default_services() {
let tmp = TempDir::new().unwrap();
// Enable a service first
let mut config = pricing::load_pricing(tmp.path()).await.unwrap();
config.services[0].enabled = true; // content-download
pricing::save_pricing(tmp.path(), &config).await.unwrap();
let result = check_gate(tmp.path(), "peer1", "content-download", None, 1024)
.await
.unwrap();
match result {
GateResult::PaymentRequired {
minimum_sats,
pricing,
..
} => {
assert_eq!(pricing.metric, "bytes");
assert!(minimum_sats > 0);
}
other => panic!("Expected PaymentRequired, got {:?}", other),
}
}
#[tokio::test]
async fn test_has_active_session_false() {
let tmp = TempDir::new().unwrap();
assert!(!has_active_session(tmp.path(), "peer1", "test").await.unwrap());
}
}

View File

@@ -0,0 +1,235 @@
//! Usage metering engine for streaming data payments.
//!
//! Tracks resource consumption (bytes, time, requests) per session
//! and enforces allotment limits.
use super::pricing::Metric;
use super::session::{self, StreamingSession};
use anyhow::Result;
use std::path::Path;
use tracing::debug;
/// Result of checking whether a request should be allowed.
#[derive(Debug)]
pub enum MeterDecision {
/// Request allowed — session has sufficient allotment.
Allow {
session_id: String,
remaining: u64,
},
/// Request denied — session exhausted or expired.
Exhausted {
session_id: String,
},
/// No active session found for this peer+service.
NoSession,
/// Service is not configured for metering (free access).
NotMetered,
}
/// Record usage for a peer's session and check if they can continue.
pub async fn record_and_check(
data_dir: &Path,
peer_id: &str,
service_id: &str,
usage_amount: u64,
) -> Result<MeterDecision> {
let mut store = session::load_sessions(data_dir).await?;
let decision = match store.find_active_mut(peer_id, service_id) {
Some(session) => {
let still_active = session.record_usage(usage_amount);
let session_id = session.id.clone();
let remaining = session.remaining();
if still_active {
debug!(
"Meter: peer={} service={} used={} remaining={}",
peer_id, service_id, usage_amount, remaining
);
MeterDecision::Allow {
session_id,
remaining,
}
} else {
debug!(
"Meter: peer={} service={} exhausted (used={})",
peer_id, service_id, session.used
);
session.close();
MeterDecision::Exhausted { session_id }
}
}
None => MeterDecision::NoSession,
};
session::save_sessions(data_dir, &store).await?;
Ok(decision)
}
/// Check if a peer has an active session for a service without recording usage.
pub async fn check_access(
data_dir: &Path,
peer_id: &str,
service_id: &str,
required_amount: u64,
) -> Result<MeterDecision> {
let store = session::load_sessions(data_dir).await?;
match store.find_active(peer_id, service_id) {
Some(session) => {
if session.can_serve(required_amount) {
Ok(MeterDecision::Allow {
session_id: session.id.clone(),
remaining: session.remaining(),
})
} else {
Ok(MeterDecision::Exhausted {
session_id: session.id.clone(),
})
}
}
None => Ok(MeterDecision::NoSession),
}
}
/// Get usage summary for a session.
pub async fn get_usage(data_dir: &Path, session_id: &str) -> Result<Option<UsageSummary>> {
let store = session::load_sessions(data_dir).await?;
Ok(store.get(session_id).map(|s| UsageSummary::from_session(s)))
}
/// Get usage summary for a peer's active session on a service.
pub async fn get_peer_usage(
data_dir: &Path,
peer_id: &str,
service_id: &str,
) -> Result<Option<UsageSummary>> {
let store = session::load_sessions(data_dir).await?;
Ok(store
.find_active(peer_id, service_id)
.map(|s| UsageSummary::from_session(s)))
}
/// Usage summary for display.
#[derive(Debug, Clone, serde::Serialize)]
pub struct UsageSummary {
pub session_id: String,
pub metric: Metric,
pub allotment: u64,
pub used: u64,
pub remaining: u64,
pub paid_sats: u64,
pub active: bool,
/// Human-readable usage string (e.g., "5.2 MB / 10 MB").
pub display: String,
}
impl UsageSummary {
pub fn from_session(session: &StreamingSession) -> Self {
let remaining = session.remaining();
let display = format_usage(session.metric, session.used, session.allotment);
Self {
session_id: session.id.clone(),
metric: session.metric,
allotment: session.allotment,
used: session.used,
remaining,
paid_sats: session.paid_sats,
active: session.active && !session.is_expired(),
display,
}
}
}
/// Format usage as a human-readable string.
fn format_usage(metric: Metric, used: u64, allotment: u64) -> String {
match metric {
Metric::Bytes => {
format!("{} / {}", format_bytes(used), format_bytes(allotment))
}
Metric::Milliseconds => {
format!(
"{} / {}",
format_duration_ms(used),
format_duration_ms(allotment)
)
}
Metric::Requests => {
format!("{} / {} requests", used, allotment)
}
}
}
/// Format bytes as human-readable (KB, MB, GB).
fn format_bytes(bytes: u64) -> String {
if bytes < 1024 {
format!("{} B", bytes)
} else if bytes < 1_048_576 {
format!("{:.1} KB", bytes as f64 / 1024.0)
} else if bytes < 1_073_741_824 {
format!("{:.1} MB", bytes as f64 / 1_048_576.0)
} else {
format!("{:.2} GB", bytes as f64 / 1_073_741_824.0)
}
}
/// Format milliseconds as human-readable duration.
fn format_duration_ms(ms: u64) -> String {
if ms < 1000 {
format!("{}ms", ms)
} else if ms < 60_000 {
format!("{:.1}s", ms as f64 / 1000.0)
} else if ms < 3_600_000 {
format!("{:.1}m", ms as f64 / 60_000.0)
} else {
format!("{:.1}h", ms as f64 / 3_600_000.0)
}
}
/// Run periodic maintenance: close expired sessions, prune old records.
pub async fn maintenance(data_dir: &Path) -> Result<usize> {
let mut store = session::load_sessions(data_dir).await?;
let closed = store.close_expired();
store.prune_old();
session::save_sessions(data_dir, &store).await?;
if closed > 0 {
debug!("Meter maintenance: closed {} expired sessions", closed);
}
Ok(closed)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_format_bytes() {
assert_eq!(format_bytes(500), "500 B");
assert_eq!(format_bytes(1536), "1.5 KB");
assert_eq!(format_bytes(5_242_880), "5.0 MB");
assert_eq!(format_bytes(1_610_612_736), "1.50 GB");
}
#[test]
fn test_format_duration_ms() {
assert_eq!(format_duration_ms(500), "500ms");
assert_eq!(format_duration_ms(2500), "2.5s");
assert_eq!(format_duration_ms(90_000), "1.5m");
assert_eq!(format_duration_ms(5_400_000), "1.5h");
}
#[test]
fn test_format_usage_bytes() {
let display = format_usage(Metric::Bytes, 5_242_880, 10_485_760);
assert_eq!(display, "5.0 MB / 10.0 MB");
}
#[test]
fn test_format_usage_requests() {
let display = format_usage(Metric::Requests, 3, 10);
assert_eq!(display, "3 / 10 requests");
}
}

View File

@@ -0,0 +1,39 @@
//! Streaming ecash payments for metered data access.
//!
//! Implements a TollGate-inspired protocol for paying for streaming data
//! using Cashu ecash micropayments. Supports three metering models:
//!
//! - **Bytes**: Pay per MB downloaded (content, federation sync)
//! - **Time**: Pay per minute of access (relay, API endpoints)
//! - **Requests**: Pay per API call
//!
//! # Architecture
//!
//! ```text
//! Paying Node Selling Node
//! ┌──────────────┐ ┌──────────────────────┐
//! │ Cashu Wallet │──cashuA token──────▶│ Gate │
//! │ (real BDHKE) │ │ verify + receive │
//! └──────────────┘ └──────┬───────────────┘
//! │ create/topup session
//! ┌──────▼───────────────┐
//! │ Meter │
//! │ track usage │
//! └──────┬───────────────┘
//! │ enforce allotment
//! ┌──────▼───────────────┐
//! │ Service │
//! │ content / sync / api │
//! └──────────────────────┘
//! ```
//!
//! # Discovery
//!
//! Services are advertised via Nostr kind 10021 events (TollGate TIP-01
//! compatible) containing pricing tags per TIP-02.
pub mod advertisement;
pub mod gate;
pub mod meter;
pub mod pricing;
pub mod session;

View File

@@ -0,0 +1,362 @@
//! Streaming data pricing configuration.
//!
//! Follows TollGate TIP-02 pricing model:
//! - step_size: granularity of purchase (bytes, milliseconds, or request count)
//! - price_per_step: cost in sats for one step
//! - min_steps: minimum purchase requirement
//! - metric: what is being metered (bytes, time, requests)
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::path::Path;
use tokio::fs;
const PRICING_FILE: &str = "streaming/pricing.json";
/// What resource is being metered.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Metric {
/// Bytes transferred.
Bytes,
/// Time in milliseconds.
Milliseconds,
/// Number of API requests.
Requests,
}
impl std::fmt::Display for Metric {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Metric::Bytes => write!(f, "bytes"),
Metric::Milliseconds => write!(f, "milliseconds"),
Metric::Requests => write!(f, "requests"),
}
}
}
/// Pricing configuration for a specific service.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServicePricing {
/// Unique service identifier.
pub service_id: String,
/// Human-readable service name.
pub name: String,
/// What is being metered.
pub metric: Metric,
/// Size of one step in the metric's unit.
/// e.g., 1_048_576 for 1MB steps, 60_000 for 1-minute steps, 1 for per-request.
pub step_size: u64,
/// Price in sats for one step.
pub price_per_step: u64,
/// Minimum number of steps per purchase (0 = no minimum).
#[serde(default)]
pub min_steps: u64,
/// Whether this service is currently active/accepting payments.
#[serde(default = "default_true")]
pub enabled: bool,
/// Description of what this service provides.
#[serde(default)]
pub description: String,
/// Accepted mint URLs (empty = use wallet defaults).
#[serde(default)]
pub accepted_mints: Vec<String>,
}
fn default_true() -> bool {
true
}
impl ServicePricing {
/// Calculate allotment (in metric units) for a given payment amount.
pub fn calculate_allotment(&self, paid_sats: u64) -> u64 {
if self.price_per_step == 0 {
return 0;
}
let steps = paid_sats / self.price_per_step;
steps * self.step_size
}
/// Calculate the minimum payment required.
pub fn minimum_payment(&self) -> u64 {
if self.min_steps == 0 {
self.price_per_step // At least one step
} else {
self.min_steps * self.price_per_step
}
}
/// Calculate how many sats are needed for a specific allotment.
pub fn cost_for_allotment(&self, allotment: u64) -> u64 {
if self.step_size == 0 {
return 0;
}
let steps = (allotment + self.step_size - 1) / self.step_size; // ceiling division
steps * self.price_per_step
}
/// Validate that this pricing config is sensible.
pub fn validate(&self) -> Result<()> {
if self.service_id.is_empty() {
anyhow::bail!("Service ID cannot be empty");
}
if self.step_size == 0 {
anyhow::bail!("Step size must be > 0");
}
if self.price_per_step == 0 {
anyhow::bail!("Price per step must be > 0");
}
if self.name.is_empty() {
anyhow::bail!("Service name cannot be empty");
}
Ok(())
}
}
/// All pricing configurations for this node.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PricingConfig {
pub services: Vec<ServicePricing>,
}
impl PricingConfig {
/// Find pricing for a service by ID.
pub fn get_service(&self, service_id: &str) -> Option<&ServicePricing> {
self.services.iter().find(|s| s.service_id == service_id)
}
/// Find enabled pricing for a service by ID.
pub fn get_active_service(&self, service_id: &str) -> Option<&ServicePricing> {
self.services
.iter()
.find(|s| s.service_id == service_id && s.enabled)
}
}
/// Load pricing config from disk.
pub async fn load_pricing(data_dir: &Path) -> Result<PricingConfig> {
let path = data_dir.join(PRICING_FILE);
if !path.exists() {
return Ok(default_pricing());
}
let content = fs::read_to_string(&path)
.await
.context("Failed to read pricing config")?;
let config: PricingConfig = serde_json::from_str(&content).unwrap_or_else(|_| default_pricing());
Ok(config)
}
/// Save pricing config to disk.
pub async fn save_pricing(data_dir: &Path, config: &PricingConfig) -> Result<()> {
let dir = data_dir.join("streaming");
fs::create_dir_all(&dir)
.await
.context("Failed to create streaming dir")?;
let path = data_dir.join(PRICING_FILE);
let content =
serde_json::to_string_pretty(config).context("Failed to serialize pricing config")?;
fs::write(&path, content)
.await
.context("Failed to write pricing config")?;
Ok(())
}
/// Default pricing config with common services pre-configured (disabled).
fn default_pricing() -> PricingConfig {
PricingConfig {
services: vec![
ServicePricing {
service_id: "content-download".to_string(),
name: "Content Downloads".to_string(),
metric: Metric::Bytes,
step_size: 1_048_576, // 1 MB
price_per_step: 1, // 1 sat per MB
min_steps: 0,
enabled: false,
description: "Pay-per-byte content downloads from this node".to_string(),
accepted_mints: vec![],
},
ServicePricing {
service_id: "federation-sync".to_string(),
name: "Federation Sync Access".to_string(),
metric: Metric::Milliseconds,
step_size: 60_000, // 1 minute
price_per_step: 1, // 1 sat per minute
min_steps: 5, // 5 minute minimum
enabled: false,
description: "Timed access to federation sync endpoint".to_string(),
accepted_mints: vec![],
},
ServicePricing {
service_id: "api-access".to_string(),
name: "API Access".to_string(),
metric: Metric::Requests,
step_size: 1, // Per request
price_per_step: 1, // 1 sat per request
min_steps: 10, // 10 request minimum
enabled: false,
description: "Per-request API access for external consumers".to_string(),
accepted_mints: vec![],
},
ServicePricing {
service_id: "nostr-relay".to_string(),
name: "Nostr Relay Access".to_string(),
metric: Metric::Milliseconds,
step_size: 3_600_000, // 1 hour
price_per_step: 10, // 10 sats per hour
min_steps: 1,
enabled: false,
description: "Timed access to the local Nostr relay".to_string(),
accepted_mints: vec![],
},
],
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn test_calculate_allotment_bytes() {
let pricing = ServicePricing {
service_id: "test".into(),
name: "Test".into(),
metric: Metric::Bytes,
step_size: 1_048_576, // 1 MB
price_per_step: 1,
min_steps: 0,
enabled: true,
description: String::new(),
accepted_mints: vec![],
};
// 10 sats = 10 MB
assert_eq!(pricing.calculate_allotment(10), 10_485_760);
}
#[test]
fn test_calculate_allotment_time() {
let pricing = ServicePricing {
service_id: "test".into(),
name: "Test".into(),
metric: Metric::Milliseconds,
step_size: 60_000, // 1 minute
price_per_step: 2,
min_steps: 0,
enabled: true,
description: String::new(),
accepted_mints: vec![],
};
// 10 sats at 2 sats/min = 5 minutes = 300,000 ms
assert_eq!(pricing.calculate_allotment(10), 300_000);
}
#[test]
fn test_minimum_payment() {
let pricing = ServicePricing {
service_id: "test".into(),
name: "Test".into(),
metric: Metric::Requests,
step_size: 1,
price_per_step: 1,
min_steps: 10,
enabled: true,
description: String::new(),
accepted_mints: vec![],
};
assert_eq!(pricing.minimum_payment(), 10);
}
#[test]
fn test_cost_for_allotment() {
let pricing = ServicePricing {
service_id: "test".into(),
name: "Test".into(),
metric: Metric::Bytes,
step_size: 1_048_576,
price_per_step: 1,
min_steps: 0,
enabled: true,
description: String::new(),
accepted_mints: vec![],
};
// 5 MB costs 5 sats
assert_eq!(pricing.cost_for_allotment(5_242_880), 5);
// 1.5 MB rounds up to 2 sats
assert_eq!(pricing.cost_for_allotment(1_572_864), 2);
}
#[test]
fn test_validate_pricing() {
let good = ServicePricing {
service_id: "test".into(),
name: "Test".into(),
metric: Metric::Bytes,
step_size: 1024,
price_per_step: 1,
min_steps: 0,
enabled: true,
description: String::new(),
accepted_mints: vec![],
};
assert!(good.validate().is_ok());
let bad_step = ServicePricing { step_size: 0, ..good.clone() };
assert!(bad_step.validate().is_err());
let bad_price = ServicePricing { price_per_step: 0, ..good.clone() };
assert!(bad_price.validate().is_err());
}
#[tokio::test]
async fn test_load_default_pricing() {
let tmp = TempDir::new().unwrap();
let config = load_pricing(tmp.path()).await.unwrap();
assert_eq!(config.services.len(), 4);
// All disabled by default
assert!(config.services.iter().all(|s| !s.enabled));
}
#[tokio::test]
async fn test_save_load_roundtrip() {
let tmp = TempDir::new().unwrap();
let config = PricingConfig {
services: vec![ServicePricing {
service_id: "custom".into(),
name: "Custom Service".into(),
metric: Metric::Requests,
step_size: 1,
price_per_step: 5,
min_steps: 1,
enabled: true,
description: "custom".into(),
accepted_mints: vec!["http://mint".into()],
}],
};
save_pricing(tmp.path(), &config).await.unwrap();
let loaded = load_pricing(tmp.path()).await.unwrap();
assert_eq!(loaded.services.len(), 1);
assert_eq!(loaded.services[0].price_per_step, 5);
}
#[test]
fn test_get_service() {
let config = default_pricing();
assert!(config.get_service("content-download").is_some());
assert!(config.get_service("nonexistent").is_none());
// All disabled by default
assert!(config.get_active_service("content-download").is_none());
}
#[test]
fn test_metric_display() {
assert_eq!(format!("{}", Metric::Bytes), "bytes");
assert_eq!(format!("{}", Metric::Milliseconds), "milliseconds");
assert_eq!(format!("{}", Metric::Requests), "requests");
}
}

View File

@@ -0,0 +1,436 @@
//! Streaming session management.
//!
//! Tracks active metered sessions: which peer has how much allotment remaining
//! for which service. Supports incremental top-ups (TollGate-style).
use super::pricing::{Metric, ServicePricing};
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::Path;
use tokio::fs;
const SESSIONS_FILE: &str = "streaming/sessions.json";
/// A single streaming session.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingSession {
/// Unique session ID.
pub id: String,
/// Peer identifier (Nostr pubkey, DID, or onion address).
pub peer_id: String,
/// Service this session is for.
pub service_id: String,
/// Metric type for this session.
pub metric: Metric,
/// Total allotment granted (in metric units).
pub allotment: u64,
/// Amount consumed so far (in metric units).
pub used: u64,
/// Total sats paid for this session.
pub paid_sats: u64,
/// When the session was created.
pub created_at: String,
/// When the session was last topped up.
pub last_topup_at: String,
/// When the session expires (for time-based: created_at + allotment_ms).
/// Empty string for non-time-based sessions.
#[serde(default)]
pub expires_at: String,
/// Whether the session is still active.
#[serde(default = "default_true")]
pub active: bool,
}
fn default_true() -> bool {
true
}
impl StreamingSession {
/// Create a new session from a payment.
pub fn new(
peer_id: &str,
service_id: &str,
pricing: &ServicePricing,
paid_sats: u64,
) -> Self {
let allotment = pricing.calculate_allotment(paid_sats);
let now = chrono::Utc::now();
let now_str = now.to_rfc3339();
let expires_at = if pricing.metric == Metric::Milliseconds {
let expires =
now + chrono::Duration::milliseconds(allotment as i64);
expires.to_rfc3339()
} else {
String::new()
};
Self {
id: uuid::Uuid::new_v4().to_string(),
peer_id: peer_id.to_string(),
service_id: service_id.to_string(),
metric: pricing.metric,
allotment,
used: 0,
paid_sats,
created_at: now_str.clone(),
last_topup_at: now_str,
expires_at,
active: true,
}
}
/// Add more allotment from an additional payment (top-up).
pub fn topup(&mut self, pricing: &ServicePricing, additional_sats: u64) {
let additional_allotment = pricing.calculate_allotment(additional_sats);
self.allotment += additional_allotment;
self.paid_sats += additional_sats;
self.last_topup_at = chrono::Utc::now().to_rfc3339();
// For time-based: extend expiry
if self.metric == Metric::Milliseconds {
let current_expires = chrono::DateTime::parse_from_rfc3339(&self.expires_at)
.map(|dt| dt.with_timezone(&chrono::Utc))
.unwrap_or_else(|_| chrono::Utc::now());
let new_expires = current_expires
+ chrono::Duration::milliseconds(additional_allotment as i64);
self.expires_at = new_expires.to_rfc3339();
}
// Reactivate if it was closed
self.active = true;
}
/// Record usage and check if the session is still within its allotment.
pub fn record_usage(&mut self, amount: u64) -> bool {
self.used += amount;
self.remaining() > 0 && !self.is_expired()
}
/// Remaining allotment.
pub fn remaining(&self) -> u64 {
self.allotment.saturating_sub(self.used)
}
/// Check if a time-based session has expired.
pub fn is_expired(&self) -> bool {
if !self.active {
return true;
}
if self.metric == Metric::Milliseconds && !self.expires_at.is_empty() {
if let Ok(expires) = chrono::DateTime::parse_from_rfc3339(&self.expires_at) {
return chrono::Utc::now() > expires.with_timezone(&chrono::Utc);
}
}
// For non-time-based: expired when allotment consumed
self.used >= self.allotment
}
/// Check if this session can serve a request of the given cost.
pub fn can_serve(&self, cost: u64) -> bool {
self.active && !self.is_expired() && self.remaining() >= cost
}
/// Close the session.
pub fn close(&mut self) {
self.active = false;
}
}
/// All active and recent sessions.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SessionStore {
pub sessions: Vec<StreamingSession>,
}
impl SessionStore {
/// Find an active session for a peer and service.
pub fn find_active(&self, peer_id: &str, service_id: &str) -> Option<&StreamingSession> {
self.sessions
.iter()
.find(|s| s.peer_id == peer_id && s.service_id == service_id && s.active && !s.is_expired())
}
/// Find a mutable active session for a peer and service.
pub fn find_active_mut(
&mut self,
peer_id: &str,
service_id: &str,
) -> Option<&mut StreamingSession> {
self.sessions
.iter_mut()
.find(|s| s.peer_id == peer_id && s.service_id == service_id && s.active && !s.is_expired())
}
/// Get a session by ID.
pub fn get(&self, session_id: &str) -> Option<&StreamingSession> {
self.sessions.iter().find(|s| s.id == session_id)
}
/// Get a mutable session by ID.
pub fn get_mut(&mut self, session_id: &str) -> Option<&mut StreamingSession> {
self.sessions.iter_mut().find(|s| s.id == session_id)
}
/// List all active sessions.
pub fn active_sessions(&self) -> Vec<&StreamingSession> {
self.sessions
.iter()
.filter(|s| s.active && !s.is_expired())
.collect()
}
/// List all sessions for a peer.
pub fn sessions_for_peer(&self, peer_id: &str) -> Vec<&StreamingSession> {
self.sessions
.iter()
.filter(|s| s.peer_id == peer_id)
.collect()
}
/// Close expired sessions and return how many were closed.
pub fn close_expired(&mut self) -> usize {
let mut closed = 0;
for session in &mut self.sessions {
if session.active && session.is_expired() {
session.active = false;
closed += 1;
}
}
closed
}
/// Prune inactive sessions older than 7 days.
pub fn prune_old(&mut self) {
let cutoff = (chrono::Utc::now() - chrono::Duration::days(7)).to_rfc3339();
self.sessions
.retain(|s| s.active || s.created_at > cutoff);
}
/// Create or top-up a session for a peer+service.
pub fn create_or_topup(
&mut self,
peer_id: &str,
service_id: &str,
pricing: &ServicePricing,
paid_sats: u64,
) -> &StreamingSession {
// Check for existing active session
if let Some(session) = self.find_active_mut(peer_id, service_id) {
session.topup(pricing, paid_sats);
let id = session.id.clone();
return self.get(&id).unwrap();
}
// Create new session
let session = StreamingSession::new(peer_id, service_id, pricing, paid_sats);
self.sessions.push(session);
self.sessions.last().unwrap()
}
/// Total revenue from all sessions.
pub fn total_revenue(&self) -> u64 {
self.sessions.iter().map(|s| s.paid_sats).sum()
}
/// Total revenue by service.
pub fn revenue_by_service(&self) -> HashMap<String, u64> {
let mut map = HashMap::new();
for session in &self.sessions {
*map.entry(session.service_id.clone()).or_insert(0) += session.paid_sats;
}
map
}
}
/// Load sessions from disk.
pub async fn load_sessions(data_dir: &Path) -> Result<SessionStore> {
let path = data_dir.join(SESSIONS_FILE);
if !path.exists() {
return Ok(SessionStore::default());
}
let content = fs::read_to_string(&path)
.await
.context("Failed to read sessions file")?;
let store: SessionStore = serde_json::from_str(&content).unwrap_or_default();
Ok(store)
}
/// Save sessions to disk.
pub async fn save_sessions(data_dir: &Path, store: &SessionStore) -> Result<()> {
let dir = data_dir.join("streaming");
fs::create_dir_all(&dir)
.await
.context("Failed to create streaming dir")?;
let path = data_dir.join(SESSIONS_FILE);
let content =
serde_json::to_string_pretty(store).context("Failed to serialize sessions")?;
fs::write(&path, content)
.await
.context("Failed to write sessions file")?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
fn test_pricing(metric: Metric) -> ServicePricing {
ServicePricing {
service_id: "test".into(),
name: "Test".into(),
metric,
step_size: match metric {
Metric::Bytes => 1_048_576,
Metric::Milliseconds => 60_000,
Metric::Requests => 1,
},
price_per_step: 1,
min_steps: 0,
enabled: true,
description: String::new(),
accepted_mints: vec![],
}
}
#[test]
fn test_new_session_bytes() {
let pricing = test_pricing(Metric::Bytes);
let session = StreamingSession::new("peer1", "test", &pricing, 10);
assert_eq!(session.allotment, 10_485_760); // 10 MB
assert_eq!(session.used, 0);
assert_eq!(session.paid_sats, 10);
assert!(session.active);
assert!(session.expires_at.is_empty());
}
#[test]
fn test_new_session_time() {
let pricing = test_pricing(Metric::Milliseconds);
let session = StreamingSession::new("peer1", "test", &pricing, 5);
assert_eq!(session.allotment, 300_000); // 5 minutes
assert!(!session.expires_at.is_empty());
assert!(!session.is_expired());
}
#[test]
fn test_session_topup() {
let pricing = test_pricing(Metric::Bytes);
let mut session = StreamingSession::new("peer1", "test", &pricing, 10);
assert_eq!(session.allotment, 10_485_760);
session.topup(&pricing, 5);
assert_eq!(session.allotment, 15_728_640); // 15 MB
assert_eq!(session.paid_sats, 15);
}
#[test]
fn test_session_record_usage() {
let pricing = test_pricing(Metric::Requests);
let mut session = StreamingSession::new("peer1", "test", &pricing, 5);
assert_eq!(session.allotment, 5);
assert!(session.record_usage(1));
assert!(session.record_usage(1));
assert!(session.record_usage(1));
assert!(session.record_usage(1));
assert!(!session.record_usage(1)); // 5th consumes last
assert_eq!(session.remaining(), 0);
}
#[test]
fn test_session_can_serve() {
let pricing = test_pricing(Metric::Requests);
let session = StreamingSession::new("peer1", "test", &pricing, 3);
assert!(session.can_serve(1));
assert!(session.can_serve(3));
assert!(!session.can_serve(4));
}
#[test]
fn test_session_close() {
let pricing = test_pricing(Metric::Requests);
let mut session = StreamingSession::new("peer1", "test", &pricing, 5);
assert!(session.active);
session.close();
assert!(!session.active);
assert!(session.is_expired());
}
#[test]
fn test_session_store_create_or_topup() {
let pricing = test_pricing(Metric::Requests);
let mut store = SessionStore::default();
// First payment creates session
let s1 = store.create_or_topup("peer1", "test", &pricing, 10);
let s1_id = s1.id.clone();
assert_eq!(s1.allotment, 10);
assert_eq!(s1.paid_sats, 10);
// Second payment tops up
let s2 = store.create_or_topup("peer1", "test", &pricing, 5);
assert_eq!(s2.id, s1_id); // Same session
assert_eq!(s2.allotment, 15);
assert_eq!(s2.paid_sats, 15);
}
#[test]
fn test_session_store_different_peers() {
let pricing = test_pricing(Metric::Requests);
let mut store = SessionStore::default();
store.create_or_topup("peer1", "test", &pricing, 10);
store.create_or_topup("peer2", "test", &pricing, 20);
assert_eq!(store.active_sessions().len(), 2);
}
#[test]
fn test_close_expired() {
let pricing = test_pricing(Metric::Requests);
let mut store = SessionStore::default();
store.create_or_topup("peer1", "test", &pricing, 1);
// Consume the allotment
if let Some(s) = store.find_active_mut("peer1", "test") {
s.record_usage(1);
}
let closed = store.close_expired();
assert_eq!(closed, 1);
}
#[test]
fn test_revenue_tracking() {
let pricing = test_pricing(Metric::Requests);
let mut store = SessionStore::default();
store.create_or_topup("peer1", "test", &pricing, 100);
store.create_or_topup("peer2", "test", &pricing, 200);
assert_eq!(store.total_revenue(), 300);
let by_service = store.revenue_by_service();
assert_eq!(*by_service.get("test").unwrap(), 300);
}
#[tokio::test]
async fn test_load_save_sessions() {
let tmp = TempDir::new().unwrap();
let pricing = test_pricing(Metric::Bytes);
let mut store = SessionStore::default();
store.create_or_topup("peer1", "test", &pricing, 42);
save_sessions(tmp.path(), &store).await.unwrap();
let loaded = load_sessions(tmp.path()).await.unwrap();
assert_eq!(loaded.sessions.len(), 1);
assert_eq!(loaded.sessions[0].paid_sats, 42);
}
}

View File

@@ -0,0 +1,216 @@
//! Blind Diffie-Hellman Key Exchange (BDHKE) for Cashu ecash.
//!
//! Implements NUT-00 cryptographic operations:
//! - hash_to_curve: deterministic point derivation from secret
//! - blind: create blinded message for mint signing
//! - unblind: remove blinding factor from mint signature
//! - verify: verify unblinded signature against mint pubkey
use anyhow::{Context, Result};
use bitcoin::secp256k1::{PublicKey, Scalar, Secp256k1, SecretKey};
use sha2::{Digest, Sha256};
/// Domain separator for hash_to_curve per NUT-00 spec.
const DOMAIN_SEPARATOR: &[u8] = b"Secp256k1_HashToCurve_Cashu_";
/// Hash a message to a secp256k1 curve point (NUT-00).
///
/// Iteratively hashes `sha256(sha256(domain_separator || msg) || counter)` until
/// the result is a valid x-coordinate on secp256k1. Prepends 0x02 to try as
/// a compressed public key.
pub fn hash_to_curve(message: &[u8]) -> Result<PublicKey> {
let msg_hash = {
let mut hasher = Sha256::new();
hasher.update(DOMAIN_SEPARATOR);
hasher.update(message);
hasher.finalize()
};
for counter in 0u32..65536 {
let mut hasher = Sha256::new();
hasher.update(&msg_hash);
hasher.update(counter.to_le_bytes());
let hash = hasher.finalize();
// Try to construct a point: 0x02 || hash (compressed even-y format)
let mut point_bytes = [0u8; 33];
point_bytes[0] = 0x02;
point_bytes[1..].copy_from_slice(&hash);
if let Ok(pk) = PublicKey::from_slice(&point_bytes) {
return Ok(pk);
}
}
Err(anyhow::anyhow!(
"hash_to_curve: no valid point found after 65536 iterations"
))
}
/// Blinded message output from the client.
pub struct BlindedMessage {
/// The blinded point B_ = Y + r*G
pub b_prime: PublicKey,
/// The blinding factor (kept secret by client)
pub r: SecretKey,
/// The original secret
pub secret: Vec<u8>,
}
/// Create a blinded message for the mint to sign.
///
/// Given a secret, computes Y = hash_to_curve(secret), picks random r,
/// and returns B_ = Y + r*G along with the blinding factor r.
pub fn blind_message(secret: &[u8], blinding_factor: &SecretKey) -> Result<BlindedMessage> {
let secp = Secp256k1::new();
// Y = hash_to_curve(secret)
let y = hash_to_curve(secret)?;
// r*G
let r_pub = PublicKey::from_secret_key(&secp, blinding_factor);
// B_ = Y + r*G
let b_prime = PublicKey::combine_keys(&[&y, &r_pub])
.context("Failed to compute blinded message B_ = Y + r*G")?;
Ok(BlindedMessage {
b_prime,
r: *blinding_factor,
secret: secret.to_vec(),
})
}
/// Unblind a mint's blind signature to get the real signature.
///
/// Given C_ (blind signature from mint), r (our blinding factor), and K (mint's pubkey):
/// C = C_ - r*K
pub fn unblind_signature(
c_prime: &PublicKey,
r: &SecretKey,
mint_pubkey: &PublicKey,
) -> Result<PublicKey> {
let secp = Secp256k1::new();
// Compute r*K
let r_scalar =
Scalar::from_be_bytes(r.secret_bytes()).expect("valid secret key is valid scalar");
let r_times_k = mint_pubkey
.mul_tweak(&secp, &r_scalar)
.context("Failed to compute r*K")?;
// Negate to get -(r*K)
let neg_r_times_k = r_times_k.negate(&secp);
// C = C_ + (-(r*K)) = C_ - r*K
let c = PublicKey::combine_keys(&[c_prime, &neg_r_times_k])
.context("Failed to compute C = C_ - r*K")?;
Ok(c)
}
/// Verify that a proof (secret, C) is valid against a mint's public key K.
///
/// Checks: C == k * hash_to_curve(secret) — but since we don't have k (the mint's
/// private key), we verify by checking that the DLEQ proof is valid, or by
/// attempting to swap the token at the mint. This function provides a basic
/// structural check that the proof components are well-formed.
pub fn verify_proof_structure(secret: &[u8], c: &PublicKey) -> Result<bool> {
// Verify that hash_to_curve(secret) produces a valid point
let _y = hash_to_curve(secret)?;
// Verify C is a valid public key (already guaranteed by type, but check non-identity)
let c_bytes = c.serialize();
if c_bytes.iter().all(|&b| b == 0) {
return Ok(false);
}
Ok(true)
}
/// Construct the secret string for a Cashu proof.
/// NUT-10 defines secret as a JSON array: ["P2PK", {nonce, data, tags}]
/// For basic (non-P2PK) proofs, the secret is just a random hex string.
pub fn generate_secret() -> Vec<u8> {
let random_bytes: [u8; 32] = rand::random();
hex::encode(random_bytes).into_bytes()
}
/// Generate a random blinding factor.
pub fn random_blinding_factor() -> SecretKey {
let mut rng = rand::thread_rng();
SecretKey::new(&mut rng)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_hash_to_curve_deterministic() {
let msg = b"test_message";
let p1 = hash_to_curve(msg).unwrap();
let p2 = hash_to_curve(msg).unwrap();
assert_eq!(p1, p2);
}
#[test]
fn test_hash_to_curve_different_messages() {
let p1 = hash_to_curve(b"message_a").unwrap();
let p2 = hash_to_curve(b"message_b").unwrap();
assert_ne!(p1, p2);
}
#[test]
fn test_blind_unblind_roundtrip() {
let secp = Secp256k1::new();
let secret = b"test_secret";
let r = random_blinding_factor();
// Simulate mint: k is mint's private key, K = k*G is public key
let k = SecretKey::new(&mut rand::thread_rng());
let k_pub = PublicKey::from_secret_key(&secp, &k);
// Client blinds
let blinded = blind_message(secret, &r).unwrap();
// Mint signs: C_ = k * B_
let k_scalar = Scalar::from_be_bytes(k.secret_bytes()).unwrap();
let c_prime = blinded
.b_prime
.mul_tweak(&secp, &k_scalar)
.unwrap();
// Client unblinds: C = C_ - r*K
let c = unblind_signature(&c_prime, &r, &k_pub).unwrap();
// Verify: C should equal k * hash_to_curve(secret)
let y = hash_to_curve(secret).unwrap();
let expected_c = y.mul_tweak(&secp, &k_scalar).unwrap();
assert_eq!(c, expected_c);
}
#[test]
fn test_generate_secret_length() {
let secret = generate_secret();
// 32 bytes hex-encoded = 64 chars
assert_eq!(secret.len(), 64);
}
#[test]
fn test_generate_secret_unique() {
let s1 = generate_secret();
let s2 = generate_secret();
assert_ne!(s1, s2);
}
#[test]
fn test_verify_proof_structure_valid() {
let secret = generate_secret();
let secp = Secp256k1::new();
let k = SecretKey::new(&mut rand::thread_rng());
let y = hash_to_curve(&secret).unwrap();
let k_scalar = Scalar::from_be_bytes(k.secret_bytes()).unwrap();
let c = y.mul_tweak(&secp, &k_scalar).unwrap();
assert!(verify_proof_structure(&secret, &c).unwrap());
}
}

View File

@@ -0,0 +1,315 @@
//! Cashu token format (NUT-00) — serialization and deserialization.
//!
//! Supports the cashuA (V3) token format:
//! cashuA<base64url_encoded_json>
//!
//! Token JSON structure:
//! {
//! "token": [{ "mint": "<url>", "proofs": [{ "amount": u64, "id": "<keyset>", "secret": "<str>", "C": "<hex>" }] }],
//! "memo": "<optional>"
//! }
use anyhow::{Context, Result};
use bitcoin::secp256k1::PublicKey;
use serde::{Deserialize, Serialize};
/// Prefix for V3 tokens.
const CASHU_A_PREFIX: &str = "cashuA";
/// A single Cashu proof (a signed token for a specific denomination).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Proof {
/// Denomination in the mint's unit (sats).
pub amount: u64,
/// Keyset ID (hex string, e.g. "009a1f293253e41e").
pub id: String,
/// The secret (random hex string or NUT-10 structured secret).
pub secret: String,
/// The unblinded signature C as hex-encoded compressed public key.
#[serde(rename = "C")]
pub c: String,
}
impl Proof {
/// Parse the C field as a secp256k1 PublicKey.
pub fn c_as_pubkey(&self) -> Result<PublicKey> {
let bytes = hex::decode(&self.c).context("Invalid hex in proof C field")?;
PublicKey::from_slice(&bytes).context("Invalid public key in proof C field")
}
}
/// A group of proofs from a single mint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TokenEntry {
/// Mint URL.
pub mint: String,
/// Proofs from this mint.
pub proofs: Vec<Proof>,
}
/// The full cashuA token envelope.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CashuToken {
/// Token entries grouped by mint.
pub token: Vec<TokenEntry>,
/// Optional memo.
#[serde(skip_serializing_if = "Option::is_none")]
pub memo: Option<String>,
/// Optional unit (e.g. "sat").
#[serde(skip_serializing_if = "Option::is_none")]
pub unit: Option<String>,
}
impl CashuToken {
/// Create a new token with proofs from a single mint.
pub fn new(mint_url: &str, proofs: Vec<Proof>) -> Self {
Self {
token: vec![TokenEntry {
mint: mint_url.to_string(),
proofs,
}],
memo: None,
unit: Some("sat".to_string()),
}
}
/// Total value of all proofs across all mints.
pub fn total_amount(&self) -> u64 {
self.token
.iter()
.flat_map(|e| &e.proofs)
.map(|p| p.amount)
.sum()
}
/// All proofs across all mint entries.
pub fn all_proofs(&self) -> Vec<&Proof> {
self.token.iter().flat_map(|e| &e.proofs).collect()
}
/// All unique mint URLs in this token.
pub fn mint_urls(&self) -> Vec<&str> {
self.token.iter().map(|e| e.mint.as_str()).collect()
}
/// Encode as a cashuA token string.
pub fn serialize(&self) -> Result<String> {
let json = serde_json::to_string(self).context("Failed to serialize token JSON")?;
use base64::Engine;
let encoded = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(json.as_bytes());
Ok(format!("{}{}", CASHU_A_PREFIX, encoded))
}
/// Decode a cashuA token string.
pub fn deserialize(token_str: &str) -> Result<Self> {
let payload = token_str
.strip_prefix(CASHU_A_PREFIX)
.ok_or_else(|| anyhow::anyhow!("Token must start with '{}'", CASHU_A_PREFIX))?;
use base64::Engine;
let decoded = base64::engine::general_purpose::URL_SAFE_NO_PAD
.decode(payload)
.or_else(|_| {
// Try standard base64 as fallback (some implementations use it)
base64::engine::general_purpose::URL_SAFE.decode(payload)
})
.or_else(|_| base64::engine::general_purpose::STANDARD.decode(payload))
.context("Invalid base64 in cashuA token")?;
let json_str = String::from_utf8(decoded).context("Invalid UTF-8 in decoded token")?;
let token: CashuToken =
serde_json::from_str(&json_str).context("Invalid JSON in cashuA token")?;
// Structural validation
if token.token.is_empty() {
anyhow::bail!("Token has no entries");
}
for entry in &token.token {
if entry.mint.is_empty() {
anyhow::bail!("Token entry has empty mint URL");
}
if entry.proofs.is_empty() {
anyhow::bail!("Token entry has no proofs");
}
for proof in &entry.proofs {
if proof.amount == 0 {
anyhow::bail!("Proof has zero amount");
}
if proof.secret.is_empty() {
anyhow::bail!("Proof has empty secret");
}
if proof.c.is_empty() {
anyhow::bail!("Proof has empty C");
}
}
}
Ok(token)
}
}
/// Keyset info returned by a mint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KeysetInfo {
pub id: String,
pub unit: String,
pub active: bool,
}
/// Mint keyset: maps denomination amounts to public keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MintKeyset {
pub id: String,
/// Map of amount (as string) to hex-encoded public key.
pub keys: std::collections::HashMap<String, String>,
}
impl MintKeyset {
/// Get the mint's public key for a given denomination amount.
pub fn key_for_amount(&self, amount: u64) -> Result<PublicKey> {
let amount_str = amount.to_string();
let hex_key = self
.keys
.get(&amount_str)
.ok_or_else(|| anyhow::anyhow!("No key for amount {} in keyset {}", amount, self.id))?;
let bytes = hex::decode(hex_key).context("Invalid hex in mint pubkey")?;
PublicKey::from_slice(&bytes).context("Invalid pubkey in mint keyset")
}
}
/// Blinded message sent to the mint during mint/swap.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlindedMessageRequest {
/// Amount for this output.
pub amount: u64,
/// Keyset ID to use.
pub id: String,
/// Blinded secret B_ as hex-encoded compressed pubkey.
#[serde(rename = "B_")]
pub b_prime: String,
}
/// Blind signature returned by the mint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlindSignature {
/// Amount signed.
pub amount: u64,
/// Keyset ID.
pub id: String,
/// Blind signature C_ as hex-encoded compressed pubkey.
#[serde(rename = "C_")]
pub c_prime: String,
}
impl BlindSignature {
/// Parse C_ as a secp256k1 PublicKey.
pub fn c_prime_as_pubkey(&self) -> Result<PublicKey> {
let bytes = hex::decode(&self.c_prime).context("Invalid hex in blind signature C_")?;
PublicKey::from_slice(&bytes).context("Invalid pubkey in blind signature C_")
}
}
/// Split a target amount into powers of 2 (Cashu denomination scheme).
/// E.g., 13 -> [1, 4, 8]
pub fn amount_to_denominations(mut amount: u64) -> Vec<u64> {
let mut denoms = Vec::new();
let mut bit = 0;
while amount > 0 {
if amount & 1 == 1 {
denoms.push(1u64 << bit);
}
amount >>= 1;
bit += 1;
}
denoms
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_serialize_deserialize_roundtrip() {
let token = CashuToken {
token: vec![TokenEntry {
mint: "http://127.0.0.1:8175".to_string(),
proofs: vec![Proof {
amount: 8,
id: "009a1f293253e41e".to_string(),
secret: "abcdef1234567890".to_string(),
c: "02a9acc1e48c25eeeb9289b5031cc57da9fe72f3fe2861d94ec4da0e7f6c2b4e24".to_string(),
}],
}],
memo: Some("test token".to_string()),
unit: Some("sat".to_string()),
};
let encoded = token.serialize().unwrap();
assert!(encoded.starts_with("cashuA"));
let decoded = CashuToken::deserialize(&encoded).unwrap();
assert_eq!(decoded.total_amount(), 8);
assert_eq!(decoded.token[0].mint, "http://127.0.0.1:8175");
assert_eq!(decoded.token[0].proofs[0].secret, "abcdef1234567890");
assert_eq!(decoded.memo, Some("test token".to_string()));
}
#[test]
fn test_total_amount_multi_proof() {
let token = CashuToken {
token: vec![TokenEntry {
mint: "http://mint".to_string(),
proofs: vec![
Proof { amount: 1, id: "id1".into(), secret: "s1".into(), c: "02a9acc1e48c25eeeb9289b5031cc57da9fe72f3fe2861d94ec4da0e7f6c2b4e24".into() },
Proof { amount: 4, id: "id1".into(), secret: "s2".into(), c: "02a9acc1e48c25eeeb9289b5031cc57da9fe72f3fe2861d94ec4da0e7f6c2b4e24".into() },
Proof { amount: 8, id: "id1".into(), secret: "s3".into(), c: "02a9acc1e48c25eeeb9289b5031cc57da9fe72f3fe2861d94ec4da0e7f6c2b4e24".into() },
],
}],
memo: None,
unit: None,
};
assert_eq!(token.total_amount(), 13);
}
#[test]
fn test_deserialize_rejects_empty_token() {
let bad = CashuToken { token: vec![], memo: None, unit: None };
let encoded = bad.serialize().unwrap();
let result = CashuToken::deserialize(&encoded);
assert!(result.is_err());
}
#[test]
fn test_deserialize_rejects_invalid_prefix() {
let result = CashuToken::deserialize("cashuBabc123");
assert!(result.is_err());
}
#[test]
fn test_amount_to_denominations() {
assert_eq!(amount_to_denominations(0), Vec::<u64>::new());
assert_eq!(amount_to_denominations(1), vec![1]);
assert_eq!(amount_to_denominations(13), vec![1, 4, 8]);
assert_eq!(amount_to_denominations(21), vec![1, 4, 16]);
assert_eq!(amount_to_denominations(64), vec![64]);
assert_eq!(amount_to_denominations(255), vec![1, 2, 4, 8, 16, 32, 64, 128]);
}
#[test]
fn test_amount_to_denominations_large() {
let denoms = amount_to_denominations(1_000_000);
let sum: u64 = denoms.iter().sum();
assert_eq!(sum, 1_000_000);
}
#[test]
fn test_proof_c_as_pubkey() {
let proof = Proof {
amount: 1,
id: "test".into(),
secret: "s".into(),
c: "02a9acc1e48c25eeeb9289b5031cc57da9fe72f3fe2861d94ec4da0e7f6c2b4e24".to_string(),
};
assert!(proof.c_as_pubkey().is_ok());
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,461 @@
//! HTTP client for Cashu mint API (NUT-01 through NUT-06).
//!
//! Communicates with a Cashu-compatible mint for:
//! - Keyset discovery (GET /v1/keys, /v1/keysets)
//! - Mint quotes and minting (POST /v1/mint/quote/bolt11, /v1/mint/bolt11)
//! - Melt quotes and melting (POST /v1/melt/quote/bolt11, /v1/melt/bolt11)
//! - Token swaps (POST /v1/swap)
//! - Proof state checks (POST /v1/checkstate)
use super::bdhke;
use super::cashu::{
amount_to_denominations, BlindSignature, BlindedMessageRequest, CashuToken, MintKeyset, Proof,
};
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use tracing::debug;
/// Default timeout for mint API calls.
const MINT_TIMEOUT_SECS: u64 = 10;
/// Timeout for heavy operations (minting with Lightning payment).
const MINT_HEAVY_TIMEOUT_SECS: u64 = 30;
/// Mint quote response (NUT-04).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MintQuote {
pub quote: String,
pub request: String, // BOLT11 Lightning invoice
pub state: String, // "UNPAID", "PAID", "ISSUED"
#[serde(default)]
pub expiry: u64,
}
/// Melt quote response (NUT-05).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeltQuote {
pub quote: String,
pub amount: u64,
pub fee_reserve: u64,
pub state: String, // "UNPAID", "PENDING", "PAID"
#[serde(default)]
pub expiry: u64,
}
/// Token state from checkstate (NUT-07).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProofState {
#[serde(rename = "Y")]
pub y: String,
pub state: String, // "UNSPENT", "SPENT", "PENDING"
}
/// Result of a swap operation.
pub struct SwapResult {
pub new_proofs: Vec<Proof>,
}
/// Result of a mint operation.
pub struct MintResult {
pub proofs: Vec<Proof>,
}
/// HTTP client for a single Cashu mint.
pub struct MintClient {
url: String,
client: reqwest::Client,
}
impl MintClient {
/// Create a new mint client for the given mint URL.
pub fn new(mint_url: &str) -> Result<Self> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(MINT_TIMEOUT_SECS))
.build()
.context("Failed to build HTTP client for mint")?;
Ok(Self {
url: mint_url.trim_end_matches('/').to_string(),
client,
})
}
/// Create a mint client with a custom reqwest client (e.g., for Tor proxy).
pub fn with_client(mint_url: &str, client: reqwest::Client) -> Self {
Self {
url: mint_url.trim_end_matches('/').to_string(),
client,
}
}
pub fn url(&self) -> &str {
&self.url
}
// ── Keyset discovery (NUT-01, NUT-02) ──
/// Fetch the active keyset from the mint.
pub async fn get_keys(&self) -> Result<Vec<MintKeyset>> {
let url = format!("{}/v1/keys", self.url);
let res = self
.client
.get(&url)
.send()
.await
.context("Failed to fetch mint keys")?;
if !res.status().is_success() {
anyhow::bail!("Mint keys request failed: {}", res.status());
}
let body: serde_json::Value = res.json().await.context("Failed to parse mint keys")?;
let keysets: Vec<MintKeyset> = serde_json::from_value(
body.get("keysets")
.cloned()
.unwrap_or(serde_json::json!([])),
)
.context("Failed to parse keysets")?;
Ok(keysets)
}
/// Get the active keyset for the "sat" unit.
pub async fn get_active_sat_keyset(&self) -> Result<MintKeyset> {
let keysets = self.get_keys().await?;
keysets
.into_iter()
.find(|k| {
// Find active sat keyset — check keys map is non-empty
!k.keys.is_empty()
})
.ok_or_else(|| anyhow::anyhow!("No active keyset found at mint {}", self.url))
}
// ── Mint quotes (NUT-04) ──
/// Request a mint quote — returns a Lightning invoice to pay.
pub async fn mint_quote(&self, amount: u64) -> Result<MintQuote> {
let url = format!("{}/v1/mint/quote/bolt11", self.url);
let res = self
.client
.post(&url)
.json(&serde_json::json!({ "amount": amount, "unit": "sat" }))
.send()
.await
.context("Failed to request mint quote")?;
if !res.status().is_success() {
let status = res.status();
let body = res.text().await.unwrap_or_default();
anyhow::bail!("Mint quote failed ({}): {}", status, body);
}
res.json().await.context("Failed to parse mint quote")
}
/// Check the status of a mint quote.
pub async fn mint_quote_status(&self, quote_id: &str) -> Result<MintQuote> {
let url = format!("{}/v1/mint/quote/bolt11/{}", self.url, quote_id);
let res = self
.client
.get(&url)
.send()
.await
.context("Failed to check mint quote status")?;
if !res.status().is_success() {
anyhow::bail!("Mint quote status check failed: {}", res.status());
}
res.json().await.context("Failed to parse mint quote status")
}
/// Mint tokens after Lightning invoice has been paid.
/// Performs BDHKE blinding, sends blinded messages to mint, unblinds signatures.
pub async fn mint_tokens(&self, quote_id: &str, amount: u64) -> Result<MintResult> {
let keyset = self.get_active_sat_keyset().await?;
let denominations = amount_to_denominations(amount);
let mut blinded_messages = Vec::new();
let mut blinding_data = Vec::new(); // (secret, blinding_factor, amount)
for &denom in &denominations {
let secret = bdhke::generate_secret();
let r = bdhke::random_blinding_factor();
let blinded = bdhke::blind_message(&secret, &r)?;
blinded_messages.push(BlindedMessageRequest {
amount: denom,
id: keyset.id.clone(),
b_prime: hex::encode(blinded.b_prime.serialize()),
});
blinding_data.push((secret, r, denom));
}
let url = format!("{}/v1/mint/bolt11", self.url);
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(MINT_HEAVY_TIMEOUT_SECS))
.build()
.context("Failed to build client for mint operation")?;
let res = client
.post(&url)
.json(&serde_json::json!({
"quote": quote_id,
"outputs": blinded_messages,
}))
.send()
.await
.context("Failed to mint tokens")?;
if !res.status().is_success() {
let status = res.status();
let body = res.text().await.unwrap_or_default();
anyhow::bail!("Mint tokens failed ({}): {}", status, body);
}
let body: serde_json::Value = res.json().await.context("Failed to parse mint response")?;
let signatures: Vec<BlindSignature> = serde_json::from_value(
body.get("signatures")
.cloned()
.unwrap_or(serde_json::json!([])),
)
.context("Failed to parse blind signatures")?;
if signatures.len() != blinding_data.len() {
anyhow::bail!(
"Mint returned {} signatures, expected {}",
signatures.len(),
blinding_data.len()
);
}
// Unblind signatures to get real proofs
let mut proofs = Vec::new();
for (sig, (secret, r, amount)) in signatures.iter().zip(blinding_data.iter()) {
let c_prime = sig.c_prime_as_pubkey()?;
let mint_key = keyset.key_for_amount(*amount)?;
let c = bdhke::unblind_signature(&c_prime, r, &mint_key)?;
proofs.push(Proof {
amount: *amount,
id: keyset.id.clone(),
secret: String::from_utf8_lossy(secret).to_string(),
c: hex::encode(c.serialize()),
});
}
debug!("Minted {} proofs totaling {} sats", proofs.len(), amount);
Ok(MintResult { proofs })
}
// ── Melt (NUT-05) ──
/// Request a melt quote — how much it costs to pay a Lightning invoice.
pub async fn melt_quote(&self, bolt11: &str) -> Result<MeltQuote> {
let url = format!("{}/v1/melt/quote/bolt11", self.url);
let res = self
.client
.post(&url)
.json(&serde_json::json!({ "request": bolt11, "unit": "sat" }))
.send()
.await
.context("Failed to request melt quote")?;
if !res.status().is_success() {
let status = res.status();
let body = res.text().await.unwrap_or_default();
anyhow::bail!("Melt quote failed ({}): {}", status, body);
}
res.json().await.context("Failed to parse melt quote")
}
/// Melt tokens — pay a Lightning invoice using ecash proofs.
pub async fn melt_tokens(&self, quote_id: &str, proofs: &[Proof]) -> Result<MeltQuote> {
let url = format!("{}/v1/melt/bolt11", self.url);
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(MINT_HEAVY_TIMEOUT_SECS))
.build()
.context("Failed to build client for melt operation")?;
let res = client
.post(&url)
.json(&serde_json::json!({
"quote": quote_id,
"inputs": proofs,
}))
.send()
.await
.context("Failed to melt tokens")?;
if !res.status().is_success() {
let status = res.status();
let body = res.text().await.unwrap_or_default();
anyhow::bail!("Melt failed ({}): {}", status, body);
}
res.json().await.context("Failed to parse melt response")
}
// ── Swap (NUT-03) ──
/// Swap proofs for new proofs of different denominations.
/// This is how we "receive" a token — swap it for fresh proofs that only we know.
pub async fn swap(&self, inputs: &[Proof], target_amounts: &[u64]) -> Result<SwapResult> {
let keyset = self.get_active_sat_keyset().await?;
let mut blinded_messages = Vec::new();
let mut blinding_data = Vec::new();
for &amount in target_amounts {
let secret = bdhke::generate_secret();
let r = bdhke::random_blinding_factor();
let blinded = bdhke::blind_message(&secret, &r)?;
blinded_messages.push(BlindedMessageRequest {
amount,
id: keyset.id.clone(),
b_prime: hex::encode(blinded.b_prime.serialize()),
});
blinding_data.push((secret, r, amount));
}
let url = format!("{}/v1/swap", self.url);
let res = self
.client
.post(&url)
.json(&serde_json::json!({
"inputs": inputs,
"outputs": blinded_messages,
}))
.send()
.await
.context("Failed to swap tokens")?;
if !res.status().is_success() {
let status = res.status();
let body = res.text().await.unwrap_or_default();
anyhow::bail!("Swap failed ({}): {}", status, body);
}
let body: serde_json::Value = res.json().await.context("Failed to parse swap response")?;
let signatures: Vec<BlindSignature> = serde_json::from_value(
body.get("signatures")
.cloned()
.unwrap_or(serde_json::json!([])),
)
.context("Failed to parse swap signatures")?;
if signatures.len() != blinding_data.len() {
anyhow::bail!(
"Swap returned {} signatures, expected {}",
signatures.len(),
blinding_data.len()
);
}
let mut new_proofs = Vec::new();
for (sig, (secret, r, amount)) in signatures.iter().zip(blinding_data.iter()) {
let c_prime = sig.c_prime_as_pubkey()?;
let mint_key = keyset.key_for_amount(*amount)?;
let c = bdhke::unblind_signature(&c_prime, r, &mint_key)?;
new_proofs.push(Proof {
amount: *amount,
id: keyset.id.clone(),
secret: String::from_utf8_lossy(secret).to_string(),
c: hex::encode(c.serialize()),
});
}
debug!(
"Swapped {} inputs for {} new proofs",
inputs.len(),
new_proofs.len()
);
Ok(SwapResult { new_proofs })
}
// ── Check state (NUT-07) ──
/// Check whether proofs are spent, unspent, or pending.
pub async fn check_state(&self, proofs: &[Proof]) -> Result<Vec<ProofState>> {
// Compute Y = hash_to_curve(secret) for each proof
let ys: Vec<String> = proofs
.iter()
.map(|p| {
let y = bdhke::hash_to_curve(p.secret.as_bytes())?;
Ok(hex::encode(y.serialize()))
})
.collect::<Result<Vec<_>>>()?;
let url = format!("{}/v1/checkstate", self.url);
let res = self
.client
.post(&url)
.json(&serde_json::json!({ "Ys": ys }))
.send()
.await
.context("Failed to check proof state")?;
if !res.status().is_success() {
anyhow::bail!("Check state failed: {}", res.status());
}
let body: serde_json::Value =
res.json().await.context("Failed to parse checkstate response")?;
let states: Vec<ProofState> = serde_json::from_value(
body.get("states")
.cloned()
.unwrap_or(serde_json::json!([])),
)
.context("Failed to parse proof states")?;
Ok(states)
}
/// Receive a CashuToken by swapping its proofs for fresh ones.
/// This prevents double-spend and ensures only we can spend the new proofs.
pub async fn receive_token(&self, token: &CashuToken) -> Result<Vec<Proof>> {
let mut all_new_proofs = Vec::new();
for entry in &token.token {
if entry.mint != self.url {
debug!(
"Skipping proofs from different mint {} (ours: {})",
entry.mint, self.url
);
continue;
}
let total: u64 = entry.proofs.iter().map(|p| p.amount).sum();
let target_amounts = amount_to_denominations(total);
let result = self.swap(&entry.proofs, &target_amounts).await?;
all_new_proofs.extend(result.new_proofs);
}
if all_new_proofs.is_empty() {
anyhow::bail!("No proofs could be swapped — mint mismatch or empty token");
}
Ok(all_new_proofs)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_mint_client_url_normalization() {
let client = MintClient::new("http://mint.example.com/").unwrap();
assert_eq!(client.url(), "http://mint.example.com");
}
#[test]
fn test_mint_client_url_no_trailing_slash() {
let client = MintClient::new("http://mint.example.com").unwrap();
assert_eq!(client.url(), "http://mint.example.com");
}
}

View File

@@ -1,2 +1,5 @@
pub mod bdhke;
pub mod cashu;
pub mod ecash;
pub mod mint_client;
pub mod profits;

View File

@@ -19,6 +19,9 @@ pub struct ProfitsSummary {
pub content_sales_sats: u64,
/// Earnings from Lightning routing fees.
pub routing_fees_sats: u64,
/// Earnings from streaming data payments.
#[serde(default)]
pub streaming_revenue_sats: u64,
/// Recent earning entries (newest first).
pub recent: Vec<ProfitEntry>,
}
@@ -38,6 +41,7 @@ pub struct ProfitEntry {
pub enum ProfitSource {
ContentSale,
RoutingFee,
StreamingRevenue,
}
/// Load profits summary from disk.
@@ -84,7 +88,7 @@ pub async fn record_content_sale(data_dir: &Path, amount_sats: u64, description:
summary.recent.truncate(100);
}
summary.content_sales_sats += amount_sats;
summary.total_sats = summary.content_sales_sats + summary.routing_fees_sats;
summary.total_sats = summary.content_sales_sats + summary.routing_fees_sats + summary.streaming_revenue_sats;
save_profits(data_dir, &summary).await?;
Ok(())
}
@@ -93,8 +97,9 @@ pub async fn record_content_sale(data_dir: &Path, amount_sats: u64, description:
pub async fn get_networking_profits(data_dir: &Path) -> Result<ProfitsSummary> {
let mut summary = load_profits(data_dir).await?;
// Also count ecash "receive" transactions as content sales revenue
// Count ecash transactions by type
let wallet = ecash::load_wallet(data_dir).await?;
let ecash_received: u64 = wallet
.transactions
.iter()
@@ -102,11 +107,22 @@ pub async fn get_networking_profits(data_dir: &Path) -> Result<ProfitsSummary> {
.map(|tx| tx.amount_sats)
.sum();
let streaming_received: u64 = wallet
.transactions
.iter()
.filter(|tx| matches!(tx.tx_type, ecash::TransactionType::StreamingRevenue))
.map(|tx| tx.amount_sats)
.sum();
// Use the higher of tracked profits or ecash receives as content sales
if ecash_received > summary.content_sales_sats {
summary.content_sales_sats = ecash_received;
}
summary.total_sats = summary.content_sales_sats + summary.routing_fees_sats;
if streaming_received > summary.streaming_revenue_sats {
summary.streaming_revenue_sats = streaming_received;
}
summary.total_sats =
summary.content_sales_sats + summary.routing_fees_sats + summary.streaming_revenue_sats;
Ok(summary)
}
@@ -142,6 +158,7 @@ mod tests {
total_sats: 5000,
content_sales_sats: 3000,
routing_fees_sats: 2000,
streaming_revenue_sats: 0,
recent: vec![ProfitEntry {
source: ProfitSource::ContentSale,
amount_sats: 3000,