Skip to content
Fusion

Background jobs — Hatchet

Some work has no business happening inside an HTTP request: extracting and embedding a 1000-page PDF, sweeping old rows at 3 a.m., or waking an agent up every minute to look around. That work runs out of band, in a separate process, on a schedule — and it has to survive a crash mid-run. This stack gets all of that from Hatchet, wrapped in the tiny fusion-jobs package.

Coming from React + Django? This is the Celery chapter. Hatchet ≈ Celery + Celery Beat: a task/workflow engine you enqueue work onto, plus a scheduler for crons. bun run worker is your celery worker. The big difference is that Hatchet is a durable workflow engine you self-host (its own service + Postgres + RabbitMQ), and — crucially for air-gapped installs — when you don't run it, jobs fall back to running inline, exactly like CELERY_TASK_ALWAYS_EAGER = True.

What Hatchet is

Hatchet is a self-hosted durable task/workflow engine — a scheduler plus a fleet of worker processes — not a queue you hand-roll out of a table and a polling loop. You declare a workflow (a named unit of work), register it on a long-lived worker, and then either enqueue a run or let a cron fire it. The engine persists every run, so a worker that dies mid-task doesn't lose the work: a healthy worker picks it back up. That durability is the whole reason to reach for it over "just setTimeout on the server" — which, on a server-rendered app, dies the moment the request that spawned it returns (see the storage / SSR gotchas).

It is genuinely a service you run. Locally that's three containers in example/docker-compose.yml:

example/docker-compose.yml
# ── Hatchet (self-hosted, single-node "lite") — fusion-jobs scheduler ────────
hatchet-postgres: # the engine's OWN Postgres (separate from the app DB)
  image: postgres:15.6
  # POSTGRES_USER/DB: hatchet …
 
hatchet-rabbitmq: # its task queue (broker)
  image: rabbitmq:3-management
 
hatchet-lite: # the engine + dashboard, single-node "lite" build
  image: ghcr.io/hatchet-dev/hatchet/hatchet-lite:latest
  ports:
    - "${HATCHET_GRPC_PORT:-7077}:7077" # gRPC — the SDK talks to this
    - "${HATCHET_DASHBOARD_PORT:-8888}:8888" # web dashboard

