Skip to content

Implement the 'Repeat' operator #498

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed

Conversation

zsxwing
Copy link
Member

@zsxwing zsxwing commented Nov 19, 2013

Hi, this PR implemented the Repeat operator #70. Please take a look. Thanks!

@cloudbees-pull-request-builder

RxJava-pull-requests #422 SUCCESS
This pull request looks good

observer.onCompleted();
} else {
ConnectableObservable<T> replayObservable = source
.replay();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benjchristensen
Copy link
Member

I believe this stack overflows:

        Observable.from(1).repeat().toBlockingObservable().forEach(new Action1<Integer>() {

            @Override
            public void call(Integer t1) {

            }});

Stacktrace:

java.lang.RuntimeException: java.lang.StackOverflowError
    at rx.observables.BlockingObservable.forEach(BlockingObservable.java:159)
    at rx.operators.OperationRepeatTest.main(OperationRepeatTest.java:39)
Caused by: java.lang.StackOverflowError
    at java.util.HashMap$Entry.<init>(HashMap.java:814)
    at java.util.HashMap.createEntry(HashMap.java:901)
    at java.util.HashMap.addEntry(HashMap.java:888)
    at java.util.HashMap.put(HashMap.java:509)
    at rx.subjects.ReplaySubject$SubscriptionFunc.call(ReplaySubject.java:115)
    at rx.subjects.ReplaySubject$SubscriptionFunc.call(ReplaySubject.java:1)
    at rx.subjects.ReplaySubject$DelegateSubscriptionFunc.onSubscribe(ReplaySubject.java:84)
    at rx.Observable.subscribe(Observable.java:224)
    at rx.operators.OperationMulticast$MulticastConnectableObservable$1.onSubscribe(OperationMulticast.java:41)
    at rx.Observable.subscribe(Observable.java:224)
    at rx.operators.OperationObserveOn$ObserveOn.onSubscribe(OperationObserveOn.java:50)
    at rx.Observable.subscribe(Observable.java:224)
    at rx.operators.OperationRepeat$1$1.onCompleted(OperationRepeat.java:82)
    at rx.subjects.ReplaySubject.onCompleted(ReplaySubject.java:141)
    at rx.operators.OperationMulticast$MulticastConnectableObservable$2.onCompleted(OperationMulticast.java:54)
    at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:48)
    at rx.Observable.subscribe(Observable.java:224)
    at rx.operators.OperationMulticast$MulticastConnectableObservable.connect(OperationMulticast.java:51)
    at rx.operators.OperationRepeat$1$1.onCompleted(OperationRepeat.java:83)
    at rx.subjects.ReplaySubject.onCompleted(ReplaySubject.java:141)
    at rx.operators.OperationMulticast$MulticastConnectableObservable$2.onCompleted(OperationMulticast.java:54)
    at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:48)
    at rx.Observable.subscribe(Observable.java:224)
    at rx.operators.OperationMulticast$MulticastConnectableObservable.connect(OperationMulticast.java:51)
    at rx.operators.OperationRepeat$1$1.onCompleted(OperationRepeat.java:83)
    at rx.subjects.ReplaySubject.onCompleted(ReplaySubject.java:141)
    at rx.operators.OperationMulticast$MulticastConnectableObservable$2.onCompleted(OperationMulticast.java:54)
    at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:48)
    at rx.Observable.subscribe(Observable.java:224)
    at rx.operators.OperationMulticast$MulticastConnectableObservable.connect(OperationMulticast.java:51)
    at rx.operators.OperationRepeat$1$1.onCompleted(OperationRepeat.java:83)
    at rx.subjects.ReplaySubject.onCompleted(ReplaySubject.java:141)
    at rx.operators.OperationMulticast$MulticastConnectableObservable$2.onCompleted(OperationMulticast.java:54)
    at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:48)
    at rx.Observable.subscribe(Observable.java:224)
    at rx.operators.OperationMulticast$MulticastConnectableObservable.connect(OperationMulticast.java:51)
    at rx.operators.OperationRepeat$1$1.onCompleted(OperationRepeat.java:83)
    at rx.subjects.ReplaySubject.onCompleted(ReplaySubject.java:141)
    at rx.operators.OperationMulticast$MulticastConnectableObservable$2.onCompleted(OperationMulticast.java:54)
    at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:48)
    at rx.Observable.subscribe(Observable.java:224)
