diff --git a/attackcti/attack_api.py b/attackcti/attack_api.py index bfced2c..ae515da 100644 --- a/attackcti/attack_api.py +++ b/attackcti/attack_api.py @@ -339,12 +339,13 @@ def get_enterprise(self, stix_format=True): enterprise_stix_objects[key] = self.translate_stix_objects(enterprise_stix_objects[key]) return enterprise_stix_objects - def get_enterprise_techniques(self, skip_revoked_deprecated=True, include_subtechniques=True, stix_format=True): + def get_enterprise_techniques(self, skip_revoked_deprecated=True, include_subtechniques=True, enrich_data_sources = False, stix_format=True): """ Extracts all the available techniques STIX objects in the Enterprise ATT&CK matrix Args: skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects. include_subtechniques (bool): default True. Include techniques and sub-techniques STIX objects. + enrich_data_sources (bool): default False. Adds data component and data source context to each technqiue. stix_format (bool): Returns results in original STIX format or friendly syntax (i.e. 'attack-pattern' or 'technique') Returns: @@ -362,6 +363,9 @@ def get_enterprise_techniques(self, skip_revoked_deprecated=True, include_subtec if skip_revoked_deprecated: enterprise_techniques = self.remove_revoked_deprecated(enterprise_techniques) + if enrich_data_sources: + enterprise_techniques = self.enrich_data_sources(enterprise_techniques) + if not stix_format: enterprise_techniques = self.translate_stix_objects(enterprise_techniques) return enterprise_techniques @@ -923,12 +927,13 @@ def get_stix_objects(self, stix_format=True): enterprise_objects[enterkey] = self.translate_stix_objects(enterprise_objects[enterkey]) return enterprise_objects - def get_techniques(self, skip_revoked_deprecated=True, include_subtechniques=True, stix_format=True): + def get_techniques(self, skip_revoked_deprecated=True, include_subtechniques=True, enrich_data_sources =False, stix_format=True): """ Extracts all the available techniques STIX objects across all ATT&CK matrices Args: skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects. include_subtechniques (bool): default True. Include techniques and sub-techniques STIX objects. + enrich_data_sources (bool): default False. Adds data component and data source context to each technqiue. stix_format (bool): Returns results in original STIX format or friendly syntax (i.e. 'attack-pattern' or 'technique') Returns: @@ -946,8 +951,12 @@ def get_techniques(self, skip_revoked_deprecated=True, include_subtechniques=Tru if skip_revoked_deprecated: all_techniques = self.remove_revoked_deprecated(all_techniques) + if enrich_data_sources: + all_techniques = self.enrich_data_sources(all_techniques) + if not stix_format: all_techniques = self.translate_stix_objects(all_techniques) + return all_techniques def get_groups(self, stix_format=True): @@ -1030,19 +1039,36 @@ def get_software(self, stix_format=True): all_software = self.translate_stix_objects(all_software) return all_software - def get_relationships(self, stix_format=True): + def get_relationships(self, relationship_type = 'all' , stix_format=True): """ Extracts all the available relationships STIX objects across all ATT&CK matrices Args: stix_format (bool): Returns results in original STIX format or friendly syntax (i.e. 'attack-pattern' or 'technique') - + relationship_type (string): Type of relationship (uses, mitigates, subtechnique-of, detects, revoked-by) + Reference: https://github.com/mitre/cti/blob/master/USAGE.md#relationships Returns: List of STIX objects """ - all_relationships = self.COMPOSITE_DS.query(Filter("type", "=", "relationship")) + + relationship_types = ['all','uses', 'mitigates', 'subtechnique-of', 'detects', 'revoked-by'] + + if relationship_type not in relationship_types: + raise ValueError("Argument relationship_type: Possible values are 'all','uses', 'mitigates', 'subtechnique-of', 'detects', 'revoked-by'") + + else: + + if relationship_type == 'all': + all_relationships = self.COMPOSITE_DS.query(Filter("type", "=", "relationship")) + + else: + all_relationships = self.COMPOSITE_DS.query( + Filter("type", "=", "relationship"), + Filter("relationship_type", "=", relationship_type)) + if not stix_format: all_relationships = self.translate_stix_objects(all_relationships) + return all_relationships def get_tactics(self, stix_format=True): @@ -1071,8 +1097,10 @@ def get_data_sources(self, stix_format=True): """ data_sources = self.get_enterprise_data_sources() + if not stix_format: data_sources = self.translate_stix_objects(data_sources) + return data_sources # ******** Custom Functions ******** @@ -1830,3 +1858,70 @@ def get_data_components_by_data_source(self, stix_object, stix_format=True): if not stix_format: data_components = self.translate_stix_objects(data_components) return data_components + + def get_data_source_by_data_component(self, stix_object, stix_format=True): + """ Extracts data source STIX object referenced by a data component STIX object. + + Args: + stix_object (Stix object) : STIX Object data component to retrieve data source SITX objects from. + stix_format (bool): Returns results in original STIX format or friendly syntax (i.e. 'attack-pattern' or 'technique') + + Returns: + List of STIX objects + + """ + + filter_objects = [ + Filter('type', '=', 'x-mitre-data-source'), + Filter('id', '=', stix_object['x_mitre_data_source_ref']) + ] + + data_source = self.COMPOSITE_DS.query(filter_objects) + + if not stix_format: + data_source = self.translate_stix_objects(data_source) + + return data_source + + def enrich_data_sources(self, stix_object): + """ Adds data sources context to STIX Object Technique. + + Args: + stix_object (List of Stix objects) : STIX Object technique to retrieve data source and data component SITX objects context from. + + Returns: + List of STIX objects + + """ + # Getting relationships of type 'detects': data component -> detects -> technique + all_relationships = self.get_relationships(relationship_type="detects") + + # Creating a dictionary where keys are technique_id and values are a list of dictionaries (Each dictionary makes reference to a data_component_id) + technique_components_dict = dict() + + for relationship in all_relationships: + if relationship['target_ref'] not in technique_components_dict.keys(): + technique_components_dict[relationship['target_ref']] = [{'data_component_id':relationship['source_ref']}] + else: + technique_components_dict[relationship['target_ref']].append({'data_component_id':relationship['source_ref']}) + + # Getting all data sources objects + data_sources = self.get_data_sources() + + # Creating a dictionary where the keys are data_component_id and values are dictionaries (Each dictionary makes reference to data component name, data source id, and data source name) + component_source_dict = dict() + for data_source in data_sources: + for data_component in data_source['data_components']: + component_source_dict[data_component['id']] = {'data_component':data_component['name'], 'data_source_id':data_source['id'],'data_source':data_source['name']} + + # Updating technique_components_dict (Adding data component name, data source id, and data source name) + for technique_id in technique_components_dict.keys(): + for component_id in technique_components_dict[technique_id]: + component_id.update(component_source_dict[component_id['data_component_id']]) + + # https://stix2.readthedocs.io/en/latest/guide/versioning.html + for i in range(len(stix_object)): + if stix_object[i].id in technique_components_dict.keys(): + stix_object[i] = stix_object[i].new_version(x_mitre_data_sources = technique_components_dict[stix_object[i].id]) + + return stix_object