Files
podcastdistributiona/lib/ai/pipeline/generate-episode.ts
T

280 lines
10 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import { Prisma } from "@prisma/client";
import { prisma } from "@/lib/db";
import { setEpisodeStatus } from "@/lib/episodes/status";
import { scriptProvider, audioProvider, artProvider } from "@/lib/ai/providers";
import { buildCoverPrompt } from "@/lib/ai/providers/openai-art";
import { segmentScript } from "./segment";
import { stitchMp3 } from "./stitch";
import { storage, assetKey } from "@/lib/storage";
import { recordCost, scriptCostUsd, audioCostUsd, artCostUsd } from "@/lib/ai/cost";
import { refundUsage } from "@/lib/usage/meter";
import { isFlagEnabled } from "@/lib/flags";
import { moderateText, moderationReason } from "@/lib/ai/moderation";
import { sendEmail, emailLayout } from "@/lib/email";
import { DEFAULT_VOICE_IDS } from "@/lib/ai/voices";
import type { EpisodeConfig, StructuredScript } from "@/lib/ai/types";
import type { GenerationType } from "@/lib/queue/jobs";
import type { UsageMetric } from "@/lib/billing/plans";
/** Episode row loaded with its `speakers` and owning `user` — the shape loadEpisode returns and every pipeline stage receives. */
type EpisodeWithRelations = Prisma.EpisodeGetPayload<{
include: { speakers: true; user: true };
}>;
/**
 * Which usage metrics the enqueuing caller RESERVED for a generation `type`.
 *
 * The worker consults this only to REFUND those reservations after a terminal
 * failure; it never increments usage (see the metering invariant in
 * lib/usage/meter.ts). Keep this table aligned with the reservations made in
 * the create/regenerate paths.
 *
 * NOTE(review): runEpisodeGeneration also re-runs the audio stage for a
 * "script" job, yet only "script" is listed here — confirm the regenerate path
 * reserves exactly "script" in that case.
 *
 * @param type - The generation type the job was enqueued with.
 * @returns A freshly built array (safe to mutate); empty for unrecognised types.
 */
export function reservedMetricsFor(type: GenerationType): UsageMetric[] {
  // Built per call so every caller gets its own arrays.
  const metricsByType = new Map<string, UsageMetric[]>([
    ["full", ["script", "audio", "art"]],
    ["script", ["script"]],
    ["audio", ["audio"]],
    ["art", ["art"]],
  ]);
  return metricsByType.get(type) ?? [];
}
/**
 * Refund every usage metric that was reserved for a generation `type`.
 *
 * Used by the worker after a terminal failure. The refund is credited to the
 * episode's organization when `organizationId` is set, otherwise to its user.
 * Metrics are refunded one at a time, in the order `reservedMetricsFor` lists
 * them. If no episode matches `episodeId`, nothing is refunded.
 *
 * NOTE(review): a throwing `refundUsage` call aborts the remaining refunds.
 */
export async function refundEpisodeUsage(
  episodeId: string,
  type: GenerationType
): Promise<void> {
  const row = await prisma.episode.findUnique({
    select: { userId: true, organizationId: true },
    where: { id: episodeId },
  });
  if (!row) return;

  const { organizationId, userId } = row;
  const ownerType = organizationId ? "organization" : "user";
  const ownerId = organizationId ?? userId;

  for (const reserved of reservedMetricsFor(type)) {
    await refundUsage(ownerId, ownerType, reserved);
  }
}
/**
 * Run the episode generation pipeline (invoked by the worker).
 *
 * Stage order: script → segment → synthesize → stitch → art → save. The
 * `type` argument picks which stages execute:
 *   - "full":   script, audio, and art
 *   - "script": script, then audio
 *   - "audio":  audio only
 *   - "art":    art only
 * Every run then moves the episode through SAVING to READY and emails the owner.
 *
 * Usage is deliberately NOT metered here: the enqueuing caller RESERVED the
 * relevant metrics (script/audio/art) up front, and the worker refunds them on
 * terminal failure. See the metering invariant in lib/usage/meter.ts.
 *
 * @throws If the episode does not exist or any stage throws; stage errors are
 *   not caught here and propagate to the caller.
 */
