
RxJava retryWhen bizarre behavior #4207


Closed
asarkar opened this issue Jul 17, 2016 · 14 comments

@asarkar

asarkar commented Jul 17, 2016

I'm playing with the RxJava retryWhen operator. Very little about it can be found on the internet; the only resource worth mentioning is this. Even that falls short of exploring the various use cases I'd like to understand. I also threw in asynchronous execution and retry with back-off to make it more realistic.

My setup is simple: I have a class ChuckNorrisJokesRepository that returns a requested number of random Chuck Norris jokes from a JSON file. My class under test is ChuckNorrisJokesService, shown below. The use cases I'm interested in are as follows:

  1. Succeeds on 1st attempt (no retries)
  2. Fails after 1 retry
  3. Allows up to 3 retries but succeeds on the 2nd, so it doesn't retry a 3rd time
  4. Succeeds on 3rd retry
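To pin down what the four use cases expect, here is a minimal plain-Java model of the intended policy, with no RxJava involved: one initial attempt plus up to numRetries retries, waiting retryCount seconds before retry number retryCount (mirroring Observable.timer(retryCount, TimeUnit.SECONDS) in the service below). RetryModel and callWithRetries are hypothetical names, and the delays are recorded into a list instead of slept so the model runs instantly. It sketches the expected behavior only; it says nothing about how retryWhen is implemented.

```java
import java.util.List;
import java.util.concurrent.Callable;

/** Hypothetical plain-Java model of the intended retry policy (not part of RxJava). */
final class RetryModel {
    private RetryModel() {}

    /**
     * Calls task once, then retries up to numRetries more times.
     * Before retry number k (1-based) a back-off of k seconds is recorded
     * in delaysSeconds instead of actually sleeping.
     */
    static <T> T callWithRetries(Callable<T> task, int numRetries, List<Long> delaysSeconds) {
        if (numRetries < 0) {
            throw new IllegalArgumentException("numRetries must be >= 0");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= numRetries; attempt++) {
            if (attempt > 0) {
                delaysSeconds.add((long) attempt); // back-off: 1 s, 2 s, 3 s, ...
            }
            try {
                return task.call(); // success: no further retries are made
            } catch (Exception e) {
                last = new RuntimeException("attempt " + (attempt + 1) + " failed", e);
            }
        }
        throw last; // retries exhausted: propagate the last failure
    }
}
```

With numRetries = 0, a successful first call returns immediately and no delay is ever recorded, which is use case 1.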

Note: The project is available on my GitHub.

ChuckNorrisJokesService.java:

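import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Objects.requireNonNull;

@Slf4j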
@Builder
public class ChuckNorrisJokesService {
    @Getter
    private final AtomicReference<Jokes> jokes = new AtomicReference<>(new Jokes());

    private final Scheduler scheduler;
    private final ChuckNorrisJokesRepository jokesRepository;
    private final CountDownLatch latch;
    private final int numRetries;
    private final Map<String, List<String>> threads;

    public static class ChuckNorrisJokesServiceBuilder {
        public ChuckNorrisJokesService build() {
            if (scheduler == null) {
                scheduler = Schedulers.io();
            }

            if (jokesRepository == null) {
                jokesRepository = new ChuckNorrisJokesRepository();
            }

            if (threads == null) {
                threads = new ConcurrentHashMap<>();
            }

            requireNonNull(latch, "CountDownLatch must not be null.");

            return new ChuckNorrisJokesService(scheduler, jokesRepository, latch, numRetries, threads);
        }
    }

    public void setRandomJokes(int numJokes) {
        mergeThreadNames("getRandomJokes");

        Observable.fromCallable(() -> {
            log.debug("fromCallable - before call. Latch: {}.", latch.getCount());
            mergeThreadNames("fromCallable");
            latch.countDown();

            List<Joke> randomJokes = jokesRepository.getRandomJokes(numJokes);
            log.debug("fromCallable - after call. Latch: {}.", latch.getCount());

            return randomJokes;
        }).retryWhen(errors ->
                errors.zipWith(Observable.range(1, numRetries), (n, i) -> i).flatMap(retryCount -> {
                    log.debug("retryWhen. retryCount: {}.", retryCount);
                    mergeThreadNames("retryWhen");

                    return Observable.timer(retryCount, TimeUnit.SECONDS);
                }))
                .subscribeOn(scheduler)
                .subscribe(j -> {
                            log.debug("onNext. Latch: {}.", latch.getCount());
                            mergeThreadNames("onNext");

                            jokes.set(new Jokes("success", j));
                            latch.countDown();
                        },
                        ex -> {
                            log.error("onError. Latch: {}.", latch.getCount(), ex);
                            mergeThreadNames("onError");
                        },
                        () -> {
                            log.debug("onCompleted. Latch: {}.", latch.getCount());
                            mergeThreadNames("onCompleted");

                            latch.countDown();
                        }
                );
    }

    private void mergeThreadNames(String methodName) {
        threads.merge(methodName,
                new ArrayList<>(Arrays.asList(Thread.currentThread().getName())),
                (value, newValue) -> {
                    value.addAll(newValue);

                    return value;
                });
    }
}

For brevity, I'll only show the Spock test case for the 1st use case. See my GitHub for the other test cases.

def "succeeds on 1st attempt"() {
    setup:
    CountDownLatch latch = new CountDownLatch(2)
    Map<String, List<String>> threads = Mock(Map)
    ChuckNorrisJokesService service = ChuckNorrisJokesService.builder()
            .latch(latch)
            .threads(threads)
            .build()

    when:
    service.setRandomJokes(3)
    latch.await(2, TimeUnit.SECONDS)

    Jokes jokes = service.jokes.get()

    then:
    jokes.status == 'success'
    jokes.count() == 3

    1 * threads.merge('getRandomJokes', *_)
    1 * threads.merge('fromCallable', *_)
    0 * threads.merge('retryWhen', *_)
    1 * threads.merge('onNext', *_)
    0 * threads.merge('onError', *_)
    1 * threads.merge('onCompleted', *_)
}

This fails with:

Too few invocations for:

1 * threads.merge('fromCallable', *_)   (0 invocations)
1 * threads.merge('onNext', *_)   (0 invocations)

What I'm expecting is that fromCallable is called once, it succeeds, onNext is called once, followed by onCompleted. What am I missing?

P.S.: In the interest of full disclosure, I've also posted this question on StackOverflow, but I don't have high hopes of getting an answer there.

@akarnokd
Member

1 * threads.merge('fromCallable', *_)   (0 invocations)

No idea; what does the log contain? This looks suspicious to me: maybe it leads to an IndexOutOfBoundsException a line below. Otherwise, try single-stepping through the callable.

1 * threads.merge('onNext', *_)   (0 invocations)

You count down the latch before the value is returned, so the main thread may wake up before the onNext lambda passed to subscribe(j -> { ... }) executes on the background thread, and therefore not see the call to onNext.

@asarkar
Author

asarkar commented Jul 17, 2016

@akarnokd The "fromCallable - before call" message is not printed at all, which is why counting down the latch cannot be the issue. In fact, it is necessary to count down before jokesRepository.getRandomJokes(numJokes) is called; otherwise, if that method throws an exception, the code will block forever.
Also, I don't think there's an IndexOutOfBoundsException here, because that method returns jokes just fine for the other test cases. I've tried changing it to the following, but that didn't make any difference.
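Both concerns in this exchange (the waiting thread waking up before the result is published, and the latch never being released if the call throws) can be handled by one conventional pattern: publish the result first, then count down in a finally block. The sketch below shows it in plain Java; LatchOnFailure is a hypothetical helper, and a worker Thread stands in for the RxJava scheduler. CountDownLatch guarantees that actions before countDown() happen-before a successful return from await(), so the published value is visible to the waiter. Applying this to the RxJava test (for example, counting down from the subscriber callbacks instead of inside fromCallable) is an assumption about the test design, not something established in this thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper: runs task on another thread and waits for it without risking a hang. */
final class LatchOnFailure {
    private LatchOnFailure() {}

    /** Returns the task's value, or null if the task threw. */
    static String runAndAwait(Callable<String> task) {
        AtomicReference<String> result = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                result.set(task.call()); // 1. publish the value first ...
            } catch (Exception e) {
                // failure: nothing to publish, but the latch is still released below
            } finally {
                done.countDown();        // 2. ... then release the waiter, on success or failure
            }
        });
        worker.start();
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result.get();
    }
}
```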

return IntStream.range(0, numJokes)
    .map(i -> ThreadLocalRandom.current().nextInt(i, allJokesCount))
    .mapToObj(jokes::get)
    .collect(toList());
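For reference, a self-contained version of that snippet, assuming the jokes are held in a List (RandomPicks and pick are hypothetical names). Note that ThreadLocalRandom.nextInt(origin, bound) throws IllegalArgumentException unless origin < bound, so the snippet only works when numJokes does not exceed the number of available jokes; the sketch checks that up front. Like the original, it can return the same element more than once.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Hypothetical standalone version of the random-pick snippet. */
final class RandomPicks {
    private RandomPicks() {}

    static <T> List<T> pick(List<T> all, int n) {
        int size = all.size();
        // nextInt(i, size) requires i < size, so n must not exceed size.
        if (n < 0 || n > size) {
            throw new IllegalArgumentException("n must be between 0 and " + size);
        }
        return IntStream.range(0, n)
                .map(i -> ThreadLocalRandom.current().nextInt(i, size)) // index in [i, size)
                .mapToObj(all::get)
                .collect(Collectors.toList());
    }
}
```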

If I set the retry count to 1 (the default is 0), the code works fine. It's as if the code insists on retrying instead of treating retries as a fallback. The official doc only shows a trivial example that doesn't help at all.

@akarnokd
Member

Yes: due to the inner wiring, if the inner retry sequence completes at any time, that may complete the outer sequence no matter where the outer sequence is at that moment.

@asarkar
Author

asarkar commented Jul 17, 2016

But don't you think that I shouldn't have to retry if the first attempt succeeds? That's the most basic use case.
The only thing printed in the log is the onCompleted statement.
04:31:06.035 [RxIoScheduler-2] DEBUG name.abhijitsarkar.java.rxjava.ChuckNorrisJokesService - onCompleted. Latch: 2.

@akarnokd
Member

The operator requires that your Observable returned by the function reacts to the values of the Observable provided to that function. That means that the resulting Observable should emit onCompleted only if the source inner Observable emitted a Throwable. range and many other operators emit onCompleted immediately after they emit their last value, which violates the requirement above and leads to undefined behavior.
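The failure mode described here can be illustrated with a pull-based analogy of zip. ZipModel is hypothetical plain Java, not RxJava's push-based implementation: zipped pairs exist only while both sides have elements, and the retry budget is checked first, so once the budget produced by range is exhausted the zipped sequence ends without consulting the errors side at all. With numRetries = 0, range(1, 0) is empty, so the zipped handler sequence ends without ever seeing an error; in RxJava that completion is what retryWhen then treats as completion of the whole sequence.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

/** Hypothetical pull-based analogy of zipping the error stream with a retry budget. */
final class ZipModel {
    private ZipModel() {}

    static <A, B, R> List<R> zip(Iterator<A> errors, Iterator<B> budget, BiFunction<A, B, R> combiner) {
        List<R> out = new ArrayList<>();
        // The budget is checked first: an empty budget ends the zip
        // without ever looking at the errors.
        while (budget.hasNext() && errors.hasNext()) {
            B b = budget.next();
            A a = errors.next();
            out.add(combiner.apply(a, b));
        }
        return out;
    }
}
```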

Keeping those terminal events of the inner Observable separated can get quite convoluted (especially if time is involved):

source.retryWhen(o -> {
    int[] count = { 0 };
    return o.concatMap(e -> {
              if (++count[0] <= numRetries) {
                  return Observable.just(1);
              }
              return Observable.error(e);
         });
});

Note that the body of the function can be stateful as it is invoked for each downstream subscriber to retryWhen individually.
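To make the per-subscriber state concrete, the sketch below (plain Java, hypothetical names) models the notification handler as a factory. Each call to newHandler plays the role of retryWhen invoking the function for a new subscriber, so each subscriber gets its own counter, and the decision rule is the same ++count[0] <= numRetries test as above. Unlike range, this handler only produces something in response to an error, so with numRetries = 0 and a successful first attempt it is never invoked and cannot end the sequence early.

```java
import java.util.function.Function;

/** Hypothetical model of a stateful, per-subscriber retry handler. */
final class PerSubscriberState {
    private PerSubscriberState() {}

    /** What the handler does with one error: retry, or propagate it. */
    enum Decision { RETRY, ERROR }

    /** Called once per subscriber; the counter lives only in this handler instance. */
    static Function<Throwable, Decision> newHandler(int numRetries) {
        int[] count = { 0 }; // mutable holder, fresh for every subscriber
        return e -> ++count[0] <= numRetries ? Decision.RETRY : Decision.ERROR;
    }
}
```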

@asarkar
Author

asarkar commented Jul 17, 2016

@akarnokd David, your answer is quite involved, so allow me to ask some follow-up questions:

The operator requires that your Observable returned by the function reacts to the values of the Observable provided to that function

What operator? Are you referring to retryWhen? What function? The notification function? Reacts to what values? The ones emitted by getRandomJokes in my example?

the resulting Observable should emit onCompleted only if the source inner Observable emitted a Throwable

Do you mean the logic in retryWhen should emit onCompleted only if getRandomJokes emitted a Throwable?

int[] count = { 0 };

Why not simply int count = 0;

return Observable.just(1);

But I want return Observable.timer(retryCount, TimeUnit.SECONDS);

Lastly, if what you said above is right, the official document is just wrong. It uses a range to demonstrate retry logic. It also means that the logic in retryWhen must always execute, and not only in the case of a Throwable emitted by the first attempt.

@akarnokd
Member

The operator retryWhen takes a function with the signature Func1<Observable<Throwable>, Observable<?>> (simplified; the declared parameter type is Func1<? super Observable<? extends Throwable>, ? extends Observable<?>>). The function you provide receives an Observable<Throwable> whose regular onNext events are Throwable instances. From that function, you should return an Observable of anything: each onNext it emits triggers a retry, while onError/onCompleted terminates the whole sequence with an error/completion.

Why not simply int count = 0;

You need it to be mutable inside the concatMap's lambda.
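A small plain-Java illustration of the mutability point (CounterHolders is a hypothetical class): a local variable captured by a lambda must be final or effectively final, so a plain int counter cannot be incremented inside the lambda, while the element of a one-element array or an AtomicInteger can be. AtomicInteger is the safer holder if the lambda may run on more than one thread.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical comparison of counter holders usable from inside a lambda. */
final class CounterHolders {
    private CounterHolders() {}

    static int countWithArray(int events) {
        // int count = 0; Runnable onEvent = () -> count++;  // does not compile:
        // locals captured by a lambda must be final or effectively final.
        int[] count = { 0 };
        Runnable onEvent = () -> count[0]++; // the array reference never changes, only its element
        for (int i = 0; i < events; i++) {
            onEvent.run();
        }
        return count[0];
    }

    static int countWithAtomic(int events) {
        AtomicInteger count = new AtomicInteger();
        Runnable onEvent = count::incrementAndGet; // atomic, so also safe across threads
        for (int i = 0; i < events; i++) {
            onEvent.run();
        }
        return count.get();
    }
}
```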

But I want return Observable.timer(retryCount, TimeUnit.SECONDS);

Then return that.

official document is just wrong

Unfortunately, we have a few operators that were contributed and then forgotten, often with problems. One of them is retryWhen, which has some stealthy concurrency bugs I haven't been able to pinpoint yet. Most likely I'll have to rewrite it from scratch one day and also update the documentation.

@asarkar
Author

asarkar commented Jul 18, 2016

source.retryWhen(o -> {
    int[] count = { 0 };
    return o.concatMap(e -> {
              if (++count[0] <= numRetries) {
                  return Observable.just(1);
              }
              return Observable.error(e);
         });
});

I'm assuming you didn't try to compile this, because it doesn't compile.
Observable.just(1) is of type Observable<Integer>, whereas Observable.error(e) is of type Observable<Object>. Those are incompatible.

@asarkar
Author

asarkar commented Jul 18, 2016

I helped the compiler out by creating a RetryWithDelay class as follows (thanks to this SO thread):

public class RetryWithDelay implements
        Func1<Observable<? extends Throwable>, Observable<?>> {
...
}

All my test cases work now. Thanks for your help, @akarnokd David. If you choose to improve the docs by adding non-trivial use cases, you have my permission to reference my GitHub code.

@asarkar asarkar closed this as completed Jul 18, 2016
@stealthcode

Those are incompatible.

If that doesn't work in your IDE, you can give the compiler a hint for the error case to coerce the type system into agreement:

              return Observable.<Integer>error(e);

The purpose of the operator is to give users a reactive feedback loop by which they could transform exceptions into a signal that the source observable should be resubscribed to. You should not drop/filter any signals.

Your initial example is very long. When you post a question, could you try to reduce the example down to the Minimal, Complete, Verifiable example? I have implementations of retryWhen notification handlers that look very similar to yours, and there shouldn't be a problem there. I suspect that in reducing the problem to an MCV example you'll find what was causing it. You could also take a look at the blog post RxJava's repeatWhen and retryWhen, explained.

which has some stealthy concurrency bugs I wasn't able to pinpoint yet.

Cute, @akarnokd. @abersnaze and I wrote the retryWhen operator around 2 years ago. This was when there was no Single or "other" observable type without subclassing and inheriting all of the base operators. There are some operators (e.g. filter) which should not be available to users when transforming the observable passed into the retryWhen notification handler function. That is to say, I believe the method signature could be changed for the better, which would reduce the problem to a more manageable operator.

I have made successful (read: non-buggy) implementations which rely heavily on retryWhen; here at Netflix it's used frequently. If you choose to rewrite it (which I encourage you to do if you suspect that it's buggy), then I ask that you give it a different name and deprecate the existing one. Otherwise, bugs introduced into retryWhen could be detrimental: failure modes are very rarely traversed paths, but quite critical.

@asarkar
Author

asarkar commented Jul 18, 2016

@stealthcode

Your initial example is very long. When you post a question could you try to reduce the example down to the Minimal, Complete, Verifiable example?

I don't agree. My example is not long, it has just as much code as required for the use cases I'm interested in. The overzealousness for "Minimal, Complete, Verifiable" produces the kind of cute but nonsensical hello world type example that's in the official doc or in the blog you referred to, where an error is thrown every time the Observable is subscribed to, and the code always retries. Such examples, quite conveniently, only touch the obvious happy path in the code and leave the complex cases for the user to deal with.

Also, if you read my original question again, you'll find that the blog post you referred to is already mentioned, and as I said, doesn't cover the various use cases I stated.

At the end of the day, the discussion here with David led me to a solution. More power to you if you implemented a complex solution at Netflix using retryWhen, but unless you make that available to the rest of us, we'll have to resort to solving our own problems, perhaps with some help from the community.

@stealthcode

but unless you make that available to the rest of us, we'll have to resort to solving our own problems, perhaps with some help from the community.

The zipWith example is exactly the one case I'd give you that I know works correctly. It sounds like you are asking me to share examples which have already been shared. This is why I suspect something else in your example is working incorrectly. But since you asked, here is an example copied and pasted from my source code.

someOffBoxHystrixObservable
.timeout(TIMEOUT_THRESHOLD_MS, TimeUnit.MILLISECONDS)
.retryWhen(o -> {
    Observable<Integer> range = Observable.range(1, RETRY_MAX_ATTEMPTS);
    Observable<Observable<Long>> zipWith = o.zipWith(range, (e, i) ->
        i < RETRY_MAX_ATTEMPTS ?
                Observable.timer(RETRY_DELAY_MS, TimeUnit.MILLISECONDS) :
                Observable.error(e));
    return Observable.merge(zipWith);
})
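To see how this handler behaves for different values of RETRY_MAX_ATTEMPTS, here is a plain-Java model (ZipWithHandlerModel is hypothetical) of the decision the zipWith function makes for the i-th error, where range(1, N) supplies i = 1..N. It suggests RETRY_MAX_ATTEMPTS counts total attempts: N = 3 gives two delayed retries and then propagates the third error, N = 1 gives no retries, and N = 0 yields no decisions at all, so the handler has nothing to emit and completes as soon as the empty range does.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the zipWith-based handler's decision for each successive error. */
final class ZipWithHandlerModel {
    private ZipWithHandlerModel() {}

    enum Decision { RETRY_AFTER_DELAY, ERROR }

    static List<Decision> decisions(int maxAttempts) {
        List<Decision> out = new ArrayList<>();
        // range(1, maxAttempts) supplies i = 1..maxAttempts; the i-th error is paired with i.
        for (int i = 1; i <= maxAttempts; i++) {
            out.add(i < maxAttempts ? Decision.RETRY_AFTER_DELAY : Decision.ERROR);
        }
        return out;
    }
}
```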

The goal of RxJava is to provide a composable framework for async development. Since each operator should be composable and its external behavior verifiable, the MCV process lends itself very well to these kinds of bugs. You can use TestSubscriber and TestScheduler to demonstrate async scenarios and verify the outputs.

@stealthcode

Oh, one more thing that might be helpful: I frequently use the .cacheWithInitialCapacity(1) operator to save work and avoid resubscriptions. For instance:

return someOffBoxCall()
    .timeout(threshold, MILLISECONDS)
    .retryWhen(this::backoffHandler)
    .cacheWithInitialCapacity(1);

will return an observable that's resilient to resubscriptions and returns a copy of the same data (for any call that is idempotent in the lifetime of that observable's usage).
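A rough synchronous analogy for what caching buys here, in plain Java (Memo.memoize is hypothetical, not an RxJava API): the first get() runs the source and later calls reuse the stored value, so the expensive call is not repeated. One difference from cache/cacheWithInitialCapacity: this sketch stores only successful values. If the source throws, nothing is cached and the next get() tries again, whereas RxJava's cache replays the terminal event as well.

```java
import java.util.function.Supplier;

/** Hypothetical memoizing wrapper, a synchronous stand-in for caching an Observable's result. */
final class Memo {
    private Memo() {}

    static <T> Supplier<T> memoize(Supplier<T> source) {
        return new Supplier<T>() {
            private T value;
            private boolean computed;

            @Override
            public synchronized T get() {
                if (!computed) {
                    value = source.get(); // may throw; then nothing is stored
                    computed = true;
                }
                return value;
            }
        };
    }
}
```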

@asarkar
Author

asarkar commented Jul 19, 2016

@stealthcode
This doesn't work when RETRY_MAX_ATTEMPTS = 0 and the call succeeds on the first attempt (my use case 1). Based on David's earlier response, he suspected that range emits onCompleted immediately when there's no retry to be made, so the first attempt is never made. My current code is written without range and works for the no-retry use case.



3 participants