feat: streaming ecash payments + media playback overhaul

Cashu ecash protocol (BDHKE blind signatures, cashuA token format,
mint HTTP client) replacing the stub wallet. TollGate-inspired streaming
data payment system with step-based pricing (bytes/time/requests),
session management with incremental top-ups, usage metering, and
Nostr kind 10021 service advertisements.

13 new streaming.* RPC endpoints. Content server now verifies real
Cashu tokens. Profits tracking includes streaming revenue.

Frontend: GlobalAudioPlayer (persistent bottom bar across all pages),
video lightbox with full controls, audio in MediaLightbox, free file
previews (no blur), paid 10% audio/video previews, separated play
vs download buttons in PeerFiles.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dorian
2026-04-11 22:31:28 -04:00
parent 90506ee52c
commit ffd57ad29d
25 changed files with 4740 additions and 480 deletions

View File

@@ -187,12 +187,29 @@ impl RpcHandler {
// Ecash wallet
"wallet.ecash-balance" => self.handle_wallet_ecash_balance().await,
"wallet.ecash-mint" => self.handle_wallet_ecash_mint(params).await,
"wallet.ecash-mint-claim" => self.handle_wallet_ecash_mint_claim(params).await,
"wallet.ecash-melt" => self.handle_wallet_ecash_melt(params).await,
"wallet.ecash-melt-confirm" => self.handle_wallet_ecash_melt_confirm(params).await,
"wallet.ecash-send" => self.handle_wallet_ecash_send(params).await,
"wallet.ecash-receive" => self.handle_wallet_ecash_receive(params).await,
"wallet.ecash-history" => self.handle_wallet_ecash_history().await,
"wallet.networking-profits" => self.handle_wallet_networking_profits().await,
// Streaming ecash payments
"streaming.list-services" => self.handle_streaming_list_services().await,
"streaming.configure-service" => self.handle_streaming_configure_service(params).await,
"streaming.toggle-service" => self.handle_streaming_toggle_service(params).await,
"streaming.pay" => self.handle_streaming_pay(params).await,
"streaming.discover" => self.handle_streaming_discover().await,
"streaming.usage" => self.handle_streaming_usage(params).await,
"streaming.session" => self.handle_streaming_session(params).await,
"streaming.list-sessions" => self.handle_streaming_list_sessions().await,
"streaming.close-session" => self.handle_streaming_close_session(params).await,
"streaming.advertise" => self.handle_streaming_advertise().await,
"streaming.list-mints" => self.handle_streaming_list_mints().await,
"streaming.configure-mints" => self.handle_streaming_configure_mints(params).await,
"streaming.maintenance" => self.handle_streaming_maintenance().await,
// Content catalog management
"content.list-mine" => self.handle_content_list_mine().await,
"content.add" => self.handle_content_add(params).await,
@@ -201,6 +218,8 @@ impl RpcHandler {
"content.set-availability" => self.handle_content_set_availability(params).await,
"content.browse-peer" => self.handle_content_browse_peer(params).await,
"content.download-peer" => self.handle_content_download_peer(params).await,
"content.download-peer-paid" => self.handle_content_download_peer_paid(params).await,
"content.preview-peer" => self.handle_content_preview_peer(params).await,
// DWN (Decentralized Web Node)
"dwn.status" => self.handle_dwn_status().await,

View File

@@ -26,6 +26,7 @@ mod response;
mod router;
mod seed_rpc;
mod security;
mod streaming;
mod tor;
mod transport;
mod totp;

View File

@@ -0,0 +1,411 @@
//! RPC handlers for streaming ecash payments.
//!
//! Endpoints for managing priced services, processing payments,
//! checking sessions/usage, and publishing service advertisements.
use super::RpcHandler;
use crate::streaming::{advertisement, gate, meter, pricing, session};
use crate::wallet::ecash;
use anyhow::Result;
impl RpcHandler {
// ── Service pricing management ──
/// List all configured streaming services and their pricing.
pub(super) async fn handle_streaming_list_services(&self) -> Result<serde_json::Value> {
let config = pricing::load_pricing(&self.config.data_dir).await?;
Ok(serde_json::json!({
"services": config.services,
}))
}
/// Configure pricing for a streaming service.
pub(super) async fn handle_streaming_configure_service(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let service_id = params
.get("service_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing service_id"))?;
let name = params
.get("name")
.and_then(|v| v.as_str())
.unwrap_or(service_id);
let metric_str = params
.get("metric")
.and_then(|v| v.as_str())
.unwrap_or("requests");
let step_size = params
.get("step_size")
.and_then(|v| v.as_u64())
.unwrap_or(1);
let price_per_step = params
.get("price_per_step")
.and_then(|v| v.as_u64())
.unwrap_or(1);
let min_steps = params
.get("min_steps")
.and_then(|v| v.as_u64())
.unwrap_or(0);
let enabled = params
.get("enabled")
.and_then(|v| v.as_bool())
.unwrap_or(true);
let description = params
.get("description")
.and_then(|v| v.as_str())
.unwrap_or("");
let metric = match metric_str {
"bytes" => pricing::Metric::Bytes,
"milliseconds" | "time" => pricing::Metric::Milliseconds,
"requests" => pricing::Metric::Requests,
_ => return Err(anyhow::anyhow!("Invalid metric: {}", metric_str)),
};
let accepted_mints: Vec<String> = params
.get("accepted_mints")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect()
})
.unwrap_or_default();
let service = pricing::ServicePricing {
service_id: service_id.to_string(),
name: name.to_string(),
metric,
step_size,
price_per_step,
min_steps,
enabled,
description: description.to_string(),
accepted_mints,
};
service.validate()?;
let mut config = pricing::load_pricing(&self.config.data_dir).await?;
// Update existing or add new
if let Some(existing) = config
.services
.iter_mut()
.find(|s| s.service_id == service_id)
{
*existing = service.clone();
} else {
config.services.push(service.clone());
}
pricing::save_pricing(&self.config.data_dir, &config).await?;
Ok(serde_json::json!({
"service": service,
"updated": true,
}))
}
/// Enable or disable a streaming service.
pub(super) async fn handle_streaming_toggle_service(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let service_id = params
.get("service_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing service_id"))?;
let enabled = params
.get("enabled")
.and_then(|v| v.as_bool())
.ok_or_else(|| anyhow::anyhow!("Missing enabled"))?;
let mut config = pricing::load_pricing(&self.config.data_dir).await?;
if let Some(service) = config
.services
.iter_mut()
.find(|s| s.service_id == service_id)
{
service.enabled = enabled;
pricing::save_pricing(&self.config.data_dir, &config).await?;
Ok(serde_json::json!({
"service_id": service_id,
"enabled": enabled,
}))
} else {
Err(anyhow::anyhow!("Service '{}' not found", service_id))
}
}
// ── Payment processing ──
/// Process a streaming payment — submit a Cashu token for a service.
/// Returns session details with allotment on success.
pub(super) async fn handle_streaming_pay(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let service_id = params
.get("service_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing service_id"))?;
let token = params
.get("token")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing token (cashuA token string)"))?;
let peer_id = params
.get("peer_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing peer_id"))?;
if token.is_empty() {
return Err(anyhow::anyhow!("Token cannot be empty"));
}
if peer_id.is_empty() {
return Err(anyhow::anyhow!("Peer ID cannot be empty"));
}
let result =
gate::check_gate(&self.config.data_dir, peer_id, service_id, Some(token), 0).await?;
match result {
gate::GateResult::PaidAndAllowed {
session_id,
allotment,
paid_sats,
} => Ok(serde_json::json!({
"status": "paid",
"session_id": session_id,
"allotment": allotment,
"paid_sats": paid_sats,
})),
gate::GateResult::InsufficientPayment {
provided_sats,
minimum_sats,
} => Ok(serde_json::json!({
"status": "insufficient",
"error": { "code": "insufficient_payment", "message": format!("Need {} sats, got {}", minimum_sats, provided_sats) },
"minimum_sats": minimum_sats,
"provided_sats": provided_sats,
})),
gate::GateResult::PaymentFailed { reason } => Ok(serde_json::json!({
"status": "failed",
"error": { "code": "payment_failed", "message": reason },
})),
gate::GateResult::ServiceUnavailable => {
Err(anyhow::anyhow!("Service '{}' not available", service_id))
}
_ => Err(anyhow::anyhow!("Unexpected gate result")),
}
}
/// Discover available streaming services (pricing info).
/// This is the unauthenticated discovery endpoint.
pub(super) async fn handle_streaming_discover(&self) -> Result<serde_json::Value> {
let config = pricing::load_pricing(&self.config.data_dir).await?;
let accepted_mints = ecash::load_accepted_mints(&self.config.data_dir).await?;
let services: Vec<serde_json::Value> = config
.services
.iter()
.filter(|s| s.enabled)
.map(|s| {
let mints = if s.accepted_mints.is_empty() {
&accepted_mints.mints
} else {
&s.accepted_mints
};
serde_json::json!({
"service_id": s.service_id,
"name": s.name,
"description": s.description,
"metric": s.metric,
"step_size": s.step_size,
"price_per_step": s.price_per_step,
"min_steps": s.min_steps,
"minimum_sats": s.minimum_payment(),
"accepted_mints": mints,
})
})
.collect();
Ok(serde_json::json!({
"services": services,
}))
}
// ── Session management ──
/// Check usage for a peer's active session.
pub(super) async fn handle_streaming_usage(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let peer_id = params
.get("peer_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing peer_id"))?;
let service_id = params
.get("service_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing service_id"))?;
match meter::get_peer_usage(&self.config.data_dir, peer_id, service_id).await? {
Some(usage) => Ok(serde_json::json!({ "usage": usage })),
None => Ok(serde_json::json!({
"usage": null,
"message": "No active session",
})),
}
}
/// Get details of a specific session by ID.
pub(super) async fn handle_streaming_session(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let session_id = params
.get("session_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing session_id"))?;
let store = session::load_sessions(&self.config.data_dir).await?;
match store.get(session_id) {
Some(s) => Ok(serde_json::json!({ "session": s })),
None => Err(anyhow::anyhow!("Session not found")),
}
}
/// List all active streaming sessions (admin view).
pub(super) async fn handle_streaming_list_sessions(&self) -> Result<serde_json::Value> {
let store = session::load_sessions(&self.config.data_dir).await?;
let active = store.active_sessions();
let revenue = store.total_revenue();
let by_service = store.revenue_by_service();
Ok(serde_json::json!({
"sessions": active,
"total_active": active.len(),
"total_revenue_sats": revenue,
"revenue_by_service": by_service,
}))
}
/// Close a specific session.
pub(super) async fn handle_streaming_close_session(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let session_id = params
.get("session_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing session_id"))?;
let mut store = session::load_sessions(&self.config.data_dir).await?;
if let Some(s) = store.get_mut(session_id) {
s.close();
session::save_sessions(&self.config.data_dir, &store).await?;
Ok(serde_json::json!({ "closed": true }))
} else {
Err(anyhow::anyhow!("Session not found"))
}
}
// ── Advertisement ──
/// Publish a streaming service advertisement to Nostr relays.
pub(super) async fn handle_streaming_advertise(&self) -> Result<serde_json::Value> {
let config = pricing::load_pricing(&self.config.data_dir).await?;
let accepted_mints = ecash::load_accepted_mints(&self.config.data_dir).await?;
let enabled_count = config.services.iter().filter(|s| s.enabled).count();
if enabled_count == 0 {
return Err(anyhow::anyhow!(
"No enabled services to advertise"
));
}
// Get node's onion address for the endpoint tag
let onion = crate::container::docker_packages::read_tor_address("archipelago").await;
let tags = advertisement::build_advertisement_tags(
&config,
&accepted_mints.mints,
onion.as_deref(),
);
let content = advertisement::build_advertisement_content(&config);
Ok(serde_json::json!({
"kind": advertisement::KIND_SERVICE_ADVERTISEMENT,
"content": content,
"tags": tags,
"services_count": enabled_count,
"ready_to_publish": true,
}))
}
// ── Accepted mints management ──
/// List accepted mints for streaming payments.
pub(super) async fn handle_streaming_list_mints(&self) -> Result<serde_json::Value> {
let mints = ecash::load_accepted_mints(&self.config.data_dir).await?;
Ok(serde_json::json!({ "mints": mints.mints }))
}
/// Add or remove accepted mints.
pub(super) async fn handle_streaming_configure_mints(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let mints = params
.get("mints")
.and_then(|v| v.as_array())
.ok_or_else(|| anyhow::anyhow!("Missing mints array"))?;
let mint_urls: Vec<String> = mints
.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
if mint_urls.is_empty() {
return Err(anyhow::anyhow!("Must have at least one accepted mint"));
}
// Basic validation
for url in &mint_urls {
if !url.starts_with("http://") && !url.starts_with("https://") {
return Err(anyhow::anyhow!("Invalid mint URL: {}", url));
}
}
let config = ecash::AcceptedMints {
mints: mint_urls.clone(),
};
ecash::save_accepted_mints(&self.config.data_dir, &config).await?;
Ok(serde_json::json!({
"mints": mint_urls,
"updated": true,
}))
}
// ── Maintenance ──
/// Run streaming maintenance (close expired sessions, prune old records).
pub(super) async fn handle_streaming_maintenance(&self) -> Result<serde_json::Value> {
let closed = meter::maintenance(&self.config.data_dir).await?;
Ok(serde_json::json!({
"expired_closed": closed,
}))
}
}

