Skip to content

add Observable.sort() instance methods #1386

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed

add Observable.sort() instance methods #1386

wants to merge 1 commit into from

Conversation

davidmoten
Copy link
Collaborator

Added two Observable.sort instance methods that are equivalent to

Observable.toSortedList().flatMap(x -> Observable.from(x))

I guess you might ask well why don't people just code that. I think it adds the following:

  • a more natural reading api method
  • doesn't interrupt the observable flow with buffered type Observable<List<T>>
  • less mess for those people coding without lambdas

@cloudbees-pull-request-builder

RxJava-pull-requests #1325 SUCCESS
This pull request looks good

@headinthebox
Copy link
Contributor

I am not a big fan of having any sort of sort on observables. Sorting in general makes little sense for observable streams, rule of thumb, it you cannot write using scan it should probably not be there. If you do want to sort, that is available on regular finite lists.

@davidmoten
Copy link
Collaborator Author

Sorting doesn't make sense for infinite streams of course. On one project I use rxjava in a high performance computing role to process chunks of data on disk. Many thousands of chunks of data need to be sorted within each chunk then aggregated chunk by chunk in parallel later merged and aggregated with another function. Works well and quickly, very happy with it.

Observables come in all flavours small, large, infinite, etc and a sort method makes sense for the small (collection wholly fits in memory) to me.

@headinthebox
Copy link
Contributor

But you are not sorting an observable, you are sorting a list created from an observable. Saying xs.sort() seems like it is harmless, but it isn't. Truth in advertising.

And for your scenario, of course I lack the details, but I really wonder why you turn sorted lists back to observables, and don't work at the level of Observable[List[T]] as opposed to flattening Observable[T}

@headinthebox
Copy link
Contributor

Note it sucks that Java does not have extension methods. In .NET or Scala you can easily add this as a convenience method for your specific use-case.

@davidmoten
Copy link
Collaborator Author

my scenario, nope not that simple. The top level code is here if you are interested but it might be a bit too much detail! In short I have 115m timestamped position reports for ships over a year and for each ship I load the reports and sort by time (they arrive out of order due to different delivery latencies). At that point I can join the dots and start calculating metrics about grids on the earth like distance travelled per cell. you then end up with nifty vessel traffic density plots.

I take your point about extension methods. The language you use changes the things you care about eh. I'm a long term scala person but haven't used rxjava with it yet.

I'm not really fussed about this pull request, just thought I'd put it out there as a practical thing and expected pushback as the sort() method is an obvious omission from the api, had to be deliberate!

@headinthebox
Copy link
Contributor

had to be deliberate!

Very deliberate. It is not a streaming operation.

@davidmoten
Copy link
Collaborator Author

I'd like to explore this a bit further to see if I can glean a rule to apply in other similar situations. The sort method as proposed has some characteristics that are apparently inherently undesirable:

  • buffers entire incoming stream till completion then starts emitting
  • changes the order of the incoming items

I'm assuming that the 2nd point is not a problem, asynchronous processing can change order anyway and we often don't mind.

Re the first point let's look at the the last method which waits till completion before emitting. It seems to be a bona fide member of the ok Observable methods so I assume there is nothing inherently undesirable about it.

We are left with the fact that the entire stream is buffered. In general this sounds risky if your buffer has practical limits (like available memory). So lets reword the objection as that a potentially large stream is buffered. I went looking for card carrying Observable methods that have this characteristic and there are a few, among them zip, merge, and window with a time interval. Those methods are part of the backpressure work going on at the moment and may not threaten a buffer filling if backpressure can be applied. So perhaps that is the nub of it. The sort method is an undesirable method in a streaming world because

  • it buffers potentially large streams and the only way backpressure can assist is in preventing completion.

No doubt I've missed something important. Enlighten me @headinthebox.

@headinthebox
Copy link
Contributor

Well-behaved operators should produce output "in constant time" after receiving input. This has nothing to do with back pressure, please don't confuse orthogonal concerns.

Changing the order is ok, as soon as you do flatMap or merge that will happen.

So last is dangerous as well, like any other operator that buffers values until the end take linear time to produce output. Alas it is too late to kick it out.

zip is dangerous because producing output depends on two streams, so the fact that output is produced in "constant time" or not looking for one of the inputs is determined by the other.

merge is OK since it produced output as soon as input arrives, as is window which immediately starts producing output when a new window is produced. buffer is still OK since its intent is to produce input as soon as it can, but generally it is better to use window if you want to be streaming.

@benjchristensen
Copy link
Member

The toList operator is the worst of the bunch. By the way, we already have toSortedList ... which is already "bad" and does virtually the same thing as sort, just it emits List<T> rather than flattening back into T.

If I could go back to the beginning of RxJava, I'd probably argue for these types of operators to be on a different Observable, similar to how BlockingObservable was separated out.

@headinthebox
Copy link
Contributor

If I could go back to the beginning of RxJava

Yes, me too!

@davidmoten
Copy link
Collaborator Author

I'm still searching for a bit of precision about these undesirable operators. The idea of producing output within constant time is interesting but I can manufacture a variant of sort that emits a STILL_GOING token every N input items and I suspect that that operator is still an undesirable.

I'm suspecting that it hinges on emission after completion of source.

Note that buffer can emit after completion of source but does not always do so.

Note also that materialize always emits after completion but emits a constant.

So how's this: an operator is undesirable if

  • the operator always emits non-constant information after source completion

