Ship records from Python’s built-in logging module to AxonPush as
OpenTelemetry-shaped app.log events. Works with any framework that uses
stdlib logging — FastAPI, Flask, Django, aiohttp, Starlette, etc.
[!TIP]
Non-blocking by default (v0.0.5+)
emit() is O(microseconds) — it pushes the publish kwargs onto a bounded
in-memory queue and returns immediately. A single daemon thread drains
the queue and calls client.events.publish(...) in the background, so
logging from an async request handler no longer stalls the event loop.
The queue is bounded (default 1000 records); overflow drops records and
logs a rate-limited warning. Call handler.flush(timeout=) to block
until the queue is drained (useful in tests, Lambda handlers, or before
a shutdown). Pass mode="sync" to the constructor to fall back to the
old blocking behavior.
[!TIP]
Self-recursion filter is built in
AxonPushLoggingHandler installs a filter by default that drops records from
httpx, httpcore, and the SDK’s own axonpush logger. Without it, each
publish would trigger an httpx INFO log (HTTP Request: POST /event 201)
that would get re-shipped, and the channel would fill with echoing POSTs.
The filter is always-on and cannot be disabled; you can add more
excluded prefixes via exclude_loggers=[...].
Records emitted on uvicorn.error propagate up to uvicorn, fire its
default StreamHandler (what prints INFO: to your console), then hit
propagate=False and stop. They never reach the root logger. So a
root-only attach captures every logger in your app EXCEPT uvicorn.* —
which happens to include most of the logs you actually wrote in main.py
via logging.getLogger("uvicorn.error").
The two-line fix in the snippet above (addHandler on both root AND
uvicorn.error) covers the gap. Don’t attach to uvicorn itself — that
would get every record twice because root and uvicorn.error handlers
both fire during propagation.
[!TIP]
Non-blocking is the default now
Since v0.0.5, emit() never touches the network on the caller’s thread —
the publish runs on a background worker thread. You can ignore the old
“pass an AsyncAxonPush client to avoid blocking” advice; a plain
AxonPush client is fine in async request handlers.
AWS Lambda / Google Cloud Functions / Azure Functions
Serverless containers are frozen between invocations, so a background
worker thread doesn’t get a chance to drain while the process is paused.
To guarantee delivery, call handler.flush() at the end of each
invocation. The @flush_after_invocation decorator wraps your handler
function and flushes in a finally: block:
import logging
import os
from axonpush import AxonPush
from axonpush.integrations.logging_handler import (
Performance stays good: emit() is still O(microseconds), and flush()
runs once per invocation at the end — not once per log call. The
handler auto-detects Lambda (via AWS_LAMBDA_FUNCTION_NAME), Google
Cloud Functions (via FUNCTION_TARGET), and Azure Functions (via
AZURE_FUNCTIONS_ENVIRONMENT) at construction time and logs a one-time
reminder to use flush_after_invocation.
Pass *handlers to the decorator to flush multiple integrations in one
wrap:
The background publisher is fork-safe via os.register_at_fork. When
the parent process constructs the handler (module-level or Gunicorn’s
preload phase), the child fork gets a fresh queue and a fresh worker
thread after forking — the parent’s stale thread reference is cleared
and no publish races leak across processes. You can keep constructing
the handler once at module level.
Same root-logger pattern as FastAPI, or attach to app.logger if you only
want Flask-scoped logs:
from flask import Flask
from axonpush import AxonPush
from axonpush.integrations.logging_handler import AxonPushLoggingHandler
client =AxonPush(api_key="ak_...",tenant_id="1")
app =Flask(__name__)
handler =AxonPushLoggingHandler(
client=client,
channel_id=1,
service_name="my-flask-app",
)
app.logger.addHandler(handler)
Werkzeug’s dev-server access logs are not excluded by default — they’re
not a feedback-loop risk. Add exclude_loggers=["werkzeug"] if you don’t
want them shipped to AxonPush.
Django configures logging via a LOGGING dict in settings.py. dictConfig
only supports primitive kwargs (strings, ints), so you can’t pass a pre-built
client instance — instead, let the handler build its own client from
credential kwargs (or from AXONPUSH_API_KEY / AXONPUSH_TENANT_ID env vars).
If AXONPUSH_API_KEY and AXONPUSH_TENANT_ID are set in the environment
(e.g., via django-environ or a .env file loaded in settings.py), you
can omit the api_key / tenant_id keys from the handler config entirely —
the handler reads them from os.environ as a fallback.
[!TIP]
Chatty sub-loggers
django.db.backends logs every SQL query at DEBUG level and can flood your
AxonPush channel. Either add it to exclude_loggers=[...] or pin its level
to WARNING in the loggers block.
queue_size=1000,# max records buffered before drop
shutdown_timeout=2.0,# seconds to wait for drain on close()
)
Pass eitherclient=or the credential kwargs — not both. If
neither is provided, the handler falls back to AXONPUSH_API_KEY /
AXONPUSH_TENANT_ID / AXONPUSH_BASE_URL environment variables.
handler.flush(timeout=None) — block until the background queue is drained, or until timeout seconds have elapsed (no-op in mode="sync").
handler.close() — drain pending records, stop the background worker, and release the handler. Also called automatically by logging.shutdown() at interpreter exit.