
structlog

Forward structlog events to AxonPush via a non-destructive processor. The processor composes cleanly with the rest of your structlog chain: it does not modify the event dict flowing to downstream processors; it only publishes the event as a side effect.
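To make "non-destructive" concrete, here is a minimal sketch of a processor with that shape. It uses only the standard structlog processor signature `(logger, method_name, event_dict)`; the names `make_side_effect_processor` and `publish` are hypothetical stand-ins, and this is not the AxonPush implementation.

```python
from typing import Any, Callable, MutableMapping

EventDict = MutableMapping[str, Any]


def make_side_effect_processor(publish: Callable[[dict], None]):
    """Build a structlog-style processor that publishes a copy and passes the event on.

    `publish` is a hypothetical callable standing in for the real network call.
    """

    def processor(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
        # Publish a shallow copy so later processors cannot change what was sent.
        publish(dict(event_dict))
        # Return the same dict, unmodified, for downstream processors.
        return event_dict

    return processor


published = []
forward = make_side_effect_processor(published.append)

event = {"event": "user signed in", "user_id": 42}
result = forward(None, "info", event)
```

Here `result` is the very same object as `event`, so whatever renderer comes next sees exactly what it would have seen without the forwarder in the chain.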

[!TIP] Non-blocking by default (v0.0.5+)

The processor puts each publish on a bounded in-memory queue and drains the queue from a single background daemon thread, so a logging call costs on the order of microseconds on the caller’s thread. Call processor.flush(timeout=...) at known checkpoints (end of a Lambda invocation, end of a test) to guarantee delivery. Pass mode="sync" for blocking publishes. See the stdlib logging page for the Lambda / GCF / Azure Functions flush pattern; the flush_after_invocation decorator works with the structlog processor too.
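The queue-and-worker pattern described in this tip can be sketched with the standard library alone. This is an illustration under stated assumptions, not AxonPush's code: the class name `BackgroundPublisher`, the drop-when-full policy, and the polling `flush` are all hypothetical choices, since the page does not say how a full queue is handled.

```python
import queue
import threading
import time


class BackgroundPublisher:
    """Bounded queue drained by one daemon thread (illustrative sketch)."""

    def __init__(self, publish, maxsize=1000):
        self._publish = publish  # hypothetical callable doing the slow work
        self._queue = queue.Queue(maxsize=maxsize)
        self.dropped = 0
        threading.Thread(target=self._drain, daemon=True).start()

    def submit(self, record):
        """Enqueue without blocking; drop the record if the queue is full (assumed policy)."""
        try:
            self._queue.put_nowait(record)
            return True
        except queue.Full:
            self.dropped += 1
            return False

    def _drain(self):
        while True:
            record = self._queue.get()
            try:
                self._publish(record)
            except Exception:
                pass  # a failed publish must not kill the worker
            finally:
                self._queue.task_done()

    def flush(self, timeout=1.0):
        """Wait until every submitted record was handled; False on timeout."""
        deadline = time.monotonic() + timeout
        # unfinished_tasks (a CPython Queue attribute) counts records that are
        # queued or currently being published.
        while self._queue.unfinished_tasks:
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.001)
        return True


sent = []
publisher = BackgroundPublisher(sent.append, maxsize=100)
for i in range(5):
    publisher.submit({"event": "tick", "n": i})
drained = publisher.flush(timeout=2.0)
```

Because a single worker drains the queue, records are published in submission order, and the caller only pays for a `put_nowait` per log call.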

pip install "axonpush[structlog]"

import os

import structlog
from axonpush import AxonPush
from axonpush.integrations.structlog import axonpush_structlog_processor

client = AxonPush(
    api_key=os.environ["AXONPUSH_API_KEY"],
    tenant_id=os.environ["AXONPUSH_TENANT_ID"],
)

forwarder = axonpush_structlog_processor(
    client=client,
    channel_id=1,
    service_name="my-api",
    environment="production",
)

structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        forwarder,  # ← place BEFORE the renderer
        structlog.processors.JSONRenderer(),
    ],
)

[!TIP] Processor order matters

Place forwarder before the final renderer (JSONRenderer, KeyValueRenderer, or ConsoleRenderer). Renderers convert the event dict into a string for the final sink; once rendered, the structured fields are gone.
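The effect of ordering can be seen in a small stdlib-only simulation. The `run_chain` helper and the two stand-in processors below are hypothetical; they only mimic how a processor chain threads each return value into the next processor.

```python
import json


def run_chain(processors, event_dict):
    """Apply processors in order, feeding each return value to the next one."""
    value = event_dict
    for proc in processors:
        value = proc(None, "info", value)
    return value


seen = []


def recording_forwarder(logger, method_name, event):
    # Record the type this position in the chain receives, then pass it on.
    seen.append(type(event).__name__)
    return event


def json_renderer(logger, method_name, event):
    # Stand-in for a final renderer: dict in, string out.
    return json.dumps(event)


# Forwarder before the renderer: it receives the structured dict.
run_chain([recording_forwarder, json_renderer], {"event": "user signed in", "user_id": 42})
# Forwarder after the renderer: it receives an already-rendered string.
run_chain([json_renderer, recording_forwarder], {"event": "user signed in", "user_id": 42})
```

After both runs, `seen` is `["dict", "str"]`: in the second ordering the forwarder no longer has individual fields to work with.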

log = structlog.get_logger("my_app")
log.info("user signed in", user_id=42, method="oauth")
log.warning("rate limit approaching", endpoint="/api/search", remaining=3)
log.error("downstream timeout", endpoint="/api/search", elapsed_ms=5000)
# Bound context is forwarded too
request_log = log.bind(request_id="req-9f21")
request_log.info("handling request", path="/chat")
forwarder.flush(timeout=1.0) # block until queue is drained
forwarder.close() # drain pending records and stop the worker

close() is also called automatically by atexit at interpreter exit.
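Since atexit only fires at interpreter exit, code that needs delivery at an earlier checkpoint can flush per unit of work. The sketch below wraps a handler so that flush(timeout=...) runs even when the handler raises. The decorator name `flush_when_done` and the `FakeProcessor` test double are hypothetical; only the `flush(timeout=...)` method comes from this page, and this is not the library's own flush_after_invocation decorator.

```python
import functools


def flush_when_done(processor, timeout=1.0):
    """Hypothetical decorator: flush `processor` after each call, even on error."""

    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            finally:
                # Runs on both normal return and exception.
                processor.flush(timeout=timeout)

        return wrapper

    return decorate


class FakeProcessor:
    """Test double exposing only the flush(timeout=...) method used above."""

    def __init__(self):
        self.flush_calls = []

    def flush(self, timeout):
        self.flush_calls.append(timeout)


fake = FakeProcessor()


@flush_when_done(fake, timeout=0.5)
def handler(event, context=None):
    return {"statusCode": 200}


response = handler({"path": "/chat"})
```

In real use, the object passed to the decorator would be the forwarder created earlier rather than the fake.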

| Field | Value |
| --- | --- |
| identifier | The structlog logger name |
| event_type | app.log (or agent.log if source="agent") |
| payload.body | The event key from the structlog dict |
| payload.severityNumber / payload.severityText | OTel severity mapped from the log level |
| payload.attributes | Every other key in the structlog event dict (bound context, keyword args, timestamps) |
| payload.resource | service.name, service.version, deployment.environment (if configured) |
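As a rough illustration of this mapping, the sketch below turns a structlog event dict into a payload of the same shape. The function `to_payload` is hypothetical, and several details are assumptions rather than documented behavior: that the `level` key is removed from the attributes, that unknown levels fall back to INFO, and the exact level-to-severity table (the numbers are the first value of each OpenTelemetry severity range).

```python
# First severity number of each OpenTelemetry range, keyed by structlog level name.
# The choice of this exact table is an assumption for illustration.
OTEL_SEVERITY = {
    "debug": (5, "DEBUG"),
    "info": (9, "INFO"),
    "warning": (13, "WARN"),
    "error": (17, "ERROR"),
    "critical": (21, "FATAL"),
}


def to_payload(event_dict, service_name=None, service_version=None, environment=None):
    """Hypothetical mapper from a structlog event dict to the payload fields above."""
    attributes = dict(event_dict)  # copy so the caller's dict is untouched
    body = attributes.pop("event", None)
    level = attributes.pop("level", "info")  # assumed: level is not kept as an attribute
    number, text = OTEL_SEVERITY.get(level, (9, "INFO"))

    # Only configured resource fields are included.
    resource = {}
    if service_name:
        resource["service.name"] = service_name
    if service_version:
        resource["service.version"] = service_version
    if environment:
        resource["deployment.environment"] = environment

    return {
        "body": body,
        "severityNumber": number,
        "severityText": text,
        "attributes": attributes,
        "resource": resource,
    }


payload = to_payload(
    {
        "event": "rate limit approaching",
        "level": "warning",
        "endpoint": "/api/search",
        "remaining": 3,
    },
    service_name="my-api",
    environment="production",
)
```

With this input, the message lands in `body`, the level becomes severity 13 / WARN, and `endpoint` and `remaining` end up under `attributes`.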