Skip to content
Open

1.6.6 #119

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:

strategy:
matrix:
python-version: ['3.7.17', '3.8.18', '3.9.18', '3.10.13', '3.11.7', '3.12.1', '3.13.1', '3.14.0-beta.1']
python-version: ['3.7.17', '3.8.18', '3.9.18', '3.10.13', '3.11.7', '3.12.1', '3.13.1', '3.14.0', '3.15.0-alpha.1']

steps:
- uses: actions/checkout@v4
Expand Down
1 change: 1 addition & 0 deletions streamable/afunctions.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ def amap(
return ConcurrentAMapAsyncIterator(
aiterator,
transformation,
concurrency=concurrency,
buffersize=concurrency,
ordered=ordered,
)
Expand Down
154 changes: 64 additions & 90 deletions streamable/aiterators.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import datetime
import multiprocessing
import queue
import time
from abc import ABC, abstractmethod
from collections import defaultdict, deque
Expand All @@ -22,7 +21,6 @@
Iterable,
Iterator,
List,
NamedTuple,
Optional,
Set,
Tuple,
Expand All @@ -34,14 +32,13 @@

from streamable.util.asynctools import awaitable_to_coroutine, empty_aiter
from streamable.util.contextmanagertools import noop_context_manager
from streamable.util.errortools import ExceptionContainer
from streamable.util.loggertools import get_logger

from streamable.util.constants import NO_REPLACEMENT
from streamable.util.futuretools import (
FDFOAsyncFutureResultCollection,
FDFOOSFutureResultCollection,
FIFOAsyncFutureResultCollection,
FIFOOSFutureResultCollection,
FutureResult,
FutureResultCollection,
)
Expand Down Expand Up @@ -559,9 +556,6 @@ async def __anext__(self) -> T:


class _RaisingAsyncIterator(AsyncIterator[T]):
class ExceptionContainer(NamedTuple):
exception: Exception

def __init__(
self,
iterator: AsyncIterator[Union[T, ExceptionContainer]],
Expand All @@ -570,7 +564,7 @@ def __init__(

async def __anext__(self) -> T:
elem = await self.iterator.__anext__()
if isinstance(elem, self.ExceptionContainer):
if isinstance(elem, ExceptionContainer):
try:
raise elem.exception
finally:
Expand All @@ -586,7 +580,7 @@ async def __anext__(self) -> T:
class _BaseConcurrentMapAsyncIterable(
Generic[T, U],
ABC,
AsyncIterable[Union[U, _RaisingAsyncIterator.ExceptionContainer]],
AsyncIterable[Union[U, ExceptionContainer]],
):
def __init__(
self,
Expand All @@ -602,29 +596,30 @@ def _context_manager(self) -> ContextManager:
return noop_context_manager()

@abstractmethod
def _launch_task(
self, elem: T
) -> "Future[Union[U, _RaisingAsyncIterator.ExceptionContainer]]": ...
def _launch_task(self, elem: T) -> "Future[Union[U, ExceptionContainer]]": ...

# factory method
@abstractmethod
def _future_result_collection(
self,
) -> FutureResultCollection[Union[U, _RaisingAsyncIterator.ExceptionContainer]]: ...
) -> FutureResultCollection[Union[U, ExceptionContainer]]:
if self.ordered:
return FIFOAsyncFutureResultCollection(asyncio.get_running_loop())
else:
return FDFOAsyncFutureResultCollection(asyncio.get_running_loop())

async def _next_future(
self,
) -> Optional["Future[Union[U, _RaisingAsyncIterator.ExceptionContainer]]"]:
) -> Optional["Future[Union[U, ExceptionContainer]]"]:
try:
return self._launch_task(await self.iterator.__anext__())
elem = await self.iterator.__anext__()
except StopAsyncIteration:
return None
except Exception as e:
return FutureResult(_RaisingAsyncIterator.ExceptionContainer(e))
return FutureResult(ExceptionContainer(e))
return self._launch_task(elem)

async def __aiter__(
self,
) -> AsyncIterator[Union[U, _RaisingAsyncIterator.ExceptionContainer]]:
) -> AsyncIterator[Union[U, ExceptionContainer]]:
with self._context_manager():
future_results = self._future_result_collection()

Expand Down Expand Up @@ -674,26 +669,18 @@ def _context_manager(self) -> ContextManager:
@staticmethod
def _safe_transformation(
transformation: Callable[[T], U], elem: T
) -> Union[U, _RaisingAsyncIterator.ExceptionContainer]:
) -> Union[U, ExceptionContainer]:
try:
return transformation(elem)
except Exception as e:
return _RaisingAsyncIterator.ExceptionContainer(e)

