Skip to content

Commit e6878cf

Browse files
committed
Invalidate statement cache on schema changes affecting statement result.
PostgreSQL will raise an exception when it detects that the result type of the query has changed from when the statement was prepared. This may happen, for example, after an ALTER TABLE or SET search_path. When this happens, and there is no transaction running, we can simply re-prepare the statement and try again. If the transaction _is_ running, this error will put it into an error state, and we have no choice but to raise an exception. The original error is somewhat cryptic, so we raise a custom InvalidCachedStatementError with the original server exception as context. In either case we clear the statement cache for this connection and all other connections of the pool this connection belongs to (if any). See #72 and #76 for discussion. Fixes: #72.
1 parent 703d9cf commit e6878cf

File tree

7 files changed

+215
-28
lines changed

7 files changed

+215
-28
lines changed

asyncpg/connection.py

Lines changed: 69 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,9 @@ async def _get_statement(self, query, timeout):
226226
if not state.closed:
227227
return state
228228

229+
return await self._prepare_statement(query, timeout, cache)
230+
231+
async def _prepare_statement(self, query, timeout, cache):
229232
protocol = self._protocol
230233
state = await protocol.prepare(None, query, timeout)
231234

@@ -283,10 +286,7 @@ async def fetch(self, query, *args, timeout=None) -> list:
283286
284287
:return list: A list of :class:`Record` instances.
285288
"""
286-
stmt = await self._get_statement(query, timeout)
287-
data = await self._protocol.bind_execute(stmt, args, '', 0,
288-
False, timeout)
289-
return data
289+
return await self._do_execute(query, args, 0, timeout)
290290

291291
async def fetchval(self, query, *args, column=0, timeout=None):
292292
"""Run a query and return a value in the first row.
@@ -302,9 +302,7 @@ async def fetchval(self, query, *args, column=0, timeout=None):
302302
303303
:return: The value of the specified column of the first record.
304304
"""
305-
stmt = await self._get_statement(query, timeout)
306-
data = await self._protocol.bind_execute(stmt, args, '', 1,
307-
False, timeout)
305+
data = await self._do_execute(query, args, 1, timeout)
308306
if not data:
309307
return None
310308
return data[0][column]
@@ -318,9 +316,7 @@ async def fetchrow(self, query, *args, timeout=None):
318316
319317
:return: The first row as a :class:`Record` instance.
320318
"""
321-
stmt = await self._get_statement(query, timeout)
322-
data = await self._protocol.bind_execute(stmt, args, '', 1,
323-
False, timeout)
319+
data = await self._do_execute(query, args, 1, timeout)
324320
if not data:
325321
return None
326322
return data[0]
@@ -551,6 +547,69 @@ def _set_proxy(self, proxy):
551547

552548
self._proxy = proxy
553549

550+
def _drop_local_statement_cache(self):
551+
self._stmt_cache.clear()
552+
553+
def _drop_global_statement_cache(self):
554+
if self._proxy is not None:
555+
# This connection is a member of a pool, so we delegate
556+
# the cache drop to the pool.
557+
pool = self._proxy._holder._pool
558+
pool._drop_statement_cache()
559+
else:
560+
self._drop_local_statement_cache()
561+
562+
async def _do_execute(self, query, args, limit, timeout):
563+
stmt = await self._get_statement(query, timeout)
564+
565+
try:
566+
data = await self._protocol.bind_execute(
567+
stmt, args, '', limit, False, timeout)
568+
569+
except exceptions.FeatureNotSupportedError as e:
570+
if e.server_source_function == 'RevalidateCachedQuery':
571+
# PostgreSQL will raise an exception when it detects
572+
# that the result type of the query has changed from
573+
# when the statement was prepared. This may happen,
574+
# for example, after an ALTER TABLE or SET search_path.
575+
#
576+
# When this happens, and there is no transaction running,
577+
# we can simply re-prepare the statement and try again.
578+
#
579+
# If the transaction _is_ running, this error will put it
580+
# into an error state, and we have no choice but to raise
581+
# an exception. The original error is somewhat cryptic,
582+
# so we raise a custom error with the server exception as
583+
# context.
584+
#
585+
# In either case we clear the statement cache for this
586+
# connection and all other connections of the pool this
587+
# connection belongs to (if any).
588+
#
589+
# Note that we specifically do not rely on the error
590+
# message, as it is localizable.
591+
#
592+
# See https://github.com/MagicStack/asyncpg/issues/72
593+
# and https://github.com/MagicStack/asyncpg/issues/76
594+
# for discussion.
595+
#
596+
self._drop_global_statement_cache()
597+
598+
if self._protocol.is_in_transaction():
599+
raise exceptions.InvalidCachedStatementError(
600+
'cached statement plan is invalid due to a database '
601+
'schema or configuration change'
602+
) from e
603+
else:
604+
stmt = await self._prepare_statement(
605+
query, timeout, True)
606+
data = await self._protocol.bind_execute(
607+
stmt, args, '', limit, False, timeout)
608+
else:
609+
raise
610+
611+
return data
612+
554613

