Skip to content

Commit ac4b997

Browse files
committed
_ConcurrentMapAsyncIterable: run_in_executor
1 parent 5d3b48b commit ac4b997

File tree

2 files changed

+21
-25
lines changed

2 files changed

+21
-25
lines changed

streamable/aiterators.py

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import asyncio
22
import datetime
33
import multiprocessing
4-
import queue
54
import time
65
from abc import ABC, abstractmethod
76
from collections import defaultdict, deque
@@ -39,9 +38,7 @@
3938
from streamable.util.constants import NO_REPLACEMENT
4039
from streamable.util.futuretools import (
4140
FDFOAsyncFutureResultCollection,
42-
FDFOOSFutureResultCollection,
4341
FIFOAsyncFutureResultCollection,
44-
FIFOOSFutureResultCollection,
4542
FutureResult,
4643
FutureResultCollection,
4744
)
@@ -601,11 +598,13 @@ def _context_manager(self) -> ContextManager:
601598
@abstractmethod
602599
def _launch_task(self, elem: T) -> "Future[Union[U, ExceptionContainer]]": ...
603600

604-
# factory method
605-
@abstractmethod
606601
def _future_result_collection(
607602
self,
608-
) -> FutureResultCollection[Union[U, ExceptionContainer]]: ...
603+
) -> FutureResultCollection[Union[U, ExceptionContainer]]:
604+
if self.ordered:
605+
return FIFOAsyncFutureResultCollection(asyncio.get_running_loop())
606+
else:
607+
return FDFOAsyncFutureResultCollection(asyncio.get_running_loop())
609608

610609
async def _next_future(
611610
self,
@@ -676,17 +675,11 @@ def _safe_transformation(
676675
return ExceptionContainer(e)
677676

678677
def _launch_task(self, elem: T) -> "Future[Union[U, ExceptionContainer]]":
679-
return self.executor.submit(
680-
self._safe_transformation, self.transformation, elem
681-
)
682-
683-
def _future_result_collection(
684-
self,
685-
) -> FutureResultCollection[Union[U, ExceptionContainer]]:
686-
if self.ordered:
687-
return FIFOOSFutureResultCollection()
688-
return FDFOOSFutureResultCollection(
689-
multiprocessing.Queue if self.via == "process" else queue.Queue
678+
return cast(
679+
"Future[Union[U, ExceptionContainer]]",
680+
asyncio.get_running_loop().run_in_executor(
681+
self.executor, self._safe_transformation, self.transformation, elem
682+
),
690683
)
691684

692685

@@ -745,14 +738,6 @@ def _launch_task(self, elem: T) -> "Future[Union[U, ExceptionContainer]]":
745738
asyncio.get_running_loop().create_task(self._safe_transformation(elem)),
746739
)
747740

748-
def _future_result_collection(
749-
self,
750-
) -> FutureResultCollection[Union[U, ExceptionContainer]]:
751-
if self.ordered:
752-
return FIFOAsyncFutureResultCollection(asyncio.get_running_loop())
753-
else:
754-
return FDFOAsyncFutureResultCollection(asyncio.get_running_loop())
755-
756741

757742
class ConcurrentAMapAsyncIterator(_RaisingAsyncIterator[U]):
758743
def __init__(

tests/test_util.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
import asyncio
12
import unittest
23

34
from streamable.util.functiontools import sidify, star
5+
from streamable.util.futuretools import FIFOOSFutureResultCollection, FutureResult
46

57

68
class TestUtil(unittest.TestCase):
@@ -32,3 +34,12 @@ def mul(a: int, b: int) -> int:
3234
list(map(mul, enumerate(range(10)))),
3335
list(map(lambda x: x**2, range(10))),
3436
)
37+
38+
def test_os_future_result_collection_anext(self):
39+
result = object()
40+
future_results = FIFOOSFutureResultCollection()
41+
future_results.add_future(FutureResult(result))
42+
self.assertEqual(
43+
asyncio.run(future_results.__anext__()),
44+
result,
45+
)

0 commit comments

Comments
 (0)