Skip to content

Handle Errors, Retries, and Rate Limits

Ship to production with confidence. The SDK is fail-open by default — connection failures never crash your app.

The axonpush SDK will never crash your application out of the box. When the API is unreachable (network errors, DNS failures, timeouts), the SDK silently suppresses the error and logs a warning. Methods return None or [] instead of raising.

from axonpush import AxonPush
# Default behavior: fail_open=True
with AxonPush(api_key="ak_...", tenant_id="1", base_url="https://api.axonpush.xyz") as client:
# If axonpush is down, this returns None — your app keeps running
event = client.events.publish(
"web_search", {"query": "AI agents"},
channel_id=1, agent_id="researcher",
)

All framework integrations (LangChain, Anthropic, OpenAI Agents, CrewAI, Deep Agents) always suppress errors — observability will never break your AI pipeline.

HTTP errors (bad credentials, invalid requests) still raise typed exceptions so you can fix configuration issues. These only fire when the API is reachable but returns an error.

from axonpush import AxonPush
from axonpush.exceptions import RateLimitError, AuthenticationError, ServerError
import time
with AxonPush(api_key="ak_...", tenant_id="1", base_url="https://api.axonpush.xyz") as client:
try:
event = client.events.publish(
"web_search", {"query": "AI agents"},
channel_id=1, agent_id="researcher",
)
except RateLimitError as e:
print(f"Rate limited. Retry after {e.retry_after}s")
time.sleep(e.retry_after or 1)
except AuthenticationError:
print("Invalid API key — check your credentials")
except ServerError:
print("axonpush is temporarily unavailable — fall back to local logging")
AxonPushError (base)
├── APIConnectionError → Network unreachable (only raised when fail_open=False)
├── AuthenticationError → HTTP 401
├── ForbiddenError → HTTP 403
├── NotFoundError → HTTP 404
├── ValidationError → HTTP 400
├── RateLimitError → HTTP 429 (includes retry_after)
└── ServerError → HTTP 5xx
Go Deeper
from axonpush.exceptions import RateLimitError, ServerError
def publish_with_retry(client, max_retries=3, **kwargs):
for attempt in range(max_retries):
try:
return client.events.publish(**kwargs)
except RateLimitError as e:
wait = e.retry_after or (2 ** attempt)
time.sleep(wait)
except ServerError:
time.sleep(2 ** attempt)
raise RuntimeError(f"Failed after {max_retries} retries")

Record failures in your trace before re-raising:

from axonpush import EventType, get_or_create_trace
from axonpush.exceptions import AxonPushError
trace = get_or_create_trace()
try:
result = call_external_tool()
except Exception as e:
try:
client.events.publish(
"tool_failure",
{"error": str(e), "tool": "web_search"},
channel_id=1,
agent_id="researcher",
trace_id=trace.trace_id,
event_type=EventType.AGENT_ERROR,
)
except AxonPushError:
pass # don't let observability failures break the agent
raise
from axonpush import AsyncAxonPush
from axonpush.exceptions import RateLimitError
import asyncio
async with AsyncAxonPush(api_key="ak_...", tenant_id="1", base_url="https://api.axonpush.xyz") as client:
try:
event = await client.events.publish(
"web_search", {"query": "AI agents"},
channel_id=1, agent_id="researcher",
)
except RateLimitError as e:
await asyncio.sleep(e.retry_after or 1)