Skip to content

Python stdlib `logging`

Ship records from Python’s built-in logging module to AxonPush as OpenTelemetry-shaped app.log events. Works with any framework that uses stdlib logging — FastAPI, Flask, Django, aiohttp, Starlette, etc.

[!TIP] Non-blocking by default (v0.0.5+)

emit() is O(microseconds) — it pushes the publish kwargs onto a bounded in-memory queue and returns immediately. A single daemon thread drains the queue and calls client.events.publish(...) in the background, so logging from an async request handler no longer stalls the event loop. The queue is bounded (default 1000 records); overflow drops records and logs a rate-limited warning. Call handler.flush(timeout=) to block until the queue is drained (useful in tests, Lambda handlers, or before a shutdown). Pass mode="sync" to the constructor to fall back to the old blocking behavior.

[!TIP] Self-recursion filter is built in

AxonPushLoggingHandler installs a filter by default that drops records from httpx, httpcore, and the SDK’s own axonpush logger. Without it, each publish would trigger an httpx INFO log (HTTP Request: POST /event 201) that would get re-shipped, and the channel would fill with echoing POSTs. The filter is always-on and cannot be disabled; you can add more excluded prefixes via exclude_loggers=[...].

Terminal window
pip install axonpush

No extras needed — the stdlib handler ships with the base package.

import logging
from axonpush import AxonPush
from axonpush.integrations.logging_handler import AxonPushLoggingHandler
client = AxonPush(api_key="ak_...", tenant_id="1")
handler = AxonPushLoggingHandler(
client=client,
channel_id=1,
service_name="my-api",
environment="production",
)
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(logging.INFO)

Log normally — extra={} kwargs become OTel attributes:

logger = logging.getLogger("my_app.orders")
logger.info("order created", extra={"order_id": 1234, "total": 49.99})
logger.warning("stock low for sku=%s", "A-42", extra={"remaining": 3})
try:
raise RuntimeError("payment gateway timeout")
except RuntimeError:
logger.exception("failed to charge card", extra={"order_id": 1234})

Construct the handler in your main.py before the FastAPI() app is instantiated so that startup logs go through it too.

main.py
import logging
import os
from fastapi import FastAPI
from axonpush import AxonPush
from axonpush.integrations.logging_handler import AxonPushLoggingHandler
client = AxonPush(
api_key=os.environ["AXONPUSH_API_KEY"],
tenant_id=os.environ["AXONPUSH_TENANT_ID"],
)
axonpush_handler = AxonPushLoggingHandler(
client=client,
channel_id=int(os.environ["AXONPUSH_CHANNEL_ID_LOGGING"]),
service_name="my-api",
)
axonpush_handler.setLevel(logging.INFO)
# Root handler catches non-uvicorn loggers (app code, third-party libs).
logging.getLogger().addHandler(axonpush_handler)
# Uvicorn's default LOGGING_CONFIG sets uvicorn.propagate=False, which
# prevents uvicorn.error records from ever reaching the root logger. So
# attach the handler to uvicorn.error directly — otherwise your startup
# logs, request errors, and everything else you emit via
# logging.getLogger("uvicorn.error") will be invisible to AxonPush.
logging.getLogger("uvicorn.error").addHandler(axonpush_handler)
# Optional: ship uvicorn access logs (one event per HTTP request).
# logging.getLogger("uvicorn.access").addHandler(axonpush_handler)
logging.getLogger().setLevel(logging.INFO)
app = FastAPI()

[!CAUTION] Uvicorn propagation trap

This catches everyone eventually. Uvicorn installs its own LOGGING_CONFIG when the server starts, which sets:

"uvicorn": {"handlers": ["default"], "propagate": False}
"uvicorn.access": {"handlers": ["access"], "propagate": False}

Records emitted on uvicorn.error propagate up to uvicorn, fire its default StreamHandler (what prints INFO: to your console), then hit propagate=False and stop. They never reach the root logger. So a root-only attach captures every logger in your app EXCEPT uvicorn.* — which happens to include most of the logs you actually wrote in main.py via logging.getLogger("uvicorn.error").

The two-line fix in the snippet above (addHandler on both root AND uvicorn.error) covers the gap. Don’t attach to uvicorn itself — that would get every record twice because root and uvicorn.error handlers both fire during propagation.

[!TIP] Non-blocking is the default now

Since v0.0.5, emit() never touches the network on the caller’s thread — the publish runs on a background worker thread. You can ignore the old “pass an AsyncAxonPush client to avoid blocking” advice; a plain AxonPush client is fine in async request handlers.

AWS Lambda / Google Cloud Functions / Azure Functions

Section titled “AWS Lambda / Google Cloud Functions / Azure Functions”

Serverless containers are frozen between invocations, so a background worker thread doesn’t get a chance to drain while the process is paused. To guarantee delivery, call handler.flush() at the end of each invocation. The @flush_after_invocation decorator wraps your handler function and flushes in a finally: block:

import logging
import os
from axonpush import AxonPush
from axonpush.integrations.logging_handler import (
AxonPushLoggingHandler,
flush_after_invocation,
)
client = AxonPush(
api_key=os.environ["AXONPUSH_API_KEY"],
tenant_id=os.environ["AXONPUSH_TENANT_ID"],
)
handler = AxonPushLoggingHandler(
client=client,
channel_id=int(os.environ["AXONPUSH_CHANNEL_ID_LOGGING"]),
service_name="my-lambda",
)
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(logging.INFO)
@flush_after_invocation(handler)
def lambda_handler(event, context):
logging.info("processing event", extra={"event_id": event["id"]})
return {"statusCode": 200}

