Framework for creating efficient data processing pipelines

Related tags

Miscellaneousaqueduct
Overview

Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in telegram t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory for transfer big data between processes

Get started

Simple example how to start with aqueduct using aiohttp. For better examples see examples

web.Application: app = web.Application() app['flow'] = Flow( FlowStep(SumHandler()), ) app.router.add_post('/sum', SumView) app['flow'].start() return app if __name__ == '__main__': web.run_app(prepare_app()) ">
from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
    

Batching

Aqueduct supports the ability to process tasks with batches. Default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant needs just for example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time for processing batch of images due to GPU optimizations. It's emulated
        with BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# checks if no one result
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# checks if all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if we have batch size more than tasks number, we can limit batch accumulation time 
# with timeout parameter for processing time optimization
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') is True:
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
Owner
avito.tech
avito.ru engineering team open source projects
avito.tech
El Niño - Southern Oscillation analysis compared to minimum flow rates of rivers in northeast Brazil

ENSO (El Niño - Southern Oscillation) analysis in northeast Brazil É comprovada a influência dos fenômenos El Niño e La Niña nas secas no nordesde bra

Weyder Freire 1 Jan 13, 2022
For when you really need to rank things

Comparisonator For when you really need to rank things. Do you know that feeling when there's this urge deep within you that tells you to compare thin

Maciej Wilczyński 1 Nov 01, 2021
Iss-tracker - ISS tracking script in python using NASA's API

ISS Tracker Tracking International Space Station using NASA's API and plotting i

Partho 9 Nov 29, 2022
More routines for operating on iterables, beyond itertools

More Itertools Python's itertools library is a gem - you can compose elegant solutions for a variety of problems with the functions it provides. In mo

2.8k Jan 02, 2023
Addon for Blender 2.8+ that automatically creates NLA tracks for all animations. Useful for GLTF export.

PushDownAll An addon for Blender 2.8+ that runs Push Down on all animations, creating NLA tracks for each. This is useful if you have an object with m

Cory Petkovsek 16 Oct 06, 2022
Let's make a lot of random function from Scracth...

Pseudo-Random On a whim I asked myself the question about how randomness is integrated into an algorithm? So I started the adventure by trying to code

Yacine 2 Jan 19, 2022
WGGCommute - Adding Commute Times to WG-Gesucht Listings

WGGCommute - Adding Commute Times to WG-Gesucht Listings This is a barebones implementation of a chrome extension that can be used to add commute time

Jannis 2 Jul 20, 2022
A fluid medium for storing, relating, and surfacing thoughts.

Conceptarium A fluid medium for storing, relating, and surfacing thoughts. Read more... Instructions The conceptarium takes up about 1GB RAM when runn

115 Dec 19, 2022
Performance data for WASM SIMD instructions.

WASM SIMD Data This repository contains code and data which can be used to generate a JSON file containing information about the WASM SIMD proposal. F

Evan Nemerson 5 Jul 24, 2022
Usos Semester average helper

Usos Semester average helper Dzieki temu skryptowi mozesz sprawdzic srednia ocen na kazdy odbyty przez ciebie semestr PARAMETERS required: '--username

2 Jan 17, 2022
Chemical Analysis Calculator, with full solution display.

Chemicology Chemical Analysis Calculator, to solve problems efficiently by displaying whole solution. Go to releases for downloading .exe, .dmg, Linux

Muhammad Moazzam 2 Aug 06, 2022
A sandpit for textual related things

A sandpit repo for testing textual related things.

Craig Gumbley 1 Nov 08, 2021
Pygments is a generic syntax highlighter written in Python

Welcome to Pygments This is the source of Pygments. It is a generic syntax highlighter written in Python that supports over 500 languages and text for

1.2k Jan 06, 2023
ColabFold / AlphaFold2_advanced on your local PC (or macOS)

LocalColabFold ColabFold / AlphaFold2_advanced on your local PC (or macOS) Installation For Linux Make sure curl and wget commands are already install

Yoshitaka Moriwaki 207 Dec 22, 2022
This is a a CSMA/CA simulator written in Python based on simulator of the same type

This is a a CSMA/CA simulator written in Python based on simulator of the same type found the link https://github.com/StevenSLXie/CSMA-Simulator with

M. Ismail 4 Nov 22, 2022
A guy with a lot of useful things to do when doing AtCoder in Python

atcoder_python_env Python で AtCoder をやるときに便利な諸々を用意したやつ コンテスト用フォルダの作成 セットアップ 自動テス

2 Dec 28, 2021
An Advent calendar of small programming puzzles for a variety of skill sets and skill levels.

Advent of Code 2021 The Advent of Code is an Advent calendar of small programming puzzles for a variety of skill sets and skill levels that can be sol

Evan Cope 0 Feb 13, 2022
PIP VA TASHQI KUTUBXONALAR

39-dars PIP VA TASHQI KUTUBXONALAR KIRISH Avvalgi darsimizda Python bilan birga o'rnatluvchi, standart kutubxona va undagi ba'zi foydali modullar bila

Sayfiddin 3 Nov 25, 2021
A country information finder module

A country information finder module

Fayas Noushad 3 Nov 28, 2021
Automates the fixing of problems reported by yamllint by parsing its output

yamlfixer yamlfixer automates the fixing of problems reported by yamllint by parsing its output. Usage This software automatically fixes some errors a

OPT Nouvelle Caledonie 26 Dec 26, 2022