Commit f233e5f

Fix asynchronous deletion of oversized corpus.
This approach should take about 150 MB of RAM and reach about 500 execs/sec. It also increases the number of files we can delete. The previous approach was broken because the batch API was being invoked incorrectly.
1 parent ffdf67e commit f233e5f
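
The memory bound described above comes from capping the number of in-flight delete tasks instead of creating one task per blob and gathering them all at the end. A minimal sketch of that throttling pattern, assuming a hypothetical async delete_one(name) helper (not a ClusterFuzz API):

import asyncio

MAX_IN_FLIGHT = 1000  # Same arbitrary cap the new code uses.


async def delete_all(blob_names, delete_one):
  """Schedules deletions while keeping at most MAX_IN_FLIGHT tasks alive."""
  tasks = []
  for name in blob_names:
    tasks.append(asyncio.create_task(delete_one(name)))
    if len(tasks) >= MAX_IN_FLIGHT:
      # As soon as *any* task finishes, there is room to schedule more.
      _, pending = await asyncio.wait(
          tasks, return_when=asyncio.FIRST_COMPLETED)
      tasks = list(pending)
  await asyncio.gather(*tasks)  # Drain whatever is still running.


async def _noop_delete(name):
  await asyncio.sleep(0)  # Stub deleter for the example driver below.


asyncio.run(delete_all((f'blob-{i}' for i in range(5000)), _noop_delete))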

3 files changed: +91 −81 lines changed

src/clusterfuzz/_internal/bot/tasks/utasks/corpus_pruning_task.py

+38 −45

@@ -19,6 +19,7 @@
 import os
 import random
 import shutil
+import time
 from typing import Dict
 from typing import List
 import zipfile
@@ -76,7 +77,7 @@
 
 # Corpus files limit for cases when corpus pruning task failed in the last
 # execution.
-CORPUS_FILES_LIMIT_FOR_FAILURES = 50000
+CORPUS_FILES_LIMIT_FOR_FAILURES = 50_000
 
 # Corpus total size limit for cases when corpus pruning task failed in the last
 # execution.
@@ -129,36 +130,36 @@ def _get_corpus_file_paths(corpus_path):
 async def _limit_corpus_sizes(corpus_urls):
   try:
     await asyncio.gather(
-        *[_limit_corpus_size(corpus_url) for corpus_url in corpus_urls])
+        *[_limit_corpus_size_async(corpus_url) for corpus_url in corpus_urls])
   except Exception as e:
     # Catch any unexpected exceptions
     logs.error(f"Error in _limit_corpus_size: {e}")
 
 
-async def _limit_corpus_size(corpus_url):
-  """Limits corpus size asynchronously."""
-  logs.info('Limiting corpus size')
+async def _limit_corpus_size_async(corpus_url):
+  """Limits corpus size using async listing and deleting blobs one by one."""
   creds, _ = credentials.get_default()
   creds.refresh(google_auth_requests.Request())
-  bucket = storage.get_bucket_name_and_path(corpus_url)[0]
+  bucket, path = storage.get_bucket_name_and_path(corpus_url)
+  logs.info(f'Limiting corpus size {corpus_url}')
 
-  semaphore = asyncio.Semaphore(20)
+  deleting = False
+  corpus_size = 0
+  num_deleted = 0
+  delete_tasks = []
 
-  async def delete_gcs_blobs_batch(session, bucket, blobs_to_delete, token):
-    async with semaphore:
-      return await delete_gcs_blobs_batch(session, bucket, blobs_to_delete,
-                                          token)
+  async def _delete_blob(name, session):
+    await fast_http.delete_blob_async(bucket, name, session, creds.token)
 
+  # Create the aiohttp session once to reuse it for all requests
   async with aiohttp.ClientSession() as session:
+    start_time = time.time()
     idx = 0
-    deleting = False
-    corpus_size = 0
-    num_deleted = 0
-    blobs_to_delete = []
-    delete_tasks = []
-    num_batches = 0
-    for blob in storage.get_blobs_no_retry(corpus_url, recursive=True):
+    num_deleted = 1
+    async for blob in fast_http.list_blobs_async(bucket, path, creds.token):
       idx += 1
+      if idx >= 5_000_000:
+        break
      if not deleting:
        corpus_size += blob['size']
        if (idx >= CORPUS_FILES_LIMIT_FOR_FAILURES or
@@ -167,33 +168,25 @@ async def delete_gcs_blobs_batch(session, bucket, blobs_to_delete, token):
        continue
 
      assert deleting
-      blobs_to_delete.append(blob)
-      if len(blobs_to_delete) == GOOGLE_CLOUD_MAX_BATCH_SIZE:
-        task = asyncio.create_task(
-            fast_http.delete_gcs_blobs_batch(session, bucket,
-                                             blobs_to_delete.copy(),
-                                             creds.token))
-        delete_tasks.append(task)
-        blobs_to_delete = []
-        num_batches += 1
-        if num_batches == 3_000_000 / GOOGLE_CLOUD_MAX_BATCH_SIZE:
-          break
-
-    if blobs_to_delete:
-      task = asyncio.create_task(
-          delete_gcs_blobs_batch(session, bucket, blobs_to_delete.copy(),
-                                 creds.token))
-      delete_tasks.append(task)
-
-    results = await asyncio.gather(*delete_tasks)
-    for task_success in results:
-      if task_success:
-        num_deleted += GOOGLE_CLOUD_MAX_BATCH_SIZE
-
-  if num_deleted:
-    logs.info(f'Deleted over {num_deleted} corpus files.')
-  else:
-    logs.info('No need to limit corpus.')
+      if idx % 20_000 == 0:  # Arbitrary limit.
+        logs.info(f'Deleting url {blob["name"]}')
+
+      if idx % 100_000 == 0:
+        creds.refresh(google_auth_requests.Request())
+
+      delete_tasks.append(
+          asyncio.create_task(_delete_blob(blob['name'], session)))
+      num_deleted += 1
+      if len(delete_tasks
+            ) >= 1000:  # Arbitrary limit so we don't use too much RAM.
+        # If *any* tasks complete, we can schedule more.
+        _, pending = await asyncio.wait(
+            delete_tasks, return_when=asyncio.FIRST_COMPLETED)
+        delete_tasks = list(pending)
+
+    await asyncio.gather(*delete_tasks)
+    logs.info(f'Deleted {num_deleted} blobs.')
+    logs.info(f'Total time to delete blobs: {time.time() - start_time}')
 
 
 def _get_time_remaining(start_time):
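
One detail in the new loop worth noting: OAuth access tokens are short-lived, so the deletion pass refreshes credentials every 100,000 blobs rather than only once up front. A hedged standalone sketch of that refresh pattern using google-auth (assumes Application Default Credentials are configured; the constant and loop here are illustrative, not ClusterFuzz code):

import google.auth
from google.auth.transport import requests as google_auth_requests

REFRESH_EVERY = 100_000  # Mirrors the interval used in the loop above.

creds, _ = google.auth.default()
creds.refresh(google_auth_requests.Request())  # Obtain an initial token.

for idx in range(1, 250_001):  # Stand-in for iterating over listed blobs.
  if idx % REFRESH_EVERY == 0:
    # A pass over millions of blobs can outlive a single access token
    # (roughly an hour), so refresh it periodically mid-loop.
    creds.refresh(google_auth_requests.Request())
  token = creds.token  # Bearer token to attach to the next delete request.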

src/clusterfuzz/_internal/fuzzing/corpus_manager.py

+4 −3

@@ -438,10 +438,11 @@ def rsync_from_disk(self,
     # Assert that we aren't making the very bad mistake of deleting the entire
     # corpus because we messed up our determination of which files were deleted
     # by libFuzzer during merge/pruning. We have to do this hacky <500 check
-    # because we have many different kinds of corpuses (e.g. quarantine, regression)
-    # but this check is for the main corpus.
+    # because we have many different kinds of corpuses
+    # (e.g. quarantine, regression) but this check is for the main corpus.
     assert ((len(filenames_to_delete) != len(
-        self._filenames_to_delete_urls_mapping)) or len(filenames_to_delete) < 500)
+        self._filenames_to_delete_urls_mapping)) or
+            len(filenames_to_delete) < 500)
 
     logs.info('Deleting files.')
     storage.delete_signed_urls(filenames_to_delete)
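
For intuition, a small self-contained illustration of what the rewrapped assertion guards (values are made up, and the local dict stands in for self._filenames_to_delete_urls_mapping): deleting every tracked file is only tolerated when the corpus is small.

# Illustrative values only: a small corpus where deleting everything is allowed.
filenames_to_delete = [f'seed-{i}' for i in range(100)]
filenames_to_delete_urls_mapping = {
    name: f'gs://example-bucket/{name}' for name in filenames_to_delete
}

# Passes: all tracked files are being deleted, but there are fewer than 500.
# With, say, 1000 files and a full match, this would raise and abort the sync.
assert (len(filenames_to_delete) != len(filenames_to_delete_urls_mapping) or
        len(filenames_to_delete) < 500)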

src/clusterfuzz/_internal/system/fast_http.py

+49 −33

@@ -19,16 +19,12 @@
 import urllib.parse
 
 import aiohttp
+import google.api_core.exceptions
 
 from clusterfuzz._internal.base import concurrency
-from clusterfuzz._internal.base import retry
 from clusterfuzz._internal.base import utils
 from clusterfuzz._internal.metrics import logs
 
-BATCH_DELETE_URL = 'https://storage.googleapis.com/batch/storage/v1'
-
-MULTIPART_BOUNDARY = 'multi-part-boundary'
-
 
 def download_urls(urls_and_filepaths: List[Tuple[str, str]]) -> List[bool]:
   """Downloads multiple urls to filepaths in parallel and asynchronously.
@@ -91,37 +87,57 @@ async def _async_download_file(session: aiohttp.ClientSession, url: str,
       fp.write(chunk)
 
 
-@retry.wrap(
-    retries=2,
-    delay=1,
-    function='system.fast_http.delete_gcs_blobs_batch',
-    exception_types=[asyncio.TimeoutError],
-    retry_on_false=True)
-async def delete_gcs_blobs_batch(session, bucket, blobs, auth_token):
-  """Batch deletes |blobs| asynchronously."""
+async def delete_blob_async(bucket_name, blob_name, session, auth_token):
+  """Asynchronously deletes a GCS blob."""
+  blob_name = urllib.parse.quote(blob_name, safe='')
+  url = (
+      f'https://storage.googleapis.com/storage/v1/b/{bucket_name}/o/{blob_name}'
+  )
   headers = {
       'Authorization': f'Bearer {auth_token}',
-      'Content-Type': f'multipart/mixed; boundary={MULTIPART_BOUNDARY}'
   }
-  # Build multipart body
-  body = []
-  bucket = urllib.parse.quote(bucket, safe='')
-  for idx, blob in enumerate(blobs):
-    path = urllib.parse.quote(blob['name'], safe='')
-    body.append(f'--{MULTIPART_BOUNDARY}\r\n'
-                'Content-Type: application/http\r\n'
-                f'Content-ID: <item{idx+1}>\r\n\r\n'
-                f'DELETE /storage/v1/b/{bucket}/o/{path} HTTP/1.1\r\n'
-                'Content-Length: 0\r\n\r\n'
-                'Host: storage.googleapis.com\r\n')
-  body.append(f'--{MULTIPART_BOUNDARY}--\r\n')
-  body = '\r\n'.join(body)
 
   try:
-    async with session.post(
-        BATCH_DELETE_URL, headers=headers, data=body, timeout=25) as response:
-      response.raise_for_status()
-      return True
+    async with session.delete(url, headers=headers) as response:
+      if response.status != 204:
+        response_text = await response.text()
+        logs.error(f'Failed to delete blob {blob_name}. Status code: '
+                   f'{response.status} {response_text}')
+  except google.api_core.exceptions.NotFound:
+    logs.info(f'Not found: {blob_name} {response_text}')
   except Exception as e:
-    logs.info(f'Failed to batch delete {e}')
-    return False
+    logs.error(f'Error deleting {blob_name}: {e}')
+
+
+async def list_blobs_async(bucket_name, path, auth_token):
+  """Asynchronously lists blobs, yielding dicts containing their size, updated
+  time and name."""
+  async with aiohttp.ClientSession() as session:
+    url = f'https://storage.googleapis.com/storage/v1/b/{bucket_name}/o'
+    params = {
+        'prefix': path,
+        'delimiter': '/',
+        # Need token and save space in response.
+        'fields': 'items(name,size,updated),nextPageToken'
+    }
+    while True:
+      async with session.get(
+          url, headers={'Authorization': f'Bearer {auth_token}'},
+          params=params) as response:
+        if response.status == 200:
+          data = await response.json()
+          items = data.get('items', [])
+          for blob in items:
+            yield {
+                'size': int(blob['size']),
+                'updated': blob['updated'],
+                'name': blob['name'],
+            }
+
+          next_page_token = data.get('nextPageToken')
+          if not next_page_token:
+            break
+          params['pageToken'] = next_page_token
+        else:
+          logs.error(f'No blobs, status code: {response.status}')
+          break
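
A hedged usage sketch tying the two new helpers together (bucket name and prefix are illustrative; assumes Application Default Credentials are available via google-auth). The pruning task above wraps each deletion in a task for concurrency; this sequential version just shows the call shape:

import asyncio

import aiohttp
import google.auth
from google.auth.transport import requests as google_auth_requests

from clusterfuzz._internal.system import fast_http


async def _delete_prefix(bucket_name, prefix):
  """Deletes every blob under |prefix|, one at a time."""
  creds, _ = google.auth.default()
  creds.refresh(google_auth_requests.Request())
  async with aiohttp.ClientSession() as session:
    async for blob in fast_http.list_blobs_async(bucket_name, prefix,
                                                 creds.token):
      await fast_http.delete_blob_async(bucket_name, blob['name'], session,
                                        creds.token)


if __name__ == '__main__':
  # Illustrative values.
  asyncio.run(_delete_prefix('example-corpus-bucket', 'libFuzzer/target/'))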
