Files
podcastdistributiona/worker/index.ts
T

103 lines
3.5 KiB
TypeScript
Raw Normal View History

import "dotenv/config";
import { getBoss } from "@/lib/queue/pgboss";
2026-06-20 20:59:03 -04:00
import {
QUEUES,
generateEpisodePayloadSchema,
echoPayloadSchema,
type GenerateEpisodePayload,
type EchoPayload,
} from "@/lib/queue/jobs";
import { runEpisodeGeneration, refundEpisodeUsage } from "@/lib/ai/pipeline/generate-episode";
import { setEpisodeStatus } from "@/lib/episodes/status";
import { recordHeartbeat } from "@/lib/queue/health";
const HEARTBEAT_NAME = "generation-worker";
const CONCURRENCY = Math.max(1, Number(process.env.WORKER_CONCURRENCY ?? "2"));
async function main() {
const boss = await getBoss({ supervise: true });
console.log(`[worker] started (concurrency=${CONCURRENCY})`);
// Liveness heartbeat so the admin health page can tell the worker is running.
await recordHeartbeat(HEARTBEAT_NAME).catch(() => {});
setInterval(() => {
recordHeartbeat(HEARTBEAT_NAME).catch((e) => console.error("[worker] heartbeat failed", e));
}, 15_000);
// Proof-of-loop queue used by health checks / verification.
await boss.work<EchoPayload>(QUEUES.echo, { batchSize: 1 }, async (jobs) => {
2026-06-20 20:59:03 -04:00
for (const job of jobs) {
const parsed = echoPayloadSchema.safeParse(job.data);
if (!parsed.success) {
console.error("[echo] invalid payload — skipping job", parsed.error.issues);
continue;
}
console.log("[echo]", parsed.data);
}
});
// Episode generation. batchSize 1 = independent retries per job.
await boss.work<GenerateEpisodePayload>(
QUEUES.generateEpisode,
{ batchSize: 1, includeMetadata: true },
async (jobs) => {
for (const job of jobs) {
await handleGenerate(job);
}
}
);
const shutdown = async (signal: string) => {
console.log(`[worker] ${signal} — shutting down`);
await boss.stop({ graceful: true });
process.exit(0);
};
process.on("SIGINT", () => void shutdown("SIGINT"));
process.on("SIGTERM", () => void shutdown("SIGTERM"));
}
async function handleGenerate(job: {
data: GenerateEpisodePayload;
retryCount?: number;
retryLimit?: number;
}) {
2026-06-20 20:59:03 -04:00
// Runtime-validate the payload at the consume boundary. An invalid payload is
// not retryable, so skip it rather than throwing into the work loop.
const parsed = generateEpisodePayloadSchema.safeParse(job.data);
if (!parsed.success) {
console.error("[generate] invalid payload — skipping job", parsed.error.issues);
return;
}
const { episodeId, type } = parsed.data;
try {
await runEpisodeGeneration(episodeId, type ?? "full");
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
const retryCount = job.retryCount ?? 0;
const retryLimit = job.retryLimit ?? 0;
const exhausted = retryCount >= retryLimit;
console.error(
`[generate] ${episodeId} failed (attempt ${retryCount}/${retryLimit}): ${message}`
);
if (exhausted) {
// Terminal failure: refund the usage the enqueuing caller reserved up
// front, so a failed generation doesn't permanently consume quota.
// (The worker never increments; see lib/usage/meter.ts invariant.)
try {
await refundEpisodeUsage(episodeId, type ?? "full");
} catch (refundErr) {
console.error(`[generate] ${episodeId} usage refund failed`, refundErr);
}
await setEpisodeStatus(episodeId, "FAILED", { errorMessage: message });
} else {
throw err; // let pg-boss retry with backoff
}
}
}
main().catch((err) => {
console.error("[worker] fatal", err);
process.exit(1);
});