Module realm_api.rpc

RPC module

Sub-modules

realm_api.rpc.roll

Dice roller RPC module

Functions

async def handler(message: dict)
Expand source code
async def handler(message: dict):
    """Handle an incoming RPC operation and publish the result."""

    data: dict = json.loads(message["data"])
    logger.info(f"Incoming RPC op: {data['uuid']} {data['op']}")
    result: BaseModel = await handlers[data["op"]](
        *data.get("args", []),
        **data.get("kwargs", dict()),
    )
    response = result.model_dump_json()
    await redis_conn.publish(data["uuid"], response)

Handle an incoming RPC operation and publish the result.

async def init_pubsub() ‑> _asyncio.Task
Expand source code
async def init_pubsub() -> aio.Task:
    await pubsub.subscribe(**{"rpc.api": handler})
    return aio.create_task(pubsub.run())
async def rpc_bot(op: str, *args, timeout=3, **kwargs)
Expand source code
async def rpc_bot(op: str, *args, timeout=3, **kwargs):
    """Perform an outgoing bot RPC operation and await the response."""

    q = aio.Queue()

    async def handler(message: dict):
        data = message["data"]
        await q.put(data)

    uuid = str(uuid4())
    await pubsub.subscribe(**{uuid: handler})
    message = {
        "uuid": uuid,
        "op": op,
        "args": args,
        "kwargs": kwargs,
    }
    logger.info(f"Outgoing RPC op: {message['uuid']} {message['op']}")

    try:
        await redis_conn.publish("rpc.bot", json.dumps(message))
        return await aio.wait_for(q.get(), timeout)

    finally:
        await pubsub.unsubscribe(uuid)

Perform an outgoing bot RPC operation and await the response.

async def shutdown_pubsub(task: _asyncio.Task)
Expand source code
async def shutdown_pubsub(task: aio.Task):
    await pubsub.unsubscribe()
    await pubsub.close()
    task.cancel()