Redis client for Python asyncio (PEP 3156)

Overview

Redis client for Python asyncio.

Build Status

Redis client for the PEP 3156 Python event loop.

This Redis library is a completely asynchronous, non-blocking client for a Redis server. It depends on asyncio (PEP 3156) and requires Python 3.6 or greater. If you're new to asyncio, it can be helpful to check out the asyncio documentation first.

Maintainers needed!

Right now, this library is working fine, but not actively maintained, due to lack of time and shift of priorities on my side (Jonathan). Most of my time doing open source goes to prompt_toolkt community.

I still merge pull request when they are fine, especially for bug/security fixes. But for a while now, we don't have new features. If you are already using it, then there's not really a need to worry, asyncio-redis will keep working fine, and we fix bugs, but it's not really evolving.

If anyone is interested to seriously take over development, please let me know. Also keep in mind that there is a competing library called aioredis, which does have a lot of activity.

See issue https://github.com/jonathanslenders/asyncio-redis/issues/134 to discuss.

Features

  • Works for the asyncio (PEP3156) event loop
  • No dependencies except asyncio
  • Connection pooling
  • Automatic conversion from unicode (Python) to bytes (inside Redis.)
  • Bytes and str protocols.
  • Completely tested
  • Blocking calls and transactions supported
  • Streaming of some multi bulk replies
  • Pubsub support

Trollius support: There is a fork by Ben Jolitz that has the necessary changes for using this asyncio-redis library with Trollius.

Installation

pip install asyncio_redis

Documentation

View documentation at read-the-docs

The connection class

A asyncio_redis.Connection instance will take care of the connection and will automatically reconnect, using a new transport when the connection drops. This connection class also acts as a proxy to a asyncio_redis.RedisProtocol instance; any Redis command of the protocol can be called directly at the connection.

import asyncio
import asyncio_redis


async def example():
    # Create Redis connection
    connection = await asyncio_redis.Connection.create(host='localhost', port=6379)

    # Set a key
    await connection.set('my_key', 'my_value')

    # When finished, close the connection.
    connection.close()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(example())

Connection pooling

Requests will automatically be distributed among all connections in a pool. If a connection is blocking because of --for instance-- a blocking rpop, another connection will be used for new commands.

import asyncio
import asyncio_redis


async def example():
    # Create Redis connection
    connection = await asyncio_redis.Pool.create(host='localhost', port=6379, poolsize=10)

    # Set a key
    await connection.set('my_key', 'my_value')

    # When finished, close the connection pool.
    connection.close()

Transactions example

import asyncio
import asyncio_redis


async def example(loop):
    # Create Redis connection
    connection = await asyncio_redis.Pool.create(host='localhost', port=6379, poolsize=10)

    # Create transaction
    transaction = await connection.multi()

    # Run commands in transaction (they return future objects)
    f1 = await transaction.set('key', 'value')
    f2 = await transaction.set('another_key', 'another_value')

    # Commit transaction
    await transaction.exec()

    # Retrieve results
    result1 = await f1
    result2 = await f2

    # When finished, close the connection pool.
    connection.close()

It's recommended to use a large enough poolsize. A connection will be occupied as long as there's a transaction running in there.

Pubsub example

import asyncio
import asyncio_redis

async def example():
    # Create connection
    connection = await asyncio_redis.Connection.create(host='localhost', port=6379)

    # Create subscriber.
    subscriber = await connection.start_subscribe()

    # Subscribe to channel.
    await subscriber.subscribe([ 'our-channel' ])

    # Inside a while loop, wait for incoming events.
    while True:
        reply = await subscriber.next_published()
        print('Received: ', repr(reply.value), 'on channel', reply.channel)

    # When finished, close the connection.
    connection.close()

LUA Scripting example

import asyncio
import asyncio_redis

code = \
"""
local value = redis.call('GET', KEYS[1])
value = tonumber(value)
return value * ARGV[1]
"""


async def example():
    connection = await asyncio_redis.Connection.create(host='localhost', port=6379)

    # Set a key
    await connection.set('my_key', '2')

    # Register script
    multiply = await connection.register_script(code)

    # Run script
    script_reply = await multiply.run(keys=['my_key'], args=['5'])
    result = await script_reply.return_value()
    print(result) # prints 2 * 5

    # When finished, close the connection.
    connection.close()

Example using the Protocol class

import asyncio
import asyncio_redis


