use std::cell::Cell;
use std::collections::{HashMap, HashSet};
use std::io;
use std::marker;
use std::mem;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::time::Duration;
use crossbeam_utils::thread::Scope;
use failure::format_err;
use jobserver::{Acquired, HelperThread};
use log::{debug, info, trace};
use super::context::OutputFile;
use super::job::{
Freshness::{self, Dirty, Fresh},
Job,
};
use super::standard_lib;
use super::timings::Timings;
use super::{BuildContext, BuildPlan, CompileMode, Context, Unit};
use crate::core::compiler::ProfileKind;
use crate::core::{PackageId, TargetKind};
use crate::handle_error;
use crate::util;
use crate::util::diagnostic_server::{self, DiagnosticPrinter};
use crate::util::{internal, profile, CargoResult, CargoResultExt, ProcessBuilder};
use crate::util::{Config, DependencyQueue};
use crate::util::{Progress, ProgressStyle};
/// Coordinator for executing compilation jobs: holds the dependency-ordered
/// queue of pending work plus the bookkeeping needed to drive jobs to
/// completion under jobserver-managed parallelism.
pub struct JobQueue<'a, 'cfg> {
    /// Pending jobs keyed by unit; a job becomes ready once the artifacts
    /// (`Metadata` or `All`) of its dependencies have been produced.
    queue: DependencyQueue<Unit<'a>, Artifact, Job>,
    /// Sending half of the event channel; cloned into each running job.
    tx: Sender<Message>,
    /// Receiving half, drained by the coordinator loop in `drain_the_queue`.
    rx: Receiver<Message>,
    /// Currently running jobs, keyed by the id handed out in `run`.
    active: HashMap<u32, Unit<'a>>,
    /// Packages for which a "Compiling"/"Checking"/"Fresh" line was printed,
    /// so each package is announced at most once.
    compiled: HashSet<PackageId>,
    /// Packages for which a "Documenting" line was printed.
    documented: HashSet<PackageId>,
    /// Number of still-queued jobs per package (incremented in `enqueue`,
    /// decremented in `run`; `note_working_on` checks it for "Fresh").
    counts: HashMap<PackageId, usize>,
    /// Progress bar shown while the loop is waiting on jobs.
    progress: Progress<'cfg>,
    /// Id to assign to the next spawned job.
    next_id: u32,
    /// Per-unit timing recorder for the timings report.
    timings: Timings<'a, 'cfg>,
    /// Profile selected for this build (used for the "Finished" summary).
    profile_kind: ProfileKind,
}
/// Handle given to each running job so it can report events (command
/// start, output, build-plan info, rmeta availability) back to the
/// coordinator over the shared channel.
pub struct JobState<'a> {
    /// Channel back to the `JobQueue` event loop.
    tx: Sender<Message>,
    /// Id of this job, matching the key in `JobQueue::active`.
    id: u32,
    /// Whether a `Finish(.., Metadata, ..)` message still needs to be sent;
    /// cleared by `rmeta_produced`, checked after the job body completes.
    rmeta_required: Cell<bool>,
    // Carries the `'a` lifetime without storing an actual reference.
    _marker: marker::PhantomData<&'a ()>,
}
/// Granularity of a produced build artifact, used as the edge label in the
/// dependency queue so dependants can start as soon as what they actually
/// need is available.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
enum Artifact {
    /// Everything the unit produces is ready; the job is entirely done.
    All,
    /// Only the metadata (rmeta) is ready; the job may still be running.
    Metadata,
}
/// Events delivered to the coordinator loop from running jobs, the
/// jobserver helper thread, and the diagnostic server.
enum Message {
    /// Job `u32` started running the command rendered in the `String`.
    Run(u32, String),
    /// Build-plan data (module name, command, output files) to record.
    BuildPlanMsg(String, ProcessBuilder, Arc<Vec<OutputFile>>),
    /// Captured stdout text to print on the user's stdout.
    Stdout(String),
    /// Captured stderr text to print (ANSI passed through) on stderr.
    Stderr(String),
    /// A message forwarded from the rustfix diagnostic server.
    FixDiagnostic(diagnostic_server::Message),
    /// Result of acquiring a jobserver token on the helper thread.
    Token(io::Result<Acquired>),
    /// Job `u32` finished producing `Artifact` with the given result.
    Finish(u32, Artifact, CargoResult<()>),
}
impl<'a> JobState<'a> {
    /// Report that `cmd` has started executing for this job. Send failures
    /// are deliberately ignored (the coordinator may already be gone).
    pub fn running(&self, cmd: &ProcessBuilder) {
        let msg = Message::Run(self.id, cmd.to_string());
        let _ = self.tx.send(msg);
    }

