From 8233b04a09f43cee59ed1f618c3457f22d3b2e75 Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Wed, 29 May 2013 23:32:59 +0200 Subject: [PATCH 1/7] Initial implementation and wiring of the buffer operators. --- rxjava-core/src/main/java/rx/Observable.java | 428 ++++++- .../java/rx/operators/OperationBuffer.java | 1013 +++++++++++++++++ 2 files changed, 1437 insertions(+), 4 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationBuffer.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index e824b0f210..88f2590751 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -15,10 +15,6 @@ */ package rx; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; - import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -41,6 +37,9 @@ import rx.operators.AtomicObservableSubscription; import rx.operators.AtomicObserver; import rx.operators.OperationAll; +import rx.operators.OperationBuffer; +import rx.operators.OperationBuffer.BufferClosing; +import rx.operators.OperationBuffer.BufferOpening; import rx.operators.OperationCache; import rx.operators.OperationConcat; import rx.operators.OperationDefer; @@ -95,6 +94,17 @@ import rx.util.functions.Function; import rx.util.functions.FunctionLanguageAdaptor; import rx.util.functions.Functions; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; /** * The Observable interface that implements the Reactive Pattern. @@ -591,6 +601,221 @@ public Subscription call(Observer observer) { } + /** + * Creates an Observable which produces buffers of collected values. This Observable produces connected + * non-overlapping buffers. The current buffer is emitted and replaced with a new buffer when the + * Observable produced by the specified {@link Func0} produces a {@link BufferClosing} object. The + * {@link Func0} will then be used to create a new Observable to listen for the end of the next buffer. + * + * @param source + * The source {@link Observable} which produces values. + * @param bufferClosingSelector + * The {@link Func0} which is used to produce an {@link Observable} for every buffer created. + * When this {@link Observable} produces a {@link BufferClosing} object, the associated buffer + * is emitted and replaced with a new one. + * @return + * An {@link Observable} which produces connected non-overlapping buffers, which are emitted + * when the current {@link Observable} created with the {@link Func0} argument produces a + * {@link BufferClosing} object. + */ + public static Observable> buffer(Observable source, Func0> bufferClosingSelector) { + return create(OperationBuffer.buffer(source, bufferClosingSelector)); + } + + /** + * Creates an Observable which produces buffers of collected values. This Observable produces buffers. + * Buffers are created when the specified "bufferOpenings" Observable produces a {@link BufferOpening} object. + * Additionally the {@link Func0} argument is used to create an Observable which produces {@link BufferClosing} + * objects. When this Observable produces such an object, the associated buffer is emitted. + * + * @param source + * The source {@link Observable} which produces values. + * @param bufferOpenings + * The {@link Observable} which when it produces a {@link BufferOpening} object, will cause + * another buffer to be created. + * @param bufferClosingSelector + * The {@link Func0} which is used to produce an {@link Observable} for every buffer created. + * When this {@link Observable} produces a {@link BufferClosing} object, the associated buffer + * is emitted. + * @return + * An {@link Observable} which produces buffers which are created and emitted when the specified + * {@link Observable}s publish certain objects. + */ + public static Observable> buffer(Observable source, Observable bufferOpenings, Func1> bufferClosingSelector) { + return create(OperationBuffer.buffer(source, bufferOpenings, bufferClosingSelector)); + } + + /** + * Creates an Observable which produces buffers of collected values. This Observable produces connected + * non-overlapping buffers, each containing "count" elements. When the source Observable completes or + * encounters an error, the current buffer is emitted, and the event is propagated. + * + * @param source + * The source {@link Observable} which produces values. + * @param count + * The maximum size of each buffer before it should be emitted. + * @return + * An {@link Observable} which produces connected non-overlapping buffers containing at most + * "count" produced values. + */ + public static Observable> buffer(Observable source, int count) { + return create(OperationBuffer.buffer(source, count)); + } + + /** + * Creates an Observable which produces buffers of collected values. This Observable produces buffers every + * "skip" values, each containing "count" elements. When the source Observable completes or encounters an error, + * the current buffer is emitted and the event is propagated. + * + * @param source + * The source {@link Observable} which produces values. + * @param count + * The maximum size of each buffer before it should be emitted. + * @param skip + * How many produced values need to be skipped before starting a new buffer. Note that when "skip" and + * "count" are equals that this is the same operation as {@link Observable#buffer(Observable, int)}. + * @return + * An {@link Observable} which produces buffers every "skipped" values containing at most + * "count" produced values. + */ + public static Observable> buffer(Observable source, int count, int skip) { + return create(OperationBuffer.buffer(source, count, skip)); + } + + /** + * Creates an Observable which produces buffers of collected values. This Observable produces connected + * non-overlapping buffers, each of a fixed duration specified by the "timespan" argument. When the source + * Observable completes or encounters an error, the current buffer is emitted and the event is propagated. + * + * @param source + * The source {@link Observable} which produces values. + * @param timespan + * The period of time each buffer is collecting values before it should be emitted, and + * replaced with a new buffer. + * @param unit + * The unit of time which applies to the "timespan" argument. + * @return + * An {@link Observable} which produces connected non-overlapping buffers with a fixed duration. + */ + public static Observable> buffer(Observable source, long timespan, TimeUnit unit) { + return create(OperationBuffer.buffer(source, timespan, unit)); + } + + /** + * Creates an Observable which produces buffers of collected values. This Observable produces connected + * non-overlapping buffers, each of a fixed duration specified by the "timespan" argument. When the source + * Observable completes or encounters an error, the current buffer is emitted and the event is propagated. + * + * @param source + * The source {@link Observable} which produces values. + * @param timespan + * The period of time each buffer is collecting values before it should be emitted, and + * replaced with a new buffer. + * @param unit + * The unit of time which applies to the "timespan" argument. + * @param scheduler + * The {@link Scheduler} to use when determining the end and start of a buffer. + * @return + * An {@link Observable} which produces connected non-overlapping buffers with a fixed duration. + */ + public static Observable> buffer(Observable source, long timespan, TimeUnit unit, Scheduler scheduler) { + return create(OperationBuffer.buffer(source, timespan, unit, scheduler)); + } + + /** + * Creates an Observable which produces buffers of collected values. This Observable produces connected + * non-overlapping buffers, each of a fixed duration specified by the "timespan" argument or a maximum size + * specified by the "count" argument (which ever is reached first). When the source Observable completes + * or encounters an error, the current buffer is emitted and the event is propagated. + * + * @param source + * The source {@link Observable} which produces values. + * @param timespan + * The period of time each buffer is collecting values before it should be emitted, and + * replaced with a new buffer. + * @param unit + * The unit of time which applies to the "timespan" argument. + * @param count + * The maximum size of each buffer before it should be emitted. + * @return + * An {@link Observable} which produces connected non-overlapping buffers which are emitted after + * a fixed duration or when the buffer has reached maximum capacity (which ever occurs first). + */ + public static Observable> buffer(Observable source, long timespan, TimeUnit unit, int count) { + return create(OperationBuffer.buffer(source, timespan, unit, count)); + } + + /** + * Creates an Observable which produces buffers of collected values. This Observable produces connected + * non-overlapping buffers, each of a fixed duration specified by the "timespan" argument or a maximum size + * specified by the "count" argument (which ever is reached first). When the source Observable completes + * or encounters an error, the current buffer is emitted and the event is propagated. + * + * @param source + * The source {@link Observable} which produces values. + * @param timespan + * The period of time each buffer is collecting values before it should be emitted, and + * replaced with a new buffer. + * @param unit + * The unit of time which applies to the "timespan" argument. + * @param count + * The maximum size of each buffer before it should be emitted. + * @param scheduler + * The {@link Scheduler} to use when determining the end and start of a buffer. + * @return + * An {@link Observable} which produces connected non-overlapping buffers which are emitted after + * a fixed duration or when the buffer has reached maximum capacity (which ever occurs first). + */ + public static Observable> buffer(Observable source, long timespan, TimeUnit unit, int count, Scheduler scheduler) { + return create(OperationBuffer.buffer(source, timespan, unit, count, scheduler)); + } + + /** + * Creates an Observable which produces buffers of collected values. This Observable starts a new buffer + * periodically, which is determined by the "timeshift" argument. Each buffer is emitted after a fixed timespan + * specified by the "timespan" argument. When the source Observable completes or encounters an error, the + * current buffer is emitted and the event is propagated. + * + * @param source + * The source {@link Observable} which produces values. + * @param timespan + * The period of time each buffer is collecting values before it should be emitted. + * @param timeshift + * The period of time after which a new buffer will be created. + * @param unit + * The unit of time which applies to the "timespan" and "timeshift" argument. + * @return + * An {@link Observable} which produces new buffers periodically, and these are emitted after + * a fixed timespan has elapsed. + */ + public static Observable> buffer(Observable source, long timespan, long timeshift, TimeUnit unit) { + return create(OperationBuffer.buffer(source, timespan, timeshift, unit)); + } + + /** + * Creates an Observable which produces buffers of collected values. This Observable starts a new buffer + * periodically, which is determined by the "timeshift" argument. Each buffer is emitted after a fixed timespan + * specified by the "timespan" argument. When the source Observable completes or encounters an error, the + * current buffer is emitted and the event is propagated. + * + * @param source + * The source {@link Observable} which produces values. + * @param timespan + * The period of time each buffer is collecting values before it should be emitted. + * @param timeshift + * The period of time after which a new buffer will be created. + * @param unit + * The unit of time which applies to the "timespan" and "timeshift" argument. + * @param scheduler + * The {@link Scheduler} to use when determining the end and start of a buffer. + * @return + * An {@link Observable} which produces new buffers periodically, and these are emitted after + * a fixed timespan has elapsed. + */ + public static Observable> buffer(Observable source, long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) { + return create(OperationBuffer.buffer(source, timespan, timeshift, unit, scheduler)); + } + /** * Creates an Observable that will execute the given function when a {@link Observer} subscribes to it. *

@@ -2324,6 +2549,201 @@ public R call(T0 t0, T1 t1, T2 t2, T3 t3) { }); } + /** + * Creates an Observable which produces buffers of collected values. This Observable produces connected + * non-overlapping buffers. The current buffer is emitted and replaced with a new buffer when the + * Observable produced by the specified {@link Func0} produces a {@link BufferClosing} object. The + * {@link Func0} will then be used to create a new Observable to listen for the end of the next buffer. + * + * @param bufferClosingSelector + * The {@link Func0} which is used to produce an {@link Observable} for every buffer created. + * When this {@link Observable} produces a {@link BufferClosing} object, the associated buffer + * is emitted and replaced with a new one. + * @return + * An {@link Observable} which produces connected non-overlapping buffers, which are emitted + * when the current {@link Observable} created with the {@link Func0} argument produces a + * {@link BufferClosing} object. + */ + public Observable> buffer(Func0> bufferClosingSelector) { + return buffer(this, bufferClosingSelector); + } + + /** + * Creates an Observable which produces buffers of collected values. This Observable produces buffers. + * Buffers are created when the specified "bufferOpenings" Observable produces a {@link BufferOpening} object. + * Additionally the {@link Func0} argument is used to create an Observable which produces {@link BufferClosing} + * objects. When this Observable produces such an object, the associated buffer is emitted. + * + * @param bufferOpenings + * The {@link Observable} which when it produces a {@link BufferOpening} object, will cause + * another buffer to be created. + * @param bufferClosingSelector + * The {@link Func0} which is used to produce an {@link Observable} for every buffer created. + * When this {@link Observable} produces a {@link BufferClosing} object, the associated buffer + * is emitted. + * @return + * An {@link Observable} which produces buffers which are created and emitted when the specified + * {@link Observable}s publish certain objects. + */ + public Observable> buffer(Observable bufferOpenings, Func1> bufferClosingSelector) { + return buffer(this, bufferOpenings, bufferClosingSelector); + } + + /** + * Creates an Observable which produces buffers of collected values. This Observable produces connected + * non-overlapping buffers, each containing "count" elements. When the source Observable completes or + * encounters an error, the current buffer is emitted, and the event is propagated. + * + * @param count + * The maximum size of each buffer before it should be emitted. + * @return + * An {@link Observable} which produces connected non-overlapping buffers containing at most + * "count" produced values. + */ + public Observable> buffer(int count) { + return buffer(this, count); + } + + /** + * Creates an Observable which produces buffers of collected values. This Observable produces buffers every + * "skip" values, each containing "count" elements. When the source Observable completes or encounters an error, + * the current buffer is emitted, and the event is propagated. + * + * @param count + * The maximum size of each buffer before it should be emitted. + * @param skip + * How many produced values need to be skipped before starting a new buffer. Note that when "skip" and + * "count" are equals that this is the same operation as {@link Observable#buffer(Observable, int)}. + * @return + * An {@link Observable} which produces buffers every "skipped" values containing at most + * "count" produced values. + */ + public Observable> buffer(int count, int skip) { + return buffer(this, count, skip); + } + + /** + * Creates an Observable which produces buffers of collected values. This Observable produces connected + * non-overlapping buffers, each of a fixed duration specified by the "timespan" argument. When the source + * Observable completes or encounters an error, the current buffer is emitted and the event is propagated. + * + * @param timespan + * The period of time each buffer is collecting values before it should be emitted, and + * replaced with a new buffer. + * @param unit + * The unit of time which applies to the "timespan" argument. + * @return + * An {@link Observable} which produces connected non-overlapping buffers with a fixed duration. + */ + public Observable> buffer(long timespan, TimeUnit unit) { + return buffer(this, timespan, unit); + } + + /** + * Creates an Observable which produces buffers of collected values. This Observable produces connected + * non-overlapping buffers, each of a fixed duration specified by the "timespan" argument. When the source + * Observable completes or encounters an error, the current buffer is emitted and the event is propagated. + * + * @param timespan + * The period of time each buffer is collecting values before it should be emitted, and + * replaced with a new buffer. + * @param unit + * The unit of time which applies to the "timespan" argument. + * @param scheduler + * The {@link Scheduler} to use when determining the end and start of a buffer. + * @return + * An {@link Observable} which produces connected non-overlapping buffers with a fixed duration. + */ + public Observable> buffer(long timespan, TimeUnit unit, Scheduler scheduler) { + return buffer(this, timespan, unit, scheduler); + } + + /** + * Creates an Observable which produces buffers of collected values. This Observable produces connected + * non-overlapping buffers, each of a fixed duration specified by the "timespan" argument or a maximum size + * specified by the "count" argument (which ever is reached first). When the source Observable completes + * or encounters an error, the current buffer is emitted and the event is propagated. + * + * @param timespan + * The period of time each buffer is collecting values before it should be emitted, and + * replaced with a new buffer. + * @param unit + * The unit of time which applies to the "timespan" argument. + * @param count + * The maximum size of each buffer before it should be emitted. + * @return + * An {@link Observable} which produces connected non-overlapping buffers which are emitted after + * a fixed duration or when the buffer has reached maximum capacity (which ever occurs first). + */ + public Observable> buffer(long timespan, TimeUnit unit, int count) { + return buffer(this, timespan, unit, count); + } + + /** + * Creates an Observable which produces buffers of collected values. This Observable produces connected + * non-overlapping buffers, each of a fixed duration specified by the "timespan" argument or a maximum size + * specified by the "count" argument (which ever is reached first). When the source Observable completes + * or encounters an error, the current buffer is emitted and the event is propagated. + * + * @param timespan + * The period of time each buffer is collecting values before it should be emitted, and + * replaced with a new buffer. + * @param unit + * The unit of time which applies to the "timespan" argument. + * @param count + * The maximum size of each buffer before it should be emitted. + * @param scheduler + * The {@link Scheduler} to use when determining the end and start of a buffer. + * @return + * An {@link Observable} which produces connected non-overlapping buffers which are emitted after + * a fixed duration or when the buffer has reached maximum capacity (which ever occurs first). + */ + public Observable> buffer(long timespan, TimeUnit unit, int count, Scheduler scheduler) { + return buffer(this, timespan, unit, count, scheduler); + } + + /** + * Creates an Observable which produces buffers of collected values. This Observable starts a new buffer + * periodically, which is determined by the "timeshift" argument. Each buffer is emitted after a fixed timespan + * specified by the "timespan" argument. When the source Observable completes or encounters an error, the + * current buffer is emitted and the event is propagated. + * + * @param timespan + * The period of time each buffer is collecting values before it should be emitted. + * @param timeshift + * The period of time after which a new buffer will be created. + * @param unit + * The unit of time which applies to the "timespan" and "timeshift" argument. + * @return + * An {@link Observable} which produces new buffers periodically, and these are emitted after + * a fixed timespan has elapsed. + */ + public Observable> buffer(long timespan, long timeshift, TimeUnit unit) { + return buffer(this, timespan, timeshift, unit); + } + + /** + * Creates an Observable which produces buffers of collected values. This Observable starts a new buffer + * periodically, which is determined by the "timeshift" argument. Each buffer is emitted after a fixed timespan + * specified by the "timespan" argument. When the source Observable completes or encounters an error, the + * current buffer is emitted and the event is propagated. + * + * @param timespan + * The period of time each buffer is collecting values before it should be emitted. + * @param timeshift + * The period of time after which a new buffer will be created. + * @param unit + * The unit of time which applies to the "timespan" and "timeshift" argument. + * @param scheduler + * The {@link Scheduler} to use when determining the end and start of a buffer. + * @return + * An {@link Observable} which produces new buffers periodically, and these are emitted after + * a fixed timespan has elapsed. + */ + public Observable> buffer(long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) { + return buffer(this, timespan, timeshift, unit, scheduler); + } + /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

diff --git a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java new file mode 100644 index 0000000000..9c9f6aeba1 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java @@ -0,0 +1,1013 @@ +package rx.operators; + +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; + +import rx.Observable; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.concurrency.TestScheduler; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + + +public final class OperationBuffer { + + /** + *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes + * values from the specified {@link Observable} source and stores them in a buffer until the {@link Observable} + * constructed using the {@link Func0} argument, produces a {@link BufferClosing} value. The buffer is then + * emitted, and a new buffer is created to replace it. A new {@link Observable} will be constructed using the + * provided {@link Func0} object, which will determine when this new buffer is emitted. When the source + * {@link Observable} completes or produces an error, the current buffer is emitted, and the event is propagated + * to all subscribed {@link Observer}s.

+ * + *

Note that this operation only produces non-overlapping buffers. At all times there is + * exactly one buffer actively storing values.

+ * + * @param source + * The {@link Observable} which produces values. + * @param bufferClosingSelector + * A {@link Func0} object which produces {@link Observable}s. These + * {@link Observable}s determine when a buffer is emitted and replaced by simply + * producing an {@link BufferClosing} object. + * @return + * the {@link Func1} object representing the specified buffer operation. + */ + public static Func1>, Subscription> buffer(final Observable source, final Func0> bufferClosingSelector) { + return new Func1>, Subscription>() { + @Override + public Subscription call(final Observer> observer) { + NonOverlappingBuffers buffers = new NonOverlappingBuffers(observer); + BufferCreator creator = new ObservableBasedSingleBufferCreator(buffers, bufferClosingSelector); + return source.subscribe(new BufferObserver(buffers, observer, creator)); + } + }; + } + + /** + *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes + * values from the specified {@link Observable} source and stores them in the currently active buffers. Initially + * there are no buffers active.

+ * + *

Buffers can be created by pushing a {@link BufferOpening} value to the "bufferOpenings" {@link Observable}. + * This creates a new buffer which will then start recording values which are produced by the "source" + * {@link Observable}. Additionally the "bufferClosingSelector" will be used to construct an {@link Observable} + * which can produce {@link BufferClosing} values. When it does so it will close this (and only this) newly created + * buffer. When the source {@link Observable} completes or produces an error, all buffers are emitted, and the + * event is propagated to all subscribed {@link Observer}s.

+ * + *

Note that when using this operation multiple overlapping buffers + * could be active at any one point.

+ * + * @param source + * The {@link Observable} which produces values. + * @param bufferOpenings + * An {@link Observable} which when it produces a {@link BufferOpening} value will + * create a new buffer which instantly starts recording the "source" {@link Observable}. + * @param bufferClosingSelector + * A {@link Func0} object which produces {@link Observable}s. These + * {@link Observable}s determine when a buffer is emitted and replaced by simply + * producing an {@link BufferClosing} object. + * @return + * the {@link Func1} object representing the specified buffer operation. + */ + public static Func1>, Subscription> buffer(final Observable source, final Observable bufferOpenings, final Func1> bufferClosingSelector) { + return new Func1>, Subscription>() { + @Override + public Subscription call(final Observer> observer) { + OverlappingBuffers buffers = new OverlappingBuffers(observer); + BufferCreator creator = new ObservableBasedMultiBufferCreator(buffers, bufferOpenings, bufferClosingSelector); + return source.subscribe(new BufferObserver(buffers, observer, creator)); + } + }; + } + + /** + *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes + * values from the specified {@link Observable} source and stores them in a buffer until the buffer contains + * a specified number of elements. The buffer is then emitted, and a new buffer is created to replace it. + * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and + * the event is propagated to all subscribed {@link Observer}s.

+ * + *

Note that this operation only produces non-overlapping buffers. At all times there is + * exactly one buffer actively storing values.

+ * + * @param source + * The {@link Observable} which produces values. + * @param count + * The number of elements a buffer should have before being emitted and replaced. + * @return + * the {@link Func1} object representing the specified buffer operation. + */ + public static Func1>, Subscription> buffer(Observable source, int count) { + return buffer(source, count, count); + } + + /** + *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes + * values from the specified {@link Observable} source and stores them in all active buffers until the buffer + * contains a specified number of elements. The buffer is then emitted. Buffers are created after a certain + * amount of values have been received. When the source {@link Observable} completes or produces an error, the + * currently active buffers are emitted, and the event is propagated to all subscribed {@link Observer}s.

+ * + *

Note that this operation can produce non-connected, connected non-overlapping, or overlapping + * buffers depending on the input parameters.

+ * + * @param source + * The {@link Observable} which produces values. + * @param count + * The number of elements a buffer should have before being emitted. + * @param skip + * The interval with which buffers have to be created. Note that when "skip" == "count" + * that this is the same as calling {@link OperationBuffer#buffer(Observable, int)}. + * If "skip" < "count", this buffer operation will produce overlapping buffers and if "skip" + * > "count" non-overlapping buffers will be created and some values will not be pushed + * into a buffer at all! + * @return + * the {@link Func1} object representing the specified buffer operation. + */ + public static Func1>, Subscription> buffer(final Observable source, final int count, final int skip) { + return new Func1>, Subscription>() { + @Override + public Subscription call(final Observer> observer) { + Buffers buffers = new SizeBasedBuffers(observer, count); + BufferCreator creator = new SkippingBufferCreator(buffers, skip); + return source.subscribe(new BufferObserver(buffers, observer, creator)); + } + }; + } + + /** + *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes + * values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer + * is emitted and replaced with a new buffer. How often this is done depends on the specified timespan. + * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and + * the event is propagated to all subscribed {@link Observer}s.

+ * + *

Note that this operation only produces non-overlapping buffers. At all times there is + * exactly one buffer actively storing values.

+ * + * @param source + * The {@link Observable} which produces values. + * @param timespan + * The amount of time all buffers must be actively collect values before being emitted. + * @param unit + * The {@link TimeUnit} defining the unit of time for the timespan. + * @return + * the {@link Func1} object representing the specified buffer operation. + */ + public static Func1>, Subscription> buffer(Observable source, long timespan, TimeUnit unit) { + return buffer(source, timespan, unit, Schedulers.newThread()); + } + + /** + *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes + * values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer + * is emitted and replaced with a new buffer. How often this is done depends on the specified timespan. + * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and + * the event is propagated to all subscribed {@link Observer}s.

+ * + *

Note that this operation only produces non-overlapping buffers. At all times there is + * exactly one buffer actively storing values.

+ * + * @param source + * The {@link Observable} which produces values. + * @param timespan + * The amount of time all buffers must be actively collect values before being emitted. + * @param unit + * The {@link TimeUnit} defining the unit of time for the timespan. + * @param scheduler + * The {@link Scheduler} to use for timing buffers. + * @return + * the {@link Func1} object representing the specified buffer operation. + */ + public static Func1>, Subscription> buffer(final Observable source, final long timespan, final TimeUnit unit, final Scheduler scheduler) { + return new Func1>, Subscription>() { + @Override + public Subscription call(final Observer> observer) { + NonOverlappingBuffers buffers = new NonOverlappingBuffers(observer); + BufferCreator creator = new TimeBasedBufferCreator(buffers, timespan, unit, scheduler); + return source.subscribe(new BufferObserver(buffers, observer, creator)); + } + }; + } + + /** + *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes + * values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer + * is emitted and replaced with a new buffer. How often this is done depends on the specified timespan. + * Additionally the buffer is automatically emitted once it reaches a specified number of elements. + * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and + * the event is propagated to all subscribed {@link Observer}s.

+ * + *

Note that this operation only produces non-overlapping buffers. At all times there is + * exactly one buffer actively storing values.

+ * + * @param source + * The {@link Observable} which produces values. + * @param timespan + * The amount of time all buffers must be actively collect values before being emitted. + * @param unit + * The {@link TimeUnit} defining the unit of time for the timespan. + * @param count + * The maximum size of the buffer. Once a buffer reaches this size, it is emitted. + * @return + * the {@link Func1} object representing the specified buffer operation. + */ + public static Func1>, Subscription> buffer(Observable source, long timespan, TimeUnit unit, int count) { + return buffer(source, timespan, unit, count, Schedulers.newThread()); + } + + /** + *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes + * values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer + * is emitted and replaced with a new buffer. How often this is done depends on the specified timespan. + * Additionally the buffer is automatically emitted once it reaches a specified number of elements. + * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and + * the event is propagated to all subscribed {@link Observer}s.

+ * + *

Note that this operation only produces non-overlapping buffers. At all times there is + * exactly one buffer actively storing values.

+ * + * @param source + * The {@link Observable} which produces values. + * @param timespan + * The amount of time all buffers must be actively collect values before being emitted. + * @param unit + * The {@link TimeUnit} defining the unit of time for the timespan. + * @param count + * The maximum size of the buffer. Once a buffer reaches this size, it is emitted. + * @param scheduler + * The {@link Scheduler} to use for timing buffers. + * @return + * the {@link Func1} object representing the specified buffer operation. + */ + public static Func1>, Subscription> buffer(final Observable source, final long timespan, final TimeUnit unit, final int count, final Scheduler scheduler) { + return new Func1>, Subscription>() { + @Override + public Subscription call(final Observer> observer) { + Buffers buffers = new TimeAndSizeBasedBuffers(observer, count, timespan, unit, scheduler); + BufferCreator creator = new SingleBufferCreator(buffers); + return source.subscribe(new BufferObserver(buffers, observer, creator)); + } + }; + } + + /** + *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes + * values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer + * is emitted and replaced with a new buffer. How often this is done depends on the specified timespan. + * The creation of buffers is also periodical. How often this is done depends on the specified timeshift. + * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and + * the event is propagated to all subscribed {@link Observer}s.

+ * + *

Note that this operation can produce non-connected, or overlapping buffers depending + * on the input parameters.

+ * + * @param source + * The {@link Observable} which produces values. + * @param timespan + * The amount of time all buffers must be actively collect values before being emitted. + * @param timeshift + * The amount of time between creating buffers. + * @param unit + * The {@link TimeUnit} defining the unit of time for the timespan. + * @return + * the {@link Func1} object representing the specified buffer operation. + */ + public static Func1>, Subscription> buffer(Observable source, long timespan, long timeshift, TimeUnit unit) { + return buffer(source, timespan, timeshift, unit, Schedulers.newThread()); + } + + /** + *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes + * values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer + * is emitted and replaced with a new buffer. How often this is done depends on the specified timespan. + * The creation of buffers is also periodical. How often this is done depends on the specified timeshift. + * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and + * the event is propagated to all subscribed {@link Observer}s.

+ * + *

Note that this operation can produce non-connected, or overlapping buffers depending + * on the input parameters.

+ * + * @param source + * The {@link Observable} which produces values. + * @param timespan + * The amount of time all buffers must be actively collect values before being emitted. + * @param timeshift + * The amount of time between creating buffers. + * @param unit + * The {@link TimeUnit} defining the unit of time for the timespan. + * @param scheduler + * The {@link Scheduler} to use for timing buffers. + * @return + * the {@link Func1} object representing the specified buffer operation. + */ + public static Func1>, Subscription> buffer(final Observable source, final long timespan, final long timeshift, final TimeUnit unit, final Scheduler scheduler) { + return new Func1>, Subscription>() { + @Override + public Subscription call(final Observer> observer) { + OverlappingBuffers buffers = new TimeBasedBuffers(observer, timespan, unit, scheduler); + BufferCreator creator = new TimeBasedBufferCreator(buffers, timeshift, unit, scheduler); + return source.subscribe(new BufferObserver(buffers, observer, creator)); + } + }; + } + + private static class BufferObserver implements Observer { + + private final Buffers buffers; + private final Observer> observer; + private final BufferCreator creator; + + public BufferObserver(Buffers buffers, Observer> observer, BufferCreator creator) { + this.observer = observer; + this.creator = creator; + this.buffers = buffers; + } + + @Override + public void onCompleted() { + creator.stop(); + buffers.emitAllBuffers(); + observer.onCompleted(); + } + + @Override + public void onError(Exception e) { + creator.stop(); + buffers.emitAllBuffers(); + observer.onError(e); + e.printStackTrace(); + } + + @Override + public void onNext(T args) { + creator.onValuePushed(); + buffers.pushValue(args); + } + } + + private interface BufferCreator { + void onValuePushed(); + void stop(); + } + + private static class SingleBufferCreator implements BufferCreator { + + public SingleBufferCreator(Buffers buffers) { + buffers.createBuffer(); + } + + @Override + public void onValuePushed() { + // Do nothing. + } + + @Override + public void stop() { + // Do nothing. + } + } + + private static class ObservableBasedSingleBufferCreator implements BufferCreator { + + private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + private final Func0> bufferClosingSelector; + private final NonOverlappingBuffers buffers; + + public ObservableBasedSingleBufferCreator(NonOverlappingBuffers buffers, Func0> bufferClosingSelector) { + this.buffers = buffers; + this.bufferClosingSelector = bufferClosingSelector; + + buffers.createBuffer(); + listenForBufferEnd(); + } + + private void listenForBufferEnd() { + Observable closingObservable = bufferClosingSelector.call(); + closingObservable.subscribe(new Action1() { + @Override + public void call(BufferClosing closing) { + buffers.emitAndReplaceBuffer(); + listenForBufferEnd(); + } + }); + } + + @Override + public void onValuePushed() { + // Ignore value pushes. + } + + @Override + public void stop() { + subscription.unsubscribe(); + } + } + + private static class ObservableBasedMultiBufferCreator implements BufferCreator { + + private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + + public ObservableBasedMultiBufferCreator(final Buffers buffers, Observable bufferOpenings, final Func1> bufferClosingSelector) { + subscription.wrap(bufferOpenings.subscribe(new Action1() { + @Override + public void call(BufferOpening opening) { + final Buffer buffer = buffers.createBuffer(); + Observable closingObservable = bufferClosingSelector.call(opening); + + closingObservable.subscribe(new Action1() { + @Override + public void call(BufferClosing closing) { + buffers.emitBuffer(buffer); + } + }); + } + })); + } + + @Override + public void onValuePushed() { + // Ignore value pushes. + } + + @Override + public void stop() { + subscription.unsubscribe(); + } + } + + private static class TimeBasedBufferCreator implements BufferCreator { + + private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + + public TimeBasedBufferCreator(final NonOverlappingBuffers buffers, long time, TimeUnit unit, Scheduler scheduler) { + this.subscription.wrap(scheduler.schedulePeriodically(new Action0() { + @Override + public void call() { + buffers.emitAndReplaceBuffer(); + } + }, 0, time, unit)); + } + + public TimeBasedBufferCreator(final OverlappingBuffers buffers, long time, TimeUnit unit, Scheduler scheduler) { + this.subscription.wrap(scheduler.schedulePeriodically(new Action0() { + @Override + public void call() { + buffers.createBuffer(); + } + }, 0, time, unit)); + } + + @Override + public void onValuePushed() { + // Do nothing: buffers are created periodically. + } + + @Override + public void stop() { + subscription.unsubscribe(); + } + } + + private static class SkippingBufferCreator implements BufferCreator { + + private final AtomicInteger skipped = new AtomicInteger(1); + private final Buffers buffers; + private final int skip; + + public SkippingBufferCreator(Buffers buffers, int skip) { + this.buffers = buffers; + this.skip = skip; + } + + @Override + public void onValuePushed() { + if (skipped.decrementAndGet() == 0) { + skipped.set(skip); + buffers.createBuffer(); + } + } + + @Override + public void stop() { + // Nothing to stop: we're not using a Scheduler. + } + } + + private static class NonOverlappingBuffers extends Buffers { + + private final Object lock = new Object(); + + public NonOverlappingBuffers(Observer> observer) { + super(observer); + } + + public Buffer emitAndReplaceBuffer() { + synchronized (lock) { + emitBuffer(getBuffer()); + return createBuffer(); + } + } + + @Override + public void pushValue(T value) { + synchronized (lock) { + super.pushValue(value); + } + } + } + + private static class OverlappingBuffers extends Buffers { + + public OverlappingBuffers(Observer> observer) { + super(observer); + } + } + + private static class TimeAndSizeBasedBuffers extends Buffers { + + private final ConcurrentMap, Subscription> subscriptions = new ConcurrentHashMap, Subscription>(); + + private final Scheduler scheduler; + private final long maxTime; + private final TimeUnit unit; + private final int maxSize; + + public TimeAndSizeBasedBuffers(Observer> observer, int maxSize, long maxTime, TimeUnit unit, Scheduler scheduler) { + super(observer); + this.maxSize = maxSize; + this.maxTime = maxTime; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public Buffer createBuffer() { + final Buffer buffer = super.createBuffer(); + subscriptions.put(buffer, scheduler.schedule(new Action0() { + @Override + public void call() { + emitBuffer(buffer); + } + }, maxTime, unit)); + return buffer; + } + + @Override + public void emitBuffer(Buffer buffer) { + Subscription subscription = subscriptions.remove(buffer); + if (subscription == null) { + // Buffer was already emitted. + return; + } + + subscription.unsubscribe(); + super.emitBuffer(buffer); + createBuffer(); + } + + @Override + public void pushValue(T value) { + super.pushValue(value); + + Buffer buffer; + while ((buffer = getBuffer()) != null) { + if (buffer.contents.size() >= maxSize) { + emitBuffer(buffer); + } else { + // Buffer is not at full capacity yet, and neither will remaining buffers be so we can terminate. + break; + } + } + } + } + + private static class TimeBasedBuffers extends OverlappingBuffers { + + private final ConcurrentMap, Subscription> subscriptions = new ConcurrentHashMap, Subscription>(); + + private final Scheduler scheduler; + private final long time; + private final TimeUnit unit; + + public TimeBasedBuffers(Observer> observer, long time, TimeUnit unit, Scheduler scheduler) { + super(observer); + this.time = time; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public Buffer createBuffer() { + final Buffer buffer = super.createBuffer(); + subscriptions.put(buffer, scheduler.schedule(new Action0() { + @Override + public void call() { + emitBuffer(buffer); + } + }, time, unit)); + return buffer; + } + + @Override + public void emitBuffer(Buffer buffer) { + subscriptions.remove(buffer); + super.emitBuffer(buffer); + } + } + + private static class SizeBasedBuffers extends Buffers { + + private final int size; + + public SizeBasedBuffers(Observer> observer, int size) { + super(observer); + this.size = size; + } + + @Override + public void pushValue(T value) { + super.pushValue(value); + + Buffer buffer; + while ((buffer = getBuffer()) != null) { + if (buffer.contents.size() >= size) { + emitBuffer(buffer); + } else { + // Buffer is not at full capacity yet, and neither will remaining buffers be so we can terminate. + break; + } + } + } + } + + private static class Buffers { + + private final Queue> buffers = new ConcurrentLinkedQueue>(); + private final Observer> observer; + + public Buffers(Observer> observer) { + this.observer = observer; + } + + public Buffer createBuffer() { + Buffer buffer = new Buffer(); + buffers.add(buffer); + return buffer; + } + + public void emitAllBuffers() { + Buffer buffer; + while ((buffer = buffers.poll()) != null) { + observer.onNext(buffer.getContents()); + } + } + + public void emitBuffer(Buffer buffer) { + if (!buffers.remove(buffer)) { + // Concurrency issue: Buffer is already emitted! + return; + } + observer.onNext(buffer.getContents()); + } + + public Buffer getBuffer() { + return buffers.peek(); + } + + public void pushValue(T value) { + List> copy = new ArrayList>(buffers); + for (Buffer buffer : copy) { + buffer.pushValue(value); + } + } + } + + private static class Buffer { + private final List contents = new ArrayList(); + + public void pushValue(T value) { + contents.add(value); + } + + public List getContents() { + return contents; + } + } + + public interface BufferOpening { + // Tagging interface for objects which can open buffers. + } + + public interface BufferClosing { + // Tagging interface for objects which can close buffers. + } + + public static class BufferOpenings implements BufferOpening { + // Simple class implementing BufferOpening for conveniance. + } + public static class BufferClosings implements BufferClosing { + // Simple class implementing BufferClosing for conveniance. + } + + public static class UnitTest { + + private Observer> observer; + private TestScheduler scheduler; + + @Before + @SuppressWarnings("unchecked") + public void before() { + observer = Mockito.mock(Observer.class); + scheduler = new TestScheduler(); + } + + @Test + public void testComplete() { + Observable source = Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + observer.onCompleted(); + return Subscriptions.empty(); + } + }); + + Observable> buffered = Observable.create(buffer(source, 3, 3)); + buffered.subscribe(observer); + + Mockito.verify(observer, Mockito.never()).onNext(Mockito.anyListOf(String.class)); + Mockito.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); + Mockito.verify(observer, Mockito.times(1)).onCompleted(); + } + + @Test + public void testSkipAndCountOverlappingBuffers() { + Observable source = Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + observer.onNext("one"); + observer.onNext("two"); + observer.onNext("three"); + observer.onNext("four"); + observer.onNext("five"); + return Subscriptions.empty(); + } + }); + + Observable> buffered = Observable.create(buffer(source, 3, 1)); + buffered.subscribe(observer); + + InOrder inOrder = Mockito.inOrder(observer); + inOrder.verify(observer, Mockito.times(1)).onNext(list("one", "two", "three")); + inOrder.verify(observer, Mockito.times(1)).onNext(list("two", "three", "four")); + inOrder.verify(observer, Mockito.times(1)).onNext(list("three", "four", "five")); + inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); + inOrder.verify(observer, Mockito.never()).onCompleted(); + } + + @Test + public void testSkipAndCountGaplessBuffers() { + Observable source = Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + observer.onNext("one"); + observer.onNext("two"); + observer.onNext("three"); + observer.onNext("four"); + observer.onNext("five"); + observer.onCompleted(); + return Subscriptions.empty(); + } + }); + + Observable> buffered = Observable.create(buffer(source, 3, 3)); + buffered.subscribe(observer); + + InOrder inOrder = Mockito.inOrder(observer); + inOrder.verify(observer, Mockito.times(1)).onNext(list("one", "two", "three")); + inOrder.verify(observer, Mockito.times(1)).onNext(list("four", "five")); + inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); + inOrder.verify(observer, Mockito.times(1)).onCompleted(); + } + + @Test + public void testSkipAndCountBuffersWithGaps() { + Observable source = Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + observer.onNext("one"); + observer.onNext("two"); + observer.onNext("three"); + observer.onNext("four"); + observer.onNext("five"); + observer.onCompleted(); + return Subscriptions.empty(); + } + }); + + Observable> buffered = Observable.create(buffer(source, 2, 3)); + buffered.subscribe(observer); + + InOrder inOrder = Mockito.inOrder(observer); + inOrder.verify(observer, Mockito.times(1)).onNext(list("one", "two")); + inOrder.verify(observer, Mockito.times(1)).onNext(list("four", "five")); + inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); + inOrder.verify(observer, Mockito.times(1)).onCompleted(); + } + + @Test + public void testTimedAndCount() { + Observable source = Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + push(observer, "one", 10); + push(observer, "two", 90); + push(observer, "three", 110); + push(observer, "four", 190); + push(observer, "five", 210); + complete(observer, 250); + return Subscriptions.empty(); + } + }); + + Observable> buffered = Observable.create(buffer(source, 100, TimeUnit.MILLISECONDS, 2, scheduler)); + buffered.subscribe(observer); + + InOrder inOrder = Mockito.inOrder(observer); + scheduler.advanceTimeTo(100, TimeUnit.MILLISECONDS); + inOrder.verify(observer, Mockito.times(1)).onNext(list("one", "two")); + + scheduler.advanceTimeTo(200, TimeUnit.MILLISECONDS); + inOrder.verify(observer, Mockito.times(1)).onNext(list("three", "four")); + + scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS); + inOrder.verify(observer, Mockito.times(1)).onNext(list("five")); + inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); + inOrder.verify(observer, Mockito.times(1)).onCompleted(); + } + + @Test + public void testTimed() { + Observable source = Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + push(observer, "one", 98); + push(observer, "two", 99); + push(observer, "three", 100); + push(observer, "four", 101); + push(observer, "five", 102); + complete(observer, 150); + return Subscriptions.empty(); + } + }); + + Observable> buffered = Observable.create(buffer(source, 100, TimeUnit.MILLISECONDS, scheduler)); + buffered.subscribe(observer); + + InOrder inOrder = Mockito.inOrder(observer); + scheduler.advanceTimeTo(101, TimeUnit.MILLISECONDS); + inOrder.verify(observer, Mockito.times(1)).onNext(list("one", "two", "three")); + + scheduler.advanceTimeTo(201, TimeUnit.MILLISECONDS); + inOrder.verify(observer, Mockito.times(1)).onNext(list("four", "five")); + inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); + inOrder.verify(observer, Mockito.times(1)).onCompleted(); + } + + @Test + public void testObservableBasedOpenerAndCloser() { + Observable source = Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + push(observer, "one", 10); + push(observer, "two", 60); + push(observer, "three", 110); + push(observer, "four", 160); + push(observer, "five", 210); + complete(observer, 500); + return Subscriptions.empty(); + } + }); + + Observable openings = Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + push(observer, new BufferOpenings(), 50); + push(observer, new BufferOpenings(), 200); + complete(observer, 250); + return Subscriptions.empty(); + } + }); + + Func1> closer = new Func1>() { + @Override + public Observable call(BufferOpening opening) { + return Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + push(observer, new BufferClosings(), 100); + complete(observer, 101); + return Subscriptions.empty(); + } + }); + } + }; + + Observable> buffered = Observable.create(buffer(source, openings, closer)); + buffered.subscribe(observer); + + InOrder inOrder = Mockito.inOrder(observer); + scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS); + inOrder.verify(observer, Mockito.times(1)).onNext(list("two", "three")); + inOrder.verify(observer, Mockito.times(1)).onNext(list("five")); + inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); + inOrder.verify(observer, Mockito.times(1)).onCompleted(); + } + + @Test + public void testObservableBasedCloser() { + Observable source = Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + push(observer, "one", 10); + push(observer, "two", 60); + push(observer, "three", 110); + push(observer, "four", 160); + push(observer, "five", 210); + complete(observer, 500); + return Subscriptions.empty(); + } + }); + + Func0> closer = new Func0>() { + @Override + public Observable call() { + return Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + push(observer, new BufferClosings(), 100); + complete(observer, 101); + return Subscriptions.empty(); + } + }); + } + }; + + Observable> buffered = Observable.create(buffer(source, closer)); + buffered.subscribe(observer); + + InOrder inOrder = Mockito.inOrder(observer); + scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS); + inOrder.verify(observer, Mockito.times(1)).onNext(list("one", "two")); + inOrder.verify(observer, Mockito.times(1)).onNext(list("three", "four")); + inOrder.verify(observer, Mockito.times(1)).onNext(list("five")); + inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); + inOrder.verify(observer, Mockito.times(1)).onCompleted(); + } + + private List list(String... args) { + List list = new ArrayList(); + for (String arg : args) { + list.add(arg); + } + return list; + } + + private void push(final Observer observer, final T value, int delay) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onNext(value); + } + }, delay, TimeUnit.MILLISECONDS); + } + + private void complete(final Observer observer, int delay) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onCompleted(); + } + }, delay, TimeUnit.MILLISECONDS); + } + } +} From bf01afa8d712740fd784cae6a968ae9b7fe2c027 Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Wed, 29 May 2013 23:53:00 +0200 Subject: [PATCH 2/7] Restored static imports in Observable. --- rxjava-core/src/main/java/rx/Observable.java | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 88f2590751..e35a276661 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -15,6 +15,10 @@ */ package rx; +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -94,17 +98,6 @@ import rx.util.functions.Function; import rx.util.functions.FunctionLanguageAdaptor; import rx.util.functions.Functions; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyString; /** * The Observable interface that implements the Reactive Pattern. From 2c6ae528859e5c63dd661b8c17e92341cf46b1af Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Wed, 29 May 2013 23:57:47 +0200 Subject: [PATCH 3/7] Added static create methods to BufferOpenings and BufferClosings. --- .../java/rx/operators/OperationBuffer.java | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java index 9c9f6aeba1..55b63c47f0 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java +++ b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java @@ -722,11 +722,25 @@ public interface BufferClosing { // Tagging interface for objects which can close buffers. } - public static class BufferOpenings implements BufferOpening { - // Simple class implementing BufferOpening for conveniance. + public static class BufferOpenings { + + public static BufferOpening create() { + return new BufferOpening() {}; + } + + private BufferOpenings() { + // Prevent instantation. + } } - public static class BufferClosings implements BufferClosing { - // Simple class implementing BufferClosing for conveniance. + public static class BufferClosings { + + public static BufferClosing create() { + return new BufferClosing() {}; + } + + private BufferClosings() { + // Prevent instantation. + } } public static class UnitTest { From 3ed9f0f35aa82e04798ff4f35a32207787d78074 Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Thu, 30 May 2013 00:02:02 +0200 Subject: [PATCH 4/7] Added license header to OperationBuffer. --- .../java/rx/operators/OperationBuffer.java | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java index 55b63c47f0..913a7abe78 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java +++ b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.operators; import java.util.ArrayList; @@ -925,8 +940,8 @@ public Subscription call(Observer observer) { Observable openings = Observable.create(new Func1, Subscription>() { @Override public Subscription call(Observer observer) { - push(observer, new BufferOpenings(), 50); - push(observer, new BufferOpenings(), 200); + push(observer, BufferOpenings.create(), 50); + push(observer, BufferOpenings.create(), 200); complete(observer, 250); return Subscriptions.empty(); } @@ -938,7 +953,7 @@ public Observable call(BufferOpening opening) { return Observable.create(new Func1, Subscription>() { @Override public Subscription call(Observer observer) { - push(observer, new BufferClosings(), 100); + push(observer, BufferClosings.create(), 100); complete(observer, 101); return Subscriptions.empty(); } @@ -978,7 +993,7 @@ public Observable call() { return Observable.create(new Func1, Subscription>() { @Override public Subscription call(Observer observer) { - push(observer, new BufferClosings(), 100); + push(observer, BufferClosings.create(), 100); complete(observer, 101); return Subscriptions.empty(); } From 6f500493949d7c1e938a1f3325e1033eef6e13b2 Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Sat, 8 Jun 2013 15:45:03 +0200 Subject: [PATCH 5/7] Changed default scheduler used in buffer operation and removed printStackTrace call --- .../src/main/java/rx/operators/OperationBuffer.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java index 913a7abe78..6335d8393a 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java +++ b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java @@ -189,7 +189,7 @@ public Subscription call(final Observer> observer) { * the {@link Func1} object representing the specified buffer operation. */ public static Func1>, Subscription> buffer(Observable source, long timespan, TimeUnit unit) { - return buffer(source, timespan, unit, Schedulers.newThread()); + return buffer(source, timespan, unit, Schedulers.threadPoolForComputation()); } /** @@ -247,7 +247,7 @@ public Subscription call(final Observer> observer) { * the {@link Func1} object representing the specified buffer operation. */ public static Func1>, Subscription> buffer(Observable source, long timespan, TimeUnit unit, int count) { - return buffer(source, timespan, unit, count, Schedulers.newThread()); + return buffer(source, timespan, unit, count, Schedulers.threadPoolForComputation()); } /** @@ -308,7 +308,7 @@ public Subscription call(final Observer> observer) { * the {@link Func1} object representing the specified buffer operation. */ public static Func1>, Subscription> buffer(Observable source, long timespan, long timeshift, TimeUnit unit) { - return buffer(source, timespan, timeshift, unit, Schedulers.newThread()); + return buffer(source, timespan, timeshift, unit, Schedulers.threadPoolForComputation()); } /** @@ -370,7 +370,6 @@ public void onError(Exception e) { creator.stop(); buffers.emitAllBuffers(); observer.onError(e); - e.printStackTrace(); } @Override From 81adc5c56ed93b418ffdc158f7db21a25b3402f5 Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Sun, 30 Jun 2013 13:23:31 +0200 Subject: [PATCH 6/7] Added JavaDoc and moved public helper classes of OperationBuffer class to rx.util package. --- rxjava-core/src/main/java/rx/Observable.java | 4 +- .../java/rx/operators/OperationBuffer.java | 180 +++++++++++++++--- .../src/main/java/rx/util/BufferClosing.java | 5 + .../src/main/java/rx/util/BufferClosings.java | 12 ++ .../src/main/java/rx/util/BufferOpening.java | 5 + .../src/main/java/rx/util/BufferOpenings.java | 12 ++ 6 files changed, 185 insertions(+), 33 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/util/BufferClosing.java create mode 100644 rxjava-core/src/main/java/rx/util/BufferClosings.java create mode 100644 rxjava-core/src/main/java/rx/util/BufferOpening.java create mode 100644 rxjava-core/src/main/java/rx/util/BufferOpenings.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index e35a276661..1b859c7184 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -42,8 +42,6 @@ import rx.operators.AtomicObserver; import rx.operators.OperationAll; import rx.operators.OperationBuffer; -import rx.operators.OperationBuffer.BufferClosing; -import rx.operators.OperationBuffer.BufferOpening; import rx.operators.OperationCache; import rx.operators.OperationConcat; import rx.operators.OperationDefer; @@ -85,6 +83,8 @@ import rx.subjects.Subject; import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; +import rx.util.BufferClosing; +import rx.util.BufferOpening; import rx.util.Range; import rx.util.Timestamped; import rx.util.functions.Action0; diff --git a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java index 6335d8393a..d41fe41904 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java +++ b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java @@ -36,6 +36,10 @@ import rx.concurrency.Schedulers; import rx.concurrency.TestScheduler; import rx.subscriptions.Subscriptions; +import rx.util.BufferClosing; +import rx.util.BufferClosings; +import rx.util.BufferOpening; +import rx.util.BufferOpenings; import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Func0; @@ -346,6 +350,15 @@ public Subscription call(final Observer> observer) { }; } + /** + * This {@link BufferObserver} object can be constructed using a {@link Buffers} object, + * a {@link Observer} object, and a {@link BufferCreator} object. The {@link BufferCreator} + * will manage the creation, and in some rare cases emission of internal {@link Buffer} objects + * in the specified {@link Buffers} object. Under normal circumstances the {@link Buffers} + * object specifies when a created {@link Buffer} is emitted. + * + * @param The type of object all internal {@link Buffer} objects record. + */ private static class BufferObserver implements Observer { private final Buffers buffers; @@ -379,11 +392,31 @@ public void onNext(T args) { } } + /** + * This interface defines a way which specifies when to create a new internal {@link Buffer} object. + * + * @param The type of object all internal {@link Buffer} objects record. + */ private interface BufferCreator { + /** + * Signifies a onNext event. + */ void onValuePushed(); + + /** + * Signifies a onCompleted or onError event. Should be used to clean up open + * subscriptions and other still running background tasks. + */ void stop(); } + /** + * This {@link BufferCreator} creates a new {@link Buffer} when it is initialized, but + * provides no additional functionality. This class should primarily be used when the + * internal {@link Buffer} is closed externally. + * + * @param The type of object all internal {@link Buffer} objects record. + */ private static class SingleBufferCreator implements BufferCreator { public SingleBufferCreator(Buffers buffers) { @@ -401,6 +434,13 @@ public void stop() { } } + /** + * This {@link BufferCreator} creates a new {@link Buffer} whenever it receives an + * object from the provided {@link Observable} created with the + * bufferClosingSelector {@link Func0}. + * + * @param The type of object all internal {@link Buffer} objects record. + */ private static class ObservableBasedSingleBufferCreator implements BufferCreator { private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); @@ -437,11 +477,19 @@ public void stop() { } } + /** + * This {@link BufferCreator} creates a new {@link Buffer} whenever it receives + * an object from the provided bufferOpenings {@link Observable}, and closes the corresponding + * {@link Buffer} object when it receives an object from the provided {@link Observable} created + * with the bufferClosingSelector {@link Func1}. + * + * @param The type of object all internal {@link Buffer} objects record. + */ private static class ObservableBasedMultiBufferCreator implements BufferCreator { private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); - public ObservableBasedMultiBufferCreator(final Buffers buffers, Observable bufferOpenings, final Func1> bufferClosingSelector) { + public ObservableBasedMultiBufferCreator(final OverlappingBuffers buffers, Observable bufferOpenings, final Func1> bufferClosingSelector) { subscription.wrap(bufferOpenings.subscribe(new Action1() { @Override public void call(BufferOpening opening) { @@ -469,6 +517,12 @@ public void stop() { } } + /** + * This {@link BufferCreator} creates a new {@link Buffer} every time after a fixed + * period of time has elapsed. + * + * @param The type of object all internal {@link Buffer} objects record. + */ private static class TimeBasedBufferCreator implements BufferCreator { private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); @@ -502,6 +556,12 @@ public void stop() { } } + /** + * This {@link BufferCreator} creates a new {@link Buffer} every time after it has + * seen a certain amount of elements. + * + * @param The type of object all internal {@link Buffer} objects record. + */ private static class SkippingBufferCreator implements BufferCreator { private final AtomicInteger skipped = new AtomicInteger(1); @@ -527,6 +587,12 @@ public void stop() { } } + /** + * This class is an extension on the {@link Buffers} class which only supports one + * active (not yet emitted) internal {@link Buffer} object. + * + * @param The type of object all internal {@link Buffer} objects record. + */ private static class NonOverlappingBuffers extends Buffers { private final Object lock = new Object(); @@ -550,13 +616,27 @@ public void pushValue(T value) { } } + /** + * This class is an extension on the {@link Buffers} class which actually has no additional + * behavior than its super class. Classes extending this class, are expected to support + * two or more active (not yet emitted) internal {@link Buffer} objects. + * + * @param The type of object all internal {@link Buffer} objects record. + */ private static class OverlappingBuffers extends Buffers { - public OverlappingBuffers(Observer> observer) { super(observer); } } + /** + * This class is an extension on the {@link Buffers} class. Every internal buffer has + * a has a maximum time to live and a maximum internal capacity. When the buffer has + * reached the end of its life, or reached its maximum internal capacity it is + * automatically emitted. + * + * @param The type of object all internal {@link Buffer} objects record. + */ private static class TimeAndSizeBasedBuffers extends Buffers { private final ConcurrentMap, Subscription> subscriptions = new ConcurrentHashMap, Subscription>(); @@ -615,6 +695,13 @@ public void pushValue(T value) { } } + /** + * This class is an extension on the {@link Buffers} class. Every internal buffer has + * a has a maximum time to live. When the buffer has reached the end of its life it is + * automatically emitted. + * + * @param The type of object all internal {@link Buffer} objects record. + */ private static class TimeBasedBuffers extends OverlappingBuffers { private final ConcurrentMap, Subscription> subscriptions = new ConcurrentHashMap, Subscription>(); @@ -649,6 +736,13 @@ public void emitBuffer(Buffer buffer) { } } + /** + * This class is an extension on the {@link Buffers} class. Every internal buffer has + * a fixed maximum capacity. When the buffer has reached its maximum capacity it is + * automatically emitted. + * + * @param The type of object all internal {@link Buffer} objects record. + */ private static class SizeBasedBuffers extends Buffers { private final int size; @@ -674,21 +768,42 @@ public void pushValue(T value) { } } + /** + * This class represents an object which contains and manages multiple {@link Buffer} objects. + * + * @param The type of objects which the internal {@link Buffer} objects record. + */ private static class Buffers { private final Queue> buffers = new ConcurrentLinkedQueue>(); private final Observer> observer; + /** + * Constructs a new {@link Buffers} object for the specified {@link Observer}. + * + * @param observer + * The {@link Observer} to which this object will emit its internal + * {@link Buffer} objects to when requested. + */ public Buffers(Observer> observer) { this.observer = observer; } + /** + * This method will instantiate a new {@link Buffer} object and register it internally. + * + * @return + * The constructed empty {@link Buffer} object. + */ public Buffer createBuffer() { Buffer buffer = new Buffer(); buffers.add(buffer); return buffer; } + /** + * This method emits all not yet emitted {@link Buffer} objects. + */ public void emitAllBuffers() { Buffer buffer; while ((buffer = buffers.poll()) != null) { @@ -696,6 +811,12 @@ public void emitAllBuffers() { } } + /** + * This method emits the specified {@link Buffer} object. + * + * @param buffer + * The {@link Buffer} to emit. + */ public void emitBuffer(Buffer buffer) { if (!buffers.remove(buffer)) { // Concurrency issue: Buffer is already emitted! @@ -704,10 +825,20 @@ public void emitBuffer(Buffer buffer) { observer.onNext(buffer.getContents()); } + /** + * @return + * The oldest (in case there are multiple) {@link Buffer} object. + */ public Buffer getBuffer() { return buffers.peek(); } + /** + * This method pushes a value to all not yet emitted {@link Buffer} objects. + * + * @param value + * The value to push to all not yet emitted {@link Buffer} objects. + */ public void pushValue(T value) { List> copy = new ArrayList>(buffers); for (Buffer buffer : copy) { @@ -716,47 +847,34 @@ public void pushValue(T value) { } } + /** + * This class represents a single buffer: A sequence of recorded values. + * + * @param The type of objects which this {@link Buffer} can hold. + */ private static class Buffer { private final List contents = new ArrayList(); + /** + * Appends a specified value to the {@link Buffer}. + * + * @param value + * The value to append to the {@link Buffer}. + */ public void pushValue(T value) { contents.add(value); } + /** + * @return + * The mutable underlying {@link List} which contains all the + * recorded values in this {@link Buffer} object. + */ public List getContents() { return contents; } } - public interface BufferOpening { - // Tagging interface for objects which can open buffers. - } - - public interface BufferClosing { - // Tagging interface for objects which can close buffers. - } - - public static class BufferOpenings { - - public static BufferOpening create() { - return new BufferOpening() {}; - } - - private BufferOpenings() { - // Prevent instantation. - } - } - public static class BufferClosings { - - public static BufferClosing create() { - return new BufferClosing() {}; - } - - private BufferClosings() { - // Prevent instantation. - } - } - public static class UnitTest { private Observer> observer; diff --git a/rxjava-core/src/main/java/rx/util/BufferClosing.java b/rxjava-core/src/main/java/rx/util/BufferClosing.java new file mode 100644 index 0000000000..d77e133674 --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/BufferClosing.java @@ -0,0 +1,5 @@ +package rx.util; + +public interface BufferClosing { + // Tagging interface for objects which can close buffers. +} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/BufferClosings.java b/rxjava-core/src/main/java/rx/util/BufferClosings.java new file mode 100644 index 0000000000..e4674f5c1e --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/BufferClosings.java @@ -0,0 +1,12 @@ +package rx.util; + +public class BufferClosings { + + public static BufferClosing create() { + return new BufferClosing() {}; + } + + private BufferClosings() { + // Prevent instantation. + } +} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/BufferOpening.java b/rxjava-core/src/main/java/rx/util/BufferOpening.java new file mode 100644 index 0000000000..6c45202dd8 --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/BufferOpening.java @@ -0,0 +1,5 @@ +package rx.util; + +public interface BufferOpening { + // Tagging interface for objects which can open buffers. +} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/BufferOpenings.java b/rxjava-core/src/main/java/rx/util/BufferOpenings.java new file mode 100644 index 0000000000..4409b774ce --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/BufferOpenings.java @@ -0,0 +1,12 @@ +package rx.util; + +public class BufferOpenings { + + public static BufferOpening create() { + return new BufferOpening() {}; + } + + private BufferOpenings() { + // Prevent instantation. + } +} \ No newline at end of file From 5ded8249023cab886098d31cc354069ba7107cd0 Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Thu, 11 Jul 2013 23:07:30 +0200 Subject: [PATCH 7/7] Improved buffer operation test cases with checks for empty buffers. --- .../src/main/java/rx/operators/OperationBuffer.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java index d41fe41904..5ecb31a0a7 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java +++ b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java @@ -926,6 +926,7 @@ public Subscription call(Observer observer) { inOrder.verify(observer, Mockito.times(1)).onNext(list("one", "two", "three")); inOrder.verify(observer, Mockito.times(1)).onNext(list("two", "three", "four")); inOrder.verify(observer, Mockito.times(1)).onNext(list("three", "four", "five")); + inOrder.verify(observer, Mockito.never()).onNext(Mockito.anyListOf(String.class)); inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); inOrder.verify(observer, Mockito.never()).onCompleted(); } @@ -951,6 +952,7 @@ public Subscription call(Observer observer) { InOrder inOrder = Mockito.inOrder(observer); inOrder.verify(observer, Mockito.times(1)).onNext(list("one", "two", "three")); inOrder.verify(observer, Mockito.times(1)).onNext(list("four", "five")); + inOrder.verify(observer, Mockito.never()).onNext(Mockito.anyListOf(String.class)); inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); inOrder.verify(observer, Mockito.times(1)).onCompleted(); } @@ -976,6 +978,7 @@ public Subscription call(Observer observer) { InOrder inOrder = Mockito.inOrder(observer); inOrder.verify(observer, Mockito.times(1)).onNext(list("one", "two")); inOrder.verify(observer, Mockito.times(1)).onNext(list("four", "five")); + inOrder.verify(observer, Mockito.never()).onNext(Mockito.anyListOf(String.class)); inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); inOrder.verify(observer, Mockito.times(1)).onCompleted(); } @@ -1007,6 +1010,7 @@ public Subscription call(Observer observer) { scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS); inOrder.verify(observer, Mockito.times(1)).onNext(list("five")); + inOrder.verify(observer, Mockito.never()).onNext(Mockito.anyListOf(String.class)); inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); inOrder.verify(observer, Mockito.times(1)).onCompleted(); } @@ -1035,6 +1039,7 @@ public Subscription call(Observer observer) { scheduler.advanceTimeTo(201, TimeUnit.MILLISECONDS); inOrder.verify(observer, Mockito.times(1)).onNext(list("four", "five")); + inOrder.verify(observer, Mockito.never()).onNext(Mockito.anyListOf(String.class)); inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); inOrder.verify(observer, Mockito.times(1)).onCompleted(); } @@ -1085,6 +1090,7 @@ public Subscription call(Observer observer) { scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS); inOrder.verify(observer, Mockito.times(1)).onNext(list("two", "three")); inOrder.verify(observer, Mockito.times(1)).onNext(list("five")); + inOrder.verify(observer, Mockito.never()).onNext(Mockito.anyListOf(String.class)); inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); inOrder.verify(observer, Mockito.times(1)).onCompleted(); } @@ -1099,7 +1105,7 @@ public Subscription call(Observer observer) { push(observer, "three", 110); push(observer, "four", 160); push(observer, "five", 210); - complete(observer, 500); + complete(observer, 250); return Subscriptions.empty(); } }); @@ -1126,6 +1132,7 @@ public Subscription call(Observer observer) { inOrder.verify(observer, Mockito.times(1)).onNext(list("one", "two")); inOrder.verify(observer, Mockito.times(1)).onNext(list("three", "four")); inOrder.verify(observer, Mockito.times(1)).onNext(list("five")); + inOrder.verify(observer, Mockito.never()).onNext(Mockito.anyListOf(String.class)); inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); inOrder.verify(observer, Mockito.times(1)).onCompleted(); }