1
1
package concurrent
2
2
import scala .collection .mutable , mutable .ListBuffer
3
- import scala .util .boundary .Label
3
+ import scala .util .boundary , boundary .Label
4
4
import runtime .suspend
5
+ import java .util .concurrent .CancellationException
5
6
import Async .{Listener , await }
6
7
7
8
/** An unbounded asynchronous channel. Senders do not wait for matching
@@ -96,6 +97,40 @@ object SyncChannel:
96
97
97
98
end SyncChannel
98
99
100
+ /** A simplistic coroutine. Error handling is still missing, */
101
+ class Coroutine (body : Async ?=> Unit )(using scheduler : Scheduler ) extends Cancellable :
102
+ private var children : mutable.ListBuffer [Cancellable ] = mutable.ListBuffer ()
103
+ @ volatile var cancelled = false
104
+
105
+ def cancel () =
106
+ cancelled = true
107
+ synchronized (children).foreach(_.cancel())
108
+
109
+ def addChild (child : Cancellable ) = synchronized :
110
+ children += child
111
+
112
+ boundary [Unit ]:
113
+ given Async = new Async .Impl (this , scheduler):
114
+ def checkCancellation () =
115
+ if cancelled then throw new CancellationException ()
116
+ try body
117
+ catch case ex : CancellationException => ()
118
+ end Coroutine
119
+
120
+ def TestChannel (using Scheduler ) =
121
+ val c = SyncChannel [Option [Int ]]()
122
+ Coroutine :
123
+ for i <- 0 to 100 do
124
+ c.send(Some (i))
125
+ c.send(None )
126
+ Coroutine :
127
+ var sum = 0
128
+ def loop (): Unit =
129
+ c.read() match
130
+ case Some (x) => sum += x; loop()
131
+ case None => println(sum)
132
+ loop()
133
+
99
134
def TestRace =
100
135
val c1, c2 = SyncChannel [Int ]()
101
136
val s = c1.canSend
@@ -114,3 +149,4 @@ def TestRace =
114
149
.map:
115
150
case Left (x) => - x
116
151
case Right (x) => x
152
+
0 commit comments