structlog
Forward structlog events to AxonPush via a non-destructive processor. The processor composes cleanly with the rest of your structlog chain: it doesn't modify the event dict flowing to downstream processors; it only emits a publish call as a side effect.
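To make "non-destructive" concrete, here is a minimal sketch of a processor with that shape. It is not the AxonPush implementation: `make_forwarding_processor` and the `publish` callable are hypothetical names used only for illustration. The one thing taken from structlog itself is the processor signature `(logger, method_name, event_dict)`.

```python
from typing import Any, Callable, MutableMapping

EventDict = MutableMapping[str, Any]


def make_forwarding_processor(publish: Callable[[dict], None]):
    """Build a structlog-style processor that publishes a copy of each event.

    Hypothetical helper for illustration; `publish` stands in for whatever
    sends the event to a remote service.
    """

    def processor(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
        # Publish a shallow copy so the side effect cannot mutate the original.
        publish(dict(event_dict))
        # Return the same dict untouched so downstream processors see no change.
        return event_dict

    return processor


published = []
forward = make_forwarding_processor(published.append)

event = {"event": "user signed in", "user_id": 42}
result = forward(None, "info", event)
```

Because the processor returns the very object it received, anything placed after it in the chain behaves exactly as it would without the forwarder.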
[!TIP] Non-blocking by default (v0.0.5+)
The processor enqueues each publish on a bounded in-memory queue and drains the queue from a single background daemon thread, so logging stays O(microseconds) on the caller's thread. Call `processor.flush(timeout=)` at known checkpoints (end of a Lambda invocation, end of a test) to guarantee delivery. Pass `mode="sync"` for blocking publishes. See the stdlib logging page for the Lambda / GCF / Azure Functions flush pattern; the `flush_after_invocation` decorator works with the structlog processor too.
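The queue-plus-worker design described in this tip can be sketched with the standard library alone. This is an illustration of the general technique, not AxonPush's code: the `BackgroundPublisher` class, its `submit` method, and the choice to drop records when the queue is full are all assumptions made for the example.

```python
import queue
import threading


class BackgroundPublisher:
    """Hypothetical stand-in for a non-blocking publisher (illustration only)."""

    def __init__(self, publish, maxsize=1000):
        self._publish = publish
        self._queue = queue.Queue(maxsize=maxsize)  # bounded in-memory queue
        self._pending = 0
        self._cond = threading.Condition()
        # Daemon thread: it never keeps the interpreter alive on its own.
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def submit(self, record):
        """Enqueue without blocking; return False if the queue is full."""
        with self._cond:
            try:
                self._queue.put_nowait(record)
            except queue.Full:
                return False  # assumption: drop rather than block the caller
            self._pending += 1
            return True

    def _drain(self):
        while True:
            record = self._queue.get()
            try:
                self._publish(record)
            except Exception:
                pass  # a failed publish must not kill the worker thread
            finally:
                with self._cond:
                    self._pending -= 1
                    self._cond.notify_all()

    def flush(self, timeout=1.0):
        """Wait until all queued records are handled; False on timeout."""
        with self._cond:
            return self._cond.wait_for(lambda: self._pending == 0, timeout=timeout)


sent = []
publisher = BackgroundPublisher(sent.append, maxsize=100)
for i in range(10):
    publisher.submit({"seq": i})
drained = publisher.flush(timeout=2.0)
```

The caller only pays for a `put_nowait`, which is why the logging call stays cheap. The cost is that records still in the queue are lost if the process exits before a flush, which is what the checkpoint advice above is guarding against.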
Installation
    pip install "axonpush[structlog]"

Then build the forwarder and add it to your structlog processor chain:

    import os

    import structlog
    from axonpush import AxonPush
    from axonpush.integrations.structlog import axonpush_structlog_processor

    client = AxonPush(
        api_key=os.environ["AXONPUSH_API_KEY"],
        tenant_id=os.environ["AXONPUSH_TENANT_ID"],
    )

    forwarder = axonpush_structlog_processor(
        client=client,
        channel_id=1,
        service_name="my-api",
        environment="production",
    )

    structlog.configure(
        processors=[
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            forwarder,  # ← place BEFORE the renderer
            structlog.processors.JSONRenderer(),
        ],
    )

[!TIP] Processor order matters
Place `forwarder` before the final renderer (`JSONRenderer`, `KeyValueRenderer`, or `ConsoleRenderer`). Renderers convert the event dict into a string for the final sink; once rendered, the structured fields are gone.
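The ordering rule is easy to see with a toy chain. The sketch below uses only the standard library: `run_chain`, `recording_forwarder`, and `json_renderer` are hypothetical stand-ins, with `json.dumps` playing the role of a renderer. It records what type of value the forwarder receives in each position.

```python
import json


def run_chain(processors, event_dict):
    """Pass an event through each processor in turn, feeding each result forward."""
    result = event_dict
    for proc in processors:
        result = proc(None, "info", result)
    return result


seen = []


def recording_forwarder(logger, method_name, event):
    # Record the type the forwarder receives at its position in the chain.
    seen.append(type(event).__name__)
    return event


def json_renderer(logger, method_name, event_dict):
    # Stand-in for a renderer: dict in, string out.
    return json.dumps(event_dict)


event = {"event": "user signed in", "user_id": 42}
run_chain([recording_forwarder, json_renderer], dict(event))  # forwarder first
run_chain([json_renderer, recording_forwarder], dict(event))  # forwarder last
# seen is now ["dict", "str"]
```

In the second run the forwarder only sees a JSON string, so there are no individual keys left to map into structured fields.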
Once structlog is configured, log as usual; every event is forwarded:

    log = structlog.get_logger("my_app")
    log.info("user signed in", user_id=42, method="oauth")
    log.warning("rate limit approaching", endpoint="/api/search", remaining=3)
    log.error("downstream timeout", endpoint="/api/search", elapsed_ms=5000)

    # Bound context is forwarded too
    request_log = log.bind(request_id="req-9f21")
    request_log.info("handling request", path="/chat")

Flushing and closing

    forwarder.flush(timeout=1.0)  # block until queue is drained
    forwarder.close()             # drain pending records and stop the worker

`close()` is also called automatically by `atexit` at interpreter exit.
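In short-lived environments such as serverless handlers, a common pattern is to flush in a `finally` block so that queued events are sent even when the handler raises. The sketch below shows that pattern with a hypothetical decorator, `flush_when_done`; it is not the library's `flush_after_invocation`, and `FakeForwarder` is a test double that exposes only a `flush(timeout=...)` method like the one shown above.

```python
import functools


def flush_when_done(forwarder, timeout=1.0):
    """Hypothetical decorator: flush after the wrapped function returns or raises."""

    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            finally:
                # Runs on success and on exceptions alike.
                forwarder.flush(timeout=timeout)

        return wrapper

    return decorate


class FakeForwarder:
    """Test double that records each flush call instead of sending anything."""

    def __init__(self):
        self.flush_calls = []

    def flush(self, timeout):
        self.flush_calls.append(timeout)


fake = FakeForwarder()


@flush_when_done(fake, timeout=0.5)
def handler(event, context):
    return {"statusCode": 200}


response = handler({}, None)
```

Keeping the timeout short matters here: the flush runs on the request path, so it bounds how long a slow publish can delay the response.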
Events
| Field | Value |
|---|---|
| `identifier` | The structlog logger name |
| `event_type` | `app.log` (or `agent.log` if `source="agent"`) |
| `payload.body` | The `event` key from the structlog dict |
| `payload.severityNumber` / `payload.severityText` | OTel severity mapped from the log level |
| `payload.attributes` | Every other key in the structlog event dict (bound context, keyword args, timestamps) |
| `payload.resource` | `service.name`, `service.version`, `deployment.environment` (if configured) |
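As a rough illustration of the mapping in this table, the sketch below turns a structlog-style event dict into an event with these fields. It is a guess at the shape, not the library's code: `to_event` is a hypothetical function, the `level` key is assumed to come from `add_log_level` and to be consumed by the severity mapping instead of being copied into the attributes, and the severity numbers are the base values of each range in the OpenTelemetry logs data model.

```python
# Base OTel SeverityNumber for each range; the text labels are an assumption.
SEVERITY = {
    "debug": (5, "DEBUG"),
    "info": (9, "INFO"),
    "warning": (13, "WARN"),
    "error": (17, "ERROR"),
    "critical": (21, "FATAL"),
}


def to_event(logger_name, event_dict, service_name=None, environment=None, source="app"):
    """Hypothetical mapping from a structlog event dict to the fields in the table."""
    attributes = dict(event_dict)  # copy so the caller's dict is not modified
    body = attributes.pop("event", None)
    level = str(attributes.pop("level", "info")).lower()
    number, text = SEVERITY.get(level, SEVERITY["info"])

    resource = {}
    if service_name:
        resource["service.name"] = service_name
    if environment:
        resource["deployment.environment"] = environment

    return {
        "identifier": logger_name,
        "event_type": "agent.log" if source == "agent" else "app.log",
        "payload": {
            "body": body,
            "severityNumber": number,
            "severityText": text,
            "attributes": attributes,
            "resource": resource,
        },
    }


mapped = to_event(
    "my_app",
    {"event": "rate limit approaching", "level": "warning",
     "endpoint": "/api/search", "remaining": 3},
    service_name="my-api",
    environment="production",
)
```

Under these assumptions, the `warning` call from the earlier example would arrive with `severityNumber` 13, the message in `payload.body`, and `endpoint` and `remaining` under `payload.attributes`.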