Skip to content

Channels

Channels route events within an app. You can subscribe to a channel in real time via Server-Sent Events (SSE) or WebSocket.

CRUD

python
# Create
channel = client.channels.create(name="events", app_id=1)

# Get
channel = client.channels.get(channel_id=1)

# Update
channel = client.channels.update(channel_id=1, name="renamed")

# Delete
client.channels.delete(channel_id=1)

SSE Subscription

Subscribe to all events on a channel:

python
with client.channels.subscribe_sse(channel_id=1) as stream:
    for event in stream:
        print(event.agent_id, event.identifier, event.payload)

Filter by agent, event type, or trace ID. The filters can be combined, as in this example:

python
from axonpush import EventType

with client.channels.subscribe_sse(
    channel_id=1,
    agent_id="researcher",
    event_type=EventType.AGENT_ERROR,
    trace_id="tr_run_42",
) as stream:
    for event in stream:
        print(f"[{event.agent_id}] {event.identifier}: {event.payload}")

Subscribe only to events with a specific identifier:

python
with client.channels.subscribe_event_sse(
    channel_id=1,
    event_identifier="web_search",
) as stream:
    for event in stream:
        print(event.payload)

WebSocket

Requires the optional WebSocket extra. Install it with: pip install "axonpush[websocket]"

python
ws = client.connect_websocket()
ws.on_event(lambda e: print(e.agent_id, e.payload))
ws.subscribe(channel_id=1, event_type="agent.tool_call.start")
ws.wait()  # blocks until disconnected

Publish an event, unsubscribe from a channel, and disconnect:

python
ws.publish(channel_id=1, identifier="status", payload={"step": "done"}, agent_id="worker")
ws.unsubscribe(channel_id=1)
ws.disconnect()

Async WebSocket

python
ws = await async_client.connect_websocket()
ws.on_event(lambda e: print(e.agent_id, e.payload))
await ws.subscribe(channel_id=1)
await ws.wait()