Functional interface for concurrent futures, including asynchronous I/O.

Overview

image image image image image image image image image

Futured provides a consistent interface for concurrent functional programming in Python. It wraps any callable to return a concurrent.futures.Future, wraps any async coroutine to return an asyncio.Future, and provides concurrent iterators and context managers for futures.

Usage

threaded, processed

Transform any callable into one which runs in a thread or process pool, and returns a future.

from futured import threaded, processed
import httpx

fetch = threaded(httpx.Client().get)
fetch(url)  # return Future

fs = (fetch(url + path) for path in paths)
threaded.results(fs)  # generate results from futures
threaded.results(fs, timeout=...)  # generate results as completed

fetch.map(urls)  # generate results in order
fetch.map(urls, timeout=...)  # generate results as completed
fetch.mapzip(urls)  # generate (url, result) pairs as completed

Thread and process pool executors may be used as context managers, customized with options, and reused with different callables.

threaded(max_workers=...)(func, ...)
processed(max_workers=...)(func, ...)

futured classes have a waiting context manager which collects results from tasks. Futures can be registered at creation, or appended to the list of tasks.

with threaded.waiting(*fs) as tasks:
    tasks.append(future)
tasks  # list of completed results

futured classes provide a tasks interface which generalizes futures.as_completed and futures.wait, while allowing the set of tasks to be modified, e.g., for retries.

threaded.tasks(fs, timeout=...)  # mutable set of running tasks which iterate as completed

asynced

The same interface works for asyncio.

from futured import asynced
import httpx

fetch = asynced(httpx.AsyncClient().get)
fetch(url)  # return coroutine

asynced.results(fs)  # generate results from futures
asynced.results(fs, timeout=...)  # generate results as completed

fetch.map(urls)  # generate results in order
fetch.map(urls, timeout=...)  # generate results as completed
fetch.mapzip(urls)  # generate (url, result) pairs as completed

asynced provides utilities for calling coroutines from a synchronous context. waiting is similar to trio's nursery, but returns results from a synchronous with block.

asynced.run(async_func, ...)  # call and run until complete
asynced.run(async_gen, ...)  # call and run synchronous iterator
with asynced.waiting(*fs) as tasks:  # concurrent coroutines completed in a block
asynced.tasks(fs, timeout=...)  # mutable set of running tasks which iterate as completed

decorators

Naturally futured wrappers can be used as decorators, but arguments can also be partially bound.

@threaded
def slow():
   ...

fetch = threaded(httpx.Client().get, url)
fetch(params=...)

Methods are supported, as well as a decorated utility for automatically subclassing.

from futured import decorated

FutureClient = decorated(httpx.Client, request=threaded)

 # equivalent to
class FutureClient(httpx.Client):
    request = threaded(httpx.Client.request)

command

command wraps subprocess.Popen to provide a Future compatible interface.

from futured import futured, command

command('ls').result()  # return stdout or raises stderr
command('ls').pipe('wc')  # pipes into next command, or | ('wc',... )
for line in command('ls'):  # iterable lines
command.coroutine('ls')  # return coroutine

futured(command, 'ls')  # supports `map` interface
asynced(command.coroutine, 'ls')  # supports `map` interface with timeout

forked

forked allows iteration in separate child processes.

from futured import forked

for value in forked(values, max_workers=...):
    # in a child process
 # in parent after children have exited

Installation

% pip install futured

Tests

100% branch coverage.

% pytest [--cov]

Changes

1.3

  • Python >=3.7 required
  • Python 3.10 event loop changes
  • Streams replaced with tasks

1.2

  • Python >=3.6 required

1.1

  • Stream completed futures from a pending pool

1.0

  • Executed functions are context managers
  • starmap supported

0.3

  • forked has optional maximum number of workers
  • waiting context manager
  • command pipes (|)
  • distributed.Client support

0.2

  • command.coroutine creates asyncio subprocesses
  • futured.mapzip generates results zipped with arguments
  • asynced.run supports asynchronous iterators
firefox session recovery

firefox session recovery

Ahmad Sadraei 5 Nov 29, 2022
Adversarial Robustness with Non-uniform Perturbations

Adversarial Robustness with Non-uniform Perturbations This repository hosts the code to replicate experiments of the paper Adversarial Robustness with

