Skip to content

Commit 89b58a1

Browse files
KixironJoshua Nelson
authored and
Joshua Nelson
committed
Worked on async uploading and switched to futures-util
1 parent 81b98a4 commit 89b58a1

File tree

6 files changed

+101
-59
lines changed

6 files changed

+101
-59
lines changed

Cargo.lock

Lines changed: 11 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ rusoto_core = "0.43"
5050
rusoto_credential = "0.43"
5151

5252
# Async
53-
futures = "0.3"
53+
futures-util = "0.3"
5454
tokio = { version = "0.2", features = ["rt-threaded"] }
5555

5656
[target.'cfg(not(windows))'.dependencies]

src/db/delete_crate.rs

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
use crate::storage::s3::{s3_client, S3_BUCKET_NAME};
1+
use crate::storage::s3::{s3_client, S3Backend, S3_BUCKET_NAME};
22
use failure::{Error, Fail};
3-
use futures::executor;
43
use postgres::Connection;
5-
use rusoto_s3::{DeleteObjectsRequest, ListObjectsV2Request, ObjectIdentifier, S3Client, S3};
4+
use rusoto_s3::{DeleteObjectsRequest, ListObjectsV2Request, ObjectIdentifier, S3};
65

76
/// List of directories in docs.rs's underlying storage (either the database or S3) containing a
87
/// subdirectory named after the crate. Those subdirectories will be deleted.
@@ -23,8 +22,9 @@ pub fn delete_crate(conn: &Connection, name: &str) -> Result<(), Error> {
2322
};
2423

2524
delete_from_database(conn, name, crate_id)?;
26-
if let Some(s3) = s3_client() {
27-
delete_from_s3(&s3, name)?;
25+
if let Some(client) = s3_client() {
26+
let mut backend = S3Backend::new(client, S3_BUCKET_NAME);
27+
delete_from_s3(&mut backend, name)?;
2828
}
2929

3030
Ok(())
@@ -69,22 +69,26 @@ fn delete_from_database(conn: &Connection, name: &str, crate_id: i32) -> Result<
6969
Ok(())
7070
}
7171

