Realtime
Subscribe to events as they happen over MQTT-over-WSS, backed by AWS IoT Core. The Python SDK ships two clients:
- RealtimeClient: synchronous, built on paho-mqtt.
- AsyncRealtimeClient: asyncio-native, built on aiomqtt.
Both are public in axonpush 0.0.11 and later. paho-mqtt and aiomqtt are runtime dependencies of axonpush, so no extras flag is required.
Install

    pip install axonpush

Quick Start (sync)

    from axonpush import AxonPush

    with AxonPush(
        api_key="ak_...",
        tenant_id="1",
        base_url="https://api.axonpush.xyz",
    ) as client:
        rt = client.connect_realtime()  # opens MQTT-over-WSS
        rt.subscribe(
            channel_id="ch_...",
            callback=lambda payload: print(payload),
        )
        # rt is alive on a background thread; do other work, then:
        rt.disconnect()

connect_realtime() returns a connected RealtimeClient. The reader runs on paho-mqtt's background thread, so subscribe callbacks fire from that thread; keep them quick (or hand work off to your own queue).
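
The "hand work off to your own queue" advice can be sketched as follows. This is a minimal illustration that uses only the standard library; `on_event`, `worker`, and the queue size are hypothetical names and values, and the two direct `on_event(...)` calls stand in for the SDK invoking the callback from its reader thread.

```python
import queue
import threading

# Bounded queue so a slow consumer cannot grow memory without limit.
events = queue.Queue(maxsize=1000)
_STOP = object()  # sentinel that tells the worker to exit


def on_event(payload):
    """Subscribe callback: runs on the reader thread, so it only enqueues."""
    try:
        events.put_nowait(payload)
    except queue.Full:
        pass  # drop on overflow; a real handler might count or log this


def worker(handle, results):
    """Drain the queue on our own thread, where slow work is safe."""
    while True:
        item = events.get()
        if item is _STOP:
            break
        results.append(handle(item))


processed = []
t = threading.Thread(target=worker, args=(lambda p: p["text"].upper(), processed))
t.start()

# Stand-in for the SDK calling the callback; with the real client you would
# pass callback=on_event to rt.subscribe(...).
on_event({"text": "hello"})
on_event({"text": "world"})

events.put(_STOP)  # ask the worker to finish, then wait for it
t.join()
```

With this shape the callback returns almost immediately, and any slow processing happens on a thread you control.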
Quick Start (async)

    import asyncio

    from axonpush import AsyncAxonPush


    async def main():
        async with AsyncAxonPush(
            api_key="ak_...",
            tenant_id="1",
            base_url="https://api.axonpush.xyz",
        ) as client:
            rt = await client.connect_realtime()
            await rt.subscribe(
                channel_id="ch_...",
                callback=lambda payload: print(payload),
            )
            # rt is driven by an asyncio reader task; do other awaits, then:
            await rt.disconnect()


    asyncio.run(main())

The async client accepts both plain (sync) and async def callbacks. A failing callback is logged and skipped; it never kills the reader task.
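
To make the callback semantics concrete, here is a small sketch of how a dispatcher could accept both sync and async callbacks and survive a failing one. It is not the SDK's implementation; `dispatch` and the logger name are hypothetical, and the example only mirrors the behaviour described above.

```python
import asyncio
import inspect
import logging

logger = logging.getLogger("realtime_sketch")


async def dispatch(callback, payload):
    """Invoke a sync or async callback; log failures instead of raising."""
    try:
        result = callback(payload)
        if inspect.isawaitable(result):
            await result  # async def callbacks return a coroutine to await
    except Exception:
        logger.exception("subscribe callback failed; skipping this event")


async def main():
    seen = []

    def sync_cb(payload):
        seen.append(("sync", payload))

    async def async_cb(payload):
        await asyncio.sleep(0)
        seen.append(("async", payload))

    def bad_cb(payload):
        raise ValueError("boom")

    # The failing callback in the middle does not stop later deliveries.
    for cb in (sync_cb, bad_cb, async_cb):
        await dispatch(cb, {"n": 1})
    return seen


seen = asyncio.run(main())
```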
Filtering on the server
subscribe() accepts optional filters that translate into MQTT topic wildcards, so the broker (not your code) does the filtering:

    rt.subscribe(
        channel_id="ch_...",
        app_id="app_...",            # optional; None matches any app
        event_type="agent.message",  # optional
        agent_id="researcher",       # optional
        callback=handle,
    )

Each call returns the underlying MQTT topic filter string. Pass it back to rt.unsubscribe(topic) to drop the subscription.
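
The idea of turning optional filters into topic wildcards can be illustrated with a short sketch. The topic layout used here (`prefix/channel/app/event_type/agent`) is an assumption made for the example, not the SDK's documented layout, and `build_topic_filter` and `matches` are hypothetical helpers. The only MQTT fact relied on is that `+` matches exactly one topic level.

```python
def build_topic_filter(prefix, channel_id, app_id=None, event_type=None, agent_id=None):
    """Build a topic filter, using '+' for every filter left as None.

    The level order is assumed for illustration only.
    """
    levels = [channel_id, app_id, event_type, agent_id]
    return "/".join([prefix] + [lvl if lvl is not None else "+" for lvl in levels])


def matches(topic_filter, topic):
    """Mimic broker-side matching for filters that only use '+'."""
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    if len(f_levels) != len(t_levels):
        return False
    return all(f == "+" or f == t for f, t in zip(f_levels, t_levels))


# Filter on channel and event type; any app and any agent match.
flt = build_topic_filter("org1", "ch_1", event_type="agent.message")
hit = matches(flt, "org1/ch_1/app_9/agent.message/researcher")
miss = matches(flt, "org1/ch_1/app_9/agent.error/researcher")
```

Because the broker evaluates the filter, events that do not match are never delivered to the client at all.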
Publishing

    rt.publish(
        channel_id="ch_...",
        app_id="app_...",
        event_type="agent.message",
        agent_id="researcher",
        payload={"text": "hello"},
    )

app_id and event_type are required on publish; agent_id is optional.
Auth under the hood
You don't have to think about this, since the SDK handles it, but for the curious:
- connect_realtime() calls POST /auth/iot-credentials with your API key (or JWT).
- The backend returns { presignedWssUrl, authorizerName, authToken, clientId, region, topicPrefix, envSlug, expiresAt, ... }. The URL is unsigned and carries ?x-amz-customauthorizer-name=NAME; there is no SigV4 signature. (Pre-0.0.11 SDKs used a SigV4-signed URL; that flow is gone.)
- The SDK opens a WebSocket to the presigned URL with Sec-WebSocket-Protocol: mqttv5.0.
- It sends an MQTT v5 CONNECT packet whose username is the JWT authToken. AWS IoT routes the CONNECT to the named custom JWT authorizer Lambda, which validates the token and returns an IAM policy scoped to your org's topic prefix.
- The SDK schedules a credential refresh ~60s before expiresAt and transparently reconnects with the new token. Subscriptions are restored on the new connection.
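
Two details of this flow, reading the authorizer name from the URL and timing the refresh, can be sketched with the standard library. This is illustrative only: the example URL and authorizer name are made up, the helper names are hypothetical, and the sketch assumes expiresAt is an ISO 8601 UTC timestamp, which the text above does not specify.

```python
from datetime import datetime, timedelta, timezone
from urllib.parse import parse_qs, urlsplit

REFRESH_MARGIN = timedelta(seconds=60)  # "~60s before expiresAt"


def authorizer_name(wss_url):
    """Read the custom authorizer name from the URL's query string."""
    query = parse_qs(urlsplit(wss_url).query)
    return query["x-amz-customauthorizer-name"][0]


def seconds_until_refresh(expires_at, now):
    """Seconds to wait before refreshing; 0 if the margin has already passed.

    Assumes expires_at is ISO 8601, optionally ending in 'Z'.
    """
    expiry = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
    delay = (expiry - REFRESH_MARGIN - now).total_seconds()
    return max(delay, 0.0)


# Made-up credentials in the shape described above.
creds = {
    "presignedWssUrl": "wss://example.invalid/mqtt?x-amz-customauthorizer-name=my-authorizer",
    "expiresAt": "2030-01-01T00:10:00Z",
}
name = authorizer_name(creds["presignedWssUrl"])
now = datetime(2030, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
delay = seconds_until_refresh(creds["expiresAt"], now)  # 10 min minus 60 s
```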
If the broker rejects the CONNECT, connect_realtime() raises ConnectionError — your code is free to retry.
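
One way to retry is exponential backoff around the connect call. The sketch below assumes the exception is Python's built-in ConnectionError; `connect_with_retry` and its parameters are hypothetical, and `flaky_connect` stands in for `client.connect_realtime`.

```python
import time


def connect_with_retry(connect, attempts=5, base_delay=0.5, max_delay=30.0, sleep=time.sleep):
    """Call connect() until it succeeds, doubling the wait after each failure."""
    for attempt in range(attempts):
        try:
            return connect()
        except ConnectionError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(min(base_delay * 2 ** attempt, max_delay))


# Stand-in for client.connect_realtime: fails twice, then succeeds.
calls = {"n": 0}


def flaky_connect():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("broker rejected CONNECT")
    return "connected"


delays = []  # record the waits instead of actually sleeping
result = connect_with_retry(flaky_connect, sleep=delays.append)
```

With the real client the call would look like `rt = connect_with_retry(client.connect_realtime)`. Capping the delay keeps a long outage from producing very long waits.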
See also
- Channels: channel CRUD.
- Events: REST publish + list.
- POST /auth/iot-credentials: full response shape.