-
-
Notifications
You must be signed in to change notification settings - Fork 490
Phunter Analyzer #2841
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Phunter Analyzer #2841
Changes from 6 commits
300cb3b
d42f950
5d260d1
4f9a601
373e875
4aa4a06
fb9084f
4fb7ba3
6caac86
b32e62f
0c91e86
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
from django.db import migrations | ||
from django.db.models.fields.related_descriptors import ( | ||
ForwardManyToOneDescriptor, | ||
ForwardOneToOneDescriptor, | ||
ManyToManyDescriptor, | ||
ReverseManyToOneDescriptor, | ||
ReverseOneToOneDescriptor, | ||
) | ||
|
||
plugin = { | ||
"python_module": { | ||
"health_check_schedule": None, | ||
"update_schedule": None, | ||
"module": "phunter.PhunterAnalyzer", | ||
"base_path": "api_app.analyzers_manager.observable_analyzers", | ||
}, | ||
"name": "Phunter", | ||
"description": "[Phunter Analyzer](https://github.com/N0rz3/Phunter) is an OSINT tool for finding information about a phone number.", | ||
"disabled": False, | ||
"soft_time_limit": 60, | ||
"routing_key": "default", | ||
"health_check_status": True, | ||
"type": "observable", | ||
"docker_based": True, | ||
"maximum_tlp": "RED", | ||
"observable_supported": ["generic"], | ||
"supported_filetypes": [], | ||
"run_hash": False, | ||
"run_hash_type": "", | ||
"not_supported_filetypes": [], | ||
"mapping_data_model": {}, | ||
"model": "analyzers_manager.AnalyzerConfig", | ||
} | ||
|
||
params = [] | ||
|
||
values = [] | ||
|
||
|
||
def _get_real_obj(Model, field, value): | ||
def _get_obj(Model, other_model, value): | ||
if isinstance(value, dict): | ||
real_vals = {} | ||
for key, real_val in value.items(): | ||
real_vals[key] = _get_real_obj(other_model, key, real_val) | ||
value = other_model.objects.get_or_create(**real_vals)[0] | ||
# it is just the primary key serialized | ||
else: | ||
if isinstance(value, int): | ||
if Model.__name__ == "PluginConfig": | ||
value = other_model.objects.get(name=plugin["name"]) | ||
else: | ||
value = other_model.objects.get(pk=value) | ||
else: | ||
value = other_model.objects.get(name=value) | ||
return value | ||
|
||
if ( | ||
type(getattr(Model, field)) | ||
in [ | ||
ForwardManyToOneDescriptor, | ||
ReverseManyToOneDescriptor, | ||
ReverseOneToOneDescriptor, | ||
ForwardOneToOneDescriptor, | ||
] | ||
and value | ||
): | ||
other_model = getattr(Model, field).get_queryset().model | ||
value = _get_obj(Model, other_model, value) | ||
elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value: | ||
other_model = getattr(Model, field).rel.model | ||
value = [_get_obj(Model, other_model, val) for val in value] | ||
return value | ||
|
||
|
||
def _create_object(Model, data): | ||
mtm, no_mtm = {}, {} | ||
for field, value in data.items(): | ||
value = _get_real_obj(Model, field, value) | ||
if type(getattr(Model, field)) is ManyToManyDescriptor: | ||
mtm[field] = value | ||
else: | ||
no_mtm[field] = value | ||
try: | ||
o = Model.objects.get(**no_mtm) | ||
except Model.DoesNotExist: | ||
o = Model(**no_mtm) | ||
o.full_clean() | ||
o.save() | ||
for field, value in mtm.items(): | ||
attribute = getattr(o, field) | ||
if value is not None: | ||
attribute.set(value) | ||
return False | ||
return True | ||
|
||
|
||
def migrate(apps, schema_editor): | ||
Parameter = apps.get_model("api_app", "Parameter") | ||
PluginConfig = apps.get_model("api_app", "PluginConfig") | ||
python_path = plugin.pop("model") | ||
Model = apps.get_model(*python_path.split(".")) | ||
if not Model.objects.filter(name=plugin["name"]).exists(): | ||
exists = _create_object(Model, plugin) | ||
if not exists: | ||
for param in params: | ||
_create_object(Parameter, param) | ||
for value in values: | ||
_create_object(PluginConfig, value) | ||
|
||
|
||
def reverse_migrate(apps, schema_editor): | ||
python_path = plugin.pop("model") | ||
Model = apps.get_model(*python_path.split(".")) | ||
Model.objects.get(name=plugin["name"]).delete() | ||
|
||
|
||
class Migration(migrations.Migration): | ||
atomic = False | ||
dependencies = [ | ||
("api_app", "0071_delete_last_elastic_report"), | ||
("analyzers_manager", "0155_analyzer_config_debloat"), | ||
] | ||
|
||
operations = [migrations.RunPython(migrate, reverse_migrate)] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
import logging | ||
|
||
import phonenumbers | ||
import requests | ||
|
||
from api_app.analyzers_manager.classes import DockerBasedAnalyzer, ObservableAnalyzer | ||
from api_app.analyzers_manager.exceptions import AnalyzerRunException | ||
from tests.mock_utils import MockUpResponse | ||
|
||
logging.basicConfig(level=logging.DEBUG) | ||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class PhunterAnalyzer(ObservableAnalyzer, DockerBasedAnalyzer): | ||
name: str = "Phunter" | ||
url: str = "http://phunter:5000/analyze" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
max_tries: int = 1 | ||
poll_distance: int = 0 | ||
|
||
def run(self): | ||
try: | ||
phonenumbers.parse(self.observable_name) | ||
except phonenumbers.phonenumberutil.NumberParseException: | ||
logger.error(f"Phone number parsing failed for: {self.observable_name}") | ||
return {"success": False, "error": "Invalid phone number"} | ||
|
||
req_data = {"phone_number": self.observable_name} | ||
logger.info(f"Sending {self.name} scan request: {req_data} to {self.url}") | ||
|
||
try: | ||
response = self._docker_run(req_data, analyzer_name=self.name) | ||
logger.info(f"[{self.name}] Scan successful by Phunter. Result: {response}") | ||
return response | ||
|
||
except requests.exceptions.RequestException as e: | ||
logger.error( | ||
f"[{self.name}] Request failed due to network issue: {e}", exc_info=True | ||
) | ||
raise AnalyzerRunException(f"Request error to Phunter API: {e}") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Logging is unnecessary here, |
||
|
||
except ValueError as e: | ||
logger.error(f"[{self.name}] Invalid response format: {e}", exc_info=True) | ||
raise AnalyzerRunException(f"Invalid response format from Phunter API: {e}") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same |
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If no error is raised then nothing is returned. We should use a default There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add a general Exception as fallback There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 😄 Didn't expect you to call for a general exception—tables have turned! |
||
@classmethod | ||
def update(self): | ||
pass | ||
|
||
@staticmethod | ||
def mocked_docker_analyzer_post(*args, **kwargs): | ||
mock_response = { | ||
"success": True, | ||
"report": { | ||
"valid": "yes", | ||
"views": "9", | ||
"carrier": "Vodafone", | ||
"location": "India", | ||
"operator": "Vodafone", | ||
"possible": "yes", | ||
"line_type": "FIXED LINE OR MOBILE", | ||
"local_time": "21:34:45", | ||
"spam_status": "Not spammer", | ||
"phone_number": "+918929554991", | ||
"national_format": "089295 54991", | ||
"international_format": "+91 89295 54991", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this number real ? You should use a non existing number here |
||
}, | ||
} | ||
return MockUpResponse(mock_response, 200) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
FROM python:3.12-slim | ||
|
||
# Install dependencies | ||
RUN apt-get update && apt-get install -y --no-install-recommends git | ||
|
||
# Clone Phunter | ||
RUN git clone https://github.com/N0rz3/Phunter.git /app/Phunter | ||
|
||
# Set working directory | ||
WORKDIR /app | ||
|
||
# Copy requirements file and app.py to the working directory | ||
COPY requirements.txt app.py ./ | ||
|
||
# Upgrade pip and install Python packages | ||
RUN pip install --no-cache-dir --upgrade pip && \ | ||
pip install --no-cache-dir -r requirements.txt && \ | ||
pip install --no-cache-dir -r /app/Phunter/requirements.txt | ||
|
||
# Expose port | ||
EXPOSE 5000 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
# Run the app | ||
CMD ["python", "app.py"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
import logging | ||
import re | ||
import subprocess | ||
|
||
import phonenumbers | ||
from flask import Flask, jsonify, request | ||
|
||
# Logging Configuration | ||
logging.basicConfig(level=logging.DEBUG) | ||
logger = logging.getLogger(__name__) | ||
|
||
app = Flask(__name__) | ||
|
||
|
||
def strip_ansi_codes(text): | ||
"""Remove ANSI escape codes from terminal output""" | ||
return re.sub(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])", "", text) | ||
|
||
|
||
def parse_phunter_output(output): | ||
"""Parse output from Phunter CLI and convert to structured JSON""" | ||
result = {} | ||
key_mapping = { | ||
"phone number:": "phone_number", | ||
"possible:": "possible", | ||
"valid:": "valid", | ||
"operator:": "operator", | ||
"possible location:": "location", | ||
"location:": "location", | ||
"carrier:": "carrier", | ||
"line type:": "line_type", | ||
"international:": "international_format", | ||
"national:": "national_format", | ||
"local time:": "local_time", | ||
"views count:": "views", | ||
} | ||
|
||
lines = output.splitlines() | ||
|
||
for line in lines: | ||
line = line.strip().lower() | ||
|
||
if "not spammer" in line: | ||
result["spam_status"] = "Not spammer" | ||
continue | ||
|
||
for keyword, key in key_mapping.items(): | ||
if keyword in line: | ||
value = line.partition(":")[2].strip() | ||
if key in ("possible", "valid"): | ||
result[key] = "yes" if "✔" in value else "no" | ||
else: | ||
result[key] = value | ||
break | ||
|
||
return result | ||
|
||
|
||
@app.route("/analyze", methods=["POST"]) | ||
def analyze(): | ||
data = request.get_json() | ||
phone_number = data.get("phone_number") | ||
|
||
logger.info("Received analysis request") | ||
|
||
if not phone_number: | ||
logger.warning("No phone number provided in request") | ||
return jsonify({"error": "No phone number provided"}), 400 | ||
|
||
try: | ||
|
||
parsed_number = phonenumbers.parse(phone_number) | ||
if not phonenumbers.is_valid_number(parsed_number): | ||
logger.warning("Invalid phone number") | ||
return jsonify({"error": "Invalid phone number"}), 400 | ||
|
||
formatted_number = phonenumbers.format_number( | ||
parsed_number, phonenumbers.PhoneNumberFormat.E164 | ||
) | ||
|
||
except phonenumbers.phonenumberutil.NumberParseException: | ||
logger.warning("Phone number parsing failed") | ||
return jsonify({"error": "Invalid phone number format"}), 400 | ||
|
||
try: | ||
|
||
logger.info("Executing Phunter CLI tool") | ||
command = ["python3", "phunter.py", "-t", formatted_number] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe consider using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Umm but as we are already using a list which i believe is safer and preferred. What you say? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The list is not escaping possible injection in formatted_number, so it would be safer to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure |
||
result = subprocess.run( | ||
command, | ||
|
||
capture_output=True, | ||
text=True, | ||
check=True, | ||
cwd="/app/Phunter", | ||
) | ||
|
||
raw_output = result.stdout | ||
clean_output = strip_ansi_codes(raw_output) | ||
parsed_output = parse_phunter_output(clean_output) | ||
|
||
logger.info("Phunter analysis completed") | ||
|
||
return ( | ||
jsonify( | ||
{ | ||
"success": True, | ||
"report": parsed_output, | ||
} | ||
), | ||
200, | ||
) | ||
|
||
except subprocess.CalledProcessError as e: | ||
logger.error(f"Phunter CLI failed: {e.stderr}") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove this as said |
||
return jsonify({"error": "Phunter execution failed"}), 500 | ||
|
||
|
||
if __name__ == "__main__": | ||
logger.info("Starting Phunter Flask API...") | ||
app.run(host="0.0.0.0", port=5000) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
services: | ||
phunter: | ||
build: | ||
context: ../integrations/phunter | ||
dockerfile: Dockerfile | ||
image: intelowlproject/intelowl_phunter:test |
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,11 @@ | ||||||||
services: | ||||||||
phunter: | ||||||||
image: intelowlproject/phunter:${REACT_APP_INTELOWL_VERSION} | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
container_name: phunter | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
restart: unless-stopped | ||||||||
expose: | ||||||||
- "5000" | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||||
volumes: | ||||||||
- generic_logs:/var/log/intel_owl | ||||||||
depends_on: | ||||||||
- uwsgi |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
flask==3.1.0 | ||
phonenumbers==9.0.3 |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,6 +89,7 @@ pylnk3==0.4.2 | |
androguard==3.4.0a1 # version >=4.x of androguard raises a dependency conflict with quark-engine==25.1.1 | ||
wad==0.4.6 | ||
debloat==1.6.4 | ||
phonenumbers==9.0.3 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not needed if in docker analyzer There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The phunter.py is also using this module so that would need to be installed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that performing the check twice is not necessary. I would keep it in docker analyzer only There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. But I believe it would be better in phunter.py so that it would instantly returns if invalid phone number is passed. |
||
|
||
# httpx required for HTTP/2 support (Mullvad DNS rejects HTTP/1.1 with protocol errors) | ||
httpx[http2]==0.28.1 | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we should refactor this.
Maybe adding an
avoid_polling
parameter to the_docker_run
function which defaults toFalse
to keep compatibility.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure