import "dotenv/config";
import { getBoss } from "@/lib/queue/pgboss";
import {
  QUEUES,
  generateEpisodePayloadSchema,
  echoPayloadSchema,
  type GenerateEpisodePayload,
  type EchoPayload,
} from "@/lib/queue/jobs";
import { runEpisodeGeneration, refundEpisodeUsage } from "@/lib/ai/pipeline/generate-episode";
import { setEpisodeStatus } from "@/lib/episodes/status";
import { recordHeartbeat } from "@/lib/queue/health";

/** Name under which this worker records its liveness heartbeat. */
const HEARTBEAT_NAME = "generation-worker";

/** How often (ms) the liveness heartbeat is refreshed. */
const HEARTBEAT_INTERVAL_MS = 15_000;

/** Generation concurrency used when WORKER_CONCURRENCY is unset, blank, or not a finite number. */
const DEFAULT_CONCURRENCY = 2;

/**
 * Parse the WORKER_CONCURRENCY env value into a positive integer.
 *
 * `Math.max(1, Number(raw))` on its own yields NaN for non-numeric input
 * (Math.max propagates NaN), so unparseable values fall back to the default.
 * Finite values are floored and clamped to at least 1.
 *
 * @param raw  Raw environment value (may be undefined).
 * @returns    A positive integer concurrency.
 */
function parseConcurrency(raw: string | undefined): number {
  if (raw === undefined || raw.trim() === "") return DEFAULT_CONCURRENCY;
  const value = Number(raw);
  if (!Number.isFinite(value)) return DEFAULT_CONCURRENCY;
  return Math.max(1, Math.floor(value));
}

const CONCURRENCY = parseConcurrency(process.env.WORKER_CONCURRENCY);

/**
 * Worker entry point: connects to pg-boss, starts the liveness heartbeat,
 * registers the echo and generate-episode consumers, and installs
 * SIGINT/SIGTERM handlers that stop pg-boss gracefully before exiting.
 */
async function main(): Promise<void> {
  const boss = await getBoss({ supervise: true });
  console.log(`[worker] started (concurrency=${CONCURRENCY})`);

  // Liveness heartbeat so the admin health page can tell the worker is running.
  // Failures are logged (not fatal) for both the initial and periodic beats.
  const logHeartbeatError = (e: unknown) => console.error("[worker] heartbeat failed", e);
  await recordHeartbeat(HEARTBEAT_NAME).catch(logHeartbeatError);
  const heartbeatTimer = setInterval(() => {
    recordHeartbeat(HEARTBEAT_NAME).catch(logHeartbeatError);
  }, HEARTBEAT_INTERVAL_MS);

  // Proof-of-loop queue used by health checks / verification.
  await boss.work(QUEUES.echo, { batchSize: 1 }, async (jobs) => {
    for (const job of jobs) {
      const parsed = echoPayloadSchema.safeParse(job.data);
      if (!parsed.success) {
        console.error("[echo] invalid payload — skipping job", parsed.error.issues);
        continue;
      }
      console.log("[echo]", parsed.data);
    }
  });

  // Episode generation. batchSize 1 = independent retries per job.
  // localConcurrency starts CONCURRENCY polling workers in this process so the
  // WORKER_CONCURRENCY setting actually takes effect (previously it was only
  // logged). NOTE(review): `localConcurrency` is a pg-boss >= 10.1 work option —
  // confirm the installed pg-boss version supports it.
  await boss.work(
    QUEUES.generateEpisode,
    { batchSize: 1, includeMetadata: true, localConcurrency: CONCURRENCY },
    async (jobs) => {
      for (const job of jobs) {
        await handleGenerate(job);
      }
    }
  );

  // Guard so a second signal (e.g. SIGINT followed by SIGTERM) does not call
  // boss.stop() twice; errors from stop() are handled here rather than
  // surfacing as an unhandled rejection from the `void`-ed promise below.
  let shuttingDown = false;
  const shutdown = async (signal: string) => {
    if (shuttingDown) return;
    shuttingDown = true;
    console.log(`[worker] ${signal} — shutting down`);
    clearInterval(heartbeatTimer);
    try {
      await boss.stop({ graceful: true });
    } catch (err) {
      console.error("[worker] error during shutdown", err);
      process.exit(1);
    }
    process.exit(0);
  };
  process.on("SIGINT", () => void shutdown("SIGINT"));
  process.on("SIGTERM", () => void shutdown("SIGTERM"));
}

/**
 * Consume one generate-episode job.
 *
 * - Invalid payloads are logged and skipped (the handler returns normally):
 *   a malformed payload can never succeed, so retrying would only waste attempts.
 * - If generation fails and retries remain, the error is rethrown so pg-boss
 *   retries the job with backoff.
 * - On the final failed attempt, the usage reserved by the enqueuing caller is
 *   refunded (best-effort, failures only logged) and the episode is marked FAILED.
 *   If setting the FAILED status itself throws, that error propagates to pg-boss.
 *
 * @param job  pg-boss job. `retryCount` / `retryLimit` are only populated when
 *             the queue is worked with `includeMetadata: true`; when absent both
 *             default to 0, so the first failure is treated as terminal.
 */
async function handleGenerate(job: {
  data: GenerateEpisodePayload;
  retryCount?: number;
  retryLimit?: number;
}): Promise<void> {
  // Runtime-validate the payload at the consume boundary. An invalid payload is
  // not retryable, so skip it rather than throwing into the work loop.
  const parsed = generateEpisodePayloadSchema.safeParse(job.data);
  if (!parsed.success) {
    console.error("[generate] invalid payload — skipping job", parsed.error.issues);
    return;
  }
  const { episodeId } = parsed.data;
  const generationType = parsed.data.type ?? "full";

  try {
    await runEpisodeGeneration(episodeId, generationType);
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    const retryCount = job.retryCount ?? 0;
    const retryLimit = job.retryLimit ?? 0;
    // retryCount is the number of retries already made (0 on the first run),
    // so this run is attempt retryCount + 1 of retryLimit + 1 total.
    console.error(
      `[generate] ${episodeId} failed (attempt ${retryCount + 1}/${retryLimit + 1}): ${message}`
    );

    if (retryCount < retryLimit) {
      throw err; // let pg-boss retry with backoff
    }

    // Terminal failure: refund the usage the enqueuing caller reserved up
    // front, so a failed generation doesn't permanently consume quota.
    // (The worker never increments; see lib/usage/meter.ts invariant.)
    try {
      await refundEpisodeUsage(episodeId, generationType);
    } catch (refundErr) {
      console.error(`[generate] ${episodeId} usage refund failed`, refundErr);
    }
    await setEpisodeStatus(episodeId, "FAILED", { errorMessage: message });
  }
}

main().catch((err) => {
  console.error("[worker] fatal", err);
  process.exit(1);
});