Framework for creating efficient data processing pipelines

Related tags

Miscellaneousaqueduct
Overview

Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in telegram t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory for transfer big data between processes

Get started

Simple example how to start with aqueduct using aiohttp. For better examples see examples

web.Application: app = web.Application() app['flow'] = Flow( FlowStep(SumHandler()), ) app.router.add_post('/sum', SumView) app['flow'].start() return app if __name__ == '__main__': web.run_app(prepare_app()) ">
from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
    

Batching

Aqueduct supports the ability to process tasks with batches. Default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant needs just for example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time for processing batch of images due to GPU optimizations. It's emulated
        with BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# checks if no one result
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# checks if all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if we have batch size more than tasks number, we can limit batch accumulation time 
# with timeout parameter for processing time optimization
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') is True:
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
Owner
avito.tech
avito.ru engineering team open source projects
avito.tech
An example file showing a simple endpoints like a login/logout function and maybe some others.

Flask API Example An example project showing a simple endpoints like a login/logout function and maybe some others. How to use: Open up your IDE (or u

Kevin 1 Oct 27, 2021
contextlib2 is a backport of the standard library's contextlib module to earlier Python versions.

contextlib2 is a backport of the standard library's contextlib module to earlier Python versions. It also sometimes serves as a real world proving gro

Jazzband 35 Dec 23, 2022
Binary++ is an esoteric programming language based on* binary

Binary++ is an esoteric programming language based on* binary. * It's meant to be based on binary, but you can write Binary++ code using different mea

Supercolbat 3 Feb 18, 2022
Experiments with Tox plugin system

The project is an attempt to add to the tox some missing out of the box functionality. Basically it is just an extension for the tool that will be loa

Volodymyr Vitvitskyi 30 Nov 26, 2022
A simple flashcard app built as a final project for a databases class.

CS2300 Final Project - Flashcard app 'FlashStudy' Tech stack Backend Python (Language) Django (Web framework) SQLite (Database) Frontend HTML/CSS/Java

Christopher Spencer 2 Feb 03, 2022
A python script to get your activity

activities A python script to get your activity Not complete Requirements Python (=3.7) Pip (for python = 3.7) Git Pip packages psutil asyncio aioht

StarNumber 3 Nov 07, 2021
Create standalone, installable R Shiny apps using Electron

Create standalone, installable R Shiny apps using Electron

Chase Clark 5 Dec 24, 2021
A set of tools for ripping music from Konami mobile games

Konami Mobile Ripping Toolset A set of tools for ripping music from Konami mobile games Contents nigger.py for niggering konami's website, ripping all

5 Oct 20, 2022
Advent of Code is an Advent calendar of small programming puzzles for a variety of skill sets and skill levels that can be solved in any programming language you like.

Advent Of Code 2021 - Python English Advent of Code is an Advent calendar of small programming puzzles for a variety of skill sets and skill levels th

Coral Izquierdo Muñiz 2 Jan 09, 2022
Run Python code right in your Telegram messages

Run Python code right in your Telegram messages Made with Telethon library, TGPy is a tool for evaluating expressions and Telegram API scripts. Instal

29 Nov 22, 2022
The ROS package for Airbotics.

airbotics The ROS package for Airbotics: Developed for ROS 1 and Python 3.8. The package has not been officially released on ROS yet so manual install

Airbotics 19 Dec 25, 2022
A Tool to validate domestic New Zealand vaccine passes

Vaccine Validator Tool to validate domestic New Zealand vaccine passes Create a new virtual environment: python3 -m venv ./venv Activate virtual envi

8 May 01, 2022
A redesign of our previous Python World Cup, aiming to simulate the 2022 World Cup all the way from the qualifiers

A redesign of our previous Python World Cup, aiming to simulate the 2022 World Cup all the way from the qualifiers. This new version is designed to be more compact and more efficient and will reflect

Sam Counsell 1 Jan 07, 2022
Import modules and files straight from URLs.

Import Python code from modules straight from the internet.

Nate 2 Jan 15, 2022
Adversarial Robustness with Non-uniform Perturbations

Adversarial Robustness with Non-uniform Perturbations This repository hosts the code to replicate experiments of the paper Adversarial Robustness with

5 May 20, 2022
HomeAssistant Linux Companion

Application to run on linux desktop computer to provide sensors data to homeasssistant, and get notifications as if it was a mobile device.

Javier Lopez 10 Dec 27, 2022
Bootcamp de Introducción a la Programación. Módulo 6: Matemáticas Discretas

Módulo 6: Matemáticas Discretas Última actualización: 12 de marzo Irónicamente, las matemáticas discretas son las matemáticas que lo cuentan todo. Si

Cynthia Castillo 34 Sep 29, 2022
The calculator on Python.

Calculator Contributors: Delitanast An official website. Information Hello! I am Damir. It`s my first Python project. I think you want see this. I imp

3 Mar 13, 2022
App to get data from popular polish pages with job offers

Job board parser I written simple app to get me data from popular pages with job offers, because I wanted to knew immidietly if there is some new offe

0 Jan 04, 2022
tagls is a language server based on gtags.

tagls tagls is a language server based on gtags. Why I wrote it? Almost all modern editors have great support to LSP, but language servers based on se

daquexian 31 Dec 01, 2022