feat(container): ContainerOrchestrator trait, RpcHandler uses it in prod

Step 4 of the rust-orchestrator migration. Unifies the container lifecycle
surface behind a single trait so the RPC layer stops caring whether it is
talking to the dev or prod orchestrator.

* New trait core/archipelago/src/container/traits.rs: ContainerOrchestrator
  with install / start / stop / restart / remove / upgrade / status / list /
  logs / health, all keyed by app_id. Every method is async_trait-based.
* ProdContainerOrchestrator: the lifecycle methods move from the inherent
  impl into the trait impl (avoids name-shadowing recursion). Adoption and
  reconcile remain inherent since only main.rs / BootReconciler call them.
* DevContainerOrchestrator: new trait impl that forwards to the existing
  Dev-named methods, applying the dev container-name + port-offset rules
  internally. New load_manifest_for() helper resolves app_id to
  <data_dir>/apps/<app_id>/manifest.yml so trait-level install(app_id)
  works in dev too. install_container(manifest, path) stays inherent for
  the manifest-path RPC shape.
* RpcHandler now holds Option<Arc<dyn ContainerOrchestrator>> and, when in
  dev mode, a separate Option<Arc<DevContainerOrchestrator>> for the
  manifest_path install RPC. In prod mode RpcHandler::new() constructs a
  ProdContainerOrchestrator and calls load_manifests() at startup.
* The seven container-* RPC guards other than container-install no longer
  say "dev mode required". container-install still requires dev mode
  because its manifest_path argument has no prod meaning; every other
  container RPC now works in both modes via the trait.

BOOT STILL DOES NOT USE THIS. BootReconciler (Step 5) and the main.rs
wire-up (Step 6) come next. Until then the prod orchestrator is constructed,
but nothing populates /opt/archipelago/apps, so it has zero manifests to
manage, matching the pre-Step-4 behaviour.

Verification: cargo build -p archipelago builds cleanly apart from 11
expected unused-method warnings (methods not yet wired from main.rs).
cargo test -p archipelago: all 21 container::* tests pass (16
prod_orchestrator + 5 others). 24 other test failures are pre-existing and
unrelated (identity_manager / session / wallet / mesh / credentials, all
independently flaky on file-backed state).
@@ -103,7 +103,9 @@ mdns-sd = "0.18"
 # Systemd watchdog notification
 sd-notify = "0.4"
 
+# Trait objects for async methods (container orchestrator trait, Step 4)
+async-trait = "0.1"
+
 [dev-dependencies]
 tokio-test = "0.4"
 tempfile = "3.10"
-async-trait = "0.1"
@@ -7,8 +7,13 @@ impl RpcHandler {
         &self,
         params: Option<serde_json::Value>,
     ) -> Result<serde_json::Value> {
-        let orchestrator = self.orchestrator.as_ref().ok_or_else(|| {
-            anyhow::anyhow!("Container orchestrator not available (dev mode required)")
+        // The `container-install { manifest_path }` RPC is a dev-mode convenience
+        // that points at an arbitrary YAML on disk. Production install happens via
+        // the reconciler (BootReconciler, Step 5) and via the unified
+        // ContainerOrchestrator::install(app_id) trait call, which can be exposed
+        // through a separate `container-install-by-id` RPC when needed.
+        let dev = self.dev_orchestrator.as_ref().ok_or_else(|| {
+            anyhow::anyhow!("container-install with manifest_path is only available in dev mode")
         })?;
 
         let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
@@ -45,7 +50,7 @@ impl RpcHandler {
         let manifest: archipelago_container::AppManifest =
             serde_yaml::from_str(&manifest_content).context("Failed to parse manifest")?;
 
-        let container_name = orchestrator
+        let container_name = dev
             .install_container(&manifest, manifest_path)
             .await
             .context("Failed to install container")?;
@@ -57,9 +62,10 @@ impl RpcHandler {
         &self,
         params: Option<serde_json::Value>,
     ) -> Result<serde_json::Value> {
-        let orchestrator = self.orchestrator.as_ref().ok_or_else(|| {
-            anyhow::anyhow!("Container orchestrator not available (dev mode required)")
-        })?;
+        let orchestrator = self
+            .orchestrator
+            .as_ref()
+            .ok_or_else(|| anyhow::anyhow!("Container orchestrator not available"))?;
 
         let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
         let app_id = params
@@ -69,7 +75,7 @@ impl RpcHandler {
         validate_app_id(app_id)?;
 
         orchestrator
-            .start_container(app_id)
+            .start(app_id)
             .await
             .context("Failed to start container")?;
 
@@ -80,9 +86,10 @@ impl RpcHandler {
         &self,
         params: Option<serde_json::Value>,
     ) -> Result<serde_json::Value> {
-        let orchestrator = self.orchestrator.as_ref().ok_or_else(|| {
-            anyhow::anyhow!("Container orchestrator not available (dev mode required)")
-        })?;
+        let orchestrator = self
+            .orchestrator
+            .as_ref()
+            .ok_or_else(|| anyhow::anyhow!("Container orchestrator not available"))?;
 
         let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
         let app_id = params
@@ -92,7 +99,7 @@ impl RpcHandler {
         validate_app_id(app_id)?;
 
         orchestrator
-            .stop_container(app_id)
+            .stop(app_id)
             .await
             .context("Failed to stop container")?;
 
@@ -103,9 +110,10 @@ impl RpcHandler {
         &self,
         params: Option<serde_json::Value>,
     ) -> Result<serde_json::Value> {
-        let orchestrator = self.orchestrator.as_ref().ok_or_else(|| {
-            anyhow::anyhow!("Container orchestrator not available (dev mode required)")
-        })?;
+        let orchestrator = self
+            .orchestrator
+            .as_ref()
+            .ok_or_else(|| anyhow::anyhow!("Container orchestrator not available"))?;
 
         let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
         let app_id = params
@@ -119,7 +127,7 @@ impl RpcHandler {
             .unwrap_or(false);
 
         orchestrator
-            .remove_container(app_id, preserve_data)
+            .remove(app_id, preserve_data)
             .await
             .context("Failed to remove container")?;
 
@@ -163,9 +171,9 @@ impl RpcHandler {
             return Ok(serde_json::json!(containers));
         }
 
-        // Fallback: scanner hasn't run yet, query podman directly
+        // Fallback: scanner hasn't run yet, query the orchestrator directly.
         if let Some(orchestrator) = &self.orchestrator {
-            if let Ok(containers) = orchestrator.list_containers().await {
+            if let Ok(containers) = orchestrator.list().await {
                 if !containers.is_empty() {
                     return Ok(serde_json::to_value(containers)?);
                 }
@@ -242,9 +250,10 @@ impl RpcHandler {
         &self,
         params: Option<serde_json::Value>,
     ) -> Result<serde_json::Value> {
-        let orchestrator = self.orchestrator.as_ref().ok_or_else(|| {
-            anyhow::anyhow!("Container orchestrator not available (dev mode required)")
-        })?;
+        let orchestrator = self
+            .orchestrator
+            .as_ref()
+            .ok_or_else(|| anyhow::anyhow!("Container orchestrator not available"))?;
 
         let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
         let app_id = params
@@ -254,7 +263,7 @@ impl RpcHandler {
         validate_app_id(app_id)?;
 
         let status = orchestrator
-            .get_container_status(app_id)
+            .status(app_id)
             .await
             .context("Failed to get container status")?;
 
@@ -265,9 +274,10 @@ impl RpcHandler {
         &self,
         params: Option<serde_json::Value>,
     ) -> Result<serde_json::Value> {
-        let orchestrator = self.orchestrator.as_ref().ok_or_else(|| {
-            anyhow::anyhow!("Container orchestrator not available (dev mode required)")
-        })?;
+        let orchestrator = self
+            .orchestrator
+            .as_ref()
+            .ok_or_else(|| anyhow::anyhow!("Container orchestrator not available"))?;
 
         let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
         let app_id = params
@@ -278,7 +288,7 @@ impl RpcHandler {
         let lines = params.get("lines").and_then(|v| v.as_u64()).unwrap_or(100) as u32;
 
         let logs = orchestrator
-            .get_container_logs(app_id, lines)
+            .logs(app_id, lines)
             .await
             .context("Failed to get container logs")?;
 
@@ -291,12 +301,13 @@ impl RpcHandler {
         app_id: &str,
         lines: u32,
     ) -> Result<serde_json::Value> {
-        let orchestrator = self.orchestrator.as_ref().ok_or_else(|| {
-            anyhow::anyhow!("Container orchestrator not available (dev mode required)")
-        })?;
+        let orchestrator = self
+            .orchestrator
+            .as_ref()
+            .ok_or_else(|| anyhow::anyhow!("Container orchestrator not available"))?;
 
         let logs = orchestrator
-            .get_container_logs(app_id, lines)
+            .logs(app_id, lines)
             .await
             .context("Failed to get container logs")?;
 
@@ -307,43 +318,52 @@ impl RpcHandler {
         &self,
         params: Option<serde_json::Value>,
     ) -> Result<serde_json::Value> {
-        let orchestrator = self.orchestrator.as_ref().ok_or_else(|| {
-            anyhow::anyhow!("Container orchestrator not available (dev mode required)")
-        })?;
+        let orchestrator = self
+            .orchestrator
+            .as_ref()
+            .ok_or_else(|| anyhow::anyhow!("Container orchestrator not available"))?;
 
-        // If app_id is provided, get health for that app
+        // If app_id is provided, get health for that app.
         if let Some(params) = params {
             if let Some(app_id) = params.get("app_id").and_then(|v| v.as_str()) {
                 let health = orchestrator
-                    .get_health_status(app_id)
+                    .health(app_id)
                     .await
                     .context("Failed to get container health")?;
                 return Ok(serde_json::json!({ app_id: health }));
             }
         }
 
-        // Otherwise, get health for all containers
+        // Otherwise, get health for all containers.
         let containers = orchestrator
-            .list_containers()
+            .list()
             .await
             .context("Failed to list containers")?;
 
         let mut health_map = serde_json::Map::new();
         for container in containers {
-            if let Some(app_id) = container.name.strip_prefix("archipelago-") {
-                if let Some(app_id) = app_id.strip_suffix("-dev") {
-                    match orchestrator.get_health_status(app_id).await {
-                        Ok(health) => {
-                            health_map
-                                .insert(app_id.to_string(), serde_json::Value::String(health));
-                        }
-                        Err(_) => {
-                            health_map.insert(
-                                app_id.to_string(),
-                                serde_json::Value::String("unknown".to_string()),
-                            );
-                        }
-                    }
+            // Map the runtime container name back to the app_id the orchestrator
+            // knows about. Dev orchestrator uses `archipelago-<id>-dev`; Prod
+            // uses bare `<id>` (or `archy-<id>` for UIs — health() accepts the
+            // app_id either way since UI_APP_IDS is centralised).
+            let app_id_candidate = container
+                .name
+                .strip_prefix("archipelago-")
+                .and_then(|s| s.strip_suffix("-dev"))
+                .or_else(|| container.name.strip_prefix("archy-"))
+                .unwrap_or(container.name.as_str());
+            match orchestrator.health(app_id_candidate).await {
+                Ok(health) => {
+                    health_map.insert(
+                        app_id_candidate.to_string(),
+                        serde_json::Value::String(health),
+                    );
+                }
+                Err(_) => {
+                    health_map.insert(
+                        app_id_candidate.to_string(),
+                        serde_json::Value::String("unknown".to_string()),
+                    );
                 }
             }
         }
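The container-name to app_id mapping in the health handler above can be pulled out as a pure function, which makes the fall-through cases easier to see. This is a minimal sketch that assumes only the naming rules stated in the diff comment (`archipelago-<id>-dev` for dev, `archy-<id>` for prod UI containers, bare `<id>` otherwise); the function name `app_id_from_container_name` is hypothetical and does not appear in the commit.

```rust
/// Map a runtime container name back to an app_id candidate.
/// Hypothetical helper: it mirrors the inline combinator chain in the
/// health handler and is not part of the commit itself.
fn app_id_from_container_name(name: &str) -> &str {
    name.strip_prefix("archipelago-")
        // Dev names must carry both the prefix and the `-dev` suffix.
        .and_then(|s| s.strip_suffix("-dev"))
        // Otherwise try the prod UI prefix.
        .or_else(|| name.strip_prefix("archy-"))
        // Anything else is treated as a bare prod app_id.
        .unwrap_or(name)
}
```

Under this sketch a name such as `archipelago-bitcoin` (prefix but no `-dev` suffix) is returned unchanged, so the result is a candidate that `health()` may still reject.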
@@ -39,7 +39,7 @@ mod webhooks;
 
 use crate::auth::AuthManager;
 use crate::config::Config;
-use crate::container::DevContainerOrchestrator;
+use crate::container::{ContainerOrchestrator, DevContainerOrchestrator, ProdContainerOrchestrator};
 use crate::monitoring::MetricsStore;
 use crate::port_allocator::PortAllocator;
 use crate::rate_limit::{EndpointRateLimiter, LoginRateLimiter};
@@ -62,7 +62,14 @@ pub(crate) const DEV_DEFAULT_PASSWORD: &str = "password123";
 pub struct RpcHandler {
     config: Config,
     auth_manager: AuthManager,
-    orchestrator: Option<Arc<DevContainerOrchestrator>>,
+    /// Shared lifecycle orchestrator (Dev or Prod). Always `Some` in a normal
+    /// build — the only reason it is `Option` is so tests that don't exercise
+    /// container RPCs can skip constructing one.
+    orchestrator: Option<Arc<dyn ContainerOrchestrator>>,
+    /// Concrete handle to the dev orchestrator, when we're in dev mode. Used by
+    /// `container-install { manifest_path }` which takes an ad-hoc manifest
+    /// path and is not part of the shared trait.
+    dev_orchestrator: Option<Arc<DevContainerOrchestrator>>,
     state_manager: Arc<StateManager>,
     pub(crate) metrics_store: Arc<MetricsStore>,
     port_allocator: Arc<tokio::sync::Mutex<PortAllocator>>,
@@ -89,12 +96,22 @@ impl RpcHandler {
         session_store: SessionStore,
     ) -> Result<Self> {
         let auth_manager = AuthManager::new(config.data_dir.clone());
-        let orchestrator = if config.dev_mode {
-            Some(Arc::new(
-                DevContainerOrchestrator::new(config.clone()).await?,
-            ))
+        let (orchestrator, dev_orchestrator): (
+            Option<Arc<dyn ContainerOrchestrator>>,
+            Option<Arc<DevContainerOrchestrator>>,
+        ) = if config.dev_mode {
+            let dev = Arc::new(DevContainerOrchestrator::new(config.clone()).await?);
+            let trait_obj: Arc<dyn ContainerOrchestrator> = dev.clone();
+            (Some(trait_obj), Some(dev))
         } else {
-            None
+            let prod = Arc::new(ProdContainerOrchestrator::new(config.clone()).await?);
+            // Best-effort manifest load; a missing /opt/archipelago/apps is
+            // logged by load_manifests and not fatal.
+            if let Err(e) = prod.load_manifests().await {
+                tracing::error!(error = %e, "prod orchestrator: load_manifests failed at startup");
+            }
+            let trait_obj: Arc<dyn ContainerOrchestrator> = prod;
+            (Some(trait_obj), None)
         };
         let port_allocator = Arc::new(tokio::sync::Mutex::new(
             PortAllocator::new(&config.data_dir).await?,
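The two-handle construction in `RpcHandler::new()` above relies on an `Arc<Concrete>` coercing to an `Arc<dyn Trait>` while a clone of the concrete `Arc` is kept for dev-only methods. The sketch below shows that pattern in isolation. It is deliberately simplified: the trait is synchronous, and every name in it (`Orchestrator`, `DevOrchestrator`, `ProdOrchestrator`, `install_from_path`, `build_handles`) is a hypothetical stand-in rather than the commit's API.

```rust
use std::sync::Arc;

// Simplified, synchronous stand-in for the commit's async trait.
trait Orchestrator: Send + Sync {
    /// Returns the container name the call would act on.
    fn start(&self, app_id: &str) -> String;
}

struct DevOrchestrator;
struct ProdOrchestrator;

impl DevOrchestrator {
    // Dev-only inherent method: not reachable through the trait object.
    fn install_from_path(&self, manifest_path: &str) -> String {
        format!("dev install from {manifest_path}")
    }
}

impl Orchestrator for DevOrchestrator {
    fn start(&self, app_id: &str) -> String {
        format!("archipelago-{app_id}-dev")
    }
}

impl Orchestrator for ProdOrchestrator {
    fn start(&self, app_id: &str) -> String {
        app_id.to_string()
    }
}

type Handles = (Option<Arc<dyn Orchestrator>>, Option<Arc<DevOrchestrator>>);

fn build_handles(dev_mode: bool) -> Handles {
    if dev_mode {
        let dev = Arc::new(DevOrchestrator);
        // Unsized coercion: Arc<DevOrchestrator> -> Arc<dyn Orchestrator>.
        // Both handles point at the same allocation.
        let shared: Arc<dyn Orchestrator> = dev.clone();
        (Some(shared), Some(dev))
    } else {
        let shared: Arc<dyn Orchestrator> = Arc::new(ProdOrchestrator);
        (Some(shared), None)
    }
}
```

Keeping the concrete handle next to the trait object avoids downcasting: callers that need the dev-only method check the second slot, and everything else goes through the first.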
@@ -129,6 +146,7 @@ impl RpcHandler {
             config,
             auth_manager,
             orchestrator,
+            dev_orchestrator,
             state_manager,
             metrics_store,
             port_allocator,
@@ -3,10 +3,12 @@ use archipelago_container::{
     AppManifest, BitcoinSimulationMode, BitcoinSimulator,
     ContainerRuntime as ContainerRuntimeTrait, ContainerStatus, PortManager, ResolvedSource,
 };
+use async_trait::async_trait;
 use std::sync::Arc;
 
 use crate::config::{BitcoinSimulation, Config, ContainerRuntime};
 use crate::container::data_manager::DevDataManager;
+use crate::container::traits::ContainerOrchestrator;
 
 pub struct DevContainerOrchestrator {
     runtime: Arc<dyn ContainerRuntimeTrait>,
@@ -258,4 +260,82 @@ impl DevContainerOrchestrator {
             archipelago_container::ContainerState::Unknown(_) => Ok("unknown".to_string()),
         }
     }
+
+    /// Load a manifest for `app_id` from the dev-mode apps directory.
+    ///
+    /// Used by the trait-level `install(app_id)` entry point. Looks under
+    /// `<data_dir>/apps/<app_id>/manifest.yml`.
+    async fn load_manifest_for(&self, app_id: &str) -> Result<AppManifest> {
+        let path = self
+            .config
+            .data_dir
+            .join("apps")
+            .join(app_id)
+            .join("manifest.yml");
+        let content = tokio::fs::read_to_string(&path)
+            .await
+            .with_context(|| format!("reading manifest {}", path.display()))?;
+        let manifest: AppManifest = serde_yaml::from_str(&content)
+            .with_context(|| format!("parsing manifest {}", path.display()))?;
+        Ok(manifest)
+    }
 }
+
+// ---------------------------------------------------------------------------
+// Trait impl (Step 4): expose the shared ContainerOrchestrator surface.
+// Forwards to the inherent methods, which internally apply the `-dev` suffix
+// and the port offset. The trait keeps the RPC layer mode-agnostic; Dev's
+// install_container (manifest_path-based) stays as an inherent method for the
+// ad-hoc dev-mode RPC and is not exposed on the trait.
+// ---------------------------------------------------------------------------
+
+#[async_trait]
+impl ContainerOrchestrator for DevContainerOrchestrator {
+    async fn install(&self, app_id: &str) -> Result<String> {
+        let manifest = self.load_manifest_for(app_id).await?;
+        let name = self.install_container(&manifest, "").await?;
+        Ok(name)
+    }
+
+    async fn start(&self, app_id: &str) -> Result<()> {
+        self.start_container(app_id).await
+    }
+
+    async fn stop(&self, app_id: &str) -> Result<()> {
+        self.stop_container(app_id).await
+    }
+
+    async fn restart(&self, app_id: &str) -> Result<()> {
+        let _ = self.stop_container(app_id).await;
+        self.start_container(app_id).await
+    }
+
+    async fn remove(&self, app_id: &str, preserve_data: bool) -> Result<()> {
+        self.remove_container(app_id, preserve_data).await
+    }
+
+    async fn upgrade(&self, app_id: &str) -> Result<()> {
+        // Dev upgrade: stop, remove (preserving data), re-install from the loaded manifest.
+        let _ = self.stop_container(app_id).await;
+        let _ = self.remove_container(app_id, true).await;
+        let manifest = self.load_manifest_for(app_id).await?;
+        self.install_container(&manifest, "").await?;
+        self.start_container(app_id).await
+    }
+
+    async fn status(&self, app_id: &str) -> Result<ContainerStatus> {
+        self.get_container_status(app_id).await
+    }
+
+    async fn list(&self) -> Result<Vec<ContainerStatus>> {
+        self.list_containers().await
+    }
+
+    async fn logs(&self, app_id: &str, lines: u32) -> Result<Vec<String>> {
+        self.get_container_logs(app_id, lines).await
+    }
+
+    async fn health(&self, app_id: &str) -> Result<String> {
+        self.get_health_status(app_id).await
+    }
+}
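The path resolution inside `load_manifest_for()` above is plain `Path::join` chaining, sketched here without any file I/O. The free function `manifest_path_for` is a hypothetical name introduced only for illustration; the commit builds the path inline from `self.config.data_dir`.

```rust
use std::path::{Path, PathBuf};

/// Build `<data_dir>/apps/<app_id>/manifest.yml`.
/// Hypothetical helper; the commit does this inline in `load_manifest_for`.
fn manifest_path_for(data_dir: &Path, app_id: &str) -> PathBuf {
    data_dir.join("apps").join(app_id).join("manifest.yml")
}
```

One property worth keeping in mind: `Path::join` replaces the accumulated path when its argument is absolute, so an `app_id` such as `/etc` would resolve to `/etc/manifest.yml`. The RPC handlers in this diff call `validate_app_id` before reaching the orchestrator; whether other callers of the trait-level `install(app_id)` get the same validation is not visible here.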
@@ -4,6 +4,7 @@ pub mod docker_packages;
 pub mod image_versions;
 pub mod prod_orchestrator;
 pub mod registry;
+pub mod traits;
 
 pub use dev_orchestrator::DevContainerOrchestrator;
 pub use docker_packages::DockerPackageScanner;
@@ -11,3 +12,4 @@ pub use prod_orchestrator::{
     compute_container_name, AdoptionReport, ProdContainerOrchestrator, ReconcileAction,
     ReconcileReport,
 };
+pub use traits::ContainerOrchestrator;
@@ -28,12 +28,14 @@ use archipelago_container::{
     AppManifest, ContainerRuntime as ContainerRuntimeTrait, ContainerState, ContainerStatus,
     ResolvedSource,
 };
+use async_trait::async_trait;
 use std::collections::HashMap;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use tokio::sync::{Mutex, RwLock};
 
 use crate::config::{Config, ContainerRuntime as ConfigContainerRuntime};
+use crate::container::traits::ContainerOrchestrator;
 
 /// App IDs whose containers are named `archy-<id>` rather than bare `<id>`.
 ///
@@ -389,10 +391,21 @@ impl ProdContainerOrchestrator {
     }
 
     // ------------------------------------------------------------------
-    // Public imperative API (RPC surface). Mirrors DevContainerOrchestrator.
+    // Prod-specific inherent methods. The shared lifecycle surface
+    // (install/start/stop/restart/remove/upgrade/status/list/logs/health) lives
+    // in the `impl ContainerOrchestrator for ProdContainerOrchestrator` block
+    // below — call those through the trait, not as inherent methods.
     // ------------------------------------------------------------------
+}
 
-    pub async fn install(&self, app_id: &str) -> Result<String> {
+// ---------------------------------------------------------------------------
+// Trait impl (Step 4): the shared ContainerOrchestrator surface.
+// This IS the public RPC-facing API; there are no duplicate inherent methods.
+// ---------------------------------------------------------------------------
+
+#[async_trait]
+impl ContainerOrchestrator for ProdContainerOrchestrator {
+    async fn install(&self, app_id: &str) -> Result<String> {
         let lm = self.loaded(app_id).await?;
         let lock = self.app_lock(app_id).await;
         let _guard = lock.lock().await;
@@ -400,7 +413,7 @@ impl ProdContainerOrchestrator {
         Ok(compute_container_name(&lm.manifest))
     }
 
-    pub async fn start(&self, app_id: &str) -> Result<()> {
+    async fn start(&self, app_id: &str) -> Result<()> {
         let lm = self.loaded(app_id).await?;
         let lock = self.app_lock(app_id).await;
         let _guard = lock.lock().await;
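Each mutating method in the prod trait impl takes a per-app lock (`self.app_lock(app_id)` followed by `lock.lock()`) before touching the runtime, so operations on one app serialise while different apps proceed independently. The body of `app_lock` is not part of this diff, so the following is only one plausible shape for such a registry. It assumes a map from app_id to `Arc<Mutex<()>>` and uses `std::sync::Mutex` in place of the `tokio::sync::Mutex` the real file imports; `AppLocks` is a hypothetical type.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical per-app lock registry. The std `Mutex` stands in for
/// tokio's async mutex so the sketch needs no runtime.
#[derive(Default)]
struct AppLocks {
    locks: Mutex<HashMap<String, Arc<Mutex<()>>>>,
}

impl AppLocks {
    /// Return the lock for `app_id`, creating it on first use.
    fn app_lock(&self, app_id: &str) -> Arc<Mutex<()>> {
        let mut map = self.locks.lock().unwrap();
        // The registry lock is held only long enough to clone the Arc,
        // never across the guarded operation itself.
        map.entry(app_id.to_string()).or_default().clone()
    }
}
```

Because the registry hands out clones of the same `Arc` for a given app_id, two callers asking for `"bitcoin"` contend on one mutex, while a caller asking for `"lnd"` is unaffected.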
@@ -411,7 +424,7 @@ impl ProdContainerOrchestrator {
             .with_context(|| format!("start_container {name}"))
     }
 
-    pub async fn stop(&self, app_id: &str) -> Result<()> {
+    async fn stop(&self, app_id: &str) -> Result<()> {
         let lm = self.loaded(app_id).await?;
         let lock = self.app_lock(app_id).await;
         let _guard = lock.lock().await;
@@ -422,7 +435,7 @@ impl ProdContainerOrchestrator {
             .with_context(|| format!("stop_container {name}"))
     }
 
-    pub async fn restart(&self, app_id: &str) -> Result<()> {
+    async fn restart(&self, app_id: &str) -> Result<()> {
         let lm = self.loaded(app_id).await?;
         let lock = self.app_lock(app_id).await;
         let _guard = lock.lock().await;
@@ -438,7 +451,7 @@ impl ProdContainerOrchestrator {
     /// Remove the container. `preserve_data=true` is honored by NOT touching volumes
     /// here (production volumes live under `/var/lib/archipelago` — removal is a
     /// separate operation owned by the data layer, not this orchestrator).
-    pub async fn remove(&self, app_id: &str, _preserve_data: bool) -> Result<()> {
+    async fn remove(&self, app_id: &str, _preserve_data: bool) -> Result<()> {
         let lm = self.loaded(app_id).await?;
         let lock = self.app_lock(app_id).await;
         let _guard = lock.lock().await;
@@ -450,9 +463,8 @@ impl ProdContainerOrchestrator {
             .with_context(|| format!("remove_container {name}"))
     }
 
-    /// Upgrade: remove the existing container, re-install fresh (which will re-pull
-    /// or rebuild as required).
-    pub async fn upgrade(&self, app_id: &str) -> Result<()> {
+    /// Upgrade: stop-remove-reinstall (re-pulls or rebuilds as required).
+    async fn upgrade(&self, app_id: &str) -> Result<()> {
         let lm = self.loaded(app_id).await?;
         let lock = self.app_lock(app_id).await;
         let _guard = lock.lock().await;
@@ -462,13 +474,13 @@ impl ProdContainerOrchestrator {
         self.install_fresh(&lm).await
     }
 
-    pub async fn status(&self, app_id: &str) -> Result<ContainerStatus> {
+    async fn status(&self, app_id: &str) -> Result<ContainerStatus> {
         let lm = self.loaded(app_id).await?;
         let name = compute_container_name(&lm.manifest);
         self.runtime.get_container_status(&name).await
     }
 
-    pub async fn list(&self) -> Result<Vec<ContainerStatus>> {
+    async fn list(&self) -> Result<Vec<ContainerStatus>> {
         // Intersect runtime containers with our known manifests' expected names.
         let all = self.runtime.list_containers().await?;
         let state = self.state.read().await;
@@ -489,14 +501,14 @@ impl ProdContainerOrchestrator {
             .collect())
     }
 
-    pub async fn logs(&self, app_id: &str, lines: u32) -> Result<Vec<String>> {
+    async fn logs(&self, app_id: &str, lines: u32) -> Result<Vec<String>> {
         let lm = self.loaded(app_id).await?;
         let name = compute_container_name(&lm.manifest);
         self.runtime.get_container_logs(&name, lines).await
     }
 
-    pub async fn health(&self, app_id: &str) -> Result<String> {
-        let status = self.status(app_id).await?;
+    async fn health(&self, app_id: &str) -> Result<String> {
+        let status = <Self as ContainerOrchestrator>::status(self, app_id).await?;
         Ok(match status.state {
             ContainerState::Running => "healthy".to_string(),
             ContainerState::Stopped | ContainerState::Exited => "unhealthy".to_string(),
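The switch to `<Self as ContainerOrchestrator>::status(self, app_id)` in `health()` pins the call to the trait method regardless of what inherent methods exist. The sketch below illustrates the underlying Rust rule, which is one plausible reading of the "name-shadowing" concern in the commit message: when a type has an inherent method and a trait method with the same name, plain method-call syntax picks the inherent one. The types `Status` and `Prod` are hypothetical and synchronous for brevity.

```rust
trait Status {
    fn status(&self) -> &'static str;
    fn health(&self) -> &'static str;
}

struct Prod;

impl Prod {
    // Inherent method with the same name as the trait method.
    fn status(&self) -> &'static str {
        "inherent"
    }
}

impl Status for Prod {
    fn status(&self) -> &'static str {
        "trait"
    }

    fn health(&self) -> &'static str {
        // `self.status()` would resolve to the inherent method here.
        // The fully qualified form always names the trait method.
        <Self as Status>::status(self)
    }
}
```

With the inherent lifecycle methods removed from `ProdContainerOrchestrator`, `self.status(...)` would also reach the trait method, so the qualified form mainly guards against an inherent `status` being reintroduced later.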
56
core/archipelago/src/container/traits.rs
Normal file
@@ -0,0 +1,56 @@
+//! Orchestrator trait — the shared surface the RPC layer talks to.
+//!
+//! Step 4 of the rust-orchestrator migration. Unifies the container lifecycle
+//! surface of `DevContainerOrchestrator` and `ProdContainerOrchestrator` so
+//! `RpcHandler` can hold `Arc<dyn ContainerOrchestrator>` and stop caring
+//! which mode it is in.
+//!
+//! The trait takes `app_id: &str` everywhere (never a manifest path). Dev and
+//! Prod both resolve app_id → manifest internally. The legacy
+//! `container-install { manifest_path }` RPC shape is preserved as a concrete
+//! `install_container` method on `DevContainerOrchestrator` only,
+//! since that ad-hoc workflow is a dev convenience and has no prod meaning.
+//!
+//! See `docs/rust-orchestrator-migration.md`.
+
+use anyhow::Result;
+use archipelago_container::ContainerStatus;
+use async_trait::async_trait;
+
+/// Lifecycle + query operations every orchestrator exposes to the RPC layer.
+#[async_trait]
+pub trait ContainerOrchestrator: Send + Sync {
+    /// Build-or-pull the image, create the container, and start it. Returns the
+    /// podman container name that was created. Assumes the app_id corresponds
+    /// to a manifest the orchestrator already knows about.
+    async fn install(&self, app_id: &str) -> Result<String>;
+
+    /// Start an already-created container.
+    async fn start(&self, app_id: &str) -> Result<()>;
+
+    /// Stop a running container. No-op on Prod if already stopped.
+    async fn stop(&self, app_id: &str) -> Result<()>;
+
+    /// Stop-then-start. Best-effort: ignores stop failure.
+    async fn restart(&self, app_id: &str) -> Result<()>;
+
+    /// Remove the container. `preserve_data = true` keeps the volumes; `false`
+    /// is honored on a best-effort basis (Dev cleans, Prod leaves the volume
+    /// management to the data layer).
+    async fn remove(&self, app_id: &str, preserve_data: bool) -> Result<()>;
+
+    /// Pull/rebuild the image and recreate the container from scratch.
+    async fn upgrade(&self, app_id: &str) -> Result<()>;
+
+    /// Current state of a single container.
+    async fn status(&self, app_id: &str) -> Result<ContainerStatus>;
+
+    /// All containers this orchestrator knows about.
+    async fn list(&self) -> Result<Vec<ContainerStatus>>;
+
+    /// Tail the container's stdout+stderr.
+    async fn logs(&self, app_id: &str, lines: u32) -> Result<Vec<String>>;
+
+    /// Coarse health summary: "healthy", "unhealthy", "starting", "paused", "unknown".
+    async fn health(&self, app_id: &str) -> Result<String>;
+}
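The trait above needs `#[async_trait]` because it is used as `Arc<dyn ContainerOrchestrator>`, and a trait with plain `async fn` methods cannot be turned into a trait object. Roughly speaking, the macro rewrites each `async fn` into an ordinary method that returns a boxed, pinned future. The sketch below writes that shape by hand using only the standard library. It is an approximation of the idea, not the macro's exact expansion, and `Orchestrator`, `AlwaysHealthy`, `BoxFuture` and the toy `block_on` are all hypothetical names.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Dyn-compatible: the method returns a boxed future instead of being `async fn`.
trait Orchestrator: Send + Sync {
    fn health<'a>(&'a self, app_id: &'a str) -> BoxFuture<'a, String>;
}

struct AlwaysHealthy;

impl Orchestrator for AlwaysHealthy {
    fn health<'a>(&'a self, app_id: &'a str) -> BoxFuture<'a, String> {
        Box::pin(async move { format!("{app_id}: healthy") })
    }
}

// Toy executor for this sketch only: it busy-polls until the future is
// ready. Real code would hand the future to a runtime such as tokio.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<T>(mut fut: BoxFuture<'_, T>) -> T {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

The cost of this shape is one heap allocation per call, which is usually negligible next to spawning or querying a container runtime.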