'use strict'; const zod = require('zod'); const shortUUID = require('short-uuid'); const ms = require('ms'); const parser = require('cron-parser'); const dateUtils = require('@slay-pics/date-utils'); const signature = require('@slay-pics/signature'); function _interopDefaultCompat (e) { return e && typeof e === 'object' && 'default' in e ? e.default : e; } const shortUUID__default = /*#__PURE__*/_interopDefaultCompat(shortUUID); const ms__default = /*#__PURE__*/_interopDefaultCompat(ms); const parser__default = /*#__PURE__*/_interopDefaultCompat(parser); const SlayQJobDefinitionSchema = zod.z.object({ identifier: zod.z.string(), job_key: zod.z.string(), payload: zod.z.any().nullable(), queue_name: zod.z.string().optional(), run_at: zod.z.string().optional(), max_attempts: zod.z.number().optional(), priority: zod.z.number().optional(), flags: zod.z.string().array().optional(), job_key_mode: zod.z.enum(["replace", "preserve_run_at", "unsafe_dedupe"]).optional() }); const SlayQEmptyEvent = zod.z.object({}); const SlayQReceiveEventPayloadSchema = zod.z.object({ job_key: zod.z.string(), retry: zod.z.number(), time: zod.z.number(), event: zod.z.string(), data: zod.z.any(), cache: zod.z.any().optional() }); var __defProp$4 = Object.defineProperty; var __defNormalProp$4 = (obj, key, value) => key in obj ? __defProp$4(obj, key, { enumerable: true, configurable: true, writable: true, value }) : obj[key] = value; var __publicField$4 = (obj, key, value) => { __defNormalProp$4(obj, typeof key !== "symbol" ? key + "" : key, value); return value; }; class SlayQDelayError extends Error { constructor(message, delay) { super(message); __publicField$4(this, "delay"); this.delay = delay; } } class SlayQWaitOnEventError extends Error { constructor(message, step, event, timeout) { super(message); __publicField$4(this, "event"); __publicField$4(this, "timeout"); __publicField$4(this, "step"); this.event = event; this.timeout = timeout; this.step = step; } } class SlayQInvokeEventError extends Error { constructor(message, step, event, data, timeout = "15m") { super(message); __publicField$4(this, "event"); __publicField$4(this, "step"); __publicField$4(this, "data"); __publicField$4(this, "timeout"); this.event = event; this.step = step; this.data = data; this.timeout = timeout; } } class SlayQNoRetryError extends Error { } class SlayQEventMissingError extends Error { } class SlayQInvalidSignatureError extends Error { } class SlayQTimingError extends Error { } function defineSlayQFunction(options, callback) { return { cron: options.cron, event: options.event, schema: options.schema, retries: options.retries, queue: options.queue, invokes: options.invokes, priority: options.priority, cancelOn: options.cancelOn, waitsOn: options.waitsOn, callback }; } var __defProp$3 = Object.defineProperty; var __defNormalProp$3 = (obj, key, value) => key in obj ? __defProp$3(obj, key, { enumerable: true, configurable: true, writable: true, value }) : obj[key] = value; var __publicField$3 = (obj, key, value) => { __defNormalProp$3(obj, typeof key !== "symbol" ? key + "" : key, value); return value; }; shortUUID__default(shortUUID__default.constants.flickrBase58, { consistentLength: false }); class SlayQStepCache { constructor(cacheData = void 0) { __publicField$3(this, "_cache", /* @__PURE__ */ new Map()); if (cacheData) { this._cache = new Map(Object.entries(cacheData)); } } has(functionName) { return this._cache.has(functionName); } set(functionName, value) { if (this.has(functionName)) { throw new Error(`Duplicate step ${functionName} detected.`); } this._cache.set(functionName, value); } get(functionName) { return this._cache.get(functionName); } get cache() { return Object.fromEntries(this._cache.entries()); } } var __defProp$2 = Object.defineProperty; var __defNormalProp$2 = (obj, key, value) => key in obj ? __defProp$2(obj, key, { enumerable: true, configurable: true, writable: true, value }) : obj[key] = value; var __publicField$2 = (obj, key, value) => { __defNormalProp$2(obj, typeof key !== "symbol" ? key + "" : key, value); return value; }; class SlayQStep { constructor(runCache, client) { __publicField$2(this, "_client"); __publicField$2(this, "_stepCache"); this._stepCache = runCache; this._client = client; } async run(name, fn) { if (this._stepCache.has(name)) { return this._stepCache.get(name); } const res = await fn(); this._stepCache.set(name, res ?? null); return res; } async sendEvent(name, event) { if (this._stepCache.has(name)) { return; } await this._client.sendEvent(event.name, event.data); this._stepCache.set(name, true); } async sendEvents(name, events) { if (this._stepCache.has(name)) { return; } await this._client.sendEvents(events); this._stepCache.set(name, true); } async sleep(name, duration) { if (this._stepCache.has(name)) { return; } const time = Date.now() + ms__default(duration) * this._client.timeMultiplier; this._stepCache.set(name, true); throw new SlayQDelayError("Sleep", time); } async sleepUntil(name, date) { if (this._stepCache.has(name)) { return; } if (typeof date === "string") { date = new Date(date); } this._stepCache.set(name, true); throw new SlayQDelayError("Sleep", date.getTime()); } async waitForEvent(name, event, timeout) { if (this._stepCache.has(name)) { return !!this._stepCache.get(name); } this._stepCache.set(name, false); throw new SlayQWaitOnEventError("Wait", name, event, timeout); } async invoke(name, event, data, timeout = "15m") { if (this._stepCache.has(name)) { return this._stepCache.get(name); } this._stepCache.set(name, null); throw new SlayQInvokeEventError("Invoke", name, event, data, timeout); } } var __defProp$1 = Object.defineProperty; var __defNormalProp$1 = (obj, key, value) => key in obj ? __defProp$1(obj, key, { enumerable: true, configurable: true, writable: true, value }) : obj[key] = value; var __publicField$1 = (obj, key, value) => { __defNormalProp$1(obj, typeof key !== "symbol" ? key + "" : key, value); return value; }; class SlayQCron { constructor() { __publicField$1(this, "_cron", []); } addCron(cron, event) { let interval; if (cron.toUpperCase().startsWith("TZ=")) { const parts = cron.split(" "); if (parts.length <= 1) { throw new Error("Invalid cron"); } const tz = parts.shift().substring(3); const actualCron = parts.join(" "); interval = parser__default.parseExpression(actualCron, { tz }); } else { interval = parser__default.parseExpression(cron); } this._cron.push({ next: interval.next().getTime(), interval, event }); } checkCron() { const result = []; const now = Date.now(); for (const cron of this._cron) { if (now >= cron.next) { result.push(cron.event); cron.next = cron.interval.next().getTime(); } } return result; } } var __defProp = Object.defineProperty; var __defNormalProp = (obj, key, value) => key in obj ? __defProp(obj, key, { enumerable: true, configurable: true, writable: true, value }) : obj[key] = value; var __publicField = (obj, key, value) => { __defNormalProp(obj, typeof key !== "symbol" ? key + "" : key, value); return value; }; const translator = shortUUID__default(shortUUID__default.constants.flickrBase58, { consistentLength: false }); class SlayQClient { constructor(options) { __publicField(this, "_functionMap"); __publicField(this, "_cancelsMap"); __publicField(this, "_waitedForMap"); __publicField(this, "_driver"); __publicField(this, "_endpoint"); __publicField(this, "_cron", new SlayQCron()); __publicField(this, "_invokeable", /* @__PURE__ */ new Set()); __publicField(this, "timeMultiplier"); this._driver = options.driver; this._endpoint = options.endpoint; this.timeMultiplier = options.timeMultiplier ?? 1; this._cancelsMap = /* @__PURE__ */ new Map(); this._waitedForMap = /* @__PURE__ */ new Map(); options.functions.forEach((fn) => { if (fn.invokes) { fn.invokes.forEach((thing) => this._invokeable.add(thing)); } if (fn.cron) { this._cron.addCron(fn.cron, fn.event); } if (fn.cancelOn && fn.cancelOn.length > 0) { fn.cancelOn.forEach((cancel) => { if (!this._cancelsMap.has(cancel.event)) { this._cancelsMap.set(cancel.event, []); } this._cancelsMap.get(cancel.event).push({ event: fn.event, match: cancel.match }); }); } if (fn.waitsOn && fn.waitsOn.length > 0) { fn.waitsOn.forEach((waitOn) => { if (!this._waitedForMap.has(waitOn.event)) { this._waitedForMap.set(waitOn.event, []); } if (!this._waitedForMap.get(waitOn.event).includes(waitOn.match)) { this._waitedForMap.get(waitOn.event).push(waitOn.match); } }); } }); this._functionMap = new Map(options.functions.map((fn) => [fn.event, fn])); } async cancelEvent(event, data) { const cancels = this._cancelsMap.get(event); if (!cancels) { console.warn(`No cancel for ${event}`); return; } for (const cancel of cancels) { const matchVal = data.hasOwnProperty(cancel.match) ? data[cancel.match] : null; if (!matchVal) { console.warn(`No value for ${cancel.match}`); continue; } await this._driver.cancelPendingTasks(cancel.event, cancel.match, matchVal); } } async sendEvent(event, data, delay = null, runAt = null, cache = null, customJobKey = null) { if (this._cancelsMap.has(event)) { await this.cancelEvent(event, data); } await this.updateEvent(customJobKey ?? translator.generate(), event, data, delay, runAt, cache); } async sendEvents(events, batchSize = 8) { const eventsCopy = [...events]; while (eventsCopy.length > 0) { const batch = eventsCopy.splice(0, batchSize); const promises = batch.map((e) => this.sendEvent(e.name, e.data)); await Promise.all(promises); } } async updateEvent(jobKey, event, data, delay = null, runAt = null, cache = null) { if (!this._functionMap.has(event)) { throw new Error(`Event ${event} not found`); } const f = this._functionMap.get(event); if (!f.schema.safeParse(data).success) { throw new Error(`Event ${event} data does not match schema`); } const args = {}; if (f.retries !== void 0 && f.retries !== null) { args.max_attempts = f.retries + 1; } if (f.priority !== void 0) { args.priority = f.priority; } if (delay && delay > 0) { args.run_at = dateUtils.addSeconds(null, delay).toISOString(); } else if (runAt) { args.run_at = runAt.toISOString(); } const payload = { url: this._endpoint, retry: 0, event, data }; if (cache) { payload.cache = cache; } const error = await this._driver.addJob({ identifier: f.queue ?? "general", job_key: jobKey, ...args, payload }); if (error) { console.error(error.message); } } async triggerWaitedFor(event, data) { const waitMatches = this._waitedForMap.get(event); if (!waitMatches) { console.warn(`No wait matches for ${event}`); return; } for (const match of waitMatches) { const matchVal = data.hasOwnProperty(match) ? data[match] : null; if (!matchVal) { console.warn(`No value for ${match}`); continue; } const error = await this._driver.triggerWaiting(event, match, matchVal); if (error) { console.error(error); } } } async invokeEvent(jobKey, event, data, cache = null, retry = 0) { if (event === "cron") { const events = this._cron.checkCron(); if (events.length > 0) { await this.sendEvents( events.map((event2) => { return { name: event2, data: {} }; }) ); } return; } if (!this._functionMap.has(event)) { throw new Error(`Event ${event} not found`); } const f = this._functionMap.get(event); if (!f.schema.safeParse(data).success) { throw new Error(`Event ${event} data does not match schema`); } const stepCache = new SlayQStepCache(cache); const step = new SlayQStep(stepCache, this); try { const result = await f.callback({ event, data, step, retry }); if (this._waitedForMap.has(event)) { await this.triggerWaitedFor(event, data); } if (this._invokeable.has(event)) { const error = await this._driver.triggerWaitingInvokers(jobKey, result ?? null); if (error) { console.error(error.message); } } } catch (e) { if (e instanceof SlayQDelayError) { await this.updateEvent(jobKey, event, data, null, new Date(e.delay), stepCache.cache); } else if (e instanceof SlayQWaitOnEventError) { if (!f.waitsOn) { throw new Error("You are waiting for an event but haven't specified it in the waitsOn property of your function."); } const waitingEvent = e.event; const waitsOn = f.waitsOn.find((waitsOn2) => waitsOn2.event === waitingEvent); if (!waitsOn) { throw new Error("You are waiting for an event but haven't specified it in the waitsOn property of your function."); } if (!data.hasOwnProperty(waitsOn.match)) { throw new Error("Invalid property specified for match on waitForEvent."); } const matchVal = `${data[waitsOn.match]}`; const timeoutDate = new Date(Date.now() + ms__default(e.timeout)); await this.updateEvent(jobKey, event, data, null, timeoutDate, stepCache.cache); const error = await this._driver.waitForEvent(jobKey, e.event, waitsOn.match, matchVal, e.step, timeoutDate); if (error) { console.error(error.message); } } else if (e instanceof SlayQInvokeEventError) { if (!f.invokes || !f.invokes.includes(e.event)) { throw new Error("You are invoking a function that you haven't specified in your function's invoke property."); } const timeoutDate = new Date(Date.now() + ms__default(e.timeout)); const invokeJobKey = translator.generate(); await this.updateEvent(jobKey, event, data, null, timeoutDate, stepCache.cache); const error = await this._driver.waitForInvocation(jobKey, invokeJobKey, e.step); if (error) { console.error(error.message); } else { await this.sendEvent(e.event, e.data, null, null, null, invokeJobKey); } } else if (e instanceof SlayQNoRetryError) ; else { await this._driver.updateStepCache(jobKey, stepCache.cache); throw e; } } } async receiveEvent(sigHeader, payload) { if (payload.event !== "cron" && !this._functionMap.has(payload.event)) { throw new SlayQEventMissingError(`Event ${payload.event} not found`); } const calculateSig = signature.calcSig(payload, process.env.SLAY_Q_WORKER_SECRET); if (calculateSig !== sigHeader) { throw new SlayQInvalidSignatureError(`Event ${payload.event} has invalid signature`); } const timeDelta = (/* @__PURE__ */ new Date()).getTime() - payload.time; if (timeDelta / 1e3 > 15) { throw new SlayQTimingError(`Event ${payload.event} has invalid timestamp`); } await this.invokeEvent(payload.job_key, payload.event, payload.data, payload.cache, payload.retry ?? 0); } } exports.SlayQClient = SlayQClient; exports.SlayQEmptyEvent = SlayQEmptyEvent; exports.SlayQJobDefinitionSchema = SlayQJobDefinitionSchema; exports.SlayQNoRetryError = SlayQNoRetryError; exports.SlayQReceiveEventPayloadSchema = SlayQReceiveEventPayloadSchema; exports.SlayQStep = SlayQStep; exports.defineSlayQFunction = defineSlayQFunction;