export async function runEpisodeGeneration(
  episodeId: string,
  type: GenerationType = "full"
): Promise<void> {
  const episode = await loadEpisode(episodeId);
  const config = toConfig(episode);

  const isFull = type === "full";
  const wantsScript = isFull || type === "script";
  const wantsAudio = wantsScript || type === "audio";
  const wantsArt = isFull || type === "art";

  if (wantsScript) await generateScript(episode, config);
  if (wantsAudio) await generateAudio(episode);
  if (wantsArt) await generateArt(episode);

  await setEpisodeStatus(episodeId, "SAVING", { stage: "Finalizing your episode" });
  // Intentionally no metering: usage was reserved at enqueue time (meter.ts invariant).
  await setEpisodeStatus(episodeId, "READY", { stage: "Done" });
  await notifyReady(episode);
}
/**
 * Fetch an episode together with its speakers and owning user.
 *
 * @throws Error when no episode has the given id.
 */
async function loadEpisode(episodeId: string): Promise<EpisodeWithRelations> {
  const found = await prisma.episode.findUnique({
    include: { speakers: true, user: true },
    where: { id: episodeId },
  });
  if (found) return found;
  throw new Error(`Episode ${episodeId} not found`);
}
/**
 * Project an episode row onto the `EpisodeConfig` handed to the script provider.
 *
 * Speakers are reduced to `{ speakerKey, displayName }`. An episode with no
 * speakers gets a single default `host` speaker. A null `audience` is mapped to
 * `undefined`.
 */
function toConfig(episode: EpisodeWithRelations): EpisodeConfig {
  const { title, topic, tone, format, language, targetLengthMin, audience } = episode;
  const speakers =
    episode.speakers.length === 0
      ? [{ speakerKey: "host", displayName: "Host" }]
      : episode.speakers.map(({ speakerKey, displayName }) => ({ speakerKey, displayName }));
  return {
    title,
    topic,
    tone,
    format,
    language,
    targetLengthMin,
    audience: audience ?? undefined,
    speakers,
  };
}
// ─────────────── Stage 1: script ───────────────
/**
 * Stage 1: generate the episode script and persist it.
 *
 * Marks the episode SCRIPTING and asks the script provider for a structured
 * script. It then upserts the script row, bumping `version` when a row
 * already exists. If the episode has no (non-blank) title, the generated
 * title is adopted both in the database and on the in-memory `episode`, so
 * later stages in this run (cover prompt, ready email) see it. Finally it
 * records the provider cost and screens the script with automated moderation.
 *
 * The provider is resolved once, so the `model` stored with the script is the
 * one that actually produced it. `model` is also written on re-generation, so
 * the row never reports a stale model after a provider/model change.
 */
async function generateScript(episode: EpisodeWithRelations, config: EpisodeConfig) {
  await setEpisodeStatus(episode.id, "SCRIPTING", { stage: "Writing the script" });
  const provider = scriptProvider();
  const { script, usage } = await provider.generate(config);
  const content = script as unknown as Prisma.InputJsonValue;
  await prisma.script.upsert({
    where: { episodeId: episode.id },
    create: { episodeId: episode.id, content, model: provider.model },
    // Refresh `model` too so a re-generation on a different model isn't mislabelled.
    update: { content, model: provider.model, version: { increment: 1 } },
  });
  // Adopt the generated title when the user didn't set one; mirror it onto the
  // in-memory row so subsequent stages in this run use it.
  if (!episode.title?.trim() && script.title) {
    await prisma.episode.update({ where: { id: episode.id }, data: { title: script.title } });
    episode.title = script.title;
  }
  // NOTE(review): the provider label is hard-coded; confirm scriptProvider() is always OpenAI.
  await recordCost({
    provider: "openai",
    operation: "script",
    units: usage.inputTokens + usage.outputTokens,
    costUsd: scriptCostUsd(usage),
    episodeId: episode.id,
    userId: episode.userId,
  });
  await flagScriptIfFlagged(episode.id, script);
}
/**
 * Run automated moderation over the generated script's spoken text.
 *
 * A violation does not fail the episode. Instead, a high-severity ContentFlag
 * (source "moderation") is created for the admin moderation queue to review,
 * and a warning is logged. Does nothing when the `ai_moderation_enabled` flag
 * is off, when moderation passes, or when the episode already has an open flag.
 */