... etc

If Subscriptions has not been touched before the StackOverflow you will instead see this:

java.lang.NoClassDefFoundError: Could not initialize class rx.subscriptions.Subscriptions
    at rx.Observable.subscribe(Observable.java:254)
    at rx.observables.BlockingObservable.protectivelyWrapAndSubscribe(BlockingObservable.java:93)
    at rx.observables.BlockingObservable.forEach(BlockingObservable.java:121)
    at rx.operators.OperationRepeatTest.main(OperationRepeatTest.java:39)

We need to use the recursive scheduler idiom to implement repeat.

@samuelgruetter
Copy link
Contributor

Small detail: in some places, you should replace "The number of times to repeat the element" by "The number of times to repeat the source sequence".

@akarnokd
Copy link
Member

Is there a reason the repeat() uses an external Scheduler and the ReplaySubject? How about a simpler approach:

    public static <T> OnSubscribeFunc<T> repeat(final Observable<T> source, final int count) {
        return new OnSubscribeFunc<T>() {
            @Override
            public Subscription onSubscribe(final Observer<? super T> t1) {
                final SerialSubscription sreg = new SerialSubscription();
                final Observable<T> ssource = source.subscribeOn(Schedulers.currentThread());
                Observer<T> o = new Observer<T>() {
                    int remaining = count;
                    @Override
                    public void onNext(T args) {
                        t1.onNext(args);
                    }
                    @Override
                    public void onError(Throwable e) {
                        try {
                            t1.onError(e);
                        } finally {
                            sreg.unsubscribe();
                        }
                    }
                    @Override
                    public void onCompleted() {
                        if (remaining-- > 0) {
                            sreg.setSubscription(ssource.subscribe(this));
                        } else {
                            t1.onCompleted();
                            sreg.unsubscribe();
                        }
                    }
                };
                sreg.setSubscription(ssource.subscribe(o));
                return sreg;
            }
        };
    }

@zsxwing
Copy link
Member Author

zsxwing commented Nov 20, 2013

@benjchristensen I think using Schedulers.immediate() causes the stack overflow problem. Here is a test in C#:

        static void Main(string[] args)
        {
            IObservable<int> obs = Observable.Create<int>(o => new Foo(o).test(Scheduler.Immediate, 0));
            obs.Subscribe(
                x => Console.WriteLine("OnNext: " + x)
                );
            Console.ReadLine();
        }

        class Foo
        {
            IObserver<int> _o;

            public Foo(IObserver<int> o)
            {
                _o = o;
            }

            public IDisposable test(IScheduler s, int i)
            {
               return s.Schedule(i + 1,  (scheduler, x) => {
                    _o.OnNext(x);
                    return this.test(scheduler, x);
                });
            }
        }

This code does not cause a stack overflow exception.

But in RxJava, the following unit test will cause a stack overflow exception:

    @Test
    public void testRecursiveScheduler1() {
        Observable<Integer> obs = Observable
                .create(new OnSubscribeFunc<Integer>() {
                    @Override
                    public Subscription onSubscribe(
                            final Observer<? super Integer> observer) {
                        return Schedulers.immediate().schedule(0,
                                new Func2<Scheduler, Integer, Subscription>() {
                                    @Override
                                    public Subscription call(
                                            Scheduler scheduler, Integer i) {
                                        observer.onNext(i);
                                        return scheduler.schedule(i + 1, this);
                                    }
                                });
                    }
                });

        obs.subscribe(new Observer<Integer>() {

            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onNext(Integer args) {
                System.out.println(args);
            }
        });
    }

