import { Prisma } from "@prisma/client";
import { prisma } from "@/lib/db";
import { setEpisodeStatus } from "@/lib/episodes/status";
import { scriptProvider, audioProvider, artProvider } from "@/lib/ai/providers";
import { buildCoverPrompt } from "@/lib/ai/providers/openai-art";
import { segmentScript } from "./segment";
import { stitchMp3 } from "./stitch";
import { storage, assetKey } from "@/lib/storage";
import { recordCost, scriptCostUsd, audioCostUsd, artCostUsd } from "@/lib/ai/cost";
import { refundUsage } from "@/lib/usage/meter";
import { isFlagEnabled } from "@/lib/flags";
import { moderateText, moderationReason } from "@/lib/ai/moderation";
import { sendEmail, emailLayout } from "@/lib/email";
import { DEFAULT_VOICE_IDS } from "@/lib/ai/voices";
import type { EpisodeConfig, StructuredScript } from "@/lib/ai/types";
import type { GenerationType } from "@/lib/queue/jobs";
import type { UsageMetric } from "@/lib/billing/plans";

/** An episode row loaded together with its speakers and owning user. */
type EpisodeWithRelations = Prisma.EpisodeGetPayload<{
  include: { speakers: true; user: true };
}>;

/**
 * Usage metrics RESERVED by the enqueuing caller for a given generation `type`.
 * The worker uses this only to REFUND on terminal failure — it never increments
 * (see the metering invariant in lib/usage/meter.ts). Must stay in sync with the
 * reservations made in the create/regenerate paths.
 *
 * @param type - The generation type the job was enqueued with.
 * @returns The metrics to refund; an empty list for an unrecognized type.
 */
export function reservedMetricsFor(type: GenerationType): UsageMetric[] {
  switch (type) {
    case "full":
      return ["script", "audio", "art"];
    case "script":
      return ["script"];
    case "audio":
      return ["audio"];
    case "art":
      return ["art"];
    default:
      return [];
  }
}

/**
 * Refund every metric reserved for `type` (used by the worker on terminal failure).
 *
 * The refund is credited to the episode's organization when it has one,
 * otherwise to the owning user. Does nothing when the episode no longer exists.
 *
 * @param episodeId - Episode whose reservation should be refunded.
 * @param type - The generation type the job was enqueued with.
 */
export async function refundEpisodeUsage(
  episodeId: string,
  type: GenerationType
): Promise<void> {
  const episode = await prisma.episode.findUnique({
    where: { id: episodeId },
    select: { userId: true, organizationId: true },
  });
  if (!episode) return;

  const ownerId = episode.organizationId ?? episode.userId;
  const ownerType = episode.organizationId ? "organization" : "user";

  // Refunds are issued one at a time, in the order the metrics are listed.
  for (const metric of reservedMetricsFor(type)) {
    await refundUsage(ownerId, ownerType, metric);
  }
}

/**
 * The episode generation pipeline, run by the worker.
 * Stages: script → segment → synthesize → stitch → art → save.
 * `type` selects which stages run (full, or a single re-generation).
 *
 * NOTE: usage is NOT metered here. The enqueuing caller already RESERVED the
 * relevant metrics (script/audio/art) up front; on terminal failure the worker
 * refunds them. See the metering invariant in lib/usage/meter.ts.
 *
 * NOTE(review): a "script" run also executes the audio stage below, while
 * reservedMetricsFor("script") lists only "script" — confirm this is intended.
 *
 * @param episodeId - Episode to generate.
 * @param type - Which stages to run; defaults to the full pipeline.
 * @throws If the episode does not exist, or if any stage throws.
 */
export async function runEpisodeGeneration(
  episodeId: string,
  type: GenerationType = "full"
): Promise<void> {
  const episode = await loadEpisode(episodeId);
  const config = toConfig(episode);

  if (type === "full" || type === "script") {
    await generateScript(episode, config);
  }
  if (type === "full" || type === "script" || type === "audio") {
    await generateAudio(episode);
  }
  if (type === "full" || type === "art") {
    await generateArt(episode);
  }

  await setEpisodeStatus(episodeId, "SAVING", { stage: "Finalizing your episode" });
  // No metering here: usage was reserved at enqueue time. See meter.ts invariant.
  await setEpisodeStatus(episodeId, "READY", { stage: "Done" });
  await notifyReady(episode);
}

