Skip to content

Commit 91292a9

Browse files
committed
Don't remove broken connections multiple times from the pool
Fixes `ValueError: deque.remove(x): x not in deque` when multiple connections to the same server brake more or less simultaneously.
1 parent d618b7b commit 91292a9

File tree

4 files changed

+88
-22
lines changed

4 files changed

+88
-22
lines changed

neo4j/_async/io/_pool.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -241,13 +241,16 @@ async def deactivate(self, address):
241241
connections = self.connections[address]
242242
except KeyError: # already removed from the connection pool
243243
return
244-
for conn in list(connections):
245-
if not conn.in_use:
246-
connections.remove(conn)
247-
try:
248-
await conn.close()
249-
except OSError:
250-
pass
244+
closable_connections = [
245+
conn for conn in connections if not conn.in_use
246+
]
247+
# First remove all connections in question, then try to close them.
248+
# If closing of a connection fails, we will end up in this method
249+
# again.
250+
for conn in closable_connections:
251+
connections.remove(conn)
252+
for conn in closable_connections:
253+
await conn.close()
251254
if not self.connections[address]:
252255
del self.connections[address]
253256

neo4j/_sync/io/_pool.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -241,13 +241,16 @@ def deactivate(self, address):
241241
connections = self.connections[address]
242242
except KeyError: # already removed from the connection pool
243243
return
244-
for conn in list(connections):
245-
if not conn.in_use:
246-
connections.remove(conn)
247-
try:
248-
conn.close()
249-
except OSError:
250-
pass
244+
closable_connections = [
245+
conn for conn in connections if not conn.in_use
246+
]
247+
# First remove all connections in question, then try to close them.
248+
# If closing of a connection fails, we will end up in this method
249+
# again.
250+
for conn in closable_connections:
251+
connections.remove(conn)
252+
for conn in closable_connections:
253+
conn.close()
251254
if not self.connections[address]:
252255
del self.connections[address]
253256

tests/unit/async_/io/test_neo4j_pool.py

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,6 @@
3535
)
3636

3737
from ...._async_compat import mark_async_test
38-
from ..._async_compat import (
39-
AsyncMock,
40-
mark_async_test,
41-
)
4238
from ..work import async_fake_connection_generator
4339

4440

@@ -391,6 +387,40 @@ def liveness_side_effect(*args, **kwargs):
391387

392388

393389

390+
@mark_async_test
391+
async def test_multiple_broken_connections_on_close(opener, mocker):
392+
def mock_connection_breaks_on_close(cx):
393+
async def close_side_effect():
394+
cx.closed.return_value = True
395+
cx.defunct.return_value = True
396+
await pool.deactivate(READER_ADDRESS)
397+
398+
cx.attach_mock(mocker.AsyncMock(side_effect=close_side_effect),
399+
"close")
400+
401+
# create pool with 2 idle connections
402+
pool = AsyncNeo4jPool(
403+
opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS
404+
)
405+
cx1 = await pool.acquire(READ_ACCESS, 30, "test_db", None)
406+
cx2 = await pool.acquire(READ_ACCESS, 30, "test_db", None)
407+
await pool.release(cx1)
408+
await pool.release(cx2)
409+
410+
# both will loose connection
411+
mock_connection_breaks_on_close(cx1)
412+
mock_connection_breaks_on_close(cx2)
413+
414+
# force pool to close cx1, which will make it realize that the server is
415+
# unreachable
416+
cx1.stale.return_value = True
417+
418+
cx3 = await pool.acquire(READ_ACCESS, 30, "test_db", None)
419+
420+
assert cx3 is not cx1
421+
assert cx3 is not cx2
422+
423+
394424
@mark_async_test
395425
async def test_failing_opener_leaves_connections_in_use_alone(opener):
396426
pool = AsyncNeo4jPool(

tests/unit/sync/io/test_neo4j_pool.py

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,6 @@
3535
)
3636

3737
from ...._async_compat import mark_sync_test
38-
from ..._async_compat import (
39-
mark_sync_test,
40-
Mock,
41-
)
4238
from ..work import fake_connection_generator
4339

4440

@@ -391,6 +387,40 @@ def liveness_side_effect(*args, **kwargs):
391387

392388

393389

390+
@mark_sync_test
391+
def test_multiple_broken_connections_on_close(opener, mocker):
392+
def mock_connection_breaks_on_close(cx):
393+
def close_side_effect():
394+
cx.closed.return_value = True
395+
cx.defunct.return_value = True
396+
pool.deactivate(READER_ADDRESS)
397+
398+
cx.attach_mock(mocker.Mock(side_effect=close_side_effect),
399+
"close")
400+
401+
# create pool with 2 idle connections
402+
pool = Neo4jPool(
403+
opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS
404+
)
405+
cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None)
406+
cx2 = pool.acquire(READ_ACCESS, 30, "test_db", None)
407+
pool.release(cx1)
408+
pool.release(cx2)
409+
410+
# both will loose connection
411+
mock_connection_breaks_on_close(cx1)
412+
mock_connection_breaks_on_close(cx2)
413+
414+
# force pool to close cx1, which will make it realize that the server is
415+
# unreachable
416+
cx1.stale.return_value = True
417+
418+
cx3 = pool.acquire(READ_ACCESS, 30, "test_db", None)
419+
420+
assert cx3 is not cx1
421+
assert cx3 is not cx2
422+
423+
394424
@mark_sync_test
395425
def test_failing_opener_leaves_connections_in_use_alone(opener):
396426
pool = Neo4jPool(

0 commit comments

Comments
 (0)