Basic BinaryCacheStore implementation using async Rust

This commit is contained in:
Eelco Dolstra 2019-09-12 17:39:25 +02:00
parent 98ef11677c
commit dd5d76e2ed
8 changed files with 1877 additions and 11 deletions

1599
nix-rust/Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -11,3 +11,8 @@ crate-type = ["cdylib"]
[dependencies]
tar = "0.4"
libc = "0.2"
futures-preview = { version = "=0.3.0-alpha.18", features = ["compat"] }
#hyper = "0.12"
reqwest = "0.9"
http = "0.1"
tokio = "0.1"

View file

@ -8,8 +8,8 @@ endif
libnixrust_PATH := $(d)/target/$(RUST_DIR)/libnixrust.$(SO_EXT)
libnixrust_INSTALL_PATH := $(libdir)/libnixrust.$(SO_EXT)
libnixrust_LDFLAGS_USE := -L$(d)/target/$(RUST_DIR) -lnixrust -ldl
libnixrust_LDFLAGS_USE_INSTALLED := -L$(libdir) -lnixrust -ldl
libnixrust_LDFLAGS_USE := -L$(d)/target/$(RUST_DIR) -lnixrust -ldl -lssl
libnixrust_LDFLAGS_USE_INSTALLED := -L$(libdir) -lnixrust -ldl -lssl
ifeq ($(OS), Darwin)
libnixrust_BUILD_FLAGS = NIX_LDFLAGS="-undefined dynamic_lookup"
@ -21,7 +21,7 @@ endif
$(libnixrust_PATH): $(wildcard $(d)/src/*.rs) $(d)/Cargo.toml
$(trace-gen) cd nix-rust && CARGO_HOME=$$(if [[ -d vendor ]]; then echo vendor; fi) \
$(libnixrust_BUILD_FLAGS) \
cargo build $(RUST_MODE) $$(if [[ -d vendor ]]; then echo --offline; fi) \
RUSTC_BOOTSTRAP=1 cargo build $(RUST_MODE) $$(if [[ -d vendor ]]; then echo --offline; fi) \
&& touch target/$(RUST_DIR)/libnixrust.$(SO_EXT)
$(libnixrust_INSTALL_PATH): $(libnixrust_PATH)

View file

@ -0,0 +1,51 @@
use crate::store::{Store, StorePath};
use crate::path_info::PathInfo;
use crate::Error;
use futures::compat::Future01CompatExt;
pub struct BinaryCacheStore {
base_uri: String,
client: reqwest::r#async::Client,
}
impl BinaryCacheStore {
pub fn new(base_uri: String) -> Self {
Self {
base_uri,
client: reqwest::r#async::Client::new(),
}
}
}
impl Store for BinaryCacheStore {
fn query_path_info(
&self,
path: &StorePath,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<PathInfo, Error>> + Send>> {
let uri = format!("{}/{}.narinfo", self.base_uri.clone(), path.hash);
let path = path.clone();
let client = self.client.clone();
let store_dir = self.store_dir().to_string();
Box::pin(async move {
let response = client
.get(&uri)
.send()
.compat()
.await?;
if response.status() == reqwest::StatusCode::NOT_FOUND || response.status() == reqwest::StatusCode::FORBIDDEN {
return Err(Error::InvalidPath(path));
}
let mut response = response.error_for_status()?;
let body = response
.text()
.compat()
.await?;
PathInfo::parse_nar_info(&body, &store_dir)
})
}
}

View file

@ -1,6 +1,10 @@
#[derive(Debug)]
pub enum Error {
InvalidPath(crate::store::StorePath),
BadStorePath(std::path::PathBuf),
BadNarInfo,
IOError(std::io::Error),
HttpError(reqwest::Error),
Misc(String),
Foreign(CppException),
}
@ -11,12 +15,24 @@ impl From<std::io::Error> for Error {
}
}
impl From<reqwest::Error> for Error {
fn from(err: reqwest::Error) -> Self {
Error::HttpError(err)
}
}
impl From<Error> for CppException {
fn from(err: Error) -> Self {
match err {
Error::InvalidPath(_) => unsafe { make_error("invalid path") }, // FIXME
Error::BadNarInfo => unsafe { make_error(".narinfo file is corrupt") }, // FIXME
Error::BadStorePath(path) => unsafe {
make_error(&format!("path '{}' is not a store path", path.display()))
}, // FIXME
Error::IOError(err) => unsafe { make_error(&err.to_string()) },
Error::HttpError(err) => unsafe { make_error(&err.to_string()) },
Error::Foreign(ex) => ex,
Error::Misc(s) => unsafe { make_error(&s) },
Error::IOError(err) => unsafe { make_error(&err.to_string()) },
}
}
}

View file

@ -1,6 +1,11 @@
#![feature(await_macro, async_await)]
mod binary_cache_store;
mod error;
mod foreign;
mod store;
mod tarfile;
mod path_info;
pub use error::Error;
@ -30,3 +35,36 @@ pub extern "C" fn unpack_tarfile(
) -> CBox<Result<(), error::CppException>> {
CBox::new(tarfile::unpack_tarfile(source, dest_dir).map_err(|err| err.into()))
}
#[no_mangle]
pub extern "C" fn rust_test() {
use crate::store::Store;
use futures::future::{FutureExt, TryFutureExt};
use std::path::Path;
let fut = async move {
let store: Box<dyn Store> = Box::new(binary_cache_store::BinaryCacheStore::new(
"https://cache.nixos.org".to_string(),
));
let path = store
.parse_store_path(&Path::new(
"/nix/store/7h7qgvs4kgzsn8a6rb273saxyqh4jxlz-konsole-18.12.3",
))
.unwrap();
/*
let info = store.query_path_info(&path).await.unwrap();
eprintln!("INFO = {:?}", info);
*/
let closure = store.compute_path_closure(vec![path].into_iter().collect()).await.unwrap();
eprintln!("CLOSURE = {:?}", closure.len());
Ok(())
};
tokio::run(fut.boxed().compat());
}

