'use strict';

/**
 * Error raised by the library. Carries a machine-readable `code` and an
 * optional `info` payload describing the failing operation.
 */
class MutentError extends Error {
  /**
   * @param {string} [code] Error code; `EMUT_GENERIC_ERROR` is used when falsy.
   * @param {string} [message] Human readable message.
   * @param {*} [info] Extra details; stored as `this.info` unless `undefined`.
   */
  constructor (code, message, info) {
    super(message);
    // Not every runtime exposes captureStackTrace.
    if (Error.captureStackTrace) {
      Error.captureStackTrace(this, this.constructor);
    }
    this.name = 'MutentError';
    this.code = code || 'EMUT_GENERIC_ERROR';
    if (info !== undefined) {
      this.info = info;
    }
  }

  get [Symbol.toStringTag] () {
    return 'Error';
  }

  /**
   * @returns {string} `<name> [<code>]: <message>`
   */
  toString () {
    return `${this.name} [${this.code}]: ${this.message}`;
  }

  /**
   * @returns {{error: string, message: string, info: *}}
   */
  toJSON () {
    return {
      error: this.code,
      message: this.message,
      info: this.info
    };
  }
}

/** True for any non-null value whose `typeof` is `'object'`. */
function isObjectLike (value) {
  return typeof value === 'object' && value !== null;
}

/** True for object-like values exposing `Symbol.asyncIterator`. */
function isAsyncIterable (value) {
  return isObjectLike(value) && Symbol.asyncIterator in value;
}

/** True for object-like values exposing `Symbol.iterator` (strings excluded). */
function isIterable (value) {
  return isObjectLike(value) && Symbol.iterator in value;
}

function isFunction (value) {
  return typeof value === 'function';
}

/** True for integers strictly greater than zero. */
function isPositiveInteger (value) {
  return typeof value === 'number' && Number.isInteger(value) && value > 0;
}

/** Identity function. */
function passthrough (value) {
  return value;
}

/**
 * Return the customized Adapter's name (if any) or something descriptive
 * (if possible).
 *
 * The lookup order is: the `Symbol.for('adapter-name')` property, then the
 * constructor's name, then the literal `'Unknown Adapter'`. This function is
 * used while building error details, so it must never throw itself: adapters
 * without a usable constructor (e.g. `Object.create(null)`) fall back to
 * `'Unknown Adapter'`.
 *
 * @param {*} adapter
 * @returns {string}
 */
function getAdapterName (adapter) {
  if (typeof adapter !== 'object' || adapter === null) {
    return 'Unknown Adapter';
  }
  const customName = adapter[Symbol.for('adapter-name')];
  if (customName) {
    return customName;
  }
  const ctor = adapter.constructor;
  return (isFunction(ctor) && ctor.name) || 'Unknown Adapter';
}

/**
 * Read at most one record through the adapter (`FIND` / `READ` intents).
 * Fires the `onFind` hooks first, prefers `adapter.findEntity(argument, ctx)`
 * over `adapter.find(argument, options)`, and yields the result only when it
 * is neither `null` nor `undefined`.
 *
 * @throws {MutentError} `EMUT_PARTIAL_ADAPTER` when neither method exists.
 */
async function * findEntity (ctx) {
  const { adapter, argument, hooks, intent, options } = ctx;

  if (!isFunction(adapter.find) && !isFunction(adapter.findEntity)) {
    throw new MutentError(
      'EMUT_PARTIAL_ADAPTER',
      'The adapter does not implement both ".find" and ".findEntity" methods',
      { adapter: getAdapterName(adapter), intent, argument }
    );
  }

  for (const hook of hooks.onFind) {
    await hook(argument, ctx);
  }

  const data = isFunction(adapter.findEntity)
    ? await adapter.findEntity(argument, ctx)
    : await adapter.find(argument, options);

  if (data !== null && data !== undefined) {
    yield data;
  }
}

/**
 * Read many records through the adapter (`FILTER` intent).
 * Fires the `onFilter` hooks first, then delegates to
 * `adapter.filterEntities(argument, ctx)` when available, otherwise to
 * `adapter.filter(argument, options)`.
 *
 * @throws {MutentError} `EMUT_PARTIAL_ADAPTER` when neither method exists.
 */
async function * filterEntities (ctx) {
  const { adapter, argument, hooks, intent, options } = ctx;

  if (!isFunction(adapter.filter) && !isFunction(adapter.filterEntities)) {
    throw new MutentError(
      'EMUT_PARTIAL_ADAPTER',
      'The adapter does not implement both ".filter" and ".filterEntities" methods',
      { adapter: getAdapterName(adapter), intent, argument }
    );
  }

  for (const hook of hooks.onFilter) {
    await hook(argument, ctx);
  }

  if (isFunction(adapter.filterEntities)) {
    yield * adapter.filterEntities(argument, ctx);
  } else {
    yield * adapter.filter(argument, options);
  }
}

