Skip to content

Commit 9dec4b6

Browse files
RomehRobWin
authored andcommitted
Add response predicate to retry sync and async for enhancement ReactiveX#259
1 parent 657c81d commit 9dec4b6

File tree

7 files changed

+572
-284
lines changed

7 files changed

+572
-284
lines changed

resilience4j-retry/src/main/java/io/github/resilience4j/retry/AsyncRetry.java

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
package io.github.resilience4j.retry;
22

3+
import java.util.concurrent.CompletableFuture;
4+
import java.util.concurrent.CompletionStage;
5+
import java.util.concurrent.ScheduledExecutorService;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.function.Supplier;
8+
39
import io.github.resilience4j.core.EventConsumer;
410
import io.github.resilience4j.retry.event.RetryEvent;
511
import io.github.resilience4j.retry.event.RetryOnErrorEvent;
@@ -8,12 +14,6 @@
814
import io.github.resilience4j.retry.event.RetryOnSuccessEvent;
915
import io.github.resilience4j.retry.internal.AsyncRetryImpl;
1016

11-
import java.util.concurrent.CompletableFuture;
12-
import java.util.concurrent.CompletionStage;
13-
import java.util.concurrent.ScheduledExecutorService;
14-
import java.util.concurrent.TimeUnit;
15-
import java.util.function.Supplier;
16-
1717
/**
1818
* A AsyncRetry instance is thread-safe can be used to decorate multiple requests.
1919
*/
@@ -110,7 +110,7 @@ static <T> Supplier<CompletionStage<T>> decorateCompletionStage(
110110
return () -> {
111111

112112
final CompletableFuture<T> promise = new CompletableFuture<>();
113-
final Runnable block = new AsyncRetryBlock<>(scheduler, retry.context(), supplier, promise);
113+
@SuppressWarnings("unchecked") final Runnable block = new AsyncRetryBlock<>(scheduler, retry.context(), supplier, promise);
114114
block.run();
115115

116116
return promise;
@@ -155,7 +155,7 @@ interface Metrics {
155155
long getNumberOfFailedCallsWithRetryAttempt();
156156
}
157157

158-
interface Context {
158+
interface Context<T> {
159159

160160
/**
161161
* Records a successful call.
@@ -168,6 +168,14 @@ interface Context {
168168
* @return delay in milliseconds until the next try
169169
*/
170170
long onError(Throwable throwable);
171+
172+
/**
173+
* check the result call.
174+
*
175+
* @param result the result to validate
176+
* @return delay in milliseconds until the next try if the result match the predicate
177+
*/
178+
long onResult(T result);
171179
}
172180

173181
/**
@@ -189,15 +197,15 @@ interface EventPublisher extends io.github.resilience4j.core.EventPublisher<Retr
189197

190198
class AsyncRetryBlock<T> implements Runnable {
191199
private final ScheduledExecutorService scheduler;
192-
private final AsyncRetry.Context retryContext;
200+
private final AsyncRetry.Context<T> retryContext;
193201
private final Supplier<CompletionStage<T>> supplier;
194202
private final CompletableFuture<T> promise;
195203

196204
AsyncRetryBlock(
197-
ScheduledExecutorService scheduler,
198-
AsyncRetry.Context retryContext,
199-
Supplier<CompletionStage<T>> supplier,
200-
CompletableFuture<T> promise
205+
ScheduledExecutorService scheduler,
206+
AsyncRetry.Context<T> retryContext,
207+
Supplier<CompletionStage<T>> supplier,
208+
CompletableFuture<T> promise
201209
) {
202210
this.scheduler = scheduler;
203211
this.retryContext = retryContext;
@@ -217,11 +225,10 @@ public void run() {
217225
}
218226

219227
stage.whenComplete((result, t) -> {
220-
if (t != null) {
228+
if (result != null) {
229+
onResult(result);
230+
} else if (t != null) {
221231
onError(t);
222-
} else {
223-
promise.complete(result);
224-
retryContext.onSuccess();
225232
}
226233
});
227234
}
@@ -235,4 +242,15 @@ private void onError(Throwable t) {
235242
scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
236243
}
237244
}
245+
246+
private void onResult(T result) {
247+
final long delay = retryContext.onResult(result);
248+
249+
if (delay < 1) {
250+
promise.complete(result);
251+
retryContext.onSuccess();
252+
} else {
253+
scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
254+
}
255+
}
238256
}

resilience4j-retry/src/main/java/io/github/resilience4j/retry/Retry.java

Lines changed: 55 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
*/
1919
package io.github.resilience4j.retry;
2020

21+
import java.util.concurrent.Callable;
22+
import java.util.function.Function;
23+
import java.util.function.Supplier;
24+
2125
import io.github.resilience4j.core.EventConsumer;
2226
import io.github.resilience4j.retry.event.RetryEvent;
2327
import io.github.resilience4j.retry.event.RetryOnErrorEvent;
@@ -29,10 +33,6 @@
2933
import io.vavr.CheckedFunction1;
3034
import io.vavr.CheckedRunnable;
3135

32-
import java.util.concurrent.Callable;
33-
import java.util.function.Function;
34-
import java.util.function.Supplier;
35-
3636
/**
3737
* A Retry instance is thread-safe can be used to decorate multiple requests.
3838
* A Retry.
@@ -145,11 +145,15 @@ default void executeRunnable(Runnable runnable){
145145
*/
146146
static <T> CheckedFunction0<T> decorateCheckedSupplier(Retry retry, CheckedFunction0<T> supplier){
147147
return () -> {
148-
Retry.Context context = retry.context();
148+
@SuppressWarnings("unchecked")
149+
Retry.Context<T> context = retry.context();
149150
do try {
150151
T result = supplier.apply();
151-
context.onSuccess();
152-
return result;
152+
final boolean validationOfResult = context.onResult(result);
153+
if (!validationOfResult) {
154+
context.onSuccess();
155+
return result;
156+
}
153157
} catch (Exception exception) {
154158
context.onError(exception);
155159
} while (true);
@@ -189,11 +193,15 @@ static CheckedRunnable decorateCheckedRunnable(Retry retry, CheckedRunnable runn
189193
*/
190194
static <T, R> CheckedFunction1<T, R> decorateCheckedFunction(Retry retry, CheckedFunction1<T, R> function){
191195
return (T t) -> {
192-
Retry.Context context = retry.context();
196+
@SuppressWarnings("unchecked")
197+
Retry.Context<R> context = retry.context();
193198
do try {
194-
R result = function.apply(t);
195-
context.onSuccess();
196-
return result;
199+
R result = function.apply(t);
200+
final boolean validationOfResult = context.onResult(result);
201+
if (!validationOfResult) {
202+
context.onSuccess();
203+
return result;
204+
}
197205
} catch (Exception exception) {
198206
context.onError(exception);
199207
} while (true);
@@ -211,11 +219,15 @@ static <T, R> CheckedFunction1<T, R> decorateCheckedFunction(Retry retry, Checke
211219
*/
212220
static <T> Supplier<T> decorateSupplier(Retry retry, Supplier<T> supplier){
213221
return () -> {
214-
Retry.Context context = retry.context();
222+
@SuppressWarnings("unchecked")
223+
Retry.Context<T> context = retry.context();
215224
do try {
216-
T result = supplier.get();
217-
context.onSuccess();
218-
return result;
225+
T result = supplier.get();
226+
final boolean validationOfResult = context.onResult(result);
227+
if (!validationOfResult) {
228+
context.onSuccess();
229+
return result;
230+
}
219231
} catch (RuntimeException runtimeException) {
220232
context.onRuntimeError(runtimeException);
221233
} while (true);
@@ -233,11 +245,15 @@ static <T> Supplier<T> decorateSupplier(Retry retry, Supplier<T> supplier){
233245
*/
234246
static <T> Callable<T> decorateCallable(Retry retry, Callable<T> supplier){
235247
return () -> {
236-
Retry.Context context = retry.context();
248+
@SuppressWarnings("unchecked")
249+
Retry.Context<T> context = retry.context();
237250
do try {
238-
T result = supplier.call();
239-
context.onSuccess();
240-
return result;
251+
T result = supplier.call();
252+
final boolean validationOfResult = context.onResult(result);
253+
if (!validationOfResult) {
254+
context.onSuccess();
255+
return result;
256+
}
241257
} catch (RuntimeException runtimeException) {
242258
context.onRuntimeError(runtimeException);
243259
} while (true);
@@ -277,11 +293,15 @@ static Runnable decorateRunnable(Retry retry, Runnable runnable){
277293
*/
278294
static <T, R> Function<T, R> decorateFunction(Retry retry, Function<T, R> function){
279295
return (T t) -> {
280-
Retry.Context context = retry.context();
296+
@SuppressWarnings("unchecked")
297+
Retry.Context<R> context = retry.context();
281298
do try {
282-
R result = function.apply(t);
283-
context.onSuccess();
284-
return result;
299+
R result = function.apply(t);
300+
final boolean validationOfResult = context.onResult(result);
301+
if (!validationOfResult) {
302+
context.onSuccess();
303+
return result;
304+
}
285305
} catch (RuntimeException runtimeException) {
286306
context.onRuntimeError(runtimeException);
287307
} while (true);
@@ -326,13 +346,24 @@ interface Metrics {
326346
long getNumberOfFailedCallsWithRetryAttempt();
327347
}
328348

329-
interface Context {
349+
/**
350+
* the retry context which will be used during the retry iteration to decide what can be done on error , result, on runtime error
351+
*
352+
* @param <T> the result type
353+
*/
354+
interface Context<T> {
330355

331356
/**
332357
* Records a successful call.
333358
*/
334359
void onSuccess();
335360

361+
/**
362+
* @param result the returned result from the called logic
363+
* @return true if we need to retry again or false if no retry anymore
364+
*/
365+
boolean onResult(T result);
366+
336367
/**
337368
* Handles a checked exception
338369
*

0 commit comments

Comments
 (0)