
REF: Refactor sparse HDF5 read / write #28456


Status: Closed (wants to merge 5 commits)
2 changes: 1 addition & 1 deletion doc/source/whatsnew/v1.0.0.rst
@@ -69,7 +69,7 @@ Other API changes
^^^^^^^^^^^^^^^^^

- :meth:`pandas.api.types.infer_dtype` will now return "integer-na" for integer and ``np.nan`` mix (:issue:`27283`)
-
- :func:`read_hdf` now reads sparse values into a :class:`Series` or :class:`DataFrame` with sparse values rather than a ``SparseDataFrame`` or ``SparseSeries`` (:issue:`28456`)
-

.. _whatsnew_1000.deprecations:
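For illustration, a minimal sketch of the round-trip behavior this whatsnew entry describes (file name and key are made up; assumes a pandas build with this patch):

import numpy as np
import pandas as pd

# Write a Series backed by a SparseArray (not the deprecated SparseSeries).
s = pd.Series(pd.SparseArray([1.0, np.nan, 2.0, 3.0]))
s.to_hdf("sparse_demo.h5", "s")  # hypothetical file name and key

# read_hdf now returns a plain Series with a Sparse dtype.
result = pd.read_hdf("sparse_demo.h5", "s")
assert type(result) is pd.Series
assert pd.api.types.is_sparse(result)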
118 changes: 30 additions & 88 deletions pandas/io/pytables.py
@@ -28,6 +28,7 @@
is_datetime64tz_dtype,
is_extension_type,
is_list_like,
is_sparse,
is_timedelta64_dtype,
)
from pandas.core.dtypes.missing import array_equivalent
@@ -40,8 +41,7 @@
MultiIndex,
PeriodIndex,
Series,
SparseDataFrame,
SparseSeries,
SparseArray,
TimedeltaIndex,
concat,
isna,
@@ -173,22 +173,17 @@ class DuplicateWarning(Warning):
"""

# map object types
_TYPE_MAP = {
Series: "series",
SparseSeries: "sparse_series",
DataFrame: "frame",
SparseDataFrame: "sparse_frame",
}
_TYPE_MAP = {Series: "series", DataFrame: "frame"}

# storer class map
_STORER_MAP = {
"Series": "LegacySeriesFixed",
"DataFrame": "LegacyFrameFixed",
"DataMatrix": "LegacyFrameFixed",
"series": "SeriesFixed",
"sparse_series": "SparseSeriesFixed",
"sparse_series": "SeriesFixed",
"frame": "FrameFixed",
"sparse_frame": "SparseFrameFixed",
"sparse_frame": "FrameFixed",
}

# table class map
@@ -2754,6 +2749,19 @@ def read_array(self, key, start=None, stop=None):
elif dtype == "timedelta64":
ret = np.asarray(ret, dtype="m8[ns]")

if dtype == "Sparse":
if start or stop:
raise NotImplementedError(
"start and/or stop are not supported in fixed Sparse reading"
)
sp_index = self.read_index("{}_sp_index".format(key))
ret = SparseArray(
ret,
sparse_index=sp_index,
fill_value=self.attrs["{}_fill_value".format(key)],
kind=self.attrs["{}_kind".format(key)],
)
Member:

It seems the old "SparseSeriesFixed" used the "sp_values" key to write the sparse values. How does this code work with that? (or why does it not need to read this key specifically?)

Contributor Author:

Let me check (perhaps we don't have test coverage for it)

> or why does it not need to read this key specifically?

IIRC, I needed this because DataFrame.write eventually calls this on a sparse ExtensionBlock. When I didn't prefix this with the key, I got name errors from reading the wrong value (I kept overwriting sp_index on each column of the dataframe).
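To make the collision concrete, a sketch of the node layout the per-key prefix produces when a frame has several sparse blocks (the group and block names are illustrative):

# With "{key}_sp_index", every sparse block's metadata gets its own node,
# e.g. for a frame stored at "/df" with two sparse columns:
#
#   /df/block0_values             sp_values of the first sparse block
#   /df/block0_values_sp_index    its sparse index
#   /df/block1_values             sp_values of the second sparse block
#   /df/block1_values_sp_index    its sparse index
#
# An unprefixed "sp_index" node would be rewritten on each column write,
# so every read would see only the last column's index.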

Contributor Author:

Hmm, not tested...

# write
>>> s = pd.Series([1, None, 2, 3]).to_sparse()
>>> s.to_hdf("foo.h5", "ss")

Reading

In [2]: pd.read_hdf("foo.h5", "ss")
---------------------------------------------------------------------------
NoSuchNodeError                           Traceback (most recent call last)
<ipython-input-2-a3883e54184d> in <module>
----> 1 pd.read_hdf("foo.h5", "ss")

~/sandbox/pandas/pandas/io/pytables.py in read_hdf(path_or_buf, key, mode, **kwargs)
    399                     )
    400             key = candidate_only_group._v_pathname
--> 401         return store.select(key, auto_close=auto_close, **kwargs)
    402     except (ValueError, TypeError, KeyError):
    403         # if there is an error, close the store

~/sandbox/pandas/pandas/io/pytables.py in select(self, key, where, start, stop, columns, iterator, chunksize, auto_close, **kwargs)
    780         )
    781
--> 782         return it.get_result()
    783
    784     def select_as_coordinates(self, key, where=None, start=None, stop=None, **kwargs):

~/sandbox/pandas/pandas/io/pytables.py in get_result(self, coordinates)
   1642
   1643         # directly return the result
-> 1644         results = self.func(self.start, self.stop, where)
   1645         self.close()
   1646         return results

~/sandbox/pandas/pandas/io/pytables.py in func(_start, _stop, _where)
    764         # function to call on iteration
    765         def func(_start, _stop, _where):
--> 766             return s.read(start=_start, stop=_stop, where=_where, columns=columns)
    767
    768         # create the iterator

~/sandbox/pandas/pandas/io/pytables.py in read(self, **kwargs)
   3083         kwargs = self.validate_read(kwargs)
   3084         index = self.read_index("index", **kwargs)
-> 3085         values = self.read_array("values", **kwargs)
   3086         return Series(values, index=index, name=self.name)
   3087

~/sandbox/pandas/pandas/io/pytables.py in read_array(self, key, start, stop)
   2725         import tables
   2726
-> 2727         node = getattr(self.group, key)
   2728         attrs = node._v_attrs
   2729

~/Envs/pandas-dev/lib/python3.7/site-packages/tables/group.py in __getattr__(self, name)
    837             self._g_add_children_names()
    838             return self.__dict__[name]
--> 839         return self._f_get_child(name)
    840
    841     def __setattr__(self, name, value):

~/Envs/pandas-dev/lib/python3.7/site-packages/tables/group.py in _f_get_child(self, childname)
    709         self._g_check_open()
    710
--> 711         self._g_check_has_child(childname)
    712
    713         childpath = join_path(self._v_pathname, childname)

~/Envs/pandas-dev/lib/python3.7/site-packages/tables/group.py in _g_check_has_child(self, name)
    396             raise NoSuchNodeError(
    397                 "group ``%s`` does not have a child named ``%s``"
--> 398                 % (self._v_pathname, name))
    399         return node_type
    400

NoSuchNodeError: group ``/ss`` does not have a child named ``values``

That's raising on values, but it's the same issue. Previously we used sp_values, now we use {key}. Will investigate, and add a failing test.
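A sketch of what such a failing test could look like (the fixture file and test name are assumptions; the point is that legacy files keep the data under "sp_values"/"sp_index" while the refactored reader looks for "{key}"):

import numpy as np
import pandas as pd
import pandas.util.testing as tm


def test_read_legacy_sparse_series(datapath):
    # Hypothetical regression test: an HDF5 file written by an older
    # pandas via SparseSeriesFixed stores "sp_values" / "sp_index"
    # nodes, which the refactored reader must still understand.
    path = datapath("io", "data", "legacy_hdf", "sparse_series.h5")
    result = pd.read_hdf(path, "ss")
    expected = pd.Series([1.0, np.nan, 2.0, 3.0]).astype("Sparse")
    tm.assert_series_equal(result, expected)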

Member:

It might be worth keeping the SparseSeries/SparseFrameFixed classes that you removed (or at least the read part of them), but adjusting them to create a Series[sparse] instead of a SparseSeries. That seems the easiest way to handle legacy hdf5 files that contain such sparse data.
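A rough sketch of that suggestion, reusing the removed read logic (visible further down in this diff) but materializing a Series with sparse values instead (untested):

class SparseSeriesFixed(SparseFixed):
    # Legacy read path only: understands files written by older pandas,
    # but returns Series[Sparse] rather than the deprecated SparseSeries.
    pandas_kind = "sparse_series"
    attributes = ["name", "fill_value", "kind"]

    def read(self, **kwargs):
        kwargs = self.validate_read(kwargs)
        index = self.read_index("index")
        sp_values = self.read_array("sp_values")
        sp_index = self.read_index("sp_index")
        arr = SparseArray(
            sp_values,
            sparse_index=sp_index,
            kind=self.kind or "block",
            fill_value=self.fill_value,
        )
        return Series(arr, index=index, name=self.name)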


if transposed:
return ret.T
else:
@@ -3004,7 +3012,7 @@ def write_array(self, key, value, items=None):
vlarr = self._handle.create_vlarray(self.group, key, _tables().ObjectAtom())
vlarr.append(value)
else:
if empty_array:
if empty_array and not is_sparse(value):
self.write_array_empty(key, value)
else:
if is_datetime64_dtype(value.dtype):
@@ -3021,6 +3029,17 @@
elif is_timedelta64_dtype(value.dtype):
self._handle.create_array(self.group, key, value.view("i8"))
getattr(self.group, key)._v_attrs.value_type = "timedelta64"
elif is_sparse(value):
# TODO: think about EA API for this.
# value._write_hdf5(self)
Member:

Is value._write_hdf5(self) a commented-out line or part of the TODO comment above?

Contributor Author:

Part of the TODO.
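For what it's worth, a sketch of what that hypothetical extension-array hook might look like (pure speculation from the TODO; no such API exists today):

class SparseArray(ExtensionArray):
    def _write_hdf5(self, fixed, key):
        # Hypothetical dispatch: the array serializes itself into the
        # storer, so write_array would not need to special-case sparse.
        fixed.write_index("{}_sp_index".format(key), self.sp_index)
        fixed._handle.create_array(fixed.group, key, self.sp_values)
        getattr(fixed.group, key)._v_attrs.value_type = "Sparse"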

self.write_index("{}_sp_index".format(key), value.sp_index)
self._handle.create_array(self.group, key, value.sp_values)
getattr(self.group, key)._v_attrs.value_type = "Sparse"
setattr(self.attrs, "{}_fill_value".format(key), value.fill_value)
setattr(self.attrs, "{}_kind".format(key), value.kind)
self.attributes.extend(
["{}_fill_value".format(key), "{}_kind".format(key)]
)
else:
self._handle.create_array(self.group, key, value)

@@ -3078,83 +3097,6 @@ def write(self, obj, **kwargs):
self.attrs.name = obj.name


class SparseFixed(GenericFixed):
def validate_read(self, kwargs):
"""
we don't support start, stop kwds in Sparse
"""
kwargs = super().validate_read(kwargs)
if "start" in kwargs or "stop" in kwargs:
raise NotImplementedError(
"start and/or stop are not supported in fixed Sparse reading"
)
return kwargs


class SparseSeriesFixed(SparseFixed):
pandas_kind = "sparse_series"
attributes = ["name", "fill_value", "kind"]

def read(self, **kwargs):
kwargs = self.validate_read(kwargs)
index = self.read_index("index")
sp_values = self.read_array("sp_values")
sp_index = self.read_index("sp_index")
return SparseSeries(
sp_values,
index=index,
sparse_index=sp_index,
kind=self.kind or "block",
fill_value=self.fill_value,
name=self.name,
)

def write(self, obj, **kwargs):
super().write(obj, **kwargs)
self.write_index("index", obj.index)
self.write_index("sp_index", obj.sp_index)
self.write_array("sp_values", obj.sp_values)
self.attrs.name = obj.name
self.attrs.fill_value = obj.fill_value
self.attrs.kind = obj.kind


class SparseFrameFixed(SparseFixed):
pandas_kind = "sparse_frame"
attributes = ["default_kind", "default_fill_value"]

def read(self, **kwargs):
kwargs = self.validate_read(kwargs)
columns = self.read_index("columns")
sdict = {}
for c in columns:
key = "sparse_series_{columns}".format(columns=c)
s = SparseSeriesFixed(self.parent, getattr(self.group, key))
s.infer_axes()
sdict[c] = s.read()
return SparseDataFrame(
sdict,
columns=columns,
default_kind=self.default_kind,
default_fill_value=self.default_fill_value,
)

def write(self, obj, **kwargs):
""" write it as a collection of individual sparse series """
super().write(obj, **kwargs)
for name, ss in obj.items():
key = "sparse_series_{name}".format(name=name)
if key not in self.group._v_children:
node = self._handle.create_group(self.group, key)
else:
node = getattr(self.group, key)
s = SparseSeriesFixed(self.parent, node)
s.write(ss)
self.attrs.default_fill_value = obj.default_fill_value
self.attrs.default_kind = obj.default_kind
self.write_index("columns", obj.columns)


class BlockManagerFixed(GenericFixed):
attributes = ["ndim", "nblocks"]
is_shape_reversed = False
43 changes: 19 additions & 24 deletions pandas/tests/io/pytables/test_pytables.py
@@ -71,14 +71,6 @@
ignore_natural_naming_warning = pytest.mark.filterwarnings(
"ignore:object name:tables.exceptions.NaturalNameWarning"
)
ignore_sparse = pytest.mark.filterwarnings("ignore:Sparse:FutureWarning")
ignore_dataframe_tosparse = pytest.mark.filterwarnings(
"ignore:DataFrame.to_sparse:FutureWarning"
)
ignore_series_tosparse = pytest.mark.filterwarnings(
"ignore:Series.to_sparse:FutureWarning"
)

# contextmanager to ensure the file cleanup


@@ -2353,38 +2345,45 @@ def test_series(self):
ts3 = Series(ts.values, Index(np.asarray(ts.index, dtype=object), dtype=object))
self._check_roundtrip(ts3, tm.assert_series_equal, check_index_type=False)

@ignore_sparse
@ignore_series_tosparse
def test_sparse_series(self):

s = tm.makeStringSeries()
s.iloc[3:5] = np.nan
ss = s.to_sparse()
ss = s.astype("Sparse")
self._check_roundtrip(ss, tm.assert_series_equal, check_series_type=True)

ss2 = s.to_sparse(kind="integer")
ss2 = pd.Series(pd.SparseArray(s, kind="integer"))
self._check_roundtrip(ss2, tm.assert_series_equal, check_series_type=True)

ss3 = s.to_sparse(fill_value=0)
ss3 = pd.Series(pd.SparseArray(s, fill_value=0))
self._check_roundtrip(ss3, tm.assert_series_equal, check_series_type=True)

@ignore_sparse
@ignore_dataframe_tosparse
def test_sparse_frame(self):

s = tm.makeDataFrame()
s.iloc[3:5, 1:3] = np.nan
s.iloc[8:10, -2] = np.nan
ss = s.to_sparse()
ss = s.astype("Sparse")

self._check_double_roundtrip(ss, tm.assert_frame_equal, check_frame_type=True)

ss2 = s.to_sparse(kind="integer")
ss2 = s.apply(lambda x: pd.SparseArray(x, kind="integer"))
self._check_double_roundtrip(ss2, tm.assert_frame_equal, check_frame_type=True)

ss3 = s.to_sparse(fill_value=0)
ss3 = s.apply(lambda x: pd.SparseArray(x, fill_value=0))
self._check_double_roundtrip(ss3, tm.assert_frame_equal, check_frame_type=True)

def test_mixed_sparse_dense_frame(self):
df = pd.DataFrame(
{
"A": [0, 1, 2, 3],
"B": pd.SparseArray([0, 1, 2, 3], kind="block"),
"C": [0.0, 1.0, 2.0, 3.0],
"D": pd.SparseArray([0.0, 1.0, 2.0, 3.0], kind="integer"),
}
)
self._check_roundtrip(df, tm.assert_frame_equal)

def test_float_index(self):

# GH #454
@@ -2709,15 +2708,13 @@ def test_overwrite_node(self):

tm.assert_series_equal(store["a"], ts)

@ignore_sparse
@ignore_dataframe_tosparse
def test_sparse_with_compression(self):

# GH 2931

# make sparse dataframe
arr = np.random.binomial(n=1, p=0.01, size=(1000, 10))
df = DataFrame(arr).to_sparse(fill_value=0)
df = DataFrame(arr).apply(lambda x: pd.SparseArray(x, fill_value=0))

# case 1: store uncompressed
self._check_double_roundtrip(
@@ -3890,8 +3887,6 @@ def test_start_stop_multiple(self):
expected = df.loc[[0], ["foo", "bar"]]
tm.assert_frame_equal(result, expected)

@ignore_sparse
@ignore_dataframe_tosparse
def test_start_stop_fixed(self):

with ensure_clean_store(self.path) as store:
@@ -3931,7 +3926,7 @@ def test_start_stop_fixed(self):
df = tm.makeDataFrame()
df.iloc[3:5, 1:3] = np.nan
df.iloc[8:10, -2] = np.nan
dfs = df.to_sparse()
dfs = df.apply(pd.SparseArray)
store.put("dfs", dfs)
with pytest.raises(NotImplementedError):
store.select("dfs", start=0, stop=5)