Skip to content

Commit 4210cfd

Browse files
committed
Support PgBouncer by sending only a single SYNC message per query
This is a simple implementation of PgBouncer support for asyncpg. It doesn't have any detection features, but at least changes asyncpg behavior in such a way that using PgBouncer is possible. This commit gets rid of the explicit SYNC after a parse/describe sequence and changes is to a FLUSH. This should work regardless of the setting of statement_cache_size and whether or not it's pgbouncer or a direct postgres connection. With this, PgBouncer is supported when setting statement_cache_size explicitly to 0.
1 parent c7c0007 commit 4210cfd

File tree

3 files changed

+11
-7
lines changed

3 files changed

+11
-7
lines changed

asyncpg/exceptions/_base.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ def _make_constructor(cls, fields, query=None):
142142
purpose;
143143
144144
* if you have no option of avoiding the use of pgbouncer,
145-
then you must switch pgbouncer's pool_mode to "session".
145+
then you can set statement_cache_size to 0 when creating
146+
the asyncpg connection object.
146147
""")
147148

148149
dct['hint'] = hint

asyncpg/protocol/coreproto.pyx

+7-6
Original file line numberDiff line numberDiff line change
@@ -187,19 +187,19 @@ cdef class CoreProtocol:
187187
elif mtype == b'T':
188188
# Row description
189189
self.result_row_desc = self.buffer.consume_message()
190+
self._push_result()
190191

191192
elif mtype == b'E':
192193
# ErrorResponse
193194
self._parse_msg_error_response(True)
194-
195-
elif mtype == b'Z':
196-
# ReadyForQuery
197-
self._parse_msg_ready_for_query()
198-
self._push_result()
195+
# force explicit sync upon error
196+
self._write(SYNC_MESSAGE)
197+
self._set_state(PROTOCOL_CANCELLED)
199198

200199
elif mtype == b'n':
201200
# NoData
202201
self.buffer.discard_message()
202+
self._push_result()
203203

204204
cdef _process__bind_execute(self, char mtype):
205205
if mtype == b'D':
@@ -853,7 +853,7 @@ cdef class CoreProtocol:
853853
buf.end_message()
854854
packet.write_buffer(buf)
855855

856-
packet.write_bytes(SYNC_MESSAGE)
856+
packet.write_bytes(FLUSH_MESSAGE)
857857

858858
self._write(packet)
859859

@@ -1028,3 +1028,4 @@ cdef class CoreProtocol:
10281028

10291029

10301030
cdef bytes SYNC_MESSAGE = bytes(WriteBuffer.new_message(b'S').end_message())
1031+
cdef bytes FLUSH_MESSAGE = bytes(WriteBuffer.new_message(b'H').end_message())

asyncpg/protocol/protocol.pyx

+2
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,8 @@ cdef class BaseProtocol(CoreProtocol):
588588
})
589589
self.abort()
590590

591+
if self.state == PROTOCOL_PREPARE:
592+
self._write(SYNC_MESSAGE)
591593
self._set_state(PROTOCOL_CANCELLED)
592594

593595
def _on_timeout(self, fut):

0 commit comments

Comments
 (0)