diff --git a/.env.sample b/.env.sample
index 45f166bbe..7cf054776 100644
--- a/.env.sample
+++ b/.env.sample
@@ -3,3 +3,6 @@ CRATESFYI_GITHUB_ACCESSTOKEN=
 CRATESFYI_PREFIX=ignored/cratesfyi-prefix
 CRATESFYI_DATABASE_URL=postgresql://cratesfyi:password@localhost
 RUST_LOG=cratesfyi,rustwide=info
+AWS_ACCESS_KEY_ID=cratesfyi
+AWS_SECRET_ACCESS_KEY=secret_key
+S3_ENDPOINT=http://localhost:9000
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 85d6e8fc6..9172ac149 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -49,12 +49,12 @@ jobs:
           restore-key: |
             ${{ runner.os }}-cargo-build-target-${{ env.CURRENT_RUSTC_VERSION }}-
-      - name: Launch the postgres image
+      - name: Launch the postgres and min.io images
         run: |
           cp .env.sample .env
           . .env
           mkdir -p ${CRATESFYI_PREFIX}/public-html
-          docker-compose up -d db
+          docker-compose up -d db s3
           # Give the database enough time to start up
           sleep 5
           # Make sure the database is actually working
diff --git a/README.md b/README.md
index 2a4acb1f8..9d06c516b 100644
--- a/README.md
+++ b/README.md
@@ -86,16 +86,22 @@ cargo test
 ```
 
 Some tests require access to the database. To run them, set the
-`CRATESFYI_DATABASE_URL` to the url of a PostgreSQL database. If you are using
-the `docker-compose` environment to run tests against, see the
-[Docker-Compose][docker-compose-section] section for the default PostgreSQL URL.
-You don't have to run the migrations on it or ensure it's empty, as all the
-tests use temporary tables to prevent conflicts with each other or existing
-data. See the [wiki page on developing outside docker-compose][wiki-no-compose]
+`CRATESFYI_DATABASE_URL` in `.env` to the URL of a PostgreSQL database,
+and set the `AWS_ACCESS_KEY_ID`, `S3_ENDPOINT`, and `AWS_SECRET_ACCESS_KEY` variables.
+We have some reasonable default parameters in `.env.sample`.
+
+For example, if you are using the `docker-compose` environment to run tests against, you can launch only the database and s3 server like so:
+
+```console
+docker-compose up -d db s3
+```
+
+If you don't want to use docker-compose, see the
+[wiki page on developing outside docker-compose][wiki-no-compose]
 for more information on how to setup this environment.
+Note that either way, you will need Docker installed for sandboxing with Rustwide.
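As a concrete end-to-end example, a local test run with these defaults might look like the following; this is a sketch, assuming Docker, `docker-compose`, and a Rust toolchain are installed, and that the `db` and `s3` service names match the ones used above:

```console
cp .env.sample .env            # start from the sample configuration
. .env                         # load the variables into the current shell
docker-compose up -d db s3     # start PostgreSQL and the min.io S3 server
cargo test                     # run the test suite against them
```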
[wiki-no-compose]: https://github.com/rust-lang/docs.rs/wiki/Developing-without-docker-compose -[docker-compose-section]: #Docker-Compose ### Docker-Compose diff --git a/src/bin/cratesfyi.rs b/src/bin/cratesfyi.rs index 66a44446c..33600bce0 100644 --- a/src/bin/cratesfyi.rs +++ b/src/bin/cratesfyi.rs @@ -83,7 +83,6 @@ pub fn main() { .subcommand(SubCommand::with_name("database") .about("Database operations") .setting(AppSettings::ArgRequiredElseHelp) - .subcommand(SubCommand::with_name("move-to-s3")) .subcommand(SubCommand::with_name("migrate") .about("Run database migrations") .arg(Arg::with_name("VERSION"))) @@ -263,18 +262,6 @@ pub fn main() { } else if matches.subcommand_matches("update-search-index").is_some() { let conn = db::connect_db().unwrap(); db::update_search_index(&conn).expect("Failed to update search index"); - } else if matches.subcommand_matches("move-to-s3").is_some() { - let conn = db::connect_db().unwrap(); - let mut count = 1; - let mut total = 0; - while count != 0 { - count = db::move_to_s3(&conn, 5_000).expect("Failed to upload batch to S3"); - total += count; - eprintln!( - "moved {} rows to s3 in this batch, total moved so far: {}", - count, total - ); - } } else if let Some(matches) = matches.subcommand_matches("delete-crate") { let name = matches.value_of("CRATE_NAME").expect("missing crate name"); let conn = db::connect_db().expect("failed to connect to the database"); diff --git a/src/db/delete_crate.rs b/src/db/delete_crate.rs index 70990ec95..47d8b7aa0 100644 --- a/src/db/delete_crate.rs +++ b/src/db/delete_crate.rs @@ -1,4 +1,4 @@ -use super::file::{s3_client, S3_BUCKET_NAME}; +use crate::storage::s3::{s3_client, S3_BUCKET_NAME}; use failure::{Error, Fail}; use postgres::Connection; use rusoto_s3::{DeleteObjectsRequest, ListObjectsV2Request, ObjectIdentifier, S3Client, S3}; diff --git a/src/db/file.rs b/src/db/file.rs index 56fbb4171..d82e32900 100644 --- a/src/db/file.rs +++ b/src/db/file.rs @@ -5,148 +5,16 @@ //! filesystem. This module is adding files into database and retrieving them. use crate::error::Result; -use failure::err_msg; -use log::{error, warn}; +use crate::storage::Storage; use postgres::Connection; -use rusoto_core::region::Region; -use rusoto_credential::DefaultCredentialsProvider; -use rusoto_s3::{GetObjectRequest, PutObjectRequest, S3Client, S3}; + use rustc_serialize::json::{Json, ToJson}; -use std::cmp; -use std::ffi::OsStr; -use std::fs; -use std::io::Read; use std::path::{Path, PathBuf}; -const MAX_CONCURRENT_UPLOADS: usize = 50; - -pub(super) static S3_BUCKET_NAME: &str = "rust-docs-rs"; - -fn get_file_list_from_dir>(path: P, files: &mut Vec) -> Result<()> { - let path = path.as_ref(); - - for file in path.read_dir()? 
{ - let file = file?; - - if file.file_type()?.is_file() { - files.push(file.path()); - } else if file.file_type()?.is_dir() { - get_file_list_from_dir(file.path(), files)?; - } - } - - Ok(()) -} - -fn get_file_list>(path: P) -> Result> { - let path = path.as_ref(); - let mut files = Vec::new(); - - if !path.exists() { - return Err(err_msg("File not found")); - } else if path.is_file() { - files.push(PathBuf::from(path.file_name().unwrap())); - } else if path.is_dir() { - get_file_list_from_dir(path, &mut files)?; - for file_path in &mut files { - // We want the paths in this list to not be {path}/bar.txt but just bar.txt - *file_path = PathBuf::from(file_path.strip_prefix(path).unwrap()); - } - } - - Ok(files) -} - -pub struct Blob { - pub path: String, - pub mime: String, - pub date_updated: time::Timespec, - pub content: Vec, -} - -pub fn get_path(conn: &Connection, path: &str) -> Option { - if let Some(client) = s3_client() { - let res = client - .get_object(GetObjectRequest { - bucket: S3_BUCKET_NAME.into(), - key: path.into(), - ..Default::default() - }) - .sync(); +pub(crate) use crate::storage::Blob; - let res = match res { - Ok(r) => r, - Err(_) => { - return None; - } - }; - - let mut b = res.body.unwrap().into_blocking_read(); - let mut content = Vec::new(); - b.read_to_end(&mut content).unwrap(); - - let last_modified = res.last_modified.unwrap(); - let last_modified = time::strptime(&last_modified, "%a, %d %b %Y %H:%M:%S %Z") - .unwrap_or_else(|e| panic!("failed to parse {:?} as timespec: {:?}", last_modified, e)) - .to_timespec(); - - Some(Blob { - path: path.into(), - mime: res.content_type.unwrap(), - date_updated: last_modified, - content, - }) - } else { - let rows = conn - .query( - "SELECT path, mime, date_updated, content - FROM files - WHERE path = $1", - &[&path], - ) - .unwrap(); - - if rows.is_empty() { - None - } else { - let row = rows.get(0); - - Some(Blob { - path: row.get(0), - mime: row.get(1), - date_updated: row.get(2), - content: row.get(3), - }) - } - } -} - -pub(super) fn s3_client() -> Option { - // If AWS keys aren't configured, then presume we should use the DB exclusively - // for file storage. 
- if std::env::var_os("AWS_ACCESS_KEY_ID").is_none() && std::env::var_os("FORCE_S3").is_none() { - return None; - } - - let creds = match DefaultCredentialsProvider::new() { - Ok(creds) => creds, - Err(err) => { - warn!("failed to retrieve AWS credentials: {}", err); - return None; - } - }; - - Some(S3Client::new_with( - rusoto_core::request::HttpClient::new().unwrap(), - creds, - std::env::var("S3_ENDPOINT") - .ok() - .map(|e| Region::Custom { - name: std::env::var("S3_REGION").unwrap_or_else(|_| "us-west-1".to_owned()), - endpoint: e, - }) - .unwrap_or(Region::UsWest1), - )) +pub(crate) fn get_path(conn: &Connection, path: &str) -> Option { + Storage::new(conn).get(path).ok() } /// Store all files in a directory and return [[mimetype, filename]] as Json @@ -163,237 +31,20 @@ pub fn add_path_into_database>( prefix: &str, path: P, ) -> Result { - use futures::future::Future; - use std::collections::HashMap; - - let trans = conn.transaction()?; - let mut file_paths_and_mimes: HashMap = HashMap::new(); - - let mut rt = ::tokio::runtime::Runtime::new().unwrap(); - - let mut to_upload = get_file_list(&path)?; - let mut batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS); - let mut currently_uploading: Vec<_> = to_upload.drain(..batch_size).collect(); - let mut attempts = 0; - - while !to_upload.is_empty() || !currently_uploading.is_empty() { - let mut futures = Vec::new(); - let client = s3_client(); - - for file_path in ¤tly_uploading { - let path = Path::new(path.as_ref()).join(&file_path); - // Some files have insufficient permissions (like .lock file created by cargo in - // documentation directory). We are skipping this files. - let mut file = match fs::File::open(path) { - Ok(f) => f, - Err(_) => continue, - }; - let mut content: Vec = Vec::new(); - file.read_to_end(&mut content)?; - - let bucket_path = Path::new(prefix).join(&file_path); - - #[cfg(windows)] // On windows, we need to normalize \\ to / so the route logic works - let bucket_path = path_slash::PathBufExt::to_slash(&bucket_path).unwrap(); - #[cfg(not(windows))] - let bucket_path = bucket_path.into_os_string().into_string().unwrap(); - - let mime = detect_mime(&file_path)?; - - if let Some(client) = &client { - futures.push( - client - .put_object(PutObjectRequest { - bucket: S3_BUCKET_NAME.into(), - key: bucket_path.clone(), - body: Some(content.into()), - content_type: Some(mime.to_owned()), - ..Default::default() - }) - .inspect(|_| { - crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1); - }), - ); - } else { - // If AWS credentials are configured, don't insert/update the database - // check if file already exists in database - let rows = conn.query( - "SELECT COUNT(*) FROM files WHERE path = $1", - &[&bucket_path], - )?; - - if rows.get(0).get::(0) == 0 { - trans.query( - "INSERT INTO files (path, mime, content) VALUES ($1, $2, $3)", - &[&bucket_path, &mime, &content], - )?; - } else { - trans.query( - "UPDATE files SET mime = $2, content = $3, date_updated = NOW() \ - WHERE path = $1", - &[&bucket_path, &mime, &content], - )?; - } - } - - file_paths_and_mimes.insert(file_path.clone(), mime.to_owned()); - } - - if !futures.is_empty() { - attempts += 1; - - match rt.block_on(::futures::future::join_all(futures)) { - Ok(_) => { - // this batch was successful, start another batch if there are still more files - batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS); - currently_uploading = to_upload.drain(..batch_size).collect(); - attempts = 0; - } - Err(err) => { - error!("failed to upload to s3: {:?}", 
err); - // if any futures error, leave `currently_uploading` in tact so that we can retry the batch - if attempts > 2 { - panic!("failed to upload 3 times, exiting"); - } - } - } - } else { - batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS); - currently_uploading = to_upload.drain(..batch_size).collect(); - } - } - - trans.commit()?; - - let file_list_with_mimes: Vec<(String, PathBuf)> = file_paths_and_mimes - .into_iter() - .map(|(file_path, mime)| (mime, file_path)) - .collect(); - file_list_to_json(file_list_with_mimes) -} - -fn detect_mime(file_path: &Path) -> Result<&'static str> { - let mime = mime_guess::from_path(file_path) - .first_raw() - .map(|m| m) - .unwrap_or("text/plain"); - Ok(match mime { - "text/plain" | "text/troff" | "text/x-markdown" | "text/x-rust" | "text/x-toml" => { - match file_path.extension().and_then(OsStr::to_str) { - Some("md") => "text/markdown", - Some("rs") => "text/rust", - Some("markdown") => "text/markdown", - Some("css") => "text/css", - Some("toml") => "text/toml", - Some("js") => "application/javascript", - Some("json") => "application/json", - _ => mime, - } - } - "image/svg" => "image/svg+xml", - _ => mime, - }) + let mut backend = Storage::new(conn); + let file_list = backend.store_all(conn, prefix, path.as_ref())?; + file_list_to_json(file_list.into_iter().collect()) } -fn file_list_to_json(file_list: Vec<(String, PathBuf)>) -> Result { +fn file_list_to_json(file_list: Vec<(PathBuf, String)>) -> Result { let mut file_list_json: Vec = Vec::new(); for file in file_list { - let mut v: Vec = Vec::new(); - v.push(file.0.clone()); - v.push(file.1.into_os_string().into_string().unwrap()); + let mut v: Vec = Vec::with_capacity(2); + v.push(file.1); + v.push(file.0.into_os_string().into_string().unwrap()); file_list_json.push(v.to_json()); } Ok(file_list_json.to_json()) } - -pub fn move_to_s3(conn: &Connection, n: usize) -> Result { - let trans = conn.transaction()?; - let client = s3_client().expect("configured s3"); - - let rows = trans.query( - &format!( - "SELECT path, mime, content FROM files WHERE content != E'in-s3' LIMIT {}", - n - ), - &[], - )?; - let count = rows.len(); - - let mut rt = ::tokio::runtime::Runtime::new().unwrap(); - let mut futures = Vec::new(); - for row in &rows { - let path: String = row.get(0); - let mime: String = row.get(1); - let content: Vec = row.get(2); - let path_1 = path.clone(); - futures.push( - client - .put_object(PutObjectRequest { - bucket: S3_BUCKET_NAME.into(), - key: path.clone(), - body: Some(content.into()), - content_type: Some(mime), - ..Default::default() - }) - .map(move |_| path_1) - .map_err(move |e| panic!("failed to upload to {}: {:?}", path, e)), - ); - } - - use ::futures::future::Future; - match rt.block_on(::futures::future::join_all(futures)) { - Ok(paths) => { - let statement = trans.prepare("DELETE FROM files WHERE path = $1").unwrap(); - for path in paths { - statement.execute(&[&path]).unwrap(); - } - } - Err(e) => { - panic!("results err: {:?}", e); - } - } - - trans.commit()?; - - Ok(count) -} - -#[cfg(test)] -mod test { - use super::*; - use std::env; - - #[test] - fn test_get_file_list() { - let _ = env_logger::try_init(); - - let files = get_file_list(env::current_dir().unwrap()); - assert!(files.is_ok()); - assert!(files.unwrap().len() > 0); - - let files = get_file_list(env::current_dir().unwrap().join("Cargo.toml")).unwrap(); - assert_eq!(files[0], std::path::Path::new("Cargo.toml")); - } - #[test] - fn test_mime_types() { - check_mime(".gitignore", "text/plain"); - 
check_mime("hello.toml", "text/toml"); - check_mime("hello.css", "text/css"); - check_mime("hello.js", "application/javascript"); - check_mime("hello.html", "text/html"); - check_mime("hello.hello.md", "text/markdown"); - check_mime("hello.markdown", "text/markdown"); - check_mime("hello.json", "application/json"); - check_mime("hello.txt", "text/plain"); - check_mime("file.rs", "text/rust"); - check_mime("important.svg", "image/svg+xml"); - } - - fn check_mime(path: &str, expected_mime: &str) { - let detected_mime = detect_mime(Path::new(&path)); - let detected_mime = detected_mime.expect("no mime was given"); - assert_eq!(detected_mime, expected_mime); - } -} diff --git a/src/db/mod.rs b/src/db/mod.rs index 503d22cf0..9d0e07a50 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -4,7 +4,7 @@ pub(crate) use self::add_package::add_build_into_database; pub(crate) use self::add_package::add_package_into_database; pub(crate) use self::add_package::CratesIoData; pub use self::delete_crate::delete_crate; -pub use self::file::{add_path_into_database, move_to_s3}; +pub use self::file::add_path_into_database; pub use self::migrate::migrate; use failure::Fail; diff --git a/src/lib.rs b/src/lib.rs index 354065ccc..c751db934 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,7 @@ pub use self::web::Server; pub mod db; mod docbuilder; mod error; +pub mod storage; #[cfg(test)] mod test; pub mod utils; diff --git a/src/storage/database.rs b/src/storage/database.rs new file mode 100644 index 000000000..07ad4aefd --- /dev/null +++ b/src/storage/database.rs @@ -0,0 +1,98 @@ +use super::Blob; +use failure::{Error, Fail}; +use postgres::{transaction::Transaction, Connection}; + +#[derive(Debug, Fail)] +#[fail(display = "the path is not present in the database")] +struct PathNotFoundError; + +pub(crate) struct DatabaseBackend<'a> { + conn: &'a Connection, +} + +impl<'a> DatabaseBackend<'a> { + pub(crate) fn new(conn: &'a Connection) -> Self { + Self { conn } + } + + pub(super) fn get(&self, path: &str) -> Result { + let rows = self.conn.query( + "SELECT path, mime, date_updated, content FROM files WHERE path = $1;", + &[&path], + )?; + + if rows.is_empty() { + Err(PathNotFoundError.into()) + } else { + let row = rows.get(0); + Ok(Blob { + path: row.get("path"), + mime: row.get("mime"), + date_updated: row.get("date_updated"), + content: row.get("content"), + }) + } + } + pub(super) fn store_batch(&self, batch: &[Blob], trans: &Transaction) -> Result<(), Error> { + for blob in batch { + trans.query( + "INSERT INTO files (path, mime, content) + VALUES ($1, $2, $3) + ON CONFLICT (path) DO UPDATE + SET mime = EXCLUDED.mime, content = EXCLUDED.content", + &[&blob.path, &blob.mime, &blob.content], + )?; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use time::Timespec; + + #[test] + fn test_path_get() { + crate::test::wrapper(|env| { + let conn = env.db().conn(); + let backend = DatabaseBackend::new(&conn); + + // Add a test file to the database + conn.execute( + "INSERT INTO files (path, mime, date_updated, content) VALUES ($1, $2, $3, $4);", + &[ + &"dir/foo.txt", + &"text/plain", + &Timespec::new(42, 0), + &"Hello world!".as_bytes(), + ], + )?; + + // Test that the proper file was returned + assert_eq!( + Blob { + path: "dir/foo.txt".into(), + mime: "text/plain".into(), + date_updated: Timespec::new(42, 0), + content: "Hello world!".bytes().collect(), + }, + backend.get("dir/foo.txt")? 
+ ); + + // Test that other files are not returned + assert!(backend + .get("dir/bar.txt") + .unwrap_err() + .downcast_ref::() + .is_some()); + assert!(backend + .get("foo.txt") + .unwrap_err() + .downcast_ref::() + .is_some()); + + Ok(()) + }); + } +} diff --git a/src/storage/mod.rs b/src/storage/mod.rs new file mode 100644 index 000000000..830fb2430 --- /dev/null +++ b/src/storage/mod.rs @@ -0,0 +1,316 @@ +mod database; +pub(crate) mod s3; + +pub(crate) use self::database::DatabaseBackend; +pub(crate) use self::s3::S3Backend; +use failure::Error; +use time::Timespec; + +use failure::err_msg; +use postgres::{transaction::Transaction, Connection}; +use std::collections::HashMap; +use std::ffi::OsStr; +use std::fs; +use std::io::Read; +use std::path::{Path, PathBuf}; + +const MAX_CONCURRENT_UPLOADS: usize = 1000; + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub(crate) struct Blob { + pub(crate) path: String, + pub(crate) mime: String, + pub(crate) date_updated: Timespec, + pub(crate) content: Vec, +} + +fn get_file_list_from_dir>(path: P, files: &mut Vec) -> Result<(), Error> { + let path = path.as_ref(); + + for file in path.read_dir()? { + let file = file?; + + if file.file_type()?.is_file() { + files.push(file.path()); + } else if file.file_type()?.is_dir() { + get_file_list_from_dir(file.path(), files)?; + } + } + + Ok(()) +} + +pub fn get_file_list>(path: P) -> Result, Error> { + let path = path.as_ref(); + let mut files = Vec::new(); + + if !path.exists() { + return Err(err_msg("File not found")); + } else if path.is_file() { + files.push(PathBuf::from(path.file_name().unwrap())); + } else if path.is_dir() { + get_file_list_from_dir(path, &mut files)?; + for file_path in &mut files { + // We want the paths in this list to not be {path}/bar.txt but just bar.txt + *file_path = PathBuf::from(file_path.strip_prefix(path).unwrap()); + } + } + + Ok(files) +} + +pub(crate) enum Storage<'a> { + Database(DatabaseBackend<'a>), + S3(S3Backend<'a>), +} + +impl<'a> Storage<'a> { + pub(crate) fn new(conn: &'a Connection) -> Self { + if let Some(c) = s3::s3_client() { + Storage::from(S3Backend::new(c, s3::S3_BUCKET_NAME)) + } else { + DatabaseBackend::new(conn).into() + } + } + pub(crate) fn get(&self, path: &str) -> Result { + match self { + Self::Database(db) => db.get(path), + Self::S3(s3) => s3.get(path), + } + } + + fn store_batch(&mut self, batch: &[Blob], trans: &Transaction) -> Result<(), Error> { + match self { + Self::Database(db) => db.store_batch(batch, trans), + Self::S3(s3) => s3.store_batch(batch), + } + } + + // Store all files in `root_dir` into the backend under `prefix`. + // + // If the environment is configured with S3 credentials, this will upload to S3; + // otherwise, this will store files in the database. + // + // This returns a HashMap. + pub(crate) fn store_all( + &mut self, + conn: &Connection, + prefix: &str, + root_dir: &Path, + ) -> Result, Error> { + let trans = conn.transaction()?; + let mut file_paths_and_mimes = HashMap::new(); + + let mut blobs = get_file_list(root_dir)? + .into_iter() + .filter_map(|file_path| { + // Some files have insufficient permissions + // (like .lock file created by cargo in documentation directory). + // Skip these files. 
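+                // Files that do open are read into memory below, given a bucket path
+                // under `prefix` (normalized to forward slashes on Windows), a MIME type
+                // from `detect_mime`, and wrapped in a `Blob`; the loop further down then
+                // uploads them in batches of `MAX_CONCURRENT_UPLOADS` inside one transaction.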
+ fs::File::open(root_dir.join(&file_path)) + .ok() + .map(|file| (file_path, file)) + }) + .map(|(file_path, mut file)| -> Result<_, Error> { + let mut content: Vec = Vec::new(); + file.read_to_end(&mut content)?; + + let bucket_path = Path::new(prefix).join(&file_path); + + #[cfg(windows)] // On windows, we need to normalize \\ to / so the route logic works + let bucket_path = path_slash::PathBufExt::to_slash(&bucket_path).unwrap(); + #[cfg(not(windows))] + let bucket_path = bucket_path.into_os_string().into_string().unwrap(); + + let mime = detect_mime(&file_path)?; + + file_paths_and_mimes.insert(file_path, mime.to_string()); + Ok(Blob { + path: bucket_path, + mime: mime.to_string(), + content, + // this field is ignored by the backend + date_updated: Timespec::new(0, 0), + }) + }); + loop { + let batch: Vec<_> = blobs + .by_ref() + .take(MAX_CONCURRENT_UPLOADS) + .collect::>()?; + if batch.is_empty() { + break; + } + self.store_batch(&batch, &trans)?; + } + + trans.commit()?; + Ok(file_paths_and_mimes) + } +} + +fn detect_mime(file_path: &Path) -> Result<&'static str, Error> { + let mime = mime_guess::from_path(file_path) + .first_raw() + .map(|m| m) + .unwrap_or("text/plain"); + Ok(match mime { + "text/plain" | "text/troff" | "text/x-markdown" | "text/x-rust" | "text/x-toml" => { + match file_path.extension().and_then(OsStr::to_str) { + Some("md") => "text/markdown", + Some("rs") => "text/rust", + Some("markdown") => "text/markdown", + Some("css") => "text/css", + Some("toml") => "text/toml", + Some("js") => "application/javascript", + Some("json") => "application/json", + _ => mime, + } + } + "image/svg" => "image/svg+xml", + _ => mime, + }) +} + +impl<'a> From> for Storage<'a> { + fn from(db: DatabaseBackend<'a>) -> Self { + Self::Database(db) + } +} + +impl<'a> From> for Storage<'a> { + fn from(db: S3Backend<'a>) -> Self { + Self::S3(db) + } +} + +#[cfg(test)] +mod test { + extern crate env_logger; + use super::*; + use crate::test::wrapper; + use std::env; + + pub(crate) fn assert_blob_eq(blob: &Blob, actual: &Blob) { + assert_eq!(blob.path, actual.path); + assert_eq!(blob.content, actual.content); + assert_eq!(blob.mime, actual.mime); + // NOTE: this does _not_ compare the upload time since min.io doesn't allow this to be configured + } + + pub(crate) fn test_roundtrip(blobs: &[Blob]) { + let dir = tempdir::TempDir::new("docs.rs-upload-test").unwrap(); + for blob in blobs { + let path = dir.path().join(&blob.path); + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).unwrap(); + } + fs::write(path, &blob.content).expect("failed to write to file"); + } + wrapper(|env| { + let db = env.db(); + let conn = db.conn(); + let mut backend = Storage::Database(DatabaseBackend::new(&conn)); + let stored_files = backend.store_all(&conn, "", dir.path()).unwrap(); + assert_eq!(stored_files.len(), blobs.len()); + for blob in blobs { + let name = Path::new(&blob.path); + assert!(stored_files.contains_key(name)); + + let actual = backend.get(&blob.path).unwrap(); + assert_blob_eq(blob, &actual); + } + + Ok(()) + }); + } + + #[test] + fn test_uploads() { + use std::fs; + let dir = tempdir::TempDir::new("docs.rs-upload-test").unwrap(); + let files = ["Cargo.toml", "src/main.rs"]; + for &file in &files { + let path = dir.path().join(file); + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).unwrap(); + } + fs::write(path, "data").expect("failed to write to file"); + } + wrapper(|env| { + let db = env.db(); + let conn = db.conn(); + let mut backend = 
Storage::Database(DatabaseBackend::new(&conn)); + let stored_files = backend.store_all(&conn, "rustdoc", dir.path()).unwrap(); + assert_eq!(stored_files.len(), files.len()); + for name in &files { + let name = Path::new(name); + assert!(stored_files.contains_key(name)); + } + assert_eq!( + stored_files.get(Path::new("Cargo.toml")).unwrap(), + "text/toml" + ); + assert_eq!( + stored_files.get(Path::new("src/main.rs")).unwrap(), + "text/rust" + ); + + let file = backend.get("rustdoc/Cargo.toml").unwrap(); + assert_eq!(file.content, b"data"); + assert_eq!(file.mime, "text/toml"); + assert_eq!(file.path, "rustdoc/Cargo.toml"); + + let file = backend.get("rustdoc/src/main.rs").unwrap(); + assert_eq!(file.content, b"data"); + assert_eq!(file.mime, "text/rust"); + assert_eq!(file.path, "rustdoc/src/main.rs"); + Ok(()) + }) + } + + #[test] + fn test_batched_uploads() { + let uploads: Vec<_> = (0..=MAX_CONCURRENT_UPLOADS + 1) + .map(|i| Blob { + mime: "text/rust".into(), + content: "fn main() {}".into(), + path: format!("{}.rs", i), + date_updated: Timespec::new(42, 0), + }) + .collect(); + test_roundtrip(&uploads); + } + + #[test] + fn test_get_file_list() { + let _ = env_logger::try_init(); + + let files = get_file_list(env::current_dir().unwrap()); + assert!(files.is_ok()); + assert!(files.unwrap().len() > 0); + + let files = get_file_list(env::current_dir().unwrap().join("Cargo.toml")).unwrap(); + assert_eq!(files[0], std::path::Path::new("Cargo.toml")); + } + #[test] + fn test_mime_types() { + check_mime(".gitignore", "text/plain"); + check_mime("hello.toml", "text/toml"); + check_mime("hello.css", "text/css"); + check_mime("hello.js", "application/javascript"); + check_mime("hello.html", "text/html"); + check_mime("hello.hello.md", "text/markdown"); + check_mime("hello.markdown", "text/markdown"); + check_mime("hello.json", "application/json"); + check_mime("hello.txt", "text/plain"); + check_mime("file.rs", "text/rust"); + check_mime("important.svg", "image/svg+xml"); + } + + fn check_mime(path: &str, expected_mime: &str) { + let detected_mime = detect_mime(Path::new(&path)); + let detected_mime = detected_mime.expect("no mime was given"); + assert_eq!(detected_mime, expected_mime); + } +} diff --git a/src/storage/s3.rs b/src/storage/s3.rs new file mode 100644 index 000000000..6cf3fbf95 --- /dev/null +++ b/src/storage/s3.rs @@ -0,0 +1,211 @@ +use super::Blob; +use failure::Error; +use futures::Future; +use log::{error, warn}; +use rusoto_core::region::Region; +use rusoto_credential::DefaultCredentialsProvider; +use rusoto_s3::{GetObjectRequest, PutObjectRequest, S3Client, S3}; +use std::convert::TryInto; +use std::io::Read; +use time::Timespec; +use tokio::runtime::Runtime; + +#[cfg(test)] +mod test; +#[cfg(test)] +pub(crate) use test::TestS3; + +pub(crate) static S3_BUCKET_NAME: &str = "rust-docs-rs"; + +pub(crate) struct S3Backend<'a> { + client: S3Client, + bucket: &'a str, + runtime: Runtime, +} + +impl<'a> S3Backend<'a> { + pub(crate) fn new(client: S3Client, bucket: &'a str) -> Self { + Self { + client, + bucket, + runtime: Runtime::new().unwrap(), + } + } + + pub(super) fn get(&self, path: &str) -> Result { + let res = self + .client + .get_object(GetObjectRequest { + bucket: self.bucket.to_string(), + key: path.into(), + ..Default::default() + }) + .sync()?; + + let mut b = res.body.unwrap().into_blocking_read(); + let mut content = Vec::with_capacity( + res.content_length + .and_then(|l| l.try_into().ok()) + .unwrap_or(0), + ); + b.read_to_end(&mut content).unwrap(); + + let 
date_updated = parse_timespec(&res.last_modified.unwrap())?; + + Ok(Blob { + path: path.into(), + mime: res.content_type.unwrap(), + date_updated, + content, + }) + } + + pub(super) fn store_batch(&mut self, batch: &[Blob]) -> Result<(), Error> { + use futures::stream::FuturesUnordered; + use futures::stream::Stream; + let mut attempts = 0; + + loop { + let mut futures = FuturesUnordered::new(); + for blob in batch { + futures.push( + self.client + .put_object(PutObjectRequest { + bucket: self.bucket.to_string(), + key: blob.path.clone(), + body: Some(blob.content.clone().into()), + content_type: Some(blob.mime.clone()), + ..Default::default() + }) + .inspect(|_| { + crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1); + }), + ); + } + attempts += 1; + + match self.runtime.block_on(futures.map(drop).collect()) { + // this batch was successful, start another batch if there are still more files + Ok(_) => break, + Err(err) => { + error!("failed to upload to s3: {:?}", err); + // if a futures error occurs, retry the batch + if attempts > 2 { + panic!("failed to upload 3 times, exiting"); + } + } + } + } + Ok(()) + } +} + +fn parse_timespec(raw: &str) -> Result { + const TIME_FMT: &str = "%a, %d %b %Y %H:%M:%S %Z"; + Ok(time::strptime(raw, TIME_FMT)?.to_timespec()) +} + +pub(crate) fn s3_client() -> Option { + // If AWS keys aren't configured, then presume we should use the DB exclusively + // for file storage. + if std::env::var_os("AWS_ACCESS_KEY_ID").is_none() && std::env::var_os("FORCE_S3").is_none() { + return None; + } + let creds = match DefaultCredentialsProvider::new() { + Ok(creds) => creds, + Err(err) => { + warn!("failed to retrieve AWS credentials: {}", err); + return None; + } + }; + Some(S3Client::new_with( + rusoto_core::request::HttpClient::new().unwrap(), + creds, + std::env::var("S3_ENDPOINT") + .ok() + .map(|e| Region::Custom { + name: std::env::var("S3_REGION").unwrap_or_else(|_| "us-west-1".to_owned()), + endpoint: e, + }) + .unwrap_or(Region::UsWest1), + )) +} + +#[cfg(test)] +pub(crate) mod tests { + use super::*; + use crate::test::*; + use std::slice; + + #[test] + fn test_parse_timespec() { + // Test valid conversions + assert_eq!( + parse_timespec("Thu, 1 Jan 1970 00:00:00 GMT").unwrap(), + Timespec::new(0, 0) + ); + assert_eq!( + parse_timespec("Mon, 16 Apr 2018 04:33:50 GMT").unwrap(), + Timespec::new(1523853230, 0) + ); + + // Test invalid conversion + assert!(parse_timespec("foo").is_err()); + } + + #[test] + fn test_get() { + wrapper(|env| { + let blob = Blob { + path: "dir/foo.txt".into(), + mime: "text/plain".into(), + date_updated: Timespec::new(42, 0), + content: "Hello world!".into(), + }; + + // Add a test file to the database + let s3 = env.s3(); + s3.upload(slice::from_ref(&blob)).unwrap(); + + // Test that the proper file was returned + s3.assert_blob(&blob, "dir/foo.txt"); + + // Test that other files are not returned + s3.assert_404("dir/bar.txt"); + s3.assert_404("foo.txt"); + + Ok(()) + }); + } + + #[test] + fn test_store() { + wrapper(|env| { + let s3 = env.s3(); + let names = [ + "a", + "b", + "a_very_long_file_name_that_has_an.extension", + "parent/child", + "h/i/g/h/l/y/_/n/e/s/t/e/d/_/d/i/r/e/c/t/o/r/i/e/s", + ]; + let blobs: Vec<_> = names + .iter() + .map(|&path| Blob { + path: path.into(), + mime: "text/plain".into(), + date_updated: Timespec::new(42, 0), + content: "Hello world!".into(), + }) + .collect(); + s3.upload(&blobs).unwrap(); + for blob in &blobs { + s3.assert_blob(blob, &blob.path); + } + Ok(()) + }) + } + // NOTE: trying to 
upload a file ending with `/` will behave differently in test and prod. + // NOTE: On s3, it will succeed and create a file called `/`. + // NOTE: On min.io, it will fail with 'Object name contains unsupported characters.' +} diff --git a/src/storage/s3/test.rs b/src/storage/s3/test.rs new file mode 100644 index 000000000..937f20127 --- /dev/null +++ b/src/storage/s3/test.rs @@ -0,0 +1,77 @@ +use super::*; +use crate::storage::test::assert_blob_eq; +use rusoto_s3::{ + CreateBucketRequest, DeleteBucketRequest, DeleteObjectRequest, ListObjectsRequest, S3, +}; +use std::cell::RefCell; + +pub(crate) struct TestS3(RefCell>); + +impl TestS3 { + pub(crate) fn new() -> Self { + // A random bucket name is generated and used for the current connection. + // This allows each test to create a fresh bucket to test with. + let bucket = format!("docs-rs-test-bucket-{}", rand::random::()); + let client = s3_client().unwrap(); + client + .create_bucket(CreateBucketRequest { + bucket: bucket.clone(), + ..Default::default() + }) + .sync() + .expect("failed to create test bucket"); + let bucket = Box::leak(bucket.into_boxed_str()); + TestS3(RefCell::new(S3Backend::new(client, bucket))) + } + pub(crate) fn upload(&self, blobs: &[Blob]) -> Result<(), Error> { + self.0.borrow_mut().store_batch(blobs) + } + pub(crate) fn assert_404(&self, path: &'static str) { + use rusoto_core::RusotoError; + use rusoto_s3::GetObjectError; + + let err = self.0.borrow().get(path).unwrap_err(); + match err + .downcast_ref::>() + .expect("wanted GetObject") + { + RusotoError::Unknown(http) => assert_eq!(http.status, 404), + RusotoError::Service(GetObjectError::NoSuchKey(_)) => {} + x => panic!("wrong error: {:?}", x), + }; + } + pub(crate) fn assert_blob(&self, blob: &Blob, path: &str) { + let actual = self.0.borrow().get(path).unwrap(); + assert_blob_eq(blob, &actual); + } +} + +impl Drop for TestS3 { + fn drop(&mut self) { + // delete the bucket when the test ends + // this has to delete all the objects in the bucket first or min.io will give an error + let inner = self.0.borrow(); + let list_req = ListObjectsRequest { + bucket: inner.bucket.to_owned(), + ..Default::default() + }; + let objects = inner.client.list_objects(list_req).sync().unwrap(); + assert!(!objects.is_truncated.unwrap_or(false)); + for path in objects.contents.unwrap() { + let delete_req = DeleteObjectRequest { + bucket: inner.bucket.to_owned(), + key: path.key.unwrap(), + ..Default::default() + }; + inner.client.delete_object(delete_req).sync().unwrap(); + } + let delete_req = DeleteBucketRequest { + bucket: inner.bucket.to_owned(), + }; + inner + .client + .delete_bucket(delete_req) + .sync() + .expect("failed to delete test bucket"); + } +} diff --git a/src/test/mod.rs b/src/test/mod.rs index 0be67c5f9..2d7d3f0bd 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -1,5 +1,6 @@ mod fakes; +use crate::storage::s3::TestS3; use crate::web::Server; use failure::Error; use log::error; @@ -89,6 +90,7 @@ pub(crate) fn assert_redirect( pub(crate) struct TestEnvironment { db: OnceCell, frontend: OnceCell, + s3: OnceCell, } impl TestEnvironment { @@ -98,6 +100,7 @@ impl TestEnvironment { Self { db: OnceCell::new(), frontend: OnceCell::new(), + s3: OnceCell::new(), } } @@ -115,6 +118,10 @@ impl TestEnvironment { pub(crate) fn frontend(&self) -> &TestFrontend { self.frontend.get_or_init(|| TestFrontend::new(self.db())) } + + pub(crate) fn s3(&self) -> &TestS3 { + self.s3.get_or_init(TestS3::new) + } } pub(crate) struct TestDatabase { diff --git a/src/web/file.rs 
b/src/web/file.rs index 1cd33df91..a1b5251e9 100644 --- a/src/web/file.rs +++ b/src/web/file.rs @@ -6,7 +6,7 @@ use iron::status; use iron::{Handler, IronError, IronResult, Request, Response}; use postgres::Connection; -pub struct File(pub db::file::Blob); +pub(crate) struct File(pub(crate) db::file::Blob); impl File { /// Gets file from database