Skip to content

Commit 7d9a95d

Browse files
committed
Add tests for pool when connection opener fails
1 parent 1ed96f9 commit 7d9a95d

File tree

4 files changed

+130
-40
lines changed

4 files changed

+130
-40
lines changed

neo4j/_async/io/_pool.py

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -241,31 +241,24 @@ async def deactivate(self, address):
241241
connections = self.connections[address]
242242
except KeyError: # already removed from the connection pool
243243
return
244-
for conn in list(connections):
245-
if not conn.in_use:
246-
connections.remove(conn)
247-
try:
248-
await conn.close()
249-
except OSError:
250-
pass
251-
if not connections:
252-
await self.remove(address)
244+
closable_connections = [
245+
conn for conn in connections if not conn.in_use
246+
]
247+
# First remove all connections in question, then try to close them.
248+
# If closing of a connection fails, we will end up in this method
249+
# again.
250+
for conn in closable_connections:
251+
connections.remove(conn)
252+
for conn in closable_connections:
253+
await conn.close()
254+
if not self.connections[address]:
255+
del self.connections[address]
253256

254257
def on_write_failure(self, address):
255258
raise WriteServiceUnavailable(
256259
"No write service available for pool {}".format(self)
257260
)
258261

259-
async def remove(self, address):
260-
""" Remove an address from the connection pool, if present, closing
261-
all connections to that address.
262-
"""
263-
async with self.lock:
264-
for connection in self.connections.pop(address, ()):
265-
try:
266-
await connection.close()
267-
except OSError:
268-
pass
269262

270263
async def close(self):
271264
""" Close all connections and empty the pool.
@@ -274,7 +267,8 @@ async def close(self):
274267
try:
275268
async with self.lock:
276269
for address in list(self.connections):
277-
await self.remove(address)
270+
for connection in self.connections.pop(address, ()):
271+
await connection.close()
278272
except TypeError:
279273
pass
280274

neo4j/_sync/io/_pool.py

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -241,31 +241,24 @@ def deactivate(self, address):
241241
connections = self.connections[address]
242242
except KeyError: # already removed from the connection pool
243243
return
244-
for conn in list(connections):
245-
if not conn.in_use:
246-
connections.remove(conn)
247-
try:
248-
conn.close()
249-
except OSError:
250-
pass
251-
if not connections:
252-
self.remove(address)
244+
closable_connections = [
245+
conn for conn in connections if not conn.in_use
246+
]
247+
# First remove all connections in question, then try to close them.
248+
# If closing of a connection fails, we will end up in this method
249+
# again.
250+
for conn in closable_connections:
251+
connections.remove(conn)
252+
for conn in closable_connections:
253+
conn.close()
254+
if not self.connections[address]:
255+
del self.connections[address]
253256

254257
def on_write_failure(self, address):
255258
raise WriteServiceUnavailable(
256259
"No write service available for pool {}".format(self)
257260
)
258261

259-
def remove(self, address):
260-
""" Remove an address from the connection pool, if present, closing
261-
all connections to that address.
262-
"""
263-
with self.lock:
264-
for connection in self.connections.pop(address, ()):
265-
try:
266-
connection.close()
267-
except OSError:
268-
pass
269262

270263
def close(self):
271264
""" Close all connections and empty the pool.
@@ -274,7 +267,8 @@ def close(self):
274267
try:
275268
with self.lock:
276269
for address in list(self.connections):
277-
self.remove(address)
270+
for connection in self.connections.pop(address, ()):
271+
connection.close()
278272
except TypeError:
279273
pass
280274

tests/unit/async_/io/test_neo4j_pool.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@
3535
)
3636

3737
from ...._async_compat import mark_async_test
38+
from ..._async_compat import (
39+
AsyncMock,
40+
mark_async_test,
41+
)
3842
from ..work import async_fake_connection_generator
3943

4044