555614
async def connect(dsn=None, *,
556615
host=None, port=None,

asyncpg/exceptions/_base.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010

1111
__all__ = ('PostgresError', 'FatalPostgresError', 'UnknownPostgresError',
12-
'InterfaceError')
12+
'InterfaceError', 'InvalidCachedStatementError')
1313

1414

1515
class PostgresMessageMeta(type):
@@ -105,5 +105,13 @@ class UnknownPostgresError(FatalPostgresError):
105105
"""An error with an unknown SQLSTATE code."""
106106

107107

108-
class InterfaceError(Exception):
108+
class AsyncpgError(Exception):
109+
"""An error raised by asyncpg, not PostgreSQL."""
110+
111+
112+
class InterfaceError(AsyncpgError):
109113
"""An error caused by improper use of asyncpg API."""
114+
115+
116+
class InvalidCachedStatementError(AsyncpgError):
117+
"""Cached statement is no longer valid."""

asyncpg/pool.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,12 @@ def _check_init(self):
404404
if self._closed:
405405
raise exceptions.InterfaceError('pool is closed')
406406

407+
def _drop_statement_cache(self):
408+
# Drop statement cache for all connections in the pool.
409+
for ch in self._holders:
410+
if ch._con is not None:
411+
ch._con._drop_local_statement_cache()
412+
407413
def __await__(self):
408414
return self._async__init__().__await__()
409415

