feat(rpc): async-spawn install/uninstall/update lifecycle

Extend the async-spawn treatment previously shipped for Stop/Start/Restart
to the three remaining long-running lifecycle RPCs. Each wrapper validates
params, rejects duplicate in-flight ops, flips state to the transitional
variant (Installing/Removing/Updating), then spawns the existing inner
handler on tokio. RPC returns immediately with { status, package_id }; the
spawn task owns the terminal state write.

Install and update success arms explicitly set state=Running. The scan
loop merge (merge_preserving_transitional) refuses to overwrite
transitional states, so the spawn task must write the terminal state.
Uninstall's inner handler removes the entry entirely, so no explicit
terminal write is needed there.

Dispatcher and handler now thread self as Arc<Self> / &Arc<Self> so
spawned tasks can hold their own Arc without extra field cloning.

Transient install entry uses empty icon string. Hardcoding
/assets/img/app-icons/<id>.png 404s for apps that ship .svg or .webp
assets, which produces a broken-image flicker until the scanner refreshes
with manifest data. Empty string causes the frontend's icon computed to
fall through to the curated map, which has correct extensions.

Removed the inner "already updating" guard in update.rs — the wrapper
now owns duplicate-op detection for all three operations.
This commit is contained in:
archipelago
2026-04-23 06:57:50 -04:00
parent 4f279388a1
commit 2d5b859e18
6 changed files with 428 additions and 17 deletions

View File

