Skip to content

Commit b043fbd

Browse files
fvanneeelprans
authored andcommitted
Support PgBouncer by sending only a single SYNC message per query
This is a simple implementation of PgBouncer support for asyncpg. It doesn't have any detection features, but at least changes asyncpg behavior in such a way that using PgBouncer is possible. This commit gets rid of the explicit SYNC after a parse/describe sequence and changes is to a FLUSH. This should work regardless of the setting of statement_cache_size and whether or not it's pgbouncer or a direct postgres connection. With this, PgBouncer is supported when setting statement_cache_size explicitly to 0.
1 parent c7c0007 commit b043fbd

File tree

3 files changed

+21
-7
lines changed

3 files changed

+21
-7
lines changed

asyncpg/exceptions/_base.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ def _make_constructor(cls, fields, query=None):
142142
purpose;
143143
144144
* if you have no option of avoiding the use of pgbouncer,
145-
then you must switch pgbouncer's pool_mode to "session".
145+
then you can set statement_cache_size to 0 when creating
146+
the asyncpg connection object.
146147
""")
147148

148149
dct['hint'] = hint

asyncpg/protocol/coreproto.pyx

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ cdef class CoreProtocol:
118118
self.result = apg_exc.InternalClientError(
119119
'unknown error in protocol implementation')
120120

121+
self._parse_msg_ready_for_query()
121122
self._push_result()
122123

123124
else:
@@ -187,19 +188,23 @@ cdef class CoreProtocol:
187188
elif mtype == b'T':
188189
# Row description
189190
self.result_row_desc = self.buffer.consume_message()
191+
self._push_result()
190192

191193
elif mtype == b'E':
192194
# ErrorResponse
193195
self._parse_msg_error_response(True)
194-
195-
elif mtype == b'Z':
196-
# ReadyForQuery
197-
self._parse_msg_ready_for_query()
198-
self._push_result()
196+
# we don't send a sync during the parse/describe sequence
197+
# but send a FLUSH instead. If an error happens we need to
198+
# send a SYNC explicitly in order to mark the end of the transaction.
199+
# this effectively clears the error and we then wait until we get a
200+
# ready for new query message
201+
self._write(SYNC_MESSAGE)
202+
self.state = PROTOCOL_ERROR_CONSUME
199203

200204
elif mtype == b'n':
201205
# NoData
202206
self.buffer.discard_message()
207+
self._push_result()
203208

204209
cdef _process__bind_execute(self, char mtype):
205210
if mtype == b'D':
@@ -853,7 +858,7 @@ cdef class CoreProtocol:
853858
buf.end_message()
854859
packet.write_buffer(buf)
855860

856-
packet.write_bytes(SYNC_MESSAGE)
861+
packet.write_bytes(FLUSH_MESSAGE)
857862

858863
self._write(packet)
859864

@@ -1028,3 +1033,4 @@ cdef class CoreProtocol:
10281033

10291034

10301035
cdef bytes SYNC_MESSAGE = bytes(WriteBuffer.new_message(b'S').end_message())
1036+
cdef bytes FLUSH_MESSAGE = bytes(WriteBuffer.new_message(b'H').end_message())

asyncpg/protocol/protocol.pyx

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,13 @@ cdef class BaseProtocol(CoreProtocol):
588588
})
589589
self.abort()
590590

591+
if self.state == PROTOCOL_PREPARE:
592+
# we need to send a SYNC to server if we cancel during the PREPARE phase
593+
# because the PREPARE sequence does not send a SYNC itself.
594+
# we cannot send this extra SYNC if we are not in PREPARE phase,
595+
# because then we would issue two SYNCs and we would get two ReadyForQuery
596+
# replies, which our current state machine implementation cannot handle
597+
self._write(SYNC_MESSAGE)
591598
self._set_state(PROTOCOL_CANCELLED)
592599

593600
def _on_timeout(self, fut):

0 commit comments

Comments
 (0)