
Commit 8694663

Author: Aleksandar Prokopec (committed)
Fix some issues in parallel Ctrie.
This change resolves some issues with ParCtrie splitters and their `remaining` method, which currently evaluates the size of the Ctrie. Since that size is computed neither lazily nor in parallel, evaluating `remaining` carries a traversal cost that is unacceptable.

Change #1: The `shouldSplitFurther` method on tasks is by default implemented by calling the `remaining` method. The task-level method now forwards the call to a method of the same name on the splitter, which is by default implemented in the same way as before but can be overridden by custom collections such as ParCtrie.

Change #2: The ParCtrie splitter now has a `level` member which counts how many times the splitter has been split. This information is used to override the default `shouldSplitFurther` implementation.

Change #3: The tasks and splitters rely heavily on the splitter's `remaining` method for most operations. An additional method, `isRemainingCheap`, returns true by default but can be overridden by custom collections such as the ParCtrie splitter.
1 parent 2d9dfe3 commit 8694663
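
A simplified sketch of how these pieces fit together (hypothetical trait and method shapes, not the actual library code; the threshold follows the "size divided by 8 times the parallelism level" rule of thumb that the removed scaladoc documented):

// Sketch only: `SketchSplitter` stands in for the real IterableSplitter hierarchy.
trait SketchSplitter[T] extends Iterator[T] {
  def remaining: Int                    // may be expensive to compute for some collections
  def isRemainingCheap: Boolean = true  // Change #3: expensive collections override this
  def split: Seq[SketchSplitter[T]]

  // Change #1: the default decision still compares `remaining` against a size threshold...
  def shouldSplitFurther(collSize: Int, parallelismLevel: Int): Boolean =
    remaining > collSize / (8 * parallelismLevel)
}

// ...but a trie-like splitter can decide from its split depth instead (Change #2),
// never touching `remaining` at all.
abstract class SketchTrieSplitter[T](val level: Int) extends SketchSplitter[T] {
  override def isRemainingCheap = false
  override def shouldSplitFurther(collSize: Int, parallelismLevel: Int) =
    level < 3 + Integer.highestOneBit(parallelismLevel)
}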

6 files changed: 75 additions & 46 deletions

src/library/scala/collection/mutable/Ctrie.scala

Lines changed: 19 additions & 5 deletions
@@ -13,6 +13,7 @@ package mutable
 
 import java.util.concurrent.atomic._
 import collection.immutable.{ ListMap => ImmutableListMap }
+import collection.parallel.mutable.ParCtrie
 import generic._
 import annotation.tailrec
 import annotation.switch
@@ -578,13 +579,16 @@ private[mutable] case class RDCSS_Descriptor[K, V](old: INode[K, V], expectedmai
  * iterator and clear operations. The cost of evaluating the (lazy) snapshot is
  * distributed across subsequent updates, thus making snapshot evaluation horizontally scalable.
  *
+ * For details, see: http://lampwww.epfl.ch/~prokopec/ctries-snapshot.pdf
+ *
  * @author Aleksandar Prokopec
  * @since 2.10
  */
 @SerialVersionUID(0L - 6402774413839597105L)
 final class Ctrie[K, V] private (r: AnyRef, rtupd: AtomicReferenceFieldUpdater[Ctrie[K, V], AnyRef])
 extends ConcurrentMap[K, V]
    with MapLike[K, V, Ctrie[K, V]]
+   with CustomParallelizable[(K, V), ParCtrie[K, V]]
    with Serializable
 {
   import Ctrie.computeHash
@@ -710,6 +714,10 @@ extends ConcurrentMap[K, V]
 
   /* public methods */
 
+  override def seq = this
+
+  override def par = new ParCtrie(this)
+
   override def empty: Ctrie[K, V] = new Ctrie[K, V]
 
   @inline final def isReadOnly = rootupdater eq null
@@ -820,7 +828,7 @@ extends ConcurrentMap[K, V]
 
   def iterator: Iterator[(K, V)] =
     if (nonReadOnly) readOnlySnapshot().iterator
-    else new CtrieIterator(this)
+    else new CtrieIterator(0, this)
 
   override def stringPrefix = "Ctrie"
 
@@ -844,7 +852,7 @@ object Ctrie extends MutableMapFactory[Ctrie] {
 }
 
 
-private[collection] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean = true) extends Iterator[(K, V)] {
+private[collection] class CtrieIterator[K, V](var level: Int, ct: Ctrie[K, V], mustInit: Boolean = true) extends Iterator[(K, V)] {
   var stack = new Array[Array[BasicNode]](7)
   var stackpos = new Array[Int](7)
   var depth = -1
@@ -910,7 +918,7 @@ private[collection] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean
     }
   } else current = null
 
-  protected def newIterator(_ct: Ctrie[K, V], _mustInit: Boolean) = new CtrieIterator[K, V](_ct, _mustInit)
+  protected def newIterator(_lev: Int, _ct: Ctrie[K, V], _mustInit: Boolean) = new CtrieIterator[K, V](_lev, _ct, _mustInit)
 
   /** Returns a sequence of iterators over subsets of this iterator.
    *  It's used to ease the implementation of splitters for a parallel version of the Ctrie.
@@ -920,24 +928,30 @@ private[collection] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean
       val it = subiter
       subiter = null
       advance()
+      this.level += 1
      Seq(it, this)
-    } else if (depth == -1) Seq(this) else {
+    } else if (depth == -1) {
+      this.level += 1
+      Seq(this)
+    } else {
       var d = 0
       while (d <= depth) {
         val rem = stack(d).length - 1 - stackpos(d)
         if (rem > 0) {
           val (arr1, arr2) = stack(d).drop(stackpos(d) + 1).splitAt(rem / 2)
           stack(d) = arr1
           stackpos(d) = -1
-          val it = newIterator(ct, false)
+          val it = newIterator(level + 1, ct, false)
           it.stack(0) = arr2
           it.stackpos(0) = -1
           it.depth = 0
           it.advance() // <-- fix it
+          this.level += 1
           return Seq(this, it)
         }
         d += 1
       }
+      this.level += 1
       Seq(this)
     }
 
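With `CustomParallelizable` mixed in and `seq`/`par` overridden as above, switching between a Ctrie and its parallel counterpart wraps or unwraps the same underlying trie instead of copying elements. A usage sketch (illustrative values, assuming the API at this commit):

import scala.collection.mutable.Ctrie

val ct = new Ctrie[Int, Int]
for (i <- 0 until 1000) ct(i) = i * i

val pct = ct.par                                     // wraps this Ctrie in a ParCtrie
val sq  = pct.seq                                    // returns the underlying Ctrie
val doubled = pct map { case (k, v) => (k, 2 * v) }  // executed through the parallel wrapper
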
src/library/scala/collection/parallel/ParIterableLike.scala

Lines changed: 2 additions & 25 deletions
@@ -96,17 +96,6 @@ import annotation.unchecked.uncheckedVariance
  * The combination of methods `toMap`, `toSeq` or `toSet` along with `par` and `seq` is a flexible
  * way to change between different collection types.
  *
- * The method:
- *
- * {{{
- * def threshold(sz: Int, p: Int): Int
- * }}}
- *
- * provides an estimate on the minimum number of elements the collection has before
- * the splitting stops and depends on the number of elements in the collection. A rule of the
- * thumb is the number of elements divided by 8 times the parallelism level. This method may
- * be overridden in concrete implementations if necessary.
- *
  * Since this trait extends the `Iterable` trait, methods like `size` must also
  * be implemented in concrete collections, while `iterator` forwards to `splitter` by
  * default.
@@ -206,18 +195,6 @@ self: ParIterableLike[T, Repr, Sequential] =>
    */
   def isStrictSplitterCollection = true
 
-  /** Some minimal number of elements after which this collection should be handled
-   *  sequentially by different processors.
-   *
-   *  This method depends on the size of the collection and the parallelism level, which
-   *  are both specified as arguments.
-   *
-   *  @param sz   the size based on which to compute the threshold
-   *  @param p    the parallelism level based on which to compute the threshold
-   *  @return     the maximum number of elements for performing operations sequentially
-   */
-  def threshold(sz: Int, p: Int): Int = thresholdFromSize(sz, p)
-
   /** The `newBuilder` operation returns a parallel builder assigned to this collection's fork/join pool.
    *  This method forwards the call to `newCombiner`.
    */
@@ -833,7 +810,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
   extends StrictSplitterCheckTask[R, Tp] {
     protected[this] val pit: IterableSplitter[T]
     protected[this] def newSubtask(p: IterableSplitter[T]): Accessor[R, Tp]
-    def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel)
+    def shouldSplitFurther = pit.shouldSplitFurther(self.repr, parallelismLevel)
     def split = pit.splitWithSignalling.map(newSubtask(_)) // default split procedure
     private[parallel] override def signalAbort = pit.abort
     override def toString = this.getClass.getSimpleName + "(" + pit.toString + ")(" + result + ")(supername: " + super.toString + ")"
@@ -1362,7 +1339,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
 
   /* scan tree */
 
-  protected[this] def scanBlockSize = (threshold(size, parallelismLevel) / 2) max 1
+  protected[this] def scanBlockSize = (thresholdFromSize(size, parallelismLevel) / 2) max 1
 
   protected[this] trait ScanTree[U >: T] {
     def beginsAt: Int

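The removed `threshold` member effectively survives as the package-level `thresholdFromSize` helper, now called directly by `scanBlockSize` above and by the splitter's default `shouldSplitFurther` (added in RemainsIterator.scala below). Going by the rule of thumb stated in the deleted scaladoc, the default decision behaves roughly like this hypothetical sketch (the real `thresholdFromSize` in the `scala.collection.parallel` package object may differ in detail):

object SplitThresholdSketch {
  // Rule of thumb from the removed documentation: collection size / (8 * parallelism level).
  def approxThreshold(collectionSize: Int, parallelismLevel: Int): Int =
    if (parallelismLevel > 1) collectionSize / (8 * parallelismLevel) else collectionSize

  def defaultShouldSplitFurther(remaining: Int, collectionSize: Int, parallelismLevel: Int): Boolean =
    remaining > approxThreshold(collectionSize, parallelismLevel)

  // e.g. a 1,000,000-element collection on 8 threads keeps splitting until a
  // splitter holds roughly 1000000 / (8 * 8) = 15625 elements or fewer.
}
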
src/library/scala/collection/parallel/RemainsIterator.scala

Lines changed: 17 additions & 10 deletions
@@ -28,6 +28,11 @@ private[collection] trait RemainsIterator[+T] extends Iterator[T] {
    *  This method doesn't change the state of the iterator.
    */
   def remaining: Int
+
+  /** For most collections, this is a cheap operation.
+   *  Exceptions can override this method.
+   */
+  def isRemainingCheap = true
 }
 
 
@@ -112,7 +117,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
 
   def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = {
     //val cb = pbf(repr)
-    cb.sizeHint(remaining)
+    if (isRemainingCheap) cb.sizeHint(remaining)
     while (hasNext) cb += f(next)
     cb
   }
@@ -137,7 +142,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
   }
 
   def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](b: Bld): Bld = {
-    b.sizeHint(remaining)
+    if (isRemainingCheap) b.sizeHint(remaining)
     while (hasNext) b += next
     b
   }
@@ -179,7 +184,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
 
   def drop2combiner[U >: T, This](n: Int, cb: Combiner[U, This]): Combiner[U, This] = {
     drop(n)
-    cb.sizeHint(remaining)
+    if (isRemainingCheap) cb.sizeHint(remaining)
     while (hasNext) cb += next
     cb
   }
@@ -197,7 +202,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
 
   def splitAt2combiners[U >: T, This](at: Int, before: Combiner[U, This], after: Combiner[U, This]) = {
     before.sizeHint(at)
-    after.sizeHint(remaining - at)
+    if (isRemainingCheap) after.sizeHint(remaining - at)
     var left = at
     while (left > 0) {
       before += next
@@ -223,7 +228,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
       val curr = next
       if (p(curr)) before += curr
       else {
-        after.sizeHint(remaining + 1)
+        if (isRemainingCheap) after.sizeHint(remaining + 1)
         after += curr
         isBefore = false
       }
@@ -263,15 +268,15 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
   }
 
   def zip2combiner[U >: T, S, That](otherpit: RemainsIterator[S], cb: Combiner[(U, S), That]): Combiner[(U, S), That] = {
-    cb.sizeHint(remaining min otherpit.remaining)
+    if (isRemainingCheap && otherpit.isRemainingCheap) cb.sizeHint(remaining min otherpit.remaining)
     while (hasNext && otherpit.hasNext) {
       cb += ((next, otherpit.next))
     }
     cb
   }
 
   def zipAll2combiner[U >: T, S, That](that: RemainsIterator[S], thiselem: U, thatelem: S, cb: Combiner[(U, S), That]): Combiner[(U, S), That] = {
-    cb.sizeHint(remaining max that.remaining)
+    if (isRemainingCheap && that.isRemainingCheap) cb.sizeHint(remaining max that.remaining)
     while (this.hasNext && that.hasNext) cb += ((this.next, that.next))
     while (this.hasNext) cb += ((this.next, thatelem))
     while (that.hasNext) cb += ((thiselem, that.next))
@@ -330,7 +335,7 @@ private[collection] trait AugmentedSeqIterator[+T] extends AugmentedIterableIter
   /* transformers */
 
   def reverse2combiner[U >: T, This](cb: Combiner[U, This]): Combiner[U, This] = {
-    cb.sizeHint(remaining)
+    if (isRemainingCheap) cb.sizeHint(remaining)
     var lst = List[T]()
     while (hasNext) lst ::= next
     while (lst != Nil) {
@@ -342,7 +347,7 @@ private[collection] trait AugmentedSeqIterator[+T] extends AugmentedIterableIter
 
   def reverseMap2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = {
     //val cb = cbf(repr)
-    cb.sizeHint(remaining)
+    if (isRemainingCheap) cb.sizeHint(remaining)
     var lst = List[S]()
     while (hasNext) lst ::= f(next)
     while (lst != Nil) {
@@ -354,7 +359,7 @@ private[collection] trait AugmentedSeqIterator[+T] extends AugmentedIterableIter
 
   def updated2combiner[U >: T, That](index: Int, elem: U, cb: Combiner[U, That]): Combiner[U, That] = {
     //val cb = cbf(repr)
-    cb.sizeHint(remaining)
+    if (isRemainingCheap) cb.sizeHint(remaining)
     var j = 0
     while (hasNext) {
       if (j == index) {
@@ -395,6 +400,8 @@ self =>
     pits
   }
 
+  def shouldSplitFurther[S](coll: ParIterable[S], parallelismLevel: Int) = remaining > thresholdFromSize(coll.size, parallelismLevel)
+
   /** The number of elements this iterator has yet to traverse. This method
    *  doesn't change the state of the iterator.
    *

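The `isRemainingCheap` guard only skips an optimization: `sizeHint` pre-sizes the combiner, so omitting it costs at most a few buffer resizes, whereas evaluating an O(n) `remaining` just to produce the hint would serialize a full traversal before the copy even starts. A standalone sketch of the pattern (hypothetical helper, not library code):

import scala.collection.mutable.ArrayBuffer

object SizeHintSketch {
  // `hint` is by-name, so an expensive `remaining` is never evaluated when the
  // producer declares it is not cheap -- mirroring the guarded sizeHint calls above.
  def fill(hintIsCheap: Boolean, hint: => Int, elems: Iterator[Int]): ArrayBuffer[Int] = {
    val buf = new ArrayBuffer[Int]
    if (hintIsCheap) buf.sizeHint(hint)
    while (elems.hasNext) buf += elems.next()
    buf
  }

  def main(args: Array[String]): Unit = {
    val withHint    = fill(hintIsCheap = true,  hint = 100, elems = Iterator.range(0, 100))
    val withoutHint = fill(hintIsCheap = false, hint = sys.error("never evaluated"), elems = Iterator.range(0, 100))
    assert(withHint == withoutHint)  // same contents either way; only the allocation pattern differs
  }
}
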
src/library/scala/collection/parallel/mutable/ParCtrie.scala

Lines changed: 14 additions & 6 deletions
@@ -27,7 +27,7 @@ import scala.collection.mutable.CtrieIterator
  * @author Aleksandar Prokopec
  * @since 2.10
  */
-final class ParCtrie[K, V] private[mutable] (private val ctrie: Ctrie[K, V])
+final class ParCtrie[K, V] private[collection] (private val ctrie: Ctrie[K, V])
 extends ParMap[K, V]
    with GenericParMapTemplate[K, V, ParCtrie]
    with ParMapLike[K, V, ParCtrie[K, V], Ctrie[K, V]]
@@ -45,7 +45,7 @@ extends ParMap[K, V]
 
   override def seq = ctrie
 
-  def splitter = new ParCtrieSplitter(ctrie.readOnlySnapshot().asInstanceOf[Ctrie[K, V]], true)
+  def splitter = new ParCtrieSplitter(0, ctrie.readOnlySnapshot().asInstanceOf[Ctrie[K, V]], true)
 
   override def size = ctrie.size
 
@@ -76,15 +76,21 @@ extends ParMap[K, V]
 }
 
 
-private[collection] class ParCtrieSplitter[K, V](ct: Ctrie[K, V], mustInit: Boolean)
-extends CtrieIterator[K, V](ct, mustInit)
+private[collection] class ParCtrieSplitter[K, V](lev: Int, ct: Ctrie[K, V], mustInit: Boolean)
+extends CtrieIterator[K, V](lev, ct, mustInit)
    with IterableSplitter[(K, V)]
 {
   // only evaluated if `remaining` is invoked (which is not used by most tasks)
-  lazy val totalsize = ct.iterator.size // TODO improve to lazily compute sizes
+  //lazy val totalsize = ct.iterator.size /* TODO improve to lazily compute sizes */
+  def totalsize: Int = throw new UnsupportedOperationException
   var iterated = 0
 
-  protected override def newIterator(_ct: Ctrie[K, V], _mustInit: Boolean) = new ParCtrieSplitter[K, V](_ct, _mustInit)
+  protected override def newIterator(_lev: Int, _ct: Ctrie[K, V], _mustInit: Boolean) = new ParCtrieSplitter[K, V](_lev, _ct, _mustInit)
+
+  override def shouldSplitFurther[S](coll: collection.parallel.ParIterable[S], parallelismLevel: Int) = {
+    val maxsplits = 3 + Integer.highestOneBit(parallelismLevel)
+    level < maxsplits
+  }
 
   def dup = null // TODO necessary for views
 
@@ -95,6 +101,8 @@ extends CtrieIterator[K, V](ct, mustInit)
 
   def split: Seq[IterableSplitter[(K, V)]] = subdivide().asInstanceOf[Seq[IterableSplitter[(K, V)]]]
 
+  override def isRemainingCheap = false
+
   def remaining: Int = totalsize - iterated
 }
 

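Since each call to `split` at most doubles the number of splitters and every child's `level` is one greater than its parent's, capping the depth at `maxsplits` bounds the number of leaf splitters by roughly 2^maxsplits, independent of the trie's (expensive to compute) size. Note that `Integer.highestOneBit` returns the value of the highest set bit, not its index. A quick back-of-the-envelope check:

object SplitBoundSketch {
  def main(args: Array[String]): Unit = {
    for (p <- Seq(1, 2, 4, 8)) {
      val maxsplits = 3 + Integer.highestOneBit(p)   // 4, 5, 7, 11 for p = 1, 2, 4, 8
      println("parallelism " + p + ": maxsplits = " + maxsplits +
              ", at most " + (1L << maxsplits) + " leaf splitters")
    }
  }
}
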
test/benchmarking/ParCtrie-map.scala

Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+
+
+
+import collection.parallel.mutable.ParCtrie
+
+
+
+object Map extends testing.Benchmark {
+  val length = sys.props("length").toInt
+  val par = sys.props("par").toInt
+  val parctrie = ParCtrie((0 until length) zip (0 until length): _*)
+
+  collection.parallel.ForkJoinTasks.defaultForkJoinPool.setParallelism(par)
+
+  def run = {
+    parctrie map {
+      kv => kv
+    }
+  }
+}
+

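The new benchmark follows the conventions of the existing files in test/benchmarking: the `testing.Benchmark` harness supplies the `main` method and timing loop around `run`, while the element count and fork/join parallelism are taken from the `length` and `par` system properties read at the top of the object.
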
test/benchmarking/TreeSetInsert.scala

Lines changed: 2 additions & 0 deletions
@@ -33,6 +33,7 @@ object JavaUtilTS extends testing.Benchmark {
   }
 }
 
+
 object MutableTS extends testing.Benchmark {
   val length = sys.props("length").toInt
   var data: Array[Dummy] = (0 until length) map { a => new Dummy(a) } toArray
@@ -50,6 +51,7 @@ object MutableTS extends testing.Benchmark {
   }
 }
 
+
 object ImmutableTS extends testing.Benchmark {
   val length = sys.props("length").toInt
   var data: Array[Dummy] = (0 until length) map { a => new Dummy(a) } toArray
