Skip to content

Strawman: Suspensions for algebraic effects #16739

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 52 commits into from

Conversation

odersky
Copy link
Contributor

@odersky odersky commented Jan 21, 2023

I have called boundary and break as implemented in #16612 the foundation for algebraic effects in Scala. Let me try to spin that a bit further.

An algebraic effect system allows the definition of scoped handlers. The scope is delimited by a boundary that is also called a prompt. A handler can be called with arguments inside the scope for which it is defined. The handler can choose to return to the calling code with some result value, or abort the computation inside the prompt, returning some final result value instead. For us, boundary is the prompt, break is the abort, and handlers are regular functions that can return normally or abort with break.

boundary and break can be implemented with exceptions, and that's in fact how their implementation on the JVM works. But on another platform they could also be implemented in a more low-level (and possibly more efficient) way that does the necessary stack unrolling directly.

If we look at the algebraic effect literature, then many algebraic effect examples also make interesting use of continuations. One approach would be to declare that to be orthogonal. We can implement continuations with coroutines or green threads and all that logic can be embedded in the handler. But there's a missed opportunity here: A boundary.Label defines a part of the stack between the prompt and the current stack pointer. That's exactly what we need for a delimited continuation!

So, can we turn Labels into continations? Assuming enough low-level runtime machinery, we can. Here's a way to express this:

We can define in boundary a method

  def suspend[R]()(using label: Suspension[R]): Unit

Here, a Suspension is a class defined as follows:

class Suspension[+R]:
    def resume(): R

When suspend is called, the effect is a break to the passed label's boundary with the current suspension as argument. This suspension is in essence the call stack up to the label's boundary. The suspension is returned as the result to the handler that contains the boundary. The handler code can then store the suspension, and re-invoke it one or several times using the resume method in Suspension. The delimited continuation contains the boundary itself, which means that one can suspend multiple times.

To summarize: handlers for delimited continuations define labels that accept Suspension[T] values. A suspend operation returns to the handler with the delimited continuation that represents the call stack at the point of suspend. The handler can resume that computation using the suspension's resume method.

In practice, we sometimes need to pass some additional value together with the current suspension to the boundary. I have arranged for that by letting Suspension and suspend an additional parameter:

/** Contains a delimited contination, which can be invoked with `run`,
 *  plus some other value that is returned from a `suspend`.
 */
case class Suspension[+T, +R](x: T):
  def resume(): R = ???

/** Returns `Suspension(x)` to the boundary associated with the given label */
def suspend[T, R](x: T)(using Label[Suspension[T, R]]): Unit = ???

One could also add a third type parameter representing the result of suspend in the continuation code. I have not found convincing examples yet that could not work around this, so for simplicity of demonstration, the result is fixed as Unit.

EDIT: 15d270b provides an alternative to this scheme. It leaves Suspension as a single-parameter class and instead allows to wrap the suspension created in a suspend with some function.

Looking at the literature, I have not found too many convincing use cases of algebraic effects. The typical ones are exception like aborts (which just need break, but not suspend), use of some state "on the side" (which can be handled directly), async computation, and non-deterministic or exhaustive choice. This PR contains one very simplistic example for each of the last two: A simple system of futures and a simple system of choice in the style of Verse.

Of course, none of this works yet, since we do not have the means to suspend. This PR is there to illustrate what we could do if that changes, and to start deliberations what the right primitive should be.

@odd
Copy link

odd commented Jan 22, 2023

The handler code can then store the suspension, and re-invoke it one or several times using the restore method in Suspension.

Typo: the method in Suspension shown above is called resume.

@odersky
Copy link
Contributor Author

odersky commented Jan 23, 2023

This approach sits somewhat in the middle between traditional delimited continuations (DC) and traditional algebraic effects (AE). All three approaches also have strong connections with coroutines / green threads / virtual threads.

Comparison with Delimited Continuations

The boundary prompt corresponds to reset and suspend loosely corresponds to shift. In DC, shift is an operator that binds a continuation to a variable. When one does that, there's always a twist needed that distinguishes the continuation handling code from the continuation itself. Since everything is deterministic, even if one manages to store the "future" in a variable, it will not be able to influence that "future". So what gets stored as a continuation is only a part of the "future" and the continuation handling code is separate. In the case of DL this split is represented by the inside vs the outside of the shift operator, but that's not very obvious. By contrast, algebraic effects make this split clearer since it is the handler at the prompt that processes the continuation of the code contained inside it.

EDIT: After iterating with @b-studios suggestions it turns out that the new version of suspensions corresponds exactly to (multi-prompt) delimited continuations with boundary as reset and suspend as shift0. See #16739 (comment).

Comparison with Algebraic Effects

In algebraic effects, an effect is propagated outwards as a signal much like an exception. When it reaches a handler, the handler can decide to abort, or resume the computation. Resumption can be immediate or delayed, and it can happen once or multiple times. By contrast, in our proposal, we simply use functions as effect handlers. A handler is executed as a normal function at the point where it is called. It can then return normally (that corresponds to a simple resumption), or abort to an enclosing boundary with some value using a break or suspend. Compared to AE, the control logic is internal to the handler rather than exposed to the program.

Comparison with Coroutines