async def example():
    loop = asyncio.get_event_loop()

    # Create Redis connection
    transport, protocol = await loop.create_connection(
                asyncio_redis.RedisProtocol, 'localhost', 6379)

    # Set a key
    await protocol.set('my_key', 'my_value')

    # Get a key
    result = await protocol.get('my_key')
    print(result)

    # Close transport when finished.
    transport.close()

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(example())
Comments
  • Multibulk result read performance

    Multibulk result read performance

    Simple ZRANGE test for key which contains 10000 items.

    def sync():
        r = redis.Redis()
        result = r.zrange("key", 0, -1, withscores=True)
    
    print(timeit.timeit(sync, number=10))
    #1.33 sec
    
    @asyncio.coroutine
    def get_async():
        r = yield from asyncio_redis.Connection.create()
        f = yield from r.zrange("key", 0, -1)
        result = yield from f.asdict()
    
    def async():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(asyncio.async(get_async()))
        loop.close()
        asyncio.set_event_loop(None)
    
    print(timeit.timeit(async, number=10))
    #25 seconds! 
    

    It's because to receive one item-score pair we perform yield from future. Multibulk parser need to have less overhead since redis itself is very fast.

    opened by tumb1er 21
  • Using `start_subscribe`

    Using `start_subscribe`

    When using

    subscriber = yield from connection_pool.start_subscribe()
    

    From what I understand, the connection_pool is set to be in pub/sub mode and methods to get or set keys won't be allowed for that particular connection_pool since it can only do pub/sub stuff.

    What I don't understand however is if the subscriber is attached to only one connection or the whole bunch of connections in the connection_pool.

    Does anyone have a light?

    opened by tgy 18
  • TypeError: Got unexpected return type 'Future' in RedisProtocol.decr, expected <class 'int'>

    TypeError: Got unexpected return type 'Future' in RedisProtocol.decr, expected

    This has been happening with increasing frequency in production for us:

    TypeError: Got unexpected return type 'Future' in RedisProtocol.decr, expected <class 'int'>
      File "courier/handlers/socket.py", line 271, in handle_close
        connection_count = await self._decrement_open_connections()
      File "courier/handlers/socket.py", line 169, in _decrement_open_connections
        decr_result = await self.redis.decr(self._open_connections_key)
      File "asyncio_redis/protocol.py", line 659, in wrapper
        typecheck_return(protocol_self, result)
      File "asyncio_redis/protocol.py", line 517, in typecheck_return
        (type(result).__name__, self.method.__name__, expected_type))
    

    It doesn't happen every time, but it definitely happens enough to be a problem. It seems to be a bug in this library. @jonathanslenders any idea why the protocol is sometimes returning a Future and sometimes an int?

    Our stack is:

    • Python 3.5.2
    • tornado 4.4.1 (using the asyncio event loop)
    • asyncio_redis 0.14.2
    • Redis 3.2.3
    bug 
    opened by adamrothman 13
  • Cyclic reference issue

    Cyclic reference issue

    I've found that after running full gc collection (with 2nd generation) Connection objects not always were freeing. From Python 3 doc:

    Objects that have del() methods and are part of a reference cycle cause the entire reference cycle to be uncollectable, including objects not necessarily in the cycle but reachable only from it

    So I tried to break cyclic references using weak-ref.I might be wrong but del method should be removed fromConnection class.
    Any another thought about it?

    opened by msoedov 8
  • CancelledError in _get_answer stops all the magic

    CancelledError in _get_answer stops all the magic

    I use aiohttp + asyncio_redis. When client aborts http request, aiohttp raises CancelledError, and mostly it affects RedisProtocol._get_answer. Next http request just blocks forever. In RedisProtocol instance in self._queue cancelled Futures are accumulated, in my case one Future<CANCELLED> per request. Haven't written test for it yet, but is it always correct to pop one cancelled future from _queue on CancelledError?

    def _get_answer(...):
        try:
             result = yield from answer_f
        except CancelledError:
             self._pop_cancelled_future()
             raise
        ...
    
    opened by tumb1er 8
  • Use 'scan' function, lost some key

    Use 'scan' function, lost some key

    import asyncio
    import asyncio_redis
    from asyncio_redis.encoders import BytesEncoder
    from policy import _redis_config
    
    @asyncio.coroutine
    def callback(cors,envenloop):
        result = yield from asyncio.wait_for(cors,timeout=None,loop=envenloop)
        yield from asyncio.sleep(1)
    
    @asyncio.coroutine
    def enumkey(redis_conn,m):
        redis_cursor = yield from redis_conn.scan(match=m)
        keys = yield from redis_cursor.fetchall()
        i = 0
        for key in keys:
            print(i,":",key)
            i+=1
        return i
    @asyncio.coroutine
    def task(eventloop):
        redis_conn = None
        try:
            redis_conn = yield from asyncio_redis.Connection.create(auto_reconnect=False,encoder=BytesEncoder(),loop=eventloop,**_redis_config)
            print('-----------------------------------')
            i = yield from enumkey(redis_conn,b'p*')
            print('p* total %d'%i)
            print('-----------------------------------')
            i = yield from enumkey(redis_conn,b'pu:*')
            print('pu:* total %d'%i)
            print('-----------------------------------')
            i = yield from enumkey(redis_conn,b'pn:*')
            print('pn:* total %d'%i)
            print('-----------------------------------')
            i = yield from enumkey(redis_conn,b'k*')
            print('k* total %d'%i)
            print('-----------------------------------')
            i = yield from enumkey(redis_conn,b'sync*')
            print('sync* total %d'%i)
            print('-----------------------------------')
            i = yield from enumkey(redis_conn,b'*')
            print('* total %d'%i)
    
        except Exception as e:
            print(e)
        finally:
            if redis_conn:
                redis_conn.close()
    if __name__ == '__main__':
        eventloop = asyncio.get_event_loop()
        cors = task(eventloop)
        eventloop.run_until_complete(callback(cors,eventloop))
    

    There are output:

    p* total 70 pu:* total 0 pn:* total 64 k* total 1 k2* total 0 k3* total 0 sync* total 0

    • total 74

    But, when I use redis lib,there are output: p* total 70 pu:* total 3 pn:* total 64 k* total 3 k2* total 1 k3* total 1 sync* total 1

    • total 74
    bug 
    opened by AttentionZ 7
  • Add Python 3.8; drop Python <3.6

    Add Python 3.8; drop Python <3.6

    • Closes #133
    • Added support for Python 3.8
    • Dropped support for Python <3.6
    • Deprecated 'loop' parameter in Connection.create and Pool.create
    • Fixed all DeprecationWarnings
    • Fix missing object asyncio.streams.IncompleteReadError
    • Fixed all Sphinx build errors and warnings
    • Intersphinx: all classes that can be imported from the top-level module must now be referenced from the top-level module in the docstrings too. E.g. :class:`asyncio_redis.exceptions.Error` has changed to :class:`asyncio_redis.Error`.
    opened by crusaderky 6
  • Added missing @asyncio.coroutine

    Added missing @asyncio.coroutine

    Python 3.5 introduces the couple of keywords async/await.

    They are compatible with the "yield from" syntax and generator coroutines as long as they are decorated with @asyncio.coroutine. Some of them where missing, so code like:

    async def coro(redis_client):
        await redis_client.watch(key)
    

    failed with the error:

    TypeError: object generator can't be used in 'await' expression
    

    It may be a good idea to write a special test file for python3.5 to be sure everything is running well when using async/await.

    opened by Martiusweb 6
  • Add support for COUNT option in SCAN and its families

    Add support for COUNT option in SCAN and its families

    Redis' default is to fetch at most 10 items per each SCAN commands, but asyncio_redis currently does not provide any means to modify this number, which may be required for performance tuning. Redis' documentation says that it is okay to change the count number during cursor iteration, so it would be good to add an optional argument count to fetchone() and fetchall() methods in the Cursor class and pass it to the _scanfunc() via _fetch_more().

    enhancement 
    opened by achimnol 6
  • Issue 43 hiredis parser and stream read performance

    Issue 43 hiredis parser and stream read performance

    Added HiRedisProtocol which uses hiredis c-extension for response parsing. It passes all RedisProtocolTesttests and RedisBytesProtocolTest. Also it's improves StreamReader effectiveness by usingread(n) instead of readexactly(1) and readline(), which were adding a lot of unused asynchronous overhead.

    New parser and stream reading improvements reduces 10K items zset read time from 5.8 to 0.9 seconds with custom asdict() implementation (see in #43 comments) and from 25 to 19.8 seconds with native ZRangeReply.asdict()

    opened by tumb1er 6
  • strange bug with start_subscribe

    strange bug with start_subscribe

    Hello! Just got some strange bug with PUB/SUB code: In coroutine I have:

    pubsub = yield from redis.start_subscribe()
    yield from pubsub.subscribe([channel])
    

    In my case all stops after yielding from redis.start_subscribe() - reactor just don't return pubsub object. But! It only happens on second script launch (after __pycache__ dir filled with compiled code). And if I remove this directory, all works fine again until next launch.

    As a workaround, I replaced first yield with:

    try:
        redis.start_subscribe().__next__()
    except StopIteration as e:
        pubsub = e.value
    

    and all works without magic.

    So, my proposal is to remove unnecessary @asyncio.coroutine decorator from start_subscribe and propose to use it like a synchronous method.

    Sergey.

    bug 
    opened by tumb1er 6
  • Async Class Constructors

    Async Class Constructors

    Ive noticed that this library uses classmethods to mimic an async init magic method, but by using the new magic method in a clever way we can do it directly in the constructor.

    Also, this makes it so you can't have a un-initialized class by initializing the class without using the async classmethod.

    Here is an example:

    class AsyncNeededToInit:
        def __new__(cls, foo):
            async def init():
                self = super(cls, cls).__new__(cls)
                self.foo = foo
                self.bar = await async_func()
    
                return self
    
            return init()
    

    Which can be used by:

    baz = await AsyncNeededToInit(myfoo)
    

    Could this be added? I could make a PR too.

    opened by RGBCube 4
  • Proposing a PR to fix a few small typos

    Proposing a PR to fix a few small typos

    Issue Type

    [x] Bug (Typo)

    Steps to Replicate and Expected Behaviour

    • Examine asyncio_redis/protocol.py and observe recieve, however expect to see receive.
    • Examine docs/pages/examples.rst and observe commited, however expect to see committed.

    Notes

    Semi-automated issue generated by https://github.com/timgates42/meticulous/blob/master/docs/NOTE.md

    To avoid wasting CI processing resources a branch with the fix has been prepared but a pull request has not yet been created. A pull request fixing the issue can be prepared from the link below, feel free to create it or request @timgates42 create the PR. Alternatively if the fix is undesired please close the issue with a small comment about the reasoning.

    https://github.com/timgates42/asyncio-redis/pull/new/bugfix_typos

    Thanks.

    opened by timgates42 0
  • Adding python types

    Adding python types

    Thanks for the project, it's been super useful!

    I was wondering if you'd be open to adding types to the repo to support https://www.python.org/dev/peps/pep-0561/#packaging-type-information

    opened by sbdchd 1
  • asyncio_redis should wait for server restart

    asyncio_redis should wait for server restart

    Use case

    An asyncio_redis connection with auto_reconnect=True is already connected to the Redis server. The Redis server unexpectedly and randomly crashes and is resurrected shortly thereafter at the same host and port by a nanny, e.g. Kubernetes.

    Current asyncio_redis behaviour

    Application-level requests to the server fail until the connection is reestablished.

    Expected behaviour

    Application-level requests are silently enqueued and wait until the connection is reestablished. If a connection timeout were to be implemented in create(), the reconnection attempt should use the same timeout.

    POC

    Tested on Ubuntu Linux x64, asyncio_redis 0.16.0, redis 5.0.3

    import asyncio
    import subprocess
    
    import asyncio_redis
    
    
    async def main():
        server = subprocess.Popen(["redis-server"])
        try:
            client = await asyncio_redis.Pool.create()
            await client.ping()
            server.kill()
            server = subprocess.Popen(["redis-server"])
            await asyncio.sleep(1)
            await client.ping()
        finally:
            server.kill()
    
    
    if __name__ == "__main__":
        asyncio.run(main())
    

    The above runs with no error. After commenting out the sleep(1): asyncio_redis.exceptions.ConnectionLostError: None

    opened by crusaderky 0
  • intersphinx looks for implementation modules

    intersphinx looks for implementation modules

    This is a widespread issue caused by the pattern of defining objects in private module and then exposing them to the final user by importing them in the top-level __init__.py, vs. how intersphinx works.

    Exact same issue in different projects:

    • https://github.com/aio-libs/aiohttp/issues/3714
    • https://jira.mongodb.org/browse/MOTOR-338
    • https://github.com/tkem/cachetools/issues/178
    • https://github.com/pydata/xarray/issues/4279
    • https://github.com/AmphoraInc/xarray_mongodb/pull/22

    If a project

    1. uses asyncio_redis, intersphinx, and autodoc
    2. subclasses any of the classes exposed by asyncio_redis/__init__.py and documents the new class with the :show-inheritance: flag
    3. Starting from Sphinx 3, has any of the above classes anywhere in a type annotation

    Then Sphinx emits a warning and fails to create a hyperlink, because intersphinx uses the __module__ attribute to look up the object in objects.inv, but __module__ points to the implementation module while objects.inv points to the top-level module.

    Workaround

    In conf.py:

    import asyncio_redis
    asyncio_redis.Pool.__module__ = "asyncio_redis"
    

    Solution

    Put the above hack in asyncio_redis/__init__.py

    opened by crusaderky 0
Owner
Jonathan Slenders
Author of prompt_toolkit.
Jonathan Slenders
Python cluster client for the official redis cluster. Redis 3.0+.

redis-py-cluster This client provides a client for redis cluster that was added in redis 3.0. This project is a port of redis-rb-cluster by antirez, w

Grokzen 1.1k Jan 5, 2023
An asyncio compatible Redis driver, written purely in Python. This is really just a pet-project for me.

asyncredis An asyncio compatible Redis driver. Just a pet-project. Information asyncredis is, like I've said above, just a pet-project for me. I reall

Vish M 1 Dec 25, 2021
Redis Python Client

redis-py The Python interface to the Redis key-value store. Python 2 Compatibility Note redis-py 3.5.x will be the last version of redis-py that suppo

Andy McCurdy 11k Dec 29, 2022
A fast PostgreSQL Database Client Library for Python/asyncio.

asyncpg -- A fast PostgreSQL Database Client Library for Python/asyncio asyncpg is a database interface library designed specifically for PostgreSQL a

magicstack 5.8k Dec 31, 2022
CouchDB client built on top of aiohttp (asyncio)

aiocouchdb source: https://github.com/aio-libs/aiocouchdb documentation: http://aiocouchdb.readthedocs.org/en/latest/ license: BSD CouchDB client buil

aio-libs 53 Apr 5, 2022
A tiny python web application based on Flask to set, get, expire, delete keys of Redis database easily with direct link at the browser.

First Redis Python (CRUD) A tiny python web application based on Flask to set, get, expire, delete keys of Redis database easily with direct link at t

Max Base 9 Dec 24, 2022
Motor - the async Python driver for MongoDB and Tornado or asyncio

Motor Info: Motor is a full-featured, non-blocking MongoDB driver for Python Tornado and asyncio applications. Documentation: Available at motor.readt

mongodb 2.1k Dec 26, 2022
Motor - the async Python driver for MongoDB and Tornado or asyncio

Motor Info: Motor is a full-featured, non-blocking MongoDB driver for Python Tornado and asyncio applications. Documentation: Available at motor.readt

mongodb 1.6k Feb 6, 2021
GINO Is Not ORM - a Python asyncio ORM on SQLAlchemy core.

GINO - GINO Is Not ORM - is a lightweight asynchronous ORM built on top of SQLAlchemy core for Python asyncio. GINO 1.0 supports only PostgreSQL with

GINO Community 2.5k Dec 27, 2022
Familiar asyncio ORM for python, built with relations in mind

Tortoise ORM Introduction Tortoise ORM is an easy-to-use asyncio ORM (Object Relational Mapper) inspired by Django. Tortoise ORM was build with relati

Tortoise 3.3k Dec 31, 2022
aiopg is a library for accessing a PostgreSQL database from the asyncio

aiopg aiopg is a library for accessing a PostgreSQL database from the asyncio (PEP-3156/tulip) framework. It wraps asynchronous features of the Psycop

aio-libs 1.3k Jan 3, 2023
aiomysql is a library for accessing a MySQL database from the asyncio

aiomysql aiomysql is a "driver" for accessing a MySQL database from the asyncio (PEP-3156/tulip) framework. It depends on and reuses most parts of PyM

aio-libs 1.5k Jan 3, 2023
aioodbc - is a library for accessing a ODBC databases from the asyncio

aioodbc aioodbc is a Python 3.5+ module that makes it possible to access ODBC databases with asyncio. It relies on the awesome pyodbc library and pres

aio-libs 253 Dec 31, 2022
asyncio compatible driver for elasticsearch

asyncio client library for elasticsearch aioes is a asyncio compatible library for working with Elasticsearch The project is abandoned aioes is not su

null 97 Sep 5, 2022
Asynchronous interface for peewee ORM powered by asyncio

peewee-async Asynchronous interface for peewee ORM powered by asyncio. Important notes Since version 0.6.0a only peewee 3.5+ is supported If you still

05Bit 666 Dec 30, 2022
Pure Python MySQL Client

PyMySQL Table of Contents Requirements Installation Documentation Example Resources License This package contains a pure-Python MySQL client library,

PyMySQL 7.2k Jan 9, 2023
Python client for Apache Kafka

Kafka Python client Python client for the Apache Kafka distributed stream processing system. kafka-python is designed to function much like the offici

Dana Powers 5.1k Jan 8, 2023
Asynchronous Python client for InfluxDB

aioinflux Asynchronous Python client for InfluxDB. Built on top of aiohttp and asyncio. Aioinflux is an alternative to the official InfluxDB Python cl

Gustavo Bezerra 159 Dec 27, 2022
Google Cloud Client Library for Python

Google Cloud Python Client Python idiomatic clients for Google Cloud Platform services. Stability levels The development status classifier on PyPI ind

Google APIs 4.1k Jan 1, 2023