/**
 * Pick the data source matching the context's intent: an adapter read for
 * `FILTER` / `FIND` / `READ`, otherwise the argument itself (used as-is when
 * `ctx.multiple` is set, wrapped by `asIterable` when it is not).
 */
function iterateContext (ctx) {
  const { argument, intent, multiple } = ctx;

  if (intent === 'FILTER') {
    return filterEntities(ctx);
  }
  if (intent === 'FIND' || intent === 'READ') {
    return findEntity(ctx);
  }
  return multiple ? argument : asIterable(argument);
}

/**
 * Wrap a single value into an `AsyncIterable`.
 * A function argument is invoked (lazily) to obtain the value; the awaited
 * result is yielded unless it is `null` or `undefined`.
 */
async function * asIterable (argument) {
  const data = await (typeof argument === 'function' ? argument() : argument);
  if (data !== null && data !== undefined) {
    yield data;
  }
}

/**
 * Create one entity through the adapter, surrounded by the `beforeCreate` and
 * `afterCreate` hooks. With the plain `adapter.create(target, options)`
 * variant, a non-nullish result replaces the entity's data.
 *
 * @throws {MutentError} `EMUT_PARTIAL_ADAPTER` when neither method exists.
 */
async function createEntity (entity, ctx) {
  const { adapter, argument, hooks, intent, options } = ctx;

  if (!isFunction(adapter.create) && !isFunction(adapter.createEntity)) {
    throw new MutentError(
      'EMUT_PARTIAL_ADAPTER',
      'The Adapter does not implement both the ".create" and ".createEntity" methods',
      { adapter: getAdapterName(adapter), intent, argument }
    );
  }

  for (const hook of hooks.beforeCreate) {
    await hook(entity, ctx);
  }

  if (isFunction(adapter.createEntity)) {
    await adapter.createEntity(entity, ctx);
  } else {
    const result = await adapter.create(entity.target, options);
    if (result !== null && result !== undefined) {
      entity.set(result);
    }
  }

  for (const hook of hooks.afterCreate) {
    await hook(entity, ctx);
  }
}

/**
 * Update one entity through the adapter, surrounded by the `beforeUpdate` and
 * `afterUpdate` hooks. With the plain
 * `adapter.update(source, target, options)` variant, a non-nullish result
 * replaces the entity's data.
 *
 * @throws {MutentError} `EMUT_PARTIAL_ADAPTER` when neither method exists.
 */
async function updateEntity (entity, ctx) {
  const { adapter, argument, hooks, intent, options } = ctx;

  if (!isFunction(adapter.update) && !isFunction(adapter.updateEntity)) {
    throw new MutentError(
      'EMUT_PARTIAL_ADAPTER',
      'The Adapter does not implement both the ".update" and ".updateEntity" methods',
      { adapter: getAdapterName(adapter), intent, argument }
    );
  }

  for (const hook of hooks.beforeUpdate) {
    await hook(entity, ctx);
  }

  if (isFunction(adapter.updateEntity)) {
    await adapter.updateEntity(entity, ctx);
  } else {
    const result = await adapter.update(entity.source, entity.target, options);
    if (result !== null && result !== undefined) {
      entity.set(result);
    }
  }

  for (const hook of hooks.afterUpdate) {
    await hook(entity, ctx);
  }
}

/**
 * Delete one entity through the adapter, surrounded by the `beforeDelete` and
 * `afterDelete` hooks.
 *
 * @throws {MutentError} `EMUT_PARTIAL_ADAPTER` when neither method exists.
 */
async function deleteEntity (entity, ctx) {
  const { adapter, argument, hooks, intent, options } = ctx;

  if (!isFunction(adapter.delete) && !isFunction(adapter.deleteEntity)) {
    throw new MutentError(
      'EMUT_PARTIAL_ADAPTER',
      'The Adapter does not implement both the ".delete" and ".deleteEntity" methods',
      { adapter: getAdapterName(adapter), intent, argument }
    );
  }

  for (const hook of hooks.beforeDelete) {
    await hook(entity, ctx);
  }

  if (isFunction(adapter.deleteEntity)) {
    await adapter.deleteEntity(entity, ctx);
  } else {
    await adapter.delete(entity.source, options);
  }

  for (const hook of hooks.afterDelete) {
    await hook(entity, ctx);
  }
}

/**
 * Single Adapter's write operation.
 * Performs the pending create/update/delete (if any) and commits the entity.
 *
 * @returns {Promise<Entity>} The committed entity.
 */
