Skip to content

Doctor_visits patching code #1977

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions doctor_visits/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,9 @@ The output will show the number of unit tests that passed and failed, along
with the percentage of code covered by the tests. None of the tests should
fail and the code lines that are not covered by unit tests should be small and
should not include critical sub-routines.

## Running Patches:
To get data issued during specific date range, output in batch issue format, adjust `params.json` in accordance with `patch.py`, then run
```
env/bin/python -m delphi_doctor_visits.patch
```
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,13 @@ def change_date_format(name):
name = '_'.join(split_name)
return name

def download(ftp_credentials, out_path, logger):
def download(ftp_credentials, out_path, logger, issue_date=None):
"""Pull the latest raw files."""
current_time = datetime.datetime.now()
if not issue_date:
current_time = datetime.datetime.now()
else:
current_time = datetime.datetime.strptime(issue_date, "%Y-%m-%d").replace(hour=23, minute=59, second=59)

Comment on lines +56 to +60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (optional): since we're using this chunk in multiple places, consider factoring it out into a separate function. (2 duplicates is okay to leave, if we use this in a third place, we definitely should factor it out.)

logger.info("starting download", time=current_time)
seconds_in_day = 24 * 60 * 60

Expand Down
9 changes: 6 additions & 3 deletions doctor_visits/delphi_doctor_visits/get_latest_claims_name.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
import datetime
from pathlib import Path

def get_latest_filename(dir_path, logger):
def get_latest_filename(dir_path, logger, issue_date=None):
"""Get the latest filename from the list of downloaded raw files."""
current_date = datetime.datetime.now()
if issue_date:
current_date = datetime.datetime.strptime(issue_date, "%Y-%m-%d").replace(hour=23, minute=59, second=59)
else:
current_date = datetime.datetime.now()
files = list(Path(dir_path).glob("*"))

latest_timestamp = datetime.datetime(1900, 1, 1)
Expand All @@ -24,7 +27,7 @@ def get_latest_filename(dir_path, logger):
latest_timestamp = timestamp
latest_filename = file

assert current_date.date() == latest_timestamp.date(), "no drop for today"
assert current_date.date() == latest_timestamp.date(), f"no drop for {current_date}"

logger.info("Latest claims file", filename=latest_filename)

Expand Down
71 changes: 71 additions & 0 deletions doctor_visits/delphi_doctor_visits/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""
This module is used for patching data in the delphi_doctor_visits package.

To use this module, you need to specify the range of issue dates in params.json, like so:

{
"common": {
...
},
"validation": {
...
},
"patch": {
"patch_dir": "/Users/minhkhuele/Desktop/delphi/covidcast-indicators/doctor_visits/AprilPatch",
"start_issue": "2024-04-20",
"end_issue": "2024-04-21"
}
}

It will generate data for that range of issue dates, and store them in batch issue format:
[name-of-patch]/issue_[issue-date]/doctor-visits/actual_data_file.csv
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: add example output

Suggested change
[name-of-patch]/issue_[issue-date]/doctor-visits/actual_data_file.csv
[name-of-patch]/issue_[issue-date]/doctor-visits/actual_data_file.csv
The run using the above example `params.json` will create a directory 'covidcast-indicators/doctor_visits/AprilPatch' containing:
AprilPatch/
issue_20240420/
doctor-visits/
20240408_state_smoothed_adj_cli.csv
20240408_state_smoothed_cli.csv
...
issue_20240421/
doctor-visits/
20240409_state_smoothed_adj_cli.csv
20240408_state_smoothed_cli.csv
...

"""

from datetime import datetime, timedelta
from os import makedirs

from delphi_utils import get_structured_logger, read_params

from .run import run_module


def patch():
"""
Run the doctor visits indicator for a range of issue dates.

The range of issue dates is specified in params.json using the following keys:
- "patch": Only used for patching data
- "start_date": str, YYYY-MM-DD format, first issue date
- "end_date": str, YYYY-MM-DD format, last issue date
- "patch_dir": str, directory to write all issues output
"""
params = read_params()
logger = get_structured_logger("delphi_doctor_visits.patch", filename=params["common"]["log_filename"])

start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")

logger.info(f"""Start patching {params["patch"]["patch_dir"]}""")
logger.info(f"""Start issue: {start_issue.strftime("%Y-%m-%d")}""")
logger.info(f"""End issue: {end_issue.strftime("%Y-%m-%d")}""")

makedirs(params["patch"]["patch_dir"], exist_ok=True)

current_issue = start_issue

while current_issue <= end_issue:
logger.info(f"""Running issue {current_issue.strftime("%Y-%m-%d")}""")

params["patch"]["current_issue"] = current_issue.strftime("%Y-%m-%d")

current_issue_yyyymmdd = current_issue.strftime("%Y%m%d")
current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/doctor-visits"""
makedirs(f"{current_issue_dir}", exist_ok=True)
params["common"]["export_dir"] = f"""{current_issue_dir}"""

run_module(params, logger)
current_issue += timedelta(days=1)


if __name__ == "__main__":
patch()
22 changes: 15 additions & 7 deletions doctor_visits/delphi_doctor_visits/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from .get_latest_claims_name import get_latest_filename


