diff --git a/integrations/acquisition/covidcast/test_covidcast_meta_caching.py b/integrations/acquisition/covidcast/test_covidcast_meta_caching.py
index 6058f6ca1..62f3e88ce 100644
--- a/integrations/acquisition/covidcast/test_covidcast_meta_caching.py
+++ b/integrations/acquisition/covidcast/test_covidcast_meta_caching.py
@@ -82,7 +82,7 @@ def test_caching(self):
     # make sure the live utility is serving something sensible
     cvc_database = live.Database()
     cvc_database.connect()
-    epidata1 = cvc_database.get_covidcast_meta()
+    epidata1 = cvc_database.compute_covidcast_meta()
     cvc_database.disconnect(False)
     self.assertEqual(len(epidata1),1)
     self.assertEqual(epidata1, [
@@ -94,11 +94,12 @@ def test_caching(self):
       'min_time': 20200422,
       'max_time': 20200422,
       'num_locations': 2,
+      'num_points': 2,
       'last_update': 789,
-      'min_value': 1,
-      'max_value': 1,
-      'mean_value': 1,
-      'stdev_value': 0,
+      'min_value': 1.0,
+      'max_value': 1.0,
+      'mean_value': 1.0,
+      'stdev_value': 0.0,
       'max_issue': 20200423,
       'min_lag': 0,
       'max_lag': 1,
@@ -125,7 +126,7 @@ def test_caching(self):
     self.cur.execute('''
       update covidcast_meta_cache set
         timestamp = UNIX_TIMESTAMP(NOW()),
-        epidata = '[{"hello": "world"}]'
+        epidata = '[{"data_source": "earth", "signal": "lols_per_capita", "time_type": "yearly", "geo_type": "country", "is_wip": 0}]'
     ''')
     self.cnx.commit()

@@ -139,7 +140,7 @@ def test_caching(self):
     self.assertEqual(epidata4, {
       'result': 1,
       'epidata': [{
-        'hello': 'world',
+        'data_source': 'earth', 'signal': 'lols_per_capita', 'time_type': 'yearly', 'geo_type': 'country', 'is_wip': 0
       }],
       'message': 'success',
     })
@@ -148,7 +149,7 @@ def test_caching(self):
     self.cur.execute('''
       update covidcast_meta_cache set
         timestamp = UNIX_TIMESTAMP(NOW()) - 3600 * 2,
-        epidata = '[{"hello": "world"}]'
+        epidata = '[{"data_source": "earth", "signal": "lols_per_capita", "time_type": "yearly", "geo_type": "country", "is_wip": 0}]'
     ''')
     self.cnx.commit()

diff --git a/integrations/client/test_delphi_epidata.py b/integrations/client/test_delphi_epidata.py
index 7e5113602..a94437ab6 100644
--- a/integrations/client/test_delphi_epidata.py
+++ b/integrations/client/test_delphi_epidata.py
@@ -375,6 +375,7 @@ def test_covidcast_meta(self):
       'min_time': 20200414,
       'max_time': 20200415,
       'num_locations': 1,
+      'num_points': 2,
       'min_value': 6.0,
       'max_value': 7.0,
       'mean_value': 6.5,
diff --git a/integrations/server/test_covidcast_meta.py b/integrations/server/test_covidcast_meta.py
index 298468b89..50ed70aee 100644
--- a/integrations/server/test_covidcast_meta.py
+++ b/integrations/server/test_covidcast_meta.py
@@ -62,6 +62,7 @@ def test_round_trip(self):
       'min_time': 1,
       'max_time': 2,
       'num_locations': 2,
+      'num_points': 4,
       'min_value': 10,
       'max_value': 20,
       'mean_value': 15,
@@ -230,6 +231,7 @@ def test_suppress_work_in_progress(self):
       'min_time': 1,
       'max_time': 2,
       'num_locations': 2,
+      'num_points': 4,
       'min_value': 10,
       'max_value': 20,
       'mean_value': 15,
diff --git a/src/acquisition/covidcast/covidcast_meta_cache_updater.py b/src/acquisition/covidcast/covidcast_meta_cache_updater.py
index e28fbeb10..a713f80ac 100644
--- a/src/acquisition/covidcast/covidcast_meta_cache_updater.py
+++ b/src/acquisition/covidcast/covidcast_meta_cache_updater.py
@@ -26,7 +26,7 @@ def main(args, epidata_impl=Epidata, database_impl=Database):

   # fetch metadata
   try:
-    metadata = database.get_covidcast_meta()
+    metadata = database.compute_covidcast_meta()
   except:
     # clean up before failing
     database.disconnect(True)
diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py
index e9d895dcb..708b6a248 100644
--- a/src/acquisition/covidcast/database.py
+++ b/src/acquisition/covidcast/database.py
@@ -7,7 +7,7 @@
 import json
 import mysql.connector
 import numpy as np
-from math import ceil
+from math import ceil, sqrt

 # first party
 import delphi.operations.secrets as secrets
@@ -183,7 +183,7 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False
     # TODO: consider handling cc_rows as a generator instead of a list

     self._cursor.execute(create_tmp_table_sql)
-
+    meta_cache = self.retrieve_covidcast_meta_cache()
     try:
       num_rows = len(cc_rows)

@@ -213,10 +213,17 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False

       result = self._cursor.executemany(insert_into_tmp_sql, args)
+
+      # use the temp table to compute meta just for new data,
+      # then merge with the global meta
+      meta_update = self.compute_covidcast_meta(table_name=tmp_table_name)
+      meta_cache = Database.merge_cache_dicts(meta_cache, meta_update)
+
       self._cursor.execute(insert_or_update_sql)
       self._cursor.execute(zero_is_latest_issue_sql)
       self._cursor.execute(set_is_latest_issue_sql)
       self._cursor.execute(truncate_tmp_table_sql)
+      self.update_covidcast_meta_cache_from_dict(meta_cache)

       if result is None:
         # the SQL connector does not support returning number of rows affected

@@ -261,16 +268,16 @@ def get_data_stdev_across_locations(self, max_day):
     self._cursor.execute(sql, args)
     return list(self._cursor)

-  def get_covidcast_meta(self):
+  def compute_covidcast_meta(self, table_name='covidcast'):
     """Compute and return metadata on all non-WIP COVIDcast signals."""

     meta = []

     signal_list = []
-    sql = 'SELECT `source`, `signal` FROM `covidcast` GROUP BY `source`, `signal` ORDER BY `source` ASC, `signal` ASC;'
+    sql = f'SELECT `source`, `signal` FROM `{table_name}` GROUP BY `source`, `signal` ORDER BY `source` ASC, `signal` ASC;'
     self._cursor.execute(sql)
     for source, signal in list(self._cursor): # self._cursor is a generator; this lets us use the cursor for subsequent queries inside the loop
-      sql = "SELECT `is_wip` FROM `covidcast` WHERE `source`=%s AND `signal`=%s LIMIT 1"
+      sql = f"SELECT `is_wip` FROM `{table_name}` WHERE `source`=%s AND `signal`=%s LIMIT 1"
       self._cursor.execute(sql, (source, signal))
       is_wip = int(self._cursor.fetchone()[0]) # casting to int as it comes out as a '0' or '1' bytearray; bool('0')==True :(
       if not is_wip:
@@ -278,15 +285,16 @@ def get_covidcast_meta(self):
         signal_list.append((source, signal))

     for source, signal in signal_list:
-      sql = '''
+      sql = f'''
         SELECT
-          t.`source` AS `data_source`,
-          t.`signal`,
-          t.`time_type`,
-          t.`geo_type`,
-          MIN(t.`time_value`) AS `min_time`,
-          MAX(t.`time_value`) AS `max_time`,
-          COUNT(DISTINCT t.`geo_value`) AS `num_locations`,
+          `source` AS `data_source`,
+          `signal`,
+          `time_type`,
+          `geo_type`,
+          MIN(`time_value`) AS `min_time`,
+          MAX(`time_value`) AS `max_time`,
+          COUNT(DISTINCT `geo_value`) AS `num_locations`,
+          COUNT(`value`) AS `num_points`,
           MIN(`value`) AS `min_value`,
           MAX(`value`) AS `max_value`,
           ROUND(AVG(`value`),7) AS `mean_value`,
@@ -296,17 +304,17 @@ def get_covidcast_meta(self):
           MIN(`lag`) as `min_lag`,
           MAX(`lag`) as `max_lag`
         FROM
-          `covidcast` t
+          `{table_name}`
         WHERE
           `source` = %s AND
           `signal` = %s AND
           is_latest_issue = 1
         GROUP BY
-          t.`time_type`,
-          t.`geo_type`
+          `time_type`,
+          `geo_type`
         ORDER BY
-          t.`time_type` ASC,
-          t.`geo_type` ASC
+          `time_type` ASC,
+          `geo_type` ASC
       '''
       self._cursor.execute(sql, (source, signal))
       meta.extend(list(dict(zip(self._cursor.column_names,x)) for x in self._cursor))
@@ -343,3 +351,44 @@ def retrieve_covidcast_meta_cache(self):
     for entry in cache:
       cache_hash[(entry['data_source'], entry['signal'], entry['time_type'], entry['geo_type'])] = entry
     return cache_hash
+
+  def update_covidcast_meta_cache_from_dict(self, cache_hash):
+    """Updates the `covidcast_meta_cache` table from the dict version of the cache, ordered by key"""
+
+    cache_list = [cache_hash[k] for k in sorted(cache_hash)]
+    self.update_covidcast_meta_cache(cache_list)
+
+  @staticmethod
+  def merge_cache_dicts(base_cache, cache_update):
+    """merges two covidcast metadata caches (in dict form), modifying (and returning) the first"""
+
+    for data_id in cache_update:
+      if data_id not in base_cache:
+        base_cache[data_id] = cache_update[data_id]
+        continue
+
+      mainstats = base_cache[data_id].copy() # copy this entry so we can update the whole thing in one assignment
+      updtstats = cache_update[data_id]
+
+      # combined stdev formula shamelessly stolen from:
+      # https://math.stackexchange.com/questions/2971315/how-do-i-combine-standard-deviations-of-two-groups
+      sx, xbar, n = [mainstats[i] for i in ('stdev_value', 'mean_value', 'num_points')]
+      sy, ybar, m = [updtstats[i] for i in ('stdev_value', 'mean_value', 'num_points')]
+      comb_mean = ( xbar*n + ybar*m ) / (n+m)
+      comb_sd = sqrt(
+        ( (n-1)*sx*sx + (m-1)*sy*sy ) / (n+m-1)
+        +
+        n*m*(xbar-ybar)**2 / ((n+m)*(n+m-1))
+      )
+
+      mainstats['num_points'] = m + n
+      mainstats['stdev_value'] = comb_sd
+      mainstats['mean_value'] = comb_mean
+      for stat in ('max_lag', 'max_value', 'max_time', 'max_issue', 'last_update', 'num_locations'):
+        mainstats[stat] = max(mainstats[stat], updtstats[stat])
+      for stat in ('min_lag', 'min_value', 'min_time'):
+        mainstats[stat] = min(mainstats[stat], updtstats[stat])
+
+      base_cache[data_id] = mainstats # write copy back to base_cache
+
+    return base_cache
diff --git a/src/ddl/covidcast.sql b/src/ddl/covidcast.sql
index 192bf2c53..ac43c7c9d 100644
--- a/src/ddl/covidcast.sql
+++ b/src/ddl/covidcast.sql
@@ -138,4 +138,4 @@ CREATE TABLE `covidcast_meta_cache` (
   `epidata` LONGTEXT NOT NULL,
   PRIMARY KEY (`timestamp`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-INSERT INTO covidcast_meta_cache VALUES (0, '');
+INSERT INTO covidcast_meta_cache VALUES (0, '[]');
diff --git a/tests/acquisition/covidcast/test_covidcast_meta_cache_updater.py b/tests/acquisition/covidcast/test_covidcast_meta_cache_updater.py
index 71e043e6a..913ebaf9f 100644
--- a/tests/acquisition/covidcast/test_covidcast_meta_cache_updater.py
+++ b/tests/acquisition/covidcast/test_covidcast_meta_cache_updater.py
@@ -36,7 +36,7 @@ def test_main_successful(self):
     mock_epidata_impl = MagicMock()
     mock_epidata_impl.covidcast_meta.return_value = api_response
     mock_database = MagicMock()
-    mock_database.get_covidcast_meta.return_value=api_response['epidata']
+    mock_database.compute_covidcast_meta.return_value=api_response['epidata']
     fake_database_impl = lambda: mock_database

     main(
@@ -64,9 +64,9 @@ def test_main_failure(self):

     args = None
     mock_database = MagicMock()
-    mock_database.get_covidcast_meta.return_value = list()
+    mock_database.compute_covidcast_meta.return_value = list()
     fake_database_impl = lambda: mock_database

     main(args, epidata_impl=None, database_impl=fake_database_impl)

-    self.assertTrue(mock_database.get_covidcast_meta.called)
+    self.assertTrue(mock_database.compute_covidcast_meta.called)
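
Reviewer note (not part of the patch): the merge in `Database.merge_cache_dicts` relies on the standard formulas for pooling the mean and sample standard deviation of two disjoint groups, per the math.stackexchange link in the code. Below is a minimal standalone sketch of that calculation; the helper name `merge_stats` and the sample data are hypothetical, used only for illustration. It checks the pooled results against computing the statistics directly over the concatenated values with Python's stdlib `statistics` module:

# Standalone sketch (illustration only). Verifies the pooled mean/stdev
# formulas used in Database.merge_cache_dicts against a direct computation
# over the concatenated data.

from math import sqrt
from statistics import mean, stdev

def merge_stats(n, xbar, sx, m, ybar, sy):
  """Combine (count, mean, sample stdev) of two disjoint groups of points."""
  comb_mean = (n*xbar + m*ybar) / (n + m)
  comb_sd = sqrt(
    ((n - 1)*sx*sx + (m - 1)*sy*sy) / (n + m - 1)
    + n*m*(xbar - ybar)**2 / ((n + m)*(n + m - 1))
  )
  return n + m, comb_mean, comb_sd

old = [1.0, 2.0, 4.0, 7.0]  # points already summarized in the cache
new = [3.0, 5.0, 8.0]       # points arriving in a new batch

total, comb_mean, comb_sd = merge_stats(
  len(old), mean(old), stdev(old),
  len(new), mean(new), stdev(new))

assert total == len(old + new)
assert abs(comb_mean - mean(old + new)) < 1e-9
assert abs(comb_sd - stdev(old + new)) < 1e-9

Two caveats worth flagging on the patch itself: the formula assumes *sample* standard deviations (the n-1 denominator), so if the stored `stdev_value` is ever computed with a population denominator the merged value will drift slightly for small counts; and merging `num_locations` with max() yields only a lower bound on the combined distinct-location count whenever a batch introduces geo_values the cache has not yet seen.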