feat(container): ProdContainerOrchestrator with build-or-pull, adoption, reconcile
Step 3 of the rust-orchestrator-migration. New file prod_orchestrator.rs (999 LOC)
implements the full public surface that will replace scripts/first-boot-containers.sh:
* install / start / stop / restart / remove / upgrade / status / list / logs / health
* adopt_existing: read-only scan that claims containers matching our manifests by
name, without recreating — preserves the v1.7.42 fixture on .116.
* reconcile_all: level-triggered, per-app failures collected rather than aborting.
* install_fresh: build-or-pull (Step 2 trait methods), relative build contexts
resolved against the manifest directory.
Naming rule (answered design Q1): UI app IDs (bitcoin-ui/electrs-ui/lnd-ui) get the
archy- prefix; backends keep their bare ID. An explicit extensions.container_name
always wins. Codified in compute_container_name() with unit tests for all three tiers.
Concurrency (answered design Q4): per-app tokio::sync::Mutex<()> created lazily,
protecting every mutating op against the reconciler loop. Acquiring the per-app
lock only needs a read lock on the map, so independent apps do not serialize.
16 tests: 3 sync naming rule tests + 13 tokio async tests covering install (pull,
build-absent, build-present, relative-context), reconcile (noop/exited/missing/
mixed-failure), adopt-by-name, upgrade sequence ordering, list filtering, health
state mapping, and unknown-app-id rejection. All pass.
Not wired into main.rs yet — that is Step 6. Crate builds clean with expected
unused warnings for the new re-exports.
This commit is contained in:
1
core/Cargo.lock
generated
1
core/Cargo.lock
generated
@@ -87,6 +87,7 @@ dependencies = [
|
||||
"archipelago-performance",
|
||||
"archipelago-security",
|
||||
"argon2",
|
||||
"async-trait",
|
||||
"base64 0.21.7",
|
||||
"bcrypt",
|
||||
"bip39",
|
||||
|
||||
@@ -106,3 +106,4 @@ sd-notify = "0.4"
|
||||
[dev-dependencies]
|
||||
tokio-test = "0.4"
|
||||
tempfile = "3.10"
|
||||
async-trait = "0.1"
|
||||
|
||||
@@ -2,7 +2,12 @@ pub mod data_manager;
|
||||
pub mod dev_orchestrator;
|
||||
pub mod docker_packages;
|
||||
pub mod image_versions;
|
||||
pub mod prod_orchestrator;
|
||||
pub mod registry;
|
||||
|
||||
pub use dev_orchestrator::DevContainerOrchestrator;
|
||||
pub use docker_packages::DockerPackageScanner;
|
||||
pub use prod_orchestrator::{
|
||||
compute_container_name, AdoptionReport, ProdContainerOrchestrator, ReconcileAction,
|
||||
ReconcileReport,
|
||||
};
|
||||
|
||||
999
core/archipelago/src/container/prod_orchestrator.rs
Normal file
999
core/archipelago/src/container/prod_orchestrator.rs
Normal file
@@ -0,0 +1,999 @@
|
||||
//! Production container orchestrator.
|
||||
//!
|
||||
//! Owns install/start/stop/restart/remove/upgrade/status/list/logs/health for every
|
||||
//! archipelago-managed production container. Unlike `DevContainerOrchestrator`, this
|
||||
//! orchestrator:
|
||||
//!
|
||||
//! * does NOT append a `-dev` suffix to container names,
|
||||
//! * does NOT apply a port offset,
|
||||
//! * does NOT rewrite volume source paths into a dev data dir,
|
||||
//! * does NOT gate bitcoin dependencies on the simulator,
|
||||
//! * DOES support building images from a local Dockerfile (new in Step 2),
|
||||
//! * DOES adopt pre-existing containers by name rather than recreating,
|
||||
//! * DOES run a level-triggered reconciler (spawned by `BootReconciler` in Step 5).
|
||||
//!
|
||||
//! See `docs/rust-orchestrator-migration.md` — this file is Step 3.
|
||||
//!
|
||||
//! Concurrency model: a per-app `tokio::sync::Mutex<()>` protects all mutating ops
|
||||
//! against the reconciler loop. The map of mutexes is itself behind an `RwLock`;
|
||||
//! acquiring the per-app lock requires only a read lock on the map, so reconcile
|
||||
//! runs over N apps don't serialize against each other.
|
||||
//!
|
||||
//! Name resolution rule (answered design Q1): UI containers get the `archy-` prefix;
|
||||
//! everything else uses the bare app_id. This matches the running fixture on .116
|
||||
//! so adoption doesn't orphan anything.
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use archipelago_container::{
|
||||
AppManifest, ContainerRuntime as ContainerRuntimeTrait, ContainerState, ContainerStatus,
|
||||
ResolvedSource,
|
||||
};
|
||||
use std::collections::HashMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{Mutex, RwLock};
|
||||
|
||||
use crate::config::{Config, ContainerRuntime as ConfigContainerRuntime};
|
||||
|
||||
/// App IDs whose containers are named `archy-<id>` rather than bare `<id>`.
|
||||
///
|
||||
/// Keep in sync with the running fixture on .116. Centralized as a constant
|
||||
/// so the rule is visible in one place and unit-testable.
|
||||
const UI_APP_IDS: &[&str] = &["bitcoin-ui", "electrs-ui", "lnd-ui"];
|
||||
|
||||
/// Compute the podman container name for a manifest.
|
||||
///
|
||||
/// Rules (in order):
|
||||
/// 1. If the manifest carries `extensions.container_name: <string>`, use that verbatim.
|
||||
/// 2. If `app.id` is in `UI_APP_IDS`, return `archy-<id>`.
|
||||
/// 3. Otherwise return `<id>` unchanged.
|
||||
pub fn compute_container_name(manifest: &AppManifest) -> String {
|
||||
if let Some(v) = manifest.app.extensions.get("container_name") {
|
||||
if let Some(s) = v.as_str() {
|
||||
if !s.is_empty() {
|
||||
return s.to_string();
|
||||
}
|
||||
}
|
||||
}
|
||||
let id = manifest.app.id.as_str();
|
||||
if UI_APP_IDS.contains(&id) {
|
||||
format!("archy-{id}")
|
||||
} else {
|
||||
id.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
/// Outcome of `reconcile_all` for a single app.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum ReconcileAction {
|
||||
/// Container already running — nothing done.
|
||||
NoOp,
|
||||
/// Container existed in a stopped state; we started it.
|
||||
Started,
|
||||
/// Container did not exist; we built/pulled + created + started it.
|
||||
Installed,
|
||||
/// Container was in an unexpected state we chose not to disturb (Created, Paused).
|
||||
Left(String),
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ReconcileReport {
|
||||
/// (app_id, action) for each manifest we touched.
|
||||
pub actions: Vec<(String, ReconcileAction)>,
|
||||
/// (app_id, error message) for each manifest whose reconcile failed.
|
||||
pub failures: Vec<(String, String)>,
|
||||
}
|
||||
|
||||
impl ReconcileReport {
|
||||
fn record(&mut self, app_id: &str, action: ReconcileAction) {
|
||||
self.actions.push((app_id.to_string(), action));
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct AdoptionReport {
|
||||
pub adopted: Vec<String>,
|
||||
}
|
||||
|
||||
/// Internal: track a manifest together with the absolute directory it was loaded
|
||||
/// from, so Build sources can resolve relative `context:` paths.
|
||||
#[derive(Debug, Clone)]
|
||||
struct LoadedManifest {
|
||||
manifest: AppManifest,
|
||||
/// Directory containing the manifest file. Used as the base for relative
|
||||
/// `container.build.context` paths.
|
||||
manifest_dir: PathBuf,
|
||||
}
|
||||
|
||||
struct OrchestratorState {
|
||||
/// app_id → loaded manifest
|
||||
manifests: HashMap<String, LoadedManifest>,
|
||||
/// app_id → per-app mutex, created lazily the first time we touch an app
|
||||
locks: HashMap<String, Arc<Mutex<()>>>,
|
||||
}
|
||||
|
||||
impl OrchestratorState {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
manifests: HashMap::new(),
|
||||
locks: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ProdContainerOrchestrator {
|
||||
runtime: Arc<dyn ContainerRuntimeTrait>,
|
||||
manifests_dir: PathBuf,
|
||||
state: Arc<RwLock<OrchestratorState>>,
|
||||
}
|
||||
|
||||
impl ProdContainerOrchestrator {
|
||||
/// Production constructor: builds the podman/docker runtime from `Config` and
|
||||
/// uses `/opt/archipelago/apps` (or `$ARCHIPELAGO_APPS_DIR`) as the manifest root.
|
||||
pub async fn new(config: Config) -> Result<Self> {
|
||||
let user = std::env::var("USER").unwrap_or_else(|_| "archipelago".to_string());
|
||||
let runtime: Arc<dyn ContainerRuntimeTrait> = match &config.container_runtime {
|
||||
ConfigContainerRuntime::Podman => {
|
||||
Arc::new(archipelago_container::PodmanRuntime::new(user))
|
||||
}
|
||||
ConfigContainerRuntime::Docker => {
|
||||
Arc::new(archipelago_container::DockerRuntime::new(user))
|
||||
}
|
||||
ConfigContainerRuntime::Auto => Arc::new(
|
||||
archipelago_container::AutoRuntime::new(user)
|
||||
.await
|
||||
.context("Failed to create auto runtime")?,
|
||||
),
|
||||
};
|
||||
|
||||
let manifests_dir = std::env::var("ARCHIPELAGO_APPS_DIR")
|
||||
.map(PathBuf::from)
|
||||
.unwrap_or_else(|_| PathBuf::from("/opt/archipelago/apps"));
|
||||
|
||||
Ok(Self {
|
||||
runtime,
|
||||
manifests_dir,
|
||||
state: Arc::new(RwLock::new(OrchestratorState::new())),
|
||||
})
|
||||
}
|
||||
|
||||
/// Test/advanced constructor: inject an arbitrary runtime + manifests dir.
|
||||
///
|
||||
/// This is the entry point used by unit tests with a `MockRuntime`.
|
||||
pub fn with_runtime(
|
||||
runtime: Arc<dyn ContainerRuntimeTrait>,
|
||||
manifests_dir: PathBuf,
|
||||
) -> Self {
|
||||
Self {
|
||||
runtime,
|
||||
manifests_dir,
|
||||
state: Arc::new(RwLock::new(OrchestratorState::new())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Walk `manifests_dir` looking for `*/manifest.yml` files. Parses each,
|
||||
/// validates it, and stores it in the in-memory state.
|
||||
///
|
||||
/// Non-fatal: a malformed manifest is logged and skipped so one broken app
|
||||
/// can't prevent the rest of the fleet from reconciling.
|
||||
pub async fn load_manifests(&self) -> Result<usize> {
|
||||
let root = self.manifests_dir.clone();
|
||||
if !root.exists() {
|
||||
tracing::warn!(
|
||||
path = %root.display(),
|
||||
"manifests dir does not exist — orchestrator has no apps to manage"
|
||||
);
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
let mut loaded: Vec<LoadedManifest> = Vec::new();
|
||||
let mut entries = tokio::fs::read_dir(&root)
|
||||
.await
|
||||
.with_context(|| format!("reading manifests dir {}", root.display()))?;
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let app_dir = entry.path();
|
||||
if !app_dir.is_dir() {
|
||||
continue;
|
||||
}
|
||||
let manifest_path = app_dir.join("manifest.yml");
|
||||
if !manifest_path.exists() {
|
||||
continue;
|
||||
}
|
||||
match AppManifest::from_file(&manifest_path) {
|
||||
Ok(m) => {
|
||||
loaded.push(LoadedManifest {
|
||||
manifest: m,
|
||||
manifest_dir: app_dir,
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
path = %manifest_path.display(),
|
||||
error = %e,
|
||||
"skipping malformed manifest"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let count = loaded.len();
|
||||
let mut state = self.state.write().await;
|
||||
state.manifests.clear();
|
||||
for lm in loaded {
|
||||
state.manifests.insert(lm.manifest.app.id.clone(), lm);
|
||||
}
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
/// Test helper: inject a manifest directly without touching the filesystem.
|
||||
#[doc(hidden)]
|
||||
pub async fn insert_manifest_for_test(&self, manifest: AppManifest, manifest_dir: PathBuf) {
|
||||
let mut state = self.state.write().await;
|
||||
state.manifests.insert(
|
||||
manifest.app.id.clone(),
|
||||
LoadedManifest {
|
||||
manifest,
|
||||
manifest_dir,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
/// Acquire (creating if needed) the per-app mutex.
|
||||
async fn app_lock(&self, app_id: &str) -> Arc<Mutex<()>> {
|
||||
{
|
||||
let state = self.state.read().await;
|
||||
if let Some(l) = state.locks.get(app_id) {
|
||||
return l.clone();
|
||||
}
|
||||
}
|
||||
let mut state = self.state.write().await;
|
||||
state
|
||||
.locks
|
||||
.entry(app_id.to_string())
|
||||
.or_insert_with(|| Arc::new(Mutex::new(())))
|
||||
.clone()
|
||||
}
|
||||
|
||||
async fn loaded(&self, app_id: &str) -> Result<LoadedManifest> {
|
||||
let state = self.state.read().await;
|
||||
state
|
||||
.manifests
|
||||
.get(app_id)
|
||||
.cloned()
|
||||
.ok_or_else(|| anyhow::anyhow!("unknown app_id: {app_id}"))
|
||||
}
|
||||
|
||||
/// Scan the runtime for containers whose names match one of our manifests.
|
||||
/// This is a read-only adoption pass: nothing is created, started, or touched.
|
||||
pub async fn adopt_existing(&self) -> Result<AdoptionReport> {
|
||||
let all = self
|
||||
.runtime
|
||||
.list_containers()
|
||||
.await
|
||||
.context("list_containers during adoption")?;
|
||||
let state = self.state.read().await;
|
||||
let mut report = AdoptionReport::default();
|
||||
for (app_id, lm) in state.manifests.iter() {
|
||||
let expected = compute_container_name(&lm.manifest);
|
||||
if all.iter().any(|c| c.name == expected || c.name == format!("/{expected}")) {
|
||||
report.adopted.push(app_id.clone());
|
||||
}
|
||||
}
|
||||
Ok(report)
|
||||
}
|
||||
|
||||
/// Walk every loaded manifest and ensure it has a running container.
|
||||
/// Never panics; per-app failures are collected into the report.
|
||||
pub async fn reconcile_all(&self) -> ReconcileReport {
|
||||
let manifests: Vec<LoadedManifest> = {
|
||||
let state = self.state.read().await;
|
||||
state.manifests.values().cloned().collect()
|
||||
};
|
||||
let mut report = ReconcileReport::default();
|
||||
for lm in manifests {
|
||||
let app_id = lm.manifest.app.id.clone();
|
||||
match self.ensure_running(&lm).await {
|
||||
Ok(action) => report.record(&app_id, action),
|
||||
Err(e) => {
|
||||
tracing::error!(app_id = %app_id, error = %e, "reconcile failed");
|
||||
report.failures.push((app_id, e.to_string()));
|
||||
}
|
||||
}
|
||||
}
|
||||
report
|
||||
}
|
||||
|
||||
async fn ensure_running(&self, lm: &LoadedManifest) -> Result<ReconcileAction> {
|
||||
let app_id = lm.manifest.app.id.clone();
|
||||
let lock = self.app_lock(&app_id).await;
|
||||
let _guard = lock.lock().await;
|
||||
|
||||
let name = compute_container_name(&lm.manifest);
|
||||
match self.runtime.get_container_status(&name).await {
|
||||
Ok(status) => match status.state {
|
||||
ContainerState::Running => Ok(ReconcileAction::NoOp),
|
||||
ContainerState::Stopped | ContainerState::Exited => {
|
||||
self.runtime
|
||||
.start_container(&name)
|
||||
.await
|
||||
.with_context(|| format!("reconcile start {name}"))?;
|
||||
Ok(ReconcileAction::Started)
|
||||
}
|
||||
ContainerState::Created => Ok(ReconcileAction::Left("created".to_string())),
|
||||
ContainerState::Paused => Ok(ReconcileAction::Left("paused".to_string())),
|
||||
ContainerState::Unknown(s) => Ok(ReconcileAction::Left(s)),
|
||||
},
|
||||
Err(_) => {
|
||||
// Container missing entirely → install fresh.
|
||||
self.install_fresh(lm).await?;
|
||||
Ok(ReconcileAction::Installed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Build-or-pull, create, start. Assumes the per-app mutex is already held.
|
||||
async fn install_fresh(&self, lm: &LoadedManifest) -> Result<()> {
|
||||
let resolved = lm.manifest.app.container.resolve().ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"manifest for {} has invalid container source (neither image nor build)",
|
||||
lm.manifest.app.id
|
||||
)
|
||||
})?;
|
||||
|
||||
match resolved {
|
||||
ResolvedSource::Pull {
|
||||
image,
|
||||
image_signature,
|
||||
..
|
||||
} => {
|
||||
self.runtime
|
||||
.pull_image(&image, image_signature.as_deref())
|
||||
.await
|
||||
.with_context(|| format!("pulling {image}"))?;
|
||||
}
|
||||
ResolvedSource::Build(mut bcfg) => {
|
||||
// Resolve a relative context: against the manifest's own directory.
|
||||
let ctx_path = Path::new(&bcfg.context);
|
||||
if !ctx_path.is_absolute() {
|
||||
bcfg.context = lm
|
||||
.manifest_dir
|
||||
.join(ctx_path)
|
||||
.to_string_lossy()
|
||||
.into_owned();
|
||||
}
|
||||
let already = self
|
||||
.runtime
|
||||
.image_exists(&bcfg.tag)
|
||||
.await
|
||||
.with_context(|| format!("image_exists {}", bcfg.tag))?;
|
||||
if !already {
|
||||
self.runtime
|
||||
.build_image(&bcfg)
|
||||
.await
|
||||
.with_context(|| format!("build_image {}", bcfg.tag))?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let name = compute_container_name(&lm.manifest);
|
||||
// Production orchestrator: no port offset.
|
||||
self.runtime
|
||||
.create_container(&lm.manifest, &name, 0)
|
||||
.await
|
||||
.with_context(|| format!("create_container {name}"))?;
|
||||
self.runtime
|
||||
.start_container(&name)
|
||||
.await
|
||||
.with_context(|| format!("start_container {name}"))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Public imperative API (RPC surface). Mirrors DevContainerOrchestrator.
|
||||
// ------------------------------------------------------------------
|
||||
|
||||
pub async fn install(&self, app_id: &str) -> Result<String> {
|
||||
let lm = self.loaded(app_id).await?;
|
||||
let lock = self.app_lock(app_id).await;
|
||||
let _guard = lock.lock().await;
|
||||
self.install_fresh(&lm).await?;
|
||||
Ok(compute_container_name(&lm.manifest))
|
||||
}
|
||||
|
||||
pub async fn start(&self, app_id: &str) -> Result<()> {
|
||||
let lm = self.loaded(app_id).await?;
|
||||
let lock = self.app_lock(app_id).await;
|
||||
let _guard = lock.lock().await;
|
||||
let name = compute_container_name(&lm.manifest);
|
||||
self.runtime
|
||||
.start_container(&name)
|
||||
.await
|
||||
.with_context(|| format!("start_container {name}"))
|
||||
}
|
||||
|
||||
pub async fn stop(&self, app_id: &str) -> Result<()> {
|
||||
let lm = self.loaded(app_id).await?;
|
||||
let lock = self.app_lock(app_id).await;
|
||||
let _guard = lock.lock().await;
|
||||
let name = compute_container_name(&lm.manifest);
|
||||
self.runtime
|
||||
.stop_container(&name)
|
||||
.await
|
||||
.with_context(|| format!("stop_container {name}"))
|
||||
}
|
||||
|
||||
pub async fn restart(&self, app_id: &str) -> Result<()> {
|
||||
let lm = self.loaded(app_id).await?;
|
||||
let lock = self.app_lock(app_id).await;
|
||||
let _guard = lock.lock().await;
|
||||
let name = compute_container_name(&lm.manifest);
|
||||
// Best-effort stop (ignored if already stopped), then start.
|
||||
let _ = self.runtime.stop_container(&name).await;
|
||||
self.runtime
|
||||
.start_container(&name)
|
||||
.await
|
||||
.with_context(|| format!("restart start_container {name}"))
|
||||
}
|
||||
|
||||
/// Remove the container. `preserve_data=true` is honored by NOT touching volumes
|
||||
/// here (production volumes live under `/var/lib/archipelago` — removal is a
|
||||
/// separate operation owned by the data layer, not this orchestrator).
|
||||
pub async fn remove(&self, app_id: &str, _preserve_data: bool) -> Result<()> {
|
||||
let lm = self.loaded(app_id).await?;
|
||||
let lock = self.app_lock(app_id).await;
|
||||
let _guard = lock.lock().await;
|
||||
let name = compute_container_name(&lm.manifest);
|
||||
let _ = self.runtime.stop_container(&name).await;
|
||||
self.runtime
|
||||
.remove_container(&name)
|
||||
.await
|
||||
.with_context(|| format!("remove_container {name}"))
|
||||
}
|
||||
|
||||
/// Upgrade: remove the existing container, re-install fresh (which will re-pull
|
||||
/// or rebuild as required).
|
||||
pub async fn upgrade(&self, app_id: &str) -> Result<()> {
|
||||
let lm = self.loaded(app_id).await?;
|
||||
let lock = self.app_lock(app_id).await;
|
||||
let _guard = lock.lock().await;
|
||||
let name = compute_container_name(&lm.manifest);
|
||||
let _ = self.runtime.stop_container(&name).await;
|
||||
let _ = self.runtime.remove_container(&name).await;
|
||||
self.install_fresh(&lm).await
|
||||
}
|
||||
|
||||
pub async fn status(&self, app_id: &str) -> Result<ContainerStatus> {
|
||||
let lm = self.loaded(app_id).await?;
|
||||
let name = compute_container_name(&lm.manifest);
|
||||
self.runtime.get_container_status(&name).await
|
||||
}
|
||||
|
||||
pub async fn list(&self) -> Result<Vec<ContainerStatus>> {
|
||||
// Intersect runtime containers with our known manifests' expected names.
|
||||
let all = self.runtime.list_containers().await?;
|
||||
let state = self.state.read().await;
|
||||
let mut wanted: Vec<String> = state
|
||||
.manifests
|
||||
.values()
|
||||
.map(|lm| compute_container_name(&lm.manifest))
|
||||
.collect();
|
||||
wanted.sort();
|
||||
wanted.dedup();
|
||||
let set: std::collections::HashSet<&str> = wanted.iter().map(|s| s.as_str()).collect();
|
||||
Ok(all
|
||||
.into_iter()
|
||||
.filter(|c| {
|
||||
let bare = c.name.strip_prefix('/').unwrap_or(&c.name);
|
||||
set.contains(bare)
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub async fn logs(&self, app_id: &str, lines: u32) -> Result<Vec<String>> {
|
||||
let lm = self.loaded(app_id).await?;
|
||||
let name = compute_container_name(&lm.manifest);
|
||||
self.runtime.get_container_logs(&name, lines).await
|
||||
}
|
||||
|
||||
pub async fn health(&self, app_id: &str) -> Result<String> {
|
||||
let status = self.status(app_id).await?;
|
||||
Ok(match status.state {
|
||||
ContainerState::Running => "healthy".to_string(),
|
||||
ContainerState::Stopped | ContainerState::Exited => "unhealthy".to_string(),
|
||||
ContainerState::Created => "starting".to_string(),
|
||||
ContainerState::Paused => "paused".to_string(),
|
||||
ContainerState::Unknown(s) => format!("unknown:{s}"),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use archipelago_container::BuildConfig;
|
||||
use async_trait::async_trait;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
|
||||
/// Instrumented in-memory runtime. Every call is recorded so tests can assert
|
||||
/// the exact sequence of side effects.
|
||||
#[derive(Default)]
|
||||
struct MockRuntime {
|
||||
calls: StdMutex<Vec<String>>,
|
||||
/// container_name -> ContainerState. Absence = "doesn't exist".
|
||||
containers: StdMutex<HashMap<String, ContainerState>>,
|
||||
/// image_ref -> present. Absence = "not present in local storage".
|
||||
images: StdMutex<HashMap<String, bool>>,
|
||||
/// If set, the next `build_image` call fails with this message.
|
||||
fail_build: StdMutex<Option<String>>,
|
||||
}
|
||||
|
||||
impl MockRuntime {
|
||||
fn record(&self, s: impl Into<String>) {
|
||||
self.calls.lock().unwrap().push(s.into());
|
||||
}
|
||||
fn calls(&self) -> Vec<String> {
|
||||
self.calls.lock().unwrap().clone()
|
||||
}
|
||||
fn set_state(&self, name: &str, state: ContainerState) {
|
||||
self.containers
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(name.to_string(), state);
|
||||
}
|
||||
fn mark_image_present(&self, tag: &str) {
|
||||
self.images.lock().unwrap().insert(tag.to_string(), true);
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ContainerRuntimeTrait for MockRuntime {
|
||||
async fn pull_image(&self, image: &str, _sig: Option<&str>) -> Result<()> {
|
||||
self.record(format!("pull_image:{image}"));
|
||||
self.mark_image_present(image);
|
||||
Ok(())
|
||||
}
|
||||
async fn create_container(
|
||||
&self,
|
||||
_manifest: &AppManifest,
|
||||
name: &str,
|
||||
port_offset: u16,
|
||||
) -> Result<String> {
|
||||
self.record(format!("create_container:{name}:offset={port_offset}"));
|
||||
self.set_state(name, ContainerState::Created);
|
||||
Ok(name.to_string())
|
||||
}
|
||||
async fn start_container(&self, name: &str) -> Result<()> {
|
||||
self.record(format!("start_container:{name}"));
|
||||
self.set_state(name, ContainerState::Running);
|
||||
Ok(())
|
||||
}
|
||||
async fn stop_container(&self, name: &str) -> Result<()> {
|
||||
self.record(format!("stop_container:{name}"));
|
||||
self.set_state(name, ContainerState::Stopped);
|
||||
Ok(())
|
||||
}
|
||||
async fn remove_container(&self, name: &str) -> Result<()> {
|
||||
self.record(format!("remove_container:{name}"));
|
||||
self.containers.lock().unwrap().remove(name);
|
||||
Ok(())
|
||||
}
|
||||
async fn get_container_status(&self, name: &str) -> Result<ContainerStatus> {
|
||||
self.record(format!("get_container_status:{name}"));
|
||||
let map = self.containers.lock().unwrap();
|
||||
let state = map
|
||||
.get(name)
|
||||
.cloned()
|
||||
.ok_or_else(|| anyhow::anyhow!("not found: {name}"))?;
|
||||
Ok(ContainerStatus {
|
||||
id: format!("id-{name}"),
|
||||
name: name.to_string(),
|
||||
state,
|
||||
health: None,
|
||||
exit_code: None,
|
||||
started_at: None,
|
||||
image: "test-image".to_string(),
|
||||
created: "now".to_string(),
|
||||
ports: vec![],
|
||||
lan_address: None,
|
||||
})
|
||||
}
|
||||
async fn get_container_logs(&self, name: &str, _lines: u32) -> Result<Vec<String>> {
|
||||
self.record(format!("get_container_logs:{name}"));
|
||||
Ok(vec!["log line".into()])
|
||||
}
|
||||
async fn list_containers(&self) -> Result<Vec<ContainerStatus>> {
|
||||
self.record("list_containers".to_string());
|
||||
let map = self.containers.lock().unwrap();
|
||||
Ok(map
|
||||
.iter()
|
||||
.map(|(name, state)| ContainerStatus {
|
||||
id: format!("id-{name}"),
|
||||
name: name.clone(),
|
||||
state: state.clone(),
|
||||
health: None,
|
||||
exit_code: None,
|
||||
started_at: None,
|
||||
image: "test-image".to_string(),
|
||||
created: "now".to_string(),
|
||||
ports: vec![],
|
||||
lan_address: None,
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
async fn image_exists(&self, image_ref: &str) -> Result<bool> {
|
||||
self.record(format!("image_exists:{image_ref}"));
|
||||
Ok(self
|
||||
.images
|
||||
.lock()
|
||||
.unwrap()
|
||||
.get(image_ref)
|
||||
.copied()
|
||||
.unwrap_or(false))
|
||||
}
|
||||
async fn build_image(&self, config: &BuildConfig) -> Result<()> {
|
||||
self.record(format!(
|
||||
"build_image:{}:context={}",
|
||||
config.tag, config.context
|
||||
));
|
||||
if let Some(msg) = self.fail_build.lock().unwrap().clone() {
|
||||
return Err(anyhow::anyhow!("{msg}"));
|
||||
}
|
||||
self.mark_image_present(&config.tag);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn pull_manifest(id: &str, image: &str) -> AppManifest {
|
||||
let yaml = format!(
|
||||
"app:\n id: {id}\n name: {id}\n version: 1.0.0\n container:\n image: {image}\n"
|
||||
);
|
||||
AppManifest::parse(&yaml).unwrap()
|
||||
}
|
||||
|
||||
fn build_manifest(id: &str, context: &str, tag: &str) -> AppManifest {
|
||||
let yaml = format!(
|
||||
"app:\n id: {id}\n name: {id}\n version: 1.0.0\n container:\n build:\n context: {context}\n tag: {tag}\n"
|
||||
);
|
||||
AppManifest::parse(&yaml).unwrap()
|
||||
}
|
||||
|
||||
fn manifest_with_container_name(id: &str, image: &str, name: &str) -> AppManifest {
|
||||
let yaml = format!(
|
||||
"app:\n id: {id}\n name: {id}\n version: 1.0.0\n container_name: {name}\n container:\n image: {image}\n"
|
||||
);
|
||||
AppManifest::parse(&yaml).unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compute_container_name_ui_apps_prefixed() {
|
||||
let m = pull_manifest("bitcoin-ui", "foo:1");
|
||||
assert_eq!(compute_container_name(&m), "archy-bitcoin-ui");
|
||||
let m = pull_manifest("electrs-ui", "foo:1");
|
||||
assert_eq!(compute_container_name(&m), "archy-electrs-ui");
|
||||
let m = pull_manifest("lnd-ui", "foo:1");
|
||||
assert_eq!(compute_container_name(&m), "archy-lnd-ui");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compute_container_name_backend_apps_bare() {
|
||||
let m = pull_manifest("bitcoin-knots", "foo:1");
|
||||
assert_eq!(compute_container_name(&m), "bitcoin-knots");
|
||||
let m = pull_manifest("lnd", "foo:1");
|
||||
assert_eq!(compute_container_name(&m), "lnd");
|
||||
let m = pull_manifest("electrumx", "foo:1");
|
||||
assert_eq!(compute_container_name(&m), "electrumx");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compute_container_name_extension_override_wins() {
|
||||
// An explicit container_name in the manifest always takes priority, even for a
|
||||
// UI id that would otherwise get the archy- prefix.
|
||||
let m = manifest_with_container_name("bitcoin-ui", "foo:1", "legacy-bitcoin-ui");
|
||||
assert_eq!(compute_container_name(&m), "legacy-bitcoin-ui");
|
||||
}
|
||||
|
||||
async fn orch_with(runtime: Arc<MockRuntime>) -> ProdContainerOrchestrator {
|
||||
ProdContainerOrchestrator::with_runtime(
|
||||
runtime,
|
||||
PathBuf::from("/nonexistent-for-tests"),
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn install_fresh_pull() {
|
||||
let rt = Arc::new(MockRuntime::default());
|
||||
let orch = orch_with(rt.clone()).await;
|
||||
orch.insert_manifest_for_test(
|
||||
pull_manifest("bitcoin-knots", "docker.io/bitcoin/knots:28"),
|
||||
PathBuf::from("/tmp/fixtures/bitcoin-knots"),
|
||||
)
|
||||
.await;
|
||||
|
||||
let name = orch.install("bitcoin-knots").await.unwrap();
|
||||
assert_eq!(name, "bitcoin-knots");
|
||||
let calls = rt.calls();
|
||||
assert!(calls.iter().any(|c| c.starts_with("pull_image:")));
|
||||
assert!(calls.iter().any(|c| c == "create_container:bitcoin-knots:offset=0"));
|
||||
assert!(calls.iter().any(|c| c == "start_container:bitcoin-knots"));
|
||||
// Must NOT build
|
||||
assert!(!calls.iter().any(|c| c.starts_with("build_image:")));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn install_fresh_build_when_image_absent() {
|
||||
let rt = Arc::new(MockRuntime::default());
|
||||
let orch = orch_with(rt.clone()).await;
|
||||
orch.insert_manifest_for_test(
|
||||
build_manifest("bitcoin-ui", "/opt/archy/docker/bitcoin-ui", "archy-bitcoin-ui:local"),
|
||||
PathBuf::from("/opt/archy/apps/bitcoin-ui"),
|
||||
)
|
||||
.await;
|
||||
|
||||
orch.install("bitcoin-ui").await.unwrap();
|
||||
let calls = rt.calls();
|
||||
assert!(calls.iter().any(|c| c == "image_exists:archy-bitcoin-ui:local"));
|
||||
assert!(calls
|
||||
.iter()
|
||||
.any(|c| c.starts_with("build_image:archy-bitcoin-ui:local:")));
|
||||
assert!(calls
|
||||
.iter()
|
||||
.any(|c| c == "create_container:archy-bitcoin-ui:offset=0"));
|
||||
assert!(calls.iter().any(|c| c == "start_container:archy-bitcoin-ui"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn install_fresh_build_skipped_when_image_present() {
|
||||
let rt = Arc::new(MockRuntime::default());
|
||||
rt.mark_image_present("archy-bitcoin-ui:local");
|
||||
let orch = orch_with(rt.clone()).await;
|
||||
orch.insert_manifest_for_test(
|
||||
build_manifest("bitcoin-ui", "/opt/archy/docker/bitcoin-ui", "archy-bitcoin-ui:local"),
|
||||
PathBuf::from("/opt/archy/apps/bitcoin-ui"),
|
||||
)
|
||||
.await;
|
||||
|
||||
orch.install("bitcoin-ui").await.unwrap();
|
||||
let calls = rt.calls();
|
||||
assert!(calls.iter().any(|c| c == "image_exists:archy-bitcoin-ui:local"));
|
||||
// Build must NOT be invoked because the image is already there.
|
||||
assert!(!calls.iter().any(|c| c.starts_with("build_image:")));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn install_fresh_build_resolves_relative_context_against_manifest_dir() {
|
||||
let rt = Arc::new(MockRuntime::default());
|
||||
let orch = orch_with(rt.clone()).await;
|
||||
orch.insert_manifest_for_test(
|
||||
build_manifest("bitcoin-ui", "docker-context", "archy-bitcoin-ui:local"),
|
||||
PathBuf::from("/opt/archy/apps/bitcoin-ui"),
|
||||
)
|
||||
.await;
|
||||
|
||||
orch.install("bitcoin-ui").await.unwrap();
|
||||
let calls = rt.calls();
|
||||
let build = calls
|
||||
.iter()
|
||||
.find(|c| c.starts_with("build_image:"))
|
||||
.expect("build should have run");
|
||||
// Relative path must be joined onto the manifest directory.
|
||||
assert!(
|
||||
build.contains("context=/opt/archy/apps/bitcoin-ui/docker-context"),
|
||||
"unexpected build context in call: {build}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reconcile_noop_when_already_running() {
|
||||
let rt = Arc::new(MockRuntime::default());
|
||||
rt.set_state("bitcoin-knots", ContainerState::Running);
|
||||
let orch = orch_with(rt.clone()).await;
|
||||
orch.insert_manifest_for_test(
|
||||
pull_manifest("bitcoin-knots", "docker.io/bitcoin/knots:28"),
|
||||
PathBuf::from("/tmp/bk"),
|
||||
)
|
||||
.await;
|
||||
|
||||
let report = orch.reconcile_all().await;
|
||||
assert_eq!(
|
||||
report.actions,
|
||||
vec![("bitcoin-knots".to_string(), ReconcileAction::NoOp)]
|
||||
);
|
||||
assert!(report.failures.is_empty());
|
||||
// Must not have pulled / created / started anything
|
||||
let calls = rt.calls();
|
||||
assert!(!calls.iter().any(|c| c.starts_with("pull_image:")));
|
||||
assert!(!calls.iter().any(|c| c.starts_with("create_container:")));
|
||||
assert!(!calls.iter().any(|c| c.starts_with("start_container:")));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reconcile_starts_exited_container() {
|
||||
let rt = Arc::new(MockRuntime::default());
|
||||
rt.set_state("bitcoin-knots", ContainerState::Exited);
|
||||
let orch = orch_with(rt.clone()).await;
|
||||
orch.insert_manifest_for_test(
|
||||
pull_manifest("bitcoin-knots", "docker.io/bitcoin/knots:28"),
|
||||
PathBuf::from("/tmp/bk"),
|
||||
)
|
||||
.await;
|
||||
|
||||
let report = orch.reconcile_all().await;
|
||||
assert_eq!(
|
||||
report.actions,
|
||||
vec![("bitcoin-knots".to_string(), ReconcileAction::Started)]
|
||||
);
|
||||
// Exactly one start, no (re)create, no pull
|
||||
let calls = rt.calls();
|
||||
assert!(calls.iter().any(|c| c == "start_container:bitcoin-knots"));
|
||||
assert!(!calls.iter().any(|c| c.starts_with("create_container:")));
|
||||
assert!(!calls.iter().any(|c| c.starts_with("pull_image:")));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reconcile_installs_missing_container() {
|
||||
let rt = Arc::new(MockRuntime::default());
|
||||
// container map empty -> "not found" -> orchestrator installs fresh
|
||||
let orch = orch_with(rt.clone()).await;
|
||||
orch.insert_manifest_for_test(
|
||||
pull_manifest("bitcoin-knots", "docker.io/bitcoin/knots:28"),
|
||||
PathBuf::from("/tmp/bk"),
|
||||
)
|
||||
.await;
|
||||
|
||||
let report = orch.reconcile_all().await;
|
||||
assert_eq!(
|
||||
report.actions,
|
||||
vec![("bitcoin-knots".to_string(), ReconcileAction::Installed)]
|
||||
);
|
||||
let calls = rt.calls();
|
||||
assert!(calls.iter().any(|c| c.starts_with("pull_image:")));
|
||||
assert!(calls.iter().any(|c| c == "create_container:bitcoin-knots:offset=0"));
|
||||
assert!(calls.iter().any(|c| c == "start_container:bitcoin-knots"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reconcile_collects_per_app_failures_without_short_circuiting() {
|
||||
let rt = Arc::new(MockRuntime::default());
|
||||
// ui app: image build will fail
|
||||
*rt.fail_build.lock().unwrap() = Some("disk full".to_string());
|
||||
// knots: already running, will no-op
|
||||
rt.set_state("bitcoin-knots", ContainerState::Running);
|
||||
let orch = orch_with(rt.clone()).await;
|
||||
orch.insert_manifest_for_test(
|
||||
pull_manifest("bitcoin-knots", "docker.io/bitcoin/knots:28"),
|
||||
PathBuf::from("/tmp/bk"),
|
||||
)
|
||||
.await;
|
||||
orch.insert_manifest_for_test(
|
||||
build_manifest("bitcoin-ui", "/ctx", "archy-bitcoin-ui:local"),
|
||||
PathBuf::from("/opt/bitcoin-ui"),
|
||||
)
|
||||
.await;
|
||||
|
||||
let report = orch.reconcile_all().await;
|
||||
// knots succeeded, bitcoin-ui failed; both reported.
|
||||
let app_ids: Vec<&str> = report.actions.iter().map(|(a, _)| a.as_str()).collect();
|
||||
assert!(app_ids.contains(&"bitcoin-knots"));
|
||||
let failed: Vec<&str> = report.failures.iter().map(|(a, _)| a.as_str()).collect();
|
||||
assert_eq!(failed, vec!["bitcoin-ui"]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn adopt_existing_matches_by_name() {
|
||||
let rt = Arc::new(MockRuntime::default());
|
||||
// Pre-seed the runtime as if the bash first-boot script had installed it.
|
||||
rt.set_state("archy-bitcoin-ui", ContainerState::Running);
|
||||
rt.set_state("bitcoin-knots", ContainerState::Running);
|
||||
rt.set_state("unrelated-sidecar", ContainerState::Running);
|
||||
let orch = orch_with(rt.clone()).await;
|
||||
orch.insert_manifest_for_test(
|
||||
build_manifest("bitcoin-ui", "/ctx", "archy-bitcoin-ui:local"),
|
||||
PathBuf::from("/opt/bitcoin-ui"),
|
||||
)
|
||||
.await;
|
||||
orch.insert_manifest_for_test(
|
||||
pull_manifest("bitcoin-knots", "docker.io/bitcoin/knots:28"),
|
||||
PathBuf::from("/tmp/bk"),
|
||||
)
|
||||
.await;
|
||||
// A manifest with no matching running container — must NOT be adopted.
|
||||
orch.insert_manifest_for_test(
|
||||
pull_manifest("lnd", "lnd:0.18"),
|
||||
PathBuf::from("/tmp/lnd"),
|
||||
)
|
||||
.await;
|
||||
|
||||
let report = orch.adopt_existing().await.unwrap();
|
||||
let mut ids = report.adopted.clone();
|
||||
ids.sort();
|
||||
assert_eq!(ids, vec!["bitcoin-knots", "bitcoin-ui"]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn upgrade_removes_and_reinstalls() {
|
||||
let rt = Arc::new(MockRuntime::default());
|
||||
rt.set_state("bitcoin-knots", ContainerState::Running);
|
||||
let orch = orch_with(rt.clone()).await;
|
||||
orch.insert_manifest_for_test(
|
||||
pull_manifest("bitcoin-knots", "docker.io/bitcoin/knots:28"),
|
||||
PathBuf::from("/tmp/bk"),
|
||||
)
|
||||
.await;
|
||||
|
||||
orch.upgrade("bitcoin-knots").await.unwrap();
|
||||
let calls = rt.calls();
|
||||
// Sequence contract: stop -> remove -> pull -> create -> start, in that order.
|
||||
let relevant: Vec<&str> = calls
|
||||
.iter()
|
||||
.map(|s| s.as_str())
|
||||
.filter(|s| {
|
||||
s.starts_with("stop_container:")
|
||||
|| s.starts_with("remove_container:")
|
||||
|| s.starts_with("pull_image:")
|
||||
|| s.starts_with("create_container:")
|
||||
|| s.starts_with("start_container:")
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(
|
||||
relevant,
|
||||
vec![
|
||||
"stop_container:bitcoin-knots",
|
||||
"remove_container:bitcoin-knots",
|
||||
"pull_image:docker.io/bitcoin/knots:28",
|
||||
"create_container:bitcoin-knots:offset=0",
|
||||
"start_container:bitcoin-knots",
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_filters_to_known_manifests_only() {
|
||||
let rt = Arc::new(MockRuntime::default());
|
||||
rt.set_state("bitcoin-knots", ContainerState::Running);
|
||||
rt.set_state("unrelated", ContainerState::Running);
|
||||
let orch = orch_with(rt.clone()).await;
|
||||
orch.insert_manifest_for_test(
|
||||
pull_manifest("bitcoin-knots", "docker.io/bitcoin/knots:28"),
|
||||
PathBuf::from("/tmp/bk"),
|
||||
)
|
||||
.await;
|
||||
|
||||
let list = orch.list().await.unwrap();
|
||||
let names: Vec<&str> = list.iter().map(|c| c.name.as_str()).collect();
|
||||
assert_eq!(names, vec!["bitcoin-knots"]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn health_maps_states_to_strings() {
|
||||
let rt = Arc::new(MockRuntime::default());
|
||||
rt.set_state("lnd", ContainerState::Running);
|
||||
let orch = orch_with(rt.clone()).await;
|
||||
orch.insert_manifest_for_test(
|
||||
pull_manifest("lnd", "lnd:0.18"),
|
||||
PathBuf::from("/tmp/lnd"),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(orch.health("lnd").await.unwrap(), "healthy");
|
||||
|
||||
rt.set_state("lnd", ContainerState::Exited);
|
||||
assert_eq!(orch.health("lnd").await.unwrap(), "unhealthy");
|
||||
|
||||
rt.set_state("lnd", ContainerState::Paused);
|
||||
assert_eq!(orch.health("lnd").await.unwrap(), "paused");
|
||||
|
||||
rt.set_state("lnd", ContainerState::Created);
|
||||
assert_eq!(orch.health("lnd").await.unwrap(), "starting");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unknown_app_id_is_rejected() {
|
||||
let rt = Arc::new(MockRuntime::default());
|
||||
let orch = orch_with(rt.clone()).await;
|
||||
let err = orch.status("does-not-exist").await.unwrap_err();
|
||||
assert!(format!("{err}").contains("unknown app_id"));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user