Skip to content

Commit 7ba9067

Browse files
author
Aaron Tull
committed
Merge pull request #3562 from artem-zinnatullin/single-do-on-unsubscribe
Add Single.doOnUnsubscribe()
2 parents 9d4f9c3 + ebad70d commit 7ba9067

File tree

2 files changed

+85
-0
lines changed

2 files changed

+85
-0
lines changed

src/main/java/rx/Single.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import rx.annotations.Experimental;
2222
import rx.exceptions.Exceptions;
2323
import rx.exceptions.OnErrorNotImplementedException;
24+
import rx.functions.Action0;
2425
import rx.functions.Action1;
2526
import rx.functions.Func1;
2627
import rx.functions.Func2;
@@ -34,6 +35,7 @@
3435
import rx.internal.operators.OnSubscribeToObservableFuture;
3536
import rx.internal.operators.OperatorDelay;
3637
import rx.internal.operators.OperatorDoOnEach;
38+
import rx.internal.operators.OperatorDoOnUnsubscribe;
3739
import rx.internal.operators.OperatorMap;
3840
import rx.internal.operators.OperatorObserveOn;
3941
import rx.internal.operators.OperatorOnErrorReturn;
@@ -1998,4 +2000,24 @@ public void call(SingleSubscriber<? super T> singleSubscriber) {
19982000
}
19992001
});
20002002
}
2003+
2004+
/**
2005+
* Modifies the source {@link Single} so that it invokes the given action when it is unsubscribed from
2006+
* its subscribers.
2007+
* <p>
2008+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnUnsubscribe.png" alt="">
2009+
* <dl>
2010+
* <dt><b>Scheduler:</b></dt>
2011+
* <dd>{@code doOnUnsubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
2012+
* </dl>
2013+
*
2014+
* @param action
2015+
* the action that gets called when this {@link Single} is unsubscribed.
2016+
* @return the source {@link Single} modified so as to call this Action when appropriate.
2017+
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
2018+
*/
2019+
@Experimental
2020+
public final Single<T> doOnUnsubscribe(final Action0 action) {
2021+
return lift(new OperatorDoOnUnsubscribe<T>(action));
2022+
}
20012023
}

src/test/java/rx/SingleTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.mockito.stubbing.Answer;
4141
import rx.Single.OnSubscribe;
4242
import rx.exceptions.CompositeException;
43+
import rx.functions.Action;
4344
import rx.functions.Action0;
4445
import rx.functions.Action1;
4546
import rx.functions.Func1;
@@ -828,4 +829,66 @@ public void deferShouldPassNullPointerExceptionToTheSubscriberIfSingleFactoryRet
828829

829830
verify(singleFactory).call();
830831
}
832+
833+
@Test
834+
public void doOnUnsubscribeShouldInvokeActionAfterSuccess() {
835+
Action0 action = mock(Action0.class);
836+
837+
Single<String> single = Single
838+
.just("test")
839+
.doOnUnsubscribe(action);
840+
841+
verifyZeroInteractions(action);
842+
843+
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
844+
single.subscribe(testSubscriber);
845+
846+
testSubscriber.assertValue("test");
847+
testSubscriber.assertCompleted();
848+
849+
verify(action).call();
850+
}
851+
852+
@Test
853+
public void doOnUnsubscribeShouldInvokeActionAfterError() {
854+
Action0 action = mock(Action0.class);
855+
856+
Single<Object> single = Single
857+
.error(new RuntimeException("test"))
858+
.doOnUnsubscribe(action);
859+
860+
verifyZeroInteractions(action);
861+
862+
TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();
863+
single.subscribe(testSubscriber);
864+
865+
testSubscriber.assertError(RuntimeException.class);
866+
assertEquals("test", testSubscriber.getOnErrorEvents().get(0).getMessage());
867+
868+
verify(action).call();
869+
}
870+
871+
@Test
872+
public void doOnUnsubscribeShouldInvokeActionAfterExplicitUnsubscription() {
873+
Action0 action = mock(Action0.class);
874+
875+
Single<Object> single = Single
876+
.create(new OnSubscribe<Object>() {
877+
@Override
878+
public void call(SingleSubscriber<? super Object> singleSubscriber) {
879+
// Broken Single that never ends itself (simulates long computation in one thread).
880+
}
881+
})
882+
.doOnUnsubscribe(action);
883+
884+
TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();
885+
Subscription subscription = single.subscribe(testSubscriber);
886+
887+
verifyZeroInteractions(action);
888+
889+
subscription.unsubscribe();
890+
verify(action).call();
891+
testSubscriber.assertNoValues();
892+
testSubscriber.assertNoTerminalEvent();
893+
}
831894
}

0 commit comments

Comments
 (0)