import "dotenv/config";
import { getBoss } from "@/lib/queue/pgboss";
import { QUEUES, type GenerateEpisodePayload, type EchoPayload } from "@/lib/queue/jobs";
import { runEpisodeGeneration, refundEpisodeUsage } from "@/lib/ai/pipeline/generate-episode";
import { setEpisodeStatus } from "@/lib/episodes/status";
import { recordHeartbeat } from "@/lib/queue/health";

/** Name under which this process reports its liveness heartbeat. */
const HEARTBEAT_NAME = "generation-worker";

/** How often the liveness heartbeat is refreshed, in milliseconds. */
const HEARTBEAT_INTERVAL_MS = 15_000;

/** Concurrency used when WORKER_CONCURRENCY is unset or not a number. */
const DEFAULT_CONCURRENCY = 2;

/**
 * Parses the WORKER_CONCURRENCY setting.
 *
 * @param raw - Raw environment value, possibly undefined.
 * @returns A whole number >= 1. Non-numeric input falls back to
 *   DEFAULT_CONCURRENCY instead of propagating NaN; fractional input is
 *   floored; values below 1 are clamped to 1.
 */
function parseConcurrency(raw: string | undefined): number {
  const parsed = Number(raw ?? DEFAULT_CONCURRENCY);
  if (!Number.isFinite(parsed)) return DEFAULT_CONCURRENCY;
  return Math.max(1, Math.floor(parsed));
}

/** Number of episode-generation workers this process registers. */
const CONCURRENCY = parseConcurrency(process.env.WORKER_CONCURRENCY);

/**
 * Worker entry point: connects to the queue, starts the liveness heartbeat,
 * registers the echo and episode-generation handlers, and installs signal
 * handlers for shutdown.
 */
async function main(): Promise<void> {
  const boss = await getBoss({ supervise: true });
  console.log(`[worker] started (concurrency=${CONCURRENCY})`);

  // Liveness heartbeat so the admin health page can tell the worker is running.
  // Heartbeat failures are logged but never fatal: a missed beat must not take
  // the worker down.
  const beat = (): void => {
    recordHeartbeat(HEARTBEAT_NAME).catch((e: unknown) =>
      console.error("[worker] heartbeat failed", e)
    );
  };
  beat();
  const heartbeatTimer = setInterval(beat, HEARTBEAT_INTERVAL_MS);

  // Proof-of-loop queue used by health checks / verification.
  await boss.work(QUEUES.echo, { batchSize: 1 }, async (jobs) => {
    for (const job of jobs) console.log("[echo]", job.data);
  });

  // Episode generation. batchSize 1 = independent retries per job.
  // Each work() call registers one more polling worker, so registering
  // CONCURRENCY of them is what makes the configured concurrency take effect.
  for (let i = 0; i < CONCURRENCY; i++) {
    await boss.work(
      QUEUES.generateEpisode,
      { batchSize: 1, includeMetadata: true },
      async (jobs) => {
        for (const job of jobs) {
          await handleGenerate(job);
        }
      }
    );
  }

  let shuttingDown = false;

  /** Stops the queue once, then exits; repeated signals are ignored. */
  const shutdown = async (signal: string): Promise<void> => {
    if (shuttingDown) return;
    shuttingDown = true;
    console.log(`[worker] ${signal} — shutting down`);
    clearInterval(heartbeatTimer);

    let exitCode = 0;
    try {
      await boss.stop({ graceful: true });
    } catch (err) {
      // Without this, a failed stop would surface as an unhandled rejection.
      console.error("[worker] shutdown failed", err);
      exitCode = 1;
    }
    process.exit(exitCode);
  };

  process.on("SIGINT", () => void shutdown("SIGINT"));
  process.on("SIGTERM", () => void shutdown("SIGTERM"));
}

/**
 * Runs one episode-generation job and decides what a failure means.
 *
 * While retries remain, the error is rethrown so the queue retries the job.
 * Once `retryCount` has reached `retryLimit`, the reserved usage is refunded
 * (best effort) and the episode is marked FAILED with the error message; the
 * handler then returns normally.
 *
 * @param job - Queue job; `retryCount` / `retryLimit` default to 0 when the
 *   metadata is absent, which makes the first failure terminal.
 * @throws The original generation error when retries remain, or whatever
 *   `setEpisodeStatus` throws on the terminal path.
 */
async function handleGenerate(job: {
  data: GenerateEpisodePayload;
  retryCount?: number;
  retryLimit?: number;
}): Promise<void> {
  const { episodeId, type } = job.data;
  const generationType = type ?? "full";

  try {
    await runEpisodeGeneration(episodeId, generationType);
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    const retryCount = job.retryCount ?? 0;
    const retryLimit = job.retryLimit ?? 0;
    const exhausted = retryCount >= retryLimit;

    console.error(
      `[generate] ${episodeId} failed (attempt ${retryCount}/${retryLimit}): ${message}`
    );

    if (!exhausted) {
      throw err; // let pg-boss retry with backoff
    }

    // Terminal failure: refund the usage the enqueuing caller reserved up
    // front, so a failed generation doesn't permanently consume quota.
    // (The worker never increments; see lib/usage/meter.ts invariant.)
    try {
      await refundEpisodeUsage(episodeId, generationType);
    } catch (refundErr) {
      // A failed refund must not prevent the episode from being marked FAILED.
      console.error(`[generate] ${episodeId} usage refund failed`, refundErr);
    }

    await setEpisodeStatus(episodeId, "FAILED", { errorMessage: message });
  }
}

main().catch((err: unknown) => {
  console.error("[worker] fatal", err);
  process.exit(1);
});