import "dotenv/config"; import { getBoss } from "@/lib/queue/pgboss"; import { QUEUES, type GenerateEpisodePayload, type EchoPayload } from "@/lib/queue/jobs"; import { runEpisodeGeneration } from "@/lib/ai/pipeline/generate-episode"; import { setEpisodeStatus } from "@/lib/episodes/status"; const CONCURRENCY = Math.max(1, Number(process.env.WORKER_CONCURRENCY ?? "2")); async function main() { const boss = await getBoss({ supervise: true }); console.log(`[worker] started (concurrency=${CONCURRENCY})`); // Proof-of-loop queue used by health checks / verification. await boss.work(QUEUES.echo, { batchSize: 1 }, async (jobs) => { for (const job of jobs) console.log("[echo]", job.data); }); // Episode generation. batchSize 1 = independent retries per job. await boss.work( QUEUES.generateEpisode, { batchSize: 1, includeMetadata: true }, async (jobs) => { for (const job of jobs) { await handleGenerate(job); } } ); const shutdown = async (signal: string) => { console.log(`[worker] ${signal} — shutting down`); await boss.stop({ graceful: true }); process.exit(0); }; process.on("SIGINT", () => void shutdown("SIGINT")); process.on("SIGTERM", () => void shutdown("SIGTERM")); } async function handleGenerate(job: { data: GenerateEpisodePayload; retryCount?: number; retryLimit?: number; }) { const { episodeId, type } = job.data; try { await runEpisodeGeneration(episodeId, type ?? "full"); } catch (err) { const message = err instanceof Error ? err.message : String(err); const retryCount = job.retryCount ?? 0; const retryLimit = job.retryLimit ?? 0; const exhausted = retryCount >= retryLimit; console.error( `[generate] ${episodeId} failed (attempt ${retryCount}/${retryLimit}): ${message}` ); if (exhausted) { await setEpisodeStatus(episodeId, "FAILED", { errorMessage: message }); } else { throw err; // let pg-boss retry with backoff } } } main().catch((err) => { console.error("[worker] fatal", err); process.exit(1); });