import Boom = require("boom");
import * as Bookshelf from "bookshelf";
import { FetchOptions, FetchAllOptions, Collection } from "bookshelf";
import Knex = require("knex");
import Path = require("path");
import dotenv = require("dotenv");
import pg = require("pg");
import { Controller } from "./controller";
import { IRepository, IExtendedModel } from "./types";
import { eventBus } from "./event-bus";
import { Observable } from "rxjs/Observable";
// Side-effect import kept so `Observable.prototype.finally` stays patched for
// any other module that relies on this file having loaded it.
import "rxjs/add/operator/finally";

dotenv.config();

// OID 20 is Postgres int8 (bigint); parse it into a JS number instead of the
// default string. Values above Number.MAX_SAFE_INTEGER lose precision.
pg.types.setTypeParser(20, "text", parseInt);

const env = process.env.NODE_ENV || "development";
const rootPath = Path.resolve(process.cwd());
const knexfilePath = Path.join(rootPath, "./knexfile");
const knexfile = require(knexfilePath);
const knex = Knex(knexfile[env]);

export const bookshelf: any = Bookshelf(knex);
bookshelf.plugin("registry");
bookshelf.plugin("visibility");

/** Maps a database table name to the registered model name of its Repository. */
export const tableModelMap: { [tableName: string]: string } = {};

// function getModelDependencies(data: any): string[] {
//   const dependencies = [];
//   Object.values(data.relations).forEach((relation: any) => {
//     const tableName = relation.relatedData.targetTableName;
//     dependencies.push(tableModelMap[tableName]);
//   });
//   return dependencies;
// }

// function getCollectionDependencies(data: Collection): string[] {
//   const dependencies = [];
//   Object.values(data.head().relations).forEach((relation: any) => {
//     const tableName = relation.relatedData.targetTableName;
//     dependencies.push(tableModelMap[tableName]);
//   });
//   return dependencies;
// }

/**
 * Collects the registered model names of every relation reachable from `model`,
 * walking nested relations as well.
 *
 * A relation is either collection-like (exposes `head()`, whose first element
 * carries `relatedData`) or model-like (carries `relatedData` itself).
 *
 * @param model A fetched model; may be null/undefined (e.g. nothing was found).
 * @returns Unique model names. Always an array, so callers can spread it
 *          safely; tables without an entry in `tableModelMap` are skipped.
 */
function getDependencies(model: any): string[] {
  const dependencies: string[] = [];
  if (!model || !model.relations) {
    return dependencies;
  }

  const visited = new Set<any>();
  const pending: any[] = [...Object.values(model.relations)];

  while (pending.length) {
    const relation: any = pending.pop();
    // Skip holes and anything already walked (guards against cyclic graphs).
    if (!relation || visited.has(relation)) {
      continue;
    }
    visited.add(relation);

    let tableName: string | undefined;
    const first = typeof relation.head === "function" ? relation.head() : undefined;

    if (first && first.relatedData) {
      // Collection-like relation: inspect its first element.
      tableName = first.relatedData.targetTableName;
      if (first.relations) {
        Object.values(first.relations).forEach(nested => pending.push(nested));
      }
    } else if (relation.relatedData && relation.relatedData.targetTableName) {
      // Model-like relation.
      tableName = relation.relatedData.targetTableName;
      if (relation.relations) {
        Object.values(relation.relations).forEach(nested => pending.push(nested));
      }
    }

    // Only record tables that map to a registered model; an undefined entry
    // would later break `toUpperCase()` when event names are built.
    const modelName = tableName === undefined ? undefined : tableModelMap[tableName];
    if (modelName && !dependencies.includes(modelName)) {
      dependencies.push(modelName);
    }
  }

  return dependencies;
}

/**
 * Resolves dependencies for a fetched collection from its first element.
 *
 * @returns Unique model names; empty when the collection has no elements.
 */
function getCollectionDependencies(data: Collection<any>): string[] {
  return getDependencies(data.head());
}

/**
 * Builds an Observable that emits the result of `runQuery` once on
 * subscription and again every time a `DB_CHANGE:<MODEL>` event fires for
 * `name` or for one of the dependencies resolved from the first result.
 *
 * Bus handlers are tracked per subscription and removed on teardown, so
 * several subscribers of the same Observable do not interfere with each other.
 *
 * @param name Model name of the watched repository.
 * @param runQuery Executes the query; called for the initial load and every refresh.
 * @param resolveDependencies Extracts dependent model names from the first result.
 */
function watchQuery(
  name: string,
  runQuery: () => Promise<any>,
  resolveDependencies: (data: any) => string[]
): Observable<any> {
  return Observable.create(observer => {
    const handlers: Array<() => any> = [];
    let closed = false;

    runQuery()
      .then(data => {
        // Unsubscribed before the first result arrived: registering handlers
        // now would leak them, because teardown has already run.
        if (closed) {
          return;
        }
        observer.next(data);

        const topics = [name, ...resolveDependencies(data)]
          .map(dep => dep.toUpperCase())
          // De-duplicate so one change does not trigger several refreshes.
          .filter((dep, index, all) => all.indexOf(dep) === index);

        topics.forEach(dep => {
          const handler = () =>
            runQuery()
              .then(stream => observer.next(stream))
              .catch(error => observer.error(error));
          handlers.push(handler);
          eventBus.on(`DB_CHANGE:${dep}`, handler);
        });
      })
      .catch(error => observer.error(error));

    // Teardown: runs on unsubscribe, error, or completion.
    return () => {
      closed = true;
      handlers.forEach(handler => eventBus.unsubscribe(handler));
    };
  });
}

