Skip to content

Commit b695e56

Browse files
committed
Don't remove broken connections multiple times from the pool
Fixes `ValueError: deque.remove(x): x not in deque` when multiple connections to the same server brake more or less simultaneously.
1 parent d618b7b commit b695e56

File tree

4 files changed

+86
-22
lines changed

4 files changed

+86
-22
lines changed

neo4j/_async/io/_pool.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -241,13 +241,16 @@ async def deactivate(self, address):
241241
connections = self.connections[address]
242242
except KeyError: # already removed from the connection pool
243243
return
244-
for conn in list(connections):
245-
if not conn.in_use:
246-
connections.remove(conn)
247-
try:
248-
await conn.close()
249-
except OSError:
250-
pass
244+
closable_connections = [
245+
conn for conn in connections if not conn.in_use
246+
]
247+
# First remove all connections in question, then try to close them.
248+
# If closing of a connection fails, we will end up in this method
249+
# again.
250+
for conn in closable_connections:
251+
connections.remove(conn)
252+
for conn in closable_connections:
253+
await conn.close()
251254
if not self.connections[address]:
252255
del self.connections[address]
253256

neo4j/_sync/io/_pool.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -241,13 +241,16 @@ def deactivate(self, address):
241241
connections = self.connections[address]
242242
except KeyError: # already removed from the connection pool
243243
return
244-
for conn in list(connections):
245-
if not conn.in_use:
246-
connections.remove(conn)
247-
try:
248-
conn.close()
249-
except OSError:
250-
pass
244+
closable_connections = [
245+
conn for conn in connections if not conn.in_use
246+
]
247+
# First remove all connections in question, then try to close them.
248+
# If closing of a connection fails, we will end up in this method
249+
# again.
250+
for conn in closable_connections:
251+
connections.remove(conn)
252+
for conn in closable_connections:
253+
conn.close()
251254
if not self.connections[address]:
252255
del self.connections[address]
253256

tests/unit/async_/io/test_neo4j_pool.py

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,6 @@
3535
)
3636

3737
from ...._async_compat import mark_async_test
38-
from ..._async_compat import (
39-
AsyncMock,
40-
mark_async_test,
41-
)
4238
from ..work import async_fake_connection_generator
4339

4440

@@ -390,6 +386,39 @@ def liveness_side_effect(*args, **kwargs):
390386
assert cx3 in pool.connections[cx1.addr]
391387

392388

389+
@mark_async_test
390+
async def test_multiple_broken_connections_on_close(opener, mocker):
391+
def mock_connection_breaks_on_close(cx):
392+
async def close_side_effect():
393+
cx.closed.return_value = True
394+
cx.defunct.return_value = True
395+
await pool.deactivate(READER_ADDRESS)
396+
397+
cx.attach_mock(mocker.AsyncMock(side_effect=close_side_effect),
398+
"close")
399+
400+
# create pool with 2 idle connections
401+
pool = AsyncNeo4jPool(
402+
opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS
403+
)
404+
cx1 = await pool.acquire(READ_ACCESS, 30, "test_db", None)
405+
cx2 = await pool.acquire(READ_ACCESS, 30, "test_db", None)
406+
await pool.release(cx1)
407+
await pool.release(cx2)
408+
409+
# both will loose connection
410+
mock_connection_breaks_on_close(cx1)
411+
mock_connection_breaks_on_close(cx2)
412+
413+
# force pool to close cx1, which will make it realize that the server is
414+
# unreachable
415+
cx1.stale.return_value = True
416+
417+
cx3 = await pool.acquire(READ_ACCESS, 30, "test_db", None)
418+
419+
assert cx3 is not cx1
420+
assert cx3 is not cx2
421+
393422

394423
@mark_async_test
395424
async def test_failing_opener_leaves_connections_in_use_alone(opener):

tests/unit/sync/io/test_neo4j_pool.py

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,6 @@
3535
)
3636

3737
from ...._async_compat import mark_sync_test
38-
from ..._async_compat import (
39-
mark_sync_test,
40-
Mock,
41-
)
4238
from ..work import fake_connection_generator
4339

4440

@@ -390,6 +386,39 @@ def liveness_side_effect(*args, **kwargs):
390386
assert cx3 in pool.connections[cx1.addr]
391387

392388

389+
@mark_sync_test
390+
def test_multiple_broken_connections_on_close(opener, mocker):
391+
def mock_connection_breaks_on_close(cx):
392+
def close_side_effect():
393+
cx.closed.return_value = True
394+
cx.defunct.return_value = True
395+
pool.deactivate(READER_ADDRESS)
396+
397+
cx.attach_mock(mocker.Mock(side_effect=close_side_effect),
398+
"close")
399+
400+
# create pool with 2 idle connections
401+
pool = Neo4jPool(
402+
opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS
403+
)
404+
cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None)
405+
cx2 = pool.acquire(READ_ACCESS, 30, "test_db", None)
406+
pool.release(cx1)
407+
pool.release(cx2)
408+
409+
# both will loose connection
410+
mock_connection_breaks_on_close(cx1)
411+
mock_connection_breaks_on_close(cx2)
412+
413+
# force pool to close cx1, which will make it realize that the server is
414+
# unreachable
415+
cx1.stale.return_value = True
416+
417+
cx3 = pool.acquire(READ_ACCESS, 30, "test_db", None)
418+
419+
assert cx3 is not cx1
420+
assert cx3 is not cx2
421+
393422

394423
@mark_sync_test
395424
def test_failing_opener_leaves_connections_in_use_alone(opener):

0 commit comments

Comments
 (0)