Skip to content

Flatten operator #295

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
mttkay opened this issue Jun 18, 2013 · 22 comments
Closed

Flatten operator #295

mttkay opened this issue Jun 18, 2013 · 22 comments

Comments

@mttkay
Copy link
Contributor

mttkay commented Jun 18, 2013

This is more a request for discussion.

I'm not sure .NET's Rx has something like this, but one thing I miss in RxJava is an operator to flatten an Observable<List<T>> to Observable<T>. As far as I can tell, this currently requires one to write somewhat clunky code using flatMap:

Observable.from(...).toList().flatMap(new Func1<List<...>, Observable<...>>() {
        @Override
        public Observable<...> call(List<...> list) {
            return Observable.from(list);
        }
    });

i.e. there seems to be no standard reverse operation for toList. Is this something you would deem worth having in the library itself? It would simplify the above call to:

Observable.from(...).toList().flatten()
@michaeldejong
Copy link
Contributor

It looks a bit similar to the switch operator on Observable. That takes Observable<Observable<T>> and turns it into Observable<T>. But perhaps a new operator which enumerates entries from an Observable<Collection<T>> is not such a bad idea.

Not only would this be the reverse operation of toList, it would also be a reverse operation for the buffer operation, which buffers incoming elements into lists of elements, which are then propagated.

Note that this does have some limitations. To me Observable.from(...).toList().flatten() seems impossible. The flatten operator should only be available / usable for Observables which produce Collections of elements, and not for Observables which produce non-Collection objects.

The only usable syntax I can come up with is: Observable.flatten(Observable.from(...).toList())

@mttkay
Copy link
Contributor Author

mttkay commented Jun 18, 2013

Note that this does have some limitations. To me Observable.from(...).toList().flatten() seems impossible. The flatten operator should only be available / usable for Observables which produce Collections of elements, and not for Observables which produce non-Collection objects.

that's a good point actually, didn't think about this. If we can't have the extra readability by flatten being an instance method, it might even suffice to have a flatten Func1 for now, which could be used like this:

// have to check if T could actually be inferred by Java here
Observable.from(1, 2, 3).toList().flatMap(Functions.flatten())

This is not much longer, doesn't read much worse, and would have the benefit of not impacting the Observable class interface.

@daveray
Copy link
Contributor

daveray commented Jun 18, 2013

I don't have much opinion on the syntax, but I'd find this useful. I end up
doing this operation on the result of toSortedList to switch back to an
observable sequence and continue processing.

Dave

On Tuesday, June 18, 2013, Matthias Käppler wrote:

Note that this does have some limitations. To me
Observable.from(...).toList().flatten() seems impossible. The flatten
operator should only be available / usable for Observables which produce
Collections of elements, and not for Observables which produce
non-Collection objects.

that's a good point actually, didn't think about this. If we can't have
the extra readability by flatten being an instance method, it might even
suffice to have a flatten Func1 for now, which could be used like this:

// have to check if T could actually be inferred by Java here
Observable.from(1, 2, 3).toList().flatMap(Functions.flatten())

This is not much longer, doesn't read much worse, and would have the
benefit of not impacting the Observable class interface.


Reply to this email directly or view it on GitHubhttps://github.com//issues/295#issuecomment-19610646
.

@prabirshrestha
Copy link
Contributor

Here is how to flatten IObservable<int[]> to IObservable<int> in .NET

var observable = Observable.Return(new[] { 1, 2, 3, 4, 5 });

observable                    // IObservable<int[]> observable
    .SelectMany(x => x)       // returns Observable<int>
    .Subscribe(Console.WriteLine);

Here is the SelectMany operator being used.

public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector);

@ghost ghost assigned benjchristensen Aug 30, 2013
@benjchristensen
Copy link
Member

Can anyone see a way of doing this that is type-safe?

Perhaps flatten(Class type) is the only way to do it so that we pass in the type that it should return? If it can't cast everything to that then it would onError a ClassCastException. Perhaps flatten() could exist but would have to emit Observable<Object>.

@samuelgruetter
Copy link
Contributor

I'm slightly confused by this discussion and I'm not sure if everyone is talking about the same thing... ;-)

But I think @benjchristensen you're looking for something like this:

    // in Observable.java:
    public static <T> Observable<T> flatten(Observable<? extends List<? extends T>> source) {
        Func1<List<? extends T>, Observable<T>> f = new Func1<List<? extends T>, Observable<T>>() {
            public Observable<T> call(List<? extends T> t1) {
                return Observable.from(t1);
            }
        };
        return Observable.concat(source.map(f));
    }
    // in some test file: 
    @Test public void testFlattenFromList() {
        Observable<List<Integer>> o = Observable.from(Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5, 6));
        for (Integer i : flatten(o).toBlockingObservable().toIterable()) System.out.println(i);
    }

@akarnokd
Copy link
Member

akarnokd commented Dec 6, 2013

Yes, this looks straightforward. Rx.NET has a SelectMany overload for this kind of operation. I think using a List is too strong type here, maybe an Iteratable will suffice.

@MikolajKakol
Copy link

With new lift operation on Observables it's quite easy to create flatten operator. I've created something like this:

public static class ListToItemsOperator<T> implements Operator<T, Iterable<T>> {

    @Override
    public Subscriber<? super Iterable<T>> call(final Subscriber<? super T> child) {
        return new Subscriber<Iterable<T>>(child) {

            @Override
            public void onCompleted() {
                child.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }

            @Override
            public void onNext(Iterable<T> list) {
                for (T t : list) {
                    if (child.isUnsubscribed()) {
                        break;
                    }
                    try {
                        child.onNext(t);
                    } catch (Exception e) {
          onError(OnErrorThrowable.addValueAsLastCause(e, t));
                        return;
                    }
                }
            }
        };
    }
}

