From 2919a58d4dd9d35619fe15aa25452be7bacfc9b5 Mon Sep 17 00:00:00 2001 From: Kathryn M Mazaitis Date: Mon, 13 Feb 2023 10:49:04 -0500 Subject: [PATCH 1/5] Move logger module to acquisition.common * includes deployment --- deploy.json | 9 +++++++++ src/acquisition/{covidcast => common}/logger.py | 0 .../covidcast/covidcast_meta_cache_updater.py | 2 +- src/acquisition/covidcast/csv_importer.py | 2 +- src/acquisition/covidcast/csv_to_database.py | 2 +- src/acquisition/covidcast/database.py | 2 +- src/acquisition/covidcast/delete_batch.py | 2 +- src/acquisition/covidcast/file_archiver.py | 2 +- src/acquisition/covidcast/signal_dash_data_generator.py | 2 +- 9 files changed, 16 insertions(+), 7 deletions(-) rename src/acquisition/{covidcast => common}/logger.py (100%) diff --git a/deploy.json b/deploy.json index 45b45883e..9956268b1 100644 --- a/deploy.json +++ b/deploy.json @@ -40,6 +40,15 @@ "add-header-comment": true }, + "// acquisition - common", + { + "type": "move", + "src": "src/acquisition/common/", + "dst": "[[package]]/acquisition/common/", + "match": "^.*\\.(py)$", + "add-header-comment": true + }, + "// acquisition - fluview", { "type": "move", diff --git a/src/acquisition/covidcast/logger.py b/src/acquisition/common/logger.py similarity index 100% rename from src/acquisition/covidcast/logger.py rename to src/acquisition/common/logger.py diff --git a/src/acquisition/covidcast/covidcast_meta_cache_updater.py b/src/acquisition/covidcast/covidcast_meta_cache_updater.py index a46345b62..b4eff0d08 100644 --- a/src/acquisition/covidcast/covidcast_meta_cache_updater.py +++ b/src/acquisition/covidcast/covidcast_meta_cache_updater.py @@ -7,7 +7,7 @@ # first party from delphi.epidata.acquisition.covidcast.database import Database -from delphi.epidata.acquisition.covidcast.logger import get_structured_logger +from delphi.epidata.acquisition.common.logger import get_structured_logger from delphi.epidata.client.delphi_epidata import Epidata def get_argument_parser(): diff --git a/src/acquisition/covidcast/csv_importer.py b/src/acquisition/covidcast/csv_importer.py index 0fa936802..3eaec7d2a 100644 --- a/src/acquisition/covidcast/csv_importer.py +++ b/src/acquisition/covidcast/csv_importer.py @@ -16,7 +16,7 @@ from delphi_utils import Nans from delphi.utils.epiweek import delta_epiweeks from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow -from delphi.epidata.acquisition.covidcast.logger import get_structured_logger +from delphi.epidata.acquisition.common.logger import get_structured_logger DataFrameRow = NamedTuple('DFRow', [ ('geo_id', str), diff --git a/src/acquisition/covidcast/csv_to_database.py b/src/acquisition/covidcast/csv_to_database.py index 842e820c9..90270cb27 100644 --- a/src/acquisition/covidcast/csv_to_database.py +++ b/src/acquisition/covidcast/csv_to_database.py @@ -11,7 +11,7 @@ from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter, PathDetails from delphi.epidata.acquisition.covidcast.database import Database, DBLoadStateException from delphi.epidata.acquisition.covidcast.file_archiver import FileArchiver -from delphi.epidata.acquisition.covidcast.logger import get_structured_logger +from delphi.epidata.acquisition.common.logger import get_structured_logger def get_argument_parser(): diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 3beedac82..347c85841 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -14,7 +14,7 @@ # first party import delphi.operations.secrets as secrets -from delphi.epidata.acquisition.covidcast.logger import get_structured_logger +from delphi.epidata.acquisition.common.logger import get_structured_logger from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow diff --git a/src/acquisition/covidcast/delete_batch.py b/src/acquisition/covidcast/delete_batch.py index fe40897fd..ae6ddc487 100644 --- a/src/acquisition/covidcast/delete_batch.py +++ b/src/acquisition/covidcast/delete_batch.py @@ -8,7 +8,7 @@ # first party from delphi.epidata.acquisition.covidcast.database import Database -from delphi.epidata.acquisition.covidcast.logger import get_structured_logger +from delphi.epidata.acquisition.common.logger import get_structured_logger def get_argument_parser(): diff --git a/src/acquisition/covidcast/file_archiver.py b/src/acquisition/covidcast/file_archiver.py index 92686f3cf..368677133 100644 --- a/src/acquisition/covidcast/file_archiver.py +++ b/src/acquisition/covidcast/file_archiver.py @@ -6,7 +6,7 @@ import shutil # first party -from delphi.epidata.acquisition.covidcast.logger import get_structured_logger +from delphi.epidata.acquisition.common.logger import get_structured_logger class FileArchiver: """Archives files by moving and compressing.""" diff --git a/src/acquisition/covidcast/signal_dash_data_generator.py b/src/acquisition/covidcast/signal_dash_data_generator.py index 2e7467487..431dae9fd 100644 --- a/src/acquisition/covidcast/signal_dash_data_generator.py +++ b/src/acquisition/covidcast/signal_dash_data_generator.py @@ -15,7 +15,7 @@ # first party import covidcast import delphi.operations.secrets as secrets -from delphi.epidata.acquisition.covidcast.logger import get_structured_logger +from delphi.epidata.acquisition.common.logger import get_structured_logger LOOKBACK_DAYS_FOR_COVERAGE = 56 From a55cc108fd1f4ac77664f90a87069f418e47037d Mon Sep 17 00:00:00 2001 From: Kathryn M Mazaitis Date: Mon, 13 Feb 2023 10:49:59 -0500 Subject: [PATCH 2/5] Convert covid_hosp to use structured logger * also adds some previously-absent logging --- src/acquisition/covid_hosp/common/database.py | 16 +++++-- src/acquisition/covid_hosp/common/network.py | 11 ++--- src/acquisition/covid_hosp/common/utils.py | 45 ++++++++++++++----- .../covid_hosp/facility/database.py | 2 +- 4 files changed, 54 insertions(+), 20 deletions(-) diff --git a/src/acquisition/covid_hosp/common/database.py b/src/acquisition/covid_hosp/common/database.py index 8875828fa..c2b9530b0 100644 --- a/src/acquisition/covid_hosp/common/database.py +++ b/src/acquisition/covid_hosp/common/database.py @@ -124,7 +124,7 @@ def contains_revision(self, revision): for (result,) in cursor: return bool(result) - def insert_metadata(self, publication_date, revision, meta_json): + def insert_metadata(self, publication_date, revision, meta_json, logger=False): """Add revision metadata to the database. Parameters @@ -135,6 +135,8 @@ def insert_metadata(self, publication_date, revision, meta_json): Unique revision string. meta_json : str Metadata serialized as a JSON string. + logger structlog.Logger [optional; default False] + Logger to receive messages """ with self.new_cursor() as cursor: @@ -152,7 +154,7 @@ def insert_metadata(self, publication_date, revision, meta_json): (%s, %s, %s, %s, %s, NOW()) ''', (self.table_name, self.hhs_dataset_id, publication_date, revision, meta_json)) - def insert_dataset(self, publication_date, dataframe): + def insert_dataset(self, publication_date, dataframe, logger=False): """Add a dataset to the database. Parameters @@ -161,6 +163,8 @@ def insert_dataset(self, publication_date, dataframe): Date when the dataset was published in YYYYMMDD format. dataframe : pandas.DataFrame The dataset. + logger structlog.Logger [optional; default False] + Logger to receive messages. """ dataframe_columns_and_types = [ x for x in self.columns_and_types.values() if x.csv_name in dataframe.columns @@ -181,6 +185,8 @@ def nan_safe_dtype(dtype, value): sql = f'INSERT INTO `{self.table_name}` (`id`, `{self.publication_col_name}`, {columns}) ' \ f'VALUES ({value_placeholders})' id_and_publication_date = (0, publication_date) + if logger: + logger.info("updating values") with self.new_cursor() as cursor: for _, row in dataframe.iterrows(): values = [] @@ -193,6 +199,8 @@ def nan_safe_dtype(dtype, value): # deal with non/seldomly updated columns used like a fk table (if this database needs it) if hasattr(self, 'AGGREGATE_KEY_COLS'): + if logger: + logger.info("updating keys") ak_cols = self.AGGREGATE_KEY_COLS # restrict data to just the key columns and remove duplicate rows @@ -225,7 +233,7 @@ def nan_safe_dtype(dtype, value): cur.executemany(ak_insert_sql, ak_data) - def get_max_issue(self): + def get_max_issue(self, logger=False): """Fetch the most recent issue. This is used to bookend what updates we pull in from the HHS metadata. @@ -242,4 +250,6 @@ def get_max_issue(self): for (result,) in cursor: if result is not None: return pd.Timestamp(str(result)) + if logger: + logger.info("get_max_issue", msg="no matching results in meta table; returning 1900/1/1 epoch") return pd.Timestamp("1900/1/1") diff --git a/src/acquisition/covid_hosp/common/network.py b/src/acquisition/covid_hosp/common/network.py index ba0cca281..0482f4129 100644 --- a/src/acquisition/covid_hosp/common/network.py +++ b/src/acquisition/covid_hosp/common/network.py @@ -6,7 +6,7 @@ class Network: METADATA_URL_TEMPLATE = \ 'https://healthdata.gov/api/views/%s/rows.csv' - def fetch_metadata_for_dataset(dataset_id): + def fetch_metadata_for_dataset(dataset_id, logger=False): """Download and return metadata. Parameters @@ -20,14 +20,15 @@ def fetch_metadata_for_dataset(dataset_id): The metadata object. """ url = Network.METADATA_URL_TEMPLATE % dataset_id - print(f'fetching metadata at {url}') + if logger: + logger.info('fetching metadata', url=url) df = Network.fetch_dataset(url) df["Update Date"] = pandas.to_datetime(df["Update Date"]) df.sort_values("Update Date", inplace=True) df.set_index("Update Date", inplace=True) return df - def fetch_dataset(url, pandas_impl=pandas): + def fetch_dataset(url, pandas_impl=pandas, logger=False): """Download and return a dataset. Type inference is disabled in favor of explicit type casting at the @@ -44,6 +45,6 @@ def fetch_dataset(url, pandas_impl=pandas): pandas.DataFrame The dataset. """ - - print(f'fetching dataset at {url}') + if logger: + logger.info('fetching dataset', url=url) return pandas_impl.read_csv(url, dtype=str) diff --git a/src/acquisition/covid_hosp/common/utils.py b/src/acquisition/covid_hosp/common/utils.py index 99a6b4f33..db8918cb8 100644 --- a/src/acquisition/covid_hosp/common/utils.py +++ b/src/acquisition/covid_hosp/common/utils.py @@ -6,6 +6,8 @@ import pandas as pd +from delphi.epidata.acquisition.common.logger import get_structured_logger + class CovidHospException(Exception): """Exception raised exclusively by `covid_hosp` utilities.""" @@ -69,7 +71,15 @@ def parse_bool(value): return False raise CovidHospException(f'cannot convert "{value}" to bool') - def issues_to_fetch(metadata, newer_than, older_than): + def limited_string_fn(length): + def limited_string(value): + value = str(value) + if len(value) > length: + raise CovidHospException(f"Value '{value}':{len(value)} longer than max {length}") + return value + return limited_string + + def issues_to_fetch(metadata, newer_than, older_than, logger=False): """ Construct all issue dates and URLs to be ingested based on metadata. @@ -81,6 +91,8 @@ def issues_to_fetch(metadata, newer_than, older_than): Lower bound (exclusive) of days to get issues for. older_than Date Upper bound (exclusive) of days to get issues for + logger structlog.Logger [optional; default False] + Logger to receive messages Returns ------- Dictionary of {issue day: list of (download urls, index)} @@ -100,11 +112,12 @@ def issues_to_fetch(metadata, newer_than, older_than): elif day >= older_than: n_beyond += 1 if n_beyond > 0: - print(f"{n_beyond} issues available on {older_than} or newer") + if logger: + logger.info("issues available", on_or_newer=older_than, count=n_beyond) return daily_issues @staticmethod - def merge_by_key_cols(dfs, key_cols): + def merge_by_key_cols(dfs, key_cols, logger=False): """Merge a list of data frames as a series of updates. Parameters: @@ -113,6 +126,8 @@ def merge_by_key_cols(dfs, key_cols): Data frames to merge, ordered from earliest to latest. key_cols: list(str) Columns to use as the index. + logger structlog.Logger [optional; default False] + Logger to receive messages Returns a single data frame containing the most recent data for each state+date. """ @@ -120,6 +135,11 @@ def merge_by_key_cols(dfs, key_cols): dfs = [df.set_index(key_cols) for df in dfs if not all(k in df.index.names for k in key_cols)] result = dfs[0] + if logger and len(dfs) > 7: + logger.warning( + "expensive operation", + msg="concatenating more than 7 files may result in long running times", + count=len(dfs)) for df in dfs[1:]: # update values for existing keys result.update(df) @@ -153,22 +173,25 @@ def update_dataset(database, network, newer_than=None, older_than=None): bool Whether a new dataset was acquired. """ - metadata = network.fetch_metadata() + logger = get_structured_logger(f"{database.__class__.__module__}.{database.__class__.__name__}.update_dataset") + + metadata = network.fetch_metadata(logger=logger) datasets = [] with database.connect() as db: - max_issue = db.get_max_issue() + max_issue = db.get_max_issue(logger=logger) older_than = datetime.datetime.today().date() if newer_than is None else older_than newer_than = max_issue if newer_than is None else newer_than - daily_issues = Utils.issues_to_fetch(metadata, newer_than, older_than) + daily_issues = Utils.issues_to_fetch(metadata, newer_than, older_than, logger=logger) if not daily_issues: - print("no new issues, nothing to do") + logger.info("no new issues; nothing to do") return False for issue, revisions in daily_issues.items(): issue_int = int(issue.strftime("%Y%m%d")) # download the dataset and add it to the database dataset = Utils.merge_by_key_cols([network.fetch_dataset(url) for url, _ in revisions], - db.KEY_COLS) + db.KEY_COLS, + logger=logger) # add metadata to the database all_metadata = [] for url, index in revisions: @@ -180,10 +203,10 @@ def update_dataset(database, network, newer_than=None, older_than=None): )) with database.connect() as db: for issue_int, dataset, all_metadata in datasets: - db.insert_dataset(issue_int, dataset) + db.insert_dataset(issue_int, dataset, logger=logger) for url, metadata_json in all_metadata: - db.insert_metadata(issue_int, url, metadata_json) - print(f'successfully acquired {len(dataset)} rows') + db.insert_metadata(issue_int, url, metadata_json, logger=logger) + logger.info("acquired rows", count=len(dataset)) # note that the transaction is committed by exiting the `with` block return True diff --git a/src/acquisition/covid_hosp/facility/database.py b/src/acquisition/covid_hosp/facility/database.py index 665256a4f..d84e53322 100644 --- a/src/acquisition/covid_hosp/facility/database.py +++ b/src/acquisition/covid_hosp/facility/database.py @@ -40,7 +40,7 @@ class Database(BaseDatabase): Columndef('ccn', 'ccn', str), Columndef('city', 'city', str), Columndef('fips_code', 'fips_code', str), - Columndef('geocoded_hospital_address', 'geocoded_hospital_address', str), + Columndef('geocoded_hospital_address', 'geocoded_hospital_address', Utils.limited_string_fn(32)), Columndef('hhs_ids', 'hhs_ids', str), Columndef('hospital_name', 'hospital_name', str), Columndef('hospital_subtype', 'hospital_subtype', str), From 158d8aa35ca7924a2ff2fe01a652839c35f7010d Mon Sep 17 00:00:00 2001 From: Kathryn M Mazaitis Date: Mon, 27 Feb 2023 11:43:59 -0500 Subject: [PATCH 3/5] [covid_hosp:f] Fix geocode acquisition and running time. * Switch to executemany * Add new limited_geocode datatype * Fix test prep missing from #1030 --- .../covid_hosp/facility/test_scenarios.py | 1 + src/acquisition/covid_hosp/common/database.py | 32 +++++++++++++++---- src/acquisition/covid_hosp/common/network.py | 4 +++ src/acquisition/covid_hosp/common/utils.py | 24 +++++++++++--- .../covid_hosp/facility/database.py | 2 +- .../covid_hosp/common/test_database.py | 10 ++++-- .../covid_hosp/common/test_utils.py | 4 +-- .../covid_hosp/facility/test_database.py | 11 +++++-- .../covid_hosp/state_daily/test_database.py | 4 +-- .../state_timeseries/test_database.py | 4 +-- 10 files changed, 71 insertions(+), 25 deletions(-) diff --git a/integrations/acquisition/covid_hosp/facility/test_scenarios.py b/integrations/acquisition/covid_hosp/facility/test_scenarios.py index 4c47d689e..aaa3c5e3b 100644 --- a/integrations/acquisition/covid_hosp/facility/test_scenarios.py +++ b/integrations/acquisition/covid_hosp/facility/test_scenarios.py @@ -38,6 +38,7 @@ def setUp(self): with Database.connect() as db: with db.new_cursor() as cur: cur.execute('truncate table covid_hosp_facility') + cur.execute('truncate table covid_hosp_facility_key') cur.execute('truncate table covid_hosp_meta') @freeze_time("2021-03-16") diff --git a/src/acquisition/covid_hosp/common/database.py b/src/acquisition/covid_hosp/common/database.py index c2b9530b0..1d98d22fa 100644 --- a/src/acquisition/covid_hosp/common/database.py +++ b/src/acquisition/covid_hosp/common/database.py @@ -186,21 +186,37 @@ def nan_safe_dtype(dtype, value): f'VALUES ({value_placeholders})' id_and_publication_date = (0, publication_date) if logger: - logger.info("updating values") + logger.info('updating values', count=len(dataframe.index)) + n = 0 + many_values = [] with self.new_cursor() as cursor: - for _, row in dataframe.iterrows(): + for index, row in dataframe.iterrows(): values = [] for c in dataframe_columns_and_types: values.append(nan_safe_dtype(c.dtype, row[c.csv_name])) - cursor.execute(sql, - id_and_publication_date + - tuple(values) + - tuple(i.csv_name for i in self.additional_fields)) + many_values.append(id_and_publication_date + + tuple(values) + + tuple(i.csv_name for i in self.additional_fields)) + n += 1 + # insert in batches because one at a time is slow and all at once makes + # the connection drop :( + if n % 5_000 == 0: + try: + cursor.executemany(sql, many_values) + many_values = [] + except Exception as e: + if logger: + logger.info('error on insert', index=index, values=values) + logger.error(e) + raise e + # insert final batch + if many_values: + cursor.executemany(sql, many_values) # deal with non/seldomly updated columns used like a fk table (if this database needs it) if hasattr(self, 'AGGREGATE_KEY_COLS'): if logger: - logger.info("updating keys") + logger.info('updating keys') ak_cols = self.AGGREGATE_KEY_COLS # restrict data to just the key columns and remove duplicate rows @@ -227,6 +243,8 @@ def nan_safe_dtype(dtype, value): ak_table = self.table_name + '_key' # assemble full SQL statement ak_insert_sql = f'INSERT INTO `{ak_table}` ({ak_cols_str}) VALUES ({values_str}) AS v ON DUPLICATE KEY UPDATE {ak_updates_str}' + if logger: + logger.info("database query", sql=ak_insert_sql) # commit the data with self.new_cursor() as cur: diff --git a/src/acquisition/covid_hosp/common/network.py b/src/acquisition/covid_hosp/common/network.py index 0482f4129..7b6228f16 100644 --- a/src/acquisition/covid_hosp/common/network.py +++ b/src/acquisition/covid_hosp/common/network.py @@ -13,6 +13,8 @@ def fetch_metadata_for_dataset(dataset_id, logger=False): ---------- dataset_id : str healthdata.gov dataset identifier of the dataset. + logger : structlog.Logger [optional; default False] + Logger to receive messages. Returns ------- @@ -39,6 +41,8 @@ def fetch_dataset(url, pandas_impl=pandas, logger=False): ---------- url : str URL to the dataset in CSV format. + logger : structlog.Logger [optional; default False] + Logger to receive messages. Returns ------- diff --git a/src/acquisition/covid_hosp/common/utils.py b/src/acquisition/covid_hosp/common/utils.py index db8918cb8..b1a9c6520 100644 --- a/src/acquisition/covid_hosp/common/utils.py +++ b/src/acquisition/covid_hosp/common/utils.py @@ -79,6 +79,17 @@ def limited_string(value): return value return limited_string + GEOCODE_LENGTH = 32 + GEOCODE_PATTERN = re.compile(r'POINT \(([0-9.-]*) ([0-9.-]*)\)') + def limited_geocode(value): + if len(value) < Utils.GEOCODE_LENGTH: + return value + # otherwise parse and reduce precision to 5 + m = Utils.GEOCODE_PATTERN.match(value) + if not m: + raise CovidHospException(f"Couldn't parse geocode '{value}'") + return f'POINT ({" ".join(map(lambda x: f"{float(x):.6f}", m.groups()))})' + def issues_to_fetch(metadata, newer_than, older_than, logger=False): """ Construct all issue dates and URLs to be ingested based on metadata. @@ -100,6 +111,7 @@ def issues_to_fetch(metadata, newer_than, older_than, logger=False): """ daily_issues = {} n_beyond = 0 + n_selected = 0 for index in sorted(set(metadata.index)): day = index.date() if day > newer_than and day < older_than: @@ -109,11 +121,13 @@ def issues_to_fetch(metadata, newer_than, older_than, logger=False): daily_issues[day] = urls_list else: daily_issues[day] += urls_list + n_selected += len(urls_list) elif day >= older_than: n_beyond += 1 - if n_beyond > 0: - if logger: - logger.info("issues available", on_or_newer=older_than, count=n_beyond) + if logger: + if n_beyond > 0: + logger.info("issues available beyond selection", on_or_newer=older_than, count=n_beyond) + logger.info("issues selected", newer_than=str(newer_than), older_than=str(older_than), count=n_selected) return daily_issues @staticmethod @@ -173,7 +187,7 @@ def update_dataset(database, network, newer_than=None, older_than=None): bool Whether a new dataset was acquired. """ - logger = get_structured_logger(f"{database.__class__.__module__}.{database.__class__.__name__}.update_dataset") + logger = get_structured_logger(f"{database.__module__}.{database.__name__}.update_dataset") metadata = network.fetch_metadata(logger=logger) datasets = [] @@ -189,7 +203,7 @@ def update_dataset(database, network, newer_than=None, older_than=None): for issue, revisions in daily_issues.items(): issue_int = int(issue.strftime("%Y%m%d")) # download the dataset and add it to the database - dataset = Utils.merge_by_key_cols([network.fetch_dataset(url) for url, _ in revisions], + dataset = Utils.merge_by_key_cols([network.fetch_dataset(url, logger=logger) for url, _ in revisions], db.KEY_COLS, logger=logger) # add metadata to the database diff --git a/src/acquisition/covid_hosp/facility/database.py b/src/acquisition/covid_hosp/facility/database.py index d84e53322..172f32dc4 100644 --- a/src/acquisition/covid_hosp/facility/database.py +++ b/src/acquisition/covid_hosp/facility/database.py @@ -40,7 +40,7 @@ class Database(BaseDatabase): Columndef('ccn', 'ccn', str), Columndef('city', 'city', str), Columndef('fips_code', 'fips_code', str), - Columndef('geocoded_hospital_address', 'geocoded_hospital_address', Utils.limited_string_fn(32)), + Columndef('geocoded_hospital_address', 'geocoded_hospital_address', Utils.limited_geocode), Columndef('hhs_ids', 'hhs_ids', str), Columndef('hospital_name', 'hospital_name', str), Columndef('hospital_subtype', 'hospital_subtype', str), diff --git a/tests/acquisition/covid_hosp/common/test_database.py b/tests/acquisition/covid_hosp/common/test_database.py index 09244dd2f..c070a00ae 100644 --- a/tests/acquisition/covid_hosp/common/test_database.py +++ b/tests/acquisition/covid_hosp/common/test_database.py @@ -144,9 +144,9 @@ def test_insert_dataset(self): result = database.insert_dataset(sentinel.publication_date, dataset) self.assertIsNone(result) - self.assertEqual(mock_cursor.execute.call_count, 6) + self.assertEqual(mock_cursor.executemany.call_count, 1) - actual_sql = mock_cursor.execute.call_args[0][0] + actual_sql = mock_cursor.executemany.call_args[0][0] self.assertIn( 'INSERT INTO `test_table` (`id`, `publication_date`, `sql_str_col`, `sql_int_col`, `sql_float_col`)', actual_sql) @@ -162,5 +162,9 @@ def test_insert_dataset(self): for i, expected in enumerate(expected_values): with self.subTest(name=f'row {i + 1}'): - actual = mock_cursor.execute.call_args_list[i][0][1] + # [0]: the first call() object + # [0]: get positional args out of the call() object + # [-1]: the last arg of the executemany call + # [i]: the ith row inserted in the executemany + actual = mock_cursor.executemany.call_args_list[0][0][-1][i] self.assertEqual(actual, (0, sentinel.publication_date) + expected) diff --git a/tests/acquisition/covid_hosp/common/test_utils.py b/tests/acquisition/covid_hosp/common/test_utils.py index 85dbd110c..fb0ad14fc 100644 --- a/tests/acquisition/covid_hosp/common/test_utils.py +++ b/tests/acquisition/covid_hosp/common/test_utils.py @@ -97,7 +97,7 @@ def test_run_skip_old_dataset(self): mock_network = MagicMock() mock_network.fetch_metadata.return_value = \ self.test_utils.load_sample_metadata() - mock_database = MagicMock() + mock_database = MagicMock(**{"__module__":"test_module", "__name__":"test_name"}) with mock_database.connect() as mock_connection: pass mock_connection.get_max_issue.return_value = pd.Timestamp("2200/1/1") @@ -117,7 +117,7 @@ def test_run_acquire_new_dataset(self): self.test_utils.load_sample_metadata() fake_dataset = pd.DataFrame({"date": [pd.Timestamp("2020/1/1")], "state": ["ca"]}) mock_network.fetch_dataset.return_value = fake_dataset - mock_database = MagicMock() + mock_database = MagicMock(**{"__module__":"test_module", "__name__":"test_name"}) with mock_database.connect() as mock_connection: pass type(mock_connection).KEY_COLS = PropertyMock(return_value=["state", "date"]) diff --git a/tests/acquisition/covid_hosp/facility/test_database.py b/tests/acquisition/covid_hosp/facility/test_database.py index 28872a6ac..2e1ee29fe 100644 --- a/tests/acquisition/covid_hosp/facility/test_database.py +++ b/tests/acquisition/covid_hosp/facility/test_database.py @@ -35,9 +35,14 @@ def test_insert_dataset(self): result = database.insert_dataset(sentinel.publication_date, dataset) self.assertIsNone(result) - self.assertEqual(mock_cursor.execute.call_count, 22) - - last_query_values = mock_cursor.execute.call_args[0][-1] + # once for the values, once for the keys + self.assertEqual(mock_cursor.executemany.call_count, 2) + + # [0]: the first call() object + # [0]: get the positional args out of the call() object + # [-1]: the last arg of the executemany call + # [-1]: the last row inserted in the executemany + last_query_values = mock_cursor.executemany.call_args_list[0][0][-1][-1] expected_query_values = ( 0, sentinel.publication_date, '450822', 20201130, '6800 N MACARTHUR BLVD', 61.1, 7, 428, 60.9, 7, 426, 61.1, 7, 428, diff --git a/tests/acquisition/covid_hosp/state_daily/test_database.py b/tests/acquisition/covid_hosp/state_daily/test_database.py index efa439669..95401d7cc 100644 --- a/tests/acquisition/covid_hosp/state_daily/test_database.py +++ b/tests/acquisition/covid_hosp/state_daily/test_database.py @@ -38,9 +38,9 @@ def test_insert_dataset(self): result = database.insert_dataset(sentinel.issue, dataset) self.assertIsNone(result) - self.assertEqual(mock_cursor.execute.call_count, 53) + self.assertEqual(mock_cursor.executemany.call_count, 1) - last_query_values = mock_cursor.execute.call_args[0][-1] + last_query_values = mock_cursor.executemany.call_args[0][-1][-1] expected_query_values = ( 0, sentinel.issue, 'WY', 20201209, 0.2519685039370078, 29, 127, 32, 0.4233576642335766, 31, 137, 58, 22, 2, diff --git a/tests/acquisition/covid_hosp/state_timeseries/test_database.py b/tests/acquisition/covid_hosp/state_timeseries/test_database.py index 2649f7b5f..24897d42d 100644 --- a/tests/acquisition/covid_hosp/state_timeseries/test_database.py +++ b/tests/acquisition/covid_hosp/state_timeseries/test_database.py @@ -36,9 +36,9 @@ def test_insert_dataset(self): result = database.insert_dataset(sentinel.issue, dataset) self.assertIsNone(result) - self.assertEqual(mock_cursor.execute.call_count, 22) + self.assertEqual(mock_cursor.executemany.call_count, 1) - last_query_values = mock_cursor.execute.call_args[0][-1] + last_query_values = mock_cursor.executemany.call_args[0][-1][-1] expected_query_values = ( 0, sentinel.issue, 'WY', 20200826, 0.0934579439252336, 26, 107, 10, 0.4298245614035088, 28, 114, 49, 19, 7, 2, None, 4, 2, 0, 1, '2', 0, 26, From 221e53dae6bef67c90c7f6d51d8d1ec72bdee6be Mon Sep 17 00:00:00 2001 From: Kathryn M Mazaitis Date: Fri, 3 Mar 2023 11:12:27 -0500 Subject: [PATCH 4/5] Move logger construction into Database base class --- src/acquisition/covid_hosp/common/database.py | 7 ++++++- src/acquisition/covid_hosp/common/utils.py | 3 +-- tests/acquisition/covid_hosp/common/test_utils.py | 4 ++-- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/acquisition/covid_hosp/common/database.py b/src/acquisition/covid_hosp/common/database.py index 1d98d22fa..4f15b975f 100644 --- a/src/acquisition/covid_hosp/common/database.py +++ b/src/acquisition/covid_hosp/common/database.py @@ -11,6 +11,7 @@ # first party import delphi.operations.secrets as secrets +from delphi.epidata.acquisition.common.logger import get_structured_logger Columndef = namedtuple("Columndef", "csv_name sql_name dtype") @@ -53,6 +54,10 @@ def __init__(self, self.key_columns = key_columns if key_columns is not None else [] self.additional_fields = additional_fields if additional_fields is not None else [] + @classmethod + def logger(database_class): + return get_structured_logger(f"{database_class.__module__}") + @classmethod @contextmanager def connect(database_class, mysql_connector_impl=mysql.connector): @@ -269,5 +274,5 @@ def get_max_issue(self, logger=False): if result is not None: return pd.Timestamp(str(result)) if logger: - logger.info("get_max_issue", msg="no matching results in meta table; returning 1900/1/1 epoch") + logger.warn("get_max_issue", msg="no matching results in meta table; returning 1900/1/1 epoch") return pd.Timestamp("1900/1/1") diff --git a/src/acquisition/covid_hosp/common/utils.py b/src/acquisition/covid_hosp/common/utils.py index b1a9c6520..4c9e6172e 100644 --- a/src/acquisition/covid_hosp/common/utils.py +++ b/src/acquisition/covid_hosp/common/utils.py @@ -6,7 +6,6 @@ import pandas as pd -from delphi.epidata.acquisition.common.logger import get_structured_logger class CovidHospException(Exception): """Exception raised exclusively by `covid_hosp` utilities.""" @@ -187,7 +186,7 @@ def update_dataset(database, network, newer_than=None, older_than=None): bool Whether a new dataset was acquired. """ - logger = get_structured_logger(f"{database.__module__}.{database.__name__}.update_dataset") + logger = database.logger() metadata = network.fetch_metadata(logger=logger) datasets = [] diff --git a/tests/acquisition/covid_hosp/common/test_utils.py b/tests/acquisition/covid_hosp/common/test_utils.py index fb0ad14fc..85dbd110c 100644 --- a/tests/acquisition/covid_hosp/common/test_utils.py +++ b/tests/acquisition/covid_hosp/common/test_utils.py @@ -97,7 +97,7 @@ def test_run_skip_old_dataset(self): mock_network = MagicMock() mock_network.fetch_metadata.return_value = \ self.test_utils.load_sample_metadata() - mock_database = MagicMock(**{"__module__":"test_module", "__name__":"test_name"}) + mock_database = MagicMock() with mock_database.connect() as mock_connection: pass mock_connection.get_max_issue.return_value = pd.Timestamp("2200/1/1") @@ -117,7 +117,7 @@ def test_run_acquire_new_dataset(self): self.test_utils.load_sample_metadata() fake_dataset = pd.DataFrame({"date": [pd.Timestamp("2020/1/1")], "state": ["ca"]}) mock_network.fetch_dataset.return_value = fake_dataset - mock_database = MagicMock(**{"__module__":"test_module", "__name__":"test_name"}) + mock_database = MagicMock() with mock_database.connect() as mock_connection: pass type(mock_connection).KEY_COLS = PropertyMock(return_value=["state", "date"]) From 2544fa35b7ab4fb7336859adebeec278b2704357 Mon Sep 17 00:00:00 2001 From: Katie Mazaitis Date: Fri, 3 Mar 2023 11:13:00 -0500 Subject: [PATCH 5/5] Apply suggestions from code review Co-authored-by: melange396 --- src/acquisition/covid_hosp/common/database.py | 3 +-- src/acquisition/covid_hosp/common/utils.py | 6 +++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/acquisition/covid_hosp/common/database.py b/src/acquisition/covid_hosp/common/database.py index 1d98d22fa..19facd0ba 100644 --- a/src/acquisition/covid_hosp/common/database.py +++ b/src/acquisition/covid_hosp/common/database.py @@ -206,8 +206,7 @@ def nan_safe_dtype(dtype, value): many_values = [] except Exception as e: if logger: - logger.info('error on insert', index=index, values=values) - logger.error(e) + logger.error('error on insert', publ_date=publication_date, in_lines=(n-5_000, n), index=index, values=values, exception=e) raise e # insert final batch if many_values: diff --git a/src/acquisition/covid_hosp/common/utils.py b/src/acquisition/covid_hosp/common/utils.py index b1a9c6520..fb9c3fade 100644 --- a/src/acquisition/covid_hosp/common/utils.py +++ b/src/acquisition/covid_hosp/common/utils.py @@ -80,15 +80,15 @@ def limited_string(value): return limited_string GEOCODE_LENGTH = 32 - GEOCODE_PATTERN = re.compile(r'POINT \(([0-9.-]*) ([0-9.-]*)\)') + GEOCODE_PATTERN = re.compile(r'POINT \((-?[0-9.]+) (-?[0-9.]+)\)') def limited_geocode(value): if len(value) < Utils.GEOCODE_LENGTH: return value - # otherwise parse and reduce precision to 5 + # otherwise parse and set precision to 6 decimal places m = Utils.GEOCODE_PATTERN.match(value) if not m: raise CovidHospException(f"Couldn't parse geocode '{value}'") - return f'POINT ({" ".join(map(lambda x: f"{float(x):.6f}", m.groups()))})' + return f'POINT ({" ".join(f"{float(x):.6f}" for x in m.groups())})' def issues_to_fetch(metadata, newer_than, older_than, logger=False): """