Module realm_bot.rpc
RPC functionality and Aethersprite extension
Functions
def handler(message: dict)
-
Expand source code
def handler(message: dict):
    """Handle an incoming RPC operation and publish the result.

    Decodes the JSON payload found in ``message["data"]``, resolves the
    operation named by its ``"op"`` field to a coroutine function on
    ``RPCHandlers`` and schedules that coroutine on the bot's event loop.
    The result is published to the channel named by the payload's ``"uuid"``
    field, serialized with ``model_dump_json()``; an empty string is
    published when the operation returns a falsy value.

    Malformed payloads, unknown operations and failures inside the operation
    are logged; in those cases nothing is published.

    Args:
        message: Pub/sub message whose ``"data"`` entry is a JSON object
            carrying ``"op"`` and ``"uuid"`` plus optional ``"args"`` (list)
            and ``"kwargs"`` (object).
    """
    # An exception raised here propagates into whatever invoked this
    # callback (presumably the pub/sub listener -- verify), so malformed
    # input is logged and dropped instead of raised.
    try:
        data: dict = json.loads(message["data"])
        op = data["op"]
    except (KeyError, TypeError, ValueError):
        log.exception("Malformed RPC message")
        return

    log.info(f"RPC op: {op}")

    # The op name comes off the wire: dispatch only to public callables so a
    # request cannot reach dunder/private attributes of RPCHandlers.
    rpc_func = None
    if isinstance(op, str) and not op.startswith("_"):
        rpc_func = getattr(RPCHandlers, op, None)
    if not callable(rpc_func):
        log.error(f"Unknown RPC op: {op!r}")
        return

    async def f():
        # Nobody awaits the future returned by run_coroutine_threadsafe, so
        # errors must be reported here or they vanish silently.
        try:
            result: BaseModel = await rpc_func(
                *data.get("args", []),
                **data.get("kwargs", {}),
            )
            redis_conn.publish(
                data["uuid"], result.model_dump_json() if result else ""
            )
        except Exception:
            log.exception(f"RPC op failed: {op!r}")

    aio.run_coroutine_threadsafe(f(), bot.loop)
Handle an incoming RPC operation and publish the result.
async def rpc_api(op: str, *args, timeout=3, **kwargs)
-
Expand source code
async def rpc_api(op: str, *args, timeout=3, **kwargs):
    """Perform an outgoing API RPC operation and await the response.

    A fresh UUID string is used as a one-off reply channel: it is subscribed
    before the request is published on ``"rpc.api"``, and unsubscribed again
    whether or not a reply arrives.

    Args:
        op: Name of the remote operation.
        *args: Positional arguments forwarded in the request payload.
        timeout: Passed to ``wait_for`` as the limit on waiting for the reply.
        **kwargs: Keyword arguments forwarded in the request payload.

    Returns:
        The ``"data"`` field of the first message received on the reply
        channel, exactly as delivered (not decoded here).
    """
    replies = aio.Queue()

    def on_reply(message: dict):
        # Hand the payload over to the bot's loop; the callback is presumably
        # invoked off the event-loop thread -- verify against pubsub setup.
        aio.run_coroutine_threadsafe(replies.put(message["data"]), bot.loop)

    reply_channel = str(uuid4())
    pubsub.subscribe(**{reply_channel: on_reply})

    try:
        request = json.dumps(
            {
                "uuid": reply_channel,
                "op": op,
                "args": args,
                "kwargs": kwargs,
            }
        )
        redis_conn.publish("rpc.api", request)
        return await aio.wait_for(replies.get(), timeout)
    finally:
        pubsub.unsubscribe(reply_channel)
Perform an outgoing API RPC operation and await the response.
async def setup(bot_: discord.ext.commands.bot.Bot)
-
Expand source code
async def setup(bot_: Bot):
    """Set up the RPC extension for the given bot.

    Stores ``bot_`` in the module-level ``bot``, subscribes ``handler`` to
    the ``"rpc.bot"`` channel and starts the pub/sub listener in a daemon
    thread.
    """
    global bot
    bot = bot_

    incoming_channels = {"rpc.bot": handler}
    pubsub.subscribe(**incoming_channels)
    pubsub.run_in_thread(sleep_time=0.01, daemon=True)
async def teardown(*_)
-
Expand source code
async def teardown(*_):
    """Tear down the RPC extension: unsubscribe and close the pub/sub object.

    Any positional arguments are accepted and ignored. An error raised while
    unsubscribing still propagates, but only after the pub/sub object has
    been closed.
    """
    try:
        pubsub.unsubscribe()
    finally:
        # Close even when unsubscribing fails so the pub/sub object is not
        # left open.
        pubsub.close()
Classes
class RPCHandlers
-
Expand source code
class RPCHandlers:
    """
    Handlers for incoming RPC operations.

    The bot supports only a small number of operations, so their handlers
    are gathered on this class as static methods rather than each living in
    a module of its own.
    """

    @staticmethod
    async def guilds(*_, **__):
        """Get list of guilds the bot is joined to."""
        # Guild ids are sent as strings, de-duplicated through a set.
        joined_ids = {str(joined.id) for joined in bot.guilds}
        return BotGuildsResponse(guild_ids=joined_ids)
Incoming RPC operation handler functions
Since there are going to be a small number of operations supported by the bot, the handlers can be collected here as static methods instead of breaking each out into its own module.
Static methods
async def guilds(*_, **__)
-
Expand source code
@staticmethod
async def guilds(*_, **__):
    """Get list of guilds the bot is joined to.

    All positional and keyword arguments are accepted and ignored.
    """
    # Guild ids are sent as strings, de-duplicated through a set.
    joined_ids = {str(joined.id) for joined in bot.guilds}
    return BotGuildsResponse(guild_ids=joined_ids)
Get list of guilds the bot is joined to.