Skip to content

Commit 0b26fa5

Browse files
committed
add parallel processing
1 parent 55a2831 commit 0b26fa5

File tree

5 files changed

+74
-39
lines changed

5 files changed

+74
-39
lines changed

requirements-dev.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
openstacksdk==4.2.0
2+
httpx==0.28.1
23
prometheus_client==0.21.1
34
pytest<9.0.0,>=8.0.0
45
pytest-cov==6.0.0

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
openstacksdk==4.2.0
2+
httpx==0.28.1
23
prometheus_client==0.21.1

src/collectors/instances_per_hypervisor.py

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,43 +3,49 @@
33
import time
44
import logging
55
from util import instances_per_hypervisor
6+
import asyncio
67

78
logger = logging.getLogger(__name__)
89

910

1011
class InstancesPerHypervisorCollector(Collector):
11-
def __init__(self, cloud_name):
12+
def __init__(self, cloud_name, cache_time):
1213
super().__init__()
1314
self.cloud_name = cloud_name
1415
self.metrics = []
1516
self.last_update = 0
16-
self.cache_time = 60
17+
self.cache_time = cache_time
18+
self.gauge = None
1719

1820
def _fetch_metrics(self):
21+
"""Fetch metrics from OpenStack and update the gauge."""
1922
try:
20-
new_metrics = instances_per_hypervisor.export_metrics(self.cloud_name)
21-
self.metrics = new_metrics
22-
self.last_update = time.time()
23+
# Clear previous metrics
24+
self.gauge = GaugeMetricFamily(
25+
"openstack_peepo_exporter_instances_per_hypervisor",
26+
"Number of instances per hypervisor",
27+
labels=["hypervisor_name", "hypervisor_id"],
28+
)
29+
30+
new_metrics = asyncio.run(
31+
instances_per_hypervisor.export_metrics(self.cloud_name)
32+
)
2333
logger.info(f"Updated metrics with {len(new_metrics)} hypervisors")
2434

35+
# Update the gauge with new values
36+
for metric in new_metrics:
37+
self.gauge.add_metric(
38+
[metric["hypervisor_name"], metric["hypervisor_id"]],
39+
metric["instance_count"],
40+
)
41+
42+
# Update last_update timestamp
43+
self.last_update = time.time()
2544
except Exception as e:
26-
logger.error(f"Error updating metrics: {str(e)}")
27-
raise
45+
logger.error(f"Error updating metrics: {e}")
2846

2947
def collect(self):
3048
if time.time() - self.last_update > self.cache_time:
3149
self._fetch_metrics()
3250

33-
gauge = GaugeMetricFamily(
34-
"openstack_peepo_exporter_instances_per_hypervisor",
35-
"Number of instances per hypervisor",
36-
labels=["hypervisor_name", "hypervisor_id"],
37-
)
38-
39-
for metric in self.metrics:
40-
gauge.add_metric(
41-
[metric["hypervisor_name"], metric["hypervisor_id"]],
42-
metric["instance_count"],
43-
)
44-
45-
yield gauge
51+
yield self.gauge

src/main.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,17 @@
3131
parser.add_argument(
3232
"--cloud", default=os.environ.get("OS_CLOUD"), help="The name of the cloud."
3333
)
34+
parser.add_argument(
35+
"--cache-time",
36+
default=os.environ.get("CACHE_TIME", 60),
37+
type=int,
38+
help="The time to cache the metrics in seconds.",
39+
)
3440
args = parser.parse_args()
3541

3642
# Register the collector
3743
registry = CollectorRegistry()
38-
registry.register(InstancesPerHypervisorCollector(args.cloud))
44+
registry.register(InstancesPerHypervisorCollector(args.cloud, args.cache_time))
3945

4046
# Start the metrics server
4147
start_http_server(args.port, args.addr, registry)

src/util/instances_per_hypervisor.py

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,49 @@
11
import openstack
2+
import httpx
3+
import asyncio
4+
import logging
5+
import time
26

7+
logger = logging.getLogger(__name__)
38

4-
def export_metrics(cloud_name: str):
9+
10+
async def fetch_allocations(client, rp_id):
11+
response = await client.get(f"/resource_providers/{rp_id}/allocations")
12+
return rp_id, response.json()
13+
14+
15+
async def export_metrics(cloud_name: str):
516
print(f"exporting metrics for {cloud_name}")
617
conn = openstack.connect(cloud=cloud_name)
18+
headers = conn.session.get_auth_headers()
19+
20+
placement_endpoint = conn.session.get_endpoint(
21+
service_type="placement", interface="public"
22+
)
723

824
instance_counts = []
25+
async with httpx.AsyncClient(
26+
base_url=placement_endpoint, headers=headers
27+
) as client:
28+
rps = list(conn.placement.resource_providers())
29+
tasks = [fetch_allocations(client, rp.id) for rp in rps]
30+
start_time = time.time()
31+
results = await asyncio.gather(*tasks)
32+
end_time = time.time()
33+
logger.info(
34+
f"Time taken to fetch {len(rps)} allocations: {end_time - start_time} seconds"
35+
)
36+
37+
for rp in rps:
38+
allocations = next((r[1] for r in results if r[0] == rp.id), {})
39+
instance_count = len(allocations.get("allocations", {}))
940

10-
for rp in conn.placement.resource_providers():
11-
print(f"rp: {rp}")
12-
hypervisor_id = rp.id
13-
hypervisor_name = rp.name
14-
15-
allocations = conn.placement.get(
16-
f"/resource_providers/{rp.id}/allocations"
17-
).json()
18-
instance_count = len(allocations.get("allocations", {}))
19-
data = {
20-
"hypervisor_id": hypervisor_id,
21-
"hypervisor_name": hypervisor_name,
22-
"instance_count": instance_count,
23-
}
24-
instance_counts.append(data)
25-
26-
print(f"Hypervisor: {hypervisor_name}, Instances: {instance_count}")
41+
data = {
42+
"hypervisor_id": rp.id,
43+
"hypervisor_name": rp.name,
44+
"instance_count": instance_count,
45+
}
46+
instance_counts.append(data)
47+
logger.debug(f"Hypervisor: {rp.name}, Instances: {instance_count}")
2748

2849
return instance_counts

0 commit comments

Comments
 (0)