Module realm_api.rpc

RPC module

Sub-modules

realm_api.rpc.roll

Dice roller RPC module

Functions

def handler(message: dict)
Expand source code
def handler(message: dict):
    """Handle an incoming RPC operation and publish the result."""

    data: dict = json.loads(message["data"])
    log.info(f"RPC op: {data['op']}")
    result = handlers[data["op"]](
        *data.get("args", []),
        **data.get("kwargs", dict()),
    )
    redis_conn.publish(data["uuid"], json.dumps(result))

Handle an incoming RPC operation and publish the result.

async def rpc_bot(op: str, *args, timeout=3, **kwargs)
Expand source code
async def rpc_bot(op: str, *args, timeout=3, **kwargs):
    """Perform an outgoing bot RPC operation and await the response."""

    q = aio.Queue()

    def handler(message: dict):
        data = message["data"]
        aio.new_event_loop().run_until_complete(q.put(data))

    uuid = str(uuid4())
    pubsub.subscribe(**{uuid: handler})

    try:
        redis_conn.publish(
            "rpc.bot",
            json.dumps(
                {
                    "uuid": uuid,
                    "op": op,
                    "args": args,
                    "kwargs": kwargs,
                }
            ),
        )

        return await aio.wait_for(q.get(), timeout)

    finally:
        pubsub.unsubscribe(uuid)

Perform an outgoing bot RPC operation and await the response.

def setup_webapp(*_)
Expand source code
def setup_webapp(*_):
    pubsub.subscribe(**{"rpc.api": handler})
    pubsub.run_in_thread(daemon=True, sleep_time=0.01)
def teardown_webapp(*_)
Expand source code
def teardown_webapp(*_):
    pubsub.unsubscribe()
    pubsub.close()