Background jobs — Hatchet
Some work has no business happening inside an HTTP request: extracting and embedding a 1000-page PDF,
sweeping old rows at 3 a.m., or waking an agent up every minute to look around. That work runs out
of band, in a separate process, on a schedule — and it has to survive a crash mid-run. This stack
gets all of that from Hatchet, wrapped in the tiny
fusion-jobs package.
Coming from React + Django? This is the Celery chapter. Hatchet ≈ Celery + Celery Beat: a task/workflow engine you enqueue work onto, plus a scheduler for crons.
bun run worker is your celery worker. The big difference is that Hatchet is a durable workflow engine you self-host (its own service + Postgres + RabbitMQ), and, crucially for air-gapped installs, when you don't run it, jobs fall back to running inline, exactly like CELERY_TASK_ALWAYS_EAGER = True.
What Hatchet is
Hatchet is a self-hosted durable task/workflow engine — a scheduler plus a fleet of worker
processes — not a queue you hand-roll out of a table and a polling loop. You declare a workflow (a
named unit of work), register it on a long-lived worker, and then either enqueue a run or let
a cron fire it. The engine persists every run, so a worker that dies mid-task doesn't lose the
work: a healthy worker picks it back up. That durability is the whole reason to reach for it over
"just setTimeout on the server" — which, on a server-rendered app, dies the moment the request that
spawned it returns (see the storage / SSR gotchas).
It is genuinely a service you run. Locally that's three containers in
example/docker-compose.yml:
  # ── Hatchet (self-hosted, single-node "lite"): fusion-jobs scheduler ────────
  hatchet-postgres: # the engine's OWN Postgres (separate from the app DB)
    image: postgres:15.6
    # POSTGRES_USER/DB: hatchet …
  hatchet-rabbitmq: # its task queue (broker)
    image: rabbitmq:3-management
  hatchet-lite: # the engine + dashboard, single-node "lite" build
    image: ghcr.io/hatchet-dev/hatchet/hatchet-lite:latest
    ports:
      - "${HATCHET_GRPC_PORT:-7077}:7077" # gRPC, the SDK talks to this
      - "${HATCHET_DASHBOARD_PORT:-8888}:8888" # web dashboard

Three moving parts: a dedicated Postgres (the engine's own state, not your app database), a
RabbitMQ broker (the actual task queue), and the lite engine (scheduler + a web dashboard on
:8888). Your code talks to the engine over gRPC on :7077. In production the engine is a
managed/standalone deployment; see Deploy.
Coming from Django? RabbitMQ here is the same RabbitMQ you'd point Celery at (it's the broker). What's different is the engine in front of it: Hatchet owns scheduling, retries, timeouts, and run history in its own Postgres, so you query runs in its dashboard rather than spelunking a
django_celery_results table.
The package is a 20-line seam
fusion-jobs is deliberately tiny — it exists to keep the heavy gRPC SDK out of the web server's
bundle and to give every caller one boolean to branch on. It ships two entry points (see its
package.json exports), and the split is load-bearing:
/**
 * Background jobs are optional. They light up only when `HATCHET_CLIENT_TOKEN`
 * is set, the same graceful-degradation contract as the mailer (`SMTP_HOST`
 * in @tikab-interactive/fusion-mailer). When unset, the web app boots normally and enqueue
 * helpers simply log-and-skip.
 *
 * This entry imports NOTHING (no Hatchet SDK), so the web server can check the
 * flag (and the app's enqueue helpers can guard on it) without pulling the
 * gRPC client into the request bundle. The SDK lives behind the separate
 * `@tikab-interactive/fusion-jobs/hatchet` entry, which is only ever loaded via dynamic import
 * (in the app's enqueue helpers) or by the standalone worker.
 */
export function isJobsEnabled(): boolean {
  return Boolean(process.env.HATCHET_CLIENT_TOKEN);
}

import { Hatchet } from "@hatchet-dev/typescript-sdk";
/**
 * The shared Hatchet client, memoized.
 *
 * `Hatchet.init()` reads `HATCHET_CLIENT_TOKEN` from the environment and THROWS
 * if it's missing, so this is only ever reached on a code path that has
 * already checked `isJobsEnabled()` (the worker entry, or enqueue helpers
 * behind a dynamic import). The web server never imports this entry statically,
 * which keeps the gRPC SDK out of its request bundle.
 *
 * Both the app's task declarations and its worker entry call this, so they
 * share one client instance.
 */
let client: ReturnType<typeof Hatchet.init> | undefined;
export function getHatchet(): ReturnType<typeof Hatchet.init> {
  if (!client) {
    client = Hatchet.init();
  }
  return client;
}

Two rules fall out of this and explain almost every jobs-related line in the app:
- getHatchet() throws without a token. Hatchet.init() reads HATCHET_CLIENT_TOKEN and throws if it's missing. So any module that calls it at import time (the worker, the workflow definitions) is worker-only, and the web server must never statically import it. That's why the worker and the enqueue helpers reach for it via await import("@tikab-interactive/fusion-jobs/hatchet"), after checking the flag.
- The browser never sees either. A third entry, browser-stub.ts, is wired into the "browser" export condition; both functions there are serverOnly() throwers, so an accidental client import fails loudly instead of shipping gRPC to the browser.
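The two rules above can be tried out without the SDK. The following is a minimal sketch, not the package's code: `FakeClient`, `isEnabled`, and `makeGetClient` are hypothetical names, and the environment is passed in as a plain object so both the "token set" and "token unset" paths can be exercised side by side.

```typescript
// Hypothetical stand-in for the real SDK client; only its shape matters here.
interface FakeClient {
  token: string;
}

type Env = Record<string, string | undefined>;

// Mirrors the flag check: true only when a non-empty token is present.
function isEnabled(env: Env): boolean {
  return Boolean(env.HATCHET_CLIENT_TOKEN);
}

// Builds a memoized getter. Initialisation throws when the token is missing,
// imitating the behaviour the text attributes to Hatchet.init().
function makeGetClient(env: Env): () => FakeClient {
  let client: FakeClient | undefined;
  return () => {
    if (!client) {
      const token = env.HATCHET_CLIENT_TOKEN;
      if (!token) throw new Error("HATCHET_CLIENT_TOKEN is not set");
      client = { token };
    }
    return client;
  };
}

const withoutToken = makeGetClient({});
const withToken = makeGetClient({ HATCHET_CLIENT_TOKEN: "example-token" });
```

Calling `withToken()` twice returns the same object, while `withoutToken()` throws on first use. That is why the flag check has to come before anything touches the getter.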
Defining and registering a workflow
A workflow is declared by calling getHatchet().task({ ... }) — a name, a hard execution-time cap,
and an async fn that does the work. The simplest real one in the repo is the document-RAG job:
import { getHatchet } from "@tikab-interactive/fusion-jobs/hatchet";
import { processChatDocument } from "#/lib/chat-document-process";
/**
 * Worker job that extracts → chunks → embeds an uploaded chat attachment. TRIGGERED (not a
 * cron): `uploadChatDocument` enqueues it via `runWorkflow("process-chat-document", {
 * documentId })` when Hatchet is enabled; without Hatchet the upload runs
 * `processChatDocument` inline instead. WORKER-ONLY (importing calls `getHatchet()`, which
 * throws without HATCHET_CLIENT_TOKEN). The 10-minute cap covers extraction + embedding of
 * large (1000+ page) PDFs. The job body never throws: `processChatDocument` records
 * failures on the row itself.
 */
export const processChatDocumentJob = getHatchet().task({
  name: "process-chat-document",
  executionTimeout: "600s",
  fn: async (input: { documentId: string }) => {
    await processChatDocument(input.documentId);
    return { documentId: input.documentId, done: true as const };
  },
});

Declaring the task is half of it; it only runs once a worker is told to host it. Registration is
just listing the exported task objects when you create the worker — there's no decorator magic and no
autodiscovery. getHatchet().task(...) returns a handle; the worker takes an array of handles.
example/src/worker.ts is the entry point, and it shows the gating pattern end to end:
import "#/env"; // load + validate .env.local (HATCHET_CLIENT_TOKEN, DATABASE_URL, …) before anything reads it
import { isJobsEnabled } from "@tikab-interactive/fusion-jobs";
/**
 * Standalone background worker: `bun src/worker.ts`. Gated: without
 * HATCHET_CLIENT_TOKEN it exits cleanly (the web app runs fine without it). The
 * Hatchet SDK and the agent workflow are loaded via dynamic import only AFTER the
 * guard, so the gRPC client never loads in a non-worker context.
 */
async function main(): Promise<void> {
  if (!isJobsEnabled()) {
    console.log("[worker] HATCHET_CLIENT_TOKEN unset — background jobs disabled; nothing to run.");
    return;
  }
  const { getHatchet } = await import("@tikab-interactive/fusion-jobs/hatchet");
  const {
    proactiveAgentTick,
    deliveryRetryTick,
    agentRetentionTick,
    embeddingBackfillTick,
    memoryConsolidationTick,
    digestTick,
    registerAgentCron,
  } = await import("#/lib/agent-jobs");
  const { processChatDocumentJob } = await import("#/lib/document-jobs");
  const worker = await getHatchet().worker("fusion-example-agent-worker", {
    workflows: [
      proactiveAgentTick,
      deliveryRetryTick,
      agentRetentionTick,
      embeddingBackfillTick,
      memoryConsolidationTick,
      digestTick,
      processChatDocumentJob,
    ],
  });
  await registerAgentCron();
  console.log(
    "[worker] proactive-agent worker started; crons registered (tick + delivery-retry + retention + backfill + consolidation + digest).",
  );
  await worker.start();
}
void main();

The worker — bun run worker
bun run worker runs example/src/worker.ts as a standalone long-lived process, separate from
the web server. It connects to the Hatchet engine, registers the workflows above, and then
worker.start() blocks forever, pulling assigned runs off the engine and executing their fn. This
is the process that does the actual background work; the web app only ever enqueues.
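To make "registration is just a list of handles" concrete, here is a deliberately simplified model of what a worker does with that list. It is a sketch under assumptions, not how the Hatchet SDK is implemented: `TaskHandle`, `buildRegistry`, and `executeRun` are hypothetical names, and the real engine adds assignment, persistence, and timeouts on top.

```typescript
// Hypothetical, stripped-down task handle: a name plus the function to run.
interface TaskHandle {
  name: string;
  fn: (input: unknown) => Promise<unknown>;
}

// Index the listed handles by name. Nothing is discovered automatically:
// a task that is not in the array is simply not hosted by this worker.
function buildRegistry(workflows: TaskHandle[]): Map<string, TaskHandle> {
  const registry = new Map<string, TaskHandle>();
  for (const workflow of workflows) {
    if (registry.has(workflow.name)) {
      throw new Error(`duplicate workflow name: ${workflow.name}`);
    }
    registry.set(workflow.name, workflow);
  }
  return registry;
}

// Look up the named workflow and execute its fn with the run's input.
async function executeRun(
  registry: Map<string, TaskHandle>,
  name: string,
  input: unknown,
): Promise<unknown> {
  const workflow = registry.get(name);
  if (!workflow) throw new Error(`no workflow registered under "${name}"`);
  return workflow.fn(input);
}

const echoTask: TaskHandle = { name: "echo", fn: async (input) => ({ echoed: input }) };
const registry = buildRegistry([echoTask]);
```

In this model, a run for a name that was never listed fails at lookup. The practical consequence in the real worker is the same: forgetting to add a task to the workflows array means no worker hosts it.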
Two things about it are worth internalising:
- It needs the token to do anything. Without HATCHET_CLIENT_TOKEN the worker prints the "disabled" line and exits. That is by design, so the same command is safe to run in a deployment that has no jobs engine.
- It loads the Drizzle schema at startup. The workflow fns pull in db and the schema modules (processChatDocument, runAgentTick, and the retention sweeps all hit the database). The worker is a separate Node/Bun process from bun run dev, so it snapshots the schema when it boots.
The "restart the worker after a schema change" gotcha. Because the worker is its own process that imports the schema at startup, editing a table (or running a new migration) does not reach an already-running worker. Stop it and re-run
bun run worker, or jobs will execute against the old shape of the schema. The dev server reloads on edit; the worker does not.
Crons — scheduled workflows
A workflow becomes recurring by calling .cron(name, expression, input) on its task handle. The app
registers six of these from one place, registerAgentCron() in example/src/lib/agent-jobs.ts,
which the worker calls once at startup:
export async function registerAgentCron(): Promise<void> {
  await ensureCron(proactiveAgentTick, "proactive-agent-every-minute", "* * * * *");
  await ensureCron(deliveryRetryTick, "agent-delivery-retry-every-minute", "* * * * *");
  await ensureCron(agentRetentionTick, "agent-retention-daily", "0 3 * * *");
  await ensureCron(embeddingBackfillTick, "agent-embedding-backfill", "*/5 * * * *");
  await ensureCron(memoryConsolidationTick, "agent-memory-consolidation-daily", "30 3 * * *");
  await ensureCron(digestTick, "agent-digest-briefing", "30 7 * * *");
}

One wrinkle that the wrapper handles for you: cron registration must be idempotent. Hatchet
rejects re-registering an existing (name, expression) pair with a 400 already exists, which would
crash the worker on every restart. ensureCron swallows exactly that one conflict and rethrows
anything else:
async function ensureCron(
  task: { cron: (name: string, expr: string, input: Record<string, never>) => Promise<unknown> },
  name: string,
  expr: string,
): Promise<void> {
  try {
    await task.cron(name, expr, {});
  } catch (err) {
    const detail = `${JSON.stringify((err as { response?: { data?: unknown } })?.response?.data ?? "")} ${String(err)}`;
    if (!detail.includes("already exists")) throw err;
  }
}

The most important cron is the first one. proactive-agent-tick rides this heartbeat: the
* * * * * (every-minute) schedule is what wakes Carola up to look around on her own. The tick's fn
enforces single-flight via a DB advisory lock (so an overrunning run is simply skipped next minute)
and applies a missed-tick catch-up policy if the worker was down. The full mechanism is its own page:
How the heartbeat works.
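The single-flight behaviour described above (an overlapping tick is skipped rather than queued) can be sketched with a try-acquire lock. This is an illustration under assumptions, not the app's tick code: `TryLock`, `createMemoryLock`, and `runSingleFlight` are hypothetical names, and the in-memory flag stands in for the database advisory lock, which in Postgres would typically be taken with something like `pg_try_advisory_lock`.

```typescript
// Hypothetical lock interface with non-blocking acquire semantics.
interface TryLock {
  tryAcquire(): boolean;
  release(): void;
}

// In-memory stand-in for a DB advisory lock; it only works within one process.
function createMemoryLock(): TryLock {
  let held = false;
  return {
    tryAcquire() {
      if (held) return false;
      held = true;
      return true;
    },
    release() {
      held = false;
    },
  };
}

type TickResult = "ran" | "skipped";

// Runs `work` only if no other run currently holds the lock.
// The lock is always released, even when `work` rejects.
async function runSingleFlight(lock: TryLock, work: () => Promise<void>): Promise<TickResult> {
  if (!lock.tryAcquire()) return "skipped";
  try {
    await work();
    return "ran";
  } finally {
    lock.release();
  }
}
```

The important property is that the second caller returns immediately instead of waiting, so a slow tick cannot build up a backlog of ticks behind it.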
Coming from Django? This is Celery Beat.
"* * * * *" is the same crontab syntax. The equivalent of CELERYBEAT_SCHEDULE is this registerAgentCron() call, except the schedule lives in the engine's Postgres once registered, and ensureCron makes re-registration on restart a no-op instead of a unique_together clash.
The enqueue → worker flow
Triggered (non-cron) jobs are enqueued with getHatchet().admin.runWorkflow(name, input). The
canonical call site is the chat-attachment upload, and it shows the always-safe seam every
enqueue follows — branch on the flag, and only import("…/hatchet") on the enabled path:
/** Kick off processing: a Hatchet job when available, else inline on the long-running SSR runtime. */
async function kickProcessing(id: string): Promise<void> {
  if (isJobsEnabled()) {
    const { getHatchet } = await import("@tikab-interactive/fusion-jobs/hatchet");
    await getHatchet()
      .admin.runWorkflow("process-chat-document", { documentId: id })
      .catch(() => processChatDocument(id)); // even if the engine hiccups, do the work inline
  } else {
    await processChatDocument(id); // no engine configured → run it right here
  }
}

The whole path runs from a server function to the worker's fn, with an inline shortcut that
runs the same work when there's no engine.
The server function returns immediately after enqueuing; the heavy work happens on the worker,
durably, with the executionTimeout as a hard cap and the engine retrying transient failures.
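The same seam can be written once and reused for any triggered job. The sketch below is a generalisation under assumptions, not code from the repo: `Dispatcher`, `DispatchOutcome`, and `dispatch` are hypothetical names, and `enqueue` stands in for the dynamic import plus the runWorkflow call so the three possible paths can be observed without an engine.

```typescript
// Hypothetical dependencies for one job type; all three are injected.
interface Dispatcher<I> {
  isEnabled: () => boolean;
  enqueue: (input: I) => Promise<void>; // stand-in for import("…/hatchet") + runWorkflow
  runInline: (input: I) => Promise<void>;
}

type DispatchOutcome = "enqueued" | "inline" | "inline-fallback";

// Branch on the flag first; only touch the engine on the enabled path,
// and fall back to inline work if enqueuing fails.
async function dispatch<I>(deps: Dispatcher<I>, input: I): Promise<DispatchOutcome> {
  if (!deps.isEnabled()) {
    await deps.runInline(input);
    return "inline";
  }
  try {
    await deps.enqueue(input);
    return "enqueued";
  } catch {
    await deps.runInline(input);
    return "inline-fallback";
  }
}
```

Returning the outcome is a design choice of this sketch: it lets a caller log which path was taken, which is useful because the two inline paths keep the request open until the work finishes.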
The HATCHET_CLIENT_TOKEN gate and graceful degradation
The single environment variable HATCHET_CLIENT_TOKEN decides everything, through one function,
isJobsEnabled():
| HATCHET_CLIENT_TOKEN | What happens |
|---|---|
| set (engine reachable) | work is enqueued to the engine and runs on the worker — durable, retried, on a schedule for crons |
| unset | enqueue helpers fall back to inline execution in the calling process — no durability, no retries, no schedule |
This is the same graceful-degradation contract as the rest of the stack (the mailer keys off
SMTP_HOST, the embedder off the AI provider): the feature lights up when configured and no-ops
cleanly when not, so the app always boots. The cost of the inline fallback is honest and worth
stating: a triggered job still runs (you don't lose the work), but it runs inside the request with
no retry and no timeout guarantee — and crons simply don't exist without the engine, because
there's nothing to hold the schedule, so the proactive agent never fires autonomously.
This same gate is what document RAG processing rides: with a worker, a freshly uploaded PDF is extracted, chunked, and embedded on the worker; without one, it's processed inline during the upload request (fine for small files, slow for a 1000-page spec). See Search & RAG for that pipeline.
Coming from Django?
HATCHET_CLIENT_TOKEN unset is the counterpart of CELERY_TASK_ALWAYS_EAGER = True: the enqueue helper runs the work synchronously in-process instead of dispatching it through .runWorkflow(...). As with Celery in eager mode, beat-style schedules don't run either: no engine means no crons. Set the token (and run the worker) to get real async + a real scheduler.
To turn it on locally: bring the Hatchet containers up (docker compose up), set
HATCHET_CLIENT_TOKEN in .env.local, and run bun run worker alongside bun run dev. The
/sandbox/jobs page shows a live enabled/disabled badge (via getJobsStatus()) and the same
enqueue-seam snippet, so you can confirm at a glance which mode you're in.
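The two modes in the table above can also be expressed as data, which is handy for a status badge or a startup log line. This is a hypothetical sketch and not the app's getJobsStatus(): `JobsMode` and `describeJobsMode` are illustrative names, and the function only inspects the token, so it cannot tell whether the engine is actually reachable.

```typescript
type Env = Record<string, string | undefined>;

// What each mode gives you, following the table: everything hangs off one flag.
interface JobsMode {
  enabled: boolean;
  execution: "worker" | "inline";
  durable: boolean;
  retried: boolean;
  crons: boolean;
}

// Derives the mode from the environment. Only the presence of the token is
// checked; engine reachability is out of scope for this sketch.
function describeJobsMode(env: Env): JobsMode {
  const enabled = Boolean(env.HATCHET_CLIENT_TOKEN);
  return {
    enabled,
    execution: enabled ? "worker" : "inline",
    durable: enabled,
    retried: enabled,
    crons: enabled,
  };
}
```

On a server you would pass the real environment object; passing a plain object, as here, keeps the function easy to test.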
Where to go next
- The proactive agent: the proactive-agent-tick cron in depth, covering single-flight, missed-tick catch-up, and what a "tick" actually does.
- Search & RAG: the process-chat-document job's payload (extract → chunk → embed) and how the inline fallback behaves.
- Deploy: running the engine and a worker in the cloud (and the air-gapped story, where the inline fallback keeps everything working with no engine at all).
- Packages: fusion-jobs alongside the other graceful-degradation seams (fusion-ai, fusion-mailer, fusion-storage).