Skip to content

Commit 4c2c11a

Browse files
authored
Merge branch 'main' into release_contraint_on_pandas
2 parents 43eaa1b + a37a54c commit 4c2c11a

File tree

2 files changed

+48
-21
lines changed

2 files changed

+48
-21
lines changed

awswrangler/opensearch/_utils.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,8 @@ def create_collection(
332332
status = response["collectionDetails"][0]["status"] # type: ignore
333333

334334
if status == "FAILED":
335-
error_details: str = response.get("collectionErrorDetails")[0] # type: ignore
335+
errors = response["collectionErrorDetails"] # type: ignore
336+
error_details = errors[0] if len(errors) > 0 else "No error details provided"
336337
raise exceptions.QueryFailed(f"Failed to create collection `{name}`: {error_details}.")
337338

338339
return response["collectionDetails"][0] # type: ignore

tests/test_opensearch.py

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,10 @@
127127
]
128128

129129

130+
def _get_unique_suffix() -> str:
131+
return str(uuid.uuid4())[:8]
132+
133+
130134
@pytest.fixture(scope="session")
131135
def cloudformation_outputs():
132136
return extract_cloudformation_outputs()
@@ -249,7 +253,7 @@ def client(request, opensearch_1_0_client, elasticsearch_7_10_fgac_client, opens
249253

250254

251255
def test_create_index(client):
252-
index = "test_create_index"
256+
index = f"test_create_index_{_get_unique_suffix()}"
253257
wr.opensearch.delete_index(client, index)
254258
time.sleep(30) # let the cluster clean up
255259
response = wr.opensearch.create_index(
@@ -259,56 +263,65 @@ def test_create_index(client):
259263
settings={"index": {"number_of_shards": 1, "number_of_replicas": 1}},
260264
)
261265
assert response.get("acknowledged", False) is True
266+
wr.opensearch.delete_index(client, index)
262267

263268

264269
def test_delete_index(client):
265-
index = "test_delete_index"
270+
index = f"test_create_index_{_get_unique_suffix()}"
266271
wr.opensearch.create_index(client, index=index)
267272
response = wr.opensearch.delete_index(client, index=index)
268273
assert response.get("acknowledged", False) is True
269274

270275

271276
def test_index_df(client):
277+
index = f"test_index_df_{_get_unique_suffix()}"
272278
response = wr.opensearch.index_df(
273279
client,
274280
df=pd.DataFrame([{"_id": "1", "name": "John"}, {"_id": "2", "name": "George"}, {"_id": "3", "name": "Julia"}]),
275-
index="test_index_df1",
281+
index=index,
276282
)
277283
assert response.get("success", 0) == 3
284+
wr.opensearch.delete_index(client, index)
278285

279286

280287
def test_index_df_with_array(client):
288+
index = f"test_index_df_array_{_get_unique_suffix()}"
281289
response = wr.opensearch.index_df(
282290
client,
283291
df=pd.DataFrame(
284292
[{"_id": "1", "name": "John", "tags": ["foo", "bar"]}, {"_id": "2", "name": "George", "tags": ["foo"]}]
285293
),
286-
index="test_index_df1",
294+
index=index,
287295
)
288296
assert response.get("success", 0) == 2
297+
wr.opensearch.delete_index(client, index)
289298

290299

291300
def test_index_documents(client):
301+
index = f"test_index_documents_{_get_unique_suffix()}"
292302
response = wr.opensearch.index_documents(
293303
client,
294304
documents=[{"_id": "1", "name": "John"}, {"_id": "2", "name": "George"}, {"_id": "3", "name": "Julia"}],
295-
index="test_index_documents1",
305+
index=index,
296306
)
297307
assert response.get("success", 0) == 3
308+
wr.opensearch.delete_index(client, index)
298309

299310

300311
def test_index_documents_id_keys(client):
301-
wr.opensearch.index_documents(
302-
client, documents=inspections_documents, index="test_index_documents_id_keys", id_keys=["inspection_id"]
303-
)
312+
index = f"test_index_documents_id_keys_{_get_unique_suffix()}"
313+
wr.opensearch.index_documents(client, documents=inspections_documents, index=index, id_keys=["inspection_id"])
314+
wr.opensearch.delete_index(client, index)
304315

305316

306317
def test_index_documents_no_id_keys(client):
307-
wr.opensearch.index_documents(client, documents=inspections_documents, index="test_index_documents_no_id_keys")
318+
index = f"test_index_documents_no_id_keys_{_get_unique_suffix()}"
319+
wr.opensearch.index_documents(client, documents=inspections_documents, index=index)
320+
wr.opensearch.delete_index(client, index)
308321

309322

310323
def test_search(client):
311-
index = "test_search"
324+
index = f"test_search_{_get_unique_suffix()}"
312325
kwargs = {} if _is_serverless(client) else {"refresh": "wait_for"}
313326
wr.opensearch.index_documents(
314327
client, documents=inspections_documents, index=index, id_keys=["inspection_id"], **kwargs
@@ -330,11 +343,12 @@ def test_search(client):
330343
search_body={"query": {"match": {"business_name": "message"}}},
331344
)
332345
assert df.shape == (0, 0)
346+
wr.opensearch.delete_index(client, index)
333347

334348

335349
@pytest.mark.parametrize("filter_path", [None, "hits.hits._source", ["hits.hits._source"]])
336350
def test_search_filter_path(client, filter_path):
337-
index = "test_search"
351+
index = f"test_search_filter_{_get_unique_suffix()}"
338352
kwargs = {} if _is_serverless(client) else {"refresh": "wait_for"}
339353
wr.opensearch.index_documents(
340354
client, documents=inspections_documents, index=index, id_keys=["inspection_id"], **kwargs
@@ -351,11 +365,12 @@ def test_search_filter_path(client, filter_path):
351365
filter_path=filter_path,
352366
)
353367
assert df.shape[0] == 3
368+
wr.opensearch.delete_index(client, index)
354369

355370

356371
@pytest.mark.xfail(raises=wr.exceptions.NotSupported, reason="Scroll not available for OpenSearch Serverless.")
357372
def test_search_scroll(client):
358-
index = "test_search_scroll"
373+
index = f"test_search_scroll_{_get_unique_suffix()}"
359374
kwargs = {} if _is_serverless(client) else {"refresh": "wait_for"}
360375
wr.opensearch.index_documents(
361376
client, documents=inspections_documents, index=index, id_keys=["inspection_id"], **kwargs
@@ -364,32 +379,37 @@ def test_search_scroll(client):
364379
client, index=index, is_scroll=True, _source=["inspection_id", "business_name", "business_location"]
365380
)
366381
assert df.shape[0] == 5
382+
wr.opensearch.delete_index(client, index)
367383

368384

369385
@pytest.mark.xfail(raises=wr.exceptions.NotSupported, reason="SQL plugin not available for OpenSearch Serverless.")
370386
@pytest.mark.parametrize("fetch_size", [None, 1000, 10000])
371387
@pytest.mark.parametrize("fetch_size_param_name", ["size", "fetch_size"])
372388
def test_search_sql(client, fetch_size, fetch_size_param_name):
373-
index = "test_search_sql"
389+
index = f"test_search_sql_{_get_unique_suffix()}"
374390
kwargs = {} if _is_serverless(client) else {"refresh": "wait_for"}
375391
wr.opensearch.index_documents(
376392
client, documents=inspections_documents, index=index, id_keys=["inspection_id"], **kwargs
377393
)
378394
search_kwargs = {fetch_size_param_name: fetch_size} if fetch_size else {}
379395
df = wr.opensearch.search_by_sql(client, sql_query=f"select * from {index}", **search_kwargs)
380396
assert df.shape[0] == 5
397+
wr.opensearch.delete_index(client, index)
381398

382399

383400
def test_index_json_local(client):
401+
index = f"test_index_json_local_{_get_unique_suffix()}"
384402
file_path = f"{tempfile.gettempdir()}/inspections.json"
385403
with open(file_path, "w") as filehandle:
386404
for doc in inspections_documents:
387405
filehandle.write("%s\n" % json.dumps(doc))
388-
response = wr.opensearch.index_json(client, index="test_index_json_local", path=file_path)
406+
response = wr.opensearch.index_json(client, index=index, path=file_path)
389407
assert response.get("success", 0) == 6
408+
wr.opensearch.delete_index(client, index)
390409

391410

392411
def test_index_json_s3(client, path):
412+
index = f"test_index_json_s3_{_get_unique_suffix()}"
393413
file_path = f"{tempfile.gettempdir()}/inspections.json"
394414
with open(file_path, "w") as filehandle:
395415
for doc in inspections_documents:
@@ -398,22 +418,24 @@ def test_index_json_s3(client, path):
398418
path = f"{path}opensearch/inspections.json"
399419
bucket, key = wr._utils.parse_path(path)
400420
s3.upload_file(file_path, bucket, key)
401-
response = wr.opensearch.index_json(client, index="test_index_json_s3", path=path)
421+
response = wr.opensearch.index_json(client, index=index, path=path)
402422
assert response.get("success", 0) == 6
423+
wr.opensearch.delete_index(client, index)
403424

404425

405426
def test_index_csv_local(client):
406427
file_path = f"{tempfile.gettempdir()}/inspections.csv"
407-
index = "test_index_csv_local"
428+
index = f"test_index_csv_local_{_get_unique_suffix()}"
408429
df = pd.DataFrame(inspections_documents)
409430
df.to_csv(file_path, index=False)
410431
response = wr.opensearch.index_csv(client, path=file_path, index=index)
411432
assert response.get("success", 0) == 6
433+
wr.opensearch.delete_index(client, index)
412434

413435

414436
def test_index_csv_s3(client, path):
415437
file_path = f"{tempfile.gettempdir()}/inspections.csv"
416-
index = "test_index_csv_s3"
438+
index = f"test_index_csv_s3_{_get_unique_suffix()}"
417439
df = pd.DataFrame(inspections_documents)
418440
df.to_csv(file_path, index=False)
419441
s3 = boto3.client("s3")
@@ -422,20 +444,24 @@ def test_index_csv_s3(client, path):
422444
s3.upload_file(file_path, bucket, key)
423445
response = wr.opensearch.index_csv(client, path=path, index=index)
424446
assert response.get("success", 0) == 6
447+
wr.opensearch.delete_index(client, index)
425448

426449

427450
@pytest.mark.skip(reason="takes a long time (~5 mins) since testing against small clusters")
428451
def test_index_json_s3_large_file(client):
452+
index = f"test_index_json_s3_large_file_{_get_unique_suffix()}"
429453
path = "s3://irs-form-990/index_2011.json"
430454
response = wr.opensearch.index_json(
431-
client, index="test_index_json_s3_large_file", path=path, json_path="Filings2011", id_keys=["EIN"], bulk_size=20
455+
client, index=index, path=path, json_path="Filings2011", id_keys=["EIN"], bulk_size=20
432456
)
433457
assert response.get("success", 0) > 0
458+
wr.opensearch.delete_index(client, index)
434459

435460

461+
@pytest.mark.skip(reason="Temporary skip until collection cleanup issue is resolved")
436462
def test_opensearch_serverless_create_collection(opensearch_serverless_client) -> None:
437-
collection_name: str = f"col-{str(uuid.uuid4())[:8]}"
438-
client: boto3.client = boto3.client(service_name="opensearchserverless")
463+
collection_name: str = f"col-{_get_unique_suffix()}"
464+
client = boto3.client(service_name="opensearchserverless")
439465

440466
collection: Dict[str, Any] = wr.opensearch.create_collection(
441467
name=collection_name,

0 commit comments

Comments
 (0)