Skip to content

Add a workaround for bpo-37658 #608

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions asyncpg/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,19 @@ async def wait_closed(stream):
# On Windows wait_closed() sometimes propagates
# ConnectionResetError which is totally unnecessary.
pass


# Workaround for https://bugs.python.org/issue37658
async def wait_for(fut, timeout):
if timeout is None:
return await fut

fut = asyncio.ensure_future(fut)

try:
return await asyncio.wait_for(fut, timeout)
except asyncio.CancelledError:
if fut.done():
return fut.result()
else:
raise
18 changes: 2 additions & 16 deletions asyncpg/connect_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,18 +636,13 @@ async def _connect_addr(

connector = asyncio.ensure_future(connector)
before = time.monotonic()
try:
tr, pr = await asyncio.wait_for(
connector, timeout=timeout)
except asyncio.CancelledError:
connector.add_done_callback(_close_leaked_connection)
raise
tr, pr = await compat.wait_for(connector, timeout=timeout)
timeout -= time.monotonic() - before

try:
if timeout <= 0:
raise asyncio.TimeoutError
await asyncio.wait_for(connected, timeout=timeout)
await compat.wait_for(connected, timeout=timeout)
except (Exception, asyncio.CancelledError):
tr.close()
raise
Expand Down Expand Up @@ -745,12 +740,3 @@ def _create_future(loop):
return asyncio.Future(loop=loop)
else:
return create_future()


def _close_leaked_connection(fut):
try:
tr, pr = fut.result()
if tr:
tr.close()
except asyncio.CancelledError:
pass # hide the exception
5 changes: 3 additions & 2 deletions asyncpg/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import time
import warnings

from . import compat
from . import connection
from . import connect_utils
from . import exceptions
Expand Down Expand Up @@ -198,7 +199,7 @@ async def release(self, timeout):
# If the connection is in cancellation state,
# wait for the cancellation
started = time.monotonic()
await asyncio.wait_for(
await compat.wait_for(
self._con._protocol._wait_for_cancellation(),
budget)
if budget is not None:
Expand Down Expand Up @@ -623,7 +624,7 @@ async def _acquire_impl():
if timeout is None:
return await _acquire_impl()
else:
return await asyncio.wait_for(
return await compat.wait_for(
_acquire_impl(), timeout=timeout)

async def release(self, connection, *, timeout=None):
Expand Down
20 changes: 20 additions & 0 deletions tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,26 @@ async def worker():
self.cluster.trust_local_connections()
self.cluster.reload()

async def test_pool_handles_task_cancel_in_acquire_with_timeout(self):
# See https://github.com/MagicStack/asyncpg/issues/547
pool = await self.create_pool(database='postgres',
min_size=1, max_size=1)

async def worker():
async with pool.acquire(timeout=100):
pass

# Schedule task
task = self.loop.create_task(worker())
# Yield to task, but cancel almost immediately
await asyncio.sleep(0.00000000001)
# Cancel the worker.
task.cancel()
# Wait to make sure the cleanup has completed.
await asyncio.sleep(0.4)
# Check that the connection has been returned to the pool.
self.assertEqual(pool._queue.qsize(), 1)

async def test_pool_handles_task_cancel_in_release(self):
# Use SlowResetConnectionPool to simulate
# the Task.cancel() and __aexit__ race.
Expand Down