    /// Forward build-plan details (module, command, outputs) for this job.
    pub fn build_plan(
        &self,
        module_name: String,
        cmd: ProcessBuilder,
        filenames: Arc<Vec<OutputFile>>,
    ) {
        let msg = Message::BuildPlanMsg(module_name, cmd, filenames);
        let _ = self.tx.send(msg);
    }

    /// Forward captured stdout text to the coordinator.
    pub fn stdout(&self, stdout: String) {
        let _ = self.tx.send(Message::Stdout(stdout));
    }

    /// Forward captured stderr text to the coordinator.
    pub fn stderr(&self, stderr: String) {
        let _ = self.tx.send(Message::Stderr(stderr));
    }

    /// Record that the metadata (rmeta) artifact is now available and tell
    /// the coordinator, so dependants waiting only on metadata can start.
    pub fn rmeta_produced(&self) {
        self.rmeta_required.set(false);
        let msg = Message::Finish(self.id, Artifact::Metadata, Ok(()));
        let _ = self.tx.send(msg);
    }
}
impl<'a, 'cfg> JobQueue<'a, 'cfg> {
/// Create an empty job queue wired to a fresh message channel, with
/// progress and timing reporters derived from the build context.
pub fn new(bcx: &BuildContext<'a, 'cfg>, root_units: &[Unit<'a>]) -> JobQueue<'a, 'cfg> {
    let (tx, rx) = channel();
    JobQueue {
        queue: DependencyQueue::new(),
        tx,
        rx,
        active: HashMap::new(),
        compiled: HashSet::new(),
        documented: HashSet::new(),
        counts: HashMap::new(),
        progress: Progress::with_style("Building", ProgressStyle::Ratio, bcx.config),
        next_id: 0,
        timings: Timings::new(bcx, root_units),
        profile_kind: bcx.build_config.profile_kind.clone(),
    }
}
/// Queue `job` for `unit`, recording which dependency artifacts must be
/// produced before it can run.
pub fn enqueue(
    &mut self,
    cx: &Context<'a, 'cfg>,
    unit: &Unit<'a>,
    job: Job,
) -> CargoResult<()> {
    let dependencies = cx.unit_deps(unit);

    // Direct dependencies, minus test and bin targets, each tagged with
    // how much of the dependency this unit actually needs: just the
    // rmeta when pipelining applies, otherwise everything.
    let mut queue_deps = HashMap::new();
    for dep in dependencies.iter() {
        if dep.unit.target.is_test() || dep.unit.target.is_bin() {
            continue;
        }
        let artifact = if cx.only_requires_rmeta(unit, &dep.unit) {
            Artifact::Metadata
        } else {
            Artifact::All
        };
        queue_deps.insert(dep.unit, artifact);
    }

    if unit.requires_upstream_objects() {
        // Walk dependencies-of-dependencies, requiring the full artifact
        // of every transitive dependency (not just metadata).
        fn require_all_transitively<'a>(
            cx: &Context<'a, '_>,
            deps: &mut HashMap<Unit<'a>, Artifact>,
            unit: Unit<'a>,
        ) {
            for dep in cx.unit_deps(&unit) {
                // Recurse only on first insertion to avoid revisiting.
                if deps.insert(dep.unit, Artifact::All).is_none() {
                    require_all_transitively(cx, deps, dep.unit);
                }
            }
        }
        for dep in dependencies {
            require_all_transitively(cx, &mut queue_deps, dep.unit);
        }
    }

    self.queue.queue(*unit, job, queue_deps);
    *self.counts.entry(unit.pkg.package_id()).or_insert(0) += 1;
    Ok(())
}
/// Execute the whole job graph: start the jobserver helper thread and the
/// optional rustfix diagnostic server, then run the event loop inside a
/// crossbeam scope so worker threads may borrow from the stack.
pub fn execute(&mut self, cx: &mut Context<'a, '_>, plan: &mut BuildPlan) -> CargoResult<()> {
    let _p = profile::start("executing the job graph");
    self.queue.queue_finished();

    // Helper thread forwarding acquired jobserver tokens into our channel.
    let token_tx = self.tx.clone();
    let helper = cx
        .jobserver
        .clone()
        .into_helper_thread(move |token| {
            let _ = token_tx.send(Message::Token(token));
        })
        .chain_err(|| "failed to create helper thread for jobserver management")?;

    // If rustfix diagnostics are in play, forward those messages as well.
    let diag_tx = self.tx.clone();
    let _diagnostic_server = cx
        .bcx
        .build_config
        .rustfix_diagnostic_server
        .borrow_mut()
        .take()
        .map(move |srv| srv.start(move |msg| drop(diag_tx.send(Message::FixDiagnostic(msg)))));

    crossbeam_utils::thread::scope(|scope| self.drain_the_queue(cx, plan, scope, &helper))
        .expect("child threads shouldn't panic")
}
/// Core event loop: dequeues ready jobs, spawns as many as the held
/// jobserver tokens allow, and processes messages from running jobs until
/// everything finishes (or an error drains the remaining active jobs).
fn drain_the_queue(
    &mut self,
    cx: &mut Context<'a, '_>,
    plan: &mut BuildPlan,
    scope: &Scope<'a>,
    jobserver_helper: &HelperThread,
) -> CargoResult<()> {
    // Jobserver tokens currently held (beyond the one implicit token).
    let mut tokens = Vec::new();
    // Jobs whose dependencies are satisfied, waiting for a token to run.
    let mut queue = Vec::new();
    let mut print = DiagnosticPrinter::new(cx.bcx.config);
    trace!("queue: {:#?}", self.queue);

    // First error observed; kept aside so already-running jobs can finish
    // before it is returned.
    let mut error = None;
    let total = self.queue.len();
    let mut finished = 0;
    loop {
        // Move every newly-ready job into the local queue, requesting a
        // token for each job beyond the first (which uses the implicit
        // token we always hold).
        while let Some((unit, job)) = self.queue.dequeue() {
            queue.push((unit, job));
            if self.active.len() + queue.len() > 1 {
                jobserver_helper.request_token();
            }
        }

        // Spawn jobs while we have spare tokens; stop spawning on error.
        while error.is_none() && self.active.len() < tokens.len() + 1 && !queue.is_empty() {
            let (unit, job) = queue.remove(0);
            self.run(&unit, job, cx, scope)?;
        }

        // Nothing is running and nothing could be spawned: we are done
        // (either successfully or because an error stopped new spawns).
        if self.active.is_empty() {
            break;
        }

        // Discard tokens we no longer need (presumably returned to the
        // jobserver when `Acquired` is dropped — see jobserver crate docs).
        tokens.truncate(self.active.len() - 1);

        self.timings
            .mark_concurrency(self.active.len(), queue.len(), self.queue.len());
        self.timings.record_cpu();

        // Drain all pending messages without blocking; if there are none,
        // refresh the progress bar and block briefly for the next event.
        let events: Vec<_> = self.rx.try_iter().collect();
        let events = if events.is_empty() {
            self.show_progress(finished, total);
            match self.rx.recv_timeout(Duration::from_millis(500)) {
                Ok(message) => vec![message],
                // Timeout: loop around to re-tick the progress bar.
                Err(_) => continue,
            }
        } else {
            events
        };

        for event in events {
            match event {
                Message::Run(id, cmd) => {
                    cx.bcx
                        .config
                        .shell()
                        .verbose(|c| c.status("Running", &cmd))?;
                    self.timings.unit_start(id, self.active[&id]);
                }
                Message::BuildPlanMsg(module_name, cmd, filenames) => {
                    plan.update(&module_name, &cmd, &filenames)?;
                }
                Message::Stdout(out) => {
                    cx.bcx.config.shell().stdout_println(out);
                }
                Message::Stderr(err) => {
                    // Pass ANSI escapes through untouched, then newline.
                    let mut shell = cx.bcx.config.shell();
                    shell.print_ansi(err.as_bytes())?;
                    shell.err().write_all(b"\n")?;
                }
                Message::FixDiagnostic(msg) => {
                    print.print(&msg)?;
                }
                Message::Finish(id, artifact, result) => {
                    let unit = match artifact {
                        // Job fully done: remove from the active set.
                        Artifact::All => {
                            info!("end: {:?}", id);
                            finished += 1;
                            self.active.remove(&id).unwrap()
                        }
                        // Only metadata finished: job keeps running.
                        Artifact::Metadata => {
                            info!("end (meta): {:?}", id);
                            self.active[&id]
                        }
                    };
                    info!("end ({:?}): {:?}", unit, result);
                    match result {
                        Ok(()) => self.finish(id, &unit, artifact, cx)?,
                        Err(e) => {
                            let msg = "The following warnings were emitted during compilation:";
                            self.emit_warnings(Some(msg), &unit, cx)?;
                            if !self.active.is_empty() {
                                // Report the concrete error now; return a
                                // generic one after the stragglers finish.
                                error = Some(failure::format_err!("build failed"));
                                handle_error(&e, &mut *cx.bcx.config.shell());
                                cx.bcx.config.shell().warn(
                                    "build failed, waiting for other \
                                     jobs to finish...",
                                )?;
                            } else {
                                error = Some(e);
                            }
                        }
                    }
                }
                Message::Token(acquired_token) => {
                    tokens.push(
                        acquired_token.chain_err(|| "failed to acquire jobserver token")?,
                    );
                }
            }
        }
    }
    self.progress.clear();

    // Build the "Finished dev [unoptimized + debuginfo] target(s) in ..."
    // style summary from the base profile of this build.
    let build_type = self.profile_kind.name();
    let profile = cx.bcx.profiles.base_profile(&self.profile_kind)?;
    let mut opt_type = String::from(if profile.opt_level.as_str() == "0" {
        "unoptimized"
    } else {
        "optimized"
    });
    if profile.debuginfo.unwrap_or(0) != 0 {
        opt_type += " + debuginfo";
    }

    let time_elapsed = util::elapsed(cx.bcx.config.creation_time().elapsed());
    if let Some(e) = error {
        Err(e)
    } else if self.queue.is_empty() && queue.is_empty() {
        let message = format!(
            "{} [{}] target(s) in {}",
            build_type, opt_type, time_elapsed
        );
        if !cx.bcx.build_config.build_plan {
            cx.bcx.config.shell().status("Finished", message)?;
        }
        self.timings.finished(cx.bcx)?;
        Ok(())
    } else {
        // No error, yet work remains: internal bookkeeping bug.
        debug!("queue: {:#?}", self.queue);
        Err(internal("finished with jobs still left in the queue"))
    }
}
/// Refresh the progress bar with `count`/`total` plus a suffix listing the
/// units currently being built. Tick errors are ignored.
fn show_progress(&mut self, count: usize, total: usize) {
    let mut names = Vec::with_capacity(self.active.len());
    for unit in self.active.values() {
        names.push(self.name_for_progress(unit));
    }
    let suffix = format!(": {}", names.join(", "));
    let _ = self.progress.tick_now(count, total, &suffix);
}
/// Short human-readable label for `unit` as shown in the progress bar,
/// e.g. `serde`, `serde(doc)`, `foo(build.rs)`, or `mybin(bin)`.
fn name_for_progress(&self, unit: &Unit<'_>) -> String {
    let pkg_name = unit.pkg.name();
    // Doc generation and build-script execution get mode-based labels.
    if let CompileMode::Doc { .. } = unit.mode {
        return format!("{}(doc)", pkg_name);
    }
    if let CompileMode::RunCustomBuild = unit.mode {
        return format!("{}(build)", pkg_name);
    }
    // Otherwise label by target kind; libraries use the bare package name.
    let annotation = match unit.target.kind() {
        TargetKind::Lib(_) => return pkg_name.to_string(),
        TargetKind::CustomBuild => return format!("{}(build.rs)", pkg_name),
        TargetKind::Bin => "bin",
        TargetKind::Test => "test",
        TargetKind::Bench => "bench",
        TargetKind::ExampleBin | TargetKind::ExampleLib(_) => "example",
    };
    format!("{}({})", unit.target.name(), annotation)
}
/// Start executing `job` for `unit`: fresh jobs run inline on this thread,
/// dirty jobs on a scoped worker thread. Completion is always reported
/// back over the channel via a `Message::Finish(.., Artifact::All, ..)`.
fn run(
    &mut self,
    unit: &Unit<'a>,
    job: Job,
    cx: &Context<'a, '_>,
    scope: &Scope<'a>,
) -> CargoResult<()> {
    let id = self.next_id;
    // Overflowing the u32 id space would be a bug; fail loudly.
    self.next_id = id.checked_add(1).unwrap();

    info!("start {}: {:?}", id, unit);

    assert!(self.active.insert(id, *unit).is_none());
    *self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;

    let my_tx = self.tx.clone();
    let fresh = job.freshness();
    let rmeta_required = cx.rmeta_required(unit);
    let doit = move || {
        let state = JobState {
            id,
            tx: my_tx.clone(),
            rmeta_required: Cell::new(rmeta_required),
            _marker: marker::PhantomData,
        };

        // Guard whose Drop always sends the final Finish(All) message —
        // including during a panic unwind, in which case the pre-seeded
        // "worker panicked" error is what gets reported.
        let mut sender = FinishOnDrop {
            tx: &my_tx,
            id,
            result: Err(format_err!("worker panicked")),
        };
        sender.result = job.run(&state);

        // If the job succeeded without explicitly reporting its rmeta
        // (via `rmeta_produced`), report the metadata artifact now so
        // dependants waiting only on rmeta are unblocked.
        if state.rmeta_required.get() && sender.result.is_ok() {
            my_tx
                .send(Message::Finish(id, Artifact::Metadata, Ok(())))
                .unwrap();
        }

        struct FinishOnDrop<'a> {
            tx: &'a Sender<Message>,
            id: u32,
            result: CargoResult<()>,
        }

        impl Drop for FinishOnDrop<'_> {
            fn drop(&mut self) {
                // Take the stored result (leaving Ok behind) and send it
                // as the job's final Finish message; ignore send errors.
                let msg = mem::replace(&mut self.result, Ok(()));
                drop(self.tx.send(Message::Finish(self.id, Artifact::All, msg)));
            }
        }
    };

    if !cx.bcx.build_config.build_plan {
        // Print the per-package status line before the work starts.
        self.note_working_on(cx.bcx.config, unit, fresh)?;
    }

    match fresh {
        // Fresh jobs are cheap; run them inline on the coordinator thread.
        Freshness::Fresh => {
            self.timings.add_fresh();
            doit()
        }
        // Dirty jobs do real compilation; run them on a scoped thread.
        Freshness::Dirty => {
            self.timings.add_dirty();
            scope.spawn(move |_| doit());
        }
    }

    Ok(())
}
/// Print any warnings a build script recorded for `unit`. When `msg` is
/// given it is printed before the warnings, with a blank line after them.
fn emit_warnings(
    &mut self,
    msg: Option<&str>,
    unit: &Unit<'a>,
    cx: &mut Context<'_, '_>,
) -> CargoResult<()> {
    let outputs = cx.build_script_outputs.lock().unwrap();
    let bcx = &mut cx.bcx;
    let output = match outputs.get(&(unit.pkg.package_id(), unit.kind)) {
        Some(output) => output,
        // No recorded output for this unit: nothing to print.
        None => return Ok(()),
    };
    if output.warnings.is_empty() {
        return Ok(());
    }

    if let Some(msg) = msg {
        writeln!(bcx.config.shell().err(), "{}\n", msg)?;
    }
    for warning in output.warnings.iter() {
        bcx.config.shell().warn(warning)?;
    }
    if msg.is_some() {
        // Trailing blank line to set the warning block apart.
        writeln!(bcx.config.shell().err())?;
    }
    Ok(())
}
/// Record that `unit` finished producing `artifact`: unlock dependants in
/// the queue, update timings, emit build-script warnings, and register
/// sysroot artifacts for non-host standard-library units.
fn finish(
    &mut self,
    id: u32,
    unit: &Unit<'a>,
    artifact: Artifact,
    cx: &mut Context<'a, '_>,
) -> CargoResult<()> {
    if unit.mode.is_run_custom_build() && cx.bcx.show_warnings(unit.pkg.package_id()) {
        self.emit_warnings(None, unit, cx)?;
    }

    let unlocked = self.queue.finish(unit, &artifact);
    if let Artifact::All = artifact {
        self.timings.unit_finished(id, unlocked);
    } else {
        self.timings.unit_rmeta_finished(id, unlocked);
    }

    if unit.is_std && !unit.kind.is_host() && !cx.bcx.build_config.build_plan {
        standard_lib::add_sysroot_artifact(cx, unit, artifact == Artifact::Metadata)?;
    }
    Ok(())
}
/// Print the per-package status line ("Compiling", "Checking",
/// "Documenting", or verbose "Fresh"), announcing each package at most
/// once per kind of work.
fn note_working_on(
    &mut self,
    config: &Config,
    unit: &Unit<'a>,
    fresh: Freshness,
) -> CargoResult<()> {
    let pkg_id = unit.pkg.package_id();

    // Already announced this package for this kind of work? Stay quiet.
    let already_noted = if unit.mode.is_doc() {
        self.documented.contains(&pkg_id)
    } else {
        self.compiled.contains(&pkg_id)
    };
    if already_noted {
        return Ok(());
    }

    match fresh {
        Dirty => {
            if unit.mode.is_doc() {
                self.documented.insert(pkg_id);
                config.shell().status("Documenting", unit.pkg)?;
            } else if unit.mode.is_doc_test() {
                // Doc tests get no status line of their own.
            } else {
                self.compiled.insert(pkg_id);
                let verb = if unit.mode.is_check() {
                    "Checking"
                } else {
                    "Compiling"
                };
                config.shell().status(verb, unit.pkg)?;
            }
        }
        Fresh => {
            // Only report "Fresh" once the per-package job count reaches
            // zero, and never for doc tests of already-compiled packages.
            if self.counts[&pkg_id] == 0
                && !(unit.mode.is_doc_test() && self.compiled.contains(&pkg_id))
            {
                self.compiled.insert(pkg_id);
                config.shell().verbose(|c| c.status("Fresh", unit.pkg))?;
            }
        }
    }
    Ok(())
}
}