/**
 * Wraps a Bookshelf model with CRUD handlers, reactive `watch`/`watchAll`
 * queries and, optionally, a REST controller.
 */
export class Repository implements IRepository {
  public model: IExtendedModel;
  public name: string;
  public path: string;
  public controller: Controller;
  public websocket: boolean;

  /**
   * @param path Path handed to the REST `Controller`.
   * @param options Fetch options passed to `fetch`/`fetchAll` by the CRUD handlers.
   * @param name Name under which the model is registered with Bookshelf.
   * @param props Bookshelf model properties; `props.tableName` is recorded in `tableModelMap`.
   * @param websocket When true, created/updated models emit `DB_CHANGE:<NAME>` on the event bus.
   * @param rest When true, a `Controller` exposing the CRUD routes is created.
   */
  constructor(
    path: string,
    options: any,
    name: string,
    props: any,
    websocket = false,
    rest = true
  ) {
    tableModelMap[props.tableName] = name;
    this.name = name;
    this.path = path;
    this.websocket = websocket;

    // Keep a caller-supplied initializer instead of silently replacing it.
    const userInitialize = props.initialize;
    props = {
      ...props,
      initialize(...args: any[]) {
        if (typeof userInitialize === "function") {
          userInitialize.apply(this, args);
        }
        if (websocket) {
          const notify = () => {
            eventBus.emit(`DB_CHANGE:${name.toUpperCase()}`);
          };
          this.on("created", notify);
          this.on("updated", notify);
        }
      }
    };

    this.model = bookshelf.model(name, props);
    this.makeCrud(name, options);
    this.makeReactive(name);

    if (rest) {
      this.controller = new Controller(path, (server, opts, next) => {
        server.get("/", this.model.findAll);
        server.get("/:id", this.model.findById);
        server.post("/", this.model.upsert);
        server.delete("/:id", this.model.remove);
        server.post("/:id/undelete", this.model.recover);
      });
    }
  }

  /**
   * Attaches `findAll`, `findById`, `upsert`, `remove` and `recover` handlers
   * to the model. Deletion is soft: `remove` sets `deleted` to true and
   * `recover` sets it back to false. Invalid requests resolve to a
   * `Boom.badData` error object rather than throwing.
   *
   * @param name Model name used in error messages.
   * @param options Fetch options used by `findAll` and `findById`.
   */
  public makeCrud(name: string, options: any) {
    this.model.findAll = async () => {
      return this.model.where("deleted", false).fetchAll(options);
    };

    this.model.findById = async (request, reply) => {
      const item = await this.model
        .where("id", request.params.id)
        .fetch(options);
      if (!item) {
        return Boom.badData(
          `Entity ${name} with ID ${request.params.id} not exists`
        );
      }
      return item;
    };

    this.model.upsert = async (request, reply) => {
      const item = request.body;
      if (!item) {
        return Boom.badData(`Entity ${name}, Undefined Body`);
      }
      return this.model.forge(item).save();
    };

    this.model.remove = async (request, reply) => {
      const id = request.params.id;
      if (!id) {
        return Boom.badData(`Entity ${name} undefined ID`);
      }
      const old = await this.model.where("id", id).fetch();
      if (!old) {
        return Boom.badData(`Entity ${name} with ID ${id} not exists`);
      }
      return old.save({ deleted: true });
    };

    this.model.recover = async (request, reply) => {
      const item = request.body;
      // The route is POST /:id/undelete: an id in the body wins (previous
      // behavior); otherwise fall back to the id from the URL.
      const id =
        item && item.id != null
          ? item.id
          : request.params && request.params.id;
      if (id == null) {
        return Boom.badData(`Entity ${name}, Undefined Body`);
      }
      const old = await this.model.where("id", id).fetch();
      if (!old) {
        return Boom.badData(`Entity ${name} with ID ${id} not exists`);
      }
      return old.save({ deleted: false });
    };
  }

  /**
   * Attaches `watch` (single result) and `watchAll` (collection) to the model.
   * Both return an Observable that re-runs the query whenever a `DB_CHANGE`
   * event fires for this model or for a model related to the first result.
   *
   * @param name Model name whose change events are always listened to.
   */
  public makeReactive(name: string) {
    this.model.watch = (options?: FetchOptions): Observable<any> =>
      watchQuery(name, () => this.model.fetch(options), getDependencies);

    this.model.watchAll = (options?: FetchAllOptions): Observable<any> =>
      watchQuery(
        name,
        () => this.model.fetchAll(options),
        getCollectionDependencies
      );
  }
}