async function flagScriptIfFlagged(episodeId: string, script: StructuredScript) {
  const moderationOn = await isFlagEnabled("ai_moderation_enabled");
  if (!moderationOn) return;

  // Every turn's text, across all sections, one per line.
  const spokenText = script.sections
    .flatMap((section) => section.turns)
    .map((turn) => turn.text)
    .join("\n");
  const verdict = await moderateText(spokenText);
  if (!verdict.flagged) return;

  const openFlag = await prisma.contentFlag.findFirst({ where: { episodeId, status: "open" } });
  if (openFlag) return;

  await prisma.contentFlag.create({
    data: {
      episodeId,
      reason: moderationReason(verdict),
      source: "moderation",
      severity: "high",
    },
  });
  console.warn(`[moderation] flagged episode ${episodeId}: ${verdict.categories.join(", ")}`);
}
// ─────────────── Stages 2-4: segment → synthesize → stitch ───────────────
/**
 * Stages 2-4: segment the stored script, synthesize each segment, and stitch one MP3.
 *
 * Throws if the episode has no script row yet, or if segmentation yields no
 * segments. Each speaker key maps to that speaker's `elevenVoiceId`. A
 * fallback voice (the first speaker's voice, else the default host voice) is
 * passed to `segmentScript` — presumably for turns whose speaker key is
 * missing from the map; TODO confirm in ./segment.
 *
 * Segments are synthesized sequentially, in order. A segment with at most one
 * distinct voice goes through single-voice TTS, with its turn texts joined by
 * spaces. Multi-voice segments use the dialogue endpoint. The stitched MP3 is
 * uploaded under the episode's mp3 asset key, the AudioAsset row is upserted,
 * and the total synthesized character count is recorded as ElevenLabs cost.
 */
async function generateAudio(episode: EpisodeWithRelations) {
  const episodeId = episode.id;
  await setEpisodeStatus(episodeId, "SYNTHESIZING", { stage: "Recording the audio" });

  const scriptRow = await prisma.script.findUnique({ where: { episodeId } });
  if (!scriptRow) throw new Error("Cannot synthesize audio before a script exists");
  const script = scriptRow.content as unknown as StructuredScript;

  const voiceMap: Record<string, string> = {};
  episode.speakers.forEach((speaker) => {
    voiceMap[speaker.speakerKey] = speaker.elevenVoiceId;
  });
  const fallbackVoice = episode.speakers[0]?.elevenVoiceId ?? DEFAULT_VOICE_IDS.host;

  const tts = audioProvider();
  const segments = segmentScript(script, voiceMap, fallbackVoice, tts.maxCharsPerRequest);
  if (segments.length === 0) throw new Error("Script produced no spoken lines");

  // Single-voice segments use plain TTS; mixed-voice segments use the dialogue API.
  const synthesize = (segment: (typeof segments)[number]) =>
    segment.uniqueVoices <= 1
      ? tts.synthesizeSpeech(
          segment.turns.map((turn) => turn.text).join(" "),
          segment.turns[0].voiceId,
          { language: episode.language }
        )
      : tts.synthesizeDialogue(segment.turns, { language: episode.language });

  const clips: Buffer[] = [];
  let charCount = 0;
  for (const segment of segments) {
    const { audio, characters } = await synthesize(segment);
    clips.push(audio);
    charCount += characters;
  }

  await setEpisodeStatus(episodeId, "STITCHING", { stage: "Mixing the audio" });
  const { data, durationSec } = await stitchMp3(clips);
  const key = assetKey("mp3", `${episodeId}.mp3`);
  await storage().put(key, data, "audio/mpeg");

  const assetFields = {
    storageKey: key,
    durationSec,
    sizeBytes: data.length,
    segments: { count: segments.length } as Prisma.InputJsonValue,
  };
  await prisma.audioAsset.upsert({
    where: { episodeId },
    create: { episodeId, ...assetFields },
    update: { ...assetFields },
  });

  await recordCost({
    provider: "elevenlabs",
    operation: "audio",
    units: charCount,
    costUsd: audioCostUsd(charCount),
    episodeId,
    userId: episode.userId,
  });
}
// ─────────────── Stage 5: cover art ───────────────
/**
 * Stage 5: generate cover art and persist it.
 *
 * Builds a cover prompt from the episode's topic, tone, and title, then asks
 * the art provider for an image and uploads it as PNG under the episode's art
 * asset key. It upserts the CoverArt row, storing the provider's revised
 * prompt when one is returned and otherwise the original prompt. Finally it
 * records the cost of one image.
 *
 * The provider is resolved once, so the stored `model` matches the provider
 * that produced the image. `model` is refreshed on re-generation alongside
 * the storage key and prompt.
 */