async function writeEntity (entity, ctx) {
  if (entity.shouldCreate) {
    await createEntity(entity, ctx);
  } else if (entity.shouldUpdate) {
    await updateEntity(entity, ctx);
  } else if (entity.shouldDelete) {
    await deleteEntity(entity, ctx);
  }
  return entity.commit();
}

/** Write the entities one at a time, in arrival order. */
async function * sequentialWrite (iterable, ctx) {
  for await (const entity of iterable) {
    yield writeEntity(entity, ctx);
  }
}

/**
 * Write the entities in groups of `ctx.writeSize`; the writes of a group are
 * started together and awaited with `Promise.all`, so the output order still
 * matches the input order.
 */
async function * concurrentWrite (iterable, ctx) {
  const { writeSize } = ctx;

  let buffer = [];
  for await (const entity of iterable) {
    buffer.push(entity);

    if (buffer.length >= writeSize) {
      yield * await Promise.all(buffer.map(e => writeEntity(e, ctx)));
      buffer = [];
    }
  }

  // Flush the last (partial) group.
  if (buffer.length > 0) {
    yield * await Promise.all(buffer.map(e => writeEntity(e, ctx)));
  }
}

/**
 * Write the entities through the adapter's bulk method. A flush happens every
 * time `ctx.writeSize` entities needing a commit have been collected.
 *
 * @throws {MutentError} `EMUT_PARTIAL_ADAPTER` when neither `.bulk` nor
 * `.bulkEntities` is implemented.
 */
async function * bulkWrite (iterable, ctx) {
  const { adapter, argument, hooks, intent, writeSize } = ctx;

  if (!isFunction(adapter.bulk) && !isFunction(adapter.bulkEntities)) {
    throw new MutentError(
      'EMUT_PARTIAL_ADAPTER',
      'The Adapter does not implement both the ".bulk" and ".bulkEntities" methods',
      { adapter: getAdapterName(adapter), intent, argument }
    );
  }

  let queue = [];
  let waitingCommit = 0;

  for await (const entity of iterable) {
    // Preserve the Entities' order: untouched entities are queued as well,
    // but only the ones needing a commit count towards the flush threshold.
    queue.push(setEntityHooks(entity, hooks));
    if (entity.shouldCommit) {
      waitingCommit++;
    }

    if (waitingCommit >= writeSize) {
      yield * flushQueue(queue, ctx);
      queue = [];
      waitingCommit = 0;
    }
  }

  // Flush whatever is left.
  if (queue.length > 0) {
    yield * flushQueue(queue, ctx);
  }
}

/**
 * See `fireEntityHooksBefore` implementation.
 */
const K_BEFORE = Symbol('mutent-hooks-before');

/**
 * See `fireEntityHooksAfter` implementation.
 */
const K_AFTER = Symbol('mutent-hooks-after');

/**
 * Save adapter hooks inside Entity's meta.
 * The hooks are chosen from the entity's pending operation at queue time,
 * because that state is reset by the commit that precedes the "after" hooks.
 *
 * @returns {Entity} The same entity.
 */
function setEntityHooks (entity, hooks) {
  if (entity.shouldCreate) {
    entity.meta[K_BEFORE] = hooks.beforeCreate;
    entity.meta[K_AFTER] = hooks.afterCreate;
  } else if (entity.shouldUpdate) {
    entity.meta[K_BEFORE] = hooks.beforeUpdate;
    entity.meta[K_AFTER] = hooks.afterUpdate;
  } else if (entity.shouldDelete) {
    entity.meta[K_BEFORE] = hooks.beforeDelete;
    entity.meta[K_AFTER] = hooks.afterDelete;
  }
  return entity;
}

/**
 * Execute "before adapter write" hooks (if any).
 */
async function fireEntityHooksBefore (entity, ctx) {
  const hooks = entity.meta[K_BEFORE];
  if (hooks) {
    for (const hook of hooks) {
      await hook(entity, ctx);
    }
  }
}

/**
 * Execute "after adapter write" hooks (if any).
 * This function also removes the saved hooks.
 */
async function fireEntityHooksAfter (entity, ctx) {
  const hooks = entity.meta[K_AFTER];
  if (hooks) {
    for (const hook of hooks) {
      await hook(entity, ctx);
    }

    // Clean-up my dirt
    entity.meta[K_BEFORE] = undefined;
    entity.meta[K_AFTER] = undefined;
  }
}

/**
 * Write a queue of entities with one bulk call, then commit and yield every
 * queued entity (in queue order).
 *
 * With the plain `adapter.bulk(actions, options)` variant, the result is
 * read by position: a non-nullish `result[i]` replaces the data of the i-th
 * pending entity, unless that entity was deleted.
 */
