Module realm_api.db
Database functionality and Aethersprite extension
Functions
async def get_session()
    async def get_session():
        """Get an `AsyncSession` object."""
        async with AsyncSession(async_engine) as session:
            yield session
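The generator above yields from inside an `async with` block, so the session stays open while the caller uses it and is closed when the generator resumes. The following is a minimal, standard-library-only sketch of that lifecycle. `FakeSession`, `get_fake_session`, and `events` are hypothetical names introduced for illustration and are not part of `realm_api.db`; wrapping the generator with `contextlib.asynccontextmanager` is an assumption about how a caller or framework might drive it.

```python
import asyncio
from contextlib import asynccontextmanager

events = []  # records the order of operations for illustration


class FakeSession:
    """Hypothetical stand-in for AsyncSession (not part of realm_api.db)."""

    async def __aenter__(self):
        events.append("open")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        events.append("close")
        return False  # do not swallow exceptions


async def get_fake_session():
    # Same shape as get_session(): yield from inside `async with`.
    async with FakeSession() as session:
        yield session


async def main():
    # Drive the generator to its yield, use the value, then let it
    # resume so the `async with` block inside it can clean up.
    async with asynccontextmanager(get_fake_session)() as session:
        assert isinstance(session, FakeSession)
        events.append("use")


asyncio.run(main())
```

After the run, `events` holds the open, use, and close steps in that order, which is the guarantee the yield-inside-context-manager shape is meant to provide.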
Get an `AsyncSession` object.
def setup_webapp(app: fastapi.applications.FastAPI, *_)
    def setup_webapp(app: FastAPI, *_):
        app.add_middleware(StartupMiddleware)
Classes
class StartupMiddleware (app)
    class StartupMiddleware:
        """Startup middleware for initializing the database"""

        initialized: aio.Event = aio.Event()

        def __init__(self, app):
            self.app = app

        @classmethod
        async def init_db(cls):
            """Initializes the database."""
            from .models.user_session import UserSession  # noqa: F401

            log.info("Initializing database")
            async with async_engine.begin() as conn:
                await conn.run_sync(SQLModel.metadata.create_all)

        async def __call__(self, scope, receive, send):
            if not StartupMiddleware.initialized.is_set():
                await StartupMiddleware.init_db()
                StartupMiddleware.initialized.set()

            await self.app(scope, receive, send)
Startup middleware for initializing the database
Class variables
var initialized : asyncio.locks.Event
Static methods
async def init_db()
Initializes the database.
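To illustrate the run-once behaviour of `StartupMiddleware`, here is a minimal sketch that uses only the standard library. `RunOnceMiddleware`, `inner_app`, `noop`, `init_calls`, and `handled` are hypothetical names introduced for this example, and the stub initializer stands in for the table creation done by `init_db()`. It assumes a plain ASGI-style callable taking `(scope, receive, send)` and is not the module's own code.

```python
import asyncio

init_calls = []  # records each run of the stub initializer
handled = []     # records each request that reached the inner app


class RunOnceMiddleware:
    """Hypothetical stand-in mirroring the shape of StartupMiddleware."""

    # Shared by all instances, so initialization happens once per process.
    initialized = asyncio.Event()

    def __init__(self, app):
        self.app = app

    @classmethod
    async def init(cls):
        # Stub for the real work (creating tables in the source).
        init_calls.append("init")

    async def __call__(self, scope, receive, send):
        # Initialize only on the first call, then always delegate.
        if not RunOnceMiddleware.initialized.is_set():
            await RunOnceMiddleware.init()
            RunOnceMiddleware.initialized.set()
        await self.app(scope, receive, send)


async def inner_app(scope, receive, send):
    # Minimal ASGI-style app: just note which path was requested.
    handled.append(scope["path"])


async def noop(*_):
    # Placeholder for the ASGI receive/send callables.
    return None


async def main():
    app = RunOnceMiddleware(inner_app)
    for path in ("/a", "/b", "/c"):
        await app({"type": "http", "path": path}, noop, noop)


asyncio.run(main())
```

In this sketch the three requests are handled in order and the initializer runs a single time. Because the check and the `set()` call are separated by an `await`, requests that arrive while initialization is still in progress may also enter the initializer; guarding that block with an `asyncio.Lock` is one way to serialize it if that matters for a given deployment.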