Option to pass in new event loop or specify use existing for async functions? #99
Comments
Hi @msluyter, oh yes, you need to allow event loop nesting if you have to iterate over the stream from within an event loop. Can you try:

    import nest_asyncio
    nest_asyncio.apply()

It should do the trick 🙏🏻 Let me know if it works on your side and I will add that to the README. Note that tackling #8 would allow async iteration on the stream ( |
I'm very curious to see your full snippet 🙏🏻 maybe we can make |
Thanks, I'll try that. I don't want to accidentally leak proprietary company information, so I can't really cut & paste the whole thing, but basically I'm passing the aioboto3 s3 client into the function:

    async def get_s3_file(s3_client, s3_file):
        ...

But then in the stream, in order to pass the s3 client to the function, I ended up doing this:

    pipeline: Stream = (
        Stream(zip(itertools.repeat(client), iter_transaction_dates(cur_date, end_date)))
        .observe("dates")
        .amap(get_s3_file, concurrency=4)
        .flatten(concurrency=4)
        ...

Using Thanks! |
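For readers following along: a common alternative to zipping the client into every stream element is to bind it to the function once with `functools.partial`, so the stream only carries the file keys. Below is a minimal standard-library sketch of that idea. `FakeClient` and `fetch_all` are hypothetical names made up for illustration, and `asyncio.gather` merely stands in for the stream's concurrent map; none of this is aioboto3 or streamable API.

```python
import asyncio
from functools import partial


class FakeClient:
    """Hypothetical stand-in for an S3 client; not part of aioboto3."""

    async def read(self, key: str) -> str:
        await asyncio.sleep(0)  # pretend to do some I/O
        return f"content of {key}"


async def get_s3_file(s3_client: FakeClient, s3_file: str) -> str:
    return await s3_client.read(s3_file)


async def fetch_all(keys: list[str]) -> list[str]:
    client = FakeClient()
    # Bind the client once; the resulting callable takes only the file key.
    fetch = partial(get_s3_file, client)
    return await asyncio.gather(*(fetch(key) for key in keys))


results = asyncio.run(fetch_all(["file1", "file2"]))
print(results)
```

The bound callable has the one-argument shape that a per-element async map expects, which avoids building `(client, key)` tuples.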
Of course, sorry! Thank you for sharing the way you managed the client @msluyter
Indeed:

    import aioboto3
    import asyncio
    from functools import partial
    from streamable.stream import Stream
    import nest_asyncio

    # Allow the stream to run its coroutines from within the running loop.
    nest_asyncio.apply()

    async def get_s3_file(s3_client, s3_file) -> str:
        return f"content of file={s3_file} read via client={s3_client}"

    async def main() -> None:
        session = aioboto3.Session()
        aiobotocore_config = None
        async with session.client("s3", config=aiobotocore_config) as client:
            pipeline: Stream = (
                Stream(["file1", "file2"])
                # Bind the client so that amap only receives the file name.
                .amap(partial(get_s3_file, client), concurrency=2)
                .foreach(print)
            )
            pipeline()

    if __name__ == "__main__":
        asyncio.run(main())

Note: The
If you want to avoid

    import aioboto3
    import asyncio
    from functools import partial
    from streamable.stream import Stream

    async def get_s3_file(s3_client, s3_file) -> str:
        return f"content of file={s3_file} read via client={s3_client}"

    session = aioboto3.Session()
    aiobotocore_config = None
    # Enter the client's async context manually, outside of any running loop.
    client = asyncio.run(session.client("s3", config=aiobotocore_config).__aenter__())
    pipeline: Stream = (
        Stream(["file1", "file2"])
        .amap(partial(get_s3_file, client), concurrency=2)
        .foreach(print)
    )
    pipeline()
    # Exit the context manually once the pipeline has run.
    asyncio.run(client.__aexit__(None, None, None))

Your feedback made me realize that I really need to tackle #8 / #100 / #101 to make streamable elegant |
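A side note on the manual `__aenter__` / `__aexit__` pattern above: each `asyncio.run` call creates a fresh event loop and closes it on return, so the enter and exit calls run on two different loops. If a resource turns out to be tied to the loop it was created on (an assumption, not verified against aioboto3 here), one variant is to keep a single loop open for both calls. The sketch below uses only the standard library; `FakeClientContext` is a hypothetical stand-in for `session.client(...)`, and it does not establish which loop streamable itself would use.

```python
import asyncio


class FakeClientContext:
    """Hypothetical async context manager standing in for session.client(...)."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def __aenter__(self) -> "FakeClientContext":
        self.events.append("enter")
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self.events.append("exit")


loop = asyncio.new_event_loop()
context = FakeClientContext()
try:
    # Enter and exit on the same loop instead of two separate asyncio.run calls.
    client = loop.run_until_complete(context.__aenter__())
    try:
        client.events.append("use")  # synchronous work with the client goes here
    finally:
        loop.run_until_complete(context.__aexit__(None, None, None))
finally:
    loop.close()

print(context.events)
```

The nested `try`/`finally` blocks make sure the context is exited and the loop is closed even if the work in the middle raises.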
Hey, just following up to let you know that using |
@msluyter great news! fyi I am working actively on #102, one of the things it will enable is to |
Hello @msluyter! I have worked on the full It's only in alpha for now while I make sure it is production-ready. I have extended the test suite to cover 100% of the new async codebase too, and I am testing it in real-life scripts. I would really appreciate it if you could give it a try! 🙏🏻 install: It's only in alpha for now, so you have to install the version explicitly:
Now your Looking forward to reading your feedback, thanks a lot in advance 🙏🏻 . Note: feel free to check the README of the alpha version |
fyi @msluyter : a Beta version is now available ( I'm closing this issue but I am still very interested in your feedback regarding the |
Hi,
First off, let me say that I love this library.
Question/issue regarding async: I was experimenting with aioboto3 and ran into a problem. With aioboto3, you have to use an async context manager to create your boto client, and to do that you need to be running in an event loop before starting your stream, e.g.:
I've glossed over a few issues here, like how to pass the client to get_s3_file(), which I found somewhat tricky, but in any event, the above will fail with:
Basically, I'm wondering if there's a way to either pass an event loop into the stream, or check whether we're already in one before starting a new one, so that this sort of pattern works?
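To illustrate the "check whether we're already in a loop" idea in plain asyncio terms: `asyncio.get_running_loop()` raises `RuntimeError` when no loop is running in the current thread, and `asyncio.run` refuses to start from inside a running loop. The sketch below uses that check to pick a strategy. `run_coroutine_anywhere` is a hypothetical helper written for this example, not something streamable provides, and the worker-thread fallback is only one possible design.

```python
import asyncio
import concurrent.futures


def run_coroutine_anywhere(coro_factory):
    """Run coro_factory() to completion whether or not a loop is already running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: asyncio.run can own a fresh one.
        return asyncio.run(coro_factory())
    # A loop is already running here, so asyncio.run would raise RuntimeError.
    # Run the coroutine on a fresh loop in a worker thread and wait for it.
    # Note: this blocks the calling loop until the result is ready.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(lambda: asyncio.run(coro_factory())).result()


async def double(x: int) -> int:
    await asyncio.sleep(0)
    return 2 * x


async def main() -> int:
    # Called from inside a running loop.
    return run_coroutine_anywhere(lambda: double(21))


outside = run_coroutine_anywhere(lambda: double(1))  # no loop running yet
inside = asyncio.run(main())  # a loop is running when the helper is called
print(outside, inside)
```

Because the fallback runs on a different loop in a different thread, objects bound to the caller's loop (such as an already opened client) may not be usable from it; that limitation is part of why this is only a sketch.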