View File

@@ -9,7 +9,8 @@ impl RpcHandler {
let wallet = ecash::load_wallet(&self.config.data_dir).await?;
Ok(serde_json::json!({
"balance_sats": wallet.balance(),
"token_count": wallet.tokens.iter().filter(|t| !t.spent).count(),
"proof_count": wallet.proofs.iter().filter(|p| !p.spent && !p.reserved).count(),
"mint_url": wallet.mint_url,
}))
}
@@ -27,10 +28,36 @@ impl RpcHandler {
return Err(anyhow::anyhow!("Amount must be between 1 and 1,000,000 sats"));
}
let token = ecash::mint_tokens(&self.config.data_dir, amount_sats).await?;
// Step 1: Get a mint quote (returns Lightning invoice)
let quote = ecash::mint_quote(&self.config.data_dir, amount_sats).await?;
Ok(serde_json::json!({
"token_id": token.id,
"amount_sats": token.amount_sats,
"quote_id": quote.quote,
"bolt11": quote.request,
"state": quote.state,
"amount_sats": amount_sats,
"message": "Pay the Lightning invoice, then call wallet.ecash-mint-claim with the quote_id",
}))
}
/// Claim minted tokens after paying the Lightning invoice.
pub(super) async fn handle_wallet_ecash_mint_claim(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let quote_id = params
.get("quote_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing quote_id"))?;
let amount_sats = params
.get("amount_sats")
.and_then(|v| v.as_u64())
.ok_or_else(|| anyhow::anyhow!("Missing amount_sats"))?;
let minted = ecash::mint_tokens(&self.config.data_dir, quote_id, amount_sats).await?;
Ok(serde_json::json!({
"minted_sats": minted,
}))
}
@@ -39,14 +66,41 @@ impl RpcHandler {
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let token_id = params
.get("token_id")
let bolt11 = params
.get("bolt11")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing token_id"))?;
.ok_or_else(|| anyhow::anyhow!("Missing bolt11 (Lightning invoice)"))?;
// Step 1: Get melt quote
let quote = ecash::melt_quote(&self.config.data_dir, bolt11).await?;
let amount = ecash::melt_tokens(&self.config.data_dir, token_id).await?;
Ok(serde_json::json!({
"melted_sats": amount,
"quote_id": quote.quote,
"amount_sats": quote.amount,
"fee_reserve_sats": quote.fee_reserve,
"total_needed_sats": quote.amount + quote.fee_reserve,
"message": "Call wallet.ecash-melt-confirm with quote_id and bolt11 to execute",
}))
}
/// Confirm and execute a melt (pay Lightning invoice with ecash).
pub(super) async fn handle_wallet_ecash_melt_confirm(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let quote_id = params
.get("quote_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing quote_id"))?;
let bolt11 = params
.get("bolt11")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing bolt11"))?;
let melted = ecash::melt_tokens(&self.config.data_dir, quote_id, bolt11).await?;
Ok(serde_json::json!({
"melted_sats": melted,
}))
}
@@ -100,6 +154,7 @@ impl RpcHandler {
"total_sats": summary.total_sats,
"content_sales_sats": summary.content_sales_sats,
"routing_fees_sats": summary.routing_fees_sats,
"streaming_revenue_sats": summary.streaming_revenue_sats,
"recent": summary.recent,
}))
}