def run_module(params): # pylint: disable=too-many-statements
def run_module(params, logger=None): # pylint: disable=too-many-statements
"""
Run doctor visits indicator.

Expand All @@ -42,18 +42,26 @@ def run_module(params): # pylint: disable=too-many-statements
- "se": bool, whether to write out standard errors
- "obfuscated_prefix": str, prefix for signal name if write_se is True.
- "parallel": bool, whether to update sensor in parallel.
- "patch": Only used for patching data, remove if not patching.
Check out patch.py and README for more details on how to run patches.
- "start_date": str, YYYY-MM-DD format, first issue date
- "end_date": str, YYYY-MM-DD format, last issue date
- "patch_dir": str, directory to write all issues output
"""
start_time = time.time()
logger = get_structured_logger(
__name__, filename=params["common"].get("log_filename"),
log_exceptions=params["common"].get("log_exceptions", True))
issue_date = params.get("patch", {}).get("current_issue", None)
if not logger:
logger = get_structured_logger(
__name__,
filename=params["common"].get("log_filename"),
log_exceptions=params["common"].get("log_exceptions", True),
)

# pull latest data
download(params["indicator"]["ftp_credentials"],
params["indicator"]["input_dir"], logger)
download(params["indicator"]["ftp_credentials"], params["indicator"]["input_dir"], logger, issue_date=issue_date)

# find the latest files (these have timestamps)
claims_file = get_latest_filename(params["indicator"]["input_dir"], logger)
claims_file = get_latest_filename(params["indicator"]["input_dir"], logger, issue_date=issue_date)

# modify data
modify_and_write(claims_file, logger)
Expand Down
28 changes: 28 additions & 0 deletions doctor_visits/tests/test_download.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import unittest
from unittest.mock import patch, MagicMock
from delphi_doctor_visits.download_claims_ftp_files import download

class TestDownload(unittest.TestCase):
@patch('delphi_doctor_visits.download_claims_ftp_files.paramiko.SSHClient')
@patch('delphi_doctor_visits.download_claims_ftp_files.path.exists', return_value=False)
def test_download(self, mock_exists, mock_sshclient):
mock_sshclient_instance = MagicMock()
mock_sshclient.return_value = mock_sshclient_instance
mock_sftp = MagicMock()
mock_sshclient_instance.open_sftp.return_value = mock_sftp
mock_sftp.listdir_attr.return_value = [MagicMock(filename="SYNEDI_AGG_OUTPATIENT_20200207_1455CDT.csv.gz")]
ftp_credentials = {"host": "test_host", "user": "test_user", "pass": "test_pass", "port": "test_port"}
out_path = "./test_data/"
logger = MagicMock()

#case 1: download with issue_date that does not exist on ftp server
download(ftp_credentials, out_path, logger, issue_date="2020-02-08")
mock_sshclient_instance.connect.assert_called_once_with(ftp_credentials["host"], username=ftp_credentials["user"], password=ftp_credentials["pass"], port=ftp_credentials["port"])
mock_sftp.get.assert_not_called()

# case 2: download with issue_date that exists on ftp server
download(ftp_credentials, out_path, logger, issue_date="2020-02-07")
mock_sftp.get.assert_called()

if __name__ == '__main__':
unittest.main()
11 changes: 7 additions & 4 deletions doctor_visits/tests/test_get_latest_claims_name.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@

class TestGetLatestFileName:
logger = Mock()

dir_path = "test_data"

def test_get_latest_claims_name(self):
dir_path = "./test_data/"

with pytest.raises(AssertionError):
get_latest_filename(dir_path, self.logger)
get_latest_filename(self.dir_path, self.logger)

def test_get_latest_claims_name_with_issue_date(self):
result = get_latest_filename(self.dir_path, self.logger, issue_date="2020-02-07")
assert str(result) == f"{self.dir_path}/SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.csv.gz"
37 changes: 37 additions & 0 deletions doctor_visits/tests/test_patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import unittest
from unittest.mock import patch as mock_patch, call
from delphi_doctor_visits.patch import patch
import os
import shutil

class TestPatchModule(unittest.TestCase):
def test_patch(self):
with mock_patch('delphi_doctor_visits.patch.run_module') as mock_run_module, \
mock_patch('delphi_doctor_visits.patch.get_structured_logger') as mock_get_structured_logger, \
mock_patch('delphi_doctor_visits.patch.read_params') as mock_read_params:

mock_read_params.return_value = {
"common": {
"log_filename": "test.log"
},
"patch": {
"start_issue": "2021-01-01",
"end_issue": "2021-01-02",
"patch_dir": "./patch_dir"
}
}

patch()

self.assertIn('current_issue', mock_read_params.return_value['patch'])
self.assertEqual(mock_read_params.return_value['patch']['current_issue'], '2021-01-02')

self.assertTrue(os.path.isdir('./patch_dir'))
self.assertTrue(os.path.isdir('./patch_dir/issue_20210101/doctor-visits'))
self.assertTrue(os.path.isdir('./patch_dir/issue_20210102/doctor-visits'))

# Clean up the created directories after the test
shutil.rmtree(mock_read_params.return_value["patch"]["patch_dir"])

if __name__ == '__main__':
unittest.main()
Loading