Skip to content

Commit 826ca50

Browse files
Merge pull request #105 from dengziming/ISSUE-104-mapresult
ISSUE-104: remove unnecessary mapResult
Merge commit 826ca50 (2 parents: 6c26580 + 3a2883d)

File tree

1 file changed: +36 additions, −54 deletions

core/src/main/scala/scala/collection/parallel/ParIterableLike.scala

Lines changed: 36 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ self =>
359359
* if this $coll is empty.
360360
*/
361361
def reduce[U >: T](op: (U, U) => U): U = {
362-
tasksupport.executeAndWaitResult(new Reduce(op, splitter) mapResult { _.get })
362+
tasksupport.executeAndWaitResult(new Reduce(op, splitter)).get
363363
}
364364

365365
/** Optionally reduces the elements of this sequence using the specified associative binary operator.
@@ -464,11 +464,11 @@ self =>
464464
}
465465

466466
def min[U >: T](implicit ord: Ordering[U]): T = {
467-
tasksupport.executeAndWaitResult(new Min(ord, splitter) mapResult { _.get }).asInstanceOf[T]
467+
tasksupport.executeAndWaitResult(new Min(ord, splitter)).get.asInstanceOf[T]
468468
}
469469

470470
def max[U >: T](implicit ord: Ordering[U]): T = {
471-
tasksupport.executeAndWaitResult(new Max(ord, splitter) mapResult { _.get }).asInstanceOf[T]
471+
tasksupport.executeAndWaitResult(new Max(ord, splitter)).get.asInstanceOf[T]
472472
}
473473

474474
def maxBy[S](f: T => S)(implicit cmp: Ordering[S]): T = {
@@ -484,15 +484,15 @@ self =>
484484
}
485485

486486
def map[S](f: T => S): CC[S] = {
487-
tasksupport.executeAndWaitResult(new Map[S, CC[S]](f, combinerFactory(() => companion.newCombiner[S]), splitter) mapResult { _.resultWithTaskSupport })
487+
tasksupport.executeAndWaitResult(new Map[S, CC[S]](f, combinerFactory(() => companion.newCombiner[S]), splitter)).resultWithTaskSupport
488488
}
489489

490490
def collect[S](pf: PartialFunction[T, S]): CC[S] = {
491-
tasksupport.executeAndWaitResult(new Collect[S, CC[S]](pf, combinerFactory(() => companion.newCombiner[S]), splitter) mapResult { _.resultWithTaskSupport })
491+
tasksupport.executeAndWaitResult(new Collect[S, CC[S]](pf, combinerFactory(() => companion.newCombiner[S]), splitter)).resultWithTaskSupport
492492
}
493493

494494
def flatMap[S](f: T => IterableOnce[S]): CC[S] = {
495-
tasksupport.executeAndWaitResult(new FlatMap[S, CC[S]](f, combinerFactory(() => companion.newCombiner[S]), splitter) mapResult { _.resultWithTaskSupport })
495+
tasksupport.executeAndWaitResult(new FlatMap[S, CC[S]](f, combinerFactory(() => companion.newCombiner[S]), splitter)).resultWithTaskSupport
496496
}
497497

498498
/** Tests whether a predicate holds for all elements of this $coll.
@@ -572,11 +572,11 @@ self =>
572572
def withFilter(pred: T => Boolean): Repr = filter(pred)
573573

574574
def filter(pred: T => Boolean): Repr = {
575-
tasksupport.executeAndWaitResult(new Filter(pred, combinerFactory, splitter) mapResult { _.resultWithTaskSupport })
575+
tasksupport.executeAndWaitResult(new Filter(pred, combinerFactory, splitter)).resultWithTaskSupport
576576
}
577577

578578
def filterNot(pred: T => Boolean): Repr = {
579-
tasksupport.executeAndWaitResult(new FilterNot(pred, combinerFactory, splitter) mapResult { _.resultWithTaskSupport })
579+
tasksupport.executeAndWaitResult(new FilterNot(pred, combinerFactory, splitter)).resultWithTaskSupport
580580
}
581581

582582
def ++[U >: T](that: IterableOnce[U]): CC[U] = that match {
@@ -588,10 +588,8 @@ self =>
588588
val othtask = new other.Copy(cfactory, other.splitter)
589589
tasksupport.executeAndWaitResult(othtask)
590590
}
591-
val task = (copythis parallel copythat) { _ combine _ } mapResult {
592-
_.resultWithTaskSupport
593-
}
594-
tasksupport.executeAndWaitResult(task)
591+
val task = (copythis parallel copythat) { _ combine _ }
592+
tasksupport.executeAndWaitResult(task).resultWithTaskSupport
595593
case _ =>
596594
// println("case parallel builder, `that` not parallel")
597595
val copythis = new Copy(combinerFactory(() => companion.newCombiner[U]), splitter)
@@ -600,15 +598,13 @@ self =>
600598
cb ++= that
601599
cb
602600
}
603-
tasksupport.executeAndWaitResult((copythis parallel copythat) { _ combine _ } mapResult { _.resultWithTaskSupport })
601+
tasksupport.executeAndWaitResult((copythis parallel copythat) { _ combine _ }).resultWithTaskSupport
604602
}
605603

606604
def partition(pred: T => Boolean): (Repr, Repr) = {
607-
tasksupport.executeAndWaitResult(
608-
new Partition(pred, combinerFactory, combinerFactory, splitter) mapResult {
609-
p => (p._1.resultWithTaskSupport, p._2.resultWithTaskSupport)
610-
}
611-
)
605+
val result = tasksupport.executeAndWaitResult(
606+
new Partition(pred, combinerFactory, combinerFactory, splitter))
607+
(result._1.resultWithTaskSupport, result._2.resultWithTaskSupport)
612608
}
613609

614610
def groupBy[K](f: T => K): immutable.ParMap[K, Repr] = {
@@ -621,9 +617,7 @@ self =>
621617
def take(n: Int): Repr = {
622618
val actualn = if (size > n) n else size
623619
if (actualn < MIN_FOR_COPY) take_sequential(actualn)
624-
else tasksupport.executeAndWaitResult(new Take(actualn, combinerFactory, splitter) mapResult {
625-
_.resultWithTaskSupport
626-
})
620+
else tasksupport.executeAndWaitResult(new Take(actualn, combinerFactory, splitter)).resultWithTaskSupport
627621
}
628622

629623
private def take_sequential(n: Int) = {
@@ -641,7 +635,7 @@ self =>
641635
def drop(n: Int): Repr = {
642636
val actualn = if (size > n) n else size
643637
if ((size - actualn) < MIN_FOR_COPY) drop_sequential(actualn)
644-
else tasksupport.executeAndWaitResult(new Drop(actualn, combinerFactory, splitter) mapResult { _.resultWithTaskSupport })
638+
else tasksupport.executeAndWaitResult(new Drop(actualn, combinerFactory, splitter)).resultWithTaskSupport
645639
}
646640

647641
private def drop_sequential(n: Int) = {
@@ -656,7 +650,7 @@ self =>
656650
val from = unc_from min size max 0
657651
val until = unc_until min size max from
658652
if ((until - from) <= MIN_FOR_COPY) slice_sequential(from, until)
659-
else tasksupport.executeAndWaitResult(new Slice(from, until, combinerFactory, splitter) mapResult { _.resultWithTaskSupport })
653+
else tasksupport.executeAndWaitResult(new Slice(from, until, combinerFactory, splitter)).resultWithTaskSupport
660654
}
661655

662656
private def slice_sequential(from: Int, until: Int): Repr = {
@@ -671,11 +665,9 @@ self =>
671665
}
672666

673667
def splitAt(n: Int): (Repr, Repr) = {
674-
tasksupport.executeAndWaitResult(
675-
new SplitAt(n, combinerFactory, combinerFactory, splitter) mapResult {
676-
p => (p._1.resultWithTaskSupport, p._2.resultWithTaskSupport)
677-
}
678-
)
668+
val result = tasksupport.executeAndWaitResult(
669+
new SplitAt(n, combinerFactory, combinerFactory, splitter))
670+
(result._1.resultWithTaskSupport, result._2.resultWithTaskSupport)
679671
}
680672

681673
/** Computes a prefix scan of the elements of the collection.
@@ -689,9 +681,7 @@ self =>
689681
*/
690682
def scan[U >: T](z: U)(op: (U, U) => U): CC[U] = {
691683
if (size > 0) tasksupport.executeAndWaitResult(new CreateScanTree(0, size, z, op, splitter) mapResult {
692-
tree => tasksupport.executeAndWaitResult(new FromScanTree(tree, z, op, combinerFactory(() => companion.newCombiner[U])) mapResult {
693-
cb => cb.resultWithTaskSupport
694-
})
684+
tree => tasksupport.executeAndWaitResult(new FromScanTree(tree, z, op, combinerFactory(() => companion.newCombiner[U]))).resultWithTaskSupport
695685
}) else setTaskSupport((companion.newCombiner[U] += z).result(), tasksupport)
696686
}
697687

