Comprehensive admin + user dashboards (production-ready)
This commit is contained in:
@@ -5,6 +5,28 @@ import { isTerminal } from "@/lib/episodes/status";
|
||||
|
||||
export const dynamic = "force-dynamic";
|
||||
|
||||
/**
|
||||
* Per-user concurrency cap for SSE streams. Each open connection polls the DB
|
||||
* every 1.5s, so unbounded streams per user are a cheap DoS / resource leak.
|
||||
* This is an in-process counter (one web instance); see the rate-limiter note
|
||||
* about scaling to multiple nodes.
|
||||
*/
|
||||
const MAX_STREAMS_PER_USER = 5;
|
||||
const activeStreams = new Map<string, number>();
|
||||
|
||||
function tryAcquireStream(userId: string): boolean {
|
||||
const current = activeStreams.get(userId) ?? 0;
|
||||
if (current >= MAX_STREAMS_PER_USER) return false;
|
||||
activeStreams.set(userId, current + 1);
|
||||
return true;
|
||||
}
|
||||
|
||||
function releaseStream(userId: string): void {
|
||||
const current = activeStreams.get(userId) ?? 0;
|
||||
if (current <= 1) activeStreams.delete(userId);
|
||||
else activeStreams.set(userId, current - 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Server-Sent Events stream of an episode's generation status. Polls the row
|
||||
* every 1.5s and emits on change until the episode reaches a terminal state.
|
||||
@@ -22,7 +44,19 @@ export async function GET(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
return new Response("Forbidden", { status: 403 });
|
||||
}
|
||||
|
||||
// Cap concurrent streams per user. Released in EVERY stop path below.
|
||||
const streamUserId = session.user.id;
|
||||
if (!tryAcquireStream(streamUserId)) {
|
||||
return new Response("Too many concurrent streams", {
|
||||
status: 429,
|
||||
headers: { "Retry-After": "5" },
|
||||
});
|
||||
}
|
||||
|
||||
const encoder = new TextEncoder();
|
||||
// Exposed so the ReadableStream's `cancel` can also release the slot if the
|
||||
// consumer tears down without an abort signal.
|
||||
let stopRef: () => void = () => releaseStream(streamUserId);
|
||||
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
@@ -38,6 +72,9 @@ export async function GET(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
const stop = () => {
|
||||
if (stopped) return;
|
||||
stopped = true;
|
||||
// Release the per-user stream slot exactly once (terminal status,
|
||||
// abort, not-found, or error all route through here).
|
||||
releaseStream(streamUserId);
|
||||
clearInterval(pollTimer);
|
||||
clearInterval(pingTimer);
|
||||
try {
|
||||
@@ -66,6 +103,8 @@ export async function GET(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
if (isTerminal(e.status)) stop();
|
||||
};
|
||||
|
||||
stopRef = stop;
|
||||
|
||||
send({ type: "open" });
|
||||
void poll();
|
||||
pollTimer = setInterval(poll, 1500);
|
||||
@@ -75,6 +114,10 @@ export async function GET(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
|
||||
req.signal.addEventListener("abort", stop);
|
||||
},
|
||||
cancel() {
|
||||
// Consumer disconnected/cancelled — ensure the slot is released.
|
||||
stopRef();
|
||||
},
|
||||
});
|
||||
|
||||
return new Response(stream, {
|
||||
|
||||
Reference in New Issue
Block a user