Framework for creating efficient data processing pipelines

Related tags

Miscellaneousaqueduct
Overview

Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in telegram t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory for transfer big data between processes

Get started

Simple example how to start with aqueduct using aiohttp. For better examples see examples

web.Application: app = web.Application() app['flow'] = Flow( FlowStep(SumHandler()), ) app.router.add_post('/sum', SumView) app['flow'].start() return app if __name__ == '__main__': web.run_app(prepare_app()) ">
from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
    

Batching

Aqueduct supports the ability to process tasks with batches. Default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant needs just for example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time for processing batch of images due to GPU optimizations. It's emulated
        with BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# checks if no one result
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# checks if all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if we have batch size more than tasks number, we can limit batch accumulation time 
# with timeout parameter for processing time optimization
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') is True:
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
Owner
avito.tech
avito.ru engineering team open source projects
avito.tech
Flask html response minifier

Flask-HTMLmin Minify flask text/html mime type responses. Just add MINIFY_HTML = True to your deployment config to minify HTML and text responses of y

Hamid Feizabadi 85 Dec 07, 2022
Create beautiful diagrams just by typing mathematical notation in plain text.

Penrose Penrose is an early-stage system that is still in development. Our system is not ready for contributions or public use yet, but hopefully will

Penrose 5.6k Jan 08, 2023
🐍 A Python lib for (de)serializing Python objects to/from JSON

Turn Python objects into dicts or (json)strings and back No changes required to your objects Easily customizable and extendable Works with dataclasses

Ramon Hagenaars 253 Dec 14, 2022
Python Classes Without Boilerplate

attrs is the Python package that will bring back the joy of writing classes by relieving you from the drudgery of implementing object protocols (aka d

The attrs Cabal 4.6k Jan 02, 2023
Nesse repositório serão armazenados os conteúdos de aula

Lets_Code_DS_Degree_Alunos Nesse repositório serão armazenados os conteúdos de aula Formato das aulas: Notebook de aula já vem comentado para reduzir

Patricia Bongiovanni Catandi 6 Jan 21, 2022
A basic interpreted programming language written in python

shin A basic interpreted programming language written in python. extension You can use our own extension ".shin". Example: main.shin How to start Clon

12 Nov 04, 2022
Audio-analytics for music-producers! Automate tedious tasks such as musical scale detection, BPM rate classification and audio file conversion.

Click here to be re-directed to the Beat Inspect Streamlit Web-App You are a music producer? Let's get in touch via LinkedIn Fundamental Analytics for

Stefan Rummer 11 Dec 27, 2022
Vita Specific Patches and Application for Doki Doki Literature Club (Steam Version) using Ren'Py PSVita

Doki-Doki-Literature-Club-Vita Vita Specific Patches and Application for Doki Doki Literature Club (Steam Version) using Ren'Py PSVita Contains: Modif

Jaylon Gowie 25 Dec 30, 2022
An evolutionary multi-agent platform based on mesa and NEAT

An evolutionary multi-agent platform based on mesa and NEAT

Valerio1988 6 Dec 04, 2022
A tool to quickly create codeforces contest directories with templates.

Codeforces Template Tool I created this tool to help me quickly set up codeforces contests/singular problems with templates. Tested for windows, shoul

1 Jun 02, 2022
A few of my adventures with Devito.

Devito-playbox A few of my adventures with Devito. This repository contains a few notebooks and scripts that will lead me in the road of learning this

Átila Saraiva Quintela Soares 1 Feb 08, 2022
Random Turkish name generator with realistic probabilities.

trnames Random Turkish name generator with realistic probabilities. Based on Trey Hunner's names package. Installation The package can be installed us

Kaan Öztürk 20 Jan 02, 2023
Strawberry Benchmark With Python

Strawberry benchmarks these benchmarks have been made to compare the performance of dataloaders and joined database queries. How to use You can run th

Doctor 4 Feb 23, 2022
Esercizi di Python svolti per il biennio di Tecnologie Informatiche.

Esercizi di Python Un piccolo aiuto per Sofia che nel 2° quadrimestre inizierà Python :) Questo repository (termine tecnico di Git) puoi trovare tutti

Leonardo Essam Dei Rossi 2 Nov 07, 2022
A python script to make leaderboards using a CSV with the runners name, IDs and Flag Emojis

SrcLbMaker A python script to make speedrun.com global leaderboards. Installation You need python 3.6 or higher. First, go to the folder where you wan

2 Jul 25, 2022
A streaming animation of all the edits to a given Wikipedia page.

WikiFilms! What is it? A streaming animation of all the edits to a given Wikipedia page. How it works. It works by creating a "virtual camera," which

Tal Zaken 2 Jan 18, 2022
The Official Jaseci Code Repository

Jaseci Release Notes Version 1.2.2 Updates Added new built-ins for nodes and edges (context, info, and details) Fixed dot output Added reset command t

136 Dec 20, 2022
a url shortener with fastapi and tortoise-orm

fastapi-tortoise-orm-url-shortener a url shortener with fastapi and tortoise-orm

19 Aug 12, 2022
A Notifier Program that Notifies you to relax your eyes Every 15 Minutes👀

Every 15 Minutes is an application that is used to Notify you to Relax your eyes Every 15 Minutes, This is fully made with Python and also with the us

FSP Gang s' YT 2 Nov 11, 2021
Tool for running a high throughput data ingestion/transformation workload with MongoDB

Mongo Mangler The mongo-mangler tool is a lightweight Python utility, which you can run from a low-powered machine to execute a high throughput data i

Paul Done 9 Jan 02, 2023