Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/iocs.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ It is also possible to load STIX2 files automatically from the environment varia
export MVT_STIX2="/home/user/IOC1.stix2:/home/user/IOC2.stix2"
```

## STIX2 Support

So far MVT implements only a subset of [STIX2 specifications](https://docs.oasis-open.org/cti/stix/v2.1/csprd01/stix-v2.1-csprd01.html):

* It only supports checks for one value (such as `[domain-name:value='DOMAIN']`) and not boolean expressions over multiple comparisons
* It only supports the following types: `domain-name:value`, `process:name`, `email-addr:value`, `file:name`, `file:path`, `file:hashes.md5`, `file:hashes.sha1`, `file:hashes.sha256`, `app:id`, `configuration-profile:id`, `android-property:name`, `url:value` (but each type will only be checked by a module if it is relevant to the type of data obtained)

## Known repositories of STIX2 IOCs

- The [Amnesty International investigations repository](https://github.com/AmnestyTech/investigations) contains STIX-formatted IOCs for:
Expand All @@ -46,3 +53,6 @@ export MVT_STIX2="/home/user/IOC1.stix2:/home/user/IOC2.stix2"
You can automatically download the latest public indicator files with the command `mvt-ios download-iocs` or `mvt-android download-iocs`. These commands download the list of indicators from the [mvt-indicators](https://github.com/mvt-project/mvt-indicators/blob/main/indicators.yaml) repository and store them in the [appdir](https://pypi.org/project/appdirs/) folder. They are then loaded automatically by MVT.

Please [open an issue](https://github.com/mvt-project/mvt/issues/) to suggest new sources of STIX-formatted IOCs.



3 changes: 2 additions & 1 deletion src/mvt/android/modules/adb/chrome_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ def check_indicators(self) -> None:
return

for result in self.results:
if self.indicators.check_domain(result["url"]):
if self.indicators.check_url(result["url"]):
self.detected.append(result)
continue

def _parse_db(self, db_path: str) -> None:
"""Parse a Chrome History database file.
Expand Down
3 changes: 2 additions & 1 deletion src/mvt/android/modules/adb/sms.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ def check_indicators(self) -> None:
if message_links == []:
message_links = check_for_links(message["body"])

if self.indicators.check_domains(message_links):
if self.indicators.check_urls(message_links):
self.detected.append(message)
continue

def _parse_db(self, db_path: str) -> None:
"""Parse an Android bugle_db SMS database file.
Expand Down
3 changes: 2 additions & 1 deletion src/mvt/android/modules/adb/whatsapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ def check_indicators(self) -> None:
continue

message_links = check_for_links(message["data"])
if self.indicators.check_domains(message_links):
if self.indicators.check_urls(message_links):
self.detected.append(message)
continue

def _parse_db(self, db_path: str) -> None:
"""Parse an Android msgstore.db WhatsApp database file.
Expand Down
3 changes: 2 additions & 1 deletion src/mvt/android/modules/backup/sms.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ def check_indicators(self) -> None:
if message_links == []:
message_links = check_for_links(message.get("text", ""))

if self.indicators.check_domains(message_links):
if self.indicators.check_urls(message_links):
self.detected.append(message)
continue

def run(self) -> None:
sms_path = "apps/com.android.providers.telephony/d_f/*_sms_backup"
Expand Down
149 changes: 119 additions & 30 deletions src/mvt/common/indicators.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,26 @@ def _new_collection(
"emails": [],
"file_names": [],
"file_paths": [],
"files_md5": [],
"files_sha1": [],
"files_sha256": [],
"app_ids": [],
"ios_profile_ids": [],
"android_property_names": [],
"urls": [],
"count": 0,
}

def _add_indicator(self, ioc: str, ioc_coll: dict, ioc_coll_list: list) -> None:
ioc = ioc.strip("'")
ioc = ioc.replace("'", "").strip()
if ioc not in ioc_coll_list:
ioc_coll_list.append(ioc)
ioc_coll["count"] += 1
self.total_ioc_count += 1

def _process_indicator(self, indicator: dict, collection: dict) -> None:
key, value = indicator.get("pattern", "").strip("[]").split("=")
key = key.strip()

if key == "domain-name:value":
# We force domain names to lower case.
Expand Down Expand Up @@ -116,6 +120,14 @@ def _process_indicator(self, indicator: dict, collection: dict) -> None:
self._add_indicator(
ioc=value, ioc_coll=collection, ioc_coll_list=collection["file_paths"]
)
elif key == "file:hashes.md5":
self._add_indicator(
ioc=value, ioc_coll=collection, ioc_coll_list=collection["files_md5"]
)
elif key == "file:hashes.sha1":
self._add_indicator(
ioc=value, ioc_coll=collection, ioc_coll_list=collection["files_sha1"]
)
elif key == "file:hashes.sha256":
self._add_indicator(
ioc=value, ioc_coll=collection, ioc_coll_list=collection["files_sha256"]
Expand All @@ -137,6 +149,14 @@ def _process_indicator(self, indicator: dict, collection: dict) -> None:
ioc_coll=collection,
ioc_coll_list=collection["android_property_names"],
)
elif key == "url:value":
self._add_indicator(
ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["urls"],
)
else:
self.log.debug("Can't add indicator %s, type %s not supported", value, key)

def parse_stix2(self, file_path: str) -> None:
"""Extract indicators from a STIX2 file.
Expand All @@ -160,13 +180,17 @@ def parse_stix2(self, file_path: str) -> None:
malware = {}
indicators = []
relationships = []
reports = []
for entry in data.get("objects", []):
entry_type = entry.get("type", "")
# Consider both malware and reports as collections
if entry_type == "malware":
malware[entry["id"]] = {
"name": entry["name"],
"description": entry.get("description", ""),
}
elif entry_type == "report":
reports.append(entry)
elif entry_type == "indicator":
indicators.append(entry)
elif entry_type == "relationship":
Expand All @@ -183,27 +207,58 @@ def parse_stix2(self, file_path: str) -> None:
)
collections.append(collection)

for report in reports:
collection = self._new_collection(
report["id"],
report.get("name", ""),
report.get("description", ""),
os.path.basename(file_path),
file_path,
)
collections.append(collection)

# Adds a default collection
default_collection = self._new_collection(
"0",
"Default collection",
"Collection with IOCs unrelated to malware or reports",
os.path.basename(file_path),
file_path,
)

# We loop through all indicators.
for indicator in indicators:
malware_id = None

# We loop through all relationships and find the one pertinent to
# the current indicator.
for relationship in relationships:
if relationship["source_ref"] != indicator["id"]:
continue

# Look for a malware definition with the correct identifier.
if relationship["target_ref"] in malware.keys():
malware_id = relationship["target_ref"]
break

# Now we look for the correct collection matching the malware ID we
# got from the relationship.
for collection in collections:
if collection["id"] == malware_id:
self._process_indicator(indicator, collection)
break
# We loop through reports first to see if the indicator is in the refs
for report in reports:
for ref in report.get("object_refs", []):
if ref == indicator["id"]:
malware_id = report["id"]
break

if malware_id is None:
# We loop through all relationships and find the one pertinent to
# the current indicator.
for relationship in relationships:
if relationship["source_ref"] != indicator["id"]:
continue

# Look for a malware definition with the correct identifier.
if relationship["target_ref"] in malware.keys():
malware_id = relationship["target_ref"]
break

if malware_id is not None:
# Now we look for the correct collection matching the malware ID we
# got from the relationship.
for collection in collections:
if collection["id"] == malware_id:
self._process_indicator(indicator, collection)
break
else:
# Adds to the default collection
self._process_indicator(indicator, default_collection)

for coll in collections:
self.log.debug(
Expand All @@ -213,6 +268,9 @@ def parse_stix2(self, file_path: str) -> None:
)

self.ioc_collections.extend(collections)
if default_collection["count"] > 0:
# Adds the default collection only if there are some IOCs in it
self.ioc_collections.append(default_collection)

def load_indicators_files(
self, files: list, load_default: Optional[bool] = True
Expand Down Expand Up @@ -251,7 +309,7 @@ def get_ioc_matcher(
Build an Aho-Corasick automaton from a list of iocs (i.e indicators)
Returns an Aho-Corasick automaton

This data-structue and algorithim allows for fast matching of a large number
This data-structure and algorithm allows for fast matching of a large number
of match strings (i.e IOCs) against a large body of text. This will also
match strings containing the IOC, so it is important to confirm the
match is a valid IOC before using it.
Expand All @@ -261,23 +319,23 @@ def get_ioc_matcher(
print(ioc)

We use an LRU cache to avoid rebuilding the automaton every time we call a
function such as check_domain().
function such as check_url().
"""
automaton = ahocorasick.Automaton()
if ioc_type:
iocs = self.get_iocs(ioc_type)
elif ioc_list:
iocs = ioc_list
else:
raise ValueError("Must provide either ioc_tyxpe or ioc_list")
raise ValueError("Must provide either ioc_type or ioc_list")

for ioc in iocs:
automaton.add_word(ioc["value"], ioc)
automaton.make_automaton()
return automaton

@lru_cache()
def check_domain(self, url: str) -> Union[dict, None]:
def check_url(self, url: str) -> Union[dict, None]:
"""Check if a given URL matches any of the provided domain indicators.

:param url: URL to match against domain indicators
Expand All @@ -290,9 +348,21 @@ def check_domain(self, url: str) -> Union[dict, None]:
if not isinstance(url, str):
return None

# Create an Aho-Corasick automaton from the list of domains
domain_matcher = self.get_ioc_matcher("domains")
# Check the URL first
for ioc in self.get_iocs("urls"):
if ioc["value"] == url:
self.log.warning(
"Found a known suspicious URL %s "
'matching indicator "%s" from "%s"',
url,
ioc["value"],
ioc["name"],
)
return ioc

# Then check the domain
# Create an Aho-Corasick automaton from the list of domains
domain_matcher = self.get_ioc_matcher("domains")
try:
# First we use the provided URL.
orig_url = URL(url)
Expand All @@ -316,7 +386,7 @@ def check_domain(self, url: str) -> Union[dict, None]:
orig_url.url,
dest_url.url,
)
return self.check_domain(dest_url.url)
return self.check_url(dest_url.url)

final_url = dest_url
else:
Expand Down Expand Up @@ -389,7 +459,7 @@ def check_domain(self, url: str) -> Union[dict, None]:

return None

def check_domains(self, urls: list) -> Union[dict, None]:
def check_urls(self, urls: list) -> Union[dict, None]:
"""Check a list of URLs against the provided list of domain indicators.

:param urls: List of URLs to check against domain indicators
Expand All @@ -401,7 +471,7 @@ def check_domains(self, urls: list) -> Union[dict, None]:
return None

for url in urls:
check = self.check_domain(url)
check = self.check_url(url)
if check:
return check

Expand Down Expand Up @@ -591,17 +661,24 @@ def check_profile(self, profile_uuid: str) -> Union[dict, None]:
return None

def check_file_hash(self, file_hash: str) -> Union[dict, None]:
"""Check the provided SHA256 file hash against the list of indicators.
"""Check the provided file hash against the list of indicators.

:param file_hash: SHA256 hash to check
:param file_hash: hash to check
:type file_hash: str
:returns: Indicator details if matched, otherwise None

"""
if not file_hash:
return None

for ioc in self.get_iocs("files_sha256"):
if len(file_hash) == 32:
hash_type = "md5"
elif len(file_hash) == 40:
hash_type = "sha1"
else:
hash_type = "sha256"

for ioc in self.get_iocs("files_" + hash_type):
if file_hash.lower() == ioc["value"].lower():
self.log.warning(
'Found a known suspicious file with hash "%s" '
Expand Down Expand Up @@ -659,3 +736,15 @@ def check_android_property_name(self, property_name: str) -> Optional[dict]:
return ioc

return None

def check_domain(self, url: str) -> Union[dict, None]:
"""
Renamed check_url now, kept for compatibility
"""
return self.check_url(url)

def check_domains(self, urls: list) -> Union[dict, None]:
"""
Renamed check_urls now, kept for compatibility
"""
return self.check_urls(urls)
2 changes: 1 addition & 1 deletion src/mvt/ios/modules/backup/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def check_indicators(self) -> None:
except Exception:
continue

ioc = self.indicators.check_domain(part)
ioc = self.indicators.check_url(part)
if ioc:
self.log.warning(
'Found mention of domain "%s" in a backup file with '
Expand Down
8 changes: 1 addition & 7 deletions src/mvt/ios/modules/fs/analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,8 @@ def check_indicators(self) -> None:
self.detected.append(new_result)
continue

ioc = self.indicators.check_domain(value)
ioc = self.indicators.check_url(value)
if ioc:
self.log.warning(
'Found mention of a malicious domain "%s" in %s file at %s',
value,
result["artifact"],
result["isodate"],
)
new_result = copy.copy(result)
new_result["matched_indicator"] = ioc
self.detected.append(new_result)
Expand Down
2 changes: 1 addition & 1 deletion src/mvt/ios/modules/fs/cache_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def check_indicators(self) -> None:
self.detected = {}
for key, values in self.results.items():
for value in values:
ioc = self.indicators.check_domain(value["url"])
ioc = self.indicators.check_url(value["url"])
if ioc:
value["matched_indicator"] = ioc
if key not in self.detected:
Expand Down
Loading
Loading