mod process_lines_actions;
mod sandbox;
pub use process_lines_actions::ProcessLinesActions;
pub use sandbox::*;
use crate::native;
use crate::workspace::Workspace;
use futures_util::{
future::{self, FutureExt},
stream::{self, TryStreamExt},
};
use log::{error, info};
use process_lines_actions::InnerState;
use std::convert::AsRef;
use std::env::consts::EXE_SUFFIX;
use std::ffi::{OsStr, OsString};
use std::path::{Path, PathBuf};
use std::process::{ExitStatus, Stdio};
use std::time::{Duration, Instant};
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Command as AsyncCommand,
runtime::Runtime,
stream::StreamExt,
time,
};
// Shared tokio runtime used by this module to drive asynchronous child processes from
// synchronous code. It is built lazily on first access; if construction fails the
// process panics with the message below.
lazy_static::lazy_static! {
pub(super) static ref RUNTIME: Runtime = Runtime::new().expect("Failed to construct tokio runtime");
}
/// Well-known directories used as mount targets and environment values when a command is
/// executed inside a sandbox.
pub(crate) mod container_dirs {
use lazy_static::lazy_static;
use std::path::{Path, PathBuf};
// Root under which every other directory of this module lives (Windows flavour).
#[cfg(windows)]
lazy_static! {
pub(super) static ref ROOT_DIR: PathBuf = Path::new(r"C:\rustwide").into();
}
// Root under which every other directory of this module lives (non-Windows flavour).
#[cfg(not(windows))]
lazy_static! {
pub(super) static ref ROOT_DIR: PathBuf = Path::new("/opt/rustwide").into();
}
// All of the following paths are derived from `ROOT_DIR`:
// - `WORK_DIR`: where the command's source directory is mounted; also used as the
//   working directory and as the value of `SOURCE_DIR`.
// - `TARGET_DIR`: `<root>/target` (not used in this file).
// - `CARGO_HOME` / `RUSTUP_HOME`: where the workspace's cargo and rustup homes are mounted.
// - `CARGO_BIN_DIR`: `bin` directory inside `CARGO_HOME`, used to locate binaries
//   managed by rustwide.
lazy_static! {
pub(crate) static ref WORK_DIR: PathBuf = ROOT_DIR.join("workdir");
pub(crate) static ref TARGET_DIR: PathBuf = ROOT_DIR.join("target");
pub(super) static ref CARGO_HOME: PathBuf = ROOT_DIR.join("cargo-home");
pub(super) static ref RUSTUP_HOME: PathBuf = ROOT_DIR.join("rustup-home");
pub(super) static ref CARGO_BIN_DIR: PathBuf = CARGO_HOME.join("bin");
}
}
/// Error raised while preparing or executing a command.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum CommandError {
/// The command produced no line on stdout or stderr for the given number of seconds
/// and was killed.
#[error("no output for {0} seconds")]
NoOutputFor(u64),
/// The command ran for longer than the given number of seconds.
#[error("command timed out after {0} seconds")]
Timeout(u64),
/// The command exited with a non-successful exit status.
#[error("command failed: {0}")]
ExecutionFailed(ExitStatus),
/// A timeout elapsed but killing the process failed.
#[error("{0}")]
KillAfterTimeoutFailed(#[source] KillFailedError),
/// The sandbox ran out of memory (not produced in this file; presumably raised by the
/// sandbox implementation).
#[error("container ran out of memory")]
SandboxOOM,
/// Pulling the sandbox image from the registry failed; the inner error is the cause.
#[error("failed to pull the sandbox image from the registry: {0}")]
SandboxImagePullFailed(#[source] Box<CommandError>),
/// The sandbox image is not available on the local system; the inner error is the cause.
#[error("sandbox image missing from the local system: {0}")]
SandboxImageMissing(#[source] Box<CommandError>),
/// The workspace is not mounted from outside the container.
#[error("the workspace is not mounted from outside the container")]
WorkspaceNotMountedCorrectly,
/// The output of `docker inspect` could not be parsed as the expected JSON.
#[error("invalid output of `docker inspect`: {0}")]
InvalidDockerInspectOutput(#[source] serde_json::Error),
/// An I/O error occurred; converted automatically from `std::io::Error`.
#[error(transparent)]
IO(#[from] std::io::Error),
}
/// Error describing a failed attempt to kill a process.
///
/// The display message always contains the PID; on unix it additionally contains the
/// description of the errno, when one was recorded.
#[derive(Debug, thiserror::Error)]
#[cfg_attr(unix, error(
"failed to kill the process with PID {pid}{}",
.errno.map(|e| format!(": {}", e.desc())).unwrap_or_else(String::new)
))]
#[cfg_attr(not(unix), error("failed to kill the process with PID {pid}"))]
pub struct KillFailedError {
// PID of the process that could not be killed.
pub(crate) pid: u32,
// Errno recorded for the failure, if any (only present on unix targets).
#[cfg(unix)]
pub(crate) errno: Option<nix::errno::Errno>,
}
impl KillFailedError {
    /// Return the PID of the process that could not be killed.
    pub fn pid(&self) -> u32 {
        let Self { pid, .. } = self;
        *pid
    }

    /// Return the recorded error number as a raw integer, or `None` when no errno
    /// was stored alongside the failure.
    #[cfg(any(unix, doc))]
    #[cfg_attr(docs_rs, doc(cfg(unix)))]
    pub fn errno(&self) -> Option<i32> {
        match self.errno {
            Some(code) => Some(code as i32),
            None => None,
        }
    }
}
/// Name of a binary that can be executed through [`Command`].
#[non_exhaustive]
pub enum Binary {
/// The path is handed to the process spawner (or to the sandbox) exactly as provided.
Global(PathBuf),
/// The binary is looked up inside the `bin` directory of the cargo home (the
/// workspace's one when unsandboxed, the container's one when sandboxed), with the
/// platform executable suffix appended.
ManagedByRustwide(PathBuf),
}
/// Something that can be executed through [`Command`].
pub trait Runnable {
/// Return the binary that should be executed.
fn name(&self) -> Binary;
/// Customize a freshly created command before it is handed back to the caller.
///
/// The constructors of [`Command`] call this on the new command; the default
/// implementation returns the command unchanged.
fn prepare_command<'w, 'pl>(&self, cmd: Command<'w, 'pl>) -> Command<'w, 'pl> {
cmd
}
}
impl<'a> Runnable for &'a str {
fn name(&self) -> Binary {
Binary::Global(self.into())
}
}
impl Runnable for String {
fn name(&self) -> Binary {
Binary::Global(self.into())
}
}
impl<'a, B: Runnable> Runnable for &'a B {
    /// Forward to the implementation of the referenced value.
    fn name(&self) -> Binary {
        B::name(*self)
    }

    /// Forward to the implementation of the referenced value.
    fn prepare_command<'w, 'pl>(&self, cmd: Command<'w, 'pl>) -> Command<'w, 'pl> {
        B::prepare_command(*self, cmd)
    }
}
/// Builder describing a process to execute, optionally inside a sandbox.
///
/// The builder methods consume and return the command; `run` or `run_capture`
/// execute it.
pub struct Command<'w, 'pl> {
// Workspace the command belongs to; `None` only for workspace-less commands.
workspace: Option<&'w Workspace>,
// When set, the command is executed through this sandbox builder.
sandbox: Option<SandboxBuilder>,
// Binary to execute.
binary: Binary,
// Arguments passed to the binary, in order.
args: Vec<OsString>,
// Extra environment variables, applied in insertion order.
env: Vec<(OsString, OsString)>,
// Optional callback invoked for every line of output.
process_lines: Option<&'pl mut dyn FnMut(&str, &mut ProcessLinesActions)>,
// Working directory (unsandboxed) or directory mounted as the source dir (sandboxed).
cd: Option<PathBuf>,
// Maximum total execution time.
timeout: Option<Duration>,
// Maximum time allowed without any output.
no_output_timeout: Option<Duration>,
// Whether the command line is logged before execution.
log_command: bool,
// Whether each line of output is logged.
log_output: bool,
}
impl<'w, 'pl> Command<'w, 'pl> {
/// Create a command that runs `binary` outside of any sandbox, bound to `workspace`.
///
/// The binary gets a chance to customize the command through
/// `Runnable::prepare_command` before it is returned.
pub fn new<R: Runnable>(workspace: &'w Workspace, binary: R) -> Self {
    let base = Self::new_inner(binary.name(), Some(workspace), None);
    binary.prepare_command(base)
}
/// Create a command that runs `binary` inside the sandbox described by `sandbox`,
/// bound to `workspace`.
///
/// The binary gets a chance to customize the command through
/// `Runnable::prepare_command` before it is returned.
pub fn new_sandboxed<R: Runnable>(
    workspace: &'w Workspace,
    sandbox: SandboxBuilder,
    binary: R,
) -> Self {
    let base = Self::new_inner(binary.name(), Some(workspace), Some(sandbox));
    binary.prepare_command(base)
}
/// Create an unsandboxed command that is not bound to any workspace.
///
/// Such a command has no default timeouts.
pub(crate) fn new_workspaceless<R: Runnable>(binary: R) -> Self {
    let base = Self::new_inner(binary.name(), None, None);
    binary.prepare_command(base)
}
fn new_inner(
binary: Binary,
workspace: Option<&'w Workspace>,
sandbox: Option<SandboxBuilder>,
) -> Self {
let (timeout, no_output_timeout) = if let Some(workspace) = workspace {
(
workspace.default_command_timeout(),
workspace.default_command_no_output_timeout(),
)
} else {
(None, None)
};
Command {
workspace,
sandbox,
binary,
args: Vec::new(),
env: Vec::new(),
process_lines: None,
cd: None,
timeout,
no_output_timeout,
log_output: true,
log_command: true,
}
}
/// Append `args` to the command-line arguments, preserving their order.
pub fn args<S: AsRef<OsStr>>(mut self, args: &[S]) -> Self {
    self.args
        .extend(args.iter().map(|arg| arg.as_ref().to_os_string()));
    self
}
/// Add the environment variable `key=value` to the command.
///
/// Variables are stored in insertion order.
pub fn env<S1: AsRef<OsStr>, S2: AsRef<OsStr>>(mut self, key: S1, value: S2) -> Self {
    let name = key.as_ref().to_os_string();
    let content = value.as_ref().to_os_string();
    self.env.push((name, content));
    self
}
pub fn cd<P: AsRef<Path>>(mut self, path: P) -> Self {
self.cd = Some(path.as_ref().to_path_buf());
self
}
pub fn timeout(mut self, timeout: Option<Duration>) -> Self {
self.timeout = timeout;
self
}
pub fn no_output_timeout(mut self, timeout: Option<Duration>) -> Self {
self.no_output_timeout = timeout;
self
}
pub fn process_lines(mut self, f: &'pl mut dyn FnMut(&str, &mut ProcessLinesActions)) -> Self {
self.process_lines = Some(f);
self
}
pub fn log_output(mut self, log_output: bool) -> Self {
self.log_output = log_output;
self
}
pub fn log_command(mut self, log_command: bool) -> Self {
self.log_command = log_command;
self
}
/// Execute the command without capturing its output.
///
/// # Errors
///
/// Returns any `CommandError` raised while running the command.
pub fn run(self) -> Result<(), CommandError> {
    self.run_inner(false).map(|_output| ())
}
/// Execute the command and return the lines it wrote to stdout and stderr.
///
/// # Errors
///
/// Returns any `CommandError` raised while running the command.
pub fn run_capture(self) -> Result<ProcessOutput, CommandError> {
    // `run_inner` already yields the exact result type: no need to unwrap and re-wrap.
    self.run_inner(true)
}
/// Execute the command, either through the configured sandbox or as a local process.
///
/// When `capture` is true the returned `ProcessOutput` contains the output lines.
///
/// # Errors
///
/// Returns the error produced by the sandbox or by the process runner, or
/// `CommandError::ExecutionFailed` when a local process exits unsuccessfully.
///
/// # Panics
///
/// Panics when the command is sandboxed, or uses a `Binary::ManagedByRustwide`
/// binary, but was created without a workspace.
fn run_inner(self, capture: bool) -> Result<ProcessOutput, CommandError> {
    if let Some(mut builder) = self.sandbox {
        let workspace = self
            .workspace
            .expect("sandboxed builds without a workspace are not supported");

        // Managed binaries live in the cargo bin directory *inside* the container.
        let binary = match self.binary {
            Binary::Global(path) => path,
            Binary::ManagedByRustwide(path) => {
                container_dirs::CARGO_BIN_DIR.join(exe_suffix(path.as_os_str()))
            }
        };

        // The sandbox receives the command line as strings: binary first, then arguments.
        let mut cmd = Vec::with_capacity(self.args.len() + 1);
        cmd.push(binary.to_string_lossy().into_owned());
        cmd.extend(
            self.args
                .iter()
                .map(|arg| arg.to_string_lossy().into_owned()),
        );

        let source_dir = self.cd.unwrap_or_else(|| PathBuf::from("."));

        builder = builder
            .mount(&source_dir, &*container_dirs::WORK_DIR, MountKind::ReadOnly)
            .env("SOURCE_DIR", container_dirs::WORK_DIR.to_str().unwrap())
            .workdir(container_dirs::WORK_DIR.to_str().unwrap())
            .cmd(cmd);

        if let Some(user) = native::current_user() {
            builder = builder.user(user.user_id, user.group_id);
        }

        for (key, value) in self.env {
            builder = builder.env(
                key.to_string_lossy().as_ref(),
                value.to_string_lossy().as_ref(),
            );
        }

        // CARGO_HOME and RUSTUP_HOME are set last, after the caller-provided variables.
        builder = builder
            .mount(
                &workspace.cargo_home(),
                &*container_dirs::CARGO_HOME,
                MountKind::ReadOnly,
            )
            .mount(
                &workspace.rustup_home(),
                &*container_dirs::RUSTUP_HOME,
                MountKind::ReadOnly,
            )
            .env("CARGO_HOME", container_dirs::CARGO_HOME.to_str().unwrap())
            .env("RUSTUP_HOME", container_dirs::RUSTUP_HOME.to_str().unwrap());

        builder.run(
            workspace,
            self.timeout,
            self.no_output_timeout,
            self.process_lines,
            self.log_output,
            self.log_command,
            capture,
        )
    } else {
        // For managed binaries keep hold of the workspace, so the toolchain
        // environment can be configured below without unwrapping it a second time.
        let (binary, managed_workspace) = match self.binary {
            Binary::Global(path) => (path, None),
            Binary::ManagedByRustwide(path) => {
                let workspace = self
                    .workspace
                    .expect("calling rustwide bins without a workspace is not supported");
                let binary = workspace
                    .cargo_home()
                    .join("bin")
                    .join(exe_suffix(path.as_os_str()));
                (crate::utils::normalize_path(&binary), Some(workspace))
            }
        };

        let mut cmd = AsyncCommand::new(&binary);
        cmd.args(&self.args);

        if let Some(workspace) = managed_workspace {
            // The homes are passed as paths: environment values don't have to be
            // valid UTF-8, so there is no reason to panic on such workspace paths.
            cmd.env(
                "CARGO_HOME",
                crate::utils::normalize_path(&workspace.cargo_home()),
            );
            cmd.env(
                "RUSTUP_HOME",
                crate::utils::normalize_path(&workspace.rustup_home()),
            );
        }

        // Caller-provided variables are applied afterwards.
        for (key, value) in &self.env {
            cmd.env(key, value);
        }

        // The command is rendered before the working directory is applied, and only
        // when it is actually going to be logged.
        if self.log_command {
            info!("running `{:?}`", cmd);
        }
        if let Some(ref cd) = self.cd {
            cmd.current_dir(cd);
        }

        let out = RUNTIME
            .handle()
            .block_on(log_command(
                cmd,
                self.process_lines,
                capture,
                self.timeout,
                self.no_output_timeout,
                self.log_output,
            ))
            .map_err(|e| {
                error!("error running command: {}", e);
                e
            })?;

        if !out.status.success() {
            return Err(CommandError::ExecutionFailed(out.status));
        }
        Ok(out.into())
    }
}
}
// Result of running a local process: its exit status plus the captured output lines
// (both vectors stay empty when the output was not captured).
struct InnerProcessOutput {
status: ExitStatus,
stdout: Vec<String>,
stderr: Vec<String>,
}
impl From<InnerProcessOutput> for ProcessOutput {
fn from(orig: InnerProcessOutput) -> ProcessOutput {
ProcessOutput {
stdout: orig.stdout,
stderr: orig.stderr,
}
}
}
/// Output lines collected from a command executed with `Command::run_capture`.
pub struct ProcessOutput {
    stdout: Vec<String>,
    stderr: Vec<String>,
}

impl ProcessOutput {
    /// Return the lines collected from standard output, in order.
    pub fn stdout_lines(&self) -> &[String] {
        self.stdout.as_slice()
    }

    /// Return the lines collected from standard error, in order.
    pub fn stderr_lines(&self) -> &[String] {
        self.stderr.as_slice()
    }
}
/// Stream a line of output was read from.
enum OutputKind {
    Stdout,
    Stderr,
}

impl OutputKind {
    /// Label used as the prefix of logged output lines.
    fn prefix(&self) -> &'static str {
        match self {
            Self::Stdout => "stdout",
            Self::Stderr => "stderr",
        }
    }
}
async fn log_command(
mut cmd: AsyncCommand,
mut process_lines: Option<&mut dyn FnMut(&str, &mut ProcessLinesActions)>,
capture: bool,
timeout: Option<Duration>,
no_output_timeout: Option<Duration>,
log_output: bool,
) -> Result<InnerProcessOutput, CommandError> {
let timeout = if let Some(t) = timeout {
t
} else {
Duration::from_secs(7 * 24 * 60 * 60)
};
let no_output_timeout = if let Some(t) = no_output_timeout {
t
} else {
timeout
};
let mut child = cmd.stdout(Stdio::piped()).stderr(Stdio::piped()).spawn()?;
let child_id = child.id();
let stdout = BufReader::new(child.stdout.take().unwrap())
.lines()
.map(|line| (OutputKind::Stdout, line));
let stderr = BufReader::new(child.stderr.take().unwrap())
.lines()
.map(|line| (OutputKind::Stderr, line));
let start = Instant::now();
let mut actions = ProcessLinesActions::new();
let output = stream::select(stdout, stderr)
.timeout(no_output_timeout)
.map(move |result| match result {
Err(_timeout) => Err(match native::kill_process(child_id) {
Ok(()) => CommandError::NoOutputFor(no_output_timeout.as_secs()),
Err(err) => CommandError::KillAfterTimeoutFailed(err),
}),
Ok((_, Err(read_err))) => Err(read_err.into()),
Ok((out_kind, Ok(line))) => Ok((out_kind, line)),
})
.and_then(move |(kind, line): (OutputKind, String)| {
if start.elapsed() > timeout {
return future::err(CommandError::Timeout(timeout.as_secs()));
}
if let Some(f) = &mut process_lines {
f(&line, &mut actions);
}
let lines = match actions.take_lines() {
InnerState::Removed => Vec::new(),
InnerState::Original => vec![line],
InnerState::Replaced(new_lines) => new_lines,
};
if log_output {
for line in &lines {
info!("[{}] {}", kind.prefix(), line);
}
}
future::ok((kind, lines))
})
.try_fold(
(Vec::<String>::new(), Vec::<String>::new()),
move |(mut stdout, mut stderr), (kind, mut lines)| async move {
if capture {
match kind {
OutputKind::Stdout => stdout.append(&mut lines),
OutputKind::Stderr => stderr.append(&mut lines),
}
}
Ok((stdout, stderr))
},
);
let child = time::timeout(timeout, child).map(move |result| {
match result {
Err(_timeout) => Err(match native::kill_process(child_id) {
Ok(()) => CommandError::Timeout(timeout.as_secs()),
Err(err) => CommandError::KillAfterTimeoutFailed(err),
}),
Ok(Err(err)) => Err(err.into()),
Ok(Ok(exit_status)) => Ok(exit_status),
}
});
let ((stdout, stderr), status) = {
let (output, child) = future::join(output, child).await;
let (stdout, stderr) = output?;
((stdout, stderr), child?)
};
Ok(InnerProcessOutput {
status,
stdout,
stderr,
})
}
/// Return `file` with the platform's executable suffix appended
/// (`std::env::consts::EXE_SUFFIX`, which is empty on platforms without one).
fn exe_suffix(file: &OsStr) -> OsString {
    let mut with_suffix = OsString::with_capacity(file.len() + EXE_SUFFIX.len());
    with_suffix.push(file);
    with_suffix.push(EXE_SUFFIX);
    with_suffix
}