From 8afaa63fc43164ecc94067be89f8f75fff40dde9 Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Fri, 3 May 2024 11:44:50 -0700 Subject: [PATCH 1/3] refactor(geomap): add type hints, refactor test_archive --- .../data_proc/geomap/geo_data_proc.py | 7 +- _delphi_utils_python/delphi_utils/geomap.py | 102 +++++++++++------- _delphi_utils_python/tests/test_archive.py | 40 +++---- _delphi_utils_python/tests/testing.py | 23 ++++ 4 files changed, 102 insertions(+), 70 deletions(-) create mode 100644 _delphi_utils_python/tests/testing.py diff --git a/_delphi_utils_python/data_proc/geomap/geo_data_proc.py b/_delphi_utils_python/data_proc/geomap/geo_data_proc.py index 287667812..c2a07a78f 100755 --- a/_delphi_utils_python/data_proc/geomap/geo_data_proc.py +++ b/_delphi_utils_python/data_proc/geomap/geo_data_proc.py @@ -12,7 +12,6 @@ from os import remove, listdir from os.path import join, isfile from zipfile import ZipFile -from pandas.core.frame import DataFrame import requests import pandas as pd @@ -70,8 +69,8 @@ def create_fips_zip_crosswalk(): # Find the population fractions (the heaviest computation, takes about a minute) # Note that the denominator in the fractions is the source population pop_df.set_index(["fips", "zip"], inplace=True) - fips_zip: DataFrame = pop_df.groupby("fips", as_index=False).apply(lambda g: g["pop"] / g["pop"].sum()) - zip_fips: DataFrame = pop_df.groupby("zip", as_index=False).apply(lambda g: g["pop"] / g["pop"].sum()) + fips_zip: pd.DataFrame = pop_df.groupby("fips", as_index=False).apply(lambda g: g["pop"] / g["pop"].sum()) + zip_fips: pd.DataFrame = pop_df.groupby("zip", as_index=False).apply(lambda g: g["pop"] / g["pop"].sum()) # Rename and write to file fips_zip = fips_zip.reset_index(level=["fips", "zip"]).rename(columns={"pop": "weight"}).query("weight > 0.0") @@ -228,7 +227,7 @@ def create_state_population_table(): derive_fips_state_crosswalk() census_pop = pd.read_csv(join(OUTPUT_DIR, FIPS_POPULATION_OUT_FILENAME), dtype={"fips": str, "pop": int}) - state: DataFrame = pd.read_csv(join(OUTPUT_DIR, FIPS_STATE_OUT_FILENAME), dtype=str) + state: pd.DataFrame = pd.read_csv(join(OUTPUT_DIR, FIPS_STATE_OUT_FILENAME), dtype=str) state_pop = state.merge(census_pop, on="fips").groupby(["state_code", "state_id", "state_name"], as_index=False).sum() state_pop.sort_values("state_code").to_csv(join(OUTPUT_DIR, STATE_POPULATION_OUT_FILENAME), index=False) diff --git a/_delphi_utils_python/delphi_utils/geomap.py b/_delphi_utils_python/delphi_utils/geomap.py index f43b80504..8fe7f521c 100644 --- a/_delphi_utils_python/delphi_utils/geomap.py +++ b/_delphi_utils_python/delphi_utils/geomap.py @@ -1,15 +1,12 @@ """Contains geographic mapping tools. Authors: Dmitry Shemetov @dshemetov, James Sharpnack @jsharpna, Maria Jahja -Created: 2020-06-01 - -TODO: -- use a caching utility to store the crossfiles - see: https://github.com/cmu-delphi/covidcast-indicators/issues/282 """ + # pylint: disable=too-many-lines from os.path import join from collections import defaultdict +from typing import Iterator, List, Literal, Optional, Set, Union import pandas as pd import pkg_resources @@ -106,7 +103,7 @@ class GeoMapper: # pylint: disable=too-many-public-methods "nation": {"pop": "nation_pop.csv"}, } - def __init__(self, census_year=2020): + def __init__(self, census_year: int = 2020): """Initialize geomapper. Parameters @@ -137,7 +134,9 @@ def __init__(self, census_year=2020): for geo_type in self._geos: self._geo_sets[geo_type] = self._load_geo_values(geo_type) - def _load_crosswalk_from_file(self, from_code, to_code, data_path): + def _load_crosswalk_from_file( + self, from_code: str, to_code: str, data_path: str + ) -> pd.DataFrame: stream = pkg_resources.resource_stream(__name__, data_path) dtype = { from_code: str, @@ -167,7 +166,9 @@ def _load_geo_values(self, geo_type): return set(crosswalk[geo_type]) @staticmethod - def convert_fips_to_mega(data, fips_col="fips", mega_col="megafips"): + def convert_fips_to_mega( + data: pd.DataFrame, fips_col: str = "fips", mega_col: str = "megafips" + ) -> pd.DataFrame: """Convert fips or chng-fips string to a megafips string.""" data = data.copy() data[mega_col] = data[fips_col].astype(str).str.zfill(5) @@ -176,14 +177,14 @@ def convert_fips_to_mega(data, fips_col="fips", mega_col="megafips"): @staticmethod def megacounty_creation( - data, - thr_count, - thr_win_len, - thr_col="visits", - fips_col="fips", - date_col="timestamp", - mega_col="megafips", - ): + data: pd.DataFrame, + thr_count: Union[float, int], + thr_win_len: int, + thr_col: str = "visits", + fips_col: str = "fips", + date_col: str = "timestamp", + mega_col: str = "megafips", + ) -> pd.DataFrame: """Create megacounty column. Parameters @@ -205,7 +206,7 @@ def megacounty_creation( if "_thr_col_roll" in data.columns: raise ValueError("Column name '_thr_col_roll' is reserved.") - def agg_sum_iter(data): + def agg_sum_iter(data: pd.DataFrame) -> Iterator[pd.DataFrame]: data_gby = ( data[[fips_col, date_col, thr_col]] .set_index(date_col) @@ -228,7 +229,13 @@ def agg_sum_iter(data): # Conversion functions def add_geocode( - self, df, from_code, new_code, from_col=None, new_col=None, dropna=True + self, + df: pd.DataFrame, + from_code: str, + new_code: str, + from_col: Optional[str] = None, + new_col: Optional[str] = None, + dropna: bool = True, ): """Add a new geocode column to a dataframe. @@ -316,7 +323,9 @@ def add_geocode( return df - def _add_nation_geocode(self, df, from_code, from_col, new_col): + def _add_nation_geocode( + self, df: pd.DataFrame, from_code: str, from_col: str, new_col: str + ) -> pd.DataFrame: """Add a nation geocode column to a dataframe. See `add_geocode()` documentation for argument description. @@ -334,15 +343,15 @@ def _add_nation_geocode(self, df, from_code, from_col, new_col): def replace_geocode( self, - df, - from_code, - new_code, - from_col=None, - new_col=None, - date_col="timestamp", - data_cols=None, - dropna=True, - ): + df: pd.DataFrame, + from_code: str, + new_code: str, + from_col: Optional[str] = None, + new_col: Optional[str] = None, + date_col: Optional[str] = "timestamp", + data_cols: Optional[List[str]] = None, + dropna: bool = True, + ) -> pd.DataFrame: """Replace a geocode column in a dataframe. Currently supported conversions: @@ -403,7 +412,13 @@ def replace_geocode( df = df.groupby([new_col]).sum(numeric_only=True).reset_index() return df - def add_population_column(self, data, geocode_type, geocode_col=None, dropna=True): + def add_population_column( + self, + data: pd.DataFrame, + geocode_type: Literal["fips", "zip"], + geocode_col: Optional[str] = None, + dropna: bool = True, + ) -> pd.DataFrame: """ Append a population column to a dataframe, based on the FIPS or ZIP code. @@ -451,15 +466,15 @@ def add_population_column(self, data, geocode_type, geocode_col=None, dropna=Tru @staticmethod def fips_to_megacounty( - data, - thr_count, - thr_win_len, - thr_col="visits", - fips_col="fips", - date_col="timestamp", - mega_col="megafips", + data: pd.DataFrame, + thr_count: Union[float, int], + thr_win_len: int, + thr_col: str = "visits", + fips_col: str = "fips", + date_col: str = "timestamp", + mega_col: str = "megafips", count_cols=None, - ): + ) -> pd.DataFrame: """Convert and aggregate from FIPS or chng-fips to megaFIPS. Parameters @@ -501,7 +516,7 @@ def fips_to_megacounty( data = data.reset_index().groupby([date_col, mega_col]).sum(numeric_only=True) return data.reset_index() - def as_mapper_name(self, geo_type, state="state_id"): + def as_mapper_name(self, geo_type: str, state: str = "state_id") -> str: """ Return the mapper equivalent of a region type. @@ -513,7 +528,7 @@ def as_mapper_name(self, geo_type, state="state_id"): return "fips" return geo_type - def get_crosswalk(self, from_code, to_code): + def get_crosswalk(self, from_code: str, to_code: str) -> pd.DataFrame: """Return a dataframe mapping the given geocodes. Parameters @@ -532,7 +547,7 @@ def get_crosswalk(self, from_code, to_code): except KeyError as e: raise ValueError(f'Mapping from "{from_code}" to "{to_code}" not found.') from e - def get_geo_values(self, geo_type): + def get_geo_values(self, geo_type: str) -> Set[str]: """ Return a set of all values for a given geography type. @@ -551,7 +566,12 @@ def get_geo_values(self, geo_type): except KeyError as e: raise ValueError(f'Given geo type "{geo_type}" not found') from e - def get_geos_within(self, container_geocode, contained_geocode_type, container_geocode_type): + def get_geos_within( + self, + container_geocode: str, + contained_geocode_type: str, + container_geocode_type: str, + ) -> Set[str]: """ Return all contained regions of the given type within the given container geocode. diff --git a/_delphi_utils_python/tests/test_archive.py b/_delphi_utils_python/tests/test_archive.py index e821e011b..589b55513 100644 --- a/_delphi_utils_python/tests/test_archive.py +++ b/_delphi_utils_python/tests/test_archive.py @@ -2,10 +2,11 @@ from io import StringIO, BytesIO from os import listdir, mkdir from os.path import join -from typing import Any, Dict, List +from typing import Dict, List from boto3 import Session -from git import Repo, exc +from git import Repo +from git.exc import InvalidGitRepositoryError import mock from moto import mock_s3 import numpy as np @@ -16,6 +17,7 @@ from delphi_utils.archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer,\ archiver_from_params from delphi_utils.nancodes import Nans +from testing import set_df_dtypes CSV_DTYPES = { "geo_id": str, "val": float, "se": float, "sample_size": float, @@ -26,20 +28,12 @@ class Example: def __init__(self, before, after, diff): def fix_df(df): if isinstance(df, pd.DataFrame): - return Example._set_df_datatypes(df, CSV_DTYPES) + return set_df_dtypes(df, CSV_DTYPES) return df self.before = fix_df(before) self.after = fix_df(after) self.diff = fix_df(diff) - @staticmethod - def _set_df_datatypes(df: pd.DataFrame, dtypes: Dict[str, Any]) -> pd.DataFrame: - df = df.copy() - for k, v in dtypes.items(): - if k in df.columns: - df[k] = df[k].astype(v) - return df - @dataclass class Expecteds: deleted: List[str] @@ -194,9 +188,6 @@ def __post_init__(self): assert set(EXPECTEDS.new) == set(f"{csv_name}.csv" for csv_name, dfs in CSVS.items() if dfs.before is None), \ "Bad programmer: added more new files to CSVS.after without updating EXPECTEDS.new" -def _assert_frames_equal_ignore_row_order(df1, df2, index_cols: List[str] = None): - return assert_frame_equal(df1.set_index(index_cols).sort_index(), df2.set_index(index_cols).sort_index()) - class ArchiveDifferTestlike: def set_up(self, tmp_path): cache_dir = join(str(tmp_path), "cache") @@ -209,10 +200,10 @@ def check_filtered_exports(self, export_dir): assert set(listdir(export_dir)) == set(EXPECTEDS.filtered_exports) for f in EXPECTEDS.filtered_exports: example = CSVS[f.replace(".csv", "")] - _assert_frames_equal_ignore_row_order( - pd.read_csv(join(export_dir, f), dtype=CSV_DTYPES), - example.after if example.diff is None else example.diff, - index_cols=["geo_id"] + example = example.after if example.diff is None else example.diff + assert_frame_equal( + pd.read_csv(join(export_dir, f), dtype=CSV_DTYPES).sort_values("geo_id", ignore_index=True), + example.sort_values("geo_id", ignore_index=True) ) class TestArchiveDiffer(ArchiveDifferTestlike): @@ -264,14 +255,13 @@ def test_diff_and_filter_exports(self, tmp_path): # Check that the diff files look as expected for key, diff_name in EXPECTEDS.common_diffs.items(): - if diff_name is None: continue - _assert_frames_equal_ignore_row_order( - pd.read_csv(join(export_dir, diff_name), dtype=CSV_DTYPES), - CSVS[key.replace(".csv", "")].diff, - index_cols=["geo_id"] + if diff_name is None: + continue + assert_frame_equal( + pd.read_csv(join(export_dir, diff_name), dtype=CSV_DTYPES).sort_values("geo_id", ignore_index=True), + CSVS[key.replace(".csv", "")].diff.sort_values("geo_id", ignore_index=True) ) - # Test filter_exports # =================== @@ -406,7 +396,7 @@ def test_init_args(self, tmp_path): GitArchiveDiffer(cache_dir, export_dir, override_dirty=False, commit_partial_success=True) - with pytest.raises(exc.InvalidGitRepositoryError): + with pytest.raises(InvalidGitRepositoryError): GitArchiveDiffer(cache_dir, export_dir) repo = Repo.init(cache_dir) diff --git a/_delphi_utils_python/tests/testing.py b/_delphi_utils_python/tests/testing.py new file mode 100644 index 000000000..7e8f55e90 --- /dev/null +++ b/_delphi_utils_python/tests/testing.py @@ -0,0 +1,23 @@ +"""Common utilities for testing functions.""" +from typing import Any, Dict +import pandas as pd + + +def check_valid_dtype(dtype): + """Check if a dtype is a valid Pandas type.""" + try: + pd.api.types.pandas_dtype(dtype) + except TypeError as e: + raise ValueError(f"Invalid dtype {dtype}") from e + + +def set_df_dtypes(df: pd.DataFrame, dtypes: Dict[str, Any]) -> pd.DataFrame: + """Set the dataframe column datatypes.""" + for d in dtypes.values(): + check_valid_dtype(d) + + df = df.copy() + for k, v in dtypes.items(): + if k in df.columns: + df[k] = df[k].astype(v) + return df From d4b056e7a4c11982324e9224c9f9f6fd5d5ec65c Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Fri, 3 May 2024 11:45:07 -0700 Subject: [PATCH 2/3] lint: format geomap.py with black --- _delphi_utils_python/delphi_utils/geomap.py | 97 +++++++++++---------- pyproject.toml | 4 + 2 files changed, 57 insertions(+), 44 deletions(-) create mode 100644 pyproject.toml diff --git a/_delphi_utils_python/delphi_utils/geomap.py b/_delphi_utils_python/delphi_utils/geomap.py index 8fe7f521c..29ae3667e 100644 --- a/_delphi_utils_python/delphi_utils/geomap.py +++ b/_delphi_utils_python/delphi_utils/geomap.py @@ -76,7 +76,7 @@ class GeoMapper: # pylint: disable=too-many-public-methods "msa": "zip_msa_table.csv", "pop": "zip_pop.csv", "state": "zip_state_code_table.csv", - "hhs": "zip_hhs_table.csv" + "hhs": "zip_hhs_table.csv", }, "fips": { "chng-fips": "fips_chng-fips_table.csv", @@ -87,19 +87,12 @@ class GeoMapper: # pylint: disable=too-many-public-methods "state": "fips_state_table.csv", "hhs": "fips_hhs_table.csv", }, + "hhs": {"pop": "hhs_pop.csv"}, "chng-fips": {"state": "chng-fips_state_table.csv"}, "state": {"state": "state_codes_table.csv"}, - "state_code": { - "hhs": "state_code_hhs_table.csv", - "pop": "state_pop.csv" - }, - "state_id": { - "pop": "state_pop.csv" - }, - "state_name": { - "pop": "state_pop.csv" - }, - "hhs": {"pop": "hhs_pop.csv"}, + "state_code": {"hhs": "state_code_hhs_table.csv", "pop": "state_pop.csv"}, + "state_id": {"pop": "state_pop.csv"}, + "state_name": {"pop": "state_pop.csv"}, "nation": {"pop": "nation_pop.csv"}, } @@ -117,19 +110,16 @@ def __init__(self, census_year: int = 2020): # Include all unique geos from first-level and second-level keys in # CROSSWALK_FILENAMES, with a few exceptions self._geos = { - subkey for mainkey in self.CROSSWALK_FILENAMES - for subkey in self.CROSSWALK_FILENAMES[mainkey] - }.union( - set(self.CROSSWALK_FILENAMES.keys()) - ) - set(["state", "pop"]) + subkey + for mainkey in self.CROSSWALK_FILENAMES + for subkey in self.CROSSWALK_FILENAMES[mainkey] + }.union(set(self.CROSSWALK_FILENAMES.keys())) - set(["state", "pop"]) for from_code, to_codes in self.CROSSWALK_FILENAMES.items(): for to_code, file_path in to_codes.items(): - self._crosswalks[from_code][to_code] = \ - self._load_crosswalk_from_file(from_code, - to_code, - join(f"data/{census_year}", file_path) - ) + self._crosswalks[from_code][to_code] = self._load_crosswalk_from_file( + from_code, to_code, join(f"data/{census_year}", file_path) + ) for geo_type in self._geos: self._geo_sets[geo_type] = self._load_geo_values(geo_type) @@ -143,13 +133,13 @@ def _load_crosswalk_from_file( to_code: str, "pop": int, "weight": float, - **{geo: str for geo in self._geos - set("nation")} + **{geo: str for geo in self._geos - set("nation")}, } usecols = [from_code, "pop"] if to_code == "pop" else None return pd.read_csv(stream, dtype=dtype, usecols=usecols) - def _load_geo_values(self, geo_type): + def _load_geo_values(self, geo_type: str) -> Set[str]: if geo_type == "nation": return {"us"} @@ -276,8 +266,9 @@ def add_geocode( df = df.copy() from_col = from_code if from_col is None else from_col new_col = new_code if new_col is None else new_col - assert from_col != new_col, \ - f"Can't use the same column '{from_col}' for both from_col and to_col" + assert ( + from_col != new_col + ), f"Can't use the same column '{from_col}' for both from_col and to_col" state_codes = ["state_code", "state_id", "state_name"] if not is_string_dtype(df[from_col]): @@ -337,7 +328,7 @@ def _add_nation_geocode( return df raise ValueError( - f"Conversion to the nation level is not supported " + "Conversion to the nation level is not supported " f"from {from_code}; try {valid_from_codes}" ) @@ -443,7 +434,15 @@ def add_population_column( """ geocode_col = geocode_type if geocode_col is None else geocode_col data = data.copy() - supported_geos = ["fips", "zip", "state_id", "state_name", "state_code", "hhs", "nation"] + supported_geos = [ + "fips", + "zip", + "state_id", + "state_name", + "state_code", + "hhs", + "nation", + ] if geocode_type not in supported_geos: raise ValueError( f"Only {supported_geos} geocodes supported. For other codes, aggregate those." @@ -457,11 +456,9 @@ def add_population_column( else: data[geocode_col] = data[geocode_col].astype(str) merge_type = "inner" if dropna else "left" - data_with_pop = ( - data - .merge(pop_df, left_on=geocode_col, right_on=geocode_type, how=merge_type) - .rename(columns={"pop": "population"}) - ) + data_with_pop = data.merge( + pop_df, left_on=geocode_col, right_on=geocode_type, how=merge_type + ).rename(columns={"pop": "population"}) return data_with_pop @staticmethod @@ -545,7 +542,9 @@ def get_crosswalk(self, from_code: str, to_code: str) -> pd.DataFrame: try: return self._crosswalks[from_code][to_code] except KeyError as e: - raise ValueError(f'Mapping from "{from_code}" to "{to_code}" not found.') from e + raise ValueError( + f'Mapping from "{from_code}" to "{to_code}" not found.' + ) from e def get_geo_values(self, geo_type: str) -> Set[str]: """ @@ -601,20 +600,30 @@ def get_geos_within( if contained_geocode_type == "state": if container_geocode_type == "nation" and container_geocode == "us": crosswalk = self._crosswalks["state"]["state"] - return set(crosswalk["state_id"]) # pylint: disable=unsubscriptable-object + return set(crosswalk["state_id"]) # pylint: disable=unsubscriptable-object if container_geocode_type == "hhs": crosswalk_hhs = self._crosswalks["fips"]["hhs"] crosswalk_state = self._crosswalks["fips"]["state"] - fips_hhs = crosswalk_hhs[crosswalk_hhs["hhs"] == container_geocode]["fips"] - return set(crosswalk_state[crosswalk_state["fips"].isin(fips_hhs)]["state_id"]) - elif (contained_geocode_type in ("county", "fips", "chng-fips") and - container_geocode_type == "state"): + fips_hhs = crosswalk_hhs[crosswalk_hhs["hhs"] == container_geocode][ + "fips" + ] + return set( + crosswalk_state[crosswalk_state["fips"].isin(fips_hhs)]["state_id"] + ) + elif ( + contained_geocode_type in ("county", "fips", "chng-fips") + and container_geocode_type == "state" + ): contained_geocode_type = self.as_mapper_name(contained_geocode_type) crosswalk = self._crosswalks[contained_geocode_type]["state"] return set( - crosswalk[crosswalk["state_id"] == container_geocode][contained_geocode_type] + crosswalk[crosswalk["state_id"] == container_geocode][ + contained_geocode_type + ] ) - raise ValueError("(contained_geocode_type, container_geocode_type) was " - f"({contained_geocode_type}, {container_geocode_type}), but " - "must be one of (state, nation), (state, hhs), (county, state)" - ", (fips, state), (chng-fips, state)") + raise ValueError( + "(contained_geocode_type, container_geocode_type) was " + f"({contained_geocode_type}, {container_geocode_type}), but " + "must be one of (state, nation), (state, hhs), (county, state)" + ", (fips, state), (chng-fips, state)" + ) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 000000000..9a31b63a0 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,4 @@ +[tool.black] +line-length = 120 +target-version = ['py38'] +include = '_delphi_utils_python' From 677131e5d079c8fa8ed77e855c81265d23671e11 Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Fri, 3 May 2024 11:46:29 -0700 Subject: [PATCH 3/3] repo: ignore format commit blame --- .git-blame-ignore-revs | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .git-blame-ignore-revs diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs new file mode 100644 index 000000000..f91c04645 --- /dev/null +++ b/.git-blame-ignore-revs @@ -0,0 +1,2 @@ +# Format geomap.py with black +d4b056e7a4c11982324e9224c9f9f6fd5d5ec65c