
Commit dea4abe

Per file progress meter
1 parent a12df22 commit dea4abe

File tree

1 file changed: +96 -33 lines changed


src/cli/fuzzer_bridge/cli.py

Lines changed: 96 additions & 33 deletions
@@ -69,6 +69,49 @@ def process_single_file(
     return fixtures


+def process_single_file_worker(
+    file_info: Tuple[Path, Path],
+    fork: Optional[str],
+    pretty: bool,
+    merge: bool,
+    evm_bin: Optional[Path],
+) -> Tuple[Optional[Tuple[Path, Dict[str, Any]]], Optional[Tuple[Path, Exception]]]:
+    """Process a single file in a worker process."""
+    json_file_path, output_file = file_info
+
+    # Create transition tool per worker (cached per process)
+    if not hasattr(process_single_file_worker, '_t8n'):
+        t8n = GethTransitionTool(binary=evm_bin) if evm_bin else GethTransitionTool()
+        process_single_file_worker._t8n = t8n
+        process_single_file_worker._builder = BlocktestBuilder(t8n)
+
+    builder = process_single_file_worker._builder
+
+    try:
+        with open(json_file_path) as f:
+            fuzzer_data = json.load(f)
+
+        # Override fork if specified
+        if fork:
+            fuzzer_data["fork"] = fork
+
+        # Build blocktest
+        blocktest = builder.build_blocktest(fuzzer_data)
+        test_name = generate_test_name(json_file_path)
+        fixtures = {test_name: blocktest}
+
+        if not merge:
+            # Write individual file preserving structure
+            output_file.parent.mkdir(parents=True, exist_ok=True)
+            json_kwargs = {"indent": 2} if pretty else {}
+            with open(output_file, "w") as f:
+                json.dump(fixtures, f, **json_kwargs)
+
+        return (json_file_path, fixtures), None
+    except Exception as e:
+        return None, (json_file_path, e)
+
+
 def process_file_batch(
     file_batch: list[Tuple[Path, Path]],
     fork: Optional[str],
@@ -143,58 +186,77 @@ def process_directory_parallel(
     if num_workers is None:
         num_workers = min(mp.cpu_count(), max(1, file_count // 10))

-    # Batch files for workers (balanced batches)
-    # More smaller batches for better load balancing
-    batch_size = max(1, file_count // (num_workers * 4))
-    file_batches = []
-    for i in range(0, file_count, batch_size):
-        batch = files_to_process[i:i + batch_size]
-        file_batches.append(batch)
-
     success_count = 0
     error_count = 0

     with Progress(
-        TextColumn("[bold cyan]Processing with {task.fields[workers]} workers", justify="left"),
+        TextColumn("[bold cyan]{task.fields[filename]}", justify="left"),
         BarColumn(bar_width=None, complete_style="green3", finished_style="bold green3"),
         TaskProgressColumn(),
+        TextColumn("[dim]({task.fields[workers]} workers)[/dim]"),
         TimeElapsedColumn(),
         expand=True,
         disable=quiet,
     ) as progress:
         task_id = progress.add_task(
-            "Processing", total=file_count, workers=num_workers
+            "Processing", total=file_count, filename="Starting...", workers=num_workers
         )

-        # Process batches in parallel
-        process_batch_func = partial(
-            process_file_batch,
+        # Process files individually in parallel (better progress tracking)
+        process_func = partial(
+            process_single_file_worker,
             fork=fork,
             pretty=pretty,
             merge=merge,
             evm_bin=evm_bin,
         )

         with ProcessPoolExecutor(max_workers=num_workers) as executor:
-            futures = {executor.submit(process_batch_func, batch): batch for batch in file_batches}
-
-            for future in as_completed(futures):
-                results, errors = future.result()
-
-                # Update progress and collect results
-                progress.update(task_id, advance=len(results) + len(errors))
-                success_count += len(results)
-                error_count += len(errors)
-
-                # Collect fixtures for merging
-                if merge:
-                    for _, fixtures in results:
-                        all_fixtures.update(fixtures)
-
-                # Report errors
-                if not quiet:
-                    for file_path, error in errors:
-                        progress.console.print(f"[red]Error processing {file_path}: {error}[/red]")
+            # Submit all files to the pool
+            futures_to_files = {
+                executor.submit(process_func, file_info): file_info[0]
+                for file_info in files_to_process
+            }
+
+            # Process completions as they happen for real-time progress
+            for future in as_completed(futures_to_files):
+                file_path = futures_to_files[future]
+
+                # Update progress with current file
+                rel_path = file_path.relative_to(input_dir)
+                display_name = str(rel_path)
+                if len(display_name) > 40:
+                    display_name = "..." + display_name[-37:]
+
+                try:
+                    result, error = future.result()
+
+                    if result:
+                        success_count += 1
+                        _, fixtures = result
+                        if merge:
+                            all_fixtures.update(fixtures)
+                    elif error:
+                        error_count += 1
+                        error_file, exception = error
+                        if not quiet:
+                            progress.console.print(
+                                f"[red]Error processing {error_file}: {exception}[/red]"
+                            )
+
+                    # Update progress bar
+                    progress.update(
+                        task_id,
+                        advance=1,
+                        filename=display_name,
+                        workers=num_workers
+                    )
+
+                except Exception as e:
+                    error_count += 1
+                    if not quiet:
+                        progress.console.print(f"[red]Worker error for {file_path}: {e}[/red]")
+                    progress.update(task_id, advance=1, filename=display_name)

     # Write merged file if requested
     if merge and all_fixtures:
@@ -211,7 +273,8 @@ def process_directory_parallel(
         progress.update(
             task_id,
             completed=file_count,
-            workers=f"Done! {success_count} succeeded, {error_count} failed {emoji}",
+            filename=f"Done! {success_count} succeeded, {error_count} failed {emoji}",
+            workers=num_workers
         )

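For readers who want to try the pattern in isolation: the change above boils down to submitting one future per input file and advancing a single rich progress bar, relabelled with the current file name, as each future completes. The following is a minimal, self-contained sketch of that pattern only, assuming Python 3.9+ and the rich package; do_work and the generated input_*.json names are hypothetical placeholders, not code from this repository.

import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path

from rich.progress import (
    BarColumn,
    Progress,
    TaskProgressColumn,
    TextColumn,
    TimeElapsedColumn,
)


def do_work(path: Path) -> str:
    """Hypothetical per-file task; stands in for the real blocktest builder."""
    time.sleep(0.1)
    return path.name


def run(files: list[Path], num_workers: int = 4) -> None:
    with Progress(
        TextColumn("[bold cyan]{task.fields[filename]}", justify="left"),
        BarColumn(bar_width=None),
        TaskProgressColumn(),
        TimeElapsedColumn(),
        expand=True,
    ) as progress:
        task_id = progress.add_task(
            "Processing", total=len(files), filename="Starting..."
        )
        with ProcessPoolExecutor(max_workers=num_workers) as executor:
            # One future per file, so each completion advances the bar by exactly one.
            futures = {executor.submit(do_work, f): f for f in files}
            for future in as_completed(futures):
                name = futures[future].name
                try:
                    future.result()
                except Exception as exc:
                    # Surface worker errors without stopping the remaining files.
                    progress.console.print(f"[red]{name}: {exc}[/red]")
                progress.update(task_id, advance=1, filename=name)


if __name__ == "__main__":
    run([Path(f"input_{i}.json") for i in range(20)])

Compared with the batched approach this commit removes, per-file submission costs somewhat more inter-process traffic, but it lets the bar advance and rename on every completed file rather than once per batch.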