async function * flushQueue (queue, ctx) {
  const { adapter } = ctx;

  for (const entity of queue) {
    await fireEntityHooksBefore(entity, ctx);
  }

  const pending = queue.filter(e => e.shouldCommit);
  if (pending.length > 0) {
    if (isFunction(adapter.bulkEntities)) {
      await adapter.bulkEntities(pending, ctx);
    } else {
      // Object() makes the positional lookup below safe for nullish results.
      const result = Object(
        await adapter.bulk(pending.map(createBulkAction), ctx.options)
      );

      for (let i = 0; i < pending.length; i++) {
        if (
          !pending[i].shouldDelete &&
          result[i] !== undefined &&
          result[i] !== null
        ) {
          pending[i].set(result[i]);
        }
      }
    }
  }

  for (const entity of queue) {
    // Commit Entity before any other action
    entity.commit();

    // Fire the "after commit" hooks
    await fireEntityHooksAfter(entity, ctx);

    // Yield out the committed Entity
    yield entity;
  }
}

/**
 * Describe an entity's pending operation as a plain bulk action object.
 *
 * @throws {Error} When the entity has nothing to commit.
 */
function createBulkAction (entity) {
  if (entity.shouldCreate) {
    return { type: 'CREATE', data: entity.target };
  }
  if (entity.shouldUpdate) {
    return { type: 'UPDATE', oldData: entity.source, newData: entity.target };
  }
  if (entity.shouldDelete) {
    return { type: 'DELETE', data: entity.source };
  }
  throw new Error('Expected Entity to commit');
}

/**
 * Tracks one record: `source` is the last committed data (`null` when the
 * record is not persisted), `target` is the current data, and the
 * `created` / `updated` / `deleted` flags describe the pending operation.
 */
class Entity {
  /** Wrap data that still has to be created. */
  static create (data) {
    return new Entity(data);
  }

  /** Wrap data that is already persisted (created, then committed). */
  static read (data) {
    return Entity.create(data).commit();
  }

  get shouldCreate () {
    return this.source === null && this.created && !this.deleted;
  }

  get shouldUpdate () {
    return this.source !== null && this.updated && !this.deleted;
  }

  get shouldDelete () {
    return this.source !== null && this.deleted;
  }

  /** True when any write operation is pending. */
  get shouldCommit () {
    return this.shouldCreate || this.shouldUpdate || this.shouldDelete;
  }

  /**
   * @param {*} data Any non-nullish value.
   * @throws {TypeError} When `data` is `null` or `undefined`.
   */
  constructor (data) {
    if (data === null || data === undefined) {
      throw new TypeError('Cannot accept a nullish value to create an entity');
    }
    this.source = null;
    this.target = data;
    this.created = true;
    this.updated = false;
    this.deleted = false;
    this.meta = {};
  }

  /**
   * Mark the current state as persisted and reset the pending flags.
   * A deleted entity ends up with a `null` source.
   */
  commit () {
    this.source = this.deleted ? null : this.target;
    this.created = false;
    this.updated = false;
    this.deleted = false;
    return this;
  }

  /** Flag the entity for deletion. */
  delete () {
    this.deleted = true;
    return this;
  }

  /**
   * Replace the current data without flagging an update.
   *
   * @throws {TypeError} When `data` is `null` or `undefined`.
   */
  set (data) {
    if (data === null || data === undefined) {
      throw new TypeError('Cannot accept a nullish value to update an entity');
    }
    this.target = data;
    return this;
  }

  /** Replace the current data and flag an update, unless `data` is the same reference. */
  update (data) {
    if (this.target !== data) {
      this.set(data);
      this.updated = true;
    }
    return this;
  }

  /** @returns {*} The current data. */
  valueOf () {
    return this.target;
  }
}

/**
 * Mutator factory: when the upstream produces no entity at all, create one
 * from `one` (a value, or a function returning a value/promise) and run the
 * `onEntity` hooks on it.
 */
function ensure (one) {
  return async function * mutatorEnsure (iterable, ctx) {
    let exists = false;
    for await (const entity of iterable) {
      exists = true;
      yield entity;
    }
    if (!exists) {
      const data = await (typeof one === 'function' ? one() : one);
      const entity = Entity.create(data);
      for (const hook of ctx.hooks.onEntity) {
        await hook(entity, ctx);
      }
      yield entity;
    }
  };
}

/**
 * Mutator factory: keep only the entities whose data satisfies
 * `predicate(data, index)` (may be async).
 *
 * @throws {TypeError} When `predicate` is not a function.
 */
function filter (predicate) {
  if (!isFunction(predicate)) {
    throw new TypeError('Filter mutator expectes a predicate function');
  }
  return async function * mutatorFilter (iterable) {
    let index = 0;
    for await (const entity of iterable) {
      if (await predicate(entity.valueOf(), index++)) {
        yield entity;
      }
    }
  };
}

/** Mutator flagging every entity for deletion. */
async function * mutatorDelete (iterable) {
  for await (const entity of iterable) {
    yield entity.delete();
  }
}

