Skip to content

Commit 07041fb

Browse files
authored
Merge pull request #732 from robsdedude/fix-pool-connection-ownership
Fix pool connection ownership. Fixing and testing two faulty corner-cases: * When a connection fails, all idle connections to that host will be closed. If their closure fails, the same action is invoked recursively while the pool is already in the process of closing idle connections. This leads to inconsistent data. * [5.0 test only] When opening a new connection fails, the pool used to forcefully close all connections to the host, including lent connections. This opened the door wide for race conditions, because connections are not thread-safe.
2 parents 1ed96f9 + b695e56 commit 07041fb

File tree

4 files changed

+122
-40
lines changed

4 files changed

+122
-40
lines changed

neo4j/_async/io/_pool.py

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -241,31 +241,24 @@ async def deactivate(self, address):
241241
connections = self.connections[address]
242242
except KeyError: # already removed from the connection pool
243243
return
244-
for conn in list(connections):
245-
if not conn.in_use:
246-
connections.remove(conn)
247-
try:
248-
await conn.close()
249-
except OSError:
250-
pass
251-
if not connections:
252-
await self.remove(address)
244+
closable_connections = [
245+
conn for conn in connections if not conn.in_use
246+
]
247+
# First remove all connections in question, then try to close them.
248+
# If closing of a connection fails, we will end up in this method
249+
# again.
250+
for conn in closable_connections:
251+
connections.remove(conn)
252+
for conn in closable_connections:
253+
await conn.close()
254+
if not self.connections[address]:
255+
del self.connections[address]
253256

254257
def on_write_failure(self, address):
255258
raise WriteServiceUnavailable(
256259
"No write service available for pool {}".format(self)
257260
)
258261

259-
async def remove(self, address):
260-
""" Remove an address from the connection pool, if present, closing
261-
all connections to that address.
262-
"""
263-
async with self.lock:
264-
for connection in self.connections.pop(address, ()):
265-
try:
266-
await connection.close()
267-
except OSError:
268-
pass
269262

270263
async def close(self):
271264
""" Close all connections and empty the pool.
@@ -274,7 +267,8 @@ async def close(self):
274267
try:
275268
async with self.lock:
276269
for address in list(self.connections):
277-
await self.remove(address)
270+
for connection in self.connections.pop(address, ()):
271+
await connection.close()
278272
except TypeError:
279273
pass
280274

neo4j/_sync/io/_pool.py

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -241,31 +241,24 @@ def deactivate(self, address):
241241
connections = self.connections[address]
242242
except KeyError: # already removed from the connection pool
243243
return
244-
for conn in list(connections):
245-
if not conn.in_use:
246-
connections.remove(conn)
247-
try:
248-
conn.close()
249-
except OSError:
250-
pass
251-
if not connections:
252-
self.remove(address)
244+
closable_connections = [
245+
conn for conn in connections if not conn.in_use
246+
]
247+
# First remove all connections in question, then try to close them.
248+
# If closing of a connection fails, we will end up in this method
249+
# again.
250+
for conn in closable_connections:
251+
connections.remove(conn)
252+
for conn in closable_connections:
253+
conn.close()
254+
if not self.connections[address]:
255+
del self.connections[address]
253256

254257
def on_write_failure(self, address):
255258
raise WriteServiceUnavailable(
256259
"No write service available for pool {}".format(self)
257260
)
258261

259-
def remove(self, address):
260-
""" Remove an address from the connection pool, if present, closing
261-
all connections to that address.
262-
"""
263-
with self.lock:
264-
for connection in self.connections.pop(address, ()):
265-
try:
266-
connection.close()
267-
except OSError:
268-
pass
269262

270263
def close(self):
271264
""" Close all connections and empty the pool.
@@ -274,7 +267,8 @@ def close(self):
274267
try:
275268
with self.lock:
276269
for address in list(self.connections):
277-
self.remove(address)
270+
for connection in self.connections.pop(address, ()):
271+
connection.close()
278272
except TypeError:
279273
pass
280274

