Thank you for the great work done in Beanie, it simplified my life significantly.
Describe the bug
This is a DB Client I wrote:
class MongoClient:
def __init__(self):
client_options = {'appname': self.__class__.__name__}
self.client = AsyncIOMotorClient(MONGO_URL, **client_options)
self.client.get_io_loop = asyncio.get_running_loop
self.is_testing = TESTING == Environments.Testing.value
@property
def db_name(self) -> str:
db_postfix = ENV
return f'{svc}-{db_postfix}'
@abstractmethod
async def initialize(self):
raise NotImplementedError('MongoClient.initialize() must be implemented')
@asynccontextmanager
async def transaction(self) -> AsyncIOMotorClientSession:
async with await self.client.start_session() as session:
async with session.start_transaction():
yield session
The initialize()
method is implemented like below:
class MongoClientNumberOne(MongoClient):
async def initialize(self):
collections = [
Service,
...
]
await init_beanie(database=self.client[self.db_name], document_models=collections)
The way I use the Mongo client is to declare it as a dependency in FastAPI routes like below:
class MongoClientDependency:
def __init__(self, db_type: Type[T]):
if db_type == MongoClientNumberOne:
self.client = MongoClientNumberOne()
elif db_type == MongoClientNumberTwo:
self.client = MongoClientNumberTwo()
else:
raise ValueError("Invalid DB Type given")
async def __call__(self, request: Request) -> MongoClientNumberOne | MongoClientNumberTwo:
await self.client.initialize()
return self.client
class Context:
def __init__(
self,
access_token: str | None = None,
permissions: List[Permissions] | None = None,
mongo_client: MongoClientNumberOne | None = None,
mongo_session: AsyncIOMotorClientSession | None = None,
):
self.access_token: str = access_token
self.permissions: List[Permissions] = permissions
self.mongo_client: AsyncIOMotorClient = mongo_client
self.mongo_session: AsyncIOMotorClientSession = mongo_session
if self.mongo_client:
self.db_name: str = self.mongo_client.db_name
self.current_user: User | None = None
self.current_service: Service | None = None
@classmethod
@asynccontextmanager
async def plain(
cls, mongo_session: AsyncIOMotorClientSession | None = None, mongo_client: MongoClientNumberOne | None = None
) -> Context:
yield cls(mongo_session=mongo_session, mongo_client=mongo_client)
@router.get(
'/blog/posts',
summary='Get latest blog posts',
tags=['Blog'],
response_model=List[BlogPost],
responses={503: {'model': ErrorResponse}},
)
async def get_latest_blog_posts(mongo_client: MongoClientNumberOne = Depends(MongoClientDependency(MongoClientNumberOne))):
async with mongo_client.transaction():
async with Context.plain():
return await Blog.get_latest_posts()
The problem that I see in Sentry is that at times, read/write queries encountered the motor_collection
in ItemSettings
is None
. A screenshot for this below:
Unfortunately this happens randomly and intermittent. Usually when there's no traffic and new traffic comes, these errors popped up.
To Reproduce
At the start, we almost succeeded in replicating this by creating this test, not so many requests but very high concurrency. Before the codes I wrote above, we have an async context manager that was buggy which is now fixed and the test below passed.
@pytest.mark.asyncio
async def test_concurrency(client: TestClient, individual_user: (RegisterRequest, str, str)):
token = individual_user[2]
async def get_profile():
headers = {'Authorization': f'Bearer {token}'}
response = await asyncify(client.get)(headers=headers, url='/me')
assert response.status_code == 200
async def get_wishlist():
headers = {'Authorization': f'Bearer {token}'}
response = await asyncify(client.get)(headers=headers, url='/wishlist')
assert response.status_code == 200
async def get_txn():
headers = {'Authorization': f'Bearer {token}'}
url = '/transactions?limit=10&skip=0&statuses=submitted,accepted,waitlisted,pending-signature,pending-funding,funds-received,in-execution,completed,unsuccessful'
response = await asyncify(client.get)(headers=headers, url=url)
assert response.status_code == 200
for i in range(10):
print(f'Iteration {i} - Enter')
await asyncio.gather(get_profile(), get_wishlist(), get_txn(), get_profile(), get_wishlist(), get_txn())
print(f'Iteration {i} - Exit')
The bug is still happening though, motor_collection
sometimes still returns None
.
Expected behavior
self.document_model.get_motor_collection()
should return the collection.
Additional context
I'm happy to elaborate more with code examples.