Module realm_api.auth.depends
FastAPI resources for use in Depends()
Functions
async def require_login(user_id: str = Depends(APIKeyHeader),
realm_token: str = Depends(APIKeyHeader),
db: sqlalchemy.ext.asyncio.session.AsyncSession = Depends(get_session))-
Expand source code
async def require_login( user_id: str = Depends(APIKeyHeader(name="X-Realm-User")), realm_token: str = Depends(APIKeyHeader(name="X-Realm-Token")), db: AsyncSession = Depends(get_session), ): user_session = ( await db.execute( select(UserSession).where( UserSession.user_id == user_id, UserSession.realm_token == realm_token, ) ) ).scalar_one_or_none() if not user_session: raise HTTPException(status.HTTP_401_UNAUTHORIZED) return user_session