Change Thread in retryWhen will go into disposed #5490

Closed
walfud opened this issue Jul 15, 2017 · 4 comments

walfud commented Jul 15, 2017

RxJava 2.1.1

I have a retryWhen operator in my stream, as follows:

        final Disposable[] disposable = new Disposable[1];
        Observable.just(0)
                .map(new Function<Object, Object>() {
                    @Override
                    public Object apply(@NonNull Object o) throws Exception {
                        Log.i(TAG, "apply: before thread change: " + disposable[0].isDisposed());
                        return o;
                    }
                })
//                .observeOn(Schedulers.io())           // If uncommented (thread switch), the next `map` may not be reached
                .map(new Function<Object, Object>() {
                    @Override
                    public Object apply(@NonNull Object o) throws Exception {
                        Log.i(TAG, "apply: after thread change: " + disposable[0].isDisposed());
                        return o;
                    }
                })
                .map(new Function<Object, Object>() {
                    @Override
                    public Object apply(@NonNull Object o) throws Exception {
                        throw new RuntimeException();
                    }
                })
                .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
                        return throwableObservable.zipWith(Observable.range(1, 1), new BiFunction<Throwable, Integer, Object>() {
                            @Override
                            public Object apply(@NonNull Throwable throwable, @NonNull Integer integer) throws Exception {
                                return integer;
                            }
                        })
                                .flatMap(new Function<Object, ObservableSource<?>>() {
                                    @Override
                                    public ObservableSource<?> apply(@NonNull Object o) throws Exception {
                                        return Observable.timer(0, TimeUnit.MILLISECONDS);
                                    }
                                });
                    }
                })
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.i(TAG, "onSubscribe: ");
                        disposable[0] = d;
                    }

                    @Override
                    public void onNext(Object o) {
                        Log.i(TAG, "onNext: ");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.i(TAG, "onError: ", e);
                    }

                    @Override
                    public void onComplete() {
                        Log.i(TAG, "onComplete: ");
                    }
                });

As written, with observeOn commented out, the code works as expected:

28173-28173 I: onSubscribe: 
28173-28173 I: apply: before thread change: false
28173-28173 I: apply: after thread change: false
28173-28250 I: apply: before thread change: false
28173-28250 I: apply: after thread change: false
28173-28250 I: onComplete: 

But if you uncomment the observeOn line, the second map operator may not be reached. The output is as follows:

27269-27269 I: onSubscribe: 
27269-27269 I: apply: before thread change: false
27269-27298 I: apply: after thread change: false
27269-27338 I: apply: before thread change: false
27269-27338 I: onComplete: 

I debugged the code and found that the first round is always OK. In the second round (triggered by the retry operator), the first map function runs, but the second map function (below observeOn) may or may not be called. Even when it is called, disposable[0].isDisposed() == true.

So, is something wrong in my code, or is this an RxJava bug?

akarnokd (Member) commented

I'm not sure how you run the example code, but because of observeOn and timer, the flow needs some time to finish and print its status. If I add Thread.sleep(1000) after subscribe, or simply use blockingSubscribe, everything is printed as expected.
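The need to wait is not specific to RxJava: any work handed to a background thread can be cut short or go unobserved if the caller moves on first. As a rough plain-JDK analogy (this is not RxJava code, and `AwaitAsyncDemo` and `run` are hypothetical names), a CountDownLatch can stand in for what blockingSubscribe does for the caller:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class AwaitAsyncDemo {
    // Logs three steps on a background daemon thread. When `await` is true the
    // caller blocks until the worker has finished, so the returned log is complete.
    static List<String> run(boolean await) {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            log.add("apply: before thread change");
            log.add("apply: after thread change");
            log.add("onComplete");
            done.countDown();
        });
        worker.setDaemon(true); // a daemon thread does not keep the JVM alive
        worker.start();
        if (await) {
            try {
                done.await(5, TimeUnit.SECONDS); // stands in for blockingSubscribe
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // all three entries are present
        System.out.println(run(false)); // may be empty or partial, depending on timing
    }
}
```

Without the wait, what the caller sees depends on thread timing, which is consistent with log lines going missing in a short-lived test run.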


akarnokd commented Jul 15, 2017

Since the retryWhen operator expects you to react to an error signal, the structure you are using may signal an onComplete during the time the upstream is re-streamed, yielding the pattern you see. You could rewrite the retry handling as follows:

    .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) {
            return throwableObservable
                // Allow one retry: true for the first error, false for the second
                .takeWhile(new Predicate<Throwable>() {
                    int counter;
                    @Override
                    public boolean test(Throwable v) throws Exception {
                        return ++counter < 2;
                    }
                })
                .flatMap(new Function<Object, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull Object o) throws Exception {
                        return Observable.timer(0, TimeUnit.MILLISECONDS);
                    }
                });
        }
    })

Here, takeWhile will react to being invoked the second time by completing the stream between subscriptions.
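The counter logic in that predicate can be checked on its own. The sketch below is only an illustration: it uses java.util.function.Predicate instead of RxJava's io.reactivex.functions.Predicate so that it runs without the library, and `RetryLimit.upTo` is a hypothetical helper, not part of RxJava. With maxRetries = 1 it behaves like `++counter < 2`.

```java
import java.util.function.Predicate;

class RetryLimit {
    // Returns a stateful predicate that accepts the first maxRetries errors
    // and rejects every later one; maxRetries = 1 matches `++counter < 2`.
    static Predicate<Throwable> upTo(final int maxRetries) {
        return new Predicate<Throwable>() {
            int counter;
            @Override
            public boolean test(Throwable error) {
                return ++counter <= maxRetries;
            }
        };
    }

    public static void main(String[] args) {
        Predicate<Throwable> once = upTo(1);
        System.out.println(once.test(new RuntimeException("first failure")));  // true
        System.out.println(once.test(new RuntimeException("second failure"))); // false
    }
}
```

Because the count lives in the predicate instance, it should be created inside the retryWhen handler, as in the snippet above, so that each subscription starts from zero.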


walfud commented Jul 15, 2017

Thanks. I ran the code on Android (in a click listener). I eventually found a post explaining that combining the retryWhen and range operators can cause bizarre behavior: #4207.
I'll use a counter, as in the example above, to control the number of retries.

akarnokd (Member) commented

Looks like this question has been answered. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.
