diff --git a/compiler/src/dotty/tools/dotc/parsing/Parsers.scala b/compiler/src/dotty/tools/dotc/parsing/Parsers.scala index 479ae1fa9095..661bc502dbad 100644 --- a/compiler/src/dotty/tools/dotc/parsing/Parsers.scala +++ b/compiler/src/dotty/tools/dotc/parsing/Parsers.scala @@ -3080,8 +3080,8 @@ object Parsers { /* -------- PARAMETERS ------------------------------------------- */ /** DefParamClauses ::= DefParamClause { DefParamClause } -- and two DefTypeParamClause cannot be adjacent - * DefParamClause ::= DefTypeParamClause - * | DefTermParamClause + * DefParamClause ::= DefTypeParamClause + * | DefTermParamClause * | UsingParamClause */ def typeOrTermParamClauses( @@ -3179,7 +3179,7 @@ object Parsers { * UsingClsTermParamClause::= ‘(’ ‘using’ [‘erased’] (ClsParams | ContextTypes) ‘)’ * ClsParams ::= ClsParam {‘,’ ClsParam} * ClsParam ::= {Annotation} - * + * * TypelessClause ::= DefTermParamClause * | UsingParamClause * @@ -3557,13 +3557,13 @@ object Parsers { } } - + /** DefDef ::= DefSig [‘:’ Type] ‘=’ Expr * | this TypelessClauses [DefImplicitClause] `=' ConstrExpr * DefDcl ::= DefSig `:' Type * DefSig ::= id [DefTypeParamClause] DefTermParamClauses - * + * * if clauseInterleaving is enabled: * DefSig ::= id [DefParamClauses] [DefImplicitClause] */ @@ -3602,8 +3602,8 @@ object Parsers { val mods1 = addFlag(mods, Method) val ident = termIdent() var name = ident.name.asTermName - val paramss = - if in.featureEnabled(Feature.clauseInterleaving) then + val paramss = + if in.featureEnabled(Feature.clauseInterleaving) then // If you are making interleaving stable manually, please refer to the PR introducing it instead, section "How to make non-experimental" typeOrTermParamClauses(ParamOwner.Def, numLeadParams = numLeadParams) else @@ -3613,7 +3613,7 @@ object Parsers { joinParams(tparams, vparamss) var tpt = fromWithinReturnType { typedOpt() } - + if (migrateTo3) newLineOptWhenFollowedBy(LBRACE) val rhs = if in.token == EQUALS then diff --git a/library/src/scala/util/boundary.scala b/library/src/scala/util/boundary.scala index 3c6c6982c7ee..26972a2d4b76 100644 --- a/library/src/scala/util/boundary.scala +++ b/library/src/scala/util/boundary.scala @@ -33,6 +33,11 @@ object boundary: /*message*/ null, /*cause*/ null, /*enableSuppression=*/ false, /*writableStackTrace*/ false) /** Labels are targets indicating which boundary will be exited by a `break`. + * A Label is generated and passed to code wrapped in a `boundary` call. Example + * + * boundary[Unit]: + * ... + * summon[Label[Unit]] // will link to the label generated by `boundary` */ final class Label[-T] diff --git a/tests/pos/suspend-strawman-1/choices.scala b/tests/pos/suspend-strawman-1/choices.scala new file mode 100644 index 000000000000..b969a1367116 --- /dev/null +++ b/tests/pos/suspend-strawman-1/choices.scala @@ -0,0 +1,84 @@ +import collection.mutable.ListBuffer +import scala.util.boundary, boundary.{Label, break} +import runtime.* + +/** A single method iterator */ +abstract class Choices[+T]: + def next: Option[T] + +object Choices: + def apply[T](elems: T*): Choices[T] = + apply(elems.iterator) + + def apply[T](it: Iterator[T]) = new Choices[T]: + def next = if it.hasNext then Some(it.next) else None +end Choices + +/** A Label representikng a boundary to which can be returned + * - None, indicating an mepty choice + * - a suspension with Option[T] result, which will be + * iterated over element by element in the boundary's result. + */ +type CanChoose[T] = Label[None.type | Suspension[Option[T]]] + +/** A variable representing Choices */ +class Ref[T](choices: => Choices[T]): + + /** Try all values of this variable. For each value, run the continuation. + * If it yields a Some result take that as the next value of the enclosing + * boundary. If it yields a Non, skip the element. + */ + def each[R](using CanChoose[R]): T = + val cs = choices + suspend() + cs.next.getOrElse(break(None)) + +end Ref + +/** A prompt to iterate a compputation `body` over all choices of + * variables which it references. + * @return all results in a new `Choices` iterator, + */ +def choices[T](body: CanChoose[T] ?=> T): Choices[T] = new Choices: + var stack: List[Suspension[Option[T]]] = Nil + + def next: Option[T] = + boundary(Some(body)) match + case s: Some[T] => s + case None => + if stack.isEmpty then + // no (more) variable choices encountered + None + else + // last variable's choices exhausted; use next value of previous variable + stack = stack.tail + stack.head.resume() + case susp: Suspension[Option[T]] => + // A new Choices variable was encountered + stack = susp :: stack + stack.head.resume() + +@main def test: Choices[Int] = + val x = Ref(Choices(1, -2, -3)) + val y = Ref(Choices("ab", "cde")) + choices: + val xx = x.each + xx + ( + if xx > 0 then y.each.length * x.each + else y.each.length + ) + /* Gives the results of the following computations: + 1 + 2 * 1 + 1 + 2 * -2 + 1 + 2 * -3 + 1 + 3 * 1 + 1 + 3 * -2 + 1 + 3 * -3 + -2 + 2 + -2 + 3 + -3 + 2 + -3 + 3 + */ + + + diff --git a/tests/pos/suspend-strawman-1/futures.scala b/tests/pos/suspend-strawman-1/futures.scala new file mode 100644 index 000000000000..d9c1b6870e89 --- /dev/null +++ b/tests/pos/suspend-strawman-1/futures.scala @@ -0,0 +1,44 @@ +import collection.mutable.ListBuffer +import scala.util.boundary, boundary.{Label, break} +import runtime.* + +/** A suspension and a value indicating the Future for which the suspension is waiting */ +type Waiting = (Suspension[Unit], Future[?]) + +/** The capability to suspend while waiting for some other future */ +type CanWait = Label[Waiting] + +class Future[+T] private (): + private var result: Option[T] = None + private var waiting: ListBuffer[Runnable] = ListBuffer() + + /** Return future's result, while waiting until it is completed if necessary. + */ + def await(using CanWait): T = result match + case Some(x) => x + case None => + suspend((s: Suspension[Unit]) => (s, this)) + await + +object Future: + + private def complete[T](f: Future[T], value: T): Unit = + f.result = Some(value) + f.waiting.foreach(Scheduler.schedule) + f.waiting = ListBuffer() + + /** Create future that eventually returns the result of `op` */ + def apply[T](body: CanWait ?=> T): Future[T] = + val f = new Future[T] + Scheduler.schedule: () => + boundary[Unit | Waiting]: + complete(f, body) + match + case (suspension, blocking) => + blocking.waiting += (() => suspension.resume()) + case () => + f + +def Test(x: Future[Int], xs: List[Future[Int]]) = + Future: + x.await + xs.map(_.await).sum diff --git a/tests/pos/suspend-strawman-1/runtime.scala b/tests/pos/suspend-strawman-1/runtime.scala new file mode 100644 index 000000000000..e021f317de25 --- /dev/null +++ b/tests/pos/suspend-strawman-1/runtime.scala @@ -0,0 +1,21 @@ +import scala.util.boundary, boundary.Label +object runtime: + + /** A hypthetical task scheduler */ + object Scheduler: + def schedule(task: Runnable): Unit = ??? + + /** Contains a delimited contination, which can be invoked with `resume` */ + class Suspension[+R]: + def resume(): R = ??? + + /** Returns `fn(s)` where `s` is the current suspension to the boundary associated + * with the given label. + */ + def suspend[R, T](fn: Suspension[R] => T)(using Label[T]): T = ??? + + /** Returns the current suspension to the boundary associated + * with the given label. + */ + def suspend[R]()(using Label[Suspension[R]]): Unit = + suspend[R, Suspension[R]](identity) \ No newline at end of file diff --git a/tests/run/suspend-strawman-2.check b/tests/run/suspend-strawman-2.check new file mode 100644 index 000000000000..4c66392cd76a --- /dev/null +++ b/tests/run/suspend-strawman-2.check @@ -0,0 +1,3 @@ +test async: +33 +(22,11) diff --git a/tests/run/suspend-strawman-2/Async.scala b/tests/run/suspend-strawman-2/Async.scala new file mode 100644 index 000000000000..b68c4ad62e17 --- /dev/null +++ b/tests/run/suspend-strawman-2/Async.scala @@ -0,0 +1,209 @@ +package concurrent +import java.util.concurrent.atomic.AtomicBoolean +import scala.collection.mutable +import scala.concurrent.ExecutionContext + +/** A context that allows to suspend waiting for asynchronous data sources + */ +trait Async: + + /** Wait for completion of async source `src` and return the result */ + def await[T](src: Async.Source[T]): T + + /** The configuration of this Async */ + def config: Async.Config + + /** An Async of the same kind as this one, with a new configuration as given */ + def withConfig(config: Async.Config): Async + +object Async: + + /** The underlying configuration of an async block */ + case class Config(scheduler: ExecutionContext, group: CancellationGroup) + + trait LowPrioConfig: + + /** A toplevel async group with given scheduler and a synthetic root that + * ignores cancellation requests + */ + given fromExecutionContext(using scheduler: ExecutionContext): Config = + Config(scheduler, CancellationGroup.Unlinked) + + end LowPrioConfig + + object Config extends LowPrioConfig: + + /** The async configuration stored in the given async capabaility */ + given fromAsync(using async: Async): Config = async.config + + end Config + + /** An implementation of Async that blocks the running thread when waiting */ + private class Blocking(using val config: Config) extends Async: + + def await[T](src: Source[T]): T = + src.poll().getOrElse: + var result: Option[T] = None + src.onComplete: x => + synchronized: + result = Some(x) + notify() + true + synchronized: + while result.isEmpty do wait() + result.get + + def withConfig(config: Config) = Blocking(using config) + end Blocking + + /** Execute asynchronous computation `body` on currently running thread. + * The thread will suspend when the computation waits. + */ + def blocking[T](body: Async ?=> T)(using ExecutionContext): T = + body(using Blocking()) + + /** The currently executing Async context */ + inline def current(using async: Async): Async = async + + /** Await source result in currently executing Async context */ + inline def await[T](src: Source[T])(using async: Async): T = async.await(src) + + def group[T](body: Async ?=> T)(using async: Async): T = + val newGroup = CancellationGroup().link() + try body(using async.withConfig(async.config.copy(group = newGroup))) + finally newGroup.cancel() + + /** A function `T => Boolean` whose lineage is recorded by its implementing + * classes. The Listener function accepts values of type `T` and returns + * `true` iff the value was consumed by an async block. + */ + trait Listener[-T] extends (T => Boolean) + + /** A listener for values that are processed by the given source `src` and + * that are demanded by the continuation listener `continue`. + */ + abstract case class ForwardingListener[T](src: Source[?], continue: Listener[?]) extends Listener[T] + + /** An asynchronous data source. Sources can be persistent or ephemeral. + * A persistent source will always pass same data to calls of `poll and `onComplete`. + * An ephememral source can pass new data in every call. + * An example of a persistent source is `Future`. + * An example of an ephemeral source is `Channel`. + */ + trait Source[+T]: + + /** If data is available at present, pass it to function `k` + * and return the result of this call. Otherwise return false. + * `k` returns true iff the data was consumed in an async block. + * Calls to `poll` are always synchronous. + */ + def poll(k: Listener[T]): Boolean + + /** Once data is available, pass it to function `k`. + * `k` returns true iff the data was consumed in an async block. + * Calls to `onComplete` are usually asynchronous, meaning that + * the passed continuation `k` is a suspension. + */ + def onComplete(k: Listener[T]): Unit + + /** Signal that listener `k` is dead (i.e. will always return `false` from now on). + * This permits original, (i.e. non-derived) sources like futures or channels + * to drop the listener from their waiting sets. + */ + def dropListener(k: Listener[T]): Unit + + /** Utililty method for direct polling. */ + def poll(): Option[T] = + var resultOpt: Option[T] = None + poll { x => resultOpt = Some(x); true } + resultOpt + + end Source + + /** An original source has a standard definition of `onCopmplete` in terms + * of `poll` and `addListener`. + */ + abstract class OriginalSource[+T] extends Source[T]: + + /** Add `k` to the listener set of this source */ + protected def addListener(k: Listener[T]): Unit + + def onComplete(k: Listener[T]): Unit = + if !poll(k) then addListener(k) + + end OriginalSource + + /** A source that transforms an original source in some way */ + abstract class DerivedSource[T, U](val original: Source[T]) extends Source[U]: + + /** Handle a value `x` passed to the original source by possibly + * invokiong the continuation for this source. + */ + protected def listen(x: T, k: Listener[U]): Boolean + + private def transform(k: Listener[U]): Listener[T] = + new ForwardingListener[T](this, k): + def apply(x: T): Boolean = listen(x, k) + + def poll(k: Listener[U]): Boolean = + original.poll(transform(k)) + def onComplete(k: Listener[U]): Unit = + original.onComplete(transform(k)) + def dropListener(k: Listener[U]): Unit = + original.dropListener(transform(k)) + + end DerivedSource + + extension [T](src: Source[T]) + + /** Pass on data transformed by `f` */ + def map[U](f: T => U): Source[U] = + new DerivedSource[T, U](src): + def listen(x: T, k: Listener[U]) = k(f(x)) + + /** Pass on only data matching the predicate `p` */ + def filter(p: T => Boolean): Source[T] = + new DerivedSource[T, T](src): + def listen(x: T, k: Listener[T]) = p(x) && k(x) + + /** Pass first result from any of `sources` to the continuation */ + def race[T](sources: Source[T]*): Source[T] = + new Source[T]: + + def poll(k: Listener[T]): Boolean = + val it = sources.iterator + var found = false + while it.hasNext && !found do + it.next.poll: x => + found = k(x) + found + found + + def onComplete(k: Listener[T]): Unit = + val listener = new ForwardingListener[T](this, k): + var foundBefore = false + def continueIfFirst(x: T): Boolean = synchronized: + if foundBefore then false else { foundBefore = k(x); foundBefore } + def apply(x: T): Boolean = + val found = continueIfFirst(x) + if found then sources.foreach(_.dropListener(this)) + found + sources.foreach(_.onComplete(listener)) + + def dropListener(k: Listener[T]): Unit = + val listener = new ForwardingListener[T](this, k): + def apply(x: T): Boolean = ??? + // not to be called, we need the listener only for its + // hashcode and equality test. + sources.foreach(_.dropListener(listener)) + + end race + + /** If left (respectively, right) source succeeds with `x`, pass `Left(x)`, + * (respectively, Right(x)) on to the continuation. + */ + def either[T1, T2](src1: Source[T1], src2: Source[T2]): Source[Either[T1, T2]] = + race(src1.map(Left(_)), src2.map(Right(_))) + +end Async + diff --git a/tests/run/suspend-strawman-2/Cancellable.scala b/tests/run/suspend-strawman-2/Cancellable.scala new file mode 100644 index 000000000000..91e86abe89ed --- /dev/null +++ b/tests/run/suspend-strawman-2/Cancellable.scala @@ -0,0 +1,62 @@ +package concurrent +import scala.collection.mutable + +/** A trait for cancellable entities that can be grouped */ +trait Cancellable: + + private var group: CancellationGroup = CancellationGroup.Unlinked + + /** Issue a cancel request */ + def cancel(): Unit + + /** Add this cancellable to the given group after removing + * it from the previous group in which it was. + */ + def link(group: CancellationGroup): this.type = + this.group.drop(this) + this.group = group + this.group.add(this) + this + + /** Link this cancellable to the cancellable group of the + * current async context. + */ + def link()(using async: Async): this.type = + link(async.config.group) + + /** Unlink this cancellable from its group. */ + def unlink(): this.type = + link(CancellationGroup.Unlinked) + +end Cancellable + +class CancellationGroup extends Cancellable: + private var members: mutable.Set[Cancellable] = mutable.Set() + + /** Cancel all members and clear the members set */ + def cancel() = + synchronized(members.toArray).foreach(_.cancel()) + members.clear() + + /** Add given member to the members set */ + def add(member: Cancellable): Unit = synchronized: + members += member + + /** Remove given member from the members set if it is an element */ + def drop(member: Cancellable): Unit = synchronized: + members -= member + +object CancellationGroup: + + /** A sentinal group of cancellables that are in fact not linked + * to any real group. `cancel`, `add`, and `drop` do nothing when + * called on this group. + */ + object Unlinked extends CancellationGroup: + override def cancel() = () + override def add(member: Cancellable): Unit = () + override def drop(member: Cancellable): Unit = () + end Unlinked + +end CancellationGroup + diff --git a/tests/run/suspend-strawman-2/Test.scala b/tests/run/suspend-strawman-2/Test.scala new file mode 100644 index 000000000000..42588295bc0b --- /dev/null +++ b/tests/run/suspend-strawman-2/Test.scala @@ -0,0 +1,33 @@ +// scalajs: --skip + +import concurrent.* +import fiberRuntime.boundary.setName +import scala.concurrent.ExecutionContext + +@main def Test = + given ExecutionContext = ExecutionContext.global + val x = Future: + setName("x") + val a = Future{ setName("xa"); 22 } + val b = Future{ setName("xb"); 11 } + val c = Future { setName("xc"); assert(false); 1 } + c.alt(Future{ setName("alt1"); a.value + b.value }).alt(c).value + val y = Future: + setName("y") + val a = Future{ setName("ya"); 22 } + val b = Future{ setName("yb"); 11 } + a.zip(b).value + val z = Future: + val a = Future{ setName("za"); 22 } + val b = Future{ setName("zb"); true } + a.alt(b).value + val _: Future[Int | Boolean] = z + println("test async:") + Async.blocking: + println(x.value) + println(y.value) + //println("test choices:") + //println(TestChoices) + + + diff --git a/tests/run/suspend-strawman-2/channels.scala b/tests/run/suspend-strawman-2/channels.scala new file mode 100644 index 000000000000..71c9bd5e3ce4 --- /dev/null +++ b/tests/run/suspend-strawman-2/channels.scala @@ -0,0 +1,178 @@ +package concurrent +import scala.collection.mutable, mutable.ListBuffer +import fiberRuntime.boundary, boundary.Label +import fiberRuntime.suspend +import scala.concurrent.ExecutionContext +import scala.util.{Try, Failure} +import Async.{Listener, await} + +/** A common interface for channels */ +trait Channel[T]: + def read()(using Async): T + def send(x: T)(using Async): Unit + protected def shutDown(finalValue: T): Unit + +object Channel: + + extension [T](c: Channel[Try[T]]) + def close(): Unit = + c.shutDown(Failure(ChannelClosedException())) + +class ChannelClosedException extends Exception + +/** An unbounded asynchronous channel. Senders do not wait for matching + * readers. + */ +class AsyncChannel[T] extends Async.OriginalSource[T], Channel[T]: + + private val pending = ListBuffer[T]() + private val waiting = mutable.Set[Listener[T]]() + private var isClosed = false + + private def ensureOpen() = + if isClosed then throw ChannelClosedException() + + private def drainWaiting(x: T): Boolean = + waiting.iterator.find(_(x)) match + case Some(k) => waiting -= k; true + case None => false + + private def drainPending(k: Listener[T]): Boolean = + val sent = pending.nonEmpty && k(pending.head) + if sent then + while + pending.dropInPlace(1) + pending.nonEmpty && drainWaiting(pending.head) + do () + sent + + def read()(using Async): T = synchronized: + await(this) + + def send(x: T)(using Async): Unit = synchronized: + ensureOpen() + val sent = pending.isEmpty && drainWaiting(x) + if !sent then pending += x + + def poll(k: Listener[T]): Boolean = synchronized: + ensureOpen() + drainPending(k) + + def addListener(k: Listener[T]): Unit = synchronized: + waiting += k + + def dropListener(k: Listener[T]): Unit = synchronized: + waiting -= k + + protected def shutDown(finalValue: T) = + isClosed = true + waiting.foreach(_(finalValue)) + +end AsyncChannel + +/** An unbuffered, synchronous channel. Senders and readers both block + * until a communication between them happens. The channel provides two + * async sources, one for reading and one for sending. If a send operation + * encounters some waiting readers, or a read operation encounters some + * waiting sender the data is transmitted directly. Otherwise we add + * the operation to the corresponding pending set. + */ +trait SyncChannel[T] extends Channel[T]: + + val canRead: Async.Source[T] + val canSend: Async.Source[Listener[T]] + + def send(x: T)(using Async): Unit = await(canSend)(x) + + def read()(using Async): T = await(canRead) + +object SyncChannel: + + def apply[T](): SyncChannel[T] = new SyncChannel[T]: + + private val pendingReads = mutable.Set[Listener[T]]() + private val pendingSends = mutable.Set[Listener[Listener[T]]]() + private var isClosed = false + + private def ensureOpen() = + if isClosed then throw ChannelClosedException() + + private def link[T](pending: mutable.Set[T], op: T => Boolean): Boolean = + ensureOpen() + // Since sources are filterable, we have to match all pending readers or writers + // against the incoming request + pending.iterator.find(op) match + case Some(elem) => pending -= elem; true + case None => false + + private def collapse[T](k2: Listener[Listener[T]]): Option[T] = + var r: Option[T] = None + if k2 { x => r = Some(x); true } then r else None + + val canRead = new Async.OriginalSource[T]: + def poll(k: Listener[T]): Boolean = + link(pendingSends, sender => collapse(sender).map(k) == Some(true)) + def addListener(k: Listener[T]) = synchronized: + pendingReads += k + def dropListener(k: Listener[T]): Unit = synchronized: + pendingReads -= k + + val canSend = new Async.OriginalSource[Listener[T]]: + def poll(k: Listener[Listener[T]]): Boolean = + link(pendingReads, k(_)) + def addListener(k: Listener[Listener[T]]) = synchronized: + pendingSends += k + def dropListener(k: Listener[Listener[T]]): Unit = synchronized: + pendingSends -= k + + protected def shutDown(finalValue: T) = + isClosed = true + pendingReads.foreach(_(finalValue)) + +end SyncChannel + +def TestChannel(using ExecutionContext) = + val c = SyncChannel[Option[Int]]() + Future: + for i <- 0 to 100 do + c.send(Some(i)) + c.send(None) + Future: + var sum = 0 + def loop(): Unit = + c.read() match + case Some(x) => sum += x; loop() + case None => println(sum) + loop() + val chan = SyncChannel[Int]() + val allTasks = List( + Task: + println("task1") + chan.read(), + Task: + println("task2") + chan.read() + ) + + def start() = Future: + allTasks.map(_.run.value).sum + +def TestRace = + val c1, c2 = SyncChannel[Int]() + val s = c1.canSend + val c3 = Async.race(c1.canRead, c2.canRead) + val c4 = c3.filter(_ >= 0) + val d0 = SyncChannel[Int]() + val d1 = Async.race(c1.canRead, c2.canRead, d0.canRead) + val d2 = d1.map(_ + 1) + val c5 = Async.either(c1.canRead, c2.canRead) + .map: + case Left(x) => -x + case Right(x) => x + .filter(_ >= 0) + + val d5 = Async.either(c1.canRead, d2) + .map: + case Left(x) => -x + case Right(x) => x + diff --git a/tests/run/suspend-strawman-2/choices.scala b/tests/run/suspend-strawman-2/choices.scala new file mode 100644 index 000000000000..968c223d9c0b --- /dev/null +++ b/tests/run/suspend-strawman-2/choices.scala @@ -0,0 +1,28 @@ +import scala.util.boundary, boundary.Label +import runtime.suspend + +trait Choice: + def choose[A](choices: A*): A + +// the handler +def choices[T](body: Choice ?=> T): Seq[T] = + boundary[Seq[T]]: + given Choice with + def choose[A](choices: A*): A = + suspend[A, Seq[T]](s => choices.flatMap(s.resume)) + Seq(body) + +def choose[A](choices: A*)(using c: Choice): A = c.choose(choices*) + +def TestChoices: Seq[Int] = + choices: + def x = choose(1, -2, -3) + def y = choose("ab", "cde") + val xx = x; + xx + ( + if xx > 0 then + val z = choose(xx / 2, xx * 2) + y.length * z + else y.length + ) + diff --git a/tests/run/suspend-strawman-2/fiberRuntime.scala b/tests/run/suspend-strawman-2/fiberRuntime.scala new file mode 100644 index 000000000000..dbbc6fa66e68 --- /dev/null +++ b/tests/run/suspend-strawman-2/fiberRuntime.scala @@ -0,0 +1,52 @@ +package fiberRuntime + +object util: + inline val logging = false + inline def log(inline msg: String) = + if logging then println(msg) + + private val rand = new java.util.Random + + def sleepABit() = + Thread.sleep(rand.nextInt(100)) + + val threadName = new ThreadLocal[String] +end util +import util.* + +/** A delimited contination, which can be invoked with `resume` */ +class Suspension: + private var hasResumed = false + def resume(): Unit = synchronized: + hasResumed = true + notify() + def suspend(): Unit = synchronized: + if !hasResumed then + log(s"suspended ${threadName.get()}") + wait() + +def suspend[T, R](body: Suspension => Unit): Unit = + sleepABit() + log(s"suspending ${threadName.get()}") + val susp = Suspension() + body(susp) + sleepABit() + susp.suspend() + +object boundary: + final class Label[-T]() + + def setName(name: String) = + log(s"started $name, ${Thread.currentThread.getId()}") + sleepABit() + threadName.set(name) + + def apply[T](body: Label[T] ?=> Unit): Unit = + new Thread: + override def run() = + sleepABit() + try body(using Label[T]()) + finally log(s"finished ${threadName.get()} ${Thread.currentThread.getId()}") + .start() + + diff --git a/tests/run/suspend-strawman-2/futures.scala b/tests/run/suspend-strawman-2/futures.scala new file mode 100644 index 000000000000..f6dca15a9292 --- /dev/null +++ b/tests/run/suspend-strawman-2/futures.scala @@ -0,0 +1,242 @@ +package concurrent + +import scala.collection.mutable, mutable.ListBuffer +import fiberRuntime.util.* +import fiberRuntime.boundary +import scala.compiletime.uninitialized +import scala.util.{Try, Success, Failure} +import scala.annotation.unchecked.uncheckedVariance +import java.util.concurrent.CancellationException +import scala.concurrent.ExecutionContext + +/** A cancellable future that can suspend waiting for other asynchronous sources + */ +trait Future[+T] extends Async.OriginalSource[Try[T]], Cancellable: + + /** Wait for this future to be completed and return its result */ + def result(using async: Async): Try[T] + + /** Wait for this future to be completed, return its value in case of success, + * or rethrow exception in case of failure. + */ + def value(using async: Async): T = result.get + + /** Eventually stop computation of this future and fail with + * a `Cancellation` exception. + */ + def cancel(): Unit + +object Future: + + /** A future that is completed explicitly by calling its + * `complete` method. There are two public implementations + * + * - RunnableFuture: Completion is done by running a block of code + * - Promise.future: Completion is done by external request. + */ + private class CoreFuture[+T] extends Future[T]: + + @volatile protected var hasCompleted: Boolean = false + protected var cancelRequest = false + private var result: Try[T] = uninitialized // guaranteed to be set if hasCompleted = true + private val waiting: mutable.Set[Try[T] => Boolean] = mutable.Set() + + // Async.Source method implementations + + def poll(k: Async.Listener[Try[T]]): Boolean = + hasCompleted && k(result) + + def addListener(k: Async.Listener[Try[T]]): Unit = synchronized: + waiting += k + + def dropListener(k: Async.Listener[Try[T]]): Unit = synchronized: + waiting -= k + + // Cancellable method implementations + + def cancel(): Unit = + cancelRequest = true + + // Future method implementations + + def result(using async: Async): Try[T] = async.await(this) + + /** Complete future with result. If future was cancelled in the meantime, + * return a CancellationException failure instead. + * Note: @uncheckedVariance is safe here since `complete` is called from + * only two places: + * - from the initializer of RunnableFuture, where we are sure that `T` + * is exactly the type with which the future was created, and + * - from Promise.complete, where we are sure the type `T` is exactly + * the type with which the future was created since `Promise` is invariant. + */ + private[Future] def complete(result: Try[T] @uncheckedVariance): Unit = + val toNotify = synchronized: + if hasCompleted then Nil + else + this.result = result + hasCompleted = true + val ws = waiting.toList + waiting.clear() + ws + for listener <- toNotify do listener(result) + + end CoreFuture + + /** A future that is completed by evaluating `body` as a separate + * asynchronous operation in the given `scheduler` + */ + private class RunnableFuture[+T](body: Async ?=> T)(using ac: Async.Config) + extends CoreFuture[T]: + + def checkCancellation() = + if cancelRequest then throw CancellationException() + + /** a handler for Async */ + private def async(body: Async ?=> Unit): Unit = + class FutureAsync(using val config: Async.Config) extends Async: + + /** Await a source first by polling it, and, if that fails, by suspending + * in a onComplete call. + */ + def await[T](src: Async.Source[T]): T = + checkCancellation() + src.poll().getOrElse: + try + src.poll().getOrElse: + sleepABit() + log(s"suspending ${threadName.get()}") + var result: Option[T] = None + src.onComplete: x => + synchronized: + result = Some(x) + notify() + true + sleepABit() + synchronized: + log(s"suspended ${threadName.get()}") + while result.isEmpty do wait() + result.get + /* With full continuations, the try block can be written more simply as follows: + + suspend[T, Unit]: k => + src.onComplete: x => + scheduler.schedule: () => + k.resume(x) + true + */ + finally checkCancellation() + + def withConfig(config: Async.Config) = FutureAsync(using config) + + sleepABit() + try body(using FutureAsync()) + finally log(s"finished ${threadName.get()} ${Thread.currentThread.getId()}") + /** With continuations, this becomes: + + boundary [Unit]: + body(using FutureAsync()) + */ + end async + + ac.scheduler.execute: () => + async: + link() + Async.group: + complete(Try(body)) + unlink() + + end RunnableFuture + + /** Create a future that asynchronously executes `body` that defines + * its result value in a Try or returns failure if an exception was thrown. + * If the future is created in an Async context, it is added to the + * children of that context's root. + */ + def apply[T](body: Async ?=> T)(using ac: Async.Config): Future[T] = + RunnableFuture(body) + + /** A future that immediately terminates with the given result */ + def now[T](result: Try[T]): Future[T] = + val f = CoreFuture[T]() + f.complete(result) + f + + extension [T](f1: Future[T]) + + /** Parallel composition of two futures. + * If both futures succeed, succeed with their values in a pair. Otherwise, + * fail with the failure that was returned first. + */ + def zip[U](f2: Future[U])(using Async.Config): Future[(T, U)] = Future: + Async.await(Async.either(f1, f2)) match + case Left(Success(x1)) => (x1, f2.value) + case Right(Success(x2)) => (f1.value, x2) + case Left(Failure(ex)) => throw ex + case Right(Failure(ex)) => throw ex + + /** Alternative parallel composition of this task with `other` task. + * If either task succeeds, succeed with the success that was returned first. + * Otherwise, fail with the failure that was returned last. + */ + def alt(f2: Future[T])(using Async.Config): Future[T] = Future: + Async.await(Async.either(f1, f2)) match + case Left(Success(x1)) => x1 + case Right(Success(x2)) => x2 + case Left(_: Failure[?]) => f2.value + case Right(_: Failure[?]) => f1.value + + end extension + + // TODO: efficient n-ary versions of the last two operations + + /** A promise defines a future that is be completed via the + * promise's `complete` method. + */ + class Promise[T]: + private val myFuture = CoreFuture[T]() + + /** The future defined by this promise */ + val future: Future[T] = myFuture + + /** Define the result value of `future`. However, if `future` was + * cancelled in the meantime complete with a `CancellationException` + * failure instead. + */ + def complete(result: Try[T]): Unit = myFuture.complete(result) + + end Promise +end Future + +/** A task is a template that can be turned into a runnable future + * Composing tasks can be referentially transparent. + */ +class Task[+T](val body: Async ?=> T): + + /** Start a future computed from the `body` of this task */ + def run(using Async.Config) = Future(body) + +end Task + +def add(x: Future[Int], xs: List[Future[Int]])(using ExecutionContext): Future[Int] = + val b = x.zip: + Future: + xs.headOption.toString + + val _: Future[(Int, String)] = b + + val c = x.alt: + Future: + b.value._1 + val _: Future[Int] = c + + Future: + val f1 = Future: + x.value * 2 + x.value + xs.map(_.value).sum + +end add + +def Main(x: Future[Int], xs: List[Future[Int]])(using ExecutionContext): Int = + Async.blocking(add(x, xs).value) + diff --git a/tests/run/suspend-strawman-2/monadic-reflect.scala b/tests/run/suspend-strawman-2/monadic-reflect.scala new file mode 100644 index 000000000000..394e6ced71c5 --- /dev/null +++ b/tests/run/suspend-strawman-2/monadic-reflect.scala @@ -0,0 +1,56 @@ +import scala.util.boundary +import runtime.suspend + +trait Monad[F[_]]: + + /** The unit value for a monad */ + def pure[A](x: A): F[A] + + extension [A](x: F[A]) + /** The fundamental composition operation */ + def flatMap[B](f: A => F[B]): F[B] + + /** The `map` operation can now be defined in terms of `flatMap` */ + def map[B](f: A => B) = x.flatMap(f.andThen(pure)) + +end Monad + +trait CanReflect[M[_]]: + def reflect[R](mr: M[R]): R + +trait Monadic[M[_]: Monad]: + + /** + * Embedding of pure values into the monad M + */ + def pure[A](a: A): M[A] + + /** + * Sequencing of monadic values + * + * Implementations are required to implement sequencing in a stack-safe + * way, that is they either need to implement trampolining on their own + * or implement `sequence` as a tail recursive function. + * + * Actually the type X can be different for every call to f... + * It is a type aligned sequence, but for simplicity we do not enforce this + * here. + */ + def sequence[X, R](init: M[X])(f: X => Either[M[X], M[R]]): M[R] + + /** + * Helper to summon and use an instance of CanReflect[M] + */ + def reflect[R](mr: M[R])(using r: CanReflect[M]): R = r.reflect(mr) + + /** + * Reify a computation into a monadic value + */ + def reify[R](prog: CanReflect[M] ?=> R): M[R] = + boundary [M[R]]: + given CanReflect[M] with + def reflect[R2](mr: M[R2]): R2 = + suspend [R2, M[R]] (s => mr.flatMap(s.resume)) + pure(prog) + +end Monadic \ No newline at end of file diff --git a/tests/run/suspend-strawman-2/runtime.scala b/tests/run/suspend-strawman-2/runtime.scala new file mode 100644 index 000000000000..fa14fba07f7e --- /dev/null +++ b/tests/run/suspend-strawman-2/runtime.scala @@ -0,0 +1,8 @@ +package runtime +import scala.util.boundary, boundary.Label + +/** Contains a delimited contination, which can be invoked with `resume` */ +class Suspension[-T, +R]: + def resume(arg: T): R = ??? + +def suspend[T, R](body: Suspension[T, R] => R)(using Label[R]): T = ??? diff --git a/tests/run/suspend-strawman-2/simple-futures.scala b/tests/run/suspend-strawman-2/simple-futures.scala new file mode 100644 index 000000000000..0a80a74d49dc --- /dev/null +++ b/tests/run/suspend-strawman-2/simple-futures.scala @@ -0,0 +1,53 @@ +package simpleFutures + +import scala.collection.mutable.ListBuffer +import scala.util.boundary, boundary.Label +import runtime.suspend + +object Scheduler: + def schedule(task: Runnable): Unit = ??? + +trait Async: + def await[T](f: Future[T]): T + +class Future[+T](body: Async ?=> T): + private var result: Option[T] = None + private var waiting: ListBuffer[T => Unit] = ListBuffer() + private def addWaiting(k: T => Unit): Unit = waiting += k + + def await(using a: Async): T = a.await(this) + + private def complete(): Unit = + Future.async: + val value = body + val result = Some(value) + for k <- waiting do + Scheduler.schedule(() => k(value)) + waiting.clear() + + Scheduler.schedule(() => complete()) + +object Future: + + // a handler for Async + def async(body: Async ?=> Unit): Unit = + boundary [Unit]: + given Async with + def await[T](f: Future[T]): T = f.result match + case Some(x) => x + case None => suspend[T, Unit](s => f.addWaiting(s.resume)) + body + +end Future + +def Test(x: Future[Int], xs: List[Future[Int]]) = + Future: + x.await + xs.map(_.await).sum + + + + + + + + diff --git a/tests/run/suspend-strawman-2/streams.scala b/tests/run/suspend-strawman-2/streams.scala new file mode 100644 index 000000000000..f6bc61c73ef1 --- /dev/null +++ b/tests/run/suspend-strawman-2/streams.scala @@ -0,0 +1,18 @@ +package concurrent +import scala.util.{Try, Success, Failure} + +type Stream[+T] = Future[StreamResult[T]] + +enum StreamResult[+T]: + case More(elem: T, rest: Stream[T]) + case End extends StreamResult[Nothing] + +import StreamResult.* + +extension [T](c: Channel[Try[T]]) + def toStream(using Async.Config): Stream[T] = Future: + c.read() match + case Success(x) => StreamResult.More(x, toStream) + case Failure(ex: ChannelClosedException) => StreamResult.End + case Failure(ex) => throw ex +