Suspensions are the essence of one possible way to implement coroutines, which happens to be the one used in project Loom: When suspending a coroutine, copy its stack and execution state to an object. When resuming it, copy these data back to the stack and registers of an existing thread. Suspensions are this essence and nothing more; all other aspects of coroutines or virtual threads can be implemented in libraries on top of them. Suspensions look a bit more flexible than coroutines in the sense that they offer greater flexibility which part of a thread stack is suspended.

def resume(): R = ???
object Suspension:
def apply[R](): Suspension[R] =
??? // magic, can be called only from `suspend`

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@odersky Thanks for the proposal and first steps toward suspension!

In case it serves as a reference, we are planning from Xebia to send a SIP for suspension based on our work and experience with Arrow and the original PRE-SIP discussion for suspended functions and continuations

We are developing a compiler plugin that implements this kind of functionality.
In that regard, I'm attaching some links in case it helps with the next steps for this proposal.
To support Suspension or Suspend as implicit and make it non-blocking here we have followed a similar approach as the kotlin compiler does, where it changes the function declaration to obtain the continuation argument and return Object so it can thread the suspended program as callbacks.
The ultimate goal of this style of desugaring, regardless of boundary and others, is to be able to implement the following operation or any other callback in a non-blocking reactive runtime.

def await[A](f: Future[A])(using Suspend): A =
  summon[Suspend].suspendContinuation[A] { continuation =>
    // complete on future complete and wire future cancellation if any
  }

In regards to boundary or similar scope abstractions, something we ran into is that for interop with other frameworks in the JVM, the thrown exception needs to be implemented in terms of CancellationException.

This is so that a boundary can have async tasks inside that may get canceled when the scope exits with an exception. Similarly, in Kotlin, they use the couroutineScope and observe CancellationException in the std lib

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this looks very close to what we need if there are no native suspensions. The question is, can you do this without having to split higher-order functions on the user side? I.e. can you work with a map signature like this:

extension [A](xs: List[A])
  def map[B](f: A => B): List[B]

and have it work for asynchronous functions f as well? It seems to me that you will have to duplicate maps code to specialize for sync vs async. If you can do this automatically in the code generation, that would be ideal.

Also agree that we need to integrate with cancellation. The strawmans here don't do that yet, since they are just the first baby steps.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is multiple solutions to implementing continuation capture without user facing duplication of higher-order functions. They typically require either:

  1. a specialized runtime system (ideally with a meta-stack, a linked list of barrier delimited call stacks)
  2. or a different compilation strategy (for example into CPS, or a la generalized exceptions)

