Loguru
Forward Loguru records to AxonPush as OpenTelemetry-shaped app.log events.
[!TIP] Non-blocking by default (v0.0.5+)
The sink pushes publishes onto a bounded in-memory queue and drains them from a single background daemon thread —
logger.info(...)stays O(microseconds) on the caller’s thread. Callsink.flush(timeout=)at known checkpoints (end of a Lambda invocation, end of a test) to guarantee delivery. Passmode="sync"for blocking publishes. See the stdlib logging page for the Lambda / GCF / Azure Functions flush pattern — theflush_after_invocationdecorator works with the Loguru sink too.
Installation
Section titled “Installation”pip install "axonpush[loguru]"import osfrom loguru import loggerfrom axonpush import AxonPushfrom axonpush.integrations.loguru import create_axonpush_loguru_sink
client = AxonPush( api_key=os.environ["AXONPUSH_API_KEY"], tenant_id=os.environ["AXONPUSH_TENANT_ID"],)
sink = create_axonpush_loguru_sink( client=client, channel_id=1, service_name="my-api", environment="production",)
# serialize=True is REQUIRED — it tells Loguru to pass a JSON string# of the record to the sink, which the AxonPush sink parses.logger.add(sink, serialize=True)Log normally — Loguru bind() context and keyword args become attributes:
logger.info("user signed in", user_id=42, method="oauth")logger.warning("rate limit approaching", endpoint="/api/search", remaining=3)
try: raise RuntimeError("downstream timeout")except RuntimeError: logger.exception("search backend failed", endpoint="/api/search")
# Bound context is forwarded toorequest_logger = logger.bind(request_id="req-9f21", user_id=42)request_logger.info("handling request")Flushing and closing
Section titled “Flushing and closing”sink.flush(timeout=1.0) # block until queue is drained, up to 1 secondsink.close() # drain pending records and stop the background workersink.close() is also called automatically by atexit at interpreter
exit, so a single long-running process doesn’t need to call it explicitly.
Removing the sink
Section titled “Removing the sink”Sinks are identified by the handle returned from logger.add():
sink_id = logger.add(sink, serialize=True)...sink.flush(timeout=1.0)logger.remove(sink_id)sink.close()Events
Section titled “Events”| Field | Value |
|---|---|
identifier | The record’s module name |
event_type | app.log (or agent.log if source="agent") |
payload.body | The Loguru message |
payload.severityNumber / payload.severityText | OTel severity mapped from the Loguru level |
payload.attributes | File/function/line metadata plus any bound context and keyword args |
payload.resource | service.name, service.version, deployment.environment (if configured) |