A multiprocessing distributed task queue for Django
Features
- Multiprocessing worker pool
- Asynchronous tasks
- Scheduled, cron and repeated tasks
- Signed and compressed packages
- Failure and success database or cache
- Result hooks, groups and chains
- Django Admin integration
- PaaS compatible with multiple instances
- Multi cluster monitor
- Redis, Disque, IronMQ, SQS, MongoDB or ORM
- Rollbar and Sentry support
Requirements
- Django > = 2.2
- Django-picklefield
- Arrow
- Blessed
Tested with: Python 3.7, 3.8 Django 2.2.X and 3.1.X
Warning
Since Python 3.7 async became a reserved keyword and was refactored to async_task
Brokers
Installation
Install the latest version with pip:
$ pip install django-q
Add django_q to your INSTALLED_APPS in your projects settings.py:
INSTALLED_APPS = ( # other apps 'django_q', )
Run Django migrations to create the database tables:
$ python manage.py migrate
Choose a message broker , configure and install the appropriate client library.
Read the full documentation at https://django-q.readthedocs.org
Configuration
All configuration settings are optional. e.g:
# settings.py example
Q_CLUSTER = {
'name': 'myproject',
'workers': 8,
'recycle': 500,
'timeout': 60,
'compress': True,
'cpu_affinity': 1,
'save_limit': 250,
'queue_limit': 500,
'label': 'Django Q',
'redis': {
'host': '127.0.0.1',
'port': 6379,
'db': 0, }
}
For full configuration options, see the configuration documentation.
Management Commands
Start a cluster with:
$ python manage.py qcluster
Monitor your clusters with:
$ python manage.py qmonitor
Check overall statistics with:
$ python manage.py qinfo
Creating Tasks
Use async_task from your code to quickly offload tasks:
from django_q.tasks import async_task, result
# create the task
async_task('math.copysign', 2, -2)
# or with a reference
import math.copysign
task_id = async_task(copysign, 2, -2)
# get the result
task_result = result(task_id)
# result returns None if the task has not been executed yet
# you can wait for it
task_result = result(task_id, 200)
# but in most cases you will want to use a hook:
async_task('math.modf', 2.5, hook='hooks.print_result')
# hooks.py
def print_result(task):
print(task.result)
For more info see Tasks
Schedule
Schedules are regular Django models. You can manage them through the Admin page or directly from your code:
# Use the schedule function
from django_q.tasks import schedule
schedule('math.copysign',
2, -2,
hook='hooks.print_result',
schedule_type=Schedule.DAILY)
# Or create the object directly
from django_q.models import Schedule
Schedule.objects.create(func='math.copysign',
hook='hooks.print_result',
args='2,-2',
schedule_type=Schedule.DAILY
)
# Run a task every 5 minutes, starting at 6 today
# for 2 hours
import arrow
schedule('math.hypot',
3, 4,
schedule_type=Schedule.MINUTES,
minutes=5,
repeats=24,
next_run=arrow.utcnow().replace(hour=18, minute=0))
# Use a cron expression
schedule('math.hypot',
3, 4,
schedule_type=Schedule.CRON,
cron = '0 22 * * 1-5')
For more info check the Schedules documentation.
Testing
To run the tests you will need the following in addition to install requirements:
- py.test
- pytest-django
- disque from https://github.com/antirez/disque.git
- Redis
- MongoDB
The following commands can be used to run the tests:
# Create virtual environment
python -m venv venv
# Install requirements
venv/bin/pip install -r requirements.txt
# Install test dependencies
venv/bin/pip install pytest pytest-django
# Install django-q
venv/bin/python setup.py develop
# Run required services (you need to have docker-compose installed)
docker-compose -f test-services-docker-compose.yaml up -d
# Run tests
venv/bin/pytest
# Stop the services required by tests (when you no longer plan to run tests)
docker-compose -f test-services-docker-compose.yaml down
Locale
Currently available in English and French. Translation pull requests are always welcome.
Todo
- Better tests and coverage
- Less dependencies?