use std::cell::{Cell, Ref, RefCell, RefMut};
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::hash;
use std::mem;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use bytesize::ByteSize;
use curl::easy::{Easy, HttpVersion};
use curl::multi::{EasyHandle, Multi};
use failure::ResultExt;
use lazycell::LazyCell;
use log::{debug, warn};
use semver::Version;
use serde::ser;
use serde::Serialize;
use crate::core::interning::InternedString;
use crate::core::source::MaybePackage;
use crate::core::{Dependency, Manifest, PackageId, SourceId, Target};
use crate::core::{FeatureMap, SourceMap, Summary};
use crate::ops;
use crate::util::config::PackageCacheLock;
use crate::util::errors::{CargoResult, CargoResultExt, HttpNot200};
use crate::util::network::Retry;
use crate::util::{self, internal, Config, Progress, ProgressStyle};
/// A package: a parsed manifest together with the path of the manifest file
/// it was created with.
///
/// Ordering, equality and hashing of `Package` are all defined purely in
/// terms of its package ID (see the trait impls in this file).
#[derive(Clone)]
pub struct Package {
/// The manifest describing this package.
manifest: Manifest,
/// Path of the manifest file; `root()` returns its parent directory.
manifest_path: PathBuf,
}
/// Packages are totally ordered by their package ID; nothing else in the
/// manifest takes part in the comparison.
impl Ord for Package {
    fn cmp(&self, other: &Package) -> Ordering {
        let (lhs, rhs) = (self.package_id(), other.package_id());
        lhs.cmp(&rhs)
    }
}
/// Always yields `Some(..)`: the partial order simply defers to the total
/// order defined by `Ord`.
impl PartialOrd for Package {
    fn partial_cmp(&self, other: &Package) -> Option<Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}
