import { NextRequest } from "next/server"; import { getServerSession } from "@/lib/auth/guards"; import { prisma } from "@/lib/db"; import { isTerminal } from "@/lib/episodes/status"; export const dynamic = "force-dynamic"; /** * Per-user concurrency cap for SSE streams. Each open connection polls the DB * every 1.5s, so unbounded streams per user are a cheap DoS / resource leak. * This is an in-process counter (one web instance); see the rate-limiter note * about scaling to multiple nodes. */ const MAX_STREAMS_PER_USER = 5; const activeStreams = new Map(); function tryAcquireStream(userId: string): boolean { const current = activeStreams.get(userId) ?? 0; if (current >= MAX_STREAMS_PER_USER) return false; activeStreams.set(userId, current + 1); return true; } function releaseStream(userId: string): void { const current = activeStreams.get(userId) ?? 0; if (current <= 1) activeStreams.delete(userId); else activeStreams.set(userId, current - 1); } /** * Server-Sent Events stream of an episode's generation status. Polls the row * every 1.5s and emits on change until the episode reaches a terminal state. * (LISTEN/NOTIFY is a future optimization; polling is simpler and robust.) */ export async function GET(req: NextRequest, { params }: { params: Promise<{ id: string }> }) { const { id } = await params; const session = await getServerSession(); if (!session) return new Response("Unauthorized", { status: 401 }); const ep = await prisma.episode.findUnique({ where: { id }, select: { userId: true } }); if (!ep) return new Response("Not found", { status: 404 }); if (ep.userId !== session.user.id && session.user.role !== "admin") { return new Response("Forbidden", { status: 403 }); } // Cap concurrent streams per user. Released in EVERY stop path below. const streamUserId = session.user.id; if (!tryAcquireStream(streamUserId)) { return new Response("Too many concurrent streams", { status: 429, headers: { "Retry-After": "5" }, }); } const encoder = new TextEncoder(); // Exposed so the ReadableStream's `cancel` can also release the slot if the // consumer tears down without an abort signal. let stopRef: () => void = () => releaseStream(streamUserId); const stream = new ReadableStream({ start(controller) { let lastSig = ""; let stopped = false; let pollTimer: ReturnType; let pingTimer: ReturnType; const send = (data: unknown) => { if (!stopped) controller.enqueue(encoder.encode(`data: ${JSON.stringify(data)}\n\n`)); }; const stop = () => { if (stopped) return; stopped = true; // Release the per-user stream slot exactly once (terminal status, // abort, not-found, or error all route through here). releaseStream(streamUserId); clearInterval(pollTimer); clearInterval(pingTimer); try { controller.close(); } catch { /* already closed */ } }; const poll = async () => { if (stopped) return; const e = await prisma.episode.findUnique({ where: { id }, select: { status: true, stage: true, errorMessage: true }, }); if (!e) { send({ status: "FAILED", error: "Episode not found" }); stop(); return; } const sig = `${e.status}:${e.stage ?? ""}`; if (sig !== lastSig) { lastSig = sig; send({ status: e.status, stage: e.stage, error: e.errorMessage }); } if (isTerminal(e.status)) stop(); }; stopRef = stop; send({ type: "open" }); void poll(); pollTimer = setInterval(poll, 1500); pingTimer = setInterval(() => { if (!stopped) controller.enqueue(encoder.encode(": ping\n\n")); }, 15000); req.signal.addEventListener("abort", stop); }, cancel() { // Consumer disconnected/cancelled — ensure the slot is released. stopRef(); }, }); return new Response(stream, { headers: { "Content-Type": "text/event-stream", "Cache-Control": "no-cache, no-transform", Connection: "keep-alive", }, }); }