/**
 * Load an episode with its speakers and user.
 *
 * @throws If no episode with `episodeId` exists.
 */
async function loadEpisode(episodeId: string): Promise<EpisodeWithRelations> {
  const episode = await prisma.episode.findUnique({
    where: { id: episodeId },
    include: { speakers: true, user: true },
  });
  if (!episode) throw new Error(`Episode ${episodeId} not found`);
  return episode;
}

/**
 * Build the script-generation config from the stored episode.
 * An episode without speakers gets a single default "host" speaker.
 */
function toConfig(episode: EpisodeWithRelations): EpisodeConfig {
  const speakers =
    episode.speakers.length > 0
      ? episode.speakers.map((s) => ({ speakerKey: s.speakerKey, displayName: s.displayName }))
      : [{ speakerKey: "host", displayName: "Host" }];

  return {
    title: episode.title,
    topic: episode.topic,
    tone: episode.tone,
    format: episode.format,
    language: episode.language,
    targetLengthMin: episode.targetLengthMin,
    audience: episode.audience ?? undefined,
    speakers,
  };
}

// ─────────────── Stage 1: script ───────────────

/**
 * Generate the script, persist it (bumping `version` when one already exists),
 * record its cost, and run it through moderation.
 *
 * Mutates `episode.title` in place when the generated title is adopted, so
 * later stages that read the same object see the new title.
 */
async function generateScript(
  episode: EpisodeWithRelations,
  config: EpisodeConfig
): Promise<void> {
  await setEpisodeStatus(episode.id, "SCRIPTING", { stage: "Writing the script" });

  // Resolve the provider once so the stored model is the one that produced the script.
  const provider = scriptProvider();
  const { script, usage } = await provider.generate(config);

  await prisma.script.upsert({
    where: { episodeId: episode.id },
    create: {
      episodeId: episode.id,
      content: script as unknown as Prisma.InputJsonValue,
      model: provider.model,
    },
    update: {
      content: script as unknown as Prisma.InputJsonValue,
      version: { increment: 1 },
    },
  });

  // Adopt the generated title when the user didn't set one.
  if (!episode.title?.trim() && script.title) {
    await prisma.episode.update({ where: { id: episode.id }, data: { title: script.title } });
    episode.title = script.title;
  }

  await recordCost({
    provider: "openai",
    operation: "script",
    units: usage.inputTokens + usage.outputTokens,
    costUsd: scriptCostUsd(usage),
    episodeId: episode.id,
    userId: episode.userId,
  });

  await flagScriptIfFlagged(episode.id, script);
}

/**
 * Screen the generated script with automated moderation. On a violation we queue
 * a ContentFlag for admin review (rather than hard-failing the episode the user
 * asked for); the admin moderation queue is the consumer. No-op when the
 * moderation flag is off or an open flag already exists for the episode.
 */
async function flagScriptIfFlagged(episodeId: string, script: StructuredScript): Promise<void> {
  if (!(await isFlagEnabled("ai_moderation_enabled"))) return;

  // Moderate every spoken line of the script as one newline-separated text.
  const text = script.sections.flatMap((s) => s.turns.map((t) => t.text)).join("\n");
  const result = await moderateText(text);
  if (!result.flagged) return;

  const existing = await prisma.contentFlag.findFirst({ where: { episodeId, status: "open" } });
  if (existing) return;

  await prisma.contentFlag.create({
    data: { episodeId, reason: moderationReason(result), source: "moderation", severity: "high" },
  });
  console.warn(`[moderation] flagged episode ${episodeId}: ${result.categories.join(", ")}`);
}

// ─────────────── Stages 2–4: segment → synthesize → stitch ───────────────

/**
 * Turn the stored script into a single MP3: segment it, synthesize each segment,
 * stitch the results, upload the file, upsert the audio asset row, and record cost.
 *
 * @throws If no script row exists for the episode, or the script yields no segments.
 */
