Error when using .apply_ufunc with .groupby_bins #1765

Closed

jbusecke opened this issue Dec 6, 2017 · 7 comments

@jbusecke
Contributor

jbusecke commented Dec 6, 2017

I am trying to create a function that applies a .groupby_bins operation over specified dimensions of an xarray dataset. E.g. I want to be able to sum temperature, salinity and other values grouped by oceanic oxygen concentrations.
I want to be flexible about which dimensions I apply the groupby_bins operation over. For instance, I would like to apply it in every depth column (resulting in an array of (x, y, time)), but also over all spatial dimensions, resulting in a timeseries.
I currently run into a strange error when I try the following.

Code Sample, a copy-pastable example if possible

import xarray as xr
import numpy as np
import dask.array as dsa
from dask.diagnostics import ProgressBar
def _func(data, bin_data, bins):
    """Group unlabeled array 'data' according to values in 'bin_data' using 
    bins defined in 'bins' and sum all values"""
    labels = bins[1:]
    da_data = xr.DataArray(data, name='data')
    da_bin_data = xr.DataArray(bin_data, name='bin_data')
    
    binned = da_data.groupby_bins(da_bin_data, bins, labels=labels,
                              include_lowest=True).sum()
    return binned

def wrapper(obj, bin_obj, bins, dims):
    obj = obj.copy()
    bin_obj = bin_obj.copy()
    n_bins = len(bins) - 1

    binned = xr.apply_ufunc(_func, obj, bin_obj, bins,
                            vectorize=True,
                            input_core_dims=[dims, dims, ['dummy']],
                            output_core_dims=[['bin_data_bins']],
                            output_dtypes=[np.float],
                            output_sizes={'bin_data_bins': n_bins},
                            dask='parallelized')
    return binned

I am showing the problem here with a synthetic example, since my current working dataset is quite big.
The problem is exactly the same.

# Groupby bins problem with small bins?
x_raw = np.arange(20)
y_raw = np.arange(10)
z_raw = np.arange(15)

x = xr.DataArray(dsa.from_array(x_raw, chunks=(-1)), dims=['x'], coords={'x':('x', x_raw)})
y = xr.DataArray(dsa.from_array(y_raw, chunks=(-1)), dims=['y'], coords={'y':('y', y_raw)})
z = xr.DataArray(dsa.from_array(z_raw, chunks=(-1)), dims=['z'], coords={'z':('z', z_raw)})

da = xr.DataArray(dsa.ones([20, 10, 15], chunks=[-1, -1, -1]), dims=['x', 'y', 'z'], coords={
    'x':x, 'y':y, 'z':z
})
da
<xarray.DataArray 'wrapped-bb05d395159047b749ca855110244cb7' (x: 20, y: 10, z: 15)>
dask.array<shape=(20, 10, 15), dtype=float64, chunksize=(20, 10, 15)>
Coordinates:
  * x        (x) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
  * y        (y) int64 0 1 2 3 4 5 6 7 8 9
  * z        (z) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14

Now I define some bins and apply the private _func to the unlabeled arrays. This works as expected. Note that the array contains only ones, hence all 3000 values end up in the first bin.

bins = np.arange(0,30,1)

# apply private function on unlabled array
binned_data = _func(da.data, da.data, bins)
print(binned_data.compute())
<xarray.DataArray 'data' (bin_data_bins: 29)>
array([ 3000.,    nan,    nan,    nan,    nan,    nan,    nan,    nan,    nan,
          nan,    nan,    nan,    nan,    nan,    nan,    nan,    nan,    nan,
          nan,    nan,    nan,    nan,    nan,    nan,    nan,    nan,    nan,
          nan,    nan])
Coordinates:
  * bin_data_bins  (bin_data_bins) int64 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ...

This would, e.g., be an operation on a single time step. But when I now try to apply the function over the full array (with the core dimensions set to all available dimensions), I get a very strange error:

binned_full = wrapper(da, da, bins, dims=['x','y','z'])
print(binned_full.data.compute())
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-13-05a5150badbf> in <module>()
      1 binned_full = wrapper(da, da, bins, dims=['x','y','z'])
----> 2 print(binned_full.data.compute())

~/miniconda/envs/standard/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    133         dask.base.compute
    134         """
--> 135         (result,) = compute(self, traverse=False, **kwargs)
    136         return result
    137 

~/miniconda/envs/standard/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    331     postcomputes = [a.__dask_postcompute__() if is_dask_collection(a)
    332                     else (None, a) for a in args]
--> 333     results = get(dsk, keys, **kwargs)
    334     results_iter = iter(results)
    335     return tuple(a if f is None else f(next(results_iter), *a)

~/miniconda/envs/standard/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs)
     73     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
     74                         cache=cache, get_id=_thread_get_id,
---> 75                         pack_exception=pack_exception, **kwargs)
     76 
     77     # Cleanup pools associated to dead threads

~/miniconda/envs/standard/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    519                         _execute_task(task, data)  # Re-execute locally
    520                     else:
--> 521                         raise_exception(exc, tb)
    522                 res, worker_id = loads(res_info)
    523                 state['cache'][key] = res

~/miniconda/envs/standard/lib/python3.6/site-packages/dask/compatibility.py in reraise(exc, tb)
     58         if exc.__traceback__ is not tb:
     59             raise exc.with_traceback(tb)
---> 60         raise exc
     61 
     62 else:

~/miniconda/envs/standard/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    288     try:
    289         task, data = loads(task_info)
--> 290         result = _execute_task(task, data)
    291         id = get_id()
    292         result = dumps((result, id))

~/miniconda/envs/standard/lib/python3.6/site-packages/dask/local.py in _execute_task(arg, cache, dsk)
    269         func, args = arg[0], arg[1:]
    270         args2 = [_execute_task(a, cache) for a in args]
--> 271         return func(*args2)
    272     elif not ishashable(arg):
    273         return arg

~/miniconda/envs/standard/lib/python3.6/site-packages/numpy/lib/function_base.py in __call__(self, *args, **kwargs)
   2737             vargs.extend([kwargs[_n] for _n in names])
   2738 
-> 2739         return self._vectorize_call(func=func, args=vargs)
   2740 
   2741     def _get_ufunc_and_otypes(self, func, args):

~/miniconda/envs/standard/lib/python3.6/site-packages/numpy/lib/function_base.py in _vectorize_call(self, func, args)
   2803         """Vectorized call to `func` over positional `args`."""
   2804         if self.signature is not None:
-> 2805             res = self._vectorize_call_with_signature(func, args)
   2806         elif not args:
   2807             res = func()

~/miniconda/envs/standard/lib/python3.6/site-packages/numpy/lib/function_base.py in _vectorize_call_with_signature(self, func, args)
   2844 
   2845         for index in np.ndindex(*broadcast_shape):
-> 2846             results = func(*(arg[index] for arg in args))
   2847 
   2848             n_results = len(results) if isinstance(results, tuple) else 1

<ipython-input-5-6d60d3b0b704> in _func(data, bin_data, bins)
      7 
      8     binned = da_data.groupby_bins(da_bin_data, bins, labels=labels,
----> 9                               include_lowest=True).sum()
     10     return binned
     11 

~/Work/CODE/PYTHON/xarray/xarray/core/common.py in groupby_bins(self, group, bins, right, labels, precision, include_lowest, squeeze)
    466                                  cut_kwargs={'right': right, 'labels': labels,
    467                                              'precision': precision,
--> 468                                              'include_lowest': include_lowest})
    469 
    470     def rolling(self, min_periods=None, center=False, **windows):

~/Work/CODE/PYTHON/xarray/xarray/core/groupby.py in __init__(self, obj, group, squeeze, grouper, bins, cut_kwargs)
    225 
    226         if bins is not None:
--> 227             binned = pd.cut(group.values, bins, **cut_kwargs)
    228             new_dim_name = group.name + '_bins'
    229             group = DataArray(binned, group.coords, name=new_dim_name)

~/miniconda/envs/standard/lib/python3.6/site-packages/pandas/core/reshape/tile.py in cut(x, bins, right, labels, retbins, precision, include_lowest)
    134                               precision=precision,
    135                               include_lowest=include_lowest,
--> 136                               dtype=dtype)
    137 
    138     return _postprocess_for_cut(fac, bins, retbins, x_is_series,

~/miniconda/envs/standard/lib/python3.6/site-packages/pandas/core/reshape/tile.py in _bins_to_cuts(x, bins, right, labels, precision, include_lowest, dtype, duplicates)
    227         return result, bins
    228 
--> 229     unique_bins = algos.unique(bins)
    230     if len(unique_bins) < len(bins) and len(bins) != 2:
    231         if duplicates == 'raise':

~/miniconda/envs/standard/lib/python3.6/site-packages/pandas/core/algorithms.py in unique(values)
    362 
    363     table = htable(len(values))
--> 364     uniques = table.unique(values)
    365     uniques = _reconstruct_data(uniques, dtype, original)
    366 

pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.Int64HashTable.unique()

~/miniconda/envs/standard/lib/python3.6/site-packages/pandas/_libs/hashtable.cpython-36m-darwin.so in View.MemoryView.memoryview_cwrapper()

~/miniconda/envs/standard/lib/python3.6/site-packages/pandas/_libs/hashtable.cpython-36m-darwin.so in View.MemoryView.memoryview.__cinit__()

ValueError: buffer source array is read-only

This error only gets triggered upon computation.

Problem description

I am not sure if this is a bug or a user error on my side. I am still trying to get used to .apply_ufunc.
If anybody has an idea for a workaround I would greatly appreciate it.

I am not sure if the rewrapping in xr.DataArrays in _func is actually necessary. I tried to find an equivalent function that operates directly on dask arrays, but was not successful.
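
For what it's worth, a plain-numpy equivalent that skips the re-wrapping might look like the sketch below (hypothetical, not tested on my real data). np.histogram with weights returns the per-bin sum of the weights; note that its bin-edge convention differs slightly from pd.cut, and empty bins come back as 0 instead of NaN.

def _func_np(data, bin_data, bins):
    """Sum 'data' into bins defined on 'bin_data' using only numpy."""
    summed, _ = np.histogram(np.ravel(bin_data), bins=np.asarray(bins),
                             weights=np.ravel(data))
    return summed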

Output of xr.show_versions()

INSTALLED VERSIONS
------------------
commit: None
python: 3.6.3.final.0
python-bits: 64
OS: Darwin
OS-release: 16.7.0
machine: x86_64
processor: i386
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: en_US.UTF-8

xarray: 0.10.0rc1-9-gdbf7b01
pandas: 0.21.0
numpy: 1.13.3
scipy: 1.0.0
netCDF4: 1.3.1
h5netcdf: 0.5.0
Nio: None
bottleneck: 1.2.1
cyordereddict: None
dask: 0.16.0
matplotlib: 2.1.0
cartopy: 0.15.1
seaborn: 0.8.1
setuptools: 38.2.3
pip: 9.0.1
conda: None
pytest: 3.3.1
IPython: 6.2.1
sphinx: 1.6.5

@jbusecke
Contributor Author

jbusecke commented Dec 7, 2017

Upon some more testing, it seems the problem is related to how I pass the argument bins to apply_ufunc.

input_core_dims=[dims, dims, ['dummy']],

The bins are always going to be a 1D array that I want to pass identically to each call of _func.
I thought that by giving it a "dummy" core dimension I would achieve that. If I hardcode the bins into _func and remove the third input argument (and the corresponding core_dim) from apply_ufunc, the results are as expected.
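
As an alternative to hardcoding, something along these lines might also work (a hypothetical sketch, I have not run it): bind bins with functools.partial so it never goes through apply_ufunc's broadcasting at all.

import functools

def wrapper_partial(obj, bin_obj, bins, dims):
    # bins is bound as a keyword argument, so apply_ufunc only sees two inputs
    func = functools.partial(_func, bins=np.asarray(bins))
    return xr.apply_ufunc(func, obj, bin_obj,
                          vectorize=True,
                          input_core_dims=[dims, dims],
                          output_core_dims=[['bin_data_bins']],
                          output_dtypes=[np.float],
                          output_sizes={'bin_data_bins': len(bins) - 1},
                          dask='parallelized')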

@shoyer
Member

shoyer commented Dec 7, 2017

You probably need to give bins an explicit 'dummy' dimension for use in apply_ufunc.

We should update apply_ufunc to raise an error when given an unlabeled array as input for an argument that has core dimensions defined.
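
Roughly something like this (an untested sketch, reusing the wrapper from above):

bins_da = xr.DataArray(bins, dims=['dummy'])
binned_full = wrapper(da, da, bins_da, dims=['x', 'y', 'z'])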

@jbusecke
Contributor Author

jbusecke commented Dec 7, 2017

Ok, I will give it a try soon. For now I have shamelessly hardcoded it and it seems to work so far.

@jbusecke
Contributor Author

jbusecke commented Dec 8, 2017

I added the line bins = xr.DataArray(bins, dims=['dummy']), but the same weird error appeared.

Here is the full traceback for this attempt (this was when I tried to save a netCDF). I could apply wrapper without an error, but an error is raised as soon as I either .compute() or .to_netcdf() (below).

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<timed eval> in <module>()

~/code/src/xarray/xarray/core/dataset.py in to_netcdf(self, path, mode, format, group, engine, encoding, unlimited_dims)
   1130         return to_netcdf(self, path, mode, format=format, group=group,
   1131                          engine=engine, encoding=encoding,
-> 1132                          unlimited_dims=unlimited_dims)
   1133 
   1134     def __unicode__(self):

~/code/src/xarray/xarray/backends/api.py in to_netcdf(dataset, path_or_file, mode, format, group, engine, writer, encoding, unlimited_dims)
    616     try:
    617         dataset.dump_to_store(store, sync=sync, encoding=encoding,
--> 618                               unlimited_dims=unlimited_dims)
    619         if path_or_file is None:
    620             return target.getvalue()

~/code/src/xarray/xarray/core/dataset.py in dump_to_store(self, store, encoder, sync, encoding, unlimited_dims)
   1069                     unlimited_dims=unlimited_dims)
   1070         if sync:
-> 1071             store.sync()
   1072 
   1073     def to_netcdf(self, path=None, mode='w', format=None, group=None,

~/code/src/xarray/xarray/backends/scipy_.py in sync(self)
    206     def sync(self):
    207         with self.ensure_open(autoclose=True):
--> 208             super(ScipyDataStore, self).sync()
    209             self.ds.flush()
    210 

~/code/src/xarray/xarray/backends/common.py in sync(self)
    208 
    209     def sync(self):
--> 210         self.writer.sync()
    211 
    212     def store_dataset(self, dataset):

~/code/src/xarray/xarray/backends/common.py in sync(self)
    185             import dask
    186             if LooseVersion(dask.__version__) > LooseVersion('0.8.1'):
--> 187                 da.store(self.sources, self.targets, lock=GLOBAL_LOCK)
    188             else:
    189                 da.store(self.sources, self.targets)

~/code/miniconda/envs/standard/lib/python3.6/site-packages/dask/array/core.py in store(sources, targets, lock, regions, compute, **kwargs)
    898     dsk = sharedict.merge((name, updates), *[src.dask for src in sources])
    899     if compute:
--> 900         compute_as_if_collection(Array, dsk, keys, **kwargs)
    901     else:
    902         from ..delayed import Delayed

~/code/miniconda/envs/standard/lib/python3.6/site-packages/dask/base.py in compute_as_if_collection(cls, dsk, keys, get, **kwargs)
    210     get = get or _globals['get'] or cls.__dask_scheduler__
    211     dsk2 = optimization_function(cls)(ensure_dict(dsk), keys, **kwargs)
--> 212     return get(dsk2, keys, **kwargs)
    213 
    214 

~/code/miniconda/envs/standard/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs)
     73     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
     74                         cache=cache, get_id=_thread_get_id,
---> 75                         pack_exception=pack_exception, **kwargs)
     76 
     77     # Cleanup pools associated to dead threads

~/code/miniconda/envs/standard/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    519                         _execute_task(task, data)  # Re-execute locally
    520                     else:
--> 521                         raise_exception(exc, tb)
    522                 res, worker_id = loads(res_info)
    523                 state['cache'][key] = res

~/code/miniconda/envs/standard/lib/python3.6/site-packages/dask/compatibility.py in reraise(exc, tb)
     58         if exc.__traceback__ is not tb:
     59             raise exc.with_traceback(tb)
---> 60         raise exc
     61 
     62 else:

~/code/miniconda/envs/standard/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    288     try:
    289         task, data = loads(task_info)
--> 290         result = _execute_task(task, data)
    291         id = get_id()
    292         result = dumps((result, id))

~/code/miniconda/envs/standard/lib/python3.6/site-packages/dask/local.py in _execute_task(arg, cache, dsk)
    268     elif istask(arg):
    269         func, args = arg[0], arg[1:]
--> 270         args2 = [_execute_task(a, cache) for a in args]
    271         return func(*args2)
    272     elif not ishashable(arg):

~/code/miniconda/envs/standard/lib/python3.6/site-packages/dask/local.py in <listcomp>(.0)
    268     elif istask(arg):
    269         func, args = arg[0], arg[1:]
--> 270         args2 = [_execute_task(a, cache) for a in args]
    271         return func(*args2)
    272     elif not ishashable(arg):

~/code/miniconda/envs/standard/lib/python3.6/site-packages/dask/local.py in _execute_task(arg, cache, dsk)
    269         func, args = arg[0], arg[1:]
    270         args2 = [_execute_task(a, cache) for a in args]
--> 271         return func(*args2)
    272     elif not ishashable(arg):
    273         return arg

~/code/miniconda/envs/standard/lib/python3.6/site-packages/numpy/lib/function_base.py in __call__(self, *args, **kwargs)
   2737             vargs.extend([kwargs[_n] for _n in names])
   2738 
-> 2739         return self._vectorize_call(func=func, args=vargs)
   2740 
   2741     def _get_ufunc_and_otypes(self, func, args):

~/code/miniconda/envs/standard/lib/python3.6/site-packages/numpy/lib/function_base.py in _vectorize_call(self, func, args)
   2803         """Vectorized call to `func` over positional `args`."""
   2804         if self.signature is not None:
-> 2805             res = self._vectorize_call_with_signature(func, args)
   2806         elif not args:
   2807             res = func()

~/code/miniconda/envs/standard/lib/python3.6/site-packages/numpy/lib/function_base.py in _vectorize_call_with_signature(self, func, args)
   2844 
   2845         for index in np.ndindex(*broadcast_shape):
-> 2846             results = func(*(arg[index] for arg in args))
   2847 
   2848             n_results = len(results) if isinstance(results, tuple) else 1

<ipython-input-14-078f1623bd91> in _func(data, bin_data, bins)
     90 
     91         binned = da_data.groupby_bins(da_bin_data, bins, labels=labels,
---> 92                                   include_lowest=True).sum()
     93     return binned
     94 

~/code/src/xarray/xarray/core/common.py in groupby_bins(self, group, bins, right, labels, precision, include_lowest, squeeze)
    466                                  cut_kwargs={'right': right, 'labels': labels,
    467                                              'precision': precision,
--> 468                                              'include_lowest': include_lowest})
    469 
    470     def rolling(self, min_periods=None, center=False, **windows):

~/code/src/xarray/xarray/core/groupby.py in __init__(self, obj, group, squeeze, grouper, bins, cut_kwargs)
    225 
    226         if bins is not None:
--> 227             binned = pd.cut(group.values, bins, **cut_kwargs)
    228             new_dim_name = group.name + '_bins'
    229             group = DataArray(binned, group.coords, name=new_dim_name)

~/code/miniconda/envs/standard/lib/python3.6/site-packages/pandas/core/reshape/tile.py in cut(x, bins, right, labels, retbins, precision, include_lowest)
    134                               precision=precision,
    135                               include_lowest=include_lowest,
--> 136                               dtype=dtype)
    137 
    138     return _postprocess_for_cut(fac, bins, retbins, x_is_series,

~/code/miniconda/envs/standard/lib/python3.6/site-packages/pandas/core/reshape/tile.py in _bins_to_cuts(x, bins, right, labels, precision, include_lowest, dtype, duplicates)
    225         return result, bins
    226 
--> 227     unique_bins = algos.unique(bins)
    228     if len(unique_bins) < len(bins) and len(bins) != 2:
    229         if duplicates == 'raise':

~/code/miniconda/envs/standard/lib/python3.6/site-packages/pandas/core/algorithms.py in unique(values)
    354 
    355     table = htable(len(values))
--> 356     uniques = table.unique(values)
    357     uniques = _reconstruct_data(uniques, dtype, original)
    358 

pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.Float64HashTable.unique (pandas/_libs/hashtable.c:9781)()

~/code/miniconda/envs/standard/lib/python3.6/site-packages/pandas/_libs/hashtable.cpython-36m-x86_64-linux-gnu.so in View.MemoryView.memoryview_cwrapper (pandas/_libs/hashtable.c:45205)()

~/code/miniconda/envs/standard/lib/python3.6/site-packages/pandas/_libs/hashtable.cpython-36m-x86_64-linux-gnu.so in View.MemoryView.memoryview.__cinit__ (pandas/_libs/hashtable.c:41440)()

ValueError: buffer source array is read-only

@shoyer
Member

shoyer commented Dec 14, 2017

I looked into this a little more. The fix is to make a copy of bins inside _func:

def _func(data, bin_data, bins):
    """Group unlabeled array 'data' according to values in 'bin_data' using 
    bins defined in 'bins' and sum all values"""
    bins = np.array(bins)
    labels = bins[1:]
    da_data = xr.DataArray(data, name='data')
    da_bin_data = xr.DataArray(bin_data, name='bin_data')
    
    binned = da_data.groupby_bins(da_bin_data, bins, labels=labels,
                              include_lowest=True).sum()
    return binned

The problem is that broadcasting (inside xarray.apply_ufunc/np.vectorize) produces read-only arrays, but the pandas function doesn't handle them properly (pandas-dev/pandas#18773). We could potentially add a work-around in pandas, but the ultimate source of the issue is cython/cython#1605.
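
A minimal illustration (not taken from the traceback above) of why the explicit copy side-steps the read-only buffer:

bins = np.arange(0, 30, 1)
bins.setflags(write=False)   # stand-in for the read-only view np.vectorize hands to _func

view = np.asarray(bins)      # no copy -> still read-only, would still trip up pd.cut
copy = np.array(bins)        # fresh, writeable copy, as in the fixed _func above

print(view.flags.writeable)  # False
print(copy.flags.writeable)  # True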

@jbusecke
Contributor Author

Thank you very much! I will give that a try in the next few days.

@dcherian
Contributor

Closing since upstream issues have been closed.
