'use strict';

const _ = require('lodash');
const EventEmitter = require('node:events');
const ms = require('ms');
const mongoose = require('mongoose');
const jsonpatch = require('fast-json-patch');
const omit = require('omit-deep');
const semver = require('semver');
const powerAssign = require('power-assign');

/** Emitter on which the created / updated / deleted events configured in the plugin options are published. */
class PatchEventEmitter extends EventEmitter { }

const em = new PatchEventEmitter();

/**
 * One history entry per recorded operation. Create/delete entries written by
 * `bulkPatch` carry `doc`; update entries written by `updatePatch` carry `patch`.
 */
const HistorySchema = new mongoose.Schema(
  {
    op: { type: String, required: true },
    modelName: { type: String, required: true },
    collectionName: { type: String, required: true },
    collectionId: { type: mongoose.Schema.Types.ObjectId, required: true },
    doc: { type: Object },
    patch: { type: Array },
    user: { type: Object },
    reason: { type: String },
    metadata: { type: Object },
    version: { type: Number, min: 0, default: 0 }
  },
  { timestamps: true }
);

HistorySchema.index({ collectionId: 1, version: -1 });
HistorySchema.index({ op: 1, modelName: 1, collectionName: 1, collectionId: 1, reason: 1, version: 1 });

const HistoryModel = mongoose.model("History", HistorySchema, "history");

/**
 * Tells whether the query hooks should do nothing for this operation.
 *
 * @param {object} options - Query options as returned by `Query#getOptions()`.
 * @returns {boolean} `true` when `ignoreHook` is set, or when both
 *   `ignoreEvent` and `ignorePatchHistory` are set (nothing would be left to do).
 */
const isHookIgnored = (options) => {
  return options.ignoreHook === true
    || (options.ignoreEvent === true && options.ignorePatchHistory === true);
};

const toObjectOptions = { depopulate: true, virtuals: false };

/**
 * Creates, updates or removes the TTL index (`createdAt_1_TTL`) on the history collection.
 *
 * A falsy `ttl`, a value below one second, or a value that does not resolve to
 * a finite number of milliseconds disables expiry: the index is dropped if it
 * exists and nothing is created. Otherwise the index is (re)created unless an
 * index with the same `expireAfterSeconds` already exists.
 *
 * Failures are logged with `console.error` and never thrown to the caller.
 *
 * @param {number|string} ttl - Milliseconds, or a string understood by `ms`.
 * @returns {Promise<void>}
 */
const setPatchHistoryTTL = async (ttl) => {
  const name = "createdAt_1_TTL";
  try {
    const indexes = await HistoryModel.collection.indexes();
    const existingIndex = indexes?.find((index) => index.name === name);

    // Shared "expiry disabled" path: remove the index only if it is there.
    const dropExistingIndex = async () => {
      if (existingIndex) {
        await HistoryModel.collection.dropIndex(name);
      }
    };

    // Checked before calling ms() so a falsy value is never handed to the parser.
    if (!ttl) {
      await dropExistingIndex();
      return;
    }

    const milliseconds = typeof ttl === "string" ? ms(ttl) : ttl;

    // Previously this case only returned when an index already existed; without
    // one it fell through and tried to create an index with a zero, fractional
    // sub-second or NaN expiry.
    if (!Number.isFinite(milliseconds) || milliseconds < 1e3) {
      await dropExistingIndex();
      return;
    }

    const expireAfterSeconds = milliseconds / 1e3;

    if (existingIndex && existingIndex.expireAfterSeconds === expireAfterSeconds) {
      return;
    }

    // The expiry changed: drop the old index first, then create the new one under the same name.
    await dropExistingIndex();
    await HistoryModel.collection.createIndex({ createdAt: 1 }, { expireAfterSeconds, name });
  } catch (err) {
    console.error("Couldn't create or update index for history collection", err);
  }
};

/**
 * @param {object} opts - Plugin options.
 * @param {object} context - Per-operation context.
 * @returns {boolean} `true` unless history is disabled in the plugin options or for this operation.
 */
function isPatchHistoryEnabled(opts, context) {
  return !opts.patchHistoryDisabled && !context.ignorePatchHistory;
}

