Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in Telegram: t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory to transfer big data between processes (see the sketch below)
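
To make the shared-memory point concrete, here is a minimal sketch using only the Python standard library and NumPy. It illustrates the general technique, not Aqueduct's internal API:

# A minimal sketch of the shared-memory idea: the parent puts a NumPy array into a
# multiprocessing.shared_memory block and the child attaches to it by name, so the
# payload is not pickled and copied through a pipe.
import numpy as np
from multiprocessing import Process, shared_memory


def child(shm_name: str, shape: tuple, dtype: str):
    # Attach to the block created by the parent and view it as an array.
    shm = shared_memory.SharedMemory(name=shm_name)
    view = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    print('sum computed in child:', view.sum())
    shm.close()


if __name__ == '__main__':
    data = np.arange(1_000_000, dtype=np.float64)
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)[:] = data

    p = Process(target=child, args=(shm.name, data.shape, str(data.dtype)))
    p.start()
    p.join()

    shm.close()
    shm.unlink()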

Get started

A simple example of how to start with Aqueduct using aiohttp. For more complete examples, see the examples directory.

from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
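
Once the service is running (aiohttp's web.run_app listens on port 8080 by default), the endpoint can be exercised with a small client like the sketch below; the client code is illustrative and not part of Aqueduct:

# A hypothetical client for the /sum endpoint above; assumes the service is
# running locally on aiohttp's default port 8080.
import asyncio

from aiohttp import ClientSession


async def main():
    async with ClientSession() as session:
        async with session.post('http://localhost:8080/sum', data=b'10') as resp:
            print(await resp.json())  # {'result': 285}


asyncio.run(main())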
    

Batching

Aqueduct supports processing tasks in batches. The default batch size is one.

import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant is only needed for the example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.ndarray, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.ndarray) -> np.ndarray:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time processing a batch of images due to GPU optimizations;
        this is emulated with the BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


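# NOTE: the lines below use `await` at module level, so they are assumed to run
# inside a coroutine (or an asyncio-enabled REPL such as `python -m asyncio`).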
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# check that no results have been set yet
assert not any(task.result for task in tasks_batch)
# batched handling takes about 0.16 s, which is less than the ~0.22 s needed for sequential processing
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# check that all results have been set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if the batch size is greater than the number of tasks, we can limit the batch accumulation time
# with the batch_timeout parameter to optimize processing time
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

Aqueduct allows you to receive logger events from both the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') == 'True':  # environment variables are strings, so compare with the string value
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
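
The raven client shown above is deprecated in favour of sentry-sdk. A roughly equivalent setup with its logging integration might look like the sketch below; the SENTRY_ENABLED and SENTRY_DSN variables are the same assumptions as above:

# A sketch of the same integration with the newer sentry-sdk package.
import logging
import os

import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration

if os.getenv('SENTRY_ENABLED') == 'True':
    sentry_sdk.init(
        dsn=os.getenv('SENTRY_DSN'),
        integrations=[LoggingIntegration(
            level=logging.INFO,         # capture INFO and above as breadcrumbs
            event_level=logging.ERROR,  # send ERROR records as Sentry events
        )],
    )
    # records logged via aqueduct.logger.log are then picked up automatically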

Owner

avito.tech (avito.ru engineering team open source projects)