Skip to content

Commit 875e232

Browse files
committed
Merge pull request #3757 from kboyarshinov/sample_operator_termination
1.x: Operator sample emits last sampled value before termination.
2 parents 686231b + 3e307fe commit 875e232

File tree

3 files changed

+41
-3
lines changed

3 files changed

+41
-3
lines changed

src/main/java/rx/internal/operators/OperatorSampleWithObservable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void onError(Throwable e) {
6666

6767
@Override
6868
public void onCompleted() {
69-
// onNext(null); // emit the very last value?
69+
onNext(null);
7070
s.onCompleted();
7171
// no need to null check, main is assigned before any of the two gets subscribed
7272
main.get().unsubscribe();
@@ -88,7 +88,7 @@ public void onError(Throwable e) {
8888

8989
@Override
9090
public void onCompleted() {
91-
// samplerSub.onNext(null); // emit the very last value?
91+
samplerSub.onNext(null);
9292
s.onCompleted();
9393

9494
samplerSub.unsubscribe();

src/main/java/rx/internal/operators/OperatorSampleWithTime.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,17 @@ public void onError(Throwable e) {
8989

9090
@Override
9191
public void onCompleted() {
92+
emitIfNonEmpty();
9293
subscriber.onCompleted();
9394
unsubscribe();
9495
}
9596

9697
@Override
9798
public void call() {
99+
emitIfNonEmpty();
100+
}
101+
102+
private void emitIfNonEmpty() {
98103
Object localValue = value.getAndSet(EMPTY_TOKEN);
99104
if (localValue != EMPTY_TOKEN) {
100105
try {

src/test/java/rx/internal/operators/OperatorSampleTest.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,39 @@ public void call() {
109109
verify(observer, never()).onError(any(Throwable.class));
110110
}
111111

112+
@Test
113+
public void sampleWithTimeEmitAndTerminate() {
114+
Observable<Long> source = Observable.create(new OnSubscribe<Long>() {
115+
@Override
116+
public void call(final Subscriber<? super Long> observer1) {
117+
innerScheduler.schedule(new Action0() {
118+
@Override
119+
public void call() {
120+
observer1.onNext(1L);
121+
}
122+
}, 1, TimeUnit.SECONDS);
123+
innerScheduler.schedule(new Action0() {
124+
@Override
125+
public void call() {
126+
observer1.onNext(2L);
127+
observer1.onCompleted();
128+
}
129+
}, 2, TimeUnit.SECONDS);
130+
}
131+
});
132+
133+
Observable<Long> sampled = source.sample(400L, TimeUnit.MILLISECONDS, scheduler);
134+
sampled.subscribe(observer);
135+
136+
InOrder inOrder = inOrder(observer);
137+
138+
scheduler.advanceTimeTo(2000L, TimeUnit.MILLISECONDS);
139+
inOrder.verify(observer, times(1)).onNext(1L);
140+
inOrder.verify(observer, times(1)).onNext(2L);
141+
verify(observer, times(1)).onCompleted();
142+
verify(observer, never()).onError(any(Throwable.class));
143+
}
144+
112145
@Test
113146
public void sampleWithSamplerNormal() {
114147
PublishSubject<Integer> source = PublishSubject.create();
@@ -208,7 +241,7 @@ public void sampleWithSamplerEmitAndTerminate() {
208241
InOrder inOrder = inOrder(observer2);
209242
inOrder.verify(observer2, never()).onNext(1);
210243
inOrder.verify(observer2, times(1)).onNext(2);
211-
inOrder.verify(observer2, never()).onNext(3);
244+
inOrder.verify(observer2, times(1)).onNext(3);
212245
inOrder.verify(observer2, times(1)).onCompleted();
213246
inOrder.verify(observer2, never()).onNext(any());
214247
verify(observer, never()).onError(any(Throwable.class));

0 commit comments

Comments
 (0)