Introduction
Asynchronous cache manager designed for horizontally scaled web applications. NOTE: Currently has implementation only for FastAPI using Redis.
Requirements
Python 3.7+
Installation
$ pip install atomcache
---> 100%
Explanation schema
As UML
@startuml
!theme materia
participant Redis
participant Instance_A as A
participant Instance_B as B
participant Instance_N as C
B <-> Redis: GET: Cache=null & GET: Lock=null
B <-> Redis: SET: Lock = true
activate B #Red
A <--> Redis: GET: Cache=null & GET: Lock=true
activate A #Transparent
C <--> Redis: GET: Cache=null & GET: Lock=true
activate C #Transparent
B <--> B: Do the computation
B -> Redis: SET: Cache={...}
deactivate B
group Notify Cache SET
Redis -> C
Redis -> A
end
group GET Cache
Redis <-> C
deactivate C
Redis <-> A
deactivate A
end
@enduml
Examples:
Usage as FastAPI Dependency
- Create a file
events.py
with:
from typing import Optional, Callable
import aioredis
from fastapi import FastAPI, Depends
from atomcache import Cache
def create_start_app_handler(app: FastAPI) -> Callable:
async def start_app() -> None:
redis: aioredis.Redis = await aioredis.from_url(url="redis://localhost", encoding="utf-8")
await Cache.init(app, redis)
return start_app
def create_stop_app_handler(app: FastAPI) -> Callable:
async def stop_app() -> None:
await Cache.backend.close()
return stop_app
- Create a file
main.py
with:
from typing import Optional
from fastapi import FastAPI, Depends
from atomcache import Cache
from .events import create_start_app_handler, create_stop_app_handler
app = FastAPI()
app.add_event_handler("startup", create_start_app_handler(app))
app.add_event_handler("shutdown", create_stop_app_handler(app))
@router.get("/resources", response_model=List[TheResponseModel], name="main:test-example")
async def resources(offset: int = 0, items: int = 10, cache: Cache = Depends(Cache(exp=600)):
cache_id = f"{offset}-{items}" # Build cache identifier
await cache.raise_try(cache_id) # Try to respond from cache
response = await db.find(TheResponseModel, skip=offset, limit=items)
await asyncio.sleep(10) # Do some heavy work for 10 sec, see `lock_timeout`
return cache.set(response, cache_id=cache_id)
Direct cache usage for avoiding repetitive calling on external resources:
from aiohttp import ClientSession
from atomcache import Cache
cache = Cache(exp=1200, namespace="my-namespace:")
async def requesting_helper(ref: str) -> List[dict]:
cached_value = await cache.get(cache_id=ref)
if cached_value is not None:
return cached_value
async with ClientSession() as session:
async with session.get(f"https://external-api.io/{ref}") as response:
if response.ok:
cached_value = response.json()
return cache.set(cached_value, cache_id=ref)
return []