Option to pass in new event loop or specify use existing for async functions? #99

Closed
msluyter opened this issue Apr 23, 2025 · 8 comments
Labels
documentation Improvements or additions to documentation

Comments


msluyter commented Apr 23, 2025

Hi,
First off, let me say that I love this library.

Question/issue wrt async: I was doing some experimentation using aioboto3, where I ran into a problem. With aioboto3, you have to use an async context manager to create your boto client, and to do that, you need to be running in an event loop before starting your stream, e.g.:

# say we have an async function to get an s3 file that reads the file and then returns the contents as a string...
async def get_s3_file(s3_file) -> str: 
     ...

async def main() -> None:
    session = aioboto3.Session()
    
    async with session.client("s3", config=aiobotocore_config) as client:
        pipeline: Stream = (
            Stream(["file1", "file2"])
            .amap(get_s3_file, concurrency=2)
            .foreach(print)
        )
        pipeline()

if __name__ == "__main__":
    asyncio.run(main())

I've glossed over a few issues here, like how to pass the client to get_s3_file(), which I found somewhat tricky, but in any event, the above will fail with:

  File "/Users/.../.venv/lib/python3.12/site-packages/streamable/util/futuretools.py", line 83, in __next__
    return self.event_loop.run_until_complete(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.9/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/base_events.py", line 667, in run_until_complete
    self._check_running()
  File "/opt/homebrew/Cellar/python@3.12/3.12.9/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/base_events.py", line 626, in _check_running
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

Basically, I'm wondering if there's a way to either pass an event loop into the stream or check if we're already in one before starting a new one to be able to allow this sort of pattern?
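For context, the RuntimeError comes from asyncio itself: run_until_complete refuses to run on a loop that is already running. A minimal stdlib-only reproduction (no streamable involved), assuming only that the stream's synchronous iteration drives coroutines via run_until_complete, as the traceback suggests:

```python
import asyncio

async def main() -> str:
    loop = asyncio.get_running_loop()
    coro = asyncio.sleep(0)
    try:
        # This mirrors what a synchronous iterator does when it drives
        # coroutines via run_until_complete from inside a running loop:
        loop.run_until_complete(coro)
        return "no error"
    except RuntimeError as exc:
        coro.close()  # avoid a "never awaited" warning
        return str(exc)

print(asyncio.run(main()))  # This event loop is already running
```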


ebonnal commented Apr 25, 2025

Hi @msluyter,
Thanks a lot for your support and feedback!

Oh yes, you need to allow event loop nesting if you have to iterate over the stream from within a running event loop. Can you try to pip install nest_asyncio and add this at the beginning of your script?

import nest_asyncio
nest_asyncio.apply()

It should do the trick 🙏🏻 let me know if it works on your side and I will add that to the README.

Note that tackling #8 would allow async iteration on the stream (async for elem in stream), which would be cleaner. I haven’t yet found an elegant implementation though — I keep ending up duplicating a lot of logic for the async counterparts 😬 .

@ebonnal ebonnal added documentation Improvements or additions to documentation and removed awaiting review labels Apr 25, 2025

ebonnal commented Apr 25, 2025

I've glossed over a few issues here, like how to pass the client to get_s3_file(), which I found somewhat tricky, but in any event, the above will fail with:

I'm very curious to see your full snippet 🙏🏻 maybe we can make .amap easier to work with.


msluyter commented Apr 25, 2025

Thanks, I'll try nest_asyncio when I get a chance!

I don't want to accidentally leak proprietary company information, so I can't really cut & paste the whole thing, but basically I'm passing the aioboto3 s3 client into the get_s3_file function like so:

async def get_s3_file(s3_client, s3_file):
    ....

But then in the stream, in order to pass the s3 client to the function, I ended up doing this:

pipeline: Stream = (
    Stream(zip(itertools.repeat(client), iter_transaction_dates(cur_date, end_date)))
    .observe("dates")
    .amap(get_s3_file, concurrency=4)
    .flatten(concurrency=4)
    ...

Using zip felt a little hacky and I'm sure there's a better approach. A better way might have been to use a class rather than a single function and pass the client via __init__() prior to running the stream, or to use a partial. I was trying to shoehorn things into an existing pattern, so this was my initial attempt.
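The class-based alternative could look like this stdlib-only sketch (S3Fetcher is hypothetical, and the body just echoes its arguments instead of calling aioboto3):

```python
import asyncio

class S3Fetcher:
    """Holds the client so the mapped function only needs the file key."""

    def __init__(self, client):
        self.client = client

    async def get_s3_file(self, s3_file) -> str:
        # Stand-in body; the real version would use self.client to read S3.
        return f"content of {s3_file} via {self.client}"

async def demo() -> list:
    fetcher = S3Fetcher("my-client")
    # A bound method like fetcher.get_s3_file takes only the varying
    # argument, so it can be mapped over the file keys directly:
    return [await fetcher.get_s3_file(f) for f in ["file1", "file2"]]

print(asyncio.run(demo()))
```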

Thanks!


ebonnal commented Apr 26, 2025

I don't want to accidentally leak proprietary company information

Of course, sorry! Thank you for sharing how you managed the client, @msluyter.

Using zip felt a little hacky and I'm sure there's a better approach

Indeed functools.partial should do the trick in this case:

import aioboto3
import asyncio
from functools import partial
from streamable.stream import Stream

import nest_asyncio
nest_asyncio.apply()

async def get_s3_file(s3_client, s3_file) -> str: 
    return f"content of file={s3_file} read via client={s3_client}"

async def main() -> None:
    session = aioboto3.Session()
    aiobotocore_config = None
    async with session.client("s3", config=aiobotocore_config) as client:
        pipeline: Stream = (
            Stream(["file1", "file2"])
            .amap(partial(get_s3_file, client), concurrency=2)
            .foreach(print)
        )
        pipeline()

if __name__ == "__main__":
    asyncio.run(main())

Note: The .amap operation will execute 2 (concurrency) get_s3_file calls asynchronously using a nested event loop, but the pipeline() call itself is synchronous and will block your main event loop.

If you want to avoid nest_asyncio, you have to iterate over the stream outside of an event loop, which means running the context manager logic (__aenter__/__aexit__) manually:

import aioboto3
import asyncio
from functools import partial
from streamable.stream import Stream

async def get_s3_file(s3_client, s3_file) -> str: 
    return f"content of file={s3_file} read via client={s3_client}"

session = aioboto3.Session()
aiobotocore_config = None
client = asyncio.run(session.client("s3", config=aiobotocore_config).__aenter__())
pipeline: Stream = (
    Stream(["file1", "file2"])
    .amap(partial(get_s3_file, client), concurrency=2)
    .foreach(print)
)
pipeline()
asyncio.run(client.__aexit__(None, None, None))
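The manual __aenter__/__aexit__ pattern generalizes to any async context manager. A stdlib-only sketch, with a toy Resource class standing in for aioboto3's client context:

```python
import asyncio

events = []

class Resource:
    """Toy async context manager standing in for aioboto3's client context."""

    async def __aenter__(self):
        events.append("enter")
        return "handle"

    async def __aexit__(self, exc_type, exc, tb):
        events.append("exit")
        return False

# Drive __aenter__/__aexit__ manually, each in its own short-lived event
# loop, so the work in between can run outside any loop:
cm = Resource()
handle = asyncio.run(cm.__aenter__())
events.append(f"use {handle}")  # the synchronous pipeline() call goes here
asyncio.run(cm.__aexit__(None, None, None))
print(events)  # ['enter', 'use handle', 'exit']
```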

Your feedback makes me realize that I really need to tackle #8 / #100 / #101 to make streamable elegant asyncio-wise; I'll keep you posted. Once that's done, your script would just replace pipeline() with await pipeline.

@msluyter

Hey, just following up to let you know that using nest_asyncio I was able to get my example to work without issue. And using a partial got around the zip hack. Thanks!


ebonnal commented Apr 29, 2025

@msluyter great news!

FYI, I am actively working on #102. One of the things it will enable is awaiting the pipeline to consume it as an AsyncIterable: no more blocking nested event loop, everything will run cleanly in your main one. I would love your feedback once it's ready. I'll keep you posted!

@ebonnal ebonnal closed this as completed Apr 29, 2025

ebonnal commented May 11, 2025

Hello @msluyter!

I have worked on full async compatibility for Streams: a Stream[T] is an Iterable[T], but now it is also an AsyncIterable[T] and an Awaitable.

It's only in alpha for now while I make sure it is production ready. I have extended the test suite to cover 100% of the new async codebase, and I am working on testing it in real-life scripts. I would really appreciate it if you could give it a try! 🙏🏻

install: since it's an alpha, you have to install the version explicitly: pip install streamable==1.6.0a6
test:

  • remove the dependency on nest_asyncio
  • replace pipeline() by await pipeline

Now your .amap operation will run in the main event loop in a non-blocking way.
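For illustration, the dual interface (one object that is both async-iterable and awaitable) can be sketched with stdlib-only code; ToyStream is a hypothetical stand-in, not streamable's actual implementation:

```python
import asyncio

class ToyStream:
    """Toy stand-in for a stream that is both AsyncIterable and Awaitable."""

    def __init__(self, source):
        self.source = source

    def __aiter__(self):
        async def gen():
            for elem in self.source:
                await asyncio.sleep(0)  # cooperative yield, like real async I/O
                yield elem
        return gen()

    def __await__(self):
        async def drain():
            # awaiting the stream consumes it entirely
            return [elem async for elem in self]
        return drain().__await__()

async def main():
    # async for runs in the main event loop, no nesting:
    async for elem in ToyStream(["file1", "file2"]):
        print(elem)
    # awaiting consumes the whole stream at once:
    return await ToyStream(["a", "b"])

print(asyncio.run(main()))
```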

Looking forward to reading your feedback, thanks a lot in advance 🙏🏻 .

Note: feel free to check the README of the alpha version

@ebonnal ebonnal reopened this May 11, 2025

ebonnal commented May 20, 2025

FYI @msluyter: a beta version is now available (pip install streamable==1.6.0b0); I will likely release 1.6.0 this week.

I'm closing this issue but I am still very interested in your feedback regarding the await pipeline for your use case 🙏🏻

@ebonnal ebonnal closed this as completed May 20, 2025