Skip to content

Commit 64a5c3b

Browse files
committed
Implement a basic event loop built on LittleLock
It's not guaranteed that there will always be an event loop to run, and this implementation will serve as an incredibly basic one which does not provide any I/O, but allows the scheduler to still run. cc #9128
1 parent 3ee5ef1 commit 64a5c3b

File tree

12 files changed

+429
-68
lines changed

12 files changed

+429
-68
lines changed

src/libextra/comm.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ pub fn rendezvous<T: Send>() -> (SyncPort<T>, SyncChan<T>) {
136136
#[cfg(test)]
137137
mod test {
138138
use comm::{DuplexStream, rendezvous};
139-
use std::rt::test::run_in_newsched_task;
139+
use std::rt::test::run_in_uv_task;
140140
use std::task::spawn_unlinked;
141141

142142

@@ -165,7 +165,7 @@ mod test {
165165
#[test]
166166
fn recv_a_lot() {
167167
// Rendezvous streams should be able to handle any number of messages being sent
168-
do run_in_newsched_task {
168+
do run_in_uv_task {
169169
let (port, chan) = rendezvous();
170170
do spawn {
171171
do 1000000.times { chan.send(()) }

src/libstd/rt/basic.rs

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2+
// file at the top-level directory of this distribution and at
3+
// http://rust-lang.org/COPYRIGHT.
4+
//
5+
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8+
// option. This file may not be copied, modified, or distributed
9+
// except according to those terms.
10+
11+
//! This is a basic event loop implementation not meant for any "real purposes"
12+
//! other than testing the scheduler and proving that it's possible to have a
13+
//! pluggable event loop.
14+
15+
use prelude::*;
16+
17+
use cast;
18+
use rt::rtio::{EventLoop, IoFactory, RemoteCallback, PausibleIdleCallback};
19+
use unstable::sync::Exclusive;
20+
use util;
21+
22+
/// This is the only exported function from this module.
23+
pub fn event_loop() -> ~EventLoop {
24+
~BasicLoop::new() as ~EventLoop
25+
}
26+
27+
struct BasicLoop {
28+
work: ~[~fn()], // pending work
29+
idle: Option<*BasicPausible>, // only one is allowed
30+
remotes: ~[(uint, ~fn())],
31+
next_remote: uint,
32+
messages: Exclusive<~[Message]>
33+
}
34+
35+
enum Message { RunRemote(uint), RemoveRemote(uint) }
36+
37+
struct Time {
38+
sec: u64,
39+
nsec: u64,
40+
}
41+
42+
impl Ord for Time {
43+
fn lt(&self, other: &Time) -> bool {
44+
self.sec < other.sec || self.nsec < other.nsec
45+
}
46+
}
47+
48+
impl BasicLoop {
49+
fn new() -> BasicLoop {
50+
BasicLoop {
51+
work: ~[],
52+
idle: None,
53+
next_remote: 0,
54+
remotes: ~[],
55+
messages: Exclusive::new(~[]),
56+
}
57+
}
58+
59+
/// Process everything in the work queue (continually)
60+
fn work(&mut self) {
61+
while self.work.len() > 0 {
62+
for work in util::replace(&mut self.work, ~[]).move_iter() {
63+
work();
64+
}
65+
}
66+
}
67+
68+
fn remote_work(&mut self) {
69+
let messages = unsafe {
70+
do self.messages.with |messages| {
71+
if messages.len() > 0 {
72+
Some(util::replace(messages, ~[]))
73+
} else {
74+
None
75+
}
76+
}
77+
};
78+
let messages = match messages {
79+
Some(m) => m, None => return
80+
};
81+
for message in messages.iter() {
82+
self.message(*message);
83+
}
84+
}
85+
86+
fn message(&mut self, message: Message) {
87+
match message {
88+
RunRemote(i) => {
89+
match self.remotes.iter().find(|& &(id, _)| id == i) {
90+
Some(&(_, ref f)) => (*f)(),
91+
None => unreachable!()
92+
}
93+
}
94+
RemoveRemote(i) => {
95+
match self.remotes.iter().position(|&(id, _)| id == i) {
96+
Some(i) => { self.remotes.remove(i); }
97+
None => unreachable!()
98+
}
99+
}
100+
}
101+
}
102+
103+
/// Run the idle callback if one is registered
104+
fn idle(&mut self) {
105+
unsafe {
106+
match self.idle {
107+
Some(idle) => {
108+
if (*idle).active {
109+
(*(*idle).work.get_ref())();
110+
}
111+
}
112+
None => {}
113+
}
114+
}
115+
}
116+
117+
fn has_idle(&self) -> bool {
118+
unsafe { self.idle.is_some() && (**self.idle.get_ref()).active }
119+
}
120+
}
121+
122+
impl EventLoop for BasicLoop {
123+
fn run(&mut self) {
124+
// Not exactly efficient, but it gets the job done.
125+
while self.remotes.len() > 0 || self.work.len() > 0 || self.has_idle() {
126+
127+
self.work();
128+
self.remote_work();
129+
130+
if self.has_idle() {
131+
self.idle();
132+
continue
133+
}
134+
135+
unsafe {
136+
// We block here if we have no messages to process and we may
137+
// receive a message at a later date
138+
do self.messages.hold_and_wait |messages| {
139+
self.remotes.len() > 0 &&
140+
messages.len() == 0 &&
141+
self.work.len() == 0
142+
}
143+
}
144+
}
145+
}
146+
147+
fn callback(&mut self, f: ~fn()) {
148+
self.work.push(f);
149+
}
150+
151+
// XXX: Seems like a really weird requirement to have an event loop provide.
152+
fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
153+
let callback = ~BasicPausible::new(self);
154+
rtassert!(self.idle.is_none());
155+
unsafe {
156+
let cb_ptr: &*BasicPausible = cast::transmute(&callback);
157+
self.idle = Some(*cb_ptr);
158+
}
159+
return callback as ~PausibleIdleCallback;
160+
}
161+
162+
fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallback {
163+
let id = self.next_remote;
164+
self.next_remote += 1;
165+
self.remotes.push((id, f));
166+
~BasicRemote::new(self.messages.clone(), id) as ~RemoteCallback
167+
}
168+
169+
/// This has no bindings for local I/O
170+
fn io<'a>(&'a mut self, _: &fn(&'a mut IoFactory)) {}
171+
}
172+
173+
struct BasicRemote {
174+
queue: Exclusive<~[Message]>,
175+
id: uint,
176+
}
177+
178+
impl BasicRemote {
179+
fn new(queue: Exclusive<~[Message]>, id: uint) -> BasicRemote {
180+
BasicRemote { queue: queue, id: id }
181+
}
182+
}
183+
184+
impl RemoteCallback for BasicRemote {
185+
fn fire(&mut self) {
186+
unsafe {
187+
do self.queue.hold_and_signal |queue| {
188+
queue.push(RunRemote(self.id));
189+
}
190+
}
191+
}
192+
}
193+
194+
impl Drop for BasicRemote {
195+
fn drop(&mut self) {
196+
unsafe {
197+
do self.queue.hold_and_signal |queue| {
198+
queue.push(RemoveRemote(self.id));
199+
}
200+
}
201+
}
202+
}
203+
204+
struct BasicPausible {
205+
eloop: *mut BasicLoop,
206+
work: Option<~fn()>,
207+
active: bool,
208+
}
209+
210+
impl BasicPausible {
211+
fn new(eloop: &mut BasicLoop) -> BasicPausible {
212+
BasicPausible {
213+
active: false,
214+
work: None,
215+
eloop: eloop,
216+
}
217+
}
218+
}
219+
220+
impl PausibleIdleCallback for BasicPausible {
221+
fn start(&mut self, f: ~fn()) {
222+
rtassert!(!self.active && self.work.is_none());
223+
self.active = true;
224+
self.work = Some(f);
225+
}
226+
fn pause(&mut self) {
227+
self.active = false;
228+
}
229+
fn resume(&mut self) {
230+
self.active = true;
231+
}
232+
fn close(&mut self) {
233+
self.active = false;
234+
self.work = None;
235+
}
236+
}
237+
238+
impl Drop for BasicPausible {
239+
fn drop(&mut self) {
240+
unsafe {
241+
(*self.eloop).idle = None;
242+
}
243+
}
244+
}
245+
246+
fn time() -> Time {
247+
#[fixed_stack_segment]; #[inline(never)];
248+
extern {
249+
fn get_time(sec: &mut i64, nsec: &mut i32);
250+
}
251+
let mut sec = 0;
252+
let mut nsec = 0;
253+
unsafe { get_time(&mut sec, &mut nsec) }
254+
255+
Time { sec: sec as u64, nsec: nsec as u64 }
256+
}

src/libstd/rt/io/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,13 @@ pub fn standard_error(kind: IoErrorKind) -> IoError {
606606
detail: None
607607
}
608608
}
609+
IoUnavailable => {
610+
IoError {
611+
kind: IoUnavailable,
612+
desc: "I/O is unavailable",
613+
detail: None
614+
}
615+
}
609616
_ => fail!()
610617
}
611618
}

src/libstd/rt/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@ pub mod shouldnt_be_public {
102102
// Internal macros used by the runtime.
103103
mod macros;
104104

105+
/// Basic implementation of an EventLoop, provides no I/O interfaces
106+
mod basic;
107+
105108
/// The global (exchange) heap.
106109
pub mod global_heap;
107110

src/libstd/rt/sched.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,6 @@ pub struct Scheduler {
6262
/// no longer try to go to sleep, but exit instead.
6363
no_sleep: bool,
6464
stack_pool: StackPool,
65-
/// The event loop used to drive the scheduler and perform I/O
66-
event_loop: ~EventLoop,
6765
/// The scheduler runs on a special task. When it is not running
6866
/// it is stored here instead of the work queue.
6967
priv sched_task: Option<~Task>,
@@ -95,7 +93,7 @@ pub struct Scheduler {
9593
// destroyed before it's actually destroyed.
9694

9795
/// The event loop used to drive the scheduler and perform I/O
98-
event_loop: ~EventLoopObject,
96+
event_loop: ~EventLoop,
9997
}
10098

10199
/// An indication of how hard to work on a given operation, the difference
@@ -915,7 +913,7 @@ mod test {
915913
use cell::Cell;
916914
use rt::thread::Thread;
917915
use rt::task::{Task, Sched};
918-
use rt::rtio::EventLoop;
916+
use rt::basic;
919917
use rt::util;
920918
use option::{Some};
921919

@@ -1015,7 +1013,6 @@ mod test {
10151013
#[test]
10161014
fn test_schedule_home_states() {
10171015

1018-
use rt::uv::uvio::UvEventLoop;
10191016
use rt::sleeper_list::SleeperList;
10201017
use rt::work_queue::WorkQueue;
10211018
use rt::sched::Shutdown;
@@ -1031,7 +1028,7 @@ mod test {
10311028

10321029
// Our normal scheduler
10331030
let mut normal_sched = ~Scheduler::new(
1034-
~UvEventLoop::new() as ~EventLoop,
1031+
basic::event_loop(),
10351032
normal_queue,
10361033
queues.clone(),
10371034
sleepers.clone());
@@ -1042,7 +1039,7 @@ mod test {
10421039

10431040
// Our special scheduler
10441041
let mut special_sched = ~Scheduler::new_special(
1045-
~UvEventLoop::new() as ~EventLoop,
1042+
basic::event_loop(),
10461043
special_queue.clone(),
10471044
queues.clone(),
10481045
sleepers.clone(),
@@ -1153,7 +1150,7 @@ mod test {
11531150
// in the work queue, but we are performing I/O, that once we do put
11541151
// something in the work queue again the scheduler picks it up and doesn't
11551152
// exit before emptying the work queue
1156-
do run_in_newsched_task {
1153+
do run_in_uv_task {
11571154
do spawntask {
11581155
timer::sleep(10);
11591156
}
@@ -1195,7 +1192,6 @@ mod test {
11951192
use rt::work_queue::WorkQueue;
11961193
use rt::sleeper_list::SleeperList;
11971194
use rt::stack::StackPool;
1198-
use rt::uv::uvio::UvEventLoop;
11991195
use rt::sched::{Shutdown, TaskFromFriend};
12001196
use util;
12011197

@@ -1206,7 +1202,7 @@ mod test {
12061202
let queues = ~[queue.clone()];
12071203

12081204
let mut sched = ~Scheduler::new(
1209-
~UvEventLoop::new() as ~EventLoop,
1205+
basic::event_loop(),
12101206
queue,
12111207
queues.clone(),
12121208
sleepers.clone());

src/libstd/rt/task.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,7 @@ mod test {
637637
638638
#[test]
639639
fn rng() {
640-
do run_in_newsched_task() {
640+
do run_in_uv_task() {
641641
use rand::{rng, Rng};
642642
let mut r = rng();
643643
let _ = r.next_u32();
@@ -646,7 +646,7 @@ mod test {
646646
647647
#[test]
648648
fn logging() {
649-
do run_in_newsched_task() {
649+
do run_in_uv_task() {
650650
info!("here i am. logging in a newsched task");
651651
}
652652
}

0 commit comments

Comments
 (0)