Skip to content

Commit c25eabb

Browse files
committed
pythongh-113538: Allow client connections to be closed
Give applications the option of more forcefully terminating client connections for asyncio servers. Useful when terminating a service and there is limited time to wait for clients to finish up their work.
1 parent 9f0111e commit c25eabb

File tree

8 files changed

+110
-12
lines changed

8 files changed

+110
-12
lines changed

Doc/library/asyncio-eventloop.rst

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1641,6 +1641,23 @@ Do not instantiate the :class:`Server` class directly.
16411641
coroutine to wait until the server is closed (and no more
16421642
connections are active).
16431643

1644+
.. method:: close_clients()
1645+
1646+
Close all existing incoming client connections.
1647+
1648+
Calls :meth:`Transport.close` on all associated transports.
1649+
1650+
.. versionadded:: 3.13
1651+
1652+
.. method:: abort_clients()
1653+
1654+
Close all existing incoming client connections immediately,
1655+
without waiting for pending operations to complete.
1656+
1657+
Calls :meth:`Transport.abort` on all associated transports.
1658+
1659+
.. versionadded:: 3.13
1660+
16441661
.. method:: get_loop()
16451662

16461663
Return the event loop associated with the server object.

Doc/whatsnew/3.13.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,10 @@ asyncio
168168
the Unix socket when the server is closed.
169169
(Contributed by Pierre Ossman in :gh:`111246`.)
170170

171+
* Add :meth:`asyncio.Server.close_clients` and
172+
:meth:`asyncio.Server.abort_clients` methods which allows to more
173+
forcefully close an asyncio server.
174+
171175
copy
172176
----
173177

Lib/asyncio/base_events.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
277277
ssl_handshake_timeout, ssl_shutdown_timeout=None):
278278
self._loop = loop
279279
self._sockets = sockets
280-
self._active_count = 0
280+
self._clients = set()
281281
self._waiters = []
282282
self._protocol_factory = protocol_factory
283283
self._backlog = backlog
@@ -290,14 +290,14 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
290290
def __repr__(self):
291291
return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
292292

293-
def _attach(self):
293+
def _attach(self, transport):
294294
assert self._sockets is not None
295-
self._active_count += 1
295+
self._clients.add(transport)
296296

297-
def _detach(self):
298-
assert self._active_count > 0
299-
self._active_count -= 1
300-
if self._active_count == 0 and self._sockets is None:
297+
def _detach(self, transport):
298+
assert transport in self._clients
299+
self._clients.remove(transport)
300+
if len(self._clients) == 0 and self._sockets is None:
301301
self._wakeup()
302302

303303
def _wakeup(self):
@@ -346,9 +346,17 @@ def close(self):
346346
self._serving_forever_fut.cancel()
347347
self._serving_forever_fut = None
348348

349-
if self._active_count == 0:
349+
if len(self._clients) == 0:
350350
self._wakeup()
351351

352+
def close_clients(self):
353+
for transport in self._clients.copy():
354+
transport.close()
355+
356+
def abort_clients(self):
357+
for transport in self._clients.copy():
358+
transport.abort()
359+
352360
async def start_serving(self):
353361
self._start_serving()
354362
# Skip one loop iteration so that all 'loop.add_reader'

Lib/asyncio/events.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,14 @@ def close(self):
173173
"""Stop serving. This leaves existing connections open."""
174174
raise NotImplementedError
175175

176+
def close_clients(self):
177+
"""Close all active connections."""
178+
raise NotImplementedError
179+
180+
def abort_clients(self):
181+
"""Close all active connections immediately."""
182+
raise NotImplementedError
183+
176184
def get_loop(self):
177185
"""Get the event loop the Server object is attached to."""
178186
raise NotImplementedError

