Skip to content

Commit 5a44dc2

Browse files
committed
async concurrent mapping: decouple concurrency and buffersize
1 parent d3a18e8 commit 5a44dc2

File tree

6 files changed

+23
-3
lines changed

6 files changed

+23
-3
lines changed

streamable/afunctions.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ def amap(
235235
return ConcurrentAMapAsyncIterator(
236236
aiterator,
237237
transformation,
238+
concurrency=concurrency,
238239
buffersize=concurrency,
239240
ordered=ordered,
240241
)

streamable/aiterators.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -717,15 +717,25 @@ def __init__(
717717
self,
718718
iterator: AsyncIterator[T],
719719
transformation: Callable[[T], Coroutine[Any, Any, U]],
720+
concurrency: int,
720721
buffersize: int,
721722
ordered: bool,
722723
) -> None:
723724
super().__init__(iterator, buffersize, ordered)
724725
self.transformation = transformation
726+
self.concurrency = concurrency
727+
self.semaphore: asyncio.Semaphore
728+
729+
def _context_manager(self) -> ContextManager:
730+
self.semaphore = asyncio.Semaphore(
731+
self.concurrency, loop=asyncio.get_running_loop()
732+
)
733+
return super()._context_manager()
725734

726735
async def _safe_transformation(self, elem: T) -> Union[U, ExceptionContainer]:
727736
try:
728-
return await self.transformation(elem)
737+
async with self.semaphore:
738+
return await self.transformation(elem)
729739
except Exception as e:
730740
return ExceptionContainer(e)
731741

@@ -749,13 +759,15 @@ def __init__(
749759
self,
750760
iterator: AsyncIterator[T],
751761
transformation: Callable[[T], Coroutine[Any, Any, U]],
762+
concurrency: int,
752763
buffersize: int,
753764
ordered: bool,
754765
) -> None:
755766
super().__init__(
756767
_ConcurrentAMapAsyncIterable(
757768
iterator,
758769
transformation,
770+
concurrency,
759771
buffersize,
760772
ordered,
761773
).__aiter__()

streamable/functions.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ def amap(
231231
event_loop,
232232
iterator,
233233
transformation,
234+
concurrency=concurrency,
234235
buffersize=concurrency,
235236
ordered=ordered,
236237
)

streamable/iterators.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -667,16 +667,19 @@ def __init__(
667667
event_loop: asyncio.AbstractEventLoop,
668668
iterator: Iterator[T],
669669
transformation: Callable[[T], Coroutine[Any, Any, U]],
670+
concurrency: int,
670671
buffersize: int,
671672
ordered: bool,
672673
) -> None:
673674
super().__init__(iterator, buffersize, ordered)
674675
self.transformation = transformation
675676
self.event_loop = event_loop
677+
self.semaphore = asyncio.Semaphore(concurrency, loop=event_loop)
676678

677679
async def _safe_transformation(self, elem: T) -> Union[U, ExceptionContainer]:
678680
try:
679-
return await self.transformation(elem)
681+
async with self.semaphore:
682+
return await self.transformation(elem)
680683
except Exception as e:
681684
return ExceptionContainer(e)
682685

@@ -701,6 +704,7 @@ def __init__(
701704
event_loop: asyncio.AbstractEventLoop,
702705
iterator: Iterator[T],
703706
transformation: Callable[[T], Coroutine[Any, Any, U]],
707+
concurrency: int,
704708
buffersize: int,
705709
ordered: bool,
706710
) -> None:
@@ -709,6 +713,7 @@ def __init__(
709713
event_loop,
710714
iterator,
711715
transformation,
716+
concurrency,
712717
buffersize,
713718
ordered,
714719
).__iter__()

tests/test_iterators.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ def test_ConcurrentAMapAsyncIterable(self) -> None:
2222
_ConcurrentAMapAsyncIterable(
2323
sync_to_async_iter(iter(src)),
2424
async_identity,
25+
concurrency=2,
2526
buffersize=2,
2627
ordered=True,
2728
)

tests/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ class TestError(Exception):
156156
pass
157157

158158

159-
DELTA_RATE = 0.4
159+
DELTA_RATE = 0.1
160160
# size of the test collections
161161
N = 256
162162

0 commit comments

Comments
 (0)