Module realm_api.rpc
RPC module
Sub-modules
realm_api.rpc.roll
-
Dice roller RPC module
Functions
async def handler(message: dict)
-
Expand source code
async def handler(message: dict):
    """Handle an incoming RPC operation and publish the result.

    The ``data`` entry of ``message`` is decoded as JSON. Its ``op`` entry
    selects a callable from ``handlers``; that callable is invoked with the
    optional ``args`` / ``kwargs`` entries and its result is awaited. The
    result is serialised with ``model_dump_json()`` and published on the
    channel named by the request's ``uuid`` entry.
    """
    data: dict = json.loads(message["data"])
    logger.info(f"Incoming RPC op: {data['uuid']} {data['op']}")

    # Look the operation up before reading its arguments; an op that is not
    # registered in ``handlers`` raises KeyError at this point.
    operation = handlers[data["op"]]
    positional = data.get("args", [])
    keyword = data.get("kwargs", {})
    result: BaseModel = await operation(*positional, **keyword)

    # The request uuid doubles as the name of the reply channel.
    payload = result.model_dump_json()
    await redis_conn.publish(data["uuid"], payload)
Handle an incoming RPC operation and publish the result.
async def init_pubsub() -> _asyncio.Task
-
Expand source code
async def init_pubsub() -> aio.Task:
    """Subscribe ``handler`` to the ``rpc.api`` channel and start the pubsub.

    Returns the task created for ``pubsub.run()``.
    """
    # Channel names are passed as keyword arguments mapping to callbacks; a
    # dict is unpacked because "rpc.api" is not a valid Python identifier.
    channel_handlers = {"rpc.api": handler}
    await pubsub.subscribe(**channel_handlers)

    runner = aio.create_task(pubsub.run())
    return runner
async def rpc_bot(op: str, *args, timeout=3, **kwargs)
-
Expand source code
async def rpc_bot(op: str, *args, timeout=3, **kwargs):
    """Perform an outgoing bot RPC operation and await the response.

    A fresh UUID string identifies the request and names the reply channel.
    That channel is subscribed to before the request is published on
    ``rpc.bot``, and it is unsubscribed again on the way out whether the
    call succeeds or fails.

    Args:
        op: Name of the remote operation, sent as the ``op`` field.
        *args: Positional arguments, sent as the ``args`` field.
        timeout: Limit passed to ``wait_for`` while waiting for the reply.
        **kwargs: Keyword arguments, sent as the ``kwargs`` field.

    Returns:
        The ``data`` entry of the first message received on the reply
        channel, exactly as delivered (it is not decoded here).
    """
    replies = aio.Queue()

    async def on_reply(reply: dict):
        # Hand the raw payload over to the coroutine waiting below.
        await replies.put(reply["data"])

    request_id = str(uuid4())
    await pubsub.subscribe(**{request_id: on_reply})

    message = {"uuid": request_id, "op": op, "args": args, "kwargs": kwargs}
    logger.info(f"Outgoing RPC op: {message['uuid']} {message['op']}")

    try:
        await redis_conn.publish("rpc.bot", json.dumps(message))
        answer = await aio.wait_for(replies.get(), timeout)
        return answer
    finally:
        # Always drop the one-shot reply subscription.
        await pubsub.unsubscribe(request_id)
Perform an outgoing bot RPC operation and await the response.
async def shutdown_pubsub(task: _asyncio.Task)
-
Expand source code
async def shutdown_pubsub(task: aio.Task):
    """Unsubscribe and close the pubsub, then cancel its runner task.

    ``task`` is the task to cancel. Cancellation is only requested; the task
    is not awaited here.
    """
    # Called without arguments, so no specific channel is named.
    await pubsub.unsubscribe()
    await pubsub.close()

    # Request cancellation last, after the pubsub has been closed.
    task.cancel()