From df2ae6a08b8cbde38e0cc6d27f033efff77f0854 Mon Sep 17 00:00:00 2001
From: Ilia Baikov
Date: Tue, 28 Jan 2025 02:29:45 +0300
Subject: [PATCH 1/2] parallel processing initial impl

---
 src/collectors/instances_per_hypervisor.py | 34 +++++++-------
 src/util/instances_per_hypervisor.py       | 53 +++++++++++++++-------
 2 files changed, 52 insertions(+), 35 deletions(-)

diff --git a/src/collectors/instances_per_hypervisor.py b/src/collectors/instances_per_hypervisor.py
index 6ce50da..285cbf3 100644
--- a/src/collectors/instances_per_hypervisor.py
+++ b/src/collectors/instances_per_hypervisor.py
@@ -3,6 +3,7 @@
 import time
 import logging
 
 from util import instances_per_hypervisor
+import asyncio
 
 logger = logging.getLogger(__name__)
@@ -13,34 +14,31 @@ def __init__(self, cloud_name):
         self.metrics = []
         self.last_update = 0
         self.cache_time = 60
+        self.gauge = GaugeMetricFamily(
+            'openstack_peepo_exporter_instances_per_hypervisor',
+            'Number of instances per hypervisor',
+            labels=['hypervisor_name', 'hypervisor_id']
+        )
 
     def _fetch_metrics(self):
+        """Fetch metrics from OpenStack and update the gauge."""
         try:
-            new_metrics = instances_per_hypervisor.export_metrics(self.cloud_name)
-            self.metrics = new_metrics
-            self.last_update = time.time()
+            new_metrics = asyncio.run(instances_per_hypervisor.export_metrics(self.cloud_name))
             logger.info(f"Updated metrics with {len(new_metrics)} hypervisors")
+
+            # Update the gauge with new values
+            for metric in new_metrics:
+                self.gauge.add_metric(
+                    [metric['hypervisor_name'], metric['hypervisor_id']],
+                    metric['instance_count']
+                )
         except Exception as e:
-            logger.error(f"Error updating metrics: {str(e)}")
-            raise
+            logger.error(f"Error updating metrics: {e}")
 
     def collect(self):
         if time.time() - self.last_update > self.cache_time:
             self._fetch_metrics()
 
-        gauge = GaugeMetricFamily(
-            'openstack_peepo_exporter_instances_per_hypervisor',
-            'Number of instances per hypervisor',
-            labels=['hypervisor_name', 'hypervisor_id']
-        )
-
-        for metric in self.metrics:
-            gauge.add_metric(
-                [metric['hypervisor_name'], metric['hypervisor_id']],
-                metric['instance_count']
-            )
-
-        yield gauge
+        yield self.gauge
diff --git a/src/util/instances_per_hypervisor.py b/src/util/instances_per_hypervisor.py
index f2095fc..f45f8f3 100644
--- a/src/util/instances_per_hypervisor.py
+++ b/src/util/instances_per_hypervisor.py
@@ -1,26 +1,45 @@
 import openstack
+import httpx
+import asyncio
+import logging
+import time
 
-def export_metrics(cloud_name: str):
+logger = logging.getLogger(__name__)
+
+async def fetch_allocations(client, rp_id):
+    response = await client.get(f'/resource_providers/{rp_id}/allocations')
+    return rp_id, response.json()
+
+async def export_metrics(cloud_name: str):
     print(f"exporting metrics for {cloud_name}")
     conn = openstack.connect(cloud=cloud_name)
-    
+    headers = conn.session.get_auth_headers()
+
+    placement_endpoint = conn.session.get_endpoint(
+        service_type='placement',
+        interface='public'
+    )
+
     instance_counts = []
-    
-    for rp in conn.placement.resource_providers():
-        print(f"rp: {rp}")
-        hypervisor_id = rp.id
-        hypervisor_name = rp.name
-        
-        allocations = conn.placement.get(f'/resource_providers/{rp.id}/allocations').json()
-        instance_count = len(allocations.get('allocations', {}))
-        data = {
-            'hypervisor_id': hypervisor_id,
-            'hypervisor_name': hypervisor_name,
-            'instance_count': instance_count
-        }
-        instance_counts.append(data)
+    async with httpx.AsyncClient(base_url=placement_endpoint, headers=headers) as client:
+        rps = list(conn.placement.resource_providers())
+
+        tasks = [fetch_allocations(client, rp.id) for rp in rps]
+        start_time = time.time()
+        results = await asyncio.gather(*tasks)
+        end_time = time.time()
+        logger.info(f"Time taken to fetch {len(rps)} allocations: {end_time - start_time} seconds")
 
-        print(f"Hypervisor: {hypervisor_name}, Instances: {instance_count}")
+    for rp in rps:
+        rp_id, allocations = next((r for r in results if r[0] == rp.id), (None, {}))
+        instance_count = len(allocations.get('allocations', {}))
+
+        data = {
+            'hypervisor_id': rp.id,
+            'hypervisor_name': rp.name,
+            'instance_count': instance_count
+        }
+        instance_counts.append(data)
+        logger.debug(f"Hypervisor: {rp.name}, Instances: {instance_count}")
 
     return instance_counts

From 2430dd372de1b3089d3747b6c729bd75d334a7fb Mon Sep 17 00:00:00 2001
From: Ilia Baikov
Date: Tue, 28 Jan 2025 09:30:42 +0300
Subject: [PATCH 2/2] parallel placement calls initial impl

---
 requirements-dev.txt                       |  1 +
 requirements.txt                           |  1 +
 src/collectors/instances_per_hypervisor.py | 64 +++++++---------
 src/main.py                                |  8 ++-
 src/util/instances_per_hypervisor.py       | 33 ++++++-----
 5 files changed, 47 insertions(+), 60 deletions(-)

diff --git a/requirements-dev.txt b/requirements-dev.txt
index c7cbc78..dd8124a 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -1,5 +1,6 @@
 openstacksdk==4.2.0
 prometheus_client==0.21.1
+httpx==0.28.1
 pytest<9.0.0,>=8.0.0
 pytest-cov==6.0.0
 pytest-xdist==3.5.0
diff --git a/requirements.txt b/requirements.txt
index aa8580a..f0eb8c1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,3 @@
 openstacksdk==4.2.0
+httpx==0.28.1
 prometheus_client==0.21.1
diff --git a/src/collectors/instances_per_hypervisor.py b/src/collectors/instances_per_hypervisor.py
index 3c44d69..ac27e18 100644
--- a/src/collectors/instances_per_hypervisor.py
+++ b/src/collectors/instances_per_hypervisor.py
@@ -9,69 +9,43 @@ class InstancesPerHypervisorCollector(Collector):
-    def __init__(self, cloud_name):
+    def __init__(self, cloud_name, cache_time):
         super().__init__()
         self.cloud_name = cloud_name
         self.metrics = []
         self.last_update = 0
-        self.cache_time = 60
-<<<<<<< HEAD
-        self.gauge = GaugeMetricFamily(
-            'openstack_peepo_exporter_instances_per_hypervisor',
-            'Number of instances per hypervisor',
-            labels=['hypervisor_name', 'hypervisor_id']
-        )
+        self.cache_time = cache_time
+        self.gauge = None
 
     def _fetch_metrics(self):
         """Fetch metrics from OpenStack and update the gauge."""
         try:
-            new_metrics = asyncio.run(instances_per_hypervisor.export_metrics(self.cloud_name))
+            # Clear previous metrics
+            self.gauge = GaugeMetricFamily(
+                "openstack_peepo_exporter_instances_per_hypervisor",
+                "Number of instances per hypervisor",
+                labels=["hypervisor_name", "hypervisor_id"],
+            )
+
+            new_metrics = asyncio.run(
+                instances_per_hypervisor.export_metrics(self.cloud_name)
+            )
             logger.info(f"Updated metrics with {len(new_metrics)} hypervisors")
-
+
             # Update the gauge with new values
             for metric in new_metrics:
                 self.gauge.add_metric(
-                    [metric['hypervisor_name'], metric['hypervisor_id']],
-                    metric['instance_count']
+                    [metric["hypervisor_name"], metric["hypervisor_id"]],
+                    metric["instance_count"],
                 )
-        except Exception as e:
-            logger.error(f"Error updating metrics: {e}")
-
-    def collect(self):
-        if time.time() - self.last_update > self.cache_time:
-            self._fetch_metrics()
-
-        yield self.gauge
-
-=======
-
-    def _fetch_metrics(self):
-        try:
-            new_metrics = instances_per_hypervisor.export_metrics(self.cloud_name)
-            self.metrics = new_metrics
+            # Update last_update timestamp
             self.last_update = time.time()
-            logger.info(f"Updated metrics with {len(new_metrics)} hypervisors")
         except Exception as e:
-            logger.error(f"Error updating metrics: {str(e)}")
-            raise
+            logger.error(f"Error updating metrics: {e}")
 
     def collect(self):
         if time.time() - self.last_update > self.cache_time:
             self._fetch_metrics()
 
-        gauge = GaugeMetricFamily(
-            "openstack_peepo_exporter_instances_per_hypervisor",
-            "Number of instances per hypervisor",
-            labels=["hypervisor_name", "hypervisor_id"],
-        )
-
-        for metric in self.metrics:
-            gauge.add_metric(
-                [metric["hypervisor_name"], metric["hypervisor_id"]],
-                metric["instance_count"],
-            )
-
-        yield gauge
->>>>>>> main
+        yield self.gauge
diff --git a/src/main.py b/src/main.py
index 2e1af8b..9772822 100644
--- a/src/main.py
+++ b/src/main.py
@@ -31,11 +31,17 @@
     parser.add_argument(
         "--cloud", default=os.environ.get("OS_CLOUD"), help="The name of the cloud."
     )
+    parser.add_argument(
+        "--cache-time",
+        default=os.environ.get("CACHE_TIME", 60),
+        type=int,
+        help="The time to cache the metrics in seconds.",
+    )
     args = parser.parse_args()
 
     # Register the collector
     registry = CollectorRegistry()
-    registry.register(InstancesPerHypervisorCollector(args.cloud))
+    registry.register(InstancesPerHypervisorCollector(args.cloud, args.cache_time))
 
     # Start the metrics server
     start_http_server(args.port, args.addr, registry)
diff --git a/src/util/instances_per_hypervisor.py b/src/util/instances_per_hypervisor.py
index cd83e22..5e18316 100644
--- a/src/util/instances_per_hypervisor.py
+++ b/src/util/instances_per_hypervisor.py
@@ -6,37 +6,42 @@
 
 logger = logging.getLogger(__name__)
 
+
 async def fetch_allocations(client, rp_id):
-    response = await client.get(f'/resource_providers/{rp_id}/allocations')
+    response = await client.get(f"/resource_providers/{rp_id}/allocations")
     return rp_id, response.json()
 
+
 async def export_metrics(cloud_name: str):
     print(f"exporting metrics for {cloud_name}")
     conn = openstack.connect(cloud=cloud_name)
     headers = conn.session.get_auth_headers()
-
+
     placement_endpoint = conn.session.get_endpoint(
-        service_type='placement',
-        interface='public'
+        service_type="placement", interface="public"
     )
-
+
     instance_counts = []
-    async with httpx.AsyncClient(base_url=placement_endpoint, headers=headers) as client:
+    async with httpx.AsyncClient(
+        base_url=placement_endpoint, headers=headers
+    ) as client:
         rps = list(conn.placement.resource_providers())
 
         tasks = [fetch_allocations(client, rp.id) for rp in rps]
         start_time = time.time()
         results = await asyncio.gather(*tasks)
         end_time = time.time()
-        logger.info(f"Time taken to fetch {len(rps)} allocations: {end_time - start_time} seconds")
-
+        logger.info(
+            f"Time taken to fetch {len(rps)} allocations: {end_time - start_time} seconds"
+        )
+
     for rp in rps:
-        rp_id, allocations = next((r for r in results if r[0] == rp.id), (None, {}))
+        allocations = next((r[1] for r in results if r[0] == rp.id), {})
-        instance_count = len(allocations.get('allocations', {}))
-
+        instance_count = len(allocations.get("allocations", {}))
+
         data = {
-            'hypervisor_id': rp.id,
-            'hypervisor_name': rp.name,
-            'instance_count': instance_count
+            "hypervisor_id": rp.id,
+            "hypervisor_name": rp.name,
+            "instance_count": instance_count,
         }
         instance_counts.append(data)
         logger.debug(f"Hypervisor: {rp.name}, Instances: {instance_count}")
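
Reviewer note (not part of the patches above): the core idea of the series is to keep openstacksdk only for auth and resource-provider discovery and to fan the per-provider allocation GETs out concurrently over httpx. Below is a minimal, self-contained sketch of that pattern under the same openstacksdk/httpx calls used in PATCH 1/2; the cloud name "mycloud" is a placeholder for a clouds.yaml entry, not something defined in this repo.

    import asyncio
    import time

    import httpx
    import openstack


    async def count_instances(cloud_name: str) -> dict:
        # Auth and discovery still go through openstacksdk, as in the patch:
        # the Keystone token and the placement endpoint come from the SDK session.
        conn = openstack.connect(cloud=cloud_name)
        headers = conn.session.get_auth_headers()
        endpoint = conn.session.get_endpoint(service_type="placement", interface="public")

        rps = list(conn.placement.resource_providers())

        async with httpx.AsyncClient(base_url=endpoint, headers=headers) as client:

            async def fetch(rp):
                # One GET per resource provider; gather() runs them concurrently.
                resp = await client.get(f"/resource_providers/{rp.id}/allocations")
                resp.raise_for_status()
                return rp.name, len(resp.json().get("allocations", {}))

            start = time.time()
            results = await asyncio.gather(*(fetch(rp) for rp in rps))
            print(f"fetched {len(rps)} providers in {time.time() - start:.2f}s")

        return dict(results)


    if __name__ == "__main__":
        # "mycloud" is a placeholder clouds.yaml entry.
        print(asyncio.run(count_instances("mycloud")))

Only the N allocation calls change from sequential to concurrent; authentication and the provider listing are untouched, which is why the speedup scales with the number of hypervisors rather than with token handling.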
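
Reviewer note (not part of the patches above): with PATCH 2/2 the collector takes its cache window from the caller, so it can also be exercised outside src/main.py. A rough usage sketch follows; it assumes src/ is on PYTHONPATH (mirroring how main.py imports the collector), and "mycloud" is again a placeholder clouds.yaml entry.

    import time

    from prometheus_client import start_http_server
    from prometheus_client.core import CollectorRegistry

    # Import path assumes the src/ layout shown in the diffstat is on PYTHONPATH.
    from collectors.instances_per_hypervisor import InstancesPerHypervisorCollector

    registry = CollectorRegistry()
    # 120s cache: collect() only refreshes the gauge when the cached data is older than this.
    registry.register(InstancesPerHypervisorCollector("mycloud", cache_time=120))

    start_http_server(8000, registry=registry)  # scrape http://localhost:8000/metrics
    while True:
        time.sleep(60)

The second constructor argument is what --cache-time now feeds in from main.py (default 60, overridable via the CACHE_TIME environment variable).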