/**
 * Mutator factory: flag entities for deletion. Without a predicate every
 * entity is deleted; with one, only those satisfying `predicate(data, index)`
 * (all entities are yielded either way).
 *
 * @throws {TypeError} When `predicate` is truthy but not a function.
 */
function ddelete (predicate) {
  if (!predicate) {
    return mutatorDelete;
  }
  if (!isFunction(predicate)) {
    throw new TypeError('Delete mutator expectes a predicate function');
  }
  return async function * mutatorDeleteConditional (iterable) {
    let index = 0;
    for await (const entity of iterable) {
      if (await predicate(entity.valueOf(), index++)) {
        entity.delete();
      }
      yield entity;
    }
  };
}

/**
 * Mutator writing the pending changes through the adapter.
 *
 * With `writeMode` set to `AUTO` and a `writeSize` of at least 2, bulk writes
 * are selected when the context is "multiple" and the adapter implements
 * `.bulk` or `.bulkEntities` as a function (the same requirement `bulkWrite`
 * enforces); otherwise writes are sequential. Any mode other than `BULK` and
 * `CONCURRENT` ends up sequential.
 */
async function * mutatorCommit (iterable, ctx) {
  const { adapter, multiple, writeSize } = ctx;

  let writeMode = ctx.writeMode;
  if (writeMode === 'AUTO' && writeSize >= 2) {
    const canBulk = isFunction(adapter.bulk) || isFunction(adapter.bulkEntities);
    writeMode = multiple && canBulk ? 'BULK' : 'SEQUENTIAL';
  }

  if (writeMode === 'BULK') {
    yield * bulkWrite(iterable, ctx);
  } else if (writeMode === 'CONCURRENT') {
    yield * concurrentWrite(iterable, ctx);
  } else {
    yield * sequentialWrite(iterable, ctx);
  }
}

/** Mutator factory returning the commit mutator. */
function commit () {
  return mutatorCommit;
}

/**
 * Mutator factory: replace each entity's data with `mapper(data, index)`
 * (may be async). A nullish result leaves the entity untouched.
 *
 * @throws {TypeError} When `mapper` is not a function.
 */
function update (mapper) {
  if (!isFunction(mapper)) {
    throw new TypeError('Update mutator expects a map function');
  }
  return async function * mutatorUpdate (iterable) {
    let index = 0;
    for await (const entity of iterable) {
      const result = await mapper(entity.valueOf(), index++);
      if (result === undefined || result === null) {
        yield entity;
      } else {
        yield entity.update(result);
      }
    }
  };
}

/**
 * Mutator factory: update each entity with a shallow copy of its data
 * merged with `objects` (via `Object.assign`).
 */
function assign (...objects) {
  return update((data) => Object.assign({}, data, ...objects));
}

/**
 * Mutator factory: call `callback(data, index)` (may be async) for each
 * entity and pass the entity through unchanged.
 *
 * @throws {TypeError} When `callback` is not a function.
 */
function tap (callback) {
  if (!isFunction(callback)) {
    throw new TypeError('Tap mutator expects a callback function');
  }
  return async function * mutatorTap (iterable) {
    let index = 0;
    for await (const entity of iterable) {
      await callback(entity.valueOf(), index++);
      yield entity;
    }
  };
}

/** Mutator factory: compose several mutators, applied left to right. */
function pipe (...mutators) {
  return function mutatorPipe (iterable, ctx) {
    return mutators.reduce(
      (accumulator, mutator) => mutator(accumulator, ctx),
      iterable
    );
  };
}

/**
 * Mutator factory: drop the first `n` entities.
 *
 * @param {number} [n=0] Positive integer or zero.
 * @throws {TypeError} When `n` is neither zero nor a positive integer.
 */
function skip (n = 0) {
  if (n === 0) {
    return passthrough; // keep the same iterable
  }
  if (!isPositiveInteger(n)) {
    throw new TypeError('Skip mutator expects a positive integer or zero');
  }
  return async function * mutatorSkip (iterable) {
    // Count per run: the same mutator may be applied to many iterations.
    let remaining = n;
    for await (const entity of iterable) {
      if (remaining > 0) {
        remaining--;
      } else {
        yield entity;
      }
    }
  };
}

/**
 * Mutator factory: yield at most `n` entities.
 *
 * @param {number} n Positive integer.
 * @throws {TypeError} When `n` is not a positive integer.
 */
function limit (n) {
  if (!isPositiveInteger(n)) {
    throw new TypeError('Limit mutator expects a positive integer');
  }
  return async function * mutatorLimit (iterable) {
    // Count per run: the same mutator may be applied to many iterations.
    let remaining = n;
    for await (const entity of iterable) {
      yield entity;
      // Stop right after the n-th entity, so no extra entity is pulled
      // (and possibly processed) upstream just to be discarded.
      if (--remaining <= 0) {
        return;
      }
    }
  };
}