async function generateAudio(episode: EpisodeWithRelations): Promise<void> {
  await setEpisodeStatus(episode.id, "SYNTHESIZING", { stage: "Recording the audio" });

  const scriptRow = await prisma.script.findUnique({ where: { episodeId: episode.id } });
  if (!scriptRow) throw new Error("Cannot synthesize audio before a script exists");
  const script = scriptRow.content as unknown as StructuredScript;

  // Speaker key → voice id; the first speaker's voice (else the default host
  // voice) is handed to segmentScript as the fallback.
  const voiceMap: Record<string, string> = {};
  for (const s of episode.speakers) voiceMap[s.speakerKey] = s.elevenVoiceId;
  const fallback = episode.speakers[0]?.elevenVoiceId ?? DEFAULT_VOICE_IDS.host;

  const provider = audioProvider();
  const segments = segmentScript(script, voiceMap, fallback, provider.maxCharsPerRequest);
  if (segments.length === 0) throw new Error("Script produced no spoken lines");

  const buffers: Buffer[] = [];
  let totalChars = 0;
  // Segments are synthesized one at a time so `buffers` keeps script order.
  for (const seg of segments) {
    // Single-voice segments are joined into one speech request; segments with
    // more than one voice go through the dialogue endpoint.
    const res =
      seg.uniqueVoices <= 1
        ? await provider.synthesizeSpeech(
            seg.turns.map((t) => t.text).join(" "),
            seg.turns[0].voiceId,
            { language: episode.language }
          )
        : await provider.synthesizeDialogue(seg.turns, { language: episode.language });
    buffers.push(res.audio);
    totalChars += res.characters;
  }

  await setEpisodeStatus(episode.id, "STITCHING", { stage: "Mixing the audio" });
  const { data, durationSec } = await stitchMp3(buffers);

  const key = assetKey("mp3", `${episode.id}.mp3`);
  await storage().put(key, data, "audio/mpeg");

  await prisma.audioAsset.upsert({
    where: { episodeId: episode.id },
    create: {
      episodeId: episode.id,
      storageKey: key,
      durationSec,
      sizeBytes: data.length,
      segments: { count: segments.length } as Prisma.InputJsonValue,
    },
    update: {
      storageKey: key,
      durationSec,
      sizeBytes: data.length,
      segments: { count: segments.length } as Prisma.InputJsonValue,
    },
  });

  await recordCost({
    provider: "elevenlabs",
    operation: "audio",
    units: totalChars,
    costUsd: audioCostUsd(totalChars),
    episodeId: episode.id,
    userId: episode.userId,
  });
}

// ─────────────── Stage 5: cover art ───────────────

/**
 * Generate cover art from the episode's topic, tone and title, upload it,
 * upsert the cover-art row, and record cost. The stored prompt is the
 * provider's revised prompt when it returns one, otherwise the prompt we sent.
 */
async function generateArt(episode: EpisodeWithRelations): Promise<void> {
  await setEpisodeStatus(episode.id, "ART", { stage: "Designing the cover art" });

  const prompt = buildCoverPrompt(episode.topic, episode.tone, episode.title);
  // Resolve the provider once so the stored model is the one that drew the cover.
  const provider = artProvider();
  const { data, revisedPrompt } = await provider.generateCover(prompt);

  const key = assetKey("art", `${episode.id}.png`);
  await storage().put(key, data, "image/png");

  await prisma.coverArt.upsert({
    where: { episodeId: episode.id },
    create: {
      episodeId: episode.id,
      storageKey: key,
      prompt: revisedPrompt ?? prompt,
      model: provider.model,
    },
    update: { storageKey: key, prompt: revisedPrompt ?? prompt },
  });

  await recordCost({
    provider: "openai",
    operation: "art",
    units: 1,
    costUsd: artCostUsd(1),
    episodeId: episode.id,
    userId: episode.userId,
  });
}

/**
 * Email the episode's user that generation finished. Best-effort: a send
 * failure is logged and swallowed so it never fails the pipeline.
 */
async function notifyReady(episode: EpisodeWithRelations): Promise<void> {
  // `||` (not `??`) so an empty env value also falls back to the local URL.
  const appUrl = process.env.NEXT_PUBLIC_APP_URL || "http://localhost:3000";
  try {
    await sendEmail({
      to: episode.user.email,
      subject: `🎙️ "${episode.title}" is ready`,
      html: emailLayout(
        "Your episode is ready",
        `“${episode.title}” has finished generating — script, audio, and cover art are all set.`,
        { label: "Open episode", url: `${appUrl}/episodes/${episode.id}` }
      ),
      text: `Your episode "${episode.title}" is ready: ${appUrl}/episodes/${episode.id}`,
    });
  } catch (err) {
    console.error("[notifyReady] email failed (non-fatal)", err);
  }
}