From 3e6fe358cd3763b23d4eae4a972c82873cadf3e3 Mon Sep 17 00:00:00 2001 From: sepandhaghighi Date: Sun, 11 May 2025 20:08:59 +0330 Subject: [PATCH 1/7] feat : IPv4HTTPAdapter added --- ipspot/functions.py | 96 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 75 insertions(+), 21 deletions(-) diff --git a/ipspot/functions.py b/ipspot/functions.py index 89eea9c..1311004 100644 --- a/ipspot/functions.py +++ b/ipspot/functions.py @@ -3,13 +3,65 @@ import argparse import ipaddress import socket -from typing import Union, Dict, Tuple, Any +from typing import Union, Dict, List, Tuple, Any import requests +from requests.adapters import HTTPAdapter +from urllib3.poolmanager import PoolManager from art import tprint from .params import REQUEST_HEADERS, IPv4API, PARAMETERS_NAME_MAP from .params import IPSPOT_OVERVIEW, IPSPOT_REPO, IPSPOT_VERSION +class IPv4HTTPAdapter(HTTPAdapter): + """ + A custom HTTPAdapter that enforces the use of IPv4 for DNS resolution + during HTTP(S) requests using the requests library. + """ + + def init_poolmanager(self, connections: int, maxsize: int, block: bool = False, **kwargs: Any) -> None: + """ + Initialize the connection pool manager using a temporary override of + socket.getaddrinfo to ensure only IPv4 addresses are used. + + :param connections: The number of connection pools to cache. + :param maxsize: The maximum number of connections to save in the pool. + :param block: Whether the connections should block when reaching the max size. + :param kwargs: Additional keyword arguments for the PoolManager. + """ + self.poolmanager = PoolManager( + num_pools=connections, + maxsize=maxsize, + block=block, + socket_options=self._ipv4_socket_options(), + **kwargs + ) + + def _ipv4_socket_options(self) -> List[Tuple]: + """ + Temporarily patches socket.getaddrinfo to filter only IPv4 addresses (AF_INET). + + :return: An empty list of socket options; DNS patching occurs here. + """ + original_getaddrinfo = socket.getaddrinfo + + def ipv4_only_getaddrinfo(*args: Any, **kwargs: Any) -> List[Tuple]: + results = original_getaddrinfo(*args, **kwargs) + return [res for res in results if res[0] == socket.AF_INET] + + # Save original for cleanup + self._original_getaddrinfo = socket.getaddrinfo + socket.getaddrinfo = ipv4_only_getaddrinfo + + return [] + + def __del__(self) -> None: + """ + Restores the original socket.getaddrinfo function upon adapter deletion. + """ + if hasattr(self, "_original_getaddrinfo"): + socket.getaddrinfo = self._original_getaddrinfo + + def ipspot_info() -> None: # pragma: no cover """Print ipspot details.""" tprint("IPSpot") @@ -98,26 +150,28 @@ def _ipapi_ipv4(geo: bool=False, timeout: Union[float, Tuple[float, float]] :param timeout: timeout value for API """ try: - response = requests.get("http://ip-api.com/json/", headers=REQUEST_HEADERS, timeout=timeout) - response.raise_for_status() - data = response.json() - - if data.get("status") != "success": - return {"status": False, "error": "ip-api lookup failed"} - result = {"status": True, "data": {"ip": data.get("query"), "api": "ip-api.com"}} - if geo: - geo_data = { - "city": data.get("city"), - "region": data.get("regionName"), - "country": data.get("country"), - "country_code": data.get("countryCode"), - "latitude": data.get("lat"), - "longitude": data.get("lon"), - "organization": data.get("org"), - "timezone": data.get("timezone") - } - result["data"].update(geo_data) - return result + with requests.Session() as session: + session.mount("http://", IPv4HTTPAdapter()) + session.mount("https://", IPv4HTTPAdapter()) + response = session.get("http://ip-api.com/json/", headers=REQUEST_HEADERS, timeout=timeout) + response.raise_for_status() + data = response.json() + if data.get("status") != "success": + return {"status": False, "error": "ip-api lookup failed"} + result = {"status": True, "data": {"ip": data.get("query"), "api": "ip-api.com"}} + if geo: + geo_data = { + "city": data.get("city"), + "region": data.get("regionName"), + "country": data.get("country"), + "country_code": data.get("countryCode"), + "latitude": data.get("lat"), + "longitude": data.get("lon"), + "organization": data.get("org"), + "timezone": data.get("timezone") + } + result["data"].update(geo_data) + return result except Exception as e: return {"status": False, "error": str(e)} From 8c6bfbac61be902c47e11a7b9aa813c2eed7749e Mon Sep 17 00:00:00 2001 From: sepandhaghighi Date: Mon, 12 May 2025 00:38:24 +0330 Subject: [PATCH 2/7] fix : IPv4HTTPAdapter updated --- ipspot/functions.py | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/ipspot/functions.py b/ipspot/functions.py index 1311004..dbcf90b 100644 --- a/ipspot/functions.py +++ b/ipspot/functions.py @@ -13,20 +13,17 @@ class IPv4HTTPAdapter(HTTPAdapter): - """ - A custom HTTPAdapter that enforces the use of IPv4 for DNS resolution - during HTTP(S) requests using the requests library. - """ + """A custom HTTPAdapter that enforces the use of IPv4 for DNS resolution during HTTP(S) requests using the requests library.""" - def init_poolmanager(self, connections: int, maxsize: int, block: bool = False, **kwargs: Any) -> None: + def init_poolmanager(self, connections: int, maxsize: int, block: bool = False, **kwargs: dict) -> None: """ Initialize the connection pool manager using a temporary override of socket.getaddrinfo to ensure only IPv4 addresses are used. - :param connections: The number of connection pools to cache. - :param maxsize: The maximum number of connections to save in the pool. - :param block: Whether the connections should block when reaching the max size. - :param kwargs: Additional keyword arguments for the PoolManager. + :param connections: the number of connection pools to cache + :param maxsize: the maximum number of connections to save in the pool + :param block: whether the connections should block when reaching the max size + :param kwargs: additional keyword arguments for the PoolManager """ self.poolmanager = PoolManager( num_pools=connections, @@ -40,7 +37,7 @@ def _ipv4_socket_options(self) -> List[Tuple]: """ Temporarily patches socket.getaddrinfo to filter only IPv4 addresses (AF_INET). - :return: An empty list of socket options; DNS patching occurs here. + :return: an empty list of socket options; DNS patching occurs here """ original_getaddrinfo = socket.getaddrinfo @@ -48,16 +45,13 @@ def ipv4_only_getaddrinfo(*args: Any, **kwargs: Any) -> List[Tuple]: results = original_getaddrinfo(*args, **kwargs) return [res for res in results if res[0] == socket.AF_INET] - # Save original for cleanup self._original_getaddrinfo = socket.getaddrinfo socket.getaddrinfo = ipv4_only_getaddrinfo return [] def __del__(self) -> None: - """ - Restores the original socket.getaddrinfo function upon adapter deletion. - """ + """Restores the original socket.getaddrinfo function upon adapter deletion.""" if hasattr(self, "_original_getaddrinfo"): socket.getaddrinfo = self._original_getaddrinfo From 9fec3589181634e4b1b4d8da5e1f73d88f6d3860 Mon Sep 17 00:00:00 2001 From: sepandhaghighi Date: Mon, 12 May 2025 00:39:51 +0330 Subject: [PATCH 3/7] fix : _ipinfo_ipv4 function updated --- ipspot/functions.py | 39 +++++++++++++++++++++------------------ 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/ipspot/functions.py b/ipspot/functions.py index dbcf90b..95ce0f9 100644 --- a/ipspot/functions.py +++ b/ipspot/functions.py @@ -179,24 +179,27 @@ def _ipinfo_ipv4(geo: bool=False, timeout: Union[float, Tuple[float, float]] :param timeout: timeout value for API """ try: - response = requests.get("https://ipinfo.io/json", headers=REQUEST_HEADERS, timeout=timeout) - response.raise_for_status() - data = response.json() - result = {"status": True, "data": {"ip": data.get("ip"), "api": "ipinfo.io"}} - if geo: - loc = data.get("loc", "").split(",") - geo_data = { - "city": data.get("city"), - "region": data.get("region"), - "country": None, - "country_code": data.get("country"), - "latitude": float(loc[0]) if len(loc) == 2 else None, - "longitude": float(loc[1]) if len(loc) == 2 else None, - "organization": data.get("org"), - "timezone": data.get("timezone") - } - result["data"].update(geo_data) - return result + with requests.Session() as session: + session.mount("http://", IPv4HTTPAdapter()) + session.mount("https://", IPv4HTTPAdapter()) + response = session.get("https://ipinfo.io/json", headers=REQUEST_HEADERS, timeout=timeout) + response.raise_for_status() + data = response.json() + result = {"status": True, "data": {"ip": data.get("ip"), "api": "ipinfo.io"}} + if geo: + loc = data.get("loc", "").split(",") + geo_data = { + "city": data.get("city"), + "region": data.get("region"), + "country": None, + "country_code": data.get("country"), + "latitude": float(loc[0]) if len(loc) == 2 else None, + "longitude": float(loc[1]) if len(loc) == 2 else None, + "organization": data.get("org"), + "timezone": data.get("timezone") + } + result["data"].update(geo_data) + return result except Exception as e: return {"status": False, "error": str(e)} From 57e2b13ee0bc17796206b8b69b3ee5f1628bc93f Mon Sep 17 00:00:00 2001 From: sepandhaghighi Date: Mon, 12 May 2025 01:02:49 +0330 Subject: [PATCH 4/7] fix : tests updated --- tests/test_ipv4.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/test_ipv4.py b/tests/test_ipv4.py index 2a2793d..6a156ef 100644 --- a/tests/test_ipv4.py +++ b/tests/test_ipv4.py @@ -1,4 +1,5 @@ from unittest import mock +import requests from ipspot import get_private_ipv4, is_ipv4 from ipspot import get_public_ipv4, IPv4API from ipspot import is_loopback @@ -87,7 +88,7 @@ def test_public_ipv4_auto_timeout_error(): def test_public_ipv4_auto_net_error(): - with mock.patch("requests.get", side_effect=Exception("No Internet")): + with mock.patch("requests.get", side_effect=Exception("No Internet")), mock.patch.object(requests.Session, "get", side_effect=Exception("No Internet")): result = get_public_ipv4(api=IPv4API.AUTO) assert not result["status"] assert result["error"] == "All attempts failed." @@ -107,7 +108,7 @@ def test_public_ipv4_ipapi_timeout_error(): def test_public_ipv4_ipapi_net_error(): - with mock.patch("requests.get", side_effect=Exception("No Internet")): + with mock.patch.object(requests.Session, "get", side_effect=Exception("No Internet")): result = get_public_ipv4(api=IPv4API.IPAPI) assert not result["status"] assert result["error"] == "No Internet" @@ -127,7 +128,7 @@ def test_public_ipv4_ipinfo_timeout_error(): def test_public_ipv4_ipinfo_net_error(): - with mock.patch("requests.get", side_effect=Exception("No Internet")): + with mock.patch.object(requests.Session, "get", side_effect=Exception("No Internet")): result = get_public_ipv4(api=IPv4API.IPINFO) assert not result["status"] assert result["error"] == "No Internet" From a4639ca19909ce3eacbbbb3f7a968a6e4035550d Mon Sep 17 00:00:00 2001 From: sepandhaghighi Date: Mon, 12 May 2025 01:27:56 +0330 Subject: [PATCH 5/7] doc : CHANGELOG.md updated --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f0b77d1..3a96a3a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,11 +8,14 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added - `is_ipv4` function - `is_loopback` function +- `IPv4HTTPAdapter` class - Support [ident.me](https://ident.me/json) - Support [tnedi.me](https://tnedi.me/json) ### Changed - `get_private_ipv4` function modified - `_ipsb_ipv4` function modified +- `_ipapi_ipv4` function modified +- `_ipinfo_ipv4` function modified - Test system modified ## [0.2] - 2025-05-04 ### Added From 4418628ce3aeaf6fa1f29554eb789f8a069819e3 Mon Sep 17 00:00:00 2001 From: sepandhaghighi Date: Mon, 12 May 2025 01:29:51 +0330 Subject: [PATCH 6/7] doc : docstring updated --- ipspot/functions.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ipspot/functions.py b/ipspot/functions.py index 95ce0f9..1ff8fc7 100644 --- a/ipspot/functions.py +++ b/ipspot/functions.py @@ -17,8 +17,7 @@ class IPv4HTTPAdapter(HTTPAdapter): def init_poolmanager(self, connections: int, maxsize: int, block: bool = False, **kwargs: dict) -> None: """ - Initialize the connection pool manager using a temporary override of - socket.getaddrinfo to ensure only IPv4 addresses are used. + Initialize the connection pool manager using a temporary override of socket.getaddrinfo to ensure only IPv4 addresses are used. :param connections: the number of connection pools to cache :param maxsize: the maximum number of connections to save in the pool From f740ba39eee7a54c8e273a93aa902fbb0d0675d7 Mon Sep 17 00:00:00 2001 From: sepandhaghighi Date: Mon, 12 May 2025 01:40:23 +0330 Subject: [PATCH 7/7] fix : typing hints updated --- ipspot/functions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ipspot/functions.py b/ipspot/functions.py index 1ff8fc7..e7b4bae 100644 --- a/ipspot/functions.py +++ b/ipspot/functions.py @@ -32,7 +32,7 @@ def init_poolmanager(self, connections: int, maxsize: int, block: bool = False, **kwargs ) - def _ipv4_socket_options(self) -> List[Tuple]: + def _ipv4_socket_options(self) -> list: """ Temporarily patches socket.getaddrinfo to filter only IPv4 addresses (AF_INET). @@ -40,7 +40,7 @@ def _ipv4_socket_options(self) -> List[Tuple]: """ original_getaddrinfo = socket.getaddrinfo - def ipv4_only_getaddrinfo(*args: Any, **kwargs: Any) -> List[Tuple]: + def ipv4_only_getaddrinfo(*args: list, **kwargs: dict) -> List[Tuple]: results = original_getaddrinfo(*args, **kwargs) return [res for res in results if res[0] == socket.AF_INET]