
Commit 1875336

add slt
1 parent 16bdac4 commit 1875336

5 files changed: +184 -4 lines changed

datafusion/physical-expr/src/aggregate.rs

Lines changed: 1 addition & 1 deletion
@@ -42,7 +42,7 @@ use crate::expressions::Column;
 
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow_schema::SortOptions;
-use datafusion_common::{internal_err, not_impl_err, ScalarValue, Result};
+use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
 use datafusion_expr::{AggregateExprSetMonotonicity, AggregateUDF, ReversedUDAF};
 use datafusion_expr_common::accumulator::Accumulator;
 use datafusion_expr_common::groups_accumulator::GroupsAccumulator;

datafusion/physical-expr/src/window/aggregate.rs

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ use crate::{reverse_order_bys, EquivalenceProperties, PhysicalExpr};
 use arrow::array::Array;
 use arrow::record_batch::RecordBatch;
 use arrow::{array::ArrayRef, datatypes::Field};
-use datafusion_common::{DataFusionError, ScalarValue, Result};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_expr::{Accumulator, WindowFrame};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 2 additions & 2 deletions
@@ -512,7 +512,7 @@ impl AggregateExec {
             &group_expr_mapping,
             &mode,
             &input_order_mode,
-            aggr_expr.clone(),
+            aggr_expr.as_slice(),
         );
 
         Ok(AggregateExec {
@@ -649,7 +649,7 @@ impl AggregateExec {
         group_expr_mapping: &ProjectionMapping,
         mode: &AggregateMode,
        input_order_mode: &InputOrderMode,
-        aggr_exprs: Vec<Arc<AggregateFunctionExpr>>,
+        aggr_exprs: &[Arc<AggregateFunctionExpr>],
     ) -> PlanProperties {
         // Construct equivalence properties:
         let mut eq_properties = input
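
The change above switches compute_properties from taking an owned Vec<Arc<AggregateFunctionExpr>> to borrowing &[Arc<AggregateFunctionExpr>], so the call site passes aggr_expr.as_slice() instead of aggr_expr.clone(). Below is a minimal standalone sketch of the same borrow-a-slice pattern; the struct and function bodies are illustrative stand-ins, not DataFusion's actual types or logic.

use std::sync::Arc;

// Illustrative stand-in only; not the DataFusion type.
struct AggregateFunctionExpr {
    name: String,
}

// Borrowing a slice lets the caller keep ownership of its Vec and avoids
// cloning the whole Vec (and bumping every Arc refcount) just to read it.
fn compute_properties(aggr_exprs: &[Arc<AggregateFunctionExpr>]) -> usize {
    aggr_exprs.iter().filter(|e| e.name.starts_with("sum")).count()
}

fn main() {
    let aggr_expr = vec![
        Arc::new(AggregateFunctionExpr { name: "sum(c9)".to_string() }),
        Arc::new(AggregateFunctionExpr { name: "min(c5)".to_string() }),
    ];
    // Previously: compute_properties(aggr_expr.clone()) allocated a new Vec.
    let n = compute_properties(aggr_expr.as_slice());
    println!("{n} sum exprs; original Vec still owned: {}", aggr_expr.len());
}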

datafusion/sqllogictest/test_files/aggregate.slt

Lines changed: 94 additions & 0 deletions
@@ -6203,3 +6203,97 @@ physical_plan
 14)--------------PlaceholderRowExec
 15)------------ProjectionExec: expr=[1 as id, 2 as foo]
 16)--------------PlaceholderRowExec
+
+
+# Set-Monotonic Aggregate functions can output results in order
+statement ok
+CREATE EXTERNAL TABLE aggregate_test_100_ordered (
+  c1 VARCHAR NOT NULL,
+  c2 TINYINT NOT NULL,
+  c3 SMALLINT NOT NULL,
+  c4 SMALLINT,
+  c5 INT,
+  c6 BIGINT NOT NULL,
+  c7 SMALLINT NOT NULL,
+  c8 INT NOT NULL,
+  c9 INT UNSIGNED NOT NULL,
+  c10 BIGINT UNSIGNED NOT NULL,
+  c11 FLOAT NOT NULL,
+  c12 DOUBLE NOT NULL,
+  c13 VARCHAR NOT NULL
+)
+STORED AS CSV
+LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+WITH ORDER (c1)
+OPTIONS ('format.has_header' 'true');
+
+statement ok
+set datafusion.optimizer.prefer_existing_sort = true;
+
+query TT
+EXPLAIN SELECT c1, SUM(c9) as sum_c9 FROM aggregate_test_100_ordered GROUP BY c1 ORDER BY c1, sum_c9;
+----
+logical_plan
+01)Sort: aggregate_test_100_ordered.c1 ASC NULLS LAST, sum_c9 ASC NULLS LAST
+02)--Projection: aggregate_test_100_ordered.c1, sum(aggregate_test_100_ordered.c9) AS sum_c9
+03)----Aggregate: groupBy=[[aggregate_test_100_ordered.c1]], aggr=[[sum(CAST(aggregate_test_100_ordered.c9 AS UInt64))]]
+04)------TableScan: aggregate_test_100_ordered projection=[c1, c9]
+physical_plan
+01)SortPreservingMergeExec: [c1@0 ASC NULLS LAST, sum_c9@1 ASC NULLS LAST]
+02)--ProjectionExec: expr=[c1@0 as c1, sum(aggregate_test_100_ordered.c9)@1 as sum_c9]
+03)----AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[sum(aggregate_test_100_ordered.c9)], ordering_mode=Sorted
+04)------CoalesceBatchesExec: target_batch_size=8192
+05)--------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4, preserve_order=true, sort_exprs=c1@0 ASC NULLS LAST, sum(aggregate_test_100_ordered.c9)@1 ASC NULLS LAST
+06)----------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[sum(aggregate_test_100_ordered.c9)], ordering_mode=Sorted
+07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], output_ordering=[c1@0 ASC NULLS LAST], has_header=true
+
+query TT
+EXPLAIN SELECT SUM(c9) as sum_c9 FROM aggregate_test_100_ordered ORDER BY sum_c9;
+----
+logical_plan
+01)Sort: sum_c9 ASC NULLS LAST
+02)--Projection: sum(aggregate_test_100_ordered.c9) AS sum_c9
+03)----Aggregate: groupBy=[[]], aggr=[[sum(CAST(aggregate_test_100_ordered.c9 AS UInt64))]]
+04)------TableScan: aggregate_test_100_ordered projection=[c9]
+physical_plan
+01)ProjectionExec: expr=[sum(aggregate_test_100_ordered.c9)@0 as sum_c9]
+02)--AggregateExec: mode=Final, gby=[], aggr=[sum(aggregate_test_100_ordered.c9)]
+03)----CoalescePartitionsExec
+04)------AggregateExec: mode=Partial, gby=[], aggr=[sum(aggregate_test_100_ordered.c9)]
+05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
+
+query TT
+EXPLAIN SELECT c1, MIN(c5) as min_c5 FROM aggregate_test_100_ordered GROUP BY c1 ORDER BY c1, min_c5 DESC NULLS LAST;
+----
+logical_plan
+01)Sort: aggregate_test_100_ordered.c1 ASC NULLS LAST, min_c5 DESC NULLS LAST
+02)--Projection: aggregate_test_100_ordered.c1, min(aggregate_test_100_ordered.c5) AS min_c5
+03)----Aggregate: groupBy=[[aggregate_test_100_ordered.c1]], aggr=[[min(aggregate_test_100_ordered.c5)]]
+04)------TableScan: aggregate_test_100_ordered projection=[c1, c5]
+physical_plan
+01)SortPreservingMergeExec: [c1@0 ASC NULLS LAST, min_c5@1 DESC NULLS LAST]
+02)--ProjectionExec: expr=[c1@0 as c1, min(aggregate_test_100_ordered.c5)@1 as min_c5]
+03)----AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[min(aggregate_test_100_ordered.c5)], ordering_mode=Sorted
+04)------CoalesceBatchesExec: target_batch_size=8192
+05)--------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4, preserve_order=true, sort_exprs=c1@0 ASC NULLS LAST, min(aggregate_test_100_ordered.c5)@1 DESC NULLS LAST
+06)----------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[min(aggregate_test_100_ordered.c5)], ordering_mode=Sorted
+07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c5], output_ordering=[c1@0 ASC NULLS LAST], has_header=true
+
+query TT
+EXPLAIN SELECT MAX(c5) as max_c5 FROM aggregate_test_100_ordered ORDER BY max_c5;
+----
+logical_plan
+01)Sort: max_c5 ASC NULLS LAST
+02)--Projection: max(aggregate_test_100_ordered.c5) AS max_c5
+03)----Aggregate: groupBy=[[]], aggr=[[max(aggregate_test_100_ordered.c5)]]
+04)------TableScan: aggregate_test_100_ordered projection=[c5]
+physical_plan
+01)ProjectionExec: expr=[max(aggregate_test_100_ordered.c5)@0 as max_c5]
+02)--AggregateExec: mode=Final, gby=[], aggr=[max(aggregate_test_100_ordered.c5)]
+03)----CoalescePartitionsExec
+04)------AggregateExec: mode=Partial, gby=[], aggr=[max(aggregate_test_100_ordered.c5)]
+05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5], has_header=true
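
These tests exercise what the comment calls set-monotonic aggregates: as rows are added to the input set, MAX and SUM over unsigned values can only grow, and MIN can only shrink. That is the intuition behind why the plans above can satisfy ORDER BY on sum_c9, min_c5, and max_c5 without inserting an extra SortExec. A small standalone Rust sketch of the property, using an ad-hoc sample column rather than the aggregate_test_100 data:

// Standalone illustration of set monotonicity (not DataFusion code):
// growing the input set never decreases MAX or an unsigned SUM, and
// never increases MIN.
fn main() {
    let c9: [u64; 6] = [42, 7, 19, 7, 100, 3];

    let mut prev_max = u64::MIN;
    let mut prev_min = u64::MAX;
    let mut prev_sum = 0u64;

    for end in 1..=c9.len() {
        let prefix = &c9[..end];
        let max = *prefix.iter().max().unwrap();
        let min = *prefix.iter().min().unwrap();
        let sum: u64 = prefix.iter().sum();

        // Set-monotonically increasing aggregates.
        assert!(max >= prev_max);
        assert!(sum >= prev_sum);
        // Set-monotonically decreasing aggregate.
        assert!(min <= prev_min);

        prev_max = max;
        prev_min = min;
        prev_sum = sum;
    }
    println!("MAX/SUM never decreased and MIN never increased as the set grew");
}

Presumably this directionality is also why the MIN test asks for min_c5 DESC NULLS LAST while the SUM and MAX tests order ascending.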

datafusion/sqllogictest/test_files/window.slt

Lines changed: 86 additions & 0 deletions
@@ -5452,3 +5452,89 @@ order by c1, c2, rank1, rank2;
 
 statement ok
 drop table t1;
+
+
+# Set-Monotonic Window Aggregate functions can output results in order
+statement ok
+CREATE EXTERNAL TABLE aggregate_test_100_ordered (
+  c1 VARCHAR NOT NULL,
+  c2 TINYINT NOT NULL,
+  c3 SMALLINT NOT NULL,
+  c4 SMALLINT,
+  c5 INT,
+  c6 BIGINT NOT NULL,
+  c7 SMALLINT NOT NULL,
+  c8 INT NOT NULL,
+  c9 INT UNSIGNED NOT NULL,
+  c10 BIGINT UNSIGNED NOT NULL,
+  c11 FLOAT NOT NULL,
+  c12 DOUBLE NOT NULL,
+  c13 VARCHAR NOT NULL
+)
+STORED AS CSV
+LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+WITH ORDER (c1)
+OPTIONS ('format.has_header' 'true');
+
+statement ok
+set datafusion.optimizer.prefer_existing_sort = true;
+
+query TT
+EXPLAIN SELECT c1, SUM(c9) OVER(PARTITION BY c1) as sum_c9 FROM aggregate_test_100_ordered ORDER BY c1, sum_c9;
+----
+logical_plan
+01)Sort: aggregate_test_100_ordered.c1 ASC NULLS LAST, sum_c9 ASC NULLS LAST
+02)--Projection: aggregate_test_100_ordered.c1, sum(aggregate_test_100_ordered.c9) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS sum_c9
+03)----WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100_ordered.c9 AS UInt64)) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+04)------TableScan: aggregate_test_100_ordered projection=[c1, c9]
+physical_plan
+01)SortPreservingMergeExec: [c1@0 ASC NULLS LAST, sum_c9@1 ASC NULLS LAST]
+02)--ProjectionExec: expr=[c1@0 as c1, sum(aggregate_test_100_ordered.c9) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@2 as sum_c9]
+03)----WindowAggExec: wdw=[sum(aggregate_test_100_ordered.c9) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(aggregate_test_100_ordered.c9) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }]
+04)------CoalesceBatchesExec: target_batch_size=1
+05)--------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=2, preserve_order=true, sort_exprs=c1@0 ASC NULLS LAST
+06)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], output_ordering=[c1@0 ASC NULLS LAST], has_header=true
+
+query TT
+EXPLAIN SELECT SUM(c9) OVER() as sum_c9 FROM aggregate_test_100_ordered ORDER BY sum_c9;
+----
+logical_plan
+01)Sort: sum_c9 ASC NULLS LAST
+02)--Projection: sum(aggregate_test_100_ordered.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS sum_c9
+03)----WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100_ordered.c9 AS UInt64)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+04)------TableScan: aggregate_test_100_ordered projection=[c9]
+physical_plan
+01)ProjectionExec: expr=[sum(aggregate_test_100_ordered.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as sum_c9]
+02)--WindowAggExec: wdw=[sum(aggregate_test_100_ordered.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(aggregate_test_100_ordered.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }]
+03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
+
+query TT
+EXPLAIN SELECT c1, MIN(c5) OVER(PARTITION BY c1) as min_c5 FROM aggregate_test_100_ordered ORDER BY c1, min_c5 DESC NULLS LAST;
+----
+logical_plan
+01)Sort: aggregate_test_100_ordered.c1 ASC NULLS LAST, min_c5 DESC NULLS LAST
+02)--Projection: aggregate_test_100_ordered.c1, min(aggregate_test_100_ordered.c5) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS min_c5
+03)----WindowAggr: windowExpr=[[min(aggregate_test_100_ordered.c5) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+04)------TableScan: aggregate_test_100_ordered projection=[c1, c5]
+physical_plan
+01)SortPreservingMergeExec: [c1@0 ASC NULLS LAST, min_c5@1 DESC NULLS LAST]
+02)--ProjectionExec: expr=[c1@0 as c1, min(aggregate_test_100_ordered.c5) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@2 as min_c5]
+03)----WindowAggExec: wdw=[min(aggregate_test_100_ordered.c5) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "min(aggregate_test_100_ordered.c5) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }]
+04)------CoalesceBatchesExec: target_batch_size=1
+05)--------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=2, preserve_order=true, sort_exprs=c1@0 ASC NULLS LAST
+06)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c5], output_ordering=[c1@0 ASC NULLS LAST], has_header=true
+
+query TT
+EXPLAIN SELECT MAX(c5) OVER() as max_c5 FROM aggregate_test_100_ordered ORDER BY max_c5;
+----
+logical_plan
+01)Sort: max_c5 ASC NULLS LAST
+02)--Projection: max(aggregate_test_100_ordered.c5) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS max_c5
+03)----WindowAggr: windowExpr=[[max(aggregate_test_100_ordered.c5) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+04)------TableScan: aggregate_test_100_ordered projection=[c5]
+physical_plan
+01)ProjectionExec: expr=[max(aggregate_test_100_ordered.c5) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as max_c5]
+02)--WindowAggExec: wdw=[max(aggregate_test_100_ordered.c5) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "max(aggregate_test_100_ordered.c5) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }]
+03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5], has_header=true
