Skip to content

Commit c9f5709

Browse files
authored
Merge pull request #733 from robsdedude/fix-pool-connection-ownership-4.4
[4.4] Fix pool connection ownership 4.4 Backport or #732 Fixing and testing two faulty corner-cases * When a connection fails, all idle connections to that host will be closed. If their closure fails, the same action is invoked recursively while being in the process of closing idle connections already. This leads to inconsistent data. * [4.4 only] When opening of a new connection fails, the pool used to forcefully close all connections to the host, including lent connections. Opening the doors wide for race conditions because connections are not thread safe.
2 parents 7ca9d90 + f2b9057 commit c9f5709

File tree

2 files changed

+63
-22
lines changed

2 files changed

+63
-22
lines changed

neo4j/io/__init__.py

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,7 @@ def time_remaining():
695695
try:
696696
connection = self.opener(address, timeout)
697697
except ServiceUnavailable:
698-
self.remove(address)
698+
self.deactivate(address)
699699
raise
700700
else:
701701
connection.pool = self
@@ -772,38 +772,34 @@ def deactivate(self, address):
772772
connections = self.connections[address]
773773
except KeyError: # already removed from the connection pool
774774
return
775-
for conn in list(connections):
776-
if not conn.in_use:
777-
connections.remove(conn)
778-
try:
779-
conn.close()
780-
except OSError:
781-
pass
782-
if not connections:
783-
self.remove(address)
775+
closable_connections = [
776+
conn for conn in connections if not conn.in_use
777+
]
778+
# First remove all connections in question, then try to close them.
779+
# If closing of a connection fails, we will end up in this method
780+
# again.
781+
for conn in closable_connections:
782+
connections.remove(conn)
783+
for conn in closable_connections:
784+
conn.close()
785+
if not self.connections[address]:
786+
del self.connections[address]
784787

785788
def on_write_failure(self, address):
786789
raise WriteServiceUnavailable("No write service available for pool {}".format(self))
787790

788-
def remove(self, address):
789-
""" Remove an address from the connection pool, if present, closing
790-
all connections to that address.
791-
"""
792-
with self.lock:
793-
for connection in self.connections.pop(address, ()):
794-
try:
795-
connection.close()
796-
except OSError:
797-
pass
798-
799791
def close(self):
800792
""" Close all connections and empty the pool.
801793
This method is thread safe.
802794
"""
803795
try:
804796
with self.lock:
805797
for address in list(self.connections):
806-
self.remove(address)
798+
for connection in self.connections.pop(address, ()):
799+
try:
800+
connection.close()
801+
except OSError:
802+
pass
807803
except TypeError:
808804
pass
809805

tests/unit/io/test_neo4j_pool.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@
3535
RoutingConfig,
3636
WorkspaceConfig
3737
)
38+
from neo4j.exceptions import (
39+
ServiceUnavailable,
40+
SessionExpired
41+
)
3842
from neo4j.io import Neo4jPool
3943

4044

@@ -226,3 +230,44 @@ def test_release_does_not_resets_defunct_connections(opener):
226230
cx1.defunct.assert_called_once()
227231
cx1.is_reset_mock.asset_not_called()
228232
cx1.reset.asset_not_called()
233+
234+
235+
def test_multiple_broken_connections_on_close(opener):
236+
def mock_connection_breaks_on_close(cx):
237+
def close_side_effect():
238+
cx.closed.return_value = True
239+
cx.defunct.return_value = True
240+
pool.deactivate(READER_ADDRESS)
241+
242+
cx.attach_mock(Mock(side_effect=close_side_effect), "close")
243+
244+
# create pool with 2 idle connections
245+
pool = Neo4jPool(opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS)
246+
cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None)
247+
cx2 = pool.acquire(READ_ACCESS, 30, "test_db", None)
248+
pool.release(cx1)
249+
pool.release(cx2)
250+
251+
# both will loose connection
252+
mock_connection_breaks_on_close(cx1)
253+
mock_connection_breaks_on_close(cx2)
254+
255+
# force pool to close cx1, which will make it realize that the server is
256+
# unreachable
257+
cx1.stale.return_value = True
258+
259+
cx3 = pool.acquire(READ_ACCESS, 30, "test_db", None)
260+
261+
assert cx3 is not cx1
262+
assert cx3 is not cx2
263+
264+
265+
def test_failing_opener_leaves_connections_in_use_alone(opener):
266+
pool = Neo4jPool(opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS)
267+
cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None)
268+
269+
opener.side_effect = ServiceUnavailable("Server overloaded")
270+
with pytest.raises((ServiceUnavailable, SessionExpired)):
271+
pool.acquire(READ_ACCESS, 30, "test_db", None)
272+
273+
assert not cx1.closed()

0 commit comments

Comments
 (0)