Performance stays good: emit() is still O(microseconds), and flush() runs once per invocation at the end — not once per log call. The handler auto-detects Lambda (via AWS_LAMBDA_FUNCTION_NAME), Google Cloud Functions (via FUNCTION_TARGET), and Azure Functions (via AZURE_FUNCTIONS_ENVIRONMENT) at construction time and logs a one-time reminder to use flush_after_invocation.

Pass *handlers to the decorator to flush multiple integrations in one wrap:

@flush_after_invocation(logging_handler, otel_exporter, loguru_sink)
def lambda_handler(event, context):
...

Or call handler.flush(timeout=1.0) inline at a manual checkpoint if you can’t wrap the handler function (e.g. a framework-managed entry point).

Gunicorn --preload / Celery --pool=prefork

Section titled “Gunicorn --preload / Celery --pool=prefork”

The background publisher is fork-safe via os.register_at_fork. When the parent process constructs the handler (module-level or Gunicorn’s preload phase), the child fork gets a fresh queue and a fresh worker thread after forking — the parent’s stale thread reference is cleared and no publish races leak across processes. You can keep constructing the handler once at module level.

Same root-logger pattern as FastAPI, or attach to app.logger if you only want Flask-scoped logs:

from flask import Flask
from axonpush import AxonPush
from axonpush.integrations.logging_handler import AxonPushLoggingHandler
client = AxonPush(api_key="ak_...", tenant_id="1")
app = Flask(__name__)
handler = AxonPushLoggingHandler(
client=client,
channel_id=1,
service_name="my-flask-app",
)
app.logger.addHandler(handler)

Werkzeug’s dev-server access logs are not excluded by default — they’re not a feedback-loop risk. Add exclude_loggers=["werkzeug"] if you don’t want them shipped to AxonPush.

Django configures logging via a LOGGING dict in settings.py. dictConfig only supports primitive kwargs (strings, ints), so you can’t pass a pre-built client instance — instead, let the handler build its own client from credential kwargs (or from AXONPUSH_API_KEY / AXONPUSH_TENANT_ID env vars).

settings.py
import os
LOGGING = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"verbose": {
"format": "{asctime} {levelname} {name} {message}",
"style": "{",
},
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": "verbose",
},
"axonpush": {
"class": "axonpush.integrations.logging_handler.AxonPushLoggingHandler",
"channel_id": int(os.environ["AXONPUSH_CHANNEL_ID_LOGGING"]),
"api_key": os.environ["AXONPUSH_API_KEY"], # or env fallback
"tenant_id": os.environ["AXONPUSH_TENANT_ID"], # or env fallback
"service_name": "my-django-app",
"environment": "production",
# Optional: block noisy Django sub-loggers from shipping to AxonPush.
# They still appear in the console handler.
"exclude_loggers": ["django.db.backends"],
},
},
"root": {
"handlers": ["console", "axonpush"],
"level": "INFO",
},
"loggers": {
"django.request": {"handlers": ["console", "axonpush"], "level": "WARNING", "propagate": False},
"django.server": {"handlers": ["console"], "level": "INFO", "propagate": False},
},
}

If AXONPUSH_API_KEY and AXONPUSH_TENANT_ID are set in the environment (e.g., via django-environ or a .env file loaded in settings.py), you can omit the api_key / tenant_id keys from the handler config entirely — the handler reads them from os.environ as a fallback.

[!TIP] Chatty sub-loggers

django.db.backends logs every SQL query at DEBUG level and can flood your AxonPush channel. Either add it to exclude_loggers=[...] or pin its level to WARNING in the loggers block.

AxonPushLoggingHandler(
*,
client=None, # Optional AxonPush | AsyncAxonPush
channel_id: int, # required
api_key=None, # dictConfig path (or env var)
tenant_id=None, # dictConfig path (or env var)
base_url=None, # dictConfig path (or env var)
source="app", # "app" → app.log, "agent" → agent.log
service_name=None,
service_version=None,
environment=None,
agent_id=None,
level=logging.NOTSET,
exclude_loggers=None, # additional excluded logger-name prefixes
mode=None, # "background" (default) | "sync"
queue_size=1000, # max records buffered before drop
shutdown_timeout=2.0, # seconds to wait for drain on close()
)

Pass either client= or the credential kwargs — not both. If neither is provided, the handler falls back to AXONPUSH_API_KEY / AXONPUSH_TENANT_ID / AXONPUSH_BASE_URL environment variables.

ValueWhen to use
"background" (default)FastAPI, Django, Flask, Celery, any long-running process. Also Lambda / GCF / Azure Functions with @flush_after_invocation. Non-blocking.
"sync"One-shot scripts, debug sessions, deterministic tests. emit() blocks on the HTTP publish.
  • handler.flush(timeout=None) — block until the background queue is drained, or until timeout seconds have elapsed (no-op in mode="sync").
  • handler.close() — drain pending records, stop the background worker, and release the handler. Also called automatically by logging.shutdown() at interpreter exit.

Each record becomes an event with:

FieldValue
identifierThe stdlib logger name (e.g. my_app.orders)
event_typeapp.log (or agent.log if source="agent")
payload.bodyThe formatted log message
payload.severityNumber / payload.severityTextOTel severity mapped from the Python level (DEBUG=5, INFO=9, WARN=13, ERROR=17, FATAL=21)
payload.attributesFile path, function name, line number, logger name, thread name, PID, plus any extra={...} kwargs
payload.resourceservice.name, service.version, deployment.environment (if configured)