Skip to content

Aioredis

WARNING

📢🚨 Aioredis는 이제 redis-py 4.2.0rc1+에 있습니다 🚨🚨

asyncio (PEP 3156) Redis support.

Simple Example

import asyncio
import aioredis

async def main():
    pool = await aioredis.create_pool("redis://localhost", minsize=5, maxsize=10)
    with await pool as conn:  # low-level redis connection
        await conn.execute("set", "my-key", "value")
        val = await conn.execute("get", "my-key")
    print("raw value:", val)
    pool.close()
    await pool.wait_closed()  # closing all open connections

if __name__ == "__main__":
    asyncio.run(main())

Version 2.x simple example

import asyncio
import aioredis


async def main():
    redis = aioredis.from_url("redis://localhost")
    await redis.set("my-key", "value")
    value = await redis.get("my-key")
    print(value)


if __name__ == "__main__":
    asyncio.run(main())

Cache Store

Version 1.x

1.x 버전 까지 사용할 수 있는 방법이다:

# -*- coding: utf-8 -*-

from typing import Optional
from aioredis import create_pool, ConnectionsPool, RedisConnection
from aioredis.abc import AbcPool
from recc.cache.async_cs_interface import AsyncCacheStoreInterface
from recc.variables.cache import (
    DEFAULT_CONNECTION_POOL_MIN_SIZE,
    DEFAULT_CONNECTION_POOL_MAX_SIZE,
)

EX_KEY_MINSIZE = "minsize"
EX_KEY_MAXSIZE = "maxsize"


class AsyncRedisCacheStore(AsyncCacheStoreInterface):
    """
    Redis cache store class.
    """

    def __init__(
        self,
        cs_host: str,
        cs_port: int,
        cs_pw: Optional[str] = None,
        **kwargs,
    ):
        self._cs_host = cs_host
        self._cs_port = cs_port
        self._cs_pw = cs_pw
        self._kwargs = kwargs
        self._pool: Optional[ConnectionsPool] = None

    def _get_minsize(self, default_value=DEFAULT_CONNECTION_POOL_MIN_SIZE) -> int:
        try:
            return int(self._kwargs.get(EX_KEY_MINSIZE, default_value))
        except ValueError:
            return default_value

    def _get_maxsize(self, default_value=DEFAULT_CONNECTION_POOL_MAX_SIZE) -> int:
        try:
            return int(self._kwargs.get(EX_KEY_MAXSIZE, default_value))
        except ValueError:
            return default_value

    def is_open(self) -> bool:
        return self._pool is not None

    async def open(self) -> None:
        self._pool = await create_pool(
            address=f"redis://{self._cs_host}:{self._cs_port}",
            password=self._cs_pw if self._cs_pw else None,
            minsize=self._get_minsize(),
            maxsize=self._get_maxsize(),
        )

    async def close(self) -> None:
        assert self._pool
        self._pool.close()
        await self._pool.wait_closed()
        self._pool = None

    @property
    def pool(self) -> ConnectionsPool:
        assert self._pool
        return self._pool

    class _Connection:
        """
        Implementation for Type Hinting.
        """

        __slots__ = ("_pool", "_conn")

        def __init__(self, pool: AbcPool):
            self._pool = pool
            self._conn: Optional[RedisConnection] = None

        async def __aenter__(self) -> RedisConnection:
            conn = await self._pool.acquire()
            self._conn = conn
            return self._conn

        async def __aexit__(self, exc_type, exc_value, tb):
            try:
                self._pool.release(self._conn)
            finally:
                self._pool = None
                self._conn = None

    def _conn(self) -> _Connection:
        return self._Connection(self.pool)

    async def set(self, key: str, val: bytes) -> None:
        async with self._conn() as conn:
            await conn.execute(b"SET", key, val)

    async def get(self, key: str) -> bytes:
        async with self._conn() as conn:
            return await conn.execute(b"GET", key)

    async def delete(self, key: str) -> None:
        async with self._conn() as conn:
            await conn.execute(b"DEL", key)

    async def exists(self, key: str) -> bool:
        async with self._conn() as conn:
            return bool(await conn.execute(b"EXISTS", key))

Version 2.x

# -*- coding: utf-8 -*-

from typing import Optional
from aioredis import Redis, ConnectionPool
from recc.cache.async_cs_interface import AsyncCacheStoreInterface
from recc.variables.cache import DEFAULT_MAX_CONNECTIONS

EX_KEY_MAX_CONNECTIONS = "max_connections"


class AsyncRedisCacheStore(AsyncCacheStoreInterface):
    """
    Redis cache store class.
    """

    def __init__(
        self,
        cs_host: str,
        cs_port: int,
        cs_pw: Optional[str] = None,
        **kwargs,
    ):
        self._cs_host = cs_host
        self._cs_port = cs_port
        self._cs_pw = cs_pw
        self._kwargs = kwargs
        self._redis: Optional[Redis] = None

    def _get_max_connections(self, default_value=DEFAULT_MAX_CONNECTIONS) -> int:
        try:
            return int(self._kwargs.get(EX_KEY_MAX_CONNECTIONS, default_value))
        except ValueError:
            return default_value

    def is_open(self) -> bool:
        return self._redis is not None

    async def open(self) -> None:
        pool = ConnectionPool.from_url(
            f"redis://{self._cs_host}:{self._cs_port}",
            password=self._cs_pw if self._cs_pw else None,
            max_connections=self._get_max_connections(),
        )
        self._redis = Redis(connection_pool=pool)

    async def close(self) -> None:
        assert self._redis
        await self._redis.close()
        self._redis = None

    async def set(self, key: str, val: bytes) -> None:
        await self._redis.execute_command("SET", key, val)

    async def get(self, key: str) -> bytes:
        result = await self._redis.execute_command("GET", key)
        assert isinstance(result, bytes)
        return result

    async def delete(self, key: str) -> None:
        await self._redis.execute_command("DEL", key)

    async def exists(self, key: str) -> bool:
        result = await self._redis.execute_command("EXISTS", key)
        assert isinstance(result, int)
        return bool(result)

Connecting to password-protected Redis instance

The password can be specified either in keyword argument or in address URI:

redis = await aioredis.create_redis_pool(
    'redis://localhost', password='sEcRet')

redis = await aioredis.create_redis_pool(
    'redis://:sEcRet@localhost/')

redis = await aioredis.create_redis_pool(
    'redis://localhost/?password=sEcRet')

See also

Favorite site