-
-
Notifications
You must be signed in to change notification settings - Fork 18.5k
ENH: add fsspec support #34266
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ENH: add fsspec support #34266
Changes from 8 commits
94e717f
fd7e072
302ba13
9e6d3b2
4564c8d
0654537
8d45cbb
006e736
724ebd8
9da1689
a595411
6dd1e92
6e13df7
3262063
4bc2411
68644ab
32bc586
037ef2c
c3c3075
85d6452
263dd3b
d0afbc3
6a587a5
b2992c1
9c03745
7982e7b
946297b
145306e
06e5a3a
8f3854c
50c08c8
9b20dc6
eb90fe8
b3e2cd2
4977a00
29a9785
565031b
606ce11
60b80a6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -126,20 +126,6 @@ def stringify_path( | |
return _expand_user(filepath_or_buffer) | ||
|
||
|
||
def is_s3_url(url) -> bool: | ||
"""Check for an s3, s3n, or s3a url""" | ||
if not isinstance(url, str): | ||
return False | ||
return parse_url(url).scheme in ["s3", "s3n", "s3a"] | ||
|
||
|
||
def is_gcs_url(url) -> bool: | ||
"""Check for a gcs url""" | ||
if not isinstance(url, str): | ||
return False | ||
return parse_url(url).scheme in ["gcs", "gs"] | ||
|
||
|
||
def urlopen(*args, **kwargs): | ||
""" | ||
Lazy-import wrapper for stdlib urlopen, as that imports a big chunk of | ||
|
@@ -150,38 +136,20 @@ def urlopen(*args, **kwargs): | |
return urllib.request.urlopen(*args, **kwargs) | ||
|
||
|
||
def get_fs_for_path(filepath: str): | ||
def is_fsspec_url(url: FilePathOrBuffer) -> bool: | ||
""" | ||
Get appropriate filesystem given a filepath. | ||
Supports s3fs, gcs and local file system. | ||
|
||
Parameters | ||
---------- | ||
filepath : str | ||
File path. e.g s3://bucket/object, /local/path, gcs://pandas/obj | ||
|
||
Returns | ||
------- | ||
s3fs.S3FileSystem, gcsfs.GCSFileSystem, None | ||
Appropriate FileSystem to use. None for local filesystem. | ||
Returns true if fsspec is installed and the given URL looks like | ||
something fsspec can handle | ||
jreback marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
if is_s3_url(filepath): | ||
from pandas.io import s3 | ||
|
||
return s3.get_fs() | ||
elif is_gcs_url(filepath): | ||
from pandas.io import gcs | ||
|
||
return gcs.get_fs() | ||
else: | ||
return None | ||
return isinstance(url, str) and ("::" in url or "://" in url) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What's the "::" check for? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That's for compound URLs, e.g., to enable local caching like "simplecache::s3://bucket/path" (or indeed via dask workers). There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is there ever one of those that doesn't also include a "://"? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We do special-case to assume "file://" where there is no protocol, but I'm happy to drop that possibility in this use case. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You're saying that something like "simplecache::/local/path" (a compound URL with no "://") would also be accepted? I think for now I'd prefer to avoid that. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ok |
||
|
||
|
||
def get_filepath_or_buffer( | ||
filepath_or_buffer: FilePathOrBuffer, | ||
encoding: Optional[str] = None, | ||
compression: Optional[str] = None, | ||
mode: Optional[str] = None, | ||
**storage_options: Dict[str, Any], | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we want callers to pass a dict, or to collect additional kwargs here? The docstring implies the former. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We are not passing anything at all yet, so I don't mind whether it's kwargs or a dict keyword. I imagine in a user function like read_csv, there would be a storage_options={...} keyword argument.
martindurant marked this conversation as resolved.
Show resolved
Hide resolved
|
||
): | ||
""" | ||
If the filepath_or_buffer is a url, translate and return the buffer. | ||
|
@@ -194,6 +162,8 @@ def get_filepath_or_buffer( | |
compression : {{'gzip', 'bz2', 'zip', 'xz', None}}, optional | ||
encoding : the encoding to use to decode bytes, default is 'utf-8' | ||
mode : str, optional | ||
storage_options: dict | ||
passed on to fsspec, if using it; this is not yet accessed by the public API | ||
|
||
Returns | ||
------- | ||
|
@@ -204,6 +174,7 @@ def get_filepath_or_buffer( | |
filepath_or_buffer = stringify_path(filepath_or_buffer) | ||
|
||
if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer): | ||
# TODO: fsspec can also handle HTTP via requests, but leaving this unchanged | ||
req = urlopen(filepath_or_buffer) | ||
content_encoding = req.headers.get("Content-Encoding", None) | ||
if content_encoding == "gzip": | ||
|
@@ -213,19 +184,14 @@ def get_filepath_or_buffer( | |
req.close() | ||
return reader, encoding, compression, True | ||
|
||
if is_s3_url(filepath_or_buffer): | ||
from pandas.io import s3 | ||
|
||
return s3.get_filepath_or_buffer( | ||
filepath_or_buffer, encoding=encoding, compression=compression, mode=mode | ||
) | ||
if is_fsspec_url(filepath_or_buffer): | ||
import fsspec | ||
martindurant marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if is_gcs_url(filepath_or_buffer): | ||
from pandas.io import gcs | ||
|
||
return gcs.get_filepath_or_buffer( | ||
filepath_or_buffer, encoding=encoding, compression=compression, mode=mode | ||
) | ||
file_obj = fsspec.open( | ||
filepath_or_buffer, mode=mode or "rb", **storage_options | ||
).open() | ||
# TODO: both fsspec and pandas handle compression and encoding | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What would resolve the TODO here? To not handle compression or encoding in pandas? Can you update the comment to indicate that? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given that pandas must still handle compression and encoding for local and http, that code will not be deprecated. Therefore, I think it's fine that we don't advertise the fact that fsspec can do that part too, and open everything on the backend as "rb"/"wb", uncompressed. The TODO would be resolved if at some point we decided that fsspec should handle all file ops, which is not likely in the near term. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed, so I think the TODO can be removed. |
||
return file_obj, encoding, compression, True | ||
|
||
if isinstance(filepath_or_buffer, (str, bytes, mmap.mmap)): | ||
return _expand_user(filepath_or_buffer), None, compression, False | ||
|
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,12 +8,7 @@ | |
|
||
from pandas import DataFrame, get_option | ||
|
||
from pandas.io.common import ( | ||
get_filepath_or_buffer, | ||
get_fs_for_path, | ||
is_gcs_url, | ||
is_s3_url, | ||
) | ||
from pandas.io.common import get_filepath_or_buffer, is_fsspec_url | ||
|
||
|
||
def get_engine(engine: str) -> "BaseImpl": | ||
|
@@ -107,6 +102,11 @@ def write( | |
# write_to_dataset does not support a file-like object when | ||
# a directory path is used, so just pass the path string. | ||
if partition_cols is not None: | ||
if is_fsspec_url(path) and "filesystem" not in kwargs: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you leave a comment explaining this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, edit on that: this is the filesystem parameter (i.e., an actual instance) to pyarrow. I have no idea if people might currently be using that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, you're saying the user could pass a filesystem like
That certainly seems possible. Could you ensure that we have a test for that? |
||
import fsspec.core | ||
martindurant marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
fs, path = fsspec.core.url_to_fs(path) | ||
kwargs["filesystem"] = fs | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could potentially also be useful for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the non-partitioned case, we pass a file-like object directly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could not process the path into a file object and pass the filesystem in both cases, if preferred. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If that's what we do right now, fine to leave it like that in this PR. |
||
self.api.parquet.write_to_dataset( | ||
table, | ||
path, | ||
|
@@ -122,9 +122,14 @@ def write( | |
file_obj_or_path.close() | ||
|
||
def read(self, path, columns=None, **kwargs): | ||
parquet_ds = self.api.parquet.ParquetDataset( | ||
path, filesystem=get_fs_for_path(path), **kwargs | ||
) | ||
if is_fsspec_url(path) and "filesystem" not in kwargs: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you additionally check that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you check this comment? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ping for this one There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (I am also fine with doing this as a follow-up myself) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought you meant that you intended to handle it; and yes please, you are in the best place to check the finer details of the calls to pyarrow. |
||
import fsspec.core | ||
martindurant marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
fs, path = fsspec.core.url_to_fs(path) | ||
martindurant marked this conversation as resolved.
Show resolved
Hide resolved
|
||
parquet_ds = self.api.parquet.ParquetDataset(path, filesystem=fs, **kwargs) | ||
else: | ||
parquet_ds = self.api.parquet.ParquetDataset(path, **kwargs) | ||
|
||
kwargs["columns"] = columns | ||
result = parquet_ds.read_pandas(**kwargs).to_pandas() | ||
return result | ||
|
@@ -164,13 +169,11 @@ def write( | |
if partition_cols is not None: | ||
kwargs["file_scheme"] = "hive" | ||
|
||
if is_s3_url(path) or is_gcs_url(path): | ||
# if path is s3:// or gs:// we need to open the file in 'wb' mode. | ||
# TODO: Support 'ab' | ||
if is_fsspec_url(path): | ||
import fsspec | ||
martindurant marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
path, _, _, _ = get_filepath_or_buffer(path, mode="wb") | ||
# And pass the opened file to the fastparquet internal impl. | ||
kwargs["open_with"] = lambda path, _: path | ||
# if filesystem is provided by fsspec, file must be opened in 'wb' mode. | ||
kwargs["open_with"] = lambda path, _: fsspec.open(path, "wb").open() | ||
else: | ||
path, _, _, _ = get_filepath_or_buffer(path) | ||
|
||
|
@@ -185,17 +188,11 @@ def write( | |
) | ||
|
||
def read(self, path, columns=None, **kwargs): | ||
if is_s3_url(path): | ||
from pandas.io.s3 import get_file_and_filesystem | ||
if is_fsspec_url(path): | ||
import fsspec | ||
martindurant marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# When path is s3:// an S3File is returned. | ||
# We need to retain the original path(str) while also | ||
# pass the S3File().open function to fastparquet impl. | ||
s3, filesystem = get_file_and_filesystem(path) | ||
try: | ||
parquet_file = self.api.ParquetFile(path, open_with=filesystem.open) | ||
finally: | ||
s3.close() | ||
open_with = lambda path, _: fsspec.open(path, "rb").open() | ||
parquet_file = self.api.ParquetFile(path, open_with=open_with) | ||
else: | ||
path, _, _, _ = get_filepath_or_buffer(path) | ||
parquet_file = self.api.ParquetFile(path) | ||
|
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
import numpy as np | ||
import pytest | ||
|
||
from pandas import DataFrame, date_range, read_csv, read_parquet | ||
import pandas._testing as tm | ||
from pandas.util import _test_decorators as td | ||
|
||
df1 = DataFrame( | ||
{ | ||
"int": [1, 3], | ||
"float": [2.0, np.nan], | ||
"str": ["t", "s"], | ||
"dt": date_range("2018-06-18", periods=2), | ||
} | ||
) | ||
text = df1.to_csv(index=False).encode() | ||
TomAugspurger marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
@pytest.fixture
def cleared_fs():
    """
    Yield fsspec's ``memory`` filesystem with an empty store.

    The store is emptied both before the test runs and after it finishes, so
    the test starts clean even if earlier code left entries behind without
    going through this fixture, and nothing it writes survives it.

    Tests requesting this fixture are skipped when fsspec is not installed.
    """
    # importorskip turns a missing optional dependency into a skip, not an error.
    fsspec = pytest.importorskip("fsspec")

    memfs = fsspec.filesystem("memory")
    # Guarantee the "cleared" part of the name on entry as well as on exit.
    memfs.store.clear()
    try:
        yield memfs
    finally:
        memfs.store.clear()
martindurant marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
@td.skip_if_no("fsspec") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you |
||
def test_read_csv(cleared_fs):
    """read_csv loads a CSV file held on the fsspec memory filesystem."""
    from fsspec.implementations.memory import MemoryFile

    # Seed the in-memory store directly, then read it back through its URL.
    cleared_fs.store["test/test.csv"] = MemoryFile(data=text)
    result = read_csv("memory://test/test.csv", parse_dates=["dt"])

    tm.assert_frame_equal(df1, result)
|
||
|
||
@td.skip_if_no("fsspec")
def test_reasonable_error(monkeypatch):
    """
    Errors for unusable protocols name the protocol or carry the backend's message.

    Two cases are checked: a protocol fsspec has never heard of raises
    ``ValueError`` mentioning it, and a protocol that is registered but whose
    implementation cannot be imported raises ``ImportError`` with the
    registered ``err`` text.
    """
    from fsspec import registry
    from fsspec.registry import known_implementations

    # NOTE(review): presumably this empties fsspec's table of already-loaded
    # implementations so both lookups below go through known_implementations;
    # confirm against the fsspec version in use.
    registry.target.clear()
    with pytest.raises(ValueError, match="nosuchprotocol"):
        read_csv("nosuchprotocol://test/test.csv")

    err_msg = "test error message"
    # monkeypatch.setitem restores known_implementations when the test ends.
    monkeypatch.setitem(
        known_implementations,
        "couldexist",
        {"class": "unimportable.CouldExist", "err": err_msg},
    )
    with pytest.raises(ImportError, match=err_msg):
        read_csv("couldexist://test/test.csv")
|
||
|
||
@td.skip_if_no("fsspec")
def test_to_csv(cleared_fs):
    """A frame written with to_csv to memory:// round-trips through read_csv."""
    url = "memory://test/test.csv"
    df1.to_csv(url, index=True)
    roundtripped = read_csv(url, parse_dates=["dt"], index_col=0)

    tm.assert_frame_equal(df1, roundtripped)
|
||
|
||
@td.skip_if_no("fastparquet")
@td.skip_if_no("fsspec")
def test_to_parquet_new_file(monkeypatch, cleared_fs):
    """
    Regression test for writing a Parquet file to a path that does not exist yet.

    The target lives on fsspec's in-memory filesystem. ``cleared_fs`` is
    requested so the memory store is emptied once the test finishes and the
    written file does not affect other tests.
    """
    df1.to_parquet(
        "memory://test/test.csv", index=True, engine="fastparquet", compression=None
    )
|
||
|
||
@td.skip_if_no("s3fs")
def test_from_s3_csv(s3_resource, tips_file):
    """CSV files on S3, plain or compressed, parse to the same frame as the local copy."""
    urls = (
        "s3://pandas-test/tips.csv",
        # the following are decompressed by pandas, not fsspec
        "s3://pandas-test/tips.csv.gz",
        "s3://pandas-test/tips.csv.bz2",
    )
    for url in urls:
        tm.assert_equal(read_csv(url), read_csv(tips_file))
|
||
|
||
@td.skip_if_no("s3fs")
@td.skip_if_no("fastparquet")
def test_s3_parquet(s3_resource):
    """Round-trip df1 through a Parquet file on S3 using the fastparquet engine."""
    url = "s3://pandas-test/test.parquet"
    df1.to_parquet(url, index=False, engine="fastparquet", compression=None)
    roundtripped = read_parquet(url, engine="fastparquet")
    tm.assert_equal(df1, roundtripped)
|
||
|
||
@td.skip_if_installed("fsspec")
def test_not_present_exception():
    """Without fsspec installed, an fsspec-style URL raises a helpful ImportError."""
    with pytest.raises(ImportError) as excinfo:
        read_csv("memory://test/test.csv")

    message = str(excinfo.value)
    assert "fsspec library is required" in message
Uh oh!
There was an error while loading. Please reload this page.