Skip to content

Commit ebc471e

Browse files
authored
CLN: File handling for PyArrow parquet (#37828)
1 parent e487d48 commit ebc471e

File tree

2 files changed

+77
-54
lines changed

2 files changed

+77
-54
lines changed

pandas/io/parquet.py

Lines changed: 74 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import io
44
import os
5-
from typing import Any, AnyStr, Dict, List, Optional
5+
from typing import Any, AnyStr, Dict, List, Optional, Tuple
66
from warnings import catch_warnings
77

88
from pandas._typing import FilePathOrBuffer, StorageOptions
@@ -11,7 +11,7 @@
1111

1212
from pandas import DataFrame, get_option
1313

14-
from pandas.io.common import get_handle, is_fsspec_url, stringify_path
14+
from pandas.io.common import IOHandles, get_handle, is_fsspec_url, stringify_path
1515

1616

1717
def get_engine(engine: str) -> "BaseImpl":
@@ -48,6 +48,40 @@ def get_engine(engine: str) -> "BaseImpl":
4848
raise ValueError("engine must be one of 'pyarrow', 'fastparquet'")
4949

5050

51+
def _get_path_or_handle(
    path: FilePathOrBuffer,
    fs: Any,
    storage_options: StorageOptions = None,
    mode: str = "rb",
    is_dir: bool = False,
) -> Tuple[FilePathOrBuffer, Optional[IOHandles], Any]:
    """
    File handling for PyArrow.

    Resolves *path* into something pyarrow can consume: either an fsspec
    filesystem plus path, or a locally opened handle wrapped in IOHandles.

    Returns
    -------
    tuple of (path or open handle, IOHandles or None, filesystem or None)
        The caller is responsible for closing the returned handles.
    """
    path_or_handle = stringify_path(path)
    if fs is None and is_fsspec_url(path_or_handle):
        # build an fsspec filesystem that pyarrow will use to open the path
        fsspec = import_optional_dependency("fsspec")
        fs, path_or_handle = fsspec.core.url_to_fs(
            path_or_handle, **(storage_options or {})
        )
    elif storage_options:
        # storage_options only make sense for fsspec-managed resources
        raise ValueError("storage_options passed with buffer or non-fsspec filepath")

    handles = None
    if not fs and not is_dir and isinstance(path_or_handle, str):
        if not os.path.isdir(path_or_handle):
            # use get_handle only when we are very certain that it is not a
            # directory: fsspec resources can also point to directories.
            # This branch is used for example when reading from non-fsspec URLs.
            handles = get_handle(path_or_handle, mode, is_text=False)
            fs = None
            path_or_handle = handles.handle
    return path_or_handle, handles, fs
83+
84+
5185
class BaseImpl:
5286
@staticmethod
5387
def validate_dataframe(df: DataFrame):
@@ -103,64 +137,50 @@ def write(
103137

104138
table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
105139

106-
path = stringify_path(path)
107-
# get_handle could be used here (for write_table, not for write_to_dataset)
108-
# but it would complicate the code.
109-
if is_fsspec_url(path) and "filesystem" not in kwargs:
110-
# make fsspec instance, which pyarrow will use to open paths
111-
fsspec = import_optional_dependency("fsspec")
112-
113-
fs, path = fsspec.core.url_to_fs(path, **(storage_options or {}))
114-
kwargs["filesystem"] = fs
115-
116-
elif storage_options:
117-
raise ValueError(
118-
"storage_options passed with file object or non-fsspec file path"
119-
)
120-
121-
if partition_cols is not None:
122-
# writes to multiple files under the given path
123-
self.api.parquet.write_to_dataset(
124-
table,
125-
path,
126-
compression=compression,
127-
partition_cols=partition_cols,
128-
**kwargs,
129-
)
130-
else:
131-
# write to single output file
132-
self.api.parquet.write_table(table, path, compression=compression, **kwargs)
140+
path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
141+
path,
142+
kwargs.pop("filesystem", None),
143+
storage_options=storage_options,
144+
mode="wb",
145+
is_dir=partition_cols is not None,
146+
)
147+
try:
148+
if partition_cols is not None:
149+
# writes to multiple files under the given path
150+
self.api.parquet.write_to_dataset(
151+
table,
152+
path_or_handle,
153+
compression=compression,
154+
partition_cols=partition_cols,
155+
**kwargs,
156+
)
157+
else:
158+
# write to single output file
159+
self.api.parquet.write_table(
160+
table, path_or_handle, compression=compression, **kwargs
161+
)
162+
finally:
163+
if handles is not None:
164+
handles.close()
133165

134166
def read(
    self, path, columns=None, storage_options: StorageOptions = None, **kwargs
):
    """
    Read a parquet file (or dataset) into a DataFrame using pyarrow.

    Any handle opened here for *path* is closed again before returning,
    even if pyarrow raises.
    """
    kwargs["use_pandas_metadata"] = True

    filesystem = kwargs.pop("filesystem", None)
    path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
        path,
        filesystem,
        storage_options=storage_options,
        mode="rb",
    )
    try:
        table = self.api.parquet.read_table(
            path_or_handle, columns=columns, **kwargs
        )
        return table.to_pandas()
    finally:
        # close any local handle opened by _get_path_or_handle
        if handles is not None:
            handles.close()
164184

165185

166186
class FastParquetImpl(BaseImpl):

pandas/tests/io/test_parquet.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,7 @@ def test_partition_cols_supported(self, pa, df_full):
691691
dataset = pq.ParquetDataset(path, validate_schema=False)
692692
assert len(dataset.partitions.partition_names) == 2
693693
assert dataset.partitions.partition_names == set(partition_cols)
694+
assert read_parquet(path).shape == df.shape
694695

695696
def test_partition_cols_string(self, pa, df_full):
696697
# GH #27117
@@ -704,6 +705,7 @@ def test_partition_cols_string(self, pa, df_full):
704705
dataset = pq.ParquetDataset(path, validate_schema=False)
705706
assert len(dataset.partitions.partition_names) == 1
706707
assert dataset.partitions.partition_names == set(partition_cols_list)
708+
assert read_parquet(path).shape == df.shape
707709

708710
@pytest.mark.parametrize("path_type", [str, pathlib.Path])
709711
def test_partition_cols_pathlib(self, pa, df_compat, path_type):
@@ -716,6 +718,7 @@ def test_partition_cols_pathlib(self, pa, df_compat, path_type):
716718
with tm.ensure_clean_dir() as path_str:
717719
path = path_type(path_str)
718720
df.to_parquet(path, partition_cols=partition_cols_list)
721+
assert read_parquet(path).shape == df.shape
719722

720723
def test_empty_dataframe(self, pa):
721724
# GH #27339

0 commit comments

Comments (0)