Commit ea66f3b

Author: Vitaliy Zakaznikov (committed)

Fixing #61

1 parent 58fcbe9 commit ea66f3b

File tree: 2 files changed, +101 -73 lines


testflows/github/hetzner/runners/metrics.py

Lines changed: 56 additions & 55 deletions
@@ -17,6 +17,7 @@
 import logging
 from datetime import datetime

+from github.WorkflowRun import WorkflowRun, WorkflowRunJob
 from prometheus_client import Counter, Gauge, Histogram, Info
 from .estimate import get_server_price
 from .constants import standby_server_name_prefix, recycle_server_name_prefix
@@ -656,7 +657,7 @@ def update_runners(
     ).set(count)


-def update_jobs(workflow_runs):
+def update_jobs(run_jobs: list[(WorkflowRun, WorkflowRunJob)]):
     """Update all job-related metrics."""
     queued_count = 0
     running_count = 0
@@ -670,66 +671,66 @@ def update_jobs(workflow_runs):
     RUNNING_JOB_LABELS._metrics.clear()
     RUNNING_JOB_TIME._metrics.clear()

-    for run in workflow_runs:
-        for job in run.jobs():
-            # Normalize job status
-            status = normalize_status(job)
+    for run, job in run_jobs:
+
+        # Normalize job status
+        status = normalize_status(job)
+
+        job_info = {
+            "name": job.name,
+            "workflow_name": run.name,
+            "repository": run.repository.full_name,
+            "status": status,
+            "queued_at": job.raw_data.get("started_at", ""),
+            "run_attempt": str(run.run_attempt),
+            "run_number": str(run.run_number),
+            "head_branch": run.head_branch or "",
+            "head_sha": run.head_sha or "",
+        }
+
+        if status == "queued":
+            queued_count += 1
+            # Track detailed job info
+            QUEUED_JOB_INFO.labels(job_id=str(job.id), run_id=str(run.id)).info(
+                job_info
+            )

-            job_info = {
-                "name": job.name,
-                "workflow_name": run.name,
-                "repository": run.repository.full_name,
-                "status": status,
-                "queued_at": job.raw_data.get("started_at", ""),
-                "run_attempt": str(run.run_attempt),
-                "run_number": str(run.run_number),
-                "head_branch": run.head_branch or "",
-                "head_sha": run.head_sha or "",
-            }
+            # Track job labels
+            for label in job.raw_data.get("labels", []):
+                QUEUED_JOB_LABELS.labels(
+                    job_id=str(job.id), run_id=str(run.id), label=label.lower()
+                ).set(1)

-            if status == "queued":
-                queued_count += 1
-                # Track detailed job info
-                QUEUED_JOB_INFO.labels(job_id=str(job.id), run_id=str(run.id)).info(
-                    job_info
+            # Track job wait time
+            started_at = job.raw_data.get("started_at")
+            if started_at:
+                started_at = dateutil.parser.parse(started_at)
+                wait_time = current_time - started_at.timestamp()
+                QUEUED_JOB_WAIT_TIME.labels(job_id=str(job.id), run_id=str(run.id)).set(
+                    wait_time
                 )

-                # Track job labels
-                for label in job.raw_data.get("labels", []):
-                    QUEUED_JOB_LABELS.labels(
-                        job_id=str(job.id), run_id=str(run.id), label=label.lower()
-                    ).set(1)
-
-                # Track job wait time
-                started_at = job.raw_data.get("started_at")
-                if started_at:
-                    started_at = dateutil.parser.parse(started_at)
-                    wait_time = current_time - started_at.timestamp()
-                    QUEUED_JOB_WAIT_TIME.labels(
-                        job_id=str(job.id), run_id=str(run.id)
-                    ).set(wait_time)
-
-            elif status == "in_progress":
-                running_count += 1
-                # Track detailed job info
-                RUNNING_JOB_INFO.labels(job_id=str(job.id), run_id=str(run.id)).info(
-                    job_info
-                )
+        elif status == "in_progress":
+            running_count += 1
+            # Track detailed job info
+            RUNNING_JOB_INFO.labels(job_id=str(job.id), run_id=str(run.id)).info(
+                job_info
+            )

-                # Track job labels
-                for label in job.raw_data.get("labels", []):
-                    RUNNING_JOB_LABELS.labels(
-                        job_id=str(job.id), run_id=str(run.id), label=label.lower()
-                    ).set(1)
+            # Track job labels
+            for label in job.raw_data.get("labels", []):
+                RUNNING_JOB_LABELS.labels(
+                    job_id=str(job.id), run_id=str(run.id), label=label.lower()
+                ).set(1)

-                # Track job run time
-                started_at = job.raw_data.get("started_at")
-                if started_at:
-                    started_at = dateutil.parser.parse(started_at)
-                    run_time = current_time - started_at.timestamp()
-                    RUNNING_JOB_TIME.labels(job_id=str(job.id), run_id=str(run.id)).set(
-                        run_time
-                    )
+            # Track job run time
+            started_at = job.raw_data.get("started_at")
+            if started_at:
+                started_at = dateutil.parser.parse(started_at)
+                run_time = current_time - started_at.timestamp()
+                RUNNING_JOB_TIME.labels(job_id=str(job.id), run_id=str(run.id)).set(
+                    run_time
+                )

     QUEUED_JOBS.set(queued_count)
     RUNNING_JOBS.set(running_count)
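
With this change update_jobs() no longer iterates run.jobs() itself; callers pass pre-filtered (run, job) pairs instead of whole workflow runs. A minimal sketch of the new calling convention, assuming a PyGithub Repository object named repo and an illustrative with_label value (neither is part of this commit):

from testflows.github.hetzner.runners import metrics
from testflows.github.hetzner.runners.scale_up import filtered_run_jobs

# Collect runs that can still contain active jobs (same calls as in scale_up.py).
queued_runs = list(repo.get_workflow_runs(status="queued"))
in_progress_runs = list(repo.get_workflow_runs(status="in_progress"))

# Only (run, job) pairs that are queued or in progress and carry all of the
# with_label labels reach the metrics module; the label value here is made up.
metrics.update_jobs(
    filtered_run_jobs(queued_runs + in_progress_runs, with_label=["self-hosted"])
)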

testflows/github/hetzner/runners/scale_up.py

Lines changed: 45 additions & 18 deletions
@@ -353,6 +353,38 @@ def raise_exception(exc):
     raise exc


+def get_job_labels(job):
+    """Get job labels."""
+    return set([label.lower() for label in job.raw_data["labels"]])
+
+
+def job_matches_labels(job_labels, with_label):
+    """Check if job matches with_label criteria."""
+    if with_label is None:
+        return True
+
+    for label in with_label:
+        if not label.lower() in job_labels:
+            return (False, label)
+
+    return True
+
+
+def filtered_run_jobs(workflow_runs, with_label):
+    """Filter jobs to select only queued or in progress and match with_label criteria."""
+    run_jobs = []
+    for run in workflow_runs:
+        for job in run.jobs():
+            if job.status == "completed":
+                continue
+            if not (job.status == "in_progress" or job.status == "queued"):
+                continue
+            labels = get_job_labels(job)
+            if job_matches_labels(labels, with_label) is True:
+                run_jobs.append((run, job))
+    return run_jobs
+
+
 def create_server(
     hetzner_token: str,
     setup_worker_pool: ThreadPoolExecutor,
@@ -854,8 +886,10 @@ def create_runner_server(
                 in_progress_runs = list(
                     repo.get_workflow_runs(status="in_progress")
                 )
-                # Update job metrics with all runs
-                metrics.update_jobs(queued_runs + in_progress_runs)
+                # Update job metrics using only queued or in progress runs that match with_label criteria
+                metrics.update_jobs(
+                    filtered_run_jobs(queued_runs + in_progress_runs, with_label)
+                )
                 # For job processing, we'll use only queued runs
                 workflow_runs = queued_runs

@@ -952,10 +986,7 @@ def create_runner_server(
                 ):
                     pass

-                labels = set(
-                    [label.lower() for label in job.raw_data["labels"]]
-                )
-
+                labels = get_job_labels(job)
                 server_name = (
                     f"{server_name_prefix}{job.run_id}-{job.id}"
                 )
@@ -1008,18 +1039,14 @@ def create_runner_server(
                 ):
                     break

-                if with_label is not None:
-                    found_all_with_labels = True
-                    for label in with_label:
-                        if not label.lower() in labels:
-                            found_all_with_labels = False
-                            with Action(
-                                f"Skipping {job} with {labels} as it is missing label '{label}'",
-                                server_name=server_name,
-                                interval=interval,
-                            ):
-                                break
-                    if not found_all_with_labels:
+                result = job_matches_labels(labels, with_label)
+                if result is not True:
+                    _, missing_label = result
+                    with Action(
+                        f"Skipping {job} with {labels} as it is missing label '{missing_label}'",
+                        server_name=server_name,
+                        interval=interval,
+                    ):
                         continue

                 with Action(
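
The job_matches_labels() helper added above returns True on a match (or when with_label is None) and a (False, missing_label) tuple otherwise, which is why callers compare the result with "is True" / "is not True" rather than relying on truthiness. A short sketch of how that return value is consumed; the label values are made up:

from testflows.github.hetzner.runners.scale_up import job_matches_labels

# Labels in the form produced by get_job_labels(job); values are illustrative.
labels = {"self-hosted", "x64", "type-cpx31"}

result = job_matches_labels(labels, with_label=["self-hosted", "gpu"])
if result is not True:
    _, missing_label = result  # here missing_label == "gpu"
    print(f"skipping job, missing label '{missing_label}'")

# with_label=None disables filtering, so any set of labels matches.
assert job_matches_labels(labels, None) is True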
