From 2f591901ec57dd79bec54fc6cc1ba5caa04cf017 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 21 Aug 2019 22:25:19 +0200 Subject: [PATCH 1/2] 2.x: Fix refCount() not resetting when cross-canceled --- .../operators/flowable/FlowableRefCount.java | 40 ++++++++++++++----- .../observable/ObservableRefCount.java | 40 ++++++++++++++----- .../flowable/FlowableRefCountAltTest.java | 19 +++++++++ .../flowable/FlowableRefCountTest.java | 20 ++++++++++ .../observable/ObservableRefCountAltTest.java | 19 +++++++++ .../observable/ObservableRefCountTest.java | 21 +++++++++- 6 files changed, 138 insertions(+), 21 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java index 02ed97b462..2da1306632 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java @@ -115,22 +115,42 @@ void cancel(RefConnection rc) { void terminated(RefConnection rc) { synchronized (this) { - if (connection != null && connection == rc) { - connection = null; - if (rc.timer != null) { - rc.timer.dispose(); + if (source instanceof FlowablePublishClassic) { + if (connection != null && connection == rc) { + connection = null; + clearTimer(rc); } - } - if (--rc.subscriberCount == 0) { - if (source instanceof Disposable) { - ((Disposable)source).dispose(); - } else if (source instanceof ResettableConnectable) { - ((ResettableConnectable)source).resetIf(rc.get()); + + if (--rc.subscriberCount == 0) { + reset(rc); + } + } else { + if (connection != null && connection == rc) { + clearTimer(rc); + if (--rc.subscriberCount == 0) { + connection = null; + reset(rc); + } } } } } + void clearTimer(RefConnection rc) { + if (rc.timer != null) { + rc.timer.dispose(); + rc.timer = null; + } + } + + void reset(RefConnection rc) { + if (source instanceof Disposable) { + ((Disposable)source).dispose(); + } else if (source instanceof ResettableConnectable) { + ((ResettableConnectable)source).resetIf(rc.get()); + } + } + void timeout(RefConnection rc) { synchronized (this) { if (rc.subscriberCount == 0 && rc == connection) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java index 5306f4481d..27e633c664 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java @@ -112,22 +112,42 @@ void cancel(RefConnection rc) { void terminated(RefConnection rc) { synchronized (this) { - if (connection != null && connection == rc) { - connection = null; - if (rc.timer != null) { - rc.timer.dispose(); + if (source instanceof ObservablePublishClassic) { + if (connection != null && connection == rc) { + connection = null; + clearTimer(rc); } - } - if (--rc.subscriberCount == 0) { - if (source instanceof Disposable) { - ((Disposable)source).dispose(); - } else if (source instanceof ResettableConnectable) { - ((ResettableConnectable)source).resetIf(rc.get()); + + if (--rc.subscriberCount == 0) { + reset(rc); + } + } else { + if (connection != null && connection == rc) { + clearTimer(rc); + if (--rc.subscriberCount == 0) { + connection = null; + reset(rc); + } } } } } + void clearTimer(RefConnection rc) { + if (rc.timer != null) { + rc.timer.dispose(); + rc.timer = null; + } + } + + void reset(RefConnection rc) { + if (source instanceof Disposable) { + ((Disposable)source).dispose(); + } else if (source instanceof ResettableConnectable) { + ((ResettableConnectable)source).resetIf(rc.get()); + } + } + void timeout(RefConnection rc) { synchronized (this) { if (rc.subscriberCount == 0 && rc == connection) { diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountAltTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountAltTest.java index 2ee23b2dc1..c8eca05dca 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountAltTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountAltTest.java @@ -1443,4 +1443,23 @@ public void publishRefCountShallBeThreadSafe() { .assertComplete(); } } + + @Test + public void upstreamTerminationTriggersAnotherCancel() throws Exception { + ReplayProcessor rp = ReplayProcessor.create(); + rp.onNext(1); + rp.onComplete(); + + Flowable shared = rp.share(); + + shared + .buffer(shared.debounce(5, TimeUnit.SECONDS)) + .test() + .assertValueCount(2); + + shared + .buffer(shared.debounce(5, TimeUnit.SECONDS)) + .test() + .assertValueCount(2); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java index 3cb5f57fb0..c032e61da5 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.flowable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.io.IOException; @@ -1436,4 +1437,23 @@ public void disconnectBeforeConnect() { flowable.take(1).test().assertResult(2); } + + @Test + public void upstreamTerminationTriggersAnotherCancel() throws Exception { + ReplayProcessor rp = ReplayProcessor.create(); + rp.onNext(1); + rp.onComplete(); + + Flowable shared = rp.share(); + + shared + .buffer(shared.debounce(5, TimeUnit.SECONDS)) + .test() + .assertValueCount(2); + + shared + .buffer(shared.debounce(5, TimeUnit.SECONDS)) + .test() + .assertValueCount(2); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountAltTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountAltTest.java index 05aada6b84..0f675bd15d 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountAltTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountAltTest.java @@ -1399,4 +1399,23 @@ public void publishRefCountShallBeThreadSafe() { .assertComplete(); } } + + @Test + public void upstreamTerminationTriggersAnotherCancel() throws Exception { + ReplaySubject rs = ReplaySubject.create(); + rs.onNext(1); + rs.onComplete(); + + Observable shared = rs.share(); + + shared + .buffer(shared.debounce(5, TimeUnit.SECONDS)) + .test() + .assertValueCount(2); + + shared + .buffer(shared.debounce(5, TimeUnit.SECONDS)) + .test() + .assertValueCount(2); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java index 99a2a79f79..e631f60bf6 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java @@ -532,7 +532,7 @@ public Integer apply(Integer t1, Integer t2) { to2.assertValue(30); } - @Test(timeout = 10000) + @Test//(timeout = 10000) public void testUpstreamErrorAllowsRetry() throws InterruptedException { final AtomicInteger intervalSubscribed = new AtomicInteger(); Observable interval = @@ -1380,4 +1380,23 @@ public void disconnectBeforeConnect() { observable.take(1).test().assertResult(2); } + + @Test + public void upstreamTerminationTriggersAnotherCancel() throws Exception { + ReplaySubject rs = ReplaySubject.create(); + rs.onNext(1); + rs.onComplete(); + + Observable shared = rs.share(); + + shared + .buffer(shared.debounce(5, TimeUnit.SECONDS)) + .test() + .assertValueCount(2); + + shared + .buffer(shared.debounce(5, TimeUnit.SECONDS)) + .test() + .assertValueCount(2); + } } From 29737584c458bb635984c03382809697c5d656a4 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Thu, 22 Aug 2019 19:58:06 +0200 Subject: [PATCH 2/2] Undo test timeout comment --- .../internal/operators/observable/ObservableRefCountTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java index e631f60bf6..485afc54e1 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java @@ -532,7 +532,7 @@ public Integer apply(Integer t1, Integer t2) { to2.assertValue(30); } - @Test//(timeout = 10000) + @Test(timeout = 10000) public void testUpstreamErrorAllowsRetry() throws InterruptedException { final AtomicInteger intervalSubscribed = new AtomicInteger(); Observable interval =