/** Names of all the supported hooks. */
const knownHooks = [
  'onFind',
  'onFilter',
  'onEntity',
  'beforeCreate',
  'beforeUpdate',
  'beforeDelete',
  'afterCreate',
  'afterUpdate',
  'afterDelete'
];

/**
 * Build a hooks object having one array for each known hook name.
 * A single function is wrapped into an array, a missing hook becomes an
 * empty array, and an array is used as-is.
 *
 * @throws {TypeError} When a hook is defined with any other kind of value.
 */
function normalizeHooks (raw) {
  raw = Object(raw);
  const hooks = {};
  for (const key of knownHooks) {
    const value = raw[key];
    if (Array.isArray(value)) {
      hooks[key] = value;
    } else if (typeof value === 'function') {
      hooks[key] = [value];
    } else if (value === undefined) {
      hooks[key] = [];
    } else {
      throw new TypeError(`Invalid ${key} hook definition`);
    }
  }
  return hooks;
}

/**
 * Concatenate two normalized hooks objects (old hooks first) into a new one.
 */
function mergeHooks (oldHooks, newHooks) {
  const resHooks = {};
  for (const key of knownHooks) {
    resHooks[key] = oldHooks[key].concat(newHooks[key]);
  }
  return resHooks;
}

/**
 * Validate a list of mutators.
 *
 * @param {Function[]} [mutators=[]]
 * @returns {Function[]} The same array.
 * @throws {TypeError} When not an array, or when an item is not a function.
 */
function normalizeMutators (mutators = []) {
  if (!Array.isArray(mutators)) {
    throw new TypeError('Expected array of mutators');
  }
  for (const mutator of mutators) {
    if (typeof mutator !== 'function') {
      throw new TypeError('Expected mutator function');
    }
  }
  return mutators;
}

/**
 * @param {string} [value='AUTO'] One of `AUTO`, `MANUAL`, `SAFE`.
 * @throws {Error} On any other value.
 */
function parseCommitMode (value = 'AUTO') {
  if (value !== 'AUTO' && value !== 'MANUAL' && value !== 'SAFE') {
    throw new Error(`Unknown commit mode ${value}`);
  }
  return value;
}

/**
 * @param {string} [value='AUTO'] One of `AUTO`, `BULK`, `CONCURRENT`,
 * `SEQUENTIAL`.
 * @throws {Error} On any other value.
 */
function parseWriteMode (value = 'AUTO') {
  if (
    value !== 'AUTO' &&
    value !== 'BULK' &&
    value !== 'CONCURRENT' &&
    value !== 'SEQUENTIAL'
  ) {
    throw new Error(`Unknown write mode ${value}`);
  }
  return value;
}

/**
 * @param {number} [value=16] Positive integer.
 * @throws {TypeError} On any other value.
 */
function parseWriteSize (value = 16) {
  if (!isPositiveInteger(value)) {
    throw new TypeError('Write size must be a positive integer');
  }
  return value;
}

/**
 * A chain of mutators bound to a context. Unless `mutable` is set, every
 * chaining method returns a new `Mutation`, leaving the current one as is.
 */
class Mutation {
  /**
   * @param {object} ctx Base context (see `Store#mutation`).
   * @param {Function[]} [mutators=[]]
   */
  constructor (ctx, mutators = []) {
    this._ctx = ctx;
    this._mutators = mutators;
    this.mutable = false;
  }

  assign (...objects) {
    return this.pipe(assign(...objects));
  }

  /**
   * Run the mutation discarding its output.
   *
   * @returns {Promise<number>} The number of iterated values.
   */
  async consume (options) {
    let count = 0;
    // eslint-disable-next-line
    for await (const _ of this.iterate(options)) {
      count++;
    }
    return count;
  }

  commit () {
    return this.pipe(commit());
  }

  delete (predicate) {
    return this.pipe(ddelete(predicate));
  }

  ensure (one) {
    return this.pipe(ensure(one));
  }

  filter (predicate) {
    return this.pipe(filter(predicate));
  }

  /**
   * Run the mutation.
   *
   * @returns {AsyncIterable<*>} The resulting data, one value at a time.
   */
  iterate (options) {
    return iterateMethod(prepareContext(this._ctx, options), this._mutators);
  }

  limit (n) {
    return this.pipe(limit(n));
  }

  /**
   * Append mutators to the chain: in place when `mutable`, otherwise on a
   * new `Mutation` sharing the same context.
   */
  pipe (...mutators) {
    if (this.mutable) {
      this._mutators.push(...mutators);
      return this;
    }
    return new Mutation(this._ctx, this._mutators.concat(mutators));
  }

  skip (n) {
    return this.pipe(skip(n));
  }

  tap (callback) {
    return this.pipe(tap(callback));
  }

  /**
   * Run the mutation and collect its output.
   *
   * @returns {Promise<*>} An array for "multiple" contexts; otherwise the
   * single value, or `null` when there is none.
   */
  unwrap (options) {
    return unwrapMethod(prepareContext(this._ctx, options), this._mutators);
  }

  update (mapper) {
    return this.pipe(update(mapper));
  }
}

/**
 * Tell whether a mutation may output many values: `CREATE` / `FROM` depend
 * on the argument being (async) iterable, `FIND` / `READ` never do, any
 * other intent always does.
 */
function isMultiple (intent, argument) {
  if (intent === 'CREATE' || intent === 'FROM') {
    return isIterable(argument) || isAsyncIterable(argument);
  }
  if (intent === 'FIND' || intent === 'READ') {
    return false;
  }
  return true;
}

/**
 * Build the run context by merging the base context with the per-call
 * options. Library settings are read from `options.mutent`; the whole
 * `options` object is kept as `ctx.options`.
 *
 * @throws {TypeError} When `options` is not an object.
 */
function prepareContext (ctx, options = {}) {
  if (!isObjectLike(options)) {
    throw new TypeError('Unwrap options must be an object');
  }
  const mutentOptions = Object(options.mutent);
  return {
    ...ctx,
    commitMode: mutentOptions.commitMode !== undefined
      ? parseCommitMode(mutentOptions.commitMode)
      : ctx.commitMode,
    handlers: ctx.handlers.concat(normalizeMutators(mutentOptions.handlers)),
    hooks: mergeHooks(ctx.hooks, normalizeHooks(mutentOptions.hooks)),
    multiple: isMultiple(ctx.intent, ctx.argument),
    mutators: ctx.mutators.concat(normalizeMutators(mutentOptions.mutators)),
    opaque: mutentOptions.opaque,
    options,
    writeMode: mutentOptions.writeMode !== undefined
      ? parseWriteMode(mutentOptions.writeMode)
      : ctx.writeMode,
    writeSize: mutentOptions.writeSize !== undefined
      ? parseWriteSize(mutentOptions.writeSize)
      : ctx.writeSize
  };
}

/**
 * Run a mutation and yield the resulting data.
 *
 * @throws {MutentError} `EMUT_MUTATION_OVERFLOW` when a non-multiple
 * mutation produces a second value; `EMUT_ENTITY_REQUIRED` when a `READ`
 * produces none.
 */
async function * iterateMethod (ctx, mutators) {
  const { adapter, argument, intent, multiple } = ctx;

  const chain = [
    // Plugins' mutators
    ...ctx.mutators,
    // Local chain mutators
    ...mutators,
    // Internal end/safe mutator
    mutatorClose,
    // Plugins' handlers (end mutators)
    ...ctx.handlers
  ];

  const iterable = chain.reduce(
    (acc, mutator) => mutator(acc, ctx),
    iterateEntities(iterateContext(ctx), ctx)
  );

  // Only maintained for non-multiple mutations (READ is one of them).
  let count = 0;
  for await (const entity of iterable) {
    if (!multiple && count++ >= 1) {
      throw new MutentError(
        'EMUT_MUTATION_OVERFLOW',
        'Current transaction returned multiple values unexpectedly',
        { adapter: getAdapterName(adapter), intent, argument }
      );
    }
    yield entity.valueOf();
  }

  if (intent === 'READ' && count <= 0) {
    throw new MutentError(
      'EMUT_ENTITY_REQUIRED',
      'Current transaction requires one entity to output',
      { adapter: getAdapterName(adapter), intent, argument }
    );
  }
}

/**
 * Run a mutation and collect its output: an array for "multiple" contexts,
 * otherwise the first value or `null`.
 */
async function unwrapMethod (ctx, mutators) {
  const results = [];
  for await (const data of iterateMethod(ctx, mutators)) {
    results.push(data);
  }
  if (ctx.multiple) {
    return results;
  }
  return results.length > 0 ? results[0] : null;
}

/**
 * Wrap raw data into entities (to be created for the `CREATE` intent,
 * already persisted otherwise) and run the `onEntity` hooks on each one.
 */
async function * iterateEntities (iterable, ctx) {
  const { hooks, intent } = ctx;
  for await (const data of iterable) {
    const entity = intent === 'CREATE'
      ? Entity.create(data)
      : Entity.read(data);

    for (const hook of hooks.onEntity) {
      await hook(entity, ctx);
    }

    yield entity;
  }
}

/**
 * Closing mutator, chosen by `ctx.commitMode`: `AUTO` commits, `MANUAL`
 * passes the entities through, anything else (`SAFE`) rejects entities with
 * pending changes.
 */
