diff --git a/pytensor/tensor/elemwise.py b/pytensor/tensor/elemwise.py
index a6a2f2ce4b..15fe218374 100644
--- a/pytensor/tensor/elemwise.py
+++ b/pytensor/tensor/elemwise.py
@@ -1422,7 +1422,7 @@ def infer_shape(self, fgraph, node, shapes):
     def _c_all(self, node, name, input_names, output_names, sub):
         [inp] = node.inputs
         [out] = node.outputs
-        ndim = inp.type.ndim
+        inp_ndim = inp.type.ndim
 
         [inp_name] = input_names
         [out_name] = output_names
@@ -1454,10 +1454,10 @@ def _c_all(self, node, name, input_names, output_names, sub):
             assert var.dtype == node.outputs[0].dtype
             return var.owner.op._c_all(var.owner, name, input_names, output_names, sub)
 
-        inp_dims = list(range(ndim))
+        inp_dims = list(range(inp_ndim))
         non_reduced_dims = [i for i in inp_dims if i not in axis]
-        counter = iter(range(ndim))
-        acc_dims = ["x" if i in axis else next(counter) for i in range(ndim)]
+        counter = iter(range(inp_ndim))
+        acc_dims = ["x" if i in axis else next(counter) for i in range(inp_ndim)]
 
         sub = sub.copy()
         sub["lv0"] = inp_name
@@ -1484,7 +1484,9 @@ def _c_all(self, node, name, input_names, output_names, sub):
                 cgen.make_declare(
                     [acc_dims], [out_dtype], out_sub, compute_stride_jump=False
                 )
-                + cgen.make_alloc([non_reduced_dims], out_dtype, sub)
+                + cgen.make_careduce_alloc(
+                    inp_name, out_name, inp_ndim, axis, out_dtype, sub["fail"]
+                )
                 + cgen.make_checks(
                     [acc_dims], [out_dtype], out_sub, compute_stride_jump=False
                 )
@@ -1500,7 +1502,9 @@ def _c_all(self, node, name, input_names, output_names, sub):
                 cgen.make_declare(
                     [acc_dims], [acc_dtype], acc_sub, compute_stride_jump=False
                 )
-                + cgen.make_alloc([non_reduced_dims], acc_dtype, sub)
+                + cgen.make_careduce_alloc(
+                    inp_name, acc_name, inp_ndim, axis, out_dtype, sub["fail"]
+                )
                 + cgen.make_checks(
                     [acc_dims], [acc_dtype], acc_sub, compute_stride_jump=False
                 )
@@ -1524,8 +1528,6 @@ def _c_all(self, node, name, input_names, output_names, sub):
         elif identity is None:
             raise TypeError(f"The {self.scalar_op} does not define an identity.")
 
-        initial_value = f"{acc_name}_i = {identity};"
-
         inner_task = self.scalar_op.c_code(
             Apply(
                 self.scalar_op,
@@ -1544,28 +1546,16 @@ def _c_all(self, node, name, input_names, output_names, sub):
             sub,
         )
 
-        if out.type.ndim == 0:
-            # Simple case where everything is reduced, no need for loop ordering
-            loop = cgen.make_complete_loop_careduce(
-                inp_var=inp_name,
-                acc_var=acc_name,
-                inp_dtype=inp_dtype,
-                acc_dtype=acc_dtype,
-                initial_value=initial_value,
-                inner_task=inner_task,
-                fail_code=sub["fail"],
-            )
-        else:
-            loop = cgen.make_reordered_loop_careduce(
-                inp_var=inp_name,
-                acc_var=acc_name,
-                inp_dtype=inp_dtype,
-                acc_dtype=acc_dtype,
-                inp_ndim=ndim,
-                reduction_axes=axis,
-                initial_value=initial_value,
-                inner_task=inner_task,
-            )
+        loop = cgen.make_reordered_loop_careduce(
+            inp_var=inp_name,
+            acc_var=acc_name,
+            inp_dtype=inp_dtype,
+            acc_dtype=acc_dtype,
+            inp_ndim=inp_ndim,
+            reduction_axes=axis,
+            initial_value=identity,
+            inner_task=inner_task,
+        )
 
         if acc_dtype != out_dtype:
             cast = dedent(
@@ -1589,7 +1579,7 @@ def c_headers(self, **kwargs):
 
     def c_code_cache_version_apply(self, node):
         # the version corresponding to the c code in this Op
-        version = [10]
+        version = [11]
 
         # now we insert versions for the ops on which we depend...
         scalar_node = Apply(
diff --git a/pytensor/tensor/elemwise_cgen.py b/pytensor/tensor/elemwise_cgen.py
index 5d50f02ad5..5b79e87dd3 100644
--- a/pytensor/tensor/elemwise_cgen.py
+++ b/pytensor/tensor/elemwise_cgen.py
@@ -1,6 +1,8 @@
 from collections.abc import Sequence
 from textwrap import dedent, indent
 
+import numpy as np
+
 from pytensor.configdefaults import config
 
 
@@ -191,7 +193,7 @@ def make_alloc(loop_orders, dtype, sub, fortran="0"):
             PyArray_Dims new_dims;
             new_dims.len = {nd};
             new_dims.ptr = dims;
-            PyObject* success = PyArray_Resize({olv}, &new_dims, 0, NPY_CORDER);
+            PyObject* success = PyArray_Resize({olv}, &new_dims, 0, NPY_KEEPORDER);
             if (!success) {{
                 // If we can't resize the ndarray we have we can allocate a new one.
                 PyErr_Clear();
@@ -450,129 +452,117 @@ def get_loop_strides(loop_order, i):
     return f"{{\n{code}\n}}\n"
 
 
-##################
-#   DimShuffle   #
-##################
+def make_careduce_alloc(
+    inp_name: str,
+    out_name: str,
+    inp_ndim: int,
+    reduced_axes: Sequence[int],
+    dtype: str,
+    fail: str,
+) -> str:
+    """Generate C code to allocate outputs for a CAReduce operation.
 
-
-#################
-#   Broadcast   #
-#################
+    We create an output that is aligned with the strides of the input variable
+    to optimize memory access.
 
-
-################
-#   CAReduce   #
-################
+    """
+    elemsize = np.dtype(dtype.lower().removeprefix("npy_")).itemsize
+    type = dtype.upper()
+    if type.startswith("PYTENSOR_COMPLEX"):
+        type = type.replace("PYTENSOR_COMPLEX", "NPY_COMPLEX")
+    out_ndim = inp_ndim - len(reduced_axes)
+    out_dims_values = ",".join(
+        f"PyArray_DIMS({inp_name})[{i}]"
+        for i in range(inp_ndim)
+        if i not in reduced_axes
+    )
+    declare_out_dims = f"npy_intp out_dims[{out_ndim}] = {{ {out_dims_values} }};"
 
-def make_complete_loop_careduce(
-    inp_var: str,
-    acc_var: str,
-    inp_dtype: str,
-    acc_dtype: str,
-    initial_value: str,
-    inner_task: str,
-    fail_code,
-) -> str:
-    """Generate C code for a complete reduction loop.
+    declare_unsorted_out_strides = f"""
+npy_intp out_strides[{out_ndim}];
+npy_intp total_size = {elemsize};
+"""
 
-    The generated code for a float64 input variable `inp` and accumulation variable `acc` looks like:
+    for i in range(out_ndim):
+        declare_unsorted_out_strides += f"""
+out_strides[{i}] = total_size;
+total_size *= out_dims[{i}];
+"""
 
-    .. code-block:: C
-
-        {
-            NpyIter* iter;
-            NpyIter_IterNextFunc *iternext;
-            char** data_ptr;
-            npy_intp* stride_ptr,* innersize_ptr;
+    # Fill the loop vector with the appropriate pairs
+    populate_order_strides = ""
+    i = 0
+    for j in range(inp_ndim):
+        if j in reduced_axes:
+            continue
+        populate_order_strides += (
+            f"strides_it->first = abs(PyArray_STRIDES({inp_name})[{j}]);\n"
+        )
+        populate_order_strides += f"strides_it->second = out_strides[{i}];\n"
+        populate_order_strides += "++strides_it;\n"
+        i += 1
 
-            // Special case for empty inputs
-            if (PyArray_SIZE(inp) == 0) {
-                npy_float64 acc_i = *(npy_float64*)(PyArray_DATA(acc));
-                acc_i = 0;
-            }else{
-                iter = NpyIter_New(inp,
-                                   NPY_ITER_READONLY| NPY_ITER_EXTERNAL_LOOP| NPY_ITER_REFS_OK,
-                                   NPY_KEEPORDER,
-                                   NPY_NO_CASTING,
-                                   NULL);
-                iternext = NpyIter_GetIterNext(iter, NULL);
-                if (iternext == NULL) {
-                    NpyIter_Deallocate(iter);
-                    { fail }
-                }
-                data_ptr = NpyIter_GetDataPtrArray(iter);
-                stride_ptr = NpyIter_GetInnerStrideArray(iter);
-                innersize_ptr = NpyIter_GetInnerLoopSizePtr(iter);
-
-                npy_float64 acc_i;
-                acc_i = 0;
-                do {
-                    char* data = *data_ptr;
-                    npy_intp stride = *stride_ptr;
-                    npy_intp count = *innersize_ptr;
-
-                    while(count--) {
-                        npy_float64 inp_i = *((npy_float64*)data);
-                        acc_i = acc_i + inp_i;
-                        data += stride;
-                    }
+    sort_strides = dedent(
+        f"""
+        std::vector< std::pair<npy_intp, npy_intp> > strides({out_ndim});
+        std::vector< std::pair<npy_intp, npy_intp> >::iterator strides_it = strides.begin();
+        {populate_order_strides}
+        std::sort(strides.begin(), strides.end());
+        """
+    )
 
-                } while(iternext(iter));
-                NpyIter_Deallocate(iter);
+    reassign_sorted_strides = ""
+    for i in range(out_ndim):
+        reassign_sorted_strides += f"out_strides[{i}] = strides[{i}].second;\n"
 
-                *(npy_float64*)(PyArray_DATA(acc)) = acc_i;
-            }
-        }
-    """
-    return dedent(
+    code = dedent(
         f"""
         {{
-        NpyIter* iter;
-        NpyIter_IterNextFunc *iternext;
-        char** data_ptr;
-        npy_intp* stride_ptr,* innersize_ptr;
+        {declare_out_dims}
+
-        // Special case for empty inputs
-        if (PyArray_SIZE({inp_var}) == 0) {{
-            {acc_dtype} &{acc_var}_i = *({acc_dtype}*)(PyArray_DATA({acc_var}));
-            {initial_value}
-        }}else{{
-            iter = NpyIter_New({inp_var},
-                               NPY_ITER_READONLY| NPY_ITER_EXTERNAL_LOOP| NPY_ITER_REFS_OK,
-                               NPY_KEEPORDER,
-                               NPY_NO_CASTING,
-                               NULL);
-
-            iternext = NpyIter_GetIterNext(iter, NULL);
-            if (iternext == NULL) {{
-                NpyIter_Deallocate(iter);
-                {fail_code}
+        if ({out_name}) {{
+            PyArray_Dims new_dims;
+            new_dims.len = {out_ndim};
+            new_dims.ptr = out_dims;
+            PyObject* success = PyArray_Resize({out_name}, &new_dims, 0, NPY_KEEPORDER);
+            if (!success) {{
+                // If we can't resize the ndarray we have we can allocate a new one.
+                PyErr_Clear();
+                Py_XDECREF({out_name});
+                {out_name} = NULL;
+            }} else {{
+                Py_DECREF(success);
            }}
+        }}
-            data_ptr = NpyIter_GetDataPtrArray(iter);
-            stride_ptr = NpyIter_GetInnerStrideArray(iter);
-            innersize_ptr = NpyIter_GetInnerLoopSizePtr(iter);
-
-            {acc_dtype} {acc_var}_i;
-            {initial_value}
-
-            do {{
-                char* data = *data_ptr;
-                npy_intp stride = *stride_ptr;
-                npy_intp count = *innersize_ptr;
-
-                while(count--) {{
-                    {inp_dtype} {inp_var}_i = *(({inp_dtype}*)data);
-                    {inner_task}
-                    data += stride;
-                }}
-            }} while(iternext(iter));
-
-            NpyIter_Deallocate(iter);
-            *({acc_dtype}*)(PyArray_DATA({acc_var})) = {acc_var}_i;
+        if (!{out_name}) {{
+            {declare_unsorted_out_strides}
+            {sort_strides}
+            {reassign_sorted_strides}
+
+            {out_name} = (PyArrayObject*)PyArray_NewFromDescr(
+                &PyArray_Type,
+                PyArray_DescrFromType({type}),
+                {out_ndim},
+                out_dims,
+                out_strides,
+                NULL,
+                NPY_ARRAY_WRITEABLE,
+                NULL
+            );
+            if ({out_name}) {{
+                PyArray_UpdateFlags({out_name}, NPY_ARRAY_UPDATE_ALL);
+            }}
+        }}
+        if (!{out_name}) {{
+            {fail}
        }}
        }}
        """
    )
+    return code
 
 
 def make_reordered_loop_careduce(
@@ -649,18 +639,15 @@
     """
 
+    empty_condition = f"PyArray_SIZE({inp_var}) == 0"
     empty_case = dedent(
         f"""
-        // Special case for empty inputs
-        if (PyArray_SIZE({inp_var}) == 0) {{
-            {acc_var}_iter = ({acc_dtype}*)(PyArray_DATA({acc_var}));
-            int n = PyArray_SIZE({acc_var});
-            for(int i = 0; i < n; i++)
-            {{
-                {acc_dtype} &{acc_var}_i = {acc_var}_iter[i];
-                {initial_value}
-            }}
-        }} else {{
+        {acc_var}_iter = ({acc_dtype}*)(PyArray_DATA({acc_var}));
+        int n = PyArray_SIZE({acc_var});
+        for(int i = 0; i < n; i++)
+        {{
+            {acc_var}_iter[i] = {initial_value};
+        }}
         """
     )
 
@@ -691,22 +678,21 @@
     counter = iter(range(inp_ndim))
     unsorted_vars = dedent(
         f"""
-        int dim_lengths[{inp_ndim}] = {{{','.join(f'{inp_var}_n{i}' for i in range(inp_ndim))}}};
-        int inp_strides[{inp_ndim}] = {{{','.join(f'{inp_var}_stride{i}' for i in range(inp_ndim))}}};
-        int acc_strides[{inp_ndim}] = {{{','.join("0" if i in reduction_axes else f'{acc_var}_stride{next(counter)}'for i in range(inp_ndim))}}};
+        npy_intp dim_lengths[{inp_ndim}] = {{{','.join(f'{inp_var}_n{i}' for i in range(inp_ndim))}}};
+        npy_intp inp_strides[{inp_ndim}] = {{{','.join(f'{inp_var}_stride{i}' for i in range(inp_ndim))}}};
+        npy_intp acc_strides[{inp_ndim}] = {{{','.join("0" if i in reduction_axes else f'{acc_var}_stride{next(counter)}'for i in range(inp_ndim))}}};
         bool reduction_axes[{inp_ndim}] = {{{', '.join("1" if i in reduction_axes else "0" for i in range(inp_ndim))}}};\n
         """
     )
 
-    sorted_vars = "loops_it = loops.begin();"
+    sorted_vars = ""
     for i in range(inp_ndim):
         sorted_vars += dedent(
             f"""
-            int dim_length_{i} = dim_lengths[loops_it->second];
-            int is_reduction_axis_{i} = reduction_axes[loops_it->second];
-            int {inp_var}_stride_{i} = inp_strides[loops_it->second];
-            int {acc_var}_stride_{i} = acc_strides[loops_it->second];
-            ++loops_it;
+            npy_intp dim_length_{i} = dim_lengths[loops[{i}].second];
+            bool is_reduction_axis_{i} = reduction_axes[loops[{i}].second];
+            npy_intp {inp_var}_stride_{i} = inp_strides[loops[{i}].second];
+            npy_intp {acc_var}_stride_{i} = acc_strides[loops[{i}].second];
             """
         )
@@ -717,45 +703,124 @@
         """
     )
 
-    pointer_update = ""
-    for var, dtype in ((inp_var, inp_dtype), (acc_var, acc_dtype)):
-        pointer_update += f"{dtype} &{var}_i = *({var}_iter"
-        for i in reversed(tuple(range(inp_ndim))):
-            iter_var = f"iter_{i}"
-            pointer_update += f" + {var}_stride_{i}*{iter_var}"
-        pointer_update += ");\n"
+    non_empty_body = "\n".join((order_loops, unsorted_vars, sorted_vars, declare_iter))
 
-    # Set initial value in first iteration of each output
-    # This happens on the first iteration of every reduction axis
-    initial_iteration = " && ".join(
-        f"(!is_reduction_axis_{i} || iter_{i} == 0)" for i in range(inp_ndim)
-    )
-    set_initial_value = dedent(
-        f"""
-        if({initial_iteration})
-        {{
-            {initial_value}
-        }}
-        """
+    reduction_along_contiguous_dim = (
+        f"({acc_var}_stride_{inp_ndim-1} == 0) && ({inp_var}_stride_{inp_ndim-1} == 1)"
     )
 
-    # We set do pointer_update, initial_value and inner task in inner loop
-    loop = "\n\n".join((pointer_update, set_initial_value, f"{{{inner_task}}}"))
+    # Inner loop where accumulation is done on temporary values
+    # This is used for the case where a contiguous dimension is being reduced
+    temporary_accumulation_loop = None
+    for i in reversed(range(inp_ndim)):
+        iter_var = f"iter_{i}"
+        dim_length = f"dim_length_{i}"
+        if i == inp_ndim - 1:
+            # Innermost loop
+            if inp_ndim == 1:
+                # There is only one axis
+                is_initial_iteration = "1"
+            else:
+                is_initial_iteration = " && ".join(
+                    f"(!is_reduction_axis_{j} || iter_{j} == 0)"
+                    for j in range(inp_ndim - 1)
+                )
+            # Code where we write to an intermediate variable in the innermost loop
+            inp_pointer_update = f"{inp_dtype} &{inp_var}_i = *({inp_var}_iter"
+            for j in reversed(tuple(range(inp_ndim))):
+                inp_pointer_update += f" + {inp_var}_stride_{j}*iter_{j}"
+            inp_pointer_update += ");"
+
+            acc_pointer = f"*({acc_var}_iter"
+            for j in reversed(tuple(range(inp_ndim - 1))):
+                acc_pointer += f" + {acc_var}_stride_{j}*iter_{j}"
+            acc_pointer += ")"
+
+            temporary_accumulation_loop = dedent(
+                f"""
+                if ({is_initial_iteration}){{
+                    {acc_pointer} = {initial_value};
+                }}
+                {acc_dtype} {acc_var}_i = {initial_value};
+                for (int {iter_var} = 0; {iter_var}<{dim_length}; {iter_var}++){{
+                    {inp_pointer_update}
+                    {{
+                        {inner_task}
+                    }}
+                }}
+                {acc_pointer} += {acc_var}_i;
+                """
+            )
+        else:
+            temporary_accumulation_loop = dedent(
+                f"""
+                for(int {iter_var} = 0; {iter_var}<{dim_length}; {iter_var}++){{
+                    {temporary_accumulation_loop}
+                }}
+                """
+            )
 
-    # Create outer loops recursively
+    # Code where the innermost loop writes directly to the output buffer
+    # Chosen either because the fastest changing dimension isn't being reduced
+    # Or if it is, it is not contiguous. # TODO: Check this last check is actually helping with SIMD?
+    direct_accumulation_loop = None
     for i in reversed(range(inp_ndim)):
         iter_var = f"iter_{i}"
         dim_length = f"dim_length_{i}"
-        loop = dedent(
-            f"""
-            for(int {iter_var} = 0; {iter_var}<{dim_length}; {iter_var}++){{
-                {loop}
-            }}
-            """
-        )
+        if i == inp_ndim - 1:
+            # Innermost loop
+
+            # Define expressions that update the pointers based on loop iteration
+            pointer_update = ""
+            for var, dtype in ((inp_var, inp_dtype), (acc_var, acc_dtype)):
+                pointer_update += f"{dtype} &{var}_i = *({var}_iter"
+                for j in reversed(tuple(range(inp_ndim))):
+                    pointer_update += f" + {var}_stride_{j}*iter_{j}"
+                pointer_update += ");\n"
+
+            # Set initial value in first iteration of each output
+            # This happens on the first iteration of every reduction axis
+            is_initial_iteration = " && ".join(
+                f"(!is_reduction_axis_{j} || iter_{j} == 0)" for j in range(inp_ndim)
+            )
 
-    non_empty_case = "\n".join(
-        (order_loops, unsorted_vars, sorted_vars, declare_iter, loop)
-    )
-    code = "\n".join((empty_case, non_empty_case, "}"))
-    return f"{{\n{code}\n}}\n"
+            direct_accumulation_loop = dedent(
+                f"""
+                for(int {iter_var} = 0; {iter_var}<{dim_length}; {iter_var}++){{
+                    {pointer_update}
+                    if({is_initial_iteration}){{
+                        {acc_var}_i = {initial_value};
+                    }}
+                    {{
+                        {inner_task}
+                    }}
+                }}
+                """
+            )
+        else:
+            direct_accumulation_loop = dedent(
+                f"""
+                for(int {iter_var} = 0; {iter_var}<{dim_length}; {iter_var}++){{
+                    {direct_accumulation_loop}
+                }}
+                """
+            )
+
+    code = f"""
+    if ({empty_condition}) {{
+        // Empty case
+        {empty_case}
+    }} else {{
+        {non_empty_body}
+        if ({reduction_along_contiguous_dim}) {{
+            // Temporary accumulation loop
+            // SIMD friendly
+            {temporary_accumulation_loop}
+        }} else {{
+            // Direct accumulation loop
+            {direct_accumulation_loop}
+        }}
+    }}
+    """
+
+    return code
diff --git a/tests/tensor/test_elemwise.py b/tests/tensor/test_elemwise.py
index e89a70d0f1..e49ab39938 100644
--- a/tests/tensor/test_elemwise.py
+++ b/tests/tensor/test_elemwise.py
@@ -1045,7 +1045,7 @@ def careduce_benchmark_tester(axis, c_contiguous, mode, benchmark):
 
     x = pytensor.shared(x_test, name="x", shape=x_test.shape)
     out = x.transpose(transpose_axis).sum(axis=axis)
-    fn = pytensor.function([], out, mode=mode)
+    fn = pytensor.function([], out, mode=mode, trust_input=True)
 
     np.testing.assert_allclose(
         fn(),
@@ -1064,6 +1064,9 @@
     (True, False),
     ids=lambda x: f"c_contiguous={x}",
 )
+@pytensor.config.change_flags(
+    gcc__cxxflags="-freciprocal-math -ffp-contract=fast -funsafe-math-optimizations -fassociative-math -fno-signed-zeros -ftree-loop-distribution -funroll-loops -ftracer"
+)
 def test_c_careduce_benchmark(axis, c_contiguous, benchmark):
     return careduce_benchmark_tester(
         axis, c_contiguous, mode="FAST_RUN", benchmark=benchmark
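
Note (not part of the patch): the new `make_careduce_alloc` helper emits C that allocates the reduction output with a memory layout that follows the input's stride order, instead of always allocating C-order. The sketch below shows the same idea at the NumPy level, under the assumption that this is the intended behaviour; the function name `alloc_output_like` is hypothetical and the real helper builds the array directly with `PyArray_NewFromDescr`.

# Illustrative sketch only -- not the code generated by the diff.
import numpy as np


def alloc_output_like(inp: np.ndarray, reduced_axes: tuple[int, ...]) -> np.ndarray:
    """Allocate a reduction output whose axes keep the input's stride ordering."""
    kept = [ax for ax in range(inp.ndim) if ax not in reduced_axes]
    out_shape = [inp.shape[ax] for ax in kept]
    # Order the kept axes from slowest to fastest according to the input strides.
    slow_to_fast = sorted(
        range(len(kept)), key=lambda i: abs(inp.strides[kept[i]]), reverse=True
    )
    # Allocate a C-contiguous buffer in that axis order, then permute the view
    # back so indexing matches the kept input axes while memory stays input-like.
    buf = np.empty([out_shape[i] for i in slow_to_fast], dtype=inp.dtype)
    return buf.transpose(np.argsort(slow_to_fast))

For example, a C-contiguous (2, 3, 4) input reduced over axis 0 yields a (3, 4) output whose last axis is fastest, so the reduction can walk both arrays in the input's preferred order.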
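Note (not part of the patch): the rewritten `make_reordered_loop_careduce` emits one of two inner-loop shapes in C, selected at runtime by the `reduction_along_contiguous_dim` check. The Python sketch below mirrors those two control flows for a 2-D float64 sum; the names `sum_last_axis_temporary` and `sum_first_axis_direct` are hypothetical, and the generated C of course works on raw pointers with the reordered strides rather than on NumPy indexing.

# Illustrative sketch only -- not the code generated by the diff.
import numpy as np


def sum_last_axis_temporary(inp: np.ndarray, out: np.ndarray) -> None:
    # Used when the fastest (innermost) dimension is reduced and contiguous:
    # accumulate into a local scalar so the compiler can vectorize the inner
    # loop, then add the partial result to the output cell once.
    for i in range(inp.shape[0]):
        out[i] = 0.0  # identity of the scalar op, set on the first visit
        acc = 0.0
        for j in range(inp.shape[1]):
            acc = acc + inp[i, j]
        out[i] += acc


def sum_first_axis_direct(inp: np.ndarray, out: np.ndarray) -> None:
    # Used otherwise (here reducing the first axis): update the output element
    # in place, initializing it to the identity on the first visit.
    for i in range(inp.shape[0]):
        for j in range(inp.shape[1]):
            if i == 0:
                out[j] = 0.0
            out[j] = out[j] + inp[i, j]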