From 39b538fe388d53b2ec8ab431558ac46b6230c0a6 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Mon, 14 Mar 2022 13:30:41 +0000 Subject: [PATCH 01/19] zero copy transport implementation --- Lib/asyncio/selector_events.py | 18 ++++++------- Lib/test/test_asyncio/test_selector_events.py | 25 +++++++++++-------- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index d2ee49dd88f8cf..8f0d32f15f148a 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -749,8 +749,6 @@ class _SelectorTransport(transports._FlowControlMixin, max_size = 256 * 1024 # Buffer size passed to recv(). - _buffer_factory = bytearray # Constructs initial value for self._buffer. - # Attribute used in the destructor: it must be set even if the constructor # is not called (see _SelectorSslTransport which may start by raising an # exception) @@ -775,7 +773,7 @@ def __init__(self, loop, sock, protocol, extra=None, server=None): self.set_protocol(protocol) self._server = server - self._buffer = self._buffer_factory() + self._buffer = collections.deque() self._conn_lost = 0 # Set when call to connection_lost scheduled. self._closing = False # Set when close() called. if self._server is not None: @@ -879,7 +877,7 @@ def _call_connection_lost(self, exc): self._server = None def get_write_buffer_size(self): - return len(self._buffer) + return sum(map(len, self._buffer)) def _add_reader(self, fd, callback, *args): if self._closing: @@ -1039,6 +1037,7 @@ def write(self, data): raise RuntimeError('unable to write; sendfile is in progress') if not data: return + data = memoryview(data) if self._conn_lost: if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: @@ -1065,16 +1064,19 @@ def write(self, data): self._loop._add_writer(self._sock_fd, self._write_ready) # Add it to the buffer. - self._buffer.extend(data) + self._buffer.append(data) self._maybe_pause_protocol() def _write_ready(self): assert self._buffer, 'Data should not be empty' - if self._conn_lost: return try: - n = self._sock.send(self._buffer) + buffer = self._buffer.popleft() + n = self._sock.send(buffer) + if n != len(buffer): + # Not all data was written + self._buffer.appendleft(buffer[n:]) except (BlockingIOError, InterruptedError): pass except (SystemExit, KeyboardInterrupt): @@ -1086,8 +1088,6 @@ def _write_ready(self): if self._empty_waiter is not None: self._empty_waiter.set_exception(exc) else: - if n: - del self._buffer[:n] self._maybe_resume_protocol() # May append to buffer. if not self._buffer: self._loop._remove_writer(self._sock_fd) diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index 796037bcf59c49..ffa51c5ad4845d 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -1,5 +1,6 @@ """Tests for selector_events.py""" +import collections import sys import selectors import socket @@ -37,7 +38,10 @@ def _close_self_pipe(self): def list_to_buffer(l=()): - return bytearray().join(l) + buffer = collections.deque() + buffer.extend((memoryview(i) for i in l)) + return buffer + def close_transport(transport): @@ -664,14 +668,14 @@ def test_write_memoryview(self): def test_write_no_data(self): transport = self.socket_transport() - transport._buffer.extend(b'data') + transport._buffer.append(memoryview(b'data')) transport.write(b'') self.assertFalse(self.sock.send.called) self.assertEqual(list_to_buffer([b'data']), transport._buffer) def test_write_buffer(self): transport = self.socket_transport() - transport._buffer.extend(b'data1') + transport._buffer.append(b'data1') transport.write(b'data2') self.assertFalse(self.sock.send.called) self.assertEqual(list_to_buffer([b'data1', b'data2']), @@ -775,12 +779,12 @@ def test_write_ready(self): self.assertFalse(self.loop.writers) def test_write_ready_closing(self): - data = b'data' + data = memoryview(b'data') self.sock.send.return_value = len(data) transport = self.socket_transport() transport._closing = True - transport._buffer.extend(data) + transport._buffer.append(data) self.loop._add_writer(7, transport._write_ready) transport._write_ready() self.assertTrue(self.sock.send.called) @@ -795,11 +799,11 @@ def test_write_ready_no_data(self): self.assertRaises(AssertionError, transport._write_ready) def test_write_ready_partial(self): - data = b'data' + data = memoryview(b'data') self.sock.send.return_value = 2 transport = self.socket_transport() - transport._buffer.extend(data) + transport._buffer.append(data) self.loop._add_writer(7, transport._write_ready) transport._write_ready() self.loop.assert_writer(7, transport._write_ready) @@ -810,7 +814,7 @@ def test_write_ready_partial_none(self): self.sock.send.return_value = 0 transport = self.socket_transport() - transport._buffer.extend(data) + transport._buffer.append(data) self.loop._add_writer(7, transport._write_ready) transport._write_ready() self.loop.assert_writer(7, transport._write_ready) @@ -820,12 +824,13 @@ def test_write_ready_tryagain(self): self.sock.send.side_effect = BlockingIOError transport = self.socket_transport() - transport._buffer = list_to_buffer([b'data1', b'data2']) + buffer = list_to_buffer([b'data1', b'data2']) + transport._buffer = buffer self.loop._add_writer(7, transport._write_ready) transport._write_ready() self.loop.assert_writer(7, transport._write_ready) - self.assertEqual(list_to_buffer([b'data1data2']), transport._buffer) + self.assertEqual(buffer, transport._buffer) def test_write_ready_exception(self): err = self.sock.send.side_effect = OSError() From abd2dc31d2a04a525aca5534730aa0017cc17f04 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Tue, 15 Mar 2022 08:21:55 +0000 Subject: [PATCH 02/19] WIP sendmsg --- Lib/asyncio/selector_events.py | 55 ++++++++++++++++++- Lib/test/test_asyncio/test_selector_events.py | 40 +++++++++++++- 2 files changed, 92 insertions(+), 3 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 8f0d32f15f148a..1c8acd82a8773f 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -899,7 +899,10 @@ def __init__(self, loop, sock, protocol, waiter=None, self._eof = False self._paused = False self._empty_waiter = None - + if hasattr(socket.socket, 'sendmsg'): + self._write_ready = self._write_sendmsg + else: + self._write_ready = self._write_send # Disable the Nagle algorithm -- small writes will be # sent without waiting for the TCP ACK. This generally # decreases the latency (in some cases significantly.) @@ -1067,7 +1070,46 @@ def write(self, data): self._buffer.append(data) self._maybe_pause_protocol() - def _write_ready(self): + def _write_sendmsg(self): + assert self._buffer, 'Data should not be empty' + if self._conn_lost: + return + try: + n = self._sock.sendmsg(self._buffer) + self._adjust_leftover_buffer(n) + except (BlockingIOError, InterruptedError): + pass + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: + self._loop._remove_writer(self._sock_fd) + self._buffer.clear() + self._fatal_error(exc, 'Fatal write error on socket transport') + if self._empty_waiter is not None: + self._empty_waiter.set_exception(exc) + else: + self._maybe_resume_protocol() # May append to buffer. + if not self._buffer: + self._loop._remove_writer(self._sock_fd) + if self._empty_waiter is not None: + self._empty_waiter.set_result(None) + if self._closing: + self._call_connection_lost(None) + elif self._eof: + self._sock.shutdown(socket.SHUT_WR) + + def _adjust_leftover_buffer(self, n: int, /) -> None: + buffer = self._buffer + while n: + b = buffer.popleft() + b_len = len(b) + if b_len <= n: + n -= b_len + else: + buffer.appendleft(b[n:]) + break + + def _write_send(self): assert self._buffer, 'Data should not be empty' if self._conn_lost: return @@ -1105,6 +1147,15 @@ def write_eof(self): if not self._buffer: self._sock.shutdown(socket.SHUT_WR) + def writelines(self, list_of_data): + hasbuffer = len(self._buffer) + self._buffer.extend([memoryview(i) for i in list_of_data]) + if not hasbuffer: + # Optimization: try to send now + self._write_ready() + return + self._maybe_pause_protocol() + def can_write_eof(self): return True diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index ffa51c5ad4845d..84155bd3f7bbb9 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -497,9 +497,13 @@ def setUp(self): self.sock = mock.Mock(socket.socket) self.sock_fd = self.sock.fileno.return_value = 7 - def socket_transport(self, waiter=None): + def socket_transport(self, waiter=None, sendmsg=False): transport = _SelectorSocketTransport(self.loop, self.sock, self.protocol, waiter=waiter) + if sendmsg: + transport._write_ready = transport._write_sendmsg + else: + transport._write_ready = transport._write_send self.addCleanup(close_transport, transport) return transport @@ -733,6 +737,40 @@ def test_write_tryagain(self): self.loop.assert_writer(7, transport._write_ready) self.assertEqual(list_to_buffer([b'data']), transport._buffer) + def test_write_sendmsg_no_data(self): + self.sock.sendmsg = mock.Mock() + self.sock.sendmsg.return_value = 0 + transport = self.socket_transport(sendmsg=True) + transport._buffer.append(memoryview(b'data')) + transport.write(b'') + self.assertFalse(self.sock.sendmsg.called) + self.assertEqual(list_to_buffer([b'data']), transport._buffer) + + def test_write_sendmsg_full(self): + data = memoryview(b'data') + self.sock.sendmsg = mock.Mock() + self.sock.sendmsg.return_value = len(data) + + transport = self.socket_transport(sendmsg=True) + transport._buffer.append(data) + self.loop._add_writer(7, transport._write_ready) + transport._write_ready() + self.assertTrue(self.sock.sendmsg.called) + self.assertFalse(self.loop.writers) + + def test_write_sendmsg_partial(self): + data = memoryview(b'data') + self.sock.sendmsg = mock.Mock() + # Sent partial data + self.sock.sendmsg.return_value = len(data) // 2 + + transport = self.socket_transport(sendmsg=True) + transport._buffer.append(data) + self.loop._add_writer(7, transport._write_ready) + transport._write_ready() + self.assertTrue(self.sock.sendmsg.called) + self.assertTrue(self.loop.writers) + @mock.patch('asyncio.selector_events.logger') def test_write_exception(self, m_log): err = self.sock.send.side_effect = OSError() From 669b661a1aa5e404c9ddff5419643d9d4adbf5e1 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Tue, 15 Mar 2022 09:15:37 +0000 Subject: [PATCH 03/19] writelines implementation --- Lib/asyncio/selector_events.py | 13 +++++++------ Lib/test/test_asyncio/test_selector_events.py | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 1c8acd82a8773f..1a8e156dd1c152 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -1148,13 +1148,14 @@ def write_eof(self): self._sock.shutdown(socket.SHUT_WR) def writelines(self, list_of_data): - hasbuffer = len(self._buffer) - self._buffer.extend([memoryview(i) for i in list_of_data]) - if not hasbuffer: - # Optimization: try to send now - self._write_ready() + if self._eof: + raise RuntimeError('Cannot call writelines() after write_eof()') + if self._empty_waiter is not None: + raise RuntimeError('unable to writelines; sendfile is in progress') + if not list_of_data: return - self._maybe_pause_protocol() + self._buffer.extend([memoryview(i) for i in list_of_data]) + self._loop._add_writer(self._sock_fd, self._write_ready) def can_write_eof(self): return True diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index 84155bd3f7bbb9..fd28725e804189 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -810,7 +810,7 @@ def test_write_ready(self): self.sock.send.return_value = len(data) transport = self.socket_transport() - transport._buffer.extend(data) + transport._buffer.append(data) self.loop._add_writer(7, transport._write_ready) transport._write_ready() self.assertTrue(self.sock.send.called) From 0692952b2a1446c66836fffadead07e032cd1e7a Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Tue, 15 Mar 2022 13:22:18 +0000 Subject: [PATCH 04/19] use sysconf --- Lib/asyncio/selector_events.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 1a8e156dd1c152..3400b1227b1750 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -9,14 +9,12 @@ import collections import errno import functools +import itertools +import os import selectors import socket import warnings import weakref -try: - import ssl -except ImportError: # pragma: no cover - ssl = None from . import base_events from . import constants @@ -28,6 +26,10 @@ from . import trsock from .log import logger +HAVE_SENDMSG = hasattr(socket.socket, 'sendmsg') + +if HAVE_SENDMSG: + SC_IOV_MAX = os.sysconf('SC_IOV_MAX') def _test_selector_event(selector, fd, event): # Test if the selector is monitoring 'event' events @@ -899,7 +901,7 @@ def __init__(self, loop, sock, protocol, waiter=None, self._eof = False self._paused = False self._empty_waiter = None - if hasattr(socket.socket, 'sendmsg'): + if HAVE_SENDMSG: self._write_ready = self._write_sendmsg else: self._write_ready = self._write_send @@ -1070,12 +1072,15 @@ def write(self, data): self._buffer.append(data) self._maybe_pause_protocol() + def _get_sendmsg_buffer(self): + return itertools.islice(self._buffer, SC_IOV_MAX) + def _write_sendmsg(self): assert self._buffer, 'Data should not be empty' if self._conn_lost: return try: - n = self._sock.sendmsg(self._buffer) + n = self._sock.sendmsg(self._get_sendmsg_buffer()) self._adjust_leftover_buffer(n) except (BlockingIOError, InterruptedError): pass From 2725334aacf040bc44d2f93e6e8ce05712ba964e Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Tue, 15 Mar 2022 13:44:56 +0000 Subject: [PATCH 05/19] fix tests --- Lib/asyncio/selector_events.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 3400b1227b1750..4b36c194cb3c3c 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -15,6 +15,10 @@ import socket import warnings import weakref +try: + import ssl +except ImportError: # pragma: no cover + ssl = None from . import base_events from . import constants From bed096d0ec859c9a316525926cd9e75ccccaf0bd Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Mon, 24 Oct 2022 12:27:10 +0530 Subject: [PATCH 06/19] skip test if sendmsg does not exists --- Lib/test/test_asyncio/test_selector_events.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index fd28725e804189..b20e4bce17c27c 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -746,6 +746,7 @@ def test_write_sendmsg_no_data(self): self.assertFalse(self.sock.sendmsg.called) self.assertEqual(list_to_buffer([b'data']), transport._buffer) + @unittest.skipUnless(hasattr(socket.socket, 'sendmsg'), 'no sendmsg') def test_write_sendmsg_full(self): data = memoryview(b'data') self.sock.sendmsg = mock.Mock() @@ -758,7 +759,9 @@ def test_write_sendmsg_full(self): self.assertTrue(self.sock.sendmsg.called) self.assertFalse(self.loop.writers) + @unittest.skipUnless(hasattr(socket.socket, 'sendmsg'), 'no sendmsg') def test_write_sendmsg_partial(self): + data = memoryview(b'data') self.sock.sendmsg = mock.Mock() # Sent partial data From f090e8d9582a74a418d7cda7a7b6cd1743bb7155 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Mon, 24 Oct 2022 07:31:13 +0000 Subject: [PATCH 07/19] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20b?= =?UTF-8?q?lurb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2022-10-24-07-31-11.gh-issue-91166.-IG06R.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2022-10-24-07-31-11.gh-issue-91166.-IG06R.rst diff --git a/Misc/NEWS.d/next/Library/2022-10-24-07-31-11.gh-issue-91166.-IG06R.rst b/Misc/NEWS.d/next/Library/2022-10-24-07-31-11.gh-issue-91166.-IG06R.rst new file mode 100644 index 00000000000000..5ee08ec57843b5 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2022-10-24-07-31-11.gh-issue-91166.-IG06R.rst @@ -0,0 +1 @@ +:mod:`asyncio` is optimized to avoid excessive copying when writing to socket and use :meth:`~socket.socket.sendmsg` if the platform supports it. Patch by Kumar Aditya. From d6c77cd63ac0c541df5df1b9346d1ccb75ce40fd Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Mon, 24 Oct 2022 08:02:50 +0000 Subject: [PATCH 08/19] fix check on other platforms --- Lib/asyncio/selector_events.py | 6 +++++- Lib/test/test_asyncio/test_selector_events.py | 17 +++++++++-------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 4b36c194cb3c3c..b96cc64f7d1772 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -33,7 +33,11 @@ HAVE_SENDMSG = hasattr(socket.socket, 'sendmsg') if HAVE_SENDMSG: - SC_IOV_MAX = os.sysconf('SC_IOV_MAX') + try: + SC_IOV_MAX = os.sysconf('SC_IOV_MAX') + except OSError: + # Fallback to send + HAVE_SENDMSG = False def _test_selector_event(selector, fd, event): # Test if the selector is monitoring 'event' events diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index b20e4bce17c27c..8075312026342b 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -1,24 +1,25 @@ """Tests for selector_events.py""" import collections -import sys import selectors import socket +import sys import unittest +from asyncio import selector_events from unittest import mock + try: import ssl except ImportError: ssl = None import asyncio -from asyncio.selector_events import BaseSelectorEventLoop -from asyncio.selector_events import _SelectorTransport -from asyncio.selector_events import _SelectorSocketTransport -from asyncio.selector_events import _SelectorDatagramTransport +from asyncio.selector_events import (BaseSelectorEventLoop, + _SelectorDatagramTransport, + _SelectorSocketTransport, + _SelectorTransport) from test.test_asyncio import utils as test_utils - MOCK_ANY = mock.ANY @@ -746,7 +747,7 @@ def test_write_sendmsg_no_data(self): self.assertFalse(self.sock.sendmsg.called) self.assertEqual(list_to_buffer([b'data']), transport._buffer) - @unittest.skipUnless(hasattr(socket.socket, 'sendmsg'), 'no sendmsg') + @unittest.skipUnless(selector_events.HAVE_SENDMSG, 'no sendmsg') def test_write_sendmsg_full(self): data = memoryview(b'data') self.sock.sendmsg = mock.Mock() @@ -759,7 +760,7 @@ def test_write_sendmsg_full(self): self.assertTrue(self.sock.sendmsg.called) self.assertFalse(self.loop.writers) - @unittest.skipUnless(hasattr(socket.socket, 'sendmsg'), 'no sendmsg') + @unittest.skipUnless(selector_events.HAVE_SENDMSG, 'no sendmsg') def test_write_sendmsg_partial(self): data = memoryview(b'data') From f2ee40430a4aac7db7b76aa757d271b78bfdf2e6 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Mon, 28 Nov 2022 17:05:12 +0530 Subject: [PATCH 09/19] rename some vars --- Lib/asyncio/selector_events.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index b96cc64f7d1772..a905b4ead9649e 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -30,14 +30,14 @@ from . import trsock from .log import logger -HAVE_SENDMSG = hasattr(socket.socket, 'sendmsg') +_HAVE_SENDMSG = hasattr(socket.socket, 'sendmsg') -if HAVE_SENDMSG: +if _HAVE_SENDMSG: try: SC_IOV_MAX = os.sysconf('SC_IOV_MAX') except OSError: # Fallback to send - HAVE_SENDMSG = False + _HAVE_SENDMSG = False def _test_selector_event(selector, fd, event): # Test if the selector is monitoring 'event' events @@ -909,7 +909,7 @@ def __init__(self, loop, sock, protocol, waiter=None, self._eof = False self._paused = False self._empty_waiter = None - if HAVE_SENDMSG: + if _HAVE_SENDMSG: self._write_ready = self._write_sendmsg else: self._write_ready = self._write_send @@ -1088,8 +1088,8 @@ def _write_sendmsg(self): if self._conn_lost: return try: - n = self._sock.sendmsg(self._get_sendmsg_buffer()) - self._adjust_leftover_buffer(n) + nbytes = self._sock.sendmsg(self._get_sendmsg_buffer()) + self._adjust_leftover_buffer(nbytes) except (BlockingIOError, InterruptedError): pass except (SystemExit, KeyboardInterrupt): @@ -1111,15 +1111,15 @@ def _write_sendmsg(self): elif self._eof: self._sock.shutdown(socket.SHUT_WR) - def _adjust_leftover_buffer(self, n: int, /) -> None: + def _adjust_leftover_buffer(self, nbytes: int, /) -> None: buffer = self._buffer - while n: + while nbytes: b = buffer.popleft() b_len = len(b) - if b_len <= n: - n -= b_len + if b_len <= nbytes: + nbytes -= b_len else: - buffer.appendleft(b[n:]) + buffer.appendleft(b[nbytes:]) break def _write_send(self): From effab035d63528c21df5668995eec95b8e60176d Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Mon, 28 Nov 2022 17:07:19 +0530 Subject: [PATCH 10/19] _HAVE_SENDMSG -> _HAS_SENDMSG --- Lib/asyncio/selector_events.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index a905b4ead9649e..a8a3abdd0fd2bd 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -30,14 +30,14 @@ from . import trsock from .log import logger -_HAVE_SENDMSG = hasattr(socket.socket, 'sendmsg') +_HAS_SENDMSG = hasattr(socket.socket, 'sendmsg') -if _HAVE_SENDMSG: +if _HAS_SENDMSG: try: SC_IOV_MAX = os.sysconf('SC_IOV_MAX') except OSError: # Fallback to send - _HAVE_SENDMSG = False + _HAS_SENDMSG = False def _test_selector_event(selector, fd, event): # Test if the selector is monitoring 'event' events @@ -909,7 +909,7 @@ def __init__(self, loop, sock, protocol, waiter=None, self._eof = False self._paused = False self._empty_waiter = None - if _HAVE_SENDMSG: + if _HAS_SENDMSG: self._write_ready = self._write_sendmsg else: self._write_ready = self._write_send From e1e43621d4e7c4c483cb62b9f5cd82b6ee4d010d Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Mon, 28 Nov 2022 17:31:44 +0530 Subject: [PATCH 11/19] fix tests --- Lib/asyncio/selector_events.py | 5 +++-- Lib/test/test_asyncio/test_selector_events.py | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index a8a3abdd0fd2bd..2519f2fe3d5258 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -1111,7 +1111,7 @@ def _write_sendmsg(self): elif self._eof: self._sock.shutdown(socket.SHUT_WR) - def _adjust_leftover_buffer(self, nbytes: int, /) -> None: + def _adjust_leftover_buffer(self, nbytes: int) -> None: buffer = self._buffer while nbytes: b = buffer.popleft() @@ -1127,7 +1127,8 @@ def _write_send(self): if self._conn_lost: return try: - buffer = self._buffer.popleft() + buffer = bytearray().join(self._buffer) + self._buffer.clear() n = self._sock.send(buffer) if n != len(buffer): # Not all data was written diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index 8075312026342b..701e77ffd9c17c 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -747,7 +747,7 @@ def test_write_sendmsg_no_data(self): self.assertFalse(self.sock.sendmsg.called) self.assertEqual(list_to_buffer([b'data']), transport._buffer) - @unittest.skipUnless(selector_events.HAVE_SENDMSG, 'no sendmsg') + @unittest.skipUnless(selector_events._HAS_SENDMSG, 'no sendmsg') def test_write_sendmsg_full(self): data = memoryview(b'data') self.sock.sendmsg = mock.Mock() @@ -760,7 +760,7 @@ def test_write_sendmsg_full(self): self.assertTrue(self.sock.sendmsg.called) self.assertFalse(self.loop.writers) - @unittest.skipUnless(selector_events.HAVE_SENDMSG, 'no sendmsg') + @unittest.skipUnless(selector_events._HAS_SENDMSG, 'no sendmsg') def test_write_sendmsg_partial(self): data = memoryview(b'data') From bdb1bdabf67ece7533ef5874c3bcf9897876243f Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Mon, 28 Nov 2022 17:43:54 +0530 Subject: [PATCH 12/19] fix tests --- Lib/asyncio/selector_events.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 2519f2fe3d5258..702c6948903527 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -1127,8 +1127,7 @@ def _write_send(self): if self._conn_lost: return try: - buffer = bytearray().join(self._buffer) - self._buffer.clear() + buffer = self._buffer.popleft() n = self._sock.send(buffer) if n != len(buffer): # Not all data was written From d1fae6cdd3b3ba20dc5a2e2bae2f468844b8071d Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Mon, 28 Nov 2022 18:04:32 +0530 Subject: [PATCH 13/19] fix writelines and add comments --- Lib/asyncio/selector_events.py | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 702c6948903527..6fcf93c9f0f9a6 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -1080,16 +1080,16 @@ def write(self, data): self._buffer.append(data) self._maybe_pause_protocol() - def _get_sendmsg_buffer(self): - return itertools.islice(self._buffer, SC_IOV_MAX) + def _get_sendmsg_buffer(self, buffer: collections.deque): + return itertools.islice(buffer, SC_IOV_MAX) def _write_sendmsg(self): assert self._buffer, 'Data should not be empty' if self._conn_lost: return try: - nbytes = self._sock.sendmsg(self._get_sendmsg_buffer()) - self._adjust_leftover_buffer(nbytes) + nbytes = self._sock.sendmsg(self._get_sendmsg_buffer(self._buffer)) + self._adjust_leftover_buffer(self._buffer, nbytes) except (BlockingIOError, InterruptedError): pass except (SystemExit, KeyboardInterrupt): @@ -1111,8 +1111,7 @@ def _write_sendmsg(self): elif self._eof: self._sock.shutdown(socket.SHUT_WR) - def _adjust_leftover_buffer(self, nbytes: int) -> None: - buffer = self._buffer + def _adjust_leftover_buffer(self, buffer: collections.deque, nbytes: int) -> None: while nbytes: b = buffer.popleft() b_len = len(b) @@ -1160,15 +1159,19 @@ def write_eof(self): if not self._buffer: self._sock.shutdown(socket.SHUT_WR) - def writelines(self, list_of_data): - if self._eof: - raise RuntimeError('Cannot call writelines() after write_eof()') - if self._empty_waiter is not None: - raise RuntimeError('unable to writelines; sendfile is in progress') - if not list_of_data: - return - self._buffer.extend([memoryview(i) for i in list_of_data]) - self._loop._add_writer(self._sock_fd, self._write_ready) + if _HAS_SENDMSG: + # Use faster implementation with sendmsg() if available otherwise fallback + # to the default implementation of writelines in WriteTransport + def writelines(self, list_of_data): + if self._eof: + raise RuntimeError('Cannot call writelines() after write_eof()') + if self._empty_waiter is not None: + raise RuntimeError('unable to writelines; sendfile is in progress') + if not list_of_data: + return + self._buffer.extend([memoryview(i) for i in list_of_data]) + self._write_sendmsg() + def can_write_eof(self): return True From cd45016633258d51051ab5da675b4e279438e125 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Mon, 28 Nov 2022 18:10:22 +0530 Subject: [PATCH 14/19] optimize calling --- Lib/asyncio/selector_events.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 6fcf93c9f0f9a6..c3f3e26fe43b77 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -1080,16 +1080,16 @@ def write(self, data): self._buffer.append(data) self._maybe_pause_protocol() - def _get_sendmsg_buffer(self, buffer: collections.deque): - return itertools.islice(buffer, SC_IOV_MAX) + def _get_sendmsg_buffer(self): + return itertools.islice(self._buffer, SC_IOV_MAX) def _write_sendmsg(self): assert self._buffer, 'Data should not be empty' if self._conn_lost: return try: - nbytes = self._sock.sendmsg(self._get_sendmsg_buffer(self._buffer)) - self._adjust_leftover_buffer(self._buffer, nbytes) + nbytes = self._sock.sendmsg(self._get_sendmsg_buffer()) + self._adjust_leftover_buffer(nbytes) except (BlockingIOError, InterruptedError): pass except (SystemExit, KeyboardInterrupt): @@ -1111,7 +1111,8 @@ def _write_sendmsg(self): elif self._eof: self._sock.shutdown(socket.SHUT_WR) - def _adjust_leftover_buffer(self, buffer: collections.deque, nbytes: int) -> None: + def _adjust_leftover_buffer(self, nbytes: int) -> None: + buffer = self._buffer while nbytes: b = buffer.popleft() b_len = len(b) From 5b962f504ac92167eaf3db5cd2a192fac697ac6b Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Mon, 28 Nov 2022 18:14:03 +0530 Subject: [PATCH 15/19] use send if sendmsg does not exists in writelines --- Lib/asyncio/selector_events.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index c3f3e26fe43b77..3d17ec5b771b06 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -1160,19 +1160,15 @@ def write_eof(self): if not self._buffer: self._sock.shutdown(socket.SHUT_WR) - if _HAS_SENDMSG: - # Use faster implementation with sendmsg() if available otherwise fallback - # to the default implementation of writelines in WriteTransport - def writelines(self, list_of_data): - if self._eof: - raise RuntimeError('Cannot call writelines() after write_eof()') - if self._empty_waiter is not None: - raise RuntimeError('unable to writelines; sendfile is in progress') - if not list_of_data: - return - self._buffer.extend([memoryview(i) for i in list_of_data]) - self._write_sendmsg() - + def writelines(self, list_of_data): + if self._eof: + raise RuntimeError('Cannot call writelines() after write_eof()') + if self._empty_waiter is not None: + raise RuntimeError('unable to writelines; sendfile is in progress') + if not list_of_data: + return + self._buffer.extend([memoryview(i) for i in list_of_data]) + self._write_ready() def can_write_eof(self): return True From 152b74899c8c5442d1180b220903312f938ddd2a Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Fri, 2 Dec 2022 18:34:52 +0530 Subject: [PATCH 16/19] Update Lib/asyncio/selector_events.py Co-authored-by: Guido van Rossum --- Lib/asyncio/selector_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index c1dff58c1fa073..b9a2d20d671b1e 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -1175,7 +1175,7 @@ def writelines(self, list_of_data): raise RuntimeError('unable to writelines; sendfile is in progress') if not list_of_data: return - self._buffer.extend([memoryview(i) for i in list_of_data]) + self._buffer.extend([memoryview(data) for data in list_of_data]) self._write_ready() def can_write_eof(self): From 2c62bcb4bc9449937ad46288665ba36457beb681 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Tue, 13 Dec 2022 06:47:38 +0000 Subject: [PATCH 17/19] more tests --- Lib/asyncio/selector_events.py | 3 +- Lib/test/test_asyncio/test_selector_events.py | 32 ++++++++++++++++++- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index b9a2d20d671b1e..3fd01a3c9ff582 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -1058,7 +1058,6 @@ def write(self, data): raise RuntimeError('unable to write; sendfile is in progress') if not data: return - data = memoryview(data) if self._conn_lost: if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: @@ -1078,7 +1077,7 @@ def write(self, data): self._fatal_error(exc, 'Fatal write error on socket transport') return else: - data = data[n:] + data = memoryview(data)[n:] if not data: return # Not all was written; register write handler. diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index 54164f0397a5da..08524d0e46c906 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -766,7 +766,7 @@ def test_write_sendmsg_partial(self): data = memoryview(b'data') self.sock.sendmsg = mock.Mock() # Sent partial data - self.sock.sendmsg.return_value = len(data) // 2 + self.sock.sendmsg.return_value = 2 transport = self.socket_transport(sendmsg=True) transport._buffer.append(data) @@ -774,6 +774,36 @@ def test_write_sendmsg_partial(self): transport._write_ready() self.assertTrue(self.sock.sendmsg.called) self.assertTrue(self.loop.writers) + self.assertEqual(list_to_buffer([b'ta']), transport._buffer) + + @unittest.skipUnless(selector_events._HAS_SENDMSG, 'no sendmsg') + def test_write_sendmsg_half_buffer(self): + data = [memoryview(b'data1'), memoryview(b'data2')] + self.sock.sendmsg = mock.Mock() + # Sent partial data + self.sock.sendmsg.return_value = 2 + + transport = self.socket_transport(sendmsg=True) + transport._buffer.extend(data) + self.loop._add_writer(7, transport._write_ready) + transport._write_ready() + self.assertTrue(self.sock.sendmsg.called) + self.assertTrue(self.loop.writers) + self.assertEqual(list_to_buffer([b'ta1', b'data2']), transport._buffer) + + @unittest.skipUnless(selector_events._HAS_SENDMSG, 'no sendmsg') + def test_write_sendmsg_OSError(self): + data = memoryview(b'data') + self.sock.sendmsg = mock.Mock() + self.sock.sendmsg.side_effect = OSError + + transport = self.socket_transport(sendmsg=True) + transport._buffer.extend(data) + self.loop._add_writer(7, transport._write_ready) + transport._write_ready() + self.assertTrue(self.sock.sendmsg.called) + self.assertFalse(self.loop.writers) + self.assertEqual(list_to_buffer([]), transport._buffer) @mock.patch('asyncio.selector_events.logger') def test_write_exception(self, m_log): From 9b92cfff1184c2aa390a7e1be60e8bbf17b1eee4 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Tue, 13 Dec 2022 06:59:48 +0000 Subject: [PATCH 18/19] check fatal error --- Lib/test/test_asyncio/test_selector_events.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index 08524d0e46c906..5b6cd20eccc228 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -795,15 +795,19 @@ def test_write_sendmsg_half_buffer(self): def test_write_sendmsg_OSError(self): data = memoryview(b'data') self.sock.sendmsg = mock.Mock() - self.sock.sendmsg.side_effect = OSError + err = self.sock.sendmsg.side_effect = OSError() transport = self.socket_transport(sendmsg=True) + transport._fatal_error = mock.Mock() transport._buffer.extend(data) self.loop._add_writer(7, transport._write_ready) transport._write_ready() self.assertTrue(self.sock.sendmsg.called) self.assertFalse(self.loop.writers) self.assertEqual(list_to_buffer([]), transport._buffer) + transport._fatal_error.assert_called_with( + err, + 'Fatal write error on socket transport') @mock.patch('asyncio.selector_events.logger') def test_write_exception(self, m_log): From 2ca3571382cebeed65e4fa52098a01a5ac8927be Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Thu, 22 Dec 2022 13:23:25 +0000 Subject: [PATCH 19/19] code review --- Lib/test/test_asyncio/test_selector_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index 5b6cd20eccc228..921c98a2702d76 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -800,7 +800,7 @@ def test_write_sendmsg_OSError(self): transport = self.socket_transport(sendmsg=True) transport._fatal_error = mock.Mock() transport._buffer.extend(data) - self.loop._add_writer(7, transport._write_ready) + # Calls _fatal_error and clears the buffer transport._write_ready() self.assertTrue(self.sock.sendmsg.called) self.assertFalse(self.loop.writers)