I think there are some places where zarr would benefit immensely from async capabilities when reading and writing data. I will try to illustrate this with the simplest example I can.
Let's consider a zarr array stored in a public S3 bucket, which we can read with fsspec's HTTPFileSystem interface (no special S3 API needed, just regular HTTP calls).
import fsspec
import zarr

url_base = 'https://mur-sst.s3.us-west-2.amazonaws.com/zarr/time'
mapper = fsspec.get_mapper(url_base)
za = zarr.open(mapper)
za.info
Note that this is a highly sub-optimal choice of chunks: the 1D array of shape (6443,) is stored in chunks of only (5,) items, resulting in over 1000 tiny chunks. Reading this data takes forever, over 5 minutes:
%prun tdata = za[:]
20312192 function calls (20310903 primitive calls) in 342.624 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
1289 139.941 0.109 140.077 0.109 {built-in method _openssl.SSL_do_handshake}
2578 99.914 0.039 99.914 0.039 {built-in method _openssl.SSL_read}
1289 68.375 0.053 68.375 0.053 {method 'connect' of '_socket.socket' objects}
1289 9.252 0.007 9.252 0.007 {built-in method _openssl.SSL_CTX_load_verify_locations}
1289 7.857 0.006 7.868 0.006 {built-in method _socket.getaddrinfo}
1289 1.619 0.001 1.828 0.001 connectionpool.py:455(close)
930658 0.980 0.000 2.103 0.000 os.py:674(__getitem__)
...
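The 1289 calls to SSL_do_handshake line up exactly with the number of chunk objects backing the array, i.e. one fresh TLS connection per chunk. As a sanity check:

import math

# Number of chunk objects for the (6443,) array with (5,) chunks
n_chunks = math.ceil(za.shape[0] / za.chunks[0])
print(n_chunks)  # 1289 -- matches the SSL_do_handshake count above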
I believe fsspec is introducing some major overhead by not reusing a connection pool. But regardless, zarr is iterating synchronously over each chunk to load the data:
https://github.com/zarr-developers/zarr-python/blob/994f2449b84be544c9dfac3e23a15be3f5478b71/zarr/core.py#L1023-L1028
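Conceptually, that loop amounts to something like the following (my own simplified sketch, not the actual zarr source): each chunk is fetched with a separate, blocking store access.

# Simplified illustration of a synchronous chunk-by-chunk read against
# the fsspec mapper defined above (not zarr's real code).
def read_all_chunks_sync(store, n_chunks):
    # Each store[key] is a blocking HTTP GET, so the total time is
    # roughly the sum of the individual request latencies.
    return [store[str(n)] for n in range(n_chunks)]

# e.g. raw = read_all_chunks_sync(mapper, za.shape[0] // za.chunks[0])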
To get a lower bound on how fast this synchronous approach could be, we can bypass zarr and fsspec and just fetch all the chunks directly with requests:
import requests

s = requests.Session()

def get_chunk_http(n):
    r = s.get(url_base + f'/{n}')
    r.raise_for_status()
    return r.content
%prun all_data = [get_chunk_http(n) for n in range(za.shape[0] // za.chunks[0])]
12550435 function calls (12549147 primitive calls) in 98.508 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
2576 87.798 0.034 87.798 0.034 {built-in method _openssl.SSL_read}
13 1.436 0.110 1.437 0.111 {built-in method _openssl.SSL_do_handshake}
929936 1.042 0.000 2.224 0.000 os.py:674(__getitem__)
As expected, reusing a connection pool sped things up (no repeated TLS handshakes), but it still takes about 100 s to read the array, because the requests are still issued one at a time.
Finally, we can try the same thing asynchronously with asyncio and aiohttp:
import asyncio
import aiohttp
import time

async def get_chunk_http_async(n, session):
    url = url_base + f'/{n}'
    async with session.get(url) as r:
        r.raise_for_status()
        data = await r.read()
    return data

async with aiohttp.ClientSession() as session:
    tic = time.time()
    all_data = await asyncio.gather(*[get_chunk_http_async(n, session)
                                      for n in range(za.shape[0] // za.chunks[0])])
    print(f"{time.time() - tic} seconds")
# > 1.7969944477081299 seconds
This is a MAJOR speedup!
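One caveat on the asyncio snippet: the bare await at the top level only works in environments like IPython / Jupyter that support top-level await. In a plain script you would wrap it in a coroutine (reusing get_chunk_http_async and the imports above) and hand it to asyncio.run, for example:

async def fetch_all_chunks(n_chunks):
    # One session for all requests; issue every chunk GET concurrently.
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(
            *[get_chunk_http_async(n, session) for n in range(n_chunks)]
        )

# In a plain script (outside IPython/Jupyter):
# all_data = asyncio.run(fetch_all_chunks(za.shape[0] // za.chunks[0]))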
I am aware that using dask could possibly help here. But this is not big data, and I don't want to use dask for it; I want zarr to support asyncio natively.
I am quite new to async programming and have no idea how hard or complicated this would be to implement. But based on this experiment, I am quite sure there are major performance benefits to be had, particularly when using zarr with remote storage protocols.
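To make the suggestion slightly more concrete, here is a very rough sketch of the kind of thing I have in mind: a store that fetches a whole batch of chunk keys concurrently while still presenting a synchronous entry point to zarr. The names here (AsyncHTTPStore, getitems) are hypothetical; nothing like this exists in zarr today, and people with real async experience can surely come up with a better design.

import asyncio
import aiohttp

class AsyncHTTPStore:
    """Hypothetical sketch of a store that fetches chunks concurrently.

    The name and interface are made up for illustration; zarr does not
    currently define an async or batched store API.
    """

    def __init__(self, url_base):
        self.url_base = url_base

    async def _get_one(self, session, key):
        async with session.get(f"{self.url_base}/{key}") as r:
            r.raise_for_status()
            return await r.read()

    async def _get_many(self, keys):
        # Fetch all keys concurrently over a single session.
        async with aiohttp.ClientSession() as session:
            values = await asyncio.gather(
                *[self._get_one(session, k) for k in keys]
            )
        return dict(zip(keys, values))

    def getitems(self, keys):
        # Synchronous entry point: zarr's selection code could call this
        # once per read instead of doing one blocking GET per chunk.
        keys = list(keys)
        return asyncio.run(self._get_many(keys))

Usage would be something like AsyncHTTPStore(url_base).getitems([str(n) for n in range(1289)]), with zarr decoding each returned chunk as it does today.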
Thoughts?
cc @cgentemann