On this basis, the following operators would be considered undesirable or "bad"

  • sort (doesn't exist except in this pull request)
  • last
  • toSortedList
  • toList
  • toMap,toMultimap
  • all
  • collect (I think though haven't looked closely at this one)
  • count,max,min,sum
  • takeLast

The above criterion does not categorize zip as undesirable but as it operates on multiple streams I think it's a special case that could be explored on its own.

@benjchristensen
Copy link
Member

Your list is correct (and include reduce). This was discussed somewhat in #671.

Adding a sort function to the current state of Observable is no different than having toSortedList which we already have. As I mentioned above, if I could go back in time I would argue for moving all of these aggregate/terminal operators to a different Observable.

The way I'd prefer to handle these flattening requirements is with a flatten operator, but Java doesn't make that easy to do. There is a discussion about that in #295.

I think the main drawback of an operator called sort as opposed to toSortedList is whether it is clear to people that sort will wait until the entire origin is finished before starting to emit anything. The toSortedList operator is at least explicit about this (toSortedList().flatten() is more explicit than sort() ... if we had flatten()). The same reason reduce/collectare dangerous ...

None of those however are necessarily reasons for not adding sort since we obviously already have this problem.

@headinthebox
Copy link
Contributor

IMHO you are overanalyzing this a bit :-). Of course creating a "stuttering" (term they use for this in process algebra) stream when you sort is not very useful. I am not I understand what the problem is with 'materialize' emitting after completion, it just turns an Observable[T] into an Observable[Notification[T]] and does not wait for the input stream to terminate to produce notifications. Same with buffer, it outputs the buffered values as soon as the buffer is filled as you asked for (but as I said, you should normally prefer window).

That aside, your list is spot on. You miss reduce and probably some other operators as well like concatMap. Note however that sometimes doing a .toList on a nested observable is exactly what you need, but typically not a good idea.

@davidmoten
Copy link
Collaborator Author

I'm more interested in the criterion I put forward than the list and I wasn't suggesting there is a problem with emitting after completion like buffer can do and materialize does. Here's the criterion:

  • an operator is "bad" if it always emits non-constant information after source completion

@davidmoten
Copy link
Collaborator Author

None of this is an argument for adding sort, I don't really care about that. What I'm interested in is characterizing the API. I routinely use the so-called bad operators and am glad they are there but I think what's going on is that the rxjava API serves both a general purpose functional programming role as well as the stream processing role. Splitting stuff off into separate stream and non-stream modules is a pain for people because as @headinthebox mentions we don't have extension methods in java. Oh well. I'm still pretty happy with the API as is but wouldn't mind if more non-stream bits got chucked in for general convenience.

@headinthebox
Copy link
Contributor

Not sure that works, for example under this definition scan would be bad; I think it is easier/better to think what a good operator is, namely one that produces its results incrementally. For example, if you look at groupBy in Rx, we made sure sends out new groups and values on existing groups as soon as a new element arrives on the input, whereas in SQL or most implementation on in-memory collections, groupBy in not incremental like that. But even then groupBy is kind of dangerous because it can have an unbounded number of keys, that why we have groupByUntil.

Not you could try to define a .sort function that would send out the input so far as incremental sorted lists, but obviously that would not work for infinite streams.

You may want to take a look at http://en.wikipedia.org/wiki/Streaming_algorithm as well for interesting algorithms over infinite streams.

@benjchristensen
Copy link
Member

That's a good characterization of it ... the stream vs non-stream operators. I think the question comes down to what's more important ... having easy (familiar ... in your face ... dangling right off Observable) access to non-stream operators, or the fact that people then really need to understand which ones work on infinite streams and which ones require a terminal state.

We already crossed that line (Rx.Net led the way). So, do we try and pull back, or do we just accept it and let them all be in here. If we do let them all in, do we distinguish between them in any way?

@headinthebox
Copy link
Contributor

Note that there is no problem combining Rx with say Java8 streams when you do .toList and then call the existing operators on those. If you would look at my code, you see this split a lot. I.e. don't try to use Rx for collections that are really lists, for that use http://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html.

@headinthebox
Copy link
Contributor

To be fair @benjchristensen in .NET we pulled back as well. No API is perfect, but that should not be a reason to make it more imperfect,

@benjchristensen
Copy link
Member

@headinthebox I totally agree ... I was just commenting on how we're in a weird middle-ground where we have a mixture, so it's hard to say "no" to adding new non-stream operators when precedent already exists.

@davidmoten
Copy link
Collaborator Author

Using the bad definition, scan is not bad, it doesn't emit after completion but rather emits after each item. reduce on the other hand would be bad.

@davidmoten
Copy link
Collaborator Author

I'm going to try to redefine "bad".

Define a well behaved stream operator as one that

  • does not always emits non-constant information after source completion

@davidmoten
Copy link
Collaborator Author

I've had a great time using the functional and streaming aspects of the api together. It's an absolute winner in the java space especially pre java 8 but also post. We could add javadoc to the poorly behaved stream operators so that people know and allow useful functional constructs to enter the api. I'm not a fan of making things verbose in the API because it's educational in some sense (toSortedList.flatten). Documentation can help there.

@davidmoten
Copy link
Collaborator Author

Perhaps this.

Stream Operator Criterion
An Operator is considered a Stream Operator if and only if it does not always emit non-constant information after source completion.

@benjchristensen
Copy link
Member

I'm going to pass on this for now ... sorting of streams is generally a bad idea, and we already have toSortedList and window/buffer that can then each be sorted.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants