Skip to content

Commit d4ba982

Browse files
committed
ENH: add pd.read_ipc and pd.to_ipc to provide efficient serialization to/from memory
1 parent f35209e commit d4ba982

File tree

2 files changed

+275
-0
lines changed

2 files changed

+275
-0
lines changed

pandas/io/ipc.py

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
""" ipc format compat """
2+
3+
from pandas.types.generic import ABCIndexClass, ABCSeries, ABCDataFrame
4+
from pandas.compat import BytesIO, string_types
5+
from pandas._libs.lib import is_string_array, is_unicode_array
6+
from pandas.types.common import is_object_dtype
7+
8+
9+
def _try_import():
10+
# since pandas
11+
# we need to import on first use
12+
13+
try:
14+
import pyarrow
15+
except ImportError:
16+
17+
# give a nice error message
18+
raise ImportError("the pyarrow is not installed\n"
19+
"you can install via conda\n"
20+
"conda install pyarrow -c conda-forge")
21+
22+
return pyarrow
23+
24+
25+
def to_ipc(obj, engine='infer'):
    """
    Serialize a pandas object to the ipc format.

    Parameters
    ----------
    obj : Index, Series, DataFrame
    engine : string, optional
        string to indicate the engine {'infer', 'pickle', 'pyarrow'}
        'infer' will pick an engine based upon performance considerations

    Returns
    -------
    dict-of-metadata and bytes

    Raises
    ------
    ValueError
        If ``obj`` is not an Index, Series or DataFrame.

    Notes
    -----
    ``engine='pyarrow'`` falls back to the inference path (and thus
    possibly pickle) if pyarrow fails to serialize the object.
    """
    if engine == 'pickle':
        return _to_pickle(obj)
    elif engine == 'pyarrow':
        try:
            return _to_pyarrow(obj)
        except Exception:  # pragma: no cover
            # bare ``except:`` would also swallow KeyboardInterrupt /
            # SystemExit; only trap real serialization errors here
            pass

    if isinstance(obj, (ABCIndexClass, ABCSeries)):
        # pyarrow handles DataFrames only; pickle the 1-d containers
        return _to_pickle(obj)
    elif isinstance(obj, ABCDataFrame):

        # decide quickly if we can serialize using
        # pyarrow or pickle

        # smallish, just pickle
        if len(obj) <= 100000:
            return _to_pickle(obj)

        # check our object columns
        for c, col in obj.iteritems():
            if not is_object_dtype(col):
                continue

            # if we discover we have actual python objects
            # embedded with strings/unicode, then pickle
            values = col.values
            if isinstance(values[0], string_types):
                if not is_string_array(values):
                    return _to_pickle(obj)
            else:
                if not is_unicode_array(values):
                    return _to_pickle(obj)

        return _to_pyarrow(obj)

    # fix: original literals concatenated to "Index,Series" (missing space)
    raise ValueError("ipc only supports IO with Index, "
                     "Series, DataFrames, a {} was "
                     "passed".format(type(obj)))
80+
81+
82+
def _to_pyarrow(df):
    """Serialize a DataFrame through the pyarrow ipc writer."""
    return _try_import().write_ipc(df)
86+
87+
88+
def _to_pickle(obj):
89+
""" helper routine to return a pickle of an object """
90+
from pandas import to_pickle
91+
db = BytesIO()
92+
to_pickle(obj, db)
93+
return db.getvalue()
94+
95+
96+
def read_ipc(db, engine='infer'):
    """
    Load a pyarrow ipc format object from the file dict-of-bytes

    .. versionadded 0.20.0

    Parameters
    ----------
    db : dict-of-meta-and-bytes
        a dictionary of meta data & bytes as produced by ``to_ipc``
    engine : string, optional
        string to indicate the engine {'infer', 'pickle', 'pyarrow'}
        'infer' will pick an engine based upon performance considerations

    Returns
    -------
    DataFrame

    Notes
    -----
    Both 'infer' and 'pyarrow' first attempt pyarrow and fall back to
    pickle on any failure.
    """
    if engine == 'pickle':
        return _read_pickle(db)
    try:
        return _read_pyarrow(db)
    except Exception:  # pragma: no cover
        # bare ``except:`` would also swallow KeyboardInterrupt /
        # SystemExit; treat any real failure as "not a pyarrow payload"
        return _read_pickle(db)
120+
121+
122+
def _read_pyarrow(db):
    """Deserialize ipc bytes through the pyarrow ipc reader."""
    return _try_import().read_ipc(db)
126+
127+
128+
def _read_pickle(db):
129+
""" helper to return via pickle """
130+
from pandas import read_pickle
131+
132+
db = BytesIO(db)
133+
return read_pickle(db)

