Skip to content

Realtime

Subscribe to events as they happen over MQTT-over-WSS, backed by AWS IoT Core. The TypeScript SDK exposes a single RealtimeClient built on mqtt.js (a runtime dependency of @axonpush/sdk, no extra install needed).

Available from @axonpush/sdk@0.0.6+.

Terminal window
npm install @axonpush/sdk
# or
bun add @axonpush/sdk
import { AxonPush } from "@axonpush/sdk";
const client = new AxonPush({
apiKey: process.env.AXONPUSH_API_KEY!,
tenantId: process.env.AXONPUSH_TENANT_ID!,
});
const rt = await client.connectRealtime();
await rt.subscribe(
{ channelId: "ch_..." },
(event) => console.log(event),
);
// rt is alive in the background; do other work, then:
await rt.disconnect();

connectRealtime() returns a RealtimeClient once the broker has accepted the connection. Subscribe callbacks may be sync or async — a failing callback is logged via the SDK’s onError hook and never crashes the reader.

await rt.subscribe(
{
channelId: "ch_...",
appId: "app_...",
agentId: "researcher",
eventType: "agent.message",
},
(event) => handle(event),
);

Filters with values become concrete MQTT topic segments; missing filters become wildcards, so the broker does the work. Drop a subscription with the same filter object you used to register it:

await rt.unsubscribe({ channelId: "ch_...", agentId: "researcher" });

You can also register a global handler that sees every incoming event:

const off = rt.onEvent((event) => log(event));
// later:
off();
await rt.publish({
channelId: "ch_...",
appId: "app_...",
identifier: "agent.message",
payload: { text: "hello" },
agentId: "researcher",
});

You don’t have to think about this — the SDK handles it — but for the curious:

  1. connectRealtime() calls POST /auth/iot-credentials with your API key (or JWT).
  2. The backend returns { presignedWssUrl, authorizerName, authToken, clientId, region, topicPrefix, envSlug, expiresAt, ... }. The URL is unsigned and carries ?x-amz-customauthorizer-name=NAME — there is no SigV4 signature. (Pre-0.0.6 SDKs used a SigV4-signed URL; that flow is gone.)
  3. The SDK calls mqtt.connect(presignedWssUrl, { wsOptions: { protocol: "mqttv5.0" }, protocolVersion: 5, username: authToken, password: "", clientId }).
  4. AWS IoT routes the MQTT CONNECT to the named custom JWT authorizer Lambda, which validates authToken and returns an IAM policy scoped to your org’s topic prefix.
  5. About 60s before expiresAt, the SDK fetches a fresh set of credentials, opens a second connection, swaps it in, restores subscriptions, and tears the old connection down. Refresh failures back off (5s → 15s → 30s → 60s) and surface through onError.

If the broker rejects the initial CONNECT, connectRealtime() rejects — your code is free to retry.

const rt = await client.connectRealtime({
environment: "production", // default env slug for publishes
credentialsRefreshLeadMs: 60_000, // refresh window before expiry
onError: (err) => report(err), // refresh / reconnect / callback failures
});