async function generateArt(episode: EpisodeWithRelations) {
  await setEpisodeStatus(episode.id, "ART", { stage: "Designing the cover art" });
  const prompt = buildCoverPrompt(episode.topic, episode.tone, episode.title);
  const provider = artProvider();
  const { data, revisedPrompt } = await provider.generateCover(prompt);
  const storedPrompt = revisedPrompt ?? prompt;
  const key = assetKey("art", `${episode.id}.png`);
  await storage().put(key, data, "image/png");
  await prisma.coverArt.upsert({
    where: { episodeId: episode.id },
    create: { episodeId: episode.id, storageKey: key, prompt: storedPrompt, model: provider.model },
    // Keep `model` in step with the image actually stored.
    update: { storageKey: key, prompt: storedPrompt, model: provider.model },
  });
  await recordCost({
    provider: "openai",
    operation: "art",
    units: 1,
    costUsd: artCostUsd(1),
    episodeId: episode.id,
    userId: episode.userId,
  });
}
/**
 * Email the episode owner that generation finished, with a link to the episode.
 *
 * Best-effort: a send failure is logged and swallowed, so it never fails an
 * otherwise-successful run.
 *
 * The link base comes from NEXT_PUBLIC_APP_URL (default http://localhost:3000),
 * with trailing slashes stripped so a configured "https://host/" doesn't yield
 * "//episodes". A blank title is shown as "Untitled episode".
 *
 * NOTE(review): `episode.title` is user-supplied and interpolated into the HTML
 * body — confirm `emailLayout` escapes its text argument.
 * NOTE(review): the body says script, audio, and cover art are all set, even
 * for single-stage regenerations.
 */
async function notifyReady(episode: EpisodeWithRelations) {
  const appUrl = (process.env.NEXT_PUBLIC_APP_URL ?? "http://localhost:3000").replace(/\/+$/, "");
  const episodeUrl = `${appUrl}/episodes/${episode.id}`;
  const title = episode.title?.trim() || "Untitled episode";
  try {
    await sendEmail({
      to: episode.user.email,
      subject: `🎙️ "${title}" is ready`,
      html: emailLayout(
        "Your episode is ready",
        // Balanced curly quotes (the opening “ was previously missing).
        `“${title}” has finished generating — script, audio, and cover art are all set.`,
        { label: "Open episode", url: episodeUrl }
      ),
      text: `Your episode "${title}" is ready: ${episodeUrl}`,
    });
  } catch (err) {
    console.error("[notifyReady] email failed (non-fatal)", err);
  }
}