Skip to content

Commit 9fa59ae

Browse files
committed
WIPWIP
1 parent 6049732 commit 9fa59ae

File tree

4 files changed

+39
-8
lines changed

4 files changed

+39
-8
lines changed

src/diskio/immediate.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,7 @@ impl Executor for ImmediateUnpacker {
2121
true
2222
}
2323

24-
fn join(&mut self) {}
24+
fn join(&mut self, marker: mut Item) -> Option<Item> {
25+
marker
26+
}
2527
}

src/diskio/mod.rs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,12 @@
5454
pub mod immediate;
5555
pub mod threaded;
5656

57+
use std::env;
5758
use std::fs::OpenOptions;
5859
use std::io::{self, Write};
5960
use std::path::{Path, PathBuf};
6061

62+
use lazy_static::lazy_static;
6163
use time::precise_time_s;
6264

6365
#[derive(Debug)]
@@ -129,9 +131,11 @@ pub trait Executor {
129131
/// always be ready for work if they have no in-progress work.
130132
fn ready(&mut self) -> bool;
131133

132-
// Wrap up any pending operations and close the transmit channel
133-
// so that rx.iter() can be used (and thus a race-free termination).
134-
fn join(&mut self);
134+
// Wrap up any pending operations and send marker back when done
135+
// (or return it imnmediately).
136+
// This permits blocking rx.iter() calls to avoid races with slow
137+
// IO.
138+
fn join(&mut self, mut marker: Item) -> Option<Item>;
135139
}
136140

137141
/// Trivial single threaded IO to be used from executors.
@@ -183,3 +187,25 @@ pub fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
183187
trace_scoped!("create_dir", "name": path_display);
184188
std::fs::create_dir(path)
185189
}
190+
191+
/// Get the executor for disk IO.
192+
pub fn get_executor() -> &'static dyn Executor {
193+
lazy_static! {
194+
static ref EXECUTOR: Box<dyn Executor> =
195+
// If this gets lots of use, consider exposing via the config file.
196+
if let Ok(thread_str) = env::var("RUSTUP_IO_THREADS") {
197+
if thread_str == "disabled" {
198+
Box::new(immediate::ImmediateUnpacker::new())
199+
} else {
200+
if let Ok(thread_count) = thread_str.parse::<usize>() {
201+
Box::new(threaded::Threaded::new_with_threads(tx, notify_handler, thread_count))
202+
} else {
203+
Box::new(threaded::Threaded::new(tx, notify_handler))
204+
}
205+
}
206+
} else {
207+
Box::new(threaded::Threaded::new(tx, notify_handler))
208+
};
209+
}
210+
&EXECUTOR
211+
}

src/diskio/threaded.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ impl<'a> Executor for Threaded<'a> {
7979
self.pool.queued_count() < 5
8080
}
8181

82-
fn join(&mut self) {
82+
83+
fn join(&mut self, marker: mut Item) {
8384
// Some explanation is in order. Even though the tar we are reading from (if
8485
// any) will have had its FileWithProgress download tracking
8586
// completed before we hit drop, that is not true if we are unwinding due to a
@@ -125,8 +126,9 @@ impl<'a> Executor for Threaded<'a> {
125126
self.notify_handler
126127
.map(|handler| handler(Notification::DownloadPopUnits));
127128
// close the feedback channel so that blocking reads on it can
128-
// complete.
129-
self.tx = None;
129+
// complete. send is atomic, and we know the threads completed from the
130+
// pool join, so this is race-free.
131+
self.tx.send(marker);
130132
}
131133
}
132134

src/dist/component/package.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,8 @@ fn unpack_without_first_dir<'a, R: Read>(
340340
filter_result(prev_item).chain_err(|| ErrorKind::ExtractingPackage)?;
341341
}
342342
}
343-
io_executor.join();
343+
let marker = Item {}
344+
io_executor.join(marker);
344345
// And drain final results
345346
for item in rx.try_iter() {
346347
// TODO capture metrics, add directories to created cache

0 commit comments

Comments
 (0)