@@ -711,15 +701,12 @@ self =>
711701
val cbf = combinerFactory
712702
if (cbf.doesShareCombiners) {
713703
val parseqspan = toSeq.takeWhile(pred)
714-
tasksupport.executeAndWaitResult(new Copy(combinerFactory, parseqspan.splitter) mapResult {
715-
_.resultWithTaskSupport
716-
})
704+
tasksupport.executeAndWaitResult(new Copy(combinerFactory, parseqspan.splitter)).resultWithTaskSupport
717705
} else {
718706
val cntx = new DefaultSignalling with AtomicIndexFlag
719707
cntx.setIndexFlag(Int.MaxValue)
720-
tasksupport.executeAndWaitResult(new TakeWhile(0, pred, combinerFactory, splitter assign cntx) mapResult {
721-
_._1.resultWithTaskSupport
722-
})
708+
val result = tasksupport.executeAndWaitResult(new TakeWhile(0, pred, combinerFactory, splitter assign cntx))
709+
result._1.resultWithTaskSupport
723710
}
724711
}
725712

@@ -736,18 +723,18 @@ self =>
736723
val cbf = combinerFactory
737724
if (cbf.doesShareCombiners) {
738725
val (xs, ys) = toSeq.span(pred)
739-
val copyxs = new Copy(combinerFactory, xs.splitter) mapResult { _.resultWithTaskSupport }
740-
val copyys = new Copy(combinerFactory, ys.splitter) mapResult { _.resultWithTaskSupport }
726+
val copyxs = new Copy(combinerFactory, xs.splitter)
727+
val copyys = new Copy(combinerFactory, ys.splitter)
741728
val copyall = (copyxs parallel copyys) {
742729
(xr, yr) => (xr, yr)
743730
}
744-
tasksupport.executeAndWaitResult(copyall)
731+
val result = tasksupport.executeAndWaitResult(copyall)
732+
(result._1.resultWithTaskSupport, result._2.resultWithTaskSupport)
745733
} else {
746734
val cntx = new DefaultSignalling with AtomicIndexFlag
747735
cntx.setIndexFlag(Int.MaxValue)
748-
tasksupport.executeAndWaitResult(new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx) mapResult {
749-
p => (p._1.resultWithTaskSupport, p._2.resultWithTaskSupport)
750-
})
736+
val result = tasksupport.executeAndWaitResult(new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx))
737+
(result._1.resultWithTaskSupport, result._2.resultWithTaskSupport)
751738
}
752739
}
753740

