Module realm_api.rpc
RPC module
Sub-modules
realm_api.rpc.roll
-
Dice roller RPC module
Functions
def handler(message: dict)
-
Expand source code
def handler(message: dict): """Handle an incoming RPC operation and publish the result.""" data: dict = json.loads(message["data"]) log.info(f"RPC op: {data['op']}") result = handlers[data["op"]]( *data.get("args", []), **data.get("kwargs", dict()), ) redis_conn.publish(data["uuid"], json.dumps(result))
Handle an incoming RPC operation and publish the result.
async def rpc_bot(op: str, *args, timeout=3, **kwargs)
-
Expand source code
async def rpc_bot(op: str, *args, timeout=3, **kwargs): """Perform an outgoing bot RPC operation and await the response.""" q = aio.Queue() def handler(message: dict): data = message["data"] aio.new_event_loop().run_until_complete(q.put(data)) uuid = str(uuid4()) pubsub.subscribe(**{uuid: handler}) try: redis_conn.publish( "rpc.bot", json.dumps( { "uuid": uuid, "op": op, "args": args, "kwargs": kwargs, } ), ) return await aio.wait_for(q.get(), timeout) finally: pubsub.unsubscribe(uuid)
Perform an outgoing bot RPC operation and await the response.
def setup_webapp(*_)
-
Expand source code
def setup_webapp(*_): pubsub.subscribe(**{"rpc.api": handler}) pubsub.run_in_thread(daemon=True, sleep_time=0.01)
def teardown_webapp(*_)
-
Expand source code
def teardown_webapp(*_): pubsub.unsubscribe() pubsub.close()