@@ -384,3 +388,50 @@ def liveness_side_effect(*args, **kwargs):
384388
cx3.reset.assert_awaited_once()
385389
assert cx1 not in pool.connections[cx1.addr]
386390
assert cx3 in pool.connections[cx1.addr]
391+
392+
393+
394+
@mark_async_test
395+
async def test_multiple_broken_connections_on_close(opener):
396+
def mock_connection_breaks_on_close(cx):
397+
async def close_side_effect():
398+
cx.closed.return_value = True
399+
cx.defunct.return_value = True
400+
await pool.deactivate(READER_ADDRESS)
401+
402+
cx.attach_mock(AsyncMock(side_effect=close_side_effect), "close")
403+
404+
# create pool with 2 idle connections
405+
pool = AsyncNeo4jPool(
406+
opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS
407+
)
408+
cx1 = await pool.acquire(READ_ACCESS, 30, "test_db", None)
409+
cx2 = await pool.acquire(READ_ACCESS, 30, "test_db", None)
410+
await pool.release(cx1)
411+
await pool.release(cx2)
412+
413+
# both will loose connection
414+
mock_connection_breaks_on_close(cx1)
415+
mock_connection_breaks_on_close(cx2)
416+
417+
# force pool to close cx1, which will make it realize that the server is
418+
# unreachable
419+
cx1.stale.return_value = True
420+
421+
cx3 = await pool.acquire(READ_ACCESS, 30, "test_db", None)
422+
423+
assert cx3 is not cx1
424+
assert cx3 is not cx2
425+
426+
427+
@mark_async_test
428+
async def test_failing_opener_leaves_connections_in_use_alone(opener):
429+
pool = AsyncNeo4jPool(
430+
opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS
431+
)
432+
cx1 = await pool.acquire(READ_ACCESS, 30, "test_db", None)
433+
434+
opener.side_effect = ServiceUnavailable("Server overloaded")
435+
with pytest.raises((ServiceUnavailable, SessionExpired)):
436+
await pool.acquire(READ_ACCESS, 30, "test_db", None)
437+
assert not cx1.closed()

tests/unit/sync/io/test_neo4j_pool.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@
3535
)
3636

3737
from ...._async_compat import mark_sync_test
38+
from ..._async_compat import (
39+
mark_sync_test,
40+
Mock,
41+
)
3842
from ..work import fake_connection_generator
3943

4044

@@ -384,3 +388,50 @@ def liveness_side_effect(*args, **kwargs):
384388
cx3.reset.assert_called_once()
385389
assert cx1 not in pool.connections[cx1.addr]
386390
assert cx3 in pool.connections[cx1.addr]
391+
392+
393+
394+
@mark_sync_test
395+
def test_multiple_broken_connections_on_close(opener):
396+
def mock_connection_breaks_on_close(cx):
397+
def close_side_effect():
398+
cx.closed.return_value = True
399+
cx.defunct.return_value = True
400+
pool.deactivate(READER_ADDRESS)
401+
402+
cx.attach_mock(Mock(side_effect=close_side_effect), "close")
403+
404+
# create pool with 2 idle connections
405+
pool = Neo4jPool(
406+
opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS
407+
)
408+
cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None)
409+
cx2 = pool.acquire(READ_ACCESS, 30, "test_db", None)
410+
pool.release(cx1)
411+
pool.release(cx2)
412+
413+
# both will loose connection
414+
mock_connection_breaks_on_close(cx1)
415+
mock_connection_breaks_on_close(cx2)
416+
417+
# force pool to close cx1, which will make it realize that the server is
418+
# unreachable
419+
cx1.stale.return_value = True
420+
421+
cx3 = pool.acquire(READ_ACCESS, 30, "test_db", None)
422+
423+
assert cx3 is not cx1
424+
assert cx3 is not cx2
425+
426+
427+
@mark_sync_test
428+
def test_failing_opener_leaves_connections_in_use_alone(opener):
429+
pool = Neo4jPool(
430+
opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS
431+
)
432+
cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None)
433+
434+
opener.side_effect = ServiceUnavailable("Server overloaded")
435+
with pytest.raises((ServiceUnavailable, SessionExpired)):
436+
pool.acquire(READ_ACCESS, 30, "test_db", None)
437+
assert not cx1.closed()

0 commit comments

Comments
 (0)