Thread-safe asyncio-aware queue for Python

Overview

janus

A mixed sync-async queue, intended for communication between classic synchronous (threaded) code and asynchronous (asyncio) code.

Like the god Janus, the queue object from the library has two faces: a synchronous interface and an asynchronous one.

The synchronous interface is fully compatible with the standard queue module; the asynchronous one follows the asyncio queue design.

Usage example (Python 3.7+)

import asyncio
import janus


def threaded(sync_q: janus.SyncQueue[int]) -> None:
    for i in range(100):
        sync_q.put(i)
    sync_q.join()


async def async_coro(async_q: janus.AsyncQueue[int]) -> None:
    for i in range(100):
        val = await async_q.get()
        assert val == i
        async_q.task_done()


async def main() -> None:
    queue: janus.Queue[int] = janus.Queue()
    loop = asyncio.get_running_loop()
    fut = loop.run_in_executor(None, threaded, queue.sync_q)
    await async_coro(queue.async_q)
    await fut
    queue.close()
    await queue.wait_closed()


asyncio.run(main())

Usage example (Python 3.5 and 3.6)

import asyncio
import janus

loop = asyncio.get_event_loop()


def threaded(sync_q):
    for i in range(100):
        sync_q.put(i)
    sync_q.join()


async def async_coro(async_q):
    for i in range(100):
        val = await async_q.get()
        assert val == i
        async_q.task_done()


async def main():
    queue = janus.Queue()
    fut = loop.run_in_executor(None, threaded, queue.sync_q)
    await async_coro(queue.async_q)
    await fut
    queue.close()
    await queue.wait_closed()

try:
    loop.run_until_complete(main())
finally:
    loop.close()

Communication channels

GitHub Discussions: https://github.com/aio-libs/janus/discussions

Feel free to post your questions and ideas here.

Gitter chat: https://gitter.im/aio-libs/Lobby

License

The janus library is offered under the Apache 2 license.

Thanks

The library development is sponsored by DataRobot (https://datarobot.com)

Comments
  • Error with Python 3.10: "ValueError: loop argument must agree with lock"

    I ran into this in my own project, see https://github.com/simonw/datasette/pull/1481 - then I tried using a fork of this project to run the unit tests against Python 3.10 and got the same error: https://github.com/simonw/janus/runs/3842463703?check_suite_focus=true

     ============================= test session starts ==============================
     platform linux -- Python 3.10.0, pytest-6.2.4, py-1.10.0, pluggy-0.13.1
     rootdir: /home/runner/work/janus/janus
     plugins: cov-2.12.1, asyncio-0.15.1
     collected 72 items
     
     tests/test_async.py FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF                      [ 43%]
     tests/test_mixed.py .FFFFFFFFFFFFFFFFF                                   [ 68%]
     tests/test_sync.py FFFFFFFFFFFFFFFFFFFFFFF                               [100%]
     
     =================================== FAILURES ===================================
     __________________________ TestQueueBasic.test_empty ___________________________
     
     self = <test_async.TestQueueBasic object at 0x7fb0f561e4d0>
     
         @pytest.mark.asyncio
         async def test_empty(self):
     >       _q = janus.Queue()
     
     tests/test_async.py:65: 
     _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
     janus/__init__.py:39: in __init__
         self._async_not_empty = asyncio.Condition(self._async_mutex)
     _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
     
     self = <[AttributeError("'Condition' object has no attribute 'locked'") raised in repr()] Condition object at 0x7fb0f569a620>
     lock = <asyncio.locks.Lock object at 0x7fb0f569ada0 [unlocked]>
     
         def __init__(self, lock=None, *, loop=mixins._marker):
             super().__init__(loop=loop)
             if lock is None:
                 lock = Lock()
             elif lock._loop is not self._get_loop():
     >           raise ValueError("loop argument must agree with lock")
     E           ValueError: loop argument must agree with lock
    
    opened by simonw 10
  • accept loop parameter for Queue

    What do these changes do?

    Use case: one wants to create the Queue in the main thread of the app but use it with an event loop that is running in another thread (not the loop of the main thread).

    Otherwise, one would have to create the queue in that thread to pick up the correct loop.
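The workaround described above (creating the queue inside the thread that owns the loop) can be sketched with the standard library alone. This is only an illustration under stated assumptions: asyncio.Queue stands in for janus.Queue, and the helper names start_loop_in_thread, make_queue and roundtrip are hypothetical.

```python
import asyncio
import threading


def start_loop_in_thread() -> asyncio.AbstractEventLoop:
    """Start a fresh event loop in a daemon thread and return it."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


async def make_queue() -> asyncio.Queue:
    # Runs inside the background loop, so the queue is created in its context.
    return asyncio.Queue()


async def roundtrip(q: asyncio.Queue, item: int) -> int:
    # Put and get on the background loop, where the queue lives.
    await q.put(item)
    return await q.get()


bg_loop = start_loop_in_thread()

# The main thread asks the background loop to build the queue and use it.
bg_queue = asyncio.run_coroutine_threadsafe(make_queue(), bg_loop).result(timeout=5)
result = asyncio.run_coroutine_threadsafe(roundtrip(bg_queue, 42), bg_loop).result(timeout=5)

# Ask the background loop to stop; the daemon thread then exits.
bg_loop.call_soon_threadsafe(bg_loop.stop)
```

The main thread never touches the queue directly; every operation is submitted to the loop that owns it.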

    Are there changes in behavior for the user?

    No change in behavior if used as before. Users can now pass a custom loop.

    Checklist

    • [x] I think the code is well written
    • [x] Add a new news fragment into the CHANGES folder

    opened by ali5h 9
  • expose the unfinished tasks variable

    I use queue.empty() and queue._parent._unfinished_tasks == 0 to check whether a queue is complete and this pull request exposes the _unfinished_tasks variable.

    My use case is 2 queues that can send work to each other until both are complete - is there a better way to achieve this? Note that I'm not trying to join() since a thread should wake up if there is more work to do.
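For comparison, the same completeness check can be sketched against the standard library's queue.Queue, which keeps an unfinished_tasks counter as a plain instance attribute. This is a minimal illustration, not janus code; the helper name is_complete is hypothetical, and the check is a snapshot rather than an atomic test.

```python
import queue


def is_complete(q: queue.Queue) -> bool:
    """Return True when nothing is queued and every fetched item was marked done."""
    # Not atomic: another thread may put() between the two reads.
    return q.empty() and q.unfinished_tasks == 0


q = queue.Queue()
q.put("job")
states = [is_complete(q)]      # item still queued

item = q.get()
states.append(is_complete(q))  # queue is empty, but the item is not marked done

q.task_done()
states.append(is_complete(q))  # all work accounted for
```

The middle state is the one that q.empty() alone would misreport as finished.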

    opened by richardbaronpenman 9
  • Fix Syntax error with Python 3.7

    async is reserved keyword in Python 3.7

    Fix issue #95 Most likely https://github.com/Rapptz/discord.py/commit/096584733e8a8025b13f46fa920e18abe19352c1
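The effect of async becoming a reserved keyword can be checked with a small sketch. The function and parameter names in the compiled snippets are hypothetical, and the results in the comments assume Python 3.7 or later.

```python
import keyword


def compiles(source: str) -> bool:
    """Return True if the source text compiles without a SyntaxError."""
    try:
        compile(source, "<example>", "exec")
    except SyntaxError:
        return False
    return True


# On Python 3.7+ "async" is a reserved keyword.
async_is_keyword = keyword.iskeyword("async")

# Using it as a parameter name is therefore a syntax error on 3.7+ ...
old_style_ok = compiles("def call(func, async=False): pass")

# ... while a renamed parameter compiles fine.
renamed_ok = compiles("def call(func, async_=False): pass")
```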

    opened by EcmaXp 7
  • `RuntimeError: no running event loop` after upgrading to 0.5.0

    Unfortunately, changes in #246 broke the code that relies on creating an instance of janus.Queue outside of the scope of a running event loop, for no obvious reason.

    Also, the behavior now contradicts the behavior of asyncio.Queue.

    >>> import asyncio
    >>> asyncio.Queue()
    <Queue at 0x7f76be085730 maxsize=0>
    
    >>> import janus
    >>> janus.Queue()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/home/user/projects/project/.venv/lib64/python3.8/site-packages/janus/__init__.py", line 29, in __init__
        self._loop = current_loop()
    RuntimeError: no running event loop
    

    Is there any good reason not to just use asyncio.get_event_loop? From a performance perspective, the difference shouldn't be significant.
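The error in the traceback comes from asking for the running loop when none is running. A minimal standard-library sketch of that behavior, with the hypothetical helper names probe_running_loop and probe_inside:

```python
import asyncio


def probe_running_loop() -> str:
    """Report whether asyncio.get_running_loop() succeeds in the current context."""
    try:
        asyncio.get_running_loop()
    except RuntimeError as exc:
        return f"error: {exc}"
    return "ok"


async def probe_inside() -> str:
    # Called from a coroutine, so a loop is running.
    return probe_running_loop()


outside = probe_running_loop()          # no loop is running here
inside = asyncio.run(probe_inside())    # a loop is running here
```

Any object that calls asyncio.get_running_loop() in its constructor inherits this restriction and must be created inside a coroutine or callback.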

    opened by martyanov 6
  • doesn't play nicely with concurrent.futures.ProcessPoolExecutor

    If the executor passed to loop.run_in_executor(executor, ...) is a ProcessPoolExecutor instead of None, exceptions are raised, such as the inability to serialize thread lock objects and callItems.
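The serialization failure can be reproduced without janus: arguments sent to a process pool are pickled, and thread locks cannot be pickled. A small sketch, using the standard library's queue.Queue as a stand-in for any lock-holding queue (the helper name can_pickle is hypothetical):

```python
import pickle
import queue
import threading


def can_pickle(obj: object) -> bool:
    """Return True if obj survives pickle.dumps, which process pools rely on."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError):
        return False
    return True


lock_ok = can_pickle(threading.Lock())   # a bare thread lock
queue_ok = can_pickle(queue.Queue())     # a queue that holds locks internally
data_ok = can_pickle([1, 2, 3])          # plain data
```

Only the plain data can cross a process boundary this way; thread-based queues are tied to the process that created them.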

    opened by FirefighterBlu3 5
  • Inaccurate dependence python 3.5.3 - because of typing.Deque

    The class typing.Deque was first introduced in Python 3.5.4. The parameter for package dependencies should be updated to: python_requires='>=3.5.4'

    opened by farax4de 4
  • Add typing inside the code and few other lint improvements

    What do these changes do?

    1. Removed stub file
    2. Source code fully annotated
    3. Added bandit and pyroma linters

    Are there changes in behavior for the user?

    Related issue number

    Checklist

    • [ ] I think the code is well written
    • [ ] Unit tests for the changes exist
    • [ ] Documentation reflects the changes
    • [ ] If you provide code modification, please add yourself to CONTRIBUTORS.txt
      • The format is <Name> <Surname>.
      • Please keep alphabetical order, the file is sorted by names.
    • [ ] Add a new news fragment into the CHANGES folder
      • name it <issue_id>.<type> (e.g. 588.bugfix)
      • if you don't have an issue_id change it to the pr id after creating the PR
      • ensure type is one of the following:
        • .feature: Signifying a new feature.
        • .bugfix: Signifying a bug fix.
        • .doc: Signifying a documentation improvement.
        • .removal: Signifying a deprecation or removal of public API.
        • .misc: A ticket has been closed, but it is not of interest to users.
      • Make sure to use full sentences with correct case and punctuation, for example: Fix issue with non-ascii contents in doctest text files.
    opened by jettify 4
  • add SyncQueue and AsyncQueue Protocols

    What do these changes do?

    Add protocols to avoid the use of private classes _SyncQueueProxy and _AsyncQueueProxy when adding type hints.

    Are there changes in behavior for the user?

    I assume not, but it would be good if somebody more experienced could check this.

    Related issue number

    See https://github.com/aio-libs/janus/issues/372

    This PR is not intended to be directly merged, but to open a discussion about this approach.

    Using this approach, the following seems to work for adding type hints:

    import asyncio
    import janus
    
    
    def threaded(sync_q: janus.SyncQueue[int]):
        for i in range(100):
            sync_q.put(i)
        sync_q.join()
    
    
    async def async_coro(async_q: janus.AsyncQueue[int]):
        for i in range(100):
            val = await async_q.get()
            assert val == i
            async_q.task_done()
    
    
    async def main():
        queue: janus.Queue[int] = janus.Queue()
        loop = asyncio.get_running_loop()
        fut = loop.run_in_executor(None, threaded, queue.sync_q)
        await async_coro(queue.async_q)
        await fut
        queue.close()
        await queue.wait_closed()
    
    
    asyncio.run(main())
    
    

    Full disclaimer: I never used Protocols before. I skimmed https://www.python.org/dev/peps/pep-0544 and watched https://www.youtube.com/watch?v=kDDCKwP7QgQ Also, I only recently started using typing with Python.

    While this approach seems to work, it looks rather verbose.

    opened by rbuffat 3
  • What is the correct way to type annotate a janus.Queue?

    When I try to add type hints to the example, pylance creates reportPrivateUsage warnings.

    import asyncio
    import janus
    
    
    def threaded(sync_q: janus._SyncQueueProxy[int]):
        for i in range(100):
            sync_q.put(i)
        sync_q.join()
    
    
    async def async_coro(async_q: janus._AsyncQueueProxy[int]):
        for i in range(100):
            val = await async_q.get()
            assert val == i
            async_q.task_done()
    
    
    async def main():
        queue: janus.Queue[int] = janus.Queue()
        loop = asyncio.get_running_loop()
        fut = loop.run_in_executor(None, threaded, queue.sync_q)
        await async_coro(queue.async_q)
        await fut
        queue.close()
        await queue.wait_closed()
    
    
    asyncio.run(main())
    

    Warnings:

    "_SyncQueueProxy" is private and used outside of the module in which it is declared

    "_AsyncQueueProxy" is private and used outside of the module in which it is declared

    What is the preferred way to add type hints to janus Queues?

    opened by rbuffat 3
  • Fix compatibility with Python 3.10, refs #358

    What do these changes do?

    Tests now also run against Python 3.10, and code incorporates a workaround for a bug in Python 3.10 that affects Janus.

    Related issue number

    • #358

    Checklist

    • [x] I think the code is well written
    • [x] Unit tests for the changes exist
    • [x] Documentation reflects the changes - (not needed)
    • [ ] Add a new news fragment into the CHANGES folder (skipped, not sure how to do this)
    opened by simonw 3
  • Add CodeQL workflow

    Hi aio-libs/janus!

    This is not an automatic, 🤖-generated PR, as you can check in my GitHub profile, I work for GitHub and I am part of the GitHub Security Lab which is helping out with the migration of LGTM configurations to Code Scanning. You might have heard that we've integrated LGTM's underlying CodeQL analysis engine natively into GitHub. The result is GitHub code scanning!

    With LGTM fully integrated into code scanning, we are focused on improving CodeQL within the native GitHub code scanning experience. In order to take advantage of current and future improvements to our analysis capabilities, we suggest you enable code scanning on your repository. Please take a look at our blog post for more information.

    This pull request enables code scanning by adding an auto-generated codeql.yml workflow file for GitHub Actions to your repository — take a look! We tested it before opening this pull request, so all should be working :heavy_check_mark:. In fact, you might already have seen some alerts appear on this pull request!

    Where needed and if possible, we’ve adjusted the configuration to the needs of your particular repository. But of course, you should feel free to tweak it further! Check this page for detailed documentation.

    Questions? Check out the FAQ below!

    FAQ

    How often will the code scanning analysis run?

    By default, code scanning will trigger a scan with the CodeQL engine on the following events:

    • On every pull request — to flag up potential security problems for you to investigate before merging a PR.
    • On every push to your default branch and other protected branches — this keeps the analysis results on your repository’s Security tab up to date.
    • Once a week at a fixed time — to make sure you benefit from the latest updated security analysis even when no code was committed or PRs were opened.

    What will this cost?

    Nothing! The CodeQL engine will run inside GitHub Actions, making use of your unlimited free compute minutes for public repositories.

    What types of problems does CodeQL find?

    The CodeQL engine that powers GitHub code scanning is the exact same engine that powers LGTM.com. The exact set of rules has been tweaked slightly, but you should see almost exactly the same types of alerts as you were used to on LGTM.com: we’ve enabled the security-and-quality query suite for you.

    How do I upgrade my CodeQL engine?

    No need! New versions of the CodeQL analysis are constantly deployed on GitHub.com; your repository will automatically benefit from the most recently released version.

    The analysis doesn’t seem to be working

    If you get an error in GitHub Actions that indicates that CodeQL wasn’t able to analyze your code, please follow the instructions here to debug the analysis.

    How do I disable LGTM.com?

    If you have LGTM’s automatic pull request analysis enabled, then you can follow these steps to disable the LGTM pull request analysis. You don’t actually need to remove your repository from LGTM.com; it will automatically be removed in the next few months as part of the deprecation of LGTM.com (more info here).

    Which source code hosting platforms does code scanning support?

    GitHub code scanning is deeply integrated within GitHub itself. If you’d like to scan source code that is hosted elsewhere, we suggest that you create a mirror of that code on GitHub.

    How do I know this PR is legitimate?

    This PR is filed by the official LGTM.com GitHub App, in line with the deprecation timeline that was announced on the official GitHub Blog. The proposed GitHub Action workflow uses the official open source GitHub CodeQL Action. If you have any other questions or concerns, please join the discussion here in the official GitHub community!

    I have another question / how do I get in touch?

    Please join the discussion here to ask further questions and send us suggestions!

    opened by Kwstubbs 0
  • No running event loop

    I am having trouble creating the janus Queue outside of asyncio. It requires me to create dummy background tasks just to construct the object, since no running loop exists. I wonder whether this could be lazy, or whether the loop could be passed to the constructor. It makes bridging sync and async code a tad harder.
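The "lazy" idea can be sketched as a thin wrapper that postpones creating the underlying queue until it is first used inside a running loop. This is a hypothetical illustration, not part of janus: LazyAsyncQueue is an invented name and asyncio.Queue stands in for the real queue.

```python
import asyncio
from typing import Any, Optional


class LazyAsyncQueue:
    """Hypothetical wrapper: safe to construct with no event loop running."""

    def __init__(self) -> None:
        self._queue: Optional[asyncio.Queue] = None

    def _ensure(self) -> asyncio.Queue:
        # Create the real queue on first use, which happens inside a coroutine.
        if self._queue is None:
            asyncio.get_running_loop()  # raises RuntimeError if misused
            self._queue = asyncio.Queue()
        return self._queue

    async def put(self, item: Any) -> None:
        await self._ensure().put(item)

    async def get(self) -> Any:
        return await self._ensure().get()


lazy = LazyAsyncQueue()  # constructed in plain synchronous code


async def demo() -> Any:
    await lazy.put("hello")
    return await lazy.get()


value = asyncio.run(demo())
```

The trade-off is that a misuse (calling into the wrapper with no loop) is reported at first use rather than at construction.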

    opened by aaronclong 0
  • Added the possibility of synchronous initialization

    What do these changes do?

    Prior to this commit, you could only initialize a queue in an asynchronous function. Now it is also possible in a synchronous function, but you need to pass the event loop (even one that is not running) to the constructor.

    This is just a new way to initialize janus, but it gives more ways to handle exceptions and manage code.

    (Minimum code to understand the changes)

    Before

    import asyncio
    import janus
    
    async def async_f(async_q: janus.AsyncQueue[int]):
        ...
    def sync_f(sync_q: janus.SyncQueue[int]):
        ...
    async def main() -> None:
        loop = asyncio.get_running_loop()
        queue: janus.Queue[int] = janus.Queue()
    
        loop.run_in_executor(None, sync_f, queue.sync_q)
        await async_f(queue.async_q)
        ...
    
    if __name__ == "__main__":
        asyncio.run(main())
    

    Now

    import asyncio
    import janus
    
    async def async_f(async_q: janus.AsyncQueue[int]):
        ...
    def sync_f(sync_q: janus.SyncQueue[int]):
        ...
    
    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        queue: janus.Queue[int] = janus.Queue(loop=loop)
        loop.create_task(async_f(queue.async_q))
        loop.run_in_executor(None, sync_f, queue.sync_q)
        try:
             loop.run_forever()
        except KeyboardInterrupt:
             pass
    
    

    Are there changes in behavior for the user?

    There are no behavior changes for users.

    Checklist

    • [X] I think the code is well written
    • [ ] Unit tests for the changes exist
    • [ ] Documentation reflects the changes
    • [ ] Add a new news fragment into the CHANGES folder
    opened by s0d3s 0
  • Actually include LICENSE file

    What do these changes do?

    The license file is missing from the distribution. This change fixes that.

    Are there changes in behavior for the user?

    Yes. Users can read the license file, and packagers no longer need hacks to package it.

    Related issue number

    No issues.

    Checklist

    • [x] I think the code is well written
    • [x] Unit tests for the changes exist
    • [x] Documentation reflects the changes
    • [x] Add a new news fragment into the CHANGES folder

    (I've checked all checkboxes, however they are not applicable, and there is no CHANGES folder in this repository)

    opened by rominf 0
  • Double-ended Queue Support

    Hey! I recently found your library and it's really helping me with the communication between sub-thread coroutines and main-thread functions. One of my use cases needs the ability to clear all the already existing messages in a queue, or to insert the new message at the end of the queue. That is not possible with a normal queue, so I wanted to know if there is any plan for double-ended queue support any time soon. Thanks!
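The "clear all existing messages" part of the request can be approximated with a plain FIFO queue by draining it. A minimal sketch against the standard library's queue.Queue (the helper name drain is hypothetical; this is not a janus feature):

```python
import queue
from typing import Any, List


def drain(q: queue.Queue) -> List[Any]:
    """Remove and return every item currently in the queue without blocking."""
    drained: List[Any] = []
    while True:
        try:
            item = q.get_nowait()
        except queue.Empty:
            return drained
        drained.append(item)
        # Keep join() accounting balanced for the discarded item.
        q.task_done()


q = queue.Queue()
for n in (1, 2, 3):
    q.put(n)

discarded = drain(q)
q.put("fresh")  # the new message is now the only one in the queue
```

Items put by other threads while the drain is running may or may not be removed, so this is best-effort rather than an atomic clear.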

    opened by ronigober 0
  • Performance benefits?

    So I've been testing the performance a bit, varying e.g.

    • 1:1, 16:1, 64:1 producer : consumer ratio
    • normal/lifo/priority queue etc
    • asyncio/uvloop event loop
    • async -> async, async -> sync, sync -> sync
    • (probably more)

    I found that janus queues are ~5x slower in sync->sync, ~9x slower in sync->async, and ~15x slower in async->async. This is pretty much consistent across all parameter sets.

    This confirmed my suspicion that the performance gain of parallel computation is often less than the cost of using e.g. threading.Lock a lot (the GIL certainly doesn't help either).
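The locking cost mentioned above can be observed with a rough single-threaded micro-benchmark. This sketch does not measure janus: it compares an unsynchronized collections.deque with the lock-protected queue.Queue from the standard library, and the iteration count N is an arbitrary choice. Timings vary by machine, so no expected numbers are given.

```python
import queue
import time
from collections import deque


def time_deque(n: int) -> float:
    """Time n append/popleft pairs on an unsynchronized deque."""
    d: deque = deque()
    start = time.perf_counter()
    for i in range(n):
        d.append(i)
        d.popleft()
    return time.perf_counter() - start


def time_locked_queue(n: int) -> float:
    """Time n put/get pairs on queue.Queue, which takes a lock on every call."""
    q: queue.Queue = queue.Queue()
    start = time.perf_counter()
    for i in range(n):
        q.put(i)
        q.get()
    return time.perf_counter() - start


N = 50_000
plain = time_deque(N)
locked = time_locked_queue(N)
print(f"deque: {plain:.4f}s, queue.Queue: {locked:.4f}s")
```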

    Right now, I can imagine that many users have incorrect expectations of janus. To avoid this, you could add an example that shows how janus can outperform single-threaded asyncio, by employing multiple threads. Additionally, a caveat about janus' performance would be helpful.

    opened by jorenham 1
Releases(v1.0.0)
Owner
aio-libs
The set of asyncio-based libraries built with high quality
aio-libs
SCOOP (Scalable COncurrent Operations in Python)

SCOOP (Scalable COncurrent Operations in Python) is a distributed task module allowing concurrent parallel programming on various environments, from h

Yannick Hold 573 Dec 27, 2022
Trio – a friendly Python library for async concurrency and I/O

Trio – a friendly Python library for async concurrency and I/O The Trio project aims to produce a production-quality, permissively licensed, async/awa

null 5k Jan 7, 2023
A lightweight (serverless) native python parallel processing framework based on simple decorators and call graphs.

A lightweight (serverless) native python parallel processing framework based on simple decorators and call graphs, supporting both control flow and dataflow execution paradigms as well as de-centralized CPU & GPU scheduling.

null 102 Jan 6, 2023
A Python package for easy multiprocessing, but faster than multiprocessing

MPIRE, short for MultiProcessing Is Really Easy, is a Python package for multiprocessing, but faster and more user-friendly than the default multiprocessing package.

null 753 Dec 29, 2022
Simple package to enhance Python's concurrent.futures for memory efficiency

future-map is a Python library to use together with the official concurrent.futures module.

Arai Hiroki 2 Nov 15, 2022
Python HTTP library with thread-safe connection pooling, file post support, user friendly, and more.

urllib3 is a powerful, user-friendly HTTP client for Python. Much of the Python ecosystem already uses urllib3 and you should too. urllib3 brings many

urllib3 3.2k Dec 29, 2022
Thread-safe Python RabbitMQ Client & Management library

AMQPStorm Thread-safe Python RabbitMQ Client & Management library. Introduction AMQPStorm is a library designed to be consistent, stable and thread-sa

Erik Olof Gunnar Andersson 167 Nov 20, 2022
Pglive - Pglive package adds support for thread-safe live plotting to pyqtgraph

Live pyqtgraph plot Pglive package adds support for thread-safe live plotting to

Martin Domaracký 15 Dec 10, 2022
Mr. Queue - A distributed worker task queue in Python using Redis & gevent

MRQ MRQ is a distributed task queue for python built on top of mongo, redis and gevent. Full documentation is available on readthedocs Why? MRQ is an

Pricing Assistant 871 Dec 25, 2022
A django integration for huey task queue that supports multi queue management

django-huey This package is an extension of huey contrib djhuey package that allows users to manage multiple queues. Installation Using pip package ma

GAIA Software 32 Nov 26, 2022
Yet Another Python Profiler, but this time thread&coroutine&greenlet aware.

Yappi Yet Another Python Profiler, but this time thread&coroutine&greenlet aware. Highlights Fast: Yappi is fast. It is completely written in C and lo

Sümer Cip 1k Jan 1, 2023
Aiorq is a distributed task queue with asyncio and redis

Aiorq is a distributed task queue with asyncio and redis, which rewrite from arq to make improvement and include web interface.

PY-GZKY 5 Mar 18, 2022
Python script to download all images/webms of a 4chan thread

Python3 script to continuously download all images/webms of multiple 4chan thread simultaneously - without installation

Micha Fink 208 Jan 4, 2023
🕷 Phone Crawler with multi-thread functionality

Phone Crawler: Phone Crawler with multi-thread functionality Disclaimer: I'm not responsible for any illegal/misuse actions, this program was made for

Kmuv1t 3 Feb 10, 2022
Sync Laravel queue with Python. Provides an interface for communication between Laravel and Python.

Python Laravel Queue Queue sync between Python and Laravel using Redis driver. You can process jobs dispatched from Laravel in Python. NOTE: This pack

Sinan Bekar 3 Oct 1, 2022
a little task queue for python

a lightweight alternative. huey is: a task queue (2019-04-01: version 2.0 released) written in python (2.7+, 3.4+) clean and simple API redis, sqlite,

Charles Leifer 4.3k Jan 8, 2023
A Python 3 client for the beanstalkd work queue

Greenstalk Greenstalk is a small and unopinionated Python client library for communicating with the beanstalkd work queue. The API provided mostly map

Justin Mayhew 67 Dec 8, 2022
MM1 and MMC Queue Simulation using python - Results and parameters in excel and csv files

implementation of MM1 and MMC Queue on randomly generated data and evaluate simulation results then compare with analytical results and draw a plot curve for them, simulate some integrals and compare results and run monte carlo algorithm with them

Mohamadreza Rezaei 1 Jan 19, 2022