Skip to content

Commit 3eea8ad

Browse files
Merge pull request ReactiveX#571 from akarnokd/SampleWithObservable2
Operation Sample with Observable v2
2 parents 69e8e8c + f3f3ef9 commit 3eea8ad

File tree

3 files changed

+263
-2
lines changed

3 files changed

+263
-2
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4463,7 +4463,22 @@ public Observable<T> sample(long period, TimeUnit unit) {
44634463
public Observable<T> sample(long period, TimeUnit unit, Scheduler scheduler) {
44644464
return create(OperationSample.sample(this, period, unit, scheduler));
44654465
}
4466-
4466+
4467+
/**
4468+
* Return an Observable that emits the results of sampling the items
4469+
* emitted by this Observable when the <code>sampler</code>
4470+
* Observable produces an item or completes.
4471+
*
4472+
* @param sampler the Observable to use for sampling this
4473+
*
4474+
* @return an Observable that emits the results of sampling the items
4475+
* emitted by this Observable when the <code>sampler</code>
4476+
* Observable produces an item or completes.
4477+
*/
4478+
public <U> Observable<T> sample(Observable<U> sampler) {
4479+
return create(new OperationSample.SampleWithObservable<T, U>(this, sampler));
4480+
}
4481+
44674482
/**
44684483
* Returns an Observable that applies a function of your choosing to the
44694484
* first item emitted by a source Observable, then feeds the result of that

rxjava-core/src/main/java/rx/operators/OperationSample.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import rx.Scheduler;
2626
import rx.Subscription;
2727
import rx.concurrency.Schedulers;
28+
import rx.subscriptions.CompositeSubscription;
29+
import rx.subscriptions.SerialSubscription;
2830
import rx.subscriptions.Subscriptions;
2931
import rx.util.functions.Action0;
3032

@@ -115,4 +117,91 @@ public void call() {
115117
});
116118
}
117119
}
120+
/**
121+
* Sample with the help of another observable.
122+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229742.aspx'>MSDN: Observable.Sample</a>
123+
*/
124+
public static class SampleWithObservable<T, U> implements OnSubscribeFunc<T> {
125+
final Observable<T> source;
126+
final Observable<U> sampler;
127+
public SampleWithObservable(Observable<T> source, Observable<U> sampler) {
128+
this.source = source;
129+
this.sampler = sampler;
130+
}
131+
@Override
132+
public Subscription onSubscribe(Observer<? super T> t1) {
133+
return new ResultManager(t1).init();
134+
}
135+
/** Observe source values. */
136+
class ResultManager implements Observer<T> {
137+
final Observer<? super T> observer;
138+
final CompositeSubscription cancel;
139+
T value;
140+
boolean valueTaken = true;
141+
boolean done;
142+
final Object guard;
143+
public ResultManager(Observer<? super T> observer) {
144+
this.observer = observer;
145+
cancel = new CompositeSubscription();
146+
guard = new Object();
147+
}
148+
public Subscription init() {
149+
cancel.add(source.subscribe(this));
150+
cancel.add(sampler.subscribe(new Sampler()));
151+
152+
return cancel;
153+
}
154+
@Override
155+
public void onNext(T args) {
156+
synchronized (guard) {
157+
valueTaken = false;
158+
value = args;
159+
}
160+
}
161+
162+
@Override
163+
public void onError(Throwable e) {
164+
synchronized (guard) {
165+
if (!done) {
166+
done = true;
167+
observer.onError(e);
168+
cancel.unsubscribe();
169+
}
170+
}
171+
}
172+
173+
@Override
174+
public void onCompleted() {
175+
synchronized (guard) {
176+
if (!done) {
177+
done = true;
178+
observer.onCompleted();
179+
cancel.unsubscribe();
180+
}
181+
}
182+
}
183+
/** Take the latest value, but only once. */
184+
class Sampler implements Observer<U> {
185+
@Override
186+
public void onNext(U args) {
187+
synchronized (guard) {
188+
if (!valueTaken && !done) {
189+
valueTaken = true;
190+
observer.onNext(value);
191+
}
192+
}
193+
}
194+
195+
@Override
196+
public void onError(Throwable e) {
197+
ResultManager.this.onError(e);
198+
}
199+
200+
@Override
201+
public void onCompleted() {
202+
ResultManager.this.onCompleted();
203+
}
204+
}
205+
}
206+
}
118207
}

