1
1
"""OpenAPI core validation validators module"""
2
+ import re
2
3
from functools import cached_property
3
4
from typing import Any
4
5
from typing import Mapping
23
24
)
24
25
from openapi_core .protocols import Request
25
26
from openapi_core .protocols import WebhookRequest
26
- from openapi_core .schema .parameters import get_value
27
+ from openapi_core .schema .parameters import get_aslist
28
+ from openapi_core .schema .parameters import get_deep_object_value
29
+ from openapi_core .schema .parameters import get_explode
30
+ from openapi_core .schema .parameters import get_style
31
+ from openapi_core .schema .protocols import SuportsGetAll
32
+ from openapi_core .schema .protocols import SuportsGetList
27
33
from openapi_core .spec import Spec
28
34
from openapi_core .templating .media_types .datatypes import MediaType
29
35
from openapi_core .templating .paths .datatypes import PathOperationServer
@@ -70,10 +76,14 @@ def __init__(
70
76
self .extra_format_validators = extra_format_validators
71
77
self .extra_media_type_deserializers = extra_media_type_deserializers
72
78
73
- def _get_media_type (self , content : Spec , mimetype : str ) -> MediaType :
79
+ def _get_media_type (
80
+ self , content : Spec , mimetype : Optional [str ] = None
81
+ ) -> MediaType :
74
82
from openapi_core .templating .media_types .finders import MediaTypeFinder
75
83
76
84
finder = MediaTypeFinder (content )
85
+ if mimetype is None :
86
+ return finder .get_first ()
77
87
return finder .find (mimetype )
78
88
79
89
def _deserialise_media_type (self , mimetype : str , value : Any ) -> Any :
@@ -99,25 +109,74 @@ def _validate_schema(self, schema: Spec, value: Any) -> None:
99
109
)
100
110
validator .validate (value )
101
111
102
- def _get_param_or_header_value (
112
+ def _get_param_or_header (
113
+ self ,
114
+ param_or_header : Spec ,
115
+ location : Mapping [str , Any ],
116
+ name : Optional [str ] = None ,
117
+ ) -> Any :
118
+ # Simple scenario
119
+ if "content" not in param_or_header :
120
+ return self ._get_simple_value (param_or_header , location , name = name )
121
+
122
+ # Complex scenario
123
+ return self ._get_complex (param_or_header , location , name = name )
124
+
125
+ def _get_simple_value (
103
126
self ,
104
127
param_or_header : Spec ,
105
128
location : Mapping [str , Any ],
106
129
name : Optional [str ] = None ,
130
+ ) -> Any :
131
+ try :
132
+ raw = self ._get_style_value (param_or_header , location , name = name )
133
+ except KeyError :
134
+ if "schema" not in param_or_header :
135
+ raise
136
+ schema = param_or_header / "schema"
137
+ if "default" not in schema :
138
+ raise
139
+ raw = schema ["default" ]
140
+ return self ._get_param_or_header_value (raw , param_or_header )
141
+
142
+ def _get_complex (
143
+ self ,
144
+ param_or_header : Spec ,
145
+ location : Mapping [str , Any ],
146
+ name : Optional [str ] = None ,
147
+ ) -> Any :
148
+ content = param_or_header / "content"
149
+ try :
150
+ raw = self ._get_media_type_value (
151
+ param_or_header , location , name = name
152
+ )
153
+ except KeyError :
154
+ if "schema" not in param_or_header :
155
+ raise
156
+ schema = param_or_header / "schema"
157
+ if "default" not in schema :
158
+ raise
159
+ raw = schema ["default" ]
160
+ return self ._get_content_value (raw , content )
161
+
162
+ def _get_param_or_header_value (
163
+ self ,
164
+ raw : Any ,
165
+ param_or_header : Spec ,
107
166
) -> Any :
108
167
casted , schema = self ._get_param_or_header_value_and_schema (
109
- param_or_header , location , name
168
+ raw , param_or_header
110
169
)
111
170
if schema is None :
112
171
return casted
113
172
self ._validate_schema (schema , casted )
114
173
return casted
115
174
116
175
def _get_content_value (
117
- self , raw : Any , mimetype : str , content : Spec
176
+ self , raw : Any , content : Spec , mimetype : Optional [ str ] = None
118
177
) -> Any :
119
178
casted , schema = self ._get_content_value_and_schema (
120
- raw , mimetype , content
179
+ raw , content , mimetype
121
180
)
122
181
if schema is None :
123
182
return casted
@@ -126,42 +185,22 @@ def _get_content_value(
126
185
127
186
def _get_param_or_header_value_and_schema (
128
187
self ,
188
+ raw : Any ,
129
189
param_or_header : Spec ,
130
- location : Mapping [str , Any ],
131
- name : Optional [str ] = None ,
132
190
) -> Tuple [Any , Spec ]:
133
- try :
134
- raw_value = get_value (param_or_header , location , name = name )
135
- except KeyError :
136
- if "schema" not in param_or_header :
137
- raise
138
- schema = param_or_header / "schema"
139
- if "default" not in schema :
140
- raise
141
- casted = schema ["default" ]
142
- else :
143
- # Simple scenario
144
- if "content" not in param_or_header :
145
- deserialised = self ._deserialise_style (
146
- param_or_header , raw_value
147
- )
148
- schema = param_or_header / "schema"
149
- # Complex scenario
150
- else :
151
- content = param_or_header / "content"
152
- mimetype , media_type = next (content .items ())
153
- deserialised = self ._deserialise_media_type (
154
- mimetype , raw_value
155
- )
156
- schema = media_type / "schema"
157
- casted = self ._cast (schema , deserialised )
191
+ deserialised = self ._deserialise_style (param_or_header , raw )
192
+ schema = param_or_header / "schema"
193
+ casted = self ._cast (schema , deserialised )
158
194
return casted , schema
159
195
160
196
def _get_content_value_and_schema (
161
- self , raw : Any , mimetype : str , content : Spec
197
+ self ,
198
+ raw : Any ,
199
+ content : Spec ,
200
+ mimetype : Optional [str ] = None ,
162
201
) -> Tuple [Any , Optional [Spec ]]:
163
- media_type , mimetype = self ._get_media_type (content , mimetype )
164
- deserialised = self ._deserialise_media_type (mimetype , raw )
202
+ media_type , mime_type = self ._get_media_type (content , mimetype )
203
+ deserialised = self ._deserialise_media_type (mime_type , raw )
165
204
casted = self ._cast (media_type , deserialised )
166
205
167
206
if "schema" not in media_type :
@@ -170,6 +209,45 @@ def _get_content_value_and_schema(
170
209
schema = media_type / "schema"
171
210
return casted , schema
172
211
212
+ def _get_style_value (
213
+ self ,
214
+ param_or_header : Spec ,
215
+ location : Mapping [str , Any ],
216
+ name : Optional [str ] = None ,
217
+ ) -> Any :
218
+ name = name or param_or_header ["name" ]
219
+ style = get_style (param_or_header )
220
+ if name not in location :
221
+ # Only check if the name is not in the location if the style of
222
+ # the param is deepObject,this is because deepObjects will never be found
223
+ # as their key also includes the properties of the object already.
224
+ if style != "deepObject" :
225
+ raise KeyError
226
+ keys_str = " " .join (location .keys ())
227
+ if not re .search (rf"{ name } \[\w+\]" , keys_str ):
228
+ raise KeyError
229
+
230
+ aslist = get_aslist (param_or_header )
231
+ explode = get_explode (param_or_header )
232
+ if aslist and explode :
233
+ if style == "deepObject" :
234
+ return get_deep_object_value (location , name )
235
+ if isinstance (location , SuportsGetAll ):
236
+ return location .getall (name )
237
+ if isinstance (location , SuportsGetList ):
238
+ return location .getlist (name )
239
+
240
+ return location [name ]
241
+
242
+ def _get_media_type_value (
243
+ self ,
244
+ param_or_header : Spec ,
245
+ location : Mapping [str , Any ],
246
+ name : Optional [str ] = None ,
247
+ ) -> Any :
248
+ name = name or param_or_header ["name" ]
249
+ return location [name ]
250
+
173
251
174
252
class BaseAPICallValidator (BaseValidator ):
175
253
@cached_property
0 commit comments