I'm trying to understand the ImmediateScheduler in Rx.Net. Seems that it also uses a queue to save the Actions to avoid the stack overflow exception.
http://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Core/Reactive/Concurrency/ImmediateScheduler.cs
http://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Core/Reactive/Concurrency/LocalScheduler.TimerQueue.cs

@cloudbees-pull-request-builder

RxJava-pull-requests #444 FAILURE
Looks like there's a problem with this pull request

@cloudbees-pull-request-builder

RxJava-pull-requests #445 FAILURE
Looks like there's a problem with this pull request

@cloudbees-pull-request-builder

RxJava-pull-requests #446 SUCCESS
This pull request looks good

@zsxwing
Copy link
Member Author

zsxwing commented Nov 24, 2013

The current implementation still has some problems which can not be simply handled in OperationRepeat.

If using ImmediateScheduler in repeat operator, there may be a stack overflow exception mentioned above.

If using CurrentThreadScheduler, the following test will not stop. I suppose it should stop when the observer has some error.

    @Test
    public void testRepeatWithInfiniteRepeatCountWithCurrentThread() {
        Observable<String> observable = repeat("foo", Schedulers.currentThread());

        @SuppressWarnings("unchecked")
        Observer<String> observer = (Observer<String>) mock(Observer.class);

        doAnswer(new Answer<Void>() {
            private int count = 0;

            @Override
            public Void answer(InvocationOnMock invocation) throws Throwable {
                count++;
                if (count == 100) {
                    // Only verify if repeating 100 times
                    // We can not really verify if repeating infinitely.
                    throw new RuntimeException("some error");
                }
                return null;
            }
        }).when(observer).onNext(anyString());

        observable.subscribe(observer);

        InOrder inOrder = inOrder(observer);
        inOrder.verify(observer, times(100)).onNext("foo");
        inOrder.verify(observer).onError(isA(RuntimeException.class));
        inOrder.verifyNoMoreInteractions();
    }