@@ -321,7 +321,7 @@ impl ApiHandler {
match (method, path.as_str()) {
// RPC — auth is handled inside rpc handler per-method
(Method::POST, "/rpc/v1") => self.rpc_handler.handle(req_with_bytes).await,
(Method::POST, "/rpc/v1") => self.rpc_handler.clone().handle(req_with_bytes).await,
// Health — unauthenticated, returns JSON with service status
(Method::GET, "/health") => {

View File

@@ -1,10 +1,11 @@
use super::RpcHandler;
use anyhow::Result;
use std::sync::Arc;
impl RpcHandler {
/// Route an RPC method name to its handler, returning the result value.
pub(super) async fn dispatch(
&self,
self: &Arc<Self>,
method: &str,
params: Option<serde_json::Value>,
session_token: &Option<String>,
@@ -43,13 +44,16 @@ impl RpcHandler {
"container-logs" => self.handle_container_logs(params).await,
"container-health" => self.handle_container_health(params).await,
// Package management (for docker-compose apps)
"package.install" => self.handle_package_install(params).await,
// Package management (for docker-compose apps).
// install/uninstall/update return immediately with a
// transitional status; the actual work runs in a background
// tokio::spawn so the HTTP request doesn't block for minutes.
"package.install" => self.clone().spawn_package_install(params).await,
"package.start" => self.handle_package_start(params).await,
"package.stop" => self.handle_package_stop(params).await,
"package.restart" => self.handle_package_restart(params).await,
"package.uninstall" => self.handle_package_uninstall(params).await,
"package.update" => self.handle_package_update(params).await,
"package.uninstall" => self.clone().spawn_package_uninstall(params).await,
"package.update" => self.clone().spawn_package_update(params).await,
"app.filebrowser-token" => self.handle_filebrowser_token().await,
// Bundled app management (for pre-loaded container images)

View File

@@ -201,7 +201,10 @@ impl RpcHandler {
""
}
pub async fn handle(&self, req: Request<hyper::Body>) -> Result<Response<hyper::Body>> {
pub async fn handle(
self: Arc<Self>,
req: Request<hyper::Body>,
) -> Result<Response<hyper::Body>> {
// Extract session cookie before consuming the request
let (parts, body) = req.into_parts();
let session_token = session::extract_session_cookie(&parts.headers);
@@ -380,7 +383,7 @@ impl RpcHandler {
// Route to handler (track latency for metrics)
let rpc_start = std::time::Instant::now();
let result = self.dispatch(&rpc_req.method, params, &session_token).await;
let result = Self::dispatch(&self, &rpc_req.method, params, &session_token).await;
// Record RPC latency for monitoring
let elapsed_ms = rpc_start.elapsed().as_secs_f64() * 1000.0;

View File

@@ -0,0 +1,408 @@
//! Async wrappers for `package.install`, `package.uninstall`, `package.update`.
//!
//! The inner `handle_package_*` functions are large (install is 480 lines with
//! the stack dispatchers, update is 300, uninstall is 200) and do their own
//! fine-grained progress tracking via `install_progress` and `uninstall_stage`.
//! We wrap them rather than refactor them.
//!
//! Each wrapper:
//! 1. Parses + validates the RPC params (cheap, synchronous). Errors here
//! return immediately to the caller before any state change.
//! 2. Flips the package state to the transitional variant
//! (`Installing` / `Removing` / `Updating`) so the UI sees it on the
//! next WebSocket push (before the RPC response even lands).
//! 3. `tokio::spawn`s a background task that invokes the existing
//! `handle_package_*` method on the Arc-held self.
//! 4. On task success: no state change needed — the inner handler has
//! already written the terminal state (Running for install/update, or
//! removed the entry for uninstall).
//! 5. On task failure: revert state to the pre-transition value (or delete
//! the entry for install, since there was no pre-state), write a line
//! to the persistent install log, and clear any stale progress fields.
//! 6. Returns `{ "status": "installing" }` etc. immediately.
//!
//! The server package-scan loop's `merge_preserving_transitional` helper
//! already knows to preserve `Installing` / `Removing` / `Updating` between
//! scans, so live progress updates broadcast from inside the spawned task
//! reach the UI correctly.
use super::install::install_log;
use crate::api::rpc::RpcHandler;
use crate::data_model::PackageState;
use crate::state::StateManager;
use anyhow::Result;
use std::sync::Arc;
use tracing::{error, info, warn};
impl RpcHandler {
/// Async wrapper for `package.install`. Returns `{ "status": "installing" }`
/// immediately after flipping state to `Installing` and spawning the
/// actual install pipeline. On failure, removes the package entry from
/// state so the UI reverts to "not installed".
pub(in crate::api::rpc) async fn spawn_package_install(
self: Arc<Self>,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
// Extract + validate package_id synchronously so bad params fail
// fast without touching state.
let params_val = params
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let package_id = params_val
.get("id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing package id"))?
.to_string();
super::validation::validate_app_id(&package_id)?;
// Reject if already in a transitional lifecycle (prevents double-click
// queuing two installs on the same package).
{
let (data, _) = self.state_manager.get_snapshot().await;
if let Some(entry) = data.package_data.get(&package_id) {
if matches!(
entry.state,
PackageState::Installing
| PackageState::Removing
| PackageState::Updating
) {
return Err(anyhow::anyhow!(
"{} is already {:?}",
package_id,
entry.state
));
}
}
}
// Flip state to Installing BEFORE the spawn so the first WebSocket
// push carries the transitional state. Uses the same
// `create_installing_entry` path the inner handler would use once
// it starts pulling, so the UI sees a consistent shape.
flip_to_installing(&self.state_manager, &package_id).await;
install_log(&format!("INSTALL SPAWN: {}", package_id)).await;
let handler = Arc::clone(&self);
let package_id_spawn = package_id.clone();
tokio::spawn(async move {
match handler.handle_package_install(params).await {
Ok(_) => {
info!("package.install {}: complete", package_id_spawn);
// The install pipeline has verified the container is up
// and healthy (see install.rs post-start exit check).
// We MUST explicitly transition out of Installing here:
// `merge_preserving_transitional` in the package-scan
// loop treats Installing as RPC-owned and refuses to
// let the scanner overwrite it with the observed
// Running state. Without this write, the entry stays
// stuck at Installing forever.
set_package_state(
&handler.state_manager,
&package_id_spawn,
PackageState::Running,
)
.await;
handler.clear_install_progress(&package_id_spawn).await;
}
Err(e) => {
error!("package.install {} failed: {:#}", package_id_spawn, e);
install_log(&format!("INSTALL FAIL: {}{:#}", package_id_spawn, e))
.await;
// No pre-state to revert to — remove the entry entirely so
// the UI shows the app as not installed. The next package
// scan will re-create it only if podman actually has a
// container for it (partial install recovery).
remove_package_entry(&handler.state_manager, &package_id_spawn).await;
}
}
});
Ok(serde_json::json!({
"status": "installing",
"package_id": package_id,
}))
}
/// Async wrapper for `package.uninstall`. Returns `{ "status": "removing" }`
/// immediately. State stays `Removing` until the inner handler finishes
/// (including the `sudo rm -rf` of app data, which can take minutes for
/// bitcoin-core's chainstate). On failure, reverts to the pre-transition
/// state (usually Running or Stopped) so the user can retry.
pub(in crate::api::rpc) async fn spawn_package_uninstall(
self: Arc<Self>,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params_val = params
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let package_id = params_val
.get("id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing package id"))?
.to_string();
super::validation::validate_app_id(&package_id)?;
// Reject if already in a transitional lifecycle.
{
let (data, _) = self.state_manager.get_snapshot().await;
if let Some(entry) = data.package_data.get(&package_id) {
if matches!(
entry.state,
PackageState::Installing
| PackageState::Removing
| PackageState::Updating
) {
return Err(anyhow::anyhow!(
"{} is already {:?}",
package_id,
entry.state
));
}
}
}
let pre_state =
flip_package_state(&self.state_manager, &package_id, PackageState::Removing).await;
install_log(&format!("UNINSTALL SPAWN: {}", package_id)).await;
let handler = Arc::clone(&self);
let package_id_spawn = package_id.clone();
tokio::spawn(async move {
match handler.handle_package_uninstall(params).await {
Ok(_) => {
info!("package.uninstall {}: complete", package_id_spawn);
// Inner handler already removed the package entry on
// success. Nothing more to do here.
}
Err(e) => {
error!("package.uninstall {} failed: {:#}", package_id_spawn, e);
install_log(&format!(
"UNINSTALL FAIL: {}{:#}",
package_id_spawn, e
))
.await;
// Revert to pre-transition state so the user can retry.
// Also clear any stale uninstall_stage label.
if let Some(prev) = pre_state {
set_package_state_and_clear_uninstall_stage(
&handler.state_manager,
&package_id_spawn,
prev,
)
.await;
}
}
}
});
Ok(serde_json::json!({
"status": "removing",
"package_id": package_id,
}))
}
/// Async wrapper for `package.update`. Returns `{ "status": "updating" }`
/// immediately. The inner handler already manages its own rollback on
/// failure (restarts old containers); this wrapper just flips state and
/// spawns.
pub(in crate::api::rpc) async fn spawn_package_update(
self: Arc<Self>,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params_val = params
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let package_id = params_val
.get("id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing package id"))?
.to_string();
super::validation::validate_app_id(&package_id)?;
// Reject if already in a transitional lifecycle.
{
let (data, _) = self.state_manager.get_snapshot().await;
if let Some(entry) = data.package_data.get(&package_id) {
if matches!(
entry.state,
PackageState::Installing
| PackageState::Removing
| PackageState::Updating
) {
return Err(anyhow::anyhow!(
"{} is already {:?}",
package_id,
entry.state
));
}
}
}
// The inner handler flips state to Updating itself, but we do it
// here too so the transitional state lands before the spawn yields.
let pre_state =
flip_package_state(&self.state_manager, &package_id, PackageState::Updating).await;
install_log(&format!("UPDATE SPAWN: {}", package_id)).await;
let handler = Arc::clone(&self);
let package_id_spawn = package_id.clone();
tokio::spawn(async move {
match handler.handle_package_update(params).await {
Ok(_) => {
info!("package.update {}: complete", package_id_spawn);
// Same reasoning as install: the merge_preserving_transitional
// helper treats Updating as RPC-owned, so we MUST write the
// terminal Running state ourselves or the entry will stay
// stuck at Updating forever. The update pipeline has
// already verified the new container is running via its
// post-recreate check.
set_package_state(
&handler.state_manager,
&package_id_spawn,
PackageState::Running,
)
.await;
}
Err(e) => {
error!("package.update {} failed: {:#}", package_id_spawn, e);
install_log(&format!("UPDATE FAIL: {}{:#}", package_id_spawn, e))
.await;
// Inner handler already ran rollback_update + cleared
// update state, but be defensive: revert to pre-state
// in case the inner flow died before its cleanup.
if let Some(prev) = pre_state {
set_package_state(&handler.state_manager, &package_id_spawn, prev)
.await;
}
}
}
});
Ok(serde_json::json!({
"status": "updating",
"package_id": package_id,
}))
}
}
// ---------------------------------------------------------------------------
// State-manager helpers (free fns, usable from inside spawned tasks)
// ---------------------------------------------------------------------------
/// Create or update the entry for this package with `Installing` state.
/// Matches what the inner handler's `set_install_progress` would do on first
/// call, but fires before the spawn so the UI sees it immediately.
async fn flip_to_installing(state_manager: &StateManager, package_id: &str) {
use crate::data_model::{Description, Manifest, PackageDataEntry, StaticFiles};
let (mut data, _) = state_manager.get_snapshot().await;
let entry = data
.package_data
.entry(package_id.to_string())
.or_insert_with(|| PackageDataEntry {
state: PackageState::Installing,
health: None,
exit_code: None,
static_files: StaticFiles {
license: String::new(),
instructions: String::new(),
// Leave icon empty during the transient Installing window:
// hardcoding `<id>.png` is wrong for ~half our apps (many use
// `.svg` / `.webp`), producing a broken-image flicker until
// the scanner refreshes the entry. The frontend's `icon`
// computed falls through to `curatedMap.get(id)?.icon` which
// has the correct extensions for known apps.
icon: String::new(),
},
manifest: Manifest {
id: package_id.to_string(),
title: package_id.to_string(),
version: String::new(),
description: Description {
short: "Installing...".to_string(),
long: String::new(),
},
release_notes: String::new(),
license: String::new(),
wrapper_repo: String::new(),
upstream_repo: String::new(),
support_site: String::new(),
marketing_site: String::new(),
donation_url: None,
author: None,
website: None,
interfaces: None,
tier: None,
},
installed: None,
install_progress: None,
uninstall_stage: None,
available_update: None,
});
entry.state = PackageState::Installing;
state_manager.update_data(data).await;
}
/// Flip an existing entry's state and return the pre-flip value (or None if
/// no entry existed). Used for revert-on-failure.
async fn flip_package_state(
state_manager: &StateManager,
package_id: &str,
new_state: PackageState,
) -> Option<PackageState> {
let (mut data, _) = state_manager.get_snapshot().await;
let prev = data.package_data.get(package_id).map(|e| e.state.clone());
if let Some(entry) = data.package_data.get_mut(package_id) {
entry.state = new_state;
state_manager.update_data(data).await;
} else {
warn!(
"flip_package_state: no entry for {} — cannot flip",
package_id
);
}
prev
}
/// Set state unconditionally (no-op if entry no longer exists).
async fn set_package_state(
state_manager: &StateManager,
package_id: &str,
new_state: PackageState,
) {
let (mut data, _) = state_manager.get_snapshot().await;
if let Some(entry) = data.package_data.get_mut(package_id) {
if entry.state != new_state {
entry.state = new_state;
state_manager.update_data(data).await;
}
}
}
/// Set state and clear the uninstall_stage label. Used when an uninstall
/// fails and we revert — the user doesn't want a stale "Removing app data"
/// message sitting on a Running entry.
async fn set_package_state_and_clear_uninstall_stage(
state_manager: &StateManager,
package_id: &str,
new_state: PackageState,
) {
let (mut data, _) = state_manager.get_snapshot().await;
if let Some(entry) = data.package_data.get_mut(package_id) {
entry.state = new_state;
entry.uninstall_stage = None;
state_manager.update_data(data).await;
}
}
/// Remove a package entry from state. Used for install-failure cleanup
/// (since there's no pre-state to revert to — the entry was created
/// speculatively when we flipped to Installing).
async fn remove_package_entry(state_manager: &StateManager, package_id: &str) {
let (mut data, _) = state_manager.get_snapshot().await;
if data.package_data.remove(package_id).is_some() {
state_manager.update_data(data).await;
}
}

View File

@@ -1,3 +1,4 @@
mod async_lifecycle;
mod config;
mod dependencies;
mod install;

View File

@@ -34,15 +34,10 @@ impl RpcHandler {
let pinned = image_versions::pinned_image_for_app(package_id)
.ok_or_else(|| anyhow::anyhow!("No pinned image found for {}", package_id))?;
// Reject if already updating
{
let (data, _) = self.state_manager.get_snapshot().await;
if let Some(entry) = data.package_data.get(package_id) {
if entry.state == PackageState::Updating {
return Err(anyhow::anyhow!("{} is already updating", package_id));
}
}
}
// Note: the `already updating` guard lives in `spawn_package_update`
// (the async wrapper that dispatch actually routes to). By the time
// this inner function runs, the wrapper has already flipped state to
// `Updating`, so duplicating the check here would be a false positive.
install_log(&format!("UPDATE: {}{}", package_id, pinned)).await;