/// Serialization-only view of a `Package`.
///
/// It is built by the `Serialize` impl for `Package`; every field is either
/// a borrow or a by-value copy (`id`, `source`) of data taken from the
/// package's manifest, its summary, or the package itself.
#[derive(Serialize)]
struct SerializedPackage<'a> {
name: &'a str,
version: &'a Version,
id: PackageId,
license: Option<&'a str>,
license_file: Option<&'a str>,
description: Option<&'a str>,
source: SourceId,
dependencies: &'a [Dependency],
// Only targets whose source path `is_path()` are included by the serializer.
targets: Vec<&'a Target>,
features: &'a FeatureMap,
manifest_path: &'a Path,
metadata: Option<&'a toml::Value>,
publish: Option<&'a Vec<String>>,
authors: &'a [String],
categories: &'a [String],
keywords: &'a [String],
readme: Option<&'a str>,
repository: Option<&'a str>,
edition: &'a str,
links: Option<&'a str>,
// Left out of the serialized output entirely when it is `None`.
#[serde(skip_serializing_if = "Option::is_none")]
metabuild: Option<&'a Vec<String>>,
}
/// Serializes a package by projecting it into a `SerializedPackage` view and
/// serializing that view.
impl ser::Serialize for Package {
    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
    where
        S: ser::Serializer,
    {
        let manifest = &self.manifest;
        let summary = manifest.summary();
        let package_id = summary.package_id();
        let meta = manifest.metadata();

        // Owned intermediates that the borrowed view below points into.
        let name = package_id.name();
        let edition = manifest.edition().to_string();

        // Optional string metadata, exposed as `Option<&str>`.
        let license = meta.license.as_ref().map(String::as_str);
        let license_file = meta.license_file.as_ref().map(String::as_str);
        let description = meta.description.as_ref().map(String::as_str);
        let readme = meta.readme.as_ref().map(String::as_str);
        let repository = meta.repository.as_ref().map(String::as_str);

        // List metadata, exposed as slices.
        let authors: &[String] = meta.authors.as_ref();
        let categories: &[String] = meta.categories.as_ref();
        let keywords: &[String] = meta.keywords.as_ref();

        // Only targets whose source path is an actual path are reported.
        let mut targets: Vec<&Target> = Vec::new();
        for target in manifest.targets() {
            if target.src_path().is_path() {
                targets.push(target);
            }
        }

        let view = SerializedPackage {
            name: &*name,
            version: package_id.version(),
            id: package_id,
            license,
            license_file,
            description,
            source: summary.source_id(),
            dependencies: summary.dependencies(),
            targets,
            features: summary.features(),
            manifest_path: &self.manifest_path,
            metadata: manifest.custom_metadata(),
            authors,
            categories,
            keywords,
            readme,
            repository,
            edition: &edition,
            links: manifest.links(),
            metabuild: manifest.metabuild(),
            publish: self.publish().as_ref(),
        };
        view.serialize(s)
    }
}
impl Package {
pub fn new(manifest: Manifest, manifest_path: &Path) -> Package {
Package {
manifest,
manifest_path: manifest_path.to_path_buf(),
}
}
pub fn dependencies(&self) -> &[Dependency] {
self.manifest.dependencies()
}
pub fn manifest(&self) -> &Manifest {
&self.manifest
}
pub fn manifest_mut(&mut self) -> &mut Manifest {
&mut self.manifest
}
pub fn manifest_path(&self) -> &Path {
&self.manifest_path
}
pub fn name(&self) -> InternedString {
self.package_id().name()
}
pub fn package_id(&self) -> PackageId {
self.manifest.package_id()
}
pub fn root(&self) -> &Path {
self.manifest_path.parent().unwrap()
}
pub fn summary(&self) -> &Summary {
self.manifest.summary()
}
pub fn targets(&self) -> &[Target] {
self.manifest.targets()
}
pub fn version(&self) -> &Version {
self.package_id().version()
}
pub fn authors(&self) -> &Vec<String> {
&self.manifest.metadata().authors
}
pub fn publish(&self) -> &Option<Vec<String>> {
self.manifest.publish()
}
pub fn has_custom_build(&self) -> bool {
self.targets().iter().any(|t| t.is_custom_build())
}
pub fn map_source(self, to_replace: SourceId, replace_with: SourceId) -> Package {
Package {
manifest: self.manifest.map_source(to_replace, replace_with),
manifest_path: self.manifest_path,
}
}
pub fn to_registry_toml(&self, config: &Config) -> CargoResult<String> {
let manifest = self.manifest().original().prepare_for_publish(config)?;
let toml = toml::to_string(&manifest)?;
Ok(format!(
"# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO\n\
#\n\
# When uploading crates to the registry Cargo will automatically\n\
# \"normalize\" Cargo.toml files for maximal compatibility\n\
# with all versions of Cargo and also rewrite `path` dependencies\n\
# to registry (e.g., crates.io) dependencies\n\
#\n\
# If you believe there's an error in this file please file an\n\
# issue against the rust-lang/cargo repository. If you're\n\
# editing this file be aware that the upstream Cargo.toml\n\
# will likely look very different (and much more reasonable)\n\
\n\
{}\
",
toml
))
}
pub fn include_lockfile(&self) -> bool {
self.targets().iter().any(|t| t.is_example() || t.is_bin())
}
}
/// A package displays as its package ID.
impl fmt::Display for Package {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let id = self.summary().package_id();
        write!(f, "{}", id)
    }
}
/// Abbreviated debug output: only the package ID is shown, followed by a
/// `..` placeholder field standing in for everything else.
impl fmt::Debug for Package {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let id = self.summary().package_id();
        let mut out = f.debug_struct("Package");
        out.field("id", &id);
        out.field("..", &"..");
        out.finish()
    }
}
/// Two packages are equal exactly when their package IDs are equal.
impl PartialEq for Package {
    fn eq(&self, other: &Package) -> bool {
        let (lhs, rhs) = (self.package_id(), other.package_id());
        lhs == rhs
    }
}
// Marker impl: `PartialEq` for `Package` compares package IDs only.
impl Eq for Package {}
/// Hashes only the package ID, matching the `PartialEq` implementation.
impl hash::Hash for Package {
    fn hash<H: hash::Hasher>(&self, into: &mut H) {
        let id = self.package_id();
        hash::Hash::hash(&id, into)
    }
}
/// A set of packages, keyed by package ID, whose contents are fetched from
/// their sources on demand.
pub struct PackageSet<'cfg> {
/// One lazily-filled slot per known package ID; a slot is filled once the
/// package is available (see `Downloads::start_inner` / `Downloads::wait`).
packages: HashMap<PackageId, LazyCell<Package>>,
/// The sources packages are obtained from, behind a `RefCell` so they can
/// be borrowed mutably through `&self`.
sources: RefCell<SourceMap<'cfg>>,
config: &'cfg Config,
/// curl multi handle that drives all transfers; configured in `new`.
multi: Multi,
/// `true` while a `Downloads` session created by `enable_download` is
/// alive; reset to `false` when that session is dropped.
downloading: Cell<bool>,
/// Multiplexing setting read from the HTTP configuration (defaults to
/// `true`); selects between requesting HTTP/2 and HTTP/1.1 per transfer.
multiplexing: bool,
}
/// State of an in-progress download session over a `PackageSet`, created by
/// `PackageSet::enable_download`.
pub struct Downloads<'a, 'cfg> {
set: &'a PackageSet<'cfg>,
/// Transfers currently registered with curl, keyed by their token.
pending: HashMap<usize, (Download<'cfg>, EasyHandle)>,
/// Package IDs of the entries in `pending`; `wait` asserts both
/// collections have the same length.
pending_ids: HashSet<PackageId>,
/// Completed `(token, result)` pairs reported by curl that have not been
/// handed out by `wait_for_curl` yet.
results: Vec<(usize, Result<(), curl::Error>)>,
/// The token that will be assigned to the next download.
next: usize,
/// Progress bar; wrapped in `Option` so that `Drop` can take it out.
progress: RefCell<Option<Progress<'cfg>>>,
/// Number of downloads completed by `wait`.
downloads_finished: usize,
/// Sum of the reported total sizes of all completed downloads.
downloaded_bytes: u64,
/// Size and crate name of the largest completed download so far.
largest: (u64, String),
/// When this session was created.
start: Instant,
/// Set by the owner once everything succeeded; the summary line in `Drop`
/// is only printed when this is `true`.
success: bool,
timeout: ops::HttpTimeout,
/// Last time any download made progress (also reset on enqueue).
updated_at: Cell<Instant>,
/// Deadline by which `next_speed_check_bytes_threshold` bytes must have
/// been transferred, otherwise the transfer is aborted as too slow.
next_speed_check: Cell<Instant>,
/// Bytes still to be received before `next_speed_check` is pushed back.
next_speed_check_bytes_threshold: Cell<u64>,
/// Package cache lock, held for as long as this session exists.
_lock: PackageCacheLock<'cfg>,
}
/// Bookkeeping for a single package download.
struct Download<'cfg> {
/// Token identifying this transfer to curl and in `Downloads::pending`.
token: usize,
/// The package being downloaded.
id: PackageId,
/// Bytes received so far, appended to by the curl write callback.
data: RefCell<Vec<u8>>,
/// URL the package is fetched from.
url: String,
/// Text shown in the "Downloaded" status line once the transfer finishes.
descriptor: String,
/// Total size as last reported by the curl progress callback.
total: Cell<u64>,
/// Bytes downloaded so far as last recorded by the progress callback.
current: Cell<u64>,
/// When this download was created.
start: Instant,
/// Timeout message recorded by the progress callback right before it
/// aborts the transfer; consumed in `Downloads::wait`.
timed_out: Cell<Option<String>>,
/// Retry state used to decide whether a failed transfer is re-enqueued.
retry: Retry<'cfg>,
}
impl<'cfg> PackageSet<'cfg> {
pub fn new(
package_ids: &[PackageId],
sources: SourceMap<'cfg>,
config: &'cfg Config,
) -> CargoResult<PackageSet<'cfg>> {
let mut multi = Multi::new();
let multiplexing = config.http_config()?.multiplexing.unwrap_or(true);
multi
.pipelining(false, multiplexing)
.chain_err(|| "failed to enable multiplexing/pipelining in curl")?;
multi.set_max_host_connections(2)?;
Ok(PackageSet {
packages: package_ids
.iter()
.map(|&id| (id, LazyCell::new()))
.collect(),
sources: RefCell::new(sources),
config,
multi,
downloading: Cell::new(false),
multiplexing,
})
}
/// Iterates over the IDs of all packages in this set, in the (unspecified)
/// iteration order of the underlying `HashMap`.
pub fn package_ids<'a>(&'a self) -> impl Iterator<Item = PackageId> + 'a {
self.packages.keys().cloned()
}
/// Starts a download session for this set.
///
/// Only one session may exist at a time: the `downloading` flag is raised
/// here and lowered again when the returned `Downloads` is dropped.
///
/// # Errors
///
/// Fails if the HTTP timeout configuration cannot be read or the package
/// cache lock cannot be acquired. In that case the set is left untouched and
/// a later call may try again.
///
/// # Panics
///
/// Panics if a download session is already active.
pub fn enable_download<'a>(&'a self) -> CargoResult<Downloads<'a, 'cfg>> {
    assert!(!self.downloading.get());
    let timeout = ops::HttpTimeout::new(self.config)?;
    let downloads = Downloads {
        start: Instant::now(),
        set: self,
        next: 0,
        pending: HashMap::new(),
        pending_ids: HashSet::new(),
        results: Vec::new(),
        progress: RefCell::new(Some(Progress::with_style(
            "Downloading",
            ProgressStyle::Ratio,
            self.config,
        ))),
        downloads_finished: 0,
        downloaded_bytes: 0,
        largest: (0, String::new()),
        success: false,
        updated_at: Cell::new(Instant::now()),
        timeout,
        next_speed_check: Cell::new(Instant::now()),
        next_speed_check_bytes_threshold: Cell::new(0),
        _lock: self.config.acquire_package_cache_lock()?,
    };
    // Raise the flag only once nothing can fail any more. If it were set
    // before the fallible steps above, an early `?` return would leave it
    // stuck at `true` (no `Downloads` exists whose `Drop` would reset it) and
    // every later call would hit the assertion.
    self.downloading.set(true);
    Ok(downloads)
}
/// Fetches a single package, downloading it if necessary.
///
/// This is `get_many` with a one-element input; the first element of its
/// result is returned.
pub fn get_one(&self, id: PackageId) -> CargoResult<&Package> {
    let mut fetched = self.get_many(Some(id))?;
    Ok(fetched.remove(0))
}
/// Fetches all packages in `ids`, downloading whatever is not yet available.
///
/// Packages that are immediately available come first in the result, in
/// input order; downloaded packages follow in the order their transfers
/// finish.
///
/// # Errors
///
/// Fails if a download session cannot be started or any download fails.
pub fn get_many(&self, ids: impl IntoIterator<Item = PackageId>) -> CargoResult<Vec<&Package>> {
    let mut downloads = self.enable_download()?;
    let mut fetched = Vec::new();
    // Kick off every download; anything already available is collected now.
    for id in ids {
        if let Some(pkg) = downloads.start(id)? {
            fetched.push(pkg);
        }
    }
    // Drain the transfers that are still in flight.
    while downloads.remaining() > 0 {
        let pkg = downloads.wait()?;
        fetched.push(pkg);
    }
    // Lets the session's `Drop` print its summary line.
    downloads.success = true;
    Ok(fetched)
}
/// Immutably borrows the source map.
///
/// Goes through `RefCell::borrow`, which panics if the map is currently
/// mutably borrowed.
pub fn sources(&self) -> Ref<'_, SourceMap<'cfg>> {
self.sources.borrow()
}
/// Mutably borrows the source map.
///
/// Goes through `RefCell::borrow_mut`, which panics if the map is currently
/// borrowed.
pub fn sources_mut(&self) -> RefMut<'_, SourceMap<'cfg>> {
self.sources.borrow_mut()
}
pub fn add_set(&mut self, set: PackageSet<'cfg>) {
assert!(!self.downloading.get());
assert!(!set.downloading.get());
for (pkg_id, p_cell) in set.packages {
self.packages.entry(pkg_id).or_insert(p_cell);
}
let mut sources = self.sources.borrow_mut();
let other_sources = set.sources.into_inner();
sources.add_source_map(other_sources);
}
/// Returns a mutable reference to the package with the given ID.
///
/// Yields `None` when the ID is not part of this set or its slot has not
/// been filled yet.
pub fn lookup_mut(&mut self, id: PackageId) -> Option<&mut Package> {
    match self.packages.get_mut(&id) {
        Some(cell) => cell.borrow_mut(),
        None => None,
    }
}
}
// Evaluates a curl configuration call `$e` (a `Result`) and handles failure
// depending on the target OS:
//
// * on macOS the error is only logged via `warn!` and otherwise ignored
//   (presumably because the system libcurl there may lack the feature --
//   TODO confirm);
// * everywhere else the error is wrapped with a "failed to enable ..." context
//   and propagated with `?`.
//
// Because of the embedded `?`, this macro can only be used inside a function
// returning a compatible `Result`. `$msg` names the feature in both messages.
macro_rules! try_old_curl {
($e:expr, $msg:expr) => {
let result = $e;
// `cfg!` expands to a boolean constant, so both branches are compiled on
// every platform.
if cfg!(target_os = "macos") {
if let Err(e) = result {
warn!("ignoring libcurl {} error: {}", $msg, e);
}
} else {
result.with_context(|_| {
failure::format_err!("failed to enable {}, is curl not built right?", $msg)
})?;
}
};
}
impl<'a, 'cfg> Downloads<'a, 'cfg> {
/// Starts fetching the package `id`.
///
/// Returns `Ok(Some(pkg))` when the package is available right away and
/// `Ok(None)` when a download was queued; in the latter case the package is
/// handed out by a later call to `wait`.
///
/// # Errors
///
/// Any failure is wrapped with a "failed to download" context naming `id`.
pub fn start(&mut self, id: PackageId) -> CargoResult<Option<&'a Package>> {
    let outcome = self.start_inner(id);
    let pkg = outcome.chain_err(|| format!("failed to download `{}`", id))?;
    Ok(pkg)
}
// Implementation of `start` without the outer error context.
//
// Returns `Ok(Some(pkg))` if the slot for `id` is already filled or the source
// can provide the package without a download. Otherwise a curl handle is
// configured for the download URL, registered via `enqueue`, and `Ok(None)` is
// returned.
fn start_inner(&mut self, id: PackageId) -> CargoResult<Option<&'a Package>> {
// The package must be one this set was created with.
let slot = self
.set
.packages
.get(&id)
.ok_or_else(|| internal(format!("couldn't find `{}` in package set", id)))?;
// Fast path: already fetched earlier.
if let Some(pkg) = slot.borrow() {
return Ok(Some(pkg));
}
// Ask the package's source whether a download is needed at all.
let mut sources = self.set.sources.borrow_mut();
let source = sources
.get_mut(id.source_id())
.ok_or_else(|| internal(format!("couldn't find source for `{}`", id)))?;
let pkg = source
.download(id)
.chain_err(|| failure::format_err!("unable to get packages from source"))?;
let (url, descriptor) = match pkg {
MaybePackage::Ready(pkg) => {
debug!("{} doesn't need a download", id);
assert!(slot.fill(pkg).is_ok());
return Ok(Some(slot.borrow().unwrap()));
}
MaybePackage::Download { url, descriptor } => (url, descriptor),
};
// Allocate the token that ties the curl transfer to its `Download` entry.
let token = self.next;
self.next += 1;
debug!("downloading {} as {}", id, token);
// A package must not be downloaded twice at the same time.
assert!(self.pending_ids.insert(id));
// The timeout value returned alongside the handle is not used here; this
// type tracks timeouts itself in `progress`.
let (mut handle, _timeout) = ops::http_handle_and_timeout(self.set.config)?;
handle.get(true)?;
handle.url(&url)?;
handle.follow_location(true)?;
// With multiplexing, HTTP/2 is requested on a best-effort basis (see
// `try_old_curl!`); without it HTTP/1.1 is required and failure is fatal.
if self.set.multiplexing {
try_old_curl!(handle.http_version(HttpVersion::V2), "HTTP2");
} else {
handle.http_version(HttpVersion::V11)?;
}
try_old_curl!(handle.pipewait(true), "pipewait");
// Write callback: append received bytes to this download's buffer, located
// through the thread-local `Downloads` pointer. If no `Downloads` is
// registered the bytes are dropped, but still reported as consumed.
handle.write_function(move |buf| {
debug!("{} - {} bytes of data", token, buf.len());
tls::with(|downloads| {
if let Some(downloads) = downloads {
downloads.pending[&token]
.0
.data
.borrow_mut()
.extend_from_slice(buf);
}
});
Ok(buf.len())
})?;
// Progress callback: forwards to `Downloads::progress`; evaluates to `false`
// when no `Downloads` is registered in the thread-local.
handle.progress(true)?;
handle.progress_function(move |dl_total, dl_cur, _, _| {
tls::with(|downloads| match downloads {
Some(d) => d.progress(token, dl_total as u64, dl_cur as u64),
None => false,
})
})?;
// When the progress bar is disabled, print a one-off status line before the
// very first download (nothing finished yet and nothing pending).
if self.downloads_finished == 0
&& self.pending.is_empty()
&& !self.progress.borrow().as_ref().unwrap().is_enabled()
{
self.set
.config
.shell()
.status("Downloading", "crates ...")?;
}
let dl = Download {
token,
data: RefCell::new(Vec::new()),
id,
url,
descriptor,
total: Cell::new(0),
current: Cell::new(0),
start: Instant::now(),
timed_out: Cell::new(None),
retry: Retry::new(self.set.config)?,
};
self.enqueue(dl, handle)?;
self.tick(WhyTick::DownloadStarted)?;
Ok(None)
}
/// Returns the number of downloads that are currently pending, i.e. queued
/// by `start` but not yet returned from `wait`.
pub fn remaining(&self) -> usize {
self.pending.len()
}
/// Blocks until one pending download has completed successfully and returns
/// the resulting package.
///
/// Transfers that fail are passed through the download's `Retry` state; when
/// it asks for a retry the transfer is enqueued again and waiting continues.
/// After a successful transfer the data is handed to the package's source via
/// `finish_download` and the package's slot is filled.
///
/// # Errors
///
/// Fails if curl reports an error that is not retried, if the response code
/// is neither 200 nor 0, or if finishing the download in the source fails.
///
/// # Panics
///
/// Panics if `pending` and `pending_ids` are out of sync, and (via
/// `wait_for_curl`) if no download is pending.
pub fn wait(&mut self) -> CargoResult<&'a Package> {
let (dl, data) = loop {
assert_eq!(self.pending.len(), self.pending_ids.len());
let (token, result) = self.wait_for_curl()?;
debug!("{} finished with {:?}", token, result);
let (mut dl, handle) = self
.pending
.remove(&token)
.expect("got a token for a non-in-progress transfer");
// Take the received bytes out, leaving an empty buffer behind.
let data = mem::replace(&mut *dl.data.borrow_mut(), Vec::new());
// Detach the transfer from the multi handle to get the `Easy` back.
let mut handle = self.set.multi.remove(handle)?;
self.pending_ids.remove(&dl.id);
let ret = {
let timed_out = &dl.timed_out;
let url = &dl.url;
dl.retry
.r#try(|| {
if let Err(e) = result {
// Errors not caused by our own callbacks are reported as-is.
if !e.is_aborted_by_callback() {
return Err(e.into());
}
// The transfer was aborted by a callback. If `progress`
// recorded a timeout message, report the failure as a curl
// timeout error carrying that message instead.
return Err(match timed_out.replace(None) {
Some(msg) => {
let code = curl_sys::CURLE_OPERATION_TIMEDOUT;
let mut err = curl::Error::new(code);
err.set_extra(msg);
err
}
None => e,
}
.into());
}
// NOTE(review): a response code of 0 is accepted as well,
// presumably for non-HTTP transfers -- confirm.
let code = handle.response_code()?;
if code != 200 && code != 0 {
let url = handle.effective_url()?.unwrap_or(url);
return Err(HttpNot200 {
code,
url: url.to_string(),
}
.into());
}
Ok(())
})
.chain_err(|| format!("failed to download from `{}`", dl.url))?
};
match ret {
// The transfer succeeded.
Some(()) => break (dl, data),
// The retry logic asked for another attempt: re-register the
// download and keep waiting.
None => {
self.pending_ids.insert(dl.id);
self.enqueue(dl, handle)?
}
}
};
// Clear the progress bar before printing the status line.
self.progress.borrow_mut().as_mut().unwrap().clear();
self.set
.config
.shell()
.status("Downloaded", &dl.descriptor)?;
self.downloads_finished += 1;
self.downloaded_bytes += dl.total.get();
if dl.total.get() > self.largest.0 {
self.largest = (dl.total.get(), dl.id.name().to_string());
}
// Only downloads of at least 400 kB get an "extracting ..." progress
// message; smaller ones just refresh the progress bar.
if dl.total.get() < ByteSize::kb(400).0 {
self.tick(WhyTick::DownloadFinished)?;
} else {
self.tick(WhyTick::Extracting(&dl.id.name()))?;
}
let mut sources = self.set.sources.borrow_mut();
let source = sources
.get_mut(dl.id.source_id())
.ok_or_else(|| internal(format!("couldn't find source for `{}`", dl.id)))?;
// The time spent inside `finish_download` is measured and added to both
// timeout reference points, so that it is not counted as time without
// progress for the downloads that are still pending.
let start = Instant::now();
let pkg = source.finish_download(dl.id, data)?;
let finish_dur = start.elapsed();
self.updated_at.set(self.updated_at.get() + finish_dur);
self.next_speed_check
.set(self.next_speed_check.get() + finish_dur);
let slot = &self.set.packages[&dl.id];
assert!(slot.fill(pkg).is_ok());
Ok(slot.borrow().unwrap())
}
// Registers `handle` with the curl multi handle, tags it with the download's
// token and records the pair in `pending`.
//
// The timeout bookkeeping of the session and the per-download progress
// counters are reset, so a re-enqueued (retried) download starts from a
// clean state.
fn enqueue(&mut self, dl: Download<'cfg>, handle: Easy) -> CargoResult<()> {
    let mut attached = self.set.multi.add(handle)?;
    let now = Instant::now();
    attached.set_token(dl.token)?;

    // Session-wide timeout state.
    let speed_limit = u64::from(self.timeout.low_speed_limit);
    self.updated_at.set(now);
    self.next_speed_check.set(now + self.timeout.dur);
    self.next_speed_check_bytes_threshold.set(speed_limit);

    // Per-download state.
    dl.timed_out.set(None);
    dl.current.set(0);
    dl.total.set(0);

    let token = dl.token;
    self.pending.insert(token, (dl, attached));
    Ok(())
}
// Drives curl until at least one transfer has completed and returns its token
// together with the transfer result.
//
// Completed transfers that are not returned right away stay buffered in
// `self.results` and are handed out by later calls.
//
// Panics if no transfer has completed and nothing is pending.
fn wait_for_curl(&mut self) -> CargoResult<(usize, Result<(), curl::Error>)> {
loop {
// Register `self` in the thread-local for the duration of `perform()` so
// that the write/progress callbacks curl invokes can reach this object.
let n = tls::set(self, || {
self.set
.multi
.perform()
.chain_err(|| "failed to perform http requests")
})?;
debug!("handles remaining: {}", n);
// Borrow the two fields separately so the closure below can push to
// `results` while reading `pending`.
let results = &mut self.results;
let pending = &self.pending;
self.set.multi.messages(|msg| {
let token = msg.token().expect("failed to read token");
let handle = &pending[&token].1;
if let Some(result) = msg.result_for(handle) {
results.push((token, result));
} else {
debug!("message without a result (?)");
}
});
// Hand out one finished transfer (the most recently pushed one).
if let Some(pair) = results.pop() {
break Ok(pair);
}
assert!(!self.pending.is_empty());
// Nothing finished yet: block until curl has activity or its timeout
// expires; fall back to 5 seconds when curl reports no timeout.
let timeout = self
.set
.multi
.get_timeout()?
.unwrap_or_else(|| Duration::new(5, 0));
self.set
.multi
.wait(&mut [], timeout)
.chain_err(|| "failed to wait on curl `Multi`")?;
}
}
// Progress callback body for the transfer identified by `token`.
//
// `total` and `cur` are the total and already-downloaded byte counts reported
// by curl. Besides refreshing the progress bar, this enforces two timeouts:
//
// * no data at all for longer than `timeout.dur`, and
// * fewer than `timeout.low_speed_limit` bytes within `timeout.dur`.
//
// Returns `true` to let the transfer continue and `false` to abort it. When
// aborting because of a timeout, the reason is stored in the download's
// `timed_out` cell so that `wait` can report it.
fn progress(&self, token: usize, total: u64, cur: u64) -> bool {
    let dl = &self.pending[&token].0;
    dl.total.set(total);
    let now = Instant::now();
    // Only forward progress counts. Comparing with `>` rather than `!=`
    // guarantees the subtraction below cannot underflow should curl ever
    // report a smaller byte count than the one recorded previously.
    if cur > dl.current.get() {
        let delta = cur - dl.current.get();
        let threshold = self.next_speed_check_bytes_threshold.get();
        dl.current.set(cur);
        self.updated_at.set(now);
        if delta >= threshold {
            // Enough data arrived: start a fresh low-speed window.
            self.next_speed_check.set(now + self.timeout.dur);
            self.next_speed_check_bytes_threshold
                .set(u64::from(self.timeout.low_speed_limit));
        } else {
            // Still below the limit: remember how much is left to receive
            // within the current window.
            self.next_speed_check_bytes_threshold.set(threshold - delta);
        }
    }
    // Failing to draw the progress bar aborts the transfer.
    if self.tick(WhyTick::DownloadUpdate).is_err() {
        return false;
    }
    // Timeout 1: nothing was received at all for too long.
    if now - self.updated_at.get() > self.timeout.dur {
        self.updated_at.set(now);
        let msg = format!(
            "failed to download any data for `{}` within {}s",
            dl.id,
            self.timeout.dur.as_secs()
        );
        dl.timed_out.set(Some(msg));
        return false;
    }
    // Timeout 2: the low-speed window elapsed without enough data.
    if now >= self.next_speed_check.get() {
        self.next_speed_check.set(now + self.timeout.dur);
        assert!(self.next_speed_check_bytes_threshold.get() > 0);
        let msg = format!(
            "download of `{}` failed to transfer more \
             than {} bytes in {}s",
            dl.id,
            self.timeout.low_speed_limit,
            self.timeout.dur.as_secs()
        );
        dl.timed_out.set(Some(msg));
        return false;
    }
    true
}
// Redraws the progress bar with the number of pending crates plus either an
// "extracting ..." note or the number of bytes still to download.
//
// For `WhyTick::DownloadUpdate` the redraw is skipped (returning `Ok`) when
// the progress bar reports that no update is allowed right now.
fn tick(&self, why: WhyTick<'_>) -> CargoResult<()> {
    let mut slot = self.progress.borrow_mut();
    let bar = slot.as_mut().unwrap();
    if let WhyTick::DownloadUpdate = why {
        if !bar.update_allowed() {
            return Ok(());
        }
    }

    let in_flight = self.pending.len();
    let mut msg = match in_flight {
        1 => format!("{} crate", in_flight),
        _ => format!("{} crates", in_flight),
    };

    if let WhyTick::Extracting(krate) = why {
        msg.push_str(&format!(", extracting {} ...", krate));
    } else {
        // Sum up how long the pending downloads have been running and how
        // many bytes they still have to fetch; downloads whose recorded
        // progress exceeds the reported total contribute nothing.
        let mut total_elapsed = Duration::new(0, 0);
        let mut bytes_left = 0;
        for (dl, _) in self.pending.values() {
            total_elapsed += dl.start.elapsed();
            if let Some(left) = dl.total.get().checked_sub(dl.current.get()) {
                bytes_left += left;
            }
        }
        if bytes_left > 0 && total_elapsed > Duration::from_millis(500) {
            msg.push_str(&format!(", remaining bytes: {}", ByteSize(bytes_left)));
        }
    }
    bar.print_now(&msg)
}
}
/// The reason `Downloads::tick` is being called; it decides whether the
/// redraw may be skipped and what the progress message contains.
#[derive(Copy, Clone)]
enum WhyTick<'a> {
/// A new download was just enqueued.
DownloadStarted,
/// curl reported progress; the only variant for which the redraw is
/// skipped when the progress bar does not allow an update.
DownloadUpdate,
/// A download finished.
DownloadFinished,
/// A download finished and the named crate is about to be handed to its
/// source; the message shows "extracting <name> ...".
Extracting(&'a str),
}
/// Ends the download session: clears the set's `downloading` flag and, when
/// the session succeeded with at least one download and the progress bar is
/// enabled, prints a one-line summary.
impl<'a, 'cfg> Drop for Downloads<'a, 'cfg> {
    fn drop(&mut self) {
        self.set.downloading.set(false);
        let progress = self.progress.get_mut().take().unwrap();

        // Without an enabled progress bar, a finished download, or overall
        // success there is nothing to summarize.
        if !progress.is_enabled() || self.downloads_finished == 0 || !self.success {
            return;
        }

        let finished = self.downloads_finished;
        let crate_string = match finished {
            1 => "crate",
            _ => "crates",
        };
        let mut status = format!(
            "{} {} ({}) in {}",
            finished,
            crate_string,
            ByteSize(self.downloaded_bytes),
            util::elapsed(self.start.elapsed())
        );

        // Call out the largest crate only if it is notable and not the only one.
        let (largest_size, largest_name) = (self.largest.0, &self.largest.1);
        if largest_size > ByteSize::mb(1).0 && finished > 1 {
            status.push_str(&format!(
                " (largest was `{}` at {})",
                largest_name,
                ByteSize(largest_size),
            ));
        }

        // Drop the progress bar before writing the status line; a failure to
        // print the summary is deliberately ignored.
        drop(progress);
        let _ = self.set.config.shell().status("Downloaded", status);
    }
}
// Thread-local hand-off of the active `Downloads` object.
//
// The curl callbacks cannot capture a reference to `Downloads`, so its address
// is stored in a thread-local while curl is being driven and looked up again
// from inside the callbacks.
mod tls {
use std::cell::Cell;
use super::Downloads;
// Address of the currently registered `Downloads`, or 0 when none is.
thread_local!(static PTR: Cell<usize> = Cell::new(0));
// Calls `f` with the currently registered `Downloads`, or with `None` when
// nothing is registered on this thread.
pub(crate) fn with<R>(f: impl FnOnce(Option<&Downloads<'_, '_>>) -> R) -> R {
let ptr = PTR.with(|p| p.get());
if ptr == 0 {
f(None)
} else {
// SAFETY: a non-zero value is only ever stored by `set` below, which holds
// the `&Downloads` borrow for the entire time the pointer is registered
// and restores the previous value before returning. The pointer is
// therefore valid whenever it is observed here.
unsafe { f(Some(&*(ptr as *const Downloads<'_, '_>))) }
}
}
// Registers `dl` as the current `Downloads` for the duration of `f`.
pub(crate) fn set<R>(dl: &Downloads<'_, '_>, f: impl FnOnce() -> R) -> R {
// Guard that writes the saved value back into the cell when dropped, so the
// previous registration is restored however `f` exits.
struct Reset<'a, T: Copy>(&'a Cell<T>, T);
impl<'a, T: Copy> Drop for Reset<'a, T> {
fn drop(&mut self) {
self.0.set(self.1);
}
}
PTR.with(|p| {
let _reset = Reset(p, p.get());
p.set(dl as *const Downloads<'_, '_> as usize);
f()
})
}
}