70
nix-rust/src/path_info.rs Normal file
View file

@ -0,0 +1,70 @@
use crate::store::StorePath;
use crate::Error;
use std::collections::BTreeSet;
#[derive(Clone, Debug)]
pub struct PathInfo {
pub path: StorePath,
pub references: BTreeSet<StorePath>,
pub nar_size: u64,
pub deriver: Option<StorePath>,
// Additional binary cache info.
pub url: Option<String>,
pub compression: Option<String>,
pub file_size: Option<u64>,
}
impl PathInfo {
pub fn parse_nar_info(nar_info: &str, store_dir: &str) -> Result<Self, Error> {
let mut path = None;
let mut references = BTreeSet::new();
let mut nar_size = None;
let mut deriver = None;
let mut url = None;
let mut compression = None;
let mut file_size = None;
for line in nar_info.lines() {
let colon = line.find(':').ok_or(Error::BadNarInfo)?;
let (name, value) = line.split_at(colon);
if !value.starts_with(": ") {
return Err(Error::BadNarInfo);
}
let value = &value[2..];
if name == "StorePath" {
path = Some(StorePath::new(std::path::Path::new(value), store_dir)?);
} else if name == "NarSize" {
nar_size = Some(u64::from_str_radix(value, 10).map_err(|_| Error::BadNarInfo)?);
} else if name == "References" {
if !value.is_empty() {
for r in value.split(' ') {
references.insert(StorePath::new_short(r)?);
}
}
} else if name == "Deriver" {
deriver = Some(StorePath::new_short(value)?);
} else if name == "URL" {
url = Some(value.into());
} else if name == "Compression" {
compression = Some(value.into());
} else if name == "FileSize" {
file_size = Some(u64::from_str_radix(value, 10).map_err(|_| Error::BadNarInfo)?);
}
}
Ok(PathInfo {
path: path.ok_or(Error::BadNarInfo)?,
references,
nar_size: nar_size.ok_or(Error::BadNarInfo)?,
deriver,
url: Some(url.ok_or(Error::BadNarInfo)?),
compression,
file_size,
})
}
}

101
nix-rust/src/store.rs Normal file
View file

@ -0,0 +1,101 @@
use crate::path_info::PathInfo;
use crate::Error;
use std::collections::{BTreeMap, BTreeSet};
use std::path::Path;
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct StorePath {
pub hash: String,
pub name: String,
}
pub const STORE_PATH_HASH_CHARS: usize = 32;
impl StorePath {
pub fn new(path: &Path, store_dir: &str) -> Result<Self, Error> {
// FIXME: check store_dir
Self::new_short(
path.file_name()
.ok_or(Error::BadStorePath(path.into()))?
.to_str()
.ok_or(Error::BadStorePath(path.into()))?,
)
}
pub fn new_short(base_name: &str) -> Result<Self, Error> {
if base_name.len() < STORE_PATH_HASH_CHARS + 2
|| base_name.as_bytes()[STORE_PATH_HASH_CHARS] != '-' as u8
{
return Err(Error::BadStorePath(base_name.into()));
}
// FIXME: validate name
Ok(StorePath {
hash: base_name[0..STORE_PATH_HASH_CHARS].to_string(),
name: base_name[STORE_PATH_HASH_CHARS + 1..].to_string(),
})
}
}
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct StorePathHash {
bytes: [u8; 20],
}
/*
impl StorePathHash {
pub fn to_base32(&self) -> String {
"7h7qgvs4kgzsn8a6rb273saxyqh4jxlz".to_string()
}
}
*/
pub trait Store: Send + Sync {
fn store_dir(&self) -> &str {
"/nix/store"
}
fn query_path_info(
&self,
store_path: &StorePath,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<PathInfo, Error>> + Send>>;
}
impl dyn Store {
pub fn parse_store_path(&self, path: &Path) -> Result<StorePath, Error> {
StorePath::new(path, self.store_dir())
}
pub async fn compute_path_closure(
&self,
roots: BTreeSet<StorePath>,
) -> Result<BTreeMap<StorePath, PathInfo>, Error> {
let mut done = BTreeSet::new();
let mut result = BTreeMap::new();
let mut pending = vec![];
for root in roots {
pending.push(self.query_path_info(&root));
done.insert(root);
}
while !pending.is_empty() {
let (info, _, remaining) = futures::future::select_all(pending).await;
pending = remaining;
let info = info?;
for path in &info.references {
if !done.contains(path) {
pending.push(self.query_path_info(&path));
done.insert(path.clone());
}
}
result.insert(info.path.clone(), info);
}
Ok(result)
}
}