async function * mutatorClose (iterable, ctx) {
  const { commitMode } = ctx;
  if (commitMode === 'AUTO') {
    yield * commit()(iterable, ctx);
  } else if (commitMode === 'MANUAL') {
    yield * iterable;
  } else {
    yield * mutatorSafe(iterable, ctx);
  }
}

/**
 * Pass the entities through, failing on the first one with pending changes.
 *
 * @throws {MutentError} `EMUT_UNSAFE_UNWRAP`
 */
async function * mutatorSafe (iterable, ctx) {
  const { adapter, argument, intent } = ctx;
  for await (const entity of iterable) {
    if (entity.shouldCommit) {
      throw new MutentError(
        'EMUT_UNSAFE_UNWRAP',
        'An entity with uncommitted changes was found',
        { adapter: getAdapterName(adapter), intent, argument, entity }
      );
    }
    yield entity;
  }
}

/**
 * Entry point of the library: binds an adapter to a set of defaults (commit
 * mode, write mode/size, hooks, mutators, handlers) and creates mutations.
 */
class Store {
  /** The adapter's `raw` property. */
  get raw () {
    return this.adapter.raw;
  }

  /**
   * @param {object} options Store options; also registered as a plugin.
   * @param {object} options.adapter
   * @param {boolean} [options.mutable] Create mutable mutations when `true`.
   * @param {Iterable<object>} [options.plugins] Registered in order.
   * @throws {TypeError} When `options` or `options.adapter` is not an object.
   */
  constructor (options) {
    if (!isObjectLike(options)) {
      throw new TypeError('Expected Store options object');
    }
    if (!isObjectLike(options.adapter)) {
      throw new TypeError('Exepcted Store adapter');
    }

    // Set defaults
    this.adapter = options.adapter;
    this.commitMode = 'AUTO';
    this.handlers = [];
    this.hooks = normalizeHooks();
    this.mutable = options.mutable === true;
    this.mutators = [];
    this.writeMode = 'AUTO';
    this.writeSize = 16;

    // Parse and save submitted options
    this.register(options);

    // Register plugins
    if (isIterable(options.plugins)) {
      for (const plugin of options.plugins) {
        this.register(plugin);
      }
    }
  }

  create (data) {
    return this.mutation('CREATE', data);
  }

  filter (query) {
    return this.mutation('FILTER', query);
  }

  find (query) {
    return this.mutation('FIND', query);
  }

  from (data) {
    return this.mutation('FROM', data);
  }

  /**
   * Create a mutation for the given intent and argument, carrying a snapshot
   * of the store's current settings.
   *
   * @returns {Mutation}
   */
  mutation (intent, argument) {
    const mut = new Mutation({
      adapter: this.adapter,
      argument,
      commitMode: this.commitMode,
      handlers: this.handlers,
      hooks: this.hooks,
      intent,
      multiple: undefined,
      mutators: this.mutators,
      opaque: undefined,
      options: undefined,
      writeMode: this.writeMode,
      writeSize: this.writeSize
    });
    if (this.mutable) {
      mut.mutable = true;
    }
    return mut;
  }

  read (query) {
    return this.mutation('READ', query);
  }

  /**
   * Apply a plugin: modes and write size are replaced, while handlers, hooks
   * and mutators are appended to the existing ones.
   *
   * @returns {Store} This store.
   * @throws {TypeError} When `plugin` is not an object.
   */
  register (plugin) {
    if (!isObjectLike(plugin)) {
      throw new TypeError('A plugin must be an object');
    }
    const {
      commitMode,
      handlers,
      hooks,
      mutators,
      writeMode,
      writeSize
    } = plugin;
    if (commitMode !== undefined) {
      this.commitMode = parseCommitMode(commitMode);
    }
    if (handlers !== undefined) {
      this.handlers = this.handlers.concat(normalizeMutators(handlers));
    }
    if (hooks !== undefined) {
      this.hooks = mergeHooks(this.hooks, normalizeHooks(hooks));
    }
    if (mutators !== undefined) {
      this.mutators = this.mutators.concat(normalizeMutators(mutators));
    }
    if (writeMode !== undefined) {
      this.writeMode = parseWriteMode(writeMode);
    }
    if (writeSize !== undefined) {
      this.writeSize = parseWriteSize(writeSize);
    }
    return this;
  }
}

exports.Entity = Entity;
exports.MutentError = MutentError;
exports.Store = Store;
exports.assign = assign;
exports.commit = commit;
exports.ddelete = ddelete;
exports.ensure = ensure;
exports.filter = filter;
exports.getAdapterName = getAdapterName;
exports.limit = limit;
exports.pipe = pipe;
exports.skip = skip;
exports.tap = tap;
exports.update = update;