/**
 * Returns a JSON round-tripped copy of `doc` with the `opts.omit` keys removed.
 *
 * @param {object} opts - Plugin options (`omit` is optional).
 * @param {object} doc - Document to copy.
 * @returns {object} The JSON-serialised copy, used as input for the patch comparison.
 */
function getJsonOmit(opts, doc) {
  const object = JSON.parse(JSON.stringify(doc));
  if (opts.omit) {
    return omit(object, opts.omit);
  }
  return object;
}

/**
 * Returns `doc` with the `opts.omit` keys removed; `doc` is returned untouched
 * when no keys are configured.
 *
 * @param {object} opts - Plugin options (`omit` is optional).
 * @param {object} doc - Plain object or anything exposing `toObject()`.
 * @returns {object}
 */
function getObjectOmit(opts, doc) {
  if (opts.omit) {
    return omit(_.isFunction(doc?.toObject) ? doc.toObject() : doc, opts.omit);
  }
  return doc;
}

/**
 * @param {object} opts - Plugin options.
 * @returns {Promise<*>} Result of `opts.getUser()`, or `undefined` when it is not a function.
 */
async function getUser(opts) {
  if (_.isFunction(opts.getUser)) {
    return await opts.getUser();
  }
  return undefined;
}

/**
 * @param {object} opts - Plugin options.
 * @returns {Promise<*>} Result of `opts.getReason()`, or `undefined` when it is not a function.
 */
async function getReason(opts) {
  if (_.isFunction(opts.getReason)) {
    return await opts.getReason();
  }
  return undefined;
}

/**
 * @param {object} opts - Plugin options.
 * @returns {Promise<*>} Result of `opts.getMetadata()`, or `undefined` when it is not a function.
 */
async function getMetadata(opts) {
  if (_.isFunction(opts.getMetadata)) {
    return await opts.getMetadata();
  }
  return undefined;
}

/**
 * @param {{status: string, value?: *}} item - One `Promise.allSettled` result.
 * @returns {*} The fulfilled value, or `undefined` for a rejection.
 */
function getValue(item) {
  return item.status === "fulfilled" ? item.value : undefined;
}

/**
 * Resolves user, reason and metadata together. A getter that rejects yields
 * `undefined` for its slot instead of failing the whole lookup.
 *
 * @param {object} opts - Plugin options.
 * @returns {Promise<Array>} `[user, reason, metadata]`.
 */
async function getData(opts) {
  const [user, reason, metadata] = await Promise.allSettled([
    getUser(opts),
    getReason(opts),
    getMetadata(opts)
  ]);
  return [getValue(user), getValue(reason), getValue(metadata)];
}

/**
 * Emits `event` with `data` unless no event name is configured or the
 * operation asked for events to be ignored.
 *
 * @param {object} context - Per-operation context (`ignoreEvent` is read).
 * @param {string|undefined} event - Event name.
 * @param {object} data - Event payload.
 */
function emitEvent(context, event, data) {
  if (event && !context.ignoreEvent) {
    em.emit(event, data);
  }
}

/**
 * Emits one event per document and, when history is enabled, stores one
 * history entry (version 0) per document, written in batches of 1000.
 *
 * @param {object} opts - Plugin options.
 * @param {object} context - Per-operation context holding the documents.
 * @param {string} eventKey - Option name holding the event name ("eventCreated" / "eventDeleted").
 * @param {string} docsKey - Context key holding the documents ("createdDocs" / "deletedDocs").
 * @returns {Promise<void>}
 */