pandas/tests/io/test_ipc.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
""" test ipc compat """
2+
3+
import pytest
4+
pyarrow = pytest.importorskip('pyarrow')
5+
6+
from distutils.version import LooseVersion
7+
import numpy as np
8+
import pandas as pd
9+
from pandas import Series, Index, DataFrame
10+
from pandas.io.ipc import (to_ipc, read_ipc,
11+
_to_pickle, _to_pyarrow,
12+
_read_pickle, _read_pyarrow)
13+
14+
import pandas.util.testing as tm
15+
16+
_HAVE_LATEST_PYARROW = LooseVersion(pyarrow.__version__) > '0.2.0'
17+
18+
19+
# Parametrized fixture yielding (name, writer, reader) triples, one per
# serialization engine; the pyarrow triple is skipped when the installed
# pyarrow is too old.
# NOTE(review): applying a mark by calling
# ``pytest.mark.skipif(...)(...)`` directly on fixture params is a
# legacy pytest idiom (removed in pytest 4.0 in favor of
# ``pytest.param(..., marks=...)``) — confirm against the pinned
# pytest version before restyling.
@pytest.fixture(
    params=[('pickle', _to_pickle, _read_pickle),
            pytest.mark.skipif(not _HAVE_LATEST_PYARROW,
                               reason='need newer pyarrow version')(
                'pyarrow', _to_pyarrow, _read_pyarrow)],
    ids=lambda x: x[0])
def engine(request):
    # the current (name, write_fn, read_fn) triple
    return request.param
27+
28+
29+
@pytest.fixture
def pa():
    """Skip the requesting test unless a recent pyarrow is available."""
    if _HAVE_LATEST_PYARROW:
        return
    pytest.skip("need newer pyarrow")
33+
34+
35+
def make_mixed_frame(N):
    """Build an N-row DataFrame mixing int, float, constant-string,
    varied-string, categorical and per-second datetime columns."""
    cat_codes = np.repeat([0, 1], N // 2)
    columns = {
        'A': np.arange(N),
        'B': np.random.randn(N),
        'C': 'foo',
        'D': tm.makeStringIndex(N),
        'E': pd.Categorical.from_codes(cat_codes,
                                       categories=['foo', 'bar']),
        'F': pd.date_range('20130101', freq='s', periods=N),
    }
    return DataFrame(columns)
44+
45+
46+
class TestIPC(object):

    def check_error_on_write(self, df, exc):
        # serializing an unsupported object must raise ``exc``
        with pytest.raises(exc):
            to_ipc(df)

    def check_round_trip(self, df, engine=None):
        # serialize with the given engine triple (or the public API
        # when engine is None) and verify an exact round trip
        if engine is None:
            writer, reader = to_ipc, read_ipc
        else:
            _, writer, reader = engine

        result = reader(writer(df))
        tm.assert_frame_equal(result, df)

    def test_error(self):
        bad_inputs = [1, 'foo', pd.Timestamp('20130101'),
                      np.array([1, 2, 3])]
        for obj in bad_inputs:
            self.check_error_on_write(obj, ValueError)

    def test_with_small_size(self, engine):
        df = make_mixed_frame(100)
        self.check_round_trip(df, engine)

    def test_with_med_size(self, engine):
        # medium size
        df = make_mixed_frame(10000)
        self.check_round_trip(df, engine)

    def test_with_large_size(self, engine):
        # large size
        df = make_mixed_frame(1000000)
        self.check_round_trip(df, engine)

    def test_non_dataframe(self):
        # Index and Series round-trip through the public API too
        idx = Index(['foo', 'bar'])
        tm.assert_index_equal(read_ipc(to_ipc(idx)), idx)

        ser = Series(['foo', 'bar'])
        tm.assert_series_equal(read_ipc(to_ipc(ser)), ser)

    def test_basic(self, pa):
        df = pd.DataFrame({
            'string': list('abc'),
            'int': list(range(1, 4)),
            'uint': np.arange(3, 6).astype('u1'),
            'float': np.arange(4.0, 7.0, dtype='float64'),
            'bool': [True, False, True],
            'bool_with_nan': [True, None, True],
            'cat': pd.Categorical(list('abc')),
            'date_range': pd.date_range('20130101', periods=3),
            'date_range_tz': pd.date_range('20130101', periods=3,
                                           tz='US/Eastern'),
            'timedelta': pd.timedelta_range('1 day', periods=3)})

        # should work both on pickle & pyarrow
        # TODO: how to assure this?
        self.check_round_trip(df)

    def test_pickle_only(self):
        # period dtype forces the pickle path
        df = pd.DataFrame({'a': pd.period_range('2013', freq='M', periods=3)})
        self.check_round_trip(df)

        # mixed non-string object column forces the pickle path
        df = pd.DataFrame({'a': ['a', 1, 2.0]})
        self.check_round_trip(df)

    def test_duplicate_columns(self, pa):
        df = pd.DataFrame(np.arange(12).reshape(4, 3),
                          columns=list('aaa')).copy()
        self.check_round_trip(df)

    def test_stringify_columns(self, pa):
        df = pd.DataFrame(np.arange(12).reshape(4, 3)).copy()
        self.check_round_trip(df)

0 commit comments

Comments
 (0)