Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Added
- `is_ipv4` function
- `is_loopback` function
- `IPv4HTTPAdapter` class
- Support [ident.me](https://ident.me/json)
- Support [tnedi.me](https://tnedi.me/json)
### Changed
- `get_private_ipv4` function modified
- `_ipsb_ipv4` function modified
- `_ipapi_ipv4` function modified
- `_ipinfo_ipv4` function modified
- Test system modified
## [0.2] - 2025-05-04
### Added
Expand Down
128 changes: 89 additions & 39 deletions ipspot/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,58 @@
import argparse
import ipaddress
import socket
from typing import Union, Dict, Tuple, Any
from typing import Union, Dict, List, Tuple, Any
import requests
from requests.adapters import HTTPAdapter
from urllib3.poolmanager import PoolManager
from art import tprint
from .params import REQUEST_HEADERS, IPv4API, PARAMETERS_NAME_MAP
from .params import IPSPOT_OVERVIEW, IPSPOT_REPO, IPSPOT_VERSION


class IPv4HTTPAdapter(HTTPAdapter):
"""A custom HTTPAdapter that enforces the use of IPv4 for DNS resolution during HTTP(S) requests using the requests library."""

def init_poolmanager(self, connections: int, maxsize: int, block: bool = False, **kwargs: dict) -> None:
"""
Initialize the connection pool manager using a temporary override of socket.getaddrinfo to ensure only IPv4 addresses are used.

:param connections: the number of connection pools to cache
:param maxsize: the maximum number of connections to save in the pool
:param block: whether the connections should block when reaching the max size
:param kwargs: additional keyword arguments for the PoolManager
"""
self.poolmanager = PoolManager(
num_pools=connections,
maxsize=maxsize,
block=block,
socket_options=self._ipv4_socket_options(),
**kwargs
)

def _ipv4_socket_options(self) -> list:
"""
Temporarily patches socket.getaddrinfo to filter only IPv4 addresses (AF_INET).

:return: an empty list of socket options; DNS patching occurs here
"""
original_getaddrinfo = socket.getaddrinfo

def ipv4_only_getaddrinfo(*args: list, **kwargs: dict) -> List[Tuple]:
results = original_getaddrinfo(*args, **kwargs)
return [res for res in results if res[0] == socket.AF_INET]

self._original_getaddrinfo = socket.getaddrinfo
socket.getaddrinfo = ipv4_only_getaddrinfo

return []

def __del__(self) -> None:
"""Restores the original socket.getaddrinfo function upon adapter deletion."""
if hasattr(self, "_original_getaddrinfo"):
socket.getaddrinfo = self._original_getaddrinfo


def ipspot_info() -> None: # pragma: no cover
"""Print ipspot details."""
tprint("IPSpot")
Expand Down Expand Up @@ -98,26 +143,28 @@ def _ipapi_ipv4(geo: bool=False, timeout: Union[float, Tuple[float, float]]
:param timeout: timeout value for API
"""
try:
response = requests.get("http://ip-api.com/json/", headers=REQUEST_HEADERS, timeout=timeout)
response.raise_for_status()
data = response.json()

if data.get("status") != "success":
return {"status": False, "error": "ip-api lookup failed"}
result = {"status": True, "data": {"ip": data.get("query"), "api": "ip-api.com"}}
if geo:
geo_data = {
"city": data.get("city"),
"region": data.get("regionName"),
"country": data.get("country"),
"country_code": data.get("countryCode"),
"latitude": data.get("lat"),
"longitude": data.get("lon"),
"organization": data.get("org"),
"timezone": data.get("timezone")
}
result["data"].update(geo_data)
return result
with requests.Session() as session:
session.mount("http://", IPv4HTTPAdapter())
session.mount("https://", IPv4HTTPAdapter())
response = session.get("http://ip-api.com/json/", headers=REQUEST_HEADERS, timeout=timeout)
response.raise_for_status()
data = response.json()
if data.get("status") != "success":
return {"status": False, "error": "ip-api lookup failed"}
result = {"status": True, "data": {"ip": data.get("query"), "api": "ip-api.com"}}
if geo:
geo_data = {
"city": data.get("city"),
"region": data.get("regionName"),
"country": data.get("country"),
"country_code": data.get("countryCode"),
"latitude": data.get("lat"),
"longitude": data.get("lon"),
"organization": data.get("org"),
"timezone": data.get("timezone")
}
result["data"].update(geo_data)
return result
except Exception as e:
return {"status": False, "error": str(e)}

Expand All @@ -131,24 +178,27 @@ def _ipinfo_ipv4(geo: bool=False, timeout: Union[float, Tuple[float, float]]
:param timeout: timeout value for API
"""
try:
response = requests.get("https://ipinfo.io/json", headers=REQUEST_HEADERS, timeout=timeout)
response.raise_for_status()
data = response.json()
result = {"status": True, "data": {"ip": data.get("ip"), "api": "ipinfo.io"}}
if geo:
loc = data.get("loc", "").split(",")
geo_data = {
"city": data.get("city"),
"region": data.get("region"),
"country": None,
"country_code": data.get("country"),
"latitude": float(loc[0]) if len(loc) == 2 else None,
"longitude": float(loc[1]) if len(loc) == 2 else None,
"organization": data.get("org"),
"timezone": data.get("timezone")
}
result["data"].update(geo_data)
return result
with requests.Session() as session:
session.mount("http://", IPv4HTTPAdapter())
session.mount("https://", IPv4HTTPAdapter())
response = session.get("https://ipinfo.io/json", headers=REQUEST_HEADERS, timeout=timeout)
response.raise_for_status()
data = response.json()
result = {"status": True, "data": {"ip": data.get("ip"), "api": "ipinfo.io"}}
if geo:
loc = data.get("loc", "").split(",")
geo_data = {
"city": data.get("city"),
"region": data.get("region"),
"country": None,
"country_code": data.get("country"),
"latitude": float(loc[0]) if len(loc) == 2 else None,
"longitude": float(loc[1]) if len(loc) == 2 else None,
"organization": data.get("org"),
"timezone": data.get("timezone")
}
result["data"].update(geo_data)
return result
except Exception as e:
return {"status": False, "error": str(e)}

Expand Down
7 changes: 4 additions & 3 deletions tests/test_ipv4.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from unittest import mock
import requests
from ipspot import get_private_ipv4, is_ipv4
from ipspot import get_public_ipv4, IPv4API
from ipspot import is_loopback
Expand Down Expand Up @@ -87,7 +88,7 @@ def test_public_ipv4_auto_timeout_error():


def test_public_ipv4_auto_net_error():
with mock.patch("requests.get", side_effect=Exception("No Internet")):
with mock.patch("requests.get", side_effect=Exception("No Internet")), mock.patch.object(requests.Session, "get", side_effect=Exception("No Internet")):
result = get_public_ipv4(api=IPv4API.AUTO)
assert not result["status"]
assert result["error"] == "All attempts failed."
Expand All @@ -107,7 +108,7 @@ def test_public_ipv4_ipapi_timeout_error():


def test_public_ipv4_ipapi_net_error():
with mock.patch("requests.get", side_effect=Exception("No Internet")):
with mock.patch.object(requests.Session, "get", side_effect=Exception("No Internet")):
result = get_public_ipv4(api=IPv4API.IPAPI)
assert not result["status"]
assert result["error"] == "No Internet"
Expand All @@ -127,7 +128,7 @@ def test_public_ipv4_ipinfo_timeout_error():


def test_public_ipv4_ipinfo_net_error():
with mock.patch("requests.get", side_effect=Exception("No Internet")):
with mock.patch.object(requests.Session, "get", side_effect=Exception("No Internet")):
result = get_public_ipv4(api=IPv4API.IPINFO)
assert not result["status"]
assert result["error"] == "No Internet"
Expand Down