Module realm_bot.rpc

RPC functionality and Aethersprite extension

Functions

async def handler(message: dict)
Expand source code
async def handler(message: dict):
    """Handle an incoming RPC operation and publish the result."""

    data: dict = json.loads(message["data"])
    log.info(f"Incoming RPC op: {data['uuid']} {data['op']}")
    result: BaseModel = await getattr(RPCHandlers, data["op"])(
        *data.get("args", []),
        **data.get("kwargs", dict()),
    )
    response = result.model_dump_json()
    await redis_conn.publish(data["uuid"], response)

Handle an incoming RPC operation and publish the result.

async def rpc_api(op: str, *args, timeout=3, **kwargs)
Expand source code
async def rpc_api(op: str, *args, timeout=3, **kwargs):
    """Perform an outgoing API RPC operation and await the response."""

    q = aio.Queue()

    async def handler(message: dict):
        data = message["data"]
        await q.put(data)

    uuid = str(uuid4())
    await pubsub.subscribe(**{uuid: handler})
    message = {
        "uuid": uuid,
        "op": op,
        "args": args,
        "kwargs": kwargs,
    }
    log.info(f"Outgoing RPC op: {message['uuid']} {message['op']}")

    try:
        await redis_conn.publish("rpc.api", json.dumps(message))
        return await aio.wait_for(q.get(), timeout)

    finally:
        await pubsub.unsubscribe(uuid)

Perform an outgoing API RPC operation and await the response.

async def setup(bot_: discord.ext.commands.bot.Bot)
Expand source code
async def setup(bot_: Bot):
    global bot, task
    bot = bot_
    await pubsub.subscribe(**{"rpc.bot": handler})
    task = aio.create_task(pubsub.run())
async def teardown(*_)
Expand source code
async def teardown(*_):
    await pubsub.unsubscribe()
    await pubsub.close()
    task.cancel()

Classes

class RPCHandlers
Expand source code
class RPCHandlers:
    """
    Incoming RPC operation handler functions

    Since there are going to be a small number of operations supported by the
    bot, the handlers can be collected here as static methods instead of
    breaking each out into its own module.
    """

    @staticmethod
    async def guilds(*_, **__):
        """Get list of guilds the bot is joined to."""

        return BotGuildsResponse(
            guild_ids=set([str(guild.id) for guild in bot.guilds]),
        )

Incoming RPC operation handler functions

Since there are going to be a small number of operations supported by the bot, the handlers can be collected here as static methods instead of breaking each out into its own module.

Static methods

async def guilds(*_, **__)
Expand source code
@staticmethod
async def guilds(*_, **__):
    """Get list of guilds the bot is joined to."""

    return BotGuildsResponse(
        guild_ids=set([str(guild.id) for guild in bot.guilds]),
    )

Get list of guilds the bot is joined to.