Framework for creating efficient data processing pipelines



Framework for creating efficient data processing pipelines.


Feel free to ask questions in telegram

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory for transfer big data between processes

Get started

Simple example how to start with aqueduct using aiohttp. For better examples see examples

web.Application: app = web.Application() app['flow'] = Flow( FlowStep(SumHandler()), ) app.router.add_post('/sum', SumView) app['flow'].start() return app if __name__ == '__main__': web.run_app(prepare_app()) ">
from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask

class MyModel:
    """This is CPU bound model example."""
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        self.number = number
        self.sum = None  # result will be here
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await
        task = Task(int(number))
        return web.json_response(data={'result': task.sum})

def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
    app.router.add_post('/sum', SumView)

    return app

if __name__ == '__main__':


Aqueduct supports the ability to process tasks with batches. Default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant needs just for example

class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None

class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    OVERHEAD_TIME = 0.02

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time for processing batch of images due to GPU optimizations. It's emulated
        with BATCH_REDUCTION_FACTOR coefficient.
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)

class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict

def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]

async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))

tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))

# checks if no one result
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
# checks if all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if we have batch size more than tasks number, we can limit batch accumulation time 
# with timeout parameter for processing time optimization
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,

await flow_with_batch_handler.stop()


The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log

if os.getenv('SENTRY_ENABLED') is True:
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
  • Allowed different multiprocessing start method

    Allowed different multiprocessing start method

    • CUDA runtime does not support the fork start method, so other MP start methods are required see comment here

    • Lambda function as the default for handle_condition cannot be pickled (required for both "spawn" and "forkserver" start methods)

    opened by dzhvansky 5
  • Clarify use cases in README

    Clarify use cases in README

    Current intro in README seems a bit misleading

    Framework for creating efficient data processing pipelines.

    As "data pipelines" already have strong association with some kind of ETL processes and frameworks like luigi, airflow, pyspark, etc.

    I suggest getting rid of "data pipelines" term in README and adding explicit use cases somewhere in the beginning of the docs.

    opened by vvkh 2
  • torch.cuda.get_device_name -> Cannot re-initialize CUDA in forked subprocess

    torch.cuda.get_device_name -> Cannot re-initialize CUDA in forked subprocess


    The call of torch.cuda.get_device_name() results in an error

    [2021-10-28 11:27:22,347] INFO [Flow] [pid:3602] Flow is starting
    [2021-10-28 11:27:22,350] INFO [Flow] [pid:3602] Created step <__main__.SimpleHandler object at 0x7fec940ef280>, queue_in: <multiprocessing.queues.Queue object at 0x7fec940ef340>, queue_out:<multiprocessing.queues.Queue object at 0x7fec8c0d3eb0>
    [2021-10-28 11:27:22,351] INFO [Flow] [pid:3606] [Worker] initialising handler SimpleHandler
    [2021-10-28 11:27:22,351] INFO [Flow] [pid:3602] Flow was started
    [2021-10-28 11:27:22,353] ERROR [Flow] [pid:3606] Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/aqueduct/", line 61, in _wrap
        fn(i, *args)
      File "/usr/local/lib/python3.8/site-packages/aqueduct/", line 120, in loop
      File "/usr/local/lib/python3.8/site-packages/aqueduct/", line 75, in _start
      File "", line 15, in on_start
        torch.zeros((10, 10), device=torch.device('cuda:0'))
      File "/usr/local/lib/python3.8/site-packages/torch/cuda/", line 147, in _lazy_init
        raise RuntimeError(
    RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/aqueduct/", line 61, in _wrap
        fn(i, *args)
      File "/usr/local/lib/python3.8/site-packages/aqueduct/", line 120, in loop
      File "/usr/local/lib/python3.8/site-packages/aqueduct/", line 75, in _start
      File "", line 15, in on_start
        torch.zeros((10, 10), device=torch.device('cuda:0'))
      File "/usr/local/lib/python3.8/site-packages/torch/cuda/", line 147, in _lazy_init
        raise RuntimeError(
    RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method
    [2021-10-28 11:27:23,353] ERROR [Flow] [pid:3602] The process 3606 for SimpleHandler handler is dead
    [2021-10-28 11:27:23,353] INFO [Flow] [pid:3602] Flow is stopping

    aqueduct-1.7.1, base example:

    import asyncio
    import torch
    from aqueduct.flow import Flow, FlowStep
    from aqueduct.handler import BaseTaskHandler
    from aqueduct.task import BaseTask
    class SimpleHandler(BaseTaskHandler):
        def on_start(self):
            torch.zeros((10, 10), device=torch.device('cuda:0'))
        def handle(self, *tasks: BaseTask):
    def main():
        flow = Flow(FlowStep(SimpleHandler()))
        coro = flow.process(BaseTask())
    if __name__ == '__main__':
    opened by avito-ds 1
  • Dynamic batch

    Dynamic batch

    When batch_timeout is set to 0 use alternative batching stratagy. Instead of waiting fixed timeout while batch is ready, grab all task6 that are curentely in input queue and process them right away. We effectively using queue as batch buffer.

    opened by artemcpp 0
  • Posibility to not create subprocess for flowstep

    Posibility to not create subprocess for flowstep

    Sometime we do not need run step in separate process and don't spend time moving code into another step. So it will be good to have posibility to attach flowstep to previou or to following process. It my be like a parameter for FlowStep: attach=Flow.TO_PREVIOUS | Flow.TO_NEXT

    opened by bugrimov 0
  • Add support for pytest-cov

    Add support for pytest-cov

    For fully support coverage count by pytest-cov, we need to join() supbrocess when stopping flow.

    opened by bugrimov 0
Owner engineering team open source projects
EasyBuild is a software build and installation framework that allows you to manage (scientific) software on High Performance Computing (HPC) systems in an efficient way.

EasyBuild is a software build and installation framework that allows you to manage (scientific) software on High Performance Computing (HPC) systems in an efficient way.

EasyBuild community 87 Dec 27, 2022
redun aims to be a more expressive and efficient workflow framework

redun yet another redundant workflow engine redun aims to be a more expressive and efficient workflow framework, built on top of the popular Python pr

insitro 372 Jan 4, 2023
Project5 Data processing system

Project5-Data-processing-system User just needed to copy both these file to a folder and open using cmd or using any python ide. It is to

null 1 Nov 23, 2021
Python library for creating PEG parsers

PyParsing -- A Python Parsing Module Introduction The pyparsing module is an alternative approach to creating and executing simple grammars, vs. the t

Pyparsing 1.7k Jan 3, 2023
Attempt at creating organized collection of little handy snippets of code I'm receiving along the way

ChaosCode Attempt at creating organized collection of little handy snippets of code I'm receiving along the way I always considered coding and program

INFU 4 Nov 26, 2022
Python module for creating the circuit simulation definitions for Elmer FEM

elmer_circuitbuilder Python module for creating the circuit simulation definitions for Elmer FEM. The circuit definitions enable easy setup of coils (

null 5 Oct 3, 2022
These are After Effects and Python files that were made in the process of creating the video for the contest.

spirograph These are After Effects and Python files that were made in the process of creating the video for the contest. In the python file you can qu

null 91 Dec 7, 2022
Collaboration project to creating bank application maded by Anzhelica Sakun and Yuriy Konyukh

Collaboration project to creating bank application maded by Anzhelica Sakun and Yuriy Konyukh

Yuriy 1 Jan 8, 2022
In the works, creating a new Chess Board and way to Play...

sWJz4Chess date started on 11-13-2021 In the works, creating a new Chess Board and way to Play... starting to write this in Pygame, any ind

Shawn 2 Nov 18, 2021
Grimoire is a Python library for creating interactive fiction as hyperlinked html.

Grimoire Grimoire is a Python library for creating interactive fiction as hyperlinked html. Installation pip install grimoire-if Usage Check out the

Scott Russell 5 Oct 11, 2022
Python library for creating and parsing HSReplay XML files

python-hsreplay A python module for HSReplay support. Installation The library is available on PyPI. pip install hsre

HearthSim 45 Mar 28, 2022
Make creating Excel XLSX files fun again

Poi: Make creating Excel XLSX files fun again. Poi helps you write Excel sheet in a declarative way, ensuring you have a better Excel writing experien

Ryan Wang 11 Apr 1, 2022
A script for creating battle animations in FEGBA format.

AA2 Made by Huichelaar. I heavily referenced FEBuilderGBA. I also referenced circleseverywhere's Animation Assembler. This is also where I took lzss.p

null 2 May 31, 2022
Bootstraparse is a personal project started with a specific goal in mind: creating static html pages for direct display from a markdown-like file

Bootstraparse is a personal project started with a specific goal in mind: creating static html pages for direct display from a markdown-like file

null 1 Jun 15, 2022
Stopmagic gives you the power of creating amazing Stop Motion animations faster and easier than ever before.

Stopmagic gives you the power of creating amazing Stop Motion animations faster and easier than ever before. This project is maintained by Aldrin Mathew.

Aldrin's Art Factory 67 Dec 31, 2022
Viewflow is an Airflow-based framework that allows data scientists to create data models without writing Airflow code.

Viewflow Viewflow is a framework built on the top of Airflow that enables data scientists to create materialized views. It allows data scientists to f

DataCamp 114 Oct 12, 2022
A Pythonic Data Catalog powered by Ray that brings exabyte-level scalability and fast, ACID-compliant, change-data-capture to your big data workloads.

DeltaCAT DeltaCAT is a Pythonic Data Catalog powered by Ray. Its data storage model allows you to define and manage fast, scalable, ACID-compliant dat

null 45 Oct 15, 2022
Tools for downloading and processing numerical weather predictions

NWP Tools for downloading and processing numerical weather predictions At the moment, this code is focused on downloading historical UKV NWPs produced

Open Climate Fix 6 Nov 24, 2022
Small Arrow Vortex clipboard processing library

Description Small Arrow Vortex clipboard processing library. Install You can install this library from PyPI with pip install av-clipboard-lib or compi

Delta Epsilon 1 Dec 18, 2021