Realtime — live updates (and the cache + collab siblings)
Most of the stack is request/response: the browser asks, a server function
answers, the page re-renders. Realtime is the one place the server speaks first —
it pushes an event and every subscribed browser hears it without polling. The
transport is Centrifugo (a standalone WebSocket server),
wrapped by @tikab-interactive/fusion-realtime into three small functions and one
browser subscribe helper.
Like every optional capability here, it is env-gated: with no Centrifugo
configured, publish() is a no-op and the app keeps working — you just lose the
live push and fall back to whatever refresh the page already does.
Coming from React + Django? This is Django Channels. Centrifugo plays the role of your ASGI server + channel layer (the Redis pub-sub Channels uses behind group_send); publish(channel, data) is channel_layer.group_send(...); the browser subscribe() is the JS that opens the Channels WebSocket and handles onmessage. The difference: Centrifugo is its own process you run, not code inside your app server, so your server publishes over HTTP and never holds the sockets itself.
The shape of it
The package is pure transport — no app types, no UI strings. It splits cleanly by where the secrets live:
| Side | Module | Job |
|---|---|---|
| Server | fusion-realtime (src/index.ts) | Mint a connection token; publish() a message over Centrifugo's HTTP API |
| Browser | fusion-realtime/client (src/client.ts) | subscribe() to a channel over the WebSocket; receive publications |
The two halves never overlap. The HMAC secret and the API key are server-only, so the browser entry point is the only thing client code is allowed to import — and a browser stub throws if you reach for the server half by mistake.
How a live update flows
Four moving parts: your server function, Centrifugo, the user's browser, and a server-minted token that ties them together.
The key idea: the browser never gets a secret. It gets a short-lived JWT that proves who it is, opens the socket with that, and from then on Centrifugo decides what it's allowed to hear. Publishing is a completely separate path: a server-to-server HTTP call authenticated with the API key.
1. The token handshake
A WebSocket the browser opens needs to be authenticated as the signed-in user.
Centrifugo does this with a connection token: an HS256 JWT whose sub is the
user id, signed with a secret both your server and Centrifugo share. Your server
mints it; Centrifugo verifies it on connect. issueConnectionToken in
src/index.ts is the whole minting step:
/**
* Mint a Centrifugo connection token: an HS256 JWT whose `sub` is the user id,
* signed with the shared HMAC secret. Centrifugo verifies it on WebSocket
* connect, so the socket is authenticated as that user. Default TTL 1 hour.
*/
export function issueConnectionToken(userId: string, ttlSeconds = 3600): string {
const secret = process.env.CENTRIFUGO_TOKEN_HMAC_SECRET;
if (!secret) throw new Error("CENTRIFUGO_TOKEN_HMAC_SECRET is not set");
const header = base64url(JSON.stringify({ alg: "HS256", typ: "JWT" }));
// `exp` would need a wall clock; the caller passes ttl and we let the
// platform clock fill it in — kept simple and stateless.
const now = Math.floor(Date.now() / 1000);
const payload = base64url(JSON.stringify({ sub: userId, exp: now + ttlSeconds }));
const signature = createHmac("sha256", secret).update(`${header}.${payload}`).digest("base64url");
return `${header}.${payload}.${signature}`;
}
/** The browser WebSocket endpoint, e.g. ws://localhost:8000/connection/websocket. */
export function realtimeWsUrl(): string {
return process.env.CENTRIFUGO_WS_URL ?? "ws://localhost:8000/connection/websocket";
}
No JWT library and no state, just node:crypto's createHmac. The token defaults to a
one-hour TTL and is handed straight to the browser. Because it's signed with the
shared secret, Centrifugo trusts it without calling back to your app.
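To make the "trusts it without calling back" point concrete, here is a minimal sketch of what any HS256 verifier does with such a token: recompute the HMAC over `header.payload`, compare it in constant time, then check `exp`. The names `mintForDemo` and `verifyConnectionToken` are hypothetical and not part of fusion-realtime; the mint step only mirrors the excerpt above, and Centrifugo's real verification is more thorough than this.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Hypothetical helper: base64url-encode a UTF-8 string.
function b64url(input: string): string {
  return Buffer.from(input, "utf8").toString("base64url");
}

// Mirrors the minting excerpt, but takes the secret as an argument so the
// sketch does not depend on process.env.
function mintForDemo(userId: string, secret: string, ttlSeconds = 3600): string {
  const header = b64url(JSON.stringify({ alg: "HS256", typ: "JWT" }));
  const exp = Math.floor(Date.now() / 1000) + ttlSeconds;
  const payload = b64url(JSON.stringify({ sub: userId, exp }));
  const sig = createHmac("sha256", secret).update(`${header}.${payload}`).digest("base64url");
  return `${header}.${payload}.${sig}`;
}

// Returns the claims when the signature matches and the token is unexpired,
// otherwise null. No network call and no stored state are involved.
function verifyConnectionToken(
  token: string,
  secret: string,
): { sub: string; exp: number } | null {
  const parts = token.split(".");
  const [header, payload, signature] = parts;
  if (parts.length !== 3 || !header || !payload || !signature) return null;

  // Recompute the signature with the shared secret and compare in constant time.
  const expected = createHmac("sha256", secret).update(`${header}.${payload}`).digest();
  const given = Buffer.from(signature, "base64url");
  if (given.length !== expected.length || !timingSafeEqual(given, expected)) return null;

  let claims: unknown;
  try {
    claims = JSON.parse(Buffer.from(payload, "base64url").toString("utf8"));
  } catch {
    return null;
  }
  if (typeof claims !== "object" || claims === null) return null;
  const { sub, exp } = claims as { sub?: unknown; exp?: unknown };
  if (typeof sub !== "string" || typeof exp !== "number") return null;

  // Reject tokens whose expiry is in the past.
  if (exp <= Math.floor(Date.now() / 1000)) return null;
  return { sub, exp };
}
```

A token minted with one secret fails verification under any other secret, which is why the value in your app's environment has to match the one Centrifugo is started with.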
2. Subscribing in the browser
realtimeWsUrl() gives the browser the WebSocket endpoint (default
ws://localhost:8000/connection/websocket). With { url, token, channel } in hand,
the browser calls subscribe from fusion-realtime/client — a thin wrapper over the
centrifuge client:
import { Centrifuge, type Subscription } from "centrifuge";
/**
* Browser half of @tikab-interactive/fusion-realtime — a thin wrapper over the
* centrifuge client. The consumer fetches `{ url, token, channel }` from a
* server function (the token is minted there), then calls `subscribe` to
* receive live messages. Pure transport: no app types, no strings.
*/
export interface RealtimeHandle {
/** Close the subscription and the socket. */
close: () => void;
}
/**
* Connect with a server-minted token and subscribe to one channel. `onMessage`
* fires for every publication; `onState` reports connect/disconnect so the UI
* can show a live/offline dot. Returns a handle whose `close()` tears it all
* down (call it on unmount).
*/
export function subscribe(opts: {
url: string;
token: string;
channel: string;
onMessage: (data: unknown) => void;
onState?: (connected: boolean) => void;
}): RealtimeHandle {
const centrifuge = new Centrifuge(opts.url, { token: opts.token });
centrifuge.on("connected", () => opts.onState?.(true));
centrifuge.on("disconnected", () => opts.onState?.(false));
const sub: Subscription = centrifuge.newSubscription(opts.channel);
sub.on("publication", (ctx) => opts.onMessage(ctx.data));
sub.subscribe();
centrifuge.connect();
return {
close: () => {
sub.unsubscribe();
centrifuge.disconnect();
},
};
}
onMessage fires for every publication on the channel; onState flips a live/offline
indicator. The returned RealtimeHandle.close() tears the socket down; call it on
unmount so a navigated-away component doesn't leak a connection.
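Because the package is pure transport, onMessage receives `data: unknown` and the consumer decides what a valid message looks like. The sketch below shows one way to narrow that value with a type guard before touching app state. The `FindingEvent` shape and the helper names are hypothetical, chosen only for illustration.

```typescript
// Hypothetical app-level event; fusion-realtime itself defines no message types.
type FindingEvent = { type: "finding.created"; id: string };

// Type guard: accept only objects with the expected discriminator and fields.
function isFindingEvent(data: unknown): data is FindingEvent {
  if (typeof data !== "object" || data === null) return false;
  const d = data as Record<string, unknown>;
  return d["type"] === "finding.created" && typeof d["id"] === "string";
}

// Builds an onMessage-compatible handler that silently drops anything it
// does not recognise instead of trusting the wire format.
function makeOnMessage(onFinding: (event: FindingEvent) => void): (data: unknown) => void {
  return (data) => {
    if (isFindingEvent(data)) onFinding(data);
  };
}
```

The result of `makeOnMessage(...)` can be passed as the `onMessage` option of subscribe, so malformed or unrelated publications never reach the component's state.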
3. Publishing an event
Anywhere on the server — a mutation finishes, a job completes, an agent
posts a finding — you call publish(channel, data). It POSTs to Centrifugo's HTTP API
with the API key and fans the message out to every subscriber:
/**
* Publish a JSON message to a channel through Centrifugo's HTTP API. A no-op
* (returns false) when realtime is disabled, so callers can fire-and-forget
* without guarding. Never throws on a delivery failure — realtime is a
* best-effort side channel, not the source of truth.
*/
export async function publish(channel: string, data: unknown): Promise<boolean> {
if (!isRealtimeEnabled()) return false;
const apiUrl = process.env.CENTRIFUGO_API_URL ?? "http://localhost:8000/api";
try {
const res = await fetch(`${apiUrl}/publish`, {
method: "POST",
headers: {
"content-type": "application/json",
"X-API-Key": process.env.CENTRIFUGO_API_KEY!,
},
body: JSON.stringify({ channel, data }),
signal: AbortSignal.timeout(5000),
});
return res.ok;
} catch {
return false;
}
}
Two design choices worth calling out, both visible in the code above:
- It never throws. A delivery failure (Centrifugo down, network blip, 5-second timeout) returns false. Realtime is a best-effort side channel, not the source of truth: the write already landed in Postgres. A failed push must not fail the request that triggered it.
- It returns false when disabled, so callers can fire-and-forget without guarding. await publish(...) is safe to drop into any mutation; with no backend it's a cheap no-op.
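The two choices above can be sketched from the caller's side: write first, then push, and treat a `false` from publish as information rather than an error. Everything here except the `publish(channel, data)` shape is an assumption; `createNote`, `saveNote`, and the `notes:all` channel are hypothetical names, and the dependencies are injected only so the sketch runs without a database or Centrifugo.

```typescript
// Same shape as publish(channel, data) in the excerpt above.
type Publish = (channel: string, data: unknown) => Promise<boolean>;

// Hypothetical mutation: `saveNote` stands in for the Postgres write.
async function createNote(
  text: string,
  deps: { saveNote: (text: string) => Promise<{ id: string }>; publish: Publish },
): Promise<{ id: string; pushed: boolean }> {
  // 1. The durable write comes first; if it throws, the request fails as usual.
  const note = await deps.saveNote(text);

  // 2. Best-effort push. Disabled realtime or a delivery failure both yield
  //    `false`, which is reported to the caller but never thrown.
  const pushed = await deps.publish("notes:all", { type: "note.created", id: note.id });

  return { id: note.id, pushed };
}
```

With realtime switched off the mutation still returns the new id, only with `pushed: false`, which matches the contract that the push is never load-bearing.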
Graceful degradation
Whether realtime is "on" comes down to one predicate, isRealtimeEnabled():
// fusion-realtime/src/index.ts
export function isRealtimeEnabled(): boolean {
return Boolean(process.env.CENTRIFUGO_TOKEN_HMAC_SECRET && process.env.CENTRIFUGO_API_KEY);
}
Both env vars present → enabled. Miss either one → disabled, and the consequences are benign by construction:
- publish() short-circuits to false before touching the network.
- The example's server function returns { enabled: false }, so the page renders a "not configured" hint instead of trying to open a doomed socket.
- The app is otherwise unchanged: every page still loads, every mutation still writes. You simply don't get the live push; a manual refresh shows the new state.
Coming from Django? Same instinct as Channels being optional: a Django project runs fine under plain WSGI with no channel layer; you just don't get WebSockets. Here the gate is CENTRIFUGO_* env vars instead of an asgi.py + CHANNEL_LAYERS setting, and the fallback is automatic rather than something you wire by hand.
Where the example demos it
The sandbox has a self-contained realtime page that shows the whole handshake. It's also a small case study in keeping a browser-only library out of SSR.
The server function (example/src/lib/realtime-server.ts) checks auth, checks the
gate, and — only when enabled — mints a token for the signed-in user:
import { createServerFn } from "@tanstack/react-start";
import {
isRealtimeEnabled,
issueConnectionToken,
realtimeWsUrl,
} from "@tikab-interactive/fusion-realtime";
import { getSession } from "#/lib/auth";
/**
* Discriminated-union result for the realtime sandbox page. `enabled: false`
* means the CENTRIFUGO_* env vars are missing — the page shows setup hints.
* `enabled: true` carries the minted JWT, the WS URL, and the demo channel.
*/
export type RealtimeConnectionResult =
| { enabled: false }
| { enabled: true; token: string; url: string; channel: string };
/**
* Server function that checks auth, checks whether Centrifugo is configured,
* and — when it is — mints a short-lived connection token for the signed-in
* user. Called once on route load; the token is passed straight to the browser
* component so it can open the WebSocket.
*/
export const getRealtimeConnection = createServerFn({ method: "GET" }).handler(
async (): Promise<RealtimeConnectionResult> => {
const session = await getSession();
if (!session) throw new Response("Unauthorized", { status: 401 });
if (!isRealtimeEnabled()) return { enabled: false } as const;
const channel = "sandbox:demo";
const token = issueConnectionToken(session.user.id);
return { enabled: true, token, url: realtimeWsUrl(), channel } as const;
},
);
The discriminated union ({ enabled: false } | { enabled: true; token; url; channel })
makes the two states impossible to confuse on the page: sandbox.realtime.tsx shows
setup hints for the false branch and the live RealtimeDemo for the true branch.
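As a small illustration of why the union helps, the sketch below narrows on `enabled` the way a page component would. `describeConnection` is a hypothetical helper, not code from the example app; the type is copied from the excerpt above.

```typescript
// Same shape as RealtimeConnectionResult in the excerpt above.
type RealtimeConnectionResult =
  | { enabled: false }
  | { enabled: true; token: string; url: string; channel: string };

// Hypothetical helper: decide what the page should say for each branch.
function describeConnection(result: RealtimeConnectionResult): string {
  if (!result.enabled) {
    // Only `enabled` exists on this branch; reading result.token here
    // would be a compile-time error.
    return "Realtime is not configured";
  }
  // Narrowed to the enabled branch: token, url and channel are all present.
  return `Ready to connect to ${result.channel} at ${result.url}`;
}
```

The compiler refuses any code path that reads the token without first checking `enabled`, so the "doomed socket" case cannot be written by accident.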
The component (example/src/components/RealtimeDemo.tsx) is where the SSR care
shows. The centrifuge client touches browser-only globals, so a top-level import
would crash server rendering. Instead the import is dynamic, inside the click
handler:
async function connect() {
setConnecting(true);
// Dynamic import — centrifuge must never load during SSR.
const { subscribe } = await import("@tikab-interactive/fusion-realtime/client");
const handle = subscribe({
url,
token,
channel,
onMessage: (data) => {
const id = seqRef.current;
seqRef.current += 1;
setMessages((prev) => [...prev, { id, data }]);
},
onState: (c) => setConnected(c),
});
handleRef.current = handle;
setConnecting(false);
}
The demo deliberately does not auto-connect on mount: you click "Connect" so the
socket opens on demand. Even with no Centrifugo running, the page is useful: it proves
the token mint + subscribe wiring (the badge just stays "offline"). Point it at a
live Centrifugo, publish to the sandbox:demo channel, and the messages appear in the list.
Coming from React? The dynamic-import()-in-a-handler pattern is the stack's standard fix for "this library only runs in the browser." See it again in collab: anything that opens a raw WebSocket gets imported lazily, never at module top level.
Why the split is load-bearing
Both fusion-realtime and fusion-collab ship a stub for the wrong environment, so importing
the wrong half fails loudly instead of leaking secrets or crashing the bundler. The
realtime browser stub (src/browser-stub.ts) reports disabled and throws on the
server-only functions:
/**
* Browser-side stub for @tikab-interactive/fusion-realtime. Token minting and
* HTTP-API publishing are server-only (they hold the HMAC secret and API key).
* The browser uses @tikab-interactive/fusion-realtime/client to subscribe;
* anything here throws if called.
*/
function serverOnly(): never {
throw new Error(
"@tikab-interactive/fusion-realtime is server-only. Use @tikab-interactive/fusion-realtime/client to subscribe in the browser.",
);
}
export function isRealtimeEnabled(): boolean {
return false;
}
export const issueConnectionToken = serverOnly;
export const publish = serverOnly;
export const realtimeWsUrl = serverOnly;
Bundlers resolve this stub for browser builds, so the HMAC secret and API key physically cannot reach client code. The error is the safety net if you import the main entry point somewhere it doesn't belong.
Running it locally
Centrifugo is an external service (not baked into the example's docker-compose.yml).
Run one — docker run the official centrifugo/centrifugo image, or any reachable
instance — then set four vars in .env.local:
CENTRIFUGO_TOKEN_HMAC_SECRET=<shared-secret> # must match Centrifugo's token_hmac_secret_key
CENTRIFUGO_API_KEY=<api-key> # must match Centrifugo's api_key
CENTRIFUGO_API_URL=http://localhost:8000/api # server → Centrifugo (publish)
CENTRIFUGO_WS_URL=ws://localhost:8000/connection/websocket # browser → Centrifugo (subscribe)
The secret and API key are shared config: your app signs/authenticates with the same values Centrifugo is started with. See Deploy for the production wiring.
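When the live push does not show up locally, the first thing to rule out is a missing variable. The sketch below is a hypothetical diagnostic, not part of fusion-realtime: it reports which of the two required secrets are absent (the same condition isRealtimeEnabled() checks) and which URL variables would fall back to the localhost defaults shown in the excerpts.

```typescript
// Hypothetical helper types, for illustration only.
type EnvLike = Record<string, string | undefined>;

interface RealtimeEnvReport {
  enabled: boolean; // mirrors isRealtimeEnabled(): both secrets present
  missingRequired: string[]; // secrets that are unset or empty
  usingDefaults: string[]; // URL vars that are unset and use the defaults
}

function realtimeEnvReport(env: EnvLike): RealtimeEnvReport {
  const required = ["CENTRIFUGO_TOKEN_HMAC_SECRET", "CENTRIFUGO_API_KEY"];
  const defaulted = ["CENTRIFUGO_API_URL", "CENTRIFUGO_WS_URL"];

  // An empty string counts as missing, matching the Boolean(...) gate.
  const missingRequired = required.filter((name) => !env[name]);
  // The excerpts use `??`, so only an unset variable triggers the default.
  const usingDefaults = defaulted.filter((name) => env[name] === undefined);

  return { enabled: missingRequired.length === 0, missingRequired, usingDefaults };
}
```

Calling `realtimeEnvReport(process.env)` from a server-side script or log line shows at a glance whether realtime is disabled by configuration or merely pointed at the default localhost endpoints.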
The siblings
Realtime sits in a family of optional, env-gated infrastructure packages. Two are worth knowing about; both follow the identical contract — configured → it works, unset → it degrades, the app never depends on the backend being up.
Sibling: fusion-cache — Valkey / Redis key-value cache
A tiny JSON cache over Valkey (the Redis fork), turned on by a
single VALKEY_URL. cacheGet / cacheSet / cacheDelete / cacheTtl in
fusion-cache/src/index.ts give you a TTL'd key-value store; getSecondaryStorage()
hands the same backend to Better Auth so rate limits hold across
instances instead of per-process. Degradation is the whole point: with no VALKEY_URL,
cacheGet returns null and cacheSet is a no-op, and even with Valkey configured
a failure degrades to a cache miss (lazyConnect, maxRetriesPerRequest: 1,
enableOfflineQueue: false) — a down Valkey makes the app slower, never broken. The
example's docker-compose.yml ships a valkey:8-alpine service for local use.
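The "slower, never broken" contract is easiest to see in a cache-aside helper. The sketch below assumes a get/set pair shaped roughly like cacheGet and cacheSet (the real signatures in fusion-cache may differ) and injects it, so the same code runs against a working cache or one that always misses. `JsonCache` and `cached` are hypothetical names.

```typescript
// Assumed shape for illustration; the real cacheGet/cacheSet may differ.
interface JsonCache {
  get: (key: string) => Promise<unknown>; // null means "miss"
  set: (key: string, value: unknown, ttlSeconds: number) => Promise<void>;
}

// Cache-aside: return the cached value on a hit, otherwise load it from the
// source of truth and store it for next time.
async function cached<T>(
  cache: JsonCache,
  key: string,
  ttlSeconds: number,
  load: () => Promise<T>,
): Promise<T> {
  const hit = await cache.get(key);
  if (hit !== null && hit !== undefined) return hit as T;

  // Miss (or cache disabled): fall through to the loader.
  const fresh = await load();
  await cache.set(key, fresh, ttlSeconds);
  return fresh;
}
```

With a cache whose get always returns null and whose set is a no-op, every call simply runs the loader, which is the degraded path described above.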
Coming from Django? This is Django's cache framework. cacheGet/cacheSet ≈ cache.get/cache.set; VALKEY_URL ≈ pointing CACHES["default"] at RedisCache. And like Django's LocMemCache fallback, the app runs fine with no cache backend at all; here cacheGet just always misses.
Sibling: fusion-collab — collaborative editing
Real-time shared-document editing via a CRDT — Yjs over a
y-websocket provider, bound to a ProseKit editor.
createCollabSession({ url, room, user }) in fusion-collab/src/index.ts opens a Yjs
doc that syncs between everyone in the same room; defineCollab(session) is the editor
extension that wires sync + collaborative undo + remote cursors (Yjs owns undo, so you
drop the editor's own history). It's transport + binding only — you bring the schema and
the UI. Degradation here is elegant: with no server url, the session is a purely
local Yjs doc — the editor still works, it just doesn't sync — so the app never
depends on the collab server. Like the realtime client, it's browser-only, and it ships a server stub
that throws on import in SSR.
Coming from Django? Collaborative editing is the one piece without a clean Django analogue: the merge logic lives in the CRDT (Yjs), not the server. The server is just a relay (y-websocket), much like a Channels consumer that rebroadcasts to a group but doesn't itself resolve conflicts.
For the full surface of either package — every function, the degradation matrix, and the live sandbox pages — see Packages and the Sandbox.
When to reach for each
| You want… | Package | Backend |
|---|---|---|
| Push a server-side event to many browsers | fusion-realtime | Centrifugo (WS) |
| A fast shared key-value cache / shared rate limits | fusion-cache | Valkey / Redis |
| Two people editing the same document at once | fusion-collab | y-websocket (Yjs) |
All three are opt-in. None of them are load-bearing for correctness — Postgres remains the source of truth, and the app boots and serves every page with every one of them switched off.
See also
- Packages: the full polyrepo map and where these three sit in the tiers.
- Sandbox: the live realtime, cache, and collab demo pages.
- Deploy: wiring Centrifugo and Valkey in production.
- Coming from Django: Channels → realtime, the cache framework → fusion-cache.