Aiorq is a distributed task queue built with asyncio and Redis

Overview

πŸ‘½ Aiorq

Introduction

Aiorq is a distributed task queue built with asyncio and Redis. It is a rewrite of arq with several improvements and a built-in web interface.

See the documentation for more details.

Requirements

  • redis >= 5.0
  • aioredis >= 1.1.0, < 2.0.0

Install

pip install aiorq
pip install "aioredis>=1.1.0,<2.0.0"
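
To verify that the pinned aioredis release can reach your Redis server, you can run a quick sanity check. This is a minimal sketch assuming the aioredis 1.x API and a local Redis instance at 127.0.0.1:6379:

import asyncio

import aioredis


async def main():
    # aioredis 1.x API: create a connection pool and ping the server
    redis = await aioredis.create_redis_pool('redis://127.0.0.1:6379')
    print(await redis.ping())  # b'PONG' when Redis is reachable
    redis.close()
    await redis.wait_closed()


asyncio.run(main())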

Quick Start

Task Definition

# tasks.py
# -*- coding: utf-8 -*-

import asyncio
import os

from aiorq.connections import RedisSettings
from aiorq.cron import cron


async def say_hello(ctx, name) -> None:
    await asyncio.sleep(5)
    print(f"Hello {name}")


async def say_hi(ctx, name) -> None:
    await asyncio.sleep(3)
    print(f"Hi {name}")


async def startup(ctx):
    print("starting... done")


async def shutdown(ctx):
    print("ending... done")


async def run_cron(ctx, time_='2021-11-16 10:26:05'):
    print(time_)


class WorkerSettings:
    # Redis connection settings, read from the environment with local defaults
    redis_settings = RedisSettings(
        host=os.getenv("REDIS_HOST", "127.0.0.1"),
        port=int(os.getenv("REDIS_PORT", 6379)),
        database=int(os.getenv("REDIS_DATABASE", 0)),
        password=os.getenv("REDIS_PASSWORD", None)
    )

    # Task coroutines this worker can execute
    functions = [say_hello, say_hi]

    # Lifecycle hooks
    on_startup = startup
    on_shutdown = shutdown

    # Scheduled jobs: run_cron fires at minute 40, second 50 of every hour
    cron_jobs = [
        cron(coroutine=run_cron, name="x100", minute=40, second=50, keep_result_forever=True)
    ]

    # allow_abort_jobs = True

    # worker_name = "ohuo"
    # queue_name = "ohuo"

Run aiorq worker

> aiorq tasks.WorkerSettings
15:08:50: Starting Queue: ohuo
15:08:50: Starting Worker: ohuo@04dce85c-1798-43eb-89d8-7c6d78919feb
15:08:50: Starting Functions: say_hello, EnHeng
15:08:50: redis_version=5.0.10 mem_usage=731.12K clients_connected=2 db_keys=9
starting...
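
With the worker running, any process that can reach the same Redis instance can enqueue jobs. The sketch below (a hypothetical client.py, assuming the tasks.py worker above and the default queue) uses the same create_pool and enqueue_job calls shown in the FastAPI example further down:

# client.py
import asyncio

from aiorq.connections import RedisSettings, create_pool


async def main():
    redis = await create_pool(RedisSettings(host="127.0.0.1", port=6379))
    # the first argument is the task name registered in WorkerSettings.functions
    job = await redis.enqueue_job('say_hello', name='aiorq')
    print(await job.info())


if __name__ == '__main__':
    asyncio.run(main())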

Integration with FastAPI
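
The same connection pool can be stored on FastAPI's application state at startup and reused by every endpoint, both to enqueue jobs and to inspect workers, registered tasks, results, and queued jobs over HTTP: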

# main.py
# -*- coding: utf-8 -*-
import json
import os

from fastapi import FastAPI
from starlette.requests import Request

from aiorq.connections import RedisSettings, create_pool
from aiorq.jobs import Job

app = FastAPI()


@app.on_event("startup")
async def startup() -> None:
    # share one Redis pool via application state (env vars with local defaults)
    app.state.redis = await create_pool(
        RedisSettings(
            host=os.getenv("REDIS_HOST", "127.0.0.1"),
            port=int(os.getenv("REDIS_PORT", 6379)),
            database=int(os.getenv("REDIS_DATABASE", 0)),
            password=os.getenv("REDIS_PASSWORD", None)
        )
    )


@app.get("/get_health_check")
async def get_health_check(request: Request, worker_name):
    result = await request.app.state.redis._get_health_check(worker_name=worker_name)
    return {"result": json.loads(result)}


@app.get("/enqueue_job_")
async def enqueue_job_(request: Request):
    job = await request.app.state.redis.enqueue_job('qy_spider_', _queue_name="comment_queue", _job_try=4)
    job_ = await job.info()
    return {"job_": job_}


@app.get("/index")
async def index(request: Request):
    functions = await request.app.state.redis.all_tasks()
    workers = await request.app.state.redis.all_workers()
    results = await request.app.state.redis.all_job_results()
    functions_num = len(json.loads(functions))
    workers_num = len(workers)
    results_num = len(results)
    results = {"functions_num": functions_num, "workers_num": workers_num, "results_num": results_num}
    return {"results": results}


@app.get("/get_all_workers")
async def get_all_workers(request: Request):
    results = await request.app.state.redis.all_workers()
    results = [json.loads(v) for v in results]
    return {"results": results}


@app.get("/get_all_functions")
async def get_all_functions(request: Request):
    results = await request.app.state.redis.all_tasks()
    return {"results": json.loads(results)}


@app.get("/get_all_result")
async def get_all_result(request: Request, worker=None, task=None, job_id=None):
    all_result_ = await request.app.state.redis.all_job_results()
    if worker:
        all_result_ = [result_ for result_ in all_result_ if result_.get("worker_name") == worker]
    if task:
        all_result_ = [result_ for result_ in all_result_ if result_.get("function") == task]
    if job_id:
        all_result_ = [result_ for result_ in all_result_ if result_.get("job_id") == job_id]

    return {"results_": all_result_}

@app.get("/queued_jobs")
async def queued_jobs(request: Request, queue_name="aiorq:queue"):
    queued_jobs_ = await request.app.state.redis.queued_jobs(queue_name=queue_name)
    queued_jobs__ = []
    for queued_job_ in queued_jobs_:
        state = await Job(job_id=queued_job_.__dict__.get("job_id"), redis=request.app.state.redis,
                          _queue_name=queue_name).status()
        queued_job_.__dict__.update({"state": state})
        queued_jobs__.append(queued_job_)
    return {"queued_jobs": queued_jobs__}


# job status
@app.get("/job_status")
async def job_status(request: Request, job_id="12673208ee3b417192b7cce06844adda", _queue_name="aiorq:queue"):
    job_status_ = await Job(job_id=job_id, redis=request.app.state.redis, _queue_name=_queue_name).info()
    return {"job_status_": job_status_}


if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app='main:app', host="0.0.0.0", port=9999, reload=True)
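
With the worker from tasks.py running, start the app with python main.py (or uvicorn main:app) and query, for example, http://127.0.0.1:9999/index to see counts of registered functions, workers, and job results.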

Thanks

License

MIT
