Unsynchronize asyncio by using an ambient event loop, or executing in separate threads or processes.

Overview

unsync

Unsynchronize asyncio by using an ambient event loop, or executing in separate threads or processes.

Quick Overview

Functions marked with the @unsync decorator will behave in one of the following ways:

  • async functions will run in the unsync.loop event loop executed from unsync.thread
  • Regular functions will execute in unsync.thread_executor, a ThreadPoolExecutor
    • Useful for IO bounded work that does not support asyncio
  • Regular functions marked with @unsync(cpu_bound=True) will execute in unsync.process_executor, a ProcessPoolExecutor
    • Useful for CPU bounded work

All @unsync functions will return an Unfuture object. This new future type combines the behavior of asyncio.Future and concurrent.Future with the following changes:

  • Unfuture.set_result is threadsafe unlike asyncio.Future
  • Unfuture instances can be awaited, even if made from concurrent.Future
  • Unfuture.result() is a blocking operation except in unsync.loop/unsync.thread where it behaves like asyncio.Future.result and will throw an exception if the future is not done

Examples

Simple Sleep

A simple sleeping example with asyncio:

async def sync_async():
    await asyncio.sleep(1)
    return 'I hate event loops'


async def main():
    future1 = asyncio.create_task(sync_async())
    future2 = asyncio.create_task(sync_async())

    await future1, future2

    print(future1.result() + future2.result())

asyncio.run(main())
# Takes 1 second to run

Same example with unsync:

@unsync
async def unsync_async():
    await asyncio.sleep(1)
    return 'I like decorators'

unfuture1 = unsync_async()
unfuture2 = unsync_async()
print(unfuture1.result() + unfuture2.result())
# Takes 1 second to run

Multi-threading an IO-bound function

Synchronous functions can be made to run asynchronously by executing them in a concurrent.ThreadPoolExecutor. This can be easily accomplished by marking the regular function @unsync.

@unsync
def non_async_function(seconds):
    time.sleep(seconds)
    return 'Run concurrently!'

start = time.time()
tasks = [non_async_function(0.1) for _ in range(10)]
print([task.result() for task in tasks])
print('Executed in {} seconds'.format(time.time() - start))

Which prints:

['Run concurrently!', 'Run concurrently!', ...]
Executed in 0.10807514190673828 seconds

Continuations

Using Unfuture.then chains asynchronous calls and returns an Unfuture that wraps both the source, and continuation. The continuation is invoked with the source Unfuture as the first argument. Continuations can be regular functions (which will execute synchronously), or @unsync functions.

@unsync
async def initiate(request):
    await asyncio.sleep(0.1)
    return request + 1

@unsync
async def process(task):
    await asyncio.sleep(0.1)
    return task.result() * 2

start = time.time()
print(initiate(3).then(process).result())
print('Executed in {} seconds'.format(time.time() - start))

Which prints:

8
Executed in 0.20314741134643555 seconds

Mixing methods

We'll start by converting a regular synchronous function into a threaded Unfuture which will begin our request.

@unsync
def non_async_function(num):
    time.sleep(0.1)
    return num, num + 1

We may want to refine the result in another function, so we define the following continuation.

@unsync
async def result_continuation(task):
    await asyncio.sleep(0.1)
    num, res = task.result()
    return num, res * 2

We then aggregate all the results into a single dictionary in an async function.

@unsync
async def result_processor(tasks):
    output = {}
    for task in tasks:
        num, res = await task
        output[num] = res
    return output

Executing the full chain of non_async_functionresult_continuationresult_processor would look like:

start = time.time()
print(result_processor([non_async_function(i).then(result_continuation) for i in range(10)]).result())
print('Executed in {} seconds'.format(time.time() - start))

Which prints:

{0: 2, 1: 4, 2: 6, 3: 8, 4: 10, 5: 12, 6: 14, 7: 16, 8: 18, 9: 20}
Executed in 0.22115683555603027 seconds

Preserving typing

As far as we know it is not possible to change the return type of a method or function using a decorator. Therefore, we need a workaround to properly use IntelliSense. You have three options in general:

  1. Ignore type warnings.

  2. Use a suppression statement where you reach the type warning.

    A. When defining the unsynced method by changing the return type to an Unfuture.

    B. When using the unsynced method.

  3. Wrap the function without a decorator. Example:

    def function_name(x: str) -> Unfuture[str]:
        async_method = unsync(__function_name_synced)
        return async_method(x)
    
    def __function_name_synced(x: str) -> str:
        return x + 'a'
    
    future_result = function_name('b')
    self.assertEqual('ba', future_result.result())

Custom Event Loops