asyncpg/prepared_stmt.py

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,7 @@ async def fetch(self, *args, timeout=None):
154154
155155
:return: A list of :class:`Record` instances.
156156
"""
157-
self.__check_open()
158-
protocol = self._connection._protocol
159-
data, status, _ = await protocol.bind_execute(
160-
self._state, args, '', 0, True, timeout)
161-
self._last_status = status
157+
data = await self.__bind_execute(args, 0, timeout)
162158
return data
163159

164160
async def fetchval(self, *args, column=0, timeout=None):
@@ -174,11 +170,7 @@ async def fetchval(self, *args, column=0, timeout=None):
174170
175171
:return: The value of the specified column of the first record.
176172
"""
177-
self.__check_open()
178-
protocol = self._connection._protocol
179-
data, status, _ = await protocol.bind_execute(
180-
self._state, args, '', 1, True, timeout)
181-
self._last_status = status
173+
data = await self.__bind_execute(args, 1, timeout)
182174
if not data:
183175
return None
184176
return data[0][column]
@@ -192,15 +184,32 @@ async def fetchrow(self, *args, timeout=None):
192184
193185
:return: The first row as a :class:`Record` instance.
194186
"""
195-
self.__check_open()
196-
protocol = self._connection._protocol
197-
data, status, _ = await protocol.bind_execute(
198-
self._state, args, '', 1, True, timeout)
199-
self._last_status = status
187+
data = await self.__bind_execute(args, 1, timeout)
200188
if not data:
201189
return None
202190
return data[0]
203191

192+
async def __bind_execute(self, args, limit, timeout):
193+
self.__check_open()
194+
protocol = self._connection._protocol
195+
196+
try:
197+
data, status, _ = await protocol.bind_execute(
198+
self._state, args, '', limit, True, timeout)
199+
except exceptions.FeatureNotSupportedError as e:
200+
if e.server_source_function == 'RevalidateCachedQuery':
201+
# Handle query plan rejection due to schema or config changes.
202+
# See comment in Connection._do_execute.
203+
raise exceptions.InvalidCachedStatementError(
204+
'prepared statement is invalid due to a database '
205+
'schema or configuration change'
206+
) from e
207+
else:
208+
raise
209+
210+
self._last_status = status
211+
return data
212+
204213
def __check_open(self):
205214
if self._state.closed:
206215
raise exceptions.InterfaceError('prepared statement is closed')

asyncpg/protocol/protocol.pyx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ cdef class BaseProtocol(CoreProtocol):
121121
return self.settings
122122

123123
def is_in_transaction(self):
124-
return self.xact_status == PQTRANS_INTRANS
124+
return self.xact_status in (PQTRANS_INTRANS, PQTRANS_INERROR)
125125

126126
async def prepare(self, stmt_name, query, timeout):
127127
if self.cancel_waiter is not None:

tests/test_cache_invalidation.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# Copyright (C) 2016-present the asyncpg authors and contributors
2+
# <see AUTHORS file>
3+
#
4+
# This module is part of asyncpg and is released under
5+
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
6+
7+
8+
import asyncpg
9+
from asyncpg import _testbase as tb
10+
11+
12+
class TestCacheInvalidation(tb.ConnectedTestCase):
13+
async def test_prepare_cache_invalidation_silent(self):
14+
await self.con.execute('CREATE TABLE tab1(a int, b int)')
15+
16+
try:
17+
await self.con.execute('INSERT INTO tab1 VALUES (1, 2)')
18+
result = await self.con.fetchrow('SELECT * FROM tab1')
19+
self.assertEqual(result, (1, 2))
20+
21+
await self.con.execute(
22+
'ALTER TABLE tab1 ALTER COLUMN b SET DATA TYPE text')
23+
24+
result = await self.con.fetchrow('SELECT * FROM tab1')
25+
self.assertEqual(result, (1, '2'))
26+
finally:
27+
await self.con.execute('DROP TABLE tab1')
28+
29+
async def test_prepare_cache_invalidation_in_transaction(self):
30+
await self.con.execute('CREATE TABLE tab1(a int, b int)')
31+
32+
try:
33+
await self.con.execute('INSERT INTO tab1 VALUES (1, 2)')
34+
result = await self.con.fetchrow('SELECT * FROM tab1')
35+
self.assertEqual(result, (1, 2))
36+
37+
await self.con.execute(
38+
'ALTER TABLE tab1 ALTER COLUMN b SET DATA TYPE text')
39+
40+
with self.assertRaisesRegex(asyncpg.InvalidCachedStatementError,
41+
'cached statement plan is invalid'):
42+
async with self.con.transaction():
43+
result = await self.con.fetchrow('SELECT * FROM tab1')
44+
45+
# This is now OK.
46+
result = await self.con.fetchrow('SELECT * FROM tab1')
47+
self.assertEqual(result, (1, '2'))
48+
finally:
49+
await self.con.execute('DROP TABLE tab1')
50+
51+
async def test_prepare_cache_invalidation_in_pool(self):
52+
pool = await self.create_pool(database='postgres',
53+
min_size=2, max_size=2)
54+
55+
con1 = await pool.acquire()
56+
con2 = await pool.acquire()
57+
58+
await self.con.execute('CREATE TABLE tab1(a int, b int)')
59+
60+
try:
61+
await self.con.execute('INSERT INTO tab1 VALUES (1, 2)')
62+
63+
result = await con1.fetchrow('SELECT * FROM tab1')
64+
self.assertEqual(result, (1, 2))
65+
66+
result = await con2.fetchrow('SELECT * FROM tab1')
67+
self.assertEqual(result, (1, 2))
68+
69+
await self.con.execute(
70+
'ALTER TABLE tab1 ALTER COLUMN b SET DATA TYPE text')
71+
72+
# con1 tries the same plan, will invalidate the cache
73+
# for the entire pool.
74+
result = await con1.fetchrow('SELECT * FROM tab1')
75+
self.assertEqual(result, (1, '2'))
76+
77+
async with con2.transaction():
78+
# This should work, as con1 should have invalidated
79+
# the plan cache.
80+
result = await con2.fetchrow('SELECT * FROM tab1')
81+
self.assertEqual(result, (1, '2'))
82+
83+
finally:
84+
await self.con.execute('DROP TABLE tab1')
85+
await pool.release(con2)
86+
await pool.release(con1)
87+
await pool.close()

tests/test_prepare.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,3 +406,21 @@ async def test_prepare_22_empty(self):
406406
result = await self.con.fetchrow('SELECT')
407407
self.assertEqual(result, ())
408408
self.assertEqual(repr(result), '<Record>')
409+
410+
async def test_prepare_statement_invalid(self):
411+
await self.con.execute('CREATE TABLE tab1(a int, b int)')
412+
413+
try:
414+
await self.con.execute('INSERT INTO tab1 VALUES (1, 2)')
415+
416+
stmt = await self.con.prepare('SELECT * FROM tab1')
417+
418+
await self.con.execute(
419+
'ALTER TABLE tab1 ALTER COLUMN b SET DATA TYPE text')
420+
421+
with self.assertRaisesRegex(asyncpg.InvalidCachedStatementError,
422+
'prepared statement is invalid'):
423+
await stmt.fetchrow()
424+
425+
finally:
426+
await self.con.execute('DROP TABLE tab1')

0 commit comments

Comments
 (0)