1
1
"""OpenAPI core validation validators module"""
2
+ import re
2
3
from functools import cached_property
3
4
from typing import Any
4
5
from typing import Mapping
23
24
)
24
25
from openapi_core .protocols import Request
25
26
from openapi_core .protocols import WebhookRequest
26
- from openapi_core .schema .parameters import get_value
27
+ from openapi_core .schema .parameters import get_aslist
28
+ from openapi_core .schema .parameters import get_deep_object_value
29
+ from openapi_core .schema .parameters import get_explode
30
+ from openapi_core .schema .parameters import get_style
31
+ from openapi_core .schema .protocols import SuportsGetAll
32
+ from openapi_core .schema .protocols import SuportsGetList
27
33
from openapi_core .spec import Spec
28
34
from openapi_core .templating .media_types .datatypes import MediaType
29
35
from openapi_core .templating .paths .datatypes import PathOperationServer
@@ -70,10 +76,14 @@ def __init__(
70
76
self .extra_format_validators = extra_format_validators
71
77
self .extra_media_type_deserializers = extra_media_type_deserializers
72
78
73
- def _get_media_type (self , content : Spec , mimetype : str ) -> MediaType :
79
+ def _get_media_type (
80
+ self , content : Spec , mimetype : Optional [str ] = None
81
+ ) -> MediaType :
74
82
from openapi_core .templating .media_types .finders import MediaTypeFinder
75
83
76
84
finder = MediaTypeFinder (content )
85
+ if mimetype is None :
86
+ return finder .get_first ()
77
87
return finder .find (mimetype )
78
88
79
89
def _deserialise_media_type (self , mimetype : str , value : Any ) -> Any :
@@ -99,25 +109,65 @@ def _validate_schema(self, schema: Spec, value: Any) -> None:
99
109
)
100
110
validator .validate (value )
101
111
102
- def _get_param_or_header_value (
112
+ def _get_param_or_header (
113
+ self ,
114
+ param_or_header : Spec ,
115
+ location : Mapping [str , Any ],
116
+ name : Optional [str ] = None ,
117
+ ) -> Any :
118
+ # Simple scenario
119
+ if "content" not in param_or_header :
120
+ return self ._get_simple_value (param_or_header , location , name = name )
121
+
122
+ # Complex scenario
123
+ return self ._get_complex (param_or_header , location , name = name )
124
+
125
+ def _get_simple_value (
126
+ self ,
127
+ param_or_header : Spec ,
128
+ location : Mapping [str , Any ],
129
+ name : Optional [str ] = None ,
130
+ ) -> Any :
131
+ try :
132
+ raw = self ._get_style_value (param_or_header , location , name = name )
133
+ except KeyError :
134
+ # in simple scenrios schema always exist
135
+ schema = param_or_header / "schema"
136
+ if "default" not in schema :
137
+ raise
138
+ raw = schema ["default" ]
139
+ return self ._get_param_or_header_value (raw , param_or_header )
140
+
141
+ def _get_complex (
103
142
self ,
104
143
param_or_header : Spec ,
105
144
location : Mapping [str , Any ],
106
145
name : Optional [str ] = None ,
146
+ ) -> Any :
147
+ content = param_or_header / "content"
148
+ # no point to catch KetError
149
+ # in complex scenrios schema doesn't exist
150
+ raw = self ._get_media_type_value (param_or_header , location , name = name )
151
+ return self ._get_content_value (raw , content )
152
+
153
+ def _get_param_or_header_value (
154
+ self ,
155
+ raw : Any ,
156
+ param_or_header : Spec ,
107
157
) -> Any :
108
158
casted , schema = self ._get_param_or_header_value_and_schema (
109
- param_or_header , location , name
159
+ raw , param_or_header
110
160
)
111
161
if schema is None :
112
162
return casted
113
163
self ._validate_schema (schema , casted )
114
164
return casted
115
165
116
166
def _get_content_value (
117
- self , raw : Any , mimetype : str , content : Spec
167
+ self , raw : Any , content : Spec , mimetype : Optional [ str ] = None
118
168
) -> Any :
119
169
casted , schema = self ._get_content_value_and_schema (
120
- raw , mimetype , content
170
+ raw , content , mimetype
121
171
)
122
172
if schema is None :
123
173
return casted
@@ -126,42 +176,22 @@ def _get_content_value(
126
176
127
177
def _get_param_or_header_value_and_schema (
128
178
self ,
179
+ raw : Any ,
129
180
param_or_header : Spec ,
130
- location : Mapping [str , Any ],
131
- name : Optional [str ] = None ,
132
181
) -> Tuple [Any , Spec ]:
133
- try :
134
- raw_value = get_value (param_or_header , location , name = name )
135
- except KeyError :
136
- if "schema" not in param_or_header :
137
- raise
138
- schema = param_or_header / "schema"
139
- if "default" not in schema :
140
- raise
141
- casted = schema ["default" ]
142
- else :
143
- # Simple scenario
144
- if "content" not in param_or_header :
145
- deserialised = self ._deserialise_style (
146
- param_or_header , raw_value
147
- )
148
- schema = param_or_header / "schema"
149
- # Complex scenario
150
- else :
151
- content = param_or_header / "content"
152
- mimetype , media_type = next (content .items ())
153
- deserialised = self ._deserialise_media_type (
154
- mimetype , raw_value
155
- )
156
- schema = media_type / "schema"
157
- casted = self ._cast (schema , deserialised )
182
+ deserialised = self ._deserialise_style (param_or_header , raw )
183
+ schema = param_or_header / "schema"
184
+ casted = self ._cast (schema , deserialised )
158
185
return casted , schema
159
186
160
187
def _get_content_value_and_schema (
161
- self , raw : Any , mimetype : str , content : Spec
188
+ self ,
189
+ raw : Any ,
190
+ content : Spec ,
191
+ mimetype : Optional [str ] = None ,
162
192
) -> Tuple [Any , Optional [Spec ]]:
163
- media_type , mimetype = self ._get_media_type (content , mimetype )
164
- deserialised = self ._deserialise_media_type (mimetype , raw )
193
+ media_type , mime_type = self ._get_media_type (content , mimetype )
194
+ deserialised = self ._deserialise_media_type (mime_type , raw )
165
195
casted = self ._cast (media_type , deserialised )
166
196
167
197
if "schema" not in media_type :
@@ -170,6 +200,45 @@ def _get_content_value_and_schema(
170
200
schema = media_type / "schema"
171
201
return casted , schema
172
202
203
+ def _get_style_value (
204
+ self ,
205
+ param_or_header : Spec ,
206
+ location : Mapping [str , Any ],
207
+ name : Optional [str ] = None ,
208
+ ) -> Any :
209
+ name = name or param_or_header ["name" ]
210
+ style = get_style (param_or_header )
211
+ if name not in location :
212
+ # Only check if the name is not in the location if the style of
213
+ # the param is deepObject,this is because deepObjects will never be found
214
+ # as their key also includes the properties of the object already.
215
+ if style != "deepObject" :
216
+ raise KeyError
217
+ keys_str = " " .join (location .keys ())
218
+ if not re .search (rf"{ name } \[\w+\]" , keys_str ):
219
+ raise KeyError
220
+
221
+ aslist = get_aslist (param_or_header )
222
+ explode = get_explode (param_or_header )
223
+ if aslist and explode :
224
+ if style == "deepObject" :
225
+ return get_deep_object_value (location , name )
226
+ if isinstance (location , SuportsGetAll ):
227
+ return location .getall (name )
228
+ if isinstance (location , SuportsGetList ):
229
+ return location .getlist (name )
230
+
231
+ return location [name ]
232
+
233
+ def _get_media_type_value (
234
+ self ,
235
+ param_or_header : Spec ,
236
+ location : Mapping [str , Any ],
237
+ name : Optional [str ] = None ,
238
+ ) -> Any :
239
+ name = name or param_or_header ["name" ]
240
+ return location [name ]
241
+
173
242
174
243
class BaseAPICallValidator (BaseValidator ):
175
244
@cached_property
0 commit comments