Skip to content

Commit d4ece1a

Browse files
Merge pull request ReactiveX#573 from akarnokd/OpeningClosingRemoved
Removed Opening and Closing historical artifacts.
2 parents caf6a82 + 5487f4e commit d4ece1a

File tree

12 files changed

+106
-269
lines changed

12 files changed

+106
-269
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ trait Observable[+T]
7575
import scala.collection.Seq
7676
import scala.concurrent.duration.{Duration, TimeUnit}
7777
import rx.util.functions._
78-
import rx.lang.scala.util._
7978
import rx.lang.scala.observables.BlockingObservable
8079
import ImplicitFunctionConversions._
8180
import JavaConversions._
@@ -302,45 +301,44 @@ trait Observable[+T]
302301
* Creates an Observable which produces buffers of collected values.
303302
*
304303
* This Observable produces connected non-overlapping buffers. The current buffer is
305-
* emitted and replaced with a new buffer when the Observable produced by the specified function produces a [[rx.lang.scala.util.Closing]] object. The function will then
304+
* emitted and replaced with a new buffer when the Observable produced by the specified function produces an object. The function will then
306305
* be used to create a new Observable to listen for the end of the next buffer.
307306
*
308307
* @param closings
309308
* The function which is used to produce an [[rx.lang.scala.Observable]] for every buffer created.
310-
* When this [[rx.lang.scala.Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated buffer
309+
* When this [[rx.lang.scala.Observable]] produces an object, the associated buffer
311310
* is emitted and replaced with a new one.
312311
* @return
313312
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping buffers, which are emitted
314-
* when the current [[rx.lang.scala.Observable]] created with the function argument produces a [[rx.lang.scala.util.Closing]] object.
313+
* when the current [[rx.lang.scala.Observable]] created with the function argument produces an object.
315314
*/
316-
def buffer(closings: () => Observable[Closing]) : Observable[Seq[T]] = {
315+
def buffer[Closing](closings: () => Observable[_ <: Closing]) : Observable[Seq[T]] = {
317316
val f: Func0[_ <: rx.Observable[_ <: Closing]] = closings().asJavaObservable
318-
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(f)
317+
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Closing](f)
319318
Observable.jObsOfListToScObsOfSeq(jObs.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
320319
}
321-
322320
/**
323321
* Creates an Observable which produces buffers of collected values.
324322
*
325323
* This Observable produces buffers. Buffers are created when the specified `openings`
326-
* Observable produces a [[rx.lang.scala.util.Opening]] object. Additionally the function argument
324+
* Observable produces an object. Additionally the function argument
327325
* is used to create an Observable which produces [[rx.lang.scala.util.Closing]] objects. When this
328326
* Observable produces such an object, the associated buffer is emitted.
329327
*
330328
* @param openings
331-
* The [[rx.lang.scala.Observable]] which, when it produces a [[rx.lang.scala.util.Opening]] object, will cause
329+
* The [[rx.lang.scala.Observable]] which, when it produces an object, will cause
332330
* another buffer to be created.
333331
* @param closings
334332
* The function which is used to produce an [[rx.lang.scala.Observable]] for every buffer created.
335-
* When this [[rx.lang.scala.Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated buffer
333+
* When this [[rx.lang.scala.Observable]] produces an object, the associated buffer
336334
* is emitted.
337335
* @return
338336
* An [[rx.lang.scala.Observable]] which produces buffers which are created and emitted when the specified [[rx.lang.scala.Observable]]s publish certain objects.
339337
*/
340-
def buffer(openings: Observable[Opening], closings: Opening => Observable[Closing]): Observable[Seq[T]] = {
338+
def buffer[Opening, Closing](openings: Observable[Opening], closings: Opening => Observable[Closing]): Observable[Seq[T]] = {
341339
val opening: rx.Observable[_ <: Opening] = openings.asJavaObservable
342-
val closing: Func1[Opening, _ <: rx.Observable[_ <: Closing]] = (o: Opening) => closings(o).asJavaObservable
343-
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(opening, closing)
340+
val closing: Func1[_ >: Opening, _ <: rx.Observable[_ <: Closing]] = (o: Opening) => closings(o).asJavaObservable
341+
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Opening, Closing](opening, closing)
344342
Observable.jObsOfListToScObsOfSeq(jObs.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
345343
}
346344

@@ -512,22 +510,22 @@ trait Observable[+T]
512510
/**
513511
* Creates an Observable which produces windows of collected values. This Observable produces connected
514512
* non-overlapping windows. The current window is emitted and replaced with a new window when the
515-
* Observable produced by the specified function produces a [[rx.lang.scala.util.Closing]] object.
513+
* Observable produced by the specified function produces an object.
516514
* The function will then be used to create a new Observable to listen for the end of the next
517515
* window.
518516
*
519517
* @param closings
520518
* The function which is used to produce an [[rx.lang.scala.Observable]] for every window created.
521-
* When this [[rx.lang.scala.Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated window
519+
* When this [[rx.lang.scala.Observable]] produces an object, the associated window
522520
* is emitted and replaced with a new one.
523521
* @return
524522
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows, which are emitted
525-
* when the current [[rx.lang.scala.Observable]] created with the function argument produces a [[rx.lang.scala.util.Closing]] object.
523+
* when the current [[rx.lang.scala.Observable]] created with the function argument produces an object.
526524
*/
527-
def window(closings: () => Observable[Closing]): Observable[Observable[T]] = {
525+
def window[Closing](closings: () => Observable[Closing]): Observable[Observable[T]] = {
528526
val func : Func0[_ <: rx.Observable[_ <: Closing]] = closings().asJavaObservable
529-
val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window(func)
530-
val o2 = toScalaObservable[rx.Observable[_]](o1).map((x: rx.Observable[_]) => {
527+
val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window[Closing](func)
528+
val o2 = Observable[rx.Observable[_]](o1).map((x: rx.Observable[_]) => {
531529
val x2 = x.asInstanceOf[rx.Observable[_ <: T]]
532530
toScalaObservable[T](x2)
533531
})
@@ -536,23 +534,23 @@ trait Observable[+T]
536534

537535
/**
538536
* Creates an Observable which produces windows of collected values. This Observable produces windows.
539-
* Chunks are created when the specified `openings` Observable produces a [[rx.lang.scala.util.Opening]] object.
537+
* Chunks are created when the specified `openings` Observable produces an object.
540538
* Additionally the `closings` argument is used to create an Observable which produces [[rx.lang.scala.util.Closing]] objects.
541539
* When this Observable produces such an object, the associated window is emitted.
542540
*
543541
* @param openings
544-
* The [[rx.lang.scala.Observable]] which when it produces a [[rx.lang.scala.util.Opening]] object, will cause
542+
* The [[rx.lang.scala.Observable]] which when it produces an object, will cause
545543
* another window to be created.
546544
* @param closings
547545
* The function which is used to produce an [[rx.lang.scala.Observable]] for every window created.
548-
* When this [[rx.lang.scala.Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated window
546+
* When this [[rx.lang.scala.Observable]] produces an object, the associated window
549547
* is emitted.
550548
* @return
551549
* An [[rx.lang.scala.Observable]] which produces windows which are created and emitted when the specified [[rx.lang.scala.Observable]]s publish certain objects.
552550
*/
553-
def window(openings: Observable[Opening], closings: Opening => Observable[Closing]) = {
551+
def window[Opening, Closing](openings: Observable[Opening], closings: Opening => Observable[Closing]) = {
554552
Observable.jObsOfJObsToScObsOfScObs(
555-
asJavaObservable.window(openings.asJavaObservable, (op: Opening) => closings(op).asJavaObservable))
553+
asJavaObservable.window[Opening, Closing](openings.asJavaObservable, (op: Opening) => closings(op).asJavaObservable))
556554
: Observable[Observable[T]] // SI-7818
557555
}
558556

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/util/package.scala

Lines changed: 0 additions & 49 deletions
This file was deleted.

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,7 @@
111111
import rx.subjects.ReplaySubject;
112112
import rx.subjects.Subject;
113113
import rx.subscriptions.Subscriptions;
114-
import rx.util.Closing;
115114
import rx.util.OnErrorNotImplementedException;
116-
import rx.util.Opening;
117115
import rx.util.Range;
118116
import rx.util.TimeInterval;
119117
import rx.util.Timestamped;
@@ -2812,31 +2810,31 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> combineLates
28122810
return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, o7, o8, o9, combineFunction));
28132811
}
28142812

2815-
/**
2813+
/**
28162814
* Creates an Observable that produces buffers of collected items.
28172815
* <p>
28182816
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/buffer1.png">
28192817
* <p>
28202818
* This Observable produces connected, non-overlapping buffers. The current
28212819
* buffer is emitted and replaced with a new buffer when the Observable
2822-
* produced by the specified <code>bufferClosingSelector</code> produces a
2823-
* {@link rx.util.Closing} object. The <code>bufferClosingSelector</code>
2820+
* produced by the specified <code>bufferClosingSelector</code> produces an
2821+
* object. The <code>bufferClosingSelector</code>
28242822
* will then be used to create a new Observable to listen for the end of
28252823
* the next buffer.
28262824
*
28272825
* @param bufferClosingSelector the {@link Func0} which is used to produce
28282826
* an {@link Observable} for every buffer
28292827
* created. When this {@link Observable}
2830-
* produces a {@link rx.util.Closing} object,
2828+
* produces an object,
28312829
* the associated buffer is emitted and
28322830
* replaced with a new one.
28332831
* @return an {@link Observable} which produces connected, non-overlapping
28342832
* buffers, which are emitted when the current {@link Observable}
2835-
* created with the {@link Func0} argument produces a
2836-
* {@link rx.util.Closing} object
2833+
* created with the {@link Func0} argument produces an
2834+
* object
28372835
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#buffer">RxJava Wiki: buffer()</a>
28382836
*/
2839-
public Observable<List<T>> buffer(Func0<? extends Observable<? extends Closing>> bufferClosingSelector) {
2837+
public <TClosing> Observable<List<T>> buffer(Func0<? extends Observable<? extends TClosing>> bufferClosingSelector) {
28402838
return create(OperationBuffer.buffer(this, bufferClosingSelector));
28412839
}
28422840

@@ -2846,26 +2844,26 @@ public Observable<List<T>> buffer(Func0<? extends Observable<? extends Closing>>
28462844
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/buffer2.png">
28472845
* <p>
28482846
* This Observable produces buffers. Buffers are created when the specified
2849-
* <code>bufferOpenings</code> Observable produces a {@link rx.util.Opening}
2847+
* <code>bufferOpenings</code> Observable produces an
28502848
* object. Additionally the <code>bufferClosingSelector</code> argument is
2851-
* used to create an Observable which produces {@link rx.util.Closing}
2849+
* used to create an Observable which produces
28522850
* objects. When this Observable produces such an object, the associated
28532851
* buffer is emitted.
28542852
*
2855-
* @param bufferOpenings the {@link Observable} that, when it produces a
2856-
* {@link rx.util.Opening} object, will cause another
2853+
* @param bufferOpenings the {@link Observable} that, when it produces an
2854+
* object, will cause another
28572855
* buffer to be created
28582856
* @param bufferClosingSelector the {@link Func1} that is used to produce
28592857
* an {@link Observable} for every buffer
28602858
* created. When this {@link Observable}
2861-
* produces a {@link rx.util.Closing} object,
2859+
* produces an object,
28622860
* the associated buffer is emitted.
28632861
* @return an {@link Observable} that produces buffers that are created and
28642862
* emitted when the specified {@link Observable}s publish certain
28652863
* objects
28662864
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#buffer">RxJava Wiki: buffer()</a>
28672865
*/
2868-
public Observable<List<T>> buffer(Observable<? extends Opening> bufferOpenings, Func1<Opening, ? extends Observable<? extends Closing>> bufferClosingSelector) {
2866+
public <TOpening, TClosing> Observable<List<T>> buffer(Observable<? extends TOpening> bufferOpenings, Func1<? super TOpening, ? extends Observable<? extends TClosing>> bufferClosingSelector) {
28692867
return create(OperationBuffer.buffer(this, bufferOpenings, bufferClosingSelector));
28702868
}
28712869

@@ -3062,54 +3060,54 @@ public Observable<List<T>> buffer(long timespan, long timeshift, TimeUnit unit,
30623060
* Creates an Observable that produces windows of collected items. This
30633061
* Observable produces connected, non-overlapping windows. The current
30643062
* window is emitted and replaced with a new window when the Observable
3065-
* produced by the specified <code>closingSelector</code> produces a
3066-
* {@link rx.util.Closing} object. The <code>closingSelector</code> will
3063+
* produced by the specified <code>closingSelector</code> produces an
3064+
* object. The <code>closingSelector</code> will
30673065
* then be used to create a new Observable to listen for the end of the next
30683066
* window.
30693067
* <p>
30703068
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/window1.png">
30713069
*
30723070
* @param closingSelector the {@link Func0} used to produce an
30733071
* {@link Observable} for every window created. When this
3074-
* {@link Observable} emits a {@link rx.util.Closing} object, the
3072+
* {@link Observable} emits an object, the
30753073
* associated window is emitted and replaced with a new one.
30763074
* @return an {@link Observable} that produces connected, non-overlapping
30773075
* windows, which are emitted when the current {@link Observable}
3078-
* created with the <code>closingSelector</code> argument emits a
3079-
* {@link rx.util.Closing} object.
3076+
* created with the <code>closingSelector</code> argument emits an
3077+
* object.
30803078
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#window">RxJava Wiki: window()</a>
30813079
*/
3082-
public Observable<Observable<T>> window(Func0<? extends Observable<? extends Closing>> closingSelector) {
3080+
public <TClosing> Observable<Observable<T>> window(Func0<? extends Observable<? extends TClosing>> closingSelector) {
30833081
return create(OperationWindow.window(this, closingSelector));
30843082
}
30853083

30863084
/**
30873085
* Creates an Observable that produces windows of collected items. This
30883086
* Observable produces windows. Chunks are created when the
3089-
* <code>windowOpenings</code> Observable produces a {@link rx.util.Opening}
3087+
* <code>windowOpenings</code> Observable produces an
30903088
* object. Additionally the <code>closingSelector</code> argument creates an
3091-
* Observable that produces {@link rx.util.Closing} objects. When this
3089+
* Observable that produces objects. When this
30923090
* Observable produces such an object, the associated window is emitted.
30933091
* <p>
30943092
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/window2.png">
30953093
*
3096-
* @param windowOpenings the {@link Observable} that, when it produces a
3097-
* {@link rx.util.Opening} object, causes another
3094+
* @param windowOpenings the {@link Observable} that, when it produces an
3095+
* object, causes another
30983096
* window to be created
30993097
* @param closingSelector the {@link Func1} that produces an
31003098
* {@link Observable} for every window created. When
3101-
* this {@link Observable} produces a
3102-
* {@link rx.util.Closing} object, the associated
3099+
* this {@link Observable} produces an
3100+
* object, the associated
31033101
* window is emitted.
31043102
* @return an {@link Observable} that produces windows that are created and
31053103
* emitted when the specified {@link Observable}s publish certain
31063104
* objects
31073105
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#window">RxJava Wiki: window()</a>
31083106
*/
3109-
public Observable<Observable<T>> window(Observable<? extends Opening> windowOpenings, Func1<Opening, ? extends Observable<? extends Closing>> closingSelector) {
3107+
public <TOpening, TClosing> Observable<Observable<T>> window(Observable<? extends TOpening> windowOpenings, Func1<? super TOpening, ? extends Observable<? extends TClosing>> closingSelector) {
31103108
return create(OperationWindow.window(this, windowOpenings, closingSelector));
31113109
}
3112-
3110+
31133111
/**
31143112
* Creates an Observable that produces windows of collected items. This
31153113
* Observable produces connected, non-overlapping windows, each containing

0 commit comments

Comments
 (0)