Module realm_bot.rpc

RPC functionality and Aethersprite extension

Functions

def handler(message: dict)
Expand source code
def handler(message: dict):
    """Handle an incoming RPC operation and publish the result."""

    data: dict = json.loads(message["data"])
    log.info(f"RPC op: {data['op']}")

    async def f():
        result: BaseModel = await getattr(RPCHandlers, data["op"])(
            *data.get("args", []),
            **data.get("kwargs", dict()),
        )
        redis_conn.publish(
            data["uuid"], result.model_dump_json() if result else ""
        )

    aio.run_coroutine_threadsafe(f(), bot.loop)

Handle an incoming RPC operation and publish the result.

async def rpc_api(op: str, *args, timeout=3, **kwargs)
Expand source code
async def rpc_api(op: str, *args, timeout=3, **kwargs):
    """Perform an outgoing API RPC operation and await the response."""

    q = aio.Queue()

    def handler(message: dict):
        data = message["data"]
        aio.run_coroutine_threadsafe(q.put(data), bot.loop)

    uuid = str(uuid4())
    pubsub.subscribe(**{uuid: handler})

    try:
        redis_conn.publish(
            "rpc.api",
            json.dumps(
                {
                    "uuid": uuid,
                    "op": op,
                    "args": args,
                    "kwargs": kwargs,
                }
            ),
        )

        return await aio.wait_for(q.get(), timeout)

    finally:
        pubsub.unsubscribe(uuid)

Perform an outgoing API RPC operation and await the response.

async def setup(bot_: discord.ext.commands.bot.Bot)
Expand source code
async def setup(bot_: Bot):
    global bot
    bot = bot_
    pubsub.subscribe(**{"rpc.bot": handler})
    pubsub.run_in_thread(daemon=True, sleep_time=0.01)
async def teardown(*_)
Expand source code
async def teardown(*_):
    pubsub.unsubscribe()
    pubsub.close()

Classes

class RPCHandlers
Expand source code
class RPCHandlers:
    """
    Incoming RPC operation handler functions

    Since there are going to be a small number of operations supported by the
    bot, the handlers can be collected here as static methods instead of
    breaking each out into its own module.
    """

    @staticmethod
    async def guilds(*_, **__):
        """Get list of guilds the bot is joined to."""

        return BotGuildsResponse(
            guild_ids=set([str(guild.id) for guild in bot.guilds]),
        )

Incoming RPC operation handler functions

Since there are going to be a small number of operations supported by the bot, the handlers can be collected here as static methods instead of breaking each out into its own module.

Static methods

async def guilds(*_, **__)
Expand source code
@staticmethod
async def guilds(*_, **__):
    """Get list of guilds the bot is joined to."""

    return BotGuildsResponse(
        guild_ids=set([str(guild.id) for guild in bot.guilds]),
    )

Get list of guilds the bot is joined to.