1
1
from typing import TYPE_CHECKING
2
2
from typing import Any
3
3
from typing import Callable
4
+ from typing import Generic
5
+ from typing import Iterable
4
6
from typing import List
7
+ from typing import Mapping
8
+ from typing import Optional
9
+ from typing import Type
10
+ from typing import TypeVar
11
+ from typing import Union
5
12
6
13
from jsonschema_path import SchemaPath
7
14
8
15
from openapi_core .casting .schemas .datatypes import CasterCallable
9
16
from openapi_core .casting .schemas .exceptions import CastError
17
+ from openapi_core .schema .schemas import get_properties
18
+ from openapi_core .validation .schemas .validators import SchemaValidator
10
19
11
20
if TYPE_CHECKING :
12
21
from openapi_core .casting .schemas .factories import SchemaCastersFactory
13
22
14
23
15
- class BaseSchemaCaster :
16
- def __init__ (self , schema : SchemaPath ):
24
+ class PrimitiveCaster :
25
+ def __init__ (
26
+ self ,
27
+ schema : SchemaPath ,
28
+ schema_validator : SchemaValidator ,
29
+ schema_caster : "SchemaCaster" ,
30
+ ):
17
31
self .schema = schema
32
+ self .schema_validator = schema_validator
33
+ self .schema_caster = schema_caster
18
34
19
35
def __call__ (self , value : Any ) -> Any :
20
36
if value is None :
@@ -26,42 +42,206 @@ def cast(self, value: Any) -> Any:
26
42
raise NotImplementedError
27
43
28
44
29
- class CallableSchemaCaster (BaseSchemaCaster ):
30
- def __init__ (self , schema : SchemaPath , caster_callable : CasterCallable ):
31
- super ().__init__ (schema )
32
- self .caster_callable = caster_callable
45
+ class DummyCaster (PrimitiveCaster ):
46
+ def cast (self , value : Any ) -> Any :
47
+ return value
48
+
49
+
50
+ PrimitiveType = TypeVar ("PrimitiveType" )
51
+
52
+
53
+ class PrimitiveTypeCaster (Generic [PrimitiveType ], PrimitiveCaster ):
54
+ primitive_type : Type [PrimitiveType ] = NotImplemented
33
55
34
56
def cast (self , value : Any ) -> Any :
57
+ if isinstance (value , self .primitive_type ):
58
+ return value
59
+
60
+ if not isinstance (value , (str , bytes )):
61
+ raise CastError (value , self .schema ["type" ])
62
+
35
63
try :
36
- return self .caster_callable (value )
64
+ return self .primitive_type (value ) # type: ignore [call-arg]
37
65
except (ValueError , TypeError ):
38
66
raise CastError (value , self .schema ["type" ])
39
67
40
68
41
- class DummyCaster (BaseSchemaCaster ):
42
- def cast (self , value : Any ) -> Any :
43
- return value
69
+ class IntegerCaster (PrimitiveTypeCaster [int ]):
70
+ primitive_type = int
44
71
45
72
46
- class ComplexCaster (BaseSchemaCaster ):
47
- def __init__ (
48
- self , schema : SchemaPath , casters_factory : "SchemaCastersFactory"
49
- ):
50
- super ().__init__ (schema )
51
- self .casters_factory = casters_factory
73
+ class NumberCaster (PrimitiveTypeCaster [float ]):
74
+ primitive_type = float
75
+
52
76
77
+ class BooleanCaster (PrimitiveTypeCaster [bool ]):
78
+ primitive_type = bool
53
79
54
- class ArrayCaster (ComplexCaster ):
80
+
81
+ class ArrayCaster (PrimitiveCaster ):
55
82
@property
56
- def items_caster (self ) -> BaseSchemaCaster :
57
- return self .casters_factory .create (self .schema / "items" )
83
+ def items_caster (self ) -> "SchemaCaster" :
84
+ # sometimes we don't have any schema i.e. free-form objects
85
+ items_schema = self .schema .get ("items" , SchemaPath .from_dict ({}))
86
+ return self .schema_caster .evolve (items_schema )
58
87
59
88
def cast (self , value : Any ) -> List [Any ]:
60
89
# str and bytes are not arrays according to the OpenAPI spec
61
- if isinstance (value , (str , bytes )):
90
+ if isinstance (value , (str , bytes )) or not isinstance ( value , Iterable ) :
62
91
raise CastError (value , self .schema ["type" ])
63
92
64
93
try :
65
- return list (map (self .items_caster , value ))
94
+ return list (map (self .items_caster . cast , value ))
66
95
except (ValueError , TypeError ):
67
96
raise CastError (value , self .schema ["type" ])
97
+
98
+
99
+ class ObjectCaster (PrimitiveCaster ):
100
+ def cast (self , value : Any ) -> Any :
101
+ return self ._cast_proparties (value )
102
+
103
+ def evolve (self , schema : SchemaPath ) -> "ObjectCaster" :
104
+ cls = self .__class__
105
+
106
+ return cls (
107
+ schema ,
108
+ self .schema_validator .evolve (schema ),
109
+ self .schema_caster .evolve (schema ),
110
+ )
111
+
112
+ def _cast_proparties (self , value : Any , schema_only : bool = False ) -> Any :
113
+ if not isinstance (value , dict ):
114
+ raise CastError (value , self .schema ["type" ])
115
+
116
+ one_of_schema = self .schema_validator .get_one_of_schema (value )
117
+ if one_of_schema is not None :
118
+ one_of_properties = self .evolve (one_of_schema )._cast_proparties (
119
+ value , schema_only = True
120
+ )
121
+ value .update (one_of_properties )
122
+
123
+ any_of_schemas = self .schema_validator .iter_any_of_schemas (value )
124
+ for any_of_schema in any_of_schemas :
125
+ any_of_properties = self .evolve (any_of_schema )._cast_proparties (
126
+ value , schema_only = True
127
+ )
128
+ value .update (any_of_properties )
129
+
130
+ all_of_schemas = self .schema_validator .iter_all_of_schemas (value )
131
+ for all_of_schema in all_of_schemas :
132
+ all_of_properties = self .evolve (all_of_schema )._cast_proparties (
133
+ value , schema_only = True
134
+ )
135
+ value .update (all_of_properties )
136
+
137
+ for prop_name , prop_schema in get_properties (self .schema ).items ():
138
+ try :
139
+ prop_value = value [prop_name ]
140
+ except KeyError :
141
+ if "default" not in prop_schema :
142
+ continue
143
+ prop_value = prop_schema ["default" ]
144
+
145
+ value [prop_name ] = self .schema_caster .evolve (prop_schema ).cast (
146
+ prop_value
147
+ )
148
+
149
+ if schema_only :
150
+ return value
151
+
152
+ additional_properties = self .schema .getkey (
153
+ "additionalProperties" , True
154
+ )
155
+ if additional_properties is not False :
156
+ # free-form object
157
+ if additional_properties is True :
158
+ additional_prop_schema = SchemaPath .from_dict (
159
+ {"nullable" : True }
160
+ )
161
+ # defined schema
162
+ else :
163
+ additional_prop_schema = self .schema / "additionalProperties"
164
+ additional_prop_caster = self .schema_caster .evolve (
165
+ additional_prop_schema
166
+ )
167
+ for prop_name , prop_value in value .items ():
168
+ if prop_name in value :
169
+ continue
170
+ value [prop_name ] = additional_prop_caster .cast (prop_value )
171
+
172
+ return value
173
+
174
+
175
+ class TypesCaster :
176
+ casters : Mapping [str , Type [PrimitiveCaster ]] = {}
177
+ multi : Optional [Type [PrimitiveCaster ]] = None
178
+
179
+ def __init__ (
180
+ self ,
181
+ casters : Mapping [str , Type [PrimitiveCaster ]],
182
+ default : Type [PrimitiveCaster ],
183
+ multi : Optional [Type [PrimitiveCaster ]] = None ,
184
+ ):
185
+ self .casters = casters
186
+ self .default = default
187
+ self .multi = multi
188
+
189
+ def get_types (self ) -> List [str ]:
190
+ return list (self .casters .keys ())
191
+
192
+ def get_caster (
193
+ self ,
194
+ schema_type : Optional [Union [Iterable [str ], str ]],
195
+ ) -> Type ["PrimitiveCaster" ]:
196
+ if schema_type is None :
197
+ return self .default
198
+ if isinstance (schema_type , Iterable ) and not isinstance (
199
+ schema_type , str
200
+ ):
201
+ if self .multi is None :
202
+ raise TypeError ("caster does not accept multiple types" )
203
+ return self .multi
204
+
205
+ return self .casters [schema_type ]
206
+
207
+
208
+ class SchemaCaster :
209
+ def __init__ (
210
+ self ,
211
+ schema : SchemaPath ,
212
+ schema_validator : SchemaValidator ,
213
+ types_caster : TypesCaster ,
214
+ ):
215
+ self .schema = schema
216
+ self .schema_validator = schema_validator
217
+
218
+ self .types_caster = types_caster
219
+
220
+ def cast (self , value : Any ) -> Any :
221
+ # skip casting for nullable in OpenAPI 3.0
222
+ if value is None and self .schema .getkey ("nullable" , False ):
223
+ return value
224
+
225
+ schema_type = self .schema .getkey ("type" )
226
+ type_caster = self .get_type_caster (schema_type )
227
+ return type_caster (value )
228
+
229
+ def get_type_caster (
230
+ self ,
231
+ schema_type : Optional [Union [Iterable [str ], str ]],
232
+ ) -> PrimitiveCaster :
233
+ caster_cls = self .types_caster .get_caster (schema_type )
234
+ return caster_cls (
235
+ self .schema ,
236
+ self .schema_validator ,
237
+ self ,
238
+ )
239
+
240
+ def evolve (self , schema : SchemaPath ) -> "SchemaCaster" :
241
+ cls = self .__class__
242
+
243
+ return cls (
244
+ schema ,
245
+ self .schema_validator .evolve (schema ),
246
+ self .types_caster ,
247
+ )
0 commit comments