async function bulkPatch(opts, context, eventKey, docsKey) {
  const history = isPatchHistoryEnabled(opts, context);
  const event = opts[eventKey];
  const docs = context[docsKey];
  // Payload key: created documents are sent as `doc`, everything else as `oldDoc`.
  const key = eventKey === "eventCreated" ? "doc" : "oldDoc";

  if (_.isEmpty(docs) || (!event && !history)) return;

  const [user, reason, metadata] = await getData(opts);
  const chunks = _.chunk(docs, 1e3);

  // `chunks` is a plain array, so a synchronous for...of is enough here.
  for (const chunk of chunks) {
    const bulk = [];

    for (const doc of chunk) {
      emitEvent(context, event, { [key]: doc });

      if (history) {
        bulk.push({
          insertOne: {
            document: {
              op: context.op,
              modelName: context.modelName,
              collectionName: context.collectionName,
              collectionId: doc._id,
              doc: getObjectOmit(opts, doc),
              user,
              reason,
              metadata,
              version: 0
            }
          }
        });
      }
    }

    if (history && !_.isEmpty(bulk)) {
      // Best effort: a failed history write is logged and does not reject.
      await HistoryModel.bulkWrite(bulk, { ordered: false }).catch((error) => {
        console.error(error.message);
      });
    }
  }
}

/**
 * Records the documents in `context.createdDocs` as created.
 *
 * @param {object} opts - Plugin options.
 * @param {object} context - Per-operation context.
 * @returns {Promise<void>}
 */
async function createPatch(opts, context) {
  await bulkPatch(opts, context, "eventCreated", "createdDocs");
}

/**
 * Compares `original` with `current` and, when they differ, emits the update
 * event and stores the JSON patch as the next history version.
 *
 * @param {object} opts - Plugin options.
 * @param {object} context - Per-operation context.
 * @param {object} current - Document state after the update.
 * @param {object} original - Document state before the update.
 * @returns {Promise<void>}
 */
async function updatePatch(opts, context, current, original) {
  const history = isPatchHistoryEnabled(opts, context);
  const currentObject = getJsonOmit(opts, current);
  const originalObject = getJsonOmit(opts, original);

  if (_.isEmpty(originalObject) || _.isEmpty(currentObject)) return;

  const patch = jsonpatch.compare(originalObject, currentObject, true);
  if (_.isEmpty(patch)) return;

  emitEvent(context, opts.eventUpdated, { oldDoc: original, doc: current, patch });

  if (!history) return;

  // Next version is one above the highest stored version for this document, or 0 when none is stored.
  let version = 0;
  const lastHistory = await HistoryModel.findOne({ collectionId: original._id }).sort("-version").exec();
  if (lastHistory && lastHistory.version >= 0) {
    version = lastHistory.version + 1;
  }

  const [user, reason, metadata] = await getData(opts);

  await HistoryModel.create({
    op: context.op,
    modelName: context.modelName,
    collectionName: context.collectionName,
    collectionId: original._id,
    patch,
    user,
    reason,
    metadata,
    version
  });
}

/**
 * Records the documents in `context.deletedDocs` as deleted.
 *
 * @param {object} opts - Plugin options.
 * @param {object} context - Per-operation context.
 * @returns {Promise<void>}
 */
async function deletePatch(opts, context) {
  await bulkPatch(opts, context, "eventDeleted", "deletedDocs");
}

const isMongooseLessThan8 = semver.satisfies(mongoose.version, "<8");
const isMongooseLessThan7 = semver.satisfies(mongoose.version, "<7");
const isMongoose6 = semver.satisfies(mongoose.version, "6");

if (isMongoose6) {
  mongoose.set("strictQuery", false);
}

const deleteMethods = ["remove", "findOneAndDelete", "findOneAndRemove", "findByIdAndDelete", "findByIdAndRemove", "deleteOne", "deleteMany"];

/**
 * Registers query-level delete hooks: the pre hook loads the documents that
 * match the filter and passes them to `opts.preDelete` when configured; the
 * post hook records them as deleted.
 *
 * @param {mongoose.Schema} schema - Schema to attach the hooks to.
 * @param {object} opts - Plugin options.
 */