Before saying which strategy is the best it is important to say:

  1. which platform it should run on (what is the calling convention on the platform, are tail calls supported, etc.)
  2. what are the desired performance characteristics (direct style code, continuation capture, resumption, ec.)
  3. what style of resumption is desired (resume once, multiple times, only tail, ...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which platform it should run on (what is the calling convention on the platform, are tail calls supported, etc.)

I think we are looking for solutions on all platforms: JVM with or without Loom, Scala Native, and Scala JS. This means we might need more than one solution.

what are the desired performance characteristics (direct style code, continuation capture, resumption, ec.)

I believe a solution should not slow down direct-style code. Typical use case for resumptions is async code waiting on external events.

what style of resumption is desired (resume once, multiple times, only tail, ...)

The use case that must be supported is async, which I believe can be expressed with tail continuations? The rest is good to have but not required if doing so would break some of the other requirements.

-3 + 2
-3 + 3
*/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a slightly different design of suspend which gives rise to a more "traditional" implementation of choose.

class Suspension[T, R]:
  def resume(arg: T): R = ???

def suspend[T, R](body: Suspension[T, R] ?=> R)(using Label[R]): T

trait Choice {
  def choose[A](choices: A*): A
}

// the handler
def choices[T](body: Choice ?=> T): List[T] =
  boundary [List[T]] {
    given Choice {
      def choose[A](choices: List[A]): A = suspend [A, List[T]] {
        choices.flatMap(a => resume(a))
      }
    }
    List(body)
  }

@main def test: List[Int] =
  def x = choose(1, -2, -3)
  def y = choose("ab", "cde")
  choices:
    val xx = x;
    xx + (
      if xx > 0 then y.length * x
      else y.length
    )

I purposefully annotated the types at boundary and suspend to highlight how the answer types look like.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please note how the implementation details of Choice are completely encapsulated at the definition site (choices). The user does not need to be aware that there are boundaries involved.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is also a slightly modified version of async await with the above interface:

trait Future[T] {
  private var result: Option[T] = None
  private var waiting: ListBuffer[Runnable] = ListBuffer()

  def await()(using a: Async): T = a.await(this)
}

trait Async {
  def await[T](f: Future[T]): T
}

object Future {
  private def complete[T](f: Future[T], value: T): Unit =
    f.result = Some(value)
    f.waiting.foreach(Scheduler.schedule)
    f.waiting = ListBuffer()

  // a handler for Async
  def async(body: Async ?=> Unit): Unit =
    boundary [Unit] {
      given Async {
        def await[T](f: Future[T]): T = f.result match 
          case Some(x) = x
          case None => suspend [T, Unit] { k ?=> 
            val blocked = () => k.resume();
            f.waiting += blocked;
            f.await
          }
      }
      body
    }
}

def test(x1: Future[Int], x2: Future[Int]) =
  Future.async:
    println(x1.await + x2.await)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another indication that this slightly modified interface of Suspension is more "traditional" is that it is trivial to implement monadic reflection:

trait CanReflect[M[_]] {
  def reflect[R](mr: M[R]): R
}

trait Monadic[M[_]] {

  /**
   * Embedding of pure values into the monad M
   */
  def pure[A](a: A): M[A]

  /**
   * Sequencing of monadic values
   *
   * Implementations are required to implement sequencing in a stack-safe
   * way, that is they either need to implement trampolining on their own
   * or implement `sequence` as a tail recursive function.
   *
   * Actually the type X can be different for every call to f...
   * It is a type aligned sequence, but for simplicity we do not enforce this
   * here.
   */
  def sequence[X, R](init: M[X])(f: X => Either[M[X], M[R]]): M[R]

  /**
   * Helper to summon and use an instance of CanReflect[M]
   */
  def reflect[R](mr: M[R])(using r: CanReflect[M]): R = r.reflect(mr)

  /**
   * Reify a computation into a monadic value
   */
  def reify[R](prog: CanReflect[M] ?=> R): M[R] = boundary [M[R]] {
    given CanReflect[M] {
      // We could even shorten this to `suspend(mr.flatMap)` which makes the connection obvious.
      def reflect[R](mr: M[R]): R = suspend [R, M[R]] { k ?=> mr.flatMap { r => k.resume(r) } }
    }
  }
}

This is also a practical indication of its usefulness, since monadic reflection (we might need a less scary name) is a perfect way of switching back-and-forth between "old" monadic library implementations and new ones using control effect, as illustrated in our repo:

https://github.com/lampepfl/monadic-reflection/blob/main/core/src/main/scala/monadic/examples/FutureIO.scala

Copy link
Contributor Author

@odersky odersky Feb 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I played a bit with these alternative encodings. I completed the futures sketch to working test case in 9538783. There's a slight hitch with variance, if we want to make Future covariant. But it can be solved with an @uncheckedVariance annotation since the usage is arguably sound. I left a comment in the test case.

But I have a more serious doubt. How is this snippet supposed to work:

        def await[T](f: Future[T]): T = f.result match
          case Some(x) => x
          case None =>
            suspend[T, Unit]: s ?=>
              f.waiting += (v => s.resume(v))
              f.await()

Here, the code following s ?=> needs to be executed before we resume since it installs the suspension in the waiting set of the other future. But then we follow this by f.await(). Isn't that an infinite recursion?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I have it: We just need to shift the final await two spaces to the left:

        def await[T](f: Future[T]): T = f.result match
          case Some(x) => x
          case None =>
            suspend[T, Unit]: s ?=>
              f.waiting += (v => s.resume(v))
            f.await

Then it should work. Right?

Copy link
Contributor Author

@odersky odersky Feb 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, the difference between the previous and alternative encoding amounts the following:

In the previous encoding, we return a suspension as part of the value to the enclosing boundary. The boundary then acts on the suspension by adding it to the waiting set. In the new encoding, we install the suspension directly and simply return with () to the boundary. I like that part.

Furthermore, in the previous encoding, suspend() always was of type Unit whereas now it returns a value that gets passed to resume. At least as far as futures are concerned, that part seemed make things a bit more complicated.

Copy link
Contributor Author

@odersky odersky Feb 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a middle version futuresALT2 that reverts back to unit-valued suspends while keeping the general design of futuresALT.

Copy link
Contributor

@b-studios b-studios Feb 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In futuresALT, since we resume with a value we don't need the f.await in:

suspend[T, Unit]: s ?=>
  f.waiting += (v => s.resume(v))
-f.await

(unless I am completely mistaken)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, correct. I pushed a simplified version.

@fwbrasil
Copy link
Contributor

fwbrasil commented Jan 30, 2023

I've been working on an algebraic effects library in Scala 3 called Kyo. It's still experimental but it already has several effects implemented, it might be a good source of inspiration to find real-world effects to explore. I'm not sure I follow the logic of the Choices effect but it seems to be the equivalent of the Lists effect in Kyo. Here's an example that seems to match what is being pursued in one of the examples in this PR:

https://scastie.scala-lang.org/tRMNGHk9Rp6jCfpfSXWdHw

import language.implicitConversions

import kyo.core._
import kyo.direct._
import kyo.lists._

val x = Lists(1, -2, -3)
val y = Lists("ab", "cde")

val v: Int > Lists =
  Kyo.direct {
    val xx = Kyo(x)
    xx + (
      if (xx > 0) then Kyo(y).length * Kyo(x)
      else Kyo(y).length
    )
  }

val a: List[Int] = Lists.run(v)
require(a == List(3, 4, -3, -5, -5, -8, 0, 1, -1, 0))

A few observations:

  • The order of the results seems different from what is described in the PR's code but I'm guessing the example wasn't supposed to be 100% accurate.
  • I'm using the kyo-direct module in this example, which is a thin wrapper of dotty-cps-async with async/await renamed to Kyo.direct/Kyo. Kyo also provides a regular monadic API.
  • The > infix type is used to encode effects in the type system. The first param is the type of the computation and the second one is a type-level set of effects pending for handling. The type-level set is encoded using a type union so Nothing means zero pending effects (a pure value).

Kyo also provides effect polymorphism so it's possible to mix other effects in the same computation. In this other example xx * 10 is forked using a fiber:

https://scastie.scala-lang.org/BQ98H2HoRR273BNig10Yeg

import kyo.core._
import kyo.direct._
import kyo.lists._
import kyo.ios._
import kyo.concurrent.fibers._

val x = Lists(1, -2, -3)
val y = Lists("ab", "cde")

val v: Int > (Lists | IOs | Fibers) =
  Kyo.direct {
    val xx = Kyo(x)
    Kyo(Fibers.fork(xx * 10)) + (
      if (xx > 0) then Kyo(y).length * Kyo(x)
      else Kyo(y).length
    )
  }

val a: List[Int] = IOs.run(Fibers.block(Lists.run(v)))
require(a == List(12, 13, 6, 4, 4, 1, -18, -17, -28, -27))

The Lists effect is implemented here: https://github.com/fwbrasil/kyo/blob/main/kyo-core/src/main/scala/kyo/lists.scala. It's interesting to observe that it's possible to implement a complete effect system like Kyo without specific language extensions.


end Future

def Test(x: Future[Int], xs: List[Future[Int]])(using Async) =
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

param using Async is not required for this test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed.

@odersky
Copy link
Contributor Author

odersky commented Feb 1, 2023

@fwbrasil Thanks for the pointer. I also looked at your talk at Functional Scala. This looks interesting, but I am still at a loss to grok all the details. It seems > lifts an initializer to be a lambda and < applies that lambda again? If that's the case this looks like a variant of workflow expressions to me, am I on the right track? It would be good to explain these things in more detail and in particular to work out the relationship between this and what's usually called algebraic effect systems. Algebraic effects systems are typically described either as resumable exceptions or as some play on delimited continuations. I did not see where either of these concepts come up in your design, but maybe I am missing something.

@fwbrasil
Copy link
Contributor

fwbrasil commented Feb 1, 2023

@odersky I'm sorry that the project doesn't have documentation yet, I have a couple of things to finish in the implementation and will work on it after that. Regarding Kyo and algebraic effects, the elements of a complete algebraic effect system are present. Suspensions (> at the term level) are similar to the mechanism being created here and are "abstract" operations that have meaning only when effects are later handled (using <).

I think the confusion could be because the effect handling itself happens via an implicit handler so it's harder to see the algebraic effect mechanism. Let's take the Tries effect as an example:

https://scastie.scala-lang.org/ruK93TeGRtqEpYjOCSC9Lw

import language.implicitConversions

import kyo.core._
import kyo.tries._
import scala.util.Try

val v1: Try[Int] = Try(1)

// suspend the `Tries` effect
val v2: Int > Tries = v1 > Tries

// since the `Tries` effect is suspended,
// Kyo only saves transformations
// (apply is both map and flatMap in Kyo)
val v3: String > Tries = v2(_ + 1)(i => s"i$i")

// handle the `Tries` effect, which invokes
// the implicit handler's apply method with
// the suspended value and the continuation
// (`handler(Try(1), v => v(_ + 1)(i => s"i$i"))`)
val v4: Try[String] > Nothing = v3 < Tries

// since there are no pending effects, the
// computation can be converted to a pure value
val v5: Try[String] = v4

println(v5) // Success(i2)

The handler for the Tries effect basically short-circuits the computation by discarding the continuation and suspending Failure again if necessary:

https://github.com/fwbrasil/kyo/blob/main/kyo-core/src/main/scala/kyo/tries.scala

given ShallowHandler[Try, Tries] with {

  def pure[T](v: T) =
    Success(v)

  override def handle[T](ex: Throwable) =
    Failure(ex) > Tries

  def apply[T, U, S](m: Try[T], f: T => U > (S | Tries)): U > (S | Tries) =
    m match {
      case m: Failure[T] =>
        m.asInstanceOf[Failure[U]] > Tries
      case _ =>
        try f(m.asInstanceOf[Success[T]].value)
        catch {
          case ex if (NonFatal(ex)) =>
            Failure(ex) > Tries
        }
    }
}

An interesting fact about Kyo is that the continuation (f in this code) is a regular pure function instead of a magic resume operation, which gives a lot of freedom on how effects can be handled. Some effects can even choose to save the continuation to use it later.

@odersky
Copy link
Contributor Author

odersky commented Feb 2, 2023

@b-studios I did some cleaning up where the old files are in a directory suspend-strawman-1 and the files embodying your suggestions are in suspend-strawman-2. I also added monadic reflection. I had to massage it a little bit to compile. Essentially, the [R] parameter on CanReflect's reflect had to go to the enclosing trait CanReflect. Also the body of the boundary call must end in pure(prog), not prog. Can you take a look and let me know whether you agree?

Overall, choices came out much nicer in strawman-2 and for futures it's more or less the same. That, plus monadic reflection is a good indication that we should go with the strawman-2 design.

EDIT: As you noted, and wanted to imply with the k parameter name, it comes out even more elegantly if we replace Suspension[T, R] with T => R. But in the end that might be a step too far. I think it's valuable to see in the type that a function is really a suspension.

@odersky
Copy link
Contributor Author

odersky commented Feb 2, 2023

@fwbrasil Thanks for the explanations. The implementation a bit overwhelming with all these type parameters floating around. Some questions:

  • Why do you import implicitConversions? Where do they apply? In fact we want to get rid of most implicit conversions in Scala, so this might be a red flag.
  • I did not understand v2(_ + 1)(i => s"i$i") what is the significance of the two lambdas? Given the Tries effect, I would have thought one is a success handler and the other an error handler. But that does not correspond to the program logic in a way I can see.
  • Could you have printed v4 as well with the same result?

Overall, it looks like there's some hidden reification of a program into functions going on. How does that work with classical side effects? Are they delayed? Also, conversion into cps is reputed to be the slow compared to other suspension techniques, see for instance this paper. Do you think that could become a problem?

/**
* Reify a computation into a monadic value
*/
def reify[R](prog: CanReflect[M, R] ?=> R): M[R] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side remark: I know one can choose type parameter names liberally, but it would help readability to use the same letter for the same concept, i.e. R in Suspension represents continuation whether here it is an equivalent of T.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

R represents the continuation result. That's morally the same here, i.e. it's the result type of the effect monad.

@fwbrasil
Copy link
Contributor

fwbrasil commented Feb 2, 2023

Why do you import implicitConversions? Where do they apply? In fact we want to get rid of most implicit conversions in Scala, so this might be a red flag.

Kyo uses an implicit conversion to express an isomorphism. Values without pending effects (e.g. Int > Nothing) are represented internally in Kyo without any boxing and can be converted back and forth from/to Kyo via identity implicit conversions: T > Nothing to T and T to T > Nothing. The conversions have no special logic other than the type cast. This seems a genuine use of Conversion but if the feature goes away, it'd be a question of adapting the API (it would hurt usability a bit, though).

I did not understand v2(_ + 1)(i => s"i$i") what is the significance of the two lambdas? Given the Tries effect, I would have thought one is a success handler and the other an error handler. But that does not correspond to the program logic in a way I can see.

These are transformations like in any other monad, at this point of the computation there's no handling of effects, only accumulation of transformations. Transformations are a bit different in Kyo. Since pure values are members of the > type via the implicit conversion, there's no need to differentiate between map and flatMap in most cases and the apply method plays the role of both. The code is equivalent to v2.map(_ + 1).map(i => s"i$i"), which is a syntax Kyo also supports: https://scastie.scala-lang.org/4Q75Op3qSG6BNcBcVIud6g

Could you have printed v4 as well with the same result?

Yes, that's correct: https://scastie.scala-lang.org/L63UuleESGqY9vibK2WFkw

Overall, it looks like there's some hidden reification of a program into functions going on. How does that work with classical side effects? Are they delayed?

Similarly to other effect systems, side effects must be wrapped with the IOs effect, which delays the execution until the handling of the IOs effect. For example: IOs(println("hello")).

I'm not sure what you mean regarding reification but I guess you're thinking some sort of macro system, which isn't the case. Kyo's core mechanism is relatively simple and uses regular functions and values to accumulate transformations and handle effects. You can read Kyo's core implementation in just about 300 lines of code: https://github.com/fwbrasil/kyo/blob/main/kyo-core/src/main/scala/kyo/core.scala.

Also, conversion into cps is reputed to be the slow compared to other suspension techniques, see for instance this paper. Do you think that could become a problem?

No at all, Kyo integrates with dotty-cps-async in a way leaves no additional overhead. That's achieved by making the methods used by the transformation inline: https://github.com/fwbrasil/kyo/blob/main/kyo-direct/src/main/scala/kyo/direct.scala#L72. The CPS transformation doesn't even show up in the generated bytecode.

Kyo itself is also carefully crafted for high performance based on my experience working on the performance of this kind of library for almost a decade now. You can see a preview of the benchmark results here: https://jmh.morethan.io/?source=https://gist.githubusercontent.com/fwbrasil/ca0be39eff7807c11346de6a17e9c02c/raw/e7885eb645318ae5c016ae65742692f25b719620/jmh-results-graal.json

trait CanReflect[M[_], R] {
def reflect(mr: M[R])(using Monad[M]): R
}
trait CanReflect[M[_]: Monad, R]:
Copy link
Contributor

@b-studios b-studios Feb 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the context bound here really necessary? In our monadic reflection implementation, we do not specify the bound here. The moment we need the fact that M is a monad is only at reify.

I think having the bound here introduces non-essential dependencies that might make the interface more difficult to understand than it needs to be.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, the bound is not necessary.

@b-studios
Copy link
Contributor

b-studios commented Feb 3, 2023

Essentially, the [R] parameter on CanReflect's reflect had to go to the enclosing trait CanReflect.

I think this way the interface is not generic enough. I think you really want it to be rank-2:

So instead of

trait CanReflect[M[_], R]:
  def reflect(mr: M[R]): R

you want

trait CanReflect[M[_]]:
  def reflect[R](mr: M[R]): R

For example, if you have the list monad, you want to be able to reflect both Int out of List[Int] as well as String out of List[String].

@b-studios
Copy link
Contributor

b-studios commented Feb 3, 2023

I guess you wanted to move the R out because it was difficult to get it type correct? This is to be expected :) The answer type needs to align "in the end" (which is at the delimiter), but might have arbitrary many different answer types between the call to reflect and the one to reify. That is you might have the following sequence

