131 lines
4.2 KiB
TypeScript
131 lines
4.2 KiB
TypeScript
import { NextRequest } from "next/server";
|
|
import { getServerSession } from "@/lib/auth/guards";
|
|
import { prisma } from "@/lib/db";
|
|
import { isTerminal } from "@/lib/episodes/status";
|
|
|
|
export const dynamic = "force-dynamic";
|
|
|
|
/**
|
|
* Per-user concurrency cap for SSE streams. Each open connection polls the DB
|
|
* every 1.5s, so unbounded streams per user are a cheap DoS / resource leak.
|
|
* This is an in-process counter (one web instance); see the rate-limiter note
|
|
* about scaling to multiple nodes.
|
|
*/
|
|
const MAX_STREAMS_PER_USER = 5;
|
|
const activeStreams = new Map<string, number>();
|
|
|
|
function tryAcquireStream(userId: string): boolean {
|
|
const current = activeStreams.get(userId) ?? 0;
|
|
if (current >= MAX_STREAMS_PER_USER) return false;
|
|
activeStreams.set(userId, current + 1);
|
|
return true;
|
|
}
|
|
|
|
function releaseStream(userId: string): void {
|
|
const current = activeStreams.get(userId) ?? 0;
|
|
if (current <= 1) activeStreams.delete(userId);
|
|
else activeStreams.set(userId, current - 1);
|
|
}
|
|
|
|
/**
|
|
* Server-Sent Events stream of an episode's generation status. Polls the row
|
|
* every 1.5s and emits on change until the episode reaches a terminal state.
|
|
* (LISTEN/NOTIFY is a future optimization; polling is simpler and robust.)
|
|
*/
|
|
export async function GET(req: NextRequest, { params }: { params: Promise<{ id: string }> }) {
|
|
const { id } = await params;
|
|
|
|
const session = await getServerSession();
|
|
if (!session) return new Response("Unauthorized", { status: 401 });
|
|
|
|
const ep = await prisma.episode.findUnique({ where: { id }, select: { userId: true } });
|
|
if (!ep) return new Response("Not found", { status: 404 });
|
|
if (ep.userId !== session.user.id && session.user.role !== "admin") {
|
|
return new Response("Forbidden", { status: 403 });
|
|
}
|
|
|
|
// Cap concurrent streams per user. Released in EVERY stop path below.
|
|
const streamUserId = session.user.id;
|
|
if (!tryAcquireStream(streamUserId)) {
|
|
return new Response("Too many concurrent streams", {
|
|
status: 429,
|
|
headers: { "Retry-After": "5" },
|
|
});
|
|
}
|
|
|
|
const encoder = new TextEncoder();
|
|
// Exposed so the ReadableStream's `cancel` can also release the slot if the
|
|
// consumer tears down without an abort signal.
|
|
let stopRef: () => void = () => releaseStream(streamUserId);
|
|
|
|
const stream = new ReadableStream({
|
|
start(controller) {
|
|
let lastSig = "";
|
|
let stopped = false;
|
|
let pollTimer: ReturnType<typeof setInterval>;
|
|
let pingTimer: ReturnType<typeof setInterval>;
|
|
|
|
const send = (data: unknown) => {
|
|
if (!stopped) controller.enqueue(encoder.encode(`data: ${JSON.stringify(data)}\n\n`));
|
|
};
|
|
|
|
const stop = () => {
|
|
if (stopped) return;
|
|
stopped = true;
|
|
// Release the per-user stream slot exactly once (terminal status,
|
|
// abort, not-found, or error all route through here).
|
|
releaseStream(streamUserId);
|
|
clearInterval(pollTimer);
|
|
clearInterval(pingTimer);
|
|
try {
|
|
controller.close();
|
|
} catch {
|
|
/* already closed */
|
|
}
|
|
};
|
|
|
|
const poll = async () => {
|
|
if (stopped) return;
|
|
const e = await prisma.episode.findUnique({
|
|
where: { id },
|
|
select: { status: true, stage: true, errorMessage: true },
|
|
});
|
|
if (!e) {
|
|
send({ status: "FAILED", error: "Episode not found" });
|
|
stop();
|
|
return;
|
|
}
|
|
const sig = `${e.status}:${e.stage ?? ""}`;
|
|
if (sig !== lastSig) {
|
|
lastSig = sig;
|
|
send({ status: e.status, stage: e.stage, error: e.errorMessage });
|
|
}
|
|
if (isTerminal(e.status)) stop();
|
|
};
|
|
|
|
stopRef = stop;
|
|
|
|
send({ type: "open" });
|
|
void poll();
|
|
pollTimer = setInterval(poll, 1500);
|
|
pingTimer = setInterval(() => {
|
|
if (!stopped) controller.enqueue(encoder.encode(": ping\n\n"));
|
|
}, 15000);
|
|
|
|
req.signal.addEventListener("abort", stop);
|
|
},
|
|
cancel() {
|
|
// Consumer disconnected/cancelled — ensure the slot is released.
|
|
stopRef();
|
|
},
|
|
});
|
|
|
|
return new Response(stream, {
|
|
headers: {
|
|
"Content-Type": "text/event-stream",
|
|
"Cache-Control": "no-cache, no-transform",
|
|
Connection: "keep-alive",
|
|
},
|
|
});
|
|
}
|