import PgBoss from "pg-boss";
import { ALL_QUEUES, QUEUES, type GenerateEpisodePayload } from "./jobs";

// One pg-boss instance per process, lazily started. The worker process supervises
// (maintenance/scheduling); the web process only sends, so it skips supervision.
let instance: PgBoss | null = null;
// In-flight (or completed) startup, shared so concurrent callers start only one instance.
let startup: Promise<PgBoss> | null = null;
let queuesReady = false;

interface BossOptions {
  /** True only in the worker process. */
  supervise?: boolean;
}

/**
 * Return the process-wide pg-boss instance, starting it on first use.
 *
 * Concurrent callers share a single startup promise, and the instance is only
 * published once every known queue has been created.
 *
 * Note: `opts` is honored by the first call in the process only; later calls
 * return the already-started (or starting) instance regardless of `opts`.
 *
 * @param opts - Startup options; `supervise` enables supervision and scheduling.
 * @returns The started pg-boss instance.
 * @throws Whatever `start()` or queue creation rejects with. A failed startup is
 *   not cached, so the next call retries with a fresh instance.
 */
export async function getBoss(opts: BossOptions = {}): Promise<PgBoss> {
  if (instance) return instance;

  if (!startup) {
    const supervise = opts.supervise ?? false;
    const boss = new PgBoss({
      connectionString: process.env.DATABASE_URL,
      supervise,
      schedule: supervise,
      // Keep the pool small — the web process shares the DB with Prisma.
      max: supervise ? 10 : 3,
    });
    boss.on("error", (err) => console.error("[pg-boss] error", err));

    startup = (async (): Promise<PgBoss> => {
      await boss.start();
      try {
        await ensureQueues(boss);
      } catch (err: unknown) {
        // start() succeeded but queue setup did not: shut the instance down so
        // a retry does not leave a second connection pool behind.
        await boss
          .stop({ graceful: false })
          .catch((stopErr: unknown) =>
            console.error("[pg-boss] error", stopErr)
          );
        throw err;
      }
      // Publish only after the queues exist, so the fast path above never
      // hands out a boss that send()/work() could race against.
      instance = boss;
      return boss;
    })().catch((err: unknown) => {
      // Do not cache a rejected startup; let the next caller try again.
      startup = null;
      throw err;
    });
  }

  return startup;
}

/** Create every known queue (idempotent) so send()/work() never race on existence. */
async function ensureQueues(boss: PgBoss): Promise<void> {
  if (queuesReady) return;
  for (const name of ALL_QUEUES) {
    await boss.createQueue(name);
  }
  queuesReady = true;
}

/**
 * Enqueue a full or partial episode generation.
 *
 * @param payload - Job data sent to the generate-episode queue.
 * @param options - Optional `priority` and `singletonKey` forwarded to pg-boss.
 * @returns The value resolved by `boss.send()` (the job id, or null if no job was created).
 */
export async function enqueueEpisodeGeneration(
  payload: GenerateEpisodePayload,
  options?: { priority?: number; singletonKey?: string }
): Promise<string | null> {
  const boss = await getBoss();
  return boss.send(QUEUES.generateEpisode, payload, {
    retryLimit: 2,
    retryDelay: 30,
    retryBackoff: true,
    expireInMinutes: 30,
    // Check against undefined so an explicit priority of 0 is forwarded as given.
    ...(options?.priority !== undefined ? { priority: options.priority } : {}),
    // An empty singletonKey is treated as "no key".
    ...(options?.singletonKey ? { singletonKey: options.singletonKey } : {}),
  });
}

export { QUEUES };