A Python concurrency scheduling library, compatible with asyncio and trio.

Overview

aiometer

aiometer is a Python 3.6+ concurrency scheduling library compatible with asyncio and trio and inspired by Trimeter. It makes it easier to execute lots of tasks concurrently while controlling concurrency limits (i.e. applying backpressure) and collecting results in a predictable manner.

Example

Let's use HTTPX to make web requests concurrently...

Try this code interactively using IPython.

>>> import asyncio
>>> import functools
>>> import random
>>> import aiometer
>>> import httpx
>>>
>>> client = httpx.AsyncClient()
>>>
>>> async def fetch(client, request):
...     response = await client.send(request)
...     # Simulate extra processing...
...     await asyncio.sleep(2 * random.random())
...     return response.json()["json"]
...
>>> requests = [
...     httpx.Request("POST", "https://httpbin.org/anything", json={"index": index})
...     for index in range(100)
... ]
...
>>> # Send requests, and process responses as they're made available:
>>> async with aiometer.amap(
...     functools.partial(fetch, client),
...     requests,
...     max_at_once=10,  # Limit maximum number of concurrently running tasks.
...     max_per_second=5,  # Limit request rate to not overload the server.
... ) as results:
...     async for data in results:
...         print(data)
...
{'index': 3}
{'index': 4}
{'index': 1}
{'index': 2}
{'index': 0}
...
>>> # Alternatively, fetch and aggregate responses into an (ordered) list...
>>> jobs = [functools.partial(fetch, client, request) for request in requests]
>>> results = await aiometer.run_all(jobs, max_at_once=10, max_per_second=5)
>>> results
[{'index': 0}, {'index': 1}, {'index': 2}, {'index': 3}, {'index': 4}, ...]
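
Both calls above rely on functools.partial to adapt fetch(client, request) to the shape each entrypoint expects: a one-argument callable for amap(), and zero-argument callables for run_all(). The sketch below shows only that binding step, using plain strings as hypothetical stand-ins for the HTTPX client and requests; it makes no network calls and does not use aiometer.

```python
import asyncio
import functools


async def fetch(client, request):
    # Stand-in for the real fetch(): just reports what it was given.
    return f"{client} sent {request}"


# Bind only the client: the result takes one argument (the request),
# which is the shape amap() is given in the example above.
send_with_client = functools.partial(fetch, "client-A")

# Bind both arguments: the result takes no arguments,
# which is the shape of each job passed to run_all().
job = functools.partial(fetch, "client-A", "request-1")


async def main():
    return await send_with_client("request-0"), await job()


first, second = asyncio.run(main())
print(first)
print(second)
```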

Installation

This project is in beta and maturing. Be sure to pin any dependencies to the latest minor.

pip install "aiometer==0.3.*"

Features

  • Concurrency management and throttling helpers.
  • asyncio and trio support.
  • Fully type annotated.
  • 100% test coverage.

Guide

Flow control

The key highlight of aiometer is allowing you to apply flow control strategies in order to limit the degree of concurrency of your programs.

There are two knobs you can play with to fine-tune concurrency:

  • max_at_once: this is used to limit the maximum number of concurrently running tasks at any given time. (If you have 100 tasks and set max_at_once=10, then aiometer will ensure that no more than 10 run at the same time.)
  • max_per_second: this option limits the number of tasks spawned per second. This is useful to not overload I/O resources, such as servers that may have a rate limiting policy in place.
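
To build intuition for what max_at_once enforces, here is a minimal standard-library sketch based on asyncio.Semaphore. It is not aiometer's implementation, and run_limited and fake_query are hypothetical names used only for illustration. The sketch records the peak number of tasks running at once so the cap can be checked.

```python
import asyncio


async def run_limited(async_fn, args, max_at_once):
    """Run async_fn on each arg, with at most max_at_once running at a time."""
    semaphore = asyncio.Semaphore(max_at_once)
    running = 0
    peak = 0

    async def worker(arg):
        nonlocal running, peak
        async with semaphore:  # Wait here while max_at_once tasks are running.
            running += 1
            peak = max(peak, running)
            try:
                return await async_fn(arg)
            finally:
                running -= 1

    results = await asyncio.gather(*(worker(arg) for arg in args))
    return results, peak


async def fake_query(n):
    await asyncio.sleep(0.01)  # Simulate I/O.
    return n * 2


results, peak = asyncio.run(run_limited(fake_query, range(20), max_at_once=5))
print(f"peak concurrency: {peak}")
```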

Example usage:

>>> import asyncio
>>> import aiometer
>>> async def make_query(query):
...     await asyncio.sleep(0.05)  # Simulate a database request.
...
>>> queries = ['SELECT * from authors'] * 1000
>>> # Allow at most 5 queries to run concurrently at any given time:
>>> await aiometer.run_on_each(make_query, queries, max_at_once=5)
...
>>> # Make at most 10 queries per second:
>>> await aiometer.run_on_each(make_query, queries, max_per_second=10)
...
>>> # Run at most 10 concurrent jobs, spawning new ones at least 5 seconds apart:
>>> async def job(id):
...     await asyncio.sleep(10)  # A very long task.
...
>>> await aiometer.run_on_each(job, range(100), max_at_once=10, max_per_second=0.2)
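
A rate of max_per_second corresponds to a minimum gap of 1 / max_per_second seconds between task starts, so max_per_second=0.2 means one new task every 5 seconds at most. The sketch below illustrates that relationship with a naive fixed-spacing loop built on the standard library. It is an assumption-laden illustration, not aiometer's algorithm (the 0.2.1 release notes mention a generic cell rate algorithm), and run_spaced and noop are hypothetical names.

```python
import asyncio
import time


async def run_spaced(async_fn, args, max_per_second):
    """Start one task per arg, keeping starts at least 1/max_per_second apart."""
    interval = 1 / max_per_second
    start_times = []
    tasks = []
    for index, arg in enumerate(args):
        if index > 0:
            await asyncio.sleep(interval)  # Space out task starts.
        start_times.append(time.monotonic())
        tasks.append(asyncio.ensure_future(async_fn(arg)))
    await asyncio.gather(*tasks)
    return start_times


async def noop(_):
    await asyncio.sleep(0)


# 10 tasks per second means roughly 0.1 s between consecutive starts.
start_times = asyncio.run(run_spaced(noop, range(4), max_per_second=10))
gaps = [b - a for a, b in zip(start_times, start_times[1:])]
print([round(gap, 2) for gap in gaps])
```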

Running tasks

aiometer provides 4 different ways to run tasks concurrently, in the form of 4 different run functions. Each function accepts all the options documented in Flow control and runs tasks in a slightly different way, which lets you address a variety of use cases. Here's a handy table for reference:

Entrypoint      Use case
run_on_each()   Execute async callbacks in any order.
run_all()       Return results as an ordered list.
amap()          Iterate over results as they become available.
run_any()       Return result of first completed function.

To illustrate the behavior of each run function, let's first set up a hello world async program:

>>> import asyncio
>>> import random
>>> from functools import partial
>>> import aiometer
>>>
>>> async def get_greeting(name):
...     await asyncio.sleep(random.random())  # Simulate I/O
...     return f"Hello, {name}!"
...
>>> async def greet(name):
...     greeting = await get_greeting(name)
...     print(greeting)
...
>>> names = ["Robert", "Carmen", "Lucas"]

Let's start with run_on_each(). It executes an async function once for each item in a list passed as argument:

>>> await aiometer.run_on_each(greet, names)
Hello, Robert!
Hello, Lucas!
Hello, Carmen!

If we'd like to get the list of greetings in the same order as names, in a fashion similar to Promise.all(), we can use run_all():

>>> await aiometer.run_all([partial(get_greeting, name) for name in names])
['Hello, Robert!', 'Hello, Carmen!', 'Hello, Lucas!']

amap() allows us to process each greeting as it becomes available (which means maintaining order is not guaranteed):

>>> async with aiometer.amap(get_greeting, names) as greetings:
...     async for greeting in greetings:
...         print(greeting)
Hello, Lucas!
Hello, Robert!
Hello, Carmen!

Lastly, run_any() can be used to run async functions until the first one completes, similarly to Promise.any():

>>> await aiometer.run_any([partial(get_greeting, name) for name in names])
'Hello, Carmen!'
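
For readers who know asyncio's standard library, the ordering behavior of run_all(), amap() and run_any() can be compared, loosely, to asyncio.gather(), asyncio.as_completed() and asyncio.wait(..., return_when=FIRST_COMPLETED). The sketch below is an analogy only: it does not use aiometer, applies no flow control, and uses fixed hypothetical delays so that the completion order is predictable.

```python
import asyncio


async def get_greeting(name, delay):
    await asyncio.sleep(delay)  # Simulate I/O with a known duration.
    return f"Hello, {name}!"


PEOPLE = [("Robert", 0.15), ("Carmen", 0.05), ("Lucas", 0.10)]


def start_all():
    return [asyncio.ensure_future(get_greeting(n, d)) for n, d in PEOPLE]


async def main():
    # Comparable to run_all(): results follow the input order.
    ordered = await asyncio.gather(*start_all())

    # Comparable to amap(): results follow the completion order.
    by_completion = [await fut for fut in asyncio.as_completed(start_all())]

    # Comparable to run_any(): keep the first result; this sketch cancels the rest.
    tasks = start_all()
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    first = done.pop().result()

    return ordered, by_completion, first


ordered, by_completion, first = asyncio.run(main())
print(ordered)
print(by_completion)
print(first)
```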

As a last fun example, let's use amap() to implement a no-threads async version of sleep sort:

>>> import asyncio
>>> from functools import partial
>>> import aiometer
>>> numbers = [0.3, 0.1, 0.6, 0.2, 0.7, 0.5, 0.5, 0.2]
>>> async def process(n):
...     await asyncio.sleep(n)
...     return n
...
>>> async with aiometer.amap(process, numbers) as results:
...     sorted_numbers = [n async for n in results]
...
>>> sorted_numbers
[0.1, 0.2, 0.2, 0.3, 0.5, 0.5, 0.6, 0.7]

How To

Multiple parametrized values in run_on_each and amap

run_on_each and amap only accept functions that accept a single positional argument (i.e. (Any) -> Awaitable).

So if you have a function that is parametrized by multiple values, you should refactor it to match this form.

This can generally be achieved like this:

  1. Build a proxy container type (e.g. a namedtuple), say T.
  2. Refactor your function so that its signature is now (T) -> Awaitable.
  3. Build a list of these proxy containers, and pass it to aiometer.

For example, assuming you have a function that processes X/Y coordinates...

async def process(x: float, y: float) -> None:
    pass

xs = list(range(100))
ys = list(range(100))

for x, y in zip(xs, ys):
    await process(x, y)

You could use it with amap by refactoring it like this:

from typing import NamedTuple

import aiometer

# Proxy container type:
class Point(NamedTuple):
    x: float
    y: float

# Rewrite to accept a proxy as a single positional argument:
async def process(point: Point) -> None:
    x = point.x
    y = point.y
    ...

xs = list(range(100))
ys = list(range(100))

# Build a list of proxy containers:
points = [Point(x, y) for x, y in zip(xs, ys)]

# Use it:
async with aiometer.amap(process, points) as results:
    ...
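
If changing the function's signature is inconvenient, another option is a small adapter that unpacks a tuple into positional arguments. The star helper below is hypothetical and not part of aiometer. The sketch drives the adapted function with asyncio.gather() purely to show that it now has the single-argument form described above.

```python
import asyncio
import functools


def star(async_fn):
    """Wrap async_fn(*args) as a function taking one tuple argument."""

    @functools.wraps(async_fn)
    async def wrapper(args):
        return await async_fn(*args)

    return wrapper


async def process(x, y):
    await asyncio.sleep(0)  # Simulate some async work.
    return x + y


process_pair = star(process)  # Signature is now (tuple) -> awaitable.
pairs = list(zip(range(3), range(10, 13)))


async def main():
    return await asyncio.gather(*(process_pair(pair) for pair in pairs))


sums = asyncio.run(main())
print(sums)
```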

License

MIT

Comments
  • Interaction with tenacity retry

    Hi,

    I was wondering whether aiometer interacts well with tenacity.retry? Specifically, I would like to rate limit a large number of requests and retry requests.

    async with aiometer.amap(..., max_per_second=10)

    question 
    opened by Midnighter 5
  • Simplifying amap usage

    Reading through the docs it came to my mind that the async with usage of amap doesn’t “look necessary” from a UX perspective.

    When using it we always have to do two things:

    • Enter a context with async with (looks like an implementation detail)
    • Iterate over results with async for

    As a user, I really only care about the second operation.

    So what if we moved async with inside the implementation of amap, so that users can just do...

    results = [result async for result in amap(process, items)]
    
    opened by florimondmanca 3
  • requests as list

    I'm using your library to try and drop catch some domain names, so I query the api with the same request several times per second. Do I have to always generate a list of requests with the same element repeated thousands of times or there is a more efficient way to do it? Thanks

    question 
    opened by lordcris 2
  • Switch to generic cell rate algorithm (GCRA) for `max_per_second`

    As suggested by @pgjones on Twitter 😄

    Used these resources for the implementation:

    • https://gitlab.com/pgjones/quart-rate-limiter/-/blob/master/src/quart_rate_limiter/init.py#L252-282
    • https://en.wikipedia.org/wiki/Generic_cell_rate_algorithm#Virtual_scheduling_description

    Also found a way to make tests for max_per_second much simpler/more reliable.

    opened by florimondmanca 2
  • Fix `ValueError: max() arg is an empty sequence`

    Sometimes in this function :

    https://github.com/florimondmanca/aiometer/blob/17c8b70ce02838927f7f97bb7510c30e9f731e78/src/aiometer/_impl/utils.py#L8-L12

    The dct param is {}, so the max(dct) throws a ValueError: max() arg is an empty sequence. I just added a check before calling max().

    opened by mxrch 1
  • Expand docs: API reference, contributing guide

    Thought we'd need a clearer listing of available pieces of public API. Adding a small contributing guide and a slight restructuring of the table of contents too.

    documentation 
    opened by florimondmanca 1
  • Release 0.3.0

    0.3.0 - 2021-07-05

    Changed

    • Update anyio dependency to v3 (previously v1). (Pull #25)
      • NB: no API change, but dependency mismatches may occur. Be sure to port your codebase to anyio v3 before upgrading aiometer.

    Added

    • Add support for Python 3.6 (installs the contextlib2 backport library there). (Pull #26)
    • Officialize support for Python 3.9. (Pull #26)
    opened by florimondmanca 1
  • Add a Gitter chat badge to README.md

    florimondmanca/aiometer now has a Chat Room on Gitter

    @florimondmanca has just created a chat room. You can visit it here: https://gitter.im/florimondmanca-oss/aiometer.

    This pull-request adds this badge to your README.md:

    Gitter

    If my aim is a little off, please let me know.

    Happy chatting.

    PS: Click here if you would prefer not to receive automatic pull-requests from Gitter in future.

    opened by gitter-badger 1
  • Handling of exceptions and documentation

    Hi! Thank you for writing this useful library!

    I couldn't find any documentation about exceptions. Can we use aiometer.run_all and make it continue even if there are exceptions so that we can gather them all at the end?

    Thank you.

    opened by fersarr 0
  • Migrate to release-on-tag

    I pushed tag 0.3.0 and associated release assuming it would auto-release to PyPI like for some other packages (asgi-lifespan, arel, etc), but the tooling is not in place yet. This PR moves to using the publish stage AZP template.

    opened by florimondmanca 0
  • Release 0.2.1

    0.2.1 - 2020-03-26

    Fixed

    • Improve robustness of the max_per_second implementation by using the generic cell rate algorithm (GCRA) instead of leaky bucket. (Pull #5)
    opened by florimondmanca 0
  • chore: relax typing-extensions version specifier

    Relax typing-extensions version specifier to support typing-extensions v4. (#31)

    I use a library which relies on typing-extensions v4 and it will be great if aiometer allows to use that. (I tested with typing-extensions v4 and there is no issue)

    opened by ninoseki 2
  • Best practice for handling rate limits

    I have a bit of code to which I have added aiometer. What I am trying to understand is how best to optimize it to get the maximum performance. The API I am calling has a rate limit of 300 requests per minute, so setting max_per_second to 5 (if I understand this correctly) should be within the limits; however, I hit 429s even then. My code is below:

    async def async_iterate_paginated_api(
        function: Callable[..., Awaitable[Any]], **kwargs: Any
    ) -> AsyncGenerator[List[Any], None]:
        """Return an async iterator over the results of any API."""
        page_number = 1
        response = await function(**kwargs, pageNumber=page_number)
        yield response.get("entities") 
        total_pages = response.get('pageCount')
        pages = [
            page
            for page in range(2, total_pages + 1)
        ]
        async def process(function: Callable[..., Awaitable[Any]], kwarg: Any, page: Any):
            response = await function(**kwarg,  pageNumber=page)
            return response.get('entities')
    
        async with aiometer.amap(functools.partial(process, function, kwargs), pages, max_per_second=5) as results:
            async for result in results:
                yield result
    

    Any feedback would be helpful.

    opened by iserialize 1
  • Update requirements

    Could you update or loosen the version specifications in the requirements of the package? Specifically the restriction for typing-extensions~=3.10 is conflicting for me, as I need a newer version.

    good first issue 
    opened by nardi 1
  • swap the stream contexts with the taskgroup context

            with send_channel, receive_channel:
                async with anyio.create_task_group() as task_group:
    

    might be better as

            async with anyio.create_task_group() as task_group:
                with send_channel, receive_channel:
    

    this way closing the amap context manager (None, None, None)ly would allow currently running tasks to finish, and prevent new tasks being added. Closing the amap context manager (type[T], T, tb)ly would still cancel all tasks

    opened by graingert 5
Releases(0.3.0)
  • 0.3.0(Jul 6, 2021)

    0.3.0 - 2021-07-06

    Changed

    • Update anyio dependency to v3 (previously v1). (Pull #25)
      • NB: no API change, but dependency mismatches may occur. Be sure to port your codebase to anyio v3 before upgrading aiometer.

    Added

    • Add support for Python 3.6 (installs the contextlib2 backport library there). (Pull #26)
    • Officialize support for Python 3.9. (Pull #26)
Owner
Florimond Manca
Pythonista, open source developer, casual tech blogger. Idealist on a journey, and it’s good fun!
A task scheduler with task scheduling, timing and task completion time tracking functions

A task scheduler with task scheduling, timing and task completion time tracking functions. Could be helpful for time management in daily life.

ArthurLCW 0 Jan 15, 2022
CoSA: Scheduling by Constrained Optimization for Spatial Accelerators

CoSA is a scheduler for spatial DNN accelerators that generate high-performance schedules in one shot using mixed integer programming

UC Berkeley Architecture Research 44 Dec 13, 2022
Clepsydra is a mini framework for task scheduling

Intro Clepsydra is a mini framework for task scheduling All parts are designed to be replaceable. Main ideas are: No pickle! Tasks are stored in reada

Andrey Tikhonov 15 Nov 4, 2022
Aiorq is a distributed task queue with asyncio and redis

Aiorq is a distributed task queue with asyncio and redis, which rewrite from arq to make improvement and include web interface.

PY-GZKY 5 Mar 18, 2022
generate HPC scheduler systems jobs input scripts and submit these scripts to HPC systems and poke until they finish

DPDispatcher DPDispatcher is a python package used to generate HPC(High Performance Computing) scheduler systems (Slurm/PBS/LSF/dpcloudserver) jobs in

DeepModeling 23 Nov 30, 2022
Crontab jobs management in Python

Plan Plan is a Python package for writing and deploying cron jobs. Plan will convert Python code to cron syntax. You can easily manage you

Shipeng Feng 1.2k Dec 28, 2022
A powerful workflow engine implemented in pure Python

Spiff Workflow Summary Spiff Workflow is a workflow engine implemented in pure Python. It is based on the excellent work of the Workflow Patterns init

Samuel 1.3k Jan 8, 2023
Python-Repeated-Timer is an open-source & highly performing timer using only standard-libraries.

Python Repeated Timer Python-Repeated-Timer is an open-source & highly performing timer using only standard-libraries.

TACKHYUN JUNG 3 Oct 9, 2022
Automate SQL Jobs Monitoring with python

Automate_SQLJobsMonitoring_python Using python 3rd party modules we can automate

Aejaz Ayaz 1 Dec 27, 2021
A simple scheduler tool that provides desktop notifications about classes and opens their meet links in the browser automatically at the start of the class.

This application provides desktop notifications about classes and opens their meet links in browser automatically at the start of the class.

Anshit 14 Jun 29, 2022
Here is the live demonstration of endpoints and celery worker along with RabbitMQ

whelp-task Here is the live demonstration of endpoints and celery worker along with RabbitMQ Before running the application make sure that you have yo

Yalchin403 0 Nov 14, 2021
Trio – a friendly Python library for async concurrency and I/O

Trio – a friendly Python library for async concurrency and I/O The Trio project aims to produce a production-quality, permissively licensed, async/awa

null 5k Jan 7, 2023
Fully Automated YouTube Channel ▶️ with Added Extra Features.

Fully Automated Youtube Channel

sam-sepiol 249 Jan 2, 2023
A SOCKS proxy server implemented with the powerful python cooperative concurrency framework asyncio.

asyncio-socks-server A SOCKS proxy server implemented with the powerful python cooperative concurrency framework asyncio. Features Supports both TCP a

Amaindex 164 Dec 30, 2022
An asyncio compatible Redis driver, written purely in Python. This is really just a pet-project for me.

asyncredis An asyncio compatible Redis driver. Just a pet-project. Information asyncredis is, like I've said above, just a pet-project for me. I reall

Vish M 1 Dec 25, 2021
Coroutine-based concurrency library for Python

gevent Read the documentation online at http://www.gevent.org. Post issues on the bug tracker, discuss and ask open ended questions on the mailing lis

gevent 5.9k Dec 28, 2022
TriOTP, the OTP framework for Python Trio

TriOTP, the OTP framework for Python Trio See documentation for more informations. Introduction This project is a simplified implementation of the Erl

David Delassus 7 Nov 21, 2022
asyncio compatible driver for elasticsearch

asyncio client library for elasticsearch aioes is a asyncio compatible library for working with Elasticsearch The project is abandoned aioes is not su

null 97 Sep 5, 2022
Lightweight asyncio compatible utilities for consuming broker messages.

A simple asyncio compatible consumer for handling amqp messages.

Mehdi Kamani 3 Apr 10, 2022
SNV calling pipeline developed explicitly to process individual or trio vcf files obtained from Illumina based pipeline (grch37/grch38).

SNV Pipeline SNV calling pipeline developed explicitly to process individual or trio vcf files obtained from Illumina based pipeline (grch37/grch38).

East Genomics 1 Nov 2, 2021