Skip to content

Commit b8d39d8

Browse files
hexmindRobWin
authored andcommitted
Issue ReactiveX#235: TimeLimiter as rx transformer (ReactiveX#624)
1 parent 1a26679 commit b8d39d8

File tree

9 files changed

+796
-0
lines changed

9 files changed

+796
-0
lines changed

resilience4j-rxjava2/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
dependencies {
22
compileOnly project(':resilience4j-circuitbreaker')
33
compileOnly project(':resilience4j-ratelimiter')
4+
compileOnly project(':resilience4j-timelimiter')
45
compileOnly project(':resilience4j-bulkhead')
56
compileOnly project(':resilience4j-retry')
67
compileOnly ( libraries.rxjava2)
78
testCompile project(':resilience4j-test')
89
testCompile project(':resilience4j-circuitbreaker')
910
testCompile project(':resilience4j-ratelimiter')
11+
testCompile project(':resilience4j-timelimiter')
1012
testCompile project(':resilience4j-bulkhead')
1113
testCompile project(':resilience4j-retry')
1214
testCompile ( libraries.rxjava2)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright 2019 authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.github.resilience4j.timelimiter.transformer;
18+
19+
import io.github.resilience4j.timelimiter.TimeLimiter;
20+
import io.reactivex.Completable;
21+
import io.reactivex.CompletableSource;
22+
import io.reactivex.CompletableTransformer;
23+
import io.reactivex.Flowable;
24+
import io.reactivex.FlowableTransformer;
25+
import io.reactivex.Maybe;
26+
import io.reactivex.MaybeSource;
27+
import io.reactivex.MaybeTransformer;
28+
import io.reactivex.Observable;
29+
import io.reactivex.ObservableSource;
30+
import io.reactivex.ObservableTransformer;
31+
import io.reactivex.Single;
32+
import io.reactivex.SingleSource;
33+
import io.reactivex.SingleTransformer;
34+
35+
import java.util.concurrent.TimeUnit;
36+
37+
import org.reactivestreams.Publisher;
38+
39+
public class TimeLimiterTransformer<T> implements FlowableTransformer<T, T>, ObservableTransformer<T, T>,
40+
SingleTransformer<T, T>, CompletableTransformer, MaybeTransformer<T, T> {
41+
42+
private final TimeLimiter timeLimiter;
43+
44+
private TimeLimiterTransformer(TimeLimiter timeLimiter) {
45+
this.timeLimiter = timeLimiter;
46+
}
47+
48+
/**
49+
* Creates a TimeLimiterTransformer.
50+
*
51+
* @param timeLimiter the TimeLimiter
52+
* @param <T> the value type of the upstream and downstream
53+
* @return a TimeLimiterTransformer
54+
*/
55+
public static <T> TimeLimiterTransformer<T> of(TimeLimiter timeLimiter) {
56+
return new TimeLimiterTransformer<>(timeLimiter);
57+
}
58+
59+
@Override
60+
public Publisher<T> apply(Flowable<T> upstream) {
61+
return upstream
62+
.timeout(getTimeoutInMillis(), TimeUnit.MILLISECONDS)
63+
.doOnNext(t -> timeLimiter.onSuccess())
64+
.doOnComplete(timeLimiter::onSuccess)
65+
.doOnError(timeLimiter::onError);
66+
}
67+
68+
@Override
69+
public ObservableSource<T> apply(Observable<T> upstream) {
70+
return upstream
71+
.timeout(getTimeoutInMillis(), TimeUnit.MILLISECONDS)
72+
.doOnNext(t -> timeLimiter.onSuccess())
73+
.doOnComplete(timeLimiter::onSuccess)
74+
.doOnError(timeLimiter::onError);
75+
}
76+
77+
@Override
78+
public SingleSource<T> apply(Single<T> upstream) {
79+
return upstream
80+
.timeout(getTimeoutInMillis(), TimeUnit.MILLISECONDS)
81+
.doOnSuccess(t -> timeLimiter.onSuccess())
82+
.doOnError(timeLimiter::onError);
83+
}
84+
85+
@Override
86+
public CompletableSource apply(Completable upstream) {
87+
return upstream
88+
.timeout(getTimeoutInMillis(), TimeUnit.MILLISECONDS)
89+
.doOnComplete(timeLimiter::onSuccess)
90+
.doOnError(timeLimiter::onError);
91+
}
92+
93+
@Override
94+
public MaybeSource<T> apply(Maybe<T> upstream) {
95+
return upstream
96+
.timeout(getTimeoutInMillis(), TimeUnit.MILLISECONDS)
97+
.doOnSuccess(t -> timeLimiter.onSuccess())
98+
.doOnComplete(timeLimiter::onSuccess)
99+
.doOnError(timeLimiter::onError);
100+
}
101+
102+
private long getTimeoutInMillis() {
103+
return timeLimiter.getTimeLimiterConfig()
104+
.getTimeoutDuration()
105+
.toMillis();
106+
}
107+
108+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
*
3+
* Copyright 2019 authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*
18+
*/
19+
@NonNullApi
20+
@NonNullFields
21+
package io.github.resilience4j.timelimiter.transformer;
22+
23+
import io.github.resilience4j.core.lang.NonNullApi;
24+
import io.github.resilience4j.core.lang.NonNullFields;
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package io.github.resilience4j;
2+
3+
import io.reactivex.plugins.RxJavaPlugins;
4+
import io.reactivex.schedulers.TestScheduler;
5+
6+
import org.junit.rules.TestRule;
7+
import org.junit.runner.Description;
8+
import org.junit.runners.model.Statement;
9+
10+
public class TestSchedulerRule implements TestRule {
11+
12+
private final TestScheduler testScheduler = new TestScheduler();
13+
14+
public TestScheduler getTestScheduler() {
15+
return testScheduler;
16+
}
17+
18+
@Override
19+
public Statement apply(final Statement statement, Description description) {
20+
return new Statement() {
21+
@Override
22+
public void evaluate() throws Throwable {
23+
RxJavaPlugins.setIoSchedulerHandler(scheduler -> testScheduler);
24+
RxJavaPlugins.setComputationSchedulerHandler(scheduler -> testScheduler);
25+
RxJavaPlugins.setNewThreadSchedulerHandler(scheduler -> testScheduler);
26+
try {
27+
statement.evaluate();
28+
} finally {
29+
RxJavaPlugins.reset();
30+
}
31+
}
32+
};
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright 2019 authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.github.resilience4j.timelimiter.transformer;
18+
19+
import io.github.resilience4j.TestSchedulerRule;
20+
import io.github.resilience4j.test.HelloWorldService;
21+
import io.github.resilience4j.timelimiter.TimeLimiter;
22+
import io.github.resilience4j.timelimiter.TimeLimiterConfig;
23+
import io.reactivex.Completable;
24+
import io.reactivex.Maybe;
25+
import io.reactivex.observers.TestObserver;
26+
import io.reactivex.schedulers.TestScheduler;
27+
28+
import java.time.Duration;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.TimeoutException;
31+
32+
import org.junit.Rule;
33+
import org.junit.Test;
34+
35+
import static org.mockito.ArgumentMatchers.any;
36+
import static org.mockito.BDDMockito.given;
37+
import static org.mockito.BDDMockito.then;
38+
import static org.mockito.Mockito.mock;
39+
40+
public class TimeLimiterTransformerCompletableTest {
41+
42+
@Rule
43+
public final TestSchedulerRule testSchedulerRule = new TestSchedulerRule();
44+
private final TestScheduler testScheduler = testSchedulerRule.getTestScheduler();
45+
private final TimeLimiter timeLimiter = mock(TimeLimiter.class);
46+
private final HelloWorldService helloWorldService = mock(HelloWorldService.class);
47+
48+
@Test
49+
public void otherError() {
50+
given(helloWorldService.returnHelloWorld())
51+
.willThrow(new RuntimeException());
52+
given(timeLimiter.getTimeLimiterConfig())
53+
.willReturn(toConfig(Duration.ZERO));
54+
TestObserver<?> observer = Completable.fromRunnable(helloWorldService::returnHelloWorld)
55+
.compose(TimeLimiterTransformer.of(timeLimiter))
56+
.test();
57+
58+
testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);
59+
60+
observer.assertError(RuntimeException.class);
61+
then(timeLimiter).should()
62+
.onError(any(RuntimeException.class));
63+
}
64+
65+
@Test
66+
public void timeout() {
67+
given(timeLimiter.getTimeLimiterConfig())
68+
.willReturn(toConfig(Duration.ZERO));
69+
TestObserver<?> observer = Maybe.timer(1, TimeUnit.MINUTES)
70+
.flatMapCompletable(t -> Completable.complete())
71+
.compose(TimeLimiterTransformer.of(timeLimiter))
72+
.test();
73+
74+
testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);
75+
76+
observer.assertError(TimeoutException.class);
77+
then(timeLimiter).should()
78+
.onError(any(TimeoutException.class));
79+
}
80+
81+
@Test
82+
public void doNotTimeout() {
83+
given(helloWorldService.returnHelloWorld())
84+
.willReturn("hello");
85+
given(timeLimiter.getTimeLimiterConfig())
86+
.willReturn(toConfig(Duration.ofMinutes(1)));
87+
TestObserver<?> observer = Completable.fromRunnable(helloWorldService::returnHelloWorld)
88+
.compose(TimeLimiterTransformer.of(timeLimiter))
89+
.test();
90+
91+
testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);
92+
93+
observer.assertComplete();
94+
then(timeLimiter).should()
95+
.onSuccess();
96+
}
97+
98+
private TimeLimiterConfig toConfig(Duration timeout) {
99+
return TimeLimiterConfig.custom()
100+
.timeoutDuration(timeout)
101+
.build();
102+
}
103+
104+
}

0 commit comments

Comments
 (0)