In order to use custom event loops, be sure to set the event loop policy before calling any @unsync methods. For example, to use uvloop simply:

import unsync
import uvloop

@unsync
async def main():
    # Main entry-point.
    ...

uvloop.install() # Equivalent to asyncio.set_event_loop_policy(EventLoopPolicy())
main()
Owner
Alex Sherman
Alex Sherman
SCOOP (Scalable COncurrent Operations in Python)

SCOOP (Scalable COncurrent Operations in Python) is a distributed task module allowing concurrent parallel programming on various environments, from h

Yannick Hold 573 Dec 27, 2022
rosny is a lightweight library for building concurrent systems.

rosny is a lightweight library for building concurrent systems. Installation Tested on: Linux Python = 3.6 From pip: pip install rosny From source: p

Ruslan Baikulov 6 Oct 05, 2021
Unsynchronize asyncio by using an ambient event loop, or executing in separate threads or processes.

unsync Unsynchronize asyncio by using an ambient event loop, or executing in separate threads or processes. Quick Overview Functions marked with the @

Alex Sherman 802 Dec 28, 2022
A concurrent sync tool which works with multiple sources and targets.

Concurrent Sync A concurrent sync tool which works similar to rsync. It supports syncing given sources with multiple targets concurrently. Requirement

Halit Şimşek 2 Jan 11, 2022
Python Multithreading without GIL

Multithreaded Python without the GIL

Sam Gross 2.3k Jan 05, 2023
Ultra fast asyncio event loop.

uvloop is a fast, drop-in replacement of the built-in asyncio event loop. uvloop is implemented in Cython and uses libuv under the hood. The project d

magicstack 9.1k Jan 07, 2023
Trio – a friendly Python library for async concurrency and I/O

Trio – a friendly Python library for async concurrency and I/O The Trio project aims to produce a production-quality, permissively licensed, async/awa

5k Jan 07, 2023
🌀 Pykka makes it easier to build concurrent applications.

🌀 Pykka Pykka makes it easier to build concurrent applications. Pykka is a Python implementation of the actor model. The actor model introduces some

Stein Magnus Jodal 1.1k Dec 30, 2022
A curated list of awesome Python asyncio frameworks, libraries, software and resources

Awesome asyncio A carefully curated list of awesome Python asyncio frameworks, libraries, software and resources. The Python asyncio module introduced

Timo Furrer 3.8k Jan 08, 2023
AnyIO is an asynchronous networking and concurrency library that works on top of either asyncio or trio.

AnyIO is an asynchronous networking and concurrency library that works on top of either asyncio or trio. It implements trio-like structured concurrenc

Alex Grönholm 1.1k Jan 06, 2023
aiomisc - miscellaneous utils for asyncio

aiomisc - miscellaneous utils for asyncio Miscellaneous utils for asyncio. The complete documentation is available in the following languages: English

aiokitchen 295 Jan 08, 2023
Parallelformers: An Efficient Model Parallelization Toolkit for Deployment

Parallelformers: An Efficient Model Parallelization Toolkit for Deployment

TUNiB 559 Dec 28, 2022
Backport of the concurrent.futures package to Python 2.6 and 2.7

This is a backport of the concurrent.futures standard library module to Python 2. It does not work on Python 3 due to Python 2 syntax being used in th

Alex Grönholm 224 Nov 07, 2022
A Python package for easy multiprocessing, but faster than multiprocessing

MPIRE, short for MultiProcessing Is Really Easy, is a Python package for multiprocessing, but faster and more user-friendly than the default multiprocessing package.

753 Dec 29, 2022
A lightweight (serverless) native python parallel processing framework based on simple decorators and call graphs.

A lightweight (serverless) native python parallel processing framework based on simple decorators and call graphs, supporting both control flow and dataflow execution paradigms as well as de-centrali

102 Jan 06, 2023
Simple package to enhance Python's concurrent.futures for memory efficiency

future-map is a Python library to use together with the official concurrent.futures module.

Arai Hiroki 2 Nov 15, 2022
Thread-safe asyncio-aware queue for Python

Mixed sync-async queue, supposed to be used for communicating between classic synchronous (threaded) code and asynchronous

aio-libs 665 Jan 03, 2023
Raise asynchronous exceptions in other thread, control the timeout of blocks or callables with a context manager or a decorator

stopit Raise asynchronous exceptions in other threads, control the timeout of blocks or callables with two context managers and two decorators. Attent

Gilles Lenfant 97 Oct 12, 2022
Jug: A Task-Based Parallelization Framework

Jug: A Task-Based Parallelization Framework Jug allows you to write code that is broken up into tasks and run different tasks on different processors.

Luis Pedro Coelho 387 Dec 21, 2022