"use server"; import { randomBytes } from "node:crypto"; import { revalidatePath } from "next/cache"; import { z } from "zod"; import { getServerSession } from "@/lib/auth/guards"; import { prisma } from "@/lib/db"; import { getEffectivePlan } from "@/lib/billing/subscription"; import { reserveLimit, LimitExceededError } from "@/lib/usage/limits"; import { refundUsage } from "@/lib/usage/meter"; import { enqueueEpisodeGeneration } from "@/lib/queue/pgboss"; import { rateLimit, LIMITS } from "@/lib/ratelimit"; import { isFlagEnabled } from "@/lib/flags"; import { moderateText } from "@/lib/ai/moderation"; import type { UsageMetric } from "@/lib/billing/plans"; import type { GenerationType } from "@/lib/queue/jobs"; import type { Prisma } from "@prisma/client"; const speakerSchema = z.object({ speakerKey: z.string().min(1).max(40), displayName: z.string().min(1).max(60), elevenVoiceId: z.string().min(1).max(60), }); const createSchema = z.object({ title: z.string().max(120).optional(), topic: z.string().min(10, "Describe your topic in a bit more detail").max(2000), tone: z.string().min(1), format: z.enum(["SOLO", "INTERVIEW", "MULTI_HOST"]), language: z.string().min(2).max(5), targetLengthMin: z.number().int().min(1).max(180), audience: z.string().max(200).optional(), speakers: z.array(speakerSchema).min(1).max(6), }); export type CreateEpisodeInput = z.infer; export type CreateEpisodeResult = | { ok: true; episodeId: string } | { ok: false; error: string; limited?: boolean }; export async function createEpisodeAction(input: CreateEpisodeInput): Promise { const session = await getServerSession(); if (!session) return { ok: false, error: "You must be signed in." }; const rl = await rateLimit("generation", session.user.id, LIMITS.generation); if (!rl.ok) { return { ok: false, error: `Too many requests. Try again in ${rl.retryAfterSec}s.` }; } if (!(await isFlagEnabled("episode_generation_enabled"))) { return { ok: false, error: "Episode generation is temporarily paused. Please try again shortly." }; } const parsed = createSchema.safeParse(input); if (!parsed.success) { return { ok: false, error: parsed.error.issues[0]?.message ?? "Invalid input." }; } const data = parsed.data; const activeOrgId = session.session.activeOrganizationId; const { plan, subjectId, subjectType } = await getEffectivePlan(session.user.id, activeOrgId); if (data.targetLengthMin > plan.limits.maxEpisodeMinutes) { return { ok: false, error: `The ${plan.name} plan supports episodes up to ${plan.limits.maxEpisodeMinutes} minutes.`, limited: true, }; } // Screen the requested topic before spending any quota or AI budget. if (await isFlagEnabled("ai_moderation_enabled")) { const mod = await moderateText([data.title, data.topic, data.audience].filter(Boolean).join("\n")); if (mod.flagged) { return { ok: false, error: "This topic may violate our content policy and can't be generated. Please revise it and try again.", }; } } // Reserve quota atomically up front (a full generation consumes script, // audio and art). The worker won't re-meter; we refund below if create/enqueue // fails. See the metering invariant in lib/usage/meter.ts. const reserved: UsageMetric[] = []; const refundReserved = async () => { for (const m of reserved) await refundUsage(subjectId, subjectType, m); }; try { await reserveLimit(session.user.id, "script", activeOrgId); reserved.push("script"); await reserveLimit(session.user.id, "audio", activeOrgId); reserved.push("audio"); await reserveLimit(session.user.id, "art", activeOrgId); reserved.push("art"); } catch (err) { await refundReserved(); if (err instanceof LimitExceededError) { return { ok: false, error: `You've reached your monthly ${err.check.metric} limit on the ${err.check.plan} plan. Upgrade to keep creating.`, limited: true, }; } throw err; } try { const episode = await prisma.episode.create({ data: { userId: session.user.id, organizationId: activeOrgId ?? undefined, title: data.title?.trim() || deriveTitle(data.topic), topic: data.topic, tone: data.tone, format: data.format, language: data.language, targetLengthMin: data.targetLengthMin, audience: data.audience, status: "QUEUED", stage: "Queued for generation", speakers: { create: data.speakers.map((s) => ({ speakerKey: s.speakerKey, displayName: s.displayName, elevenVoiceId: s.elevenVoiceId, })), }, jobs: { create: { type: "full", status: "queued" } }, }, }); await enqueueEpisodeGeneration( { episodeId: episode.id, type: "full" }, { priority: plan.features.includes("priority_generation") ? 10 : 0 } ); revalidatePath("/episodes"); revalidatePath("/dashboard"); return { ok: true, episodeId: episode.id }; } catch (err) { await refundReserved(); throw err; } } export async function regenerateAction( episodeId: string, type: GenerationType ): Promise<{ ok: boolean; error?: string }> { const session = await getServerSession(); if (!session) return { ok: false, error: "You must be signed in." }; const rl = await rateLimit("generation", session.user.id, LIMITS.generation); if (!rl.ok) { return { ok: false, error: `Too many requests. Try again in ${rl.retryAfterSec}s.` }; } if (!(await isFlagEnabled("episode_generation_enabled"))) { return { ok: false, error: "Episode generation is temporarily paused. Please try again shortly." }; } const episode = await prisma.episode.findUnique({ where: { id: episodeId }, select: { userId: true, organizationId: true }, }); if (!episode) return { ok: false, error: "Episode not found." }; if (episode.userId !== session.user.id && session.user.role !== "admin") { return { ok: false, error: "Not allowed." }; } const activeOrgId = session.session.activeOrganizationId; const { subjectId, subjectType } = await getEffectivePlan(session.user.id, activeOrgId); // Reserve the metrics this regeneration will consume up front. The worker // won't re-meter; refund below if enqueue fails. See meter.ts invariant. const metrics: UsageMetric[] = type === "art" ? ["art"] : type === "audio" ? ["audio"] : ["script", "audio"]; const reserved: UsageMetric[] = []; const refundReserved = async () => { for (const m of reserved) await refundUsage(subjectId, subjectType, m); }; try { for (const m of metrics) { await reserveLimit(session.user.id, m, activeOrgId); reserved.push(m); } } catch (err) { await refundReserved(); if (err instanceof LimitExceededError) { return { ok: false, error: `Monthly ${err.check.metric} limit reached on the ${err.check.plan} plan.` }; } throw err; } try { await prisma.episode.update({ where: { id: episodeId }, data: { status: "QUEUED", stage: "Queued for regeneration", errorMessage: null }, }); await prisma.generationJob.create({ data: { episodeId, type, status: "queued" } }); await enqueueEpisodeGeneration({ episodeId, type }); } catch (err) { await refundReserved(); throw err; } revalidatePath(`/episodes/${episodeId}`); return { ok: true }; } const scriptContentSchema = z.object({ title: z.string().min(1), sections: z .array( z.object({ id: z.string().min(1), title: z.string().min(1), turns: z.array(z.object({ speakerKey: z.string(), text: z.string() })).min(1), }) ) .min(1), }); export async function updateScriptAction( episodeId: string, content: unknown ): Promise<{ ok: boolean; error?: string }> { const session = await getServerSession(); if (!session) return { ok: false, error: "You must be signed in." }; const episode = await prisma.episode.findUnique({ where: { id: episodeId }, select: { userId: true }, }); if (!episode || episode.userId !== session.user.id) return { ok: false, error: "Not allowed." }; const parsed = scriptContentSchema.safeParse(content); if (!parsed.success) return { ok: false, error: "Invalid script format." }; await prisma.script.update({ where: { episodeId }, data: { content: parsed.data as unknown as Prisma.InputJsonValue, version: { increment: 1 } }, }); revalidatePath(`/episodes/${episodeId}`); return { ok: true }; } export async function regenerateSectionAction( episodeId: string, sectionId: string ): Promise<{ ok: boolean; error?: string; section?: { id: string; title: string; turns: { speakerKey: string; text: string }[] } }> { const session = await getServerSession(); if (!session) return { ok: false, error: "You must be signed in." }; const episode = await prisma.episode.findUnique({ where: { id: episodeId }, include: { speakers: true, script: true }, }); if (!episode || (episode.userId !== session.user.id && session.user.role !== "admin")) { return { ok: false, error: "Not allowed." }; } if (!episode.script) return { ok: false, error: "No script to edit yet." }; const rl = await rateLimit("generation", session.user.id, LIMITS.generation); if (!rl.ok) { return { ok: false, error: `Too many requests. Try again in ${rl.retryAfterSec}s.` }; } const activeOrgId = session.session.activeOrganizationId; // Reserve the script unit atomically up front. This synchronous action does // NOT increment afterwards (that would double-count) — it refunds on failure. try { await reserveLimit(session.user.id, "script", activeOrgId); } catch (err) { if (err instanceof LimitExceededError) { return { ok: false, error: `Monthly script limit reached on the ${err.check.plan} plan.` }; } throw err; } const ownerId = episode.organizationId ?? episode.userId; const ownerType = episode.organizationId ? "organization" : "user"; // Imported lazily so the AI SDK never reaches client bundles importing this file. const { scriptProvider } = await import("@/lib/ai/providers"); const { recordCost, scriptCostUsd } = await import("@/lib/ai/cost"); const config = { title: episode.title, topic: episode.topic, tone: episode.tone, format: episode.format, language: episode.language, targetLengthMin: episode.targetLengthMin, audience: episode.audience ?? undefined, speakers: episode.speakers.map((s) => ({ speakerKey: s.speakerKey, displayName: s.displayName })), }; const current = episode.script.content as unknown as { title: string; sections: { id: string; title: string; turns: { speakerKey: string; text: string }[] }[]; }; let section: { id: string; title: string; turns: { speakerKey: string; text: string }[] }; let usage: { inputTokens: number; outputTokens: number }; try { ({ section, usage } = await scriptProvider().regenerateSection(config, current, sectionId)); const updated = { ...current, sections: current.sections.map((s) => (s.id === sectionId ? section : s)), }; await prisma.script.update({ where: { episodeId }, data: { content: updated as unknown as Prisma.InputJsonValue, version: { increment: 1 } }, }); } catch (err) { // Generation/save failed — refund the script unit we reserved. await refundUsage(ownerId, ownerType, "script"); throw err; } // No incrementUsage here: the unit was already reserved above. await recordCost({ provider: "openai", operation: "script", units: usage.inputTokens + usage.outputTokens, costUsd: scriptCostUsd(usage), episodeId, userId: episode.userId, }); revalidatePath(`/episodes/${episodeId}`); return { ok: true, section }; } export async function repurposeAction( episodeId: string, format: "blog" | "social_thread" | "newsletter" ): Promise<{ ok: boolean; error?: string; content?: { title: string; body: string } }> { const session = await getServerSession(); if (!session) return { ok: false, error: "You must be signed in." }; const episode = await prisma.episode.findUnique({ where: { id: episodeId }, include: { script: true }, }); if (!episode || (episode.userId !== session.user.id && session.user.role !== "admin")) { return { ok: false, error: "Not allowed." }; } if (!episode.script) return { ok: false, error: "Generate the episode first." }; const rl = await rateLimit("repurpose", session.user.id, LIMITS.repurpose); if (!rl.ok) { return { ok: false, error: `Too many requests. Try again in ${rl.retryAfterSec}s.` }; } const activeOrgId = session.session.activeOrganizationId; // Reserve the repurpose unit atomically up front; refund on failure. This // synchronous action does NOT increment afterwards (that would double-count). try { await reserveLimit(session.user.id, "repurpose", activeOrgId); } catch (err) { if (err instanceof LimitExceededError) { return { ok: false, error: `Monthly repurpose limit reached on the ${err.check.plan} plan.` }; } throw err; } const ownerId = episode.organizationId ?? episode.userId; const ownerType = episode.organizationId ? "organization" : "user"; const { repurposeScript } = await import("@/lib/ai/pipeline/repurpose"); const { recordCost, scriptCostUsd } = await import("@/lib/ai/cost"); let content: { title: string; body: string }; let usage: { inputTokens: number; outputTokens: number }; try { ({ content, usage } = await repurposeScript( episode.script.content as unknown as Parameters[0], format )); await prisma.repurposedContent.create({ data: { episodeId, type: format, content: content as unknown as Prisma.InputJsonValue }, }); } catch (err) { // Generation/save failed — refund the repurpose unit we reserved. await refundUsage(ownerId, ownerType, "repurpose"); throw err; } // No incrementUsage here: the unit was already reserved above. await recordCost({ provider: "openai", operation: "repurpose", units: usage.inputTokens + usage.outputTokens, costUsd: scriptCostUsd(usage), episodeId, userId: episode.userId, }); revalidatePath(`/episodes/${episodeId}/repurpose`); return { ok: true, content }; } export async function deleteEpisodeAction(episodeId: string): Promise<{ ok: boolean; error?: string }> { const session = await getServerSession(); if (!session) return { ok: false, error: "You must be signed in." }; const episode = await prisma.episode.findUnique({ where: { id: episodeId }, select: { userId: true }, }); if (!episode || (episode.userId !== session.user.id && session.user.role !== "admin")) { return { ok: false, error: "Not allowed." }; } await prisma.episode.delete({ where: { id: episodeId } }); revalidatePath("/episodes"); return { ok: true }; } /** * Toggle public sharing for an episode. When enabled, mints a random url-safe * `shareId` (reachable at /p/ with no auth) and stamps `sharedAt`; * when disabled, clears both so the public page 404s. Ownership-checked. */ export async function setEpisodeShareAction( episodeId: string, enabled: boolean ): Promise<{ ok: boolean; error?: string; shareId?: string | null }> { const session = await getServerSession(); if (!session) return { ok: false, error: "You must be signed in." }; const episode = await prisma.episode.findUnique({ where: { id: episodeId }, select: { userId: true, shareId: true, status: true }, }); if (!episode || (episode.userId !== session.user.id && session.user.role !== "admin")) { return { ok: false, error: "Not allowed." }; } if (enabled) { if (episode.status !== "READY") { return { ok: false, error: "Finish generating the episode before sharing it." }; } // Reuse an existing shareId if one was already minted (stable public URL). const shareId = episode.shareId ?? randomShareId(); await prisma.episode.update({ where: { id: episodeId }, data: { shareId, sharedAt: new Date() }, }); revalidatePath(`/episodes/${episodeId}`); return { ok: true, shareId }; } await prisma.episode.update({ where: { id: episodeId }, data: { shareId: null, sharedAt: null }, }); revalidatePath(`/episodes/${episodeId}`); return { ok: true, shareId: null }; } /** url-safe base64 token (~22 chars, 128 bits) for public share links. */ function randomShareId(): string { return randomBytes(16).toString("base64url"); } function deriveTitle(topic: string): string { const trimmed = topic.trim().replace(/\s+/g, " "); return trimmed.length <= 60 ? trimmed : trimmed.slice(0, 57) + "…"; }