1 | import { connect, Message } from 'amqplib';
|
2 |
|
3 | import { Source, SourceCallback, SourceOptions } from '@neoskop/paperboy';
|
4 |
|
5 | import { fetchDamAssets } from './dam.util';
|
6 | import { MagnoliaSourceOptions } from './magnolia-source-options.interface';
|
7 | import {
|
8 | fetchPages, fetchSitemap, fetchWorkspace, sanitizeJson, writePagesFile, writeWorkspaceFile
|
9 | } from './pages.util';
|
10 |
|
11 | import AsyncLock = require('async-lock');
|
12 |
|
13 | export class MagnoliaSource implements Source {
|
14 | private readonly options: MagnoliaSourceOptions;
|
15 | private readonly callback: SourceCallback;
|
16 | private readonly generationLock: AsyncLock = new AsyncLock({ maxPending: 1 });
|
17 |
|
18 | constructor(options: SourceOptions, callback: SourceCallback) {
|
19 | this.options = <MagnoliaSourceOptions>options;
|
20 | this.callback = callback;
|
21 | }
|
22 |
|
23 | public generate(): Promise<void> {
|
24 | return new Promise(async resolve => {
|
25 | const sitemap = await fetchSitemap(this.options);
|
26 | const website = await fetchPages(this.options);
|
27 | const pages = sitemap
|
28 | .map(path => website.find((page: any) => page['@path'] === path))
|
29 | .filter(page => typeof page !== 'undefined');
|
30 |
|
31 | const workspaces: { [workspace: string]: any } = {};
|
32 |
|
33 | if (this.options.magnolia.workspaces) {
|
34 | for (const workspace of this.options.magnolia.workspaces) {
|
35 | workspaces[workspace] = await fetchWorkspace(workspace, this.options);
|
36 | }
|
37 | }
|
38 |
|
39 |
|
40 | const nodes = pages.concat(
|
41 | Object.keys(workspaces).reduce((prev, current) => prev.concat(workspaces[current]), [])
|
42 | );
|
43 | const match = JSON.stringify(nodes).match(
|
44 | /jcr:([0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})/g
|
45 | );
|
46 |
|
47 | let damUuids = match ? match.map(id => id.substring(4)) : [];
|
48 | damUuids = damUuids.filter((id, pos) => {
|
49 | return damUuids.indexOf(id) === pos;
|
50 | });
|
51 |
|
52 | const damAssets = await fetchDamAssets(damUuids, this.options);
|
53 | const pagesObj: any = pages.map((page: any) =>
|
54 | sanitizeJson(page, damAssets, pages, this.options, workspaces)
|
55 | );
|
56 |
|
57 | await writePagesFile(pagesObj, this.options);
|
58 |
|
59 | if (this.options.magnolia.workspaces) {
|
60 | for (const workspace of Object.keys(workspaces)) {
|
61 | const workspaceData = workspaces[workspace];
|
62 |
|
63 | if (workspaceData) {
|
64 | const sanitized: any[] = [];
|
65 |
|
66 | for (const item of workspaceData) {
|
67 | sanitized.push(
|
68 | sanitizeJson(item, damAssets, workspaceData, this.options, workspaces)
|
69 | );
|
70 | }
|
71 |
|
72 | await writeWorkspaceFile(workspace, sanitized, this.options);
|
73 | }
|
74 | }
|
75 | }
|
76 |
|
77 | resolve();
|
78 | });
|
79 | }
|
80 |
|
81 | public async start(): Promise<void> {
|
82 | connect(this.options.queue.uri)
|
83 | .then(conn => {
|
84 | conn.on('error', this.retryConnection.bind(this, this.options.queue));
|
85 | return conn.createChannel();
|
86 | })
|
87 | .then(channel => {
|
88 | channel
|
89 | .assertExchange(this.options.queue.exchangeName || 'paperboy', 'fanout', {
|
90 | durable: false
|
91 | })
|
92 | .then(() => {
|
93 | return channel.assertQueue(null, {
|
94 | autoDelete: true
|
95 | });
|
96 | })
|
97 | .then(qok => {
|
98 | channel.bindQueue(qok.queue, this.options.queue.exchangeName || 'paperboy', '');
|
99 | channel.consume(qok.queue, this.consumeMessage.bind(this), {
|
100 | noAck: true
|
101 | });
|
102 | });
|
103 | })
|
104 | .catch(() => {
|
105 | this.retryConnection();
|
106 | });
|
107 | }
|
108 |
|
109 | private retryConnection() {
|
110 | console.info('Connection to queue failed, will retry in 10s...');
|
111 | setTimeout(() => {
|
112 | this.start();
|
113 | }, 10000);
|
114 | }
|
115 |
|
116 | private consumeMessage(message: Message | null) {
|
117 | console.info(
|
118 | "[x] from Magnolia: %s -> '%s'",
|
119 | message.fields.routingKey,
|
120 | message.content.toString()
|
121 | );
|
122 |
|
123 | this.generationLock.acquire(
|
124 | 'generationLock',
|
125 | done => {
|
126 | this.generate()
|
127 | .then(() => {
|
128 | this.callback().then(() => done());
|
129 | })
|
130 | .catch(err => {
|
131 | console.error('Generation failed.', err);
|
132 | done();
|
133 | });
|
134 | },
|
135 | (err, ret) => {
|
136 | if (err) {
|
137 | console.info('Already another pending message. Message discarded!');
|
138 | }
|
139 | }
|
140 | );
|
141 | }
|
142 | }
|