Three moving parts: a dedicated Postgres (the engine's own state — not your app database), a RabbitMQ broker (the actual task queue), and the lite engine (scheduler + a web dashboard on :8888). Your code talks to the engine over gRPC on :7077. In production the engine is a managed/standalone deployment; see Deploy.

Coming from Django? RabbitMQ here is the same RabbitMQ you'd point Celery at — it's the broker. What's different is the engine in front of it: Hatchet owns scheduling, retries, timeouts, and run history in its own Postgres, so you query runs in its dashboard rather than spelunking a django_celery_results table.

The package is a 20-line seam

fusion-jobs is deliberately tiny — it exists to keep the heavy gRPC SDK out of the web server's bundle and to give every caller one boolean to branch on. It ships two entry points (see its package.json exports), and the split is load-bearing:

fusion-jobs/src/index.ts
/**
 * Background jobs are optional. They light up only when `HATCHET_CLIENT_TOKEN`
 * is set — the same graceful-degradation contract as the mailer (`SMTP_HOST`
 * in @tikab-interactive/fusion-mailer). When unset, the web app boots normally and enqueue
 * helpers simply log-and-skip.
 *
 * This entry imports NOTHING (no Hatchet SDK), so the web server can check the
 * flag — and the app's enqueue helpers can guard on it — without pulling the
 * gRPC client into the request bundle. The SDK lives behind the separate
 * `@tikab-interactive/fusion-jobs/hatchet` entry, which is only ever loaded via dynamic import
 * (in the app's enqueue helpers) or by the standalone worker.
 */
export function isJobsEnabled(): boolean {
	return Boolean(process.env.HATCHET_CLIENT_TOKEN);
}
fusion-jobs/src/hatchet.ts
import { Hatchet } from "@hatchet-dev/typescript-sdk";
 
/**
 * The shared Hatchet client, memoized.
 *
 * `Hatchet.init()` reads `HATCHET_CLIENT_TOKEN` from the environment and THROWS
 * if it's missing — so this is only ever reached on a code path that has
 * already checked `isJobsEnabled()` (the worker entry, or enqueue helpers
 * behind a dynamic import). The web server never imports this entry statically,
 * which keeps the gRPC SDK out of its request bundle.
 *
 * Both the app's task declarations and its worker entry call this, so they
 * share one client instance.
 */
let client: ReturnType<typeof Hatchet.init> | undefined;
 
export function getHatchet(): ReturnType<typeof Hatchet.init> {
	if (!client) {
		client = Hatchet.init();
	}
	return client;
}

Two rules fall out of this and explain almost every jobs-related line in the app:

  1. getHatchet() throws without a token. Hatchet.init() reads HATCHET_CLIENT_TOKEN and throws if it's missing. So any module that calls it at import time (the worker, the workflow definitions) is worker-only — the web server must never statically import it. That's why the worker and the enqueue helpers reach for it via await import("@tikab-interactive/fusion-jobs/hatchet"), after checking the flag.
  2. The browser never sees either. A third entry, browser-stub.ts, is wired into the "browser" export condition; both functions there are serverOnly() throwers, so an accidental client import fails loudly instead of shipping gRPC to the browser.

Defining and registering a workflow

A workflow is declared by calling getHatchet().task({ ... }) — a name, a hard execution-time cap, and an async fn that does the work. The simplest real one in the repo is the document-RAG job:

example/src/lib/document-jobs.ts
import { getHatchet } from "@tikab-interactive/fusion-jobs/hatchet";
 
import { processChatDocument } from "#/lib/chat-document-process";
 
/**
 * Worker job that extracts → chunks → embeds an uploaded chat attachment. TRIGGERED (not a
 * cron): `uploadChatDocument` enqueues it via `runWorkflow("process-chat-document", {
 * documentId })` when Hatchet is enabled; without Hatchet the upload runs
 * `processChatDocument` inline instead. WORKER-ONLY (importing calls `getHatchet()`, which
 * throws without HATCHET_CLIENT_TOKEN). The 10-minute cap covers extraction + embedding of
 * large (1000+ page) PDFs. The job body never throws — `processChatDocument` records
 * failures on the row itself.
 */
export const processChatDocumentJob = getHatchet().task({
	name: "process-chat-document",
	executionTimeout: "600s",
	fn: async (input: { documentId: string }) => {
		await processChatDocument(input.documentId);
		return { documentId: input.documentId, done: true as const };
	},
});

Declaring the task is half of it; it only runs once a worker is told to host it. Registration is just listing the exported task objects when you create the worker — there's no decorator magic and no autodiscovery. getHatchet().task(...) returns a handle; the worker takes an array of handles.

example/src/worker.ts is the entry point, and it shows the gating pattern end to end:

example/src/worker.ts
import "#/env"; // load + validate .env.local (HATCHET_CLIENT_TOKEN, DATABASE_URL, …) before anything reads it
 
import { isJobsEnabled } from "@tikab-interactive/fusion-jobs";
 
/**
 * Standalone background worker: `bun src/worker.ts`. Gated — without
 * HATCHET_CLIENT_TOKEN it exits cleanly (the web app runs fine without it). The
 * Hatchet SDK and the agent workflow are loaded via dynamic import only AFTER the
 * guard, so the gRPC client never loads in a non-worker context.
 */
async function main(): Promise<void> {
	if (!isJobsEnabled()) {
		console.log("[worker] HATCHET_CLIENT_TOKEN unset — background jobs disabled; nothing to run.");
		return;
	}
	const { getHatchet } = await import("@tikab-interactive/fusion-jobs/hatchet");
	const {
		proactiveAgentTick,
		deliveryRetryTick,
		agentRetentionTick,
		embeddingBackfillTick,
		memoryConsolidationTick,
		digestTick,
		registerAgentCron,
	} = await import("#/lib/agent-jobs");
	const { processChatDocumentJob } = await import("#/lib/document-jobs");
 
	const worker = await getHatchet().worker("fusion-example-agent-worker", {
		workflows: [
			proactiveAgentTick,
			deliveryRetryTick,
			agentRetentionTick,
			embeddingBackfillTick,
			memoryConsolidationTick,
			digestTick,
			processChatDocumentJob,
		],
	});
	await registerAgentCron();
	console.log(
		"[worker] proactive-agent worker started; crons registered (tick + delivery-retry + retention + backfill + consolidation + digest).",
	);
	await worker.start();
}
 
void main();

The worker — bun run worker

bun run worker runs example/src/worker.ts as a standalone long-lived process, separate from the web server. It connects to the Hatchet engine, registers the workflows above, and then worker.start() blocks forever, pulling assigned runs off the engine and executing their fn. This is the process that does the actual background work; the web app only ever enqueues.

Two things about it are worth internalising:

  • It needs the token to do anything. Without HATCHET_CLIENT_TOKEN the worker prints the "disabled" line and exits — by design, so the same command is safe to run in a deployment that has no jobs engine.
  • It loads the Drizzle schema at startup. The workflow fns pull in db and the schema modules (processChatDocument, runAgentTick, the retention sweeps — all hit the database). The worker is a separate Node/Bun process from bun run dev, so it snapshots the schema when it boots.

The "restart the worker after a schema change" gotcha. Because the worker is its own process that imports the schema at startup, editing a table (or running a new migration) does not reach an already-running worker. Stop it and re-run bun run worker, or jobs will execute against the old shape of the schema. The dev server reloads on edit; the worker does not.

Crons — scheduled workflows

A workflow becomes recurring by calling .cron(name, expression, input) on its task handle. The app registers six of these from one place, registerAgentCron() in example/src/lib/agent-jobs.ts, which the worker calls once at startup:

example/src/lib/agent-jobs.ts
export async function registerAgentCron(): Promise<void> {
	await ensureCron(proactiveAgentTick, "proactive-agent-every-minute", "* * * * *");
	await ensureCron(deliveryRetryTick, "agent-delivery-retry-every-minute", "* * * * *");
	await ensureCron(agentRetentionTick, "agent-retention-daily", "0 3 * * *");
	await ensureCron(embeddingBackfillTick, "agent-embedding-backfill", "*/5 * * * *");
	await ensureCron(memoryConsolidationTick, "agent-memory-consolidation-daily", "30 3 * * *");
	await ensureCron(digestTick, "agent-digest-briefing", "30 7 * * *");
}

One wrinkle that the wrapper handles for you: cron registration must be idempotent. Hatchet rejects re-registering an existing (name, expression) pair with a 400 already exists, which would crash the worker on every restart. ensureCron swallows exactly that one conflict and rethrows anything else:

example/src/lib/agent-jobs.ts
async function ensureCron(
	task: { cron: (name: string, expr: string, input: Record<string, never>) => Promise<unknown> },
	name: string,
	expr: string,
): Promise<void> {
	try {
		await task.cron(name, expr, {});
	} catch (err) {
		const detail = `${JSON.stringify((err as { response?: { data?: unknown } })?.response?.data ?? "")} ${String(err)}`;
		if (!detail.includes("already exists")) throw err;
	}
}

The most important cron is the first one. proactive-agent-tick rides this heartbeat — the * * * * * (every-minute) schedule is what wakes Carola up to look around on her own. The tick's fn enforces single-flight via a DB advisory lock (so an overrunning run is simply skipped next minute) and applies a missed-tick catch-up policy if the worker was down. The full mechanism is its own page: How the heartbeat works.

Coming from Django? This is Celery Beat. "* * * * *" is the same crontab syntax. The equivalent of CELERYBEAT_SCHEDULE is this registerAgentCron() call — except the schedule lives in the engine's Postgres once registered, and ensureCron makes re-registration on restart a no-op instead of a unique_together clash.

The enqueue → worker flow

Triggered (non-cron) jobs are enqueued with getHatchet().admin.runWorkflow(name, input). The canonical call site is the chat-attachment upload, and it shows the always-safe seam every enqueue follows — branch on the flag, and only import("…/hatchet") on the enabled path:

example/src/lib/chat-document-server.ts
/** Kick off processing: a Hatchet job when available, else inline on the long-running SSR runtime. */
async function kickProcessing(id: string): Promise<void> {
	if (isJobsEnabled()) {
		const { getHatchet } = await import("@tikab-interactive/fusion-jobs/hatchet");
		await getHatchet()
			.admin.runWorkflow("process-chat-document", { documentId: id })
			.catch(() => processChatDocument(id)); // even if the engine hiccups, do the work inline
	} else {
		await processChatDocument(id); // no engine configured → run it right here
	}
}

Here is the whole path, from a server function to the worker's fn — and the inline shortcut that runs the same work when there's no engine:

Loading diagram...

The server function returns immediately after enqueuing; the heavy work happens on the worker, durably, with the executionTimeout as a hard cap and the engine retrying transient failures.

The HATCHET_CLIENT_TOKEN gate and graceful degradation

The single environment variable HATCHET_CLIENT_TOKEN decides everything, through one function, isJobsEnabled():

HATCHET_CLIENT_TOKENWhat happens
set (engine reachable)work is enqueued to the engine and runs on the worker — durable, retried, on a schedule for crons
unsetenqueue helpers fall back to inline execution in the calling process — no durability, no retries, no schedule

This is the same graceful-degradation contract as the rest of the stack (the mailer keys off SMTP_HOST, the embedder off the AI provider): the feature lights up when configured and no-ops cleanly when not, so the app always boots. The cost of the inline fallback is honest and worth stating: a triggered job still runs (you don't lose the work), but it runs inside the request with no retry and no timeout guarantee — and crons simply don't exist without the engine, because there's nothing to hold the schedule, so the proactive agent never fires autonomously.

This same gate is what document RAG processing rides: with a worker, a freshly uploaded PDF is extracted, chunked, and embedded on the worker; without one, it's processed inline during the upload request (fine for small files, slow for a 1000-page spec). See Search & RAG for that pipeline.

Coming from Django? HATCHET_CLIENT_TOKEN unset is precisely CELERY_TASK_ALWAYS_EAGER = True.runWorkflow(...) executes synchronously in-process instead of being dispatched. The difference: with Celery-eager your beat schedules still wouldn't run either, and that's true here too — no engine means no crons. Set the token (and run the worker) to get real async + a real scheduler.

To turn it on locally: bring the Hatchet containers up (docker compose up), set HATCHET_CLIENT_TOKEN in .env.local, and run bun run worker alongside bun run dev. The /sandbox/jobs page shows a live enabled/disabled badge (via getJobsStatus()) and the same enqueue-seam snippet, so you can confirm at a glance which mode you're in.


Where to go next

  • The proactive agent — the proactive-agent-tick cron in depth: single-flight, missed-tick catch-up, and what a "tick" actually does.
  • Search & RAG — the process-chat-document job's payload: extract → chunk → embed, and how the inline fallback behaves.
  • Deploy — running the engine and a worker in the cloud (and the air-gapped story, where the inline fallback keeps everything working with no engine at all).
  • Packagesfusion-jobs alongside the other graceful-degradation seams (fusion-ai, fusion-mailer, fusion-storage).