&& calledTimes.incrementAndGet() == repeatCount) {
observer.onCompleted();
} else {
subscription.setSubscription(scheduler.schedule(cancel, self));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem about CurrentThreadScheduler is here. When we reach the onCompleted, an synchronized Observable has not return the subscription yet. In Observable.java, every subscribe will be wrapped by a SafeObservableSubscription and SafeObserver. If the observer has some error, SafeObserver will unsubscribe the SafeObservableSubscription. But now SafeObservableSubscription has not be set the real Subscription yet. So now when we is in the onCompleted here, we has no way to know if unsubscribe has happens. We will continue to call scheduler.schedule.

Next time, when we enter onCompleted here again, we still do not know if unsubscribe has happens. So we will not be able to terminate it even if the observer has some error.

                SafeObservableSubscription subscription = new SafeObservableSubscription();
                subscription.wrap(onSubscribeFunction.onSubscribe(new SafeObserver<T>(subscription, observer)));
                return hook.onSubscribeReturn(this, subscription);

@headinthebox
Copy link
Contributor

  • You should use the current thread scheduler (which is what C# does), see my pull request.
  • I don't think we need an overload with count, just as easy to do a take(n) afterwards.
  • There is an issue with toblocking observable where it does not terminate that does not appear in .toeneumerable() in .NET

@zsxwing
Copy link
Member Author

zsxwing commented Nov 24, 2013

You should use the current thread scheduler (which is what C# does), see my pull request.

Agree. But now as the current thread has some issue, I use ImmediateScheduler to pass the unit test.

I don't think we need an overload with count, just as easy to do a take(n) afterwards.

Could you give me an example? How can we know the n to use in take when the observable size is unknown.

There is an issue with toblocking observable where it does not terminate that does not appear in .toeneumerable() in .NET

I suppose my problem it's about SafeObservableSubscription and SafeObserver. But maybe we are talking about the same issue since SafeObservableSubscription and SafeObserver are used in toBlockingObservable, too.

@akarnokd
Copy link
Member

This hangs in Rx.NET as well (2.1.30214.0)

Observable.Return(1).Repeat().Take(100).Subscribe(Console.WriteLine);

The issue is in the Repeat.subscribe() not returning until the (infinite) re-subscription finishes, so there is no way the Take method can signal its uplink to terminate as it hasn't yet received a reference to it. However, this works:

Observable.Return(1, Scheduler.Default).Repeat().Take(100).Subscribe(Console.WriteLine);

as now the Repeat.onCompleted can run in another thread and the Repeat.subscribe() can return.

I believe this can be achieved by not subscribing to the raw source in the Repeat.subscribe() but rather using subscribeOn(Schedulers.newThread()) to move the subscription into a parallel thread and let the Repeat.subscribe() return.

public static <T> OnSubscribeFunc<T> repeat(final Observable<T> source, final int count) {
    return new OnSubscribeFunc<T>() {
        @Override
        public Subscription onSubscribe(final Observer<? super T> t1) {
            final SerialSubscription sreg = new SerialSubscription();
            final Observable<T> ssource = source.subscribeOn(Schedulers.newThread());
            Observer<T> o = new Observer<T>() {
                int remaining = count;
                @Override
                public void onNext(T args) {
                    t1.onNext(args);
                }
                @Override
                public void onError(Throwable e) {
                    try {
                        t1.onError(e);
                    } finally {
                        sreg.unsubscribe();
                    }
                }
                @Override
                public void onCompleted() {
                    if (remaining-- > 0) {
                        sreg.setSubscription(ssource.subscribe(this));
                    } else {
                        t1.onCompleted();
                        sreg.unsubscribe();
                    }
                }
            };
            sreg.setSubscription(ssource.subscribe(o));
            return sreg;
        }
    };
}

@zsxwing
Copy link
Member Author

zsxwing commented Nov 25, 2013

I believe this can be achieved by not subscribing to the raw source in the Repeat.subscribe() but rather using subscribeOn(Schedulers.newThread()) to move the subscription into a parallel thread and let the Repeat.subscribe() return.

Thanks, you remind me one thing: There is not an repeat overload with Observable and Scheduler together in Rx.Net. When I tried to implement the repeat overload, I was wondering why there was not a such overload. Maybe this is because repeat can not be used with all of Schedulers. However, even if we do not provide such overload, users still can use Schedulers by observeOn. For example, Observable.from(1).repeat().take(100).observeOn().(Schedulers.CurrentThread);.

So is it OK that we do not provide this kind of method and warn that should not use repeat with Schedulers.CurrentThread() or Schedulers.immediate() in the document?

@akarnokd
Copy link
Member

I think Rx.Net started out its Return operator to run on the threadpool. If manually put back there, the example works. Nowadays it runs on the immediate scheduler, causing the problem. Even if you warn the user about the scheduler, there is no way to know if an incoming observable is dangerous or not. This affects other operators such as concat and onerrorresume.

@headinthebox
Copy link
Contributor

Return always used the immediate scheduler :-)

@zsxwing
Copy link
Member Author

zsxwing commented Nov 26, 2013

Same stack overflow issue happens in interval.

    @Test
    public void testIntervalWithImmediateScheduler() {
        Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.immediate())
                .subscribe(new Observer<Long>() {

                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }

                    @Override
                    public void onNext(Long args) {
                        System.out.println(args);
                    }
                });
    }

@zsxwing
Copy link
Member Author

zsxwing commented Nov 26, 2013

CurrentThreadScheduler can work with interval because interval does not use SafeObservableSubscription and SafeObserver.

@benjchristensen
Copy link
Member

I am working with @headinthebox on changes to Schedulers including Interval and will come back to this and #518 when ready.

@benjchristensen benjchristensen mentioned this pull request Nov 26, 2013
@benjchristensen
Copy link
Member

Completed in #699

@zsxwing zsxwing deleted the repeat branch March 2, 2014 03:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants