Skip to content

Change interaction with server to only send one SYNC message (Pgbouncer support) #493

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion asyncpg/exceptions/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ def _make_constructor(cls, fields, query=None):
purpose;

* if you have no option of avoiding the use of pgbouncer,
then you must switch pgbouncer's pool_mode to "session".
then you can set statement_cache_size to 0 when creating
the asyncpg connection object.
""")

dct['hint'] = hint
Expand Down
18 changes: 12 additions & 6 deletions asyncpg/protocol/coreproto.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ cdef class CoreProtocol:
self.result = apg_exc.InternalClientError(
'unknown error in protocol implementation')

self._parse_msg_ready_for_query()
self._push_result()

else:
Expand Down Expand Up @@ -187,19 +188,23 @@ cdef class CoreProtocol:
elif mtype == b'T':
# Row description
self.result_row_desc = self.buffer.consume_message()
self._push_result()

elif mtype == b'E':
# ErrorResponse
self._parse_msg_error_response(True)

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self._push_result()
# we don't send a sync during the parse/describe sequence
# but send a FLUSH instead. If an error happens we need to
# send a SYNC explicitly in order to mark the end of the transaction.
# this effectively clears the error and we then wait until we get a
# ready for new query message
self._write(SYNC_MESSAGE)
self.state = PROTOCOL_ERROR_CONSUME

elif mtype == b'n':
# NoData
self.buffer.discard_message()
self._push_result()

cdef _process__bind_execute(self, char mtype):
if mtype == b'D':
Expand Down Expand Up @@ -853,7 +858,7 @@ cdef class CoreProtocol:
buf.end_message()
packet.write_buffer(buf)

packet.write_bytes(SYNC_MESSAGE)
packet.write_bytes(FLUSH_MESSAGE)

self._write(packet)

Expand Down Expand Up @@ -1028,3 +1033,4 @@ cdef class CoreProtocol:


cdef bytes SYNC_MESSAGE = bytes(WriteBuffer.new_message(b'S').end_message())
cdef bytes FLUSH_MESSAGE = bytes(WriteBuffer.new_message(b'H').end_message())
7 changes: 7 additions & 0 deletions asyncpg/protocol/protocol.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,13 @@ cdef class BaseProtocol(CoreProtocol):
})
self.abort()

if self.state == PROTOCOL_PREPARE:
# we need to send a SYNC to server if we cancel during the PREPARE phase
# because the PREPARE sequence does not send a SYNC itself.
# we cannot send this extra SYNC if we are not in PREPARE phase,
# because then we would issue two SYNCs and we would get two ReadyForQuery
# replies, which our current state machine implementation cannot handle
self._write(SYNC_MESSAGE)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably safe to send Sync unconditionally here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not - some of the testcases fail when done unconditionally (I first had it like that). We get into a situation where we may end up in IDLE state due to the first sync received, and then receive another ReadyForQuery message - the current state machine goes into error state when that happens

Copy link
Contributor Author

@fvannee fvannee Nov 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that may actually have to do with existing code line 120 - on receiving a Z message it doesn't parse the state. Isn't there supposed to be a:
self._parse_msg_ready_for_query()
on line 120?

edit: ah no, i'm mixing some things up now. We have to send it conditionally, because in IDLE state we cannot receive a Z message (unless we change _read_server_messages as well to allow receiving Z message in IDLE state).
Still, I think line 120 needs this parse_msg_ready_for_query line, no?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that is suspicious. Server messages should always be read in full. I'd say it's OK to also receive any number of ReadyForQuery in the IDLE state. This should make it safe to send any number of Sync messages.

Copy link
Contributor Author

@fvannee fvannee Nov 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've experimented a bit with removing the if-statement and putting the ReadyForQuery handling in the state machine in IDLE state. However, it's giving me some warnings I'm not sure are good.
All test cases pass, but I see some extra lines like this after some test cases and I can't figure out exactly why it's not waiting for these.

Future exception was never retrieved
future: <Future finished exception=BrokenPipeError(32, 'Broken pipe')>
Traceback (most recent call last):
  File "/usr/lib64/python3.6/asyncio/selector_events.py", line 417, in _sock_sendall
    n = sock.send(data)
BrokenPipeError: [Errno 32] Broken pipe

I propose to keep this line as it is for now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Let's keep it, but please add a comment on why it's needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, done

self._set_state(PROTOCOL_CANCELLED)

def _on_timeout(self, fut):
Expand Down