tests/unit/async_/io/test_neo4j_pool.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,3 +384,50 @@ def liveness_side_effect(*args, **kwargs):
384384
cx3.reset.assert_awaited_once()
385385
assert cx1 not in pool.connections[cx1.addr]
386386
assert cx3 in pool.connections[cx1.addr]
387+
388+
389+
@mark_async_test
390+
async def test_multiple_broken_connections_on_close(opener, mocker):
391+
def mock_connection_breaks_on_close(cx):
392+
async def close_side_effect():
393+
cx.closed.return_value = True
394+
cx.defunct.return_value = True
395+
await pool.deactivate(READER_ADDRESS)
396+
397+
cx.attach_mock(mocker.AsyncMock(side_effect=close_side_effect),
398+
"close")
399+
400+
# create pool with 2 idle connections
401+
pool = AsyncNeo4jPool(
402+
opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS
403+
)
404+
cx1 = await pool.acquire(READ_ACCESS, 30, "test_db", None)
405+
cx2 = await pool.acquire(READ_ACCESS, 30, "test_db", None)
406+
await pool.release(cx1)
407+
await pool.release(cx2)
408+
409+
# both will lose connection
410+
mock_connection_breaks_on_close(cx1)
411+
mock_connection_breaks_on_close(cx2)
412+
413+
# force pool to close cx1, which will make it realize that the server is
414+
# unreachable
415+
cx1.stale.return_value = True
416+
417+
cx3 = await pool.acquire(READ_ACCESS, 30, "test_db", None)
418+
419+
assert cx3 is not cx1
420+
assert cx3 is not cx2
421+
422+
423+
@mark_async_test
424+
async def test_failing_opener_leaves_connections_in_use_alone(opener):
425+
pool = AsyncNeo4jPool(
426+
opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS
427+
)
428+
cx1 = await pool.acquire(READ_ACCESS, 30, "test_db", None)
429+
430+
opener.side_effect = ServiceUnavailable("Server overloaded")
431+
with pytest.raises((ServiceUnavailable, SessionExpired)):
432+
await pool.acquire(READ_ACCESS, 30, "test_db", None)
433+
assert not cx1.closed()

tests/unit/sync/io/test_neo4j_pool.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,3 +384,50 @@ def liveness_side_effect(*args, **kwargs):
384384
cx3.reset.assert_called_once()
385385
assert cx1 not in pool.connections[cx1.addr]
386386
assert cx3 in pool.connections[cx1.addr]
387+
388+
389+
@mark_sync_test
390+
def test_multiple_broken_connections_on_close(opener, mocker):
391+
def mock_connection_breaks_on_close(cx):
392+
def close_side_effect():
393+
cx.closed.return_value = True
394+
cx.defunct.return_value = True
395+
pool.deactivate(READER_ADDRESS)
396+
397+
cx.attach_mock(mocker.Mock(side_effect=close_side_effect),
398+
"close")
399+
400+
# create pool with 2 idle connections
401+
pool = Neo4jPool(
402+
opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS
403+
)
404+
cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None)
405+
cx2 = pool.acquire(READ_ACCESS, 30, "test_db", None)
406+
pool.release(cx1)
407+
pool.release(cx2)
408+
409+
# both will lose connection
410+
mock_connection_breaks_on_close(cx1)
411+
mock_connection_breaks_on_close(cx2)
412+
413+
# force pool to close cx1, which will make it realize that the server is
414+
# unreachable
415+
cx1.stale.return_value = True
416+
417+
cx3 = pool.acquire(READ_ACCESS, 30, "test_db", None)
418+
419+
assert cx3 is not cx1
420+
assert cx3 is not cx2
421+
422+
423+
@mark_sync_test
424+
def test_failing_opener_leaves_connections_in_use_alone(opener):
425+
pool = Neo4jPool(
426+
opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS
427+
)
428+
cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None)
429+
430+
opener.side_effect = ServiceUnavailable("Server overloaded")
431+
with pytest.raises((ServiceUnavailable, SessionExpired)):
432+
pool.acquire(READ_ACCESS, 30, "test_db", None)
433+
assert not cx1.closed()

0 commit comments

Comments
 (0)