import { prisma } from "@/lib/db";

/** A worker whose last heartbeat is this many milliseconds old (or older) is reported as not alive. */
const STALE_MS = 60_000;

/**
 * Worker calls this on an interval so the admin can tell it's alive.
 *
 * Upserts the `workerHeartbeat` row keyed by `name`, stamping `lastBeatAt`
 * with the current time and storing the optional counters alongside it.
 *
 * @param name  Unique worker name; used as the upsert key.
 * @param stats Optional job counters reported by the worker.
 */
export async function recordHeartbeat(
  name: string,
  stats?: { queued?: number; running?: number }
): Promise<void> {
  // The same field values are written whether the row is created or updated.
  const beat = {
    lastBeatAt: new Date(),
    queued: stats?.queued,
    running: stats?.running,
  };
  await prisma.workerHeartbeat.upsert({
    where: { name },
    create: { name, ...beat },
    update: beat,
  });
}

/** Liveness snapshot for one worker, derived from its heartbeat row. */
export interface WorkerHealth {
  name: string;
  /** True when the last heartbeat is less than `STALE_MS` old. */
  alive: boolean;
  lastBeatAt: Date;
  /** Time since the last heartbeat, rounded to whole seconds. */
  secondsAgo: number;
  queued: number | null;
  running: number | null;
}

/**
 * Lists every recorded worker, ordered by name, with its liveness computed
 * against the current time.
 *
 * @returns One `WorkerHealth` entry per heartbeat row.
 */
export async function getWorkerHealth(): Promise<WorkerHealth[]> {
  const rows = await prisma.workerHeartbeat.findMany({ orderBy: { name: "asc" } });
  // Capture "now" once so every row is judged against the same instant.
  const now = Date.now();
  return rows.map((r) => {
    const ageMs = now - r.lastBeatAt.getTime();
    return {
      name: r.name,
      alive: ageMs < STALE_MS,
      lastBeatAt: r.lastBeatAt,
      secondsAgo: Math.round(ageMs / 1000),
      queued: r.queued,
      running: r.running,
    };
  });
}

/** Job counts for a single queue, broken down by job state. */
export interface QueueStat {
  queue: string;
  /** Jobs in the `created` state. */
  queued: number;
  active: number;
  completed: number;
  retry: number;
  failed: number;
}

/** One `(queue name, state, count)` row produced by the grouped query in `getQueueStats`. */
type QueueStateCount = { name: string; state: string; n: number };

/** Folds grouped `(name, state, count)` rows into one `QueueStat` per queue, sorted by queue name. */
function tallyQueueStats(rows: readonly QueueStateCount[]): QueueStat[] {
  const byQueue = new Map<string, QueueStat>();
  for (const row of rows) {
    let stat = byQueue.get(row.name);
    if (stat === undefined) {
      // Register the queue on first sight, so a queue whose jobs are all in
      // untracked states still appears with zeroed counters.
      stat = { queue: row.name, queued: 0, active: 0, completed: 0, retry: 0, failed: 0 };
      byQueue.set(row.name, stat);
    }
    switch (row.state) {
      case "created":
        stat.queued += row.n;
        break;
      case "active":
        stat.active += row.n;
        break;
      case "completed":
        stat.completed += row.n;
        break;
      case "retry":
        stat.retry += row.n;
        break;
      case "failed":
        stat.failed += row.n;
        break;
      default:
        // Any other state is not counted.
        break;
    }
  }
  return Array.from(byQueue.values()).sort((a, b) => a.queue.localeCompare(b.queue));
}

/**
 * Per-queue job counts straight from pg-boss's own tables (same Postgres).
 * Returns [] if the pgboss schema isn't reachable (e.g. worker never started).
 *
 * @returns One `QueueStat` per queue name, sorted by queue name.
 */
export async function getQueueStats(): Promise<QueueStat[]> {
  try {
    // The statement is a fixed literal with no interpolated input.
    const rows = await prisma.$queryRawUnsafe<QueueStateCount[]>(
      `SELECT name, state, count(*)::int AS n FROM pgboss.job GROUP BY name, state`
    );
    return tallyQueueStats(rows);
  } catch {
    // Deliberate best-effort: any failure here is reported as "no queues".
    return [];
  }
}