Channels
Channels route events within an app. You can subscribe to channels in real time via SSE or WebSocket.
CRUD
```python
# Create
channel = client.channels.create(name="events", app_id=1)

# Get
channel = client.channels.get(channel_id=1)

# Update
channel = client.channels.update(channel_id=1, name="renamed")

# Delete
client.channels.delete(channel_id=1)
```

SSE Subscription
Subscribe to all events on a channel:
```python
with client.channels.subscribe_sse(channel_id=1) as stream:
    for event in stream:
        print(event.agent_id, event.identifier, event.payload)
```

Filter by event type, agent, or trace:
```python
from axonpush import EventType

with client.channels.subscribe_sse(
    channel_id=1,
    agent_id="researcher",
    event_type=EventType.AGENT_ERROR,
    trace_id="tr_run_42",
) as stream:
    for event in stream:
        print(f"[{event.agent_id}] {event.identifier}: {event.payload}")
```

Subscribe to a specific event identifier:
```python
with client.channels.subscribe_event_sse(
    channel_id=1,
    event_identifier="web_search",
) as stream:
    for event in stream:
        print(event.payload)
```

WebSocket
Requires `pip install "axonpush[websocket]"`.
```python
ws = client.connect_websocket()
ws.on_event(lambda e: print(e.agent_id, e.payload))
ws.subscribe(channel_id=1, event_type="agent.tool_call.start")
ws.wait()  # blocks until disconnected
```

Publish and manage subscriptions:
```python
ws.publish(channel_id=1, identifier="status", payload={"step": "done"}, agent_id="worker")
ws.unsubscribe(channel_id=1)
ws.disconnect()
```

Async WebSocket
```python
ws = await async_client.connect_websocket()
ws.on_event(lambda e: print(e.agent_id, e.payload))
await ws.subscribe(channel_id=1)
await ws.wait()
```