1
+
1
2
# Copyright 2024 Google LLC
2
3
#
3
4
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -76,7 +77,7 @@ def count_unacked(creds, project_id, subscription_id):
76
77
return 0
77
78
78
79
79
- def get_cpu_limit_for_regions (creds , project : str , region : str ) -> int :
80
+ def get_cpu_usage (creds , project : str , region : str ) -> int :
80
81
"""Returns the number of available CPUs in the current GCE region."""
81
82
82
83
quotas = _get_quotas (creds , project , region )
@@ -110,8 +111,8 @@ def get_cpu_limit_for_regions(creds, project: str, region: str) -> int:
110
111
# We need this because us-central1 and us-east4 have different numbers of
111
112
# cores alloted to us in their quota. Treat them the same to simplify things.
112
113
limit = quota ['limit' ]
113
- limit -= quota [ 'usage' ]
114
- return min ( limit , 100_000 )
114
+ limit = min ( limit , 100_000 )
115
+ return limit , quota [ 'usage' ]
115
116
116
117
117
118
class BaseFuzzTaskScheduler :
@@ -250,33 +251,44 @@ def get_available_cpus(project: str, regions: List[str]) -> int:
250
251
# tasks (nor preemptible and non-preemptible CPUs). Fix this.
251
252
# Get total scheduled and queued.
252
253
creds = credentials .get_default ()[0 ]
253
- count_args = ((project , region ) for region in regions )
254
- with multiprocessing .Pool (2 ) as pool :
255
- # These calls are extremely slow (about 1 minute total).
256
- result = pool .starmap_async ( # pylint: disable=no-member
257
- batch .count_queued_or_scheduled_tasks , count_args )
258
- waiting_tasks = count_unacked (creds , project , 'preprocess' )
259
- waiting_tasks += count_unacked (creds , project , 'utask_main' )
260
- region_counts = zip (* result .get ()) # Group all queued and all scheduled.
261
-
262
- # Add up all queued and scheduled.
263
- region_counts = [sum (tup ) for tup in region_counts ]
264
- logs .info (f'Region counts: { region_counts } ' )
265
- if region_counts [0 ] > 50_000 :
266
- # Check queued tasks.
267
- logs .info ('Too many jobs queued, not scheduling more fuzzing.' )
268
- return 0
269
- waiting_tasks += sum (region_counts ) # Add up queued and scheduled.
270
- soon_occupied_cpus = waiting_tasks * CPUS_PER_FUZZ_JOB
271
- logs .info (f'Soon occupied CPUs: { soon_occupied_cpus } ' )
272
- cpu_limit = int (
273
- sum (
274
- get_cpu_limit_for_regions (creds , project , region )
275
- for region in regions ) * CPU_BUFFER_MULTIPLIER )
254
+
255
+ target = 0
256
+ usage = 0
257
+ for region in regions :
258
+ region_target , region_usage = get_cpu_usage (creds , project , region )
259
+ target += region_target
260
+ usage += region_usage
261
+ waiting_tasks = (
262
+ count_unacked (creds , project , 'preprocess' ) + count_unacked (
263
+ creds , project , 'utask_main' ))
264
+
265
+ if usage + waiting_tasks * CPUS_PER_FUZZ_JOB > .95 * target :
266
+ # Only worry about queueing build up if we are above 95% utilization.
267
+ count_args = ((project , region ) for region in regions )
268
+ with multiprocessing .Pool (2 ) as pool :
269
+ target *= CPU_BUFFER_MULTIPLIER
270
+ # These calls are extremely slow (about 30 minutes total).
271
+ result = pool .starmap_async ( # pylint: disable=no-member
272
+ batch .count_queued_or_scheduled_tasks , count_args )
273
+
274
+ region_counts = zip (* result .get ()) # Group all queued and all scheduled.
275
+ # Add up all queued and scheduled.
276
+ region_counts = [sum (tup ) for tup in region_counts ]
277
+ logs .info (f'QUEUED/SCHEDULED tasks per region: { region_counts } ' )
278
+ if region_counts [0 ] > 10_000 :
279
+ # Check queued tasks.
280
+ logs .info ('Too many jobs queued, not scheduling more fuzzing.' )
281
+ return 0
282
+ waiting_tasks += sum (region_counts ) # Add up queued and scheduled.
283
+ else :
284
+ logs .info ('Skipping getting tasks.' )
285
+
286
+ occupied_cpus = waiting_tasks * CPUS_PER_FUZZ_JOB + usage
287
+ logs .info (f'Soon or currently occupied CPUs: { occupied_cpus } ' )
276
288
277
289
logs .info ('Actually free CPUs (before subtracting soon '
278
- f'occupied): { cpu_limit } ' )
279
- available_cpus = max (cpu_limit - soon_occupied_cpus , 0 )
290
+ f'occupied): { target } ' )
291
+ available_cpus = max (target - occupied_cpus , 0 )
280
292
281
293
# Don't schedule more than 50K tasks at once. So we don't overload batch.
282
294
# This number is arbitrary, but we aren't at full capacity at lower numbers.
@@ -293,7 +305,7 @@ def schedule_fuzz_tasks() -> bool:
293
305
regions = get_batch_regions (batch_config )
294
306
start = time .time ()
295
307
available_cpus = get_available_cpus (project , regions )
296
- logs .error (f'{ available_cpus } available CPUs.' )
308
+ logs .info (f'{ available_cpus } available CPUs.' )
297
309
if not available_cpus :
298
310
return False
299
311
0 commit comments