This commit is contained in:
Dorian
2026-01-27 22:37:08 +00:00
parent 8373a8c9be
commit 4126aa0b33
4 changed files with 143 additions and 25 deletions

View File

@@ -1,14 +1,15 @@
use crate::api::rpc::RpcHandler;
use crate::config::Config;
use anyhow::Result;
use futures_util::{SinkExt, StreamExt};
use hyper::{Method, Request, Response, StatusCode};
use hyper_tungstenite::tungstenite::Message;
use std::sync::Arc;
use tracing::debug;
use tracing::{debug, error, info};
pub struct ApiHandler {
_config: Config,
rpc_handler: Arc<RpcHandler>,
// Add other handlers here (websocket, static files, etc.)
}
impl ApiHandler {
@@ -25,37 +26,73 @@ impl ApiHandler {
&self,
req: Request<hyper::Body>,
) -> Result<Response<hyper::Body>> {
// Extract path and method before consuming req
let path = req.uri().path().to_string();
let method = req.method().clone();
// Convert body to bytes
// WebSocket upgrade must be handled before consuming the body
if method == Method::GET && path == "/ws/db" {
return Self::handle_websocket(req).await;
}
// Convert body to bytes for non-WS routes
let (parts, body) = req.into_parts();
let body_bytes = hyper::body::to_bytes(body).await
.map_err(|e| anyhow::anyhow!("Failed to read body: {}", e))?;
// Reconstruct request with body as Bytes for RPC handler
let req_with_bytes = Request::from_parts(parts, hyper::Body::from(body_bytes));
debug!("{} {}", method, path);
// Route requests
match (method, path.as_str()) {
(Method::POST, "/rpc/v1") => {
self.rpc_handler.handle(req_with_bytes).await
}
(Method::GET, "/health") => {
Ok(Response::builder()
.status(StatusCode::OK)
.body(hyper::Body::from("OK"))
.unwrap())
}
_ => {
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(hyper::Body::from("Not Found"))
.unwrap())
}
(Method::POST, "/rpc/v1") => self.rpc_handler.handle(req_with_bytes).await,
(Method::GET, "/health") => Ok(Response::builder()
.status(StatusCode::OK)
.body(hyper::Body::from("OK"))
.unwrap()),
_ => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(hyper::Body::from("Not Found"))
.unwrap()),
}
}
async fn handle_websocket(
req: Request<hyper::Body>,
) -> Result<Response<hyper::Body>> {
if !hyper_tungstenite::is_upgrade_request(&req) {
return Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(hyper::Body::from("Expected WebSocket upgrade"))?);
}
let (response, ws_fut) = hyper_tungstenite::upgrade(req, None)
.map_err(|e| anyhow::anyhow!("WebSocket upgrade failed: {}", e))?;
// Spawn a task to hold the connection open and avoid EPIPE when the UI connects
tokio::spawn(async move {
let mut ws_stream = match ws_fut.await {
Ok(s) => s,
Err(e) => {
error!("WebSocket handshake failed: {}", e);
return;
}
};
info!("WebSocket /ws/db connected");
// Keep connection open; UI may send/receive JSON patches. For now just accept and ignore.
while let Some(msg) = ws_stream.next().await {
match msg {
Ok(Message::Close(_)) => break,
Ok(Message::Ping(data)) => {
let _ = ws_stream.send(Message::Pong(data)).await;
}
Ok(_) => {}
Err(e) => {
debug!("WebSocket stream error: {}", e);
break;
}
}
}
});
Ok(response)
}
}