Usage:

    Observable<List<Integer>> listObs = Observable.from(Arrays.asList(1, 2, 3, 4, 5, 6)).toList();
    listObs.lift(new ListToItemsOperator<Integer>()).subscribe(DumpObs.make());
    listObs.lift(new ListToItemsOperator<Integer>()).take(2).subscribe(Dump.Obs.make());

I'm not sure how good is that code, but it looks doable with type safety so It might be good moment to add it do core operators?

@benjchristensen
Copy link
Member

That works well to pull in via lift() because you can provide the correctly typed implementation for the Observable you're working on, but it can't be put on Observable as an instance method unless we do casting (and blow up if invalid) since that only works when T is Iterable<T> (i.e. Observable<Iterable<T>>) and an Observable can be anything.

It could be done as a static Observable.flatten(Observable<Iterable<T>> o) (as @samuelgruetter mentions above) but then we'd need many different overloads of flatten and that doesn't work with the chaining pattern that is wanted.

In short, Java doesn't allow conditional extension methods on a class based on its generic type, so a flatten instance method can't be typed to Iterable<T>, or Iterable<Iterable<T>> or other such variants. It will always only receive whatever T is.

@MikolajKakol
Copy link

Sure, we shouldn't force to add that to Observable I was rather thinking to expand rx.operators package with FlattenOperator. Since this implementation is immutable we might might have single instance of this operator to slightly reduce objects creation.

@abersnaze
Copy link
Contributor

Other notable places where we've done this are MathObservable and StringObservable. The contradiction is the that dematerialize() is an instance method on Observable<T> when it should only exist on instances Observable<Notification<T>>.

We could go crazy with the overloads to make things type safe

class Observable<T> {
    NotificationObservable<T> materialize()
    Observable<R> map(Func1<T, R>)
    NotificationObservable<R> map(Func1<T, Notification<R>>)
    ... a billion methods later ...
}
class NotificationObservable<T> extends Observable<Notification<T>> {
    Observable<T> dematerialize();
    // just in case someone materializes a materialized Observable.... turtles all the way down.
    NotificationNotificationObservable<T> materialize()
}
class NotificationNotificationObservable<T> extends NotificationObservable<Notification<T>> {
    NotificationObservable<T> dematerialize()
}

ok that was a silly idea.

@benjchristensen
Copy link
Member

Yes, dematerialize is the counter-example where we do casting an throw an error if the type is not a Notification.

@benjchristensen
Copy link
Member

I was rather thinking to expand rx.operators package with FlattenOperator

We have yet to start exposing custom operators for use via lift like this, and the rx.operators package is not the right place, since those are internal implementations and can change at any time.

@benjchristensen
Copy link
Member

Originally from #1408 by @ccmtaylor


In my code, I often make API calls that return Observable<Iterable<T>> and then I want to process each element of the returned collection. To do this, I keep having to write something like:

moviesService
    .listMovies()
    .flatMap(movies -> Observable.from(movies))
    .map(movie -> doSomethingWith(movie));

In Java 8 / scala, this isn't too bad, but with Java 7 / Android syntax, the flatMap call is not pretty.

Since we can't add a flatten() instance method that only applies to Observable<Iterable<T>> in java, I see two solutions:

  1. Add Observable<T> flatten(Observable<Iterable<T>> obs) to rx.Observable:

    Observable.flatten(moviesService.listMovies())
        .map(movie -> doSomethingWith(movie));
  2. Add Func1<Iterable<T>, Observable<T>> flatten() to rx.functions.Functions

    moviesService
        .listMovies()
        .flatMap(Functions.<Movie>flatten()) // because Java can't figure out the generics
        .map(movie -> doSomethingWith(movie));

@benjchristensen
Copy link
Member

As this discussion continues, some items to keep in mind...

  • If we're going to have a collection of static functions, we need a set of conventions that let it scale well and not become just a random catch-all. This is why I am not okay yet with just adding Functions.flatten. This is why so far we have put things like this in separate contrib modules where they are optionally brought in as desired (for example, the StringObservable).
  • Optimizing for languages without lambdas is not necessarily a reason to do something.
  • Whatever we do we're stuck with for a very long time.

@headinthebox
Copy link
Contributor

Note that here you can use .mergeMapIterator (https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mergemap-and-mergemapiterable).

That said, I agree with Ben, it is painful that Java has no extension methods so it becomes either/or. the saving grace (mmm) is that static methods do not need a receiver, it is less ugly to write f(xs) than FooBar.f(xs), but of course you just want to write xs.f().

@benjchristensen
Copy link
Member

Here is how to do it with Iterable:

Observable<Iterable<Integer>> o = Observable.from(Arrays.asList(1, 2), Arrays.asList(3, 4));
o.flatMapIterable(is -> is).forEach(System.out::println);

@vincent-paing
Copy link

Hello, there's no more from method in rxjava 2. Is there a workaround?

@akarnokd
Copy link
Member

akarnokd commented Dec 5, 2016

@vincent-paing check again, it is still called flatMapIterable in 2.x.

@vincent-paing
Copy link

vincent-paing commented Dec 5, 2016

@akarnokd yea but there's no longer "from" method. I tried used fromIterable(myArrayList) but IDE gives me error due to different type

@akarnokd
Copy link
Member

akarnokd commented Dec 5, 2016

It is called fromArray() to avoid ambiguities in Java 8+.

@vincent-paing
Copy link

@akarnokd Thanks, that works :D

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests