Skip to content

Commit 08a29dd

Browse files
committed
Add OriginalSource abstract class
1 parent c91fad9 commit 08a29dd

File tree

7 files changed

+33
-20
lines changed

7 files changed

+33
-20
lines changed

tests/run/suspend-strawman-2/Async.scala

+14-1
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ object Async:
9595
trait Source[+T]:
9696

9797
/** If data is available at present, pass it to function `k`
98-
* and return the result of this call.
98+
* and return the result of this call. Otherwise return false.
9999
* `k` returns true iff the data was consumed in an async block.
100100
* Calls to `poll` are always synchronous.
101101
*/
@@ -122,6 +122,19 @@ object Async:
122122

123123
end Source
124124

125+
/** An original source has a standard definition of `onCopmplete` in terms
126+
* of `poll` and `addListener`.
127+
*/
128+
abstract class OriginalSource[+T] extends Source[T]:
129+
130+
/** Add `k` to the listener set of this source */
131+
protected def addListener(k: Listener[T]): Unit
132+
133+
def onComplete(k: Listener[T]): Unit = synchronized:
134+
if !poll(k) then addListener(k)
135+
136+
end OriginalSource
137+
125138
/** A source that transforms an original source in some way */
126139
abstract class DerivedSource[T, U](val original: Source[T]) extends Source[U]:
127140

tests/run/suspend-strawman-2/channels.scala

+11-11
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import Async.{Listener, await}
88
/** An unbounded asynchronous channel. Senders do not wait for matching
99
* readers.
1010
*/
11-
class UnboundedChannel[T] extends Async.Source[T]:
11+
class UnboundedChannel[T] extends Async.OriginalSource[T]:
1212

1313
private val pending = ListBuffer[T]()
1414
private val waiting = mutable.Set[Listener[T]]()
@@ -37,8 +37,8 @@ class UnboundedChannel[T] extends Async.Source[T]:
3737
def poll(k: Listener[T]): Boolean = synchronized:
3838
drainPending(k)
3939

40-
def onComplete(k: Listener[T]): Unit = synchronized:
41-
if !drainPending(k) then waiting += k
40+
def addListener(k: Listener[T]): Unit = synchronized:
41+
waiting += k
4242

4343
def dropListener(k: Listener[T]): Unit = synchronized:
4444
waiting -= k
@@ -79,20 +79,20 @@ object SyncChannel:
7979
var r: Option[T] = None
8080
if k2 { x => r = Some(x); true } then r else None
8181

82-
val canRead = new Async.Source[T]:
82+
val canRead = new Async.OriginalSource[T]:
8383
def poll(k: Listener[T]): Boolean =
8484
link(pendingSends, sender => collapse(sender).map(k) == Some(true))
85-
def onComplete(k: Listener[T]): Unit =
86-
if !poll(k) then pendingReads += k
87-
def dropListener(k: Listener[T]): Unit =
85+
def addListener(k: Listener[T]) = synchronized:
86+
pendingReads += k
87+
def dropListener(k: Listener[T]): Unit = synchronized:
8888
pendingReads -= k
8989

90-
val canSend = new Async.Source[Listener[T]]:
90+
val canSend = new Async.OriginalSource[Listener[T]]:
9191
def poll(k: Listener[Listener[T]]): Boolean =
9292
link(pendingReads, k(_))
93-
def onComplete(k: Listener[Listener[T]]): Unit =
94-
if !poll(k) then pendingSends += k
95-
def dropListener(k: Listener[Listener[T]]): Unit =
93+
def addListener(k: Listener[Listener[T]]) = synchronized:
94+
pendingSends += k
95+
def dropListener(k: Listener[Listener[T]]): Unit = synchronized:
9696
pendingSends -= k
9797

9898
end SyncChannel

tests/run/suspend-strawman-2/choices.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import scala.util.boundary, boundary.Label
2-
import runtime.*
2+
import runtime.suspend
33

44
trait Choice:
55
def choose[A](choices: A*): A

tests/run/suspend-strawman-2/futures.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import java.util.concurrent.CancellationException
99

1010
/** A cancellable future that can suspend waiting for other asynchronous sources
1111
*/
12-
trait Future[+T] extends Async.Source[Try[T]], Cancellable:
12+
trait Future[+T] extends Async.OriginalSource[Try[T]], Cancellable:
1313

1414
/** Wait for this future to be completed, return its value in case of success,
1515
* or rethrow exception in case of failure.
@@ -56,10 +56,10 @@ object Future:
5656
def poll(k: Async.Listener[Try[T]]): Boolean =
5757
hasCompleted && k(result)
5858

59-
def onComplete(k: Async.Listener[Try[T]]): Unit = synchronized:
60-
if !poll(k) then waiting += k
59+
def addListener(k: Async.Listener[Try[T]]): Unit = synchronized:
60+
waiting += k
6161

62-
def dropListener(k: Async.Listener[Try[T]]): Unit =
62+
def dropListener(k: Async.Listener[Try[T]]): Unit = synchronized:
6363
waiting -= k
6464

6565
// Cancellable method implementations

tests/run/suspend-strawman-2/monadic-reflect.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import scala.util.boundary
2-
import runtime.*
2+
import runtime.suspend
33

44
trait Monad[F[_]]:
55

tests/pos/suspend-strawman-2/scheduler.scala renamed to tests/run/suspend-strawman-2/scheduler.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package concurrent
22

33
/** A hypothetical task scheduler trait */
44
trait Scheduler:
5-
def schedule(task: Runnable): Unit = ???
5+
def schedule(task: Runnable): Unit = task.run()
66

77
object Scheduler extends Scheduler:
88
given fromAsyncConfig(using ac: Async.Config): Scheduler = ac.scheduler

tests/run/suspend-strawman-2/simple-futures.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package simpleFutures
22

33
import scala.collection.mutable.ListBuffer
44
import scala.util.boundary, boundary.Label
5-
import runtime.*
5+
import runtime.suspend
66
import concurrent.Scheduler
77

88
trait Async:

0 commit comments

Comments
 (0)