
Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in Telegram: t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory to transfer big data between processes

Get started

A simple example of how to get started with aqueduct using aiohttp. For more complete examples, see examples.

from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
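
Once the service is running (web.run_app listens on localhost:8080 by default), you can send numbers to the /sum endpoint. Below is a minimal client sketch; call_sum is just a hypothetical helper for illustration, not part of aqueduct.

import asyncio

from aiohttp import ClientSession


async def call_sum(number: int) -> int:
    # POST the raw number to the /sum endpoint and read the JSON response
    async with ClientSession() as session:
        async with session.post('http://localhost:8080/sum', data=str(number)) as resp:
            payload = await resp.json()
            return payload['result']


if __name__ == '__main__':
    print(asyncio.run(call_sum(10000)))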
    

Batching

Aqueduct supports processing tasks in batches. The default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant is only needed for the example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.ndarray, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.ndarray) -> np.ndarray:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time processing a batch of images thanks to GPU optimizations;
        this is emulated with the BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


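# note: the awaits below are assumed to run inside a coroutine,
# e.g. wrap this section in an async main() and launch it with asyncio.run(main())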
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# check that no task has a result yet
assert not any(task.result for task in tasks_batch)
# batched handling takes about 0.16 s (20 * 0.01 * 0.7 + 0.02), which is faster
# than the ~0.22 s it would take to process the tasks sequentially
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# check that all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if the batch size is larger than the number of tasks, we can limit the batch
# accumulation time with the batch_timeout parameter to optimize processing time
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()
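
In a real service, the same parameters go onto the FlowStep from the Get started example. The sketch below reuses SumHandler and SumView from above; the batch_size and batch_timeout values are picked purely for illustration.

# imports (aiohttp.web, Flow, FlowStep) and SumHandler/SumView
# come from the Get started example above
def prepare_batched_app() -> web.Application:
    app = web.Application()

    # accumulate up to 8 concurrent requests into a single handle() call,
    # but wait at most 10 ms for the batch to fill up
    app['flow'] = Flow(
        FlowStep(SumHandler(), batch_size=8, batch_timeout=0.01),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app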

Sentry

Aqueduct's logger lets you receive log events from both the worker processes and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


# os.getenv returns a string, so compare against a string value rather than True
if os.getenv('SENTRY_ENABLED') == 'true':
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
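
The snippet above uses the legacy raven client. If your project is on the newer sentry_sdk package instead (an assumption, aqueduct itself does not require it), a roughly equivalent setup could rely on sentry_sdk's standard logging integration to pick up ERROR-level records from aqueduct's logger:

import logging
import os

import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration


if os.getenv('SENTRY_ENABLED') == 'true':
    sentry_sdk.init(
        dsn=os.getenv('SENTRY_DSN'),
        # breadcrumbs from INFO records, Sentry events from ERROR records
        integrations=[LoggingIntegration(level=logging.INFO, event_level=logging.ERROR)],
    )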

Owner

avito.tech (avito.ru engineering team open source projects)