Realtime

Subscribe to events as they happen over MQTT-over-WSS, backed by AWS IoT Core. The Python SDK ships two clients:

  • RealtimeClient: synchronous, built on paho-mqtt.
  • AsyncRealtimeClient: asyncio-native, built on aiomqtt.

Both are public as of axonpush 0.0.11. paho-mqtt and aiomqtt are runtime dependencies of axonpush, so no extras flag is required.

pip install axonpush

from axonpush import AxonPush

with AxonPush(
    api_key="ak_...",
    tenant_id="1",
    base_url="https://api.axonpush.xyz",
) as client:
    rt = client.connect_realtime()  # opens MQTT-over-WSS
    rt.subscribe(
        channel_id="ch_...",
        callback=lambda payload: print(payload),
    )
    # rt is alive on a background thread; do other work, then:
    rt.disconnect()

connect_realtime() returns a connected RealtimeClient. The reader runs on paho-mqtt’s background thread, so subscribe callbacks fire from that thread — keep them quick (or hand work off to your own queue).
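One way to keep callbacks quick is to have the callback only enqueue the payload and let a thread you own do the slow work. The sketch below uses the standard library only; `work_queue`, `on_event`, `worker`, and `processed` are illustrative names, not part of the SDK, and the callback is invoked by hand here to stand in for the reader thread.

```python
import queue
import threading

# Hypothetical names for illustration; none of these come from axonpush.
work_queue = queue.Queue(maxsize=1000)
_STOP = object()   # sentinel that tells the worker to exit
processed = []     # stand-in for the results of real processing


def on_event(payload):
    """Subscribe callback: runs on the reader thread, so it only enqueues."""
    try:
        work_queue.put_nowait(payload)
    except queue.Full:
        pass  # drop (or count) the event rather than block the reader thread


def worker():
    """Runs on your own thread; slow processing is safe here."""
    while True:
        item = work_queue.get()
        if item is _STOP:
            break
        processed.append(item)  # replace with real, possibly slow, work


worker_thread = threading.Thread(target=worker, daemon=True)
worker_thread.start()

# Simulate the SDK invoking the callback. In real use you would pass
# callback=on_event to rt.subscribe(...).
on_event({"text": "hello"})
on_event({"text": "world"})

work_queue.put(_STOP)
worker_thread.join(timeout=5)
```

A bounded queue is a deliberate choice here: if the worker falls behind, events are dropped instead of stalling the MQTT reader.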

import asyncio

from axonpush import AsyncAxonPush


async def main():
    async with AsyncAxonPush(
        api_key="ak_...",
        tenant_id="1",
        base_url="https://api.axonpush.xyz",
    ) as client:
        rt = await client.connect_realtime()
        await rt.subscribe(
            channel_id="ch_...",
            callback=lambda payload: print(payload),
        )
        # rt is driven by an asyncio reader task; do other awaits, then:
        await rt.disconnect()


asyncio.run(main())

The async client accepts both sync and async def callbacks. A failing callback is logged and skipped — it never kills the reader task.
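The behaviour described above can be pictured with a small dispatcher. This is a sketch of the general pattern, not the SDK's actual implementation: `dispatch` and `demo` are hypothetical names, and the real client may detect coroutine callbacks differently.

```python
import asyncio
import inspect
import logging

logger = logging.getLogger("realtime_sketch")


async def dispatch(callback, payload):
    """Call a sync or async callback; log and swallow any exception."""
    try:
        result = callback(payload)
        if inspect.isawaitable(result):
            await result  # async def callbacks return a coroutine to await
    except Exception:
        # The reader loop keeps going; only this event's callback is skipped.
        logger.exception("subscriber callback failed; skipping event")


async def demo():
    seen = []

    def sync_cb(payload):
        seen.append(("sync", payload))

    async def async_cb(payload):
        seen.append(("async", payload))

    def bad_cb(payload):
        raise ValueError("boom")

    for cb in (sync_cb, bad_cb, async_cb):
        await dispatch(cb, {"n": 1})
    return seen


seen = asyncio.run(demo())
```

The failing callback in the middle does not prevent the async callback after it from running.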

subscribe() accepts optional filters that translate into MQTT topic wildcards, so the broker (not your code) does the filtering:

rt.subscribe(
    channel_id="ch_...",
    app_id="app_...",            # optional; None matches any app
    event_type="agent.message",  # optional
    agent_id="researcher",       # optional
    callback=handle,
)

Each call returns the underlying MQTT topic filter string — pass it back to rt.unsubscribe(topic) to drop the subscription.
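To make the wildcard idea concrete, here is a sketch of how omitted filters could map onto MQTT's single-level `+` wildcard. The topic layout (`prefix/channel/app/event_type/agent`) is an assumption made for illustration; the SDK's real topic scheme is not documented here, and `build_topic_filter` and `topic_matches` are hypothetical helpers.

```python
def build_topic_filter(prefix, channel_id, app_id=None, event_type=None, agent_id=None):
    """Build a filter string; any omitted field becomes the '+' wildcard.

    ASSUMPTION: the level order below is invented for this example.
    """
    levels = [channel_id, app_id, event_type, agent_id]
    return "/".join([prefix] + [lvl if lvl is not None else "+" for lvl in levels])


def topic_matches(topic_filter, topic):
    """MQTT matching restricted to filters that use only '+' (no '#')."""
    f_parts = topic_filter.split("/")
    t_parts = topic.split("/")
    if len(f_parts) != len(t_parts):
        return False
    return all(f == "+" or f == t for f, t in zip(f_parts, t_parts))


flt = build_topic_filter("org1", "ch_1", event_type="agent.message")
hit = topic_matches(flt, "org1/ch_1/app_9/agent.message/researcher")
miss = topic_matches(flt, "org1/ch_1/app_9/agent.error/researcher")
```

Because the broker evaluates the filter, a non-matching event such as `agent.error` above is never delivered to the client at all.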

rt.publish(
    channel_id="ch_...",
    app_id="app_...",
    event_type="agent.message",
    agent_id="researcher",
    payload={"text": "hello"},
)

app_id and event_type are required on publish; agent_id is optional.

You don’t have to think about how the connection is authenticated (the SDK handles it), but for the curious:

  1. connect_realtime() calls POST /auth/iot-credentials with your API key (or JWT).
  2. The backend returns { presignedWssUrl, authorizerName, authToken, clientId, region, topicPrefix, envSlug, expiresAt, ... }. Despite the field name, the URL is unsigned: it carries ?x-amz-customauthorizer-name=NAME and no SigV4 signature. (Pre-0.0.11 SDKs used a SigV4-signed URL; that flow is gone.)
  3. The SDK opens a WebSocket to the presigned URL with Sec-WebSocket-Protocol: mqttv5.0.
  4. It sends an MQTT v5 CONNECT packet whose username is the JWT authToken. AWS IoT routes the CONNECT to the named custom JWT authorizer Lambda, which validates the token and returns an IAM policy scoped to your org’s topic prefix.
  5. The SDK schedules a credential refresh ~60s before expiresAt and transparently reconnects with the new token. Subscriptions are restored on the new connection.
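The refresh timing in step 5 comes down to simple arithmetic. The sketch below assumes `expiresAt` is an ISO 8601 timestamp string, which the text above does not confirm; `seconds_until_refresh` and `REFRESH_MARGIN` are hypothetical names, not SDK APIs.

```python
from datetime import datetime, timedelta, timezone

# "~60s before expiresAt", per the step above.
REFRESH_MARGIN = timedelta(seconds=60)


def seconds_until_refresh(expires_at, now=None):
    """Return how long to wait before refreshing credentials.

    ASSUMPTION: expires_at is an ISO 8601 string such as
    "2025-01-01T13:00:00Z"; the real field format may differ.
    """
    now = now or datetime.now(timezone.utc)
    # Older Python versions do not parse a trailing "Z", so normalise it.
    expiry = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
    delay = (expiry - REFRESH_MARGIN - now).total_seconds()
    return max(delay, 0.0)  # already inside the margin: refresh immediately


now = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
delay = seconds_until_refresh("2025-01-01T13:00:00Z", now=now)
overdue = seconds_until_refresh("2025-01-01T12:00:30Z", now=now)
```

With one hour left on the token, the refresh is due in 3540 seconds; a token that expires in 30 seconds yields a delay of zero.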

If the broker rejects the CONNECT, connect_realtime() raises ConnectionError — your code is free to retry.
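A retry loop with exponential backoff is one reasonable way to handle that error. This is a sketch, not SDK functionality: `connect_with_retry` is a hypothetical helper, and `flaky_connect` stands in for something like `client.connect_realtime`.

```python
import time


def connect_with_retry(connect, attempts=5, base_delay=0.5, max_delay=30.0, sleep=time.sleep):
    """Call connect() until it succeeds, backing off between failures."""
    for attempt in range(attempts):
        try:
            return connect()
        except ConnectionError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(min(base_delay * 2 ** attempt, max_delay))


# Demo with a fake connect function that fails twice, then succeeds.
calls = {"n": 0}
delays = []


def flaky_connect():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("broker rejected CONNECT")
    return "connected"


# sleep is injected so the demo records delays instead of actually waiting.
result = connect_with_retry(flaky_connect, sleep=delays.append)
```

In real use you would pass `client.connect_realtime` as `connect` and keep the default `sleep`.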