reflect at type A
flatMap A => B
flatMap B => C
flatMap C => ... R
reify List at R

If you don't implement type aligned sequences, there is no way around a (safe) cast like in:
https://github.com/lampepfl/monadic-reflection/blob/main/core/src/main/scala/monadic/Monadic.scala#L43

Edit: see fix below -- I was confused by a different experiment with project Loom.

@b-studios
Copy link
Contributor

Also the body of the boundary call must end in pure(prog), not prog.

Yes, that is completely right. Sorry if I missed that one (can also be found here)

boundary [M[R]]:
given CanReflect[M] with
def reflect[R2](mr: M[R2]): R2 =
suspend [R2, M[R]] (s => mr.flatMap(s.resume))
Copy link
Contributor

@b-studios b-studios Feb 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I was just about to propose this. I mixed up that for my project loom experiments the casts were necessary since they implement coroutines and not proper continuations.

One more argument for proper continuations: no casts ;)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, indeed! I am very pleased how everything simply holds together.

Copy link
Contributor

@b-studios b-studios Feb 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In retrospect, shouldn't be too surprising recalling Filinski's original equation:

which in Scalaish syntax translates to:

def reify(e) = boundary(pure(e))
def reflect(e) = suspend(k => e.flatMap(k.resume))

* is bind / flatMap in his paper:

