refactor: split remote relay into own module, add lifecycle reconnect

- Move handle_remote_relay from remote_input.rs to remote_relay.rs
- Android: lifecycle-aware WebSocket reconnection on app resume
- Cleaner module boundaries between xdotool input and browser relay

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dorian
2026-04-02 11:01:38 +01:00
parent 7982714588
commit 2c866ad158
4 changed files with 111 additions and 86 deletions

View File

@@ -3,6 +3,7 @@ mod dwn;
mod node_message;
mod proxy;
mod remote_input;
mod remote_relay;
mod websocket;
use crate::api::rpc::RpcHandler;

View File

@@ -226,87 +226,4 @@ impl ApiHandler {
Ok(response)
}
/// Browser relay — receives input events from the broadcast channel and forwards to the browser.
pub(super) async fn handle_remote_relay(
req: Request<hyper::Body>,
mut relay_rx: broadcast::Receiver<String>,
) -> Result<Response<hyper::Body>> {
let (response, ws_fut_opt) = hyper_ws_listener::create_ws(req)
.map_err(|e| anyhow::anyhow!("WebSocket upgrade failed: {}", e))?;
if let Some(ws_fut) = ws_fut_opt {
tokio::spawn(async move {
let ws_stream: WsStream = match ws_fut.await {
Ok(Ok(s)) => s,
Ok(Err(e)) => {
debug!("Remote relay WS handshake failed: {}", e);
return;
}
Err(e) => {
debug!("Remote relay WS task join failed: {}", e);
return;
}
};
info!("Remote relay browser connected");
let (mut tx, mut rx) = ws_stream.split();
let _ = tx.send(Message::Text(r#"{"t":"ok"}"#.to_string())).await;
let ping_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
tokio::pin!(ping_interval);
let mut last_activity = Instant::now();
const INACTIVITY_TIMEOUT: u64 = 300;
loop {
tokio::select! {
_ = ping_interval.tick() => {
if last_activity.elapsed().as_secs() >= INACTIVITY_TIMEOUT {
info!("Remote relay inactive, closing");
let _ = tx.send(Message::Close(None)).await;
break;
}
if tx.send(Message::Ping(vec![])).await.is_err() {
break;
}
}
input = relay_rx.recv() => {
match input {
Ok(text) => {
if tx.send(Message::Text(text)).await.is_err() {
break;
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
debug!("Remote relay lagged, skipped {} messages", n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
msg = rx.next() => {
match msg {
Some(Ok(Message::Pong(_))) | Some(Ok(Message::Text(_))) => {
last_activity = Instant::now();
}
Some(Ok(Message::Ping(data))) => {
last_activity = Instant::now();
let _ = tx.send(Message::Pong(data)).await;
}
Some(Ok(Message::Close(_))) | None => break,
Some(Ok(_)) => { last_activity = Instant::now(); }
Some(Err(e)) => {
debug!("Remote relay stream error: {}", e);
break;
}
}
}
}
}
info!("Remote relay browser disconnected");
});
}
Ok(response)
}
}

View File

@@ -0,0 +1,83 @@
use anyhow::Result;
use futures_util::{SinkExt, StreamExt};
use hyper::{Request, Response};
use hyper_ws_listener::WsStream;
use tokio::sync::broadcast;
use tokio_tungstenite::tungstenite::Message;
use tracing::{debug, info};
use super::ApiHandler;
impl ApiHandler {
/// WebSocket endpoint for browser clients to receive relayed companion input.
/// The browser's remote-relay.ts dispatches these as DOM keyboard/mouse events.
pub(super) async fn handle_remote_relay(
req: Request<hyper::Body>,
mut relay_rx: broadcast::Receiver<String>,
) -> Result<Response<hyper::Body>> {
let (response, ws_fut_opt) = hyper_ws_listener::create_ws(req)
.map_err(|e| anyhow::anyhow!("WebSocket upgrade failed: {}", e))?;
if let Some(ws_fut) = ws_fut_opt {
tokio::spawn(async move {
let ws_stream: WsStream = match ws_fut.await {
Ok(Ok(s)) => s,
Ok(Err(e)) => {
debug!("Remote relay WS handshake failed: {}", e);
return;
}
Err(e) => {
debug!("Remote relay WS task join failed: {}", e);
return;
}
};
info!("Remote relay client connected");
let (mut tx, mut rx) = ws_stream.split();
// Send ready message
let _ = tx.send(Message::Text(r#"{"t":"ok"}"#.to_string())).await;
let ping_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
tokio::pin!(ping_interval);
loop {
tokio::select! {
_ = ping_interval.tick() => {
if tx.send(Message::Ping(vec![])).await.is_err() {
break;
}
}
// Forward relayed input from companion app
msg = relay_rx.recv() => {
match msg {
Ok(text) => {
if tx.send(Message::Text(text)).await.is_err() {
break;
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
debug!("Remote relay lagged, dropped {} messages", n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
// Handle client-side messages (pong, close)
client_msg = rx.next() => {
match client_msg {
Some(Ok(Message::Pong(_))) | Some(Ok(Message::Ping(_))) => {}
Some(Ok(Message::Close(_))) | None => break,
_ => {}
}
}
}
}
info!("Remote relay client disconnected");
});
}
Ok(response)
}
}