'use strict';

const citty = require('citty');
const fs = require('fs');
const dotenv = require('dotenv');
const zod = require('zod');
const signature = require('@slay-pics/signature');
const graphileWorker = require('graphile-worker');
require('pg');

// Bundler interop shim: wraps a CJS export so it can be consumed as a namespace import.
function _interopNamespaceCompat(e) {
  if (e && typeof e === 'object' && 'default' in e) return e;
  const n = Object.create(null);
  if (e) {
    for (const k in e) {
      n[k] = e[k];
    }
  }
  n.default = e;
  return n;
}

const dotenv__namespace = _interopNamespaceCompat(dotenv);

// Shape of the payload every queued job must carry.
const PayloadSchema = zod.z.object({
  url: zod.z.string().url(),
  retry: zod.z.number(),
  event: zod.z.string(),
  data: zod.z.any().optional(),
  cache: zod.z.any().optional()
});

// Shared task handler: signs the job data and POSTs it to the configured endpoint.
async function httpHandler(payload, helpers) {
  const validPayload = PayloadSchema.safeParse(payload);
  if (!validPayload.success) {
    console.log(payload);
    console.error("Invalid payload");
    return;
  }

  const data = {
    job_key: helpers.job.key ?? "cron",
    retry: helpers.job.attempts,
    time: new Date().getTime(),
    event: payload.event,
    data: payload.data,
    cache: payload.cache
  };

  const sig = signature.calcSig(data, process.env.SLAY_Q_SECRET);
  const res = await fetch(payload.url, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "X-SlayQ-Signature": sig
    },
    body: JSON.stringify(data)
  });

  // These statuses are treated as final and complete the job; any other status
  // throws, so graphile-worker will retry up to the job's max attempts.
  if ([200, 666, 404].includes(res.status)) {
    return;
  }
  throw new Error("Error: " + res.status + " " + res.statusText);
}
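/*
 * For reference, a minimal sketch of what the receiving endpoint might do to check
 * the X-SlayQ-Signature header. This is an assumption for illustration only: it
 * reuses calcSig from @slay-pics/signature (the same call used above) and assumes
 * calcSig is deterministic over the parsed body; the real slay-q client library may
 * verify deliveries differently.
 *
 *   const http = require("http");
 *   const { calcSig } = require("@slay-pics/signature");
 *
 *   http.createServer((req, res) => {
 *     let raw = "";
 *     req.on("data", (chunk) => (raw += chunk));
 *     req.on("end", () => {
 *       const body = JSON.parse(raw);
 *       const expected = calcSig(body, process.env.SLAY_Q_SECRET);
 *       if (req.headers["x-slayq-signature"] !== expected) {
 *         res.statusCode = 401; // reject unsigned or forged deliveries
 *         return res.end();
 *       }
 *       // ...dispatch on body.event and run the step here...
 *       res.statusCode = 200; // 200/404/666 complete the job above; anything else retries
 *       res.end();
 *     });
 *   }).listen(3000);
 */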
const migrations = {
  "20240110.up.sql": String.raw`------------------------------------------------------------------------------------------------------------------------
-- Create slayq schema
------------------------------------------------------------------------------------------------------------------------
create schema if not exists slayq;

------------------------------------------------------------------------------------------------------------------------
-- Migrations table
------------------------------------------------------------------------------------------------------------------------
create table if not exists slayq._migrations (
  id bigserial primary key,
  migration text not null unique,
  created_at timestamptz not null default now()
);

------------------------------------------------------------------------------------------------------------------------
-- Table for tasks waiting for an event to run
------------------------------------------------------------------------------------------------------------------------
create table if not exists slayq._private_waiting_on (
  id bigserial primary key,
  job_id bigint not null references graphile_worker._private_jobs(id) on delete cascade,
  event text,
  match_key text,
  match_value text,
  step_name text,
  expires_at timestamptz not null
);

------------------------------------------------------------------------------------------------------------------------
-- Table for tasks that have invoked another and are waiting for results
------------------------------------------------------------------------------------------------------------------------
create table if not exists slayq._private_invoker (
  id bigserial primary key,
  job_id bigint not null references graphile_worker._private_jobs(id) on delete cascade,
  job_key text not null,
  step_name text
);

------------------------------------------------------------------------------------------------------------------------
-- add_worker_job - exposes graphile_worker.add_job to public schema
------------------------------------------------------------------------------------------------------------------------
-- drop function if exists add_worker_job;
create or replace function public.add_worker_job (
  identifier text,
  payload json DEFAULT NULL::json,
  queue_name text DEFAULT NULL::text,
  run_at timestamp with time zone DEFAULT NULL::timestamp with time zone,
  max_attempts integer DEFAULT NULL::integer,
  job_key text DEFAULT NULL::text,
  priority integer DEFAULT NULL::integer,
  flags text[] DEFAULT NULL::text[],
  job_key_mode text DEFAULT 'replace'::text
)
returns void
security definer
language plpgsql
as $$
begin
  perform graphile_worker.add_job(identifier, payload, queue_name, run_at, max_attempts, job_key, priority, flags, job_key_mode);
end
$$;

------------------------------------------------------------------------------------------------------------------------
-- wait_for_worker_event - Waits for the specified event to fire
------------------------------------------------------------------------------------------------------------------------
-- drop function if exists wait_for_worker_event;
create or replace function public.wait_for_worker_event(_job_key text, _event text, _match_key text, _match_value text, _step_name text, _expires_at timestamptz)
returns void as $$
declare
  _job_id bigint;
begin
  select into _job_id id from graphile_worker._private_jobs where key = _job_key;

  if _job_id is null then
    raise exception 'Job not found: %', _job_key;
  end if;

  insert into slayq._private_waiting_on (job_id, event, match_key, match_value, step_name, expires_at)
  values (_job_id, _event, _match_key, _match_value, _step_name, _expires_at);
end
$$ language plpgsql security definer;

------------------------------------------------------------------------------------------------------------------------
-- wait_for_invocation - Invokes specified event and pauses caller until done
------------------------------------------------------------------------------------------------------------------------
-- drop function if exists wait_for_invocation;
create or replace function wait_for_invocation(_invoker_job_key text, _target_job_key text, _step_name text)
returns void as $$
declare
  _job_id bigint;
begin
  select into _job_id id from graphile_worker._private_jobs where key = _invoker_job_key;

  if _job_id is null then
    raise exception 'Job not found: %', _invoker_job_key;
  end if;

  insert into slayq._private_invoker (job_id, job_key, step_name)
  values (_job_id, _target_job_key, _step_name);
end
$$ language plpgsql security definer;

------------------------------------------------------------------------------------------------------------------------
-- trigger_waiting_invoker - Called when invoked function is done
------------------------------------------------------------------------------------------------------------------------
-- drop function if exists trigger_waiting_invoker;
create or replace function trigger_waiting_invoker(_job_key text, _result json)
returns void as $$
declare
  _job_id bigint;
  _waiting_id bigint;
  _step_name text;
begin
  loop
    select into _waiting_id, _job_id, _step_name id, job_id, step_name
    from slayq._private_invoker
    where slayq._private_invoker.job_key = _job_key
    order by id
    limit 1;

    if _waiting_id is null then
      exit;
    end if;

    -- Stash the invocation result in the waiting job's step cache and wake it up.
    update graphile_worker._private_jobs
    set run_at = now(),
        payload = jsonb_set(coalesce(payload::jsonb, '{}'::jsonb), concat('{cache,', _step_name, '}')::text[], _result::jsonb)::json
    where id = _job_id;

    delete from slayq._private_invoker where id = _waiting_id;
  end loop;
end
$$ language plpgsql security definer;

------------------------------------------------------------------------------------------------------------------------
-- trigger_waiting_workers - Reschedules any waiting events for immediate execution
------------------------------------------------------------------------------------------------------------------------
-- drop function if exists trigger_waiting_workers;
create or replace function trigger_waiting_workers(_event text, _match_key text, _match_value text)
returns void as $$
declare
  _job_id bigint;
  _waiting_id bigint;
  _step_name text;
begin
  loop
    select into _waiting_id, _job_id, _step_name id, job_id, step_name
    from slayq._private_waiting_on
    where slayq._private_waiting_on.event = _event
      and match_key = _match_key
      and match_value = _match_value
      and expires_at >= now()
    order by id
    limit 1;

    if _waiting_id is null then
      exit;
    end if;

    update graphile_worker._private_jobs
    set run_at = now(),
        payload = jsonb_set(coalesce(payload::jsonb, '{}'::jsonb), concat('{cache,', _step_name, '}')::text[], 'true')::json
    where id = _job_id;

    delete from slayq._private_waiting_on where id = _waiting_id;
  end loop;
end
$$ language plpgsql security definer;

------------------------------------------------------------------------------------------------------------------------
-- cancel_pending_tasks - Cancels any pending tasks
------------------------------------------------------------------------------------------------------------------------
-- drop function if exists cancel_pending_tasks;
create or replace function cancel_pending_tasks(event text, matchKey text, matchVal text)
returns void
language plpgsql
security definer
as $$
begin
  delete from graphile_worker._private_jobs
  where payload->>'event' = event
    and json_extract_path_text(payload, 'data', matchKey) = matchVal
    and locked_at is null;
end
$$;
`,
  "20240116.up.sql": String.raw`------------------------------------------------------------------------------------------------------------------------
-- update_step_cache - Updates step cache for a given task
------------------------------------------------------------------------------------------------------------------------
-- drop function if exists update_step_cache;
create or replace function update_step_cache(_job_key text, _step_cache json)
returns void
language plpgsql
security definer
as $$
begin
  update graphile_worker._private_jobs
  set payload = jsonb_set(coalesce(payload::jsonb, '{}'::jsonb), '{cache}', _step_cache::jsonb)::json
  where key = _job_key;
end
$$;
`
};
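/*
 * The migrations above are the SQL surface a slay-q client talks to. A hypothetical
 * session (illustrative only; the job keys, event names and values are invented):
 *
 *   -- enqueue a job for the "default" task/queue
 *   select add_worker_job(
 *     'default',
 *     '{"url":"https://app.example/api/slay-q","retry":3,"event":"user/signed-up","data":{"userId":"u1"}}'::json,
 *     'default'
 *   );
 *
 *   -- park an existing job until a matching event fires, for at most one day
 *   select wait_for_worker_event('job-1', 'user/confirmed', 'userId', 'u1', 'await-confirm', now() + interval '1 day');
 *
 *   -- fire the event, waking anything waiting on it
 *   select trigger_waiting_workers('user/confirmed', 'userId', 'u1');
 */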
const MigrationPlugin = {
  name: "Migration Plugin",
  version: "1.0",
  worker: {
    hooks: {
      async postmigrate(ctx, event) {
        const res = await event.client.query(
          "select exists(select from pg_tables where schemaname = 'slayq' and tablename = '_migrations')"
        );
        let hasMigrations = false;
        if (res.rowCount && res.rowCount > 0 && res.rows[0].exists) {
          ctx.logger.info(`SlayQ migrations table exists.`);
          hasMigrations = true;
        } else {
          ctx.logger.warn("SlayQ migrations table does not exist.");
        }

        // Apply migrations in lexicographic (date-prefixed) order, skipping any already recorded.
        const allKeys = Object.keys(migrations).toSorted((a, b) => a.localeCompare(b));
        for (const key of allKeys) {
          if (hasMigrations) {
            const res2 = await event.client.query(
              "select * from slayq._migrations where migration = $1",
              [key]
            );
            if (res2.rowCount && res2.rowCount > 0) {
              ctx.logger.info(`Migration ${key} exists. Continuing.`);
              continue;
            }
          }
          ctx.logger.info(`Running migration '${key}'`);
          await event.client.query(migrations[key]);
          await event.client.query(
            "insert into slayq._migrations(migration) values ($1)",
            [key]
          );
          hasMigrations = true;
        }
        ctx.logger.info("Done running SlayQ migrations.");
      }
    }
  }
};
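/*
 * server() expects a JSON config file. Its schema isn't declared anywhere in this
 * file, but from the property accesses below it must look roughly like this (the
 * field names are the ones the code reads; the values are invented for illustration):
 *
 *   {
 *     "cron": [
 *       { "event": "nightly-cleanup", "cron": "0 3 * * *" }
 *     ],
 *     "queues": {
 *       "default": { "concurrency": 4, "alias": ["emails", "images"] }
 *     }
 *   }
 *
 * "cron" is optional (a fallback minutely crontab is used without it); each queue
 * gets its own runner, with aliases registered as extra task names on the same handler.
 */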
async function server(configFile) {
  const configText = fs.readFileSync(configFile).toString("utf-8");
  const config = JSON.parse(configText);
  let cronQueue;
  console.log(config);

  if (config.cron) {
    // Build one graphile-worker cron item per configured entry; each fires the shared HTTP handler.
    const cronItems = config.cron.map((cronEntry) => {
      return {
        task: cronEntry.event,
        match: cronEntry.cron,
        payload: {
          url: process.env.SLAY_Q_CRON_URL,
          event: cronEntry.event,
          retry: 0,
          data: {}
        }
      };
    });
    const cronTasks = {};
    cronItems.forEach((item) => (cronTasks[item.task] = httpHandler));
    cronQueue = await graphileWorker.run({
      connectionString: process.env.SLAY_Q_DATABASE_URL,
      maxPoolSize: 11,
      concurrency: 1,
      noHandleSignals: false,
      parsedCronItems: graphileWorker.parseCronItems(cronItems),
      taskList: cronTasks,
      preset: { plugins: [MigrationPlugin] }
    });
  } else {
    // No cron entries configured: fall back to a single minutely "cronQueue" heartbeat task.
    cronQueue = await graphileWorker.run({
      connectionString: process.env.SLAY_Q_DATABASE_URL,
      maxPoolSize: 11,
      concurrency: 1,
      noHandleSignals: false,
      crontab: `* * * * * cronQueue ?max=1&jobKey=cron {url:"${process.env.SLAY_Q_CRON_URL}",retry:0,event:"cron",data:{}}`,
      taskList: { cronQueue: httpHandler },
      preset: { plugins: [MigrationPlugin] }
    });
  }

  // One runner per configured queue, each handling the queue name plus any aliases.
  const runners = [cronQueue];
  for (const queue of Object.keys(config.queues)) {
    const taskList = { [queue]: httpHandler };
    if (config.queues[queue].alias) {
      config.queues[queue].alias.forEach((name) => (taskList[name] = httpHandler));
    }
    const newQueue = await graphileWorker.run({
      connectionString: process.env.SLAY_Q_DATABASE_URL,
      concurrency: config.queues[queue].concurrency,
      noHandleSignals: false,
      pollInterval: 1000,
      maxPoolSize: 11,
      taskList
    });
    runners.push(newQueue);
  }

  // graphile-worker's run() resolves to a Runner, not a promise; await each runner's
  // completion promise so the server keeps running until every runner stops.
  await Promise.all(runners.map((runner) => runner.promise));
}

dotenv__namespace.config();

const mainCommand = citty.defineCommand({
  meta: {
    name: "Slay Q Server",
    version: "1.0.0",
    description: "Graphile based Inngest wannabe"
  },
  args: {
    configFile: {
      type: "positional",
      description: "Relative or absolute path to config file",
      required: true
    }
  },
  async run({ args }) {
    if (!process.env.SLAY_Q_DATABASE_URL) {
      console.error("Missing required environment variable SLAY_Q_DATABASE_URL");
      process.exit(1);
    }

    // Accept the config path as given, or resolve it relative to the working directory.
    let configFileRealPath = args.configFile;
    if (!fs.existsSync(configFileRealPath)) {
      configFileRealPath = process.cwd() + "/" + args.configFile;
      if (!fs.existsSync(configFileRealPath)) {
        console.error("Unable to find slay-config.json");
        process.exit(1);
      }
    }

    if (!process.env.SLAY_Q_SECRET) {
      console.error("Missing SLAY_Q_SECRET environment variable.");
      process.exit(1);
    }
    if (!process.env.SLAY_Q_CRON_URL) {
      console.error("Missing SLAY_Q_CRON_URL environment variable.");
      process.exit(1);
    }

    server(configFileRealPath).catch((err) => {
      console.error(err);
      process.exit(1);
    });
  }
});

function run() {
  console.log("running");
  citty.runMain(mainCommand).catch((err) => {
    console.error(err);
    process.exit(1);
  });
}

exports.run = run;
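/*
 * Typical invocation, assuming a bin wrapper that calls the exported run()
 * (the executable name here is a guess):
 *
 *   SLAY_Q_DATABASE_URL=postgres://user:pass@localhost/db \
 *   SLAY_Q_SECRET=shared-secret \
 *   SLAY_Q_CRON_URL=https://app.example/api/slay-q \
 *   slay-q-server ./slay-config.json
 */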