(note that there is the different that here we implement multiprompt delimited control; so we do not fixate ourselves to one monad, but can layer monads -- something that Filinski envisioned 5 years after the original '94 paper)

(notenote: multi-prompt delimited control and capabilities are a natural perfect fit: having unique term-level prompts, we can pass them around, close over them, etc.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neat! So this reproduction of Filinski's work shows that boundary/suspend can serve as the canonical way to reflect monads, and therefore to represent effects. That strengthens the case for it considerably.

Copy link
Contributor

@b-studios b-studios Feb 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! boundary is just a more friendly name for a (multiprompt) reset and suspend is a much more friendly name for a multiprompt version of shift0 :)

Monads, (multiprompt) delimited control, and monadic reflection have "comparable" expressive power -- the devil is in the details if you actually want to prove things there.

And just to make this even more clear: I am totally in favor of choosing battle-proven, theoretically well-founded delimited control operators as a foundation. The one proposed here "reset_p/shift0_p" is actually my personal favourite since it gives rise to a very nice compositional semantics and allows many different (and efficient) implementation strategies.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@odersky @b-studios In response to how to suspend:

I apologize for the length of this ahead of time. The longer code examples are linked with gists below. The full examples and transformed outputs will be linked at the bottom of this reply.

We've completed our first round of the suspend CPS compiler transformations inline with the design we proposed in Pre-SIP: Suspended functions and continuations in Scala 3.
. In brief, ours are one-shot delimited continuations. We derive the suspension point labels by analyzing the bodies of definitions declaring a given Suspend parameter. An example below:

package examples

import continuations.*

@main def ThreeDependentContinuations =
  def threeDependentContinuations(a: Int, b: Int, c: Int)(using s: Suspend): Int =
    val d = 4
    val continuationOne:Int = s.suspendContinuation(_.resume(Right(d + a)))
    val e = 5
    val continuationTwo:Int = s.suspendContinuation(_.resume(Right(continuationOne + e + b)))
    val f = 6
    val result:Int = s.suspendContinuation(_.resume(Right(continuationTwo + f + c)))
    result
  println(threeDependentContinuations(1,2,3))

https://gist.github.com/jackcviers/59c5d06c33a0a2508bec30a8f4c280a0

The output of the above program is:

21

During the transformation, a class extending the targeted platform's ContinuationImpl and the input variables and intermediary suspended results is generated. It implements invokeSuspend to call the user's transformed suspended definition. Gist

We then instantiate the generated continuation class and cycle through labels in the order of the resumed continuations, assigning the result to a local result variable, and jumping to the break label of the next continuation or dependent calculation. Each resume call is handeled by a SafeContinuation class instance, that stores the continuation state (Undecided, Suspended, Resumed, Error|Result) and ensures that the continuation cannot be resumed with more than one value. This state variable is a volatile variable, and we transition between states using compare and swap. The suspend parameter is removed from the declaration and a completion continuation parameter is inserted and the return type is modified to include Null:

def threeDependentContinuations(a: Int, b: Int, c: Int, 
  completion: continuations.Continuation[Int]
): Int | Null | (
  continuations.Continuation.State.Suspended : 
    continuations.Continuation.State
)

The state cases are rather repetitive, so I'll just paste a single case snippet of the user's transformed method in Gist.

This continues until we have a final value:

case 3 =>
  // ...
  result = $result.asInstanceOf[Int]
  // ...

Which is then returned.

It works with asynchronous resumes. However, due to the result returning the continuation state (Suspended) when resume has not yet been called, you will get Suspended as the result of the CPS transformation:

...
given ExecutorService = Executors.newWorkStealingPool()

@main def ListMapNoResume = 
  def twoArgumentsOneContinuationsCFBefore(x: Int, y: Int): Suspend ?=> Int | Continuation.State.Suspended.type =
    println("twoArgumentsOneContinuationsCFBefore")
    val z = 1
    summon[Suspend].suspendContinuation[Int]{ c =>
      summon[ExecutorService].submit(new Runnable{
        override def run =
          val sleepTime = Random.between(10, 5000)
          println(s"sleepTime $sleepTime")
          Thread.sleep(sleepTime)
          c.resume(Right(x + y + z))})
    }

  val mappedContinuations = (1 to 100).toList.map(twoArgumentsOneContinuationsCFBefore(1, _))
  println(mappedContinuations)

Results Gist

It is expected programmers would use async/await to block and wait until each continuation is resumed. Note this is current WIP.

No modification is made to List#map in this transformation:

val mappedContinuations: List[Matchable] = 
[info]           intWrapper(1).to(100).toList.map[Matchable](
[info]             {
[info]               def $anonfun(_$1: Int): Matchable = 
[info]                 twoArgumentsOneContinuationsCFBefore(1, _$1, 
[info]                   continuations.jvm.internal.ContinuationStub.contImpl
[info]                 )
[info]               closure($anonfun)
[info]             }
[info]           )

Since we are writing the scheduler and runtime, currently, we stub in an identity continuation for the initial continuation when calling suspended definitions to kick everything off at runtime. This, as with the interceptors, is planned to be platform specific.

There are some optimizations that we implement as well. For example, when a suspended definition has a constant body, we don't impose the use of a state machine:

def zeroArgumentsZeroContinuations()(using Suspend): Int = 1
//becomes
def zeroArgumentsZeroContinuations(
  completion: continuations.Continuation[Int | Any]
): Any = 1

Our Suspend is only a marker trait. Our continuation doesn't abstract over the input. We provide for a context tuple holding services that we can search by type to apply for contextual handlers, but I anticipate that being lifted to givens and/capabilities for things not necessary for different initial continuations. To make the interface as simple as possible to use we don't rely upon declared delimiters or additional parameters, though it does impose a cost in the implementation complexity.

I believe we can accommodate the strawmen here, though it will be different supporting multiple resumes, and puts the control of label insertion in the hands of the users. That may make the CPS transformation simpler, IMHO.

My motivation for the PRE-SIP was also Filinski 1994, and algebraic effects. I want interop with existing monadic libraries, but also to be able to compose multiple effects (State, Nondeterminism (choice in this proposal), error handling, console IO, monadic composition) without transformer stacks and eval trampolines for stack-unsafe monadic effects. Mostly things that are shown inMatija Pretnar/2015/An Introduction to Algebraic Effects and Handlers.

Full source code of the above examples, and bytecode outputs available in this Gist. More working examples are available in the 47Deg/TBD repository.

Copy link
Contributor Author

@odersky odersky Feb 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the pointer! I have some questions that I try to ask for every scheme that implements continuations:

  • Which code is transformed to allow suspensions? I.e. what is the precise criterion that determines whether you transform code or leave it alone?
  • How is effect polymorphism handled? As an example, can you still work with a single definition of map with the standard signature? And is its code transformed or not?
  • How is the code transformed? CPS transform, state machines, capture/replay via exceptions, or something else?
  • What is the performance degradation for transformed code vs direct style code if the code does in the end not suspend at runtime? And is it the same after the first suspension?

Copy link

@jackcviers jackcviers Mar 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@odersky We've changed the definitions to match some of the Strawman
and literature, and we're in the process of making it match the strawman api. There is one exception in ours -- resume must be unit returning in order for the boundaries to work:

shift -> suspend

Which code is transformed to allow suspensions
And
How is effect polymorphism handled? As an example, can you still work with a single definition of map with the standard signature? And is its code transformed or not?

A suspended definition is a ValOrDefDef that includes Suspend in a
ContextFunction parameter position or in a using
ParameterClause.

E.g.

val suspendedFoo: (Suspend) ?=> Int = ...
def suspendedFooDef(using Suspend): Int = ...

Once a ValOrDefDef has been determined to be a suspended definition,
if the RHS has no calls to Suspend#shift, then no transformation of
the RHS occurs:

def constantSuspendedDefinition(i: Int)(using Suspend): Int = 1
//becomes
def constantSuspendedDefinition(i: Int, completion: Continuation[Int]): Int = 1

If only a single call to Suspend.shift occurs, then we install the
SafeContinuation, which handles interception from the boundary
completion continuation, allowing for scheduling, cancellation, and
deferred completion. Otherwise, the code is normal:

def suspendsOneTime(i: Int)(using s:Suspend): Int = s.shift(continuation => continuation.resume(i + 1))
//becomes
def suspendsOneTime(i: Int, completion: Continuation[Int]): Any | Null | 
  (continuations.Continuation.State.Suspended : continuations.Continuation.State) = 
  val continuation1: continuations.Continuation[String] = completion
  val safeContinuation: continuations.SafeContinuation[String] =
    continuations.SafeContinuation.init[String](continuation1)
  {
    {
      safeContinuation.resume(i.+(1))
    }
  }
  safeContinuation.getOrThrow()

With two or more shifts, we generate a Frame class to hold the
inputs, intermediate state, Frame "program counter", and invoke and
interception machinery. Then we modify the suspended definition into a
state machine, switching on the Frame program counter, inserting
CaseDef clauses for each program counter in the suspended
definition, with the same SafeContinuation calls as above:

def threeDependentContinuations(a: Int, b: Int, c: Int)(using s: Suspend): Int =
    val d = 4
    val continuationOne: Int = s.shift(_.resume(d + a))
    val e = 5
    val continuationTwo: Int = s.shift(_.resume(continuationOne + e + b))
    val f = 6
    val result: Int = s.shift(_.resume(continuationTwo + f + c))
    result

becomes
this

Finally, calls to suspended definitions in source code are transformed
to include the boundary continuation (THIS IS CURRENT WIP AND IS
SUBJECT TO SMALL CHANGES):

blockingBoundary.createContinuation{
  threeDepenedentContinuations(1,2,3)
}

//becomes
val blockingBoundary = new BlockingBoundaryContinuation{}
threeDependentContinuations(1,2,3, blockingBoundary)
def suspendedDefCallsOtherSuspendedDef()(using Suspend): Int = otherSuspendedDef()
// becomes
def suspendedDefCallsOtherSuspendedDef(completion: Continuation[Int]): Int =
  otherSuspendedDef(completion)

The only problem comes with HOFs. For that we need async and await
and awaitAll for collections, or we need to reflect and reify other
monads. These can be introduced as a library:

def mapExample()(using Suspend): List[Int] =
  List(1,2,3).map(async{someSuspendedDef(_)).awaitAll // async returns Deferred[A], awaitAll waits until the deferreds all have resumed, or cancelled, an error in one cancels the others

We've also discussed the possibility of instrumenting all functions
and switching between the slow-path functions that actually suspend
and the uninstrumented version of the function at runtime with
@b-studios.

How is the code transformed? CPS transform, state machines, capture/replay via exceptions, or something else?

The code is CPS transformed implemented as a compiler phase. We use a
Frame stack machine per suspended definition with a frame
counter-based switch table, goto labels, and return from label.

Boundaries are implemented as continuation objects and provide
interception, scheduling, and cancellation. The default boundary is
non-blocking -- that is if resume isn't called yet when control is
returned to the calling function, the suspension point can return
Suspended instead of the result type.

Boundaries can also be blocking (automatic await behavior), or
concurrent (return Deffered[A] results that have a blocking await
method to retrieve the result or throw if the suspension was resumed
with an error). Blocking and concurrent boundaries implement
structured concurrency with cancellation.

See above with the exact details.

We're willing to discuss and experiment with other transformations --
we've discussed some with @b-studios.

What is the performance degradation for transformed code vs direct style code if the code does in the end not suspend at runtime?
And is it the same after the first suspension?

I have run jmh benchmarks with:

[info] # JMH version: 1.32
[info] # VM version: JDK 19, OpenJDK 64-Bit Server VM, 19+36-2238
[info] # VM options: <none>
[info] # Blackhole mode: full + dont-inline hint
[info] # Warmup: 5 iterations, 10 s each
[info] # Measurement: 5 iterations, 10 s each
[info] # Timeout: 10 min per iteration
[info] # Threads: 1 thread, will synchronize iterations
[info] # Benchmark mode: Throughput, ops/time

So, overhead of suspension vs raw scala code, and overhead of raw
scala code called after suspension

def SuspendedScalaCode =
  def suspendedScalaCode(a: Int, b: Int, c: Int)(using s: Suspend): Int =
    s.shift(_.resume(a + b + c))
  suspendedScalaCode(1,2,3)

def PassthroughAfterFirstSuspension =
  def passthroughAfterFirstSuspension(a: Int, b: Int, c: Int)(using s:Suspend): Int =
    val x = s.shift[Int](_.resume(a + b))
    x + c
  passthroughAfterFirstSuspension(1,2,3)

def Passthrough =
  def passthrough(a: Int, b: Int, c: Int)(using Suspend): Int =
    a + b + c
  passthrough(1, 2, 3)

def MeasureRawScala =
  def rawScala(a:Int, b: Int, c: Int): Int = a + b + c
  rawScala(1,2,3)

We determine the overhead of scala code after suspend by measuring the
overhead of SuspendedScalaCode and RawScala, then
PassthroughAfterFirstSuspension and RawScala, then measuring the
difference.

Overhead Type Percentage Overhead
suspended without suspend overhead 0.47%
suspension overhead 92.79%
regular scala code after suspension overhead 2.08%

Instead of having an additional value be a part of a suspension,
allow to wrap the suspension with a wrapper function passed to
`suspend`. This saves one type parameter on `Suspension` and gives
more flexibility in what gets returned to a boundary.
We now have in

  suspend-strawman-1: futures and choice in the original version
  suspend-strawman-2: futures, choices, and monadic reflection according to Jonathan's design
 - Add first-order poll method for convenience
 - Drop link() method
Was in the docs but not in the code.
 - Remove Channel.close since there is no good general
   implementation.
 - Make toStream work on Channel[Try[T]] instead.
The document for describing this has moved to the README.md in the lampepfl/async
project. I am leaving the code as tests to have a version that works with normal threads.
lampepfl/async requires Loom's virtual threads
private def complete(): Unit =
Future.async:
val value = body
val result = Some(value)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be result = Some(value) (not val).


class Future[+T](body: Async ?=> T):
private var result: Option[T] = None
private var waiting: ListBuffer[T => Unit] = ListBuffer()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waiting is a mutable ListBuffer and can be immutable itself.

@natsukagami
Copy link
Contributor

There's an implementation of multi-shot delimited continuations in JVM in https://github.com/javactrl/javactrl. They seem to use bytecode instrumentation + some exception handling magic. The author also has a library that extends Javascript with some async primitives (delimited continuations are one of them): https://github.com/awto/effectfuljs

@odersky
Copy link
Contributor Author

odersky commented Apr 22, 2023

This is now its own project at lampepfl/async

@odersky odersky closed this Apr 22, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants