Skip to content

Commit 4a7a4d2

Browse files
committed
move ExceptionContainer in errortools
1 parent 5112acc commit 4a7a4d2

File tree

2 files changed

+60
-104
lines changed

2 files changed

+60
-104
lines changed

streamable/aiterators.py

Lines changed: 30 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
Iterable,
2323
Iterator,
2424
List,
25-
NamedTuple,
2625
Optional,
2726
Set,
2827
Tuple,
@@ -34,6 +33,7 @@
3433

3534
from streamable.util.asynctools import awaitable_to_coroutine, empty_aiter
3635
from streamable.util.contextmanagertools import noop_context_manager
36+
from streamable.util.errortools import ExceptionContainer
3737
from streamable.util.loggertools import get_logger
3838

3939
from streamable.util.constants import NO_REPLACEMENT
@@ -559,9 +559,6 @@ async def __anext__(self) -> T:
559559

560560

561561
class _RaisingAsyncIterator(AsyncIterator[T]):
562-
class ExceptionContainer(NamedTuple):
563-
exception: Exception
564-
565562
def __init__(
566563
self,
567564
iterator: AsyncIterator[Union[T, ExceptionContainer]],
@@ -570,7 +567,7 @@ def __init__(
570567

571568
async def __anext__(self) -> T:
572569
elem = await self.iterator.__anext__()
573-
if isinstance(elem, self.ExceptionContainer):
570+
if isinstance(elem, ExceptionContainer):
574571
try:
575572
raise elem.exception
576573
finally:
@@ -586,7 +583,7 @@ async def __anext__(self) -> T:
586583
class _BaseConcurrentMapAsyncIterable(
587584
Generic[T, U],
588585
ABC,
589-
AsyncIterable[Union[U, _RaisingAsyncIterator.ExceptionContainer]],
586+
AsyncIterable[Union[U, ExceptionContainer]],
590587
):
591588
def __init__(
592589
self,
@@ -602,29 +599,27 @@ def _context_manager(self) -> ContextManager:
602599
return noop_context_manager()
603600

604601
@abstractmethod
605-
def _launch_task(
606-
self, elem: T
607-
) -> "Future[Union[U, _RaisingAsyncIterator.ExceptionContainer]]": ...
602+
def _launch_task(self, elem: T) -> "Future[Union[U, ExceptionContainer]]": ...
608603

609604
# factory method
610605
@abstractmethod
611606
def _future_result_collection(
612607
self,
613-
) -> FutureResultCollection[Union[U, _RaisingAsyncIterator.ExceptionContainer]]: ...
608+
) -> FutureResultCollection[Union[U, ExceptionContainer]]: ...
614609

615610
async def _next_future(
616611
self,
617-
) -> Optional["Future[Union[U, _RaisingAsyncIterator.ExceptionContainer]]"]:
612+
) -> Optional["Future[Union[U, ExceptionContainer]]"]:
618613
try:
619614
return self._launch_task(await self.iterator.__anext__())
620615
except StopAsyncIteration:
621616
return None
622617
except Exception as e:
623-
return FutureResult(_RaisingAsyncIterator.ExceptionContainer(e))
618+
return FutureResult(ExceptionContainer(e))
624619

625620
async def __aiter__(
626621
self,
627-
) -> AsyncIterator[Union[U, _RaisingAsyncIterator.ExceptionContainer]]:
622+
) -> AsyncIterator[Union[U, ExceptionContainer]]:
628623
with self._context_manager():
629624
future_results = self._future_result_collection()
630625

@@ -637,9 +632,7 @@ async def __aiter__(
637632
future_results.add_future(future)
638633

639634
# wait, enqueue, yield
640-
to_yield: Deque[Union[U, _RaisingAsyncIterator.ExceptionContainer]] = deque(
641-
maxlen=1
642-
)
635+
to_yield: Deque[Union[U, ExceptionContainer]] = deque(maxlen=1)
643636
while future_results:
644637
to_yield.append(await future_results.__anext__())
645638
future = await self._next_future()
@@ -678,22 +671,20 @@ def _context_manager(self) -> ContextManager:
678671
@staticmethod
679672
def _safe_transformation(
680673
transformation: Callable[[T], U], elem: T
681-
) -> Union[U, _RaisingAsyncIterator.ExceptionContainer]:
674+
) -> Union[U, ExceptionContainer]:
682675
try:
683676
return transformation(elem)
684677
except Exception as e:
685-
return _RaisingAsyncIterator.ExceptionContainer(e)
678+
return ExceptionContainer(e)
686679

687-
def _launch_task(
688-
self, elem: T
689-
) -> "Future[Union[U, _RaisingAsyncIterator.ExceptionContainer]]":
680+
def _launch_task(self, elem: T) -> "Future[Union[U, ExceptionContainer]]":
690681
return self.executor.submit(
691682
self._safe_transformation, self.transformation, elem
692683
)
693684

694685
def _future_result_collection(
695686
self,
696-
) -> FutureResultCollection[Union[U, _RaisingAsyncIterator.ExceptionContainer]]:
687+
) -> FutureResultCollection[Union[U, ExceptionContainer]]:
697688
if self.ordered:
698689
return FIFOOSFutureResultCollection()
699690
return FDFOOSFutureResultCollection(
@@ -734,25 +725,21 @@ def __init__(
734725
super().__init__(iterator, buffersize, ordered)
735726
self.transformation = transformation
736727

737-
async def _safe_transformation(
738-
self, elem: T
739-
) -> Union[U, _RaisingAsyncIterator.ExceptionContainer]:
728+
async def _safe_transformation(self, elem: T) -> Union[U, ExceptionContainer]:
740729
try:
741730
return await self.transformation(elem)
742731
except Exception as e:
743-
return _RaisingAsyncIterator.ExceptionContainer(e)
732+
return ExceptionContainer(e)
744733

745-
def _launch_task(
746-
self, elem: T
747-
) -> "Future[Union[U, _RaisingAsyncIterator.ExceptionContainer]]":
734+
def _launch_task(self, elem: T) -> "Future[Union[U, ExceptionContainer]]":
748735
return cast(
749-
"Future[Union[U, _RaisingAsyncIterator.ExceptionContainer]]",
736+
"Future[Union[U, ExceptionContainer]]",
750737
asyncio.get_running_loop().create_task(self._safe_transformation(elem)),
751738
)
752739

753740
def _future_result_collection(
754741
self,
755-
) -> FutureResultCollection[Union[U, _RaisingAsyncIterator.ExceptionContainer]]:
742+
) -> FutureResultCollection[Union[U, ExceptionContainer]]:
756743
if self.ordered:
757744
return FIFOAsyncFutureResultCollection(asyncio.get_running_loop())
758745
else:
@@ -782,9 +769,7 @@ def __init__(
782769
######################
783770

784771

785-
class _ConcurrentFlattenAsyncIterable(
786-
AsyncIterable[Union[T, _RaisingAsyncIterator.ExceptionContainer]]
787-
):
772+
class _ConcurrentFlattenAsyncIterable(AsyncIterable[Union[T, ExceptionContainer]]):
788773
def __init__(
789774
self,
790775
iterables_iterator: AsyncIterator[Iterable[T]],
@@ -797,17 +782,15 @@ def __init__(
797782

798783
async def __aiter__(
799784
self,
800-
) -> AsyncIterator[Union[T, _RaisingAsyncIterator.ExceptionContainer]]:
785+
) -> AsyncIterator[Union[T, ExceptionContainer]]:
801786
with ThreadPoolExecutor(max_workers=self.concurrency) as executor:
802787
iterator_and_future_pairs: Deque[
803788
Tuple[
804789
Optional[Iterator[T]],
805-
"Future[Union[T, _RaisingAsyncIterator.ExceptionContainer]]",
790+
"Future[Union[T, ExceptionContainer]]",
806791
]
807792
] = deque()
808-
to_yield: Deque[Union[T, _RaisingAsyncIterator.ExceptionContainer]] = deque(
809-
maxlen=1
810-
)
793+
to_yield: Deque[Union[T, ExceptionContainer]] = deque(maxlen=1)
811794
iterator_to_queue: Optional[Iterator[T]] = None
812795
# wait, queue, yield (FIFO)
813796
while True:
@@ -819,7 +802,7 @@ async def __aiter__(
819802
except StopIteration:
820803
pass
821804
except Exception as e:
822-
to_yield.append(_RaisingAsyncIterator.ExceptionContainer(e))
805+
to_yield.append(ExceptionContainer(e))
823806
iterator_to_queue = iterator
824807

825808
# queue tasks up to buffersize
@@ -833,9 +816,7 @@ async def __aiter__(
833816
iterator_to_queue = iterable.__iter__()
834817
except Exception as e:
835818
iterator_to_queue = None
836-
future = FutureResult(
837-
_RaisingAsyncIterator.ExceptionContainer(e)
838-
)
819+
future = FutureResult(ExceptionContainer(e))
839820
iterator_and_future_pairs.append(
840821
(iterator_to_queue, future)
841822
)
@@ -865,9 +846,7 @@ def __init__(
865846
)
866847

867848

868-
class _ConcurrentAFlattenAsyncIterable(
869-
AsyncIterable[Union[T, _RaisingAsyncIterator.ExceptionContainer]]
870-
):
849+
class _ConcurrentAFlattenAsyncIterable(AsyncIterable[Union[T, ExceptionContainer]]):
871850
def __init__(
872851
self,
873852
iterables_iterator: AsyncIterator[AsyncIterable[T]],
@@ -880,16 +859,14 @@ def __init__(
880859

881860
async def __aiter__(
882861
self,
883-
) -> AsyncIterator[Union[T, _RaisingAsyncIterator.ExceptionContainer]]:
862+
) -> AsyncIterator[Union[T, ExceptionContainer]]:
884863
iterator_and_future_pairs: Deque[
885864
Tuple[
886865
Optional[AsyncIterator[T]],
887-
Awaitable[Union[T, _RaisingAsyncIterator.ExceptionContainer]],
866+
Awaitable[Union[T, ExceptionContainer]],
888867
]
889868
] = deque()
890-
to_yield: Deque[Union[T, _RaisingAsyncIterator.ExceptionContainer]] = deque(
891-
maxlen=1
892-
)
869+
to_yield: Deque[Union[T, ExceptionContainer]] = deque(maxlen=1)
893870
iterator_to_queue: Optional[AsyncIterator[T]] = None
894871
# wait, queue, yield (FIFO)
895872
while True:
@@ -901,7 +878,7 @@ async def __aiter__(
901878
except StopAsyncIteration:
902879
pass
903880
except Exception as e:
904-
to_yield.append(_RaisingAsyncIterator.ExceptionContainer(e))
881+
to_yield.append(ExceptionContainer(e))
905882
iterator_to_queue = iterator
906883

907884
# queue tasks up to buffersize
@@ -915,9 +892,7 @@ async def __aiter__(
915892
iterator_to_queue = iterable.__aiter__()
916893
except Exception as e:
917894
iterator_to_queue = None
918-
future = FutureResult(
919-
_RaisingAsyncIterator.ExceptionContainer(e)
920-
)
895+
future = FutureResult(ExceptionContainer(e))
921896
iterator_and_future_pairs.append((iterator_to_queue, future))
922897
continue
923898
future = asyncio.get_running_loop().create_task(

0 commit comments

Comments
 (0)