Lib/asyncio/proactor_events.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def __init__(self, loop, sock, protocol, waiter=None,
6363
self._called_connection_lost = False
6464
self._eof_written = False
6565
if self._server is not None:
66-
self._server._attach()
66+
self._server._attach(self)
6767
self._loop.call_soon(self._protocol.connection_made, self)
6868
if waiter is not None:
6969
# only wake up the waiter when connection_made() has been called
@@ -167,7 +167,7 @@ def _call_connection_lost(self, exc):
167167
self._sock = None
168168
server = self._server
169169
if server is not None:
170-
server._detach()
170+
server._detach(self)
171171
self._server = None
172172
self._called_connection_lost = True
173173

Lib/asyncio/selector_events.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -787,7 +787,7 @@ def __init__(self, loop, sock, protocol, extra=None, server=None):
787787
self._paused = False # Set when pause_reading() called
788788

789789
if self._server is not None:
790-
self._server._attach()
790+
self._server._attach(self)
791791
loop._transports[self._sock_fd] = self
792792

793793
def __repr__(self):
@@ -902,7 +902,7 @@ def _call_connection_lost(self, exc):
902902
self._loop = None
903903
server = self._server
904904
if server is not None:
905-
server._detach()
905+
server._detach(self)
906906
self._server = None
907907

908908
def get_write_buffer_size(self):

Lib/test/test_asyncio/test_server.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,65 @@ async def serve(rd, wr):
187187
loop.call_soon(wr.close)
188188
await srv.wait_closed()
189189

190+
async def test_close_clients(self):
191+
async def serve(rd, wr):
192+
try:
193+
await rd.read()
194+
finally:
195+
wr.close()
196+
await wr.wait_closed()
197+
198+
srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0)
199+
self.addCleanup(srv.close)
200+
201+
addr = srv.sockets[0].getsockname()
202+
(rd, wr) = await asyncio.open_connection(addr[0], addr[1])
203+
self.addCleanup(wr.close)
204+
205+
task = asyncio.create_task(srv.wait_closed())
206+
await asyncio.sleep(0)
207+
self.assertFalse(task.done())
190208

209+
srv.close()
210+
srv.close_clients()
211+
await asyncio.sleep(0.1) # FIXME: flush call_soon()?
212+
self.assertTrue(task.done())
213+
214+
async def test_abort_clients(self):
215+
async def serve(rd, wr):
216+
nonlocal s_rd, s_wr
217+
s_rd = rd
218+
s_wr = wr
219+
await wr.wait_closed()
220+
221+
s_rd = s_wr = None
222+
srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0)
223+
self.addCleanup(srv.close)
224+
225+
addr = srv.sockets[0].getsockname()
226+
(c_rd, c_wr) = await asyncio.open_connection(addr[0], addr[1])
227+
self.addCleanup(c_wr.close)
228+
229+
# Make sure both sides are in a paused state
230+
while (s_wr.transport.get_write_buffer_size() == 0 or
231+
c_wr.transport.is_reading()):
232+
while s_wr.transport.get_write_buffer_size() == 0:
233+
s_wr.write(b'a' * 65536)
234+
await asyncio.sleep(0)
235+
await asyncio.sleep(0.1) # FIXME: More socket buffer space magically appears?
236+
237+
task = asyncio.create_task(srv.wait_closed())
238+
await asyncio.sleep(0)
239+
self.assertFalse(task.done())
240+
241+
# Sanity check
242+
self.assertNotEqual(s_wr.transport.get_write_buffer_size(), 0)
243+
self.assertFalse(c_wr.transport.is_reading())
244+
245+
srv.close()
246+
srv.abort_clients()
247+
await asyncio.sleep(0.1) # FIXME: flush call_soon()?
248+
self.assertTrue(task.done())
191249

192250

193251
# Test the various corner cases of Unix server socket removal
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Add :meth:`asyncio.Server.close_clients` and
2+
:meth:`asyncio.Server.abort_clients` methods which allows to more forcefully
3+
close an asyncio server.

0 commit comments

Comments
 (0)