Skip to content

Commit 3267aed

Browse files
hexmindRobWin
authored andcommitted
Issue ReactiveX#235: TimeLimiter as a Reactor operator (ReactiveX#636)
1 parent d6a1833 commit 3267aed

File tree

3 files changed

+256
-0
lines changed

3 files changed

+256
-0
lines changed

resilience4j-reactor/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
dependencies {
22
compileOnly project(':resilience4j-circuitbreaker')
33
compileOnly project(':resilience4j-ratelimiter')
4+
compileOnly project(':resilience4j-timelimiter')
45
compileOnly project(':resilience4j-bulkhead')
56
compileOnly project(':resilience4j-retry')
67
compileOnly (libraries.reactor)
78
testCompile project(':resilience4j-test')
89
testCompile project(':resilience4j-circuitbreaker')
910
testCompile project(':resilience4j-ratelimiter')
11+
testCompile project(':resilience4j-timelimiter')
1012
testCompile project(':resilience4j-bulkhead')
1113
testCompile project(':resilience4j-retry')
1214
testCompile (libraries.reactor)
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2019 authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.github.resilience4j.reactor.timelimiter;
17+
18+
import io.github.resilience4j.reactor.IllegalPublisherException;
19+
import io.github.resilience4j.timelimiter.TimeLimiter;
20+
import reactor.core.publisher.Flux;
21+
import reactor.core.publisher.Mono;
22+
23+
import java.time.Duration;
24+
import java.util.function.UnaryOperator;
25+
26+
import org.reactivestreams.Publisher;
27+
28+
/**
29+
* A Reactor TimeLimiter operator which wraps a reactive type in a TimeLimiter.
30+
*
31+
* @param <T> the value type of the upstream and downstream
32+
*/
33+
public class TimeLimiterOperator<T> implements UnaryOperator<Publisher<T>> {
34+
35+
private final TimeLimiter timeLimiter;
36+
37+
private TimeLimiterOperator(TimeLimiter timeLimiter) {
38+
this.timeLimiter = timeLimiter;
39+
}
40+
41+
/**
42+
* Creates a timeLimiter.
43+
*
44+
* @param <T> the value type of the upstream and downstream
45+
* @param timeLimiter the timeLimiter
46+
* @return a TimeLimiterOperator
47+
*/
48+
public static <T> TimeLimiterOperator<T> of(TimeLimiter timeLimiter) {
49+
return new TimeLimiterOperator<>(timeLimiter);
50+
}
51+
52+
@Override
53+
public Publisher<T> apply(Publisher<T> publisher) {
54+
if (publisher instanceof Mono) {
55+
return withTimeout((Mono<T>) publisher);
56+
} else if (publisher instanceof Flux) {
57+
return withTimeout((Flux<T>) publisher);
58+
} else {
59+
throw new IllegalPublisherException(publisher);
60+
}
61+
}
62+
63+
private Publisher<T> withTimeout(Mono<T> upstream) {
64+
return upstream.timeout(getTimeout())
65+
.doOnNext(t -> timeLimiter.onSuccess())
66+
.doOnSuccess(t -> timeLimiter.onSuccess())
67+
.doOnError(timeLimiter::onError);
68+
}
69+
70+
private Publisher<T> withTimeout(Flux<T> upstream) {
71+
return upstream.timeout(getTimeout())
72+
.doOnNext(t -> timeLimiter.onSuccess())
73+
.doOnComplete(timeLimiter::onSuccess)
74+
.doOnError(timeLimiter::onError);
75+
}
76+
77+
private Duration getTimeout() {
78+
return timeLimiter.getTimeLimiterConfig().getTimeoutDuration();
79+
}
80+
81+
}
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Copyright 2019 authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.github.resilience4j.reactor.timelimiter;
18+
19+
import io.github.resilience4j.test.HelloWorldService;
20+
import io.github.resilience4j.timelimiter.TimeLimiter;
21+
import io.github.resilience4j.timelimiter.TimeLimiterConfig;
22+
import reactor.core.publisher.Flux;
23+
import reactor.core.publisher.Mono;
24+
import reactor.test.StepVerifier;
25+
26+
import java.time.Duration;
27+
import java.util.concurrent.TimeoutException;
28+
29+
import org.junit.Test;
30+
31+
import static org.mockito.ArgumentMatchers.any;
32+
import static org.mockito.BDDMockito.given;
33+
import static org.mockito.BDDMockito.then;
34+
import static org.mockito.Mockito.mock;
35+
import static org.mockito.Mockito.times;
36+
37+
38+
public class TimeLimiterOperatorTest {
39+
40+
private final TimeLimiter timeLimiter = mock(TimeLimiter.class);
41+
private final HelloWorldService helloWorldService = mock(HelloWorldService.class);
42+
43+
@Test
44+
public void doNotTimeoutUsingMono() {
45+
given(timeLimiter.getTimeLimiterConfig())
46+
.willReturn(toConfig(Duration.ofMinutes(1)));
47+
given(helloWorldService.returnHelloWorld())
48+
.willReturn("Hello world");
49+
50+
Mono<?> mono = Mono.fromCallable(helloWorldService::returnHelloWorld)
51+
.compose(TimeLimiterOperator.of(timeLimiter));
52+
53+
StepVerifier.create(mono)
54+
.expectNextCount(1)
55+
.verifyComplete();
56+
then(timeLimiter).should(times(2))
57+
.onSuccess();
58+
}
59+
60+
@Test
61+
public void timeoutUsingMono() {
62+
given(timeLimiter.getTimeLimiterConfig())
63+
.willReturn(toConfig(Duration.ofMillis(1)));
64+
65+
Mono<?> mono = Mono.delay(Duration.ofMinutes(1))
66+
.compose(TimeLimiterOperator.of(timeLimiter));
67+
68+
StepVerifier.create(mono)
69+
.expectError(TimeoutException.class)
70+
.verify(Duration.ofMinutes(1));
71+
then(timeLimiter).should()
72+
.onError(any(TimeoutException.class));
73+
}
74+
75+
@Test
76+
public void timeoutNeverUsingMono() {
77+
given(timeLimiter.getTimeLimiterConfig())
78+
.willReturn(toConfig(Duration.ofMillis(1)));
79+
80+
Mono<?> flux = Mono.never()
81+
.compose(TimeLimiterOperator.of(timeLimiter));
82+
83+
StepVerifier.create(flux)
84+
.expectError(TimeoutException.class)
85+
.verify(Duration.ofMinutes(1));
86+
then(timeLimiter).should()
87+
.onError(any(TimeoutException.class));
88+
}
89+
90+
@Test
91+
public void otherErrorUsingMono() {
92+
given(timeLimiter.getTimeLimiterConfig())
93+
.willReturn(toConfig(Duration.ofMinutes(1)));
94+
given(helloWorldService.returnHelloWorld())
95+
.willThrow(new Error("BAM!"));
96+
97+
Mono<?> mono = Mono.fromCallable(helloWorldService::returnHelloWorld)
98+
.compose(TimeLimiterOperator.of(timeLimiter));
99+
100+
StepVerifier.create(mono)
101+
.expectError(Error.class)
102+
.verify(Duration.ofMinutes(1));
103+
then(timeLimiter).should()
104+
.onError(any(Error.class));
105+
}
106+
107+
@Test
108+
public void doNotTimeoutUsingFlux() {
109+
given(timeLimiter.getTimeLimiterConfig())
110+
.willReturn(toConfig(Duration.ofMinutes(1)));
111+
112+
Flux<?> flux = Flux.interval(Duration.ofMillis(1))
113+
.take(2)
114+
.compose(TimeLimiterOperator.of(timeLimiter));
115+
116+
StepVerifier.create(flux)
117+
.expectNextCount(2)
118+
.verifyComplete();
119+
then(timeLimiter).should(times(3))
120+
.onSuccess();
121+
}
122+
123+
@Test
124+
public void timeoutUsingFlux() {
125+
given(timeLimiter.getTimeLimiterConfig())
126+
.willReturn(toConfig(Duration.ofMillis(1)));
127+
128+
Flux<?> flux = Flux.interval(Duration.ofSeconds(1))
129+
.compose(TimeLimiterOperator.of(timeLimiter));
130+
131+
StepVerifier.create(flux)
132+
.expectError(TimeoutException.class)
133+
.verify(Duration.ofMinutes(1));
134+
then(timeLimiter).should()
135+
.onError(any(TimeoutException.class));
136+
}
137+
138+
@Test
139+
public void timeoutNeverUsingFlux() {
140+
given(timeLimiter.getTimeLimiterConfig())
141+
.willReturn(toConfig(Duration.ofMillis(1)));
142+
143+
Flux<?> flux = Flux.never()
144+
.compose(TimeLimiterOperator.of(timeLimiter));
145+
146+
StepVerifier.create(flux)
147+
.expectError(TimeoutException.class)
148+
.verify(Duration.ofMinutes(1));
149+
then(timeLimiter).should()
150+
.onError(any(TimeoutException.class));
151+
}
152+
153+
@Test
154+
public void otherErrorUsingFlux() {
155+
given(timeLimiter.getTimeLimiterConfig())
156+
.willReturn(toConfig(Duration.ofMinutes(1)));
157+
158+
Flux<?> flux = Flux.error(new Error("BAM!"))
159+
.compose(TimeLimiterOperator.of(timeLimiter));
160+
161+
StepVerifier.create(flux)
162+
.expectError(Error.class)
163+
.verify(Duration.ofMinutes(1));
164+
then(timeLimiter).should()
165+
.onError(any(Error.class));
166+
}
167+
168+
private TimeLimiterConfig toConfig(Duration timeout) {
169+
return TimeLimiterConfig.custom()
170+
.timeoutDuration(timeout)
171+
.build();
172+
}
173+
}

0 commit comments

Comments
 (0)