Futured provides a consistent interface for concurrent functional programming in Python. It wraps any callable to return a concurrent.futures.Future
, wraps any async coroutine to return an asyncio.Future
, and provides concurrent iterators and context managers for futures.
Usage
threaded, processed
Transform any callable into one which runs in a thread or process pool, and returns a future.
from futured import threaded, processed
import httpx
fetch = threaded(httpx.Client().get)
fetch(url) # return Future
fs = (fetch(url + path) for path in paths)
threaded.results(fs) # generate results from futures
threaded.results(fs, timeout=...) # generate results as completed
fetch.map(urls) # generate results in order
fetch.map(urls, timeout=...) # generate results as completed
fetch.mapzip(urls) # generate (url, result) pairs as completed
Thread and process pool executors may be used as context managers, customized with options, and reused with different callables.
threaded(max_workers=...)(func, ...)
processed(max_workers=...)(func, ...)
futured
classes have a waiting
context manager which collects results from tasks. Futures can be registered at creation, or appended to the list of tasks.
with threaded.waiting(*fs) as tasks:
tasks.append(future)
tasks # list of completed results
futured
classes provide a tasks
interface which generalizes futures.as_completed
and futures.wait
, while allowing the set of tasks to be modified, e.g., for retries.
threaded.tasks(fs, timeout=...) # mutable set of running tasks which iterate as completed
asynced
The same interface works for asyncio
.
from futured import asynced
import httpx
fetch = asynced(httpx.AsyncClient().get)
fetch(url) # return coroutine
asynced.results(fs) # generate results from futures
asynced.results(fs, timeout=...) # generate results as completed
fetch.map(urls) # generate results in order
fetch.map(urls, timeout=...) # generate results as completed
fetch.mapzip(urls) # generate (url, result) pairs as completed
asynced
provides utilities for calling coroutines from a synchronous context. waiting
is similar to trio's nursery, but returns results from a synchronous with
block.
asynced.run(async_func, ...) # call and run until complete
asynced.run(async_gen, ...) # call and run synchronous iterator
with asynced.waiting(*fs) as tasks: # concurrent coroutines completed in a block
asynced.tasks(fs, timeout=...) # mutable set of running tasks which iterate as completed
decorators
Naturally futured
wrappers can be used as decorators, but arguments can also be partially bound.
@threaded
def slow():
...
fetch = threaded(httpx.Client().get, url)
fetch(params=...)
Methods are supported, as well as a decorated
utility for automatically subclassing.
from futured import decorated
FutureClient = decorated(httpx.Client, request=threaded)
# equivalent to
class FutureClient(httpx.Client):
request = threaded(httpx.Client.request)
command
command
wraps subprocess.Popen
to provide a Future
compatible interface.
from futured import futured, command
command('ls').result() # return stdout or raises stderr
command('ls').pipe('wc') # pipes into next command, or | ('wc',... )
for line in command('ls'): # iterable lines
command.coroutine('ls') # return coroutine
futured(command, 'ls') # supports `map` interface
asynced(command.coroutine, 'ls') # supports `map` interface with timeout
forked
forked
allows iteration in separate child processes.
from futured import forked
for value in forked(values, max_workers=...):
# in a child process
# in parent after children have exited
Installation
% pip install futured
Tests
100% branch coverage.
% pytest [--cov]
Changes
1.3
- Python >=3.7 required
- Python 3.10 event loop changes
- Streams replaced with tasks
1.2
- Python >=3.6 required
1.1
- Stream completed futures from a pending pool
1.0
- Executed functions are context managers
starmap
supported
0.3
forked
has optional maximum number of workerswaiting
context managercommand
pipes (|
)distributed.Client
support
0.2
command.coroutine
creates asyncio subprocessesfutured.mapzip
generates results zipped with argumentsasynced.run
supports asynchronous iterators