Skip to content

Commit 6b63a1c

Browse files
committed
Auto merge of #1813 - smarnach:stream-s3-uploads, r=sgrif
Allow streaming uploads to S3. Currently the interface in the `s3` crate accepts the contents of an upload as a byte slice, which so far hasn't been much of a problem, since crate uploads are generally below 10 MB. When uploading public database dumps to S3 (#1800), however, we don't want to keep the whole dump in memory at once. This change replaces the type of the uploaded content, `&[u8]`, with a generic type implementing `std::io::Read + Send + 'static`, which are the trait bounds `reqwest` requires for a request body. The PUT request to S3 needs to include a Content-Length header. Since a `std::io::Read` does not know its own length, the upload functions take an additional parameter for the content length and pass it on to `reqwest` (via `reqwest::Body::sized`).
2 parents 123cf65 + 44c93be commit 6b63a1c

File tree

3 files changed

+34
-23
lines changed

3 files changed

+34
-23
lines changed

src/bin/render-readmes.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,12 @@ fn main() {
128128
return;
129129
}
130130
let readme = readme.unwrap();
131+
let content_length = readme.len() as u64;
132+
let content = std::io::Cursor::new(readme);
131133
let readme_path = format!("readmes/{0}/{0}-{1}.html", krate_name, version.num);
132134
config
133135
.uploader
134-
.upload(&client, &readme_path, readme.into_bytes(), "text/html")
136+
.upload(&client, &readme_path, content, content_length, "text/html")
135137
.unwrap_or_else(|_| {
136138
panic!(
137139
"[{}-{}] Couldn't upload file to S3",

src/s3/lib.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,12 @@ impl Bucket {
3838
}
3939
}
4040

41-
pub fn put(
41+
pub fn put<R: std::io::Read + Send + 'static>(
4242
&self,
4343
client: &reqwest::Client,
4444
path: &str,
45-
content: Vec<u8>,
45+
content: R,
46+
content_length: u64,
4647
content_type: &str,
4748
) -> reqwest::Result<reqwest::Response> {
4849
let path = if path.starts_with('/') {
@@ -59,7 +60,7 @@ impl Bucket {
5960
.header(header::AUTHORIZATION, auth)
6061
.header(header::CONTENT_TYPE, content_type)
6162
.header(header::DATE, date)
62-
.body(content)
63+
.body(reqwest::Body::sized(content, content_length))
6364
.send()?
6465
.error_for_status()
6566
}

src/uploaders.rs

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::util::{human, internal, CargoResult, ChainError, Maximums};
77

88
use std::env;
99
use std::fs::{self, File};
10-
use std::io::{Read, Write};
10+
use std::io::{Cursor, Read};
1111
use std::sync::Arc;
1212

1313
use crate::middleware::app::RequestApp;
@@ -83,30 +83,29 @@ impl Uploader {
8383

8484
/// Uploads a file using the configured uploader (either `S3`, `Local`).
8585
///
86-
/// It returns a a tuple containing the path of the uploaded file
87-
/// and its checksum.
88-
pub fn upload(
86+
/// It returns the path of the uploaded file.
87+
pub fn upload<R: std::io::Read + Send + 'static>(
8988
&self,
9089
client: &reqwest::Client,
9190
path: &str,
92-
body: Vec<u8>,
91+
mut content: R,
92+
content_length: u64,
9393
content_type: &str,
94-
) -> CargoResult<(Option<String>, Vec<u8>)> {
95-
let hash = hash(&body);
94+
) -> CargoResult<Option<String>> {
9695
match *self {
9796
Uploader::S3 { ref bucket, .. } => {
9897
bucket
99-
.put(client, path, body, content_type)
98+
.put(client, path, content, content_length, content_type)
10099
.map_err(|e| internal(&format_args!("failed to upload to S3: {}", e)))?;
101-
Ok((Some(String::from(path)), hash))
100+
Ok(Some(String::from(path)))
102101
}
103102
Uploader::Local => {
104103
let filename = env::current_dir().unwrap().join("local_uploads").join(path);
105104
let dir = filename.parent().unwrap();
106105
fs::create_dir_all(dir)?;
107106
let mut file = File::create(&filename)?;
108-
file.write_all(&body)?;
109-
Ok((filename.to_str().map(String::from), hash))
107+
std::io::copy(&mut content, &mut file)?;
108+
Ok(filename.to_str().map(String::from))
110109
}
111110
}
112111
}
@@ -120,13 +119,20 @@ impl Uploader {
120119
vers: &semver::Version,
121120
) -> CargoResult<Vec<u8>> {
122121
let app = Arc::clone(req.app());
123-
let (_, checksum) = {
124-
let path = Uploader::crate_path(&krate.name, &vers.to_string());
125-
let mut body = Vec::new();
126-
LimitErrorReader::new(req.body(), maximums.max_upload_size).read_to_end(&mut body)?;
127-
verify_tarball(krate, vers, &body, maximums.max_unpack_size)?;
128-
self.upload(app.http_client(), &path, body, "application/x-tar")?
129-
};
122+
let path = Uploader::crate_path(&krate.name, &vers.to_string());
123+
let mut body = Vec::new();
124+
LimitErrorReader::new(req.body(), maximums.max_upload_size).read_to_end(&mut body)?;
125+
verify_tarball(krate, vers, &body, maximums.max_unpack_size)?;
126+
let checksum = hash(&body);
127+
let content_length = body.len() as u64;
128+
let content = Cursor::new(body);
129+
self.upload(
130+
app.http_client(),
131+
&path,
132+
content,
133+
content_length,
134+
"application/x-tar",
135+
)?;
130136
Ok(checksum)
131137
}
132138

@@ -138,7 +144,9 @@ impl Uploader {
138144
readme: String,
139145
) -> CargoResult<()> {
140146
let path = Uploader::readme_path(crate_name, vers);
141-
self.upload(http_client, &path, readme.into_bytes(), "text/html")?;
147+
let content_length = readme.len() as u64;
148+
let content = Cursor::new(readme);
149+
self.upload(http_client, &path, content, content_length, "text/html")?;
142150
Ok(())
143151
}
144152
}

0 commit comments

Comments
 (0)