Skip to content

Commit 2be57b4

Browse files
fzaisercelinval
andauthored
Add kani::spawn and an executor to the Kani library (rust-lang#1659)
This adds an executor (scheduler for async futures) to the Kani library, thus supporting `kani::spawn` as a replacement for `tokio::spawn`. It also includes `kani::yield_now` which is similar to `tokio::yield_now`. Co-authored-by: Celina G. Val <[email protected]>
1 parent 08ed811 commit 2be57b4

File tree

6 files changed

+226
-205
lines changed

6 files changed

+226
-205
lines changed

kani-driver/src/args/common.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ pub enum UnstableFeatures {
4242
CFfi,
4343
/// Enable concrete playback flow.
4444
ConcretePlayback,
45+
/// Enable Kani's unstable async library.
46+
AsyncLib,
4547
}
4648

4749
impl ValidateArgs for CommonArgs {

library/kani/src/futures.rs

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ use std::{
1515
/// Whereas a clever executor like `block_on` in `futures` or `tokio` would interact with the OS scheduler
1616
/// to be woken up when a resource becomes available, this is not supported by Kani.
1717
/// As a consequence, this function completely ignores the waker infrastructure and just polls the given future in a busy loop.
18+
///
19+
/// Note that [`spawn`] is not supported with this function. Use [`block_on_with_spawn`] if you need it.
20+
#[crate::unstable(feature = "async-lib", issue = 2559, reason = "experimental async support")]
1821
pub fn block_on<T>(mut fut: impl Future<Output = T>) -> T {
1922
let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) };
2023
let cx = &mut Context::from_waker(&waker);
@@ -41,3 +44,189 @@ const NOOP_RAW_WAKER: RawWaker = {
4144

4245
RawWaker::new(std::ptr::null(), &RawWakerVTable::new(clone_waker, noop, noop, noop))
4346
};
47+
48+
/// The global executor used by [`spawn`] and [`block_on_with_spawn`] to run tasks.
49+
static mut GLOBAL_EXECUTOR: Option<Scheduler> = None;
50+
51+
type BoxFuture = Pin<Box<dyn Future<Output = ()> + Sync + 'static>>;
52+
53+
/// Indicates to the scheduler whether it can `kani::assume` that the returned task is running.
54+
///
55+
/// This is useful if the task was picked nondeterministically using `kani::any()`.
56+
/// For more information, see [`SchedulingStrategy`].
57+
pub enum SchedulingAssumption {
58+
CanAssumeRunning,
59+
CannotAssumeRunning,
60+
}
61+
62+
/// Trait that determines the possible sequence of tasks scheduling for a harness.
63+
///
64+
/// If your harness spawns several tasks, Kani's scheduler has to decide in what order to poll them.
65+
/// This order may depend on the needs of your verification goal.
66+
/// For example, you sometimes may wish to verify all possible schedulings, i.e. a nondeterministic scheduling strategy.
67+
///
68+
/// Nondeterministic scheduling strategies can be very slow to verify because they require Kani to check a large number of permutations of tasks.
69+
/// So if you want to verify a harness that uses `spawn`, but don't care about concurrency issues, you can simply use a deterministic scheduling strategy,
70+
/// such as [`RoundRobin`], which polls each task in turn.
71+
///
72+
/// Finally, you have the option of providing your own scheduling strategy by implementing this trait.
73+
/// This can be useful, for example, if you want to verify that things work correctly for a very specific task ordering.
74+
pub trait SchedulingStrategy {
75+
/// Picks the next task to be scheduled whenever the scheduler needs to pick a task to run next, and whether it can be assumed that the picked task is still running
76+
///
77+
/// Tasks are numbered `0..num_tasks`.
78+
/// For example, if pick_task(4) returns (2, CanAssumeRunning) than it picked the task with index 2 and allows Kani to `assume` that this task is still running.
79+
/// This is useful if the task is chosen nondeterministicall (`kani::any()`) and allows the verifier to discard useless execution branches (such as polling a completed task again).
80+
///
81+
/// As a rule of thumb:
82+
/// if the scheduling strategy picks the next task nondeterministically (using `kani::any()`), return CanAssumeRunning, otherwise CannotAssumeRunning.
83+
/// When returning `CanAssumeRunning`, the scheduler will then assume that the picked task is still running, which cuts off "useless" paths where a completed task is polled again.
84+
/// It is even necessary to make things terminate if nondeterminism is involved:
85+
/// if we pick the task nondeterministically, and don't have the restriction to still running tasks, we could poll the same task over and over again.
86+
///
87+
/// However, for most deterministic scheduling strategies, e.g. the round robin scheduling strategy, assuming that the picked task is still running is generally not possible
88+
/// because if that task has ended, we are saying assume(false) and the verification effectively stops (which is undesirable, of course).
89+
/// In such cases, return `CannotAssumeRunning` instead.
90+
fn pick_task(&mut self, num_tasks: usize) -> (usize, SchedulingAssumption);
91+
}
92+
93+
/// Keeps cycling through the tasks in a deterministic order
94+
#[derive(Default)]
95+
pub struct RoundRobin {
96+
index: usize,
97+
}
98+
99+
impl SchedulingStrategy for RoundRobin {
100+
#[inline]
101+
fn pick_task(&mut self, num_tasks: usize) -> (usize, SchedulingAssumption) {
102+
self.index = (self.index + 1) % num_tasks;
103+
(self.index, SchedulingAssumption::CannotAssumeRunning)
104+
}
105+
}
106+
107+
pub(crate) struct Scheduler {
108+
tasks: Vec<Option<BoxFuture>>,
109+
num_running: usize,
110+
}
111+
112+
impl Scheduler {
113+
/// Creates a scheduler with an empty task list
114+
#[inline]
115+
pub(crate) const fn new() -> Scheduler {
116+
Scheduler { tasks: Vec::new(), num_running: 0 }
117+
}
118+
119+
/// Adds a future to the scheduler's task list, returning a JoinHandle
120+
pub(crate) fn spawn<F: Future<Output = ()> + Sync + 'static>(&mut self, fut: F) -> JoinHandle {
121+
let index = self.tasks.len();
122+
self.tasks.push(Some(Box::pin(fut)));
123+
self.num_running += 1;
124+
JoinHandle { index }
125+
}
126+
127+
/// Runs the scheduler with the given scheduling plan until all tasks have completed
128+
fn run(&mut self, mut scheduling_plan: impl SchedulingStrategy) {
129+
let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) };
130+
let cx = &mut Context::from_waker(&waker);
131+
while self.num_running > 0 {
132+
let (index, assumption) = scheduling_plan.pick_task(self.tasks.len());
133+
let task = &mut self.tasks[index];
134+
if let Some(fut) = task.as_mut() {
135+
match fut.as_mut().poll(cx) {
136+
std::task::Poll::Ready(()) => {
137+
self.num_running -= 1;
138+
let _prev = task.take();
139+
}
140+
std::task::Poll::Pending => (),
141+
}
142+
} else if let SchedulingAssumption::CanAssumeRunning = assumption {
143+
crate::assume(false); // useful so that we can assume that a nondeterministically picked task is still running
144+
}
145+
}
146+
}
147+
148+
/// Polls the given future and the tasks it may spawn until all of them complete
149+
fn block_on<F: Future<Output = ()> + Sync + 'static>(
150+
&mut self,
151+
fut: F,
152+
scheduling_plan: impl SchedulingStrategy,
153+
) {
154+
self.spawn(fut);
155+
self.run(scheduling_plan);
156+
}
157+
}
158+
159+
/// Result of spawning a task.
160+
///
161+
/// If you `.await` a JoinHandle, this will wait for the spawned task to complete.
162+
pub struct JoinHandle {
163+
index: usize,
164+
}
165+
166+
impl Future for JoinHandle {
167+
type Output = ();
168+
169+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
170+
if unsafe { GLOBAL_EXECUTOR.as_mut().unwrap().tasks[self.index].is_some() } {
171+
std::task::Poll::Pending
172+
} else {
173+
cx.waker().wake_by_ref(); // For completeness. But Kani currently ignores wakers.
174+
std::task::Poll::Ready(())
175+
}
176+
}
177+
}
178+
179+
/// Spawns a task on the current global executor (which is set by [`block_on_with_spawn`])
180+
///
181+
/// This function can only be called inside a future passed to [`block_on_with_spawn`].
182+
#[crate::unstable(feature = "async-lib", issue = 2559, reason = "experimental async support")]
183+
pub fn spawn<F: Future<Output = ()> + Sync + 'static>(fut: F) -> JoinHandle {
184+
unsafe {
185+
GLOBAL_EXECUTOR
186+
.as_mut()
187+
.expect("`spawn` should only be called within `block_on_with_spawn`")
188+
.spawn(fut)
189+
}
190+
}
191+
192+
/// Polls the given future and the tasks it may spawn until all of them complete
193+
///
194+
/// Contrary to [`block_on`], this allows `spawn`ing other futures
195+
#[crate::unstable(feature = "async-lib", issue = 2559, reason = "experimental async support")]
196+
pub fn block_on_with_spawn<F: Future<Output = ()> + Sync + 'static>(
197+
fut: F,
198+
scheduling_plan: impl SchedulingStrategy,
199+
) {
200+
unsafe {
201+
assert!(GLOBAL_EXECUTOR.is_none(), "`block_on_with_spawn` should not be nested");
202+
GLOBAL_EXECUTOR = Some(Scheduler::new());
203+
GLOBAL_EXECUTOR.as_mut().unwrap().block_on(fut, scheduling_plan);
204+
GLOBAL_EXECUTOR = None;
205+
}
206+
}
207+
208+
/// Suspends execution of the current future, to allow the scheduler to poll another future
209+
///
210+
/// Specifically, it returns a future that isn't ready until the second time it is polled.
211+
#[crate::unstable(feature = "async-lib", issue = 2559, reason = "experimental async support")]
212+
pub fn yield_now() -> impl Future<Output = ()> {
213+
struct YieldNow {
214+
yielded: bool,
215+
}
216+
217+
impl Future for YieldNow {
218+
type Output = ();
219+
220+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
221+
if self.yielded {
222+
cx.waker().wake_by_ref(); // For completeness. But Kani currently ignores wakers.
223+
std::task::Poll::Ready(())
224+
} else {
225+
self.yielded = true;
226+
std::task::Poll::Pending
227+
}
228+
}
229+
}
230+
231+
YieldNow { yielded: false }
232+
}

library/kani/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ pub use concrete_playback::concrete_playback_run;
2626
pub fn concrete_playback_run<F: Fn()>(_: Vec<Vec<u8>>, _: F) {
2727
unreachable!("Concrete playback does not work during verification")
2828
}
29-
30-
pub use futures::block_on;
29+
pub use futures::{block_on, block_on_with_spawn, spawn, yield_now, RoundRobin};
3130

3231
/// Creates an assumption that will be valid after this statement run. Note that the assumption
3332
/// will only be applied for paths that follow the assumption. If the assumption doesn't hold, the

tests/kani/AsyncAwait/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0 OR MIT
33
//
44
// compile-flags: --edition 2018
5+
// kani-flags: -Z async-lib
56

67
// Tests that the language constructs `async { ... }` blocks, `async fn`, and `.await` work correctly.
78

0 commit comments

Comments
 (0)