Skip to content

Implement support for multirange types #851

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion asyncpg/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ def __init__(self, protocol, transport, loop,
self._server_caps = _detect_server_capabilities(
self._server_version, settings)

self._intro_query = introspection.INTRO_LOOKUP_TYPES
if self._server_version < (14, 0):
self._intro_query = introspection.INTRO_LOOKUP_TYPES_13
else:
self._intro_query = introspection.INTRO_LOOKUP_TYPES

self._reset_query = None
self._proxy = None
Expand Down
125 changes: 124 additions & 1 deletion asyncpg/introspection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0


_TYPEINFO = '''\
_TYPEINFO_13 = '''\
(
SELECT
t.oid AS oid,
Expand Down Expand Up @@ -82,6 +82,129 @@
'''


INTRO_LOOKUP_TYPES_13 = '''\
WITH RECURSIVE typeinfo_tree(
oid, ns, name, kind, basetype, elemtype, elemdelim,
range_subtype, attrtypoids, attrnames, depth)
AS (
SELECT
ti.oid, ti.ns, ti.name, ti.kind, ti.basetype,
ti.elemtype, ti.elemdelim, ti.range_subtype,
ti.attrtypoids, ti.attrnames, 0
FROM
{typeinfo} AS ti
WHERE
ti.oid = any($1::oid[])

UNION ALL

SELECT
ti.oid, ti.ns, ti.name, ti.kind, ti.basetype,
ti.elemtype, ti.elemdelim, ti.range_subtype,
ti.attrtypoids, ti.attrnames, tt.depth + 1
FROM
{typeinfo} ti,
typeinfo_tree tt
WHERE
(tt.elemtype IS NOT NULL AND ti.oid = tt.elemtype)
OR (tt.attrtypoids IS NOT NULL AND ti.oid = any(tt.attrtypoids))
OR (tt.range_subtype IS NOT NULL AND ti.oid = tt.range_subtype)
)

SELECT DISTINCT
*,
basetype::regtype::text AS basetype_name,
elemtype::regtype::text AS elemtype_name,
range_subtype::regtype::text AS range_subtype_name
FROM
typeinfo_tree
ORDER BY
depth DESC
'''.format(typeinfo=_TYPEINFO_13)


_TYPEINFO = '''\
(
SELECT
t.oid AS oid,
ns.nspname AS ns,
t.typname AS name,
t.typtype AS kind,
(CASE WHEN t.typtype = 'd' THEN
(WITH RECURSIVE typebases(oid, depth) AS (
SELECT
t2.typbasetype AS oid,
0 AS depth
FROM
pg_type t2
WHERE
t2.oid = t.oid

UNION ALL

SELECT
t2.typbasetype AS oid,
tb.depth + 1 AS depth
FROM
pg_type t2,
typebases tb
WHERE
tb.oid = t2.oid
AND t2.typbasetype != 0
) SELECT oid FROM typebases ORDER BY depth DESC LIMIT 1)

ELSE NULL
END) AS basetype,
t.typelem AS elemtype,
elem_t.typdelim AS elemdelim,
COALESCE(
range_t.rngsubtype,
multirange_t.rngsubtype) AS range_subtype,
(CASE WHEN t.typtype = 'c' THEN
(SELECT
array_agg(ia.atttypid ORDER BY ia.attnum)
FROM
pg_attribute ia
INNER JOIN pg_class c
ON (ia.attrelid = c.oid)
WHERE
ia.attnum > 0 AND NOT ia.attisdropped
AND c.reltype = t.oid)

ELSE NULL
END) AS attrtypoids,
(CASE WHEN t.typtype = 'c' THEN
(SELECT
array_agg(ia.attname::text ORDER BY ia.attnum)
FROM
pg_attribute ia
INNER JOIN pg_class c
ON (ia.attrelid = c.oid)
WHERE
ia.attnum > 0 AND NOT ia.attisdropped
AND c.reltype = t.oid)

ELSE NULL
END) AS attrnames
FROM
pg_catalog.pg_type AS t
INNER JOIN pg_catalog.pg_namespace ns ON (
ns.oid = t.typnamespace)
LEFT JOIN pg_type elem_t ON (
t.typlen = -1 AND
t.typelem != 0 AND
t.typelem = elem_t.oid
)
LEFT JOIN pg_range range_t ON (
t.oid = range_t.rngtypid
)
LEFT JOIN pg_range multirange_t ON (
t.oid = multirange_t.rngmultitypid
)
)
'''


INTRO_LOOKUP_TYPES = '''\
WITH RECURSIVE typeinfo_tree(
oid, ns, name, kind, basetype, elemtype, elemdelim,
Expand Down
12 changes: 0 additions & 12 deletions asyncpg/protocol/codecs/array.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -858,19 +858,7 @@ cdef arraytext_decode(ConnectionSettings settings, FRBuffer *buf):
return array_decode(settings, buf, <decode_func_ex>&text_decode_ex, NULL)


cdef anyarray_decode(ConnectionSettings settings, FRBuffer *buf):
# Instances of anyarray (or any other polymorphic pseudotype) are
# never supposed to be returned from actual queries.
raise exceptions.ProtocolError(
'unexpected instance of \'anyarray\' type')


cdef init_array_codecs():
register_core_codec(ANYARRAYOID,
NULL,
<decode_func>&anyarray_decode,
PG_FORMAT_BINARY)

# oid[] and text[] are registered as core codecs
# to make type introspection query work
#
Expand Down
24 changes: 18 additions & 6 deletions asyncpg/protocol/codecs/base.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ ctypedef object (*codec_decode_func)(Codec codec,


cdef enum CodecType:
CODEC_UNDEFINED = 0
CODEC_C = 1
CODEC_PY = 2
CODEC_ARRAY = 3
CODEC_COMPOSITE = 4
CODEC_RANGE = 5
CODEC_UNDEFINED = 0
CODEC_C = 1
CODEC_PY = 2
CODEC_ARRAY = 3
CODEC_COMPOSITE = 4
CODEC_RANGE = 5
CODEC_MULTIRANGE = 6


cdef enum ServerDataFormat:
Expand Down Expand Up @@ -95,6 +96,9 @@ cdef class Codec:
cdef encode_range(self, ConnectionSettings settings, WriteBuffer buf,
object obj)

cdef encode_multirange(self, ConnectionSettings settings, WriteBuffer buf,
object obj)

cdef encode_composite(self, ConnectionSettings settings, WriteBuffer buf,
object obj)

Expand All @@ -109,6 +113,8 @@ cdef class Codec:

cdef decode_range(self, ConnectionSettings settings, FRBuffer *buf)

cdef decode_multirange(self, ConnectionSettings settings, FRBuffer *buf)

cdef decode_composite(self, ConnectionSettings settings, FRBuffer *buf)

cdef decode_in_python(self, ConnectionSettings settings, FRBuffer *buf)
Expand Down Expand Up @@ -139,6 +145,12 @@ cdef class Codec:
str schema,
Codec element_codec)

@staticmethod
cdef Codec new_multirange_codec(uint32_t oid,
str name,
str schema,
Codec element_codec)

@staticmethod
cdef Codec new_composite_codec(uint32_t oid,
str name,
Expand Down
56 changes: 54 additions & 2 deletions asyncpg/protocol/codecs/base.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ cdef class Codec:
'range types is not supported'.format(schema, name))
self.encoder = <codec_encode_func>&self.encode_range
self.decoder = <codec_decode_func>&self.decode_range
elif type == CODEC_MULTIRANGE:
if format != PG_FORMAT_BINARY:
raise exceptions.UnsupportedClientFeatureError(
'cannot decode type "{}"."{}": text encoding of '
'range types is not supported'.format(schema, name))
self.encoder = <codec_encode_func>&self.encode_multirange
self.decoder = <codec_decode_func>&self.decode_multirange
elif type == CODEC_COMPOSITE:
if format != PG_FORMAT_BINARY:
raise exceptions.UnsupportedClientFeatureError(
Expand Down Expand Up @@ -122,6 +129,12 @@ cdef class Codec:
codec_encode_func_ex,
<void*>(<cpython.PyObject>self.element_codec))

cdef encode_multirange(self, ConnectionSettings settings, WriteBuffer buf,
object obj):
multirange_encode(settings, buf, obj, self.element_codec.oid,
codec_encode_func_ex,
<void*>(<cpython.PyObject>self.element_codec))

cdef encode_composite(self, ConnectionSettings settings, WriteBuffer buf,
object obj):
cdef:
Expand Down Expand Up @@ -209,6 +222,10 @@ cdef class Codec:
return range_decode(settings, buf, codec_decode_func_ex,
<void*>(<cpython.PyObject>self.element_codec))

cdef decode_multirange(self, ConnectionSettings settings, FRBuffer *buf):
return multirange_decode(settings, buf, codec_decode_func_ex,
<void*>(<cpython.PyObject>self.element_codec))

cdef decode_composite(self, ConnectionSettings settings,
FRBuffer *buf):
cdef:
Expand Down Expand Up @@ -294,7 +311,11 @@ cdef class Codec:
if self.c_encoder is not NULL or self.py_encoder is not None:
return True

elif self.type == CODEC_ARRAY or self.type == CODEC_RANGE:
elif (
self.type == CODEC_ARRAY
or self.type == CODEC_RANGE
or self.type == CODEC_MULTIRANGE
):
return self.element_codec.has_encoder()

elif self.type == CODEC_COMPOSITE:
Expand All @@ -312,7 +333,11 @@ cdef class Codec:
if self.c_decoder is not NULL or self.py_decoder is not None:
return True

elif self.type == CODEC_ARRAY or self.type == CODEC_RANGE:
elif (
self.type == CODEC_ARRAY
or self.type == CODEC_RANGE
or self.type == CODEC_MULTIRANGE
):
return self.element_codec.has_decoder()

elif self.type == CODEC_COMPOSITE:
Expand Down Expand Up @@ -358,6 +383,18 @@ cdef class Codec:
None, None, None, 0)
return codec

@staticmethod
cdef Codec new_multirange_codec(uint32_t oid,
str name,
str schema,
Codec element_codec):
cdef Codec codec
codec = Codec(oid)
codec.init(name, schema, 'multirange', CODEC_MULTIRANGE,
element_codec.format, PG_XFORMAT_OBJECT, NULL, NULL,
None, None, element_codec, None, None, None, 0)
return codec

@staticmethod
cdef Codec new_composite_codec(uint32_t oid,
str name,
Expand Down Expand Up @@ -536,6 +573,21 @@ cdef class DataCodecConfig:
self._derived_type_codecs[oid, elem_codec.format] = \
Codec.new_range_codec(oid, name, schema, elem_codec)

elif ti['kind'] == b'm':
# Multirange type

if not range_subtype_oid:
raise exceptions.InternalClientError(
f'type record missing base type for multirange {oid}')

elem_codec = self.get_codec(range_subtype_oid, PG_FORMAT_ANY)
if elem_codec is None:
elem_codec = self.declare_fallback_codec(
range_subtype_oid, ti['range_subtype_name'], schema)

self._derived_type_codecs[oid, elem_codec.format] = \
Codec.new_multirange_codec(oid, name, schema, elem_codec)

elif ti['kind'] == b'e':
# Enum types are essentially text
self._set_builtin_type_codec(oid, name, schema, 'scalar',
Expand Down
18 changes: 16 additions & 2 deletions asyncpg/protocol/codecs/pgproto.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,9 @@ cdef init_pseudo_codecs():
FDW_HANDLEROID, TSM_HANDLEROID, INTERNALOID, OPAQUEOID,
ANYELEMENTOID, ANYNONARRAYOID, ANYCOMPATIBLEOID,
ANYCOMPATIBLEARRAYOID, ANYCOMPATIBLENONARRAYOID,
ANYCOMPATIBLERANGEOID, PG_DDL_COMMANDOID, INDEX_AM_HANDLEROID,
TABLE_AM_HANDLEROID,
ANYCOMPATIBLERANGEOID, ANYCOMPATIBLEMULTIRANGEOID,
ANYRANGEOID, ANYMULTIRANGEOID, ANYARRAYOID,
PG_DDL_COMMANDOID, INDEX_AM_HANDLEROID, TABLE_AM_HANDLEROID,
]

register_core_codec(ANYENUMOID,
Expand Down Expand Up @@ -330,6 +331,19 @@ cdef init_pseudo_codecs():
<decode_func>pgproto.bytea_decode,
PG_FORMAT_BINARY)

# These two are internal to BRIN index support and are unlikely
# to be sent, but since I/O functions for these exist, add decoders
# nonetheless.
register_core_codec(PG_BRIN_BLOOM_SUMMARYOID,
NULL,
<decode_func>pgproto.bytea_decode,
PG_FORMAT_BINARY)

register_core_codec(PG_BRIN_MINMAX_MULTI_SUMMARYOID,
NULL,
<decode_func>pgproto.bytea_decode,
PG_FORMAT_BINARY)


cdef init_text_codecs():
textoids = [
Expand Down
Loading