@@ -765,10 +752,7 @@ self =>
765752
val cntx = new DefaultSignalling with AtomicIndexFlag
766753
cntx.setIndexFlag(Int.MaxValue)
767754
tasksupport.executeAndWaitResult(
768-
new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx) mapResult {
769-
_._2.resultWithTaskSupport
770-
}
771-
)
755+
new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx))._2.resultWithTaskSupport
772756
}
773757

774758
def copyToArray[U >: T](xs: Array[U]): Unit = copyToArray(xs, 0)
@@ -785,7 +769,7 @@ self =>
785769
def zip[U >: T, S](that: ParIterable[S]): CC[(U, S)] = {
786770
that match {
787771
case thatseq: ParSeq[S] =>
788-
tasksupport.executeAndWaitResult(new Zip(combinerFactory(() => companion.newCombiner[(U, S)]), splitter, thatseq.splitter) mapResult { _.resultWithTaskSupport })
772+
tasksupport.executeAndWaitResult(new Zip(combinerFactory(() => companion.newCombiner[(U, S)]), splitter, thatseq.splitter)).resultWithTaskSupport
789773
case _ =>
790774
(companion.newBuilder[(U, S)] ++= setTaskSupport(seq.zip(that.seq), tasksupport)).result()
791775
}
@@ -806,18 +790,16 @@ self =>
806790
def zipAll[S, U >: T](that: ParIterable[S], thisElem: U, thatElem: S): CC[(U, S)] = {
807791
val thatseq = that.toSeq
808792
tasksupport.executeAndWaitResult(
809-
new ZipAll(size max thatseq.length, thisElem, thatElem, combinerFactory(() => companion.newCombiner[(U, S)]), splitter, thatseq.splitter) mapResult {
810-
_.resultWithTaskSupport
811-
}
812-
)
793+
new ZipAll(size max thatseq.length, thisElem, thatElem, combinerFactory(() => companion.newCombiner[(U, S)]), splitter, thatseq.splitter)
794+
).resultWithTaskSupport
813795
}
814796

815797
protected def toParCollection[U >: T, That](cbf: () => Combiner[U, That]): That = {
816-
tasksupport.executeAndWaitResult(new ToParCollection(combinerFactory(cbf), splitter) mapResult { _.resultWithTaskSupport })
798+
tasksupport.executeAndWaitResult(new ToParCollection(combinerFactory(cbf), splitter)).resultWithTaskSupport
817799
}
818800

819801
protected def toParMap[K, V, That](cbf: () => Combiner[(K, V), That])(implicit ev: T <:< (K, V)): That = {
820-
tasksupport.executeAndWaitResult(new ToParMap(combinerFactory(cbf), splitter)(ev) mapResult { _.resultWithTaskSupport })
802+
tasksupport.executeAndWaitResult(new ToParMap(combinerFactory(cbf), splitter)(ev)).resultWithTaskSupport
821803
}
822804

823805
def toArray[U >: T: ClassTag]: Array[U] = {

0 commit comments

Comments
 (0)