Skip to content

Commit 5f042f1

Browse files
fix(executor): simplify process_futures calls and progress bar updates
- Removed optional progress bar parameter from process_futures. - Updated calls to process_futures to handle progress bar updates directly. - Cleaned up error handling logic for results in async processing.
1 parent 837ae73 commit 5f042f1

File tree

2 files changed

+11
-33
lines changed

2 files changed

+11
-33
lines changed

ragas/src/ragas/async_utils.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
import logging
55
import typing as t
66

7-
from tqdm.auto import tqdm
8-
97
logger = logging.getLogger(__name__)
108

119

@@ -58,14 +56,13 @@ async def sema_coro(coro):
5856

5957

6058
async def process_futures(
61-
futures: t.Iterator[asyncio.Future], pbar: t.Optional[tqdm] = None
59+
futures: t.Iterator[asyncio.Future],
6260
) -> t.AsyncGenerator[t.Any, None]:
6361
"""
6462
Process futures with optional progress tracking.
6563
6664
Args:
6765
futures: Iterator of asyncio futures to process (e.g., from asyncio.as_completed)
68-
pbar: Optional progress bar to update
6966
7067
Yields:
7168
Results from completed futures as they finish
@@ -77,8 +74,6 @@ async def process_futures(
7774
except Exception as e:
7875
result = e
7976

80-
if pbar:
81-
pbar.update(1)
8277
yield result
8378

8479

@@ -139,10 +134,9 @@ async def _run():
139134

140135
if not batch_size:
141136
with pbm.create_single_bar(total_tasks) as pbar:
142-
async for result in process_futures(
143-
as_completed(tasks, max_workers), pbar
144-
):
137+
async for result in process_futures(as_completed(tasks, max_workers)):
145138
results.append(result)
139+
pbar.update(1)
146140
else:
147141
total_tasks = len(tasks)
148142
batches = batched(tasks, batch_size)
@@ -153,9 +147,10 @@ async def _run():
153147
for i, batch in enumerate(batches, 1):
154148
pbm.update_batch_bar(batch_pbar, i, n_batches, len(batch))
155149
async for result in process_futures(
156-
as_completed(batch, max_workers), batch_pbar
150+
as_completed(batch, max_workers)
157151
):
158152
results.append(result)
153+
batch_pbar.update(1)
159154
overall_pbar.update(len(batch))
160155

161156
return results

ragas/src/ragas/executor.py

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -148,36 +148,19 @@ async def _process_batched_jobs(
148148
afunc(*args, **kwargs) for afunc, args, kwargs, _ in batch
149149
]
150150
async for result in process_futures(
151-
as_completed(coroutines, max_workers), batch_pbar
151+
as_completed(coroutines, max_workers)
152152
):
153-
# Ensure result is always a tuple (counter, value)
154-
if isinstance(result, Exception):
155-
# Find the counter for this failed job
156-
idx = coroutines.index(result.__context__)
157-
counter = (
158-
batch[idx][0].__closure__[1].cell_contents
159-
) # counter from closure
160-
results.append((counter, result))
161-
else:
162-
results.append(result)
153+
results.append(result)
154+
batch_pbar.update(1)
163155
# Update overall progress bar for all futures in this batch
164156
overall_pbar.update(len(batch))
165157

166158
async def _process_coroutines(self, jobs, pbar, results, max_workers):
167159
"""Helper function to process coroutines and update the progress bar."""
168160
coroutines = [afunc(*args, **kwargs) for afunc, args, kwargs, _ in jobs]
169-
async for result in process_futures(
170-
as_completed(coroutines, max_workers), pbar
171-
):
172-
# Ensure result is always a tuple (counter, value)
173-
if isinstance(result, Exception):
174-
idx = coroutines.index(result.__context__)
175-
counter = (
176-
jobs[idx][0].__closure__[1].cell_contents
177-
) # counter from closure
178-
results.append((counter, result))
179-
else:
180-
results.append(result)
161+
async for result in process_futures(as_completed(coroutines, max_workers)):
162+
results.append(result)
163+
pbar.update(1)
181164

182165
async def aresults(self) -> t.List[t.Any]:
183166
"""

0 commit comments

Comments
 (0)