Skip to content

Commit 3bcac3b

Browse files
authored
Merge pull request #62 from dadokkio/master
Add support for campaings entity added in MITRE v12
2 parents da8c545 + ff8ed68 commit 3bcac3b

File tree

2 files changed

+143
-8
lines changed

2 files changed

+143
-8
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,4 +108,5 @@ venv.bak/
108108
# intellij
109109
.idea/
110110

111-
.DS_Store
111+
.DS_Store
112+
.history

attackcti/attack_api.py

Lines changed: 141 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,20 @@ def translate_stix_objects(self, stix_objects):
211211
"x_mitre_collection_layers": "collection_layers",
212212
"x_mitre_contributors": "contributors"
213213
}
214+
campaign_stix_mapping = {
215+
"type": "type",
216+
"id": "id",
217+
"created_by_ref": "created_by_ref",
218+
"created": "created",
219+
"modified": "modified",
220+
"name": "name",
221+
"description": "campaign_description",
222+
"aliases": "campaign_aliases",
223+
"object_marking_refs": "object_marking_refs",
224+
"external_references": "external_references",
225+
"x_mitre_first_seen_citation": "first_seen_citation",
226+
"x_mitre_last_seen_citation": "last_seen_citation"
227+
}
214228

215229
# ******** Helper Functions ********
216230
def handle_list(list_object, object_type):
@@ -233,6 +247,8 @@ def handle_list(list_object, object_type):
233247
obj_dict['tactic_id'] = list_object[0]['external_id']
234248
elif obj_dict['type'] == 'matrix':
235249
obj_dict['matrix_id'] = list_object[0]['external_id']
250+
elif obj_dict['type'] == 'campaign':
251+
obj_dict['campaign_id'] = list_object[0]['external_id']
236252
elif object_type == "kill_chain_phases":
237253
tactic_list = list()
238254
for phase in list_object:
@@ -269,6 +285,8 @@ def handle_list(list_object, object_type):
269285
stix_mapping = marking_stix_mapping
270286
elif obj['type'] == "x-mitre-data-source":
271287
stix_mapping = data_source_stix_mapping
288+
elif obj['type'] == "campaign":
289+
stix_mapping = campaign_stix_mapping
272290
else:
273291
return stix_objects_list
274292

@@ -333,7 +351,8 @@ def get_enterprise(self, stix_format=True):
333351
"tactics": self.get_enterprise_tactics,
334352
"matrix": Filter("type", "=", "x-mitre-matrix"),
335353
"identity": Filter("type", "=", "identity"),
336-
"marking-definition": Filter("type", "=", "marking-definition")
354+
"marking-definition": Filter("type", "=", "marking-definition"),
355+
"campaign": self.get_enterprise_campaigns
337356
}
338357
enterprise_stix_objects = dict()
339358
for key in enterprise_filter_objects:
@@ -342,6 +361,25 @@ def get_enterprise(self, stix_format=True):
342361
enterprise_stix_objects[key] = self.translate_stix_objects(enterprise_stix_objects[key])
343362
return enterprise_stix_objects
344363