const deleteHooksInitialize = (schema, opts) => {
  schema.pre(deleteMethods, { document: false, query: true }, async function() {
    const options = this.getOptions();
    if (isHookIgnored(options)) return;

    const model = this.model;
    const filter = this.getFilter();

    this._context = {
      op: this.op,
      modelName: opts.modelName ?? model.modelName,
      collectionName: opts.collectionName ?? model.collection.collectionName,
      ignoreEvent: options.ignoreEvent,
      ignorePatchHistory: options.ignorePatchHistory
    };

    // Multi-document operations capture every match; everything else captures the first match only.
    if (["remove", "deleteMany"].includes(this._context.op) && !options.single) {
      const docs = await model.find(filter).lean().exec();
      if (!_.isEmpty(docs)) {
        this._context.deletedDocs = docs;
      }
    } else {
      const doc = await model.findOne(filter).lean().exec();
      if (!_.isEmpty(doc)) {
        this._context.deletedDocs = [doc];
      }
    }

    if (opts.preDelete && _.isArray(this._context.deletedDocs) && !_.isEmpty(this._context.deletedDocs)) {
      await opts.preDelete(this._context.deletedDocs);
    }
  });

  schema.post(deleteMethods, { document: false, query: true }, async function() {
    const options = this.getOptions();
    if (isHookIgnored(options)) return;
    await deletePatch(opts, this._context);
  });
};

/**
 * Registers the `save` pre hook: new documents are recorded as created,
 * existing ones are diffed against the stored version.
 *
 * @param {mongoose.Schema} schema - Schema to attach the hook to.
 * @param {object} opts - Plugin options.
 */
const saveHooksInitialize = (schema, opts) => {
  schema.pre("save", async function() {
    // NOTE(review): presumably skips subdocuments, whose constructor is not named "model" — confirm.
    if (this.constructor.name !== "model") return;

    const current = this.toObject(toObjectOptions);
    const model = this.constructor;

    const context = {
      op: this.isNew ? "create" : "update",
      modelName: opts.modelName ?? model.modelName,
      collectionName: opts.collectionName ?? model.collection.collectionName,
      createdDocs: [current]
    };

    if (this.isNew) {
      await createPatch(opts, context);
      return;
    }

    const original = await model.findById(current._id).lean().exec();
    if (original) {
      await updatePatch(opts, context, current, original);
    }
  });
};

const updateMethods = ["update", "updateOne", "replaceOne", "updateMany", "findOneAndUpdate", "findOneAndReplace", "findByIdAndUpdate"];

/**
 * Applies `update`, then each operator command, to a copy of `document` and
 * returns the resulting plain object.
 *
 * @param {mongoose.Document} document - Document to project the update onto (it is `set` with the result).
 * @param {object} update - Plain field assignments.
 * @param {object[]} commands - Single-key `$operator` objects.
 * @returns {object} The projected document as a plain object.
 */
const assignUpdate = (document, update, commands) => {
  let updated = powerAssign.assign(document.toObject(toObjectOptions), update);

  _.forEach(commands, (command) => {
    try {
      updated = powerAssign.assign(updated, command);
    } catch {
      // Deliberate best effort: a command that cannot be applied is skipped.
    }
  });

  const doc = document.set(updated).toObject(toObjectOptions);
  if (update.createdAt) doc.createdAt = update.createdAt;
  return doc;
};

/**
 * Splits an update object into plain field assignments and `$`-prefixed
 * operator commands. Empty, array or non-object input yields an empty result.
 *
 * @param {*} updateQuery - Value returned by `Query#getUpdate()`.
 * @returns {{update: object, commands: object[]}}
 */
const splitUpdateAndCommands = (updateQuery) => {
  let update = {};
  const commands = [];

  if (!_.isEmpty(updateQuery) && !_.isArray(updateQuery) && _.isObjectLike(updateQuery)) {
    // Work on a deep copy so the caller's update object is never modified.
    update = _.cloneDeep(updateQuery);
    const keysWithDollarSign = _.keys(update).filter((key) => key.startsWith("$"));

    _.forEach(keysWithDollarSign, (key) => {
      commands.push({ [key]: update[key] });
      delete update[key];
    });
  }

  return { update, commands };
};

/**
 * Registers query-level update hooks: the pre hook diffs every matching
 * document against its projected post-update state; the post hook records a
 * creation when the operation was an upsert that matched nothing.
 *
 * @param {mongoose.Schema} schema - Schema to attach the hooks to.
 * @param {object} opts - Plugin options.
 */