72-
fn delete_from_s3(s3: &S3Client, name: &str) -> Result<(), Error> {
72+
fn delete_from_s3(s3: &mut S3Backend<'_>, name: &str) -> Result<(), Error> {
7373
for prefix in STORAGE_PATHS_TO_DELETE {
7474
delete_prefix_from_s3(s3, &format!("{}/{}/", prefix, name))?;
7575
}
76+
7677
Ok(())
7778
}
7879

79-
fn delete_prefix_from_s3(s3: &S3Client, name: &str) -> Result<(), Error> {
80+
fn delete_prefix_from_s3(s3: &mut S3Backend<'_>, name: &str) -> Result<(), Error> {
8081
let mut continuation_token = None;
82+
8183
loop {
82-
let list = executor::block_on(s3.list_objects_v2(ListObjectsV2Request {
83-
bucket: S3_BUCKET_NAME.into(),
84-
prefix: Some(name.into()),
85-
continuation_token,
86-
..ListObjectsV2Request::default()
87-
}))?;
84+
let list =
85+
s3.runtime_handle()
86+
.block_on(s3.client().list_objects_v2(ListObjectsV2Request {
87+
bucket: S3_BUCKET_NAME.into(),
88+
prefix: Some(name.into()),
89+
continuation_token,
90+
..ListObjectsV2Request::default()
91+
}))?;
8892

8993
let to_delete = list
9094
.contents
@@ -97,19 +101,22 @@ fn delete_prefix_from_s3(s3: &S3Client, name: &str) -> Result<(), Error> {
97101
})
98102
.collect::<Vec<_>>();
99103

100-
let resp = executor::block_on(s3.delete_objects(DeleteObjectsRequest {
101-
bucket: S3_BUCKET_NAME.into(),
102-
delete: rusoto_s3::Delete {
103-
objects: to_delete,
104-
quiet: None,
105-
},
106-
..DeleteObjectsRequest::default()
107-
}))?;
104+
let resp =
105+
s3.runtime_handle()
106+
.block_on(s3.client().delete_objects(DeleteObjectsRequest {
107+
bucket: S3_BUCKET_NAME.into(),
108+
delete: rusoto_s3::Delete {
109+
objects: to_delete,
110+
quiet: None,
111+
},
112+
..DeleteObjectsRequest::default()
113+
}))?;
108114

109115
if let Some(errs) = resp.errors {
110116
for err in &errs {
111117
log::error!("error deleting file from s3: {:?}", err);
112118
}
119+
113120
failure::bail!("uploading to s3 failed");
114121
}
115122

src/storage/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ pub fn get_file_list<P: AsRef<Path>>(path: P) -> Result<Vec<PathBuf>, Error> {
6161

6262
pub(crate) enum Storage<'a> {
6363
Database(DatabaseBackend<'a>),
64-
S3(S3Backend<'a>),
64+
S3(Box<S3Backend<'a>>),
6565
}
6666

6767
impl<'a> Storage<'a> {
@@ -79,9 +79,9 @@ impl<'a> Storage<'a> {
7979
}
8080
}
8181

82-
fn store_batch(&mut self, batch: &[Blob], trans: &Transaction) -> Result<(), Error> {
82+
fn store_batch(&mut self, batch: Vec<Blob>, trans: &Transaction) -> Result<(), Error> {
8383
match self {
84-
Self::Database(db) => db.store_batch(batch, trans),
84+
Self::Database(db) => db.store_batch(&batch, trans),
8585
Self::S3(s3) => s3.store_batch(batch),
8686
}
8787
}
@@ -141,7 +141,7 @@ impl<'a> Storage<'a> {
141141
if batch.is_empty() {
142142
break;
143143
}
144-
self.store_batch(&batch, &trans)?;
144+
self.store_batch(batch, &trans)?;
145145
}
146146

147147
trans.commit()?;
@@ -180,7 +180,7 @@ impl<'a> From<DatabaseBackend<'a>> for Storage<'a> {
180180

181181
impl<'a> From<S3Backend<'a>> for Storage<'a> {
182182
fn from(db: S3Backend<'a>) -> Self {
183-
Self::S3(db)
183+
Self::S3(Box::new(db))
184184
}
185185
}
186186

src/storage/s3.rs

Lines changed: 54 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
use super::Blob;
22
use failure::Error;
3-
use futures::future::{self, FutureExt};
3+
use futures_util::{
4+
future::{FutureExt, TryFutureExt},
5+
stream::{FuturesUnordered, StreamExt},
6+
};
47
use log::{error, warn};
5-
use rusoto_core::{region::Region, RusotoError};
8+
use rusoto_core::region::Region;
69
use rusoto_credential::DefaultCredentialsProvider;
7-
use rusoto_s3::{
8-
GetObjectRequest, PutObjectError, PutObjectOutput, PutObjectRequest, S3Client, S3,
9-
};
10+
use rusoto_s3::{GetObjectRequest, PutObjectRequest, S3Client, S3};
1011
use std::convert::TryInto;
1112
use std::io::Read;
1213
use time::Timespec;
13-
use tokio::runtime::Runtime;
14+
use tokio::runtime::{Handle, Runtime};
1415

1516
#[cfg(test)]
1617
mod test;
@@ -70,12 +71,16 @@ impl<'a> S3Backend<'a> {
7071
})
7172
}
7273

73-
pub(super) fn store_batch(&mut self, batch: &[Blob]) -> Result<(), Error> {
74+
pub(super) fn store_batch(&mut self, mut uploads: Vec<Blob>) -> Result<(), Error> {
7475
let mut attempts = 0;
7576

7677
loop {
77-
let mut futures = Vec::with_capacity(batch.len());
78-
for blob in batch {
78+
// `FuturesUnordered` is used because the order of execution doesn't
79+
// matter, we just want things to execute as fast as possible
80+
let futures = FuturesUnordered::new();
81+
82+
// Drain uploads, filling `futures` with upload requests
83+
for blob in uploads.drain(..) {
7984
futures.push(
8085
self.client
8186
.put_object(PutObjectRequest {
@@ -85,33 +90,52 @@ impl<'a> S3Backend<'a> {
8590
content_type: Some(blob.mime.clone()),
8691
..Default::default()
8792
})
88-
.inspect(|_| {
93+
// Drop the value returned by `put_object` because we don't need it,
94+
// emit an error and replace the error values with the blob that failed
95+
// to upload so that we can retry failed uploads
96+
.map(|resp| match resp {
97+
Ok(..) => Ok(()),
98+
Err(err) => {
99+
error!("failed to upload file to s3: {:?}", err);
100+
Err(blob)
101+
}
102+
})
103+
.inspect_ok(|_| {
104+
// Increment the total uploaded files when a file is uploaded
89105
crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1);
90106
}),
91107
);
92108
}
93109
attempts += 1;
94110

95-
let result: Result<Vec<PutObjectOutput>, RusotoError<PutObjectError>> = self
96-
.runtime
97-
.block_on(future::join_all(futures))
98-
.into_iter()
99-
.collect();
100-
101-
match result {
102-
// this batch was successful, start another batch if there are still more files
103-
Ok(..) => break,
104-
Err(err) => {
105-
error!("failed to upload to s3: {:?}", err);
106-
// if a futures error occurs, retry the batch
107-
if attempts > 2 {
108-
panic!("failed to upload 3 times, exiting");
109-
}
110-
}
111+
// Collect all the failed uploads so that we can retry them
112+
uploads = self.runtime.block_on(
113+
futures
114+
.filter_map(|resp| async move { resp.err() })
115+
.collect(),
116+
);
117+
118+
// If there are no further uploads we were successful and can return
119+
if uploads.is_empty() {
120+
break;
121+
122+
// If more than three attempts to upload fail, return an error
123+
} else if attempts >= 3 {
124+
error!("failed to upload to s3, abandoning");
125+
failure::bail!("Failed to upload to s3 three times, abandoning");
111126
}
112127
}
128+
113129
Ok(())
114130
}
131+
132+
pub fn runtime_handle(&self) -> Handle {
133+
self.runtime.handle().clone()
134+
}
135+
136+
pub fn client(&self) -> &S3Client {
137+
&self.client
138+
}
115139
}
116140

117141
fn parse_timespec(raw: &str) -> Result<Timespec, Error> {
@@ -125,13 +149,15 @@ pub(crate) fn s3_client() -> Option<S3Client> {
125149
if std::env::var_os("AWS_ACCESS_KEY_ID").is_none() && std::env::var_os("FORCE_S3").is_none() {
126150
return None;
127151
}
152+
128153
let creds = match DefaultCredentialsProvider::new() {
129154
Ok(creds) => creds,
130155
Err(err) => {
131156
warn!("failed to retrieve AWS credentials: {}", err);
132157
return None;
133158
}
134159
};
160+
135161
Some(S3Client::new_with(
136162
rusoto_core::request::HttpClient::new().unwrap(),
137163
creds,
@@ -149,7 +175,6 @@ pub(crate) fn s3_client() -> Option<S3Client> {
149175
pub(crate) mod tests {
150176
use super::*;
151177
use crate::test::*;
152-
use std::slice;
153178

154179
#[test]
155180
fn test_parse_timespec() {
@@ -179,7 +204,7 @@ pub(crate) mod tests {
179204

180205
// Add a test file to the database
181206
let s3 = env.s3();
182-
s3.upload(slice::from_ref(&blob)).unwrap();
207+
s3.upload(vec![blob.clone()]).unwrap();
183208

184209
// Test that the proper file was returned
185210
s3.assert_blob(&blob, "dir/foo.txt");
@@ -215,7 +240,7 @@ pub(crate) mod tests {
215240
})
216241
.collect();
217242

218-
s3.upload(&blobs).unwrap();
243+
s3.upload(blobs.clone()).unwrap();
219244
for blob in &blobs {
220245
s3.assert_blob(blob, &blob.path);
221246
}

src/storage/s3/test.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use super::*;
22
use crate::storage::test::assert_blob_eq;
3+
use rusoto_core::RusotoError;
34
use rusoto_s3::{
45
CreateBucketRequest, DeleteBucketRequest, DeleteObjectRequest, ListObjectsRequest, S3,
56
};
@@ -28,7 +29,7 @@ impl TestS3 {
2829
)))
2930
}
3031

31-
pub(crate) fn upload(&self, blobs: &[Blob]) -> Result<(), Error> {
32+
pub(crate) fn upload(&self, blobs: Vec<Blob>) -> Result<(), Error> {
3233
self.0.borrow_mut().store_batch(blobs)
3334
}
3435

0 commit comments

Comments
 (0)