diff --git a/doc/source/user_guide/io.rst b/doc/source/user_guide/io.rst
index 965833c013c03..b04abf512fbeb 100644
--- a/doc/source/user_guide/io.rst
+++ b/doc/source/user_guide/io.rst
@@ -1627,6 +1627,20 @@ functions - the following example shows reading a CSV file:
 
    df = pd.read_csv("https://download.bls.gov/pub/time.series/cu/cu.item", sep="\t")
 
+.. versionadded:: 1.3.0
+
+A custom header can be sent alongside HTTP(s) requests by passing a dictionary
+of header key-value mappings to the ``storage_options`` keyword argument as shown below:
+
+.. code-block:: python
+
+   headers = {"User-Agent": "pandas"}
+   df = pd.read_csv(
+       "https://download.bls.gov/pub/time.series/cu/cu.item",
+       sep="\t",
+       storage_options=headers
+   )
+
 All URLs which are not local files or HTTP(s) are handled by
 `fsspec`_, if installed, and its various filesystem implementations
 (including Amazon S3, Google Cloud, SSH, FTP, webHDFS...).
diff --git a/doc/source/whatsnew/v1.3.0.rst b/doc/source/whatsnew/v1.3.0.rst
index 26e548f519ecd..188ef83244be8 100644
--- a/doc/source/whatsnew/v1.3.0.rst
+++ b/doc/source/whatsnew/v1.3.0.rst
@@ -13,6 +13,26 @@ including other versions of pandas.
 
 Enhancements
 ~~~~~~~~~~~~
 
+.. _whatsnew_130.read_csv_json_http_headers:
+
+Custom HTTP(s) headers when reading CSV or JSON files
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+When reading from a remote URL that is not handled by fsspec (i.e. HTTP and
+HTTPS), the dictionary passed to ``storage_options`` will be used to create the
+headers included in the request. This can be used to control the User-Agent
+header or send other custom headers (:issue:`36688`).
+For example:
+
+.. ipython:: python
+
+   headers = {"User-Agent": "pandas"}
+   df = pd.read_csv(
+       "https://download.bls.gov/pub/time.series/cu/cu.item",
+       sep="\t",
+       storage_options=headers
+   )
+
 .. _whatsnew_130.enhancements.other:
 
diff --git a/pandas/core/shared_docs.py b/pandas/core/shared_docs.py
index 3aeb3b664b27f..6d3249802ee5e 100644
--- a/pandas/core/shared_docs.py
+++ b/pandas/core/shared_docs.py
@@ -383,8 +383,7 @@
     "storage_options"
 ] = """storage_options : dict, optional
     Extra options that make sense for a particular storage connection, e.g.
-    host, port, username, password, etc., if using a URL that will
-    be parsed by ``fsspec``, e.g., starting "s3://", "gcs://". An error
-    will be raised if providing this argument with a non-fsspec URL.
-    See the fsspec and backend storage implementation docs for the set of
-    allowed keys and values."""
+    host, port, username, password, etc. For HTTP(S) URLs, the key-value pairs
+    are forwarded to ``urllib`` as header options. For other URLs (e.g.
+    starting with "s3://" and "gcs://"), the key-value pairs are forwarded to
+    ``fsspec``. Please see ``fsspec`` and ``urllib`` for more details."""
diff --git a/pandas/io/common.py b/pandas/io/common.py
index 9fede5180e727..250c9422213e7 100644
--- a/pandas/io/common.py
+++ b/pandas/io/common.py
@@ -276,12 +276,18 @@ def _get_filepath_or_buffer(
         fsspec_mode += "b"
 
     if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer):
-        # TODO: fsspec can also handle HTTP via requests, but leaving this unchanged
-        if storage_options:
-            raise ValueError(
-                "storage_options passed with file object or non-fsspec file path"
-            )
-        req = urlopen(filepath_or_buffer)
+        # TODO: fsspec can also handle HTTP via requests, but leaving this
+        # unchanged. Using fsspec appears to break the ability to infer if the
+        # server responded with gzipped data
+        storage_options = storage_options or {}
+
+        # the import is deferred until now to match the lazy-import logic of
+        # the urlopen function defined elsewhere in this module
+        import urllib.request
+
+        # assuming storage_options is to be interpreted as headers
+        req_info = urllib.request.Request(filepath_or_buffer, headers=storage_options)
+        req = urlopen(req_info)
         content_encoding = req.headers.get("Content-Encoding", None)
         if content_encoding == "gzip":
             # Override compression based on Content-Encoding header
diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py
index 8b1184df92eaf..44b58f244a2ad 100644
--- a/pandas/io/parquet.py
+++ b/pandas/io/parquet.py
@@ -14,7 +14,13 @@
 from pandas import DataFrame, MultiIndex, get_option
 from pandas.core import generic
 
-from pandas.io.common import IOHandles, get_handle, is_fsspec_url, stringify_path
+from pandas.io.common import (
+    IOHandles,
+    get_handle,
+    is_fsspec_url,
+    is_url,
+    stringify_path,
+)
 
 
 def get_engine(engine: str) -> "BaseImpl":
@@ -66,8 +72,10 @@ def _get_path_or_handle(
         fs, path_or_handle = fsspec.core.url_to_fs(
             path_or_handle, **(storage_options or {})
         )
-    elif storage_options:
-        raise ValueError("storage_options passed with buffer or non-fsspec filepath")
+    elif storage_options and (not is_url(path_or_handle) or mode != "rb"):
+        # can't write to a remote URL
+        # without making use of fsspec at the moment
+        raise ValueError("storage_options passed with buffer, or non-supported URL")
 
     handles = None
     if (
@@ -79,7 +87,9 @@
         # use get_handle only when we are very certain that it is not a directory
         # fsspec resources can also point to directories
         # this branch is used for example when reading from non-fsspec URLs
-        handles = get_handle(path_or_handle, mode, is_text=False)
+        handles = get_handle(
+            path_or_handle, mode, is_text=False, storage_options=storage_options
+        )
         fs = None
         path_or_handle = handles.handle
     return path_or_handle, handles, fs
@@ -307,7 +317,9 @@ def read(
             # use get_handle only when we are very certain that it is not a directory
             # fsspec resources can also point to directories
             # this branch is used for example when reading from non-fsspec URLs
-            handles = get_handle(path, "rb", is_text=False)
+            handles = get_handle(
+                path, "rb", is_text=False, storage_options=storage_options
+            )
             path = handles.handle
 
         parquet_file = self.api.ParquetFile(path, **parquet_kwargs)
@@ -404,10 +416,12 @@ def to_parquet(
     return None
 
 
+@doc(storage_options=generic._shared_docs["storage_options"])
 def read_parquet(
     path,
     engine: str = "auto",
     columns=None,
+    storage_options: StorageOptions = None,
     use_nullable_dtypes: bool = False,
     **kwargs,
 ):
@@ -432,13 +446,18 @@
     By file-like object, we refer to objects with a ``read()`` method,
     such as a file handle (e.g. via builtin ``open`` function)
     or ``StringIO``.
-    engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
+    engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto'
         Parquet library to use. If 'auto', then the option
         ``io.parquet.engine`` is used. The default ``io.parquet.engine``
         behavior is to try 'pyarrow', falling back to 'fastparquet' if
         'pyarrow' is unavailable.
     columns : list, default=None
         If not None, only these columns will be read from the file.
+
+    {storage_options}
+
+    .. versionadded:: 1.3.0
+
     use_nullable_dtypes : bool, default False
         If True, use dtypes that use ``pd.NA`` as missing value indicator
         for the resulting DataFrame (only applicable for ``engine="pyarrow"``).
@@ -448,6 +467,7 @@
         support dtypes) may change without notice.
 
         .. versionadded:: 1.2.0
+
     **kwargs
         Any additional kwargs are passed to the engine.
 
@@ -456,6 +476,11 @@
     DataFrame
     """
     impl = get_engine(engine)
+
     return impl.read(
-        path, columns=columns, use_nullable_dtypes=use_nullable_dtypes, **kwargs
+        path,
+        columns=columns,
+        storage_options=storage_options,
+        use_nullable_dtypes=use_nullable_dtypes,
+        **kwargs,
     )
diff --git a/pandas/tests/io/test_user_agent.py b/pandas/tests/io/test_user_agent.py
new file mode 100644
index 0000000000000..8894351597903
--- /dev/null
+++ b/pandas/tests/io/test_user_agent.py
@@ -0,0 +1,309 @@
+"""
+Tests for the pandas custom headers in HTTP(S) requests
+"""
+import gzip
+import http.server
+from io import BytesIO
+import threading
+
+import pytest
+
+import pandas as pd
+import pandas._testing as tm
+
+
+class BaseUserAgentResponder(http.server.BaseHTTPRequestHandler):
+    """
+    Base class for setting up a server that can be configured to respond
+    with a particular file format and the accompanying content-type headers.
+    The interfaces of the different io methods are different enough
+    that a shared base class seemed the most practical approach.
+    """
+
+    def start_processing_headers(self):
+        """
+        Shared logic at the start of a GET request.
+        """
+        self.send_response(200)
+        self.requested_from_user_agent = self.headers["User-Agent"]
+        response_df = pd.DataFrame(
+            {
+                "header": [self.requested_from_user_agent],
+            }
+        )
+        return response_df
+
+    def gzip_bytes(self, response_bytes):
+        """
+        Some web servers send back gzipped content to save bandwidth.
+        """
+        bio = BytesIO()
+        zipper = gzip.GzipFile(fileobj=bio, mode="w")
+        zipper.write(response_bytes)
+        zipper.close()
+        response_bytes = bio.getvalue()
+        return response_bytes
+
+    def write_back_bytes(self, response_bytes):
+        """
+        Shared logic at the end of a GET request.
+        """
+        self.wfile.write(response_bytes)
+
+
+class CSVUserAgentResponder(BaseUserAgentResponder):
+    def do_GET(self):
+        response_df = self.start_processing_headers()
+
+        self.send_header("Content-Type", "text/csv")
+        self.end_headers()
+
+        response_bytes = response_df.to_csv(index=False).encode("utf-8")
+        self.write_back_bytes(response_bytes)
+
+
+class GzippedCSVUserAgentResponder(BaseUserAgentResponder):
+    def do_GET(self):
+        response_df = self.start_processing_headers()
+        self.send_header("Content-Type", "text/csv")
+        self.send_header("Content-Encoding", "gzip")
+        self.end_headers()
+
+        response_bytes = response_df.to_csv(index=False).encode("utf-8")
+        response_bytes = self.gzip_bytes(response_bytes)
+
+        self.write_back_bytes(response_bytes)
+
+
+class JSONUserAgentResponder(BaseUserAgentResponder):
+    def do_GET(self):
+        response_df = self.start_processing_headers()
+        self.send_header("Content-Type", "application/json")
+        self.end_headers()
+
+        response_bytes = response_df.to_json().encode("utf-8")
+
+        self.write_back_bytes(response_bytes)
+
+
+class GzippedJSONUserAgentResponder(BaseUserAgentResponder):
+    def do_GET(self):
+        response_df = self.start_processing_headers()
+        self.send_header("Content-Type", "application/json")
+        self.send_header("Content-Encoding", "gzip")
+        self.end_headers()
+
+        response_bytes = response_df.to_json().encode("utf-8")
+        response_bytes = self.gzip_bytes(response_bytes)
+
+        self.write_back_bytes(response_bytes)
+
+
+class ParquetPyArrowUserAgentResponder(BaseUserAgentResponder):
+    def do_GET(self):
+        response_df = self.start_processing_headers()
+        self.send_header("Content-Type", "application/octet-stream")
+        self.end_headers()
+
+        response_bytes = response_df.to_parquet(index=False, engine="pyarrow")
+
+        self.write_back_bytes(response_bytes)
+
+
+class ParquetFastParquetUserAgentResponder(BaseUserAgentResponder):
+    def do_GET(self):
+        response_df = self.start_processing_headers()
+        self.send_header("Content-Type", "application/octet-stream")
+        self.end_headers()
+
+        # the fastparquet engine doesn't like to write to a buffer;
+        # it can do so via the open_with function being set appropriately,
+        # but that path automatically calls close and wipes the buffer,
+        # so write to an fsspec in-memory file and read the bytes back instead
+
+        # protected by an importorskip in the respective test
+        import fsspec
+
+        response_df.to_parquet(
+            "memory://fastparquet_user_agent.parquet",
+            index=False,
+            engine="fastparquet",
+            compression=None,
+        )
+        with fsspec.open("memory://fastparquet_user_agent.parquet", "rb") as f:
+            response_bytes = f.read()
+
+        self.write_back_bytes(response_bytes)
+
+
+class PickleUserAgentResponder(BaseUserAgentResponder):
+    def do_GET(self):
+        response_df = self.start_processing_headers()
+        self.send_header("Content-Type", "application/octet-stream")
+        self.end_headers()
+
+        bio = BytesIO()
+        response_df.to_pickle(bio)
+        response_bytes = bio.getvalue()
+
+        self.write_back_bytes(response_bytes)
+
+
+class StataUserAgentResponder(BaseUserAgentResponder):
+    def do_GET(self):
+        response_df = self.start_processing_headers()
+        self.send_header("Content-Type", "application/octet-stream")
+        self.end_headers()
+
+        bio = BytesIO()
+        response_df.to_stata(bio, write_index=False)
+        response_bytes = bio.getvalue()
+
+        self.write_back_bytes(response_bytes)
+
+
+class AllHeaderCSVResponder(http.server.BaseHTTPRequestHandler):
+    """
+    Send all request headers back for checking the round trip
+    """
+
+    def do_GET(self):
+        response_df = pd.DataFrame(self.headers.items())
+        self.send_response(200)
+        self.send_header("Content-Type", "text/csv")
+        self.end_headers()
+        response_bytes = response_df.to_csv(index=False).encode("utf-8")
+        self.wfile.write(response_bytes)
+
+
+@pytest.mark.parametrize(
+    "responder, read_method, port, parquet_engine",
+    [
+        (CSVUserAgentResponder, pd.read_csv, 34259, None),
+        (JSONUserAgentResponder, pd.read_json, 34260, None),
+        (ParquetPyArrowUserAgentResponder, pd.read_parquet, 34268, "pyarrow"),
+        (ParquetFastParquetUserAgentResponder, pd.read_parquet, 34273, "fastparquet"),
+        (PickleUserAgentResponder, pd.read_pickle, 34271, None),
+        (StataUserAgentResponder, pd.read_stata, 34272, None),
+        (GzippedCSVUserAgentResponder, pd.read_csv, 34261, None),
+        (GzippedJSONUserAgentResponder, pd.read_json, 34262, None),
+    ],
+)
+def test_server_and_default_headers(responder, read_method, port, parquet_engine):
+    if parquet_engine is not None:
+        pytest.importorskip(parquet_engine)
+        if parquet_engine == "fastparquet":
+            pytest.importorskip("fsspec")
+
+    server = http.server.HTTPServer(("localhost", port), responder)
+    server_thread = threading.Thread(target=server.serve_forever)
+    server_thread.start()
+    if parquet_engine is None:
+        df_http = read_method(f"http://localhost:{port}")
+    else:
+        df_http = read_method(f"http://localhost:{port}", engine=parquet_engine)
+    server.shutdown()
+    server.server_close()
+    server_thread.join()
+    assert not df_http.empty
+
+
+@pytest.mark.parametrize(
+    "responder, read_method, port, parquet_engine",
+    [
+        (CSVUserAgentResponder, pd.read_csv, 34263, None),
+        (JSONUserAgentResponder, pd.read_json, 34264, None),
+        (ParquetPyArrowUserAgentResponder, pd.read_parquet, 34270, "pyarrow"),
+        (ParquetFastParquetUserAgentResponder, pd.read_parquet, 34275, "fastparquet"),
+        (PickleUserAgentResponder, pd.read_pickle, 34273, None),
+        (StataUserAgentResponder, pd.read_stata, 34274, None),
+        (GzippedCSVUserAgentResponder, pd.read_csv, 34265, None),
+        (GzippedJSONUserAgentResponder, pd.read_json, 34266, None),
+    ],
+)
+def test_server_and_custom_headers(responder, read_method, port, parquet_engine):
+    if parquet_engine is not None:
+        pytest.importorskip(parquet_engine)
+        if parquet_engine == "fastparquet":
+            pytest.importorskip("fsspec")
+
+    custom_user_agent = "Super Cool One"
+    df_true = pd.DataFrame({"header": [custom_user_agent]})
+    server = http.server.HTTPServer(("localhost", port), responder)
+    server_thread = threading.Thread(target=server.serve_forever)
+    server_thread.start()
+
+    if parquet_engine is None:
+        df_http = read_method(
+            f"http://localhost:{port}",
+            storage_options={"User-Agent": custom_user_agent},
+        )
+    else:
+        df_http = read_method(
+            f"http://localhost:{port}",
+            storage_options={"User-Agent": custom_user_agent},
+            engine=parquet_engine,
+        )
+    server.shutdown()
+
+    server.server_close()
+    server_thread.join()
+
+    tm.assert_frame_equal(df_true, df_http)
+
+
+@pytest.mark.parametrize(
+    "responder, read_method, port",
+    [
+        (AllHeaderCSVResponder, pd.read_csv, 34267),
+    ],
+)
+def test_server_and_all_custom_headers(responder, read_method, port):
+    custom_user_agent = "Super Cool One"
+    custom_auth_token = "Super Secret One"
+    storage_options = {
+        "User-Agent": custom_user_agent,
+        "Auth": custom_auth_token,
+    }
+    server = http.server.HTTPServer(("localhost", port), responder)
+    server_thread = threading.Thread(target=server.serve_forever)
+    server_thread.start()
+
+    df_http = read_method(
+        f"http://localhost:{port}",
+        storage_options=storage_options,
+    )
+    server.shutdown()
+    server.server_close()
+    server_thread.join()
+
+    df_http = df_http[df_http["0"].isin(storage_options.keys())]
+    df_http = df_http.sort_values(["0"]).reset_index()
+    df_http = df_http[["0", "1"]]
+
+    keys = list(storage_options.keys())
+    df_true = pd.DataFrame({"0": keys, "1": [storage_options[k] for k in keys]})
+    df_true = df_true.sort_values(["0"])
+    df_true = df_true.reset_index().drop(["index"], axis=1)
+
+    tm.assert_frame_equal(df_true, df_http)
+
+
+@pytest.mark.parametrize(
+    "engine",
+    [
+        "pyarrow",
+        "fastparquet",
+    ],
+)
+def test_to_parquet_to_disk_with_storage_options(engine):
+    headers = {
+        "User-Agent": "custom",
+        "Auth": "other_custom",
+    }
+
+    pytest.importorskip(engine)
+
+    true_df = pd.DataFrame({"column_name": ["column_value"]})
+    with pytest.raises(ValueError, match="storage_options"):
+        true_df.to_parquet("/tmp/junk.parquet", storage_options=headers, engine=engine)
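
Note (not part of the patch): a minimal end-to-end sketch of the behavior this diff enables, assuming the patch is applied. Unlike the tests above, it binds port 0 so the OS picks a free ephemeral port; the ``EchoUserAgentResponder`` name and the assertion are illustrative only.

.. code-block:: python

   import http.server
   import threading

   import pandas as pd


   class EchoUserAgentResponder(http.server.BaseHTTPRequestHandler):
       """Reply with a one-row CSV that echoes the request's User-Agent."""

       def do_GET(self):
           body = f"header\n{self.headers['User-Agent']}\n".encode("utf-8")
           self.send_response(200)
           self.send_header("Content-Type", "text/csv")
           self.end_headers()
           self.wfile.write(body)


   # bind to port 0 so the OS picks a free ephemeral port
   server = http.server.HTTPServer(("localhost", 0), EchoUserAgentResponder)
   port = server.server_address[1]
   thread = threading.Thread(target=server.serve_forever)
   thread.start()

   # with the patch applied, the storage_options dict is forwarded to
   # urllib.request.Request as HTTP headers for http(s) URLs
   df = pd.read_csv(
       f"http://localhost:{port}",
       storage_options={"User-Agent": "Super Cool One"},
   )

   server.shutdown()
   server.server_close()
   thread.join()

   assert df["header"].iloc[0] == "Super Cool One"

Using an ephemeral port here sidesteps the collisions that hard-coded port numbers (as in the parametrized tests above) can hit when tests run in parallel.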