
Commit fb96244

Merge branch '1678-awswranglerathenaread_sql_query-is-leaking-memory' of github.com:aws/aws-sdk-pandas into 1678-awswranglerathenaread_sql_query-is-leaking-memory
2 parents: 1d68a70 + 274ed6a

18 files changed (+153, -48 lines)

awswrangler/athena/__init__.py

Lines changed: 4 additions & 0 deletions
@@ -9,7 +9,9 @@
     get_named_query_statement,
     get_query_columns_types,
     get_query_execution,
+    get_query_executions,
     get_work_group,
+    list_query_executions,
     repair_table,
     show_create_table,
     start_query_execution,
@@ -24,10 +26,12 @@
     "describe_table",
     "get_query_columns_types",
     "get_query_execution",
+    "get_query_executions",
     "get_query_results",
     "get_named_query_statement",
     "get_work_group",
     "generate_create_query",
+    "list_query_executions",
     "repair_table",
     "create_ctas_table",
     "show_create_table",

awswrangler/athena/_utils.py

Lines changed: 101 additions & 0 deletions
@@ -1144,3 +1144,104 @@ def get_query_execution(query_execution_id: str, boto3_session: Optional[boto3.S
         QueryExecutionId=query_execution_id,
     )
     return cast(Dict[str, Any], response["QueryExecution"])
+
+
+def get_query_executions(
+    query_execution_ids: List[str], return_unprocessed: bool = False, boto3_session: Optional[boto3.Session] = None
+) -> Union[Tuple[pd.DataFrame, pd.DataFrame], pd.DataFrame]:
+    """From specified query execution IDs, return a DataFrame of query execution details.
+
+    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html#Athena.Client.batch_get_query_execution
+
+    Parameters
+    ----------
+    query_execution_ids : List[str]
+        Athena query execution IDs.
+    return_unprocessed : bool
+        True to also return the query execution IDs that could not be processed.
+        False to return only the DataFrame of query execution details.
+        Default is False.
+    boto3_session : boto3.Session(), optional
+        Boto3 Session. The default boto3 session will be used if boto3_session receives None.
+
+    Returns
+    -------
+    DataFrame
+        DataFrame containing the query execution details.
+
+    DataFrame
+        DataFrame containing the unprocessed query execution IDs.
+
+    Examples
+    --------
+    >>> import awswrangler as wr
+    >>> query_executions_df, unprocessed_query_executions_df = wr.athena.get_query_executions(
+    ...     query_execution_ids=['query-execution-id', 'query-execution-id1']
+    ... )
+    """
+    chunked_size: int = 50
+    query_executions: List[Dict[str, Any]] = []
+    unprocessed_query_execution: List[Dict[str, str]] = []
+    client_athena: boto3.client = _utils.client(service_name="athena", session=boto3_session)
+    for i in range(0, len(query_execution_ids), chunked_size):
+        response = client_athena.batch_get_query_execution(QueryExecutionIds=query_execution_ids[i : i + chunked_size])
+        query_executions += response["QueryExecutions"]
+        unprocessed_query_execution += response["UnprocessedQueryExecutionIds"]
+    if unprocessed_query_execution and not return_unprocessed:
+        _logger.warning(
+            "Some of the query execution IDs could not be processed. "
+            "Set return_unprocessed to True to get the unprocessed query execution IDs."
+        )
+    if return_unprocessed:
+        return pd.json_normalize(query_executions), pd.json_normalize(unprocessed_query_execution)
+    return pd.json_normalize(query_executions)
+
+
+def list_query_executions(workgroup: Optional[str] = None, boto3_session: Optional[boto3.Session] = None) -> List[str]:
+    """Fetch the list of query execution IDs run in the specified workgroup, or in the primary workgroup if none is given.
+
+    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html#Athena.Client.list_query_executions
+
+    Parameters
+    ----------
+    workgroup : str
+        The name of the workgroup from which the query execution IDs are returned.
+        If not specified, a list of available query execution IDs for the queries in the primary workgroup is returned.
+    boto3_session : boto3.Session(), optional
+        Boto3 Session. The default boto3 session will be used if boto3_session receives None.
+
+    Returns
+    -------
+    List[str]
+        List of query execution IDs.
+
+    Examples
+    --------
+    >>> import awswrangler as wr
+    >>> res = wr.athena.list_query_executions(workgroup='workgroup-name')
+    """
+    client_athena: boto3.client = _utils.client(service_name="athena", session=boto3_session)
+    kwargs: Dict[str, Any] = {"base": 1}
+    if workgroup:
+        kwargs["WorkGroup"] = workgroup
+    query_list: List[str] = []
+    response: Dict[str, Any] = _utils.try_it(
+        f=client_athena.list_query_executions,
+        ex=botocore.exceptions.ClientError,
+        ex_code="ThrottlingException",
+        max_num_tries=5,
+        **kwargs,
+    )
+    query_list += response["QueryExecutionIds"]
+    while "NextToken" in response:
+        kwargs["NextToken"] = response["NextToken"]
+        response = _utils.try_it(
+            f=client_athena.list_query_executions,
+            ex=botocore.exceptions.ClientError,
+            ex_code="ThrottlingException",
+            max_num_tries=5,
+            **kwargs,
+        )
+        query_list += response["QueryExecutionIds"]
+    return query_list
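
Taken together, the two new helpers cover the list-then-describe workflow end to end. A minimal sketch of that combination (the workgroup name is illustrative; only the signatures added above are assumed):

    import awswrangler as wr

    # Collect every execution ID recorded for a workgroup, then describe them;
    # get_query_executions batches the IDs 50 at a time under the hood.
    ids = wr.athena.list_query_executions(workgroup="my-workgroup")
    if ids:
        details_df, unprocessed_df = wr.athena.get_query_executions(ids, return_unprocessed=True)
        print(f"{len(details_df)} executions described, {len(unprocessed_df)} unprocessed")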

docs/source/api.rst

Lines changed: 2 additions & 0 deletions
@@ -119,9 +119,11 @@ Amazon Athena
     generate_create_query
     get_query_columns_types
     get_query_execution
+    get_query_executions
     get_query_results
     get_named_query_statement
     get_work_group
+    list_query_executions
     read_sql_query
     read_sql_table
     repair_table

tests/test_athena.py

Lines changed: 19 additions & 0 deletions
@@ -1233,3 +1233,22 @@ def test_athena_generate_create_query(path, glue_database, glue_table):
     )
     wr.athena.start_query_execution(sql=query, database=glue_database, wait=True)
     assert query == wr.athena.generate_create_query(database=glue_database, table=glue_table)
+
+
+def test_get_query_execution(workgroup0, workgroup1):
+    query_execution_ids = wr.athena.list_query_executions(workgroup=workgroup0) + wr.athena.list_query_executions(
+        workgroup=workgroup1
+    )
+    assert query_execution_ids
+    query_execution_detail = wr.athena.get_query_execution(query_execution_id=query_execution_ids[0])
+    query_executions_df = wr.athena.get_query_executions(query_execution_ids)
+    assert isinstance(query_executions_df, pd.DataFrame)
+    assert isinstance(query_execution_detail, dict)
+    assert set(query_execution_ids).intersection(set(query_executions_df["QueryExecutionId"].values.tolist()))
+    query_execution_ids1 = query_execution_ids + ["aaa", "bbb"]
+    query_executions_df, unprocessed_query_executions_df = wr.athena.get_query_executions(
+        query_execution_ids1, return_unprocessed=True
+    )
+    assert isinstance(unprocessed_query_executions_df, pd.DataFrame)
+    assert set(query_execution_ids).intersection(set(query_executions_df["QueryExecutionId"].values.tolist()))
+    assert {"aaa", "bbb"}.intersection(set(unprocessed_query_executions_df["QueryExecutionId"].values.tolist()))

tutorials/006 - Amazon Athena.ipynb

Lines changed: 1 addition & 1 deletion
@@ -143,7 +143,7 @@
 " mode=\"overwrite\",\n",
 " database=\"awswrangler_test\",\n",
 " table=\"noaa\"\n",
-");"
+")"
 ]
 },
 {

tutorials/007 - Redshift, MySQL, PostgreSQL, SQL Server, Oracle.ipynb

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@
 "\n",
 "# 7 - Redshift, MySQL, PostgreSQL, SQL Server and Oracle\n",
 "\n",
-"[awswrangler](https://github.com/aws/aws-sdk-pandas)'s Redshift, MySQL and PostgreSQL have two basic function in common that tries to follow the Pandas conventions, but add more data type consistency.\n",
+"[awswrangler](https://github.com/aws/aws-sdk-pandas)'s Redshift, MySQL and PostgreSQL have two basic functions in common that try to follow Pandas conventions, but add more data type consistency.\n",
 "\n",
 "- [wr.redshift.to_sql()](https://aws-sdk-pandas.readthedocs.io/en/2.17.0/stubs/awswrangler.redshift.to_sql.html)\n",
 "- [wr.redshift.read_sql_query()](https://aws-sdk-pandas.readthedocs.io/en/2.17.0/stubs/awswrangler.redshift.read_sql_query.html)\n",

tutorials/014 - Schema Evolution.ipynb

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@
 "\n",
 "# 14 - Schema Evolution\n",
 "\n",
-"awswrangler support new **columns** on Parquet and CSV datasets through:\n",
+"awswrangler supports new **columns** on Parquet and CSV datasets through:\n",
 "\n",
 "- [wr.s3.to_parquet()](https://aws-sdk-pandas.readthedocs.io/en/2.17.0/stubs/awswrangler.s3.to_parquet.html#awswrangler.s3.to_parquet)\n",
 "- [wr.s3.store_parquet_metadata()](https://aws-sdk-pandas.readthedocs.io/en/2.17.0/stubs/awswrangler.s3.store_parquet_metadata.html#awswrangler.s3.store_parquet_metadata) i.e. \"Crawler\"\n",

tutorials/015 - EMR.ipynb

Lines changed: 0 additions & 7 deletions
@@ -160,13 +160,6 @@
 "source": [
 "wr.emr.terminate_cluster(cluster_id)"
 ]
-},
-{
-"cell_type": "code",
-"execution_count": null,
-"metadata": {},
-"outputs": [],
-"source": []
 }
 ],
 "metadata": {

tutorials/016 - EMR & Docker.ipynb

Lines changed: 1 addition & 8 deletions
@@ -201,7 +201,7 @@
 "print(f\"awswrangler version: {wr.__version__}\")\n",
 "\"\"\"\n",
 "\n",
-"boto3.client(\"s3\").put_object(Body=script, Bucket=bucket, Key=\"test_docker.py\");"
+"boto3.client(\"s3\").put_object(Body=script, Bucket=bucket, Key=\"test_docker.py\")"
 ]
 },
 {
@@ -329,13 +329,6 @@
 "\n",
 "wr.emr.terminate_cluster(cluster_id)"
 ]
-},
-{
-"cell_type": "code",
-"execution_count": null,
-"metadata": {},
-"outputs": [],
-"source": []
 }
 ],
 "metadata": {

tutorials/017 - Partition Projection.ipynb

Lines changed: 4 additions & 4 deletions
@@ -159,7 +159,7 @@
 " \"month\": \"1,12\",\n",
 " \"day\": \"1,31\"\n",
 " },\n",
-");"
+")"
 ]
 },
 {
@@ -334,7 +334,7 @@
 " projection_values={\n",
 " \"city\": \"São Paulo,Tokio,Seattle\"\n",
 " },\n",
-");"
+")"
 ]
 },
 {
@@ -511,7 +511,7 @@
 " \"dt\": \"2020-01-01,2020-01-03\",\n",
 " \"ts\": \"2020-01-01 00:00:00,2020-01-01 00:00:02\"\n",
 " },\n",
-");"
+")"
 ]
 },
 {
@@ -679,7 +679,7 @@
 " projection_types={\n",
 " \"uuid\": \"injected\",\n",
 " }\n",
-");"
+")"
 ]
 },
 {

tutorials/018 - QuickSight.ipynb

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@
 "* [Exploring the public AWS COVID-19 data lake](https://aws.amazon.com/blogs/big-data/exploring-the-public-aws-covid-19-data-lake/)\n",
 "* [CloudFormation template](https://covid19-lake.s3.us-east-2.amazonaws.com/cfn/CovidLakeStack.template.json)\n",
 "\n",
-"*Please, install the Cloudformation template above to have access to the public data lake.*\n",
+"*Please, install the CloudFormation template above to have access to the public data lake.*\n",
 "\n",
 "*P.S. To be able to access the public data lake, you must allow explicitly QuickSight to access the related external bucket.*"
 ]

tutorials/019 - Athena Cache.ipynb

Lines changed: 3 additions & 3 deletions
@@ -8,13 +8,13 @@
 "\n",
 "# 19 - Amazon Athena Cache\n",
 "\n",
-"[awswrangler](https://github.com/aws/aws-sdk-pandas) has a cache strategy that is disabled by default and can be enabled passing `max_cache_seconds` biggier than 0. This cache strategy for Amazon Athena can help you to **decrease query times and costs**.\n",
+"[awswrangler](https://github.com/aws/aws-sdk-pandas) has a cache strategy that is disabled by default and can be enabled by passing `max_cache_seconds` bigger than 0. This cache strategy for Amazon Athena can help you to **decrease query times and costs**.\n",
 "\n",
 "When calling `read_sql_query`, instead of just running the query, we now can verify if the query has been run before. If so, and this last run was within `max_cache_seconds` (a new parameter to `read_sql_query`), we return the same results as last time if they are still available in S3. We have seen this increase performance more than 100x, but the potential is pretty much infinite.\n",
 "\n",
 "The detailed approach is:\n",
 "- When `read_sql_query` is called with `max_cache_seconds > 0` (it defaults to 0), we check for the last queries run by the same workgroup (the most we can get without pagination).\n",
-"- By default it will check the last 50 queries, but you can customize it throught the `max_cache_query_inspections` argument.\n",
+"- By default it will check the last 50 queries, but you can customize it through the `max_cache_query_inspections` argument.\n",
 "- We then sort those queries based on CompletionDateTime, descending\n",
 "- For each of those queries, we check if their CompletionDateTime is still within the `max_cache_seconds` window. If so, we check if the query string is the same as now (with some smart heuristics to guarantee coverage over both `ctas_approach`es). If they are the same, we check if the last one's results are still on S3, and then return them instead of re-running the query.\n",
 "- During the whole cache resolution phase, if there is anything wrong, the logic falls back to the usual `read_sql_query` path.\n",
@@ -292,7 +292,7 @@
 " mode=\"overwrite\",\n",
 " database=\"awswrangler_test\",\n",
 " table=\"noaa\"\n",
-");"
+")"
 ]
 },
 {
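
For context, the cache behaviour described in that hunk is driven entirely by the two read_sql_query arguments it names. A minimal sketch, reusing the awswrangler_test database and noaa table from the tutorial (the SQL itself is illustrative):

    import awswrangler as wr

    # Reuse the results of an identical query completed within the last 900 seconds,
    # inspecting up to the 50 most recent executions of the workgroup.
    df = wr.athena.read_sql_query(
        "SELECT * FROM noaa LIMIT 10",
        database="awswrangler_test",
        max_cache_seconds=900,
        max_cache_query_inspections=50,
    )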

tutorials/020 - Spark Table Interoperability.ipynb

Lines changed: 2 additions & 2 deletions
@@ -8,9 +8,9 @@
 "\n",
 "# 20 - Spark Table Interoperability\n",
 "\n",
-"[awswrangler](https://github.com/aws/aws-sdk-pandas) has no difficults to insert, overwrite or do any other kind of interaction with a Table created by Apache Spark.\n",
+"[awswrangler](https://github.com/aws/aws-sdk-pandas) has no difficulty to insert, overwrite or do any other kind of interaction with a Table created by Apache Spark.\n",
 "\n",
-"But if you want to do the oposite (Spark interacting with a table created by awswrangler) you should be aware that awswrangler follows the Hive's format and you must be explicit when using the Spark's `saveAsTable` method:"
+"But if you want to do the opposite (Spark interacting with a table created by awswrangler) you should be aware that awswrangler follows the Hive's format and you must be explicit when using the Spark's `saveAsTable` method:"
 ]
 },
 {
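
The "be explicit" advice in that hunk amounts to forcing the Hive format when Spark writes the table. A minimal PySpark sketch of that idea, assuming format("hive") is the explicit choice (the table name and data are illustrative, not the tutorial's exact cell):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    df = spark.range(10)  # illustrative data

    # Write a Hive-format table so awswrangler, which follows the Hive format, can interact with it.
    df.write.format("hive").mode("overwrite").saveAsTable("default.my_spark_table")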

tutorials/022 - Writing Partitions Concurrently.ipynb

Lines changed: 3 additions & 10 deletions
@@ -11,7 +11,7 @@
 "* `concurrent_partitioning` argument:\n",
 "\n",
 " If True will increase the parallelism level during the partitions writing. It will decrease the\n",
-" writing time and increase the memory usage.\n",
+" writing time and increase memory usage.\n",
 "\n",
 "*P.S. Check the [function API doc](https://aws-sdk-pandas.readthedocs.io/en/2.17.0/api.html) to see it has some argument that can be configured through Global configurations.*"
 ]
@@ -121,7 +121,7 @@
 " dataset=True,\n",
 " mode=\"overwrite\",\n",
 " partition_cols=[\"year\"],\n",
-");"
+")"
 ]
 },
 {
@@ -157,15 +157,8 @@
 " mode=\"overwrite\",\n",
 " partition_cols=[\"year\"],\n",
 " concurrent_partitioning=True # <-----\n",
-");"
+")"
 ]
-},
-{
-"cell_type": "code",
-"execution_count": null,
-"metadata": {},
-"outputs": [],
-"source": []
 }
 ],
 "metadata": {

tutorials/025 - Redshift - Loading Parquet files with Spectrum.ipynb

Lines changed: 2 additions & 2 deletions
@@ -164,7 +164,7 @@
 "metadata": {},
 "outputs": [],
 "source": [
-"wr.s3.to_parquet(df, PATH, max_rows_by_file=2, dataset=True, mode=\"overwrite\");"
+"wr.s3.to_parquet(df, PATH, max_rows_by_file=2, dataset=True, mode=\"overwrite\")"
 ]
 },
 {
@@ -252,7 +252,7 @@
 " \"col0\": [10, 11],\n",
 " \"col1\": [\"k\", \"l\"],\n",
 "})\n",
-"wr.s3.to_parquet(df, PATH, dataset=True, mode=\"overwrite\");"
+"wr.s3.to_parquet(df, PATH, dataset=True, mode=\"overwrite\")"
 ]
 },
 {

tutorials/026 - Amazon Timestream.ipynb

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@
 "from datetime import datetime\n",
 "\n",
 "wr.timestream.create_database(\"sampleDB\")\n",
-"wr.timestream.create_table(\"sampleDB\", \"sampleTable\", memory_retention_hours=1, magnetic_retention_days=1);"
+"wr.timestream.create_table(\"sampleDB\", \"sampleTable\", memory_retention_hours=1, magnetic_retention_days=1)"
 ]
 },
 {

tutorials/027 - Amazon Timestream 2.ipynb

Lines changed: 1 addition & 1 deletion
@@ -101,7 +101,7 @@
 "outputs": [],
 "source": [
 "wr.timestream.create_database(\"sampleDB\")\n",
-"wr.timestream.create_table(\"sampleDB\", \"sampleTable\", memory_retention_hours=1, magnetic_retention_days=1);"
+"wr.timestream.create_table(\"sampleDB\", \"sampleTable\", memory_retention_hours=1, magnetic_retention_days=1)"
 ]
 },
 {
