-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Feature: AggregateMonotonicity #14271
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
# Conflicts: # datafusion/core/src/physical_optimizer/enforce_sorting.rs # datafusion/core/src/physical_optimizer/test_utils.rs
# Conflicts: # datafusion/core/src/physical_optimizer/enforce_sorting.rs # datafusion/physical-optimizer/src/test_utils.rs
separate stubs and count_udafs
change monotonicity to return an Enum rather than Option<bool> fix indices re-add monotonicity tests
# Conflicts: # datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -4963,6 +4963,9 @@ false | |||
true | |||
NULL | |||
|
|||
statement ok |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are related with #14231
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order that the tests better explain the implications of this change, can you please add a new test rather than updating the existing test (by setting this option).
So that would mean set the flag and run the EXPLAIN again in a separate block
That will let the tests better illustrate any change in behavior
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
datafusion/expr/src/udaf.rs
Outdated
/// function is monotonically increasing if its value increases as its argument grows | ||
/// (as a set). Formally, `f` is a monotonically increasing set function if `f(S) >= f(T)` | ||
/// whenever `S` is a superset of `T`. | ||
fn monotonicity(&self, _data_type: &DataType) -> AggregateExprMonotonicity { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend adding a note at the beginning of the comment: This is used for a specific (is it BoundedWindowAggExec? )optimization and can be skipped by using the default implementation.
This interface seems quite difficult to understand for a general user who only wants to add a simple UDAF
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be possible to follow the existing model for ScalarUDFs here instead?
Soemthing like this:
pub trait AggregateUDFImpl {
...
/// returns the output order of this aggregate expression given the input properites
fn output_ordering(&self, inputs: &[ExprProperties]) -> Result<SortProperties>;
...
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is not possible because this property is purely related with the function's nature. It does not depend input order or anything else, just the relation between the element-wise increment (or decrement) in the grouping set and resulting values of aggregate function. I'm renaming the monotonicity as set-monotonicity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend adding a note at the beginning of the comment: This is used for a specific (is it BoundedWindowAggExec? )optimization and can be skipped by using the default implementation. This interface seems quite difficult to understand for a general user who only wants to add a simple UDAF
We've tried to provide a good documentation, and the API's itself comes up with a default implementation. If the general users are not interested at these properties, we are not forcing them to be. Do you have further suggestions either for code or documentation level?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @mertak-synnada -- I like where this is headed. I am not sure about some of the plan changes and I also have some questions about the API
Thanks @2010YOUY01 for the look as well
datafusion/expr/src/udaf.rs
Outdated
/// function is monotonically increasing if its value increases as its argument grows | ||
/// (as a set). Formally, `f` is a monotonically increasing set function if `f(S) >= f(T)` | ||
/// whenever `S` is a superset of `T`. | ||
fn monotonicity(&self, _data_type: &DataType) -> AggregateExprMonotonicity { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be possible to follow the existing model for ScalarUDFs here instead?
Soemthing like this:
pub trait AggregateUDFImpl {
...
/// returns the output order of this aggregate expression given the input properites
fn output_ordering(&self, inputs: &[ExprProperties]) -> Result<SortProperties>;
...
}
@@ -4963,6 +4963,9 @@ false | |||
true | |||
NULL | |||
|
|||
statement ok |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order that the tests better explain the implications of this change, can you please add a new test rather than updating the existing test (by setting this option).
So that would mean set the flag and run the EXPLAIN again in a separate block
That will let the tests better illustrate any change in behavior
f1777ef
to
1f02953
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have some minor comments, almost ready to go
6b90eba
to
1875336
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This LGTM and is ready to go from my perspective. @alamb, it'd be great if you can take a look. It doesn't introduce any changes to existing plans/tests unless it is a strict improvement, but I'd still prefer if you could take a final quick look.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @mertak-synnada and @ozankabak
I think I am missing something here -- the code is very nicely structured and does what the PR says it should do. However, the optimization doesn't seem to compute the same answer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks again @ozankabak and @mertak-synnada
I am still confused about this PR -- I am sorry I am probably missing something silly
My understanding of this PR
As I understand this PR, it is optimizing queries like
select a, count(b) FROM ... GROUP BY a ORDER BY a, count(b)
By noticing that when
- the input is already sorted by
a
- we use the ordering preserving grouping (
ordering_mode=Sorted
)
This implies that the output is already sorted by a, count(b)
and thus no SortExec
is needed
This makes total sense to me and is a great optimization ✅
My confusion -- doesn't this always hold?
What I don't understand is why this optimization relies on the specific aggregate function used (aka why is AggregateExprSetMonotonicity
needed)?
It seems to me like any query like the following doesn't need an extra sort.
select a, agg(b) FROM ... GROUP BY a ORDER BY a, agg(b)
(where agg(b)
is any aggregate )
My reasoning is that the GROUP BY
ensures that there are no duplicates in the a
column, so by definition the stream is sorted by a, <any other columns>
as we know a
is unique 😕
Thanks for reviewing carefully, as always, much appreciated 🚀
You are right that all queries of this form can be optimized independent of what Now, coming back to the original aim of the PR -- the main intent behind
Does that help? |
@alamb could you take a final look? |
d7e3135
to
5e9b2db
Compare
This now includes the optimization for single-row outputs, windowing operations with set-monotonic functions, and it lays the foundational machinery for more sophisticated optimizations based on expressions involving functions with set-monotonicity properties. I am quite happy with the final state of this PR. Once @alamb confirms there are no concerns left, I will merge. |
I will try and give it a good look later today |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! I looked at the plans carefully and they look ok, but I am not sure the tests in
Set-Monotonic Window Aggregate functions can output results in order
Are really testing the monotonic aggregate functions (they seem to be missing an ORDER BY
)
@@ -6203,3 +6203,20 @@ physical_plan | |||
14)--------------PlaceholderRowExec | |||
15)------------ProjectionExec: expr=[1 as id, 2 as foo] | |||
16)--------------PlaceholderRowExec | |||
|
|||
# SortExec is removed if it is coming after one-row producing AggregateExec's having an empty group by expression |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
# physical plan should contain SortExec. | ||
# Top level sort is pushed down through BoundedWindowAggExec as its SUM result does already satisfy the required | ||
# global order. The existing sort is for the second-term lexicographical ordering requirement, which is being | ||
# preserved also at lexicographical level during the BoundedWindowAggExec. | ||
query TT | ||
EXPLAIN SELECT c9, sum1 FROM (SELECT c9, | ||
SUM(c9) OVER(ORDER BY c9 DESC) as sum1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see -- the fact that each subsequent value in the window here has additional values added to to it and Sum is increasing means the data is still sorted that way 👍
set datafusion.optimizer.prefer_existing_sort = true; | ||
|
||
query TT | ||
EXPLAIN SELECT c1, SUM(c9) OVER(PARTITION BY c1) as sum_c9 FROM aggregate_test_100_ordered ORDER BY c1, sum_c9; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This query should not depend on the particular aggregate to avoid the sort I don't think (because it only has a PARTITION BY
not an ORDER BY
clause
The plan looks fine to me, but the comments imply this is testing something related to the set monotonic aggregate functions
Likewise for the query below with OVER()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right -- the output of the query repeats the same value for c9
for every c1
group regardless of the particular window/aggregation function, because the frame is the whole table. So we should be able to do this optimization irrespective of set monotonicity. However, we don't just yet (using AVG
instead of SUM
reveals this).
We will fix this with a follow-on PR early next week and move these tests elsewhere with that PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've missed that🤦♂️ We need a frame [unbounded-current row]
to test these monotonic functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm tracking this testing issue, and will fix it
Which issue does this PR close?
Closes #.
Rationale for this change
This PR creates a definition of set-monotonicity for Aggregate expressions. Some aggregation functions create ordered results by definition (such as count, min, max). With this PR, we're adding this information to the output ordering and be able to remove some SortExecs while optimizing
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?