rxjava-core/src/test/java/rx/operators/OperationSampleTest.java

Lines changed: 158 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.mockito.Matchers.*;
1918
import static org.mockito.Mockito.*;
2019

2120
import java.util.concurrent.TimeUnit;
@@ -28,19 +27,22 @@
2827
import rx.Observer;
2928
import rx.Subscription;
3029
import rx.concurrency.TestScheduler;
30+
import rx.subjects.PublishSubject;
3131
import rx.subscriptions.Subscriptions;
3232
import rx.util.functions.Action0;
3333

3434
public class OperationSampleTest {
3535
private TestScheduler scheduler;
3636
private Observer<Long> observer;
37+
private Observer<Object> observer2;
3738

3839
@Before
3940
@SuppressWarnings("unchecked")
4041
// due to mocking
4142
public void before() {
4243
scheduler = new TestScheduler();
4344
observer = mock(Observer.class);
45+
observer2 = mock(Observer.class);
4446
}
4547

4648
@Test
@@ -105,4 +107,159 @@ public void call() {
105107
verify(observer, times(1)).onCompleted();
106108
verify(observer, never()).onError(any(Throwable.class));
107109
}
110+
@Test
111+
public void sampleWithSamplerNormal() {
112+
PublishSubject<Integer> source = PublishSubject.create();
113+
PublishSubject<Integer> sampler = PublishSubject.create();
114+
115+
Observable<Integer> m = source.sample(sampler);
116+
m.subscribe(observer2);
117+
118+
source.onNext(1);
119+
source.onNext(2);
120+
sampler.onNext(1);
121+
source.onNext(3);
122+
source.onNext(4);
123+
sampler.onNext(2);
124+
source.onCompleted();
125+
sampler.onNext(3);
126+
127+
128+
InOrder inOrder = inOrder(observer2);
129+
inOrder.verify(observer2, never()).onNext(1);
130+
inOrder.verify(observer2, times(1)).onNext(2);
131+
inOrder.verify(observer2, never()).onNext(3);
132+
inOrder.verify(observer2, times(1)).onNext(4);
133+
inOrder.verify(observer2, times(1)).onCompleted();
134+
verify(observer, never()).onError(any(Throwable.class));
135+
}
136+
@Test
137+
public void sampleWithSamplerNoDuplicates() {
138+
PublishSubject<Integer> source = PublishSubject.create();
139+
PublishSubject<Integer> sampler = PublishSubject.create();
140+
141+
Observable<Integer> m = source.sample(sampler);
142+
m.subscribe(observer2);
143+
144+
source.onNext(1);
145+
source.onNext(2);
146+
sampler.onNext(1);
147+
sampler.onNext(1);
148+
149+
source.onNext(3);
150+
source.onNext(4);
151+
sampler.onNext(2);
152+
sampler.onNext(2);
153+
154+
source.onCompleted();
155+
sampler.onNext(3);
156+
157+
158+
InOrder inOrder = inOrder(observer2);
159+
inOrder.verify(observer2, never()).onNext(1);
160+
inOrder.verify(observer2, times(1)).onNext(2);
161+
inOrder.verify(observer2, never()).onNext(3);
162+
inOrder.verify(observer2, times(1)).onNext(4);
163+
inOrder.verify(observer2, times(1)).onCompleted();
164+
verify(observer, never()).onError(any(Throwable.class));
165+
}
166+
@Test
167+
public void sampleWithSamplerTerminatingEarly() {
168+
PublishSubject<Integer> source = PublishSubject.create();
169+
PublishSubject<Integer> sampler = PublishSubject.create();
170+
171+
Observable<Integer> m = source.sample(sampler);
172+
m.subscribe(observer2);
173+
174+
source.onNext(1);
175+
source.onNext(2);
176+
sampler.onNext(1);
177+
sampler.onCompleted();
178+
179+
source.onNext(3);
180+
source.onNext(4);
181+
182+
183+
184+
InOrder inOrder = inOrder(observer2);
185+
inOrder.verify(observer2, never()).onNext(1);
186+
inOrder.verify(observer2, times(1)).onNext(2);
187+
inOrder.verify(observer2, times(1)).onCompleted();
188+
inOrder.verify(observer2, never()).onNext(any());
189+
verify(observer, never()).onError(any(Throwable.class));
190+
}
191+
@Test
192+
public void sampleWithSamplerEmitAndTerminate() {
193+
PublishSubject<Integer> source = PublishSubject.create();
194+
PublishSubject<Integer> sampler = PublishSubject.create();
195+
196+
Observable<Integer> m = source.sample(sampler);
197+
m.subscribe(observer2);
198+
199+
source.onNext(1);
200+
source.onNext(2);
201+
sampler.onNext(1);
202+
source.onNext(3);
203+
source.onCompleted();
204+
sampler.onNext(2);
205+
sampler.onCompleted();
206+
207+
InOrder inOrder = inOrder(observer2);
208+
inOrder.verify(observer2, never()).onNext(1);
209+
inOrder.verify(observer2, times(1)).onNext(2);
210+
inOrder.verify(observer2, never()).onNext(3);
211+
inOrder.verify(observer2, times(1)).onCompleted();
212+
inOrder.verify(observer2, never()).onNext(any());
213+
verify(observer, never()).onError(any(Throwable.class));
214+
}
215+
@Test
216+
public void sampleWithSamplerEmptySource() {
217+
PublishSubject<Integer> source = PublishSubject.create();
218+
PublishSubject<Integer> sampler = PublishSubject.create();
219+
220+
Observable<Integer> m = source.sample(sampler);
221+
m.subscribe(observer2);
222+
223+
source.onCompleted();
224+
sampler.onNext(1);
225+
226+
InOrder inOrder = inOrder(observer2);
227+
inOrder.verify(observer2, times(1)).onCompleted();
228+
verify(observer2, never()).onNext(any());
229+
verify(observer, never()).onError(any(Throwable.class));
230+
}
231+
@Test
232+
public void sampleWithSamplerSourceThrows() {
233+
PublishSubject<Integer> source = PublishSubject.create();
234+
PublishSubject<Integer> sampler = PublishSubject.create();
235+
236+
Observable<Integer> m = source.sample(sampler);
237+
m.subscribe(observer2);
238+
239+
source.onNext(1);
240+
source.onError(new RuntimeException("Forced failure!"));
241+
sampler.onNext(1);
242+
243+
InOrder inOrder = inOrder(observer2);
244+
inOrder.verify(observer2, times(1)).onError(any(Throwable.class));
245+
verify(observer2, never()).onNext(any());
246+
verify(observer, never()).onCompleted();
247+
}
248+
@Test
249+
public void sampleWithSamplerThrows() {
250+
PublishSubject<Integer> source = PublishSubject.create();
251+
PublishSubject<Integer> sampler = PublishSubject.create();
252+
253+
Observable<Integer> m = source.sample(sampler);
254+
m.subscribe(observer2);
255+
256+
source.onNext(1);
257+
sampler.onNext(1);
258+
sampler.onError(new RuntimeException("Forced failure!"));
259+
260+
InOrder inOrder = inOrder(observer2);
261+
inOrder.verify(observer2, times(1)).onNext(1);
262+
inOrder.verify(observer2, times(1)).onError(any(RuntimeException.class));
263+
verify(observer, never()).onCompleted();
264+
}
108265
}

0 commit comments

Comments
 (0)