Overview

SAQ

SAQ (Simple Async Queue) is a simple and performant job queueing framework built on top of asyncio and redis. It can be used for processing background jobs with workers. For example, you could use SAQ to schedule emails, execute long queries, or do expensive data analysis.

It uses aioredis >= 2.0.

It is similar to RQ and heavily inspired by ARQ. Unlike RQ, it is async and thus significantly faster if your jobs are async. Even if they are not, SAQ is still considerably faster due to lower overhead.

SAQ optionally comes with a simple UI for monitoring workers and jobs.

SAQ Web UI

Install

# minimal install
pip install saq

# web + hiredis
pip install saq[web,hiredis]

Usage

usage: saq [-h] [--workers WORKERS] [--verbose] [--web] settings

Start Simple Async Queue Worker

positional arguments:
  settings           Namespaced variable containing worker settings, e.g. module_a.settings

options:
  -h, --help         show this help message and exit
  --workers WORKERS  Number of worker processes
  --verbose, -v      Logging level: 0: ERROR, 1: INFO, 2: DEBUG
  --web              Start web app

Example

import asyncio

from saq import Queue

# all functions take in context dict and kwargs
async def test(ctx, *, a):
    await asyncio.sleep(0.5)
    # result should be json serializable
    # custom serializers and deserializers can be used through Queue(dump=,load=)
    return {"x": a}

async def startup(ctx):
    await ctx["db"] = create_db()

async def shutdown(ctx):
    await ctx["db"].disconnect()

async def before_process(ctx):
    print(ctx["job"], ctx["db"])

async def after_process(ctx):
    pass

queue = Queue.from_url("redis://localhost")

settings = {
    "queue": queue,
    "functions": [test],
    "concurrency": 10,
    "startup": startup,
    "shutdown": shutdown,
    "before_process": before_process,
    "after_process": after_process,
}
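
The comment in test above notes that results should be JSON serializable with the default serializer; custom serializers can be plugged in through Queue(dump=, load=). A minimal sketch, assuming Queue.from_url forwards keyword arguments to Queue (any matching pair of dumps/loads callables should work the same way):

import functools
import json

from saq import Queue

# dump/load replace the default json serialization; this swaps in an encoder that
# can stringify types the default encoder rejects (one option among many)
dump = functools.partial(json.dumps, default=str)
queue = Queue.from_url("redis://localhost", dump=dump, load=json.loads)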

To start the worker, assuming the settings above are importable on the Python path

saq module.file.settings

To enqueue jobs

import time

# schedule a job normally
job = await queue.enqueue("test", a=1)

# wait 1 second for the job to complete
await job.refresh(1)
print(job.result)

# schedule a job in 10 seconds
await queue.enqueue("test", a=1, scheduled=time.time() + 10)

Demo

Start the worker

saq examples.simple.settings --web

Navigate to the web ui

Enqueue jobs

python examples/simple.py

Comparison to ARQ

SAQ is heavily inspired by ARQ but has several enhancements.

  1. Avoids polling by leveraging BLMOVE or RPOPLPUSH and NOTIFY
    1. SAQ has much lower latency than ARQ, with delays of < 5ms. ARQ's default polling frequency is 0.5 seconds
    2. SAQ is up to 8x faster than ARQ
  2. Web interface for monitoring queues and workers
  3. Heartbeat monitor for abandoned jobs
  4. More robust failure handling
    1. Storage of stack traces
    2. Sweeping stuck jobs
    3. Cancelled jobs (e.g. from machine redeployments) handled differently from failed jobs
  5. Before and after job hooks
  6. Easily run multiple workers to leverage more cores

Development

python -m venv env
source env/bin/activate
pip install -e .[dev,web]
docker run -p 6379:6379 redis
./run_checks.sh
Comments
  • Feat: Adds type annotations

    • Used MonkeyType and the test suite to generate a set of annotations
    • Added mypy and necessary stub dependencies to dev extras
    • Adds types.py module for type definitions/aliases etc
    • Adds py.typed
    • Removes support for 3.7

    Related to #39

    opened by peterschutt 11
  • Add type annotations

    It would be really great if this project had PEP 484 type hints, for better editor support and integration with Mypy.

    I'd be willing to help with this.

    opened by MrAwesomeRocks 8
  • Fix/logging

    Hi,

    Thanks a lot for this library.

    I implemented a small fix to the way the CLI logging config is set up.

    From what I understood, the issue with basicConfig is that it sets a handler on the root logger, which can be annoying for applications using this library, because Python's logging automatically propagates child logs to their parent loggers, resulting in duplicate log lines. Here's an excerpt from my logs:

    I, [2022-05-27T20:03:35.260 #73489]     INFO -- : [None] Processing Job<...>
    INFO:saq:Processing Job<...>
    I, [2022-05-27T20:03:35.260 #73489]     INFO -- : [None] Enqueuing Job<...>
    INFO:saq:Enqueuing Job<...>
    I, [2022-05-27T20:03:35.262 #73489]     INFO -- : [None] Finished Job<...>
    INFO:saq:Finished Job<...>
    

    One way for the client application to fix this is to detach all handlers from the root logger, but that can be cumbersome.

    Another solution is to replace basicConfig with dictConfig, as I did here.
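
    A minimal sketch of what the dictConfig approach looks like from the application side (the "saq" logger name comes from the log excerpt above; everything else is illustrative):

    import logging.config

    logging.config.dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "default": {"format": "%(asctime)s %(levelname)s %(name)s %(message)s"}
            },
            "handlers": {
                "console": {"class": "logging.StreamHandler", "formatter": "default"}
            },
            "loggers": {
                # attach the handler to the "saq" logger instead of the root logger and
                # stop propagation, so the application's own handlers don't print duplicates
                "saq": {"handlers": ["console"], "level": "INFO", "propagate": False}
            },
        }
    )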

    opened by paul-finary 6
  • `ModuleNotFoundError` on `saq` start

    Hello! Thank you for nice framework!

    I'm trying to use it, but I get an exception when trying to start the worker:

    $ saq app.worker.settings
    Traceback (most recent call last):
      File "********/saq-test/.venv/bin/saq", line 8, in <module>
        sys.exit(main())
      File "********/saq-test/.venv/lib/python3.10/site-packages/saq/__main__.py", line
     73, in main
        start(
      File "********/saq-test/.venv/lib/python3.10/site-packages/saq/worker.py", line 2
    83, in start
        settings = import_settings(settings)
      File "********/saq-test/.venv/lib/python3.10/site-packages/saq/worker.py", line 2
    78, in import_settings
        module = importlib.import_module(module_path)
      File "/usr/lib/python3.10/importlib/__init__.py", line 126, in import_module
        return _bootstrap._gcd_import(name[level:], package, level)
      File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
      File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
      File "<frozen importlib._bootstrap>", line 992, in _find_and_load_unlocked
      File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
      File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
      File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
      File "<frozen importlib._bootstrap>", line 1004, in _find_and_load_unlocked
    ModuleNotFoundError: No module named 'app'
    

    Project structure:

     ├── app
     │  ├── __init__.py
     │  └── worker.py
     ├── poetry.lock
     └── pyproject.toml
    

    worker.py contents:

    from typing import Any, Dict
    
    from saq import Queue
    
    
    async def startup(_: Dict[str, Any]) -> None:
        print("Hello from startup job!")
    
    
    settings = {
        "queue": Queue.from_url("redis://localhost"),
        "functions": [],
        "concurrency": 8,
        "startup": startup,
    }
    

    If I try to use importlib from interpreter, everything is OK:

    >>> import importlib
    >>> importlib.import_module("app.worker")
    <module 'app.worker' from '********/saq-test/app/worker.py'>
    >>> _.settings
    {'queue': <saq.queue.Queue object at 0x7f81fcd52740>, 'functions': [], 'concurrency': 8, 'startup': <function startup at 0x7f81fcd681f0>}
    >>>
    

    I've found that the current directory is not in sys.path, and I used PYTHONPATH to work around the problem:

    $ PYTHONPATH="$PYTHONPATH:$PWD" saq app.worker.settings
    Hello from startup job!
    

    Could you help me figure out whether this is a bug or a mistake on my side?

    opened by lxmnk 6
  • Periodic task decorator

    I've been using this decorator to let jobs automatically update (tick) on an interval.

    The decorator checks whether the job has a heartbeat setting enabled, and if so, it automatically ticks while the job is executing.

    The current formula for the tick interval is max(round(job.heartbeat / 2), 1).

    I've added an example of how it works in examples/monitored.py, and I took a shot at test cases.

    from saq.job import monitored_job
    
    @monitored_job
    async def monitored_sleeper(ctx):
        await asyncio.sleep(ctx.get("sleep", 10))
        return {"a": 1, "b": []}
    

    The heartbeat starts automatically once you've scheduled a job for this function with a heartbeat set:

    await queue.apply(
            func,
            heartbeat=3,
        )
    

    After starting a job with the decorator applied, you'll see that it's executing periodically in the logs:

    INFO:saq:Processing Job<function=monitored_cron_job, queue=default, id=saq:job:default:cron:monitored_cron_job, scheduled=1654467040, progress=0.0, start_ms=16107, attempts=1, status=active, meta={}>
    INFO:saq:starting heartbeat service for saq:job:default:cron:monitored_cron_job. Ticking every 1 second(s)
    excuting cron job
    INFO:saq:ticking heartbeat for saq:job:default:cron:monitored_cron_job, and sleeping for 1 second(s)
    INFO:saq:stopping heartbeat service for saq:job:default:cron:monitored_cron_job
    
    opened by cofin 4
  • Mechanism for transferring state from enqueing process to worker

    Hi @tobymao,

    The before_process and after_process hooks are great. Do you think it would also be possible to add an on_enqueue event hook?

    Celery has the before_task_publish event - this is really what I'd like to mirror.

    # celery example
    
    @before_task_publish.connect
    def transfer_correlation_id(headers: Dict[str, str], **kwargs: Any) -> None:
        """
        Attach request ID to the task headers.
        """
        cid = correlation_id.get() 
        if cid:
            headers[header_key] = cid
    

    I need to be able to pass state from this event handler to the worker, so some data structure has to travel with the job. Celery uses a plain dict, I believe.

    If you think this is a good idea, I'd also be happy to open a PR for this myself 👍
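
    In the meantime, a hedged sketch of a workaround with the current API: smuggle the value through the job's kwargs at enqueue time and restore it in before_process (enqueue_with_cid is a hypothetical helper, not part of saq, and the task function has to accept or absorb the extra kwarg):

    # hypothetical helper, not part of saq
    async def enqueue_with_cid(queue, func_name, **kwargs):
        cid = correlation_id.get()  # the contextvar from the celery example above
        if cid:
            kwargs["correlation_id"] = cid
        return await queue.enqueue(func_name, **kwargs)


    async def before_process(ctx):
        # ctx["job"].kwargs carries whatever was passed at enqueue time
        cid = ctx["job"].kwargs.get("correlation_id")
        if cid:
            correlation_id.set(cid)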

    opened by sondrelg 4
  • Error a long task

    I start a long task using the Playwright framework and get this error:

    ERROR:saq:Error processing job Job<function=fgis, kwargs={'cmd': 'parse', 'cn': '67:17:0010333:59'}, queue=fgis, id=saq:job:fgis:57646e99-7bcd-11ed-b618-2d9cfaea7267, scheduled=0, progress=0.0, start_ms=23, attempts=1, status=active, meta={}>
    Traceback (most recent call last):
      File "/root/playwright/saq_srv.py", line 18, in fgis
        return await w.job(cmd, **kvargs)
      File "/root/playwright/rosreestr/fgis/worker.py", line 51, in job
        self.current = await self.search.cadnum(cn, regions[cn[:2]])
      File "/root/playwright/rosreestr/fgis/_page/search.py", line 14, in cadnum
        await page.locator(".v-filterselect-input").first.type(region, delay=100)
      File "/root/playwright/.pw/lib/python3.10/site-packages/playwright/async_api/_generated.py", line 15250, in type
        await self._impl_obj.type(
      File "/root/playwright/.pw/lib/python3.10/site-packages/playwright/_impl/_locator.py", line 558, in type
        return await self._frame.type(
      File "/root/playwright/.pw/lib/python3.10/site-packages/playwright/_impl/_frame.py", line 710, in type
        await self._channel.send("type", locals_to_params(locals()))
      File "/root/playwright/.pw/lib/python3.10/site-packages/playwright/_impl/_connection.py", line 44, in send
        return await self._connection.wrap_api_call(
      File "/root/playwright/.pw/lib/python3.10/site-packages/playwright/_impl/_connection.py", line 419, in wrap_api_call
        return await cb()
      File "/root/playwright/.pw/lib/python3.10/site-packages/playwright/_impl/_connection.py", line 70, in inner_send
        done, _ = await asyncio.wait(
      File "/usr/lib/python3.10/asyncio/tasks.py", line 384, in wait
        return await _wait(fs, timeout, return_when, loop)
      File "/usr/lib/python3.10/asyncio/tasks.py", line 491, in _wait
        await waiter
    asyncio.exceptions.CancelledError
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/usr/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
        return fut.result()
    asyncio.exceptions.CancelledError
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/root/playwright/.pw/lib/python3.10/site-packages/saq/worker.py", line 247, in process
        result = await asyncio.wait_for(task, job.timeout)
      File "/usr/lib/python3.10/asyncio/tasks.py", line 458, in wait_for
        raise exceptions.TimeoutError() from exc
    asyncio.exceptions.TimeoutError
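
    The final frames show the worker's own asyncio.wait_for(task, job.timeout) firing, i.e. the job ran past its configured timeout and was cancelled, rather than Playwright itself failing. A hedged sketch of giving such a job a larger timeout at enqueue time (timeout is a job setting, passed the same way as scheduled in the README example; 600 is an arbitrary value):

    # give the long-running Playwright job more time before the worker cancels it
    await queue.enqueue("fgis", cmd="parse", cn="67:17:0010333:59", timeout=600)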
    
    opened by genfild 3
  • Readiness and liveness probes

    Hi :wave:

    I just wanted to ask a quick question: have you spent any time considering what effective liveness/readiness probes might look like for saq @tobymao?

    For a web server I think the convention is to ping a healthcheck endpoint, or something equivalent. I'm not aware of an obvious equivalent for a task runner.

    I'm sure this has been answered for other task runners, and I'm planning to do a little research now. Just asking in case you've either got an answer ready, or if there's an implementation detail unique to saq that would make sense to use for this :+1:

    (I'm not running the aiohttp part of saq, so I don't want to rely on that :slightly_smiling_face: )
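
    A minimal sketch of the most bare-bones liveness probe for a worker pod: just check that the process can still reach Redis (redis.asyncio ships with redis-py >= 4.2; anything worker-specific, like checking a heartbeat, would need more than this):

    # liveness_probe.py, a hedged sketch: exit 0 if Redis is reachable, 1 otherwise
    import asyncio
    import sys

    from redis.asyncio import from_url


    async def main() -> int:
        redis = from_url("redis://localhost")
        try:
            await redis.ping()
            return 0
        except Exception:
            return 1
        finally:
            await redis.close()


    if __name__ == "__main__":
        sys.exit(asyncio.run(main()))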

    opened by sondrelg 3
  • Add authentication to web dashboard

    It would be nice to have some authentication for the web dashboard, like Flower has, so that not everyone can start/stop/retry tasks.

    It doesn't have to be anything fancy, HTTP Basic Auth with only one user should be enough.

    Alternatively, each request could require a ?token=XXX query parameter, where the token is passed in through an environment variable.

    opened by MrAwesomeRocks 3
  • fix: Replace exception logger function with logger.exception

    This PR tweaks the way errors are logged, to include more metadata.

    I didn't spend a lot of time crafting the logger messages, so suggestions for better ones are very welcome 🙂

    Motivation

    We use Sentry for errors, and saq currently logs job exceptions in a way that's slightly too generic for Sentry. Since the plaintext dump of a traceback is logged without any extra information, all errors raised by saq are bundled together. Moreover, there is a size limit on log message length in Sentry, so we're not even receiving the full tracebacks (see the screenshot at the bottom of the comment below).

    [screenshot]

    Using logger.exception or logger.error(..., exc_info=True) (they're equivalent), we're able to get proper Sentry events containing the complete traceback plus "breadcrumbs".

    [screenshot]
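
    A tiny illustration of the pattern in an except block (standard library logging only, nothing saq-specific):

    import logging

    logger = logging.getLogger(__name__)

    try:
        process_job()  # placeholder for whatever raises
    except Exception:
        # logger.exception == logger.error(..., exc_info=True): the traceback is attached
        # to the log record as structured data instead of being dumped as plain text
        logger.exception("Error processing job")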

    opened by sondrelg 3
  • Why are you using a Semaphore instead of a BlockingConnectionPool?

    Hi,

    I migrated from ARQ to SAQ (again: great job with this library, I find it way cleaner), and I faced some issues while migrating from aioredis 1.3.1 (used by ARQ) to redis-py 4.2.0 (used by SAQ) in other parts of my project.

    During my investigation, I found out that you implemented a way to throttle the number of connections to Redis using a semaphore (_op_sem) instead of redis-py's BlockingConnectionPool (which, with the max_connections set to your max_concurrent_ops and timeout set to None, would have the same behaviour: wait for a connection to be available before executing a command).

    So I wonder, is there a reason you chose to implement what seems to be your own BlockingConnectionPool instead of using redis-py's?
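
    For reference, a sketch of what the BlockingConnectionPool variant mentioned above looks like with redis-py's asyncio API (illustrative only; whether it fits saq's internals is exactly the question here):

    from redis.asyncio import BlockingConnectionPool, Redis

    # with max_connections as the concurrency cap and timeout=None, callers block
    # until a connection frees up instead of raising an error
    pool = BlockingConnectionPool.from_url(
        "redis://localhost", max_connections=20, timeout=None
    )
    redis = Redis(connection_pool=pool)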

    opened by paul-finary 3