Framework for creating efficient data processing pipelines

Related tags

Miscellaneousaqueduct
Overview

Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in telegram t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory for transfer big data between processes

Get started

Simple example how to start with aqueduct using aiohttp. For better examples see examples

web.Application: app = web.Application() app['flow'] = Flow( FlowStep(SumHandler()), ) app.router.add_post('/sum', SumView) app['flow'].start() return app if __name__ == '__main__': web.run_app(prepare_app()) ">
from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
    

Batching

Aqueduct supports the ability to process tasks with batches. Default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant needs just for example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time for processing batch of images due to GPU optimizations. It's emulated
        with BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# checks if no one result
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# checks if all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if we have batch size more than tasks number, we can limit batch accumulation time 
# with timeout parameter for processing time optimization
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') is True:
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
Owner
avito.tech
avito.ru engineering team open source projects
avito.tech
A script to generate NFT art living on the Solana blockchain.

NFT Generator This script generates NFT art based on its desired traits with their specific rarities. It has been used to generate the full collection

Rude Golems 24 Oct 08, 2022
Vehicle Identification Speed Detection (VISD) extracts vehicle information like License Plate number, Manufacturer and colour from a video and provides this data in the form of a CSV file

Vehicle Identification Speed Detection (VISD) extracts vehicle information like License Plate number, Manufacturer and colour from a video and provides this data in the form of a CSV file. VISD can a

6 Feb 22, 2022
Projects using the Tkinter module in Python!

Tkinter projects This repository includes some Tkinter projects made by me. All of these are simple to understand. I create apps with good functionali

Amey 0 Sep 24, 2021
Mute your mic while you're typing. An app for Ubuntu.

Hushboard Mute your microphone while typing, for Ubuntu. Install from kryogenix.org/code/hushboard/. Installation We recommend you install Hushboard t

Stuart Langridge 142 Jan 05, 2023
Bionic is Python Framework for crafting beautiful, fast user experiences for web and is free and open source.

Bionic is Python Framework for crafting beautiful, fast user experiences for web and is free and open source. Getting Started This is an example of ho

14 Apr 10, 2022
Jack Morgan's Advent of Code Solutions

Advent-of-Code Jack Morgan's Advent of Code Solutions Usage Run . initiate.sh year day To initiate a day. This sets up a template python file, and pul

Jack Morgan 1 Dec 10, 2021
An app that mirrors your phone to your compute and maps controller input to the screen

What is 'Dragalia Control'? An app that mirrors your phone to your compute and maps controller input to the screen. Inputs are mapped specifically for

1 May 03, 2022
Code needed for hybrid land cover change analysis for NASA IDS project

Documentation for the NASA IDS change analysis Poley 10/21/2021 Required python packages: whitebox numpy rasterio rasterio.mask os glob math itertools

Andrew Poley 2 Nov 12, 2021
Originally used during Marketplace.tf's open period, this program was used to get the profit of items bought with keys and sold for dollars.

Originally used during Marketplace.tf's open period, this program was used to get the profit of items bought with keys and sold for dollars. Practically useless for me now, but can be used as an exam

BoggoTV 1 Dec 11, 2021
It converts ING BANK account historic into a csv file you can import in HomeBank application.

ing2homebank It converts your ING Bank account historic csv file into another csv file you can import in HomeBank application

1 Feb 14, 2022
Simple Crud Python vs MySQL

Simple Crud Python vs MySQL The idea came when I was studying MySQ... A desire to create a python program that can give access to a "localhost" databa

Lucas 1 Jan 21, 2022
System Design Assignments as part of Arpit's System Design Masterclass

System Design Assignments The repository contains a set of problem statements around Software Architecture and System Design as conducted by Arpit's S

Relog 1.1k Jan 09, 2023
This is a python package to get wards, districts,cities and provinces in Zimbabwe

Zim-Places Features This is a python package that allows you to search for cities, provinces, and districts in Zimbabwe.Zimbabwe is split into eight p

RONALD KANYEPI 2 Mar 01, 2022
YunoHost is an operating system aiming to simplify as much as possible the administration of a server.

YunoHost is an operating system aiming to simplify as much as possible the administration of a server. This repository corresponds to the core code, written mostly in Python and Bash.

YunoHost 1.5k Jan 09, 2023
Macros in Python: quasiquotes, case classes, LINQ and more!

MacroPy3 1.1.0b2 MacroPy is an implementation of Syntactic Macros in the Python Programming Language. MacroPy provides a mechanism for user-defined fu

Li Haoyi 3.2k Jan 06, 2023
A Python package for searching journal publications and researchers

scholarpy A python package for searching journal publications and researchers Free software: MIT license Documentation: https://giswqs.github.io/schol

Qiusheng Wu 8 Mar 12, 2022
Anki Addon idea by gbrl.sc to see previous ratings of a card in the reviewer

Card History At A Glance Stop having to press card browser and ctrl+i for every card and then WINCING to see it's history of reviews FEATURES Visualiz

Jerry Zhou 11 Dec 19, 2022
Automatically find solutions when your Python code encounters an issue.

What The Python?! Helping you find answers to the errors Python spits out. Installation You can find the source code on GitHub at: https://github.com/

What The Python?! 139 Dec 14, 2022
A Bot that adds YouTube views to your video of choice

YoutubeViews Free Youtube viewer bot A Bot that adds YouTube views to your video of choice Installation git clone https://github.com/davdtheemonk/Yout

ProbablyX 5 Dec 06, 2022
A simple python script that print the Mandelbrot set for every power of the formal formula.

Python Mandelbrot A simple python script that print the Mandelbrot set for every power of the formal formula.

Paride Giunta 2 Apr 15, 2022