const updateHooksInitialize = (schema, opts) => {
  schema.pre(updateMethods, async function() {
    const options = this.getOptions();
    if (isHookIgnored(options)) return;

    const model = this.model;
    const filter = this.getFilter();
    const count = await model.countDocuments(filter).exec();

    this._context = {
      op: this.op,
      modelName: opts.modelName ?? model.modelName,
      collectionName: opts.collectionName ?? model.collection.collectionName,
      // An upsert with no match will insert a document; the post hook handles it.
      isNew: Boolean(options.upsert) && count === 0,
      ignoreEvent: options.ignoreEvent,
      ignorePatchHistory: options.ignorePatchHistory
    };

    const updateQuery = this.getUpdate();
    const { update, commands } = splitUpdateAndCommands(updateQuery);

    const cursor = model.find(filter).cursor();
    await cursor.eachAsync(async (doc) => {
      // Snapshot first: assignUpdate() calls set() on `doc`.
      const origDoc = doc.toObject(toObjectOptions);
      await updatePatch(opts, this._context, assignUpdate(doc, update, commands), origDoc);
    });
  });

  schema.post(updateMethods, async function() {
    const options = this.getOptions();
    if (isHookIgnored(options)) return;
    if (!this._context.isNew) return;

    const model = this.model;
    const updateQuery = this.getUpdate();
    const { update, commands } = splitUpdateAndCommands(updateQuery);
    const filter = this.getFilter();
    const combined = assignUpdate(model.hydrate({}), update, commands);

    // Locate the upserted document, trying the plain update fields, then the
    // projected document, then the original filter.
    // NOTE(review): sort("desc") is kept as-is; presumably meant as a descending sort — confirm.
    let current = null;
    if (!_.isEmpty(update)) {
      current = await model.findOne(update).sort("desc").lean().exec();
    }
    if (!current && !_.isEmpty(combined)) {
      current = await model.findOne(combined).sort("desc").lean().exec();
    }
    if (!current && !_.isEmpty(filter)) {
      current = await model.findOne(filter).sort("desc").lean().exec();
    }

    if (current) {
      this._context.createdDocs = [current];
      await createPatch(opts, this._context);
    }
  });
};

// Hook name used for the document-level delete middleware registered below.
const remove = isMongooseLessThan7 ? "remove" : "deleteOne";

const patchEventEmitter = em;

/**
 * Mongoose plugin that emits events and stores patch history for create,
 * update and delete operations on the schema.
 *
 * @param {mongoose.Schema} schema - Schema to instrument.
 * @param {object} [opts={}] - Plugin options. Keys read in this file: `modelName`,
 *   `collectionName`, `eventCreated`, `eventUpdated`, `eventDeleted`, `omit`,
 *   `patchHistoryDisabled`, `preDelete`, `getUser`, `getReason`, `getMetadata`.
 */
const patchHistoryPlugin = function plugin(schema, opts = {}) {
  saveHooksInitialize(schema, opts);
  updateHooksInitialize(schema, opts);
  deleteHooksInitialize(schema, opts);

  schema.post("insertMany", async function(docs) {
    const context = {
      op: "create",
      modelName: opts.modelName ?? this.modelName,
      collectionName: opts.collectionName ?? this.collection.collectionName,
      createdDocs: docs
    };
    await createPatch(opts, context);
  });

  // Document-level delete middleware is only registered for Mongoose versions below 8.
  if (isMongooseLessThan8) {
    schema.pre(remove, { document: true, query: false }, async function() {
      const original = this.toObject(toObjectOptions);
      if (opts.preDelete && !_.isEmpty(original)) {
        await opts.preDelete([original]);
      }
    });

    schema.post(remove, { document: true, query: false }, async function() {
      const original = this.toObject(toObjectOptions);
      const model = this.constructor;

      const context = {
        op: "delete",
        modelName: opts.modelName ?? model.modelName,
        collectionName: opts.collectionName ?? model.collection.collectionName,
        deletedDocs: [original]
      };

      await deletePatch(opts, context);
    });
  }
};

exports.patchEventEmitter = patchEventEmitter;
exports.patchHistoryPlugin = patchHistoryPlugin;
exports.setPatchHistoryTTL = setPatchHistoryTTL;