Skip to content

Remove temp cache table #377

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def test_caching(self):
# make sure the live utility is serving something sensible
cvc_database = live.Database()
cvc_database.connect()
epidata1 = cvc_database.get_covidcast_meta()
epidata1 = cvc_database.compute_covidcast_meta()
cvc_database.disconnect(False)
self.assertEqual(len(epidata1),1)
self.assertEqual(epidata1, [
Expand All @@ -94,11 +94,12 @@ def test_caching(self):
'min_time': 20200422,
'max_time': 20200422,
'num_locations': 2,
'num_points': 2,
'last_update': 789,
'min_value': 1,
'max_value': 1,
'mean_value': 1,
'stdev_value': 0,
'min_value': 1.0,
'max_value': 1.0,
'mean_value': 1.0,
'stdev_value': 0.0,
'max_issue': 20200423,
'min_lag': 0,
'max_lag': 1,
Expand All @@ -125,7 +126,7 @@ def test_caching(self):
self.cur.execute('''
update covidcast_meta_cache set
timestamp = UNIX_TIMESTAMP(NOW()),
epidata = '[{"hello": "world"}]'
epidata = '[{"data_source": "earth", "signal": "lols_per_capita", "time_type": "yearly", "geo_type": "country", "is_wip": 0}]'
''')
self.cnx.commit()

Expand All @@ -139,7 +140,7 @@ def test_caching(self):
self.assertEqual(epidata4, {
'result': 1,
'epidata': [{
'hello': 'world',
'data_source': 'earth', 'signal': 'lols_per_capita', 'time_type': 'yearly', 'geo_type': 'country', 'is_wip': 0
}],
'message': 'success',
})
Expand All @@ -148,7 +149,7 @@ def test_caching(self):
self.cur.execute('''
update covidcast_meta_cache set
timestamp = UNIX_TIMESTAMP(NOW()) - 3600 * 2,
epidata = '[{"hello": "world"}]'
epidata = '[{"data_source": "earth", "signal": "lols_per_capita", "time_type": "yearly", "geo_type": "country", "is_wip": 0}]'
''')
self.cnx.commit()

Expand Down
1 change: 1 addition & 0 deletions integrations/client/test_delphi_epidata.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ def test_covidcast_meta(self):
'min_time': 20200414,
'max_time': 20200415,
'num_locations': 1,
'num_points': 2,
'min_value': 6.0,
'max_value': 7.0,
'mean_value': 6.5,
Expand Down
2 changes: 2 additions & 0 deletions integrations/server/test_covidcast_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def test_round_trip(self):
'min_time': 1,
'max_time': 2,
'num_locations': 2,
'num_points': 4,
'min_value': 10,
'max_value': 20,
'mean_value': 15,
Expand Down Expand Up @@ -230,6 +231,7 @@ def test_suppress_work_in_progress(self):
'min_time': 1,
'max_time': 2,
'num_locations': 2,
'num_points': 4,
'min_value': 10,
'max_value': 20,
'mean_value': 15,
Expand Down
2 changes: 1 addition & 1 deletion src/acquisition/covidcast/covidcast_meta_cache_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def main(args, epidata_impl=Epidata, database_impl=Database):

# fetch metadata
try:
metadata = database.get_covidcast_meta()
metadata = database.compute_covidcast_meta()
except:
# clean up before failing
database.disconnect(True)
Expand Down
85 changes: 67 additions & 18 deletions src/acquisition/covidcast/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import json
import mysql.connector
import numpy as np
from math import ceil
from math import ceil, sqrt

# first party
import delphi.operations.secrets as secrets
Expand Down Expand Up @@ -183,7 +183,7 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False

# TODO: consider handling cc_rows as a generator instead of a list
self._cursor.execute(create_tmp_table_sql)

meta_cache = self.retrieve_covidcast_meta_cache()
try:

num_rows = len(cc_rows)
Expand Down Expand Up @@ -213,10 +213,17 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False


result = self._cursor.executemany(insert_into_tmp_sql, args)

# use the temp table to compute meta just for new data,
# then merge with the global meta
meta_update = self.compute_covidcast_meta(table_name=tmp_table_name)
meta_cache = Database.merge_cache_dicts(meta_cache, meta_update)

self._cursor.execute(insert_or_update_sql)
self._cursor.execute(zero_is_latest_issue_sql)
self._cursor.execute(set_is_latest_issue_sql)
self._cursor.execute(truncate_tmp_table_sql)
self.update_covidcast_meta_cache_from_dict(meta_cache)

if result is None:
# the SQL connector does not support returning number of rows affected
Expand Down Expand Up @@ -261,32 +268,33 @@ def get_data_stdev_across_locations(self, max_day):
self._cursor.execute(sql, args)
return list(self._cursor)

def get_covidcast_meta(self):
def compute_covidcast_meta(self, table_name='covidcast'):
"""Compute and return metadata on all non-WIP COVIDcast signals."""

meta = []

signal_list = []
sql = 'SELECT `source`, `signal` FROM `covidcast` GROUP BY `source`, `signal` ORDER BY `source` ASC, `signal` ASC;'
sql = f'SELECT `source`, `signal` FROM `{table_name}` GROUP BY `source`, `signal` ORDER BY `source` ASC, `signal` ASC;'
self._cursor.execute(sql)
for source, signal in list(self._cursor): # self._cursor is a generator; this lets us use the cursor for subsequent queries inside the loop
sql = "SELECT `is_wip` FROM `covidcast` WHERE `source`=%s AND `signal`=%s LIMIT 1"
sql = f"SELECT `is_wip` FROM `{table_name}` WHERE `source`=%s AND `signal`=%s LIMIT 1"
self._cursor.execute(sql, (source, signal))
is_wip = int(self._cursor.fetchone()[0]) # casting to int as it comes out as a '0' or '1' bytearray; bool('0')==True :(
if not is_wip:
signal_list.append((source, signal))

for source, signal in signal_list:

sql = '''
sql = f'''
SELECT
t.`source` AS `data_source`,
t.`signal`,
t.`time_type`,
t.`geo_type`,
MIN(t.`time_value`) AS `min_time`,
MAX(t.`time_value`) AS `max_time`,
COUNT(DISTINCT t.`geo_value`) AS `num_locations`,
`source` AS `data_source`,
`signal`,
`time_type`,
`geo_type`,
MIN(`time_value`) AS `min_time`,
MAX(`time_value`) AS `max_time`,
COUNT(DISTINCT `geo_value`) AS `num_locations`,
COUNT(`value`) AS `num_points`,
MIN(`value`) AS `min_value`,
MAX(`value`) AS `max_value`,
ROUND(AVG(`value`),7) AS `mean_value`,
Expand All @@ -296,17 +304,17 @@ def get_covidcast_meta(self):
MIN(`lag`) as `min_lag`,
MAX(`lag`) as `max_lag`
FROM
`covidcast` t
`{table_name}`
WHERE
`source` = %s AND
`signal` = %s AND
is_latest_issue = 1
GROUP BY
t.`time_type`,
t.`geo_type`
`time_type`,
`geo_type`
ORDER BY
t.`time_type` ASC,
t.`geo_type` ASC
`time_type` ASC,
`geo_type` ASC
'''
self._cursor.execute(sql, (source, signal))
meta.extend(list(dict(zip(self._cursor.column_names,x)) for x in self._cursor))
Expand Down Expand Up @@ -343,3 +351,44 @@ def retrieve_covidcast_meta_cache(self):
for entry in cache:
cache_hash[(entry['data_source'], entry['signal'], entry['time_type'], entry['geo_type'])] = entry
return cache_hash

def update_covidcast_meta_cache_from_dict(self, cache_hash):
  """Persist the dict form of the metadata cache to the `covidcast_meta_cache` table.

  Entries are written in deterministic order, sorted by their
  (data_source, signal, time_type, geo_type) key.
  """

  entries = []
  for key in sorted(cache_hash):
    entries.append(cache_hash[key])
  self.update_covidcast_meta_cache(entries)

@staticmethod
def merge_cache_dicts(base_cache, cache_update):
"""merges two covidcast metadata caches (in dict form), modifying (and returning) the first"""

for data_id in cache_update:
if data_id not in base_cache:
base_cache[data_id] = cache_update[data_id]
continue

mainstats = base_cache[data_id].copy() # copy this entry so we can update the whole thing in one assignment
updtstats = cache_update[data_id]

# combined stdev formula shamelessly stolen from:
# https://math.stackexchange.com/questions/2971315/how-do-i-combine-standard-deviations-of-two-groups
sx, xbar, n = [mainstats[i] for i in ('stdev_value', 'mean_value', 'num_points')]
sy, ybar, m = [updtstats[i] for i in ('stdev_value', 'mean_value', 'num_points')]
comb_mean = ( xbar*n + ybar*m ) / (n+m)
comb_sd = sqrt(
( (n-1)*sx*sx + (m-1)*sy*sy ) / (n+m-1)
+
n*m*(xbar-ybar)**2 / ((n+m)*(n+m-1))
)

mainstats['num_points'] = m + n
mainstats['stdev_value'] = comb_sd
mainstats['mean_value'] = comb_mean
for stat in ('max_lag', 'max_value', 'max_time', 'max_issue', 'last_update', 'num_locations'):
mainstats[stat] = max(mainstats[stat], updtstats[stat])
for stat in ('min_lag', 'min_value', 'min_time'):
mainstats[stat] = min(mainstats[stat], updtstats[stat])

base_cache[data_id] = mainstats # write copy back to base_cache

return base_cache
2 changes: 1 addition & 1 deletion src/ddl/covidcast.sql
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,4 @@ CREATE TABLE `covidcast_meta_cache` (
`epidata` LONGTEXT NOT NULL,
PRIMARY KEY (`timestamp`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO covidcast_meta_cache VALUES (0, '');
INSERT INTO covidcast_meta_cache VALUES (0, '[]');
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def test_main_successful(self):
mock_epidata_impl = MagicMock()
mock_epidata_impl.covidcast_meta.return_value = api_response
mock_database = MagicMock()
mock_database.get_covidcast_meta.return_value=api_response['epidata']
mock_database.compute_covidcast_meta.return_value=api_response['epidata']
fake_database_impl = lambda: mock_database

main(
Expand Down Expand Up @@ -64,9 +64,9 @@ def test_main_failure(self):

args = None
mock_database = MagicMock()
mock_database.get_covidcast_meta.return_value = list()
mock_database.compute_covidcast_meta.return_value = list()
fake_database_impl = lambda: mock_database

main(args, epidata_impl=None, database_impl=fake_database_impl)

self.assertTrue(mock_database.get_covidcast_meta.called)
self.assertTrue(mock_database.compute_covidcast_meta.called)