def _launch_task(
self, elem: T
) -> "Future[Union[U, _RaisingAsyncIterator.ExceptionContainer]]":
return self.executor.submit(
self._safe_transformation, self.transformation, elem
)
return ExceptionContainer(e)

def _future_result_collection(
self,
) -> FutureResultCollection[Union[U, _RaisingAsyncIterator.ExceptionContainer]]:
if self.ordered:
return FIFOOSFutureResultCollection()
return FDFOOSFutureResultCollection(
multiprocessing.Queue if self.via == "process" else queue.Queue
def _launch_task(self, elem: T) -> "Future[Union[U, ExceptionContainer]]":
return cast(
"Future[Union[U, ExceptionContainer]]",
asyncio.get_running_loop().run_in_executor(
self.executor, self._safe_transformation, self.transformation, elem
),
)


Expand Down Expand Up @@ -724,49 +711,49 @@ def __init__(
self,
iterator: AsyncIterator[T],
transformation: Callable[[T], Coroutine[Any, Any, U]],
concurrency: int,
buffersize: int,
ordered: bool,
) -> None:
super().__init__(iterator, buffersize, ordered)
self.transformation = transformation
self.concurrency = concurrency
self._semaphore: Optional[asyncio.Semaphore] = None

@property
def semaphore(self) -> asyncio.Semaphore:
if not self._semaphore:
self._semaphore = asyncio.Semaphore(self.concurrency)
return self._semaphore

async def _safe_transformation(
self, elem: T
) -> Union[U, _RaisingAsyncIterator.ExceptionContainer]:
async def _safe_transformation(self, elem: T) -> Union[U, ExceptionContainer]:
try:
return await self.transformation(elem)
async with self.semaphore:
return await self.transformation(elem)
except Exception as e:
return _RaisingAsyncIterator.ExceptionContainer(e)
return ExceptionContainer(e)

def _launch_task(
self, elem: T
) -> "Future[Union[U, _RaisingAsyncIterator.ExceptionContainer]]":
def _launch_task(self, elem: T) -> "Future[Union[U, ExceptionContainer]]":
return cast(
"Future[Union[U, _RaisingAsyncIterator.ExceptionContainer]]",
"Future[Union[U, ExceptionContainer]]",
asyncio.get_running_loop().create_task(self._safe_transformation(elem)),
)

def _future_result_collection(
self,
) -> FutureResultCollection[Union[U, _RaisingAsyncIterator.ExceptionContainer]]:
if self.ordered:
return FIFOAsyncFutureResultCollection(asyncio.get_running_loop())
else:
return FDFOAsyncFutureResultCollection(asyncio.get_running_loop())


class ConcurrentAMapAsyncIterator(_RaisingAsyncIterator[U]):
def __init__(
self,
iterator: AsyncIterator[T],
transformation: Callable[[T], Coroutine[Any, Any, U]],
concurrency: int,
buffersize: int,
ordered: bool,
) -> None:
super().__init__(
_ConcurrentAMapAsyncIterable(
iterator,
transformation,
concurrency,
buffersize,
ordered,
).__aiter__()
Expand All @@ -778,9 +765,7 @@ def __init__(
######################


class _ConcurrentFlattenAsyncIterable(
AsyncIterable[Union[T, _RaisingAsyncIterator.ExceptionContainer]]
):
class _ConcurrentFlattenAsyncIterable(AsyncIterable[Union[T, ExceptionContainer]]):
def __init__(
self,
iterables_iterator: AsyncIterator[Iterable[T]],
Expand All @@ -790,34 +775,29 @@ def __init__(
self.iterables_iterator = iterables_iterator
self.concurrency = concurrency
self.buffersize = buffersize
self._next = ExceptionContainer.wrap(next)

async def __aiter__(
self,
) -> AsyncIterator[Union[T, _RaisingAsyncIterator.ExceptionContainer]]:
) -> AsyncIterator[Union[T, ExceptionContainer]]:
with ThreadPoolExecutor(max_workers=self.concurrency) as executor:
iterator_and_future_pairs: Deque[
Tuple[
Optional[Iterator[T]],
"Future[Union[T, _RaisingAsyncIterator.ExceptionContainer]]",
"Awaitable[Union[T, ExceptionContainer]]",
]
] = deque()
element_to_yield: Deque[
Union[T, _RaisingAsyncIterator.ExceptionContainer]
] = deque(maxlen=1)
to_yield: Deque[Union[T, ExceptionContainer]] = deque(maxlen=1)
iterator_to_queue: Optional[Iterator[T]] = None
# wait, queue, yield (FIFO)
while True:
if iterator_and_future_pairs:
iterator, future = iterator_and_future_pairs.popleft()
try:
element_to_yield.append(future.result())
iterator_to_queue = iterator
except StopIteration:
pass
except Exception as e:
element_to_yield.append(
_RaisingAsyncIterator.ExceptionContainer(e)
)
elem = await future
if not isinstance(elem, ExceptionContainer) or not isinstance(
elem.exception, StopIteration
):
to_yield.append(elem)
iterator_to_queue = iterator

# queue tasks up to buffersize
Expand All @@ -831,18 +811,18 @@ async def __aiter__(
iterator_to_queue = iterable.__iter__()
except Exception as e:
iterator_to_queue = None
future = FutureResult(
_RaisingAsyncIterator.ExceptionContainer(e)
)
future = FutureResult(ExceptionContainer(e))
iterator_and_future_pairs.append(
(iterator_to_queue, future)
)
continue
future = executor.submit(next, iterator_to_queue)
future = asyncio.get_running_loop().run_in_executor(
executor, self._next, iterator_to_queue
)
iterator_and_future_pairs.append((iterator_to_queue, future))
iterator_to_queue = None
if element_to_yield:
yield element_to_yield.pop()
if to_yield:
yield to_yield.pop()
if not iterator_and_future_pairs:
break

Expand All @@ -863,9 +843,7 @@ def __init__(
)


class _ConcurrentAFlattenAsyncIterable(
AsyncIterable[Union[T, _RaisingAsyncIterator.ExceptionContainer]]
):
class _ConcurrentAFlattenAsyncIterable(AsyncIterable[Union[T, ExceptionContainer]]):
def __init__(
self,
iterables_iterator: AsyncIterator[AsyncIterable[T]],
Expand All @@ -878,28 +856,26 @@ def __init__(

async def __aiter__(
self,
) -> AsyncIterator[Union[T, _RaisingAsyncIterator.ExceptionContainer]]:
) -> AsyncIterator[Union[T, ExceptionContainer]]:
iterator_and_future_pairs: Deque[
Tuple[
Optional[AsyncIterator[T]],
Awaitable[Union[T, _RaisingAsyncIterator.ExceptionContainer]],
Awaitable[Union[T, ExceptionContainer]],
]
] = deque()
element_to_yield: Deque[Union[T, _RaisingAsyncIterator.ExceptionContainer]] = (
deque(maxlen=1)
)
to_yield: Deque[Union[T, ExceptionContainer]] = deque(maxlen=1)
iterator_to_queue: Optional[AsyncIterator[T]] = None
# wait, queue, yield (FIFO)
while True:
if iterator_and_future_pairs:
iterator, future = iterator_and_future_pairs.popleft()
try:
element_to_yield.append(await future)
to_yield.append(await future)
iterator_to_queue = iterator
except StopAsyncIteration:
pass
except Exception as e:
element_to_yield.append(_RaisingAsyncIterator.ExceptionContainer(e))
to_yield.append(ExceptionContainer(e))
iterator_to_queue = iterator

# queue tasks up to buffersize
Expand All @@ -913,18 +889,16 @@ async def __aiter__(
iterator_to_queue = iterable.__aiter__()
except Exception as e:
iterator_to_queue = None
future = FutureResult(
_RaisingAsyncIterator.ExceptionContainer(e)
)
future = FutureResult(ExceptionContainer(e))
iterator_and_future_pairs.append((iterator_to_queue, future))
continue
future = asyncio.get_running_loop().create_task(
awaitable_to_coroutine(iterator_to_queue.__anext__())
)
iterator_and_future_pairs.append((iterator_to_queue, future))
iterator_to_queue = None
if element_to_yield:
yield element_to_yield.pop()
if to_yield:
yield to_yield.pop()
if not iterator_and_future_pairs:
break

Expand Down
1 change: 1 addition & 0 deletions streamable/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ def amap(
event_loop,
iterator,
transformation,
concurrency=concurrency,
buffersize=concurrency,
ordered=ordered,
)
Expand Down
Loading