Skip to content

Trace a Multi-Step Agent Run End-to-End

Correlate every event in a single agent run with auto-generated trace and span IDs. Find where things went wrong without reading walls of logs.

Your agent runs 12 steps across 3 tool calls. Something went wrong at step 8. You have logs, but they’re a wall of text with no correlation. You can’t tell which events belong to the same run. You need a trace ID that ties everything together.

from axonpush import AxonPush, EventType, get_or_create_trace
with AxonPush(api_key="ak_...", tenant_id="1", base_url="https://api.axonpush.xyz") as client:
trace = get_or_create_trace()
client.events.publish(
"web_search", {"query": "AI frameworks"},
channel_id=1, agent_id="researcher",
trace_id=trace.trace_id, span_id=trace.next_span_id(),
event_type=EventType.AGENT_TOOL_CALL_START,
)
client.events.publish(
"summarize", {"input_tokens": 1200},
channel_id=1, agent_id="researcher",
trace_id=trace.trace_id, span_id=trace.next_span_id(),
event_type=EventType.AGENT_TOOL_CALL_START,
)
summary = client.traces.get_summary(trace.trace_id)
print(f"Events: {summary.total_events}, Duration: {summary.duration_ms}ms")
print(f"Errors: {summary.error_count}, Tool calls: {summary.tool_call_count}")
  • get_or_create_trace() creates a TraceContext with an auto-generated trace_id (prefixed tr_).
  • trace.next_span_id() generates sequential span IDs so you can see event order.
  • Both events share the same trace_id, linking them as part of one run.
  • traces.get_summary() returns analytics: total events, duration, error count, tool call count, and agents involved.
Go Deeper
traces = client.traces.list(page=1, limit=20)
for t in traces:
print(f"{t.trace_id}: {t.event_count} events ({t.start_time}{t.end_time})")
events = client.traces.get_events("tr_run_42")
for e in events:
print(f" [{e.span_id}] {e.identifier} ({e.event_type})")

Pass an explicit trace_id to correlate events across microservices:

# Service A
trace = get_or_create_trace("tr_pipeline_run_99")
client_a.events.publish("step_a", {...}, channel_id=1, trace_id=trace.trace_id, ...)
# Service B — same trace_id, different channel
trace = get_or_create_trace("tr_pipeline_run_99")
client_b.events.publish("step_b", {...}, channel_id=2, trace_id=trace.trace_id, ...)

get_or_create_trace() uses Python’s contextvars. The trace context auto-propagates to other functions in the same thread and asyncio tasks spawned from the current task.

from axonpush import AsyncAxonPush, get_or_create_trace
async with AsyncAxonPush(api_key="ak_...", tenant_id="1", base_url="https://api.axonpush.xyz") as client:
trace = get_or_create_trace()
await client.events.publish(
"web_search", {"query": "AI agents"},
channel_id=1, agent_id="researcher",
trace_id=trace.trace_id, span_id=trace.next_span_id(),
event_type=EventType.AGENT_TOOL_CALL_START,
)