Skip to content

Commit f9675a5

Browse files
committed
Variant of suspension based on fibers
1 parent 08a29dd commit f9675a5

File tree

6 files changed

+89
-22
lines changed

6 files changed

+89
-22
lines changed

tests/run/suspend-strawman-2.check

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
test async:
2+
33
3+
(22,11)

tests/run/suspend-strawman-2/Async.scala

+30-5
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package concurrent
22
import java.util.concurrent.atomic.AtomicBoolean
33
import scala.collection.mutable
4-
import runtime.suspend
5-
import scala.util.boundary
4+
import fiberRuntime.suspend
5+
import fiberRuntime.boundary
66

77
/** A context that allows to suspend waiting for asynchronous data sources
88
*/
@@ -53,23 +53,48 @@ object Async:
5353
checkCancellation()
5454
src.poll().getOrElse:
5555
try
56+
var result: Option[T] = None
5657
suspend[T, Unit]: k =>
5758
src.onComplete: x =>
5859
scheduler.schedule: () =>
59-
k.resume(x)
60+
result = Some(x)
61+
k.resume()
6062
true // signals to `src` that result `x` was consumed
63+
result.get
6164
finally checkCancellation()
6265

6366
end Impl
6467

68+
private class Blocking(val scheduler: Scheduler = Scheduler) extends Async:
69+
70+
def root = Cancellable.empty
71+
72+
protected def checkCancellation(): Unit = ()
73+
74+
private var hasResumed = false
75+
76+
def await[T](src: Source[T]): T = synchronized:
77+
src.poll() match
78+
case Some(x) => x
79+
case None =>
80+
var result: Option[T] = None
81+
src.onComplete: x =>
82+
synchronized:
83+
result = Some(x)
84+
notify()
85+
true
86+
while result.isEmpty do wait()
87+
result.get
88+
89+
def blocking[T](body: Async ?=> T, scheduler: Scheduler = Scheduler): T =
90+
body(using Blocking())
91+
6592
/** The currently executing Async context */
6693
inline def current(using async: Async): Async = async
6794

6895
/** Await source result in currently executing Async context */
6996
inline def await[T](src: Source[T])(using async: Async): T = async.await(src)
7097

71-
72-
7398
/** A function `T => Boolean` whose lineage is recorded by its implementing
7499
* classes. The Listener function accepts values of type `T` and returns
75100
* `true` iff the value was consumed by an async block.
+21-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,24 @@
11
import concurrent.*
2+
import fiberRuntime.boundary.setName
3+
4+
@main def Test =
5+
given Scheduler = Scheduler
6+
val x = Future:
7+
setName("x")
8+
val a = Future{ setName("xa"); 22 }
9+
val b = Future{ setName("xb"); 11 }
10+
val c = Future { setName("xc"); assert(false); 1 }
11+
c.alt(Future{ setName("alt1"); a.value + b.value }).alt(c).value
12+
val y = Future:
13+
setName("y")
14+
val a = Future{ setName("ya"); 22 }
15+
val b = Future{ setName("yb"); 11 }
16+
a.zip(b).value
17+
println("test async:")
18+
Async.blocking:
19+
println(x.value)
20+
println(y.value)
21+
//println("test choices:")
22+
//println(TestChoices)
223

3-
@main def Test = ()
424

tests/run/suspend-strawman-2/channels.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package concurrent
22
import scala.collection.mutable, mutable.ListBuffer
3-
import scala.util.boundary, boundary.Label
4-
import runtime.suspend
3+
import fiberRuntime.boundary, boundary.Label
4+
import fiberRuntime.suspend
55
import java.util.concurrent.CancellationException
66
import Async.{Listener, await}
77

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package fiberRuntime
2+
3+
/** A delimited contination, which can be invoked with `resume` */
4+
class Suspension:
5+
private var hasResumed = false
6+
def resume(): Unit = synchronized:
7+
hasResumed = true
8+
notify()
9+
def suspend(): Unit = synchronized:
10+
if !hasResumed then
11+
wait()
12+
13+
def suspend[T, R](body: Suspension => Unit): Unit =
14+
val susp = Suspension()
15+
body(susp)
16+
susp.suspend()
17+
18+
object boundary:
19+
final class Label[-T]()
20+
21+
def setName(name: String) = ()
22+
23+
def apply[T](body: Label[T] ?=> Unit): Unit =
24+
new Thread:
25+
override def run() =
26+
body(using Label[T]())
27+
.start()
28+
29+

tests/run/suspend-strawman-2/futures.scala

+4-14
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package concurrent
22

33
import scala.collection.mutable, mutable.ListBuffer
4-
import scala.util.boundary
4+
import fiberRuntime.boundary
55
import scala.compiletime.uninitialized
66
import scala.util.{Try, Success, Failure}
77
import scala.annotation.unchecked.uncheckedVariance
@@ -16,11 +16,6 @@ trait Future[+T] extends Async.OriginalSource[Try[T]], Cancellable:
1616
*/
1717
def value(using async: Async): T
1818

19-
/** Block thread until future is completed and return result
20-
* N.B. This should be parameterized with a timeout.
21-
*/
22-
def force(): T
23-
2419
/** Eventually stop computation of this future and fail with
2520
* a `Cancellation` exception. Also cancel all children.
2621
*/
@@ -81,10 +76,6 @@ object Future:
8176
def value(using async: Async): T =
8277
async.await(this).get
8378

84-
def force(): T = synchronized:
85-
while !hasCompleted do wait()
86-
result.get
87-
8879
/** Complete future with result. If future was cancelled in the meantime,
8980
* return a CancellationException failure instead.
9081
* Note: @uncheckedVariance is safe here since `complete` is called from
@@ -99,8 +90,6 @@ object Future:
9990
this.result = result
10091
hasCompleted = true
10192
for listener <- extract(waiting) do listener(result)
102-
synchronized:
103-
notifyAll()
10493

10594
end CoreFuture
10695

@@ -151,7 +140,8 @@ object Future:
151140
* If either task succeeds, succeed with the success that was returned first
152141
* and cancel the other. Otherwise, fail with the failure that was returned last.
153142
*/
154-
def alt[T2 >: T1](f2: Future[T2])(using Async.Config): Future[T2] = Future:
143+
def alt[T2 >: T1](f2: Future[T2], name: String = "alt")(using Async.Config): Future[T2] = Future:
144+
boundary.setName(name)
155145
Async.await(Async.either(f1, f2)) match
156146
case Left(Success(x1)) => f2.cancel(); x1
157147
case Right(Success(x2)) => f1.cancel(); x2
@@ -210,5 +200,5 @@ def add(x: Future[Int], xs: List[Future[Int]])(using Scheduler): Future[Int] =
210200
end add
211201

212202
def Main(x: Future[Int], xs: List[Future[Int]])(using Scheduler): Int =
213-
add(x, xs).force()
203+
Async.blocking(add(x, xs).value)
214204

0 commit comments

Comments
 (0)