364+
def get_enterprise_campaigns(self, skip_revoked_deprecated=True, stix_format=True):
365+
""" Extracts all the available campaigns STIX objects in the Enterprise ATT&CK matrix
366+
367+
Args:
368+
skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects.
369+
stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique')
370+
371+
Returns:
372+
List of STIX objects
373+
"""
374+
enterprise_campaigns = self.TC_ENTERPRISE_SOURCE.query([Filter("type", "=", "campaign")])
375+
376+
if skip_revoked_deprecated:
377+
enterprise_campaigns = self.remove_revoked_deprecated(enterprise_campaigns)
378+
379+
if not stix_format:
380+
enterprise_campaigns = self.translate_stix_objects(enterprise_campaigns)
381+
return enterprise_campaigns
382+
345383
def get_enterprise_techniques(self, skip_revoked_deprecated=True, include_subtechniques=True, enrich_data_sources = False, stix_format=True):
346384
""" Extracts all the available techniques STIX objects in the Enterprise ATT&CK matrix
347385
@@ -639,15 +677,36 @@ def get_mobile(self, stix_format=True):
639677
"tactics": self.get_mobile_tactics,
640678
"matrix": Filter("type", "=", "x-mitre-matrix"),
641679
"identity": Filter("type", "=", "identity"),
642-
"marking-definition": Filter("type", "=", "marking-definition")
680+
"marking-definition": Filter("type", "=", "marking-definition"),
681+
"campaigns": self.get_mobile_campaigns
643682
}
644683
mobile_stix_objects = {}
645684
for key in mobile_filter_objects:
646685
mobile_stix_objects[key] = self.TC_MOBILE_SOURCE.query(mobile_filter_objects[key]) if isinstance(mobile_filter_objects[key], Filter) else mobile_filter_objects[key]()
647686
if not stix_format:
648687
mobile_stix_objects[key] = self.translate_stix_objects(mobile_stix_objects[key])
649688
return mobile_stix_objects
650-
689+
690+
def get_mobile_campaigns(self, skip_revoked_deprecated=True, stix_format=True):
691+
""" Extracts all the available techniques STIX objects in the Mobile ATT&CK matrix
692+
693+
Args:
694+
skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects.
695+
stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique')
696+
697+
Returns:
698+
List of STIX objects
699+
"""
700+
701+
mobile_campaigns = self.TC_MOBILE_SOURCE.query(Filter("type", "=", "campaign"))
702+
703+
if skip_revoked_deprecated:
704+
mobile_campaigns = self.remove_revoked_deprecated(mobile_campaigns)
705+
706+
if not stix_format:
707+
mobile_campaigns = self.translate_stix_objects(mobile_campaigns)
708+
return mobile_campaigns
709+
651710
def get_mobile_techniques(self, skip_revoked_deprecated=True, include_subtechniques=True, stix_format=True):
652711
""" Extracts all the available techniques STIX objects in the Mobile ATT&CK matrix
653712
@@ -948,7 +1007,32 @@ def get_stix_objects(self, stix_format=True):
9481007
for resource_type in attack_stix_objects[matrix].keys():
9491008
attack_stix_objects[matrix][resource_type] = self.translate_stix_objects(attack_stix_objects[matrix][resource_type])
9501009
return attack_stix_objects
951-
1010+
1011+
def get_campaigns(self, skip_revoked_deprecated=True, stix_format=True):
1012+
""" Extracts all the available campaigns STIX objects across all ATT&CK matrices
1013+
1014+
Args:
1015+
skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects.
1016+
stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique')
1017+
1018+
Returns:
1019+
List of STIX objects
1020+
"""
1021+
1022+
enterprise_campaigns = self.get_enterprise_campaigns()
1023+
mobile_campaigns = self.get_mobile_campaigns()
1024+
for mc in mobile_campaigns:
1025+
if mc not in enterprise_campaigns:
1026+
enterprise_campaigns.append(mc)
1027+
1028+
if skip_revoked_deprecated:
1029+
enterprise_campaigns = self.remove_revoked_deprecated(enterprise_campaigns)
1030+
1031+
if not stix_format:
1032+
enterprise_campaigns = self.translate_stix_objects(enterprise_campaigns)
1033+
1034+
return enterprise_campaigns
1035+
9521036
def get_techniques(self, include_subtechniques=True, skip_revoked_deprecated=True, enrich_data_sources=False, stix_format=True):
9531037
""" Extracts all the available techniques STIX objects across all ATT&CK matrices
9541038
@@ -1272,7 +1356,7 @@ def get_object_by_attack_id(self, object_type, attack_id, stix_format=True):
12721356
List of STIX objects
12731357
12741358
"""
1275-
valid_objects = {'attack-pattern','course-of-action','intrusion-set','malware','tool','x-mitre-data-source', 'x-mitre-data-component'}
1359+
valid_objects = {'attack-pattern','course-of-action','intrusion-set','malware','tool','x-mitre-data-source', 'x-mitre-data-component', 'campaign'}
12761360
if object_type not in valid_objects:
12771361
raise ValueError(f"ERROR: Valid object must be one of {valid_objects}")
12781362
else:
@@ -1285,6 +1369,36 @@ def get_object_by_attack_id(self, object_type, attack_id, stix_format=True):
12851369
all_stix_objects = self.translate_stix_objects(all_stix_objects)
12861370
return all_stix_objects
12871371

1372+
def get_campaign_by_alias(self, campaign_alias, case=True, stix_format=True):
1373+
""" Extracts campaign STIX objects by alias name accross all ATT&CK matrices
1374+
1375+
Args:
1376+
campaign_alias (str) : Alias of threat actor group
1377+
case (bool) : case sensitive or not
1378+
stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique')
1379+
1380+
Returns:
1381+
List of STIX objects
1382+
1383+
"""
1384+
if not case:
1385+
all_campaigns = self.get_campaigns()
1386+
all_campaigns_list = list()
1387+
for campaign in all_campaigns:
1388+
if "aliases" in campaign.keys():
1389+
for alias in campaign['aliases']:
1390+
if campaign_alias.lower() in alias.lower():
1391+
all_campaigns_list.append(campaign)
1392+
else:
1393+
filter_objects = [
1394+
Filter('type', '=', 'campaign'),
1395+
Filter('aliases', '=', campaign_alias)
1396+
]
1397+
all_campaigns_list = self.COMPOSITE_DS.query(filter_objects)
1398+
if not stix_format:
1399+
all_campaigns_list = self.translate_stix_objects(all_campaigns_list)
1400+
return all_campaigns_list
1401+
12881402
def get_group_by_alias(self, group_alias, case=True, stix_format=True):
12891403
""" Extracts group STIX objects by alias name accross all ATT&CK matrices
12901404
@@ -1314,7 +1428,27 @@ def get_group_by_alias(self, group_alias, case=True, stix_format=True):
13141428
if not stix_format:
13151429
all_groups_list = self.translate_stix_objects(all_groups_list)
13161430
return all_groups_list
1317-
1431+
1432+
def get_campaigns_since_time(self, timestamp, stix_format=True):
1433+
""" Extracts campaings STIX objects since specific time accross all ATT&CK matrices
1434+
1435+
Args:
1436+
timestamp (timestamp): Timestamp
1437+
stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique')
1438+
1439+
Returns:
1440+
List of STIX objects
1441+
1442+
"""
1443+
filter_objects = [
1444+
Filter('type', '=', 'campaign'),
1445+
Filter('created', '>', timestamp)
1446+
]
1447+
all_campaigns_list = self.COMPOSITE_DS.query(filter_objects)
1448+
if not stix_format:
1449+
all_campaigns_list = self.translate_stix_objects(all_campaigns_list)
1450+
return all_campaigns_list
1451+
13181452
def get_techniques_since_time(self, timestamp, stix_format=True):
13191453
""" Extracts techniques STIX objects since specific time accross all ATT&CK matrices
13201454
@@ -1807,4 +1941,4 @@ def enrich_techniques_data_sources(self, stix_object):
18071941
if technique_ds:
18081942
new_data_sources = [ v for v in technique_ds.values()]
18091943
stix_object[i] = stix_object[i].new_version(x_mitre_data_sources = new_data_sources)
1810-
return stix_object
1944+
return stix_object

0 commit comments

Comments
 (0)