janus
Mixed sync-async queue, supposed to be used for communicating between classic synchronous (threaded) code and asynchronous (in terms of asyncio) one.
Like Janus god the queue object from the library has two faces: synchronous and asynchronous interface.
Synchronous is fully compatible with standard queue, asynchronous one follows asyncio queue design.
Usage example (Python 3.7+)
import asyncio
import janus
def threaded(sync_q: janus.SyncQueue[int]) -> None:
for i in range(100):
sync_q.put(i)
sync_q.join()
async def async_coro(async_q: janus.AsyncQueue[int]) -> None:
for i in range(100):
val = await async_q.get()
assert val == i
async_q.task_done()
async def main() -> None:
queue: janus.Queue[int] = janus.Queue()
loop = asyncio.get_running_loop()
fut = loop.run_in_executor(None, threaded, queue.sync_q)
await async_coro(queue.async_q)
await fut
queue.close()
await queue.wait_closed()
asyncio.run(main())
Usage example (Python 3.5 and 3.6)
import asyncio
import janus
loop = asyncio.get_event_loop()
def threaded(sync_q):
for i in range(100):
sync_q.put(i)
sync_q.join()
async def async_coro(async_q):
for i in range(100):
val = await async_q.get()
assert val == i
async_q.task_done()
async def main():
queue = janus.Queue()
fut = loop.run_in_executor(None, threaded, queue.sync_q)
await async_coro(queue.async_q)
await fut
queue.close()
await queue.wait_closed()
try:
loop.run_until_complete(main())
finally:
loop.close()
Communication channels
GitHub Discussions: https://github.com/aio-libs/janus/discussions
Feel free to post your questions and ideas here.
gitter chat https://gitter.im/aio-libs/Lobby
License
janus
library is offered under Apache 2 license.
Thanks
The library development is sponsored by DataRobot (https://datarobot.com)