diff --git a/.gitignore b/.gitignore index b6f442b..7b9599c 100644 --- a/.gitignore +++ b/.gitignore @@ -108,4 +108,5 @@ venv.bak/ # intellij .idea/ -.DS_Store \ No newline at end of file +.DS_Store +.history diff --git a/attackcti/attack_api.py b/attackcti/attack_api.py index 8429299..3658add 100644 --- a/attackcti/attack_api.py +++ b/attackcti/attack_api.py @@ -208,6 +208,20 @@ def translate_stix_objects(self, stix_objects): "x_mitre_collection_layers": "collection_layers", "x_mitre_contributors": "contributors" } + campaign_stix_mapping = { + "type": "type", + "id": "id", + "created_by_ref": "created_by_ref", + "created": "created", + "modified": "modified", + "name": "name", + "description": "campaign_description", + "aliases": "campaign_aliases", + "object_marking_refs": "object_marking_refs", + "external_references": "external_references", + "x_mitre_first_seen_citation": "first_seen_citation", + "x_mitre_last_seen_citation": "last_seen_citation" + } # ******** Helper Functions ******** def handle_list(list_object, object_type): @@ -230,6 +244,8 @@ def handle_list(list_object, object_type): obj_dict['tactic_id'] = list_object[0]['external_id'] elif obj_dict['type'] == 'matrix': obj_dict['matrix_id'] = list_object[0]['external_id'] + elif obj_dict['type'] == 'campaign': + obj_dict['campaign_id'] = list_object[0]['external_id'] elif object_type == "kill_chain_phases": tactic_list = list() for phase in list_object: @@ -266,6 +282,8 @@ def handle_list(list_object, object_type): stix_mapping = marking_stix_mapping elif obj['type'] == "x-mitre-data-source": stix_mapping = data_source_stix_mapping + elif obj['type'] == "campaign": + stix_mapping = campaign_stix_mapping else: return stix_objects_list @@ -330,7 +348,8 @@ def get_enterprise(self, stix_format=True): "tactics": self.get_enterprise_tactics, "matrix": Filter("type", "=", "x-mitre-matrix"), "identity": Filter("type", "=", "identity"), - "marking-definition": Filter("type", "=", "marking-definition") + "marking-definition": Filter("type", "=", "marking-definition"), + "campaign": self.get_enterprise_campaigns } enterprise_stix_objects = dict() for key in enterprise_filter_objects: @@ -339,6 +358,25 @@ def get_enterprise(self, stix_format=True): enterprise_stix_objects[key] = self.translate_stix_objects(enterprise_stix_objects[key]) return enterprise_stix_objects + def get_enterprise_campaigns(self, skip_revoked_deprecated=True, stix_format=True): + """ Extracts all the available campaigns STIX objects in the Enterprise ATT&CK matrix + + Args: + skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects. + stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique') + + Returns: + List of STIX objects + """ + enterprise_campaigns = self.TC_ENTERPRISE_SOURCE.query([Filter("type", "=", "campaign")]) + + if skip_revoked_deprecated: + enterprise_campaigns = self.remove_revoked_deprecated(enterprise_campaigns) + + if not stix_format: + enterprise_campaigns = self.translate_stix_objects(enterprise_campaigns) + return enterprise_campaigns + def get_enterprise_techniques(self, skip_revoked_deprecated=True, include_subtechniques=True, enrich_data_sources = False, stix_format=True): """ Extracts all the available techniques STIX objects in the Enterprise ATT&CK matrix @@ -636,7 +674,8 @@ def get_mobile(self, stix_format=True): "tactics": self.get_mobile_tactics, "matrix": Filter("type", "=", "x-mitre-matrix"), "identity": Filter("type", "=", "identity"), - "marking-definition": Filter("type", "=", "marking-definition") + "marking-definition": Filter("type", "=", "marking-definition"), + "campaigns": self.get_mobile_campaigns } mobile_stix_objects = {} for key in mobile_filter_objects: @@ -644,7 +683,27 @@ def get_mobile(self, stix_format=True): if not stix_format: mobile_stix_objects[key] = self.translate_stix_objects(mobile_stix_objects[key]) return mobile_stix_objects - + + def get_mobile_campaigns(self, skip_revoked_deprecated=True, stix_format=True): + """ Extracts all the available techniques STIX objects in the Mobile ATT&CK matrix + + Args: + skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects. + stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique') + + Returns: + List of STIX objects + """ + + mobile_campaigns = self.TC_MOBILE_SOURCE.query(Filter("type", "=", "campaign")) + + if skip_revoked_deprecated: + mobile_campaigns = self.remove_revoked_deprecated(mobile_campaigns) + + if not stix_format: + mobile_campaigns = self.translate_stix_objects(mobile_campaigns) + return mobile_campaigns + def get_mobile_techniques(self, skip_revoked_deprecated=True, include_subtechniques=True, stix_format=True): """ Extracts all the available techniques STIX objects in the Mobile ATT&CK matrix @@ -945,7 +1004,32 @@ def get_stix_objects(self, stix_format=True): for resource_type in attack_stix_objects[matrix].keys(): attack_stix_objects[matrix][resource_type] = self.translate_stix_objects(attack_stix_objects[matrix][resource_type]) return attack_stix_objects - + + def get_campaigns(self, skip_revoked_deprecated=True, stix_format=True): + """ Extracts all the available campaigns STIX objects across all ATT&CK matrices + + Args: + skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects. + stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique') + + Returns: + List of STIX objects + """ + + enterprise_campaigns = self.get_enterprise_campaigns() + mobile_campaigns = self.get_mobile_campaigns() + for mc in mobile_campaigns: + if mc not in enterprise_campaigns: + enterprise_campaigns.append(mc) + + if skip_revoked_deprecated: + enterprise_campaigns = self.remove_revoked_deprecated(enterprise_campaigns) + + if not stix_format: + enterprise_campaigns = self.translate_stix_objects(enterprise_campaigns) + + return enterprise_campaigns + def get_techniques(self, include_subtechniques=True, skip_revoked_deprecated=True, enrich_data_sources=False, stix_format=True): """ Extracts all the available techniques STIX objects across all ATT&CK matrices @@ -1269,7 +1353,7 @@ def get_object_by_attack_id(self, object_type, attack_id, stix_format=True): List of STIX objects """ - valid_objects = {'attack-pattern','course-of-action','intrusion-set','malware','tool','x-mitre-data-source', 'x-mitre-data-component'} + valid_objects = {'attack-pattern','course-of-action','intrusion-set','malware','tool','x-mitre-data-source', 'x-mitre-data-component', 'campaign'} if object_type not in valid_objects: raise ValueError(f"ERROR: Valid object must be one of {valid_objects}") else: @@ -1282,6 +1366,36 @@ def get_object_by_attack_id(self, object_type, attack_id, stix_format=True): all_stix_objects = self.translate_stix_objects(all_stix_objects) return all_stix_objects + def get_campaign_by_alias(self, campaign_alias, case=True, stix_format=True): + """ Extracts campaign STIX objects by alias name accross all ATT&CK matrices + + Args: + campaign_alias (str) : Alias of threat actor group + case (bool) : case sensitive or not + stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique') + + Returns: + List of STIX objects + + """ + if not case: + all_campaigns = self.get_campaigns() + all_campaigns_list = list() + for campaign in all_campaigns: + if "aliases" in campaign.keys(): + for alias in campaign['aliases']: + if campaign_alias.lower() in alias.lower(): + all_campaigns_list.append(campaign) + else: + filter_objects = [ + Filter('type', '=', 'campaign'), + Filter('aliases', '=', campaign_alias) + ] + all_campaigns_list = self.COMPOSITE_DS.query(filter_objects) + if not stix_format: + all_campaigns_list = self.translate_stix_objects(all_campaigns_list) + return all_campaigns_list + def get_group_by_alias(self, group_alias, case=True, stix_format=True): """ Extracts group STIX objects by alias name accross all ATT&CK matrices @@ -1311,7 +1425,27 @@ def get_group_by_alias(self, group_alias, case=True, stix_format=True): if not stix_format: all_groups_list = self.translate_stix_objects(all_groups_list) return all_groups_list - + + def get_campaigns_since_time(self, timestamp, stix_format=True): + """ Extracts campaings STIX objects since specific time accross all ATT&CK matrices + + Args: + timestamp (timestamp): Timestamp + stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique') + + Returns: + List of STIX objects + + """ + filter_objects = [ + Filter('type', '=', 'campaign'), + Filter('created', '>', timestamp) + ] + all_campaigns_list = self.COMPOSITE_DS.query(filter_objects) + if not stix_format: + all_campaigns_list = self.translate_stix_objects(all_campaigns_list) + return all_campaigns_list + def get_techniques_since_time(self, timestamp, stix_format=True): """ Extracts techniques STIX objects since specific time accross all ATT&CK matrices @@ -1804,4 +1938,4 @@ def enrich_techniques_data_sources(self, stix_object): if technique_ds: new_data_sources = [ v for v in technique_ds.values()] stix_object[i] = stix_object[i].new_version(x_mitre_data_sources = new_data_sources) - return stix_object \ No newline at end of file + return stix_object