5 May 20, 2022
Time python - Códigos para auxiliar e mostrar formas de como fazer um relógio e manipular o seu tempo

Time_python Códigos para auxiliar e mostrar formas de como fazer um relógio e manipular o seu tempo. Bibliotecas Nestes foram usadas bibliotecas nativ

Eduardo Henrique 1 Jan 03, 2022
JPMC Virtual Experience

This repository contains the submitted patch files along with raw files of the various tasks assigned by JPMorgan Chase & Co. through its Software Engineering Virtual Experience Program on Forage (fo

Vardhini K 1 Dec 05, 2021
Projeto de análise de dados com SQL

Project-Analizyng-International-Debt-Statistics- Projeto de análise de dados com SQL - Plataforma Data Camp Descrição do Projeto : Não é que nós human

Lorrayne Silva 1 Feb 01, 2022
Pre-1.0 door/chest sound injector for Minecraft

doorjector Pre-1.0 door/chest sound injector for Minecraft. While the game is running, doorjector hotswaps the new sounds for the old right before the

Sam 1 Nov 20, 2021
dotfiles - Cristian Valero Abundio

In this repository you can find various configurations to configure your Linux operating system, preferably ArchLinux and its derivatives.

Cristian Valero Abundio 1 Jan 09, 2022
A simple method to create strong password.

A simple method to create strong password.

1 Jan 23, 2022
100 Days of Python Programming

100 days of Python Following the initiative of my friend Helber Belmiro, who is almost done with his 100 days of Java, I have decided to start my 100

Henrique Pereira 19 Nov 08, 2021
Wordle Solver

Wordle Solver Installation Install the following onto your computer: Python 3.10.x Download Page Run pip install -r requirements.txt Instructions To r

John Bucknam 1 Feb 15, 2022
The official Repository wherein newbies into Open Source can Contribute during the Hacktoberfest 2021

Hacktoberfest 2021 Get Started With your first Contrinution/Pull Request : Fork/Copy the repo by clicking the right most button on top of the page. Go

HacOkars 25 Aug 20, 2022
Oblique Strategies for Python

Oblique Strategies for Python

Łukasz Langa 3 Feb 17, 2022
Predicting Global Crop Yield for World Hunger

Crop Yield And Global Famine - The fifth project I created during my time at General Assembly. I completed this project with three other classmates in the span of three weeks. Most of my work was dir

Adam Muhammad Klesc 2 Jun 19, 2022
A simple and efficient computing package for Genshin Impact gacha analysis

GGanalysisLite计算包 这个版本的计算包追求计算速度,而GGanalysis包有着更多计算功能。 GGanalysisLite包通过卷积计算分布列,通过FFT和快速幂加速卷积计算。 测试玩家得到的排名值rank的数学意义是:与抽了同样数量五星的其他玩家相比,测试玩家花费的抽数大于等于比例

一棵平衡树 34 Nov 26, 2022
Programming labs for 6.S060 (Foundations of Computer Security).

6.S060 Labs This git repository contains the code for the labs in 6.S060. In these labs, you will add a series of security features to a photo-sharing

MIT PDOS 10 Nov 02, 2022
A collection of resources on neural rendering.

awesome neural rendering A collection of resources on neural rendering. Contributing If you think I have missed out on something (or) have any suggest

1.8k Dec 30, 2022
RecurrentArchitectures - See the accompanying blog post

Why this? What is the goal? The goal of this repository is to write all the recurrent architectures from scratch in tensorflow for learning purposes.

Debajyoti Datta 9 Feb 06, 2022
Code repository for the Pytheas submersible observation platform

Pytheas Main repository for the Pytheas submersible probe system. List of Acronyms/Terms USP - Underwater Sensor Platform - The primary platform in th

UltraChip 2 Nov 19, 2022
Async Python Circuit Breaker implementation

aiocircuitbreaker This is an async Python implementation of the circuitbreaker library. Installation The project is available on PyPI. Simply run: $ p

5 Sep 05, 2022
Demodulate and error correct FIS-B and ADS-B signals on 978 MHz.

FIS-B 978 ('fisb-978') is a set of programs that demodulates and error corrects FIS-B (Flight Information System - Broadcast) and ADS-B (Automatic Dep

2 Nov 15, 2022