A Python concurrency scheduling library, compatible with asyncio and trio.

Overview

aiometer


aiometer is a Python 3.6+ concurrency scheduling library compatible with asyncio and trio and inspired by Trimeter. It makes it easier to execute lots of tasks concurrently while controlling concurrency limits (i.e. applying backpressure) and collecting results in a predictable manner.


Example

Let's use HTTPX to make web requests concurrently...

Try this code interactively using IPython.

>>> import asyncio
>>> import functools
>>> import random
>>> import aiometer
>>> import httpx
>>>
>>> client = httpx.AsyncClient()
>>>
>>> async def fetch(client, request):
...     response = await client.send(request)
...     # Simulate extra processing...
...     await asyncio.sleep(2 * random.random())
...     return response.json()["json"]
...
>>> requests = [
...     httpx.Request("POST", "https://httpbin.org/anything", json={"index": index})
...     for index in range(100)
... ]
...
>>> # Send requests, and process responses as they're made available:
>>> async with aiometer.amap(
...     functools.partial(fetch, client),
...     requests,
...     max_at_once=10, # Limit maximum number of concurrently running tasks.
...     max_per_second=5,  # Limit request rate to not overload the server.
... ) as results:
...     async for data in results:
...         print(data)
...
{'index': 3}
{'index': 4}
{'index': 1}
{'index': 2}
{'index': 0}
...
>>> # Alternatively, fetch and aggregate responses into an (ordered) list...
>>> jobs = [functools.partial(fetch, client, request) for request in requests]
>>> results = await aiometer.run_all(jobs, max_at_once=10, max_per_second=5)
>>> results
[{'index': 0}, {'index': 1}, {'index': 2}, {'index': 3}, {'index': 4}, ...]

Installation

This project is in beta and maturing. Be sure to pin your dependency to the latest minor version:

pip install "aiometer==0.3.*"

Features

  • Concurrency management and throttling helpers.
  • asyncio and trio support.
  • Fully type annotated.
  • 100% test coverage.

Guide

Flow control

The key highlight of aiometer is allowing you to apply flow control strategies in order to limit the degree of concurrency of your programs.

There are two knobs you can play with to fine-tune concurrency:

  • max_at_once: this is used to limit the maximum number of concurrently running tasks at any given time. (If you have 100 tasks and set max_at_once=10, then aiometer will ensure that no more than 10 run at the same time.)
  • max_per_second: this option limits the number of tasks spawned per second. This is useful to avoid overloading I/O resources, such as servers that may have a rate limiting policy in place.
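To make max_at_once more concrete, here is a minimal plain-asyncio sketch of the same idea, assuming a semaphore-based design. It is not aiometer's own implementation (aiometer builds on anyio), and `run_limited` is a hypothetical helper name used only for illustration. Like run_all(), it returns results in input order.

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")


async def run_limited(jobs: List[Callable[[], Awaitable[T]]], max_at_once: int) -> List[T]:
    """Run zero-argument async callables with at most `max_at_once` in flight.

    Results come back in the same order as `jobs`, whatever order they finish in.
    """
    semaphore = asyncio.Semaphore(max_at_once)

    async def guarded(job: Callable[[], Awaitable[T]]) -> T:
        # Each job waits here while `max_at_once` others hold the semaphore.
        async with semaphore:
            return await job()

    return list(await asyncio.gather(*(guarded(job) for job in jobs)))


# Usage: 100 short jobs, tracking how many run at the same time.
running = 0
peak = 0


async def job(i: int) -> int:
    global running, peak
    running += 1
    peak = max(peak, running)
    await asyncio.sleep(0.01)  # Simulate I/O.
    running -= 1
    return i


results = asyncio.run(run_limited([lambda i=i: job(i) for i in range(100)], max_at_once=10))
```

The `peak` counter never exceeds 10, even though all 100 jobs are handed over at once.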

Example usage:

>>> import asyncio
>>> import aiometer
>>> async def make_query(query):
...     await asyncio.sleep(0.05)  # Simulate a database request.
...
>>> queries = ['SELECT * from authors'] * 1000
>>> # Allow at most 5 queries to run concurrently at any given time:
>>> await aiometer.run_on_each(make_query, queries, max_at_once=5)
...
>>> # Make at most 10 queries per second:
>>> await aiometer.run_on_each(make_query, queries, max_per_second=10)
...
>>> # Run at most 10 concurrent jobs, starting new ones at most once every 5 seconds:
>>> async def job(job_id):
...     await asyncio.sleep(10)  # A very long task.
...
>>> await aiometer.run_on_each(job, range(100), max_at_once=10, max_per_second=0.2)
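According to the 0.2.1 changelog, max_per_second is implemented with the generic cell rate algorithm (GCRA). The sketch below shows the "virtual scheduling" form of GCRA in plain asyncio, assuming callers should be delayed rather than rejected. `GCRALimiter`, `delay()`, `acquire()` and the `burst` parameter are hypothetical names, not aiometer API. With rate=0.2 the emission interval is 5 seconds, matching the example above.

```python
import asyncio
import time
from typing import Callable, Optional


class GCRALimiter:
    """Hypothetical limiter spacing task starts to at most `rate` per second.

    `burst` is how many starts may happen back-to-back after an idle period.
    """

    def __init__(self, rate: float, burst: int = 1,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.interval = 1.0 / rate                      # Emission interval T.
        self.tolerance = (burst - 1) * self.interval    # Burst tolerance tau.
        self.clock = clock
        self.tat: Optional[float] = None                # Theoretical arrival time.

    def delay(self) -> float:
        """Reserve the next slot and return how long the caller must wait for it."""
        now = self.clock()
        tat = now if self.tat is None else max(self.tat, now)
        # A start is allowed once now >= TAT - tau; otherwise wait the difference.
        wait = max(0.0, tat - self.tolerance - now)
        self.tat = tat + self.interval
        return wait

    async def acquire(self) -> None:
        await asyncio.sleep(self.delay())
```

A task would call `await limiter.acquire()` right before starting its work; with the default burst=1, consecutive starts end up roughly 1/rate seconds apart.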

Running tasks

aiometer provides 4 different ways to run tasks concurrently, in the form of 4 different run functions. Each function accepts all the options documented in Flow control and runs tasks in a slightly different way, allowing you to address a variety of use cases. Here's a handy table for reference:

Entrypoint      Use case
run_on_each()   Execute async callbacks in any order.
run_all()       Return results as an ordered list.
amap()          Iterate over results as they become available.
run_any()       Return result of first completed function.

To illustrate the behavior of each run function, let's first set up a hello world async program:

>>> import asyncio
>>> import random
>>> from functools import partial
>>> import aiometer
>>>
>>> async def get_greeting(name):
...     await asyncio.sleep(random.random())  # Simulate I/O
...     return f"Hello, {name}!"
...
>>> async def greet(name):
...     greeting = await get_greeting(name)
...     print(greeting)
...
>>> names = ["Robert", "Carmen", "Lucas"]

Let's start with run_on_each(). It executes an async function once for each item in a list passed as argument:

>>> await aiometer.run_on_each(greet, names)
Hello, Robert!
Hello, Lucas!
Hello, Carmen!

If we'd like to get the list of greetings in the same order as names, in a fashion similar to Promise.all(), we can use run_all():

>>> await aiometer.run_all([partial(get_greeting, name) for name in names])
['Hello, Robert!', 'Hello, Carmen!', 'Hello, Lucas!']

amap() allows us to process each greeting as it becomes available (which means order is not preserved):

>>> async with aiometer.amap(get_greeting, names) as greetings:
...     async for greeting in greetings:
...         print(greeting)
...
Hello, Lucas!
Hello, Robert!
Hello, Carmen!
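The completion-order behavior of amap() resembles what asyncio.as_completed() gives you in plain asyncio. Below is a rough sketch without the concurrency limits or the context manager that amap() provides; `amap_sketch` and `delayed_echo` are hypothetical names.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def amap_sketch(func: Callable[[T], Awaitable[R]], items: Iterable[T]) -> AsyncIterator[R]:
    """Yield func(item) results in completion order, not input order."""
    tasks = [asyncio.ensure_future(func(item)) for item in items]
    try:
        for next_done in asyncio.as_completed(tasks):
            yield await next_done
    finally:
        # If the generator is closed early, don't leave tasks running in the background.
        for task in tasks:
            task.cancel()


async def delayed_echo(n: float) -> float:
    await asyncio.sleep(n)
    return n


async def main() -> list:
    return [n async for n in amap_sketch(delayed_echo, [0.05, 0.01, 0.03])]


result = asyncio.run(main())
```

Results arrive shortest-sleep first, which is the same effect the sleep sort example below relies on.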

Lastly, run_any() can be used to run async functions until the first one completes, similarly to Promise.any():

>>> await aiometer.run_any([partial(get_greeting, name) for name in names])
'Hello, Carmen!'
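In plain asyncio, a comparable "first result wins" pattern can be sketched with asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED). This is an assumption-level analogue, not run_any()'s implementation; `first_completed` and `greet_after` are hypothetical names. If the first job to finish raised, this sketch re-raises that exception rather than waiting for another job.

```python
import asyncio
from functools import partial
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")


async def first_completed(jobs: List[Callable[[], Awaitable[T]]]) -> T:
    """Return the result of whichever job finishes first and cancel the rest."""
    tasks = [asyncio.ensure_future(job()) for job in jobs]
    try:
        done, _pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        # Several tasks may finish in the same loop iteration; take any one of them.
        return done.pop().result()
    finally:
        for task in tasks:
            task.cancel()


async def greet_after(delay: float, name: str) -> str:
    await asyncio.sleep(delay)
    return f"Hello, {name}!"


winner = asyncio.run(first_completed([
    partial(greet_after, 0.05, "Robert"),
    partial(greet_after, 0.01, "Carmen"),
    partial(greet_after, 0.03, "Lucas"),
]))
```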

As a last fun example, let's use amap() to implement a no-threads async version of sleep sort:

>>> import asyncio
>>> from functools import partial
>>> import aiometer
>>> numbers = [0.3, 0.1, 0.6, 0.2, 0.7, 0.5, 0.5, 0.2]
>>> async def process(n):
...     await asyncio.sleep(n)
...     return n
...
>>> async with aiometer.amap(process, numbers) as results:
...     sorted_numbers = [n async for n in results]
...
>>> sorted_numbers
[0.1, 0.2, 0.2, 0.3, 0.5, 0.5, 0.6, 0.7]

How To

Multiple parametrized values in run_on_each and amap

run_on_each and amap only accept functions that accept a single positional argument (i.e. (Any) -> Awaitable).

So if you have a function that is parametrized by multiple values, you should refactor it to match this form.

This can generally be achieved like this:

  1. Build a proxy container type (e.g. a namedtuple), say T.
  2. Refactor your function so that its signature is now (T) -> Awaitable.
  3. Build a list of these proxy containers, and pass it to aiometer.

For example, assuming you have a function that processes X/Y coordinates...

async def process(x: float, y: float) -> None:
    pass

xs = list(range(100))
ys = list(range(100))

for x, y in zip(xs, ys):
    await process(x, y)

You could use it with amap by refactoring it like this:

from typing import NamedTuple

import aiometer

# Proxy container type:
class Point(NamedTuple):
    x: float
    y: float

# Rewrite to accept a proxy as a single positional argument:
async def process(point: Point) -> None:
    x = point.x
    y = point.y
    ...

xs = list(range(100))
ys = list(range(100))

# Build a list of proxy containers:
points = [Point(x, y) for x, y in zip(xs, ys)]

# Use it:
async with aiometer.amap(process, points) as results:
    ...
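If you'd rather not change the signature of process, a small adapter that unpacks a tuple into positional arguments should also produce a function of the (Any) -> Awaitable form. `star` is a hypothetical helper, not part of aiometer. The sketch exercises it with plain asyncio; with aiometer it would be passed as e.g. `aiometer.amap(star(process), list(zip(xs, ys)))`.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, Tuple


def star(func: Callable[..., Awaitable[Any]]) -> Callable[[Tuple[Any, ...]], Awaitable[Any]]:
    """Adapt `func(x, y, ...)` into a single-argument `func((x, y, ...))`."""
    @functools.wraps(func)
    def wrapper(args: Tuple[Any, ...]) -> Awaitable[Any]:
        return func(*args)
    return wrapper


async def process(x: float, y: float) -> float:
    await asyncio.sleep(0)  # Stand-in for real async work.
    return x + y


single_arg_process = star(process)  # Now takes one positional argument.


async def main() -> list:
    pairs = list(zip(range(3), range(3)))
    return [await single_arg_process(pair) for pair in pairs]


results = asyncio.run(main())
```

Compared with the NamedTuple approach, this keeps the original signature intact at the cost of losing field names at the call site.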

License

MIT

Comments
  • Interaction with tenacity retry

    Interaction with tenacity retry

    Hi,

    I was wondering whether aiometer interacts well with tenacity.retry? Specifically, I would like to rate limit a large number of requests and retry requests.

    async with aiometer.amap(..., max_per_second=10)

    question 
    opened by Midnighter 5
  • Simplifying amap usage

    Simplifying amap usage

    Reading through the docs it came to my mind that the async with usage of amap doesn’t “look necessary” from a UX perspective.

    When using it we always have to do two things:

    • Enter a context with async with (looks like an implementation detail)
    • Iterate over results with async for

    As a user, I really only care about the second operation.

    So what if we moved async with inside the implementation of amap, so that users can just do...

    results = [result async for result in amap(process, items)]
    
    opened by florimondmanca 3
  • requests as list

    requests as list

    I'm using your library to try and drop catch some domain names, so I query the api with the same request several times per second. Do I have to always generate a list of requests with the same element repeated thousands of times or there is a more efficient way to do it? Thanks

    question 
    opened by lordcris 2
  • Switch to generic cell rate algorithm (GCRA) for `max_per_second`

    Switch to generic cell rate algorithm (GCRA) for `max_per_second`

    As suggested by @pgjones on Twitter 😄

    Used these resources for the implementation:

    • https://gitlab.com/pgjones/quart-rate-limiter/-/blob/master/src/quart_rate_limiter/init.py#L252-282
    • https://en.wikipedia.org/wiki/Generic_cell_rate_algorithm#Virtual_scheduling_description

    Also found a way to make tests for max_per_second much simpler/more reliable.

    opened by florimondmanca 2
  • Fix `ValueError: max() arg is an empty sequence`

    Fix `ValueError: max() arg is an empty sequence`

    Sometimes in this function:

    https://github.com/florimondmanca/aiometer/blob/17c8b70ce02838927f7f97bb7510c30e9f731e78/src/aiometer/_impl/utils.py#L8-L12

    The dct param is {}, so the max(dct) throws a ValueError: max() arg is an empty sequence. I just added a check before calling max().

    opened by mxrch 1
  • Expand docs: API reference, contributing guide

    Expand docs: API reference, contributing guide

    Thought we'd need a clearer listing of available pieces of public API. Adding a small contributing guide and a slight restructuring of the table of contents too.

    documentation 
    opened by florimondmanca 1
  • Release 0.3.0

    Release 0.3.0

    0.3.0 - 2021-07-05

    Changed

    • Update anyio dependency to v3 (previously v1). (Pull #25)
      • NB: no API change, but dependency mismatches may occur. Be sure to port your codebase to anyio v3 before upgrading aiometer.

    Added

    • Add support for Python 3.6 (installs the contextlib2 backport library there). (Pull #26)
    • Officialize support for Python 3.9. (Pull #26)
    opened by florimondmanca 1
  • Add a Gitter chat badge to README.md

    Add a Gitter chat badge to README.md

    florimondmanca/aiometer now has a Chat Room on Gitter

    @florimondmanca has just created a chat room. You can visit it here: https://gitter.im/florimondmanca-oss/aiometer.

    This pull-request adds this badge to your README.md:

    Gitter

    If my aim is a little off, please let me know.

    Happy chatting.

    PS: Click here if you would prefer not to receive automatic pull-requests from Gitter in future.

    opened by gitter-badger 1
  • Handling of exceptions and documentation

    Handling of exceptions and documentation

    Hi! Thank you for writing this useful library!

    I couldn't find any documentation about exceptions. Can we use aiometer.run_all and make it continue even if there are exceptions so that we can gather them all at the end?

    Thank you.

    opened by fersarr 0
  • Migrate to release-on-tag

    Migrate to release-on-tag

    I pushed tag 0.3.0 and associated release assuming it would auto-release to PyPI like for some other packages (asgi-lifespan, arel, etc), but the tooling is not in place yet. This PR moves to using the publish stage AZP template.

    opened by florimondmanca 0
  • Release 0.2.1

    Release 0.2.1

    0.2.1 - 2020-03-26

    Fixed

    • Improve robustness of the max_per_second implementation by using the generic cell rate algorithm (GCRA) instead of leaky bucket. (Pull #5)
    opened by florimondmanca 0
  • chore: relax typing-extensions version specifier

    chore: relax typing-extensions version specifier

    Relax typing-extensions version specifier to support typing-extensions v4. (#31)

    I use a library which relies on typing-extensions v4, and it would be great if aiometer allowed using that. (I tested with typing-extensions v4 and there is no issue.)

    opened by ninoseki 2
  • Best practice for handling rate limits

    Best practice for handling rate limits

    I have a bit of code to which I have added aiometer. What I am trying to understand is how best to optimize it to get the maximum performance. The API I am calling has a rate limit of 300 requests per minute, hence setting max_per_second to 5 (if I understand this correctly) should keep me within the limits; however, I hit 429s even then. My code is below:

    import functools
    from typing import Any, AsyncGenerator, Awaitable, Callable, List

    import aiometer

    async def async_iterate_paginated_api(
        function: Callable[..., Awaitable[Any]], **kwargs: Any
    ) -> AsyncGenerator[List[Any], None]:
        """Return an async iterator over the results of any API."""
        page_number = 1
        response = await function(**kwargs, pageNumber=page_number)
        yield response.get("entities") 
        total_pages = response.get('pageCount')
        pages = [
            page
            for page in range(2, total_pages + 1)
        ]
        async def process(function: Callable[..., Awaitable[Any]], kwarg: Any, page: Any):
            response = await function(**kwarg,  pageNumber=page)
            return response.get('entities')
    
        async with aiometer.amap(functools.partial(process, function, kwargs), pages, max_per_second=5) as results:
            async for result in results:
                yield result
    

    Any feedback would be helpful.

    opened by iserialize 1
  • Update requirements

    Update requirements

    Could you update or loosen the version specifications in the requirements of the package? Specifically the restriction for typing-extensions~=3.10 is conflicting for me, as I need a newer version.

    good first issue 
    opened by nardi 1
  • swap the stream contexts with the taskgroup context

    swap the stream contexts with the taskgroup context

            with send_channel, receive_channel:
                async with anyio.create_task_group() as task_group:
    

    might be better as

            async with anyio.create_task_group() as task_group:
                with send_channel, receive_channel:
    

    This way, exiting the amap context manager normally (i.e. with (None, None, None)) would allow currently running tasks to finish and prevent new tasks from being added. Exiting it with an exception (i.e. with (type[T], T, tb)) would still cancel all tasks.

    opened by graingert 5
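Regarding the question above about letting run_all() continue past exceptions: one general workaround, which does not rely on any aiometer option, is to wrap each job so that it returns its exception instead of raising it, then separate results from errors at the end. `capture_exceptions` and `flaky` are hypothetical names, and asyncio.gather stands in for run_all() so the sketch runs without aiometer (plain gather also offers return_exceptions=True for the same purpose).

```python
import asyncio
from typing import Awaitable, Callable, TypeVar, Union

T = TypeVar("T")


def capture_exceptions(job: Callable[[], Awaitable[T]]) -> Callable[[], Awaitable[Union[T, Exception]]]:
    """Wrap a zero-argument async job so it returns its exception instead of raising."""
    async def wrapper() -> Union[T, Exception]:
        try:
            return await job()
        except Exception as exc:  # Deliberately broad: collect everything for later inspection.
            return exc
    return wrapper


async def flaky(i: int) -> int:
    await asyncio.sleep(0)
    if i % 2:
        raise ValueError(f"job {i} failed")
    return i


async def main() -> list:
    jobs = [capture_exceptions(lambda i=i: flaky(i)) for i in range(4)]
    # The wrapped jobs are zero-argument async callables, the same shape run_all() takes.
    return await asyncio.gather(*(job() for job in jobs))


results = asyncio.run(main())
errors = [r for r in results if isinstance(r, Exception)]
```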
Releases(0.3.0)
  • 0.3.0(Jul 6, 2021)

    0.3.0 - 2021-07-06

    Changed

    • Update anyio dependency to v3 (previously v1). (Pull #25)
      • NB: no API change, but dependency mismatches may occur. Be sure to port your codebase to anyio v3 before upgrading aiometer.

    Added

    • Add support for Python 3.6 (installs the contextlib2 backport library there). (Pull #26)
    • Officialize support for Python 3.9. (Pull #26)
Owner
Florimond Manca
Pythonista, open source developer, casual tech blogger. Idealist on a journey, and it’s good fun!